Whamcloud - gitweb
LU-5223 lmv: build master LMV EA dynamically build via readdir
[fs/lustre-release.git] / lustre / lod / lod_object.c
index d5d09b0..fb43a8d 100644 (file)
@@ -44,6 +44,7 @@
 #include <lustre_fid.h>
 #include <lustre_lmv.h>
 #include <md_object.h>
+#include <lustre_linkea.h>
 
 #include "lod_internal.h"
 
@@ -667,6 +668,201 @@ static struct dt_index_operations lod_striped_index_ops = {
 };
 
 /**
+ * Append the FID for each shard of the striped directory after the
+ * given LMV EA header.
+ *
+ * To simplify striped directory and the consistency verification,
+ * we only store the LMV EA header on disk, for both master object
+ * and slave objects. When someone wants to know the whole LMV EA,
+ * such as client readdir(), we can build the entrie LMV EA on the
+ * MDT side (in RAM) via iterating the sub-directory entries that
+ * are contained in the master object of the stripe directory.
+ *
+ * For the master object of the striped directroy, the valid name
+ * for each shard is composed of the ${shard_FID}:${shard_idx}.
+ *
+ * There may be holes in the LMV EA if some shards' name entries
+ * are corrupted or lost.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lo       pointer to the master object of the striped directory
+ * \param[in] buf      pointer to the lu_buf which will hold the LMV EA
+ * \param[in] resize   whether re-allocate the buffer if it is not big enough
+ *
+ * \retval             positive size of the LMV EA
+ * \retval             0 for nothing to be loaded
+ * \retval             negative error number on failure
+ */
+int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
+                       struct lu_buf *buf, bool resize)
+{
+       struct lu_dirent        *ent    =
+                       (struct lu_dirent *)lod_env_info(env)->lti_key;
+       struct lod_device       *lod    = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+       struct dt_object        *obj    = dt_object_child(&lo->ldo_obj);
+       struct lmv_mds_md_v1    *lmv1   = buf->lb_buf;
+       struct dt_it            *it;
+       const struct dt_it_ops  *iops;
+       __u32                    stripes;
+       __u32                    magic  = le32_to_cpu(lmv1->lmv_magic);
+       int                      size;
+       int                      rc;
+       ENTRY;
+
+       /* If it is not a striped directory, then load nothing. */
+       if (magic != LMV_MAGIC_V1)
+               RETURN(0);
+
+       /* If it is in migration (or failure), then load nothing. */
+       if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
+               RETURN(0);
+
+       stripes = le32_to_cpu(lmv1->lmv_stripe_count);
+       if (stripes < 1)
+               RETURN(0);
+
+       size = lmv_mds_md_size(stripes, magic);
+       if (buf->lb_len < size) {
+               struct lu_buf tbuf;
+
+               if (!resize)
+                       RETURN(-ERANGE);
+
+               tbuf = *buf;
+               buf->lb_buf = NULL;
+               buf->lb_len = 0;
+               lu_buf_alloc(buf, size);
+               lmv1 = buf->lb_buf;
+               if (lmv1 == NULL)
+                       RETURN(-ENOMEM);
+
+               memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len);
+       }
+
+       if (unlikely(!dt_try_as_dir(env, obj)))
+               RETURN(-ENOTDIR);
+
+       memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid));
+       iops = &obj->do_index_ops->dio_it;
+       it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
+       if (IS_ERR(it))
+               RETURN(PTR_ERR(it));
+
+       rc = iops->load(env, it, 0);
+       if (rc == 0)
+               rc = iops->next(env, it);
+       else if (rc > 0)
+               rc = 0;
+
+       while (rc == 0) {
+               char             name[FID_LEN + 2] = "";
+               struct lu_fid    fid;
+               __u32            index;
+               int              len;
+
+               rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
+               if (rc != 0)
+                       break;
+
+               rc = -EIO;
+
+               fid_le_to_cpu(&fid, &ent->lde_fid);
+               ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
+               if (ent->lde_name[0] == '.') {
+                       if (ent->lde_namelen == 1)
+                               goto next;
+
+                       if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
+                               goto next;
+               }
+
+               len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid));
+               /* The ent->lde_name is composed of ${FID}:${index} */
+               if (ent->lde_namelen < len + 1 ||
+                   memcmp(ent->lde_name, name, len) != 0) {
+                       CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
+                              "%s: invalid shard name %.*s with the FID "DFID
+                              " for the striped directory "DFID", %s\n",
+                              lod2obd(lod)->obd_name, ent->lde_namelen,
+                              ent->lde_name, PFID(&fid),
+                              PFID(lu_object_fid(&obj->do_lu)),
+                              lod->lod_lmv_failout ? "failout" : "skip");
+
+                       if (lod->lod_lmv_failout)
+                               break;
+
+                       goto next;
+               }
+
+               index = 0;
+               do {
+                       if (ent->lde_name[len] < '0' ||
+                           ent->lde_name[len] > '9') {
+                               CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
+                                      "%s: invalid shard name %.*s with the "
+                                      "FID "DFID" for the striped directory "
+                                      DFID", %s\n",
+                                      lod2obd(lod)->obd_name, ent->lde_namelen,
+                                      ent->lde_name, PFID(&fid),
+                                      PFID(lu_object_fid(&obj->do_lu)),
+                                      lod->lod_lmv_failout ?
+                                      "failout" : "skip");
+
+                               if (lod->lod_lmv_failout)
+                                       break;
+
+                               goto next;
+                       }
+
+                       index = index * 10 + ent->lde_name[len++] - '0';
+               } while (len < ent->lde_namelen);
+
+               if (len == ent->lde_namelen) {
+                       /* Out of LMV EA range. */
+                       if (index >= stripes) {
+                               CERROR("%s: the shard %.*s for the striped "
+                                      "directory "DFID" is out of the known "
+                                      "LMV EA range [0 - %u], failout\n",
+                                      lod2obd(lod)->obd_name, ent->lde_namelen,
+                                      ent->lde_name,
+                                      PFID(lu_object_fid(&obj->do_lu)),
+                                      stripes - 1);
+
+                               break;
+                       }
+
+                       /* The slot has been occupied. */
+                       if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) {
+                               struct lu_fid fid0;
+
+                               fid_le_to_cpu(&fid0,
+                                       &lmv1->lmv_stripe_fids[index]);
+                               CERROR("%s: both the shard "DFID" and "DFID
+                                      " for the striped directory "DFID
+                                      " claim the same LMV EA slot at the "
+                                      "index %d, failout\n",
+                                      lod2obd(lod)->obd_name,
+                                      PFID(&fid0), PFID(&fid),
+                                      PFID(lu_object_fid(&obj->do_lu)), index);
+
+                               break;
+                       }
+
+                       /* stored as LE mode */
+                       lmv1->lmv_stripe_fids[index] = ent->lde_fid;
+
+next:
+                       rc = iops->next(env, it);
+               }
+       }
+
+       iops->put(env, it);
+       iops->fini(env, it);
+
+       RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc);
+}
+
+/**
  * Implementation of dt_object_operations:: do_index_try
  *
  * This function will try to initialize the index api pointer for the
@@ -756,62 +952,11 @@ static int lod_attr_get(const struct lu_env *env,
                        struct lu_attr *attr,
                        struct lustre_capa *capa)
 {
-       struct lod_object *lo = lod_dt_obj(dt);
-       int i;
-       int rc;
-       ENTRY;
-
-       rc = dt_attr_get(env, dt_object_child(dt), attr, capa);
-       if (!S_ISDIR(dt->do_lu.lo_header->loh_attr) || rc != 0)
-               RETURN(rc);
-
-       rc = lod_load_striping_locked(env, lo);
-       if (rc)
-               RETURN(rc);
-
-       if (lo->ldo_stripenr == 0)
-               RETURN(rc);
-
-       attr->la_nlink = 2;
-       attr->la_size = 0;
-       for (i = 0; i < lo->ldo_stripenr; i++) {
-               struct lu_attr *sub_attr = &lod_env_info(env)->lti_attr;
-
-               LASSERT(lo->ldo_stripe[i]);
-               if (dt_object_exists(lo->ldo_stripe[i]))
-                       continue;
-
-               rc = dt_attr_get(env, lo->ldo_stripe[i], sub_attr, capa);
-               if (rc != 0)
-                       break;
-
-               /* -2 for . and .. on each stripe */
-               if (sub_attr->la_valid & LA_NLINK && attr->la_valid & LA_NLINK)
-                       attr->la_nlink += sub_attr->la_nlink - 2;
-               if (sub_attr->la_valid & LA_SIZE && attr->la_valid & LA_SIZE)
-                       attr->la_size += sub_attr->la_size;
-
-               if (sub_attr->la_valid & LA_ATIME &&
-                   attr->la_valid & LA_ATIME &&
-                   attr->la_atime < sub_attr->la_atime)
-                       attr->la_atime = sub_attr->la_atime;
-
-               if (sub_attr->la_valid & LA_CTIME &&
-                   attr->la_valid & LA_CTIME &&
-                   attr->la_ctime < sub_attr->la_ctime)
-                       attr->la_ctime = sub_attr->la_ctime;
-
-               if (sub_attr->la_valid & LA_MTIME &&
-                   attr->la_valid & LA_MTIME &&
-                   attr->la_mtime < sub_attr->la_mtime)
-                       attr->la_mtime = sub_attr->la_mtime;
-       }
-
-       CDEBUG(D_INFO, DFID" stripe_count %d nlink %u size "LPU64"\n",
-              PFID(lu_object_fid(&dt->do_lu)), lo->ldo_stripenr,
-              attr->la_nlink, attr->la_size);
-
-       RETURN(rc);
+       /* Note: for striped directory, client will merge attributes
+        * from all of the sub-stripes see lmv_merge_attr(), and there
+        * no MDD logic depend on directory nlink/size/time, so we can
+        * always use master inode nlink and size for now. */
+       return dt_attr_get(env, dt_object_child(dt), attr, capa);
 }
 
 /**
@@ -929,12 +1074,13 @@ static int lod_declare_attr_set(const struct lu_env *env,
         */
        LASSERT(lo->ldo_stripe);
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               LASSERT(lo->ldo_stripe[i]);
-
-               rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
-               if (rc) {
-                       CERROR("failed declaration: %d\n", rc);
-                       break;
+               if (likely(lo->ldo_stripe[i] != NULL)) {
+                       rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
+                                                handle);
+                       if (rc != 0) {
+                               CERROR("failed declaration: %d\n", rc);
+                               break;
+                       }
                }
        }
 
