Whamcloud - gitweb
LU-5223 lmv: build master LMV EA dynamically build via readdir
[fs/lustre-release.git] / lustre / lod / lod_object.c
index d6dc2e5..fb43a8d 100644 (file)
@@ -668,6 +668,201 @@ static struct dt_index_operations lod_striped_index_ops = {
 };
 
 /**
+ * Append the FID for each shard of the striped directory after the
+ * given LMV EA header.
+ *
+ * To simplify striped directory and the consistency verification,
+ * we only store the LMV EA header on disk, for both master object
+ * and slave objects. When someone wants to know the whole LMV EA,
+ * such as client readdir(), we can build the entrie LMV EA on the
+ * MDT side (in RAM) via iterating the sub-directory entries that
+ * are contained in the master object of the stripe directory.
+ *
+ * For the master object of the striped directroy, the valid name
+ * for each shard is composed of the ${shard_FID}:${shard_idx}.
+ *
+ * There may be holes in the LMV EA if some shards' name entries
+ * are corrupted or lost.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lo       pointer to the master object of the striped directory
+ * \param[in] buf      pointer to the lu_buf which will hold the LMV EA
+ * \param[in] resize   whether re-allocate the buffer if it is not big enough
+ *
+ * \retval             positive size of the LMV EA
+ * \retval             0 for nothing to be loaded
+ * \retval             negative error number on failure
+ */
+int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
+                       struct lu_buf *buf, bool resize)
+{
+       struct lu_dirent        *ent    =
+                       (struct lu_dirent *)lod_env_info(env)->lti_key;
+       struct lod_device       *lod    = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+       struct dt_object        *obj    = dt_object_child(&lo->ldo_obj);
+       struct lmv_mds_md_v1    *lmv1   = buf->lb_buf;
+       struct dt_it            *it;
+       const struct dt_it_ops  *iops;
+       __u32                    stripes;
+       __u32                    magic  = le32_to_cpu(lmv1->lmv_magic);
+       int                      size;
+       int                      rc;
+       ENTRY;
+
+       /* If it is not a striped directory, then load nothing. */
+       if (magic != LMV_MAGIC_V1)
+               RETURN(0);
+
+       /* If it is in migration (or failure), then load nothing. */
+       if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
+               RETURN(0);
+
+       stripes = le32_to_cpu(lmv1->lmv_stripe_count);
+       if (stripes < 1)
+               RETURN(0);
+
+       size = lmv_mds_md_size(stripes, magic);
+       if (buf->lb_len < size) {
+               struct lu_buf tbuf;
+
+               if (!resize)
+                       RETURN(-ERANGE);
+
+               tbuf = *buf;
+               buf->lb_buf = NULL;
+               buf->lb_len = 0;
+               lu_buf_alloc(buf, size);
+               lmv1 = buf->lb_buf;
+               if (lmv1 == NULL)
+                       RETURN(-ENOMEM);
+
+               memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len);
+       }
+
+       if (unlikely(!dt_try_as_dir(env, obj)))
+               RETURN(-ENOTDIR);
+
+       memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid));
+       iops = &obj->do_index_ops->dio_it;
+       it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
+       if (IS_ERR(it))
+               RETURN(PTR_ERR(it));
+
+       rc = iops->load(env, it, 0);
+       if (rc == 0)
+               rc = iops->next(env, it);
+       else if (rc > 0)
+               rc = 0;
+
+       while (rc == 0) {
+               char             name[FID_LEN + 2] = "";
+               struct lu_fid    fid;
+               __u32            index;
+               int              len;
+
+               rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
+               if (rc != 0)
+                       break;
+
+               rc = -EIO;
+
+               fid_le_to_cpu(&fid, &ent->lde_fid);
+               ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
+               if (ent->lde_name[0] == '.') {
+                       if (ent->lde_namelen == 1)
+                               goto next;
+
+                       if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
+                               goto next;
+               }
+
+               len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid));
+               /* The ent->lde_name is composed of ${FID}:${index} */
+               if (ent->lde_namelen < len + 1 ||
+                   memcmp(ent->lde_name, name, len) != 0) {
+                       CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
+                              "%s: invalid shard name %.*s with the FID "DFID
+                              " for the striped directory "DFID", %s\n",
+                              lod2obd(lod)->obd_name, ent->lde_namelen,
+                              ent->lde_name, PFID(&fid),
+                              PFID(lu_object_fid(&obj->do_lu)),
+                              lod->lod_lmv_failout ? "failout" : "skip");
+
+                       if (lod->lod_lmv_failout)
+                               break;
+
+                       goto next;
+               }
+
+               index = 0;
+               do {
+                       if (ent->lde_name[len] < '0' ||
+                           ent->lde_name[len] > '9') {
+                               CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
+                                      "%s: invalid shard name %.*s with the "
+                                      "FID "DFID" for the striped directory "
+                                      DFID", %s\n",
+                                      lod2obd(lod)->obd_name, ent->lde_namelen,
+                                      ent->lde_name, PFID(&fid),
+                                      PFID(lu_object_fid(&obj->do_lu)),
+                                      lod->lod_lmv_failout ?
+                                      "failout" : "skip");
+
+                               if (lod->lod_lmv_failout)
+                                       break;
+
+                               goto next;
+                       }
+
+                       index = index * 10 + ent->lde_name[len++] - '0';
+               } while (len < ent->lde_namelen);
+
+               if (len == ent->lde_namelen) {
+                       /* Out of LMV EA range. */
+                       if (index >= stripes) {
+                               CERROR("%s: the shard %.*s for the striped "
+                                      "directory "DFID" is out of the known "
+                                      "LMV EA range [0 - %u], failout\n",
+                                      lod2obd(lod)->obd_name, ent->lde_namelen,
+                                      ent->lde_name,
+                                      PFID(lu_object_fid(&obj->do_lu)),
+                                      stripes - 1);
+
+                               break;
+                       }
+
+                       /* The slot has been occupied. */
+                       if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) {
+                               struct lu_fid fid0;
+
+                               fid_le_to_cpu(&fid0,
+                                       &lmv1->lmv_stripe_fids[index]);
+                               CERROR("%s: both the shard "DFID" and "DFID
+                                      " for the striped directory "DFID
+                                      " claim the same LMV EA slot at the "
+                                      "index %d, failout\n",
+                                      lod2obd(lod)->obd_name,
+                                      PFID(&fid0), PFID(&fid),
+                                      PFID(lu_object_fid(&obj->do_lu)), index);
+
+                               break;
+                       }
+
+                       /* stored as LE mode */
+                       lmv1->lmv_stripe_fids[index] = ent->lde_fid;
+
+next:
+                       rc = iops->next(env, it);
+               }
+       }
+
+       iops->put(env, it);
+       iops->fini(env, it);
+
+       RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc);
+}
+
+/**
  * Implementation of dt_object_operations:: do_index_try
  *
  * This function will try to initialize the index api pointer for the
@@ -1018,6 +1213,42 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
        rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
+       if (strcmp(name, XATTR_NAME_LMV) == 0) {
+               struct lmv_mds_md_v1    *lmv1;
+               int                      rc1 = 0;
+
+               if (rc > sizeof(*lmv1))
+                       RETURN(rc);
+
+               if (rc < sizeof(*lmv1))
+                       RETURN(rc = rc > 0 ? -EINVAL : rc);
+
+               if (buf->lb_buf == NULL || buf->lb_len == 0) {
+                       CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key));
+
+                       info->lti_buf.lb_buf = info->lti_key;
+                       info->lti_buf.lb_len = sizeof(*lmv1);
+                       rc = dt_xattr_get(env, dt_object_child(dt),
+                                         &info->lti_buf, name, capa);
+                       if (unlikely(rc != sizeof(*lmv1)))
+                               RETURN(rc = rc > 0 ? -EINVAL : rc);
+
+                       lmv1 = info->lti_buf.lb_buf;
+                       /* The on-disk LMV EA only contains header, but the
+                        * returned LMV EA size should contain the space for
+                        * the FIDs of all shards of the striped directory. */
+                       if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
+                               rc = lmv_mds_md_size(
+                                       le32_to_cpu(lmv1->lmv_stripe_count),
+                                       LMV_MAGIC_V1);
+               } else {
+                       rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt),
+                                                 buf, false);
+               }
+
+               RETURN(rc = rc1 != 0 ? rc1 : rc);
+       }
+
        if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
                RETURN(rc);
 
