Whamcloud - gitweb
LU-4684 mdt: improve directory stripe lock 25/31425/9
authorLai Siyao <lai.siyao@intel.com>
Sun, 21 Jan 2018 05:22:24 +0000 (13:22 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 29 May 2018 04:53:28 +0000 (04:53 +0000)
Striped directory has an implication that the first stripe is
local, and others are remote, but this is not true for migrating
directory because its stripes consists of both the original and
the newly created stripes.

This patch also put striped directory master object locking and
stripes locking into one function called mdt_reint_striped_lock(),
which simplifies locking code.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: I4724447e5b10c301b6799e1827f6d13a40876945
Reviewed-on: https://review.whamcloud.com/31425
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_dlm.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/lod/lod_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_reint.c

index 1a7c900..ae5a599 100644 (file)
@@ -942,6 +942,15 @@ struct ldlm_lock {
  *  which is for server. */
 #define l_slc_link l_rk_ast
 
+#define HANDLE_MAP_SIZE  ((LMV_MAX_STRIPE_COUNT + 7) >> 3)
+
+struct lustre_handle_array {
+       unsigned int            ha_count;
+       /* ha_map is used as bit flag to indicate handle is remote or local */
+       char                    ha_map[HANDLE_MAP_SIZE];
+       struct lustre_handle    ha_handles[0];
+};
+
 /**
  * LDLM resource description.
  * Basically, resource is a representation for a single object.
@@ -1141,6 +1150,7 @@ struct ldlm_enqueue_info {
        void            *ei_cb_gl;      /** lock glimpse callback */
        void            *ei_cbdata;     /** Data to be passed into callbacks. */
        void            *ei_namespace;  /** lock namespace **/
+       u64             ei_inodebits;   /** lock inode bits **/
        unsigned int    ei_enq_slave:1; /** whether enqueue slave stripes */
 };
 
index c9b467e..8ddc060 100644 (file)
@@ -564,11 +564,6 @@ static inline void lustre_handle_copy(struct lustre_handle *tgt,
        tgt->cookie = src->cookie;
 }
 
-struct lustre_handle_array {
-       unsigned int            count;
-       struct lustre_handle    handles[0];
-};
-
 /* lustre_msg struct magic.  DON'T use swabbed values of MAGIC as magic! */
 enum lustre_msg_magic {
        LUSTRE_MSG_MAGIC_V2             = 0x0BD00BD3,
index 1fac134..7bcc4f2 100644 (file)
@@ -1982,9 +1982,17 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                       idx, i, PFID(&fid));
                idx_array[i] = idx;
                /* Set the start index for next stripe allocation */
-               if (!is_specific && i < stripe_count - 1)
+               if (!is_specific && i < stripe_count - 1) {
+                       /*
+                        * for large dir test, put all other slaves on one
+                        * remote MDT, otherwise we may save too many local
+                        * slave locks which will exceed RS_MAX_LOCKS.
+                        */
+                       if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)))
+                               idx = master_index;
                        idx_array[i + 1] = (idx + 1) %
                                           (lod->lod_remote_mdt_count + 1);
+               }
                /* tgt_dt and fid must be ready after search avaible OSP
                 * in the above loop */
                LASSERT(tgt_dt != NULL);
@@ -5159,42 +5167,6 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
 }
 
 /**
- * Release LDLM locks on the stripes of a striped directory.
- *
- * Iterates over all the locks taken on the stripe objects and
- * cancel them.
- *
- * \param[in] env      execution environment
- * \param[in] dt       striped object
- * \param[in] einfo    lock description
- * \param[in] policy   data describing requested lock
- *
- * \retval             0 on success
- * \retval             negative if failed
- */
-static int lod_object_unlock_internal(const struct lu_env *env,
-                                     struct dt_object *dt,
-                                     struct ldlm_enqueue_info *einfo,
-                                     union ldlm_policy_data *policy)
-{
-       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
-       int                     rc = 0;
-       int                     i;
-       ENTRY;
-
-       if (slave_locks == NULL)
-               RETURN(0);
-
-       for (i = 1; i < slave_locks->count; i++) {
-               if (lustre_handle_is_used(&slave_locks->handles[i]))
-                       ldlm_lock_decref_and_cancel(&slave_locks->handles[i],
-                                                   einfo->ei_mode);
-       }
-
-       RETURN(rc);
-}
-
-/**
  * Implementation of dt_object_operations::do_object_unlock.
  *
  * Used to release LDLM lock(s).
@@ -5222,13 +5194,18 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
        LASSERT(!dt_object_remote(dt_object_child(dt)));
 
        /* locks were unlocked in MDT layer */
