Whamcloud - gitweb
LU-14999 mdt: Deadlock on parent during resend
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 2f069ff..3040be6 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/mdt/mdt_handler.c
  *
 #include <uapi/linux/lustre/lustre_param.h>
 #include <lustre_quota.h>
 #include <lustre_swab.h>
+#include <lustre_lmv.h>
 #include <obd.h>
 #include <obd_support.h>
 #include <lustre_barrier.h>
 #include <obd_cksum.h>
 #include <llog_swab.h>
+#include <lustre_crypto.h>
 
 #include "mdt_internal.h"
 
@@ -163,6 +164,13 @@ void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
        lh->mlh_type = MDT_REG_LOCK;
 }
 
+void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
+{
+       mdt_lock_reg_init(lh, lock->l_req_mode);
+       if (lock->l_req_mode == LCK_GROUP)
+               lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
+}
+
 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
                       const struct lu_name *lname)
 {
@@ -263,11 +271,47 @@ static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o
         EXIT;
 }
 
+/**
+ * Check whether \a o is directory stripe object.
+ *
+ * \param[in]  info    thread environment
+ * \param[in]  o       MDT object
+ *
+ * \retval 1   is directory stripe.
+ * \retval 0   isn't directory stripe.
+ * \retval < 1  error code
+ */
+static int mdt_is_dir_stripe(struct mdt_thread_info *info,
+                               struct mdt_object *o)
+{
+       struct md_attr *ma = &info->mti_attr;
+       struct lmv_mds_md_v1 *lmv;
+       int rc;
+
+       rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+       if (rc < 0)
+               return rc;
+
+       if (!(ma->ma_valid & MA_LMV))
+               return 0;
+
+       lmv = &ma->ma_lmv->lmv_md_v1;
+
+       if (!lmv_is_sane2(lmv))
+               return -EBADF;
+
+       if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE)
+               return 1;
+
+       return 0;
+}
+
 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                              struct lu_fid *fid)
 {
        struct mdt_device *mdt = info->mti_mdt;
        struct lu_name *lname = &info->mti_name;
+       const char *start = fileset;
        char *filename = info->mti_filename;
        struct mdt_object *parent;
        u32 mode;
@@ -282,8 +326,8 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
         */
        *fid = mdt->mdt_md_root_fid;
 
-       while (rc == 0 && fileset != NULL && *fileset != '\0') {
-               const char *s1 = fileset;
+       while (rc == 0 && start != NULL && *start != '\0') {
+               const char *s1 = start;
                const char *s2;
 
                while (*++s1 == '/')
@@ -295,7 +339,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                if (s2 == s1)
                        break;
 
-               fileset = s2;
+               start = s2;
 
                lname->ln_namelen = s2 - s1;
                if (lname->ln_namelen > NAME_MAX) {
@@ -331,9 +375,21 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                        rc = PTR_ERR(parent);
                else {
                        mode = lu_object_attr(&parent->mot_obj);
-                       mdt_object_put(info->mti_env, parent);
-                       if (!S_ISDIR(mode))
+                       if (!S_ISDIR(mode)) {
                                rc = -ENOTDIR;
+                       } else if (mdt_is_remote_object(info, parent, parent)) {
+                               if (!mdt->mdt_enable_remote_subdir_mount) {
+                                       rc = -EREMOTE;
+                                       LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
+                                                     mdt_obd_name(mdt),
+                                                     fileset, rc);
+                               } else {
+                                       LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
+                                                     mdt_obd_name(mdt),
+                                                     fileset);
+                               }
+                       }
+                       mdt_object_put(info->mti_env, parent);
                }
        }
 
@@ -414,6 +470,8 @@ static int mdt_statfs(struct tgt_session_info *tsi)
        struct obd_statfs *osfs;
        struct mdt_body *reqbody = NULL;
        struct mdt_statfs_cache *msf;
+       ktime_t kstart = ktime_get();
+       int current_blockbits;
        int rc;
 
        ENTRY;
@@ -470,6 +528,15 @@ static int mdt_statfs(struct tgt_session_info *tsi)
                        spin_unlock(&mdt->mdt_lock);
        }
 
+       /* tgd_blockbit is recordsize bits set during mkfs.
+        * This once set does not change. However, 'zfs set'
+        * can be used to change the MDT blocksize. Instead
+        * of using cached value of 'tgd_blockbit' always
+        * calculate the blocksize bits which may have
+        * changed.
+        */
+       current_blockbits = fls64(osfs->os_bsize) - 1;
+
        /* at least try to account for cached pages.  its still racy and
         * might be under-reporting if clients haven't announced their
         * caches with brw recently */
@@ -477,12 +544,12 @@ static int mdt_statfs(struct tgt_session_info *tsi)
               " pending %llu free %llu avail %llu\n",
               tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
               tgd->tgd_tot_pending,
-              osfs->os_bfree << tgd->tgd_blockbits,
-              osfs->os_bavail << tgd->tgd_blockbits);
+              osfs->os_bfree << current_blockbits,
+              osfs->os_bavail << current_blockbits);
 
        osfs->os_bavail -= min_t(u64, osfs->os_bavail,
                                 ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
-                                  osfs->os_bsize - 1) >> tgd->tgd_blockbits));
+                                  osfs->os_bsize - 1) >> current_blockbits));
 
        tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
        CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
@@ -491,19 +558,20 @@ static int mdt_statfs(struct tgt_session_info *tsi)
               osfs->os_files, osfs->os_ffree, osfs->os_state);
 
        if (!exp_grant_param_supp(tsi->tsi_exp) &&
-           tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT) {
+           current_blockbits > COMPAT_BSIZE_SHIFT) {
                /* clients which don't support OBD_CONNECT_GRANT_PARAM
                 * should not see a block size > page size, otherwise
                 * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
                 * block size which is the biggest block size known to work
                 * with all client's page size. */
-               osfs->os_blocks <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
-               osfs->os_bfree  <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
-               osfs->os_bavail <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_bfree  <<= current_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT;
                osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
        }
        if (rc == 0)
-               mdt_counter_incr(req, LPROC_MDT_STATFS);
+               mdt_counter_incr(req, LPROC_MDT_STATFS,
+                                ktime_us_delta(ktime_get(), kstart));
 out:
        mdt_thread_info_fini(info);
        RETURN(rc);
@@ -635,6 +703,7 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
        if (buf->lb_len == 0)
                RETURN(0);
 
