Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index e2397cf..7a8e853 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/mdt/mdt_handler.c
  *
 #include <uapi/linux/lustre/lustre_param.h>
 #include <lustre_quota.h>
 #include <lustre_swab.h>
+#include <lustre_lmv.h>
 #include <obd.h>
 #include <obd_support.h>
 #include <lustre_barrier.h>
 #include <obd_cksum.h>
 #include <llog_swab.h>
+#include <lustre_crypto.h>
 
 #include "mdt_internal.h"
 
-static unsigned int max_mod_rpcs_per_client = 8;
-module_param(max_mod_rpcs_per_client, uint, 0644);
-MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
+#if OBD_OCD_VERSION(3, 0, 53, 0) > LUSTRE_VERSION_CODE
+static int mdt_max_mod_rpcs_per_client_set(const char *val,
+                               cfs_kernel_param_arg_t *kp)
+{
+       unsigned int num;
+       int rc;
+
+       rc = kstrtouint(val, 0, &num);
+       if (rc < 0)
+               return rc;
+
+       if (num < 1 || num > OBD_MAX_RIF_MAX)
+               return -EINVAL;
+
+       CWARN("max_mod_rpcs_per_client is deprecated, set mdt.*.max_mod_rpcs_in_flight parameter instead\n");
+
+       max_mod_rpcs_per_client = num;
+       return 0;
+}
+static const struct kernel_param_ops
+                               param_ops_max_mod_rpcs_per_client = {
+       .set = mdt_max_mod_rpcs_per_client_set,
+       .get = param_get_uint,
+};
+
+#define param_check_max_mod_rpcs_per_client(name, p) \
+               __param_check(name, p, unsigned int)
+
+module_param_cb(max_mod_rpcs_per_client,
+                               &param_ops_max_mod_rpcs_per_client,
+                               &max_mod_rpcs_per_client, 0644);
+
+MODULE_PARM_DESC(max_mod_rpcs_per_client,
+       "maximum number of modify RPCs in flight allowed per client (Deprecated)");
+#endif
 
 mdl_mode_t mdt_mdl_lock_modes[] = {
-        [LCK_MINMODE] = MDL_MINMODE,
-        [LCK_EX]      = MDL_EX,
-        [LCK_PW]      = MDL_PW,
-        [LCK_PR]      = MDL_PR,
-        [LCK_CW]      = MDL_CW,
-        [LCK_CR]      = MDL_CR,
-        [LCK_NL]      = MDL_NL,
-        [LCK_GROUP]   = MDL_GROUP
+       [LCK_MINMODE] = MDL_MINMODE,
+       [LCK_EX]      = MDL_EX,
+       [LCK_PW]      = MDL_PW,
+       [LCK_PR]      = MDL_PR,
+       [LCK_CW]      = MDL_CW,
+       [LCK_CR]      = MDL_CR,
+       [LCK_NL]      = MDL_NL,
+       [LCK_GROUP]   = MDL_GROUP
 };
 
 enum ldlm_mode mdt_dlm_lock_modes[] = {
@@ -155,17 +188,34 @@ void mdt_set_disposition(struct mdt_thread_info *info,
                rep->lock_policy_res1 |= op_flag;
 }
 
+/* assert lock is unlocked before reuse */
+static inline void mdt_lock_handle_assert(struct mdt_lock_handle *lh)
+{
+       LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
+       LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
+       LASSERT(!lustre_handle_is_used(&lh->mlh_rreg_lh));
+}
+
 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
 {
+       mdt_lock_handle_assert(lh);
        lh->mlh_pdo_hash = 0;
        lh->mlh_reg_mode = lm;
        lh->mlh_rreg_mode = lm;
        lh->mlh_type = MDT_REG_LOCK;
 }
 
+void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
+{
+       mdt_lock_reg_init(lh, lock->l_req_mode);
+       if (lock->l_req_mode == LCK_GROUP)
+               lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
+}
+
 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
                       const struct lu_name *lname)
 {
+       mdt_lock_handle_assert(lh);
        lh->mlh_reg_mode = lock_mode;
        lh->mlh_pdo_mode = LCK_MINMODE;
        lh->mlh_rreg_mode = lock_mode;
@@ -176,11 +226,11 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
                                                     lname->ln_namelen);
                /* XXX Workaround for LU-2856
                 *
-                * Zero is a valid return value of full_name_hash, but
-                * several users of mlh_pdo_hash assume a non-zero
-                * hash value. We therefore map zero onto an
-                * arbitrary, but consistent value (1) to avoid
-                * problems further down the road. */
+                * Zero is a valid return value of full_name_hash, but several
+                * users of mlh_pdo_hash assume a non-zero hash value. We
+                * therefore map zero onto an arbitrary, but consistent
+                * value (1) to avoid problems further down the road.
+                */
                if (unlikely(lh->mlh_pdo_hash == 0))
                        lh->mlh_pdo_hash = 1;
        } else {
@@ -189,78 +239,114 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
 }
 
 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
-                              struct mdt_lock_handle *lh)
-{
-        mdl_mode_t mode;
-        ENTRY;
-
-        /*
-         * Any dir access needs couple of locks:
-         *
-         * 1) on part of dir we gonna take lookup/modify;
-         *
-         * 2) on whole dir to protect it from concurrent splitting and/or to
-         * flush client's cache for readdir().
-         *
-         * so, for a given mode and object this routine decides what lock mode
-         * to use for lock #2:
-         *
-         * 1) if caller's gonna lookup in dir then we need to protect dir from
-         * being splitted only - LCK_CR
-         *
-         * 2) if caller's gonna modify dir then we need to protect dir from
-         * being splitted and to flush cache - LCK_CW
-         *
-         * 3) if caller's gonna modify dir and that dir seems ready for
-         * splitting then we need to protect it from any type of access
-         * (lookup/modify/split) - LCK_EX --bzzz
-         */
-
-        LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
-        LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
-
-        /*
-         * Ask underlaying level its opinion about preferable PDO lock mode
-         * having access type passed as regular lock mode:
-         *
-         * - MDL_MINMODE means that lower layer does not want to specify lock
-         * mode;
-         *
-         * - MDL_NL means that no PDO lock should be taken. This is used in some
-         * cases. Say, for non-splittable directories no need to use PDO locks
-         * at all.
-         */
-        mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
-                             mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
-
-        if (mode != MDL_MINMODE) {
-                lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
-        } else {
-                /*
-                 * Lower layer does not want to specify locking mode. We do it
-                 * our selves. No special protection is needed, just flush
-                 * client's cache on modification and allow concurrent
-                 * mondification.
-                 */
-                switch (lh->mlh_reg_mode) {
-                case LCK_EX:
-                        lh->mlh_pdo_mode = LCK_EX;
-                        break;
-                case LCK_PR:
-                        lh->mlh_pdo_mode = LCK_CR;
-                        break;
-                case LCK_PW:
-                        lh->mlh_pdo_mode = LCK_CW;
-                        break;
-                default:
-                        CERROR("Not expected lock type (0x%x)\n",
-                               (int)lh->mlh_reg_mode);
-                        LBUG();
-                }
-        }
-
-        LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
-        EXIT;
+                             struct mdt_lock_handle *lh)
+{
+       mdl_mode_t mode;
+
+       ENTRY;
+
+       /*
+        * Any dir access needs couple of locks:
+        *
+        * 1) on part of dir we gonna take lookup/modify;
+        *
+        * 2) on whole dir to protect it from concurrent splitting and/or to
+        * flush client's cache for readdir().
+        *
+        * so, for a given mode and object this routine decides what lock mode
+        * to use for lock #2:
+        *
+        * 1) if caller's gonna lookup in dir then we need to protect dir from
+        * being splitted only - LCK_CR
+        *
+        * 2) if caller's gonna modify dir then we need to protect dir from
+        * being splitted and to flush cache - LCK_CW
+        *
+        * 3) if caller's gonna modify dir and that dir seems ready for
+        * splitting then we need to protect it from any type of access
+        * (lookup/modify/split) - LCK_EX --bzzz
+        */
+
+       LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
+       LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
+
+       /*
+        * Ask underlaying level its opinion about preferable PDO lock mode
+        * having access type passed as regular lock mode:
+        *
+        * - MDL_MINMODE means that lower layer does not want to specify lock
+        * mode;
+        *
+        * - MDL_NL means that no PDO lock should be taken. This is used in some
+        * cases. Say, for non-splittable directories no need to use PDO locks
+        * at all.
+        */
+       mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
+                            mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
+
+       if (mode != MDL_MINMODE) {
+               lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
+       } else {
+               /*
+                * Lower layer does not want to specify locking mode. We do it
+                * our selves. No special protection is needed, just flush
+                * client's cache on modification and allow concurrent
+                * mondification.
+                */
+               switch (lh->mlh_reg_mode) {
+               case LCK_EX:
+                       lh->mlh_pdo_mode = LCK_EX;
+                       break;
+               case LCK_PR:
+                       lh->mlh_pdo_mode = LCK_CR;
+                       break;
+               case LCK_PW:
+                       lh->mlh_pdo_mode = LCK_CW;
+                       break;
+               default:
+                       CERROR("Not expected lock type (0x%x)\n",
+                              (int)lh->mlh_reg_mode);
+                       LBUG();
+               }
+       }
+
+       LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
+       EXIT;
+}
+
+/**
+ * Check whether \a o is directory stripe object.
+ *
+ * \param[in]  info    thread environment
+ * \param[in]  o       MDT object
+ *
+ * \retval 1   is directory stripe.
+ * \retval 0   isn't directory stripe.
+ * \retval < 1  error code
+ */
+static int mdt_is_dir_stripe(struct mdt_thread_info *info,
+                               struct mdt_object *o)
+{
+       struct md_attr *ma = &info->mti_attr;
+       struct lmv_mds_md_v1 *lmv;
+       int rc;
+
+       rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+       if (rc < 0)
+               return rc;
+
+       if (!(ma->ma_valid & MA_LMV))
+               return 0;
+
+       lmv = &ma->ma_lmv->lmv_md_v1;
+
+       if (!lmv_is_sane2(lmv))
+               return -EBADF;
+
+       if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE)
+               return 1;
+
+       return 0;
 }
 
 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
@@ -268,9 +354,9 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
 {
        struct mdt_device *mdt = info->mti_mdt;
        struct lu_name *lname = &info->mti_name;
+       const char *start = fileset;
        char *filename = info->mti_filename;
-       struct mdt_object *parent;
-       u32 mode;
+       struct mdt_object *obj;
        int rc = 0;
 
        LASSERT(!info->mti_cross_ref);
@@ -282,8 +368,8 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
         */
        *fid = mdt->mdt_md_root_fid;
 
-       while (rc == 0 && fileset != NULL && *fileset != '\0') {
-               const char *s1 = fileset;
+       while (rc == 0 && start != NULL && *start != '\0') {
+               const char *s1 = start;
                const char *s2;
 
                while (*++s1 == '/')
@@ -295,7 +381,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                if (s2 == s1)
                        break;
 
-               fileset = s2;
+               start = s2;
 
                lname->ln_namelen = s2 - s1;
                if (lname->ln_namelen > NAME_MAX) {
@@ -304,8 +390,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                }
 
                /* reject .. as a path component */
-               if (lname->ln_namelen == 2 &&
-                   strncmp(s1, "..", 2) == 0) {
+               if (lname->ln_namelen == 2 && strncmp(s1, "..", 2) == 0) {
                        rc = -EINVAL;
                        break;
                }
@@ -314,27 +399,18 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                filename[lname->ln_namelen] = '\0';
                lname->ln_name = filename;
 
-               parent = mdt_object_find(info->mti_env, mdt, fid);
-               if (IS_ERR(parent)) {
-                       rc = PTR_ERR(parent);
+               obj = mdt_object_find(info->mti_env, mdt, fid);
+               if (IS_ERR(obj)) {
+                       rc = PTR_ERR(obj);
                        break;
                }
                /* Only got the fid of this obj by name */
                fid_zero(fid);
-               rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
+               rc = mdo_lookup(info->mti_env, mdt_object_child(obj), lname,
                                fid, &info->mti_spec);
-               mdt_object_put(info->mti_env, parent);
-       }
-       if (!rc) {
-               parent = mdt_object_find(info->mti_env, mdt, fid);
-               if (IS_ERR(parent))
-                       rc = PTR_ERR(parent);
-               else {
-                       mode = lu_object_attr(&parent->mot_obj);
-                       mdt_object_put(info->mti_env, parent);
-                       if (!S_ISDIR(mode))
-                               rc = -ENOTDIR;
-               }
+               if (!rc && !S_ISDIR(lu_object_attr(&obj->mot_obj)))
+                       rc = -ENOTDIR;
+               mdt_object_put(info->mti_env, obj);
        }
 
        return rc;
@@ -356,7 +432,7 @@ static int mdt_get_root(struct tgt_session_info *tsi)
        if (rc)
                GOTO(out, rc = err_serious(rc));
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
+       if (CFS_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
                GOTO(out, rc = err_serious(-ENOMEM));
 
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
@@ -371,7 +447,8 @@ static int mdt_get_root(struct tgt_session_info *tsi)
                CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
                if (fileset) {
                        /* consider fileset from client as a sub-fileset
-                        * of the nodemap one */
+                        * of the nodemap one
+                        */
                        OBD_ALLOC(buffer, PATH_MAX + 1);
                        if (buffer == NULL)
                                GOTO(out, rc = err_serious(-ENOMEM));
@@ -393,6 +470,7 @@ static int mdt_get_root(struct tgt_session_info *tsi)
        } else {
                repbody->mbo_fid1 = mdt->mdt_md_root_fid;
        }
+       exp->exp_root_fid = repbody->mbo_fid1;
        repbody->mbo_valid |= OBD_MD_FLID;
 
        EXIT;
@@ -414,22 +492,25 @@ static int mdt_statfs(struct tgt_session_info *tsi)
        struct obd_statfs *osfs;
        struct mdt_body *reqbody = NULL;
        struct mdt_statfs_cache *msf;
+       ktime_t kstart = ktime_get();
+       int current_blockbits;
        int rc;
+       timeout_t at_est;
 
        ENTRY;
 
        svcpt = req->rq_rqbd->rqbd_svcpt;
 
        /* This will trigger a watchdog timeout */
-       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
-                        (MDT_SERVICE_WATCHDOG_FACTOR *
-                         at_get(&svcpt->scp_at_estimate)) + 1);
+       at_est = obd_at_get(mdt->mdt_lu_dev.ld_obd, &svcpt->scp_at_estimate);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
+                        (MDT_SERVICE_WATCHDOG_FACTOR * at_est) + 1);
 
        rc = mdt_check_ucred(info);
        if (rc)
                GOTO(out, rc = err_serious(rc));
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
+       if (CFS_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
                GOTO(out, rc = err_serious(-ENOMEM));
 
        osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
@@ -450,74 +531,135 @@ static int mdt_statfs(struct tgt_session_info *tsi)
                msf = &mdt->mdt_osfs;
 
        if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
-                       /** statfs data is too old, get up-to-date one */
-                       if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
-                               rc = next->md_ops->mdo_statfs(info->mti_env,
-                                                             next, osfs);
-                       else
-                               rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
-                                              osfs);
-                       if (rc)
-                               GOTO(out, rc);
-                       spin_lock(&mdt->mdt_lock);
-                       msf->msf_osfs = *osfs;
-                       msf->msf_age = ktime_get_seconds();
-                       spin_unlock(&mdt->mdt_lock);
+               /** statfs data is too old, get up-to-date one */
+               if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
+                       rc = next->md_ops->mdo_statfs(info->mti_env, next,
+                                                     osfs);
+               else
+                       rc = dt_statfs(info->mti_env, mdt->mdt_bottom, osfs);
+               if (rc)
+                       GOTO(out, rc);
+               spin_lock(&mdt->mdt_lock);
+               msf->msf_osfs = *osfs;
+               msf->msf_age = ktime_get_seconds();
+               spin_unlock(&mdt->mdt_lock);
        } else {
-                       /** use cached statfs data */
-                       spin_lock(&mdt->mdt_lock);
-                       *osfs = msf->msf_osfs;
-                       spin_unlock(&mdt->mdt_lock);
-       }
+               /** use cached statfs data */
+               spin_lock(&mdt->mdt_lock);
+               *osfs = msf->msf_osfs;
+               spin_unlock(&mdt->mdt_lock);
+       }
+
+       /* tgd_blockbit is recordsize bits set during mkfs.
+        * This once set does not change. However, 'zfs set'
+        * can be used to change the MDT blocksize. Instead
+        * of using cached value of 'tgd_blockbit' always
+        * calculate the blocksize bits which may have
+        * changed.
+        */
+       current_blockbits = fls64(osfs->os_bsize) - 1;
 
-       /* at least try to account for cached pages.  its still racy and
-        * might be under-reporting if clients haven't announced their
-        * caches with brw recently */
-       CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
-              " pending %llu free %llu avail %llu\n",
+       /* Account for cached pages. its still racy and might be under-reporting
+        * if clients haven't announced their caches with brw recently
+        */
+       CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu pending %llu free %llu avail %llu\n",
               tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
               tgd->tgd_tot_pending,
-              osfs->os_bfree << tgd->tgd_blockbits,
-              osfs->os_bavail << tgd->tgd_blockbits);
+              osfs->os_bfree << current_blockbits,
+              osfs->os_bavail << current_blockbits);
 
        osfs->os_bavail -= min_t(u64, osfs->os_bavail,
                                 ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
-                                  osfs->os_bsize - 1) >> tgd->tgd_blockbits));
+                                  osfs->os_bsize - 1) >> current_blockbits));
 
        tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
+       if (mdt->mdt_lut.lut_no_create)
+               osfs->os_state |= OS_STATFS_NOCREATE;
        CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
               "%llu objects: %llu free; state %x\n",
               osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
               osfs->os_files, osfs->os_ffree, osfs->os_state);
 
        if (!exp_grant_param_supp(tsi->tsi_exp) &&
-           tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT) {
+           current_blockbits > COMPAT_BSIZE_SHIFT) {
                /* clients which don't support OBD_CONNECT_GRANT_PARAM
                 * should not see a block size > page size, otherwise
                 * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
                 * block size which is the biggest block size known to work
-                * with all client's page size. */
-               osfs->os_blocks <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
-               osfs->os_bfree  <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
-               osfs->os_bavail <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
+                * with all client's page size.
+                */
+               osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_bfree  <<= current_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT;
                osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
        }
        if (rc == 0)
-               mdt_counter_incr(req, LPROC_MDT_STATFS);
+               mdt_counter_incr(req, LPROC_MDT_STATFS,
+                                ktime_us_delta(ktime_get(), kstart));
 out:
        mdt_thread_info_fini(info);
        RETURN(rc);
 }
 
-/**
- * Pack size attributes into the reply.
- */
+__u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
+{
+       struct lov_comp_md_v1 *comp_v1;
+       struct lov_mds_md *v1;
+       __u32 off;
+       __u32 dom_stripesize = 0;
+       int i;
+       bool has_ost_stripes = false;
+
+       ENTRY;
+
+       if (is_dom_only)
+               *is_dom_only = 0;
+
+       if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
+               RETURN(0);
+
+       comp_v1 = (struct lov_comp_md_v1 *)lmm;
+       off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
+       v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+
+       /* Fast check for DoM entry with no mirroring, should be the first */
+       if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
+           !(lov_pattern(le32_to_cpu(v1->lmm_pattern)) & LOV_PATTERN_MDT))
+               RETURN(0);
+
+       /* check all entries otherwise */
+       for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
+               struct lov_comp_md_entry_v1 *lcme;
+
+               lcme = &comp_v1->lcm_entries[i];
+               if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
+                       continue;
+
+               off = le32_to_cpu(lcme->lcme_offset);
+               v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+
+               if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) &
+                   LOV_PATTERN_MDT)
+                       dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
+               else
+                       has_ost_stripes = true;
+
+               if (dom_stripesize && has_ost_stripes)
+                       RETURN(dom_stripesize);
+       }
+       /* DoM-only case exits here */
+       if (is_dom_only && dom_stripesize)
+               *is_dom_only = 1;
+       RETURN(dom_stripesize);
+}
+
+/* Pack size attributes into the reply. */
 int mdt_pack_size2body(struct mdt_thread_info *info,
                        const struct lu_fid *fid, struct lustre_handle *lh)
 {
        struct mdt_body *b;
        struct md_attr *ma = &info->mti_attr;
-       int dom_stripe;
+       __u32 dom_stripe;
        bool dom_lock = false;
 
        ENTRY;
@@ -528,9 +670,9 @@ int mdt_pack_size2body(struct mdt_thread_info *info,
            !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
                RETURN(-ENODATA);
 
-       dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm);
+       dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
        /* no DoM stripe, no size in reply */
-       if (dom_stripe == LMM_NO_DOM)
+       if (!dom_stripe)
                RETURN(-ENOENT);
 
        if (lustre_handle_is_used(lh)) {
@@ -548,7 +690,8 @@ int mdt_pack_size2body(struct mdt_thread_info *info,
                RETURN(0);
 
        /* Either DoM lock exists or LMM has only DoM stripe then
-        * return size on body. */
+        * return size on body.
+        */
        b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
        mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
@@ -583,6 +726,7 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
        if (buf->lb_len == 0)
                RETURN(0);
 
+       LASSERT(!info->mti_big_acl_used);
 again:
        rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
        if (rc < 0) {
@@ -592,14 +736,14 @@ again:
                        rc = 0;
                } else if (rc == -EOPNOTSUPP) {
                        rc = 0;
-               } else {
-                       if (rc == -ERANGE &&
-                           exp_connect_large_acl(info->mti_exp) &&
-                           buf->lb_buf != info->mti_big_acl) {
+               } else if (rc == -ERANGE) {
+                       if (exp_connect_large_acl(info->mti_exp) &&
+                           !info->mti_big_acl_used) {
                                if (info->mti_big_acl == NULL) {
                                        info->mti_big_aclsize =
-                                                       MIN(mdt->mdt_max_ea_size,
-                                                           XATTR_SIZE_MAX);
+                                                       min_t(unsigned int,
+                                                             mdt->mdt_max_ea_size,
+                                                             XATTR_SIZE_MAX);
                                        OBD_ALLOC_LARGE(info->mti_big_acl,
                                                        info->mti_big_aclsize);
                                        if (info->mti_big_acl == NULL) {
@@ -620,47 +764,19 @@ again:
 
                                buf->lb_buf = info->mti_big_acl;
                                buf->lb_len = info->mti_big_aclsize;
-
+                               info->mti_big_acl_used = 1;
                                goto again;
                        }
-
+                       /* FS has ACL bigger that our limits */
+                       CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n",
+                              mdt_obd_name(mdt), PFID(mdt_object_fid(o)),
+                              info->mti_big_aclsize);
+                       rc = -E2BIG;
+               } else {
                        CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
                               mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
                }
        } else {
-               int client;
-               int server;
-               int acl_buflen;
-               int lmm_buflen = 0;
-               int lmmsize = 0;
-
-               acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
-               if (acl_buflen >= rc)
-                       goto map;
-
-               /* If LOV/LMA EA is small, we can reuse part of their buffer */
-               client = ptlrpc_req_get_repsize(pill->rc_req);
-               server = lustre_packed_msg_size(pill->rc_req->rq_repmsg);
-               if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
-                       lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD,
-                                                         RCL_SERVER);
-                       lmmsize = repbody->mbo_eadatasize;
-               }
-
-               if (client < server - acl_buflen - lmm_buflen + rc + lmmsize) {
-                       CDEBUG(D_INODE, "%s: client prepared buffer size %d "
-                              "is not big enough with the ACL size %d (%d)\n",
-                              mdt_obd_name(mdt), client, rc,
-                              server - acl_buflen - lmm_buflen + rc + lmmsize);
-                       repbody->mbo_aclsize = 0;
-                       repbody->mbo_valid &= ~OBD_MD_FLACL;
-                       RETURN(-ERANGE);
-               }
-
-map:
-               if (buf->lb_buf == info->mti_big_acl)
-                       info->mti_big_acl_used = 1;
-
                rc = nodemap_map_acl(nodemap, buf->lb_buf,
                                     rc, NODEMAP_FS_TO_CLIENT);
                /* if all ACLs mapped out, rc is still >= 0 */
@@ -706,10 +822,11 @@ static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
 }
 
 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
-                        const struct lu_attr *attr, const struct lu_fid *fid)
+                       const struct lu_attr *attr, const struct lu_fid *fid)
 {
-       struct md_attr *ma = &info->mti_attr;
+       struct mdt_device *mdt = info->mti_mdt;
        struct obd_export *exp = info->mti_exp;
+       struct md_attr *ma = &info->mti_attr;
        struct lu_nodemap *nodemap = NULL;
 
        LASSERT(ma->ma_valid & MA_INODE);
@@ -726,6 +843,10 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                b->mbo_ctime = attr->la_ctime;
                b->mbo_valid |= OBD_MD_FLCTIME;
        }
+       if (attr->la_valid & LA_BTIME) {
+               b->mbo_btime = attr->la_btime;
+               b->mbo_valid |= OBD_MD_FLBTIME;
+       }
        if (attr->la_valid & LA_FLAGS) {
                b->mbo_flags = attr->la_flags;
                b->mbo_valid |= OBD_MD_FLFLAGS;
@@ -734,7 +855,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                b->mbo_nlink = attr->la_nlink;
                b->mbo_valid |= OBD_MD_FLNLINK;
        }
-       if (attr->la_valid & (LA_UID|LA_GID)) {
+       if (attr->la_valid & (LA_UID|LA_GID|LA_PROJID)) {
                nodemap = nodemap_get_from_exp(exp);
                if (IS_ERR(nodemap))
                        goto out;
@@ -753,8 +874,9 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
        }
 
        if (attr->la_valid & LA_PROJID) {
-               /* TODO, nodemap for project id */
-               b->mbo_projid = attr->la_projid;
+               b->mbo_projid = nodemap_map_id(nodemap, NODEMAP_PROJID,
+                                              NODEMAP_FS_TO_CLIENT,
+                                              attr->la_projid);
                b->mbo_valid |= OBD_MD_FLPROJID;
        }
 
@@ -786,7 +908,8 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                /* just ignore blocks occupied by extend attributes on MDS */
                b->mbo_blocks = 0;
                /* if no object is allocated on osts, the size on mds is valid.
-                * b=22272 */
+                * b=22272
+                */
                b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
                if (mdt_hsm_is_released(ma->ma_lmm)) {
@@ -799,7 +922,8 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                        else
                                b->mbo_blocks = 1;
                        b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
-               } else if (info->mti_som_valid) { /* som is valid */
+               } else if (info->mti_som_strict && mdt->mdt_enable_strict_som) {
+                       /* use SOM for size*/
                        b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
                } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
                        b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
@@ -827,27 +951,29 @@ static inline int mdt_body_has_lov(const struct lu_attr *la,
 
 void mdt_client_compatibility(struct mdt_thread_info *info)
 {
-        struct mdt_body       *body;
-        struct ptlrpc_request *req = mdt_info_req(info);
-        struct obd_export     *exp = req->rq_export;
-        struct md_attr        *ma = &info->mti_attr;
-        struct lu_attr        *la = &ma->ma_attr;
-        ENTRY;
+       struct mdt_body       *body;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct obd_export     *exp = req->rq_export;
+       struct md_attr        *ma = &info->mti_attr;
+       struct lu_attr        *la = &ma->ma_attr;
+
+       ENTRY;
 
        if (exp_connect_layout(exp))
                /* the client can deal with 16-bit lmm_stripe_count */
                RETURN_EXIT;
 
-        body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
-        if (!mdt_body_has_lov(la, body))
-                RETURN_EXIT;
+       if (!mdt_body_has_lov(la, body))
+               RETURN_EXIT;
 
-        /* now we have a reply with a lov for a client not compatible with the
-         * layout lock so we have to clean the layout generation number */
-        if (S_ISREG(la->la_mode))
-                ma->ma_lmm->lmm_layout_gen = 0;
-        EXIT;
+       /* now we have a reply with a lov for a client not compatible with the
+        * layout lock so we have to clean the layout generation number
+        */
+       if (S_ISREG(la->la_mode))
+               ma->ma_lmm->lmm_layout_gen = 0;
+       EXIT;
 }
 
 static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
@@ -888,6 +1014,7 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
 {
        const struct lu_env *env = info->mti_env;
        int rc;
+
        ENTRY;
 
        LASSERT(info->mti_big_lmm_used == 0);
@@ -922,8 +1049,8 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(rc);
 }
 
-int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
-                  struct md_attr *ma, const char *name)
+int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                    struct md_attr *ma, const char *name)
 {
        struct md_object *next = mdt_object_child(o);
        struct lu_buf    *buf = &info->mti_buf;
@@ -947,6 +1074,13 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
 
        LASSERT(buf->lb_buf);
 
+       if (!mdt_object_exists(o))
+               return -ENOENT;
+
+       if (mdt_object_remote(o) && S_ISDIR(lu_object_attr(&o->mot_obj)))
+               /* force reload layout for remote dir in case layout changed */
+               mo_invalidate(info->mti_env, mdt_object_child(o));
+
        rc = mo_xattr_get(info->mti_env, next, buf, name);
        if (rc > 0) {
 
@@ -961,10 +1095,9 @@ got:
                            !(exp_connect_flags(info->mti_exp) &
                              OBD_CONNECT_LFSCK)) {
                                return -EIO;
-                       } else {
-                               ma->ma_lmm_size = rc;
-                               ma->ma_valid |= MA_LOV;
                        }
+                       ma->ma_lmm_size = rc;
+                       ma->ma_valid |= MA_LOV;
                } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
                        if (info->mti_big_lmm_used)
                                ma->ma_lmv = info->mti_big_lmm;