-       for (i = 1; i < slave_locks->count; i++) {
-               LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
+       for (i = 0; i < slave_locks->ha_count; i++)
+               LASSERT(!lustre_handle_is_used(&slave_locks->ha_handles[i]));
+
+       /*
+        * NB, ha_count may not equal to ldo_dir_stripe_count, because dir
+        * layout may change, e.g., shrink dir layout after migration.
+        */
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++)
                dt_invalidate(env, lo->ldo_stripe[i]);
-       }
 
-       slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
-                          sizeof(slave_locks->handles[0]);
+       slave_locks_size = offsetof(typeof(*slave_locks),
+                                   ha_handles[slave_locks->ha_count]);
        OBD_FREE(slave_locks, slave_locks_size);
        einfo->ei_cbdata = NULL;
 
@@ -5249,11 +5226,11 @@ static int lod_object_lock(const struct lu_env *env,
                           struct ldlm_enqueue_info *einfo,
                           union ldlm_policy_data *policy)
 {
-       struct lod_object       *lo = lod_dt_obj(dt);
-       int                     rc = 0;
-       int                     i;
-       int                     slave_locks_size;
+       struct lod_object *lo = lod_dt_obj(dt);
+       int slave_locks_size;
        struct lustre_handle_array *slave_locks = NULL;
+       int i;
+       int rc;
        ENTRY;
 
        /* remote object lock */
@@ -5264,35 +5241,28 @@ static int lod_object_lock(const struct lu_env *env,
        }
 
        if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
-               GOTO(out, rc = -ENOTDIR);
+               RETURN(-ENOTDIR);
 
        rc = lod_load_striping(env, lo);
        if (rc != 0)
-               GOTO(out, rc);
+               RETURN(rc);
 
        /* No stripes */
-       if (lo->ldo_dir_stripe_count <= 1) {
-               /*
-                * NB, ei_cbdata stores pointer to slave locks, if no locks
-                * taken, make sure it's set to NULL, otherwise MDT will try to
-                * unlock them.
-                */
-               einfo->ei_cbdata = NULL;
-               GOTO(out, rc = 0);
-       }
+       if (lo->ldo_dir_stripe_count <= 1)
+               RETURN(0);
 
-       slave_locks_size = sizeof(*slave_locks) + lo->ldo_dir_stripe_count *
-                          sizeof(slave_locks->handles[0]);
+       slave_locks_size = offsetof(typeof(*slave_locks),
+                                   ha_handles[lo->ldo_dir_stripe_count]);
        /* Freed in lod_object_unlock */
        OBD_ALLOC(slave_locks, slave_locks_size);
-       if (slave_locks == NULL)
-               GOTO(out, rc = -ENOMEM);
-       slave_locks->count = lo->ldo_dir_stripe_count;
+       if (!slave_locks)
+               RETURN(-ENOMEM);
+       slave_locks->ha_count = lo->ldo_dir_stripe_count;
 
        /* striped directory lock */
-       for (i = 1; i < lo->ldo_dir_stripe_count; i++) {
-               struct lustre_handle    lockh;
-               struct ldlm_res_id      *res_id;
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+               struct lustre_handle lockh;
+               struct ldlm_res_id *res_id;
 
                res_id = &lod_env_info(env)->lti_res_id;
                fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
@@ -5300,23 +5270,20 @@ static int lod_object_lock(const struct lu_env *env,
                einfo->ei_res_id = res_id;
 
                LASSERT(lo->ldo_stripe[i] != NULL);
-               if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+               if (dt_object_remote(lo->ldo_stripe[i])) {
+                       set_bit(i, (void *)slave_locks->ha_map);
                        rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
                                            einfo, policy);
                } else {
                        struct ldlm_namespace *ns = einfo->ei_namespace;
                        ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
                        ldlm_completion_callback completion = einfo->ei_cb_cp;
-                       __u64   dlmflags = LDLM_FL_ATOMIC_CB;
+                       __u64 dlmflags = LDLM_FL_ATOMIC_CB;
 
                        if (einfo->ei_mode == LCK_PW ||
                            einfo->ei_mode == LCK_EX)
                                dlmflags |= LDLM_FL_COS_INCOMPAT;
 
-                       /* This only happens if there are mulitple stripes
-                        * on the master MDT, i.e. except stripe0, there are
-                        * other stripes on the Master MDT as well, Only
-                        * happens in the test case right now. */
                        LASSERT(ns != NULL);
                        rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
                                                    policy, einfo->ei_mode,
@@ -5325,21 +5292,19 @@ static int lod_object_lock(const struct lu_env *env,
                                                    NULL, 0, LVB_T_NONE,
                                                    NULL, &lockh);
                }
-               if (rc != 0)
-                       break;
-               slave_locks->handles[i] = lockh;
+               if (rc) {
+                       while (i--)
+                               ldlm_lock_decref_and_cancel(
+                                               &slave_locks->ha_handles[i],
+                                               einfo->ei_mode);
+                       OBD_FREE(slave_locks, slave_locks_size);
+                       RETURN(rc);
+               }
+               slave_locks->ha_handles[i] = lockh;
        }
        einfo->ei_cbdata = slave_locks;
 
-       if (rc != 0 && slave_locks != NULL) {
-               lod_object_unlock_internal(env, dt, einfo, policy);
-               OBD_FREE(slave_locks, slave_locks_size);
-       }
-       EXIT;
-out:
-       if (rc != 0)
-               einfo->ei_cbdata = NULL;
-       RETURN(rc);
+       RETURN(0);
 }
 
 /**
index 80d5460..fee96ac 100644 (file)
@@ -2766,7 +2766,7 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
                               struct lustre_handle *lh, enum ldlm_mode mode,
                               __u64 *ibits, __u64 trybits, bool cache)
 {
-       struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
+       struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo;
        union ldlm_policy_data *policy = &mti->mti_policy;
        struct ldlm_res_id *res_id = &mti->mti_res_id;
        int rc = 0;
@@ -2793,17 +2793,14 @@ int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
                einfo->ei_cbdata = o;
        }
 
-
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = *ibits;
        policy->l_inodebits.try_bits = trybits;
 
        rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
                            policy);
-       if (rc < 0 && cache) {
+       if (rc < 0 && cache)
                mdt_object_put(mti->mti_env, o);
-               einfo->ei_cbdata = NULL;
-       }
 
        /* Return successfully acquired bits to a caller */
        if (rc == 0) {
index 2cf7219..3a29c90 100644 (file)
@@ -472,6 +472,8 @@ struct mdt_thread_info {
        /* should be enough to fit lustre_mdt_attrs */
        char                       mti_xattr_buf[128];
        struct ldlm_enqueue_info   mti_einfo;
+       /* einfo used by mdt_remote_object_lock_try() */
+       struct ldlm_enqueue_info   mti_remote_einfo;
        struct tg_reply_data      *mti_reply_data;
 
        struct lustre_som_attrs    mti_som;
index f307bd6..602072d 100644 (file)
@@ -290,82 +290,49 @@ static int mdt_remote_permission(struct mdt_thread_info *info)
 }
 
 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
-                            struct mdt_object *obj, __u64 ibits,
-                            struct mdt_lock_handle *s0_lh,
-                            struct mdt_object *s0_obj,
+                            struct mdt_object *obj,
                             struct ldlm_enqueue_info *einfo,
                             int decref)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
