Whamcloud - gitweb
LU-6406 tests: fix undersized fsname and FID strings
[fs/lustre-release.git] / lustre / lod / lod_object.c
index f8536d0..a87c4e8 100644 (file)
@@ -86,9 +86,10 @@ static int lod_declare_index_insert(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_rec *rec,
                                    const struct dt_key *key,
-                                   struct thandle *handle)
+                                   struct thandle *th)
 {
-       return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
+       return lod_sub_object_declare_insert(env, dt_object_child(dt),
+                                            rec, key, th);
 }
 
 /**
@@ -105,7 +106,8 @@ static int lod_index_insert(const struct lu_env *env,
                            struct thandle *th,
                            int ign)
 {
-       return dt_insert(env, dt_object_child(dt), rec, key, th, ign);
+       return lod_sub_object_index_insert(env, dt_object_child(dt), rec, key,
+                                          th, ign);
 }
 
 /**
@@ -121,7 +123,8 @@ static int lod_declare_index_delete(const struct lu_env *env,
                                    const struct dt_key *key,
                                    struct thandle *th)
 {
-       return dt_declare_delete(env, dt_object_child(dt), key, th);
+       return lod_sub_object_declare_delete(env, dt_object_child(dt), key,
+                                            th);
 }
 
 /**
@@ -136,7 +139,7 @@ static int lod_index_delete(const struct lu_env *env,
                            const struct dt_key *key,
                            struct thandle *th)
 {
-       return dt_delete(env, dt_object_child(dt), key, th);
+       return lod_sub_object_delete(env, dt_object_child(dt), key, th);
 }
 
 /**
@@ -153,7 +156,6 @@ static struct dt_it *lod_it_init(const struct lu_env *env,
        struct lod_it           *it = &lod_env_info(env)->lti_it;
        struct dt_it            *it_next;
 
-
        it_next = next->do_index_ops->dio_it.init(env, next, attr);
        if (IS_ERR(it_next))
                return it_next;
@@ -851,7 +853,8 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
                                goto next;
                }
 
-               len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid));
+               len = snprintf(name, sizeof(name),
+                              DFID":", PFID(&ent->lde_fid));
                /* The ent->lde_name is composed of ${FID}:${index} */
                if (ent->lde_namelen < len + 1 ||
                    memcmp(ent->lde_name, name, len) != 0) {
@@ -1075,7 +1078,7 @@ static int lod_attr_get(const struct lu_env *env,
  **/
 static int lod_mark_dead_object(const struct lu_env *env,
                                struct dt_object *dt,
-                               struct thandle *handle,
+                               struct thandle *th,
                                bool declare)
 {
        struct lod_object       *lo = lod_dt_obj(dt);
@@ -1111,13 +1114,14 @@ static int lod_mark_dead_object(const struct lu_env *env,
                buf.lb_buf = lmv;
                buf.lb_len = sizeof(*lmv);
                if (declare) {
-                       rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
-                                                 XATTR_NAME_LMV,
-                                                 LU_XATTR_REPLACE, handle);
+                       rc = lod_sub_object_declare_xattr_set(env,
+                                               lo->ldo_stripe[i], &buf,
+                                               XATTR_NAME_LMV,
+                                               LU_XATTR_REPLACE, th);
                } else {
-                       rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
-                                         XATTR_NAME_LMV, LU_XATTR_REPLACE,
-                                         handle);
+                       rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i],
+                                                     &buf, XATTR_NAME_LMV,
+                                                     LU_XATTR_REPLACE, th);
                }
                if (rc != 0)
                        break;
@@ -1137,7 +1141,7 @@ static int lod_mark_dead_object(const struct lu_env *env,
 static int lod_declare_attr_set(const struct lu_env *env,
                                struct dt_object *dt,
                                const struct lu_attr *attr,
-                               struct thandle *handle)
+                               struct thandle *th)
 {
        struct dt_object  *next = dt_object_child(dt);
        struct lod_object *lo = lod_dt_obj(dt);
@@ -1147,14 +1151,14 @@ static int lod_declare_attr_set(const struct lu_env *env,
        /* Set dead object on all other stripes */
        if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
            attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
-               rc = lod_mark_dead_object(env, dt, handle, true);
+               rc = lod_mark_dead_object(env, dt, th, true);
                RETURN(rc);
        }
 
        /*
         * declare setattr on the local object
         */
-       rc = dt_declare_attr_set(env, next, attr, handle);
+       rc = lod_sub_object_declare_attr_set(env, next, attr, th);
        if (rc)
                RETURN(rc);
 
@@ -1173,7 +1177,8 @@ static int lod_declare_attr_set(const struct lu_env *env,
                        RETURN(0);
        } else {
                if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
-                                       LA_ATIME | LA_MTIME | LA_CTIME)))
+                                       LA_ATIME | LA_MTIME | LA_CTIME |
+                                       LA_FLAGS)))
                        RETURN(rc);
        }
        /*
@@ -1193,20 +1198,20 @@ static int lod_declare_attr_set(const struct lu_env *env,
         */
        LASSERT(lo->ldo_stripe);
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               if (likely(lo->ldo_stripe[i] != NULL)) {
-                       rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
-                                                handle);
-                       if (rc != 0) {
-                               CERROR("failed declaration: %d\n", rc);
-                               break;
-                       }
-               }
+               if (lo->ldo_stripe[i] == NULL)
+                       continue;
+               rc = lod_sub_object_declare_attr_set(env,
+                                       lo->ldo_stripe[i], attr,
+                                       th);
+               if (rc != 0)
+                       RETURN(rc);
        }
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
            dt_object_exists(next) != 0 &&
            dt_object_remote(next) == 0)
