Whamcloud - gitweb
LU-9771 flr: Send write intent RPC to mdt
[fs/lustre-release.git] / lustre / lod / lod_object.c
index 5cde03d..b0e940f 100644 (file)
@@ -1047,7 +1047,7 @@ static int lod_attr_get(const struct lu_env *env,
 }
 
 int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
-                           struct thandle *th, lod_obj_stripe_cb_t cb,
+                           struct thandle *th,
                            struct lod_obj_stripe_cb_data *data)
 {
        struct lod_layout_component *lod_comp;
@@ -1061,13 +1061,23 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
                if (lod_comp->llc_stripe == NULL)
                        continue;
 
+               /* has stripe but not inited yet, this component has been
+                * declared to be created, but hasn't created yet.
+                */
+               if (!lod_comp_inited(lod_comp))
+                       continue;
+
+               if (data->locd_comp_skip_cb &&
+                   data->locd_comp_skip_cb(env, lo, i, data))
+                       continue;
+
                LASSERT(lod_comp->llc_stripe_count > 0);
                for (j = 0; j < lod_comp->llc_stripe_count; j++) {
                        struct dt_object *dt = lod_comp->llc_stripe[j];
 
                        if (dt == NULL)
                                continue;
-                       rc = cb(env, lo, dt, th, j, data);
+                       rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data);
                        if (rc != 0)
                                RETURN(rc);
                }
@@ -1075,10 +1085,63 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
        RETURN(0);
 }
 
+static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env,
+               struct lod_object *lo, int comp_idx,
+               struct lod_obj_stripe_cb_data *data)
+{
+       struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx];
+       bool skipped = false;
+
+       if (!(data->locd_attr->la_valid & LA_LAYOUT_VERSION))
+               return skipped;
+
+       switch (lo->ldo_flr_state) {
+       case LCM_FL_WRITE_PENDING: {
+               int i;
+
+               /* skip stale components */
+               if (lod_comp->llc_flags & LCME_FL_STALE) {
+                       skipped = true;
+                       break;
+               }
+
+               /* skip valid and overlapping components, therefore any
+                * attempts to write overlapped components will never succeed
+                * because client will get EINPROGRESS. */
+               for (i = 0; i < lo->ldo_comp_cnt; i++) {
+                       if (i == comp_idx)
+                               continue;
+
+                       if (lo->ldo_comp_entries[i].llc_flags & LCME_FL_STALE)
+                               continue;
+
+                       if (lu_extent_is_overlapped(&lod_comp->llc_extent,
+                                       &lo->ldo_comp_entries[i].llc_extent)) {
+                               skipped = true;
+                               break;
+                       }
+               }
+               break;
+       }
+       default:
+               LASSERTF(0, "impossible: %d\n", lo->ldo_flr_state);
+       case LCM_FL_SYNC_PENDING:
+               break;
+       }
+
+       CDEBUG(D_LAYOUT, DFID": %s to set component %x to version: %u\n",
+              PFID(lu_object_fid(&lo->ldo_obj.do_lu)),
+              skipped ? "skipped" : "chose", lod_comp->llc_id,
+              data->locd_attr->la_layout_version);
+
+       return skipped;
+}
+
 static inline int
 lod_obj_stripe_attr_set_cb(const struct lu_env *env, struct lod_object *lo,
                           struct dt_object *dt, struct thandle *th,
-                          int stripe_idx, struct lod_obj_stripe_cb_data *data)
+                          int comp_idx, int stripe_idx,
+                          struct lod_obj_stripe_cb_data *data)
 {
        if (data->locd_declare)
                return lod_sub_declare_attr_set(env, dt, data->locd_attr, th);
@@ -1120,7 +1183,7 @@ static int lod_declare_attr_set(const struct lu_env *env,
         * speed up rename().
         */
        if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
-               if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID)))
+               if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
                        RETURN(rc);
 
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
@@ -1157,12 +1220,12 @@ static int lod_declare_attr_set(const struct lu_env *env,
                                RETURN(rc);
                }
        } else {
-               struct lod_obj_stripe_cb_data data;
+               struct lod_obj_stripe_cb_data data = { { 0 } };
 
                data.locd_attr = attr;
                data.locd_declare = true;
-               rc = lod_obj_for_each_stripe(env, lo, th,
-                               lod_obj_stripe_attr_set_cb, &data);
+               data.locd_stripe_cb = lod_obj_stripe_attr_set_cb;
+               rc = lod_obj_for_each_stripe(env, lo, th, &data);
        }
 
        if (rc)