+       struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
        struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
        int i;
-       int rc;
-       ENTRY;
-
-       if (!S_ISDIR(obj->mot_header.loh_attr))
-               RETURN(0);
 
-       /* Unlock stripe 0 */
-       if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) {
-               LASSERT(s0_obj != NULL);
-               mdt_object_unlock_put(mti, s0_obj, s0_lh, decref);
-       }
+       LASSERT(S_ISDIR(obj->mot_header.loh_attr));
+       LASSERT(slave_locks);
 
        memset(policy, 0, sizeof(*policy));
-       policy->l_inodebits.bits = ibits;
-
-       if (slave_locks != NULL) {
-               LASSERT(s0_lh != NULL);
-               for (i = 1; i < slave_locks->count; i++) {
-                       /* borrow s0_lh temporarily to do mdt unlock */
-                       mdt_lock_reg_init(s0_lh, einfo->ei_mode);
-                       s0_lh->mlh_rreg_lh = slave_locks->handles[i];
-                       mdt_object_unlock(mti, NULL, s0_lh, decref);
-                       slave_locks->handles[i].cookie = 0ull;
-               }
+       policy->l_inodebits.bits = einfo->ei_inodebits;
+       mdt_lock_handle_init(lh);
+       mdt_lock_reg_init(lh, einfo->ei_mode);
+       for (i = 0; i < slave_locks->ha_count; i++) {
+               if (test_bit(i, (void *)slave_locks->ha_map))
+                       lh->mlh_rreg_lh = slave_locks->ha_handles[i];
+               else
+                       lh->mlh_reg_lh = slave_locks->ha_handles[i];
+               mdt_object_unlock(mti, NULL, lh, decref);
+               slave_locks->ha_handles[i].cookie = 0ull;
        }
 
-       rc = mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
-                             policy);
-       RETURN(rc);
+       return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
+                               policy);
 }
 