-               dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
+               lod_sub_object_declare_xattr_del(env, next,
+                                               XATTR_NAME_LOV, th);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
            dt_object_exists(next) &&
@@ -1216,8 +1221,9 @@ static int lod_declare_attr_set(const struct lu_env *env,
 
                buf->lb_buf = info->lti_ea_store;
                buf->lb_len = info->lti_ea_store_size;
-               dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
-                                    LU_XATTR_REPLACE, handle);
+               lod_sub_object_declare_xattr_set(env, next, buf,
+                                                XATTR_NAME_LOV,
+                                                LU_XATTR_REPLACE, th);
        }
 
        RETURN(rc);
@@ -1234,7 +1240,7 @@ static int lod_declare_attr_set(const struct lu_env *env,
 static int lod_attr_set(const struct lu_env *env,
                        struct dt_object *dt,
                        const struct lu_attr *attr,
-                       struct thandle *handle)
+                       struct thandle *th)
 {
        struct dt_object        *next = dt_object_child(dt);
        struct lod_object       *lo = lod_dt_obj(dt);
@@ -1244,14 +1250,14 @@ static int lod_attr_set(const struct lu_env *env,
        /* Set dead object on all other stripes */
        if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
            attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
-               rc = lod_mark_dead_object(env, dt, handle, false);
+               rc = lod_mark_dead_object(env, dt, th, false);
                RETURN(rc);
        }
 
        /*
         * apply changes to the local object
         */
-       rc = dt_attr_set(env, next, attr, handle);
+       rc = lod_sub_object_attr_set(env, next, attr, th);
        if (rc)
                RETURN(rc);
 
@@ -1263,7 +1269,8 @@ static int lod_attr_set(const struct lu_env *env,
                        RETURN(0);
        } else {
                if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
-                                       LA_ATIME | LA_MTIME | LA_CTIME)))
+                                       LA_ATIME | LA_MTIME | LA_CTIME |
+                                       LA_FLAGS)))
                        RETURN(rc);
        }
 
@@ -1277,21 +1284,20 @@ static int lod_attr_set(const struct lu_env *env,
        for (i = 0; i < lo->ldo_stripenr; i++) {
                if (unlikely(lo->ldo_stripe[i] == NULL))
                        continue;
+
                if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
                    (dt_object_exists(lo->ldo_stripe[i]) == 0))
                        continue;
 
-               rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle);
-               if (rc != 0) {
-                       CERROR("failed declaration: %d\n", rc);
+               rc = lod_sub_object_attr_set(env, lo->ldo_stripe[i], attr, th);
+               if (rc != 0)
                        break;
-               }
        }
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
            dt_object_exists(next) != 0 &&
            dt_object_remote(next) == 0)
-               dt_xattr_del(env, next, XATTR_NAME_LOV, handle);
+               rc = lod_sub_object_xattr_del(env, next, XATTR_NAME_LOV, th);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
            dt_object_exists(next) &&
@@ -1322,8 +1328,9 @@ static int lod_attr_set(const struct lu_env *env,
                fid->f_oid--;
                fid_to_ostid(fid, oi);
                ostid_cpu_to_le(oi, &objs->l_ost_oi);
-               dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
-                            LU_XATTR_REPLACE, handle);
+
+               rc = lod_sub_object_xattr_set(env, next, buf, XATTR_NAME_LOV,
+                                             LU_XATTR_REPLACE, th);
        }
 
        RETURN(rc);
@@ -1629,6 +1636,139 @@ out:
  * \retval             0 on success
  * \retval             negative if failed
  */
