Whamcloud - gitweb
LU-930 misc: limit CDEBUG console message frequency
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 2ce7a68..bf1d0d7 100644 (file)
@@ -58,6 +58,7 @@
 #include <uapi/linux/lustre/lustre_param.h>
 #include <lustre_quota.h>
 #include <lustre_swab.h>
+#include <lustre_lmv.h>
 #include <obd.h>
 #include <obd_support.h>
 #include <lustre_barrier.h>
@@ -163,6 +164,13 @@ void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
        lh->mlh_type = MDT_REG_LOCK;
 }
 
+void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
+{
+       mdt_lock_reg_init(lh, lock->l_req_mode);
+       if (lock->l_req_mode == LCK_GROUP)
+               lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
+}
+
 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
                       const struct lu_name *lname)
 {
@@ -268,6 +276,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
 {
        struct mdt_device *mdt = info->mti_mdt;
        struct lu_name *lname = &info->mti_name;
+       const char *start = fileset;
        char *filename = info->mti_filename;
        struct mdt_object *parent;
        u32 mode;
@@ -282,8 +291,8 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
         */
        *fid = mdt->mdt_md_root_fid;
 
-       while (rc == 0 && fileset != NULL && *fileset != '\0') {
-               const char *s1 = fileset;
+       while (rc == 0 && start != NULL && *start != '\0') {
+               const char *s1 = start;
                const char *s2;
 
                while (*++s1 == '/')
@@ -295,7 +304,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                if (s2 == s1)
                        break;
 
-               fileset = s2;
+               start = s2;
 
                lname->ln_namelen = s2 - s1;
                if (lname->ln_namelen > NAME_MAX) {
@@ -331,9 +340,21 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                        rc = PTR_ERR(parent);
                else {
                        mode = lu_object_attr(&parent->mot_obj);
-                       mdt_object_put(info->mti_env, parent);
-                       if (!S_ISDIR(mode))
+                       if (!S_ISDIR(mode)) {
                                rc = -ENOTDIR;
+                       } else if (mdt_is_remote_object(info, parent, parent)) {
+                               if (!mdt->mdt_enable_remote_subdir_mount) {
+                                       rc = -EREMOTE;
+                                       LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
+                                                     mdt_obd_name(mdt),
+                                                     fileset, rc);
+                               } else {
+                                       LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
+                                                     mdt_obd_name(mdt),
+                                                     fileset);
+                               }
+                       }
+                       mdt_object_put(info->mti_env, parent);
                }
        }
 
@@ -414,6 +435,7 @@ static int mdt_statfs(struct tgt_session_info *tsi)
        struct obd_statfs *osfs;
        struct mdt_body *reqbody = NULL;
        struct mdt_statfs_cache *msf;
+       ktime_t kstart = ktime_get();
        int rc;
 
        ENTRY;
@@ -503,12 +525,65 @@ static int mdt_statfs(struct tgt_session_info *tsi)
                osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
        }
        if (rc == 0)
-               mdt_counter_incr(req, LPROC_MDT_STATFS);
+               mdt_counter_incr(req, LPROC_MDT_STATFS,
+                                ktime_us_delta(ktime_get(), kstart));
 out:
        mdt_thread_info_fini(info);
        RETURN(rc);
 }
 
+__u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
+{
+       struct lov_comp_md_v1 *comp_v1;
+       struct lov_mds_md *v1;
+       __u32 off;
+       __u32 dom_stripesize = 0;
+       int i;
+       bool has_ost_stripes = false;
+
+       ENTRY;
+
+       if (is_dom_only)
+               *is_dom_only = 0;
+
+       if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
+               RETURN(0);
+
+       comp_v1 = (struct lov_comp_md_v1 *)lmm;
+       off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
+       v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+
+       /* Fast check for DoM entry with no mirroring, should be the first */
+       if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
+           lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
+               RETURN(0);
+
+       /* check all entries otherwise */
+       for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
+               struct lov_comp_md_entry_v1 *lcme;
+
+               lcme = &comp_v1->lcm_entries[i];
+               if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
+                       continue;
+
+               off = le32_to_cpu(lcme->lcme_offset);
+               v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+
+               if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
+                   LOV_PATTERN_MDT)
+                       dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
+               else
+                       has_ost_stripes = true;
+
+               if (dom_stripesize && has_ost_stripes)
+                       RETURN(dom_stripesize);
+       }
+       /* DoM-only case exits here */
+       if (is_dom_only && dom_stripesize)
+               *is_dom_only = 1;
+       RETURN(dom_stripesize);
+}
+
 /**
  * Pack size attributes into the reply.
  */
@@ -517,7 +592,7 @@ int mdt_pack_size2body(struct mdt_thread_info *info,
 {
        struct mdt_body *b;
        struct md_attr *ma = &info->mti_attr;
-       int dom_stripe;
+       __u32 dom_stripe;
        bool dom_lock = false;
 
        ENTRY;
@@ -528,9 +603,9 @@ int mdt_pack_size2body(struct mdt_thread_info *info,
            !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
                RETURN(-ENODATA);
 
-       dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm);
+       dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
        /* no DoM stripe, no size in reply */
-       if (dom_stripe == LMM_NO_DOM)
+       if (!dom_stripe)
                RETURN(-ENOENT);
 
        if (lustre_handle_is_used(lh)) {
@@ -555,7 +630,7 @@ int mdt_pack_size2body(struct mdt_thread_info *info,
        RETURN(0);
 }
 
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
 /*
  * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap.
  *
@@ -598,8 +673,9 @@ again:
                            buf->lb_buf != info->mti_big_acl) {
                                if (info->mti_big_acl == NULL) {
                                        info->mti_big_aclsize =
-                                                       MIN(mdt->mdt_max_ea_size,
-                                                           XATTR_SIZE_MAX);
+                                                       min_t(unsigned int,
+                                                             mdt->mdt_max_ea_size,
+                                                             XATTR_SIZE_MAX);
                                        OBD_ALLOC_LARGE(info->mti_big_acl,
                                                        info->mti_big_aclsize);
                                        if (info->mti_big_acl == NULL) {
@@ -726,6 +802,10 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                b->mbo_ctime = attr->la_ctime;
                b->mbo_valid |= OBD_MD_FLCTIME;
        }
+       if (attr->la_valid & LA_BTIME) {
+               b->mbo_btime = attr->la_btime;
+               b->mbo_valid |= OBD_MD_FLBTIME;
+       }
        if (attr->la_valid & LA_FLAGS) {
                b->mbo_flags = attr->la_flags;
                b->mbo_valid |= OBD_MD_FLFLAGS;
@@ -801,10 +881,15 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                        b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
                } else if (info->mti_som_valid) { /* som is valid */
                        b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+               } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
+                       b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
+                       b->mbo_size = ma->ma_som.ms_size;
+                       b->mbo_blocks = ma->ma_som.ms_blocks;
                }
        }
 
-       if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE))
+       if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE ||
+                           b->mbo_valid & OBD_MD_FLLAZYSIZE))
                CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
                       PFID(fid), (unsigned long long)b->mbo_size);
 
@@ -917,8 +1002,8 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(rc);
 }
 
-int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
-                  struct md_attr *ma, const char *name)
+int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                    struct md_attr *ma, const char *name)
 {
        struct md_object *next = mdt_object_child(o);
        struct lu_buf    *buf = &info->mti_buf;
@@ -933,8 +1018,8 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
                buf->lb_len = ma->ma_lmv_size;
                LASSERT(!(ma->ma_valid & MA_LMV));
        } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
-               buf->lb_buf = ma->ma_lmv;
-               buf->lb_len = ma->ma_lmv_size;
+               buf->lb_buf = ma->ma_default_lmv;
+               buf->lb_len = ma->ma_default_lmv_size;
                LASSERT(!(ma->ma_valid & MA_LMV_DEF));
        } else {
                return -EINVAL;
@@ -967,7 +1052,7 @@ got:
                        ma->ma_lmv_size = rc;
                        ma->ma_valid |= MA_LMV;
                } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
-                       ma->ma_lmv_size = rc;
+                       ma->ma_default_lmv_size = rc;
                        ma->ma_valid |= MA_LMV_DEF;
                }
 
@@ -994,6 +1079,40 @@ got:
        return rc;
 }
 
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                  struct md_attr *ma, const char *name)
+{
+       int rc;
+
+       if (!info->mti_big_lmm) {
+               OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
+               if (!info->mti_big_lmm)
+                       return -ENOMEM;
+               info->mti_big_lmmsize = PAGE_SIZE;
+       }
+
+       if (strcmp(name, XATTR_NAME_LOV) == 0) {
+               ma->ma_lmm = info->mti_big_lmm;
+               ma->ma_lmm_size = info->mti_big_lmmsize;
+               ma->ma_valid &= ~MA_LOV;
+       } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
+               ma->ma_lmv = info->mti_big_lmm;
+               ma->ma_lmv_size = info->mti_big_lmmsize;
+               ma->ma_valid &= ~MA_LMV;
+       } else {
+               LBUG();
+       }
+
+       LASSERT(!info->mti_big_lmm_used);
+       rc = __mdt_stripe_get(info, o, ma, name);
+       /* since big_lmm is always used here, clear 'used' flag to avoid
+        * assertion in mdt_big_xattr_get().
+        */
+       info->mti_big_lmm_used = 0;
+
+       return rc;
+}
+
 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
                      struct lu_fid *pfid)
 {
@@ -1041,6 +1160,51 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(0);
 }
 
+int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
+                          struct lu_fid *pfid, struct lu_name *lname)
+{
+       struct lu_buf *buf = &info->mti_buf;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       int reclen;
+       int rc;
+
+       buf->lb_buf = info->mti_xattr_buf;
+       buf->lb_len = sizeof(info->mti_xattr_buf);
+       rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
+                         XATTR_NAME_LINK);
+       if (rc == -ERANGE) {
+               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+               buf->lb_buf = info->mti_big_lmm;
+               buf->lb_len = info->mti_big_lmmsize;
+       }
+       if (rc < 0)
+               return rc;
+
+       if (rc < sizeof(*leh)) {
+               CERROR("short LinkEA on "DFID": rc = %d\n",
+                      PFID(mdt_object_fid(o)), rc);
+               return -ENODATA;
+       }
+
+       leh = (struct link_ea_header *)buf->lb_buf;
+       lee = (struct link_ea_entry *)(leh + 1);
+       if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+               leh->leh_magic = LINK_EA_MAGIC;
+               leh->leh_reccount = __swab32(leh->leh_reccount);
+               leh->leh_len = __swab64(leh->leh_len);
+       }
+       if (leh->leh_magic != LINK_EA_MAGIC)
+               return -EINVAL;
+
+       if (leh->leh_reccount == 0)
+               return -ENODATA;
+
+       linkea_entry_unpack(lee, &reclen, lname, pfid);
+
+       return 0;
+}
+
 int mdt_attr_get_complex(struct mdt_thread_info *info,
                         struct mdt_object *o, struct md_attr *ma)
 {
@@ -1078,19 +1242,19 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
        }
 
        if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
                if (rc)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV && S_ISDIR(mode)) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
                if (rc != 0)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV_DEF && S_ISDIR(mode)) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
                if (rc != 0)
                        GOTO(out, rc);
        }