+       LASSERT(!info->mti_big_acl_used);
 again:
        rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
        if (rc < 0) {
@@ -644,10 +713,9 @@ again:
                        rc = 0;
                } else if (rc == -EOPNOTSUPP) {
                        rc = 0;
-               } else {
-                       if (rc == -ERANGE &&
-                           exp_connect_large_acl(info->mti_exp) &&
-                           buf->lb_buf != info->mti_big_acl) {
+               } else if (rc == -ERANGE) {
+                       if (exp_connect_large_acl(info->mti_exp) &&
+                           !info->mti_big_acl_used) {
                                if (info->mti_big_acl == NULL) {
                                        info->mti_big_aclsize =
                                                        min_t(unsigned int,
@@ -673,47 +741,19 @@ again:
 
                                buf->lb_buf = info->mti_big_acl;
                                buf->lb_len = info->mti_big_aclsize;
-
+                               info->mti_big_acl_used = 1;
                                goto again;
                        }
-
+                       /* FS has ACL bigger that our limits */
+                       CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n",
+                              mdt_obd_name(mdt), PFID(mdt_object_fid(o)),
+                              info->mti_big_aclsize);
+                       rc = -E2BIG;
+               } else {
                        CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
                               mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
                }
        } else {
-               int client;
-               int server;
-               int acl_buflen;
-               int lmm_buflen = 0;
-               int lmmsize = 0;
-
-               acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
-               if (acl_buflen >= rc)
-                       goto map;
-
-               /* If LOV/LMA EA is small, we can reuse part of their buffer */
-               client = ptlrpc_req_get_repsize(pill->rc_req);
-               server = lustre_packed_msg_size(pill->rc_req->rq_repmsg);
-               if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
-                       lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD,
-                                                         RCL_SERVER);
-                       lmmsize = repbody->mbo_eadatasize;
-               }
-
-               if (client < server - acl_buflen - lmm_buflen + rc + lmmsize) {
-                       CDEBUG(D_INODE, "%s: client prepared buffer size %d "
-                              "is not big enough with the ACL size %d (%d)\n",
-                              mdt_obd_name(mdt), client, rc,
-                              server - acl_buflen - lmm_buflen + rc + lmmsize);
-                       repbody->mbo_aclsize = 0;
-                       repbody->mbo_valid &= ~OBD_MD_FLACL;
-                       RETURN(-ERANGE);
-               }
-
-map:
-               if (buf->lb_buf == info->mti_big_acl)
-                       info->mti_big_acl_used = 1;
-
                rc = nodemap_map_acl(nodemap, buf->lb_buf,
                                     rc, NODEMAP_FS_TO_CLIENT);
                /* if all ACLs mapped out, rc is still >= 0 */
@@ -791,7 +831,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                b->mbo_nlink = attr->la_nlink;
                b->mbo_valid |= OBD_MD_FLNLINK;
        }
-       if (attr->la_valid & (LA_UID|LA_GID)) {
+       if (attr->la_valid & (LA_UID|LA_GID|LA_PROJID)) {
                nodemap = nodemap_get_from_exp(exp);
                if (IS_ERR(nodemap))
                        goto out;
@@ -810,8 +850,9 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
        }
 
        if (attr->la_valid & LA_PROJID) {
-               /* TODO, nodemap for project id */
-               b->mbo_projid = attr->la_projid;
+               b->mbo_projid = nodemap_map_id(nodemap, NODEMAP_PROJID,
+                                              NODEMAP_FS_TO_CLIENT,
+                                              attr->la_projid);
                b->mbo_valid |= OBD_MD_FLPROJID;
        }
 
@@ -979,8 +1020,8 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(rc);
 }
 
-int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
-                  struct md_attr *ma, const char *name)
+int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                    struct md_attr *ma, const char *name)
 {
        struct md_object *next = mdt_object_child(o);
        struct lu_buf    *buf = &info->mti_buf;
@@ -1056,6 +1097,40 @@ got:
        return rc;
 }
 
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                  struct md_attr *ma, const char *name)
+{
+       int rc;
+
+       if (!info->mti_big_lmm) {
+               OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
+               if (!info->mti_big_lmm)
+                       return -ENOMEM;
+               info->mti_big_lmmsize = PAGE_SIZE;
+       }
+
+       if (strcmp(name, XATTR_NAME_LOV) == 0) {
+               ma->ma_lmm = info->mti_big_lmm;
+               ma->ma_lmm_size = info->mti_big_lmmsize;
+               ma->ma_valid &= ~MA_LOV;
+       } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
+               ma->ma_lmv = info->mti_big_lmm;
+               ma->ma_lmv_size = info->mti_big_lmmsize;
+               ma->ma_valid &= ~MA_LMV;
+       } else {
+               LBUG();
+       }
+
+       LASSERT(!info->mti_big_lmm_used);
+       rc = __mdt_stripe_get(info, o, ma, name);
+       /* since big_lmm is always used here, clear 'used' flag to avoid
+        * assertion in mdt_big_xattr_get().
+        */
+       info->mti_big_lmm_used = 0;
+
+       return rc;
+}
+
 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
                      struct lu_fid *pfid)
 {
@@ -1103,6 +1178,51 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(0);
 }
 
+int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
+                          struct lu_fid *pfid, struct lu_name *lname)
+{
+       struct lu_buf *buf = &info->mti_buf;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       int reclen;
+       int rc;
+
+       buf->lb_buf = info->mti_xattr_buf;
+       buf->lb_len = sizeof(info->mti_xattr_buf);
+       rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
+                         XATTR_NAME_LINK);
+       if (rc == -ERANGE) {
+               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+               buf->lb_buf = info->mti_big_lmm;
+               buf->lb_len = info->mti_big_lmmsize;
+       }
+       if (rc < 0)
+               return rc;
+
+       if (rc < sizeof(*leh)) {
+               CERROR("short LinkEA on "DFID": rc = %d\n",
+                      PFID(mdt_object_fid(o)), rc);
+               return -ENODATA;
+       }
+
+       leh = (struct link_ea_header *)buf->lb_buf;
+       lee = (struct link_ea_entry *)(leh + 1);
+       if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+               leh->leh_magic = LINK_EA_MAGIC;
+               leh->leh_reccount = __swab32(leh->leh_reccount);
+               leh->leh_len = __swab64(leh->leh_len);
+       }
+       if (leh->leh_magic != LINK_EA_MAGIC)
+               return -EINVAL;
+
+       if (leh->leh_reccount == 0)
+               return -ENODATA;
+
+       linkea_entry_unpack(lee, &reclen, lname, pfid);
+
+       return 0;
+}
+
 int mdt_attr_get_complex(struct mdt_thread_info *info,
                         struct mdt_object *o, struct md_attr *ma)
 {
@@ -1140,19 +1260,19 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
        }
 
        if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
                if (rc)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV && S_ISDIR(mode)) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
                if (rc != 0)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV_DEF && S_ISDIR(mode)) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
                if (rc != 0)
                        GOTO(out, rc);
        }
@@ -1202,19 +1322,22 @@ out:
 }
 
 static int mdt_getattr_internal(struct mdt_thread_info *info,
-                                struct mdt_object *o, int ma_need)
+                               struct mdt_object *o, int ma_need)
 {
-       struct md_object        *next = mdt_object_child(o);
-       const struct mdt_body   *reqbody = info->mti_body;
-       struct ptlrpc_request   *req = mdt_info_req(info);
-       struct md_attr          *ma = &info->mti_attr;
-       struct lu_attr          *la = &ma->ma_attr;
-       struct req_capsule      *pill = info->mti_pill;
-       const struct lu_env     *env = info->mti_env;
-       struct mdt_body         *repbody;
-       struct lu_buf           *buffer = &info->mti_buf;
-       struct obd_export       *exp = info->mti_exp;
-       int                      rc;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct md_object *next = mdt_object_child(o);
+       const struct mdt_body *reqbody = info->mti_body;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct md_attr *ma = &info->mti_attr;
+       struct lu_attr *la = &ma->ma_attr;
+       struct req_capsule *pill = info->mti_pill;
+       const struct lu_env *env = info->mti_env;
+       struct mdt_body *repbody;
+       struct lu_buf *buffer = &info->mti_buf;
+       struct obd_export *exp = info->mti_exp;
+       ktime_t kstart = ktime_get();
+       int rc;
+
        ENTRY;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
@@ -1301,20 +1424,20 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                }
        }
 
-        if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
+       if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
            reqbody->mbo_valid & OBD_MD_FLDIREA  &&
-            lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
-                /* get default stripe info for this dir. */
-                ma->ma_need |= MA_LOV_DEF;
-        }
-        ma->ma_need |= ma_need;
+           lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
+               /* get default stripe info for this dir. */
+               ma->ma_need |= MA_LOV_DEF;
+       }
+       ma->ma_need |= ma_need;
 
        rc = mdt_attr_get_complex(info, o, ma);
        if (unlikely(rc)) {
-               CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR,
-                      "%s: getattr error for "DFID": rc = %d\n",
-                      mdt_obd_name(info->mti_mdt),
-                      PFID(mdt_object_fid(o)), rc);
+               CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
+                            "%s: getattr error for "DFID": rc = %d\n",
+                            mdt_obd_name(info->mti_mdt),
+                            PFID(mdt_object_fid(o)), rc);
                RETURN(rc);
        }
 
