Whamcloud - gitweb
LU-15938 lod: prevent endless retry in recovery thread
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 29a4f51..e939c6e 100644 (file)
@@ -63,6 +63,7 @@
 #include <lustre_barrier.h>
 #include <obd_cksum.h>
 #include <llog_swab.h>
+#include <lustre_crypto.h>
 
 #include "mdt_internal.h"
 
@@ -270,6 +271,41 @@ static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o
         EXIT;
 }
 
+/**
+ * Check whether \a o is directory stripe object.
+ *
+ * \param[in]  info    thread environment
+ * \param[in]  o       MDT object
+ *
+ * \retval 1   is directory stripe.
+ * \retval 0   isn't directory stripe.
+ * \retval < 1  error code
+ */
+static int mdt_is_dir_stripe(struct mdt_thread_info *info,
+                               struct mdt_object *o)
+{
+       struct md_attr *ma = &info->mti_attr;
+       struct lmv_mds_md_v1 *lmv;
+       int rc;
+
+       rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+       if (rc < 0)
+               return rc;
+
+       if (!(ma->ma_valid & MA_LMV))
+               return 0;
+
+       lmv = &ma->ma_lmv->lmv_md_v1;
+
+       if (!lmv_is_sane2(lmv))
+               return -EBADF;
+
+       if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE)
+               return 1;
+
+       return 0;
+}
+
 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                              struct lu_fid *fid)
 {
@@ -765,8 +801,9 @@ static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                         const struct lu_attr *attr, const struct lu_fid *fid)
 {
-       struct md_attr *ma = &info->mti_attr;
+       struct mdt_device *mdt = info->mti_mdt;
        struct obd_export *exp = info->mti_exp;
+       struct md_attr *ma = &info->mti_attr;
        struct lu_nodemap *nodemap = NULL;
 
        LASSERT(ma->ma_valid & MA_INODE);
@@ -795,7 +832,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                b->mbo_nlink = attr->la_nlink;
                b->mbo_valid |= OBD_MD_FLNLINK;
        }
-       if (attr->la_valid & (LA_UID|LA_GID)) {
+       if (attr->la_valid & (LA_UID|LA_GID|LA_PROJID)) {
                nodemap = nodemap_get_from_exp(exp);
                if (IS_ERR(nodemap))
                        goto out;
@@ -814,8 +851,9 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
        }
 
        if (attr->la_valid & LA_PROJID) {
-               /* TODO, nodemap for project id */
-               b->mbo_projid = attr->la_projid;
+               b->mbo_projid = nodemap_map_id(nodemap, NODEMAP_PROJID,
+                                              NODEMAP_FS_TO_CLIENT,
+                                              attr->la_projid);
                b->mbo_valid |= OBD_MD_FLPROJID;
        }
 
@@ -860,7 +898,9 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                        else
                                b->mbo_blocks = 1;
                        b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
-               } else if (info->mti_som_valid) { /* som is valid */
+               } else if (info->mti_som_strict &&
+                          mdt->mdt_opts.mo_enable_strict_som) {
+                       /* use SOM for size*/
                        b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
                } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
                        b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
@@ -1284,6 +1324,20 @@ out:
        RETURN(rc);
 }
 
