X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=4caa22a4b90301da10f2e2694aea3ff95490d2a6;hp=508aafa35e9971923e22d92d07fa391a4e1a4571;hb=4d7b022e373d265f4f3b9d90af44cddd0e65f9ae;hpb=687a868cc1d88ea8a10c69d9dd0a9307d9cde368 diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 508aafa..4caa22a 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -66,7 +66,6 @@ #include "mdt_internal.h" - static unsigned int max_mod_rpcs_per_client = 8; module_param(max_mod_rpcs_per_client, uint, 0644); MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client"); @@ -269,18 +268,13 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, { struct mdt_device *mdt = info->mti_mdt; struct lu_name *lname = &info->mti_name; - char *name = NULL; + char *filename = info->mti_filename; struct mdt_object *parent; u32 mode; int rc = 0; LASSERT(!info->mti_cross_ref); - OBD_ALLOC(name, NAME_MAX + 1); - if (name == NULL) - return -ENOMEM; - lname->ln_name = name; - /* * We may want to allow this to mount a completely separate * fileset from the MDT in the future, but keeping it to @@ -316,8 +310,9 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, break; } - strncpy(name, s1, lname->ln_namelen); - name[lname->ln_namelen] = '\0'; + strncpy(filename, s1, lname->ln_namelen); + filename[lname->ln_namelen] = '\0'; + lname->ln_name = filename; parent = mdt_object_find(info->mti_env, mdt, fid); if (IS_ERR(parent)) { @@ -342,8 +337,6 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, } } - OBD_FREE(name, NAME_MAX + 1); - return rc; } @@ -412,13 +405,16 @@ out: static int mdt_statfs(struct tgt_session_info *tsi) { - struct ptlrpc_request *req = tgt_ses_req(tsi); - struct mdt_thread_info *info = tsi2mdt_info(tsi); - struct mdt_device *mdt = info->mti_mdt; - struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd; - struct ptlrpc_service_part *svcpt; - struct obd_statfs *osfs; - int rc; + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct mdt_thread_info *info = tsi2mdt_info(tsi); + struct mdt_device *mdt = info->mti_mdt; + struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd; + struct md_device *next = mdt->mdt_child; + struct ptlrpc_service_part *svcpt; + struct obd_statfs *osfs; + struct mdt_body *reqbody = NULL; + struct mdt_statfs_cache *msf; + int rc; ENTRY; @@ -440,11 +436,39 @@ static int mdt_statfs(struct tgt_session_info *tsi) if (!osfs) GOTO(out, rc = -EPROTO); - rc = tgt_statfs_internal(tsi->tsi_env, &mdt->mdt_lut, osfs, - ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS, - NULL); - if (unlikely(rc)) - GOTO(out, rc); + if (mdt_is_sum_statfs_client(req->rq_export) && + lustre_packed_msg_size(req->rq_reqmsg) == + req_capsule_fmt_size(req->rq_reqmsg->lm_magic, + &RQF_MDS_STATFS_NEW, RCL_CLIENT)) { + req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW); + reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + } + + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + msf = &mdt->mdt_sum_osfs; + else + msf = &mdt->mdt_osfs; + + if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) { + /** statfs data is too old, get up-to-date one */ + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + rc = next->md_ops->mdo_statfs(info->mti_env, + next, osfs); + else + rc = dt_statfs(info->mti_env, mdt->mdt_bottom, + osfs); + if (rc) + GOTO(out, rc); + spin_lock(&mdt->mdt_lock); + msf->msf_osfs = *osfs; + msf->msf_age = ktime_get_seconds(); + spin_unlock(&mdt->mdt_lock); + } else { + /** use cached statfs data */ + spin_lock(&mdt->mdt_lock); + *osfs = msf->msf_osfs; + spin_unlock(&mdt->mdt_lock); + } /* at least try to account for cached pages. its still racy and * might be under-reporting if clients haven't announced their @@ -489,11 +513,12 @@ out: * Pack size attributes into the reply. */ int mdt_pack_size2body(struct mdt_thread_info *info, - const struct lu_fid *fid, bool dom_lock) + const struct lu_fid *fid, struct lustre_handle *lh) { struct mdt_body *b; struct md_attr *ma = &info->mti_attr; int dom_stripe; + bool dom_lock = false; ENTRY; @@ -508,6 +533,16 @@ int mdt_pack_size2body(struct mdt_thread_info *info, if (dom_stripe == LMM_NO_DOM) RETURN(-ENOENT); + if (lustre_handle_is_used(lh)) { + struct ldlm_lock *lock; + + lock = ldlm_handle2lock(lh); + if (lock != NULL) { + dom_lock = ldlm_has_dom(lock); + LDLM_LOCK_PUT(lock); + } + } + /* no DoM lock, no size in reply */ if (!dom_lock) RETURN(0); @@ -904,6 +939,8 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, return -EINVAL; } + LASSERT(buf->lb_buf); + rc = mo_xattr_get(info->mti_env, next, buf, name); if (rc > 0) { @@ -956,8 +993,8 @@ got: return rc; } -static int mdt_attr_get_pfid(struct mdt_thread_info *info, - struct mdt_object *o, struct lu_fid *pfid) +int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, + struct lu_fid *pfid) { struct lu_buf *buf = &info->mti_buf; struct link_ea_header *leh; @@ -1027,7 +1064,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc); if (S_ISREG(mode)) - (void) mdt_get_som(info, o, &ma->ma_attr); + (void) mdt_get_som(info, o, ma); ma->ma_valid |= MA_INODE; } @@ -1057,6 +1094,15 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc); } + /* + * In the handle of MA_INODE, we may already get the SOM attr. + */ + if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) { + rc = mdt_get_som(info, o, ma); + if (rc != 0) + GOTO(out, rc); + } + if (need & MA_HSM && S_ISREG(mode)) { buf->lb_buf = info->mti_xattr_buf; buf->lb_len = sizeof(info->mti_xattr_buf); @@ -1406,8 +1452,9 @@ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj, if (rc) GOTO(out, rc); + mutex_lock(&obj->mot_som_mutex); rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout); - + mutex_unlock(&obj->mot_som_mutex); mdt_object_unlock(info, obj, lh, 1); out: RETURN(rc); @@ -1480,12 +1527,12 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) /* permission check. Make sure the calling process having permission * to write both files. */ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL, - MAY_WRITE); + MAY_WRITE); if (rc < 0) GOTO(put, rc); rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL, - MAY_WRITE); + MAY_WRITE); if (rc < 0) GOTO(put, rc); @@ -1819,10 +1866,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, LDLM_LOCK_PUT(lock); mdt_object_put(info->mti_env, child); /* NB: call the mdt_pack_size2body always after - * mdt_object_put(), that is why this speacial + * mdt_object_put(), that is why this special * exit path is used. */ rc = mdt_pack_size2body(info, child_fid, - child_bits & MDS_INODELOCK_DOM); + &lhc->mlh_reg_lh); if (rc != 0 && child_bits & MDS_INODELOCK_DOM) { /* DOM lock was taken in advance but this is * not DoM file. Drop the lock. */ @@ -1833,17 +1880,17 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, GOTO(out_parent, rc = 0); } - } - if (lock) - LDLM_LOCK_PUT(lock); + } + if (lock) + LDLM_LOCK_PUT(lock); - EXIT; + EXIT; out_child: - mdt_object_put(info->mti_env, child); + mdt_object_put(info->mti_env, child); out_parent: - if (lhp) - mdt_object_unlock(info, parent, lhp, 1); - return rc; + if (lhp) + mdt_object_unlock(info, parent, lhp, 1); + return rc; } /* normal handler: should release the child lock */ @@ -2112,11 +2159,24 @@ static int mdt_reint_internal(struct mdt_thread_info *info, out_ucred: mdt_exit_ucred(info); out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; - return rc; + mdt_client_compatibility(info); + + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; + + /* + * Data-on-MDT optimization - read data along with OPEN and return it + * in reply. Do that only if we have both DOM and LAYOUT locks. + */ + if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req) && + info->mti_attr.ma_lmm != NULL && + mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) { + rc = mdt_dom_read_on_open(info, info->mti_mdt, + &lhc->mlh_reg_lh); + } + + return rc; } static long mdt_reint_opcode(struct ptlrpc_request *req, @@ -2158,7 +2218,7 @@ static int mdt_reint(struct tgt_session_info *tsi) [REINT_OPEN] = &RQF_MDS_REINT_OPEN, [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR, [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK, - [REINT_MIGRATE] = &RQF_MDS_REINT_RENAME, + [REINT_MIGRATE] = &RQF_MDS_REINT_MIGRATE, [REINT_RESYNC] = &RQF_MDS_REINT_RESYNC, }; @@ -2182,7 +2242,7 @@ static int mdt_reint(struct tgt_session_info *tsi) } /* this should sync the whole device */ -static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) +int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) { struct dt_device *dt = mdt->mdt_bottom; int rc; @@ -2334,10 +2394,12 @@ static int mdt_quotactl(struct tgt_session_info *tsi) /* master quotactl */ case Q_SETINFO: case Q_SETQUOTA: + case LUSTRE_Q_SETDEFAULT: if (!nodemap_can_setquota(nodemap)) GOTO(out_nodemap, rc = -EPERM); case Q_GETINFO: case Q_GETQUOTA: + case LUSTRE_Q_GETDEFAULT: if (qmt == NULL) GOTO(out_nodemap, rc = -EOPNOTSUPP); /* slave quotactl */ @@ -2387,6 +2449,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case Q_SETINFO: case Q_SETQUOTA: case Q_GETQUOTA: + case LUSTRE_Q_SETDEFAULT: + case LUSTRE_Q_GETDEFAULT: /* forward quotactl request to QMT */ rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl); break; @@ -2594,6 +2658,7 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, { struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd; struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + struct ldlm_cb_set_arg *arg = data; bool commit_async = false; int rc; ENTRY; @@ -2606,17 +2671,22 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, unlock_res_and_lock(lock); RETURN(0); } - /* There is no lock conflict if l_blocking_lock == NULL, - * it indicates a blocking ast sent from ldlm_lock_decref_internal - * when the last reference to a local lock was released */ - if (lock->l_req_mode & (LCK_PW | LCK_EX) && - lock->l_blocking_lock != NULL) { + + /* A blocking ast may be sent from ldlm_lock_decref_internal + * when the last reference to a local lock was released and + * during blocking event from ldlm_work_bl_ast_lock(). + * The 'data' parameter is l_ast_data in the first case and + * callback arguments in the second one. Distinguish them by that. + */ + if (!data || data == lock->l_ast_data || !arg->bl_desc) + goto skip_cos_checks; + + if (lock->l_req_mode & (LCK_PW | LCK_EX)) { if (mdt_cos_is_enabled(mdt)) { - if (lock->l_client_cookie != - lock->l_blocking_lock->l_client_cookie) + if (!arg->bl_desc->bl_same_client) mdt_set_lock_sync(lock); } else if (mdt_slc_is_enabled(mdt) && - ldlm_is_cos_incompat(lock->l_blocking_lock)) { + arg->bl_desc->bl_cos_incompat) { mdt_set_lock_sync(lock); /* * we may do extra commit here, but there is a small @@ -2630,11 +2700,11 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, */ commit_async = true; } - } else if (lock->l_req_mode == LCK_COS && - lock->l_blocking_lock != NULL) { + } else if (lock->l_req_mode == LCK_COS) { commit_async = true; } +skip_cos_checks: rc = ldlm_blocking_ast_nocheck(lock); if (commit_async) { @@ -2828,7 +2898,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; - __u64 dlmflags = 0; + __u64 dlmflags = 0, *cookie = NULL; int rc; ENTRY; @@ -2860,10 +2930,12 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, } } - fid_build_reg_res_name(mdt_object_fid(o), res_id); dlmflags |= LDLM_FL_ATOMIC_CB; + if (info->mti_exp) + cookie = &info->mti_exp->exp_handle.h_cookie; + /* * Take PDO lock on whole directory and build correct @res_id for lock * on part of directory. @@ -2877,12 +2949,16 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, * is never going to be sent to client and we do not * want it slowed down due to possible cancels. */ - policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; - policy->l_inodebits.try_bits = 0; - rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, - policy, res_id, dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + policy->l_inodebits.bits = + *ibits & MDS_INODELOCK_UPDATE; + policy->l_inodebits.try_bits = + trybits & MDS_INODELOCK_UPDATE; + /* at least one of them should be set */ + LASSERT(policy->l_inodebits.bits | + policy->l_inodebits.try_bits); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh, + lh->mlh_pdo_mode, policy, res_id, + dlmflags, cookie); if (unlikely(rc != 0)) GOTO(out_unlock, rc); } @@ -2902,10 +2978,9 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, * going to be sent to client. If it is - mdt_intent_policy() path will * fix it up and turn FL_LOCAL flag off. */ - rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res_id, LDLM_FL_LOCAL_ONLY | dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, + policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags, + cookie); out_unlock: if (rc != 0) mdt_object_unlock(info, o, lh, 1); @@ -2982,6 +3057,10 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, } } + /* other components like LFSCK can use lockless access + * and populate cache, so we better invalidate it */ + mo_invalidate(info->mti_env, mdt_object_child(o)); + RETURN(0); } @@ -3377,25 +3456,14 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi) return tgt_connect(tsi); } -enum mdt_it_code { - MDT_IT_OPEN, - MDT_IT_GETATTR, - MDT_IT_LOOKUP, - MDT_IT_GETXATTR, - MDT_IT_LAYOUT, - MDT_IT_QUOTA, - MDT_IT_GLIMPSE, - MDT_IT_BRW, -}; - -static int mdt_intent_glimpse(enum mdt_it_code opcode, +static int mdt_intent_glimpse(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) { return mdt_glimpse_enqueue(info, info->mti_mdt->mdt_namespace, lockp, flags); } -static int mdt_intent_brw(enum mdt_it_code opcode, +static int mdt_intent_brw(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) { @@ -3542,10 +3610,10 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info, dlmreq->lock_handle[0].cookie); } -static int mdt_intent_getxattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - __u64 flags) +static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + __u64 flags) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_reply *ldlm_rep = NULL; @@ -3592,7 +3660,7 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode, RETURN(rc); } -static int mdt_intent_getattr(enum mdt_it_code opcode, +static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) @@ -3615,18 +3683,19 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; - switch (opcode) { - case MDT_IT_LOOKUP: + switch (it_opc) { + case IT_LOOKUP: child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM; - break; - case MDT_IT_GETATTR: + break; + case IT_GETATTR: child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM; - break; - default: - CERROR("Unsupported intent (%d)\n", opcode); - GOTO(out_shrink, rc = -EINVAL); - } + break; + default: + CERROR("%s: unsupported intent %#x\n", + mdt_obd_name(info->mti_mdt), (unsigned int)it_opc); + GOTO(out_shrink, rc = -EINVAL); + } rc = mdt_init_ucred_intent_getattr(info, reqbody); if (rc) @@ -3661,7 +3730,7 @@ out_shrink: return rc; } -static int mdt_intent_layout(enum mdt_it_code opcode, +static int mdt_intent_layout(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) @@ -3675,12 +3744,6 @@ static int mdt_intent_layout(enum mdt_it_code opcode, int rc = 0; ENTRY; - if (opcode != MDT_IT_LAYOUT) { - CERROR("%s: Unknown intent (%d)\n", mdt_obd_name(info->mti_mdt), - opcode); - RETURN(-EINVAL); - } - fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name); intent = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT); @@ -3740,6 +3803,8 @@ static int mdt_intent_layout(enum mdt_it_code opcode, if (layout_size > info->mti_mdt->mdt_max_mdsize) info->mti_mdt->mdt_max_mdsize = layout_size; } + CDEBUG(D_INFO, "%s: layout_size %d\n", + mdt_obd_name(info->mti_mdt), layout_size); } /* @@ -3811,10 +3876,10 @@ out_obj: out: lhc->mlh_reg_lh.cookie = 0; - return rc; + RETURN(rc); } -static int mdt_intent_open(enum mdt_it_code opcode, +static int mdt_intent_open(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) @@ -3884,99 +3949,60 @@ static int mdt_intent_open(enum mdt_it_code opcode, RETURN(ELDLM_LOCK_ABORTED); } -static struct mdt_it_flavor { - const struct req_format *it_fmt; - int (*it_act)(enum mdt_it_code, - struct mdt_thread_info *, - struct ldlm_lock **, - __u64); - enum tgt_handler_flags it_handler_flags; -} mdt_it_flavor[] = { - [MDT_IT_OPEN] = { - /* - * OCREAT is not a MUTABOR request as if the file - * already exists. - * We do the extra check of OBD_CONNECT_RDONLY in - * mdt_reint_open() when we really need to create - * the object. - */ - .it_fmt = &RQF_LDLM_INTENT, - .it_act = mdt_intent_open, - }, - [MDT_IT_GETATTR] = { - .it_fmt = &RQF_LDLM_INTENT_GETATTR, - .it_act = mdt_intent_getattr, - .it_handler_flags = HABEO_REFERO, - }, - [MDT_IT_LOOKUP] = { - .it_fmt = &RQF_LDLM_INTENT_GETATTR, - .it_act = mdt_intent_getattr, - .it_handler_flags = HABEO_REFERO, - }, - [MDT_IT_GETXATTR] = { - .it_fmt = &RQF_LDLM_INTENT_GETXATTR, - .it_act = mdt_intent_getxattr, - .it_handler_flags = HABEO_CORPUS, - }, - [MDT_IT_LAYOUT] = { - .it_fmt = &RQF_LDLM_INTENT_LAYOUT, - .it_act = mdt_intent_layout, - }, - [MDT_IT_GLIMPSE] = { - .it_fmt = &RQF_LDLM_INTENT, - .it_act = mdt_intent_glimpse, - }, - [MDT_IT_BRW] = { - .it_fmt = &RQF_LDLM_INTENT, - .it_act = mdt_intent_brw, - }, -}; - -static int mdt_intent_opc(enum ldlm_intent_flags it_code, +static int mdt_intent_opc(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, - struct ldlm_lock **lockp, __u64 flags) + struct ldlm_lock **lockp, + u64 flags /* LDLM_FL_* */) { struct req_capsule *pill = info->mti_pill; struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_it_flavor *flv; - enum mdt_it_code opc; + const struct req_format *it_format; + int (*it_handler)(enum ldlm_intent_flags, + struct mdt_thread_info *, + struct ldlm_lock **, + u64); + enum tgt_handler_flags it_handler_flags = 0; + struct ldlm_reply *rep; int rc; ENTRY; - switch (it_code) { + switch (it_opc) { case IT_OPEN: case IT_OPEN|IT_CREAT: - opc = MDT_IT_OPEN; + /* + * OCREAT is not a MUTABOR request since the file may + * already exist. We do the extra check of + * OBD_CONNECT_RDONLY in mdt_reint_open() when we + * really need to create the object. + */ + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_open; break; case IT_GETATTR: - opc = MDT_IT_GETATTR; - break; case IT_LOOKUP: - opc = MDT_IT_LOOKUP; + it_format = &RQF_LDLM_INTENT_GETATTR; + it_handler = &mdt_intent_getattr; + it_handler_flags = HABEO_REFERO; break; case IT_GETXATTR: - opc = MDT_IT_GETXATTR; + it_format = &RQF_LDLM_INTENT_GETXATTR; + it_handler = &mdt_intent_getxattr; + it_handler_flags = HABEO_CORPUS; break; case IT_LAYOUT: - opc = MDT_IT_LAYOUT; - break; - case IT_QUOTA_DQACQ: - case IT_QUOTA_CONN: - opc = MDT_IT_QUOTA; + it_format = &RQF_LDLM_INTENT_LAYOUT; + it_handler = &mdt_intent_layout; break; case IT_GLIMPSE: - opc = MDT_IT_GLIMPSE; + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_glimpse; break; case IT_BRW: - opc = MDT_IT_BRW; + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_brw; break; - default: - CERROR("%s: unknown intent code %#x\n", - mdt_obd_name(info->mti_mdt), it_code); - RETURN(-EPROTO); - } - - if (opc == MDT_IT_QUOTA) { + case IT_QUOTA_DQACQ: + case IT_QUOTA_CONN: { struct lu_device *qmt = info->mti_mdt->mdt_qmt_dev; if (qmt == NULL) @@ -3992,40 +4018,31 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_code, flags); RETURN(rc); } - - if (!(0 <= opc && opc < ARRAY_SIZE(mdt_it_flavor))) - RETURN(-EPROTO); - - flv = &mdt_it_flavor[opc]; - - /* Fail early on unknown requests. */ - if (flv->it_fmt == NULL) + default: + CERROR("%s: unknown intent code %#x\n", + mdt_obd_name(info->mti_mdt), it_opc); RETURN(-EPROTO); + } - req_capsule_extend(pill, flv->it_fmt); + req_capsule_extend(pill, it_format); - rc = mdt_unpack_req_pack_rep(info, flv->it_handler_flags); + rc = mdt_unpack_req_pack_rep(info, it_handler_flags); if (rc < 0) RETURN(rc); - if (flv->it_handler_flags & MUTABOR && mdt_rdonly(req->rq_export)) + if (it_handler_flags & MUTABOR && mdt_rdonly(req->rq_export)) RETURN(-EROFS); - if (flv->it_act != NULL) { - struct ldlm_reply *rep; + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); + /* execute policy */ + rc = (*it_handler)(it_opc, info, lockp, flags); - /* execute policy */ - rc = flv->it_act(opc, info, lockp, flags); - - /* Check whether the reply has been packed successfully. */ - if (req->rq_repmsg != NULL) { - rep = req_capsule_server_get(info->mti_pill, - &RMF_DLM_REP); - rep->lock_policy_res2 = - ptlrpc_status_hton(rep->lock_policy_res2); - } + /* Check whether the reply has been packed successfully. */ + if (req->rq_repmsg != NULL) { + rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + rep->lock_policy_res2 = + ptlrpc_status_hton(rep->lock_policy_res2); } RETURN(rc); @@ -4043,9 +4060,12 @@ static void mdt_ptlrpc_stats_update(struct ptlrpc_request *req, LDLM_GLIMPSE_ENQUEUE : LDLM_IBITS_ENQUEUE)); } -static int mdt_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - enum ldlm_mode mode, __u64 flags, void *data) +static int mdt_intent_policy(const struct lu_env *env, + struct ldlm_namespace *ns, + struct ldlm_lock **lockp, + void *req_cookie, + enum ldlm_mode mode, + __u64 flags, void *data) { struct tgt_session_info *tsi; struct mdt_thread_info *info; @@ -4059,7 +4079,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, LASSERT(req != NULL); - tsi = tgt_ses_info(req->rq_svc_thread->t_env); + tsi = tgt_ses_info(env); info = tsi2mdt_info(tsi); LASSERT(info != NULL); @@ -5024,7 +5044,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, obd = class_name2obd(dev); LASSERT(obd != NULL); - m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */ + m->mdt_max_mdsize = MAX_MD_SIZE_OLD; m->mdt_opts.mo_evict_tgt_nids = 1; m->mdt_opts.mo_cos = MDT_COS_DEFAULT; @@ -5044,14 +5064,18 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, } /* DoM files get IO lock at open by default */ - m->mdt_opts.mo_dom_lock = 1; + m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN; + /* DoM files are read at open and data is packed in the reply */ + m->mdt_opts.mo_dom_read_open = 1; m->mdt_squash.rsi_uid = 0; m->mdt_squash.rsi_gid = 0; INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids); init_rwsem(&m->mdt_squash.rsi_sem); spin_lock_init(&m->mdt_lock); - m->mdt_enable_remote_dir = 0; + m->mdt_enable_remote_dir = 1; + m->mdt_enable_striped_dir = 1; + m->mdt_enable_dir_migration = 1; m->mdt_enable_remote_dir_gid = 0; atomic_set(&m->mdt_mds_mds_conns, 0); @@ -5346,6 +5370,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, lu_object_add_top(h, o); o->lo_ops = &mdt_obj_ops; spin_lock_init(&mo->mot_write_lock); + mutex_init(&mo->mot_som_mutex); mutex_init(&mo->mot_lov_mutex); init_rwsem(&mo->mot_dom_sem); init_rwsem(&mo->mot_open_sem); @@ -5699,7 +5724,7 @@ static int mdt_export_cleanup(struct obd_export *exp) /* Remove mfd handle so it can't be found again. * We are consuming the mfd_list reference here. */ - class_handle_unhash(&mfd->mfd_handle); + class_handle_unhash(&mfd->mfd_open_handle); list_move_tail(&mfd->mfd_list, &closing_list); } spin_unlock(&med->med_open_lock); @@ -5740,7 +5765,7 @@ static int mdt_export_cleanup(struct obd_export *exp) * archive request into a noop if it's not actually * dirty. */ - if (mfd->mfd_mode & MDS_FMODE_WRITE) + if (mfd->mfd_open_flags & MDS_FMODE_WRITE) rc = mdt_ctxt_add_dirty_flag(&env, info, mfd); /* Don't unlink orphan on failover umount, LU-184 */