@@ -1326,22 +1449,28 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_t_state = MS_RESTORE;
        }
 
-        if (likely(ma->ma_valid & MA_INODE))
-                mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
-        else
-                RETURN(-EFAULT);
+       if (unlikely(!(ma->ma_valid & MA_INODE)))
+               RETURN(-EFAULT);
+
+       mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
+
+       if (mdt_body_has_lov(la, reqbody)) {
+               u32 stripe_count = 1;
+               bool fixed_layout = false;
 
-        if (mdt_body_has_lov(la, reqbody)) {
-                if (ma->ma_valid & MA_LOV) {
-                        LASSERT(ma->ma_lmm_size);
+               if (ma->ma_valid & MA_LOV) {
+                       LASSERT(ma->ma_lmm_size);
                        repbody->mbo_eadatasize = ma->ma_lmm_size;
                        if (S_ISDIR(la->la_mode))
                                repbody->mbo_valid |= OBD_MD_FLDIREA;
                        else
                                repbody->mbo_valid |= OBD_MD_FLEASIZE;
                        mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
-                }
+               }
                if (ma->ma_valid & MA_LMV) {
+                       struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+                       u32 magic = le32_to_cpu(lmv->lmv_magic);
+
                        /* Return -ENOTSUPP for old client */
                        if (!mdt_is_striped_client(req->rq_export))
                                RETURN(-ENOTSUPP);
@@ -1350,6 +1479,14 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        mdt_dump_lmv(D_INFO, ma->ma_lmv);
                        repbody->mbo_eadatasize = ma->ma_lmv_size;
                        repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
+
+                       stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+                       fixed_layout = lmv_is_fixed(lmv);
+                       if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
+                               mdt_restripe_migrate_add(info, o);
+                       else if (magic == LMV_MAGIC_V1 &&
+                                lmv_is_restriping(lmv))
+                               mdt_restripe_update_add(info, o);
                }
                if (ma->ma_valid & MA_LMV_DEF) {
                        /* Return -ENOTSUPP for old client */
@@ -1366,6 +1503,19 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_valid |= (OBD_MD_FLDIREA |
                                               OBD_MD_DEFAULT_MEA);
                }
+               CDEBUG(D_VFSTRACE,
+                      "dirent count %llu stripe count %u MDT count %d\n",
+                      ma->ma_attr.la_dirent_count, stripe_count,
+                      atomic_read(&mdt->mdt_mds_mds_conns) + 1);
+               if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
+                   ma->ma_attr.la_dirent_count >
+                       mdt->mdt_restriper.mdr_dir_split_count &&
+                   !fid_is_root(mdt_object_fid(o)) &&
+                   mdt->mdt_enable_dir_auto_split &&
+                   !o->mot_restriping &&
+                   stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 &&
+                   !fixed_layout)
+                       mdt_auto_split_add(info, o);
        } else if (S_ISLNK(la->la_mode) &&
                   reqbody->mbo_valid & OBD_MD_LINKNAME) {
                buffer->lb_buf = ma->ma_lmm;
@@ -1403,8 +1553,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                               print_limit < rc ? "..." : "", print_limit,
                               (char *)ma->ma_lmm + rc - print_limit, rc);
                        rc = 0;
-                }
-        }
+               }
+       }
 
        if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
                repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
@@ -1426,10 +1576,11 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 #endif
 
 out:
-        if (rc == 0)
-               mdt_counter_incr(req, LPROC_MDT_GETATTR);
+       if (rc == 0)
+               mdt_counter_incr(req, LPROC_MDT_GETATTR,
+                                ktime_us_delta(ktime_get(), kstart));
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdt_getattr(struct tgt_session_info *tsi)
@@ -1694,29 +1845,152 @@ out:
 
 static int mdt_raw_lookup(struct mdt_thread_info *info,
                          struct mdt_object *parent,
-                         const struct lu_name *lname,
-                         struct ldlm_reply *ldlm_rep)
+                         const struct lu_name *lname)
 {
-       struct lu_fid   *child_fid = &info->mti_tmp_fid1;
-       int              rc;
+       struct lu_fid *fid = &info->mti_tmp_fid1;
+       struct mdt_body *repbody;
+       bool is_dotdot = false;
+       bool is_old_parent_stripe = false;
+       bool is_new_parent_checked = false;
+       int rc;
+
        ENTRY;
 
        LASSERT(!info->mti_cross_ref);
+       /* Always allow to lookup ".." */
+       if (lname->ln_namelen == 2 &&
+           lname->ln_name[0] == '.' && lname->ln_name[1] == '.') {
+               info->mti_spec.sp_permitted = 1;
+               is_dotdot = true;
+               if (mdt_is_dir_stripe(info, parent) == 1)
+                       is_old_parent_stripe = true;
+       }
 
+       mdt_object_get(info->mti_env, parent);
+lookup:
        /* Only got the fid of this obj by name */
-       fid_zero(child_fid);
-       rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object),
-                       lname, child_fid, &info->mti_spec);
-       if (rc == 0) {
-               struct mdt_body *repbody;
+       fid_zero(fid);
+       rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid,
+                       &info->mti_spec);
+       mdt_object_put(info->mti_env, parent);
+       if (rc)
+               RETURN(rc);
+
+       /* getattr_name("..") should return master object FID for striped dir */
+       if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) {
+               parent = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+               if (IS_ERR(parent))
+                       RETURN(PTR_ERR(parent));
+
+               /* old client getattr_name("..") with stripe FID */
+               if (unlikely(is_old_parent_stripe)) {
+                       is_old_parent_stripe = false;
+                       goto lookup;
+               }
+
+               /* ".." may be a stripe */
+               if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) {
+                       is_new_parent_checked = true;
+                       goto lookup;
+               }
+
+               mdt_object_put(info->mti_env, parent);
+       }
+
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       repbody->mbo_fid1 = *fid;
+       repbody->mbo_valid = OBD_MD_FLID;
+
+       RETURN(rc);
+}
+
+/**
+ * Find name matching hash
+ *
+ * We search \a child LinkEA for a name whose hash matches \a lname
+ * (it contains an encoded hash).
+ *
+ * \param info mdt thread info
+ * \param lname encoded hash to find
+ * \param parent parent object
+ * \param child object to search with LinkEA
+ * \param force_check true to check hash even if LinkEA has only one entry
+ *
+ * \retval 1 match found
+ * \retval 0 no match found
+ * \retval -ev negative errno upon error
+ */
+int find_name_matching_hash(struct mdt_thread_info *info, struct lu_name *lname,
+                           struct mdt_object *parent, struct mdt_object *child,
+                           bool force_check)
+{
+       /* Here, lname is an encoded hash of on-disk name, and
+        * client is doing access without encryption key.
+        * So we need to get LinkEA, check parent fid is correct and
+        * compare name hash with the one in the request.
+        */
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct lu_name name;
+       struct lu_fid pfid;
+       struct linkea_data ldata = { NULL };
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       struct lu_buf link = { 0 };
+       char *hash = NULL;
+       int reclen, count, rc;
+
+       ENTRY;
+
+       if (lname->ln_namelen < LLCRYPT_FNAME_DIGEST_SIZE)
+               RETURN(-EINVAL);
+
+       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       if (!buf->lb_buf)
+               RETURN(-ENOMEM);
 
-               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-               repbody->mbo_fid1 = *child_fid;
-               repbody->mbo_valid = OBD_MD_FLID;
-               mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
-       } else if (rc == -ENOENT) {
-               mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+       ldata.ld_buf = buf;
+       rc = mdt_links_read(info, child, &ldata);
+       if (rc < 0)
+               RETURN(rc);
+
+       leh = buf->lb_buf;
+       if (force_check || leh->leh_reccount > 1) {
+               hash = kmalloc(lname->ln_namelen, GFP_NOFS);
+               if (!hash)
+                       RETURN(-ENOMEM);
+               rc = critical_decode(lname->ln_name, lname->ln_namelen, hash);
        }
+       lee = (struct link_ea_entry *)(leh + 1);
+       for (count = 0; count < leh->leh_reccount; count++) {
+               linkea_entry_unpack(lee, &reclen, &name, &pfid);
+               if (!force_check && leh->leh_reccount == 1) {
+                       /* if there is only one rec, it has to be it */
+                       *lname = name;
+                       break;
+               }
+               if (!parent || lu_fid_eq(&pfid, mdt_object_fid(parent))) {
+                       lu_buf_check_and_alloc(&link, name.ln_namelen);
+                       if (!link.lb_buf)
+                               GOTO(out_match, rc = -ENOMEM);
+                       rc = critical_decode(name.ln_name, name.ln_namelen,
+                                            link.lb_buf);
+
+                       if (memcmp(LLCRYPT_FNAME_DIGEST(link.lb_buf, rc),
+                                  hash, LLCRYPT_FNAME_DIGEST_SIZE) == 0) {
+                               *lname = name;
+                               break;
+                       }
+               }
+               lee = (struct link_ea_entry *) ((char *)lee + reclen);
+       }
+       if (count == leh->leh_reccount)
+               rc = 0;
+       else
+               rc = 1;
+
+out_match:
+       lu_buf_free(&link);
+       kfree(hash);
 
        RETURN(rc);
 }