+static void mdt_preset_encctx_size(struct mdt_thread_info *info)
+{
+       struct req_capsule *pill = info->mti_pill;
+
+       ENTRY;
+       if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
+                                 RCL_SERVER))
+               /* pre-set size in server part with max size */
+               req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
+                                    RCL_SERVER,
+                                    info->mti_mdt->mdt_max_mdsize);
+       EXIT;
+}
+
 static int mdt_getattr_internal(struct mdt_thread_info *info,
                                struct mdt_object *o, int ma_need)
 {
@@ -1419,6 +1473,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
        if (mdt_body_has_lov(la, reqbody)) {
                u32 stripe_count = 1;
+               bool fixed_layout = false;
 
                if (ma->ma_valid & MA_LOV) {
                        LASSERT(ma->ma_lmm_size);
@@ -1443,6 +1498,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
 
                        stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+                       fixed_layout = lmv_is_fixed(lmv);
                        if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
                                mdt_restripe_migrate_add(info, o);
                        else if (magic == LMV_MAGIC_V1 &&
@@ -1474,7 +1530,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                    !fid_is_root(mdt_object_fid(o)) &&
                    mdt->mdt_enable_dir_auto_split &&
                    !o->mot_restriping &&
-                   stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1)
+                   stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 &&
+                   !fixed_layout)
                        mdt_auto_split_add(info, o);
        } else if (S_ISLNK(la->la_mode) &&
                   reqbody->mbo_valid & OBD_MD_LINKNAME) {
@@ -1601,6 +1658,7 @@ static int mdt_getattr(struct tgt_session_info *tsi)
         * enlarge the buffer when necessary. */
        req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
                             LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+       mdt_preset_encctx_size(info);
 
        rc = req_capsule_server_pack(pill);
        if (unlikely(rc != 0))
@@ -1618,6 +1676,10 @@ static int mdt_getattr(struct tgt_session_info *tsi)
        info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
 
        rc = mdt_getattr_internal(info, obj, 0);
+       if (unlikely(rc))
+               GOTO(out_shrink, rc);
+
+       rc = mdt_pack_encctx_in_reply(info, obj);
        EXIT;
 out_shrink:
        mdt_client_compatibility(info);
@@ -1805,29 +1867,143 @@ out:
 
 static int mdt_raw_lookup(struct mdt_thread_info *info,
                          struct mdt_object *parent,
-                         const struct lu_name *lname,
-                         struct ldlm_reply *ldlm_rep)
+                         const struct lu_name *lname)
 {
-       struct lu_fid   *child_fid = &info->mti_tmp_fid1;
-       int              rc;
+       struct lu_fid *fid = &info->mti_tmp_fid1;
+       struct mdt_body *repbody;
+       bool is_dotdot = false;
+       bool is_old_parent_stripe = false;
+       bool is_new_parent_checked = false;
+       int rc;
+
        ENTRY;
 
        LASSERT(!info->mti_cross_ref);
+       /* Always allow to lookup ".." */
+       if (lname->ln_namelen == 2 &&
+           lname->ln_name[0] == '.' && lname->ln_name[1] == '.') {
+               info->mti_spec.sp_permitted = 1;
+               is_dotdot = true;
+               if (mdt_is_dir_stripe(info, parent) == 1)
+                       is_old_parent_stripe = true;
+       }
 
+       mdt_object_get(info->mti_env, parent);
+lookup:
        /* Only got the fid of this obj by name */
-       fid_zero(child_fid);
-       rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object),
-                       lname, child_fid, &info->mti_spec);
-       if (rc == 0) {
-               struct mdt_body *repbody;
+       fid_zero(fid);
+       rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid,
+                       &info->mti_spec);
+       mdt_object_put(info->mti_env, parent);
+       if (rc)
+               RETURN(rc);
+
+       /* getattr_name("..") should return master object FID for striped dir */
+       if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) {
+               parent = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+               if (IS_ERR(parent))
+                       RETURN(PTR_ERR(parent));
+
+               /* old client getattr_name("..") with stripe FID */
+               if (unlikely(is_old_parent_stripe)) {
+                       is_old_parent_stripe = false;
+                       goto lookup;
+               }
+
+               /* ".." may be a stripe */
+               if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) {
+                       is_new_parent_checked = true;
+                       goto lookup;
+               }
+
+               mdt_object_put(info->mti_env, parent);
+       }
+
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       repbody->mbo_fid1 = *fid;
+       repbody->mbo_valid = OBD_MD_FLID;
+
+       RETURN(rc);
+}
+
+/**
+ * Find name matching hash
+ *
+ * We search \a child LinkEA for a name whose hash matches \a lname
+ * (it contains an encoded hash).
+ *
+ * \param info mdt thread info
+ * \param lname encoded hash to find
+ * \param parent parent object
+ * \param child object to search with LinkEA
+ *
+ * \retval 1 match found
+ * \retval 0 no match found
+ * \retval -ev negative errno upon error
+ */
+int find_name_matching_hash(struct mdt_thread_info *info, struct lu_name *lname,
+                           struct mdt_object *parent, struct mdt_object *child)
+{
+       /* Here, lname is an encoded hash of on-disk name, and
+        * client is doing access without encryption key.
+        * So we need to get LinkEA, check parent fid is correct and
+        * compare name hash with the one in the request.
+        */
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct lu_name name;
+       struct lu_fid pfid;
+       struct linkea_data ldata = { NULL };
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       struct lu_buf link = { 0 };
+       char *hash;
+       int reclen, count, rc;
+
+       ENTRY;
+       if (lname->ln_namelen < LL_CRYPTO_BLOCK_SIZE)
+               RETURN(-EINVAL);
+
+       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       if (!buf->lb_buf)
+               RETURN(-ENOMEM);
 
-               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-               repbody->mbo_fid1 = *child_fid;
-               repbody->mbo_valid = OBD_MD_FLID;
-               mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
-       } else if (rc == -ENOENT) {
-               mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+       ldata.ld_buf = buf;
+       rc = mdt_links_read(info, child, &ldata);
+       if (rc < 0)
+               RETURN(rc);
+
+       hash = kmalloc(lname->ln_namelen, GFP_NOFS);
+       if (!hash)
+               RETURN(-ENOMEM);
+       rc = critical_decode(lname->ln_name, lname->ln_namelen, hash);
+
+       leh = buf->lb_buf;
+       lee = (struct link_ea_entry *)(leh + 1);
+       for (count = 0; count < leh->leh_reccount; count++) {
+               linkea_entry_unpack(lee, &reclen, &name, &pfid);
+               if (!parent || lu_fid_eq(&pfid, mdt_object_fid(parent))) {
+                       lu_buf_check_and_alloc(&link, name.ln_namelen);
+                       if (!link.lb_buf)
+                               GOTO(out_match, rc = -ENOMEM);
+                       rc = critical_decode(name.ln_name, name.ln_namelen,
+                                            link.lb_buf);
+
+                       if (memcmp(LLCRYPT_EXTRACT_DIGEST(link.lb_buf, rc),
+                                  hash, LL_CRYPTO_BLOCK_SIZE) == 0) {
+                               *lname = name;
+                               break;
+                       }
+               }
+               lee = (struct link_ea_entry *) ((char *)lee + reclen);
        }
+       if (count == leh->leh_reccount)
+               rc = 0;
+       else
+               rc = 1;
+
+out_match:
+       lu_buf_free(&link);
+       kfree(hash);
 
        RETURN(rc);
 }