+static int lod_dir_declare_create_stripes(const struct lu_env *env,
+                                         struct dt_object *dt,
+                                         struct lu_attr *attr,
+                                         struct dt_object_format *dof,
+                                         struct thandle *th)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lu_buf           lmv_buf;
+       struct lu_buf           slave_lmv_buf;
+       struct lmv_mds_md_v1    *lmm;
+       struct lmv_mds_md_v1    *slave_lmm = NULL;
+       struct dt_insert_rec    *rec = &info->lti_dt_rec;
+       struct lod_object       *lo = lod_dt_obj(dt);
+       int                     rc;
+       __u32                   i;
+       ENTRY;
+
+       rc = lod_prep_lmv_md(env, dt, &lmv_buf);
+       if (rc != 0)
+               GOTO(out, rc);
+       lmm = lmv_buf.lb_buf;
+
+       OBD_ALLOC_PTR(slave_lmm);
+       if (slave_lmm == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       lod_prep_slave_lmv_md(slave_lmm, lmm);
+       slave_lmv_buf.lb_buf = slave_lmm;
+       slave_lmv_buf.lb_len = sizeof(*slave_lmm);
+
+       if (!dt_try_as_dir(env, dt_object_child(dt)))
+               GOTO(out, rc = -EINVAL);
+
+       rec->rec_type = S_IFDIR;
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               struct dt_object        *dto = lo->ldo_stripe[i];
+               char                    *stripe_name = info->lti_key;
+               struct lu_name          *sname;
+               struct linkea_data       ldata          = { NULL };
+               struct lu_buf           linkea_buf;
+
+               rc = lod_sub_object_declare_create(env, dto, attr, NULL,
+                                                  dof, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               if (!dt_try_as_dir(env, dto))
+                       GOTO(out, rc = -EINVAL);
+
+               rc = lod_sub_object_declare_ref_add(env, dto, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rec->rec_fid = lu_object_fid(&dto->do_lu);
+               rc = lod_sub_object_declare_insert(env, dto,
+                                       (const struct dt_rec *)rec,
+                                       (const struct dt_key *)dot, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               /* master stripe FID will be put to .. */
+               rec->rec_fid = lu_object_fid(&dt->do_lu);
+               rc = lod_sub_object_declare_insert(env, dto,
+                                       (const struct dt_rec *)rec,
+                                       (const struct dt_key *)dotdot,
+                                       th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
+                   cfs_fail_val != i) {
+                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) &&
+                           cfs_fail_val == i)
+                               slave_lmm->lmv_master_mdt_index =
+                                                       cpu_to_le32(i + 1);
+                       else
+                               slave_lmm->lmv_master_mdt_index =
+                                                       cpu_to_le32(i);
+                       rc = lod_sub_object_declare_xattr_set(env, dto,
+                                       &slave_lmv_buf, XATTR_NAME_LMV, 0, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+               }
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
+                   cfs_fail_val == i)
+                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
+                               PFID(lu_object_fid(&dto->do_lu)), i + 1);
+               else
+                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
+                               PFID(lu_object_fid(&dto->do_lu)), i);
+
+               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
+               rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+               linkea_buf.lb_len = ldata.ld_leh->leh_len;
+               rc = lod_sub_object_declare_xattr_set(env, dto, &linkea_buf,
+                                         XATTR_NAME_LINK, 0, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rec->rec_fid = lu_object_fid(&dto->do_lu);
+               rc = lod_sub_object_declare_insert(env, dt_object_child(dt),
+                                      (const struct dt_rec *)rec,
+                                      (const struct dt_key *)stripe_name,
+                                      th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rc = lod_sub_object_declare_ref_add(env, dt_object_child(dt),
+                                                   th);
+               if (rc != 0)
+                       GOTO(out, rc);
+       }
+
+       rc = lod_sub_object_declare_xattr_set(env, dt_object_child(dt),
+                               &lmv_buf, XATTR_NAME_LMV, 0, th);
+       if (rc != 0)
+               GOTO(out, rc);
+out:
+       if (slave_lmm != NULL)
+               OBD_FREE_PTR(slave_lmm);
+
+       RETURN(rc);
+}
+
 static int lod_prep_md_striped_create(const struct lu_env *env,
                                      struct dt_object *dt,
                                      struct lu_attr *attr,
@@ -1639,13 +1779,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
        struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
        struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
        struct lod_object       *lo = lod_dt_obj(dt);
-       struct lod_thread_info  *info = lod_env_info(env);
        struct dt_object        **stripe;
-       struct lu_buf           lmv_buf;
-       struct lu_buf           slave_lmv_buf;
-       struct lmv_mds_md_v1    *lmm;
-       struct lmv_mds_md_v1    *slave_lmm = NULL;
-       struct dt_insert_rec    *rec = &info->lti_dt_rec;
        __u32                   stripe_count;
        int                     *idx_array;
        int                     rc = 0;
@@ -1682,7 +1816,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                if (i == 0) {
                        /* Right now, master stripe and master object are
                         * on the same MDT */
-                       idx = le32_to_cpu(lum->lum_stripe_offset);
+                       idx = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
                        rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
                                           NULL);
                        if (rc < 0)
@@ -1774,145 +1908,7 @@ next:
        if (lo->ldo_stripenr == 0)
                GOTO(out_put, rc = -ENOSPC);
 
-       rc = lod_prep_lmv_md(env, dt, &lmv_buf);
-       if (rc != 0)
-               GOTO(out_put, rc);
-       lmm = lmv_buf.lb_buf;
-
-       OBD_ALLOC_PTR(slave_lmm);
-       if (slave_lmm == NULL)
-               GOTO(out_put, rc = -ENOMEM);
-
-       lod_prep_slave_lmv_md(slave_lmm, lmm);
-       slave_lmv_buf.lb_buf = slave_lmm;
-       slave_lmv_buf.lb_len = sizeof(*slave_lmm);
-
-       if (!dt_try_as_dir(env, dt_object_child(dt)))
-               GOTO(out_put, rc = -EINVAL);
-
-       rec->rec_type = S_IFDIR;
-       for (i = 0; i < lo->ldo_stripenr; i++) {
-               struct dt_object        *dto            = stripe[i];
-               char                    *stripe_name    = info->lti_key;
-               struct lu_name          *sname;
-               struct linkea_data       ldata          = { NULL };
-               struct lu_buf            linkea_buf;
-
-               rc = dt_declare_create(env, dto, attr, NULL, dof, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               if (!dt_try_as_dir(env, dto))
-                       GOTO(out_put, rc = -EINVAL);
-
-               rc = dt_declare_ref_add(env, dto, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
-                                      (const struct dt_key *)dot, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               /* master stripe FID will be put to .. */
-               rec->rec_fid = lu_object_fid(&dt->do_lu);
-               rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
-                                      (const struct dt_key *)dotdot, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               /* probably nothing to inherite */
-               if (lo->ldo_def_striping_set &&
-                   !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
-                                        lo->ldo_def_stripenr,
-                                        lo->ldo_def_stripe_offset,
-                                        lo->ldo_pool)) {
-                       struct lov_user_md_v3   *v3;
-
-                       /* sigh, lti_ea_store has been used for lmv_buf,
-                        * so we have to allocate buffer for default
-                        * stripe EA */
-                       OBD_ALLOC_PTR(v3);
-                       if (v3 == NULL)
-                               GOTO(out_put, rc = -ENOMEM);
-
-                       memset(v3, 0, sizeof(*v3));
-                       v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
-                       v3->lmm_stripe_count =
-                               cpu_to_le16(lo->ldo_def_stripenr);
-                       v3->lmm_stripe_offset =
-                               cpu_to_le16(lo->ldo_def_stripe_offset);
-                       v3->lmm_stripe_size =
-                               cpu_to_le32(lo->ldo_def_stripe_size);
-                       if (lo->ldo_pool != NULL)
-                               strlcpy(v3->lmm_pool_name, lo->ldo_pool,
-                                       sizeof(v3->lmm_pool_name));
-
-                       info->lti_buf.lb_buf = v3;
-                       info->lti_buf.lb_len = sizeof(*v3);
-                       rc = dt_declare_xattr_set(env, dto,
-                                                 &info->lti_buf,
-                                                 XATTR_NAME_LOV,
-                                                 0, th);
-                       OBD_FREE_PTR(v3);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
-
-               if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
-                   cfs_fail_val != i) {
-                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) &&
-                           cfs_fail_val == i)
-                               slave_lmm->lmv_master_mdt_index =
-                                                       cpu_to_le32(i + 1);
-                       else
-                               slave_lmm->lmv_master_mdt_index =
-                                                       cpu_to_le32(i);
-                       rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
-                                                 XATTR_NAME_LMV, 0, th);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
-                   cfs_fail_val == i)
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                               PFID(lu_object_fid(&dto->do_lu)), i + 1);
-               else
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                               PFID(lu_object_fid(&dto->do_lu)), i);
-
-               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
-               rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
-               linkea_buf.lb_len = ldata.ld_leh->leh_len;
-               rc = dt_declare_xattr_set(env, dto, &linkea_buf,
-                                         XATTR_NAME_LINK, 0, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = dt_declare_insert(env, dt_object_child(dt),
-                                      (const struct dt_rec *)rec,
-                                      (const struct dt_key *)stripe_name, th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-
-               rc = dt_declare_ref_add(env, dt_object_child(dt), th);
-               if (rc != 0)
-                       GOTO(out_put, rc);
-       }
-
-       rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
-                                 XATTR_NAME_LMV, 0, th);
+       rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th);
        if (rc != 0)
                GOTO(out_put, rc);
 
