Whamcloud - gitweb
LU-8569 lfsck: handle linkEA overflow 41/23741/10
author Fan Yong <fan.yong@intel.com>
Fri, 23 Sep 2016 05:00:47 +0000 (13:00 +0800)
committer Oleg Drokin <oleg.drokin@intel.com>
Wed, 18 Jan 2017 18:58:55 +0000 (18:58 +0000)
If the linkEA is marked as overflowed (a 32-bit timestamp in the
header), then even if some hard links are removed afterwards, we still
cannot clear the overflow timestamp, because the missing hard link
entries are still not in the linkEA. It is the namespace LFSCK's duty
to add the missing entries into the linkEA (if possible) and to clear
the overflow timestamp once all the hard link entries are in the
linkEA. However, new hard links may be added while the namespace LFSCK
is scanning. So, in the 2nd-stage scanning, the namespace LFSCK needs
to compare the linkEA overflow timestamp with the namespace LFSCK
latest reset time (the time at which it last started scanning the
system from the beginning). If the latter is newer, it means that all
the hard link entries are already in the linkEA, so the linkEA
overflow timestamp can be cleared.

To avoid the trouble caused by clock drift among MDTs, the linkEA
overflow timestamp is always set to the local time of the MDT on
which the object resides, even if the set-linkEA operation is
initiated by some remote MDT. So we can directly compare the linkEA
overflow time with the namespace LFSCK latest reset time.

The old interfaces between MDD/OUT and LFSCK via lfsck_in_notify()
for the linkEA overflow event are obsoleted and removed.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Ia2c3f9d0a0ecb0b3285041f1365cb4e075a07dda
Reviewed-on: https://review.whamcloud.com/23741
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
14 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/obd_support.h
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c
lustre/lfsck/lfsck_striped_dir.c
lustre/mdd/mdd_dir.c
lustre/ptlrpc/wiretest.c
lustre/target/out_lib.c
lustre/tests/sanity-lfsck.sh
lustre/tests/sanity.sh
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index a5bef55..22e246d 100644 (file)
@@ -2802,8 +2802,6 @@ enum lfsck_events {
        LE_PEER_EXIT            = 9,
        LE_CONDITIONAL_DESTROY  = 10,
        LE_PAIRS_VERIFY         = 11,
-       LE_SKIP_NLINK_DECLARE   = 13,
-       LE_SKIP_NLINK           = 14,
        LE_SET_LMV_MASTER       = 15,
        LE_SET_LMV_SLAVE        = 16,
 };
index 43b6149..cc2506d 100644 (file)
@@ -571,7 +571,6 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_LFSCK_MUL_REF         0x1622
 #define OBD_FAIL_LFSCK_BAD_TYPE                0x1623
 #define OBD_FAIL_LFSCK_NO_NAMEENTRY    0x1624
-#define OBD_FAIL_LFSCK_MORE_NLINK      0x1625
 #define OBD_FAIL_LFSCK_LESS_NLINK      0x1626
 #define OBD_FAIL_LFSCK_BAD_NAME_HASH   0x1628
 #define OBD_FAIL_LFSCK_LOST_MASTER_LMV 0x1629
index 05021ae..dd89553 100644 (file)
@@ -111,7 +111,6 @@ struct lfsck_bookmark {
 enum lfsck_namespace_trace_flags {
        LNTF_CHECK_LINKEA       = 0x01,
        LNTF_CHECK_PARENT       = 0x02,
-       LNTF_SKIP_NLINK         = 0x04,
        LNTF_CHECK_ORPHAN       = 0x08,
        LNTF_UNCERTAIN_LMV      = 0x10,
        LNTF_RECHECK_NAME_HASH  = 0x20,
@@ -274,9 +273,14 @@ struct lfsck_namespace {
         * the MDTs that contain non-verified MDT-objects. */
        __u32   ln_bitmap_size;
 
-       __u32   ln_reserved_1;
+       /* Time for the latest LFSCK scan in seconds from the beginning. */
+       __u32   ln_time_latest_reset;
+
+       /* How many linkEA overflow timestamp have been cleared. */
+       __u64   ln_linkea_overflow_cleared;
+
        /* For further using. 256-bytes aligned now. */
-       __u64   ln_reserved[15];
+       __u64   ln_reserved[14];
 };
 
 enum lfsck_layout_inconsistency_type {
@@ -993,7 +997,7 @@ int lfsck_namespace_check_exist(const struct lu_env *env,
                                struct dt_object *dir,
                                struct dt_object *obj, const char *name);
 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
-                      struct linkea_data *ldata);
+                      struct linkea_data *ldata, bool with_rec);
 int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
                                   struct lfsck_component *com,
                                   struct dt_object *obj,
@@ -1450,20 +1454,33 @@ static inline int lfsck_links_read(const struct lu_env *env,
 {
        ldata->ld_buf =
                lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
-                                      PAGE_SIZE);
+                                      MAX_LINKEA_SIZE);
+
+       return __lfsck_links_read(env, obj, ldata, false);
+}
+
+/* Read linkEA for the given object, the linkEA should contain
+ * at least one entry, otherwise, -ENODATA will be returned. */
+static inline int lfsck_links_read_with_rec(const struct lu_env *env,
+                                           struct dt_object *obj,
+                                           struct linkea_data *ldata)
+{
+       ldata->ld_buf =
+               lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
+                                      MAX_LINKEA_SIZE);
 
-       return __lfsck_links_read(env, obj, ldata);
+       return __lfsck_links_read(env, obj, ldata, true);
 }
 
-static inline int lfsck_links_read2(const struct lu_env *env,
-                                   struct dt_object *obj,
-                                   struct linkea_data *ldata)
+static inline int lfsck_links_read2_with_rec(const struct lu_env *env,
+                                            struct dt_object *obj,
+                                            struct linkea_data *ldata)
 {
        ldata->ld_buf =
                lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf2,
-                                      PAGE_SIZE);
+                                      MAX_LINKEA_SIZE);
 
-       return __lfsck_links_read(env, obj, ldata);
+       return __lfsck_links_read(env, obj, ldata, true);
 }
 
 static inline struct lfsck_lmv *lfsck_lmv_get(struct lfsck_lmv *llmv)