@@ -1003,13 +1149,16 @@ static int lod_attr_set(const struct lu_env *env,
         */
        LASSERT(lo->ldo_stripe);
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               LASSERT(lo->ldo_stripe[i]);
-               if (dt_object_exists(lo->ldo_stripe[i]) == 0)
-                       continue;
-               rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
-               if (rc) {
-                       CERROR("failed declaration: %d\n", rc);
-                       break;
+               if (likely(lo->ldo_stripe[i] != NULL)) {
+                       if (dt_object_exists(lo->ldo_stripe[i]) == 0)
+                               continue;
+
+                       rc = dt_attr_set(env, lo->ldo_stripe[i], attr,
+                                        handle, capa);
+                       if (rc != 0) {
+                               CERROR("failed declaration: %d\n", rc);
+                               break;
+                       }
                }
        }
 
@@ -1064,6 +1213,42 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
        rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
+       if (strcmp(name, XATTR_NAME_LMV) == 0) {
+               struct lmv_mds_md_v1    *lmv1;
+               int                      rc1 = 0;
+
+               if (rc > sizeof(*lmv1))
+                       RETURN(rc);
+
+               if (rc < sizeof(*lmv1))
+                       RETURN(rc = rc > 0 ? -EINVAL : rc);
+
+               if (buf->lb_buf == NULL || buf->lb_len == 0) {
+                       CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key));
+
+                       info->lti_buf.lb_buf = info->lti_key;
+                       info->lti_buf.lb_len = sizeof(*lmv1);
+                       rc = dt_xattr_get(env, dt_object_child(dt),
+                                         &info->lti_buf, name, capa);
+                       if (unlikely(rc != sizeof(*lmv1)))
+                               RETURN(rc = rc > 0 ? -EINVAL : rc);
+
+                       lmv1 = info->lti_buf.lb_buf;
+                       /* The on-disk LMV EA only contains header, but the
+                        * returned LMV EA size should contain the space for
+                        * the FIDs of all shards of the striped directory. */
+                       if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
+                               rc = lmv_mds_md_size(
+                                       le32_to_cpu(lmv1->lmv_stripe_count),
+                                       LMV_MAGIC_V1);
+               } else {
+                       rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt),
+                                                 buf, false);
+               }
+
+               RETURN(rc = rc1 != 0 ? rc1 : rc);
+       }
+
        if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
                RETURN(rc);
 