@@ -1930,8 +1926,6 @@ out_put:
 out_free:
        if (idx_array != NULL)
                OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
-       if (slave_lmm != NULL)
-               OBD_FREE_PTR(slave_lmm);
 
        RETURN(rc);
 }
@@ -2028,7 +2022,7 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
                        RETURN(rc);
        }
 
-       rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
+       rc = lod_sub_object_declare_xattr_set(env, next, buf, name, fl, th);
        if (rc != 0)
                RETURN(rc);
 
@@ -2046,8 +2040,9 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
-                                         name, fl, th);
+
+               rc = lod_sub_object_declare_xattr_set(env, lo->ldo_stripe[i],
+                                               buf, name, fl, th);
                if (rc != 0)
                        break;
        }
@@ -2102,7 +2097,8 @@ static int lod_declare_xattr_set(const struct lu_env *env,
        } else if (S_ISDIR(mode)) {
                rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
        } else {
-               rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
+               rc = lod_sub_object_declare_xattr_set(env, next, buf, name,
+                                                     fl, th);
        }
 
        RETURN(rc);
@@ -2150,7 +2146,7 @@ static int lod_xattr_set_internal(const struct lu_env *env,
        int                     i;
        ENTRY;
 
-       rc = dt_xattr_set(env, next, buf, name, fl, th);
+       rc = lod_sub_object_xattr_set(env, next, buf, name, fl, th);
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
@@ -2163,7 +2159,9 @@ static int lod_xattr_set_internal(const struct lu_env *env,
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th);
+
+               rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], buf, name,
+                                             fl, th);
                if (rc != 0)
                        break;
        }