-static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
-                          struct lu_fid *fid)
+static inline int mdt_object_striped(struct mdt_thread_info *mti,
+                                    struct mdt_object *obj)
 {
-       struct lu_buf *buf = &mti->mti_buf;
-       struct lmv_mds_md_v1 *lmv;
        int rc;
-       ENTRY;
 
        if (!S_ISDIR(obj->mot_header.loh_attr))
-               RETURN(0);
+               return 0;
 
-       buf->lb_buf = mti->mti_xattr_buf;
-       buf->lb_len = sizeof(mti->mti_xattr_buf);
-       rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), buf,
+       rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), &LU_BUF_NULL,
                          XATTR_NAME_LMV);
-       if (rc == -ERANGE) {
-               rc = mdt_big_xattr_get(mti, obj, XATTR_NAME_LMV);
-               if (rc > 0) {
-                       buf->lb_buf = mti->mti_big_lmm;
-                       buf->lb_len = mti->mti_big_lmmsize;
-               }
-       }
-
-       if (rc == -ENODATA || rc == -ENOENT)
-               RETURN(0);
-
        if (rc <= 0)
-               RETURN(rc);
-
-       lmv = buf->lb_buf;
-       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
-               RETURN(-EINVAL);
+               return rc == -ENODATA ? 0 : rc;
 
-       fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
-
-       RETURN(rc);
+       return 1;
 }
 
 /**
@@ -374,40 +341,12 @@ static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
  **/
 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
                           enum ldlm_mode mode, __u64 ibits,
-                          struct lu_fid *s0_fid,
-                          struct mdt_lock_handle *s0_lh,
-                          struct mdt_object **s0_objp,
                           struct ldlm_enqueue_info *einfo)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
-       int rc;
-       ENTRY;
-
-       memset(einfo, 0, sizeof(*einfo));
-
-       rc = mdt_init_slaves(mti, obj, s0_fid);
-       if (rc <= 0)
-               RETURN(rc);
 
        LASSERT(S_ISDIR(obj->mot_header.loh_attr));
 
-       if (!lu_fid_eq(s0_fid, mdt_object_fid(obj))) {
-               /* Except migrating object, whose 0_stripe and master
-                * object are the same object, 0_stripe and master
-                * object are different, though they are in the same
-                * MDT, to avoid adding osd_object_lock here, so we
-                * will enqueue the stripe0 lock in MDT0 for now */
-               *s0_objp = mdt_object_find(mti->mti_env, mti->mti_mdt, s0_fid);
-               if (IS_ERR(*s0_objp))
-                       RETURN(PTR_ERR(*s0_objp));
-
-               rc = mdt_reint_object_lock(mti, *s0_objp, s0_lh, ibits, true);
-               if (rc < 0) {
-                       mdt_object_put(mti->mti_env, *s0_objp);
-                       RETURN(rc);
-               }
-       }
-
        einfo->ei_type = LDLM_IBITS;
        einfo->ei_mode = mode;
        einfo->ei_cb_bl = mdt_remote_blocking_ast;
