Whamcloud - gitweb
LU-15487 mdd: print FID in mdd_dir_page_build() error 68/46368/4
authorAndreas Dilger <adilger@whamcloud.com>
Wed, 26 Jan 2022 23:33:27 +0000 (16:33 -0700)
committerOleg Drokin <green@whamcloud.com>
Mon, 30 May 2022 19:04:08 +0000 (19:04 +0000)
Print the MDT name and FID in mdd_dir_page_build() when an error
is hit.  Because this changes the callback function signature,
also update dt_index_page_build() to print a more useful message.

Add OBD_FAIL_MDS_DIR_PAGE_WALK to allow triggering this codepath
to see if this is the source of problems in error handling.

Minor code style and whitespace cleanups in related functions.

Signed-off-by: Andreas Dilger <adilger@whamcloud.com>
Change-Id: Ic475f4a2c775871ff5af59a47e0966ba3eed7013
Reviewed-on: https://review.whamcloud.com/46368
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Jian Yu <yujian@whamcloud.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/dt_object.h
lustre/include/obd_support.h
lustre/mdd/mdd_object.c
lustre/obdclass/dt_object.c
lustre/ptlrpc/nodemap_storage.c

index f24d7d3..3df2027 100644 (file)
@@ -2247,8 +2247,8 @@ int dt_record_read(const struct lu_env *env, struct dt_object *dt,
 int dt_record_write(const struct lu_env *env, struct dt_object *dt,
                     const struct lu_buf *buf, loff_t *pos, struct thandle *th);
 typedef int (*dt_index_page_build_t)(const struct lu_env *env,
-                                    union lu_page *lp, size_t nob,
-                                    const struct dt_it_ops *iops,
+                                    struct dt_object *obj, union lu_page *lp,
+                                    size_t bytes, const struct dt_it_ops *iops,
                                     struct dt_it *it, __u32 attr, void *arg);
 int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
                  const struct lu_rdpg *rdpg, dt_index_page_build_t filler,
index 9748c5a..a927d2c 100644 (file)
@@ -220,34 +220,35 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_MDS_HSM_ACTION_NET            0x150
 #define OBD_FAIL_MDS_CHANGELOG_INIT            0x151
 #define OBD_FAIL_MDS_HSM_SWAP_LAYOUTS          0x152
-#define OBD_FAIL_MDS_RENAME              0x153
-#define OBD_FAIL_MDS_RENAME2             0x154
-#define OBD_FAIL_MDS_RENAME3             0x155
-#define OBD_FAIL_MDS_RENAME4             0x156
-#define OBD_FAIL_MDS_LDLM_REPLY_NET     0x157
-#define OBD_FAIL_MDS_STALE_DIR_LAYOUT   0x158
-#define OBD_FAIL_MDS_REINT_MULTI_NET     0x159
-#define OBD_FAIL_MDS_REINT_MULTI_NET_REP 0x15a
-#define OBD_FAIL_MDS_LLOG_CREATE_FAILED2 0x15b
+#define OBD_FAIL_MDS_RENAME                    0x153
+#define OBD_FAIL_MDS_RENAME2                   0x154
+#define OBD_FAIL_MDS_RENAME3                   0x155
+#define OBD_FAIL_MDS_RENAME4                   0x156
+#define OBD_FAIL_MDS_LDLM_REPLY_NET            0x157
+#define OBD_FAIL_MDS_STALE_DIR_LAYOUT          0x158
+#define OBD_FAIL_MDS_REINT_MULTI_NET           0x159
+#define OBD_FAIL_MDS_REINT_MULTI_NET_REP       0x15a
+#define OBD_FAIL_MDS_LLOG_CREATE_FAILED2       0x15b
 #define OBD_FAIL_MDS_FLD_LOOKUP                        0x15c
-#define OBD_FAIL_MDS_CHANGELOG_REORDER 0x15d
-#define OBD_FAIL_MDS_LLOG_UMOUNT_RACE   0x15e
-#define OBD_FAIL_MDS_CHANGELOG_RACE     0x15f
+#define OBD_FAIL_MDS_CHANGELOG_REORDER         0x15d
+#define OBD_FAIL_MDS_LLOG_UMOUNT_RACE          0x15e
+#define OBD_FAIL_MDS_CHANGELOG_RACE            0x15f
 #define OBD_FAIL_MDS_INTENT_DELAY              0x160
 #define OBD_FAIL_MDS_XATTR_REP                 0x161
-#define OBD_FAIL_MDS_TRACK_OVERFLOW     0x162
-#define OBD_FAIL_MDS_LOV_CREATE_RACE    0x163
-#define OBD_FAIL_MDS_HSM_CDT_DELAY      0x164
-#define OBD_FAIL_MDS_ORPHAN_DELETE      0x165
-#define OBD_FAIL_MDS_RMFID_NET          0x166
-#define OBD_FAIL_MDS_CREATE_RACE        0x167
-#define OBD_FAIL_MDS_STATFS_SPOOF       0x168
-#define OBD_FAIL_MDS_REINT_OPEN                 0x169
-#define OBD_FAIL_MDS_REINT_OPEN2        0x16a
-#define OBD_FAIL_MDS_COMMITRW_DELAY     0x16b
-#define OBD_FAIL_MDS_CHANGELOG_DEL      0x16c
-#define OBD_FAIL_MDS_CHANGELOG_IDX_PUMP         0x16d
-#define OBD_FAIL_MDS_DELAY_DELORPHAN    0x16e
+#define OBD_FAIL_MDS_TRACK_OVERFLOW            0x162
+#define OBD_FAIL_MDS_LOV_CREATE_RACE           0x163
+#define OBD_FAIL_MDS_HSM_CDT_DELAY             0x164
+#define OBD_FAIL_MDS_ORPHAN_DELETE             0x165
+#define OBD_FAIL_MDS_RMFID_NET                 0x166
+#define OBD_FAIL_MDS_CREATE_RACE               0x167
+#define OBD_FAIL_MDS_STATFS_SPOOF              0x168
+#define OBD_FAIL_MDS_REINT_OPEN                        0x169
+#define OBD_FAIL_MDS_REINT_OPEN2               0x16a
+#define OBD_FAIL_MDS_COMMITRW_DELAY            0x16b
+#define OBD_FAIL_MDS_CHANGELOG_DEL             0x16c
+#define OBD_FAIL_MDS_CHANGELOG_IDX_PUMP                0x16d
+#define OBD_FAIL_MDS_DELAY_DELORPHAN           0x16e
+#define OBD_FAIL_MDS_DIR_PAGE_WALK             0x16f
 
 /* layout lock */
 #define OBD_FAIL_MDS_NO_LL_GETATTR      0x170
index e266977..4af567f 100644 (file)
@@ -3600,89 +3600,96 @@ static int mdd_readpage_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_dir_page_build(const struct lu_env *env, union lu_page *lp,
-                             size_t nob, const struct dt_it_ops *iops,
+static int mdd_dir_page_build(const struct lu_env *env, struct dt_object *obj,
+                             union lu_page *lp, size_t bytes,
+                             const struct dt_it_ops *iops,
                              struct dt_it *it, __u32 attr, void *arg)
 {
-       struct lu_dirpage       *dp = &lp->lp_dir;
-       void                    *area = dp;
-       int                      result;
-       __u64                    hash = 0;
-       struct lu_dirent        *ent;
-       struct lu_dirent        *last = NULL;
-       struct lu_fid            fid;
-       int                      first = 1;
-
-       if (nob < sizeof(*dp))
-               return -EINVAL;
-
-        memset(area, 0, sizeof (*dp));
-        area += sizeof (*dp);
-        nob  -= sizeof (*dp);
-
-        ent  = area;
-        do {
-                int    len;
+       struct lu_dirpage *dp = &lp->lp_dir;
+       void *area = dp;
+       __u64 hash = 0;
+       struct lu_dirent *ent;
+       struct lu_dirent *last = NULL;
+       struct lu_fid fid;
+       int first = 1;
+       int result;
+
+       if (bytes < sizeof(*dp))
+               GOTO(out_err, result = -EOVERFLOW);
+
+       memset(area, 0, sizeof(*dp));
+       area += sizeof(*dp);
+       bytes -= sizeof(*dp);
+
+       ent = area;
+       do {
+               int len;
                size_t recsize;
 
-                len = iops->key_size(env, it);
+               len = iops->key_size(env, it);
 
-                /* IAM iterator can return record with zero len. */
-                if (len == 0)
-                        goto next;
+               /* IAM iterator can return record with zero len. */
+               if (len == 0)
+                       GOTO(next, 0);
 
-                hash = iops->store(env, it);
-                if (unlikely(first)) {
-                        first = 0;
-                        dp->ldp_hash_start = cpu_to_le64(hash);
-                }
+               hash = iops->store(env, it);
+               if (unlikely(first)) {
+                       first = 0;
+                       dp->ldp_hash_start = cpu_to_le64(hash);
+               }
 
-                /* calculate max space required for lu_dirent */
-                recsize = lu_dirent_calc_size(len, attr);
+               /* calculate max space required for lu_dirent */
+               recsize = lu_dirent_calc_size(len, attr);
 
-                if (nob >= recsize) {
-                        result = iops->rec(env, it, (struct dt_rec *)ent, attr);
-                        if (result == -ESTALE)
-                                goto next;
-                        if (result != 0)
-                                goto out;
+               if (bytes >= recsize &&
+                   !OBD_FAIL_CHECK(OBD_FAIL_MDS_DIR_PAGE_WALK)) {
+                       result = iops->rec(env, it, (struct dt_rec *)ent, attr);
+                       if (result == -ESTALE)
+                               GOTO(next, result);
+                       if (result != 0)
+                               GOTO(out, result);
 
-                        /* osd might not able to pack all attributes,
-                         * so recheck rec length */
-                        recsize = le16_to_cpu(ent->lde_reclen);
+                       /* OSD might not able to pack all attributes, so
+                        * recheck record length had room to store FID
+                        */
+                       recsize = le16_to_cpu(ent->lde_reclen);
 
                        if (le32_to_cpu(ent->lde_attrs) & LUDA_FID) {
                                fid_le_to_cpu(&fid, &ent->lde_fid);
                                if (fid_is_dot_lustre(&fid))
-                                       goto next;
+                                       GOTO(next, recsize);
                        }
-                } else {
-                        result = (last != NULL) ? 0 :-EINVAL;
-                        goto out;
-                }
-                last = ent;
-                ent = (void *)ent + recsize;
-                nob -= recsize;
+               } else {
+                       result = (last != NULL) ? 0 : -EBADSLT;
+                       GOTO(out, result);
+               }
+               last = ent;
+               ent = (void *)ent + recsize;
+               bytes -= recsize;
 
 next:
-                result = iops->next(env, it);
-                if (result == -ESTALE)
-                        goto next;
-        } while (result == 0);
+               result = iops->next(env, it);
+               if (result == -ESTALE)
+                       GOTO(next, result);
+       } while (result == 0);
 
 out:
-        dp->ldp_hash_end = cpu_to_le64(hash);
-        if (last != NULL) {
-                if (last->lde_hash == dp->ldp_hash_end)
-                        dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
-                last->lde_reclen = 0; /* end mark */
-        }
+       dp->ldp_hash_end = cpu_to_le64(hash);
+       if (last != NULL) {
+               if (last->lde_hash == dp->ldp_hash_end)
+                       dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
+               last->lde_reclen = 0; /* end mark */
+       }
+out_err:
        if (result > 0)
                /* end of directory */
                dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
        else if (result < 0)
-               CWARN("build page failed: %d!\n", result);
-        return result;
+               CWARN("%s: build page failed for "DFID": rc = %d\n",
+                     lu_dev_name(obj->do_lu.lo_dev),
+                     PFID(lu_object_fid(&obj->do_lu)), result);
+
+       return result;
 }
 
 int mdd_readpage(const struct lu_env *env, struct md_object *obj,
index ee17b36..59cf948 100644 (file)
@@ -685,15 +685,17 @@ static inline const struct dt_index_features *dt_index_feat_select(__u64 seq,
  * RPC
  *
  * \param env - is the environment passed by the caller
+ * \param obj - index object being traversed (mostly for debugging)
  * \param lp  - is a pointer to the lu_page to fill
- * \param nob - is the maximum number of bytes that should be copied
+ * \param bytes - is the maximum number of bytes that should be copied
  * \param iops - is the index operation vector associated with the index object
  * \param it   - is a pointer to the current iterator
  * \param attr - is the index attribute to pass to iops->rec()
  * \param arg  - is a pointer to the idx_info structure
  */
-static int dt_index_page_build(const struct lu_env *env, union lu_page *lp,
-                              size_t nob, const struct dt_it_ops *iops,
+static int dt_index_page_build(const struct lu_env *env, struct dt_object *obj,
+                              union lu_page *lp, size_t bytes,
+                              const struct dt_it_ops *iops,
                               struct dt_it *it, __u32 attr, void *arg)
 {
        struct idx_info *ii = (struct idx_info *)arg;
@@ -707,13 +709,13 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp,
 
        ENTRY;
 
-       if (nob < LIP_HDR_SIZE)
+       if (bytes < LIP_HDR_SIZE)
                return -EINVAL;
 
        /* initialize the header of the new container */
        memset(lip, 0, LIP_HDR_SIZE);
        lip->lip_magic = LIP_MAGIC;
-       nob           -= LIP_HDR_SIZE;
+       bytes -= LIP_HDR_SIZE;
 
        /* client wants to the 64-bit hash value associated with each record */
        if (!(ii->ii_flags & II_FL_NOHASH))
@@ -734,9 +736,13 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp,
                        keysize = iops->key_size(env, it);
                        if (!(ii->ii_flags & II_FL_VARKEY) &&
                            keysize != ii->ii_keysize) {
-                               CERROR("keysize mismatch %hu != %hu.\n",
-                                      keysize, ii->ii_keysize);
-                               GOTO(out, rc = -EINVAL);
+                               rc = -EINVAL;
+                               CERROR("%s: keysize mismatch %hu != %hu on "
+                                      DFID": rc = %d\n",
+                                      lu_dev_name(obj->do_lu.lo_dev),
+                                      keysize, ii->ii_keysize,
+                                      PFID(lu_object_fid(&obj->do_lu)), rc);
+                               GOTO(out, rc);
                        }
                }
 
@@ -746,7 +752,7 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp,
                else
                        recsize = ii->ii_recsize;
 
-               if (nob < hashsize + keysize + recsize) {
+               if (bytes < hashsize + keysize + recsize) {
                        if (lip->lip_nr == 0)
                                GOTO(out, rc = -E2BIG);
                        GOTO(out, rc = 0);
@@ -769,7 +775,7 @@ static int dt_index_page_build(const struct lu_env *env, union lu_page *lp,
                        if (unlikely(lip->lip_nr == 1 && ii->ii_count == 0))
                                ii->ii_hash_start = hash;
                        entry += hashsize + keysize + recsize;
-                       nob -= hashsize + keysize + recsize;
+                       bytes -= hashsize + keysize + recsize;
                } else if (rc != -ESTALE) {
                        GOTO(out, rc);
                }
@@ -812,7 +818,7 @@ int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
 {
        struct dt_it *it;
        const struct dt_it_ops *iops;
-       size_t pageidx, nob, nlupgs = 0;
+       size_t pageidx, bytes, nlupgs = 0;
        int rc;
        ENTRY;
 
@@ -822,8 +828,8 @@ int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
        if (filler == NULL)
                filler = dt_index_page_build;
 
-       nob = rdpg->rp_count;
-       if (nob == 0)
+       bytes = rdpg->rp_count;
+       if (bytes == 0)
                RETURN(-EFAULT);
 
        /* Iterate through index and fill containers from @rdpg */
@@ -863,7 +869,7 @@ int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
         *  rc >  0 -> end of index.
         *  rc <  0 -> error.
         */
-       for (pageidx = 0; rc == 0 && nob > 0; pageidx++) {
+       for (pageidx = 0; rc == 0 && bytes > 0; pageidx++) {
                union lu_page   *lp;
                int              i;
 
@@ -871,8 +877,9 @@ int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
                lp = kmap(rdpg->rp_pages[pageidx]);
 
                /* fill lu pages */
-               for (i = 0; i < LU_PAGE_COUNT; i++, lp++, nob -= LU_PAGE_SIZE) {
-                       rc = filler(env, lp, min_t(size_t, nob, LU_PAGE_SIZE),
+               for (i = 0; i < LU_PAGE_COUNT; i++, lp++, bytes-=LU_PAGE_SIZE) {
+                       rc = filler(env, obj, lp,
+                                   min_t(size_t, bytes, LU_PAGE_SIZE),
                                    iops, it, rdpg->rp_attrs, arg);
                        if (rc < 0)
                                break;
index de60d0f..59ec6c7 100644 (file)
@@ -1356,8 +1356,9 @@ int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
 }
 EXPORT_SYMBOL(nodemap_process_idx_pages);
 
-static int nodemap_page_build(const struct lu_env *env, union lu_page *lp,
-                             size_t nob, const struct dt_it_ops *iops,
+static int nodemap_page_build(const struct lu_env *env, struct dt_object *obj,
+                             union lu_page *lp, size_t bytes,
+                             const struct dt_it_ops *iops,
                              struct dt_it *it, __u32 attr, void *arg)
 {
        struct idx_info *ii = (struct idx_info *)arg;
@@ -1367,19 +1368,19 @@ static int nodemap_page_build(const struct lu_env *env, union lu_page *lp,
        int rc;
        ENTRY;
 
-       if (nob < LIP_HDR_SIZE)
+       if (bytes < LIP_HDR_SIZE)
                return -EINVAL;
 
        /* initialize the header of the new container */
        memset(lip, 0, LIP_HDR_SIZE);
        lip->lip_magic = LIP_MAGIC;
-       nob           -= LIP_HDR_SIZE;
+       bytes -= LIP_HDR_SIZE;
 
        entry = lip->lip_entries;
        do {
-               char            *tmp_entry = entry;
-               struct dt_key   *key;
-               __u64           hash;
+               char *tmp_entry = entry;
+               struct dt_key *key;
+               __u64 hash;
                enum nodemap_idx_type key_type;
 
                /* fetch 64-bit hash value */
@@ -1391,7 +1392,7 @@ static int nodemap_page_build(const struct lu_env *env, union lu_page *lp,
                                GOTO(out, rc = 0);
                }
 
-               if (nob < size) {
+               if (bytes < size) {
                        if (lip->lip_nr == 0)
                                GOTO(out, rc = -EINVAL);
                        GOTO(out, rc = 0);
@@ -1424,7 +1425,7 @@ static int nodemap_page_build(const struct lu_env *env, union lu_page *lp,
                                        ii->ii_hash_start = hash;
 
                                entry = tmp_entry + ii->ii_recsize;
-                               nob -= size;
+                               bytes -= size;
                        }
                }