@@ -2194,7 +2192,7 @@ static int lod_xattr_del_internal(const struct lu_env *env,
        int                     i;
        ENTRY;
 
-       rc = dt_xattr_del(env, next, name, th);
+       rc = lod_sub_object_xattr_del(env, next, name, th);
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
@@ -2203,7 +2201,9 @@ static int lod_xattr_del_internal(const struct lu_env *env,
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th);
+
+               rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name,
+                                             th);
                if (rc != 0)
                        break;
        }
@@ -2349,7 +2349,7 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
  * failure, then the layer above LOD sends this defined striping
  * using ->do_xattr_set(), so LOD uses this method to replay creation
  * of the stripes. Notice the original information for the striping
- * (#stripes, FIDs, etc) was transfered in declare path.
+ * (#stripes, FIDs, etc) was transferred in declare path.
  *
  * \param[in] env      execution environment
  * \param[in] dt       the striped object
@@ -2409,71 +2409,39 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
 
        rec->rec_type = S_IFDIR;
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               struct dt_object        *dto;
-               char                    *stripe_name    = info->lti_key;
+               struct dt_object *dto;
+               char             *stripe_name = info->lti_key;
                struct lu_name          *sname;
                struct linkea_data       ldata          = { NULL };
                struct lu_buf            linkea_buf;
 
                dto = lo->ldo_stripe[i];
+
                dt_write_lock(env, dto, MOR_TGT_CHILD);
-               rc = dt_create(env, dto, attr, NULL, dof, th);
+               rc = lod_sub_object_create(env, dto, attr, NULL, dof,
+                                          th);
                if (rc != 0) {
                        dt_write_unlock(env, dto);
-                       RETURN(rc);
+                       GOTO(out, rc);
                }
 
-               rc = dt_ref_add(env, dto, th);
+               rc = lod_sub_object_ref_add(env, dto, th);
                dt_write_unlock(env, dto);
                if (rc != 0)
-                       RETURN(rc);
+                       GOTO(out, rc);
 
                rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = dt_insert(env, dto, (const struct dt_rec *)rec,
-                              (const struct dt_key *)dot, th, 0);
+               rc = lod_sub_object_index_insert(env, dto,
+                               (const struct dt_rec *)rec,
+                               (const struct dt_key *)dot, th, 0);
                if (rc != 0)
-                       RETURN(rc);
+                       GOTO(out, rc);
 
                rec->rec_fid = lu_object_fid(&dt->do_lu);
-               rc = dt_insert(env, dto, (struct dt_rec *)rec,
+               rc = lod_sub_object_index_insert(env, dto, (struct dt_rec *)rec,
                               (const struct dt_key *)dotdot, th, 0);
                if (rc != 0)
-                       RETURN(rc);
-
-               if (lo->ldo_def_striping_set &&
-                   !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
-                                        lo->ldo_def_stripenr,
-                                        lo->ldo_def_stripe_offset,
-                                        lo->ldo_pool)) {
-                       struct lov_user_md_v3   *v3;
-
-                       /* sigh, lti_ea_store has been used for lmv_buf,
-                        * so we have to allocate buffer for default
-                        * stripe EA */
-                       OBD_ALLOC_PTR(v3);
-                       if (v3 == NULL)
-                               GOTO(out, rc);
-
-                       memset(v3, 0, sizeof(*v3));
-                       v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
-                       v3->lmm_stripe_count =
-                               cpu_to_le16(lo->ldo_def_stripenr);
-                       v3->lmm_stripe_offset =
-                               cpu_to_le16(lo->ldo_def_stripe_offset);
-                       v3->lmm_stripe_size =
-                               cpu_to_le32(lo->ldo_def_stripe_size);
-                       if (lo->ldo_pool != NULL)
-                               strlcpy(v3->lmm_pool_name, lo->ldo_pool,
-                                       sizeof(v3->lmm_pool_name));
-
-                       info->lti_buf.lb_buf = v3;
-                       info->lti_buf.lb_len = sizeof(*v3);
-                       rc = dt_xattr_set(env, dto, &info->lti_buf,
-                                         XATTR_NAME_LOV, 0, th);
-                       OBD_FREE_PTR(v3);
-                       if (rc != 0)
-                               GOTO(out, rc);
-               }
+                       GOTO(out, rc);
 
                if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
                    cfs_fail_val != i) {
@@ -2484,8 +2452,9 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                        else
                                slave_lmm->lmv_master_mdt_index =
                                                        cpu_to_le32(i);
-                       rc = dt_xattr_set(env, dto, &slave_lmv_buf,
-                                         XATTR_NAME_LMV, fl, th);
+
+                       rc = lod_sub_object_xattr_set(env, dto, &slave_lmv_buf,
+                                                     XATTR_NAME_LMV, fl, th);
                        if (rc != 0)
                                GOTO(out, rc);
                }
@@ -2509,27 +2478,26 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
 
                linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
                linkea_buf.lb_len = ldata.ld_leh->leh_len;
-               rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
-                                 0, th);
+               rc = lod_sub_object_xattr_set(env, dto, &linkea_buf,
+                                       XATTR_NAME_LINK, 0, th);
                if (rc != 0)
                        GOTO(out, rc);
 
                rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = dt_insert(env, dt_object_child(dt),
+               rc = lod_sub_object_index_insert(env, dt_object_child(dt),
                               (const struct dt_rec *)rec,
                               (const struct dt_key *)stripe_name, th, 0);
                if (rc != 0)
                        GOTO(out, rc);
 
-               rc = dt_ref_add(env, dt_object_child(dt), th);
+               rc = lod_sub_object_ref_add(env, dt_object_child(dt), th);
                if (rc != 0)
                        GOTO(out, rc);
        }
 
        if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV))