@@ -1732,14 +2006,14 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                                  __u64 child_bits,
                                  struct ldlm_reply *ldlm_rep)
 {
-       struct ptlrpc_request  *req = mdt_info_req(info);
-       struct mdt_body        *reqbody = NULL;
-       struct mdt_object      *parent = info->mti_object;
-       struct mdt_object      *child;
-       struct lu_fid          *child_fid = &info->mti_tmp_fid1;
-       struct lu_name         *lname = NULL;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct mdt_body *reqbody = NULL;
+       struct mdt_object *parent = info->mti_object;
+       struct mdt_object *child = NULL;
+       struct lu_fid *child_fid = &info->mti_tmp_fid1;
+       struct lu_name *lname = NULL;
        struct mdt_lock_handle *lhp = NULL;
-       struct ldlm_lock       *lock;
+       struct ldlm_lock *lock;
        struct req_capsule *pill = info->mti_pill;
        __u64 try_bits = 0;
        bool is_resent;
@@ -1803,6 +2077,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                }
 
                rc = mdt_pack_secctx_in_reply(info, child);
+               if (unlikely(rc)) {
+                       mdt_object_unlock(info, child, lhc, 1);
+                       RETURN(rc);
+               }
+
+               rc = mdt_pack_encctx_in_reply(info, child);
                if (unlikely(rc))
                        mdt_object_unlock(info, child, lhc, 1);
                RETURN(rc);
@@ -1811,7 +2091,37 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        lname = &info->mti_name;
        mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
 
-       if (lu_name_is_valid(lname)) {
+       if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
+               reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
+               if (unlikely(reqbody == NULL))
+                       RETURN(err_serious(-EPROTO));
+
+               *child_fid = reqbody->mbo_fid2;
+               if (unlikely(!fid_is_sane(child_fid)))
+                       RETURN(err_serious(-EINVAL));
+
+               if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
+                       mdt_object_get(info->mti_env, parent);
+                       child = parent;
+               } else {
+                       child = mdt_object_find(info->mti_env, info->mti_mdt,
+                                               child_fid);
+                       if (IS_ERR(child))
+                               RETURN(PTR_ERR(child));
+               }
+
+               CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
+                      "ldlm_rep = %p\n",
+                      PFID(mdt_object_fid(parent)),
+                      PFID(&reqbody->mbo_fid2), ldlm_rep);
+       } else if (lu_name_is_valid(lname)) {
+               if (mdt_object_remote(parent)) {
+                       CERROR("%s: parent "DFID" is on remote target\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(parent)));
+                       RETURN(-EPROTO);
+               }
+
                CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
                       "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
                       PNAME(lname), ldlm_rep);
@@ -1821,10 +2131,33 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        RETURN(err_serious(-EPROTO));
 
                *child_fid = reqbody->mbo_fid2;
-
                if (unlikely(!fid_is_sane(child_fid)))
                        RETURN(err_serious(-EINVAL));
 
+               if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
+                       mdt_object_get(info->mti_env, parent);
+                       child = parent;
+               } else {
+                       child = mdt_object_find(info->mti_env, info->mti_mdt,
+                                               child_fid);
+                       if (IS_ERR(child))
+                               RETURN(PTR_ERR(child));
+               }
+
+               if (mdt_object_remote(child)) {
+                       CERROR("%s: child "DFID" is on remote target\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(child)));
+                       GOTO(out_child, rc = -EPROTO);
+               }
+
+               /* don't fetch LOOKUP lock if it's remote object */
+               rc = mdt_is_remote_object(info, parent, child);
+               if (rc < 0)
+                       GOTO(out_child, rc);
+               if (rc)
+                       child_bits &= ~MDS_INODELOCK_LOOKUP;
+
                CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
                       "ldlm_rep = %p\n",
                       PFID(mdt_object_fid(parent)),
@@ -1833,29 +2166,37 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
 
-       if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) {
+       if (unlikely(!mdt_object_exists(parent)) &&
+           !(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
+           lu_name_is_valid(lname)) {
                LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                &parent->mot_obj,
                                "Parent doesn't exist!");
-               RETURN(-ESTALE);
-       }
-
-       if (mdt_object_remote(parent)) {
-               CERROR("%s: parent "DFID" is on remote target\n",
-                      mdt_obd_name(info->mti_mdt),
-                      PFID(mdt_object_fid(parent)));
-               RETURN(-EIO);
+               GOTO(out_child, rc = -ESTALE);
        }
 
-       if (lu_name_is_valid(lname)) {
-               /* Always allow to lookup ".." */
-               if (unlikely(lname->ln_namelen == 2 &&
-                            lname->ln_name[0] == '.' &&
-                            lname->ln_name[1] == '.'))
-                       info->mti_spec.sp_permitted = 1;
-
+       if (!child && is_resent) {
+               lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
+               if (lock == NULL) {
+                       /* Lock is pinned by ldlm_handle_enqueue0() as it is
+                        * a resend case, however, it could be already destroyed
+                        * due to client eviction or a raced cancel RPC.
+                        */
+                       LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx",
+                                         lhc->mlh_reg_lh.cookie);
+                       RETURN(-ESTALE);
+               }
+               fid_extract_from_res_name(child_fid,
+                                         &lock->l_resource->lr_name);
+               LDLM_LOCK_PUT(lock);
+               child = mdt_object_find(info->mti_env, info->mti_mdt,
+                                       child_fid);
+               if (IS_ERR(child))
+                       RETURN(PTR_ERR(child));
+       } else if (!(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
+           lu_name_is_valid(lname)) {
                if (info->mti_body->mbo_valid == OBD_MD_FLID) {
-                       rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
+                       rc = mdt_raw_lookup(info, parent, lname);
 
                        RETURN(rc);
                }
@@ -1878,30 +2219,31 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
 
                if (rc != 0)
-                       GOTO(out_parent, rc);
-       }
+                       GOTO(unlock_parent, rc);
 
-       mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
-
-       /*
-        *step 3: find the child object by fid & lock it.
-        *        regardless if it is local or remote.
-        *
-        *Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e)
-        *      set parent dir fid the same as child fid in getattr by fid case
-        *      we should not lu_object_find() the object again, could lead
-        *      to hung if there is a concurrent unlink destroyed the object.
-        */
-       if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
-               mdt_object_get(info->mti_env, parent);
-               child = parent;
-       } else {
                child = mdt_object_find(info->mti_env, info->mti_mdt,
                                        child_fid);
+               if (unlikely(IS_ERR(child)))
+                       GOTO(unlock_parent, rc = PTR_ERR(child));
        }
 
-       if (unlikely(IS_ERR(child)))
-               GOTO(out_parent, rc = PTR_ERR(child));
+       mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+
+       /* step 3: lock child regardless if it is local or remote. */
+       LASSERT(child);
+
+       if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
+               /* Here, lname is an encoded hash of on-disk name, and
+                * client is doing access without encryption key.
+                * So we need to compare name hash with the one in the request.
+                */
+               if (!find_name_matching_hash(info, lname, parent,
+                                            child, true)) {
+                       mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+                       mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+                       GOTO(out_child, rc = -ENOENT);
+               }
+       }
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
        if (!mdt_object_exists(child)) {
@@ -1976,13 +2318,22 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        /* finally, we can get attr for child. */
        rc = mdt_getattr_internal(info, child, ma_need);
        if (unlikely(rc != 0)) {
-               mdt_object_unlock(info, child, lhc, 1);
+               if (!is_resent)
+                       mdt_object_unlock(info, child, lhc, 1);
                GOTO(out_child, rc);
        }
 
        rc = mdt_pack_secctx_in_reply(info, child);
        if (unlikely(rc)) {
-               mdt_object_unlock(info, child, lhc, 1);
+               if (!is_resent)
+                       mdt_object_unlock(info, child, lhc, 1);
+               GOTO(out_child, rc);
+       }
+
+       rc = mdt_pack_encctx_in_reply(info, child);
+       if (unlikely(rc)) {
+               if (!is_resent)
+                       mdt_object_unlock(info, child, lhc, 1);
                GOTO(out_child, rc);
        }
 
@@ -1996,6 +2347,16 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         PLDLMRES(lock->l_resource),
                         PFID(mdt_object_fid(child)));
 