@@ -1107,8 +1271,8 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
        if (need & MA_HSM && S_ISREG(mode)) {
                buf->lb_buf = info->mti_xattr_buf;
                buf->lb_len = sizeof(info->mti_xattr_buf);
-               CLASSERT(sizeof(struct hsm_attrs) <=
-                        sizeof(info->mti_xattr_buf));
+               BUILD_BUG_ON(sizeof(struct hsm_attrs) >
+                            sizeof(info->mti_xattr_buf));
                rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
                rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
                if (rc2 == 0)
@@ -1117,7 +1281,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
                        GOTO(out, rc = rc2);
        }
 
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
        if (need & MA_ACL_DEF && S_ISDIR(mode)) {
                buf->lb_buf = ma->ma_acl;
                buf->lb_len = ma->ma_acl_size;
@@ -1140,19 +1304,22 @@ out:
 }
 
 static int mdt_getattr_internal(struct mdt_thread_info *info,
-                                struct mdt_object *o, int ma_need)
+                               struct mdt_object *o, int ma_need)
 {
-       struct md_object        *next = mdt_object_child(o);
-       const struct mdt_body   *reqbody = info->mti_body;
-       struct ptlrpc_request   *req = mdt_info_req(info);
-       struct md_attr          *ma = &info->mti_attr;
-       struct lu_attr          *la = &ma->ma_attr;
-       struct req_capsule      *pill = info->mti_pill;
-       const struct lu_env     *env = info->mti_env;
-       struct mdt_body         *repbody;
-       struct lu_buf           *buffer = &info->mti_buf;
-       struct obd_export       *exp = info->mti_exp;
-       int                      rc;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct md_object *next = mdt_object_child(o);
+       const struct mdt_body *reqbody = info->mti_body;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct md_attr *ma = &info->mti_attr;
+       struct lu_attr *la = &ma->ma_attr;
+       struct req_capsule *pill = info->mti_pill;
+       const struct lu_env *env = info->mti_env;
+       struct mdt_body *repbody;
+       struct lu_buf *buffer = &info->mti_buf;
+       struct obd_export *exp = info->mti_exp;
+       ktime_t kstart = ktime_get();
+       int rc;
+
        ENTRY;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
@@ -1188,41 +1355,71 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                       req->rq_export->exp_client_uuid.uuid);
        }
 
-       /* If it is dir object and client require MEA, then we got MEA */
+       /* from 2.12.58 intent_getattr pack default LMV in reply */
        if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
-           (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
+           ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) ==
+                   (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) &&
+           req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD,
+                                 RCL_SERVER)) {
+               ma->ma_lmv = buffer->lb_buf;
+               ma->ma_lmv_size = buffer->lb_len;
+               ma->ma_default_lmv = req_capsule_server_get(pill,
+                                               &RMF_DEFAULT_MDT_MD);
+               ma->ma_default_lmv_size = req_capsule_get_size(pill,
+                                               &RMF_DEFAULT_MDT_MD,
+                                               RCL_SERVER);
+               ma->ma_need = MA_INODE;
+               if (ma->ma_lmv_size > 0)
+                       ma->ma_need |= MA_LMV;
+               if (ma->ma_default_lmv_size > 0)
+                       ma->ma_need |= MA_LMV_DEF;
+       } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
+                  (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
+               /* If it is dir and client require MEA, then we got MEA */
                /* Assumption: MDT_MD size is enough for lmv size. */
                ma->ma_lmv = buffer->lb_buf;
                ma->ma_lmv_size = buffer->lb_len;
                ma->ma_need = MA_INODE;
                if (ma->ma_lmv_size > 0) {
-                       if (reqbody->mbo_valid & OBD_MD_MEA)
+                       if (reqbody->mbo_valid & OBD_MD_MEA) {
                                ma->ma_need |= MA_LMV;
-                       else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA)
+                       } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) {
                                ma->ma_need |= MA_LMV_DEF;
+                               ma->ma_default_lmv = buffer->lb_buf;
+                               ma->ma_lmv = NULL;
+                               ma->ma_default_lmv_size = buffer->lb_len;
+                               ma->ma_lmv_size = 0;
+                       }
                }
        } else {
                ma->ma_lmm = buffer->lb_buf;
                ma->ma_lmm_size = buffer->lb_len;
                ma->ma_need = MA_INODE | MA_HSM;
-               if (ma->ma_lmm_size > 0)
+               if (ma->ma_lmm_size > 0) {
                        ma->ma_need |= MA_LOV;
+                       /* Older clients may crash if they getattr overstriped
+                        * files
+                        */
+                       if (!exp_connect_overstriping(exp) &&
+                           mdt_lmm_is_overstriping(ma->ma_lmm))
+                               RETURN(-EOPNOTSUPP);
+               }
        }
 
-        if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
+       if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
            reqbody->mbo_valid & OBD_MD_FLDIREA  &&
-            lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
-                /* get default stripe info for this dir. */
-                ma->ma_need |= MA_LOV_DEF;
-        }
-        ma->ma_need |= ma_need;
+           lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
+               /* get default stripe info for this dir. */
+               ma->ma_need |= MA_LOV_DEF;
+       }
+       ma->ma_need |= ma_need;
 
        rc = mdt_attr_get_complex(info, o, ma);
        if (unlikely(rc)) {
-               CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR,
-                      "%s: getattr error for "DFID": rc = %d\n",
-                      mdt_obd_name(info->mti_mdt),
-                      PFID(mdt_object_fid(o)), rc);
+               CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
+                            "%s: getattr error for "DFID": rc = %d\n",
+                            mdt_obd_name(info->mti_mdt),
+                            PFID(mdt_object_fid(o)), rc);
                RETURN(rc);
        }
 
@@ -1234,22 +1431,27 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_t_state = MS_RESTORE;
        }
 
-        if (likely(ma->ma_valid & MA_INODE))
-                mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
-        else
-                RETURN(-EFAULT);
+       if (unlikely(!(ma->ma_valid & MA_INODE)))
+               RETURN(-EFAULT);
+
+       mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
 
-        if (mdt_body_has_lov(la, reqbody)) {
-                if (ma->ma_valid & MA_LOV) {
-                        LASSERT(ma->ma_lmm_size);
+       if (mdt_body_has_lov(la, reqbody)) {
+               u32 stripe_count = 1;
+
+               if (ma->ma_valid & MA_LOV) {
+                       LASSERT(ma->ma_lmm_size);
                        repbody->mbo_eadatasize = ma->ma_lmm_size;
                        if (S_ISDIR(la->la_mode))
                                repbody->mbo_valid |= OBD_MD_FLDIREA;
                        else
                                repbody->mbo_valid |= OBD_MD_FLEASIZE;
                        mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
-                }
+               }
                if (ma->ma_valid & MA_LMV) {
+                       struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+                       u32 magic = le32_to_cpu(lmv->lmv_magic);
+
                        /* Return -ENOTSUPP for old client */
                        if (!mdt_is_striped_client(req->rq_export))
                                RETURN(-ENOTSUPP);
@@ -1258,17 +1460,41 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        mdt_dump_lmv(D_INFO, ma->ma_lmv);
                        repbody->mbo_eadatasize = ma->ma_lmv_size;
                        repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
+
+                       stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+                       if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
+                               mdt_restripe_migrate_add(info, o);
+                       else if (magic == LMV_MAGIC_V1 &&
+                                lmv_is_restriping(lmv))
+                               mdt_restripe_update_add(info, o);
                }
                if (ma->ma_valid & MA_LMV_DEF) {
                        /* Return -ENOTSUPP for old client */
                        if (!mdt_is_striped_client(req->rq_export))
                                RETURN(-ENOTSUPP);
                        LASSERT(S_ISDIR(la->la_mode));
-                       mdt_dump_lmv(D_INFO, ma->ma_lmv);
-                       repbody->mbo_eadatasize = ma->ma_lmv_size;
+                       /*
+                        * when ll_dir_getstripe() gets default LMV, it
+                        * checks mbo_eadatasize.
+                        */
+                       if (!(ma->ma_valid & MA_LMV))
+                               repbody->mbo_eadatasize =
+                                       ma->ma_default_lmv_size;
                        repbody->mbo_valid |= (OBD_MD_FLDIREA |
                                               OBD_MD_DEFAULT_MEA);
                }
+               CDEBUG(D_VFSTRACE,
+                      "dirent count %llu stripe count %u MDT count %d\n",
+                      ma->ma_attr.la_dirent_count, stripe_count,
+                      atomic_read(&mdt->mdt_mds_mds_conns) + 1);
+               if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
+                   ma->ma_attr.la_dirent_count >
+                       mdt->mdt_restriper.mdr_dir_split_count &&
+                   !fid_is_root(mdt_object_fid(o)) &&
+                   mdt->mdt_enable_dir_auto_split &&
+                   !o->mot_restriping &&
+                   stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1)
+                       mdt_auto_split_add(info, o);
        } else if (S_ISLNK(la->la_mode) &&
                   reqbody->mbo_valid & OBD_MD_LINKNAME) {
                buffer->lb_buf = ma->ma_lmm;
@@ -1306,8 +1532,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                               print_limit < rc ? "..." : "", print_limit,
                               (char *)ma->ma_lmm + rc - print_limit, rc);
                        rc = 0;
-                }
-        }
+               }
+       }
 
        if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
                repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
@@ -1316,7 +1542,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                       repbody->mbo_max_mdsize);
        }
 
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
        if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
                 (reqbody->mbo_valid & OBD_MD_FLACL)) {
                struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
@@ -1329,10 +1555,11 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 #endif
 
 out:
-        if (rc == 0)
-               mdt_counter_incr(req, LPROC_MDT_GETATTR);
+       if (rc == 0)
+               mdt_counter_incr(req, LPROC_MDT_GETATTR,
+                                ktime_us_delta(ktime_get(), kstart));
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdt_getattr(struct tgt_session_info *tsi)
@@ -1345,9 +1572,11 @@ static int mdt_getattr(struct tgt_session_info *tsi)
         int rc, rc2;
         ENTRY;
 
-        reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
-        LASSERT(reqbody);
-        LASSERT(obj != NULL);
+       if (unlikely(info->mti_object == NULL))
+               RETURN(-EPROTO);
+
+       reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
+       LASSERT(reqbody);
        LASSERT(lu_object_assert_exists(&obj->mot_obj));
 
        /* Special case for Data-on-MDT files to get data version */
@@ -1422,42 +1651,59 @@ out:
 /**
  * Handler of layout intent RPC requiring the layout modification
  *
- * \param[in] info     thread environment
- * \param[in] obj      object
- * \param[in] layout   layout change descriptor
+ * \param[in]  info    thread environment
+ * \param[in]  obj     object
+ * \param[out] lhc     object ldlm lock handle
+ * \param[in]  layout  layout change descriptor
  *
  * \retval 0   on success
  * \retval < 0 error code
  */
 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
+                     struct mdt_lock_handle *lhc,
                      struct md_layout_change *layout)
 {
-       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
        int rc;
+
        ENTRY;
 
        if (!mdt_object_exists(obj))
-               GOTO(out, rc = -ENOENT);
+               RETURN(-ENOENT);
 
        if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
-               GOTO(out, rc = -EINVAL);
+               RETURN(-EINVAL);
 
        rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
                           MAY_WRITE);
        if (rc)
-               GOTO(out, rc);
+               RETURN(rc);
 