@@ -986,7 +1119,8 @@ got:
                rc = 0;
        } else if (rc == -ERANGE) {
                /* Default LMV has fixed size, so it must be able to fit
-                * in the original buffer */
+                * in the original buffer
+                */
                if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
                        return rc;
                rc = mdt_big_xattr_get(info, o, name);
@@ -999,6 +1133,40 @@ got:
        return rc;
 }
 
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                  struct md_attr *ma, const char *name)
+{
+       int rc;
+
+       if (!info->mti_big_lmm) {
+               OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
+               if (!info->mti_big_lmm)
+                       return -ENOMEM;
+               info->mti_big_lmmsize = PAGE_SIZE;
+       }
+
+       if (strcmp(name, XATTR_NAME_LOV) == 0) {
+               ma->ma_lmm = info->mti_big_lmm;
+               ma->ma_lmm_size = info->mti_big_lmmsize;
+               ma->ma_valid &= ~MA_LOV;
+       } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
+               ma->ma_lmv = info->mti_big_lmm;
+               ma->ma_lmv_size = info->mti_big_lmmsize;
+               ma->ma_valid &= ~MA_LMV;
+       } else {
+               LBUG();
+       }
+
+       LASSERT(!info->mti_big_lmm_used);
+       rc = __mdt_stripe_get(info, o, ma, name);
+       /* since big_lmm is always used here, clear 'used' flag to avoid
+        * assertion in mdt_big_xattr_get().
+        */
+       info->mti_big_lmm_used = 0;
+
+       return rc;
+}
+
 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
                      struct lu_fid *pfid)
 {
@@ -1006,6 +1174,7 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
        struct link_ea_header   *leh;
        struct link_ea_entry    *lee;
        int                      rc;
+
        ENTRY;
 
        buf->lb_buf = info->mti_big_lmm;
@@ -1013,7 +1182,8 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
        rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
                          buf, XATTR_NAME_LINK);
        /* ignore errors, MA_PFID won't be set and it is
-        * up to the caller to treat this as an error */
+        * up to the caller to treat this as an error
+        */
        if (rc == -ERANGE || buf->lb_len == 0) {
                rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
                buf->lb_buf = info->mti_big_lmm;
@@ -1046,6 +1216,51 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(0);
 }
 
+int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
+                          struct lu_fid *pfid, struct lu_name *lname)
+{
+       struct lu_buf *buf = &info->mti_buf;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       int reclen;
+       int rc;
+
+       buf->lb_buf = info->mti_xattr_buf;
+       buf->lb_len = sizeof(info->mti_xattr_buf);
+       rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
+                         XATTR_NAME_LINK);
+       if (rc == -ERANGE) {
+               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+               buf->lb_buf = info->mti_big_lmm;
+               buf->lb_len = info->mti_big_lmmsize;
+       }
+       if (rc < 0)
+               return rc;
+
+       if (rc < sizeof(*leh)) {
+               CERROR("short LinkEA on "DFID": rc = %d\n",
+                      PFID(mdt_object_fid(o)), rc);
+               return -ENODATA;
+       }
+
+       leh = (struct link_ea_header *)buf->lb_buf;
+       lee = (struct link_ea_entry *)(leh + 1);
+       if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+               leh->leh_magic = LINK_EA_MAGIC;
+               leh->leh_reccount = __swab32(leh->leh_reccount);
+               leh->leh_len = __swab64(leh->leh_len);
+       }
+       if (leh->leh_magic != LINK_EA_MAGIC)
+               return -EINVAL;
+
+       if (leh->leh_reccount == 0)
+               return -ENODATA;
+
+       linkea_entry_unpack(lee, &reclen, lname, pfid);
+
+       return 0;
+}
+
 int mdt_attr_get_complex(struct mdt_thread_info *info,
                         struct mdt_object *o, struct md_attr *ma)
 {
@@ -1055,6 +1270,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
        int                  need = ma->ma_need;
        int                  rc = 0, rc2;
        u32                  mode;
+
        ENTRY;
 
        ma->ma_valid = 0;
@@ -1065,6 +1281,10 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
 
        if (need & MA_INODE) {
                ma->ma_need = MA_INODE;
+               if (need & MA_DIRENT_CNT)
+                       ma->ma_attr.la_valid |= LA_DIRENT_CNT;
+               else
+                       ma->ma_attr.la_valid &= ~LA_DIRENT_CNT;
                rc = mo_attr_get(env, next, ma);
                if (rc)
                        GOTO(out, rc);
@@ -1083,26 +1303,24 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
        }
 
        if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
                if (rc)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV && S_ISDIR(mode)) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
                if (rc != 0)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV_DEF && S_ISDIR(mode)) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
                if (rc != 0)
                        GOTO(out, rc);
        }
 
-       /*
-        * In the handle of MA_INODE, we may already get the SOM attr.
-        */
+       /* In the handle of MA_INODE, we may already get the SOM attr.  */
        if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
                rc = mdt_get_som(info, o, ma);
                if (rc != 0)
@@ -1144,23 +1362,40 @@ out:
        RETURN(rc);
 }
 
+static void mdt_preset_encctx_size(struct mdt_thread_info *info)
+{
+       struct req_capsule *pill = info->mti_pill;
+
+       ENTRY;
+       if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
+                                 RCL_SERVER))
+               /* pre-set size in server part with max size */
+               req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
+                                    RCL_SERVER,
+                                    info->mti_mdt->mdt_max_mdsize);
+       EXIT;
+}
+
 static int mdt_getattr_internal(struct mdt_thread_info *info,
-                                struct mdt_object *o, int ma_need)
+                               struct mdt_object *o, int ma_need)
 {
-       struct md_object        *next = mdt_object_child(o);
-       const struct mdt_body   *reqbody = info->mti_body;
-       struct ptlrpc_request   *req = mdt_info_req(info);
-       struct md_attr          *ma = &info->mti_attr;
-       struct lu_attr          *la = &ma->ma_attr;
-       struct req_capsule      *pill = info->mti_pill;
-       const struct lu_env     *env = info->mti_env;
-       struct mdt_body         *repbody;
-       struct lu_buf           *buffer = &info->mti_buf;
-       struct obd_export       *exp = info->mti_exp;
-       int                      rc;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct md_object *next = mdt_object_child(o);
+       const struct mdt_body *reqbody = info->mti_body;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct md_attr *ma = &info->mti_attr;
+       struct lu_attr *la = &ma->ma_attr;
+       struct req_capsule *pill = info->mti_pill;
+       const struct lu_env *env = info->mti_env;
+       struct mdt_body *repbody;
+       struct lu_buf *buffer = &info->mti_buf;
+       struct obd_export *exp = info->mti_exp;
+       ktime_t kstart = ktime_get();
+       int rc;
+
        ENTRY;
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
+       if (CFS_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
                RETURN(err_serious(-ENOMEM));
 
        repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
@@ -1168,8 +1403,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
        ma->ma_valid = 0;
 
        if (mdt_object_remote(o)) {
-               /* This object is located on remote node.*/
-               /* Return -ENOTSUPP for old client */
+               /* obj is located on remote node Return -ENOTSUPP(old client) */
                if (!mdt_is_dne_client(req->rq_export))
                        GOTO(out, rc = -ENOTSUPP);
 
@@ -1244,23 +1478,30 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                }
        }
 
-        if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
+       if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
            reqbody->mbo_valid & OBD_MD_FLDIREA  &&
-            lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
-                /* get default stripe info for this dir. */
-                ma->ma_need |= MA_LOV_DEF;
-        }
-        ma->ma_need |= ma_need;
+           lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
+               /* get default stripe info for this dir. */
+               ma->ma_need |= MA_LOV_DEF;
+       }
+       ma->ma_need |= ma_need;
 
        rc = mdt_attr_get_complex(info, o, ma);
        if (unlikely(rc)) {
-               CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR,
-                      "%s: getattr error for "DFID": rc = %d\n",
-                      mdt_obd_name(info->mti_mdt),
-                      PFID(mdt_object_fid(o)), rc);
+               CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
+                            "%s: getattr error for "DFID": rc = %d\n",
+                            mdt_obd_name(info->mti_mdt),
+                            PFID(mdt_object_fid(o)), rc);
                RETURN(rc);
        }
 
+       /* return immutable attr on fscrypt metadata files
+        * if fscrypt admin is not permitted
+        */
+       if (o->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD &&
+           !mdt_ucred(info)->uc_rbac_fscrypt_admin)
+               la->la_flags |= LUSTRE_IMMUTABLE_FL;
+
        /* if file is released, check if a restore is running */
        if (ma->ma_valid & MA_HSM) {
                repbody->mbo_valid |= OBD_MD_TSTATE;
@@ -1269,22 +1510,28 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_t_state = MS_RESTORE;
        }
 
-        if (likely(ma->ma_valid & MA_INODE))
-                mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
-        else
-                RETURN(-EFAULT);
+       if (unlikely(!(ma->ma_valid & MA_INODE)))
+               RETURN(-EFAULT);
+
+       mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
 
-        if (mdt_body_has_lov(la, reqbody)) {
-                if (ma->ma_valid & MA_LOV) {
-                        LASSERT(ma->ma_lmm_size);
+       if (mdt_body_has_lov(la, reqbody)) {
+               u32 stripe_count = 1;
+               bool fixed_layout = false;
+
+               if (ma->ma_valid & MA_LOV) {
+                       LASSERT(ma->ma_lmm_size);
                        repbody->mbo_eadatasize = ma->ma_lmm_size;
                        if (S_ISDIR(la->la_mode))
                                repbody->mbo_valid |= OBD_MD_FLDIREA;
                        else
                                repbody->mbo_valid |= OBD_MD_FLEASIZE;
                        mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
-                }
+               }
                if (ma->ma_valid & MA_LMV) {
+                       struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+                       u32 magic = le32_to_cpu(lmv->lmv_magic);
+
                        /* Return -ENOTSUPP for old client */
                        if (!mdt_is_striped_client(req->rq_export))
                                RETURN(-ENOTSUPP);
@@ -1293,6 +1540,14 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        mdt_dump_lmv(D_INFO, ma->ma_lmv);
                        repbody->mbo_eadatasize = ma->ma_lmv_size;
                        repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
+
+                       stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+                       fixed_layout = lmv_is_fixed(lmv);
+                       if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
+                               mdt_restripe_migrate_add(info, o);
+                       else if (magic == LMV_MAGIC_V1 &&
+                                lmv_is_restriping(lmv))
+                               mdt_restripe_update_add(info, o);
                }
                if (ma->ma_valid & MA_LMV_DEF) {
                        /* Return -ENOTSUPP for old client */
@@ -1309,11 +1564,25 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_valid |= (OBD_MD_FLDIREA |
                                               OBD_MD_DEFAULT_MEA);
                }
+               CDEBUG(D_VFSTRACE,
+                      "dirent count %llu stripe count %u MDT count %d\n",
+                      ma->ma_attr.la_dirent_count, stripe_count,
+                      atomic_read(&mdt->mdt_mds_mds_conns) + 1);
+               if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
+                   ma->ma_attr.la_dirent_count >
+                       mdt->mdt_restriper.mdr_dir_split_count &&
+                   !fid_is_root(mdt_object_fid(o)) &&
+                   mdt->mdt_enable_dir_auto_split &&
+                   !o->mot_restriping &&
+                   stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 &&
+                   !fixed_layout)
+                       mdt_auto_split_add(info, o);
        } else if (S_ISLNK(la->la_mode) &&
                   reqbody->mbo_valid & OBD_MD_LINKNAME) {
                buffer->lb_buf = ma->ma_lmm;
                /* eadatasize from client includes NULL-terminator, so
-                * there is no need to read it */
+                * there is no need to read it
+                */
                buffer->lb_len = reqbody->mbo_eadatasize - 1;
                rc = mo_readlink(env, next, buffer);
                if (unlikely(rc <= 0)) {
@@ -1324,15 +1593,16 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                } else {
                        int print_limit = min_t(int, PAGE_SIZE - 128, rc);
 
-                       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
+                       if (CFS_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
                                rc -= 2;
                        repbody->mbo_valid |= OBD_MD_LINKNAME;
                        /* we need to report back size with NULL-terminator
-                        * because client expects that */
+                        * because client expects that
+                        */
                        repbody->mbo_eadatasize = rc + 1;
                        if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize)
-                               CDEBUG(D_INODE, "%s: Read shorter symlink %d "
-                                      "on "DFID ", expected %d\n",
+                               CDEBUG(D_INODE, "%s: Read shorter symlink %d on "
+                                      DFID ", expected %d\n",
                                       mdt_obd_name(info->mti_mdt),
                                       rc, PFID(mdt_object_fid(o)),
                                       reqbody->mbo_eadatasize - 1);
@@ -1341,13 +1611,14 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
                        /* If the total CDEBUG() size is larger than a page, it
                         * will print a warning to the console, avoid this by
-                        * printing just the last part of the symlink. */
+                        * printing just the last part of the symlink.
+                        */
                        CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
                               print_limit < rc ? "..." : "", print_limit,
                               (char *)ma->ma_lmm + rc - print_limit, rc);
                        rc = 0;
-                }
-        }
+               }
+       }
 
        if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
                repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
@@ -1360,6 +1631,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
        if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
                 (reqbody->mbo_valid & OBD_MD_FLACL)) {
                struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
+
                if (IS_ERR(nodemap))
                        RETURN(PTR_ERR(nodemap));
 
@@ -1369,21 +1641,23 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 #endif
 
 out:
-        if (rc == 0)
-               mdt_counter_incr(req, LPROC_MDT_GETATTR);
+       if (rc == 0)
+               mdt_counter_incr(req, LPROC_MDT_GETATTR,
+                                ktime_us_delta(ktime_get(), kstart));
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdt_getattr(struct tgt_session_info *tsi)
 {
        struct mdt_thread_info  *info = tsi2mdt_info(tsi);
-        struct mdt_object       *obj = info->mti_object;
-        struct req_capsule      *pill = info->mti_pill;
-        struct mdt_body         *reqbody;
-        struct mdt_body         *repbody;
-        int rc, rc2;
-        ENTRY;
+       struct mdt_object       *obj = info->mti_object;
+       struct req_capsule      *pill = info->mti_pill;
+       struct mdt_body         *reqbody;
+       struct mdt_body         *repbody;
+       int rc, rc2;
+
+       ENTRY;
 
        if (unlikely(info->mti_object == NULL))
                RETURN(-EPROTO);
@@ -1401,11 +1675,12 @@ static int mdt_getattr(struct tgt_session_info *tsi)
        /* Unlike intent case where we need to pre-fill out buffers early on
         * in intent policy for ldlm reasons, here we can have a much better
         * guess at EA size by just reading it from disk.
-        * Exceptions are readdir and (missing) directory striping */
-       /* Readlink */
-       if (reqbody->mbo_valid & OBD_MD_LINKNAME) {
+        * Exceptions are readdir and (missing) directory striping
+        */
+       if (reqbody->mbo_valid & OBD_MD_LINKNAME) { /* Readlink */
                /* No easy way to know how long is the symlink, but it cannot
-                * be more than PATH_MAX, so we allocate +1 */
+                * be more than PATH_MAX, so we allocate +1
+                */
                rc = PATH_MAX + 1;
        /* A special case for fs ROOT: getattr there might fetch
         * default EA for entire fs, not just for this dir!
@@ -1416,7 +1691,8 @@ static int mdt_getattr(struct tgt_session_info *tsi)
                   (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) ==
                                                                 MDS_GETATTR)) {
                /* Should the default strping be bigger, mdt_fix_reply
-                * will reallocate */
+                * will reallocate
+                */
                rc = DEF_REP_MD_SIZE;
        } else {
                /* Read the actual EA size from disk */
@@ -1430,16 +1706,18 @@ static int mdt_getattr(struct tgt_session_info *tsi)
 
        /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
         * by default. If the target object has more ACL entries, then
-        * enlarge the buffer when necessary. */
+        * enlarge the buffer when necessary.
+        */
        req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
                             LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+       mdt_preset_encctx_size(info);
 
        rc = req_capsule_server_pack(pill);
        if (unlikely(rc != 0))
                GOTO(out, rc = err_serious(rc));
 
-        repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
-        LASSERT(repbody != NULL);
+       repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
+       LASSERT(repbody != NULL);
        repbody->mbo_eadatasize = 0;
        repbody->mbo_aclsize = 0;
 
@@ -1449,8 +1727,18 @@ static int mdt_getattr(struct tgt_session_info *tsi)
 
        info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
 
+       rc = mdt_init_ucred(info, reqbody);
+       if (rc)
+               GOTO(out_shrink, rc);
+
        rc = mdt_getattr_internal(info, obj, 0);
-       EXIT;
+       if (unlikely(rc))
+               GOTO(out_ucred, rc);
+
+       rc = mdt_pack_encctx_in_reply(info, obj);
+       EXIT;
+out_ucred:
+       mdt_exit_ucred(info);
 out_shrink:
        mdt_client_compatibility(info);
        rc2 = mdt_fix_reply(info);
@@ -1497,10 +1785,13 @@ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
 
        if (rc > 0) {
                /* not resent */
-               mdt_lock_handle_init(lhc);
-               mdt_lock_reg_init(lhc, LCK_EX);
-               rc = mdt_reint_object_lock(info, obj, lhc, MDS_INODELOCK_LAYOUT,
-                                          false);
+               __u64 lockpart = MDS_INODELOCK_LAYOUT;
+
+               /* take layout lock to prepare layout change */
+               if (layout->mlc_opc == MD_LAYOUT_WRITE)
+                       lockpart |= MDS_INODELOCK_UPDATE;
+
+               rc = mdt_object_lock(info, obj, lhc, lockpart, LCK_EX);
                if (rc)
                        RETURN(rc);
        }
@@ -1545,7 +1836,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi)
        struct mdt_object       *o1, *o2, *o;
        struct mdt_lock_handle  *lh1, *lh2;
        struct mdc_swap_layouts *msl;
-       int                      rc;
+       int rc;
+
        ENTRY;
 
        /* client does not support layout lock, so layout swaping
@@ -1554,7 +1846,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi)
         * layout lock yet. If those clients have already opened the file
         * they won't be notified at all so that old layout may still be
         * used to do IO. This can be fixed after file release is landed by
-        * doing exclusive open and taking full EX ibits lock. - Jinshan */
+        * doing exclusive open and taking full EX ibits lock. - Jinshan
+        */
        if (!exp_connect_layout(exp))
                RETURN(-EOPNOTSUPP);
 
@@ -1582,7 +1875,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi)
                swap(o1, o2);
 
        /* permission check. Make sure the calling process having permission
-        * to write both files. */
+        * to write both files.
+        */
        rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
                           MAY_WRITE);
        if (rc < 0)
@@ -1598,22 +1892,19 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi)
                GOTO(put, rc = -EPROTO);
 
        lh1 = &info->mti_lh[MDT_LH_NEW];
-       mdt_lock_reg_init(lh1, LCK_EX);
        lh2 = &info->mti_lh[MDT_LH_OLD];
-       mdt_lock_reg_init(lh2, LCK_EX);
-
        rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
+                            MDS_INODELOCK_XATTR, LCK_EX);
        if (rc < 0)
                GOTO(put, rc);
 
        rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
+                            MDS_INODELOCK_XATTR, LCK_EX);
        if (rc < 0)
                GOTO(unlock1, rc);
 
        rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
-                            mdt_object_child(o2), msl->msl_flags);
+                            mdt_object_child(o2), 0, 0, msl->msl_flags);
        if (rc < 0)
                GOTO(unlock2, rc);
 
@@ -1632,29 +1923,143 @@ out:
 
 static int mdt_raw_lookup(struct mdt_thread_info *info,
                          struct mdt_object *parent,
-                         const struct lu_name *lname,
-                         struct ldlm_reply *ldlm_rep)
+                         const struct lu_name *lname)
 {
-       struct lu_fid   *child_fid = &info->mti_tmp_fid1;
-       int              rc;
+       struct lu_fid *fid = &info->mti_tmp_fid1;
+       struct mdt_body *repbody;
+       bool is_dotdot = false;
+       bool is_old_parent_stripe = false;
+       bool is_new_parent_checked = false;
+       int rc;
+
        ENTRY;
 
        LASSERT(!info->mti_cross_ref);
+       /* Always allow to lookup ".." */
+       if (lname->ln_namelen == 2 &&
+           lname->ln_name[0] == '.' && lname->ln_name[1] == '.') {
+               info->mti_spec.sp_permitted = 1;
+               is_dotdot = true;
+               if (mdt_is_dir_stripe(info, parent) == 1)
+                       is_old_parent_stripe = true;
+       }
 
+       mdt_object_get(info->mti_env, parent);
+lookup:
        /* Only got the fid of this obj by name */
-       fid_zero(child_fid);
-       rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object),
-                       lname, child_fid, &info->mti_spec);
-       if (rc == 0) {
-               struct mdt_body *repbody;
+       fid_zero(fid);
+       rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid,
+                       &info->mti_spec);
+       mdt_object_put(info->mti_env, parent);
+       if (rc)
+               RETURN(rc);
+
+       /* getattr_name("..") should return master object FID for striped dir */
+       if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) {
+               parent = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+               if (IS_ERR(parent))
+                       RETURN(PTR_ERR(parent));
+
+               /* old client getattr_name("..") with stripe FID */
+               if (unlikely(is_old_parent_stripe)) {
+                       is_old_parent_stripe = false;
+                       goto lookup;
+               }
+
+               /* ".." may be a stripe */
+               if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) {
+                       is_new_parent_checked = true;
+                       goto lookup;
+               }
+
+               mdt_object_put(info->mti_env, parent);
+       }
+
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       repbody->mbo_fid1 = *fid;
+       repbody->mbo_valid = OBD_MD_FLID;
+
+       RETURN(rc);
+}
+
+/**
+ * Find name matching hash
+ *
+ * We search \a child LinkEA for a name whose hash matches \a lname
+ * (it contains an encoded hash).
+ *
+ * \param info mdt thread info
+ * \param lname encoded hash to find
+ * \param parent parent object
+ * \param child object to search with LinkEA
+ *
+ * \retval 1 match found
+ * \retval 0 no match found
+ * \retval -ev negative errno upon error
+ */
+int find_name_matching_hash(struct mdt_thread_info *info, struct lu_name *lname,
+                           struct mdt_object *parent, struct mdt_object *child)
+{
+       /* Here, lname is an encoded hash of on-disk name, and
+        * client is doing access without encryption key.
+        * So we need to get LinkEA, check parent fid is correct and
+        * compare name hash with the one in the request.
+        */
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct lu_name name;
+       struct lu_fid pfid;
+       struct linkea_data ldata = { NULL };
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       struct lu_buf link = { 0 };
+       char *hash;
+       int reclen, count, rc;
+
+       ENTRY;
+       if (lname->ln_namelen < LL_CRYPTO_BLOCK_SIZE)
+               RETURN(-EINVAL);
+
+       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       if (!buf->lb_buf)
+               RETURN(-ENOMEM);
+
+       ldata.ld_buf = buf;
+       rc = mdt_links_read(info, child, &ldata);
+       if (rc < 0)
+               RETURN(rc);
+
+       hash = kmalloc(lname->ln_namelen, GFP_NOFS);
+       if (!hash)
+               RETURN(-ENOMEM);
+       rc = critical_decode(lname->ln_name, lname->ln_namelen, hash);
 
-               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-               repbody->mbo_fid1 = *child_fid;
-               repbody->mbo_valid = OBD_MD_FLID;
-               mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
-       } else if (rc == -ENOENT) {
-               mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+       leh = buf->lb_buf;
+       lee = (struct link_ea_entry *)(leh + 1);
+       for (count = 0; count < leh->leh_reccount; count++) {
+               linkea_entry_unpack(lee, &reclen, &name, &pfid);
+               if (!parent || lu_fid_eq(&pfid, mdt_object_fid(parent))) {
+                       lu_buf_check_and_alloc(&link, name.ln_namelen);
+                       if (!link.lb_buf)
+                               GOTO(out_match, rc = -ENOMEM);
+                       rc = critical_decode(name.ln_name, name.ln_namelen,
+                                            link.lb_buf);
+
+                       if (memcmp(LLCRYPT_EXTRACT_DIGEST(link.lb_buf, rc),
+                                  hash, LL_CRYPTO_BLOCK_SIZE) == 0) {
+                               *lname = name;
+                               break;
+                       }
+               }
+               lee = (struct link_ea_entry *) ((char *)lee + reclen);
        }
+       if (count == leh->leh_reccount)
+               rc = 0;
+       else
+               rc = 1;
+
+out_match:
+       lu_buf_free(&link);
+       kfree(hash);
 
        RETURN(rc);
 }