index 74ae617..05ee675 100644 (file)
@@ -1804,13 +1804,9 @@ again:
        if (rc != -ENOENT)
                GOTO(unlock, rc);
 
-       rc = linkea_data_new(&ldata,
-                            &lfsck_env_info(env)->lti_linkea_buf);
-       if (rc != 0)
-               GOTO(unlock, rc);
-
        pname = lfsck_name_get_const(env, name, strlen(name));
-       rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
+       rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf,
+                             pname, lfsck_dto2fid(lfsck->li_lpf_obj));
        if (rc != 0)
                GOTO(unlock, rc);
 
index 8319208..17c2dd5 100644 (file)
@@ -642,13 +642,9 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
        int                      rc;
        ENTRY;
 
-       rc = linkea_data_new(&ldata,
-                            &lfsck_env_info(env)->lti_linkea_buf2);
-       if (rc != 0)
-               RETURN(rc);
-
        cname = lfsck_name_get_const(env, name, strlen(name));
-       rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
+       rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf2,
+                             cname, lfsck_dto2fid(parent));
        if (rc != 0)
                RETURN(rc);
 
@@ -801,13 +797,9 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
        int                      rc;
        ENTRY;
 
-       rc = linkea_data_new(&ldata,
-                            &lfsck_env_info(env)->lti_linkea_buf2);
-       if (rc != 0)
-               RETURN(rc);
-
        cname = lfsck_name_get_const(env, name, strlen(name));
-       rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
+       rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf2,
+                             cname, lfsck_dto2fid(parent));
        if (rc != 0)
                RETURN(rc);
 
@@ -3340,8 +3332,6 @@ int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
        case LE_FID_ACCESSED:
        case LE_PEER_EXIT:
        case LE_CONDITIONAL_DESTROY:
-       case LE_SKIP_NLINK_DECLARE:
-       case LE_SKIP_NLINK:
        case LE_SET_LMV_MASTER:
        case LE_SET_LMV_SLAVE:
        case LE_PAIRS_VERIFY: {
index 75c939e..c43c64d 100644 (file)
@@ -164,6 +164,9 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
        dst->ln_local_lpf_skipped = le64_to_cpu(src->ln_local_lpf_skipped);
        dst->ln_local_lpf_failed = le64_to_cpu(src->ln_local_lpf_failed);
        dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size);
+       dst->ln_time_latest_reset = le32_to_cpu(src->ln_time_latest_reset);
+       dst->ln_linkea_overflow_cleared =
+                               le64_to_cpu(src->ln_linkea_overflow_cleared);
 }
 
 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
@@ -233,6 +236,9 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
        dst->ln_local_lpf_skipped = cpu_to_le64(src->ln_local_lpf_skipped);
        dst->ln_local_lpf_failed = cpu_to_le64(src->ln_local_lpf_failed);
        dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size);
+       dst->ln_time_latest_reset = cpu_to_le32(src->ln_time_latest_reset);
+       dst->ln_linkea_overflow_cleared =
+                               cpu_to_le64(src->ln_linkea_overflow_cleared);
 }
 
 static void lfsck_namespace_record_failure(const struct lu_env *env,
@@ -508,6 +514,7 @@ static int lfsck_namespace_init(const struct lu_env *env,
        memset(ns, 0, sizeof(*ns));
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
+       ns->ln_time_latest_reset = cfs_time_current_sec();
        down_write(&com->lc_sem);
        rc = lfsck_namespace_store(env, com);
        up_write(&com->lc_sem);
@@ -670,6 +677,11 @@ static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
 {
        int rc;
 
+       /* For remote updating LINKEA, there may be further LFSCK action
+        * on remote MDT after the updating, so update the LINKEA ASAP. */
+       if (dt_object_remote(obj))
+               handle->th_sync = 1;
+
        /* For destroying all invalid linkEA entries. */
        rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
        if (rc == 0)
@@ -681,7 +693,7 @@ static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
 }
 
 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
-                      struct linkea_data *ldata)
+                      struct linkea_data *ldata, bool with_rec)
 {
        int rc;
 
@@ -711,8 +723,12 @@ int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
        if (unlikely(rc == 0))
                return -ENODATA;
 
-       if (rc > 0)
-               rc = linkea_init(ldata);
+       if (rc > 0) {
+               if (with_rec)
+                       rc = linkea_init_with_rec(ldata);
+               else
+                       rc = linkea_init(ldata);
+       }
 
        return rc;
 }
@@ -787,11 +803,22 @@ log:
 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
                             struct linkea_data *ldata, struct thandle *handle)
 {
-       const struct lu_buf *buf = lfsck_buf_get_const(env,
-                                                      ldata->ld_buf->lb_buf,
-                                                      ldata->ld_leh->leh_len);
+       struct lu_buf buf;
+       int rc;
+
+       lfsck_buf_init(&buf, ldata->ld_buf->lb_buf, ldata->ld_leh->leh_len);
 
-       return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle);
+again:
+       rc = dt_xattr_set(env, obj, &buf, XATTR_NAME_LINK, 0, handle);
+       if (unlikely(rc == -ENOSPC)) {
+               rc = linkea_overflow_shrink(ldata);
+               if (likely(rc > 0)) {
+                       buf.lb_len = rc;
+                       goto again;
+               }
+       }
+
+       return rc;
 }
 
 static int lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
@@ -991,11 +1018,8 @@ again:
 
        cname->ln_name = info->lti_key;
        cname->ln_namelen = namelen;
-       rc = linkea_data_new(&ldata2, &info->lti_linkea_buf2);
-       if (rc != 0)
-               GOTO(log, rc);
-
-       rc = linkea_add_buf(&ldata2, cname, pfid);
+       rc = linkea_links_new(&ldata2, &info->lti_linkea_buf2,
+                             cname, pfid);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -1058,10 +1082,8 @@ again:
                GOTO(stop, rc);
 
        dt_write_lock(env, orphan, 0);