@@ -415,12 +354,56 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
        einfo->ei_cb_cp = ldlm_completion_ast;
        einfo->ei_enq_slave = 1;
        einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
+       einfo->ei_inodebits = ibits;
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = ibits;
 
-       rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
-                           policy);
-       RETURN(rc);
+       return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
+                             policy);
+}
+
+static inline int mdt_reint_striped_lock(struct mdt_thread_info *info,
+                                        struct mdt_object *o,
+                                        struct mdt_lock_handle *lh,
+                                        __u64 ibits,
+                                        struct ldlm_enqueue_info *einfo,
+                                        bool cos_incompat)
+{
+       int rc;
+
+       LASSERT(!mdt_object_remote(o));
+
+       memset(einfo, 0, sizeof(*einfo));
+
+       rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
+       if (rc)
+               return rc;
+
+       rc = mdt_object_striped(info, o);
+       if (rc != 1) {
+               if (rc < 0)
+                       mdt_object_unlock(info, o, lh, rc);
+               return rc;
+       }
+
+       rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
+       if (rc) {
+               mdt_object_unlock(info, o, lh, rc);
+               if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
+                       rc = 0;
+       }
+
+       return rc;
+}
+
+static inline void
+mdt_reint_striped_unlock(struct mdt_thread_info *info, struct mdt_object *o,
+                        struct mdt_lock_handle *lh,
+                        struct ldlm_enqueue_info *einfo, int decref)
+{
+       if (einfo->ei_cbdata)
+               mdt_unlock_slaves(info, o, einfo, decref);
+       mdt_object_unlock(info, o, lh, decref);
 }
 
 /*
@@ -536,15 +519,15 @@ static int mdt_create(struct mdt_thread_info *info)
         */
        if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
                struct mdt_lock_handle *lhc;
-               struct mdt_lock_handle *s0_lh;
-               struct mdt_object *s0_obj = NULL;
-               struct ldlm_enqueue_info *einfo;
-               struct lu_fid *s0_fid = &info->mti_tmp_fid1;
-               bool cos_incompat = false;
-
-               rc = mdt_init_slaves(info, child, s0_fid);
-               if (rc > 0) {
-                       cos_incompat = true;
+               struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+               bool cos_incompat;
+
+               rc = mdt_object_striped(info, child);
+               if (rc < 0)
+                       GOTO(put_child, rc);
+
+               cos_incompat = rc;
+               if (cos_incompat) {
                        if (!mdt_object_remote(parent)) {
                                mdt_object_unlock(info, parent, lh, 1);
                                mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
@@ -556,26 +539,16 @@ static int mdt_create(struct mdt_thread_info *info)
                        }
                }
 
-               einfo = &info->mti_einfo;
                lhc = &info->mti_lh[MDT_LH_CHILD];
                mdt_lock_handle_init(lhc);
                mdt_lock_reg_init(lhc, LCK_PW);
-               rc = mdt_reint_object_lock(info, child, lhc,
-                                          MDS_INODELOCK_UPDATE,
-                                          cos_incompat);
+               rc = mdt_reint_striped_lock(info, child, lhc,
+                                           MDS_INODELOCK_UPDATE, einfo,
+                                           cos_incompat);
                if (rc)
                        GOTO(put_child, rc);
-               mdt_object_unlock(info, child, lhc, rc);
-
-               s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-               mdt_lock_handle_init(s0_lh);
-               mdt_lock_reg_init(s0_lh, LCK_PW);
-               rc = mdt_lock_slaves(info, child, LCK_PW, MDS_INODELOCK_UPDATE,
-                                    s0_fid, s0_lh, &s0_obj, einfo);
-               mdt_unlock_slaves(info, child, MDS_INODELOCK_UPDATE, s0_lh,
-                                 s0_obj, einfo, rc);
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && rc == -EIO)
-                       rc = 0;
+
+               mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
        }
 
        /* Return fid & attr to client. */