@@ -1928,7 +2104,30 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        lname = &info->mti_name;
        mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
 
-       if (lu_name_is_valid(lname)) {
+       if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
+               reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
+               if (unlikely(reqbody == NULL))
+                       RETURN(err_serious(-EPROTO));
+
+               *child_fid = reqbody->mbo_fid2;
+               if (unlikely(!fid_is_sane(child_fid)))
+                       RETURN(err_serious(-EINVAL));
+
+               if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
+                       mdt_object_get(info->mti_env, parent);
+                       child = parent;
+               } else {
+                       child = mdt_object_find(info->mti_env, info->mti_mdt,
+                                               child_fid);
+                       if (IS_ERR(child))
+                               RETURN(PTR_ERR(child));
+               }
+
+               CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
+                      "ldlm_rep = %p\n",
+                      PFID(mdt_object_fid(parent)),
+                      PFID(&reqbody->mbo_fid2), ldlm_rep);
+       } else if (lu_name_is_valid(lname)) {
                if (mdt_object_remote(parent)) {
                        CERROR("%s: parent "DFID" is on remote target\n",
                               mdt_obd_name(info->mti_mdt),
@@ -1980,22 +2179,37 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
 
-       if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) {
+       if (unlikely(!mdt_object_exists(parent)) &&
+           !(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
+           lu_name_is_valid(lname)) {
                LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                &parent->mot_obj,
                                "Parent doesn't exist!");
                GOTO(out_child, rc = -ESTALE);
        }
 
-       if (lu_name_is_valid(lname)) {
-               /* Always allow to lookup ".." */
-               if (unlikely(lname->ln_namelen == 2 &&
-                            lname->ln_name[0] == '.' &&
-                            lname->ln_name[1] == '.'))
-                       info->mti_spec.sp_permitted = 1;
-
+       if (!child && is_resent) {
+               lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
+               if (lock == NULL) {
+                       /* Lock is pinned by ldlm_handle_enqueue0() as it is
+                        * a resend case, however, it could be already destroyed
+                        * due to client eviction or a raced cancel RPC.
+                        */
+                       LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx",
+                                         lhc->mlh_reg_lh.cookie);
+                       RETURN(-ESTALE);
+               }
+               fid_extract_from_res_name(child_fid,
+                                         &lock->l_resource->lr_name);
+               LDLM_LOCK_PUT(lock);
+               child = mdt_object_find(info->mti_env, info->mti_mdt,
+                                       child_fid);
+               if (IS_ERR(child))
+                       RETURN(PTR_ERR(child));
+       } else if (!(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
+           lu_name_is_valid(lname)) {
                if (info->mti_body->mbo_valid == OBD_MD_FLID) {
-                       rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
+                       rc = mdt_raw_lookup(info, parent, lname);
 
                        RETURN(rc);
                }
@@ -2031,6 +2245,19 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        /* step 3: lock child regardless if it is local or remote. */
        LASSERT(child);
 
+       if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
+               /* Here, lname is an encoded hash of on-disk name, and
+                * client is doing access without encryption key.
+                * So we need to compare name hash with the one in the request.
+                */
+               if (!find_name_matching_hash(info, lname, parent,
+                                            child)) {
+                       mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+                       mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+                       GOTO(out_child, rc = -ENOENT);
+               }
+       }
+
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
        if (!mdt_object_exists(child)) {
                LU_OBJECT_DEBUG(D_INODE, info->mti_env,
@@ -2104,19 +2331,22 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        /* finally, we can get attr for child. */
        rc = mdt_getattr_internal(info, child, ma_need);
        if (unlikely(rc != 0)) {
-               mdt_object_unlock(info, child, lhc, 1);
+               if (!is_resent)
+                       mdt_object_unlock(info, child, lhc, 1);
                GOTO(out_child, rc);
        }
 
        rc = mdt_pack_secctx_in_reply(info, child);
        if (unlikely(rc)) {
-               mdt_object_unlock(info, child, lhc, 1);
+               if (!is_resent)
+                       mdt_object_unlock(info, child, lhc, 1);
                GOTO(out_child, rc);
        }
 
        rc = mdt_pack_encctx_in_reply(info, child);
        if (unlikely(rc)) {
-               mdt_object_unlock(info, child, lhc, 1);
+               if (!is_resent)
+                       mdt_object_unlock(info, child, lhc, 1);
                GOTO(out_child, rc);
        }
 
@@ -2130,6 +2360,16 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         PLDLMRES(lock->l_resource),
                         PFID(mdt_object_fid(child)));
 
+               if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) {
+                       if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+                               OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND,
+                                                req->rq_deadline -
+                                                req->rq_arrival_time.tv_sec +
+                                                cfs_fail_val ?: 3);
+                       /* Put the lock to the waiting list and force the cancel */
+                       ldlm_set_ast_sent(lock);
+               }
+
                if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
                    !mdt_object_remote(child) && child != parent) {
                        mdt_object_put(info->mti_env, child);
@@ -2156,6 +2396,15 @@ out_child:
 unlock_parent:
        if (lhp)
                mdt_object_unlock(info, parent, lhp, 1);
+       if (rc == -ENOENT) {
+               /* return -ENOKEY instead of -ENOENT to encryption-unaware
+                * client if trying to access an encrypted file
+                */
+               int rc2 = mdt_check_enc(info, parent);
+
+               if (rc2)
+                       rc = rc2;
+       }
        return rc;
 }
 
@@ -2289,7 +2538,7 @@ static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
        if (la->la_flags & LUSTRE_IMMUTABLE_FL)
                        rc = -EACCES;
 
-       if (md_capable(uc, CAP_DAC_OVERRIDE))
+       if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE))
                RETURN(0);
        if (uc->uc_fsuid == la->la_uid) {
                if ((la->la_mode & S_IWUSR) == 0)
@@ -2415,6 +2664,61 @@ out:
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg);
 