+               if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) {
+                       if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+                               OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND,
+                                                req->rq_deadline -
+                                                req->rq_arrival_time.tv_sec +
+                                                cfs_fail_val ?: 3);
+                       /* Put the lock to the waiting list and force the cancel */
+                       ldlm_set_ast_sent(lock);
+               }
+
                if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
                    !mdt_object_remote(child) && child != parent) {
                        mdt_object_put(info->mti_env, child);
@@ -2010,15 +2371,16 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                                unlock_res_and_lock(lock);
                        }
                        LDLM_LOCK_PUT(lock);
-                       GOTO(out_parent, rc = 0);
+                       GOTO(unlock_parent, rc = 0);
                }
                LDLM_LOCK_PUT(lock);
        }
 
        EXIT;
 out_child:
-       mdt_object_put(info->mti_env, child);
-out_parent:
+       if (child)
+               mdt_object_put(info->mti_env, child);
+unlock_parent:
        if (lhp)
                mdt_object_unlock(info, parent, lhp, 1);
        return rc;
@@ -2028,37 +2390,38 @@ out_parent:
 static int mdt_getattr_name(struct tgt_session_info *tsi)
 {
        struct mdt_thread_info  *info = tsi2mdt_info(tsi);
-        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
-        struct mdt_body        *reqbody;
-        struct mdt_body        *repbody;
-        int rc, rc2;
-        ENTRY;
+       struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
+       struct mdt_body *reqbody;
+       struct mdt_body *repbody;
+       int rc, rc2;
 
-        reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(reqbody != NULL);
-        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(repbody != NULL);
+       ENTRY;
+
+       reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
+       LASSERT(reqbody != NULL);
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       LASSERT(repbody != NULL);
 
        info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
        repbody->mbo_eadatasize = 0;
        repbody->mbo_aclsize = 0;
 
-        rc = mdt_init_ucred_intent_getattr(info, reqbody);
-        if (unlikely(rc))
-                GOTO(out_shrink, rc);
+       rc = mdt_init_ucred(info, reqbody);
+       if (unlikely(rc))
+               GOTO(out_shrink, rc);
 
-        rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
-        if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-                ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
-                lhc->mlh_reg_lh.cookie = 0;
-        }
-        mdt_exit_ucred(info);
-        EXIT;
+       rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
+       if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+               ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
+               lhc->mlh_reg_lh.cookie = 0;
+       }
+       mdt_exit_ucred(info);
+       EXIT;
 out_shrink:
-        mdt_client_compatibility(info);
-        rc2 = mdt_fix_reply(info);
-        if (rc == 0)
-                rc = rc2;
+       mdt_client_compatibility(info);
+       rc2 = mdt_fix_reply(info);
+       if (rc == 0)
+               rc = rc2;
        mdt_thread_info_fini(info);
        return rc;
 }
@@ -2153,7 +2516,7 @@ static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
        if (la->la_flags & LUSTRE_IMMUTABLE_FL)
                        rc = -EACCES;
 
-       if (md_capable(uc, CFS_CAP_DAC_OVERRIDE))
+       if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE))
                RETURN(0);
        if (uc->uc_fsuid == la->la_uid) {
                if ((la->la_mode & S_IWUSR) == 0)
@@ -2279,6 +2642,61 @@ out:
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg);
 
+int mdt_io_set_info(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct ost_body         *body = NULL, *repbody;
+       void                    *key, *val = NULL;
+       int                      keylen, vallen, rc = 0;
+       bool                     is_grant_shrink;
+
+       ENTRY;
+
+       key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
+       if (key == NULL) {
+               DEBUG_REQ(D_HA, req, "no set_info key");
+               RETURN(err_serious(-EFAULT));
+       }
+       keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
+                                     RCL_CLIENT);
+
+       val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
+       if (val == NULL) {
+               DEBUG_REQ(D_HA, req, "no set_info val");
+               RETURN(err_serious(-EFAULT));
+       }
+       vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
+                                     RCL_CLIENT);
+
+       is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK);
+       if (is_grant_shrink)
+               /* In this case the value is actually an RMF_OST_BODY, so we
+                * transmutate the type of this PTLRPC */
+               req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO);
+
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc < 0)
+               RETURN(rc);
+
+       if (is_grant_shrink) {
+               body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY);
+
+               repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+               *repbody = *body;
+
+               /** handle grant shrink, similar to a read request */
+               tgt_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp,
+                                      &repbody->oa);
+       } else {
+               CERROR("%s: Unsupported key %s\n",
+                      tgt_name(tsi->tsi_tgt), (char *)key);
+               rc = -EOPNOTSUPP;
+       }
+
+       RETURN(rc);
+}
+
+
 static int mdt_set_info(struct tgt_session_info *tsi)
 {
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
@@ -2324,7 +2742,7 @@ static int mdt_set_info(struct tgt_session_info *tsi)
                               tgt_name(tsi->tsi_tgt), vallen);
                        RETURN(-EINVAL);
                }
