Whamcloud - gitweb
LU-4788 lfsck: namespace LFSCK uses assistant thread 03/10603/24
authorFan Yong <fan.yong@intel.com>
Tue, 29 Jul 2014 19:02:18 +0000 (03:02 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 24 Sep 2014 02:16:04 +0000 (02:16 +0000)
Move the lfsck assistant thread from layout.c to engine.c, and
make it shared by both the layout LFSCK and the namespace LFSCK.

By using an assistant thread, the namespace LFSCK can build an
async pipeline for scanning the directory, just as the layout LFSCK
does for scanning the stripes, so the LFSCK main engine will
not be blocked by cross-MDT verification.

The namespace LFSCK assistant thread is necessary because both
the layout LFSCK and the namespace LFSCK are driven by the same
LFSCK main engine. If the LFSCK main engine is blocked by
namespace handling, then the layout LFSCK will also be blocked.
Currently, the LFSCK main engine and the layout LFSCK assistant
thread form an async pipeline, so the LFSCK main engine
is not blocked by layout-related remote operations. It is therefore
necessary to add another pipeline for namespace-related handling,
to keep the LFSCK main engine from being blocked by namespace-related
remote operations.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I99e18ab1d85ad4d74b16b2387767422907781d5e
Reviewed-on: http://review.whamcloud.com/10603
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/lfsck/lfsck_engine.c
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c

index 39a0b3c..33250ba 100644 (file)
 
 #include "lfsck_internal.h"
 
 
 #include "lfsck_internal.h"
 
-static void lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie)
+static int lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie, __u16 *type)
 {
 {
+       struct luda_type        *lt;
+       int                      align = sizeof(*lt) - 1;
+       int                      len;
+
        fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
        *cookie = le64_to_cpu(ent->lde_hash);
        ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
        ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
        ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
 
        fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
        *cookie = le64_to_cpu(ent->lde_hash);
        ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
        ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
        ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
 
-       /* Make sure the name is terminated with '0'.
-        * The data (type) after ent::lde_name maybe
-        * broken, but we do not care. */
-       ent->lde_name[ent->lde_namelen] = 0;
+       if (unlikely(!(ent->lde_attrs & LUDA_TYPE)))
+               return -EINVAL;
+
+       len = (ent->lde_namelen + align) & ~align;
+       lt = (struct luda_type *)(ent->lde_name + len);
+       *type = le16_to_cpu(lt->lt_type);
+
+       /* Make sure the name is terminated with '\0'. The data (object type)
+        * after ent::lde_name may be overwritten by the terminator, but we
+        * have already saved it in the output parameter @type above. */
+       ent->lde_name[ent->lde_namelen] = '\0';
+
+       return 0;
 }
 
 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
 }
 
 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
@@ -274,7 +287,7 @@ static int lfsck_checkpoint(const struct lu_env *env,
                                    lfsck->li_time_next_checkpoint)))
                return 0;
 
                                    lfsck->li_time_next_checkpoint)))
                return 0;
 
-       lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
+       lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
        list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
                rc = com->lc_ops->lfsck_checkpoint(env, com, false);
                if (rc != 0)
        list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
                rc = com->lc_ops->lfsck_checkpoint(env, com, false);
                if (rc != 0)
@@ -394,7 +407,8 @@ out:
        }
 
        rc = 0;
        }
 
        rc = 0;
-       lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
+       lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, true);
+       lfsck->li_pos_current = lfsck->li_pos_checkpoint;
        list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
                rc = com->lc_ops->lfsck_checkpoint(env, com, true);
                if (rc != 0)
        list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
                rc = com->lc_ops->lfsck_checkpoint(env, com, true);
                if (rc != 0)
@@ -464,13 +478,13 @@ out:
 
 static int lfsck_exec_dir(const struct lu_env *env,
                          struct lfsck_instance *lfsck,
 
 static int lfsck_exec_dir(const struct lu_env *env,
                          struct lfsck_instance *lfsck,
-                         struct dt_object *obj, struct lu_dirent *ent)
+                         struct lu_dirent *ent, __u16 type)
 {
        struct lfsck_component *com;
        int                     rc;
 
        list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
 {
        struct lfsck_component *com;
        int                     rc;
 
        list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
-               rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
+               rc = com->lc_ops->lfsck_exec_dir(env, com, ent, type);
                if (rc != 0)
                        return rc;
        }
                if (rc != 0)
                        return rc;
        }
@@ -485,7 +499,7 @@ static int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
        int                     rc  = 0;
        int                     rc1 = 0;
 
        int                     rc  = 0;
        int                     rc1 = 0;
 
-       lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
+       lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
        list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
                rc = com->lc_ops->lfsck_post(env, com, result, false);
                if (rc != 0)
        list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
                rc = com->lc_ops->lfsck_post(env, com, result, false);
                if (rc != 0)
@@ -573,15 +587,13 @@ static int lfsck_master_dir_engine(const struct lu_env *env,
        struct dt_it                    *di     = lfsck->li_di_dir;
        struct lu_dirent                *ent    =
                        (struct lu_dirent *)info->lti_key;
        struct dt_it                    *di     = lfsck->li_di_dir;
        struct lu_dirent                *ent    =
                        (struct lu_dirent *)info->lti_key;
-       struct lu_fid                   *fid    = &info->lti_fid;
        struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
        struct ptlrpc_thread            *thread = &lfsck->li_thread;
        int                              rc;
        struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
        struct ptlrpc_thread            *thread = &lfsck->li_thread;
        int                              rc;
+       __u16                            type;
        ENTRY;
 
        do {
        ENTRY;
 
        do {
-               struct dt_object *child;
-
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
                    cfs_fail_val > 0) {
                        struct l_wait_info lwi;
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
                    cfs_fail_val > 0) {
                        struct l_wait_info lwi;
@@ -596,7 +608,10 @@ static int lfsck_master_dir_engine(const struct lu_env *env,
                lfsck->li_new_scanned++;
                rc = iops->rec(env, di, (struct dt_rec *)ent,
                               lfsck->li_args_dir);
                lfsck->li_new_scanned++;
                rc = iops->rec(env, di, (struct dt_rec *)ent,
                               lfsck->li_args_dir);
-               lfsck_unpack_ent(ent, &lfsck->li_cookie_dir);
+               if (rc == 0)
+                       rc = lfsck_unpack_ent(ent, &lfsck->li_cookie_dir,
+                                             &type);
+
                if (rc != 0) {
                        CDEBUG(D_LFSCK, "%s: scan dir failed at rec(), "
                               "parent "DFID", cookie "LPX64": rc = %d\n",
                if (rc != 0) {
                        CDEBUG(D_LFSCK, "%s: scan dir failed at rec(), "
                               "parent "DFID", cookie "LPX64": rc = %d\n",
@@ -613,27 +628,9 @@ static int lfsck_master_dir_engine(const struct lu_env *env,
                if (ent->lde_attrs & LUDA_IGNORE)
                        goto checkpoint;
 
                if (ent->lde_attrs & LUDA_IGNORE)
                        goto checkpoint;
 
-               *fid = ent->lde_fid;
-               child = lfsck_object_find(env, lfsck, fid);
-               if (IS_ERR(child)) {
-                       CDEBUG(D_LFSCK, "%s: scan dir failed at find target, "
-                              "parent "DFID", child %.*s "DFID": rc = %d\n",
-                              lfsck_lfsck2name(lfsck),
-                              PFID(lfsck_dto2fid(dir)),
-                              ent->lde_namelen, ent->lde_name,
-                              PFID(&ent->lde_fid), rc);
-                       lfsck_fail(env, lfsck, true);
-                       if (bk->lb_param & LPF_FAILOUT)
-                               RETURN(PTR_ERR(child));
-                       else
-                               goto checkpoint;
-               }
-
-               /* XXX: Currently, skip remote object, the consistency for
-                *      remote object will be processed in LFSCK phase III. */
-               if (dt_object_exists(child) && !dt_object_remote(child))
-                       rc = lfsck_exec_dir(env, lfsck, child, ent);
-               lfsck_object_put(env, child);
+               /* The type in the @ent structure may have been overwritten,
+                * so we need to pass the @type parameter independently. */
+               rc = lfsck_exec_dir(env, lfsck, ent, type);
                if (rc != 0 && bk->lb_param & LPF_FAILOUT)
                        RETURN(rc);
 
                if (rc != 0 && bk->lb_param & LPF_FAILOUT)
                        RETURN(rc);
 
@@ -713,6 +710,7 @@ static int lfsck_master_oit_engine(const struct lu_env *env,
 
                lfsck->li_current_oit_processed = 1;
                lfsck->li_new_scanned++;
 
                lfsck->li_current_oit_processed = 1;
                lfsck->li_new_scanned++;
+               lfsck->li_pos_current.lp_oit_cookie = iops->store(env, di);
                rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
                if (rc != 0) {
                        CDEBUG(D_LFSCK, "%s: OIT scan failed at rec(): "
                rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
                if (rc != 0) {
                        CDEBUG(D_LFSCK, "%s: OIT scan failed at rec(): "
@@ -855,9 +853,9 @@ int lfsck_master_engine(void *args)
        CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
               "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
               ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
        CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
               "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
               ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
-              lfsck->li_pos_current.lp_oit_cookie,
-              lfsck->li_pos_current.lp_dir_cookie,
-              PFID(&lfsck->li_pos_current.lp_dir_parent),
+              lfsck->li_pos_checkpoint.lp_oit_cookie,
+              lfsck->li_pos_checkpoint.lp_dir_cookie,
+              PFID(&lfsck->li_pos_checkpoint.lp_dir_parent),
               current_pid());
 
        spin_lock(&lfsck->li_lock);
               current_pid());
 
        spin_lock(&lfsck->li_lock);
@@ -881,9 +879,9 @@ int lfsck_master_engine(void *args)
        CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
               "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
               ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
        CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
               "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
               ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
-              lfsck->li_pos_current.lp_oit_cookie,
-              lfsck->li_pos_current.lp_dir_cookie,
-              PFID(&lfsck->li_pos_current.lp_dir_parent),
+              lfsck->li_pos_checkpoint.lp_oit_cookie,
+              lfsck->li_pos_checkpoint.lp_dir_cookie,
+              PFID(&lfsck->li_pos_checkpoint.lp_dir_parent),
               current_pid(), rc);
 
        if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
               current_pid(), rc);
 
        if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
@@ -914,3 +912,718 @@ fini_args:
        lfsck_thread_args_fini(lta);
        return rc;
 }
        lfsck_thread_args_fini(lta);
        return rc;
 }
+
+/**
+ * Check whether the assistant thread has any pending request.
+ *
+ * The request list is sampled while holding lad_lock, so the answer
+ * reflects the state of the list at the moment the lock was held.
+ *
+ * \param[in] lad      pointer to the assistant data owning the list
+ *
+ * \retval             true if lad_req_list is empty
+ * \retval             false if at least one request is queued
+ */
+static inline bool lfsck_assistant_req_empty(struct lfsck_assistant_data *lad)
+{
+       bool is_empty;
+
+       spin_lock(&lad->lad_lock);
+       is_empty = list_empty(&lad->lad_req_list) ? true : false;
+       spin_unlock(&lad->lad_lock);
+
+       return is_empty;
+}
+
+/**
+ * Query the LFSCK status from the instances on remote servers.
+ *
+ * The LFSCK assistant thread queries the LFSCK instances on other
+ * servers (MDT/OST) about their status, such as whether they have
+ * finished the phase1/phase2 scanning or not, and so on.
+ *
+ * An LE_QUERY request is built in the per-thread lfsck_request and
+ * sent asynchronously to every target found on one "phase1" list:
+ * - if lad_mdt_phase1_list is not empty, the MDTs on it are queried;
+ * - otherwise, for the layout LFSCK only, the OSTs on
+ *   lad_ost_phase1_list are queried (request flagged LEF_TO_OST);
+ * - otherwise (non-layout component, no MDT pending) nothing is sent.
+ *
+ * Each pass bumps lad_touch_gen. A target is stamped with that value
+ * and rotated to the tail of the list before it is queried, so the
+ * loop stops as soon as the list head carries the current stamp.
+ *
+ * After waiting for the request set, the layout LFSCK repeats the
+ * procedure for the OSTs if the pass just finished was the MDT one
+ * and lad_mdt_phase1_list has become empty in the meantime.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure: -ENOMEM if the
+ *                     request set cannot be allocated, a negative
+ *                     ptlrpc_set_wait() result, or the last failure
+ *                     reported by lfsck_async_request()
+ */
+static int lfsck_assistant_query_others(const struct lu_env *env,
+                                       struct lfsck_component *com)
+{
+       struct lfsck_thread_info          *info  = lfsck_env_info(env);
+       struct lfsck_request              *lr    = &info->lti_lr;
+       struct lfsck_async_interpret_args *laia  = &info->lti_laia;
+       struct lfsck_instance             *lfsck = com->lc_lfsck;
+       struct lfsck_assistant_data       *lad   = com->lc_data;
+       struct ptlrpc_request_set         *set;
+       struct lfsck_tgt_descs            *ltds;
+       struct lfsck_tgt_desc             *ltd;
+       struct list_head                  *phase_head;
+       int                                rc    = 0;
+       int                                rc1   = 0;
+       ENTRY;
+
+       set = ptlrpc_prep_set();
+       if (set == NULL)
+               RETURN(-ENOMEM);
+
+       lad->lad_touch_gen++; /* new pass: targets carrying this stamp
+                              * have already been queried */
+       memset(lr, 0, sizeof(*lr));
+       lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
+       lr->lr_event = LE_QUERY;
+       lr->lr_active = com->lc_type;
+       laia->laia_com = com;
+       laia->laia_lr = lr;
+       laia->laia_shared = 0;
+
+       if (!list_empty(&lad->lad_mdt_phase1_list)) {
+               ltds = &lfsck->li_mdt_descs;
+               lr->lr_flags = 0;
+               phase_head = &lad->lad_mdt_phase1_list;
+       } else if (com->lc_type != LFSCK_TYPE_LAYOUT) { /* only the layout
+                                                        * LFSCK queries OSTs */
+               goto out;
+       } else {
+
+again:
+               ltds = &lfsck->li_ost_descs;
+               lr->lr_flags = LEF_TO_OST;
+               phase_head = &lad->lad_ost_phase1_list;
+       }
+
+       laia->laia_ltds = ltds;
+       spin_lock(&ltds->ltd_lock);
+       while (!list_empty(phase_head)) {
+               struct list_head *phase_list;
+               __u32            *gen;
+
+               if (com->lc_type == LFSCK_TYPE_LAYOUT) {
+                       ltd = list_entry(phase_head->next,
+                                        struct lfsck_tgt_desc,
+                                        ltd_layout_phase_list);
+                       phase_list = &ltd->ltd_layout_phase_list;
+                       gen = &ltd->ltd_layout_gen;
+               } else {
+                       ltd = list_entry(phase_head->next,
+                                        struct lfsck_tgt_desc,
+                                        ltd_namespace_phase_list);
+                       phase_list = &ltd->ltd_namespace_phase_list;
+                       gen = &ltd->ltd_namespace_gen;
+               }
+
+               if (*gen == lad->lad_touch_gen) /* wrapped around: every
+                                                * target was visited */
+                       break;
+
+               *gen = lad->lad_touch_gen;
+               list_move_tail(phase_list, phase_head); /* rotate to tail */
+               atomic_inc(&ltd->ltd_ref); /* dropped below on failure; on
+                                           * success presumably released by
+                                           * the interpret callback - TODO
+                                           * confirm */
+               laia->laia_ltd = ltd;
+               spin_unlock(&ltds->ltd_lock); /* not held across the request */
+               rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
+                                        lfsck_async_interpret_common,
+                                        laia, LFSCK_QUERY);
+               if (rc != 0) {
+                       CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to query "
+                              "%s %x for %s: rc = %d\n",
+                              lfsck_lfsck2name(lfsck),
+                              (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
+                              ltd->ltd_index, lad->lad_name, rc);
+                       lfsck_tgt_put(ltd);
+                       rc1 = rc; /* remember it, keep querying the others */
+               }
+               spin_lock(&ltds->ltd_lock);
+       }
+       spin_unlock(&ltds->ltd_lock);
+
+       rc = ptlrpc_set_wait(set);
+       if (rc < 0) {
+               ptlrpc_set_destroy(set);
+               RETURN(rc);
+       }
+
+       if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST) &&
+           list_empty(&lad->lad_mdt_phase1_list))
+               goto again; /* MDT pass done and list drained: do the OSTs */
+
+out:
+       ptlrpc_set_destroy(set);
+
+       RETURN(rc1 != 0 ? rc1 : rc);
+}
+
+/**
+ * Notify the LFSCK event to the instances on remote servers.
+ *
+ * The LFSCK assistant thread notifies the LFSCK instances on other
+ * servers (MDT/OST) about some events, such as start new scanning,
+ * stop the scanning, this LFSCK instance will exit, and so on.
+ *
+ * Behavior per lr->lr_event, as implemented below:
+ * - LE_START: for the layout LFSCK, fill the request from the bookmark
+ *   and send it to every OST set in the OST target bitmap, then wait
+ *   for the replies. Afterwards, if LPF_ALL_TGT is set, link every
+ *   known MDT target onto lad_mdt_list and lad_mdt_phase1_list (only
+ *   local list manipulation is done for the MDTs here).
+ * - LE_STOP, LE_PHASE2_DONE, LE_PEER_EXIT: with LPF_ALL_TGT, LE_STOP
+ *   only unlinks the MDT targets locally and then (layout LFSCK only)
+ *   notifies the OSTs; the other two events notify the MDTs on
+ *   lad_mdt_list first and then, for the layout LFSCK, the OSTs on
+ *   lad_ost_list. Without LPF_ALL_TGT only the layout LFSCK notifies
+ *   its OSTs. Every target is unlinked from its lists before being
+ *   notified.
+ * - LE_PHASE1_DONE: notify each MDT on lad_mdt_list once; the
+ *   lad_touch_gen stamp is used to detect that the rotated list has
+ *   been fully walked.
+ * - any other event: nothing is sent and -EINVAL is returned.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] lr       pointer to the LFSCK event request
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure: -ENOMEM if the
+ *                     request set cannot be allocated, otherwise the
+ *                     value left in rc by the event handling, or the
+ *                     result of the final ptlrpc_set_wait() if rc is 0
+ */
+static int lfsck_assistant_notify_others(const struct lu_env *env,
+                                        struct lfsck_component *com,
+                                        struct lfsck_request *lr)
+{
+       struct lfsck_thread_info          *info  = lfsck_env_info(env);
+       struct lfsck_async_interpret_args *laia  = &info->lti_laia;
+       struct lfsck_instance             *lfsck = com->lc_lfsck;
+       struct lfsck_assistant_data       *lad   = com->lc_data;
+       struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
+       struct ptlrpc_request_set         *set;
+       struct lfsck_tgt_descs            *ltds;
+       struct lfsck_tgt_desc             *ltd;
+       struct lfsck_tgt_desc             *next;
+       __u32                              idx;
+       int                                rc    = 0;
+       int                                rc1   = 0;
+       ENTRY;
+
+       set = ptlrpc_prep_set();
+       if (set == NULL)
+               RETURN(-ENOMEM);
+
+       lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
+       lr->lr_active = com->lc_type;
+       laia->laia_com = com;
+       laia->laia_lr = lr;
+       laia->laia_shared = 0;
+
+       switch (lr->lr_event) {
+       case LE_START:
+               if (com->lc_type != LFSCK_TYPE_LAYOUT)
+                       goto next; /* no OST to notify for this component */
+
+               lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
+                              LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ;
+               lr->lr_speed = bk->lb_speed_limit;
+               lr->lr_version = bk->lb_version;
+               lr->lr_param |= bk->lb_param;
+               lr->lr_async_windows = bk->lb_async_windows;
+               lr->lr_flags = LEF_TO_OST;
+
+               /* Notify OSTs firstly, then handle other MDTs if needed. */
+               ltds = &lfsck->li_ost_descs;
+               laia->laia_ltds = ltds;
+               down_read(&ltds->ltd_rw_sem);
+               cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
+                       ltd = lfsck_tgt_get(ltds, idx);
+                       LASSERT(ltd != NULL);
+
+                       laia->laia_ltd = ltd;
+                       ltd->ltd_layout_done = 0;
+                       rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
+                                       lfsck_async_interpret_common,
+                                       laia, LFSCK_NOTIFY);
+                       if (rc != 0) {
+                               struct lfsck_layout *lo = com->lc_file_ram;
+
+                               lo->ll_flags |= LF_INCOMPLETE; /* the start
+                                                * request for this OST
+                                                * failed */
+                               CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
+                                      "notify OST %x for %s start: rc = %d\n",
+                                      lfsck_lfsck2name(lfsck), idx,
+                                      lad->lad_name, rc);
+                               lfsck_tgt_put(ltd);
+                       }
+               }
+               up_read(&ltds->ltd_rw_sem);
+
+               /* Sync up */
+               rc = ptlrpc_set_wait(set);
+               if (rc < 0) {
+                       ptlrpc_set_destroy(set);
+                       RETURN(rc);
+               }
+
+next:
+               if (!(bk->lb_param & LPF_ALL_TGT))
+                       break;
+
+               /* link other MDT targets locally. */
+               ltds = &lfsck->li_mdt_descs;
+               spin_lock(&ltds->ltd_lock);
+               if (com->lc_type == LFSCK_TYPE_LAYOUT) {
+                       cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
+                               ltd = LTD_TGT(ltds, idx);
+                               LASSERT(ltd != NULL);
+
+                               if (!list_empty(&ltd->ltd_layout_list))
+                                       continue; /* already linked */
+
+                               list_add_tail(&ltd->ltd_layout_list,
+                                             &lad->lad_mdt_list);
+                               list_add_tail(&ltd->ltd_layout_phase_list,
+                                             &lad->lad_mdt_phase1_list);
+                       }
+               } else {
+                       cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
+                               ltd = LTD_TGT(ltds, idx);
+                               LASSERT(ltd != NULL);
+
+                               if (!list_empty(&ltd->ltd_namespace_list))
+                                       continue; /* already linked */
+
+                               list_add_tail(&ltd->ltd_namespace_list,
+                                             &lad->lad_mdt_list);
+                               list_add_tail(&ltd->ltd_namespace_phase_list,
+                                             &lad->lad_mdt_phase1_list);
+                       }
+               }
+               spin_unlock(&ltds->ltd_lock);
+               break;
+       case LE_STOP:
+       case LE_PHASE2_DONE:
+       case LE_PEER_EXIT: {
+               struct list_head *phase_head;
+
+               /* Handle other MDTs firstly if needed, then notify the OSTs. */
+               if (bk->lb_param & LPF_ALL_TGT) {
+                       phase_head = &lad->lad_mdt_list;
+                       ltds = &lfsck->li_mdt_descs;
+                       if (lr->lr_event == LE_STOP) {
+                               /* unlink other MDT targets locally. */
+                               spin_lock(&ltds->ltd_lock);
+                               if (com->lc_type == LFSCK_TYPE_LAYOUT) {
+                                       list_for_each_entry_safe(ltd, next,
+                                               phase_head, ltd_layout_list) {
+                                               list_del_init(
+                                               &ltd->ltd_layout_phase_list);
+                                               list_del_init(
+                                               &ltd->ltd_layout_list);
+                                       }
+                               } else {
+                                       list_for_each_entry_safe(ltd, next,
+                                                       phase_head,
+                                                       ltd_namespace_list) {
+                                               list_del_init(
+                                               &ltd->ltd_namespace_phase_list);
+                                               list_del_init(
+                                               &ltd->ltd_namespace_list);
+                                       }
+                               }
+                               spin_unlock(&ltds->ltd_lock);
+
+                               if (com->lc_type != LFSCK_TYPE_LAYOUT)
+                                       break;
+
+                               lr->lr_flags |= LEF_TO_OST;
+                               phase_head = &lad->lad_ost_list;
+                               ltds = &lfsck->li_ost_descs;
+                       } else {
+                               lr->lr_flags &= ~LEF_TO_OST; /* MDT pass first */
+                       }
+               } else if (com->lc_type != LFSCK_TYPE_LAYOUT) {
+                       break;
+               } else {
+                       lr->lr_flags |= LEF_TO_OST;
+                       phase_head = &lad->lad_ost_list;
+                       ltds = &lfsck->li_ost_descs;
+               }
+
+again:
+               laia->laia_ltds = ltds;
+               spin_lock(&ltds->ltd_lock);
+               while (!list_empty(phase_head)) {
+                       if (com->lc_type == LFSCK_TYPE_LAYOUT) {
+                               ltd = list_entry(phase_head->next,
+                                                struct lfsck_tgt_desc,
+                                                ltd_layout_list);
+                               if (!list_empty(&ltd->ltd_layout_phase_list))
+                                       list_del_init(
+                                               &ltd->ltd_layout_phase_list);
+                               list_del_init(&ltd->ltd_layout_list);
+                       } else {
+                               ltd = list_entry(phase_head->next,
+                                                struct lfsck_tgt_desc,
+                                                ltd_namespace_list);
+                               if (!list_empty(&ltd->ltd_namespace_phase_list))
+                                       list_del_init(
+                                               &ltd->ltd_namespace_phase_list);
+                               list_del_init(&ltd->ltd_namespace_list);
+                       }
+                       atomic_inc(&ltd->ltd_ref);
+                       laia->laia_ltd = ltd;
+                       spin_unlock(&ltds->ltd_lock); /* not held across the
+                                                      * request */
+                       rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
+                                       lfsck_async_interpret_common,
+                                       laia, LFSCK_NOTIFY);
+                       if (rc != 0) {
+                               CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
+                                      "notify %s %x for %s stop/phase2_done/"
+                                      "peer_exit: rc = %d\n",
+                                      lfsck_lfsck2name(lfsck),
+                                      (lr->lr_flags & LEF_TO_OST) ?
+                                      "OST" : "MDT", ltd->ltd_index,
+                                      lad->lad_name, rc);
+                               lfsck_tgt_put(ltd);
+                       }
+                       spin_lock(&ltds->ltd_lock);
+               }
+               spin_unlock(&ltds->ltd_lock);
+
+               rc = ptlrpc_set_wait(set);
+               if (rc < 0) {
+                       ptlrpc_set_destroy(set);
+                       RETURN(rc);
+               }
+
+               if (com->lc_type == LFSCK_TYPE_LAYOUT &&
+                   !(lr->lr_flags & LEF_TO_OST)) {
+                       lr->lr_flags |= LEF_TO_OST; /* MDTs done, now OSTs */
+                       phase_head = &lad->lad_ost_list;
+                       ltds = &lfsck->li_ost_descs;
+                       goto again;
+               }
+               break;
+       }
+       case LE_PHASE1_DONE:
+               lad->lad_touch_gen++; /* new pass: stamp marks notified MDTs */
+               ltds = &lfsck->li_mdt_descs;
+               laia->laia_ltds = ltds;
+               spin_lock(&ltds->ltd_lock);
+               while (!list_empty(&lad->lad_mdt_list)) {
+                       struct list_head *list;
+                       __u32            *gen;
+
+                       if (com->lc_type == LFSCK_TYPE_LAYOUT) {
+                               ltd = list_entry(lad->lad_mdt_list.next,
+                                                struct lfsck_tgt_desc,
+                                                ltd_layout_list);
+                               list = &ltd->ltd_layout_list;
+                               gen = &ltd->ltd_layout_gen;
+                       } else {
+                               ltd = list_entry(lad->lad_mdt_list.next,
+                                                struct lfsck_tgt_desc,
+                                                ltd_namespace_list);
+                               list = &ltd->ltd_namespace_list;
+                               gen = &ltd->ltd_namespace_gen;
+                       }
+
+                       if (*gen == lad->lad_touch_gen) /* wrapped around */
+                               break;
+
+                       *gen = lad->lad_touch_gen;
+                       list_move_tail(list, &lad->lad_mdt_list);
+                       atomic_inc(&ltd->ltd_ref);
+                       laia->laia_ltd = ltd;
+                       spin_unlock(&ltds->ltd_lock);
+                       rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
+                                       lfsck_async_interpret_common,
+                                       laia, LFSCK_NOTIFY);
+                       if (rc != 0) {
+                               CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
+                                      "notify MDT %x for %s phase1 done: "
+                                      "rc = %d\n", lfsck_lfsck2name(lfsck),
+                                      ltd->ltd_index, lad->lad_name, rc);
+                               lfsck_tgt_put(ltd);
+                       }
+                       spin_lock(&ltds->ltd_lock);
+               }
+               spin_unlock(&ltds->ltd_lock);
+               break;
+       default:
+               CDEBUG(D_LFSCK, "%s: LFSCK assistant unexpected LFSCK event: "
+                      "rc = %d\n", lfsck_lfsck2name(lfsck), lr->lr_event);
+               rc = -EINVAL; /* NOTE(review): the message above prints
+                              * lr->lr_event under the label "rc", not
+                              * this error code */
+               break;
+       }
+
+       rc1 = ptlrpc_set_wait(set);
+       ptlrpc_set_destroy(set);
+
+       RETURN(rc != 0 ? rc : rc1);
+}
+
+/**
+ * Main function of the LFSCK assistant thread.
+ *
+ * The LFSCK assistant thread is triggered by the LFSCK main engine.
+ * The two work together as an asynchronous pipeline: the LFSCK main
+ * engine scans the system, pre-fetches the objects, attributes, name
+ * entries, etc., and pushes them into the pipeline as input requests
+ * for the LFSCK assistant thread; at the other end of the pipeline,
+ * the LFSCK assistant thread performs the real check and repair for
+ * every request from the main engine.
+ *
+ * Generally, the assistant thread may be blocked while checking or
+ * repairing something, so the LFSCK main engine will usually run
+ * faster. On the other hand, the LFSCK main engine drives multiple
+ * assistant threads in parallel: each LFSCK component on the master
+ * (such as layout LFSCK, namespace LFSCK) has an independent LFSCK
+ * assistant thread. Under such 1:N asynchronous pipeline mode, the
+ * whole LFSCK performance will be much better than having the LFSCK
+ * main engine check and repair everything by itself.
+ *
+ * The thread first notifies the others with LE_START, then handles the
+ * queued requests via la_handler_p1(). When asked to post, it notifies
+ * the others with LE_PHASE1_DONE; when asked to double scan, it drives
+ * la_handler_p2(). Before exiting, it notifies the others once more
+ * with LE_PHASE2_DONE, LE_STOP or LE_PEER_EXIT, depending on the result
+ * and on whether LPF_ALL_TGT is set.
+ *
+ * "args" is the struct lfsck_thread_args for this thread; it is released
+ * via lfsck_thread_args_fini() before returning.
+ *
+ * Returns the final value of "rc". The status visible to other threads
+ * is stored in lad->lad_assistant_status, which prefers "rc1" over "rc"
+ * when "rc1" is non-zero.
+ */
+int lfsck_assistant_engine(void *args)
+{
+       struct lfsck_thread_args          *lta     = args;
+       struct lu_env                     *env     = &lta->lta_env;
+       struct lfsck_component            *com     = lta->lta_com;
+       struct lfsck_instance             *lfsck   = lta->lta_lfsck;
+       struct lfsck_bookmark             *bk      = &lfsck->li_bookmark_ram;
+       struct lfsck_position             *pos     = &com->lc_pos_start;
+       struct lfsck_thread_info          *info    = lfsck_env_info(env);
+       struct lfsck_request              *lr      = &info->lti_lr;
+       struct lfsck_assistant_data       *lad     = com->lc_data;
+       struct ptlrpc_thread              *mthread = &lfsck->li_thread;
+       struct ptlrpc_thread              *athread = &lad->lad_thread;
+       struct lfsck_assistant_operations *lao     = lad->lad_ops;
+       struct lfsck_assistant_req        *lar;
+       struct l_wait_info                 lwi     = { 0 };
+       int                                rc      = 0;
+       int                                rc1     = 0;
+       ENTRY;
+
+       CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread start\n",
+              lfsck_lfsck2name(lfsck), lad->lad_name);
+
+       memset(lr, 0, sizeof(*lr));
+       lr->lr_event = LE_START;
+       if (pos->lp_oit_cookie <= 1)
+               lr->lr_param = LPF_RESET;
+       rc = lfsck_assistant_notify_others(env, com, lr);
+       if (rc != 0) {
+               CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to notify others "
+                      "to start %s: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), lad->lad_name, rc);
+               GOTO(fini, rc);
+       }
+
+       spin_lock(&lad->lad_lock);
+       thread_set_flags(athread, SVC_RUNNING);
+       spin_unlock(&lad->lad_lock);
+       wake_up_all(&mthread->t_ctl_waitq);
+
+       while (1) {
+               while (!list_empty(&lad->lad_req_list)) {
+                       bool wakeup = false;
+
+                       if (unlikely(lad->lad_exit ||
+                                    !thread_is_running(mthread)))
+                               GOTO(cleanup1, rc = lad->lad_post_result);
+
+                       lar = list_entry(lad->lad_req_list.next,
+                                        struct lfsck_assistant_req,
+                                        lar_list);
+                       /* Only the lfsck_assistant_engine thread itself can
+                        * remove the "lar" from the head of the list, LFSCK
+                        * engine thread only inserts other new "lar" at the
+                        * end of the list. So it is safe to handle current
+                        * "lar" without the spin_lock. */
+                       rc = lao->la_handler_p1(env, com, lar);
+                       spin_lock(&lad->lad_lock);
+                       list_del_init(&lar->lar_list);
+                       lad->lad_prefetched--;
+                       /* Wake up the main engine thread only when the list
+                        * is empty or half of the prefetched items have been
+                        * handled to avoid too frequent thread schedule. */
+                       if (lad->lad_prefetched == 0 ||
+                           (bk->lb_async_windows != 0 &&
+                            bk->lb_async_windows / 2 ==
+                            lad->lad_prefetched))
+                               wakeup = true;
+                       spin_unlock(&lad->lad_lock);
+                       if (wakeup)
+                               wake_up_all(&mthread->t_ctl_waitq);
+
+                       lao->la_req_fini(env, lar);
+                       if (rc < 0 && bk->lb_param & LPF_FAILOUT)
+                               GOTO(cleanup1, rc);
+               }
+
+               l_wait_event(athread->t_ctl_waitq,
+                            !lfsck_assistant_req_empty(lad) ||
+                            lad->lad_exit ||
+                            lad->lad_to_post ||
+                            lad->lad_to_double_scan,
+                            &lwi);
+
+               if (unlikely(lad->lad_exit))
+                       GOTO(cleanup1, rc = lad->lad_post_result);
+
+               if (!list_empty(&lad->lad_req_list))
+                       continue;
+
+               if (lad->lad_to_post) {
+                       CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread post\n",
+                              lfsck_lfsck2name(lfsck), lad->lad_name);
+
+                       if (unlikely(lad->lad_exit))
+                               GOTO(cleanup1, rc = lad->lad_post_result);
+
+                       lad->lad_to_post = 0;
+                       LASSERT(lad->lad_post_result > 0);
+
+                       memset(lr, 0, sizeof(*lr));
+                       lr->lr_event = LE_PHASE1_DONE;
+                       lr->lr_status = lad->lad_post_result;
+                       rc = lfsck_assistant_notify_others(env, com, lr);
+                       if (rc != 0)
+                               CDEBUG(D_LFSCK, "%s: LFSCK assistant failed to "
+                                      "notify others for %s post: rc = %d\n",
+                                      lfsck_lfsck2name(lfsck),
+                                      lad->lad_name, rc);
+
+                       /* Wakeup the master engine to go ahead. */
+                       wake_up_all(&mthread->t_ctl_waitq);
+               }
+
+               if (lad->lad_to_double_scan) {
+                       lad->lad_to_double_scan = 0;
+                       atomic_inc(&lfsck->li_double_scan_count);
+                       lad->lad_in_double_scan = 1;
+                       wake_up_all(&mthread->t_ctl_waitq);
+
+                       com->lc_new_checked = 0;
+                       com->lc_new_scanned = 0;
+                       com->lc_time_last_checkpoint = cfs_time_current();
+                       com->lc_time_next_checkpoint =
+                               com->lc_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+
+                       /* Flush async updates before handling orphan. */
+                       dt_sync(env, lfsck->li_next);
+
+                       CDEBUG(D_LFSCK, "%s: LFSCK assistant phase2 "
+                              "scan start\n", lfsck_lfsck2name(lfsck));
+
+                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
+                               GOTO(cleanup2, rc = 0);
+
+                       while (lad->lad_in_double_scan) {
+                               rc = lfsck_assistant_query_others(env, com);
+                               if (lfsck_phase2_next_ready(lad))
+                                       goto p2_next;
+
+                               if (rc < 0)
+                                       GOTO(cleanup2, rc);
+
+                               /* Poll the LFSCK status on the related targets
+                                * once per 30 seconds if we are not notified. */
+                               lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
+                                                          cfs_time_seconds(1),
+                                                          NULL, NULL);
+                               rc = l_wait_event(athread->t_ctl_waitq,
+                                       lfsck_phase2_next_ready(lad) ||
+                                       lad->lad_exit ||
+                                       !thread_is_running(mthread),
+                                       &lwi);
+
+                               if (unlikely(lad->lad_exit ||
+                                            !thread_is_running(mthread)))
+                                       GOTO(cleanup2, rc = 0);
+
+                               if (rc == -ETIMEDOUT)
+                                       continue;
+
+                               if (rc < 0)
+                                       GOTO(cleanup2, rc);
+
+p2_next:
+                               rc = lao->la_handler_p2(env, com);
+                               if (rc != 0)
+                                       GOTO(cleanup2, rc);
+
+                               if (unlikely(lad->lad_exit ||
+                                            !thread_is_running(mthread)))
+                                       GOTO(cleanup2, rc = 0);
+                       }
+               }
+       }
+
+cleanup1:
+       /* Cleanup the unfinished requests. The lock is dropped around each
+        * la_req_fini() call and re-taken before the next list check. */
+       spin_lock(&lad->lad_lock);
+       if (rc < 0)
+               lad->lad_assistant_status = rc;
+
+       if (lad->lad_exit && lad->lad_post_result <= 0)
+               lao->la_fill_pos(env, com, &lfsck->li_pos_checkpoint);
+
+       while (!list_empty(&lad->lad_req_list)) {
+               lar = list_entry(lad->lad_req_list.next,
+                                struct lfsck_assistant_req,
+                                lar_list);
+               list_del_init(&lar->lar_list);
+               lad->lad_prefetched--;
+               spin_unlock(&lad->lad_lock);
+               lao->la_req_fini(env, lar);
+               spin_lock(&lad->lad_lock);
+       }
+       spin_unlock(&lad->lad_lock);
+
+       LASSERTF(lad->lad_prefetched == 0, "unmatched prefeteched objs %d\n",
+                lad->lad_prefetched);
+
+cleanup2:
+       memset(lr, 0, sizeof(*lr));
+       if (rc > 0) {
+               lr->lr_event = LE_PHASE2_DONE;
+               lr->lr_status = rc;
+       } else if (rc == 0) {
+               if (lfsck->li_flags & LPF_ALL_TGT) {
+                       lr->lr_event = LE_STOP;
+                       lr->lr_status = LS_STOPPED;
+               } else {
+                       lr->lr_event = LE_PEER_EXIT;
+                       switch (lfsck->li_status) {
+                       case LS_PAUSED:
+                       case LS_CO_PAUSED:
+                               lr->lr_status = LS_CO_PAUSED;
+                               break;
+                       case LS_STOPPED:
+                       case LS_CO_STOPPED:
+                               lr->lr_status = LS_CO_STOPPED;
+                               break;
+                       default:
+                               CDEBUG(D_LFSCK, "%s: LFSCK assistant unknown "
+                                      "status: rc = %d\n",
+                                      lfsck_lfsck2name(lfsck),
+                                      lfsck->li_status);
+                               lr->lr_status = LS_CO_FAILED;
+                               break;
+                       }
+               }
+       } else {
+               if (lfsck->li_flags & LPF_ALL_TGT) {
+                       lr->lr_event = LE_STOP;
+                       lr->lr_status = LS_FAILED;
+               } else {
+                       lr->lr_event = LE_PEER_EXIT;
+                       lr->lr_status = LS_CO_FAILED;
+               }
+       }
+
+       rc1 = lfsck_assistant_notify_others(env, com, lr);
+       if (rc1 != 0) {
+               CDEBUG(D_LFSCK, "%s: LFSCK assistant failed to notify "
+                      "others for %s quit: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), lad->lad_name, rc1);
+               rc = rc1;
+       }
+
+       /* Flush async updates before exit. */
+       dt_sync(env, lfsck->li_next);
+
+       /* In the forced exit case, some requests may have been freed without
+        * verification; those objects should be re-handled on the next run.
+        * So do not update the on-disk tracing file in that case. */
+       if (lad->lad_in_double_scan) {
+               if (!lad->lad_exit)
+                       rc1 = lao->la_double_scan_result(env, com, rc);
+
+               CDEBUG(D_LFSCK, "%s: LFSCK assistant phase2 scan "
+                      "finished: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), rc1 != 0 ? rc1 : rc);
+       }
+
+fini:
+       if (lad->lad_in_double_scan)
+               atomic_dec(&lfsck->li_double_scan_count);
+
+       spin_lock(&lad->lad_lock);
+       lad->lad_assistant_status = (rc1 != 0 ? rc1 : rc);
+       thread_set_flags(athread, SVC_STOPPED);
+       wake_up_all(&mthread->t_ctl_waitq);
+       spin_unlock(&lad->lad_lock);
+
+       CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread exit: rc = %d\n",
+              lfsck_lfsck2name(lfsck), lad->lad_name,
+              lad->lad_assistant_status);
+
+       lfsck_thread_args_fini(lta);
+
+       return rc;
+}
index eb0af97..5a801cf 100644 (file)
 #define HALF_SEC                       (HZ >> 1)
 #define LFSCK_CHECKPOINT_INTERVAL      60
 
 #define HALF_SEC                       (HZ >> 1)
 #define LFSCK_CHECKPOINT_INTERVAL      60
 