+int mdt_io_set_info(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct ost_body         *body = NULL, *repbody;
+       void                    *key, *val = NULL;
+       int                      keylen, vallen, rc = 0;
+       bool                     is_grant_shrink;
+
+       ENTRY;
+
+       key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
+       if (key == NULL) {
+               DEBUG_REQ(D_HA, req, "no set_info key");
+               RETURN(err_serious(-EFAULT));
+       }
+       keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
+                                     RCL_CLIENT);
+
+       val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
+       if (val == NULL) {
+               DEBUG_REQ(D_HA, req, "no set_info val");
+               RETURN(err_serious(-EFAULT));
+       }
+       vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
+                                     RCL_CLIENT);
+
+       is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK);
+       if (is_grant_shrink)
+               /* In this case the value is actually an RMF_OST_BODY, so we
+                * transmutate the type of this PTLRPC */
+               req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO);
+
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc < 0)
+               RETURN(rc);
+
+       if (is_grant_shrink) {
+               body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY);
+
+               repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+               *repbody = *body;
+
+               /** handle grant shrink, similar to a read request */
+               tgt_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp,
+                                      &repbody->oa);
+       } else {
+               CERROR("%s: Unsupported key %s\n",
+                      tgt_name(tsi->tsi_tgt), (char *)key);
+               rc = -EOPNOTSUPP;
+       }
+
+       RETURN(rc);
+}
+
+
 static int mdt_set_info(struct tgt_session_info *tsi)
 {
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
@@ -2460,7 +2764,7 @@ static int mdt_set_info(struct tgt_session_info *tsi)
                               tgt_name(tsi->tsi_tgt), vallen);
                        RETURN(-EINVAL);
                }
-               if (ptlrpc_req_need_swab(req)) {
+               if (req_capsule_req_need_swab(&req->rq_pill)) {
                        __swab64s(&cs->cs_recno);
                        __swab32s(&cs->cs_id);
                }
@@ -2594,18 +2898,6 @@ static void mdt_preset_secctx_size(struct mdt_thread_info *info)
        }
 }
 
-static void mdt_preset_encctx_size(struct mdt_thread_info *info)
-{
-       struct req_capsule *pill = info->mti_pill;
-
-       if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
-                                 RCL_SERVER))
-               /* pre-set size in server part with max size */
-               req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
-                                    RCL_SERVER,
-                                    info->mti_mdt->mdt_max_mdsize);
-}
-
 static int mdt_reint_internal(struct mdt_thread_info *info,
                               struct mdt_lock_handle *lhc,
                               __u32 op)
@@ -2937,18 +3229,22 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case LUSTRE_Q_SETDEFAULT:
        case LUSTRE_Q_SETQUOTAPOOL:
        case LUSTRE_Q_SETINFOPOOL:
-               if (!nodemap_can_setquota(nodemap))
+       case LUSTRE_Q_SETDEFAULT_POOL:
+       case LUSTRE_Q_DELETEQID:
+               if (!nodemap_can_setquota(nodemap, oqctl->qc_type,
+                                         oqctl->qc_id))
                        GOTO(out_nodemap, rc = -EPERM);
-               /* fallthrough */
+               fallthrough;
        case Q_GETINFO:
        case Q_GETQUOTA:
        case LUSTRE_Q_GETDEFAULT:
        case LUSTRE_Q_GETQUOTAPOOL:
        case LUSTRE_Q_GETINFOPOOL:
+       case LUSTRE_Q_GETDEFAULT_POOL:
                if (qmt == NULL)
                        GOTO(out_nodemap, rc = -EOPNOTSUPP);
                /* slave quotactl */
-               /* fallthrough */
+               fallthrough;
        case Q_GETOINFO:
        case Q_GETOQUOTA:
                break;
@@ -2970,8 +3266,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
                                    NODEMAP_CLIENT_TO_FS, id);
                break;
        case PRJQUOTA:
-               /* todo: check/map project id */
-               id = oqctl->qc_id;
+               id = nodemap_map_id(nodemap, NODEMAP_PROJID,
+                                   NODEMAP_CLIENT_TO_FS, id);
                break;
        default:
                GOTO(out_nodemap, rc = -EOPNOTSUPP);
@@ -3003,6 +3299,9 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case LUSTRE_Q_GETQUOTAPOOL:
        case LUSTRE_Q_SETINFOPOOL:
        case LUSTRE_Q_GETINFOPOOL:
+       case LUSTRE_Q_SETDEFAULT_POOL:
+       case LUSTRE_Q_GETDEFAULT_POOL:
+       case LUSTRE_Q_DELETEQID:
                /* forward quotactl request to QMT */
                rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
                break;
@@ -3407,6 +3706,7 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
        einfo->ei_cb_cp = ldlm_completion_ast;
        einfo->ei_enq_slave = 0;
        einfo->ei_res_id = res_id;
+       einfo->ei_req_slot = 1;
 
        if (cache) {
                /*
@@ -3960,7 +4260,7 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_opdata = 0;
        info->mti_big_lmm_used = 0;
        info->mti_big_acl_used = 0;
-       info->mti_som_valid = 0;
+       info->mti_som_strict = 0;
 
         info->mti_spec.no_create = 0;
        info->mti_spec.sp_rm_entry = 0;
@@ -4045,25 +4345,29 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
        /* If possible resent found a lock, @lh is set to its handle */
        new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0);
 
-        if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
-                lh->mlh_reg_lh.cookie = 0;
-                RETURN(0);
-        }
-
-       if (new_lock == NULL && (flags & LDLM_FL_RESENT)) {
-               /* Lock is pinned by ldlm_handle_enqueue0() as it is
-                * a resend case, however, it could be already destroyed
-                * due to client eviction or a raced cancel RPC. */
-               LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n",
-                                 lh->mlh_reg_lh.cookie);
+       if (new_lock == NULL) {
+               if (flags & LDLM_FL_INTENT_ONLY) {
+                       result = 0;
+               } else if (flags & LDLM_FL_RESENT) {
+                       /* Lock is pinned by ldlm_handle_enqueue0() as it is a
+                        * resend case, however, it could be already destroyed
+                        * due to client eviction or a raced cancel RPC.
+                        */
+                       LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n",
+                                         lh->mlh_reg_lh.cookie);
+                       result = -ESTALE;
+               } else {
+                       CERROR("%s: Invalid lockh=%#llx flags=%#llx fid1="DFID" fid2="DFID": rc = %d\n",
+                              mdt_obd_name(info->mti_mdt),
+                              lh->mlh_reg_lh.cookie, flags,
+                              PFID(&info->mti_tmp_fid1),
+                              PFID(&info->mti_tmp_fid2), result);
+                       result = -ESTALE;
+               }
                lh->mlh_reg_lh.cookie = 0;
-               RETURN(-ESTALE);
+               RETURN(result);
        }
 
