* \retval negative negated errno on error
*/
int (*do_invalidate)(const struct lu_env *env, struct dt_object *dt);
+
+ /**
+ * Declare the intention to instantiate an extended layout component.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt DT object
+ * \param[in] layout data structure to describe the changes to
+ * the DT object's layout
+ * \param[in] buf buffer containing client's lovea or empty
+ * \param[in] th transaction handle
+ *
+ * \retval 0 success
+ * \retval -ne error code
+ */
+ int (*do_declare_layout_change)(const struct lu_env *env,
+ struct dt_object *dt,
+ struct layout_intent *layout,
+ const struct lu_buf *buf,
+ struct thandle *th);
+
+ /**
+ * Client is trying to write to an un-instantiated layout component.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt DT object
+ * \param[in] layout data structure to describe the changes to
+ * the DT object's layout
+ * \param[in] buf buffer containing client's lovea or empty
+ * \param[in] th transaction handle
+ *
+ * \retval 0 success
+ * \retval -ne error code
+ */
+ int (*do_layout_change)(const struct lu_env *env, struct dt_object *dt,
+ struct layout_intent *layout,
+ const struct lu_buf *buf, struct thandle *th);
};
/**
return ret;
}
+/**
+ * Dispatch a layout change declaration to the object's
+ * do_declare_layout_change() method.
+ *
+ * The object, its operation table and the method itself are asserted
+ * to be non-NULL before the call is made.
+ *
+ * \param[in] env	execution environment
+ * \param[in] o		DT object
+ * \param[in] layout	description of the requested layout change
+ * \param[in] buf	buffer handed through to the method
+ * \param[in] th	transaction handle
+ *
+ * \retval		whatever do_declare_layout_change() returns
+ */
+static inline int dt_declare_layout_change(const struct lu_env *env,
+					   struct dt_object *o,
+					   struct layout_intent *layout,
+					   const struct lu_buf *buf,
+					   struct thandle *th)
+{
+	int rc;
+
+	LASSERT(o);
+	LASSERT(o->do_ops);
+	LASSERT(o->do_ops->do_declare_layout_change);
+
+	rc = o->do_ops->do_declare_layout_change(env, o, layout, buf, th);
+
+	return rc;
+}
+
+/**
+ * Dispatch a layout change to the object's do_layout_change() method.
+ *
+ * The object, its operation table and the method itself are asserted
+ * to be non-NULL before the call is made.
+ *
+ * \param[in] env	execution environment
+ * \param[in] o		DT object
+ * \param[in] layout	description of the requested layout change
+ * \param[in] buf	buffer handed through to the method
+ * \param[in] th	transaction handle
+ *
+ * \retval		whatever do_layout_change() returns
+ */
+static inline int dt_layout_change(const struct lu_env *env,
+				   struct dt_object *o,
+				   struct layout_intent *layout,
+				   const struct lu_buf *buf,
+				   struct thandle *th)
+{
+	int rc;
+
+	LASSERT(o);
+	LASSERT(o->do_ops);
+	LASSERT(o->do_ops->do_layout_change);
+
+	rc = o->do_ops->do_layout_change(env, o, layout, buf, th);
+
+	return rc;
+}
+
struct dt_find_hint {
struct lu_fid *dfh_fid;
struct dt_device *dfh_dt;
union ldlm_policy_data *policy);
int (*moo_invalidate)(const struct lu_env *env, struct md_object *obj);
+ /**
+ * Trying to write to un-instantiated layout component.
+ *
+ * The caller should have held layout lock.
+ *
+ * \param[in] env execution environment
+ * \param[in] obj MD object
+ * \param[in] layout data structure to describe the changes to
+ * the MD object's layout
+ * \param[in] buf buffer containing the client's lovea
+ *
+ * \retval 0 success
+ * \retval -ne error code
+ */
+ int (*moo_layout_change)(const struct lu_env *env,
+ struct md_object *obj,
+ struct layout_intent *layout,
+ const struct lu_buf *buf);
};
/**
return m->mo_ops->moo_invalidate(env, m);
}
+/**
+ * Log a client's layout change request and forward it to the MD object's
+ * moo_layout_change() method.
+ *
+ * \param[in] env	execution environment
+ * \param[in] m		MD object
+ * \param[in] layout	layout intent; its opcode, flags and extent are logged
+ * \param[in] buf	buffer handed through to moo_layout_change()
+ *
+ * \retval		whatever moo_layout_change() returns
+ */
+static inline int mo_layout_change(const struct lu_env *env,
+				   struct md_object *m,
+				   struct layout_intent *layout,
+				   const struct lu_buf *buf)
+{
+	int rc;
+
+	CDEBUG(D_INFO, "got layout change request from client: "
+	       "opc:%u flags:%#x extent[%#llx,%#llx)\n",
+	       layout->li_opc, layout->li_flags,
+	       layout->li_start, layout->li_end);
+
+	/* the objects covering the accessed range need to be instantiated */
+	LASSERT(m->mo_ops->moo_layout_change);
+	rc = m->mo_ops->moo_layout_change(env, m, layout, buf);
+
+	return rc;
+}
+
static inline int mo_swap_layouts(const struct lu_env *env,
struct md_object *o1,
struct md_object *o2, __u64 flags)
struct lod_thread_info *info = data;
struct lod_layout_component *lds =
info->lti_def_striping.lds_def_comp_entries;
+ struct ost_pool *inuse = &info->lti_inuse_osts;
/* allocated in lod_get_lov_ea
* XXX: this is overload, a tread may have such store but used only
if (lds != NULL)
lod_free_def_comp_entries(&info->lti_def_striping);
+ if (inuse->op_size)
+ OBD_FREE(inuse->op_array, inuse->op_size);
+
OBD_FREE_PTR(info);
}
__u16 llc_stripenr;
__u16 llc_stripes_allocated;
char *llc_pool;
+ /* ost list specified with LOV_USER_MAGIC_SPECIFIC lum */
+ struct ost_pool llc_ostlist;
struct dt_object **llc_stripe;
};
struct lu_attr lti_attr;
struct lod_it lti_it;
struct ldlm_res_id lti_res_id;
+ struct ost_pool lti_inuse_osts;
/* used to hold lu_dirent, sizeof(struct lu_dirent) + NAME_MAX */
char lti_key[sizeof(struct lu_dirent) +
NAME_MAX];
return lod_get_ea(env, lo, XATTR_NAME_DEFAULT_LMV);
}
+/** Mark a layout component as instantiated by raising LCME_FL_INIT. */
+static inline void
+lod_comp_set_init(struct lod_layout_component *entry)
+{
+	entry->llc_flags = entry->llc_flags | LCME_FL_INIT;
+}
+
+/** Mark a layout component as not instantiated by clearing LCME_FL_INIT. */
+static inline void
+lod_comp_unset_init(struct lod_layout_component *entry)
+{
+	entry->llc_flags = entry->llc_flags & ~LCME_FL_INIT;
+}
+
+/** Tell whether LCME_FL_INIT is set on a layout component. */
+static inline bool
+lod_comp_inited(const struct lod_layout_component *entry)
+{
+	return (entry->llc_flags & LCME_FL_INIT) != 0;
+}
+
void lod_fix_desc(struct lov_desc *desc);
void lod_fix_desc_qos_maxage(__u32 *val);
void lod_fix_desc_pattern(__u32 *val);
int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname);
int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname);
+/**
+ * Argument bundle handed to a per-stripe callback (lod_obj_stripe_cb_t).
+ */
+struct lod_obj_stripe_cb_data {
+ /* only one of the two pointers is meaningful for a given callback */
+ union {
+ /* NOTE(review): presumably the attributes to apply -- consumers not visible here */
+ const struct lu_attr *locd_attr;
+ /* in-use OST pool; set by callers passing lod_obj_stripe_set_inuse_cb() */
+ struct ost_pool *locd_inuse;
+ };
+ /* NOTE(review): presumably selects the declare phase -- confirm against users */
+ bool locd_declare;
+};
+
+/**
+ * Callback type accepted by lod_obj_for_each_stripe().
+ *
+ * The callback receives the execution environment, the LOD object, a DT
+ * object (presumably the stripe being visited -- confirm), the transaction
+ * handle, a stripe index and the caller-supplied callback data.
+ */
+typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env,
+ struct lod_object *lo, struct dt_object *dt,
+ struct thandle *th, int stripe_idx,
+ struct lod_obj_stripe_cb_data *data);
/* lod_qos.c */
int lod_prepare_create(const struct lu_env *env, struct lod_object *lo,
struct lu_attr *attr, const struct lu_buf *buf,
void lod_qos_rr_init(struct lod_qos_rr *lqr);
int lod_use_defined_striping(const struct lu_env *, struct lod_object *,
const struct lu_buf *);
+int lod_obj_stripe_set_inuse_cb(const struct lu_env *env, struct lod_object *lo,
+ struct dt_object *dt, struct thandle *th,
+ int stripe_idx,
+ struct lod_obj_stripe_cb_data *data);
+int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo,
+ const struct lu_buf *buf);
+int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
+ struct lu_attr *attr, struct thandle *th,
+ int comp_idx, struct ost_pool *inuse);
+__u16 lod_comp_entry_stripecnt(struct lod_object *lo,
+ struct lod_layout_component *entry,
+ bool is_dir);
+__u16 lod_get_stripecnt(struct lod_device *lod, struct lod_object *lo,
+ __u16 stripe_count);
/* lproc_lod.c */
int lod_procfs_init(struct lod_device *lod);
struct thandle *th);
void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo);
-struct lod_obj_stripe_cb_data {
- union {
- const struct lu_attr *locd_attr;
- struct ost_pool *locd_inuse;
- };
- bool locd_declare;
-};
-
-typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env,
- struct lod_object *lo, struct dt_object *dt,
- struct thandle *th, int stripe_idx,
- struct lod_obj_stripe_cb_data *data);
-
int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
struct thandle *th, lod_obj_stripe_cb_t cb,
struct lod_obj_stripe_cb_data *data);
entry = &entries[i];
if (entry->llc_pool != NULL)
lod_set_pool(&entry->llc_pool, NULL);
+ if (entry->llc_ostlist.op_array)
+ OBD_FREE(entry->llc_ostlist.op_array,
+ entry->llc_ostlist.op_size);
LASSERT(entry->llc_stripe == NULL);
LASSERT(entry->llc_stripes_allocated == 0);
}
struct lov_ost_data_v1 *objs;
struct lod_layout_component *lod_comp;
__u32 magic;
+ __u16 stripecnt;
int i, rc = 0;
ENTRY;
lmm->lmm_stripe_size = cpu_to_le32(lod_comp->llc_stripe_size);
lmm->lmm_stripe_count = cpu_to_le16(lod_comp->llc_stripenr);
- /* for dir, lmm_layout_gen stores default stripe offset. */
- lmm->lmm_layout_gen = is_dir ?
+ /**
+ * for dir and uninstantiated component, lmm_layout_gen stores
+ * default stripe offset.
+ */
+ lmm->lmm_layout_gen =
+ (is_dir || !lod_comp_inited(lod_comp)) ?
cpu_to_le16(lod_comp->llc_stripe_offset) :
cpu_to_le16(lod_comp->llc_layout_gen);
RETURN(-E2BIG);
objs = &v3->lmm_objects[0];
}
+ stripecnt = lod_comp_entry_stripecnt(lo, lod_comp, is_dir);
if (is_dir || lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
GOTO(done, rc = 0);
+ /* generate ost_idx of this component stripe */
lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
- for (i = 0; i < lod_comp->llc_stripenr; i++) {
- struct dt_object *object;
- __u32 ost_idx;
- int type = LU_SEQ_RANGE_OST;
-
- object = lod_comp->llc_stripe[i];
- LASSERT(object != NULL);
- info->lti_fid = *lu_object_fid(&object->do_lu);
-
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MULTIPLE_REF) &&
- comp_idx == 0) {
- if (cfs_fail_val == 0)
- cfs_fail_val = info->lti_fid.f_oid;
- else if (i == 0)
- info->lti_fid.f_oid = cfs_fail_val;
- }
+ for (i = 0; i < stripecnt; i++) {
+ struct dt_object *object;
+ __u32 ost_idx = (__u32)-1UL;
+ int type = LU_SEQ_RANGE_OST;
+
+ if (lod_comp->llc_stripe && lod_comp->llc_stripe[i]) {
+ object = lod_comp->llc_stripe[i];
+ /* instantiated component */
+ info->lti_fid = *lu_object_fid(&object->do_lu);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MULTIPLE_REF) &&
+ comp_idx == 0) {
+ if (cfs_fail_val == 0)
+ cfs_fail_val = info->lti_fid.f_oid;
+ else if (i == 0)
+ info->lti_fid.f_oid = cfs_fail_val;
+ }
- rc = fid_to_ostid(&info->lti_fid, &info->lti_ostid);
- LASSERT(rc == 0);
-
- ostid_cpu_to_le(&info->lti_ostid, &objs[i].l_ost_oi);
- objs[i].l_ost_gen = cpu_to_le32(0);
- if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FLD_LOOKUP))
- rc = -ENOENT;
- else
- rc = lod_fld_lookup(env, lod, &info->lti_fid,
- &ost_idx, &type);
- if (rc < 0) {
- CERROR("%s: Can not locate "DFID": rc = %d\n",
- lod2obd(lod)->obd_name, PFID(&info->lti_fid),
- rc);
- RETURN(rc);
+ rc = fid_to_ostid(&info->lti_fid, &info->lti_ostid);
+ LASSERT(rc == 0);
+
+ ostid_cpu_to_le(&info->lti_ostid, &objs[i].l_ost_oi);
+ objs[i].l_ost_gen = cpu_to_le32(0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FLD_LOOKUP))
+ rc = -ENOENT;
+ else
+ rc = lod_fld_lookup(env, lod, &info->lti_fid,
+ &ost_idx, &type);
+ if (rc < 0) {
+ CERROR("%s: Can not locate "DFID": rc = %d\n",
+ lod2obd(lod)->obd_name,
+ PFID(&info->lti_fid), rc);
+ RETURN(rc);
+ }
+ } else if (lod_comp->llc_ostlist.op_array) {
+ /* user specified ost list */
+ ost_idx = lod_comp->llc_ostlist.op_array[i];
}
+ /*
+ * with un-instantiated or with no specified ost list
+ * component, its l_ost_idx does not matter.
+ */
objs[i].l_ost_idx = cpu_to_le32(ost_idx);
}
done:
if (lmm_size != NULL)
- *lmm_size = lov_mds_md_size(is_dir ?
- 0 : lod_comp->llc_stripenr, magic);
+ *lmm_size = lov_mds_md_size(stripecnt, magic);
RETURN(rc);
}
GOTO(out, rc = -ERANGE);
}
lcme->lcme_id = cpu_to_le32(lod_comp->llc_id);
- /* component must has been inistantiated */
- LASSERT(ergo(!is_dir, lod_comp->llc_flags & LCME_FL_INIT));
+
+ /* component could be un-instantiated */
lcme->lcme_flags = cpu_to_le32(lod_comp->llc_flags);
lcme->lcme_extent.e_start =
cpu_to_le64(lod_comp->llc_extent.e_start);
* \param[in] lo LOD object
* \param[in] name name of the EA
*
- * \retval 0 if EA is fetched successfully
+ * \retval > 0 if EA is fetched successfully
+ * \retval 0 if EA is empty
* \retval negative error number on failure
*/
int lod_get_ea(const struct lu_env *env, struct lod_object *lo,
struct lov_comp_md_v1 *comp_v1 = NULL;
struct lov_ost_data_v1 *objs;
__u32 magic, pattern;
- int i, rc = 0;
+ int i, j, rc = 0;
__u16 comp_cnt;
ENTRY;
if (lod_comp->llc_id == LCME_ID_INVAL)
GOTO(out, rc = -EINVAL);
} else {
- lod_comp->llc_flags = LCME_FL_INIT;
+ lod_comp_set_init(lod_comp);
}
pattern = le32_to_cpu(lmm->lmm_pattern);
objs = &lmm->lmm_objects[0];
}
+ /**
+ * If an uninstantiated template component has a valid l_ost_idx,
+ * then the user has specified an OST list for this component.
+ */
+ if (!lod_comp_inited(lod_comp) &&
+ objs[0].l_ost_idx != (__u32)-1UL) {
+ /**
+ * load the user specified ost list, when this
+ * component is instantiated later, it will be used
+ * in lod_alloc_ost_list().
+ */
+ lod_comp->llc_ostlist.op_count = lod_comp->llc_stripenr;
+ lod_comp->llc_ostlist.op_size =
+ lod_comp->llc_stripenr * sizeof(__u32);
+ OBD_ALLOC(lod_comp->llc_ostlist.op_array,
+ lod_comp->llc_ostlist.op_size);
+ if (!lod_comp->llc_ostlist.op_array)
+ GOTO(out, rc = -ENOMEM);
+
+ for (j = 0; j < lod_comp->llc_stripenr; j++)
+ lod_comp->llc_ostlist.op_array[j] =
+ le32_to_cpu(objs[j].l_ost_idx);
+ }
+
+ /* skip un-instantiated component object initialization */
+ if (!lod_comp_inited(lod_comp))
+ continue;
+
if (!(lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)) {
rc = lod_initialize_objects(env, lo, objs, i);
if (rc)
RETURN(rc);
}
+/**
+ * Stripe count to account for a layout component.
+ *
+ * \param[in] lo	LOD object owning the component
+ * \param[in] entry	layout component
+ * \param[in] is_dir	true when sizing a directory's default layout
+ *
+ * \retval 0		for directories
+ * \retval		llc_stripenr for an instantiated component
+ * \retval		lod_get_stripecnt() applied to llc_stripenr otherwise
+ */
+inline __u16 lod_comp_entry_stripecnt(struct lod_object *lo,
+				      struct lod_layout_component *entry,
+				      bool is_dir)
+{
+	struct lod_device *lod;
+
+	if (is_dir)
+		return 0;
+
+	if (lod_comp_inited(entry))
+		return entry->llc_stripenr;
+
+	/* not instantiated yet: let the device work out the usable count */
+	lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
+
+	return lod_get_stripecnt(lod, lo, entry->llc_stripenr);
+}
+
+/**
+ * Compute the size of the LOV EA describing an object's layout.
+ *
+ * For a composite layout the size of the lov_comp_md_v1 header and of one
+ * lov_comp_md_entry_v1 per component is counted first. Then every component
+ * adds lov_user_md_size() for its stripe count, using LOV_MAGIC_V3 if the
+ * component has a pool and LOV_MAGIC_V1 otherwise.
+ *
+ * \param[in] lo	LOD object
+ * \param[in] is_dir	use the default striping (ldo_def_striping) rather
+ *			than the object's own components
+ *
+ * \retval		size in bytes, asserted to be a multiple of 8
+ */
+static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
+{
+	struct lod_layout_component *comp_entries;
+	bool is_composite;
+	__u16 comp_cnt;
+	int size = 0;
+	int idx;
+
+	if (!is_dir) {
+		comp_cnt = lo->ldo_comp_cnt;
+		comp_entries = lo->ldo_comp_entries;
+		is_composite = lo->ldo_is_composite;
+	} else {
+		comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
+		comp_entries = lo->ldo_def_striping->lds_def_comp_entries;
+		is_composite =
+			lo->ldo_def_striping->lds_def_striping_is_composite;
+	}
+
+	LASSERT(comp_cnt != 0 && comp_entries != NULL);
+
+	/* composite header plus the per-component entry table */
+	if (is_composite) {
+		size = sizeof(struct lov_comp_md_v1) +
+		       sizeof(struct lov_comp_md_entry_v1) * comp_cnt;
+		LASSERT(size % sizeof(__u64) == 0);
+	}
+
+	/* one lov_user_md blob per component */
+	for (idx = 0; idx < comp_cnt; idx++) {
+		struct lod_layout_component *comp = &comp_entries[idx];
+		int comp_magic;
+		__u16 nr;
+
+		if (comp->llc_pool)
+			comp_magic = LOV_MAGIC_V3;
+		else
+			comp_magic = LOV_MAGIC_V1;
+
+		nr = lod_comp_entry_stripecnt(lo, comp, is_dir);
+		size += lov_user_md_size(nr, comp_magic);
+		LASSERT(size % sizeof(__u64) == 0);
+	}
+
+	return size;
+}
+
/**
* Declare component add. The xattr name is XATTR_LUSTRE_LOV.add, and
* the xattr value is binary lov_comp_md_v1 which contains component(s)
const struct lu_buf *buf,
struct thandle *th)
{
+ struct lod_thread_info *info = lod_env_info(env);
struct lod_layout_component *comp_array, *lod_comp;
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+ struct dt_object *next = dt_object_child(dt);
struct lov_desc *desc = &d->lod_desc;
struct lod_object *lo = lod_dt_obj(dt);
- struct lov_user_md_v1 *v1;
struct lov_user_md_v3 *v3;
struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
- struct lu_extent *ext;
__u32 magic;
__u64 prev_end;
int i, rc, array_cnt;
LASSERT(lo->ldo_is_composite);
- magic = comp_v1->lcm_magic;
- /* Replay request, see comment for LOV_MAGIC_DEF */
- if (unlikely(le32_to_cpu(magic) == LOV_MAGIC_COMP_V1_DEF)) {
- struct dt_object *next = dt_object_child(dt);
-
- lod_object_free_striping(env, lo);
- rc = lod_use_defined_striping(env, lo, buf);
- if (rc == 0) {
- lo->ldo_comp_cached = 1;
- rc = lod_sub_object_declare_xattr_set(env, next, buf,
- XATTR_NAME_LOV,
- 0, th);
- }
- RETURN(rc);
- }
-
prev_end = lo->ldo_comp_entries[lo->ldo_comp_cnt - 1].llc_extent.e_end;
rc = lod_verify_striping(d, buf, false, prev_end);
if (rc != 0)
RETURN(rc);
+ magic = comp_v1->lcm_magic;
if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) {
lustre_swab_lov_comp_md_v1(comp_v1);
magic = comp_v1->lcm_magic;
sizeof(*comp_array) * lo->ldo_comp_cnt);
for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+ struct lov_user_md_v1 *v1;
+ struct lu_extent *ext;
+
v1 = (struct lov_user_md *)((char *)comp_v1 +
comp_v1->lcm_entries[i].lcme_offset);
ext = &comp_v1->lcm_entries[i].lcme_extent;
lod_comp->llc_stripe_offset = v1->lmm_stripe_offset;
lod_comp->llc_stripenr = v1->lmm_stripe_count;
- if (lod_comp->llc_stripenr <= 0)
+ if (!lod_comp->llc_stripenr ||
+ lod_comp->llc_stripenr == (__u16)-1)
lod_comp->llc_stripenr = desc->ld_default_stripe_count;
lod_comp->llc_stripe_size = v1->lmm_stripe_size;
- if (lod_comp->llc_stripe_size <= 0)
+ if (!lod_comp->llc_stripe_size)
lod_comp->llc_stripe_size =
desc->ld_default_stripe_size;
if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
- int len;
v3 = (struct lov_user_md_v3 *) v1;
if (v3->lmm_pool_name[0] != '\0') {
- len = strlen(v3->lmm_pool_name);
- OBD_ALLOC(lod_comp->llc_pool, len + 1);
- if (lod_comp->llc_pool == NULL)
- GOTO(error, rc = -ENOMEM);
- strncpy(lod_comp->llc_pool, v3->lmm_pool_name,
- len + 1);
+ rc = lod_set_pool(&lod_comp->llc_pool,
+ v3->lmm_pool_name);
+ if (rc)
+ GOTO(error, rc);
}
}
}
/* No need to increase layout generation here, it will be increased
* later when generating component ID for the new components */
- rc = lod_declare_striped_object(env, dt, NULL, NULL, th);
- RETURN(rc);
+ info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+ rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
+ XATTR_NAME_LOV, 0, th);
+ if (rc)
+ GOTO(error, rc);
+
+ RETURN(0);
error:
for (i = lo->ldo_comp_cnt; i < array_cnt; i++) {
RETURN(rc);
}
-static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
-{
- int magic, size = 0, i;
- struct lod_layout_component *comp_entries;
- __u16 comp_cnt;
- bool is_composite;
-
- if (is_dir) {
- comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
- comp_entries = lo->ldo_def_striping->lds_def_comp_entries;
- is_composite =
- lo->ldo_def_striping->lds_def_striping_is_composite;
- } else {
- comp_cnt = lo->ldo_comp_cnt;
- comp_entries = lo->ldo_comp_entries;
- is_composite = lo->ldo_is_composite;
- }
-
-
- LASSERT(comp_cnt != 0 && comp_entries != NULL);
- if (is_composite) {
- size = sizeof(struct lov_comp_md_v1) +
- sizeof(struct lov_comp_md_entry_v1) * comp_cnt;
- LASSERT(size % sizeof(__u64) == 0);
- }
-
- for (i = 0; i < comp_cnt; i++) {
- magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1;
-
- size += lov_user_md_size(
- is_dir ? 0 : comp_entries[i].llc_stripenr,
- magic);
- LASSERT(size % sizeof(__u64) == 0);
- }
- return size;
-}
-
/**
* Declare component set. The xattr is name XATTR_LUSTRE_LOV.set.$field,
* the '$field' can only be 'flags' now. The xattr value is binary
lod_comp->llc_stripe = NULL;
lod_comp->llc_stripes_allocated = 0;
lod_obj_set_pool(lo, i, NULL);
+ if (lod_comp->llc_ostlist.op_array) {
+ OBD_FREE(lod_comp->llc_ostlist.op_array,
+ lod_comp->llc_ostlist.op_size);
+ lod_comp->llc_ostlist.op_array = NULL;
+ lod_comp->llc_ostlist.op_size = 0;
+ }
}
LASSERTF(left >= 0 && left < lo->ldo_comp_cnt, "left = %d\n", left);
for (i = 0; i < lo->ldo_comp_cnt; i++) {
lod_comp = &lo->ldo_comp_entries[i];
- if (lod_comp->llc_flags & LCME_FL_INIT)
+ if (lod_comp_inited(lod_comp))
continue;
- lod_comp->llc_flags |= LCME_FL_INIT;
+ if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
+ lod_comp_set_init(lod_comp);
if (lod_comp->llc_stripe == NULL)
continue;
- LASSERT(lod_comp->llc_stripenr > 0);
+ LASSERT(lod_comp->llc_stripenr);
for (j = 0; j < lod_comp->llc_stripenr; j++) {
struct dt_object *object = lod_comp->llc_stripe[j];
LASSERT(object != NULL);
if (rc)
break;
}
+ lod_comp_set_init(lod_comp);
}
if (rc == 0)
return dt_invalidate(env, dt_object_child(dt));
}
+/**
+ * Make sure the in-use OST index array can hold the requested number of
+ * entries, and reset its entry count.
+ *
+ * The array is only ever grown: if the current buffer is already large
+ * enough nothing is allocated. On growth the old buffer is released and
+ * its contents are not carried over.
+ *
+ * \param[in,out] inuse	structure containing the OST index array
+ * \param[in] cnt	number of __u32 entries wanted
+ * \param[in] max	upper bound, in bytes, for the new array when non-zero
+ *
+ * \retval 0		on success
+ * \retval -ENOMEM	reallocation failed (the old array is left in place)
+ */
+int lod_inuse_resize(struct ost_pool *inuse, __u16 cnt, __u16 max)
+{
+	__u32 bytes = cnt * sizeof(__u32);
+	__u32 *fresh;
+
+	inuse->op_count = 0;
+
+	/* current buffer is big enough, keep it */
+	if (inuse->op_size >= bytes)
+		return 0;
+
+	/*
+	 * NOTE(review): once capped, the array holds fewer than @cnt
+	 * entries; confirm that users bound their writes by op_size.
+	 */
+	if (max != 0)
+		bytes = min_t(__u32, bytes, max);
+
+	OBD_ALLOC(fresh, bytes);
+	if (fresh == NULL)
+		return -ENOMEM;
+
+	if (inuse->op_array != NULL)
+		OBD_FREE(inuse->op_array, inuse->op_size);
+
+	inuse->op_array = fresh;
+	inuse->op_size = bytes;
+
+	return 0;
+}
+
+/**
+ * Declare the instantiation of the layout components needed by a write.
+ *
+ * Only existing, local, regular files are accepted. The child object is
+ * write-locked for the whole call.
+ *
+ * When \a buf carries data the call is treated as the replay of a layout
+ * intent write: the buffer must hold a composite layout with the
+ * LOV_MAGIC_DEF flag, the in-memory striping is rebuilt from it, and the
+ * on-disk LOV EA is loaded so that components which were already
+ * instantiated on disk are skipped. Otherwise the striping is loaded from
+ * disk and the per-thread in-use OST array is prepared from the existing
+ * stripes.
+ *
+ * Every component that starts before layout->li_end and is not yet
+ * instantiated is then handed to lod_qos_prep_create(), the layout
+ * generation is bumped, and an XATTR_NAME_LOV update sized by
+ * lod_comp_md_size() is declared.
+ *
+ * \param[in] env	execution environment
+ * \param[in] dt	DT object
+ * \param[in] layout	layout intent describing the write range
+ * \param[in] buf	client's lovea (replay) or empty/NULL
+ * \param[in] th	transaction handle
+ *
+ * \retval 0		on success (also returned in replay when
+ *			lod_get_lov_ea() reports 0)
+ * \retval -EINVAL	not an existing local regular file, unexpected
+ *			replay magic, layout does not cover the write
+ *			range, or a released component would be extended
+ * \retval -EALREADY	no component needs to be instantiated
+ * \retval negative	other negated errno from the helpers called
+ */
+static int lod_declare_layout_change(const struct lu_env *env,
+				     struct dt_object *dt,
+				     struct layout_intent *layout,
+				     const struct lu_buf *buf,
+				     struct thandle *th)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lod_object *lo = lod_dt_obj(dt);
+	struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+	struct dt_object *next = dt_object_child(dt);
+	/* zero-filled so the stripe callback never sees stack garbage */
+	struct lod_obj_stripe_cb_data data = { { NULL } };
+	struct ost_pool *inuse = &info->lti_inuse_osts;
+	struct lod_layout_component *lod_comp;
+	struct lov_comp_md_v1 *comp_v1 = NULL;
+	bool replay = false;
+	bool need_create = false;
+	int i, rc;
+	__u32 stripe_cnt = 0;
+	ENTRY;
+
+	if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
+	    dt_object_remote(next))
+		RETURN(-EINVAL);
+
+	dt_write_lock(env, next, 0);
+	/*
+	 * In case the client is passing lovea, which only happens during
+	 * the replay of layout intent write RPC for now, we may need to
+	 * parse the lovea and apply new layout configuration.
+	 */
+	if (buf && buf->lb_len) {
+		struct lov_user_md_v1 *v1 = buf->lb_buf;
+
+		if (v1->lmm_magic != (LOV_MAGIC_DEF | LOV_MAGIC_COMP_V1) &&
+		    v1->lmm_magic !=
+					__swab32(LOV_MAGIC_DEF | LOV_MAGIC_COMP_V1)) {
+			CERROR("%s: the replay buffer of layout extend "
+			       "(magic %#x) does not contain expected "
+			       "composite layout.\n",
+			       lod2obd(d)->obd_name, v1->lmm_magic);
+			GOTO(out, rc = -EINVAL);
+		}
+
+		lod_object_free_striping(env, lo);
+		rc = lod_use_defined_striping(env, lo, buf);
+		if (rc)
+			GOTO(out, rc);
+
+		rc = lod_get_lov_ea(env, lo);
+		if (rc <= 0)
+			GOTO(out, rc);
+		/*
+		 * The old on-disk EA is stored in the buffer info->lti_buf
+		 * points at; use the buffer itself, not the address of the
+		 * lb_buf pointer field.
+		 */
+		comp_v1 = (struct lov_comp_md_v1 *)info->lti_buf.lb_buf;
+		replay = true;
+	} else {
+		/* non replay path */
+		rc = lod_load_striping_locked(env, lo);
+		if (rc)
+			GOTO(out, rc);
+
+		/* Prepare inuse array for composite file */
+		for (i = 0; i < lo->ldo_comp_cnt; i++)
+			stripe_cnt += lod_comp_entry_stripecnt(lo,
+						&lo->ldo_comp_entries[i],
+						false);
+		rc = lod_inuse_resize(inuse, stripe_cnt, d->lod_osd_max_easize);
+		if (rc)
+			GOTO(out, rc);
+
+		data.locd_inuse = inuse;
+		rc = lod_obj_for_each_stripe(env, lo, NULL,
+					     lod_obj_stripe_set_inuse_cb,
+					     &data);
+		if (rc)
+			GOTO(out, rc);
+	}
+
+	/* Make sure defined layout covers the requested write range. */
+	lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1];
+	if (lod_comp->llc_extent.e_end != OBD_OBJECT_EOF &&
+	    lod_comp->llc_extent.e_end < layout->li_end) {
+		CDEBUG(replay ? D_ERROR : D_LAYOUT,
+		       "%s: the defined layout [0, %#llx) does not covers "
+		       "the write range [%#llx, %#llx).\n",
+		       lod2obd(d)->obd_name, lod_comp->llc_extent.e_end,
+		       layout->li_start, layout->li_end);
+		GOTO(out, rc = -EINVAL);
+	}
+
+	/*
+	 * Iterate lo->ldo_comp_entries, find the components whose extent
+	 * falls under the write range and which are not instantiated.
+	 */
+	for (i = 0; i < lo->ldo_comp_cnt; i++) {
+		lod_comp = &lo->ldo_comp_entries[i];
+
+		if (lod_comp->llc_extent.e_start >= layout->li_end)
+			break;
+
+		if (!replay) {
+			if (lod_comp_inited(lod_comp))
+				continue;
+		} else {
+			/**
+			 * In replay path, lod_comp is the EA passed by
+			 * client replay buffer, comp_v1 is the pre-recovery
+			 * on-disk EA, we'd sift out those components which
+			 * were init-ed in the on-disk EA.
+			 */
+			if (le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags) &
+			    LCME_FL_INIT)
+				continue;
+		}
+		/*
+		 * this component hasn't been instantiated in the normal
+		 * path, or during replay its instantiation must be replayed.
+		 */
+
+		/* A released component is being extended */
+		if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
+			GOTO(out, rc = -EINVAL);
+
+		need_create = true;
+		/*
+		 * In replay, the component EA is passed by client,
+		 * Clear LCME_FL_INIT so that lod_striping_create() can create
+		 * the striping objects.
+		 */
+		if (replay)
+			lod_comp_unset_init(lod_comp);
+
+		rc = lod_qos_prep_create(env, lo, NULL, th, i, inuse);
+		if (rc)
+			break;
+	}
+
+	if (need_create)
+		lod_obj_inc_layout_gen(lo);
+	else
+		GOTO(unlock, rc = -EALREADY);
+
+	if (!rc) {
+		info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+		rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
+						      XATTR_NAME_LOV, 0, th);
+	}
+out:
+	/* on any failure drop the (possibly half-built) in-memory striping */
+	if (rc)
+		lod_object_free_striping(env, lo);
+
+unlock:
+	dt_write_unlock(env, next);
+
+	RETURN(rc);
+}
+
+/**
+ * Instantiate layout component objects which cover the intent write offset.
+ *
+ * All the work is delegated to lod_striping_create(), which is passed the
+ * per-thread lti_attr and no striping buffer. \a layout and \a buf are not
+ * used here.
+ *
+ * \param[in] env	execution environment
+ * \param[in] dt	DT object
+ * \param[in] layout	layout intent (unused)
+ * \param[in] buf	client's lovea or empty (unused)
+ * \param[in] th	transaction handle
+ *
+ * \retval		whatever lod_striping_create() returns
+ */
+static int lod_layout_change(const struct lu_env *env, struct dt_object *dt,
+			     struct layout_intent *layout,
+			     const struct lu_buf *buf, struct thandle *th)
+{
+	/* NOTE(review): lti_attr is not filled in here -- confirm it is valid */
+	struct lu_attr *attr = &lod_env_info(env)->lti_attr;
+	ENTRY;
+
+	RETURN(lod_striping_create(env, dt, attr, NULL, th));
+}
+
struct dt_object_operations lod_obj_ops = {
.do_read_lock = lod_object_read_lock,
.do_write_lock = lod_object_write_lock,
.do_object_lock = lod_object_lock,
.do_object_unlock = lod_object_unlock,
.do_invalidate = lod_invalidate,
+ .do_declare_layout_change = lod_declare_layout_change,
+ .do_layout_change = lod_layout_change,
};
/**
* \param[in] env execution environment for this thread
* \param[in] lo LOD object
* \param[out] stripe striping created
- * \param[in] lum stripe md to specify list of OSTs
* \param[in] th transaction handle
* \param[in] comp_idx index of ldo_comp_entries
* \param[in|out] inuse array of inuse ost index
* \retval -EINVAL requested OST index is invalid
* \retval negative negated errno on error
*/
-static int lod_alloc_ost_list(const struct lu_env *env,
- struct lod_object *lo, struct dt_object **stripe,
- struct lov_user_md *lum, struct thandle *th,
+static int lod_alloc_ost_list(const struct lu_env *env, struct lod_object *lo,
+ struct dt_object **stripe, struct thandle *th,
int comp_idx, struct ost_pool *inuse)
{
struct lod_layout_component *lod_comp;
struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs;
struct dt_object *o;
- struct lov_user_md_v3 *v3;
unsigned int array_idx = 0;
int stripe_count = 0;
int i;
ENTRY;
/* for specific OSTs layout */
- LASSERT(lum != NULL && lum->lmm_magic == LOV_USER_MAGIC_SPECIFIC);
- lustre_print_user_md(D_OTHER, lum, __func__);
-
LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
lod_comp = &lo->ldo_comp_entries[comp_idx];
+ LASSERT(lod_comp->llc_ostlist.op_array);
rc = lod_qos_ost_in_use_clear(env, lod_comp->llc_stripenr);
if (rc < 0)
RETURN(rc);
- v3 = (struct lov_user_md_v3 *)lum;
for (i = 0; i < lod_comp->llc_stripenr; i++) {
- if (v3->lmm_objects[i].l_ost_idx ==
- lod_comp->llc_stripe_offset) {
+ if (lod_comp->llc_ostlist.op_array[i] ==
+ lod_comp->llc_stripe_offset) {
array_idx = i;
break;
}
for (i = 0; i < lod_comp->llc_stripenr;
i++, array_idx = (array_idx + 1) % lod_comp->llc_stripenr) {
- __u32 ost_idx = v3->lmm_objects[array_idx].l_ost_idx;
+ __u32 ost_idx = lod_comp->llc_ostlist.op_array[array_idx];
if (!cfs_bitmap_check(m->lod_ost_bitmap, ost_idx)) {
rc = -ENODEV;
*
* \retval the maximum usable stripe count
*/
-static __u16 lod_get_stripecnt(struct lod_device *lod, struct lod_object *lo,
- __u16 stripe_count)
+__u16 lod_get_stripecnt(struct lod_device *lod, struct lod_object *lo,
+ __u16 stripe_count)
{
__u32 max_stripes = LOV_MAX_STRIPE_COUNT_OLD;
* \retval 0 on success
* \retval negative negated errno on error
*/
-static int lod_qos_parse_config(const struct lu_env *env,
- struct lod_object *lo,
- const struct lu_buf *buf)
+int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo,
+ const struct lu_buf *buf)
{
struct lod_layout_component *lod_comp;
struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
pool_name = NULL;
if (v1->lmm_magic == LOV_USER_MAGIC_V3 ||
v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
- v3 = (struct lov_user_md_v3 *)v1;
+ int j;
+ v3 = (struct lov_user_md_v3 *)v1;
if (v3->lmm_pool_name[0] != '\0')
pool_name = v3->lmm_pool_name;
- if (v3->lmm_magic == LOV_USER_MAGIC_SPECIFIC &&
- v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT)
- v3->lmm_stripe_offset =
- v3->lmm_objects[0].l_ost_idx;
+ if (v3->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
+ if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT)
+ v3->lmm_stripe_offset =
+ v3->lmm_objects[0].l_ost_idx;
+
+ /* copy ost list from lmm */
+ lod_comp->llc_ostlist.op_count =
+ v3->lmm_stripe_count;
+ lod_comp->llc_ostlist.op_size =
+ v3->lmm_stripe_count * sizeof(__u32);
+ OBD_ALLOC(lod_comp->llc_ostlist.op_array,
+ lod_comp->llc_ostlist.op_size);
+ if (!lod_comp->llc_ostlist.op_array)
+ GOTO(free_comp, rc = -ENOMEM);
+
+ for (j = 0; j < v3->lmm_stripe_count; j++)
+ lod_comp->llc_ostlist.op_array[j] =
+ v3->lmm_objects[j].l_ost_idx;
+ }
}
if (v1->lmm_pattern == 0)
if (lov_pattern(v1->lmm_pattern) != LOV_PATTERN_RAID0) {
CDEBUG(D_LAYOUT, "%s: invalid pattern: %x\n",
lod2obd(d)->obd_name, v1->lmm_pattern);
- lod_free_comp_entries(lo);
- RETURN(-EINVAL);
+ GOTO(free_comp, rc = -EINVAL);
}
lod_comp->llc_pattern = v1->lmm_pattern;
lod_comp->llc_stripe_size = desc->ld_default_stripe_size;
- if (v1->lmm_stripe_size > 0)
+ if (v1->lmm_stripe_size)
lod_comp->llc_stripe_size = v1->lmm_stripe_size;
lod_comp->llc_stripenr = desc->ld_default_stripe_count;
- if (v1->lmm_stripe_count > 0)
+ if (v1->lmm_stripe_count)
lod_comp->llc_stripenr = v1->lmm_stripe_count;
lod_comp->llc_stripe_offset = v1->lmm_stripe_offset;
CDEBUG(D_LAYOUT, "%s: invalid offset, %u\n",
lod2obd(d)->obd_name,
lod_comp->llc_stripe_offset);
- lod_free_comp_entries(lo);
- RETURN(-EINVAL);
+ GOTO(free_comp, rc = -EINVAL);
}
}
}
RETURN(0);
+
+free_comp:
+ lod_free_comp_entries(lo);
+ RETURN(rc);
}
/**
* Create a striping for an obejct.
*
- * The function creates a new striping for the object. A buffer containing
- * configuration hints can be provided optionally. The function tries QoS
+ * The function creates a new striping for the object. The function tries QoS
* algorithm first unless free space is distributed evenly among OSTs, but
* by default RR algorithm is preferred due to internal concurrency (QoS is
* serialized). The caller must ensure no concurrent calls to the function
* \param[in] env execution environment for this thread
* \param[in] lo LOD object
* \param[in] attr attributes OST objects will be declared with
- * \param[in] buf suggested striping configuration or NULL
* \param[in] th transaction handle
* \param[in] comp_idx index of ldo_comp_entries
- * \param[in|out]inuse array of inuse ost index
+ * \param[in|out] inuse array of inuse ost index
*
* \retval 0 on success
* \retval negative negated errno on error
*/
-static int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
- struct lu_attr *attr, const struct lu_buf *buf,
- struct thandle *th, int comp_idx,
- struct ost_pool *inuse)
+int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
+ struct lu_attr *attr, struct thandle *th,
+ int comp_idx, struct ost_pool *inuse)
{
struct lod_layout_component *lod_comp;
struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
RETURN(0);
if (likely(lod_comp->llc_stripe == NULL)) {
- struct lov_user_md *lum = NULL;
-
/*
* no striping has been created so far
*/
- LASSERT(lod_comp->llc_stripenr > 0);
+ LASSERT(lod_comp->llc_stripenr);
/*
* statfs and check OST targets now, since ld_active_tgt_count
* could be changed if some OSTs are [de]activated manually.
CDEBUG(D_OTHER, "tgt_count %d stripenr %d\n",
d->lod_desc.ld_tgt_count, stripe_len);
- if (buf != NULL && buf->lb_buf != NULL) {
- lum = buf->lb_buf;
- if (lum->lmm_magic == LOV_USER_MAGIC_COMP_V1) {
- struct lov_comp_md_v1 *comp_v1;
-
- comp_v1 = (struct lov_comp_md_v1 *)lum;
- lum = (struct lov_user_md *)((char *)comp_v1 +
- comp_v1->lcm_entries[comp_idx].lcme_offset);
- }
- }
-
- if (lum != NULL && lum->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
- rc = lod_alloc_ost_list(env, lo, stripe, lum, th,
- comp_idx, inuse);
+ if (lod_comp->llc_ostlist.op_array) {
+ rc = lod_alloc_ost_list(env, lo, stripe, th, comp_idx,
+ inuse);
} else if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT) {
rc = lod_alloc_qos(env, lo, stripe, flag, th,
comp_idx, inuse);
RETURN(rc);
}
-static int
-lod_obj_stripe_set_inuse_cb(const struct lu_env *env, struct lod_object *lo,
- struct dt_object *dt, struct thandle *th,
- int stripe_idx, struct lod_obj_stripe_cb_data *data)
+int lod_obj_stripe_set_inuse_cb(const struct lu_env *env,
+ struct lod_object *lo,
+ struct dt_object *dt, struct thandle *th,
+ int stripe_idx,
+ struct lod_obj_stripe_cb_data *data)
{
struct lod_thread_info *info = lod_env_info(env);
struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
struct thandle *th)
{
- struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
- struct ost_pool inuse;
- int i, rc, comp_cnt;
+ struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
+ struct ost_pool inuse = { 0 };
+ int rc;
ENTRY;
LASSERT(lo);
if (rc)
RETURN(rc);
- memset(&inuse, 0, sizeof(inuse));
- init_rwsem(&inuse.op_rw_sem);
- comp_cnt = lo->ldo_comp_cnt;
-
- /* Prepare inuse array for composite file */
- if (lo->ldo_is_composite) {
- struct lod_obj_stripe_cb_data data;
-
- inuse.op_size = comp_cnt * LOV_MAX_STRIPE_COUNT_OLD *
- sizeof(__u32);
- if (d->lod_osd_max_easize > 0 &&
- inuse.op_size > d->lod_osd_max_easize)
- inuse.op_size = d->lod_osd_max_easize;
- OBD_ALLOC(inuse.op_array, inuse.op_size);
- if (inuse.op_array == NULL)
- RETURN(-ENOMEM);
-
- data.locd_inuse = &inuse;
- rc = lod_obj_for_each_stripe(env, lo, NULL,
- lod_obj_stripe_set_inuse_cb, &data);
- if (rc) {
- OBD_FREE(inuse.op_array, inuse.op_size);
- RETURN(rc);
- }
- }
-
- /* prepare OST object creation */
- for (i = 0; i < comp_cnt; i++) {
- rc = lod_qos_prep_create(env, lo, attr, buf, th, i, &inuse);
- if (rc)
- break;
- }
+ /* prepare OST object creation for the 1st comp. */
+ rc = lod_qos_prep_create(env, lo, attr, th, 0, &inuse);
- if (inuse.op_size)
- OBD_FREE(inuse.op_array, inuse.op_size);
RETURN(rc);
}
if (opcode == MDS_REINT)
mdc_put_mod_rpc_slot(req, NULL);
- /* For XATTR_LUSTRE_LOV.add, we'd save the LOVEA for replay. */
- if (opcode == MDS_REINT && rc == 0) {
- struct mdt_body *body;
- struct req_capsule *pill = &req->rq_pill;
-
- body = req_capsule_server_get(pill, &RMF_MDT_BODY);
- if (body == NULL)
- GOTO(out, rc = -EPROTO);
-
- if (body->mbo_valid & OBD_MD_FLEASIZE) {
- void *eadata;
-
- eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
- body->mbo_eadatasize);
- if (eadata == NULL)
- GOTO(out, rc = -EPROTO);
-
- rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
- body->mbo_eadatasize);
- if (rc)
- GOTO(out, rc);
- }
- }
-out:
if (rc)
ptlrpc_req_finished(req);
else
return dt_invalidate(env, mdd_object_child(obj));
}
+/**
+ * Forward a layout-change declaration to the DT object beneath \a obj.
+ *
+ * \param[in] env	execution environment
+ * \param[in] obj	MDD object whose child DT object receives the call
+ * \param[in] layout	layout intent, passed through unchanged
+ * \param[in] buf	buffer, passed through unchanged
+ * \param[in] handle	transaction handle, passed through unchanged
+ *
+ * \retval		whatever dt_declare_layout_change() returns
+ */
+static inline int
+mdo_declare_layout_change(const struct lu_env *env, struct mdd_object *obj,
+			  struct layout_intent *layout,
+			  const struct lu_buf *buf, struct thandle *handle)
+{
+	struct dt_object *child = mdd_object_child(obj);
+
+	return dt_declare_layout_change(env, child, layout, buf, handle);
+}
+
+/**
+ * Forward a layout change to the DT object beneath \a obj.
+ *
+ * \param[in] env	execution environment
+ * \param[in] obj	MDD object whose child DT object receives the call
+ * \param[in] layout	layout intent, passed through unchanged
+ * \param[in] buf	buffer, passed through unchanged
+ * \param[in] handle	transaction handle, passed through unchanged
+ *
+ * \retval		whatever dt_layout_change() returns
+ */
+static inline int
+mdo_layout_change(const struct lu_env *env, struct mdd_object *obj,
+		  struct layout_intent *layout, const struct lu_buf *buf,
+		  struct thandle *handle)
+{
+	struct dt_object *child = mdd_object_child(obj);
+
+	return dt_layout_change(env, child, layout, buf, handle);
+}
+
static inline
int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj,
const struct lu_fid *fid, __u32 type,
return rc;
}
+/**
+ * Declare everything a layout change will touch in transaction \a handle:
+ * first the layout change on the child object, then a changelog record.
+ *
+ * \retval 0		both declarations succeeded
+ * \retval non-zero	value returned by the first declaration that failed
+ */
+static int mdd_declare_layout_change(const struct lu_env *env,
+				     struct mdd_device *mdd,
+				     struct mdd_object *obj,
+				     struct layout_intent *layout,
+				     const struct lu_buf *buf,
+				     struct thandle *handle)
+{
+	int rc = mdo_declare_layout_change(env, obj, layout, buf, handle);
+
+	/* the changelog is only declared once the layout part is accepted */
+	if (rc == 0)
+		rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
+
+	return rc;
+}
+
+/**
+ * Change the layout of \a obj within one MDD transaction.
+ *
+ * For PFL, this is used to instantiate necessary component objects.
+ * The sequence is: create the transaction, declare the change, start the
+ * transaction, apply the change under the object's write lock, and store
+ * a CL_LAYOUT changelog record.  The transaction is stopped on every path
+ * once it has been created.
+ *
+ * \param[in] env	execution environment
+ * \param[in] obj	MD object whose layout is changed
+ * \param[in] layout	layout intent, handed on to the lower layer
+ * \param[in] buf	buffer, handed on to the lower layer
+ *
+ * \retval		PTR_ERR() of the handle if the transaction cannot be
+ *			created, otherwise the value of mdd_trans_stop()
+ */
+int mdd_layout_change(const struct lu_env *env, struct md_object *obj,
+		      struct layout_intent *layout, const struct lu_buf *buf)
+{
+	struct mdd_device *mdd = mdo2mdd(obj);
+	struct mdd_object *mdd_obj = md2mdd_obj(obj);
+	struct thandle *handle;
+	int rc;
+	ENTRY;
+
+	handle = mdd_trans_create(env, mdd);
+	if (IS_ERR(handle))
+		RETURN(PTR_ERR(handle));
+
+	rc = mdd_declare_layout_change(env, mdd, mdd_obj, layout, buf, handle);
+	if (rc != 0) {
+		/*
+		 * -EALREADY means another layout write intent has already
+		 * instantiated our objects; nothing is left to do, so the
+		 * transaction is stopped with a success code.
+		 */
+		if (rc == -EALREADY)
+			rc = 0;
+		GOTO(stop, rc);
+	}
+
+	rc = mdd_trans_start(env, mdd, handle);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	/* the write lock is held only around the layout change itself */
+	mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+	rc = mdo_layout_change(env, mdd_obj, layout, buf, handle);
+	mdd_write_unlock(env, mdd_obj);
+	if (rc != 0)
+		GOTO(stop, rc);
+
+	rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, mdd_obj, handle);
+stop:
+	rc = mdd_trans_stop(env, mdd, rc, handle);
+	RETURN(rc);
+}
+
void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
struct mdd_object *child, const struct lu_attr *attr,
const struct md_op_spec *spec,
.moo_object_sync = mdd_object_sync,
.moo_object_lock = mdd_object_lock,
.moo_object_unlock = mdd_object_unlock,
+ .moo_layout_change = mdd_layout_change,
};
/**
* Handler of layout intent RPC requiring the layout modification
*
- * \param info [in] thread environment
- * \param obj [in] object
- * \param layout [in] layout intent
+ * \param[in] info thread environment
+ * \param[in] obj object
+ * \param[in] layout layout intent
+ * \param[in] buf buffer containing client's lovea, could be empty
*
* \retval 0 on success
* \retval < 0 error code
*/
static int mdt_layout_change(struct mdt_thread_info *info,
struct mdt_object *obj,
- struct layout_intent *layout)
+				struct layout_intent *layout,
+				const struct lu_buf *buf)
{
- /* XXX: to do */
- return 0;
+	struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+	int rc;
+	ENTRY;
+
+	/* the intent describes a half-open range, so it must be non-empty */
+	if (layout->li_start >= layout->li_end) {
+		CERROR("Received an invalid layout change range [%llu, %llu) "
+		       "for "DFID"\n", layout->li_start, layout->li_end,
+		       PFID(mdt_object_fid(obj)));
+		RETURN(-EINVAL);
+	}
+
+	/* only regular files are accepted here */
+	if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
+		GOTO(out, rc = -EINVAL);
+
+	/* the layout is only changed on behalf of a caller that may write */
+	rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
+			   MAY_WRITE);
+	if (rc)
+		GOTO(out, rc);
+
+	/* take layout lock to prepare layout change */
+	mdt_lock_reg_init(lh, LCK_EX);
+	rc = mdt_object_lock(info, obj, lh,
+			     MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+	if (rc)
+		GOTO(out, rc);
+
+	rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout,
+			      buf);
+
+	/* the lock is dropped whether or not the change succeeded */
+	mdt_object_unlock(info, obj, lh, 1);
+out:
+	RETURN(rc);
}
/**
info->mti_mdt->mdt_max_mdsize = layout_size;
}
+ /*
+ * set reply buffer size, so that ldlm_handle_enqueue0()->
+ * ldlm_lvbo_fill() will fill the reply buffer with lovea.
+ */
(*lockp)->l_lvb_type = LVB_T_LAYOUT;
req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
layout_size);
if (rc)
GOTO(out_obj, rc);
+
if (layout_change) {
- rc = mdt_layout_change(info, obj, layout);
+ struct lu_buf *buf = &info->mti_buf;
+
+ buf->lb_buf = NULL;
+ buf->lb_len = 0;
+ if (unlikely(req_is_replay(mdt_info_req(info)))) {
+ buf->lb_buf = req_capsule_client_get(info->mti_pill,
+ &RMF_EADATA);
+ buf->lb_len = req_capsule_get_size(info->mti_pill,
+ &RMF_EADATA, RCL_CLIENT);
+ /*
+ * If this is a replay of a layout write intent RPC,
+ * the client saved the extended lovea when it
+ * received the original reply.
+ */
+ if (buf->lb_len > 0)
+ mdt_fix_lov_magic(info, buf->lb_buf);
+ }
+
+ /*
+ * Instantiate some layout components, if @buf contains
+ * lovea, then it's a replay of the layout intent write
+ * RPC.
+ */
+ rc = mdt_layout_change(info, obj, layout, buf);
if (rc)
GOTO(out_obj, rc);
}
enum mdt_name_flags flags);
int mdt_close_unpack(struct mdt_thread_info *info);
int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op);
-void mdt_fix_lov_magic(struct mdt_thread_info *info);
+void mdt_fix_lov_magic(struct mdt_thread_info *info, void *eadata);
int mdt_reint_rec(struct mdt_thread_info *, struct mdt_lock_handle *);
#ifdef CONFIG_FS_POSIX_ACL
int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
/*
* please see comment above LOV_MAGIC_V1_DEF
*/
-void mdt_fix_lov_magic(struct mdt_thread_info *info)
+void mdt_fix_lov_magic(struct mdt_thread_info *info, void *eadata)
{
- struct mdt_reint_record *rr = &info->mti_rr;
- struct lov_user_md_v1 *v1;
+ struct lov_user_md_v1 *v1 = eadata;
- v1 = (void *)rr->rr_eadata;
LASSERT(v1);
if (unlikely(req_is_replay(mdt_info_req(info)))) {
sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
sp->u.sp_ea.eadata = rr->rr_eadata;
sp->no_create = !!req_is_replay(req);
- mdt_fix_lov_magic(info);
+ mdt_fix_lov_magic(info, rr->rr_eadata);
}
/*
int xattr_len = rr->rr_eadatalen;
__u64 lockpart = MDS_INODELOCK_UPDATE;
int rc;
- bool reply_ea = false;
ENTRY;
CDEBUG(D_INODE, "setxattr for "DFID": %s %s\n", PFID(rr->rr_fid1),
}
lockpart |= MDS_INODELOCK_LAYOUT;
-
- /*
- * For XATTR_LUSTRE_LOV.add, we'd reply LOVEA to client,
- * client will save it for replay.
- */
- if (strncmp(xattr_name, XATTR_LUSTRE_LOV".add",
- strlen(XATTR_LUSTRE_LOV".add")) == 0 &&
- req_capsule_has_field(&req->rq_pill, &RMF_MDT_MD,
- RCL_SERVER)) {
- /*
- * Don't need to reply LOVEA for replay request,
- * it's already stored in client request.
- */
- if (!req_is_replay(req))
- reply_ea = true;
- mdt_fix_lov_magic(info);
- }
}
/* Revoke all clients' lookup lock, since the access
rc = -EINVAL;
}
- if (reply_ea && rc == 0) {
- ma->ma_lmm = req_capsule_server_get(&req->rq_pill, &RMF_MDT_MD);
- ma->ma_lmm_size = req_capsule_get_size(&req->rq_pill,
- &RMF_MDT_MD, RCL_SERVER);
- ma->ma_need = MA_LOV;
- ma->ma_valid = 0;
- if (ma->ma_lmm_size > 0)
- rc = mdt_attr_get_complex(info, obj, ma);
-
- if (ma->ma_valid & MA_LOV) {
- struct mdt_body *repbody;
-
- repbody = req_capsule_server_get(&req->rq_pill,
- &RMF_MDT_BODY);
- LASSERT(ma->ma_lmm_size != 0);
- repbody->mbo_eadatasize = ma->ma_lmm_size;
- repbody->mbo_valid |= OBD_MD_FLEASIZE;
- }
- }
-
if (rc == 0)
mdt_counter_incr(req, LPROC_MDT_SETXATTR);
&RMF_DLM_REQ
};
-static const struct req_msg_field *mds_reint_setxattr_server[] = {
- &RMF_PTLRPC_BODY,
- &RMF_MDT_BODY,
- &RMF_MDT_MD
-};
-
static const struct req_msg_field *mdt_swap_layouts[] = {
&RMF_PTLRPC_BODY,
&RMF_MDT_BODY,
struct req_format RQF_MDS_REINT_SETXATTR =
DEFINE_REQ_FMT0("MDS_REINT_SETXATTR",
- mds_reint_setxattr_client, mds_reint_setxattr_server);
+ mds_reint_setxattr_client, mdt_body_only);
EXPORT_SYMBOL(RQF_MDS_REINT_SETXATTR);
struct req_format RQF_MDS_CONNECT =
rm -f $comp_file
- $LFS setstripe -E 1m -S 1m $comp_file ||
+ $LFS setstripe -E 1m -S 1m -E 2M -c 1 $comp_file ||
error "Create $comp_file failed"
local comp_cnt=$($LFS getstripe --component-count $comp_file)
- [ $comp_cnt -ne 1 ] && error "component count $comp_cnt != 1"
+ [ $comp_cnt -ne 2 ] && error "component count $comp_cnt != 2"
replay_barrier $SINGLEMDS
- $LFS setstripe --component-add -E 2M -c 1 $comp_file ||
- error "Add component to $comp_file failed"
+ # instantiate the 2nd component
+ dd if=/dev/zero of=$comp_file bs=1k count=1 seek=1k
local f1=$($LFS getstripe -I 2 $comp_file |
awk '/l_fid:/ {print $7}')
-
+ echo "before MDS recovery, the ost fid of 2nd component is $f1"
fail $SINGLEMDS
local f2=$($LFS getstripe -I 2 $comp_file |
awk '/l_fid:/ {print $7}')
+ echo "after MDS recovery, the ost fid of 2nd component is $f2"
[ $f1 == $f2 ] || error "$f1 != $f2"
}
-run_test 9 "Replay component add"
+run_test 9 "Replay layout extend object instantiation"
component_dump() {
echo $($LFS getstripe $1 |
}
run_test 10 "Inherit composite template from root"
+# Check that the components of a four-component file get their objects
+# step by step: only the 1st one right after creation, and each further
+# one after a truncate or write that reaches it.  Every error message
+# starts with the number of the stage that failed.
+test_11() {
+	local comp_file=$DIR/$tfile
+	rm -f $comp_file
+
+	# only 1st component instantiated
+	$LFS setstripe -E 1m -E 2m -E 3m -E -1 $comp_file ||
+		error "Create $comp_file failed"
+
+	local f1=$($LFS getstripe -I 1 $comp_file | grep "l_fid")
+	[[ -z $f1 ]] && error "1: 1st component uninstantiated"
+	local f2=$($LFS getstripe -I 2 $comp_file | grep "l_fid")
+	[[ -n $f2 ]] && error "1: 2nd component instantiated"
+	local f3=$($LFS getstripe -I 3 $comp_file | grep "l_fid")
+	[[ -n $f3 ]] && error "1: 3rd component instantiated"
+	local f4=$($LFS getstripe -I 4 $comp_file | grep "l_fid")
+	[[ -n $f4 ]] && error "1: 4th component instantiated"
+
+	# the first 2 components instantiated
+	$TRUNCATE $comp_file $((1024*1024*1+1))
+
+	f2=$($LFS getstripe -I 2 $comp_file | grep "l_fid")
+	[[ -z $f2 ]] && error "2: 2nd component uninstantiated"
+	f3=$($LFS getstripe -I 3 $comp_file | grep "l_fid")
+	[[ -n $f3 ]] && error "2: 3rd component instantiated"
+	f4=$($LFS getstripe -I 4 $comp_file | grep "l_fid")
+	[[ -n $f4 ]] && error "2: 4th component instantiated"
+
+	# the first 3 components instantiated
+	$TRUNCATE $comp_file $((1024*1024*3))
+	$TRUNCATE $comp_file $((1024*1024*1+1))
+
+	f2=$($LFS getstripe -I 2 $comp_file | grep "l_fid")
+	[[ -z $f2 ]] && error "3: 2nd component uninstantiated"
+	f3=$($LFS getstripe -I 3 $comp_file | grep "l_fid")
+	[[ -z $f3 ]] && error "3: 3rd component uninstantiated"
+	f4=$($LFS getstripe -I 4 $comp_file | grep "l_fid")
+	[[ -n $f4 ]] && error "3: 4th component instantiated"
+
+	# all 4 components instantiated
+	dd if=/dev/zero of=$comp_file bs=1k count=1 seek=3k
+
+	f4=$($LFS getstripe -I 4 $comp_file | grep "l_fid")
+	[[ -z $f4 ]] && error "4: 4th component uninstantiated"
+
+	return 0
+}
+run_test 11 "Verify component instantiation with write/truncate"
+
+# Create a composite file whose first four components each name the two
+# OSTs to use, instantiate them, and check that getstripe reports the
+# OST indices of every component in the order they were requested.
+test_12() {
+	[ $OSTCOUNT -lt 3 ] && skip "needs >= 3 OSTs" && return
+
+	local file=$DIR/$tfile
+	rm -f $file
+
+	# specify ost list for component
+	$LFS setstripe -E1m -c2 -o0,1 -E2m -c2 -o1,2 -E3m -c2 -o2,1 \
+		-E4m -c2 -o2,0 -E-1 $file ||
+		error "Create $file failed"
+	# instantiate all components
+	$TRUNCATE $file $((1024*1024*4+1))
+
+	# verify object alloc order, one component per iteration; $want is
+	# the two expected OST indices printed back to back
+	local comp=0
+	local want
+	local got
+	for want in 01 12 21 20; do
+		comp=$((comp + 1))
+		got=$($LFS getstripe -I$comp $file |
+			awk '/l_ost_idx:/ {printf("%d",$5)}')
+		[[ $got != "$want" ]] && error "$got is not $want"
+	done
+
+	return 0
+}
+run_test 12 "Verify ost list specification"
+
complete $SECONDS
check_and_cleanup_lustre
exit_status