@@ -1081,8 +1312,7 @@ out:
 /**
  * Master LMVEA will be same as slave LMVEA, except
  * 1. different magic
- * 2. No lmv_stripe_fids on slave
- * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index.
+ * 2. lmv_master_mdt_index on slave LMV EA will be stripe_index.
  */
 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
                                  const struct lmv_mds_md_v1 *master_lmv)
@@ -1099,9 +1329,7 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
        struct lod_object       *lo = lod_dt_obj(dt);
        struct lmv_mds_md_v1    *lmm1;
        int                     stripe_count;
-       int                     lmm_size;
        int                     type = LU_SEQ_RANGE_ANY;
-       int                     i;
        int                     rc;
        __u32                   mdtidx;
        ENTRY;
@@ -1109,11 +1337,13 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
        LASSERT(lo->ldo_dir_striped != 0);
        LASSERT(lo->ldo_stripenr > 0);
        stripe_count = lo->ldo_stripenr;
-       lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
-       if (info->lti_ea_store_size < lmm_size) {
-               rc = lod_ea_store_resize(info, lmm_size);
+       /* Only store the LMV EA heahder on the disk. */
+       if (info->lti_ea_store_size < sizeof(*lmm1)) {
+               rc = lod_ea_store_resize(info, sizeof(*lmm1));
                if (rc != 0)
                        RETURN(rc);
+       } else {
+               memset(info->lti_ea_store, 0, sizeof(*lmm1));
        }
 
        lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
@@ -1126,18 +1356,8 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
                RETURN(rc);
 
        lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
-       fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu));
-       for (i = 0; i < lo->ldo_stripenr; i++) {
-               struct dt_object *dto;
-
-               dto = lo->ldo_stripe[i];
-               LASSERT(dto != NULL);
-               fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
-                             lu_object_fid(&dto->do_lu));
-       }
-
        lmv_buf->lb_buf = info->lti_ea_store;
