Whamcloud - gitweb
LU-5517 lfsck: repair invalid nlink count 16/11516/29
author Fan Yong <fan.yong@intel.com>
Wed, 27 Aug 2014 15:12:44 +0000 (23:12 +0800)
committer Oleg Drokin <oleg.drokin@intel.com>
Thu, 30 Oct 2014 02:13:32 +0000 (02:13 +0000)
If the namespace LFSCK has verified all the known name entries during
the first-stage scanning, then the MDT-object's linkEA is trustworthy.
So if a non-directory MDT-object's nlink attribute does not match
its linkEA entry count, then the LFSCK will repair the MDT-object's
nlink attribute according to that linkEA entry count.

One exception: the linkEA space is limited. If there are too many
hard links on the MDT-object and they exceed the object's linkEA
space limit, then some name entries cannot be recorded in the
linkEA. In that case, we add a flag (LNTF_SKIP_NLINK) to the
LFSCK tracing file for the related MDT-objects. The LFSCK can then
skip the nlink attribute verification for the marked MDT-objects
during the second-stage scanning.

This patch also cleans up the LFSCK environment when the current LFSCK
scan exits (completed/stopped/failed), so that stale state does not
mislead the next LFSCK scan.

This patch also makes some code adjustments to the previously landed
LFSCK patches according to the inspection feedback.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Iedc676e8cc06a52f55e82372e6dc8b30008e20f4
Reviewed-on: http://review.whamcloud.com/11516
Tested-by: Jenkins
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
25 files changed:
lustre/include/lu_target.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_lfsck.h
lustre/include/obd_support.h
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c
lustre/mdd/mdd_compat.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/ofd/ofd_io.c
lustre/ofd/ofd_objects.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-zfs/osd_xattr.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/ptlrpc/wiretest.c
lustre/target/out_handler.c
lustre/target/tgt_handler.c
lustre/target/tgt_internal.h
lustre/tests/sanity-lfsck.sh
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 929a4d4..d9da1a4 100644 (file)
@@ -284,7 +284,8 @@ int tgt_brw_write(struct tgt_session_info *tsi);
 int tgt_hpreq_handler(struct ptlrpc_request *req);
 void tgt_register_lfsck_in_notify(int (*notify)(const struct lu_env *,
                                                struct dt_device *,
-                                               struct lfsck_request *));
+                                               struct lfsck_request *,
+                                               struct thandle *));
 void tgt_register_lfsck_query(int (*query)(const struct lu_env *,
                                           struct dt_device *,
                                           struct lfsck_request *));
index b6d0516..0ebe6b1 100644 (file)
@@ -3585,6 +3585,8 @@ enum lfsck_events {
        LE_CONDITIONAL_DESTROY  = 10,
        LE_PAIRS_VERIFY         = 11,
        LE_CREATE_ORPHAN        = 12,
+       LE_SKIP_NLINK_DECLARE   = 13,
+       LE_SKIP_NLINK           = 14,
 };
 
 enum lfsck_event_flags {
index 635aa21..b3a7e2e 100644 (file)
@@ -144,7 +144,7 @@ int lfsck_start(const struct lu_env *env, struct dt_device *key,
 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
               struct lfsck_stop *stop);
 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
-                   struct lfsck_request *lr);
+                   struct lfsck_request *lr, struct thandle *th);
 int lfsck_query(const struct lu_env *env, struct dt_device *key,
                struct lfsck_request *lr);
 
@@ -156,12 +156,13 @@ int lfsck_set_windows(struct dt_device *key, int val);
 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type);
 
 static inline void lfsck_pack_rfa(struct lfsck_request *lr,
-                                 const struct lu_fid *fid)
+                                 const struct lu_fid *fid,
+                                 __u32 event, __u16 com)
 {
        memset(lr, 0, sizeof(*lr));
-       lr->lr_event = LE_FID_ACCESSED;
-       lr->lr_active = LFSCK_TYPE_LAYOUT;
        lr->lr_fid = *fid;
+       lr->lr_event = event;
+       lr->lr_active = com;
 }
 
 #endif /* _LUSTRE_LFSCK_H */
index db06866..8d48365 100644 (file)
@@ -533,6 +533,9 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LFSCK_MUL_REF         0x1622
 #define OBD_FAIL_LFSCK_BAD_TYPE                0x1623
 #define OBD_FAIL_LFSCK_NO_NAMEENTRY    0x1624
+#define OBD_FAIL_LFSCK_MORE_NLINK      0x1625
+#define OBD_FAIL_LFSCK_LESS_NLINK      0x1626
+#define OBD_FAIL_LFSCK_LINKEA_OVERFLOW 0x1627
 
 #define OBD_FAIL_LFSCK_NOTIFY_NET      0x16f0
 #define OBD_FAIL_LFSCK_QUERY_NET       0x16f1