@@ -1666,19 +2071,20 @@ static int mdt_raw_lookup(struct mdt_thread_info *info,
  *            (2)intent request will grant the lock to client.
  */
 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
-                                 struct mdt_lock_handle *lhc,
-                                 __u64 child_bits,
-                                 struct ldlm_reply *ldlm_rep)
+                                struct mdt_lock_handle *lhc,
+                                __u64 child_bits,
+                                struct ldlm_reply *ldlm_rep)
 {
-       struct ptlrpc_request  *req = mdt_info_req(info);
-       struct mdt_body        *reqbody = NULL;
-       struct mdt_object      *parent = info->mti_object;
-       struct mdt_object      *child;
-       struct lu_fid          *child_fid = &info->mti_tmp_fid1;
-       struct lu_name         *lname = NULL;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct mdt_body *reqbody = NULL;
+       struct mdt_object *parent = info->mti_object;
+       struct mdt_object *child = NULL;
+       struct lu_fid *child_fid = &info->mti_tmp_fid1;
+       struct lu_name *lname = NULL;
        struct mdt_lock_handle *lhp = NULL;
-       struct ldlm_lock       *lock;
+       struct ldlm_lock *lock;
        struct req_capsule *pill = info->mti_pill;
+       bool fscrypt_md = false;
        __u64 try_bits = 0;
        bool is_resent;
        int ma_need = 0;
@@ -1693,22 +2099,21 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        if (parent == NULL)
                RETURN(-ENOENT);
 
+       if (info->mti_mdt->mdt_enable_dir_auto_split)
+               ma_need |= MA_DIRENT_CNT;
+
        if (info->mti_cross_ref) {
                /* Only getattr on the child. Parent is on another node. */
                mdt_set_disposition(info, ldlm_rep,
                                    DISP_LOOKUP_EXECD | DISP_LOOKUP_POS);
                child = parent;
-               CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
-                      "ldlm_rep = %p\n",
+               CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", ldlm_rep = %p\n",
                       PFID(mdt_object_fid(child)), ldlm_rep);
 
                rc = mdt_check_resent_lock(info, child, lhc);
                if (rc < 0) {
                        RETURN(rc);
                } else if (rc > 0) {
-                       mdt_lock_handle_init(lhc);
-                       mdt_lock_reg_init(lhc, LCK_PR);
-
                        /*
                         * Object's name entry is on another MDS, it will
                         * request PERM lock only because LOOKUP lock is owned
@@ -1719,8 +2124,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        child_bits &= ~(MDS_INODELOCK_LOOKUP |
                                        MDS_INODELOCK_LAYOUT);
                        child_bits |= MDS_INODELOCK_PERM;
-
-                       rc = mdt_object_lock(info, child, lhc, child_bits);
+                       rc = mdt_object_lock(info, child, lhc, child_bits,
+                                            LCK_PR);
                        if (rc < 0)
                                RETURN(rc);
                }
@@ -1734,13 +2139,19 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        RETURN(-ENOENT);
                }
 
-               rc = mdt_getattr_internal(info, child, 0);
+               rc = mdt_getattr_internal(info, child, ma_need);
                if (unlikely(rc != 0)) {
                        mdt_object_unlock(info, child, lhc, 1);
                        RETURN(rc);
                }
 
                rc = mdt_pack_secctx_in_reply(info, child);
+               if (unlikely(rc)) {
+                       mdt_object_unlock(info, child, lhc, 1);
+                       RETURN(rc);
+               }
+
+               rc = mdt_pack_encctx_in_reply(info, child);
                if (unlikely(rc))
                        mdt_object_unlock(info, child, lhc, 1);
                RETURN(rc);
@@ -1749,51 +2160,117 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        lname = &info->mti_name;
        mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
 
-       if (lu_name_is_valid(lname)) {
-               CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
-                      "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
+       if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
+               reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
+               if (unlikely(reqbody == NULL))
+                       RETURN(err_serious(-EPROTO));
+
+               *child_fid = reqbody->mbo_fid2;
+               if (unlikely(!fid_is_sane(child_fid)))
+                       RETURN(err_serious(-EINVAL));
+
+               if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
+                       mdt_object_get(info->mti_env, parent);
+                       child = parent;
+               } else {
+                       child = mdt_object_find(info->mti_env, info->mti_mdt,
+                                               child_fid);
+                       if (IS_ERR(child))
+                               RETURN(PTR_ERR(child));
+               }
+
+               CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", ldlm_rep = %p\n",
+                      PFID(mdt_object_fid(parent)),
+                      PFID(&reqbody->mbo_fid2), ldlm_rep);
+       } else if (lu_name_is_valid(lname)) {
+               if (mdt_object_remote(parent)) {
+                       CERROR("%s: parent "DFID" is on remote target\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(parent)));
+                       RETURN(-EPROTO);
+               }
+
+               CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", ldlm_rep = %p\n",
+                      PFID(mdt_object_fid(parent)),
                       PNAME(lname), ldlm_rep);
+
+               if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD ||
+                   (fid_is_root(mdt_object_fid(parent)) &&
+                    lname->ln_namelen == strlen(dot_fscrypt_name) &&
+                    strncmp(lname->ln_name, dot_fscrypt_name,
+                            lname->ln_namelen) == 0))
+                       fscrypt_md = true;
        } else {
                reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
                if (unlikely(reqbody == NULL))
                        RETURN(err_serious(-EPROTO));
 
                *child_fid = reqbody->mbo_fid2;
-
                if (unlikely(!fid_is_sane(child_fid)))
                        RETURN(err_serious(-EINVAL));
 
-               CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
-                      "ldlm_rep = %p\n",
+               if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
+                       mdt_object_get(info->mti_env, parent);
+                       child = parent;
+               } else {
+                       child = mdt_object_find(info->mti_env, info->mti_mdt,
+                                               child_fid);
+                       if (IS_ERR(child))
+                               RETURN(PTR_ERR(child));
+               }
+
+               if (mdt_object_remote(child)) {
+                       CERROR("%s: child "DFID" is on remote target\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(child)));
+                       GOTO(out_child, rc = -EPROTO);
+               }
+
+               /* don't fetch LOOKUP lock if it's remote object */
+               rc = mdt_is_remote_object(info, parent, child);
+               if (rc < 0)
+                       GOTO(out_child, rc);
+               if (rc)
+                       child_bits &= ~MDS_INODELOCK_LOOKUP;
+
+               CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", ldlm_rep = %p\n",
                       PFID(mdt_object_fid(parent)),
                       PFID(&reqbody->mbo_fid2), ldlm_rep);
        }
 
        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
 
-       if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) {
+       if (unlikely(!mdt_object_exists(parent)) &&
+           !(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
+           lu_name_is_valid(lname)) {
                LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                &parent->mot_obj,
                                "Parent doesn't exist!");
-               RETURN(-ESTALE);
+               GOTO(out_child, rc = -ESTALE);
        }
 
-       if (mdt_object_remote(parent)) {
-               CERROR("%s: parent "DFID" is on remote target\n",
-                      mdt_obd_name(info->mti_mdt),
-                      PFID(mdt_object_fid(parent)));
-               RETURN(-EIO);
-       }
-
-       if (lu_name_is_valid(lname)) {
-               /* Always allow to lookup ".." */
-               if (unlikely(lname->ln_namelen == 2 &&
-                            lname->ln_name[0] == '.' &&
-                            lname->ln_name[1] == '.'))
-                       info->mti_spec.sp_permitted = 1;
-
+       if (!child && is_resent) {
+               lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
+               if (lock == NULL) {
+                       /* Lock is pinned by ldlm_handle_enqueue0() as it is
+                        * a resend case, however, it could be already destroyed
+                        * due to client eviction or a raced cancel RPC.
+                        */
+                       LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx",
+                                         lhc->mlh_reg_lh.cookie);
+                       RETURN(-ESTALE);
+               }
+               fid_extract_from_res_name(child_fid,
+                                         &lock->l_resource->lr_name);
+               LDLM_LOCK_PUT(lock);
+               child = mdt_object_find(info->mti_env, info->mti_mdt,
+                                       child_fid);
+               if (IS_ERR(child))
+                       RETURN(PTR_ERR(child));
+       } else if (!(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
+           lu_name_is_valid(lname)) {
                if (info->mti_body->mbo_valid == OBD_MD_FLID) {
-                       rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
+                       rc = mdt_raw_lookup(info, parent, lname);
 
                        RETURN(rc);
                }
@@ -1801,47 +2278,46 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                /* step 1: lock parent only if parent is a directory */
                if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
                        lhp = &info->mti_lh[MDT_LH_PARENT];
-                       mdt_lock_pdo_init(lhp, LCK_PR, lname);
-                       rc = mdt_object_lock(info, parent, lhp,
-                                            MDS_INODELOCK_UPDATE);
+                       rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PR);
                        if (unlikely(rc != 0))
                                RETURN(rc);
                }
 
-                /* step 2: lookup child's fid by name */
-                fid_zero(child_fid);
+               /* step 2: lookup child's fid by name */
+               fid_zero(child_fid);
                rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
                                child_fid, &info->mti_spec);
                if (rc == -ENOENT)
                        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
 
                if (rc != 0)
-                       GOTO(out_parent, rc);
-       }
-
-       mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+                       GOTO(unlock_parent, rc);
 
-       /*
-        *step 3: find the child object by fid & lock it.
-        *        regardless if it is local or remote.
-        *
-        *Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e)
-        *      set parent dir fid the same as child fid in getattr by fid case
-        *      we should not lu_object_find() the object again, could lead
-        *      to hung if there is a concurrent unlink destroyed the object.
-        */
-       if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
-               mdt_object_get(info->mti_env, parent);
-               child = parent;
-       } else {
                child = mdt_object_find(info->mti_env, info->mti_mdt,
                                        child_fid);
+               if (unlikely(IS_ERR(child)))
+                       GOTO(unlock_parent, rc = PTR_ERR(child));
        }
 
-       if (unlikely(IS_ERR(child)))
-               GOTO(out_parent, rc = PTR_ERR(child));
+       mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+
+       /* step 3: lock child regardless if it is local or remote. */
+       LASSERT(child);
+
+       if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
+               /* Here, lname is an encoded hash of on-disk name, and
+                * client is doing access without encryption key.
+                * So we need to compare name hash with the one in the request.
+                */
+               if (!find_name_matching_hash(info, lname, parent,
+                                            child)) {
+                       mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+                       mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+                       GOTO(out_child, rc = -ENOENT);
+               }
+       }
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
        if (!mdt_object_exists(child)) {
                LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                &child->mot_obj,
@@ -1853,9 +2329,6 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        if (rc < 0) {
                GOTO(out_child, rc);
        } else if (rc > 0) {
-               mdt_lock_handle_init(lhc);
-               mdt_lock_reg_init(lhc, LCK_PR);
-
                if (!(child_bits & MDS_INODELOCK_UPDATE) &&
                    !mdt_object_remote(child)) {
                        struct md_attr *ma = &info->mti_attr;
@@ -1870,7 +2343,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         * return not only a LOOKUP lock, but also an UPDATE
                         * lock and this might save us RPC on later STAT. For
                         * directories, it also let negative dentry cache start
-                        * working for this dir. */
+                        * working for this dir.
+                        */
                        if (ma->ma_valid & MA_INODE &&
                            ma->ma_attr.la_valid & LA_CTIME &&
                            info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
@@ -1879,11 +2353,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                }
 
                /* layout lock must be granted in a best-effort way
-                * for IT operations */
+                * for IT operations
+                */
                LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
                if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
                    !mdt_object_remote(child) && ldlm_rep != NULL) {
-                       if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
+                       if (!CFS_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
                            exp_connect_layout(info->mti_exp)) {
                                /* try to grant layout lock for regular file. */
                                try_bits = MDS_INODELOCK_LAYOUT;
@@ -1893,34 +2368,80 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                                try_bits |= MDS_INODELOCK_DOM;
                }
 
+               /*
+                * To avoid possible deadlock between batched statahead RPC
+                * and rename()/migrate() operation, it should use trylock to
+                * obtain the DLM PR ibits lock for file attributes in a
+                * batched statahead RPC. A failed trylock means that other
+                * users maybe modify the directory simultaneously as in current
+                * Lustre design the server only grants read lock to a client.
+                *
+                * When a trylock failed, the MDT reports the conflict with
+                * error code -EBUSY, and stops statahead immediately.
+                */
+               if (info->mti_batch_env) {
+                       /*
+                        * This is a sub stat-ahead request in a batched RPC.
+                        * However, the @child is a remote object, we just
+                        * return -EREMOTE here to forbid stat-ahead on it.
+                        */
+                       if (mdt_object_remote(child))
+                               GOTO(out_child, rc = -EREMOTE);
+
+                       try_bits |= child_bits;
+                       child_bits = 0;
+               }
+
                if (try_bits != 0) {
                        /* try layout lock, it may fail to be granted due to
-                        * contention at LOOKUP or UPDATE */
+                        * contention at LOOKUP or UPDATE
+                        */
                        rc = mdt_object_lock_try(info, child, lhc, &child_bits,
-                                                try_bits, false);
+                                                try_bits, LCK_PR);
                        if (child_bits & MDS_INODELOCK_LAYOUT)
                                ma_need |= MA_LOV;
                } else {
                        /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
-                        * client will enqueue the lock to the remote MDT */
+                        * client will enqueue the lock to the remote MDT
+                        */
                        if (mdt_object_remote(child))
-                               child_bits &= ~MDS_INODELOCK_UPDATE;
-                       rc = mdt_object_lock(info, child, lhc, child_bits);
+                               rc = mdt_object_lookup_lock(info, NULL, child,
+                                                           lhc, LCK_PR);
+                       else
+                               rc = mdt_object_lock(info, child, lhc,
+                                                    child_bits, LCK_PR);
                }
                if (unlikely(rc != 0))
                        GOTO(out_child, rc);
+               if (info->mti_batch_env && child_bits == 0) {
+                       if (!is_resent)
+                               mdt_object_unlock(info, child, lhc, 1);
+                       GOTO(out_child, rc = -EBUSY);
+               }
        }
 
+       if (fscrypt_md)
+               child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD;
+
        /* finally, we can get attr for child. */
        rc = mdt_getattr_internal(info, child, ma_need);
        if (unlikely(rc != 0)) {
-               mdt_object_unlock(info, child, lhc, 1);
+               if (!is_resent)
+                       mdt_object_unlock(info, child, lhc, 1);
                GOTO(out_child, rc);
        }
 
        rc = mdt_pack_secctx_in_reply(info, child);
        if (unlikely(rc)) {
-               mdt_object_unlock(info, child, lhc, 1);
+               if (!is_resent)
+                       mdt_object_unlock(info, child, lhc, 1);
+               GOTO(out_child, rc);
+       }
+
+       rc = mdt_pack_encctx_in_reply(info, child);
+       if (unlikely(rc)) {
+               if (!is_resent)
+                       mdt_object_unlock(info, child, lhc, 1);
                GOTO(out_child, rc);
        }
 
@@ -1934,8 +2455,23 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         PLDLMRES(lock->l_resource),
                         PFID(mdt_object_fid(child)));
 
-               if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
-                   !mdt_object_remote(child) && child != parent) {
+               if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) {
+                       if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+                               CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND,
+                                                req->rq_deadline -
+                                                req->rq_arrival_time.tv_sec +
+                                                cfs_fail_val ?: 3);
+                       /* Put the lock to the waiting list and force the cancel */
+                       ldlm_set_ast_sent(lock);
+               }
+
+               /*
+                * check whether the object is remote as we can't
+                * really check attributes w/o explicit check for
+                * object's existence first.
+                */
+               if (!mdt_object_remote(child) && child != parent &&
+                   S_ISREG(lu_object_attr(&child->mot_obj))) {
                        mdt_object_put(info->mti_env, child);
                        rc = mdt_pack_size2body(info, child_fid,
                                                &lhc->mlh_reg_lh);
@@ -1948,17 +2484,27 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                                unlock_res_and_lock(lock);
                        }
                        LDLM_LOCK_PUT(lock);
-                       GOTO(out_parent, rc = 0);
+                       GOTO(unlock_parent, rc = 0);
                }
                LDLM_LOCK_PUT(lock);
        }
 
        EXIT;
 out_child:
-       mdt_object_put(info->mti_env, child);
-out_parent:
+       if (child)
+               mdt_object_put(info->mti_env, child);
+unlock_parent:
        if (lhp)
                mdt_object_unlock(info, parent, lhp, 1);
+       if (rc == -ENOENT) {
+               /* return -ENOKEY instead of -ENOENT to encryption-unaware
+                * client if trying to access an encrypted file
+                */
+               int rc2 = mdt_check_enc(info, parent);
+
+               if (rc2)
+                       rc = rc2;
+       }
        return rc;
 }
 
@@ -1966,37 +2512,38 @@ out_parent:
 static int mdt_getattr_name(struct tgt_session_info *tsi)
 {
        struct mdt_thread_info  *info = tsi2mdt_info(tsi);
-        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
-        struct mdt_body        *reqbody;
-        struct mdt_body        *repbody;
-        int rc, rc2;
-        ENTRY;
+       struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
+       struct mdt_body *reqbody;
+       struct mdt_body *repbody;
+       int rc, rc2;
+
+       ENTRY;
 
-        reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(reqbody != NULL);
-        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(repbody != NULL);
+       reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
+       LASSERT(reqbody != NULL);
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       LASSERT(repbody != NULL);
 
        info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
        repbody->mbo_eadatasize = 0;
        repbody->mbo_aclsize = 0;
 
-        rc = mdt_init_ucred_intent_getattr(info, reqbody);
-        if (unlikely(rc))
-                GOTO(out_shrink, rc);
-
-        rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
-        if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-                ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
-                lhc->mlh_reg_lh.cookie = 0;
-        }
-        mdt_exit_ucred(info);
-        EXIT;
+       rc = mdt_init_ucred(info, reqbody);
+       if (unlikely(rc))
+               GOTO(out_shrink, rc);
+
+       rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
+       if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+               ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
+               lhc->mlh_reg_lh.cookie = 0;
+       }
+       mdt_exit_ucred(info);
+       EXIT;
 out_shrink:
-        mdt_client_compatibility(info);
-        rc2 = mdt_fix_reply(info);
-        if (rc == 0)
-                rc = rc2;
+       mdt_client_compatibility(info);
+       rc2 = mdt_fix_reply(info);
+       if (rc == 0)
+               rc = rc2;
        mdt_thread_info_fini(info);
        return rc;
 }
@@ -2007,14 +2554,14 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info,
                            struct mdt_object *obj, s64 ctime)
 {
        struct lu_fid *child_fid = &info->mti_tmp_fid1;
-       struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+       struct ldlm_enqueue_info *einfo = &info->mti_einfo;
        struct mdt_device *mdt = info->mti_mdt;
        struct md_attr *ma = &info->mti_attr;
        struct mdt_lock_handle *parent_lh;
        struct mdt_lock_handle *child_lh;
        struct mdt_object *pobj;
-       bool cos_incompat = false;
        int rc;
+
        ENTRY;
 
        pobj = mdt_object_find(info->mti_env, mdt, pfid);
@@ -2022,14 +2569,10 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info,
                GOTO(out, rc = PTR_ERR(pobj));
 
        parent_lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(parent_lh, LCK_PW, name);
-       rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
+       rc = mdt_parent_lock(info, pobj, parent_lh, name, LCK_PW);
        if (rc != 0)
                GOTO(put_parent, rc);
 
-       if (mdt_object_remote(pobj))
-               cos_incompat = true;
-
        rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
                        name, child_fid, &info->mti_spec);
        if (rc != 0)
@@ -2039,10 +2582,9 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info,
                GOTO(unlock_parent, rc = -EREMCHG);
 
        child_lh = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(child_lh, LCK_EX);
-       rc = mdt_reint_striped_lock(info, obj, child_lh,
-                                   MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
-                                   einfo, cos_incompat);
+       rc = mdt_object_stripes_lock(info, pobj, obj, child_lh, einfo,
+                                    MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_UPDATE, LCK_EX);
        if (rc != 0)
                GOTO(unlock_parent, rc);
 
@@ -2065,7 +2607,7 @@ static int mdt_rmfid_unlink(struct mdt_thread_info *info,
        mutex_unlock(&obj->mot_lov_mutex);
 
 unlock_child:
-       mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
+       mdt_object_stripes_unlock(info, obj, child_lh, einfo, 1);
 unlock_parent:
        mdt_object_unlock(info, pobj, parent_lh, 1);
 put_parent:
@@ -2081,6 +2623,7 @@ static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
        struct md_attr *ma = &info->mti_attr;
        struct lu_attr *la = &ma->ma_attr;
        int rc = 0;
+
        ENTRY;
 
        ma->ma_need = MA_INODE;
@@ -2089,17 +2632,22 @@ static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
                GOTO(out, rc);
 
        if (la->la_flags & LUSTRE_IMMUTABLE_FL)
-                       rc = -EACCES;
+               rc = -EACCES;
 
-       if (md_capable(uc, CFS_CAP_DAC_OVERRIDE))
+       /* we want rbac roles to have precedence over any other
+        * permission or capability checks
+        */
+       if (!uc->uc_rbac_byfid_ops)
+               RETURN(-EPERM);
+       if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE))
                RETURN(0);
        if (uc->uc_fsuid == la->la_uid) {
-               if ((la->la_mode & S_IWUSR) == 0)
+               if ((la->la_mode & 0200) == 0)
                        rc = -EACCES;
        } else if (uc->uc_fsgid == la->la_gid) {
-               if ((la->la_mode & S_IWGRP) == 0)
+               if ((la->la_mode & 0020) == 0)
                        rc = -EACCES;
-       } else if ((la->la_mode & S_IWOTH) == 0) {
+       } else if ((la->la_mode & 0002) == 0) {
                        rc = -EACCES;
        }
 
@@ -2119,6 +2667,7 @@ static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
        struct link_ea_header *leh;
        struct link_ea_entry *lee;
        int reclen, count, rc = 0;
+
        ENTRY;
 
        if (!fid_is_sane(fid))
@@ -2178,6 +2727,7 @@ static int mdt_rmfid(struct tgt_session_info *tsi)
        int bufsize, rc;
        __u32 *rcs;
        int i, nr;
+
        ENTRY;
 
        reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
@@ -2217,6 +2767,62 @@ out:
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg);
 
+int mdt_io_set_info(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct ost_body         *body = NULL, *repbody;
+       void                    *key, *val = NULL;
+       int                      keylen, vallen, rc = 0;
+       bool                     is_grant_shrink;
+
+       ENTRY;
+
+       key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
+       if (key == NULL) {
+               DEBUG_REQ(D_HA, req, "no set_info key");
+               RETURN(err_serious(-EFAULT));
+       }
+       keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
+                                     RCL_CLIENT);
+
+       val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
+       if (val == NULL) {
+               DEBUG_REQ(D_HA, req, "no set_info val");
+               RETURN(err_serious(-EFAULT));
+       }
+       vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
+                                     RCL_CLIENT);
+
+       is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK);
+       if (is_grant_shrink)
+               /* In this case the value is actually an RMF_OST_BODY, so we
+                * transmutate the type of this PTLRPC
+                */
+               req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO);
+
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc < 0)
+               RETURN(rc);
+
+       if (is_grant_shrink) {
+               body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY);
+
+               repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+               *repbody = *body;
+
+               /** handle grant shrink, similar to a read request */
+               tgt_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp,
+                                      &repbody->oa);
+       } else {
+               CERROR("%s: Unsupported key %s\n",
+                      tgt_name(tsi->tsi_tgt), (char *)key);
+               rc = -EOPNOTSUPP;
+       }
+
+       RETURN(rc);
+}
+
+
 static int mdt_set_info(struct tgt_session_info *tsi)
 {
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
@@ -2246,6 +2852,22 @@ static int mdt_set_info(struct tgt_session_info *tsi)
 
        /* Swab any part of val you need to here */
        if (KEY_IS(KEY_READ_ONLY)) {
+               /* If client wants rw, make sure nodemap does not enforce ro. */
+               if (!*(__u32 *)val) {
+                       struct lu_nodemap *nm = NULL;
+                       bool readonly = false;
+
+                       if (req->rq_export)
+                               nm = nodemap_get_from_exp(req->rq_export);
+
+                       if (!IS_ERR_OR_NULL(nm)) {
+                               readonly = nm->nmf_readonly_mount;
+                               nodemap_putref(nm);
+                       }
+
+                       if (unlikely(readonly))
+                               RETURN(-EROFS);
+               }
                spin_lock(&req->rq_export->exp_lock);
                if (*(__u32 *)val)
                        *exp_connect_flags_ptr(req->rq_export) |=
@@ -2262,11 +2884,13 @@ static int mdt_set_info(struct tgt_session_info *tsi)
                               tgt_name(tsi->tsi_tgt), vallen);
                        RETURN(-EINVAL);
                }
-               if (ptlrpc_req_need_swab(req)) {
+               if (req_capsule_req_need_swab(&req->rq_pill)) {
                        __swab64s(&cs->cs_recno);
                        __swab32s(&cs->cs_id);
                }
 
+               if (!mdt_changelog_allow(tsi2mdt_info(tsi)))
+                       RETURN(-EACCES);
                rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
                                   vallen, val, NULL);
        } else if (KEY_IS(KEY_EVICT_BY_NID)) {
@@ -2290,18 +2914,18 @@ static int mdt_readpage(struct tgt_session_info *tsi)
 
        ENTRY;
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
+       if (CFS_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
                RETURN(err_serious(-ENOMEM));
 
        repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
        if (repbody == NULL || reqbody == NULL)
-                RETURN(err_serious(-EFAULT));
+               RETURN(err_serious(-EFAULT));
 
-        /*
-         * prepare @rdpg before calling lower layers and transfer itself. Here
-         * reqbody->size contains offset of where to start to read and
-         * reqbody->nlink contains number bytes to read.
-         */
+       /*
+        * prepare @rdpg before calling lower layers and transfer itself. Here
+        * reqbody->size contains offset of where to start to read and
+        * reqbody->nlink contains number bytes to read.
+        */
        rdpg->rp_hash = reqbody->mbo_size;
        if (rdpg->rp_hash != reqbody->mbo_size) {
                CERROR("Invalid hash: %#llx != %#llx\n",
@@ -2316,17 +2940,17 @@ static int mdt_readpage(struct tgt_session_info *tsi)
                                exp_max_brw_size(tsi->tsi_exp));
        rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
                          PAGE_SHIFT;
-        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
-        if (rdpg->rp_pages == NULL)
-                RETURN(-ENOMEM);
+       OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
+       if (rdpg->rp_pages == NULL)
+               RETURN(-ENOMEM);
 
-        for (i = 0; i < rdpg->rp_npages; ++i) {
+       for (i = 0; i < rdpg->rp_npages; ++i) {
                rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
-                if (rdpg->rp_pages[i] == NULL)
-                        GOTO(free_rdpg, rc = -ENOMEM);
-        }
+               if (rdpg->rp_pages[i] == NULL)
+                       GOTO(free_rdpg, rc = -ENOMEM);
+       }
 
-        /* call lower layers to fill allocated pages with directory data */
+       /* call lower layers to fill allocated pages with directory data */
        rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
        if (rc < 0)
                GOTO(free_rdpg, rc);
@@ -2340,9 +2964,9 @@ free_rdpg:
        for (i = 0; i < rdpg->rp_npages; i++)
                if (rdpg->rp_pages[i] != NULL)
                        __free_page(rdpg->rp_pages[i]);
-       OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
+       OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
+       if (CFS_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
                RETURN(0);
 
        return rc;
@@ -2360,7 +2984,8 @@ static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
                if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
                        attr->la_uid = uc->uc_fsuid;
                /* for S_ISGID, inherit gid from his parent, such work will be
-                * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
+                * done in cmm/mdd layer, here set all cases as uc->uc_fsgid.
+                */
                if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
                        attr->la_gid = uc->uc_fsgid;
        }
@@ -2383,22 +3008,22 @@ static void mdt_preset_secctx_size(struct mdt_thread_info *info)
            req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
                                  RCL_CLIENT)) {
                if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
-                                        RCL_CLIENT) != 0) {
+                                        RCL_CLIENT) != 0)
                        /* pre-set size in server part with max size */
                        req_capsule_set_size(pill, &RMF_FILE_SECCTX,
                                             RCL_SERVER,
-                                            info->mti_mdt->mdt_max_ea_size);
-               } else {
+                                            req_capsule_ptlreq(pill) ?
+                                            OBD_MAX_DEFAULT_EA_SIZE :
+                                            MAX_MD_SIZE_OLD);
+               else
                        req_capsule_set_size(pill, &RMF_FILE_SECCTX,
                                             RCL_SERVER, 0);
-               }
        }
-
 }
 
 static int mdt_reint_internal(struct mdt_thread_info *info,
-                              struct mdt_lock_handle *lhc,
-                              __u32 op)
+                             struct mdt_lock_handle *lhc,
+                             __u32 op)
 {
        struct req_capsule      *pill = info->mti_pill;
        struct mdt_body         *repbody;
@@ -2414,7 +3039,8 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
 
 
        /* check if the file system is set to readonly. O_RDONLY open
-        * is still allowed even the file system is set to readonly mode */
+        * is still allowed even the file system is set to readonly mode
+        */
        if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op))
                RETURN(err_serious(-EROFS));
 
@@ -2429,12 +3055,14 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
 
        /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
         * by default. If the target object has more ACL entries, then
-        * enlarge the buffer when necessary. */
+        * enlarge the buffer when necessary.
+        */
        if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
                req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
                                     LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
 
        mdt_preset_secctx_size(info);
+       mdt_preset_encctx_size(info);
 
        rc = req_capsule_server_pack(pill);
        if (rc != 0) {
@@ -2449,7 +3077,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                repbody->mbo_aclsize = 0;
        }
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
 
        /* for replay no cookkie / lmm need, because client have this already */
        if (info->mti_spec.no_create)
@@ -2485,14 +3113,11 @@ out_shrink:
 
        /*
         * Data-on-MDT optimization - read data along with OPEN and return it
-        * in reply. Do that only if we have both DOM and LAYOUT locks.
+        * in reply when possible.
         */
-       if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req) &&
-           info->mti_attr.ma_lmm != NULL &&
-           mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) {
+       if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
                rc = mdt_dom_read_on_open(info, info->mti_mdt,
                                          &lhc->mlh_reg_lh);
-       }
 
        return rc;
 }
