Whamcloud - gitweb
LU-11907 dne: allow access to striped dir with broken layout 50/34750/4
authorLai Siyao <lai.siyao@whamcloud.com>
Sun, 14 Apr 2019 20:12:54 +0000 (04:12 +0800)
committerOleg Drokin <green@whamcloud.com>
Wed, 29 May 2019 04:24:55 +0000 (04:24 +0000)
Sometimes the layout of striped directories may become broken:
* creation/unlink is partially executed on some MDT.
* disk failure or stopped MDS cause some stripe inaccessible.
* software bugs.

In this situation, this directory should still be accessible,
and specially be able to migrate to other active MDTs.

This patch add this support on both server and client: don't
imply stripe FID is sane, and when stripe doesn't exist, skip
it.

Add OBD_FAIL_MDS_STRIPE_FID to simulate insane stripe FID, and
OBD_FAIL_MDS_STRIPE_CREATE to simulate stripe creation failure.

Add sanity 60h.

Signed-off-by: Lai Siyao <lai.siyao@whamcloud.com>
Change-Id: I8a05a0e0cef8b051a935b3fa3d3e26c0b6ef3b4a
Reviewed-on: https://review.whamcloud.com/34750
Tested-by: Jenkins
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Hongchao Zhang <hongchao@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/obd_support.h
lustre/llite/dir.c
lustre/llite/llite_lib.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_obd.c
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdt/mdt_reint.c
lustre/tests/sanity.sh

index be2b3bb..0aea7c2 100644 (file)
@@ -266,6 +266,8 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS 0x185
 #define OBD_FAIL_MDS_GET_INFO_NET        0x186
 #define OBD_FAIL_MDS_DQACQ_NET           0x187
+#define OBD_FAIL_MDS_STRIPE_CREATE      0x188
+#define OBD_FAIL_MDS_STRIPE_FID                 0x189
 
 /* OI scrub */
 #define OBD_FAIL_OSD_SCRUB_DELAY                       0x190
index 9fff515..914fff0 100644 (file)
@@ -342,11 +342,13 @@ static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
                GOTO(out, rc = 0);
 
        if (unlikely(ll_i2info(inode)->lli_lsm_md != NULL)) {
-               /* This is only needed for striped dir to fill ..,
-                * see lmv_read_entry */
+               /*
+                * This is only needed for striped dir to fill ..,
+                * see lmv_read_page()
+                */
                if (file_dentry(filp)->d_parent != NULL &&
                    file_dentry(filp)->d_parent->d_inode != NULL) {
-                       __u64 ibits = MDS_INODELOCK_UPDATE;
+                       __u64 ibits = MDS_INODELOCK_LOOKUP;
                        struct inode *parent =
                                file_dentry(filp)->d_parent->d_inode;
 
@@ -1587,12 +1589,15 @@ out:
                        struct lu_fid   fid;
 
                        fid_le_to_cpu(&fid, &lmm->lmv_md_v1.lmv_stripe_fids[i]);
-                       mdt_index = ll_get_mdt_idx_by_fid(sbi, &fid);
-                       if (mdt_index < 0)
-                               GOTO(out_tmp, rc = mdt_index);
+                       if (fid_is_sane(&fid)) {
+                               mdt_index = ll_get_mdt_idx_by_fid(sbi, &fid);
+                               if (mdt_index < 0)
+                                       GOTO(out_tmp, rc = mdt_index);
+
+                               tmp->lum_objects[i].lum_mds = mdt_index;
+                               tmp->lum_objects[i].lum_fid = fid;
+                       }
 
-                       tmp->lum_objects[i].lum_mds = mdt_index;
-                       tmp->lum_objects[i].lum_fid = fid;
                        tmp->lum_stripe_count++;
                }
 
index 11d6169..ff64d19 100644 (file)
@@ -1362,6 +1362,10 @@ static int ll_init_lsm_md(struct inode *inode, struct lustre_md *md)
        for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
                fid = &lsm->lsm_md_oinfo[i].lmo_fid;
                LASSERT(lsm->lsm_md_oinfo[i].lmo_root == NULL);
+
+               if (!fid_is_sane(fid))
+                       continue;
+
                /* Unfortunately ll_iget will call ll_update_inode,
                 * where the initialization of slave inode is slightly
                 * different, so it reset lsm_md to NULL to avoid
index bb94c26..75be491 100644 (file)
@@ -156,13 +156,14 @@ int lmv_revalidate_slaves(struct obd_export *exp,
                          ldlm_blocking_callback cb_blocking,
                          int extra_lock_flags)
 {
-       struct obd_device      *obd = exp->exp_obd;
-       struct lmv_obd         *lmv = &obd->u.lmv;
-       struct ptlrpc_request   *req = NULL;
-       struct mdt_body         *body;
-       struct md_op_data      *op_data;
-       int                     i;
-       int                     rc = 0;
+       struct obd_device *obd = exp->exp_obd;
+       struct lmv_obd *lmv = &obd->u.lmv;
+       struct ptlrpc_request *req = NULL;
+       struct mdt_body *body;
+       struct md_op_data *op_data;
+       int i;
+       int valid_stripe_count = 0;
+       int rc = 0;
 
        ENTRY;
 
@@ -188,6 +189,9 @@ int lmv_revalidate_slaves(struct obd_export *exp,
                fid = lsm->lsm_md_oinfo[i].lmo_fid;
                inode = lsm->lsm_md_oinfo[i].lmo_root;
 
+               if (!inode)
+                       continue;
+
                /*
                 * Prepare op_data for revalidating. Note that @fid2 shluld be
                 * defined otherwise it will go to server and take new lock
@@ -211,6 +215,12 @@ int lmv_revalidate_slaves(struct obd_export *exp,
 
                rc = md_intent_lock(tgt->ltd_exp, op_data, &it, &req,
                                    cb_blocking, extra_lock_flags);
+               if (rc == -ENOENT) {
+                       /* skip stripe is not exists */
+                       rc = 0;
+                       continue;
+               }
+
                if (rc < 0)
                        GOTO(cleanup, rc);
 
@@ -246,17 +256,22 @@ int lmv_revalidate_slaves(struct obd_export *exp,
                        ldlm_lock_decref(lockh, it.it_lock_mode);
                        it.it_lock_mode = 0;
                }
+
+               valid_stripe_count++;
        }
 
 cleanup:
        if (req != NULL)
                ptlrpc_req_finished(req);
 
+       /* if all stripes are invalid, return -ENOENT to notify user */
+       if (!rc && !valid_stripe_count)
+               rc = -ENOENT;
+
        OBD_FREE_PTR(op_data);
        RETURN(rc);
 }
 
-
 /*
  * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
  * may be split dir.
index be2691e..bd54773 100644 (file)
@@ -2441,6 +2441,11 @@ static struct lu_dirent *stripe_dirent_load(struct lmv_dir_ctxt *ctxt,
                }
 
                oinfo = &op_data->op_mea1->lsm_md_oinfo[stripe_index];
+               if (!oinfo->lmo_root) {
+                       rc = -ENOENT;
+                       break;
+               }
+
                tgt = lmv_get_target(ctxt->ldc_lmv, oinfo->lmo_mds, NULL);
                if (IS_ERR(tgt)) {
                        rc = PTR_ERR(tgt);
@@ -3001,10 +3006,22 @@ static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
        for (i = 0; i < stripe_count; i++) {
                fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid,
                              &lmm1->lmv_stripe_fids[i]);
+               /*
+                * set default value -1, so lmv_locate_tgt() knows this stripe
+                * target is not initialized.
+                */
+               lsm->lsm_md_oinfo[i].lmo_mds = (u32)-1;
+               if (!fid_is_sane(&lsm->lsm_md_oinfo[i].lmo_fid))
+                       continue;
+
                rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid,
                                    &lsm->lsm_md_oinfo[i].lmo_mds);
-               if (rc != 0)
+               if (rc == -ENOENT)
+                       continue;
+
+               if (rc)
                        RETURN(rc);
+
                CDEBUG(D_INFO, "unpack fid #%d "DFID"\n", i,
                       PFID(&lsm->lsm_md_oinfo[i].lmo_fid));
        }
@@ -3038,8 +3055,10 @@ static int lmv_unpackmd(struct obd_export *exp, struct lmv_stripe_md **lsmp,
                        RETURN(0);
                }
 
-               for (i = 0; i < lsm->lsm_md_stripe_count; i++)
-                       iput(lsm->lsm_md_oinfo[i].lmo_root);
+               for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
+                       if (lsm->lsm_md_oinfo[i].lmo_root)
+                               iput(lsm->lsm_md_oinfo[i].lmo_root);
+               }
                lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
                OBD_FREE(lsm, lsm_size);
                *lsmp = NULL;
@@ -3400,6 +3419,9 @@ static int lmv_merge_attr(struct obd_export *exp,
        for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
                struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root;
 
+               if (!inode)
+                       continue;
+
                CDEBUG(D_INFO,
                       "" DFID " size %llu, blocks %llu nlink %u, atime %lld ctime %lld, mtime %lld.\n",
                       PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
index 7224f4c..43437f9 100644 (file)
@@ -377,15 +377,24 @@ static struct dt_index_operations lod_index_ops = {
 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
                                         struct dt_object *dt, __u32 attr)
 {
-       struct lod_object       *lo = lod_dt_obj(dt);
-       struct dt_object        *next;
-       struct lod_it           *it = &lod_env_info(env)->lti_it;
-       struct dt_it            *it_next;
-       ENTRY;
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next;
+       struct lod_it *it = &lod_env_info(env)->lti_it;
+       struct dt_it *it_next;
+       __u16 index = 0;
 
        LASSERT(lo->ldo_dir_stripe_count > 0);
-       next = lo->ldo_stripe[0];
-       LASSERT(next != NULL);
+
+       do {
+               next = lo->ldo_stripe[index];
+               if (next && dt_object_exists(next))
+                       break;
+       } while (++index < lo->ldo_dir_stripe_count);
+
+       /* no valid stripe */
+       if (!next || !dt_object_exists(next))
+               return ERR_PTR(-ENODEV);
+
        LASSERT(next->do_index_ops != NULL);
 
        it_next = next->do_index_ops->dio_it.init(env, next, attr);
@@ -398,7 +407,7 @@ static struct dt_it *lod_striped_it_init(const struct lu_env *env,
         * additional ones */
        LASSERT(it->lit_obj == NULL);
 
-       it->lit_stripe_index = 0;
+       it->lit_stripe_index = index;
        it->lit_attr = attr;
        it->lit_it = it_next;
        it->lit_obj = dt;
@@ -433,10 +442,10 @@ static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
                LOD_CHECK_STRIPED_IT(env, it, lo);
 
                next = lo->ldo_stripe[it->lit_stripe_index];
-               LASSERT(next != NULL);
-               LASSERT(next->do_index_ops != NULL);
-
-               next->do_index_ops->dio_it.fini(env, it->lit_it);
+               if (next) {
+                       LASSERT(next->do_index_ops != NULL);
+                       next->do_index_ops->dio_it.fini(env, it->lit_it);
+               }
        }
 
        /* the iterator not in use any more */
@@ -457,15 +466,15 @@ static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
                              const struct dt_key *key)
 {
-       const struct lod_it     *it = (const struct lod_it *)di;
-       struct lod_object       *lo = lod_dt_obj(it->lit_obj);
-       struct dt_object        *next;
-       ENTRY;
+       const struct lod_it *it = (const struct lod_it *)di;
+       struct lod_object *lo = lod_dt_obj(it->lit_obj);
+       struct dt_object *next;
 
        LOD_CHECK_STRIPED_IT(env, it, lo);
 
        next = lo->ldo_stripe[it->lit_stripe_index];
        LASSERT(next != NULL);
+       LASSERT(dt_object_exists(next));
        LASSERT(next->do_index_ops != NULL);
 
        return next->do_index_ops->dio_it.get(env, it->lit_it, key);
@@ -480,9 +489,16 @@ static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
  */
 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
 {
-       struct lod_it           *it = (struct lod_it *)di;
-       struct lod_object       *lo = lod_dt_obj(it->lit_obj);
-       struct dt_object        *next;
+       struct lod_it *it = (struct lod_it *)di;
+       struct lod_object *lo = lod_dt_obj(it->lit_obj);
+       struct dt_object *next;
+
+       /*
+        * If lit_it == NULL, then it means the sub_it has been finished,
+        * which only happens in failure cases, see lod_striped_it_next()
+        */
+       if (!it->lit_it)
+               return;
 
        LOD_CHECK_STRIPED_IT(env, it, lo);
 
@@ -503,17 +519,20 @@ static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
  */
 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
 {
-       struct lod_it           *it = (struct lod_it *)di;
-       struct lod_object       *lo = lod_dt_obj(it->lit_obj);
-       struct dt_object        *next;
-       struct dt_it            *it_next;
-       int                     rc;
+       struct lod_it *it = (struct lod_it *)di;
+       struct lod_object *lo = lod_dt_obj(it->lit_obj);
+       struct dt_object *next;
+       struct dt_it *it_next;
+       __u32 index;
+       int rc;
+
        ENTRY;
 
        LOD_CHECK_STRIPED_IT(env, it, lo);
 
        next = lo->ldo_stripe[it->lit_stripe_index];
        LASSERT(next != NULL);
+       LASSERT(dt_object_exists(next));
        LASSERT(next->do_index_ops != NULL);
 again:
        rc = next->do_index_ops->dio_it.next(env, it->lit_it);
@@ -546,33 +565,44 @@ again:
                RETURN(rc);
        }
 
-       /* go to next stripe */
-       if (it->lit_stripe_index + 1 >= lo->ldo_dir_stripe_count)
-               RETURN(1);
-
-       it->lit_stripe_index++;
-
        next->do_index_ops->dio_it.put(env, it->lit_it);
        next->do_index_ops->dio_it.fini(env, it->lit_it);
        it->lit_it = NULL;
 
-       next = lo->ldo_stripe[it->lit_stripe_index];
-       LASSERT(next != NULL);
-       rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
-       if (rc != 0)
-               RETURN(rc);
+       /* go to next stripe */
+       index = it->lit_stripe_index;
+       while (++index < lo->ldo_dir_stripe_count) {
+               next = lo->ldo_stripe[index];
+               if (!next)
+                       continue;
 
-       LASSERT(next->do_index_ops != NULL);
+               if (!dt_object_exists(next))
+                       continue;
+
+               rc = next->do_ops->do_index_try(env, next,
+                                               &dt_directory_features);
+               if (rc != 0)
+                       RETURN(rc);
+
+               LASSERT(next->do_index_ops != NULL);
+
+               it_next = next->do_index_ops->dio_it.init(env, next,
+                                                         it->lit_attr);
+               if (IS_ERR(it_next))
+                       RETURN(PTR_ERR(it_next));
+
+               rc = next->do_index_ops->dio_it.get(env, it_next,
+                                                   (const struct dt_key *)"");
+               if (rc <= 0)
+                       RETURN(rc == 0 ? -EIO : rc);
 
-       it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr);
-       if (!IS_ERR(it_next)) {
                it->lit_it = it_next;
+               it->lit_stripe_index = index;
                goto again;
-       } else {
-               rc = PTR_ERR(it_next);
+
        }
 
-       RETURN(rc);
+       RETURN(1);
 }
 
 /**
@@ -956,7 +986,9 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
                int i;
 
                for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
-                       if (dt_object_exists(lo->ldo_stripe[i]) == 0)
+                       if (!lo->ldo_stripe[i])
+                               continue;
+                       if (!dt_object_exists(lo->ldo_stripe[i]))
                                continue;
                        rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
                                                lo->ldo_stripe[i], feat);
@@ -1241,6 +1273,8 @@ static int lod_declare_attr_set(const struct lu_env *env,
                for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
                        if (lo->ldo_stripe[i] == NULL)
                                continue;
+                       if (!dt_object_exists(lo->ldo_stripe[i]))
+                               continue;
                        rc = lod_sub_declare_attr_set(env, lo->ldo_stripe[i],
                                                      attr, th);
                        if (rc != 0)
@@ -1711,8 +1745,10 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
                __u32                   idx;
 
                fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
-               if (!fid_is_sane(fid))
-                       GOTO(out, rc = -ESTALE);
+               if (!fid_is_sane(fid)) {
+                       stripe[i] = NULL;
+                       continue;
+               }
 
                rc = lod_fld_lookup(env, lod, fid, &idx, &type);
                if (rc != 0)
@@ -1807,6 +1843,10 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env,
                struct linkea_data       ldata          = { NULL };
                struct lu_buf           linkea_buf;
 
+               /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */
+               if (!dto)
+                       continue;
+
                rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th);
                if (rc != 0)
                        GOTO(out, rc);
@@ -2034,6 +2074,11 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                 * in the above loop */
                LASSERT(tgt_dt != NULL);
                LASSERT(fid_is_sane(&fid));
+
+               /* fail a remote stripe FID allocation */
+               if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID))
+                       continue;
+
                conf.loc_flags = LOC_F_NEW;
                dto = dt_locate_at(env, tgt_dt, &fid,
                                   dt->do_lu.lo_dev->ld_site->ls_top_dev,
@@ -2217,7 +2262,7 @@ static int lod_dir_declare_layout_add(const struct lu_env *env,
                fid_le_to_cpu(fid,
                        &lmv->lmv_stripe_fids[i]);
                if (!fid_is_sane(fid))
-                       GOTO(out, rc = -ESTALE);
+                       continue;
 
                rc = lod_fld_lookup(env, lod, fid, &idx, &type);
                if (rc)
@@ -2338,7 +2383,8 @@ static int lod_dir_declare_layout_delete(const struct lu_env *env,
 
        for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
                dto = lo->ldo_stripe[i];
-               LASSERT(dto);
+               if (!dto)
+                       continue;
 
                if (!dt_try_as_dir(env, dto))
                        return -ENOTDIR;
@@ -2408,7 +2454,8 @@ static int lod_dir_layout_delete(const struct lu_env *env,
 
        for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
                dto = lo->ldo_stripe[i];
-               LASSERT(dto);
+               if (!dto)
+                       continue;
 
                rc = lod_sub_delete(env, dto,
                                    (const struct dt_key *)dotdot, th);
@@ -2490,7 +2537,11 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
                RETURN(0);
 
        for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
-               LASSERT(lo->ldo_stripe[i]);
+               if (!lo->ldo_stripe[i])
+                       continue;
+
+               if (!dt_object_exists(lo->ldo_stripe[i]))
+                       continue;
 
                rc = lod_sub_declare_xattr_set(env, lo->ldo_stripe[i],
                                               buf, name, fl, th);
@@ -3464,7 +3515,11 @@ static int lod_xattr_set_internal(const struct lu_env *env,
                RETURN(0);
 
        for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
-               LASSERT(lo->ldo_stripe[i]);
+               if (!lo->ldo_stripe[i])
+                       continue;
+
+               if (!dt_object_exists(lo->ldo_stripe[i]))
+                       continue;
 
                rc = lod_sub_xattr_set(env, lo->ldo_stripe[i], buf, name,
                                       fl, th);
@@ -3722,6 +3777,14 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                struct linkea_data ldata = { NULL };
                struct lu_buf linkea_buf;
 
+               /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */
+               if (!dto)
+                       continue;
+
+               /* fail a remote stripe creation */
+               if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE))
+                       continue;
+
                /* if it's source stripe of migrating directory, don't create */
                if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) &&
                      i >= lo->ldo_dir_migrate_offset)) {
@@ -4357,7 +4420,9 @@ static int lod_declare_xattr_del(const struct lu_env *env,
        for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
                struct dt_object *dto = lo->ldo_stripe[i];
 
-               LASSERT(dto);
+               if (!dto)
+                       continue;
+
                rc = lod_sub_declare_xattr_del(env, dto, name, th);
                if (rc != 0)
                        break;
@@ -4396,7 +4461,8 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
        for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
                struct dt_object *dto = lo->ldo_stripe[i];
 
-               LASSERT(dto);
+               if (!dto)
+                       continue;
 
                rc = lod_sub_xattr_del(env, dto, name, th);
                if (rc != 0)
@@ -5498,11 +5564,13 @@ lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo,
 static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt,
                               struct thandle *th)
 {
-       struct dt_object   *next = dt_object_child(dt);
-       struct lod_object  *lo = lod_dt_obj(dt);
+       struct dt_object *next = dt_object_child(dt);
+       struct lod_object *lo = lod_dt_obj(dt);
        struct lod_thread_info *info = lod_env_info(env);
-       char               *stripe_name = info->lti_key;
-       int                 rc, i;
+       struct dt_object *stripe;
+       char *stripe_name = info->lti_key;
+       int rc, i;
+
        ENTRY;
 
        /*
@@ -5522,13 +5590,17 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
 
                for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+                       stripe = lo->ldo_stripe[i];
+                       if (!stripe)
+                               continue;
+
                        rc = lod_sub_declare_ref_del(env, next, th);
                        if (rc != 0)
                                RETURN(rc);
 
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
-                               PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
-                               i);
+                       snprintf(stripe_name, sizeof(info->lti_key),
+                                DFID":%d",
+                                PFID(lu_object_fid(&stripe->do_lu)), i);
                        rc = lod_sub_declare_delete(env, next,
                                        (const struct dt_key *)stripe_name, th);
                        if (rc != 0)
@@ -5553,14 +5625,18 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt,
        /* declare destroy all striped objects */
        if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
                for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
-                       if (lo->ldo_stripe[i] == NULL)
+                       stripe = lo->ldo_stripe[i];
+                       if (!stripe)
                                continue;
 
-                       rc = lod_sub_declare_ref_del(env, lo->ldo_stripe[i],
-                                                    th);
+                       if (!dt_object_exists(stripe))
+                               continue;
 
-                       rc = lod_sub_declare_destroy(env, lo->ldo_stripe[i],
-                                                    th);
+                       rc = lod_sub_declare_ref_del(env, stripe, th);
+                       if (rc != 0)
+                               break;
+
+                       rc = lod_sub_declare_destroy(env, stripe, th);
                        if (rc != 0)
                                break;
                }
@@ -5590,9 +5666,11 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt,
        struct dt_object  *next = dt_object_child(dt);
        struct lod_object *lo = lod_dt_obj(dt);
        struct lod_thread_info *info = lod_env_info(env);
-       char               *stripe_name = info->lti_key;
-       unsigned int       i;
-       int                rc;
+       char *stripe_name = info->lti_key;
+       struct dt_object *stripe;
+       unsigned int i;
+       int rc;
+
        ENTRY;
 
        /* destroy sub-stripe of master object */
@@ -5603,17 +5681,20 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
 
                for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+                       stripe = lo->ldo_stripe[i];
+                       if (!stripe)
+                               continue;
+
                        rc = lod_sub_ref_del(env, next, th);
                        if (rc != 0)
                                RETURN(rc);
 
                        snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
-                               PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
-                               i);
+                               PFID(lu_object_fid(&stripe->do_lu)), i);
 
                        CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
                               PFID(lu_object_fid(&dt->do_lu)), stripe_name,
-                              PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
+                              PFID(lu_object_fid(&stripe->do_lu)));
 
                        rc = lod_sub_delete(env, next,
                                       (const struct dt_key *)stripe_name, th);
@@ -5636,20 +5717,22 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt,
        /* destroy all striped objects */
        if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
                for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
-                       if (lo->ldo_stripe[i] == NULL)
+                       stripe = lo->ldo_stripe[i];
+                       if (!stripe)
                                continue;
+
+                       if (!dt_object_exists(stripe))
+                               continue;
+
                        if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
                            i == cfs_fail_val) {
-                               dt_write_lock(env, lo->ldo_stripe[i],
-                                             MOR_TGT_CHILD);
-                               rc = lod_sub_ref_del(env, lo->ldo_stripe[i],
-                                                    th);
-                               dt_write_unlock(env, lo->ldo_stripe[i]);
+                               dt_write_lock(env, stripe, MOR_TGT_CHILD);
+                               rc = lod_sub_ref_del(env, stripe, th);
+                               dt_write_unlock(env, stripe);
                                if (rc != 0)
                                        break;
 
-                               rc = lod_sub_destroy(env, lo->ldo_stripe[i],
-                                                    th);
+                               rc = lod_sub_destroy(env, stripe, th);
                                if (rc != 0)
                                        break;
                        }