-               rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf,
-                                 XATTR_NAME_LMV, fl, th);
-
+               rc = lod_sub_object_xattr_set(env, dt_object_child(dt),
+                                             &lmv_buf, XATTR_NAME_LMV, fl, th);
 out:
        if (slave_lmm != NULL)
                OBD_FREE_PTR(slave_lmm);
@@ -2736,7 +2704,8 @@ static int lod_xattr_set(const struct lu_env *env,
 
                if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
                                                LMV_HASH_FLAG_MIGRATION)
-                       rc = dt_xattr_set(env, next, buf, name, fl, th);
+                       rc = lod_sub_object_xattr_set(env, next, buf, name, fl,
+                                                     th);
                else
                        rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
 
@@ -2759,12 +2728,21 @@ static int lod_xattr_set(const struct lu_env *env,
                /* in case of lov EA swap, just set it
                 * if not, it is a replay so check striping match what we
                 * already have during req replay, declare_xattr_set()
-                * defines striping, then create() does the work
-               */
+                * defines striping, then create() does the work */
                if (fl & LU_XATTR_REPLACE) {
                        /* free stripes, then update disk */
                        lod_object_free_striping(env, lod_dt_obj(dt));
-                       rc = dt_xattr_set(env, next, buf, name, fl, th);
+
+                       rc = lod_sub_object_xattr_set(env, next, buf, name,
+                                                     fl, th);
+               } else if (dt_object_remote(dt)) {
+                       /* This only happens during migration, see
+                        * mdd_migrate_create(), in which Master MDT will
+                        * create a remote target object, and only set
+                        * (migrating) stripe EA on the remote object,
+                        * and does not need creating each stripes. */
+                       rc = lod_sub_object_xattr_set(env, next, buf, name,
+                                                     fl, th);
                } else {
                        rc = lod_striping_create(env, dt, NULL, NULL, th);
                }
@@ -2792,7 +2770,8 @@ static int lod_declare_xattr_del(const struct lu_env *env,
        int                     i;
        ENTRY;
 
-       rc = dt_declare_xattr_del(env, dt_object_child(dt), name, th);
+       rc = lod_sub_object_declare_xattr_del(env, dt_object_child(dt),
+                                             name, th);
        if (rc != 0)
                RETURN(rc);
 
@@ -2809,7 +2788,8 @@ static int lod_declare_xattr_del(const struct lu_env *env,
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_declare_xattr_del(env, lo->ldo_stripe[i], name, th);
+               rc = lod_sub_object_declare_xattr_del(env, lo->ldo_stripe[i],
+                                                     name, th);
                if (rc != 0)
                        break;
        }
@@ -2837,7 +2817,7 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
        if (!strcmp(name, XATTR_NAME_LOV))
                lod_object_free_striping(env, lod_dt_obj(dt));
 
-       rc = dt_xattr_del(env, next, name, th);
+       rc = lod_sub_object_xattr_del(env, next, name, th);
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
@@ -2846,7 +2826,8 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th);
+
+               rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, th);
                if (rc != 0)
                        break;
        }