@@ -2512,8 +3137,8 @@ static long mdt_reint_opcode(struct ptlrpc_request *req,
                        req_capsule_extend(&req->rq_pill, fmt[opc]);
                else {
                        mdt = mdt_exp2dev(req->rq_export);
-                       CERROR("%s: Unsupported opcode '%ld' from client '%s':"
-                              " rc = %d\n", req->rq_export->exp_obd->obd_name,
+                       CERROR("%s: Unsupported opcode '%ld' from client '%s': rc = %d\n",
+                              req->rq_export->exp_obd->obd_name,
                               opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
                        opc = err_serious(-EFAULT);
                }
@@ -2562,12 +3187,13 @@ static int mdt_reint(struct tgt_session_info *tsi)
 /* this should sync the whole device */
 int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
 {
-        struct dt_device *dt = mdt->mdt_bottom;
-        int rc;
-        ENTRY;
+       struct dt_device *dt = mdt->mdt_bottom;
+       int rc;
 
-        rc = dt->dd_ops->dt_sync(env, dt);
-        RETURN(rc);
+       ENTRY;
+
+       rc = dt->dd_ops->dt_sync(env, dt);
+       RETURN(rc);
 }
 
 /* this should sync this object */
@@ -2604,11 +3230,12 @@ static int mdt_sync(struct tgt_session_info *tsi)
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
        struct req_capsule      *pill = tsi->tsi_pill;
        struct mdt_body         *body;
+       ktime_t                  kstart = ktime_get();
        int                      rc;
 
        ENTRY;
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
+       if (CFS_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
                RETURN(err_serious(-ENOMEM));
 
        if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) {
@@ -2640,7 +3267,8 @@ static int mdt_sync(struct tgt_session_info *tsi)
                mdt_thread_info_fini(info);
        }
        if (rc == 0)
-               mdt_counter_incr(req, LPROC_MDT_SYNC);
+               mdt_counter_incr(req, LPROC_MDT_SYNC,
+                                ktime_us_delta(ktime_get(), kstart));
 
        RETURN(rc);
 }
@@ -2659,8 +3287,7 @@ static int mdt_data_sync(struct tgt_session_info *tsi)
 
        repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
 
-       /* if no fid is specified then do nothing,
-        * device sync is done via MDS_SYNC */
+        /* device sync is done via MDS_SYNC. NOOP if no fid is specified */
        if (fid_is_zero(&tsi->tsi_fid))
                RETURN(0);
 
@@ -2699,19 +3326,28 @@ put:
  */
 static int mdt_quotactl(struct tgt_session_info *tsi)
 {
-       struct obd_export       *exp  = tsi->tsi_exp;
-       struct req_capsule      *pill = tsi->tsi_pill;
-       struct obd_quotactl     *oqctl, *repoqc;
-       int                      id, rc;
-       struct mdt_device       *mdt = mdt_exp2dev(exp);
-       struct lu_device        *qmt = mdt->mdt_qmt_dev;
-       struct lu_nodemap       *nodemap;
+       struct obd_export *exp  = tsi->tsi_exp;
+       struct req_capsule *pill = tsi->tsi_pill;
+       struct obd_quotactl *oqctl, *repoqc;
+       struct mdt_device *mdt = mdt_exp2dev(exp);
+       struct lu_device *qmt = mdt->mdt_qmt_dev;
+       struct lu_nodemap *nodemap;
+       char *buffer = NULL;
+       int id, rc;
+
        ENTRY;
 
        oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
-       if (oqctl == NULL)
+       if (!oqctl)
                RETURN(err_serious(-EPROTO));
 
+       if (oqctl->qc_cmd == LUSTRE_Q_ITERQUOTA ||
+           oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA)
+               req_capsule_set_size(pill, &RMF_OBD_QUOTA_ITER, RCL_SERVER,
+                                    LQUOTA_ITER_BUFLEN);
+       else
+               req_capsule_set_size(pill, &RMF_OBD_QUOTA_ITER, RCL_SERVER, 0);
+
        rc = req_capsule_server_pack(pill);
        if (rc)
                RETURN(err_serious(rc));
@@ -2725,22 +3361,35 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case Q_SETINFO:
        case Q_SETQUOTA:
        case LUSTRE_Q_SETDEFAULT:
-               if (!nodemap_can_setquota(nodemap))
+       case LUSTRE_Q_SETQUOTAPOOL:
+       case LUSTRE_Q_SETINFOPOOL:
+       case LUSTRE_Q_SETDEFAULT_POOL:
+       case LUSTRE_Q_DELETEQID:
+       case LUSTRE_Q_RESETQID:
+               if (!nodemap_can_setquota(nodemap, oqctl->qc_type,
+                                         oqctl->qc_id))
                        GOTO(out_nodemap, rc = -EPERM);
-               /* fallthrough */
+               fallthrough;
        case Q_GETINFO:
        case Q_GETQUOTA:
        case LUSTRE_Q_GETDEFAULT:
+       case LUSTRE_Q_GETQUOTAPOOL:
+       case LUSTRE_Q_GETINFOPOOL:
+       case LUSTRE_Q_GETDEFAULT_POOL:
+       case LUSTRE_Q_ITERQUOTA:
                if (qmt == NULL)
                        GOTO(out_nodemap, rc = -EOPNOTSUPP);
                /* slave quotactl */
-               /* fallthrough */
+               fallthrough;
        case Q_GETOINFO:
        case Q_GETOQUOTA:
+       case LUSTRE_Q_ITEROQUOTA:
                break;
        default:
-               CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
-               GOTO(out_nodemap, rc = -EFAULT);
+               rc = -EFAULT;
+               CERROR("%s: unsupported quotactl command %d: rc = %d\n",
+                      mdt_obd_name(mdt), oqctl->qc_cmd, rc);
+               GOTO(out_nodemap, rc);
        }
 
        id = oqctl->qc_id;
@@ -2754,8 +3403,8 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
                                    NODEMAP_CLIENT_TO_FS, id);
                break;
        case PRJQUOTA:
-               /* todo: check/map project id */
-               id = oqctl->qc_id;
+               id = nodemap_map_id(nodemap, NODEMAP_PROJID,
+                                   NODEMAP_CLIENT_TO_FS, id);
                break;
        default:
                GOTO(out_nodemap, rc = -EOPNOTSUPP);
@@ -2764,6 +3413,13 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        if (repoqc == NULL)
                GOTO(out_nodemap, rc = err_serious(-EFAULT));
 
+       if (oqctl->qc_cmd == LUSTRE_Q_ITERQUOTA ||
+           oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) {
+               buffer = req_capsule_server_get(pill, &RMF_OBD_QUOTA_ITER);
+               if (buffer == NULL)
+                       GOTO(out_nodemap, rc = err_serious(-EFAULT));
+       }
+
        if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
                barrier_exit(tsi->tsi_tgt->lut_bottom);
 
@@ -2783,15 +3439,28 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case Q_GETQUOTA:
        case LUSTRE_Q_SETDEFAULT:
        case LUSTRE_Q_GETDEFAULT:
+       case LUSTRE_Q_SETQUOTAPOOL:
+       case LUSTRE_Q_GETQUOTAPOOL:
+       case LUSTRE_Q_SETINFOPOOL:
+       case LUSTRE_Q_GETINFOPOOL:
+       case LUSTRE_Q_SETDEFAULT_POOL:
+       case LUSTRE_Q_GETDEFAULT_POOL:
+       case LUSTRE_Q_DELETEQID:
+       case LUSTRE_Q_RESETQID:
+       case LUSTRE_Q_ITERQUOTA:
                /* forward quotactl request to QMT */
-               rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
+               rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl, buffer,
+                                           buffer == NULL ? 0 :
+                                                       LQUOTA_ITER_BUFLEN);
                break;
 
        case Q_GETOINFO:
        case Q_GETOQUOTA:
+       case LUSTRE_Q_ITEROQUOTA:
                /* slave quotactl */
                rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom,
-                                  oqctl);
+                                  oqctl, buffer,
+                                  buffer == NULL ? 0 : LQUOTA_ITER_BUFLEN);
                break;
 
        default:
@@ -2802,8 +3471,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        if (oqctl->qc_id != id)
                swap(oqctl->qc_id, id);
 
-       *repoqc = *oqctl;
-
+       QCTL_COPY_NO_PNAME(repoqc, oqctl);
        EXIT;
 
 out_nodemap:
@@ -2821,39 +3489,38 @@ out_nodemap:
  * context into our context list here.
  */
 static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt,
-                               int idx)
+                              int idx)
 {
-        struct md_device  *next = mdt->mdt_child;
-        struct llog_ctxt *ctxt;
-        int rc;
+       struct md_device  *next = mdt->mdt_child;
+       struct llog_ctxt *ctxt;
+       int rc;
 
-        if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
-                return 0;
+       if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
+               return 0;
 
-        rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
-        if (rc || ctxt == NULL) {
+       rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
+       if (rc || ctxt == NULL)
                return 0;
-        }
 
-        rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
-        if (rc)
-                CERROR("Can't set mdt ctxt %d\n", rc);
+       rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
+       if (rc)
+               CERROR("Can't set mdt ctxt %d\n", rc);
 
-        return rc;
+       return rc;
 }
 
 static int mdt_llog_ctxt_unclone(const struct lu_env *env,
-                                 struct mdt_device *mdt, int idx)
+                                struct mdt_device *mdt, int idx)
 {
-        struct llog_ctxt *ctxt;
+       struct llog_ctxt *ctxt;
 
-        ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
-        if (ctxt == NULL)
-                return 0;
-        /* Put once for the get we just did, and once for the clone */
-        llog_ctxt_put(ctxt);
-        llog_ctxt_put(ctxt);
-        return 0;
+       ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
+       if (ctxt == NULL)
+               return 0;
+       /* Put once for the get we just did, and once for the clone */
+       llog_ctxt_put(ctxt);
+       llog_ctxt_put(ctxt);
+       return 0;
 }
 
 /*
@@ -2874,6 +3541,7 @@ static int mdt_quota_dqacq(struct tgt_session_info *tsi)
        struct mdt_device       *mdt = mdt_exp2dev(tsi->tsi_exp);
        struct lu_device        *qmt = mdt->mdt_qmt_dev;
        int                      rc;
+
        ENTRY;
 
        if (qmt == NULL)
@@ -2890,6 +3558,7 @@ struct mdt_object *mdt_object_new(const struct lu_env *env,
        struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
        struct lu_object *o;
        struct mdt_object *m;
+
        ENTRY;
 
        CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f));
@@ -2907,6 +3576,7 @@ struct mdt_object *mdt_object_find(const struct lu_env *env,
 {
        struct lu_object *o;
        struct mdt_object *m;
+
        ENTRY;
 
        CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f));
@@ -2928,10 +3598,11 @@ struct mdt_object *mdt_object_find(const struct lu_env *env,
  * \param mdt the mdt device
  */
 static void mdt_device_commit_async(const struct lu_env *env,
-                                    struct mdt_device *mdt)
+                                   struct mdt_device *mdt)
 {
        struct dt_device *dt = mdt->mdt_bottom;
        int rc;
+
        ENTRY;
 
        rc = dt->dd_ops->dt_commit_async(env, dt);
@@ -2954,7 +3625,7 @@ static void mdt_device_commit_async(const struct lu_env *env,
  */
 static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
 {
-        lock->l_ast_data = (void*)1;
+       lock->l_ast_data = (void *)1;
 }
 
 /**
@@ -2969,7 +3640,7 @@ static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
  */
 static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
 {
-        return lock->l_ast_data != NULL;
+       return lock->l_ast_data != NULL;
 }
 
 /**
@@ -2993,8 +3664,8 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
        struct ldlm_cb_set_arg *arg = data;
        bool commit_async = false;
        int rc;
-       ENTRY;
 
+       ENTRY;
        if (flag == LDLM_CB_CANCELING)
                RETURN(0);
 
@@ -3010,33 +3681,17 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         * The 'data' parameter is l_ast_data in the first case and
         * callback arguments in the second one. Distinguish them by that.
         */
-       if (!data || data == lock->l_ast_data || !arg->bl_desc)
-               goto skip_cos_checks;
-
-       if (lock->l_req_mode & (LCK_PW | LCK_EX)) {
-               if (mdt_cos_is_enabled(mdt)) {
-                       if (!arg->bl_desc->bl_same_client)
-                               mdt_set_lock_sync(lock);
-               } else if (mdt_slc_is_enabled(mdt) &&
-                          arg->bl_desc->bl_cos_incompat) {
-                       mdt_set_lock_sync(lock);
-                       /*
-                        * we may do extra commit here, but there is a small
-                        * window to miss a commit: lock was unlocked (saved),
-                        * then a conflict lock queued and we come here, but
-                        * REP-ACK not received, so lock was not converted to
-                        * COS mode yet.
-                        * Fortunately this window is quite small, so the
-                        * extra commit should be rare (not to say distributed
-                        * operation is rare too).
-                        */
+       if (data && data != lock->l_ast_data && arg->bl_desc) {
+               if (lock->l_req_mode & (LCK_COS | LCK_TXN))
                        commit_async = true;
-               }
-       } else if (lock->l_req_mode == LCK_COS) {
-               commit_async = true;
+               else if ((lock->l_req_mode & (LCK_PW | LCK_EX)) &&
+                        ((mdt_cos_is_enabled(mdt) &&
+                            !arg->bl_desc->bl_same_client) ||
+                         (mdt_slc_is_enabled(mdt) &&
+                            arg->bl_desc->bl_txn_dependent)))
+                       mdt_set_lock_sync(lock);
        }
 
-skip_cos_checks:
        rc = ldlm_blocking_ast_nocheck(lock);
 
        if (commit_async) {
@@ -3044,8 +3699,7 @@ skip_cos_checks:
 
                rc = lu_env_init(&env, LCT_LOCAL);
                if (unlikely(rc != 0))
-                       CWARN("%s: lu_env initialization failed, cannot "
-                             "start asynchronous commit: rc = %d\n",
+                       CWARN("%s: lu_env initialization failed, cannot start asynchronous commit: rc = %d\n",
                              obd->obd_name, rc);
                else
                        mdt_device_commit_async(&env, mdt);
@@ -3070,6 +3724,7 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
 {
        int rc = 0;
+
        ENTRY;
 
        switch (flag) {
@@ -3090,10 +3745,11 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                struct mdt_device *mdt =
                                mdt_dev(obd->obd_lu_dev->ld_site->ls_top_dev);
 
-               LDLM_DEBUG(lock, "Revoke remote lock\n");
+               LDLM_DEBUG(lock, "Revoke remote lock");
 
                /* discard slc lock here so that it can be cleaned anytime,
-                * especially for cleanup_resource() */
+                * especially for cleanup_resource()
+                */
                tgt_discard_slc_lock(&mdt->mdt_lut, lock);
 
                /* once we cache lock, l_ast_data is set to mdt_object */
@@ -3103,10 +3759,9 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 
                        rc = lu_env_init(&env, LCT_MD_THREAD);
                        if (unlikely(rc != 0)) {
-                               CWARN("%s: lu_env initialization failed, object"
-                                     "%p "DFID" is leaked!\n",
+                               CWARN("%s: lu_env initialization failed, object %p "DFID" is leaked!: rc = %d\n",
                                      obd->obd_name, mo,
-                                     PFID(mdt_object_fid(mo)));
+                                     PFID(mdt_object_fid(mo)), rc);
                                RETURN(rc);
                        }
 
@@ -3141,7 +3796,8 @@ int mdt_check_resent_lock(struct mdt_thread_info *info,
                if (lock == NULL) {
                        /* Lock is pinned by ldlm_handle_enqueue0() as it is
                         * a resend case, however, it could be already destroyed
-                        * due to client eviction or a raced cancel RPC. */
+                        * due to client eviction or a raced cancel RPC.
+                        */
                        LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx",
                                          lhc->mlh_reg_lh.cookie);
                        RETURN(-ESTALE);
@@ -3149,8 +3805,8 @@ int mdt_check_resent_lock(struct mdt_thread_info *info,
 
                if (!fid_res_name_eq(mdt_object_fid(mo),
                                     &lock->l_resource->lr_name)) {
-                       CWARN("%s: Although resent, but still not "
-                             "get child lock:"DFID"\n",
+                       CWARN("%s: Although resent, but still not get child lock:"
+                             DFID"\n",
                              info->mti_exp->exp_obd->obd_name,
                              PFID(mdt_object_fid(mo)));
                        LDLM_LOCK_PUT(lock);
@@ -3167,20 +3823,18 @@ static void mdt_remote_object_lock_created_cb(struct ldlm_lock *lock)
        mdt_object_get(NULL, lock->l_ast_data);
 }
 
-int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
-                              struct mdt_object *o, const struct lu_fid *fid,
-                              struct lustre_handle *lh, enum ldlm_mode mode,
-                              __u64 *ibits, __u64 trybits, bool cache)
+static int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
+                                     struct mdt_object *obj,
+                                     struct lustre_handle *lh,
+                                     enum ldlm_mode mode,
+                                     union ldlm_policy_data *policy,
+                                     struct ldlm_res_id *res_id,
+                                     bool cache)
 {
        struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo;
-       union ldlm_policy_data *policy = &mti->mti_policy;
-       struct ldlm_res_id *res_id = &mti->mti_res_id;
-       int rc = 0;
-       ENTRY;
+       int rc;
 
-       LASSERT(mdt_object_remote(o));
-
-       fid_build_reg_res_name(fid, res_id);
+       LASSERT(mdt_object_remote(obj));
 
        memset(einfo, 0, sizeof(*einfo));
        einfo->ei_type = LDLM_IBITS;
@@ -3189,251 +3843,373 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
        einfo->ei_cb_cp = ldlm_completion_ast;
        einfo->ei_enq_slave = 0;
        einfo->ei_res_id = res_id;
-
+       einfo->ei_req_slot = 1;
        if (cache) {
                /*
                 * if we cache lock, couple lock with mdt_object, so that object
                 * can be easily found in lock ASTs.
                 */
-               einfo->ei_cbdata = o;
+               einfo->ei_cbdata = obj;
                einfo->ei_cb_created = mdt_remote_object_lock_created_cb;
        }
 
-       memset(policy, 0, sizeof(*policy));
-       policy->l_inodebits.bits = *ibits;
-       policy->l_inodebits.try_bits = trybits;
-
-       rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
+       rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), lh, einfo,
                            policy);
-
-       /* Return successfully acquired bits to a caller */
-       if (rc == 0) {
-               struct ldlm_lock *lock = ldlm_handle2lock(lh);
-
-               LASSERT(lock);
-               *ibits = lock->l_policy_data.l_inodebits.bits;
-               LDLM_LOCK_PUT(lock);
+       if (rc) {
+               lh->cookie = 0ull;
+               return rc;
        }
-       RETURN(rc);
-}
 
-int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
-                          const struct lu_fid *fid, struct lustre_handle *lh,
-                          enum ldlm_mode mode, __u64 ibits, bool cache)
-{
-       return mdt_remote_object_lock_try(mti, o, fid, lh, mode, &ibits, 0,
-                                         cache);
+       /* other components like LFSCK can use lockless access
+        * and populate cache, so we better invalidate it
+        */
+       if (policy->l_inodebits.bits &
+           (MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR))
+               mo_invalidate(mti->mti_env, mdt_object_child(obj));
+
+       return 0;
 }
 
-static int mdt_object_local_lock(struct mdt_thread_info *info,
-                                struct mdt_object *o,
-                                struct mdt_lock_handle *lh, __u64 *ibits,
-                                __u64 trybits, bool cos_incompat)
+/*
+ * Helper function to take PDO and hash lock.
+ *
+ * if \a pdo_lock is false, don't take PDO lock, this is case in rename.
+ */
+int mdt_object_pdo_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                       struct mdt_lock_handle *lh, const struct lu_name *name,
+                       enum ldlm_mode mode, bool pdo_lock)
 {
        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
        union ldlm_policy_data *policy = &info->mti_policy;
        struct ldlm_res_id *res_id = &info->mti_res_id;
-       __u64 dlmflags = 0, *cookie = NULL;
+       /*
+        * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it is never going to
+        * be sent to client and we do not want it slowed down due to possible
+        * cancels.
+        */
+       __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+       __u64 *cookie = NULL;
        int rc;
-       ENTRY;
 
-        LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
-        LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
-        LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
-        LASSERT(lh->mlh_type != MDT_NUL_LOCK);
-
-       if (cos_incompat) {
-               LASSERT(lh->mlh_reg_mode == LCK_PW ||
-                       lh->mlh_reg_mode == LCK_EX);
-               dlmflags |= LDLM_FL_COS_INCOMPAT;
-       } else if (mdt_cos_is_enabled(info->mti_mdt)) {
-               dlmflags |= LDLM_FL_COS_ENABLED;
-       }
-
-       /* Only enqueue LOOKUP lock for remote object */
-       LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP));
-
-       /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
-       if (lh->mlh_type == MDT_REG_LOCK && lh->mlh_reg_mode == LCK_EX &&
-           *ibits == MDS_INODELOCK_OPEN)
-               dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
-
-       if (lh->mlh_type == MDT_PDO_LOCK) {
-                /* check for exists after object is locked */
-                if (mdt_object_exists(o) == 0) {
-                        /* Non-existent object shouldn't have PDO lock */
-                        RETURN(-ESTALE);
-                } else {
-                        /* Non-dir object shouldn't have PDO lock */
-                       if (!S_ISDIR(lu_object_attr(&o->mot_obj)))
-                               RETURN(-ENOTDIR);
-               }
-       }
-
-       fid_build_reg_res_name(mdt_object_fid(o), res_id);
-       dlmflags |= LDLM_FL_ATOMIC_CB;
+       LASSERT(obj);
 
+       /* check for exists after object is locked */
+       if (!mdt_object_exists(obj))
+               /* Non-existent object shouldn't have PDO lock */
+               return -ESTALE;
+
+       /* Non-dir object shouldn't have PDO lock */
+       if (!S_ISDIR(lu_object_attr(&obj->mot_obj)))
+               return -ENOTDIR;
+
+       policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+       policy->l_inodebits.try_bits = 0;
+       policy->l_inodebits.li_gid = 0;
+       policy->l_inodebits.li_initiator_id = mdt_node_id(info->mti_mdt);
+       fid_build_reg_res_name(mdt_object_fid(obj), res_id);
        if (info->mti_exp)
                cookie = &info->mti_exp->exp_handle.h_cookie;
 
-       /*
-        * Take PDO lock on whole directory and build correct @res_id for lock
-        * on part of directory.
-        */
-       if (lh->mlh_pdo_hash != 0) {
-               LASSERT(lh->mlh_type == MDT_PDO_LOCK);
-               mdt_lock_pdo_mode(info, o, lh);
-               if (lh->mlh_pdo_mode != LCK_NL) {
-                       /*
-                        * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
-                        * is never going to be sent to client and we do not
-                        * want it slowed down due to possible cancels.
-                        */
-                       policy->l_inodebits.bits =
-                               *ibits & MDS_INODELOCK_UPDATE;
-                       policy->l_inodebits.try_bits =
-                               trybits & MDS_INODELOCK_UPDATE;
-                       /* at least one of them should be set */
-                       LASSERT(policy->l_inodebits.bits |
-                               policy->l_inodebits.try_bits);
-                       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh,
-                                         lh->mlh_pdo_mode, policy, res_id,
-                                         dlmflags, cookie);
-                       if (unlikely(rc != 0))
-                               GOTO(out_unlock, rc);
-                }
+       mdt_lock_pdo_init(lh, mode, name);
+       mdt_lock_pdo_mode(info, obj, lh);
+       if (lh->mlh_pdo_mode != LCK_NL) {
+               if (pdo_lock) {
+                       if (mdt_object_remote(obj)) {
+                               rc = mdt_remote_object_lock_try(info, obj,
+                                       &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
+                                       policy, res_id, false);
+                               lh->mlh_pdo_remote = 1;
+                       } else {
+                               rc = mdt_fid_lock(info->mti_env, ns,
+                                       &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
+                                       policy, res_id, dlmflags, cookie);
+                       }
+                       if (rc) {
+                               mdt_object_unlock(info, obj, lh, 1);
+                               return rc;
+                       }
+               }
+               res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
+       }
+
+       if (mdt_object_remote(obj))
+               rc = mdt_remote_object_lock_try(info, obj, &lh->mlh_rreg_lh,
+                       lh->mlh_rreg_mode, policy, res_id, false);
+       else
+               rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh,
+                       lh->mlh_reg_mode, policy, res_id, dlmflags, cookie);
+       if (rc)
+               mdt_object_unlock(info, obj, lh, 1);
+       else if (CFS_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK) &&
+                  lh->mlh_pdo_hash != 0 &&
+                  (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
+               CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+
+       return rc;
+}
 
-                /*
-                 * Finish res_id initializing by name hash marking part of
-                 * directory which is taking modification.
-                 */
-                res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
-        }
+int mdt_object_lock_internal(struct mdt_thread_info *info,
+                            struct mdt_object *obj, const struct lu_fid *fid,
+                            struct mdt_lock_handle *lh, __u64 *ibits,
+                            __u64 trybits, bool cache)
+{
+       union ldlm_policy_data *policy = &info->mti_policy;
+       struct ldlm_res_id *res_id = &info->mti_res_id;
+       struct lustre_handle *handle;
+       int rc;
 
        policy->l_inodebits.bits = *ibits;
        policy->l_inodebits.try_bits = trybits;
+       policy->l_inodebits.li_gid = lh->mlh_gid;
+       policy->l_inodebits.li_initiator_id = mdt_node_id(info->mti_mdt);
+       fid_build_reg_res_name(fid, res_id);
 
-        /*
-         * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
-         * going to be sent to client. If it is - mdt_intent_policy() path will
-         * fix it up and turn FL_LOCAL flag off.
-         */
-       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
-                         policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
-                         cookie);
-out_unlock:
-       if (rc != 0)
-               mdt_object_unlock(info, o, lh, 1);
-       else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) &&
-                  lh->mlh_pdo_hash != 0 &&
-                  (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
-               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+       if (obj && mdt_object_remote(obj)) {
+               handle = &lh->mlh_rreg_lh;
+               LASSERT(!lustre_handle_is_used(handle));
+               LASSERT(lh->mlh_rreg_mode != LCK_MINMODE);
+               LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+               rc = mdt_remote_object_lock_try(info, obj, handle,
+                                               lh->mlh_rreg_mode, policy,
+                                               res_id, cache);
+       } else {
+               struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+               /*
+                * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if
+                * it is going to be sent to client. If it is -
+                * mdt_intent_policy() path will fix it up and turn FL_LOCAL
+                * flag off.
+                */
+               __u64 dlmflags = LDLM_FL_ATOMIC_CB | LDLM_FL_LOCAL_ONLY;
+               __u64 *cookie = NULL;
+
+               handle = &lh->mlh_reg_lh;
+               LASSERT(!lustre_handle_is_used(handle));
+               LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
+               LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+
+               /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
+               if (lh->mlh_type == MDT_REG_LOCK &&
+                   lh->mlh_reg_mode == LCK_EX && *ibits == MDS_INODELOCK_OPEN)
+                       dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
+
+
+               if (info->mti_exp)
+                       cookie = &info->mti_exp->exp_handle.h_cookie;
+
+               rc = mdt_fid_lock(info->mti_env, ns, handle, lh->mlh_reg_mode,
+                                 policy, res_id, dlmflags, cookie);
+               if (rc)
+                       mdt_object_unlock(info, obj, lh, 1);
+       }
 
-       /* Return successfully acquired bits to a caller */
        if (rc == 0) {
-               struct ldlm_lock *lock = ldlm_handle2lock(&lh->mlh_reg_lh);
+               struct ldlm_lock *lock;
 
+               /* Return successfully acquired bits to a caller */
+               lock = ldlm_handle2lock(handle);
                LASSERT(lock);
                *ibits = lock->l_policy_data.l_inodebits.bits;
                LDLM_LOCK_PUT(lock);
        }
-       RETURN(rc);
+
+       return rc;
 }
 
-static int
-mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
-                        struct mdt_lock_handle *lh, __u64 *ibits,
-                        __u64 trybits, bool cos_incompat)
+/*
+ * MDT object locking functions:
+ * mdt_object_lock(): lock object, this is used in most places, and normally
+ *     lock ibits doesn't contain LOOKUP, unless the caller knows it's not
+ *     remote object.
+ * mdt_object_check_lock(): lock object with LOOKUP and other ibits, it needs
+ *     to check whether parent is on remote MDT, if so, take LOOKUP on parent
+ *     MDT separately, and then lock other ibits on child object.
+ * mdt_parent_lock(): take parent UPDATE lock with specific mode, if parent is
+ *     local, take PDO lock by name hash, otherwise take regular lock.
+ * mdt_object_stripes_lock(): lock object which should be local, and if it's a
+ *     striped directory, lock its stripes, this is called in operations which
+ *     modify both object and stripes.
+ * mdt_object_lock_try(): lock object with trybits, the trybits contains
+ *     optional inode lock bits that can be granted. This is called by
+ *     getattr/open to fetch more inode lock bits to client, and is also called
+ *     by dir migration to lock link parent in non-block mode to avoid
+ *     deadlock.
+ */
+
+/**
+ * lock object
+ *
+ * this is used to lock object in most places, and normally lock ibits doesn't
+ * contain LOOKUP, unless the caller knows it's not remote object.
+ *
+ * \param info         struct mdt_thread_info
+ * \param obj          object
+ * \param lh           lock handle
+ * \param ibits                MDS inode lock bits
+ * \param mode         lock mode
+ *
+ * \retval             0 on success, -ev on error.
+ */
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                   struct mdt_lock_handle *lh, __u64 ibits,
+                   enum ldlm_mode mode)
 {
-       struct mdt_lock_handle *local_lh = NULL;
        int rc;
-       ENTRY;
-
-       if (!mdt_object_remote(o)) {
-               rc = mdt_object_local_lock(info, o, lh, ibits, trybits,
-                                          cos_incompat);
-               RETURN(rc);
-       }
 
-       /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
-       *ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
-                   MDS_INODELOCK_XATTR);
+       ENTRY;
+       mdt_lock_reg_init(lh, mode);
+       rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh,
+                                     &ibits, 0, false);
+       RETURN(rc);
+}
 
