X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=9181c9a21a15bf0bd205860f20e98639e153c12f;hp=94a675a81fe7146449b9aed269195eb1c32c641f;hb=HEAD;hpb=75a417fa0065d52a31215daaaaf41c0fa9751a89 diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 94a675a..ab01713 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -27,7 +27,6 @@ */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. * * lustre/mdt/mdt_handler.c * @@ -58,27 +57,61 @@ #include #include #include +#include #include #include #include #include #include +#include #include "mdt_internal.h" -static unsigned int max_mod_rpcs_per_client = 8; -module_param(max_mod_rpcs_per_client, uint, 0644); -MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client"); +#if OBD_OCD_VERSION(3, 0, 53, 0) > LUSTRE_VERSION_CODE +static int mdt_max_mod_rpcs_per_client_set(const char *val, + cfs_kernel_param_arg_t *kp) +{ + unsigned int num; + int rc; + + rc = kstrtouint(val, 0, &num); + if (rc < 0) + return rc; + + if (num < 1 || num > OBD_MAX_RIF_MAX) + return -EINVAL; + + CWARN("max_mod_rpcs_per_client is deprecated, set mdt.*.max_mod_rpcs_in_flight parameter instead\n"); + + max_mod_rpcs_per_client = num; + return 0; +} +static const struct kernel_param_ops + param_ops_max_mod_rpcs_per_client = { + .set = mdt_max_mod_rpcs_per_client_set, + .get = param_get_uint, +}; + +#define param_check_max_mod_rpcs_per_client(name, p) \ + __param_check(name, p, unsigned int) + +module_param_cb(max_mod_rpcs_per_client, + &param_ops_max_mod_rpcs_per_client, + &max_mod_rpcs_per_client, 0644); + +MODULE_PARM_DESC(max_mod_rpcs_per_client, + "maximum number of modify RPCs in flight allowed per client (Deprecated)"); +#endif mdl_mode_t mdt_mdl_lock_modes[] = { - [LCK_MINMODE] = MDL_MINMODE, - [LCK_EX] = MDL_EX, - [LCK_PW] = MDL_PW, - [LCK_PR] = MDL_PR, - 
[LCK_CW] = MDL_CW, - [LCK_CR] = MDL_CR, - [LCK_NL] = MDL_NL, - [LCK_GROUP] = MDL_GROUP + [LCK_MINMODE] = MDL_MINMODE, + [LCK_EX] = MDL_EX, + [LCK_PW] = MDL_PW, + [LCK_PR] = MDL_PR, + [LCK_CW] = MDL_CW, + [LCK_CR] = MDL_CR, + [LCK_NL] = MDL_NL, + [LCK_GROUP] = MDL_GROUP }; enum ldlm_mode mdt_dlm_lock_modes[] = { @@ -155,17 +188,34 @@ void mdt_set_disposition(struct mdt_thread_info *info, rep->lock_policy_res1 |= op_flag; } +/* assert lock is unlocked before reuse */ +static inline void mdt_lock_handle_assert(struct mdt_lock_handle *lh) +{ + LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh)); + LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh)); + LASSERT(!lustre_handle_is_used(&lh->mlh_rreg_lh)); +} + void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm) { + mdt_lock_handle_assert(lh); lh->mlh_pdo_hash = 0; lh->mlh_reg_mode = lm; lh->mlh_rreg_mode = lm; lh->mlh_type = MDT_REG_LOCK; } +void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock) +{ + mdt_lock_reg_init(lh, lock->l_req_mode); + if (lock->l_req_mode == LCK_GROUP) + lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid; +} + void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode, const struct lu_name *lname) { + mdt_lock_handle_assert(lh); lh->mlh_reg_mode = lock_mode; lh->mlh_pdo_mode = LCK_MINMODE; lh->mlh_rreg_mode = lock_mode; @@ -176,11 +226,11 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode, lname->ln_namelen); /* XXX Workaround for LU-2856 * - * Zero is a valid return value of full_name_hash, but - * several users of mlh_pdo_hash assume a non-zero - * hash value. We therefore map zero onto an - * arbitrary, but consistent value (1) to avoid - * problems further down the road. */ + * Zero is a valid return value of full_name_hash, but several + * users of mlh_pdo_hash assume a non-zero hash value. We + * therefore map zero onto an arbitrary, but consistent + * value (1) to avoid problems further down the road. 
+ */ if (unlikely(lh->mlh_pdo_hash == 0)) lh->mlh_pdo_hash = 1; } else { @@ -189,78 +239,114 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode, } static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh) -{ - mdl_mode_t mode; - ENTRY; - - /* - * Any dir access needs couple of locks: - * - * 1) on part of dir we gonna take lookup/modify; - * - * 2) on whole dir to protect it from concurrent splitting and/or to - * flush client's cache for readdir(). - * - * so, for a given mode and object this routine decides what lock mode - * to use for lock #2: - * - * 1) if caller's gonna lookup in dir then we need to protect dir from - * being splitted only - LCK_CR - * - * 2) if caller's gonna modify dir then we need to protect dir from - * being splitted and to flush cache - LCK_CW - * - * 3) if caller's gonna modify dir and that dir seems ready for - * splitting then we need to protect it from any type of access - * (lookup/modify/split) - LCK_EX --bzzz - */ - - LASSERT(lh->mlh_reg_mode != LCK_MINMODE); - LASSERT(lh->mlh_pdo_mode == LCK_MINMODE); - - /* - * Ask underlaying level its opinion about preferable PDO lock mode - * having access type passed as regular lock mode: - * - * - MDL_MINMODE means that lower layer does not want to specify lock - * mode; - * - * - MDL_NL means that no PDO lock should be taken. This is used in some - * cases. Say, for non-splittable directories no need to use PDO locks - * at all. - */ - mode = mdo_lock_mode(info->mti_env, mdt_object_child(o), - mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode)); - - if (mode != MDL_MINMODE) { - lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode); - } else { - /* - * Lower layer does not want to specify locking mode. We do it - * our selves. No special protection is needed, just flush - * client's cache on modification and allow concurrent - * mondification. 
- */ - switch (lh->mlh_reg_mode) { - case LCK_EX: - lh->mlh_pdo_mode = LCK_EX; - break; - case LCK_PR: - lh->mlh_pdo_mode = LCK_CR; - break; - case LCK_PW: - lh->mlh_pdo_mode = LCK_CW; - break; - default: - CERROR("Not expected lock type (0x%x)\n", - (int)lh->mlh_reg_mode); - LBUG(); - } - } - - LASSERT(lh->mlh_pdo_mode != LCK_MINMODE); - EXIT; + struct mdt_lock_handle *lh) +{ + mdl_mode_t mode; + + ENTRY; + + /* + * Any dir access needs couple of locks: + * + * 1) on part of dir we gonna take lookup/modify; + * + * 2) on whole dir to protect it from concurrent splitting and/or to + * flush client's cache for readdir(). + * + * so, for a given mode and object this routine decides what lock mode + * to use for lock #2: + * + * 1) if caller's gonna lookup in dir then we need to protect dir from + * being splitted only - LCK_CR + * + * 2) if caller's gonna modify dir then we need to protect dir from + * being splitted and to flush cache - LCK_CW + * + * 3) if caller's gonna modify dir and that dir seems ready for + * splitting then we need to protect it from any type of access + * (lookup/modify/split) - LCK_EX --bzzz + */ + + LASSERT(lh->mlh_reg_mode != LCK_MINMODE); + LASSERT(lh->mlh_pdo_mode == LCK_MINMODE); + + /* + * Ask underlaying level its opinion about preferable PDO lock mode + * having access type passed as regular lock mode: + * + * - MDL_MINMODE means that lower layer does not want to specify lock + * mode; + * + * - MDL_NL means that no PDO lock should be taken. This is used in some + * cases. Say, for non-splittable directories no need to use PDO locks + * at all. + */ + mode = mdo_lock_mode(info->mti_env, mdt_object_child(o), + mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode)); + + if (mode != MDL_MINMODE) { + lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode); + } else { + /* + * Lower layer does not want to specify locking mode. We do it + * our selves. 
No special protection is needed, just flush + * client's cache on modification and allow concurrent + * mondification. + */ + switch (lh->mlh_reg_mode) { + case LCK_EX: + lh->mlh_pdo_mode = LCK_EX; + break; + case LCK_PR: + lh->mlh_pdo_mode = LCK_CR; + break; + case LCK_PW: + lh->mlh_pdo_mode = LCK_CW; + break; + default: + CERROR("Not expected lock type (0x%x)\n", + (int)lh->mlh_reg_mode); + LBUG(); + } + } + + LASSERT(lh->mlh_pdo_mode != LCK_MINMODE); + EXIT; +} + +/** + * Check whether \a o is directory stripe object. + * + * \param[in] info thread environment + * \param[in] o MDT object + * + * \retval 1 is directory stripe. + * \retval 0 isn't directory stripe. + * \retval < 1 error code + */ +static int mdt_is_dir_stripe(struct mdt_thread_info *info, + struct mdt_object *o) +{ + struct md_attr *ma = &info->mti_attr; + struct lmv_mds_md_v1 *lmv; + int rc; + + rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV); + if (rc < 0) + return rc; + + if (!(ma->ma_valid & MA_LMV)) + return 0; + + lmv = &ma->ma_lmv->lmv_md_v1; + + if (!lmv_is_sane2(lmv)) + return -EBADF; + + if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE) + return 1; + + return 0; } static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, @@ -268,9 +354,9 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, { struct mdt_device *mdt = info->mti_mdt; struct lu_name *lname = &info->mti_name; + const char *start = fileset; char *filename = info->mti_filename; - struct mdt_object *parent; - u32 mode; + struct mdt_object *obj; int rc = 0; LASSERT(!info->mti_cross_ref); @@ -282,8 +368,8 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, */ *fid = mdt->mdt_md_root_fid; - while (rc == 0 && fileset != NULL && *fileset != '\0') { - const char *s1 = fileset; + while (rc == 0 && start != NULL && *start != '\0') { + const char *s1 = start; const char *s2; while (*++s1 == '/') @@ -295,7 +381,7 @@ static int 
mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, if (s2 == s1) break; - fileset = s2; + start = s2; lname->ln_namelen = s2 - s1; if (lname->ln_namelen > NAME_MAX) { @@ -304,8 +390,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, } /* reject .. as a path component */ - if (lname->ln_namelen == 2 && - strncmp(s1, "..", 2) == 0) { + if (lname->ln_namelen == 2 && strncmp(s1, "..", 2) == 0) { rc = -EINVAL; break; } @@ -314,27 +399,18 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, filename[lname->ln_namelen] = '\0'; lname->ln_name = filename; - parent = mdt_object_find(info->mti_env, mdt, fid); - if (IS_ERR(parent)) { - rc = PTR_ERR(parent); + obj = mdt_object_find(info->mti_env, mdt, fid); + if (IS_ERR(obj)) { + rc = PTR_ERR(obj); break; } /* Only got the fid of this obj by name */ fid_zero(fid); - rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, + rc = mdo_lookup(info->mti_env, mdt_object_child(obj), lname, fid, &info->mti_spec); - mdt_object_put(info->mti_env, parent); - } - if (!rc) { - parent = mdt_object_find(info->mti_env, mdt, fid); - if (IS_ERR(parent)) - rc = PTR_ERR(parent); - else { - mode = lu_object_attr(&parent->mot_obj); - mdt_object_put(info->mti_env, parent); - if (!S_ISDIR(mode)) - rc = -ENOTDIR; - } + if (!rc && !S_ISDIR(lu_object_attr(&obj->mot_obj))) + rc = -ENOTDIR; + mdt_object_put(info->mti_env, obj); } return rc; @@ -356,7 +432,7 @@ static int mdt_get_root(struct tgt_session_info *tsi) if (rc) GOTO(out, rc = err_serious(rc)); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK)) GOTO(out, rc = err_serious(-ENOMEM)); repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); @@ -371,7 +447,8 @@ static int mdt_get_root(struct tgt_session_info *tsi) CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset); if (fileset) { /* consider fileset from client as a sub-fileset - * of the nodemap 
one */ + * of the nodemap one + */ OBD_ALLOC(buffer, PATH_MAX + 1); if (buffer == NULL) GOTO(out, rc = err_serious(-ENOMEM)); @@ -393,6 +470,7 @@ static int mdt_get_root(struct tgt_session_info *tsi) } else { repbody->mbo_fid1 = mdt->mdt_md_root_fid; } + exp->exp_root_fid = repbody->mbo_fid1; repbody->mbo_valid |= OBD_MD_FLID; EXIT; @@ -414,22 +492,25 @@ static int mdt_statfs(struct tgt_session_info *tsi) struct obd_statfs *osfs; struct mdt_body *reqbody = NULL; struct mdt_statfs_cache *msf; + ktime_t kstart = ktime_get(); + int current_blockbits; int rc; + timeout_t at_est; ENTRY; svcpt = req->rq_rqbd->rqbd_svcpt; /* This will trigger a watchdog timeout */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP, - (MDT_SERVICE_WATCHDOG_FACTOR * - at_get(&svcpt->scp_at_estimate)) + 1); + at_est = obd_at_get(mdt->mdt_lu_dev.ld_obd, &svcpt->scp_at_estimate); + CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP, + (MDT_SERVICE_WATCHDOG_FACTOR * at_est) + 1); rc = mdt_check_ucred(info); if (rc) GOTO(out, rc = err_serious(rc)); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) GOTO(out, rc = err_serious(-ENOMEM)); osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS); @@ -450,74 +531,135 @@ static int mdt_statfs(struct tgt_session_info *tsi) msf = &mdt->mdt_osfs; if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) { - /** statfs data is too old, get up-to-date one */ - if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) - rc = next->md_ops->mdo_statfs(info->mti_env, - next, osfs); - else - rc = dt_statfs(info->mti_env, mdt->mdt_bottom, - osfs); - if (rc) - GOTO(out, rc); - spin_lock(&mdt->mdt_lock); - msf->msf_osfs = *osfs; - msf->msf_age = ktime_get_seconds(); - spin_unlock(&mdt->mdt_lock); + /** statfs data is too old, get up-to-date one */ + if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS) + rc = next->md_ops->mdo_statfs(info->mti_env, next, + osfs); + else + rc = dt_statfs(info->mti_env, 
mdt->mdt_bottom, osfs); + if (rc) + GOTO(out, rc); + spin_lock(&mdt->mdt_lock); + msf->msf_osfs = *osfs; + msf->msf_age = ktime_get_seconds(); + spin_unlock(&mdt->mdt_lock); } else { - /** use cached statfs data */ - spin_lock(&mdt->mdt_lock); - *osfs = msf->msf_osfs; - spin_unlock(&mdt->mdt_lock); - } + /** use cached statfs data */ + spin_lock(&mdt->mdt_lock); + *osfs = msf->msf_osfs; + spin_unlock(&mdt->mdt_lock); + } + + /* tgd_blockbit is recordsize bits set during mkfs. + * This once set does not change. However, 'zfs set' + * can be used to change the MDT blocksize. Instead + * of using cached value of 'tgd_blockbit' always + * calculate the blocksize bits which may have + * changed. + */ + current_blockbits = fls64(osfs->os_bsize) - 1; - /* at least try to account for cached pages. its still racy and - * might be under-reporting if clients haven't announced their - * caches with brw recently */ - CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu" - " pending %llu free %llu avail %llu\n", + /* Account for cached pages. 
its still racy and might be under-reporting + * if clients haven't announced their caches with brw recently + */ + CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu pending %llu free %llu avail %llu\n", tgd->tgd_tot_dirty, tgd->tgd_tot_granted, tgd->tgd_tot_pending, - osfs->os_bfree << tgd->tgd_blockbits, - osfs->os_bavail << tgd->tgd_blockbits); + osfs->os_bfree << current_blockbits, + osfs->os_bavail << current_blockbits); osfs->os_bavail -= min_t(u64, osfs->os_bavail, ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending + - osfs->os_bsize - 1) >> tgd->tgd_blockbits)); + osfs->os_bsize - 1) >> current_blockbits)); tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__); + if (mdt->mdt_lut.lut_no_create) + osfs->os_state |= OS_STATFS_NOCREATE; CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; " "%llu objects: %llu free; state %x\n", osfs->os_blocks, osfs->os_bfree, osfs->os_bavail, osfs->os_files, osfs->os_ffree, osfs->os_state); if (!exp_grant_param_supp(tsi->tsi_exp) && - tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT) { + current_blockbits > COMPAT_BSIZE_SHIFT) { /* clients which don't support OBD_CONNECT_GRANT_PARAM * should not see a block size > page size, otherwise * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12) * block size which is the biggest block size known to work - * with all client's page size. */ - osfs->os_blocks <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT; - osfs->os_bfree <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT; - osfs->os_bavail <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT; + * with all client's page size. 
+ */ + osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT; + osfs->os_bfree <<= current_blockbits - COMPAT_BSIZE_SHIFT; + osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT; osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT; } if (rc == 0) - mdt_counter_incr(req, LPROC_MDT_STATFS); + mdt_counter_incr(req, LPROC_MDT_STATFS, + ktime_us_delta(ktime_get(), kstart)); out: mdt_thread_info_fini(info); RETURN(rc); } -/** - * Pack size attributes into the reply. - */ +__u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only) +{ + struct lov_comp_md_v1 *comp_v1; + struct lov_mds_md *v1; + __u32 off; + __u32 dom_stripesize = 0; + int i; + bool has_ost_stripes = false; + + ENTRY; + + if (is_dom_only) + *is_dom_only = 0; + + if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1) + RETURN(0); + + comp_v1 = (struct lov_comp_md_v1 *)lmm; + off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset); + v1 = (struct lov_mds_md *)((char *)comp_v1 + off); + + /* Fast check for DoM entry with no mirroring, should be the first */ + if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 && + !(lov_pattern(le32_to_cpu(v1->lmm_pattern)) & LOV_PATTERN_MDT)) + RETURN(0); + + /* check all entries otherwise */ + for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) { + struct lov_comp_md_entry_v1 *lcme; + + lcme = &comp_v1->lcm_entries[i]; + if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT)) + continue; + + off = le32_to_cpu(lcme->lcme_offset); + v1 = (struct lov_mds_md *)((char *)comp_v1 + off); + + if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) & + LOV_PATTERN_MDT) + dom_stripesize = le32_to_cpu(v1->lmm_stripe_size); + else + has_ost_stripes = true; + + if (dom_stripesize && has_ost_stripes) + RETURN(dom_stripesize); + } + /* DoM-only case exits here */ + if (is_dom_only && dom_stripesize) + *is_dom_only = 1; + RETURN(dom_stripesize); +} + +/* Pack size attributes into the reply. 
*/ int mdt_pack_size2body(struct mdt_thread_info *info, const struct lu_fid *fid, struct lustre_handle *lh) { struct mdt_body *b; struct md_attr *ma = &info->mti_attr; - int dom_stripe; + __u32 dom_stripe; bool dom_lock = false; ENTRY; @@ -528,9 +670,9 @@ int mdt_pack_size2body(struct mdt_thread_info *info, !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL)) RETURN(-ENODATA); - dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm); + dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm); /* no DoM stripe, no size in reply */ - if (dom_stripe == LMM_NO_DOM) + if (!dom_stripe) RETURN(-ENOENT); if (lustre_handle_is_used(lh)) { @@ -548,14 +690,15 @@ int mdt_pack_size2body(struct mdt_thread_info *info, RETURN(0); /* Either DoM lock exists or LMM has only DoM stripe then - * return size on body. */ + * return size on body. + */ b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock); RETURN(0); } -#ifdef CONFIG_FS_POSIX_ACL +#ifdef CONFIG_LUSTRE_FS_POSIX_ACL /* * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap. 
* @@ -583,6 +726,7 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody, if (buf->lb_len == 0) RETURN(0); + LASSERT(!info->mti_big_acl_used); again: rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS); if (rc < 0) { @@ -592,74 +736,47 @@ again: rc = 0; } else if (rc == -EOPNOTSUPP) { rc = 0; - } else { - if (rc == -ERANGE && - exp_connect_large_acl(info->mti_exp) && - buf->lb_buf != info->mti_big_acl) { + } else if (rc == -ERANGE) { + if (exp_connect_large_acl(info->mti_exp) && + !info->mti_big_acl_used) { if (info->mti_big_acl == NULL) { + info->mti_big_aclsize = + min_t(unsigned int, + mdt->mdt_max_ea_size, + XATTR_SIZE_MAX); OBD_ALLOC_LARGE(info->mti_big_acl, - mdt->mdt_max_ea_size); + info->mti_big_aclsize); if (info->mti_big_acl == NULL) { + info->mti_big_aclsize = 0; CERROR("%s: unable to grow " DFID" ACL buffer\n", mdt_obd_name(mdt), PFID(mdt_object_fid(o))); RETURN(-ENOMEM); } - - info->mti_big_aclsize = - mdt->mdt_max_ea_size; } CDEBUG(D_INODE, "%s: grow the "DFID " ACL buffer to size %d\n", mdt_obd_name(mdt), PFID(mdt_object_fid(o)), - mdt->mdt_max_ea_size); + info->mti_big_aclsize); buf->lb_buf = info->mti_big_acl; buf->lb_len = info->mti_big_aclsize; - + info->mti_big_acl_used = 1; goto again; } - + /* FS has ACL bigger that our limits */ + CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n", + mdt_obd_name(mdt), PFID(mdt_object_fid(o)), + info->mti_big_aclsize); + rc = -E2BIG; + } else { CERROR("%s: unable to read "DFID" ACL: rc = %d\n", mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc); } } else { - int client; - int server; - int acl_buflen; - int lmm_buflen = 0; - int lmmsize = 0; - - acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER); - if (acl_buflen >= rc) - goto map; - - /* If LOV/LMA EA is small, we can reuse part of their buffer */ - client = ptlrpc_req_get_repsize(pill->rc_req); - server = lustre_packed_msg_size(pill->rc_req->rq_repmsg); - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) { - 
lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD, - RCL_SERVER); - lmmsize = repbody->mbo_eadatasize; - } - - if (client < server - acl_buflen - lmm_buflen + rc + lmmsize) { - CDEBUG(D_INODE, "%s: client prepared buffer size %d " - "is not big enough with the ACL size %d (%d)\n", - mdt_obd_name(mdt), client, rc, - server - acl_buflen - lmm_buflen + rc + lmmsize); - repbody->mbo_aclsize = 0; - repbody->mbo_valid &= ~OBD_MD_FLACL; - RETURN(-ERANGE); - } - -map: - if (buf->lb_buf == info->mti_big_acl) - info->mti_big_acl_used = 1; - rc = nodemap_map_acl(nodemap, buf->lb_buf, rc, NODEMAP_FS_TO_CLIENT); /* if all ACLs mapped out, rc is still >= 0 */ @@ -705,10 +822,11 @@ static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm) } void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, - const struct lu_attr *attr, const struct lu_fid *fid) + const struct lu_attr *attr, const struct lu_fid *fid) { - struct md_attr *ma = &info->mti_attr; + struct mdt_device *mdt = info->mti_mdt; struct obd_export *exp = info->mti_exp; + struct md_attr *ma = &info->mti_attr; struct lu_nodemap *nodemap = NULL; LASSERT(ma->ma_valid & MA_INODE); @@ -725,6 +843,10 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, b->mbo_ctime = attr->la_ctime; b->mbo_valid |= OBD_MD_FLCTIME; } + if (attr->la_valid & LA_BTIME) { + b->mbo_btime = attr->la_btime; + b->mbo_valid |= OBD_MD_FLBTIME; + } if (attr->la_valid & LA_FLAGS) { b->mbo_flags = attr->la_flags; b->mbo_valid |= OBD_MD_FLFLAGS; @@ -733,7 +855,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, b->mbo_nlink = attr->la_nlink; b->mbo_valid |= OBD_MD_FLNLINK; } - if (attr->la_valid & (LA_UID|LA_GID)) { + if (attr->la_valid & (LA_UID|LA_GID|LA_PROJID)) { nodemap = nodemap_get_from_exp(exp); if (IS_ERR(nodemap)) goto out; @@ -752,8 +874,9 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, } if (attr->la_valid & LA_PROJID) { - /* TODO, nodemap for 
project id */ - b->mbo_projid = attr->la_projid; + b->mbo_projid = nodemap_map_id(nodemap, NODEMAP_PROJID, + NODEMAP_FS_TO_CLIENT, + attr->la_projid); b->mbo_valid |= OBD_MD_FLPROJID; } @@ -785,7 +908,8 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, /* just ignore blocks occupied by extend attributes on MDS */ b->mbo_blocks = 0; /* if no object is allocated on osts, the size on mds is valid. - * b=22272 */ + * b=22272 + */ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) { if (mdt_hsm_is_released(ma->ma_lmm)) { @@ -798,12 +922,18 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, else b->mbo_blocks = 1; b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - } else if (info->mti_som_valid) { /* som is valid */ + } else if (info->mti_som_strict && mdt->mdt_enable_strict_som) { + /* use SOM for size*/ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */ + b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS; + b->mbo_size = ma->ma_som.ms_size; + b->mbo_blocks = ma->ma_som.ms_blocks; } } - if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE)) + if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE || + b->mbo_valid & OBD_MD_FLLAZYSIZE)) CDEBUG(D_VFSTRACE, DFID": returning size %llu\n", PFID(fid), (unsigned long long)b->mbo_size); @@ -821,27 +951,29 @@ static inline int mdt_body_has_lov(const struct lu_attr *la, void mdt_client_compatibility(struct mdt_thread_info *info) { - struct mdt_body *body; - struct ptlrpc_request *req = mdt_info_req(info); - struct obd_export *exp = req->rq_export; - struct md_attr *ma = &info->mti_attr; - struct lu_attr *la = &ma->ma_attr; - ENTRY; + struct mdt_body *body; + struct ptlrpc_request *req = mdt_info_req(info); + struct obd_export *exp = req->rq_export; + struct md_attr *ma = &info->mti_attr; + struct lu_attr *la = &ma->ma_attr; + + ENTRY; if (exp_connect_layout(exp)) /* the 
client can deal with 16-bit lmm_stripe_count */ RETURN_EXIT; - body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - if (!mdt_body_has_lov(la, body)) - RETURN_EXIT; + if (!mdt_body_has_lov(la, body)) + RETURN_EXIT; - /* now we have a reply with a lov for a client not compatible with the - * layout lock so we have to clean the layout generation number */ - if (S_ISREG(la->la_mode)) - ma->ma_lmm->lmm_layout_gen = 0; - EXIT; + /* now we have a reply with a lov for a client not compatible with the + * layout lock so we have to clean the layout generation number + */ + if (S_ISREG(la->la_mode)) + ma->ma_lmm->lmm_layout_gen = 0; + EXIT; } static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info, @@ -882,6 +1014,7 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, { const struct lu_env *env = info->mti_env; int rc; + ENTRY; LASSERT(info->mti_big_lmm_used == 0); @@ -916,8 +1049,8 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, RETURN(rc); } -int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, - struct md_attr *ma, const char *name) +int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, + struct md_attr *ma, const char *name) { struct md_object *next = mdt_object_child(o); struct lu_buf *buf = &info->mti_buf; @@ -932,8 +1065,8 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, buf->lb_len = ma->ma_lmv_size; LASSERT(!(ma->ma_valid & MA_LMV)); } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - buf->lb_buf = ma->ma_lmv; - buf->lb_len = ma->ma_lmv_size; + buf->lb_buf = ma->ma_default_lmv; + buf->lb_len = ma->ma_default_lmv_size; LASSERT(!(ma->ma_valid & MA_LMV_DEF)); } else { return -EINVAL; @@ -941,6 +1074,13 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, LASSERT(buf->lb_buf); + if (!mdt_object_exists(o)) + return -ENOENT; + + if 
(mdt_object_remote(o) && S_ISDIR(lu_object_attr(&o->mot_obj))) + /* force reload layout for remote dir in case layout changed */ + mo_invalidate(info->mti_env, mdt_object_child(o)); + rc = mo_xattr_get(info->mti_env, next, buf, name); if (rc > 0) { @@ -955,10 +1095,9 @@ got: !(exp_connect_flags(info->mti_exp) & OBD_CONNECT_LFSCK)) { return -EIO; - } else { - ma->ma_lmm_size = rc; - ma->ma_valid |= MA_LOV; } + ma->ma_lmm_size = rc; + ma->ma_valid |= MA_LOV; } else if (strcmp(name, XATTR_NAME_LMV) == 0) { if (info->mti_big_lmm_used) ma->ma_lmv = info->mti_big_lmm; @@ -966,7 +1105,7 @@ got: ma->ma_lmv_size = rc; ma->ma_valid |= MA_LMV; } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - ma->ma_lmv_size = rc; + ma->ma_default_lmv_size = rc; ma->ma_valid |= MA_LMV_DEF; } @@ -980,7 +1119,8 @@ got: rc = 0; } else if (rc == -ERANGE) { /* Default LMV has fixed size, so it must be able to fit - * in the original buffer */ + * in the original buffer + */ if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) return rc; rc = mdt_big_xattr_get(info, o, name); @@ -993,6 +1133,40 @@ got: return rc; } +int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, + struct md_attr *ma, const char *name) +{ + int rc; + + if (!info->mti_big_lmm) { + OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE); + if (!info->mti_big_lmm) + return -ENOMEM; + info->mti_big_lmmsize = PAGE_SIZE; + } + + if (strcmp(name, XATTR_NAME_LOV) == 0) { + ma->ma_lmm = info->mti_big_lmm; + ma->ma_lmm_size = info->mti_big_lmmsize; + ma->ma_valid &= ~MA_LOV; + } else if (strcmp(name, XATTR_NAME_LMV) == 0) { + ma->ma_lmv = info->mti_big_lmm; + ma->ma_lmv_size = info->mti_big_lmmsize; + ma->ma_valid &= ~MA_LMV; + } else { + LBUG(); + } + + LASSERT(!info->mti_big_lmm_used); + rc = __mdt_stripe_get(info, o, ma, name); + /* since big_lmm is always used here, clear 'used' flag to avoid + * assertion in mdt_big_xattr_get(). 
+ */ + info->mti_big_lmm_used = 0; + + return rc; +} + int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, struct lu_fid *pfid) { @@ -1000,6 +1174,7 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, struct link_ea_header *leh; struct link_ea_entry *lee; int rc; + ENTRY; buf->lb_buf = info->mti_big_lmm; @@ -1007,7 +1182,8 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf, XATTR_NAME_LINK); /* ignore errors, MA_PFID won't be set and it is - * up to the caller to treat this as an error */ + * up to the caller to treat this as an error + */ if (rc == -ERANGE || buf->lb_len == 0) { rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK); buf->lb_buf = info->mti_big_lmm; @@ -1040,6 +1216,51 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, RETURN(0); } +int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o, + struct lu_fid *pfid, struct lu_name *lname) +{ + struct lu_buf *buf = &info->mti_buf; + struct link_ea_header *leh; + struct link_ea_entry *lee; + int reclen; + int rc; + + buf->lb_buf = info->mti_xattr_buf; + buf->lb_len = sizeof(info->mti_xattr_buf); + rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf, + XATTR_NAME_LINK); + if (rc == -ERANGE) { + rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK); + buf->lb_buf = info->mti_big_lmm; + buf->lb_len = info->mti_big_lmmsize; + } + if (rc < 0) + return rc; + + if (rc < sizeof(*leh)) { + CERROR("short LinkEA on "DFID": rc = %d\n", + PFID(mdt_object_fid(o)), rc); + return -ENODATA; + } + + leh = (struct link_ea_header *)buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); + if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) { + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_reccount = __swab32(leh->leh_reccount); + leh->leh_len = __swab64(leh->leh_len); + } + if (leh->leh_magic != LINK_EA_MAGIC) + return -EINVAL; + + if (leh->leh_reccount == 0) + 
return -ENODATA; + + linkea_entry_unpack(lee, &reclen, lname, pfid); + + return 0; +} + int mdt_attr_get_complex(struct mdt_thread_info *info, struct mdt_object *o, struct md_attr *ma) { @@ -1049,6 +1270,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, int need = ma->ma_need; int rc = 0, rc2; u32 mode; + ENTRY; ma->ma_valid = 0; @@ -1059,6 +1281,10 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, if (need & MA_INODE) { ma->ma_need = MA_INODE; + if (need & MA_DIRENT_CNT) + ma->ma_attr.la_valid |= LA_DIRENT_CNT; + else + ma->ma_attr.la_valid &= ~LA_DIRENT_CNT; rc = mo_attr_get(env, next, ma); if (rc) GOTO(out, rc); @@ -1077,26 +1303,24 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, } if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) { - rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LOV); + rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV); if (rc) GOTO(out, rc); } if (need & MA_LMV && S_ISDIR(mode)) { - rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV); + rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV); if (rc != 0) GOTO(out, rc); } if (need & MA_LMV_DEF && S_ISDIR(mode)) { - rc = mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV); + rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV); if (rc != 0) GOTO(out, rc); } - /* - * In the handle of MA_INODE, we may already get the SOM attr. - */ + /* In the handle of MA_INODE, we may already get the SOM attr. 
*/ if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) { rc = mdt_get_som(info, o, ma); if (rc != 0) @@ -1106,8 +1330,8 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, if (need & MA_HSM && S_ISREG(mode)) { buf->lb_buf = info->mti_xattr_buf; buf->lb_len = sizeof(info->mti_xattr_buf); - CLASSERT(sizeof(struct hsm_attrs) <= - sizeof(info->mti_xattr_buf)); + BUILD_BUG_ON(sizeof(struct hsm_attrs) > + sizeof(info->mti_xattr_buf)); rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM); rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm); if (rc2 == 0) @@ -1116,7 +1340,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, GOTO(out, rc = rc2); } -#ifdef CONFIG_FS_POSIX_ACL +#ifdef CONFIG_LUSTRE_FS_POSIX_ACL if (need & MA_ACL_DEF && S_ISDIR(mode)) { buf->lb_buf = ma->ma_acl; buf->lb_len = ma->ma_acl_size; @@ -1138,23 +1362,40 @@ out: RETURN(rc); } +static void mdt_preset_encctx_size(struct mdt_thread_info *info) +{ + struct req_capsule *pill = info->mti_pill; + + ENTRY; + if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX, + RCL_SERVER)) + /* pre-set size in server part with max size */ + req_capsule_set_size(pill, &RMF_FILE_ENCCTX, + RCL_SERVER, + info->mti_mdt->mdt_max_mdsize); + EXIT; +} + static int mdt_getattr_internal(struct mdt_thread_info *info, - struct mdt_object *o, int ma_need) + struct mdt_object *o, int ma_need) { - struct md_object *next = mdt_object_child(o); - const struct mdt_body *reqbody = info->mti_body; - struct ptlrpc_request *req = mdt_info_req(info); - struct md_attr *ma = &info->mti_attr; - struct lu_attr *la = &ma->ma_attr; - struct req_capsule *pill = info->mti_pill; - const struct lu_env *env = info->mti_env; - struct mdt_body *repbody; - struct lu_buf *buffer = &info->mti_buf; - struct obd_export *exp = info->mti_exp; - int rc; + struct mdt_device *mdt = info->mti_mdt; + struct md_object *next = mdt_object_child(o); + const struct mdt_body *reqbody = info->mti_body; + struct ptlrpc_request *req = 
mdt_info_req(info); + struct md_attr *ma = &info->mti_attr; + struct lu_attr *la = &ma->ma_attr; + struct req_capsule *pill = info->mti_pill; + const struct lu_env *env = info->mti_env; + struct mdt_body *repbody; + struct lu_buf *buffer = &info->mti_buf; + struct obd_export *exp = info->mti_exp; + ktime_t kstart = ktime_get(); + int rc; + ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) RETURN(err_serious(-ENOMEM)); repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); @@ -1162,8 +1403,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, ma->ma_valid = 0; if (mdt_object_remote(o)) { - /* This object is located on remote node.*/ - /* Return -ENOTSUPP for old client */ + /* obj is located on remote node Return -ENOTSUPP(old client) */ if (!mdt_is_dne_client(req->rq_export)) GOTO(out, rc = -ENOTSUPP); @@ -1187,44 +1427,81 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, req->rq_export->exp_client_uuid.uuid); } - /* If it is dir object and client require MEA, then we got MEA */ + /* from 2.12.58 intent_getattr pack default LMV in reply */ if (S_ISDIR(lu_object_attr(&next->mo_lu)) && - (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) { + ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) == + (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) && + req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD, + RCL_SERVER)) { + ma->ma_lmv = buffer->lb_buf; + ma->ma_lmv_size = buffer->lb_len; + ma->ma_default_lmv = req_capsule_server_get(pill, + &RMF_DEFAULT_MDT_MD); + ma->ma_default_lmv_size = req_capsule_get_size(pill, + &RMF_DEFAULT_MDT_MD, + RCL_SERVER); + ma->ma_need = MA_INODE; + if (ma->ma_lmv_size > 0) + ma->ma_need |= MA_LMV; + if (ma->ma_default_lmv_size > 0) + ma->ma_need |= MA_LMV_DEF; + } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) && + (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) { + /* If it is dir and client require MEA, then we got MEA */ /* Assumption: 
MDT_MD size is enough for lmv size. */ ma->ma_lmv = buffer->lb_buf; ma->ma_lmv_size = buffer->lb_len; ma->ma_need = MA_INODE; if (ma->ma_lmv_size > 0) { - if (reqbody->mbo_valid & OBD_MD_MEA) + if (reqbody->mbo_valid & OBD_MD_MEA) { ma->ma_need |= MA_LMV; - else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) + } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) { ma->ma_need |= MA_LMV_DEF; + ma->ma_default_lmv = buffer->lb_buf; + ma->ma_lmv = NULL; + ma->ma_default_lmv_size = buffer->lb_len; + ma->ma_lmv_size = 0; + } } } else { ma->ma_lmm = buffer->lb_buf; ma->ma_lmm_size = buffer->lb_len; ma->ma_need = MA_INODE | MA_HSM; - if (ma->ma_lmm_size > 0) + if (ma->ma_lmm_size > 0) { ma->ma_need |= MA_LOV; + /* Older clients may crash if they getattr overstriped + * files + */ + if (!exp_connect_overstriping(exp) && + mdt_lmm_is_overstriping(ma->ma_lmm)) + RETURN(-EOPNOTSUPP); + } } - if (S_ISDIR(lu_object_attr(&next->mo_lu)) && + if (S_ISDIR(lu_object_attr(&next->mo_lu)) && reqbody->mbo_valid & OBD_MD_FLDIREA && - lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) { - /* get default stripe info for this dir. */ - ma->ma_need |= MA_LOV_DEF; - } - ma->ma_need |= ma_need; + lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) { + /* get default stripe info for this dir. */ + ma->ma_need |= MA_LOV_DEF; + } + ma->ma_need |= ma_need; rc = mdt_attr_get_complex(info, o, ma); if (unlikely(rc)) { - CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR, - "%s: getattr error for "DFID": rc = %d\n", - mdt_obd_name(info->mti_mdt), - PFID(mdt_object_fid(o)), rc); + CDEBUG_LIMIT(rc == -ENOENT ? 
D_OTHER : D_ERROR, + "%s: getattr error for "DFID": rc = %d\n", + mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(o)), rc); RETURN(rc); } + /* return immutable attr on fscrypt metadata files + * if fscrypt admin is not permitted + */ + if (o->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD && + !mdt_ucred(info)->uc_rbac_fscrypt_admin) + la->la_flags |= LUSTRE_IMMUTABLE_FL; + /* if file is released, check if a restore is running */ if (ma->ma_valid & MA_HSM) { repbody->mbo_valid |= OBD_MD_TSTATE; @@ -1233,22 +1510,28 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->mbo_t_state = MS_RESTORE; } - if (likely(ma->ma_valid & MA_INODE)) - mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o)); - else - RETURN(-EFAULT); + if (unlikely(!(ma->ma_valid & MA_INODE))) + RETURN(-EFAULT); + + mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o)); - if (mdt_body_has_lov(la, reqbody)) { - if (ma->ma_valid & MA_LOV) { - LASSERT(ma->ma_lmm_size); + if (mdt_body_has_lov(la, reqbody)) { + u32 stripe_count = 1; + bool fixed_layout = false; + + if (ma->ma_valid & MA_LOV) { + LASSERT(ma->ma_lmm_size); repbody->mbo_eadatasize = ma->ma_lmm_size; if (S_ISDIR(la->la_mode)) repbody->mbo_valid |= OBD_MD_FLDIREA; else repbody->mbo_valid |= OBD_MD_FLEASIZE; mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid); - } + } if (ma->ma_valid & MA_LMV) { + struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1; + u32 magic = le32_to_cpu(lmv->lmv_magic); + /* Return -ENOTSUPP for old client */ if (!mdt_is_striped_client(req->rq_export)) RETURN(-ENOTSUPP); @@ -1257,22 +1540,49 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, mdt_dump_lmv(D_INFO, ma->ma_lmv); repbody->mbo_eadatasize = ma->ma_lmv_size; repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA); + + stripe_count = le32_to_cpu(lmv->lmv_stripe_count); + fixed_layout = lmv_is_fixed(lmv); + if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv)) + mdt_restripe_migrate_add(info, o); + else if (magic == 
LMV_MAGIC_V1 && + lmv_is_restriping(lmv)) + mdt_restripe_update_add(info, o); } if (ma->ma_valid & MA_LMV_DEF) { /* Return -ENOTSUPP for old client */ if (!mdt_is_striped_client(req->rq_export)) RETURN(-ENOTSUPP); LASSERT(S_ISDIR(la->la_mode)); - mdt_dump_lmv(D_INFO, ma->ma_lmv); - repbody->mbo_eadatasize = ma->ma_lmv_size; + /* + * when ll_dir_getstripe() gets default LMV, it + * checks mbo_eadatasize. + */ + if (!(ma->ma_valid & MA_LMV)) + repbody->mbo_eadatasize = + ma->ma_default_lmv_size; repbody->mbo_valid |= (OBD_MD_FLDIREA | OBD_MD_DEFAULT_MEA); } + CDEBUG(D_VFSTRACE, + "dirent count %llu stripe count %u MDT count %d\n", + ma->ma_attr.la_dirent_count, stripe_count, + atomic_read(&mdt->mdt_mds_mds_conns) + 1); + if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET && + ma->ma_attr.la_dirent_count > + mdt->mdt_restriper.mdr_dir_split_count && + !fid_is_root(mdt_object_fid(o)) && + mdt->mdt_enable_dir_auto_split && + !o->mot_restriping && + stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 && + !fixed_layout) + mdt_auto_split_add(info, o); } else if (S_ISLNK(la->la_mode) && reqbody->mbo_valid & OBD_MD_LINKNAME) { buffer->lb_buf = ma->ma_lmm; /* eadatasize from client includes NULL-terminator, so - * there is no need to read it */ + * there is no need to read it + */ buffer->lb_len = reqbody->mbo_eadatasize - 1; rc = mo_readlink(env, next, buffer); if (unlikely(rc <= 0)) { @@ -1283,15 +1593,16 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, } else { int print_limit = min_t(int, PAGE_SIZE - 128, rc); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO)) rc -= 2; repbody->mbo_valid |= OBD_MD_LINKNAME; /* we need to report back size with NULL-terminator - * because client expects that */ + * because client expects that + */ repbody->mbo_eadatasize = rc + 1; if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize) - CDEBUG(D_INODE, "%s: Read shorter symlink %d " - "on "DFID ", expected 
%d\n", + CDEBUG(D_INODE, "%s: Read shorter symlink %d on " + DFID ", expected %d\n", mdt_obd_name(info->mti_mdt), rc, PFID(mdt_object_fid(o)), reqbody->mbo_eadatasize - 1); @@ -1300,13 +1611,14 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, /* If the total CDEBUG() size is larger than a page, it * will print a warning to the console, avoid this by - * printing just the last part of the symlink. */ + * printing just the last part of the symlink. + */ CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n", print_limit < rc ? "..." : "", print_limit, (char *)ma->ma_lmm + rc - print_limit, rc); rc = 0; - } - } + } + } if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) { repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize; @@ -1315,10 +1627,11 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->mbo_max_mdsize); } -#ifdef CONFIG_FS_POSIX_ACL +#ifdef CONFIG_LUSTRE_FS_POSIX_ACL if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) && (reqbody->mbo_valid & OBD_MD_FLACL)) { struct lu_nodemap *nodemap = nodemap_get_from_exp(exp); + if (IS_ERR(nodemap)) RETURN(PTR_ERR(nodemap)); @@ -1328,25 +1641,29 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, #endif out: - if (rc == 0) - mdt_counter_incr(req, LPROC_MDT_GETATTR); + if (rc == 0) + mdt_counter_incr(req, LPROC_MDT_GETATTR, + ktime_us_delta(ktime_get(), kstart)); - RETURN(rc); + RETURN(rc); } static int mdt_getattr(struct tgt_session_info *tsi) { struct mdt_thread_info *info = tsi2mdt_info(tsi); - struct mdt_object *obj = info->mti_object; - struct req_capsule *pill = info->mti_pill; - struct mdt_body *reqbody; - struct mdt_body *repbody; - int rc, rc2; - ENTRY; - - reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); - LASSERT(reqbody); - LASSERT(obj != NULL); + struct mdt_object *obj = info->mti_object; + struct req_capsule *pill = info->mti_pill; + struct mdt_body *reqbody; + struct mdt_body *repbody; + int rc, rc2; + + ENTRY; + + if (unlikely(info->mti_object == NULL)) 
+ RETURN(-EPROTO); + + reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); + LASSERT(reqbody); LASSERT(lu_object_assert_exists(&obj->mot_obj)); /* Special case for Data-on-MDT files to get data version */ @@ -1358,11 +1675,12 @@ static int mdt_getattr(struct tgt_session_info *tsi) /* Unlike intent case where we need to pre-fill out buffers early on * in intent policy for ldlm reasons, here we can have a much better * guess at EA size by just reading it from disk. - * Exceptions are readdir and (missing) directory striping */ - /* Readlink */ - if (reqbody->mbo_valid & OBD_MD_LINKNAME) { + * Exceptions are readdir and (missing) directory striping + */ + if (reqbody->mbo_valid & OBD_MD_LINKNAME) { /* Readlink */ /* No easy way to know how long is the symlink, but it cannot - * be more than PATH_MAX, so we allocate +1 */ + * be more than PATH_MAX, so we allocate +1 + */ rc = PATH_MAX + 1; /* A special case for fs ROOT: getattr there might fetch * default EA for entire fs, not just for this dir! @@ -1373,7 +1691,8 @@ static int mdt_getattr(struct tgt_session_info *tsi) (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) == MDS_GETATTR)) { /* Should the default strping be bigger, mdt_fix_reply - * will reallocate */ + * will reallocate + */ rc = DEF_REP_MD_SIZE; } else { /* Read the actual EA size from disk */ @@ -1387,16 +1706,18 @@ static int mdt_getattr(struct tgt_session_info *tsi) /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD * by default. If the target object has more ACL entries, then - * enlarge the buffer when necessary. */ + * enlarge the buffer when necessary. 
+ */ req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); + mdt_preset_encctx_size(info); rc = req_capsule_server_pack(pill); if (unlikely(rc != 0)) GOTO(out, rc = err_serious(rc)); - repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); - LASSERT(repbody != NULL); + repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); + LASSERT(repbody != NULL); repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; @@ -1406,8 +1727,18 @@ static int mdt_getattr(struct tgt_session_info *tsi) info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF); + rc = mdt_init_ucred(info, reqbody); + if (rc) + GOTO(out_shrink, rc); + rc = mdt_getattr_internal(info, obj, 0); + if (unlikely(rc)) + GOTO(out_ucred, rc); + + rc = mdt_pack_encctx_in_reply(info, obj); EXIT; +out_ucred: + mdt_exit_ucred(info); out_shrink: mdt_client_compatibility(info); rc2 = mdt_fix_reply(info); @@ -1421,42 +1752,57 @@ out: /** * Handler of layout intent RPC requiring the layout modification * - * \param[in] info thread environment - * \param[in] obj object - * \param[in] layout layout change descriptor + * \param[in] info thread environment + * \param[in] obj object + * \param[out] lhc object ldlm lock handle + * \param[in] layout layout change descriptor * * \retval 0 on success * \retval < 0 error code */ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj, + struct mdt_lock_handle *lhc, struct md_layout_change *layout) { - struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL]; int rc; + ENTRY; if (!mdt_object_exists(obj)) - GOTO(out, rc = -ENOENT); + RETURN(-ENOENT); if (!S_ISREG(lu_object_attr(&obj->mot_obj))) - GOTO(out, rc = -EINVAL); + RETURN(-EINVAL); rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL, MAY_WRITE); if (rc) - GOTO(out, rc); + RETURN(rc); - /* take layout lock to prepare layout change */ - mdt_lock_reg_init(lh, LCK_EX); - rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LAYOUT); - if (rc) - GOTO(out, rc); + rc = 
mdt_check_resent_lock(info, obj, lhc); + if (rc < 0) + RETURN(rc); + + if (rc > 0) { + /* not resent */ + __u64 lockpart = MDS_INODELOCK_LAYOUT; + + /* take layout lock to prepare layout change */ + if (layout->mlc_opc == MD_LAYOUT_WRITE) + lockpart |= MDS_INODELOCK_UPDATE; + + rc = mdt_object_lock(info, obj, lhc, lockpart, LCK_EX); + if (rc) + RETURN(rc); + } mutex_lock(&obj->mot_som_mutex); rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout); mutex_unlock(&obj->mot_som_mutex); - mdt_object_unlock(info, obj, lh, 1); -out: + + if (rc) + mdt_object_unlock(info, obj, lhc, 1); + RETURN(rc); } @@ -1490,7 +1836,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) struct mdt_object *o1, *o2, *o; struct mdt_lock_handle *lh1, *lh2; struct mdc_swap_layouts *msl; - int rc; + int rc; + ENTRY; /* client does not support layout lock, so layout swaping @@ -1499,11 +1846,14 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) * layout lock yet. If those clients have already opened the file * they won't be notified at all so that old layout may still be * used to do IO. This can be fixed after file release is landed by - * doing exclusive open and taking full EX ibits lock. - Jinshan */ + * doing exclusive open and taking full EX ibits lock. - Jinshan + */ if (!exp_connect_layout(exp)) RETURN(-EOPNOTSUPP); info = tsi2mdt_info(tsi); + if (unlikely(info->mti_object == NULL)) + RETURN(-EPROTO); if (info->mti_dlm_req != NULL) ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); @@ -1525,7 +1875,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) swap(o1, o2); /* permission check. Make sure the calling process having permission - * to write both files. */ + * to write both files. 
+ */ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL, MAY_WRITE); if (rc < 0) @@ -1541,22 +1892,19 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi) GOTO(put, rc = -EPROTO); lh1 = &info->mti_lh[MDT_LH_NEW]; - mdt_lock_reg_init(lh1, LCK_EX); lh2 = &info->mti_lh[MDT_LH_OLD]; - mdt_lock_reg_init(lh2, LCK_EX); - rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT | - MDS_INODELOCK_XATTR); + MDS_INODELOCK_XATTR, LCK_EX); if (rc < 0) GOTO(put, rc); rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT | - MDS_INODELOCK_XATTR); + MDS_INODELOCK_XATTR, LCK_EX); if (rc < 0) GOTO(unlock1, rc); rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1), - mdt_object_child(o2), msl->msl_flags); + mdt_object_child(o2), 0, 0, msl->msl_flags); if (rc < 0) GOTO(unlock2, rc); @@ -1575,29 +1923,143 @@ out: static int mdt_raw_lookup(struct mdt_thread_info *info, struct mdt_object *parent, - const struct lu_name *lname, - struct ldlm_reply *ldlm_rep) + const struct lu_name *lname) { - struct lu_fid *child_fid = &info->mti_tmp_fid1; - int rc; + struct lu_fid *fid = &info->mti_tmp_fid1; + struct mdt_body *repbody; + bool is_dotdot = false; + bool is_old_parent_stripe = false; + bool is_new_parent_checked = false; + int rc; + ENTRY; LASSERT(!info->mti_cross_ref); + /* Always allow to lookup ".." */ + if (lname->ln_namelen == 2 && + lname->ln_name[0] == '.' 
&& lname->ln_name[1] == '.') { + info->mti_spec.sp_permitted = 1; + is_dotdot = true; + if (mdt_is_dir_stripe(info, parent) == 1) + is_old_parent_stripe = true; + } + mdt_object_get(info->mti_env, parent); +lookup: /* Only got the fid of this obj by name */ - fid_zero(child_fid); - rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object), - lname, child_fid, &info->mti_spec); - if (rc == 0) { - struct mdt_body *repbody; + fid_zero(fid); + rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid, + &info->mti_spec); + mdt_object_put(info->mti_env, parent); + if (rc) + RETURN(rc); + + /* getattr_name("..") should return master object FID for striped dir */ + if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) { + parent = mdt_object_find(info->mti_env, info->mti_mdt, fid); + if (IS_ERR(parent)) + RETURN(PTR_ERR(parent)); + + /* old client getattr_name("..") with stripe FID */ + if (unlikely(is_old_parent_stripe)) { + is_old_parent_stripe = false; + goto lookup; + } + + /* ".." may be a stripe */ + if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) { + is_new_parent_checked = true; + goto lookup; + } + + mdt_object_put(info->mti_env, parent); + } + + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + repbody->mbo_fid1 = *fid; + repbody->mbo_valid = OBD_MD_FLID; + + RETURN(rc); +} + +/** + * Find name matching hash + * + * We search \a child LinkEA for a name whose hash matches \a lname + * (it contains an encoded hash). + * + * \param info mdt thread info + * \param lname encoded hash to find + * \param parent parent object + * \param child object to search with LinkEA + * + * \retval 1 match found + * \retval 0 no match found + * \retval -ev negative errno upon error + */ +int find_name_matching_hash(struct mdt_thread_info *info, struct lu_name *lname, + struct mdt_object *parent, struct mdt_object *child) +{ + /* Here, lname is an encoded hash of on-disk name, and + * client is doing access without encryption key. 
+ * So we need to get LinkEA, check parent fid is correct and + * compare name hash with the one in the request. + */ + struct lu_buf *buf = &info->mti_big_buf; + struct lu_name name; + struct lu_fid pfid; + struct linkea_data ldata = { NULL }; + struct link_ea_header *leh; + struct link_ea_entry *lee; + struct lu_buf link = { 0 }; + char *hash; + int reclen, count, rc; + + ENTRY; + if (lname->ln_namelen < LL_CRYPTO_BLOCK_SIZE) + RETURN(-EINVAL); + + buf = lu_buf_check_and_alloc(buf, PATH_MAX); + if (!buf->lb_buf) + RETURN(-ENOMEM); + + ldata.ld_buf = buf; + rc = mdt_links_read(info, child, &ldata); + if (rc < 0) + RETURN(rc); - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - repbody->mbo_fid1 = *child_fid; - repbody->mbo_valid = OBD_MD_FLID; - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); - } else if (rc == -ENOENT) { - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); + hash = kmalloc(lname->ln_namelen, GFP_NOFS); + if (!hash) + RETURN(-ENOMEM); + rc = critical_decode(lname->ln_name, lname->ln_namelen, hash); + + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); + for (count = 0; count < leh->leh_reccount; count++) { + linkea_entry_unpack(lee, &reclen, &name, &pfid); + if (!parent || lu_fid_eq(&pfid, mdt_object_fid(parent))) { + lu_buf_check_and_alloc(&link, name.ln_namelen); + if (!link.lb_buf) + GOTO(out_match, rc = -ENOMEM); + rc = critical_decode(name.ln_name, name.ln_namelen, + link.lb_buf); + + if (memcmp(LLCRYPT_EXTRACT_DIGEST(link.lb_buf, rc), + hash, LL_CRYPTO_BLOCK_SIZE) == 0) { + *lname = name; + break; + } + } + lee = (struct link_ea_entry *) ((char *)lee + reclen); } + if (count == leh->leh_reccount) + rc = 0; + else + rc = 1; + +out_match: + lu_buf_free(&link); + kfree(hash); RETURN(rc); } @@ -1609,18 +2071,20 @@ static int mdt_raw_lookup(struct mdt_thread_info *info, * (2)intent request will grant the lock to client. 
*/ static int mdt_getattr_name_lock(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc, - __u64 child_bits, - struct ldlm_reply *ldlm_rep) + struct mdt_lock_handle *lhc, + __u64 child_bits, + struct ldlm_reply *ldlm_rep) { - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_body *reqbody = NULL; - struct mdt_object *parent = info->mti_object; - struct mdt_object *child; - struct lu_fid *child_fid = &info->mti_tmp_fid1; - struct lu_name *lname = NULL; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_body *reqbody = NULL; + struct mdt_object *parent = info->mti_object; + struct mdt_object *child = NULL; + struct lu_fid *child_fid = &info->mti_tmp_fid1; + struct lu_name *lname = NULL; struct mdt_lock_handle *lhp = NULL; - struct ldlm_lock *lock; + struct ldlm_lock *lock; + struct req_capsule *pill = info->mti_pill; + bool fscrypt_md = false; __u64 try_bits = 0; bool is_resent; int ma_need = 0; @@ -1635,31 +2099,33 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (parent == NULL) RETURN(-ENOENT); + if (info->mti_mdt->mdt_enable_dir_auto_split) + ma_need |= MA_DIRENT_CNT; + if (info->mti_cross_ref) { /* Only getattr on the child. Parent is on another node. */ mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD | DISP_LOOKUP_POS); child = parent; - CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", " - "ldlm_rep = %p\n", + CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", ldlm_rep = %p\n", PFID(mdt_object_fid(child)), ldlm_rep); rc = mdt_check_resent_lock(info, child, lhc); if (rc < 0) { RETURN(rc); } else if (rc > 0) { - mdt_lock_handle_init(lhc); - mdt_lock_reg_init(lhc, LCK_PR); - /* - * Object's name is on another MDS, no lookup or layout - * lock is needed here but update lock is. + * Object's name entry is on another MDS, it will + * request PERM lock only because LOOKUP lock is owned + * by the MDS where name entry resides. + * + * TODO: it should try layout lock too. 
- Jinshan */ child_bits &= ~(MDS_INODELOCK_LOOKUP | MDS_INODELOCK_LAYOUT); - child_bits |= MDS_INODELOCK_PERM | MDS_INODELOCK_UPDATE; - - rc = mdt_object_lock(info, child, lhc, child_bits); + child_bits |= MDS_INODELOCK_PERM; + rc = mdt_object_lock(info, child, lhc, child_bits, + LCK_PR); if (rc < 0) RETURN(rc); } @@ -1673,61 +2139,138 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, RETURN(-ENOENT); } - rc = mdt_getattr_internal(info, child, 0); - if (unlikely(rc != 0)) + rc = mdt_getattr_internal(info, child, ma_need); + if (unlikely(rc != 0)) { mdt_object_unlock(info, child, lhc, 1); + RETURN(rc); + } + + rc = mdt_pack_secctx_in_reply(info, child); + if (unlikely(rc)) { + mdt_object_unlock(info, child, lhc, 1); + RETURN(rc); + } - RETURN(rc); - } + rc = mdt_pack_encctx_in_reply(info, child); + if (unlikely(rc)) + mdt_object_unlock(info, child, lhc, 1); + RETURN(rc); + } lname = &info->mti_name; - mdt_name_unpack(info->mti_pill, &RMF_NAME, lname, MNF_FIX_ANON); + mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON); - if (lu_name_is_valid(lname)) { - CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", " - "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), + if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) { + reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); + if (unlikely(reqbody == NULL)) + RETURN(err_serious(-EPROTO)); + + *child_fid = reqbody->mbo_fid2; + if (unlikely(!fid_is_sane(child_fid))) + RETURN(err_serious(-EINVAL)); + + if (lu_fid_eq(mdt_object_fid(parent), child_fid)) { + mdt_object_get(info->mti_env, parent); + child = parent; + } else { + child = mdt_object_find(info->mti_env, info->mti_mdt, + child_fid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + } + + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", ldlm_rep = %p\n", + PFID(mdt_object_fid(parent)), + PFID(&reqbody->mbo_fid2), ldlm_rep); + } else if (lu_name_is_valid(lname)) { + if (mdt_object_remote(parent)) { + CERROR("%s: parent "DFID" is on remote target\n", 
+ mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(parent))); + RETURN(-EPROTO); + } + + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", ldlm_rep = %p\n", + PFID(mdt_object_fid(parent)), PNAME(lname), ldlm_rep); + + if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD || + (fid_is_root(mdt_object_fid(parent)) && + lname->ln_namelen == strlen(dot_fscrypt_name) && + strncmp(lname->ln_name, dot_fscrypt_name, + lname->ln_namelen) == 0)) + fscrypt_md = true; } else { - reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); if (unlikely(reqbody == NULL)) RETURN(err_serious(-EPROTO)); *child_fid = reqbody->mbo_fid2; - if (unlikely(!fid_is_sane(child_fid))) RETURN(err_serious(-EINVAL)); - CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", " - "ldlm_rep = %p\n", + if (lu_fid_eq(mdt_object_fid(parent), child_fid)) { + mdt_object_get(info->mti_env, parent); + child = parent; + } else { + child = mdt_object_find(info->mti_env, info->mti_mdt, + child_fid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + } + + if (mdt_object_remote(child)) { + CERROR("%s: child "DFID" is on remote target\n", + mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(child))); + GOTO(out_child, rc = -EPROTO); + } + + /* don't fetch LOOKUP lock if it's remote object */ + rc = mdt_is_remote_object(info, parent, child); + if (rc < 0) + GOTO(out_child, rc); + if (rc) + child_bits &= ~MDS_INODELOCK_LOOKUP; + + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), PFID(&reqbody->mbo_fid2), ldlm_rep); } mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD); - if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) { + if (unlikely(!mdt_object_exists(parent)) && + !(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) && + lu_name_is_valid(lname)) { LU_OBJECT_DEBUG(D_INODE, info->mti_env, &parent->mot_obj, "Parent doesn't exist!"); - RETURN(-ESTALE); + GOTO(out_child, rc = 
-ESTALE); } - if (mdt_object_remote(parent)) { - CERROR("%s: parent "DFID" is on remote target\n", - mdt_obd_name(info->mti_mdt), - PFID(mdt_object_fid(parent))); - RETURN(-EIO); - } - - if (lu_name_is_valid(lname)) { - /* Always allow to lookup ".." */ - if (unlikely(lname->ln_namelen == 2 && - lname->ln_name[0] == '.' && - lname->ln_name[1] == '.')) - info->mti_spec.sp_permitted = 1; - + if (!child && is_resent) { + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); + if (lock == NULL) { + /* Lock is pinned by ldlm_handle_enqueue0() as it is + * a resend case, however, it could be already destroyed + * due to client eviction or a raced cancel RPC. + */ + LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx", + lhc->mlh_reg_lh.cookie); + RETURN(-ESTALE); + } + fid_extract_from_res_name(child_fid, + &lock->l_resource->lr_name); + LDLM_LOCK_PUT(lock); + child = mdt_object_find(info->mti_env, info->mti_mdt, + child_fid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + } else if (!(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) && + lu_name_is_valid(lname)) { if (info->mti_body->mbo_valid == OBD_MD_FLID) { - rc = mdt_raw_lookup(info, parent, lname, ldlm_rep); + rc = mdt_raw_lookup(info, parent, lname); RETURN(rc); } @@ -1735,47 +2278,46 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, /* step 1: lock parent only if parent is a directory */ if (S_ISDIR(lu_object_attr(&parent->mot_obj))) { lhp = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lhp, LCK_PR, lname); - rc = mdt_object_lock(info, parent, lhp, - MDS_INODELOCK_UPDATE); + rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PR); if (unlikely(rc != 0)) RETURN(rc); } - /* step 2: lookup child's fid by name */ - fid_zero(child_fid); + /* step 2: lookup child's fid by name */ + fid_zero(child_fid); rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, child_fid, &info->mti_spec); if (rc == -ENOENT) mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); if (rc != 0) - GOTO(out_parent, rc); - } - - 
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); + GOTO(unlock_parent, rc); - /* - *step 3: find the child object by fid & lock it. - * regardless if it is local or remote. - * - *Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e) - * set parent dir fid the same as child fid in getattr by fid case - * we should not lu_object_find() the object again, could lead - * to hung if there is a concurrent unlink destroyed the object. - */ - if (lu_fid_eq(mdt_object_fid(parent), child_fid)) { - mdt_object_get(info->mti_env, parent); - child = parent; - } else { child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid); + if (unlikely(IS_ERR(child))) + GOTO(unlock_parent, rc = PTR_ERR(child)); } - if (unlikely(IS_ERR(child))) - GOTO(out_parent, rc = PTR_ERR(child)); + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); + + /* step 3: lock child regardless if it is local or remote. */ + LASSERT(child); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2); + if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) { + /* Here, lname is an encoded hash of on-disk name, and + * client is doing access without encryption key. + * So we need to compare name hash with the one in the request. 
+ */ + if (!find_name_matching_hash(info, lname, parent, + child)) { + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); + mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_POS); + GOTO(out_child, rc = -ENOENT); + } + } + + CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2); if (!mdt_object_exists(child)) { LU_OBJECT_DEBUG(D_INODE, info->mti_env, &child->mot_obj, @@ -1787,11 +2329,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (rc < 0) { GOTO(out_child, rc); } else if (rc > 0) { - mdt_lock_handle_init(lhc); - mdt_lock_reg_init(lhc, LCK_PR); - if (!(child_bits & MDS_INODELOCK_UPDATE) && - mdt_object_exists(child) && !mdt_object_remote(child)) { + !mdt_object_remote(child)) { struct md_attr *ma = &info->mti_attr; ma->ma_valid = 0; @@ -1804,20 +2343,22 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, * return not only a LOOKUP lock, but also an UPDATE * lock and this might save us RPC on later STAT. For * directories, it also let negative dentry cache start - * working for this dir. */ - if (ma->ma_valid & MA_INODE && - ma->ma_attr.la_valid & LA_CTIME && - info->mti_mdt->mdt_namespace->ns_ctime_age_limit + - ma->ma_attr.la_ctime < ktime_get_real_seconds()) - child_bits |= MDS_INODELOCK_UPDATE; - } + * working for this dir. + */ + if (ma->ma_valid & MA_INODE && + ma->ma_attr.la_valid & LA_CTIME && + info->mti_mdt->mdt_namespace->ns_ctime_age_limit + + ma->ma_attr.la_ctime < ktime_get_real_seconds()) + child_bits |= MDS_INODELOCK_UPDATE; + } /* layout lock must be granted in a best-effort way - * for IT operations */ + * for IT operations + */ LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT)); if (S_ISREG(lu_object_attr(&child->mot_obj)) && !mdt_object_remote(child) && ldlm_rep != NULL) { - if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) && + if (!CFS_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) && exp_connect_layout(info->mti_exp)) { /* try to grant layout lock for regular file. 
*/ try_bits = MDS_INODELOCK_LAYOUT; @@ -1827,31 +2368,85 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, try_bits |= MDS_INODELOCK_DOM; } + /* + * To avoid possible deadlock between batched statahead RPC + * and rename()/migrate() operation, it should use trylock to + * obtain the DLM PR ibits lock for file attributes in a + * batched statahead RPC. A failed trylock means that other + * users maybe modify the directory simultaneously as in current + * Lustre design the server only grants read lock to a client. + * + * When a trylock failed, the MDT reports the conflict with + * error code -EBUSY, and stops statahead immediately. + */ + if (info->mti_batch_env) { + /* + * This is a sub stat-ahead request in a batched RPC. + * However, the @child is a remote object, we just + * return -EREMOTE here to forbid stat-ahead on it. + */ + if (mdt_object_remote(child)) + GOTO(out_child, rc = -EREMOTE); + + try_bits |= child_bits; + child_bits = 0; + } + if (try_bits != 0) { /* try layout lock, it may fail to be granted due to - * contention at LOOKUP or UPDATE */ + * contention at LOOKUP or UPDATE + */ rc = mdt_object_lock_try(info, child, lhc, &child_bits, - try_bits, false); + try_bits, LCK_PR); if (child_bits & MDS_INODELOCK_LAYOUT) ma_need |= MA_LOV; } else { /* Do not enqueue the UPDATE lock from MDT(cross-MDT), - * client will enqueue the lock to the remote MDT */ + * client will enqueue the lock to the remote MDT + */ if (mdt_object_remote(child)) - child_bits &= ~MDS_INODELOCK_UPDATE; - rc = mdt_object_lock(info, child, lhc, child_bits); + rc = mdt_object_lookup_lock(info, NULL, child, + lhc, LCK_PR); + else + rc = mdt_object_lock(info, child, lhc, + child_bits, LCK_PR); } - if (unlikely(rc != 0)) - GOTO(out_child, rc); - } + if (unlikely(rc != 0)) + GOTO(out_child, rc); + if (info->mti_batch_env && child_bits == 0) { + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_child, rc = -EBUSY); + } + } + + if (fscrypt_md) + 
child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD; + + /* finally, we can get attr for child. */ + rc = mdt_getattr_internal(info, child, ma_need); + if (unlikely(rc != 0)) { + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_child, rc); + } + + rc = mdt_pack_secctx_in_reply(info, child); + if (unlikely(rc)) { + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_child, rc); + } - lock = ldlm_handle2lock(&lhc->mlh_reg_lh); + rc = mdt_pack_encctx_in_reply(info, child); + if (unlikely(rc)) { + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_child, rc); + } - /* finally, we can get attr for child. */ - rc = mdt_getattr_internal(info, child, ma_need); - if (unlikely(rc != 0)) { - mdt_object_unlock(info, child, lhc, 1); - } else if (lock) { + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); + if (lock) { /* Debugging code. */ LDLM_DEBUG(lock, "Returning lock to client"); LASSERTF(fid_res_name_eq(mdt_object_fid(child), @@ -1860,81 +2455,374 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, PLDLMRES(lock->l_resource), PFID(mdt_object_fid(child))); - if (S_ISREG(lu_object_attr(&child->mot_obj)) && - mdt_object_exists(child) && !mdt_object_remote(child) && - child != parent) { - LDLM_LOCK_PUT(lock); + if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) { + if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) + CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND, + req->rq_deadline - + req->rq_arrival_time.tv_sec + + cfs_fail_val ?: 3); + /* Put the lock to the waiting list and force the cancel */ + ldlm_set_ast_sent(lock); + } + + /* + * check whether the object is remote as we can't + * really check attributes w/o explicit check for + * object's existence first. 
+ */ + if (!mdt_object_remote(child) && child != parent && + S_ISREG(lu_object_attr(&child->mot_obj))) { mdt_object_put(info->mti_env, child); - /* NB: call the mdt_pack_size2body always after - * mdt_object_put(), that is why this special - * exit path is used. */ rc = mdt_pack_size2body(info, child_fid, &lhc->mlh_reg_lh); if (rc != 0 && child_bits & MDS_INODELOCK_DOM) { /* DOM lock was taken in advance but this is - * not DoM file. Drop the lock. */ + * not DoM file. Drop the lock. + */ lock_res_and_lock(lock); ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM); unlock_res_and_lock(lock); } + LDLM_LOCK_PUT(lock); + GOTO(unlock_parent, rc = 0); + } + LDLM_LOCK_PUT(lock); + } + + EXIT; +out_child: + if (child) + mdt_object_put(info->mti_env, child); +unlock_parent: + if (lhp) + mdt_object_unlock(info, parent, lhp, 1); + if (rc == -ENOENT) { + /* return -ENOKEY instead of -ENOENT to encryption-unaware + * client if trying to access an encrypted file + */ + int rc2 = mdt_check_enc(info, parent); + + if (rc2) + rc = rc2; + } + return rc; +} + +/* normal handler: should release the child lock */ +static int mdt_getattr_name(struct tgt_session_info *tsi) +{ + struct mdt_thread_info *info = tsi2mdt_info(tsi); + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD]; + struct mdt_body *reqbody; + struct mdt_body *repbody; + int rc, rc2; + + ENTRY; + + reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + LASSERT(reqbody != NULL); + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + LASSERT(repbody != NULL); + + info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF); + repbody->mbo_eadatasize = 0; + repbody->mbo_aclsize = 0; + + rc = mdt_init_ucred(info, reqbody); + if (unlikely(rc)) + GOTO(out_shrink, rc); + + rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL); + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { + ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode); + lhc->mlh_reg_lh.cookie = 0; + } + mdt_exit_ucred(info); 
+ EXIT; +out_shrink: + mdt_client_compatibility(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; + mdt_thread_info_fini(info); + return rc; +} + +static int mdt_rmfid_unlink(struct mdt_thread_info *info, + const struct lu_fid *pfid, + const struct lu_name *name, + struct mdt_object *obj, s64 ctime) +{ + struct lu_fid *child_fid = &info->mti_tmp_fid1; + struct ldlm_enqueue_info *einfo = &info->mti_einfo; + struct mdt_device *mdt = info->mti_mdt; + struct md_attr *ma = &info->mti_attr; + struct mdt_lock_handle *parent_lh; + struct mdt_lock_handle *child_lh; + struct mdt_object *pobj; + int rc; + + ENTRY; + + pobj = mdt_object_find(info->mti_env, mdt, pfid); + if (IS_ERR(pobj)) + GOTO(out, rc = PTR_ERR(pobj)); + + parent_lh = &info->mti_lh[MDT_LH_PARENT]; + rc = mdt_parent_lock(info, pobj, parent_lh, name, LCK_PW); + if (rc != 0) + GOTO(put_parent, rc); + + rc = mdo_lookup(info->mti_env, mdt_object_child(pobj), + name, child_fid, &info->mti_spec); + if (rc != 0) + GOTO(unlock_parent, rc); + + if (!lu_fid_eq(child_fid, mdt_object_fid(obj))) + GOTO(unlock_parent, rc = -EREMCHG); + + child_lh = &info->mti_lh[MDT_LH_CHILD]; + rc = mdt_object_stripes_lock(info, pobj, obj, child_lh, einfo, + MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE, LCK_EX); + if (rc != 0) + GOTO(unlock_parent, rc); + + if (atomic_read(&obj->mot_open_count)) { + CDEBUG(D_OTHER, "object "DFID" open, skip\n", + PFID(mdt_object_fid(obj))); + GOTO(unlock_child, rc = -EBUSY); + } + + ma->ma_need = 0; + ma->ma_valid = MA_INODE; + ma->ma_attr.la_valid = LA_CTIME; + ma->ma_attr.la_ctime = ctime; + + mutex_lock(&obj->mot_lov_mutex); + + rc = mdo_unlink(info->mti_env, mdt_object_child(pobj), + mdt_object_child(obj), name, ma, 0); + + mutex_unlock(&obj->mot_lov_mutex); + +unlock_child: + mdt_object_stripes_unlock(info, obj, child_lh, einfo, 1); +unlock_parent: + mdt_object_unlock(info, pobj, parent_lh, 1); +put_parent: + mdt_object_put(info->mti_env, pobj); +out: + RETURN(rc); +} + +static int 
mdt_rmfid_check_permission(struct mdt_thread_info *info, + struct mdt_object *obj) +{ + struct lu_ucred *uc = lu_ucred(info->mti_env); + struct md_attr *ma = &info->mti_attr; + struct lu_attr *la = &ma->ma_attr; + int rc = 0; + + ENTRY; + + ma->ma_need = MA_INODE; + rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma); + if (rc) + GOTO(out, rc); + + if (la->la_flags & LUSTRE_IMMUTABLE_FL) + rc = -EACCES; + + /* we want rbac roles to have precedence over any other + * permission or capability checks + */ + if (!uc->uc_rbac_byfid_ops) + RETURN(-EPERM); + if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE)) + RETURN(0); + if (uc->uc_fsuid == la->la_uid) { + if ((la->la_mode & 0200) == 0) + rc = -EACCES; + } else if (uc->uc_fsgid == la->la_gid) { + if ((la->la_mode & 0020) == 0) + rc = -EACCES; + } else if ((la->la_mode & 0002) == 0) { + rc = -EACCES; + } + +out: + RETURN(rc); +} + +static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid, + s64 ctime) +{ + struct mdt_device *mdt = info->mti_mdt; + struct mdt_object *obj = NULL; + struct linkea_data ldata = { NULL }; + struct lu_buf *buf = &info->mti_big_buf; + struct lu_name *name = &info->mti_name; + struct lu_fid *pfid = &info->mti_tmp_fid1; + struct link_ea_header *leh; + struct link_ea_entry *lee; + int reclen, count, rc = 0; + + ENTRY; + + if (!fid_is_sane(fid)) + GOTO(out, rc = -EINVAL); + + if (!fid_is_namespace_visible(fid)) + GOTO(out, rc = -EINVAL); + + obj = mdt_object_find(info->mti_env, mdt, fid); + if (IS_ERR(obj)) + GOTO(out, rc = PTR_ERR(obj)); + + if (mdt_object_remote(obj)) + GOTO(out, rc = -EREMOTE); + if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header)) + GOTO(out, rc = -ENOENT); + + rc = mdt_rmfid_check_permission(info, obj); + if (rc) + GOTO(out, rc); + + /* take LinkEA */ + buf = lu_buf_check_and_alloc(buf, PATH_MAX); + if (!buf->lb_buf) + GOTO(out, rc = -ENOMEM); + + ldata.ld_buf = buf; + rc = mdt_links_read(info, obj, &ldata); + if (rc) + GOTO(out, rc); - 
GOTO(out_parent, rc = 0); - } + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); + for (count = 0; count < leh->leh_reccount; count++) { + /* remove every hardlink */ + linkea_entry_unpack(lee, &reclen, name, pfid); + lee = (struct link_ea_entry *) ((char *)lee + reclen); + rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime); + if (rc) + break; } - if (lock) - LDLM_LOCK_PUT(lock); - EXIT; -out_child: - mdt_object_put(info->mti_env, child); -out_parent: - if (lhp) - mdt_object_unlock(info, parent, lhp, 1); - return rc; +out: + if (obj && !IS_ERR(obj)) + mdt_object_put(info->mti_env, obj); + if (info->mti_big_buf.lb_buf) + lu_buf_free(&info->mti_big_buf); + + RETURN(rc); } -/* normal handler: should release the child lock */ -static int mdt_getattr_name(struct tgt_session_info *tsi) +static int mdt_rmfid(struct tgt_session_info *tsi) { - struct mdt_thread_info *info = tsi2mdt_info(tsi); - struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD]; - struct mdt_body *reqbody; - struct mdt_body *repbody; - int rc, rc2; - ENTRY; + struct mdt_thread_info *mti = tsi2mdt_info(tsi); + struct mdt_body *reqbody; + struct lu_fid *fids, *rfids; + int bufsize, rc; + __u32 *rcs; + int i, nr; - reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); - LASSERT(reqbody != NULL); - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - LASSERT(repbody != NULL); + ENTRY; - info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF); - repbody->mbo_eadatasize = 0; - repbody->mbo_aclsize = 0; + reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY); + if (reqbody == NULL) + RETURN(-EPROTO); + bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY, + RCL_CLIENT); + nr = bufsize / sizeof(struct lu_fid); + if (nr * sizeof(struct lu_fid) != bufsize) + RETURN(-EINVAL); + req_capsule_set_size(tsi->tsi_pill, &RMF_RCS, + RCL_SERVER, nr * sizeof(__u32)); + req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY, + RCL_SERVER, nr * sizeof(struct 
lu_fid)); + rc = req_capsule_server_pack(tsi->tsi_pill); + if (rc) + GOTO(out, rc = err_serious(rc)); + fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY); + if (fids == NULL) + RETURN(-EPROTO); + rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS); + LASSERT(rcs); + rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY); + LASSERT(rfids); - rc = mdt_init_ucred_intent_getattr(info, reqbody); - if (unlikely(rc)) - GOTO(out_shrink, rc); - - rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL); - if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { - ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode); - lhc->mlh_reg_lh.cookie = 0; - } - mdt_exit_ucred(info); - EXIT; -out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; - mdt_thread_info_fini(info); - return rc; + mdt_init_ucred(mti, reqbody); + for (i = 0; i < nr; i++) { + rfids[i] = fids[i]; + rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime); + } + mdt_exit_ucred(mti); + +out: + RETURN(rc); } static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg); +int mdt_io_set_info(struct tgt_session_info *tsi) +{ + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct ost_body *body = NULL, *repbody; + void *key, *val = NULL; + int keylen, vallen, rc = 0; + bool is_grant_shrink; + + ENTRY; + + key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY); + if (key == NULL) { + DEBUG_REQ(D_HA, req, "no set_info key"); + RETURN(err_serious(-EFAULT)); + } + keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY, + RCL_CLIENT); + + val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL); + if (val == NULL) { + DEBUG_REQ(D_HA, req, "no set_info val"); + RETURN(err_serious(-EFAULT)); + } + vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL, + RCL_CLIENT); + + is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK); + if (is_grant_shrink) + /* In this case the value is actually an 
RMF_OST_BODY, so we + * transmutate the type of this PTLRPC + */ + req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO); + + rc = req_capsule_server_pack(tsi->tsi_pill); + if (rc < 0) + RETURN(rc); + + if (is_grant_shrink) { + body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY); + + repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY); + *repbody = *body; + + /** handle grant shrink, similar to a read request */ + tgt_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp, + &repbody->oa); + } else { + CERROR("%s: Unsupported key %s\n", + tgt_name(tsi->tsi_tgt), (char *)key); + rc = -EOPNOTSUPP; + } + + RETURN(rc); +} + + static int mdt_set_info(struct tgt_session_info *tsi) { struct ptlrpc_request *req = tgt_ses_req(tsi); @@ -1964,6 +2852,22 @@ static int mdt_set_info(struct tgt_session_info *tsi) /* Swab any part of val you need to here */ if (KEY_IS(KEY_READ_ONLY)) { + /* If client wants rw, make sure nodemap does not enforce ro. */ + if (!*(__u32 *)val) { + struct lu_nodemap *nm = NULL; + bool readonly = false; + + if (req->rq_export) + nm = nodemap_get_from_exp(req->rq_export); + + if (!IS_ERR_OR_NULL(nm)) { + readonly = nm->nmf_readonly_mount; + nodemap_putref(nm); + } + + if (unlikely(readonly)) + RETURN(-EROFS); + } spin_lock(&req->rq_export->exp_lock); if (*(__u32 *)val) *exp_connect_flags_ptr(req->rq_export) |= @@ -1980,11 +2884,13 @@ static int mdt_set_info(struct tgt_session_info *tsi) tgt_name(tsi->tsi_tgt), vallen); RETURN(-EINVAL); } - if (ptlrpc_req_need_swab(req)) { + if (req_capsule_req_need_swab(&req->rq_pill)) { __swab64s(&cs->cs_recno); __swab32s(&cs->cs_id); } + if (!mdt_changelog_allow(tsi2mdt_info(tsi))) + RETURN(-EACCES); rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export, vallen, val, NULL); } else if (KEY_IS(KEY_EVICT_BY_NID)) { @@ -2008,18 +2914,18 @@ static int mdt_readpage(struct tgt_session_info *tsi) ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) 
RETURN(err_serious(-ENOMEM)); repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY); if (repbody == NULL || reqbody == NULL) - RETURN(err_serious(-EFAULT)); + RETURN(err_serious(-EFAULT)); - /* - * prepare @rdpg before calling lower layers and transfer itself. Here - * reqbody->size contains offset of where to start to read and - * reqbody->nlink contains number bytes to read. - */ + /* + * prepare @rdpg before calling lower layers and transfer itself. Here + * reqbody->size contains offset of where to start to read and + * reqbody->nlink contains number bytes to read. + */ rdpg->rp_hash = reqbody->mbo_size; if (rdpg->rp_hash != reqbody->mbo_size) { CERROR("Invalid hash: %#llx != %#llx\n", @@ -2034,17 +2940,17 @@ static int mdt_readpage(struct tgt_session_info *tsi) exp_max_brw_size(tsi->tsi_exp)); rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT; - OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); - if (rdpg->rp_pages == NULL) - RETURN(-ENOMEM); + OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages); + if (rdpg->rp_pages == NULL) + RETURN(-ENOMEM); - for (i = 0; i < rdpg->rp_npages; ++i) { + for (i = 0; i < rdpg->rp_npages; ++i) { rdpg->rp_pages[i] = alloc_page(GFP_NOFS); - if (rdpg->rp_pages[i] == NULL) - GOTO(free_rdpg, rc = -ENOMEM); - } + if (rdpg->rp_pages[i] == NULL) + GOTO(free_rdpg, rc = -ENOMEM); + } - /* call lower layers to fill allocated pages with directory data */ + /* call lower layers to fill allocated pages with directory data */ rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg); if (rc < 0) GOTO(free_rdpg, rc); @@ -2058,9 +2964,9 @@ free_rdpg: for (i = 0; i < rdpg->rp_npages; i++) if (rdpg->rp_pages[i] != NULL) __free_page(rdpg->rp_pages[i]); - OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); + OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) RETURN(0); return rc; 
@@ -2078,7 +2984,8 @@ static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op) if ((attr->la_valid & LA_UID) && (attr->la_uid != -1)) attr->la_uid = uc->uc_fsuid; /* for S_ISGID, inherit gid from his parent, such work will be - * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */ + * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. + */ if ((attr->la_valid & LA_GID) && (attr->la_gid != -1)) attr->la_gid = uc->uc_fsgid; } @@ -2086,9 +2993,37 @@ static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op) return 0; } +static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op) +{ + return op == REINT_OPEN && + !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT)); +} + +static void mdt_preset_secctx_size(struct mdt_thread_info *info) +{ + struct req_capsule *pill = info->mti_pill; + + if (req_capsule_has_field(pill, &RMF_FILE_SECCTX, + RCL_SERVER) && + req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT)) { + if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME, + RCL_CLIENT) != 0) + /* pre-set size in server part with max size */ + req_capsule_set_size(pill, &RMF_FILE_SECCTX, + RCL_SERVER, + req_capsule_ptlreq(pill) ? + OBD_MAX_DEFAULT_EA_SIZE : + MAX_MD_SIZE_OLD); + else + req_capsule_set_size(pill, &RMF_FILE_SECCTX, + RCL_SERVER, 0); + } +} + static int mdt_reint_internal(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc, - __u32 op) + struct mdt_lock_handle *lhc, + __u32 op) { struct req_capsule *pill = info->mti_pill; struct mdt_body *repbody; @@ -2096,68 +3031,79 @@ static int mdt_reint_internal(struct mdt_thread_info *info, ENTRY; - rc = mdt_reint_unpack(info, op); - if (rc != 0) { - CERROR("Can't unpack reint, rc %d\n", rc); - RETURN(err_serious(rc)); - } + rc = mdt_reint_unpack(info, op); + if (rc != 0) { + CERROR("Can't unpack reint, rc %d\n", rc); + RETURN(err_serious(rc)); + } + + + /* check if the file system is set to readonly. 
O_RDONLY open + * is still allowed even the file system is set to readonly mode + */ + if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op)) + RETURN(err_serious(-EROFS)); - /* for replay (no_create) lmm is not needed, client has it already */ - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, + /* for replay (no_create) lmm is not needed, client has it already */ + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, DEF_REP_MD_SIZE); /* llog cookies are always 0, the field is kept for compatibility */ - if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) + if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0); /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD * by default. If the target object has more ACL entries, then - * enlarge the buffer when necessary. */ + * enlarge the buffer when necessary. 
+ */ if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER)) req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); - rc = req_capsule_server_pack(pill); - if (rc != 0) { - CERROR("Can't pack response, rc %d\n", rc); - RETURN(err_serious(rc)); - } + mdt_preset_secctx_size(info); + mdt_preset_encctx_size(info); + + rc = req_capsule_server_pack(pill); + if (rc != 0) { + CERROR("Can't pack response, rc %d\n", rc); + RETURN(err_serious(rc)); + } - if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) { - repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); - LASSERT(repbody); + if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) { + repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); + LASSERT(repbody); repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; - } + } - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10); + CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10); - /* for replay no cookkie / lmm need, because client have this already */ - if (info->mti_spec.no_create) - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0); + /* for replay no cookkie / lmm need, because client have this already */ + if (info->mti_spec.no_create) + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0); - rc = mdt_init_ucred_reint(info); - if (rc) - GOTO(out_shrink, rc); + rc = mdt_init_ucred_reint(info); + if (rc) + GOTO(out_shrink, rc); - rc = mdt_fix_attr_ucred(info, op); - if (rc != 0) - GOTO(out_ucred, rc = err_serious(rc)); + rc = mdt_fix_attr_ucred(info, op); + if (rc != 0) + GOTO(out_ucred, rc = err_serious(rc)); rc = mdt_check_resent(info, mdt_reconstruct, lhc); if (rc < 0) { GOTO(out_ucred, rc); } else if (rc == 1) { - DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt."); + DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt"); rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg); - GOTO(out_ucred, rc); - } - 
rc = mdt_reint_rec(info, lhc); - EXIT; + GOTO(out_ucred, rc); + } + rc = mdt_reint_rec(info, lhc); + EXIT; out_ucred: - mdt_exit_ucred(info); + mdt_exit_ucred(info); out_shrink: mdt_client_compatibility(info); @@ -2167,14 +3113,11 @@ out_shrink: /* * Data-on-MDT optimization - read data along with OPEN and return it - * in reply. Do that only if we have both DOM and LAYOUT locks. + * in reply when possible. */ - if (rc == 0 && op == REINT_OPEN && - info->mti_attr.ma_lmm != NULL && - mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) { + if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req)) rc = mdt_dom_read_on_open(info, info->mti_mdt, &lhc->mlh_reg_lh); - } return rc; } @@ -2194,8 +3137,8 @@ static long mdt_reint_opcode(struct ptlrpc_request *req, req_capsule_extend(&req->rq_pill, fmt[opc]); else { mdt = mdt_exp2dev(req->rq_export); - CERROR("%s: Unsupported opcode '%ld' from client '%s':" - " rc = %d\n", req->rq_export->exp_obd->obd_name, + CERROR("%s: Unsupported opcode '%ld' from client '%s': rc = %d\n", + req->rq_export->exp_obd->obd_name, opc, mdt->mdt_ldlm_client->cli_name, -EFAULT); opc = err_serious(-EFAULT); } @@ -2244,19 +3187,20 @@ static int mdt_reint(struct tgt_session_info *tsi) /* this should sync the whole device */ int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt) { - struct dt_device *dt = mdt->mdt_bottom; - int rc; - ENTRY; + struct dt_device *dt = mdt->mdt_bottom; + int rc; + + ENTRY; - rc = dt->dd_ops->dt_sync(env, dt); - RETURN(rc); + rc = dt->dd_ops->dt_sync(env, dt); + RETURN(rc); } /* this should sync this object */ static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp, struct mdt_object *mo) { - int rc; + int rc = 0; ENTRY; @@ -2267,7 +3211,16 @@ static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp, RETURN(-ESTALE); } - rc = mo_object_sync(env, mdt_object_child(mo)); + if (S_ISREG(lu_object_attr(&mo->mot_obj))) { + struct lu_target *tgt = 
tgt_ses_info(env)->tsi_tgt; + dt_obj_version_t version; + + version = dt_version_get(env, mdt_obj2dt(mo)); + if (version > tgt->lut_obd->obd_last_committed) + rc = mo_object_sync(env, mdt_object_child(mo)); + } else { + rc = mo_object_sync(env, mdt_object_child(mo)); + } RETURN(rc); } @@ -2277,11 +3230,12 @@ static int mdt_sync(struct tgt_session_info *tsi) struct ptlrpc_request *req = tgt_ses_req(tsi); struct req_capsule *pill = tsi->tsi_pill; struct mdt_body *body; + ktime_t kstart = ktime_get(); int rc; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) RETURN(err_serious(-ENOMEM)); if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) { @@ -2289,6 +3243,9 @@ static int mdt_sync(struct tgt_session_info *tsi) } else { struct mdt_thread_info *info = tsi2mdt_info(tsi); + if (unlikely(info->mti_object == NULL)) + RETURN(-EPROTO); + /* sync an object */ rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, info->mti_object); @@ -2310,7 +3267,8 @@ static int mdt_sync(struct tgt_session_info *tsi) mdt_thread_info_fini(info); } if (rc == 0) - mdt_counter_incr(req, LPROC_MDT_SYNC); + mdt_counter_incr(req, LPROC_MDT_SYNC, + ktime_us_delta(ktime_get(), kstart)); RETURN(rc); } @@ -2329,8 +3287,7 @@ static int mdt_data_sync(struct tgt_session_info *tsi) repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY); - /* if no fid is specified then do nothing, - * device sync is done via MDS_SYNC */ + /* device sync is done via MDS_SYNC. 
NOOP if no fid is specified */ if (fid_is_zero(&tsi->tsi_fid)) RETURN(0); @@ -2369,19 +3326,28 @@ put: */ static int mdt_quotactl(struct tgt_session_info *tsi) { - struct obd_export *exp = tsi->tsi_exp; - struct req_capsule *pill = tsi->tsi_pill; - struct obd_quotactl *oqctl, *repoqc; - int id, rc; - struct mdt_device *mdt = mdt_exp2dev(exp); - struct lu_device *qmt = mdt->mdt_qmt_dev; - struct lu_nodemap *nodemap; + struct obd_export *exp = tsi->tsi_exp; + struct req_capsule *pill = tsi->tsi_pill; + struct obd_quotactl *oqctl, *repoqc; + struct mdt_device *mdt = mdt_exp2dev(exp); + struct lu_device *qmt = mdt->mdt_qmt_dev; + struct lu_nodemap *nodemap; + char *buffer = NULL; + int id, rc; + ENTRY; oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL); - if (oqctl == NULL) + if (!oqctl) RETURN(err_serious(-EPROTO)); + if (oqctl->qc_cmd == LUSTRE_Q_ITERQUOTA || + oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) + req_capsule_set_size(pill, &RMF_OBD_QUOTA_ITER, RCL_SERVER, + LQUOTA_ITER_BUFLEN); + else + req_capsule_set_size(pill, &RMF_OBD_QUOTA_ITER, RCL_SERVER, 0); + rc = req_capsule_server_pack(pill); if (rc) RETURN(err_serious(rc)); @@ -2395,20 +3361,35 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case Q_SETINFO: case Q_SETQUOTA: case LUSTRE_Q_SETDEFAULT: - if (!nodemap_can_setquota(nodemap)) + case LUSTRE_Q_SETQUOTAPOOL: + case LUSTRE_Q_SETINFOPOOL: + case LUSTRE_Q_SETDEFAULT_POOL: + case LUSTRE_Q_DELETEQID: + case LUSTRE_Q_RESETQID: + if (!nodemap_can_setquota(nodemap, oqctl->qc_type, + oqctl->qc_id)) GOTO(out_nodemap, rc = -EPERM); + fallthrough; case Q_GETINFO: case Q_GETQUOTA: case LUSTRE_Q_GETDEFAULT: + case LUSTRE_Q_GETQUOTAPOOL: + case LUSTRE_Q_GETINFOPOOL: + case LUSTRE_Q_GETDEFAULT_POOL: + case LUSTRE_Q_ITERQUOTA: if (qmt == NULL) GOTO(out_nodemap, rc = -EOPNOTSUPP); /* slave quotactl */ + fallthrough; case Q_GETOINFO: case Q_GETOQUOTA: + case LUSTRE_Q_ITEROQUOTA: break; default: - CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd); - 
GOTO(out_nodemap, rc = -EFAULT); + rc = -EFAULT; + CERROR("%s: unsupported quotactl command %d: rc = %d\n", + mdt_obd_name(mdt), oqctl->qc_cmd, rc); + GOTO(out_nodemap, rc); } id = oqctl->qc_id; @@ -2422,8 +3403,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi) NODEMAP_CLIENT_TO_FS, id); break; case PRJQUOTA: - /* todo: check/map project id */ - id = oqctl->qc_id; + id = nodemap_map_id(nodemap, NODEMAP_PROJID, + NODEMAP_CLIENT_TO_FS, id); break; default: GOTO(out_nodemap, rc = -EOPNOTSUPP); @@ -2432,6 +3413,13 @@ static int mdt_quotactl(struct tgt_session_info *tsi) if (repoqc == NULL) GOTO(out_nodemap, rc = err_serious(-EFAULT)); + if (oqctl->qc_cmd == LUSTRE_Q_ITERQUOTA || + oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) { + buffer = req_capsule_server_get(pill, &RMF_OBD_QUOTA_ITER); + if (buffer == NULL) + GOTO(out_nodemap, rc = err_serious(-EFAULT)); + } + if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) barrier_exit(tsi->tsi_tgt->lut_bottom); @@ -2451,15 +3439,28 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case Q_GETQUOTA: case LUSTRE_Q_SETDEFAULT: case LUSTRE_Q_GETDEFAULT: + case LUSTRE_Q_SETQUOTAPOOL: + case LUSTRE_Q_GETQUOTAPOOL: + case LUSTRE_Q_SETINFOPOOL: + case LUSTRE_Q_GETINFOPOOL: + case LUSTRE_Q_SETDEFAULT_POOL: + case LUSTRE_Q_GETDEFAULT_POOL: + case LUSTRE_Q_DELETEQID: + case LUSTRE_Q_RESETQID: + case LUSTRE_Q_ITERQUOTA: /* forward quotactl request to QMT */ - rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl); + rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl, buffer, + buffer == NULL ? 0 : + LQUOTA_ITER_BUFLEN); break; case Q_GETOINFO: case Q_GETOQUOTA: + case LUSTRE_Q_ITEROQUOTA: /* slave quotactl */ rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, - oqctl); + oqctl, buffer, + buffer == NULL ? 
0 : LQUOTA_ITER_BUFLEN); break; default: @@ -2470,8 +3471,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi) if (oqctl->qc_id != id) swap(oqctl->qc_id, id); - *repoqc = *oqctl; - + QCTL_COPY_NO_PNAME(repoqc, oqctl); EXIT; out_nodemap: @@ -2489,39 +3489,38 @@ out_nodemap: * context into our context list here. */ static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt, - int idx) + int idx) { - struct md_device *next = mdt->mdt_child; - struct llog_ctxt *ctxt; - int rc; + struct md_device *next = mdt->mdt_child; + struct llog_ctxt *ctxt; + int rc; - if (!llog_ctxt_null(mdt2obd_dev(mdt), idx)) - return 0; + if (!llog_ctxt_null(mdt2obd_dev(mdt), idx)) + return 0; - rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt); - if (rc || ctxt == NULL) { + rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt); + if (rc || ctxt == NULL) return 0; - } - rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx); - if (rc) - CERROR("Can't set mdt ctxt %d\n", rc); + rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx); + if (rc) + CERROR("Can't set mdt ctxt %d\n", rc); - return rc; + return rc; } static int mdt_llog_ctxt_unclone(const struct lu_env *env, - struct mdt_device *mdt, int idx) + struct mdt_device *mdt, int idx) { - struct llog_ctxt *ctxt; + struct llog_ctxt *ctxt; - ctxt = llog_get_context(mdt2obd_dev(mdt), idx); - if (ctxt == NULL) - return 0; - /* Put once for the get we just did, and once for the clone */ - llog_ctxt_put(ctxt); - llog_ctxt_put(ctxt); - return 0; + ctxt = llog_get_context(mdt2obd_dev(mdt), idx); + if (ctxt == NULL) + return 0; + /* Put once for the get we just did, and once for the clone */ + llog_ctxt_put(ctxt); + llog_ctxt_put(ctxt); + return 0; } /* @@ -2542,6 +3541,7 @@ static int mdt_quota_dqacq(struct tgt_session_info *tsi) struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp); struct lu_device *qmt = mdt->mdt_qmt_dev; int rc; + ENTRY; if (qmt == NULL) @@ -2558,6 +3558,7 
@@ struct mdt_object *mdt_object_new(const struct lu_env *env, struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; struct lu_object *o; struct mdt_object *m; + ENTRY; CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f)); @@ -2575,6 +3576,7 @@ struct mdt_object *mdt_object_find(const struct lu_env *env, { struct lu_object *o; struct mdt_object *m; + ENTRY; CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f)); @@ -2596,10 +3598,11 @@ struct mdt_object *mdt_object_find(const struct lu_env *env, * \param mdt the mdt device */ static void mdt_device_commit_async(const struct lu_env *env, - struct mdt_device *mdt) + struct mdt_device *mdt) { struct dt_device *dt = mdt->mdt_bottom; int rc; + ENTRY; rc = dt->dd_ops->dt_commit_async(env, dt); @@ -2622,7 +3625,7 @@ static void mdt_device_commit_async(const struct lu_env *env, */ static inline void mdt_set_lock_sync(struct ldlm_lock *lock) { - lock->l_ast_data = (void*)1; + lock->l_ast_data = (void *)1; } /** @@ -2637,7 +3640,7 @@ static inline void mdt_set_lock_sync(struct ldlm_lock *lock) */ static inline int mdt_is_lock_sync(struct ldlm_lock *lock) { - return lock->l_ast_data != NULL; + return lock->l_ast_data != NULL; } /** @@ -2661,8 +3664,8 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, struct ldlm_cb_set_arg *arg = data; bool commit_async = false; int rc; - ENTRY; + ENTRY; if (flag == LDLM_CB_CANCELING) RETURN(0); @@ -2678,33 +3681,17 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, * The 'data' parameter is l_ast_data in the first case and * callback arguments in the second one. Distinguish them by that. 
*/ - if (!data || data == lock->l_ast_data || !arg->bl_desc) - goto skip_cos_checks; - - if (lock->l_req_mode & (LCK_PW | LCK_EX)) { - if (mdt_cos_is_enabled(mdt)) { - if (!arg->bl_desc->bl_same_client) - mdt_set_lock_sync(lock); - } else if (mdt_slc_is_enabled(mdt) && - arg->bl_desc->bl_cos_incompat) { - mdt_set_lock_sync(lock); - /* - * we may do extra commit here, but there is a small - * window to miss a commit: lock was unlocked (saved), - * then a conflict lock queued and we come here, but - * REP-ACK not received, so lock was not converted to - * COS mode yet. - * Fortunately this window is quite small, so the - * extra commit should be rare (not to say distributed - * operation is rare too). - */ + if (data && data != lock->l_ast_data && arg->bl_desc) { + if (lock->l_req_mode & (LCK_COS | LCK_TXN)) commit_async = true; - } - } else if (lock->l_req_mode == LCK_COS) { - commit_async = true; + else if ((lock->l_req_mode & (LCK_PW | LCK_EX)) && + ((mdt_cos_is_enabled(mdt) && + !arg->bl_desc->bl_same_client) || + (mdt_slc_is_enabled(mdt) && + arg->bl_desc->bl_txn_dependent))) + mdt_set_lock_sync(lock); } -skip_cos_checks: rc = ldlm_blocking_ast_nocheck(lock); if (commit_async) { @@ -2712,8 +3699,7 @@ skip_cos_checks: rc = lu_env_init(&env, LCT_LOCAL); if (unlikely(rc != 0)) - CWARN("%s: lu_env initialization failed, cannot " - "start asynchronous commit: rc = %d\n", + CWARN("%s: lu_env initialization failed, cannot start asynchronous commit: rc = %d\n", obd->obd_name, rc); else mdt_device_commit_async(&env, mdt); @@ -2738,6 +3724,7 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag) { int rc = 0; + ENTRY; switch (flag) { @@ -2758,10 +3745,11 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev->ld_site->ls_top_dev); - LDLM_DEBUG(lock, "Revoke remote lock\n"); + LDLM_DEBUG(lock, "Revoke remote lock"); /* discard slc lock here so 
that it can be cleaned anytime, - * especially for cleanup_resource() */ + * especially for cleanup_resource() + */ tgt_discard_slc_lock(&mdt->mdt_lut, lock); /* once we cache lock, l_ast_data is set to mdt_object */ @@ -2771,10 +3759,9 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, rc = lu_env_init(&env, LCT_MD_THREAD); if (unlikely(rc != 0)) { - CWARN("%s: lu_env initialization failed, object" - "%p "DFID" is leaked!\n", + CWARN("%s: lu_env initialization failed, object %p "DFID" is leaked!: rc = %d\n", obd->obd_name, mo, - PFID(mdt_object_fid(mo))); + PFID(mdt_object_fid(mo)), rc); RETURN(rc); } @@ -2809,7 +3796,8 @@ int mdt_check_resent_lock(struct mdt_thread_info *info, if (lock == NULL) { /* Lock is pinned by ldlm_handle_enqueue0() as it is * a resend case, however, it could be already destroyed - * due to client eviction or a raced cancel RPC. */ + * due to client eviction or a raced cancel RPC. + */ LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx", lhc->mlh_reg_lh.cookie); RETURN(-ESTALE); @@ -2817,8 +3805,8 @@ int mdt_check_resent_lock(struct mdt_thread_info *info, if (!fid_res_name_eq(mdt_object_fid(mo), &lock->l_resource->lr_name)) { - CWARN("%s: Although resent, but still not " - "get child lock:"DFID"\n", + CWARN("%s: Although resent, but still not get child lock:" + DFID"\n", info->mti_exp->exp_obd->obd_name, PFID(mdt_object_fid(mo))); LDLM_LOCK_PUT(lock); @@ -2830,20 +3818,23 @@ int mdt_check_resent_lock(struct mdt_thread_info *info, return 1; } -int mdt_remote_object_lock_try(struct mdt_thread_info *mti, - struct mdt_object *o, const struct lu_fid *fid, - struct lustre_handle *lh, enum ldlm_mode mode, - __u64 *ibits, __u64 trybits, bool cache) +static void mdt_remote_object_lock_created_cb(struct ldlm_lock *lock) { - struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo; - union ldlm_policy_data *policy = &mti->mti_policy; - struct ldlm_res_id *res_id = &mti->mti_res_id; - int rc = 0; - ENTRY; + 
mdt_object_get(NULL, lock->l_ast_data); +} - LASSERT(mdt_object_remote(o)); +static int mdt_remote_object_lock_try(struct mdt_thread_info *mti, + struct mdt_object *obj, + struct lustre_handle *lh, + enum ldlm_mode mode, + union ldlm_policy_data *policy, + struct ldlm_res_id *res_id, + bool cache) +{ + struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo; + int rc; - fid_build_reg_res_name(fid, res_id); + LASSERT(mdt_object_remote(obj)); memset(einfo, 0, sizeof(*einfo)); einfo->ei_type = LDLM_IBITS; @@ -2852,248 +3843,373 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti, einfo->ei_cb_cp = ldlm_completion_ast; einfo->ei_enq_slave = 0; einfo->ei_res_id = res_id; - + einfo->ei_req_slot = 1; if (cache) { /* * if we cache lock, couple lock with mdt_object, so that object * can be easily found in lock ASTs. */ - mdt_object_get(mti->mti_env, o); - einfo->ei_cbdata = o; + einfo->ei_cbdata = obj; + einfo->ei_cb_created = mdt_remote_object_lock_created_cb; } - memset(policy, 0, sizeof(*policy)); - policy->l_inodebits.bits = *ibits; - policy->l_inodebits.try_bits = trybits; - - rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo, + rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), lh, einfo, policy); - if (rc < 0 && cache) - mdt_object_put(mti->mti_env, o); - - /* Return successfully acquired bits to a caller */ - if (rc == 0) { - struct ldlm_lock *lock = ldlm_handle2lock(lh); - - LASSERT(lock); - *ibits = lock->l_policy_data.l_inodebits.bits; - LDLM_LOCK_PUT(lock); + if (rc) { + lh->cookie = 0ull; + return rc; } - RETURN(rc); -} -int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o, - const struct lu_fid *fid, struct lustre_handle *lh, - enum ldlm_mode mode, __u64 ibits, bool cache) -{ - return mdt_remote_object_lock_try(mti, o, fid, lh, mode, &ibits, 0, - cache); + /* other components like LFSCK can use lockless access + * and populate cache, so we better invalidate it + */ + if (policy->l_inodebits.bits & + 
(MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR)) + mo_invalidate(mti->mti_env, mdt_object_child(obj)); + + return 0; } -static int mdt_object_local_lock(struct mdt_thread_info *info, - struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 *ibits, - __u64 trybits, bool cos_incompat) +/* + * Helper function to take PDO and hash lock. + * + * if \a pdo_lock is false, don't take PDO lock, this is case in rename. + */ +int mdt_object_pdo_lock(struct mdt_thread_info *info, struct mdt_object *obj, + struct mdt_lock_handle *lh, const struct lu_name *name, + enum ldlm_mode mode, bool pdo_lock) { struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; - __u64 dlmflags = 0, *cookie = NULL; + /* + * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it is never going to + * be sent to client and we do not want it slowed down due to possible + * cancels. + */ + __u64 dlmflags = LDLM_FL_ATOMIC_CB; + __u64 *cookie = NULL; int rc; - ENTRY; - LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh)); - LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh)); - LASSERT(lh->mlh_reg_mode != LCK_MINMODE); - LASSERT(lh->mlh_type != MDT_NUL_LOCK); - - if (cos_incompat) { - LASSERT(lh->mlh_reg_mode == LCK_PW || - lh->mlh_reg_mode == LCK_EX); - dlmflags |= LDLM_FL_COS_INCOMPAT; - } else if (mdt_cos_is_enabled(info->mti_mdt)) { - dlmflags |= LDLM_FL_COS_ENABLED; - } - - /* Only enqueue LOOKUP lock for remote object */ - LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP)); - - if (lh->mlh_type == MDT_PDO_LOCK) { - /* check for exists after object is locked */ - if (mdt_object_exists(o) == 0) { - /* Non-existent object shouldn't have PDO lock */ - RETURN(-ESTALE); - } else { - /* Non-dir object shouldn't have PDO lock */ - if (!S_ISDIR(lu_object_attr(&o->mot_obj))) - RETURN(-ENOTDIR); - } - } - - fid_build_reg_res_name(mdt_object_fid(o), res_id); - dlmflags |= LDLM_FL_ATOMIC_CB; + 
LASSERT(obj); + /* check for exists after object is locked */ + if (!mdt_object_exists(obj)) + /* Non-existent object shouldn't have PDO lock */ + return -ESTALE; + + /* Non-dir object shouldn't have PDO lock */ + if (!S_ISDIR(lu_object_attr(&obj->mot_obj))) + return -ENOTDIR; + + policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; + policy->l_inodebits.try_bits = 0; + policy->l_inodebits.li_gid = 0; + policy->l_inodebits.li_initiator_id = mdt_node_id(info->mti_mdt); + fid_build_reg_res_name(mdt_object_fid(obj), res_id); if (info->mti_exp) cookie = &info->mti_exp->exp_handle.h_cookie; - /* - * Take PDO lock on whole directory and build correct @res_id for lock - * on part of directory. - */ - if (lh->mlh_pdo_hash != 0) { - LASSERT(lh->mlh_type == MDT_PDO_LOCK); - mdt_lock_pdo_mode(info, o, lh); - if (lh->mlh_pdo_mode != LCK_NL) { - /* - * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it - * is never going to be sent to client and we do not - * want it slowed down due to possible cancels. 
- */ - policy->l_inodebits.bits = - *ibits & MDS_INODELOCK_UPDATE; - policy->l_inodebits.try_bits = - trybits & MDS_INODELOCK_UPDATE; - /* at least one of them should be set */ - LASSERT(policy->l_inodebits.bits | - policy->l_inodebits.try_bits); - rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh, - lh->mlh_pdo_mode, policy, res_id, - dlmflags, cookie); - if (unlikely(rc != 0)) - GOTO(out_unlock, rc); - } + mdt_lock_pdo_init(lh, mode, name); + mdt_lock_pdo_mode(info, obj, lh); + if (lh->mlh_pdo_mode != LCK_NL) { + if (pdo_lock) { + if (mdt_object_remote(obj)) { + rc = mdt_remote_object_lock_try(info, obj, + &lh->mlh_pdo_lh, lh->mlh_pdo_mode, + policy, res_id, false); + lh->mlh_pdo_remote = 1; + } else { + rc = mdt_fid_lock(info->mti_env, ns, + &lh->mlh_pdo_lh, lh->mlh_pdo_mode, + policy, res_id, dlmflags, cookie); + } + if (rc) { + mdt_object_unlock(info, obj, lh, 1); + return rc; + } + } + res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash; + } + + if (mdt_object_remote(obj)) + rc = mdt_remote_object_lock_try(info, obj, &lh->mlh_rreg_lh, + lh->mlh_rreg_mode, policy, res_id, false); + else + rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, + lh->mlh_reg_mode, policy, res_id, dlmflags, cookie); + if (rc) + mdt_object_unlock(info, obj, lh, 1); + else if (CFS_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK) && + lh->mlh_pdo_hash != 0 && + (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX)) + CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15); - /* - * Finish res_id initializing by name hash marking part of - * directory which is taking modification. 
- */ - res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash; - } + return rc; +} + +int mdt_object_lock_internal(struct mdt_thread_info *info, + struct mdt_object *obj, const struct lu_fid *fid, + struct mdt_lock_handle *lh, __u64 *ibits, + __u64 trybits, bool cache) +{ + union ldlm_policy_data *policy = &info->mti_policy; + struct ldlm_res_id *res_id = &info->mti_res_id; + struct lustre_handle *handle; + int rc; policy->l_inodebits.bits = *ibits; policy->l_inodebits.try_bits = trybits; + policy->l_inodebits.li_gid = lh->mlh_gid; + policy->l_inodebits.li_initiator_id = mdt_node_id(info->mti_mdt); + fid_build_reg_res_name(fid, res_id); - /* - * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is - * going to be sent to client. If it is - mdt_intent_policy() path will - * fix it up and turn FL_LOCAL flag off. - */ - rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, - policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags, - cookie); -out_unlock: - if (rc != 0) - mdt_object_unlock(info, o, lh, 1); - else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) && - lh->mlh_pdo_hash != 0 && - (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX)) - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15); + if (obj && mdt_object_remote(obj)) { + handle = &lh->mlh_rreg_lh; + LASSERT(!lustre_handle_is_used(handle)); + LASSERT(lh->mlh_rreg_mode != LCK_MINMODE); + LASSERT(lh->mlh_type != MDT_NUL_LOCK); + rc = mdt_remote_object_lock_try(info, obj, handle, + lh->mlh_rreg_mode, policy, + res_id, cache); + } else { + struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; + /* + * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if + * it is going to be sent to client. If it is - + * mdt_intent_policy() path will fix it up and turn FL_LOCAL + * flag off. 
+ */ + __u64 dlmflags = LDLM_FL_ATOMIC_CB | LDLM_FL_LOCAL_ONLY; + __u64 *cookie = NULL; + + handle = &lh->mlh_reg_lh; + LASSERT(!lustre_handle_is_used(handle)); + LASSERT(lh->mlh_reg_mode != LCK_MINMODE); + LASSERT(lh->mlh_type != MDT_NUL_LOCK); + + /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */ + if (lh->mlh_type == MDT_REG_LOCK && + lh->mlh_reg_mode == LCK_EX && *ibits == MDS_INODELOCK_OPEN) + dlmflags |= LDLM_FL_CANCEL_ON_BLOCK; + + + if (info->mti_exp) + cookie = &info->mti_exp->exp_handle.h_cookie; + + rc = mdt_fid_lock(info->mti_env, ns, handle, lh->mlh_reg_mode, + policy, res_id, dlmflags, cookie); + if (rc) + mdt_object_unlock(info, obj, lh, 1); + } - /* Return successfully acquired bits to a caller */ if (rc == 0) { - struct ldlm_lock *lock = ldlm_handle2lock(&lh->mlh_reg_lh); + struct ldlm_lock *lock; + /* Return successfully acquired bits to a caller */ + lock = ldlm_handle2lock(handle); LASSERT(lock); *ibits = lock->l_policy_data.l_inodebits.bits; LDLM_LOCK_PUT(lock); } + + return rc; +} + +/* + * MDT object locking functions: + * mdt_object_lock(): lock object, this is used in most places, and normally + * lock ibits doesn't contain LOOKUP, unless the caller knows it's not + * remote object. + * mdt_object_check_lock(): lock object with LOOKUP and other ibits, it needs + * to check whether parent is on remote MDT, if so, take LOOKUP on parent + * MDT separately, and then lock other ibits on child object. + * mdt_parent_lock(): take parent UPDATE lock with specific mode, if parent is + * local, take PDO lock by name hash, otherwise take regular lock. + * mdt_object_stripes_lock(): lock object which should be local, and if it's a + * striped directory, lock its stripes, this is called in operations which + * modify both object and stripes. + * mdt_object_lock_try(): lock object with trybits, the trybits contains + * optional inode lock bits that can be granted. 
This is called by + * getattr/open to fetch more inode lock bits to client, and is also called + * by dir migration to lock link parent in non-block mode to avoid + * deadlock. + */ + +/** + * lock object + * + * this is used to lock object in most places, and normally lock ibits doesn't + * contain LOOKUP, unless the caller knows it's not remote object. + * + * \param info struct mdt_thread_info + * \param obj object + * \param lh lock handle + * \param ibits MDS inode lock bits + * \param mode lock mode + * + * \retval 0 on success, -ev on error. + */ +int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *obj, + struct mdt_lock_handle *lh, __u64 ibits, + enum ldlm_mode mode) +{ + int rc; + + ENTRY; + mdt_lock_reg_init(lh, mode); + rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh, + &ibits, 0, false); RETURN(rc); } -static int -mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 *ibits, - __u64 trybits, bool cos_incompat) +/** + * lock object with LOOKUP and other ibits + * + * it will check whether parent and child are on different MDTs, if so, take + * LOOKUP lock on parent MDT, and lock other ibits on child MDT, otherwise lock + * all ibits on child MDT. Note, parent and child shouldn't be both on remote + * MDTs, in which case specific lock function should be used, and it's in + * rename and migrate only. + * + * \param info struct mdt_thread_info + * \param parent parent object + * \param child child object + * \param lh lock handle + * \param ibits MDS inode lock bits + * \param mode lock mode + * + * \retval 0 on success, -ev on error. 
+ */ +int mdt_object_check_lock(struct mdt_thread_info *info, + struct mdt_object *parent, struct mdt_object *child, + struct mdt_lock_handle *lh, __u64 ibits, + enum ldlm_mode mode) { - struct mdt_lock_handle *local_lh = NULL; int rc; - ENTRY; - - if (!mdt_object_remote(o)) { - rc = mdt_object_local_lock(info, o, lh, ibits, trybits, - cos_incompat); - RETURN(rc); - } - /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */ - *ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT | - MDS_INODELOCK_XATTR); - - /* Only enqueue LOOKUP lock for remote object */ - if (*ibits & MDS_INODELOCK_LOOKUP) { - __u64 local = MDS_INODELOCK_LOOKUP; - - rc = mdt_object_local_lock(info, o, lh, &local, 0, - cos_incompat); - if (rc != ELDLM_OK) + ENTRY; + /* if LOOKUP ibit is not set, use mdt_object_lock() */ + LASSERT(ibits & MDS_INODELOCK_LOOKUP); + /* if only LOOKUP ibit is needed, use mdt_object_lookup_lock() */ + LASSERT(ibits != MDS_INODELOCK_LOOKUP); + LASSERT(parent); + /* @parent and @child shouldn't both be on remote MDTs */ + LASSERT(!(mdt_object_remote(parent) && mdt_object_remote(child))); + + mdt_lock_reg_init(lh, mode); + if (mdt_object_remote(parent) ^ mdt_object_remote(child)) { + __u64 lookup_ibits = MDS_INODELOCK_LOOKUP; + + rc = mdt_object_lock_internal(info, parent, + mdt_object_fid(child), lh, + &lookup_ibits, 0, false); + if (rc) RETURN(rc); - local_lh = lh; - } - - if ((*ibits | trybits) & MDS_INODELOCK_UPDATE) { - /* Sigh, PDO needs to enqueue 2 locks right now, but - * enqueue RPC can only request 1 lock, to avoid extra - * RPC, so it will instead enqueue EX lock for remote - * object anyway XXX*/ - if (lh->mlh_type == MDT_PDO_LOCK && - lh->mlh_pdo_hash != 0) { - CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to" - "EX lock.\n", mdt_obd_name(info->mti_mdt), - PFID(mdt_object_fid(o))); - lh->mlh_pdo_hash = 0; - lh->mlh_rreg_mode = LCK_EX; - lh->mlh_type = MDT_REG_LOCK; - } - - rc = mdt_remote_object_lock_try(info, o, mdt_object_fid(o), - 
&lh->mlh_rreg_lh, - lh->mlh_rreg_mode, - ibits, trybits, false); - if (rc != ELDLM_OK) { - if (local_lh != NULL) - mdt_object_unlock(info, o, local_lh, rc); - RETURN(rc); - } + ibits &= ~MDS_INODELOCK_LOOKUP; } - /* other components like LFSCK can use lockless access - * and populate cache, so we better invalidate it */ - mo_invalidate(info->mti_env, mdt_object_child(o)); + rc = mdt_object_lock_internal(info, child, mdt_object_fid(child), lh, + &ibits, 0, false); + if (rc && !(ibits & MDS_INODELOCK_LOOKUP)) + mdt_object_unlock(info, NULL, lh, 1); - RETURN(0); + RETURN(rc); } -int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 ibits) +/** + * take parent UPDATE lock + * + * if parent is local or mode is LCK_PW, take PDO lock, otherwise take regular + * lock. + * + * \param info struct mdt_thread_info + * \param obj parent object + * \param lh lock handle + * \param lname child name + * \param mode lock mode + * + * \retval 0 on success, -ev on error. 
+ */ +int mdt_parent_lock(struct mdt_thread_info *info, struct mdt_object *obj, + struct mdt_lock_handle *lh, const struct lu_name *lname, + enum ldlm_mode mode) { - return mdt_object_lock_internal(info, o, lh, &ibits, 0, false); -} + int rc; -int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 ibits, - bool cos_incompat) -{ - LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX); - return mdt_object_lock_internal(info, o, lh, &ibits, 0, - cos_incompat); + ENTRY; + LASSERT(obj && lname); + LASSERT(mode == LCK_PW || mode == LCK_PR); + if (mdt_object_remote(obj) && mode == LCK_PR) { + __u64 ibits = MDS_INODELOCK_UPDATE; + + mdt_lock_reg_init(lh, mode); + rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), + lh, &ibits, 0, false); + } else { + rc = mdt_object_pdo_lock(info, obj, lh, lname, mode, true); + } + RETURN(rc); } -int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, +/** + * lock object with trybits + * + * the trybits contains optional inode lock bits that can be granted. This is + * called by getattr/open to fetch more inode lock bits to client, and is also + * called by dir migration to lock link parent in non-block mode to avoid + * deadlock. + * + * \param info struct mdt_thread_info + * \param obj object + * \param lh lock handle + * \param ibits MDS inode lock bits + * \param trybits optional inode lock bits + * \param mode lock mode + * + * \retval 0 on success, -ev on error. 
+ */ +int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *obj, struct mdt_lock_handle *lh, __u64 *ibits, - __u64 trybits, bool cos_incompat) + __u64 trybits, enum ldlm_mode mode) { bool trylock_only = *ibits == 0; int rc; + ENTRY; LASSERT(!(*ibits & trybits)); - rc = mdt_object_lock_internal(info, o, lh, ibits, trybits, - cos_incompat); + mdt_lock_reg_init(lh, mode); + rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh, ibits, + trybits, false); if (rc && trylock_only) { /* clear error for try ibits lock only */ LASSERT(*ibits == 0); rc = 0; } - return rc; + RETURN(rc); +} + +/* + * Helper function to take \a obj LOOKUP lock. + * + * Both \a pobj and \a obj may be located on remote MDTs. + */ +int mdt_object_lookup_lock(struct mdt_thread_info *info, + struct mdt_object *pobj, struct mdt_object *obj, + struct mdt_lock_handle *lh, enum ldlm_mode mode) +{ + __u64 ibits = MDS_INODELOCK_LOOKUP; + int rc; + + ENTRY; + /* if @parent is NULL, it's on local MDT, and @child is remote, + * this is case in getattr/unlink/open by name. + */ + LASSERT(ergo(!pobj, mdt_object_remote(obj))); + mdt_lock_reg_init(lh, mode); + rc = mdt_object_lock_internal(info, pobj, mdt_object_fid(obj), lh, + &ibits, 0, false); + RETURN(rc); } /** @@ -3109,55 +4225,65 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o, * \param mode lock mode * \param decref force immediate lock releasing */ -void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, - enum ldlm_mode mode, int decref) +static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h, + enum ldlm_mode mode, int decref) { + struct tgt_session_info *tsi = info->mti_env->le_ses ? 
+ tgt_ses_info(info->mti_env) : NULL; + ENTRY; if (lustre_handle_is_used(h)) { - if (decref || !info->mti_has_trans || - !(mode & (LCK_PW | LCK_EX))) { + bool has_trans = tsi && tsi->tsi_has_trans; + + if (decref || !has_trans || !(mode & (LCK_PW | LCK_EX))) { mdt_fid_unlock(h, mode); } else { struct mdt_device *mdt = info->mti_mdt; struct ldlm_lock *lock = ldlm_handle2lock(h); struct ptlrpc_request *req = mdt_info_req(info); - bool cos = mdt_cos_is_enabled(mdt); - bool convert_lock = !cos && mdt_slc_is_enabled(mdt); + bool no_ack = false; LASSERTF(lock != NULL, "no lock for cookie %#llx\n", h->cookie); /* there is no request if mdt_object_unlock() is called - * from mdt_export_cleanup()->mdt_add_dirty_flag() */ + * from mdt_export_cleanup()->mdt_add_dirty_flag() + */ if (likely(req != NULL)) { - LDLM_DEBUG(lock, "save lock request %p reply " - "state %p transno %lld\n", req, + LDLM_DEBUG(lock, "save lock request %p reply state %p transno %lld", + req, req->rq_reply_state, req->rq_transno); - if (cos) { - ldlm_lock_mode_downgrade(lock, LCK_COS); + if (mdt_cos_is_enabled(mdt)) { mode = LCK_COS; + no_ack = true; + ldlm_lock_mode_downgrade(lock, mode); + } else if (mdt_slc_is_enabled(mdt)) { + no_ack = true; + if (mode != LCK_TXN) { + mode = LCK_TXN; + ldlm_lock_mode_downgrade(lock, + mode); + } } if (req->rq_export->exp_disconnected) mdt_fid_unlock(h, mode); else - ptlrpc_save_lock(req, h, mode, cos, - convert_lock); + ptlrpc_save_lock(req, h, no_ack); } else { mdt_fid_unlock(h, mode); } - if (mdt_is_lock_sync(lock)) { - CDEBUG(D_HA, "found sync-lock," - " async commit started\n"); - mdt_device_commit_async(info->mti_env, - mdt); - } - LDLM_LOCK_PUT(lock); - } - h->cookie = 0ull; - } - EXIT; + if (mdt_is_lock_sync(lock)) { + CDEBUG(D_HA, "sync_lock, do async commit\n"); + mdt_device_commit_async(info->mti_env, mdt); + } + LDLM_LOCK_PUT(lock); + } + h->cookie = 0ull; + } + + EXIT; } /** @@ -3179,20 +4305,18 @@ static void mdt_save_remote_lock(struct mdt_thread_info 
*info, if (lustre_handle_is_used(h)) { struct ldlm_lock *lock = ldlm_handle2lock(h); + struct ptlrpc_request *req = mdt_info_req(info); if (o != NULL && (lock->l_policy_data.l_inodebits.bits & (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE))) mo_invalidate(info->mti_env, mdt_object_child(o)); - if (decref || !info->mti_has_trans || - !(mode & (LCK_PW | LCK_EX))) { + if (decref || !req || !(mode & (LCK_PW | LCK_EX)) || + !tgt_ses_info(info->mti_env)->tsi_has_trans) { ldlm_lock_decref_and_cancel(h, mode); LDLM_LOCK_PUT(lock); } else { - struct ptlrpc_request *req = mdt_info_req(info); - - LASSERT(req != NULL); tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock, req->rq_transno); ldlm_lock_decref(h, mode); @@ -3222,7 +4346,11 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, { ENTRY; - mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); + if (lh->mlh_pdo_remote) + mdt_save_remote_lock(info, o, &lh->mlh_pdo_lh, + lh->mlh_pdo_mode, decref); + else + mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref); mdt_save_remote_lock(info, o, &lh->mlh_rreg_lh, lh->mlh_rreg_mode, decref); @@ -3231,32 +4359,32 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, } struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info, - const struct lu_fid *f, - struct mdt_lock_handle *lh, - __u64 ibits) + const struct lu_fid *f, + struct mdt_lock_handle *lh, + __u64 ibits, enum ldlm_mode mode) { - struct mdt_object *o; + struct mdt_object *o; - o = mdt_object_find(info->mti_env, info->mti_mdt, f); - if (!IS_ERR(o)) { - int rc; + o = mdt_object_find(info->mti_env, info->mti_mdt, f); + if (!IS_ERR(o)) { + int rc; - rc = mdt_object_lock(info, o, lh, ibits); - if (rc != 0) { - mdt_object_put(info->mti_env, o); - o = ERR_PTR(rc); - } - } - return o; + rc = mdt_object_lock(info, o, lh, ibits, mode); + if (rc != 0) { + mdt_object_put(info->mti_env, o); + o = 
ERR_PTR(rc); + } + } + return o; } -void mdt_object_unlock_put(struct mdt_thread_info * info, - struct mdt_object * o, - struct mdt_lock_handle *lh, - int decref) +void mdt_object_unlock_put(struct mdt_thread_info *info, + struct mdt_object *o, + struct mdt_lock_handle *lh, + int decref) { - mdt_object_unlock(info, o, lh, decref); - mdt_object_put(info->mti_env, o); + mdt_object_unlock(info, o, lh, decref); + mdt_object_put(info->mti_env, o); } /* @@ -3267,98 +4395,117 @@ void mdt_object_unlock_put(struct mdt_thread_info * info, * - create lu_object, corresponding to the fid in mdt_body, and save it in * @info; * - * - if HABEO_CORPUS flag is set for this request type check whether object + * - if HAS_BODY flag is set for this request type check whether object * actually exists on storage (lu_object_exists()). * */ static int mdt_body_unpack(struct mdt_thread_info *info, enum tgt_handler_flags flags) { - const struct mdt_body *body; - struct mdt_object *obj; - const struct lu_env *env; - struct req_capsule *pill; - int rc; - ENTRY; + const struct mdt_body *body; + struct mdt_object *obj; + const struct lu_env *env; + struct req_capsule *pill; + int rc; + + ENTRY; - env = info->mti_env; - pill = info->mti_pill; + env = info->mti_env; + pill = info->mti_pill; - body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY); - if (body == NULL) - RETURN(-EFAULT); + body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY); + if (body == NULL) + RETURN(-EFAULT); if (!(body->mbo_valid & OBD_MD_FLID)) RETURN(0); if (!fid_is_sane(&body->mbo_fid1)) { CERROR("Invalid fid: "DFID"\n", PFID(&body->mbo_fid1)); - RETURN(-EINVAL); - } + RETURN(-EINVAL); + } obj = mdt_object_find(env, info->mti_mdt, &body->mbo_fid1); if (!IS_ERR(obj)) { - if ((flags & HABEO_CORPUS) && !mdt_object_exists(obj)) { + if ((flags & HAS_BODY) && !mdt_object_exists(obj)) { mdt_object_put(env, obj); rc = -ENOENT; - } else { - info->mti_object = obj; - rc = 0; - } - } else - rc = 
PTR_ERR(obj); + } else { + info->mti_object = obj; + rc = 0; + } + } else + rc = PTR_ERR(obj); - RETURN(rc); + RETURN(rc); } static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, enum tgt_handler_flags flags) { - struct req_capsule *pill = info->mti_pill; - int rc; - ENTRY; - - if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) - rc = mdt_body_unpack(info, flags); - else - rc = 0; - - if (rc == 0 && (flags & HABEO_REFERO)) { - /* Pack reply. */ - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, - DEF_REP_MD_SIZE); - if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) + struct req_capsule *pill = info->mti_pill; + int rc; + + ENTRY; + + if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) + rc = mdt_body_unpack(info, flags); + else + rc = 0; + + if (rc == 0 && (flags & HAS_REPLY)) { + /* Pack reply. */ + if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, + req_capsule_ptlreq(pill) ? + DEF_REP_MD_SIZE : MAX_MD_SIZE_OLD); + + if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0); /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD * by default. If the target object has more ACL entries, then - * enlarge the buffer when necessary. */ + * enlarge the buffer when necessary. 
+ */ if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER)) req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); - rc = req_capsule_server_pack(pill); - } - RETURN(rc); -} + mdt_preset_secctx_size(info); + mdt_preset_encctx_size(info); -void mdt_lock_handle_init(struct mdt_lock_handle *lh) -{ - lh->mlh_type = MDT_NUL_LOCK; - lh->mlh_reg_lh.cookie = 0ull; - lh->mlh_reg_mode = LCK_MINMODE; - lh->mlh_pdo_lh.cookie = 0ull; - lh->mlh_pdo_mode = LCK_MINMODE; - lh->mlh_rreg_lh.cookie = 0ull; - lh->mlh_rreg_mode = LCK_MINMODE; + rc = req_capsule_server_pack(pill); + if (rc) + CWARN("%s: cannot pack response: rc = %d\n", + mdt_obd_name(info->mti_mdt), rc); + } + RETURN(rc); } -void mdt_lock_handle_fini(struct mdt_lock_handle *lh) +void mdt_thread_info_reset(struct mdt_thread_info *info) { - LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh)); - LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh)); + memset(&info->mti_attr, 0, sizeof(info->mti_attr)); + info->mti_body = NULL; + info->mti_dlm_req = NULL; + info->mti_cross_ref = 0; + info->mti_opdata = 0; + info->mti_big_lmm_used = 0; + info->mti_big_acl_used = 0; + info->mti_som_strict = 0; + info->mti_intent_lock = 0; + + info->mti_spec.no_create = 0; + info->mti_spec.sp_rm_entry = 0; + info->mti_spec.sp_permitted = 0; + + info->mti_spec.u.sp_ea.eadata = NULL; + info->mti_spec.u.sp_ea.eadatalen = 0; + + if (info->mti_batch_env && info->mti_object != NULL) { + mdt_object_put(info->mti_env, info->mti_object); + info->mti_object = NULL; + } } /* @@ -3369,42 +4516,21 @@ void mdt_lock_handle_fini(struct mdt_lock_handle *lh) void mdt_thread_info_init(struct ptlrpc_request *req, struct mdt_thread_info *info) { - int i; - - info->mti_pill = &req->rq_pill; - - /* lock handle */ - for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++) - mdt_lock_handle_init(&info->mti_lh[i]); + info->mti_pill = &req->rq_pill; - /* mdt device: it can be NULL while CONNECT */ - if (req->rq_export) { - info->mti_mdt = 
mdt_dev(req->rq_export->exp_obd->obd_lu_dev); - info->mti_exp = req->rq_export; - } else - info->mti_mdt = NULL; + /* mdt device: it can be NULL while CONNECT */ + if (req->rq_export) { + info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev); + info->mti_exp = req->rq_export; + } else + info->mti_mdt = NULL; info->mti_env = req->rq_svc_thread->t_env; info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg); - - memset(&info->mti_attr, 0, sizeof(info->mti_attr)); info->mti_big_buf = LU_BUF_NULL; - info->mti_body = NULL; - info->mti_object = NULL; - info->mti_dlm_req = NULL; - info->mti_has_trans = 0; - info->mti_cross_ref = 0; - info->mti_opdata = 0; - info->mti_big_lmm_used = 0; - info->mti_big_acl_used = 0; - info->mti_som_valid = 0; + info->mti_batch_env = 0; + info->mti_object = NULL; - info->mti_spec.no_create = 0; - info->mti_spec.sp_rm_entry = 0; - info->mti_spec.sp_permitted = 0; - info->mti_spec.sp_migrate_close = 0; - - info->mti_spec.u.sp_ea.eadata = NULL; - info->mti_spec.u.sp_ea.eadatalen = 0; + mdt_thread_info_reset(info); } void mdt_thread_info_fini(struct mdt_thread_info *info) @@ -3417,10 +4543,11 @@ void mdt_thread_info_fini(struct mdt_thread_info *info) } for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++) - mdt_lock_handle_fini(&info->mti_lh[i]); + mdt_lock_handle_assert(&info->mti_lh[i]); info->mti_env = NULL; info->mti_pill = NULL; info->mti_exp = NULL; + info->mti_mdt = NULL; if (unlikely(info->mti_big_buf.lb_buf != NULL)) lu_buf_free(&info->mti_big_buf); @@ -3446,12 +4573,10 @@ struct mdt_thread_info *tsi2mdt_info(struct tgt_session_info *tsi) static int mdt_tgt_connect(struct tgt_session_info *tsi) { - if (OBD_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) && + if (CFS_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) && cfs_fail_val == - tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) { - set_current_state(TASK_UNINTERRUPTIBLE); - schedule_timeout(msecs_to_jiffies(3 * MSEC_PER_SEC)); - } + tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) 
+ schedule_timeout_uninterruptible(cfs_time_seconds(3)); return tgt_connect(tsi); } @@ -3476,112 +4601,117 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, struct mdt_lock_handle *lh, __u64 flags, int result) { - struct ptlrpc_request *req = mdt_info_req(info); - struct ldlm_lock *lock = *lockp; + struct ptlrpc_request *req = mdt_info_req(info); + struct ldlm_lock *lock = *lockp; struct ldlm_lock *new_lock; /* If possible resent found a lock, @lh is set to its handle */ new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0); - if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) { - lh->mlh_reg_lh.cookie = 0; - RETURN(0); - } - - if (new_lock == NULL && (flags & LDLM_FL_RESENT)) { - /* Lock is pinned by ldlm_handle_enqueue0() as it is - * a resend case, however, it could be already destroyed - * due to client eviction or a raced cancel RPC. */ - LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n", - lh->mlh_reg_lh.cookie); + if (new_lock == NULL) { + if (flags & LDLM_FL_INTENT_ONLY) { + result = 0; + } else if (flags & LDLM_FL_RESENT) { + /* Lock is pinned by ldlm_handle_enqueue0() as it is a + * resend case, however, it could be already destroyed + * due to client eviction or a raced cancel RPC. + */ + LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n", + lh->mlh_reg_lh.cookie); + result = -ESTALE; + } else { + CERROR("%s: Invalid lockh=%#llx flags=%#llx fid1="DFID" fid2="DFID": rc = %d\n", + mdt_obd_name(info->mti_mdt), + lh->mlh_reg_lh.cookie, flags, + PFID(&info->mti_tmp_fid1), + PFID(&info->mti_tmp_fid2), result); + result = -ESTALE; + } lh->mlh_reg_lh.cookie = 0; - RETURN(-ESTALE); + RETURN(result); + } + + /* + * If we've already given this lock to a client once, then we should + * have no readers or writers. Otherwise, we should have one reader + * _or_ writer ref (which will be zeroed below) before returning the + * lock to a client. 
+ */ + if (new_lock->l_export == req->rq_export) { + LASSERT(new_lock->l_readers + new_lock->l_writers == 0); + } else { + LASSERT(new_lock->l_export == NULL); + LASSERT(new_lock->l_readers + new_lock->l_writers == 1); } - LASSERTF(new_lock != NULL, - "lockh %#llx flags %#llx : rc = %d\n", - lh->mlh_reg_lh.cookie, flags, result); - - /* - * If we've already given this lock to a client once, then we should - * have no readers or writers. Otherwise, we should have one reader - * _or_ writer ref (which will be zeroed below) before returning the - * lock to a client. - */ - if (new_lock->l_export == req->rq_export) { - LASSERT(new_lock->l_readers + new_lock->l_writers == 0); - } else { - LASSERT(new_lock->l_export == NULL); - LASSERT(new_lock->l_readers + new_lock->l_writers == 1); - } - - *lockp = new_lock; - - if (new_lock->l_export == req->rq_export) { - /* - * Already gave this to the client, which means that we - * reconstructed a reply. - */ - LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & - MSG_RESENT); + *lockp = new_lock; + + if (new_lock->l_export == req->rq_export) { + /* + * Already gave this to the client, which means that we + * reconstructed a reply. + */ + LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & + MSG_RESENT); LDLM_LOCK_RELEASE(new_lock); - lh->mlh_reg_lh.cookie = 0; - RETURN(ELDLM_LOCK_REPLACED); - } - - /* - * Fixup the lock to be given to the client. - */ - lock_res_and_lock(new_lock); - /* Zero new_lock->l_readers and new_lock->l_writers without triggering - * possible blocking AST. 
*/ - while (new_lock->l_readers > 0) { - lu_ref_del(&new_lock->l_reference, "reader", new_lock); - lu_ref_del(&new_lock->l_reference, "user", new_lock); - new_lock->l_readers--; - } - while (new_lock->l_writers > 0) { - lu_ref_del(&new_lock->l_reference, "writer", new_lock); - lu_ref_del(&new_lock->l_reference, "user", new_lock); - new_lock->l_writers--; - } - - new_lock->l_export = class_export_lock_get(req->rq_export, new_lock); - new_lock->l_blocking_ast = lock->l_blocking_ast; - new_lock->l_completion_ast = lock->l_completion_ast; + lh->mlh_reg_lh.cookie = 0; + RETURN(ELDLM_LOCK_REPLACED); + } + + /* + * Fixup the lock to be given to the client. + */ + lock_res_and_lock(new_lock); + /* Zero new_lock->l_readers and new_lock->l_writers without triggering + * possible blocking AST. + */ + while (new_lock->l_readers > 0) { + lu_ref_del(&new_lock->l_reference, "reader", new_lock); + lu_ref_del(&new_lock->l_reference, "user", new_lock); + new_lock->l_readers--; + } + while (new_lock->l_writers > 0) { + lu_ref_del(&new_lock->l_reference, "writer", new_lock); + lu_ref_del(&new_lock->l_reference, "user", new_lock); + new_lock->l_writers--; + } + + new_lock->l_export = class_export_lock_get(req->rq_export, new_lock); + new_lock->l_blocking_ast = lock->l_blocking_ast; + new_lock->l_completion_ast = lock->l_completion_ast; if (ldlm_has_dom(new_lock)) new_lock->l_glimpse_ast = ldlm_server_glimpse_ast; - new_lock->l_remote_handle = lock->l_remote_handle; - new_lock->l_flags &= ~LDLM_FL_LOCAL; + new_lock->l_remote_handle = lock->l_remote_handle; + new_lock->l_flags &= ~LDLM_FL_LOCAL; - unlock_res_and_lock(new_lock); + unlock_res_and_lock(new_lock); - cfs_hash_add(new_lock->l_export->exp_lock_hash, - &new_lock->l_remote_handle, - &new_lock->l_exp_hash); + cfs_hash_add(new_lock->l_export->exp_lock_hash, + &new_lock->l_remote_handle, + &new_lock->l_exp_hash); - LDLM_LOCK_RELEASE(new_lock); - lh->mlh_reg_lh.cookie = 0; + LDLM_LOCK_RELEASE(new_lock); + lh->mlh_reg_lh.cookie = 0; - 
RETURN(ELDLM_LOCK_REPLACED); + RETURN(ELDLM_LOCK_REPLACED); } void mdt_intent_fixup_resent(struct mdt_thread_info *info, struct ldlm_lock *new_lock, struct mdt_lock_handle *lh, __u64 flags) { - struct ptlrpc_request *req = mdt_info_req(info); - struct ldlm_request *dlmreq; + struct ptlrpc_request *req = mdt_info_req(info); + struct ldlm_request *dlmreq; - if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) - return; + if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) + return; - dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ); + dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ); - /* Check if this is a resend case (MSG_RESENT is set on RPC) and a - * lock was found by ldlm_handle_enqueue(); if so @lh must be - * initialized. */ + /* if this is a resend case (MSG_RESENT is set on RPC) and a lock was + * found by ldlm_handle_enqueue(); if so @lh must be initialized. + */ if (flags & LDLM_FL_RESENT) { lh->mlh_reg_lh.cookie = new_lock->l_handle.h_cookie; lh->mlh_reg_mode = new_lock->l_granted_mode; @@ -3596,15 +4726,15 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info, * If the xid matches, then we know this is a resent request, and allow * it. (It's probably an OPEN, for which we don't send a lock. */ - if (req_can_reconstruct(req, NULL)) + if (req_can_reconstruct(req, NULL) != 0) return; - /* - * This remote handle isn't enqueued, so we never received or processed - * this request. Clear MSG_RESENT, because it can be handled like any - * normal request now. - */ - lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); + /* + * This remote handle isn't enqueued, so we never received or processed + * this request. Clear MSG_RESENT, because it can be handled like any + * normal request now. 
+ */ + lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle %#llx", dlmreq->lock_handle[0].cookie); @@ -3618,6 +4748,7 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct ldlm_reply *ldlm_rep = NULL; int rc; + ENTRY; /* @@ -3627,9 +4758,8 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, */ mdt_intent_fixup_resent(info, *lockp, lhc, flags); if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) { - mdt_lock_reg_init(lhc, (*lockp)->l_req_mode); rc = mdt_object_lock(info, info->mti_object, lhc, - MDS_INODELOCK_XATTR); + MDS_INODELOCK_XATTR, (*lockp)->l_req_mode); if (rc) return rc; } @@ -3640,15 +4770,17 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); if (ldlm_rep == NULL || - OBD_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) { + CFS_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) { mdt_object_unlock(info, info->mti_object, lhc, 1); - RETURN(err_serious(-EFAULT)); + if (is_serious(rc)) + RETURN(rc); + else + RETURN(err_serious(-EFAULT)); } ldlm_rep->lock_policy_res2 = clear_serious(rc); - /* This is left for interop instead of adding a new interop flag. - * LU-7433 */ + /* This is for interop instead of adding a new interop flag. 
LU-7433 */ #if LUSTRE_VERSION_CODE > OBD_OCD_VERSION(3, 0, 0, 0) if (ldlm_rep->lock_policy_res2) { mdt_object_unlock(info, info->mti_object, lhc, 1); @@ -3661,23 +4793,24 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc, } static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, - struct mdt_thread_info *info, - struct ldlm_lock **lockp, + struct mdt_thread_info *info, + struct ldlm_lock **lockp, __u64 flags) { - struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; - __u64 child_bits; - struct ldlm_reply *ldlm_rep; - struct mdt_body *reqbody; - struct mdt_body *repbody; - int rc, rc2; - ENTRY; + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; + __u64 child_bits; + struct ldlm_reply *ldlm_rep; + struct mdt_body *reqbody; + struct mdt_body *repbody; + int rc, rc2; - reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); - LASSERT(reqbody); + ENTRY; + + reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + LASSERT(reqbody); - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - LASSERT(repbody); + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + LASSERT(repbody); info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF); repbody->mbo_eadatasize = 0; @@ -3697,12 +4830,12 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, GOTO(out_shrink, rc = -EINVAL); } - rc = mdt_init_ucred_intent_getattr(info, reqbody); + rc = mdt_init_ucred(info, reqbody); if (rc) GOTO(out_shrink, rc); - ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); - mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); + ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); /* Get lock from request for possible resent case. 
*/ mdt_intent_fixup_resent(info, *lockp, lhc, flags); @@ -3710,24 +4843,27 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep); ldlm_rep->lock_policy_res2 = clear_serious(rc); - if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG)) - ldlm_rep->lock_policy_res2 = 0; - if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) || - ldlm_rep->lock_policy_res2) { - lhc->mlh_reg_lh.cookie = 0ull; - GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED); - } + if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG) && + ldlm_rep->lock_policy_res2 != -ENOKEY) + ldlm_rep->lock_policy_res2 = 0; + if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) || + ldlm_rep->lock_policy_res2) { + lhc->mlh_reg_lh.cookie = 0ull; + /* Return error code immediately to stop batched statahead. */ + GOTO(out_ucred, rc = info->mti_batch_env ? rc : + ELDLM_LOCK_ABORTED); + } rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc); - EXIT; + EXIT; out_ucred: - mdt_exit_ucred(info); + mdt_exit_ucred(info); out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; - return rc; + mdt_client_compatibility(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; + return rc; } static int mdt_intent_layout(enum ldlm_intent_flags it_opc, @@ -3735,13 +4871,16 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, struct ldlm_lock **lockp, __u64 flags) { - struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LAYOUT]; + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; struct md_layout_change layout = { .mlc_opc = MD_LAYOUT_NOP }; struct layout_intent *intent; + struct ldlm_reply *ldlm_rep; struct lu_fid *fid = &info->mti_tmp_fid2; struct mdt_object *obj = NULL; int layout_size = 0; + struct lu_buf *buf = &layout.mlc_buf; int rc = 0; + ENTRY; fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name); @@ -3750,14 +4889,16 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, if (intent 
== NULL) RETURN(-EPROTO); - CDEBUG(D_INFO, DFID "got layout change request from client: " - "opc:%u flags:%#x extent "DEXT"\n", - PFID(fid), intent->li_opc, intent->li_flags, - PEXT(&intent->li_extent)); + CDEBUG(D_INFO, DFID "got layout change request from client: opc:%u flags:%#x extent " + DEXT"\n", + PFID(fid), intent->lai_opc, intent->lai_flags, + PEXT(&intent->lai_extent)); - switch (intent->li_opc) { + switch (intent->lai_opc) { case LAYOUT_INTENT_TRUNC: case LAYOUT_INTENT_WRITE: + case LAYOUT_INTENT_PCCRO_SET: + case LAYOUT_INTENT_PCCRO_CLEAR: layout.mlc_opc = MD_LAYOUT_WRITE; layout.mlc_intent = intent; break; @@ -3768,25 +4909,17 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, case LAYOUT_INTENT_RELEASE: case LAYOUT_INTENT_RESTORE: CERROR("%s: Unsupported layout intent opc %d\n", - mdt_obd_name(info->mti_mdt), intent->li_opc); - rc = -ENOTSUPP; - break; + mdt_obd_name(info->mti_mdt), intent->lai_opc); + RETURN(-ENOTSUPP); default: CERROR("%s: Unknown layout intent opc %d\n", - mdt_obd_name(info->mti_mdt), intent->li_opc); - rc = -EINVAL; - break; + mdt_obd_name(info->mti_mdt), intent->lai_opc); + RETURN(-EINVAL); } - if (rc < 0) - RETURN(rc); - - /* Get lock from request for possible resent case. 
*/ - mdt_intent_fixup_resent(info, *lockp, lhc, flags); obj = mdt_object_find(info->mti_env, info->mti_mdt, fid); if (IS_ERR(obj)) - GOTO(out, rc = PTR_ERR(obj)); - + RETURN(PTR_ERR(obj)); if (mdt_object_exists(obj) && !mdt_object_remote(obj)) { /* if layout is going to be changed don't use the current EA @@ -3798,7 +4931,7 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, } else { layout_size = mdt_attr_get_eabuf_size(info, obj); if (layout_size < 0) - GOTO(out_obj, rc = layout_size); + GOTO(out, rc = layout_size); if (layout_size > info->mti_mdt->mdt_max_mdsize) info->mti_mdt->mdt_max_mdsize = layout_size; @@ -3811,72 +4944,68 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc, * set reply buffer size, so that ldlm_handle_enqueue0()-> * ldlm_lvbo_fill() will fill the reply buffer with lovea. */ - (*lockp)->l_lvb_type = LVB_T_LAYOUT; req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER, layout_size); rc = req_capsule_server_pack(info->mti_pill); if (rc) - GOTO(out_obj, rc); + GOTO(out, rc); + ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + if (!ldlm_rep) + GOTO(out, rc = -EPROTO); - if (layout.mlc_opc != MD_LAYOUT_NOP) { - struct lu_buf *buf = &layout.mlc_buf; + mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); - /** - * mdt_layout_change is a reint operation, when the request - * is resent, layout write shouldn't reprocess it again. - */ - rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc); - if (rc) - GOTO(out_obj, rc = rc < 0 ? 
rc : 0); + /* take lock in ldlm_lock_enqueue() for LAYOUT_INTENT_ACCESS */ + if (layout.mlc_opc == MD_LAYOUT_NOP) + GOTO(out, rc = 0); - /** - * There is another resent case: the client's job has been - * done by another client, referring lod_declare_layout_change - * -EALREADY case, and it became a operation w/o transaction, - * so we should not do the layout change, otherwise - * mdt_layout_change() will try to cancel the granted server - * CR lock whose remote counterpart is still in hold on the - * client, and a deadlock ensues. - */ - rc = mdt_check_resent_lock(info, obj, lhc); - if (rc <= 0) - GOTO(out_obj, rc); - - buf->lb_buf = NULL; - buf->lb_len = 0; - if (unlikely(req_is_replay(mdt_info_req(info)))) { - buf->lb_buf = req_capsule_client_get(info->mti_pill, - &RMF_EADATA); - buf->lb_len = req_capsule_get_size(info->mti_pill, - &RMF_EADATA, RCL_CLIENT); - /* - * If it's a replay of layout write intent RPC, the - * client has saved the extended lovea when - * it get reply then. - */ - if (buf->lb_len > 0) - mdt_fix_lov_magic(info, buf->lb_buf); - } + rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc); + if (rc < 0) + GOTO(out, rc); + if (rc == 1) { + DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt."); + rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg); + GOTO(out, rc); + } + + buf->lb_buf = NULL; + buf->lb_len = 0; + if (unlikely(req_is_replay(mdt_info_req(info)))) { + buf->lb_buf = req_capsule_client_get(info->mti_pill, + &RMF_EADATA); + buf->lb_len = req_capsule_get_size(info->mti_pill, + &RMF_EADATA, RCL_CLIENT); /* - * Instantiate some layout components, if @buf contains - * lovea, then it's a replay of the layout intent write - * RPC. + * If it's a replay of layout write intent RPC, the client has + * saved the extended lovea when it get reply then. 
*/ - rc = mdt_layout_change(info, obj, &layout); - if (rc) - GOTO(out_obj, rc); + if (buf->lb_len > 0) + mdt_fix_lov_magic(info, buf->lb_buf); } -out_obj: - mdt_object_put(info->mti_env, obj); - if (rc == 0 && lustre_handle_is_used(&lhc->mlh_reg_lh)) + /* Get lock from request for possible resent case. */ + mdt_intent_fixup_resent(info, *lockp, lhc, flags); + (*lockp)->l_lvb_type = LVB_T_LAYOUT; + + /* + * Instantiate some layout components, if @buf contains lovea, then it's + * a replay of the layout intent write RPC. + */ + rc = mdt_layout_change(info, obj, lhc, &layout); + ldlm_rep->lock_policy_res2 = clear_serious(rc); + + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc); + if (rc == ELDLM_LOCK_REPLACED && + (*lockp)->l_granted_mode == LCK_EX) + ldlm_lock_mode_downgrade(*lockp, LCK_CR); + } + EXIT; out: - lhc->mlh_reg_lh.cookie = 0; - - RETURN(rc); + mdt_object_put(info->mti_env, obj); + return rc; } static int mdt_intent_open(enum ldlm_intent_flags it_opc, @@ -3884,39 +5013,48 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc, struct ldlm_lock **lockp, __u64 flags) { - struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; - struct ldlm_reply *rep = NULL; - long opc; - int rc; + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT]; + struct ldlm_reply *rep = NULL; + long opc; + int rc; + struct ptlrpc_request *req = mdt_info_req(info); - static const struct req_format *intent_fmts[REINT_MAX] = { - [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE, - [REINT_OPEN] = &RQF_LDLM_INTENT_OPEN - }; + static const struct req_format *intent_fmts[REINT_MAX] = { + [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE, + [REINT_OPEN] = &RQF_LDLM_INTENT_OPEN + }; - ENTRY; + ENTRY; opc = mdt_reint_opcode(mdt_info_req(info), intent_fmts); - if (opc < 0) - RETURN(opc); + if (opc < 0) + RETURN(opc); /* Get lock from request for possible resent case. 
*/ mdt_intent_fixup_resent(info, *lockp, lhc, flags); - rc = mdt_reint_internal(info, lhc, opc); + rc = mdt_reint_internal(info, lhc, opc); + + if (rc < 0 && lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + DEBUG_REQ(D_ERROR, req, "Replay open failed with %d", rc); - /* Check whether the reply has been packed successfully. */ - if (mdt_info_req(info)->rq_repmsg != NULL) - rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); - if (rep == NULL) - RETURN(err_serious(-EFAULT)); + /* Check whether the reply has been packed successfully. */ + if (mdt_info_req(info)->rq_repmsg != NULL) + rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); + if (rep == NULL) { + if (is_serious(rc)) + RETURN(rc); + else + RETURN(err_serious(-EFAULT)); + } - /* MDC expects this in any case */ - if (rc != 0) - mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD); + /* MDC expects this in any case */ + if (rc != 0) + mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD); /* the open lock or the lock for cross-ref object should be - * returned to the client */ + * returned to the client + */ if (lustre_handle_is_used(&lhc->mlh_reg_lh) && (rc == 0 || rc == -MDT_EREMOTE_OPEN)) { rep->lock_policy_res2 = 0; @@ -3926,22 +5064,22 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc, rep->lock_policy_res2 = clear_serious(rc); - if (rep->lock_policy_res2 == -ENOENT && + if (rep->lock_policy_res2 == -ENOENT && mdt_get_disposition(rep, DISP_LOOKUP_NEG) && !mdt_get_disposition(rep, DISP_OPEN_CREATE)) rep->lock_policy_res2 = 0; lhc->mlh_reg_lh.cookie = 0ull; - if (rc == -ENOTCONN || rc == -ENODEV || - rc == -EOVERFLOW) { /**< if VBR failure then return error */ - /* - * If it is the disconnect error (ENODEV & ENOCONN), the error - * will be returned by rq_status, and client at ptlrpc layer - * will detect this, then disconnect, reconnect the import - * immediately, instead of impacting the following the rpc. 
- */ - RETURN(rc); - } + if (rc == -ENOTCONN || rc == -ENODEV || + rc == -EOVERFLOW) { /**< if VBR failure then return error */ + /* + * If it is the disconnect error (ENODEV & ENOCONN), the error + * will be returned by rq_status, and client at ptlrpc layer + * will detect this, then disconnect, reconnect the import + * immediately, instead of impacting the following the rpc. + */ + RETURN(rc); + } /* * For other cases, the error will be returned by intent, and client * will retrieve the result from intent. @@ -3963,14 +5101,17 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, u64); enum tgt_handler_flags it_handler_flags = 0; struct ldlm_reply *rep; + bool check_mdt_object = false; int rc; + ENTRY; switch (it_opc) { case IT_OPEN: + case IT_CREAT: case IT_OPEN|IT_CREAT: /* - * OCREAT is not a MUTABOR request since the file may + * OCREAT is not a IS_MUTABLE request since the file may * already exist. We do the extra check of * OBD_CONNECT_RDONLY in mdt_reint_open() when we * really need to create the object. 
@@ -3979,15 +5120,18 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, it_handler = &mdt_intent_open; break; case IT_GETATTR: + check_mdt_object = true; + fallthrough; case IT_LOOKUP: it_format = &RQF_LDLM_INTENT_GETATTR; it_handler = &mdt_intent_getattr; - it_handler_flags = HABEO_REFERO; + it_handler_flags = HAS_REPLY; break; case IT_GETXATTR: + check_mdt_object = true; it_format = &RQF_LDLM_INTENT_GETXATTR; it_handler = &mdt_intent_getxattr; - it_handler_flags = HABEO_CORPUS; + it_handler_flags = HAS_BODY; break; case IT_LAYOUT: it_format = &RQF_LDLM_INTENT_LAYOUT; @@ -4024,22 +5168,26 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, RETURN(-EPROTO); } - req_capsule_extend(pill, it_format); + if (!info->mti_batch_env) + req_capsule_extend(pill, it_format); rc = mdt_unpack_req_pack_rep(info, it_handler_flags); if (rc < 0) RETURN(rc); - if (it_handler_flags & MUTABOR && mdt_rdonly(req->rq_export)) + if (unlikely(info->mti_object == NULL && check_mdt_object)) + RETURN(-EPROTO); + + if (it_handler_flags & IS_MUTABLE && mdt_rdonly(req->rq_export)) RETURN(-EROFS); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); + CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10); /* execute policy */ rc = (*it_handler)(it_opc, info, lockp, flags); /* Check whether the reply has been packed successfully. 
*/ - if (req->rq_repmsg != NULL) { + if (info->mti_batch_env || req->rq_repmsg != NULL) { rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); rep->lock_policy_res2 = ptlrpc_status_hton(rep->lock_policy_res2); @@ -4081,17 +5229,40 @@ static int mdt_intent_policy(const struct lu_env *env, tsi = tgt_ses_info(env); - info = tsi2mdt_info(tsi); + info = mdt_th_info(env); LASSERT(info != NULL); - pill = info->mti_pill; + + /* Check whether it is a sub request processing in a batch request */ + if (info->mti_batch_env) { + pill = info->mti_pill; + LASSERT(pill == &info->mti_sub_pill); + } else { + info = tsi2mdt_info(tsi); + pill = info->mti_pill; + } + LASSERT(pill->rc_req == req); ldesc = &info->mti_dlm_req->lock_desc; - if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) { - req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC); + if (info->mti_batch_env || + req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) { + /* + * For batch processing environment, the request format has + * already been set. + */ + if (!info->mti_batch_env) + req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC); + it = req_capsule_client_get(pill, &RMF_LDLM_INTENT); if (it != NULL) { mdt_ptlrpc_stats_update(req, it->opc); + info->mti_intent_lock = 1; + /* + * For intent lock request with policy, the ELC locks + * have been cancelled in ldlm_handle_enqueue0(). + * Thus set @mti_dlm_req with null here. + */ + info->mti_dlm_req = NULL; rc = mdt_intent_opc(it->opc, info, lockp, flags); if (rc == 0) rc = ELDLM_OK; @@ -4100,12 +5271,30 @@ static int mdt_intent_policy(const struct lu_env *env, * later in ldlm. Let's check it now to see if we have * ibits corrupted somewhere in mdt_intent_opc(). * The case for client miss to set ibits has been - * processed by others. */ + * processed by others. 
+ */ LASSERT(ergo(ldesc->l_resource.lr_type == LDLM_IBITS, ldesc->l_policy_data.l_inodebits.bits != 0)); } else { rc = err_serious(-EFAULT); } + } else if (ldesc->l_resource.lr_type == LDLM_IBITS && + ldesc->l_policy_data.l_inodebits.bits == MDS_INODELOCK_DOM) { + struct ldlm_reply *rep; + + /* No intent was provided but INTENT flag is set along with + * DOM bit, this is considered as GLIMPSE request. + * This logic is common for MDT and OST glimpse + */ + mdt_ptlrpc_stats_update(req, IT_GLIMPSE); + rc = mdt_glimpse_enqueue(info, ns, lockp, flags); + /* Check whether the reply has been packed successfully. */ + if (req->rq_repmsg != NULL) { + rep = req_capsule_server_get(info->mti_pill, + &RMF_DLM_REP); + rep->lock_policy_res2 = + ptlrpc_status_hton(rep->lock_policy_res2); + } } else { /* No intent was provided */ req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0); @@ -4113,7 +5302,9 @@ static int mdt_intent_policy(const struct lu_env *env, if (rc) rc = err_serious(rc); } - mdt_thread_info_fini(info); + + if (!info->mti_batch_env) + mdt_thread_info_fini(info); RETURN(rc); } @@ -4165,6 +5356,7 @@ static int mdt_register_lwp_callback(void *data) struct mdt_device *mdt = data; struct lu_server_fld *fld = mdt_seq_site(mdt)->ss_server_fld; int rc; + ENTRY; LASSERT(mdt_seq_site(mdt)->ss_node_id != 0); @@ -4176,7 +5368,8 @@ static int mdt_register_lwp_callback(void *data) } /* Allocate new sequence now to avoid creating local transaction - * in the normal transaction process */ + * in the normal transaction process + */ rc = seq_server_check_and_alloc_super(&env, mdt_seq_site(mdt)->ss_server_seq); if (rc < 0) @@ -4238,13 +5431,14 @@ out_free: */ static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt) { - struct seq_server_site *ss = mdt_seq_site(mdt); - int rc; - char *prefix; + struct seq_server_site *ss = mdt_seq_site(mdt); + char *prefix; + ENTRY; /* check if this is adding the first MDC and controller is not yet - * initialized. 
*/ + * initialized. + */ OBD_ALLOC_PTR(ss->ss_client_seq); if (ss->ss_client_seq == NULL) RETURN(-ENOMEM); @@ -4258,25 +5452,19 @@ static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt) /* Note: seq_client_fini will be called in seq_site_fini */ snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt)); - rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA, - prefix, ss->ss_node_id == 0 ? ss->ss_control_seq : + seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA, + prefix, ss->ss_node_id == 0 ? ss->ss_control_seq : NULL); OBD_FREE(prefix, MAX_OBD_NAME + 5); - if (rc != 0) { - OBD_FREE_PTR(ss->ss_client_seq); - ss->ss_client_seq = NULL; - RETURN(rc); - } - - rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq); - RETURN(rc); + RETURN(seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq)); } static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt) { struct seq_server_site *ss; int rc; + ENTRY; ss = mdt_seq_site(mdt); @@ -4324,9 +5512,10 @@ out_seq_fini: * FLD wrappers */ static int mdt_fld_fini(const struct lu_env *env, - struct mdt_device *m) + struct mdt_device *m) { struct seq_server_site *ss = mdt_seq_site(m); + ENTRY; if (ss && ss->ss_server_fld) { @@ -4339,11 +5528,12 @@ static int mdt_fld_fini(const struct lu_env *env, } static int mdt_fld_init(const struct lu_env *env, - const char *uuid, - struct mdt_device *m) + const char *uuid, + struct mdt_device *m) { struct seq_server_site *ss; int rc; + ENTRY; ss = mdt_seq_site(m); @@ -4369,6 +5559,7 @@ static void mdt_stack_pre_fini(const struct lu_env *env, struct lustre_cfg_bufs *bufs; struct lustre_cfg *lcfg; struct mdt_thread_info *info; + ENTRY; LASSERT(top); @@ -4385,7 +5576,8 @@ static void mdt_stack_pre_fini(const struct lu_env *env, /* XXX: this is needed because all layers are referenced by * objects (some of them are pinned by osd, for example * * the proper solution should be a model where object used - * by osd 
only doesn't have mdt/mdd slices -bzzz */ + * by osd only doesn't have mdt/mdd slices -bzzz + */ lustre_cfg_bufs_reset(bufs, mdt_obd_name(m)); lustre_cfg_bufs_set_string(bufs, 1, NULL); OBD_ALLOC(lcfg, lustre_cfg_len(bufs->lcfg_bufcount, bufs->lcfg_buflen)); @@ -4406,6 +5598,7 @@ static void mdt_stack_fini(const struct lu_env *env, struct lustre_cfg *lcfg; struct mdt_thread_info *info; char flags[3] = ""; + ENTRY; info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); @@ -4451,6 +5644,7 @@ static int mdt_connect_to_next(const struct lu_env *env, struct mdt_device *m, struct obd_connect_data *data = NULL; struct obd_device *obd; int rc; + ENTRY; OBD_ALLOC_PTR(data); @@ -4491,7 +5685,8 @@ static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt, struct obd_device *obd; struct lustre_profile *lprof; struct lu_site *site; - ENTRY; + + ENTRY; /* in 1.8 we had the only device in the stack - MDS. * 2.0 introduces MDT, MDD, OSD; MDT starts others internally. @@ -4517,7 +5712,8 @@ static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt, * #02 (160)setup 0:lustre-MDT0000 1:lustre-MDT0000_UUID 2:0 * 3:lustre-MDD0000 4:f * - * notice we build the stack from down to top: MDD first, then MDT */ + * notice we build the stack from down to top: MDD first, then MDT + */ name_size = MAX_OBD_NAME; uuid_size = MAX_OBD_NAME; @@ -4639,6 +5835,7 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt, struct lustre_profile *lprof; struct obd_connect_data *data; int rc; + ENTRY; LASSERT(mdt->mdt_qmt_exp == NULL); @@ -4655,7 +5852,8 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt, * We generate the QMT name from the MDT one, just replacing MD with QM * after all the preparations, the logical equivalent will be: * #01 (160)setup 0:lustre-QMT0000 1:lustre-QMT0000_UUID 2:0 - * 3:lustre-MDT0000-osd 4:f */ + * 3:lustre-MDT0000-osd 4:f + */ OBD_ALLOC(qmtname, MAX_OBD_NAME); OBD_ALLOC(uuid, UUID_MAX); 
OBD_ALLOC_PTR(bufs); @@ -4723,9 +5921,12 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt, mdt->mdt_qmt_dev = obd->obd_lu_dev; /* configure local quota objects */ - rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env, - &mdt->mdt_lu_dev, - mdt->mdt_qmt_dev); + if (CFS_FAIL_CHECK(OBD_FAIL_QUOTA_INIT)) + rc = -EBADF; + else + rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env, + &mdt->mdt_lu_dev, + mdt->mdt_qmt_dev); if (rc) GOTO(class_cleanup, rc); @@ -4745,6 +5946,7 @@ class_cleanup: if (rc) { class_manual_cleanup(obd); mdt->mdt_qmt_dev = NULL; + GOTO(lcfg_cleanup, rc); } class_detach: if (rc) @@ -4783,18 +5985,32 @@ static void mdt_quota_fini(const struct lu_env *env, struct mdt_device *mdt) /* mdt_getxattr() is used from mdt_intent_getxattr(), use this wrapper * for now. This will be removed along with converting rest of MDT code - * to use tgt_session_info */ + * to use tgt_session_info + */ static int mdt_tgt_getxattr(struct tgt_session_info *tsi) { struct mdt_thread_info *info = tsi2mdt_info(tsi); int rc; + if (unlikely(info->mti_object == NULL)) + return -EPROTO; + rc = mdt_getxattr(info); mdt_thread_info_fini(info); return rc; } +static int mdt_llog_open(struct tgt_session_info *tsi) +{ + ENTRY; + + if (!mdt_changelog_allow(tsi2mdt_info(tsi))) + RETURN(err_serious(-EACCES)); + + RETURN(tgt_llog_open(tsi)); +} + #define OBD_FAIL_OST_READ_NET OBD_FAIL_OST_BRW_NET #define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET #define OST_BRW_READ OST_READ @@ -4808,57 +6024,72 @@ TGT_RPC_HANDLER(MDS_FIRST_OPC, 0, MDS_DISCONNECT, tgt_disconnect, &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION), TGT_RPC_HANDLER(MDS_FIRST_OPC, - HABEO_REFERO, MDS_SET_INFO, mdt_set_info, - &RQF_OBD_SET_INFO, LUSTRE_MDS_VERSION), + HAS_REPLY, MDS_SET_INFO, mdt_set_info, + &RQF_MDT_SET_INFO, LUSTRE_MDS_VERSION), TGT_MDT_HDL(0, MDS_GET_INFO, mdt_get_info), -TGT_MDT_HDL(0 | HABEO_REFERO, MDS_GET_ROOT, mdt_get_root), -TGT_MDT_HDL(HABEO_CORPUS, MDS_GETATTR, mdt_getattr), 
-TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME, +TGT_MDT_HDL(HAS_REPLY, MDS_GET_ROOT, mdt_get_root), +TGT_MDT_HDL(HAS_BODY, MDS_GETATTR, mdt_getattr), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_GETATTR_NAME, mdt_getattr_name), -TGT_MDT_HDL(HABEO_CORPUS, MDS_GETXATTR, mdt_tgt_getxattr), -TGT_MDT_HDL(0 | HABEO_REFERO, MDS_STATFS, mdt_statfs), -TGT_MDT_HDL(0 | MUTABOR, MDS_REINT, mdt_reint), -TGT_MDT_HDL(HABEO_CORPUS, MDS_CLOSE, mdt_close), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_READPAGE, mdt_readpage), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_SYNC, mdt_sync), +TGT_MDT_HDL(HAS_BODY, MDS_GETXATTR, mdt_tgt_getxattr), +TGT_MDT_HDL(HAS_REPLY, MDS_STATFS, mdt_statfs), +TGT_MDT_HDL(IS_MUTABLE, MDS_REINT, mdt_reint), +TGT_MDT_HDL(HAS_BODY, MDS_CLOSE, mdt_close), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_READPAGE, mdt_readpage), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_SYNC, mdt_sync), TGT_MDT_HDL(0, MDS_QUOTACTL, mdt_quotactl), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_PROGRESS, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_PROGRESS, mdt_hsm_progress), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_CT_REGISTER, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_CT_REGISTER, mdt_hsm_ct_register), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_CT_UNREGISTER, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_CT_UNREGISTER, mdt_hsm_ct_unregister), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_GET, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_STATE_GET, mdt_hsm_state_get), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_STATE_SET, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_HSM_STATE_SET, mdt_hsm_state_set), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action), -TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_REQUEST, +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_ACTION, mdt_hsm_action), +TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_REQUEST, mdt_hsm_request), 
-TGT_MDT_HDL(HABEO_CLAVIS | HABEO_CORPUS | HABEO_REFERO | MUTABOR, +TGT_MDT_HDL(HAS_KEY | HAS_BODY | HAS_REPLY | IS_MUTABLE, MDS_SWAP_LAYOUTS, mdt_swap_layouts), +TGT_MDT_HDL(IS_MUTABLE, MDS_RMFID, mdt_rmfid), +TGT_MDT_HDL(IS_MUTABLE, MDS_BATCH, mdt_batch), }; static struct tgt_handler mdt_io_ops[] = { -TGT_OST_HDL_HP(HABEO_CORPUS | HABEO_REFERO, OST_BRW_READ, tgt_brw_read, +TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY, OST_BRW_READ, tgt_brw_read, mdt_hp_brw), -TGT_OST_HDL_HP(HABEO_CORPUS | MUTABOR, OST_BRW_WRITE, tgt_brw_write, +TGT_OST_HDL_HP(HAS_BODY | IS_MUTABLE, OST_BRW_WRITE, tgt_brw_write, mdt_hp_brw), -TGT_OST_HDL_HP(HABEO_CORPUS | HABEO_REFERO | MUTABOR, +TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_PUNCH, mdt_punch_hdl, - mdt_hp_punch), -TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_SYNC, mdt_data_sync), + mdt_hp_punch), +TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC, mdt_data_sync), +TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_FALLOCATE, + mdt_fallocate_hdl), +TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SEEK, tgt_lseek), +TGT_RPC_HANDLER(OST_FIRST_OPC, + 0, OST_SET_INFO, mdt_io_set_info, + &RQF_OBD_SET_INFO, LUSTRE_OST_VERSION), }; static struct tgt_handler mdt_sec_ctx_ops[] = { TGT_SEC_HDL_VAR(0, SEC_CTX_INIT, mdt_sec_ctx_handle), -TGT_SEC_HDL_VAR(0, SEC_CTX_INIT_CONT,mdt_sec_ctx_handle), +TGT_SEC_HDL_VAR(0, SEC_CTX_INIT_CONT, mdt_sec_ctx_handle), TGT_SEC_HDL_VAR(0, SEC_CTX_FINI, mdt_sec_ctx_handle) }; static struct tgt_handler mdt_quota_ops[] = { -TGT_QUOTA_HDL(HABEO_REFERO, QUOTA_DQACQ, mdt_quota_dqacq), +TGT_QUOTA_HDL(HAS_REPLY, QUOTA_DQACQ, mdt_quota_dqacq), +}; + +static struct tgt_handler mdt_llog_handlers[] = { + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CREATE, mdt_llog_open), + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_NEXT_BLOCK, tgt_llog_next_block), + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header), + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_PREV_BLOCK, tgt_llog_prev_block), }; static struct tgt_opc_slice mdt_common_slice[] = { @@ -4905,7 
+6136,7 @@ static struct tgt_opc_slice mdt_common_slice[] = { { .tos_opc_start = LLOG_FIRST_OPC, .tos_opc_end = LLOG_LAST_OPC, - .tos_hs = tgt_llog_handlers + .tos_hs = mdt_llog_handlers }, { .tos_opc_start = LFSCK_FIRST_OPC, @@ -4935,11 +6166,13 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop); mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child)); + + mdt_restriper_stop(m); ping_evictor_stop(); /* Remove the HSM /proc entry so the coordinator cannot be - * restarted by a user while it's shutting down. */ - hsm_cdt_procfs_fini(m); + * restarted by a user while it's shutting down. + */ mdt_hsm_cdt_stop(m); mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT); @@ -4959,14 +6192,12 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) /* Calling the cleanup functions in the same order as in the mdt_init0 * error path */ - mdt_procfs_fini(m); + mdt_tunables_fini(m); target_recovery_fini(obd); upcall_cache_cleanup(m->mdt_identity_cache); m->mdt_identity_cache = NULL; - mdt_fs_cleanup(env, m); - tgt_fini(env, &m->mdt_lut); mdt_hsm_cdt_fini(m); @@ -5023,6 +6254,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, int rc; long node_id; mntopt_t mntopts; + ENTRY; lu_device_init(&m->mdt_lu_dev, ldt); @@ -5045,7 +6277,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, LASSERT(obd != NULL); m->mdt_max_mdsize = MAX_MD_SIZE_OLD; - m->mdt_opts.mo_evict_tgt_nids = 1; + m->mdt_evict_tgt_nids = 1; m->mdt_opts.mo_cos = MDT_COS_DEFAULT; lmi = server_get_mount(dev); @@ -5054,36 +6286,58 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, RETURN(-EFAULT); } else { lsi = s2lsi(lmi->lmi_sb); + LASSERT(lsi->lsi_lmd); /* CMD is supported only in IAM mode */ LASSERT(num); - node_id = simple_strtol(num, NULL, 10); - obd->u.obt.obt_magic = OBT_MAGIC; - if (lsi->lsi_lmd != NULL && - lsi->lsi_lmd->lmd_flags & 
LMD_FLG_SKIP_LFSCK) + rc = kstrtol(num, 10, &node_id); + if (rc) + RETURN(rc); + + obd_obt_init(obd); + if (test_bit(LMD_FLG_SKIP_LFSCK, lsi->lsi_lmd->lmd_flags)) m->mdt_skip_lfsck = 1; + if (test_bit(LMD_FLG_NO_CREATE, lsi->lsi_lmd->lmd_flags)) + m->mdt_lut.lut_no_create = 1; } - /* DoM files get IO lock at open by default */ - m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN; + /* Just try to get a DoM lock by default. Otherwise, having a group + * lock granted, it may get blocked for a long time. + */ + m->mdt_opts.mo_dom_lock = TRYLOCK_DOM_ON_OPEN; /* DoM files are read at open and data is packed in the reply */ - m->mdt_opts.mo_dom_read_open = 1; + m->mdt_dom_read_open = 1; m->mdt_squash.rsi_uid = 0; m->mdt_squash.rsi_gid = 0; INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids); - init_rwsem(&m->mdt_squash.rsi_sem); + spin_lock_init(&m->mdt_squash.rsi_lock); spin_lock_init(&m->mdt_lock); - m->mdt_enable_remote_dir = 0; + m->mdt_enable_chprojid_gid = 0; + m->mdt_enable_dir_migration = 1; + m->mdt_enable_dir_restripe = 0; + m->mdt_enable_dir_auto_split = 0; + m->mdt_enable_parallel_rename_dir = 1; + m->mdt_enable_parallel_rename_file = 1; + m->mdt_enable_parallel_rename_crossdir = 1; + m->mdt_enable_remote_dir = 1; m->mdt_enable_remote_dir_gid = 0; + m->mdt_enable_remote_rename = 1; + m->mdt_enable_striped_dir = 1; + m->mdt_enable_dmv_implicit_inherit = 1; + m->mdt_dir_restripe_nsonly = 1; + m->mdt_max_mod_rpcs_in_flight = OBD_MAX_RIF_DEFAULT; atomic_set(&m->mdt_mds_mds_conns, 0); atomic_set(&m->mdt_async_commit_count, 0); + atomic_set(&m->mdt_dmv_old_client_count, 0); m->mdt_lu_dev.ld_ops = &mdt_lu_ops; m->mdt_lu_dev.ld_obd = obd; /* Set this lu_device to obd for error handling purposes. 
*/ obd->obd_lu_dev = &m->mdt_lu_dev; + strncpy(m->mdt_job_xattr, XATTR_NAME_JOB_DEFAULT, XATTR_JOB_MAX_LEN); + /* init the stack */ rc = mdt_stack_init((struct lu_env *)env, m, cfg); if (rc) { @@ -5103,13 +6357,14 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, /* failover is the default * FIXME: we do not failout mds0/mgs, which may cause some problems. * assumed whose ss_node_id == 0 XXX - * */ + */ obd->obd_replayable = 1; /* No connection accepted until configurations will finish */ obd->obd_no_conn = 1; if (cfg->lcfg_bufcount > 4 && LUSTRE_CFG_BUFLEN(cfg, 4) > 0) { char *str = lustre_cfg_string(cfg, 4); + if (strchr(str, 'n')) { CWARN("%s: recovery disabled\n", mdt_obd_name(m)); obd->obd_replayable = 0; @@ -5130,8 +6385,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, LDLM_NAMESPACE_SERVER, LDLM_NAMESPACE_GREEDY, LDLM_NS_TYPE_MDT); - if (m->mdt_namespace == NULL) - GOTO(err_fini_seq, rc = -ENOMEM); + if (IS_ERR(m->mdt_namespace)) { + rc = PTR_ERR(m->mdt_namespace); + CERROR("%s: unable to create server namespace: rc = %d\n", + obd->obd_name, rc); + m->mdt_namespace = NULL; + GOTO(err_fini_seq, rc); + } m->mdt_namespace->ns_lvbp = m; m->mdt_namespace->ns_lvbo = &mdt_lvbo; @@ -5147,24 +6407,24 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, GOTO(err_free_ns, rc); /* Amount of available space excluded from granting and reserved - * for metadata. It is in percentage and 50% is default value. */ - tgd->tgd_reserved_pcnt = 50; + * for metadata. It is a percentage of the total MDT size. 
+ */ + tgd->tgd_reserved_pcnt = 10; if (ONE_MB_BRW_SIZE < (1U << tgd->tgd_blockbits)) m->mdt_brw_size = 1U << tgd->tgd_blockbits; else m->mdt_brw_size = ONE_MB_BRW_SIZE; - rc = mdt_fs_setup(env, m, obd, lsi); - if (rc) - GOTO(err_tgt, rc); + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP)) + GOTO(err_tgt, rc = -ENOENT); fid.f_seq = FID_SEQ_LOCAL_NAME; fid.f_oid = 1; fid.f_ver = 0; rc = local_oid_storage_init(env, m->mdt_bottom, &fid, &m->mdt_los); if (rc != 0) - GOTO(err_fs_cleanup, rc); + GOTO(err_tgt, rc); rc = mdt_hsm_cdt_init(m); if (rc != 0) { @@ -5192,13 +6452,20 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, else m->mdt_opts.mo_acl = 0; + m->mdt_enable_strict_som = 1; + /* XXX: to support suppgid for ACL, we enable identity_upcall - * by default, otherwise, maybe got unexpected -EACCESS. */ + * by default, otherwise, maybe got unexpected -EACCESS. + */ if (m->mdt_opts.mo_acl) identity_upcall = MDT_IDENTITY_UPCALL_PATH; m->mdt_identity_cache = upcall_cache_init(mdt_obd_name(m), identity_upcall, + UC_IDCACHE_HASH_SIZE, + 1200, /* entry expire: 20 mn */ + 30, /* acquire expire: 30 s */ + true, /* acquire can replay */ &mdt_identity_upcall_cache_ops); if (IS_ERR(m->mdt_identity_cache)) { rc = PTR_ERR(m->mdt_identity_cache); @@ -5206,7 +6473,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, GOTO(err_free_hsm, rc); } - rc = mdt_procfs_init(m, dev); + rc = mdt_tunables_init(m, dev); if (rc) { CERROR("Can't init MDT lprocfs, rc %d\n", rc); GOTO(err_recovery, rc); @@ -5222,21 +6489,31 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, ping_evictor_start(); - /* recovery will be started upon mdt_prepare() - * when the whole stack is complete and ready - * to serve the requests */ + /* recovery will be started upon mdt_prepare() when the whole stack is + * complete and ready to serve the requests + */ /* Reduce the initial timeout on an MDS because it doesn't need such * a long timeout as an OST does. 
Adaptive timeouts will adjust this - * value appropriately. */ + * value appropriately. + */ if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT) ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT; + if (test_bit(LMD_FLG_LOCAL_RECOV, lsi->lsi_lmd->lmd_flags)) + m->mdt_lut.lut_local_recovery = 1; + + rc = mdt_restriper_start(m); + if (rc) + GOTO(err_ping_evictor, rc); + RETURN(0); + +err_ping_evictor: + ping_evictor_stop(); err_procfs: - mdt_procfs_fini(m); + mdt_tunables_fini(m); err_recovery: - target_recovery_fini(obd); upcall_cache_cleanup(m->mdt_identity_cache); m->mdt_identity_cache = NULL; err_free_hsm: @@ -5244,9 +6521,12 @@ err_free_hsm: err_los_fini: local_oid_storage_fini(env, m->mdt_los); m->mdt_los = NULL; -err_fs_cleanup: - mdt_fs_cleanup(env, m); err_tgt: + /* keep recoverable clients */ + obd->obd_fail = 1; + target_recovery_fini(obd); + obd_exports_barrier(obd); + obd_zombie_barrier(); tgt_fini(env, &m->mdt_lut); err_free_ns: ldlm_namespace_free(m->mdt_namespace, NULL, 0); @@ -5260,12 +6540,13 @@ err_fini_stack: err_lmi: if (lmi) server_put_mount(dev, true); - return(rc); + return rc; } /* For interoperability, the left element is old parameter, the right one * is the new version of the parameter, if some parameter is deprecated, - * the new version should be set as NULL. */ + * the new version should be set as NULL. 
+ */ static struct cfg_interop_param mdt_interop_param[] = { { "mdt.group_upcall", NULL }, { "mdt.quota_type", NULL }, @@ -5278,22 +6559,23 @@ static struct cfg_interop_param mdt_interop_param[] = { /* used by MGS to process specific configurations */ static int mdt_process_config(const struct lu_env *env, - struct lu_device *d, struct lustre_cfg *cfg) + struct lu_device *d, struct lustre_cfg *cfg) { - struct mdt_device *m = mdt_dev(d); - struct md_device *md_next = m->mdt_child; - struct lu_device *next = md2lu_dev(md_next); - int rc; - ENTRY; + struct mdt_device *m = mdt_dev(d); + struct md_device *md_next = m->mdt_child; + struct lu_device *next = md2lu_dev(md_next); + int rc; + + ENTRY; switch (cfg->lcfg_command) { case LCFG_PARAM: { - struct obd_device *obd = d->ld_obd; - + struct obd_device *obd = d->ld_obd; /* For interoperability */ - struct cfg_interop_param *ptr = NULL; - struct lustre_cfg *old_cfg = NULL; - char *param = NULL; + struct cfg_interop_param *ptr = NULL; + struct lustre_cfg *old_cfg = NULL; + char *param = NULL; + ssize_t count; param = lustre_cfg_string(cfg, 1); if (param == NULL) { @@ -5306,8 +6588,8 @@ static int mdt_process_config(const struct lu_env *env, if (ptr != NULL) { if (ptr->new_param == NULL) { rc = 0; - CWARN("For interoperability, skip this %s." - " It is obsolete.\n", ptr->old_param); + CWARN("For interoperability, skip this %s. It is obsolete.\n", + ptr->old_param); break; } @@ -5322,17 +6604,22 @@ static int mdt_process_config(const struct lu_env *env, } } - rc = class_process_proc_param(PARAM_MDT, obd->obd_vars, - cfg, obd); - if (rc > 0 || rc == -ENOSYS) { + count = class_modify_config(cfg, PARAM_MDT, + &obd->obd_kset.kobj); + if (count < 0) { + struct coordinator *cdt = &m->mdt_coordinator; + /* is it an HSM var ? 
*/ - rc = class_process_proc_param(PARAM_HSM, - hsm_cdt_get_proc_vars(), - cfg, obd); - if (rc > 0 || rc == -ENOSYS) + count = class_modify_config(cfg, PARAM_HSM, + &cdt->cdt_hsm_kobj); + if (count < 0) /* we don't understand; pass it on */ rc = next->ld_ops->ldo_process_config(env, next, cfg); + else + rc = count > 0 ? 0 : count; + } else { + rc = count > 0 ? 0 : count; } if (old_cfg) @@ -5340,12 +6627,12 @@ static int mdt_process_config(const struct lu_env *env, cfg->lcfg_buflens)); break; } - default: - /* others are passed further */ - rc = next->ld_ops->ldo_process_config(env, next, cfg); - break; - } - RETURN(rc); + default: + /* others are passed further */ + rc = next->ld_ops->ldo_process_config(env, next, cfg); + break; + } + RETURN(rc); } static struct lu_object *mdt_object_alloc(const struct lu_env *env, @@ -5373,49 +6660,65 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, init_rwsem(&mo->mot_dom_sem); init_rwsem(&mo->mot_open_sem); atomic_set(&mo->mot_open_count, 0); + mo->mot_restripe_offset = 0; + INIT_LIST_HEAD(&mo->mot_restripe_linkage); + mo->mot_lsom_size = 0; + mo->mot_lsom_blocks = 0; + mo->mot_lsom_inited = false; RETURN(o); } RETURN(NULL); } static int mdt_object_init(const struct lu_env *env, struct lu_object *o, - const struct lu_object_conf *unused) + const struct lu_object_conf *unused) { - struct mdt_device *d = mdt_dev(o->lo_dev); - struct lu_device *under; - struct lu_object *below; - int rc = 0; - ENTRY; + struct mdt_device *d = mdt_dev(o->lo_dev); + struct lu_device *under; + struct lu_object *below; + int rc = 0; + + ENTRY; - CDEBUG(D_INFO, "object init, fid = "DFID"\n", - PFID(lu_object_fid(o))); + CDEBUG(D_INFO, "object init, fid = "DFID"\n", + PFID(lu_object_fid(o))); + + under = &d->mdt_child->md_lu_dev; + below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under); + if (below != NULL) + lu_object_add(o, below); + else + rc = -ENOMEM; + + RETURN(rc); +} - under = &d->mdt_child->md_lu_dev; - below = 
under->ld_ops->ldo_object_alloc(env, o->lo_header, under); - if (below != NULL) { - lu_object_add(o, below); - } else - rc = -ENOMEM; +static void mdt_object_free_rcu(struct rcu_head *head) +{ + struct mdt_object *mo = container_of(head, struct mdt_object, + mot_header.loh_rcu); - RETURN(rc); + kmem_cache_free(mdt_object_kmem, mo); } static void mdt_object_free(const struct lu_env *env, struct lu_object *o) { - struct mdt_object *mo = mdt_obj(o); - struct lu_object_header *h; - ENTRY; + struct mdt_object *mo = mdt_obj(o); + struct lu_object_header *h; + + ENTRY; - h = o->lo_header; - CDEBUG(D_INFO, "object free, fid = "DFID"\n", - PFID(lu_object_fid(o))); + h = o->lo_header; + CDEBUG(D_INFO, "object free, fid = "DFID"\n", + PFID(lu_object_fid(o))); LASSERT(atomic_read(&mo->mot_open_count) == 0); LASSERT(atomic_read(&mo->mot_lease_count) == 0); lu_object_fini(o); lu_object_header_fini(h); - OBD_SLAB_FREE_PTR(mo, mdt_object_kmem); + OBD_FREE_PRE(mo, sizeof(*mo), "slab-freed"); + call_rcu(&mo->mot_header.loh_rcu, mdt_object_free_rcu); EXIT; } @@ -5459,7 +6762,8 @@ static int mdt_prepare(const struct lu_env *env, rc = lfsck_register_namespace(env, mdt->mdt_bottom, mdt->mdt_namespace); /* The LFSCK instance is registered just now, so it must be there when - * register the namespace to such instance. */ + * register the namespace to such instance. 
+ */ LASSERTF(rc == 0, "register namespace failed: rc = %d\n", rc); if (mdt->mdt_seq_site.ss_node_id == 0) { @@ -5485,22 +6789,22 @@ static int mdt_prepare(const struct lu_env *env, } const struct lu_device_operations mdt_lu_ops = { - .ldo_object_alloc = mdt_object_alloc, - .ldo_process_config = mdt_process_config, + .ldo_object_alloc = mdt_object_alloc, + .ldo_process_config = mdt_process_config, .ldo_prepare = mdt_prepare, }; static const struct lu_object_operations mdt_obj_ops = { - .loo_object_init = mdt_object_init, - .loo_object_free = mdt_object_free, - .loo_object_print = mdt_object_print + .loo_object_init = mdt_object_init, + .loo_object_free = mdt_object_free, + .loo_object_print = mdt_object_print }; static int mdt_obd_set_info_async(const struct lu_env *env, - struct obd_export *exp, - __u32 keylen, void *key, - __u32 vallen, void *val, - struct ptlrpc_request_set *set) + struct obd_export *exp, + __u32 keylen, void *key, + __u32 vallen, void *val, + struct ptlrpc_request_set *set) { int rc; @@ -5514,6 +6818,18 @@ static int mdt_obd_set_info_async(const struct lu_env *env, RETURN(0); } +static inline void mdt_enable_slc(struct mdt_device *mdt) +{ + if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER) + mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING; +} + +static inline void mdt_disable_slc(struct mdt_device *mdt) +{ + if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING) + mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER; +} + /** * Match client and server connection feature flags. 
* @@ -5545,6 +6861,7 @@ static int mdt_connect_internal(const struct lu_env *env, struct obd_connect_data *data, bool reconnect) { const char *obd_name = mdt_obd_name(mdt); + LASSERT(data != NULL); data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; @@ -5569,11 +6886,7 @@ static int mdt_connect_internal(const struct lu_env *env, data->ocd_brw_size = min(data->ocd_brw_size, mdt->mdt_brw_size); if (data->ocd_brw_size == 0) { - CERROR("%s: cli %s/%p ocd_connect_flags: %#llx " - "ocd_version: %x ocd_grant: %d ocd_index: %u " - "ocd_brw_size unexpectedly zero, network data " - "corruption? Refusing to connect this client\n", - obd_name, exp->exp_client_uuid.uuid, + CERROR("%s: cli %s/%p ocd_connect_flags: %#llx ocd_version: %x ocd_grant: %d ocd_index: %u ocd_brw_size unexpectedly zero, network data corruption? Refusing to connect this client\n", obd_name, exp->exp_client_uuid.uuid, exp, data->ocd_connect_flags, data->ocd_version, data->ocd_grant, data->ocd_index); return -EPROTO; @@ -5587,7 +6900,8 @@ static int mdt_connect_internal(const struct lu_env *env, exp->exp_target_data.ted_pagebits = data->ocd_grant_blkbits; data->ocd_grant_blkbits = mdt->mdt_lut.lut_tgd.tgd_blockbits; /* ddp_inodespace may not be power-of-two value, eg. for ldiskfs - * it's LDISKFS_DIR_REC_LEN(20) = 28. */ + * it's LDISKFS_DIR_REC_LEN(20) = 28. + */ data->ocd_grant_inobits = fls(ddp->ddp_inodespace - 1); /* ocd_grant_tax_kb is in 1K byte blocks */ data->ocd_grant_tax_kb = ddp->ddp_extent_tax >> 10; @@ -5596,7 +6910,8 @@ static int mdt_connect_internal(const struct lu_env *env, /* Save connect_data we have so far because tgt_grant_connect() * uses it to calculate grant, and we want to save the client - * version before it is overwritten by LUSTRE_VERSION_CODE. */ + * version before it is overwritten by LUSTRE_VERSION_CODE. 
+ */ exp->exp_connect_data = *data; if (OCD_HAS_FLAG(data, GRANT)) tgt_grant_connect(env, exp, data, !reconnect); @@ -5608,7 +6923,8 @@ static int mdt_connect_internal(const struct lu_env *env, * exp_connect_data.ocd_connect_flags in this case, since * tgt_client_new() needs to know if this is a lightweight * connection, and it is safe to expose this flag before - * connection processing completes. */ + * connection processing completes. + */ if (data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) { spin_lock(&exp->exp_lock); *exp_connect_flags_ptr(exp) |= OBD_CONNECT_LIGHTWEIGHT; @@ -5639,9 +6955,18 @@ static int mdt_connect_internal(const struct lu_env *env, * exp_connect_data.ocd_connect_flags in this case, since * tgt_client_new() needs to know if this is client supports * multiple modify RPCs, and it is safe to expose this flag before - * connection processing completes. */ + * connection processing completes. + */ if (data->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) { - data->ocd_maxmodrpcs = max_mod_rpcs_per_client; + if (mdt_max_mod_rpcs_changed(mdt)) + /* The new mdt.*.max_mod_rpcs_in_flight parameter + * has not changed since initialization, but the + * deprecated module parameter was changed, + * so use that instead. + */ + data->ocd_maxmodrpcs = max_mod_rpcs_per_client; + else + data->ocd_maxmodrpcs = mdt->mdt_max_mod_rpcs_in_flight; spin_lock(&exp->exp_lock); *exp_connect_flags_ptr(exp) |= OBD_CONNECT_MULTIMODRPCS; spin_unlock(&exp->exp_lock); @@ -5650,30 +6975,38 @@ static int mdt_connect_internal(const struct lu_env *env, if (OCD_HAS_FLAG(data, CKSUM)) { __u32 cksum_types = data->ocd_cksum_types; - /* The client set in ocd_cksum_types the checksum types it - * supports. 
We have to mask off the algorithms that we don't - * support */ - data->ocd_cksum_types &= - obd_cksum_types_supported_server(obd_name); + tgt_mask_cksum_types(&mdt->mdt_lut, &data->ocd_cksum_types); if (unlikely(data->ocd_cksum_types == 0)) { - CERROR("%s: Connect with checksum support but no " - "ocd_cksum_types is set\n", + CERROR("%s: Connect with checksum support but no ocd_cksum_types is set\n", exp->exp_obd->obd_name); RETURN(-EPROTO); } - CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return " - "%x\n", exp->exp_obd->obd_name, obd_export_nid2str(exp), + CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return %x\n", + exp->exp_obd->obd_name, obd_export_nid2str(exp), cksum_types, data->ocd_cksum_types); } else { - /* This client does not support OBD_CONNECT_CKSUM - * fall back to CRC32 */ - CDEBUG(D_RPCTRACE, "%s: cli %s does not support " - "OBD_CONNECT_CKSUM, CRC32 will be used\n", + /* Client not support OBD_CONNECT_CKSUM? fall back to CRC32 */ + CDEBUG(D_RPCTRACE, "%s: cli %s does not support OBD_CONNECT_CKSUM, CRC32 will be used\n", exp->exp_obd->obd_name, obd_export_nid2str(exp)); } + if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) && + !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) { + atomic_inc(&mdt->mdt_mds_mds_conns); + mdt_enable_slc(mdt); + } + + if (!mdt->mdt_lut.lut_dt_conf.ddp_has_lseek_data_hole) + data->ocd_connect_flags2 &= ~OBD_CONNECT2_LSEEK; + + if (!OCD_HAS_FLAG(data, MDS_MDS) && !OCD_HAS_FLAG(data, LIGHTWEIGHT) && + !OCD_HAS_FLAG2(data, DMV_IMP_INHERIT)) { + atomic_inc(&mdt->mdt_dmv_old_client_count); + mdt->mdt_enable_dmv_implicit_inherit = 0; + } + return 0; } @@ -5683,6 +7016,7 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env, { struct lu_context ses; int rc; + ENTRY; rc = lu_context_init(&ses, LCT_SERVER_SESSION); @@ -5693,6 +7027,13 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env, lu_context_enter(&ses); mdt_ucred(info)->uc_valid = UCRED_OLD; + /* do not let rbac interfere with dirty flag 
internal system event */ + mdt_ucred(info)->uc_rbac_file_perms = 1; + mdt_ucred(info)->uc_rbac_dne_ops = 1; + mdt_ucred(info)->uc_rbac_quota_ops = 1; + mdt_ucred(info)->uc_rbac_byfid_ops = 1; + mdt_ucred(info)->uc_rbac_chlg_ops = 1; + mdt_ucred(info)->uc_rbac_fscrypt_admin = 1; rc = mdt_add_dirty_flag(info, mfd->mfd_object, &info->mti_attr); lu_context_exit(&ses); @@ -5704,7 +7045,7 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env, static int mdt_export_cleanup(struct obd_export *exp) { - struct list_head closing_list; + LIST_HEAD(closing_list); struct mdt_export_data *med = &exp->exp_mdt_data; struct obd_device *obd = exp->exp_obd; struct mdt_device *mdt; @@ -5712,39 +7053,40 @@ static int mdt_export_cleanup(struct obd_export *exp) struct lu_env env; struct mdt_file_data *mfd, *n; int rc = 0; + ENTRY; - INIT_LIST_HEAD(&closing_list); spin_lock(&med->med_open_lock); while (!list_empty(&med->med_open_head)) { struct list_head *tmp = med->med_open_head.next; + mfd = list_entry(tmp, struct mdt_file_data, mfd_list); /* Remove mfd handle so it can't be found again. - * We are consuming the mfd_list reference here. */ + * We are consuming the mfd_list reference here. 
+ */ class_handle_unhash(&mfd->mfd_open_handle); list_move_tail(&mfd->mfd_list, &closing_list); } spin_unlock(&med->med_open_lock); - mdt = mdt_dev(obd->obd_lu_dev); - LASSERT(mdt != NULL); + mdt = mdt_dev(obd->obd_lu_dev); + LASSERT(mdt != NULL); - rc = lu_env_init(&env, LCT_MD_THREAD); - if (rc) - RETURN(rc); + rc = lu_env_init(&env, LCT_MD_THREAD); + if (rc) + RETURN(rc); - info = lu_context_key_get(&env.le_ctx, &mdt_thread_key); - LASSERT(info != NULL); - memset(info, 0, sizeof *info); - info->mti_env = &env; - info->mti_mdt = mdt; - info->mti_exp = exp; + info = lu_context_key_get(&env.le_ctx, &mdt_thread_key); + LASSERT(info != NULL); + memset(info, 0, sizeof(*info)); + info->mti_env = &env; + info->mti_mdt = mdt; + info->mti_exp = exp; if (!list_empty(&closing_list)) { struct md_attr *ma = &info->mti_attr; - /* Close any open files (which may also cause orphan - * unlinking). */ + /* Close any open files (which may cause orphan unlinking). */ list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) { list_del_init(&mfd->mfd_list); ma->ma_need = ma->ma_valid = 0; @@ -5767,37 +7109,29 @@ static int mdt_export_cleanup(struct obd_export *exp) rc = mdt_ctxt_add_dirty_flag(&env, info, mfd); /* Don't unlink orphan on failover umount, LU-184 */ - if (exp->exp_flags & OBD_OPT_FAILOVER) { + if (exp->exp_flags & OBD_OPT_FAILOVER || + exp->exp_obd->obd_stopping) { ma->ma_valid = MA_FLAGS; ma->ma_attr_flags |= MDS_KEEP_ORPHAN; } - mdt_mfd_close(info, mfd); - } - } - info->mti_mdt = NULL; - /* cleanup client slot early */ - /* Do not erase record for recoverable client. */ - if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed) + ma->ma_valid |= MA_FORCE_LOG; + mdt_mfd_close(info, mfd); + } + } + info->mti_mdt = NULL; + /* cleanup client slot early */ + /* Do not erase record for recoverable client. 
*/ + if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed) tgt_client_del(&env, exp); - lu_env_fini(&env); - - RETURN(rc); -} - -static inline void mdt_enable_slc(struct mdt_device *mdt) -{ - if (mdt->mdt_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL) - mdt->mdt_lut.lut_sync_lock_cancel = BLOCKING_SYNC_ON_CANCEL; -} + lu_env_fini(&env); -static inline void mdt_disable_slc(struct mdt_device *mdt) -{ - if (mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL) - mdt->mdt_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL; + RETURN(rc); } static int mdt_obd_disconnect(struct obd_export *exp) { + struct obd_connect_data *data = &exp->exp_connect_data; + struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev); int rc; ENTRY; @@ -5805,16 +7139,14 @@ static int mdt_obd_disconnect(struct obd_export *exp) LASSERT(exp); class_export_get(exp); - if (!(exp->exp_flags & OBD_OPT_FORCE)) - tgt_grant_sanity_check(exp->exp_obd, __func__); - - if ((exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS) && - !(exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)) { - struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev); + if (OCD_HAS_FLAG(data, MDS_MDS) && !OCD_HAS_FLAG(data, LIGHTWEIGHT) && + atomic_dec_and_test(&mdt->mdt_mds_mds_conns)) + mdt_disable_slc(mdt); - if (atomic_dec_and_test(&mdt->mdt_mds_mds_conns)) - mdt_disable_slc(mdt); - } + if (!OCD_HAS_FLAG(data, MDS_MDS) && !OCD_HAS_FLAG(data, LIGHTWEIGHT) && + !OCD_HAS_FLAG2(data, DMV_IMP_INHERIT) && + atomic_dec_and_test(&mdt->mdt_dmv_old_client_count)) + mdt->mdt_enable_dmv_implicit_inherit = 1; rc = server_disconnect_export(exp); if (rc != 0) @@ -5822,6 +7154,9 @@ static int mdt_obd_disconnect(struct obd_export *exp) tgt_grant_discard(exp); + if (!(exp->exp_flags & OBD_OPT_FORCE)) + tgt_grant_sanity_check(exp->exp_obd, __func__); + rc = mdt_export_cleanup(exp); nodemap_del_member(exp); class_export_put(exp); @@ -5839,7 +7174,8 @@ static int mdt_obd_connect(const struct lu_env *env, struct lustre_handle conn = { 0 
}; struct mdt_device *mdt; int rc; - lnet_nid_t *client_nid = localdata; + struct lnet_nid *client_nid = localdata; + ENTRY; LASSERT(env != NULL); @@ -5850,12 +7186,6 @@ static int mdt_obd_connect(const struct lu_env *env, mdt = mdt_dev(obd->obd_lu_dev); - if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) && - !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) { - atomic_inc(&mdt->mdt_mds_mds_conns); - mdt_enable_slc(mdt); - } - /* * first, check whether the stack is ready to handle requests * XXX: probably not very appropriate method is used now @@ -5879,7 +7209,7 @@ static int mdt_obd_connect(const struct lu_env *env, lexp = class_conn2export(&conn); LASSERT(lexp != NULL); - rc = nodemap_add_member(*client_nid, lexp); + rc = nodemap_add_member(client_nid, lexp); if (rc != 0 && rc != -EEXIST) GOTO(out, rc); @@ -5888,7 +7218,7 @@ static int mdt_obd_connect(const struct lu_env *env, struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd; LASSERT(lcd); - memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid); + memcpy(lcd->lcd_uuid, cluuid, sizeof(lcd->lcd_uuid)); rc = tgt_client_new(env, lexp); if (rc == 0) mdt_export_stats_init(obd, lexp, localdata); @@ -5901,7 +7231,8 @@ out: } else { *exp = lexp; /* Because we do not want this export to be evicted by pinger, - * let's not add this export to the timed chain list. */ + * let's not add this export to the timed chain list. 
+ */ if (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) { spin_lock(&lexp->exp_obd->obd_dev_lock); list_del_init(&lexp->exp_obd_chain_timed); @@ -5918,14 +7249,15 @@ static int mdt_obd_reconnect(const struct lu_env *env, struct obd_connect_data *data, void *localdata) { - lnet_nid_t *client_nid = localdata; - int rc; + struct lnet_nid *client_nid = localdata; + int rc; + ENTRY; if (exp == NULL || obd == NULL || cluuid == NULL) RETURN(-EINVAL); - rc = nodemap_add_member(*client_nid, exp); + rc = nodemap_add_member(client_nid, exp); if (rc != 0 && rc != -EEXIST) RETURN(rc); @@ -5944,6 +7276,7 @@ static int mdt_init_export(struct obd_export *exp) { struct mdt_export_data *med = &exp->exp_mdt_data; int rc; + ENTRY; INIT_LIST_HEAD(&med->med_open_head); @@ -5952,24 +7285,33 @@ static int mdt_init_export(struct obd_export *exp) exp->exp_connecting = 1; spin_unlock(&exp->exp_lock); - /* self-export doesn't need client data and ldlm initialization */ - if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, - &exp->exp_client_uuid))) - RETURN(0); + OBD_ALLOC(exp->exp_used_slots, + BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long)); + if (exp->exp_used_slots == NULL) + RETURN(-ENOMEM); + + /* self-export doesn't need client data and ldlm initialization */ + if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, + &exp->exp_client_uuid))) + RETURN(0); - rc = tgt_client_alloc(exp); - if (rc) + rc = tgt_client_alloc(exp); + if (rc) GOTO(err, rc); rc = ldlm_init_export(exp); if (rc) GOTO(err_free, rc); - RETURN(rc); + RETURN(rc); err_free: tgt_client_free(exp); err: + OBD_FREE(exp->exp_used_slots, + BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long)); + exp->exp_used_slots = NULL; + CERROR("%s: Failed to initialize export: rc = %d\n", exp->exp_obd->obd_name, rc); return rc; @@ -5977,14 +7319,19 @@ err: static int mdt_destroy_export(struct obd_export *exp) { - ENTRY; + ENTRY; - target_destroy_export(exp); - /* destroy can be called from failed obd_setup, so - * checking uuid is safer than 
obd_self_export */ - if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, - &exp->exp_client_uuid))) - RETURN(0); + target_destroy_export(exp); + if (exp->exp_used_slots) + OBD_FREE(exp->exp_used_slots, + BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long)); + + /* destroy can be called from failed obd_setup, so + * checking uuid is safer than obd_self_export + */ + if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid, + &exp->exp_client_uuid))) + RETURN(0); ldlm_destroy_export(exp); tgt_client_free(exp); @@ -5998,7 +7345,7 @@ static int mdt_destroy_export(struct obd_export *exp) */ tgt_grant_discard(exp); if (exp_connect_flags(exp) & OBD_CONNECT_GRANT) - exp->exp_obd->u.obt.obt_lut->lut_tgd.tgd_tot_granted_clients--; + obd2obt(exp->exp_obd)->obt_lut->lut_tgd.tgd_tot_granted_clients--; if (!(exp->exp_flags & OBD_OPT_FORCE)) tgt_grant_sanity_check(exp->exp_obd, __func__); @@ -6055,22 +7402,28 @@ static int mdt_path_current(struct mdt_thread_info *info, struct getinfo_fid2path *fp, struct lu_fid *root_fid) { - struct mdt_device *mdt = info->mti_mdt; - struct mdt_object *mdt_obj; - struct link_ea_header *leh; - struct link_ea_entry *lee; - struct lu_name *tmpname = &info->mti_name; - struct lu_fid *tmpfid = &info->mti_tmp_fid1; - struct lu_buf *buf = &info->mti_big_buf; - char *ptr; - int reclen; - struct linkea_data ldata = { NULL }; - int rc = 0; - bool first = true; + struct mdt_device *mdt = info->mti_mdt; + struct lu_name *tmpname = &info->mti_name; + struct lu_fid *tmpfid = &info->mti_tmp_fid1; + struct lu_buf *buf = &info->mti_big_buf; + struct linkea_data ldata = { NULL }; + bool first = true; + struct mdt_object *mdt_obj; + struct link_ea_header *leh; + struct link_ea_entry *lee; + bool worthchecking = true; + bool needsfid = false; + bool supported = false; + int isenc = -1; + char *ptr; + int reclen; + int rc = 0; + ENTRY; /* temp buffer for path element, the buffer will be finally freed - * in mdt_thread_info_fini */ + * in mdt_thread_info_fini + */ buf = 
lu_buf_check_and_alloc(buf, PATH_MAX); if (buf->lb_buf == NULL) RETURN(-ENOMEM); @@ -6082,8 +7435,6 @@ static int mdt_path_current(struct mdt_thread_info *info, *tmpfid = fp->gf_fid = *mdt_object_fid(obj); while (!lu_fid_eq(root_fid, &fp->gf_fid)) { - struct lu_buf lmv_buf; - if (!lu_fid_eq(root_fid, &mdt->mdt_md_root_fid) && lu_fid_eq(&mdt->mdt_md_root_fid, &fp->gf_fid)) GOTO(out, rc = -ENOENT); @@ -6107,6 +7458,37 @@ static int mdt_path_current(struct mdt_thread_info *info, GOTO(remote_out, rc = -EREMOTE); } + if (worthchecking) { + /* need to know if FID being looked up is encrypted */ + struct lu_attr la = { 0 }; + struct dt_object *dt = mdt_obj2dt(mdt_obj); + + if (dt && dt->do_ops && dt->do_ops->do_attr_get) + dt_attr_get(info->mti_env, dt, &la); + if (la.la_valid & LA_FLAGS && + la.la_flags & LUSTRE_ENCRYPT_FL) { + if (!supported && mdt_info_req(info) && + !exp_connect_encrypt_fid2path( + mdt_info_req(info)->rq_export)) { + /* client does not support fid2path + * for encrypted files + */ + mdt_object_put(info->mti_env, mdt_obj); + GOTO(out, rc = -ENODATA); + } else { + supported = true; + } + needsfid = true; + if (isenc == -1) + isenc = 1; + } else { + worthchecking = false; + needsfid = false; + if (isenc == -1) + isenc = 0; + } + } + rc = mdt_links_read(info, mdt_obj, &ldata); if (rc != 0) { mdt_object_put(info->mti_env, mdt_obj); @@ -6117,9 +7499,11 @@ static int mdt_path_current(struct mdt_thread_info *info, lee = (struct link_ea_entry *)(leh + 1); /* link #0 */ linkea_entry_unpack(lee, &reclen, tmpname, tmpfid); /* If set, use link #linkno for path lookup, otherwise use - link #0. Only do this for the final path element. */ + * link #0. Only do this for the final path element.
+ */ if (first && fp->gf_linkno < leh->leh_reccount) { int count; + for (count = 0; count < fp->gf_linkno; count++) { lee = (struct link_ea_entry *) ((char *)lee + reclen); @@ -6131,43 +7515,50 @@ static int mdt_path_current(struct mdt_thread_info *info, fp->gf_linkno++; } - lmv_buf.lb_buf = info->mti_xattr_buf; - lmv_buf.lb_len = sizeof(info->mti_xattr_buf); /* Check if it is slave stripes */ - rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj), - &lmv_buf, XATTR_NAME_LMV); + rc = mdt_is_dir_stripe(info, mdt_obj); mdt_object_put(info->mti_env, mdt_obj); - if (rc > 0) { - union lmv_mds_md *lmm = lmv_buf.lb_buf; - - /* For slave stripes, get its master */ - if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) { - fp->gf_fid = *tmpfid; - continue; - } - } else if (rc < 0 && rc != -ENODATA) { + if (rc < 0) GOTO(out, rc); + if (rc == 1) { + fp->gf_fid = *tmpfid; + continue; } - rc = 0; - /* Pack the name in the end of the buffer */ ptr -= tmpname->ln_namelen; if (ptr - 1 <= fp->gf_u.gf_path) - GOTO(out, rc = -EOVERFLOW); + GOTO(out, rc = -ENAMETOOLONG); strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen); + if (needsfid) { + /* Pack FID before file name, so that client can build + * encoded/digested form. 
+ */ + char fidstr[FID_LEN + 1]; + + snprintf(fidstr, sizeof(fidstr), DFID, + PFID(&fp->gf_fid)); + ptr -= strlen(fidstr); + if (ptr - 1 <= fp->gf_u.gf_path) + GOTO(out, rc = -ENAMETOOLONG); + strncpy(ptr, fidstr, strlen(fidstr)); + } *(--ptr) = '/'; - /* keep the last resolved fid to the client, so the - * client will build the left path on another MDT for - * remote object */ + /* keep the last resolved fid to the client, so the client will + * build the left path on another MDT for remote object + */ fp->gf_fid = *tmpfid; first = false; } + /* non-zero will be treated as an error */ + rc = 0; + remote_out: - ptr++; /* skip leading / */ + if (isenc != 1) + ptr++; /* skip leading / unless this is an encrypted file */ memmove(fp->gf_u.gf_path, ptr, fp->gf_u.gf_path + fp->gf_pathlen - ptr); @@ -6185,7 +7576,7 @@ out: * \param[in] info Per-thread common data shared by mdt level handlers. * \param[in] obj Object to do path lookup of * \param[in,out] fp User-provided struct for arguments and to store path - * information + * information * * \retval 0 Lookup successful, path information stored in fp * \retval negative errno if there was a problem @@ -6196,6 +7587,7 @@ static int mdt_path(struct mdt_thread_info *info, struct mdt_object *obj, struct mdt_device *mdt = info->mti_mdt; int tries = 3; int rc = -EAGAIN; + ENTRY; if (fp->gf_pathlen < 3) @@ -6226,7 +7618,7 @@ static int mdt_path(struct mdt_thread_info *info, struct mdt_object *obj, * * \param[in] info Per-thread common data shared by mdt level handlers. 
* \param[in,out] fp User-provided struct for arguments and to store path - * information + * information * * \retval 0 Lookup successful, path information and recno stored in fp * \retval -ENOENT, object does not exist @@ -6239,6 +7631,7 @@ static int mdt_fid2path(struct mdt_thread_info *info, struct mdt_device *mdt = info->mti_mdt; struct mdt_object *obj; int rc; + ENTRY; CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n", @@ -6248,12 +7641,24 @@ static int mdt_fid2path(struct mdt_thread_info *info, RETURN(-EINVAL); if (!fid_is_namespace_visible(&fp->gf_fid)) { - CDEBUG(D_INFO, "%s: "DFID" is invalid, f_seq should be >= %#llx" - ", or f_oid != 0, or f_ver == 0\n", mdt_obd_name(mdt), + CDEBUG(D_INFO, "%s: "DFID" is invalid, f_seq should be >= %#llx, or f_oid != 0, or f_ver == 0\n", + mdt_obd_name(mdt), PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL); RETURN(-EINVAL); } + /* return error if client-provided root fid is not the one stored in + * the export + */ + if (root_fid && !fid_is_zero(&info->mti_exp->exp_root_fid) && + !lu_fid_eq(root_fid, &info->mti_exp->exp_root_fid)) { + CDEBUG(D_INFO, + "%s: root fid from client "DFID" but "DFID" stored in export\n", + mdt_obd_name(mdt), PFID(root_fid), + PFID(&info->mti_exp->exp_root_fid)); + RETURN(-EXDEV); + } + obj = mdt_object_find(info->mti_env, mdt, &fp->gf_fid); if (IS_ERR(obj)) { rc = PTR_ERR(obj); @@ -6294,21 +7699,21 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, int keylen, struct lu_fid *root_fid = NULL; int rc = 0; - fpin = key + cfs_size_round(sizeof(KEY_FID2PATH)); + fpin = key + round_up(sizeof(KEY_FID2PATH), 8); fpout = val; - if (ptlrpc_req_need_swab(info->mti_pill->rc_req)) + if (req_capsule_req_need_swab(info->mti_pill)) lustre_swab_fid2path(fpin); memcpy(fpout, fpin, sizeof(*fpin)); if (fpout->gf_pathlen != vallen - sizeof(*fpin)) RETURN(-EINVAL); - if (keylen >= cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*fpin) + + if (keylen >= round_up(sizeof(KEY_FID2PATH), 8) + sizeof(*fpin) + 
sizeof(struct lu_fid)) { /* client sent its root FID, which is normally fileset FID */ root_fid = fpin->gf_u.gf_root_fid; - if (ptlrpc_req_need_swab(info->mti_pill->rc_req)) + if (req_capsule_req_need_swab(info->mti_pill)) lustre_swab_lu_fid(root_fid); if (root_fid != NULL && !fid_is_sane(root_fid)) @@ -6376,6 +7781,7 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg) struct mdt_object *obj; struct mdt_lock_handle *lh; int rc; + ENTRY; if (data->ioc_inlbuf1 == NULL || data->ioc_inllen1 != sizeof(*fid) || @@ -6389,12 +7795,10 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg) CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid)); - lh = &mti->mti_lh[MDT_LH_PARENT]; - mdt_lock_reg_init(lh, LCK_CR); - - obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE); - if (IS_ERR(obj)) - RETURN(PTR_ERR(obj)); + lh = &mti->mti_lh[MDT_LH_PARENT]; + obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE, LCK_CR); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); if (mdt_object_remote(obj)) { rc = -EREMOTE; @@ -6419,59 +7823,76 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg) static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct lu_env env; - struct obd_device *obd = exp->exp_obd; - struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); - struct dt_device *dt = mdt->mdt_bottom; - int rc; + struct obd_device *obd = exp->exp_obd; + struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); + struct dt_device *dt = mdt->mdt_bottom; + struct obd_ioctl_data *data; + struct lu_env env; + int rc; - ENTRY; - CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd); - rc = lu_env_init(&env, LCT_MD_THREAD); - if (rc) - RETURN(rc); + ENTRY; + CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n", + obd->obd_name, cmd, len, karg, uarg); + + rc = lu_env_init(&env, LCT_MD_THREAD); + if (rc) + RETURN(rc); + /* handle commands that don't use @karg first */ switch 
(cmd) { case OBD_IOC_SYNC: rc = mdt_device_sync(&env, mdt); - break; + GOTO(out, rc); case OBD_IOC_SET_READONLY: rc = dt_sync(&env, dt); if (rc == 0) rc = dt_ro(&env, dt); - break; - case OBD_IOC_ABORT_RECOVERY: - CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt)); - obd->obd_abort_recovery = 1; - target_stop_recovery_thread(obd); + GOTO(out, rc); + } + + if (unlikely(karg == NULL)) { + OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL", rc = -EINVAL); + GOTO(out, rc); + } + data = karg; + + switch (cmd) { + case OBD_IOC_ABORT_RECOVERY: { + if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT) { + LCONSOLE_WARN("%s: Aborting MDT recovery\n", + obd->obd_name); + obd->obd_abort_mdt_recovery = 1; + wake_up(&obd->obd_next_transno_waitq); + } else { /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */ + /* lctl didn't set OBD_FLG_ABORT_RECOV_OST < 2.13.57 */ + LCONSOLE_WARN("%s: Aborting client recovery\n", + obd->obd_name); + obd->obd_abort_recovery = 1; + target_stop_recovery_thread(obd); + } rc = 0; break; - case OBD_IOC_CHANGELOG_REG: - case OBD_IOC_CHANGELOG_DEREG: - case OBD_IOC_CHANGELOG_CLEAR: - rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env, - mdt->mdt_child, + } + case OBD_IOC_CHANGELOG_REG: + case OBD_IOC_CHANGELOG_DEREG: + case OBD_IOC_CHANGELOG_CLEAR: + case OBD_IOC_LLOG_PRINT: + case OBD_IOC_LLOG_CANCEL: + rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env, mdt->mdt_child, cmd, len, karg); - break; + break; case OBD_IOC_START_LFSCK: { struct md_device *next = mdt->mdt_child; - struct obd_ioctl_data *data = karg; struct lfsck_start_param lsp; - if (unlikely(data == NULL)) { - rc = -EINVAL; - break; - } - lsp.lsp_start = (struct lfsck_start *)(data->ioc_inlbuf1); lsp.lsp_index_valid = 0; rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0, &lsp); break; } case OBD_IOC_STOP_LFSCK: { - struct md_device *next = mdt->mdt_child; - struct obd_ioctl_data *data = karg; - struct lfsck_stop stop; + struct md_device *next = mdt->mdt_child; + struct lfsck_stop stop; 
stop.ls_status = LS_STOPPED; /* Old lfsck utils may pass NULL @stop. */ @@ -6485,24 +7906,24 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, break; } case OBD_IOC_QUERY_LFSCK: { - struct md_device *next = mdt->mdt_child; - struct obd_ioctl_data *data = karg; + struct md_device *next = mdt->mdt_child; rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0, data->ioc_inlbuf1); break; } - case OBD_IOC_GET_OBJ_VERSION: { - struct mdt_thread_info *mti; - mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key); - memset(mti, 0, sizeof *mti); - mti->mti_env = &env; - mti->mti_mdt = mdt; - mti->mti_exp = exp; - - rc = mdt_ioc_version_get(mti, karg); - break; - } + case OBD_IOC_GET_OBJ_VERSION: { + struct mdt_thread_info *mti; + + mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key); + memset(mti, 0, sizeof(*mti)); + mti->mti_env = &env; + mti->mti_mdt = mdt; + mti->mti_exp = exp; + + rc = mdt_ioc_version_get(mti, karg); + break; + } case OBD_IOC_CATLOGLIST: { struct mdt_thread_info *mti; @@ -6511,21 +7932,21 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rc = llog_catalog_list(&env, mdt->mdt_bottom, 0, karg, &mti->mti_tmp_fid1); break; - } + } default: - rc = -EOPNOTSUPP; - CERROR("%s: Not supported cmd = %d, rc = %d\n", - mdt_obd_name(mdt), cmd, rc); + rc = OBD_IOC_ERROR(obd->obd_name, cmd, "unrecognized", -ENOTTY); + break; } - - lu_env_fini(&env); - RETURN(rc); +out: + lu_env_fini(&env); + RETURN(rc); } static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt) { struct lu_device *ld = md2lu_dev(mdt->mdt_child); int rc; + ENTRY; if (!mdt->mdt_skip_lfsck && !mdt->mdt_bottom->dd_rdonly) { @@ -6547,46 +7968,48 @@ static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt) static int mdt_obd_postrecov(struct obd_device *obd) { - struct lu_env env; - int rc; - - rc = lu_env_init(&env, LCT_MD_THREAD); - if (rc) - RETURN(rc); - rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev)); - 
lu_env_fini(&env); - return rc; -} - -static struct obd_ops mdt_obd_device_ops = { - .o_owner = THIS_MODULE, - .o_set_info_async = mdt_obd_set_info_async, - .o_connect = mdt_obd_connect, - .o_reconnect = mdt_obd_reconnect, - .o_disconnect = mdt_obd_disconnect, - .o_init_export = mdt_init_export, - .o_destroy_export = mdt_destroy_export, - .o_iocontrol = mdt_iocontrol, - .o_postrecov = mdt_obd_postrecov, + struct lu_env env; + int rc; + + rc = lu_env_init(&env, LCT_MD_THREAD); + if (rc) + RETURN(rc); + rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev)); + lu_env_fini(&env); + return rc; +} + +static const struct obd_ops mdt_obd_device_ops = { + .o_owner = THIS_MODULE, + .o_set_info_async = mdt_obd_set_info_async, + .o_connect = mdt_obd_connect, + .o_reconnect = mdt_obd_reconnect, + .o_disconnect = mdt_obd_disconnect, + .o_init_export = mdt_init_export, + .o_destroy_export = mdt_destroy_export, + .o_iocontrol = mdt_iocontrol, + .o_postrecov = mdt_obd_postrecov, /* Data-on-MDT IO methods */ .o_preprw = mdt_obd_preprw, .o_commitrw = mdt_obd_commitrw, }; -static struct lu_device* mdt_device_fini(const struct lu_env *env, - struct lu_device *d) +static struct lu_device *mdt_device_fini(const struct lu_env *env, + struct lu_device *d) { - struct mdt_device *m = mdt_dev(d); - ENTRY; + struct mdt_device *m = mdt_dev(d); - mdt_fini(env, m); - RETURN(NULL); + ENTRY; + + mdt_fini(env, m); + RETURN(NULL); } static struct lu_device *mdt_device_free(const struct lu_env *env, - struct lu_device *d) + struct lu_device *d) { struct mdt_device *m = mdt_dev(d); + ENTRY; lu_device_fini(&m->mdt_lu_dev); @@ -6596,33 +8019,33 @@ static struct lu_device *mdt_device_free(const struct lu_env *env, } static struct lu_device *mdt_device_alloc(const struct lu_env *env, - struct lu_device_type *t, - struct lustre_cfg *cfg) + struct lu_device_type *t, + struct lustre_cfg *cfg) { - struct lu_device *l; - struct mdt_device *m; + struct lu_device *l; + struct mdt_device *m; - OBD_ALLOC_PTR(m); - if 
(m != NULL) { - int rc; + OBD_ALLOC_PTR(m); + if (m != NULL) { + int rc; l = &m->mdt_lu_dev; - rc = mdt_init0(env, m, t, cfg); - if (rc != 0) { - mdt_device_free(env, l); - l = ERR_PTR(rc); - return l; - } - } else - l = ERR_PTR(-ENOMEM); - return l; + rc = mdt_init0(env, m, t, cfg); + if (rc != 0) { + mdt_device_free(env, l); + l = ERR_PTR(rc); + return l; + } + } else + l = ERR_PTR(-ENOMEM); + return l; } /* context key constructor/destructor: mdt_key_init, mdt_key_fini */ LU_KEY_INIT(mdt, struct mdt_thread_info); static void mdt_key_fini(const struct lu_context *ctx, - struct lu_context_key *key, void* data) + struct lu_context_key *key, void *data) { struct mdt_thread_info *info = data; @@ -6664,14 +8087,14 @@ struct lu_ucred *mdt_ucred_check(const struct mdt_thread_info *info) */ void mdt_enable_cos(struct mdt_device *mdt, bool val) { - struct lu_env env; - int rc; + struct lu_env env; + int rc; mdt->mdt_opts.mo_cos = val; - rc = lu_env_init(&env, LCT_LOCAL); + rc = lu_env_init(&env, LCT_LOCAL); if (unlikely(rc != 0)) { - CWARN("%s: lu_env initialization failed, cannot " - "sync: rc = %d\n", mdt_obd_name(mdt), rc); + CWARN("%s: lu_env initialization failed, cannot sync: rc = %d\n", + mdt_obd_name(mdt), rc); return; } mdt_device_sync(&env, mdt); @@ -6687,30 +8110,35 @@ void mdt_enable_cos(struct mdt_device *mdt, bool val) */ int mdt_cos_is_enabled(struct mdt_device *mdt) { - return mdt->mdt_opts.mo_cos != 0; + return mdt->mdt_opts.mo_cos != 0; } -static struct lu_device_type_operations mdt_device_type_ops = { - .ldto_device_alloc = mdt_device_alloc, - .ldto_device_free = mdt_device_free, - .ldto_device_fini = mdt_device_fini +static const struct lu_device_type_operations mdt_device_type_ops = { + .ldto_device_alloc = mdt_device_alloc, + .ldto_device_free = mdt_device_free, + .ldto_device_fini = mdt_device_fini }; static struct lu_device_type mdt_device_type = { - .ldt_tags = LU_DEVICE_MD, - .ldt_name = LUSTRE_MDT_NAME, - .ldt_ops = &mdt_device_type_ops, - 
.ldt_ctx_tags = LCT_MD_THREAD + .ldt_tags = LU_DEVICE_MD, + .ldt_name = LUSTRE_MDT_NAME, + .ldt_ops = &mdt_device_type_ops, + .ldt_ctx_tags = LCT_MD_THREAD }; static int __init mdt_init(void) { int rc; - CLASSERT(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") == - FID_NOBRACE_LEN + 1); - CLASSERT(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") == - FID_LEN + 1); + BUILD_BUG_ON(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") != + FID_NOBRACE_LEN + 1); + BUILD_BUG_ON(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") != + FID_LEN + 1); + + rc = libcfs_setup(); + if (rc) + return rc; + rc = lu_kmem_init(mdt_caches); if (rc) return rc; @@ -6719,7 +8147,7 @@ static int __init mdt_init(void) if (rc) GOTO(lu_fini, rc); - rc = class_register_type(&mdt_obd_device_ops, NULL, true, NULL, + rc = class_register_type(&mdt_obd_device_ops, NULL, true, LUSTRE_MDT_NAME, &mdt_device_type); if (rc) GOTO(mds_fini, rc);