@@ -5757,8 +5840,10 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
         * NB, ha_count may not equal to ldo_dir_stripe_count, because dir
         * layout may change, e.g., shrink dir layout after migration.
         */
-       for (i = 0; i < lo->ldo_dir_stripe_count; i++)
-               dt_invalidate(env, lo->ldo_stripe[i]);
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+               if (lo->ldo_stripe[i])
+                       dt_invalidate(env, lo->ldo_stripe[i]);
+       }
 
        slave_locks_size = offsetof(typeof(*slave_locks),
                                    ha_handles[slave_locks->ha_count]);
@@ -5819,17 +5904,19 @@ static int lod_object_lock(const struct lu_env *env,
        for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
                struct lustre_handle lockh;
                struct ldlm_res_id *res_id;
+               struct dt_object *stripe;
+
+               stripe = lo->ldo_stripe[i];
+               if (!stripe)
+                       continue;
 
                res_id = &lod_env_info(env)->lti_res_id;
-               fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
-                                      res_id);
+               fid_build_reg_res_name(lu_object_fid(&stripe->do_lu), res_id);
                einfo->ei_res_id = res_id;
 
-               LASSERT(lo->ldo_stripe[i] != NULL);
-               if (dt_object_remote(lo->ldo_stripe[i])) {
+               if (dt_object_remote(stripe)) {
                        set_bit(i, (void *)slave_locks->ha_map);
-                       rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
-                                           einfo, policy);
+                       rc = dt_object_lock(env, stripe, &lockh, einfo, policy);
                } else {
                        struct ldlm_namespace *ns = einfo->ei_namespace;
                        ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
index b3fcc45..551c370 100644 (file)
@@ -385,8 +385,12 @@ static int mdd_dir_is_empty(const struct lu_env *env,
 
                iops->put(env, it);
                iops->fini(env, it);
-       } else
+       } else {
                result = PTR_ERR(it);
+               /* -ENODEV means no valid stripe */
+               if (result == -ENODEV)
+                       RETURN(0);
+       }
        RETURN(result);
 }
 