-#define LFSCK_NAMEENTRY_DEAD           1 /* The object has been unlinked. */
-#define LFSCK_NAMEENTRY_REMOVED        2 /* The entry has been removed. */
-#define LFSCK_NAMEENTRY_RECREATED      3 /* The entry has been recreated. */
-
 enum lfsck_flags {
        /* Finish the first cycle scanning. */
        LF_SCANNED_ONCE         = 0x00000001ULL,
 enum lfsck_flags {
        /* Finish the first cycle scanning. */
        LF_SCANNED_ONCE         = 0x00000001ULL,
@@ -289,19 +285,14 @@ struct lfsck_operations {
 
        int (*lfsck_exec_dir)(const struct lu_env *env,
                              struct lfsck_component *com,
 
        int (*lfsck_exec_dir)(const struct lu_env *env,
                              struct lfsck_component *com,
-                             struct dt_object *obj,
-                             struct lu_dirent *ent);
+                             struct lu_dirent *ent,
+                             __u16 type);
 
        int (*lfsck_post)(const struct lu_env *env,
                          struct lfsck_component *com,
                          int result,
                          bool init);
 
 
        int (*lfsck_post)(const struct lu_env *env,
                          struct lfsck_component *com,
                          int result,
                          bool init);
 
-       int (*lfsck_interpret)(const struct lu_env *env,
-                              struct ptlrpc_request *req,
-                              void *args,
-                              int rc);
-
        int (*lfsck_dump)(const struct lu_env *env,
                          struct lfsck_component *com,
                          struct seq_file *m);
        int (*lfsck_dump)(const struct lu_env *env,
                          struct lfsck_component *com,
                          struct seq_file *m);
@@ -322,12 +313,6 @@ struct lfsck_operations {
        int (*lfsck_query)(const struct lu_env *env,
                           struct lfsck_component *com);
 
        int (*lfsck_query)(const struct lu_env *env,
                           struct lfsck_component *com);
 
-       int (*lfsck_stop_notify)(const struct lu_env *env,
-                                struct lfsck_component *com,
-                                struct lfsck_tgt_descs *ltds,
-                                struct lfsck_tgt_desc *ltd,
-                                struct ptlrpc_request_set *set);
-
        int (*lfsck_join)(const struct lu_env *env,
                          struct lfsck_component *com,
                          struct lfsck_start_param *lsp);
        int (*lfsck_join)(const struct lu_env *env,
                          struct lfsck_component *com,
                          struct lfsck_start_param *lsp);
@@ -343,11 +328,15 @@ struct lfsck_tgt_desc {
        struct obd_export *ltd_exp;
        struct list_head   ltd_layout_list;
        struct list_head   ltd_layout_phase_list;
        struct obd_export *ltd_exp;
        struct list_head   ltd_layout_list;
        struct list_head   ltd_layout_phase_list;
+       struct list_head   ltd_namespace_list;
+       struct list_head   ltd_namespace_phase_list;
        atomic_t           ltd_ref;
        __u32              ltd_index;
        __u32              ltd_layout_gen;
        atomic_t           ltd_ref;
        __u32              ltd_index;
        __u32              ltd_layout_gen;
+       __u32              ltd_namespace_gen;
        unsigned int       ltd_dead:1,
        unsigned int       ltd_dead:1,
-                          ltd_layout_done:1;
+                          ltd_layout_done:1,
+                          ltd_namespace_done:1;
 };
 
 struct lfsck_tgt_desc_idx {
 };
 
 struct lfsck_tgt_desc_idx {
@@ -464,6 +453,7 @@ struct lfsck_instance {
        struct lfsck_bookmark     li_bookmark_ram;
        struct lfsck_bookmark     li_bookmark_disk;
        struct lfsck_position     li_pos_current;
        struct lfsck_bookmark     li_bookmark_ram;
        struct lfsck_bookmark     li_bookmark_disk;
        struct lfsck_position     li_pos_current;
+       struct lfsck_position     li_pos_checkpoint;
 
        /* Obj for otable-based iteration */
        struct dt_object         *li_obj_oit;
 
        /* Obj for otable-based iteration */
        struct dt_object         *li_obj_oit;
@@ -538,6 +528,67 @@ struct lfsck_thread_args {
        struct lfsck_start_param        *lta_lsp;
 };
 
        struct lfsck_start_param        *lta_lsp;
 };
 
+struct lfsck_assistant_req {
+       struct list_head        lar_list; /* linkage on lad_req_list */
+};
+
+struct lfsck_assistant_operations {
+       int (*la_handler_p1)(const struct lu_env *env,
+                            struct lfsck_component *com,
+                            struct lfsck_assistant_req *lar);
+       /* la_handler_p1() is called by lfsck_assistant_engine() for each
+        * request taken from the head of lad_req_list. la_handler_p2() is
+        * called by the engine during the double scan; a non-zero return
+        * value ends the double scan. */
+       int (*la_handler_p2)(const struct lu_env *env,
+                            struct lfsck_component *com);
+       /* Called by the engine, with lad_lock held, when it is told to exit
+        * (lad_exit) while lad_post_result <= 0; the engine passes
+        * &lfsck->li_pos_checkpoint as "pos". */
+       void (*la_fill_pos)(const struct lu_env *env,
+                           struct lfsck_component *com,
+                           struct lfsck_position *pos);
+       /* Called by the engine before it exits if it was in the double scan
+        * and was not told to exit; "rc" is the engine's result so far. */
+       int (*la_double_scan_result)(const struct lu_env *env,
+                                    struct lfsck_component *com,
+                                    int rc);
+       /* Called by the engine to release a request, either after it has
+        * been handled or when unfinished requests are dropped on exit. */
+       void (*la_req_fini)(const struct lu_env *env,
+                           struct lfsck_assistant_req *lar);
+};
+
+struct lfsck_assistant_data {
+       spinlock_t                               lad_lock;
+       struct list_head                         lad_req_list;
+
+       /* list for the ost targets involved in LFSCK. */
+       struct list_head                         lad_ost_list;
+
+       /* list for the ost targets in phase1 scanning. */
+       struct list_head                         lad_ost_phase1_list;
+
+       /* list for the ost targets in phase2 scanning. */
+       struct list_head                         lad_ost_phase2_list;
+
+       /* list for the mdt targets involved in LFSCK. */
+       struct list_head                         lad_mdt_list;
+
+       /* list for the mdt targets in phase1 scanning. */
+       struct list_head                         lad_mdt_phase1_list;
+
+       /* list for the mdt targets in phase2 scanning. */
+       struct list_head                         lad_mdt_phase2_list;
+
+       const char                              *lad_name;
+       struct ptlrpc_thread                     lad_thread;
+
+       struct lfsck_assistant_operations       *lad_ops;
+
+       __u32                                    lad_touch_gen;
+       int                                      lad_prefetched;
+       int                                      lad_assistant_status;
+       int                                      lad_post_result;
+       unsigned int                             lad_to_post:1,
+                                                lad_to_double_scan:1,
+                                                lad_in_double_scan:1,
+                                                lad_exit:1;
+};
+
 #define LFSCK_TMPBUF_LEN       64
 
 struct lfsck_thread_info {
 #define LFSCK_TMPBUF_LEN       64
 
 struct lfsck_thread_info {
@@ -606,18 +657,32 @@ void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit);
 void lfsck_control_speed(struct lfsck_instance *lfsck);
 void lfsck_control_speed_by_self(struct lfsck_component *com);
 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit);
 void lfsck_control_speed(struct lfsck_instance *lfsck);
 void lfsck_control_speed_by_self(struct lfsck_component *com);
-struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
-                                                struct lfsck_component *com,
-                                                struct lfsck_start_param *lsp);
 void lfsck_thread_args_fini(struct lfsck_thread_args *lta);
 void lfsck_thread_args_fini(struct lfsck_thread_args *lta);
+struct lfsck_assistant_data *
+lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
+                         const char *name);
+int lfsck_async_interpret_common(const struct lu_env *env,
+                                struct ptlrpc_request *req,
+                                void *args, int rc);
 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
                        struct lfsck_request *lr,
                        struct ptlrpc_request_set *set,
                        ptlrpc_interpterer_t interpterer,
                        void *args, int request);
 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
                        struct lfsck_request *lr,
                        struct ptlrpc_request_set *set,
                        ptlrpc_interpterer_t interpterer,
                        void *args, int request);
+int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
+                         struct lfsck_start_param *lsp);
+int lfsck_checkpoint_generic(const struct lu_env *env,
+                            struct lfsck_component *com);
+void lfsck_post_generic(const struct lu_env *env,
+                       struct lfsck_component *com, int *result);
+int lfsck_double_scan_generic(const struct lu_env *env,
+                             struct lfsck_component *com, int status);
+void lfsck_quit_generic(const struct lu_env *env,
+                       struct lfsck_component *com);
 
 /* lfsck_engine.c */
 int lfsck_master_engine(void *args);
 
 /* lfsck_engine.c */
 int lfsck_master_engine(void *args);