index b32b147..6fa9ca1 100644 (file)
@@ -111,6 +111,7 @@ struct lfsck_bookmark {
 enum lfsck_namespace_trace_flags {
        LNTF_CHECK_LINKEA       = 0x01,
        LNTF_CHECK_PARENT       = 0x02,
+       LNTF_SKIP_NLINK         = 0x04,
        LNTF_ALL                = 0xff
 };
 
@@ -359,7 +360,8 @@ struct lfsck_operations {
 
        int (*lfsck_in_notify)(const struct lu_env *env,
                               struct lfsck_component *com,
-                              struct lfsck_request *lr);
+                              struct lfsck_request *lr,
+                              struct thandle *th);
 
        int (*lfsck_query)(const struct lu_env *env,
                           struct lfsck_component *com);
@@ -1104,11 +1106,15 @@ static inline void lfsck_lad_set_bitmap(const struct lu_env *env,
 
        LASSERT(com->lc_lfsck->li_master);
        LASSERT(bitmap != NULL);
-       LASSERTF(bitmap->size > index, "invalid index: nbits %d, index %u\n",
-                bitmap->size, index);
 
-       cfs_bitmap_set(bitmap, index);
-       lad->lad_incomplete = 1;
+       if (likely(bitmap->size > index)) {
+               cfs_bitmap_set(bitmap, index);
+               lad->lad_incomplete = 1;
+       } else if (com->lc_type == LFSCK_TYPE_NAMESPACE) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
+
+               ns->ln_flags |= LF_INCOMPLETE;
+       }
 }
 
 static inline int lfsck_links_read(const struct lu_env *env,
index 18633e0..81066d2 100644 (file)
@@ -811,9 +811,8 @@ static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
  * \param[in] env      pointer to the thread context
  * \param[in] com      pointer to the lfsck component
  *
- * \retval             positive number for data corruption
  * \retval             0 for success
- * \retval             negative error number on failure
+ * \retval             negative error number on failure or data corruption
  */
 static int lfsck_layout_load_bitmap(const struct lu_env *env,
                                    struct lfsck_component *com)
@@ -862,15 +861,8 @@ static int lfsck_layout_load_bitmap(const struct lu_env *env,
 
        size = (lo->ll_bitmap_size + 7) >> 3;
        rc = dt_read(env, obj, lfsck_buf_get(env, bitmap->data, size), &pos);
-       if (rc == 0) {
-               RETURN(-ENOENT);
-       } else if (rc != size) {
-               CDEBUG(D_LFSCK, "%s: lfsck_layout bitmap size %u != %u\n",
-                      lfsck_lfsck2name(com->lc_lfsck),
-                      (unsigned int)size, rc);
-
-               RETURN(rc);
-       }
+       if (rc != size)
+               RETURN(rc >= 0 ? -EINVAL : rc);
 
        if (cfs_bitmap_check_empty(bitmap))
                lad->lad_incomplete = 0;
@@ -1417,8 +1409,9 @@ static int lfsck_layout_double_scan_result(const struct lu_env *env,
                lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
                lo->ll_success_count++;
        } else if (rc == 0) {
-               lo->ll_status = lfsck->li_status;
-               if (lo->ll_status == 0)
+               if (lfsck->li_status != 0)
+                       lo->ll_status = lfsck->li_status;
+               else
                        lo->ll_status = LS_STOPPED;
        } else {
                lo->ll_status = LS_FAILED;
@@ -1718,6 +1711,16 @@ out:
  *                     but does not know the position (the file name) in the
  *                     layout.
  *
+ *  type "D":          The MDT-object is a directory, it may know its parent
+ *                     but because there is no valid linkEA, the LFSCK cannot
+ *                     know where to put it back to the namespace.
+ *  type "O":          The MDT-object has no linkEA, and there is no name
+ *                     entry that references the MDT-object.
+ *
+ *  type "P":          The orphan object to be created was a parent directory
+ *                     of some MDT-object which linkEA shows that the @orphan
+ *                     object is missing.
+ *
  * The orphan name will be like:
  * ${FID}-${infix}-${type}-${conflict_version}
  *
@@ -3845,6 +3848,9 @@ log:
 
 /* layout APIs */
 
+static void lfsck_layout_slave_quit(const struct lu_env *env,
+                                   struct lfsck_component *com);
+
 static int lfsck_layout_reset(const struct lu_env *env,
                              struct lfsck_component *com, bool init)
 {
@@ -4082,15 +4088,15 @@ static int lfsck_layout_master_prep(const struct lu_env *env,
        ENTRY;
 
        rc = lfsck_layout_load_bitmap(env, com);
-       if (rc > 0) {
+       if (rc != 0) {
                rc = lfsck_layout_reset(env, com, false);
                if (rc == 0)
                        rc = lfsck_set_param(env, com->lc_lfsck,
                                             lsp->lsp_start, true);
-       }
 
-       if (rc != 0)
-               GOTO(log, rc);
+               if (rc != 0)
+                       GOTO(log, rc);
+       }
 
        rc = lfsck_layout_prep(env, com, lsp->lsp_start);
        if (rc != 0)
@@ -4102,7 +4108,7 @@ static int lfsck_layout_master_prep(const struct lu_env *env,
 
 log:
        CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
-              LPU64"\n", lfsck_lfsck2name(com->lc_lfsck),
+              LPU64"]\n", lfsck_lfsck2name(com->lc_lfsck),
               com->lc_pos_start.lp_oit_cookie);
 
        return 0;
@@ -4592,12 +4598,12 @@ static int lfsck_layout_master_post(const struct lu_env *env,
                lo->ll_flags &= ~LF_UPGRADE;
                list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
        } else if (result == 0) {
-               lo->ll_status = lfsck->li_status;
-               if (lo->ll_status == 0)
+               if (lfsck->li_status != 0)
+                       lo->ll_status = lfsck->li_status;
+               else
                        lo->ll_status = LS_STOPPED;
-               if (lo->ll_status != LS_PAUSED) {
+               if (lo->ll_status != LS_PAUSED)
                        list_move_tail(&com->lc_link, &lfsck->li_list_idle);
-               }
        } else {
                lo->ll_status = LS_FAILED;
                list_move_tail(&com->lc_link, &lfsck->li_list_idle);
@@ -4643,10 +4649,7 @@ static int lfsck_layout_slave_post(const struct lu_env *env,
                                lfsck->li_pos_checkpoint.lp_oit_cookie;
 
        if (result > 0) {
-               if (lo->ll_flags & LF_INCOMPLETE)
-                       lo->ll_status = LS_PARTIAL;
-               else
-                       lo->ll_status = LS_SCANNING_PHASE2;
+               lo->ll_status = LS_SCANNING_PHASE2;
                lo->ll_flags |= LF_SCANNED_ONCE;
                if (lo->ll_flags & LF_CRASHED_LASTID) {
                        done = true;
@@ -4659,8 +4662,9 @@ static int lfsck_layout_slave_post(const struct lu_env *env,
                lo->ll_flags &= ~LF_UPGRADE;
                list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
        } else if (result == 0) {
-               lo->ll_status = lfsck->li_status;
-               if (lo->ll_status == 0)
+               if (lfsck->li_status != 0)
+                       lo->ll_status = lfsck->li_status;
+               else
                        lo->ll_status = LS_STOPPED;
                if (lo->ll_status != LS_PAUSED)
                        list_move_tail(&com->lc_link, &lfsck->li_list_idle);
@@ -4687,9 +4691,6 @@ static int lfsck_layout_slave_post(const struct lu_env *env,
 
        lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
 
-       if (result <= 0)
-               lfsck_rbtree_cleanup(env, com);
-
        CDEBUG(D_LFSCK, "%s: layout LFSCK slave post done: rc = %d\n",
               lfsck_lfsck2name(lfsck), rc);
 
@@ -4878,9 +4879,39 @@ out:
 static int lfsck_layout_master_double_scan(const struct lu_env *env,
                                           struct lfsck_component *com)
 {
-       struct lfsck_layout *lo = com->lc_file_ram;
+       struct lfsck_layout             *lo     = com->lc_file_ram;
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_tgt_descs          *ltds;
+       struct lfsck_tgt_desc           *ltd;
+       struct lfsck_tgt_desc           *next;
+       int                              rc;
+
+       rc = lfsck_double_scan_generic(env, com, lo->ll_status);
+
+       if (thread_is_stopped(&lad->lad_thread)) {
+               LASSERT(list_empty(&lad->lad_req_list));
+               LASSERT(list_empty(&lad->lad_ost_phase1_list));
+               LASSERT(list_empty(&lad->lad_mdt_phase1_list));
+
+               ltds = &lfsck->li_ost_descs;
+               spin_lock(&ltds->ltd_lock);
+               list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
+                                        ltd_layout_phase_list) {
+                       list_del_init(&ltd->ltd_layout_phase_list);
+               }
+               spin_unlock(&ltds->ltd_lock);
+
+               ltds = &lfsck->li_mdt_descs;
+               spin_lock(&ltds->ltd_lock);
+               list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+                                        ltd_layout_phase_list) {
+                       list_del_init(&ltd->ltd_layout_phase_list);
+               }
+               spin_unlock(&ltds->ltd_lock);
+       }
 
-       return lfsck_double_scan_generic(env, com, lo->ll_status);
+       return rc;
 }
 
 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
@@ -4893,15 +4924,12 @@ static int lfsck_layout_slave_double_scan(const struct lu_env *env,
        int                              rc;
        ENTRY;
 
-       if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
-               lfsck_rbtree_cleanup(env, com);
-               lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
-               RETURN(0);
-       }
-
        CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan start\n",
               lfsck_lfsck2name(lfsck));
 
+       if (lo->ll_flags & LF_INCOMPLETE)
+               GOTO(done, rc = 1);
+
        atomic_inc(&lfsck->li_double_scan_count);
 
        com->lc_new_checked = 0;
@@ -4942,9 +4970,9 @@ static int lfsck_layout_slave_double_scan(const struct lu_env *env,
 
 done:
        rc = lfsck_layout_double_scan_result(env, com, rc);
-
-       lfsck_rbtree_cleanup(env, com);
-       lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
+       lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE,
+                       (rc > 0 && lo->ll_flags & LF_INCOMPLETE) ? 0 : rc);
+       lfsck_layout_slave_quit(env, com);
        if (atomic_dec_and_test(&lfsck->li_double_scan_count))
                wake_up_all(&lfsck->li_thread.t_ctl_waitq);
 
@@ -5003,7 +5031,8 @@ static void lfsck_layout_master_data_release(const struct lu_env *env,
        }
        spin_unlock(&ltds->ltd_lock);
 
-       CFS_FREE_BITMAP(lad->lad_bitmap);
+       if (likely(lad->lad_bitmap != NULL))
+               CFS_FREE_BITMAP(lad->lad_bitmap);
 
        OBD_FREE_PTR(lad);
 }
@@ -5011,41 +5040,89 @@ static void lfsck_layout_master_data_release(const struct lu_env *env,
 static void lfsck_layout_slave_data_release(const struct lu_env *env,
                                            struct lfsck_component *com)
 {
+       struct lfsck_layout_slave_data *llsd = com->lc_data;
+
+       lfsck_layout_slave_quit(env, com);
+       com->lc_data = NULL;
+       OBD_FREE_PTR(llsd);
+}
+
+static void lfsck_layout_master_quit(const struct lu_env *env,
+                                    struct lfsck_component *com)
+{
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_tgt_descs          *ltds;
+       struct lfsck_tgt_desc           *ltd;
+       struct lfsck_tgt_desc           *next;
+
+       LASSERT(lad != NULL);
+
+       lfsck_quit_generic(env, com);
+
+       LASSERT(thread_is_init(&lad->lad_thread) ||
+               thread_is_stopped(&lad->lad_thread));
+       LASSERT(list_empty(&lad->lad_req_list));
+
+       ltds = &lfsck->li_ost_descs;
+       spin_lock(&ltds->ltd_lock);
+       list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase1_list,
+                                ltd_layout_phase_list) {
+               list_del_init(&ltd->ltd_layout_phase_list);
+       }
+       list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
+                                ltd_layout_phase_list) {
+               list_del_init(&ltd->ltd_layout_phase_list);
+       }
+       spin_unlock(&ltds->ltd_lock);
+
+       ltds = &lfsck->li_mdt_descs;
+       spin_lock(&ltds->ltd_lock);
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
+                                ltd_layout_phase_list) {
+               list_del_init(&ltd->ltd_layout_phase_list);
+       }
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+                                ltd_layout_phase_list) {
+               list_del_init(&ltd->ltd_layout_phase_list);
+       }
+       spin_unlock(&ltds->ltd_lock);
+}
+
+static void lfsck_layout_slave_quit(const struct lu_env *env,
+                                   struct lfsck_component *com)
+{
        struct lfsck_layout_slave_data   *llsd  = com->lc_data;
        struct lfsck_layout_seq          *lls;
        struct lfsck_layout_seq          *next;
        struct lfsck_layout_slave_target *llst;
-       struct lfsck_layout_slave_target *tmp;
 
        LASSERT(llsd != NULL);
 
        list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
-                                    lls_list) {
+                                lls_list) {
                list_del_init(&lls->lls_list);
                lfsck_object_put(env, lls->lls_lastid_obj);
                OBD_FREE_PTR(lls);
        }
 
-       list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
-                                llst_list) {
+       spin_lock(&llsd->llsd_lock);
+       while (!list_empty(&llsd->llsd_master_list)) {
+               llst = list_entry(llsd->llsd_master_list.next,
+                                 struct lfsck_layout_slave_target, llst_list);
                list_del_init(&llst->llst_list);
-               OBD_FREE_PTR(llst);
+               spin_unlock(&llsd->llsd_lock);
+               lfsck_layout_llst_put(llst);
        }
+       spin_unlock(&llsd->llsd_lock);
 
        lfsck_rbtree_cleanup(env, com);
-       com->lc_data = NULL;
-       OBD_FREE_PTR(llsd);
-}
-
-static void lfsck_layout_slave_quit(const struct lu_env *env,
-                                   struct lfsck_component *com)
-{
-       lfsck_rbtree_cleanup(env, com);
 }
 
 static int lfsck_layout_master_in_notify(const struct lu_env *env,
                                         struct lfsck_component *com,
-                                        struct lfsck_request *lr)
+                                        struct lfsck_request *lr,
+                                        struct thandle *th)
 {
        struct lfsck_instance           *lfsck = com->lc_lfsck;
        struct lfsck_layout             *lo    = com->lc_file_ram;
@@ -5065,9 +5142,10 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env,
        }
 
        CDEBUG(D_LFSCK, "%s: layout LFSCK master handles notify %u "
-              "from %s %x, status %d\n", lfsck_lfsck2name(lfsck),
-              lr->lr_event, (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
-              lr->lr_index, lr->lr_status);
+              "from %s %x, status %d, flags %x, flags2 %x\n",
+              lfsck_lfsck2name(lfsck), lr->lr_event,
+              (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
+              lr->lr_index, lr->lr_status, lr->lr_flags, lr->lr_flags2);
 
        if (lr->lr_event != LE_PHASE1_DONE &&
            lr->lr_event != LE_PHASE2_DONE &&
@@ -5150,7 +5228,8 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env,
 
 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
                                        struct lfsck_component *com,
-                                       struct lfsck_request *lr)
+                                       struct lfsck_request *lr,
+                                       struct thandle *th)
 {
        struct lfsck_instance            *lfsck = com->lc_lfsck;
        struct lfsck_layout_slave_data   *llsd  = com->lc_data;
@@ -5309,7 +5388,7 @@ static struct lfsck_operations lfsck_layout_master_ops = {
        .lfsck_dump             = lfsck_layout_dump,
        .lfsck_double_scan      = lfsck_layout_master_double_scan,
        .lfsck_data_release     = lfsck_layout_master_data_release,
-       .lfsck_quit             = lfsck_quit_generic,
+       .lfsck_quit             = lfsck_layout_master_quit,
        .lfsck_in_notify        = lfsck_layout_master_in_notify,
        .lfsck_query            = lfsck_layout_query,
 };
index c9ad104..127a87a 100644 (file)
@@ -2875,7 +2875,7 @@ out:
 EXPORT_SYMBOL(lfsck_stop);
 
 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
-                   struct lfsck_request *lr)
+                   struct lfsck_request *lr, struct thandle *th)
 {
        int rc = -EOPNOTSUPP;
        ENTRY;
@@ -2914,6 +2914,8 @@ int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
        case LE_PEER_EXIT:
        case LE_CONDITIONAL_DESTROY:
        case LE_CREATE_ORPHAN:
+       case LE_SKIP_NLINK_DECLARE:
+       case LE_SKIP_NLINK:
        case LE_PAIRS_VERIFY: {
                struct lfsck_instance  *lfsck;
                struct lfsck_component *com;
@@ -2924,7 +2926,7 @@ int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
 
                com = lfsck_component_find(lfsck, lr->lr_active);
                if (likely(com != NULL)) {
-                       rc = com->lc_ops->lfsck_in_notify(env, com, lr);
+                       rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
                        lfsck_component_put(env, com);
                }
 
index f91bdcd..3f9d3d7 100644 (file)
@@ -207,9 +207,8 @@ static void lfsck_namespace_record_failure(const struct lu_env *env,
  * \param[in] env      pointer to the thread context
  * \param[in] com      pointer to the lfsck component
  *
- * \retval             positive number for data corruption
  * \retval             0 for success
- * \retval             negative error number on failure
+ * \retval             negative error number on failure or data corruption
  */
 static int lfsck_namespace_load_bitmap(const struct lu_env *env,
                                       struct lfsck_component *com)
@@ -259,14 +258,8 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env,
        rc = dt_xattr_get(env, obj,
                          lfsck_buf_get(env, bitmap->data, size),
                          XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA);
-       if (rc == -ERANGE || rc == -ENODATA || rc == 0)
-               RETURN(1);
-
-       if (rc < 0)
-               RETURN(rc);
-
        if (rc != size)
-               RETURN(rc);
+               RETURN(rc >= 0 ? -EINVAL : rc);
 
        if (cfs_bitmap_check_empty(bitmap))
                lad->lad_incomplete = 0;
@@ -1061,7 +1054,7 @@ log:
  * \param[in] type     the orphan's type to be created
  *
  *  type "P":          The orphan object to be created was a parent directory
- *                     of some DMT-object which linkEA shows that the @orphan
+ *                     of some MDT-object which linkEA shows that the @orphan
  *                     object is missing.
  *
  * \see lfsck_layout_recreate_parent() for more types.
@@ -1160,7 +1153,7 @@ out:
  * \param[in] type     the orphan's type to be created
  *
  *  type "P":          The orphan object to be created was a parent directory
- *                     of some DMT-object which linkEA shows that the @orphan
+ *                     of some MDT-object which linkEA shows that the @orphan
  *                     object is missing.
  *
  * \see lfsck_layout_recreate_parent() for more types.
@@ -1367,7 +1360,7 @@ log:
  * \param[in] orphan   pointer to the orphan MDT-object
  *
  *  type "P":          The orphan object to be created was a parent directory
- *                     of some DMT-object which linkEA shows that the @orphan
+ *                     of some MDT-object which linkEA shows that the @orphan
  *                     object is missing.
  *
  * \see lfsck_layout_recreate_parent() for more types.
@@ -2674,6 +2667,130 @@ next:
 }
 
 /**
+ * Repair the object's nlink attribute.
+ *
+ * If all the known name entries have been verified, then the object's hard
+ * link attribute should match the object's linkEA entries count unless the
+ * object has too many hard links to be recorded in the linkEA. Such cases
+ * should have been marked in the LFSCK tracing file. Otherwise, trust the
+ * linkEA to update the object's nlink attribute.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the dt_object to be handled
+ * \param[in,out] nlink        pointer to buffer to object's hard link count before
+ *                     and after the repairing
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_repair_nlink(const struct lu_env *env,
+                                       struct lfsck_component *com,
+                                       struct dt_object *obj, __u32 *nlink)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lu_attr                  *la     = &info->lti_la3;
+       struct lu_fid                   *tfid   = &info->lti_fid3;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck->li_bottom;
+       const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
+       struct dt_object                *child  = NULL;
+       struct thandle                  *th     = NULL;
+       struct linkea_data               ldata  = { 0 };
+       struct lustre_handle             lh     = { 0 };
+       __u32                            old    = *nlink;
+       int                              rc     = 0;
+       __u8                             flags;
+       ENTRY;
+
+       LASSERT(!dt_object_remote(obj));
+       LASSERT(S_ISREG(lfsck_object_type(obj)));
+
+       child = lfsck_object_find_by_dev(env, dev, cfid);
+       if (IS_ERR(child))
+               GOTO(log, rc = PTR_ERR(child));
+
+       rc = lfsck_ibits_lock(env, lfsck, child, &lh,
+                             MDS_INODELOCK_UPDATE |
+                             MDS_INODELOCK_XATTR, LCK_EX);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       la->la_valid = LA_NLINK;
+       rc = dt_declare_attr_set(env, child, la, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, child, 0);
+       /* If the LFSCK is marked as LF_INCOMPLETE, it means that some MDT
+        * tried to verify a remote MDT-object that resides on this MDT,
+        * but this MDT failed to respond to that request. So there may
+        * be a remote name entry on another MDT that references this
+        * object with another name, and we cannot know whether this
+        * linkEA is valid or not. Keep it as is; it may be resolved by
+        * the next LFSCK run. */
+       if (ns->ln_flags & LF_INCOMPLETE)
+               GOTO(unlock, rc = 0);
+
+       fid_cpu_to_be(tfid, cfid);
+       rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)&flags,
+                      (const struct dt_key *)tfid, BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       if (flags & LNTF_SKIP_NLINK)
+               GOTO(unlock, rc = 0);
+
+       rc = lfsck_links_read2(env, child, &ldata);
+       if (rc == -ENODATA)
+               GOTO(unlock, rc = 0);
+
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       if (*nlink == ldata.ld_leh->leh_reccount)
+               GOTO(unlock, rc = 0);
+
+       la->la_nlink = *nlink = ldata.ld_leh->leh_reccount;
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               GOTO(unlock, rc = 1);
+
+       rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
+
+       GOTO(unlock, rc = (rc == 0 ? 1 : rc));
+
+unlock:
+       dt_write_unlock(env, child);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       lfsck_ibits_unlock(&lh, LCK_EX);
+       if (child != NULL && !IS_ERR(child))
+               lfsck_object_put(env, child);
+
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s "
+              "nlink count from %u to %u: rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(cfid), old, *nlink, rc);
+
+       if (rc != 0)
+               ns->ln_flags |= LF_INCONSISTENT;
+
+       return rc;
+}
+
+/**
  * Double scan the directory object for namespace LFSCK.
  *
  * This function will verify the <parent, child> pairs in the namespace tree:
@@ -3227,8 +3344,12 @@ out:
                return rc;
 
        if (la->la_nlink != count) {
-               /* XXX: there will be other patch(es) for MDT-object
-                *      hard links verification. */
+               rc = lfsck_namespace_repair_nlink(env, com, child,
+                                                 &la->la_nlink);
+               if (rc > 0) {
+                       ns->ln_objs_nlink_repaired++;
+                       rc = 0;
+               }
        }
 
        if (repaired) {
@@ -3425,17 +3546,17 @@ static int lfsck_namespace_prep(const struct lu_env *env,
        int                      rc;
 
        rc = lfsck_namespace_load_bitmap(env, com);
-       if (rc > 0 || (rc == 0 && ns->ln_status == LS_COMPLETED)) {
+       if (rc != 0 || ns->ln_status == LS_COMPLETED) {
                rc = lfsck_namespace_reset(env, com, false);
                if (rc == 0)
                        rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
-       }
 
-       if (rc != 0) {
-               CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: rc = %d\n",
-                      lfsck_lfsck2name(lfsck), rc);
+               if (rc != 0) {
+                       CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
+                              "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
 
-               return rc;
+                       return rc;
+               }
        }
 
        down_write(&com->lc_sem);
@@ -3681,8 +3802,9 @@ static int lfsck_namespace_post(const struct lu_env *env,
                list_del_init(&com->lc_link_dir);
                list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
        } else if (result == 0) {
-               ns->ln_status = lfsck->li_status;
-               if (ns->ln_status == 0)
+               if (lfsck->li_status != 0)
+                       ns->ln_status = lfsck->li_status;
+               else
                        ns->ln_status = LS_STOPPED;
                if (ns->ln_status != LS_PAUSED) {
                        list_del_init(&com->lc_link_dir);
@@ -3881,9 +4003,27 @@ out:
 static int lfsck_namespace_double_scan(const struct lu_env *env,
                                       struct lfsck_component *com)
 {
-       struct lfsck_namespace *ns = com->lc_file_ram;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
+       struct lfsck_tgt_desc           *ltd;
+       struct lfsck_tgt_desc           *next;
+       int                              rc;
+
+       rc = lfsck_double_scan_generic(env, com, ns->ln_status);
+       if (thread_is_stopped(&lad->lad_thread)) {
+               LASSERT(list_empty(&lad->lad_req_list));
+               LASSERT(list_empty(&lad->lad_mdt_phase1_list));
 
-       return lfsck_double_scan_generic(env, com, ns->ln_status);
+               spin_lock(&ltds->ltd_lock);
+               list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+                                        ltd_namespace_phase_list) {
+                       list_del_init(&ltd->ltd_namespace_phase_list);
+               }
+               spin_unlock(&ltds->ltd_lock);
+       }
+
+       return rc;
 }
 
 static void lfsck_namespace_data_release(const struct lu_env *env,
@@ -3916,14 +4056,44 @@ static void lfsck_namespace_data_release(const struct lu_env *env,
        }
        spin_unlock(&ltds->ltd_lock);
 
-       CFS_FREE_BITMAP(lad->lad_bitmap);
+       if (likely(lad->lad_bitmap != NULL))
+               CFS_FREE_BITMAP(lad->lad_bitmap);
 
        OBD_FREE_PTR(lad);
 }
 
+static void lfsck_namespace_quit(const struct lu_env *env,
+                                struct lfsck_component *com)
+{
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
+       struct lfsck_tgt_desc           *ltd;
+       struct lfsck_tgt_desc           *next;
+
+       LASSERT(lad != NULL);
+
+       lfsck_quit_generic(env, com);
+
+       LASSERT(thread_is_init(&lad->lad_thread) ||
+               thread_is_stopped(&lad->lad_thread));
+       LASSERT(list_empty(&lad->lad_req_list));
+
+       spin_lock(&ltds->ltd_lock);
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
+                                ltd_namespace_phase_list) {
+               list_del_init(&ltd->ltd_namespace_phase_list);
+       }
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+                                ltd_namespace_phase_list) {
+               list_del_init(&ltd->ltd_namespace_phase_list);
+       }
+       spin_unlock(&ltds->ltd_lock);
+}
+
 static int lfsck_namespace_in_notify(const struct lu_env *env,
                                     struct lfsck_component *com,
-                                    struct lfsck_request *lr)
+                                    struct lfsck_request *lr,
+                                    struct thandle *th)
 {
        struct lfsck_instance           *lfsck = com->lc_lfsck;
        struct lfsck_namespace          *ns    = com->lc_file_ram;
@@ -3966,6 +4136,70 @@ out_create:
 
                return rc;
        }