@@ -3319,7 +3300,8 @@ static int lod_declare_init_size(const struct lu_env *env,
        attr->la_valid = LA_SIZE;
        attr->la_size = size;
 
-       rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
+       rc = lod_sub_object_declare_attr_set(env, lo->ldo_stripe[stripe], attr,
+                                            th);
 
        RETURN(rc);
 }
@@ -3385,8 +3367,8 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
                info->lti_buf = *lovea;
        }
 
-       rc = dt_declare_xattr_set(env, next, &info->lti_buf,
-                                 XATTR_NAME_LOV, 0, th);
+       rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
+                                             XATTR_NAME_LOV, 0, th);
        if (rc)
                GOTO(out, rc);
 
@@ -3432,8 +3414,8 @@ static int lod_declare_object_create(const struct lu_env *env,
        /*
         * first of all, we declare creation of local object
         */
-       rc = dt_declare_create(env, next, attr, hint, dof, th);
-       if (rc)
+       rc = lod_sub_object_declare_create(env, next, attr, hint, dof, th);
+       if (rc != 0)
                GOTO(out, rc);
 
        if (dof->dof_type == DFT_SYM)
@@ -3513,8 +3495,8 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
        /* create all underlying objects */
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
-               rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
-
+               rc = lod_sub_object_create(env, lo->ldo_stripe[i], attr, NULL,
+                                          dof, th);
                if (rc)
                        break;
        }
@@ -3542,13 +3524,13 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
                             struct dt_allocation_hint *hint,
                             struct dt_object_format *dof, struct thandle *th)
 {
-       struct dt_object   *next = dt_object_child(dt);
        struct lod_object  *lo = lod_dt_obj(dt);
        int                 rc;
        ENTRY;
 
        /* create local object */
-       rc = dt_create(env, next, attr, hint, dof, th);
+       rc = lod_sub_object_create(env, dt_object_child(dt), attr, hint, dof,
+                                  th);
        if (rc != 0)
                RETURN(rc);
 
@@ -3598,22 +3580,24 @@ static int lod_declare_object_destroy(const struct lu_env *env,
                        RETURN(rc);
 
                for (i = 0; i < lo->ldo_stripenr; i++) {
-                       rc = dt_declare_ref_del(env, next, th);
+                       rc = lod_sub_object_declare_ref_del(env, next, th);
                        if (rc != 0)
                                RETURN(rc);
+
                        snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
                                i);
-                       rc = dt_declare_delete(env, next,
+                       rc = lod_sub_object_declare_delete(env, next,
                                        (const struct dt_key *)stripe_name, th);
                        if (rc != 0)
                                RETURN(rc);
                }
        }
+
        /*
         * we declare destroy for the local object
         */
-       rc = dt_declare_destroy(env, next, th);
+       rc = lod_sub_object_declare_destroy(env, next, th);
        if (rc)
                RETURN(rc);
 
@@ -3622,18 +3606,17 @@ static int lod_declare_object_destroy(const struct lu_env *env,
 
        /* declare destroy all striped objects */
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               if (likely(lo->ldo_stripe[i] != NULL)) {
-                       if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
-                               rc = dt_declare_ref_del(env, lo->ldo_stripe[i],
-                                                       th);
-                               if (rc != 0)
-                                       RETURN(rc);
-                       }
+               if (lo->ldo_stripe[i] == NULL)
+                       continue;
 
-                       rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
-                       if (rc != 0)
-                               break;
-               }
+               if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
+                       rc = lod_sub_object_declare_ref_del(env,
+                                       lo->ldo_stripe[i], th);
+
+               rc = lod_sub_object_declare_destroy(env, lo->ldo_stripe[i],
+                                       th);
+               if (rc != 0)
+                       break;
        }
 
        RETURN(rc);