+int lfsck_assistant_engine(void *args);
 
 /* lfsck_bookmark.c */
 void lfsck_bookmark_cpu_to_le(struct lfsck_bookmark *des,
 
 /* lfsck_bookmark.c */
 void lfsck_bookmark_cpu_to_le(struct lfsck_bookmark *des,
@@ -901,4 +966,11 @@ static inline u32 lfsck_dev_idx(struct dt_device *dev)
        return dev->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
 }
 
        return dev->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
 }
 
+static inline bool lfsck_phase2_next_ready(struct lfsck_assistant_data *lad)
+{      /* True when no MDT is left on the phase1 list and, in addition,
+        * either some OST is on the phase2 list or no OST is left on the
+        * phase1 list. The lists are checked without taking lad_lock here. */
+       return list_empty(&lad->lad_mdt_phase1_list) &&
+              (!list_empty(&lad->lad_ost_phase2_list) ||
+               list_empty(&lad->lad_ost_phase1_list));
+}
+
 #endif /* _LFSCK_INTERNAL_H */
 #endif /* _LFSCK_INTERNAL_H */
index 2ea8a49..aadf08e 100644 (file)
@@ -91,50 +91,18 @@ struct lfsck_layout_object {
        struct dt_object        *llo_obj;
        struct lu_attr           llo_attr;
        atomic_t                 llo_ref;
        struct dt_object        *llo_obj;
        struct lu_attr           llo_attr;
        atomic_t                 llo_ref;
+       __u64                    llo_cookie;
        __u16                    llo_gen;
 };
 
 struct lfsck_layout_req {
        __u16                    llo_gen;
 };
 
 struct lfsck_layout_req {
-       struct list_head                 llr_list;
+       struct lfsck_assistant_req       llr_lar;
        struct lfsck_layout_object      *llr_parent;
        struct dt_object                *llr_child;
        __u32                            llr_ost_idx;
        __u32                            llr_lov_idx; /* offset in LOV EA */
 };
 
        struct lfsck_layout_object      *llr_parent;
        struct dt_object                *llr_child;
        __u32                            llr_ost_idx;
        __u32                            llr_lov_idx; /* offset in LOV EA */
 };
 
-struct lfsck_layout_master_data {
-       spinlock_t              llmd_lock;
-       struct list_head        llmd_req_list;
-
-       /* list for the ost targets involve layout verification. */
-       struct list_head        llmd_ost_list;
-
-       /* list for the ost targets in phase1 scanning. */
-       struct list_head        llmd_ost_phase1_list;
-
-       /* list for the ost targets in phase1 scanning. */
-       struct list_head        llmd_ost_phase2_list;
-
-       /* list for the mdt targets involve layout verification. */
-       struct list_head        llmd_mdt_list;
-
-       /* list for the mdt targets in phase1 scanning. */
-       struct list_head        llmd_mdt_phase1_list;
-
-       /* list for the mdt targets in phase1 scanning. */
-       struct list_head        llmd_mdt_phase2_list;
-
-       struct ptlrpc_thread    llmd_thread;
-       __u32                   llmd_touch_gen;
-       int                     llmd_prefetched;
-       int                     llmd_assistant_status;
-       int                     llmd_post_result;
-       unsigned int            llmd_to_post:1,
-                               llmd_to_double_scan:1,
-                               llmd_in_double_scan:1,
-                               llmd_exit:1;
-};
-
 struct lfsck_layout_slave_async_args {
        struct obd_export                *llsaa_exp;
        struct lfsck_component           *llsaa_com;
 struct lfsck_layout_slave_async_args {
        struct obd_export                *llsaa_exp;
        struct lfsck_component           *llsaa_com;
@@ -143,7 +111,7 @@ struct lfsck_layout_slave_async_args {
 
 static struct lfsck_layout_object *
 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
 
 static struct lfsck_layout_object *
 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
-                        __u16 gen)
+                        __u64 cookie, __u16 gen)
 {
        struct lfsck_layout_object *llo;
        int                         rc;
 {
        struct lfsck_layout_object *llo;
        int                         rc;
@@ -161,6 +129,7 @@ lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
 
        lu_object_get(&obj->do_lu);
        llo->llo_obj = obj;
 
        lu_object_get(&obj->do_lu);
        llo->llo_obj = obj;
+       llo->llo_cookie = cookie;
        /* The gen can be used to check whether some others have changed the
         * file layout after LFSCK pre-fetching but before real verification. */
        llo->llo_gen = gen;
        /* The gen can be used to check whether some others have changed the
         * file layout after LFSCK pre-fetching but before real verification. */
        llo->llo_gen = gen;
@@ -262,8 +231,9 @@ static inline void lfsck_layout_object_put(const struct lu_env *env,
 }
 
 static struct lfsck_layout_req *
 }
 
 static struct lfsck_layout_req *
-lfsck_layout_req_init(struct lfsck_layout_object *parent,
-                     struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
+lfsck_layout_assistant_req_init(struct lfsck_layout_object *parent,
+                               struct dt_object *child, __u32 ost_idx,
+                               __u32 lov_idx)
 {
        struct lfsck_layout_req *llr;
 
 {
        struct lfsck_layout_req *llr;
 
@@ -271,7 +241,7 @@ lfsck_layout_req_init(struct lfsck_layout_object *parent,
        if (llr == NULL)
                return ERR_PTR(-ENOMEM);
 
        if (llr == NULL)
                return ERR_PTR(-ENOMEM);
 
-       INIT_LIST_HEAD(&llr->llr_list);
+       INIT_LIST_HEAD(&llr->llr_lar.lar_list);
        atomic_inc(&parent->llo_ref);
        llr->llr_parent = parent;
        llr->llr_child = child;
        atomic_inc(&parent->llo_ref);
        llr->llr_parent = parent;
        llr->llr_child = child;
@@ -281,26 +251,17 @@ lfsck_layout_req_init(struct lfsck_layout_object *parent,
        return llr;
 }
 
        return llr;
 }
 
-static inline void lfsck_layout_req_fini(const struct lu_env *env,
-                                        struct lfsck_layout_req *llr)
+static void lfsck_layout_assistant_req_fini(const struct lu_env *env,
+                                           struct lfsck_assistant_req *lar)
 {
 {
+       struct lfsck_layout_req *llr =
+                       container_of0(lar, struct lfsck_layout_req, llr_lar);
+
        lu_object_put(env, &llr->llr_child->do_lu);
        lfsck_layout_object_put(env, llr->llr_parent);
        OBD_FREE_PTR(llr);
 }
 
        lu_object_put(env, &llr->llr_child->do_lu);
        lfsck_layout_object_put(env, llr->llr_parent);
        OBD_FREE_PTR(llr);
 }
 
-static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
-{
-       bool empty = false;
-
-       spin_lock(&llmd->llmd_lock);
-       if (list_empty(&llmd->llmd_req_list))
-               empty = true;
-       spin_unlock(&llmd->llmd_lock);
-
-       return empty;
-}
-
 static int lfsck_layout_get_lovea(const struct lu_env *env,
                                  struct dt_object *obj, struct lu_buf *buf)
 {
 static int lfsck_layout_get_lovea(const struct lu_env *env,
                                  struct dt_object *obj, struct lu_buf *buf)
 {
@@ -1155,14 +1116,17 @@ out:
 }
 
 static void lfsck_layout_record_failure(const struct lu_env *env,
 }
 
 static void lfsck_layout_record_failure(const struct lu_env *env,
-                                                struct lfsck_instance *lfsck,
-                                                struct lfsck_layout *lo)
+                                       struct lfsck_instance *lfsck,
+                                       struct lfsck_layout *lo)
 {
 {
+       __u64 cookie;
+
        lo->ll_objs_failed_phase1++;
        lo->ll_objs_failed_phase1++;
-       if (unlikely(lo->ll_pos_first_inconsistent == 0)) {
-               lo->ll_pos_first_inconsistent =
-                       lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
+       cookie = lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
                                                        lfsck->li_di_oit);
                                                        lfsck->li_di_oit);
+       if (lo->ll_pos_first_inconsistent == 0 ||
+           lo->ll_pos_first_inconsistent < cookie) {
+               lo->ll_pos_first_inconsistent = cookie;
 
                CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
                       "inconsistency at the pos ["LPU64"]\n",
 
                CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
                       "inconsistency at the pos ["LPU64"]\n",
@@ -1171,429 +1135,12 @@ static void lfsck_layout_record_failure(const struct lu_env *env,
        }
 }
 
        }
 }
 
-static int lfsck_layout_master_async_interpret(const struct lu_env *env,
-                                              struct ptlrpc_request *req,
-                                              void *args, int rc)
-{
-       struct lfsck_async_interpret_args *laia = args;
-       struct lfsck_component            *com  = laia->laia_com;
-       struct lfsck_layout_master_data   *llmd = com->lc_data;
-       struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
-       struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
-       struct lfsck_request              *lr   = laia->laia_lr;
-
-       switch (lr->lr_event) {
-       case LE_START:
-               if (rc != 0) {
-                       struct lfsck_layout *lo = com->lc_file_ram;
-
-                       CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout "
-                              "start: rc = %d\n",
-                              lfsck_lfsck2name(com->lc_lfsck),
-                              (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
-                              ltd->ltd_index, rc);
-                       lo->ll_flags |= LF_INCOMPLETE;
-                       break;
-               }
-
-               spin_lock(&ltds->ltd_lock);
-               if (ltd->ltd_dead || ltd->ltd_layout_done) {
-                       spin_unlock(&ltds->ltd_lock);
-                       break;
-               }
-
-               if (lr->lr_flags & LEF_TO_OST) {
-                       if (list_empty(&ltd->ltd_layout_list))
-                               list_add_tail(&ltd->ltd_layout_list,
-                                             &llmd->llmd_ost_list);
-                       if (list_empty(&ltd->ltd_layout_phase_list))
-                               list_add_tail(&ltd->ltd_layout_phase_list,
-                                             &llmd->llmd_ost_phase1_list);
-               } else {
-                       if (list_empty(&ltd->ltd_layout_list))
-                               list_add_tail(&ltd->ltd_layout_list,
-                                             &llmd->llmd_mdt_list);
-                       if (list_empty(&ltd->ltd_layout_phase_list))
-                               list_add_tail(&ltd->ltd_layout_phase_list,
-                                             &llmd->llmd_mdt_phase1_list);
-               }
-               spin_unlock(&ltds->ltd_lock);
-               break;
-       case LE_STOP:
-       case LE_PHASE1_DONE:
-       case LE_PHASE2_DONE:
-       case LE_PEER_EXIT:
-               if (rc != 0 && rc != -EALREADY)
-                       CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout: "
-                              "event = %d, rc = %d\n",
-                              lfsck_lfsck2name(com->lc_lfsck),
-                              (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
-                              ltd->ltd_index, lr->lr_event, rc);
-               break;
-       case LE_QUERY: {
-               struct lfsck_reply *reply;
-
-               if (rc != 0) {
-                       spin_lock(&ltds->ltd_lock);
-                       list_del_init(&ltd->ltd_layout_phase_list);
-                       list_del_init(&ltd->ltd_layout_list);
-                       spin_unlock(&ltds->ltd_lock);
-                       break;
-               }
-
-               reply = req_capsule_server_get(&req->rq_pill,
-                                              &RMF_LFSCK_REPLY);
-               if (reply == NULL) {
-                       rc = -EPROTO;
-                       CDEBUG(D_LFSCK, "%s:  invalid query reply: rc = %d\n",
-                              lfsck_lfsck2name(com->lc_lfsck), rc);
-                       spin_lock(&ltds->ltd_lock);
-                       list_del_init(&ltd->ltd_layout_phase_list);
-                       list_del_init(&ltd->ltd_layout_list);
-                       spin_unlock(&ltds->ltd_lock);
-                       break;
-               }
-
-               switch (reply->lr_status) {
-               case LS_SCANNING_PHASE1:
-                       break;
-               case LS_SCANNING_PHASE2:
-                       spin_lock(&ltds->ltd_lock);
-                       list_del_init(&ltd->ltd_layout_phase_list);
-                       if (ltd->ltd_dead || ltd->ltd_layout_done) {
-                               spin_unlock(&ltds->ltd_lock);
-                               break;
-                       }
-
-                       if (lr->lr_flags & LEF_TO_OST)
-                               list_add_tail(&ltd->ltd_layout_phase_list,
-                                             &llmd->llmd_ost_phase2_list);
-                       else
-                               list_add_tail(&ltd->ltd_layout_phase_list,
-                                             &llmd->llmd_mdt_phase2_list);
-                       spin_unlock(&ltds->ltd_lock);
-                       break;
-               default:
-                       spin_lock(&ltds->ltd_lock);
-                       list_del_init(&ltd->ltd_layout_phase_list);
-                       list_del_init(&ltd->ltd_layout_list);
-                       spin_unlock(&ltds->ltd_lock);
-                       break;
-               }
-               break;
-       }
-       default:
-               CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
-                      lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
-               break;
-       }
-
-       if (!laia->laia_shared) {
-               lfsck_tgt_put(ltd);
-               lfsck_component_put(env, com);
-       }
-
-       return 0;
-}
-
-static int lfsck_layout_master_query_others(const struct lu_env *env,
-                                           struct lfsck_component *com)
-{
-       struct lfsck_thread_info          *info  = lfsck_env_info(env);
-       struct lfsck_request              *lr    = &info->lti_lr;
-       struct lfsck_async_interpret_args *laia  = &info->lti_laia;
-       struct lfsck_instance             *lfsck = com->lc_lfsck;
-       struct lfsck_layout_master_data   *llmd  = com->lc_data;
-       struct ptlrpc_request_set         *set;
-       struct lfsck_tgt_descs            *ltds;
-       struct lfsck_tgt_desc             *ltd;
-       struct list_head                  *head;
-       int                                rc    = 0;
-       int                                rc1   = 0;
-       ENTRY;
-
-       set = ptlrpc_prep_set();
-       if (set == NULL)
-               RETURN(-ENOMEM);
-
-       llmd->llmd_touch_gen++;
-       memset(lr, 0, sizeof(*lr));
-       lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
-       lr->lr_event = LE_QUERY;
-       lr->lr_active = LFSCK_TYPE_LAYOUT;
-       laia->laia_com = com;
-       laia->laia_lr = lr;
-       laia->laia_shared = 0;
-
-       if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
-               ltds = &lfsck->li_mdt_descs;
-               lr->lr_flags = 0;
-               head = &llmd->llmd_mdt_phase1_list;
-       } else {
-
-again:
-               ltds = &lfsck->li_ost_descs;
-               lr->lr_flags = LEF_TO_OST;
-               head = &llmd->llmd_ost_phase1_list;
-       }
-
-       laia->laia_ltds = ltds;
-       spin_lock(&ltds->ltd_lock);
-       while (!list_empty(head)) {
-               ltd = list_entry(head->next,
-                                struct lfsck_tgt_desc,
-                                ltd_layout_phase_list);
-               if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
-                       break;
-
-               ltd->ltd_layout_gen = llmd->llmd_touch_gen;
-               list_move_tail(&ltd->ltd_layout_phase_list, head);
-               atomic_inc(&ltd->ltd_ref);
-               laia->laia_ltd = ltd;
-               spin_unlock(&ltds->ltd_lock);
-               rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
-                                        lfsck_layout_master_async_interpret,
-                                        laia, LFSCK_QUERY);
-               if (rc != 0) {
-                       CDEBUG(D_LFSCK, "%s: layout LFSCK fail to query %s %x: "
-                              "rc = %d\n", lfsck_lfsck2name(lfsck),
-                              (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
-                              ltd->ltd_index, rc);
-                       lfsck_tgt_put(ltd);
-                       rc1 = rc;
-               }
-               spin_lock(&ltds->ltd_lock);
-       }
-       spin_unlock(&ltds->ltd_lock);
-
-       rc = ptlrpc_set_wait(set);
-       if (rc < 0) {
-               ptlrpc_set_destroy(set);
-               RETURN(rc);
-       }
-
-       if (!(lr->lr_flags & LEF_TO_OST) &&
-           list_empty(&llmd->llmd_mdt_phase1_list))
-               goto again;
-
-       ptlrpc_set_destroy(set);
-
-       RETURN(rc1 != 0 ? rc1 : rc);
-}
-
-static inline bool
-lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
-{
-       return list_empty(&llmd->llmd_mdt_phase1_list) &&
-              (!list_empty(&llmd->llmd_ost_phase2_list) ||
-               list_empty(&llmd->llmd_ost_phase1_list));
-}
-
-static int lfsck_layout_master_notify_others(const struct lu_env *env,
-                                            struct lfsck_component *com,
-                                            struct lfsck_request *lr)
-{
-       struct lfsck_thread_info          *info  = lfsck_env_info(env);
-       struct lfsck_async_interpret_args *laia  = &info->lti_laia;
-       struct lfsck_instance             *lfsck = com->lc_lfsck;
-       struct lfsck_layout_master_data   *llmd  = com->lc_data;
-       struct lfsck_layout               *lo    = com->lc_file_ram;
-       struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
-       struct ptlrpc_request_set         *set;
-       struct lfsck_tgt_descs            *ltds;
-       struct lfsck_tgt_desc             *ltd;
-       struct lfsck_tgt_desc             *next;
-       struct list_head                  *head;
-       __u32                              idx;
-       int                                rc    = 0;
-       ENTRY;
-
-       set = ptlrpc_prep_set();
-       if (set == NULL)
-               RETURN(-ENOMEM);
-
-       lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
-       lr->lr_active = LFSCK_TYPE_LAYOUT;
-       laia->laia_com = com;
-       laia->laia_lr = lr;
-       laia->laia_shared = 0;
-       switch (lr->lr_event) {
-       case LE_START:
-               /* Notify OSTs firstly, then handle other MDTs if needed. */
-               ltds = &lfsck->li_ost_descs;
-               laia->laia_ltds = ltds;
-               down_read(&ltds->ltd_rw_sem);
-               cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
-                       ltd = lfsck_tgt_get(ltds, idx);
-                       LASSERT(ltd != NULL);
-
-                       laia->laia_ltd = ltd;
-                       ltd->ltd_layout_done = 0;
-                       rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
-                                       lfsck_layout_master_async_interpret,
-                                       laia, LFSCK_NOTIFY);
-                       if (rc != 0) {
-                               CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
-                                      "notify %s %x for start: rc = %d\n",
-                                      lfsck_lfsck2name(lfsck),
-                                      (lr->lr_flags & LEF_TO_OST) ? "OST" :
-                                      "MDT", idx, rc);
-                               lfsck_tgt_put(ltd);
-                               lo->ll_flags |= LF_INCOMPLETE;
-                       }
-               }
-               up_read(&ltds->ltd_rw_sem);
-
-               /* Sync up */
-               rc = ptlrpc_set_wait(set);
-               if (rc < 0) {
-                       ptlrpc_set_destroy(set);
-                       RETURN(rc);
-               }
-
-               if (!(bk->lb_param & LPF_ALL_TGT))
-                       break;
-
-               /* link other MDT targets locallly. */
-               ltds = &lfsck->li_mdt_descs;
-               spin_lock(&ltds->ltd_lock);
-               cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
-                       ltd = LTD_TGT(ltds, idx);
-                       LASSERT(ltd != NULL);
-
-                       if (!list_empty(&ltd->ltd_layout_list))
-                               continue;
-
-                       list_add_tail(&ltd->ltd_layout_list,
-                                     &llmd->llmd_mdt_list);
-                       list_add_tail(&ltd->ltd_layout_phase_list,
-                                     &llmd->llmd_mdt_phase1_list);
-               }
-               spin_unlock(&ltds->ltd_lock);
-               break;
-       case LE_STOP:
-       case LE_PHASE2_DONE:
-       case LE_PEER_EXIT: {
-               /* Handle other MDTs firstly if needed, then notify the OSTs. */
-               if (bk->lb_param & LPF_ALL_TGT) {
-                       head = &llmd->llmd_mdt_list;
-                       ltds = &lfsck->li_mdt_descs;
-                       if (lr->lr_event == LE_STOP) {
-                               /* unlink other MDT targets locallly. */
-                               spin_lock(&ltds->ltd_lock);
-                               list_for_each_entry_safe(ltd, next, head,
-                                                        ltd_layout_list) {
-                                       list_del_init(&ltd->ltd_layout_phase_list);
-                                       list_del_init(&ltd->ltd_layout_list);
-                               }
-                               spin_unlock(&ltds->ltd_lock);
-
-                               lr->lr_flags |= LEF_TO_OST;
-                               head = &llmd->llmd_ost_list;
-                               ltds = &lfsck->li_ost_descs;
-                       } else {
-                               lr->lr_flags &= ~LEF_TO_OST;
-                       }
-               } else {
-                       lr->lr_flags |= LEF_TO_OST;
-                       head = &llmd->llmd_ost_list;
-                       ltds = &lfsck->li_ost_descs;
-               }
-
-again:
-               laia->laia_ltds = ltds;
-               spin_lock(&ltds->ltd_lock);
-               while (!list_empty(head)) {
-                       ltd = list_entry(head->next, struct lfsck_tgt_desc,
-                                        ltd_layout_list);
-                       if (!list_empty(&ltd->ltd_layout_phase_list))
-                               list_del_init(&ltd->ltd_layout_phase_list);
-                       list_del_init(&ltd->ltd_layout_list);
-                       atomic_inc(&ltd->ltd_ref);
-                       laia->laia_ltd = ltd;
-                       spin_unlock(&ltds->ltd_lock);
-                       rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
-                                       lfsck_layout_master_async_interpret,
-                                       laia, LFSCK_NOTIFY);
-                       if (rc != 0) {
-                               CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
-                                      "notify %s %x for stop/phase2_done/"
-                                      "peer_exit: rc = %d\n",
-                                      lfsck_lfsck2name(lfsck),
-                                      (lr->lr_flags & LEF_TO_OST) ? "OST" :
-                                      "MDT", ltd->ltd_index, rc);
-                               lfsck_tgt_put(ltd);
-                       }
-                       spin_lock(&ltds->ltd_lock);
-               }
-               spin_unlock(&ltds->ltd_lock);
-
-               rc = ptlrpc_set_wait(set);
-               if (rc < 0) {
-                       ptlrpc_set_destroy(set);
-                       RETURN(rc);
-               }
-
-               if (!(lr->lr_flags & LEF_TO_OST)) {
-                       lr->lr_flags |= LEF_TO_OST;
-                       head = &llmd->llmd_ost_list;
-                       ltds = &lfsck->li_ost_descs;
-                       goto again;
-               }
-               break;
-       }
-       case LE_PHASE1_DONE:
-               llmd->llmd_touch_gen++;
-               ltds = &lfsck->li_mdt_descs;
-               laia->laia_ltds = ltds;
-               spin_lock(&ltds->ltd_lock);
-               while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
-                       ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
-                                        struct lfsck_tgt_desc,
-                                        ltd_layout_phase_list);
-                       if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
-                               break;
-
-                       ltd->ltd_layout_gen = llmd->llmd_touch_gen;
-                       list_move_tail(&ltd->ltd_layout_phase_list,
-                                      &llmd->llmd_mdt_phase1_list);
-                       atomic_inc(&ltd->ltd_ref);
-                       laia->laia_ltd = ltd;
-                       spin_unlock(&ltds->ltd_lock);
-                       rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
-                                       lfsck_layout_master_async_interpret,
-                                       laia, LFSCK_NOTIFY);
-                       if (rc != 0) {
-                               CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
-                                      "notify MDT %x for phase1_done: "
-                                      "rc = %d\n", lfsck_lfsck2name(lfsck),
-                                      ltd->ltd_index, rc);
-                               lfsck_tgt_put(ltd);
-                       }
-                       spin_lock(&ltds->ltd_lock);
-               }
-               spin_unlock(&ltds->ltd_lock);
-               break;
-       default:
-               CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
-                      lfsck_lfsck2name(lfsck), lr->lr_event);
-               rc = -EINVAL;
-               break;
-       }
-
-       rc = ptlrpc_set_wait(set);
-       ptlrpc_set_destroy(set);
-
-       RETURN(rc);
-}
-
 static int lfsck_layout_double_scan_result(const struct lu_env *env,
                                           struct lfsck_component *com,
                                           int rc)
 {
        struct lfsck_instance   *lfsck = com->lc_lfsck;
        struct lfsck_layout     *lo    = com->lc_file_ram;
 static int lfsck_layout_double_scan_result(const struct lu_env *env,
                                           struct lfsck_component *com,
                                           int rc)
 {
        struct lfsck_instance   *lfsck = com->lc_lfsck;
        struct lfsck_layout     *lo    = com->lc_file_ram;
-       struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
 
        down_write(&com->lc_sem);
        lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
 
        down_write(&com->lc_sem);
        lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
@@ -1607,7 +1154,7 @@ static int lfsck_layout_double_scan_result(const struct lu_env *env,
                        lo->ll_status = LS_PARTIAL;
                else
                        lo->ll_status = LS_COMPLETED;
                        lo->ll_status = LS_PARTIAL;
                else
                        lo->ll_status = LS_COMPLETED;
-               if (!(bk->lb_param & LPF_DRYRUN))
+               if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
                        lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
                lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
                lo->ll_success_count++;
                        lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
                lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
                lo->ll_success_count++;
@@ -1911,7 +1458,7 @@ out:
  *
  *  type "R":          The orphan OST-object knows its parent MDT-object FID,
  *                     but does not know the position (the file name) in the
  *
  *  type "R":          The orphan OST-object knows its parent MDT-object FID,
  *                     but does not know the position (the file name) in the
- *                     namespace.
+ *                     layout.
  *
  * The orphan name will be like:
  * ${FID}-${infix}-${type}-${conflict_version}
  *
  * The orphan name will be like:
  * ${FID}-${infix}-${type}-${conflict_version}
@@ -3375,10 +2922,12 @@ out:
        return rc;
 }
 
        return rc;
 }
 
-static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
+static int lfsck_layout_assistant_handler_p1(const struct lu_env *env,
                                             struct lfsck_component *com,
                                             struct lfsck_component *com,
-                                            struct lfsck_layout_req *llr)
+                                            struct lfsck_assistant_req *lar)
 {
 {
+       struct lfsck_layout_req              *llr    =
+                       container_of0(lar, struct lfsck_layout_req, llr_lar);
        struct lfsck_layout                  *lo     = com->lc_file_ram;
        struct lfsck_thread_info             *info   = lfsck_env_info(env);
        struct filter_fid_old                *pea    = &info->lti_old_pfid;
        struct lfsck_layout                  *lo     = com->lc_file_ram;
        struct lfsck_thread_info             *info   = lfsck_env_info(env);
        struct filter_fid_old                *pea    = &info->lti_old_pfid;
@@ -3485,9 +3034,9 @@ repair:
 out:
        down_write(&com->lc_sem);
        if (rc < 0) {
 out:
        down_write(&com->lc_sem);
        if (rc < 0) {
-               struct lfsck_layout_master_data *llmd = com->lc_data;
+               struct lfsck_assistant_data *lad = com->lc_data;
 
 
-               if (unlikely(llmd->llmd_exit)) {
+               if (unlikely(lad->lad_exit)) {
                        rc = 0;
                } else if (rc == -ENOTCONN || rc == -ESHUTDOWN ||
                           rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
                        rc = 0;
                } else if (rc == -ENOTCONN || rc == -ESHUTDOWN ||
                           rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
@@ -3519,302 +3068,43 @@ out:
        return rc;
 }
 
        return rc;
 }
 
-static int lfsck_layout_assistant(void *args)
+static int lfsck_layout_assistant_handler_p2(const struct lu_env *env,
+                                            struct lfsck_component *com)
 {
 {
-       struct lfsck_thread_args        *lta     = args;
-       struct lu_env                   *env     = &lta->lta_env;
-       struct lfsck_component          *com     = lta->lta_com;
-       struct lfsck_instance           *lfsck   = lta->lta_lfsck;
-       struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
-       struct lfsck_position           *pos     = &com->lc_pos_start;
-       struct lfsck_thread_info        *info    = lfsck_env_info(env);
-       struct lfsck_request            *lr      = &info->lti_lr;
-       struct lfsck_layout_master_data *llmd    = com->lc_data;
-       struct ptlrpc_thread            *mthread = &lfsck->li_thread;
-       struct ptlrpc_thread            *athread = &llmd->llmd_thread;
-       struct lfsck_layout_req         *llr;
-       struct l_wait_info               lwi     = { 0 };
-       int                              rc      = 0;
-       int                              rc1     = 0;
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
+       struct lfsck_tgt_descs          *ltds   = &lfsck->li_ost_descs;
+       struct lfsck_tgt_desc           *ltd;
+       int                              rc     = 0;
        ENTRY;
 
        ENTRY;
 
-       memset(lr, 0, sizeof(*lr));
-       lr->lr_event = LE_START;
-       lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
-                      LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ;
-       lr->lr_speed = bk->lb_speed_limit;
-       lr->lr_version = bk->lb_version;
-       lr->lr_param = bk->lb_param;
-       lr->lr_async_windows = bk->lb_async_windows;
-       lr->lr_flags = LEF_TO_OST;
-       if (pos->lp_oit_cookie <= 1)
-               lr->lr_param |= LPF_RESET;
-
-       rc = lfsck_layout_master_notify_others(env, com, lr);
-       if (rc != 0) {
-               CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to notify "
-                      "others for LFSCK start: rc = %d\n",
-                      lfsck_lfsck2name(lfsck), rc);
-               GOTO(fini, rc);
-       }
-
-       spin_lock(&llmd->llmd_lock);
-       thread_set_flags(athread, SVC_RUNNING);
-       spin_unlock(&llmd->llmd_lock);
-       wake_up_all(&mthread->t_ctl_waitq);
-
-       while (1) {
-               while (!list_empty(&llmd->llmd_req_list)) {
-                       bool wakeup = false;
-
-                       if (unlikely(llmd->llmd_exit ||
-                                    !thread_is_running(mthread)))
-                               GOTO(cleanup1, rc = llmd->llmd_post_result);
-
-                       llr = list_entry(llmd->llmd_req_list.next,
-                                        struct lfsck_layout_req,
-                                        llr_list);
-                       /* Only the lfsck_layout_assistant thread itself can
-                        * remove the "llr" from the head of the list, LFSCK
-                        * engine thread only inserts other new "lld" at the
-                        * end of the list. So it is safe to handle current
-                        * "llr" without the spin_lock. */
-                       rc = lfsck_layout_assistant_handle_one(env, com, llr);
-                       spin_lock(&llmd->llmd_lock);
-                       list_del_init(&llr->llr_list);
-                       llmd->llmd_prefetched--;
-                       /* Wake up the main engine thread only when the list
-                        * is empty or half of the prefetched items have been
-                        * handled to avoid too frequent thread schedule. */
-                       if (llmd->llmd_prefetched == 0 ||
-                           (bk->lb_async_windows != 0 &&
-                            bk->lb_async_windows / 2 ==
-                            llmd->llmd_prefetched))
-                               wakeup = true;
-                       spin_unlock(&llmd->llmd_lock);
-                       if (wakeup)
-                               wake_up_all(&mthread->t_ctl_waitq);
-
-                       lfsck_layout_req_fini(env, llr);
-                       if (rc < 0 && bk->lb_param & LPF_FAILOUT)
-                               GOTO(cleanup1, rc);
-               }
-
-               l_wait_event(athread->t_ctl_waitq,
-                            !lfsck_layout_req_empty(llmd) ||
-                            llmd->llmd_exit ||
-                            llmd->llmd_to_post ||
-                            llmd->llmd_to_double_scan,
-                            &lwi);
-
-               if (unlikely(llmd->llmd_exit))
-                       GOTO(cleanup1, rc = llmd->llmd_post_result);
-
-               if (!list_empty(&llmd->llmd_req_list))
-                       continue;
-
-               if (llmd->llmd_to_post) {
-                       llmd->llmd_to_post = 0;
-                       LASSERT(llmd->llmd_post_result > 0);
-
-                       memset(lr, 0, sizeof(*lr));
-                       lr->lr_event = LE_PHASE1_DONE;
-                       lr->lr_status = llmd->llmd_post_result;
-                       rc = lfsck_layout_master_notify_others(env, com, lr);
-                       if (rc != 0)
-                               CDEBUG(D_LFSCK, "%s: layout LFSCK assistant "
-                                      "failed to notify others for LFSCK "
-                                      "post: rc = %d\n",
-                                      lfsck_lfsck2name(lfsck), rc);
-
-                       /* Wakeup the master engine to go ahead. */
-                       wake_up_all(&mthread->t_ctl_waitq);
-               }
-
-               if (llmd->llmd_to_double_scan) {
-                       llmd->llmd_to_double_scan = 0;
-                       atomic_inc(&lfsck->li_double_scan_count);
-                       llmd->llmd_in_double_scan = 1;
-                       wake_up_all(&mthread->t_ctl_waitq);
-
-                       CDEBUG(D_LFSCK, "%s: layout LFSCK assistant phase2 "
-                              "scan start\n", lfsck_lfsck2name(lfsck));
-
-                       com->lc_new_checked = 0;
-                       com->lc_new_scanned = 0;
-                       com->lc_time_last_checkpoint = cfs_time_current();
-                       com->lc_time_next_checkpoint =
-                               com->lc_time_last_checkpoint +
-                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
-
-                       /* flush all async updating before handling orphan. */
-                       dt_sync(env, lfsck->li_next);
-
-                       while (llmd->llmd_in_double_scan) {
-                               struct lfsck_tgt_descs  *ltds =
-                                                       &lfsck->li_ost_descs;
-                               struct lfsck_tgt_desc   *ltd;
-
-                               rc = lfsck_layout_master_query_others(env, com);
-                               if (lfsck_layout_master_to_orphan(llmd))
-                                       goto orphan;
-
-                               if (rc < 0)
-                                       GOTO(cleanup2, rc);
-
-                               /* Pull LFSCK status on related targets once
-                                * per 30 seconds if we are not notified. */
-                               lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
-                                                          cfs_time_seconds(1),
-                                                          NULL, NULL);
-                               rc = l_wait_event(athread->t_ctl_waitq,
-                                       lfsck_layout_master_to_orphan(llmd) ||
-                                       llmd->llmd_exit ||
-                                       !thread_is_running(mthread),
-                                       &lwi);
-
-                               if (unlikely(llmd->llmd_exit ||
-                                            !thread_is_running(mthread)))
-                                       GOTO(cleanup2, rc = 0);
-
-                               if (rc == -ETIMEDOUT)
-                                       continue;
-
-                               if (rc < 0)
-                                       GOTO(cleanup2, rc);
-
-orphan:
-                               spin_lock(&ltds->ltd_lock);
-                               while (!list_empty(
-                                               &llmd->llmd_ost_phase2_list)) {
-                                       ltd = list_entry(
-                                             llmd->llmd_ost_phase2_list.next,
-                                             struct lfsck_tgt_desc,
-                                             ltd_layout_phase_list);
-                                       list_del_init(
-                                               &ltd->ltd_layout_phase_list);
-                                       spin_unlock(&ltds->ltd_lock);
-
-                                       if (bk->lb_param & LPF_ALL_TGT) {
-                                               rc = lfsck_layout_scan_orphan(
-                                                               env, com, ltd);
-                                               if (rc != 0 &&
-                                                   bk->lb_param & LPF_FAILOUT)
-                                                       GOTO(cleanup2, rc);
-                                       }
-
-                                       if (unlikely(llmd->llmd_exit ||
-                                               !thread_is_running(mthread)))
-                                               GOTO(cleanup2, rc = 0);
-
-                                       spin_lock(&ltds->ltd_lock);
-                               }
-
-                               if (list_empty(&llmd->llmd_ost_phase1_list)) {
-                                       spin_unlock(&ltds->ltd_lock);
-                                       GOTO(cleanup2, rc = 1);
-                               }
-                               spin_unlock(&ltds->ltd_lock);
-                       }
-               }
-       }
-
-cleanup1:
-       /* Cleanup the unfinished requests. */
-       spin_lock(&llmd->llmd_lock);
-       if (rc < 0)
-               llmd->llmd_assistant_status = rc;
-
-       while (!list_empty(&llmd->llmd_req_list)) {
-               llr = list_entry(llmd->llmd_req_list.next,
-                                struct lfsck_layout_req,
-                                llr_list);
-               list_del_init(&llr->llr_list);
-               llmd->llmd_prefetched--;
-               spin_unlock(&llmd->llmd_lock);
-               lfsck_layout_req_fini(env, llr);
-               spin_lock(&llmd->llmd_lock);
-       }
-       spin_unlock(&llmd->llmd_lock);
-
-       LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
-                llmd->llmd_prefetched);
+       spin_lock(&ltds->ltd_lock);
+       while (!list_empty(&lad->lad_ost_phase2_list)) {
+               ltd = list_entry(lad->lad_ost_phase2_list.next,
+                                struct lfsck_tgt_desc,
+                                ltd_layout_phase_list);
+               list_del_init(&ltd->ltd_layout_phase_list);
+               if (bk->lb_param & LPF_ALL_TGT) {
+                       spin_unlock(&ltds->ltd_lock);
+                       rc = lfsck_layout_scan_orphan(env, com, ltd);
+                       if (rc != 0 && bk->lb_param & LPF_FAILOUT)
+                               RETURN(rc);
 
 
-cleanup2:
-       memset(lr, 0, sizeof(*lr));
-       if (rc > 0) {
-               lr->lr_event = LE_PHASE2_DONE;
-               lr->lr_status = rc;
-       } else if (rc == 0) {
-               if (lfsck->li_flags & LPF_ALL_TGT) {
-                       lr->lr_event = LE_STOP;
-                       lr->lr_status = LS_STOPPED;
-               } else {
-                       lr->lr_event = LE_PEER_EXIT;
-                       switch (lfsck->li_status) {
-                       case LS_PAUSED:
-                       case LS_CO_PAUSED:
-                               lr->lr_status = LS_CO_PAUSED;
-                               break;
-                       case LS_STOPPED:
-                       case LS_CO_STOPPED:
-                               lr->lr_status = LS_CO_STOPPED;
-                               break;
-                       default:
-                               CDEBUG(D_LFSCK, "%s: unknown status: rc = %d\n",
-                                      lfsck_lfsck2name(lfsck),
-                                      lfsck->li_status);
-                               lr->lr_status = LS_CO_FAILED;
-                               break;
-                       }
-               }
-       } else {
-               if (lfsck->li_flags & LPF_ALL_TGT) {
-                       lr->lr_event = LE_STOP;
-                       lr->lr_status = LS_FAILED;
-               } else {
-                       lr->lr_event = LE_PEER_EXIT;
-                       lr->lr_status = LS_CO_FAILED;
+                       if (unlikely(lad->lad_exit ||
+                                    !thread_is_running(&lfsck->li_thread)))
+                               RETURN(0);
+                       spin_lock(&ltds->ltd_lock);
                }
        }
 
                }
        }
 
-       rc1 = lfsck_layout_master_notify_others(env, com, lr);
-       if (rc1 != 0) {
-               CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to "
-                      "notify others for LFSCK quit: rc = %d\n",
-                      lfsck_lfsck2name(lfsck), rc1);
-               rc = rc1;
-       }
-
-       /* flush all async updating before exit. */
-       dt_sync(env, lfsck->li_next);
-
-       /* Under force exit case, some requests may be just freed without
-        * verification, those objects should be re-handled when next run.
-        * So not update the on-disk tracing file under such case. */
-       if (llmd->llmd_in_double_scan) {
-               struct lfsck_layout *lo = com->lc_file_ram;
-
-               if (!llmd->llmd_exit)
-                       rc1 = lfsck_layout_double_scan_result(env, com, rc);
-
-               CDEBUG(D_LFSCK, "%s: layout LFSCK assistant phase2 scan "
-                      "finished, status %d: rc = %d\n",
-                      lfsck_lfsck2name(lfsck), lo->ll_status, rc1);
-       }
-
-fini:
-       if (llmd->llmd_in_double_scan)
-               atomic_dec(&lfsck->li_double_scan_count);
-
-       spin_lock(&llmd->llmd_lock);
-       llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
-       thread_set_flags(athread, SVC_STOPPED);
-       wake_up_all(&mthread->t_ctl_waitq);
-       spin_unlock(&llmd->llmd_lock);
-       lfsck_thread_args_fini(lta);
+       if (list_empty(&lad->lad_ost_phase1_list))
+               rc = 1;
+       else
+               rc = 0;
+       spin_unlock(&ltds->ltd_lock);
 
 
-       return rc;
+       RETURN(rc);
 }
 
 static int
 }
 
 static int
