*/
static int lod_insert(const struct lu_env *env, struct dt_object *dt,
const struct dt_rec *rec, const struct dt_key *key,
- struct thandle *th, int ign)
+ struct thandle *th)
{
- return lod_sub_insert(env, dt_object_child(dt), rec, key, th, ign);
+ return lod_sub_insert(env, dt_object_child(dt), rec, key, th);
}
/**
int rc;
ENTRY;
- /* If it is not a striped directory, then load nothing. */
if (magic != LMV_MAGIC_V1)
RETURN(0);
- /* If it is in migration (or failure), then load nothing. */
- if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
- RETURN(0);
-
stripes = le32_to_cpu(lmv1->lmv_stripe_count);
if (stripes < 1)
RETURN(0);
LASSERT(next->do_ops);
LASSERT(next->do_ops->do_index_try);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
data->locd_comp_skip_cb(env, lo, i, data))
continue;
+ if (data->locd_comp_cb) {
+ rc = data->locd_comp_cb(env, lo, i, data);
+ if (rc)
+ RETURN(rc);
+ }
+
+ /* could used just to do sth about component, not each
+ * stripes
+ */
+ if (!data->locd_stripe_cb)
+ continue;
+
LASSERT(lod_comp->llc_stripe_count > 0);
for (j = 0; j < lod_comp->llc_stripe_count; j++) {
struct dt_object *dt = lod_comp->llc_stripe[j];
* is being initialized as we don't need this information till
* few specific cases like destroy, chown
*/
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
* the in-memory striping information has been freed in lod_xattr_set()
* due to layout change. It has to load stripe here again. It only
* changes flags of layout so declare_attr_set() is still accurate */
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
}
lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
+ memset(lmm1, 0, sizeof(*lmm1));
lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
+ if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) {
+ lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash);
+ lmm1->lmv_migrate_offset =
+ cpu_to_le32(lo->ldo_dir_migrate_offset);
+ }
rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
&mdtidx, &type);
if (rc != 0)
int rc = 0;
ENTRY;
+ LASSERT(mutex_is_locked(&lo->ldo_layout_mutex));
+
if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
RETURN(0);
lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count);
lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
if (rc != 0)
- lod_object_free_striping(env, lo);
+ lod_striping_free_nolock(env, lo);
RETURN(rc);
}
struct dt_object_format *dof,
struct thandle *th)
{
+ struct lod_thread_info *info = lod_env_info(env);
struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
struct lod_object *lo = lod_dt_obj(dt);
continue;
tgt_dt = tgt->ltd_tgt;
- rc = dt_statfs(env, tgt_dt, NULL);
+ rc = dt_statfs(env, tgt_dt, &info->lti_osfs);
if (rc) {
/* this OSP doesn't feel well */
rc = 0;
stripe[i] = dto;
}
- lo->ldo_dir_stripe_loaded = 1;
lo->ldo_dir_striped = 1;
lo->ldo_stripe = stripe;
lo->ldo_dir_stripe_count = i;
lo->ldo_dir_stripes_allocated = stripe_count;
+ smp_mb();
+ lo->ldo_dir_stripe_loaded = 1;
if (lo->ldo_dir_stripe_count == 0)
GOTO(out_put, rc = -ENOSPC);
if (rc != 0) {
/* failed to create striping, let's reset
* config so that others don't get confused */
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
GOTO(out, rc);
}
out:
}
/**
+ * Append source stripes after target stripes for migrating directory. NB, we
+ * only need to declare this, the append is done inside lod_xattr_set_lmv().
+ *
+ * \param[in] env execution environment
+ * \param[in] dt target object
+ * \param[in] buf LMV buf which contains source stripe fids
+ * \param[in] th transaction handle
+ *
+ * \retval 0 on success
+ * \retval negative if failed
+ */
+static int lod_dir_declare_layout_add(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_buf *buf,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+ struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ struct dt_object_format *dof = &info->lti_format;
+ struct lmv_mds_md_v1 *lmv = buf->lb_buf;
+ struct dt_object **stripe;
+ __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+ struct lu_fid *fid = &info->lti_fid;
+ struct lod_tgt_desc *tgt;
+ struct dt_object *dto;
+ struct dt_device *tgt_dt;
+ int type = LU_SEQ_RANGE_ANY;
+ struct dt_insert_rec *rec = &info->lti_dt_rec;
+ char *stripe_name = info->lti_key;
+ struct lu_name *sname;
+ struct linkea_data ldata = { NULL };
+ struct lu_buf linkea_buf;
+ __u32 idx;
+ int i;
+ int rc;
+
+ ENTRY;
+
+ if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
+ RETURN(-EINVAL);
+
+ if (stripe_count == 0)
+ RETURN(-EINVAL);
+
+ dof->dof_type = DFT_DIR;
+
+ OBD_ALLOC(stripe,
+ sizeof(*stripe) * (lo->ldo_dir_stripe_count + stripe_count));
+ if (stripe == NULL)
+ RETURN(-ENOMEM);
+
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++)
+ stripe[i] = lo->ldo_stripe[i];
+
+ for (i = 0; i < stripe_count; i++) {
+ fid_le_to_cpu(fid,
+ &lmv->lmv_stripe_fids[i]);
+ if (!fid_is_sane(fid))
+ GOTO(out, rc = -ESTALE);
+
+ rc = lod_fld_lookup(env, lod, fid, &idx, &type);
+ if (rc)
+ GOTO(out, rc);
+
+ if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
+ tgt_dt = lod->lod_child;
+ } else {
+ tgt = LTD_TGT(ltd, idx);
+ if (tgt == NULL)
+ GOTO(out, rc = -ESTALE);
+ tgt_dt = tgt->ltd_tgt;
+ }
+
+ dto = dt_locate_at(env, tgt_dt, fid,
+ lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
+ NULL);
+ if (IS_ERR(dto))
+ GOTO(out, rc = PTR_ERR(dto));
+
+ stripe[i + lo->ldo_dir_stripe_count] = dto;
+
+ if (!dt_try_as_dir(env, dto))
+ GOTO(out, rc = -ENOTDIR);
+
+ rc = lod_sub_declare_ref_add(env, dto, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_insert(env, dto,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)dot, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_insert(env, dto,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)dotdot, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_xattr_set(env, dto, buf,
+ XATTR_NAME_LMV, 0, th);
+ if (rc)
+ GOTO(out, rc);
+
+ snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
+ PFID(lu_object_fid(&dto->do_lu)),
+ i + lo->ldo_dir_stripe_count);
+
+ sname = lod_name_get(env, stripe_name, strlen(stripe_name));
+ rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+ sname, lu_object_fid(&dt->do_lu));
+ if (rc)
+ GOTO(out, rc);
+
+ linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+ linkea_buf.lb_len = ldata.ld_leh->leh_len;
+ rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
+ XATTR_NAME_LINK, 0, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_insert(env, next,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)stripe_name,
+ th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_ref_add(env, next, th);
+ if (rc)
+ GOTO(out, rc);
+ }
+
+ if (lo->ldo_stripe)
+ OBD_FREE(lo->ldo_stripe,
+ sizeof(*stripe) * lo->ldo_dir_stripes_allocated);
+ lo->ldo_stripe = stripe;
+ lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count;
+ lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type);
+ lo->ldo_dir_stripe_count += stripe_count;
+ lo->ldo_dir_stripes_allocated += stripe_count;
+ lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION;
+
+ RETURN(0);
+out:
+ i = lo->ldo_dir_stripe_count;
+ while (i < lo->ldo_dir_stripe_count + stripe_count && stripe[i])
+ dt_object_put(env, stripe[i++]);
+
+ OBD_FREE(stripe,
+ sizeof(*stripe) * (stripe_count + lo->ldo_dir_stripe_count));
+ RETURN(rc);
+}
+
+static int lod_dir_declare_layout_delete(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_buf *buf,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ struct lmv_user_md *lmu = buf->lb_buf;
+ __u32 final_stripe_count;
+ char *stripe_name = info->lti_key;
+ struct dt_object *dto;
+ int i;
+ int rc = 0;
+
+ if (!lmu)
+ return -EINVAL;
+
+ final_stripe_count = le32_to_cpu(lmu->lum_stripe_count);
+ if (final_stripe_count >= lo->ldo_dir_stripe_count)
+ return -EINVAL;
+
+ for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
+ dto = lo->ldo_stripe[i];
+ LASSERT(dto);
+
+ if (!dt_try_as_dir(env, dto))
+ return -ENOTDIR;
+
+ rc = lod_sub_declare_delete(env, dto,
+ (const struct dt_key *)dot, th);
+ if (rc)
+ return rc;
+
+ rc = lod_sub_declare_ref_del(env, dto, th);
+ if (rc)
+ return rc;
+
+ rc = lod_sub_declare_delete(env, dto,
+ (const struct dt_key *)dotdot, th);
+ if (rc)
+ return rc;
+
+ snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
+ PFID(lu_object_fid(&dto->do_lu)), i);
+
+ rc = lod_sub_declare_delete(env, next,
+ (const struct dt_key *)stripe_name, th);
+ if (rc)
+ return rc;
+
+ rc = lod_sub_declare_ref_del(env, next, th);
+ if (rc)
+ return rc;
+ }
+
+ return 0;
+}
+
+/*
+ * delete stripes from dir master object, the lum_stripe_count in argument is
+ * the final stripe count, the stripes after that will be deleted, NB, they
+ * are not destroyed, but deleted from it's parent namespace, this function
+ * will be called in two places:
+ * 1. mdd_migrate_create() delete stripes from source, and append them to
+ * target.
+ * 2. mdd_dir_layout_shrink() delete stripes from source, and destroy them.
+ */
+static int lod_dir_layout_delete(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_buf *buf,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ struct lmv_user_md *lmu = buf->lb_buf;
+ __u32 final_stripe_count;
+ char *stripe_name = info->lti_key;
+ struct dt_object *dto;
+ int i;
+ int rc = 0;
+
+ ENTRY;
+
+ if (!lmu)
+ RETURN(-EINVAL);
+
+ final_stripe_count = le32_to_cpu(lmu->lum_stripe_count);
+ if (final_stripe_count >= lo->ldo_dir_stripe_count)
+ RETURN(-EINVAL);
+
+ for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
+ dto = lo->ldo_stripe[i];
+ LASSERT(dto);
+
+ rc = lod_sub_delete(env, dto,
+ (const struct dt_key *)dotdot, th);
+ if (rc)
+ break;
+
+ snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
+ PFID(lu_object_fid(&dto->do_lu)), i);
+
+ rc = lod_sub_delete(env, next,
+ (const struct dt_key *)stripe_name, th);
+ if (rc)
+ break;
+
+ rc = lod_sub_ref_del(env, next, th);
+ if (rc)
+ break;
+ }
+
+ lod_striping_free(env, lod_dt_obj(dt));
+
+ RETURN(rc);
+}
+
+/**
* Implementation of dt_object_operations::do_declare_xattr_set.
*
* Used with regular (non-striped) objects. Basically it
RETURN(0);
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
return rc;
}
- filter_fid_le_to_cpu(ff, ff, sizeof(*ff));
- if (lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) &&
- ff->ff_layout.ol_comp_id == comp->llc_id)
- return 0;
+ /*
+ * locd_buf is set if it's called by dir migration, which doesn't check
+ * pfid and comp id.
+ */
+ if (data->locd_buf) {
+ memset(ff, 0, sizeof(*ff));
+ ff->ff_parent = *(struct lu_fid *)data->locd_buf->lb_buf;
+ } else {
+ filter_fid_le_to_cpu(ff, ff, sizeof(*ff));
+
+ if (lu_fid_eq(lod_object_fid(lo), &ff->ff_parent) &&
+ ff->ff_layout.ol_comp_id == comp->llc_id)
+ return 0;
+
+ memset(ff, 0, sizeof(*ff));
+ ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu);
+ }
/* rewrite filter_fid */
- memset(ff, 0, sizeof(*ff));
- ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu);
ff->ff_parent.f_ver = stripe_idx;
ff->ff_layout.ol_stripe_size = comp->llc_stripe_size;
ff->ff_layout.ol_stripe_count = comp->llc_stripe_count;
*/
static int lod_replace_parent_fid(const struct lu_env *env,
struct dt_object *dt,
+ const struct lu_buf *buf,
struct thandle *th, bool declare)
{
struct lod_object *lo = lod_dt_obj(dt);
struct lod_thread_info *info = lod_env_info(env);
- struct lu_buf *buf = &info->lti_buf;
struct filter_fid *ff;
struct lod_obj_stripe_cb_data data = { { 0 } };
int rc;
LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr));
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
RETURN(rc);
}
- buf->lb_buf = info->lti_ea_store;
- buf->lb_len = info->lti_ea_store_size;
-
data.locd_declare = declare;
data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb;
+ data.locd_buf = buf;
rc = lod_obj_for_each_stripe(env, lo, th, &data);
RETURN(rc);
for (i = 0; i < comp_v1->lcm_entry_count; i++) {
__u32 id = comp_v1->lcm_entries[i].lcme_id;
__u32 flags = comp_v1->lcm_entries[i].lcme_flags;
+ __u32 mirror_flag = flags & LCME_MIRROR_FLAGS;
+ bool neg = flags & LCME_FL_NEG;
if (flags & LCME_FL_INIT) {
if (changed)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(-EINVAL);
}
+ flags &= ~(LCME_MIRROR_FLAGS | LCME_FL_NEG);
for (j = 0; j < lo->ldo_comp_cnt; j++) {
lod_comp = &lo->ldo_comp_entries[j];
- if (id != lod_comp->llc_id)
+
+ /* lfs only put one flag in each entry */
+ if ((flags && id != lod_comp->llc_id) ||
+ (mirror_flag && mirror_id_of(id) !=
+ mirror_id_of(lod_comp->llc_id)))
continue;
- if (flags & LCME_FL_NEG) {
- flags &= ~LCME_FL_NEG;
- lod_comp->llc_flags &= ~flags;
+ if (neg) {
+ if (flags)
+ lod_comp->llc_flags &= ~flags;
+ if (mirror_flag)
+ lod_comp->llc_flags &= ~mirror_flag;
} else {
- lod_comp->llc_flags |= flags;
+ if (flags)
+ lod_comp->llc_flags |= flags;
+ if (mirror_flag) {
+ lod_comp->llc_flags |= mirror_flag;
+ if (mirror_flag & LCME_FL_NOSYNC)
+ lod_comp->llc_timestamp =
+ ktime_get_real_seconds();
+ }
}
changed = true;
}
{
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
struct lod_object *lo = lod_dt_obj(dt);
- struct dt_object *next = dt_object_child(&lo->ldo_obj);
char *op;
int rc, len = strlen(XATTR_LUSTRE_LOV);
ENTRY;
}
len++;
- dt_write_lock(env, next, 0);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(unlock, rc);
}
unlock:
if (rc)
- lod_object_free_striping(env, lo);
- dt_write_unlock(env, next);
+ lod_striping_free(env, lo);
RETURN(rc);
}
offset += le32_to_cpu(lcme->lcme_size);
- if (mirror_count == 1) {
- /* new mirrored file, create new mirror ID */
+ if (mirror_count == 1 &&
+ mirror_id_of(le32_to_cpu(lcme->lcme_id)) == 0) {
+ /* Add mirror from a non-flr file, create new mirror ID.
+ * Otherwise, keep existing mirror's component ID, used
+ * for mirror extension.
+ */
id = pflr_id(1, i + 1);
lcme->lcme_id = cpu_to_le32(id);
}
if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NONE)
lcm->lcm_flags = cpu_to_le32(LCM_FL_RDONLY);
- LASSERT(dt_write_locked(env, dt_object_child(dt)));
- lod_object_free_striping(env, lo);
- rc = lod_parse_striping(env, lo, buf);
+ rc = lod_striping_reload(env, lo, buf);
if (rc)
GOTO(out, rc);
lod_obj_inc_layout_gen(lo);
lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen);
- lod_object_free_striping(env, lo);
- rc = lod_parse_striping(env, lo, mbuf);
+ rc = lod_striping_reload(env, lo, mbuf);
if (rc)
RETURN(rc);
RETURN(-ENOENT);
rc = lod_declare_modify_layout(env, dt, name, buf, th);
+ } else if (strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 &&
+ strlen(name) > strlen(XATTR_NAME_LMV) + 1) {
+ const char *op = name + strlen(XATTR_NAME_LMV) + 1;
+
+ rc = -ENOTSUPP;
+ if (strcmp(op, "add") == 0)
+ rc = lod_dir_declare_layout_add(env, dt, buf, th);
+ else if (strcmp(op, "del") == 0)
+ rc = lod_dir_declare_layout_delete(env, dt, buf, th);
+ else if (strcmp(op, "set") == 0)
+ rc = lod_sub_declare_xattr_set(env, next, buf,
+ XATTR_NAME_LMV, fl, th);
+
+ RETURN(rc);
} else if (S_ISDIR(mode)) {
rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
} else if (strcmp(name, XATTR_NAME_FID) == 0) {
- rc = lod_replace_parent_fid(env, dt, th, true);
+ rc = lod_replace_parent_fid(env, dt, buf, th, true);
} else {
rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th);
}
lum = buf->lb_buf;
switch (lum->lmm_magic) {
+ case LOV_USER_MAGIC_SPECIFIC:
case LOV_USER_MAGIC_V3:
v3 = buf->lb_buf;
if (v3->lmm_pool_name[0] != '\0')
rec->rec_type = S_IFDIR;
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- struct dt_object *dto;
- char *stripe_name = info->lti_key;
- struct lu_name *sname;
- struct linkea_data ldata = { NULL };
- struct lu_buf linkea_buf;
-
- dto = lo->ldo_stripe[i];
+ struct dt_object *dto = lo->ldo_stripe[i];
+ char *stripe_name = info->lti_key;
+ struct lu_name *sname;
+ struct linkea_data ldata = { NULL };
+ struct lu_buf linkea_buf;
+
+ /* if it's source stripe of migrating directory, don't create */
+ if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) &&
+ i >= lo->ldo_dir_migrate_offset)) {
+ dt_write_lock(env, dto, MOR_TGT_CHILD);
+ rc = lod_sub_create(env, dto, attr, NULL, dof, th);
+ if (rc != 0) {
+ dt_write_unlock(env, dto);
+ GOTO(out, rc);
+ }
- dt_write_lock(env, dto, MOR_TGT_CHILD);
- rc = lod_sub_create(env, dto, attr, NULL, dof, th);
- if (rc != 0) {
+ rc = lod_sub_ref_add(env, dto, th);
dt_write_unlock(env, dto);
- GOTO(out, rc);
- }
-
- rc = lod_sub_ref_add(env, dto, th);
- dt_write_unlock(env, dto);
- if (rc != 0)
- GOTO(out, rc);
+ if (rc != 0)
+ GOTO(out, rc);
- rec->rec_fid = lu_object_fid(&dto->do_lu);
- rc = lod_sub_insert(env, dto, (const struct dt_rec *)rec,
- (const struct dt_key *)dot, th, 0);
- if (rc != 0)
- GOTO(out, rc);
+ rec->rec_fid = lu_object_fid(&dto->do_lu);
+ rc = lod_sub_insert(env, dto,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)dot, th);
+ if (rc != 0)
+ GOTO(out, rc);
+ }
rec->rec_fid = lu_object_fid(&dt->do_lu);
rc = lod_sub_insert(env, dto, (struct dt_rec *)rec,
- (const struct dt_key *)dotdot, th, 0);
+ (const struct dt_key *)dotdot, th);
if (rc != 0)
GOTO(out, rc);
rec->rec_fid = lu_object_fid(&dto->do_lu);
rc = lod_sub_insert(env, dt_object_child(dt),
(const struct dt_rec *)rec,
- (const struct dt_key *)stripe_name, th, 0);
+ (const struct dt_key *)stripe_name, th);
if (rc != 0)
GOTO(out, rc);
LASSERT(lo);
if (lo->ldo_comp_cnt == 0) {
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th);
RETURN(rc);
}
EXIT;
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
return rc;
}
if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
strcmp(name, XATTR_NAME_LMV) == 0) {
- struct lmv_mds_md_v1 *lmm = buf->lb_buf;
+ rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
+ RETURN(rc);
+ } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+ strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 &&
+ strlen(name) > strlen(XATTR_NAME_LMV) + 1) {
+ const char *op = name + strlen(XATTR_NAME_LMV) + 1;
- if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
- LMV_HASH_FLAG_MIGRATION)
- rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
- else
- rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
+ rc = -ENOTSUPP;
+ /*
+ * XATTR_NAME_LMV".add" is never called, but only declared,
+ * because lod_xattr_set_lmv() will do the addition.
+ */
+ if (strcmp(op, "del") == 0)
+ rc = lod_dir_layout_delete(env, dt, buf, th);
+ else if (strcmp(op, "set") == 0)
+ rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV,
+ fl, th);
RETURN(rc);
- }
-
- if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+ } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
strcmp(name, XATTR_NAME_LOV) == 0) {
struct lod_thread_info *info = lod_env_info(env);
struct lod_default_striping *lds = &info->lti_def_striping;
* defines striping, then create() does the work */
if (fl & LU_XATTR_REPLACE) {
/* free stripes, then update disk */
- lod_object_free_striping(env, lod_dt_obj(dt));
+ lod_striping_free(env, lod_dt_obj(dt));
rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
} else if (dt_object_remote(dt)) {
}
RETURN(rc);
} else if (strcmp(name, XATTR_NAME_FID) == 0) {
- rc = lod_replace_parent_fid(env, dt, th, false);
+ rc = lod_replace_parent_fid(env, dt, buf, th, false);
RETURN(rc);
}
struct dt_object *dt, const char *name,
struct thandle *th)
{
- struct lod_object *lo = lod_dt_obj(dt);
- int rc;
- int i;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ int i;
+ int rc;
ENTRY;
- rc = lod_sub_declare_xattr_del(env, dt_object_child(dt), name, th);
+ rc = lod_sub_declare_xattr_del(env, next, name, th);
if (rc != 0)
RETURN(rc);
RETURN(0);
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
RETURN(0);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- LASSERT(lo->ldo_stripe[i]);
- rc = lod_sub_declare_xattr_del(env, lo->ldo_stripe[i],
- name, th);
+ struct dt_object *dto = lo->ldo_stripe[i];
+
+ LASSERT(dto);
+ rc = lod_sub_declare_xattr_del(env, dto, name, th);
if (rc != 0)
break;
}
int i;
ENTRY;
- if (!strcmp(name, XATTR_NAME_LOV))
- lod_object_free_striping(env, lod_dt_obj(dt));
+ if (!strcmp(name, XATTR_NAME_LOV) || !strcmp(name, XATTR_NAME_LMV))
+ lod_striping_free(env, lod_dt_obj(dt));
rc = lod_sub_xattr_del(env, next, name, th);
if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
RETURN(0);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- LASSERT(lo->ldo_stripe[i]);
+ struct dt_object *dto = lo->ldo_stripe[i];
- rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th);
+ LASSERT(dto);
+
+ rc = lod_sub_xattr_del(env, dto, name, th);
if (rc != 0)
break;
}
return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
}
+/**
+ * Copy OST list from layout provided by user.
+ *
+ * \param[in] lod_comp layout_component to be filled
+ * \param[in] v3 LOV EA V3 user data
+ *
+ * \retval 0 on success
+ * \retval negative if failed
+ */
+int lod_comp_copy_ost_lists(struct lod_layout_component *lod_comp,
+ struct lov_user_md_v3 *v3)
+{
+ int j;
+
+ ENTRY;
+
+ if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT)
+ v3->lmm_stripe_offset = v3->lmm_objects[0].l_ost_idx;
+
+ if (lod_comp->llc_ostlist.op_array) {
+ if (lod_comp->llc_ostlist.op_size >=
+ v3->lmm_stripe_count * sizeof(__u32)) {
+ lod_comp->llc_ostlist.op_count =
+ v3->lmm_stripe_count;
+ goto skip;
+ }
+ OBD_FREE(lod_comp->llc_ostlist.op_array,
+ lod_comp->llc_ostlist.op_size);
+ }
+
+ /* copy ost list from lmm */
+ lod_comp->llc_ostlist.op_count = v3->lmm_stripe_count;
+ lod_comp->llc_ostlist.op_size = v3->lmm_stripe_count * sizeof(__u32);
+ OBD_ALLOC(lod_comp->llc_ostlist.op_array,
+ lod_comp->llc_ostlist.op_size);
+ if (!lod_comp->llc_ostlist.op_array)
+ RETURN(-ENOMEM);
+skip:
+ for (j = 0; j < v3->lmm_stripe_count; j++) {
+ lod_comp->llc_ostlist.op_array[j] =
+ v3->lmm_objects[j].l_ost_idx;
+ }
+
+ RETURN(0);
+}
+
/**
* Get default striping.
__u16 comp_cnt;
__u16 mirror_cnt;
bool composite;
- int rc, i;
+ int rc, i, j;
ENTRY;
lds->lds_def_striping_set = 0;
} else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
v3 = (struct lov_user_md_v3 *)v1;
lustre_swab_lov_user_md_v3(v3);
+ } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) {
+ v3 = (struct lov_user_md_v3 *)v1;
+ lustre_swab_lov_user_md_v3(v3);
+ lustre_swab_lov_user_md_objects(v3->lmm_objects,
+ v3->lmm_stripe_count);
} else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1)) {
comp_v1 = (struct lov_comp_md_v1 *)v1;
lustre_swab_lov_comp_md_v1(comp_v1);
}
if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 &&
- v1->lmm_magic != LOV_MAGIC_COMP_V1)
+ v1->lmm_magic != LOV_MAGIC_COMP_V1 &&
+ v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC)
RETURN(-ENOTSUPP);
if (v1->lmm_magic == LOV_MAGIC_COMP_V1) {
for (i = 0; i < comp_cnt; i++) {
struct lod_layout_component *lod_comp;
- struct lu_extent *ext;
char *pool;
lod_comp = &lds->lds_def_comp_entries[i];
if (composite) {
v1 = (struct lov_user_md *)((char *)comp_v1 +
comp_v1->lcm_entries[i].lcme_offset);
- ext = &comp_v1->lcm_entries[i].lcme_extent;
- lod_comp->llc_extent = *ext;
+ lod_comp->llc_extent =
+ comp_v1->lcm_entries[i].lcme_extent;
+ /* We only inherit certain flags from the layout */
+ lod_comp->llc_flags =
+ comp_v1->lcm_entries[i].lcme_flags &
+ LCME_TEMPLATE_FLAGS;
}
if (v1->lmm_pattern != LOV_PATTERN_RAID0 &&
pool = v3->lmm_pool_name;
}
lod_set_def_pool(lds, i, pool);
+ if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
+ v3 = (struct lov_user_md_v3 *)v1;
+ rc = lod_comp_copy_ost_lists(lod_comp, v3);
+ if (rc)
+ RETURN(rc);
+ } else if (lod_comp->llc_ostlist.op_array &&
+ lod_comp->llc_ostlist.op_count) {
+ for (j = 0; j < lod_comp->llc_ostlist.op_count; j++)
+ lod_comp->llc_ostlist.op_array[j] = -1;
+ lod_comp->llc_ostlist.op_count = 0;
+ }
}
lds->lds_def_striping_set = 1;
struct lod_layout_component *def_comp =
&lds->lds_def_comp_entries[i];
- CDEBUG(D_LAYOUT, "Inherite from default: size:%hu "
- "nr:%u offset:%u pattern %#x %s\n",
+ CDEBUG(D_LAYOUT, "Inherit from default: flags=%#x "
+ "size=%hu nr=%u offset=%u pattern=%#x pool=%s\n",
+ def_comp->llc_flags,
def_comp->llc_stripe_size,
def_comp->llc_stripe_count,
def_comp->llc_stripe_offset,
lod_obj_set_pool(lo, i, def_comp->llc_pool);
}
+ /* copy ost list */
+ if (def_comp->llc_ostlist.op_array &&
+ def_comp->llc_ostlist.op_count) {
+ OBD_ALLOC(obj_comp->llc_ostlist.op_array,
+ obj_comp->llc_ostlist.op_size);
+ if (!obj_comp->llc_ostlist.op_array)
+ return;
+ memcpy(obj_comp->llc_ostlist.op_array,
+ def_comp->llc_ostlist.op_array,
+ obj_comp->llc_ostlist.op_size);
+ } else if (def_comp->llc_ostlist.op_array) {
+ obj_comp->llc_ostlist.op_array = NULL;
+ }
+
/*
* Don't initialize these fields for plain layout
* (v1/v3) here, they are inherited in the order of
} else {
/* transfer defaults LMV to new directory */
lod_striping_from_default(lc, lds, child_mode);
+
+ /* set count 0 to create normal directory */
+ if (lc->ldo_dir_stripe_count == 1)
+ lc->ldo_dir_stripe_count = 0;
}
/* shrink the stripe_count to the avaible MDT count */
if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 &&
- !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
+ !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) {
lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1;
-
- /* Directory will be striped only if stripe_count > 1, if
- * stripe_count == 1, let's reset stripe_count = 0 to avoid
- * create single master stripe and also help to unify the
- * stripe handling of directories and files */
- if (lc->ldo_dir_stripe_count == 1)
- lc->ldo_dir_stripe_count = 0;
+ if (lc->ldo_dir_stripe_count == 1)
+ lc->ldo_dir_stripe_count = 0;
+ }
CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n",
lc->ldo_dir_stripe_count,
/* failed to create striping or to set initial size, let's reset
* config so that others don't get confused */
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
/* failed to create striping or to set initial size, let's reset
* config so that others don't get confused */
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL);
- mirror_id = lo->ldo_mirror_count > 1 ? 1 : 0;
+ mirror_id = 0; /* non-flr file's mirror_id is 0 */
+ if (lo->ldo_mirror_count > 1) {
+ for (i = 0; i < lo->ldo_comp_cnt; i++) {
+ lod_comp = &lo->ldo_comp_entries[i];
+ if (lod_comp->llc_id != LCME_ID_INVAL &&
+ mirror_id_of(lod_comp->llc_id) > mirror_id)
+ mirror_id = mirror_id_of(lod_comp->llc_id);
+ }
+ }
/* create all underlying objects */
for (i = 0; i < lo->ldo_comp_cnt; i++) {
lod_comp = &lo->ldo_comp_entries[i];
- if (lod_comp->llc_extent.e_start == 0 && i > 0) /* new mirror */
- ++mirror_id;
-
if (lod_comp->llc_id == LCME_ID_INVAL) {
+ /* only the component of FLR layout with more than 1
+ * mirror has mirror ID in its component ID.
+ */
+ if (lod_comp->llc_extent.e_start == 0 &&
+ lo->ldo_mirror_count > 1)
+ ++mirror_id;
+
lod_comp->llc_id = lod_gen_component_id(lo,
mirror_id, i);
if (lod_comp->llc_id == LCME_ID_INVAL)
RETURN(0);
out:
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
* is being initialized as we don't need this information till
* few specific cases like destroy, chown
*/
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
RETURN(0);
LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
- LASSERT(lo->ldo_dir_stripe_count > 1);
/* Note: for remote lock for single stripe dir, MDT will cancel
* the lock by lockh directly */
LASSERT(!dt_object_remote(dt_object_child(dt)));
if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
RETURN(-ENOTDIR);
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
dlmflags |= LDLM_FL_COS_INCOMPAT;
LASSERT(ns != NULL);
- rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
+ rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS,
policy, einfo->ei_mode,
&dlmflags, blocking,
completion, NULL,
struct lod_object *lo, struct thandle *th)
{
struct lod_thread_info *info = lod_env_info(env);
- struct ost_pool *inuse = &info->lti_inuse_osts;
int i;
int rc = 0;
ENTRY;
LASSERT(info->lti_count < lo->ldo_comp_cnt);
- if (info->lti_count > 0) {
- /* Prepare inuse array for composite file */
- rc = lod_prepare_inuse(env, lo);
- if (rc)
- RETURN(rc);
- }
for (i = 0; i < info->lti_count; i++) {
rc = lod_qos_prep_create(env, lo, NULL, th,
- info->lti_comp_idx[i], inuse);
+ info->lti_comp_idx[i]);
if (rc)
break;
}
GOTO(out, rc = -EINVAL);
}
- lod_object_free_striping(env, lo);
rc = lod_use_defined_striping(env, lo, buf);
if (rc)
GOTO(out, rc);
+ lo->ldo_comp_cached = 1;
rc = lod_get_lov_ea(env, lo);
if (rc <= 0)
replay = true;
} else {
/* non replay path */
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(out, rc);
}
rc = lod_declare_instantiate_components(env, lo, th);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
lod_obj_inc_layout_gen(lo);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
dt_object_remote(dt_object_child(dt)))
RETURN(-EINVAL);
- lod_write_lock(env, dt, 0);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(out, rc);
break;
}
out:
- dt_write_unlock(env, dt);
RETURN(rc);
}
*/
static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t *pos,
- struct thandle *th, int iq)
+ struct thandle *th)
{
LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) ||
S_ISLNK(dt->do_lu.lo_header->loh_attr));
- return lod_sub_write(env, dt_object_child(dt), buf, pos, th, iq);
+ return lod_sub_write(env, dt_object_child(dt), buf, pos, th);
}
static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt,
* \param[in] env execution environment
* \param[in] lo object
*/
-void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
+void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo)
{
struct lod_layout_component *lod_comp;
int i, j;
}
}
+void lod_striping_free(const struct lu_env *env, struct lod_object *lo)
+{
+ mutex_lock(&lo->ldo_layout_mutex);
+ lod_striping_free_nolock(env, lo);
+ mutex_unlock(&lo->ldo_layout_mutex);
+}
+
/**
* Implementation of lu_object_operations::loo_object_free.
*
struct lod_object *lo = lu2lod_obj(o);
/* release all underlying object pinned */
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
lu_object_fini(o);
OBD_SLAB_FREE_PTR(lo, lod_object_kmem);
}