-       rc = lfsck_links_read2(env, orphan, &ldata2);
-       if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
-                  (rc == 0 && ldata2.ld_leh != NULL &&
-                   ldata2.ld_leh->leh_reccount == 0))) {
+       rc = lfsck_links_read2_with_rec(env, orphan, &ldata2);
+       if (likely(rc == -ENODATA || rc == -EINVAL)) {
                if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
                        GOTO(unlock, rc = 1);
 
@@ -1395,11 +1417,8 @@ again:
        memset(dof, 0, sizeof(*dof));
        dof->dof_type = dt_mode_to_dft(S_IFDIR);
 
-       rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
-       if (rc != 0)
-               GOTO(unlock1, rc);
-
-       rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
+       rc = linkea_links_new(&ldata, &info->lti_linkea_buf2,
+                             cname, lfsck_dto2fid(parent));
        if (rc != 0)
                GOTO(unlock1, rc);
 
@@ -1587,7 +1606,8 @@ static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
        else
                lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
                                                    true);
-       if (ldata->ld_leh->leh_reccount > 0) {
+       if (ldata->ld_leh->leh_reccount > 0 ||
+           unlikely(ldata->ld_leh->leh_overflow_time)) {
                lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
                               ldata->ld_leh->leh_len);
                buflen = linkea_buf.lb_len;
@@ -1614,10 +1634,9 @@ again:
        if (unlikely(lfsck_is_dead_obj(obj)))
                GOTO(unlock2, rc = -ENOENT);
 
-       rc = lfsck_links_read2(env, obj, &ldata_new);
-       if (rc != 0)
-               GOTO(unlock2,
-                    rc = (rc == -ENODATA ? 0 : rc));
+       rc = lfsck_links_read2_with_rec(env, obj, &ldata_new);
+       if (rc)
+               GOTO(unlock2, rc = (rc == -ENODATA ? 0 : rc));
 
        /* The specified linkEA entry has been removed by race. */
        rc = linkea_links_find(&ldata_new, cname, pfid);
@@ -1641,14 +1660,11 @@ again:
                goto again;
        }
 
-       if (ldata_new.ld_leh->leh_reccount > 0) {
-               lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
-                              ldata_new.ld_leh->leh_len);
-               rc = dt_xattr_set(env, obj, &linkea_buf,
-                                 XATTR_NAME_LINK, 0, th);
-       } else {
+       if (ldata_new.ld_leh->leh_reccount > 0 ||
+           unlikely(ldata->ld_leh->leh_overflow_time))
+               rc = lfsck_links_write(env, obj, &ldata_new, th);
+       else
                rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th);
-       }
 
        GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
 
@@ -1890,7 +1906,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
 
 replace:
        dt_read_lock(env, child, 0);
-       rc = lfsck_links_read2(env, child, &ldata);
+       rc = lfsck_links_read2_with_rec(env, child, &ldata);
        dt_read_unlock(env, child);
 
        /* Someone changed the child, no need to replace. */
@@ -2238,11 +2254,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
        LASSERT(!dt_object_remote(obj));
        LASSERT(S_ISDIR(lfsck_object_type(obj)));
 
-       rc = linkea_data_new(&ldata, &info->lti_big_buf);
-       if (rc != 0)
-               GOTO(log, rc);
-
-       rc = linkea_add_buf(&ldata, cname, pfid);
+       rc = linkea_links_new(&ldata, &info->lti_big_buf, cname, pfid);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -2288,8 +2300,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
        if (rc != 0)
                GOTO(unlock, rc);
 
-       rc = dt_xattr_set(env, obj, &linkea_buf,
-                         XATTR_NAME_LINK, 0, th);
+       rc = lfsck_links_write(env, obj, &ldata, th);
 
        GOTO(unlock, rc = (rc == 0 ? 1 : rc));
 
@@ -2804,11 +2815,8 @@ rebuild:
                        /* It is the most common case that we find the
                         * name entry corresponding to the linkEA entry
                         * that matches the ".." name entry. */
-                       rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
-                       if (rc != 0)
-                               RETURN(rc);
-
-                       rc = linkea_add_buf(&ldata_new, cname, pfid2);
+                       rc = linkea_links_new(&ldata_new, &info->lti_big_buf,
+                                             cname, pfid2);
                        if (rc != 0)
                                RETURN(rc);
 
@@ -2882,6 +2890,10 @@ next:
                lfsck_linkea_del_buf(ldata, cname);
        } /* while (ldata->ld_lee != NULL) */
 
+       /* If there is still linkEA overflow, return. */
+       if (unlikely(ldata->ld_leh->leh_overflow_time))
+               RETURN(0);
+
        linkea_first_entry(ldata);
        if (ldata->ld_leh->leh_reccount == 1) {
                rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
@@ -2923,7 +2935,7 @@ next:
  *
  * If all the known name entries have been verified, then the object's hard
  * link attribute should match the object's linkEA entries count unless the
- * object's has too much hard link to be recorded in the linkEA. Such cases
+ * object's has too many hard link to be recorded in the linkEA. Such cases
  * should have been marked in the LFSCK trace file. Otherwise, trust the
  * linkEA to update the object's nlink attribute.
  *
@@ -2942,8 +2954,6 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env,
                                        struct dt_object *obj,
                                        struct lu_attr *la)
 {
-       struct lfsck_thread_info        *info   = lfsck_env_info(env);
-       struct lu_fid                   *tfid   = &info->lti_fid3;
        struct lfsck_namespace          *ns     = com->lc_file_ram;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct dt_device                *dev    = lfsck_obj2dev(obj);
@@ -2952,9 +2962,7 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env,
        struct linkea_data               ldata  = { NULL };
        struct lustre_handle             lh     = { 0 };
        __u32                            old    = la->la_nlink;
-       int                              idx;
        int                              rc     = 0;
-       __u8                             flags;
        ENTRY;
 
        LASSERT(!dt_object_remote(obj));
@@ -2988,26 +2996,20 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env,
        if (ns->ln_flags & LF_INCOMPLETE)
                GOTO(unlock, rc = 0);
 
-       fid_cpu_to_be(tfid, cfid);
-       idx = lfsck_sub_trace_file_fid2idx(cfid);
-       rc = dt_lookup(env, com->lc_sub_trace_objs[idx].lsto_obj,
-                      (struct dt_rec *)&flags, (const struct dt_key *)tfid);
-       if (rc != 0)
-               GOTO(unlock, rc);
-
-       if (flags & LNTF_SKIP_NLINK)
-               GOTO(unlock, rc = 0);
-
        rc = dt_attr_get(env, obj, la);
        if (rc != 0)
                GOTO(unlock, rc = (rc == -ENOENT ? 0 : rc));
 
-       rc = lfsck_links_read2(env, obj, &ldata);
-       if (rc != 0)
+       rc = lfsck_links_read2_with_rec(env, obj, &ldata);
+       if (rc)
                GOTO(unlock, rc = (rc == -ENODATA ? 0 : rc));
 
-       if (la->la_nlink == ldata.ld_leh->leh_reccount ||
-           unlikely(la->la_nlink == 0))
+       /* XXX: Currently, we only update the nlink attribute if the known
+        *      linkEA entries is larger than the nlink attribute. That is
+        *      safe action. */
+       if (la->la_nlink >= ldata.ld_leh->leh_reccount ||
+           unlikely(la->la_nlink == 0 ||
+                    ldata.ld_leh->leh_overflow_time))
                GOTO(unlock, rc = 0);
 
        la->la_nlink = ldata.ld_leh->leh_reccount;
@@ -3284,6 +3286,117 @@ out:
        return rc;
 }
 