@@ -4326,32 +3616,23 @@ static void lfsck_layout_fail(const struct lu_env *env,
 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
                                          struct lfsck_component *com, bool init)
 {
 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
                                          struct lfsck_component *com, bool init)
 {
-       struct lfsck_instance           *lfsck   = com->lc_lfsck;
-       struct lfsck_layout             *lo      = com->lc_file_ram;
-       struct lfsck_layout_master_data *llmd    = com->lc_data;
-       struct ptlrpc_thread            *mthread = &lfsck->li_thread;
-       struct ptlrpc_thread            *athread = &llmd->llmd_thread;
-       struct l_wait_info               lwi     = { 0 };
-       int                              rc;
-
-       if (com->lc_new_checked == 0 && !init)
-               return 0;
-
-       l_wait_event(mthread->t_ctl_waitq,
-                    list_empty(&llmd->llmd_req_list) ||
-                    !thread_is_running(mthread) ||
-                    thread_is_stopped(athread),
-                    &lwi);
+       struct lfsck_instance   *lfsck   = com->lc_lfsck;
+       struct lfsck_layout     *lo      = com->lc_file_ram;
+       int                      rc;
 
 
-       if (!thread_is_running(mthread) || thread_is_stopped(athread))
-               return 0;
+       if (!init) {
+               rc = lfsck_checkpoint_generic(env, com);
+               if (rc != 0)
+                       return rc > 0 ? 0 : rc;
+       }
 
        down_write(&com->lc_sem);
        if (init) {
 
        down_write(&com->lc_sem);
        if (init) {
-               lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
+               lo->ll_pos_latest_start =
+                               lfsck->li_pos_checkpoint.lp_oit_cookie;
        } else {
                lo->ll_pos_last_checkpoint =
        } else {
                lo->ll_pos_last_checkpoint =
-                                       lfsck->li_pos_current.lp_oit_cookie;
+                               lfsck->li_pos_checkpoint.lp_oit_cookie;
                lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
                lo->ll_time_last_checkpoint = cfs_time_current_sec();
                lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
                lo->ll_time_last_checkpoint = cfs_time_current_sec();
@@ -4381,10 +3662,11 @@ static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
 
        down_write(&com->lc_sem);
        if (init) {
 
        down_write(&com->lc_sem);
        if (init) {
-               lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
+               lo->ll_pos_latest_start =
+                               lfsck->li_pos_checkpoint.lp_oit_cookie;
        } else {
                lo->ll_pos_last_checkpoint =
        } else {
                lo->ll_pos_last_checkpoint =
-                                       lfsck->li_pos_current.lp_oit_cookie;
+                               lfsck->li_pos_checkpoint.lp_oit_cookie;
                lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
                lo->ll_time_last_checkpoint = cfs_time_current_sec();
                lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
                lo->ll_time_last_checkpoint = cfs_time_current_sec();
@@ -4515,52 +3797,17 @@ static int lfsck_layout_master_prep(const struct lu_env *env,
                                    struct lfsck_component *com,
                                    struct lfsck_start_param *lsp)
 {
                                    struct lfsck_component *com,
                                    struct lfsck_start_param *lsp)
 {
-       struct lfsck_instance           *lfsck   = com->lc_lfsck;
-       struct lfsck_layout_master_data *llmd    = com->lc_data;
-       struct ptlrpc_thread            *mthread = &lfsck->li_thread;
-       struct ptlrpc_thread            *athread = &llmd->llmd_thread;
-       struct lfsck_thread_args        *lta;
-       struct task_struct              *task;
-       int                              rc;
+       int rc;
        ENTRY;
 
        rc = lfsck_layout_prep(env, com, lsp->lsp_start);
        if (rc != 0)
                RETURN(rc);
 
        ENTRY;
 
        rc = lfsck_layout_prep(env, com, lsp->lsp_start);
        if (rc != 0)
                RETURN(rc);
 
-       llmd->llmd_assistant_status = 0;
-       llmd->llmd_post_result = 0;
-       llmd->llmd_to_post = 0;
-       llmd->llmd_to_double_scan = 0;
-       llmd->llmd_in_double_scan = 0;
-       llmd->llmd_exit = 0;
-       thread_set_flags(athread, 0);
-
-       lta = lfsck_thread_args_init(lfsck, com, lsp);
-       if (IS_ERR(lta))
-               RETURN(PTR_ERR(lta));
-
-       task = kthread_run(lfsck_layout_assistant, lta, "lfsck_layout");
-       if (IS_ERR(task)) {
-               rc = PTR_ERR(task);
-               CERROR("%s: cannot start LFSCK layout assistant thread: "
-                      "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
-               lfsck_thread_args_fini(lta);
-       } else {
-               struct l_wait_info lwi = { 0 };
-
-               l_wait_event(mthread->t_ctl_waitq,
-                            thread_is_running(athread) ||
-                            thread_is_stopped(athread),
-                            &lwi);
-               if (unlikely(!thread_is_running(athread)))
-                       rc = llmd->llmd_assistant_status;
-               else
-                       rc = 0;
-       }
+       rc = lfsck_start_assistant(env, com, lsp);
 
        CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
 
        CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
-              LPU64"\n", lfsck_lfsck2name(lfsck),
+              LPU64"\n", lfsck_lfsck2name(com->lc_lfsck),
               com->lc_pos_start.lp_oit_cookie);
 
        RETURN(rc);
               com->lc_pos_start.lp_oit_cookie);
 
        RETURN(rc);
@@ -4576,13 +3823,13 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
        struct lfsck_instance           *lfsck   = com->lc_lfsck;
        struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
        struct lfsck_layout             *lo      = com->lc_file_ram;
        struct lfsck_instance           *lfsck   = com->lc_lfsck;
        struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
        struct lfsck_layout             *lo      = com->lc_file_ram;
-       struct lfsck_layout_master_data *llmd    = com->lc_data;
+       struct lfsck_assistant_data     *lad     = com->lc_data;
        struct lfsck_layout_object      *llo     = NULL;
        struct lov_ost_data_v1          *objs;
        struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
        struct ptlrpc_thread            *mthread = &lfsck->li_thread;
        struct lfsck_layout_object      *llo     = NULL;
        struct lov_ost_data_v1          *objs;
        struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
        struct ptlrpc_thread            *mthread = &lfsck->li_thread;
-       struct ptlrpc_thread            *athread = &llmd->llmd_thread;
-               struct l_wait_info       lwi     = { 0 };
+       struct ptlrpc_thread            *athread = &lad->lad_thread;
+       struct l_wait_info               lwi     = { 0 };
        struct lu_buf                    buf;
        int                              rc      = 0;
        int                              i;
        struct lu_buf                    buf;
        int                              rc      = 0;
        int                              i;
@@ -4621,7 +3868,7 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
 
                l_wait_event(mthread->t_ctl_waitq,
                             bk->lb_async_windows == 0 ||
 
                l_wait_event(mthread->t_ctl_waitq,
                             bk->lb_async_windows == 0 ||
-                            llmd->llmd_prefetched < bk->lb_async_windows ||
+                            lad->lad_prefetched < bk->lb_async_windows ||
                             !thread_is_running(mthread) ||
                             thread_is_stopped(athread),
                             &lwi);
                             !thread_is_running(mthread) ||
                             thread_is_stopped(athread),
                             &lwi);
@@ -4704,34 +3951,35 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
                        goto next;
 
                if (llo == NULL) {
                        goto next;
 
                if (llo == NULL) {
-                       llo = lfsck_layout_object_init(env, parent, gen);
+                       llo = lfsck_layout_object_init(env, parent,
+                               lfsck->li_pos_current.lp_oit_cookie, gen);
                        if (IS_ERR(llo)) {
                                rc = PTR_ERR(llo);
                                goto next;
                        }
                }
 
                        if (IS_ERR(llo)) {
                                rc = PTR_ERR(llo);
                                goto next;
                        }
                }
 
-               llr = lfsck_layout_req_init(llo, cobj, index, i);
+               llr = lfsck_layout_assistant_req_init(llo, cobj, index, i);
                if (IS_ERR(llr)) {
                        rc = PTR_ERR(llr);
                        goto next;
                }
 
                cobj = NULL;
                if (IS_ERR(llr)) {
                        rc = PTR_ERR(llr);
                        goto next;
                }
 
                cobj = NULL;
-               spin_lock(&llmd->llmd_lock);
-               if (llmd->llmd_assistant_status < 0) {
-                       spin_unlock(&llmd->llmd_lock);
-                       lfsck_layout_req_fini(env, llr);
+               spin_lock(&lad->lad_lock);
+               if (lad->lad_assistant_status < 0) {
+                       spin_unlock(&lad->lad_lock);
+                       lfsck_layout_assistant_req_fini(env, &llr->llr_lar);
                        lfsck_tgt_put(tgt);
                        lfsck_tgt_put(tgt);
-                       RETURN(llmd->llmd_assistant_status);
+                       RETURN(lad->lad_assistant_status);
                }
 
                }
 
-               list_add_tail(&llr->llr_list, &llmd->llmd_req_list);
-               if (llmd->llmd_prefetched == 0)
+               list_add_tail(&llr->llr_lar.lar_list, &lad->lad_req_list);
+               if (lad->lad_prefetched == 0)
                        wakeup = true;
 
                        wakeup = true;
 
-               llmd->llmd_prefetched++;
-               spin_unlock(&llmd->llmd_lock);
+               lad->lad_prefetched++;
+               spin_unlock(&lad->lad_lock);
                if (wakeup)
                        wake_up_all(&athread->t_ctl_waitq);
 
                if (wakeup)
                        wake_up_all(&athread->t_ctl_waitq);
 
@@ -4763,7 +4011,7 @@ out:
 
 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
  * the OST-object's attribute and generate a structure lfsck_layout_req on the
 
 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
  * the OST-object's attribute and generate a structure lfsck_layout_req on the
- * list ::llmd_req_list.
+ * list ::lad_req_list.
  *
  * For each request on above list, the lfsck_layout_assistant thread compares
  * the OST side attribute with local attribute, if inconsistent, then repair it.
  *
  * For each request on above list, the lfsck_layout_assistant thread compares
  * the OST side attribute with local attribute, if inconsistent, then repair it.
@@ -4776,7 +4024,7 @@ static int lfsck_layout_master_exec_oit(const struct lu_env *env,
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
        struct ost_id                   *oi     = &info->lti_oi;
        struct lfsck_layout             *lo     = com->lc_file_ram;
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
        struct ost_id                   *oi     = &info->lti_oi;
        struct lfsck_layout             *lo     = com->lc_file_ram;
-       struct lfsck_layout_master_data *llmd   = com->lc_data;
+       struct lfsck_assistant_data     *lad    = com->lc_data;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
        struct thandle                  *handle = NULL;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
        struct thandle                  *handle = NULL;
@@ -4795,7 +4043,7 @@ static int lfsck_layout_master_exec_oit(const struct lu_env *env,
        if (!S_ISREG(lfsck_object_type(obj)))
                GOTO(out, rc = 0);
 
        if (!S_ISREG(lfsck_object_type(obj)))
                GOTO(out, rc = 0);
 
-       if (llmd->llmd_assistant_status < 0)
+       if (lad->lad_assistant_status < 0)
                GOTO(out, rc = -ESRCH);
 
        fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
                GOTO(out, rc = -ESRCH);
 
        fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
@@ -5022,8 +4270,7 @@ unlock:
 
 static int lfsck_layout_exec_dir(const struct lu_env *env,
                                 struct lfsck_component *com,
 
 static int lfsck_layout_exec_dir(const struct lu_env *env,
                                 struct lfsck_component *com,
-                                struct dt_object *obj,
-                                struct lu_dirent *ent)
+                                struct lu_dirent *ent, __u16 type)
 {
        return 0;
 }
 {
        return 0;
 }
@@ -5032,38 +4279,18 @@ static int lfsck_layout_master_post(const struct lu_env *env,
                                    struct lfsck_component *com,
                                    int result, bool init)
 {
                                    struct lfsck_component *com,
                                    int result, bool init)
 {
-       struct lfsck_instance           *lfsck   = com->lc_lfsck;
-       struct lfsck_layout             *lo      = com->lc_file_ram;
-       struct lfsck_layout_master_data *llmd    = com->lc_data;
-       struct ptlrpc_thread            *mthread = &lfsck->li_thread;
-       struct ptlrpc_thread            *athread = &llmd->llmd_thread;
-       struct l_wait_info               lwi     = { 0 };
-       int                              rc;
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct lfsck_layout     *lo     = com->lc_file_ram;
+       int                      rc;
        ENTRY;
 
        ENTRY;
 
-
-       llmd->llmd_post_result = result;
-       llmd->llmd_to_post = 1;
-       if (llmd->llmd_post_result <= 0)
-               llmd->llmd_exit = 1;
-
-       wake_up_all(&athread->t_ctl_waitq);
-       l_wait_event(mthread->t_ctl_waitq,
-                    (result > 0 && list_empty(&llmd->llmd_req_list)) ||
-                    thread_is_stopped(athread),
-                    &lwi);
-
-       if (llmd->llmd_assistant_status < 0)
-               result = llmd->llmd_assistant_status;
+       lfsck_post_generic(env, com, &result);
 
        down_write(&com->lc_sem);
        spin_lock(&lfsck->li_lock);
 
        down_write(&com->lc_sem);
        spin_lock(&lfsck->li_lock);
-       /* When LFSCK failed, there may be some prefetched objects those are
-        * not been processed yet, we do not know the exactly position, then
-        * just restart from last check-point next time. */
-       if (!init && !llmd->llmd_exit)
+       if (!init)
                lo->ll_pos_last_checkpoint =
                lo->ll_pos_last_checkpoint =
-                                       lfsck->li_pos_current.lp_oit_cookie;
+                               lfsck->li_pos_checkpoint.lp_oit_cookie;
 
        if (result > 0) {
                lo->ll_status = LS_SCANNING_PHASE2;
 
        if (result > 0) {
                lo->ll_status = LS_SCANNING_PHASE2;
@@ -5119,7 +4346,8 @@ static int lfsck_layout_slave_post(const struct lu_env *env,
        spin_lock(&lfsck->li_lock);
        if (!init)
                lo->ll_pos_last_checkpoint =
        spin_lock(&lfsck->li_lock);
        if (!init)
                lo->ll_pos_last_checkpoint =
-                                       lfsck->li_pos_current.lp_oit_cookie;
+                               lfsck->li_pos_checkpoint.lp_oit_cookie;
+
        if (result > 0) {
                lo->ll_status = LS_SCANNING_PHASE2;
                lo->ll_flags |= LF_SCANNED_ONCE;
        if (result > 0) {
                lo->ll_status = LS_SCANNING_PHASE2;
                lo->ll_flags |= LF_SCANNED_ONCE;
@@ -5353,25 +4581,9 @@ out:
 static int lfsck_layout_master_double_scan(const struct lu_env *env,
                                           struct lfsck_component *com)
 {
 static int lfsck_layout_master_double_scan(const struct lu_env *env,
                                           struct lfsck_component *com)
 {
-       struct lfsck_layout_master_data *llmd    = com->lc_data;
-       struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
-       struct ptlrpc_thread            *athread = &llmd->llmd_thread;
-       struct lfsck_layout             *lo      = com->lc_file_ram;
-       struct l_wait_info               lwi     = { 0 };
-
-       if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
-               return 0;
-
-       llmd->llmd_to_double_scan = 1;
-       wake_up_all(&athread->t_ctl_waitq);
-       l_wait_event(mthread->t_ctl_waitq,
-                    llmd->llmd_in_double_scan ||
-                    thread_is_stopped(athread),
-                    &lwi);
-       if (llmd->llmd_assistant_status < 0)
-               return llmd->llmd_assistant_status;
+       struct lfsck_layout *lo = com->lc_file_ram;
 
 
-       return 0;
+       return lfsck_double_scan_generic(env, com, lo->ll_status);
 }
 
 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
 }
 
 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
@@ -5449,30 +4661,30 @@ done:
 static void lfsck_layout_master_data_release(const struct lu_env *env,
                                             struct lfsck_component *com)
 {
 static void lfsck_layout_master_data_release(const struct lu_env *env,
                                             struct lfsck_component *com)
 {
-       struct lfsck_layout_master_data *llmd   = com->lc_data;
+       struct lfsck_assistant_data     *lad    = com->lc_data;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct lfsck_tgt_descs          *ltds;
        struct lfsck_tgt_desc           *ltd;
        struct lfsck_tgt_desc           *next;
 
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct lfsck_tgt_descs          *ltds;
        struct lfsck_tgt_desc           *ltd;
        struct lfsck_tgt_desc           *next;
 
-       LASSERT(llmd != NULL);
-       LASSERT(thread_is_init(&llmd->llmd_thread) ||
-               thread_is_stopped(&llmd->llmd_thread));
-       LASSERT(list_empty(&llmd->llmd_req_list));
+       LASSERT(lad != NULL);
+       LASSERT(thread_is_init(&lad->lad_thread) ||
+               thread_is_stopped(&lad->lad_thread));
+       LASSERT(list_empty(&lad->lad_req_list));
 
        com->lc_data = NULL;
 
        ltds = &lfsck->li_ost_descs;
        spin_lock(&ltds->ltd_lock);
 
        com->lc_data = NULL;
 
        ltds = &lfsck->li_ost_descs;
        spin_lock(&ltds->ltd_lock);
-       list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
+       list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase1_list,
                                 ltd_layout_phase_list) {
                list_del_init(&ltd->ltd_layout_phase_list);
        }
                                 ltd_layout_phase_list) {
                list_del_init(&ltd->ltd_layout_phase_list);
        }
-       list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
+       list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
                                 ltd_layout_phase_list) {
                list_del_init(&ltd->ltd_layout_phase_list);
        }
                                 ltd_layout_phase_list) {
                list_del_init(&ltd->ltd_layout_phase_list);
        }
-       list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
+       list_for_each_entry_safe(ltd, next, &lad->lad_ost_list,
                                 ltd_layout_list) {
                list_del_init(&ltd->ltd_layout_list);
        }
                                 ltd_layout_list) {
                list_del_init(&ltd->ltd_layout_list);
        }
@@ -5480,21 +4692,21 @@ static void lfsck_layout_master_data_release(const struct lu_env *env,
 
        ltds = &lfsck->li_mdt_descs;
        spin_lock(&ltds->ltd_lock);
 
        ltds = &lfsck->li_mdt_descs;
        spin_lock(&ltds->ltd_lock);
-       list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
                                 ltd_layout_phase_list) {
                list_del_init(&ltd->ltd_layout_phase_list);
        }
                                 ltd_layout_phase_list) {
                list_del_init(&ltd->ltd_layout_phase_list);
        }
-       list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
                                 ltd_layout_phase_list) {
                list_del_init(&ltd->ltd_layout_phase_list);
        }
                                 ltd_layout_phase_list) {
                list_del_init(&ltd->ltd_layout_phase_list);
        }
-       list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
                                 ltd_layout_list) {
                list_del_init(&ltd->ltd_layout_list);
        }
        spin_unlock(&ltds->ltd_lock);
 
                                 ltd_layout_list) {
                list_del_init(&ltd->ltd_layout_list);
        }
        spin_unlock(&ltds->ltd_lock);
 
-       OBD_FREE_PTR(llmd);
+       OBD_FREE_PTR(lad);
 }
 
 static void lfsck_layout_slave_data_release(const struct lu_env *env,
 }
 
 static void lfsck_layout_slave_data_release(const struct lu_env *env,
@@ -5526,22 +4738,6 @@ static void lfsck_layout_slave_data_release(const struct lu_env *env,
        OBD_FREE_PTR(llsd);
 }
 
        OBD_FREE_PTR(llsd);
 }
 
-static void lfsck_layout_master_quit(const struct lu_env *env,
-                                    struct lfsck_component *com)
-{
-       struct lfsck_layout_master_data *llmd    = com->lc_data;
-       struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
-       struct ptlrpc_thread            *athread = &llmd->llmd_thread;
-       struct l_wait_info               lwi     = { 0 };
-
-       llmd->llmd_exit = 1;
-       wake_up_all(&athread->t_ctl_waitq);
-       l_wait_event(mthread->t_ctl_waitq,
-                    thread_is_init(athread) ||
-                    thread_is_stopped(athread),
-                    &lwi);
-}
-
 static void lfsck_layout_slave_quit(const struct lu_env *env,
                                    struct lfsck_component *com)
 {
 static void lfsck_layout_slave_quit(const struct lu_env *env,
                                    struct lfsck_component *com)
 {
@@ -5554,7 +4750,7 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env,
 {
        struct lfsck_instance           *lfsck = com->lc_lfsck;
        struct lfsck_layout             *lo    = com->lc_file_ram;
 {
        struct lfsck_instance           *lfsck = com->lc_lfsck;
        struct lfsck_layout             *lo    = com->lc_file_ram;
-       struct lfsck_layout_master_data *llmd  = com->lc_data;
+       struct lfsck_assistant_data     *lad   = com->lc_data;
        struct lfsck_tgt_descs          *ltds;
        struct lfsck_tgt_desc           *ltd;
        bool                             fail  = false;
        struct lfsck_tgt_descs          *ltds;
        struct lfsck_tgt_desc           *ltd;
        bool                             fail  = false;
@@ -5569,7 +4765,7 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env,
                RETURN(rc);
        }
 
                RETURN(rc);
        }
 
-       CDEBUG(D_LFSCK, "%s: layout LFSCK master handle notify %u "
+       CDEBUG(D_LFSCK, "%s: layout LFSCK master handles notify %u "
               "from %s %x, status %d\n", lfsck_lfsck2name(lfsck),
               lr->lr_event, (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
               lr->lr_index, lr->lr_status);
               "from %s %x, status %d\n", lfsck_lfsck2name(lfsck),
               lr->lr_event, (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
               lr->lr_index, lr->lr_status);
@@ -5605,15 +4801,15 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env,
                if (lr->lr_flags & LEF_FROM_OST) {
                        if (list_empty(&ltd->ltd_layout_list))
                                list_add_tail(&ltd->ltd_layout_list,
                if (lr->lr_flags & LEF_FROM_OST) {
                        if (list_empty(&ltd->ltd_layout_list))
                                list_add_tail(&ltd->ltd_layout_list,
-                                             &llmd->llmd_ost_list);
+                                             &lad->lad_ost_list);
                        list_add_tail(&ltd->ltd_layout_phase_list,
                        list_add_tail(&ltd->ltd_layout_phase_list,
-                                     &llmd->llmd_ost_phase2_list);
+                                     &lad->lad_ost_phase2_list);
                } else {
                        if (list_empty(&ltd->ltd_layout_list))
                                list_add_tail(&ltd->ltd_layout_list,
                } else {
                        if (list_empty(&ltd->ltd_layout_list))
                                list_add_tail(&ltd->ltd_layout_list,
-                                             &llmd->llmd_mdt_list);
+                                             &lad->lad_mdt_list);
                        list_add_tail(&ltd->ltd_layout_phase_list,
                        list_add_tail(&ltd->ltd_layout_phase_list,
-                                     &llmd->llmd_mdt_phase2_list);
+                                     &lad->lad_mdt_phase2_list);
                }
                break;
        case LE_PHASE2_DONE:
                }
                break;
        case LE_PHASE2_DONE:
@@ -5639,8 +4835,8 @@ static int lfsck_layout_master_in_notify(const struct lu_env *env,
                stop->ls_status = lr->lr_status;
                stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
                lfsck_stop(env, lfsck->li_bottom, stop);
                stop->ls_status = lr->lr_status;
                stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
                lfsck_stop(env, lfsck->li_bottom, stop);
-       } else if (lfsck_layout_master_to_orphan(llmd)) {
-               wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
+       } else if (lfsck_phase2_next_ready(lad)) {
+               wake_up_all(&lad->lad_thread.t_ctl_waitq);
        }
 
        RETURN(0);
        }
 
        RETURN(0);
@@ -5715,7 +4911,10 @@ static int lfsck_layout_slave_in_notify(const struct lu_env *env,
                wake_up_all(&lfsck->li_thread.t_ctl_waitq);
 
        if (lr->lr_event == LE_PEER_EXIT &&
                wake_up_all(&lfsck->li_thread.t_ctl_waitq);
 
        if (lr->lr_event == LE_PEER_EXIT &&
-           lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
+           (lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT ||
+            (list_empty(&llsd->llsd_master_list) &&
+             (lr->lr_status == LS_STOPPED ||
+              lr->lr_status == LS_CO_STOPPED)))) {
                struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
 
                memset(stop, 0, sizeof(*stop));
                struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
 
                memset(stop, 0, sizeof(*stop));
@@ -5735,60 +4934,6 @@ static int lfsck_layout_query(const struct lu_env *env,
        return lo->ll_status;
 }
 
        return lo->ll_status;
 }
 
-static int lfsck_layout_master_stop_notify(const struct lu_env *env,
-                                          struct lfsck_component *com,
-                                          struct lfsck_tgt_descs *ltds,
-                                          struct lfsck_tgt_desc *ltd,
-                                          struct ptlrpc_request_set *set)
-{
-       struct lfsck_thread_info          *info  = lfsck_env_info(env);
-       struct lfsck_async_interpret_args *laia  = &info->lti_laia;
-       struct lfsck_request              *lr    = &info->lti_lr;
-       struct lfsck_instance             *lfsck = com->lc_lfsck;
-       int                                rc;
-
-       spin_lock(&ltds->ltd_lock);
-       if (list_empty(&ltd->ltd_layout_list)) {
-               LASSERT(list_empty(&ltd->ltd_layout_phase_list));
-               spin_unlock(&ltds->ltd_lock);
-
-               return 0;
-       }
-
-       list_del_init(&ltd->ltd_layout_phase_list);
-       list_del_init(&ltd->ltd_layout_list);
-       spin_unlock(&ltds->ltd_lock);
-
-       memset(lr, 0, sizeof(*lr));
-       lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
-       lr->lr_event = LE_PEER_EXIT;
-       lr->lr_active = LFSCK_TYPE_LAYOUT;
-       lr->lr_status = LS_CO_PAUSED;
-       if (ltds == &lfsck->li_ost_descs)
-               lr->lr_flags = LEF_TO_OST;
-
-       laia->laia_com = com;
-       laia->laia_ltds = ltds;
-       atomic_inc(&ltd->ltd_ref);
-       laia->laia_ltd = ltd;
-       laia->laia_lr = lr;
-       laia->laia_shared = 0;
-
-       rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
-                                lfsck_layout_master_async_interpret,
-                                laia, LFSCK_NOTIFY);
-       if (rc != 0) {
-               CDEBUG(D_LFSCK, "%s: layout LFSCK fail to notify %s %x "
-                      "for co-stop: rc = %d\n",
-                      lfsck_lfsck2name(lfsck),
-                      (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
-                      ltd->ltd_index, rc);
-               lfsck_tgt_put(ltd);
-       }
-
-       return rc;
-}
-
 /* with lfsck::li_lock held */
 static int lfsck_layout_slave_join(const struct lu_env *env,
                                   struct lfsck_component *com,
 /* with lfsck::li_lock held */
 static int lfsck_layout_slave_join(const struct lu_env *env,
                                   struct lfsck_component *com,
@@ -5837,14 +4982,12 @@ static struct lfsck_operations lfsck_layout_master_ops = {
        .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
        .lfsck_exec_dir         = lfsck_layout_exec_dir,
        .lfsck_post             = lfsck_layout_master_post,
        .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
        .lfsck_exec_dir         = lfsck_layout_exec_dir,
        .lfsck_post             = lfsck_layout_master_post,
-       .lfsck_interpret        = lfsck_layout_master_async_interpret,
        .lfsck_dump             = lfsck_layout_dump,
        .lfsck_double_scan      = lfsck_layout_master_double_scan,
        .lfsck_data_release     = lfsck_layout_master_data_release,
        .lfsck_dump             = lfsck_layout_dump,
        .lfsck_double_scan      = lfsck_layout_master_double_scan,
        .lfsck_data_release     = lfsck_layout_master_data_release,
-       .lfsck_quit             = lfsck_layout_master_quit,
+       .lfsck_quit             = lfsck_quit_generic,
        .lfsck_in_notify        = lfsck_layout_master_in_notify,
        .lfsck_query            = lfsck_layout_query,
        .lfsck_in_notify        = lfsck_layout_master_in_notify,
        .lfsck_query            = lfsck_layout_query,
-       .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
 };
 
 static struct lfsck_operations lfsck_layout_slave_ops = {
 };
 
 static struct lfsck_operations lfsck_layout_slave_ops = {
@@ -5864,6 +5007,30 @@ static struct lfsck_operations lfsck_layout_slave_ops = {
        .lfsck_join             = lfsck_layout_slave_join,
 };
 
        .lfsck_join             = lfsck_layout_slave_join,
 };
 
+/**
+ * Pull the OIT position back to the first layout request still queued.
+ *
+ * When lad_req_list is not empty, lp_oit_cookie of \a pos is set to one
+ * less than the llo_cookie of the parent of the first queued request.
+ * When the list is empty, \a pos is left untouched.
+ *
+ * \param[in] env      pointer to the thread context (not used here)
+ * \param[in] com      pointer to the lfsck component
+ * \param[in,out] pos  the position to be adjusted
+ */
+static void lfsck_layout_assistant_fill_pos(const struct lu_env *env,
+                                           struct lfsck_component *com,
+                                           struct lfsck_position *pos)
+{
+       struct lfsck_assistant_data     *lad = com->lc_data;
+
+       if (!list_empty(&lad->lad_req_list)) {
+               struct lfsck_layout_req *first;
+
+               first = list_entry(lad->lad_req_list.next,
+                                  struct lfsck_layout_req,
+                                  llr_lar.lar_list);
+               pos->lp_oit_cookie = first->llr_parent->llo_cookie - 1;
+       }
+}
+
+/*
+ * Layout LFSCK callbacks; lfsck_layout_setup() hands this table to
+ * lfsck_assistant_data_init() for the master side.
+ */
+struct lfsck_assistant_operations lfsck_layout_assistant_ops = {
+       .la_fill_pos            = lfsck_layout_assistant_fill_pos,
+       .la_handler_p1          = lfsck_layout_assistant_handler_p1,
+       .la_handler_p2          = lfsck_layout_assistant_handler_p2,
+       .la_double_scan_result  = lfsck_layout_double_scan_result,
+       .la_req_fini            = lfsck_layout_assistant_req_fini,
+};
+
 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
 {
        struct lfsck_component  *com;
 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
 {
        struct lfsck_component  *com;
@@ -5884,23 +5051,12 @@ int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
        com->lc_lfsck = lfsck;
        com->lc_type = LFSCK_TYPE_LAYOUT;
        if (lfsck->li_master) {
        com->lc_lfsck = lfsck;
        com->lc_type = LFSCK_TYPE_LAYOUT;
        if (lfsck->li_master) {
-               struct lfsck_layout_master_data *llmd;
-
                com->lc_ops = &lfsck_layout_master_ops;
                com->lc_ops = &lfsck_layout_master_ops;
-               OBD_ALLOC_PTR(llmd);
-               if (llmd == NULL)
+               com->lc_data = lfsck_assistant_data_init(
+                               &lfsck_layout_assistant_ops,
+                               "lfsck_layout");
+               if (com->lc_data == NULL)
                        GOTO(out, rc = -ENOMEM);
                        GOTO(out, rc = -ENOMEM);
-
-               INIT_LIST_HEAD(&llmd->llmd_req_list);
-               spin_lock_init(&llmd->llmd_lock);
-               INIT_LIST_HEAD(&llmd->llmd_ost_list);
-               INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
-               INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
-               INIT_LIST_HEAD(&llmd->llmd_mdt_list);
-               INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
-               INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
-               init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
-               com->lc_data = llmd;
        } else {
                struct lfsck_layout_slave_data *llsd;
 
        } else {
                struct lfsck_layout_slave_data *llsd;
 
index 9a7980e..dda5e1f 100644 (file)
@@ -42,6 +42,8 @@
 
 #include "lfsck_internal.h"
 
 
 #include "lfsck_internal.h"
 
+#define LFSCK_CHECKPOINT_SKIP  1 /* lfsck_checkpoint_generic() returns this
+                                 * instead of 0; presumably the caller then
+                                 * skips the checkpoint - TODO confirm */
+
 /* define lfsck thread key */
 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
 
 /* define lfsck thread key */
 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
 
@@ -148,6 +150,8 @@ static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
                if (likely(ltd != NULL)) {
                        LASSERT(list_empty(&ltd->ltd_layout_list));
                        LASSERT(list_empty(&ltd->ltd_layout_phase_list));
                if (likely(ltd != NULL)) {
                        LASSERT(list_empty(&ltd->ltd_layout_list));
                        LASSERT(list_empty(&ltd->ltd_layout_phase_list));
+                       LASSERT(list_empty(&ltd->ltd_namespace_list));
+                       LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
 
                        ltds->ltd_tgtnr--;
                        cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
 
                        ltds->ltd_tgtnr--;
                        cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
@@ -1271,8 +1275,12 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        snprintf(name, 8, "MDT%04x", node);
        rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
                       (const struct dt_key *)name, BYPASS_CAPA);
        snprintf(name, 8, "MDT%04x", node);
        rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
                       (const struct dt_key *)name, BYPASS_CAPA);
-       if (rc == -ENOENT)
-               goto check_child1;
+       if (rc == -ENOENT) {
+               if (!fid_is_zero(&bk->lb_lpf_fid))
+                       goto check_child1;
+
+               GOTO(put, rc = 0);
+       }
 
        if (rc != 0)
                GOTO(put, rc);
 
        if (rc != 0)
                GOTO(put, rc);
@@ -1649,9 +1657,10 @@ void lfsck_control_speed_by_self(struct lfsck_component *com)
        }
 }
 
        }
 }
 
-struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
-                                                struct lfsck_component *com,
-                                                struct lfsck_start_param *lsp)
+static struct lfsck_thread_args *
+lfsck_thread_args_init(struct lfsck_instance *lfsck,
+                      struct lfsck_component *com,
+                      struct lfsck_start_param *lsp)
 {
        struct lfsck_thread_args *lta;
        int                       rc;
 {
        struct lfsck_thread_args *lta;
        int                       rc;
@@ -1684,6 +1693,223 @@ void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
        OBD_FREE_PTR(lta);
 }
 
        OBD_FREE_PTR(lta);
 }
 
+/**
+ * Allocate an lfsck_assistant_data and initialise its request/target
+ * lists, lock, thread wait queue, operations table and name.
+ *
+ * \param[in] lao      operations table stored in lad_ops
+ * \param[in] name     name stored in lad_name (the pointer is kept,
+ *                     the string is not copied)
+ *
+ * \retval             the new lfsck_assistant_data
+ * \retval             NULL if the allocation failed
+ */
+struct lfsck_assistant_data *
+lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
+                         const char *name)
+{
+       struct lfsck_assistant_data *data;
+
+       OBD_ALLOC_PTR(data);
+       if (data == NULL)
+               return NULL;
+
+       data->lad_ops = lao;
+       data->lad_name = name;
+       spin_lock_init(&data->lad_lock);
+       init_waitqueue_head(&data->lad_thread.t_ctl_waitq);
+
+       /* The request queue, then the OST and MDT tracking lists. */
+       INIT_LIST_HEAD(&data->lad_req_list);
+       INIT_LIST_HEAD(&data->lad_ost_list);
+       INIT_LIST_HEAD(&data->lad_ost_phase1_list);
+       INIT_LIST_HEAD(&data->lad_ost_phase2_list);
+       INIT_LIST_HEAD(&data->lad_mdt_list);
+       INIT_LIST_HEAD(&data->lad_mdt_phase1_list);
+       INIT_LIST_HEAD(&data->lad_mdt_phase2_list);
+
+       return data;
+}
+
+/**
+ * Generic LFSCK asynchronous communication interpreter function.
+ * The LFSCK RPC reply for both the event notification and status
+ * querying will be handled here.
+ *
+ * Unless the arguments are marked as shared (laia_shared), the references
+ * on the target and on the component are dropped before returning.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] req      pointer to the LFSCK request
+ * \param[in] args     pointer to the lfsck_async_interpret_args
+ * \param[in] rc       the result for handling the LFSCK request
+ *
+ * \retval             0 always; a failed request is only logged and
+ *                     reflected in the component flags / target lists
+ */
+int lfsck_async_interpret_common(const struct lu_env *env,
+                                struct ptlrpc_request *req,
+                                void *args, int rc)
+{
+       struct lfsck_async_interpret_args *laia = args;
+       struct lfsck_component            *com  = laia->laia_com;
+       struct lfsck_assistant_data       *lad  = com->lc_data;
+       struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
+       struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
+       struct lfsck_request              *lr   = laia->laia_lr;
+
+       LASSERT(com->lc_lfsck->li_master);
+
+       switch (lr->lr_event) {
+       case LE_START:
+               if (rc != 0) {
+                       CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
+                              "start: rc = %d\n",
+                              lfsck_lfsck2name(com->lc_lfsck),
+                              (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
+                              ltd->ltd_index, lad->lad_name, rc);
+
+                       /* Notification failed: flag the component's
+                        * in-RAM file as incomplete. */
+                       if (com->lc_type == LFSCK_TYPE_LAYOUT) {
+                               struct lfsck_layout *lo = com->lc_file_ram;
+
+                               lo->ll_flags |= LF_INCOMPLETE;
+                       } else {
+                               struct lfsck_namespace *ns = com->lc_file_ram;
+
+                               ns->ln_flags |= LF_INCOMPLETE;
+                       }
+                       break;
+               }
+
+               spin_lock(&ltds->ltd_lock);
+               if (ltd->ltd_dead) {
+                       spin_unlock(&ltds->ltd_lock);
+                       break;
+               }
+
+               /* Link the target into the "all" and "phase1" lists,
+                * unless it is already linked or already done. */
+               if (com->lc_type == LFSCK_TYPE_LAYOUT) {
+                       struct list_head *list;
+                       struct list_head *phase_list;
+
+                       if (ltd->ltd_layout_done) {
+                               spin_unlock(&ltds->ltd_lock);
+                               break;
+                       }
+
+                       if (lr->lr_flags & LEF_TO_OST) {
+                               list = &lad->lad_ost_list;
+                               phase_list = &lad->lad_ost_phase1_list;
+                       } else {
+                               list = &lad->lad_mdt_list;
+                               phase_list = &lad->lad_mdt_phase1_list;
+                       }
+
+                       if (list_empty(&ltd->ltd_layout_list))
+                               list_add_tail(&ltd->ltd_layout_list, list);
+                       if (list_empty(&ltd->ltd_layout_phase_list))
+                               list_add_tail(&ltd->ltd_layout_phase_list,
+                                             phase_list);
+               } else {
+                       if (ltd->ltd_namespace_done) {
+                               spin_unlock(&ltds->ltd_lock);
+                               break;
+                       }
+
+                       if (list_empty(&ltd->ltd_namespace_list))
+                               list_add_tail(&ltd->ltd_namespace_list,
+                                             &lad->lad_mdt_list);
+                       if (list_empty(&ltd->ltd_namespace_phase_list))
+                               list_add_tail(&ltd->ltd_namespace_phase_list,
+                                             &lad->lad_mdt_phase1_list);
+               }
+               spin_unlock(&ltds->ltd_lock);
+               break;
+       case LE_STOP:
+       case LE_PHASE1_DONE:
+       case LE_PHASE2_DONE:
+       case LE_PEER_EXIT:
+               /* Nothing to update for these events; only log failures. */
+               if (rc != 0 && rc != -EALREADY)
+                       CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
+                              "event = %d, rc = %d\n",
+                              lfsck_lfsck2name(com->lc_lfsck),
+                              (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
+                              ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
+               break;
+       case LE_QUERY: {
+               struct lfsck_reply *reply;
+               struct list_head *list;
+               struct list_head *phase_list;
+
+               if (com->lc_type == LFSCK_TYPE_LAYOUT) {
+                       list = &ltd->ltd_layout_list;
+                       phase_list = &ltd->ltd_layout_phase_list;
+               } else {
+                       list = &ltd->ltd_namespace_list;
+                       phase_list = &ltd->ltd_namespace_phase_list;
+               }
+
+               /* Query failed: unlink the target from both lists. */
+               if (rc != 0) {
+                       spin_lock(&ltds->ltd_lock);
+                       list_del_init(phase_list);
+                       list_del_init(list);
+                       spin_unlock(&ltds->ltd_lock);
+                       break;
+               }
+
+               reply = req_capsule_server_get(&req->rq_pill,
+                                              &RMF_LFSCK_REPLY);
+               if (reply == NULL) {
+                       rc = -EPROTO;
+                       CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
+                              "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
+                              lad->lad_name, rc);
+                       spin_lock(&ltds->ltd_lock);
+                       list_del_init(phase_list);
+                       list_del_init(list);
+                       spin_unlock(&ltds->ltd_lock);
+                       break;
+               }
+
+               switch (reply->lr_status) {
+               case LS_SCANNING_PHASE1:
+                       break;
+               case LS_SCANNING_PHASE2:
+                       /* Move the target from its current phase list to
+                        * the matching phase2 list, unless dead or done. */
+                       spin_lock(&ltds->ltd_lock);
+                       list_del_init(phase_list);
+                       if (ltd->ltd_dead) {
+                               spin_unlock(&ltds->ltd_lock);
+                               break;
+                       }
+
+                       if (com->lc_type == LFSCK_TYPE_LAYOUT) {
+                               if (ltd->ltd_layout_done) {
+                                       spin_unlock(&ltds->ltd_lock);
+                                       break;
+                               }
+
+                               if (lr->lr_flags & LEF_TO_OST)
+                                       list_add_tail(phase_list,
+                                               &lad->lad_ost_phase2_list);
+                               else
+                                       list_add_tail(phase_list,
+                                               &lad->lad_mdt_phase2_list);
+                       } else {
+                               if (ltd->ltd_namespace_done) {
+                                       spin_unlock(&ltds->ltd_lock);
+                                       break;
+                               }
+
+                               list_add_tail(phase_list,
+                                             &lad->lad_mdt_phase2_list);
+                       }
+                       spin_unlock(&ltds->ltd_lock);
+                       break;
+               default:
+                       /* Any other status: unlink from both lists. */
+                       spin_lock(&ltds->ltd_lock);
+                       list_del_init(phase_list);
+                       list_del_init(list);
+                       spin_unlock(&ltds->ltd_lock);
+                       break;
+               }
+               break;
+       }
+       default:
+               /* Print the event itself as well as the real rc. */
+               CDEBUG(D_LFSCK, "%s: unexpected event %d: rc = %d\n",
+                      lfsck_lfsck2name(com->lc_lfsck), lr->lr_event, rc);
+               break;
+       }
+
+       if (!laia->laia_shared) {
+               lfsck_tgt_put(ltd);
+               lfsck_component_put(env, com);
+       }
+
+       return 0;
+}
+
 static void lfsck_interpret(const struct lu_env *env,
                            struct lfsck_instance *lfsck,
                            struct ptlrpc_request *req, void *args, int result)
 static void lfsck_interpret(const struct lu_env *env,
                            struct lfsck_instance *lfsck,
                            struct ptlrpc_request *req, void *args, int result)
@@ -1696,17 +1922,13 @@ static void lfsck_interpret(const struct lu_env *env,
 
        spin_lock(&lfsck->li_lock);
        list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
 
        spin_lock(&lfsck->li_lock);
        list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
-               if (com->lc_ops->lfsck_interpret != NULL) {
-                       laia->laia_com = com;
-                       com->lc_ops->lfsck_interpret(env, req, laia, result);
-               }
+               laia->laia_com = com;
+               lfsck_async_interpret_common(env, req, laia, result);
        }
 
        list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
        }
 
        list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
-               if (com->lc_ops->lfsck_interpret != NULL) {
-                       laia->laia_com = com;
-                       com->lc_ops->lfsck_interpret(env, req, laia, result);
-               }
+               laia->laia_com = com;
+               lfsck_async_interpret_common(env, req, laia, result);
        }
        spin_unlock(&lfsck->li_lock);
 }
        }
        spin_unlock(&lfsck->li_lock);
 }
@@ -1716,11 +1938,12 @@ static int lfsck_stop_notify(const struct lu_env *env,
                             struct lfsck_tgt_descs *ltds,
                             struct lfsck_tgt_desc *ltd, __u16 type)
 {
                             struct lfsck_tgt_descs *ltds,
                             struct lfsck_tgt_desc *ltd, __u16 type)
 {
-       struct ptlrpc_request_set *set;
-       struct lfsck_component    *com;
-       int                        rc  = 0;
+       struct lfsck_component *com;
+       int                     rc = 0;
        ENTRY;
 
        ENTRY;
 
+       LASSERT(lfsck->li_master);
+
        spin_lock(&lfsck->li_lock);
        com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
        if (com == NULL)
        spin_lock(&lfsck->li_lock);
        com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
        if (com == NULL)
@@ -1731,22 +1954,72 @@ static int lfsck_stop_notify(const struct lu_env *env,
        spin_unlock(&lfsck->li_lock);
 
        if (com != NULL) {
        spin_unlock(&lfsck->li_lock);
 
        if (com != NULL) {
-               if (com->lc_ops->lfsck_stop_notify != NULL) {
-                       set = ptlrpc_prep_set();
-                       if (set == NULL) {
-                               lfsck_component_put(env, com);
+               struct lfsck_thread_info          *info  = lfsck_env_info(env);
+               struct lfsck_async_interpret_args *laia  = &info->lti_laia;
+               struct lfsck_request              *lr    = &info->lti_lr;
+               struct lfsck_assistant_data       *lad   = com->lc_data;
+               struct list_head                  *list;
+               struct list_head                  *phase_list;
+               struct ptlrpc_request_set         *set;
+
+               set = ptlrpc_prep_set();
+               if (set == NULL) {
+                       lfsck_component_put(env, com);
 
 
-                               RETURN(-ENOMEM);
-                       }
+                       RETURN(-ENOMEM);
+               }
 
 
-                       rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
-                                                           ltd, set);
-                       if (rc == 0)
-                               rc = ptlrpc_set_wait(set);
+               if (type == LFSCK_TYPE_LAYOUT) {
+                       list = &ltd->ltd_layout_list;
+                       phase_list = &ltd->ltd_layout_phase_list;
+               } else {
+                       list = &ltd->ltd_namespace_list;
+                       phase_list = &ltd->ltd_namespace_phase_list;
+               }
 
 
+               spin_lock(&ltds->ltd_lock);
+               if (list_empty(list)) {
+                       LASSERT(list_empty(phase_list));
+                       spin_unlock(&ltds->ltd_lock);
                        ptlrpc_set_destroy(set);
                        ptlrpc_set_destroy(set);
+
+                       RETURN(0);
                }
 
                }
 
+               list_del_init(phase_list);
+               list_del_init(list);
+               spin_unlock(&ltds->ltd_lock);
+
+               memset(lr, 0, sizeof(*lr));
+               lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
+               lr->lr_event = LE_PEER_EXIT;
+               lr->lr_active = type;
+               lr->lr_status = LS_CO_PAUSED;
+               if (ltds == &lfsck->li_ost_descs)
+                       lr->lr_flags = LEF_TO_OST;
+
+               laia->laia_com = com;
+               laia->laia_ltds = ltds;
+               atomic_inc(&ltd->ltd_ref);
+               laia->laia_ltd = ltd;
+               laia->laia_lr = lr;
+               laia->laia_shared = 0;
+
+               rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
+                                        lfsck_async_interpret_common,
+                                        laia, LFSCK_NOTIFY);
+               if (rc != 0) {
+                       CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
+                              "co-stop for %s: rc = %d\n",
+                              lfsck_lfsck2name(lfsck),
+                              (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
+                              ltd->ltd_index, lad->lad_name, rc);
+                       lfsck_tgt_put(ltd);
+               } else {
+                       rc = ptlrpc_set_wait(set);
+               }
+
+               ptlrpc_set_destroy(set);
                lfsck_component_put(env, com);
        }
 
                lfsck_component_put(env, com);
        }
 
@@ -1820,6 +2093,139 @@ int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
        return 0;
 }
 
        return 0;
 }
 
+/**
+ * Start the assistant thread for the given LFSCK component.
+ *
+ * Reset the per-run assistant state, spawn lfsck_assistant_engine() and
+ * wait on the main thread's wait queue until the assistant thread is
+ * either running or stopped.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] lsp      pointer to the lfsck start parameters
+ *
+ * \retval             0 if the assistant thread is running
+ * \retval             lad_assistant_status if the thread stopped instead
+ * \retval             the error from lfsck_thread_args_init() or
+ *                     kthread_run() if the thread cannot be created
+ */
+int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
+                         struct lfsck_start_param *lsp)
+{
+       struct lfsck_instance           *lfsck   = com->lc_lfsck;
+       struct lfsck_assistant_data     *lad     = com->lc_data;
+       struct ptlrpc_thread            *mthread = &lfsck->li_thread;
+       struct ptlrpc_thread            *athread = &lad->lad_thread;
+       struct lfsck_thread_args        *lta;
+       struct task_struct              *task;
+       int                              rc;
+       ENTRY;
+
+       /* Clear the state flags before the thread is (re)started. */
+       lad->lad_assistant_status = 0;
+       lad->lad_post_result = 0;
+       lad->lad_to_post = 0;
+       lad->lad_to_double_scan = 0;
+       lad->lad_in_double_scan = 0;
+       lad->lad_exit = 0;
+       thread_set_flags(athread, 0);
+
+       lta = lfsck_thread_args_init(lfsck, com, lsp);
+       if (IS_ERR(lta))
+               RETURN(PTR_ERR(lta));
+
+       /* The third argument is a printf-style format: pass the name as
+        * data so that it is never parsed for conversion specifiers. */
+       task = kthread_run(lfsck_assistant_engine, lta, "%s", lad->lad_name);
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CERROR("%s: cannot start LFSCK assistant thread for %s: "
+                      "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
+               lfsck_thread_args_fini(lta);
+       } else {
+               struct l_wait_info lwi = { 0 };
+
+               l_wait_event(mthread->t_ctl_waitq,
+                            thread_is_running(athread) ||
+                            thread_is_stopped(athread),
+                            &lwi);
+               if (unlikely(!thread_is_running(athread)))
+                       rc = lad->lad_assistant_status;
+               else
+                       rc = 0;
+       }
+
+       RETURN(rc);
+}
+
+/**
+ * Wait for the assistant's request list to drain before a checkpoint.
+ *
+ * \param[in] env      pointer to the thread context (not used here)
+ * \param[in] com      pointer to the lfsck component
+ *
+ * \retval             0 once the wait ends with the main thread still
+ *                     running and the assistant thread not stopped
+ * \retval             LFSCK_CHECKPOINT_SKIP if lc_new_checked is zero,
+ *                     the main thread is no longer running, or the
+ *                     assistant thread has stopped
+ */
+int lfsck_checkpoint_generic(const struct lu_env *env,
+                            struct lfsck_component *com)
+{
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct ptlrpc_thread            *master = &com->lc_lfsck->li_thread;
+       struct ptlrpc_thread            *helper = &lad->lad_thread;
+       struct l_wait_info               lwi    = { 0 };
+
+       if (com->lc_new_checked == 0)
+               return LFSCK_CHECKPOINT_SKIP;
+
+       l_wait_event(master->t_ctl_waitq,
+                    list_empty(&lad->lad_req_list) ||
+                    !thread_is_running(master) ||
+                    thread_is_stopped(helper),
+                    &lwi);
+
+       return thread_is_running(master) && !thread_is_stopped(helper) ?
+              0 : LFSCK_CHECKPOINT_SKIP;
+}
+
+/**
+ * Hand the first-phase result over to the assistant thread and wait.
+ *
+ * The result is published in lad_post_result; a non-positive result
+ * also sets lad_exit. The wait ends when the assistant thread has
+ * stopped or, for a positive result, when lad_req_list is empty.
+ *
+ * \param[in] env        pointer to the thread context (not used here)
+ * \param[in] com        pointer to the lfsck component
+ * \param[in,out] result the scan result; replaced by
+ *                       lad_assistant_status when that is negative
+ */
+void lfsck_post_generic(const struct lu_env *env,
+                       struct lfsck_component *com, int *result)
+{
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct ptlrpc_thread            *master = &com->lc_lfsck->li_thread;
+       struct ptlrpc_thread            *helper = &lad->lad_thread;
+       struct l_wait_info               lwi    = { 0 };
+
+       /* lad_to_post is written last, after the values it announces. */
+       lad->lad_post_result = *result;
+       if (*result <= 0)
+               lad->lad_exit = 1;
+       lad->lad_to_post = 1;
+       wake_up_all(&helper->t_ctl_waitq);
+
+       l_wait_event(master->t_ctl_waitq,
+                    (*result > 0 && list_empty(&lad->lad_req_list)) ||
+                    thread_is_stopped(helper),
+                    &lwi);
+
+       if (lad->lad_assistant_status < 0)
+               *result = lad->lad_assistant_status;
+}
+
+/**
+ * Tell the assistant thread to enter the double scan, or to exit.
+ *
+ * For LS_SCANNING_PHASE2 lad_to_double_scan is set; for any other
+ * status lad_exit is set. The caller then waits until the assistant
+ * sets lad_in_double_scan or has stopped.
+ *
+ * \param[in] env      pointer to the thread context (not used here)
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] status   the LFSCK status to act upon
+ *
+ * \retval             0 if lad_assistant_status is not negative
+ * \retval             lad_assistant_status if it is negative
+ */
+int lfsck_double_scan_generic(const struct lu_env *env,
+                             struct lfsck_component *com, int status)
+{
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct ptlrpc_thread            *master = &com->lc_lfsck->li_thread;
+       struct ptlrpc_thread            *helper = &lad->lad_thread;
+       struct l_wait_info               lwi    = { 0 };
+
+       if (status == LS_SCANNING_PHASE2)
+               lad->lad_to_double_scan = 1;
+       else
+               lad->lad_exit = 1;
+
+       wake_up_all(&helper->t_ctl_waitq);
+       l_wait_event(master->t_ctl_waitq,
+                    lad->lad_in_double_scan ||
+                    thread_is_stopped(helper),
+                    &lwi);
+
+       return lad->lad_assistant_status < 0 ? lad->lad_assistant_status : 0;
+}
+
+/**
+ * Ask the assistant thread to exit and wait for it.
+ *
+ * Sets lad_exit, wakes the assistant thread, then waits on the main
+ * thread's wait queue until the assistant thread is in the init or
+ * the stopped state.
+ *
+ * \param[in] env      pointer to the thread context (not used here)
+ * \param[in] com      pointer to the lfsck component
+ */
+void lfsck_quit_generic(const struct lu_env *env,
+                       struct lfsck_component *com)
+{
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct ptlrpc_thread            *master = &com->lc_lfsck->li_thread;
+       struct ptlrpc_thread            *helper = &lad->lad_thread;
+       struct l_wait_info               lwi    = { 0 };
+
+       lad->lad_exit = 1;
+       wake_up_all(&helper->t_ctl_waitq);
+
+       l_wait_event(master->t_ctl_waitq,
+                    thread_is_init(helper) ||
+                    thread_is_stopped(helper),
+                    &lwi);
+}
+
 /* external interfaces */
 
 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
 /* external interfaces */
 
 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
@@ -2087,6 +2493,7 @@ static int lfsck_start_all(const struct lu_env *env,
 
                laia->laia_ltd = ltd;
                ltd->ltd_layout_done = 0;
 
                laia->laia_ltd = ltd;
                ltd->ltd_layout_done = 0;
+               ltd->ltd_namespace_done = 0;
                rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
                                         lfsck_async_interpret, laia,
                                         LFSCK_NOTIFY);
                rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
                                         lfsck_async_interpret, laia,
                                         LFSCK_NOTIFY);