@@ -1217,7 +1280,7 @@ static int lod_attr_set(const struct lu_env *env,
                RETURN(rc);
 
        if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
-               if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID)))
+               if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
                        RETURN(rc);
 
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
@@ -1229,6 +1292,14 @@ static int lod_attr_set(const struct lu_env *env,
                        RETURN(rc);
        }
 
+       /* FIXME: a tricky case in the code path of mdd_layout_change():
+        * the in-memory striping information has been freed in lod_xattr_set()
+        * due to layout change. It has to load stripe here again. It only
+        * changes flags of layout so declare_attr_set() is still accurate */
+       rc = lod_load_striping_locked(env, lo);
+       if (rc)
+               RETURN(rc);
+
        if (!lod_obj_is_striped(dt))
                RETURN(0);
 
@@ -1249,12 +1320,13 @@ static int lod_attr_set(const struct lu_env *env,
                                break;
                }
        } else {
-               struct lod_obj_stripe_cb_data data;
+               struct lod_obj_stripe_cb_data data = { { 0 } };
 
                data.locd_attr = attr;
                data.locd_declare = false;
-               rc = lod_obj_for_each_stripe(env, lo, th,
-                               lod_obj_stripe_attr_set_cb, &data);
+               data.locd_comp_skip_cb = lod_obj_attr_set_comp_skip_cb;
+               data.locd_stripe_cb = lod_obj_stripe_attr_set_cb;
+               rc = lod_obj_for_each_stripe(env, lo, th, &data);
        }
 
        if (rc)
@@ -2057,7 +2129,7 @@ static int
 lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env,
                                     struct lod_object *lo,
                                     struct dt_object *dt, struct thandle *th,
-                                    int stripe_idx,
+                                    int comp_idx, int stripe_idx,
                                     struct lod_obj_stripe_cb_data *data)
 {
        struct lod_thread_info *info = lod_env_info(env);
@@ -2110,7 +2182,7 @@ static int lod_replace_parent_fid(const struct lu_env *env,
        struct lod_thread_info  *info = lod_env_info(env);
        struct lu_buf *buf = &info->lti_buf;
        struct filter_fid *ff;
-       struct lod_obj_stripe_cb_data data;
+       struct lod_obj_stripe_cb_data data = { { 0 } };
        int rc;
        ENTRY;
 
@@ -2134,9 +2206,8 @@ static int lod_replace_parent_fid(const struct lu_env *env,
        buf->lb_len = info->lti_ea_store_size;
 
        data.locd_declare = declare;
-       rc = lod_obj_for_each_stripe(env, lo, th,
-                                    lod_obj_stripe_replace_parent_fid_cb,
-                                    &data);
+       data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb;
+       rc = lod_obj_for_each_stripe(env, lo, th, &data);
 
        RETURN(rc);
 }
@@ -4611,7 +4682,8 @@ static int lod_create(const struct lu_env *env, struct dt_object *dt,
 static inline int
 lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo,
                          struct dt_object *dt, struct thandle *th,
-                         int stripe_idx, struct lod_obj_stripe_cb_data *data)
+                         int comp_idx, int stripe_idx,
+                         struct lod_obj_stripe_cb_data *data)
 {
        if (data->locd_declare)
                return lod_sub_declare_destroy(env, dt, th);
@@ -4703,11 +4775,11 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt,
                                break;
                }
        } else {
-               struct lod_obj_stripe_cb_data data;
+               struct lod_obj_stripe_cb_data data = { { 0 } };
 
                data.locd_declare = true;
-               rc = lod_obj_for_each_stripe(env, lo, th,
-                               lod_obj_stripe_destroy_cb, &data);
+               data.locd_stripe_cb = lod_obj_stripe_destroy_cb;
+               rc = lod_obj_for_each_stripe(env, lo, th, &data);
        }
 
        RETURN(rc);
