static struct dt_it *lod_striped_it_init(const struct lu_env *env,
struct dt_object *dt, __u32 attr)
{
- struct lod_object *lo = lod_dt_obj(dt);
- struct dt_object *next;
- struct lod_it *it = &lod_env_info(env)->lti_it;
- struct dt_it *it_next;
- ENTRY;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next;
+ struct lod_it *it = &lod_env_info(env)->lti_it;
+ struct dt_it *it_next;
+ __u16 index = 0;
LASSERT(lo->ldo_dir_stripe_count > 0);
- next = lo->ldo_stripe[0];
- LASSERT(next != NULL);
+
+ do {
+ next = lo->ldo_stripe[index];
+ if (next && dt_object_exists(next))
+ break;
+ } while (++index < lo->ldo_dir_stripe_count);
+
+ /* no valid stripe */
+ if (!next || !dt_object_exists(next))
+ return ERR_PTR(-ENODEV);
+
LASSERT(next->do_index_ops != NULL);
it_next = next->do_index_ops->dio_it.init(env, next, attr);
* additional ones */
LASSERT(it->lit_obj == NULL);
- it->lit_stripe_index = 0;
+ it->lit_stripe_index = index;
it->lit_attr = attr;
it->lit_it = it_next;
it->lit_obj = dt;
LOD_CHECK_STRIPED_IT(env, it, lo);
next = lo->ldo_stripe[it->lit_stripe_index];
- LASSERT(next != NULL);
- LASSERT(next->do_index_ops != NULL);
-
- next->do_index_ops->dio_it.fini(env, it->lit_it);
+ if (next) {
+ LASSERT(next->do_index_ops != NULL);
+ next->do_index_ops->dio_it.fini(env, it->lit_it);
+ }
}
/* the iterator not in use any more */
static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
const struct dt_key *key)
{
- const struct lod_it *it = (const struct lod_it *)di;
- struct lod_object *lo = lod_dt_obj(it->lit_obj);
- struct dt_object *next;
- ENTRY;
+ const struct lod_it *it = (const struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
LOD_CHECK_STRIPED_IT(env, it, lo);
next = lo->ldo_stripe[it->lit_stripe_index];
LASSERT(next != NULL);
+ LASSERT(dt_object_exists(next));
LASSERT(next->do_index_ops != NULL);
return next->do_index_ops->dio_it.get(env, it->lit_it, key);
*/
static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
{
- struct lod_it *it = (struct lod_it *)di;
- struct lod_object *lo = lod_dt_obj(it->lit_obj);
- struct dt_object *next;
+ struct lod_it *it = (struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
+
+ /*
+ * If lit_it == NULL, then it means the sub_it has been finished,
+ * which only happens in failure cases, see lod_striped_it_next()
+ */
+ if (!it->lit_it)
+ return;
LOD_CHECK_STRIPED_IT(env, it, lo);
*/
static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
{
- struct lod_it *it = (struct lod_it *)di;
- struct lod_object *lo = lod_dt_obj(it->lit_obj);
- struct dt_object *next;
- struct dt_it *it_next;
- int rc;
+ struct lod_it *it = (struct lod_it *)di;
+ struct lod_object *lo = lod_dt_obj(it->lit_obj);
+ struct dt_object *next;
+ struct dt_it *it_next;
+ __u32 index;
+ int rc;
+
ENTRY;
LOD_CHECK_STRIPED_IT(env, it, lo);
next = lo->ldo_stripe[it->lit_stripe_index];
LASSERT(next != NULL);
+ LASSERT(dt_object_exists(next));
LASSERT(next->do_index_ops != NULL);
again:
rc = next->do_index_ops->dio_it.next(env, it->lit_it);
RETURN(rc);
}
- /* go to next stripe */
- if (it->lit_stripe_index + 1 >= lo->ldo_dir_stripe_count)
- RETURN(1);
-
- it->lit_stripe_index++;
-
next->do_index_ops->dio_it.put(env, it->lit_it);
next->do_index_ops->dio_it.fini(env, it->lit_it);
it->lit_it = NULL;
- next = lo->ldo_stripe[it->lit_stripe_index];
- LASSERT(next != NULL);
- rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
- if (rc != 0)
- RETURN(rc);
+ /* go to next stripe */
+ index = it->lit_stripe_index;
+ while (++index < lo->ldo_dir_stripe_count) {
+ next = lo->ldo_stripe[index];
+ if (!next)
+ continue;
- LASSERT(next->do_index_ops != NULL);
+ if (!dt_object_exists(next))
+ continue;
+
+ rc = next->do_ops->do_index_try(env, next,
+ &dt_directory_features);
+ if (rc != 0)
+ RETURN(rc);
+
+ LASSERT(next->do_index_ops != NULL);
+
+ it_next = next->do_index_ops->dio_it.init(env, next,
+ it->lit_attr);
+ if (IS_ERR(it_next))
+ RETURN(PTR_ERR(it_next));
+
+ rc = next->do_index_ops->dio_it.get(env, it_next,
+ (const struct dt_key *)"");
+ if (rc <= 0)
+ RETURN(rc == 0 ? -EIO : rc);
- it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr);
- if (!IS_ERR(it_next)) {
it->lit_it = it_next;
+ it->lit_stripe_index = index;
goto again;
- } else {
- rc = PTR_ERR(it_next);
+
}
- RETURN(rc);
+ RETURN(1);
}
/**
int i;
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- if (dt_object_exists(lo->ldo_stripe[i]) == 0)
+ if (!lo->ldo_stripe[i])
+ continue;
+ if (!dt_object_exists(lo->ldo_stripe[i]))
continue;
rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
lo->ldo_stripe[i], feat);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
if (lo->ldo_stripe[i] == NULL)
continue;
+ if (!dt_object_exists(lo->ldo_stripe[i]))
+ continue;
rc = lod_sub_declare_attr_set(env, lo->ldo_stripe[i],
attr, th);
if (rc != 0)
__u32 idx;
fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
- if (!fid_is_sane(fid))
- GOTO(out, rc = -ESTALE);
+ if (!fid_is_sane(fid)) {
+ stripe[i] = NULL;
+ continue;
+ }
rc = lod_fld_lookup(env, lod, fid, &idx, &type);
if (rc != 0)
struct linkea_data ldata = { NULL };
struct lu_buf linkea_buf;
+ /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */
+ if (!dto)
+ continue;
+
rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th);
if (rc != 0)
GOTO(out, rc);
* in the above loop */
LASSERT(tgt_dt != NULL);
LASSERT(fid_is_sane(&fid));
+
+ /* fail a remote stripe FID allocation */
+ if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID))
+ continue;
+
conf.loc_flags = LOC_F_NEW;
dto = dt_locate_at(env, tgt_dt, &fid,
dt->do_lu.lo_dev->ld_site->ls_top_dev,
fid_le_to_cpu(fid,
&lmv->lmv_stripe_fids[i]);
if (!fid_is_sane(fid))
- GOTO(out, rc = -ESTALE);
+ continue;
rc = lod_fld_lookup(env, lod, fid, &idx, &type);
if (rc)
for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
dto = lo->ldo_stripe[i];
- LASSERT(dto);
+ if (!dto)
+ continue;
if (!dt_try_as_dir(env, dto))
return -ENOTDIR;
for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
dto = lo->ldo_stripe[i];
- LASSERT(dto);
+ if (!dto)
+ continue;
rc = lod_sub_delete(env, dto,
(const struct dt_key *)dotdot, th);
RETURN(0);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- LASSERT(lo->ldo_stripe[i]);
+ if (!lo->ldo_stripe[i])
+ continue;
+
+ if (!dt_object_exists(lo->ldo_stripe[i]))
+ continue;
rc = lod_sub_declare_xattr_set(env, lo->ldo_stripe[i],
buf, name, fl, th);
RETURN(0);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- LASSERT(lo->ldo_stripe[i]);
+ if (!lo->ldo_stripe[i])
+ continue;
+
+ if (!dt_object_exists(lo->ldo_stripe[i]))
+ continue;
rc = lod_sub_xattr_set(env, lo->ldo_stripe[i], buf, name,
fl, th);
struct linkea_data ldata = { NULL };
struct lu_buf linkea_buf;
+ /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */
+ if (!dto)
+ continue;
+
+ /* fail a remote stripe creation */
+ if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE))
+ continue;
+
/* if it's source stripe of migrating directory, don't create */
if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) &&
i >= lo->ldo_dir_migrate_offset)) {
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
struct dt_object *dto = lo->ldo_stripe[i];
- LASSERT(dto);
+ if (!dto)
+ continue;
+
rc = lod_sub_declare_xattr_del(env, dto, name, th);
if (rc != 0)
break;
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
struct dt_object *dto = lo->ldo_stripe[i];
- LASSERT(dto);
+ if (!dto)
+ continue;
rc = lod_sub_xattr_del(env, dto, name, th);
if (rc != 0)
static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt,
struct thandle *th)
{
- struct dt_object *next = dt_object_child(dt);
- struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ struct lod_object *lo = lod_dt_obj(dt);
struct lod_thread_info *info = lod_env_info(env);
- char *stripe_name = info->lti_key;
- int rc, i;
+ struct dt_object *stripe;
+ char *stripe_name = info->lti_key;
+ int rc, i;
+
ENTRY;
/*
RETURN(rc);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+ stripe = lo->ldo_stripe[i];
+ if (!stripe)
+ continue;
+
rc = lod_sub_declare_ref_del(env, next, th);
if (rc != 0)
RETURN(rc);
- snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
- PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
- i);
+ snprintf(stripe_name, sizeof(info->lti_key),
+ DFID":%d",
+ PFID(lu_object_fid(&stripe->do_lu)), i);
rc = lod_sub_declare_delete(env, next,
(const struct dt_key *)stripe_name, th);
if (rc != 0)
/* declare destroy all striped objects */
if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- if (lo->ldo_stripe[i] == NULL)
+ stripe = lo->ldo_stripe[i];
+ if (!stripe)
continue;
- rc = lod_sub_declare_ref_del(env, lo->ldo_stripe[i],
- th);
+ if (!dt_object_exists(stripe))
+ continue;
- rc = lod_sub_declare_destroy(env, lo->ldo_stripe[i],
- th);
+ rc = lod_sub_declare_ref_del(env, stripe, th);
+ if (rc != 0)
+ break;
+
+ rc = lod_sub_declare_destroy(env, stripe, th);
if (rc != 0)
break;
}
struct dt_object *next = dt_object_child(dt);
struct lod_object *lo = lod_dt_obj(dt);
struct lod_thread_info *info = lod_env_info(env);
- char *stripe_name = info->lti_key;
- unsigned int i;
- int rc;
+ char *stripe_name = info->lti_key;
+ struct dt_object *stripe;
+ unsigned int i;
+ int rc;
+
ENTRY;
/* destroy sub-stripe of master object */
RETURN(rc);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+ stripe = lo->ldo_stripe[i];
+ if (!stripe)
+ continue;
+
rc = lod_sub_ref_del(env, next, th);
if (rc != 0)
RETURN(rc);
snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
- PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
- i);
+ PFID(lu_object_fid(&stripe->do_lu)), i);
CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
PFID(lu_object_fid(&dt->do_lu)), stripe_name,
- PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
+ PFID(lu_object_fid(&stripe->do_lu)));
rc = lod_sub_delete(env, next,
(const struct dt_key *)stripe_name, th);
/* destroy all striped objects */
if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- if (lo->ldo_stripe[i] == NULL)
+ stripe = lo->ldo_stripe[i];
+ if (!stripe)
continue;
+
+ if (!dt_object_exists(stripe))
+ continue;
+
if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
i == cfs_fail_val) {
- dt_write_lock(env, lo->ldo_stripe[i],
- MOR_TGT_CHILD);
- rc = lod_sub_ref_del(env, lo->ldo_stripe[i],
- th);
- dt_write_unlock(env, lo->ldo_stripe[i]);
+ dt_write_lock(env, stripe, MOR_TGT_CHILD);
+ rc = lod_sub_ref_del(env, stripe, th);
+ dt_write_unlock(env, stripe);
if (rc != 0)
break;
- rc = lod_sub_destroy(env, lo->ldo_stripe[i],
- th);
+ rc = lod_sub_destroy(env, stripe, th);
if (rc != 0)
break;
}
* NB, ha_count may not equal to ldo_dir_stripe_count, because dir
* layout may change, e.g., shrink dir layout after migration.
*/
- for (i = 0; i < lo->ldo_dir_stripe_count; i++)
- dt_invalidate(env, lo->ldo_stripe[i]);
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+ if (lo->ldo_stripe[i])
+ dt_invalidate(env, lo->ldo_stripe[i]);
+ }
slave_locks_size = offsetof(typeof(*slave_locks),
ha_handles[slave_locks->ha_count]);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
struct lustre_handle lockh;
struct ldlm_res_id *res_id;
+ struct dt_object *stripe;
+
+ stripe = lo->ldo_stripe[i];
+ if (!stripe)
+ continue;
res_id = &lod_env_info(env)->lti_res_id;
- fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
- res_id);
+ fid_build_reg_res_name(lu_object_fid(&stripe->do_lu), res_id);
einfo->ei_res_id = res_id;
- LASSERT(lo->ldo_stripe[i] != NULL);
- if (dt_object_remote(lo->ldo_stripe[i])) {
+ if (dt_object_remote(stripe)) {
set_bit(i, (void *)slave_locks->ha_map);
- rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
- einfo, policy);
+ rc = dt_object_lock(env, stripe, &lockh, einfo, policy);
} else {
struct ldlm_namespace *ns = einfo->ei_namespace;
ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;