* Copyright 2009 Sun Microsystems, Inc. All rights reserved
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2016, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* lustre/lod/lod_object.c
#define DEBUG_SUBSYSTEM S_MDS
+#include <linux/random.h>
+
#include <obd.h>
#include <obd_class.h>
#include <obd_support.h>
*/
static int lod_insert(const struct lu_env *env, struct dt_object *dt,
const struct dt_rec *rec, const struct dt_key *key,
- struct thandle *th, int ign)
+ struct thandle *th)
{
- return lod_sub_insert(env, dt_object_child(dt), rec, key, th, ign);
+ return lod_sub_insert(env, dt_object_child(dt), rec, key, th);
}
/**
int rc;
ENTRY;
- /* If it is not a striped directory, then load nothing. */
if (magic != LMV_MAGIC_V1)
RETURN(0);
- /* If it is in migration (or failure), then load nothing. */
- if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
- RETURN(0);
-
stripes = le32_to_cpu(lmv1->lmv_stripe_count);
if (stripes < 1)
RETURN(0);
LASSERT(next->do_ops);
LASSERT(next->do_ops->do_index_try);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
return dt_attr_get(env, dt_object_child(dt), attr);
}
+/*
+ * Fill in the defaults from \a desc for stripe settings that \a comp left
+ * unset.
+ *
+ * The stripe count is only defaulted for components whose pattern is not
+ * LOV_PATTERN_MDT; the stripe size is defaulted for every component.
+ */
+static inline void lod_adjust_stripe_info(struct lod_layout_component *comp,
+					  struct lov_desc *desc)
+{
+	bool need_count = comp->llc_pattern != LOV_PATTERN_MDT &&
+			  !comp->llc_stripe_count;
+
+	if (need_count)
+		comp->llc_stripe_count = desc->ld_default_stripe_count;
+
+	if (comp->llc_stripe_size <= 0)
+		comp->llc_stripe_size = desc->ld_default_stripe_size;
+}
+
int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
struct thandle *th,
struct lod_obj_stripe_cb_data *data)
data->locd_comp_skip_cb(env, lo, i, data))
continue;
+ if (data->locd_comp_cb) {
+ rc = data->locd_comp_cb(env, lo, i, data);
+ if (rc)
+ RETURN(rc);
+ }
+
+ /* the component callback may be all that is needed: skip the
+ * per-stripe walk when no stripe callback is set
+ */
+ if (!data->locd_stripe_cb)
+ continue;
+
LASSERT(lod_comp->llc_stripe_count > 0);
for (j = 0; j < lod_comp->llc_stripe_count; j++) {
struct dt_object *dt = lod_comp->llc_stripe[j];
* is being initialized as we don't need this information till
* few specific cases like destroy, chown
*/
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
* the in-memory striping information has been freed in lod_xattr_set()
* due to layout change. It has to load stripe here again. It only
* changes flags of layout so declare_attr_set() is still accurate */
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
}
lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
+ memset(lmm1, 0, sizeof(*lmm1));
lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
+ if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) {
+ lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash);
+ lmm1->lmv_migrate_offset =
+ cpu_to_le32(lo->ldo_dir_migrate_offset);
+ }
rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
&mdtidx, &type);
if (rc != 0)
int rc = 0;
ENTRY;
- if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
- RETURN(0);
+ LASSERT(mutex_is_locked(&lo->ldo_layout_mutex));
if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
lo->ldo_dir_slave_stripe = 1;
lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count);
lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
if (rc != 0)
- lod_object_free_striping(env, lo);
+ lod_striping_free_nolock(env, lo);
RETURN(rc);
}
struct dt_object_format *dof,
struct thandle *th)
{
+ struct lod_thread_info *info = lod_env_info(env);
struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
struct lod_object *lo = lod_dt_obj(dt);
int rc = 0;
__u32 i;
__u32 j;
+ bool is_specific = false;
ENTRY;
/* The lum has been verifed in lod_verify_md_striping */
- LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
- LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
+ LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC ||
+ le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC);
- stripe_count = le32_to_cpu(lum->lum_stripe_count);
+ stripe_count = lo->ldo_dir_stripe_count;
OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
if (idx_array == NULL)
/* Start index must be the master MDT */
master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
idx_array[0] = master_index;
+ if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) {
+ is_specific = true;
+ for (i = 1; i < stripe_count; i++)
+ idx_array[i] = le32_to_cpu(lum->lum_objects[i].lum_mds);
+ }
+
for (i = 0; i < stripe_count; i++) {
struct lod_tgt_desc *tgt = NULL;
struct dt_object *dto;
CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n",
idx, lod->lod_remote_mdt_count + 1, i);
- if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) {
+ if (likely(!is_specific &&
+ !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) {
/* check whether the idx already exists
* in current allocated array */
for (k = 0; k < i; k++) {
continue;
tgt_dt = tgt->ltd_tgt;
- rc = dt_statfs(env, tgt_dt, NULL);
+ rc = dt_statfs(env, tgt_dt, &info->lti_osfs);
if (rc) {
/* this OSP doesn't feel well */
rc = 0;
idx, i, PFID(&fid));
idx_array[i] = idx;
/* Set the start index for next stripe allocation */
- if (i < stripe_count - 1)
+ if (!is_specific && i < stripe_count - 1) {
+ /*
+ * for large dir test, put all other slaves on one
+ * remote MDT, otherwise we may save too many local
+ * slave locks which will exceed RS_MAX_LOCKS.
+ */
+ if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)))
+ idx = master_index;
idx_array[i + 1] = (idx + 1) %
(lod->lod_remote_mdt_count + 1);
+ }
/* tgt_dt and fid must be ready after search avaible OSP
* in the above loop */
LASSERT(tgt_dt != NULL);
stripe[i] = dto;
}
- lo->ldo_dir_stripe_loaded = 1;
lo->ldo_dir_striped = 1;
lo->ldo_stripe = stripe;
lo->ldo_dir_stripe_count = i;
lo->ldo_dir_stripes_allocated = stripe_count;
+ smp_mb();
+ lo->ldo_dir_stripe_loaded = 1;
if (lo->ldo_dir_stripe_count == 0)
GOTO(out_put, rc = -ENOSPC);
struct thandle *th)
{
struct lod_object *lo = lod_dt_obj(dt);
- struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
- struct lmv_user_md_v1 *lum;
+ struct lmv_user_md_v1 *lum = lum_buf->lb_buf;
int rc;
ENTRY;
- lum = lum_buf->lb_buf;
LASSERT(lum != NULL);
CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
(int)le32_to_cpu(lum->lum_stripe_offset));
- if (le32_to_cpu(lum->lum_stripe_count) == 0)
+ if (lo->ldo_dir_stripe_count == 0)
GOTO(out, rc = 0);
- rc = lod_verify_md_striping(lod, lum);
- if (rc != 0)
- GOTO(out, rc);
-
/* prepare dir striped objects */
rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
if (rc != 0) {
/* failed to create striping, let's reset
* config so that others don't get confused */
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
GOTO(out, rc);
}
out:
}
/**
+ * Append source stripes after target stripes for migrating directory. NB, we
+ * only need to declare this, the append is done inside lod_xattr_set_lmv().
+ *
+ * Every source stripe named in \a buf is located, checked to be a directory
+ * and has the updates needed to link it under \a dt declared against \a th.
+ * On success the in-memory stripe array of \a dt is replaced by one holding
+ * the existing stripes followed by the source stripes, and the migration
+ * offset, migration hash and LMV_HASH_FLAG_MIGRATION are recorded in \a dt.
+ * On failure the source stripes located so far are released and \a dt is
+ * left untouched.
+ *
+ * \param[in] env	execution environment
+ * \param[in] dt	target object
+ * \param[in] buf	LMV buf which contains source stripe fids
+ * \param[in] th	transaction handle
+ *
+ * \retval		0 on success
+ * \retval		-EINVAL if \a buf is empty, is not LMV_MAGIC_V1 or
+ *			names no stripes
+ * \retval		-ENOMEM if the new stripe array cannot be allocated
+ * \retval		negative if failed
+ */
+static int lod_dir_declare_layout_add(const struct lu_env *env,
+				      struct dt_object *dt,
+				      const struct lu_buf *buf,
+				      struct thandle *th)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+	struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+	struct lod_object *lo = lod_dt_obj(dt);
+	struct dt_object *next = dt_object_child(dt);
+	struct dt_object_format *dof = &info->lti_format;
+	struct lmv_mds_md_v1 *lmv = buf->lb_buf;
+	struct dt_object **stripe;
+	__u32 stripe_count;
+	struct lu_fid *fid = &info->lti_fid;
+	struct lod_tgt_desc *tgt;
+	struct dt_object *dto;
+	struct dt_device *tgt_dt;
+	int type = LU_SEQ_RANGE_ANY;
+	struct dt_insert_rec *rec = &info->lti_dt_rec;
+	char *stripe_name = info->lti_key;
+	struct lu_name *sname;
+	struct linkea_data ldata = { NULL };
+	struct lu_buf linkea_buf;
+	__u32 idx;
+	/* unsigned: compared against __u32 counts and printed with %u */
+	__u32 i;
+	int rc;
+
+	ENTRY;
+
+	/* same guard as the "del" helpers: an empty buffer is invalid */
+	if (lmv == NULL)
+		RETURN(-EINVAL);
+
+	if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
+		RETURN(-EINVAL);
+
+	stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+	if (stripe_count == 0)
+		RETURN(-EINVAL);
+
+	dof->dof_type = DFT_DIR;
+	/*
+	 * rec is per-thread scratch: fill it in the same way the execute
+	 * path does before each insert, rather than declaring with whatever
+	 * a previous user left behind.
+	 */
+	rec->rec_type = S_IFDIR;
+
+	OBD_ALLOC(stripe,
+		  sizeof(*stripe) * (lo->ldo_dir_stripe_count + stripe_count));
+	if (stripe == NULL)
+		RETURN(-ENOMEM);
+
+	/* existing (target) stripes keep their slots at the front */
+	for (i = 0; i < lo->ldo_dir_stripe_count; i++)
+		stripe[i] = lo->ldo_stripe[i];
+
+	for (i = 0; i < stripe_count; i++) {
+		fid_le_to_cpu(fid,
+			      &lmv->lmv_stripe_fids[i]);
+		if (!fid_is_sane(fid))
+			GOTO(out, rc = -ESTALE);
+
+		rc = lod_fld_lookup(env, lod, fid, &idx, &type);
+		if (rc)
+			GOTO(out, rc);
+
+		/* the stripe lives either on this MDT or on a remote one */
+		if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
+			tgt_dt = lod->lod_child;
+		} else {
+			tgt = LTD_TGT(ltd, idx);
+			if (tgt == NULL)
+				GOTO(out, rc = -ESTALE);
+			tgt_dt = tgt->ltd_tgt;
+		}
+
+		dto = dt_locate_at(env, tgt_dt, fid,
+				   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
+				   NULL);
+		if (IS_ERR(dto))
+			GOTO(out, rc = PTR_ERR(dto));
+
+		/* stored first so that the error path releases it */
+		stripe[i + lo->ldo_dir_stripe_count] = dto;
+
+		if (!dt_try_as_dir(env, dto))
+			GOTO(out, rc = -ENOTDIR);
+
+		rc = lod_sub_declare_ref_add(env, dto, th);
+		if (rc)
+			GOTO(out, rc);
+
+		rec->rec_fid = lu_object_fid(&dto->do_lu);
+		rc = lod_sub_declare_insert(env, dto,
+					    (const struct dt_rec *)rec,
+					    (const struct dt_key *)dot, th);
+		if (rc)
+			GOTO(out, rc);
+
+		rec->rec_fid = lu_object_fid(&dt->do_lu);
+		rc = lod_sub_declare_insert(env, dto,
+					    (const struct dt_rec *)rec,
+					    (const struct dt_key *)dotdot, th);
+		if (rc)
+			GOTO(out, rc);
+
+		rc = lod_sub_declare_xattr_set(env, dto, buf,
+					       XATTR_NAME_LMV, 0, th);
+		if (rc)
+			GOTO(out, rc);
+
+		/* name of the stripe entry in the master: "<fid>:<index>" */
+		snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
+			 PFID(lu_object_fid(&dto->do_lu)),
+			 i + lo->ldo_dir_stripe_count);
+
+		sname = lod_name_get(env, stripe_name, strlen(stripe_name));
+		rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+				      sname, lu_object_fid(&dt->do_lu));
+		if (rc)
+			GOTO(out, rc);
+
+		linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+		linkea_buf.lb_len = ldata.ld_leh->leh_len;
+		rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
+					       XATTR_NAME_LINK, 0, th);
+		if (rc)
+			GOTO(out, rc);
+
+		rec->rec_fid = lu_object_fid(&dto->do_lu);
+		rc = lod_sub_declare_insert(env, next,
+					    (const struct dt_rec *)rec,
+					    (const struct dt_key *)stripe_name,
+					    th);
+		if (rc)
+			GOTO(out, rc);
+
+		rc = lod_sub_declare_ref_add(env, next, th);
+		if (rc)
+			GOTO(out, rc);
+	}
+
+	/* everything is declared: switch the object to the enlarged array */
+	if (lo->ldo_stripe)
+		OBD_FREE(lo->ldo_stripe,
+			 sizeof(*stripe) * lo->ldo_dir_stripes_allocated);
+	lo->ldo_stripe = stripe;
+	lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count;
+	lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type);
+	lo->ldo_dir_stripe_count += stripe_count;
+	lo->ldo_dir_stripes_allocated += stripe_count;
+	lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION;
+
+	RETURN(0);
+out:
+	/*
+	 * Only the source stripes located by this call are released; the
+	 * leading slots are borrowed from lo->ldo_stripe, which stays in
+	 * place.
+	 */
+	i = lo->ldo_dir_stripe_count;
+	while (i < lo->ldo_dir_stripe_count + stripe_count && stripe[i])
+		dt_object_put(env, stripe[i++]);
+
+	OBD_FREE(stripe,
+		 sizeof(*stripe) * (stripe_count + lo->ldo_dir_stripe_count));
+	RETURN(rc);
+}
+
+/*
+ * Declare the removal of the trailing stripes of a striped directory.
+ *
+ * \a buf carries an lmv_user_md whose lum_stripe_count is the number of
+ * stripes to keep. For every stripe at or beyond that index this declares
+ * the deletion of its "." and ".." entries, of its name entry in the master
+ * object, and the matching reference drops.
+ *
+ * \retval 0		on success
+ * \retval -EINVAL	if \a buf is empty or does not shrink the directory
+ * \retval -ENOTDIR	if a stripe cannot be used as a directory
+ * \retval negative	error returned by one of the declare calls
+ */
+static int lod_dir_declare_layout_delete(const struct lu_env *env,
+					 struct dt_object *dt,
+					 const struct lu_buf *buf,
+					 struct thandle *th)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lod_object *lo = lod_dt_obj(dt);
+	struct dt_object *master = dt_object_child(dt);
+	struct lmv_user_md *lmu = buf->lb_buf;
+	char *name = info->lti_key;
+	struct dt_object *sub;
+	__u32 keep;
+	int rc;
+	int i;
+
+	if (lmu == NULL)
+		return -EINVAL;
+
+	keep = le32_to_cpu(lmu->lum_stripe_count);
+	if (keep >= lo->ldo_dir_stripe_count)
+		return -EINVAL;
+
+	for (i = keep; i < lo->ldo_dir_stripe_count; i++) {
+		sub = lo->ldo_stripe[i];
+		LASSERT(sub);
+
+		if (!dt_try_as_dir(env, sub))
+			return -ENOTDIR;
+
+		/* updates on the stripe itself */
+		rc = lod_sub_declare_delete(env, sub,
+					    (const struct dt_key *)dot, th);
+		if (rc == 0)
+			rc = lod_sub_declare_ref_del(env, sub, th);
+		if (rc == 0)
+			rc = lod_sub_declare_delete(env, sub,
+					(const struct dt_key *)dotdot, th);
+		if (rc != 0)
+			return rc;
+
+		/* updates on the master: drop the "<fid>:<index>" entry */
+		snprintf(name, sizeof(info->lti_key), DFID":%d",
+			 PFID(lu_object_fid(&sub->do_lu)), i);
+
+		rc = lod_sub_declare_delete(env, master,
+					    (const struct dt_key *)name, th);
+		if (rc == 0)
+			rc = lod_sub_declare_ref_del(env, master, th);
+		if (rc != 0)
+			return rc;
+	}
+
+	return 0;
+}
+
+/*
+ * Delete stripes from the dir master object. The lum_stripe_count in \a buf
+ * is the final stripe count; the stripes after that are deleted. NB, they
+ * are not destroyed, only removed from their parent's namespace. This
+ * function is called in two places:
+ * 1. mdd_migrate_create() deletes stripes from the source and appends them
+ *    to the target.
+ * 2. mdd_dir_layout_shrink() deletes stripes from the source and destroys
+ *    them.
+ *
+ * The in-memory striping of \a dt is freed before returning, whether or not
+ * the deletions succeeded.
+ */
+static int lod_dir_layout_delete(const struct lu_env *env,
+				 struct dt_object *dt,
+				 const struct lu_buf *buf,
+				 struct thandle *th)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lod_object *lo = lod_dt_obj(dt);
+	struct dt_object *master = dt_object_child(dt);
+	struct lmv_user_md *lmu = buf->lb_buf;
+	char *name = info->lti_key;
+	struct dt_object *sub;
+	__u32 keep;
+	int rc = 0;
+	int i;
+
+	ENTRY;
+
+	if (lmu == NULL)
+		RETURN(-EINVAL);
+
+	keep = le32_to_cpu(lmu->lum_stripe_count);
+	if (keep >= lo->ldo_dir_stripe_count)
+		RETURN(-EINVAL);
+
+	/* the first failure ends the walk */
+	for (i = keep; rc == 0 && i < lo->ldo_dir_stripe_count; i++) {
+		sub = lo->ldo_stripe[i];
+		LASSERT(sub);
+
+		rc = lod_sub_delete(env, sub,
+				    (const struct dt_key *)dotdot, th);
+		if (rc != 0)
+			continue;
+
+		snprintf(name, sizeof(info->lti_key), DFID":%d",
+			 PFID(lu_object_fid(&sub->do_lu)), i);
+
+		rc = lod_sub_delete(env, master,
+				    (const struct dt_key *)name, th);
+		if (rc == 0)
+			rc = lod_sub_ref_del(env, master, th);
+	}
+
+	lod_striping_free(env, lo);
+
+	RETURN(rc);
+}
+
+/**
* Implementation of dt_object_operations::do_declare_xattr_set.
*
* Used with regular (non-striped) objects. Basically it
RETURN(0);
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
struct lod_obj_stripe_cb_data *data)
{
struct lod_thread_info *info = lod_env_info(env);
+ struct lod_layout_component *comp = &lo->ldo_comp_entries[comp_idx];
struct filter_fid *ff = &info->lti_ff;
struct lu_buf *buf = &info->lti_buf;
int rc;
buf->lb_buf = ff;
buf->lb_len = sizeof(*ff);
rc = dt_xattr_get(env, dt, buf, XATTR_NAME_FID);
- if (rc == -ENODATA)
- return 0;
-
- if (rc < 0)
+ if (rc < 0) {
+ if (rc == -ENODATA)
+ return 0;
return rc;
+ }
+
+ /*
+ * locd_buf is set if it's called by dir migration, which doesn't check
+ * pfid and comp id.
+ */
+ if (data->locd_buf) {
+ memset(ff, 0, sizeof(*ff));
+ ff->ff_parent = *(struct lu_fid *)data->locd_buf->lb_buf;
+ } else {
+ filter_fid_le_to_cpu(ff, ff, sizeof(*ff));
- ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu);
+ if (lu_fid_eq(lod_object_fid(lo), &ff->ff_parent) &&
+ ff->ff_layout.ol_comp_id == comp->llc_id)
+ return 0;
+
+ memset(ff, 0, sizeof(*ff));
+ ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu);
+ }
+
+ /* rewrite filter_fid */
ff->ff_parent.f_ver = stripe_idx;
- fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent);
+ ff->ff_layout.ol_stripe_size = comp->llc_stripe_size;
+ ff->ff_layout.ol_stripe_count = comp->llc_stripe_count;
+ ff->ff_layout.ol_comp_id = comp->llc_id;
+ ff->ff_layout.ol_comp_start = comp->llc_extent.e_start;
+ ff->ff_layout.ol_comp_end = comp->llc_extent.e_end;
+ filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
+
if (data->locd_declare)
rc = lod_sub_declare_xattr_set(env, dt, buf, XATTR_NAME_FID,
LU_XATTR_REPLACE, th);
*/
static int lod_replace_parent_fid(const struct lu_env *env,
struct dt_object *dt,
+ const struct lu_buf *buf,
struct thandle *th, bool declare)
{
struct lod_object *lo = lod_dt_obj(dt);
struct lod_thread_info *info = lod_env_info(env);
- struct lu_buf *buf = &info->lti_buf;
struct filter_fid *ff;
struct lod_obj_stripe_cb_data data = { { 0 } };
int rc;
LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr));
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
RETURN(rc);
}
- buf->lb_buf = info->lti_ea_store;
- buf->lb_len = info->lti_ea_store_size;
-
data.locd_declare = declare;
data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb;
+ data.locd_buf = buf;
rc = lod_obj_for_each_stripe(env, lo, th, &data);
RETURN(rc);
LASSERT(lo->ldo_is_composite);
- if (lo->ldo_flr_state != LCM_FL_NOT_FLR)
+ if (lo->ldo_flr_state != LCM_FL_NONE)
RETURN(-EBUSY);
rc = lod_verify_striping(d, lo, buf, false);
lod_comp->llc_flags = comp_v1->lcm_entries[i].lcme_flags;
lod_comp->llc_stripe_count = v1->lmm_stripe_count;
- if (!lod_comp->llc_stripe_count ||
- lod_comp->llc_stripe_count == (__u16)-1)
- lod_comp->llc_stripe_count =
- desc->ld_default_stripe_count;
lod_comp->llc_stripe_size = v1->lmm_stripe_size;
- if (!lod_comp->llc_stripe_size)
- lod_comp->llc_stripe_size =
- desc->ld_default_stripe_size;
+ lod_adjust_stripe_info(lod_comp, desc);
if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
v3 = (struct lov_user_md_v3 *) v1;
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
struct lod_object *lo = lod_dt_obj(dt);
struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
- __u32 magic, id;
+ __u32 magic;
int i, j, rc;
bool changed = false;
ENTRY;
}
for (i = 0; i < comp_v1->lcm_entry_count; i++) {
- id = comp_v1->lcm_entries[i].lcme_id;
+ __u32 id = comp_v1->lcm_entries[i].lcme_id;
+ __u32 flags = comp_v1->lcm_entries[i].lcme_flags;
+ __u32 mirror_flag = flags & LCME_MIRROR_FLAGS;
+ bool neg = flags & LCME_FL_NEG;
+
+ if (flags & LCME_FL_INIT) {
+ if (changed)
+ lod_striping_free(env, lo);
+ RETURN(-EINVAL);
+ }
+ flags &= ~(LCME_MIRROR_FLAGS | LCME_FL_NEG);
for (j = 0; j < lo->ldo_comp_cnt; j++) {
lod_comp = &lo->ldo_comp_entries[j];
- if (id == lod_comp->llc_id || id == LCME_ID_ALL) {
- lod_comp->llc_flags =
- comp_v1->lcm_entries[i].lcme_flags;
- changed = true;
+
+ /* lfs only put one flag in each entry */
+ if ((flags && id != lod_comp->llc_id) ||
+ (mirror_flag && mirror_id_of(id) !=
+ mirror_id_of(lod_comp->llc_id)))
+ continue;
+
+ if (neg) {
+ if (flags)
+ lod_comp->llc_flags &= ~flags;
+ if (mirror_flag)
+ lod_comp->llc_flags &= ~mirror_flag;
+ } else {
+ if (flags)
+ lod_comp->llc_flags |= flags;
+ if (mirror_flag) {
+ lod_comp->llc_flags |= mirror_flag;
+ if (mirror_flag & LCME_FL_NOSYNC)
+ lod_comp->llc_timestamp =
+ ktime_get_real_seconds();
+ }
}
+ changed = true;
}
}
lod_obj_inc_layout_gen(lo);
info->lti_buf.lb_len = lod_comp_md_size(lo, false);
- rc = lod_sub_declare_xattr_set(env, dt, &info->lti_buf,
- XATTR_NAME_LOV, 0, th);
+ rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), &info->lti_buf,
+ XATTR_NAME_LOV, LU_XATTR_REPLACE, th);
RETURN(rc);
}
LASSERT(lo->ldo_is_composite);
- if (lo->ldo_flr_state != LCM_FL_NOT_FLR)
+ if (lo->ldo_flr_state != LCM_FL_NONE)
RETURN(-EBUSY);
magic = comp_v1->lcm_magic;
RETURN(-EINVAL);
}
+ if (id == LCME_ID_INVAL && !flags) {
+ CDEBUG(D_LAYOUT, "%s: no id or flags specified.\n",
+ lod2obd(d)->obd_name);
+ RETURN(-EINVAL);
+ }
+
if (flags & LCME_FL_NEG) {
neg_flags = flags & ~LCME_FL_NEG;
flags = 0;
{
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
struct lod_object *lo = lod_dt_obj(dt);
- struct dt_object *next = dt_object_child(&lo->ldo_obj);
char *op;
int rc, len = strlen(XATTR_LUSTRE_LOV);
ENTRY;
}
len++;
- dt_write_lock(env, next, 0);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(unlock, rc);
}
unlock:
if (rc)
- lod_object_free_striping(env, lo);
- dt_write_unlock(env, next);
+ lod_striping_free(env, lo);
RETURN(rc);
}
/**
- * Merge layouts to form a mirrored file.
+ * Convert a plain file lov_mds_md to a composite layout.
+ *
+ * \param[in,out] info the thread info::lti_ea_store buffer contains little
+ * endian plain file layout
+ *
+ * \retval 0 on success, <0 on failure
*/
-static int lod_declare_layout_merge(const struct lu_env *env,
- struct dt_object *dt, const struct lu_buf *mbuf,
- struct thandle *th)
+static int lod_layout_convert(struct lod_thread_info *info)
{
- struct lod_thread_info *info = lod_env_info(env);
- struct lu_buf *buf = &info->lti_buf;
- struct lod_object *lo = lod_dt_obj(dt);
- struct lov_comp_md_v1 *lcm;
- struct lov_comp_md_v1 *cur_lcm;
- struct lov_comp_md_v1 *merge_lcm;
- struct lov_comp_md_entry_v1 *lcme;
- size_t size = 0;
- size_t offset;
- __u16 cur_entry_count;
- __u16 merge_entry_count;
- __u32 id = 0;
- __u16 mirror_id = 0;
- __u32 mirror_count;
+	struct lov_mds_md *lmm = info->lti_ea_store;
+	struct lov_mds_md *lmm_save;
+	struct lov_comp_md_v1 *lcm;
+	struct lov_comp_md_entry_v1 *lcme;
+	size_t size;
+	__u32 blob_size;
+	/* host-order offset of the plain layout inside the new buffer */
+	__u32 blob_off;
+	int rc = 0;
+	ENTRY;
+
+	/* realloc buffer to a composite layout which contains one component */
+	blob_size = lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count),
+				    le32_to_cpu(lmm->lmm_magic));
+	blob_off = sizeof(*lcm) + sizeof(*lcme);
+	size = blob_off + blob_size;
+
+	/*
+	 * The plain layout sits at the start of the buffer the composite
+	 * header is about to be written into, so keep a private copy of it.
+	 */
+	OBD_ALLOC_LARGE(lmm_save, blob_size);
+	if (!lmm_save)
+		GOTO(out, rc = -ENOMEM);
+
+	memcpy(lmm_save, lmm, blob_size);
+
+	if (info->lti_ea_store_size < size) {
+		rc = lod_ea_store_resize(info, size);
+		if (rc)
+			GOTO(out, rc);
+	}
+
+	lcm = info->lti_ea_store;
+	/*
+	 * Start from a clean header: fields that are not assigned below
+	 * (e.g. lcme_id) must not inherit bytes of the plain layout that
+	 * used to occupy this buffer.
+	 */
+	memset(lcm, 0, blob_off);
+	lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1);
+	lcm->lcm_size = cpu_to_le32(size);
+	lcm->lcm_layout_gen = cpu_to_le32(le16_to_cpu(
+						lmm_save->lmm_layout_gen));
+	lcm->lcm_flags = cpu_to_le16(LCM_FL_NONE);
+	lcm->lcm_entry_count = cpu_to_le16(1);
+	lcm->lcm_mirror_count = 0;
+
+	lcme = &lcm->lcm_entries[0];
+	lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT);
+	lcme->lcme_extent.e_start = 0;
+	lcme->lcme_extent.e_end = cpu_to_le64(OBD_OBJECT_EOF);
+	lcme->lcme_offset = cpu_to_le32(blob_off);
+	lcme->lcme_size = cpu_to_le32(blob_size);
+
+	/*
+	 * lcme_offset is stored little-endian; address the blob with the
+	 * host-order value so the copy lands right on big-endian hosts too.
+	 */
+	memcpy((char *)lcm + blob_off, (char *)lmm_save, blob_size);
+
+	EXIT;
+out:
+	if (lmm_save)
+		OBD_FREE_LARGE(lmm_save, blob_size);
+	return rc;
+}
+
+/**
+ * Merge layouts to form a mirrored file.
+ */
+static int lod_declare_layout_merge(const struct lu_env *env,
+ struct dt_object *dt, const struct lu_buf *mbuf,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_buf *buf = &info->lti_buf;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lov_comp_md_v1 *lcm;
+ struct lov_comp_md_v1 *cur_lcm;
+ struct lov_comp_md_v1 *merge_lcm;
+ struct lov_comp_md_entry_v1 *lcme;
+ size_t size = 0;
+ size_t offset;
+ __u16 cur_entry_count;
+ __u16 merge_entry_count;
+ __u32 id = 0;
+ __u16 mirror_id = 0;
+ __u32 mirror_count;
int rc, i;
ENTRY;
RETURN(rc ? : -ENODATA);
cur_lcm = info->lti_ea_store;
- if (le32_to_cpu(cur_lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
- RETURN(-EINVAL);
+ switch (le32_to_cpu(cur_lcm->lcm_magic)) {
+ case LOV_MAGIC_V1:
+ case LOV_MAGIC_V3:
+ rc = lod_layout_convert(info);
+ break;
+ case LOV_MAGIC_COMP_V1:
+ rc = 0;
+ break;
+ default:
+ rc = -EINVAL;
+ }
+ if (rc)
+ RETURN(rc);
+ /* info->lti_ea_store could be reallocated in lod_layout_convert() */
+ cur_lcm = info->lti_ea_store;
cur_entry_count = le16_to_cpu(cur_lcm->lcm_entry_count);
/* 'lcm_mirror_count + 1' is the current # of mirrors the file has */
offset += le32_to_cpu(lcme->lcme_size);
- if (mirror_count == 1) {
- /* new mirrored file, create new mirror ID */
+ if (mirror_count == 1 &&
+ mirror_id_of(le32_to_cpu(lcme->lcme_id)) == 0) {
+ /* Add mirror from a non-flr file, create new mirror ID.
+ * Otherwise, keep existing mirror's component ID, used
+ * for mirror extension.
+ */
id = pflr_id(1, i + 1);
lcme->lcme_id = cpu_to_le32(id);
}
lcm->lcm_size = cpu_to_le32(size);
lcm->lcm_entry_count = cpu_to_le16(cur_entry_count + merge_entry_count);
lcm->lcm_mirror_count = cpu_to_le16(mirror_count);
- if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NOT_FLR)
+ if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NONE)
lcm->lcm_flags = cpu_to_le32(LCM_FL_RDONLY);
- LASSERT(dt_write_locked(env, dt_object_child(dt)));
- lod_object_free_striping(env, lo);
- rc = lod_parse_striping(env, lo, buf);
+ rc = lod_striping_reload(env, lo, buf);
if (rc)
GOTO(out, rc);
}
/**
+ * Split layouts: the layout carried by \a mbuf becomes the new LOVEA.
+ *
+ * The object's layout generation is bumped and written into \a mbuf, the
+ * in-memory striping is reloaded from \a mbuf, and the replacement of the
+ * LOV xattr on the child object is declared against \a th.
+ *
+ * \retval 0 on success, negative on failure
+ */
+static int lod_declare_layout_split(const struct lu_env *env,
+		struct dt_object *dt, const struct lu_buf *mbuf,
+		struct thandle *th)
+{
+	struct lod_object *lo = lod_dt_obj(dt);
+	struct lov_comp_md_v1 *new_layout = mbuf->lb_buf;
+	int rc;
+	ENTRY;
+
+	/* stamp the caller's layout with the new generation first */
+	lod_obj_inc_layout_gen(lo);
+	new_layout->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen);
+
+	rc = lod_striping_reload(env, lo, mbuf);
+	if (rc == 0)
+		rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), mbuf,
+					       XATTR_NAME_LOV,
+					       LU_XATTR_REPLACE, th);
+
+	RETURN(rc);
+}
+
+/**
* Implementation of dt_object_operations::do_declare_xattr_set.
*
* \see dt_object_operations::do_declare_xattr_set() in the API description
mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
if ((S_ISREG(mode) || mode == 0) &&
- !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE)) &&
+ !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) &&
(strcmp(name, XATTR_NAME_LOV) == 0 ||
strcmp(name, XATTR_LUSTRE_LOV) == 0)) {
/*
LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
strcmp(name, XATTR_LUSTRE_LOV) == 0);
rc = lod_declare_layout_merge(env, dt, buf, th);
+ } else if (fl & LU_XATTR_SPLIT) {
+ LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
+ strcmp(name, XATTR_LUSTRE_LOV) == 0);
+ rc = lod_declare_layout_split(env, dt, buf, th);
} else if (S_ISREG(mode) &&
strlen(name) > strlen(XATTR_LUSTRE_LOV) + 1 &&
strncmp(name, XATTR_LUSTRE_LOV,
RETURN(-ENOENT);
rc = lod_declare_modify_layout(env, dt, name, buf, th);
+ } else if (strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 &&
+ strlen(name) > strlen(XATTR_NAME_LMV) + 1) {
+ const char *op = name + strlen(XATTR_NAME_LMV) + 1;
+
+ rc = -ENOTSUPP;
+ if (strcmp(op, "add") == 0)
+ rc = lod_dir_declare_layout_add(env, dt, buf, th);
+ else if (strcmp(op, "del") == 0)
+ rc = lod_dir_declare_layout_delete(env, dt, buf, th);
+ else if (strcmp(op, "set") == 0)
+ rc = lod_sub_declare_xattr_set(env, next, buf,
+ XATTR_NAME_LMV, fl, th);
+
+ RETURN(rc);
} else if (S_ISDIR(mode)) {
rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
} else if (strcmp(name, XATTR_NAME_FID) == 0) {
- rc = lod_replace_parent_fid(env, dt, th, true);
+ rc = lod_replace_parent_fid(env, dt, buf, th, true);
} else {
rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th);
}
lum = buf->lb_buf;
switch (lum->lmm_magic) {
+ case LOV_USER_MAGIC_SPECIFIC:
case LOV_USER_MAGIC_V3:
v3 = buf->lb_buf;
if (v3->lmm_pool_name[0] != '\0')
rec->rec_type = S_IFDIR;
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- struct dt_object *dto;
- char *stripe_name = info->lti_key;
- struct lu_name *sname;
- struct linkea_data ldata = { NULL };
- struct lu_buf linkea_buf;
-
- dto = lo->ldo_stripe[i];
+ struct dt_object *dto = lo->ldo_stripe[i];
+ char *stripe_name = info->lti_key;
+ struct lu_name *sname;
+ struct linkea_data ldata = { NULL };
+ struct lu_buf linkea_buf;
+
+ /* if it's source stripe of migrating directory, don't create */
+ if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) &&
+ i >= lo->ldo_dir_migrate_offset)) {
+ dt_write_lock(env, dto, MOR_TGT_CHILD);
+ rc = lod_sub_create(env, dto, attr, NULL, dof, th);
+ if (rc != 0) {
+ dt_write_unlock(env, dto);
+ GOTO(out, rc);
+ }
- dt_write_lock(env, dto, MOR_TGT_CHILD);
- rc = lod_sub_create(env, dto, attr, NULL, dof, th);
- if (rc != 0) {
+ rc = lod_sub_ref_add(env, dto, th);
dt_write_unlock(env, dto);
- GOTO(out, rc);
- }
-
- rc = lod_sub_ref_add(env, dto, th);
- dt_write_unlock(env, dto);
- if (rc != 0)
- GOTO(out, rc);
+ if (rc != 0)
+ GOTO(out, rc);
- rec->rec_fid = lu_object_fid(&dto->do_lu);
- rc = lod_sub_insert(env, dto, (const struct dt_rec *)rec,
- (const struct dt_key *)dot, th, 0);
- if (rc != 0)
- GOTO(out, rc);
+ rec->rec_fid = lu_object_fid(&dto->do_lu);
+ rc = lod_sub_insert(env, dto,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)dot, th);
+ if (rc != 0)
+ GOTO(out, rc);
+ }
rec->rec_fid = lu_object_fid(&dt->do_lu);
rc = lod_sub_insert(env, dto, (struct dt_rec *)rec,
- (const struct dt_key *)dotdot, th, 0);
+ (const struct dt_key *)dotdot, th);
if (rc != 0)
GOTO(out, rc);
rec->rec_fid = lu_object_fid(&dto->do_lu);
rc = lod_sub_insert(env, dt_object_child(dt),
(const struct dt_rec *)rec,
- (const struct dt_key *)stripe_name, th, 0);
+ (const struct dt_key *)stripe_name, th);
if (rc != 0)
GOTO(out, rc);
* \param[in] env execution environment
* \param[in] dt object
* \param[in] attr attributes the stripes will be created with
+ * \param[in] lmu lmv_user_md if MDT indices are specified
* \param[in] dof format of stripes (see OSD API description)
* \param[in] th transaction handle
* \param[in] declare where to call "declare" or "execute" methods
static int lod_dir_striping_create_internal(const struct lu_env *env,
struct dt_object *dt,
struct lu_attr *attr,
+ const struct lu_buf *lmu,
struct dt_object_format *dof,
struct thandle *th,
bool declare)
if (!LMVEA_DELETE_VALUES(lo->ldo_dir_stripe_count,
lo->ldo_dir_stripe_offset)) {
- struct lmv_user_md_v1 *v1 = info->lti_ea_store;
- int stripe_count = lo->ldo_dir_stripe_count;
+ if (!lmu) {
+ struct lmv_user_md_v1 *v1 = info->lti_ea_store;
+ int stripe_count = lo->ldo_dir_stripe_count;
- if (info->lti_ea_store_size < sizeof(*v1)) {
- rc = lod_ea_store_resize(info, sizeof(*v1));
- if (rc != 0)
- RETURN(rc);
- v1 = info->lti_ea_store;
- }
+ if (info->lti_ea_store_size < sizeof(*v1)) {
+ rc = lod_ea_store_resize(info, sizeof(*v1));
+ if (rc != 0)
+ RETURN(rc);
+ v1 = info->lti_ea_store;
+ }
- memset(v1, 0, sizeof(*v1));
- v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
- v1->lum_stripe_count = cpu_to_le32(stripe_count);
- v1->lum_stripe_offset =
- cpu_to_le32(lo->ldo_dir_stripe_offset);
+ memset(v1, 0, sizeof(*v1));
+ v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
+ v1->lum_stripe_count = cpu_to_le32(stripe_count);
+ v1->lum_stripe_offset =
+ cpu_to_le32(lo->ldo_dir_stripe_offset);
- info->lti_buf.lb_buf = v1;
- info->lti_buf.lb_len = sizeof(*v1);
+ info->lti_buf.lb_buf = v1;
+ info->lti_buf.lb_len = sizeof(*v1);
+ lmu = &info->lti_buf;
+ }
if (declare)
- rc = lod_declare_xattr_set_lmv(env, dt, attr,
- &info->lti_buf, dof, th);
+ rc = lod_declare_xattr_set_lmv(env, dt, attr, lmu, dof,
+ th);
else
- rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
- XATTR_NAME_LMV, 0, th);
+ rc = lod_xattr_set_lmv(env, dt, lmu, XATTR_NAME_LMV, 0,
+ th);
if (rc != 0)
RETURN(rc);
}
static int lod_declare_dir_striping_create(const struct lu_env *env,
struct dt_object *dt,
struct lu_attr *attr,
+ struct lu_buf *lmu,
struct dt_object_format *dof,
struct thandle *th)
{
- return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
+ return lod_dir_striping_create_internal(env, dt, attr, lmu, dof, th,
+ true);
}
static int lod_dir_striping_create(const struct lu_env *env,
struct dt_object_format *dof,
struct thandle *th)
{
- return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
+ return lod_dir_striping_create_internal(env, dt, attr, NULL, dof, th,
+ false);
}
/**
LASSERT(lo);
if (lo->ldo_comp_cnt == 0) {
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th);
RETURN(rc);
}
OBD_FREE(lod_comp->llc_stripe, sizeof(struct dt_object *) *
lod_comp->llc_stripes_allocated);
lod_comp->llc_stripe = NULL;
+ OBD_FREE(lod_comp->llc_ost_indices,
+ sizeof(__u32) * lod_comp->llc_stripes_allocated);
+ lod_comp->llc_ost_indices = NULL;
lod_comp->llc_stripes_allocated = 0;
lod_obj_set_pool(lo, i, NULL);
if (lod_comp->llc_ostlist.op_array) {
EXIT;
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
return rc;
}
+
+static int lod_get_default_lov_striping(const struct lu_env *env,
+ struct lod_object *lo,
+ struct lod_default_striping *lds);
/**
* Implementation of dt_object_operations::do_xattr_set.
*
if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
strcmp(name, XATTR_NAME_LMV) == 0) {
- struct lmv_mds_md_v1 *lmm = buf->lb_buf;
+ rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
+ RETURN(rc);
+ } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+ strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 &&
+ strlen(name) > strlen(XATTR_NAME_LMV) + 1) {
+ const char *op = name + strlen(XATTR_NAME_LMV) + 1;
- if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
- LMV_HASH_FLAG_MIGRATION)
- rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
- else
- rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
+ rc = -ENOTSUPP;
+ /*
+ * XATTR_NAME_LMV".add" is never called, but only declared,
+ * because lod_xattr_set_lmv() will do the addition.
+ */
+ if (strcmp(op, "del") == 0)
+ rc = lod_dir_layout_delete(env, dt, buf, th);
+ else if (strcmp(op, "set") == 0)
+ rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV,
+ fl, th);
RETURN(rc);
- }
-
- if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+ } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
strcmp(name, XATTR_NAME_LOV) == 0) {
- /* default LOVEA */
- rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th);
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_default_striping *lds = &info->lti_def_striping;
+ struct lov_user_md_v1 *v1 = buf->lb_buf;
+ char pool[LOV_MAXPOOLNAME + 1];
+ bool is_del;
+
+ /* get existing striping config */
+ rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds);
+ if (rc)
+ RETURN(rc);
+
+ memset(pool, 0, sizeof(pool));
+ if (lds->lds_def_striping_set == 1)
+ lod_layout_get_pool(lds->lds_def_comp_entries,
+ lds->lds_def_comp_cnt, pool,
+ sizeof(pool));
+
+ is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size,
+ v1->lmm_stripe_count,
+ v1->lmm_stripe_offset,
+ NULL);
+
+ /* Retain the pool name if it is not given */
+ if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' &&
+ !is_del) {
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lov_user_md_v3 *v3 = info->lti_ea_store;
+
+ memset(v3, 0, sizeof(*v3));
+ v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
+ v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern);
+ v3->lmm_stripe_count =
+ cpu_to_le32(v1->lmm_stripe_count);
+ v3->lmm_stripe_offset =
+ cpu_to_le32(v1->lmm_stripe_offset);
+ v3->lmm_stripe_size = cpu_to_le32(v1->lmm_stripe_size);
+
+ strlcpy(v3->lmm_pool_name, pool,
+ sizeof(v3->lmm_pool_name));
+
+ info->lti_buf.lb_buf = v3;
+ info->lti_buf.lb_len = sizeof(*v3);
+ rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
+ name, fl, th);
+ } else {
+ rc = lod_xattr_set_lov_on_dir(env, dt, buf, name,
+ fl, th);
+ }
+
+ if (lds->lds_def_striping_set == 1 &&
+ lds->lds_def_comp_entries != NULL)
+ lod_free_def_comp_entries(lds);
+
RETURN(rc);
} else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
* defines striping, then create() does the work */
if (fl & LU_XATTR_REPLACE) {
/* free stripes, then update disk */
- lod_object_free_striping(env, lod_dt_obj(dt));
+ lod_striping_free(env, lod_dt_obj(dt));
rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
} else if (dt_object_remote(dt)) {
}
RETURN(rc);
} else if (strcmp(name, XATTR_NAME_FID) == 0) {
- rc = lod_replace_parent_fid(env, dt, th, false);
+ rc = lod_replace_parent_fid(env, dt, buf, th, false);
RETURN(rc);
}
struct dt_object *dt, const char *name,
struct thandle *th)
{
- struct lod_object *lo = lod_dt_obj(dt);
- int rc;
- int i;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ int i;
+ int rc;
ENTRY;
- rc = lod_sub_declare_xattr_del(env, dt_object_child(dt), name, th);
+ rc = lod_sub_declare_xattr_del(env, next, name, th);
if (rc != 0)
RETURN(rc);
RETURN(0);
/* set xattr to each stripes, if needed */
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
RETURN(rc);
RETURN(0);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- LASSERT(lo->ldo_stripe[i]);
- rc = lod_sub_declare_xattr_del(env, lo->ldo_stripe[i],
- name, th);
+ struct dt_object *dto = lo->ldo_stripe[i];
+
+ LASSERT(dto);
+ rc = lod_sub_declare_xattr_del(env, dto, name, th);
if (rc != 0)
break;
}
int i;
ENTRY;
- if (!strcmp(name, XATTR_NAME_LOV))
- lod_object_free_striping(env, lod_dt_obj(dt));
+ if (!strcmp(name, XATTR_NAME_LOV) || !strcmp(name, XATTR_NAME_LMV))
+ lod_striping_free(env, lod_dt_obj(dt));
rc = lod_sub_xattr_del(env, next, name, th);
if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
RETURN(0);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- LASSERT(lo->ldo_stripe[i]);
+ struct dt_object *dto = lo->ldo_stripe[i];
- rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th);
+ LASSERT(dto);
+
+ rc = lod_sub_xattr_del(env, dto, name, th);
if (rc != 0)
break;
}
return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
}
+/**
+ * Copy OST list from layout provided by user.
+ *
+ * Fills \a lod_comp->llc_ostlist with the l_ost_idx of every entry in
+ * \a v3->lmm_objects. An existing op_array is reused when its op_size is
+ * large enough for lmm_stripe_count entries; otherwise it is freed and a
+ * new array is allocated.
+ *
+ * If \a v3->lmm_stripe_offset is LOV_OFFSET_DEFAULT it is overwritten with
+ * the OST index of the first listed object, so \a v3 is modified in place.
+ *
+ * If the allocation fails the OST list is left empty (NULL op_array, zero
+ * op_count and op_size) so it never describes an array that does not exist.
+ *
+ * \param[in,out] lod_comp	layout_component to be filled
+ * \param[in,out] v3		LOV EA V3 user data
+ *
+ * \retval		0 on success
+ * \retval		-ENOMEM if the OST list array cannot be allocated
+ */
+int lod_comp_copy_ost_lists(struct lod_layout_component *lod_comp,
+			    struct lov_user_md_v3 *v3)
+{
+	size_t size = v3->lmm_stripe_count * sizeof(__u32);
+	int j;
+
+	ENTRY;
+
+	if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT)
+		v3->lmm_stripe_offset = v3->lmm_objects[0].l_ost_idx;
+
+	if (lod_comp->llc_ostlist.op_array) {
+		/* reuse the current array when it is big enough */
+		if (lod_comp->llc_ostlist.op_size >= size)
+			goto copy;
+
+		OBD_FREE(lod_comp->llc_ostlist.op_array,
+			 lod_comp->llc_ostlist.op_size);
+	}
+
+	OBD_ALLOC(lod_comp->llc_ostlist.op_array, size);
+	if (!lod_comp->llc_ostlist.op_array) {
+		/* do not leave a count/size behind without an array */
+		lod_comp->llc_ostlist.op_count = 0;
+		lod_comp->llc_ostlist.op_size = 0;
+		RETURN(-ENOMEM);
+	}
+	lod_comp->llc_ostlist.op_size = size;
+copy:
+	/* copy ost list from lmm */
+	lod_comp->llc_ostlist.op_count = v3->lmm_stripe_count;
+	for (j = 0; j < v3->lmm_stripe_count; j++) {
+		lod_comp->llc_ostlist.op_array[j] =
+			v3->lmm_objects[j].l_ost_idx;
+	}
+
+	RETURN(0);
+}
+
/**
* Get default striping.
__u16 comp_cnt;
__u16 mirror_cnt;
bool composite;
- int rc, i;
+ int rc, i, j;
ENTRY;
lds->lds_def_striping_set = 0;
} else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
v3 = (struct lov_user_md_v3 *)v1;
lustre_swab_lov_user_md_v3(v3);
+ } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) {
+ v3 = (struct lov_user_md_v3 *)v1;
+ lustre_swab_lov_user_md_v3(v3);
+ lustre_swab_lov_user_md_objects(v3->lmm_objects,
+ v3->lmm_stripe_count);
} else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1)) {
comp_v1 = (struct lov_comp_md_v1 *)v1;
lustre_swab_lov_comp_md_v1(comp_v1);
}
if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 &&
- v1->lmm_magic != LOV_MAGIC_COMP_V1)
+ v1->lmm_magic != LOV_MAGIC_COMP_V1 &&
+ v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC)
RETURN(-ENOTSUPP);
if (v1->lmm_magic == LOV_MAGIC_COMP_V1) {
for (i = 0; i < comp_cnt; i++) {
struct lod_layout_component *lod_comp;
- struct lu_extent *ext;
char *pool;
lod_comp = &lds->lds_def_comp_entries[i];
if (composite) {
v1 = (struct lov_user_md *)((char *)comp_v1 +
comp_v1->lcm_entries[i].lcme_offset);
- ext = &comp_v1->lcm_entries[i].lcme_extent;
- lod_comp->llc_extent = *ext;
+ lod_comp->llc_extent =
+ comp_v1->lcm_entries[i].lcme_extent;
+ /* We only inherit certain flags from the layout */
+ lod_comp->llc_flags =
+ comp_v1->lcm_entries[i].lcme_flags &
+ LCME_TEMPLATE_FLAGS;
}
if (v1->lmm_pattern != LOV_PATTERN_RAID0 &&
pool = v3->lmm_pool_name;
}
lod_set_def_pool(lds, i, pool);
+ if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
+ v3 = (struct lov_user_md_v3 *)v1;
+ rc = lod_comp_copy_ost_lists(lod_comp, v3);
+ if (rc)
+ RETURN(rc);
+ } else if (lod_comp->llc_ostlist.op_array &&
+ lod_comp->llc_ostlist.op_count) {
+ for (j = 0; j < lod_comp->llc_ostlist.op_count; j++)
+ lod_comp->llc_ostlist.op_array[j] = -1;
+ lod_comp->llc_ostlist.op_count = 0;
+ }
}
lds->lds_def_striping_set = 1;
return;
lo->ldo_is_composite = lds->lds_def_striping_is_composite;
+ if (lds->lds_def_mirror_cnt > 1)
+ lo->ldo_flr_state = LCM_FL_RDONLY;
for (i = 0; i < lo->ldo_comp_cnt; i++) {
struct lod_layout_component *obj_comp =
struct lod_layout_component *def_comp =
&lds->lds_def_comp_entries[i];
- CDEBUG(D_LAYOUT, "Inherite from default: size:%hu "
- "nr:%u offset:%u pattern %#x %s\n",
+ CDEBUG(D_LAYOUT, "Inherit from default: flags=%#x "
+ "size=%hu nr=%u offset=%u pattern=%#x pool=%s\n",
+ def_comp->llc_flags,
def_comp->llc_stripe_size,
def_comp->llc_stripe_count,
def_comp->llc_stripe_offset,
lod_obj_set_pool(lo, i, def_comp->llc_pool);
}
+ /* copy ost list */
+ if (def_comp->llc_ostlist.op_array &&
+ def_comp->llc_ostlist.op_count) {
+ OBD_ALLOC(obj_comp->llc_ostlist.op_array,
+ obj_comp->llc_ostlist.op_size);
+ if (!obj_comp->llc_ostlist.op_array)
+ return;
+ memcpy(obj_comp->llc_ostlist.op_array,
+ def_comp->llc_ostlist.op_array,
+ obj_comp->llc_ostlist.op_size);
+ } else if (def_comp->llc_ostlist.op_array) {
+ obj_comp->llc_ostlist.op_array = NULL;
+ }
+
/*
* Don't initialize these fields for plain layout
* (v1/v3) here, they are inherited in the order of
if (!lo->ldo_is_composite)
continue;
- if (obj_comp->llc_stripe_count <= 0 &&
- obj_comp->llc_pattern != LOV_PATTERN_MDT)
- obj_comp->llc_stripe_count =
- desc->ld_default_stripe_count;
- if (obj_comp->llc_stripe_size <= 0)
- obj_comp->llc_stripe_size =
- desc->ld_default_stripe_size;
+ lod_adjust_stripe_info(obj_comp, desc);
}
} else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) {
if (lo->ldo_dir_stripe_count == 0)
nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
if (S_ISDIR(child_mode)) {
+ const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
+
/* other default values are 0 */
lc->ldo_dir_stripe_offset = -1;
lc->ldo_def_striping = lds;
/* It should always honour the specified stripes */
+		/* Note: an old client (< 2.7) might also do lfs mkdir, whose
+		 * EA will have the old magic. In this case, we should ignore
+		 * the stripe count and try to create the dir by default
+		 * stripe.
+		 */
if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 &&
- lod_verify_md_striping(d, ah->dah_eadata) == 0) {
- const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
-
+ (le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC ||
+ le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC)) {
lc->ldo_dir_stripe_count =
le32_to_cpu(lum1->lum_stripe_count);
lc->ldo_dir_stripe_offset =
} else {
/* transfer defaults LMV to new directory */
lod_striping_from_default(lc, lds, child_mode);
+
+ /* set count 0 to create normal directory */
+ if (lc->ldo_dir_stripe_count == 1)
+ lc->ldo_dir_stripe_count = 0;
}
/* shrink the stripe_count to the avaible MDT count */
if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 &&
- !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
+ !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) {
lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1;
-
- /* Directory will be striped only if stripe_count > 1, if
- * stripe_count == 1, let's reset stripe_count = 0 to avoid
- * create single master stripe and also help to unify the
- * stripe handling of directories and files */
- if (lc->ldo_dir_stripe_count == 1)
- lc->ldo_dir_stripe_count = 0;
+ if (lc->ldo_dir_stripe_count == 1)
+ lc->ldo_dir_stripe_count = 0;
+ }
CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n",
lc->ldo_dir_stripe_count,
LASSERT(!lc->ldo_is_composite);
lod_comp = &lc->ldo_comp_entries[0];
desc = &d->lod_desc;
- if (lod_comp->llc_stripe_count <= 0)
- lod_comp->llc_stripe_count =
- desc->ld_default_stripe_count;
- if (lod_comp->llc_stripe_size <= 0)
- lod_comp->llc_stripe_size =
- desc->ld_default_stripe_size;
+ lod_adjust_stripe_info(lod_comp, desc);
}
EXIT;
/* failed to create striping or to set initial size, let's reset
* config so that others don't get confused */
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
NULL, th);
} else if (dof->dof_type == DFT_DIR) {
struct seq_server_site *ss;
+ struct lu_buf buf = { NULL };
+ struct lu_buf *lmu = NULL;
ss = lu_site2seq(dt->do_lu.lo_dev->ld_site);
else
GOTO(out, rc = -EINVAL);
}
+ } else if (hint && hint->dah_eadata) {
+ lmu = &buf;
+ lmu->lb_buf = (void *)hint->dah_eadata;
+ lmu->lb_len = hint->dah_eadata_len;
}
- rc = lod_declare_dir_striping_create(env, dt, attr, dof, th);
+ rc = lod_declare_dir_striping_create(env, dt, attr, lmu, dof,
+ th);
}
out:
/* failed to create striping or to set initial size, let's reset
* config so that others don't get confused */
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL);
- mirror_id = lo->ldo_mirror_count > 1 ? 1 : 0;
+ mirror_id = 0; /* non-flr file's mirror_id is 0 */
+ if (lo->ldo_mirror_count > 1) {
+ for (i = 0; i < lo->ldo_comp_cnt; i++) {
+ lod_comp = &lo->ldo_comp_entries[i];
+ if (lod_comp->llc_id != LCME_ID_INVAL &&
+ mirror_id_of(lod_comp->llc_id) > mirror_id)
+ mirror_id = mirror_id_of(lod_comp->llc_id);
+ }
+ }
/* create all underlying objects */
for (i = 0; i < lo->ldo_comp_cnt; i++) {
lod_comp = &lo->ldo_comp_entries[i];
- if (lod_comp->llc_extent.e_start == 0 && i > 0) /* new mirror */
- ++mirror_id;
-
if (lod_comp->llc_id == LCME_ID_INVAL) {
+ /* only the component of FLR layout with more than 1
+ * mirror has mirror ID in its component ID.
+ */
+ if (lod_comp->llc_extent.e_start == 0 &&
+ lo->ldo_mirror_count > 1)
+ ++mirror_id;
+
lod_comp->llc_id = lod_gen_component_id(lo,
mirror_id, i);
if (lod_comp->llc_id == LCME_ID_INVAL)
RETURN(0);
out:
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
+/**
+ * Check whether the first layout component of an object uses the MDT
+ * pattern.
+ *
+ * \param[in] dt	object to check
+ *
+ * \retval	true if the child object exists, \a dt is not a directory,
+ *		it has at least one component and the pattern of component 0
+ *		is LOV_PATTERN_MDT
+ * \retval	false otherwise
+ */
+static inline bool lod_obj_is_dom(struct dt_object *dt)
+{
+	struct lod_object *obj = lod_dt_obj(dt);
+
+	/* needs an existing, non-directory object with a layout */
+	if (!dt_object_exists(dt_object_child(dt)) ||
+	    S_ISDIR(dt->do_lu.lo_header->loh_attr) ||
+	    obj->ldo_comp_cnt == 0)
+		return false;
+
+	return lov_pattern(obj->ldo_comp_entries[0].llc_pattern) ==
+	       LOV_PATTERN_MDT;
+}
+
/**
* Implementation of dt_object_operations::do_create.
*
RETURN(rc);
if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
- lod_obj_is_striped(dt) && dof->u.dof_reg.striped != 0) {
+ (lod_obj_is_striped(dt) || lod_obj_is_dom(dt)) &&
+ dof->u.dof_reg.striped != 0) {
LASSERT(lod_dt_obj(dt)->ldo_comp_cached == 0);
rc = lod_striped_create(env, dt, attr, dof, th);
}
* is being initialized as we don't need this information till
* few specific cases like destroy, chown
*/
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
RETURN(rc);
}
/**
- * Release LDLM locks on the stripes of a striped directory.
- *
- * Iterates over all the locks taken on the stripe objects and
- * cancel them.
- *
- * \param[in] env execution environment
- * \param[in] dt striped object
- * \param[in] einfo lock description
- * \param[in] policy data describing requested lock
- *
- * \retval 0 on success
- * \retval negative if failed
- */
-static int lod_object_unlock_internal(const struct lu_env *env,
- struct dt_object *dt,
- struct ldlm_enqueue_info *einfo,
- union ldlm_policy_data *policy)
-{
- struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
- int rc = 0;
- int i;
- ENTRY;
-
- if (slave_locks == NULL)
- RETURN(0);
-
- for (i = 1; i < slave_locks->count; i++) {
- if (lustre_handle_is_used(&slave_locks->handles[i]))
- ldlm_lock_decref_and_cancel(&slave_locks->handles[i],
- einfo->ei_mode);
- }
-
- RETURN(rc);
-}
-
-/**
* Implementation of dt_object_operations::do_object_unlock.
*
* Used to release LDLM lock(s).
RETURN(0);
LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
- LASSERT(lo->ldo_dir_stripe_count > 1);
/* Note: for remote lock for single stripe dir, MDT will cancel
* the lock by lockh directly */
LASSERT(!dt_object_remote(dt_object_child(dt)));
/* locks were unlocked in MDT layer */
- for (i = 1; i < slave_locks->count; i++) {
- LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
+ for (i = 0; i < slave_locks->ha_count; i++)
+ LASSERT(!lustre_handle_is_used(&slave_locks->ha_handles[i]));
+
+ /*
+ * NB, ha_count may not equal to ldo_dir_stripe_count, because dir
+ * layout may change, e.g., shrink dir layout after migration.
+ */
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++)
dt_invalidate(env, lo->ldo_stripe[i]);
- }
- slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
- sizeof(slave_locks->handles[0]);
+ slave_locks_size = offsetof(typeof(*slave_locks),
+ ha_handles[slave_locks->ha_count]);
OBD_FREE(slave_locks, slave_locks_size);
einfo->ei_cbdata = NULL;
struct ldlm_enqueue_info *einfo,
union ldlm_policy_data *policy)
{
- struct lod_object *lo = lod_dt_obj(dt);
- int rc = 0;
- int i;
- int slave_locks_size;
+ struct lod_object *lo = lod_dt_obj(dt);
+ int slave_locks_size;
struct lustre_handle_array *slave_locks = NULL;
+ int i;
+ int rc;
ENTRY;
/* remote object lock */
}
if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
- GOTO(out, rc = -ENOTDIR);
+ RETURN(-ENOTDIR);
- rc = lod_load_striping(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc != 0)
- GOTO(out, rc);
+ RETURN(rc);
/* No stripes */
- if (lo->ldo_dir_stripe_count <= 1) {
- /*
- * NB, ei_cbdata stores pointer to slave locks, if no locks
- * taken, make sure it's set to NULL, otherwise MDT will try to
- * unlock them.
- */
- einfo->ei_cbdata = NULL;
- GOTO(out, rc = 0);
- }
+ if (lo->ldo_dir_stripe_count <= 1)
+ RETURN(0);
- slave_locks_size = sizeof(*slave_locks) + lo->ldo_dir_stripe_count *
- sizeof(slave_locks->handles[0]);
+ slave_locks_size = offsetof(typeof(*slave_locks),
+ ha_handles[lo->ldo_dir_stripe_count]);
/* Freed in lod_object_unlock */
OBD_ALLOC(slave_locks, slave_locks_size);
- if (slave_locks == NULL)
- GOTO(out, rc = -ENOMEM);
- slave_locks->count = lo->ldo_dir_stripe_count;
+ if (!slave_locks)
+ RETURN(-ENOMEM);
+ slave_locks->ha_count = lo->ldo_dir_stripe_count;
/* striped directory lock */
- for (i = 1; i < lo->ldo_dir_stripe_count; i++) {
- struct lustre_handle lockh;
- struct ldlm_res_id *res_id;
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+ struct lustre_handle lockh;
+ struct ldlm_res_id *res_id;
res_id = &lod_env_info(env)->lti_res_id;
fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
einfo->ei_res_id = res_id;
LASSERT(lo->ldo_stripe[i] != NULL);
- if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+ if (dt_object_remote(lo->ldo_stripe[i])) {
+ set_bit(i, (void *)slave_locks->ha_map);
rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
einfo, policy);
} else {
struct ldlm_namespace *ns = einfo->ei_namespace;
ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
ldlm_completion_callback completion = einfo->ei_cb_cp;
- __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB;
if (einfo->ei_mode == LCK_PW ||
einfo->ei_mode == LCK_EX)
dlmflags |= LDLM_FL_COS_INCOMPAT;
- /* This only happens if there are mulitple stripes
- * on the master MDT, i.e. except stripe0, there are
- * other stripes on the Master MDT as well, Only
- * happens in the test case right now. */
LASSERT(ns != NULL);
- rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
+ rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS,
policy, einfo->ei_mode,
&dlmflags, blocking,
completion, NULL,
NULL, 0, LVB_T_NONE,
NULL, &lockh);
}
- if (rc != 0)
- break;
- slave_locks->handles[i] = lockh;
+ if (rc) {
+ while (i--)
+ ldlm_lock_decref_and_cancel(
+ &slave_locks->ha_handles[i],
+ einfo->ei_mode);
+ OBD_FREE(slave_locks, slave_locks_size);
+ RETURN(rc);
+ }
+ slave_locks->ha_handles[i] = lockh;
}
einfo->ei_cbdata = slave_locks;
- if (rc != 0 && slave_locks != NULL) {
- lod_object_unlock_internal(env, dt, einfo, policy);
- OBD_FREE(slave_locks, slave_locks_size);
- }
- EXIT;
-out:
- if (rc != 0)
- einfo->ei_cbdata = NULL;
- RETURN(rc);
+ RETURN(0);
}
/**
struct lod_object *lo, struct thandle *th)
{
struct lod_thread_info *info = lod_env_info(env);
- struct ost_pool *inuse = &info->lti_inuse_osts;
int i;
int rc = 0;
ENTRY;
LASSERT(info->lti_count < lo->ldo_comp_cnt);
- if (info->lti_count > 0) {
- /* Prepare inuse array for composite file */
- rc = lod_prepare_inuse(env, lo);
- if (rc)
- RETURN(rc);
- }
for (i = 0; i < info->lti_count; i++) {
rc = lod_qos_prep_create(env, lo, NULL, th,
- info->lti_comp_idx[i], inuse);
+ info->lti_comp_idx[i]);
if (rc)
break;
}
int i, rc;
ENTRY;
- LASSERT(lo->ldo_flr_state == LCM_FL_NOT_FLR);
+ LASSERT(lo->ldo_flr_state == LCM_FL_NONE);
/*
* In case the client is passing lovea, which only happens during
GOTO(out, rc = -EINVAL);
}
- lod_object_free_striping(env, lo);
rc = lod_use_defined_striping(env, lo, buf);
if (rc)
GOTO(out, rc);
+ lo->ldo_comp_cached = 1;
rc = lod_get_lov_ea(env, lo);
if (rc <= 0)
replay = true;
} else {
/* non replay path */
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(out, rc);
}
- if (layout->li_opc == LAYOUT_INTENT_TRUNC) {
- /**
- * trunc transfers [size, eof) in the intent extent, while
- * we'd instantiated components covers [0, size).
- */
- layout->li_extent.e_end = layout->li_extent.e_start;
- layout->li_extent.e_start = 0;
- }
-
/* Make sure defined layout covers the requested write range. */
lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1];
if (lo->ldo_comp_cnt > 1 &&
rc = lod_declare_instantiate_components(env, lo, th);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
-#define lod_foreach_mirror_comp(comp, lo, mirror_idx) \
-for (comp = &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_start]; \
- comp <= &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_end]; \
- comp++)
-
static inline int lod_comp_index(struct lod_object *lo,
struct lod_layout_component *lod_comp)
{
}
}
-static int lod_declare_update_rdonly(const struct lu_env *env,
- struct lod_object *lo, struct md_layout_change *mlc,
- struct thandle *th)
+/**
+ * Check the availability of the OST that holds a stripe object.
+ *
+ * The OST index is looked up from the FID of \a dt with lod_fld_lookup().
+ * The OST is reported as unavailable when its ltd_statfs.os_state has any
+ * of OS_STATE_READONLY, OS_STATE_ENOSPC, OS_STATE_ENOINO or
+ * OS_STATE_NOPRECREATE set, or when its ltd_active is 0.
+ *
+ * \param[in] env	execution environment
+ * \param[in] lo	lod object
+ * \param[in] dt	dt object
+ * \param[in] index	mirror index, only used in the debug message
+ *
+ * \retval	negative if the FID lookup failed
+ * \retval	1 if \a dt is available
+ * \retval	0 if \a dt is not available
+ */
+static inline int lod_check_ost_avail(const struct lu_env *env,
+				      struct lod_object *lo,
+				      struct dt_object *dt, int index)
{
-	struct lod_thread_info *info = lod_env_info(env);
-	struct lu_attr *layout_attr = &info->lti_layout_attr;
-	struct lod_layout_component *lod_comp;
-	struct layout_intent *layout = mlc->mlc_intent;
-	struct lu_extent extent = layout->li_extent;
-	unsigned int seq = 0;
-	int picked;
-	int i;
+	struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+	struct lod_tgt_desc *ost;
+	__u32 idx;
+	int type = LU_SEQ_RANGE_OST;
	int rc;
-	ENTRY;
-	LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE);
-	LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY);
-	LASSERT(lo->ldo_mirror_count > 0);
+	rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &idx, &type);
+	if (rc < 0) {
+		CERROR("%s: can't locate "DFID":rc = %d\n",
+		       lod2obd(lod)->obd_name, PFID(lu_object_fid(&dt->do_lu)),
+		       rc);
+		return rc;
+	}
-	CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n",
-	       PFID(lod_object_fid(lo)), PEXT(&extent));
+	/*
+	 * NOTE(review): the OST_TGT() result is dereferenced without a NULL
+	 * check; confirm it cannot be NULL for an index returned by the
+	 * lookup above. The rc printed below is the non-negative lookup
+	 * result, not an error code.
+	 */
+	ost = OST_TGT(lod, idx);
+	if (ost->ltd_statfs.os_state &
+		(OS_STATE_READONLY | OS_STATE_ENOSPC | OS_STATE_ENOINO |
+		 OS_STATE_NOPRECREATE) ||
+	    ost->ltd_active == 0) {
+		CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n",
+		       PFID(lod_object_fid(lo)), index, idx, rc);
+		return 0;
+	}
+
+	return 1;
+}
+
+/**
+ * Pick primary mirror for write
+ *
+ * Mirrors are visited starting at index 0, or at a random index when the
+ * OBD_FAIL_FLR_RANDOM_PICK_MIRROR fail check fires. Stale mirrors are
+ * skipped. The choice, in order of preference, is:
+ *  1. a mirror flagged lme_primary whose instantiated stripes all pass
+ *     lod_check_ost_avail() (the scan stops as soon as one is found);
+ *  2. a non-primary mirror whose instantiated stripes all pass it;
+ *  3. the first non-stale mirror flagged lme_primary (second_pick);
+ *  4. the first non-stale mirror seen before any lme_primary one
+ *     (third_pick).
+ *
+ * \param[in] env	execution environment
+ * \param[in] lo	object
+ * \param[in] extent	write range (not referenced by this function)
+ *
+ * \retval	index into lo->ldo_mirrors of the picked mirror (>= 0)
+ * \retval	-ENODATA if there is no non-stale mirror
+ * \retval	negative error returned by lod_check_ost_avail()
+ */
+static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo,
+			    struct lu_extent *extent)
+{
+	struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+	unsigned int seq = 0;
+	struct lod_layout_component *lod_comp;
+	int i, j, rc;
+	int picked = -1, second_pick = -1, third_pick = -1;
+	ENTRY;
	if (OBD_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) {
		get_random_bytes(&seq, sizeof(seq));
	}
	/**
-	 * Pick a mirror as the primary.
-	 * Now it only picks the first mirror, this algo can be
-	 * revised later after knowing the topology of cluster or
-	 * the availability of OSTs.
+	 * Pick a mirror as the primary, and check the availability of OSTs.
+	 *
+	 * This algo can be revised later after knowing the topology of
+	 * cluster.
	 */
-	for (picked = -1, i = 0; i < lo->ldo_mirror_count; i++) {
+	lod_qos_statfs_update(env, lod);
+	for (i = 0; i < lo->ldo_mirror_count; i++) {
+		bool ost_avail = true;
		int index = (i + seq) % lo->ldo_mirror_count;
-		if (!lo->ldo_mirrors[index].lme_stale) {
-			picked = index;
-			break;
+		if (lo->ldo_mirrors[index].lme_stale) {
+			CDEBUG(D_LAYOUT, DFID": mirror %d stale\n",
+			       PFID(lod_object_fid(lo)), index);
+			continue;
		}
-	}
-	if (picked < 0) /* failed to pick a primary */
-		RETURN(-ENODATA);
-	CDEBUG(D_LAYOUT, DFID": picked mirror %u as primary\n",
-	       PFID(lod_object_fid(lo)), lo->ldo_mirrors[picked].lme_id);
+		/* 2nd pick is for the primary mirror containing unavail OST */
+		if (lo->ldo_mirrors[index].lme_primary && second_pick < 0)
+			second_pick = index;
-	/* stale overlapping components from other mirrors */
-	lod_stale_components(lo, picked, &extent);
+		/* 3rd pick is for non-primary mirror containing unavail OST */
+		if (second_pick < 0 && third_pick < 0)
+			third_pick = index;
-	/* instantiate components for the picked mirror, start from 0 */
-	if (layout->li_opc == LAYOUT_INTENT_TRUNC) {
		/**
-		 * trunc transfers [size, eof) in the intent extent, we'd
-		 * stale components overlapping [size, eof), while we'd
-		 * instantiated components covers [0, size).
+		 * we already found a non-primary 1st pick, so from now on we
+		 * only look for a potential primary mirror.
		 */
-		extent.e_end = extent.e_start;
-	}
-	extent.e_start = 0;
+		if (picked >= 0 && !lo->ldo_mirrors[index].lme_primary)
+			continue;
-	lod_foreach_mirror_comp(lod_comp, lo, picked) {
-		if (!lu_extent_is_overlapped(&extent,
-					     &lod_comp->llc_extent))
+		/* check the availability of OSTs */
+		lod_foreach_mirror_comp(lod_comp, lo, index) {
+			if (!lod_comp_inited(lod_comp) || !lod_comp->llc_stripe)
+				continue;
+
+			for (j = 0; j < lod_comp->llc_stripe_count; j++) {
+				struct dt_object *dt = lod_comp->llc_stripe[j];
+
+				rc = lod_check_ost_avail(env, lo, dt, index);
+				if (rc < 0)
+					RETURN(rc);
+
+				ost_avail = !!rc;
+				if (!ost_avail)
+					break;
+			} /* for all dt object in one component */
+			if (!ost_avail)
+				break;
+		} /* for all components in a mirror */
+
+		/**
+		 * skip this mirror unless the OSTs holding the objects
+		 * allocated for its components are all available.
+		 */
+		if (!ost_avail)
+			continue;
+
+		/* this mirror has all OSTs available */
+		picked = index;
+
+		/**
+		 * a primary mirror with all OSTs available is the perfect
+		 * 1st pick.
+		 */
+		if (lo->ldo_mirrors[index].lme_primary)
			break;
+	} /* for all mirrors */
+
+	/* failed to pick a sound mirror, lower our expectation */
+	if (picked < 0)
+		picked = second_pick;
+	if (picked < 0)
+		picked = third_pick;
+	if (picked < 0)
+		RETURN(-ENODATA);
-		if (lod_comp_inited(lod_comp))
+	RETURN(picked);
+}
+
+/**
+ * Queue the uninitialized components of one mirror for instantiation.
+ *
+ * For each mirror whose lme_id equals \a mirror_id, the index of every
+ * component that is not yet initialized is appended to the per-thread
+ * lti_comp_idx array, advancing lti_count.
+ *
+ * \param[in] env	execution environment
+ * \param[in] lo	lod object
+ * \param[in] mirror_id	ID of the mirror to prepare
+ *
+ * \retval	0 always, even when no mirror matches \a mirror_id
+ */
+static int lod_prepare_resync_mirror(const struct lu_env *env,
+				     struct lod_object *lo,
+				     __u16 mirror_id)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lod_layout_component *lod_comp;
+	int i;
+
+	for (i = 0; i < lo->ldo_mirror_count; i++) {
+		if (lo->ldo_mirrors[i].lme_id != mirror_id)
			continue;
-		CDEBUG(D_LAYOUT, "instantiate: %u / %u\n",
-		       i, lod_comp_index(lo, lod_comp));
+		lod_foreach_mirror_comp(lod_comp, lo, i) {
+			if (lod_comp_inited(lod_comp))
+				continue;
-		info->lti_comp_idx[info->lti_count++] =
-			lod_comp_index(lo, lod_comp);
+			info->lti_comp_idx[info->lti_count++] =
+				lod_comp_index(lo, lod_comp);
+		}
+	}
+
+	return 0;
+}
+
+/**
+ * Figure out the components that should be instantiated for resync.
+ *
+ * For every stale mirror, its components are walked in order for as long
+ * as they overlap \a extent. Each such component counts as needing sync,
+ * and the index of each one that is not yet initialized is appended to the
+ * per-thread lti_comp_idx array, advancing lti_count.
+ *
+ * \param[in] env	execution environment
+ * \param[in] lo	lod object
+ * \param[in] extent	range to be resynced
+ *
+ * \retval	0 if at least one component of a stale mirror overlaps
+ *		\a extent
+ * \retval	-EALREADY if there is nothing to sync
+ */
+static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo,
+			      struct lu_extent *extent)
+{
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lod_layout_component *lod_comp;
+	unsigned int need_sync = 0;
+	int i;
+
+	CDEBUG(D_LAYOUT,
+	       DFID": instantiate all stale components in "DEXT"\n",
+	       PFID(lod_object_fid(lo)), PEXT(extent));
+
+	/**
+	 * instantiate all components of the stale mirrors within this
+	 * extent; the stale flag of the individual component is not
+	 * checked, so even non-stale components are included.
+	 */
+	for (i = 0; i < lo->ldo_mirror_count; i++) {
+		if (!lo->ldo_mirrors[i].lme_stale)
+			continue;
+
+		lod_foreach_mirror_comp(lod_comp, lo, i) {
+			if (!lu_extent_is_overlapped(extent,
+						&lod_comp->llc_extent))
+				break;
+
+			need_sync++;
+
+			if (lod_comp_inited(lod_comp))
+				continue;
+
+			CDEBUG(D_LAYOUT, "resync instantiate %d / %d\n",
+			       i, lod_comp_index(lo, lod_comp));
+			info->lti_comp_idx[info->lti_count++] =
+				lod_comp_index(lo, lod_comp);
+		}
	}
-	lo->ldo_flr_state = LCM_FL_WRITE_PENDING;
+	return need_sync ? 0 : -EALREADY;
+}
+
+static int lod_declare_update_rdonly(const struct lu_env *env,
+ struct lod_object *lo, struct md_layout_change *mlc,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_attr *layout_attr = &info->lti_layout_attr;
+ struct lod_layout_component *lod_comp;
+ struct lu_extent extent = { 0 };
+ int rc;
+ ENTRY;
+
+ LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY);
+ LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE ||
+ mlc->mlc_opc == MD_LAYOUT_RESYNC);
+ LASSERT(lo->ldo_mirror_count > 0);
+
+ if (mlc->mlc_opc == MD_LAYOUT_WRITE) {
+ struct layout_intent *layout = mlc->mlc_intent;
+ int picked;
+
+ extent = layout->li_extent;
+ CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n",
+ PFID(lod_object_fid(lo)), PEXT(&extent));
+
+ picked = lod_primary_pick(env, lo, &extent);
+ if (picked < 0)
+ RETURN(picked);
+
+ CDEBUG(D_LAYOUT, DFID": picked mirror id %u as primary\n",
+ PFID(lod_object_fid(lo)),
+ lo->ldo_mirrors[picked].lme_id);
+
+ if (layout->li_opc == LAYOUT_INTENT_TRUNC) {
+ /**
+ * trunc transfers [0, size) in the intent extent, we'd
+ * stale components overlapping [size, eof).
+ */
+ extent.e_start = extent.e_end;
+ extent.e_end = OBD_OBJECT_EOF;
+ }
+
+ /* stale overlapping components from other mirrors */
+ lod_stale_components(lo, picked, &extent);
+
+ /* restore truncate intent extent */
+ if (layout->li_opc == LAYOUT_INTENT_TRUNC)
+ extent.e_end = extent.e_start;
+
+ /* instantiate components for the picked mirror, start from 0 */
+ extent.e_start = 0;
+
+ lod_foreach_mirror_comp(lod_comp, lo, picked) {
+ if (!lu_extent_is_overlapped(&extent,
+ &lod_comp->llc_extent))
+ break;
+
+ if (lod_comp_inited(lod_comp))
+ continue;
+
+ info->lti_comp_idx[info->lti_count++] =
+ lod_comp_index(lo, lod_comp);
+ }
+
+ lo->ldo_flr_state = LCM_FL_WRITE_PENDING;
+ } else { /* MD_LAYOUT_RESYNC */
+ int i;
+
+		/**
+		 * The file could contain multiple non-stale mirrors, so we
+		 * need to prepare all uninitialized components, assuming any
+		 * non-stale mirror could be picked as the primary mirror.
+		 */
+ if (mlc->mlc_mirror_id == 0) {
+ /* normal resync */
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ if (lo->ldo_mirrors[i].lme_stale)
+ continue;
+
+ lod_foreach_mirror_comp(lod_comp, lo, i) {
+ if (!lod_comp_inited(lod_comp))
+ break;
+
+ if (extent.e_end <
+ lod_comp->llc_extent.e_end)
+ extent.e_end =
+ lod_comp->llc_extent.e_end;
+ }
+ }
+ rc = lod_prepare_resync(env, lo, &extent);
+ if (rc)
+ GOTO(out, rc);
+ } else {
+ /* mirror write, try to init all its components */
+ rc = lod_prepare_resync_mirror(env, lo,
+ mlc->mlc_mirror_id);
+ if (rc)
+ GOTO(out, rc);
+ }
+
+ /* change the file state to SYNC_PENDING */
+ lo->ldo_flr_state = LCM_FL_SYNC_PENDING;
+ }
/* Reset the layout version once it's becoming too large.
* This way it can make sure that the layout version is
layout_attr->la_valid = LA_LAYOUT_VERSION;
layout_attr->la_layout_version = 0; /* set current version */
+ if (mlc->mlc_opc == MD_LAYOUT_RESYNC)
+ layout_attr->la_layout_version = LU_LAYOUT_RESYNC;
rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
if (rc)
GOTO(out, rc);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n",
PFID(lod_object_fid(lo)), PEXT(&extent));
+ if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) {
+ /**
+ * truncate passes [0, size) in the intent extent, so we
+ * mark stale the components overlapping [size, eof).
+ */
+ extent.e_start = extent.e_end;
+ extent.e_end = OBD_OBJECT_EOF;
+ }
/* 1. stale overlapping components */
lod_stale_components(lo, primary, &extent);
/* 2. find out the components need instantiating.
* instantiate [0, mlc->mlc_intent->e_end) */
- if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) {
- /**
- * trunc transfers [size, eof) in the intent extent,
- * we'd stale components overlapping [size, eof),
- * while we'd instantiated components covers [0, size).
- */
+
+ /* restore truncate intent extent */
+ if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC)
extent.e_end = extent.e_start;
- }
extent.e_start = 0;
lod_foreach_mirror_comp(lod_comp, lo, primary) {
lod_comp_index(lo, lod_comp);
}
} else { /* MD_LAYOUT_RESYNC */
- /* figure out the components that have been instantiated in
- * in primary to decide what components should be instantiated
- * in stale mirrors */
- lod_foreach_mirror_comp(lod_comp, lo, primary) {
- if (!lod_comp_inited(lod_comp))
- break;
-
- extent.e_end = lod_comp->llc_extent.e_end;
- }
-
- CDEBUG(D_LAYOUT,
- DFID": instantiate all stale components in "DEXT"\n",
- PFID(lod_object_fid(lo)), PEXT(&extent));
-
- /* 1. instantiate all components within this extent, even
- * non-stale components so that it won't need to instantiate
- * those components for mirror truncate later. */
- for (i = 0; i < lo->ldo_mirror_count; i++) {
- if (primary == i)
- continue;
-
- LASSERTF(lo->ldo_mirrors[i].lme_stale,
- "both %d and %d are primary\n", i, primary);
-
- lod_foreach_mirror_comp(lod_comp, lo, i) {
- if (!lu_extent_is_overlapped(&extent,
- &lod_comp->llc_extent))
+ if (mlc->mlc_mirror_id == 0) {
+ /* normal resync */
+ lod_foreach_mirror_comp(lod_comp, lo, primary) {
+ if (!lod_comp_inited(lod_comp))
break;
- if (lod_comp_inited(lod_comp))
- continue;
-
- CDEBUG(D_LAYOUT, "resync instantiate %d / %d\n",
- i, lod_comp_index(lo, lod_comp));
-
- info->lti_comp_idx[info->lti_count++] =
- lod_comp_index(lo, lod_comp);
+ extent.e_end = lod_comp->llc_extent.e_end;
}
+
+ rc = lod_prepare_resync(env, lo, &extent);
+ if (rc)
+ GOTO(out, rc);
+ } else {
+ /* mirror write, try to init all its components */
+ rc = lod_prepare_resync_mirror(env, lo,
+ mlc->mlc_mirror_id);
+ if (rc)
+ GOTO(out, rc);
}
/* change the file state to SYNC_PENDING */
lod_obj_inc_layout_gen(lo);
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
GOTO(out, rc = -EINVAL);
}
- if (!sync_components || !resync_components) {
- CDEBUG(D_LAYOUT, DFID": no mirror in sync or resync\n",
+ if (!sync_components || (mlc->mlc_resync_count && !resync_components)) {
+ CDEBUG(D_LAYOUT, DFID": no mirror in sync\n",
PFID(lod_object_fid(lo)));
/* tend to return an error code here to prevent
out:
if (rc)
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
RETURN(rc);
}
dt_object_remote(dt_object_child(dt)))
RETURN(-EINVAL);
- lod_write_lock(env, dt, 0);
- rc = lod_load_striping_locked(env, lo);
+ rc = lod_striping_load(env, lo);
if (rc)
GOTO(out, rc);
GOTO(out, rc);
switch (lo->ldo_flr_state) {
- case LCM_FL_NOT_FLR:
+ case LCM_FL_NONE:
rc = lod_declare_update_plain(env, lo, mlc->mlc_intent,
&mlc->mlc_buf, th);
break;
break;
}
out:
- dt_write_unlock(env, dt);
RETURN(rc);
}
*/
static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t *pos,
- struct thandle *th, int iq)
+ struct thandle *th)
{
LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) ||
S_ISLNK(dt->do_lu.lo_header->loh_attr));
- return lod_sub_write(env, dt_object_child(dt), buf, pos, th, iq);
+ return lod_sub_write(env, dt_object_child(dt), buf, pos, th); /* write goes to the child object */
}
static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt,
* \param[in] env execution environment
* \param[in] lo object
*/
-void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
+void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo)
{
struct lod_layout_component *lod_comp;
int i, j;
sizeof(struct dt_object *) *
lod_comp->llc_stripes_allocated);
lod_comp->llc_stripe = NULL;
+ OBD_FREE(lod_comp->llc_ost_indices,
+ sizeof(__u32) *
+ lod_comp->llc_stripes_allocated);
+ lod_comp->llc_ost_indices = NULL;
lod_comp->llc_stripes_allocated = 0;
}
lod_free_comp_entries(lo);
}
}
+void lod_striping_free(const struct lu_env *env, struct lod_object *lo)
+{
+ mutex_lock(&lo->ldo_layout_mutex); /* locked wrapper: hold ldo_layout_mutex ... */
+ lod_striping_free_nolock(env, lo); /* ... across the _nolock free ... */
+ mutex_unlock(&lo->ldo_layout_mutex); /* ... and drop it before returning */
+}
+
/**
* Implementation of lu_object_operations::loo_object_free.
*
struct lod_object *lo = lu2lod_obj(o);
/* release all underlying object pinned */
- lod_object_free_striping(env, lo);
+ lod_striping_free(env, lo);
lu_object_fini(o);
OBD_SLAB_FREE_PTR(lo, lod_object_kmem);
}