X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=0a2149b4f6537f3b2a99ddd74c998c51563c3b28;hp=64557082d981c8b5285a87c9b36a77dbf8ccf5c6;hb=HEAD;hpb=6a4be282bbbd5c6d92787abe9ae316e3c702192c diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 6455708..7a8e853 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -63,22 +63,55 @@ #include #include #include +#include #include "mdt_internal.h" -static unsigned int max_mod_rpcs_per_client = 8; -module_param(max_mod_rpcs_per_client, uint, 0644); -MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client"); +#if OBD_OCD_VERSION(3, 0, 53, 0) > LUSTRE_VERSION_CODE +static int mdt_max_mod_rpcs_per_client_set(const char *val, + cfs_kernel_param_arg_t *kp) +{ + unsigned int num; + int rc; + + rc = kstrtouint(val, 0, &num); + if (rc < 0) + return rc; + + if (num < 1 || num > OBD_MAX_RIF_MAX) + return -EINVAL; + + CWARN("max_mod_rpcs_per_client is deprecated, set mdt.*.max_mod_rpcs_in_flight parameter instead\n"); + + max_mod_rpcs_per_client = num; + return 0; +} +static const struct kernel_param_ops + param_ops_max_mod_rpcs_per_client = { + .set = mdt_max_mod_rpcs_per_client_set, + .get = param_get_uint, +}; + +#define param_check_max_mod_rpcs_per_client(name, p) \ + __param_check(name, p, unsigned int) + +module_param_cb(max_mod_rpcs_per_client, + &param_ops_max_mod_rpcs_per_client, + &max_mod_rpcs_per_client, 0644); + +MODULE_PARM_DESC(max_mod_rpcs_per_client, + "maximum number of modify RPCs in flight allowed per client (Deprecated)"); +#endif mdl_mode_t mdt_mdl_lock_modes[] = { - [LCK_MINMODE] = MDL_MINMODE, - [LCK_EX] = MDL_EX, - [LCK_PW] = MDL_PW, - [LCK_PR] = MDL_PR, - [LCK_CW] = MDL_CW, - [LCK_CR] = MDL_CR, - [LCK_NL] = MDL_NL, - [LCK_GROUP] = MDL_GROUP + [LCK_MINMODE] = MDL_MINMODE, + [LCK_EX] = MDL_EX, + [LCK_PW] = MDL_PW, + [LCK_PR] = MDL_PR, + [LCK_CW] = MDL_CW, + [LCK_CR] = 
MDL_CR, + [LCK_NL] = MDL_NL, + [LCK_GROUP] = MDL_GROUP }; enum ldlm_mode mdt_dlm_lock_modes[] = { @@ -155,8 +188,17 @@ void mdt_set_disposition(struct mdt_thread_info *info, rep->lock_policy_res1 |= op_flag; } +/* assert lock is unlocked before reuse */ +static inline void mdt_lock_handle_assert(struct mdt_lock_handle *lh) +{ + LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh)); + LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh)); + LASSERT(!lustre_handle_is_used(&lh->mlh_rreg_lh)); +} + void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm) { + mdt_lock_handle_assert(lh); lh->mlh_pdo_hash = 0; lh->mlh_reg_mode = lm; lh->mlh_rreg_mode = lm; @@ -173,6 +215,7 @@ void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock) void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode, const struct lu_name *lname) { + mdt_lock_handle_assert(lh); lh->mlh_reg_mode = lock_mode; lh->mlh_pdo_mode = LCK_MINMODE; lh->mlh_rreg_mode = lock_mode; @@ -183,11 +226,11 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode, lname->ln_namelen); /* XXX Workaround for LU-2856 * - * Zero is a valid return value of full_name_hash, but - * several users of mlh_pdo_hash assume a non-zero - * hash value. We therefore map zero onto an - * arbitrary, but consistent value (1) to avoid - * problems further down the road. */ + * Zero is a valid return value of full_name_hash, but several + * users of mlh_pdo_hash assume a non-zero hash value. We + * therefore map zero onto an arbitrary, but consistent + * value (1) to avoid problems further down the road. 
+ */ if (unlikely(lh->mlh_pdo_hash == 0)) lh->mlh_pdo_hash = 1; } else { @@ -196,78 +239,114 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode, } static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh) -{ - mdl_mode_t mode; - ENTRY; - - /* - * Any dir access needs couple of locks: - * - * 1) on part of dir we gonna take lookup/modify; - * - * 2) on whole dir to protect it from concurrent splitting and/or to - * flush client's cache for readdir(). - * - * so, for a given mode and object this routine decides what lock mode - * to use for lock #2: - * - * 1) if caller's gonna lookup in dir then we need to protect dir from - * being splitted only - LCK_CR - * - * 2) if caller's gonna modify dir then we need to protect dir from - * being splitted and to flush cache - LCK_CW - * - * 3) if caller's gonna modify dir and that dir seems ready for - * splitting then we need to protect it from any type of access - * (lookup/modify/split) - LCK_EX --bzzz - */ - - LASSERT(lh->mlh_reg_mode != LCK_MINMODE); - LASSERT(lh->mlh_pdo_mode == LCK_MINMODE); - - /* - * Ask underlaying level its opinion about preferable PDO lock mode - * having access type passed as regular lock mode: - * - * - MDL_MINMODE means that lower layer does not want to specify lock - * mode; - * - * - MDL_NL means that no PDO lock should be taken. This is used in some - * cases. Say, for non-splittable directories no need to use PDO locks - * at all. - */ - mode = mdo_lock_mode(info->mti_env, mdt_object_child(o), - mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode)); - - if (mode != MDL_MINMODE) { - lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode); - } else { - /* - * Lower layer does not want to specify locking mode. We do it - * our selves. No special protection is needed, just flush - * client's cache on modification and allow concurrent - * mondification. 
- */ - switch (lh->mlh_reg_mode) { - case LCK_EX: - lh->mlh_pdo_mode = LCK_EX; - break; - case LCK_PR: - lh->mlh_pdo_mode = LCK_CR; - break; - case LCK_PW: - lh->mlh_pdo_mode = LCK_CW; - break; - default: - CERROR("Not expected lock type (0x%x)\n", - (int)lh->mlh_reg_mode); - LBUG(); - } - } - - LASSERT(lh->mlh_pdo_mode != LCK_MINMODE); - EXIT; + struct mdt_lock_handle *lh) +{ + mdl_mode_t mode; + + ENTRY; + + /* + * Any dir access needs couple of locks: + * + * 1) on part of dir we gonna take lookup/modify; + * + * 2) on whole dir to protect it from concurrent splitting and/or to + * flush client's cache for readdir(). + * + * so, for a given mode and object this routine decides what lock mode + * to use for lock #2: + * + * 1) if caller's gonna lookup in dir then we need to protect dir from + * being splitted only - LCK_CR + * + * 2) if caller's gonna modify dir then we need to protect dir from + * being splitted and to flush cache - LCK_CW + * + * 3) if caller's gonna modify dir and that dir seems ready for + * splitting then we need to protect it from any type of access + * (lookup/modify/split) - LCK_EX --bzzz + */ + + LASSERT(lh->mlh_reg_mode != LCK_MINMODE); + LASSERT(lh->mlh_pdo_mode == LCK_MINMODE); + + /* + * Ask underlaying level its opinion about preferable PDO lock mode + * having access type passed as regular lock mode: + * + * - MDL_MINMODE means that lower layer does not want to specify lock + * mode; + * + * - MDL_NL means that no PDO lock should be taken. This is used in some + * cases. Say, for non-splittable directories no need to use PDO locks + * at all. + */ + mode = mdo_lock_mode(info->mti_env, mdt_object_child(o), + mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode)); + + if (mode != MDL_MINMODE) { + lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode); + } else { + /* + * Lower layer does not want to specify locking mode. We do it + * our selves. 
No special protection is needed, just flush + * client's cache on modification and allow concurrent + * mondification. + */ + switch (lh->mlh_reg_mode) { + case LCK_EX: + lh->mlh_pdo_mode = LCK_EX; + break; + case LCK_PR: + lh->mlh_pdo_mode = LCK_CR; + break; + case LCK_PW: + lh->mlh_pdo_mode = LCK_CW; + break; + default: + CERROR("Not expected lock type (0x%x)\n", + (int)lh->mlh_reg_mode); + LBUG(); + } + } + + LASSERT(lh->mlh_pdo_mode != LCK_MINMODE); + EXIT; +} + +/** + * Check whether \a o is directory stripe object. + * + * \param[in] info thread environment + * \param[in] o MDT object + * + * \retval 1 is directory stripe. + * \retval 0 isn't directory stripe. + * \retval < 1 error code + */ +static int mdt_is_dir_stripe(struct mdt_thread_info *info, + struct mdt_object *o) +{ + struct md_attr *ma = &info->mti_attr; + struct lmv_mds_md_v1 *lmv; + int rc; + + rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV); + if (rc < 0) + return rc; + + if (!(ma->ma_valid & MA_LMV)) + return 0; + + lmv = &ma->ma_lmv->lmv_md_v1; + + if (!lmv_is_sane2(lmv)) + return -EBADF; + + if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE) + return 1; + + return 0; } static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, @@ -277,8 +356,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, struct lu_name *lname = &info->mti_name; const char *start = fileset; char *filename = info->mti_filename; - struct mdt_object *parent; - u32 mode; + struct mdt_object *obj; int rc = 0; LASSERT(!info->mti_cross_ref); @@ -312,8 +390,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, } /* reject .. 
as a path component */ - if (lname->ln_namelen == 2 && - strncmp(s1, "..", 2) == 0) { + if (lname->ln_namelen == 2 && strncmp(s1, "..", 2) == 0) { rc = -EINVAL; break; } @@ -322,39 +399,18 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, filename[lname->ln_namelen] = '\0'; lname->ln_name = filename; - parent = mdt_object_find(info->mti_env, mdt, fid); - if (IS_ERR(parent)) { - rc = PTR_ERR(parent); + obj = mdt_object_find(info->mti_env, mdt, fid); + if (IS_ERR(obj)) { + rc = PTR_ERR(obj); break; } /* Only got the fid of this obj by name */ fid_zero(fid); - rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, + rc = mdo_lookup(info->mti_env, mdt_object_child(obj), lname, fid, &info->mti_spec); - mdt_object_put(info->mti_env, parent); - } - if (!rc) { - parent = mdt_object_find(info->mti_env, mdt, fid); - if (IS_ERR(parent)) - rc = PTR_ERR(parent); - else { - mode = lu_object_attr(&parent->mot_obj); - if (!S_ISDIR(mode)) { - rc = -ENOTDIR; - } else if (mdt_is_remote_object(info, parent, parent)) { - if (!mdt->mdt_enable_remote_subdir_mount) { - rc = -EREMOTE; - LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n", - mdt_obd_name(mdt), - fileset, rc); - } else { - LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n", - mdt_obd_name(mdt), - fileset); - } - } - mdt_object_put(info->mti_env, parent); - } + if (!rc && !S_ISDIR(lu_object_attr(&obj->mot_obj))) + rc = -ENOTDIR; + mdt_object_put(info->mti_env, obj); } return rc; @@ -376,7 +432,7 @@ static int mdt_get_root(struct tgt_session_info *tsi) if (rc) GOTO(out, rc = err_serious(rc)); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK)) GOTO(out, rc = err_serious(-ENOMEM)); repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); @@ -391,7 +447,8 @@ static int mdt_get_root(struct tgt_session_info *tsi) CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset); if 
(fileset) { /* consider fileset from client as a sub-fileset - * of the nodemap one */ + * of the nodemap one + */ OBD_ALLOC(buffer, PATH_MAX + 1); if (buffer == NULL) GOTO(out, rc = err_serious(-ENOMEM)); @@ -413,6 +470,7 @@ static int mdt_get_root(struct tgt_session_info *tsi) } else { repbody->mbo_fid1 = mdt->mdt_md_root_fid; } + exp->exp_root_fid = repbody->mbo_fid1; repbody->mbo_valid |= OBD_MD_FLID; EXIT; @@ -437,21 +495,22 @@ static int mdt_statfs(struct tgt_session_info *tsi) ktime_t kstart = ktime_get(); int current_blockbits; int rc; + timeout_t at_est; ENTRY; svcpt = req->rq_rqbd->rqbd_svcpt; /* This will trigger a watchdog timeout */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP, - (MDT_SERVICE_WATCHDOG_FACTOR * - at_get(&svcpt->scp_at_estimate)) + 1); + at_est = obd_at_get(mdt->mdt_lu_dev.ld_obd, &svcpt->scp_at_estimate); + CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP, + (MDT_SERVICE_WATCHDOG_FACTOR * at_est) + 1); rc = mdt_check_ucred(info); if (rc) GOTO(out, rc = err_serious(rc)); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) GOTO(out, rc = err_serious(-ENOMEM)); osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS); @@ -472,24 +531,23 @@ static int mdt_statfs(struct tgt_session_info *tsi) msf = &mdt->mdt_osfs; if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) { - /** statfs data is too old, get up-to-date one */ - if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) - rc = next->md_ops->mdo_statfs(info->mti_env, - next, osfs); - else - rc = dt_statfs(info->mti_env, mdt->mdt_bottom, - osfs); - if (rc) - GOTO(out, rc); - spin_lock(&mdt->mdt_lock); - msf->msf_osfs = *osfs; - msf->msf_age = ktime_get_seconds(); - spin_unlock(&mdt->mdt_lock); + /** statfs data is too old, get up-to-date one */ + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + rc = next->md_ops->mdo_statfs(info->mti_env, next, + osfs); + else + rc = dt_statfs(info->mti_env, mdt->mdt_bottom, osfs); 
+ if (rc) + GOTO(out, rc); + spin_lock(&mdt->mdt_lock); + msf->msf_osfs = *osfs; + msf->msf_age = ktime_get_seconds(); + spin_unlock(&mdt->mdt_lock); } else { - /** use cached statfs data */ - spin_lock(&mdt->mdt_lock); - *osfs = msf->msf_osfs; - spin_unlock(&mdt->mdt_lock); + /** use cached statfs data */ + spin_lock(&mdt->mdt_lock); + *osfs = msf->msf_osfs; + spin_unlock(&mdt->mdt_lock); } /* tgd_blockbit is recordsize bits set during mkfs. @@ -501,11 +559,10 @@ static int mdt_statfs(struct tgt_session_info *tsi) */ current_blockbits = fls64(osfs->os_bsize) - 1; - /* at least try to account for cached pages. its still racy and - * might be under-reporting if clients haven't announced their - * caches with brw recently */ - CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu" - " pending %llu free %llu avail %llu\n", + /* Account for cached pages. its still racy and might be under-reporting + * if clients haven't announced their caches with brw recently + */ + CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu pending %llu free %llu avail %llu\n", tgd->tgd_tot_dirty, tgd->tgd_tot_granted, tgd->tgd_tot_pending, osfs->os_bfree << current_blockbits, @@ -516,6 +573,8 @@ static int mdt_statfs(struct tgt_session_info *tsi) osfs->os_bsize - 1) >> current_blockbits)); tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__); + if (mdt->mdt_lut.lut_no_create) + osfs->os_state |= OS_STATFS_NOCREATE; CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; " "%llu objects: %llu free; state %x\n", osfs->os_blocks, osfs->os_bfree, osfs->os_bavail, @@ -527,7 +586,8 @@ static int mdt_statfs(struct tgt_session_info *tsi) * should not see a block size > page size, otherwise * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12) * block size which is the biggest block size known to work - * with all client's page size. */ + * with all client's page size. 
+ */ osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT; osfs->os_bfree <<= current_blockbits - COMPAT_BSIZE_SHIFT; osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT; @@ -564,7 +624,7 @@ __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only) /* Fast check for DoM entry with no mirroring, should be the first */ if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 && - lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT) + !(lov_pattern(le32_to_cpu(v1->lmm_pattern)) & LOV_PATTERN_MDT)) RETURN(0); /* check all entries otherwise */ @@ -578,7 +638,7 @@ __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only) off = le32_to_cpu(lcme->lcme_offset); v1 = (struct lov_mds_md *)((char *)comp_v1 + off); - if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) == + if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) & LOV_PATTERN_MDT) dom_stripesize = le32_to_cpu(v1->lmm_stripe_size); else @@ -593,9 +653,7 @@ __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only) RETURN(dom_stripesize); } -/** - * Pack size attributes into the reply. - */ +/* Pack size attributes into the reply. */ int mdt_pack_size2body(struct mdt_thread_info *info, const struct lu_fid *fid, struct lustre_handle *lh) { @@ -632,7 +690,8 @@ int mdt_pack_size2body(struct mdt_thread_info *info, RETURN(0); /* Either DoM lock exists or LMM has only DoM stripe then - * return size on body. */ + * return size on body. 
+ */ b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock); @@ -763,10 +822,11 @@ static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm) } void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, - const struct lu_attr *attr, const struct lu_fid *fid) + const struct lu_attr *attr, const struct lu_fid *fid) { - struct md_attr *ma = &info->mti_attr; + struct mdt_device *mdt = info->mti_mdt; struct obd_export *exp = info->mti_exp; + struct md_attr *ma = &info->mti_attr; struct lu_nodemap *nodemap = NULL; LASSERT(ma->ma_valid & MA_INODE); @@ -795,7 +855,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, b->mbo_nlink = attr->la_nlink; b->mbo_valid |= OBD_MD_FLNLINK; } - if (attr->la_valid & (LA_UID|LA_GID)) { + if (attr->la_valid & (LA_UID|LA_GID|LA_PROJID)) { nodemap = nodemap_get_from_exp(exp); if (IS_ERR(nodemap)) goto out; @@ -814,8 +874,9 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, } if (attr->la_valid & LA_PROJID) { - /* TODO, nodemap for project id */ - b->mbo_projid = attr->la_projid; + b->mbo_projid = nodemap_map_id(nodemap, NODEMAP_PROJID, + NODEMAP_FS_TO_CLIENT, + attr->la_projid); b->mbo_valid |= OBD_MD_FLPROJID; } @@ -847,7 +908,8 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, /* just ignore blocks occupied by extend attributes on MDS */ b->mbo_blocks = 0; /* if no object is allocated on osts, the size on mds is valid. 
- * b=22272 */ + * b=22272 + */ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) { if (mdt_hsm_is_released(ma->ma_lmm)) { @@ -860,7 +922,8 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, else b->mbo_blocks = 1; b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - } else if (info->mti_som_valid) { /* som is valid */ + } else if (info->mti_som_strict && mdt->mdt_enable_strict_som) { + /* use SOM for size*/ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */ b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS; @@ -888,27 +951,29 @@ static inline int mdt_body_has_lov(const struct lu_attr *la, void mdt_client_compatibility(struct mdt_thread_info *info) { - struct mdt_body *body; - struct ptlrpc_request *req = mdt_info_req(info); - struct obd_export *exp = req->rq_export; - struct md_attr *ma = &info->mti_attr; - struct lu_attr *la = &ma->ma_attr; - ENTRY; + struct mdt_body *body; + struct ptlrpc_request *req = mdt_info_req(info); + struct obd_export *exp = req->rq_export; + struct md_attr *ma = &info->mti_attr; + struct lu_attr *la = &ma->ma_attr; + + ENTRY; if (exp_connect_layout(exp)) /* the client can deal with 16-bit lmm_stripe_count */ RETURN_EXIT; - body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - if (!mdt_body_has_lov(la, body)) - RETURN_EXIT; + if (!mdt_body_has_lov(la, body)) + RETURN_EXIT; - /* now we have a reply with a lov for a client not compatible with the - * layout lock so we have to clean the layout generation number */ - if (S_ISREG(la->la_mode)) - ma->ma_lmm->lmm_layout_gen = 0; - EXIT; + /* now we have a reply with a lov for a client not compatible with the + * layout lock so we have to clean the layout generation number + */ + if (S_ISREG(la->la_mode)) + ma->ma_lmm->lmm_layout_gen = 0; + EXIT; } static int 
mdt_attr_get_eabuf_size(struct mdt_thread_info *info, @@ -949,6 +1014,7 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, { const struct lu_env *env = info->mti_env; int rc; + ENTRY; LASSERT(info->mti_big_lmm_used == 0); @@ -1008,6 +1074,13 @@ int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, LASSERT(buf->lb_buf); + if (!mdt_object_exists(o)) + return -ENOENT; + + if (mdt_object_remote(o) && S_ISDIR(lu_object_attr(&o->mot_obj))) + /* force reload layout for remote dir in case layout changed */ + mo_invalidate(info->mti_env, mdt_object_child(o)); + rc = mo_xattr_get(info->mti_env, next, buf, name); if (rc > 0) { @@ -1022,10 +1095,9 @@ got: !(exp_connect_flags(info->mti_exp) & OBD_CONNECT_LFSCK)) { return -EIO; - } else { - ma->ma_lmm_size = rc; - ma->ma_valid |= MA_LOV; } + ma->ma_lmm_size = rc; + ma->ma_valid |= MA_LOV; } else if (strcmp(name, XATTR_NAME_LMV) == 0) { if (info->mti_big_lmm_used) ma->ma_lmv = info->mti_big_lmm; @@ -1047,7 +1119,8 @@ got: rc = 0; } else if (rc == -ERANGE) { /* Default LMV has fixed size, so it must be able to fit - * in the original buffer */ + * in the original buffer + */ if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) return rc; rc = mdt_big_xattr_get(info, o, name); @@ -1101,6 +1174,7 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, struct link_ea_header *leh; struct link_ea_entry *lee; int rc; + ENTRY; buf->lb_buf = info->mti_big_lmm; @@ -1108,7 +1182,8 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf, XATTR_NAME_LINK); /* ignore errors, MA_PFID won't be set and it is - * up to the caller to treat this as an error */ + * up to the caller to treat this as an error + */ if (rc == -ERANGE || buf->lb_len == 0) { rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK); buf->lb_buf = info->mti_big_lmm; @@ -1195,6 +1270,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, int 
need = ma->ma_need; int rc = 0, rc2; u32 mode; + ENTRY; ma->ma_valid = 0; @@ -1205,6 +1281,10 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, if (need & MA_INODE) { ma->ma_need = MA_INODE; + if (need & MA_DIRENT_CNT) + ma->ma_attr.la_valid |= LA_DIRENT_CNT; + else + ma->ma_attr.la_valid &= ~LA_DIRENT_CNT; rc = mo_attr_get(env, next, ma); if (rc) GOTO(out, rc); @@ -1240,9 +1320,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc); } - /* - * In the handle of MA_INODE, we may already get the SOM attr. - */ + /* In the handle of MA_INODE, we may already get the SOM attr. */ if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) { rc = mdt_get_som(info, o, ma); if (rc != 0) @@ -1284,6 +1362,20 @@ out: RETURN(rc); } +static void mdt_preset_encctx_size(struct mdt_thread_info *info) +{ + struct req_capsule *pill = info->mti_pill; + + ENTRY; + if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX, + RCL_SERVER)) + /* pre-set size in server part with max size */ + req_capsule_set_size(pill, &RMF_FILE_ENCCTX, + RCL_SERVER, + info->mti_mdt->mdt_max_mdsize); + EXIT; +} + static int mdt_getattr_internal(struct mdt_thread_info *info, struct mdt_object *o, int ma_need) { @@ -1303,7 +1395,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) RETURN(err_serious(-ENOMEM)); repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); @@ -1311,8 +1403,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, ma->ma_valid = 0; if (mdt_object_remote(o)) { - /* This object is located on remote node.*/ - /* Return -ENOTSUPP for old client */ + /* obj is located on remote node Return -ENOTSUPP(old client) */ if (!mdt_is_dne_client(req->rq_export)) GOTO(out, rc = -ENOTSUPP); @@ -1404,6 +1495,13 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, RETURN(rc); } + /* return immutable attr on fscrypt metadata files + * if 
fscrypt admin is not permitted + */ + if (o->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD && + !mdt_ucred(info)->uc_rbac_fscrypt_admin) + la->la_flags |= LUSTRE_IMMUTABLE_FL; + /* if file is released, check if a restore is running */ if (ma->ma_valid & MA_HSM) { repbody->mbo_valid |= OBD_MD_TSTATE; @@ -1483,7 +1581,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, reqbody->mbo_valid & OBD_MD_LINKNAME) { buffer->lb_buf = ma->ma_lmm; /* eadatasize from client includes NULL-terminator, so - * there is no need to read it */ + * there is no need to read it + */ buffer->lb_len = reqbody->mbo_eadatasize - 1; rc = mo_readlink(env, next, buffer); if (unlikely(rc <= 0)) { @@ -1494,15 +1593,16 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, } else { int print_limit = min_t(int, PAGE_SIZE - 128, rc); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO)) rc -= 2; repbody->mbo_valid |= OBD_MD_LINKNAME; /* we need to report back size with NULL-terminator - * because client expects that */ + * because client expects that + */ repbody->mbo_eadatasize = rc + 1; if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize) - CDEBUG(D_INODE, "%s: Read shorter symlink %d " - "on "DFID ", expected %d\n", + CDEBUG(D_INODE, "%s: Read shorter symlink %d on " + DFID ", expected %d\n", mdt_obd_name(info->mti_mdt), rc, PFID(mdt_object_fid(o)), reqbody->mbo_eadatasize - 1); @@ -1511,7 +1611,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, /* If the total CDEBUG() size is larger than a page, it * will print a warning to the console, avoid this by - * printing just the last part of the symlink. */ + * printing just the last part of the symlink. + */ CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n", print_limit < rc ? "..." 
: "", print_limit, (char *)ma->ma_lmm + rc - print_limit, rc); @@ -1530,6 +1631,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) && (reqbody->mbo_valid & OBD_MD_FLACL)) { struct lu_nodemap *nodemap = nodemap_get_from_exp(exp); + if (IS_ERR(nodemap)) RETURN(PTR_ERR(nodemap)); @@ -1549,12 +1651,13 @@ out: static int mdt_getattr(struct tgt_session_info *tsi) { struct mdt_thread_info *info = tsi2mdt_info(tsi); - struct mdt_object *obj = info->mti_object; - struct req_capsule *pill = info->mti_pill; - struct mdt_body *reqbody; - struct mdt_body *repbody; - int rc, rc2; - ENTRY; + struct mdt_object *obj = info->mti_object; + struct req_capsule *pill = info->mti_pill; + struct mdt_body *reqbody; + struct mdt_body *repbody; + int rc, rc2; + + ENTRY; if (unlikely(info->mti_object == NULL)) RETURN(-EPROTO); @@ -1572,11 +1675,12 @@ static int mdt_getattr(struct tgt_session_info *tsi) /* Unlike intent case where we need to pre-fill out buffers early on * in intent policy for ldlm reasons, here we can have a much better * guess at EA size by just reading it from disk. - * Exceptions are readdir and (missing) directory striping */ - /* Readlink */ - if (reqbody->mbo_valid & OBD_MD_LINKNAME) { + * Exceptions are readdir and (missing) directory striping + */ + if (reqbody->mbo_valid & OBD_MD_LINKNAME) { /* Readlink */ /* No easy way to know how long is the symlink, but it cannot - * be more than PATH_MAX, so we allocate +1 */ + * be more than PATH_MAX, so we allocate +1 + */ rc = PATH_MAX + 1; /* A special case for fs ROOT: getattr there might fetch * default EA for entire fs, not just for this dir! 
@@ -1587,7 +1691,8 @@ static int mdt_getattr(struct tgt_session_info *tsi) (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) == MDS_GETATTR)) { /* Should the default strping be bigger, mdt_fix_reply - * will reallocate */ + * will reallocate + */ rc = DEF_REP_MD_SIZE; } else { /* Read the actual EA size from disk */ @@ -1601,16 +1706,18 @@ static int mdt_getattr(struct tgt_session_info *tsi) /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD * by default. If the target object has more ACL entries, then - * enlarge the buffer when necessary. */ + * enlarge the buffer when necessary. + */ req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); + mdt_preset_encctx_size(info); rc = req_capsule_server_pack(pill); if (unlikely(rc != 0)) GOTO(out, rc = err_serious(rc)); - repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); - LASSERT(repbody != NULL); + repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); + LASSERT(repbody != NULL); repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; @@ -1620,8 +1727,18 @@ static int mdt_getattr(struct tgt_session_info *tsi) info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF); + rc = mdt_init_ucred(info, reqbody); + if (rc) + GOTO(out_shrink, rc); + rc = mdt_getattr_internal(info, obj, 0); + if (unlikely(rc)) + GOTO(out_ucred, rc); + + rc = mdt_pack_encctx_in_reply(info, obj); EXIT; +out_ucred: + mdt_exit_ucred(info); out_shrink: mdt_client_compatibility(info); rc2 = mdt_fix_reply(info); @@ -1674,9 +1791,7 @@ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj, if (layout->mlc_opc == MD_LAYOUT_WRITE) lockpart |= MDS_INODELOCK_UPDATE; - mdt_lock_handle_init(lhc); - mdt_lock_reg_init(lhc, LCK_EX); - rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false); + rc = mdt_object_lock(info, obj, lhc, lockpart, LCK_EX); if (rc) RETURN(rc); } @@ -1721,7 +1836,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) struct mdt_object *o1, *o2, *o; struct 
mdt_lock_handle *lh1, *lh2; struct mdc_swap_layouts *msl; - int rc; + int rc; + ENTRY; /* client does not support layout lock, so layout swaping @@ -1730,7 +1846,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) * layout lock yet. If those clients have already opened the file * they won't be notified at all so that old layout may still be * used to do IO. This can be fixed after file release is landed by - * doing exclusive open and taking full EX ibits lock. - Jinshan */ + * doing exclusive open and taking full EX ibits lock. - Jinshan + */ if (!exp_connect_layout(exp)) RETURN(-EOPNOTSUPP); @@ -1758,7 +1875,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) swap(o1, o2); /* permission check. Make sure the calling process having permission - * to write both files. */ + * to write both files. + */ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL, MAY_WRITE); if (rc < 0) @@ -1774,22 +1892,19 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) GOTO(put, rc = -EPROTO); lh1 = &info->mti_lh[MDT_LH_NEW]; - mdt_lock_reg_init(lh1, LCK_EX); lh2 = &info->mti_lh[MDT_LH_OLD]; - mdt_lock_reg_init(lh2, LCK_EX); - rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT | - MDS_INODELOCK_XATTR); + MDS_INODELOCK_XATTR, LCK_EX); if (rc < 0) GOTO(put, rc); rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT | - MDS_INODELOCK_XATTR); + MDS_INODELOCK_XATTR, LCK_EX); if (rc < 0) GOTO(unlock1, rc); rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1), - mdt_object_child(o2), msl->msl_flags); + mdt_object_child(o2), 0, 0, msl->msl_flags); if (rc < 0) GOTO(unlock2, rc); @@ -1808,29 +1923,143 @@ out: static int mdt_raw_lookup(struct mdt_thread_info *info, struct mdt_object *parent, - const struct lu_name *lname, - struct ldlm_reply *ldlm_rep) + const struct lu_name *lname) { - struct lu_fid *child_fid = &info->mti_tmp_fid1; - int rc; + struct lu_fid *fid = &info->mti_tmp_fid1; + struct mdt_body *repbody; + bool is_dotdot = false; 
+ bool is_old_parent_stripe = false; + bool is_new_parent_checked = false; + int rc; + ENTRY; LASSERT(!info->mti_cross_ref); + /* Always allow to lookup ".." */ + if (lname->ln_namelen == 2 && + lname->ln_name[0] == '.' && lname->ln_name[1] == '.') { + info->mti_spec.sp_permitted = 1; + is_dotdot = true; + if (mdt_is_dir_stripe(info, parent) == 1) + is_old_parent_stripe = true; + } + mdt_object_get(info->mti_env, parent); +lookup: /* Only got the fid of this obj by name */ - fid_zero(child_fid); - rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object), - lname, child_fid, &info->mti_spec); - if (rc == 0) { - struct mdt_body *repbody; + fid_zero(fid); + rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid, + &info->mti_spec); + mdt_object_put(info->mti_env, parent); + if (rc) + RETURN(rc); + + /* getattr_name("..") should return master object FID for striped dir */ + if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) { + parent = mdt_object_find(info->mti_env, info->mti_mdt, fid); + if (IS_ERR(parent)) + RETURN(PTR_ERR(parent)); + + /* old client getattr_name("..") with stripe FID */ + if (unlikely(is_old_parent_stripe)) { + is_old_parent_stripe = false; + goto lookup; + } + + /* ".." may be a stripe */ + if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) { + is_new_parent_checked = true; + goto lookup; + } + + mdt_object_put(info->mti_env, parent); + } + + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + repbody->mbo_fid1 = *fid; + repbody->mbo_valid = OBD_MD_FLID; + + RETURN(rc); +} + +/** + * Find name matching hash + * + * We search \a child LinkEA for a name whose hash matches \a lname + * (it contains an encoded hash). 
+ * + * \param info mdt thread info + * \param lname encoded hash to find + * \param parent parent object + * \param child object to search with LinkEA + * + * \retval 1 match found + * \retval 0 no match found + * \retval -ev negative errno upon error + */ +int find_name_matching_hash(struct mdt_thread_info *info, struct lu_name *lname, + struct mdt_object *parent, struct mdt_object *child) +{ + /* Here, lname is an encoded hash of on-disk name, and + * client is doing access without encryption key. + * So we need to get LinkEA, check parent fid is correct and + * compare name hash with the one in the request. + */ + struct lu_buf *buf = &info->mti_big_buf; + struct lu_name name; + struct lu_fid pfid; + struct linkea_data ldata = { NULL }; + struct link_ea_header *leh; + struct link_ea_entry *lee; + struct lu_buf link = { 0 }; + char *hash; + int reclen, count, rc; + + ENTRY; + if (lname->ln_namelen < LL_CRYPTO_BLOCK_SIZE) + RETURN(-EINVAL); - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - repbody->mbo_fid1 = *child_fid; - repbody->mbo_valid = OBD_MD_FLID; - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); - } else if (rc == -ENOENT) { - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); + buf = lu_buf_check_and_alloc(buf, PATH_MAX); + if (!buf->lb_buf) + RETURN(-ENOMEM); + + ldata.ld_buf = buf; + rc = mdt_links_read(info, child, &ldata); + if (rc < 0) + RETURN(rc); + + hash = kmalloc(lname->ln_namelen, GFP_NOFS); + if (!hash) + RETURN(-ENOMEM); + rc = critical_decode(lname->ln_name, lname->ln_namelen, hash); + + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); + for (count = 0; count < leh->leh_reccount; count++) { + linkea_entry_unpack(lee, &reclen, &name, &pfid); + if (!parent || lu_fid_eq(&pfid, mdt_object_fid(parent))) { + lu_buf_check_and_alloc(&link, name.ln_namelen); + if (!link.lb_buf) + GOTO(out_match, rc = -ENOMEM); + rc = critical_decode(name.ln_name, name.ln_namelen, + link.lb_buf); + + if 
(memcmp(LLCRYPT_EXTRACT_DIGEST(link.lb_buf, rc), + hash, LL_CRYPTO_BLOCK_SIZE) == 0) { + *lname = name; + break; + } + } + lee = (struct link_ea_entry *) ((char *)lee + reclen); } + if (count == leh->leh_reccount) + rc = 0; + else + rc = 1; + +out_match: + lu_buf_free(&link); + kfree(hash); RETURN(rc); } @@ -1842,9 +2071,9 @@ static int mdt_raw_lookup(struct mdt_thread_info *info, * (2)intent request will grant the lock to client. */ static int mdt_getattr_name_lock(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc, - __u64 child_bits, - struct ldlm_reply *ldlm_rep) + struct mdt_lock_handle *lhc, + __u64 child_bits, + struct ldlm_reply *ldlm_rep) { struct ptlrpc_request *req = mdt_info_req(info); struct mdt_body *reqbody = NULL; @@ -1855,6 +2084,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, struct mdt_lock_handle *lhp = NULL; struct ldlm_lock *lock; struct req_capsule *pill = info->mti_pill; + bool fscrypt_md = false; __u64 try_bits = 0; bool is_resent; int ma_need = 0; @@ -1869,22 +2099,21 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (parent == NULL) RETURN(-ENOENT); + if (info->mti_mdt->mdt_enable_dir_auto_split) + ma_need |= MA_DIRENT_CNT; + if (info->mti_cross_ref) { /* Only getattr on the child. Parent is on another node. 
*/ mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD | DISP_LOOKUP_POS); child = parent; - CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", " - "ldlm_rep = %p\n", + CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", ldlm_rep = %p\n", PFID(mdt_object_fid(child)), ldlm_rep); rc = mdt_check_resent_lock(info, child, lhc); if (rc < 0) { RETURN(rc); } else if (rc > 0) { - mdt_lock_handle_init(lhc); - mdt_lock_reg_init(lhc, LCK_PR); - /* * Object's name entry is on another MDS, it will * request PERM lock only because LOOKUP lock is owned @@ -1895,8 +2124,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, child_bits &= ~(MDS_INODELOCK_LOOKUP | MDS_INODELOCK_LAYOUT); child_bits |= MDS_INODELOCK_PERM; - - rc = mdt_object_lock(info, child, lhc, child_bits); + rc = mdt_object_lock(info, child, lhc, child_bits, + LCK_PR); if (rc < 0) RETURN(rc); } @@ -1910,7 +2139,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, RETURN(-ENOENT); } - rc = mdt_getattr_internal(info, child, 0); + rc = mdt_getattr_internal(info, child, ma_need); if (unlikely(rc != 0)) { mdt_object_unlock(info, child, lhc, 1); RETURN(rc); @@ -1931,7 +2160,29 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, lname = &info->mti_name; mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON); - if (lu_name_is_valid(lname)) { + if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) { + reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); + if (unlikely(reqbody == NULL)) + RETURN(err_serious(-EPROTO)); + + *child_fid = reqbody->mbo_fid2; + if (unlikely(!fid_is_sane(child_fid))) + RETURN(err_serious(-EINVAL)); + + if (lu_fid_eq(mdt_object_fid(parent), child_fid)) { + mdt_object_get(info->mti_env, parent); + child = parent; + } else { + child = mdt_object_find(info->mti_env, info->mti_mdt, + child_fid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + } + + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", ldlm_rep = %p\n", + PFID(mdt_object_fid(parent)), 
+ PFID(&reqbody->mbo_fid2), ldlm_rep); + } else if (lu_name_is_valid(lname)) { if (mdt_object_remote(parent)) { CERROR("%s: parent "DFID" is on remote target\n", mdt_obd_name(info->mti_mdt), @@ -1939,9 +2190,16 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, RETURN(-EPROTO); } - CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", " - "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", ldlm_rep = %p\n", + PFID(mdt_object_fid(parent)), PNAME(lname), ldlm_rep); + + if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD || + (fid_is_root(mdt_object_fid(parent)) && + lname->ln_namelen == strlen(dot_fscrypt_name) && + strncmp(lname->ln_name, dot_fscrypt_name, + lname->ln_namelen) == 0)) + fscrypt_md = true; } else { reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); if (unlikely(reqbody == NULL)) @@ -1975,30 +2233,44 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (rc) child_bits &= ~MDS_INODELOCK_LOOKUP; - CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", " - "ldlm_rep = %p\n", + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), PFID(&reqbody->mbo_fid2), ldlm_rep); } mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD); - if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) { + if (unlikely(!mdt_object_exists(parent)) && + !(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) && + lu_name_is_valid(lname)) { LU_OBJECT_DEBUG(D_INODE, info->mti_env, &parent->mot_obj, "Parent doesn't exist!"); GOTO(out_child, rc = -ESTALE); } - if (lu_name_is_valid(lname)) { - /* Always allow to lookup ".." */ - if (unlikely(lname->ln_namelen == 2 && - lname->ln_name[0] == '.' 
&& - lname->ln_name[1] == '.')) - info->mti_spec.sp_permitted = 1; - + if (!child && is_resent) { + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); + if (lock == NULL) { + /* Lock is pinned by ldlm_handle_enqueue0() as it is + * a resend case, however, it could be already destroyed + * due to client eviction or a raced cancel RPC. + */ + LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx", + lhc->mlh_reg_lh.cookie); + RETURN(-ESTALE); + } + fid_extract_from_res_name(child_fid, + &lock->l_resource->lr_name); + LDLM_LOCK_PUT(lock); + child = mdt_object_find(info->mti_env, info->mti_mdt, + child_fid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + } else if (!(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) && + lu_name_is_valid(lname)) { if (info->mti_body->mbo_valid == OBD_MD_FLID) { - rc = mdt_raw_lookup(info, parent, lname, ldlm_rep); + rc = mdt_raw_lookup(info, parent, lname); RETURN(rc); } @@ -2006,15 +2278,13 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, /* step 1: lock parent only if parent is a directory */ if (S_ISDIR(lu_object_attr(&parent->mot_obj))) { lhp = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lhp, LCK_PR, lname); - rc = mdt_object_lock(info, parent, lhp, - MDS_INODELOCK_UPDATE); + rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PR); if (unlikely(rc != 0)) RETURN(rc); } - /* step 2: lookup child's fid by name */ - fid_zero(child_fid); + /* step 2: lookup child's fid by name */ + fid_zero(child_fid); rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, child_fid, &info->mti_spec); if (rc == -ENOENT) @@ -2034,11 +2304,24 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, /* step 3: lock child regardless if it is local or remote. 
*/ LASSERT(child); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2); - if (!mdt_object_exists(child)) { - LU_OBJECT_DEBUG(D_INODE, info->mti_env, - &child->mot_obj, - "Object doesn't exist!"); + if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) { + /* Here, lname is an encoded hash of on-disk name, and + * client is doing access without encryption key. + * So we need to compare name hash with the one in the request. + */ + if (!find_name_matching_hash(info, lname, parent, + child)) { + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); + mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_POS); + GOTO(out_child, rc = -ENOENT); + } + } + + CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2); + if (!mdt_object_exists(child)) { + LU_OBJECT_DEBUG(D_INODE, info->mti_env, + &child->mot_obj, + "Object doesn't exist!"); GOTO(out_child, rc = -ENOENT); } @@ -2046,9 +2329,6 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (rc < 0) { GOTO(out_child, rc); } else if (rc > 0) { - mdt_lock_handle_init(lhc); - mdt_lock_reg_init(lhc, LCK_PR); - if (!(child_bits & MDS_INODELOCK_UPDATE) && !mdt_object_remote(child)) { struct md_attr *ma = &info->mti_attr; @@ -2063,7 +2343,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, * return not only a LOOKUP lock, but also an UPDATE * lock and this might save us RPC on later STAT. For * directories, it also let negative dentry cache start - * working for this dir. */ + * working for this dir. 
+ */ if (ma->ma_valid & MA_INODE && ma->ma_attr.la_valid & LA_CTIME && info->mti_mdt->mdt_namespace->ns_ctime_age_limit + @@ -2072,11 +2353,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, } /* layout lock must be granted in a best-effort way - * for IT operations */ + * for IT operations + */ LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT)); if (S_ISREG(lu_object_attr(&child->mot_obj)) && !mdt_object_remote(child) && ldlm_rep != NULL) { - if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) && + if (!CFS_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) && exp_connect_layout(info->mti_exp)) { /* try to grant layout lock for regular file. */ try_bits = MDS_INODELOCK_LAYOUT; @@ -2086,40 +2368,80 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, try_bits |= MDS_INODELOCK_DOM; } + /* + * To avoid possible deadlock between batched statahead RPC + * and rename()/migrate() operation, it should use trylock to + * obtain the DLM PR ibits lock for file attributes in a + * batched statahead RPC. A failed trylock means that other + * users maybe modify the directory simultaneously as in current + * Lustre design the server only grants read lock to a client. + * + * When a trylock failed, the MDT reports the conflict with + * error code -EBUSY, and stops statahead immediately. + */ + if (info->mti_batch_env) { + /* + * This is a sub stat-ahead request in a batched RPC. + * However, the @child is a remote object, we just + * return -EREMOTE here to forbid stat-ahead on it. 
+ */ + if (mdt_object_remote(child)) + GOTO(out_child, rc = -EREMOTE); + + try_bits |= child_bits; + child_bits = 0; + } + if (try_bits != 0) { /* try layout lock, it may fail to be granted due to - * contention at LOOKUP or UPDATE */ + * contention at LOOKUP or UPDATE + */ rc = mdt_object_lock_try(info, child, lhc, &child_bits, - try_bits, false); + try_bits, LCK_PR); if (child_bits & MDS_INODELOCK_LAYOUT) ma_need |= MA_LOV; } else { /* Do not enqueue the UPDATE lock from MDT(cross-MDT), - * client will enqueue the lock to the remote MDT */ + * client will enqueue the lock to the remote MDT + */ if (mdt_object_remote(child)) - child_bits &= ~MDS_INODELOCK_UPDATE; - rc = mdt_object_lock(info, child, lhc, child_bits); + rc = mdt_object_lookup_lock(info, NULL, child, + lhc, LCK_PR); + else + rc = mdt_object_lock(info, child, lhc, + child_bits, LCK_PR); } if (unlikely(rc != 0)) GOTO(out_child, rc); + if (info->mti_batch_env && child_bits == 0) { + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_child, rc = -EBUSY); + } } + if (fscrypt_md) + child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD; + /* finally, we can get attr for child. 
*/ rc = mdt_getattr_internal(info, child, ma_need); if (unlikely(rc != 0)) { - mdt_object_unlock(info, child, lhc, 1); + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); GOTO(out_child, rc); } rc = mdt_pack_secctx_in_reply(info, child); if (unlikely(rc)) { - mdt_object_unlock(info, child, lhc, 1); + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); GOTO(out_child, rc); } rc = mdt_pack_encctx_in_reply(info, child); if (unlikely(rc)) { - mdt_object_unlock(info, child, lhc, 1); + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); GOTO(out_child, rc); } @@ -2133,9 +2455,9 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, PLDLMRES(lock->l_resource), PFID(mdt_object_fid(child))); - if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) { + if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) { if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) - OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND, + CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND, req->rq_deadline - req->rq_arrival_time.tv_sec + cfs_fail_val ?: 3); @@ -2143,8 +2465,13 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, ldlm_set_ast_sent(lock); } - if (S_ISREG(lu_object_attr(&child->mot_obj)) && - !mdt_object_remote(child) && child != parent) { + /* + * check whether the object is remote as we can't + * really check attributes w/o explicit check for + * object's existence first. 
+ */ + if (!mdt_object_remote(child) && child != parent && + S_ISREG(lu_object_attr(&child->mot_obj))) { mdt_object_put(info->mti_env, child); rc = mdt_pack_size2body(info, child_fid, &lhc->mlh_reg_lh); @@ -2169,6 +2496,15 @@ out_child: unlock_parent: if (lhp) mdt_object_unlock(info, parent, lhp, 1); + if (rc == -ENOENT) { + /* return -ENOKEY instead of -ENOENT to encryption-unaware + * client if trying to access an encrypted file + */ + int rc2 = mdt_check_enc(info, parent); + + if (rc2) + rc = rc2; + } return rc; } @@ -2218,14 +2554,14 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info, struct mdt_object *obj, s64 ctime) { struct lu_fid *child_fid = &info->mti_tmp_fid1; - struct ldlm_enqueue_info *einfo = &info->mti_einfo[0]; + struct ldlm_enqueue_info *einfo = &info->mti_einfo; struct mdt_device *mdt = info->mti_mdt; struct md_attr *ma = &info->mti_attr; struct mdt_lock_handle *parent_lh; struct mdt_lock_handle *child_lh; struct mdt_object *pobj; - bool cos_incompat = false; int rc; + ENTRY; pobj = mdt_object_find(info->mti_env, mdt, pfid); @@ -2233,14 +2569,10 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info, GOTO(out, rc = PTR_ERR(pobj)); parent_lh = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(parent_lh, LCK_PW, name); - rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE); + rc = mdt_parent_lock(info, pobj, parent_lh, name, LCK_PW); if (rc != 0) GOTO(put_parent, rc); - if (mdt_object_remote(pobj)) - cos_incompat = true; - rc = mdo_lookup(info->mti_env, mdt_object_child(pobj), name, child_fid, &info->mti_spec); if (rc != 0) @@ -2250,10 +2582,9 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info, GOTO(unlock_parent, rc = -EREMCHG); child_lh = &info->mti_lh[MDT_LH_CHILD]; - mdt_lock_reg_init(child_lh, LCK_EX); - rc = mdt_reint_striped_lock(info, obj, child_lh, - MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE, - einfo, cos_incompat); + rc = mdt_object_stripes_lock(info, pobj, obj, child_lh, einfo, + MDS_INODELOCK_LOOKUP 
| + MDS_INODELOCK_UPDATE, LCK_EX); if (rc != 0) GOTO(unlock_parent, rc); @@ -2276,7 +2607,7 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info, mutex_unlock(&obj->mot_lov_mutex); unlock_child: - mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1); + mdt_object_stripes_unlock(info, obj, child_lh, einfo, 1); unlock_parent: mdt_object_unlock(info, pobj, parent_lh, 1); put_parent: @@ -2292,6 +2623,7 @@ static int mdt_rmfid_check_permission(struct mdt_thread_info *info, struct md_attr *ma = &info->mti_attr; struct lu_attr *la = &ma->ma_attr; int rc = 0; + ENTRY; ma->ma_need = MA_INODE; @@ -2300,17 +2632,22 @@ static int mdt_rmfid_check_permission(struct mdt_thread_info *info, GOTO(out, rc); if (la->la_flags & LUSTRE_IMMUTABLE_FL) - rc = -EACCES; + rc = -EACCES; - if (md_capable(uc, CAP_DAC_OVERRIDE)) + /* we want rbac roles to have precedence over any other + * permission or capability checks + */ + if (!uc->uc_rbac_byfid_ops) + RETURN(-EPERM); + if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE)) RETURN(0); if (uc->uc_fsuid == la->la_uid) { - if ((la->la_mode & S_IWUSR) == 0) + if ((la->la_mode & 0200) == 0) rc = -EACCES; } else if (uc->uc_fsgid == la->la_gid) { - if ((la->la_mode & S_IWGRP) == 0) + if ((la->la_mode & 0020) == 0) rc = -EACCES; - } else if ((la->la_mode & S_IWOTH) == 0) { + } else if ((la->la_mode & 0002) == 0) { rc = -EACCES; } @@ -2330,6 +2667,7 @@ static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid, struct link_ea_header *leh; struct link_ea_entry *lee; int reclen, count, rc = 0; + ENTRY; if (!fid_is_sane(fid)) @@ -2389,6 +2727,7 @@ static int mdt_rmfid(struct tgt_session_info *tsi) int bufsize, rc; __u32 *rcs; int i, nr; + ENTRY; reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY); @@ -2428,6 +2767,62 @@ out: static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg); +int mdt_io_set_info(struct tgt_session_info *tsi) +{ + struct ptlrpc_request *req = 
tgt_ses_req(tsi); + struct ost_body *body = NULL, *repbody; + void *key, *val = NULL; + int keylen, vallen, rc = 0; + bool is_grant_shrink; + + ENTRY; + + key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY); + if (key == NULL) { + DEBUG_REQ(D_HA, req, "no set_info key"); + RETURN(err_serious(-EFAULT)); + } + keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY, + RCL_CLIENT); + + val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL); + if (val == NULL) { + DEBUG_REQ(D_HA, req, "no set_info val"); + RETURN(err_serious(-EFAULT)); + } + vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL, + RCL_CLIENT); + + is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK); + if (is_grant_shrink) + /* In this case the value is actually an RMF_OST_BODY, so we + * transmutate the type of this PTLRPC + */ + req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO); + + rc = req_capsule_server_pack(tsi->tsi_pill); + if (rc < 0) + RETURN(rc); + + if (is_grant_shrink) { + body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY); + + repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY); + *repbody = *body; + + /** handle grant shrink, similar to a read request */ + tgt_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp, + &repbody->oa); + } else { + CERROR("%s: Unsupported key %s\n", + tgt_name(tsi->tsi_tgt), (char *)key); + rc = -EOPNOTSUPP; + } + + RETURN(rc); +} + + static int mdt_set_info(struct tgt_session_info *tsi) { struct ptlrpc_request *req = tgt_ses_req(tsi); @@ -2457,6 +2852,22 @@ static int mdt_set_info(struct tgt_session_info *tsi) /* Swab any part of val you need to here */ if (KEY_IS(KEY_READ_ONLY)) { + /* If client wants rw, make sure nodemap does not enforce ro. 
*/ + if (!*(__u32 *)val) { + struct lu_nodemap *nm = NULL; + bool readonly = false; + + if (req->rq_export) + nm = nodemap_get_from_exp(req->rq_export); + + if (!IS_ERR_OR_NULL(nm)) { + readonly = nm->nmf_readonly_mount; + nodemap_putref(nm); + } + + if (unlikely(readonly)) + RETURN(-EROFS); + } spin_lock(&req->rq_export->exp_lock); if (*(__u32 *)val) *exp_connect_flags_ptr(req->rq_export) |= @@ -2478,7 +2889,7 @@ static int mdt_set_info(struct tgt_session_info *tsi) __swab32s(&cs->cs_id); } - if (!mdt_is_rootadmin(tsi2mdt_info(tsi))) + if (!mdt_changelog_allow(tsi2mdt_info(tsi))) RETURN(-EACCES); rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export, vallen, val, NULL); @@ -2503,18 +2914,18 @@ static int mdt_readpage(struct tgt_session_info *tsi) ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) RETURN(err_serious(-ENOMEM)); repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY); if (repbody == NULL || reqbody == NULL) - RETURN(err_serious(-EFAULT)); + RETURN(err_serious(-EFAULT)); - /* - * prepare @rdpg before calling lower layers and transfer itself. Here - * reqbody->size contains offset of where to start to read and - * reqbody->nlink contains number bytes to read. - */ + /* + * prepare @rdpg before calling lower layers and transfer itself. Here + * reqbody->size contains offset of where to start to read and + * reqbody->nlink contains number bytes to read. 
+ */ rdpg->rp_hash = reqbody->mbo_size; if (rdpg->rp_hash != reqbody->mbo_size) { CERROR("Invalid hash: %#llx != %#llx\n", @@ -2555,7 +2966,7 @@ free_rdpg: __free_page(rdpg->rp_pages[i]); OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) RETURN(0); return rc; @@ -2573,7 +2984,8 @@ static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op) if ((attr->la_valid & LA_UID) && (attr->la_uid != -1)) attr->la_uid = uc->uc_fsuid; /* for S_ISGID, inherit gid from his parent, such work will be - * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */ + * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. + */ if ((attr->la_valid & LA_GID) && (attr->la_gid != -1)) attr->la_gid = uc->uc_fsgid; } @@ -2600,28 +3012,18 @@ static void mdt_preset_secctx_size(struct mdt_thread_info *info) /* pre-set size in server part with max size */ req_capsule_set_size(pill, &RMF_FILE_SECCTX, RCL_SERVER, - OBD_MAX_DEFAULT_EA_SIZE); + req_capsule_ptlreq(pill) ? + OBD_MAX_DEFAULT_EA_SIZE : + MAX_MD_SIZE_OLD); else req_capsule_set_size(pill, &RMF_FILE_SECCTX, RCL_SERVER, 0); } } -static void mdt_preset_encctx_size(struct mdt_thread_info *info) -{ - struct req_capsule *pill = info->mti_pill; - - if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX, - RCL_SERVER)) - /* pre-set size in server part with max size */ - req_capsule_set_size(pill, &RMF_FILE_ENCCTX, - RCL_SERVER, - info->mti_mdt->mdt_max_mdsize); -} - static int mdt_reint_internal(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc, - __u32 op) + struct mdt_lock_handle *lhc, + __u32 op) { struct req_capsule *pill = info->mti_pill; struct mdt_body *repbody; @@ -2637,7 +3039,8 @@ static int mdt_reint_internal(struct mdt_thread_info *info, /* check if the file system is set to readonly. 
O_RDONLY open - * is still allowed even the file system is set to readonly mode */ + * is still allowed even the file system is set to readonly mode + */ if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op)) RETURN(err_serious(-EROFS)); @@ -2652,7 +3055,8 @@ static int mdt_reint_internal(struct mdt_thread_info *info, /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD * by default. If the target object has more ACL entries, then - * enlarge the buffer when necessary. */ + * enlarge the buffer when necessary. + */ if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER)) req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); @@ -2673,7 +3077,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info, repbody->mbo_aclsize = 0; } - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10); + CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10); /* for replay no cookkie / lmm need, because client have this already */ if (info->mti_spec.no_create) @@ -2733,8 +3137,8 @@ static long mdt_reint_opcode(struct ptlrpc_request *req, req_capsule_extend(&req->rq_pill, fmt[opc]); else { mdt = mdt_exp2dev(req->rq_export); - CERROR("%s: Unsupported opcode '%ld' from client '%s':" - " rc = %d\n", req->rq_export->exp_obd->obd_name, + CERROR("%s: Unsupported opcode '%ld' from client '%s': rc = %d\n", + req->rq_export->exp_obd->obd_name, opc, mdt->mdt_ldlm_client->cli_name, -EFAULT); opc = err_serious(-EFAULT); } @@ -2783,12 +3187,13 @@ static int mdt_reint(struct tgt_session_info *tsi) /* this should sync the whole device */ int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) { - struct dt_device *dt = mdt->mdt_bottom; - int rc; - ENTRY; + struct dt_device *dt = mdt->mdt_bottom; + int rc; + + ENTRY; - rc = dt->dd_ops->dt_sync(env, dt); - RETURN(rc); + rc = dt->dd_ops->dt_sync(env, dt); + RETURN(rc); } /* this should sync this object */ @@ -2830,7 +3235,7 @@ static int mdt_sync(struct tgt_session_info *tsi) ENTRY; - if 
(OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) RETURN(err_serious(-ENOMEM)); if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) { @@ -2882,8 +3287,7 @@ static int mdt_data_sync(struct tgt_session_info *tsi) repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY); - /* if no fid is specified then do nothing, - * device sync is done via MDS_SYNC */ + /* device sync is done via MDS_SYNC. NOOP if no fid is specified */ if (fid_is_zero(&tsi->tsi_fid)) RETURN(0); @@ -2925,16 +3329,25 @@ static int mdt_quotactl(struct tgt_session_info *tsi) struct obd_export *exp = tsi->tsi_exp; struct req_capsule *pill = tsi->tsi_pill; struct obd_quotactl *oqctl, *repoqc; - int id, rc; struct mdt_device *mdt = mdt_exp2dev(exp); struct lu_device *qmt = mdt->mdt_qmt_dev; struct lu_nodemap *nodemap; + char *buffer = NULL; + int id, rc; + ENTRY; oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL); if (!oqctl) RETURN(err_serious(-EPROTO)); + if (oqctl->qc_cmd == LUSTRE_Q_ITERQUOTA || + oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) + req_capsule_set_size(pill, &RMF_OBD_QUOTA_ITER, RCL_SERVER, + LQUOTA_ITER_BUFLEN); + else + req_capsule_set_size(pill, &RMF_OBD_QUOTA_ITER, RCL_SERVER, 0); + rc = req_capsule_server_pack(pill); if (rc) RETURN(err_serious(rc)); @@ -2951,21 +3364,26 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case LUSTRE_Q_SETQUOTAPOOL: case LUSTRE_Q_SETINFOPOOL: case LUSTRE_Q_SETDEFAULT_POOL: - if (!nodemap_can_setquota(nodemap)) + case LUSTRE_Q_DELETEQID: + case LUSTRE_Q_RESETQID: + if (!nodemap_can_setquota(nodemap, oqctl->qc_type, + oqctl->qc_id)) GOTO(out_nodemap, rc = -EPERM); - /* fallthrough */ + fallthrough; case Q_GETINFO: case Q_GETQUOTA: case LUSTRE_Q_GETDEFAULT: case LUSTRE_Q_GETQUOTAPOOL: case LUSTRE_Q_GETINFOPOOL: case LUSTRE_Q_GETDEFAULT_POOL: + case LUSTRE_Q_ITERQUOTA: if (qmt == NULL) GOTO(out_nodemap, rc = -EOPNOTSUPP); /* slave quotactl */ - /* fallthrough */ + fallthrough; case Q_GETOINFO: case Q_GETOQUOTA: + 
case LUSTRE_Q_ITEROQUOTA: break; default: rc = -EFAULT; @@ -2985,8 +3403,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi) NODEMAP_CLIENT_TO_FS, id); break; case PRJQUOTA: - /* todo: check/map project id */ - id = oqctl->qc_id; + id = nodemap_map_id(nodemap, NODEMAP_PROJID, + NODEMAP_CLIENT_TO_FS, id); break; default: GOTO(out_nodemap, rc = -EOPNOTSUPP); @@ -2995,6 +3413,13 @@ static int mdt_quotactl(struct tgt_session_info *tsi) if (repoqc == NULL) GOTO(out_nodemap, rc = err_serious(-EFAULT)); + if (oqctl->qc_cmd == LUSTRE_Q_ITERQUOTA || + oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) { + buffer = req_capsule_server_get(pill, &RMF_OBD_QUOTA_ITER); + if (buffer == NULL) + GOTO(out_nodemap, rc = err_serious(-EFAULT)); + } + if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) barrier_exit(tsi->tsi_tgt->lut_bottom); @@ -3020,15 +3445,22 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case LUSTRE_Q_GETINFOPOOL: case LUSTRE_Q_SETDEFAULT_POOL: case LUSTRE_Q_GETDEFAULT_POOL: + case LUSTRE_Q_DELETEQID: + case LUSTRE_Q_RESETQID: + case LUSTRE_Q_ITERQUOTA: /* forward quotactl request to QMT */ - rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl); + rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl, buffer, + buffer == NULL ? 0 : + LQUOTA_ITER_BUFLEN); break; case Q_GETOINFO: case Q_GETOQUOTA: + case LUSTRE_Q_ITEROQUOTA: /* slave quotactl */ rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, - oqctl); + oqctl, buffer, + buffer == NULL ? 0 : LQUOTA_ITER_BUFLEN); break; default: @@ -3039,7 +3471,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi) if (oqctl->qc_id != id) swap(oqctl->qc_id, id); - QCTL_COPY(repoqc, oqctl); + QCTL_COPY_NO_PNAME(repoqc, oqctl); EXIT; out_nodemap: @@ -3057,39 +3489,38 @@ out_nodemap: * context into our context list here. 
*/ static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt, - int idx) + int idx) { - struct md_device *next = mdt->mdt_child; - struct llog_ctxt *ctxt; - int rc; + struct md_device *next = mdt->mdt_child; + struct llog_ctxt *ctxt; + int rc; - if (!llog_ctxt_null(mdt2obd_dev(mdt), idx)) - return 0; + if (!llog_ctxt_null(mdt2obd_dev(mdt), idx)) + return 0; - rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt); - if (rc || ctxt == NULL) { + rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt); + if (rc || ctxt == NULL) return 0; - } - rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx); - if (rc) - CERROR("Can't set mdt ctxt %d\n", rc); + rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx); + if (rc) + CERROR("Can't set mdt ctxt %d\n", rc); - return rc; + return rc; } static int mdt_llog_ctxt_unclone(const struct lu_env *env, - struct mdt_device *mdt, int idx) + struct mdt_device *mdt, int idx) { - struct llog_ctxt *ctxt; + struct llog_ctxt *ctxt; - ctxt = llog_get_context(mdt2obd_dev(mdt), idx); - if (ctxt == NULL) - return 0; - /* Put once for the get we just did, and once for the clone */ - llog_ctxt_put(ctxt); - llog_ctxt_put(ctxt); - return 0; + ctxt = llog_get_context(mdt2obd_dev(mdt), idx); + if (ctxt == NULL) + return 0; + /* Put once for the get we just did, and once for the clone */ + llog_ctxt_put(ctxt); + llog_ctxt_put(ctxt); + return 0; } /* @@ -3110,6 +3541,7 @@ static int mdt_quota_dqacq(struct tgt_session_info *tsi) struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp); struct lu_device *qmt = mdt->mdt_qmt_dev; int rc; + ENTRY; if (qmt == NULL) @@ -3126,6 +3558,7 @@ struct mdt_object *mdt_object_new(const struct lu_env *env, struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; struct lu_object *o; struct mdt_object *m; + ENTRY; CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f)); @@ -3143,6 +3576,7 @@ struct mdt_object *mdt_object_find(const struct lu_env *env, { 
struct lu_object *o; struct mdt_object *m; + ENTRY; CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f)); @@ -3164,10 +3598,11 @@ struct mdt_object *mdt_object_find(const struct lu_env *env, * \param mdt the mdt device */ static void mdt_device_commit_async(const struct lu_env *env, - struct mdt_device *mdt) + struct mdt_device *mdt) { struct dt_device *dt = mdt->mdt_bottom; int rc; + ENTRY; rc = dt->dd_ops->dt_commit_async(env, dt); @@ -3190,7 +3625,7 @@ static void mdt_device_commit_async(const struct lu_env *env, */ static inline void mdt_set_lock_sync(struct ldlm_lock *lock) { - lock->l_ast_data = (void*)1; + lock->l_ast_data = (void *)1; } /** @@ -3205,7 +3640,7 @@ static inline void mdt_set_lock_sync(struct ldlm_lock *lock) */ static inline int mdt_is_lock_sync(struct ldlm_lock *lock) { - return lock->l_ast_data != NULL; + return lock->l_ast_data != NULL; } /** @@ -3229,8 +3664,8 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, struct ldlm_cb_set_arg *arg = data; bool commit_async = false; int rc; - ENTRY; + ENTRY; if (flag == LDLM_CB_CANCELING) RETURN(0); @@ -3246,33 +3681,17 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, * The 'data' parameter is l_ast_data in the first case and * callback arguments in the second one. Distinguish them by that. */ - if (!data || data == lock->l_ast_data || !arg->bl_desc) - goto skip_cos_checks; - - if (lock->l_req_mode & (LCK_PW | LCK_EX)) { - if (mdt_cos_is_enabled(mdt)) { - if (!arg->bl_desc->bl_same_client) - mdt_set_lock_sync(lock); - } else if (mdt_slc_is_enabled(mdt) && - arg->bl_desc->bl_cos_incompat) { - mdt_set_lock_sync(lock); - /* - * we may do extra commit here, but there is a small - * window to miss a commit: lock was unlocked (saved), - * then a conflict lock queued and we come here, but - * REP-ACK not received, so lock was not converted to - * COS mode yet. 
- * Fortunately this window is quite small, so the - * extra commit should be rare (not to say distributed - * operation is rare too). - */ + if (data && data != lock->l_ast_data && arg->bl_desc) { + if (lock->l_req_mode & (LCK_COS | LCK_TXN)) commit_async = true; - } - } else if (lock->l_req_mode == LCK_COS) { - commit_async = true; + else if ((lock->l_req_mode & (LCK_PW | LCK_EX)) && + ((mdt_cos_is_enabled(mdt) && + !arg->bl_desc->bl_same_client) || + (mdt_slc_is_enabled(mdt) && + arg->bl_desc->bl_txn_dependent))) + mdt_set_lock_sync(lock); } -skip_cos_checks: rc = ldlm_blocking_ast_nocheck(lock); if (commit_async) { @@ -3280,8 +3699,7 @@ skip_cos_checks: rc = lu_env_init(&env, LCT_LOCAL); if (unlikely(rc != 0)) - CWARN("%s: lu_env initialization failed, cannot " - "start asynchronous commit: rc = %d\n", + CWARN("%s: lu_env initialization failed, cannot start asynchronous commit: rc = %d\n", obd->obd_name, rc); else mdt_device_commit_async(&env, mdt); @@ -3306,6 +3724,7 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag) { int rc = 0; + ENTRY; switch (flag) { @@ -3326,10 +3745,11 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev->ld_site->ls_top_dev); - LDLM_DEBUG(lock, "Revoke remote lock\n"); + LDLM_DEBUG(lock, "Revoke remote lock"); /* discard slc lock here so that it can be cleaned anytime, - * especially for cleanup_resource() */ + * especially for cleanup_resource() + */ tgt_discard_slc_lock(&mdt->mdt_lut, lock); /* once we cache lock, l_ast_data is set to mdt_object */ @@ -3376,7 +3796,8 @@ int mdt_check_resent_lock(struct mdt_thread_info *info, if (lock == NULL) { /* Lock is pinned by ldlm_handle_enqueue0() as it is * a resend case, however, it could be already destroyed - * due to client eviction or a raced cancel RPC. */ + * due to client eviction or a raced cancel RPC. 
+ */ LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx", lhc->mlh_reg_lh.cookie); RETURN(-ESTALE); @@ -3384,8 +3805,8 @@ int mdt_check_resent_lock(struct mdt_thread_info *info, if (!fid_res_name_eq(mdt_object_fid(mo), &lock->l_resource->lr_name)) { - CWARN("%s: Although resent, but still not " - "get child lock:"DFID"\n", + CWARN("%s: Although resent, but still not get child lock:" + DFID"\n", info->mti_exp->exp_obd->obd_name, PFID(mdt_object_fid(mo))); LDLM_LOCK_PUT(lock); @@ -3402,20 +3823,18 @@ static void mdt_remote_object_lock_created_cb(struct ldlm_lock *lock) mdt_object_get(NULL, lock->l_ast_data); } -int mdt_remote_object_lock_try(struct mdt_thread_info *mti, - struct mdt_object *o, const struct lu_fid *fid, - struct lustre_handle *lh, enum ldlm_mode mode, - __u64 *ibits, __u64 trybits, bool cache) +static int mdt_remote_object_lock_try(struct mdt_thread_info *mti, + struct mdt_object *obj, + struct lustre_handle *lh, + enum ldlm_mode mode, + union ldlm_policy_data *policy, + struct ldlm_res_id *res_id, + bool cache) { struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo; - union ldlm_policy_data *policy = &mti->mti_policy; - struct ldlm_res_id *res_id = &mti->mti_res_id; - int rc = 0; - ENTRY; - - LASSERT(mdt_object_remote(o)); + int rc; - fid_build_reg_res_name(fid, res_id); + LASSERT(mdt_object_remote(obj)); memset(einfo, 0, sizeof(*einfo)); einfo->ei_type = LDLM_IBITS; @@ -3424,252 +3843,373 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti, einfo->ei_cb_cp = ldlm_completion_ast; einfo->ei_enq_slave = 0; einfo->ei_res_id = res_id; - + einfo->ei_req_slot = 1; if (cache) { /* * if we cache lock, couple lock with mdt_object, so that object * can be easily found in lock ASTs. 
*/ - einfo->ei_cbdata = o; + einfo->ei_cbdata = obj; einfo->ei_cb_created = mdt_remote_object_lock_created_cb; } - memset(policy, 0, sizeof(*policy)); - policy->l_inodebits.bits = *ibits; - policy->l_inodebits.try_bits = trybits; - - rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo, + rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), lh, einfo, policy); - - /* Return successfully acquired bits to a caller */ - if (rc == 0) { - struct ldlm_lock *lock = ldlm_handle2lock(lh); - - LASSERT(lock); - *ibits = lock->l_policy_data.l_inodebits.bits; - LDLM_LOCK_PUT(lock); + if (rc) { + lh->cookie = 0ull; + return rc; } - RETURN(rc); -} -int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o, - const struct lu_fid *fid, struct lustre_handle *lh, - enum ldlm_mode mode, __u64 ibits, bool cache) -{ - return mdt_remote_object_lock_try(mti, o, fid, lh, mode, &ibits, 0, - cache); + /* other components like LFSCK can use lockless access + * and populate cache, so we better invalidate it + */ + if (policy->l_inodebits.bits & + (MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR)) + mo_invalidate(mti->mti_env, mdt_object_child(obj)); + + return 0; } -int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 *ibits, - __u64 trybits, bool cos_incompat) +/* + * Helper function to take PDO and hash lock. + * + * if \a pdo_lock is false, don't take PDO lock, this is case in rename. 
+ */ +int mdt_object_pdo_lock(struct mdt_thread_info *info, struct mdt_object *obj, + struct mdt_lock_handle *lh, const struct lu_name *name, + enum ldlm_mode mode, bool pdo_lock) { struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; - __u64 dlmflags = 0, *cookie = NULL; + /* + * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it is never going to + * be sent to client and we do not want it slowed down due to possible + * cancels. + */ + __u64 dlmflags = LDLM_FL_ATOMIC_CB; + __u64 *cookie = NULL; int rc; - ENTRY; - - LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh)); - LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh)); - LASSERT(lh->mlh_reg_mode != LCK_MINMODE); - LASSERT(lh->mlh_type != MDT_NUL_LOCK); - - if (cos_incompat) { - LASSERT(lh->mlh_reg_mode == LCK_PW || - lh->mlh_reg_mode == LCK_EX); - dlmflags |= LDLM_FL_COS_INCOMPAT; - } else if (mdt_cos_is_enabled(info->mti_mdt)) { - dlmflags |= LDLM_FL_COS_ENABLED; - } - - /* Only enqueue LOOKUP lock for remote object */ - LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP)); - - /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */ - if (lh->mlh_type == MDT_REG_LOCK && lh->mlh_reg_mode == LCK_EX && - *ibits == MDS_INODELOCK_OPEN) - dlmflags |= LDLM_FL_CANCEL_ON_BLOCK; - - if (lh->mlh_type == MDT_PDO_LOCK) { - /* check for exists after object is locked */ - if (mdt_object_exists(o) == 0) { - /* Non-existent object shouldn't have PDO lock */ - RETURN(-ESTALE); - } else { - /* Non-dir object shouldn't have PDO lock */ - if (!S_ISDIR(lu_object_attr(&o->mot_obj))) - RETURN(-ENOTDIR); - } - } - fid_build_reg_res_name(mdt_object_fid(o), res_id); - dlmflags |= LDLM_FL_ATOMIC_CB; + LASSERT(obj); + /* check for exists after object is locked */ + if (!mdt_object_exists(obj)) + /* Non-existent object shouldn't have PDO lock */ + return -ESTALE; + + /* Non-dir object shouldn't have PDO lock */ + if 
(!S_ISDIR(lu_object_attr(&obj->mot_obj))) + return -ENOTDIR; + + policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; + policy->l_inodebits.try_bits = 0; + policy->l_inodebits.li_gid = 0; + policy->l_inodebits.li_initiator_id = mdt_node_id(info->mti_mdt); + fid_build_reg_res_name(mdt_object_fid(obj), res_id); if (info->mti_exp) cookie = &info->mti_exp->exp_handle.h_cookie; - /* - * Take PDO lock on whole directory and build correct @res_id for lock - * on part of directory. - */ - if (lh->mlh_pdo_hash != 0) { - LASSERT(lh->mlh_type == MDT_PDO_LOCK); - mdt_lock_pdo_mode(info, o, lh); - if (lh->mlh_pdo_mode != LCK_NL) { - /* - * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it - * is never going to be sent to client and we do not - * want it slowed down due to possible cancels. - */ - policy->l_inodebits.bits = - *ibits & MDS_INODELOCK_UPDATE; - policy->l_inodebits.try_bits = - trybits & MDS_INODELOCK_UPDATE; - /* at least one of them should be set */ - LASSERT(policy->l_inodebits.bits | - policy->l_inodebits.try_bits); - rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh, - lh->mlh_pdo_mode, policy, res_id, - dlmflags, cookie); - if (unlikely(rc != 0)) - GOTO(out_unlock, rc); - } + mdt_lock_pdo_init(lh, mode, name); + mdt_lock_pdo_mode(info, obj, lh); + if (lh->mlh_pdo_mode != LCK_NL) { + if (pdo_lock) { + if (mdt_object_remote(obj)) { + rc = mdt_remote_object_lock_try(info, obj, + &lh->mlh_pdo_lh, lh->mlh_pdo_mode, + policy, res_id, false); + lh->mlh_pdo_remote = 1; + } else { + rc = mdt_fid_lock(info->mti_env, ns, + &lh->mlh_pdo_lh, lh->mlh_pdo_mode, + policy, res_id, dlmflags, cookie); + } + if (rc) { + mdt_object_unlock(info, obj, lh, 1); + return rc; + } + } + res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash; + } + + if (mdt_object_remote(obj)) + rc = mdt_remote_object_lock_try(info, obj, &lh->mlh_rreg_lh, + lh->mlh_rreg_mode, policy, res_id, false); + else + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, + lh->mlh_reg_mode, policy, res_id, 
dlmflags, cookie); + if (rc) + mdt_object_unlock(info, obj, lh, 1); + else if (CFS_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK) && + lh->mlh_pdo_hash != 0 && + (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX)) + CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15); - /* - * Finish res_id initializing by name hash marking part of - * directory which is taking modification. - */ - res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash; - } + return rc; +} + +int mdt_object_lock_internal(struct mdt_thread_info *info, + struct mdt_object *obj, const struct lu_fid *fid, + struct mdt_lock_handle *lh, __u64 *ibits, + __u64 trybits, bool cache) +{ + union ldlm_policy_data *policy = &info->mti_policy; + struct ldlm_res_id *res_id = &info->mti_res_id; + struct lustre_handle *handle; + int rc; policy->l_inodebits.bits = *ibits; policy->l_inodebits.try_bits = trybits; policy->l_inodebits.li_gid = lh->mlh_gid; + policy->l_inodebits.li_initiator_id = mdt_node_id(info->mti_mdt); + fid_build_reg_res_name(fid, res_id); - /* - * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is - * going to be sent to client. If it is - mdt_intent_policy() path will - * fix it up and turn FL_LOCAL flag off. 
- */ - rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, - policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags, - cookie); -out_unlock: - if (rc != 0) - mdt_object_unlock(info, o, lh, 1); - else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) && - lh->mlh_pdo_hash != 0 && - (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX)) - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15); + if (obj && mdt_object_remote(obj)) { + handle = &lh->mlh_rreg_lh; + LASSERT(!lustre_handle_is_used(handle)); + LASSERT(lh->mlh_rreg_mode != LCK_MINMODE); + LASSERT(lh->mlh_type != MDT_NUL_LOCK); + rc = mdt_remote_object_lock_try(info, obj, handle, + lh->mlh_rreg_mode, policy, + res_id, cache); + } else { + struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; + /* + * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if + * it is going to be sent to client. If it is - + * mdt_intent_policy() path will fix it up and turn FL_LOCAL + * flag off. + */ + __u64 dlmflags = LDLM_FL_ATOMIC_CB | LDLM_FL_LOCAL_ONLY; + __u64 *cookie = NULL; + + handle = &lh->mlh_reg_lh; + LASSERT(!lustre_handle_is_used(handle)); + LASSERT(lh->mlh_reg_mode != LCK_MINMODE); + LASSERT(lh->mlh_type != MDT_NUL_LOCK); + + /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */ + if (lh->mlh_type == MDT_REG_LOCK && + lh->mlh_reg_mode == LCK_EX && *ibits == MDS_INODELOCK_OPEN) + dlmflags |= LDLM_FL_CANCEL_ON_BLOCK; + + + if (info->mti_exp) + cookie = &info->mti_exp->exp_handle.h_cookie; + + rc = mdt_fid_lock(info->mti_env, ns, handle, lh->mlh_reg_mode, + policy, res_id, dlmflags, cookie); + if (rc) + mdt_object_unlock(info, obj, lh, 1); + } - /* Return successfully acquired bits to a caller */ if (rc == 0) { - struct ldlm_lock *lock = ldlm_handle2lock(&lh->mlh_reg_lh); + struct ldlm_lock *lock; + /* Return successfully acquired bits to a caller */ + lock = ldlm_handle2lock(handle); LASSERT(lock); *ibits = lock->l_policy_data.l_inodebits.bits; LDLM_LOCK_PUT(lock); } - RETURN(rc); + + return 
rc; } -static int -mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 *ibits, - __u64 trybits, bool cos_incompat) +/* + * MDT object locking functions: + * mdt_object_lock(): lock object, this is used in most places, and normally + * lock ibits doesn't contain LOOKUP, unless the caller knows it's not + * remote object. + * mdt_object_check_lock(): lock object with LOOKUP and other ibits, it needs + * to check whether parent is on remote MDT, if so, take LOOKUP on parent + * MDT separately, and then lock other ibits on child object. + * mdt_parent_lock(): take parent UPDATE lock with specific mode, if parent is + * local, take PDO lock by name hash, otherwise take regular lock. + * mdt_object_stripes_lock(): lock object which should be local, and if it's a + * striped directory, lock its stripes, this is called in operations which + * modify both object and stripes. + * mdt_object_lock_try(): lock object with trybits, the trybits contains + * optional inode lock bits that can be granted. This is called by + * getattr/open to fetch more inode lock bits to client, and is also called + * by dir migration to lock link parent in non-block mode to avoid + * deadlock. + */ + +/** + * lock object + * + * this is used to lock object in most places, and normally lock ibits doesn't + * contain LOOKUP, unless the caller knows it's not remote object. + * + * \param info struct mdt_thread_info + * \param obj object + * \param lh lock handle + * \param ibits MDS inode lock bits + * \param mode lock mode + * + * \retval 0 on success, -ev on error. 
+ */ +int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *obj, + struct mdt_lock_handle *lh, __u64 ibits, + enum ldlm_mode mode) { - struct mdt_lock_handle *local_lh = NULL; int rc; - ENTRY; - - if (!mdt_object_remote(o)) { - rc = mdt_object_local_lock(info, o, lh, ibits, trybits, - cos_incompat); - RETURN(rc); - } - /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */ - *ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT | - MDS_INODELOCK_XATTR); + ENTRY; + mdt_lock_reg_init(lh, mode); + rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh, + &ibits, 0, false); + RETURN(rc); +} - /* Only enqueue LOOKUP lock for remote object */ - if (*ibits & MDS_INODELOCK_LOOKUP) { - __u64 local = MDS_INODELOCK_LOOKUP; +/** + * lock object with LOOKUP and other ibits + * + * it will check whether parent and child are on different MDTs, if so, take + * LOOKUP lock on parent MDT, and lock other ibits on child MDT, otherwise lock + * all ibits on child MDT. Note, parent and child shouldn't be both on remote + * MDTs, in which case specific lock function should be used, and it's in + * rename and migrate only. + * + * \param info struct mdt_thread_info + * \param parent parent object + * \param child child object + * \param lh lock handle + * \param ibits MDS inode lock bits + * \param mode lock mode + * + * \retval 0 on success, -ev on error. 
+ */ +int mdt_object_check_lock(struct mdt_thread_info *info, + struct mdt_object *parent, struct mdt_object *child, + struct mdt_lock_handle *lh, __u64 ibits, + enum ldlm_mode mode) +{ + int rc; - rc = mdt_object_local_lock(info, o, lh, &local, 0, - cos_incompat); - if (rc != ELDLM_OK) + ENTRY; + /* if LOOKUP ibit is not set, use mdt_object_lock() */ + LASSERT(ibits & MDS_INODELOCK_LOOKUP); + /* if only LOOKUP ibit is needed, use mdt_object_lookup_lock() */ + LASSERT(ibits != MDS_INODELOCK_LOOKUP); + LASSERT(parent); + /* @parent and @child shouldn't both be on remote MDTs */ + LASSERT(!(mdt_object_remote(parent) && mdt_object_remote(child))); + + mdt_lock_reg_init(lh, mode); + if (mdt_object_remote(parent) ^ mdt_object_remote(child)) { + __u64 lookup_ibits = MDS_INODELOCK_LOOKUP; + + rc = mdt_object_lock_internal(info, parent, + mdt_object_fid(child), lh, + &lookup_ibits, 0, false); + if (rc) RETURN(rc); - local_lh = lh; - } - - if ((*ibits | trybits) & MDS_INODELOCK_UPDATE) { - /* Sigh, PDO needs to enqueue 2 locks right now, but - * enqueue RPC can only request 1 lock, to avoid extra - * RPC, so it will instead enqueue EX lock for remote - * object anyway XXX*/ - if (lh->mlh_type == MDT_PDO_LOCK && - lh->mlh_pdo_hash != 0) { - CDEBUG(D_INFO, - "%s: "DFID" convert PDO lock to EX lock.\n", - mdt_obd_name(info->mti_mdt), - PFID(mdt_object_fid(o))); - lh->mlh_pdo_hash = 0; - lh->mlh_rreg_mode = LCK_EX; - lh->mlh_type = MDT_REG_LOCK; - } - - rc = mdt_remote_object_lock_try(info, o, mdt_object_fid(o), - &lh->mlh_rreg_lh, - lh->mlh_rreg_mode, - ibits, trybits, false); - if (rc != ELDLM_OK) { - if (local_lh != NULL) - mdt_object_unlock(info, o, local_lh, rc); - RETURN(rc); - } + ibits &= ~MDS_INODELOCK_LOOKUP; } - /* other components like LFSCK can use lockless access - * and populate cache, so we better invalidate it */ - mo_invalidate(info->mti_env, mdt_object_child(o)); + rc = mdt_object_lock_internal(info, child, mdt_object_fid(child), lh, + &ibits, 0, false); + if 
(rc && !(ibits & MDS_INODELOCK_LOOKUP)) + mdt_object_unlock(info, NULL, lh, 1); - RETURN(0); + RETURN(rc); } -int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 ibits) +/** + * take parent UPDATE lock + * + * if parent is local or mode is LCK_PW, take PDO lock, otherwise take regular + * lock. + * + * \param info struct mdt_thread_info + * \param obj parent object + * \param lh lock handle + * \param lname child name + * \param mode lock mode + * + * \retval 0 on success, -ev on error. + */ +int mdt_parent_lock(struct mdt_thread_info *info, struct mdt_object *obj, + struct mdt_lock_handle *lh, const struct lu_name *lname, + enum ldlm_mode mode) { - return mdt_object_lock_internal(info, o, lh, &ibits, 0, false); -} + int rc; -int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 ibits, - bool cos_incompat) -{ - LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX); - return mdt_object_lock_internal(info, o, lh, &ibits, 0, - cos_incompat); + ENTRY; + LASSERT(obj && lname); + LASSERT(mode == LCK_PW || mode == LCK_PR); + if (mdt_object_remote(obj) && mode == LCK_PR) { + __u64 ibits = MDS_INODELOCK_UPDATE; + + mdt_lock_reg_init(lh, mode); + rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), + lh, &ibits, 0, false); + } else { + rc = mdt_object_pdo_lock(info, obj, lh, lname, mode, true); + } + RETURN(rc); } -int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, +/** + * lock object with trybits + * + * the trybits contains optional inode lock bits that can be granted. This is + * called by getattr/open to fetch more inode lock bits to client, and is also + * called by dir migration to lock link parent in non-block mode to avoid + * deadlock. 
+ * + * \param info struct mdt_thread_info + * \param obj object + * \param lh lock handle + * \param ibits MDS inode lock bits + * \param trybits optional inode lock bits + * \param mode lock mode + * + * \retval 0 on success, -ev on error. + */ +int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *obj, struct mdt_lock_handle *lh, __u64 *ibits, - __u64 trybits, bool cos_incompat) + __u64 trybits, enum ldlm_mode mode) { bool trylock_only = *ibits == 0; int rc; + ENTRY; LASSERT(!(*ibits & trybits)); - rc = mdt_object_lock_internal(info, o, lh, ibits, trybits, - cos_incompat); + mdt_lock_reg_init(lh, mode); + rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh, ibits, + trybits, false); if (rc && trylock_only) { /* clear error for try ibits lock only */ LASSERT(*ibits == 0); rc = 0; } - return rc; + RETURN(rc); +} + +/* + * Helper function to take \a obj LOOKUP lock. + * + * Both \a pobj and \a obj may be located on remote MDTs. + */ +int mdt_object_lookup_lock(struct mdt_thread_info *info, + struct mdt_object *pobj, struct mdt_object *obj, + struct mdt_lock_handle *lh, enum ldlm_mode mode) +{ + __u64 ibits = MDS_INODELOCK_LOOKUP; + int rc; + + ENTRY; + /* if @parent is NULL, it's on local MDT, and @child is remote, + * this is case in getattr/unlink/open by name. + */ + LASSERT(ergo(!pobj, mdt_object_remote(obj))); + mdt_lock_reg_init(lh, mode); + rc = mdt_object_lock_internal(info, pobj, mdt_object_fid(obj), lh, + &ibits, 0, false); + RETURN(rc); } /** @@ -3685,55 +4225,65 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, * \param mode lock mode * \param decref force immediate lock releasing */ -void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, - enum ldlm_mode mode, int decref) +static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, + enum ldlm_mode mode, int decref) { + struct tgt_session_info *tsi = info->mti_env->le_ses ? 
+ tgt_ses_info(info->mti_env) : NULL; + ENTRY; if (lustre_handle_is_used(h)) { - if (decref || !info->mti_has_trans || - !(mode & (LCK_PW | LCK_EX))) { + bool has_trans = tsi && tsi->tsi_has_trans; + + if (decref || !has_trans || !(mode & (LCK_PW | LCK_EX))) { mdt_fid_unlock(h, mode); } else { struct mdt_device *mdt = info->mti_mdt; struct ldlm_lock *lock = ldlm_handle2lock(h); struct ptlrpc_request *req = mdt_info_req(info); - bool cos = mdt_cos_is_enabled(mdt); - bool convert_lock = !cos && mdt_slc_is_enabled(mdt); + bool no_ack = false; LASSERTF(lock != NULL, "no lock for cookie %#llx\n", h->cookie); /* there is no request if mdt_object_unlock() is called - * from mdt_export_cleanup()->mdt_add_dirty_flag() */ + * from mdt_export_cleanup()->mdt_add_dirty_flag() + */ if (likely(req != NULL)) { - LDLM_DEBUG(lock, "save lock request %p reply " - "state %p transno %lld\n", req, + LDLM_DEBUG(lock, "save lock request %p reply state %p transno %lld", + req, req->rq_reply_state, req->rq_transno); - if (cos) { - ldlm_lock_mode_downgrade(lock, LCK_COS); + if (mdt_cos_is_enabled(mdt)) { mode = LCK_COS; + no_ack = true; + ldlm_lock_mode_downgrade(lock, mode); + } else if (mdt_slc_is_enabled(mdt)) { + no_ack = true; + if (mode != LCK_TXN) { + mode = LCK_TXN; + ldlm_lock_mode_downgrade(lock, + mode); + } } if (req->rq_export->exp_disconnected) mdt_fid_unlock(h, mode); else - ptlrpc_save_lock(req, h, mode, cos, - convert_lock); + ptlrpc_save_lock(req, h, no_ack); } else { mdt_fid_unlock(h, mode); } - if (mdt_is_lock_sync(lock)) { - CDEBUG(D_HA, "found sync-lock," - " async commit started\n"); - mdt_device_commit_async(info->mti_env, - mdt); - } - LDLM_LOCK_PUT(lock); - } - h->cookie = 0ull; - } - EXIT; + if (mdt_is_lock_sync(lock)) { + CDEBUG(D_HA, "sync_lock, do async commit\n"); + mdt_device_commit_async(info->mti_env, mdt); + } + LDLM_LOCK_PUT(lock); + } + h->cookie = 0ull; + } + + EXIT; } /** @@ -3762,8 +4312,8 @@ static void mdt_save_remote_lock(struct mdt_thread_info 
*info, (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE))) mo_invalidate(info->mti_env, mdt_object_child(o)); - if (decref || !info->mti_has_trans || !req || - !(mode & (LCK_PW | LCK_EX))) { + if (decref || !req || !(mode & (LCK_PW | LCK_EX)) || + !tgt_ses_info(info->mti_env)->tsi_has_trans) { ldlm_lock_decref_and_cancel(h, mode); LDLM_LOCK_PUT(lock); } else { @@ -3796,7 +4346,11 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, { ENTRY; - mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); + if (lh->mlh_pdo_remote) + mdt_save_remote_lock(info, o, &lh->mlh_pdo_lh, + lh->mlh_pdo_mode, decref); + else + mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref); mdt_save_remote_lock(info, o, &lh->mlh_rreg_lh, lh->mlh_rreg_mode, decref); @@ -3805,32 +4359,32 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, } struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info, - const struct lu_fid *f, - struct mdt_lock_handle *lh, - __u64 ibits) + const struct lu_fid *f, + struct mdt_lock_handle *lh, + __u64 ibits, enum ldlm_mode mode) { - struct mdt_object *o; + struct mdt_object *o; - o = mdt_object_find(info->mti_env, info->mti_mdt, f); - if (!IS_ERR(o)) { - int rc; + o = mdt_object_find(info->mti_env, info->mti_mdt, f); + if (!IS_ERR(o)) { + int rc; - rc = mdt_object_lock(info, o, lh, ibits); - if (rc != 0) { - mdt_object_put(info->mti_env, o); - o = ERR_PTR(rc); - } - } - return o; + rc = mdt_object_lock(info, o, lh, ibits, mode); + if (rc != 0) { + mdt_object_put(info->mti_env, o); + o = ERR_PTR(rc); + } + } + return o; } -void mdt_object_unlock_put(struct mdt_thread_info * info, - struct mdt_object * o, - struct mdt_lock_handle *lh, - int decref) +void mdt_object_unlock_put(struct mdt_thread_info *info, + struct mdt_object *o, + struct mdt_lock_handle *lh, + int decref) { - mdt_object_unlock(info, o, lh, decref); - 
mdt_object_put(info->mti_env, o); + mdt_object_unlock(info, o, lh, decref); + mdt_object_put(info->mti_env, o); } /* @@ -3848,41 +4402,42 @@ void mdt_object_unlock_put(struct mdt_thread_info * info, static int mdt_body_unpack(struct mdt_thread_info *info, enum tgt_handler_flags flags) { - const struct mdt_body *body; - struct mdt_object *obj; - const struct lu_env *env; - struct req_capsule *pill; - int rc; - ENTRY; + const struct mdt_body *body; + struct mdt_object *obj; + const struct lu_env *env; + struct req_capsule *pill; + int rc; - env = info->mti_env; - pill = info->mti_pill; + ENTRY; - body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY); - if (body == NULL) - RETURN(-EFAULT); + env = info->mti_env; + pill = info->mti_pill; + + body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY); + if (body == NULL) + RETURN(-EFAULT); if (!(body->mbo_valid & OBD_MD_FLID)) RETURN(0); if (!fid_is_sane(&body->mbo_fid1)) { CERROR("Invalid fid: "DFID"\n", PFID(&body->mbo_fid1)); - RETURN(-EINVAL); - } + RETURN(-EINVAL); + } obj = mdt_object_find(env, info->mti_mdt, &body->mbo_fid1); if (!IS_ERR(obj)) { if ((flags & HAS_BODY) && !mdt_object_exists(obj)) { mdt_object_put(env, obj); rc = -ENOENT; - } else { - info->mti_object = obj; - rc = 0; - } - } else - rc = PTR_ERR(obj); + } else { + info->mti_object = obj; + rc = 0; + } + } else + rc = PTR_ERR(obj); - RETURN(rc); + RETURN(rc); } static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, @@ -3902,14 +4457,17 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, /* Pack reply. */ if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, - DEF_REP_MD_SIZE); + req_capsule_ptlreq(pill) ? + DEF_REP_MD_SIZE : MAX_MD_SIZE_OLD); + if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0); /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD * by default. 
If the target object has more ACL entries, then - * enlarge the buffer when necessary. */ + * enlarge the buffer when necessary. + */ if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER)) req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); @@ -3925,21 +4483,28 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, RETURN(rc); } -void mdt_lock_handle_init(struct mdt_lock_handle *lh) +void mdt_thread_info_reset(struct mdt_thread_info *info) { - lh->mlh_type = MDT_NUL_LOCK; - lh->mlh_reg_lh.cookie = 0ull; - lh->mlh_reg_mode = LCK_MINMODE; - lh->mlh_pdo_lh.cookie = 0ull; - lh->mlh_pdo_mode = LCK_MINMODE; - lh->mlh_rreg_lh.cookie = 0ull; - lh->mlh_rreg_mode = LCK_MINMODE; -} + memset(&info->mti_attr, 0, sizeof(info->mti_attr)); + info->mti_body = NULL; + info->mti_dlm_req = NULL; + info->mti_cross_ref = 0; + info->mti_opdata = 0; + info->mti_big_lmm_used = 0; + info->mti_big_acl_used = 0; + info->mti_som_strict = 0; -void mdt_lock_handle_fini(struct mdt_lock_handle *lh) -{ - LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh)); - LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh)); + info->mti_spec.no_create = 0; + info->mti_spec.sp_rm_entry = 0; + info->mti_spec.sp_permitted = 0; + + info->mti_spec.u.sp_ea.eadata = NULL; + info->mti_spec.u.sp_ea.eadatalen = 0; + + if (info->mti_batch_env && info->mti_object != NULL) { + mdt_object_put(info->mti_env, info->mti_object); + info->mti_object = NULL; + } } /* @@ -3950,41 +4515,21 @@ void mdt_lock_handle_fini(struct mdt_lock_handle *lh) void mdt_thread_info_init(struct ptlrpc_request *req, struct mdt_thread_info *info) { - int i; + info->mti_pill = &req->rq_pill; - info->mti_pill = &req->rq_pill; - - /* lock handle */ - for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++) - mdt_lock_handle_init(&info->mti_lh[i]); - - /* mdt device: it can be NULL while CONNECT */ - if (req->rq_export) { - info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev); - info->mti_exp = req->rq_export; - } else - 
info->mti_mdt = NULL; + /* mdt device: it can be NULL while CONNECT */ + if (req->rq_export) { + info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev); + info->mti_exp = req->rq_export; + } else + info->mti_mdt = NULL; info->mti_env = req->rq_svc_thread->t_env; info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg); + info->mti_big_buf = LU_BUF_NULL; + info->mti_batch_env = 0; + info->mti_object = NULL; - memset(&info->mti_attr, 0, sizeof(info->mti_attr)); - info->mti_big_buf = LU_BUF_NULL; - info->mti_body = NULL; - info->mti_object = NULL; - info->mti_dlm_req = NULL; - info->mti_has_trans = 0; - info->mti_cross_ref = 0; - info->mti_opdata = 0; - info->mti_big_lmm_used = 0; - info->mti_big_acl_used = 0; - info->mti_som_valid = 0; - - info->mti_spec.no_create = 0; - info->mti_spec.sp_rm_entry = 0; - info->mti_spec.sp_permitted = 0; - - info->mti_spec.u.sp_ea.eadata = NULL; - info->mti_spec.u.sp_ea.eadatalen = 0; + mdt_thread_info_reset(info); } void mdt_thread_info_fini(struct mdt_thread_info *info) @@ -3997,7 +4542,7 @@ void mdt_thread_info_fini(struct mdt_thread_info *info) } for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++) - mdt_lock_handle_fini(&info->mti_lh[i]); + mdt_lock_handle_assert(&info->mti_lh[i]); info->mti_env = NULL; info->mti_pill = NULL; info->mti_exp = NULL; @@ -4027,7 +4572,7 @@ struct mdt_thread_info *tsi2mdt_info(struct tgt_session_info *tsi) static int mdt_tgt_connect(struct tgt_session_info *tsi) { - if (OBD_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) && + if (CFS_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) && cfs_fail_val == tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) schedule_timeout_uninterruptible(cfs_time_seconds(3)); @@ -4055,112 +4600,117 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, struct mdt_lock_handle *lh, __u64 flags, int result) { - struct ptlrpc_request *req = mdt_info_req(info); - struct ldlm_lock *lock = *lockp; + struct ptlrpc_request *req = mdt_info_req(info); + struct ldlm_lock *lock = 
*lockp; struct ldlm_lock *new_lock; /* If possible resent found a lock, @lh is set to its handle */ new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0); - if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) { - lh->mlh_reg_lh.cookie = 0; - RETURN(0); - } - - if (new_lock == NULL && (flags & LDLM_FL_RESENT)) { - /* Lock is pinned by ldlm_handle_enqueue0() as it is - * a resend case, however, it could be already destroyed - * due to client eviction or a raced cancel RPC. */ - LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n", - lh->mlh_reg_lh.cookie); + if (new_lock == NULL) { + if (flags & LDLM_FL_INTENT_ONLY) { + result = 0; + } else if (flags & LDLM_FL_RESENT) { + /* Lock is pinned by ldlm_handle_enqueue0() as it is a + * resend case, however, it could be already destroyed + * due to client eviction or a raced cancel RPC. + */ + LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n", + lh->mlh_reg_lh.cookie); + result = -ESTALE; + } else { + CERROR("%s: Invalid lockh=%#llx flags=%#llx fid1="DFID" fid2="DFID": rc = %d\n", + mdt_obd_name(info->mti_mdt), + lh->mlh_reg_lh.cookie, flags, + PFID(&info->mti_tmp_fid1), + PFID(&info->mti_tmp_fid2), result); + result = -ESTALE; + } lh->mlh_reg_lh.cookie = 0; - RETURN(-ESTALE); + RETURN(result); + } + + /* + * If we've already given this lock to a client once, then we should + * have no readers or writers. Otherwise, we should have one reader + * _or_ writer ref (which will be zeroed below) before returning the + * lock to a client. + */ + if (new_lock->l_export == req->rq_export) { + LASSERT(new_lock->l_readers + new_lock->l_writers == 0); + } else { + LASSERT(new_lock->l_export == NULL); + LASSERT(new_lock->l_readers + new_lock->l_writers == 1); } - LASSERTF(new_lock != NULL, - "lockh %#llx flags %#llx : rc = %d\n", - lh->mlh_reg_lh.cookie, flags, result); - - /* - * If we've already given this lock to a client once, then we should - * have no readers or writers. 
Otherwise, we should have one reader - * _or_ writer ref (which will be zeroed below) before returning the - * lock to a client. - */ - if (new_lock->l_export == req->rq_export) { - LASSERT(new_lock->l_readers + new_lock->l_writers == 0); - } else { - LASSERT(new_lock->l_export == NULL); - LASSERT(new_lock->l_readers + new_lock->l_writers == 1); - } - - *lockp = new_lock; - - if (new_lock->l_export == req->rq_export) { - /* - * Already gave this to the client, which means that we - * reconstructed a reply. - */ - LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & - MSG_RESENT); + *lockp = new_lock; + + if (new_lock->l_export == req->rq_export) { + /* + * Already gave this to the client, which means that we + * reconstructed a reply. + */ + LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & + MSG_RESENT); LDLM_LOCK_RELEASE(new_lock); - lh->mlh_reg_lh.cookie = 0; - RETURN(ELDLM_LOCK_REPLACED); - } - - /* - * Fixup the lock to be given to the client. - */ - lock_res_and_lock(new_lock); - /* Zero new_lock->l_readers and new_lock->l_writers without triggering - * possible blocking AST. */ - while (new_lock->l_readers > 0) { - lu_ref_del(&new_lock->l_reference, "reader", new_lock); - lu_ref_del(&new_lock->l_reference, "user", new_lock); - new_lock->l_readers--; - } - while (new_lock->l_writers > 0) { - lu_ref_del(&new_lock->l_reference, "writer", new_lock); - lu_ref_del(&new_lock->l_reference, "user", new_lock); - new_lock->l_writers--; - } - - new_lock->l_export = class_export_lock_get(req->rq_export, new_lock); - new_lock->l_blocking_ast = lock->l_blocking_ast; - new_lock->l_completion_ast = lock->l_completion_ast; + lh->mlh_reg_lh.cookie = 0; + RETURN(ELDLM_LOCK_REPLACED); + } + + /* + * Fixup the lock to be given to the client. + */ + lock_res_and_lock(new_lock); + /* Zero new_lock->l_readers and new_lock->l_writers without triggering + * possible blocking AST. 
+ */ + while (new_lock->l_readers > 0) { + lu_ref_del(&new_lock->l_reference, "reader", new_lock); + lu_ref_del(&new_lock->l_reference, "user", new_lock); + new_lock->l_readers--; + } + while (new_lock->l_writers > 0) { + lu_ref_del(&new_lock->l_reference, "writer", new_lock); + lu_ref_del(&new_lock->l_reference, "user", new_lock); + new_lock->l_writers--; + } + + new_lock->l_export = class_export_lock_get(req->rq_export, new_lock); + new_lock->l_blocking_ast = lock->l_blocking_ast; + new_lock->l_completion_ast = lock->l_completion_ast; if (ldlm_has_dom(new_lock)) new_lock->l_glimpse_ast = ldlm_server_glimpse_ast; - new_lock->l_remote_handle = lock->l_remote_handle; - new_lock->l_flags &= ~LDLM_FL_LOCAL; + new_lock->l_remote_handle = lock->l_remote_handle; + new_lock->l_flags &= ~LDLM_FL_LOCAL; - unlock_res_and_lock(new_lock); + unlock_res_and_lock(new_lock); - cfs_hash_add(new_lock->l_export->exp_lock_hash, - &new_lock->l_remote_handle, - &new_lock->l_exp_hash); + cfs_hash_add(new_lock->l_export->exp_lock_hash, + &new_lock->l_remote_handle, + &new_lock->l_exp_hash); - LDLM_LOCK_RELEASE(new_lock); - lh->mlh_reg_lh.cookie = 0; + LDLM_LOCK_RELEASE(new_lock); + lh->mlh_reg_lh.cookie = 0; - RETURN(ELDLM_LOCK_REPLACED); + RETURN(ELDLM_LOCK_REPLACED); } void mdt_intent_fixup_resent(struct mdt_thread_info *info, struct ldlm_lock *new_lock, struct mdt_lock_handle *lh, __u64 flags) { - struct ptlrpc_request *req = mdt_info_req(info); - struct ldlm_request *dlmreq; + struct ptlrpc_request *req = mdt_info_req(info); + struct ldlm_request *dlmreq; - if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) - return; + if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) + return; - dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ); + dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ); - /* Check if this is a resend case (MSG_RESENT is set on RPC) and a - * lock was found by ldlm_handle_enqueue(); if so @lh must be - * initialized. 
*/ + /* if this is a resend case (MSG_RESENT is set on RPC) and a lock was + * found by ldlm_handle_enqueue(); if so @lh must be initialized. + */ if (flags & LDLM_FL_RESENT) { lh->mlh_reg_lh.cookie = new_lock->l_handle.h_cookie; lh->mlh_reg_mode = new_lock->l_granted_mode; @@ -4178,12 +4728,12 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info, if (req_can_reconstruct(req, NULL) != 0) return; - /* - * This remote handle isn't enqueued, so we never received or processed - * this request. Clear MSG_RESENT, because it can be handled like any - * normal request now. - */ - lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); + /* + * This remote handle isn't enqueued, so we never received or processed + * this request. Clear MSG_RESENT, because it can be handled like any + * normal request now. + */ + lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle %#llx", dlmreq->lock_handle[0].cookie); @@ -4197,6 +4747,7 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_reply *ldlm_rep = NULL; int rc; + ENTRY; /* @@ -4206,9 +4757,8 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, */ mdt_intent_fixup_resent(info, *lockp, lhc, flags); if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) { - mdt_lock_reg_init(lhc, (*lockp)->l_req_mode); rc = mdt_object_lock(info, info->mti_object, lhc, - MDS_INODELOCK_XATTR); + MDS_INODELOCK_XATTR, (*lockp)->l_req_mode); if (rc) return rc; } @@ -4219,7 +4769,7 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); if (ldlm_rep == NULL || - OBD_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) { + CFS_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) { mdt_object_unlock(info, info->mti_object, lhc, 1); if (is_serious(rc)) RETURN(rc); @@ -4229,8 +4779,7 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, ldlm_rep->lock_policy_res2 = 
clear_serious(rc); - /* This is left for interop instead of adding a new interop flag. - * LU-7433 */ + /* This is for interop instead of adding a new interop flag. LU-7433 */ #if LUSTRE_VERSION_CODE > OBD_OCD_VERSION(3, 0, 0, 0) if (ldlm_rep->lock_policy_res2) { mdt_object_unlock(info, info->mti_object, lhc, 1); @@ -4243,23 +4792,24 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, } static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, __u64 flags) { - struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; - __u64 child_bits; - struct ldlm_reply *ldlm_rep; - struct mdt_body *reqbody; - struct mdt_body *repbody; - int rc, rc2; - ENTRY; + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; + __u64 child_bits; + struct ldlm_reply *ldlm_rep; + struct mdt_body *reqbody; + struct mdt_body *repbody; + int rc, rc2; + + ENTRY; - reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); - LASSERT(reqbody); + reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + LASSERT(reqbody); - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - LASSERT(repbody); + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + LASSERT(repbody); info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF); repbody->mbo_eadatasize = 0; @@ -4283,8 +4833,8 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, if (rc) GOTO(out_shrink, rc); - ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); - mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); + ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); /* Get lock from request for possible resent case. 
*/ mdt_intent_fixup_resent(info, *lockp, lhc, flags); @@ -4292,24 +4842,25 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep); ldlm_rep->lock_policy_res2 = clear_serious(rc); - if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG)) - ldlm_rep->lock_policy_res2 = 0; - if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) || - ldlm_rep->lock_policy_res2) { - lhc->mlh_reg_lh.cookie = 0ull; - GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED); - } + if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG) && + ldlm_rep->lock_policy_res2 != -ENOKEY) + ldlm_rep->lock_policy_res2 = 0; + if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) || + ldlm_rep->lock_policy_res2) { + lhc->mlh_reg_lh.cookie = 0ull; + GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED); + } rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc); - EXIT; + EXIT; out_ucred: - mdt_exit_ucred(info); + mdt_exit_ucred(info); out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; - return rc; + mdt_client_compatibility(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; + return rc; } static int mdt_intent_layout(enum ldlm_intent_flags it_opc, @@ -4335,14 +4886,16 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, if (intent == NULL) RETURN(-EPROTO); - CDEBUG(D_INFO, DFID "got layout change request from client: " - "opc:%u flags:%#x extent "DEXT"\n", - PFID(fid), intent->li_opc, intent->li_flags, - PEXT(&intent->li_extent)); + CDEBUG(D_INFO, DFID "got layout change request from client: opc:%u flags:%#x extent " + DEXT"\n", + PFID(fid), intent->lai_opc, intent->lai_flags, + PEXT(&intent->lai_extent)); - switch (intent->li_opc) { + switch (intent->lai_opc) { case LAYOUT_INTENT_TRUNC: case LAYOUT_INTENT_WRITE: + case LAYOUT_INTENT_PCCRO_SET: + case LAYOUT_INTENT_PCCRO_CLEAR: layout.mlc_opc = MD_LAYOUT_WRITE; layout.mlc_intent = intent; break; @@ -4353,11 +4906,11 @@ static int 
mdt_intent_layout(enum ldlm_intent_flags it_opc, case LAYOUT_INTENT_RELEASE: case LAYOUT_INTENT_RESTORE: CERROR("%s: Unsupported layout intent opc %d\n", - mdt_obd_name(info->mti_mdt), intent->li_opc); + mdt_obd_name(info->mti_mdt), intent->lai_opc); RETURN(-ENOTSUPP); default: CERROR("%s: Unknown layout intent opc %d\n", - mdt_obd_name(info->mti_mdt), intent->li_opc); + mdt_obd_name(info->mti_mdt), intent->lai_opc); RETURN(-EINVAL); } @@ -4457,27 +5010,27 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc, struct ldlm_lock **lockp, __u64 flags) { - struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; - struct ldlm_reply *rep = NULL; - long opc; - int rc; + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; + struct ldlm_reply *rep = NULL; + long opc; + int rc; struct ptlrpc_request *req = mdt_info_req(info); - static const struct req_format *intent_fmts[REINT_MAX] = { - [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE, - [REINT_OPEN] = &RQF_LDLM_INTENT_OPEN - }; + static const struct req_format *intent_fmts[REINT_MAX] = { + [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE, + [REINT_OPEN] = &RQF_LDLM_INTENT_OPEN + }; - ENTRY; + ENTRY; opc = mdt_reint_opcode(mdt_info_req(info), intent_fmts); - if (opc < 0) - RETURN(opc); + if (opc < 0) + RETURN(opc); /* Get lock from request for possible resent case. 
*/ mdt_intent_fixup_resent(info, *lockp, lhc, flags); - rc = mdt_reint_internal(info, lhc, opc); + rc = mdt_reint_internal(info, lhc, opc); if (rc < 0 && lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) DEBUG_REQ(D_ERROR, req, "Replay open failed with %d", rc); @@ -4492,12 +5045,13 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc, RETURN(err_serious(-EFAULT)); } - /* MDC expects this in any case */ - if (rc != 0) - mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD); + /* MDC expects this in any case */ + if (rc != 0) + mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD); /* the open lock or the lock for cross-ref object should be - * returned to the client */ + * returned to the client + */ if (lustre_handle_is_used(&lhc->mlh_reg_lh) && (rc == 0 || rc == -MDT_EREMOTE_OPEN)) { rep->lock_policy_res2 = 0; @@ -4507,22 +5061,22 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc, rep->lock_policy_res2 = clear_serious(rc); - if (rep->lock_policy_res2 == -ENOENT && + if (rep->lock_policy_res2 == -ENOENT && mdt_get_disposition(rep, DISP_LOOKUP_NEG) && !mdt_get_disposition(rep, DISP_OPEN_CREATE)) rep->lock_policy_res2 = 0; lhc->mlh_reg_lh.cookie = 0ull; - if (rc == -ENOTCONN || rc == -ENODEV || - rc == -EOVERFLOW) { /**< if VBR failure then return error */ - /* - * If it is the disconnect error (ENODEV & ENOCONN), the error - * will be returned by rq_status, and client at ptlrpc layer - * will detect this, then disconnect, reconnect the import - * immediately, instead of impacting the following the rpc. - */ - RETURN(rc); - } + if (rc == -ENOTCONN || rc == -ENODEV || + rc == -EOVERFLOW) { /**< if VBR failure then return error */ + /* + * If it is the disconnect error (ENODEV & ENOCONN), the error + * will be returned by rq_status, and client at ptlrpc layer + * will detect this, then disconnect, reconnect the import + * immediately, instead of impacting the following the rpc. 
+ */ + RETURN(rc); + } /* * For other cases, the error will be returned by intent, and client * will retrieve the result from intent. @@ -4546,6 +5100,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, struct ldlm_reply *rep; bool check_mdt_object = false; int rc; + ENTRY; switch (it_opc) { @@ -4562,7 +5117,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, break; case IT_GETATTR: check_mdt_object = true; - /* fallthrough */ + fallthrough; case IT_LOOKUP: it_format = &RQF_LDLM_INTENT_GETATTR; it_handler = &mdt_intent_getattr; @@ -4609,7 +5164,8 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, RETURN(-EPROTO); } - req_capsule_extend(pill, it_format); + if (!info->mti_batch_env) + req_capsule_extend(pill, it_format); rc = mdt_unpack_req_pack_rep(info, it_handler_flags); if (rc < 0) @@ -4621,13 +5177,13 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, if (it_handler_flags & IS_MUTABLE && mdt_rdonly(req->rq_export)) RETURN(-EROFS); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); + CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); /* execute policy */ rc = (*it_handler)(it_opc, info, lockp, flags); /* Check whether the reply has been packed successfully. 
*/ - if (req->rq_repmsg != NULL) { + if (info->mti_batch_env || req->rq_repmsg != NULL) { rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); rep->lock_policy_res2 = ptlrpc_status_hton(rep->lock_policy_res2); @@ -4669,14 +5225,30 @@ static int mdt_intent_policy(const struct lu_env *env, tsi = tgt_ses_info(env); - info = tsi2mdt_info(tsi); + info = mdt_th_info(env); LASSERT(info != NULL); - pill = info->mti_pill; + + /* Check whether it is a sub request processing in a batch request */ + if (info->mti_batch_env) { + pill = info->mti_pill; + LASSERT(pill == &info->mti_sub_pill); + } else { + info = tsi2mdt_info(tsi); + pill = info->mti_pill; + } + LASSERT(pill->rc_req == req); ldesc = &info->mti_dlm_req->lock_desc; - if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) { - req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC); + if (info->mti_batch_env || + req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) { + /* + * For batch processing environment, the request format has + * already been set. + */ + if (!info->mti_batch_env) + req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC); + it = req_capsule_client_get(pill, &RMF_LDLM_INTENT); if (it != NULL) { mdt_ptlrpc_stats_update(req, it->opc); @@ -4688,7 +5260,8 @@ static int mdt_intent_policy(const struct lu_env *env, * later in ldlm. Let's check it now to see if we have * ibits corrupted somewhere in mdt_intent_opc(). * The case for client miss to set ibits has been - * processed by others. */ + * processed by others. 
+ */ LASSERT(ergo(ldesc->l_resource.lr_type == LDLM_IBITS, ldesc->l_policy_data.l_inodebits.bits != 0)); } else { @@ -4718,7 +5291,9 @@ static int mdt_intent_policy(const struct lu_env *env, if (rc) rc = err_serious(rc); } - mdt_thread_info_fini(info); + + if (!info->mti_batch_env) + mdt_thread_info_fini(info); RETURN(rc); } @@ -4770,6 +5345,7 @@ static int mdt_register_lwp_callback(void *data) struct mdt_device *mdt = data; struct lu_server_fld *fld = mdt_seq_site(mdt)->ss_server_fld; int rc; + ENTRY; LASSERT(mdt_seq_site(mdt)->ss_node_id != 0); @@ -4781,7 +5357,8 @@ static int mdt_register_lwp_callback(void *data) } /* Allocate new sequence now to avoid creating local transaction - * in the normal transaction process */ + * in the normal transaction process + */ rc = seq_server_check_and_alloc_super(&env, mdt_seq_site(mdt)->ss_server_seq); if (rc < 0) @@ -4845,10 +5422,12 @@ static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt) { struct seq_server_site *ss = mdt_seq_site(mdt); char *prefix; + ENTRY; /* check if this is adding the first MDC and controller is not yet - * initialized. */ + * initialized. 
+ */ OBD_ALLOC_PTR(ss->ss_client_seq); if (ss->ss_client_seq == NULL) RETURN(-ENOMEM); @@ -4874,6 +5453,7 @@ static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt) { struct seq_server_site *ss; int rc; + ENTRY; ss = mdt_seq_site(mdt); @@ -4921,9 +5501,10 @@ out_seq_fini: * FLD wrappers */ static int mdt_fld_fini(const struct lu_env *env, - struct mdt_device *m) + struct mdt_device *m) { struct seq_server_site *ss = mdt_seq_site(m); + ENTRY; if (ss && ss->ss_server_fld) { @@ -4936,11 +5517,12 @@ static int mdt_fld_fini(const struct lu_env *env, } static int mdt_fld_init(const struct lu_env *env, - const char *uuid, - struct mdt_device *m) + const char *uuid, + struct mdt_device *m) { struct seq_server_site *ss; int rc; + ENTRY; ss = mdt_seq_site(m); @@ -4966,6 +5548,7 @@ static void mdt_stack_pre_fini(const struct lu_env *env, struct lustre_cfg_bufs *bufs; struct lustre_cfg *lcfg; struct mdt_thread_info *info; + ENTRY; LASSERT(top); @@ -4982,7 +5565,8 @@ static void mdt_stack_pre_fini(const struct lu_env *env, /* XXX: this is needed because all layers are referenced by * objects (some of them are pinned by osd, for example * * the proper solution should be a model where object used - * by osd only doesn't have mdt/mdd slices -bzzz */ + * by osd only doesn't have mdt/mdd slices -bzzz + */ lustre_cfg_bufs_reset(bufs, mdt_obd_name(m)); lustre_cfg_bufs_set_string(bufs, 1, NULL); OBD_ALLOC(lcfg, lustre_cfg_len(bufs->lcfg_bufcount, bufs->lcfg_buflen)); @@ -5003,6 +5587,7 @@ static void mdt_stack_fini(const struct lu_env *env, struct lustre_cfg *lcfg; struct mdt_thread_info *info; char flags[3] = ""; + ENTRY; info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); @@ -5048,6 +5633,7 @@ static int mdt_connect_to_next(const struct lu_env *env, struct mdt_device *m, struct obd_connect_data *data = NULL; struct obd_device *obd; int rc; + ENTRY; OBD_ALLOC_PTR(data); @@ -5088,7 +5674,8 @@ static int mdt_stack_init(const struct lu_env *env, struct mdt_device 
*mdt, struct obd_device *obd; struct lustre_profile *lprof; struct lu_site *site; - ENTRY; + + ENTRY; /* in 1.8 we had the only device in the stack - MDS. * 2.0 introduces MDT, MDD, OSD; MDT starts others internally. @@ -5114,7 +5701,8 @@ static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt, * #02 (160)setup 0:lustre-MDT0000 1:lustre-MDT0000_UUID 2:0 * 3:lustre-MDD0000 4:f * - * notice we build the stack from down to top: MDD first, then MDT */ + * notice we build the stack from down to top: MDD first, then MDT + */ name_size = MAX_OBD_NAME; uuid_size = MAX_OBD_NAME; @@ -5236,6 +5824,7 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt, struct lustre_profile *lprof; struct obd_connect_data *data; int rc; + ENTRY; LASSERT(mdt->mdt_qmt_exp == NULL); @@ -5252,7 +5841,8 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt, * We generate the QMT name from the MDT one, just replacing MD with QM * after all the preparations, the logical equivalent will be: * #01 (160)setup 0:lustre-QMT0000 1:lustre-QMT0000_UUID 2:0 - * 3:lustre-MDT0000-osd 4:f */ + * 3:lustre-MDT0000-osd 4:f + */ OBD_ALLOC(qmtname, MAX_OBD_NAME); OBD_ALLOC(uuid, UUID_MAX); OBD_ALLOC_PTR(bufs); @@ -5320,7 +5910,7 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt, mdt->mdt_qmt_dev = obd->obd_lu_dev; /* configure local quota objects */ - if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_INIT)) + if (CFS_FAIL_CHECK(OBD_FAIL_QUOTA_INIT)) rc = -EBADF; else rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env, @@ -5384,7 +5974,8 @@ static void mdt_quota_fini(const struct lu_env *env, struct mdt_device *mdt) /* mdt_getxattr() is used from mdt_intent_getxattr(), use this wrapper * for now. 
This will be removed along with converting rest of MDT code - * to use tgt_session_info */ + * to use tgt_session_info + */ static int mdt_tgt_getxattr(struct tgt_session_info *tsi) { struct mdt_thread_info *info = tsi2mdt_info(tsi); @@ -5403,7 +5994,7 @@ static int mdt_llog_open(struct tgt_session_info *tsi) { ENTRY; - if (!mdt_is_rootadmin(tsi2mdt_info(tsi))) + if (!mdt_changelog_allow(tsi2mdt_info(tsi))) RETURN(err_serious(-EACCES)); RETURN(tgt_llog_open(tsi)); @@ -5453,6 +6044,7 @@ TGT_MDT_HDL(HAS_KEY | HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_SWAP_LAYOUTS, mdt_swap_layouts), TGT_MDT_HDL(IS_MUTABLE, MDS_RMFID, mdt_rmfid), +TGT_MDT_HDL(IS_MUTABLE, MDS_BATCH, mdt_batch), }; static struct tgt_handler mdt_io_ops[] = { @@ -5462,14 +6054,19 @@ TGT_OST_HDL_HP(HAS_BODY | IS_MUTABLE, OST_BRW_WRITE, tgt_brw_write, mdt_hp_brw), TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_PUNCH, mdt_punch_hdl, - mdt_hp_punch), + mdt_hp_punch), TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC, mdt_data_sync), +TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_FALLOCATE, + mdt_fallocate_hdl), TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SEEK, tgt_lseek), +TGT_RPC_HANDLER(OST_FIRST_OPC, + 0, OST_SET_INFO, mdt_io_set_info, + &RQF_OBD_SET_INFO, LUSTRE_OST_VERSION), }; static struct tgt_handler mdt_sec_ctx_ops[] = { TGT_SEC_HDL_VAR(0, SEC_CTX_INIT, mdt_sec_ctx_handle), -TGT_SEC_HDL_VAR(0, SEC_CTX_INIT_CONT,mdt_sec_ctx_handle), +TGT_SEC_HDL_VAR(0, SEC_CTX_INIT_CONT, mdt_sec_ctx_handle), TGT_SEC_HDL_VAR(0, SEC_CTX_FINI, mdt_sec_ctx_handle) }; @@ -5590,8 +6187,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) upcall_cache_cleanup(m->mdt_identity_cache); m->mdt_identity_cache = NULL; - mdt_fs_cleanup(env, m); - tgt_fini(env, &m->mdt_lut); mdt_hsm_cdt_fini(m); @@ -5648,6 +6243,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, int rc; long node_id; mntopt_t mntopts; + ENTRY; lu_device_init(&m->mdt_lu_dev, ldt); @@ -5670,7 +6266,7 @@ static int 
mdt_init0(const struct lu_env *env, struct mdt_device *m, LASSERT(obd != NULL); m->mdt_max_mdsize = MAX_MD_SIZE_OLD; - m->mdt_opts.mo_evict_tgt_nids = 1; + m->mdt_evict_tgt_nids = 1; m->mdt_opts.mo_cos = MDT_COS_DEFAULT; lmi = server_get_mount(dev); @@ -5686,41 +6282,51 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (rc) RETURN(rc); - obd->u.obt.obt_magic = OBT_MAGIC; - if (lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK) + obd_obt_init(obd); + if (test_bit(LMD_FLG_SKIP_LFSCK, lsi->lsi_lmd->lmd_flags)) m->mdt_skip_lfsck = 1; + if (test_bit(LMD_FLG_NO_CREATE, lsi->lsi_lmd->lmd_flags)) + m->mdt_lut.lut_no_create = 1; } /* Just try to get a DoM lock by default. Otherwise, having a group - * lock granted, it may get blocked for a long time. */ + * lock granted, it may get blocked for a long time. + */ m->mdt_opts.mo_dom_lock = TRYLOCK_DOM_ON_OPEN; /* DoM files are read at open and data is packed in the reply */ - m->mdt_opts.mo_dom_read_open = 1; + m->mdt_dom_read_open = 1; m->mdt_squash.rsi_uid = 0; m->mdt_squash.rsi_gid = 0; INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids); spin_lock_init(&m->mdt_squash.rsi_lock); spin_lock_init(&m->mdt_lock); - m->mdt_enable_remote_dir = 1; - m->mdt_enable_striped_dir = 1; + m->mdt_enable_chprojid_gid = 0; m->mdt_enable_dir_migration = 1; m->mdt_enable_dir_restripe = 0; m->mdt_enable_dir_auto_split = 0; + m->mdt_enable_parallel_rename_dir = 1; + m->mdt_enable_parallel_rename_file = 1; + m->mdt_enable_parallel_rename_crossdir = 1; + m->mdt_enable_remote_dir = 1; m->mdt_enable_remote_dir_gid = 0; - m->mdt_enable_chprojid_gid = 0; m->mdt_enable_remote_rename = 1; + m->mdt_enable_striped_dir = 1; + m->mdt_enable_dmv_implicit_inherit = 1; m->mdt_dir_restripe_nsonly = 1; - m->mdt_enable_remote_subdir_mount = 1; + m->mdt_max_mod_rpcs_in_flight = OBD_MAX_RIF_DEFAULT; atomic_set(&m->mdt_mds_mds_conns, 0); atomic_set(&m->mdt_async_commit_count, 0); + atomic_set(&m->mdt_dmv_old_client_count, 0); m->mdt_lu_dev.ld_ops = 
&mdt_lu_ops; m->mdt_lu_dev.ld_obd = obd; /* Set this lu_device to obd for error handling purposes. */ obd->obd_lu_dev = &m->mdt_lu_dev; + strncpy(m->mdt_job_xattr, XATTR_NAME_JOB_DEFAULT, XATTR_JOB_MAX_LEN); + /* init the stack */ rc = mdt_stack_init((struct lu_env *)env, m, cfg); if (rc) { @@ -5740,13 +6346,14 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, /* failover is the default * FIXME: we do not failout mds0/mgs, which may cause some problems. * assumed whose ss_node_id == 0 XXX - * */ + */ obd->obd_replayable = 1; /* No connection accepted until configurations will finish */ obd->obd_no_conn = 1; if (cfg->lcfg_bufcount > 4 && LUSTRE_CFG_BUFLEN(cfg, 4) > 0) { char *str = lustre_cfg_string(cfg, 4); + if (strchr(str, 'n')) { CWARN("%s: recovery disabled\n", mdt_obd_name(m)); obd->obd_replayable = 0; @@ -5789,7 +6396,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, GOTO(err_free_ns, rc); /* Amount of available space excluded from granting and reserved - * for metadata. It is a percentage of the total MDT size. */ + * for metadata. It is a percentage of the total MDT size. 
+ */ tgd->tgd_reserved_pcnt = 10; if (ONE_MB_BRW_SIZE < (1U << tgd->tgd_blockbits)) @@ -5797,16 +6405,15 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, else m->mdt_brw_size = ONE_MB_BRW_SIZE; - rc = mdt_fs_setup(env, m, obd, lsi); - if (rc) - GOTO(err_tgt, rc); + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP)) + GOTO(err_tgt, rc = -ENOENT); fid.f_seq = FID_SEQ_LOCAL_NAME; fid.f_oid = 1; fid.f_ver = 0; rc = local_oid_storage_init(env, m->mdt_bottom, &fid, &m->mdt_los); if (rc != 0) - GOTO(err_fs_cleanup, rc); + GOTO(err_tgt, rc); rc = mdt_hsm_cdt_init(m); if (rc != 0) { @@ -5834,13 +6441,20 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, else m->mdt_opts.mo_acl = 0; + m->mdt_enable_strict_som = 1; + /* XXX: to support suppgid for ACL, we enable identity_upcall - * by default, otherwise, maybe got unexpected -EACCESS. */ + * by default, otherwise, maybe got unexpected -EACCESS. + */ if (m->mdt_opts.mo_acl) identity_upcall = MDT_IDENTITY_UPCALL_PATH; m->mdt_identity_cache = upcall_cache_init(mdt_obd_name(m), identity_upcall, + UC_IDCACHE_HASH_SIZE, + 1200, /* entry expire: 20 mn */ + 30, /* acquire expire: 30 s */ + true, /* acquire can replay */ &mdt_identity_upcall_cache_ops); if (IS_ERR(m->mdt_identity_cache)) { rc = PTR_ERR(m->mdt_identity_cache); @@ -5864,17 +6478,18 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, ping_evictor_start(); - /* recovery will be started upon mdt_prepare() - * when the whole stack is complete and ready - * to serve the requests */ + /* recovery will be started upon mdt_prepare() when the whole stack is + * complete and ready to serve the requests + */ /* Reduce the initial timeout on an MDS because it doesn't need such * a long timeout as an OST does. Adaptive timeouts will adjust this - * value appropriately. */ + * value appropriately. 
+ */ if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT) ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT; - if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_LOCAL_RECOV)) + if (test_bit(LMD_FLG_LOCAL_RECOV, lsi->lsi_lmd->lmd_flags)) m->mdt_lut.lut_local_recovery = 1; rc = mdt_restriper_start(m); @@ -5895,8 +6510,6 @@ err_free_hsm: err_los_fini: local_oid_storage_fini(env, m->mdt_los); m->mdt_los = NULL; -err_fs_cleanup: - mdt_fs_cleanup(env, m); err_tgt: /* keep recoverable clients */ obd->obd_fail = 1; @@ -5916,12 +6529,13 @@ err_fini_stack: err_lmi: if (lmi) server_put_mount(dev, true); - return(rc); + return rc; } /* For interoperability, the left element is old parameter, the right one * is the new version of the parameter, if some parameter is deprecated, - * the new version should be set as NULL. */ + * the new version should be set as NULL. + */ static struct cfg_interop_param mdt_interop_param[] = { { "mdt.group_upcall", NULL }, { "mdt.quota_type", NULL }, @@ -5934,13 +6548,14 @@ static struct cfg_interop_param mdt_interop_param[] = { /* used by MGS to process specific configurations */ static int mdt_process_config(const struct lu_env *env, - struct lu_device *d, struct lustre_cfg *cfg) + struct lu_device *d, struct lustre_cfg *cfg) { - struct mdt_device *m = mdt_dev(d); - struct md_device *md_next = m->mdt_child; - struct lu_device *next = md2lu_dev(md_next); - int rc; - ENTRY; + struct mdt_device *m = mdt_dev(d); + struct md_device *md_next = m->mdt_child; + struct lu_device *next = md2lu_dev(md_next); + int rc; + + ENTRY; switch (cfg->lcfg_command) { case LCFG_PARAM: { @@ -5962,8 +6577,8 @@ static int mdt_process_config(const struct lu_env *env, if (ptr != NULL) { if (ptr->new_param == NULL) { rc = 0; - CWARN("For interoperability, skip this %s." - " It is obsolete.\n", ptr->old_param); + CWARN("For interoperability, skip this %s. 
It is obsolete.\n", + ptr->old_param); break; } @@ -6001,12 +6616,12 @@ static int mdt_process_config(const struct lu_env *env, cfg->lcfg_buflens)); break; } - default: - /* others are passed further */ - rc = next->ld_ops->ldo_process_config(env, next, cfg); - break; - } - RETURN(rc); + default: + /* others are passed further */ + rc = next->ld_ops->ldo_process_config(env, next, cfg); + break; + } + RETURN(rc); } static struct lu_object *mdt_object_alloc(const struct lu_env *env, @@ -6036,31 +6651,35 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, atomic_set(&mo->mot_open_count, 0); mo->mot_restripe_offset = 0; INIT_LIST_HEAD(&mo->mot_restripe_linkage); + mo->mot_lsom_size = 0; + mo->mot_lsom_blocks = 0; + mo->mot_lsom_inited = false; RETURN(o); } RETURN(NULL); } static int mdt_object_init(const struct lu_env *env, struct lu_object *o, - const struct lu_object_conf *unused) + const struct lu_object_conf *unused) { - struct mdt_device *d = mdt_dev(o->lo_dev); - struct lu_device *under; - struct lu_object *below; - int rc = 0; - ENTRY; + struct mdt_device *d = mdt_dev(o->lo_dev); + struct lu_device *under; + struct lu_object *below; + int rc = 0; - CDEBUG(D_INFO, "object init, fid = "DFID"\n", - PFID(lu_object_fid(o))); + ENTRY; + + CDEBUG(D_INFO, "object init, fid = "DFID"\n", + PFID(lu_object_fid(o))); - under = &d->mdt_child->md_lu_dev; - below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under); - if (below != NULL) { - lu_object_add(o, below); - } else - rc = -ENOMEM; + under = &d->mdt_child->md_lu_dev; + below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under); + if (below != NULL) + lu_object_add(o, below); + else + rc = -ENOMEM; - RETURN(rc); + RETURN(rc); } static void mdt_object_free_rcu(struct rcu_head *head) @@ -6075,6 +6694,7 @@ static void mdt_object_free(const struct lu_env *env, struct lu_object *o) { struct mdt_object *mo = mdt_obj(o); struct lu_object_header *h; + ENTRY; h = o->lo_header; @@ -6131,7 +6751,8 
@@ static int mdt_prepare(const struct lu_env *env, rc = lfsck_register_namespace(env, mdt->mdt_bottom, mdt->mdt_namespace); /* The LFSCK instance is registered just now, so it must be there when - * register the namespace to such instance. */ + * register the namespace to such instance. + */ LASSERTF(rc == 0, "register namespace failed: rc = %d\n", rc); if (mdt->mdt_seq_site.ss_node_id == 0) { @@ -6157,22 +6778,22 @@ static int mdt_prepare(const struct lu_env *env, } const struct lu_device_operations mdt_lu_ops = { - .ldo_object_alloc = mdt_object_alloc, - .ldo_process_config = mdt_process_config, + .ldo_object_alloc = mdt_object_alloc, + .ldo_process_config = mdt_process_config, .ldo_prepare = mdt_prepare, }; static const struct lu_object_operations mdt_obj_ops = { - .loo_object_init = mdt_object_init, - .loo_object_free = mdt_object_free, - .loo_object_print = mdt_object_print + .loo_object_init = mdt_object_init, + .loo_object_free = mdt_object_free, + .loo_object_print = mdt_object_print }; static int mdt_obd_set_info_async(const struct lu_env *env, - struct obd_export *exp, - __u32 keylen, void *key, - __u32 vallen, void *val, - struct ptlrpc_request_set *set) + struct obd_export *exp, + __u32 keylen, void *key, + __u32 vallen, void *val, + struct ptlrpc_request_set *set) { int rc; @@ -6229,6 +6850,7 @@ static int mdt_connect_internal(const struct lu_env *env, struct obd_connect_data *data, bool reconnect) { const char *obd_name = mdt_obd_name(mdt); + LASSERT(data != NULL); data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; @@ -6253,11 +6875,7 @@ static int mdt_connect_internal(const struct lu_env *env, data->ocd_brw_size = min(data->ocd_brw_size, mdt->mdt_brw_size); if (data->ocd_brw_size == 0) { - CERROR("%s: cli %s/%p ocd_connect_flags: %#llx " - "ocd_version: %x ocd_grant: %d ocd_index: %u " - "ocd_brw_size unexpectedly zero, network data " - "corruption? 
Refusing to connect this client\n", - obd_name, exp->exp_client_uuid.uuid, + CERROR("%s: cli %s/%p ocd_connect_flags: %#llx ocd_version: %x ocd_grant: %d ocd_index: %u ocd_brw_size unexpectedly zero, network data corruption? Refusing to connect this client\n", obd_name, exp->exp_client_uuid.uuid, exp, data->ocd_connect_flags, data->ocd_version, data->ocd_grant, data->ocd_index); return -EPROTO; @@ -6271,7 +6889,8 @@ static int mdt_connect_internal(const struct lu_env *env, exp->exp_target_data.ted_pagebits = data->ocd_grant_blkbits; data->ocd_grant_blkbits = mdt->mdt_lut.lut_tgd.tgd_blockbits; /* ddp_inodespace may not be power-of-two value, eg. for ldiskfs - * it's LDISKFS_DIR_REC_LEN(20) = 28. */ + * it's LDISKFS_DIR_REC_LEN(20) = 28. + */ data->ocd_grant_inobits = fls(ddp->ddp_inodespace - 1); /* ocd_grant_tax_kb is in 1K byte blocks */ data->ocd_grant_tax_kb = ddp->ddp_extent_tax >> 10; @@ -6280,7 +6899,8 @@ static int mdt_connect_internal(const struct lu_env *env, /* Save connect_data we have so far because tgt_grant_connect() * uses it to calculate grant, and we want to save the client - * version before it is overwritten by LUSTRE_VERSION_CODE. */ + * version before it is overwritten by LUSTRE_VERSION_CODE. + */ exp->exp_connect_data = *data; if (OCD_HAS_FLAG(data, GRANT)) tgt_grant_connect(env, exp, data, !reconnect); @@ -6292,7 +6912,8 @@ static int mdt_connect_internal(const struct lu_env *env, * exp_connect_data.ocd_connect_flags in this case, since * tgt_client_new() needs to know if this is a lightweight * connection, and it is safe to expose this flag before - * connection processing completes. */ + * connection processing completes. 
+ */ if (data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) { spin_lock(&exp->exp_lock); *exp_connect_flags_ptr(exp) |= OBD_CONNECT_LIGHTWEIGHT; @@ -6323,9 +6944,18 @@ static int mdt_connect_internal(const struct lu_env *env, * exp_connect_data.ocd_connect_flags in this case, since * tgt_client_new() needs to know if this is client supports * multiple modify RPCs, and it is safe to expose this flag before - * connection processing completes. */ + * connection processing completes. + */ if (data->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) { - data->ocd_maxmodrpcs = max_mod_rpcs_per_client; + if (mdt_max_mod_rpcs_changed(mdt)) + /* The new mdt.*.max_mod_rpcs_in_flight parameter + * has not changed since initialization, but the + * deprecated module parameter was changed, + * so use that instead. + */ + data->ocd_maxmodrpcs = max_mod_rpcs_per_client; + else + data->ocd_maxmodrpcs = mdt->mdt_max_mod_rpcs_in_flight; spin_lock(&exp->exp_lock); *exp_connect_flags_ptr(exp) |= OBD_CONNECT_MULTIMODRPCS; spin_unlock(&exp->exp_lock); @@ -6337,20 +6967,17 @@ static int mdt_connect_internal(const struct lu_env *env, tgt_mask_cksum_types(&mdt->mdt_lut, &data->ocd_cksum_types); if (unlikely(data->ocd_cksum_types == 0)) { - CERROR("%s: Connect with checksum support but no " - "ocd_cksum_types is set\n", + CERROR("%s: Connect with checksum support but no ocd_cksum_types is set\n", exp->exp_obd->obd_name); RETURN(-EPROTO); } - CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return " - "%x\n", exp->exp_obd->obd_name, obd_export_nid2str(exp), + CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return %x\n", + exp->exp_obd->obd_name, obd_export_nid2str(exp), cksum_types, data->ocd_cksum_types); } else { - /* This client does not support OBD_CONNECT_CKSUM - * fall back to CRC32 */ - CDEBUG(D_RPCTRACE, "%s: cli %s does not support " - "OBD_CONNECT_CKSUM, CRC32 will be used\n", + /* Client not support OBD_CONNECT_CKSUM? 
fall back to CRC32 */ + CDEBUG(D_RPCTRACE, "%s: cli %s does not support OBD_CONNECT_CKSUM, CRC32 will be used\n", exp->exp_obd->obd_name, obd_export_nid2str(exp)); } @@ -6363,6 +6990,12 @@ static int mdt_connect_internal(const struct lu_env *env, if (!mdt->mdt_lut.lut_dt_conf.ddp_has_lseek_data_hole) data->ocd_connect_flags2 &= ~OBD_CONNECT2_LSEEK; + if (!OCD_HAS_FLAG(data, MDS_MDS) && !OCD_HAS_FLAG(data, LIGHTWEIGHT) && + !OCD_HAS_FLAG2(data, DMV_IMP_INHERIT)) { + atomic_inc(&mdt->mdt_dmv_old_client_count); + mdt->mdt_enable_dmv_implicit_inherit = 0; + } + return 0; } @@ -6372,6 +7005,7 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env, { struct lu_context ses; int rc; + ENTRY; rc = lu_context_init(&ses, LCT_SERVER_SESSION); @@ -6382,6 +7016,13 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env, lu_context_enter(&ses); mdt_ucred(info)->uc_valid = UCRED_OLD; + /* do not let rbac interfere with dirty flag internal system event */ + mdt_ucred(info)->uc_rbac_file_perms = 1; + mdt_ucred(info)->uc_rbac_dne_ops = 1; + mdt_ucred(info)->uc_rbac_quota_ops = 1; + mdt_ucred(info)->uc_rbac_byfid_ops = 1; + mdt_ucred(info)->uc_rbac_chlg_ops = 1; + mdt_ucred(info)->uc_rbac_fscrypt_admin = 1; rc = mdt_add_dirty_flag(info, mfd->mfd_object, &info->mti_attr); lu_context_exit(&ses); @@ -6401,38 +7042,40 @@ static int mdt_export_cleanup(struct obd_export *exp) struct lu_env env; struct mdt_file_data *mfd, *n; int rc = 0; + ENTRY; spin_lock(&med->med_open_lock); while (!list_empty(&med->med_open_head)) { struct list_head *tmp = med->med_open_head.next; + mfd = list_entry(tmp, struct mdt_file_data, mfd_list); /* Remove mfd handle so it can't be found again. - * We are consuming the mfd_list reference here. */ + * We are consuming the mfd_list reference here. 
+ */ class_handle_unhash(&mfd->mfd_open_handle); list_move_tail(&mfd->mfd_list, &closing_list); } spin_unlock(&med->med_open_lock); - mdt = mdt_dev(obd->obd_lu_dev); - LASSERT(mdt != NULL); + mdt = mdt_dev(obd->obd_lu_dev); + LASSERT(mdt != NULL); - rc = lu_env_init(&env, LCT_MD_THREAD); - if (rc) - RETURN(rc); + rc = lu_env_init(&env, LCT_MD_THREAD); + if (rc) + RETURN(rc); - info = lu_context_key_get(&env.le_ctx, &mdt_thread_key); - LASSERT(info != NULL); - memset(info, 0, sizeof *info); - info->mti_env = &env; - info->mti_mdt = mdt; - info->mti_exp = exp; + info = lu_context_key_get(&env.le_ctx, &mdt_thread_key); + LASSERT(info != NULL); + memset(info, 0, sizeof(*info)); + info->mti_env = &env; + info->mti_mdt = mdt; + info->mti_exp = exp; if (!list_empty(&closing_list)) { struct md_attr *ma = &info->mti_attr; - /* Close any open files (which may also cause orphan - * unlinking). */ + /* Close any open files (which may cause orphan unlinking). */ list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) { list_del_init(&mfd->mfd_list); ma->ma_need = ma->ma_valid = 0; @@ -6455,7 +7098,8 @@ static int mdt_export_cleanup(struct obd_export *exp) rc = mdt_ctxt_add_dirty_flag(&env, info, mfd); /* Don't unlink orphan on failover umount, LU-184 */ - if (exp->exp_flags & OBD_OPT_FAILOVER) { + if (exp->exp_flags & OBD_OPT_FAILOVER || + exp->exp_obd->obd_stopping) { ma->ma_valid = MA_FLAGS; ma->ma_attr_flags |= MDS_KEEP_ORPHAN; } @@ -6468,13 +7112,15 @@ static int mdt_export_cleanup(struct obd_export *exp) /* Do not erase record for recoverable client. 
*/ if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed) tgt_client_del(&env, exp); - lu_env_fini(&env); + lu_env_fini(&env); - RETURN(rc); + RETURN(rc); } static int mdt_obd_disconnect(struct obd_export *exp) { + struct obd_connect_data *data = &exp->exp_connect_data; + struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev); int rc; ENTRY; @@ -6482,16 +7128,14 @@ static int mdt_obd_disconnect(struct obd_export *exp) LASSERT(exp); class_export_get(exp); - if (!(exp->exp_flags & OBD_OPT_FORCE)) - tgt_grant_sanity_check(exp->exp_obd, __func__); - - if ((exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS) && - !(exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)) { - struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev); + if (OCD_HAS_FLAG(data, MDS_MDS) && !OCD_HAS_FLAG(data, LIGHTWEIGHT) && + atomic_dec_and_test(&mdt->mdt_mds_mds_conns)) + mdt_disable_slc(mdt); - if (atomic_dec_and_test(&mdt->mdt_mds_mds_conns)) - mdt_disable_slc(mdt); - } + if (!OCD_HAS_FLAG(data, MDS_MDS) && !OCD_HAS_FLAG(data, LIGHTWEIGHT) && + !OCD_HAS_FLAG2(data, DMV_IMP_INHERIT) && + atomic_dec_and_test(&mdt->mdt_dmv_old_client_count)) + mdt->mdt_enable_dmv_implicit_inherit = 1; rc = server_disconnect_export(exp); if (rc != 0) @@ -6499,6 +7143,9 @@ static int mdt_obd_disconnect(struct obd_export *exp) tgt_grant_discard(exp); + if (!(exp->exp_flags & OBD_OPT_FORCE)) + tgt_grant_sanity_check(exp->exp_obd, __func__); + rc = mdt_export_cleanup(exp); nodemap_del_member(exp); class_export_put(exp); @@ -6516,7 +7163,8 @@ static int mdt_obd_connect(const struct lu_env *env, struct lustre_handle conn = { 0 }; struct mdt_device *mdt; int rc; - lnet_nid_t *client_nid = localdata; + struct lnet_nid *client_nid = localdata; + ENTRY; LASSERT(env != NULL); @@ -6550,7 +7198,7 @@ static int mdt_obd_connect(const struct lu_env *env, lexp = class_conn2export(&conn); LASSERT(lexp != NULL); - rc = nodemap_add_member(*client_nid, lexp); + rc = nodemap_add_member(client_nid, lexp); if (rc != 0 && rc != 
-EEXIST) GOTO(out, rc); @@ -6559,7 +7207,7 @@ static int mdt_obd_connect(const struct lu_env *env, struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd; LASSERT(lcd); - memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid); + memcpy(lcd->lcd_uuid, cluuid, sizeof(lcd->lcd_uuid)); rc = tgt_client_new(env, lexp); if (rc == 0) mdt_export_stats_init(obd, lexp, localdata); @@ -6572,7 +7220,8 @@ out: } else { *exp = lexp; /* Because we do not want this export to be evicted by pinger, - * let's not add this export to the timed chain list. */ + * let's not add this export to the timed chain list. + */ if (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) { spin_lock(&lexp->exp_obd->obd_dev_lock); list_del_init(&lexp->exp_obd_chain_timed); @@ -6589,14 +7238,15 @@ static int mdt_obd_reconnect(const struct lu_env *env, struct obd_connect_data *data, void *localdata) { - lnet_nid_t *client_nid = localdata; - int rc; + struct lnet_nid *client_nid = localdata; + int rc; + ENTRY; if (exp == NULL || obd == NULL || cluuid == NULL) RETURN(-EINVAL); - rc = nodemap_add_member(*client_nid, exp); + rc = nodemap_add_member(client_nid, exp); if (rc != 0 && rc != -EEXIST) RETURN(rc); @@ -6615,6 +7265,7 @@ static int mdt_init_export(struct obd_export *exp) { struct mdt_export_data *med = &exp->exp_mdt_data; int rc; + ENTRY; INIT_LIST_HEAD(&med->med_open_head); @@ -6628,20 +7279,20 @@ static int mdt_init_export(struct obd_export *exp) if (exp->exp_used_slots == NULL) RETURN(-ENOMEM); - /* self-export doesn't need client data and ldlm initialization */ - if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, - &exp->exp_client_uuid))) - RETURN(0); + /* self-export doesn't need client data and ldlm initialization */ + if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, + &exp->exp_client_uuid))) + RETURN(0); - rc = tgt_client_alloc(exp); - if (rc) + rc = tgt_client_alloc(exp); + if (rc) GOTO(err, rc); rc = ldlm_init_export(exp); if (rc) GOTO(err_free, rc); - RETURN(rc); + RETURN(rc); 
err_free: tgt_client_free(exp); @@ -6657,18 +7308,19 @@ err: static int mdt_destroy_export(struct obd_export *exp) { - ENTRY; + ENTRY; - target_destroy_export(exp); + target_destroy_export(exp); if (exp->exp_used_slots) OBD_FREE(exp->exp_used_slots, BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long)); - /* destroy can be called from failed obd_setup, so - * checking uuid is safer than obd_self_export */ - if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, - &exp->exp_client_uuid))) - RETURN(0); + /* destroy can be called from failed obd_setup, so + * checking uuid is safer than obd_self_export + */ + if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, + &exp->exp_client_uuid))) + RETURN(0); ldlm_destroy_export(exp); tgt_client_free(exp); @@ -6682,7 +7334,7 @@ static int mdt_destroy_export(struct obd_export *exp) */ tgt_grant_discard(exp); if (exp_connect_flags(exp) & OBD_CONNECT_GRANT) - exp->exp_obd->u.obt.obt_lut->lut_tgd.tgd_tot_granted_clients--; + obd2obt(exp->exp_obd)->obt_lut->lut_tgd.tgd_tot_granted_clients--; if (!(exp->exp_flags & OBD_OPT_FORCE)) tgt_grant_sanity_check(exp->exp_obd, __func__); @@ -6743,12 +7395,15 @@ static int mdt_path_current(struct mdt_thread_info *info, struct lu_name *tmpname = &info->mti_name; struct lu_fid *tmpfid = &info->mti_tmp_fid1; struct lu_buf *buf = &info->mti_big_buf; - struct md_attr *ma = &info->mti_attr; struct linkea_data ldata = { NULL }; bool first = true; struct mdt_object *mdt_obj; struct link_ea_header *leh; struct link_ea_entry *lee; + bool worthchecking = true; + bool needsfid = false; + bool supported = false; + int isenc = -1; char *ptr; int reclen; int rc = 0; @@ -6756,7 +7411,8 @@ static int mdt_path_current(struct mdt_thread_info *info, ENTRY; /* temp buffer for path element, the buffer will be finally freed - * in mdt_thread_info_fini */ + * in mdt_thread_info_fini + */ buf = lu_buf_check_and_alloc(buf, PATH_MAX); if (buf->lb_buf == NULL) RETURN(-ENOMEM); @@ -6791,6 +7447,37 @@ static int 
mdt_path_current(struct mdt_thread_info *info, GOTO(remote_out, rc = -EREMOTE); } + if (worthchecking) { + /* need to know if FID being looked up is encrypted */ + struct lu_attr la = { 0 }; + struct dt_object *dt = mdt_obj2dt(mdt_obj); + + if (dt && dt->do_ops && dt->do_ops->do_attr_get) + dt_attr_get(info->mti_env, dt, &la); + if (la.la_valid & LA_FLAGS && + la.la_flags & LUSTRE_ENCRYPT_FL) { + if (!supported && mdt_info_req(info) && + !exp_connect_encrypt_fid2path( + mdt_info_req(info)->rq_export)) { + /* client does not support fid2path + * for encrypted files + */ + mdt_object_put(info->mti_env, mdt_obj); + GOTO(out, rc = -ENODATA); + } else { + supported = true; + } + needsfid = true; + if (isenc == -1) + isenc = 1; + } else { + worthchecking = false; + needsfid = false; + if (isenc == -1) + isenc = 0; + } + } + rc = mdt_links_read(info, mdt_obj, &ldata); if (rc != 0) { mdt_object_put(info->mti_env, mdt_obj); @@ -6801,9 +7488,11 @@ static int mdt_path_current(struct mdt_thread_info *info, lee = (struct link_ea_entry *)(leh + 1); /* link #0 */ linkea_entry_unpack(lee, &reclen, tmpname, tmpfid); /* If set, use link #linkno for path lookup, otherwise use - link #0. Only do this for the final path element. */ + * link #0. Only do this for the final path ement. 
+ */ if (first && fp->gf_linkno < leh->leh_reccount) { int count; + for (count = 0; count < fp->gf_linkno; count++) { lee = (struct link_ea_entry *) ((char *)lee + reclen); @@ -6816,34 +7505,38 @@ static int mdt_path_current(struct mdt_thread_info *info, } /* Check if it is slave stripes */ - rc = mdt_stripe_get(info, mdt_obj, ma, XATTR_NAME_LMV); + rc = mdt_is_dir_stripe(info, mdt_obj); mdt_object_put(info->mti_env, mdt_obj); if (rc < 0) GOTO(out, rc); - - if (ma->ma_valid & MA_LMV) { - struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1; - - if (!lmv_is_sane2(lmv)) - GOTO(out, rc = -EBADF); - - /* For slave stripes, get its master */ - if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE) { - fp->gf_fid = *tmpfid; - continue; - } + if (rc == 1) { + fp->gf_fid = *tmpfid; + continue; } /* Pack the name in the end of the buffer */ ptr -= tmpname->ln_namelen; if (ptr - 1 <= fp->gf_u.gf_path) - GOTO(out, rc = -EOVERFLOW); + GOTO(out, rc = -ENAMETOOLONG); strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen); + if (needsfid) { + /* Pack FID before file name, so that client can build + * encoded/digested form. 
+ */ + char fidstr[FID_LEN + 1]; + + snprintf(fidstr, sizeof(fidstr), DFID, + PFID(&fp->gf_fid)); + ptr -= strlen(fidstr); + if (ptr - 1 <= fp->gf_u.gf_path) + GOTO(out, rc = -ENAMETOOLONG); + strncpy(ptr, fidstr, strlen(fidstr)); + } *(--ptr) = '/'; - /* keep the last resolved fid to the client, so the - * client will build the left path on another MDT for - * remote object */ + /* keep the last resolved fid to the client, so the client will + * build the left path on another MDT for remote object + */ fp->gf_fid = *tmpfid; first = false; @@ -6853,7 +7546,8 @@ static int mdt_path_current(struct mdt_thread_info *info, rc = 0; remote_out: - ptr++; /* skip leading / */ + if (isenc != 1) + ptr++; /* skip leading / unless this is an encrypted file */ memmove(fp->gf_u.gf_path, ptr, fp->gf_u.gf_path + fp->gf_pathlen - ptr); @@ -6871,7 +7565,7 @@ out: * \param[in] info Per-thread common data shared by mdt level handlers. * \param[in] obj Object to do path lookup of * \param[in,out] fp User-provided struct for arguments and to store path - * information + * information * * \retval 0 Lookup successful, path information stored in fp * \retval negative errno if there was a problem @@ -6882,6 +7576,7 @@ static int mdt_path(struct mdt_thread_info *info, struct mdt_object *obj, struct mdt_device *mdt = info->mti_mdt; int tries = 3; int rc = -EAGAIN; + ENTRY; if (fp->gf_pathlen < 3) @@ -6912,7 +7607,7 @@ static int mdt_path(struct mdt_thread_info *info, struct mdt_object *obj, * * \param[in] info Per-thread common data shared by mdt level handlers. 
* \param[in,out] fp User-provided struct for arguments and to store path - * information + * information * * \retval 0 Lookup successful, path information and recno stored in fp * \retval -ENOENT, object does not exist @@ -6925,6 +7620,7 @@ static int mdt_fid2path(struct mdt_thread_info *info, struct mdt_device *mdt = info->mti_mdt; struct mdt_object *obj; int rc; + ENTRY; CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n", @@ -6934,12 +7630,24 @@ static int mdt_fid2path(struct mdt_thread_info *info, RETURN(-EINVAL); if (!fid_is_namespace_visible(&fp->gf_fid)) { - CDEBUG(D_INFO, "%s: "DFID" is invalid, f_seq should be >= %#llx" - ", or f_oid != 0, or f_ver == 0\n", mdt_obd_name(mdt), + CDEBUG(D_INFO, "%s: "DFID" is invalid, f_seq should be >= %#llx, or f_oid != 0, or f_ver == 0\n", + mdt_obd_name(mdt), PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL); RETURN(-EINVAL); } + /* return error if client-provided root fid is not the one stored in + * the export + */ + if (root_fid && !fid_is_zero(&info->mti_exp->exp_root_fid) && + !lu_fid_eq(root_fid, &info->mti_exp->exp_root_fid)) { + CDEBUG(D_INFO, + "%s: root fid from client "DFID" but "DFID" stored in export\n", + mdt_obd_name(mdt), PFID(root_fid), + PFID(&info->mti_exp->exp_root_fid)); + RETURN(-EXDEV); + } + obj = mdt_object_find(info->mti_env, mdt, &fp->gf_fid); if (IS_ERR(obj)) { rc = PTR_ERR(obj); @@ -6980,7 +7688,7 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, int keylen, struct lu_fid *root_fid = NULL; int rc = 0; - fpin = key + cfs_size_round(sizeof(KEY_FID2PATH)); + fpin = key + round_up(sizeof(KEY_FID2PATH), 8); fpout = val; if (req_capsule_req_need_swab(info->mti_pill)) @@ -6990,7 +7698,7 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, int keylen, if (fpout->gf_pathlen != vallen - sizeof(*fpin)) RETURN(-EINVAL); - if (keylen >= cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*fpin) + + if (keylen >= round_up(sizeof(KEY_FID2PATH), 8) + sizeof(*fpin) + sizeof(struct 
lu_fid)) { /* client sent its root FID, which is normally fileset FID */ root_fid = fpin->gf_u.gf_root_fid; @@ -7062,6 +7770,7 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg) struct mdt_object *obj; struct mdt_lock_handle *lh; int rc; + ENTRY; if (data->ioc_inlbuf1 == NULL || data->ioc_inllen1 != sizeof(*fid) || @@ -7075,12 +7784,10 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg) CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid)); - lh = &mti->mti_lh[MDT_LH_PARENT]; - mdt_lock_reg_init(lh, LCK_CR); - - obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE); - if (IS_ERR(obj)) - RETURN(PTR_ERR(obj)); + lh = &mti->mti_lh[MDT_LH_PARENT]; + obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE, LCK_CR); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); if (mdt_object_remote(obj)) { rc = -EREMOTE; @@ -7105,67 +7812,76 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg) static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct lu_env env; - struct obd_device *obd = exp->exp_obd; - struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); - struct dt_device *dt = mdt->mdt_bottom; - int rc; + struct obd_device *obd = exp->exp_obd; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + struct dt_device *dt = mdt->mdt_bottom; + struct obd_ioctl_data *data; + struct lu_env env; + int rc; + + ENTRY; + CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n", + obd->obd_name, cmd, len, karg, uarg); - ENTRY; - CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd); - rc = lu_env_init(&env, LCT_MD_THREAD); - if (rc) - RETURN(rc); + rc = lu_env_init(&env, LCT_MD_THREAD); + if (rc) + RETURN(rc); + /* handle commands that don't use @karg first */ switch (cmd) { case OBD_IOC_SYNC: rc = mdt_device_sync(&env, mdt); - break; + GOTO(out, rc); case OBD_IOC_SET_READONLY: rc = dt_sync(&env, dt); if (rc == 0) rc = dt_ro(&env, dt); - break; - case 
OBD_IOC_ABORT_RECOVERY: { - struct obd_ioctl_data *data = karg; + GOTO(out, rc); + } + + if (unlikely(karg == NULL)) { + OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL", rc = -EINVAL); + GOTO(out, rc); + } + data = karg; - CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt)); - if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT) - obd->obd_abort_recov_mdt = 1; - else /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */ + switch (cmd) { + case OBD_IOC_ABORT_RECOVERY: { + if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT) { + LCONSOLE_WARN("%s: Aborting MDT recovery\n", + obd->obd_name); + obd->obd_abort_mdt_recovery = 1; + wake_up(&obd->obd_next_transno_waitq); + } else { /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */ /* lctl didn't set OBD_FLG_ABORT_RECOV_OST < 2.13.57 */ + LCONSOLE_WARN("%s: Aborting client recovery\n", + obd->obd_name); obd->obd_abort_recovery = 1; - - target_stop_recovery_thread(obd); + target_stop_recovery_thread(obd); + } rc = 0; break; } - case OBD_IOC_CHANGELOG_REG: - case OBD_IOC_CHANGELOG_DEREG: - case OBD_IOC_CHANGELOG_CLEAR: - rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env, - mdt->mdt_child, + case OBD_IOC_CHANGELOG_REG: + case OBD_IOC_CHANGELOG_DEREG: + case OBD_IOC_CHANGELOG_CLEAR: + case OBD_IOC_LLOG_PRINT: + case OBD_IOC_LLOG_CANCEL: + rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env, mdt->mdt_child, cmd, len, karg); - break; + break; case OBD_IOC_START_LFSCK: { struct md_device *next = mdt->mdt_child; - struct obd_ioctl_data *data = karg; struct lfsck_start_param lsp; - if (unlikely(data == NULL)) { - rc = -EINVAL; - break; - } - lsp.lsp_start = (struct lfsck_start *)(data->ioc_inlbuf1); lsp.lsp_index_valid = 0; rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0, &lsp); break; } case OBD_IOC_STOP_LFSCK: { - struct md_device *next = mdt->mdt_child; - struct obd_ioctl_data *data = karg; - struct lfsck_stop stop; + struct md_device *next = mdt->mdt_child; + struct lfsck_stop stop; stop.ls_status = LS_STOPPED; /* Old lfsck utils may 
pass NULL @stop. */ @@ -7179,24 +7895,24 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, break; } case OBD_IOC_QUERY_LFSCK: { - struct md_device *next = mdt->mdt_child; - struct obd_ioctl_data *data = karg; + struct md_device *next = mdt->mdt_child; rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0, data->ioc_inlbuf1); break; } - case OBD_IOC_GET_OBJ_VERSION: { - struct mdt_thread_info *mti; - mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key); - memset(mti, 0, sizeof *mti); - mti->mti_env = &env; - mti->mti_mdt = mdt; - mti->mti_exp = exp; - - rc = mdt_ioc_version_get(mti, karg); - break; - } + case OBD_IOC_GET_OBJ_VERSION: { + struct mdt_thread_info *mti; + + mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key); + memset(mti, 0, sizeof(*mti)); + mti->mti_env = &env; + mti->mti_mdt = mdt; + mti->mti_exp = exp; + + rc = mdt_ioc_version_get(mti, karg); + break; + } case OBD_IOC_CATLOGLIST: { struct mdt_thread_info *mti; @@ -7205,21 +7921,21 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rc = llog_catalog_list(&env, mdt->mdt_bottom, 0, karg, &mti->mti_tmp_fid1); break; - } + } default: - rc = -EOPNOTSUPP; - CERROR("%s: Not supported cmd = %d, rc = %d\n", - mdt_obd_name(mdt), cmd, rc); + rc = OBD_IOC_ERROR(obd->obd_name, cmd, "unrecognized", -ENOTTY); + break; } - - lu_env_fini(&env); - RETURN(rc); +out: + lu_env_fini(&env); + RETURN(rc); } static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt) { struct lu_device *ld = md2lu_dev(mdt->mdt_child); int rc; + ENTRY; if (!mdt->mdt_skip_lfsck && !mdt->mdt_bottom->dd_rdonly) { @@ -7241,46 +7957,48 @@ static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt) static int mdt_obd_postrecov(struct obd_device *obd) { - struct lu_env env; - int rc; + struct lu_env env; + int rc; - rc = lu_env_init(&env, LCT_MD_THREAD); - if (rc) - RETURN(rc); - rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev)); - lu_env_fini(&env); - return 
rc; + rc = lu_env_init(&env, LCT_MD_THREAD); + if (rc) + RETURN(rc); + rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev)); + lu_env_fini(&env); + return rc; } static const struct obd_ops mdt_obd_device_ops = { - .o_owner = THIS_MODULE, - .o_set_info_async = mdt_obd_set_info_async, - .o_connect = mdt_obd_connect, - .o_reconnect = mdt_obd_reconnect, - .o_disconnect = mdt_obd_disconnect, - .o_init_export = mdt_init_export, - .o_destroy_export = mdt_destroy_export, - .o_iocontrol = mdt_iocontrol, - .o_postrecov = mdt_obd_postrecov, + .o_owner = THIS_MODULE, + .o_set_info_async = mdt_obd_set_info_async, + .o_connect = mdt_obd_connect, + .o_reconnect = mdt_obd_reconnect, + .o_disconnect = mdt_obd_disconnect, + .o_init_export = mdt_init_export, + .o_destroy_export = mdt_destroy_export, + .o_iocontrol = mdt_iocontrol, + .o_postrecov = mdt_obd_postrecov, /* Data-on-MDT IO methods */ .o_preprw = mdt_obd_preprw, .o_commitrw = mdt_obd_commitrw, }; -static struct lu_device* mdt_device_fini(const struct lu_env *env, - struct lu_device *d) +static struct lu_device *mdt_device_fini(const struct lu_env *env, + struct lu_device *d) { - struct mdt_device *m = mdt_dev(d); - ENTRY; + struct mdt_device *m = mdt_dev(d); - mdt_fini(env, m); - RETURN(NULL); + ENTRY; + + mdt_fini(env, m); + RETURN(NULL); } static struct lu_device *mdt_device_free(const struct lu_env *env, - struct lu_device *d) + struct lu_device *d) { struct mdt_device *m = mdt_dev(d); + ENTRY; lu_device_fini(&m->mdt_lu_dev); @@ -7290,33 +8008,33 @@ static struct lu_device *mdt_device_free(const struct lu_env *env, } static struct lu_device *mdt_device_alloc(const struct lu_env *env, - struct lu_device_type *t, - struct lustre_cfg *cfg) + struct lu_device_type *t, + struct lustre_cfg *cfg) { - struct lu_device *l; - struct mdt_device *m; + struct lu_device *l; + struct mdt_device *m; - OBD_ALLOC_PTR(m); - if (m != NULL) { - int rc; + OBD_ALLOC_PTR(m); + if (m != NULL) { + int rc; l = &m->mdt_lu_dev; - rc = mdt_init0(env, 
m, t, cfg); - if (rc != 0) { - mdt_device_free(env, l); - l = ERR_PTR(rc); - return l; - } - } else - l = ERR_PTR(-ENOMEM); - return l; + rc = mdt_init0(env, m, t, cfg); + if (rc != 0) { + mdt_device_free(env, l); + l = ERR_PTR(rc); + return l; + } + } else + l = ERR_PTR(-ENOMEM); + return l; } /* context key constructor/destructor: mdt_key_init, mdt_key_fini */ LU_KEY_INIT(mdt, struct mdt_thread_info); static void mdt_key_fini(const struct lu_context *ctx, - struct lu_context_key *key, void* data) + struct lu_context_key *key, void *data) { struct mdt_thread_info *info = data; @@ -7358,14 +8076,14 @@ struct lu_ucred *mdt_ucred_check(const struct mdt_thread_info *info) */ void mdt_enable_cos(struct mdt_device *mdt, bool val) { - struct lu_env env; - int rc; + struct lu_env env; + int rc; mdt->mdt_opts.mo_cos = val; - rc = lu_env_init(&env, LCT_LOCAL); + rc = lu_env_init(&env, LCT_LOCAL); if (unlikely(rc != 0)) { - CWARN("%s: lu_env initialization failed, cannot " - "sync: rc = %d\n", mdt_obd_name(mdt), rc); + CWARN("%s: lu_env initialization failed, cannot sync: rc = %d\n", + mdt_obd_name(mdt), rc); return; } mdt_device_sync(&env, mdt); @@ -7381,7 +8099,7 @@ void mdt_enable_cos(struct mdt_device *mdt, bool val) */ int mdt_cos_is_enabled(struct mdt_device *mdt) { - return mdt->mdt_opts.mo_cos != 0; + return mdt->mdt_opts.mo_cos != 0; } static const struct lu_device_type_operations mdt_device_type_ops = { @@ -7391,10 +8109,10 @@ static const struct lu_device_type_operations mdt_device_type_ops = { }; static struct lu_device_type mdt_device_type = { - .ldt_tags = LU_DEVICE_MD, - .ldt_name = LUSTRE_MDT_NAME, - .ldt_ops = &mdt_device_type_ops, - .ldt_ctx_tags = LCT_MD_THREAD + .ldt_tags = LU_DEVICE_MD, + .ldt_name = LUSTRE_MDT_NAME, + .ldt_ops = &mdt_device_type_ops, + .ldt_ctx_tags = LCT_MD_THREAD }; static int __init mdt_init(void) @@ -7405,6 +8123,11 @@ static int __init mdt_init(void) FID_NOBRACE_LEN + 1); 
BUILD_BUG_ON(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") != FID_LEN + 1); + + rc = libcfs_setup(); + if (rc) + return rc; + rc = lu_kmem_init(mdt_caches); if (rc) return rc;