@@ -2287,7 +2694,7 @@ int lfsck_start(const struct lu_env *env, struct dt_device *key,
        }
 
 trigger:
        }
 
 trigger:
-       lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
+       lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
        if (bk->lb_param & LPF_DRYRUN)
                lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
 
        if (bk->lb_param & LPF_DRYRUN)
                lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
 
@@ -2732,6 +3139,8 @@ int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
        INIT_LIST_HEAD(&ltd->ltd_orphan_list);
        INIT_LIST_HEAD(&ltd->ltd_layout_list);
        INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
        INIT_LIST_HEAD(&ltd->ltd_orphan_list);
        INIT_LIST_HEAD(&ltd->ltd_layout_list);
        INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
+       INIT_LIST_HEAD(&ltd->ltd_namespace_list);
+       INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
        atomic_set(&ltd->ltd_ref, 1);
        ltd->ltd_index = index;
 
        atomic_set(&ltd->ltd_ref, 1);
        ltd->ltd_index = index;
 
@@ -2831,6 +3240,7 @@ unlock:
                spin_lock(&ltds->ltd_lock);
                ltd->ltd_dead = 1;
                spin_unlock(&ltds->ltd_lock);
                spin_lock(&ltds->ltd_lock);
                ltd->ltd_dead = 1;
                spin_unlock(&ltds->ltd_lock);