-       LASSERTF(new_lock != NULL,
-                "lockh %#llx flags %#llx : rc = %d\n",
-                lh->mlh_reg_lh.cookie, flags, result);
-
         /*
          * If we've already given this lock to a client once, then we should
          * have no readers or writers.  Otherwise, we should have one reader
@@ -4275,13 +4579,14 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc,
        rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
        ldlm_rep->lock_policy_res2 = clear_serious(rc);
 
-        if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
-                ldlm_rep->lock_policy_res2 = 0;
-        if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
-            ldlm_rep->lock_policy_res2) {
-                lhc->mlh_reg_lh.cookie = 0ull;
-                GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
-        }
+       if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG) &&
+           ldlm_rep->lock_policy_res2 != -ENOKEY)
+               ldlm_rep->lock_policy_res2 = 0;
+       if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
+           ldlm_rep->lock_policy_res2) {
+               lhc->mlh_reg_lh.cookie = 0ull;
+               GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
+       }
 
        rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
         EXIT;
@@ -4545,7 +4850,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
                break;
        case IT_GETATTR:
                check_mdt_object = true;
-               /* fallthrough */
+               fallthrough;
        case IT_LOOKUP:
                it_format = &RQF_LDLM_INTENT_GETATTR;
                it_handler = &mdt_intent_getattr;
@@ -5447,7 +5752,12 @@ TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE,
                                         OST_PUNCH,     mdt_punch_hdl,
                                                        mdt_hp_punch),
 TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC,    mdt_data_sync),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_FALLOCATE,
+                                                       mdt_fallocate_hdl),
 TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SEEK, tgt_lseek),
+TGT_RPC_HANDLER(OST_FIRST_OPC,
+               0,                      OST_SET_INFO,   mdt_io_set_info,
+               &RQF_OBD_SET_INFO, LUSTRE_OST_VERSION),
 };
 
 static struct tgt_handler mdt_sec_ctx_ops[] = {
@@ -5685,16 +5995,19 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids);
        spin_lock_init(&m->mdt_squash.rsi_lock);
        spin_lock_init(&m->mdt_lock);
-       m->mdt_enable_remote_dir = 1;
-       m->mdt_enable_striped_dir = 1;
+       m->mdt_enable_chprojid_gid = 0;
        m->mdt_enable_dir_migration = 1;
        m->mdt_enable_dir_restripe = 0;
        m->mdt_enable_dir_auto_split = 0;
+       m->mdt_enable_parallel_rename_dir = 1;
+       m->mdt_enable_parallel_rename_file = 1;
+       m->mdt_enable_remote_dir = 1;
        m->mdt_enable_remote_dir_gid = 0;
-       m->mdt_enable_chprojid_gid = 0;
        m->mdt_enable_remote_rename = 1;
-       m->mdt_dir_restripe_nsonly = 1;
        m->mdt_enable_remote_subdir_mount = 1;
+       m->mdt_enable_striped_dir = 1;
+       m->mdt_dir_restripe_nsonly = 1;
+       m->mdt_rename_stats.rs_init = ktime_get();
 
        atomic_set(&m->mdt_mds_mds_conns, 0);
        atomic_set(&m->mdt_async_commit_count, 0);
@@ -5817,6 +6130,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        else
                m->mdt_opts.mo_acl = 0;
 
+       m->mdt_opts.mo_enable_strict_som = 1;
+
        /* XXX: to support suppgid for ACL, we enable identity_upcall
         * by default, otherwise, maybe got unexpected -EACCESS. */
        if (m->mdt_opts.mo_acl)
@@ -6019,6 +6334,9 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                atomic_set(&mo->mot_open_count, 0);
                mo->mot_restripe_offset = 0;
                INIT_LIST_HEAD(&mo->mot_restripe_linkage);
+               mo->mot_lsom_size = 0;
+               mo->mot_lsom_blocks = 0;
+               mo->mot_lsom_inited = false;
                RETURN(o);
        }
        RETURN(NULL);
@@ -6438,7 +6756,8 @@ static int mdt_export_cleanup(struct obd_export *exp)
                                rc = mdt_ctxt_add_dirty_flag(&env, info, mfd);
 
                        /* Don't unlink orphan on failover umount, LU-184 */