@@ -3441,6 +3445,9 @@ static int mdd_dir_iterate_stripes(const struct lu_env *env,
 
        for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
                fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
+               if (!fid_is_sane(fid))
+                       continue;
+
                stripe = mdd_object_find(env, mdd, fid);
                if (IS_ERR(stripe))
                        RETURN(PTR_ERR(stripe));
index 505b254..c97e718 100644 (file)
@@ -1639,6 +1639,9 @@ static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
        for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
                fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
 
+               if (!fid_is_sane(fid))
+                       continue;
+
                slave = mdt_object_find(info->mti_env, mdt, fid);
                if (IS_ERR(slave))
                        GOTO(out, rc = PTR_ERR(slave));
index cbdb6bb..8e94baa 100755 (executable)
@@ -6702,6 +6702,39 @@ test_60g() {
 }
 run_test 60g "transaction abort won't cause MDT hung"
 
+test_60h() {
+       [ $MDS1_VERSION -le $(version_code 2.12.52) ] ||
+               skip "Need MDS version at least 2.12.52"
+       [ $MDSCOUNT -le 2 ] || skip "Need >= 2 MDTs"
+
+       local f
+
+       #define OBD_FAIL_MDS_STRIPE_CREATE       0x188
+       #define OBD_FAIL_MDS_STRIPE_FID          0x189
+       for fail_loc in 0x80000188 0x80000189; do
+               do_facet mds1 "$LCTL set_param fail_loc=$fail_loc"
+               $LFS mkdir -c $MDSCOUNT -i 0 $DIR/$tdir-$fail_loc ||
+                       error "mkdir $dir-$fail_loc failed"
+               for i in {0..10}; do
+                       # create may fail on missing stripe
+                       echo $i > $DIR/$tdir-$fail_loc/$i
+               done
+               $LFS getdirstripe $DIR/$tdir-$fail_loc ||
+                       error "getdirstripe $tdir-$fail_loc failed"
+               $LFS migrate -m 1 $DIR/$tdir-$fail_loc ||
+                       error "migrate $tdir-$fail_loc failed"
+               $LFS getdirstripe $DIR/$tdir-$fail_loc ||
+                       error "getdirstripe $tdir-$fail_loc failed"
+               pushd $DIR/$tdir-$fail_loc
+               for f in *; do
+                       echo $f | cmp $f - || error "$f data mismatch"
+               done
+               popd
+               rm -rf $DIR/$tdir-$fail_loc
+       done
+}
+run_test 60h "striped directory with missing stripes can be accessed"
+
 test_61a() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"