-               if (ptlrpc_req_need_swab(req)) {
+               if (req_capsule_req_need_swab(&req->rq_pill)) {
                        __swab64s(&cs->cs_recno);
                        __swab32s(&cs->cs_id);
                }
@@ -2380,17 +2798,17 @@ static int mdt_readpage(struct tgt_session_info *tsi)
                                exp_max_brw_size(tsi->tsi_exp));
        rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
                          PAGE_SHIFT;
-        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
-        if (rdpg->rp_pages == NULL)
-                RETURN(-ENOMEM);
+       OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
+       if (rdpg->rp_pages == NULL)
+               RETURN(-ENOMEM);
 
-        for (i = 0; i < rdpg->rp_npages; ++i) {
+       for (i = 0; i < rdpg->rp_npages; ++i) {
                rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
-                if (rdpg->rp_pages[i] == NULL)
-                        GOTO(free_rdpg, rc = -ENOMEM);
-        }
+               if (rdpg->rp_pages[i] == NULL)
+                       GOTO(free_rdpg, rc = -ENOMEM);
+       }
 
-        /* call lower layers to fill allocated pages with directory data */
+       /* call lower layers to fill allocated pages with directory data */
        rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
        if (rc < 0)
                GOTO(free_rdpg, rc);
@@ -2404,7 +2822,7 @@ free_rdpg:
        for (i = 0; i < rdpg->rp_npages; i++)
                if (rdpg->rp_pages[i] != NULL)
                        __free_page(rdpg->rp_pages[i]);
-       OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
+       OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
                RETURN(0);
@@ -2447,17 +2865,27 @@ static void mdt_preset_secctx_size(struct mdt_thread_info *info)
            req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
                                  RCL_CLIENT)) {
                if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
-                                        RCL_CLIENT) != 0) {
+                                        RCL_CLIENT) != 0)
                        /* pre-set size in server part with max size */
                        req_capsule_set_size(pill, &RMF_FILE_SECCTX,
                                             RCL_SERVER,
-                                            info->mti_mdt->mdt_max_ea_size);
-               } else {
+                                            OBD_MAX_DEFAULT_EA_SIZE);
+               else
                        req_capsule_set_size(pill, &RMF_FILE_SECCTX,
                                             RCL_SERVER, 0);
-               }
        }
+}
 
+static void mdt_preset_encctx_size(struct mdt_thread_info *info)
+{
+       struct req_capsule *pill = info->mti_pill;
+
+       if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
+                                 RCL_SERVER))
+               /* pre-set size in server part with max size */
+               req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
+                                    RCL_SERVER,
+                                    info->mti_mdt->mdt_max_mdsize);
 }
 
 static int mdt_reint_internal(struct mdt_thread_info *info,
@@ -2499,6 +2927,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                                     LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
 
        mdt_preset_secctx_size(info);
+       mdt_preset_encctx_size(info);
 
        rc = req_capsule_server_pack(pill);
        if (rc != 0) {
@@ -2665,6 +3094,7 @@ static int mdt_sync(struct tgt_session_info *tsi)
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
        struct req_capsule      *pill = tsi->tsi_pill;
        struct mdt_body         *body;
+       ktime_t                  kstart = ktime_get();
        int                      rc;
 
        ENTRY;
@@ -2701,7 +3131,8 @@ static int mdt_sync(struct tgt_session_info *tsi)
                mdt_thread_info_fini(info);
        }
        if (rc == 0)
-               mdt_counter_incr(req, LPROC_MDT_SYNC);
+               mdt_counter_incr(req, LPROC_MDT_SYNC,
+                                ktime_us_delta(ktime_get(), kstart));
 
        RETURN(rc);
 }
@@ -2788,7 +3219,9 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case LUSTRE_Q_SETDEFAULT:
        case LUSTRE_Q_SETQUOTAPOOL:
        case LUSTRE_Q_SETINFOPOOL:
-               if (!nodemap_can_setquota(nodemap))
+       case LUSTRE_Q_SETDEFAULT_POOL:
+               if (!nodemap_can_setquota(nodemap, oqctl->qc_type,
+                                         oqctl->qc_id))
                        GOTO(out_nodemap, rc = -EPERM);
                /* fallthrough */
        case Q_GETINFO:
@@ -2796,6 +3229,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case LUSTRE_Q_GETDEFAULT:
        case LUSTRE_Q_GETQUOTAPOOL:
        case LUSTRE_Q_GETINFOPOOL:
+       case LUSTRE_Q_GETDEFAULT_POOL:
                if (qmt == NULL)
                        GOTO(out_nodemap, rc = -EOPNOTSUPP);
                /* slave quotactl */
@@ -2821,8 +3255,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
                                    NODEMAP_CLIENT_TO_FS, id);
                break;
        case PRJQUOTA:
-               /* todo: check/map project id */
-               id = oqctl->qc_id;
+               id = nodemap_map_id(nodemap, NODEMAP_PROJID,
+                                   NODEMAP_CLIENT_TO_FS, id);
                break;
        default:
                GOTO(out_nodemap, rc = -EOPNOTSUPP);
@@ -2854,6 +3288,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case LUSTRE_Q_GETQUOTAPOOL:
        case LUSTRE_Q_SETINFOPOOL:
        case LUSTRE_Q_GETINFOPOOL:
+       case LUSTRE_Q_SETDEFAULT_POOL:
+       case LUSTRE_Q_GETDEFAULT_POOL:
                /* forward quotactl request to QMT */
                rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
                break;
@@ -3294,10 +3730,9 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
                                          cache);
 }
 
-static int mdt_object_local_lock(struct mdt_thread_info *info,
-                                struct mdt_object *o,
-                                struct mdt_lock_handle *lh, __u64 *ibits,
-                                __u64 trybits, bool cos_incompat)
+int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
+                         struct mdt_lock_handle *lh, __u64 *ibits,
+                         __u64 trybits, bool cos_incompat)
 {
        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
        union ldlm_policy_data *policy = &info->mti_policy;
@@ -3381,6 +3816,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info,
 
        policy->l_inodebits.bits = *ibits;
        policy->l_inodebits.try_bits = trybits;
+       policy->l_inodebits.li_gid = lh->mlh_gid;
 
         /*
          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
@@ -3749,6 +4185,7 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
                                             LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
 
                mdt_preset_secctx_size(info);
+               mdt_preset_encctx_size(info);
 
                rc = req_capsule_server_pack(pill);
                if (rc)
@@ -3895,25 +4332,29 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
        /* If possible resent found a lock, @lh is set to its handle */
        new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0);
 
-        if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
-                lh->mlh_reg_lh.cookie = 0;
-                RETURN(0);
-        }
-
-       if (new_lock == NULL && (flags & LDLM_FL_RESENT)) {
-               /* Lock is pinned by ldlm_handle_enqueue0() as it is
-                * a resend case, however, it could be already destroyed
-                * due to client eviction or a raced cancel RPC. */
-               LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n",
-                                 lh->mlh_reg_lh.cookie);
+       if (new_lock == NULL) {
+               if (flags & LDLM_FL_INTENT_ONLY) {
+                       result = 0;
+               } else if (flags & LDLM_FL_RESENT) {
+                       /* Lock is pinned by ldlm_handle_enqueue0() as it is a
+                        * resend case, however, it could be already destroyed
+                        * due to client eviction or a raced cancel RPC.
+                        */
+                       LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n",
+                                         lh->mlh_reg_lh.cookie);
+                       result = -ESTALE;
+               } else {
+                       CERROR("%s: Invalid lockh=%#llx flags=%#llx fid1="DFID" fid2="DFID": rc = %d\n",
+                              mdt_obd_name(info->mti_mdt),
+                              lh->mlh_reg_lh.cookie, flags,
+                              PFID(&info->mti_tmp_fid1),
+                              PFID(&info->mti_tmp_fid2), result);
+                       result = -ESTALE;
+               }
                lh->mlh_reg_lh.cookie = 0;
-               RETURN(-ESTALE);
+               RETURN(result);
        }
 
-       LASSERTF(new_lock != NULL,
-                "lockh %#llx flags %#llx : rc = %d\n",
-                lh->mlh_reg_lh.cookie, flags, result);
-
         /*
          * If we've already given this lock to a client once, then we should
          * have no readers or writers.  Otherwise, we should have one reader
@@ -4112,7 +4553,7 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc,
                GOTO(out_shrink, rc = -EINVAL);
        }
 
-       rc = mdt_init_ucred_intent_getattr(info, reqbody);
+       rc = mdt_init_ucred(info, reqbody);
        if (rc)
                GOTO(out_shrink, rc);
 
@@ -4294,6 +4735,7 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc,
         struct ldlm_reply      *rep = NULL;
         long                    opc;
         int                     rc;
+       struct ptlrpc_request  *req = mdt_info_req(info);
 
         static const struct req_format *intent_fmts[REINT_MAX] = {
                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
@@ -4311,6 +4753,9 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc,
 
         rc = mdt_reint_internal(info, lhc, opc);
 
+       if (rc < 0 && lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+               DEBUG_REQ(D_ERROR, req, "Replay open failed with %d", rc);
+
        /* Check whether the reply has been packed successfully. */
        if (mdt_info_req(info)->rq_repmsg != NULL)
                rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
@@ -4523,6 +4968,23 @@ static int mdt_intent_policy(const struct lu_env *env,
                } else {
                        rc = err_serious(-EFAULT);
                }
+       } else if (ldesc->l_resource.lr_type == LDLM_IBITS &&
+                  ldesc->l_policy_data.l_inodebits.bits == MDS_INODELOCK_DOM) {
+               struct ldlm_reply *rep;
+
+               /* No intent was provided but INTENT flag is set along with
+                * DOM bit, this is considered as GLIMPSE request.
+                * This logic is common for MDT and OST glimpse
+                */
+               mdt_ptlrpc_stats_update(req, IT_GLIMPSE);
+               rc = mdt_glimpse_enqueue(info, ns, lockp, flags);
+               /* Check whether the reply has been packed successfully. */
+               if (req->rq_repmsg != NULL) {
+                       rep = req_capsule_server_get(info->mti_pill,
+                                                    &RMF_DLM_REP);
+                       rep->lock_policy_res2 =
+                               ptlrpc_status_hton(rep->lock_policy_res2);
+               }
        } else {
                /* No intent was provided */
                req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
@@ -5276,6 +5738,12 @@ TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE,
                                         OST_PUNCH,     mdt_punch_hdl,
                                                        mdt_hp_punch),
 TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC,    mdt_data_sync),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_FALLOCATE,
+                                                       mdt_fallocate_hdl),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SEEK, tgt_lseek),
+TGT_RPC_HANDLER(OST_FIRST_OPC,
+               0,                      OST_SET_INFO,   mdt_io_set_info,
+               &RQF_OBD_SET_INFO, LUSTRE_OST_VERSION),
 };
 
 static struct tgt_handler mdt_sec_ctx_ops[] = {
@@ -5369,6 +5837,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
 
        mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
+
+       mdt_restriper_stop(m);
        ping_evictor_stop();
 
        /* Remove the HSM /proc entry so the coordinator cannot be
@@ -5491,14 +5961,18 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                LASSERT(lsi->lsi_lmd);
                /* CMD is supported only in IAM mode */
                LASSERT(num);
-               node_id = simple_strtol(num, NULL, 10);
+               rc = kstrtol(num, 10, &node_id);
+               if (rc)
+                       RETURN(rc);
+
                obd->u.obt.obt_magic = OBT_MAGIC;
                if (lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
                        m->mdt_skip_lfsck = 1;
        }
 
-       /* DoM files get IO lock at open optionally by default */
-       m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN;
+       /* Just try to get a DoM lock by default. Otherwise, having a group
+        * lock granted, it may get blocked for a long time. */
+       m->mdt_opts.mo_dom_lock = TRYLOCK_DOM_ON_OPEN;
        /* DoM files are read at open and data is packed in the reply */
        m->mdt_opts.mo_dom_read_open = 1;
 
@@ -5510,10 +5984,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        m->mdt_enable_remote_dir = 1;
        m->mdt_enable_striped_dir = 1;
        m->mdt_enable_dir_migration = 1;
-       m->mdt_enable_dir_restripe = 1;
+       m->mdt_enable_dir_restripe = 0;
+       m->mdt_enable_dir_auto_split = 0;
        m->mdt_enable_remote_dir_gid = 0;
        m->mdt_enable_chprojid_gid = 0;
        m->mdt_enable_remote_rename = 1;
+       m->mdt_dir_restripe_nsonly = 1;
+       m->mdt_enable_remote_subdir_mount = 1;
 
        atomic_set(&m->mdt_mds_mds_conns, 0);
        atomic_set(&m->mdt_async_commit_count, 0);
@@ -5569,8 +6046,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                                              LDLM_NAMESPACE_SERVER,
                                              LDLM_NAMESPACE_GREEDY,
                                              LDLM_NS_TYPE_MDT);
-       if (m->mdt_namespace == NULL)
-               GOTO(err_fini_seq, rc = -ENOMEM);
+       if (IS_ERR(m->mdt_namespace)) {
+               rc = PTR_ERR(m->mdt_namespace);
+               CERROR("%s: unable to create server namespace: rc = %d\n",
+                      obd->obd_name, rc);
+               m->mdt_namespace = NULL;
+               GOTO(err_fini_seq, rc);
+       }
 
        m->mdt_namespace->ns_lvbp = m;
        m->mdt_namespace->ns_lvbo = &mdt_lvbo;
@@ -5674,7 +6156,14 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_LOCAL_RECOV))
                m->mdt_lut.lut_local_recovery = 1;
 
+       rc = mdt_restriper_start(m);
+       if (rc)
+               GOTO(err_ping_evictor, rc);
+
        RETURN(0);
+
+err_ping_evictor:
+       ping_evictor_stop();
 err_procfs:
        mdt_tunables_fini(m);
 err_recovery:
@@ -5824,6 +6313,8 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                init_rwsem(&mo->mot_dom_sem);
                init_rwsem(&mo->mot_open_sem);
                atomic_set(&mo->mot_open_count, 0);
+               mo->mot_restripe_offset = 0;
+               INIT_LIST_HEAD(&mo->mot_restripe_linkage);
                RETURN(o);
        }
        RETURN(NULL);