@@ -1127,8 +1312,7 @@ out:
 /**
  * Master LMVEA will be same as slave LMVEA, except
  * 1. different magic
- * 2. No lmv_stripe_fids on slave
- * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index.
+ * 2. lmv_master_mdt_index on slave LMV EA will be stripe_index.
  */
 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
                                  const struct lmv_mds_md_v1 *master_lmv)
@@ -1145,9 +1329,7 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
        struct lod_object       *lo = lod_dt_obj(dt);
        struct lmv_mds_md_v1    *lmm1;
        int                     stripe_count;
-       int                     lmm_size;
        int                     type = LU_SEQ_RANGE_ANY;
-       int                     i;
        int                     rc;
        __u32                   mdtidx;
        ENTRY;
@@ -1155,11 +1337,13 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
        LASSERT(lo->ldo_dir_striped != 0);
        LASSERT(lo->ldo_stripenr > 0);
        stripe_count = lo->ldo_stripenr;
-       lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
-       if (info->lti_ea_store_size < lmm_size) {
-               rc = lod_ea_store_resize(info, lmm_size);
+       /* Only store the LMV EA heahder on the disk. */
+       if (info->lti_ea_store_size < sizeof(*lmm1)) {
+               rc = lod_ea_store_resize(info, sizeof(*lmm1));
                if (rc != 0)
                        RETURN(rc);
+       } else {
+               memset(info->lti_ea_store, 0, sizeof(*lmm1));
        }
 
        lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
@@ -1172,18 +1356,8 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
                RETURN(rc);
 
        lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
-       fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu));
-       for (i = 0; i < lo->ldo_stripenr; i++) {
-               struct dt_object *dto;
-
-               dto = lo->ldo_stripe[i];
-               LASSERT(dto != NULL);
-               fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
-                             lu_object_fid(&dto->do_lu));
-       }
-
        lmv_buf->lb_buf = info->lti_ea_store;
-       lmv_buf->lb_len = lmm_size;
+       lmv_buf->lb_len = sizeof(*lmm1);
        lo->ldo_dir_striping_cached = 1;
 
        RETURN(rc);
@@ -1214,7 +1388,7 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
        if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
                RETURN(-EINVAL);
 
-       if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
+       if (le32_to_cpu(lmv1->lmv_stripe_count) < 1)
                RETURN(0);
 
        LASSERT(lo->ldo_stripe == NULL);
@@ -1427,8 +1601,11 @@ next:
                GOTO(out_put, rc = -EINVAL);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               struct dt_object *dto = stripe[i];
-               char             *stripe_name = info->lti_key;
+               struct dt_object        *dto            = stripe[i];
+               char                    *stripe_name    = info->lti_key;
+               struct lu_name          *sname;
+               struct linkea_data       ldata          = { 0 };
+               struct lu_buf            linkea_buf;
 
                rc = dt_declare_create(env, dto, attr, NULL, dof, th);
                if (rc != 0)
@@ -1472,9 +1649,9 @@ next:
                                cpu_to_le16(lo->ldo_def_stripe_offset);
                        v3->lmm_stripe_size =
                                cpu_to_le32(lo->ldo_def_stripe_size);
-                       if (lo->ldo_pool)
-                               strncpy(v3->lmm_pool_name, lo->ldo_pool,
-                                       LOV_MAXPOOLNAME);
+                       if (lo->ldo_pool != NULL)
+                               strlcpy(v3->lmm_pool_name, lo->ldo_pool,
+                                       sizeof(v3->lmm_pool_name));
 
                        info->lti_buf.lb_buf = v3;
                        info->lti_buf.lb_len = sizeof(*v3);