@@ -4793,11 +4865,11 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt,
                        }
                }
        } else {
-               struct lod_obj_stripe_cb_data data;
+               struct lod_obj_stripe_cb_data data = { { 0 } };
 
                data.locd_declare = false;
-               rc = lod_obj_for_each_stripe(env, lo, th,
-                               lod_obj_stripe_destroy_cb, &data);
+               data.locd_stripe_cb = lod_obj_stripe_destroy_cb;
+               rc = lod_obj_for_each_stripe(env, lo, th, &data);
        }
 
        RETURN(rc);
@@ -5055,30 +5127,78 @@ static int lod_invalidate(const struct lu_env *env, struct dt_object *dt)
        return dt_invalidate(env, dt_object_child(dt));
 }
 
-static int lod_declare_layout_change(const struct lu_env *env,
-                                    struct dt_object *dt,
-                                    struct md_layout_change *mlc,
-                                    struct thandle *th)
+static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt)
 {
-       struct lod_thread_info  *info = lod_env_info(env);
-       struct lod_object *lo = lod_dt_obj(dt);
-       struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
-       struct dt_object *next = dt_object_child(dt);
+       ENTRY;
+
+       /* clear memory region that will be used for layout change */
+       memset(&info->lti_layout_attr, 0, sizeof(struct lu_attr));
+       info->lti_count = 0;
+
+       if (info->lti_comp_size >= comp_cnt)
+               RETURN(0);
+
+       if (info->lti_comp_size > 0) {
+               OBD_FREE(info->lti_comp_idx,
+                        info->lti_comp_size * sizeof(__u32));
+               info->lti_comp_size = 0;
+       }
+
+       OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32));
+       if (!info->lti_comp_idx)
+               RETURN(-ENOMEM);
+
+       info->lti_comp_size = comp_cnt;
+       RETURN(0);
+}
+
+static int lod_declare_instantiate_components(const struct lu_env *env,
+               struct lod_object *lo, struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
        struct ost_pool *inuse = &info->lti_inuse_osts;
-       struct layout_intent *layout = mlc->mlc_intent;
-       struct lu_buf *buf = &mlc->mlc_buf;
+       int i;
+       int rc = 0;
+       ENTRY;
+
+       LASSERT(info->lti_count < lo->ldo_comp_cnt);
+       if (info->lti_count > 0) {
+               /* Prepare inuse array for composite file */
+               rc = lod_prepare_inuse(env, lo);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       for (i = 0; i < info->lti_count; i++) {
+               rc = lod_qos_prep_create(env, lo, NULL, th,
+                                        info->lti_comp_idx[i], inuse);
+               if (rc)
+                       break;
+       }
+
+       if (!rc) {
+               info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+               rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+                               &info->lti_buf, XATTR_NAME_LOV, 0, th);
+       }
+
+       RETURN(rc);
+}
+
+static int lod_declare_update_plain(const struct lu_env *env,
+               struct lod_object *lo, struct layout_intent *layout,
+               const struct lu_buf *buf, struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
        struct lod_layout_component *lod_comp;
        struct lov_comp_md_v1 *comp_v1 = NULL;
        bool replay = false;
-       bool need_create = false;
        int i, rc;
        ENTRY;
 
-       if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
-           dt_object_remote(next))
-               RETURN(-EINVAL);
+       LASSERT(lo->ldo_flr_state == LCM_FL_NOT_FLR);
 
-       dt_write_lock(env, next, 0);
        /*
         * In case the client is passing lovea, which only happens during
         * the replay of layout intent write RPC for now, we may need to
@@ -5113,11 +5233,6 @@ static int lod_declare_layout_change(const struct lu_env *env,
                rc = lod_load_striping_locked(env, lo);
                if (rc)
                        GOTO(out, rc);
-
-               /* Prepare inuse array for composite file */
-               rc = lod_prepare_inuse(env, lo);
-               if (rc)
-                       GOTO(out, rc);
        }
 
        /* Make sure defined layout covers the requested write range. */