+       case LE_SKIP_NLINK_DECLARE: {
+               struct dt_object        *obj   = com->lc_obj;
+               struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
+               __u8                     flags = 0;
+
+               LASSERT(th != NULL);
+
+               rc = dt_declare_delete(env, obj,
+                                      (const struct dt_key *)key, th);
+               if (rc == 0)
+                       rc = dt_declare_insert(env, obj,
+                                              (const struct dt_rec *)&flags,
+                                              (const struct dt_key *)key, th);
+
+               RETURN(rc);
+       }
+       case LE_SKIP_NLINK: {
+               struct dt_object        *obj   = com->lc_obj;
+               struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
+               __u8                     flags = 0;
+               bool                     exist = false;
+               ENTRY;
+
+               LASSERT(th != NULL);
+
+               fid_cpu_to_be(key, &lr->lr_fid);
+               rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
+                              (const struct dt_key *)key, BYPASS_CAPA);
+               if (rc == 0) {
+                       if (flags & LNTF_SKIP_NLINK)
+                               RETURN(0);
+
+                       exist = true;
+               } else if (rc != -ENOENT) {
+                       GOTO(log, rc);
+               }
+
+               flags |= LNTF_SKIP_NLINK;
+               if (exist) {
+                       rc = dt_delete(env, obj, (const struct dt_key *)key,
+                                      th, BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(log, rc);
+               }
+
+               rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
+                              (const struct dt_key *)key, th, BYPASS_CAPA, 1);
+
+               GOTO(log, rc);
+
+log:
+               CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID
+                      " to be skipped for namespace double scan: rc = %d\n",
+                      lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc);
+
+               if (rc != 0)
+                       /* If we cannot record this object in the LFSCK tracing,
+                        * we have to mark the LFSCK as LF_INCOMPLETE, then the
+                        * LFSCK will skip nlink attribute verification for
+                        * all objects. */
+                       ns->ln_flags |= LF_INCOMPLETE;
+
+               return 0;
+       }
        case LE_PHASE1_DONE:
        case LE_PHASE2_DONE:
        case LE_PEER_EXIT:
@@ -4063,7 +4297,7 @@ static struct lfsck_operations lfsck_namespace_ops = {
        .lfsck_dump             = lfsck_namespace_dump,
        .lfsck_double_scan      = lfsck_namespace_double_scan,
        .lfsck_data_release     = lfsck_namespace_data_release,
-       .lfsck_quit             = lfsck_quit_generic,
+       .lfsck_quit             = lfsck_namespace_quit,
        .lfsck_in_notify        = lfsck_namespace_in_notify,
        .lfsck_query            = lfsck_namespace_query,
 };
@@ -4514,6 +4748,33 @@ nodata:
                        GOTO(stop, rc);
 
                rc = lfsck_links_write(env, obj, &ldata, handle);
+               if (unlikely(rc == -ENOSPC) &&
+                   S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) {
+                       if (handle != NULL) {
+                               LASSERT(dt_write_locked(env, obj));
+
+                               dt_write_unlock(env, obj);
+                               dtlocked = false;
+
+                               dt_trans_stop(env, dev, handle);
+                               handle = NULL;
+
+                               lfsck_ibits_unlock(&lh, LCK_EX);
+                       }
+
+                       rc = lfsck_namespace_trace_update(env, com,
+                                       &lnr->lnr_fid, LNTF_SKIP_NLINK, true);
+                       if (rc != 0)
+                               /* If we cannot record this object in the
+                                * LFSCK tracing, we have to mark the LFSCK
+                                * as LF_INCOMPLETE, then the LFSCK will
+                                * skip nlink attribute verification for
+                                * all objects. */
+                               ns->ln_flags |= LF_INCOMPLETE;
+
+                       GOTO(out, rc = 0);
+               }
+
                if (rc != 0)
                        GOTO(stop, rc);
 