+               lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
                lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
                lfsck_tgt_put(ltd);
        }
                lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
                lfsck_tgt_put(ltd);
        }
index dc4bb5e..6bd6dee 100644 (file)
 
 #define LFSCK_NAMESPACE_MAGIC  0xA0629D03
 
 
 #define LFSCK_NAMESPACE_MAGIC  0xA0629D03
 
+enum lfsck_nameentry_check { /* NOTE(review): presumably the possible
+                             * outcomes of re-checking a name entry; no
+                             * user is visible in this hunk - confirm. */
+       LFSCK_NAMEENTRY_DEAD            = 1, /* The object has been unlinked. */
+       LFSCK_NAMEENTRY_REMOVED         = 2, /* The entry has been removed. */
+       LFSCK_NAMEENTRY_RECREATED       = 3, /* The entry has been recreated. */
+};
+
 static const char lfsck_namespace_name[] = "lfsck_namespace";
 
 static const char lfsck_namespace_name[] = "lfsck_namespace";
 
+struct lfsck_namespace_req { /* One name entry queued by
+                             * lfsck_namespace_exec_dir() for the
+                             * namespace LFSCK assistant thread. */
+       struct lfsck_assistant_req       lnr_lar; /* generic request header;
+                                                  * lar_list links the
+                                                  * request into the queue */
+       struct dt_object                *lnr_obj; /* directory being scanned
+                                                  * (li_obj_dir); a reference
+                                                  * is held until req_fini */
+       struct lu_fid                    lnr_fid; /* lde_fid of the entry */
+       __u64                            lnr_oit_cookie; /* lp_oit_cookie of
+                                                         * li_pos_current when
+                                                         * the entry is queued */
+       __u64                            lnr_dir_cookie; /* lde_hash of entry */
+       __u32                            lnr_attr; /* lde_attrs of the entry
+                                                   * (LUDA_* flags) */
+       __u32                            lnr_size; /* bytes allocated for this
+                                                   * request, passed to
+                                                   * OBD_FREE() */
+       __u16                            lnr_type; /* type value supplied by
+                                                   * the caller of req_init */
+       __u16                            lnr_namelen; /* lde_namelen of entry */
+       char                             lnr_name[0]; /* copy of lde_name, in
+                                                      * a buffer larger than
+                                                      * lnr_namelen */
+};
+
+/**
+ * Build a namespace assistant request that describes one directory entry.
+ *
+ * The request copies the entry's FID, hash, attributes and name, records the
+ * current OIT cookie, and takes a reference on the directory currently being
+ * scanned (lfsck->li_obj_dir). lfsck_namespace_assistant_req_fini() drops
+ * that reference and frees the request.
+ *
+ * \param[in] lfsck    LFSCK instance that is scanning the directory
+ * \param[in] ent      directory entry to be described
+ * \param[in] type     type value stored in the request as lnr_type
+ *
+ * \retval             the new request on success
+ * \retval             ERR_PTR(-ENOMEM) if the allocation fails
+ */
+static struct lfsck_namespace_req *
+lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
+                                  struct lu_dirent *ent, __u16 type)
+{
+       struct lfsck_namespace_req *lnr;
+       int                         namelen = ent->lde_namelen;
+       int                         size;
+
+       /* Name area is namelen rounded down to a multiple of 4, plus 4,
+        * so it is always strictly larger than the name itself. */
+       size = sizeof(*lnr) + (namelen & ~3) + 4;
+       OBD_ALLOC(lnr, size);
+       if (lnr == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
+       lnr->lnr_size = size;
+       lnr->lnr_type = type;
+
+       /* Hold the parent directory for as long as the request exists. */
+       lnr->lnr_obj = lfsck->li_obj_dir;
+       lu_object_get(&lnr->lnr_obj->do_lu);
+       lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
+
+       /* Everything else is taken from the directory entry. */
+       lnr->lnr_fid = ent->lde_fid;
+       lnr->lnr_dir_cookie = ent->lde_hash;
+       lnr->lnr_attr = ent->lde_attrs;
+       lnr->lnr_namelen = namelen;
+       memcpy(lnr->lnr_name, ent->lde_name, namelen);
+
+       return lnr;
+}
+
+/**
+ * Destroy a request built by lfsck_namespace_assistant_req_init().
+ *
+ * Drops the reference held on the request's directory object and frees the
+ * request using the size recorded in lnr_size.
+ *
+ * \param[in] env      execution environment, passed to lu_object_put()
+ * \param[in] lar      generic header embedded in the namespace request
+ */
+static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
+                                              struct lfsck_assistant_req *lar)
+{
+       struct lfsck_namespace_req *lnr;
+
+       lnr = container_of0(lar, struct lfsck_namespace_req, lnr_lar);
+       lu_object_put(env, &lnr->lnr_obj->do_lu);
+       OBD_FREE(lnr, lnr->lnr_size);
+}
+
 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
                                      struct lfsck_namespace *src)
 {
 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
                                      struct lfsck_namespace *src)
 {
@@ -117,6 +173,27 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
        dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
 }
 
        dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
 }
 