-       lmv_buf->lb_len = lmm_size;
+       lmv_buf->lb_len = sizeof(*lmm1);
        lo->ldo_dir_striping_cached = 1;
 
        RETURN(rc);
@@ -1168,7 +1388,7 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
        if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
                RETURN(-EINVAL);
 
-       if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
+       if (le32_to_cpu(lmv1->lmv_stripe_count) < 1)
                RETURN(0);
 
        LASSERT(lo->ldo_stripe == NULL);
@@ -1579,8 +1799,12 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
        if (rc != 0)
                RETURN(rc);
 
-       if (lo->ldo_stripenr == 0)
-               RETURN(rc);
+       /* Note: Do not set LinkEA on sub-stripes, otherwise
+        * it will confuse the fid2path process(see mdt_path_current()).
+        * The linkEA between master and sub-stripes is set in
+        * lod_xattr_set_lmv(). */
+       if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
+               RETURN(0);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
@@ -1673,8 +1897,12 @@ static int lod_xattr_set_internal(const struct lu_env *env,
        if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
                RETURN(rc);
 
-       if (lo->ldo_stripenr == 0)
-               RETURN(rc);
+       /* Note: Do not set LinkEA on sub-stripes, otherwise
+        * it will confuse the fid2path process(see mdt_path_current()).
+        * The linkEA between master and sub-stripes is set in
+        * lod_xattr_set_lmv(). */
+       if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
+               RETURN(0);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);