@@ -3667,7 +3650,7 @@ static int lod_object_destroy(const struct lu_env *env,
                        RETURN(rc);
 
                for (i = 0; i < lo->ldo_stripenr; i++) {
-                       rc = dt_ref_del(env, next, th);
+                       rc = lod_sub_object_ref_del(env, next, th);
                        if (rc != 0)
                                RETURN(rc);
 
@@ -3679,13 +3662,14 @@ static int lod_object_destroy(const struct lu_env *env,
                               PFID(lu_object_fid(&dt->do_lu)), stripe_name,
                               PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
 
-                       rc = dt_delete(env, next,
+                       rc = lod_sub_object_delete(env, next,
                                       (const struct dt_key *)stripe_name, th);
                        if (rc != 0)
                                RETURN(rc);
                }
        }
-       rc = dt_destroy(env, next, th);
+
+       rc = lod_sub_object_destroy(env, next, th);
        if (rc != 0)
                RETURN(rc);
 
@@ -3700,13 +3684,14 @@ static int lod_object_destroy(const struct lu_env *env,
                        if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
                                dt_write_lock(env, lo->ldo_stripe[i],
                                              MOR_TGT_CHILD);
-                               rc = dt_ref_del(env, lo->ldo_stripe[i], th);
+                               rc = lod_sub_object_ref_del(env,
+                                               lo->ldo_stripe[i], th);
                                dt_write_unlock(env, lo->ldo_stripe[i]);
                                if (rc != 0)
                                        break;
                        }
 
-                       rc = dt_destroy(env, lo->ldo_stripe[i], th);
+                       rc = lod_sub_object_destroy(env, lo->ldo_stripe[i], th);
                        if (rc != 0)
                                break;
                }
@@ -3724,7 +3709,7 @@ static int lod_object_destroy(const struct lu_env *env,
 static int lod_declare_ref_add(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
 {
-       return dt_declare_ref_add(env, dt_object_child(dt), th);
+       return lod_sub_object_declare_ref_add(env, dt_object_child(dt), th);
 }
 
 /**
@@ -3735,7 +3720,7 @@ static int lod_declare_ref_add(const struct lu_env *env,
 static int lod_ref_add(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
 {
-       return dt_ref_add(env, dt_object_child(dt), th);
+       return lod_sub_object_ref_add(env, dt_object_child(dt), th);
 }
 
 /**
@@ -3747,7 +3732,7 @@ static int lod_ref_add(const struct lu_env *env,
 static int lod_declare_ref_del(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
 {
-       return dt_declare_ref_del(env, dt_object_child(dt), th);
+       return lod_sub_object_declare_ref_del(env, dt_object_child(dt), th);
 }
 
 /**
@@ -3758,7 +3743,7 @@ static int lod_declare_ref_del(const struct lu_env *env,
 static int lod_ref_del(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
 {
-       return dt_ref_del(env, dt_object_child(dt), th);
+       return lod_sub_object_ref_del(env, dt_object_child(dt), th);
 }
 
 /**
@@ -3996,8 +3981,8 @@ static ssize_t lod_declare_write(const struct lu_env *env,
                                 const struct lu_buf *buf, loff_t pos,
                                 struct thandle *th)
 {
-       return dt_declare_record_write(env, dt_object_child(dt),
-                                      buf, pos, th);
+       return lod_sub_object_declare_write(env, dt_object_child(dt), buf, pos,
+                                           th);
 }
 
 /**
@@ -4009,9 +3994,7 @@ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
                         const struct lu_buf *buf, loff_t *pos,
                         struct thandle *th, int iq)
 {
-       struct dt_object *next = dt_object_child(dt);
-       LASSERT(next);
-       return next->do_body_ops->dbo_write(env, next, buf, pos, th, iq);
+       return lod_sub_object_write(env, dt_object_child(dt), buf, pos, th, iq);
 }
 
 static const struct dt_body_operations lod_body_lnk_ops = {