+/**
+ * Account one failed item and track the first non-repaired position.
+ *
+ * Increments ns->ln_items_failed, fills the current LFSCK position, and
+ * stores it in ns->ln_pos_first_inconsistent when that field is still zero
+ * or when lfsck_pos_is_eq() returns a negative value for the current
+ * position against it (presumably meaning the current position is earlier;
+ * confirm against lfsck_pos_is_eq()).
+ *
+ * \param[in] env      execution environment, passed to lfsck_pos_fill()
+ * \param[in] lfsck    LFSCK instance that supplies the current position
+ * \param[in,out] ns   namespace LFSCK trace data to be updated
+ */
+static void lfsck_namespace_record_failure(const struct lu_env *env,
+                                          struct lfsck_instance *lfsck,
+                                          struct lfsck_namespace *ns)
+{
+       struct lfsck_position *first = &ns->ln_pos_first_inconsistent;
+       struct lfsck_position  cur;
+
+       ns->ln_items_failed++;
+       lfsck_pos_fill(env, lfsck, &cur, false);
+
+       /* Nothing to update if a position is already recorded and the
+        * comparison does not come out negative. */
+       if (!lfsck_pos_is_zero(first) && lfsck_pos_is_eq(&cur, first) >= 0)
+               return;
+
+       *first = cur;
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
+              "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
+              lfsck_lfsck2name(lfsck), first->lp_oit_cookie,
+              PFID(&first->lp_dir_parent), first->lp_dir_cookie);
+}
+
 /**
  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
  * \retval 0: succeed.
 /**
  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
  * \retval 0: succeed.
@@ -325,10 +402,9 @@ out:
 }
 
 static int lfsck_namespace_check_exist(const struct lu_env *env,
 }
 
 static int lfsck_namespace_check_exist(const struct lu_env *env,
-                                      struct lfsck_instance *lfsck,
+                                      struct dt_object *dir,
                                       struct dt_object *obj, const char *name)
 {
                                       struct dt_object *obj, const char *name)
 {
-       struct dt_object *dir = lfsck->li_obj_dir;
        struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
        int               rc;
        ENTRY;
        struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
        int               rc;
        ENTRY;
@@ -711,18 +787,7 @@ lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
        down_write(&com->lc_sem);
        if (new_checked)
                com->lc_new_checked++;
        down_write(&com->lc_sem);
        if (new_checked)
                com->lc_new_checked++;
-       ns->ln_items_failed++;
-       if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
-               lfsck_pos_fill(env, com->lc_lfsck,
-                              &ns->ln_pos_first_inconsistent, false);
-
-               CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
-                      "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
-                      lfsck_lfsck2name(com->lc_lfsck),
-                      ns->ln_pos_first_inconsistent.lp_oit_cookie,
-                      PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
-                      ns->ln_pos_first_inconsistent.lp_dir_cookie);
-       }
+       lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
        up_write(&com->lc_sem);
 }
 
        up_write(&com->lc_sem);
 }
 
@@ -733,14 +798,17 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env,
        struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      rc;
 
        struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      rc;
 
-       if (com->lc_new_checked == 0 && !init)
-               return 0;
+       if (!init) {
+               rc = lfsck_checkpoint_generic(env, com);
+               if (rc != 0)
+                       goto log;
+       }
 
        down_write(&com->lc_sem);
        if (init) {
 
        down_write(&com->lc_sem);
        if (init) {
-               ns->ln_pos_latest_start = lfsck->li_pos_current;
+               ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
        } else {
        } else {
-               ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
+               ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
                ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
                ns->ln_time_last_checkpoint = cfs_time_current_sec();
                ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
                ns->ln_time_last_checkpoint = cfs_time_current_sec();
@@ -751,13 +819,14 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env,
        rc = lfsck_namespace_store(env, com, false);
        up_write(&com->lc_sem);
 
        rc = lfsck_namespace_store(env, com, false);
        up_write(&com->lc_sem);
 
+log:
        CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
               ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
               lfsck->li_pos_current.lp_oit_cookie,
               PFID(&lfsck->li_pos_current.lp_dir_parent),
               lfsck->li_pos_current.lp_dir_cookie, rc);
 
        CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
               ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
               lfsck->li_pos_current.lp_oit_cookie,
               PFID(&lfsck->li_pos_current.lp_dir_parent),
               lfsck->li_pos_current.lp_dir_cookie, rc);
 
-       return rc;
+       return rc > 0 ? 0 : rc;
 }
 
 static int lfsck_namespace_prep(const struct lu_env *env,
 }
 
 static int lfsck_namespace_prep(const struct lu_env *env,
@@ -767,10 +836,9 @@ static int lfsck_namespace_prep(const struct lu_env *env,
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
        struct lfsck_namespace  *ns     = com->lc_file_ram;
        struct lfsck_position   *pos    = &com->lc_pos_start;
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
        struct lfsck_namespace  *ns     = com->lc_file_ram;
        struct lfsck_position   *pos    = &com->lc_pos_start;
+       int                      rc;
 
        if (ns->ln_status == LS_COMPLETED) {
 
        if (ns->ln_status == LS_COMPLETED) {
-               int rc;
-
                rc = lfsck_namespace_reset(env, com, false);
                if (rc == 0)
                        rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
                rc = lfsck_namespace_reset(env, com, false);
                if (rc == 0)
                        rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
@@ -785,8 +853,8 @@ static int lfsck_namespace_prep(const struct lu_env *env,
 
        down_write(&com->lc_sem);
        ns->ln_time_latest_start = cfs_time_current_sec();
 
        down_write(&com->lc_sem);
        ns->ln_time_latest_start = cfs_time_current_sec();
-
        spin_lock(&lfsck->li_lock);
        spin_lock(&lfsck->li_lock);
+
        if (ns->ln_flags & LF_SCANNED_ONCE) {
                if (!lfsck->li_drop_dryrun ||
                    lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
        if (ns->ln_flags & LF_SCANNED_ONCE) {
                if (!lfsck->li_drop_dryrun ||
                    lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
@@ -829,14 +897,18 @@ static int lfsck_namespace_prep(const struct lu_env *env,
                        *pos = ns->ln_pos_first_inconsistent;
                }
        }
                        *pos = ns->ln_pos_first_inconsistent;
                }
        }
+
        spin_unlock(&lfsck->li_lock);
        up_write(&com->lc_sem);
 
        spin_unlock(&lfsck->li_lock);
        up_write(&com->lc_sem);
 
+       rc = lfsck_start_assistant(env, com, lsp);
+
        CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
        CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
-              DFID", "LPX64"]\n", lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
-              PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
+              DFID", "LPX64"]: rc = %d\n",
+              lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
+              PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
 
 
-       return 0;
+       return rc;
 }
 
 static int lfsck_namespace_exec_oit(const struct lu_env *env,
 }
 
 static int lfsck_namespace_exec_oit(const struct lu_env *env,
@@ -853,223 +925,41 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env,
 
 static int lfsck_namespace_exec_dir(const struct lu_env *env,
                                    struct lfsck_component *com,
 
 static int lfsck_namespace_exec_dir(const struct lu_env *env,
                                    struct lfsck_component *com,
-                                   struct dt_object *obj,
-                                   struct lu_dirent *ent)
+                                   struct lu_dirent *ent, __u16 type)
 {
 {
-       struct lfsck_thread_info   *info     = lfsck_env_info(env);
-       struct lu_attr             *la       = &info->lti_la;
-       struct lfsck_instance      *lfsck    = com->lc_lfsck;
-       struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace     *ns       = com->lc_file_ram;
-       struct linkea_data          ldata    = { 0 };
-       const struct lu_fid        *pfid     = lfsck_dto2fid(lfsck->li_obj_dir);
-       const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
-       const struct lu_name       *cname;
-       struct thandle             *handle   = NULL;
-       bool                        repaired = false;
-       bool                        locked   = false;
-       bool                        remove;
-       bool                        newdata;
-       bool                        log      = false;
-       int                         count    = 0;
-       int                         rc;
-       ENTRY;
-
-       cname = lfsck_name_get_const(env, ent->lde_name, ent->lde_namelen);
-       down_write(&com->lc_sem);
-       com->lc_new_checked++;
-
-       if (ent->lde_attrs & LUDA_UPGRADE) {
-               ns->ln_flags |= LF_UPGRADE;
-               ns->ln_dirent_repaired++;
-               repaired = true;
-       } else if (ent->lde_attrs & LUDA_REPAIR) {
-               ns->ln_flags |= LF_INCONSISTENT;
-               ns->ln_dirent_repaired++;
-               repaired = true;
-       }
-
-       if (ent->lde_name[0] == '.' &&
-           (ent->lde_namelen == 1 ||
-            (ent->lde_namelen == 2 && ent->lde_name[1] == '.') ||
-            fid_seq_is_dot(fid_seq(&ent->lde_fid))))
-               GOTO(out, rc = 0);
-
-       if (!(bk->lb_param & LPF_DRYRUN) &&
-           (com->lc_journal || repaired)) {
-
-again:
-               LASSERT(!locked);
-
-               com->lc_journal = 1;
-               handle = dt_trans_create(env, lfsck->li_next);
-               if (IS_ERR(handle))
-                       GOTO(out, rc = PTR_ERR(handle));
-
-               rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               rc = dt_trans_start(env, lfsck->li_next, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               dt_write_lock(env, obj, MOR_TGT_CHILD);
-               locked = true;
-       }
-
-       rc = lfsck_namespace_check_exist(env, lfsck, obj, ent->lde_name);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       rc = lfsck_links_read(env, obj, &ldata);
-       if (rc == 0) {
-               count = ldata.ld_leh->leh_reccount;
-               rc = linkea_links_find(&ldata, cname, pfid);
-               if ((rc == 0) &&
-                   (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
-                       goto record;
-
-               ns->ln_flags |= LF_INCONSISTENT;
-               /* For dir, if there are more than one linkea entries, or the
-                * linkea entry does not match the name entry, then remove all
-                * and add the correct one. */
-               if (S_ISDIR(lfsck_object_type(obj))) {
-                       remove = true;
-                       newdata = true;
-               } else {
-                       remove = false;
-                       newdata = false;
-               }
-               goto nodata;
-       } else if (unlikely(rc == -EINVAL)) {
-               count = 1;
-               ns->ln_flags |= LF_INCONSISTENT;
-               /* The magic crashed, we are not sure whether there are more
-                * corrupt data in the linkea, so remove all linkea entries. */
-               remove = true;
-               newdata = true;
-               goto nodata;
-       } else if (rc == -ENODATA) {
-               count = 1;
-               ns->ln_flags |= LF_UPGRADE;
-               remove = false;
-               newdata = true;
-
-nodata:
-               if (bk->lb_param & LPF_DRYRUN) {
-                       ns->ln_linkea_repaired++;
-                       log = true;
-                       repaired = true;
-                       goto record;
-               }
-
-               if (!com->lc_journal)
-                       goto again;
-
-               if (remove) {
-                       LASSERT(newdata);
-
-                       rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
-                                         BYPASS_CAPA);
-                       if (rc != 0)
-                               GOTO(stop, rc);
-               }
-
-               if (newdata) {
-                       rc = linkea_data_new(&ldata,
-                                       &lfsck_env_info(env)->lti_linkea_buf);
-                       if (rc != 0)
-                               GOTO(stop, rc);
-               }
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_namespace_req      *lnr;
+       bool                             wakeup = false;
 
 
-               rc = linkea_add_buf(&ldata, cname, pfid);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               rc = lfsck_links_write(env, obj, &ldata, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
+       lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
+       if (IS_ERR(lnr)) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
 
 
-               count = ldata.ld_leh->leh_reccount;
-               ns->ln_linkea_repaired++;
-               log = true;
-               repaired = true;
-       } else {
-               GOTO(stop, rc);
+               lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
+               return PTR_ERR(lnr);
        }
 
        }
 