@@ -1495,6 +1672,23 @@ next:
 
                snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
                        PFID(lu_object_fid(&dto->do_lu)), i);
+
+               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
+               rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
+               if (rc != 0)
+                       GOTO(out_put, rc);
+
+               rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
+               if (rc != 0)
+                       GOTO(out_put, rc);
+
+               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+               linkea_buf.lb_len = ldata.ld_leh->leh_len;
+               rc = dt_declare_xattr_set(env, dto, &linkea_buf,
+                                         XATTR_NAME_LINK, 0, th);
+               if (rc != 0)
+                       GOTO(out_put, rc);
+
                rc = dt_declare_insert(env, dt_object_child(dt),
                     (const struct dt_rec *)lu_object_fid(&dto->do_lu),
                     (const struct dt_key *)stripe_name, th);
@@ -1605,8 +1799,12 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
        if (rc != 0)
                RETURN(rc);
 
-       if (lo->ldo_stripenr == 0)
-               RETURN(rc);
+       /* Note: Do not set LinkEA on sub-stripes, otherwise
+        * it will confuse the fid2path process(see mdt_path_current()).
+        * The linkEA between master and sub-stripes is set in
+        * lod_xattr_set_lmv(). */
+       if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
+               RETURN(0);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
@@ -1699,8 +1897,12 @@ static int lod_xattr_set_internal(const struct lu_env *env,
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
-       if (lo->ldo_stripenr == 0)
-               RETURN(rc);
+       /* Note: Do not set LinkEA on sub-stripes, otherwise
+        * it will confuse the fid2path process(see mdt_path_current()).
+        * The linkEA between master and sub-stripes is set in
+        * lod_xattr_set_lmv(). */
+       if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
+               RETURN(0);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
@@ -1764,7 +1966,7 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
        LASSERT(buf != NULL && buf->lb_buf != NULL);
        lum = buf->lb_buf;
 
-       rc = lod_verify_striping(d, buf, 0);
+       rc = lod_verify_striping(d, buf, false);
        if (rc)
                RETURN(rc);
 
@@ -1886,8 +2088,11 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
        slave_lmv_buf.lb_len = sizeof(*slave_lmm);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               struct dt_object *dto;
-               char             *stripe_name = info->lti_key;
+               struct dt_object        *dto;
+               char                    *stripe_name    = info->lti_key;
+               struct lu_name          *sname;
+               struct linkea_data       ldata          = { 0 };
+               struct lu_buf            linkea_buf;
 
                dto = lo->ldo_stripe[i];
                dt_write_lock(env, dto, MOR_TGT_CHILD);
@@ -1929,9 +2134,9 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                                cpu_to_le16(lo->ldo_def_stripe_offset);
                        v3->lmm_stripe_size =
                                cpu_to_le32(lo->ldo_def_stripe_size);
-                       if (lo->ldo_pool)
-                               strncpy(v3->lmm_pool_name, lo->ldo_pool,
-                                       LOV_MAXPOOLNAME);
+                       if (lo->ldo_pool != NULL)
+                               strlcpy(v3->lmm_pool_name, lo->ldo_pool,
+                                       sizeof(v3->lmm_pool_name));
 
                        info->lti_buf.lb_buf = v3;
                        info->lti_buf.lb_len = sizeof(*v3);
@@ -1950,6 +2155,23 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
 
                snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
                         PFID(lu_object_fid(&dto->do_lu)), i);