-       /* Only enqueue LOOKUP lock for remote object */
-       if (*ibits & MDS_INODELOCK_LOOKUP) {
-               __u64 local = MDS_INODELOCK_LOOKUP;
+/**
+ * lock object with LOOKUP and other ibits
+ *
+ * it will check whether parent and child are on different MDTs, if so, take
+ * LOOKUP lock on parent MDT, and lock other ibits on child MDT, otherwise lock
+ * all ibits on child MDT. Note, parent and child shouldn't be both on remote
+ * MDTs, in which case specific lock function should be used, and it's in
+ * rename and migrate only.
+ *
+ * \param info         struct mdt_thread_info
+ * \param parent       parent object
+ * \param child                child object
+ * \param lh           lock handle
+ * \param ibits                MDS inode lock bits
+ * \param mode         lock mode
+ *
+ * \retval             0 on success, -ev on error.
+ */
+int mdt_object_check_lock(struct mdt_thread_info *info,
+                         struct mdt_object *parent, struct mdt_object *child,
+                         struct mdt_lock_handle *lh, __u64 ibits,
+                         enum ldlm_mode mode)
+{
+       int rc;
 
-               rc = mdt_object_local_lock(info, o, lh, &local, 0,
-                                          cos_incompat);
-               if (rc != ELDLM_OK)
+       ENTRY;
+       /* if LOOKUP ibit is not set, use mdt_object_lock() */
+       LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+       /* if only LOOKUP ibit is needed, use mdt_object_lookup_lock() */
+       LASSERT(ibits != MDS_INODELOCK_LOOKUP);
+       LASSERT(parent);
+       /* @parent and @child shouldn't both be on remote MDTs */
+       LASSERT(!(mdt_object_remote(parent) && mdt_object_remote(child)));
+
+       mdt_lock_reg_init(lh, mode);
+       if (mdt_object_remote(parent) ^ mdt_object_remote(child)) {
+               __u64 lookup_ibits = MDS_INODELOCK_LOOKUP;
+
+               rc = mdt_object_lock_internal(info, parent,
+                                             mdt_object_fid(child), lh,
+                                             &lookup_ibits, 0, false);
+               if (rc)
                        RETURN(rc);
 
-               local_lh = lh;
-       }
-
-       if ((*ibits | trybits) & MDS_INODELOCK_UPDATE) {
-               /* Sigh, PDO needs to enqueue 2 locks right now, but
-                * enqueue RPC can only request 1 lock, to avoid extra
-                * RPC, so it will instead enqueue EX lock for remote
-                * object anyway XXX*/
-               if (lh->mlh_type == MDT_PDO_LOCK &&
-                   lh->mlh_pdo_hash != 0) {
-                       CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to"
-                              "EX lock.\n", mdt_obd_name(info->mti_mdt),
-                              PFID(mdt_object_fid(o)));
-                       lh->mlh_pdo_hash = 0;
-                       lh->mlh_rreg_mode = LCK_EX;
-                       lh->mlh_type = MDT_REG_LOCK;
-               }
-
-               rc = mdt_remote_object_lock_try(info, o, mdt_object_fid(o),
-                                               &lh->mlh_rreg_lh,
-                                               lh->mlh_rreg_mode,
-                                               ibits, trybits, false);
-               if (rc != ELDLM_OK) {
-                       if (local_lh != NULL)
-                               mdt_object_unlock(info, o, local_lh, rc);
-                       RETURN(rc);
-               }
+               ibits &= ~MDS_INODELOCK_LOOKUP;
        }
 
-       /* other components like LFSCK can use lockless access
-        * and populate cache, so we better invalidate it */
-       mo_invalidate(info->mti_env, mdt_object_child(o));
+       rc = mdt_object_lock_internal(info, child, mdt_object_fid(child), lh,
+                                     &ibits, 0, false);
+       if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
+               mdt_object_unlock(info, NULL, lh, 1);
 
-       RETURN(0);
+       RETURN(rc);
 }
 
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                   struct mdt_lock_handle *lh, __u64 ibits)
+/**
+ * take parent UPDATE lock
+ *
+ * if parent is local or mode is LCK_PW, take PDO lock, otherwise take regular
+ * lock.
+ *
+ * \param info struct mdt_thread_info
+ * \param obj  parent object
+ * \param lh   lock handle
+ * \param lname        child name
+ * \param mode lock mode
+ *
+ * \retval     0 on success, -ev on error.
+ */
+int mdt_parent_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+                   struct mdt_lock_handle *lh, const struct lu_name *lname,
+                   enum ldlm_mode mode)
 {
-       return mdt_object_lock_internal(info, o, lh, &ibits, 0, false);
-}
+       int rc;
 
-int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                         struct mdt_lock_handle *lh, __u64 ibits,
-                         bool cos_incompat)
-{
-       LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX);
-       return mdt_object_lock_internal(info, o, lh, &ibits, 0,
-                                       cos_incompat);
+       ENTRY;
+       LASSERT(obj && lname);
+       LASSERT(mode == LCK_PW || mode == LCK_PR);
+       if (mdt_object_remote(obj) && mode == LCK_PR) {
+               __u64 ibits = MDS_INODELOCK_UPDATE;
+
+               mdt_lock_reg_init(lh, mode);
+               rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj),
+                                             lh, &ibits, 0, false);
+       } else {
+               rc = mdt_object_pdo_lock(info, obj, lh, lname, mode, true);
+       }
+       RETURN(rc);
 }
 
-int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
+/**
+ * lock object with trybits
+ *
+ * the trybits contains optional inode lock bits that can be granted. This is
+ * called by getattr/open to fetch more inode lock bits to client, and is also
+ * called by dir migration to lock link parent in non-block mode to avoid
+ * deadlock.
+ *
+ * \param info         struct mdt_thread_info
+ * \param obj          object
+ * \param lh           lock handle
+ * \param ibits                MDS inode lock bits
+ * \param trybits      optional inode lock bits
+ * \param mode         lock mode
+ *
+ * \retval             0 on success, -ev on error.
+ */
+int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *obj,
                        struct mdt_lock_handle *lh, __u64 *ibits,
-                       __u64 trybits, bool cos_incompat)
+                       __u64 trybits, enum ldlm_mode mode)
 {
        bool trylock_only = *ibits == 0;
        int rc;
 
+       ENTRY;
        LASSERT(!(*ibits & trybits));
-       rc = mdt_object_lock_internal(info, o, lh, ibits, trybits,
-                                     cos_incompat);
+       mdt_lock_reg_init(lh, mode);
+       rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh, ibits,
+                                     trybits, false);
        if (rc && trylock_only) { /* clear error for try ibits lock only */
                LASSERT(*ibits == 0);
                rc = 0;
        }
-       return rc;
+       RETURN(rc);
+}
+
+/*
+ * Helper function to take \a obj LOOKUP lock.
+ *
+ * Both \a pobj and \a obj may be located on remote MDTs.
+ */
+int mdt_object_lookup_lock(struct mdt_thread_info *info,
+                          struct mdt_object *pobj, struct mdt_object *obj,
+                          struct mdt_lock_handle *lh, enum ldlm_mode mode)
+{
+       __u64 ibits = MDS_INODELOCK_LOOKUP;
+       int rc;
+
+       ENTRY;
+       /* if @parent is NULL, it's on local MDT, and @child is remote,
+        * this is case in getattr/unlink/open by name.
+        */
+       LASSERT(ergo(!pobj, mdt_object_remote(obj)));
+       mdt_lock_reg_init(lh, mode);
+       rc = mdt_object_lock_internal(info, pobj, mdt_object_fid(obj), lh,
+                                     &ibits, 0, false);
+       RETURN(rc);
 }
 
 /**
@@ -3449,55 +4225,65 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
  * \param mode lock mode
  * \param decref force immediate lock releasing
  */
-void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
-                  enum ldlm_mode mode, int decref)
+static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
+                         enum ldlm_mode mode, int decref)
 {
+       struct tgt_session_info *tsi = info->mti_env->le_ses ?
+                                      tgt_ses_info(info->mti_env) : NULL;
+
        ENTRY;
 
        if (lustre_handle_is_used(h)) {
-               if (decref || !info->mti_has_trans ||
-                   !(mode & (LCK_PW | LCK_EX))) {
+               bool has_trans = tsi && tsi->tsi_has_trans;
+
+               if (decref || !has_trans || !(mode & (LCK_PW | LCK_EX))) {
                        mdt_fid_unlock(h, mode);
                } else {
                        struct mdt_device *mdt = info->mti_mdt;
                        struct ldlm_lock *lock = ldlm_handle2lock(h);
                        struct ptlrpc_request *req = mdt_info_req(info);
-                       bool cos = mdt_cos_is_enabled(mdt);
-                       bool convert_lock = !cos && mdt_slc_is_enabled(mdt);
+                       bool no_ack = false;
 
                        LASSERTF(lock != NULL, "no lock for cookie %#llx\n",
                                 h->cookie);
 
                        /* there is no request if mdt_object_unlock() is called
-                        * from mdt_export_cleanup()->mdt_add_dirty_flag() */
+                        * from mdt_export_cleanup()->mdt_add_dirty_flag()
+                        */
                        if (likely(req != NULL)) {
-                               LDLM_DEBUG(lock, "save lock request %p reply "
-                                       "state %p transno %lld\n", req,
+                               LDLM_DEBUG(lock, "save lock request %p reply state %p transno %lld",
+                                       req,
                                        req->rq_reply_state, req->rq_transno);
-                               if (cos) {
-                                       ldlm_lock_mode_downgrade(lock, LCK_COS);
+                               if (mdt_cos_is_enabled(mdt)) {
                                        mode = LCK_COS;
+                                       no_ack = true;
+                                       ldlm_lock_mode_downgrade(lock, mode);
+                               } else if (mdt_slc_is_enabled(mdt)) {
+                                       no_ack = true;
+                                       if (mode != LCK_TXN) {
+                                               mode = LCK_TXN;
+                                               ldlm_lock_mode_downgrade(lock,
+                                                                        mode);
+                                       }
                                }
                                if (req->rq_export->exp_disconnected)
                                        mdt_fid_unlock(h, mode);
                                else
-                                       ptlrpc_save_lock(req, h, mode, cos,
-                                                        convert_lock);
+                                       ptlrpc_save_lock(req, h, no_ack);
                        } else {
                                mdt_fid_unlock(h, mode);
                        }
-                        if (mdt_is_lock_sync(lock)) {
-                                CDEBUG(D_HA, "found sync-lock,"
-                                       " async commit started\n");
-                                mdt_device_commit_async(info->mti_env,
-                                                        mdt);
-                        }
-                        LDLM_LOCK_PUT(lock);
-                }
-                h->cookie = 0ull;
-        }
 
-        EXIT;
+                       if (mdt_is_lock_sync(lock)) {
+                               CDEBUG(D_HA, "sync_lock, do async commit\n");
+                               mdt_device_commit_async(info->mti_env, mdt);
+                       }
+                       LDLM_LOCK_PUT(lock);
+               }
+               h->cookie = 0ull;
+       }
+
+       EXIT;
 }
 
 /**
@@ -3519,20 +4305,18 @@ static void mdt_save_remote_lock(struct mdt_thread_info *info,
 
        if (lustre_handle_is_used(h)) {
                struct ldlm_lock *lock = ldlm_handle2lock(h);
+               struct ptlrpc_request *req = mdt_info_req(info);
 
                if (o != NULL &&
                    (lock->l_policy_data.l_inodebits.bits &
                     (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)))
                        mo_invalidate(info->mti_env, mdt_object_child(o));
 
-               if (decref || !info->mti_has_trans ||
-                   !(mode & (LCK_PW | LCK_EX))) {
+               if (decref || !req || !(mode & (LCK_PW | LCK_EX)) ||
+                   !tgt_ses_info(info->mti_env)->tsi_has_trans) {
                        ldlm_lock_decref_and_cancel(h, mode);
                        LDLM_LOCK_PUT(lock);
                } else {
-                       struct ptlrpc_request *req = mdt_info_req(info);
-
-                       LASSERT(req != NULL);
                        tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock,
                                          req->rq_transno);
                        ldlm_lock_decref(h, mode);
@@ -3562,7 +4346,11 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
 {
        ENTRY;
 
-       mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
+       if (lh->mlh_pdo_remote)
+               mdt_save_remote_lock(info, o, &lh->mlh_pdo_lh,
+                                    lh->mlh_pdo_mode, decref);
+       else
+               mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
        mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
        mdt_save_remote_lock(info, o, &lh->mlh_rreg_lh, lh->mlh_rreg_mode,
                             decref);
@@ -3571,32 +4359,32 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
 }
 
 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
-                                        const struct lu_fid *f,
-                                        struct mdt_lock_handle *lh,
-                                        __u64 ibits)
+                                       const struct lu_fid *f,
+                                       struct mdt_lock_handle *lh,
+                                       __u64 ibits, enum ldlm_mode mode)
 {
-        struct mdt_object *o;
+       struct mdt_object *o;
 
-        o = mdt_object_find(info->mti_env, info->mti_mdt, f);
-        if (!IS_ERR(o)) {
-                int rc;
+       o = mdt_object_find(info->mti_env, info->mti_mdt, f);
+       if (!IS_ERR(o)) {
+               int rc;
 
-               rc = mdt_object_lock(info, o, lh, ibits);
-                if (rc != 0) {
-                        mdt_object_put(info->mti_env, o);
-                        o = ERR_PTR(rc);
-                }
-        }
-        return o;
+               rc = mdt_object_lock(info, o, lh, ibits, mode);
+               if (rc != 0) {
+                       mdt_object_put(info->mti_env, o);
+                       o = ERR_PTR(rc);
+               }
+       }
+       return o;
 }
 
-void mdt_object_unlock_put(struct mdt_thread_info * info,
-                           struct mdt_object * o,
-                           struct mdt_lock_handle *lh,
-                           int decref)
+void mdt_object_unlock_put(struct mdt_thread_info *info,
+                          struct mdt_object *o,
+                          struct mdt_lock_handle *lh,
+                          int decref)
 {
-        mdt_object_unlock(info, o, lh, decref);
-        mdt_object_put(info->mti_env, o);
+       mdt_object_unlock(info, o, lh, decref);
+       mdt_object_put(info->mti_env, o);
 }
 
 /*
@@ -3614,41 +4402,42 @@ void mdt_object_unlock_put(struct mdt_thread_info * info,
 static int mdt_body_unpack(struct mdt_thread_info *info,
                           enum tgt_handler_flags flags)
 {
-        const struct mdt_body    *body;
-        struct mdt_object        *obj;
-        const struct lu_env      *env;
-        struct req_capsule       *pill;
-        int                       rc;
-        ENTRY;
+       const struct mdt_body    *body;
+       struct mdt_object        *obj;
+       const struct lu_env      *env;
+       struct req_capsule       *pill;
+       int                       rc;
+
+       ENTRY;
 
-        env = info->mti_env;
-        pill = info->mti_pill;
+       env = info->mti_env;
+       pill = info->mti_pill;
 
-        body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
+       body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
+       if (body == NULL)
+               RETURN(-EFAULT);
 
        if (!(body->mbo_valid & OBD_MD_FLID))
                RETURN(0);
 
        if (!fid_is_sane(&body->mbo_fid1)) {
                CERROR("Invalid fid: "DFID"\n", PFID(&body->mbo_fid1));
-                RETURN(-EINVAL);
-        }
+               RETURN(-EINVAL);
+       }
 
        obj = mdt_object_find(env, info->mti_mdt, &body->mbo_fid1);
        if (!IS_ERR(obj)) {
                if ((flags & HAS_BODY) && !mdt_object_exists(obj)) {
                        mdt_object_put(env, obj);
                        rc = -ENOENT;
-                } else {
-                        info->mti_object = obj;
-                        rc = 0;
-                }
-        } else
-                rc = PTR_ERR(obj);
+               } else {
+                       info->mti_object = obj;
+                       rc = 0;
+               }
+       } else
+               rc = PTR_ERR(obj);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
@@ -3668,19 +4457,23 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
                /* Pack reply. */
                if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
-                                            DEF_REP_MD_SIZE);
+                                            req_capsule_ptlreq(pill) ?
+                                            DEF_REP_MD_SIZE : MAX_MD_SIZE_OLD);
+
                if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_LOGCOOKIES,
                                             RCL_SERVER, 0);
 
                /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
                 * by default. If the target object has more ACL entries, then
-                * enlarge the buffer when necessary. */
+                * enlarge the buffer when necessary.
+                */
                if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
                                             LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
 
                mdt_preset_secctx_size(info);
+               mdt_preset_encctx_size(info);
 
                rc = req_capsule_server_pack(pill);
                if (rc)
@@ -3690,21 +4483,28 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
        RETURN(rc);
 }
 
-void mdt_lock_handle_init(struct mdt_lock_handle *lh)
+void mdt_thread_info_reset(struct mdt_thread_info *info)
 {
-        lh->mlh_type = MDT_NUL_LOCK;
-        lh->mlh_reg_lh.cookie = 0ull;
-        lh->mlh_reg_mode = LCK_MINMODE;
-        lh->mlh_pdo_lh.cookie = 0ull;
-        lh->mlh_pdo_mode = LCK_MINMODE;
-       lh->mlh_rreg_lh.cookie = 0ull;
-       lh->mlh_rreg_mode = LCK_MINMODE;
-}
+       memset(&info->mti_attr, 0, sizeof(info->mti_attr));
+       info->mti_body = NULL;
+       info->mti_dlm_req = NULL;
+       info->mti_cross_ref = 0;
+       info->mti_opdata = 0;
+       info->mti_big_lmm_used = 0;
+       info->mti_big_acl_used = 0;
+       info->mti_som_strict = 0;
 
-void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
-{
-        LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
-        LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
+       info->mti_spec.no_create = 0;
+       info->mti_spec.sp_rm_entry = 0;
+       info->mti_spec.sp_permitted = 0;
+
+       info->mti_spec.u.sp_ea.eadata = NULL;
+       info->mti_spec.u.sp_ea.eadatalen = 0;
+
+       if (info->mti_batch_env && info->mti_object != NULL) {
+               mdt_object_put(info->mti_env, info->mti_object);
+               info->mti_object = NULL;
+       }
 }
 
 /*
@@ -3715,42 +4515,21 @@ void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
 void mdt_thread_info_init(struct ptlrpc_request *req,
                          struct mdt_thread_info *info)
 {
-        int i;
+       info->mti_pill = &req->rq_pill;
 
-        info->mti_pill = &req->rq_pill;
-
-        /* lock handle */
-        for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
-                mdt_lock_handle_init(&info->mti_lh[i]);
-
-        /* mdt device: it can be NULL while CONNECT */
-        if (req->rq_export) {
-                info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
-                info->mti_exp = req->rq_export;
-        } else
-                info->mti_mdt = NULL;
+       /* mdt device: it can be NULL while CONNECT */
+       if (req->rq_export) {
+               info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
+               info->mti_exp = req->rq_export;
+       } else
+               info->mti_mdt = NULL;
        info->mti_env = req->rq_svc_thread->t_env;
        info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
-
-        memset(&info->mti_attr, 0, sizeof(info->mti_attr));
        info->mti_big_buf = LU_BUF_NULL;
-       info->mti_body = NULL;
-        info->mti_object = NULL;
-        info->mti_dlm_req = NULL;
-        info->mti_has_trans = 0;
-        info->mti_cross_ref = 0;
-        info->mti_opdata = 0;
-       info->mti_big_lmm_used = 0;
-       info->mti_big_acl_used = 0;
-       info->mti_som_valid = 0;
+       info->mti_batch_env = 0;
+       info->mti_object = NULL;
 
-        info->mti_spec.no_create = 0;
-       info->mti_spec.sp_rm_entry = 0;
-       info->mti_spec.sp_permitted = 0;
-       info->mti_spec.sp_migrate_close = 0;
-
-       info->mti_spec.u.sp_ea.eadata = NULL;
-       info->mti_spec.u.sp_ea.eadatalen = 0;
+       mdt_thread_info_reset(info);
 }
 
 void mdt_thread_info_fini(struct mdt_thread_info *info)
@@ -3763,10 +4542,11 @@ void mdt_thread_info_fini(struct mdt_thread_info *info)
        }
 
        for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
-               mdt_lock_handle_fini(&info->mti_lh[i]);
+               mdt_lock_handle_assert(&info->mti_lh[i]);
        info->mti_env = NULL;
        info->mti_pill = NULL;
        info->mti_exp = NULL;
+       info->mti_mdt = NULL;
 
        if (unlikely(info->mti_big_buf.lb_buf != NULL))
                lu_buf_free(&info->mti_big_buf);
@@ -3792,12 +4572,10 @@ struct mdt_thread_info *tsi2mdt_info(struct tgt_session_info *tsi)
 
 static int mdt_tgt_connect(struct tgt_session_info *tsi)
 {
-       if (OBD_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) &&
+       if (CFS_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) &&
            cfs_fail_val ==
-           tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) {
-               set_current_state(TASK_UNINTERRUPTIBLE);
-               schedule_timeout(cfs_time_seconds(3));
-       }
+           tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id)
+               schedule_timeout_uninterruptible(cfs_time_seconds(3));
 
        return tgt_connect(tsi);
 }
@@ -3822,112 +4600,117 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
                            struct mdt_lock_handle *lh,
                            __u64 flags, int result)
 {
-        struct ptlrpc_request  *req = mdt_info_req(info);
-        struct ldlm_lock       *lock = *lockp;
+       struct ptlrpc_request  *req = mdt_info_req(info);
+       struct ldlm_lock       *lock = *lockp;
        struct ldlm_lock       *new_lock;
 
        /* If possible resent found a lock, @lh is set to its handle */
        new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0);
 
-        if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
-                lh->mlh_reg_lh.cookie = 0;
-                RETURN(0);
-        }
-
-       if (new_lock == NULL && (flags & LDLM_FL_RESENT)) {
-               /* Lock is pinned by ldlm_handle_enqueue0() as it is
-                * a resend case, however, it could be already destroyed
-                * due to client eviction or a raced cancel RPC. */
-               LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n",
-                                 lh->mlh_reg_lh.cookie);
+       if (new_lock == NULL) {
+               if (flags & LDLM_FL_INTENT_ONLY) {
+                       result = 0;
+               } else if (flags & LDLM_FL_RESENT) {
+                       /* Lock is pinned by ldlm_handle_enqueue0() as it is a
+                        * resend case, however, it could be already destroyed
+                        * due to client eviction or a raced cancel RPC.
+                        */
+                       LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n",
+                                         lh->mlh_reg_lh.cookie);
+                       result = -ESTALE;
+               } else {
+                       CERROR("%s: Invalid lockh=%#llx flags=%#llx fid1="DFID" fid2="DFID": rc = %d\n",
+                              mdt_obd_name(info->mti_mdt),
+                              lh->mlh_reg_lh.cookie, flags,
+                              PFID(&info->mti_tmp_fid1),
+                              PFID(&info->mti_tmp_fid2), result);
+                       result = -ESTALE;
+               }
                lh->mlh_reg_lh.cookie = 0;
-               RETURN(-ESTALE);
+               RETURN(result);
+       }
+
+       /*
+        * If we've already given this lock to a client once, then we should
+        * have no readers or writers.  Otherwise, we should have one reader
+        * _or_ writer ref (which will be zeroed below) before returning the
+        * lock to a client.
+        */
+       if (new_lock->l_export == req->rq_export) {
+               LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
+       } else {
+               LASSERT(new_lock->l_export == NULL);
+               LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
        }
 
-       LASSERTF(new_lock != NULL,
-                "lockh %#llx flags %#llx : rc = %d\n",
-                lh->mlh_reg_lh.cookie, flags, result);
-
-        /*
-         * If we've already given this lock to a client once, then we should
-         * have no readers or writers.  Otherwise, we should have one reader
-         * _or_ writer ref (which will be zeroed below) before returning the
-         * lock to a client.
-         */
-        if (new_lock->l_export == req->rq_export) {
-                LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
-        } else {
-                LASSERT(new_lock->l_export == NULL);
-                LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
-        }
-
-        *lockp = new_lock;
-
-        if (new_lock->l_export == req->rq_export) {
-                /*
-                 * Already gave this to the client, which means that we
-                 * reconstructed a reply.
-                 */
-                LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
-                        MSG_RESENT);
+       *lockp = new_lock;
+
+       if (new_lock->l_export == req->rq_export) {
+               /*
+                * Already gave this to the client, which means that we
+                * reconstructed a reply.
+                */
+               LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
+                       MSG_RESENT);
 
                LDLM_LOCK_RELEASE(new_lock);
-                lh->mlh_reg_lh.cookie = 0;
-                RETURN(ELDLM_LOCK_REPLACED);
-        }
-
-        /*
-         * Fixup the lock to be given to the client.
-         */
-        lock_res_and_lock(new_lock);
-        /* Zero new_lock->l_readers and new_lock->l_writers without triggering
-         * possible blocking AST. */
-        while (new_lock->l_readers > 0) {
-                lu_ref_del(&new_lock->l_reference, "reader", new_lock);
-                lu_ref_del(&new_lock->l_reference, "user", new_lock);
-                new_lock->l_readers--;
-        }
-        while (new_lock->l_writers > 0) {
-                lu_ref_del(&new_lock->l_reference, "writer", new_lock);
-                lu_ref_del(&new_lock->l_reference, "user", new_lock);
-                new_lock->l_writers--;
-        }
-
-        new_lock->l_export = class_export_lock_get(req->rq_export, new_lock);
-        new_lock->l_blocking_ast = lock->l_blocking_ast;
-        new_lock->l_completion_ast = lock->l_completion_ast;
+               lh->mlh_reg_lh.cookie = 0;
+               RETURN(ELDLM_LOCK_REPLACED);
+       }
+
+       /*
+        * Fixup the lock to be given to the client.
+        */
+       lock_res_and_lock(new_lock);
+       /* Zero new_lock->l_readers and new_lock->l_writers without triggering
+        * possible blocking AST.
+        */
+       while (new_lock->l_readers > 0) {
+               lu_ref_del(&new_lock->l_reference, "reader", new_lock);
+               lu_ref_del(&new_lock->l_reference, "user", new_lock);
+               new_lock->l_readers--;
+       }
+       while (new_lock->l_writers > 0) {
+               lu_ref_del(&new_lock->l_reference, "writer", new_lock);
+               lu_ref_del(&new_lock->l_reference, "user", new_lock);
+               new_lock->l_writers--;
+       }
+
+       new_lock->l_export = class_export_lock_get(req->rq_export, new_lock);
+       new_lock->l_blocking_ast = lock->l_blocking_ast;
+       new_lock->l_completion_ast = lock->l_completion_ast;
        if (ldlm_has_dom(new_lock))
                new_lock->l_glimpse_ast = ldlm_server_glimpse_ast;
-        new_lock->l_remote_handle = lock->l_remote_handle;
-        new_lock->l_flags &= ~LDLM_FL_LOCAL;
+       new_lock->l_remote_handle = lock->l_remote_handle;
+       new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
-        unlock_res_and_lock(new_lock);
+       unlock_res_and_lock(new_lock);
 