+#define lfsck_time_before(a, b)                \
+       (typecheck(__u32, a) &&         \
+        typecheck(__u32, b) &&         \
+        ((int)(a) - (int)(b) < 0))
+
+static inline bool
+lfsck_namespace_linkea_stale_overflow(struct linkea_data *ldata,
+                                     struct lfsck_namespace *ns)
+{
+       /* Both the leh_overflow_time and ln_time_latest_reset are
+        * local time based, so need NOT to care about clock drift
+        * among the servers. */
+       return ldata->ld_leh->leh_overflow_time &&
+              lfsck_time_before(ldata->ld_leh->leh_overflow_time,
+                                ns->ln_time_latest_reset);
+}
+
+/**
+ * Clear the object's linkEA overflow timestamp.
+ *
+ * If the MDT-object has too many hard links as to the linkEA cannot hold
+ * all of them, then overflow timestamp will be set in the linkEA header.
+ * If some hard links are removed after that, then it is possible to hold
+ * other missed linkEA entries. If the namespace LFSCK have added all the
+ * related linkEA entries, then it will remove the overflow timestamp.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] ldata    pointer to the linkEA data for the given @obj
+ * \param[in] obj      pointer to the dt_object to be handled
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_linkea_clear_overflow(const struct lu_env *env,
+                                                struct lfsck_component *com,
+                                                struct linkea_data *ldata,
+                                                struct dt_object *obj)
+{
+       struct lfsck_namespace *ns = com->lc_file_ram;
+       struct lfsck_instance *lfsck = com->lc_lfsck;
+       struct dt_device *dev = lfsck_obj2dev(obj);
+       struct thandle *th = NULL;
+       struct lustre_handle lh = { 0 };
+       struct lu_buf linkea_buf;
+       int rc = 0;
+       ENTRY;
+
+       LASSERT(!dt_object_remote(obj));
+
+       rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
+                             MDS_INODELOCK_UPDATE, LCK_PW);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       rc = dt_declare_xattr_set(env, obj,
+                       lfsck_buf_get_const(env, NULL, MAX_LINKEA_SIZE),
+                       XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, obj, 0);
+       rc = lfsck_links_read(env, obj, ldata);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       if (unlikely(!lfsck_namespace_linkea_stale_overflow(ldata, ns)))
+               GOTO(unlock, rc = 0);
+
+       ldata->ld_leh->leh_overflow_time = 0;
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               GOTO(unlock, rc = 1);
+
+       /* If all known entries are in the linkEA, then the 'leh_reccount'
+        * should NOT be zero. */
+       LASSERT(ldata->ld_leh->leh_reccount > 0);
+
+       lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
+                      ldata->ld_leh->leh_len);
+       rc = dt_xattr_set(env, obj, &linkea_buf, XATTR_NAME_LINK, 0, th);
+       if (unlikely(rc == -ENOSPC))
+               rc = 0;
+       else if (!rc)
+               rc = 1;
+
+       GOTO(unlock, rc);
+
+unlock:
+       dt_write_unlock(env, obj);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       lfsck_ibits_unlock(&lh, LCK_PW);
+       CDEBUG(D_LFSCK, "%s: clear linkea overflow timestamp for the object "
+              DFID": rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
+
+       return rc;
+}
+
 /**
  * Double scan the MDT-object for namespace LFSCK.
  *
@@ -3349,12 +3462,23 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
                        lfsck_ibits_unlock(&lh, LCK_EX);
                }
 
-               GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
+               GOTO(out, rc);
        }
 
        if (rc != 0)
                GOTO(out, rc);
 
+       if (!(ns->ln_flags & LF_INCOMPLETE) &&
+           unlikely(lfsck_namespace_linkea_stale_overflow(&ldata, ns))) {
+               rc = lfsck_namespace_linkea_clear_overflow(env, com, &ldata,
+                                                          child);
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               if (rc > 0)
+                       ns->ln_linkea_overflow_cleared++;
+       }
+
        linkea_first_entry(&ldata);
        while (ldata.ld_lee != NULL) {
                rc = lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
@@ -3527,7 +3651,7 @@ lost_parent:
                        GOTO(out, rc);
 
                /* If there is no name entry in the parent dir and the object
-                * link count is less than the linkea entries count, then the
+                * link count is fewer than the linkea entries count, then the
                 * linkea entry should be removed. */
                if (ldata.ld_leh->leh_reccount > la->la_nlink) {
                        rc = lfsck_namespace_shrink_linkea_cond(env, com,
@@ -3628,7 +3752,9 @@ out:
                 * other MDT that references this object with another name,
                 * so we cannot know whether this linkEA is valid or not.
                 * So keep it there and maybe resolved when next LFSCK run. */
-               if (!(ns->ln_flags & LF_INCOMPLETE)) {
+               if (!(ns->ln_flags & LF_INCOMPLETE) &&
+                   (ldata.ld_leh == NULL ||
+                    !ldata.ld_leh->leh_overflow_time)) {
                        /* If the child becomes orphan, then insert it into
                         * the global .lustre/lost+found/MDTxxxx directory. */
                        rc = lfsck_namespace_insert_orphan(env, com, child,
@@ -3657,7 +3783,8 @@ out:
                                       PFID(lfsck_dto2fid(child)),
                                       la->la_nlink, count,
                                       lfsck_object_type(child));
-                       } else {
+                       } else if (la->la_nlink < count &&
+                                  likely(!ldata.ld_leh->leh_overflow_time)) {
                                rc = lfsck_namespace_repair_nlink(env, com,
                                                                  child, la);
                                if (rc > 0) {
@@ -3718,6 +3845,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                   "striped_shards_failed: %llu\n"
                   "striped_shards_skipped: %llu\n"
                   "name_hash_repaired: %llu\n"
+                  "linkea_overflow_cleared: %llu\n"
                   "success_count: %u\n"
                   "run_time_phase1: %u seconds\n"
                   "run_time_phase2: %u seconds\n",
@@ -3753,6 +3881,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                   ns->ln_striped_shards_failed,
                   ns->ln_striped_shards_skipped,
                   ns->ln_name_hash_repaired,
+                  ns->ln_linkea_overflow_cleared,
                   ns->ln_success_count,
                   time_phase1,
                   time_phase2);
@@ -3841,6 +3970,7 @@ static int lfsck_namespace_reset(const struct lu_env *env,
        }
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
+       ns->ln_time_latest_reset = cfs_time_current_sec();
 
        lfsck_object_put(env, com->lc_obj);
        com->lc_obj = NULL;
@@ -4157,7 +4287,7 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env,
                GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
        }
 
-       if (rc == -ENODATA) {
+       if (rc == -ENODATA || unlikely(!ldata.ld_leh->leh_reccount)) {
                rc = lfsck_namespace_check_for_double_scan(env, com, obj);
 
                GOTO(out, rc);
@@ -4599,100 +4729,6 @@ static int lfsck_namespace_in_notify(const struct lu_env *env,
        ENTRY;
 
        switch (lr->lr_event) {
-       case LE_SKIP_NLINK_DECLARE: {
-               struct dt_object        *obj;
-               struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
-               int                      idx;
-               __u8                     flags = 0;
-
-               LASSERT(th != NULL);
-
-               idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid);
-               mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-               obj = com->lc_sub_trace_objs[idx].lsto_obj;
-               if (unlikely(obj == NULL)) {
-                       mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-                       RETURN(0);
-               }
-
-               lfsck_object_get(obj);
-               fid_cpu_to_be(key, &lr->lr_fid);
-               rc = dt_declare_delete(env, obj,
-                                      (const struct dt_key *)key, th);
-               if (rc == 0)
-                       rc = dt_declare_insert(env, obj,
-                                              (const struct dt_rec *)&flags,
-                                              (const struct dt_key *)key, th);
-               mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-               lfsck_object_put(env, obj);
-
-               RETURN(rc);
-       }
-       case LE_SKIP_NLINK: {
-               struct dt_object        *obj;
-               struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
-               int                      idx;
-               __u8                     flags = 0;
-               bool                     exist = false;
-               ENTRY;
-
-               LASSERT(th != NULL);
-
-               idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid);
-               mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-               obj = com->lc_sub_trace_objs[idx].lsto_obj;
-               if (unlikely(obj == NULL)) {
-                       mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-                       RETURN(0);
-               }
-
-               lfsck_object_get(obj);
-               fid_cpu_to_be(key, &lr->lr_fid);
-               rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
-                              (const struct dt_key *)key);
-               if (rc == 0) {
-                       if (flags & LNTF_SKIP_NLINK) {
-                               mutex_unlock(
-                               &com->lc_sub_trace_objs[idx].lsto_mutex);
-                               lfsck_object_put(env, obj);
-
-                               RETURN(0);
-                       }
-
-                       exist = true;
-               } else if (rc != -ENOENT) {
-                       GOTO(log, rc);
-               }
-
-               flags |= LNTF_SKIP_NLINK;
-               if (exist) {
-                       rc = dt_delete(env, obj, (const struct dt_key *)key,
-                                      th);
-                       if (rc != 0)
-                               GOTO(log, rc);
-               }
-
-               rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
-                              (const struct dt_key *)key, th, 1);
-
-               GOTO(log, rc);
-
-log:
-               mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-               lfsck_object_put(env, obj);
-               CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID
-                      " to be skipped for namespace double scan: rc = %d\n",
-                      lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc);
-
-               if (rc != 0)
-                       /* If we cannot record this object in the LFSCK tracing,
-                        * we have to mark the LFSC as LF_INCOMPLETE, then the
-                        * LFSCK will skip nlink attribute verification for
-                        * all objects. */
-                       ns->ln_flags |= LF_INCOMPLETE;
-
-               return 0;
-       }
        case LE_SET_LMV_MASTER: {
                struct dt_object        *obj;
 
@@ -4967,11 +5003,8 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env,
        if (IS_ERR(child))
                GOTO(log, rc = PTR_ERR(child));
 
-       rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
-       if (rc != 0)
-               GOTO(log, rc);
-
-       rc = linkea_add_buf(&ldata, cname, pfid);
+       rc = linkea_links_new(&ldata, &info->lti_linkea_buf2,
+                             cname, pfid);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -5426,6 +5459,8 @@ nodata:
                        goto again;
                }
 
+               LASSERT(handle != NULL);
+
                if (dir == NULL) {
                        dir = lfsck_assistant_object_load(env, lfsck, lso);
                        if (IS_ERR(dir)) {
@@ -5461,37 +5496,8 @@ nodata:
                }
 
                rc = linkea_add_buf(&ldata, cname, pfid);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               rc = lfsck_links_write(env, obj, &ldata, handle);
-               if (unlikely(rc == -ENOSPC) &&
-                   S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) {
-                       if (handle != NULL) {
-                               LASSERT(dt_write_locked(env, obj));
-
-                               dt_write_unlock(env, obj);
-                               dtlocked = false;
-
-                               dt_trans_stop(env, dev, handle);
-                               handle = NULL;
-
-                               lfsck_ibits_unlock(&lh, LCK_EX);
-                       }
-
-                       rc = lfsck_namespace_trace_update(env, com,
-                                       &lnr->lnr_fid, LNTF_SKIP_NLINK, true);
-                       if (rc != 0)
-                               /* If we cannot record this object in the
-                                * LFSCK tracing, we have to mark the LFSCK
-                                * as LF_INCOMPLETE, then the LFSCK will
-                                * skip nlink attribute verification for
-                                * all objects. */
-                               ns->ln_flags |= LF_INCOMPLETE;
-
-                       GOTO(out, rc = 0);
-               }
-
+               if (rc == 0)
+                       rc = lfsck_links_write(env, obj, &ldata, handle);
                if (rc != 0)
                        GOTO(stop, rc);
 
@@ -5601,9 +5607,10 @@ trace:
                        if (log)
                                CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
                                       "repaired the entry: "DFID", parent "DFID
-                                      ", name %.*s\n", lfsck_lfsck2name(lfsck),
+                                      ", name %.*s, type %d\n",
+                                      lfsck_lfsck2name(lfsck),
                                       PFID(&lnr->lnr_fid), PFID(pfid),
-                                      lnr->lnr_namelen, lnr->lnr_name);
+                                      lnr->lnr_namelen, lnr->lnr_name, type);
 
                        switch (type) {
                        case LNIT_DANGLING:
@@ -6478,7 +6485,7 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj,
 
        LASSERT(S_ISDIR(lfsck_object_type(obj)));
 
-       rc = lfsck_links_read(env, obj, &ldata);
+       rc = lfsck_links_read_with_rec(env, obj, &ldata);
        if (rc == -ENODATA) {
                dirty = true;
        } else if (rc == 0) {
@@ -6495,11 +6502,8 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj,
        if (!dirty)
                RETURN(rc);
 
-       rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
-       if (rc != 0)
-               RETURN(rc);
-
-       rc = linkea_add_buf(&ldata, cname, pfid);
+       rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf,
+                             cname, pfid);
        if (rc != 0)
                RETURN(rc);
 
@@ -6550,14 +6554,11 @@ int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
        struct linkea_data        ldata = { NULL };
        int                       rc;
 
-       rc = lfsck_links_read(env, obj, &ldata);
-       if (rc != 0)
+       rc = lfsck_links_read_with_rec(env, obj, &ldata);
+       if (rc)
                return rc;
 
        linkea_first_entry(&ldata);
-       if (ldata.ld_lee == NULL)
-               return -ENODATA;
-
        linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
        /* To guarantee the 'name' is terminated with '0'. */
        memcpy(name, cname->ln_name, cname->ln_namelen);
index 280c960..857953a 100644 (file)
@@ -1984,7 +1984,7 @@ int lfsck_namespace_striped_dir_rescan(const struct lu_env *env,
                        break;
                }
 
-               rc1 = lfsck_links_read(env, obj, &ldata);
+               rc1 = lfsck_links_read_with_rec(env, obj, &ldata);
                if (rc1 == -ENOENT) {
                        create = true;
                        goto repair;
@@ -2122,7 +2122,8 @@ repair:
                if (repair_linkea) {
                        struct lustre_handle lh = { 0 };
 
-                       rc1 = linkea_data_new(&ldata, &info->lti_big_buf);
+                       rc1 = linkea_links_new(&ldata, &info->lti_big_buf,
+                                              cname, lfsck_dto2fid(dir));
                        if (rc1 != 0)
                                goto next;
 
@@ -2140,10 +2141,6 @@ repair:
                                }
                        }
 
-                       rc1 = linkea_add_buf(&ldata, cname, lfsck_dto2fid(dir));
-                       if (rc1 != 0)
-                               goto next;
-
                        rc1 = lfsck_ibits_lock(env, lfsck, obj, &lh,
                                               MDS_INODELOCK_UPDATE |
                                               MDS_INODELOCK_XATTR, LCK_EX);
index 4c20548..4dbb755 100644 (file)
@@ -1241,12 +1241,6 @@ static int mdd_declare_link(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
-               rc = mdo_declare_ref_add(env, c, handle);
-               if (rc != 0)
-                       return rc;
-       }
-
        la->la_valid = LA_CTIME | LA_MTIME;
        rc = mdo_declare_attr_set(env, p, la, handle);
        if (rc != 0)
@@ -1329,12 +1323,6 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                        GOTO(out_unlock, rc);
        }
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
-               rc = mdo_ref_add(env, mdd_sobj, handle);
-               if (rc != 0)
-                       GOTO(out_unlock, rc);
-       }
-
        *tfid = *mdo2fid(mdd_sobj);
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
                tfid->f_oid = cfs_fail_val;