@@ -5134,7 +5249,7 @@ static int lod_declare_layout_change(const struct lu_env *env,
        }
 
        CDEBUG(D_LAYOUT, "%s: "DFID": instantiate components "DEXT"\n",
-              lod2obd(d)->obd_name, PFID(lu_object_fid(&dt->do_lu)),
+              lod2obd(d)->obd_name, PFID(lod_object_fid(lo)),
               PEXT(&layout->li_extent));
 
        /*
@@ -5170,30 +5285,292 @@ static int lod_declare_layout_change(const struct lu_env *env,
                if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
                        GOTO(out, rc = -EINVAL);
 
-               need_create = true;
+               LASSERT(info->lti_comp_idx != NULL);
+               info->lti_comp_idx[info->lti_count++] = i;
+       }
+
+       if (info->lti_count == 0)
+               RETURN(-EALREADY);
 
-               rc = lod_qos_prep_create(env, lo, NULL, th, i, inuse);
-               if (rc)
+       lod_obj_inc_layout_gen(lo);
+       rc = lod_declare_instantiate_components(env, lo, th);
+out:
+       if (rc)
+               lod_object_free_striping(env, lo);
+       RETURN(rc);
+}
+
+#define lod_foreach_mirror_comp(comp, lo, mirror_idx)                      \
+for (comp = &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_start];  \
+     comp <= &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_end];   \
+     comp++)
+
+static inline int lod_comp_index(struct lod_object *lo,
+                                struct lod_layout_component *lod_comp)
+{
+       LASSERT(lod_comp >= lo->ldo_comp_entries &&
+               lod_comp <= &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]);
+
+       return lod_comp - lo->ldo_comp_entries;
+}
+
+/**
+ * Stale other mirrors by writing extent.
+ */
+static void lod_stale_components(struct lod_object *lo, int primary,
+                                struct lu_extent *extent)
+{
+       struct lod_layout_component *pri_comp, *lod_comp;
+       int i;
+
+       /* The writing extent decides which components in the primary
+        * are affected... */
+       lod_foreach_mirror_comp(pri_comp, lo, primary) {
+               if (!lu_extent_is_overlapped(extent, &pri_comp->llc_extent))
+                       continue;
+
+               for (i = 0; i < lo->ldo_mirror_count; i++) {
+                       if (i == primary)
+                               continue;
+
+                       /* ... and then stale other components that are
+                        * overlapping with primary components */
+                       lod_foreach_mirror_comp(lod_comp, lo, i) {
+                               if (!lu_extent_is_overlapped(
+                                                       &pri_comp->llc_extent,
+                                                       &lod_comp->llc_extent))
+                                       continue;
+
+                               CDEBUG(D_LAYOUT, "stale: %u / %u\n",
+                                     i, lod_comp_index(lo, lod_comp));
+
+                               lod_comp->llc_flags |= LCME_FL_STALE;
+                               lo->ldo_mirrors[i].lme_stale = 1;
+                       }
+               }
+       }
+}
+
+static int lod_declare_update_rdonly(const struct lu_env *env,
+               struct lod_object *lo, struct md_layout_change *mlc,
+               struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lu_attr *layout_attr = &info->lti_layout_attr;
+       struct lod_layout_component *lod_comp;
+       struct layout_intent *layout = mlc->mlc_intent;
+       struct lu_extent extent = layout->li_extent;
+       int picked;
+       int i;
+       int rc;
+       ENTRY;
+
+       LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE);
+       LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY);
+       LASSERT(lo->ldo_mirror_count > 0);
+
+       CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n",
+              PFID(lod_object_fid(lo)), PEXT(&extent));
+
+       /**
+        * Pick a mirror as the primary.
+        * Now it only picks the first mirror, this algo can be
+        * revised later after knowing the topology of cluster or
+        * the availability of OSTs.
+        */
+       for (picked = -1, i = 0; i < lo->ldo_mirror_count; i++) {
+               if (!lo->ldo_mirrors[i].lme_stale) {
+                       picked = i;
                        break;
+               }
        }
