RETURN(rc);
}
+inline __u16 lod_comp_entry_stripecnt(struct lod_object *lo,
+ struct lod_layout_component *entry,
+ bool is_dir)
+{
+ if (is_dir)
+ return 0;
+ else if (lod_comp_inited(entry))
+ return entry->llc_stripenr;
+ else
+ return lod_get_stripecnt(lu2lod_dev(lod2lu_obj(lo)->lo_dev), lo,
+ entry->llc_stripenr);
+}
+
+static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
+{
+ int magic, size = 0, i;
+ struct lod_layout_component *comp_entries;
+ __u16 comp_cnt;
+ bool is_composite;
+
+ if (is_dir) {
+ comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
+ comp_entries = lo->ldo_def_striping->lds_def_comp_entries;
+ is_composite =
+ lo->ldo_def_striping->lds_def_striping_is_composite;
+ } else {
+ comp_cnt = lo->ldo_comp_cnt;
+ comp_entries = lo->ldo_comp_entries;
+ is_composite = lo->ldo_is_composite;
+ }
+
+
+ LASSERT(comp_cnt != 0 && comp_entries != NULL);
+ if (is_composite) {
+ size = sizeof(struct lov_comp_md_v1) +
+ sizeof(struct lov_comp_md_entry_v1) * comp_cnt;
+ LASSERT(size % sizeof(__u64) == 0);
+ }
+
+ for (i = 0; i < comp_cnt; i++) {
+ __u16 stripenr;
+
+ magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1;
+ stripenr = lod_comp_entry_stripecnt(lo, &comp_entries[i],
+ is_dir);
+ size += lov_user_md_size(stripenr, magic);
+ LASSERT(size % sizeof(__u64) == 0);
+ }
+ return size;
+}
+
/**
* Declare component add. The xattr name is XATTR_LUSTRE_LOV.add, and
* the xattr value is binary lov_comp_md_v1 which contains component(s)
const struct lu_buf *buf,
struct thandle *th)
{
+ struct lod_thread_info *info = lod_env_info(env);
struct lod_layout_component *comp_array, *lod_comp;
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+ struct dt_object *next = dt_object_child(dt);
struct lov_desc *desc = &d->lod_desc;
struct lod_object *lo = lod_dt_obj(dt);
- struct lov_user_md_v1 *v1;
struct lov_user_md_v3 *v3;
struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
- struct lu_extent *ext;
__u32 magic;
__u64 prev_end;
int i, rc, array_cnt;
LASSERT(lo->ldo_is_composite);
- magic = comp_v1->lcm_magic;
- /* Replay request, see comment for LOV_MAGIC_DEF */
- if (unlikely(le32_to_cpu(magic) == LOV_MAGIC_COMP_V1_DEF)) {
- struct dt_object *next = dt_object_child(dt);
-
- lod_object_free_striping(env, lo);
- rc = lod_use_defined_striping(env, lo, buf);
- if (rc == 0) {
- lo->ldo_comp_cached = 1;
- rc = lod_sub_object_declare_xattr_set(env, next, buf,
- XATTR_NAME_LOV,
- 0, th);
- }
- RETURN(rc);
- }
-
prev_end = lo->ldo_comp_entries[lo->ldo_comp_cnt - 1].llc_extent.e_end;
rc = lod_verify_striping(d, buf, false, prev_end);
if (rc != 0)
RETURN(rc);
+ magic = comp_v1->lcm_magic;
if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) {
lustre_swab_lov_comp_md_v1(comp_v1);
magic = comp_v1->lcm_magic;
sizeof(*comp_array) * lo->ldo_comp_cnt);
for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+ struct lov_user_md_v1 *v1;
+ struct lu_extent *ext;
+
v1 = (struct lov_user_md *)((char *)comp_v1 +
comp_v1->lcm_entries[i].lcme_offset);
ext = &comp_v1->lcm_entries[i].lcme_extent;
lod_comp->llc_stripe_offset = v1->lmm_stripe_offset;
lod_comp->llc_stripenr = v1->lmm_stripe_count;
- if (lod_comp->llc_stripenr <= 0)
+ if (!lod_comp->llc_stripenr ||
+ lod_comp->llc_stripenr == (__u16)-1)
lod_comp->llc_stripenr = desc->ld_default_stripe_count;
lod_comp->llc_stripe_size = v1->lmm_stripe_size;
- if (lod_comp->llc_stripe_size <= 0)
+ if (!lod_comp->llc_stripe_size)
lod_comp->llc_stripe_size =
desc->ld_default_stripe_size;
if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
- int len;
v3 = (struct lov_user_md_v3 *) v1;
if (v3->lmm_pool_name[0] != '\0') {
- len = strlen(v3->lmm_pool_name);
- OBD_ALLOC(lod_comp->llc_pool, len + 1);
- if (lod_comp->llc_pool == NULL)
- GOTO(error, rc = -ENOMEM);
- strncpy(lod_comp->llc_pool, v3->lmm_pool_name,
- len + 1);
+ rc = lod_set_pool(&lod_comp->llc_pool,
+ v3->lmm_pool_name);
+ if (rc)
+ GOTO(error, rc);
}
}
}
/* No need to increase layout generation here, it will be increased
* later when generating component ID for the new components */
- rc = lod_declare_striped_object(env, dt, NULL, NULL, th);
- RETURN(rc);
+ info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+ rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
+ XATTR_NAME_LOV, 0, th);
+ if (rc)
+ GOTO(error, rc);
+
+ RETURN(0);
error:
for (i = lo->ldo_comp_cnt; i < array_cnt; i++) {
RETURN(rc);
}
-static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
-{
- int magic, size = 0, i;
- struct lod_layout_component *comp_entries;
- __u16 comp_cnt;
- bool is_composite;
-
- if (is_dir) {
- comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
- comp_entries = lo->ldo_def_striping->lds_def_comp_entries;
- is_composite =
- lo->ldo_def_striping->lds_def_striping_is_composite;
- } else {
- comp_cnt = lo->ldo_comp_cnt;
- comp_entries = lo->ldo_comp_entries;
- is_composite = lo->ldo_is_composite;
- }
-
-
- LASSERT(comp_cnt != 0 && comp_entries != NULL);
- if (is_composite) {
- size = sizeof(struct lov_comp_md_v1) +
- sizeof(struct lov_comp_md_entry_v1) * comp_cnt;
- LASSERT(size % sizeof(__u64) == 0);
- }
-
- for (i = 0; i < comp_cnt; i++) {
- magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1;
-
- size += lov_user_md_size(
- is_dir ? 0 : comp_entries[i].llc_stripenr,
- magic);
- LASSERT(size % sizeof(__u64) == 0);
- }
- return size;
-}
-
/**
* Declare component set. The xattr is name XATTR_LUSTRE_LOV.set.$field,
* the '$field' can only be 'flags' now. The xattr value is binary
lod_comp->llc_stripe = NULL;
lod_comp->llc_stripes_allocated = 0;
lod_obj_set_pool(lo, i, NULL);
+ if (lod_comp->llc_ostlist.op_array) {
+ OBD_FREE(lod_comp->llc_ostlist.op_array,
+ lod_comp->llc_ostlist.op_size);
+ lod_comp->llc_ostlist.op_array = NULL;
+ lod_comp->llc_ostlist.op_size = 0;
+ }
}
LASSERTF(left >= 0 && left < lo->ldo_comp_cnt, "left = %d\n", left);
for (i = 0; i < lo->ldo_comp_cnt; i++) {
lod_comp = &lo->ldo_comp_entries[i];
- if (lod_comp->llc_flags & LCME_FL_INIT)
+ if (lod_comp_inited(lod_comp))
continue;
- lod_comp->llc_flags |= LCME_FL_INIT;
+ if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
+ lod_comp_set_init(lod_comp);
if (lod_comp->llc_stripe == NULL)
continue;
- LASSERT(lod_comp->llc_stripenr > 0);
+ LASSERT(lod_comp->llc_stripenr);
for (j = 0; j < lod_comp->llc_stripenr; j++) {
struct dt_object *object = lod_comp->llc_stripe[j];
LASSERT(object != NULL);
if (rc)
break;
}
+ lod_comp_set_init(lod_comp);
}
if (rc == 0)
return dt_invalidate(env, dt_object_child(dt));
}
+/**
+ * Resize per-thread ost list to hold OST target index list already used.
+ *
+ * \param[in,out] inuse structure contains ost list array
+ * \param[in] cnt total stripe count of all components
+ * \param[in] max array's max size if @max > 0
+ *
+ * \retval 0 on success
+ * \retval -ENOMEM reallocation failed
+ */
+int lod_inuse_resize(struct ost_pool *inuse, __u16 cnt, __u16 max)
+{
+ __u32 *array;
+ __u32 new = cnt * sizeof(__u32);
+
+ inuse->op_count = 0;
+
+ if (new <= inuse->op_size)
+ return 0;
+
+ if (max)
+ new = min_t(__u32, new, max);
+ OBD_ALLOC(array, new);
+ if (!array)
+ return -ENOMEM;
+
+ if (inuse->op_array)
+ OBD_FREE(inuse->op_array, inuse->op_size);
+
+ inuse->op_array = array;
+ inuse->op_size = new;
+
+ return 0;
+}
+
+static int lod_declare_layout_change(const struct lu_env *env,
+ struct dt_object *dt,
+ struct layout_intent *layout,
+ const struct lu_buf *buf,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+ struct dt_object *next = dt_object_child(dt);
+ struct lod_obj_stripe_cb_data data;
+ struct ost_pool *inuse = &info->lti_inuse_osts;
+ struct lod_layout_component *lod_comp;
+ struct lov_comp_md_v1 *comp_v1 = NULL;
+ bool replay = false;
+ bool need_create = false;
+ int i, rc;
+ __u32 stripe_cnt = 0;
+ ENTRY;
+
+ if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
+ dt_object_remote(next))
+ RETURN(-EINVAL);
+
+ dt_write_lock(env, next, 0);
+ /*
+ * In case the client is passing lovea, which only happens during
+ * the replay of layout intent write RPC for now, we may need to
+ * parse the lovea and apply new layout configuration.
+ */
+ if (buf && buf->lb_len) {
+ struct lov_user_md_v1 *v1 = buf->lb_buf;
+
+ if (v1->lmm_magic != (LOV_MAGIC_DEF | LOV_MAGIC_COMP_V1) &&
+ v1->lmm_magic !=
+ __swab32(LOV_MAGIC_DEF | LOV_MAGIC_COMP_V1)) {
+ CERROR("%s: the replay buffer of layout extend "
+ "(magic %#x) does not contain expected "
+ "composite layout.\n",
+ lod2obd(d)->obd_name, v1->lmm_magic);
+ GOTO(out, rc = -EINVAL);
+ }
+
+ lod_object_free_striping(env, lo);
+ rc = lod_use_defined_striping(env, lo, buf);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_get_lov_ea(env, lo);
+ if (rc <= 0)
+ GOTO(out, rc);
+ /* old on-disk EA is stored in info->lti_buf */
+ comp_v1 = (struct lov_comp_md_v1 *)&info->lti_buf.lb_buf;
+ replay = true;
+ } else {
+ /* non replay path */
+ rc = lod_load_striping_locked(env, lo);
+ if (rc)
+ GOTO(out, rc);
+
+ /* Prepare inuse array for composite file */
+ for (i = 0; i < lo->ldo_comp_cnt; i++)
+ stripe_cnt += lod_comp_entry_stripecnt(lo,
+ &lo->ldo_comp_entries[i],
+ false);
+ rc = lod_inuse_resize(inuse, stripe_cnt, d->lod_osd_max_easize);
+ if (rc)
+ GOTO(out, rc);
+
+ data.locd_inuse = inuse;
+ rc = lod_obj_for_each_stripe(env, lo, NULL,
+ lod_obj_stripe_set_inuse_cb,
+ &data);
+ if (rc)
+ GOTO(out, rc);
+ }
+
+ /* Make sure defined layout covers the requested write range. */
+ lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1];
+ if ((lod_comp->llc_extent.e_end != OBD_OBJECT_EOF &&
+ lod_comp->llc_extent.e_end < layout->li_end)) {
+ CDEBUG(replay ? D_ERROR : D_LAYOUT,
+ "%s: the defined layout [0, %#llx) does not covers "
+ "the write range [%#llx, %#llx).\n",
+ lod2obd(d)->obd_name, lod_comp->llc_extent.e_end,
+ layout->li_start, layout->li_end);
+ GOTO(out, rc = -EINVAL);
+ }
+
+ /*
+ * Iterate ld->ldo_comp_entries, find the component whose extent under
+ * the write range and not instantianted.
+ */
+ for (i = 0; i < lo->ldo_comp_cnt; i++) {
+ lod_comp = &lo->ldo_comp_entries[i];
+
+ if (lod_comp->llc_extent.e_start >= layout->li_end)
+ break;
+
+ if (!replay) {
+ if (lod_comp_inited(lod_comp))
+ continue;
+ } else {
+ /**
+ * In replay path, lod_comp is the EA passed by
+ * client replay buffer, comp_v1 is the pre-recovery
+ * on-disk EA, we'd sift out those components which
+ * were init-ed in the on-disk EA.
+ */
+ if (le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags) &
+ LCME_FL_INIT)
+ continue;
+ }
+ /*
+ * this component hasn't instantiated in normal path, or during
+ * replay it needs replay the instantiation.
+ */
+
+ /* A released component is being extended */
+ if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
+ GOTO(out, rc = -EINVAL);
+
+ need_create = true;
+ /*
+ * In replay, the component EA is passed by client,
+ * Clear LCME_FL_INIT so that lod_striping_create() can create
+ * the striping objects.
+ */
+ if (replay)
+ lod_comp_unset_init(lod_comp);
+
+ rc = lod_qos_prep_create(env, lo, NULL, th, i, inuse);
+ if (rc)
+ break;
+ }
+
+ if (need_create)
+ lod_obj_inc_layout_gen(lo);
+ else
+ GOTO(unlock, rc = -EALREADY);
+
+ if (!rc) {
+ info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+ rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
+ XATTR_NAME_LOV, 0, th);
+ }
+out:
+ if (rc)
+ lod_object_free_striping(env, lo);
+
+unlock:
+ dt_write_unlock(env, next);
+
+ RETURN(rc);
+}
+
+/**
+ * Instantiate layout component objects which covers the intent write offset.
+ */
+static int lod_layout_change(const struct lu_env *env, struct dt_object *dt,
+ struct layout_intent *layout,
+ const struct lu_buf *buf, struct thandle *th)
+{
+ struct lu_attr *attr = &lod_env_info(env)->lti_attr;
+
+ RETURN(lod_striping_create(env, dt, attr, NULL, th));
+}
+
struct dt_object_operations lod_obj_ops = {
.do_read_lock = lod_object_read_lock,
.do_write_lock = lod_object_write_lock,
.do_object_lock = lod_object_lock,
.do_object_unlock = lod_object_unlock,
.do_invalidate = lod_invalidate,
+ .do_declare_layout_change = lod_declare_layout_change,
+ .do_layout_change = lod_layout_change,
};
/**