+
+               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
+               rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+               linkea_buf.lb_len = ldata.ld_leh->leh_len;
+               rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
+                                 0, th, BYPASS_CAPA);
+               if (rc != 0)
+                       GOTO(out, rc);
+
                rc = dt_insert(env, dt_object_child(dt),
                     (const struct dt_rec *)lu_object_fid(&dto->do_lu),
                     (const struct dt_key *)stripe_name, th, capa, 0);
@@ -1983,8 +2205,7 @@ int lod_dir_striping_create_internal(const struct lu_env *env,
        int                     rc;
        ENTRY;
 
-       if (lo->ldo_dir_def_striping_set &&
-           !LMVEA_DELETE_VALUES(lo->ldo_stripenr,
+       if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
                                 lo->ldo_dir_stripe_offset)) {
                struct lmv_user_md_v1 *v1 = info->lti_ea_store;
                int stripe_count = lo->ldo_stripenr;
@@ -2072,9 +2293,9 @@ int lod_dir_striping_create_internal(const struct lu_env *env,
                v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
                v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
                v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
-               if (lo->ldo_pool)
-                       strncpy(v3->lmm_pool_name, lo->ldo_pool,
-                               LOV_MAXPOOLNAME);
+               if (lo->ldo_pool != NULL)
+                       strlcpy(v3->lmm_pool_name, lo->ldo_pool,
+                               sizeof(v3->lmm_pool_name));
 
                info->lti_buf.lb_buf = v3;
                info->lti_buf.lb_len = sizeof(*v3);
@@ -2243,10 +2464,12 @@ static int lod_cache_parent_lov_striping(const struct lu_env *env,
 
        rc = 0;
        v1 = info->lti_ea_store;
-       if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
+       if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
                lustre_swab_lov_user_md_v1(v1);
-       else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
+       } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
+               v3 = (struct lov_user_md_v3 *)v1;
                lustre_swab_lov_user_md_v3(v3);
+       }
 
        if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
                GOTO(unlock, rc = 0);
@@ -2433,7 +2656,7 @@ static void lod_ah_init(const struct lu_env *env,
                               lc->ldo_dir_def_hash_type);
                }
 
-               /* If the directory is specified with certain stripes */
+               /* It should always honour the specified stripes */
                if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
                        const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
 
@@ -2452,6 +2675,7 @@ static void lod_ah_init(const struct lu_env *env,
                                       lc->ldo_stripenr,
                                       (int)lc->ldo_dir_stripe_offset);
                        }
+               /* then check whether there is default stripes from parent */
                } else if (lp->ldo_dir_def_striping_set) {
                        /* If there are default dir stripe from parent */
                        lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
@@ -2777,12 +3001,13 @@ static int lod_declare_object_destroy(const struct lu_env *env,
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
                RETURN(0);
 
-       /* declare destroy for all underlying objects */
+       /* declare destroy all striped objects */
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               LASSERT(lo->ldo_stripe[i]);
-               rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
-               if (rc != 0)
-                       break;
+               if (likely(lo->ldo_stripe[i] != NULL)) {
+                       rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
+                       if (rc != 0)
+                               break;
+               }
        }
 
        RETURN(rc);
@@ -2834,10 +3059,13 @@ static int lod_object_destroy(const struct lu_env *env,
 
        /* destroy all striped objects */
        for (i = 0; i < lo->ldo_stripenr; i++) {
-               LASSERT(lo->ldo_stripe[i]);
-               rc = dt_destroy(env, lo->ldo_stripe[i], th);
-               if (rc != 0)
-                       break;
+               if (likely(lo->ldo_stripe[i] != NULL) &&
+                   (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
+                    i == cfs_fail_val)) {
+                       rc = dt_destroy(env, lo->ldo_stripe[i], th);
+                       if (rc != 0)
+                               break;
+               }
        }
 
        RETURN(rc);