From: Mikhal Pershin Date: Mon, 7 Dec 2015 10:19:00 +0000 (+0300) Subject: LU-3285 mdt: IO request handling in MDT X-Git-Tag: 2.10.56~64^2~17 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=2bcc5ad0ed6a440e15233b454191e7f66fcb1921 LU-3285 mdt: IO request handling in MDT Add methods to handle IO requests in MDT similar to OFD. Introduce MDS_INODELOCK_DOM bit for data on MDT, distinguish IO requests to MDT and OST and take appropriate lock in target code. Change-Id: I7feaa00c381f821510ca1343b042ed5f09050ac6 Signed-off-by: Mikhal Pershin Reviewed-on: https://review.whamcloud.com/28013 Reviewed-by: Jinshan Xiong Reviewed-by: Bobi Jam Tested-by: Jenkins Tested-by: Maloo --- diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 0d3ef96..fa9ad61 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -426,15 +426,12 @@ int tgt_sync(const struct lu_env *env, struct lu_target *tgt, int tgt_io_thread_init(struct ptlrpc_thread *thread); void tgt_io_thread_done(struct ptlrpc_thread *thread); +int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, + struct lustre_handle *lh, int mode, __u64 *flags); int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, __u64 start, __u64 end, struct lustre_handle *lh, int mode, __u64 *flags); void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode); -int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, - struct obd_ioobj *obj, struct niobuf_remote *nb, - struct lustre_handle *lh, enum ldlm_mode mode); -void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob, - struct lustre_handle *lh, enum ldlm_mode mode); int tgt_brw_read(struct tgt_session_info *tsi); int tgt_brw_write(struct tgt_session_info *tsi); int tgt_hpreq_handler(struct ptlrpc_request *req); diff --git a/lustre/include/lustre_osc.h b/lustre/include/lustre_osc.h index 48048af..d693db7 100644 ---
a/lustre/include/lustre_osc.h +++ b/lustre/include/lustre_osc.h @@ -521,6 +521,9 @@ int osc_attr_update(const struct lu_env *env, struct cl_object *obj, int osc_object_glimpse(const struct lu_env *env, const struct cl_object *obj, struct ost_lvb *lvb); +/* osc_request.c */ +void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd); + /***************************************************************************** * * Accessors and type conversions. diff --git a/lustre/include/uapi/linux/lustre/lustre_idl.h b/lustre/include/uapi/linux/lustre/lustre_idl.h index 597bc36..8730351 100644 --- a/lustre/include/uapi/linux/lustre/lustre_idl.h +++ b/lustre/include/uapi/linux/lustre/lustre_idl.h @@ -107,7 +107,7 @@ #define MDC_REPLY_PORTAL 10 //#define MDC_BULK_PORTAL 11 #define MDS_REQUEST_PORTAL 12 -//#define MDS_REPLY_PORTAL 13 +#define MDS_IO_PORTAL 13 #define MDS_BULK_PORTAL 14 #define LDLM_CB_REQUEST_PORTAL 15 #define LDLM_CB_REPLY_PORTAL 16 @@ -845,8 +845,9 @@ struct ptlrpc_body_v2 { OBD_CONNECT_FLOCK_DEAD | \ OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK | \ OBD_CONNECT_OPEN_BY_FID | \ - OBD_CONNECT_DIR_STRIPE | \ - OBD_CONNECT_BULK_MBITS | \ + OBD_CONNECT_DIR_STRIPE | OBD_CONNECT_GRANT | \ + OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_SRVLOCK | \ + OBD_CONNECT_BULK_MBITS | OBD_CONNECT_CKSUM | \ OBD_CONNECT_MULTIMODRPCS | \ OBD_CONNECT_SUBTREE | OBD_CONNECT_LARGE_ACL | \ OBD_CONNECT_FLAGS2) @@ -1602,8 +1603,9 @@ typedef enum { * will grant LOOKUP_LOCK. 
*/ #define MDS_INODELOCK_PERM 0x000010 #define MDS_INODELOCK_XATTR 0x000020 /* extended attributes */ +#define MDS_INODELOCK_DOM 0x000040 /* Data for data-on-mdt files */ -#define MDS_INODELOCK_MAXSHIFT 5 +#define MDS_INODELOCK_MAXSHIFT 6 /* This FULL lock is useful to take on unlink sort of operations */ #define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1) diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 87159a0..516c4f4 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -197,12 +197,12 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, } /* indicate MDT features supported by this client */ - data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH | - OBD_CONNECT_ATTRFID | - OBD_CONNECT_VERSION | OBD_CONNECT_BRW_SIZE | - OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | - OBD_CONNECT_CANCELSET | OBD_CONNECT_FID | - OBD_CONNECT_AT | OBD_CONNECT_LOV_V3 | + data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH | + OBD_CONNECT_ATTRFID | OBD_CONNECT_GRANT | + OBD_CONNECT_VERSION | OBD_CONNECT_BRW_SIZE | + OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | + OBD_CONNECT_CANCELSET | OBD_CONNECT_FID | + OBD_CONNECT_AT | OBD_CONNECT_LOV_V3 | OBD_CONNECT_VBR | OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH | OBD_CONNECT_EINPROGRESS | @@ -213,7 +213,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK | OBD_CONNECT_OPEN_BY_FID | OBD_CONNECT_DIR_STRIPE | - OBD_CONNECT_BULK_MBITS | + OBD_CONNECT_BULK_MBITS | OBD_CONNECT_CKSUM | OBD_CONNECT_SUBTREE | OBD_CONNECT_FLAGS2 | OBD_CONNECT_MULTIMODRPCS; @@ -228,6 +228,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, OBD_CONNECT_LARGE_ACL; #endif + data->ocd_cksum_types = cksum_types_supported_client(); + if (OBD_FAIL_CHECK(OBD_FAIL_MDC_LIGHTWEIGHT)) /* flag mdc connection as lightweight, only used for test * purpose, use with care 
*/ diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 718d6ff..cc5c7d0 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -56,6 +56,7 @@ #include #include #include +#include #include "mdc_internal.h" @@ -2354,7 +2355,15 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, LASSERT(imp->imp_obd == obd); switch (event) { + case IMP_EVENT_DISCON: { + struct client_obd *cli = &obd->u.cli; + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant = 0; + cli->cl_lost_grant = 0; + spin_unlock(&cli->cl_loi_list_lock); + break; + } case IMP_EVENT_INACTIVE: { struct client_obd *cli = &obd->u.cli; /* @@ -2382,10 +2391,15 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, if (rc == 0) rc = mdc_kuc_reregister(imp); break; - case IMP_EVENT_OCD: + case IMP_EVENT_OCD: { + struct obd_connect_data *ocd = &imp->imp_connect_data; + + if (OCD_HAS_FLAG(ocd, GRANT)) + osc_init_grant(&obd->u.cli, ocd); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD); break; - case IMP_EVENT_DISCON: + } case IMP_EVENT_DEACTIVATE: case IMP_EVENT_ACTIVATE: break; diff --git a/lustre/mdt/Makefile.in b/lustre/mdt/Makefile.in index 559b9f6..0165cfe 100644 --- a/lustre/mdt/Makefile.in +++ b/lustre/mdt/Makefile.in @@ -1,7 +1,7 @@ MODULES := mdt mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o mdt-objs += mdt_open.o mdt_identity.o mdt_lproc.o mdt_fs.o -mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o +mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o mdt_io.o mdt-objs += mdt_hsm_cdt_actions.o mdt-objs += mdt_hsm_cdt_requests.o mdt-objs += mdt_hsm_cdt_client.o diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index dfaa281..4916d40 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -61,7 +61,7 @@ #include #include #include - +#include #include #include "mdt_internal.h" @@ -665,17 +665,21 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, /* if 
no object is allocated on osts, the size on mds is valid. * b=22272 */ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL && - mdt_hsm_is_released(ma->ma_lmm)) { - /* A released file stores its size on MDS. */ - /* But return 1 block for released file, unless tools like tar - * will consider it fully sparse. (LU-3864) - */ - if (unlikely(b->mbo_size == 0)) - b->mbo_blocks = 0; - else - b->mbo_blocks = 1; - b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) { + if (mdt_hsm_is_released(ma->ma_lmm)) { + /* A released file stores its size on MDS. */ + /* But return 1 block for released file, unless tools + * like tar will consider it fully sparse. (LU-3864) + */ + if (unlikely(b->mbo_size == 0)) + b->mbo_blocks = 0; + else + b->mbo_blocks = 1; + b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + } else if (lov_pattern(ma->ma_lmm->lmm_pattern) == + LOV_PATTERN_MDT) { + b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + } } if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE)) @@ -2082,20 +2086,21 @@ static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) } /* this should sync this object */ -static int mdt_object_sync(struct mdt_thread_info *info) +static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp, + struct mdt_object *mo) { - struct md_object *next; int rc; + ENTRY; - if (!mdt_object_exists(info->mti_object)) { + if (!mdt_object_exists(mo)) { CWARN("%s: non existing object "DFID": rc = %d\n", - mdt_obd_name(info->mti_mdt), - PFID(mdt_object_fid(info->mti_object)), -ESTALE); + exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)), + -ESTALE); RETURN(-ESTALE); } - next = mdt_object_child(info->mti_object); - rc = mo_object_sync(info->mti_env, next); + + rc = mo_object_sync(env, mdt_object_child(mo)); RETURN(rc); } @@ -2118,7 +2123,8 @@ static int mdt_sync(struct tgt_session_info *tsi) struct mdt_thread_info *info = 
tsi2mdt_info(tsi); /* sync an object */ - rc = mdt_object_sync(info); + rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, + info->mti_object); if (rc == 0) { const struct lu_fid *fid; struct lu_attr *la = &info->mti_attr.ma_attr; @@ -2142,6 +2148,54 @@ static int mdt_sync(struct tgt_session_info *tsi) RETURN(rc); } +static int mdt_data_sync(struct tgt_session_info *tsi) +{ + struct mdt_thread_info *info; + struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp); + struct ost_body *body = tsi->tsi_ost_body; + struct ost_body *repbody; + struct mdt_object *mo = NULL; + struct md_attr *ma; + int rc = 0; + + ENTRY; + + repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY); + if (repbody == NULL) + RETURN(err_serious(-ENOMEM)); + + /* if no fid is specified then do nothing, + * device sync is done via MDS_SYNC */ + if (fid_is_zero(&tsi->tsi_fid)) + RETURN(0); + + mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid); + if (IS_ERR(mo)) + RETURN(PTR_ERR(mo)); + + rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo); + if (rc) + GOTO(put, rc); + + repbody->oa.o_oi = body->oa.o_oi; + repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + + info = tsi2mdt_info(tsi); + ma = &info->mti_attr; + ma->ma_need = MA_INODE; + ma->ma_valid = 0; + rc = mdt_attr_get_complex(info, mo, ma); + if (rc == 0) + obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS); + else + rc = 0; + mdt_thread_info_fini(info); + EXIT; +put: + mdt_object_put(tsi->tsi_env, mo); + return rc; +} + /* * Handle quota control requests to consult current usage/limit, but also * to configure quota enforcement @@ -2865,8 +2919,8 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, * \param mode lock mode * \param decref force immediate lock releasing */ -static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, - enum ldlm_mode mode, int decref) +void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, + enum ldlm_mode mode, int decref) { ENTRY; @@ -4631,6 +4685,11 @@ static int
mdt_tgt_getxattr(struct tgt_session_info *tsi) return rc; } +#define OBD_FAIL_OST_READ_NET OBD_FAIL_OST_BRW_NET +#define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET +#define OST_BRW_READ OST_READ +#define OST_BRW_WRITE OST_WRITE + static struct tgt_handler mdt_tgt_handlers[] = { TGT_RPC_HANDLER(MDS_FIRST_OPC, 0, MDS_CONNECT, mdt_tgt_connect, @@ -4671,6 +4730,14 @@ TGT_MDT_HDL(HABEO_CLAVIS | HABEO_CORPUS | HABEO_REFERO | MUTABOR, mdt_swap_layouts), }; +static struct tgt_handler mdt_io_ops[] = { +TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_BRW_READ, tgt_brw_read), +TGT_OST_HDL(HABEO_CORPUS | MUTABOR, OST_BRW_WRITE, tgt_brw_write), +TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO | MUTABOR, + OST_PUNCH, mdt_punch_hdl), +TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_SYNC, mdt_data_sync), +}; + static struct tgt_handler mdt_sec_ctx_ops[] = { TGT_SEC_HDL_VAR(0, SEC_CTX_INIT, mdt_sec_ctx_handle), TGT_SEC_HDL_VAR(0, SEC_CTX_INIT_CONT,mdt_sec_ctx_handle), @@ -4732,7 +4799,11 @@ static struct tgt_opc_slice mdt_common_slice[] = { .tos_opc_end = LFSCK_LAST_OPC, .tos_hs = tgt_lfsck_handlers }, - + { + .tos_opc_start = OST_FIRST_OPC, + .tos_opc_end = OST_LAST_OPC, + .tos_hs = mdt_io_ops + }, { .tos_hs = NULL } @@ -5157,6 +5228,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, o->lo_ops = &mdt_obj_ops; spin_lock_init(&mo->mot_write_lock); mutex_init(&mo->mot_lov_mutex); + init_rwsem(&mo->mot_dom_sem); init_rwsem(&mo->mot_open_sem); atomic_set(&mo->mot_open_count, 0); RETURN(o); @@ -5325,9 +5397,10 @@ static int mdt_obd_set_info_async(const struct lu_env *env, * \retval -EPROTO \a data unexpectedly has zero obd_connect_data::ocd_brw_size * \retval -EBADE client and server feature requirements are incompatible */ -static int mdt_connect_internal(struct obd_export *exp, +static int mdt_connect_internal(const struct lu_env *env, + struct obd_export *exp, struct mdt_device *mdt, - struct obd_connect_data *data) + struct obd_connect_data *data, bool reconnect) { 
LASSERT(data != NULL); @@ -5373,6 +5446,10 @@ static int mdt_connect_internal(struct obd_export *exp, } } + if (OCD_HAS_FLAG(data, GRANT)) + data->ocd_grant = mdt_grant_connect(env, exp, data->ocd_grant, + !reconnect); + /* NB: Disregard the rule against updating * exp_connect_data.ocd_connect_flags in this case, since * tgt_client_new() needs to know if this is a lightweight @@ -5416,6 +5493,32 @@ static int mdt_connect_internal(struct obd_export *exp, spin_unlock(&exp->exp_lock); } + if (OCD_HAS_FLAG(data, CKSUM)) { + __u32 cksum_types = data->ocd_cksum_types; + + /* The client set in ocd_cksum_types the checksum types it + * supports. We have to mask off the algorithms that we don't + * support */ + data->ocd_cksum_types &= cksum_types_supported_server(); + + if (unlikely(data->ocd_cksum_types == 0)) { + CERROR("%s: Connect with checksum support but no " + "ocd_cksum_types is set\n", + exp->exp_obd->obd_name); + RETURN(-EPROTO); + } + + CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return " + "%x\n", exp->exp_obd->obd_name, obd_export_nid2str(exp), + cksum_types, data->ocd_cksum_types); + } else { + /* This client does not support OBD_CONNECT_CKSUM + * fall back to CRC32 */ + CDEBUG(D_RPCTRACE, "%s: cli %s does not support " + "OBD_CONNECT_CKSUM, CRC32 will be used\n", + exp->exp_obd->obd_name, obd_export_nid2str(exp)); + } + return 0; } @@ -5619,7 +5722,7 @@ static int mdt_obd_connect(const struct lu_env *env, if (rc != 0 && rc != -EEXIST) GOTO(out, rc); - rc = mdt_connect_internal(lexp, mdt, data); + rc = mdt_connect_internal(env, lexp, mdt, data, false); if (rc == 0) { struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd; @@ -5665,7 +5768,8 @@ static int mdt_obd_reconnect(const struct lu_env *env, if (rc != 0 && rc != -EEXIST) RETURN(rc); - rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data); + rc = mdt_connect_internal(env, exp, mdt_dev(obd->obd_lu_dev), data, + true); if (rc == 0) mdt_export_stats_init(obd, exp, localdata); else 
@@ -6292,6 +6396,9 @@ static struct obd_ops mdt_obd_device_ops = { .o_destroy_export = mdt_destroy_export, .o_iocontrol = mdt_iocontrol, .o_postrecov = mdt_obd_postrecov, + /* Data-on-MDT IO methods */ + .o_preprw = mdt_obd_preprw, + .o_commitrw = mdt_obd_commitrw, }; static struct lu_device* mdt_device_fini(const struct lu_env *env, diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 23c993c..bb8e2df 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -272,6 +272,8 @@ struct mdt_object { spinlock_t mot_write_lock; /* Lock to protect create_data */ struct mutex mot_lov_mutex; + /* lock to protect read/write stages for Data-on-MDT files */ + struct rw_semaphore mot_dom_sem; /* Lock to protect lease open. * Lease open acquires write lock; normal open acquires read lock */ struct rw_semaphore mot_open_sem; @@ -643,6 +645,8 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *mo, void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *mo, struct mdt_lock_handle *lh, int decref); +void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, + enum ldlm_mode mode, int decref); struct mdt_object *mdt_object_new(const struct lu_env *env, struct mdt_device *, @@ -1074,9 +1078,12 @@ enum { LPROC_MDT_SETXATTR, LPROC_MDT_STATFS, LPROC_MDT_SYNC, - LPROC_MDT_SAMEDIR_RENAME, - LPROC_MDT_CROSSDIR_RENAME, - LPROC_MDT_LAST, + LPROC_MDT_SAMEDIR_RENAME, + LPROC_MDT_CROSSDIR_RENAME, + LPROC_MDT_IO_READ, + LPROC_MDT_IO_WRITE, + LPROC_MDT_IO_PUNCH, + LPROC_MDT_LAST, }; void mdt_counter_incr(struct ptlrpc_request *req, int opcode); void mdt_stats_counter_init(struct lprocfs_stats *stats); @@ -1117,4 +1124,24 @@ static inline char *mdt_req_get_jobid(struct ptlrpc_request *req) return jobid; } +/* MDT IO */ + +#define VALID_FLAGS (LA_TYPE | LA_MODE | LA_SIZE | LA_BLOCKS | \ + LA_BLKSIZE | LA_ATIME | LA_MTIME | LA_CTIME) + +int mdt_obd_preprw(const struct lu_env *env, int cmd, struct obd_export 
*exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rnb, int *nr_local, + struct niobuf_local *lnb); + +int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rnb, int npages, + struct niobuf_local *lnb, int old_rc); +int mdt_punch_hdl(struct tgt_session_info *tsi); + +/* grants */ +long mdt_grant_connect(const struct lu_env *env, struct obd_export *exp, + u64 want, bool conservative); + #endif /* _MDT_INTERNAL_H */ diff --git a/lustre/mdt/mdt_io.c b/lustre/mdt/mdt_io.c new file mode 100644 index 0000000..954713e --- /dev/null +++ b/lustre/mdt/mdt_io.c @@ -0,0 +1,614 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright (c) 2012, 2017 Intel Corporation. 
+ */ +/* + * lustre/mdt/mdt_io.c + * + * Author: Mikhail Pershin + */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include +#include "mdt_internal.h" + +/* --------------- MDT grant code ---------------- */ + +long mdt_grant_connect(const struct lu_env *env, + struct obd_export *exp, + u64 want, bool conservative) +{ + struct mdt_device *mdt = mdt_exp2dev(exp); + u64 left; + long grant; + + ENTRY; + + dt_statfs(env, mdt->mdt_bottom, &mdt->mdt_osfs); + + left = (mdt->mdt_osfs.os_bavail * mdt->mdt_osfs.os_bsize) / 2; + + grant = left; + + CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %ld want: %llu left: %llu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, + exp, grant, want, left); + + return grant; +} + +void mdt_grant_prepare_write(const struct lu_env *env, + struct obd_export *exp, struct obdo *oa, + struct niobuf_remote *rnb, int niocount) +{ + struct mdt_device *mdt = mdt_exp2dev(exp); + u64 left; + + ENTRY; + + left = (mdt->mdt_osfs.os_bavail * mdt->mdt_osfs.os_bsize) / 2; + + /* grant more space back to the client if possible */ + oa->o_grant = left; +} +/* ---------------- end of MDT grant code ---------------- */ + +/* functions below are stubs for now, they will be implemented with + * grant support on MDT */ +static inline void mdt_io_counter_incr(struct obd_export *exp, int opcode, + char *jobid, long amount) +{ + return; +} + +void mdt_grant_prepare_read(const struct lu_env *env, + struct obd_export *exp, struct obdo *oa) +{ + return; +} + +void mdt_grant_commit(struct obd_export *exp, unsigned long pending, + int rc) +{ + return; + +} + +static inline void mdt_dom_read_lock(struct mdt_object *mo) +{ + down_read(&mo->mot_dom_sem); +} + +static inline void mdt_dom_read_unlock(struct mdt_object *mo) +{ + up_read(&mo->mot_dom_sem); +} + +static inline void mdt_dom_write_lock(struct mdt_object *mo) +{ + down_write(&mo->mot_dom_sem); +} + +static inline void mdt_dom_write_unlock(struct mdt_object *mo) +{ + up_write(&mo->mot_dom_sem); +} + +static int 
mdt_preprw_read(const struct lu_env *env, struct obd_export *exp, + struct mdt_device *mdt, struct mdt_object *mo, + struct lu_attr *la, int niocount, + struct niobuf_remote *rnb, int *nr_local, + struct niobuf_local *lnb, char *jobid) +{ + struct dt_object *dob; + int i, j, rc, tot_bytes = 0; + + ENTRY; + + mdt_dom_read_lock(mo); + if (!mdt_object_exists(mo)) + GOTO(unlock, rc = -ENOENT); + + dob = mdt_obj2dt(mo); + /* parse remote buffers to local buffers and prepare the latter */ + *nr_local = 0; + for (i = 0, j = 0; i < niocount; i++) { + rc = dt_bufs_get(env, dob, rnb + i, lnb + j, 0); + if (unlikely(rc < 0)) + GOTO(buf_put, rc); + /* correct index for local buffers to continue with */ + j += rc; + *nr_local += rc; + tot_bytes += rnb[i].rnb_len; + } + + rc = dt_attr_get(env, dob, la); + if (unlikely(rc)) + GOTO(buf_put, rc); + + rc = dt_read_prep(env, dob, lnb, *nr_local); + if (unlikely(rc)) + GOTO(buf_put, rc); + + mdt_io_counter_incr(exp, LPROC_MDT_IO_READ, jobid, tot_bytes); + RETURN(0); +buf_put: + dt_bufs_put(env, dob, lnb, *nr_local); +unlock: + mdt_dom_read_unlock(mo); + return rc; +} + +static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp, + struct mdt_device *mdt, struct mdt_object *mo, + struct lu_attr *la, struct obdo *oa, + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rnb, int *nr_local, + struct niobuf_local *lnb, char *jobid) +{ + struct dt_object *dob; + int i, j, k, rc = 0, tot_bytes = 0; + + ENTRY; + + /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some + * space back if possible */ + mdt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt); + + mdt_dom_read_lock(mo); + if (!mdt_object_exists(mo)) { + CDEBUG(D_ERROR, "%s: BRW to missing obj "DFID"\n", + exp->exp_obd->obd_name, PFID(mdt_object_fid(mo))); + GOTO(unlock, rc = -ENOENT); + } + + dob = mdt_obj2dt(mo); + /* parse remote buffers to local buffers and prepare the latter */ + *nr_local = 0; + for (i = 0, j = 0; i < 
obj->ioo_bufcnt; i++) { + rc = dt_bufs_get(env, dob, rnb + i, lnb + j, 1); + if (unlikely(rc < 0)) + GOTO(err, rc); + /* correct index for local buffers to continue with */ + for (k = 0; k < rc; k++) + lnb[j+k].lnb_flags = rnb[i].rnb_flags; + j += rc; + *nr_local += rc; + tot_bytes += rnb[i].rnb_len; + } + + rc = dt_write_prep(env, dob, lnb, *nr_local); + if (unlikely(rc)) + GOTO(err, rc); + + mdt_io_counter_incr(exp, LPROC_MDT_IO_WRITE, jobid, tot_bytes); + RETURN(0); +err: + dt_bufs_put(env, dob, lnb, *nr_local); +unlock: + mdt_dom_read_unlock(mo); + /* mdt_grant_prepare_write() was called, so we must commit */ + mdt_grant_commit(exp, oa->o_grant_used, rc); + /* let's still process incoming grant information packed in the oa, + * but without enforcing grant since we won't proceed with the write. + * Just like a read request actually. */ + mdt_grant_prepare_read(env, exp, oa); + return rc; +} + +int mdt_obd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rnb, int *nr_local, + struct niobuf_local *lnb) +{ + struct tgt_session_info *tsi = tgt_ses_info(env); + struct mdt_thread_info *info = tsi2mdt_info(tsi); + struct lu_attr *la = &info->mti_attr.ma_attr; + struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev); + struct mdt_object *mo; + char *jobid; + int rc = 0; + + /* The default value PTLRPC_MAX_BRW_PAGES is set in tgt_brw_write() + * but for MDT it is different, correct it here.
*/ + if (*nr_local > MD_MAX_BRW_PAGES) + *nr_local = MD_MAX_BRW_PAGES; + + jobid = tsi->tsi_jobid; + + if (!oa || objcount != 1 || obj->ioo_bufcnt == 0) { + CERROR("%s: bad parameters %p/%i/%i\n", + exp->exp_obd->obd_name, oa, objcount, obj->ioo_bufcnt); + GOTO(out, rc = -EPROTO); + } + + mo = mdt_object_find(env, mdt, &tsi->tsi_fid); + if (IS_ERR(mo)) + GOTO(out, rc = PTR_ERR(mo)); + + LASSERT(info->mti_object == NULL); + info->mti_object = mo; + + if (cmd == OBD_BRW_WRITE) { + la_from_obdo(la, oa, OBD_MD_FLGETATTR); + rc = mdt_preprw_write(env, exp, mdt, mo, la, oa, + objcount, obj, rnb, nr_local, lnb, + jobid); + } else if (cmd == OBD_BRW_READ) { + mdt_grant_prepare_read(env, exp, oa); + rc = mdt_preprw_read(env, exp, mdt, mo, la, + obj->ioo_bufcnt, rnb, nr_local, lnb, + jobid); + obdo_from_la(oa, la, LA_ATIME); + } else { + CERROR("%s: wrong cmd %d received!\n", + exp->exp_obd->obd_name, cmd); + rc = -EPROTO; + } + if (rc) { + lu_object_put(env, &mo->mot_obj); + info->mti_object = NULL; + } +out: + RETURN(rc); +} + +static int mdt_commitrw_read(const struct lu_env *env, struct mdt_device *mdt, + struct mdt_object *mo, int objcount, int niocount, + struct niobuf_local *lnb) +{ + struct dt_object *dob; + int rc = 0; + + ENTRY; + + LASSERT(niocount > 0); + + dob = mdt_obj2dt(mo); + + dt_bufs_put(env, dob, lnb, niocount); + + mdt_dom_read_unlock(mo); + RETURN(rc); +} + +static int mdt_commitrw_write(const struct lu_env *env, struct obd_export *exp, + struct mdt_device *mdt, struct mdt_object *mo, + struct lu_attr *la, int objcount, int niocount, + struct niobuf_local *lnb, unsigned long granted, + int old_rc) +{ + struct dt_device *dt = mdt->mdt_bottom; + struct dt_object *dob; + struct thandle *th; + int rc = 0; + int retries = 0; + int i; + + ENTRY; + + dob = mdt_obj2dt(mo); + + if (old_rc) + GOTO(out, rc = old_rc); + + la->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME; +retry: + if (!dt_object_exists(dob)) + GOTO(out, rc = -ENOENT); + + th = dt_trans_create(env, dt); + if
(IS_ERR(th)) + GOTO(out, rc = PTR_ERR(th)); + + for (i = 0; i < niocount; i++) { + if (!(lnb[i].lnb_flags & OBD_BRW_ASYNC)) { + th->th_sync = 1; + break; + } + } + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_DQACQ_NET)) + GOTO(out_stop, rc = -EINPROGRESS); + + rc = dt_declare_write_commit(env, dob, lnb, niocount, th); + if (rc) + GOTO(out_stop, rc); + + if (la->la_valid) { + /* update [mac]time if needed */ + rc = dt_declare_attr_set(env, dob, la, th); + if (rc) + GOTO(out_stop, rc); + } + + rc = dt_trans_start(env, dt, th); + if (rc) + GOTO(out_stop, rc); + + dt_write_lock(env, dob, 0); + rc = dt_write_commit(env, dob, lnb, niocount, th); + if (rc) + GOTO(unlock, rc); + + if (la->la_valid) { + rc = dt_attr_set(env, dob, la, th); + if (rc) + GOTO(unlock, rc); + } + /* get attr to return */ + rc = dt_attr_get(env, dob, la); +unlock: + dt_write_unlock(env, dob); + +out_stop: + /* Force commit to make the just-deleted blocks + * reusable. LU-456 */ + if (rc == -ENOSPC) + th->th_sync = 1; + + th->th_result = rc; + dt_trans_stop(env, dt, th); + if (rc == -ENOSPC && retries++ < 3) { + CDEBUG(D_INODE, "retry after force commit, retries:%d\n", + retries); + goto retry; + } + +out: + dt_bufs_put(env, dob, lnb, niocount); + mdt_dom_read_unlock(mo); + mdt_grant_commit(exp, granted, old_rc); + RETURN(rc); +} + +int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rnb, int npages, + struct niobuf_local *lnb, int old_rc) +{ + struct mdt_thread_info *info = mdt_th_info(env); + struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev); + struct mdt_object *mo = info->mti_object; + struct lu_attr *la = &info->mti_attr.ma_attr; + __u64 valid; + int rc = 0; + + if (npages == 0) { + CERROR("%s: no pages to commit\n", + exp->exp_obd->obd_name); + rc = -EPROTO; + } + + LASSERT(mo); + + if (cmd == OBD_BRW_WRITE) { + /* Don't update timestamps if this write is older than a + * setattr which 
modifies the timestamps. b=10150 */ + + /* XXX when we start having persistent reservations this needs + * to be changed to ofd_fmd_get() to create the fmd if it + * doesn't already exist so we can store the reservation handle + * there. */ + valid = OBD_MD_FLUID | OBD_MD_FLGID; + valid |= OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME; + + la_from_obdo(la, oa, valid); + + rc = mdt_commitrw_write(env, exp, mdt, mo, la, objcount, + npages, lnb, oa->o_grant_used, old_rc); + if (rc == 0) + obdo_from_la(oa, la, VALID_FLAGS | LA_GID | LA_UID); + else + obdo_from_la(oa, la, LA_GID | LA_UID); + + /* don't report overquota flag if we failed before reaching + * commit */ + if (old_rc == 0 && (rc == 0 || rc == -EDQUOT)) { + /* return the overquota flags to client */ + if (lnb[0].lnb_flags & OBD_BRW_OVER_USRQUOTA) { + if (oa->o_valid & OBD_MD_FLFLAGS) + oa->o_flags |= OBD_FL_NO_USRQUOTA; + else + oa->o_flags = OBD_FL_NO_USRQUOTA; + } + + if (lnb[0].lnb_flags & OBD_BRW_OVER_GRPQUOTA) { + if (oa->o_valid & OBD_MD_FLFLAGS) + oa->o_flags |= OBD_FL_NO_GRPQUOTA; + else + oa->o_flags = OBD_FL_NO_GRPQUOTA; + } + + oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLUSRQUOTA | + OBD_MD_FLGRPQUOTA; + } + } else if (cmd == OBD_BRW_READ) { + rc = mdt_commitrw_read(env, mdt, mo, objcount, npages, lnb); + if (old_rc) + rc = old_rc; + } else { + rc = -EPROTO; + } + /* this put is pair to object_get in ofd_preprw_write */ + mdt_thread_info_fini(info); + RETURN(rc); +} + +int mdt_object_punch(const struct lu_env *env, struct dt_device *dt, + struct dt_object *dob, __u64 start, __u64 end, + struct lu_attr *la) +{ + struct thandle *th; + int rc; + + ENTRY; + + /* we support truncate, not punch yet */ + LASSERT(end == OBD_OBJECT_EOF); + + if (!dt_object_exists(dob)) + RETURN(-ENOENT); + + th = dt_trans_create(env, dt); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = dt_declare_attr_set(env, dob, la, th); + if (rc) + GOTO(stop, rc); + + rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th); + if 
(rc) + GOTO(stop, rc); + + tgt_vbr_obj_set(env, dob); + rc = dt_trans_start(env, dt, th); + if (rc) + GOTO(stop, rc); + + dt_write_lock(env, dob, 0); + rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th); + if (rc) + GOTO(unlock, rc); + rc = dt_attr_set(env, dob, la, th); + if (rc) + GOTO(unlock, rc); +unlock: + dt_write_unlock(env, dob); +stop: + th->th_result = rc; + dt_trans_stop(env, dt, th); + RETURN(rc); +} + +int mdt_punch_hdl(struct tgt_session_info *tsi) +{ + const struct obdo *oa = &tsi->tsi_ost_body->oa; + struct ost_body *repbody; + struct mdt_thread_info *info; + struct lu_attr *la; + struct ldlm_namespace *ns = tsi->tsi_tgt->lut_obd->obd_namespace; + struct obd_export *exp = tsi->tsi_exp; + struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev); + struct mdt_object *mo; + struct dt_object *dob; + __u64 flags = 0; + struct lustre_handle lh = { 0, }; + __u64 start, end; + int rc; + bool srvlock; + + ENTRY; + + /* check that we do support OBD_CONNECT_TRUNCLOCK. */ + CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK); + + if ((oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) != + (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) + RETURN(err_serious(-EPROTO)); + + repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY); + if (repbody == NULL) + RETURN(err_serious(-ENOMEM)); + + /* punch start,end are passed in o_size,o_blocks through the wire */ + start = oa->o_size; + end = oa->o_blocks; + + if (end != OBD_OBJECT_EOF) /* Only truncate is supported */ + RETURN(-EPROTO); + + info = tsi2mdt_info(tsi); + la = &info->mti_attr.ma_attr; + /* standard truncate optimization: if file body is completely + * destroyed, don't send data back to the server.
*/ + if (start == 0) + flags |= LDLM_FL_AST_DISCARD_DATA; + + repbody->oa.o_oi = oa->o_oi; + repbody->oa.o_valid = OBD_MD_FLID; + + srvlock = (exp_connect_flags(exp) & OBD_CONNECT_SRVLOCK) && + oa->o_valid & OBD_MD_FLFLAGS && + oa->o_flags & OBD_FL_SRVLOCK; + + if (srvlock) { + rc = tgt_mdt_data_lock(ns, &tsi->tsi_resid, &lh, LCK_PW, + &flags); + if (rc != 0) + GOTO(out, rc); + } + + CDEBUG(D_INODE, "calling punch for object "DFID", valid = %#llx" + ", start = %lld, end = %lld\n", PFID(&tsi->tsi_fid), + oa->o_valid, start, end); + + mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid); + if (IS_ERR(mo)) + GOTO(out_unlock, rc = PTR_ERR(mo)); + + if (!mdt_object_exists(mo)) + GOTO(out_put, rc = -ENOENT); + mdt_dom_write_lock(mo); + dob = mdt_obj2dt(mo); + + la_from_obdo(la, oa, OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME); + la->la_size = start; + la->la_valid |= LA_SIZE; + + rc = mdt_object_punch(tsi->tsi_env, mdt->mdt_bottom, dob, + start, end, la); + mdt_dom_write_unlock(mo); + if (rc) + GOTO(out_put, rc); + + mdt_io_counter_incr(tsi->tsi_exp, LPROC_MDT_IO_PUNCH, + tsi->tsi_jobid, 1); + EXIT; +out_put: + lu_object_put(tsi->tsi_env, &mo->mot_obj); +out_unlock: + if (srvlock) + mdt_save_lock(info, &lh, LCK_PW, rc); +out: + mdt_thread_info_fini(info); + if (rc == 0) { + struct ldlm_resource *res; + + /* we do not call this before to avoid lu_object_find() in + * ->lvbo_update() holding another reference on the object. + * otherwise concurrent destroy can make the object unavailable + * for 2nd lu_object_find() waiting for the first reference + * to go... deadlock!
*/ + res = ldlm_resource_get(ns, NULL, &tsi->tsi_resid, + LDLM_IBITS, 0); + if (!IS_ERR(res)) { + ldlm_res_lvbo_update(res, NULL, 0); + ldlm_resource_putref(res); + } + } + return rc; +} + diff --git a/lustre/mdt/mdt_lproc.c b/lustre/mdt/mdt_lproc.c index d0b37fb..c60742e 100644 --- a/lustre/mdt/mdt_lproc.c +++ b/lustre/mdt/mdt_lproc.c @@ -894,6 +894,8 @@ void mdt_counter_incr(struct ptlrpc_request *req, int opcode) void mdt_stats_counter_init(struct lprocfs_stats *stats) { + LASSERT(stats && stats->ls_num >= LPROC_MDT_LAST); + lprocfs_counter_init(stats, LPROC_MDT_OPEN, 0, "open", "reqs"); lprocfs_counter_init(stats, LPROC_MDT_CLOSE, 0, "close", "reqs"); lprocfs_counter_init(stats, LPROC_MDT_MKNOD, 0, "mknod", "reqs"); @@ -912,6 +914,11 @@ void mdt_stats_counter_init(struct lprocfs_stats *stats) "samedir_rename", "reqs"); lprocfs_counter_init(stats, LPROC_MDT_CROSSDIR_RENAME, 0, "crossdir_rename", "reqs"); + lprocfs_counter_init(stats, LPROC_MDT_IO_READ, + LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes"); + lprocfs_counter_init(stats, LPROC_MDT_IO_WRITE, + LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes"); + lprocfs_counter_init(stats, LPROC_MDT_IO_PUNCH, 0, "punch", "reqs"); } int mdt_procfs_init(struct mdt_device *mdt, const char *name) diff --git a/lustre/mdt/mdt_mds.c b/lustre/mdt/mdt_mds.c index a06e102..078051a 100644 --- a/lustre/mdt/mdt_mds.c +++ b/lustre/mdt/mdt_mds.c @@ -64,6 +64,7 @@ struct mds_device { struct ptlrpc_service *mds_mdsc_service; struct ptlrpc_service *mds_mdss_service; struct ptlrpc_service *mds_fld_service; + struct ptlrpc_service *mds_io_service; struct mutex mds_health_mutex; struct kset *mds_kset; }; @@ -75,6 +76,10 @@ static unsigned long mds_num_threads; module_param(mds_num_threads, ulong, 0444); MODULE_PARM_DESC(mds_num_threads, "number of MDS service threads to start"); +int mds_max_io_threads = 512; +module_param(mds_max_io_threads, int, 0444); +MODULE_PARM_DESC(mds_max_io_threads, "maximum number of MDS IO service threads"); + 
static char *mds_num_cpts; module_param(mds_num_cpts, charp, 0444); MODULE_PARM_DESC(mds_num_cpts, "CPU partitions MDS threads should run on"); @@ -134,6 +139,10 @@ static void mds_stop_ptlrpc_service(struct mds_device *m) ptlrpc_unregister_service(m->mds_fld_service); m->mds_fld_service = NULL; } + if (m->mds_io_service != NULL) { + ptlrpc_unregister_service(m->mds_io_service); + m->mds_io_service = NULL; + } mutex_unlock(&m->mds_health_mutex); EXIT; @@ -440,6 +449,43 @@ static int mds_start_ptlrpc_service(struct mds_device *m) GOTO(err_mds_svc, rc); } + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = LUSTRE_MDT_NAME "_io", + .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR, + .psc_buf = { + .bc_nbufs = OST_NBUFS, + .bc_buf_size = OST_IO_BUFSIZE, + .bc_req_max_size = OST_IO_MAXREQSIZE, + .bc_rep_max_size = OST_IO_MAXREPSIZE, + .bc_req_portal = MDS_IO_PORTAL, + .bc_rep_portal = MDC_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "ll_mdt_io", + .tc_thr_factor = OSS_THR_FACTOR, + .tc_nthrs_init = OSS_NTHRS_INIT, + .tc_nthrs_base = OSS_NTHRS_BASE, + .tc_nthrs_max = mds_max_io_threads, + .tc_cpu_affinity = 1, + .tc_ctx_tags = LCT_DT_THREAD | LCT_MD_THREAD, + }, + .psc_ops = { + .so_thr_init = tgt_io_thread_init, + .so_thr_done = tgt_io_thread_done, + .so_req_handler = tgt_request_handle, + .so_req_printer = target_print_req, + }, + }; + m->mds_io_service = ptlrpc_register_service(&conf, m->mds_kset, + procfs_entry); + if (IS_ERR(m->mds_io_service)) { + rc = PTR_ERR(m->mds_io_service); + CERROR("failed to start MDT I/O service: %d\n", rc); + m->mds_io_service = NULL; + GOTO(err_mds_svc, rc); + } + EXIT; err_mds_svc: if (rc) @@ -554,6 +600,7 @@ static int mds_health_check(const struct lu_env *env, struct obd_device *obd) rc |= ptlrpc_service_health_check(mds->mds_mdsc_service); rc |= ptlrpc_service_health_check(mds->mds_mdss_service); rc |= ptlrpc_service_health_check(mds->mds_fld_service); + rc |= 
ptlrpc_service_health_check(mds->mds_io_service); mutex_unlock(&mds->mds_health_mutex); return rc != 0 ? 1 : 0; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 5f259e9..d34c30e 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -902,7 +902,7 @@ static int osc_del_shrink_grant(struct client_obd *client) TIMEOUT_GRANT); } -static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) +void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) { /* * ocd_grant is the total grant amount we're expect to hold: if we've @@ -959,6 +959,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) list_empty(&cli->cl_grant_shrink_list)) osc_add_shrink_grant(cli); } +EXPORT_SYMBOL(osc_init_grant); /* We assume that the reason this OSC got a short read is because it read * beyond the end of a stripe file; i.e. lustre is reading a sparse file diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c index cceeecf..e173964 100644 --- a/lustre/osd-zfs/osd_object.c +++ b/lustre/osd-zfs/osd_object.c @@ -1415,8 +1415,7 @@ static dnode_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj, if (rc) return ERR_PTR(rc); - if ((fid_is_idif(fid) || fid_is_norm(fid) || fid_is_echo(fid)) && - osd->od_is_ost) { + if ((fid_is_idif(fid) || fid_is_norm(fid) || fid_is_echo(fid))) { /* The minimum block size must be at least page size otherwise * it will break the assumption in tgt_thread_big_cache where * the array size is PTLRPC_MAX_BRW_PAGES. It will also affect diff --git a/lustre/target/tgt_handler.c b/lustre/target/tgt_handler.c index 8460774..9026f21 100644 --- a/lustre/target/tgt_handler.c +++ b/lustre/target/tgt_handler.c @@ -1570,6 +1570,35 @@ void tgt_io_thread_done(struct ptlrpc_thread *thread) EXIT; } EXPORT_SYMBOL(tgt_io_thread_done); + +/** + * Helper function for getting Data-on-MDT file server DLM lock + * if asked by client. 
+ */ +int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, + struct lustre_handle *lh, int mode, __u64 *flags) +{ + union ldlm_policy_data policy; + int rc; + + ENTRY; + + LASSERT(lh != NULL); + LASSERT(ns != NULL); + LASSERT(!lustre_handle_is_used(lh)); + + policy.l_inodebits.bits = MDS_INODELOCK_DOM | MDS_INODELOCK_UPDATE; + policy.l_inodebits.try_bits = 0; + + rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy, mode, + flags, ldlm_blocking_ast, + ldlm_completion_ast, ldlm_glimpse_ast, + NULL, 0, LVB_T_NONE, NULL, lh); + + RETURN(rc == ELDLM_OK ? 0 : -EIO); +} +EXPORT_SYMBOL(tgt_mdt_data_lock); + /** * Helper function for getting server side [start, start+count] DLM lock * if asked by client. @@ -1614,13 +1643,15 @@ void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode) } EXPORT_SYMBOL(tgt_extent_unlock); -int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, - struct obd_ioobj *obj, struct niobuf_remote *nb, - struct lustre_handle *lh, enum ldlm_mode mode) +static int tgt_brw_lock(struct obd_export *exp, struct ldlm_res_id *res_id, + struct obd_ioobj *obj, struct niobuf_remote *nb, + struct lustre_handle *lh, enum ldlm_mode mode) { + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; __u64 flags = 0; int nrbufs = obj->ioo_bufcnt; int i; + int rc; ENTRY; @@ -1637,14 +1668,19 @@ int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, if (!(nb[i].rnb_flags & OBD_BRW_SRVLOCK)) RETURN(-EFAULT); - RETURN(tgt_extent_lock(ns, res_id, nb[0].rnb_offset, - nb[nrbufs - 1].rnb_offset + - nb[nrbufs - 1].rnb_len - 1, - lh, mode, &flags)); + /* MDT IO for data-on-mdt */ + if (exp->exp_connect_data.ocd_connect_flags & OBD_CONNECT_IBITS) + rc = tgt_mdt_data_lock(ns, res_id, lh, mode, &flags); + else + rc = tgt_extent_lock(ns, res_id, nb[0].rnb_offset, + nb[nrbufs - 1].rnb_offset + + nb[nrbufs - 1].rnb_len - 1, + lh, mode, &flags); + RETURN(rc); } -void tgt_brw_unlock(struct obd_ioobj *obj, 
struct niobuf_remote *niob, - struct lustre_handle *lh, enum ldlm_mode mode) +static void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob, + struct lustre_handle *lh, enum ldlm_mode mode) { ENTRY; @@ -1882,7 +1918,8 @@ int tgt_brw_read(struct tgt_session_info *tsi) ENTRY; - if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) { + if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL && + ptlrpc_req2svc(req)->srv_req_portal != MDS_IO_PORTAL) { CERROR("%s: deny read request from %s to portal %u\n", tgt_name(tsi->tsi_tgt), obd_export_nid2str(req->rq_export), @@ -1925,8 +1962,8 @@ int tgt_brw_read(struct tgt_session_info *tsi) local_nb = tbc->local; - rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo, - remote_nb, &lockh, LCK_PR); + rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh, + LCK_PR); if (rc != 0) RETURN(rc); @@ -2136,7 +2173,8 @@ int tgt_brw_write(struct tgt_session_info *tsi) ENTRY; - if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) { + if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL && + ptlrpc_req2svc(req)->srv_req_portal != MDS_IO_PORTAL) { CERROR("%s: deny write request from %s to portal %u\n", tgt_name(tsi->tsi_tgt), obd_export_nid2str(req->rq_export), @@ -2200,8 +2238,8 @@ int tgt_brw_write(struct tgt_session_info *tsi) local_nb = tbc->local; - rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo, - remote_nb, &lockh, LCK_PW); + rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh, + LCK_PW); if (rc != 0) GOTO(out, rc);