+       if (picked < 0) /* failed to pick a primary */
+               RETURN(-ENODATA);
 
-       if (need_create)
-               lod_obj_inc_layout_gen(lo);
-       else
-               GOTO(unlock, rc = -EALREADY);
+       CDEBUG(D_LAYOUT, DFID": picked mirror %u as primary\n",
+              PFID(lod_object_fid(lo)), lo->ldo_mirrors[picked].lme_id);
 
-       if (!rc) {
-               info->lti_buf.lb_len = lod_comp_md_size(lo, false);
-               rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf,
-                                              XATTR_NAME_LOV, 0, th);
+       /* stale overlapping components from other mirrors */
+       lod_stale_components(lo, picked, &extent);
+
+       /* instantiate components for the picked mirror, start from 0 */
+       extent = (struct lu_extent) { 0, layout->li_extent.e_end };
+       lod_foreach_mirror_comp(lod_comp, lo, picked) {
+               if (!lu_extent_is_overlapped(&extent,
+                                            &lod_comp->llc_extent))
+                       break;
+
+               if (lod_comp_inited(lod_comp))
+                       continue;
+
+               CDEBUG(D_LAYOUT, "instantiate: %u / %u\n",
+                      i, lod_comp_index(lo, lod_comp));
+
+               info->lti_comp_idx[info->lti_count++] =
+                                               lod_comp_index(lo, lod_comp);
        }
+
+       lo->ldo_flr_state = LCM_FL_WRITE_PENDING;
+
+       /* Reset the layout version once it's becoming too large.
+        * This way it can make sure that the layout version is
+        * monotonously increased in this writing era. */
+       lod_obj_inc_layout_gen(lo);
+       if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) {
+               __u32 layout_version;
+
+               cfs_get_random_bytes(&layout_version, sizeof(layout_version));
+               lo->ldo_layout_gen = layout_version & 0xffff;
+       }
+
+       rc = lod_declare_instantiate_components(env, lo, th);
+       if (rc)
+               GOTO(out, rc);
+
+       layout_attr->la_valid = LA_LAYOUT_VERSION;
+       layout_attr->la_layout_version = 0; /* set current version */
+       rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
+       if (rc)
+               GOTO(out, rc);
+
 out:
        if (rc)
                lod_object_free_striping(env, lo);
+       RETURN(rc);
+}
 
-unlock:
-       dt_write_unlock(env, next);
+static int lod_declare_update_write_pending(const struct lu_env *env,
+               struct lod_object *lo, struct md_layout_change *mlc,
+               struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lu_attr *layout_attr = &info->lti_layout_attr;
+       struct lod_layout_component *lod_comp;
+       struct lu_extent extent = { 0 };
+       int primary = -1;
+       int i;
+       int rc;
+       ENTRY;
+
+       LASSERT(lo->ldo_flr_state == LCM_FL_WRITE_PENDING);
+       LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE);
+
+       /* look for the primary mirror */
+       for (i = 0; i < lo->ldo_mirror_count; i++) {
+               if (lo->ldo_mirrors[i].lme_stale)
+                       continue;
+
+               LASSERTF(primary < 0, DFID " has multiple primary: %u / %u",
+                        PFID(lod_object_fid(lo)),
+                        lo->ldo_mirrors[i].lme_id,
+                        lo->ldo_mirrors[primary].lme_id);
+
+               primary = i;
+       }
+       if (primary < 0) {
+               CERROR(DFID ": doesn't have a primary mirror\n",
+                      PFID(lod_object_fid(lo)));
+               GOTO(out, rc = -ENODATA);
+       }
+
+       CDEBUG(D_LAYOUT, DFID": found primary %u\n",
+              PFID(lod_object_fid(lo)), lo->ldo_mirrors[primary].lme_id);
 