@@ -599,16 +572,15 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
                        (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
        __u64 lockpart = MDS_INODELOCK_UPDATE;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo;
-       struct lu_fid *s0_fid = &info->mti_tmp_fid1;
-       struct mdt_lock_handle *s0_lh = NULL;
-       struct mdt_object *s0_obj = NULL;
-       bool cos_incompat = false;
+       bool cos_incompat;
        int rc;
        ENTRY;
 
-       rc = mdt_init_slaves(info, mo, s0_fid);
-       if (rc > 0)
-               cos_incompat = true;
+       rc = mdt_object_striped(info, mo);
+       if (rc < 0)
+               RETURN(rc);
+
+       cos_incompat = rc;
 
        lh = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_reg_init(lh, LCK_PW);
@@ -620,17 +592,11 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
                lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
 
-       rc = mdt_reint_object_lock(info, mo, lh, lockpart, cos_incompat);
+       rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
+                                   cos_incompat);
        if (rc != 0)
                RETURN(rc);
 
-       s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-       mdt_lock_reg_init(s0_lh, LCK_PW);
-       rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_fid, s0_lh, &s0_obj,
-                            einfo);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
-
        /* all attrs are packed into mti_attr in unpack_setattr */
        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                       OBD_FAIL_MDS_REINT_SETATTR_WRITE);
@@ -659,8 +625,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        mdt_dom_obj_lvb_update(info->mti_env, mo, false);
        EXIT;
 out_unlock:
-       mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc);
-       mdt_object_unlock(info, mo, lh, rc);
+       mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
        return rc;
 }
 
@@ -872,9 +837,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        struct mdt_lock_handle *parent_lh;
        struct mdt_lock_handle *child_lh;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo;
-       struct lu_fid *s0_fid = &info->mti_tmp_fid2;
-       struct mdt_lock_handle *s0_lh = NULL;
-       struct mdt_object *s0_obj = NULL;
        __u64 lock_ibits;
        bool cos_incompat = false;
        int no_name = 0;
@@ -950,11 +912,17 @@ relock:
        if (IS_ERR(mc))
                GOTO(unlock_parent, rc = PTR_ERR(mc));
 
-       if (!cos_incompat && mdt_init_slaves(info, mc, s0_fid) > 0) {
-               cos_incompat = true;
-               mdt_object_put(info->mti_env, mc);
-               mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
-               goto relock;
+       if (!cos_incompat) {
+               rc = mdt_object_striped(info, mc);
+               if (rc < 0)
+                       GOTO(unlock_parent, rc = PTR_ERR(mc));
+
+               cos_incompat = rc;
+               if (cos_incompat) {
+                       mdt_object_put(info->mti_env, mc);
+                       mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
+                       goto relock;
+               }
        }
 
        child_lh = &info->mti_lh[MDT_LH_CHILD];
@@ -1021,10 +989,10 @@ relock:
                lock_ibits &= ~MDS_INODELOCK_LOOKUP;
        }
 
-       rc = mdt_reint_object_lock(info, mc, child_lh, lock_ibits,
-                                  cos_incompat);
+       rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
+                                   cos_incompat);
        if (rc != 0)
-               GOTO(unlock_child, rc);
+               GOTO(put_child, rc);
 
        /*
         * Now we can only make sure we need MA_INODE, in mdd layer, will check
@@ -1033,13 +1001,6 @@ relock:
        ma->ma_need = MA_INODE;
        ma->ma_valid = 0;
 
-       s0_lh = &info->mti_lh[MDT_LH_LOCAL];
-       mdt_lock_reg_init(s0_lh, LCK_EX);
-       rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_fid,
-                            s0_lh, &s0_obj, einfo);
-       if (rc != 0)
-               GOTO(unlock_child, rc);
-
        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                       OBD_FAIL_MDS_REINT_UNLINK_WRITE);
        /* save version when object is locked */
@@ -1086,9 +1047,7 @@ out_stat:
        EXIT;
 
 unlock_child:
-       mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo,
-                         rc);
-       mdt_object_unlock(info, mc, child_lh, rc);
+       mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
 put_child:
        mdt_object_put(info->mti_env, mc);
 unlock_parent: