/* default LOV */
/* current layout component count */
__u16 lds_def_comp_cnt;
+ __u16 lds_def_mirror_cnt;
/* the largest comp count ever used */
__u32 lds_def_comp_size_cnt;
struct lod_layout_component *lds_def_comp_entries;
lds_dir_def_striping_set:1;
};
+struct lod_mirror_entry {
+ __u16 lme_stale:1;
+ /* mirror id */
+ __u16 lme_id;
+ /* start,end index of this mirror in ldo_comp_entries */
+ __u16 lme_start;
+ __u16 lme_end;
+};
+
struct lod_object {
/* common fields for both files and directories */
struct dt_object ldo_obj;
/* Layout component count for a regular file.
* It equals to 1 for non-composite layout. */
__u16 ldo_comp_cnt;
+ /* Layout mirror count for a PFLR file.
+ * It's 0 for files with non-composite layout. */
__u16 ldo_mirror_count;
+ struct lod_mirror_entry *ldo_mirrors;
__u32 ldo_is_composite:1,
ldo_flr_state:2,
ldo_comp_cached:1;
static inline int lod_set_def_pool(struct lod_default_striping *lds,
int i, const char *new_pool)
{
- return lod_set_pool(&lds->lds_def_comp_entries[i].llc_pool,
- new_pool);
+ return lod_set_pool(&lds->lds_def_comp_entries[i].llc_pool, new_pool);
}
static inline int lod_obj_set_pool(struct lod_object *lo, int i,
const char *new_pool)
{
- return lod_set_pool(&lo->ldo_comp_entries[i].llc_pool,
- new_pool);
+ return lod_set_pool(&lo->ldo_comp_entries[i].llc_pool, new_pool);
}
/**
int lod_def_striping_comp_resize(struct lod_default_striping *lds, __u16 count);
void lod_free_def_comp_entries(struct lod_default_striping *lds);
void lod_free_comp_entries(struct lod_object *lo);
-int lod_alloc_comp_entries(struct lod_object *lo, int cnt);
+int lod_alloc_comp_entries(struct lod_object *lo, int mirror_cnt, int comp_cnt);
+int lod_fill_mirrors(struct lod_object *lo);
/* lod_pool.c */
int lod_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int min_count);
void lod_free_comp_entries(struct lod_object *lo)
{
+ if (lo->ldo_mirrors) {
+ OBD_FREE(lo->ldo_mirrors,
+ sizeof(*lo->ldo_mirrors) * lo->ldo_mirror_count);
+ lo->ldo_mirrors = NULL;
+ lo->ldo_mirror_count = 0;
+ }
lod_free_comp_buffer(lo->ldo_comp_entries,
lo->ldo_comp_cnt,
sizeof(*lo->ldo_comp_entries) * lo->ldo_comp_cnt);
lo->ldo_is_composite = 0;
}
-int lod_alloc_comp_entries(struct lod_object *lo, int cnt)
+int lod_alloc_comp_entries(struct lod_object *lo,
+ int mirror_count, int comp_count)
{
- LASSERT(cnt != 0);
+ LASSERT(comp_count != 0);
LASSERT(lo->ldo_comp_cnt == 0 && lo->ldo_comp_entries == NULL);
+ if (mirror_count > 0) {
+ OBD_ALLOC(lo->ldo_mirrors,
+ sizeof(*lo->ldo_mirrors) * mirror_count);
+ if (!lo->ldo_mirrors)
+ return -ENOMEM;
+
+ lo->ldo_mirror_count = mirror_count;
+ }
+
OBD_ALLOC_LARGE(lo->ldo_comp_entries,
- sizeof(*lo->ldo_comp_entries) * cnt);
- if (lo->ldo_comp_entries == NULL)
+ sizeof(*lo->ldo_comp_entries) * comp_count);
+ if (lo->ldo_comp_entries == NULL) {
+ OBD_FREE(lo->ldo_mirrors,
+ sizeof(*lo->ldo_mirrors) * mirror_count);
+ lo->ldo_mirror_count = 0;
return -ENOMEM;
- lo->ldo_comp_cnt = cnt;
+ }
+
+ lo->ldo_comp_cnt = comp_count;
return 0;
}
+int lod_fill_mirrors(struct lod_object *lo)
+{
+ struct lod_layout_component *lod_comp;
+ int mirror_idx = -1;
+ __u16 mirror_id = 0xffff;
+ int i;
+ ENTRY;
+
+ LASSERT(equi(!lo->ldo_is_composite, lo->ldo_mirror_count == 0));
+
+ if (!lo->ldo_is_composite)
+ RETURN(0);
+
+ lod_comp = &lo->ldo_comp_entries[0];
+ for (i = 0; i < lo->ldo_comp_cnt; i++, lod_comp++) {
+ int stale = !!(lod_comp->llc_flags & LCME_FL_STALE);
+
+ if (mirror_id_of(lod_comp->llc_id) == mirror_id) {
+ lo->ldo_mirrors[mirror_idx].lme_stale |= stale;
+ lo->ldo_mirrors[mirror_idx].lme_end = i;
+ continue;
+ }
+
+ /* new mirror */
+ ++mirror_idx;
+ if (mirror_idx >= lo->ldo_mirror_count)
+ RETURN(-EINVAL);
+
+ mirror_id = mirror_id_of(lod_comp->llc_id);
+
+ lo->ldo_mirrors[mirror_idx].lme_id = mirror_id;
+ lo->ldo_mirrors[mirror_idx].lme_stale = stale;
+ lo->ldo_mirrors[mirror_idx].lme_start = i;
+ lo->ldo_mirrors[mirror_idx].lme_end = i;
+ }
+ if (mirror_idx != lo->ldo_mirror_count - 1)
+ RETURN(-EINVAL);
+
+ RETURN(0);
+}
+
/**
* Generate on-disk lov_mds_md structure for each layout component based on
* the information in lod_object->ldo_comp_entries[i].
}
/**
- * Generate component ID for new created component.
- *
- * \param[in] lo LOD object
- * \param[in] comp_idx index of ldo_comp_entries
- *
- * \retval component ID on success
- * \retval LCME_ID_INVAL on failure
- */
-static __u32 lod_gen_component_id(struct lod_object *lo, int comp_idx)
-{
- struct lod_layout_component *lod_comp;
- __u32 id, start, end;
- int i;
-
- LASSERT(lo->ldo_comp_entries[comp_idx].llc_id == LCME_ID_INVAL);
-
- lod_obj_inc_layout_gen(lo);
- id = lo->ldo_layout_gen;
- if (likely(id <= LCME_ID_MAX))
- return id;
-
- /* Layout generation wraps, need to check collisions. */
- start = id & LCME_ID_MASK;
- end = LCME_ID_MAX;
-again:
- for (id = start; id <= end; id++) {
- for (i = 0; i < lo->ldo_comp_cnt; i++) {
- lod_comp = &lo->ldo_comp_entries[i];
- if (id == lod_comp->llc_id)
- break;
- }
- /* Found the ununsed ID */
- if (i == lo->ldo_comp_cnt)
- return id;
- }
- if (end == LCME_ID_MAX) {
- start = 1;
- end = min(lo->ldo_layout_gen & LCME_ID_MASK,
- (__u32)(LCME_ID_MAX - 1));
- goto again;
- }
-
- return LCME_ID_INVAL;
-}
-
-/**
* Generate on-disk lov_mds_md structure based on the information in
* the lod_object->ldo_comp_entries.
*
struct lov_comp_md_entry_v1 *lcme;
struct lov_comp_md_v1 *lcm;
struct lod_layout_component *comp_entries;
- __u16 comp_cnt;
+ __u16 comp_cnt, mirror_cnt;
bool is_composite;
int i, rc = 0, offset;
ENTRY;
if (is_dir) {
comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
+ mirror_cnt = lo->ldo_def_striping->lds_def_mirror_cnt;
comp_entries = lo->ldo_def_striping->lds_def_comp_entries;
is_composite =
lo->ldo_def_striping->lds_def_striping_is_composite;
} else {
comp_cnt = lo->ldo_comp_cnt;
+ mirror_cnt = lo->ldo_mirror_count;
comp_entries = lo->ldo_comp_entries;
is_composite = lo->ldo_is_composite;
}
lcm = (struct lov_comp_md_v1 *)lmm;
lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1);
lcm->lcm_entry_count = cpu_to_le16(comp_cnt);
- lcm->lcm_mirror_count = cpu_to_le16(lo->ldo_mirror_count);
+ lcm->lcm_mirror_count = cpu_to_le16(mirror_cnt - 1);
lcm->lcm_flags = cpu_to_le16(lo->ldo_flr_state);
offset = sizeof(*lcm) + sizeof(*lcme) * comp_cnt;
lod_comp = &comp_entries[i];
lcme = &lcm->lcm_entries[i];
- if (lod_comp->llc_id == LCME_ID_INVAL && !is_dir) {
- lod_comp->llc_id = lod_gen_component_id(lo, i);
- if (lod_comp->llc_id == LCME_ID_INVAL)
- GOTO(out, rc = -ERANGE);
- }
+ LASSERT(ergo(!is_dir, lod_comp->llc_id != LCME_ID_INVAL));
lcme->lcme_id = cpu_to_le32(lod_comp->llc_id);
/* component could be un-inistantiated */
__u32 magic, pattern;
int i, j, rc = 0;
__u16 comp_cnt;
+ __u16 mirror_cnt = 0;
ENTRY;
LASSERT(buf);
lo->ldo_is_composite = 1;
lo->ldo_flr_state = le16_to_cpu(comp_v1->lcm_flags) &
LCM_FL_FLR_MASK;
- lo->ldo_mirror_count = le16_to_cpu(comp_v1->lcm_mirror_count);
+ mirror_cnt = le16_to_cpu(comp_v1->lcm_mirror_count) + 1;
} else {
comp_cnt = 1;
lo->ldo_layout_gen = le16_to_cpu(lmm->lmm_layout_gen);
lo->ldo_is_composite = 0;
}
- rc = lod_alloc_comp_entries(lo, comp_cnt);
+ rc = lod_alloc_comp_entries(lo, mirror_cnt, comp_cnt);
if (rc)
GOTO(out, rc);
GOTO(out, rc);
}
}
+
+ rc = lod_fill_mirrors(lo);
+ if (rc)
+ GOTO(out, rc);
+
out:
if (rc)
lod_object_free_striping(env, lo);
struct thandle *th)
{
struct lod_thread_info *info = lod_env_info(env);
- struct lod_layout_component *comp_array, *lod_comp;
+ struct lod_layout_component *comp_array, *lod_comp, *old_array;
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
struct dt_object *next = dt_object_child(dt);
struct lov_desc *desc = &d->lod_desc;
struct lov_user_md_v3 *v3;
struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
__u32 magic;
- int i, rc, array_cnt;
+ int i, rc, array_cnt, old_array_cnt;
ENTRY;
LASSERT(lo->ldo_is_composite);
+ if (lo->ldo_flr_state != LCM_FL_NOT_FLR)
+ RETURN(-EBUSY);
+
rc = lod_verify_striping(d, lo, buf, false);
if (rc != 0)
RETURN(rc);
}
}
- OBD_FREE(lo->ldo_comp_entries, sizeof(*lod_comp) * lo->ldo_comp_cnt);
+ old_array = lo->ldo_comp_entries;
+ old_array_cnt = lo->ldo_comp_cnt;
+
lo->ldo_comp_entries = comp_array;
lo->ldo_comp_cnt = array_cnt;
info->lti_buf.lb_len = lod_comp_md_size(lo, false);
rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf,
XATTR_NAME_LOV, 0, th);
- if (rc)
+ if (rc) {
+ lo->ldo_comp_entries = old_array;
+ lo->ldo_comp_cnt = old_array_cnt;
GOTO(error, rc);
+ }
+
+ OBD_FREE(old_array, sizeof(*lod_comp) * old_array_cnt);
+
+ LASSERT(lo->ldo_mirror_count == 1);
+ lo->ldo_mirrors[0].lme_end = array_cnt - 1;
RETURN(0);
LASSERT(lo->ldo_is_composite);
+ if (lo->ldo_flr_state != LCM_FL_NOT_FLR)
+ RETURN(-EBUSY);
+
magic = comp_v1->lcm_magic;
if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) {
lustre_swab_lov_comp_md_v1(comp_v1);
sizeof(*comp_array) * lo->ldo_comp_cnt);
lo->ldo_comp_entries = comp_array;
lo->ldo_comp_cnt = left;
+
+ LASSERT(lo->ldo_mirror_count == 1);
+ lo->ldo_mirrors[0].lme_end = left - 1;
lod_obj_inc_layout_gen(lo);
} else {
lod_free_comp_entries(lo);
struct lov_user_md_v3 *v3 = NULL;
struct lov_comp_md_v1 *comp_v1 = NULL;
__u16 comp_cnt;
+ __u16 mirror_cnt;
bool composite;
int rc, i;
ENTRY;
comp_cnt = comp_v1->lcm_entry_count;
if (comp_cnt == 0)
RETURN(-EINVAL);
+ mirror_cnt = comp_v1->lcm_mirror_count + 1;
composite = true;
} else {
comp_cnt = 1;
+ mirror_cnt = 0;
composite = false;
}
RETURN(rc);
lds->lds_def_comp_cnt = comp_cnt;
- lds->lds_def_striping_is_composite = composite ? 1 : 0;
+ lds->lds_def_striping_is_composite = composite;
+ lds->lds_def_mirror_cnt = mirror_cnt;
for (i = 0; i < comp_cnt; i++) {
struct lod_layout_component *lod_comp;
int i, rc;
if (lds->lds_def_striping_set && S_ISREG(mode)) {
- rc = lod_alloc_comp_entries(lo, lds->lds_def_comp_cnt);
+ rc = lod_alloc_comp_entries(lo, lds->lds_def_mirror_cnt,
+ lds->lds_def_comp_cnt);
if (rc != 0)
return;
if (lod_need_inherit_more(lc, false)) {
if (lc->ldo_comp_cnt == 0) {
- rc = lod_alloc_comp_entries(lc, 1);
+ rc = lod_alloc_comp_entries(lc, 0, 1);
if (rc)
/* fail to allocate memory, will create a
* non-striped file. */
}
/**
+ * Generate component ID for new created component.
+ *
+ * \param[in] lo LOD object
+ * \param[in] comp_idx index of ldo_comp_entries
+ *
+ * \retval component ID on success
+ * \retval LCME_ID_INVAL on failure
+ */
+static __u32 lod_gen_component_id(struct lod_object *lo,
+ int mirror_id, int comp_idx)
+{
+ struct lod_layout_component *lod_comp;
+ __u32 id, start, end;
+ int i;
+
+ LASSERT(lo->ldo_comp_entries[comp_idx].llc_id == LCME_ID_INVAL);
+
+ lod_obj_inc_layout_gen(lo);
+ id = lo->ldo_layout_gen;
+ if (likely(id <= SEQ_ID_MAX))
+ RETURN(pflr_id(mirror_id, id & SEQ_ID_MASK));
+
+ /* Layout generation wraps, need to check collisions. */
+ start = id & SEQ_ID_MASK;
+ end = SEQ_ID_MAX;
+again:
+ for (id = start; id <= end; id++) {
+ for (i = 0; i < lo->ldo_comp_cnt; i++) {
+ lod_comp = &lo->ldo_comp_entries[i];
+ if (pflr_id(mirror_id, id) == lod_comp->llc_id)
+ break;
+ }
+ /* Found the ununsed ID */
+ if (i == lo->ldo_comp_cnt)
+ RETURN(pflr_id(mirror_id, id));
+ }
+ if (end == LCME_ID_MAX) {
+ start = 1;
+ end = min(lo->ldo_layout_gen & LCME_ID_MASK,
+ (__u32)(LCME_ID_MAX - 1));
+ goto again;
+ }
+
+ RETURN(LCME_ID_INVAL);
+}
+
+/**
* Creation of a striped regular object.
*
* The function is called to create the stripe objects for a regular
for (i = 0; i < lo->ldo_comp_cnt; i++) {
lod_comp = &lo->ldo_comp_entries[i];
+ if (lod_comp->llc_id == LCME_ID_INVAL) {
+ lod_comp->llc_id = lod_gen_component_id(lo, 0, i);
+ if (lod_comp->llc_id == LCME_ID_INVAL)
+ GOTO(out, rc = -ERANGE);
+ }
+
if (lod_comp_inited(lod_comp))
continue;
LASSERT(object != NULL);
rc = lod_sub_create(env, object, attr, NULL, dof, th);
if (rc)
- break;
+ GOTO(out, rc);
}
lod_comp_set_init(lod_comp);
}
- if (rc == 0)
- rc = lod_generate_and_set_lovea(env, lo, th);
+ rc = lod_fill_mirrors(lo);
+ if (rc)
+ GOTO(out, rc);
- if (rc == 0)
- lo->ldo_comp_cached = 1;
- else
- lod_object_free_striping(env, lo);
+ rc = lod_generate_and_set_lovea(env, lo, th);
+ if (rc)
+ GOTO(out, rc);
+
+ lo->ldo_comp_cached = 1;
+ RETURN(0);
+out:
+ lod_object_free_striping(env, lo);
RETURN(rc);
}
struct lov_ost_data_v1 *objs;
__u32 magic;
__u16 comp_cnt;
+ __u16 mirror_cnt;
int rc = 0, i;
ENTRY;
comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count);
if (comp_cnt == 0)
RETURN(-EINVAL);
+ mirror_cnt = le16_to_cpu(comp_v1->lcm_mirror_count) + 1;
mo->ldo_is_composite = 1;
} else {
mo->ldo_is_composite = 0;
comp_cnt = 1;
+ mirror_cnt = 0;
}
+ mo->ldo_layout_gen = le16_to_cpu(v1->lmm_layout_gen);
- rc = lod_alloc_comp_entries(mo, comp_cnt);
+ rc = lod_alloc_comp_entries(mo, mirror_cnt, comp_cnt);
if (rc)
RETURN(rc);
GOTO(out, rc);
}
}
+
+ rc = lod_fill_mirrors(mo);
+ if (rc)
+ GOTO(out, rc);
out:
if (rc)
lod_object_free_striping(env, mo);
struct lov_comp_md_v1 *comp_v1 = NULL;
__u32 magic;
__u16 comp_cnt;
+ __u16 mirror_cnt;
int i, rc;
ENTRY;
comp_cnt = comp_v1->lcm_entry_count;
if (comp_cnt == 0)
RETURN(-EINVAL);
+ mirror_cnt = comp_v1->lcm_mirror_count + 1;
lo->ldo_is_composite = 1;
} else {
comp_cnt = 1;
+ mirror_cnt = 0;
lo->ldo_is_composite = 0;
}
- rc = lod_alloc_comp_entries(lo, comp_cnt);
+ rc = lod_alloc_comp_entries(lo, mirror_cnt, comp_cnt);
if (rc)
RETURN(rc);