-        cfs_hash_add(new_lock->l_export->exp_lock_hash,
-                     &new_lock->l_remote_handle,
-                     &new_lock->l_exp_hash);
+       cfs_hash_add(new_lock->l_export->exp_lock_hash,
+                    &new_lock->l_remote_handle,
+                    &new_lock->l_exp_hash);
 
-        LDLM_LOCK_RELEASE(new_lock);
-        lh->mlh_reg_lh.cookie = 0;
+       LDLM_LOCK_RELEASE(new_lock);
+       lh->mlh_reg_lh.cookie = 0;
 
-        RETURN(ELDLM_LOCK_REPLACED);
+       RETURN(ELDLM_LOCK_REPLACED);
 }
 
 void mdt_intent_fixup_resent(struct mdt_thread_info *info,
                             struct ldlm_lock *new_lock,
                             struct mdt_lock_handle *lh, __u64 flags)
 {
-        struct ptlrpc_request  *req = mdt_info_req(info);
-        struct ldlm_request    *dlmreq;
+       struct ptlrpc_request  *req = mdt_info_req(info);
+       struct ldlm_request    *dlmreq;
 
-        if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
-                return;
+       if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+               return;
 
-        dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
+       dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
 
-       /* Check if this is a resend case (MSG_RESENT is set on RPC) and a
-        * lock was found by ldlm_handle_enqueue(); if so @lh must be
-        * initialized. */
+       /* if this is a resend case (MSG_RESENT is set on RPC) and a lock was
+        * found by ldlm_handle_enqueue(); if so @lh must be initialized.
+        */
        if (flags & LDLM_FL_RESENT) {
                lh->mlh_reg_lh.cookie = new_lock->l_handle.h_cookie;
                lh->mlh_reg_mode = new_lock->l_granted_mode;
@@ -3942,15 +4725,15 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info,
         * If the xid matches, then we know this is a resent request, and allow
         * it. (It's probably an OPEN, for which we don't send a lock.
         */
-       if (req_can_reconstruct(req, NULL) == 1)
+       if (req_can_reconstruct(req, NULL) != 0)
                return;
 
-        /*
-         * This remote handle isn't enqueued, so we never received or processed
-         * this request.  Clear MSG_RESENT, because it can be handled like any
-         * normal request now.
-         */
-        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
+       /*
+        * This remote handle isn't enqueued, so we never received or processed
+        * this request.  Clear MSG_RESENT, because it can be handled like any
+        * normal request now.
+        */
+       lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
 
        DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle %#llx",
                  dlmreq->lock_handle[0].cookie);
@@ -3964,6 +4747,7 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc,
        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
        struct ldlm_reply      *ldlm_rep = NULL;
        int rc;
+
        ENTRY;
 
        /*
@@ -3973,9 +4757,8 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc,
         */
        mdt_intent_fixup_resent(info, *lockp, lhc, flags);
        if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-               mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
                rc = mdt_object_lock(info, info->mti_object, lhc,
-                                    MDS_INODELOCK_XATTR);
+                                    MDS_INODELOCK_XATTR, (*lockp)->l_req_mode);
                if (rc)
                        return rc;
        }
@@ -3986,7 +4769,7 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc,
                ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
 
        if (ldlm_rep == NULL ||
-           OBD_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) {
+           CFS_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) {
                mdt_object_unlock(info,  info->mti_object, lhc, 1);
                if (is_serious(rc))
                        RETURN(rc);
@@ -3996,8 +4779,7 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc,
 
        ldlm_rep->lock_policy_res2 = clear_serious(rc);
 
-       /* This is left for interop instead of adding a new interop flag.
-        * LU-7433 */
+       /* This is for interop instead of adding a new interop flag. LU-7433 */
 #if LUSTRE_VERSION_CODE > OBD_OCD_VERSION(3, 0, 0, 0)
        if (ldlm_rep->lock_policy_res2) {
                mdt_object_unlock(info, info->mti_object, lhc, 1);
@@ -4010,23 +4792,24 @@ static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc,
 }
 
 static int mdt_intent_getattr(enum ldlm_intent_flags it_opc,
-                              struct mdt_thread_info *info,
-                              struct ldlm_lock **lockp,
+                             struct mdt_thread_info *info,
+                             struct ldlm_lock **lockp,
                              __u64 flags)
 {
-        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
-        __u64                   child_bits;
-        struct ldlm_reply      *ldlm_rep;
-        struct mdt_body        *reqbody;
-        struct mdt_body        *repbody;
-        int                     rc, rc2;
-        ENTRY;
+       struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
+       __u64                   child_bits;
+       struct ldlm_reply      *ldlm_rep;
+       struct mdt_body        *reqbody;
+       struct mdt_body        *repbody;
+       int                     rc, rc2;
 
-        reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(reqbody);
+       ENTRY;
 
-        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(repbody);
+       reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
+       LASSERT(reqbody);
+
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       LASSERT(repbody);
 
        info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
        repbody->mbo_eadatasize = 0;
@@ -4046,12 +4829,12 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc,
                GOTO(out_shrink, rc = -EINVAL);
        }
 
-       rc = mdt_init_ucred_intent_getattr(info, reqbody);
+       rc = mdt_init_ucred(info, reqbody);
        if (rc)
                GOTO(out_shrink, rc);
 
-        ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
-        mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
+       ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+       mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
 
        /* Get lock from request for possible resent case. */
        mdt_intent_fixup_resent(info, *lockp, lhc, flags);
@@ -4059,24 +4842,25 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc,
        rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
        ldlm_rep->lock_policy_res2 = clear_serious(rc);
 
-        if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
-                ldlm_rep->lock_policy_res2 = 0;
-        if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
-            ldlm_rep->lock_policy_res2) {
-                lhc->mlh_reg_lh.cookie = 0ull;
-                GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
-        }
+       if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG) &&
+           ldlm_rep->lock_policy_res2 != -ENOKEY)
+               ldlm_rep->lock_policy_res2 = 0;
+       if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
+           ldlm_rep->lock_policy_res2) {
+               lhc->mlh_reg_lh.cookie = 0ull;
+               GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
+       }
 
        rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
-        EXIT;
+       EXIT;
 out_ucred:
-        mdt_exit_ucred(info);
+       mdt_exit_ucred(info);
 out_shrink:
-        mdt_client_compatibility(info);
-        rc2 = mdt_fix_reply(info);
-        if (rc == 0)
-                rc = rc2;
-        return rc;
+       mdt_client_compatibility(info);
+       rc2 = mdt_fix_reply(info);
+       if (rc == 0)
+               rc = rc2;
+       return rc;
 }
 
 static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
@@ -4102,14 +4886,16 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
        if (intent == NULL)
                RETURN(-EPROTO);
 
-       CDEBUG(D_INFO, DFID "got layout change request from client: "
-              "opc:%u flags:%#x extent "DEXT"\n",
-              PFID(fid), intent->li_opc, intent->li_flags,
-              PEXT(&intent->li_extent));
+       CDEBUG(D_INFO, DFID "got layout change request from client: opc:%u flags:%#x extent "
+              DEXT"\n",
+              PFID(fid), intent->lai_opc, intent->lai_flags,
+              PEXT(&intent->lai_extent));
 
-       switch (intent->li_opc) {
+       switch (intent->lai_opc) {
        case LAYOUT_INTENT_TRUNC:
        case LAYOUT_INTENT_WRITE:
+       case LAYOUT_INTENT_PCCRO_SET:
+       case LAYOUT_INTENT_PCCRO_CLEAR:
                layout.mlc_opc = MD_LAYOUT_WRITE;
                layout.mlc_intent = intent;
                break;
@@ -4120,11 +4906,11 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
        case LAYOUT_INTENT_RELEASE:
        case LAYOUT_INTENT_RESTORE:
                CERROR("%s: Unsupported layout intent opc %d\n",
-                      mdt_obd_name(info->mti_mdt), intent->li_opc);
+                      mdt_obd_name(info->mti_mdt), intent->lai_opc);
                RETURN(-ENOTSUPP);
        default:
                CERROR("%s: Unknown layout intent opc %d\n",
-                      mdt_obd_name(info->mti_mdt), intent->li_opc);
+                      mdt_obd_name(info->mti_mdt), intent->lai_opc);
                RETURN(-EINVAL);
        }
 
@@ -4224,26 +5010,30 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc,
                           struct ldlm_lock **lockp,
                           __u64 flags)
 {
-        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
-        struct ldlm_reply      *rep = NULL;
-        long                    opc;
-        int                     rc;
+       struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
+       struct ldlm_reply      *rep = NULL;
+       long                    opc;
+       int                     rc;
+       struct ptlrpc_request  *req = mdt_info_req(info);
 
-        static const struct req_format *intent_fmts[REINT_MAX] = {
-                [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
-                [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
-        };
+       static const struct req_format *intent_fmts[REINT_MAX] = {
+               [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
+               [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
+       };
 
-        ENTRY;
+       ENTRY;
 
        opc = mdt_reint_opcode(mdt_info_req(info), intent_fmts);
-        if (opc < 0)
-                RETURN(opc);
+       if (opc < 0)
+               RETURN(opc);
 
        /* Get lock from request for possible resent case. */
        mdt_intent_fixup_resent(info, *lockp, lhc, flags);
 
-        rc = mdt_reint_internal(info, lhc, opc);
+       rc = mdt_reint_internal(info, lhc, opc);
+
+       if (rc < 0 && lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+               DEBUG_REQ(D_ERROR, req, "Replay open failed with %d", rc);
 
        /* Check whether the reply has been packed successfully. */
        if (mdt_info_req(info)->rq_repmsg != NULL)
@@ -4255,12 +5045,13 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc,
                        RETURN(err_serious(-EFAULT));
        }
 
-        /* MDC expects this in any case */
-        if (rc != 0)
-                mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
+       /* MDC expects this in any case */
+       if (rc != 0)
+               mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
 
        /* the open lock or the lock for cross-ref object should be
-        * returned to the client */
+        * returned to the client
+        */
        if (lustre_handle_is_used(&lhc->mlh_reg_lh) &&
            (rc == 0 || rc == -MDT_EREMOTE_OPEN)) {
                rep->lock_policy_res2 = 0;
@@ -4270,22 +5061,22 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc,
 
        rep->lock_policy_res2 = clear_serious(rc);
 
-        if (rep->lock_policy_res2 == -ENOENT &&
+       if (rep->lock_policy_res2 == -ENOENT &&
            mdt_get_disposition(rep, DISP_LOOKUP_NEG) &&
            !mdt_get_disposition(rep, DISP_OPEN_CREATE))
                rep->lock_policy_res2 = 0;
 
        lhc->mlh_reg_lh.cookie = 0ull;
-        if (rc == -ENOTCONN || rc == -ENODEV ||
-            rc == -EOVERFLOW) { /**< if VBR failure then return error */
-                /*
-                 * If it is the disconnect error (ENODEV & ENOCONN), the error
-                 * will be returned by rq_status, and client at ptlrpc layer
-                 * will detect this, then disconnect, reconnect the import
-                 * immediately, instead of impacting the following the rpc.
-                 */
-                RETURN(rc);
-        }
+       if (rc == -ENOTCONN || rc == -ENODEV ||
+           rc == -EOVERFLOW) { /**< if VBR failure then return error */
+               /*
+                * If it is the disconnect error (ENODEV & ENOCONN), the error
+                * will be returned by rq_status, and client at ptlrpc layer
+                * will detect this, then disconnect, reconnect the import
+                * immediately, instead of impacting the following the rpc.
+                */
+               RETURN(rc);
+       }
        /*
         * For other cases, the error will be returned by intent, and client
         * will retrieve the result from intent.
@@ -4309,6 +5100,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
        struct ldlm_reply *rep;
        bool check_mdt_object = false;
        int rc;
+
        ENTRY;
 
        switch (it_opc) {
@@ -4325,7 +5117,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
                break;
        case IT_GETATTR:
                check_mdt_object = true;
-               /* fallthrough */
+               fallthrough;
        case IT_LOOKUP:
                it_format = &RQF_LDLM_INTENT_GETATTR;
                it_handler = &mdt_intent_getattr;
@@ -4372,7 +5164,8 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
                RETURN(-EPROTO);
        }
 
-       req_capsule_extend(pill, it_format);
+       if (!info->mti_batch_env)
+               req_capsule_extend(pill, it_format);
 
        rc = mdt_unpack_req_pack_rep(info, it_handler_flags);
        if (rc < 0)
@@ -4384,13 +5177,13 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
        if (it_handler_flags & IS_MUTABLE && mdt_rdonly(req->rq_export))
                RETURN(-EROFS);
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10);
 
        /* execute policy */
        rc = (*it_handler)(it_opc, info, lockp, flags);
 
        /* Check whether the reply has been packed successfully. */
-       if (req->rq_repmsg != NULL) {
+       if (info->mti_batch_env || req->rq_repmsg != NULL) {
                rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
                rep->lock_policy_res2 =
                        ptlrpc_status_hton(rep->lock_policy_res2);
@@ -4432,14 +5225,30 @@ static int mdt_intent_policy(const struct lu_env *env,
 
        tsi = tgt_ses_info(env);
 
-       info = tsi2mdt_info(tsi);
+       info = mdt_th_info(env);
        LASSERT(info != NULL);
-       pill = info->mti_pill;
+
+       /* Check whether it is a sub request processing in a batch request */
+       if (info->mti_batch_env) {
+               pill = info->mti_pill;
+               LASSERT(pill == &info->mti_sub_pill);
+       } else {
+               info = tsi2mdt_info(tsi);
+               pill = info->mti_pill;
+       }
+
        LASSERT(pill->rc_req == req);
        ldesc = &info->mti_dlm_req->lock_desc;
 
-       if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
-               req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC);
+       if (info->mti_batch_env ||
+           req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
+               /*
+                * For batch processing environment, the request format has
+                * already been set.
+                */
+               if (!info->mti_batch_env)
+                       req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC);
+
                it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
                if (it != NULL) {
                        mdt_ptlrpc_stats_update(req, it->opc);
@@ -4451,12 +5260,30 @@ static int mdt_intent_policy(const struct lu_env *env,
                         * later in ldlm. Let's check it now to see if we have
                         * ibits corrupted somewhere in mdt_intent_opc().
                         * The case for client miss to set ibits has been
-                        * processed by others. */
+                        * processed by others.
+                        */
                        LASSERT(ergo(ldesc->l_resource.lr_type == LDLM_IBITS,
                                ldesc->l_policy_data.l_inodebits.bits != 0));
                } else {
                        rc = err_serious(-EFAULT);
                }
+       } else if (ldesc->l_resource.lr_type == LDLM_IBITS &&
+                  ldesc->l_policy_data.l_inodebits.bits == MDS_INODELOCK_DOM) {
+               struct ldlm_reply *rep;
+
+               /* No intent was provided but INTENT flag is set along with
+                * DOM bit, this is considered as GLIMPSE request.
+                * This logic is common for MDT and OST glimpse
+                */
+               mdt_ptlrpc_stats_update(req, IT_GLIMPSE);
+               rc = mdt_glimpse_enqueue(info, ns, lockp, flags);
+               /* Check whether the reply has been packed successfully. */
+               if (req->rq_repmsg != NULL) {
+                       rep = req_capsule_server_get(info->mti_pill,
+                                                    &RMF_DLM_REP);
+                       rep->lock_policy_res2 =
+                               ptlrpc_status_hton(rep->lock_policy_res2);
+               }
        } else {
                /* No intent was provided */
                req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
@@ -4464,7 +5291,9 @@ static int mdt_intent_policy(const struct lu_env *env,
                if (rc)
                        rc = err_serious(rc);
        }
-       mdt_thread_info_fini(info);
+
+       if (!info->mti_batch_env)
+               mdt_thread_info_fini(info);
        RETURN(rc);
 }
 
@@ -4516,6 +5345,7 @@ static int mdt_register_lwp_callback(void *data)
        struct mdt_device       *mdt = data;
        struct lu_server_fld    *fld = mdt_seq_site(mdt)->ss_server_fld;
        int                     rc;
+
        ENTRY;
 
        LASSERT(mdt_seq_site(mdt)->ss_node_id != 0);
@@ -4527,7 +5357,8 @@ static int mdt_register_lwp_callback(void *data)
        }
 
        /* Allocate new sequence now to avoid creating local transaction
-        * in the normal transaction process */
+        * in the normal transaction process
+        */
        rc = seq_server_check_and_alloc_super(&env,
                                              mdt_seq_site(mdt)->ss_server_seq);
        if (rc < 0)
@@ -4589,13 +5420,14 @@ out_free:
  */
 static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt)
 {
-       struct seq_server_site  *ss = mdt_seq_site(mdt);
-       int                     rc;
-       char                    *prefix;
+       struct seq_server_site *ss = mdt_seq_site(mdt);
+       char *prefix;
+
        ENTRY;
 
        /* check if this is adding the first MDC and controller is not yet
-        * initialized. */
+        * initialized.
+        */
        OBD_ALLOC_PTR(ss->ss_client_seq);
        if (ss->ss_client_seq == NULL)
                RETURN(-ENOMEM);
@@ -4609,25 +5441,19 @@ static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt)
 
        /* Note: seq_client_fini will be called in seq_site_fini */
        snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt));
-       rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
-                            prefix, ss->ss_node_id == 0 ?  ss->ss_control_seq :
+       seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
+                       prefix, ss->ss_node_id == 0 ?  ss->ss_control_seq :
                                                            NULL);
        OBD_FREE(prefix, MAX_OBD_NAME + 5);
-       if (rc != 0) {
-               OBD_FREE_PTR(ss->ss_client_seq);
-               ss->ss_client_seq = NULL;
-               RETURN(rc);
-       }
-
-       rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq);
 
-       RETURN(rc);
+       RETURN(seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq));
 }
 
 static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt)
 {
        struct seq_server_site  *ss;
        int                     rc;
+
        ENTRY;
 
        ss = mdt_seq_site(mdt);
@@ -4675,9 +5501,10 @@ out_seq_fini:
  * FLD wrappers
  */
 static int mdt_fld_fini(const struct lu_env *env,
-                        struct mdt_device *m)
+                       struct mdt_device *m)
 {
        struct seq_server_site *ss = mdt_seq_site(m);
+
        ENTRY;
 
        if (ss && ss->ss_server_fld) {
@@ -4690,11 +5517,12 @@ static int mdt_fld_fini(const struct lu_env *env,
 }
 
 static int mdt_fld_init(const struct lu_env *env,
-                        const char *uuid,
-                        struct mdt_device *m)
+                       const char *uuid,
+                       struct mdt_device *m)
 {
        struct seq_server_site *ss;
        int rc;
+
        ENTRY;
 
        ss = mdt_seq_site(m);
@@ -4720,6 +5548,7 @@ static void mdt_stack_pre_fini(const struct lu_env *env,
        struct lustre_cfg_bufs  *bufs;
        struct lustre_cfg       *lcfg;
        struct mdt_thread_info  *info;
+
        ENTRY;
 
        LASSERT(top);
@@ -4736,7 +5565,8 @@ static void mdt_stack_pre_fini(const struct lu_env *env,
        /* XXX: this is needed because all layers are referenced by
         * objects (some of them are pinned by osd, for example *
         * the proper solution should be a model where object used
-        * by osd only doesn't have mdt/mdd slices -bzzz */
+        * by osd only doesn't have mdt/mdd slices -bzzz
+        */
        lustre_cfg_bufs_reset(bufs, mdt_obd_name(m));
        lustre_cfg_bufs_set_string(bufs, 1, NULL);
        OBD_ALLOC(lcfg, lustre_cfg_len(bufs->lcfg_bufcount, bufs->lcfg_buflen));
@@ -4757,6 +5587,7 @@ static void mdt_stack_fini(const struct lu_env *env,
        struct lustre_cfg       *lcfg;
        struct mdt_thread_info  *info;
        char                     flags[3] = "";
+
        ENTRY;
 
        info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
@@ -4802,6 +5633,7 @@ static int mdt_connect_to_next(const struct lu_env *env, struct mdt_device *m,
        struct obd_connect_data *data = NULL;
        struct obd_device       *obd;
        int                      rc;
+
        ENTRY;
 
        OBD_ALLOC_PTR(data);
@@ -4842,7 +5674,8 @@ static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt,
        struct obd_device      *obd;
        struct lustre_profile  *lprof;
        struct lu_site         *site;
-        ENTRY;
+
+       ENTRY;
 
        /* in 1.8 we had the only device in the stack - MDS.
         * 2.0 introduces MDT, MDD, OSD; MDT starts others internally.
@@ -4868,7 +5701,8 @@ static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt,
         *   #02 (160)setup   0:lustre-MDT0000  1:lustre-MDT0000_UUID  2:0
         *                    3:lustre-MDD0000  4:f
         *
-        *  notice we build the stack from down to top: MDD first, then MDT */
+        *  notice we build the stack from down to top: MDD first, then MDT
+        */
 
        name_size = MAX_OBD_NAME;
        uuid_size = MAX_OBD_NAME;
@@ -4990,6 +5824,7 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt,
        struct lustre_profile   *lprof;
        struct obd_connect_data *data;
        int                      rc;
+
        ENTRY;
 
        LASSERT(mdt->mdt_qmt_exp == NULL);
@@ -5006,7 +5841,8 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt,
         * We generate the QMT name from the MDT one, just replacing MD with QM
         * after all the preparations, the logical equivalent will be:
         *   #01 (160)setup   0:lustre-QMT0000  1:lustre-QMT0000_UUID  2:0
-        *                    3:lustre-MDT0000-osd  4:f */
+        *                    3:lustre-MDT0000-osd  4:f
+        */
        OBD_ALLOC(qmtname, MAX_OBD_NAME);
        OBD_ALLOC(uuid, UUID_MAX);
        OBD_ALLOC_PTR(bufs);
@@ -5074,7 +5910,7 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt,
        mdt->mdt_qmt_dev = obd->obd_lu_dev;
 
        /* configure local quota objects */
-       if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_INIT))
+       if (CFS_FAIL_CHECK(OBD_FAIL_QUOTA_INIT))
                rc = -EBADF;
        else
                rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env,
@@ -5138,7 +5974,8 @@ static void mdt_quota_fini(const struct lu_env *env, struct mdt_device *mdt)
 
 /* mdt_getxattr() is used from mdt_intent_getxattr(), use this wrapper
  * for now. This will be removed along with converting rest of MDT code
- * to use tgt_session_info */
+ * to use tgt_session_info
+ */
 static int mdt_tgt_getxattr(struct tgt_session_info *tsi)
 {
        struct mdt_thread_info  *info = tsi2mdt_info(tsi);
@@ -5153,6 +5990,16 @@ static int mdt_tgt_getxattr(struct tgt_session_info *tsi)
        return rc;
 }
 
+static int mdt_llog_open(struct tgt_session_info *tsi)
+{
+       ENTRY;
+
+       if (!mdt_changelog_allow(tsi2mdt_info(tsi)))
+               RETURN(err_serious(-EACCES));
+
+       RETURN(tgt_llog_open(tsi));
+}
+
 #define OBD_FAIL_OST_READ_NET  OBD_FAIL_OST_BRW_NET
 #define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET
 #define OST_BRW_READ   OST_READ
@@ -5167,7 +6014,7 @@ TGT_RPC_HANDLER(MDS_FIRST_OPC,
                &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION),
 TGT_RPC_HANDLER(MDS_FIRST_OPC,
                HAS_REPLY,              MDS_SET_INFO,   mdt_set_info,
-               &RQF_OBD_SET_INFO, LUSTRE_MDS_VERSION),
+               &RQF_MDT_SET_INFO, LUSTRE_MDS_VERSION),
 TGT_MDT_HDL(0,                         MDS_GET_INFO,   mdt_get_info),
 TGT_MDT_HDL(HAS_REPLY,         MDS_GET_ROOT,   mdt_get_root),
 TGT_MDT_HDL(HAS_BODY,          MDS_GETATTR,    mdt_getattr),
@@ -5197,6 +6044,7 @@ TGT_MDT_HDL(HAS_KEY | HAS_BODY | HAS_REPLY | IS_MUTABLE,
            MDS_SWAP_LAYOUTS,
            mdt_swap_layouts),
 TGT_MDT_HDL(IS_MUTABLE,                MDS_RMFID,      mdt_rmfid),
+TGT_MDT_HDL(IS_MUTABLE,                MDS_BATCH,      mdt_batch),
 };
 
 static struct tgt_handler mdt_io_ops[] = {
@@ -5206,13 +6054,19 @@ TGT_OST_HDL_HP(HAS_BODY | IS_MUTABLE,    OST_BRW_WRITE, tgt_brw_write,
                                                        mdt_hp_brw),
 TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE,
                                         OST_PUNCH,     mdt_punch_hdl,
-                                                       mdt_hp_punch),
+                                                       mdt_hp_punch),
 TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC,    mdt_data_sync),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_FALLOCATE,