-record:
-       LASSERT(count > 0);
-
-       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       if ((count == 1) &&
-           (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
-               /* Usually, it is for single linked object or dir, do nothing.*/
-               GOTO(stop, rc);
-
-       /* Following modification will be in another transaction.  */
-       if (handle != NULL) {
-               LASSERT(dt_write_locked(env, obj));
-
-               dt_write_unlock(env, obj);
-               locked = false;
-
-               dt_trans_stop(env, lfsck->li_next, handle);
-               handle = NULL;
-
-               if (log)
-                       CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired "
-                             "linkEA for the object: "DFID", parent "
-                             DFID", name %.*s\n",
-                             lfsck_lfsck2name(lfsck), PFID(cfid), PFID(pfid),
-                             ent->lde_namelen, ent->lde_name);
+       spin_lock(&lad->lad_lock);
+       if (lad->lad_assistant_status < 0) {
+               spin_unlock(&lad->lad_lock);
+               lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
+               return lad->lad_assistant_status;
        }
 
        }
 
-       ns->ln_mlinked_checked++;
-       rc = lfsck_namespace_update(env, com, cfid,
-                       count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
-
-       GOTO(out, rc);
-
-stop:
-       if (locked)
-               dt_write_unlock(env, obj);
+       list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
+       if (lad->lad_prefetched == 0)
+               wakeup = true;
 
 
-       if (handle != NULL)
-               dt_trans_stop(env, lfsck->li_next, handle);
+       lad->lad_prefetched++;
+       spin_unlock(&lad->lad_lock);
+       if (wakeup)
+               wake_up_all(&lad->lad_thread.t_ctl_waitq);
 
 
-out:
-       if (rc < 0) {
-               CDEBUG(D_LFSCK, "%s: namespace LFSCK exec_dir failed, "
-                      "parent "DFID", child name %.*s, child FID "DFID
-                      ": rc = %d\n", lfsck_lfsck2name(lfsck), PFID(pfid),
-                      ent->lde_namelen, ent->lde_name, PFID(cfid), rc);
-
-               ns->ln_items_failed++;
-               if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
-                       lfsck_pos_fill(env, lfsck,
-                                      &ns->ln_pos_first_inconsistent, false);
-               if (!(bk->lb_param & LPF_FAILOUT))
-                       rc = 0;
-       } else {
-               if (repaired) {
-                       ns->ln_items_repaired++;
-                       if (bk->lb_param & LPF_DRYRUN &&
-                           lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
-                               lfsck_pos_fill(env, lfsck,
-                                              &ns->ln_pos_first_inconsistent,
-                                              false);
-               } else {
-                       com->lc_journal = 0;
-               }
-               rc = 0;
-       }
+       down_write(&com->lc_sem);
+       com->lc_new_checked++;
        up_write(&com->lc_sem);
        up_write(&com->lc_sem);
-       return rc;
+
+       return 0;
 }
 
 static int lfsck_namespace_post(const struct lu_env *env,
 }
 
 static int lfsck_namespace_post(const struct lu_env *env,
@@ -1079,11 +969,14 @@ static int lfsck_namespace_post(const struct lu_env *env,
        struct lfsck_instance   *lfsck = com->lc_lfsck;
        struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      rc;
        struct lfsck_instance   *lfsck = com->lc_lfsck;
        struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      rc;
+       ENTRY;
+
+       lfsck_post_generic(env, com, &result);
 
        down_write(&com->lc_sem);
        spin_lock(&lfsck->li_lock);
        if (!init)
 
        down_write(&com->lc_sem);
        spin_lock(&lfsck->li_lock);
        if (!init)
-               ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
+               ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
        if (result > 0) {
                ns->ln_status = LS_SCANNING_PHASE2;
                ns->ln_flags |= LF_SCANNED_ONCE;
        if (result > 0) {
                ns->ln_status = LS_SCANNING_PHASE2;
                ns->ln_flags |= LF_SCANNED_ONCE;
@@ -1119,7 +1012,7 @@ static int lfsck_namespace_post(const struct lu_env *env,
        CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
               lfsck_lfsck2name(lfsck), rc);
 
        CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
               lfsck_lfsck2name(lfsck), rc);
 
-       return rc;
+       RETURN(rc);
 }
 
 static int
 }
 
 static int
@@ -1365,37 +1258,420 @@ out:
        return 0;
 }
 
        return 0;
 }
 
-static int lfsck_namespace_double_scan_main(void *args)
+static int lfsck_namespace_double_scan(const struct lu_env *env,
+                                      struct lfsck_component *com)
 {
 {
-       struct lfsck_thread_args *lta   = args;
-       const struct lu_env     *env    = &lta->lta_env;
-       struct lfsck_component  *com    = lta->lta_com;
-       struct lfsck_instance   *lfsck  = com->lc_lfsck;
-       struct ptlrpc_thread    *thread = &lfsck->li_thread;
-       struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace  *ns     = com->lc_file_ram;
-       struct dt_object        *obj    = com->lc_obj;
-       const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
-       struct dt_object        *target;
-       struct dt_it            *di;
-       struct dt_key           *key;
-       struct lu_fid            fid;
-       int                      rc;
-       __u8                     flags = 0;
-       ENTRY;
+       struct lfsck_namespace *ns = com->lc_file_ram;
 
 
-       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
-              lfsck_lfsck2name(lfsck));
+       return lfsck_double_scan_generic(env, com, ns->ln_status);
+}
 
 
-       com->lc_new_checked = 0;
-       com->lc_new_scanned = 0;
-       com->lc_time_last_checkpoint = cfs_time_current();
-       com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
-                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+/**
+ * Detach and free the assistant data of the namespace LFSCK component.
+ *
+ * The assistant thread must be in the init or stopped state and its request
+ * list must be empty (both asserted). Every MDT target descriptor still
+ * linked on the phase1, phase2 or MDT lists is unlinked under ltd_lock
+ * before the lfsck_assistant_data itself is freed.
+ *
+ * \param[in] env      execution environment (not used by this function)
+ * \param[in] com      component whose lc_data is released and set to NULL
+ */
+static void lfsck_namespace_data_release(const struct lu_env *env,
+                                        struct lfsck_component *com)
+{
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
+       struct lfsck_tgt_desc           *ltd;
+
+       LASSERT(lad != NULL);
+       LASSERT(thread_is_init(&lad->lad_thread) ||
+               thread_is_stopped(&lad->lad_thread));
+       LASSERT(list_empty(&lad->lad_req_list));
+
+       com->lc_data = NULL;
+
+       spin_lock(&ltds->ltd_lock);
+
+       /* Drain each list from its head until nothing is left on it. */
+       while (!list_empty(&lad->lad_mdt_phase1_list)) {
+               ltd = list_entry(lad->lad_mdt_phase1_list.next,
+                                struct lfsck_tgt_desc,
+                                ltd_namespace_phase_list);
+               list_del_init(&ltd->ltd_namespace_phase_list);
+       }
+
+       while (!list_empty(&lad->lad_mdt_phase2_list)) {
+               ltd = list_entry(lad->lad_mdt_phase2_list.next,
+                                struct lfsck_tgt_desc,
+                                ltd_namespace_phase_list);
+               list_del_init(&ltd->ltd_namespace_phase_list);
+       }
+
+       while (!list_empty(&lad->lad_mdt_list)) {
+               ltd = list_entry(lad->lad_mdt_list.next,
+                                struct lfsck_tgt_desc,
+                                ltd_namespace_list);
+               list_del_init(&ltd->ltd_namespace_list);
+       }
+
+       spin_unlock(&ltds->ltd_lock);
+
+       OBD_FREE_PTR(lad);
+}
+
+/**
+ * Handle a namespace LFSCK notification sent by a peer MDT.
+ *
+ * Only LE_PHASE1_DONE, LE_PHASE2_DONE and LE_PEER_EXIT are accepted. The
+ * sending target is first taken off its phase list. If it reports a positive
+ * status for LE_PHASE1_DONE it is moved to the phase2 list (and put on the
+ * MDT list if it was not there); in every other case it is marked done and
+ * dropped from the MDT list. A failed phase1 or a peer exit counts as a
+ * failure: with LPF_FAILOUT set the local LFSCK is stopped, otherwise the
+ * scan is flagged LF_INCOMPLETE. When nothing has to be stopped, the
+ * assistant thread is woken up if lfsck_phase2_next_ready() says so.
+ *
+ * \param[in] env      execution environment
+ * \param[in] com      namespace LFSCK component
+ * \param[in] lr       notification received from the peer
+ *
+ * \retval             0 if the event has been handled
+ * \retval             -EINVAL for an unsupported event
+ * \retval             -ENXIO if lr_index does not match a known MDT target
+ */
+static int lfsck_namespace_in_notify(const struct lu_env *env,
+                                    struct lfsck_component *com,
+                                    struct lfsck_request *lr)
+{
+       struct lfsck_instance           *lfsck = com->lc_lfsck;
+       struct lfsck_assistant_data     *lad   = com->lc_data;
+       struct lfsck_namespace          *ns    = com->lc_file_ram;
+       struct lfsck_tgt_descs          *ltds  = &lfsck->li_mdt_descs;
+       struct lfsck_tgt_desc           *ltd;
+       bool                             fail  = false;
+       ENTRY;
+
+       switch (lr->lr_event) {
+       case LE_PHASE1_DONE:
+       case LE_PHASE2_DONE:
+       case LE_PEER_EXIT:
+               break;
+       default:
+               RETURN(-EINVAL);
+       }
+
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
+              "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event,
+              lr->lr_index, lr->lr_status);
+
+       spin_lock(&ltds->ltd_lock);
+       ltd = LTD_TGT(ltds, lr->lr_index);
+       if (ltd == NULL) {
+               spin_unlock(&ltds->ltd_lock);
+
+               RETURN(-ENXIO);
+       }
+
+       list_del_init(&ltd->ltd_namespace_phase_list);
+       if (lr->lr_event == LE_PHASE1_DONE && lr->lr_status > 0) {
+               /* The peer completed phase1: track it for phase2. */
+               if (list_empty(&ltd->ltd_namespace_list))
+                       list_add_tail(&ltd->ltd_namespace_list,
+                                     &lad->lad_mdt_list);
+               list_add_tail(&ltd->ltd_namespace_phase_list,
+                             &lad->lad_mdt_phase2_list);
+       } else {
+               /* Any other outcome retires the target from this scan. */
+               ltd->ltd_namespace_done = 1;
+               list_del_init(&ltd->ltd_namespace_list);
+
+               if (lr->lr_event == LE_PHASE1_DONE) {
+                       CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
+                              "phase1 for namespace LFSCK: rc = %d.\n",
+                              lfsck_lfsck2name(lfsck),
+                              ltd->ltd_index, lr->lr_status);
+                       ns->ln_flags |= LF_INCOMPLETE;
+                       fail = true;
+               } else if (lr->lr_event == LE_PEER_EXIT) {
+                       fail = true;
+                       if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
+                               CDEBUG(D_LFSCK,
+                                      "%s: the peer MDT %x exit namespace LFSCK\n",
+                                      lfsck_lfsck2name(lfsck), ltd->ltd_index);
+                               ns->ln_flags |= LF_INCOMPLETE;
+                       }
+               }
+       }
+       spin_unlock(&ltds->ltd_lock);
+
+       if (fail && (lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
+               struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
+
+               /* Fail-out mode: a peer failure stops the local LFSCK. */
+               memset(stop, 0, sizeof(*stop));
+               stop->ls_status = lr->lr_status;
+               stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
+               lfsck_stop(env, lfsck->li_bottom, stop);
+       } else if (lfsck_phase2_next_ready(lad)) {
+               wake_up_all(&lad->lad_thread.t_ctl_waitq);
+       }
+
+       RETURN(0);
+}
+
+/**
+ * Report the current status of the namespace LFSCK.
+ *
+ * \param[in] env      execution environment (not used by this function)
+ * \param[in] com      namespace LFSCK component
+ *
+ * \retval             ln_status of the in-RAM lfsck_namespace (lc_file_ram)
+ */
+static int lfsck_namespace_query(const struct lu_env *env,
+                                struct lfsck_component *com)
+{
+       return ((struct lfsck_namespace *)com->lc_file_ram)->ln_status;
+}
+
+static struct lfsck_operations lfsck_namespace_ops = { /* method table of
+                                                       * the namespace
+                                                       * LFSCK component */
+       .lfsck_reset            = lfsck_namespace_reset,
+       .lfsck_fail             = lfsck_namespace_fail,
+       .lfsck_checkpoint       = lfsck_namespace_checkpoint,
+       .lfsck_prep             = lfsck_namespace_prep,
+       .lfsck_exec_oit         = lfsck_namespace_exec_oit,
+       .lfsck_exec_dir         = lfsck_namespace_exec_dir, /* queues the entry
+                                                            * for the assistant
+                                                            * thread */
+       .lfsck_post             = lfsck_namespace_post,
+       .lfsck_dump             = lfsck_namespace_dump,
+       .lfsck_double_scan      = lfsck_namespace_double_scan, /* wraps
+                                               * lfsck_double_scan_generic() */
+       .lfsck_data_release     = lfsck_namespace_data_release,
+       .lfsck_quit             = lfsck_quit_generic, /* generic, not
+                                                      * namespace-specific */
+       .lfsck_in_notify        = lfsck_namespace_in_notify,
+       .lfsck_query            = lfsck_namespace_query,
+};
+/*
+ * Phase-1 request handler of the namespace LFSCK assistant: verify one
+ * queued name entry against the linkEA of the object it references and,
+ * unless running in dry-run mode, repair the linkEA when they disagree.
+ *
+ * Outline of the visible logic:
+ *  - a request flagged LUDA_UPGRADE/LUDA_REPAIR is counted as a repaired
+ *    directory entry;
+ *  - "." / ".." (and dot-prefixed names whose FID is in the dot sequence)
+ *    are skipped;
+ *  - a name entry whose target object does not exist is not repaired here
+ *    (dangling entries are left for a later patch);
+ *  - the linkEA is read; if it is missing (-ENODATA), corrupt (-EINVAL) or
+ *    does not hold exactly the expected <name, parent FID> record, it is
+ *    rebuilt/extended inside a transaction with the object write-locked;
+ *  - objects with several links, or whose linkEA count needs comparing with
+ *    nlink, are passed on to lfsck_namespace_update();
+ *  - statistics in the lfsck_namespace are updated under com->lc_sem.
+ *
+ * \param[in] env      execution environment
+ * \param[in] com      the namespace LFSCK component
+ * \param[in] lar      the generic assistant request; it is embedded in a
+ *                     struct lfsck_namespace_req (recovered via lnr_lar)
+ *
+ * \retval             0 on success, and also on failure unless LPF_FAILOUT
+ *                     is set in the bookmark parameters
+ * \retval             negative error number on failure with LPF_FAILOUT set
+ */
+static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
+                                               struct lfsck_component *com,
+                                               struct lfsck_assistant_req *lar)
+{
+       struct lfsck_thread_info   *info     = lfsck_env_info(env);
+       struct lu_attr             *la       = &info->lti_la;
+       struct lfsck_instance      *lfsck    = com->lc_lfsck;
+       struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
+       struct lfsck_namespace     *ns       = com->lc_file_ram;
+       struct linkea_data          ldata    = { 0 };
+       const struct lu_name       *cname;
+       struct thandle             *handle   = NULL;
+       struct lfsck_namespace_req *lnr      =
+                       container_of0(lar, struct lfsck_namespace_req, lnr_lar);
+       struct dt_object           *dir      = lnr->lnr_obj;
+       struct dt_object           *obj      = NULL;
+       const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
+       bool                        repaired = false;
+       bool                        locked   = false;
+       bool                        remove;
+       bool                        newdata;
+       bool                        log      = false;
+       int                         count    = 0;
+       int                         rc;
+       ENTRY;
+       /* A request flagged LUDA_UPGRADE or LUDA_REPAIR is accounted as a
+        * repaired directory entry before anything else is checked. */
+       if (lnr->lnr_attr & LUDA_UPGRADE) {
+               ns->ln_flags |= LF_UPGRADE;
+               ns->ln_dirent_repaired++;
+               repaired = true;
+       } else if (lnr->lnr_attr & LUDA_REPAIR) {
+               ns->ln_flags |= LF_INCONSISTENT;
+               ns->ln_dirent_repaired++;
+               repaired = true;
+       }
+       /* Skip ".", "..", and any other dot-prefixed name whose FID is in
+        * the dot sequence; their linkEA is not verified. */
+       if (lnr->lnr_name[0] == '.' &&
+           (lnr->lnr_namelen == 1 ||
+            (lnr->lnr_namelen == 2 && lnr->lnr_name[1] == '.') ||
+            fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
+               GOTO(out, rc = 0);
+
+       obj = lfsck_object_find(env, lfsck, &lnr->lnr_fid);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       if (dt_object_exists(obj) == 0) {
+               rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               /* XXX: dangling name entry; it will be handled in another
+                * patch. */
+               GOTO(out, rc);
+       }
+
+       cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
+       if (!(bk->lb_param & LPF_DRYRUN) &&
+           (com->lc_journal || repaired)) {
+               /* Also re-entered through "goto again" from the nodata path
+                * when a repair is needed but no transaction was opened on
+                * the first pass; nothing is locked at that point. */
+again:
+               LASSERT(!locked);
+
+               com->lc_journal = 1;
+               handle = dt_trans_create(env, lfsck->li_next);
+               if (IS_ERR(handle))
+                       GOTO(out, rc = PTR_ERR(handle));
+
+               rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               rc = dt_trans_start(env, lfsck->li_next, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               dt_write_lock(env, obj, MOR_TGT_CHILD);
+               locked = true;
+       }
+       /* NOTE(review): presumably re-validates that the name entry is still
+        * present in the parent now that the object may be locked; confirm
+        * against lfsck_namespace_check_exist().  Any non-zero result ends
+        * the handling of this entry. */
+       rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
+       if (rc != 0)
+               GOTO(stop, rc);
+       /* Classify the linkEA: 0 means it was read, -EINVAL means it is
+        * corrupt, -ENODATA means it is absent; anything else is fatal for
+        * this entry. */
+       rc = lfsck_links_read(env, obj, &ldata);
+       if (rc == 0) {
+               count = ldata.ld_leh->leh_reccount;
+               rc = linkea_links_find(&ldata, cname, pfid);
+               if ((rc == 0) &&
+                   (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
+                       goto record;
+
+               ns->ln_flags |= LF_INCONSISTENT;
+               /* For a directory, if there is more than one linkEA entry, or
+                * the linkEA entry does not match the name entry, then remove
+                * all of them and add the correct one. */
+               if (S_ISDIR(lfsck_object_type(obj))) {
+                       remove = true;
+                       newdata = true;
+               } else {
+                       remove = false;
+                       newdata = false;
+               }
+               goto nodata;
+       } else if (unlikely(rc == -EINVAL)) {
+               count = 1;
+               ns->ln_flags |= LF_INCONSISTENT;
+               /* The linkEA magic is corrupted; we cannot tell whether more
+                * of the linkEA is corrupt, so remove all linkEA entries. */
+               remove = true;
+               newdata = true;
+               goto nodata;
+       } else if (rc == -ENODATA) {
+               count = 1;
+               ns->ln_flags |= LF_UPGRADE;
+               remove = false;
+               newdata = true;
+               /* Common repair path; "remove" and "newdata" have been set by
+                * whichever case above led here. */
+nodata:
+               if (bk->lb_param & LPF_DRYRUN) {
+                       ns->ln_linkea_repaired++;
+                       repaired = true;
+                       log = true;
+                       goto record;
+               }
+               /* No transaction was opened on the first pass: go back and
+                * start one.  lc_journal is set at "again", so this jump is
+                * not taken a second time for the same entry. */
+               if (!com->lc_journal)
+                       goto again;
+
+               if (remove) {
+                       LASSERT(newdata);
+
+                       rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
+                                         BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(stop, rc);
+               }
+
+               if (newdata) {
+                       rc = linkea_data_new(&ldata,
+                                       &lfsck_env_info(env)->lti_linkea_buf);
+                       if (rc != 0)
+                               GOTO(stop, rc);
+               }
+
+               rc = linkea_add_buf(&ldata, cname, pfid);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               rc = lfsck_links_write(env, obj, &ldata, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               count = ldata.ld_leh->leh_reccount;
+               ns->ln_linkea_repaired++;
+               repaired = true;
+               log = true;
+       } else {
+               GOTO(stop, rc);
+       }
+       /* Here "count" is the linkEA record count read or written above, or
+        * 1 when the linkEA was absent or corrupt in dry-run mode. */
+record:
+       LASSERT(count > 0);
+
+       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       if ((count == 1) &&
+           (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
+               /* The usual case: a singly-linked object or a directory;
+                * nothing more to do. */
+               GOTO(stop, rc);
+
+       /* The following modification is done in a separate transaction. */
+       if (handle != NULL) {
+               LASSERT(dt_write_locked(env, obj));
+
+               dt_write_unlock(env, obj);
+               locked = false;
+
+               dt_trans_stop(env, lfsck->li_next, handle);
+               handle = NULL;
+       }
+       /* Hand the object to lfsck_namespace_update(), flagging
+        * LLF_UNMATCH_NLINKS when the linkEA count differs from nlink.
+        * NOTE(review): presumably this records the FID for the phase-2
+        * scan; confirm against lfsck_namespace_update(). */
+       ns->ln_mlinked_checked++;
+       rc = lfsck_namespace_update(env, com, &lnr->lnr_fid,
+                       count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
+
+       GOTO(out, rc);
+
+stop:
+       if (locked)
+               dt_write_unlock(env, obj);
+
+       if (handle != NULL)
+               dt_trans_stop(env, lfsck->li_next, handle);
+       /* Update the statistics under lc_sem.  A failure is propagated to
+        * the caller only when LPF_FAILOUT is set; otherwise rc becomes 0. */
+out:
+       down_write(&com->lc_sem);
+       if (rc < 0) {
+               CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
+                      "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
+                      PFID(lfsck_dto2fid(lnr->lnr_obj)),
+                      lnr->lnr_namelen, lnr->lnr_name, rc);
+
+               lfsck_namespace_record_failure(env, lfsck, ns);
+               if (!(bk->lb_param & LPF_FAILOUT))
+                       rc = 0;
+       } else {
+               if (log)
+                       CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
+                              "repaired the entry: "DFID", parent "DFID
+                              ", name %.*s\n", lfsck_lfsck2name(lfsck),
+                              PFID(&lnr->lnr_fid),
+                              PFID(lfsck_dto2fid(lnr->lnr_obj)),
+                              lnr->lnr_namelen, lnr->lnr_name);
+
+               if (repaired) {
+                       ns->ln_items_repaired++;
+                       if (bk->lb_param & LPF_DRYRUN &&
+                           lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
+                               lfsck_pos_fill(env, lfsck,
+                                              &ns->ln_pos_first_inconsistent,
+                                              false);
+               } else {
+                       com->lc_journal = 0;
+               }
+               rc = 0;
+       }
+       up_write(&com->lc_sem);
+
+       if (obj != NULL && !IS_ERR(obj))
+               lfsck_object_put(env, obj);
+       return rc;
+}
+
+static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
+                                               struct lfsck_component *com)
+{
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct ptlrpc_thread    *thread = &lfsck->li_thread;
+       struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
+       struct lfsck_namespace  *ns     = com->lc_file_ram;
+       struct dt_object        *obj    = com->lc_obj;
+       const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
+       struct dt_object        *target;
+       struct dt_it            *di;
+       struct dt_key           *key;
+       struct lu_fid            fid;
+       int                      rc;
+       __u8                     flags = 0;
+       ENTRY;
+
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
+              lfsck_lfsck2name(lfsck));
+
+       com->lc_new_checked = 0;
+       com->lc_new_scanned = 0;
+       com->lc_time_last_checkpoint = cfs_time_current();
+       com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
 
        di = iops->init(env, obj, 0, BYPASS_CAPA);
        if (IS_ERR(di))
 
        di = iops->init(env, obj, 0, BYPASS_CAPA);
        if (IS_ERR(di))
-               GOTO(out, rc = PTR_ERR(di));
+               RETURN(PTR_ERR(di));
 
        fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
        rc = iops->get(env, di, (const struct dt_key *)&fid);
 
        fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
        rc = iops->get(env, di, (const struct dt_key *)&fid);
@@ -1407,9 +1683,6 @@ static int lfsck_namespace_double_scan_main(void *args)
        if (rc != 0)
                GOTO(put, rc);
 
        if (rc != 0)
                GOTO(put, rc);
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
-               GOTO(put, rc = 0);
-
        do {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
                    cfs_fail_val > 0) {
        do {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
                    cfs_fail_val > 0) {
@@ -1497,8 +1770,34 @@ put:
 
 fini:
        iops->fini(env, di);
 
 fini:
        iops->fini(env, di);
+       return rc;
+}
+
+/*
+ * Fill \a pos from the first request on the assistant's request list.
+ *
+ * When lad_req_list is empty \a pos is left untouched.  Otherwise the OIT
+ * cookie, the directory cookie minus one, and the FID of the request's
+ * lnr_obj are copied into it.
+ *
+ * \param[in] env      execution environment (not used by this function)
+ * \param[in] com      the namespace LFSCK component; com->lc_data is the
+ *                     assistant data holding the request list
+ * \param[out] pos     the position to be filled
+ */
+static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
+                                              struct lfsck_component *com,
+                                              struct lfsck_position *pos)
+{
+       struct lfsck_assistant_data     *lad  = com->lc_data;
+       struct list_head                *head = &lad->lad_req_list;
+       struct lfsck_namespace_req      *first;
+
+       if (!list_empty(head)) {
+               first = list_entry(head->next, struct lfsck_namespace_req,
+                                  lnr_lar.lar_list);
+
+               pos->lp_dir_parent = *lfsck_dto2fid(first->lnr_obj);
+               pos->lp_dir_cookie = first->lnr_dir_cookie - 1;
+               pos->lp_oit_cookie = first->lnr_oit_cookie;
+       }
+}
+
+static int lfsck_namespace_double_scan_result(const struct lu_env *env,
+                                             struct lfsck_component *com,
+                                             int rc)
+{
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct lfsck_namespace  *ns     = com->lc_file_ram;
 
 
-out:
        down_write(&com->lc_sem);
        ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
        down_write(&com->lc_sem);
        ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
@@ -1508,8 +1807,11 @@ out:
 
        if (rc > 0) {
                com->lc_journal = 0;
 
        if (rc > 0) {
                com->lc_journal = 0;
-               ns->ln_status = LS_COMPLETED;
-               if (!(bk->lb_param & LPF_DRYRUN))
+               if (ns->ln_flags & LF_INCOMPLETE)
+                       ns->ln_status = LS_PARTIAL;
+               else
+                       ns->ln_status = LS_COMPLETED;
+               if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
                        ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
                ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
                ns->ln_success_count++;
                        ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
                ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
                ns->ln_success_count++;
@@ -1521,80 +1823,18 @@ out:
                ns->ln_status = LS_FAILED;
        }
 
                ns->ln_status = LS_FAILED;
        }
 
-       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan finished, status %d: "
-             "rc = %d\n", lfsck_lfsck2name(lfsck), ns->ln_status, rc);
-
        rc = lfsck_namespace_store(env, com, false);
        up_write(&com->lc_sem);
        rc = lfsck_namespace_store(env, com, false);
        up_write(&com->lc_sem);
-       if (atomic_dec_and_test(&lfsck->li_double_scan_count))
-               wake_up_all(&thread->t_ctl_waitq);
-
-       lfsck_thread_args_fini(lta);
 
        return rc;
 }
 
 
        return rc;
 }
 
-static int lfsck_namespace_double_scan(const struct lu_env *env,
-                                      struct lfsck_component *com)
-{
-       struct lfsck_instance           *lfsck = com->lc_lfsck;
-       struct lfsck_namespace          *ns    = com->lc_file_ram;
-       struct lfsck_thread_args        *lta;
-       struct task_struct              *task;
-       int                              rc;
-       ENTRY;
-
-       if (unlikely(ns->ln_status != LS_SCANNING_PHASE2))
-               RETURN(0);
-
-       lta = lfsck_thread_args_init(lfsck, com, NULL);
-       if (IS_ERR(lta))
-               GOTO(out, rc = PTR_ERR(lta));
-
-       atomic_inc(&lfsck->li_double_scan_count);
-       task = kthread_run(lfsck_namespace_double_scan_main, lta,
-                          "lfsck_namespace");
-       if (IS_ERR(task)) {
-               atomic_dec(&lfsck->li_double_scan_count);
-               lfsck_thread_args_fini(lta);
-               GOTO(out, rc = PTR_ERR(task));
-       }
-
-       RETURN(0);
-
-out:
-       CERROR("%s: cannot start LFSCK namespace thread: rc = %d\n",
-              lfsck_lfsck2name(lfsck), rc);
-       return rc;
-}
-
-static int lfsck_namespace_in_notify(const struct lu_env *env,
-                                    struct lfsck_component *com,
-                                    struct lfsck_request *lr)
-{
-       return 0;
-}
-
-static int lfsck_namespace_query(const struct lu_env *env,
-                                struct lfsck_component *com)
-{
-       struct lfsck_namespace *ns = com->lc_file_ram;
-
-       return ns->ln_status;
-}
-
-static struct lfsck_operations lfsck_namespace_ops = {
-       .lfsck_reset            = lfsck_namespace_reset,
-       .lfsck_fail             = lfsck_namespace_fail,
-       .lfsck_checkpoint       = lfsck_namespace_checkpoint,
-       .lfsck_prep             = lfsck_namespace_prep,
-       .lfsck_exec_oit         = lfsck_namespace_exec_oit,
-       .lfsck_exec_dir         = lfsck_namespace_exec_dir,
-       .lfsck_post             = lfsck_namespace_post,
-       .lfsck_dump             = lfsck_namespace_dump,
-       .lfsck_double_scan      = lfsck_namespace_double_scan,
-       .lfsck_in_notify        = lfsck_namespace_in_notify,
-       .lfsck_query            = lfsck_namespace_query,
+/*
+ * Callbacks of the namespace LFSCK assistant; lfsck_namespace_setup()
+ * passes this table to lfsck_assistant_data_init().  The entries are
+ * designated initializers, listed alphabetically by member name.
+ */
+struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
+       .la_double_scan_result  = lfsck_namespace_double_scan_result,
+       .la_fill_pos            = lfsck_namespace_assistant_fill_pos,
+       .la_handler_p1          = lfsck_namespace_assistant_handler_p1,
+       .la_handler_p2          = lfsck_namespace_assistant_handler_p2,
+       .la_req_fini            = lfsck_namespace_assistant_req_fini,
 };
 
 /**
 };
 
 /**
@@ -1900,6 +2140,12 @@ int lfsck_namespace_setup(const struct lu_env *env,
        com->lc_lfsck = lfsck;
        com->lc_type = LFSCK_TYPE_NAMESPACE;
        com->lc_ops = &lfsck_namespace_ops;
        com->lc_lfsck = lfsck;
        com->lc_type = LFSCK_TYPE_NAMESPACE;
        com->lc_ops = &lfsck_namespace_ops;
+       com->lc_data = lfsck_assistant_data_init(
+                       &lfsck_namespace_assistant_ops,
+                       "lfsck_namespace");
+       if (com->lc_data == NULL)
+               GOTO(out, rc = -ENOMEM);
+
        com->lc_file_size = sizeof(struct lfsck_namespace);
        OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
        if (com->lc_file_ram == NULL)
        com->lc_file_size = sizeof(struct lfsck_namespace);
        OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
        if (com->lc_file_ram == NULL)