@@ -4832,8 +5093,9 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env,
                ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
                ns->ln_success_count++;
        } else if (rc == 0) {
-               ns->ln_status = lfsck->li_status;
-               if (ns->ln_status == 0)
+               if (lfsck->li_status != 0)
+                       ns->ln_status = lfsck->li_status;
+               else
                        ns->ln_status = LS_STOPPED;
        } else {
                ns->ln_status = LS_FAILED;
@@ -4937,7 +5199,7 @@ out:
        if (rc != 0)
                CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail "
                       "to sync failure with MDTs, and related MDTs "
-                      "may handle orphan un-properly: rc = %d\n",
+                      "may handle orphan improperly: rc = %d\n",
                       lfsck_lfsck2name(lfsck), rc);
 
        EXIT;
index 7b50624..023ffa5 100644 (file)
@@ -110,7 +110,7 @@ static int mdd_convert_linkea(const struct lu_env *env,
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
-       rc = mdd_declare_links_add(env, o, th, NULL);
+       rc = mdd_declare_links_add(env, o, th, NULL, MLAO_IGNORE);
        if (rc)
                GOTO(out, rc);
        rc = dt_trans_start_local(env, mdd->mdd_child, th);
index bccc16d..8899fc5 100644 (file)
@@ -1113,16 +1113,35 @@ int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
 {
        const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
                                                     ldata->ld_leh->leh_len);
+       int                 rc;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
                return 0;
 
-       return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
-                            mdd_object_capa(env, mdd_obj));
+       rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
+                          mdd_object_capa(env, mdd_obj));
+       if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
+           mdd_object_remote(mdd_obj) == 0) {
+               struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
+
+               /* XXX: If the linkEA overflows, then we need to notify the
+                *      namespace LFSCK to skip "nlink" attribute verification
+                *      on this object to avoid the "nlink" being shrunk by
+                *      mistake. Interacting with LFSCK like this may not be
+                *      ideal. We will consider replacing it with another
+                *      mechanism in the future. LU-5802. */
+               lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
+                              LFSCK_TYPE_NAMESPACE);
+               lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
+                               lr, handle);
+       }
+
+       return rc;
 }
 
 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
-                         struct thandle *handle, struct linkea_data *ldata)
+                         struct thandle *handle, struct linkea_data *ldata,
+                         enum mdd_links_add_overflow overflow)
 {
        int     rc;
        int     ea_len;
@@ -1140,6 +1159,25 @@ int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
        rc = mdo_declare_xattr_set(env, mdd_obj,
                                   mdd_buf_get_const(env, linkea, ea_len),
                                   XATTR_NAME_LINK, 0, handle);
+       if (rc != 0)
+               return rc;
+
+       if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
+               struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
+
+               /* XXX: If the linkEA overflows, then we need to notify the
+                *      namespace LFSCK to skip "nlink" attribute verification
+                *      on this object to avoid the "nlink" being shrunk by
+                *      mistake. Interacting with LFSCK like this may not be
+                *      ideal. We will consider replacing it with another
+                *      mechanism in the future. LU-5802. */
+               lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
+                              LFSCK_TYPE_NAMESPACE);
+               rc = lfsck_in_notify(env,
+                                    mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
+                                    lr, handle);
+       }
+
        return rc;
 }
 
@@ -1152,7 +1190,7 @@ static inline int mdd_declare_links_del(const struct lu_env *env,
        /* For directory, the linkEA will be removed together
         * with the object. */
        if (!S_ISDIR(mdd_object_type(c)))
-               rc = mdd_declare_links_add(env, c, handle, NULL);
+               rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
 
        return rc;
 }
@@ -1174,9 +1212,15 @@ static int mdd_declare_link(const struct lu_env *env,
                return rc;
 
        rc = mdo_declare_ref_add(env, c, handle);
-       if (rc)
+       if (rc != 0)
                return rc;
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
+               rc = mdo_declare_ref_add(env, c, handle);
+               if (rc != 0)
+                       return rc;
+       }
+
        la->la_valid = LA_CTIME | LA_MTIME;
        rc = mdo_declare_attr_set(env, p, la, handle);
        if (rc != 0)
@@ -1184,11 +1228,12 @@ static int mdd_declare_link(const struct lu_env *env,
 
        la->la_valid = LA_CTIME;
        rc = mdo_declare_attr_set(env, c, la, handle);
-       if (rc)
+       if (rc != 0)
                return rc;
 
-       rc = mdd_declare_links_add(env, c, handle, data);
-       if (rc)
+       rc = mdd_declare_links_add(env, c, handle, data,
+                       S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
+       if (rc != 0)
                return rc;
 
        rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
@@ -1244,10 +1289,17 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
        if (rc)
                GOTO(out_unlock, rc);
 
-       rc = mdo_ref_add(env, mdd_sobj, handle);
-       if (rc)
-               GOTO(out_unlock, rc);
+       if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LESS_NLINK)) {
+               rc = mdo_ref_add(env, mdd_sobj, handle);
+               if (rc != 0)
+                       GOTO(out_unlock, rc);
+       }
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
+               rc = mdo_ref_add(env, mdd_sobj, handle);
+               if (rc != 0)
+                       GOTO(out_unlock, rc);
+       }
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3)) {
                struct lu_fid tfid = *mdo2fid(mdd_sobj);
@@ -2027,7 +2079,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                if (rc != 0)
                        return rc;
 
-               rc = mdd_declare_links_add(env, c, handle, ldata);
+               rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
                if (rc)
                        return rc;
 
@@ -2564,7 +2616,8 @@ static int mdd_declare_rename(const struct lu_env *env,
        if (rc)
                return rc;
 
-       rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
+       rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
+               S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
        if (rc)
                return rc;
 
@@ -2981,7 +3034,8 @@ static int mdd_linkea_update_child_internal(const struct lu_env *env,
                linkea_entry_pack(ldata.ld_lee, &lname,
                                  mdd_object_fid(parent));
                if (declare)
-                       rc = mdd_declare_links_add(env, child, handle, &ldata);
+                       rc = mdd_declare_links_add(env, child, handle, &ldata,
+                                                  MLAO_IGNORE);
                else
                        rc = mdd_links_write(env, child, &ldata, handle);
                break;
@@ -3031,7 +3085,8 @@ static int mdd_update_linkea_internal(const struct lu_env *env,
        }
 
        if (declare)
-               rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
+               rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
+                                          MLAO_IGNORE);
        else
                rc = mdd_links_write(env, mdd_tobj, ldata, handle);
 
index 316651f..eff2098 100644 (file)
@@ -161,6 +161,12 @@ struct mdd_thread_info {
        struct linkea_data        mti_link_data;
        struct md_op_spec         mti_spec;
        struct dt_insert_rec      mti_dt_rec;
+       struct lfsck_request      mti_lr;
+};
+
+enum mdd_links_add_overflow {
+       MLAO_IGNORE     = false,
+       MLAO_CHECK      = true,
 };
 
 extern const char orph_index_name[];
@@ -222,7 +228,8 @@ int mdd_lookup(const struct lu_env *env,
 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
                   struct linkea_data *ldata);
 int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
-                         struct thandle *handle, struct linkea_data *ldata);
+                         struct thandle *handle, struct linkea_data *ldata,
+                         enum mdd_links_add_overflow overflow);
 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
                    struct linkea_data *ldata, struct thandle *handle);
 struct lu_buf *mdd_links_get(const struct lu_env *env,
index eede5f2..714645b 100644 (file)
@@ -77,7 +77,7 @@ static void ofd_inconsistency_verify_one(const struct lu_env *env,
        lr->lr_fid2 = oii->oii_pfid; /* client given PFID. */
        lr->lr_fid3 = *pfid; /* OST local stored PFID. */
 
-       rc = lfsck_in_notify(env, ofd->ofd_osd, lr);
+       rc = lfsck_in_notify(env, ofd->ofd_osd, lr, NULL);
        ofd_write_lock(env, fo);
        switch (lr->lr_status) {
        case LPVS_INIT:
index 245b5d8..1b3b4f0 100644 (file)
@@ -350,10 +350,12 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
 
                /* Only the new created objects need to be recorded. */
                if (ofd->ofd_osd->dd_record_fid_accessed) {
-                       lfsck_pack_rfa(&ofd_info(env)->fti_lr,
-                                      lu_object_fid(&fo->ofo_obj.do_lu));
-                       lfsck_in_notify(env, ofd->ofd_osd,
-                                       &ofd_info(env)->fti_lr);
+                       struct lfsck_request *lr = &ofd_info(env)->fti_lr;
+
+                       lfsck_pack_rfa(lr, lu_object_fid(&fo->ofo_obj.do_lu),
+                                      LE_FID_ACCESSED,
+                                      LFSCK_TYPE_LAYOUT);
+                       lfsck_in_notify(env, ofd->ofd_osd, lr, NULL);
                }
 
                if (likely(!ofd_object_exists(fo) &&
index cc24aa2..23ce9fb 100644 (file)
@@ -3055,6 +3055,10 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
        }
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) &&
+           strcmp(name, XATTR_NAME_LINK) == 0)
+               return -ENOSPC;
+
        return __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len,
                               fs_flags);
 }