+                                                       mdt_fallocate_hdl),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SEEK, tgt_lseek),
+TGT_RPC_HANDLER(OST_FIRST_OPC,
+               0,                      OST_SET_INFO,   mdt_io_set_info,
+               &RQF_OBD_SET_INFO, LUSTRE_OST_VERSION),
 };
 
 static struct tgt_handler mdt_sec_ctx_ops[] = {
 TGT_SEC_HDL_VAR(0,                     SEC_CTX_INIT,     mdt_sec_ctx_handle),
-TGT_SEC_HDL_VAR(0,                     SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
+TGT_SEC_HDL_VAR(0,                     SEC_CTX_INIT_CONT, mdt_sec_ctx_handle),
 TGT_SEC_HDL_VAR(0,                     SEC_CTX_FINI,     mdt_sec_ctx_handle)
 };
 
@@ -5220,6 +6074,13 @@ static struct tgt_handler mdt_quota_ops[] = {
 TGT_QUOTA_HDL(HAS_REPLY,               QUOTA_DQACQ,      mdt_quota_dqacq),
 };
 
+static struct tgt_handler mdt_llog_handlers[] = {
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CREATE,      mdt_llog_open),
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_NEXT_BLOCK,  tgt_llog_next_block),
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
+};
+
 static struct tgt_opc_slice mdt_common_slice[] = {
        {
                .tos_opc_start  = MDS_FIRST_OPC,
@@ -5264,7 +6125,7 @@ static struct tgt_opc_slice mdt_common_slice[] = {
        {
                .tos_opc_start  = LLOG_FIRST_OPC,
                .tos_opc_end    = LLOG_LAST_OPC,
-               .tos_hs         = tgt_llog_handlers
+               .tos_hs         = mdt_llog_handlers
        },
        {
                .tos_opc_start  = LFSCK_FIRST_OPC,
@@ -5294,6 +6155,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
 
        mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
+
+       mdt_restriper_stop(m);
        ping_evictor_stop();
 
        /* Remove the HSM /proc entry so the coordinator cannot be
@@ -5324,8 +6187,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        upcall_cache_cleanup(m->mdt_identity_cache);
        m->mdt_identity_cache = NULL;
 
-       mdt_fs_cleanup(env, m);
-
        tgt_fini(env, &m->mdt_lut);
 
        mdt_hsm_cdt_fini(m);
@@ -5382,6 +6243,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        int rc;
        long node_id;
        mntopt_t mntopts;
+
        ENTRY;
 
        lu_device_init(&m->mdt_lu_dev, ldt);
@@ -5404,7 +6266,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        LASSERT(obd != NULL);
 
        m->mdt_max_mdsize = MAX_MD_SIZE_OLD;
-       m->mdt_opts.mo_evict_tgt_nids = 1;
+       m->mdt_evict_tgt_nids = 1;
        m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
 
        lmi = server_get_mount(dev);
@@ -5413,40 +6275,58 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                RETURN(-EFAULT);
        } else {
                lsi = s2lsi(lmi->lmi_sb);
+               LASSERT(lsi->lsi_lmd);
                /* CMD is supported only in IAM mode */
                LASSERT(num);
-               node_id = simple_strtol(num, NULL, 10);
-               obd->u.obt.obt_magic = OBT_MAGIC;
-               if (lsi->lsi_lmd != NULL &&
-                   lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
+               rc = kstrtol(num, 10, &node_id);
+               if (rc)
+                       RETURN(rc);
+
+               obd_obt_init(obd);
+               if (test_bit(LMD_FLG_SKIP_LFSCK, lsi->lsi_lmd->lmd_flags))
                        m->mdt_skip_lfsck = 1;
+               if (test_bit(LMD_FLG_NO_CREATE, lsi->lsi_lmd->lmd_flags))
+                       m->mdt_lut.lut_no_create = 1;
        }
 
-       /* DoM files get IO lock at open by default */
-       m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN;
+       /* Just try to get a DoM lock by default. Otherwise, having a group
+        * lock granted, it may get blocked for a long time.
+        */
+       m->mdt_opts.mo_dom_lock = TRYLOCK_DOM_ON_OPEN;
        /* DoM files are read at open and data is packed in the reply */
-       m->mdt_opts.mo_dom_read_open = 1;
+       m->mdt_dom_read_open = 1;
 
        m->mdt_squash.rsi_uid = 0;
        m->mdt_squash.rsi_gid = 0;
        INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids);
        spin_lock_init(&m->mdt_squash.rsi_lock);
        spin_lock_init(&m->mdt_lock);
-       m->mdt_enable_remote_dir = 1;
-       m->mdt_enable_striped_dir = 1;
+       m->mdt_enable_chprojid_gid = 0;
        m->mdt_enable_dir_migration = 1;
+       m->mdt_enable_dir_restripe = 0;
+       m->mdt_enable_dir_auto_split = 0;
+       m->mdt_enable_parallel_rename_dir = 1;
+       m->mdt_enable_parallel_rename_file = 1;
+       m->mdt_enable_parallel_rename_crossdir = 1;
+       m->mdt_enable_remote_dir = 1;
        m->mdt_enable_remote_dir_gid = 0;
-       m->mdt_enable_chprojid_gid = 0;
        m->mdt_enable_remote_rename = 1;
+       m->mdt_enable_striped_dir = 1;
+       m->mdt_enable_dmv_implicit_inherit = 1;
+       m->mdt_dir_restripe_nsonly = 1;
+       m->mdt_max_mod_rpcs_in_flight = OBD_MAX_RIF_DEFAULT;
 
        atomic_set(&m->mdt_mds_mds_conns, 0);
        atomic_set(&m->mdt_async_commit_count, 0);
+       atomic_set(&m->mdt_dmv_old_client_count, 0);
 
        m->mdt_lu_dev.ld_ops = &mdt_lu_ops;
        m->mdt_lu_dev.ld_obd = obd;
        /* Set this lu_device to obd for error handling purposes. */
        obd->obd_lu_dev = &m->mdt_lu_dev;
 
+       strncpy(m->mdt_job_xattr, XATTR_NAME_JOB_DEFAULT, XATTR_JOB_MAX_LEN);
+
        /* init the stack */
        rc = mdt_stack_init((struct lu_env *)env, m, cfg);
        if (rc) {
@@ -5466,13 +6346,14 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        /* failover is the default
         * FIXME: we do not failout mds0/mgs, which may cause some problems.
         * assumed whose ss_node_id == 0 XXX
-        * */
+        */
        obd->obd_replayable = 1;
        /* No connection accepted until configurations will finish */
        obd->obd_no_conn = 1;
 
        if (cfg->lcfg_bufcount > 4 && LUSTRE_CFG_BUFLEN(cfg, 4) > 0) {
                char *str = lustre_cfg_string(cfg, 4);
+
                if (strchr(str, 'n')) {
                        CWARN("%s: recovery disabled\n", mdt_obd_name(m));
                        obd->obd_replayable = 0;
@@ -5493,8 +6374,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                                              LDLM_NAMESPACE_SERVER,
                                              LDLM_NAMESPACE_GREEDY,
                                              LDLM_NS_TYPE_MDT);
-       if (m->mdt_namespace == NULL)
-               GOTO(err_fini_seq, rc = -ENOMEM);
+       if (IS_ERR(m->mdt_namespace)) {
+               rc = PTR_ERR(m->mdt_namespace);
+               CERROR("%s: unable to create server namespace: rc = %d\n",
+                      obd->obd_name, rc);
+               m->mdt_namespace = NULL;
+               GOTO(err_fini_seq, rc);
+       }
 
        m->mdt_namespace->ns_lvbp = m;
        m->mdt_namespace->ns_lvbo = &mdt_lvbo;
@@ -5510,24 +6396,24 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                GOTO(err_free_ns, rc);
 
        /* Amount of available space excluded from granting and reserved
-        * for metadata. It is in percentage and 50% is default value. */
-       tgd->tgd_reserved_pcnt = 50;
+        * for metadata. It is a percentage of the total MDT size.
+        */
+       tgd->tgd_reserved_pcnt = 10;
 
        if (ONE_MB_BRW_SIZE < (1U << tgd->tgd_blockbits))
                m->mdt_brw_size = 1U << tgd->tgd_blockbits;
        else
                m->mdt_brw_size = ONE_MB_BRW_SIZE;
 
-       rc = mdt_fs_setup(env, m, obd, lsi);
-       if (rc)
-               GOTO(err_tgt, rc);
+       if (CFS_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
+               GOTO(err_tgt, rc = -ENOENT);
 
        fid.f_seq = FID_SEQ_LOCAL_NAME;
        fid.f_oid = 1;
        fid.f_ver = 0;
        rc = local_oid_storage_init(env, m->mdt_bottom, &fid, &m->mdt_los);
        if (rc != 0)
-               GOTO(err_fs_cleanup, rc);
+               GOTO(err_tgt, rc);
 
        rc = mdt_hsm_cdt_init(m);
        if (rc != 0) {
@@ -5555,13 +6441,20 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        else
                m->mdt_opts.mo_acl = 0;
 
+       m->mdt_enable_strict_som = 1;
+
        /* XXX: to support suppgid for ACL, we enable identity_upcall
-        * by default, otherwise, maybe got unexpected -EACCESS. */
+        * by default, otherwise, maybe got unexpected -EACCESS.
+        */
        if (m->mdt_opts.mo_acl)
                identity_upcall = MDT_IDENTITY_UPCALL_PATH;
 
        m->mdt_identity_cache = upcall_cache_init(mdt_obd_name(m),
                                                identity_upcall,
+                                               UC_IDCACHE_HASH_SIZE,
+                                               1200, /* entry expire: 20 mn */
+                                               30, /* acquire expire: 30 s */
+                                               true, /* acquire can replay */
                                                &mdt_identity_upcall_cache_ops);
        if (IS_ERR(m->mdt_identity_cache)) {
                rc = PTR_ERR(m->mdt_identity_cache);
@@ -5585,17 +6478,28 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
        ping_evictor_start();
 
-       /* recovery will be started upon mdt_prepare()
-        * when the whole stack is complete and ready
-        * to serve the requests */
+       /* recovery will be started upon mdt_prepare() when the whole stack is
+        * complete and ready to serve the requests
+        */
 
        /* Reduce the initial timeout on an MDS because it doesn't need such
         * a long timeout as an OST does. Adaptive timeouts will adjust this
-        * value appropriately. */
+        * value appropriately.
+        */
        if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
                ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT;
 
+       if (test_bit(LMD_FLG_LOCAL_RECOV, lsi->lsi_lmd->lmd_flags))
+               m->mdt_lut.lut_local_recovery = 1;
+
+       rc = mdt_restriper_start(m);
+       if (rc)
+               GOTO(err_ping_evictor, rc);
+
        RETURN(0);
+
+err_ping_evictor:
+       ping_evictor_stop();
 err_procfs:
        mdt_tunables_fini(m);
 err_recovery:
@@ -5606,8 +6510,6 @@ err_free_hsm:
 err_los_fini:
        local_oid_storage_fini(env, m->mdt_los);
        m->mdt_los = NULL;
-err_fs_cleanup:
-       mdt_fs_cleanup(env, m);
 err_tgt:
        /* keep recoverable clients */
        obd->obd_fail = 1;
@@ -5627,12 +6529,13 @@ err_fini_stack:
 err_lmi:
        if (lmi)
                server_put_mount(dev, true);
-       return(rc);
+       return rc;
 }
 
 /* For interoperability, the left element is old parameter, the right one
  * is the new version of the parameter, if some parameter is deprecated,
- * the new version should be set as NULL. */
+ * the new version should be set as NULL.
+ */
 static struct cfg_interop_param mdt_interop_param[] = {
        { "mdt.group_upcall",   NULL },
        { "mdt.quota_type",     NULL },
@@ -5645,13 +6548,14 @@ static struct cfg_interop_param mdt_interop_param[] = {
 
 /* used by MGS to process specific configurations */
 static int mdt_process_config(const struct lu_env *env,
-                              struct lu_device *d, struct lustre_cfg *cfg)
+                             struct lu_device *d, struct lustre_cfg *cfg)
 {
-        struct mdt_device *m = mdt_dev(d);
-        struct md_device *md_next = m->mdt_child;
-        struct lu_device *next = md2lu_dev(md_next);
-        int rc;
-        ENTRY;
+       struct mdt_device *m = mdt_dev(d);
+       struct md_device *md_next = m->mdt_child;
+       struct lu_device *next = md2lu_dev(md_next);
+       int rc;
+
+       ENTRY;
 
        switch (cfg->lcfg_command) {
        case LCFG_PARAM: {
@@ -5673,8 +6577,8 @@ static int mdt_process_config(const struct lu_env *env,
                if (ptr != NULL) {
                        if (ptr->new_param == NULL) {
                                rc = 0;
-                               CWARN("For interoperability, skip this %s."
-                                     " It is obsolete.\n", ptr->old_param);
+                               CWARN("For interoperability, skip this %s. It is obsolete.\n",
+                                     ptr->old_param);
                                break;
                        }
 
@@ -5712,12 +6616,12 @@ static int mdt_process_config(const struct lu_env *env,
                                                     cfg->lcfg_buflens));
                break;
        }
-        default:
-                /* others are passed further */
-                rc = next->ld_ops->ldo_process_config(env, next, cfg);
-                break;
-        }
-        RETURN(rc);
+       default:
+               /* others are passed further */
+               rc = next->ld_ops->ldo_process_config(env, next, cfg);
+               break;
+       }
+       RETURN(rc);
 }
 
 static struct lu_object *mdt_object_alloc(const struct lu_env *env,
@@ -5745,49 +6649,65 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                init_rwsem(&mo->mot_dom_sem);
                init_rwsem(&mo->mot_open_sem);
                atomic_set(&mo->mot_open_count, 0);
+               mo->mot_restripe_offset = 0;
+               INIT_LIST_HEAD(&mo->mot_restripe_linkage);
+               mo->mot_lsom_size = 0;
+               mo->mot_lsom_blocks = 0;
+               mo->mot_lsom_inited = false;
                RETURN(o);
        }
        RETURN(NULL);
 }
 
 static int mdt_object_init(const struct lu_env *env, struct lu_object *o,
-                           const struct lu_object_conf *unused)
+                          const struct lu_object_conf *unused)
 {
-        struct mdt_device *d = mdt_dev(o->lo_dev);
-        struct lu_device  *under;
-        struct lu_object  *below;
-        int                rc = 0;
-        ENTRY;
+       struct mdt_device *d = mdt_dev(o->lo_dev);
+       struct lu_device  *under;
+       struct lu_object  *below;
+       int                rc = 0;
+
+       ENTRY;
 
-        CDEBUG(D_INFO, "object init, fid = "DFID"\n",
-               PFID(lu_object_fid(o)));
+       CDEBUG(D_INFO, "object init, fid = "DFID"\n",
+              PFID(lu_object_fid(o)));
 
-        under = &d->mdt_child->md_lu_dev;
-        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
-        if (below != NULL) {
-                lu_object_add(o, below);
-        } else
-                rc = -ENOMEM;
+       under = &d->mdt_child->md_lu_dev;
+       below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
+       if (below != NULL)
+               lu_object_add(o, below);
+       else
+               rc = -ENOMEM;
+
+       RETURN(rc);
+}
 
-        RETURN(rc);
+static void mdt_object_free_rcu(struct rcu_head *head)
+{
+       struct mdt_object *mo = container_of(head, struct mdt_object,
+                                            mot_header.loh_rcu);
+
+       kmem_cache_free(mdt_object_kmem, mo);
 }
 
 static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
 {
-        struct mdt_object *mo = mdt_obj(o);
-        struct lu_object_header *h;
-        ENTRY;
+       struct mdt_object *mo = mdt_obj(o);
+       struct lu_object_header *h;
+
+       ENTRY;
 
-        h = o->lo_header;
-        CDEBUG(D_INFO, "object free, fid = "DFID"\n",
-               PFID(lu_object_fid(o)));
+       h = o->lo_header;
+       CDEBUG(D_INFO, "object free, fid = "DFID"\n",
+              PFID(lu_object_fid(o)));
 
        LASSERT(atomic_read(&mo->mot_open_count) == 0);
        LASSERT(atomic_read(&mo->mot_lease_count) == 0);
 
        lu_object_fini(o);
        lu_object_header_fini(h);
-       OBD_SLAB_FREE_PTR(mo, mdt_object_kmem);
+       OBD_FREE_PRE(mo, sizeof(*mo), "slab-freed");
+       call_rcu(&mo->mot_header.loh_rcu, mdt_object_free_rcu);
 
        EXIT;
 }
@@ -5831,7 +6751,8 @@ static int mdt_prepare(const struct lu_env *env,
 
        rc = lfsck_register_namespace(env, mdt->mdt_bottom, mdt->mdt_namespace);
        /* The LFSCK instance is registered just now, so it must be there when
-        * register the namespace to such instance. */
+        * register the namespace to such instance.
+        */
        LASSERTF(rc == 0, "register namespace failed: rc = %d\n", rc);
 
        if (mdt->mdt_seq_site.ss_node_id == 0) {
@@ -5857,22 +6778,22 @@ static int mdt_prepare(const struct lu_env *env,
 }
 
 const struct lu_device_operations mdt_lu_ops = {
-        .ldo_object_alloc   = mdt_object_alloc,
-        .ldo_process_config = mdt_process_config,
+       .ldo_object_alloc   = mdt_object_alloc,
+       .ldo_process_config = mdt_process_config,
        .ldo_prepare        = mdt_prepare,
 };
 
 static const struct lu_object_operations mdt_obj_ops = {
-        .loo_object_init    = mdt_object_init,
-        .loo_object_free    = mdt_object_free,
-        .loo_object_print   = mdt_object_print
+       .loo_object_init    = mdt_object_init,
+       .loo_object_free    = mdt_object_free,
+       .loo_object_print   = mdt_object_print
 };
 
 static int mdt_obd_set_info_async(const struct lu_env *env,
-                                  struct obd_export *exp,
-                                  __u32 keylen, void *key,
-                                  __u32 vallen, void *val,
-                                  struct ptlrpc_request_set *set)
+                                 struct obd_export *exp,
+                                 __u32 keylen, void *key,
+                                 __u32 vallen, void *val,
+                                 struct ptlrpc_request_set *set)
 {
        int rc;
 
@@ -5886,6 +6807,18 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
        RETURN(0);
 }
 
+static inline void mdt_enable_slc(struct mdt_device *mdt)
+{
+       if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER)
+               mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING;
+}
+
+static inline void mdt_disable_slc(struct mdt_device *mdt)
+{
+       if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING)
+               mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER;
+}
+
 /**
  * Match client and server connection feature flags.
  *
@@ -5917,6 +6850,7 @@ static int mdt_connect_internal(const struct lu_env *env,
                                struct obd_connect_data *data, bool reconnect)
 {
        const char *obd_name = mdt_obd_name(mdt);
+
        LASSERT(data != NULL);
 
        data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
@@ -5941,11 +6875,7 @@ static int mdt_connect_internal(const struct lu_env *env,
                data->ocd_brw_size = min(data->ocd_brw_size,
                                         mdt->mdt_brw_size);
                if (data->ocd_brw_size == 0) {
-                       CERROR("%s: cli %s/%p ocd_connect_flags: %#llx "
-                              "ocd_version: %x ocd_grant: %d ocd_index: %u "
-                              "ocd_brw_size unexpectedly zero, network data "
-                              "corruption? Refusing to connect this client\n",
-                              obd_name, exp->exp_client_uuid.uuid,
+                       CERROR("%s: cli %s/%p ocd_connect_flags: %#llx ocd_version: %x ocd_grant: %d ocd_index: %u ocd_brw_size unexpectedly zero, network data corruption? Refusing to connect this client\n", obd_name, exp->exp_client_uuid.uuid,
                               exp, data->ocd_connect_flags, data->ocd_version,
                               data->ocd_grant, data->ocd_index);
                        return -EPROTO;
@@ -5959,7 +6889,8 @@ static int mdt_connect_internal(const struct lu_env *env,
                exp->exp_target_data.ted_pagebits = data->ocd_grant_blkbits;
                data->ocd_grant_blkbits  = mdt->mdt_lut.lut_tgd.tgd_blockbits;
                /* ddp_inodespace may not be power-of-two value, eg. for ldiskfs
-                * it's LDISKFS_DIR_REC_LEN(20) = 28. */
+                * it's LDISKFS_DIR_REC_LEN(20) = 28.
+                */
                data->ocd_grant_inobits = fls(ddp->ddp_inodespace - 1);
                /* ocd_grant_tax_kb is in 1K byte blocks */
                data->ocd_grant_tax_kb = ddp->ddp_extent_tax >> 10;
@@ -5968,7 +6899,8 @@ static int mdt_connect_internal(const struct lu_env *env,
 
        /* Save connect_data we have so far because tgt_grant_connect()
         * uses it to calculate grant, and we want to save the client
-        * version before it is overwritten by LUSTRE_VERSION_CODE. */
+        * version before it is overwritten by LUSTRE_VERSION_CODE.
+        */
        exp->exp_connect_data = *data;
        if (OCD_HAS_FLAG(data, GRANT))
                tgt_grant_connect(env, exp, data, !reconnect);
@@ -5980,7 +6912,8 @@ static int mdt_connect_internal(const struct lu_env *env,
         * exp_connect_data.ocd_connect_flags in this case, since
         * tgt_client_new() needs to know if this is a lightweight
         * connection, and it is safe to expose this flag before
-        * connection processing completes. */
+        * connection processing completes.
+        */
        if (data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) {
                spin_lock(&exp->exp_lock);
                *exp_connect_flags_ptr(exp) |= OBD_CONNECT_LIGHTWEIGHT;
@@ -6011,9 +6944,18 @@ static int mdt_connect_internal(const struct lu_env *env,
         * exp_connect_data.ocd_connect_flags in this case, since
         * tgt_client_new() needs to know if this is client supports
         * multiple modify RPCs, and it is safe to expose this flag before
-        * connection processing completes. */
+        * connection processing completes.
+        */
        if (data->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
-               data->ocd_maxmodrpcs = max_mod_rpcs_per_client;
+               if (mdt_max_mod_rpcs_changed(mdt))
+                       /* The new mdt.*.max_mod_rpcs_in_flight parameter
+                        * has not changed since initialization, but the
+                        * deprecated module parameter was changed,
+                        * so use that instead.
+                        */
+                       data->ocd_maxmodrpcs = max_mod_rpcs_per_client;
+               else
+                       data->ocd_maxmodrpcs = mdt->mdt_max_mod_rpcs_in_flight;
                spin_lock(&exp->exp_lock);
                *exp_connect_flags_ptr(exp) |= OBD_CONNECT_MULTIMODRPCS;
                spin_unlock(&exp->exp_lock);
@@ -6022,30 +6964,38 @@ static int mdt_connect_internal(const struct lu_env *env,
        if (OCD_HAS_FLAG(data, CKSUM)) {
                __u32 cksum_types = data->ocd_cksum_types;
 
-               /* The client set in ocd_cksum_types the checksum types it
-                * supports. We have to mask off the algorithms that we don't
-                * support */
-               data->ocd_cksum_types &=
-                       obd_cksum_types_supported_server(obd_name);
+               tgt_mask_cksum_types(&mdt->mdt_lut, &data->ocd_cksum_types);
 
                if (unlikely(data->ocd_cksum_types == 0)) {
-                       CERROR("%s: Connect with checksum support but no "
-                              "ocd_cksum_types is set\n",
+                       CERROR("%s: Connect with checksum support but no ocd_cksum_types is set\n",
                               exp->exp_obd->obd_name);
                        RETURN(-EPROTO);
                }
 
-               CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return "
-                      "%x\n", exp->exp_obd->obd_name, obd_export_nid2str(exp),
+               CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return %x\n",
+                      exp->exp_obd->obd_name, obd_export_nid2str(exp),
                       cksum_types, data->ocd_cksum_types);
        } else {
-               /* This client does not support OBD_CONNECT_CKSUM
-                * fall back to CRC32 */
-               CDEBUG(D_RPCTRACE, "%s: cli %s does not support "
-                      "OBD_CONNECT_CKSUM, CRC32 will be used\n",
+               /* Client not support OBD_CONNECT_CKSUM? fall back to CRC32 */
+               CDEBUG(D_RPCTRACE, "%s: cli %s does not support OBD_CONNECT_CKSUM, CRC32 will be used\n",
                       exp->exp_obd->obd_name, obd_export_nid2str(exp));
        }
 
+       if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+           !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
+               atomic_inc(&mdt->mdt_mds_mds_conns);
+               mdt_enable_slc(mdt);
+       }
+
+       if (!mdt->mdt_lut.lut_dt_conf.ddp_has_lseek_data_hole)
+               data->ocd_connect_flags2 &= ~OBD_CONNECT2_LSEEK;
+
+       if (!OCD_HAS_FLAG(data, MDS_MDS) && !OCD_HAS_FLAG(data, LIGHTWEIGHT) &&
+           !OCD_HAS_FLAG2(data, DMV_IMP_INHERIT)) {
+               atomic_inc(&mdt->mdt_dmv_old_client_count);
+               mdt->mdt_enable_dmv_implicit_inherit = 0;
+       }
+
        return 0;
 }
 
@@ -6055,6 +7005,7 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env,
 {
        struct lu_context ses;
        int rc;
+
        ENTRY;
 
        rc = lu_context_init(&ses, LCT_SERVER_SESSION);
@@ -6065,6 +7016,13 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env,
        lu_context_enter(&ses);
 
        mdt_ucred(info)->uc_valid = UCRED_OLD;
+       /* do not let rbac interfere with dirty flag internal system event */
+       mdt_ucred(info)->uc_rbac_file_perms = 1;
+       mdt_ucred(info)->uc_rbac_dne_ops = 1;
+       mdt_ucred(info)->uc_rbac_quota_ops = 1;
+       mdt_ucred(info)->uc_rbac_byfid_ops = 1;
+       mdt_ucred(info)->uc_rbac_chlg_ops = 1;
+       mdt_ucred(info)->uc_rbac_fscrypt_admin = 1;
        rc = mdt_add_dirty_flag(info, mfd->mfd_object, &info->mti_attr);
 
        lu_context_exit(&ses);
@@ -6076,7 +7034,7 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env,
 
 static int mdt_export_cleanup(struct obd_export *exp)
 {
-       struct list_head         closing_list;
+       LIST_HEAD(closing_list);
        struct mdt_export_data  *med = &exp->exp_mdt_data;
        struct obd_device       *obd = exp->exp_obd;
        struct mdt_device       *mdt;
@@ -6084,39 +7042,40 @@ static int mdt_export_cleanup(struct obd_export *exp)
        struct lu_env            env;
        struct mdt_file_data    *mfd, *n;
        int rc = 0;
+
        ENTRY;
 
-       INIT_LIST_HEAD(&closing_list);
        spin_lock(&med->med_open_lock);
        while (!list_empty(&med->med_open_head)) {
                struct list_head *tmp = med->med_open_head.next;
+
                mfd = list_entry(tmp, struct mdt_file_data, mfd_list);
 
                /* Remove mfd handle so it can't be found again.
-                * We are consuming the mfd_list reference here. */
+                * We are consuming the mfd_list reference here.
+                */
                class_handle_unhash(&mfd->mfd_open_handle);
                list_move_tail(&mfd->mfd_list, &closing_list);
        }
        spin_unlock(&med->med_open_lock);
-        mdt = mdt_dev(obd->obd_lu_dev);
-        LASSERT(mdt != NULL);
+       mdt = mdt_dev(obd->obd_lu_dev);
+       LASSERT(mdt != NULL);
 
-        rc = lu_env_init(&env, LCT_MD_THREAD);
-        if (rc)
-                RETURN(rc);
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc)
+               RETURN(rc);
 
-        info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
-        LASSERT(info != NULL);
-        memset(info, 0, sizeof *info);
-        info->mti_env = &env;
-        info->mti_mdt = mdt;
-        info->mti_exp = exp;
+       info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+       LASSERT(info != NULL);
+       memset(info, 0, sizeof(*info));
+       info->mti_env = &env;
+       info->mti_mdt = mdt;
+       info->mti_exp = exp;
 
        if (!list_empty(&closing_list)) {
                struct md_attr *ma = &info->mti_attr;
 
-               /* Close any open files (which may also cause orphan
-                * unlinking). */
+               /* Close any open files (which may cause orphan unlinking). */
                list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) {
                        list_del_init(&mfd->mfd_list);
                        ma->ma_need = ma->ma_valid = 0;
@@ -6139,7 +7098,8 @@ static int mdt_export_cleanup(struct obd_export *exp)
                                rc = mdt_ctxt_add_dirty_flag(&env, info, mfd);
 
                        /* Don't unlink orphan on failover umount, LU-184 */
-                       if (exp->exp_flags & OBD_OPT_FAILOVER) {
+                       if (exp->exp_flags & OBD_OPT_FAILOVER ||
+                           exp->exp_obd->obd_stopping) {
                                ma->ma_valid = MA_FLAGS;
                                ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
                        }
@@ -6152,25 +7112,15 @@ static int mdt_export_cleanup(struct obd_export *exp)
        /* Do not erase record for recoverable client. */
        if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
                tgt_client_del(&env, exp);
-        lu_env_fini(&env);
-
-        RETURN(rc);
-}
-
-static inline void mdt_enable_slc(struct mdt_device *mdt)
-{
-       if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER)
-               mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING;
-}
+       lu_env_fini(&env);
 
-static inline void mdt_disable_slc(struct mdt_device *mdt)
-{
-       if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING)
-               mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER;
+       RETURN(rc);
 }
 
 static int mdt_obd_disconnect(struct obd_export *exp)
 {
+       struct obd_connect_data *data = &exp->exp_connect_data;
+       struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
        int rc;
 
        ENTRY;
@@ -6178,16 +7128,14 @@ static int mdt_obd_disconnect(struct obd_export *exp)
        LASSERT(exp);
        class_export_get(exp);
 
-       if (!(exp->exp_flags & OBD_OPT_FORCE))
-               tgt_grant_sanity_check(exp->exp_obd, __func__);
+       if (OCD_HAS_FLAG(data, MDS_MDS) && !OCD_HAS_FLAG(data, LIGHTWEIGHT) &&
+           atomic_dec_and_test(&mdt->mdt_mds_mds_conns))
+               mdt_disable_slc(mdt);
 
-       if ((exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS) &&
-           !(exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)) {
-               struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
-
-               if (atomic_dec_and_test(&mdt->mdt_mds_mds_conns))
-                       mdt_disable_slc(mdt);
-       }
+       if (!OCD_HAS_FLAG(data, MDS_MDS) && !OCD_HAS_FLAG(data, LIGHTWEIGHT) &&
+           !OCD_HAS_FLAG2(data, DMV_IMP_INHERIT) &&
+           atomic_dec_and_test(&mdt->mdt_dmv_old_client_count))
+               mdt->mdt_enable_dmv_implicit_inherit = 1;
 
        rc = server_disconnect_export(exp);
        if (rc != 0)
@@ -6195,6 +7143,9 @@ static int mdt_obd_disconnect(struct obd_export *exp)
 
        tgt_grant_discard(exp);
 
+       if (!(exp->exp_flags & OBD_OPT_FORCE))
+               tgt_grant_sanity_check(exp->exp_obd, __func__);
+
        rc = mdt_export_cleanup(exp);
        nodemap_del_member(exp);
        class_export_put(exp);
@@ -6212,7 +7163,8 @@ static int mdt_obd_connect(const struct lu_env *env,
        struct lustre_handle    conn = { 0 };
        struct mdt_device       *mdt;
        int                      rc;
-       lnet_nid_t              *client_nid = localdata;
+       struct lnet_nid         *client_nid = localdata;
+
        ENTRY;
 
        LASSERT(env != NULL);
@@ -6223,12 +7175,6 @@ static int mdt_obd_connect(const struct lu_env *env,
 
        mdt = mdt_dev(obd->obd_lu_dev);
 
-       if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
-           !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
-               atomic_inc(&mdt->mdt_mds_mds_conns);
-               mdt_enable_slc(mdt);
-       }
-
        /*
         * first, check whether the stack is ready to handle requests
         * XXX: probably not very appropriate method is used now
@@ -6252,7 +7198,7 @@ static int mdt_obd_connect(const struct lu_env *env,
        lexp = class_conn2export(&conn);
        LASSERT(lexp != NULL);
 
-       rc = nodemap_add_member(*client_nid, lexp);
+       rc = nodemap_add_member(client_nid, lexp);
        if (rc != 0 && rc != -EEXIST)
                GOTO(out, rc);
 
@@ -6261,7 +7207,7 @@ static int mdt_obd_connect(const struct lu_env *env,
                struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd;
 
                LASSERT(lcd);
-               memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
+               memcpy(lcd->lcd_uuid, cluuid, sizeof(lcd->lcd_uuid));
                rc = tgt_client_new(env, lexp);
                if (rc == 0)
                        mdt_export_stats_init(obd, lexp, localdata);
@@ -6274,7 +7220,8 @@ out:
        } else {
                *exp = lexp;
                /* Because we do not want this export to be evicted by pinger,
-                * let's not add this export to the timed chain list. */
+                * let's not add this export to the timed chain list.
+                */
                if (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) {
                        spin_lock(&lexp->exp_obd->obd_dev_lock);
                        list_del_init(&lexp->exp_obd_chain_timed);
@@ -6291,14 +7238,15 @@ static int mdt_obd_reconnect(const struct lu_env *env,
                             struct obd_connect_data *data,
                             void *localdata)
 {
-       lnet_nid_t             *client_nid = localdata;
-       int                     rc;
+       struct lnet_nid *client_nid = localdata;
+       int rc;
+
        ENTRY;
 
        if (exp == NULL || obd == NULL || cluuid == NULL)
                RETURN(-EINVAL);
 
-       rc = nodemap_add_member(*client_nid, exp);
+       rc = nodemap_add_member(client_nid, exp);
        if (rc != 0 && rc != -EEXIST)
                RETURN(rc);
 
@@ -6317,6 +7265,7 @@ static int mdt_init_export(struct obd_export *exp)
 {
        struct mdt_export_data *med = &exp->exp_mdt_data;
        int                     rc;
+
        ENTRY;
 
        INIT_LIST_HEAD(&med->med_open_head);
@@ -6330,20 +7279,20 @@ static int mdt_init_export(struct obd_export *exp)
        if (exp->exp_used_slots == NULL)
                RETURN(-ENOMEM);
 
-        /* self-export doesn't need client data and ldlm initialization */
-        if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
-                                     &exp->exp_client_uuid)))
-                RETURN(0);
+       /* self-export doesn't need client data and ldlm initialization */
+       if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
+                                    &exp->exp_client_uuid)))
+               RETURN(0);
 
-        rc = tgt_client_alloc(exp);
-        if (rc)
+       rc = tgt_client_alloc(exp);
+       if (rc)
                GOTO(err, rc);
 
        rc = ldlm_init_export(exp);
        if (rc)
                GOTO(err_free, rc);
 
-        RETURN(rc);
+       RETURN(rc);
 
 err_free:
        tgt_client_free(exp);
@@ -6359,18 +7308,19 @@ err:
 
 static int mdt_destroy_export(struct obd_export *exp)
 {
-        ENTRY;
+       ENTRY;
 
-        target_destroy_export(exp);
+       target_destroy_export(exp);
        if (exp->exp_used_slots)
                OBD_FREE(exp->exp_used_slots,
                         BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
 
-        /* destroy can be called from failed obd_setup, so
-         * checking uuid is safer than obd_self_export */
-        if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
-                                     &exp->exp_client_uuid)))
-                RETURN(0);
+       /* destroy can be called from failed obd_setup, so
+        * checking uuid is safer than obd_self_export
+        */
+       if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
+                                    &exp->exp_client_uuid)))
+               RETURN(0);
 
        ldlm_destroy_export(exp);
        tgt_client_free(exp);
@@ -6384,7 +7334,7 @@ static int mdt_destroy_export(struct obd_export *exp)
         */
        tgt_grant_discard(exp);
        if (exp_connect_flags(exp) & OBD_CONNECT_GRANT)
-               exp->exp_obd->u.obt.obt_lut->lut_tgd.tgd_tot_granted_clients--;
+               obd2obt(exp->exp_obd)->obt_lut->lut_tgd.tgd_tot_granted_clients--;
 
        if (!(exp->exp_flags & OBD_OPT_FORCE))
                tgt_grant_sanity_check(exp->exp_obd, __func__);
@@ -6441,22 +7391,28 @@ static int mdt_path_current(struct mdt_thread_info *info,
                            struct getinfo_fid2path *fp,
                            struct lu_fid *root_fid)
 {
-       struct mdt_device       *mdt = info->mti_mdt;
-       struct mdt_object       *mdt_obj;
-       struct link_ea_header   *leh;
-       struct link_ea_entry    *lee;
-       struct lu_name          *tmpname = &info->mti_name;
-       struct lu_fid           *tmpfid = &info->mti_tmp_fid1;
-       struct lu_buf           *buf = &info->mti_big_buf;
-       char                    *ptr;
-       int                     reclen;
-       struct linkea_data      ldata = { NULL };
-       int                     rc = 0;
-       bool                    first = true;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct lu_name *tmpname = &info->mti_name;
+       struct lu_fid *tmpfid = &info->mti_tmp_fid1;
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct linkea_data ldata = { NULL };
+       bool first = true;
+       struct mdt_object *mdt_obj;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       bool worthchecking = true;
+       bool needsfid = false;
+       bool supported = false;
+       int isenc = -1;
+       char *ptr;
+       int reclen;
+       int rc = 0;
+
        ENTRY;
 
        /* temp buffer for path element, the buffer will be finally freed
-        * in mdt_thread_info_fini */
+        * in mdt_thread_info_fini
+        */
        buf = lu_buf_check_and_alloc(buf, PATH_MAX);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
@@ -6468,8 +7424,6 @@ static int mdt_path_current(struct mdt_thread_info *info,
        *tmpfid = fp->gf_fid = *mdt_object_fid(obj);
 
        while (!lu_fid_eq(root_fid, &fp->gf_fid)) {
-               struct lu_buf           lmv_buf;
-
                if (!lu_fid_eq(root_fid, &mdt->mdt_md_root_fid) &&
                    lu_fid_eq(&mdt->mdt_md_root_fid, &fp->gf_fid))
                        GOTO(out, rc = -ENOENT);
@@ -6493,6 +7447,37 @@ static int mdt_path_current(struct mdt_thread_info *info,
                        GOTO(remote_out, rc = -EREMOTE);
                }
 
+               if (worthchecking) {
+                       /* need to know if FID being looked up is encrypted */
+                       struct lu_attr la = { 0 };
+                       struct dt_object *dt = mdt_obj2dt(mdt_obj);
+
+                       if (dt && dt->do_ops && dt->do_ops->do_attr_get)
+                               dt_attr_get(info->mti_env, dt, &la);
+                       if (la.la_valid & LA_FLAGS &&
+                           la.la_flags & LUSTRE_ENCRYPT_FL) {
+                               if (!supported && mdt_info_req(info) &&
+                                   !exp_connect_encrypt_fid2path(
+                                           mdt_info_req(info)->rq_export)) {
+                                       /* client does not support fid2path
+                                        * for encrypted files
+                                        */
+                                       mdt_object_put(info->mti_env, mdt_obj);
+                                       GOTO(out, rc = -ENODATA);
+                               } else {
+                                       supported = true;
+                               }
+                               needsfid = true;
+                               if (isenc == -1)
+                                       isenc = 1;
+                       } else {
+                               worthchecking = false;
+                               needsfid = false;
+                               if (isenc == -1)
+                                       isenc = 0;
+                       }
+               }
+
                rc = mdt_links_read(info, mdt_obj, &ldata);
                if (rc != 0) {
                        mdt_object_put(info->mti_env, mdt_obj);
@@ -6503,9 +7488,11 @@ static int mdt_path_current(struct mdt_thread_info *info,
                lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
                linkea_entry_unpack(lee, &reclen, tmpname, tmpfid);
                /* If set, use link #linkno for path lookup, otherwise use
-                  link #0.  Only do this for the final path element. */
+                *  link #0.  Only do this for the final path ement.
+                */
                if (first && fp->gf_linkno < leh->leh_reccount) {
                        int count;
+
                        for (count = 0; count < fp->gf_linkno; count++) {
                                lee = (struct link_ea_entry *)
                                     ((char *)lee + reclen);
@@ -6517,43 +7504,50 @@ static int mdt_path_current(struct mdt_thread_info *info,
                                fp->gf_linkno++;
                }
 
-               lmv_buf.lb_buf = info->mti_xattr_buf;
-               lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
                /* Check if it is slave stripes */
-               rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
-                                 &lmv_buf, XATTR_NAME_LMV);
+               rc = mdt_is_dir_stripe(info, mdt_obj);
                mdt_object_put(info->mti_env, mdt_obj);
-               if (rc > 0) {
-                       union lmv_mds_md *lmm = lmv_buf.lb_buf;
-
-                       /* For slave stripes, get its master */
-                       if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) {
-                               fp->gf_fid = *tmpfid;
-                               continue;
-                       }
-               } else if (rc < 0 && rc != -ENODATA) {
+               if (rc < 0)
                        GOTO(out, rc);
+               if (rc == 1) {
+                       fp->gf_fid = *tmpfid;
+                       continue;
                }
 
-               rc = 0;
-
                /* Pack the name in the end of the buffer */
                ptr -= tmpname->ln_namelen;
                if (ptr - 1 <= fp->gf_u.gf_path)
-                       GOTO(out, rc = -EOVERFLOW);
+                       GOTO(out, rc = -ENAMETOOLONG);
                strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
+               if (needsfid) {
+                       /* Pack FID before file name, so that client can build
+                        * encoded/digested form.
+                        */
+                       char fidstr[FID_LEN + 1];
+
+                       snprintf(fidstr, sizeof(fidstr), DFID,
+                                PFID(&fp->gf_fid));
+                       ptr -= strlen(fidstr);
+                       if (ptr - 1 <= fp->gf_u.gf_path)
+                               GOTO(out, rc = -ENAMETOOLONG);
+                       strncpy(ptr, fidstr, strlen(fidstr));
+               }
                *(--ptr) = '/';
 
-               /* keep the last resolved fid to the client, so the
-                * client will build the left path on another MDT for
-                * remote object */
+               /* keep the last resolved fid to the client, so the client will
+                * build the left path on another MDT for remote object
+                */
                fp->gf_fid = *tmpfid;
 
                first = false;
        }
 
+       /* non-zero will be treated as an error */
+       rc = 0;
+
 remote_out:
-       ptr++; /* skip leading / */
+       if (isenc != 1)
+               ptr++; /* skip leading / unless this is an encrypted file */
        memmove(fp->gf_u.gf_path, ptr,
                fp->gf_u.gf_path + fp->gf_pathlen - ptr);
 
@@ -6571,7 +7565,7 @@ out:
  * \param[in]     info  Per-thread common data shared by mdt level handlers.
  * \param[in]     obj   Object to do path lookup of
  * \param[in,out] fp    User-provided struct for arguments and to store path
- *                     information
+ *                     information
  *
  * \retval 0 Lookup successful, path information stored in fp
  * \retval negative errno if there was a problem
@@ -6582,6 +7576,7 @@ static int mdt_path(struct mdt_thread_info *info, struct mdt_object *obj,
        struct mdt_device       *mdt = info->mti_mdt;
        int                     tries = 3;
        int                     rc = -EAGAIN;
+
        ENTRY;
 
        if (fp->gf_pathlen < 3)
@@ -6612,7 +7607,7 @@ static int mdt_path(struct mdt_thread_info *info, struct mdt_object *obj,
  *
  * \param[in]     info  Per-thread common data shared by mdt level handlers.
  * \param[in,out] fp    User-provided struct for arguments and to store path
- *                     information
+ *                     information
  *
  * \retval 0 Lookup successful, path information and recno stored in fp
  * \retval -ENOENT, object does not exist
@@ -6625,6 +7620,7 @@ static int mdt_fid2path(struct mdt_thread_info *info,
        struct mdt_device *mdt = info->mti_mdt;
        struct mdt_object *obj;
        int    rc;
+
        ENTRY;
 
        CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n",
@@ -6634,12 +7630,24 @@ static int mdt_fid2path(struct mdt_thread_info *info,
                RETURN(-EINVAL);
 
        if (!fid_is_namespace_visible(&fp->gf_fid)) {
-               CDEBUG(D_INFO, "%s: "DFID" is invalid, f_seq should be >= %#llx"
-                      ", or f_oid != 0, or f_ver == 0\n", mdt_obd_name(mdt),
+               CDEBUG(D_INFO, "%s: "DFID" is invalid, f_seq should be >= %#llx, or f_oid != 0, or f_ver == 0\n",
+                      mdt_obd_name(mdt),
                       PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL);
                RETURN(-EINVAL);
        }
 
+       /* return error if client-provided root fid is not the one stored in
+        * the export
+        */
+       if (root_fid && !fid_is_zero(&info->mti_exp->exp_root_fid) &&
+           !lu_fid_eq(root_fid, &info->mti_exp->exp_root_fid)) {
+               CDEBUG(D_INFO,
+                      "%s: root fid from client "DFID" but "DFID" stored in export\n",
+                      mdt_obd_name(mdt), PFID(root_fid),
+                      PFID(&info->mti_exp->exp_root_fid));
+               RETURN(-EXDEV);
+       }
+
        obj = mdt_object_find(info->mti_env, mdt, &fp->gf_fid);
        if (IS_ERR(obj)) {
                rc = PTR_ERR(obj);
@@ -6680,21 +7688,21 @@ static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, int keylen,
        struct lu_fid *root_fid = NULL;
        int rc = 0;
 
-       fpin = key + cfs_size_round(sizeof(KEY_FID2PATH));
+       fpin = key + round_up(sizeof(KEY_FID2PATH), 8);
        fpout = val;
 
-       if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
+       if (req_capsule_req_need_swab(info->mti_pill))
                lustre_swab_fid2path(fpin);
 
        memcpy(fpout, fpin, sizeof(*fpin));
        if (fpout->gf_pathlen != vallen - sizeof(*fpin))
                RETURN(-EINVAL);
 
-       if (keylen >= cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*fpin) +
+       if (keylen >= round_up(sizeof(KEY_FID2PATH), 8) + sizeof(*fpin) +
                      sizeof(struct lu_fid)) {
                /* client sent its root FID, which is normally fileset FID */
                root_fid = fpin->gf_u.gf_root_fid;
-               if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
+               if (req_capsule_req_need_swab(info->mti_pill))
                        lustre_swab_lu_fid(root_fid);
 
                if (root_fid != NULL && !fid_is_sane(root_fid))
@@ -6762,6 +7770,7 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
        struct mdt_object *obj;
        struct mdt_lock_handle  *lh;
        int rc;
+
        ENTRY;
 
        if (data->ioc_inlbuf1 == NULL || data->ioc_inllen1 != sizeof(*fid) ||
@@ -6775,12 +7784,10 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
 
        CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid));
 
-        lh = &mti->mti_lh[MDT_LH_PARENT];
-        mdt_lock_reg_init(lh, LCK_CR);
-
-        obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE);
-        if (IS_ERR(obj))
-                RETURN(PTR_ERR(obj));
+       lh = &mti->mti_lh[MDT_LH_PARENT];
+       obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE, LCK_CR);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
 
        if (mdt_object_remote(obj)) {
                rc = -EREMOTE;
@@ -6805,59 +7812,76 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg)
 {
-        struct lu_env      env;
-        struct obd_device *obd = exp->exp_obd;
-        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
-        struct dt_device  *dt = mdt->mdt_bottom;
-        int rc;
+       struct obd_device *obd = exp->exp_obd;
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       struct dt_device *dt = mdt->mdt_bottom;
+       struct obd_ioctl_data *data;
+       struct lu_env env;
+       int rc;
+
+       ENTRY;
+       CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
+              obd->obd_name, cmd, len, karg, uarg);
 
-        ENTRY;
-        CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
-        rc = lu_env_init(&env, LCT_MD_THREAD);
-        if (rc)
-                RETURN(rc);
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc)
+               RETURN(rc);
 
+       /* handle commands that don't use @karg first */
        switch (cmd) {
        case OBD_IOC_SYNC:
                rc = mdt_device_sync(&env, mdt);
-               break;
+               GOTO(out, rc);
        case OBD_IOC_SET_READONLY:
                rc = dt_sync(&env, dt);
                if (rc == 0)
                        rc = dt_ro(&env, dt);
-               break;
-       case OBD_IOC_ABORT_RECOVERY:
-               CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt));
-               obd->obd_abort_recovery = 1;
-               target_stop_recovery_thread(obd);
+               GOTO(out, rc);
+       }
+
+       if (unlikely(karg == NULL)) {
+               OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL", rc = -EINVAL);
+               GOTO(out, rc);
+       }
+       data = karg;
+
+       switch (cmd) {
+       case OBD_IOC_ABORT_RECOVERY: {
+               if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT) {
+                       LCONSOLE_WARN("%s: Aborting MDT recovery\n",
+                                     obd->obd_name);
+                       obd->obd_abort_mdt_recovery = 1;
+                       wake_up(&obd->obd_next_transno_waitq);
+               } else { /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */
+                       /* lctl didn't set OBD_FLG_ABORT_RECOV_OST < 2.13.57 */
+                       LCONSOLE_WARN("%s: Aborting client recovery\n",
+                                     obd->obd_name);
+                       obd->obd_abort_recovery = 1;
+                       target_stop_recovery_thread(obd);
+               }
                rc = 0;
                break;
-        case OBD_IOC_CHANGELOG_REG:
-        case OBD_IOC_CHANGELOG_DEREG:
-        case OBD_IOC_CHANGELOG_CLEAR:
-               rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env,
-                                                          mdt->mdt_child,
+       }
+       case OBD_IOC_CHANGELOG_REG:
+       case OBD_IOC_CHANGELOG_DEREG:
+       case OBD_IOC_CHANGELOG_CLEAR:
+       case OBD_IOC_LLOG_PRINT:
+       case OBD_IOC_LLOG_CANCEL:
+               rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env, mdt->mdt_child,
                                                           cmd, len, karg);
-                break;
+               break;
        case OBD_IOC_START_LFSCK: {
                struct md_device *next = mdt->mdt_child;
-               struct obd_ioctl_data *data = karg;
                struct lfsck_start_param lsp;
 
-               if (unlikely(data == NULL)) {
-                       rc = -EINVAL;
-                       break;
-               }
-
                lsp.lsp_start = (struct lfsck_start *)(data->ioc_inlbuf1);
                lsp.lsp_index_valid = 0;
                rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0, &lsp);
                break;
        }
        case OBD_IOC_STOP_LFSCK: {
-               struct md_device        *next = mdt->mdt_child;
-               struct obd_ioctl_data   *data = karg;
-               struct lfsck_stop        stop;
+               struct md_device *next = mdt->mdt_child;
+               struct lfsck_stop stop;
 
                stop.ls_status = LS_STOPPED;
                /* Old lfsck utils may pass NULL @stop. */
@@ -6871,24 +7895,24 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                break;
        }
        case OBD_IOC_QUERY_LFSCK: {
-               struct md_device        *next = mdt->mdt_child;
-               struct obd_ioctl_data   *data = karg;
+               struct md_device *next = mdt->mdt_child;
 
                rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0,
                                                 data->ioc_inlbuf1);
                break;
        }
-        case OBD_IOC_GET_OBJ_VERSION: {
-                struct mdt_thread_info *mti;
-                mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
-                memset(mti, 0, sizeof *mti);
-                mti->mti_env = &env;
-                mti->mti_mdt = mdt;
-                mti->mti_exp = exp;
-
-                rc = mdt_ioc_version_get(mti, karg);
-                break;
-        }
+       case OBD_IOC_GET_OBJ_VERSION: {
+               struct mdt_thread_info *mti;
+
+               mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+               memset(mti, 0, sizeof(*mti));
+               mti->mti_env = &env;
+               mti->mti_mdt = mdt;
+               mti->mti_exp = exp;
+
+               rc = mdt_ioc_version_get(mti, karg);
+               break;
+       }
        case OBD_IOC_CATLOGLIST: {
                struct mdt_thread_info *mti;
 
@@ -6897,21 +7921,21 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                rc = llog_catalog_list(&env, mdt->mdt_bottom, 0, karg,
                                       &mti->mti_tmp_fid1);
                break;
-        }
+       }
        default:
-               rc = -EOPNOTSUPP;
-               CERROR("%s: Not supported cmd = %d, rc = %d\n",
-                       mdt_obd_name(mdt), cmd, rc);
+               rc = OBD_IOC_ERROR(obd->obd_name, cmd, "unrecognized", -ENOTTY);
+               break;
        }
-
-        lu_env_fini(&env);
-        RETURN(rc);
+out:
+       lu_env_fini(&env);
+       RETURN(rc);
 }
 
 static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
 {
        struct lu_device *ld = md2lu_dev(mdt->mdt_child);
        int rc;
+
        ENTRY;
 
        if (!mdt->mdt_skip_lfsck && !mdt->mdt_bottom->dd_rdonly) {
@@ -6933,46 +7957,48 @@ static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
 
 static int mdt_obd_postrecov(struct obd_device *obd)
 {
-        struct lu_env env;
-        int rc;
+       struct lu_env env;
+       int rc;
 
-        rc = lu_env_init(&env, LCT_MD_THREAD);
-        if (rc)
-                RETURN(rc);
-        rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev));
-        lu_env_fini(&env);
-        return rc;
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc)
+               RETURN(rc);
+       rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev));
+       lu_env_fini(&env);
+       return rc;
 }
 
 static const struct obd_ops mdt_obd_device_ops = {
-        .o_owner          = THIS_MODULE,
-        .o_set_info_async = mdt_obd_set_info_async,
-        .o_connect        = mdt_obd_connect,
-        .o_reconnect      = mdt_obd_reconnect,
-        .o_disconnect     = mdt_obd_disconnect,
-        .o_init_export    = mdt_init_export,
-        .o_destroy_export = mdt_destroy_export,
-        .o_iocontrol      = mdt_iocontrol,
-        .o_postrecov      = mdt_obd_postrecov,
+       .o_owner          = THIS_MODULE,
+       .o_set_info_async = mdt_obd_set_info_async,
+       .o_connect        = mdt_obd_connect,
+       .o_reconnect      = mdt_obd_reconnect,
+       .o_disconnect     = mdt_obd_disconnect,
+       .o_init_export    = mdt_init_export,
+       .o_destroy_export = mdt_destroy_export,
+       .o_iocontrol      = mdt_iocontrol,
+       .o_postrecov      = mdt_obd_postrecov,
        /* Data-on-MDT IO methods */
        .o_preprw         = mdt_obd_preprw,
        .o_commitrw       = mdt_obd_commitrw,
 };
 
-static struct lu_devicemdt_device_fini(const struct lu_env *env,
-                                         struct lu_device *d)
+static struct lu_device *mdt_device_fini(const struct lu_env *env,
+                                        struct lu_device *d)
 {
-        struct mdt_device *m = mdt_dev(d);
-        ENTRY;
+       struct mdt_device *m = mdt_dev(d);
 
-        mdt_fini(env, m);
-        RETURN(NULL);
+       ENTRY;
+
+       mdt_fini(env, m);
+       RETURN(NULL);
 }
 
 static struct lu_device *mdt_device_free(const struct lu_env *env,
-                                         struct lu_device *d)
+                                        struct lu_device *d)
 {
        struct mdt_device *m = mdt_dev(d);
+
        ENTRY;
 
        lu_device_fini(&m->mdt_lu_dev);
@@ -6982,33 +8008,33 @@ static struct lu_device *mdt_device_free(const struct lu_env *env,
 }
 
 static struct lu_device *mdt_device_alloc(const struct lu_env *env,
-                                          struct lu_device_type *t,
-                                          struct lustre_cfg *cfg)
+                                         struct lu_device_type *t,
+                                         struct lustre_cfg *cfg)
 {
-        struct lu_device  *l;
-        struct mdt_device *m;
+       struct lu_device  *l;
+       struct mdt_device *m;
 
-        OBD_ALLOC_PTR(m);
-        if (m != NULL) {
-                int rc;
+       OBD_ALLOC_PTR(m);
+       if (m != NULL) {
+               int rc;
 
                l = &m->mdt_lu_dev;
-                rc = mdt_init0(env, m, t, cfg);
-                if (rc != 0) {
-                        mdt_device_free(env, l);
-                        l = ERR_PTR(rc);
-                        return l;
-                }
-        } else
-                l = ERR_PTR(-ENOMEM);
-        return l;
+               rc = mdt_init0(env, m, t, cfg);
+               if (rc != 0) {
+                       mdt_device_free(env, l);
+                       l = ERR_PTR(rc);
+                       return l;
+               }
+       } else
+               l = ERR_PTR(-ENOMEM);
+       return l;
 }
 
 /* context key constructor/destructor: mdt_key_init, mdt_key_fini */
 LU_KEY_INIT(mdt, struct mdt_thread_info);
 
 static void mdt_key_fini(const struct lu_context *ctx,
-                        struct lu_context_key *key, voiddata)
+                        struct lu_context_key *key, void *data)
 {
        struct mdt_thread_info *info = data;
 
@@ -7050,14 +8076,14 @@ struct lu_ucred *mdt_ucred_check(const struct mdt_thread_info *info)
  */
 void mdt_enable_cos(struct mdt_device *mdt, bool val)
 {
-        struct lu_env env;
-        int rc;
+       struct lu_env env;
+       int rc;
 
        mdt->mdt_opts.mo_cos = val;
-        rc = lu_env_init(&env, LCT_LOCAL);
+       rc = lu_env_init(&env, LCT_LOCAL);
        if (unlikely(rc != 0)) {
-               CWARN("%s: lu_env initialization failed, cannot "
-                     "sync: rc = %d\n", mdt_obd_name(mdt), rc);
+               CWARN("%s: lu_env initialization failed, cannot sync: rc = %d\n",
+                     mdt_obd_name(mdt), rc);
                return;
        }
        mdt_device_sync(&env, mdt);
@@ -7073,20 +8099,20 @@ void mdt_enable_cos(struct mdt_device *mdt, bool val)
  */
 int mdt_cos_is_enabled(struct mdt_device *mdt)
 {
-        return mdt->mdt_opts.mo_cos != 0;
+       return mdt->mdt_opts.mo_cos != 0;
 }
 
-static struct lu_device_type_operations mdt_device_type_ops = {
-        .ldto_device_alloc = mdt_device_alloc,
-        .ldto_device_free  = mdt_device_free,
-        .ldto_device_fini  = mdt_device_fini
+static const struct lu_device_type_operations mdt_device_type_ops = {
+       .ldto_device_alloc = mdt_device_alloc,
+       .ldto_device_free  = mdt_device_free,
+       .ldto_device_fini  = mdt_device_fini
 };
 
 static struct lu_device_type mdt_device_type = {
-        .ldt_tags     = LU_DEVICE_MD,
-        .ldt_name     = LUSTRE_MDT_NAME,
-        .ldt_ops      = &mdt_device_type_ops,
-        .ldt_ctx_tags = LCT_MD_THREAD
+       .ldt_tags     = LU_DEVICE_MD,
+       .ldt_name     = LUSTRE_MDT_NAME,
+       .ldt_ops      = &mdt_device_type_ops,
+       .ldt_ctx_tags = LCT_MD_THREAD
 };
 
 static int __init mdt_init(void)
@@ -7097,6 +8123,11 @@ static int __init mdt_init(void)
                     FID_NOBRACE_LEN + 1);
        BUILD_BUG_ON(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") !=
                     FID_LEN + 1);
+
+       rc = libcfs_setup();
+       if (rc)
+               return rc;
+
        rc = lu_kmem_init(mdt_caches);
        if (rc)
                return rc;
@@ -7105,7 +8136,7 @@ static int __init mdt_init(void)
        if (rc)
                GOTO(lu_fini, rc);
 
-       rc = class_register_type(&mdt_obd_device_ops, NULL, true, NULL,
+       rc = class_register_type(&mdt_obd_device_ops, NULL, true,
                                 LUSTRE_MDT_NAME, &mdt_device_type);
        if (rc)
                GOTO(mds_fini, rc);