static struct dt_it *lod_striped_it_init(const struct lu_env *env,
struct dt_object *dt, __u32 attr)
{
- struct lod_object *lo = lod_dt_obj(dt);
- struct dt_object *next;
- struct lod_it *it = &lod_env_info(env)->lti_it;
- struct dt_it *it_next;
- ENTRY;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next;
+ struct lod_it *it = &lod_env_info(env)->lti_it;
+ struct dt_it *it_next;
+ __u16 index = 0;
LASSERT(lo->ldo_dir_stripe_count > 0);
- next = lo->ldo_stripe[0];
- LASSERT(next != NULL);
+
+ do {
+ next = lo->ldo_stripe[index];
+ if (next && dt_object_exists(next))
+ break;
+ } while (++index < lo->ldo_dir_stripe_count);
+
+ /* no valid stripe */
+ if (!next || !dt_object_exists(next))
+ return ERR_PTR(-ENODEV);
+
LASSERT(next->do_index_ops != NULL);
it_next = next->do_index_ops->dio_it.init(env, next, attr);
* additional ones */
LASSERT(it->lit_obj == NULL);
- it->lit_stripe_index = 0;
+ it->lit_stripe_index = index;
it->lit_attr = attr;
it->lit_it = it_next;
it->lit_obj = dt;
LOD_CHECK_STRIPED_IT(env, it, lo);
next = lo->ldo_stripe[it->lit_stripe_index];
- LASSERT(next != NULL);
- LASSERT(next->do_index_ops != NULL);
-
- next->do_index_ops->dio_it.fini(env, it->lit_it);
+ if (next) {
+ LASSERT(next->do_index_ops != NULL);
+ next->do_index_ops->dio_it.fini(env, it->lit_it);
+ }
}
/* the iterator not in use any more */
static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
const struct dt_key *key)
{
- const struct lod_it *it = (const struct lod_it *)di;
- struct lod_object *lo = lod_dt_obj(it->lit_obj);
- struct dt_object *next;
- ENTRY;
+ const struct lod_it *it = (const struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
LOD_CHECK_STRIPED_IT(env, it, lo);
next = lo->ldo_stripe[it->lit_stripe_index];
LASSERT(next != NULL);
+ LASSERT(dt_object_exists(next));
LASSERT(next->do_index_ops != NULL);
return next->do_index_ops->dio_it.get(env, it->lit_it, key);
*/
static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
{
- struct lod_it *it = (struct lod_it *)di;
- struct lod_object *lo = lod_dt_obj(it->lit_obj);
- struct dt_object *next;
+ struct lod_it *it = (struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
+
+ /*
+ * If lit_it == NULL, then it means the sub_it has been finished,
+ * which only happens in failure cases, see lod_striped_it_next()
+ */
+ if (!it->lit_it)
+ return;
LOD_CHECK_STRIPED_IT(env, it, lo);
*/
static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
{
- struct lod_it *it = (struct lod_it *)di;
- struct lod_object *lo = lod_dt_obj(it->lit_obj);
- struct dt_object *next;
- struct dt_it *it_next;
- int rc;
+ struct lod_it *it = (struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
+ struct dt_it *it_next;
+ __u32 index;
+ int rc;
+
ENTRY;
LOD_CHECK_STRIPED_IT(env, it, lo);
next = lo->ldo_stripe[it->lit_stripe_index];
LASSERT(next != NULL);
+ LASSERT(dt_object_exists(next));
LASSERT(next->do_index_ops != NULL);
again:
rc = next->do_index_ops->dio_it.next(env, it->lit_it);
RETURN(rc);
}
- /* go to next stripe */
- if (it->lit_stripe_index + 1 >= lo->ldo_dir_stripe_count)
- RETURN(1);
-
- it->lit_stripe_index++;
-
next->do_index_ops->dio_it.put(env, it->lit_it);
next->do_index_ops->dio_it.fini(env, it->lit_it);
it->lit_it = NULL;
- next = lo->ldo_stripe[it->lit_stripe_index];
- LASSERT(next != NULL);
- rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
- if (rc != 0)
- RETURN(rc);
+ /* go to next stripe */
+ index = it->lit_stripe_index;
+ while (++index < lo->ldo_dir_stripe_count) {
+ next = lo->ldo_stripe[index];
+ if (!next)
+ continue;
- LASSERT(next->do_index_ops != NULL);
+ if (!dt_object_exists(next))
+ continue;
+
+ rc = next->do_ops->do_index_try(env, next,
+ &dt_directory_features);
+ if (rc != 0)
+ RETURN(rc);
+
+ LASSERT(next->do_index_ops != NULL);
+
+ it_next = next->do_index_ops->dio_it.init(env, next,
+ it->lit_attr);
+ if (IS_ERR(it_next))
+ RETURN(PTR_ERR(it_next));
+
+ rc = next->do_index_ops->dio_it.get(env, it_next,
+ (const struct dt_key *)"");
+ if (rc <= 0)
+ RETURN(rc == 0 ? -EIO : rc);
- it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr);
- if (!IS_ERR(it_next)) {
it->lit_it = it_next;
+ it->lit_stripe_index = index;
goto again;
- } else {
- rc = PTR_ERR(it_next);
+
}
- RETURN(rc);
+ RETURN(1);
}
/**
int i;
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- if (dt_object_exists(lo->ldo_stripe[i]) == 0)
+ if (!lo->ldo_stripe[i])
+ continue;
+ if (!dt_object_exists(lo->ldo_stripe[i]))
continue;
rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
lo->ldo_stripe[i], feat);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
if (lo->ldo_stripe[i] == NULL)
continue;
+ if (!dt_object_exists(lo->ldo_stripe[i]))
+ continue;
rc = lod_sub_declare_attr_set(env, lo->ldo_stripe[i],
attr, th);
if (rc != 0)
rc = dt_xattr_get(env, dt_object_child(dt), buf, name);
if (strcmp(name, XATTR_NAME_LMV) == 0) {
struct lmv_mds_md_v1 *lmv1;
+ struct lmv_foreign_md *lfm;
int rc1 = 0;
if (rc > (typeof(rc))sizeof(*lmv1))
RETURN(rc);
- if (rc < (typeof(rc))sizeof(*lmv1))
+ /* short (<= sizeof(struct lmv_mds_md_v1)) foreign LMV case */
+ /* XXX empty foreign LMV is not allowed */
+ if (rc <= offsetof(typeof(*lfm), lfm_value))
RETURN(rc = rc > 0 ? -EINVAL : rc);
if (buf->lb_buf == NULL || buf->lb_len == 0) {
CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key));
+ /* lti_buf is large enough for *lmv1 or a short
+ * (<= sizeof(struct lmv_mds_md_v1)) foreign LMV
+ */
info->lti_buf.lb_buf = info->lti_key;
info->lti_buf.lb_len = sizeof(*lmv1);
rc = dt_xattr_get(env, dt_object_child(dt),
&info->lti_buf, name);
+ if (unlikely(rc <= offsetof(typeof(*lfm),
+ lfm_value)))
+ RETURN(rc = rc > 0 ? -EINVAL : rc);
+
+ lfm = info->lti_buf.lb_buf;
+ if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN)
+ RETURN(rc);
+
if (unlikely(rc != sizeof(*lmv1)))
RETURN(rc = rc > 0 ? -EINVAL : rc);
le32_to_cpu(lmv1->lmv_stripe_count),
LMV_MAGIC_V1);
} else {
+ lfm = buf->lb_buf;
+ if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN)
+ RETURN(rc);
+
+ if (rc != sizeof(*lmv1))
+ RETURN(rc = rc > 0 ? -EINVAL : rc);
+
rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt),
buf, false);
}
LASSERT(mutex_is_locked(&lo->ldo_layout_mutex));
- if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
+ /* XXX may be useless as not called for foreign LMV ?? */
+ if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_FOREIGN)
RETURN(0);
if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
__u32 idx;
fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
- if (!fid_is_sane(fid))
- GOTO(out, rc = -ESTALE);
+ if (!fid_is_sane(fid)) {
+ stripe[i] = NULL;
+ continue;
+ }
rc = lod_fld_lookup(env, lod, fid, &idx, &type);
if (rc != 0)
struct linkea_data ldata = { NULL };
struct lu_buf linkea_buf;
+ /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */
+ if (!dto)
+ continue;
+
rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th);
if (rc != 0)
GOTO(out, rc);
/* The lum has been verifed in lod_verify_md_striping */
LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC ||
le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC);
- LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
- stripe_count = le32_to_cpu(lum->lum_stripe_count);
+ stripe_count = lo->ldo_dir_stripe_count;
OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
if (idx_array == NULL)
* in the above loop */
LASSERT(tgt_dt != NULL);
LASSERT(fid_is_sane(&fid));
+
+ /* fail a remote stripe FID allocation */
+ if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID))
+ continue;
+
conf.loc_flags = LOC_F_NEW;
dto = dt_locate_at(env, tgt_dt, &fid,
dt->do_lu.lo_dev->ld_site->ls_top_dev,
}
/**
+ *
+ * Alloc cached foreign LMV
+ *
+ * \param[in] lo object
+ * \param[in] size size of foreign LMV
+ *
+ * \retval 0 on success
+ * \retval negative if failed
+ */
+int lod_alloc_foreign_lmv(struct lod_object *lo, size_t size)
+{
+ OBD_ALLOC_LARGE(lo->ldo_foreign_lmv, size);
+ if (lo->ldo_foreign_lmv == NULL)
+ return -ENOMEM;
+ lo->ldo_foreign_lmv_size = size;
+ lo->ldo_dir_is_foreign = 1;
+
+ return 0;
+}
+
+/**
* Declare create striped md object.
*
* The function declares intention to create a striped directory. This is a
le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
(int)le32_to_cpu(lum->lum_stripe_offset));
- if (lo->ldo_dir_stripe_count == 0)
+ if (lo->ldo_dir_stripe_count == 0) {
+ if (lo->ldo_dir_is_foreign) {
+ rc = lod_alloc_foreign_lmv(lo, lum_buf->lb_len);
+ if (rc != 0)
+ GOTO(out, rc);
+ memcpy(lo->ldo_foreign_lmv, lum, lum_buf->lb_len);
+ lo->ldo_dir_stripe_loaded = 1;
+ }
GOTO(out, rc = 0);
+ }
/* prepare dir striped objects */
rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
fid_le_to_cpu(fid,
&lmv->lmv_stripe_fids[i]);
if (!fid_is_sane(fid))
- GOTO(out, rc = -ESTALE);
+ continue;
rc = lod_fld_lookup(env, lod, fid, &idx, &type);
if (rc)
for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
dto = lo->ldo_stripe[i];
- LASSERT(dto);
+ if (!dto)
+ continue;
if (!dt_try_as_dir(env, dto))
return -ENOTDIR;
for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
dto = lo->ldo_stripe[i];
- LASSERT(dto);
+ if (!dto)
+ continue;
rc = lod_sub_delete(env, dto,
(const struct dt_key *)dotdot, th);
RETURN(0);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- LASSERT(lo->ldo_stripe[i]);
+ if (!lo->ldo_stripe[i])
+ continue;
+
+ if (!dt_object_exists(lo->ldo_stripe[i]))
+ continue;
rc = lod_sub_declare_xattr_set(env, lo->ldo_stripe[i],
buf, name, fl, th);
return rc;
}
- filter_fid_le_to_cpu(ff, ff, sizeof(*ff));
-
/*
- * mdd_declare_migrate_create() declares this via source object because
- * target is not ready yet, so declare anyway.
+ * locd_buf is set if it's called by dir migration, which doesn't check
+ * pfid and comp id.
*/
- if (!data->locd_declare &&
- lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) &&
- ff->ff_layout.ol_comp_id == comp->llc_id)
- return 0;
+ if (data->locd_buf) {
+ memset(ff, 0, sizeof(*ff));
+ ff->ff_parent = *(struct lu_fid *)data->locd_buf->lb_buf;
+ } else {
+ filter_fid_le_to_cpu(ff, ff, sizeof(*ff));
+
+ if (lu_fid_eq(lod_object_fid(lo), &ff->ff_parent) &&
+ ff->ff_layout.ol_comp_id == comp->llc_id)
+ return 0;
+
+ memset(ff, 0, sizeof(*ff));
+ ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu);
+ }
/* rewrite filter_fid */
- memset(ff, 0, sizeof(*ff));
- ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu);
ff->ff_parent.f_ver = stripe_idx;
ff->ff_layout.ol_stripe_size = comp->llc_stripe_size;
ff->ff_layout.ol_stripe_count = comp->llc_stripe_count;
*/
static int lod_replace_parent_fid(const struct lu_env *env,
struct dt_object *dt,
+ const struct lu_buf *buf,
struct thandle *th, bool declare)
{
struct lod_object *lo = lod_dt_obj(dt);
struct lod_thread_info *info = lod_env_info(env);
- struct lu_buf *buf = &info->lti_buf;
struct filter_fid *ff;
struct lod_obj_stripe_cb_data data = { { 0 } };
int rc;
RETURN(rc);
}
- buf->lb_buf = info->lti_ea_store;
- buf->lb_len = info->lti_ea_store_size;
-
data.locd_declare = declare;
data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb;
+ data.locd_buf = buf;
rc = lod_obj_for_each_stripe(env, lo, th, &data);
RETURN(rc);
else if ((__u16)-1 == entry->llc_stripe_count)
return lod->lod_desc.ld_tgt_count;
else
- return lod_get_stripe_count(lod, lo, entry->llc_stripe_count);
+ return lod_get_stripe_count(lod, lo,
+ entry->llc_stripe_count, false);
}
static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
int magic, size = 0, i;
struct lod_layout_component *comp_entries;
__u16 comp_cnt;
- bool is_composite;
+ bool is_composite, is_foreign = false;
if (is_dir) {
comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
comp_cnt = lo->ldo_comp_cnt;
comp_entries = lo->ldo_comp_entries;
is_composite = lo->ldo_is_composite;
+ is_foreign = lo->ldo_is_foreign;
}
+ if (is_foreign)
+ return lo->ldo_foreign_lov_size;
LASSERT(comp_cnt != 0 && comp_entries != NULL);
if (is_composite) {
} else if (S_ISDIR(mode)) {
rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
} else if (strcmp(name, XATTR_NAME_FID) == 0) {
- rc = lod_replace_parent_fid(env, dt, th, true);
+ rc = lod_replace_parent_fid(env, dt, buf, th, true);
} else {
rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th);
}
RETURN(0);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- LASSERT(lo->ldo_stripe[i]);
+ if (!lo->ldo_stripe[i])
+ continue;
+
+ if (!dt_object_exists(lo->ldo_stripe[i]))
+ continue;
rc = lod_sub_xattr_set(env, lo->ldo_stripe[i], buf, name,
fl, th);
if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
le32_to_cpu(lum->lum_stripe_offset)) &&
- le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
+ le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC &&
+ le32_to_cpu(lum->lum_hash_type) == LMV_HASH_TYPE_UNKNOWN) {
rc = lod_xattr_del_internal(env, dt, name, th);
if (rc == -ENODATA)
rc = 0;
/* The stripes are supposed to be allocated in declare phase,
* if there are no stripes being allocated, it will skip */
- if (lo->ldo_dir_stripe_count == 0)
+ if (lo->ldo_dir_stripe_count == 0) {
+ if (lo->ldo_dir_is_foreign) {
+ rc = lod_sub_xattr_set(env, dt_object_child(dt), buf,
+ XATTR_NAME_LMV, fl, th);
+ if (rc != 0)
+ RETURN(rc);
+ }
RETURN(0);
+ }
rc = dt_attr_get(env, dt_object_child(dt), attr);
if (rc != 0)
struct linkea_data ldata = { NULL };
struct lu_buf linkea_buf;
+ /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */
+ if (!dto)
+ continue;
+
+ /* fail a remote stripe creation */
+ if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE))
+ continue;
+
/* if it's source stripe of migrating directory, don't create */
if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) &&
i >= lo->ldo_dir_migrate_offset)) {
th);
if (rc != 0)
RETURN(rc);
+ } else {
+ /* foreign LMV EA case */
+ if (lmu) {
+ struct lmv_foreign_md *lfm = lmu->lb_buf;
+
+ if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) {
+ rc = lod_declare_xattr_set_lmv(env, dt, attr,
+ lmu, dof, th);
+ }
+ } else {
+ if (lo->ldo_dir_is_foreign) {
+ LASSERT(lo->ldo_foreign_lmv != NULL &&
+ lo->ldo_foreign_lmv_size > 0);
+ info->lti_buf.lb_buf = lo->ldo_foreign_lmv;
+ info->lti_buf.lb_len = lo->ldo_foreign_lmv_size;
+ lmu = &info->lti_buf;
+ rc = lod_xattr_set_lmv(env, dt, lmu,
+ XATTR_NAME_LMV, 0, th);
+ }
+ }
}
/* Transfer default LMV striping from the parent */
if (lds != NULL && lds->lds_dir_def_striping_set &&
- !LMVEA_DELETE_VALUES(lds->lds_dir_def_stripe_count,
- lds->lds_dir_def_stripe_offset)) {
+ !(LMVEA_DELETE_VALUES(lds->lds_dir_def_stripe_count,
+ lds->lds_dir_def_stripe_offset) &&
+ le32_to_cpu(lds->lds_dir_def_hash_type) !=
+ LMV_HASH_TYPE_UNKNOWN)) {
struct lmv_user_md_v1 *v1 = info->lti_ea_store;
if (info->lti_ea_store_size < sizeof(*v1)) {
LASSERT(lo);
- if (lo->ldo_comp_cnt == 0) {
+ if (lo->ldo_comp_cnt == 0 && !lo->ldo_is_foreign) {
lod_striping_free(env, lo);
rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th);
RETURN(rc);
RETURN(rc);
} else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
strcmp(name, XATTR_NAME_LOV) == 0) {
- struct lod_thread_info *info = lod_env_info(env);
- struct lod_default_striping *lds = &info->lti_def_striping;
+ struct lod_default_striping *lds = lod_lds_buf_get(env);
struct lov_user_md_v1 *v1 = buf->lb_buf;
char pool[LOV_MAXPOOLNAME + 1];
bool is_del;
}
RETURN(rc);
} else if (strcmp(name, XATTR_NAME_FID) == 0) {
- rc = lod_replace_parent_fid(env, dt, th, false);
+ rc = lod_replace_parent_fid(env, dt, buf, th, false);
RETURN(rc);
}
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
struct dt_object *dto = lo->ldo_stripe[i];
- LASSERT(dto);
+ if (!dto)
+ continue;
+
rc = lod_sub_declare_xattr_del(env, dto, name, th);
if (rc != 0)
break;
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
struct dt_object *dto = lo->ldo_stripe[i];
- LASSERT(dto);
+ if (!dto)
+ continue;
rc = lod_sub_xattr_del(env, dto, name, th);
if (rc != 0)
LCME_TEMPLATE_FLAGS;
}
- if (v1->lmm_pattern != LOV_PATTERN_RAID0 &&
- v1->lmm_pattern != LOV_PATTERN_MDT &&
- v1->lmm_pattern != 0) {
+ if (!lov_pattern_supported(v1->lmm_pattern) &&
+ !(v1->lmm_pattern & LOV_PATTERN_F_RELEASED)) {
lod_free_def_comp_entries(lds);
RETURN(-EINVAL);
}
{
struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
struct lod_thread_info *info = lod_env_info(env);
- struct lod_default_striping *lds = &info->lti_def_striping;
+ struct lod_default_striping *lds = lod_lds_buf_get(env);
struct dt_object *nextp = NULL;
struct dt_object *nextc;
struct lod_object *lp = NULL;
/* other default values are 0 */
lc->ldo_dir_stripe_offset = -1;
- /* get default striping from parent object */
- if (likely(lp != NULL))
+ /* no default striping configuration is needed for
+ * foreign dirs
+ */
+ if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 &&
+ le32_to_cpu(lum1->lum_magic) == LMV_MAGIC_FOREIGN) {
+ lc->ldo_dir_is_foreign = true;
+ /* keep stripe_count 0 and stripe_offset -1 */
+ CDEBUG(D_INFO, "no default striping for foreign dir\n");
+ RETURN_EXIT;
+ }
+
+ /*
+ * If parent object is not root directory,
+ * then get default striping from parent object.
+ */
+ if (likely(lp != NULL) && !fid_is_root(lod_object_fid(lp)))
lod_get_default_striping(env, lp, lds);
/* set child default striping info, default value is NULL */
if (lod_comp->llc_stripe_size <= 0)
lod_comp->llc_stripe_size =
def_comp->llc_stripe_size;
- if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT)
+ if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT &&
+ (!lod_comp->llc_pool || !lod_comp->llc_pool[0]))
lod_comp->llc_stripe_offset =
def_comp->llc_stripe_offset;
if (lod_comp->llc_pool == NULL)
int rc = 0, i, j;
ENTRY;
- LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL);
+ LASSERT((lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL) ||
+ lo->ldo_is_foreign);
mirror_id = 0; /* non-flr file's mirror_id is 0 */
if (lo->ldo_mirror_count > 1) {
RETURN(rc);
}
+static inline bool lod_obj_is_dom(struct dt_object *dt)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+
+ if (!dt_object_exists(dt_object_child(dt)))
+ return false;
+
+ if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
+ return false;
+
+ if (!lo->ldo_comp_cnt)
+ return false;
+
+ return (lov_pattern(lo->ldo_comp_entries[0].llc_pattern) ==
+ LOV_PATTERN_MDT);
+}
+
/**
* Implementation of dt_object_operations::do_create.
*
RETURN(rc);
if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
- lod_obj_is_striped(dt) && dof->u.dof_reg.striped != 0) {
+ (lod_obj_is_striped(dt) || lod_obj_is_dom(dt)) &&
+ dof->u.dof_reg.striped != 0) {
LASSERT(lod_dt_obj(dt)->ldo_comp_cached == 0);
rc = lod_striped_create(env, dt, attr, dof, th);
}
static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt,
struct thandle *th)
{
- struct dt_object *next = dt_object_child(dt);
- struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ struct lod_object *lo = lod_dt_obj(dt);
struct lod_thread_info *info = lod_env_info(env);
- char *stripe_name = info->lti_key;
- int rc, i;
+ struct dt_object *stripe;
+ char *stripe_name = info->lti_key;
+ int rc, i;
+
ENTRY;
/*
RETURN(rc);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+ stripe = lo->ldo_stripe[i];
+ if (!stripe)
+ continue;
+
rc = lod_sub_declare_ref_del(env, next, th);
if (rc != 0)
RETURN(rc);
- snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
- PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
- i);
+ snprintf(stripe_name, sizeof(info->lti_key),
+ DFID":%d",
+ PFID(lu_object_fid(&stripe->do_lu)), i);
rc = lod_sub_declare_delete(env, next,
(const struct dt_key *)stripe_name, th);
if (rc != 0)
/* declare destroy all striped objects */
if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- if (lo->ldo_stripe[i] == NULL)
+ stripe = lo->ldo_stripe[i];
+ if (!stripe)
continue;
- rc = lod_sub_declare_ref_del(env, lo->ldo_stripe[i],
- th);
+ if (!dt_object_exists(stripe))
+ continue;
- rc = lod_sub_declare_destroy(env, lo->ldo_stripe[i],
- th);
+ rc = lod_sub_declare_ref_del(env, stripe, th);
+ if (rc != 0)
+ break;
+
+ rc = lod_sub_declare_destroy(env, stripe, th);
if (rc != 0)
break;
}
struct dt_object *next = dt_object_child(dt);
struct lod_object *lo = lod_dt_obj(dt);
struct lod_thread_info *info = lod_env_info(env);
- char *stripe_name = info->lti_key;
- unsigned int i;
- int rc;
+ char *stripe_name = info->lti_key;
+ struct dt_object *stripe;
+ unsigned int i;
+ int rc;
+
ENTRY;
/* destroy sub-stripe of master object */
RETURN(rc);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+ stripe = lo->ldo_stripe[i];
+ if (!stripe)
+ continue;
+
rc = lod_sub_ref_del(env, next, th);
if (rc != 0)
RETURN(rc);
snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
- PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
- i);
+ PFID(lu_object_fid(&stripe->do_lu)), i);
CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
PFID(lu_object_fid(&dt->do_lu)), stripe_name,
- PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
+ PFID(lu_object_fid(&stripe->do_lu)));
rc = lod_sub_delete(env, next,
(const struct dt_key *)stripe_name, th);
/* destroy all striped objects */
if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- if (lo->ldo_stripe[i] == NULL)
+ stripe = lo->ldo_stripe[i];
+ if (!stripe)
+ continue;
+
+ if (!dt_object_exists(stripe))
continue;
+
if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
i == cfs_fail_val) {
- dt_write_lock(env, lo->ldo_stripe[i],
- MOR_TGT_CHILD);
- rc = lod_sub_ref_del(env, lo->ldo_stripe[i],
- th);
- dt_write_unlock(env, lo->ldo_stripe[i]);
+ dt_write_lock(env, stripe, MOR_TGT_CHILD);
+ rc = lod_sub_ref_del(env, stripe, th);
+ dt_write_unlock(env, stripe);
if (rc != 0)
break;
- rc = lod_sub_destroy(env, lo->ldo_stripe[i],
- th);
+ rc = lod_sub_destroy(env, stripe, th);
if (rc != 0)
break;
}
* NB, ha_count may not equal to ldo_dir_stripe_count, because dir
* layout may change, e.g., shrink dir layout after migration.
*/
- for (i = 0; i < lo->ldo_dir_stripe_count; i++)
- dt_invalidate(env, lo->ldo_stripe[i]);
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+ if (lo->ldo_stripe[i])
+ dt_invalidate(env, lo->ldo_stripe[i]);
+ }
slave_locks_size = offsetof(typeof(*slave_locks),
ha_handles[slave_locks->ha_count]);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
struct lustre_handle lockh;
struct ldlm_res_id *res_id;
+ struct dt_object *stripe;
+
+ stripe = lo->ldo_stripe[i];
+ if (!stripe)
+ continue;
res_id = &lod_env_info(env)->lti_res_id;
- fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
- res_id);
+ fid_build_reg_res_name(lu_object_fid(&stripe->do_lu), res_id);
einfo->ei_res_id = res_id;
- LASSERT(lo->ldo_stripe[i] != NULL);
- if (dt_object_remote(lo->ldo_stripe[i])) {
+ if (dt_object_remote(stripe)) {
set_bit(i, (void *)slave_locks->ha_map);
- rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
- einfo, policy);
+ rc = dt_object_lock(env, stripe, &lockh, einfo, policy);
} else {
struct ldlm_namespace *ns = einfo->ei_namespace;
ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
RETURN(picked);
}
+static int lod_prepare_resync_mirror(const struct lu_env *env,
+ struct lod_object *lo,
+ __u16 mirror_id)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_layout_component *lod_comp;
+ bool neg = !!(MIRROR_ID_NEG & mirror_id);
+ int i;
+
+ mirror_id &= ~MIRROR_ID_NEG;
+
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ if ((!neg && lo->ldo_mirrors[i].lme_id != mirror_id) ||
+ (neg && lo->ldo_mirrors[i].lme_id == mirror_id))
+ continue;
+
+ lod_foreach_mirror_comp(lod_comp, lo, i) {
+ if (lod_comp_inited(lod_comp))
+ continue;
+
+ info->lti_comp_idx[info->lti_count++] =
+ lod_comp_index(lo, lod_comp);
+ }
+ }
+
+ return 0;
+}
+
/**
* figure out the components should be instantiated for resync.
*/
* prep uninited all components assuming any non-stale mirror
* could be picked as the primary mirror.
*/
- for (i = 0; i < lo->ldo_mirror_count; i++) {
- if (lo->ldo_mirrors[i].lme_stale)
- continue;
+ if (mlc->mlc_mirror_id == 0) {
+ /* normal resync */
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ if (lo->ldo_mirrors[i].lme_stale)
+ continue;
- lod_foreach_mirror_comp(lod_comp, lo, i) {
- if (!lod_comp_inited(lod_comp))
- break;
+ lod_foreach_mirror_comp(lod_comp, lo, i) {
+ if (!lod_comp_inited(lod_comp))
+ break;
- if (extent.e_end < lod_comp->llc_extent.e_end)
- extent.e_end =
- lod_comp->llc_extent.e_end;
+ if (extent.e_end <
+ lod_comp->llc_extent.e_end)
+ extent.e_end =
+ lod_comp->llc_extent.e_end;
+ }
}
+ rc = lod_prepare_resync(env, lo, &extent);
+ if (rc)
+ GOTO(out, rc);
+ } else {
+ /* mirror write, try to init its all components */
+ rc = lod_prepare_resync_mirror(env, lo,
+ mlc->mlc_mirror_id);
+ if (rc)
+ GOTO(out, rc);
}
- rc = lod_prepare_resync(env, lo, &extent);
- if (rc)
- GOTO(out, rc);
/* change the file state to SYNC_PENDING */
lo->ldo_flr_state = LCM_FL_SYNC_PENDING;
}
lod_comp_index(lo, lod_comp);
}
} else { /* MD_LAYOUT_RESYNC */
- lod_foreach_mirror_comp(lod_comp, lo, primary) {
- if (!lod_comp_inited(lod_comp))
- break;
+ if (mlc->mlc_mirror_id == 0) {
+ /* normal resync */
+ lod_foreach_mirror_comp(lod_comp, lo, primary) {
+ if (!lod_comp_inited(lod_comp))
+ break;
- extent.e_end = lod_comp->llc_extent.e_end;
+ extent.e_end = lod_comp->llc_extent.e_end;
+ }
+
+ rc = lod_prepare_resync(env, lo, &extent);
+ if (rc)
+ GOTO(out, rc);
+ } else {
+ /* mirror write, try to init its all components */
+ rc = lod_prepare_resync_mirror(env, lo,
+ mlc->mlc_mirror_id);
+ if (rc)
+ GOTO(out, rc);
}
- rc = lod_prepare_resync(env, lo, &extent);
- if (rc)
- GOTO(out, rc);
/* change the file state to SYNC_PENDING */
lo->ldo_flr_state = LCM_FL_SYNC_PENDING;
}
/**
*
+ * Alloc cached foreign LOV
+ *
+ * \param[in] lo object
+ * \param[in] size size of foreign LOV
+ *
+ * \retval 0 on success
+ * \retval negative if failed
+ */
+int lod_alloc_foreign_lov(struct lod_object *lo, size_t size)
+{
+ OBD_ALLOC_LARGE(lo->ldo_foreign_lov, size);
+ if (lo->ldo_foreign_lov == NULL)
+ return -ENOMEM;
+ lo->ldo_foreign_lov_size = size;
+ lo->ldo_is_foreign = 1;
+ return 0;
+}
+
+/**
+ *
+ * Free cached foreign LOV
+ *
+ * \param[in] lo object
+ */
+void lod_free_foreign_lov(struct lod_object *lo)
+{
+ if (lo->ldo_foreign_lov != NULL)
+ OBD_FREE_LARGE(lo->ldo_foreign_lov, lo->ldo_foreign_lov_size);
+ lo->ldo_foreign_lov = NULL;
+ lo->ldo_foreign_lov_size = 0;
+ lo->ldo_is_foreign = 0;
+}
+
+/**
+ *
+ * Free cached foreign LMV
+ *
+ * \param[in] lo object
+ */
+void lod_free_foreign_lmv(struct lod_object *lo)
+{
+ if (lo->ldo_foreign_lmv != NULL)
+ OBD_FREE_LARGE(lo->ldo_foreign_lmv, lo->ldo_foreign_lmv_size);
+ lo->ldo_foreign_lmv = NULL;
+ lo->ldo_foreign_lmv_size = 0;
+ lo->ldo_dir_is_foreign = 0;
+}
+
+/**
+ *
* Release resources associated with striping.
*
* If the object is striped (regular or directory), then release
struct lod_layout_component *lod_comp;
int i, j;
- if (lo->ldo_stripe != NULL) {
+ if (unlikely(lo->ldo_is_foreign)) {
+ lod_free_foreign_lov(lo);
+ lo->ldo_comp_cached = 0;
+ } else if (unlikely(lo->ldo_dir_is_foreign)) {
+ lod_free_foreign_lmv(lo);
+ lo->ldo_dir_stripe_loaded = 0;
+ } else if (lo->ldo_stripe != NULL) {
LASSERT(lo->ldo_comp_entries == NULL);
LASSERT(lo->ldo_dir_stripes_allocated > 0);