index a24aa34..0d783aa 100644 (file)
@@ -598,6 +598,10 @@ int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
             strcmp(name, POSIX_ACL_XATTR_DEFAULT) == 0))
                RETURN(-EOPNOTSUPP);
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) &&
+           strcmp(name, XATTR_NAME_LINK) == 0)
+               RETURN(-ENOSPC);
+
        oh = container_of0(handle, struct osd_thandle, ot_super);
 
        down(&obj->oo_guard);
index 7c0c497..9441a5f 100644 (file)
@@ -563,11 +563,8 @@ int osp_md_declare_object_create(const struct lu_env *env,
 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
                         struct lu_attr *attr, struct dt_allocation_hint *hint,
                         struct dt_object_format *dof, struct thandle *th);
-int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
-                           const struct lu_attr *attr, struct thandle *th);
-int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
-                   const struct lu_attr *attr, struct thandle *th,
-                   struct lustre_capa *capa);
+int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
+                     const struct lu_attr *attr, struct thandle *th);
 extern const struct dt_index_operations osp_md_index_ops;
 
 /* osp_precreate.c */
index c10a9dc..a66dd65 100644 (file)
@@ -326,10 +326,7 @@ static void osp_md_ah_init(const struct lu_env *env,
 }
 
 /**
- * Implementation of dt_object_operations::do_declare_attr_get
- *
- * Declare setting attributes of the remote object, i.e. insert remote
- * object attr_set update into RPC.
+ * Add attr_set sub-request into the OUT RPC.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object on which to set attributes
@@ -339,8 +336,8 @@ static void osp_md_ah_init(const struct lu_env *env,
  * \retval             0 if the insertion succeeds.
  * \retval             negative errno if the insertion fails.
  */
-int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
-                           const struct lu_attr *attr, struct thandle *th)
+int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
+                     const struct lu_attr *attr, struct thandle *th)
 {
        struct dt_update_request        *update;
        int                             rc;
@@ -361,11 +358,46 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
 }
 
 /**
+ * Implementation of dt_object_operations::do_declare_attr_set
+ *
+ * Declare setting attributes to the specified remote object.
+ *
+ * If the transaction is a remote transaction, then add the modification
+ * sub-request into the OUT RPC here, and such OUT RPC will be triggered
+ * when transaction start.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object on which to set attributes
+ * \param[in] attr     attributes to be set
+ * \param[in] th       the transaction handle
+ *
+ * \retval             0 if the insertion succeeds.
+ * \retval             negative errno if the insertion fails.
+ */
+int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+                           const struct lu_attr *attr, struct thandle *th)
+{
+       int rc = 0;
+
+       CDEBUG(D_INFO, "declare attr set object "DFID"\n",
+              PFID(&dt->do_lu.lo_header->loh_fid));
+
+       if (!is_only_remote_trans(th))
+               rc = __osp_md_attr_set(env, dt, attr, th);
+
+       return rc;
+}
+
+/**
  * Implementation of dt_object_operations::do_attr_set
  *
- * Do nothing in this method for now. In DNE phase I, remote updates
- * are actually executed during transaction start, i.e. object attributes
- * have already been set when calling this method.
+ * Set attributes to the specified remote object.
+ *
+ * If the transaction is a remote transaction, then related modification
+ * sub-request has been added in the declare phase and related OUT RPC
+ * has been triggered at transaction start. Otherwise, the modification
+ * sub-request will be added here, and related OUT RPC will be triggered
+ * when transaction stop.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object to set attributes
@@ -379,10 +411,15 @@ int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
                    const struct lu_attr *attr, struct thandle *th,
                    struct lustre_capa *capa)
 {
+       int rc = 0;
+
        CDEBUG(D_INFO, "attr set object "DFID"\n",
               PFID(&dt->do_lu.lo_header->loh_fid));
 
-       RETURN(0);
+       if (is_only_remote_trans(th))
+               rc = __osp_md_attr_set(env, dt, attr, th);
+
+       RETURN(rc);
 }
 
 /**
index 2835bbb..ed26ee1 100644 (file)
@@ -623,22 +623,23 @@ static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
        }
 
-       if (o->opo_new)
-               /* no need in logging for new objects being created */
-               RETURN(0);
-
        if (!(attr->la_valid & (LA_UID | LA_GID)))
                RETURN(0);
 
-       if (!is_only_remote_trans(th))
+       if (!is_only_remote_trans(th)) {
+               if (o->opo_new)
+                       /* no need in logging for new objects being created */
+                       RETURN(0);
+
                /*
                 * track all UID/GID changes via llog
                 */
                rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
-       else
+       } else {
                /* It is for OST-object attr_set directly without updating
                 * local MDT-object attribute. It is usually used by LFSCK. */
-               rc = osp_md_declare_attr_set(env, dt, attr, th);
+               rc = __osp_md_attr_set(env, dt, attr, th);
+       }
 
        if (rc != 0 || o->opo_ooa == NULL)
                RETURN(rc);
@@ -744,8 +745,10 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
 
        if (is_only_remote_trans(th)) {
                rc = __osp_attr_set(env, dt, attr, th);
-               if (rc != 0)
-                       RETURN(rc);
+               if (rc == 0 && o->opo_new)
+                       o->opo_new = 0;
+
+               RETURN(rc);
        }
 
        /* we're interested in uid/gid changes only */
@@ -761,17 +764,8 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
                RETURN(0);
        }
 
-       if (!is_only_remote_trans(th))
-               /*
-                * once transaction is committed put proper command on
-                * the queue going to our OST
-                */
-               rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
-               /* XXX: send new uid/gid to OST ASAP? */
-       else
-               /* It is for OST-object attr_set directly without updating
-                * local MDT-object attribute. It is usually used by LFSCK. */
-               rc = osp_md_attr_set(env, dt, attr, th, capa);
+       rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
+       /* XXX: send new uid/gid to OST ASAP? */
 
        RETURN(rc);
 }
index e769304..a0caec0 100644 (file)
@@ -4715,6 +4715,10 @@ void lustre_assert_wire_constants(void)
                 (long long)LE_PAIRS_VERIFY);
        LASSERTF(LE_CREATE_ORPHAN == 12, "found %lld\n",
                 (long long)LE_CREATE_ORPHAN);