-       /* take layout lock to prepare layout change */
-       mdt_lock_reg_init(lh, LCK_EX);
-       rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LAYOUT);
-       if (rc)
-               GOTO(out, rc);
+       rc = mdt_check_resent_lock(info, obj, lhc);
+       if (rc < 0)
+               RETURN(rc);
+
+       if (rc > 0) {
+               /* not resent */
+               __u64 lockpart = MDS_INODELOCK_LAYOUT;
+
+               /* take layout lock to prepare layout change */
+               if (layout->mlc_opc == MD_LAYOUT_WRITE)
+                       lockpart |= MDS_INODELOCK_UPDATE;
+
+               mdt_lock_handle_init(lhc);
+               mdt_lock_reg_init(lhc, LCK_EX);
+               rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
+               if (rc)
+                       RETURN(rc);
+       }
 
        mutex_lock(&obj->mot_som_mutex);
        rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
        mutex_unlock(&obj->mot_som_mutex);
-       mdt_object_unlock(info, obj, lh, 1);
-out:
+
+       if (rc)
+               mdt_object_unlock(info, obj, lhc, 1);
+
        RETURN(rc);
 }
 
@@ -1505,6 +1751,8 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi)
                RETURN(-EOPNOTSUPP);
 
        info = tsi2mdt_info(tsi);
+       if (unlikely(info->mti_object == NULL))
+               RETURN(-EPROTO);
 
        if (info->mti_dlm_req != NULL)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
@@ -1614,19 +1862,18 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                                  __u64 child_bits,
                                  struct ldlm_reply *ldlm_rep)
 {
-       struct ptlrpc_request  *req = mdt_info_req(info);
-       struct mdt_body        *reqbody = NULL;
-       struct mdt_object      *parent = info->mti_object;
-       struct mdt_object      *child;
-       struct lu_fid          *child_fid = &info->mti_tmp_fid1;
-       struct lu_name         *lname = NULL;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct mdt_body *reqbody = NULL;
+       struct mdt_object *parent = info->mti_object;
+       struct mdt_object *child = NULL;
+       struct lu_fid *child_fid = &info->mti_tmp_fid1;
+       struct lu_name *lname = NULL;
        struct mdt_lock_handle *lhp = NULL;
-       struct ldlm_lock       *lock;
+       struct ldlm_lock *lock;
        struct req_capsule *pill = info->mti_pill;
        __u64 try_bits = 0;
        bool is_resent;
        int ma_need = 0;
-       bool deal_with_dom = false;
        int rc;
 
        ENTRY;
@@ -1680,11 +1927,20 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                }
 
                rc = mdt_getattr_internal(info, child, 0);
-               if (unlikely(rc != 0))
+               if (unlikely(rc != 0)) {
                        mdt_object_unlock(info, child, lhc, 1);
+                       RETURN(rc);
+               }
 
-               mdt_pack_secctx_in_reply(info, child);
+               rc = mdt_pack_secctx_in_reply(info, child);
+               if (unlikely(rc)) {
+                       mdt_object_unlock(info, child, lhc, 1);
+                       RETURN(rc);
+               }
 
+               rc = mdt_pack_encctx_in_reply(info, child);
+               if (unlikely(rc))
+                       mdt_object_unlock(info, child, lhc, 1);
                RETURN(rc);
        }
 
@@ -1692,6 +1948,13 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
 
        if (lu_name_is_valid(lname)) {
+               if (mdt_object_remote(parent)) {
+                       CERROR("%s: parent "DFID" is on remote target\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(parent)));
+                       RETURN(-EPROTO);
+               }
+
                CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
                       "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
                       PNAME(lname), ldlm_rep);
@@ -1701,10 +1964,33 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        RETURN(err_serious(-EPROTO));
 
                *child_fid = reqbody->mbo_fid2;
-
                if (unlikely(!fid_is_sane(child_fid)))
                        RETURN(err_serious(-EINVAL));
 
+               if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
+                       mdt_object_get(info->mti_env, parent);
+                       child = parent;
+               } else {
+                       child = mdt_object_find(info->mti_env, info->mti_mdt,
+                                               child_fid);
+                       if (IS_ERR(child))
+                               RETURN(PTR_ERR(child));
+               }
+
+               if (mdt_object_remote(child)) {
+                       CERROR("%s: child "DFID" is on remote target\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(child)));
+                       GOTO(out_child, rc = -EPROTO);
+               }
+
+               /* don't fetch LOOKUP lock if it's remote object */
+               rc = mdt_is_remote_object(info, parent, child);
+               if (rc < 0)
+                       GOTO(out_child, rc);
+               if (rc)
+                       child_bits &= ~MDS_INODELOCK_LOOKUP;
+
                CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
                       "ldlm_rep = %p\n",
                       PFID(mdt_object_fid(parent)),
@@ -1717,14 +2003,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                &parent->mot_obj,
                                "Parent doesn't exist!");
-               RETURN(-ESTALE);
-       }
-
-       if (mdt_object_remote(parent)) {
-               CERROR("%s: parent "DFID" is on remote target\n",
-                      mdt_obd_name(info->mti_mdt),
-                      PFID(mdt_object_fid(parent)));
-               RETURN(-EIO);
+               GOTO(out_child, rc = -ESTALE);
        }
 
        if (lu_name_is_valid(lname)) {
@@ -1758,30 +2037,18 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
 
                if (rc != 0)
-                       GOTO(out_parent, rc);
-       }
-
-       mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+                       GOTO(unlock_parent, rc);
 
-       /*
-        *step 3: find the child object by fid & lock it.
-        *        regardless if it is local or remote.
-        *
-        *Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e)
-        *      set parent dir fid the same as child fid in getattr by fid case
-        *      we should not lu_object_find() the object again, could lead
-        *      to hung if there is a concurrent unlink destroyed the object.
-        */
-       if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
-               mdt_object_get(info->mti_env, parent);
-               child = parent;
-       } else {
                child = mdt_object_find(info->mti_env, info->mti_mdt,
                                        child_fid);
+               if (unlikely(IS_ERR(child)))
+                       GOTO(unlock_parent, rc = PTR_ERR(child));
        }
 
-       if (unlikely(IS_ERR(child)))
-               GOTO(out_parent, rc = PTR_ERR(child));
+       mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+
+       /* step 3: lock child regardless if it is local or remote. */
+       LASSERT(child);
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
        if (!mdt_object_exists(child)) {
@@ -1799,7 +2066,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                mdt_lock_reg_init(lhc, LCK_PR);
 
                if (!(child_bits & MDS_INODELOCK_UPDATE) &&
-                     mdt_object_exists(child) && !mdt_object_remote(child)) {
+                   !mdt_object_remote(child)) {
                        struct md_attr *ma = &info->mti_attr;
 
                        ma->ma_valid = 0;
@@ -1813,12 +2080,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         * lock and this might save us RPC on later STAT. For
                         * directories, it also let negative dentry cache start
                         * working for this dir. */
-                        if (ma->ma_valid & MA_INODE &&
-                            ma->ma_attr.la_valid & LA_CTIME &&
-                            info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
-                               ma->ma_attr.la_ctime < ktime_get_real_seconds())
-                                child_bits |= MDS_INODELOCK_UPDATE;
-                }
+                       if (ma->ma_valid & MA_INODE &&
+                           ma->ma_attr.la_valid & LA_CTIME &&
+                           info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
+                           ma->ma_attr.la_ctime < ktime_get_real_seconds())
+                               child_bits |= MDS_INODELOCK_UPDATE;
+               }
 
                /* layout lock must be granted in a best-effort way
                 * for IT operations */
@@ -1853,14 +2120,27 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        GOTO(out_child, rc);
        }
 
