+ LASSERT(info->lti_comp_idx != NULL);
+ info->lti_comp_idx[info->lti_count++] = i;
+ layout_changed = true;
+ }
+
+ if (!layout_changed)
+ RETURN(-EALREADY);
+
+ lod_obj_inc_layout_gen(lo);
+ rc = lod_declare_instantiate_components(env, lo, th, 0);
+ EXIT;
+out:
+ if (rc)
+ lod_striping_free(env, lo);
+ return rc;
+}
+
+/**
+ * Return the position of a component inside \a lo's component array.
+ *
+ * \param[in] lo	object owning the component array
+ * \param[in] lod_comp	component, asserted to point into
+ *			lo->ldo_comp_entries[0 .. ldo_comp_cnt - 1]
+ *
+ * \retval		index of \a lod_comp within lo->ldo_comp_entries
+ */
+static inline int lod_comp_index(struct lod_object *lo,
+				 struct lod_layout_component *lod_comp)
+{
+	int idx;
+
+	/* the pointer must lie between the first and the last entry */
+	LASSERT(lod_comp >= lo->ldo_comp_entries &&
+		lod_comp <= &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]);
+
+	idx = lod_comp - lo->ldo_comp_entries;
+	return idx;
+}
+
+/**
+ * Stale other mirrors by writing extent.
+ *
+ * Walk the components of mirror \a primary that overlap \a extent. For each
+ * such component, every other mirror first gets its extents updated through
+ * lod_declare_update_extents() against the primary component's extent; then
+ * each of that mirror's components overlapping the same extent gets
+ * LCME_FL_STALE set and the mirror's lme_stale flag is raised.
+ *
+ * \param[in] env execution environment
+ * \param[in] lo object whose mirrors are processed
+ * \param[in] primary index of the mirror being written (compared against the
+ * mirror loop index, so it indexes lo->ldo_mirrors)
+ * \param[in] extent extent being written
+ * \param[in] th transaction handle handed to lod_declare_update_extents()
+ *
+ * \retval negative error returned by lod_declare_update_extents()
+ * \retval otherwise the value left in rc by the last
+ * lod_declare_update_extents() call (0 if none was made)
+ */
+static int lod_stale_components(const struct lu_env *env, struct lod_object *lo,
+ int primary, struct lu_extent *extent,
+ struct thandle *th)
+{
+ struct lod_layout_component *pri_comp, *lod_comp;
+ struct lu_extent pri_extent;
+ int rc = 0;
+ int i;
+ ENTRY;
+
+ /* The writing extent decides which components in the primary
+ * are affected... */
+ CDEBUG(D_LAYOUT, "primary mirror %d, "DEXT"\n", primary, PEXT(extent));
+
+restart:
+ lod_foreach_mirror_comp(pri_comp, lo, primary) {
+ if (!lu_extent_is_overlapped(extent, &pri_comp->llc_extent))
+ continue;
+
+ CDEBUG(D_LAYOUT, "primary comp %u "DEXT"\n",
+ lod_comp_index(lo, pri_comp),
+ PEXT(&pri_comp->llc_extent));
+
+ pri_extent.e_start = pri_comp->llc_extent.e_start;
+ pri_extent.e_end = pri_comp->llc_extent.e_end;
+
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ if (i == primary)
+ continue;
+ rc = lod_declare_update_extents(env, lo, &pri_extent,
+ th, i, 0);
+ /* if update_extents changed the layout, it may have
+ * reallocated the component array, so start over to
+ * avoid using stale pointers (pri_comp is re-derived
+ * from lo by the outer loop after the jump) */
+ if (rc == 1)
+ goto restart;
+ if (rc < 0)
+ RETURN(rc);
+
+ /* ... and then stale other components that are
+ * overlapping with primary components; the owning
+ * mirror is flagged stale as well */
+ lod_foreach_mirror_comp(lod_comp, lo, i) {
+ if (!lu_extent_is_overlapped(
+ &pri_extent,
+ &lod_comp->llc_extent))
+ continue;
+
+ CDEBUG(D_LAYOUT, "stale: %u / %u\n",
+ i, lod_comp_index(lo, lod_comp));
+
+ lod_comp->llc_flags |= LCME_FL_STALE;
+ lo->ldo_mirrors[i].lme_stale = 1;
+ }
+ }
+ }
+
+ RETURN(rc);
+}
+
+/**
+ * Tell whether the OST holding \a dt can currently be used.
+ *
+ * The OST index is resolved from the FID of \a dt via lod_fld_lookup(). The
+ * OST counts as unavailable when its ltd_statfs.os_state carries any of
+ * OS_STATFS_READONLY, OS_STATFS_ENOSPC, OS_STATFS_ENOINO or
+ * OS_STATFS_NOPRECREATE, or when its ltd_active flag is 0.
+ *
+ * \param[in] env	execution environment
+ * \param[in] lo	lod object
+ * \param[in] dt	dt object
+ * \param[in] index	mirror index, only used in the debug message
+ *
+ * \retval negative	if the FID lookup failed
+ * \retval 1		if \a dt is available
+ * \retval 0		if \a dt is not available
+ */
+static inline int lod_check_ost_avail(const struct lu_env *env,
+				      struct lod_object *lo,
+				      struct dt_object *dt, int index)
+{
+	struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+	struct lod_tgt_desc *ost;
+	int type = LU_SEQ_RANGE_OST;
+	__u32 idx;
+	int rc;
+
+	rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &idx, &type);
+	if (rc < 0) {
+		CERROR("%s: can't locate "DFID":rc = %d\n",
+		       lod2obd(lod)->obd_name, PFID(lu_object_fid(&dt->do_lu)),
+		       rc);
+		return rc;
+	}
+
+	ost = OST_TGT(lod, idx);
+
+	/* usable: no blocking statfs state bit set and the target is active */
+	if (!(ost->ltd_statfs.os_state &
+	      (OS_STATFS_READONLY | OS_STATFS_ENOSPC | OS_STATFS_ENOINO |
+	       OS_STATFS_NOPRECREATE)) &&
+	    ost->ltd_active != 0)
+		return 1;
+
+	CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n",
+	       PFID(lod_object_fid(lo)), index, idx, rc);
+	return 0;
+}
+
+/**
+ * Pick primary mirror for write
+ *
+ * Stale mirrors are never candidates. Mirrors are scanned starting at index
+ * 0 (or at a random index when the OBD_FAIL_FLR_RANDOM_PICK_MIRROR fail
+ * check fires), and the result falls back in this order:
+ * 1st pick: a mirror whose checked OSTs are all available; a preferred
+ * (lme_prefer) one ends the scan at once, a non-preferred one is
+ * kept while only preferred mirrors are still examined;
+ * 2nd pick: the first non-stale preferred mirror, whatever its OST state;
+ * 3rd pick: the first non-stale mirror, recorded only if no preferred
+ * mirror had been seen by then.
+ *
+ * \param[in] env execution environment
+ * \param[in] lo object
+ * \param[in] extent write range (not referenced by the code below)
+ *
+ * \retval index into lo->ldo_mirrors of the picked mirror
+ * \retval -ENODATA if no pick of any tier exists, i.e. no non-stale mirror
+ * \retval non-zero rc of lod_fill_mirrors(), or negative rc of
+ * lod_check_ost_avail(), on failure
+ */
+static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo,
+ struct lu_extent *extent)
+{
+ struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+ unsigned int seq = 0;
+ struct lod_layout_component *lod_comp;
+ int i, j, rc;
+ int picked = -1, second_pick = -1, third_pick = -1;
+ ENTRY;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) {
+ get_random_bytes(&seq, sizeof(seq));
+ seq %= lo->ldo_mirror_count;
+ }
+
+ /**
+ * Pick a mirror as the primary, and check the availability of OSTs.
+ *
+ * This algo can be revised later after knowing the topology of
+ * cluster.
+ */
+ lod_qos_statfs_update(env, lod, &lod->lod_ost_descs);
+
+ rc = lod_fill_mirrors(lo);
+ if (rc)
+ RETURN(rc);
+
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ bool ost_avail = true;
+ int index = (i + seq) % lo->ldo_mirror_count;
+
+ if (lo->ldo_mirrors[index].lme_stale) {
+ CDEBUG(D_LAYOUT, DFID": mirror %d stale\n",
+ PFID(lod_object_fid(lo)), index);
+ continue;
+ }
+
+ /* 2nd pick: first non-stale preferred mirror, kept as a fallback
+ * in case it turns out to contain an unavailable OST */
+ if (lo->ldo_mirrors[index].lme_prefer && second_pick < 0)
+ second_pick = index;
+
+ /* 3rd pick: first non-stale mirror, recorded only while no
+ * preferred mirror has been seen */
+ if (second_pick < 0 && third_pick < 0)
+ third_pick = index;
+
+ /**
+ * a non-preferred 1st pick has already been found; from now on
+ * only a potential preferred mirror is worth checking.
+ */
+ if (picked >= 0 && !lo->ldo_mirrors[index].lme_prefer)
+ continue;
+
+ /* check the availability of OSTs; components that are not
+ * inited or have no stripe array are skipped */
+ lod_foreach_mirror_comp(lod_comp, lo, index) {
+ if (!lod_comp_inited(lod_comp) || !lod_comp->llc_stripe)
+ continue;
+
+ for (j = 0; j < lod_comp->llc_stripe_count; j++) {
+ struct dt_object *dt = lod_comp->llc_stripe[j];
+
+ rc = lod_check_ost_avail(env, lo, dt, index);
+ if (rc < 0)
+ RETURN(rc);
+
+ ost_avail = !!rc;
+ if (!ost_avail)
+ break;
+ } /* for all dt object in one component */
+ if (!ost_avail)
+ break;
+ } /* for all components in a mirror */
+
+ /**
+ * skip this mirror unless all the OSTs holding its allocated
+ * objects are available.
+ */
+ if (!ost_avail)
+ continue;
+
+ /* this mirror has all OSTs available */
+ picked = index;
+
+ /**
+ * a preferred mirror with all OSTs available is the perfect
+ * 1st pick, stop searching.
+ */
+ if (lo->ldo_mirrors[index].lme_prefer)
+ break;
+ } /* for all mirrors */
+
+ /* failed to pick a sound mirror, lower our expectation */
+ if (picked < 0)
+ picked = second_pick;
+ if (picked < 0)
+ picked = third_pick;
+ if (picked < 0)
+ RETURN(-ENODATA);
+
+ RETURN(picked);
+}
+
+/**
+ * Queue the uninstantiated components of the selected mirror(s) for
+ * instantiation.
+ *
+ * When MIRROR_ID_NEG is set in \a mirror_id, every mirror except the one
+ * carrying that ID is selected; otherwise only the mirror carrying that ID
+ * is. The array index of each component of a selected mirror that is not
+ * yet inited is appended to info->lti_comp_idx[].
+ *
+ * \param[in] env	execution environment
+ * \param[in] lo	object
+ * \param[in] mirror_id	mirror ID, optionally flagged with MIRROR_ID_NEG
+ *
+ * \retval 0		always
+ */
+static int lod_prepare_resync_mirror(const struct lu_env *env,
+				     struct lod_object *lo,
+				     __u16 mirror_id)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lod_layout_component *comp;
+	bool exclude = (mirror_id & MIRROR_ID_NEG) != 0;
+	int m;
+
+	/* strip the flag so that only the bare ID is compared below */
+	mirror_id &= ~MIRROR_ID_NEG;
+
+	for (m = 0; m < lo->ldo_mirror_count; m++) {
+		bool match = lo->ldo_mirrors[m].lme_id == mirror_id;
+
+		/* plain ID: take the match; negated ID: take all the others */
+		if (match == exclude)
+			continue;
+
+		lod_foreach_mirror_comp(comp, lo, m) {
+			if (!lod_comp_inited(comp))
+				info->lti_comp_idx[info->lti_count++] =
+					lod_comp_index(lo, comp);
+		}
+	}
+
+	return 0;
+}
+
+/**
+ * Work out which components have to be instantiated for a resync.
+ *
+ * Only stale mirrors are examined. Within each of them the components are
+ * walked in order until the first one that does not overlap \a extent; every
+ * overlapping component is counted, and those not yet inited have their
+ * array index appended to info->lti_comp_idx[].
+ *
+ * \param[in] env	execution environment
+ * \param[in] lo	object
+ * \param[in] extent	range to be resynced
+ *
+ * \retval 0		if at least one overlapping component was found
+ * \retval -EALREADY	if no stale mirror has a component in \a extent
+ */
+static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo,
+			      struct lu_extent *extent)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lod_layout_component *comp;
+	unsigned int nr_overlap = 0;
+	int m;
+
+	CDEBUG(D_LAYOUT,
+	       DFID": instantiate all stale components in "DEXT"\n",
+	       PFID(lod_object_fid(lo)), PEXT(extent));
+
+	for (m = 0; m < lo->ldo_mirror_count; m++) {
+		if (!lo->ldo_mirrors[m].lme_stale)
+			continue;
+
+		lod_foreach_mirror_comp(comp, lo, m) {
+			int idx;
+
+			/* stop at the first component outside the extent */
+			if (!lu_extent_is_overlapped(extent,
+						     &comp->llc_extent))
+				break;
+
+			nr_overlap++;
+
+			/* already instantiated: counted, nothing to queue */
+			if (lod_comp_inited(comp))
+				continue;
+
+			idx = lod_comp_index(lo, comp);
+			CDEBUG(D_LAYOUT, "resync instantiate %d / %d\n",
+			       m, idx);
+			info->lti_comp_idx[info->lti_count++] = idx;
+		}
+	}
+
+	if (nr_overlap == 0)
+		return -EALREADY;
+
+	return 0;
+}
+
+/**
+ * Declare the layout change for a mirrored file in LCM_FL_RDONLY state.
+ *
+ * MD_LAYOUT_WRITE: pick a primary mirror, update its extents for the intent
+ * extent, stale the overlapping components of the other mirrors, queue the
+ * primary's components that need instantiation and move the file to
+ * LCM_FL_WRITE_PENDING.
+ *
+ * MD_LAYOUT_RESYNC: queue the components to instantiate for the resync
+ * (stale mirrors when mlc->mlc_mirror_id is 0, otherwise the mirror(s)
+ * selected by that ID) and move the file to LCM_FL_SYNC_PENDING.
+ *
+ * In both cases the layout generation is bumped, the queued components are
+ * declared for instantiation and an LA_LAYOUT_VERSION attribute update is
+ * declared. Once the in-memory layout may have been touched, any failure
+ * releases the striping through lod_striping_free().
+ *
+ * \param[in] env	execution environment
+ * \param[in] lo	object, asserted to be in LCM_FL_RDONLY state
+ * \param[in] mlc	layout change data; for MD_LAYOUT_WRITE the intent
+ *			(mlc->mlc_intent) is asserted to be non-NULL
+ * \param[in] th	transaction handle
+ *
+ * \retval 0		on success
+ * \retval negative	if failed (e.g. -EALREADY from lod_prepare_resync())
+ */
+static int lod_declare_update_rdonly(const struct lu_env *env,
+		struct lod_object *lo, struct md_layout_change *mlc,
+		struct thandle *th)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lu_attr *layout_attr = &info->lti_layout_attr;
+	struct lod_layout_component *lod_comp;
+	struct lu_extent extent = { 0 };
+	int rc;
+	ENTRY;
+
+	LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY);
+	LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE ||
+		mlc->mlc_opc == MD_LAYOUT_RESYNC);
+	LASSERT(lo->ldo_mirror_count > 0);
+
+	if (mlc->mlc_opc == MD_LAYOUT_WRITE) {
+		struct layout_intent *layout = mlc->mlc_intent;
+		int write;
+		int picked;
+
+		/* same check as the write-pending handler, made before the
+		 * intent is dereferenced */
+		LASSERT(layout != NULL);
+		write = layout->li_opc == LAYOUT_INTENT_WRITE;
+
+		extent = layout->li_extent;
+		CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n",
+		       PFID(lod_object_fid(lo)), PEXT(&extent));
+
+		/* nothing has been modified yet, so no cleanup on failure */
+		picked = lod_primary_pick(env, lo, &extent);
+		if (picked < 0)
+			RETURN(picked);
+
+		CDEBUG(D_LAYOUT, DFID": picked mirror id %u as primary\n",
+		       PFID(lod_object_fid(lo)),
+		       lo->ldo_mirrors[picked].lme_id);
+
+		/* Update extents of primary before staling */
+		rc = lod_declare_update_extents(env, lo, &extent, th, picked,
+						write);
+		if (rc < 0)
+			GOTO(out, rc);
+
+		if (layout->li_opc == LAYOUT_INTENT_TRUNC) {
+			/**
+			 * trunc transfers [0, size) in the intent extent, we'd
+			 * stale components overlapping [size, eof).
+			 */
+			extent.e_start = extent.e_end;
+			extent.e_end = OBD_OBJECT_EOF;
+		}
+
+		/* stale overlapping components from other mirrors */
+		rc = lod_stale_components(env, lo, picked, &extent, th);
+		if (rc < 0)
+			GOTO(out, rc);
+
+		/* restore truncate intent extent */
+		if (layout->li_opc == LAYOUT_INTENT_TRUNC)
+			extent.e_end = extent.e_start;
+
+		/* instantiate components for the picked mirror, start from 0 */
+		extent.e_start = 0;
+
+		lod_foreach_mirror_comp(lod_comp, lo, picked) {
+			/* stop at the first component past the extent */
+			if (!lu_extent_is_overlapped(&extent,
+						     &lod_comp->llc_extent))
+				break;
+
+			if (!lod_is_instantiation_needed(lod_comp))
+				continue;
+
+			info->lti_comp_idx[info->lti_count++] =
+						lod_comp_index(lo, lod_comp);
+		}
+
+		lo->ldo_flr_state = LCM_FL_WRITE_PENDING;
+	} else { /* MD_LAYOUT_RESYNC */
+		int i;
+
+		/**
+		 * could contain multiple non-stale mirrors, so we need to
+		 * prep uninited all components assuming any non-stale mirror
+		 * could be picked as the primary mirror.
+		 */
+		if (mlc->mlc_mirror_id == 0) {
+			/* normal resync: the resync extent ends at the
+			 * largest e_end among the leading inited components
+			 * of the non-stale mirrors */
+			for (i = 0; i < lo->ldo_mirror_count; i++) {
+				if (lo->ldo_mirrors[i].lme_stale)
+					continue;
+
+				lod_foreach_mirror_comp(lod_comp, lo, i) {
+					if (!lod_comp_inited(lod_comp))
+						break;
+
+					if (extent.e_end <
+						lod_comp->llc_extent.e_end)
+						extent.e_end =
+						     lod_comp->llc_extent.e_end;
+				}
+			}
+			rc = lod_prepare_resync(env, lo, &extent);
+			if (rc)
+				GOTO(out, rc);
+		} else {
+			/* mirror write, try to init its all components */
+			rc = lod_prepare_resync_mirror(env, lo,
+						       mlc->mlc_mirror_id);
+			if (rc)
+				GOTO(out, rc);
+		}
+
+		/* change the file state to SYNC_PENDING */
+		lo->ldo_flr_state = LCM_FL_SYNC_PENDING;
+	}
+
+	/* Reset the layout version once it's becoming too large.
+	 * This way it can make sure that the layout version is
+	 * monotonically increased in this writing era. */
+	lod_obj_inc_layout_gen(lo);
+	if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) {
+		__u32 layout_version;
+
+		get_random_bytes(&layout_version, sizeof(layout_version));
+		lo->ldo_layout_gen = layout_version & 0xffff;
+	}
+
+	rc = lod_declare_instantiate_components(env, lo, th, 0);
+	if (rc)
+		GOTO(out, rc);
+
+	layout_attr->la_valid = LA_LAYOUT_VERSION;
+	layout_attr->la_layout_version = 0; /* set current version */
+	if (mlc->mlc_opc == MD_LAYOUT_RESYNC)
+		layout_attr->la_layout_version = LU_LAYOUT_RESYNC;
+	rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
+	if (rc)
+		GOTO(out, rc);
+
+out:
+	if (rc)
+		lod_striping_free(env, lo);
+	RETURN(rc);
+}
+
+/**
+ * Declare the layout change for a mirrored file in LCM_FL_WRITE_PENDING
+ * state.
+ *
+ * The primary mirror is the first non-stale preferred (lme_prefer) mirror,
+ * or failing that the first non-stale mirror.
+ *
+ * MD_LAYOUT_WRITE: update the primary's extents for the intent extent, stale
+ * the overlapping components of the other mirrors and queue the primary's
+ * components that need instantiation.
+ *
+ * MD_LAYOUT_RESYNC: queue the components to instantiate for the resync
+ * (stale mirrors when mlc->mlc_mirror_id is 0, otherwise the mirror(s)
+ * selected by that ID) and move the file to LCM_FL_SYNC_PENDING.
+ *
+ * Afterwards the queued components are declared for instantiation, an
+ * LA_LAYOUT_VERSION attribute update is declared and the layout generation
+ * is bumped. Any failure releases the striping via lod_striping_free().
+ *
+ * \param[in] env	execution environment
+ * \param[in] lo	object, asserted to be in LCM_FL_WRITE_PENDING state
+ * \param[in] mlc	layout change data; for MD_LAYOUT_WRITE the intent
+ *			(mlc->mlc_intent) is asserted to be non-NULL
+ * \param[in] th	transaction handle
+ *
+ * \retval 0		on success
+ * \retval -ENODATA	if every mirror is stale
+ * \retval negative	other errors from the helpers called
+ */
+static int lod_declare_update_write_pending(const struct lu_env *env,
+		struct lod_object *lo, struct md_layout_change *mlc,
+		struct thandle *th)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lu_attr *layout_attr = &info->lti_layout_attr;
+	struct lod_layout_component *lod_comp;
+	struct lu_extent extent = { 0 };
+	int primary = -1;
+	int i;
+	int rc;
+	ENTRY;
+
+	LASSERT(lo->ldo_flr_state == LCM_FL_WRITE_PENDING);
+	LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE ||
+		mlc->mlc_opc == MD_LAYOUT_RESYNC);
+
+	/* look for the first preferred mirror */
+	for (i = 0; i < lo->ldo_mirror_count; i++) {
+		if (lo->ldo_mirrors[i].lme_stale)
+			continue;
+		if (lo->ldo_mirrors[i].lme_prefer == 0)
+			continue;
+
+		primary = i;
+		break;
+	}
+	if (primary < 0) {
+		/* no primary, use any in-sync */
+		for (i = 0; i < lo->ldo_mirror_count; i++) {
+			if (lo->ldo_mirrors[i].lme_stale)
+				continue;
+			primary = i;
+			break;
+		}
+		if (primary < 0) {
+			CERROR(DFID ": doesn't have a primary mirror\n",
+			       PFID(lod_object_fid(lo)));
+			GOTO(out, rc = -ENODATA);
+		}
+	}
+
+	CDEBUG(D_LAYOUT, DFID": found primary %u\n",
+	       PFID(lod_object_fid(lo)), lo->ldo_mirrors[primary].lme_id);
+
+	LASSERT(!lo->ldo_mirrors[primary].lme_stale);
+
+	/* for LAYOUT_WRITE opc, it has to do the following operations:
+	 * 1. stale overlapping components from stale mirrors;
+	 * 2. instantiate components of the primary mirror;
+	 * 3. transfer layout version to all objects of the primary;
+	 *
+	 * for LAYOUT_RESYNC opc, it will do:
+	 * 1. instantiate components of all stale mirrors;
+	 * 2. transfer layout version to all objects to close write era. */
+
+	if (mlc->mlc_opc == MD_LAYOUT_WRITE) {
+		struct layout_intent *layout = mlc->mlc_intent;
+		int write;
+
+		/* the intent must be checked before it is dereferenced,
+		 * otherwise the assertion could never fire */
+		LASSERT(layout != NULL);
+		write = layout->li_opc == LAYOUT_INTENT_WRITE;
+
+		extent = layout->li_extent;
+
+		CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n",
+		       PFID(lod_object_fid(lo)), PEXT(&extent));
+
+		/* 1. Update extents of primary before staling */
+		rc = lod_declare_update_extents(env, lo, &extent, th, primary,
+						write);
+		if (rc < 0)
+			GOTO(out, rc);
+
+		if (layout->li_opc == LAYOUT_INTENT_TRUNC) {
+			/**
+			 * trunc transfers [0, size) in the intent extent, we'd
+			 * stale components overlapping [size, eof).
+			 */
+			extent.e_start = extent.e_end;
+			extent.e_end = OBD_OBJECT_EOF;
+		}
+
+		/* 2. stale overlapping components */
+		rc = lod_stale_components(env, lo, primary, &extent, th);
+		if (rc < 0)
+			GOTO(out, rc);
+
+		/* 3. find the components which need instantiating.
+		 * instantiate [0, mlc->mlc_intent->e_end) */
+
+		/* restore truncate intent extent */
+		if (layout->li_opc == LAYOUT_INTENT_TRUNC)
+			extent.e_end = extent.e_start;
+		extent.e_start = 0;
+
+		lod_foreach_mirror_comp(lod_comp, lo, primary) {
+			/* stop at the first component past the extent */
+			if (!lu_extent_is_overlapped(&extent,
+						     &lod_comp->llc_extent))
+				break;
+
+			if (!lod_is_instantiation_needed(lod_comp))
+				continue;
+
+			CDEBUG(D_LAYOUT, "write instantiate %d / %d\n",
+			       primary, lod_comp_index(lo, lod_comp));
+			info->lti_comp_idx[info->lti_count++] =
+						lod_comp_index(lo, lod_comp);
+		}
+	} else { /* MD_LAYOUT_RESYNC */
+		if (mlc->mlc_mirror_id == 0) {
+			/* normal resync: the resync extent ends where the
+			 * primary's leading run of inited components ends */
+			lod_foreach_mirror_comp(lod_comp, lo, primary) {
+				if (!lod_comp_inited(lod_comp))
+					break;
+
+				extent.e_end = lod_comp->llc_extent.e_end;
+			}
+
+			rc = lod_prepare_resync(env, lo, &extent);
+			if (rc)
+				GOTO(out, rc);
+		} else {
+			/* mirror write, try to init its all components */
+			rc = lod_prepare_resync_mirror(env, lo,
+						       mlc->mlc_mirror_id);
+			if (rc)
+				GOTO(out, rc);
+		}
+
+		/* change the file state to SYNC_PENDING */
+		lo->ldo_flr_state = LCM_FL_SYNC_PENDING;
+	}
+
+	rc = lod_declare_instantiate_components(env, lo, th, 0);
+	if (rc)
+		GOTO(out, rc);
+
+	/* 3. transfer layout version to OST objects.
+	 * transfer new layout version to OST objects so that stale writes
+	 * can be denied. It also ends an era of writing by setting
+	 * LU_LAYOUT_RESYNC. Normal client can never use this bit to
+	 * send write RPC; only resync RPCs could do it. */
+	layout_attr->la_valid = LA_LAYOUT_VERSION;
+	layout_attr->la_layout_version = 0; /* set current version */
+	if (mlc->mlc_opc == MD_LAYOUT_RESYNC)
+		layout_attr->la_layout_version = LU_LAYOUT_RESYNC;
+	rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
+	if (rc)
+		GOTO(out, rc);
+
+	lod_obj_inc_layout_gen(lo);
+out:
+	if (rc)
+		lod_striping_free(env, lo);
+	RETURN(rc);
+}
+
+/**
+ * Declare the layout change for a mirrored file in LCM_FL_SYNC_PENDING
+ * state.
+ *
+ * MD_LAYOUT_WRITE: a write arrived while the resync was pending; the file
+ * is switched to LCM_FL_WRITE_PENDING and the request is handed to
+ * lod_declare_update_write_pending().
+ *
+ * MD_LAYOUT_RESYNC_DONE: LCME_FL_STALE is cleared on every stale component
+ * whose ID appears in mlc->mlc_resync_ids[]. Matched entries of that array
+ * are overwritten with LCME_ID_INVAL, so the caller's array is modified.
+ * Any ID left unmatched fails the request. On success the file returns to
+ * LCM_FL_RDONLY, the layout generation is bumped, and the
+ * LA_LAYOUT_VERSION attribute update and the XATTR_NAME_LOV update are
+ * declared.
+ *
+ * \param[in] env	execution environment
+ * \param[in] lo	object, asserted to be in LCM_FL_SYNC_PENDING state
+ * \param[in,out] mlc	layout change data
+ * \param[in] th	transaction handle
+ *
+ * \retval 0		on success
+ * \retval -EINVAL	if a resync ID matches no stale component, or no
+ *			component is in sync
+ * \retval negative	other errors from the helpers called
+ */
+static int lod_declare_update_sync_pending(const struct lu_env *env,
+		struct lod_object *lo, struct md_layout_change *mlc,
+		struct thandle *th)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lu_attr *layout_attr = &info->lti_layout_attr;
+	unsigned sync_components = 0;
+	unsigned resync_components = 0;
+	int i;
+	int rc;
+	ENTRY;
+
+	LASSERT(lo->ldo_flr_state == LCM_FL_SYNC_PENDING);
+	LASSERT(mlc->mlc_opc == MD_LAYOUT_RESYNC_DONE ||
+		mlc->mlc_opc == MD_LAYOUT_WRITE);
+
+	CDEBUG(D_LAYOUT, DFID ": received op %d in sync pending\n",
+	       PFID(lod_object_fid(lo)), mlc->mlc_opc);
+
+	if (mlc->mlc_opc == MD_LAYOUT_WRITE) {
+		CDEBUG(D_LAYOUT, DFID": cocurrent write to sync pending\n",
+		       PFID(lod_object_fid(lo)));
+
+		lo->ldo_flr_state = LCM_FL_WRITE_PENDING;
+		/* leave through RETURN() so that ENTRY above is balanced;
+		 * the callee does its own cleanup on failure */
+		rc = lod_declare_update_write_pending(env, lo, mlc, th);
+		RETURN(rc);
+	}
+
+	/* MD_LAYOUT_RESYNC_DONE */
+
+	for (i = 0; i < lo->ldo_comp_cnt; i++) {
+		struct lod_layout_component *lod_comp;
+		int j;
+
+		lod_comp = &lo->ldo_comp_entries[i];
+
+		/* components that were not stale count as already in sync */
+		if (!(lod_comp->llc_flags & LCME_FL_STALE)) {
+			sync_components++;
+			continue;
+		}
+
+		for (j = 0; j < mlc->mlc_resync_count; j++) {
+			if (lod_comp->llc_id != mlc->mlc_resync_ids[j])
+				continue;
+
+			/* mark the ID as consumed for the check below */
+			mlc->mlc_resync_ids[j] = LCME_ID_INVAL;
+			lod_comp->llc_flags &= ~LCME_FL_STALE;
+			resync_components++;
+			break;
+		}
+	}
+
+	/* valid check: every ID passed in must have been consumed */
+	for (i = 0; i < mlc->mlc_resync_count; i++) {
+		if (mlc->mlc_resync_ids[i] == LCME_ID_INVAL)
+			continue;
+
+		CDEBUG(D_LAYOUT, DFID": lcme id %u (%d / %zu) not exist "
+		       "or already synced\n", PFID(lod_object_fid(lo)),
+		       mlc->mlc_resync_ids[i], i, mlc->mlc_resync_count);
+		GOTO(out, rc = -EINVAL);
+	}
+
+	if (!sync_components || (mlc->mlc_resync_count && !resync_components)) {
+		CDEBUG(D_LAYOUT, DFID": no mirror in sync\n",
+		       PFID(lod_object_fid(lo)));
+
+		/* tend to return an error code here to prevent
+		 * the MDT from setting SoM attribute */
+		GOTO(out, rc = -EINVAL);
+	}
+
+	CDEBUG(D_LAYOUT, DFID": synced %u resynced %u/%zu components\n",
+	       PFID(lod_object_fid(lo)),
+	       sync_components, resync_components, mlc->mlc_resync_count);
+
+	lo->ldo_flr_state = LCM_FL_RDONLY;
+	lod_obj_inc_layout_gen(lo);
+
+	layout_attr->la_valid = LA_LAYOUT_VERSION;
+	layout_attr->la_layout_version = 0; /* set current version */
+	rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
+	if (rc)
+		GOTO(out, rc);
+
+	info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+	rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+				       &info->lti_buf, XATTR_NAME_LOV, 0, th);
+
+	/* fall through: RETURN() below traces the exit for both paths */
+out:
+	if (rc)
+		lod_striping_free(env, lo);
+	RETURN(rc);
+}
+
+typedef int (*mlc_handler)(const struct lu_env *env, struct dt_object *dt,
+ const struct md_layout_change *mlc,
+ struct thandle *th); /* pointer to a layout-change handler: takes the
+ * execution environment, a dt object, the layout
+ * change data and a transaction handle, returns
+ * int; lod_dir_declare_layout_attach() below has
+ * this signature */
+
+/**
+ * Attach the stripes listed in the layout change buffer after the target's
+ * own stripes when migrating a directory. NB: only the declaration is done
+ * here; the actual work is done inside lod_xattr_set_lmv().
+ *
+ * \param[in] env execution environment
+ * \param[in] dt target object
+ * \param[in] mlc layout change data
+ * \param[in] th transaction handle
+ *
+ * \retval 0 on success
+ * \retval negative if failed
+ */
+static int lod_dir_declare_layout_attach(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct md_layout_change *mlc,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+ struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ struct dt_object_format *dof = &info->lti_format;
+ struct lmv_mds_md_v1 *lmv = mlc->mlc_buf.lb_buf;
+ struct dt_object **stripes;
+ __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+ struct lu_fid *fid = &info->lti_fid;
+ struct lod_tgt_desc *tgt;
+ struct dt_object *dto;
+ struct dt_device *tgt_dt;
+ int type = LU_SEQ_RANGE_ANY;
+ struct dt_insert_rec *rec = &info->lti_dt_rec;
+ char *stripe_name = info->lti_key;
+ struct lu_name *sname;
+ struct linkea_data ldata = { NULL };
+ struct lu_buf linkea_buf;
+ __u32 idx;
+ int i;
+ int rc;
+
+ ENTRY;
+
+ if (!lmv_is_sane(lmv))
+ RETURN(-EINVAL);
+
+ if (!dt_try_as_dir(env, dt))
+ return -ENOTDIR;
+
+ dof->dof_type = DFT_DIR;
+
+ OBD_ALLOC_PTR_ARRAY(stripes, (lo->ldo_dir_stripe_count + stripe_count));
+ if (!stripes)
+ RETURN(-ENOMEM);
+
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++)
+ stripes[i] = lo->ldo_stripe[i];
+
+ rec->rec_type = S_IFDIR;
+
+ for (i = 0; i < stripe_count; i++) {
+ fid_le_to_cpu(fid,
+ &lmv->lmv_stripe_fids[i]);
+ if (!fid_is_sane(fid))
+ continue;
+
+ rc = lod_fld_lookup(env, lod, fid, &idx, &type);
+ if (rc)
+ GOTO(out, rc);
+
+ if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
+ tgt_dt = lod->lod_child;
+ } else {
+ tgt = LTD_TGT(ltd, idx);
+ if (tgt == NULL)
+ GOTO(out, rc = -ESTALE);
+ tgt_dt = tgt->ltd_tgt;
+ }
+
+ dto = dt_locate_at(env, tgt_dt, fid,
+ lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
+ NULL);
+ if (IS_ERR(dto))
+ GOTO(out, rc = PTR_ERR(dto));
+
+ stripes[i + lo->ldo_dir_stripe_count] = dto;
+
+ if (!dt_try_as_dir(env, dto))
+ GOTO(out, rc = -ENOTDIR);
+
+ rc = lod_sub_declare_ref_add(env, dto, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rec->rec_fid = lu_object_fid(&dto->do_lu);
+ rc = lod_sub_declare_insert(env, dto,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)dot, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_insert(env, dto,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)dotdot, th);
+ if (rc)
+ GOTO(out, rc);