+       LASSERTF(LE_SKIP_NLINK_DECLARE == 13, "found %lld\n",
+                (long long)LE_SKIP_NLINK_DECLARE);
+       LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n",
+                (long long)LE_SKIP_NLINK);
        LASSERTF(LEF_TO_OST == 0x00000001UL, "found 0x%.8xUL\n",
                (unsigned)LEF_TO_OST);
        LASSERTF(LEF_FROM_OST == 0x00000002UL, "found 0x%.8xUL\n",
index 25aac41..a0e4879 100644 (file)
@@ -565,12 +565,30 @@ static int out_tx_xattr_set_exec(const struct lu_env *env,
        rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
                          arg->u.xattr_set.name, arg->u.xattr_set.flags,
                          th, NULL);
-       dt_write_unlock(env, dt_obj);
        /**
         * Ignore errors if this is LINK EA
         **/
-       if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
+       if (unlikely(rc != 0 &&
+                    strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) {
+               /* XXX: If the linkEA overflows, then we need to notify the
+                *      namespace LFSCK to skip "nlink" attribute verification
+                *      on this object to avoid the "nlink" being shrunk by
+                *      mistake. Interacting with LFSCK like this may not be
+                *      ideal. We will consider replacing it with another
+                *      mechanism in the future. LU-5802. */
+               if (rc == -ENOSPC) {
+                       struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
+
+                       lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
+                                      LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE);
+                       tgt_lfsck_in_notify(env,
+                               tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th);
+               }
+
                rc = 0;
+       }
+       dt_write_unlock(env, dt_obj);
+
 out:
        CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
               dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
@@ -596,6 +614,24 @@ static int __out_tx_xattr_set(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
+       if (strcmp(name, XATTR_NAME_LINK) == 0) {
+               struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
+
+               /* XXX: If the linkEA overflows, then we need to notify the
+                *      namespace LFSCK to skip "nlink" attribute verification
+                *      on this object to avoid the "nlink" being shrunk by
+                *      mistake. Interacting with LFSCK like this may not be
+                *      ideal. We will consider replacing it with another
+                *      mechanism in the future. LU-5802. */
+               lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
+                              LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE);
+               rc = tgt_lfsck_in_notify(env,
+                                        tgt_ses_info(env)->tsi_tgt->lut_bottom,
+                                        lr, ta->ta_handle);
+               if (rc != 0)
+                       return rc;
+       }
+
        arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
        if (IS_ERR(arg))
                return PTR_ERR(arg);
@@ -1566,8 +1602,10 @@ int out_handle(struct tgt_session_info *tsi)
 
                if (dt->dd_record_fid_accessed) {
                        lfsck_pack_rfa(&tti->tti_lr,
-                                      lu_object_fid(&dt_obj->do_lu));
-                       tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
+                                      lu_object_fid(&dt_obj->do_lu),
+                                      LE_FID_ACCESSED,
+                                      LFSCK_TYPE_LAYOUT);
+                       tgt_lfsck_in_notify(env, dt, &tti->tti_lr, NULL);
                }
 
                tti->tti_u.update.tti_dt_object = dt_obj;
index 3c42442..57ec2e5 100644 (file)
@@ -1373,11 +1373,13 @@ EXPORT_SYMBOL(tgt_sec_ctx_handlers);
 
 int (*tgt_lfsck_in_notify)(const struct lu_env *env,
                           struct dt_device *key,
-                          struct lfsck_request *lr) = NULL;
+                          struct lfsck_request *lr,
+                          struct thandle *th) = NULL;
 
 void tgt_register_lfsck_in_notify(int (*notify)(const struct lu_env *,
                                                struct dt_device *,
-                                               struct lfsck_request *))
+                                               struct lfsck_request *,
+                                               struct thandle *))
 {
        tgt_lfsck_in_notify = notify;
 }
@@ -1408,7 +1410,7 @@ static int tgt_handle_lfsck_notify(struct tgt_session_info *tsi)
        if (lr == NULL)
                RETURN(-EPROTO);
 
-       rc = tgt_lfsck_in_notify(env, key, lr);
+       rc = tgt_lfsck_in_notify(env, key, lr, NULL);
 
        RETURN(rc);
 }
index ebe3a1a..7c96074 100644 (file)
@@ -45,7 +45,8 @@
 
 extern int (*tgt_lfsck_in_notify)(const struct lu_env *env,
                                  struct dt_device *key,
-                                 struct lfsck_request *lr);
+                                 struct lfsck_request *lr,
+                                 struct thandle *th);
 
 struct tx_arg;
 typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
index 38c7f3a..1d330ea 100644 (file)
@@ -46,7 +46,7 @@ setupall
        ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17 18 19 20 21"
 
 [[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.6.50) ]] &&
-       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 2e 3 22 23 24 25 26 27 28"
+       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 2e 3 22 23 24 25 26 27 28 29"
 
 build_test_filter
 