-       lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
-
        /* finally, we can get attr for child. */
        rc = mdt_getattr_internal(info, child, ma_need);
        if (unlikely(rc != 0)) {
                mdt_object_unlock(info, child, lhc, 1);
-               GOTO(out_lock, rc);
-       } else if (lock) {
+               GOTO(out_child, rc);
+       }
+
+       rc = mdt_pack_secctx_in_reply(info, child);
+       if (unlikely(rc)) {
+               mdt_object_unlock(info, child, lhc, 1);
+               GOTO(out_child, rc);
+       }
+
+       rc = mdt_pack_encctx_in_reply(info, child);
+       if (unlikely(rc)) {
+               mdt_object_unlock(info, child, lhc, 1);
+               GOTO(out_child, rc);
+       }
+
+       lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
+       if (lock) {
                /* Debugging code. */
                LDLM_DEBUG(lock, "Returning lock to client");
                LASSERTF(fid_res_name_eq(mdt_object_fid(child),
@@ -1869,77 +2149,286 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         PLDLMRES(lock->l_resource),
                         PFID(mdt_object_fid(child)));
 
-               if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
-                   mdt_object_exists(child) && !mdt_object_remote(child) &&
-                   child != parent)
-                       deal_with_dom = true;
-       }
+               if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
+                   !mdt_object_remote(child) && child != parent) {
+                       mdt_object_put(info->mti_env, child);
+                       rc = mdt_pack_size2body(info, child_fid,
+                                               &lhc->mlh_reg_lh);
+                       if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
+                               /* DOM lock was taken in advance but this is
+                                * not DoM file. Drop the lock.
+                                */
+                               lock_res_and_lock(lock);
+                               ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
+                               unlock_res_and_lock(lock);
+                       }
+                       LDLM_LOCK_PUT(lock);
+                       GOTO(unlock_parent, rc = 0);
+               }
+               LDLM_LOCK_PUT(lock);
+       }
+
+       EXIT;
+out_child:
+       if (child)
+               mdt_object_put(info->mti_env, child);
+unlock_parent:
+       if (lhp)
+               mdt_object_unlock(info, parent, lhp, 1);
+       return rc;
+}
+
+/* normal handler: should release the child lock */
+static int mdt_getattr_name(struct tgt_session_info *tsi)
+{
+       struct mdt_thread_info  *info = tsi2mdt_info(tsi);
+       struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
+       struct mdt_body *reqbody;
+       struct mdt_body *repbody;
+       int rc, rc2;
+
+       ENTRY;
+
+       reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
+       LASSERT(reqbody != NULL);
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       LASSERT(repbody != NULL);
+
+       info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
+       repbody->mbo_eadatasize = 0;
+       repbody->mbo_aclsize = 0;
+
+       rc = mdt_init_ucred(info, reqbody);
+       if (unlikely(rc))
+               GOTO(out_shrink, rc);
+
+       rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
+       if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+               ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
+               lhc->mlh_reg_lh.cookie = 0;
+       }
+       mdt_exit_ucred(info);
+       EXIT;
+out_shrink:
+       mdt_client_compatibility(info);
+       rc2 = mdt_fix_reply(info);
+       if (rc == 0)
+               rc = rc2;
+       mdt_thread_info_fini(info);
+       return rc;
+}
+
+static int mdt_rmfid_unlink(struct mdt_thread_info *info,
+                           const struct lu_fid *pfid,
+                           const struct lu_name *name,
+                           struct mdt_object *obj, s64 ctime)
+{
+       struct lu_fid *child_fid = &info->mti_tmp_fid1;
+       struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+       struct mdt_device *mdt = info->mti_mdt;
+       struct md_attr *ma = &info->mti_attr;
+       struct mdt_lock_handle *parent_lh;
+       struct mdt_lock_handle *child_lh;
+       struct mdt_object *pobj;
+       bool cos_incompat = false;
+       int rc;
+       ENTRY;
+
+       pobj = mdt_object_find(info->mti_env, mdt, pfid);
+       if (IS_ERR(pobj))
+               GOTO(out, rc = PTR_ERR(pobj));
+
+       parent_lh = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_pdo_init(parent_lh, LCK_PW, name);
+       rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
+       if (rc != 0)
+               GOTO(put_parent, rc);
+
+       if (mdt_object_remote(pobj))
+               cos_incompat = true;
+
+       rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
+                       name, child_fid, &info->mti_spec);
+       if (rc != 0)
+               GOTO(unlock_parent, rc);
+
+       if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
+               GOTO(unlock_parent, rc = -EREMCHG);
+
+       child_lh = &info->mti_lh[MDT_LH_CHILD];
+       mdt_lock_reg_init(child_lh, LCK_EX);
+       rc = mdt_reint_striped_lock(info, obj, child_lh,
+                                   MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
+                                   einfo, cos_incompat);
+       if (rc != 0)
+               GOTO(unlock_parent, rc);
+
+       if (atomic_read(&obj->mot_open_count)) {
+               CDEBUG(D_OTHER, "object "DFID" open, skip\n",
+                      PFID(mdt_object_fid(obj)));
+               GOTO(unlock_child, rc = -EBUSY);
+       }
+
+       ma->ma_need = 0;
+       ma->ma_valid = MA_INODE;
+       ma->ma_attr.la_valid = LA_CTIME;
+       ma->ma_attr.la_ctime = ctime;
+
+       mutex_lock(&obj->mot_lov_mutex);
+
+       rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
+                       mdt_object_child(obj), name, ma, 0);
+
+       mutex_unlock(&obj->mot_lov_mutex);
+
+unlock_child:
+       mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
+unlock_parent:
+       mdt_object_unlock(info, pobj, parent_lh, 1);
+put_parent:
+       mdt_object_put(info->mti_env, pobj);
+out:
+       RETURN(rc);
+}
+
+static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
+                                       struct mdt_object *obj)
+{
+       struct lu_ucred *uc = lu_ucred(info->mti_env);
+       struct md_attr *ma = &info->mti_attr;
+       struct lu_attr *la = &ma->ma_attr;
+       int rc = 0;
+       ENTRY;
+
+       ma->ma_need = MA_INODE;
+       rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
+       if (rc)
+               GOTO(out, rc);
+
+       if (la->la_flags & LUSTRE_IMMUTABLE_FL)
+                       rc = -EACCES;
+
+       if (md_capable(uc, CAP_DAC_OVERRIDE))
+               RETURN(0);
+       if (uc->uc_fsuid == la->la_uid) {
+               if ((la->la_mode & S_IWUSR) == 0)
+                       rc = -EACCES;
+       } else if (uc->uc_fsgid == la->la_gid) {
+               if ((la->la_mode & S_IWGRP) == 0)
+                       rc = -EACCES;
+       } else if ((la->la_mode & S_IWOTH) == 0) {
+                       rc = -EACCES;
+       }
+
+out:
+       RETURN(rc);
+}
+
+static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
+                        s64 ctime)
+{
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_object *obj = NULL;
+       struct linkea_data ldata = { NULL };
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct lu_name *name = &info->mti_name;
+       struct lu_fid *pfid = &info->mti_tmp_fid1;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       int reclen, count, rc = 0;
+       ENTRY;
+
+       if (!fid_is_sane(fid))
+               GOTO(out, rc = -EINVAL);
+
+       if (!fid_is_namespace_visible(fid))
+               GOTO(out, rc = -EINVAL);
+
+       obj = mdt_object_find(info->mti_env, mdt, fid);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       if (mdt_object_remote(obj))
+               GOTO(out, rc = -EREMOTE);
+       if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
+               GOTO(out, rc = -ENOENT);
 
-       mdt_pack_secctx_in_reply(info, child);
+       rc = mdt_rmfid_check_permission(info, obj);
+       if (rc)
+               GOTO(out, rc);
 
-out_lock:
-       if (lock)
-               LDLM_LOCK_PUT(lock);
+       /* take LinkEA */
+       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       if (!buf->lb_buf)
+               GOTO(out, rc = -ENOMEM);
 
-       EXIT;
-out_child:
-       mdt_object_put(info->mti_env, child);
-       if (deal_with_dom) {
-               rc = mdt_pack_size2body(info, child_fid,
-                                       &lhc->mlh_reg_lh);
-               if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
-                       /* DOM lock was taken in advance but this is
-                        * not DoM file. Drop the lock.
-                        */
-                       lock_res_and_lock(lock);
-                       ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
-                       unlock_res_and_lock(lock);
-               }
-               rc = 0;
+       ldata.ld_buf = buf;
+       rc = mdt_links_read(info, obj, &ldata);
+       if (rc)
+               GOTO(out, rc);
+
+       leh = buf->lb_buf;
+       lee = (struct link_ea_entry *)(leh + 1);
+       for (count = 0; count < leh->leh_reccount; count++) {
+               /* remove every hardlink */
+               linkea_entry_unpack(lee, &reclen, name, pfid);
+               lee = (struct link_ea_entry *) ((char *)lee + reclen);
+               rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
+               if (rc)
+                       break;
        }
-out_parent:
-       if (lhp)
-               mdt_object_unlock(info, parent, lhp, 1);
-       return rc;
+
+out:
+       if (obj && !IS_ERR(obj))
+               mdt_object_put(info->mti_env, obj);
+       if (info->mti_big_buf.lb_buf)
+               lu_buf_free(&info->mti_big_buf);
+
+       RETURN(rc);
 }
 
-/* normal handler: should release the child lock */
-static int mdt_getattr_name(struct tgt_session_info *tsi)
+static int mdt_rmfid(struct tgt_session_info *tsi)
 {
-       struct mdt_thread_info  *info = tsi2mdt_info(tsi);
-        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
-        struct mdt_body        *reqbody;
-        struct mdt_body        *repbody;
-        int rc, rc2;
-        ENTRY;
-
-        reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(reqbody != NULL);
-        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(repbody != NULL);
+       struct mdt_thread_info *mti = tsi2mdt_info(tsi);
+       struct mdt_body *reqbody;
+       struct lu_fid *fids, *rfids;
+       int bufsize, rc;
+       __u32 *rcs;
+       int i, nr;
+       ENTRY;
 
-       info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
-       repbody->mbo_eadatasize = 0;
-       repbody->mbo_aclsize = 0;
+       reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
+       if (reqbody == NULL)
+               RETURN(-EPROTO);
+       bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
+                                      RCL_CLIENT);
+       nr = bufsize / sizeof(struct lu_fid);
+       if (nr * sizeof(struct lu_fid) != bufsize)
+               RETURN(-EINVAL);
+       req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
+                            RCL_SERVER, nr * sizeof(__u32));
+       req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
+                            RCL_SERVER, nr * sizeof(struct lu_fid));
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
+       fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
+       if (fids == NULL)
+               RETURN(-EPROTO);
+       rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
+       LASSERT(rcs);
+       rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
+       LASSERT(rfids);
 
-        rc = mdt_init_ucred_intent_getattr(info, reqbody);
-        if (unlikely(rc))
-                GOTO(out_shrink, rc);
+       mdt_init_ucred(mti, reqbody);
+       for (i = 0; i < nr; i++) {
+               rfids[i] = fids[i];
+               rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
+       }
+       mdt_exit_ucred(mti);
 
-        rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
-        if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-                ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
-                lhc->mlh_reg_lh.cookie = 0;
-        }
-        mdt_exit_ucred(info);
-        EXIT;
-out_shrink:
-        mdt_client_compatibility(info);
-        rc2 = mdt_fix_reply(info);
-        if (rc == 0)
-                rc = rc2;
-       mdt_thread_info_fini(info);
-       return rc;
+out:
+       RETURN(rc);
 }
 
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
@@ -1995,6 +2484,8 @@ static int mdt_set_info(struct tgt_session_info *tsi)
                        __swab32s(&cs->cs_id);
                }
 
+               if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
+                       RETURN(-EACCES);
                rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
                                   vallen, val, NULL);
        } else if (KEY_IS(KEY_EVICT_BY_NID)) {
@@ -2044,17 +2535,17 @@ static int mdt_readpage(struct tgt_session_info *tsi)
                                exp_max_brw_size(tsi->tsi_exp));
        rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
                          PAGE_SHIFT;
-        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
-        if (rdpg->rp_pages == NULL)
-                RETURN(-ENOMEM);
+       OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
+       if (rdpg->rp_pages == NULL)
+               RETURN(-ENOMEM);
 
-        for (i = 0; i < rdpg->rp_npages; ++i) {
+       for (i = 0; i < rdpg->rp_npages; ++i) {
                rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
-                if (rdpg->rp_pages[i] == NULL)
-                        GOTO(free_rdpg, rc = -ENOMEM);
-        }
+               if (rdpg->rp_pages[i] == NULL)
+                       GOTO(free_rdpg, rc = -ENOMEM);
+       }
 
-        /* call lower layers to fill allocated pages with directory data */
+       /* call lower layers to fill allocated pages with directory data */
        rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
        if (rc < 0)
                GOTO(free_rdpg, rc);
@@ -2068,7 +2559,7 @@ free_rdpg:
        for (i = 0; i < rdpg->rp_npages; i++)
                if (rdpg->rp_pages[i] != NULL)
                        __free_page(rdpg->rp_pages[i]);
-       OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
+       OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
                RETURN(0);
@@ -2111,17 +2602,27 @@ static void mdt_preset_secctx_size(struct mdt_thread_info *info)
            req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
                                  RCL_CLIENT)) {
                if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
-                                        RCL_CLIENT) != 0) {
+                                        RCL_CLIENT) != 0)
                        /* pre-set size in server part with max size */
                        req_capsule_set_size(pill, &RMF_FILE_SECCTX,
                                             RCL_SERVER,
-                                            info->mti_mdt->mdt_max_ea_size);
-               } else {
+                                            OBD_MAX_DEFAULT_EA_SIZE);
+               else
                        req_capsule_set_size(pill, &RMF_FILE_SECCTX,
                                             RCL_SERVER, 0);
-               }
        }
+}
+
+static void mdt_preset_encctx_size(struct mdt_thread_info *info)
+{
+       struct req_capsule *pill = info->mti_pill;
 
+       if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
+                                 RCL_SERVER))
+               /* pre-set size in server part with max size */
+               req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
+                                    RCL_SERVER,
+                                    info->mti_mdt->mdt_max_mdsize);
 }
 
 static int mdt_reint_internal(struct mdt_thread_info *info,
@@ -2163,6 +2664,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                                     LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
 
        mdt_preset_secctx_size(info);
+       mdt_preset_encctx_size(info);
 
        rc = req_capsule_server_pack(pill);
        if (rc != 0) {
@@ -2196,7 +2698,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
        if (rc < 0) {
                GOTO(out_ucred, rc);
        } else if (rc == 1) {
-               DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt.");
+               DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt");
                rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
                GOTO(out_ucred, rc);
        }
@@ -2213,14 +2715,11 @@ out_shrink:
 
        /*
         * Data-on-MDT optimization - read data along with OPEN and return it
-        * in reply. Do that only if we have both DOM and LAYOUT locks.
+        * in reply when possible.
         */
-       if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req) &&
-           info->mti_attr.ma_lmm != NULL &&
-           mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) {
+       if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
                rc = mdt_dom_read_on_open(info, info->mti_mdt,
                                          &lhc->mlh_reg_lh);
-       }
 
        return rc;
 }
