return (c->lc_opc & CAPA_OPC_INDEX_LOOKUP) == 0;
}
+static inline bool lovea_slot_is_dummy(const struct lov_ost_data_v1 *obj)
+{
+ /* A LOV EA slot is treated as dummy when its object id, object
+ * sequence, OST index and OST generation are all zero. Zero reads
+ * the same in any byte order, so the fields are compared directly
+ * without any byte-order conversion. */
+ if (obj->l_ost_oi.oi.oi_id == 0 && obj->l_ost_oi.oi.oi_seq == 0 &&
+ obj->l_ost_idx == 0 && obj->l_ost_gen == 0)
+ return true;
+
+ return false;
+}
+
/* lustre_capa::lc_hmac_alg */
enum {
CAPA_HMAC_ALG_SHA1 = 1, /**< sha1 algorithm */
#define LOV_PATTERN_CMOBD 0x200
#define LOV_PATTERN_F_MASK 0xffff0000
+#define LOV_PATTERN_F_HOLE 0x40000000 /* there is hole in LOV EA */
#define LOV_PATTERN_F_RELEASED 0x80000000 /* HSM released file */
#define LOV_MAXPOOLNAME 16
#define OBD_FAIL_LFSCK_NOPFID 0x1617
#define OBD_FAIL_LFSCK_CHANGE_STRIPE 0x1618
#define OBD_FAIL_LFSCK_INVALID_PFID 0x1619
+#define OBD_FAIL_LFSCK_LOST_SPEOBJ 0x161a
+#define OBD_FAIL_LFSCK_DELAY5 0x161b
#define OBD_FAIL_LFSCK_NOTIFY_NET 0x16f0
#define OBD_FAIL_LFSCK_QUERY_NET 0x16f1
};
struct dt_allocation_hint lti_hint;
struct lu_orphan_rec lti_rec;
+ struct lov_user_md lti_lum;
};
/* lfsck_lib.c */
}
}
-static inline bool is_dummy_lov_ost_data(struct lov_ost_data_v1 *obj)
-{
- if (fid_is_zero(&obj->l_ost_oi.oi_fid) &&
- obj->l_ost_gen == 0 && obj->l_ost_idx == 0)
- return true;
-
- return false;
-}
-
static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
const struct lfsck_layout *src)
{
}
/**
+ * Get the system default stripe size.
+ *
+ * Reads the XATTR_NAME_LOV extended attribute of the object located by
+ * lfsck::li_local_root_fid on lfsck::li_next, using the per-thread
+ * lti_lum buffer, and copies its lmm_stripe_size field to \a size
+ * without any byte-order conversion.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] lfsck pointer to the lfsck instance
+ * \param[out] size pointer to the default stripe size; it is only
+ * written when the function succeeds
+ *
+ * \retval 0 for success
+ * \retval -EINVAL if the xattr read returned zero bytes
+ * \retval other negative error number on failure
+ */
+static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
+ struct lfsck_instance *lfsck,
+ __u32 *size)
+{
+ struct lov_user_md *lum = &lfsck_env_info(env)->lti_lum;
+ struct dt_object *root;
+ int rc;
+
+ root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
+ if (IS_ERR(root))
+ return PTR_ERR(root);
+
+ /* Get the default stripe size via xattr_get on the backend root:
+ * the LOV xattr is read into the per-thread lum buffer. */
+ rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
+ XATTR_NAME_LOV, BYPASS_CAPA);
+ if (rc > 0) {
+ /* lum->lmm_stripe_size is in little-endian (LE) form, and
+ * *size is expected to be in LE form as well, so the value
+ * is copied as is, without conversion. */
+ *size = lum->lmm_stripe_size;
+ rc = 0;
+ } else if (unlikely(rc == 0)) {
+ rc = -EINVAL; /* a zero-length xattr carries no stripe size */
+ }
+
+ lfsck_object_put(env, root);
+
+ return rc;
+}
+
+/**
* \retval +1: repaired
* \retval 0: did nothing
* \retval -ve: on error
struct lov_ost_data_v1 *slot,
int fl, __u32 ost_idx)
{
- struct ost_id *oi = &lfsck_env_info(env)->lti_oi;
- int rc;
+ struct ost_id *oi = &lfsck_env_info(env)->lti_oi;
+ struct lov_mds_md_v1 *lmm = buf->lb_buf;
+ int rc;
fid_to_ostid(cfid, oi);
ostid_cpu_to_le(oi, &slot->l_ost_oi);
slot->l_ost_gen = cpu_to_le32(0);
slot->l_ost_idx = cpu_to_le32(ost_idx);
+
+ if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
+ struct lov_ost_data_v1 *objs;
+ int i;
+ __u16 count;
+
+ count = le16_to_cpu(lmm->lmm_stripe_count);
+ if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
+ objs = &lmm->lmm_objects[0];
+ else
+ objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
+ for (i = 0; i < count; i++, objs++) {
+ if (objs != slot && lovea_slot_is_dummy(objs))
+ break;
+ }
+
+ /* If the @slot is the last dummy slot to be refilled,
+ * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
+ if (i == count)
+ lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
+ }
+
rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV, fl, handle,
BYPASS_CAPA);
if (rc == 0)
* \retval -ve: on error
*/
static int lfsck_layout_extend_lovea(const struct lu_env *env,
+ struct lfsck_instance *lfsck,
struct thandle *handle,
struct dt_object *parent,
struct lu_fid *cfid,
struct lov_mds_md_v1 *lmm = buf->lb_buf;
struct lov_ost_data_v1 *objs;
int rc;
+ __u16 count;
ENTRY;
if (fl == LU_XATTR_CREATE || reset) {
- LASSERT(buf->lb_len == lov_mds_md_size(ea_off + 1,
- LOV_MAGIC_V1));
+ __u32 pattern = LOV_PATTERN_RAID0;
+
+ count = ea_off + 1;
+ LASSERT(buf->lb_len == lov_mds_md_size(count, LOV_MAGIC_V1));
+
+ if (ea_off != 0 || reset)
+ pattern |= LOV_PATTERN_F_HOLE;
memset(lmm, 0, buf->lb_len);
lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
- /* XXX: currently, we only support LOV_PATTERN_RAID0. */
- lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
+ lmm->lmm_pattern = cpu_to_le32(pattern);
fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
- /* XXX: We cannot know the stripe size,
- * then use the default value (1 MB). */
- lmm->lmm_stripe_size =
- cpu_to_le32(LOV_DESC_STRIPE_SIZE_DEFAULT);
- objs = &(lmm->lmm_objects[ea_off]);
+
+ rc = lfsck_layout_get_def_stripesize(env, lfsck,
+ &lmm->lmm_stripe_size);
+ if (rc != 0)
+ RETURN(rc);
+
+ objs = &lmm->lmm_objects[ea_off];
} else {
- __u16 count = le16_to_cpu(lmm->lmm_stripe_count);
- int gap = ea_off - count;
__u32 magic = le32_to_cpu(lmm->lmm_magic);
+ int gap;
- /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3
- * which has been verified in lfsck_layout_verify_header()
- * already. If some new magic introduced in the future,
- * then layout LFSCK needs to be updated also. */
- if (magic == LOV_MAGIC_V1) {
- objs = &(lmm->lmm_objects[count]);
- } else {
- LASSERT(magic == LOV_MAGIC_V3);
+ count = le16_to_cpu(lmm->lmm_stripe_count);
+ if (magic == LOV_MAGIC_V1)
+ objs = &lmm->lmm_objects[count];
+ else
objs = &((struct lov_mds_md_v3 *)lmm)->
lmm_objects[count];
- }
- if (gap > 0)
+ gap = ea_off - count;
+ if (gap >= 0)
+ count = ea_off + 1;
+ LASSERT(buf->lb_len == lov_mds_md_size(count, magic));
+
+ if (gap > 0) {
memset(objs, 0, gap * sizeof(*objs));
+ lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
+ }
+
lmm->lmm_layout_gen =
cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
objs += gap;
-
- LASSERT(buf->lb_len == lov_mds_md_size(ea_off + 1, magic));
}
- lmm->lmm_stripe_count = cpu_to_le16(ea_off + 1);
+ lmm->lmm_stripe_count = cpu_to_le16(count);
rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
fl, ost_idx);
memset(la, 0, sizeof(*la));
la->la_uid = rec->lor_uid;
la->la_gid = rec->lor_gid;
- la->la_mode = S_IFREG | S_IRUSR | S_IWUSR;
+ la->la_mode = S_IFREG | S_IRUSR;
la->la_valid = LA_MODE | LA_UID | LA_GID;
memset(dof, 0, sizeof(*dof));
rc = dt_create(env, pobj, la, NULL, dof, th);
if (rc == 0)
/* 3b. Add layout EA for the MDT-object. */
- rc = lfsck_layout_extend_lovea(env, th, pobj, cfid, ea_buf,
- LU_XATTR_CREATE, ltd->ltd_index,
- ea_off, false);
+ rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
+ ea_buf, LU_XATTR_CREATE,
+ ltd->ltd_index, ea_off, false);
dt_write_unlock(env, pobj);
if (rc < 0)
GOTO(stop, rc);
LASSERT(buf->lb_len >= rc);
buf->lb_len = rc;
- rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
- fl, ost_idx, ea_off, false);
+ rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
+ buf, fl, ost_idx, ea_off, false);
GOTO(unlock_parent, rc);
}
buf->lb_len = rc;
memset(lmm, 0, buf->lb_len);
- rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
- fl, ost_idx, ea_off, true);
+ rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
+ buf, fl, ost_idx, ea_off, true);
GOTO(unlock_parent, rc);
}
* be updated also. */
magic = le32_to_cpu(lmm->lmm_magic);
if (magic == LOV_MAGIC_V1) {
- objs = &(lmm->lmm_objects[0]);
+ objs = &lmm->lmm_objects[0];
} else {
LASSERT(magic == LOV_MAGIC_V3);
objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
goto again;
buf->lb_len = rc;
- rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
- fl, ost_idx, ea_off, false);
+ rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
+ buf, fl, ost_idx, ea_off, false);
+
GOTO(unlock_parent, rc);
}
for (i = 0; i < count; i++, objs++) {
/* The MDT-object was created via lfsck_layout_recover_create()
* by others before, and we fill the dummy layout EA. */
- if (is_dummy_lov_ost_data(objs)) {
+ if (lovea_slot_is_dummy(objs)) {
if (i != ea_off)
continue;
dt_trans_stop(env, dt, handle);
lfsck_layout_unlock(&lh);
if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
- objs = &(lmm->lmm_objects[ea_off]);
+ objs = &lmm->lmm_objects[ea_off];
else
objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
* be updated also. */
magic = le32_to_cpu(lmm->lmm_magic);
if (magic == LOV_MAGIC_V1) {
- objs = &(lmm->lmm_objects[0]);
+ objs = &lmm->lmm_objects[0];
} else {
LASSERT(magic == LOV_MAGIC_V3);
objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
lmm = buf->lb_buf;
magic = le32_to_cpu(lmm->lmm_magic);
if (magic == LOV_MAGIC_V1) {
- objs = &(lmm->lmm_objects[0]);
+ objs = &lmm->lmm_objects[0];
} else {
LASSERT(magic == LOV_MAGIC_V3);
objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
struct lu_fid *tfid = &info->lti_fid2;
struct ost_id *oi = &info->lti_oi;
- if (is_dummy_lov_ost_data(objs))
+ if (lovea_slot_is_dummy(objs))
continue;
ostid_le_to_cpu(&objs->l_ost_oi, oi);
* be updated also. */
magic = le32_to_cpu(lmm->lmm_magic);
if (magic == LOV_MAGIC_V1) {
- objs = &(lmm->lmm_objects[0]);
+ objs = &lmm->lmm_objects[0];
} else {
LASSERT(magic == LOV_MAGIC_V3);
objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
* be updated also. */
magic = le32_to_cpu(lmm->lmm_magic);
if (magic == LOV_MAGIC_V1) {
- objs = &(lmm->lmm_objects[0]);
+ objs = &lmm->lmm_objects[0];
} else {
LASSERT(magic == LOV_MAGIC_V3);
objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
le32_to_cpu(objs->l_ost_idx);
bool wakeup = false;
- if (is_dummy_lov_ost_data(objs))
+ if (unlikely(lovea_slot_is_dummy(objs)))
continue;
l_wait_event(mthread->t_ctl_waitq,
LASSERT(llsd != NULL);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY5) &&
+ cfs_fail_val == lfsck_dev_idx(lfsck->li_bottom)) {
+ struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(1),
+ NULL, NULL);
+ struct ptlrpc_thread *thread = &lfsck->li_thread;
+
+ l_wait_event(thread->t_ctl_waitq,
+ !thread_is_running(thread),
+ &lwi);
+ }
+
lfsck_rbtree_update_bitmap(env, com, fid, false);
down_write(&com->lc_sem);
OBD_CONNECT_LAYOUTLOCK | OBD_CONNECT_PINGLESS |
OBD_CONNECT_MAX_EASIZE |
OBD_CONNECT_FLOCK_DEAD |
- OBD_CONNECT_DISP_STRIPE;
+ OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK;
if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
data->ocd_connect_flags |= OBD_CONNECT_SOM;
OBD_CONNECT_MAXBYTES |
OBD_CONNECT_EINPROGRESS |
OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE |
- OBD_CONNECT_LAYOUTLOCK | OBD_CONNECT_PINGLESS;
+ OBD_CONNECT_LAYOUTLOCK |
+ OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK;
if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
data->ocd_connect_flags |= OBD_CONNECT_SOM;
RETURN(-ENOMEM);
for (i = 0; i < lo->ldo_stripenr; i++) {
+ if (unlikely(lovea_slot_is_dummy(&objs[i])))
+ continue;
+
ostid_le_to_cpu(&objs[i].l_ost_oi, &info->lti_ostid);
idx = le32_to_cpu(objs[i].l_ost_idx);
rc = ostid_to_fid(&info->lti_fid, &info->lti_ostid, idx);
*/
LASSERT(lo->ldo_stripe);
for (i = 0; i < lo->ldo_stripenr; i++) {
- LASSERT(lo->ldo_stripe[i]);
-
- rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
- if (rc) {
- CERROR("failed declaration: %d\n", rc);
- break;
+ if (likely(lo->ldo_stripe[i] != NULL)) {
+ rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
+ handle);
+ if (rc != 0) {
+ CERROR("failed declaration: %d\n", rc);
+ break;
+ }
}
}
*/
LASSERT(lo->ldo_stripe);
for (i = 0; i < lo->ldo_stripenr; i++) {
- LASSERT(lo->ldo_stripe[i]);
- if (dt_object_exists(lo->ldo_stripe[i]) == 0)
- continue;
- rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
- if (rc) {
- CERROR("failed declaration: %d\n", rc);
- break;
+ if (likely(lo->ldo_stripe[i] != NULL)) {
+ if (dt_object_exists(lo->ldo_stripe[i]) == 0)
+ continue;
+
+ rc = dt_attr_set(env, lo->ldo_stripe[i], attr,
+ handle, capa);
+ if (rc != 0) {
+ CERROR("failed declaration: %d\n", rc);
+ break;
+ }
}
}
rc = 0;
v1 = info->lti_ea_store;
- if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
+ if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
lustre_swab_lov_user_md_v1(v1);
- else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
+ } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
+ v3 = (struct lov_user_md_v3 *)v1;
lustre_swab_lov_user_md_v3(v3);
+ }
if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
GOTO(unlock, rc = 0);
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
RETURN(0);
- /* declare destroy for all underlying objects */
+ /* declare destroy all striped objects */
for (i = 0; i < lo->ldo_stripenr; i++) {
- LASSERT(lo->ldo_stripe[i]);
- rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
- if (rc != 0)
- break;
+ if (likely(lo->ldo_stripe[i] != NULL)) {
+ rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
+ if (rc != 0)
+ break;
+ }
}
RETURN(rc);
/* destroy all striped objects */
for (i = 0; i < lo->ldo_stripenr; i++) {
- LASSERT(lo->ldo_stripe[i]);
- rc = dt_destroy(env, lo->ldo_stripe[i], th);
- if (rc != 0)
- break;
+ if (likely(lo->ldo_stripe[i] != NULL) &&
+ (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
+ i == cfs_fail_val)) {
+ rc = dt_destroy(env, lo->ldo_stripe[i], th);
+ if (rc != 0)
+ break;
+ }
}
RETURN(rc);
ostid_le_to_cpu(&lmm->lmm_objects[i].l_ost_oi, &loi->loi_oi);
loi->loi_ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
loi->loi_ost_gen = le32_to_cpu(lmm->lmm_objects[i].l_ost_gen);
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
if (loi->loi_ost_idx >= lov->desc.ld_tgt_count) {
CERROR("OST index %d more than OST count %d\n",
loi->loi_ost_idx, lov->desc.ld_tgt_count);
return -E2BIG;
for (i = 0; i < stripe_count; i++) {
- /* XXX LOV STACKING call down to osc_unpackmd() */
- loi = lsm->lsm_oinfo[i];
+ /* XXX LOV STACKING call down to osc_unpackmd() */
+ loi = lsm->lsm_oinfo[i];
ostid_le_to_cpu(&lmm->lmm_objects[i].l_ost_oi, &loi->loi_oi);
- loi->loi_ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
- loi->loi_ost_gen = le32_to_cpu(lmm->lmm_objects[i].l_ost_gen);
+ loi->loi_ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
+ loi->loi_ost_gen = le32_to_cpu(lmm->lmm_objects[i].l_ost_gen);
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
if (loi->loi_ost_idx >= lov->desc.ld_tgt_count) {
CERROR("OST index %d more than OST count %d\n",
loi->loi_ost_idx, lov->desc.ld_tgt_count);
return lsm;
}
+static inline bool lov_oinfo_is_dummy(const struct lov_oinfo *loi)
+{
+ if (unlikely(loi->loi_oi.oi.oi_id == 0 &&
+ loi->loi_oi.oi.oi_seq == 0 &&
+ loi->loi_ost_idx == 0 &&
+ loi->loi_ost_gen == 0)) /* id, seq, OST index and gen all zero */
+ return true; /* dummy stripe slot; hinted as the rare case */
+
+ return false; /* at least one field is set: a real stripe */
+}
+
#endif
LASSERT(sub->sub_stripe < lio->lis_stripe_count);
ENTRY;
+ if (unlikely(lov_r0(lov)->lo_sub[stripe] == NULL))
+ RETURN(-EIO);
+
result = 0;
sub->sub_io_initialized = 0;
sub->sub_borrowed = 0;
RETURN(result);
}
-static void lov_io_slice_init(struct lov_io *lio,
- struct lov_object *obj, struct cl_io *io)
+static int lov_io_slice_init(struct lov_io *lio,
+ struct lov_object *obj, struct cl_io *io)
{
ENTRY;
lio->lis_io_endpos = lio->lis_endpos;
if (cl_io_is_append(io)) {
LASSERT(io->ci_type == CIT_WRITE);
+
+ /* If there is a LOV EA hole, then we may not be
+ * able to locate the current file-tail exactly. */
+ if (unlikely(obj->lo_lsm->lsm_pattern &
+ LOV_PATTERN_F_HOLE))
+ RETURN(-EIO);
+
lio->lis_pos = 0;
lio->lis_endpos = OBD_OBJECT_EOF;
}
LBUG();
}
- EXIT;
+ RETURN(0);
}
static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
endpos, &start, &end))
continue;
+ if (unlikely(lov_r0(lio->lis_object)->lo_sub[stripe] == NULL)) {
+ if (ios->cis_io->ci_type == CIT_READ ||
+ ios->cis_io->ci_type == CIT_WRITE ||
+ ios->cis_io->ci_type == CIT_FAULT)
+ RETURN(-EIO);
+
+ continue;
+ }
+
end = lov_offset_mod(end, +1);
sub = lov_sub_get(env, lio, stripe);
if (!IS_ERR(sub)) {
ENTRY;
INIT_LIST_HEAD(&lio->lis_active);
- lov_io_slice_init(lio, lov, io);
+ io->ci_result = lov_io_slice_init(lio, lov, io);
+ if (io->ci_result != 0)
+ RETURN(io->ci_result);
+
if (io->ci_result == 0) {
io->ci_result = lov_io_subio_init(env, lio, io);
if (io->ci_result == 0) {
* XXX for wide striping smarter algorithm is desirable,
* breaking out of the loop, early.
*/
- if (lov_stripe_intersects(loo->lo_lsm, i,
- file_start, file_end, &start, &end))
+ if (likely(r0->lo_sub[i] != NULL) &&
+ lov_stripe_intersects(loo->lo_lsm, i,
+ file_start, file_end, &start, &end))
nr++;
}
LASSERT(nr > 0);
* top-lock.
*/
for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
- if (lov_stripe_intersects(loo->lo_lsm, i,
- file_start, file_end, &start, &end)) {
+ if (likely(r0->lo_sub[i] != NULL) &&
+ lov_stripe_intersects(loo->lo_lsm, i,
+ file_start, file_end, &start, &end)) {
struct cl_lock_descr *descr;
descr = &lck->lls_sub[nr].sub_descr;
*/
start = cl_offset(&lov->lo_cl, descr->cld_start);
end = cl_offset(&lov->lo_cl, descr->cld_end + 1) - 1;
- result = end - start <= lsm->lsm_stripe_size &&
- stripe == lov_stripe_number(lsm, start) &&
- stripe == lov_stripe_number(lsm, end);
- if (result) {
+
+ result = 0;
+ /* glimpse should work on the object with LOV EA hole. */
+ if ((end - start <= lsm->lsm_stripe_size) ||
+ (descr->cld_end == CL_PAGE_EOF &&
+ unlikely(lov->lo_lsm->lsm_pattern & LOV_PATTERN_F_HOLE))) {
+ int idx;
+
+ idx = lov_stripe_number(lsm, start);
+ if (idx == stripe ||
+ unlikely(lov_r0(lov)->lo_sub[idx] == NULL)) {
+ idx = lov_stripe_number(lsm, end);
+ if (idx == stripe ||
+ unlikely(lov_r0(lov)->lo_sub[idx] == NULL))
+ result = 1;
+ }
+ }
+
+ if (result != 0) {
struct cl_lock_descr *subd = &lov_env_info(env)->lti_ldescr;
obd_off sub_start;
obd_off sub_end;
GOTO(out, rc = -EINVAL);
for (i = 0; i < lsm->lsm_stripe_count; i++) {
- if (lsm->lsm_oinfo[i]->loi_ost_idx == ost_idx) {
- if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) !=
+ struct lov_oinfo *loi = lsm->lsm_oinfo[i];
+
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
+ if (loi->loi_ost_idx == ost_idx) {
+ if (ostid_id(&loi->loi_oi) !=
ostid_id(&src_oa->o_oi))
GOTO(out, rc = -EINVAL);
break;
struct lov_stripe_md submd;
struct lov_oinfo *loi = lsm->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx]) {
- CDEBUG(D_HA, "lov idx %d NULL \n", loi->loi_ost_idx);
- continue;
- }
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
+ if (!lov->lov_tgts[loi->loi_ost_idx]) {
+ CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
+ continue;
+ }
submd.lsm_oi = loi->loi_oi;
submd.lsm_stripe_count = 0;
struct lov_stripe_md submd;
struct lov_oinfo *loi = lsm->lsm_oinfo[i];
- if (!lov->lov_tgts[loi->loi_ost_idx]) {
- CDEBUG(D_HA, "lov idx %d NULL \n", loi->loi_ost_idx);
- continue;
- }
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
+ if (!lov->lov_tgts[loi->loi_ost_idx]) {
+ CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
+ continue;
+ }
+
submd.lsm_oi = loi->loi_oi;
submd.lsm_stripe_count = 0;
rc = obd_find_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
fiemap->fm_extents[0].fe_logical == 0)
return 0;
- /* Find out stripe_no from ost_index saved in the fe_device */
- for (i = 0; i < lsm->lsm_stripe_count; i++) {
- if (lsm->lsm_oinfo[i]->loi_ost_idx ==
- fiemap->fm_extents[0].fe_device) {
- stripe_no = i;
- break;
- }
- }
+ /* Find out stripe_no from ost_index saved in the fe_device */
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
+
+ if (lov_oinfo_is_dummy(oinfo))
+ continue;
+
+ if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
+ stripe_no = i;
+ break;
+ }
+ }
+
if (stripe_no == -1)
return -EINVAL;
&lun_start, &obd_object_end)) == 0)
continue;
+ if (lov_oinfo_is_dummy(lsm->lsm_oinfo[cur_stripe]))
+ GOTO(out, rc = -EIO);
+
/* If this is a continuation FIEMAP call and we are on
* starting stripe then lun_start needs to be set to
* fm_end_offset */
* be NULL and won't match the lock's export. */
for (i = 0; i < lsm->lsm_stripe_count; i++) {
loi = lsm->lsm_oinfo[i];
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
if (!lov->lov_tgts[loi->loi_ost_idx])
continue;
if (lov->lov_tgts[loi->loi_ost_idx]->ltd_exp ==
struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
int ost_idx = oinfo->loi_ost_idx;
+ if (lov_oinfo_is_dummy(oinfo))
+ continue;
+
result = ostid_to_fid(ofid, &oinfo->loi_oi,
oinfo->loi_ost_idx);
if (result != 0)
LASSERT(lsm != NULL);
for (i = 0; i < lsm->lsm_stripe_count; i++) {
struct lov_oinfo *loi = lsm->lsm_oinfo[i];
+
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
if (loi->loi_ar.ar_rc && !rc)
rc = loi->loi_ar.ar_rc;
loi->loi_ar.ar_rc = 0;
struct lov_request *req;
loi = oinfo->oi_md->lsm_oinfo[i];
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
struct lov_request *req;
loi = lsm->lsm_oinfo[i];
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
struct lov_request *req;
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
continue;
rc = mo_xattr_get(info->mti_env, next, buf, name);
if (rc > 0) {
+
+got:
if (strcmp(name, XATTR_NAME_LOV) == 0) {
- ma->ma_lmm_size = rc;
- ma->ma_valid |= MA_LOV;
+ if (info->mti_big_lmm_used)
+ ma->ma_lmm = info->mti_big_lmm;
+
+ /* Never return a LOV EA with a hole to old clients. */
+ if (unlikely(le32_to_cpu(ma->ma_lmm->lmm_pattern) &
+ LOV_PATTERN_F_HOLE) &&
+ !(exp_connect_flags(info->mti_exp) &
+ OBD_CONNECT_LFSCK)) {
+ return -EIO;
+ } else {
+ ma->ma_lmm_size = rc;
+ ma->ma_valid |= MA_LOV;
+ }
} else if (strcmp(name, XATTR_NAME_LMV) == 0) {
+ if (info->mti_big_lmm_used)
+ ma->ma_lmv = info->mti_big_lmm;
+
ma->ma_lmv_size = rc;
ma->ma_valid |= MA_LMV;
} else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
ma->ma_valid |= MA_LMV_DEF;
}
+ /* Update mdt_max_mdsize so all clients will be aware of it. */
+ if (info->mti_mdt->mdt_max_mdsize < rc)
+ info->mti_mdt->mdt_max_mdsize = rc;
+
rc = 0;
} else if (rc == -ENODATA) {
/* no LOV EA */
rc = mdt_big_xattr_get(info, o, name);
if (rc > 0) {
info->mti_big_lmm_used = 1;
- if (!strcmp(name, XATTR_NAME_LOV)) {
- ma->ma_valid |= MA_LOV;
- ma->ma_lmm = info->mti_big_lmm;
- ma->ma_lmm_size = rc;
- } else if (!strcmp(name, XATTR_NAME_LMV)) {
- ma->ma_valid |= MA_LMV;
- ma->ma_lmv = info->mti_big_lmm;
- ma->ma_lmv_size = rc;
- } else {
- return -EINVAL;
- }
-
- /* update mdt_max_mdsize so all clients
- * will be aware about that */
- if (info->mti_mdt->mdt_max_mdsize < rc)
- info->mti_mdt->mdt_max_mdsize = rc;
- rc = 0;
+ goto got;
}
}
}
run_test 19b "OST-object inconsistency self repair"
+test_20() {
+ [ $OSTCOUNT -lt 2 ] &&
+ skip "The test needs at least 2 OSTs" && return
+
+ echo "#####"
+ echo "The target MDT-object and some of its OST-object are lost."
+ echo "The LFSCK should find out the left OST-objects and re-create"
+ echo "the MDT-object under the direcotry .lustre/lost+found/MDTxxxx/"
+ echo "with the partial OST-objects (LOV EA hole)."
+
+ echo "New client can access the file with LOV EA hole via normal"
+ echo "system tools or commands without crash the system."
+
+ echo "For old client, even though it cannot access the file with"
+ echo "LOV EA hole, it should not cause the system crash."
+ echo "#####"
+
+ check_mount_and_prep
+ $LFS mkdir -i 0 $DIR/$tdir/a1
+ if [ $OSTCOUNT -gt 2 ]; then
+ $LFS setstripe -c 3 -i 0 -s 1M $DIR/$tdir/a1
+ bcount=513
+ else
+ $LFS setstripe -c 2 -i 0 -s 1M $DIR/$tdir/a1
+ bcount=257
+ fi
+
+ # 256 blocks on the stripe0.
+ # 1 block on the stripe1 for 2 OSTs case.
+ # 256 blocks on the stripe1 for other cases.
+ # 1 block on the stripe2 if OSTs > 2
+ dd if=/dev/zero of=$DIR/$tdir/a1/f0 bs=4096 count=$bcount
+ dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=4096 count=$bcount
+ dd if=/dev/zero of=$DIR/$tdir/a1/f2 bs=4096 count=$bcount
+
+ local fid0=$($LFS path2fid $DIR/$tdir/a1/f0)
+ local fid1=$($LFS path2fid $DIR/$tdir/a1/f1)
+ local fid2=$($LFS path2fid $DIR/$tdir/a1/f2)
+
+ echo ${fid0}
+ $LFS getstripe $DIR/$tdir/a1/f0
+ echo ${fid1}
+ $LFS getstripe $DIR/$tdir/a1/f1
+ echo ${fid2}
+ $LFS getstripe $DIR/$tdir/a1/f2
+
+ if [ $OSTCOUNT -gt 2 ]; then
+ dd if=/dev/zero of=$DIR/$tdir/a1/f3 bs=4096 count=$bcount
+ fid3=$($LFS path2fid $DIR/$tdir/a1/f3)
+ echo ${fid3}
+ $LFS getstripe $DIR/$tdir/a1/f3
+ fi
+
+ cancel_lru_locks osc
+
+ echo "Inject failure..."
+ echo "To simulate f0 lost MDT-object"
+ #define OBD_FAIL_LFSCK_LOST_MDTOBJ 0x1616
+ do_facet mds1 $LCTL set_param fail_loc=0x1616
+ rm -f $DIR/$tdir/a1/f0
+
+ echo "To simulate f1 lost MDT-object and OST-object0"
+ #define OBD_FAIL_LFSCK_LOST_SPEOBJ 0x161a
+ do_facet mds1 $LCTL set_param fail_loc=0x161a
+ rm -f $DIR/$tdir/a1/f1
+
+ echo "To simulate f2 lost MDT-object and OST-object1"
+ do_facet mds1 $LCTL set_param fail_val=1
+ rm -f $DIR/$tdir/a1/f2
+
+ if [ $OSTCOUNT -gt 2 ]; then
+ echo "To simulate f3 lost MDT-object and OST-object2"
+ do_facet mds1 $LCTL set_param fail_val=2
+ rm -f $DIR/$tdir/a1/f3
+ fi
+
+ umount_client $MOUNT
+ sync
+ sleep 2
+ do_facet mds1 $LCTL set_param fail_loc=0 fail_val=0
+
+ echo "Inject failure to slow down the LFSCK on OST0"
+ #define OBD_FAIL_LFSCK_DELAY5 0x161b
+ do_facet ost1 $LCTL set_param fail_loc=0x161b
+
+ echo "Trigger layout LFSCK on all devices to find out orphan OST-object"
+ $START_LAYOUT -r -o || error "(1) Fail to start LFSCK for layout!"
+
+ sleep 3
+ do_facet ost1 $LCTL set_param fail_loc=0
+
+ for k in $(seq $MDSCOUNT); do
+ # The LFSCK status query interval is 30 seconds. In case some
+ # LFSCK_NOTIFY RPCs fail or are lost, wait long enough to
+ # guarantee that the status is synced up.
+ wait_update_facet mds${k} "$LCTL get_param -n \
+ mdd.$(facet_svc mds${k}).lfsck_layout |
+ awk '/^status/ { print \\\$2 }'" "completed" 32 ||
+ error "(2) MDS${k} is not the expected 'completed'"
+ done
+
+ for k in $(seq $OSTCOUNT); do
+ local cur_status=$(do_facet ost${k} $LCTL get_param -n \
+ obdfilter.$(facet_svc ost${k}).lfsck_layout |
+ awk '/^status/ { print $2 }')
+ [ "$cur_status" == "completed" ] ||
+ error "(3) OST${k} Expect 'completed', but got '$cur_status'"
+ done
+
+ local repaired=$(do_facet mds1 $LCTL get_param -n \
+ mdd.$(facet_svc mds1).lfsck_layout |
+ awk '/^repaired_orphan/ { print $2 }')
+ if [ $OSTCOUNT -gt 2 ]; then
+ [ $repaired -eq 9 ] ||
+ error "(4.1) Expect 9 fixed on mds1, but got: $repaired"
+ else
+ [ $repaired -eq 4 ] ||
+ error "(4.2) Expect 4 fixed on mds1, but got: $repaired"
+ fi
+
+ mount_client $MOUNT || error "(5.0) Fail to start client!"
+
+ LOV_PATTERN_F_HOLE=0x40000000
+
+ #
+ # R-${fid0} is the old f0
+ #
+ local name="$MOUNT/.lustre/lost+found/MDT0000/R-${fid0}"
+ echo "Check $name, which is the old f0"
+
+ $LFS getstripe -v $name || error "(5.1) cannot getstripe on $name"
+
+ local pattern=0x$($LFS getstripe -L $name)
+ [[ $((pattern & LOV_PATTERN_F_HOLE)) -eq 0 ]] ||
+ error "(5.2) NOT expect pattern flag hole, but got $pattern"
+
+ local stripes=$($LFS getstripe -c $name)
+ if [ $OSTCOUNT -gt 2 ]; then
+ [ $stripes -eq 3 ] ||
+ error "(5.3.1) expect the stripe count is 3, but got $stripes"
+ else
+ [ $stripes -eq 2 ] ||
+ error "(5.3.2) expect the stripe count is 2, but got $stripes"
+ fi
+
+ local size=$(stat $name | awk '/Size:/ { print $2 }')
+ [ $size -eq $((4096 * $bcount)) ] ||
+ error "(5.4) expect the size $((4096 * $bcount)), but got $size"
+
+ cat $name > /dev/null || error "(5.5) cannot read $name"
+
+ echo "dummy" >> $name || error "(5.6) cannot write $name"
+
+ chown $RUNAS_ID:$RUNAS_GID $name || error "(5.7) cannot chown on $name"
+
+ touch $name || error "(5.8) cannot touch $name"
+
+ rm -f $name || error "(5.9) cannot unlink $name"
+
+ #
+ # R-${fid1} contains the old f1's stripe1 (and stripe2 if OSTs > 2)
+ #
+ name="$MOUNT/.lustre/lost+found/MDT0000/R-${fid1}"
+ if [ $OSTCOUNT -gt 2 ]; then
+ echo "Check $name, it contains the old f1's stripe1 and stripe2"
+ else
+ echo "Check $name, it contains the old f1's stripe1"
+ fi
+
+ $LFS getstripe -v $name || error "(6.1) cannot getstripe on $name"
+
+ pattern=0x$($LFS getstripe -L $name)
+ [[ $((pattern & LOV_PATTERN_F_HOLE)) -ne 0 ]] ||
+ error "(6.2) expect pattern flag hole, but got $pattern"
+
+ stripes=$($LFS getstripe -c $name)
+ if [ $OSTCOUNT -gt 2 ]; then
+ [ $stripes -eq 3 ] ||
+ error "(6.3.1) expect the stripe count is 3, but got $stripes"
+ else
+ [ $stripes -eq 2 ] ||
+ error "(6.3.2) expect the stripe count is 2, but got $stripes"
+ fi
+
+ size=$(stat $name | awk '/Size:/ { print $2 }')
+ [ $size -eq $((4096 * $bcount)) ] ||
+ error "(6.4) expect the size $((4096 * $bcount)), but got $size"
+
+ cat $name > /dev/null && error "(6.5) normal read $name should fail"
+
+ local failures=$(dd if=$name of=$DIR/$tdir/dump conv=sync,noerror \
+ bs=4096 2>&1 | grep "Input/output error" | wc -l)
+
+ # stripe0 is dummy
+ [ $failures -eq 256 ] ||
+ error "(6.6) expect 256 IO failures, but get $failures"
+
+ size=$(stat $DIR/$tdir/dump | awk '/Size:/ { print $2 }')
+ [ $size -eq $((4096 * $bcount)) ] ||
+ error "(6.7) expect the size $((4096 * $bcount)), but got $size"
+
+ dd if=/dev/zero of=$name conv=sync,notrunc bs=4096 count=1 &&
+ error "(6.8) write to the LOV EA hole should fail"
+
+ dd if=/dev/zero of=$name conv=sync,notrunc bs=4096 count=1 seek=300 ||
+ error "(6.9) write to normal stripe should NOT fail"
+
+ echo "foo" >> $name && error "(6.10) append write $name should fail"
+
+ chown $RUNAS_ID:$RUNAS_GID $name || error "(6.11) cannot chown on $name"
+
+ touch $name || error "(6.12) cannot touch $name"
+
+ rm -f $name || error "(6.13) cannot unlink $name"
+
+ #
+ # R-${fid2} contains the old f2's stripe0 (and stripe2 if OSTs > 2)
+ #
+ name="$MOUNT/.lustre/lost+found/MDT0000/R-${fid2}"
+ if [ $OSTCOUNT -gt 2 ]; then
+ echo "Check $name, it contains the old f2's stripe0 and stripe2"
+ else
+ echo "Check $name, it contains the old f2's stripe0"
+ fi
+
+ $LFS getstripe -v $name || error "(7.1) cannot getstripe on $name"
+
+ pattern=0x$($LFS getstripe -L $name)
+ stripes=$($LFS getstripe -c $name)
+ size=$(stat $name | awk '/Size:/ { print $2 }')
+ if [ $OSTCOUNT -gt 2 ]; then
+ [[ $((pattern & LOV_PATTERN_F_HOLE)) -ne 0 ]] ||
+ error "(7.2.1) expect pattern flag hole, but got $pattern"
+
+ [ $stripes -eq 3 ] ||
+ error "(7.3.1) expect the stripe count is 3, but got $stripes"
+
+ [ $size -eq $((4096 * $bcount)) ] ||
+ error "(7.4.1) expect size $((4096 * $bcount)), but got $size"
+
+ cat $name > /dev/null &&
+ error "(7.5.1) normal read $name should fail"
+
+ failures=$(dd if=$name of=$DIR/$tdir/dump conv=sync,noerror \
+ bs=4096 2>&1 | grep "Input/output error" | wc -l)
+ # stripe1 is dummy
+ [ $failures -eq 256 ] ||
+ error "(7.6) expect 256 IO failures, but get $failures"
+
+ size=$(stat $DIR/$tdir/dump | awk '/Size:/ { print $2 }')
+ [ $size -eq $((4096 * $bcount)) ] ||
+ error "(7.7) expect the size $((4096 * $bcount)), but got $size"
+
+ dd if=/dev/zero of=$name conv=sync,notrunc bs=4096 count=1 \
+ seek=300 && error "(7.8.0) write to the LOV EA hole should fail"
+
+ dd if=/dev/zero of=$name conv=sync,notrunc bs=4096 count=1 ||
+ error "(7.8.1) write to normal stripe should NOT fail"
+
+ echo "foo" >> $name &&
+ error "(7.8.3) append write $name should fail"
+
+ chown $RUNAS_ID:$RUNAS_GID $name ||
+ error "(7.9.1) cannot chown on $name"
+
+ touch $name || error "(7.10.1) cannot touch $name"
+ else
+ [[ $((pattern & LOV_PATTERN_F_HOLE)) -eq 0 ]] ||
+ error "(7.2.2) NOT expect pattern flag hole, but got $pattern"
+
+ [ $stripes -eq 1 ] ||
+ error "(7.3.2) expect the stripe count is 1, but got $stripes"
+
+ # stripe1 is dummy
+ [ $size -eq $((4096 * (256 + 0))) ] ||
+ error "(7.4.2) expect the size $((4096 * 256)), but got $size"
+
+ cat $name > /dev/null || error "(7.5.2) cannot read $name"
+
+ echo "dummy" >> $name || error "(7.8.2) cannot write $name"
+
+ chown $RUNAS_ID:$RUNAS_GID $name ||
+ error "(7.9.2) cannot chown on $name"
+
+ touch $name || error "(7.10.2) cannot touch $name"
+ fi
+
+ rm -f $name || error "(7.11) cannot unlink $name"
+
+ [ $OSTCOUNT -le 2 ] && return
+
+ #
+ # R-${fid3} should contain the old f3's stripe0 and stripe1
+ #
+ name="$MOUNT/.lustre/lost+found/MDT0000/R-${fid3}"
+ echo "Check $name, which contains the old f3's stripe0 and stripe1"
+
+ $LFS getstripe -v $name || error "(8.1) cannot getstripe on $name"
+
+ pattern=0x$($LFS getstripe -L $name)
+ [[ $((pattern & LOV_PATTERN_F_HOLE)) -eq 0 ]] ||
+ error "(8.2) NOT expect pattern flag hole, but got $pattern"
+
+ stripes=$($LFS getstripe -c $name)
+ # LFSCK does not know the old f3 had 3 stripes.
+ # It only tries to find as much as possible.
+ # The stripe count depends on the last stripe's offset.
+ [ $stripes -eq 2 ] ||
+ error "(8.3) expect the stripe count is 2, but got $stripes"
+
+ size=$(stat $name | awk '/Size:/ { print $2 }')
+ # stripe2 is lost
+ [ $size -eq $((4096 * (256 + 256 + 0))) ] ||
+ error "(8.4) expect the size $((4096 * 512)), but got $size"
+
+ cat $name > /dev/null || error "(8.5) cannot read $name"
+
+ echo "dummy" >> $name || error "(8.6) cannot write $name"
+
+ chown $RUNAS_ID:$RUNAS_GID $name ||
+ error "(8.7) cannot chown on $name"
+
+ touch $name || error "(8.8) cannot touch $name"
+
+ rm -f $name || error "(8.9) cannot unlink $name"
+}
+run_test 20 "Handle the orphan with dummy LOV EA slot properly"
+
$LCTL set_param debug=-lfsck > /dev/null || true
# restore MDS/OST size