X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=d94fbbc2c30f91081f85e9c3062812087248e622;hb=e5abcf83c0575b8a79594c1eb9ea727739d91522;hp=267f469f7fd67341d08b08bc41b1c89203060611;hpb=1e7fc14bbf48f7e89876cbaa609972981e343944;p=fs%2Flustre-release.git diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 267f469f..d94fbbc 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -66,7 +66,6 @@ #include "mdt_internal.h" - static unsigned int max_mod_rpcs_per_client = 8; module_param(max_mod_rpcs_per_client, uint, 0644); MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client"); @@ -94,7 +93,6 @@ enum ldlm_mode mdt_dlm_lock_modes[] = { }; static struct mdt_device *mdt_dev(struct lu_device *d); -static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags); static const struct lu_object_operations mdt_obj_ops; @@ -270,18 +268,13 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, { struct mdt_device *mdt = info->mti_mdt; struct lu_name *lname = &info->mti_name; - char *name = NULL; + char *filename = info->mti_filename; struct mdt_object *parent; u32 mode; int rc = 0; LASSERT(!info->mti_cross_ref); - OBD_ALLOC(name, NAME_MAX + 1); - if (name == NULL) - return -ENOMEM; - lname->ln_name = name; - /* * We may want to allow this to mount a completely separate * fileset from the MDT in the future, but keeping it to @@ -317,8 +310,9 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, break; } - strncpy(name, s1, lname->ln_namelen); - name[lname->ln_namelen] = '\0'; + strncpy(filename, s1, lname->ln_namelen); + filename[lname->ln_namelen] = '\0'; + lname->ln_name = filename; parent = mdt_object_find(info->mti_env, mdt, fid); if (IS_ERR(parent)) { @@ -343,8 +337,6 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, } } - OBD_FREE(name, NAME_MAX + 1); - return rc; } @@ -413,13 +405,16 @@ out: static int mdt_statfs(struct tgt_session_info *tsi) { - struct ptlrpc_request *req = tgt_ses_req(tsi); - struct mdt_thread_info *info = tsi2mdt_info(tsi); - struct mdt_device *mdt = info->mti_mdt; - struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd; - struct ptlrpc_service_part *svcpt; - struct obd_statfs *osfs; - int rc; + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct mdt_thread_info *info = tsi2mdt_info(tsi); + struct mdt_device *mdt = info->mti_mdt; + struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd; + struct md_device *next = mdt->mdt_child; + struct ptlrpc_service_part *svcpt; + struct obd_statfs *osfs; + struct mdt_body *reqbody = NULL; + struct mdt_statfs_cache *msf; + int rc; ENTRY; @@ -441,11 +436,39 @@ static int mdt_statfs(struct tgt_session_info *tsi) if (!osfs) GOTO(out, rc = -EPROTO); - rc = tgt_statfs_internal(tsi->tsi_env, &mdt->mdt_lut, osfs, - ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS, - NULL); - if (unlikely(rc)) - GOTO(out, rc); + if (mdt_is_sum_statfs_client(req->rq_export) && + lustre_packed_msg_size(req->rq_reqmsg) == + req_capsule_fmt_size(req->rq_reqmsg->lm_magic, + &RQF_MDS_STATFS_NEW, RCL_CLIENT)) { + req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW); + reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + } + + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + msf = &mdt->mdt_sum_osfs; + else + msf = &mdt->mdt_osfs; + + if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) { + /** statfs data is too old, get up-to-date one */ + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + rc = next->md_ops->mdo_statfs(info->mti_env, + next, osfs); + else + rc = dt_statfs(info->mti_env, mdt->mdt_bottom, + osfs); + if (rc) + GOTO(out, rc); + spin_lock(&mdt->mdt_lock); + msf->msf_osfs = *osfs; + msf->msf_age = ktime_get_seconds(); + spin_unlock(&mdt->mdt_lock); + } else { + /** use cached statfs data */ + spin_lock(&mdt->mdt_lock); + *osfs = msf->msf_osfs; + spin_unlock(&mdt->mdt_lock); + } /* at least try to account for cached pages. its still racy and * might be under-reporting if clients haven't announced their @@ -490,11 +513,12 @@ out: * Pack size attributes into the reply. */ int mdt_pack_size2body(struct mdt_thread_info *info, - const struct lu_fid *fid, bool dom_lock) + const struct lu_fid *fid, struct lustre_handle *lh) { struct mdt_body *b; struct md_attr *ma = &info->mti_attr; int dom_stripe; + bool dom_lock = false; ENTRY; @@ -509,6 +533,16 @@ int mdt_pack_size2body(struct mdt_thread_info *info, if (dom_stripe == LMM_NO_DOM) RETURN(-ENOENT); + if (lustre_handle_is_used(lh)) { + struct ldlm_lock *lock; + + lock = ldlm_handle2lock(lh); + if (lock != NULL) { + dom_lock = ldlm_has_dom(lock); + LDLM_LOCK_PUT(lock); + } + } + /* no DoM lock, no size in reply */ if (!dom_lock) RETURN(0); @@ -905,6 +939,8 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, return -EINVAL; } + LASSERT(buf->lb_buf); + rc = mo_xattr_get(info->mti_env, next, buf, name); if (rc > 0) { @@ -957,8 +993,8 @@ got: return rc; } -static int mdt_attr_get_pfid(struct mdt_thread_info *info, - struct mdt_object *o, struct lu_fid *pfid) +int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, + struct lu_fid *pfid) { struct lu_buf *buf = &info->mti_buf; struct link_ea_header *leh; @@ -1028,7 +1064,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc); if (S_ISREG(mode)) - (void) mdt_get_som(info, o, &ma->ma_attr); + (void) mdt_get_som(info, o, ma); ma->ma_valid |= MA_INODE; } @@ -1058,6 +1094,15 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc); } + /* + * In the handle of MA_INODE, we may already get the SOM attr. + */ + if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) { + rc = mdt_get_som(info, o, ma); + if (rc != 0) + GOTO(out, rc); + } + if (need & MA_HSM && S_ISREG(mode)) { buf->lb_buf = info->mti_xattr_buf; buf->lb_len = sizeof(info->mti_xattr_buf); @@ -1407,8 +1452,9 @@ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj, if (rc) GOTO(out, rc); + mutex_lock(&obj->mot_som_mutex); rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout); - + mutex_unlock(&obj->mot_som_mutex); mdt_object_unlock(info, obj, lh, 1); out: RETURN(rc); @@ -1820,10 +1866,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, LDLM_LOCK_PUT(lock); mdt_object_put(info->mti_env, child); /* NB: call the mdt_pack_size2body always after - * mdt_object_put(), that is why this speacial + * mdt_object_put(), that is why this special * exit path is used. */ rc = mdt_pack_size2body(info, child_fid, - child_bits & MDS_INODELOCK_DOM); + &lhc->mlh_reg_lh); if (rc != 0 && child_bits & MDS_INODELOCK_DOM) { /* DOM lock was taken in advance but this is * not DoM file. Drop the lock. */ @@ -1834,17 +1880,17 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, GOTO(out_parent, rc = 0); } - } - if (lock) - LDLM_LOCK_PUT(lock); + } + if (lock) + LDLM_LOCK_PUT(lock); - EXIT; + EXIT; out_child: - mdt_object_put(info->mti_env, child); + mdt_object_put(info->mti_env, child); out_parent: - if (lhp) - mdt_object_unlock(info, parent, lhp, 1); - return rc; + if (lhp) + mdt_object_unlock(info, parent, lhp, 1); + return rc; } /* normal handler: should release the child lock */ @@ -2113,11 +2159,24 @@ static int mdt_reint_internal(struct mdt_thread_info *info, out_ucred: mdt_exit_ucred(info); out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; - return rc; + mdt_client_compatibility(info); + + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; + + /* + * Data-on-MDT optimization - read data along with OPEN and return it + * in reply. Do that only if we have both DOM and LAYOUT locks. + */ + if (rc == 0 && op == REINT_OPEN && + info->mti_attr.ma_lmm != NULL && + mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) { + rc = mdt_dom_read_on_open(info, info->mti_mdt, + &lhc->mlh_reg_lh); + } + + return rc; } static long mdt_reint_opcode(struct ptlrpc_request *req, @@ -2159,7 +2218,7 @@ static int mdt_reint(struct tgt_session_info *tsi) [REINT_OPEN] = &RQF_MDS_REINT_OPEN, [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR, [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK, - [REINT_MIGRATE] = &RQF_MDS_REINT_RENAME, + [REINT_MIGRATE] = &RQF_MDS_REINT_MIGRATE, [REINT_RESYNC] = &RQF_MDS_REINT_RESYNC, }; @@ -2183,7 +2242,7 @@ static int mdt_reint(struct tgt_session_info *tsi) } /* this should sync the whole device */ -static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) +int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) { struct dt_device *dt = mdt->mdt_bottom; int rc; @@ -2335,10 +2394,12 @@ static int mdt_quotactl(struct tgt_session_info *tsi) /* master quotactl */ case Q_SETINFO: case Q_SETQUOTA: + case LUSTRE_Q_SETDEFAULT: if (!nodemap_can_setquota(nodemap)) GOTO(out_nodemap, rc = -EPERM); case Q_GETINFO: case Q_GETQUOTA: + case LUSTRE_Q_GETDEFAULT: if (qmt == NULL) GOTO(out_nodemap, rc = -EOPNOTSUPP); /* slave quotactl */ @@ -2388,6 +2449,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case Q_SETINFO: case Q_SETQUOTA: case Q_GETQUOTA: + case LUSTRE_Q_SETDEFAULT: + case LUSTRE_Q_GETDEFAULT: /* forward quotactl request to QMT */ rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl); break; @@ -2766,7 +2829,7 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti, struct lustre_handle *lh, enum ldlm_mode mode, __u64 *ibits, __u64 trybits, bool cache) { - struct ldlm_enqueue_info *einfo = &mti->mti_einfo; + struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo; union ldlm_policy_data *policy = &mti->mti_policy; struct ldlm_res_id *res_id = &mti->mti_res_id; int rc = 0; @@ -2793,17 +2856,14 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti, einfo->ei_cbdata = o; } - memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = *ibits; policy->l_inodebits.try_bits = trybits; rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo, policy); - if (rc < 0 && cache) { + if (rc < 0 && cache) mdt_object_put(mti->mti_env, o); - einfo->ei_cbdata = NULL; - } /* Return successfully acquired bits to a caller */ if (rc == 0) { @@ -2832,7 +2892,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; - __u64 dlmflags = 0; + __u64 dlmflags = 0, *cookie = NULL; int rc; ENTRY; @@ -2864,10 +2924,12 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, } } - fid_build_reg_res_name(mdt_object_fid(o), res_id); dlmflags |= LDLM_FL_ATOMIC_CB; + if (info->mti_exp) + cookie = &info->mti_exp->exp_handle.h_cookie; + /* * Take PDO lock on whole directory and build correct @res_id for lock * on part of directory. @@ -2881,12 +2943,16 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, * is never going to be sent to client and we do not * want it slowed down due to possible cancels. */ - policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; - policy->l_inodebits.try_bits = 0; - rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, - policy, res_id, dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + policy->l_inodebits.bits = + *ibits & MDS_INODELOCK_UPDATE; + policy->l_inodebits.try_bits = + trybits & MDS_INODELOCK_UPDATE; + /* at least one of them should be set */ + LASSERT(policy->l_inodebits.bits | + policy->l_inodebits.try_bits); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh, + lh->mlh_pdo_mode, policy, res_id, + dlmflags, cookie); if (unlikely(rc != 0)) GOTO(out_unlock, rc); } @@ -2906,10 +2972,9 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, * going to be sent to client. If it is - mdt_intent_policy() path will * fix it up and turn FL_LOCAL flag off. */ - rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res_id, LDLM_FL_LOCAL_ONLY | dlmflags, - info->mti_exp == NULL ? NULL : - &info->mti_exp->exp_handle.h_cookie); + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, + policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags, + cookie); out_unlock: if (rc != 0) mdt_object_unlock(info, o, lh, 1); @@ -3196,7 +3261,8 @@ void mdt_object_unlock_put(struct mdt_thread_info * info, * actually exists on storage (lu_object_exists()). * */ -static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags) +static int mdt_body_unpack(struct mdt_thread_info *info, + enum tgt_handler_flags flags) { const struct mdt_body *body; struct mdt_object *obj; @@ -3235,7 +3301,8 @@ static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags) RETURN(rc); } -static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags) +static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, + enum tgt_handler_flags flags) { struct req_capsule *pill = info->mti_pill; int rc; @@ -3379,48 +3446,14 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi) return tgt_connect(tsi); } -enum mdt_it_code { - MDT_IT_OPEN, - MDT_IT_OCREAT, - MDT_IT_CREATE, - MDT_IT_GETATTR, - MDT_IT_READDIR, - MDT_IT_LOOKUP, - MDT_IT_UNLINK, - MDT_IT_TRUNC, - MDT_IT_GETXATTR, - MDT_IT_LAYOUT, - MDT_IT_QUOTA, - MDT_IT_GLIMPSE, - MDT_IT_BRW, - MDT_IT_NR -}; - -static int mdt_intent_getattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **, __u64); - -static int mdt_intent_getxattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - __u64 flags); - -static int mdt_intent_layout(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **, - __u64); -static int mdt_intent_reint(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **, - __u64); -static int mdt_intent_glimpse(enum mdt_it_code opcode, +static int mdt_intent_glimpse(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) { return mdt_glimpse_enqueue(info, info->mti_mdt->mdt_namespace, lockp, flags); } -static int mdt_intent_brw(enum mdt_it_code opcode, +static int mdt_intent_brw(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) { @@ -3428,90 +3461,6 @@ static int mdt_intent_brw(enum mdt_it_code opcode, lockp, flags); } -static struct mdt_it_flavor { - const struct req_format *it_fmt; - __u32 it_flags; - int (*it_act)(enum mdt_it_code , - struct mdt_thread_info *, - struct ldlm_lock **, - __u64); - long it_reint; -} mdt_it_flavor[] = { - [MDT_IT_OPEN] = { - .it_fmt = &RQF_LDLM_INTENT, - /*.it_flags = HABEO_REFERO,*/ - .it_flags = 0, - .it_act = mdt_intent_reint, - .it_reint = REINT_OPEN - }, - [MDT_IT_OCREAT] = { - .it_fmt = &RQF_LDLM_INTENT, - /* - * OCREAT is not a MUTABOR request as if the file - * already exists. - * We do the extra check of OBD_CONNECT_RDONLY in - * mdt_reint_open() when we really need to create - * the object. - */ - .it_flags = 0, - .it_act = mdt_intent_reint, - .it_reint = REINT_OPEN - }, - [MDT_IT_CREATE] = { - .it_fmt = &RQF_LDLM_INTENT, - .it_flags = MUTABOR, - .it_act = mdt_intent_reint, - .it_reint = REINT_CREATE - }, - [MDT_IT_GETATTR] = { - .it_fmt = &RQF_LDLM_INTENT_GETATTR, - .it_flags = HABEO_REFERO, - .it_act = mdt_intent_getattr - }, - [MDT_IT_READDIR] = { - .it_fmt = NULL, - .it_flags = 0, - .it_act = NULL - }, - [MDT_IT_LOOKUP] = { - .it_fmt = &RQF_LDLM_INTENT_GETATTR, - .it_flags = HABEO_REFERO, - .it_act = mdt_intent_getattr - }, - [MDT_IT_UNLINK] = { - .it_fmt = &RQF_LDLM_INTENT_UNLINK, - .it_flags = MUTABOR, - .it_act = NULL, - .it_reint = REINT_UNLINK - }, - [MDT_IT_TRUNC] = { - .it_fmt = NULL, - .it_flags = MUTABOR, - .it_act = NULL - }, - [MDT_IT_GETXATTR] = { - .it_fmt = &RQF_LDLM_INTENT_GETXATTR, - .it_flags = HABEO_CORPUS, - .it_act = mdt_intent_getxattr - }, - [MDT_IT_LAYOUT] = { - .it_fmt = &RQF_LDLM_INTENT_LAYOUT, - .it_flags = 0, - .it_act = mdt_intent_layout - }, - [MDT_IT_GLIMPSE] = { - .it_fmt = &RQF_LDLM_INTENT, - .it_flags = 0, - .it_act = mdt_intent_glimpse, - }, - [MDT_IT_BRW] = { - .it_fmt = &RQF_LDLM_INTENT, - .it_flags = 0, - .it_act = mdt_intent_brw, - }, - -}; - int mdt_intent_lock_replace(struct mdt_thread_info *info, struct ldlm_lock **lockp, struct mdt_lock_handle *lh, @@ -3651,10 +3600,10 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info, dlmreq->lock_handle[0].cookie); } -static int mdt_intent_getxattr(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - __u64 flags) +static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + __u64 flags) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_reply *ldlm_rep = NULL; @@ -3701,7 +3650,7 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode, RETURN(rc); } -static int mdt_intent_getattr(enum mdt_it_code opcode, +static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) @@ -3724,18 +3673,19 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; - switch (opcode) { - case MDT_IT_LOOKUP: + switch (it_opc) { + case IT_LOOKUP: child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM; - break; - case MDT_IT_GETATTR: + break; + case IT_GETATTR: child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM; - break; - default: - CERROR("Unsupported intent (%d)\n", opcode); - GOTO(out_shrink, rc = -EINVAL); - } + break; + default: + CERROR("%s: unsupported intent %#x\n", + mdt_obd_name(info->mti_mdt), (unsigned int)it_opc); + GOTO(out_shrink, rc = -EINVAL); + } rc = mdt_init_ucred_intent_getattr(info, reqbody); if (rc) @@ -3770,7 +3720,7 @@ out_shrink: return rc; } -static int mdt_intent_layout(enum mdt_it_code opcode, +static int mdt_intent_layout(enum ldlm_intent_flags it_opc, struct mdt_thread_info *info, struct ldlm_lock **lockp, __u64 flags) @@ -3784,12 +3734,6 @@ static int mdt_intent_layout(enum mdt_it_code opcode, int rc = 0; ENTRY; - if (opcode != MDT_IT_LAYOUT) { - CERROR("%s: Unknown intent (%d)\n", mdt_obd_name(info->mti_mdt), - opcode); - RETURN(-EINVAL); - } - fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name); intent = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT); @@ -3849,6 +3793,8 @@ static int mdt_intent_layout(enum mdt_it_code opcode, if (layout_size > info->mti_mdt->mdt_max_mdsize) info->mti_mdt->mdt_max_mdsize = layout_size; } + CDEBUG(D_INFO, "%s: layout_size %d\n", + mdt_obd_name(info->mti_mdt), layout_size); } /* @@ -3920,13 +3866,13 @@ out_obj: out: lhc->mlh_reg_lh.cookie = 0; - return rc; + RETURN(rc); } -static int mdt_intent_reint(enum mdt_it_code opcode, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, - __u64 flags) +static int mdt_intent_open(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + __u64 flags) { struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_reply *rep = NULL; @@ -3944,12 +3890,6 @@ static int mdt_intent_reint(enum mdt_it_code opcode, if (opc < 0) RETURN(opc); - if (mdt_it_flavor[opcode].it_reint != opc) { - CERROR("Reint code %ld doesn't match intent: %d\n", - opc, opcode); - RETURN(err_serious(-EPROTO)); - } - /* Get lock from request for possible resent case. */ mdt_intent_fixup_resent(info, *lockp, lhc, flags); @@ -3999,75 +3939,60 @@ static int mdt_intent_reint(enum mdt_it_code opcode, RETURN(ELDLM_LOCK_ABORTED); } -static int mdt_intent_code(enum ldlm_intent_flags itcode) +static int mdt_intent_opc(enum ldlm_intent_flags it_opc, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, + u64 flags /* LDLM_FL_* */) { + struct req_capsule *pill = info->mti_pill; + struct ptlrpc_request *req = mdt_info_req(info); + const struct req_format *it_format; + int (*it_handler)(enum ldlm_intent_flags, + struct mdt_thread_info *, + struct ldlm_lock **, + u64); + enum tgt_handler_flags it_handler_flags = 0; + struct ldlm_reply *rep; int rc; + ENTRY; - switch (itcode) { + switch (it_opc) { case IT_OPEN: - rc = MDT_IT_OPEN; - break; case IT_OPEN|IT_CREAT: - rc = MDT_IT_OCREAT; - break; - case IT_CREAT: - rc = MDT_IT_CREATE; - break; - case IT_READDIR: - rc = MDT_IT_READDIR; + /* + * OCREAT is not a MUTABOR request since the file may + * already exist. We do the extra check of + * OBD_CONNECT_RDONLY in mdt_reint_open() when we + * really need to create the object. + */ + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_open; break; case IT_GETATTR: - rc = MDT_IT_GETATTR; - break; case IT_LOOKUP: - rc = MDT_IT_LOOKUP; - break; - case IT_UNLINK: - rc = MDT_IT_UNLINK; - break; - case IT_TRUNC: - rc = MDT_IT_TRUNC; + it_format = &RQF_LDLM_INTENT_GETATTR; + it_handler = &mdt_intent_getattr; + it_handler_flags = HABEO_REFERO; break; case IT_GETXATTR: - rc = MDT_IT_GETXATTR; + it_format = &RQF_LDLM_INTENT_GETXATTR; + it_handler = &mdt_intent_getxattr; + it_handler_flags = HABEO_CORPUS; break; case IT_LAYOUT: - rc = MDT_IT_LAYOUT; - break; - case IT_QUOTA_DQACQ: - case IT_QUOTA_CONN: - rc = MDT_IT_QUOTA; + it_format = &RQF_LDLM_INTENT_LAYOUT; + it_handler = &mdt_intent_layout; break; case IT_GLIMPSE: - rc = MDT_IT_GLIMPSE; + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_glimpse; break; case IT_BRW: - rc = MDT_IT_BRW; + it_format = &RQF_LDLM_INTENT; + it_handler = &mdt_intent_brw; break; - default: - CERROR("Unknown intent opcode: 0x%08x\n", itcode); - rc = -EINVAL; - break; - } - return rc; -} - -static int mdt_intent_opc(enum ldlm_intent_flags itopc, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, __u64 flags) -{ - struct req_capsule *pill = info->mti_pill; - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_it_flavor *flv; - int opc; - int rc; - ENTRY; - - opc = mdt_intent_code(itopc); - if (opc < 0) - RETURN(-EINVAL); - - if (opc == MDT_IT_QUOTA) { + case IT_QUOTA_DQACQ: + case IT_QUOTA_CONN: { struct lu_device *qmt = info->mti_mdt->mdt_qmt_dev; if (qmt == NULL) @@ -4083,33 +4008,31 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc, flags); RETURN(rc); } + default: + CERROR("%s: unknown intent code %#x\n", + mdt_obd_name(info->mti_mdt), it_opc); + RETURN(-EPROTO); + } - flv = &mdt_it_flavor[opc]; - if (flv->it_fmt != NULL) - req_capsule_extend(pill, flv->it_fmt); + req_capsule_extend(pill, it_format); - rc = mdt_unpack_req_pack_rep(info, flv->it_flags); + rc = mdt_unpack_req_pack_rep(info, it_handler_flags); if (rc < 0) RETURN(rc); - if (flv->it_flags & MUTABOR && mdt_rdonly(req->rq_export)) + if (it_handler_flags & MUTABOR && mdt_rdonly(req->rq_export)) RETURN(-EROFS); - if (flv->it_act != NULL) { - struct ldlm_reply *rep; + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); + /* execute policy */ + rc = (*it_handler)(it_opc, info, lockp, flags); - /* execute policy */ - rc = flv->it_act(opc, info, lockp, flags); - - /* Check whether the reply has been packed successfully. */ - if (req->rq_repmsg != NULL) { - rep = req_capsule_server_get(info->mti_pill, - &RMF_DLM_REP); - rep->lock_policy_res2 = - ptlrpc_status_hton(rep->lock_policy_res2); - } + /* Check whether the reply has been packed successfully. */ + if (req->rq_repmsg != NULL) { + rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + rep->lock_policy_res2 = + ptlrpc_status_hton(rep->lock_policy_res2); } RETURN(rc); @@ -4127,9 +4050,12 @@ static void mdt_ptlrpc_stats_update(struct ptlrpc_request *req, LDLM_GLIMPSE_ENQUEUE : LDLM_IBITS_ENQUEUE)); } -static int mdt_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - enum ldlm_mode mode, __u64 flags, void *data) +static int mdt_intent_policy(const struct lu_env *env, + struct ldlm_namespace *ns, + struct ldlm_lock **lockp, + void *req_cookie, + enum ldlm_mode mode, + __u64 flags, void *data) { struct tgt_session_info *tsi; struct mdt_thread_info *info; @@ -4143,7 +4069,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, LASSERT(req != NULL); - tsi = tgt_ses_info(req->rq_svc_thread->t_env); + tsi = tgt_ses_info(env); info = tsi2mdt_info(tsi); LASSERT(info != NULL); @@ -5035,6 +4961,11 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) mdt_hsm_cdt_fini(m); + if (m->mdt_los != NULL) { + local_oid_storage_fini(env, m->mdt_los); + m->mdt_los = NULL; + } + if (m->mdt_namespace != NULL) { ldlm_namespace_free_post(m->mdt_namespace); d->ld_obd->obd_namespace = m->mdt_namespace = NULL; @@ -5078,6 +5009,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, struct seq_server_site *ss_site; const char *identity_upcall = "NONE"; struct md_device *next; + struct lu_fid fid; int rc; long node_id; mntopt_t mntopts; @@ -5102,7 +5034,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, obd = class_name2obd(dev); LASSERT(obd != NULL); - m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */ + m->mdt_max_mdsize = MAX_MD_SIZE_OLD; m->mdt_opts.mo_evict_tgt_nids = 1; m->mdt_opts.mo_cos = MDT_COS_DEFAULT; @@ -5122,7 +5054,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, } /* DoM files get IO lock at open by default */ - m->mdt_opts.mo_dom_lock = 1; + m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN; + /* DoM files are read at open and data is packed in the reply */ + m->mdt_opts.mo_dom_read_open = 1; m->mdt_squash.rsi_uid = 0; m->mdt_squash.rsi_gid = 0; @@ -5196,18 +5130,11 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, /* set obd_namespace for compatibility with old code */ obd->obd_namespace = m->mdt_namespace; - rc = mdt_hsm_cdt_init(m); - if (rc != 0) { - CERROR("%s: error initializing coordinator, rc %d\n", - mdt_obd_name(m), rc); - GOTO(err_free_ns, rc); - } - rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, mdt_common_slice, OBD_FAIL_MDS_ALL_REQUEST_NET, OBD_FAIL_MDS_ALL_REPLY_NET); if (rc) - GOTO(err_free_hsm, rc); + GOTO(err_free_ns, rc); /* Amount of available space excluded from granting and reserved * for metadata. It is in percentage and 50% is default value. */ @@ -5222,6 +5149,20 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (rc) GOTO(err_tgt, rc); + fid.f_seq = FID_SEQ_LOCAL_NAME; + fid.f_oid = 1; + fid.f_ver = 0; + rc = local_oid_storage_init(env, m->mdt_bottom, &fid, &m->mdt_los); + if (rc != 0) + GOTO(err_fs_cleanup, rc); + + rc = mdt_hsm_cdt_init(m); + if (rc != 0) { + CERROR("%s: error initializing coordinator, rc %d\n", + mdt_obd_name(m), rc); + GOTO(err_los_fini, rc); + } + tgt_adapt_sptlrpc_conf(&m->mdt_lut); next = m->mdt_child; @@ -5252,7 +5193,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (IS_ERR(m->mdt_identity_cache)) { rc = PTR_ERR(m->mdt_identity_cache); m->mdt_identity_cache = NULL; - GOTO(err_fs_cleanup, rc); + GOTO(err_free_hsm, rc); } rc = mdt_procfs_init(m, dev); @@ -5288,12 +5229,15 @@ err_recovery: target_recovery_fini(obd); upcall_cache_cleanup(m->mdt_identity_cache); m->mdt_identity_cache = NULL; +err_free_hsm: + mdt_hsm_cdt_fini(m); +err_los_fini: + local_oid_storage_fini(env, m->mdt_los); + m->mdt_los = NULL; err_fs_cleanup: mdt_fs_cleanup(env, m); err_tgt: tgt_fini(env, &m->mdt_lut); -err_free_hsm: - mdt_hsm_cdt_fini(m); err_free_ns: ldlm_namespace_free(m->mdt_namespace, NULL, 0); obd->obd_namespace = m->mdt_namespace = NULL; @@ -5414,6 +5358,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, lu_object_add_top(h, o); o->lo_ops = &mdt_obj_ops; spin_lock_init(&mo->mot_write_lock); + mutex_init(&mo->mot_som_mutex); mutex_init(&mo->mot_lov_mutex); init_rwsem(&mo->mot_dom_sem); init_rwsem(&mo->mot_open_sem); @@ -5589,6 +5534,7 @@ static int mdt_connect_internal(const struct lu_env *env, struct mdt_device *mdt, struct obd_connect_data *data, bool reconnect) { + const char *obd_name = mdt_obd_name(mdt); LASSERT(data != NULL); data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; @@ -5617,8 +5563,7 @@ static int mdt_connect_internal(const struct lu_env *env, "ocd_version: %x ocd_grant: %d ocd_index: %u " "ocd_brw_size unexpectedly zero, network data " "corruption? Refusing to connect this client\n", - mdt_obd_name(mdt), - exp->exp_client_uuid.uuid, + obd_name, exp->exp_client_uuid.uuid, exp, data->ocd_connect_flags, data->ocd_version, data->ocd_grant, data->ocd_index); return -EPROTO; @@ -5664,7 +5609,7 @@ static int mdt_connect_internal(const struct lu_env *env, if ((data->ocd_connect_flags & OBD_CONNECT_FID) == 0) { CWARN("%s: MDS requires FID support, but client not\n", - mdt_obd_name(mdt)); + obd_name); return -EBADE; } @@ -5698,7 +5643,8 @@ static int mdt_connect_internal(const struct lu_env *env, /* The client set in ocd_cksum_types the checksum types it * supports. We have to mask off the algorithms that we don't * support */ - data->ocd_cksum_types &= cksum_types_supported_server(); + data->ocd_cksum_types &= + obd_cksum_types_supported_server(obd_name); if (unlikely(data->ocd_cksum_types == 0)) { CERROR("%s: Connect with checksum support but no " @@ -5766,7 +5712,7 @@ static int mdt_export_cleanup(struct obd_export *exp) /* Remove mfd handle so it can't be found again. * We are consuming the mfd_list reference here. */ - class_handle_unhash(&mfd->mfd_handle); + class_handle_unhash(&mfd->mfd_open_handle); list_move_tail(&mfd->mfd_list, &closing_list); } spin_unlock(&med->med_open_lock); @@ -5807,7 +5753,7 @@ static int mdt_export_cleanup(struct obd_export *exp) * archive request into a noop if it's not actually * dirty. */ - if (mfd->mfd_mode & FMODE_WRITE) + if (mfd->mfd_mode & MDS_FMODE_WRITE) rc = mdt_ctxt_add_dirty_flag(&env, info, mfd); /* Don't unlink orphan on failover umount, LU-184 */ @@ -6706,12 +6652,12 @@ struct lu_ucred *mdt_ucred_check(const struct mdt_thread_info *info) * \param mdt mdt device * \param val 0 disables COS, other values enable COS */ -void mdt_enable_cos(struct mdt_device *mdt, int val) +void mdt_enable_cos(struct mdt_device *mdt, bool val) { struct lu_env env; int rc; - mdt->mdt_opts.mo_cos = !!val; + mdt->mdt_opts.mo_cos = val; rc = lu_env_init(&env, LCT_LOCAL); if (unlikely(rc != 0)) { CWARN("%s: lu_env initialization failed, cannot "