@@ -2302,7 +2801,7 @@ int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
 static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
                           struct mdt_object *mo)
 {
-       int rc;
+       int rc = 0;
 
        ENTRY;
 
@@ -2313,7 +2812,16 @@ static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
                RETURN(-ESTALE);
        }
 
-       rc = mo_object_sync(env, mdt_object_child(mo));
+       if (S_ISREG(lu_object_attr(&mo->mot_obj))) {
+               struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt;
+               dt_obj_version_t version;
+
+               version = dt_version_get(env, mdt_obj2dt(mo));
+               if (version > tgt->lut_obd->obd_last_committed)
+                       rc = mo_object_sync(env, mdt_object_child(mo));
+       } else {
+               rc = mo_object_sync(env, mdt_object_child(mo));
+       }
 
        RETURN(rc);
 }
@@ -2323,6 +2831,7 @@ static int mdt_sync(struct tgt_session_info *tsi)
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
        struct req_capsule      *pill = tsi->tsi_pill;
        struct mdt_body         *body;
+       ktime_t                  kstart = ktime_get();
        int                      rc;
 
        ENTRY;
@@ -2335,6 +2844,9 @@ static int mdt_sync(struct tgt_session_info *tsi)
        } else {
                struct mdt_thread_info *info = tsi2mdt_info(tsi);
 
+               if (unlikely(info->mti_object == NULL))
+                       RETURN(-EPROTO);
+
                /* sync an object */
                rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
                                     info->mti_object);
@@ -2356,7 +2868,8 @@ static int mdt_sync(struct tgt_session_info *tsi)
                mdt_thread_info_fini(info);
        }
        if (rc == 0)
-               mdt_counter_incr(req, LPROC_MDT_SYNC);
+               mdt_counter_incr(req, LPROC_MDT_SYNC,
+                                ktime_us_delta(ktime_get(), kstart));
 
        RETURN(rc);
 }
@@ -2415,17 +2928,17 @@ put:
  */
 static int mdt_quotactl(struct tgt_session_info *tsi)
 {
-       struct obd_export       *exp  = tsi->tsi_exp;
-       struct req_capsule      *pill = tsi->tsi_pill;
-       struct obd_quotactl     *oqctl, *repoqc;
-       int                      id, rc;
-       struct mdt_device       *mdt = mdt_exp2dev(exp);
-       struct lu_device        *qmt = mdt->mdt_qmt_dev;
-       struct lu_nodemap       *nodemap;
+       struct obd_export *exp  = tsi->tsi_exp;
+       struct req_capsule *pill = tsi->tsi_pill;
+       struct obd_quotactl *oqctl, *repoqc;
+       int id, rc;
+       struct mdt_device *mdt = mdt_exp2dev(exp);
+       struct lu_device *qmt = mdt->mdt_qmt_dev;
+       struct lu_nodemap *nodemap;
        ENTRY;
 
        oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
-       if (oqctl == NULL)
+       if (!oqctl)
                RETURN(err_serious(-EPROTO));
 
        rc = req_capsule_server_pack(pill);
@@ -2441,20 +2954,28 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case Q_SETINFO:
        case Q_SETQUOTA:
        case LUSTRE_Q_SETDEFAULT:
+       case LUSTRE_Q_SETQUOTAPOOL:
+       case LUSTRE_Q_SETINFOPOOL:
                if (!nodemap_can_setquota(nodemap))
                        GOTO(out_nodemap, rc = -EPERM);
+               /* fallthrough */
        case Q_GETINFO:
        case Q_GETQUOTA:
        case LUSTRE_Q_GETDEFAULT:
+       case LUSTRE_Q_GETQUOTAPOOL:
+       case LUSTRE_Q_GETINFOPOOL:
                if (qmt == NULL)
                        GOTO(out_nodemap, rc = -EOPNOTSUPP);
                /* slave quotactl */
+               /* fallthrough */
        case Q_GETOINFO:
        case Q_GETOQUOTA:
                break;
        default:
-               CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
-               GOTO(out_nodemap, rc = -EFAULT);
+               rc = -EFAULT;
+               CERROR("%s: unsupported quotactl command %d: rc = %d\n",
+                      mdt_obd_name(mdt), oqctl->qc_cmd, rc);
+               GOTO(out_nodemap, rc);
        }
 
        id = oqctl->qc_id;
@@ -2497,6 +3018,10 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case Q_GETQUOTA:
        case LUSTRE_Q_SETDEFAULT:
        case LUSTRE_Q_GETDEFAULT:
+       case LUSTRE_Q_SETQUOTAPOOL:
+       case LUSTRE_Q_GETQUOTAPOOL:
+       case LUSTRE_Q_SETINFOPOOL:
+       case LUSTRE_Q_GETINFOPOOL:
                /* forward quotactl request to QMT */
                rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
                break;
@@ -2516,8 +3041,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        if (oqctl->qc_id != id)
                swap(oqctl->qc_id, id);
 
-       *repoqc = *oqctl;
-
+       QCTL_COPY(repoqc, oqctl);
        EXIT;
 
 out_nodemap:
@@ -2817,10 +3341,9 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 
                        rc = lu_env_init(&env, LCT_MD_THREAD);
                        if (unlikely(rc != 0)) {
-                               CWARN("%s: lu_env initialization failed, object"
-                                     "%p "DFID" is leaked!\n",
+                               CWARN("%s: lu_env initialization failed, object %p "DFID" is leaked!: rc = %d\n",
                                      obd->obd_name, mo,
-                                     PFID(mdt_object_fid(mo)));
+                                     PFID(mdt_object_fid(mo)), rc);
                                RETURN(rc);
                        }
 
@@ -2939,10 +3462,9 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
                                          cache);
 }
 