-                       if (exp->exp_flags & OBD_OPT_FAILOVER) {
+                       if (exp->exp_flags & OBD_OPT_FAILOVER ||
+                           exp->exp_obd->obd_stopping) {
                                ma->ma_valid = MA_FLAGS;
                                ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
                        }
@@ -6726,7 +7045,6 @@ static int mdt_path_current(struct mdt_thread_info *info,
        struct lu_name *tmpname = &info->mti_name;
        struct lu_fid *tmpfid = &info->mti_tmp_fid1;
        struct lu_buf *buf = &info->mti_big_buf;
-       struct md_attr *ma = &info->mti_attr;
        struct linkea_data ldata = { NULL };
        bool first = true;
        struct mdt_object *mdt_obj;
@@ -6799,22 +7117,13 @@ static int mdt_path_current(struct mdt_thread_info *info,
                }
 
                /* Check if it is slave stripes */
-               rc = mdt_stripe_get(info, mdt_obj, ma, XATTR_NAME_LMV);
+               rc = mdt_is_dir_stripe(info, mdt_obj);
                mdt_object_put(info->mti_env, mdt_obj);
                if (rc < 0)
                        GOTO(out, rc);
-
-               if (ma->ma_valid & MA_LMV) {
-                       struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
-
-                       if (!lmv_is_sane2(lmv))
-                               GOTO(out, rc = -EBADF);
-
-                       /* For slave stripes, get its master */
-                       if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE) {
-                               fp->gf_fid = *tmpfid;
-                               continue;
-                       }
+               if (rc == 1) {
+                       fp->gf_fid = *tmpfid;
+                       continue;
                }
 
                /* Pack the name in the end of the buffer */
@@ -6931,12 +7240,24 @@ static int mdt_fid2path(struct mdt_thread_info *info,
                RETURN(rc);
        }
 
-       if (mdt_object_remote(obj))
+       if (mdt_object_remote(obj)) {
                rc = -EREMOTE;
-       else if (!mdt_object_exists(obj))
+       } else if (!mdt_object_exists(obj)) {
                rc = -ENOENT;
-       else
-               rc = 0;
+       } else {
+               struct lu_attr la = { 0 };
+               struct dt_object *dt = mdt_obj2dt(obj);
+
+               if (dt && dt->do_ops && dt->do_ops->do_attr_get)
+                       dt_attr_get(info->mti_env, mdt_obj2dt(obj), &la);
+               if (la.la_valid & LA_FLAGS && la.la_flags & LUSTRE_ENCRYPT_FL)
+                       /* path resolution cannot be carried out on server
+                        * side for encrypted files
+                        */
+                       rc = -ENODATA;
+               else
+                       rc = 0;
+       }
 
        if (rc < 0) {
                mdt_object_put(info->mti_env, obj);
@@ -6966,7 +7287,7 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, int keylen,
        fpin = key + cfs_size_round(sizeof(KEY_FID2PATH));
        fpout = val;
 
-       if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
+       if (req_capsule_req_need_swab(info->mti_pill))
                lustre_swab_fid2path(fpin);
 
        memcpy(fpout, fpin, sizeof(*fpin));
@@ -6977,7 +7298,7 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, int keylen,
                      sizeof(struct lu_fid)) {
                /* client sent its root FID, which is normally fileset FID */
                root_fid = fpin->gf_u.gf_root_fid;
-               if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
+               if (req_capsule_req_need_swab(info->mti_pill))
                        lustre_swab_lu_fid(root_fid);
 
                if (root_fid != NULL && !fid_is_sane(root_fid))
@@ -7112,14 +7433,18 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
        case OBD_IOC_ABORT_RECOVERY: {
                struct obd_ioctl_data *data = karg;
 
-               CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt));
-               if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT)
+               if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT) {
+                       CERROR("%s: Aborting MDT recovery\n",
+                              mdt_obd_name(mdt));
                        obd->obd_abort_recov_mdt = 1;
-               else /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */
+                       wake_up(&obd->obd_next_transno_waitq);
+               } else { /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */
                        /* lctl didn't set OBD_FLG_ABORT_RECOV_OST < 2.13.57 */
+                       CERROR("%s: Aborting client recovery\n",
+                              mdt_obd_name(mdt));
                        obd->obd_abort_recovery = 1;
-
-               target_stop_recovery_thread(obd);
+                       target_stop_recovery_thread(obd);
+               }
                rc = 0;
                break;
        }