index 55c6b5e..2fe199b 100644 (file)
@@ -4863,10 +4863,6 @@ void lustre_assert_wire_constants(void)
                 (long long)LE_CONDITIONAL_DESTROY);
        LASSERTF(LE_PAIRS_VERIFY == 11, "found %lld\n",
                 (long long)LE_PAIRS_VERIFY);
-       LASSERTF(LE_SKIP_NLINK_DECLARE == 13, "found %lld\n",
-                (long long)LE_SKIP_NLINK_DECLARE);
-       LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n",
-                (long long)LE_SKIP_NLINK);
        LASSERTF(LE_SET_LMV_MASTER == 15, "found %lld\n",
                 (long long)LE_SET_LMV_MASTER);
        LASSERTF(LE_SET_LMV_SLAVE == 16, "found %lld\n",
index 98f2741..e71484d 100644 (file)
@@ -748,10 +748,64 @@ static int out_tx_xattr_set_exec(const struct lu_env *env,
 
                ldata.ld_buf = &arg->u.xattr_set.buf;
                if (strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0) {
+                       struct link_ea_header *leh;
+
                        linkea = true;
                        rc = linkea_init(&ldata);
                        if (unlikely(rc))
                                GOTO(out, rc == -ENODATA ? -EINVAL : rc);
+
+                       leh = ldata.ld_leh;
+                       LASSERT(leh != NULL);
+
+                       /* If the new linkEA contains overflow timestamp,
+                        * then two cases:
+                        *
+                        * 1. The old linkEA for the object has already
+                        *    overflowed before current setting, the new
+                        *    linkEA does not contain new link entries. So
+                        *    the linkEA overflow timestamp is unchanged.
+                        *
+                        * 2. There are new link entries in the new linkEA,
+                        *    so its overflow timestamp is different from
+                        *    the old one. Usually, the timestamp in the
+                        *    given linkEA is newer. But because of clock
+                        *    drift among MDTs, the timestamp may become
+                        *    older. So here, we convert the timestamp to
+                        *    the server local time. Then namespace LFSCK
+                        *    that uses local time can handle it easily. */
+                       if (unlikely(leh->leh_overflow_time)) {
+                               struct lu_buf tbuf = { 0 };
+                               bool update = false;
+
+                               lu_buf_alloc(&tbuf, MAX_LINKEA_SIZE);
+                               if (tbuf.lb_buf == NULL)
+                                       GOTO(unlock, rc = -ENOMEM);
+
+                               rc = dt_xattr_get(env, dt_obj, &tbuf,
+                                                 XATTR_NAME_LINK);
+                               if (rc > 0) {
+                                       struct linkea_data tdata = { 0 };
+
+                                       tdata.ld_buf = &tbuf;
+                                       rc = linkea_init(&tdata);
+                                       if (rc || leh->leh_overflow_time !=
+                                           tdata.ld_leh->leh_overflow_time)
+                                               update = true;
+                               } else {
+                                       /* Update the timestamp by force if
+                                        * fail to load the old linkEA. */
+                                       update = true;
+                               }
+
+                               lu_buf_free(&tbuf);
+                               if (update) {
+                                       leh->leh_overflow_time =
+                                                       cfs_time_current_sec();
+                                       if (unlikely(!leh->leh_overflow_time))
+                                               leh->leh_overflow_time++;
+                               }
+                       }
                } else {
                        linkea = false;
                }
@@ -769,6 +823,8 @@ again:
                                goto again;
                        }
                }