-static int mdt_object_local_lock(struct mdt_thread_info *info,
-                                struct mdt_object *o,
-                                struct mdt_lock_handle *lh, __u64 *ibits,
-                                __u64 trybits, bool cos_incompat)
+int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
+                         struct mdt_lock_handle *lh, __u64 *ibits,
+                         __u64 trybits, bool cos_incompat)
 {
        struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
        union ldlm_policy_data *policy = &info->mti_policy;
@@ -3026,6 +3548,7 @@ static int mdt_object_local_lock(struct mdt_thread_info *info,
 
        policy->l_inodebits.bits = *ibits;
        policy->l_inodebits.try_bits = trybits;
+       policy->l_inodebits.li_gid = lh->mlh_gid;
 
         /*
          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
@@ -3092,8 +3615,9 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
                 * object anyway XXX*/
                if (lh->mlh_type == MDT_PDO_LOCK &&
                    lh->mlh_pdo_hash != 0) {
-                       CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to"
-                              "EX lock.\n", mdt_obd_name(info->mti_mdt),
+                       CDEBUG(D_INFO,
+                              "%s: "DFID" convert PDO lock to EX lock.\n",
+                              mdt_obd_name(info->mti_mdt),
                               PFID(mdt_object_fid(o)));
                        lh->mlh_pdo_hash = 0;
                        lh->mlh_rreg_mode = LCK_EX;
@@ -3233,20 +3757,18 @@ static void mdt_save_remote_lock(struct mdt_thread_info *info,
 
        if (lustre_handle_is_used(h)) {
                struct ldlm_lock *lock = ldlm_handle2lock(h);
+               struct ptlrpc_request *req = mdt_info_req(info);
 
                if (o != NULL &&
                    (lock->l_policy_data.l_inodebits.bits &
                     (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)))
                        mo_invalidate(info->mti_env, mdt_object_child(o));
 
-               if (decref || !info->mti_has_trans ||
+               if (decref || !info->mti_has_trans || !req ||
                    !(mode & (LCK_PW | LCK_EX))) {
                        ldlm_lock_decref_and_cancel(h, mode);
                        LDLM_LOCK_PUT(lock);
                } else {
-                       struct ptlrpc_request *req = mdt_info_req(info);
-
-                       LASSERT(req != NULL);
                        tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock,
                                          req->rq_transno);
                        ldlm_lock_decref(h, mode);
@@ -3395,6 +3917,7 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
                                             LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
 
                mdt_preset_secctx_size(info);
+               mdt_preset_encctx_size(info);
 
                rc = req_capsule_server_pack(pill);
                if (rc)
@@ -3461,7 +3984,6 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_spec.no_create = 0;
        info->mti_spec.sp_rm_entry = 0;
        info->mti_spec.sp_permitted = 0;
-       info->mti_spec.sp_migrate_close = 0;
 
        info->mti_spec.u.sp_ea.eadata = NULL;
        info->mti_spec.u.sp_ea.eadatalen = 0;
@@ -3481,6 +4003,7 @@ void mdt_thread_info_fini(struct mdt_thread_info *info)
        info->mti_env = NULL;
        info->mti_pill = NULL;
        info->mti_exp = NULL;
+       info->mti_mdt = NULL;
 
        if (unlikely(info->mti_big_buf.lb_buf != NULL))
                lu_buf_free(&info->mti_big_buf);
@@ -3508,10 +4031,8 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi)
 {
        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) &&
            cfs_fail_val ==
-           tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) {
-               set_current_state(TASK_UNINTERRUPTIBLE);
-               schedule_timeout(msecs_to_jiffies(3 * MSEC_PER_SEC));
-       }
+           tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id)
+               schedule_timeout_uninterruptible(cfs_time_seconds(3));
 
        return tgt_connect(tsi);
 }
@@ -3656,7 +4177,7 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info,
         * If the xid matches, then we know this is a resent request, and allow
         * it. (It's probably an OPEN, for which we don't send a lock.
         */
-       if (req_can_reconstruct(req, NULL))
+       if (req_can_reconstruct(req, NULL) != 0)
                return;
 
         /*
@@ -3760,7 +4281,7 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc,
                GOTO(out_shrink, rc = -EINVAL);
        }
 
-       rc = mdt_init_ucred_intent_getattr(info, reqbody);
+       rc = mdt_init_ucred(info, reqbody);
        if (rc)
                GOTO(out_shrink, rc);
 
@@ -3798,13 +4319,16 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
                             struct ldlm_lock **lockp,
                             __u64 flags)
 {
-       struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LAYOUT];
+       struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
        struct md_layout_change layout = { .mlc_opc = MD_LAYOUT_NOP };
        struct layout_intent *intent;
+       struct ldlm_reply *ldlm_rep;
        struct lu_fid *fid = &info->mti_tmp_fid2;
        struct mdt_object *obj = NULL;
        int layout_size = 0;
+       struct lu_buf *buf = &layout.mlc_buf;
        int rc = 0;
+
        ENTRY;
 
        fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
@@ -3832,24 +4356,16 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
        case LAYOUT_INTENT_RESTORE:
                CERROR("%s: Unsupported layout intent opc %d\n",
                       mdt_obd_name(info->mti_mdt), intent->li_opc);
-               rc = -ENOTSUPP;
-               break;
+               RETURN(-ENOTSUPP);
        default:
                CERROR("%s: Unknown layout intent opc %d\n",
                       mdt_obd_name(info->mti_mdt), intent->li_opc);
-               rc = -EINVAL;
-               break;
+               RETURN(-EINVAL);
        }
-       if (rc < 0)
-               RETURN(rc);
-
-       /* Get lock from request for possible resent case. */
-       mdt_intent_fixup_resent(info, *lockp, lhc, flags);
 
        obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
        if (IS_ERR(obj))
-               GOTO(out, rc = PTR_ERR(obj));
-
+               RETURN(PTR_ERR(obj));
 
        if (mdt_object_exists(obj) && !mdt_object_remote(obj)) {
                /* if layout is going to be changed don't use the current EA
@@ -3861,7 +4377,7 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
                } else {
                        layout_size = mdt_attr_get_eabuf_size(info, obj);
                        if (layout_size < 0)
-                               GOTO(out_obj, rc = layout_size);
+                               GOTO(out, rc = layout_size);
 
                        if (layout_size > info->mti_mdt->mdt_max_mdsize)
                                info->mti_mdt->mdt_max_mdsize = layout_size;
@@ -3874,72 +4390,68 @@ static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
         * set reply buffer size, so that ldlm_handle_enqueue0()->
         * ldlm_lvbo_fill() will fill the reply buffer with lovea.
         */
-       (*lockp)->l_lvb_type = LVB_T_LAYOUT;
        req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
                             layout_size);
        rc = req_capsule_server_pack(info->mti_pill);
        if (rc)
-               GOTO(out_obj, rc);
+               GOTO(out, rc);
 
+       ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+       if (!ldlm_rep)
+               GOTO(out, rc = -EPROTO);
 
-       if (layout.mlc_opc != MD_LAYOUT_NOP) {
-               struct lu_buf *buf = &layout.mlc_buf;
+       mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
 
-               /**
-                * mdt_layout_change is a reint operation, when the request
-                * is resent, layout write shouldn't reprocess it again.
-                */
-               rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
-               if (rc)
-                       GOTO(out_obj, rc = rc < 0 ? rc : 0);
+       /* take lock in ldlm_lock_enqueue() for LAYOUT_INTENT_ACCESS */
+       if (layout.mlc_opc == MD_LAYOUT_NOP)
+               GOTO(out, rc = 0);
 
-               /**
-                * There is another resent case: the client's job has been
-                * done by another client, referring lod_declare_layout_change
-                * -EALREADY case, and it became a operation w/o transaction,
-                * so we should not do the layout change, otherwise
-                * mdt_layout_change() will try to cancel the granted server
-                * CR lock whose remote counterpart is still in hold on the
-                * client, and a deadlock ensues.
-                */
-               rc = mdt_check_resent_lock(info, obj, lhc);
-               if (rc <= 0)
-                       GOTO(out_obj, rc);
-
-               buf->lb_buf = NULL;
-               buf->lb_len = 0;
-               if (unlikely(req_is_replay(mdt_info_req(info)))) {
-                       buf->lb_buf = req_capsule_client_get(info->mti_pill,
-                                       &RMF_EADATA);
-                       buf->lb_len = req_capsule_get_size(info->mti_pill,
-                                       &RMF_EADATA, RCL_CLIENT);
-                       /*
-                        * If it's a replay of layout write intent RPC, the
-                        * client has saved the extended lovea when
-                        * it get reply then.
-                        */
-                       if (buf->lb_len > 0)
-                               mdt_fix_lov_magic(info, buf->lb_buf);
-               }
+       rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
+       if (rc < 0)
+               GOTO(out, rc);
+       if (rc == 1) {
+               DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt.");
+               rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
+               GOTO(out, rc);
+       }
+
+       buf->lb_buf = NULL;
+       buf->lb_len = 0;
+       if (unlikely(req_is_replay(mdt_info_req(info)))) {
+               buf->lb_buf = req_capsule_client_get(info->mti_pill,
+                                                    &RMF_EADATA);
+               buf->lb_len = req_capsule_get_size(info->mti_pill,
+                                                    &RMF_EADATA, RCL_CLIENT);
                /*
-                * Instantiate some layout components, if @buf contains
-                * lovea, then it's a replay of the layout intent write
-                * RPC.
+                * If it's a replay of layout write intent RPC, the client has
+                * saved the extended lovea when it get reply then.
                 */
-               rc = mdt_layout_change(info, obj, &layout);
-               if (rc)
-                       GOTO(out_obj, rc);
+               if (buf->lb_len > 0)
+                       mdt_fix_lov_magic(info, buf->lb_buf);
        }
-out_obj:
-       mdt_object_put(info->mti_env, obj);
 
-       if (rc == 0 && lustre_handle_is_used(&lhc->mlh_reg_lh))
+       /* Get lock from request for possible resent case. */
+       mdt_intent_fixup_resent(info, *lockp, lhc, flags);
+       (*lockp)->l_lvb_type = LVB_T_LAYOUT;
+
+       /*
+        * Instantiate some layout components, if @buf contains lovea, then it's
+        * a replay of the layout intent write RPC.
+        */
+       rc = mdt_layout_change(info, obj, lhc, &layout);
+       ldlm_rep->lock_policy_res2 = clear_serious(rc);
+
+       if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
+               if (rc == ELDLM_LOCK_REPLACED &&
+                   (*lockp)->l_granted_mode == LCK_EX)
+                       ldlm_lock_mode_downgrade(*lockp, LCK_CR);
+       }
 
+       EXIT;
 out:
-       lhc->mlh_reg_lh.cookie = 0;
-
-       RETURN(rc);
+       mdt_object_put(info->mti_env, obj);
+       return rc;
 }
 
 static int mdt_intent_open(enum ldlm_intent_flags it_opc,
@@ -3951,6 +4463,7 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc,
         struct ldlm_reply      *rep = NULL;
         long                    opc;
         int                     rc;
+       struct ptlrpc_request  *req = mdt_info_req(info);
 
         static const struct req_format *intent_fmts[REINT_MAX] = {
                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
@@ -3968,6 +4481,9 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc,
 
         rc = mdt_reint_internal(info, lhc, opc);
 
+       if (rc < 0 && lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+               DEBUG_REQ(D_ERROR, req, "Replay open failed with %d", rc);
+
        /* Check whether the reply has been packed successfully. */
        if (mdt_info_req(info)->rq_repmsg != NULL)
                rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
@@ -4030,6 +4546,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
                          u64);
        enum tgt_handler_flags it_handler_flags = 0;
        struct ldlm_reply *rep;
+       bool check_mdt_object = false;
        int rc;
        ENTRY;
 
@@ -4046,12 +4563,15 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
                it_handler = &mdt_intent_open;
                break;
        case IT_GETATTR:
+               check_mdt_object = true;
+               /* fallthrough */
        case IT_LOOKUP:
                it_format = &RQF_LDLM_INTENT_GETATTR;
                it_handler = &mdt_intent_getattr;
                it_handler_flags = HAS_REPLY;
                break;
        case IT_GETXATTR:
+               check_mdt_object = true;
                it_format = &RQF_LDLM_INTENT_GETXATTR;
                it_handler = &mdt_intent_getxattr;
                it_handler_flags = HAS_BODY;
@@ -4097,6 +4617,9 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
        if (rc < 0)
                RETURN(rc);
 
+       if (unlikely(info->mti_object == NULL && check_mdt_object))
+               RETURN(-EPROTO);
+
        if (it_handler_flags & IS_MUTABLE && mdt_rdonly(req->rq_export))
                RETURN(-EROFS);
 
@@ -4173,6 +4696,23 @@ static int mdt_intent_policy(const struct lu_env *env,
                } else {
                        rc = err_serious(-EFAULT);
                }
+       } else if (ldesc->l_resource.lr_type == LDLM_IBITS &&
+                  ldesc->l_policy_data.l_inodebits.bits == MDS_INODELOCK_DOM) {
+               struct ldlm_reply *rep;
+
+               /* No intent was provided but INTENT flag is set along with
+                * DOM bit, this is considered as GLIMPSE request.
+                * This logic is common for MDT and OST glimpse
+                */
+               mdt_ptlrpc_stats_update(req, IT_GLIMPSE);
+               rc = mdt_glimpse_enqueue(info, ns, lockp, flags);
+               /* Check whether the reply has been packed successfully. */
+               if (req->rq_repmsg != NULL) {
+                       rep = req_capsule_server_get(info->mti_pill,
+                                                    &RMF_DLM_REP);
+                       rep->lock_policy_res2 =
+                               ptlrpc_status_hton(rep->lock_policy_res2);
+               }
        } else {
                /* No intent was provided */
                req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
@@ -4305,9 +4845,8 @@ out_free:
  */
 static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt)
 {
-       struct seq_server_site  *ss = mdt_seq_site(mdt);
-       int                     rc;
-       char                    *prefix;
+       struct seq_server_site *ss = mdt_seq_site(mdt);
+       char *prefix;
        ENTRY;
 
        /* check if this is adding the first MDC and controller is not yet
@@ -4325,19 +4864,12 @@ static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt)
 
        /* Note: seq_client_fini will be called in seq_site_fini */
        snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt));
-       rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
-                            prefix, ss->ss_node_id == 0 ?  ss->ss_control_seq :
+       seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
+                       prefix, ss->ss_node_id == 0 ?  ss->ss_control_seq :
                                                            NULL);
        OBD_FREE(prefix, MAX_OBD_NAME + 5);
-       if (rc != 0) {
-               OBD_FREE_PTR(ss->ss_client_seq);
-               ss->ss_client_seq = NULL;
-               RETURN(rc);
-       }
-
-       rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq);
 
-       RETURN(rc);
+       RETURN(seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq));
 }
 
 static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt)
@@ -4790,9 +5322,12 @@ static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt,
        mdt->mdt_qmt_dev = obd->obd_lu_dev;
 
        /* configure local quota objects */
-       rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env,
-                                                  &mdt->mdt_lu_dev,
-                                                  mdt->mdt_qmt_dev);
+       if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_INIT))
+               rc = -EBADF;
+       else
+               rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env,
+                                                          &mdt->mdt_lu_dev,
+                                                          mdt->mdt_qmt_dev);
        if (rc)
                GOTO(class_cleanup, rc);
 
@@ -4812,6 +5347,7 @@ class_cleanup:
        if (rc) {
                class_manual_cleanup(obd);
                mdt->mdt_qmt_dev = NULL;
+               GOTO(lcfg_cleanup, rc);
        }
 class_detach:
        if (rc)
@@ -4856,12 +5392,25 @@ static int mdt_tgt_getxattr(struct tgt_session_info *tsi)
        struct mdt_thread_info  *info = tsi2mdt_info(tsi);
        int                      rc;
 
+       if (unlikely(info->mti_object == NULL))
+               return -EPROTO;
+
        rc = mdt_getxattr(info);
 
        mdt_thread_info_fini(info);
        return rc;
 }
 
+static int mdt_llog_open(struct tgt_session_info *tsi)
+{
+       ENTRY;
+
+       if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
+               RETURN(err_serious(-EACCES));
+
+       RETURN(tgt_llog_open(tsi));
+}
+
 #define OBD_FAIL_OST_READ_NET  OBD_FAIL_OST_BRW_NET
 #define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET
 #define OST_BRW_READ   OST_READ
@@ -4876,7 +5425,7 @@ TGT_RPC_HANDLER(MDS_FIRST_OPC,
                &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION),
 TGT_RPC_HANDLER(MDS_FIRST_OPC,
                HAS_REPLY,              MDS_SET_INFO,   mdt_set_info,
-               &RQF_OBD_SET_INFO, LUSTRE_MDS_VERSION),
+               &RQF_MDT_SET_INFO, LUSTRE_MDS_VERSION),
 TGT_MDT_HDL(0,                         MDS_GET_INFO,   mdt_get_info),
 TGT_MDT_HDL(HAS_REPLY,         MDS_GET_ROOT,   mdt_get_root),
 TGT_MDT_HDL(HAS_BODY,          MDS_GETATTR,    mdt_getattr),
