struct thandle *th)
{
struct lod_thread_info *info = lod_env_info(env);
- struct lod_layout_component *comp_array, *lod_comp;
+ struct lod_layout_component *comp_array, *lod_comp, *old_array;
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
struct dt_object *next = dt_object_child(dt);
struct lov_desc *desc = &d->lod_desc;
struct lov_user_md_v3 *v3;
struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
__u32 magic;
- int i, rc, array_cnt;
+ int i, rc, array_cnt, old_array_cnt;
ENTRY;
LASSERT(lo->ldo_is_composite);
+ if (lo->ldo_flr_state != LCM_FL_NOT_FLR)
+ RETURN(-EBUSY);
+
rc = lod_verify_striping(d, lo, buf, false);
if (rc != 0)
RETURN(rc);
}
}
- OBD_FREE(lo->ldo_comp_entries, sizeof(*lod_comp) * lo->ldo_comp_cnt);
+ old_array = lo->ldo_comp_entries;
+ old_array_cnt = lo->ldo_comp_cnt;
+
lo->ldo_comp_entries = comp_array;
lo->ldo_comp_cnt = array_cnt;
info->lti_buf.lb_len = lod_comp_md_size(lo, false);
rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf,
XATTR_NAME_LOV, 0, th);
- if (rc)
+ if (rc) {
+ lo->ldo_comp_entries = old_array;
+ lo->ldo_comp_cnt = old_array_cnt;
GOTO(error, rc);
+ }
+
+ OBD_FREE(old_array, sizeof(*lod_comp) * old_array_cnt);
+
+ LASSERT(lo->ldo_mirror_count == 1);
+ lo->ldo_mirrors[0].lme_end = array_cnt - 1;
RETURN(0);
LASSERT(lo->ldo_is_composite);
+ if (lo->ldo_flr_state != LCM_FL_NOT_FLR)
+ RETURN(-EBUSY);
+
magic = comp_v1->lcm_magic;
if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) {
lustre_swab_lov_comp_md_v1(comp_v1);
sizeof(*comp_array) * lo->ldo_comp_cnt);
lo->ldo_comp_entries = comp_array;
lo->ldo_comp_cnt = left;
+
+ LASSERT(lo->ldo_mirror_count == 1);
+ lo->ldo_mirrors[0].lme_end = left - 1;
lod_obj_inc_layout_gen(lo);
} else {
lod_free_comp_entries(lo);
struct lov_user_md_v3 *v3 = NULL;
struct lov_comp_md_v1 *comp_v1 = NULL;
__u16 comp_cnt;
+ __u16 mirror_cnt;
bool composite;
int rc, i;
ENTRY;
comp_cnt = comp_v1->lcm_entry_count;
if (comp_cnt == 0)
RETURN(-EINVAL);
+ mirror_cnt = comp_v1->lcm_mirror_count + 1;
composite = true;
} else {
comp_cnt = 1;
+ mirror_cnt = 0;
composite = false;
}
RETURN(rc);
lds->lds_def_comp_cnt = comp_cnt;
- lds->lds_def_striping_is_composite = composite ? 1 : 0;
+ lds->lds_def_striping_is_composite = composite;
+ lds->lds_def_mirror_cnt = mirror_cnt;
for (i = 0; i < comp_cnt; i++) {
struct lod_layout_component *lod_comp;
int i, rc;
if (lds->lds_def_striping_set && S_ISREG(mode)) {
- rc = lod_alloc_comp_entries(lo, lds->lds_def_comp_cnt);
+ rc = lod_alloc_comp_entries(lo, lds->lds_def_mirror_cnt,
+ lds->lds_def_comp_cnt);
if (rc != 0)
return;
if (lod_need_inherit_more(lc, false)) {
if (lc->ldo_comp_cnt == 0) {
- rc = lod_alloc_comp_entries(lc, 1);
+ rc = lod_alloc_comp_entries(lc, 0, 1);
if (rc)
/* fail to allocate memory, will create a
* non-striped file. */
}
/**
+ * Generate component ID for new created component.
+ *
+ * \param[in] lo LOD object
+ * \param[in] comp_idx index of ldo_comp_entries
+ *
+ * \retval component ID on success
+ * \retval LCME_ID_INVAL on failure
+ */
+static __u32 lod_gen_component_id(struct lod_object *lo,
+ int mirror_id, int comp_idx)
+{
+ struct lod_layout_component *lod_comp;
+ __u32 id, start, end;
+ int i;
+
+ LASSERT(lo->ldo_comp_entries[comp_idx].llc_id == LCME_ID_INVAL);
+
+ lod_obj_inc_layout_gen(lo);
+ id = lo->ldo_layout_gen;
+ if (likely(id <= SEQ_ID_MAX))
+ RETURN(pflr_id(mirror_id, id & SEQ_ID_MASK));
+
+ /* Layout generation wraps, need to check collisions. */
+ start = id & SEQ_ID_MASK;
+ end = SEQ_ID_MAX;
+again:
+ for (id = start; id <= end; id++) {
+ for (i = 0; i < lo->ldo_comp_cnt; i++) {
+ lod_comp = &lo->ldo_comp_entries[i];
+ if (pflr_id(mirror_id, id) == lod_comp->llc_id)
+ break;
+ }
+ /* Found the ununsed ID */
+ if (i == lo->ldo_comp_cnt)
+ RETURN(pflr_id(mirror_id, id));
+ }
+ if (end == LCME_ID_MAX) {
+ start = 1;
+ end = min(lo->ldo_layout_gen & LCME_ID_MASK,
+ (__u32)(LCME_ID_MAX - 1));
+ goto again;
+ }
+
+ RETURN(LCME_ID_INVAL);
+}
+
+/**
* Creation of a striped regular object.
*
* The function is called to create the stripe objects for a regular
for (i = 0; i < lo->ldo_comp_cnt; i++) {
lod_comp = &lo->ldo_comp_entries[i];
+ if (lod_comp->llc_id == LCME_ID_INVAL) {
+ lod_comp->llc_id = lod_gen_component_id(lo, 0, i);
+ if (lod_comp->llc_id == LCME_ID_INVAL)
+ GOTO(out, rc = -ERANGE);
+ }
+
if (lod_comp_inited(lod_comp))
continue;
LASSERT(object != NULL);
rc = lod_sub_create(env, object, attr, NULL, dof, th);
if (rc)
- break;
+ GOTO(out, rc);
}
lod_comp_set_init(lod_comp);
}
- if (rc == 0)
- rc = lod_generate_and_set_lovea(env, lo, th);
+ rc = lod_fill_mirrors(lo);
+ if (rc)
+ GOTO(out, rc);
- if (rc == 0)
- lo->ldo_comp_cached = 1;
- else
- lod_object_free_striping(env, lo);
+ rc = lod_generate_and_set_lovea(env, lo, th);
+ if (rc)
+ GOTO(out, rc);
+
+ lo->ldo_comp_cached = 1;
+ RETURN(0);
+out:
+ lod_object_free_striping(env, lo);
RETURN(rc);
}