+
+unlock:
                dt_write_unlock(env, dt_obj);
        }
 
index d4507cf..1796dfb 100644 (file)
@@ -3750,7 +3750,10 @@ test_29a() {
        count=$(stat --format=%h $DIR/$tdir/d0/foo)
        [ $count -eq 2 ] || error "(8) Fail to repair nlink count: $count"
 }
-run_test 29a "LFSCK can repair bad nlink count (1)"
+# Disable 29a: we only allow nlink to be updated if the number of known
+# linkEA entries is larger than the nlink count.
+#
+#run_test 29a "LFSCK can repair bad nlink count (1)"
 
 test_29b() {
        echo "#####"
@@ -3794,59 +3797,100 @@ test_29b() {
 }
 run_test 29b "LFSCK can repair bad nlink count (2)"
 
-test_29c() {
+test_29c()
+{
        echo "#####"
-       echo "There are too many hard links to the object, and exceeds the"
-       echo "object's linkEA limitation, as to NOT all the known name entries"
-       echo "will be recorded in the linkEA. Under such case, LFSCK should"
-       echo "skip the nlink verification for this object."
+       echo "The namespace LFSCK will create many hard links to the target"
+       echo "file as to exceed the linkEA size limitation. Under such case"
+       echo "the linkEA will be marked as overflow that will prevent the"
+       echo "target file to be migrated. Then remove some hard links to"
+       echo "make the left hard links to be held within the linkEA size"
+       echo "limitation. But before the namespace LFSCK adding all the"
+       echo "missed linkEA entries back, the overflow mark (timestamp)"
+       echo "will not be cleared."
        echo "#####"
 
        check_mount_and_prep
 
-       $LFS mkdir -i 0 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0"
-       touch $DIR/$tdir/d0/foo || error "(2) Fail to create foo"
-       ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h1 ||
-               error "(3) Fail to hard link to $DIR/$tdir/d0/foo"
+       mkdir -p $DIR/$tdir/guard || error "(0.1) Fail to mkdir"
+       $LFS mkdir -i $((MDSCOUNT - 1)) $DIR/$tdir/foo ||
+               error "(0.2) Fail to mkdir"
+       touch $DIR/$tdir/guard/f0 || error "(1) Fail to create"
+       local oldfid=$($LFS path2fid $DIR/$tdir/guard/f0)
+
+       # define MAX_LINKEA_SIZE        4096
+       # sizeof(link_ea_header) = 24
+       # sizeof(link_ea_entry) = 18
+       # nlink_min=$(((MAX_LINKEA_SIZE - sizeof(link_ea_header)) /
+       #             (sizeof(link_ea_entry) + name_length))
+       # If the average name length is 12 bytes, then 150 hard links
+       # is totally enough to overflow the linkEA
+       echo "Create 150 hard links should succeed although the linkEA overflow"
+       createmany -l $DIR/$tdir/guard/f0 $DIR/$tdir/foo/ttttttttttt 150 ||
+               error "(2) Fail to hard link"
 
-       echo "Inject failure stub on MDT0 to simulate the case that"
-       echo "foo's hard links exceed the object's linkEA limitation."
+       cancel_lru_locks mdc
+       if [ $MDSCOUNT -ge 2 ]; then
+               $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null ||
+                       error "(3.1) Migrate failure"
 
-       ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h2 ||
-               error "(4) Fail to hard link to $DIR/$tdir/d0/foo"
+               echo "The object with linkEA overflow should NOT be migrated"
+               local newfid=$($LFS path2fid $DIR/$tdir/guard/f0)
+               [ "$newfid" == "$oldfid" ] ||
+                       error "(3.2) Migrate should fail: $newfid != $oldfid"
+       fi
 
-       cancel_lru_locks mdc
+       # Remove 100 hard links, then the linkEA should have space
+       # to hold the missed linkEA entries.
+       echo "Remove 100 hard links to save space for the missed linkEA entries"
+       unlinkmany $DIR/$tdir/foo/ttttttttttt 100 || error "(4) Fail to unlink"
 
-       local count1=$(stat --format=%h $DIR/$tdir/d0/foo)
-       [ $count1 -eq 3 ] || error "(5) Stat failure: $count1"
+       if [ $MDSCOUNT -ge 2 ]; then
+               $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null ||
+                       error "(5.1) Migrate failure"
 
-       local foofid=$($LFS path2fid $DIR/$tdir/d0/foo)
-       $LFS fid2path $DIR $foofid
-       local count2=$($LFS fid2path $DIR $foofid | wc -l)
-       [ $count2 -eq 2 ] || error "(6) Fail to inject error: $count2"
+               # The overflow timestamp is still there, so migration will fail.
+               local newfid=$($LFS path2fid $DIR/$tdir/guard/f0)
+               [ "$newfid" == "$oldfid" ] ||
+                       error "(5.2) Migrate should fail: $newfid != $oldfid"
+       fi
 
-       echo "Trigger namespace LFSCK to repair the nlink count"
+       # sleep 3 seconds to guarantee that the overflow is recognized
+       sleep 3
+
+       echo "Trigger namespace LFSCK to clear the overflow timestamp"
        $START_NAMESPACE -r -A ||
-               error "(7) Fail to start LFSCK for namespace"
+               error "(6) Fail to start LFSCK for namespace"
 
-       wait_all_targets_blocked namespace completed 8
+       wait_all_targets_blocked namespace completed 7
 
        local repaired=$($SHOW_NAMESPACE |
-                        awk '/^nlinks_repaired/ { print $2 }')
+                        awk '/^linkea_overflow_cleared/ { print $2 }')
+       [ $repaired -eq 1 ] ||
+               error "(8) Fail to clear linkea overflow: $repaired"
+
+       repaired=$($SHOW_NAMESPACE |
+                  awk '/^nlinks_repaired/ { print $2 }')
        [ $repaired -eq 0 ] ||
-               error "(9) Repair nlink count unexpcetedly: $repaired"
+               error "(9) Unexpected nlink repaired: $repaired"
 
-       cancel_lru_locks mdc
+       if [ $MDSCOUNT -ge 2 ]; then
+               $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null ||
+                       error "(10.1) Migrate failure"
+
+               # Migration should succeed after clear the overflow timestamp.
+               local newfid=$($LFS path2fid $DIR/$tdir/guard/f0)
+               [ "$newfid" != "$oldfid" ] ||
+                       error "(10.2) Migrate should succeed"
 
-       count1=$(stat --format=%h $DIR/$tdir/d0/foo)
-       [ $count1 -eq 3 ] || error "(10) Stat failure: $count1"
+               ls -l $DIR/$tdir/foo > /dev/null ||
+                       error "(11) 'ls' failed after migration"
+       fi
 
-       count2=$($LFS fid2path $DIR $foofid | wc -l)
-       [ $count2 -eq 2 ] ||
-               error "(11) Repaired something unexpectedly: $count2"
+       rm -f $DIR/$tdir/guard/f0 || error "(12) Fail to unlink f0"
+       rm -rf $DIR/$tdir/foo || error "(13) Fail to rmdir foo"
 }
-# disable test_29c temporarily, it will be re-enabled in subsequent patch.
-#run_test 29c "Not verify nlink attr if hard links exceed linkEA limitation"
+run_test 29c "verify linkEA size limitation"
 
 test_30() {
        [ $(facet_fstype $SINGLEMDS) != ldiskfs ] &&
index 1654625..c736941 100755 (executable)
@@ -15792,6 +15792,38 @@ test_408() {
 }
 run_test 408 "drop_caches should not hang due to page leaks"
 
+test_409()
+{
+       [ $MDSCOUNT -lt 2 ] &&
+               skip "We need at least 2 MDTs for this test" && return
+
+       check_mount_and_prep
+
+       mkdir -p $DIR/$tdir || error "(0) Fail to mkdir"
+       $LFS mkdir -i 1 -c 2 $DIR/$tdir/foo || error "(1) Fail to mkdir"
+       touch $DIR/$tdir/guard || error "(2) Fail to create"
+
+       local PREFIX=$(str_repeat 'A' 128)
+       echo "Create 1K hard links start at $(date)"
+       createmany -l $DIR/$tdir/guard $DIR/$tdir/foo/${PREFIX}_ 1000 ||
+               error "(3) Fail to hard link"
+
+       echo "Links count should be right although linkEA overflow"
+       stat $DIR/$tdir/guard || error "(4) Fail to stat"
+       local linkcount=$(stat --format=%h $DIR/$tdir/guard)
+       [ $linkcount -eq 1001 ] ||
+               error "(5) Unexpected hard links count: $linkcount"
+
+       echo "List all links start at $(date)"
+       ls -l $DIR/$tdir/foo > /dev/null ||
+               error "(6) Fail to list $DIR/$tdir/foo"
+
+       echo "Unlink hard links start at $(date)"
+       unlinkmany $DIR/$tdir/foo/${PREFIX}_ 1000 ||
+               error "(7) Fail to unlink"
+}
+run_test 409 "Large amount of cross-MDTs hard links on the same file"
+
 #
 # tests that do cleanup/setup should be run at the end
 #
index 8dff709..268e65f 100644 (file)
@@ -2266,8 +2266,6 @@ static void check_lfsck_request(void)
        CHECK_VALUE(LE_PEER_EXIT);
        CHECK_VALUE(LE_CONDITIONAL_DESTROY);
        CHECK_VALUE(LE_PAIRS_VERIFY);
-       CHECK_VALUE(LE_SKIP_NLINK_DECLARE);
-       CHECK_VALUE(LE_SKIP_NLINK);
        CHECK_VALUE(LE_SET_LMV_MASTER);
        CHECK_VALUE(LE_SET_LMV_SLAVE);
 
index f3f798b..e09003f 100644 (file)
@@ -4878,10 +4878,6 @@ void lustre_assert_wire_constants(void)
                 (long long)LE_CONDITIONAL_DESTROY);
        LASSERTF(LE_PAIRS_VERIFY == 11, "found %lld\n",
                 (long long)LE_PAIRS_VERIFY);
-       LASSERTF(LE_SKIP_NLINK_DECLARE == 13, "found %lld\n",
-                (long long)LE_SKIP_NLINK_DECLARE);
-       LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n",
-                (long long)LE_SKIP_NLINK);
        LASSERTF(LE_SET_LMV_MASTER == 15, "found %lld\n",
                 (long long)LE_SET_LMV_MASTER);
        LASSERTF(LE_SET_LMV_SLAVE == 16, "found %lld\n",