@@ -4905,6 +5454,7 @@ TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_REQUEST,
 TGT_MDT_HDL(HAS_KEY | HAS_BODY | HAS_REPLY | IS_MUTABLE,
            MDS_SWAP_LAYOUTS,
            mdt_swap_layouts),
+TGT_MDT_HDL(IS_MUTABLE,                MDS_RMFID,      mdt_rmfid),
 };
 
 static struct tgt_handler mdt_io_ops[] = {
@@ -4916,6 +5466,7 @@ TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE,
                                         OST_PUNCH,     mdt_punch_hdl,
                                                        mdt_hp_punch),
 TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC,    mdt_data_sync),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SEEK, tgt_lseek),
 };
 
 static struct tgt_handler mdt_sec_ctx_ops[] = {
@@ -4928,6 +5479,13 @@ static struct tgt_handler mdt_quota_ops[] = {
 TGT_QUOTA_HDL(HAS_REPLY,               QUOTA_DQACQ,      mdt_quota_dqacq),
 };
 
+static struct tgt_handler mdt_llog_handlers[] = {
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CREATE,      mdt_llog_open),
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_NEXT_BLOCK,  tgt_llog_next_block),
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
+};
+
 static struct tgt_opc_slice mdt_common_slice[] = {
        {
                .tos_opc_start  = MDS_FIRST_OPC,
@@ -4972,7 +5530,7 @@ static struct tgt_opc_slice mdt_common_slice[] = {
        {
                .tos_opc_start  = LLOG_FIRST_OPC,
                .tos_opc_end    = LLOG_LAST_OPC,
-               .tos_hs         = tgt_llog_handlers
+               .tos_hs         = mdt_llog_handlers
        },
        {
                .tos_opc_start  = LFSCK_FIRST_OPC,
@@ -5002,11 +5560,13 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
 
        mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
+
+       mdt_restriper_stop(m);
        ping_evictor_stop();
 
        /* Remove the HSM /proc entry so the coordinator cannot be
-        * restarted by a user while it's shutting down. */
-       hsm_cdt_procfs_fini(m);
+        * restarted by a user while it's shutting down.
+        */
        mdt_hsm_cdt_stop(m);
 
        mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT);
@@ -5026,7 +5586,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        /* Calling the cleanup functions in the same order as in the mdt_init0
         * error path
         */
-       mdt_procfs_fini(m);
+       mdt_tunables_fini(m);
 
        target_recovery_fini(obd);
        upcall_cache_cleanup(m->mdt_identity_cache);
@@ -5121,30 +5681,39 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                RETURN(-EFAULT);
        } else {
                lsi = s2lsi(lmi->lmi_sb);
+               LASSERT(lsi->lsi_lmd);
                /* CMD is supported only in IAM mode */
                LASSERT(num);
-               node_id = simple_strtol(num, NULL, 10);
+               rc = kstrtol(num, 10, &node_id);
+               if (rc)
+                       RETURN(rc);
+
                obd->u.obt.obt_magic = OBT_MAGIC;
-               if (lsi->lsi_lmd != NULL &&
-                   lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
+               if (lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
                        m->mdt_skip_lfsck = 1;
        }
 
-       /* DoM files get IO lock at open by default */
-       m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN;
+       /* Just try to get a DoM lock by default. Otherwise, having a group
+        * lock granted, it may get blocked for a long time. */
+       m->mdt_opts.mo_dom_lock = TRYLOCK_DOM_ON_OPEN;
        /* DoM files are read at open and data is packed in the reply */
        m->mdt_opts.mo_dom_read_open = 1;
 
        m->mdt_squash.rsi_uid = 0;
        m->mdt_squash.rsi_gid = 0;
        INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids);
-       init_rwsem(&m->mdt_squash.rsi_sem);
+       spin_lock_init(&m->mdt_squash.rsi_lock);
        spin_lock_init(&m->mdt_lock);
        m->mdt_enable_remote_dir = 1;
        m->mdt_enable_striped_dir = 1;
        m->mdt_enable_dir_migration = 1;
+       m->mdt_enable_dir_restripe = 0;
+       m->mdt_enable_dir_auto_split = 0;
        m->mdt_enable_remote_dir_gid = 0;
+       m->mdt_enable_chprojid_gid = 0;
        m->mdt_enable_remote_rename = 1;
+       m->mdt_dir_restripe_nsonly = 1;
+       m->mdt_enable_remote_subdir_mount = 1;
 
        atomic_set(&m->mdt_mds_mds_conns, 0);
        atomic_set(&m->mdt_async_commit_count, 0);
@@ -5200,8 +5769,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                                              LDLM_NAMESPACE_SERVER,
                                              LDLM_NAMESPACE_GREEDY,
                                              LDLM_NS_TYPE_MDT);
-       if (m->mdt_namespace == NULL)
-               GOTO(err_fini_seq, rc = -ENOMEM);
+       if (IS_ERR(m->mdt_namespace)) {
+               rc = PTR_ERR(m->mdt_namespace);
+               CERROR("%s: unable to create server namespace: rc = %d\n",
+                      obd->obd_name, rc);
+               m->mdt_namespace = NULL;
+               GOTO(err_fini_seq, rc);
+       }
 
        m->mdt_namespace->ns_lvbp = m;
        m->mdt_namespace->ns_lvbo = &mdt_lvbo;
@@ -5217,8 +5791,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                GOTO(err_free_ns, rc);
 
        /* Amount of available space excluded from granting and reserved
-        * for metadata. It is in percentage and 50% is default value. */
-       tgd->tgd_reserved_pcnt = 50;
+        * for metadata. It is a percentage of the total MDT size. */
+       tgd->tgd_reserved_pcnt = 10;
 
        if (ONE_MB_BRW_SIZE < (1U << tgd->tgd_blockbits))
                m->mdt_brw_size = 1U << tgd->tgd_blockbits;
@@ -5276,7 +5850,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                GOTO(err_free_hsm, rc);
        }
 
-       rc = mdt_procfs_init(m, dev);
+       rc = mdt_tunables_init(m, dev);
        if (rc) {
                CERROR("Can't init MDT lprocfs, rc %d\n", rc);
                GOTO(err_recovery, rc);
@@ -5302,11 +5876,20 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
                ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT;
 
+       if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_LOCAL_RECOV))
+               m->mdt_lut.lut_local_recovery = 1;
+
+       rc = mdt_restriper_start(m);
+       if (rc)
+               GOTO(err_ping_evictor, rc);
+
        RETURN(0);
+
+err_ping_evictor:
+       ping_evictor_stop();
 err_procfs:
-       mdt_procfs_fini(m);
+       mdt_tunables_fini(m);
 err_recovery:
-       target_recovery_fini(obd);
        upcall_cache_cleanup(m->mdt_identity_cache);
        m->mdt_identity_cache = NULL;
 err_free_hsm:
@@ -5317,6 +5900,11 @@ err_los_fini:
 err_fs_cleanup:
        mdt_fs_cleanup(env, m);
 err_tgt:
+       /* keep recoverable clients */
+       obd->obd_fail = 1;
+       target_recovery_fini(obd);
+       obd_exports_barrier(obd);
+       obd_zombie_barrier();
        tgt_fini(env, &m->mdt_lut);
 err_free_ns:
        ldlm_namespace_free(m->mdt_namespace, NULL, 0);
@@ -5358,12 +5946,12 @@ static int mdt_process_config(const struct lu_env *env,
 
        switch (cfg->lcfg_command) {
        case LCFG_PARAM: {
-               struct obd_device          *obd = d->ld_obd;
-
+               struct obd_device *obd = d->ld_obd;
                /* For interoperability */
-               struct cfg_interop_param   *ptr = NULL;
-               struct lustre_cfg          *old_cfg = NULL;
-               char                       *param = NULL;
+               struct cfg_interop_param *ptr = NULL;
+               struct lustre_cfg *old_cfg = NULL;
+               char *param = NULL;
+               ssize_t count;
 
                param = lustre_cfg_string(cfg, 1);
                if (param == NULL) {
@@ -5392,17 +5980,22 @@ static int mdt_process_config(const struct lu_env *env,
                        }
                }
 
-               rc = class_process_proc_param(PARAM_MDT, obd->obd_vars,
-                                             cfg, obd);
-               if (rc > 0 || rc == -ENOSYS) {
+               count = class_modify_config(cfg, PARAM_MDT,
+                                           &obd->obd_kset.kobj);
+               if (count < 0) {
+                       struct coordinator *cdt = &m->mdt_coordinator;
+
                        /* is it an HSM var ? */
-                       rc = class_process_proc_param(PARAM_HSM,
-                                                     hsm_cdt_get_proc_vars(),
-                                                     cfg, obd);
-                       if (rc > 0 || rc == -ENOSYS)
+                       count = class_modify_config(cfg, PARAM_HSM,
+                                                   &cdt->cdt_hsm_kobj);
+                       if (count < 0)
                                /* we don't understand; pass it on */
                                rc = next->ld_ops->ldo_process_config(env, next,
                                                                      cfg);
+                       else
+                               rc = count > 0 ? 0 : count;
+               } else {
+                       rc = count > 0 ? 0 : count;
                }
 
                if (old_cfg)
@@ -5443,6 +6036,8 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                init_rwsem(&mo->mot_dom_sem);
                init_rwsem(&mo->mot_open_sem);
                atomic_set(&mo->mot_open_count, 0);
+               mo->mot_restripe_offset = 0;
+               INIT_LIST_HEAD(&mo->mot_restripe_linkage);
                RETURN(o);
        }
        RETURN(NULL);
@@ -5470,22 +6065,31 @@ static int mdt_object_init(const struct lu_env *env, struct lu_object *o,
         RETURN(rc);
 }
 
+static void mdt_object_free_rcu(struct rcu_head *head)
+{
+       struct mdt_object *mo = container_of(head, struct mdt_object,
+                                            mot_header.loh_rcu);
+
+       kmem_cache_free(mdt_object_kmem, mo);
+}
+
 static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
 {
-        struct mdt_object *mo = mdt_obj(o);
-        struct lu_object_header *h;
-        ENTRY;
+       struct mdt_object *mo = mdt_obj(o);
+       struct lu_object_header *h;
+       ENTRY;
 
-        h = o->lo_header;
-        CDEBUG(D_INFO, "object free, fid = "DFID"\n",
-               PFID(lu_object_fid(o)));
+       h = o->lo_header;
+       CDEBUG(D_INFO, "object free, fid = "DFID"\n",
+              PFID(lu_object_fid(o)));
 
        LASSERT(atomic_read(&mo->mot_open_count) == 0);
        LASSERT(atomic_read(&mo->mot_lease_count) == 0);
 
        lu_object_fini(o);
        lu_object_header_fini(h);
-       OBD_SLAB_FREE_PTR(mo, mdt_object_kmem);
+       OBD_FREE_PRE(mo, sizeof(*mo), "slab-freed");
+       call_rcu(&mo->mot_header.loh_rcu, mdt_object_free_rcu);
 
        EXIT;
 }
@@ -5584,6 +6188,18 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
        RETURN(0);
 }
 
+static inline void mdt_enable_slc(struct mdt_device *mdt)
+{
+       if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER)
+               mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING;
+}
+
+static inline void mdt_disable_slc(struct mdt_device *mdt)
+{
+       if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING)
+               mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER;
+}
+
 /**
  * Match client and server connection feature flags.
  *
@@ -5720,11 +6336,7 @@ static int mdt_connect_internal(const struct lu_env *env,
        if (OCD_HAS_FLAG(data, CKSUM)) {
                __u32 cksum_types = data->ocd_cksum_types;
 
-               /* The client set in ocd_cksum_types the checksum types it
-                * supports. We have to mask off the algorithms that we don't
-                * support */
-               data->ocd_cksum_types &=
-                       obd_cksum_types_supported_server(obd_name);
+               tgt_mask_cksum_types(&mdt->mdt_lut, &data->ocd_cksum_types);
 
                if (unlikely(data->ocd_cksum_types == 0)) {
                        CERROR("%s: Connect with checksum support but no "
@@ -5744,6 +6356,15 @@ static int mdt_connect_internal(const struct lu_env *env,
                       exp->exp_obd->obd_name, obd_export_nid2str(exp));
        }
 
+       if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+           !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
+               atomic_inc(&mdt->mdt_mds_mds_conns);
+               mdt_enable_slc(mdt);
+       }
+
+       if (!mdt->mdt_lut.lut_dt_conf.ddp_has_lseek_data_hole)
+               data->ocd_connect_flags2 &= ~OBD_CONNECT2_LSEEK;
+
        return 0;
 }
 