@@ -6122,11 +6613,7 @@ static int mdt_connect_internal(const struct lu_env *env,
        if (OCD_HAS_FLAG(data, CKSUM)) {
                __u32 cksum_types = data->ocd_cksum_types;
 
-               /* The client set in ocd_cksum_types the checksum types it
-                * supports. We have to mask off the algorithms that we don't
-                * support */
-               data->ocd_cksum_types &=
-                       obd_cksum_types_supported_server(obd_name);
+               tgt_mask_cksum_types(&mdt->mdt_lut, &data->ocd_cksum_types);
 
                if (unlikely(data->ocd_cksum_types == 0)) {
                        CERROR("%s: Connect with checksum support but no "
@@ -6152,6 +6639,9 @@ static int mdt_connect_internal(const struct lu_env *env,
                mdt_enable_slc(mdt);
        }
 
+       if (!mdt->mdt_lut.lut_dt_conf.ddp_has_lseek_data_hole)
+               data->ocd_connect_flags2 &= ~OBD_CONNECT2_LSEEK;
+
        return 0;
 }
 
@@ -6244,7 +6734,8 @@ static int mdt_export_cleanup(struct obd_export *exp)
                                rc = mdt_ctxt_add_dirty_flag(&env, info, mfd);
 
                        /* Don't unlink orphan on failover umount, LU-184 */
-                       if (exp->exp_flags & OBD_OPT_FAILOVER) {
+                       if (exp->exp_flags & OBD_OPT_FAILOVER ||
+                           exp->exp_obd->obd_stopping) {
                                ma->ma_valid = MA_FLAGS;
                                ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
                        }
@@ -6528,18 +7019,19 @@ static int mdt_path_current(struct mdt_thread_info *info,
                            struct getinfo_fid2path *fp,
                            struct lu_fid *root_fid)
 {
-       struct mdt_device       *mdt = info->mti_mdt;
-       struct mdt_object       *mdt_obj;
-       struct link_ea_header   *leh;
-       struct link_ea_entry    *lee;
-       struct lu_name          *tmpname = &info->mti_name;
-       struct lu_fid           *tmpfid = &info->mti_tmp_fid1;
-       struct lu_buf           *buf = &info->mti_big_buf;
-       char                    *ptr;
-       int                     reclen;
-       struct linkea_data      ldata = { NULL };
-       int                     rc = 0;
-       bool                    first = true;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct lu_name *tmpname = &info->mti_name;
+       struct lu_fid *tmpfid = &info->mti_tmp_fid1;
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct linkea_data ldata = { NULL };
+       bool first = true;
+       struct mdt_object *mdt_obj;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       char *ptr;
+       int reclen;
+       int rc = 0;
+
        ENTRY;
 
        /* temp buffer for path element, the buffer will be finally freed
@@ -6555,8 +7047,6 @@ static int mdt_path_current(struct mdt_thread_info *info,
        *tmpfid = fp->gf_fid = *mdt_object_fid(obj);
 
        while (!lu_fid_eq(root_fid, &fp->gf_fid)) {
-               struct lu_buf           lmv_buf;
-
                if (!lu_fid_eq(root_fid, &mdt->mdt_md_root_fid) &&
                    lu_fid_eq(&mdt->mdt_md_root_fid, &fp->gf_fid))
                        GOTO(out, rc = -ENOENT);
@@ -6604,26 +7094,16 @@ static int mdt_path_current(struct mdt_thread_info *info,
                                fp->gf_linkno++;
                }
 
-               lmv_buf.lb_buf = info->mti_xattr_buf;
-               lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
                /* Check if it is slave stripes */
-               rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
-                                 &lmv_buf, XATTR_NAME_LMV);
+               rc = mdt_is_dir_stripe(info, mdt_obj);
                mdt_object_put(info->mti_env, mdt_obj);
-               if (rc > 0) {
-                       union lmv_mds_md *lmm = lmv_buf.lb_buf;
-
-                       /* For slave stripes, get its master */
-                       if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) {
-                               fp->gf_fid = *tmpfid;
-                               continue;
-                       }
-               } else if (rc < 0 && rc != -ENODATA) {
+               if (rc < 0)
                        GOTO(out, rc);
+               if (rc == 1) {
+                       fp->gf_fid = *tmpfid;
+                       continue;
                }
 
-               rc = 0;
-
                /* Pack the name in the end of the buffer */
                ptr -= tmpname->ln_namelen;
                if (ptr - 1 <= fp->gf_u.gf_path)
@@ -6639,6 +7119,9 @@ static int mdt_path_current(struct mdt_thread_info *info,
                first = false;
        }
 
+       /* non-zero will be treated as an error */
+       rc = 0;
+
 remote_out:
        ptr++; /* skip leading / */
        memmove(fp->gf_u.gf_path, ptr,
@@ -6735,12 +7218,24 @@ static int mdt_fid2path(struct mdt_thread_info *info,
                RETURN(rc);
        }
 
-       if (mdt_object_remote(obj))
+       if (mdt_object_remote(obj)) {
                rc = -EREMOTE;
-       else if (!mdt_object_exists(obj))
+       } else if (!mdt_object_exists(obj)) {
                rc = -ENOENT;
-       else
-               rc = 0;
+       } else {
+               struct lu_attr la = { 0 };
+               struct dt_object *dt = mdt_obj2dt(obj);
+
+               if (dt && dt->do_ops && dt->do_ops->do_attr_get)
+                       dt_attr_get(info->mti_env, mdt_obj2dt(obj), &la);
+               if (la.la_valid & LA_FLAGS && la.la_flags & LUSTRE_ENCRYPT_FL)
+                       /* path resolution cannot be carried out on server
+                        * side for encrypted files
+                        */
+                       rc = -ENODATA;
+               else
+                       rc = 0;
+       }
 
        if (rc < 0) {
                mdt_object_put(info->mti_env, obj);
@@ -6770,7 +7265,7 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, int keylen,
        fpin = key + cfs_size_round(sizeof(KEY_FID2PATH));
        fpout = val;
 
-       if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
+       if (req_capsule_req_need_swab(info->mti_pill))
                lustre_swab_fid2path(fpin);
 
        memcpy(fpout, fpin, sizeof(*fpin));
@@ -6781,7 +7276,7 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, int keylen,
                      sizeof(struct lu_fid)) {
                /* client sent its root FID, which is normally fileset FID */
                root_fid = fpin->gf_u.gf_root_fid;
-               if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
+               if (req_capsule_req_need_swab(info->mti_pill))
                        lustre_swab_lu_fid(root_fid);
 
                if (root_fid != NULL && !fid_is_sane(root_fid))
@@ -6913,12 +7408,21 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                if (rc == 0)
                        rc = dt_ro(&env, dt);
                break;
-       case OBD_IOC_ABORT_RECOVERY:
+       case OBD_IOC_ABORT_RECOVERY: {
+               struct obd_ioctl_data *data = karg;
+
                CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt));
-               obd->obd_abort_recovery = 1;
-               target_stop_recovery_thread(obd);
+               if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT) {
+                       obd->obd_abort_recov_mdt = 1;
+                       wake_up(&obd->obd_next_transno_waitq);
+               } else { /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */
+                       /* lctl didn't set OBD_FLG_ABORT_RECOV_OST < 2.13.57 */
+                       obd->obd_abort_recovery = 1;
+                       target_stop_recovery_thread(obd);
+               }
                rc = 0;
                break;
+       }
         case OBD_IOC_CHANGELOG_REG:
         case OBD_IOC_CHANGELOG_DEREG:
         case OBD_IOC_CHANGELOG_CLEAR:
@@ -7163,10 +7667,10 @@ int mdt_cos_is_enabled(struct mdt_device *mdt)
         return mdt->mdt_opts.mo_cos != 0;
 }
 
-static struct lu_device_type_operations mdt_device_type_ops = {
-        .ldto_device_alloc = mdt_device_alloc,
-        .ldto_device_free  = mdt_device_free,
-        .ldto_device_fini  = mdt_device_fini
+static const struct lu_device_type_operations mdt_device_type_ops = {
+       .ldto_device_alloc = mdt_device_alloc,
+       .ldto_device_free  = mdt_device_free,
+       .ldto_device_fini  = mdt_device_fini
 };
 
 static struct lu_device_type mdt_device_type = {
@@ -7192,7 +7696,7 @@ static int __init mdt_init(void)
        if (rc)
                GOTO(lu_fini, rc);
 
-       rc = class_register_type(&mdt_obd_device_ops, NULL, true, NULL,
+       rc = class_register_type(&mdt_obd_device_ops, NULL, true,
                                 LUSTRE_MDT_NAME, &mdt_device_type);
        if (rc)
                GOTO(mds_fini, rc);