@@ -1663,7 +1663,7 @@ test_18a() {
 
        check_mount_and_prep
        $LFS mkdir -i 0 $DIR/$tdir/a1
-       $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+       $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
        dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=1M count=2
 
        local saved_size=$(ls -il $DIR/$tdir/a1/f1 | awk '{ print $6 }')
@@ -1673,7 +1673,7 @@ test_18a() {
 
        if [ $MDSCOUNT -ge 2 ]; then
                $LFS mkdir -i 1 $DIR/$tdir/a2
-               $LFS setstripe -c 2 -i 1 -s 1M $DIR/$tdir/a2
+               $LFS setstripe -c 2 -i 1 -S 1M $DIR/$tdir/a2
                dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2
                $LFS path2fid $DIR/$tdir/a2/f2
                $LFS getstripe $DIR/$tdir/a2/f2
@@ -1778,7 +1778,7 @@ test_18b() {
 
        check_mount_and_prep
        $LFS mkdir -i 0 $DIR/$tdir/a1
-       $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+       $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
        dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=1M count=2
        local saved_size=$(ls -il $DIR/$tdir/a1/f1 | awk '{ print $6 }')
        local fid1=$($LFS path2fid $DIR/$tdir/a1/f1)
@@ -1787,7 +1787,7 @@ test_18b() {
 
        if [ $MDSCOUNT -ge 2 ]; then
                $LFS mkdir -i 1 $DIR/$tdir/a2
-               $LFS setstripe -c 2 -i 1 -s 1M $DIR/$tdir/a2
+               $LFS setstripe -c 2 -i 1 -S 1M $DIR/$tdir/a2
                dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2
                fid2=$($LFS path2fid $DIR/$tdir/a2/f2)
                echo ${fid2}
@@ -1891,7 +1891,7 @@ test_18c() {
 
        check_mount_and_prep
        $LFS mkdir -i 0 $DIR/$tdir/a1
-       $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+       $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
 
        echo "Inject failure, to simulate the case of missing parent FID"
        #define OBD_FAIL_LFSCK_NOPFID           0x1617
@@ -1902,7 +1902,7 @@ test_18c() {
 
        if [ $MDSCOUNT -ge 2 ]; then
                $LFS mkdir -i 1 $DIR/$tdir/a2
-               $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a2
+               $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a2
                dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2
                $LFS getstripe $DIR/$tdir/a2/f2
        fi
@@ -2001,7 +2001,7 @@ test_18d() {
 
        check_mount_and_prep
        mkdir $DIR/$tdir/a1
-       $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+       $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
        echo "guard" > $DIR/$tdir/a1/f1
        echo "foo" > $DIR/$tdir/a1/f2
        local saved_size=$(ls -il $DIR/$tdir/a1/f2 | awk '{ print $6 }')
@@ -2095,7 +2095,7 @@ test_18e() {
 
        check_mount_and_prep
        mkdir $DIR/$tdir/a1
-       $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+       $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
        echo "guard" > $DIR/$tdir/a1/f1
        echo "foo" > $DIR/$tdir/a1/f2
        local saved_size=$(ls -il $DIR/$tdir/a1/f2 | awk '{ print $6 }')
@@ -2211,22 +2211,22 @@ test_18f() {
 
        check_mount_and_prep
        $LFS mkdir -i 0 $DIR/$tdir/a1
-       $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+       $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a1
        dd if=/dev/zero of=$DIR/$tdir/a1/guard bs=1M count=2
        dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=1M count=2
        $LFS mkdir -i 0 $DIR/$tdir/a2
-       $LFS setstripe -c 2 -i 0 -s 1M $DIR/$tdir/a2
+       $LFS setstripe -c 2 -i 0 -S 1M $DIR/$tdir/a2
        dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2
        $LFS getstripe $DIR/$tdir/a1/f1
        $LFS getstripe $DIR/$tdir/a2/f2
 
        if [ $MDSCOUNT -ge 2 ]; then
                $LFS mkdir -i 1 $DIR/$tdir/a3
-               $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a3
+               $LFS setstripe -c 1 -i 0 -S 1M $DIR/$tdir/a3
                dd if=/dev/zero of=$DIR/$tdir/a3/guard bs=1M count=2
                dd if=/dev/zero of=$DIR/$tdir/a3/f3 bs=1M count=2
                $LFS mkdir -i 1 $DIR/$tdir/a4
-               $LFS setstripe -c 2 -i 0 -s 1M $DIR/$tdir/a4
+               $LFS setstripe -c 2 -i 0 -S 1M $DIR/$tdir/a4
                dd if=/dev/zero of=$DIR/$tdir/a4/f4 bs=1M count=2
                $LFS getstripe $DIR/$tdir/a3/f3
                $LFS getstripe $DIR/$tdir/a4/f4
@@ -2416,10 +2416,10 @@ test_20() {
        check_mount_and_prep
        $LFS mkdir -i 0 $DIR/$tdir/a1
        if [ $OSTCOUNT -gt 2 ]; then
-               $LFS setstripe -c 3 -i 0 -s 1M $DIR/$tdir/a1
+               $LFS setstripe -c 3 -i 0 -S 1M $DIR/$tdir/a1
                bcount=513
        else
-               $LFS setstripe -c 2 -i 0 -s 1M $DIR/$tdir/a1
+               $LFS setstripe -c 2 -i 0 -S 1M $DIR/$tdir/a1
                bcount=257
        fi
 
@@ -3386,7 +3386,7 @@ test_28() {
        echo "The target name entry is lost. The LFSCK should insert the"
        echo "orphan MDT-object under .lustre/lost+found/MDTxxxx. But if"
        echo "the MDT (on which the orphan MDT-object resides) has ever"
-       echo "failed to respond some name entry verification durin the"
+       echo "failed to respond some name entry verification during the"
        echo "first stage-scanning, then the LFSCK should skip to handle"
        echo "orphan MDT-object on this MDT. But other MDTs should not"
        echo "be affected."
@@ -3481,6 +3481,161 @@ test_28() {
 }
 run_test 28 "Skip the failed MDT(s) when handle orphan MDT-objects"
 
+test_29a() {
+       echo "#####"
+       echo "The object's nlink attribute is larger than the object's known"
+       echo "name entries count. The LFSCK will repair the object's nlink"
+       echo "attribute to match the known name entries count"
+       echo "#####"
+
+       check_mount_and_prep
+
+       $LFS mkdir -i 0 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0"
+       touch $DIR/$tdir/d0/foo || error "(2) Fail to create foo"
+
+       echo "Inject failure stub on MDT0 to simulate the case that foo's"
+       echo "nlink attribute is larger than its name entries count."
+
+       #define OBD_FAIL_LFSCK_MORE_NLINK       0x1625
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1625
+       ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h1 ||
+               error "(3) Fail to hard link to $DIR/$tdir/d0/foo"
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+
+       cancel_lru_locks mdc
+       local count=$(stat --format=%h $DIR/$tdir/d0/foo)
+       [ $count -eq 3 ] || error "(4) Cannot inject error: $count"
+
+       echo "Trigger namespace LFSCK to repair the nlink count"
+       $START_NAMESPACE -r -A ||
+               error "(5) Fail to start LFSCK for namespace"
+
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               $SHOW_NAMESPACE
+               error "(6) unexpected status"
+       }
+
+       local repaired=$($SHOW_NAMESPACE |
+                        awk '/^nlinks_repaired/ { print $2 }')
+       [ $repaired -eq 1 ] ||
+               error "(7) Fail to repair nlink count: $repaired"
+
+       cancel_lru_locks mdc
+       count=$(stat --format=%h $DIR/$tdir/d0/foo)
+       [ $count -eq 2 ] || error "(8) Fail to repair nlink count: $count"
+}
+run_test 29a "LFSCK can repair bad nlink count (1)"
+
+test_29b() {
+       echo "#####"
+       echo "The object's nlink attribute is smaller than the object's known"
+       echo "name entries count. The LFSCK will repair the object's nlink"
+       echo "attribute to match the known name entries count"
+       echo "#####"
+
+       check_mount_and_prep
+
+       $LFS mkdir -i 0 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0"
+       touch $DIR/$tdir/d0/foo || error "(2) Fail to create foo"
+
+       echo "Inject failure stub on MDT0 to simulate the case that foo's"
+       echo "nlink attribute is smaller than its name entries count."
+
+       #define OBD_FAIL_LFSCK_LESS_NLINK       0x1626
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1626
+       ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h1 ||
+               error "(3) Fail to hard link to $DIR/$tdir/d0/foo"
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+
+       cancel_lru_locks mdc
+       local count=$(stat --format=%h $DIR/$tdir/d0/foo)
+       [ $count -eq 1 ] || error "(4) Cannot inject error: $count"
+
+       echo "Trigger namespace LFSCK to repair the nlink count"
+       $START_NAMESPACE -r -A ||
+               error "(5) Fail to start LFSCK for namespace"
+
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               $SHOW_NAMESPACE
+               error "(6) unexpected status"
+       }
+
+       local repaired=$($SHOW_NAMESPACE |
+                        awk '/^nlinks_repaired/ { print $2 }')
+       [ $repaired -eq 1 ] ||
+               error "(7) Fail to repair nlink count: $repaired"
+
+       cancel_lru_locks mdc
+       count=$(stat --format=%h $DIR/$tdir/d0/foo)
+       [ $count -eq 2 ] || error "(8) Fail to repair nlink count: $count"
+}
+run_test 29b "LFSCK can repair bad nlink count (2)"
+
+test_29c() {
+       echo "#####"
+       echo "There are too much hard links to the object, and exceeds the"
+       echo "object's linkEA limitation, as to NOT all the known name entries"
+       echo "will be recorded in the linkEA. Under such case, LFSCK should"
+       echo "skip the nlink verification for this object."
+       echo "#####"
+
+       check_mount_and_prep
+
+       $LFS mkdir -i 0 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0"
+       touch $DIR/$tdir/d0/foo || error "(2) Fail to create foo"
+       ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h1 ||
+               error "(3) Fail to hard link to $DIR/$tdir/d0/foo"
+
+       echo "Inject failure stub on MDT0 to simulate the case that"
+       echo "foo's hard links exceed the object's linkEA limitation."
+
+       #define OBD_FAIL_LFSCK_LINKEA_OVERFLOW  0x1627
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1627
+       ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h2 ||
+               error "(4) Fail to hard link to $DIR/$tdir/d0/foo"
+
+       cancel_lru_locks mdc
+
+       local count1=$(stat --format=%h $DIR/$tdir/d0/foo)
+       [ $count1 -eq 3 ] || error "(5) Stat failure: $count1"
+
+       local foofid=$($LFS path2fid $DIR/$tdir/d0/foo)
+       $LFS fid2path $DIR $foofid
+       local count2=$($LFS fid2path $DIR $foofid | wc -l)
+       [ $count2 -eq 2 ] || error "(6) Fail to inject error: $count2"
+
+       echo "Trigger namespace LFSCK to repair the nlink count"
+       $START_NAMESPACE -r -A ||
+               error "(7) Fail to start LFSCK for namespace"
+
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               $SHOW_NAMESPACE
+               error "(8) unexpected status"
+       }
+
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+       local repaired=$($SHOW_NAMESPACE |
+                        awk '/^nlinks_repaired/ { print $2 }')
+       [ $repaired -eq 0 ] ||
+               error "(9) Repair nlink count unexpectedly: $repaired"
+
+       cancel_lru_locks mdc
+
+       count1=$(stat --format=%h $DIR/$tdir/d0/foo)
+       [ $count1 -eq 3 ] || error "(10) Stat failure: $count1"
+
+       count2=$($LFS fid2path $DIR $foofid | wc -l)
+       [ $count2 -eq 2 ] ||
+               error "(11) Repaired something unexpectedly: $count2"
+}
+run_test 29c "Not verify nlink attr if hard links exceed linkEA limitation"
+
 $LCTL set_param debug=-lfsck > /dev/null || true
 
 # restore MDS/OST size
index 0ea8c10..5e6f93d 100644 (file)
@@ -2149,6 +2149,8 @@ static void check_lfsck_request(void)
        CHECK_VALUE(LE_CONDITIONAL_DESTROY);
        CHECK_VALUE(LE_PAIRS_VERIFY);
        CHECK_VALUE(LE_CREATE_ORPHAN);
+       CHECK_VALUE(LE_SKIP_NLINK_DECLARE);
+       CHECK_VALUE(LE_SKIP_NLINK);
 
        CHECK_VALUE_X(LEF_TO_OST);
        CHECK_VALUE_X(LEF_FROM_OST);
index 7c5e3bf..f08f453 100644 (file)
@@ -4727,6 +4727,10 @@ void lustre_assert_wire_constants(void)
                 (long long)LE_PAIRS_VERIFY);
        LASSERTF(LE_CREATE_ORPHAN == 12, "found %lld\n",
                 (long long)LE_CREATE_ORPHAN);
+       LASSERTF(LE_SKIP_NLINK_DECLARE == 13, "found %lld\n",
+                (long long)LE_SKIP_NLINK_DECLARE);
+       LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n",
+                (long long)LE_SKIP_NLINK);
        LASSERTF(LEF_TO_OST == 0x00000001UL, "found 0x%.8xUL\n",
                (unsigned)LEF_TO_OST);
        LASSERTF(LEF_FROM_OST == 0x00000002UL, "found 0x%.8xUL\n",