@@ -5774,7 +6395,7 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env,
 
 static int mdt_export_cleanup(struct obd_export *exp)
 {
-       struct list_head         closing_list;
+       LIST_HEAD(closing_list);
        struct mdt_export_data  *med = &exp->exp_mdt_data;
        struct obd_device       *obd = exp->exp_obd;
        struct mdt_device       *mdt;
@@ -5784,7 +6405,6 @@ static int mdt_export_cleanup(struct obd_export *exp)
        int rc = 0;
        ENTRY;
 
-       INIT_LIST_HEAD(&closing_list);
        spin_lock(&med->med_open_lock);
        while (!list_empty(&med->med_open_head)) {
                struct list_head *tmp = med->med_open_head.next;
@@ -5841,31 +6461,20 @@ static int mdt_export_cleanup(struct obd_export *exp)
                                ma->ma_valid = MA_FLAGS;
                                ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
                        }
-                        mdt_mfd_close(info, mfd);
-                }
-        }
-        info->mti_mdt = NULL;
-        /* cleanup client slot early */
-        /* Do not erase record for recoverable client. */
-        if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
+                       ma->ma_valid |= MA_FORCE_LOG;
+                       mdt_mfd_close(info, mfd);
+               }
+       }
+       info->mti_mdt = NULL;
+       /* cleanup client slot early */
+       /* Do not erase record for recoverable client. */
+       if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
                tgt_client_del(&env, exp);
         lu_env_fini(&env);
 
         RETURN(rc);
 }
 
-static inline void mdt_enable_slc(struct mdt_device *mdt)
-{
-       if (mdt->mdt_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL)
-               mdt->mdt_lut.lut_sync_lock_cancel = BLOCKING_SYNC_ON_CANCEL;
-}
-
-static inline void mdt_disable_slc(struct mdt_device *mdt)
-{
-       if (mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL)
-               mdt->mdt_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
-}
-
 static int mdt_obd_disconnect(struct obd_export *exp)
 {
        int rc;
@@ -5920,12 +6529,6 @@ static int mdt_obd_connect(const struct lu_env *env,
 
        mdt = mdt_dev(obd->obd_lu_dev);
 
-       if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
-           !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
-               atomic_inc(&mdt->mdt_mds_mds_conns);
-               mdt_enable_slc(mdt);
-       }
-
        /*
         * first, check whether the stack is ready to handle requests
         * XXX: probably not very appropriate method is used now
@@ -6022,6 +6625,11 @@ static int mdt_init_export(struct obd_export *exp)
        exp->exp_connecting = 1;
        spin_unlock(&exp->exp_lock);
 
+       OBD_ALLOC(exp->exp_used_slots,
+                 BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+       if (exp->exp_used_slots == NULL)
+               RETURN(-ENOMEM);
+
         /* self-export doesn't need client data and ldlm initialization */
         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
                                      &exp->exp_client_uuid)))
@@ -6040,6 +6648,10 @@ static int mdt_init_export(struct obd_export *exp)
 err_free:
        tgt_client_free(exp);
 err:
+       OBD_FREE(exp->exp_used_slots,
+                BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+       exp->exp_used_slots = NULL;
+
        CERROR("%s: Failed to initialize export: rc = %d\n",
               exp->exp_obd->obd_name, rc);
        return rc;
@@ -6050,6 +6662,10 @@ static int mdt_destroy_export(struct obd_export *exp)
         ENTRY;
 
         target_destroy_export(exp);
+       if (exp->exp_used_slots)
+               OBD_FREE(exp->exp_used_slots,
+                        BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+
         /* destroy can be called from failed obd_setup, so
          * checking uuid is safer than obd_self_export */
         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
@@ -6125,18 +6741,20 @@ static int mdt_path_current(struct mdt_thread_info *info,
                            struct getinfo_fid2path *fp,
                            struct lu_fid *root_fid)
 {
-       struct mdt_device       *mdt = info->mti_mdt;
-       struct mdt_object       *mdt_obj;
-       struct link_ea_header   *leh;
-       struct link_ea_entry    *lee;
-       struct lu_name          *tmpname = &info->mti_name;
-       struct lu_fid           *tmpfid = &info->mti_tmp_fid1;
-       struct lu_buf           *buf = &info->mti_big_buf;
-       char                    *ptr;
-       int                     reclen;
-       struct linkea_data      ldata = { NULL };
-       int                     rc = 0;
-       bool                    first = true;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct lu_name *tmpname = &info->mti_name;
+       struct lu_fid *tmpfid = &info->mti_tmp_fid1;
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct md_attr *ma = &info->mti_attr;
+       struct linkea_data ldata = { NULL };
+       bool first = true;
+       struct mdt_object *mdt_obj;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       char *ptr;
+       int reclen;
+       int rc = 0;
+
        ENTRY;
 
        /* temp buffer for path element, the buffer will be finally freed
@@ -6152,8 +6770,6 @@ static int mdt_path_current(struct mdt_thread_info *info,
        *tmpfid = fp->gf_fid = *mdt_object_fid(obj);
 
        while (!lu_fid_eq(root_fid, &fp->gf_fid)) {
-               struct lu_buf           lmv_buf;
-
                if (!lu_fid_eq(root_fid, &mdt->mdt_md_root_fid) &&
                    lu_fid_eq(&mdt->mdt_md_root_fid, &fp->gf_fid))
                        GOTO(out, rc = -ENOENT);
@@ -6201,26 +6817,25 @@ static int mdt_path_current(struct mdt_thread_info *info,
                                fp->gf_linkno++;
                }
 
-               lmv_buf.lb_buf = info->mti_xattr_buf;
-               lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
                /* Check if it is slave stripes */
-               rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
-                                 &lmv_buf, XATTR_NAME_LMV);
+               rc = mdt_stripe_get(info, mdt_obj, ma, XATTR_NAME_LMV);
                mdt_object_put(info->mti_env, mdt_obj);
-               if (rc > 0) {
-                       union lmv_mds_md *lmm = lmv_buf.lb_buf;
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               if (ma->ma_valid & MA_LMV) {
+                       struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+
+                       if (!lmv_is_sane2(lmv))
+                               GOTO(out, rc = -EBADF);
 
                        /* For slave stripes, get its master */
-                       if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) {
+                       if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE) {
                                fp->gf_fid = *tmpfid;
                                continue;
                        }
-               } else if (rc < 0 && rc != -ENODATA) {
-                       GOTO(out, rc);
                }
 
-               rc = 0;
-
                /* Pack the name in the end of the buffer */
                ptr -= tmpname->ln_namelen;
                if (ptr - 1 <= fp->gf_u.gf_path)
@@ -6236,6 +6851,9 @@ static int mdt_path_current(struct mdt_thread_info *info,
                first = false;
        }
 
+       /* non-zero will be treated as an error */
+       rc = 0;
+
 remote_out:
        ptr++; /* skip leading / */
        memmove(fp->gf_u.gf_path, ptr,
@@ -6510,12 +7128,20 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                if (rc == 0)
                        rc = dt_ro(&env, dt);
                break;
-       case OBD_IOC_ABORT_RECOVERY:
+       case OBD_IOC_ABORT_RECOVERY: {
+               struct obd_ioctl_data *data = karg;
+
                CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt));
-               obd->obd_abort_recovery = 1;
+               if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT)
+                       obd->obd_abort_recov_mdt = 1;
+               else /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */
+                       /* lctl didn't set OBD_FLG_ABORT_RECOV_OST < 2.13.57 */
+                       obd->obd_abort_recovery = 1;
+
                target_stop_recovery_thread(obd);
                rc = 0;
                break;
+       }
         case OBD_IOC_CHANGELOG_REG:
         case OBD_IOC_CHANGELOG_DEREG:
         case OBD_IOC_CHANGELOG_CLEAR:
@@ -6628,7 +7254,7 @@ static int mdt_obd_postrecov(struct obd_device *obd)
         return rc;
 }
 
-static struct obd_ops mdt_obd_device_ops = {
+static const struct obd_ops mdt_obd_device_ops = {
         .o_owner          = THIS_MODULE,
         .o_set_info_async = mdt_obd_set_info_async,
         .o_connect        = mdt_obd_connect,
@@ -6760,10 +7386,10 @@ int mdt_cos_is_enabled(struct mdt_device *mdt)
         return mdt->mdt_opts.mo_cos != 0;
 }
 
-static struct lu_device_type_operations mdt_device_type_ops = {
-        .ldto_device_alloc = mdt_device_alloc,
-        .ldto_device_free  = mdt_device_free,
-        .ldto_device_fini  = mdt_device_fini
+static const struct lu_device_type_operations mdt_device_type_ops = {
+       .ldto_device_alloc = mdt_device_alloc,
+       .ldto_device_free  = mdt_device_free,
+       .ldto_device_fini  = mdt_device_fini
 };
 
 static struct lu_device_type mdt_device_type = {
@@ -6777,10 +7403,10 @@ static int __init mdt_init(void)
 {
        int rc;
 
-       CLASSERT(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") ==
-                FID_NOBRACE_LEN + 1);
-       CLASSERT(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") ==
-                FID_LEN + 1);
+       BUILD_BUG_ON(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") !=
+                    FID_NOBRACE_LEN + 1);
+       BUILD_BUG_ON(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") !=
+                    FID_LEN + 1);
        rc = lu_kmem_init(mdt_caches);
        if (rc)
                return rc;
@@ -6789,7 +7415,7 @@ static int __init mdt_init(void)
        if (rc)
                GOTO(lu_fini, rc);
 
-       rc = class_register_type(&mdt_obd_device_ops, NULL, true, NULL,
+       rc = class_register_type(&mdt_obd_device_ops, NULL, true,
                                 LUSTRE_MDT_NAME, &mdt_device_type);
        if (rc)
                GOTO(mds_fini, rc);