X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_handler.c;h=3c5ebd50d75d0d83eccffffe226ea80224d0f584;hp=84e21d6e45a78b5707f4fbf19c9b24c7d1833701;hb=f238540c879dc668e18cf99cba62f117ccae64d6;hpb=ca68e3d677a371497586167a2318268db1d94cab diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 84e21d6..3c5ebd5 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -27,7 +27,6 @@ */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. * * lustre/mdt/mdt_handler.c * @@ -58,11 +57,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include "mdt_internal.h" @@ -163,6 +164,13 @@ void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm) lh->mlh_type = MDT_REG_LOCK; } +void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock) +{ + mdt_lock_reg_init(lh, lock->l_req_mode); + if (lock->l_req_mode == LCK_GROUP) + lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid; +} + void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode, const struct lu_name *lname) { @@ -263,11 +271,47 @@ static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o EXIT; } +/** + * Check whether \a o is directory stripe object. + * + * \param[in] info thread environment + * \param[in] o MDT object + * + * \retval 1 is directory stripe. + * \retval 0 isn't directory stripe. + * \retval < 1 error code + */ +static int mdt_is_dir_stripe(struct mdt_thread_info *info, + struct mdt_object *o) +{ + struct md_attr *ma = &info->mti_attr; + struct lmv_mds_md_v1 *lmv; + int rc; + + rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV); + if (rc < 0) + return rc; + + if (!(ma->ma_valid & MA_LMV)) + return 0; + + lmv = &ma->ma_lmv->lmv_md_v1; + + if (!lmv_is_sane2(lmv)) + return -EBADF; + + if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE) + return 1; + + return 0; +} + static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, struct lu_fid *fid) { struct mdt_device *mdt = info->mti_mdt; struct lu_name *lname = &info->mti_name; + const char *start = fileset; char *filename = info->mti_filename; struct mdt_object *parent; u32 mode; @@ -282,8 +326,8 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, */ *fid = mdt->mdt_md_root_fid; - while (rc == 0 && fileset != NULL && *fileset != '\0') { - const char *s1 = fileset; + while (rc == 0 && start != NULL && *start != '\0') { + const char *s1 = start; const char *s2; while (*++s1 == '/') @@ -295,7 +339,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, if (s2 == s1) break; - fileset = s2; + start = s2; lname->ln_namelen = s2 - s1; if (lname->ln_namelen > NAME_MAX) { @@ -331,9 +375,21 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset, rc = PTR_ERR(parent); else { mode = lu_object_attr(&parent->mot_obj); - mdt_object_put(info->mti_env, parent); - if (!S_ISDIR(mode)) + if (!S_ISDIR(mode)) { rc = -ENOTDIR; + } else if (mdt_is_remote_object(info, parent, parent)) { + if (!mdt->mdt_enable_remote_subdir_mount) { + rc = -EREMOTE; + LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n", + mdt_obd_name(mdt), + fileset, rc); + } else { + LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n", + mdt_obd_name(mdt), + fileset); + } + } + mdt_object_put(info->mti_env, parent); } } @@ -414,6 +470,8 @@ static int mdt_statfs(struct tgt_session_info *tsi) struct obd_statfs *osfs; struct mdt_body *reqbody = NULL; struct mdt_statfs_cache *msf; + ktime_t kstart = ktime_get(); + int current_blockbits; int rc; ENTRY; @@ -470,6 +528,15 @@ static int mdt_statfs(struct tgt_session_info *tsi) spin_unlock(&mdt->mdt_lock); } + /* tgd_blockbit is recordsize bits set during mkfs. + * This once set does not change. However, 'zfs set' + * can be used to change the MDT blocksize. Instead + * of using cached value of 'tgd_blockbit' always + * calculate the blocksize bits which may have + * changed. + */ + current_blockbits = fls64(osfs->os_bsize) - 1; + /* at least try to account for cached pages. its still racy and * might be under-reporting if clients haven't announced their * caches with brw recently */ @@ -477,12 +544,12 @@ static int mdt_statfs(struct tgt_session_info *tsi) " pending %llu free %llu avail %llu\n", tgd->tgd_tot_dirty, tgd->tgd_tot_granted, tgd->tgd_tot_pending, - osfs->os_bfree << tgd->tgd_blockbits, - osfs->os_bavail << tgd->tgd_blockbits); + osfs->os_bfree << current_blockbits, + osfs->os_bavail << current_blockbits); osfs->os_bavail -= min_t(u64, osfs->os_bavail, ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending + - osfs->os_bsize - 1) >> tgd->tgd_blockbits)); + osfs->os_bsize - 1) >> current_blockbits)); tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__); CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; " @@ -491,24 +558,77 @@ static int mdt_statfs(struct tgt_session_info *tsi) osfs->os_files, osfs->os_ffree, osfs->os_state); if (!exp_grant_param_supp(tsi->tsi_exp) && - tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT) { + current_blockbits > COMPAT_BSIZE_SHIFT) { /* clients which don't support OBD_CONNECT_GRANT_PARAM * should not see a block size > page size, otherwise * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12) * block size which is the biggest block size known to work * with all client's page size. */ - osfs->os_blocks <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT; - osfs->os_bfree <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT; - osfs->os_bavail <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT; + osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT; + osfs->os_bfree <<= current_blockbits - COMPAT_BSIZE_SHIFT; + osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT; osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT; } if (rc == 0) - mdt_counter_incr(req, LPROC_MDT_STATFS); + mdt_counter_incr(req, LPROC_MDT_STATFS, + ktime_us_delta(ktime_get(), kstart)); out: mdt_thread_info_fini(info); RETURN(rc); } +__u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only) +{ + struct lov_comp_md_v1 *comp_v1; + struct lov_mds_md *v1; + __u32 off; + __u32 dom_stripesize = 0; + int i; + bool has_ost_stripes = false; + + ENTRY; + + if (is_dom_only) + *is_dom_only = 0; + + if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1) + RETURN(0); + + comp_v1 = (struct lov_comp_md_v1 *)lmm; + off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset); + v1 = (struct lov_mds_md *)((char *)comp_v1 + off); + + /* Fast check for DoM entry with no mirroring, should be the first */ + if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 && + lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT) + RETURN(0); + + /* check all entries otherwise */ + for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) { + struct lov_comp_md_entry_v1 *lcme; + + lcme = &comp_v1->lcm_entries[i]; + if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT)) + continue; + + off = le32_to_cpu(lcme->lcme_offset); + v1 = (struct lov_mds_md *)((char *)comp_v1 + off); + + if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) == + LOV_PATTERN_MDT) + dom_stripesize = le32_to_cpu(v1->lmm_stripe_size); + else + has_ost_stripes = true; + + if (dom_stripesize && has_ost_stripes) + RETURN(dom_stripesize); + } + /* DoM-only case exits here */ + if (is_dom_only && dom_stripesize) + *is_dom_only = 1; + RETURN(dom_stripesize); +} + /** * Pack size attributes into the reply. */ @@ -517,7 +637,7 @@ int mdt_pack_size2body(struct mdt_thread_info *info, { struct mdt_body *b; struct md_attr *ma = &info->mti_attr; - int dom_stripe; + __u32 dom_stripe; bool dom_lock = false; ENTRY; @@ -528,9 +648,9 @@ int mdt_pack_size2body(struct mdt_thread_info *info, !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL)) RETURN(-ENODATA); - dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm); + dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm); /* no DoM stripe, no size in reply */ - if (dom_stripe == LMM_NO_DOM) + if (!dom_stripe) RETURN(-ENOENT); if (lustre_handle_is_used(lh)) { @@ -583,6 +703,7 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody, if (buf->lb_len == 0) RETURN(0); + LASSERT(!info->mti_big_acl_used); again: rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS); if (rc < 0) { @@ -592,14 +713,14 @@ again: rc = 0; } else if (rc == -EOPNOTSUPP) { rc = 0; - } else { - if (rc == -ERANGE && - exp_connect_large_acl(info->mti_exp) && - buf->lb_buf != info->mti_big_acl) { + } else if (rc == -ERANGE) { + if (exp_connect_large_acl(info->mti_exp) && + !info->mti_big_acl_used) { if (info->mti_big_acl == NULL) { info->mti_big_aclsize = - MIN(mdt->mdt_max_ea_size, - XATTR_SIZE_MAX); + min_t(unsigned int, + mdt->mdt_max_ea_size, + XATTR_SIZE_MAX); OBD_ALLOC_LARGE(info->mti_big_acl, info->mti_big_aclsize); if (info->mti_big_acl == NULL) { @@ -620,47 +741,19 @@ again: buf->lb_buf = info->mti_big_acl; buf->lb_len = info->mti_big_aclsize; - + info->mti_big_acl_used = 1; goto again; } - + /* FS has ACL bigger that our limits */ + CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n", + mdt_obd_name(mdt), PFID(mdt_object_fid(o)), + info->mti_big_aclsize); + rc = -E2BIG; + } else { CERROR("%s: unable to read "DFID" ACL: rc = %d\n", mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc); } } else { - int client; - int server; - int acl_buflen; - int lmm_buflen = 0; - int lmmsize = 0; - - acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER); - if (acl_buflen >= rc) - goto map; - - /* If LOV/LMA EA is small, we can reuse part of their buffer */ - client = ptlrpc_req_get_repsize(pill->rc_req); - server = lustre_packed_msg_size(pill->rc_req->rq_repmsg); - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) { - lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD, - RCL_SERVER); - lmmsize = repbody->mbo_eadatasize; - } - - if (client < server - acl_buflen - lmm_buflen + rc + lmmsize) { - CDEBUG(D_INODE, "%s: client prepared buffer size %d " - "is not big enough with the ACL size %d (%d)\n", - mdt_obd_name(mdt), client, rc, - server - acl_buflen - lmm_buflen + rc + lmmsize); - repbody->mbo_aclsize = 0; - repbody->mbo_valid &= ~OBD_MD_FLACL; - RETURN(-ERANGE); - } - -map: - if (buf->lb_buf == info->mti_big_acl) - info->mti_big_acl_used = 1; - rc = nodemap_map_acl(nodemap, buf->lb_buf, rc, NODEMAP_FS_TO_CLIENT); /* if all ACLs mapped out, rc is still >= 0 */ @@ -708,8 +801,9 @@ static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm) void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, const struct lu_attr *attr, const struct lu_fid *fid) { - struct md_attr *ma = &info->mti_attr; + struct mdt_device *mdt = info->mti_mdt; struct obd_export *exp = info->mti_exp; + struct md_attr *ma = &info->mti_attr; struct lu_nodemap *nodemap = NULL; LASSERT(ma->ma_valid & MA_INODE); @@ -726,6 +820,10 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, b->mbo_ctime = attr->la_ctime; b->mbo_valid |= OBD_MD_FLCTIME; } + if (attr->la_valid & LA_BTIME) { + b->mbo_btime = attr->la_btime; + b->mbo_valid |= OBD_MD_FLBTIME; + } if (attr->la_valid & LA_FLAGS) { b->mbo_flags = attr->la_flags; b->mbo_valid |= OBD_MD_FLFLAGS; @@ -734,7 +832,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, b->mbo_nlink = attr->la_nlink; b->mbo_valid |= OBD_MD_FLNLINK; } - if (attr->la_valid & (LA_UID|LA_GID)) { + if (attr->la_valid & (LA_UID|LA_GID|LA_PROJID)) { nodemap = nodemap_get_from_exp(exp); if (IS_ERR(nodemap)) goto out; @@ -753,8 +851,9 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, } if (attr->la_valid & LA_PROJID) { - /* TODO, nodemap for project id */ - b->mbo_projid = attr->la_projid; + b->mbo_projid = nodemap_map_id(nodemap, NODEMAP_PROJID, + NODEMAP_FS_TO_CLIENT, + attr->la_projid); b->mbo_valid |= OBD_MD_FLPROJID; } @@ -799,7 +898,9 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, else b->mbo_blocks = 1; b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - } else if (info->mti_som_valid) { /* som is valid */ + } else if (info->mti_som_strict && + mdt->mdt_opts.mo_enable_strict_som) { + /* use SOM for size*/ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */ b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS; @@ -922,8 +1023,8 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o, RETURN(rc); } -int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, - struct md_attr *ma, const char *name) +int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, + struct md_attr *ma, const char *name) { struct md_object *next = mdt_object_child(o); struct lu_buf *buf = &info->mti_buf; @@ -999,6 +1100,40 @@ got: return rc; } +int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o, + struct md_attr *ma, const char *name) +{ + int rc; + + if (!info->mti_big_lmm) { + OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE); + if (!info->mti_big_lmm) + return -ENOMEM; + info->mti_big_lmmsize = PAGE_SIZE; + } + + if (strcmp(name, XATTR_NAME_LOV) == 0) { + ma->ma_lmm = info->mti_big_lmm; + ma->ma_lmm_size = info->mti_big_lmmsize; + ma->ma_valid &= ~MA_LOV; + } else if (strcmp(name, XATTR_NAME_LMV) == 0) { + ma->ma_lmv = info->mti_big_lmm; + ma->ma_lmv_size = info->mti_big_lmmsize; + ma->ma_valid &= ~MA_LMV; + } else { + LBUG(); + } + + LASSERT(!info->mti_big_lmm_used); + rc = __mdt_stripe_get(info, o, ma, name); + /* since big_lmm is always used here, clear 'used' flag to avoid + * assertion in mdt_big_xattr_get(). + */ + info->mti_big_lmm_used = 0; + + return rc; +} + int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, struct lu_fid *pfid) { @@ -1046,6 +1181,51 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o, RETURN(0); } +int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o, + struct lu_fid *pfid, struct lu_name *lname) +{ + struct lu_buf *buf = &info->mti_buf; + struct link_ea_header *leh; + struct link_ea_entry *lee; + int reclen; + int rc; + + buf->lb_buf = info->mti_xattr_buf; + buf->lb_len = sizeof(info->mti_xattr_buf); + rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf, + XATTR_NAME_LINK); + if (rc == -ERANGE) { + rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK); + buf->lb_buf = info->mti_big_lmm; + buf->lb_len = info->mti_big_lmmsize; + } + if (rc < 0) + return rc; + + if (rc < sizeof(*leh)) { + CERROR("short LinkEA on "DFID": rc = %d\n", + PFID(mdt_object_fid(o)), rc); + return -ENODATA; + } + + leh = (struct link_ea_header *)buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); + if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) { + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_reccount = __swab32(leh->leh_reccount); + leh->leh_len = __swab64(leh->leh_len); + } + if (leh->leh_magic != LINK_EA_MAGIC) + return -EINVAL; + + if (leh->leh_reccount == 0) + return -ENODATA; + + linkea_entry_unpack(lee, &reclen, lname, pfid); + + return 0; +} + int mdt_attr_get_complex(struct mdt_thread_info *info, struct mdt_object *o, struct md_attr *ma) { @@ -1083,19 +1263,19 @@ int mdt_attr_get_complex(struct mdt_thread_info *info, } if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) { - rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LOV); + rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV); if (rc) GOTO(out, rc); } if (need & MA_LMV && S_ISDIR(mode)) { - rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV); + rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV); if (rc != 0) GOTO(out, rc); } if (need & MA_LMV_DEF && S_ISDIR(mode)) { - rc = mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV); + rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV); if (rc != 0) GOTO(out, rc); } @@ -1144,20 +1324,37 @@ out: RETURN(rc); } +static void mdt_preset_encctx_size(struct mdt_thread_info *info) +{ + struct req_capsule *pill = info->mti_pill; + + ENTRY; + if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX, + RCL_SERVER)) + /* pre-set size in server part with max size */ + req_capsule_set_size(pill, &RMF_FILE_ENCCTX, + RCL_SERVER, + info->mti_mdt->mdt_max_mdsize); + EXIT; +} + static int mdt_getattr_internal(struct mdt_thread_info *info, - struct mdt_object *o, int ma_need) + struct mdt_object *o, int ma_need) { - struct md_object *next = mdt_object_child(o); - const struct mdt_body *reqbody = info->mti_body; - struct ptlrpc_request *req = mdt_info_req(info); - struct md_attr *ma = &info->mti_attr; - struct lu_attr *la = &ma->ma_attr; - struct req_capsule *pill = info->mti_pill; - const struct lu_env *env = info->mti_env; - struct mdt_body *repbody; - struct lu_buf *buffer = &info->mti_buf; - struct obd_export *exp = info->mti_exp; - int rc; + struct mdt_device *mdt = info->mti_mdt; + struct md_object *next = mdt_object_child(o); + const struct mdt_body *reqbody = info->mti_body; + struct ptlrpc_request *req = mdt_info_req(info); + struct md_attr *ma = &info->mti_attr; + struct lu_attr *la = &ma->ma_attr; + struct req_capsule *pill = info->mti_pill; + const struct lu_env *env = info->mti_env; + struct mdt_body *repbody; + struct lu_buf *buffer = &info->mti_buf; + struct obd_export *exp = info->mti_exp; + ktime_t kstart = ktime_get(); + int rc; + ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) @@ -1244,20 +1441,20 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, } } - if (S_ISDIR(lu_object_attr(&next->mo_lu)) && + if (S_ISDIR(lu_object_attr(&next->mo_lu)) && reqbody->mbo_valid & OBD_MD_FLDIREA && - lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) { - /* get default stripe info for this dir. */ - ma->ma_need |= MA_LOV_DEF; - } - ma->ma_need |= ma_need; + lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) { + /* get default stripe info for this dir. */ + ma->ma_need |= MA_LOV_DEF; + } + ma->ma_need |= ma_need; rc = mdt_attr_get_complex(info, o, ma); if (unlikely(rc)) { - CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR, - "%s: getattr error for "DFID": rc = %d\n", - mdt_obd_name(info->mti_mdt), - PFID(mdt_object_fid(o)), rc); + CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR, + "%s: getattr error for "DFID": rc = %d\n", + mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(o)), rc); RETURN(rc); } @@ -1269,22 +1466,28 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->mbo_t_state = MS_RESTORE; } - if (likely(ma->ma_valid & MA_INODE)) - mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o)); - else - RETURN(-EFAULT); + if (unlikely(!(ma->ma_valid & MA_INODE))) + RETURN(-EFAULT); + + mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o)); - if (mdt_body_has_lov(la, reqbody)) { - if (ma->ma_valid & MA_LOV) { - LASSERT(ma->ma_lmm_size); + if (mdt_body_has_lov(la, reqbody)) { + u32 stripe_count = 1; + bool fixed_layout = false; + + if (ma->ma_valid & MA_LOV) { + LASSERT(ma->ma_lmm_size); repbody->mbo_eadatasize = ma->ma_lmm_size; if (S_ISDIR(la->la_mode)) repbody->mbo_valid |= OBD_MD_FLDIREA; else repbody->mbo_valid |= OBD_MD_FLEASIZE; mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid); - } + } if (ma->ma_valid & MA_LMV) { + struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1; + u32 magic = le32_to_cpu(lmv->lmv_magic); + /* Return -ENOTSUPP for old client */ if (!mdt_is_striped_client(req->rq_export)) RETURN(-ENOTSUPP); @@ -1293,6 +1496,14 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, mdt_dump_lmv(D_INFO, ma->ma_lmv); repbody->mbo_eadatasize = ma->ma_lmv_size; repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA); + + stripe_count = le32_to_cpu(lmv->lmv_stripe_count); + fixed_layout = lmv_is_fixed(lmv); + if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv)) + mdt_restripe_migrate_add(info, o); + else if (magic == LMV_MAGIC_V1 && + lmv_is_restriping(lmv)) + mdt_restripe_update_add(info, o); } if (ma->ma_valid & MA_LMV_DEF) { /* Return -ENOTSUPP for old client */ @@ -1309,6 +1520,19 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->mbo_valid |= (OBD_MD_FLDIREA | OBD_MD_DEFAULT_MEA); } + CDEBUG(D_VFSTRACE, + "dirent count %llu stripe count %u MDT count %d\n", + ma->ma_attr.la_dirent_count, stripe_count, + atomic_read(&mdt->mdt_mds_mds_conns) + 1); + if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET && + ma->ma_attr.la_dirent_count > + mdt->mdt_restriper.mdr_dir_split_count && + !fid_is_root(mdt_object_fid(o)) && + mdt->mdt_enable_dir_auto_split && + !o->mot_restriping && + stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 && + !fixed_layout) + mdt_auto_split_add(info, o); } else if (S_ISLNK(la->la_mode) && reqbody->mbo_valid & OBD_MD_LINKNAME) { buffer->lb_buf = ma->ma_lmm; @@ -1346,8 +1570,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, print_limit < rc ? "..." : "", print_limit, (char *)ma->ma_lmm + rc - print_limit, rc); rc = 0; - } - } + } + } if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) { repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize; @@ -1369,10 +1593,11 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, #endif out: - if (rc == 0) - mdt_counter_incr(req, LPROC_MDT_GETATTR); + if (rc == 0) + mdt_counter_incr(req, LPROC_MDT_GETATTR, + ktime_us_delta(ktime_get(), kstart)); - RETURN(rc); + RETURN(rc); } static int mdt_getattr(struct tgt_session_info *tsi) @@ -1433,6 +1658,7 @@ static int mdt_getattr(struct tgt_session_info *tsi) * enlarge the buffer when necessary. */ req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); + mdt_preset_encctx_size(info); rc = req_capsule_server_pack(pill); if (unlikely(rc != 0)) @@ -1450,6 +1676,10 @@ static int mdt_getattr(struct tgt_session_info *tsi) info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF); rc = mdt_getattr_internal(info, obj, 0); + if (unlikely(rc)) + GOTO(out_shrink, rc); + + rc = mdt_pack_encctx_in_reply(info, obj); EXIT; out_shrink: mdt_client_compatibility(info); @@ -1497,10 +1727,15 @@ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj, if (rc > 0) { /* not resent */ + __u64 lockpart = MDS_INODELOCK_LAYOUT; + + /* take layout lock to prepare layout change */ + if (layout->mlc_opc == MD_LAYOUT_WRITE) + lockpart |= MDS_INODELOCK_UPDATE; + mdt_lock_handle_init(lhc); mdt_lock_reg_init(lhc, LCK_EX); - rc = mdt_reint_object_lock(info, obj, lhc, MDS_INODELOCK_LAYOUT, - false); + rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false); if (rc) RETURN(rc); } @@ -1632,29 +1867,143 @@ out: static int mdt_raw_lookup(struct mdt_thread_info *info, struct mdt_object *parent, - const struct lu_name *lname, - struct ldlm_reply *ldlm_rep) + const struct lu_name *lname) { - struct lu_fid *child_fid = &info->mti_tmp_fid1; - int rc; + struct lu_fid *fid = &info->mti_tmp_fid1; + struct mdt_body *repbody; + bool is_dotdot = false; + bool is_old_parent_stripe = false; + bool is_new_parent_checked = false; + int rc; + ENTRY; LASSERT(!info->mti_cross_ref); + /* Always allow to lookup ".." */ + if (lname->ln_namelen == 2 && + lname->ln_name[0] == '.' && lname->ln_name[1] == '.') { + info->mti_spec.sp_permitted = 1; + is_dotdot = true; + if (mdt_is_dir_stripe(info, parent) == 1) + is_old_parent_stripe = true; + } + mdt_object_get(info->mti_env, parent); +lookup: /* Only got the fid of this obj by name */ - fid_zero(child_fid); - rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object), - lname, child_fid, &info->mti_spec); - if (rc == 0) { - struct mdt_body *repbody; + fid_zero(fid); + rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid, + &info->mti_spec); + mdt_object_put(info->mti_env, parent); + if (rc) + RETURN(rc); + + /* getattr_name("..") should return master object FID for striped dir */ + if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) { + parent = mdt_object_find(info->mti_env, info->mti_mdt, fid); + if (IS_ERR(parent)) + RETURN(PTR_ERR(parent)); + + /* old client getattr_name("..") with stripe FID */ + if (unlikely(is_old_parent_stripe)) { + is_old_parent_stripe = false; + goto lookup; + } + + /* ".." may be a stripe */ + if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) { + is_new_parent_checked = true; + goto lookup; + } + + mdt_object_put(info->mti_env, parent); + } + + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + repbody->mbo_fid1 = *fid; + repbody->mbo_valid = OBD_MD_FLID; + + RETURN(rc); +} + +/** + * Find name matching hash + * + * We search \a child LinkEA for a name whose hash matches \a lname + * (it contains an encoded hash). + * + * \param info mdt thread info + * \param lname encoded hash to find + * \param parent parent object + * \param child object to search with LinkEA + * + * \retval 1 match found + * \retval 0 no match found + * \retval -ev negative errno upon error + */ +int find_name_matching_hash(struct mdt_thread_info *info, struct lu_name *lname, + struct mdt_object *parent, struct mdt_object *child) +{ + /* Here, lname is an encoded hash of on-disk name, and + * client is doing access without encryption key. + * So we need to get LinkEA, check parent fid is correct and + * compare name hash with the one in the request. + */ + struct lu_buf *buf = &info->mti_big_buf; + struct lu_name name; + struct lu_fid pfid; + struct linkea_data ldata = { NULL }; + struct link_ea_header *leh; + struct link_ea_entry *lee; + struct lu_buf link = { 0 }; + char *hash; + int reclen, count, rc; + + ENTRY; + if (lname->ln_namelen < LL_CRYPTO_BLOCK_SIZE) + RETURN(-EINVAL); - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - repbody->mbo_fid1 = *child_fid; - repbody->mbo_valid = OBD_MD_FLID; - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); - } else if (rc == -ENOENT) { - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); + buf = lu_buf_check_and_alloc(buf, PATH_MAX); + if (!buf->lb_buf) + RETURN(-ENOMEM); + + ldata.ld_buf = buf; + rc = mdt_links_read(info, child, &ldata); + if (rc < 0) + RETURN(rc); + + hash = kmalloc(lname->ln_namelen, GFP_NOFS); + if (!hash) + RETURN(-ENOMEM); + rc = critical_decode(lname->ln_name, lname->ln_namelen, hash); + + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); + for (count = 0; count < leh->leh_reccount; count++) { + linkea_entry_unpack(lee, &reclen, &name, &pfid); + if (!parent || lu_fid_eq(&pfid, mdt_object_fid(parent))) { + lu_buf_check_and_alloc(&link, name.ln_namelen); + if (!link.lb_buf) + GOTO(out_match, rc = -ENOMEM); + rc = critical_decode(name.ln_name, name.ln_namelen, + link.lb_buf); + + if (memcmp(LLCRYPT_EXTRACT_DIGEST(link.lb_buf, rc), + hash, LL_CRYPTO_BLOCK_SIZE) == 0) { + *lname = name; + break; + } + } + lee = (struct link_ea_entry *) ((char *)lee + reclen); } + if (count == leh->leh_reccount) + rc = 0; + else + rc = 1; + +out_match: + lu_buf_free(&link); + kfree(hash); RETURN(rc); } @@ -1670,14 +2019,14 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, __u64 child_bits, struct ldlm_reply *ldlm_rep) { - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_body *reqbody = NULL; - struct mdt_object *parent = info->mti_object; - struct mdt_object *child; - struct lu_fid *child_fid = &info->mti_tmp_fid1; - struct lu_name *lname = NULL; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_body *reqbody = NULL; + struct mdt_object *parent = info->mti_object; + struct mdt_object *child = NULL; + struct lu_fid *child_fid = &info->mti_tmp_fid1; + struct lu_name *lname = NULL; struct mdt_lock_handle *lhp = NULL; - struct ldlm_lock *lock; + struct ldlm_lock *lock; struct req_capsule *pill = info->mti_pill; __u64 try_bits = 0; bool is_resent; @@ -1735,18 +2084,57 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, } rc = mdt_getattr_internal(info, child, 0); - if (unlikely(rc != 0)) + if (unlikely(rc != 0)) { mdt_object_unlock(info, child, lhc, 1); + RETURN(rc); + } - mdt_pack_secctx_in_reply(info, child); + rc = mdt_pack_secctx_in_reply(info, child); + if (unlikely(rc)) { + mdt_object_unlock(info, child, lhc, 1); + RETURN(rc); + } + rc = mdt_pack_encctx_in_reply(info, child); + if (unlikely(rc)) + mdt_object_unlock(info, child, lhc, 1); RETURN(rc); } lname = &info->mti_name; mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON); - if (lu_name_is_valid(lname)) { + if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) { + reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); + if (unlikely(reqbody == NULL)) + RETURN(err_serious(-EPROTO)); + + *child_fid = reqbody->mbo_fid2; + if (unlikely(!fid_is_sane(child_fid))) + RETURN(err_serious(-EINVAL)); + + if (lu_fid_eq(mdt_object_fid(parent), child_fid)) { + mdt_object_get(info->mti_env, parent); + child = parent; + } else { + child = mdt_object_find(info->mti_env, info->mti_mdt, + child_fid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + } + + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", " + "ldlm_rep = %p\n", + PFID(mdt_object_fid(parent)), + PFID(&reqbody->mbo_fid2), ldlm_rep); + } else if (lu_name_is_valid(lname)) { + if (mdt_object_remote(parent)) { + CERROR("%s: parent "DFID" is on remote target\n", + mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(parent))); + RETURN(-EPROTO); + } + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", " "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), PNAME(lname), ldlm_rep); @@ -1756,10 +2144,33 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, RETURN(err_serious(-EPROTO)); *child_fid = reqbody->mbo_fid2; - if (unlikely(!fid_is_sane(child_fid))) RETURN(err_serious(-EINVAL)); + if (lu_fid_eq(mdt_object_fid(parent), child_fid)) { + mdt_object_get(info->mti_env, parent); + child = parent; + } else { + child = mdt_object_find(info->mti_env, info->mti_mdt, + child_fid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + } + + if (mdt_object_remote(child)) { + CERROR("%s: child "DFID" is on remote target\n", + mdt_obd_name(info->mti_mdt), + PFID(mdt_object_fid(child))); + GOTO(out_child, rc = -EPROTO); + } + + /* don't fetch LOOKUP lock if it's remote object */ + rc = mdt_is_remote_object(info, parent, child); + if (rc < 0) + GOTO(out_child, rc); + if (rc) + child_bits &= ~MDS_INODELOCK_LOOKUP; + CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", " "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), @@ -1768,29 +2179,37 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD); - if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) { + if (unlikely(!mdt_object_exists(parent)) && + !(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) && + lu_name_is_valid(lname)) { LU_OBJECT_DEBUG(D_INODE, info->mti_env, - &parent->mot_obj, - "Parent doesn't exist!"); - RETURN(-ESTALE); - } - - if (mdt_object_remote(parent)) { - CERROR("%s: parent "DFID" is on remote target\n", - mdt_obd_name(info->mti_mdt), - PFID(mdt_object_fid(parent))); - RETURN(-EIO); + &parent->mot_obj, + "Parent doesn't exist!"); + GOTO(out_child, rc = -ESTALE); } - if (lu_name_is_valid(lname)) { - /* Always allow to lookup ".." */ - if (unlikely(lname->ln_namelen == 2 && - lname->ln_name[0] == '.' && - lname->ln_name[1] == '.')) - info->mti_spec.sp_permitted = 1; - + if (!child && is_resent) { + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); + if (lock == NULL) { + /* Lock is pinned by ldlm_handle_enqueue0() as it is + * a resend case, however, it could be already destroyed + * due to client eviction or a raced cancel RPC. + */ + LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx", + lhc->mlh_reg_lh.cookie); + RETURN(-ESTALE); + } + fid_extract_from_res_name(child_fid, + &lock->l_resource->lr_name); + LDLM_LOCK_PUT(lock); + child = mdt_object_find(info->mti_env, info->mti_mdt, + child_fid); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + } else if (!(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) && + lu_name_is_valid(lname)) { if (info->mti_body->mbo_valid == OBD_MD_FLID) { - rc = mdt_raw_lookup(info, parent, lname, ldlm_rep); + rc = mdt_raw_lookup(info, parent, lname); RETURN(rc); } @@ -1813,30 +2232,31 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); if (rc != 0) - GOTO(out_parent, rc); - } - - mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); + GOTO(unlock_parent, rc); - /* - *step 3: find the child object by fid & lock it. - * regardless if it is local or remote. - * - *Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e) - * set parent dir fid the same as child fid in getattr by fid case - * we should not lu_object_find() the object again, could lead - * to hung if there is a concurrent unlink destroyed the object. - */ - if (lu_fid_eq(mdt_object_fid(parent), child_fid)) { - mdt_object_get(info->mti_env, parent); - child = parent; - } else { child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid); + if (unlikely(IS_ERR(child))) + GOTO(unlock_parent, rc = PTR_ERR(child)); } - if (unlikely(IS_ERR(child))) - GOTO(out_parent, rc = PTR_ERR(child)); + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); + + /* step 3: lock child regardless if it is local or remote. */ + LASSERT(child); + + if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) { + /* Here, lname is an encoded hash of on-disk name, and + * client is doing access without encryption key. + * So we need to compare name hash with the one in the request. + */ + if (!find_name_matching_hash(info, lname, parent, + child)) { + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); + mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_POS); + GOTO(out_child, rc = -ENOENT); + } + } OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2); if (!mdt_object_exists(child)) { @@ -1854,7 +2274,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, mdt_lock_reg_init(lhc, LCK_PR); if (!(child_bits & MDS_INODELOCK_UPDATE) && - mdt_object_exists(child) && !mdt_object_remote(child)) { + !mdt_object_remote(child)) { struct md_attr *ma = &info->mti_attr; ma->ma_valid = 0; @@ -1868,12 +2288,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, * lock and this might save us RPC on later STAT. For * directories, it also let negative dentry cache start * working for this dir. */ - if (ma->ma_valid & MA_INODE && - ma->ma_attr.la_valid & LA_CTIME && - info->mti_mdt->mdt_namespace->ns_ctime_age_limit + - ma->ma_attr.la_ctime < ktime_get_real_seconds()) - child_bits |= MDS_INODELOCK_UPDATE; - } + if (ma->ma_valid & MA_INODE && + ma->ma_attr.la_valid & LA_CTIME && + info->mti_mdt->mdt_namespace->ns_ctime_age_limit + + ma->ma_attr.la_ctime < ktime_get_real_seconds()) + child_bits |= MDS_INODELOCK_UPDATE; + } /* layout lock must be granted in a best-effort way * for IT operations */ @@ -1911,11 +2331,24 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, /* finally, we can get attr for child. */ rc = mdt_getattr_internal(info, child, ma_need); if (unlikely(rc != 0)) { - mdt_object_unlock(info, child, lhc, 1); + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_child, rc); + } + + rc = mdt_pack_secctx_in_reply(info, child); + if (unlikely(rc)) { + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); GOTO(out_child, rc); } - mdt_pack_secctx_in_reply(info, child); + rc = mdt_pack_encctx_in_reply(info, child); + if (unlikely(rc)) { + if (!is_resent) + mdt_object_unlock(info, child, lhc, 1); + GOTO(out_child, rc); + } lock = ldlm_handle2lock(&lhc->mlh_reg_lh); if (lock) { @@ -1927,8 +2360,17 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, PLDLMRES(lock->l_resource), PFID(mdt_object_fid(child))); - if (mdt_object_exists(child) && - S_ISREG(lu_object_attr(&child->mot_obj)) && + if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) { + if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) + OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND, + req->rq_deadline - + req->rq_arrival_time.tv_sec + + cfs_fail_val ?: 3); + /* Put the lock to the waiting list and force the cancel */ + ldlm_set_ast_sent(lock); + } + + if (S_ISREG(lu_object_attr(&child->mot_obj)) && !mdt_object_remote(child) && child != parent) { mdt_object_put(info->mti_env, child); rc = mdt_pack_size2body(info, child_fid, @@ -1942,17 +2384,27 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, unlock_res_and_lock(lock); } LDLM_LOCK_PUT(lock); - GOTO(out_parent, rc = 0); + GOTO(unlock_parent, rc = 0); } LDLM_LOCK_PUT(lock); } EXIT; out_child: - mdt_object_put(info->mti_env, child); -out_parent: + if (child) + mdt_object_put(info->mti_env, child); +unlock_parent: if (lhp) mdt_object_unlock(info, parent, lhp, 1); + if (rc == -ENOENT) { + /* return -ENOKEY instead of -ENOENT to encryption-unaware + * client if trying to access an encrypted file + */ + int rc2 = mdt_check_enc(info, parent); + + if (rc2) + rc = rc2; + } return rc; } @@ -1960,37 +2412,38 @@ out_parent: static int mdt_getattr_name(struct tgt_session_info *tsi) { struct mdt_thread_info *info = tsi2mdt_info(tsi); - struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD]; - struct mdt_body *reqbody; - struct mdt_body *repbody; - int rc, rc2; - ENTRY; + struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD]; + struct mdt_body *reqbody; + struct mdt_body *repbody; + int rc, rc2; - reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); - LASSERT(reqbody != NULL); - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - LASSERT(repbody != NULL); + ENTRY; + + reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); + LASSERT(reqbody != NULL); + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + LASSERT(repbody != NULL); info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF); repbody->mbo_eadatasize = 0; repbody->mbo_aclsize = 0; - rc = mdt_init_ucred_intent_getattr(info, reqbody); - if (unlikely(rc)) - GOTO(out_shrink, rc); + rc = mdt_init_ucred(info, reqbody); + if (unlikely(rc)) + GOTO(out_shrink, rc); - rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL); - if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { - ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode); - lhc->mlh_reg_lh.cookie = 0; - } - mdt_exit_ucred(info); - EXIT; + rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL); + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { + ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode); + lhc->mlh_reg_lh.cookie = 0; + } + mdt_exit_ucred(info); + EXIT; out_shrink: - mdt_client_compatibility(info); - rc2 = mdt_fix_reply(info); - if (rc == 0) - rc = rc2; + mdt_client_compatibility(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; mdt_thread_info_fini(info); return rc; } @@ -2085,7 +2538,7 @@ static int mdt_rmfid_check_permission(struct mdt_thread_info *info, if (la->la_flags & LUSTRE_IMMUTABLE_FL) rc = -EACCES; - if (md_capable(uc, CFS_CAP_DAC_OVERRIDE)) + if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE)) RETURN(0); if (uc->uc_fsuid == la->la_uid) { if ((la->la_mode & S_IWUSR) == 0) @@ -2211,6 +2664,61 @@ out: static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg); +int mdt_io_set_info(struct tgt_session_info *tsi) +{ + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct ost_body *body = NULL, *repbody; + void *key, *val = NULL; + int keylen, vallen, rc = 0; + bool is_grant_shrink; + + ENTRY; + + key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY); + if (key == NULL) { + DEBUG_REQ(D_HA, req, "no set_info key"); + RETURN(err_serious(-EFAULT)); + } + keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY, + RCL_CLIENT); + + val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL); + if (val == NULL) { + DEBUG_REQ(D_HA, req, "no set_info val"); + RETURN(err_serious(-EFAULT)); + } + vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL, + RCL_CLIENT); + + is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK); + if (is_grant_shrink) + /* In this case the value is actually an RMF_OST_BODY, so we + * transmutate the type of this PTLRPC */ + req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO); + + rc = req_capsule_server_pack(tsi->tsi_pill); + if (rc < 0) + RETURN(rc); + + if (is_grant_shrink) { + body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY); + + repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY); + *repbody = *body; + + /** handle grant shrink, similar to a read request */ + tgt_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp, + &repbody->oa); + } else { + CERROR("%s: Unsupported key %s\n", + tgt_name(tsi->tsi_tgt), (char *)key); + rc = -EOPNOTSUPP; + } + + RETURN(rc); +} + + static int mdt_set_info(struct tgt_session_info *tsi) { struct ptlrpc_request *req = tgt_ses_req(tsi); @@ -2256,11 +2764,13 @@ static int mdt_set_info(struct tgt_session_info *tsi) tgt_name(tsi->tsi_tgt), vallen); RETURN(-EINVAL); } - if (ptlrpc_req_need_swab(req)) { + if (req_capsule_req_need_swab(&req->rq_pill)) { __swab64s(&cs->cs_recno); __swab32s(&cs->cs_id); } + if (!mdt_is_rootadmin(tsi2mdt_info(tsi))) + RETURN(-EACCES); rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export, vallen, val, NULL); } else if (KEY_IS(KEY_EVICT_BY_NID)) { @@ -2310,17 +2820,17 @@ static int mdt_readpage(struct tgt_session_info *tsi) exp_max_brw_size(tsi->tsi_exp)); rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT; - OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); - if (rdpg->rp_pages == NULL) - RETURN(-ENOMEM); + OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages); + if (rdpg->rp_pages == NULL) + RETURN(-ENOMEM); - for (i = 0; i < rdpg->rp_npages; ++i) { + for (i = 0; i < rdpg->rp_npages; ++i) { rdpg->rp_pages[i] = alloc_page(GFP_NOFS); - if (rdpg->rp_pages[i] == NULL) - GOTO(free_rdpg, rc = -ENOMEM); - } + if (rdpg->rp_pages[i] == NULL) + GOTO(free_rdpg, rc = -ENOMEM); + } - /* call lower layers to fill allocated pages with directory data */ + /* call lower layers to fill allocated pages with directory data */ rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg); if (rc < 0) GOTO(free_rdpg, rc); @@ -2334,7 +2844,7 @@ free_rdpg: for (i = 0; i < rdpg->rp_npages; i++) if (rdpg->rp_pages[i] != NULL) __free_page(rdpg->rp_pages[i]); - OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]); + OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages); if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) RETURN(0); @@ -2377,17 +2887,15 @@ static void mdt_preset_secctx_size(struct mdt_thread_info *info) req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME, RCL_CLIENT)) { if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME, - RCL_CLIENT) != 0) { + RCL_CLIENT) != 0) /* pre-set size in server part with max size */ req_capsule_set_size(pill, &RMF_FILE_SECCTX, RCL_SERVER, - info->mti_mdt->mdt_max_ea_size); - } else { + OBD_MAX_DEFAULT_EA_SIZE); + else req_capsule_set_size(pill, &RMF_FILE_SECCTX, RCL_SERVER, 0); - } } - } static int mdt_reint_internal(struct mdt_thread_info *info, @@ -2429,6 +2937,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); mdt_preset_secctx_size(info); + mdt_preset_encctx_size(info); rc = req_capsule_server_pack(pill); if (rc != 0) { @@ -2479,14 +2988,11 @@ out_shrink: /* * Data-on-MDT optimization - read data along with OPEN and return it - * in reply. Do that only if we have both DOM and LAYOUT locks. + * in reply when possible. */ - if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req) && - info->mti_attr.ma_lmm != NULL && - mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) { + if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req)) rc = mdt_dom_read_on_open(info, info->mti_mdt, &lhc->mlh_reg_lh); - } return rc; } @@ -2598,6 +3104,7 @@ static int mdt_sync(struct tgt_session_info *tsi) struct ptlrpc_request *req = tgt_ses_req(tsi); struct req_capsule *pill = tsi->tsi_pill; struct mdt_body *body; + ktime_t kstart = ktime_get(); int rc; ENTRY; @@ -2634,7 +3141,8 @@ static int mdt_sync(struct tgt_session_info *tsi) mdt_thread_info_fini(info); } if (rc == 0) - mdt_counter_incr(req, LPROC_MDT_SYNC); + mdt_counter_incr(req, LPROC_MDT_SYNC, + ktime_us_delta(ktime_get(), kstart)); RETURN(rc); } @@ -2693,17 +3201,17 @@ put: */ static int mdt_quotactl(struct tgt_session_info *tsi) { - struct obd_export *exp = tsi->tsi_exp; - struct req_capsule *pill = tsi->tsi_pill; - struct obd_quotactl *oqctl, *repoqc; - int id, rc; - struct mdt_device *mdt = mdt_exp2dev(exp); - struct lu_device *qmt = mdt->mdt_qmt_dev; - struct lu_nodemap *nodemap; + struct obd_export *exp = tsi->tsi_exp; + struct req_capsule *pill = tsi->tsi_pill; + struct obd_quotactl *oqctl, *repoqc; + int id, rc; + struct mdt_device *mdt = mdt_exp2dev(exp); + struct lu_device *qmt = mdt->mdt_qmt_dev; + struct lu_nodemap *nodemap; ENTRY; oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL); - if (oqctl == NULL) + if (!oqctl) RETURN(err_serious(-EPROTO)); rc = req_capsule_server_pack(pill); @@ -2719,22 +3227,32 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case Q_SETINFO: case Q_SETQUOTA: case LUSTRE_Q_SETDEFAULT: - if (!nodemap_can_setquota(nodemap)) + case LUSTRE_Q_SETQUOTAPOOL: + case LUSTRE_Q_SETINFOPOOL: + case LUSTRE_Q_SETDEFAULT_POOL: + case LUSTRE_Q_DELETEQID: + if (!nodemap_can_setquota(nodemap, oqctl->qc_type, + oqctl->qc_id)) GOTO(out_nodemap, rc = -EPERM); - /* fallthrough */ + fallthrough; case Q_GETINFO: case Q_GETQUOTA: case LUSTRE_Q_GETDEFAULT: + case LUSTRE_Q_GETQUOTAPOOL: + case LUSTRE_Q_GETINFOPOOL: + case LUSTRE_Q_GETDEFAULT_POOL: if (qmt == NULL) GOTO(out_nodemap, rc = -EOPNOTSUPP); /* slave quotactl */ - /* fallthrough */ + fallthrough; case Q_GETOINFO: case Q_GETOQUOTA: break; default: - CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd); - GOTO(out_nodemap, rc = -EFAULT); + rc = -EFAULT; + CERROR("%s: unsupported quotactl command %d: rc = %d\n", + mdt_obd_name(mdt), oqctl->qc_cmd, rc); + GOTO(out_nodemap, rc); } id = oqctl->qc_id; @@ -2748,8 +3266,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi) NODEMAP_CLIENT_TO_FS, id); break; case PRJQUOTA: - /* todo: check/map project id */ - id = oqctl->qc_id; + id = nodemap_map_id(nodemap, NODEMAP_PROJID, + NODEMAP_CLIENT_TO_FS, id); break; default: GOTO(out_nodemap, rc = -EOPNOTSUPP); @@ -2777,6 +3295,13 @@ static int mdt_quotactl(struct tgt_session_info *tsi) case Q_GETQUOTA: case LUSTRE_Q_SETDEFAULT: case LUSTRE_Q_GETDEFAULT: + case LUSTRE_Q_SETQUOTAPOOL: + case LUSTRE_Q_GETQUOTAPOOL: + case LUSTRE_Q_SETINFOPOOL: + case LUSTRE_Q_GETINFOPOOL: + case LUSTRE_Q_SETDEFAULT_POOL: + case LUSTRE_Q_GETDEFAULT_POOL: + case LUSTRE_Q_DELETEQID: /* forward quotactl request to QMT */ rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl); break; @@ -2796,8 +3321,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi) if (oqctl->qc_id != id) swap(oqctl->qc_id, id); - *repoqc = *oqctl; - + QCTL_COPY(repoqc, oqctl); EXIT; out_nodemap: @@ -3097,10 +3621,9 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, rc = lu_env_init(&env, LCT_MD_THREAD); if (unlikely(rc != 0)) { - CWARN("%s: lu_env initialization failed, object" - "%p "DFID" is leaked!\n", + CWARN("%s: lu_env initialization failed, object %p "DFID" is leaked!: rc = %d\n", obd->obd_name, mo, - PFID(mdt_object_fid(mo))); + PFID(mdt_object_fid(mo)), rc); RETURN(rc); } @@ -3183,6 +3706,7 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti, einfo->ei_cb_cp = ldlm_completion_ast; einfo->ei_enq_slave = 0; einfo->ei_res_id = res_id; + einfo->ei_req_slot = 1; if (cache) { /* @@ -3219,10 +3743,9 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o, cache); } -static int mdt_object_local_lock(struct mdt_thread_info *info, - struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 *ibits, - __u64 trybits, bool cos_incompat) +int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o, + struct mdt_lock_handle *lh, __u64 *ibits, + __u64 trybits, bool cos_incompat) { struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; @@ -3306,6 +3829,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info, policy->l_inodebits.bits = *ibits; policy->l_inodebits.try_bits = trybits; + policy->l_inodebits.li_gid = lh->mlh_gid; /* * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is @@ -3372,8 +3896,9 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o, * object anyway XXX*/ if (lh->mlh_type == MDT_PDO_LOCK && lh->mlh_pdo_hash != 0) { - CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to" - "EX lock.\n", mdt_obd_name(info->mti_mdt), + CDEBUG(D_INFO, + "%s: "DFID" convert PDO lock to EX lock.\n", + mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(o))); lh->mlh_pdo_hash = 0; lh->mlh_rreg_mode = LCK_EX; @@ -3513,20 +4038,18 @@ static void mdt_save_remote_lock(struct mdt_thread_info *info, if (lustre_handle_is_used(h)) { struct ldlm_lock *lock = ldlm_handle2lock(h); + struct ptlrpc_request *req = mdt_info_req(info); if (o != NULL && (lock->l_policy_data.l_inodebits.bits & (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE))) mo_invalidate(info->mti_env, mdt_object_child(o)); - if (decref || !info->mti_has_trans || + if (decref || !info->mti_has_trans || !req || !(mode & (LCK_PW | LCK_EX))) { ldlm_lock_decref_and_cancel(h, mode); LDLM_LOCK_PUT(lock); } else { - struct ptlrpc_request *req = mdt_info_req(info); - - LASSERT(req != NULL); tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock, req->rq_transno); ldlm_lock_decref(h, mode); @@ -3675,6 +4198,7 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); mdt_preset_secctx_size(info); + mdt_preset_encctx_size(info); rc = req_capsule_server_pack(pill); if (rc) @@ -3736,12 +4260,11 @@ void mdt_thread_info_init(struct ptlrpc_request *req, info->mti_opdata = 0; info->mti_big_lmm_used = 0; info->mti_big_acl_used = 0; - info->mti_som_valid = 0; + info->mti_som_strict = 0; info->mti_spec.no_create = 0; info->mti_spec.sp_rm_entry = 0; info->mti_spec.sp_permitted = 0; - info->mti_spec.sp_migrate_close = 0; info->mti_spec.u.sp_ea.eadata = NULL; info->mti_spec.u.sp_ea.eadatalen = 0; @@ -3761,6 +4284,7 @@ void mdt_thread_info_fini(struct mdt_thread_info *info) info->mti_env = NULL; info->mti_pill = NULL; info->mti_exp = NULL; + info->mti_mdt = NULL; if (unlikely(info->mti_big_buf.lb_buf != NULL)) lu_buf_free(&info->mti_big_buf); @@ -3788,10 +4312,8 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi) { if (OBD_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) && cfs_fail_val == - tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) { - set_current_state(TASK_UNINTERRUPTIBLE); - schedule_timeout(cfs_time_seconds(3)); - } + tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) + schedule_timeout_uninterruptible(cfs_time_seconds(3)); return tgt_connect(tsi); } @@ -3823,25 +4345,29 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, /* If possible resent found a lock, @lh is set to its handle */ new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0); - if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) { - lh->mlh_reg_lh.cookie = 0; - RETURN(0); - } - - if (new_lock == NULL && (flags & LDLM_FL_RESENT)) { - /* Lock is pinned by ldlm_handle_enqueue0() as it is - * a resend case, however, it could be already destroyed - * due to client eviction or a raced cancel RPC. */ - LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n", - lh->mlh_reg_lh.cookie); + if (new_lock == NULL) { + if (flags & LDLM_FL_INTENT_ONLY) { + result = 0; + } else if (flags & LDLM_FL_RESENT) { + /* Lock is pinned by ldlm_handle_enqueue0() as it is a + * resend case, however, it could be already destroyed + * due to client eviction or a raced cancel RPC. + */ + LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n", + lh->mlh_reg_lh.cookie); + result = -ESTALE; + } else { + CERROR("%s: Invalid lockh=%#llx flags=%#llx fid1="DFID" fid2="DFID": rc = %d\n", + mdt_obd_name(info->mti_mdt), + lh->mlh_reg_lh.cookie, flags, + PFID(&info->mti_tmp_fid1), + PFID(&info->mti_tmp_fid2), result); + result = -ESTALE; + } lh->mlh_reg_lh.cookie = 0; - RETURN(-ESTALE); + RETURN(result); } - LASSERTF(new_lock != NULL, - "lockh %#llx flags %#llx : rc = %d\n", - lh->mlh_reg_lh.cookie, flags, result); - /* * If we've already given this lock to a client once, then we should * have no readers or writers. Otherwise, we should have one reader @@ -3936,7 +4462,7 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info, * If the xid matches, then we know this is a resent request, and allow * it. (It's probably an OPEN, for which we don't send a lock. */ - if (req_can_reconstruct(req, NULL) == 1) + if (req_can_reconstruct(req, NULL) != 0) return; /* @@ -4040,7 +4566,7 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, GOTO(out_shrink, rc = -EINVAL); } - rc = mdt_init_ucred_intent_getattr(info, reqbody); + rc = mdt_init_ucred(info, reqbody); if (rc) GOTO(out_shrink, rc); @@ -4053,13 +4579,14 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc, rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep); ldlm_rep->lock_policy_res2 = clear_serious(rc); - if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG)) - ldlm_rep->lock_policy_res2 = 0; - if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) || - ldlm_rep->lock_policy_res2) { - lhc->mlh_reg_lh.cookie = 0ull; - GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED); - } + if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG) && + ldlm_rep->lock_policy_res2 != -ENOKEY) + ldlm_rep->lock_policy_res2 = 0; + if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) || + ldlm_rep->lock_policy_res2) { + lhc->mlh_reg_lh.cookie = 0ull; + GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED); + } rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc); EXIT; @@ -4222,6 +4749,7 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc, struct ldlm_reply *rep = NULL; long opc; int rc; + struct ptlrpc_request *req = mdt_info_req(info); static const struct req_format *intent_fmts[REINT_MAX] = { [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE, @@ -4239,6 +4767,9 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc, rc = mdt_reint_internal(info, lhc, opc); + if (rc < 0 && lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + DEBUG_REQ(D_ERROR, req, "Replay open failed with %d", rc); + /* Check whether the reply has been packed successfully. */ if (mdt_info_req(info)->rq_repmsg != NULL) rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP); @@ -4319,7 +4850,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc, break; case IT_GETATTR: check_mdt_object = true; - /* fallthrough */ + fallthrough; case IT_LOOKUP: it_format = &RQF_LDLM_INTENT_GETATTR; it_handler = &mdt_intent_getattr; @@ -4451,6 +4982,23 @@ static int mdt_intent_policy(const struct lu_env *env, } else { rc = err_serious(-EFAULT); } + } else if (ldesc->l_resource.lr_type == LDLM_IBITS && + ldesc->l_policy_data.l_inodebits.bits == MDS_INODELOCK_DOM) { + struct ldlm_reply *rep; + + /* No intent was provided but INTENT flag is set along with + * DOM bit, this is considered as GLIMPSE request. + * This logic is common for MDT and OST glimpse + */ + mdt_ptlrpc_stats_update(req, IT_GLIMPSE); + rc = mdt_glimpse_enqueue(info, ns, lockp, flags); + /* Check whether the reply has been packed successfully. */ + if (req->rq_repmsg != NULL) { + rep = req_capsule_server_get(info->mti_pill, + &RMF_DLM_REP); + rep->lock_policy_res2 = + ptlrpc_status_hton(rep->lock_policy_res2); + } } else { /* No intent was provided */ req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0); @@ -4583,9 +5131,8 @@ out_free: */ static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt) { - struct seq_server_site *ss = mdt_seq_site(mdt); - int rc; - char *prefix; + struct seq_server_site *ss = mdt_seq_site(mdt); + char *prefix; ENTRY; /* check if this is adding the first MDC and controller is not yet @@ -4603,19 +5150,12 @@ static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt) /* Note: seq_client_fini will be called in seq_site_fini */ snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt)); - rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA, - prefix, ss->ss_node_id == 0 ? ss->ss_control_seq : + seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA, + prefix, ss->ss_node_id == 0 ? ss->ss_control_seq : NULL); OBD_FREE(prefix, MAX_OBD_NAME + 5); - if (rc != 0) { - OBD_FREE_PTR(ss->ss_client_seq); - ss->ss_client_seq = NULL; - RETURN(rc); - } - rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq); - - RETURN(rc); + RETURN(seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq)); } static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt) @@ -5147,6 +5687,16 @@ static int mdt_tgt_getxattr(struct tgt_session_info *tsi) return rc; } +static int mdt_llog_open(struct tgt_session_info *tsi) +{ + ENTRY; + + if (!mdt_is_rootadmin(tsi2mdt_info(tsi))) + RETURN(err_serious(-EACCES)); + + RETURN(tgt_llog_open(tsi)); +} + #define OBD_FAIL_OST_READ_NET OBD_FAIL_OST_BRW_NET #define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET #define OST_BRW_READ OST_READ @@ -5161,7 +5711,7 @@ TGT_RPC_HANDLER(MDS_FIRST_OPC, &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION), TGT_RPC_HANDLER(MDS_FIRST_OPC, HAS_REPLY, MDS_SET_INFO, mdt_set_info, - &RQF_OBD_SET_INFO, LUSTRE_MDS_VERSION), + &RQF_MDT_SET_INFO, LUSTRE_MDS_VERSION), TGT_MDT_HDL(0, MDS_GET_INFO, mdt_get_info), TGT_MDT_HDL(HAS_REPLY, MDS_GET_ROOT, mdt_get_root), TGT_MDT_HDL(HAS_BODY, MDS_GETATTR, mdt_getattr), @@ -5202,6 +5752,12 @@ TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_PUNCH, mdt_punch_hdl, mdt_hp_punch), TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC, mdt_data_sync), +TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_FALLOCATE, + mdt_fallocate_hdl), +TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SEEK, tgt_lseek), +TGT_RPC_HANDLER(OST_FIRST_OPC, + 0, OST_SET_INFO, mdt_io_set_info, + &RQF_OBD_SET_INFO, LUSTRE_OST_VERSION), }; static struct tgt_handler mdt_sec_ctx_ops[] = { @@ -5214,6 +5770,13 @@ static struct tgt_handler mdt_quota_ops[] = { TGT_QUOTA_HDL(HAS_REPLY, QUOTA_DQACQ, mdt_quota_dqacq), }; +static struct tgt_handler mdt_llog_handlers[] = { + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CREATE, mdt_llog_open), + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_NEXT_BLOCK, tgt_llog_next_block), + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header), + TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_PREV_BLOCK, tgt_llog_prev_block), +}; + static struct tgt_opc_slice mdt_common_slice[] = { { .tos_opc_start = MDS_FIRST_OPC, @@ -5258,7 +5821,7 @@ static struct tgt_opc_slice mdt_common_slice[] = { { .tos_opc_start = LLOG_FIRST_OPC, .tos_opc_end = LLOG_LAST_OPC, - .tos_hs = tgt_llog_handlers + .tos_hs = mdt_llog_handlers }, { .tos_opc_start = LFSCK_FIRST_OPC, @@ -5288,6 +5851,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop); mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child)); + + mdt_restriper_stop(m); ping_evictor_stop(); /* Remove the HSM /proc entry so the coordinator cannot be @@ -5407,17 +5972,21 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, RETURN(-EFAULT); } else { lsi = s2lsi(lmi->lmi_sb); + LASSERT(lsi->lsi_lmd); /* CMD is supported only in IAM mode */ LASSERT(num); - node_id = simple_strtol(num, NULL, 10); + rc = kstrtol(num, 10, &node_id); + if (rc) + RETURN(rc); + obd->u.obt.obt_magic = OBT_MAGIC; - if (lsi->lsi_lmd != NULL && - lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK) + if (lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK) m->mdt_skip_lfsck = 1; } - /* DoM files get IO lock at open by default */ - m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN; + /* Just try to get a DoM lock by default. Otherwise, having a group + * lock granted, it may get blocked for a long time. */ + m->mdt_opts.mo_dom_lock = TRYLOCK_DOM_ON_OPEN; /* DoM files are read at open and data is packed in the reply */ m->mdt_opts.mo_dom_read_open = 1; @@ -5426,11 +5995,19 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids); spin_lock_init(&m->mdt_squash.rsi_lock); spin_lock_init(&m->mdt_lock); - m->mdt_enable_remote_dir = 1; - m->mdt_enable_striped_dir = 1; + m->mdt_enable_chprojid_gid = 0; m->mdt_enable_dir_migration = 1; + m->mdt_enable_dir_restripe = 0; + m->mdt_enable_dir_auto_split = 0; + m->mdt_enable_parallel_rename_dir = 1; + m->mdt_enable_parallel_rename_file = 1; + m->mdt_enable_remote_dir = 1; m->mdt_enable_remote_dir_gid = 0; m->mdt_enable_remote_rename = 1; + m->mdt_enable_remote_subdir_mount = 1; + m->mdt_enable_striped_dir = 1; + m->mdt_dir_restripe_nsonly = 1; + m->mdt_rename_stats.rs_init = ktime_get(); atomic_set(&m->mdt_mds_mds_conns, 0); atomic_set(&m->mdt_async_commit_count, 0); @@ -5486,8 +6063,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, LDLM_NAMESPACE_SERVER, LDLM_NAMESPACE_GREEDY, LDLM_NS_TYPE_MDT); - if (m->mdt_namespace == NULL) - GOTO(err_fini_seq, rc = -ENOMEM); + if (IS_ERR(m->mdt_namespace)) { + rc = PTR_ERR(m->mdt_namespace); + CERROR("%s: unable to create server namespace: rc = %d\n", + obd->obd_name, rc); + m->mdt_namespace = NULL; + GOTO(err_fini_seq, rc); + } m->mdt_namespace->ns_lvbp = m; m->mdt_namespace->ns_lvbo = &mdt_lvbo; @@ -5503,8 +6085,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, GOTO(err_free_ns, rc); /* Amount of available space excluded from granting and reserved - * for metadata. It is in percentage and 50% is default value. */ - tgd->tgd_reserved_pcnt = 50; + * for metadata. It is a percentage of the total MDT size. */ + tgd->tgd_reserved_pcnt = 10; if (ONE_MB_BRW_SIZE < (1U << tgd->tgd_blockbits)) m->mdt_brw_size = 1U << tgd->tgd_blockbits; @@ -5548,6 +6130,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, else m->mdt_opts.mo_acl = 0; + m->mdt_opts.mo_enable_strict_som = 1; + /* XXX: to support suppgid for ACL, we enable identity_upcall * by default, otherwise, maybe got unexpected -EACCESS. */ if (m->mdt_opts.mo_acl) @@ -5588,7 +6172,17 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT) ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT; + if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_LOCAL_RECOV)) + m->mdt_lut.lut_local_recovery = 1; + + rc = mdt_restriper_start(m); + if (rc) + GOTO(err_ping_evictor, rc); + RETURN(0); + +err_ping_evictor: + ping_evictor_stop(); err_procfs: mdt_tunables_fini(m); err_recovery: @@ -5738,6 +6332,11 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env, init_rwsem(&mo->mot_dom_sem); init_rwsem(&mo->mot_open_sem); atomic_set(&mo->mot_open_count, 0); + mo->mot_restripe_offset = 0; + INIT_LIST_HEAD(&mo->mot_restripe_linkage); + mo->mot_lsom_size = 0; + mo->mot_lsom_blocks = 0; + mo->mot_lsom_inited = false; RETURN(o); } RETURN(NULL); @@ -5765,22 +6364,31 @@ static int mdt_object_init(const struct lu_env *env, struct lu_object *o, RETURN(rc); } +static void mdt_object_free_rcu(struct rcu_head *head) +{ + struct mdt_object *mo = container_of(head, struct mdt_object, + mot_header.loh_rcu); + + kmem_cache_free(mdt_object_kmem, mo); +} + static void mdt_object_free(const struct lu_env *env, struct lu_object *o) { - struct mdt_object *mo = mdt_obj(o); - struct lu_object_header *h; - ENTRY; + struct mdt_object *mo = mdt_obj(o); + struct lu_object_header *h; + ENTRY; - h = o->lo_header; - CDEBUG(D_INFO, "object free, fid = "DFID"\n", - PFID(lu_object_fid(o))); + h = o->lo_header; + CDEBUG(D_INFO, "object free, fid = "DFID"\n", + PFID(lu_object_fid(o))); LASSERT(atomic_read(&mo->mot_open_count) == 0); LASSERT(atomic_read(&mo->mot_lease_count) == 0); lu_object_fini(o); lu_object_header_fini(h); - OBD_SLAB_FREE_PTR(mo, mdt_object_kmem); + OBD_FREE_PRE(mo, sizeof(*mo), "slab-freed"); + call_rcu(&mo->mot_header.loh_rcu, mdt_object_free_rcu); EXIT; } @@ -5879,6 +6487,18 @@ static int mdt_obd_set_info_async(const struct lu_env *env, RETURN(0); } +static inline void mdt_enable_slc(struct mdt_device *mdt) +{ + if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER) + mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING; +} + +static inline void mdt_disable_slc(struct mdt_device *mdt) +{ + if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING) + mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER; +} + /** * Match client and server connection feature flags. * @@ -6015,11 +6635,7 @@ static int mdt_connect_internal(const struct lu_env *env, if (OCD_HAS_FLAG(data, CKSUM)) { __u32 cksum_types = data->ocd_cksum_types; - /* The client set in ocd_cksum_types the checksum types it - * supports. We have to mask off the algorithms that we don't - * support */ - data->ocd_cksum_types &= - obd_cksum_types_supported_server(obd_name); + tgt_mask_cksum_types(&mdt->mdt_lut, &data->ocd_cksum_types); if (unlikely(data->ocd_cksum_types == 0)) { CERROR("%s: Connect with checksum support but no " @@ -6039,6 +6655,15 @@ static int mdt_connect_internal(const struct lu_env *env, exp->exp_obd->obd_name, obd_export_nid2str(exp)); } + if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) && + !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) { + atomic_inc(&mdt->mdt_mds_mds_conns); + mdt_enable_slc(mdt); + } + + if (!mdt->mdt_lut.lut_dt_conf.ddp_has_lseek_data_hole) + data->ocd_connect_flags2 &= ~OBD_CONNECT2_LSEEK; + return 0; } @@ -6069,7 +6694,7 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env, static int mdt_export_cleanup(struct obd_export *exp) { - struct list_head closing_list; + LIST_HEAD(closing_list); struct mdt_export_data *med = &exp->exp_mdt_data; struct obd_device *obd = exp->exp_obd; struct mdt_device *mdt; @@ -6079,7 +6704,6 @@ static int mdt_export_cleanup(struct obd_export *exp) int rc = 0; ENTRY; - INIT_LIST_HEAD(&closing_list); spin_lock(&med->med_open_lock); while (!list_empty(&med->med_open_head)) { struct list_head *tmp = med->med_open_head.next; @@ -6132,7 +6756,8 @@ static int mdt_export_cleanup(struct obd_export *exp) rc = mdt_ctxt_add_dirty_flag(&env, info, mfd); /* Don't unlink orphan on failover umount, LU-184 */ - if (exp->exp_flags & OBD_OPT_FAILOVER) { + if (exp->exp_flags & OBD_OPT_FAILOVER || + exp->exp_obd->obd_stopping) { ma->ma_valid = MA_FLAGS; ma->ma_attr_flags |= MDS_KEEP_ORPHAN; } @@ -6150,18 +6775,6 @@ static int mdt_export_cleanup(struct obd_export *exp) RETURN(rc); } -static inline void mdt_enable_slc(struct mdt_device *mdt) -{ - if (mdt->mdt_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL) - mdt->mdt_lut.lut_sync_lock_cancel = BLOCKING_SYNC_ON_CANCEL; -} - -static inline void mdt_disable_slc(struct mdt_device *mdt) -{ - if (mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL) - mdt->mdt_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL; -} - static int mdt_obd_disconnect(struct obd_export *exp) { int rc; @@ -6216,12 +6829,6 @@ static int mdt_obd_connect(const struct lu_env *env, mdt = mdt_dev(obd->obd_lu_dev); - if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) && - !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) { - atomic_inc(&mdt->mdt_mds_mds_conns); - mdt_enable_slc(mdt); - } - /* * first, check whether the stack is ready to handle requests * XXX: probably not very appropriate method is used now @@ -6434,18 +7041,19 @@ static int mdt_path_current(struct mdt_thread_info *info, struct getinfo_fid2path *fp, struct lu_fid *root_fid) { - struct mdt_device *mdt = info->mti_mdt; - struct mdt_object *mdt_obj; - struct link_ea_header *leh; - struct link_ea_entry *lee; - struct lu_name *tmpname = &info->mti_name; - struct lu_fid *tmpfid = &info->mti_tmp_fid1; - struct lu_buf *buf = &info->mti_big_buf; - char *ptr; - int reclen; - struct linkea_data ldata = { NULL }; - int rc = 0; - bool first = true; + struct mdt_device *mdt = info->mti_mdt; + struct lu_name *tmpname = &info->mti_name; + struct lu_fid *tmpfid = &info->mti_tmp_fid1; + struct lu_buf *buf = &info->mti_big_buf; + struct linkea_data ldata = { NULL }; + bool first = true; + struct mdt_object *mdt_obj; + struct link_ea_header *leh; + struct link_ea_entry *lee; + char *ptr; + int reclen; + int rc = 0; + ENTRY; /* temp buffer for path element, the buffer will be finally freed @@ -6461,8 +7069,6 @@ static int mdt_path_current(struct mdt_thread_info *info, *tmpfid = fp->gf_fid = *mdt_object_fid(obj); while (!lu_fid_eq(root_fid, &fp->gf_fid)) { - struct lu_buf lmv_buf; - if (!lu_fid_eq(root_fid, &mdt->mdt_md_root_fid) && lu_fid_eq(&mdt->mdt_md_root_fid, &fp->gf_fid)) GOTO(out, rc = -ENOENT); @@ -6510,26 +7116,16 @@ static int mdt_path_current(struct mdt_thread_info *info, fp->gf_linkno++; } - lmv_buf.lb_buf = info->mti_xattr_buf; - lmv_buf.lb_len = sizeof(info->mti_xattr_buf); /* Check if it is slave stripes */ - rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj), - &lmv_buf, XATTR_NAME_LMV); + rc = mdt_is_dir_stripe(info, mdt_obj); mdt_object_put(info->mti_env, mdt_obj); - if (rc > 0) { - union lmv_mds_md *lmm = lmv_buf.lb_buf; - - /* For slave stripes, get its master */ - if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) { - fp->gf_fid = *tmpfid; - continue; - } - } else if (rc < 0 && rc != -ENODATA) { + if (rc < 0) GOTO(out, rc); + if (rc == 1) { + fp->gf_fid = *tmpfid; + continue; } - rc = 0; - /* Pack the name in the end of the buffer */ ptr -= tmpname->ln_namelen; if (ptr - 1 <= fp->gf_u.gf_path) @@ -6545,6 +7141,9 @@ static int mdt_path_current(struct mdt_thread_info *info, first = false; } + /* non-zero will be treated as an error */ + rc = 0; + remote_out: ptr++; /* skip leading / */ memmove(fp->gf_u.gf_path, ptr, @@ -6641,12 +7240,24 @@ static int mdt_fid2path(struct mdt_thread_info *info, RETURN(rc); } - if (mdt_object_remote(obj)) + if (mdt_object_remote(obj)) { rc = -EREMOTE; - else if (!mdt_object_exists(obj)) + } else if (!mdt_object_exists(obj)) { rc = -ENOENT; - else - rc = 0; + } else { + struct lu_attr la = { 0 }; + struct dt_object *dt = mdt_obj2dt(obj); + + if (dt && dt->do_ops && dt->do_ops->do_attr_get) + dt_attr_get(info->mti_env, mdt_obj2dt(obj), &la); + if (la.la_valid & LA_FLAGS && la.la_flags & LUSTRE_ENCRYPT_FL) + /* path resolution cannot be carried out on server + * side for encrypted files + */ + rc = -ENODATA; + else + rc = 0; + } if (rc < 0) { mdt_object_put(info->mti_env, obj); @@ -6676,7 +7287,7 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, int keylen, fpin = key + cfs_size_round(sizeof(KEY_FID2PATH)); fpout = val; - if (ptlrpc_req_need_swab(info->mti_pill->rc_req)) + if (req_capsule_req_need_swab(info->mti_pill)) lustre_swab_fid2path(fpin); memcpy(fpout, fpin, sizeof(*fpin)); @@ -6687,7 +7298,7 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, int keylen, sizeof(struct lu_fid)) { /* client sent its root FID, which is normally fileset FID */ root_fid = fpin->gf_u.gf_root_fid; - if (ptlrpc_req_need_swab(info->mti_pill->rc_req)) + if (req_capsule_req_need_swab(info->mti_pill)) lustre_swab_lu_fid(root_fid); if (root_fid != NULL && !fid_is_sane(root_fid)) @@ -6819,12 +7430,21 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, if (rc == 0) rc = dt_ro(&env, dt); break; - case OBD_IOC_ABORT_RECOVERY: + case OBD_IOC_ABORT_RECOVERY: { + struct obd_ioctl_data *data = karg; + CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt)); - obd->obd_abort_recovery = 1; - target_stop_recovery_thread(obd); + if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT) { + obd->obd_abort_recov_mdt = 1; + wake_up(&obd->obd_next_transno_waitq); + } else { /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */ + /* lctl didn't set OBD_FLG_ABORT_RECOV_OST < 2.13.57 */ + obd->obd_abort_recovery = 1; + target_stop_recovery_thread(obd); + } rc = 0; break; + } case OBD_IOC_CHANGELOG_REG: case OBD_IOC_CHANGELOG_DEREG: case OBD_IOC_CHANGELOG_CLEAR: @@ -6937,7 +7557,7 @@ static int mdt_obd_postrecov(struct obd_device *obd) return rc; } -static struct obd_ops mdt_obd_device_ops = { +static const struct obd_ops mdt_obd_device_ops = { .o_owner = THIS_MODULE, .o_set_info_async = mdt_obd_set_info_async, .o_connect = mdt_obd_connect, @@ -7069,10 +7689,10 @@ int mdt_cos_is_enabled(struct mdt_device *mdt) return mdt->mdt_opts.mo_cos != 0; } -static struct lu_device_type_operations mdt_device_type_ops = { - .ldto_device_alloc = mdt_device_alloc, - .ldto_device_free = mdt_device_free, - .ldto_device_fini = mdt_device_fini +static const struct lu_device_type_operations mdt_device_type_ops = { + .ldto_device_alloc = mdt_device_alloc, + .ldto_device_free = mdt_device_free, + .ldto_device_fini = mdt_device_fini }; static struct lu_device_type mdt_device_type = { @@ -7098,7 +7718,7 @@ static int __init mdt_init(void) if (rc) GOTO(lu_fini, rc); - rc = class_register_type(&mdt_obd_device_ops, NULL, true, NULL, + rc = class_register_type(&mdt_obd_device_ops, NULL, true, LUSTRE_MDT_NAME, &mdt_device_type); if (rc) GOTO(mds_fini, rc);