+       LASSERT(!lo->ldo_mirrors[primary].lme_stale);
+
+       /* for LAYOUT_WRITE opc, it has to do the following operations:
+        * 1. stale overlapping componets from stale mirrors;
+        * 2. instantiate components of the primary mirror;
+        * 3. transfter layout version to all objects of the primary; */
+
+       if (mlc->mlc_opc == MD_LAYOUT_WRITE) {
+               LASSERT(mlc->mlc_intent != NULL);
+
+               extent = mlc->mlc_intent->li_extent;
+
+               CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n",
+                      PFID(lod_object_fid(lo)), PEXT(&extent));
+
+               /* 1. stale overlapping components */
+               lod_stale_components(lo, primary, &extent);
+
+               /* 2. find out the components need instantiating.
+                * instantiate [0, mlc->mlc_intent->e_end) */
+               extent.e_start = 0;
+               lod_foreach_mirror_comp(lod_comp, lo, primary) {
+                       if (!lu_extent_is_overlapped(&extent,
+                                                    &lod_comp->llc_extent))
+                               break;
+
+                       if (lod_comp_inited(lod_comp))
+                               continue;
+
+                       CDEBUG(D_LAYOUT, "write instantiate %d / %d\n",
+                              primary, lod_comp_index(lo, lod_comp));
+                       info->lti_comp_idx[info->lti_count++] =
+                                               lod_comp_index(lo, lod_comp);
+               }
+       }
+
+       rc = lod_declare_instantiate_components(env, lo, th);
+       if (rc)
+               GOTO(out, rc);
+
+       layout_attr->la_valid = LA_LAYOUT_VERSION;
+       layout_attr->la_layout_version = 0; /* set current version */
+       rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
+       if (rc)
+               GOTO(out, rc);
+
+       lod_obj_inc_layout_gen(lo);
+out:
+       if (rc)
+               lod_object_free_striping(env, lo);
+       RETURN(rc);
+}
+
+static int lod_declare_layout_change(const struct lu_env *env,
+               struct dt_object *dt, struct md_layout_change *mlc,
+               struct thandle *th)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       int rc;
+       ENTRY;
+
+       if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
+           dt_object_remote(dt_object_child(dt)))
+               RETURN(-EINVAL);
+
+       lod_write_lock(env, dt, 0);
+       rc = lod_load_striping_locked(env, lo);
+       if (rc)
+               GOTO(out, rc);
+
+       LASSERT(lo->ldo_comp_cnt > 0);
+
+       rc = lod_layout_data_init(info, lo->ldo_comp_cnt);
+       if (rc)
+               GOTO(out, rc);
+
+       switch (lo->ldo_flr_state) {
+       case LCM_FL_NOT_FLR:
+               rc = lod_declare_update_plain(env, lo, mlc->mlc_intent,
+                                             &mlc->mlc_buf, th);
+               break;
+       case LCM_FL_RDONLY:
+               rc = lod_declare_update_rdonly(env, lo, mlc, th);
+               break;
+       case LCM_FL_WRITE_PENDING:
+               rc = lod_declare_update_write_pending(env, lo, mlc, th);
+               break;
+       case LCM_FL_SYNC_PENDING:
+       default:
+               rc = -ENOTSUPP;
+               break;
+       }
+out:
+       dt_write_unlock(env, dt);
        RETURN(rc);
 }
 
@@ -5204,8 +5581,17 @@ static int lod_layout_change(const struct lu_env *env, struct dt_object *dt,
                             struct md_layout_change *mlc, struct thandle *th)
 {
        struct lu_attr *attr = &lod_env_info(env)->lti_attr;
+       struct lu_attr *layout_attr = &lod_env_info(env)->lti_layout_attr;
+       struct lod_object *lo = lod_dt_obj(dt);
+       int rc;
 
-       RETURN(lod_striped_create(env, dt, attr, NULL, th));
+       rc = lod_striped_create(env, dt, attr, NULL, th);
+       if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) {
+               layout_attr->la_layout_version |= lo->ldo_layout_gen;
+               rc = lod_attr_set(env, dt, layout_attr, th);
+       }
+
+       return rc;
 }
 
 struct dt_object_operations lod_obj_ops = {