Whamcloud - gitweb
LU-4788 lfsck: namespace LFSCK uses assistant thread
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
index dc4bb5e..6bd6dee 100644 (file)
 
 #define LFSCK_NAMESPACE_MAGIC  0xA0629D03
 
+enum lfsck_nameentry_check {
+       LFSCK_NAMEENTRY_DEAD            = 1, /* The object has been unlinked. */
+       LFSCK_NAMEENTRY_REMOVED         = 2, /* The entry has been removed. */
+       LFSCK_NAMEENTRY_RECREATED       = 3, /* The entry has been recreated. */
+};
+
 static const char lfsck_namespace_name[] = "lfsck_namespace";
 
+struct lfsck_namespace_req {
+       struct lfsck_assistant_req       lnr_lar;
+       struct dt_object                *lnr_obj;
+       struct lu_fid                    lnr_fid;
+       __u64                            lnr_oit_cookie;
+       __u64                            lnr_dir_cookie;
+       __u32                            lnr_attr;
+       __u32                            lnr_size;
+       __u16                            lnr_type;
+       __u16                            lnr_namelen;
+       char                             lnr_name[0];
+};
+
+static struct lfsck_namespace_req *
+lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
+                                  struct lu_dirent *ent, __u16 type)
+{
+       struct lfsck_namespace_req *lnr;
+       int                         size;
+
+       size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
+       OBD_ALLOC(lnr, size);
+       if (lnr == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
+       lu_object_get(&lfsck->li_obj_dir->do_lu);
+       lnr->lnr_obj = lfsck->li_obj_dir;
+       lnr->lnr_fid = ent->lde_fid;
+       lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
+       lnr->lnr_dir_cookie = ent->lde_hash;
+       lnr->lnr_attr = ent->lde_attrs;
+       lnr->lnr_size = size;
+       lnr->lnr_type = type;
+       lnr->lnr_namelen = ent->lde_namelen;
+       memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
+
+       return lnr;
+}
+
+static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
+                                              struct lfsck_assistant_req *lar)
+{
+       struct lfsck_namespace_req *lnr =
+                       container_of0(lar, struct lfsck_namespace_req, lnr_lar);
+
+       lu_object_put(env, &lnr->lnr_obj->do_lu);
+       OBD_FREE(lnr, lnr->lnr_size);
+}
+
 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
                                      struct lfsck_namespace *src)
 {
@@ -117,6 +173,27 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
        dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
 }
 
+static void lfsck_namespace_record_failure(const struct lu_env *env,
+                                          struct lfsck_instance *lfsck,
+                                          struct lfsck_namespace *ns)
+{
+       struct lfsck_position pos;
+
+       ns->ln_items_failed++;
+       lfsck_pos_fill(env, lfsck, &pos, false);
+       if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
+           lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
+               ns->ln_pos_first_inconsistent = pos;
+
+               CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
+                      "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
+                      lfsck_lfsck2name(lfsck),
+                      ns->ln_pos_first_inconsistent.lp_oit_cookie,
+                      PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
+                      ns->ln_pos_first_inconsistent.lp_dir_cookie);
+       }
+}
+
 /**
  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
  * \retval 0: succeed.
@@ -325,10 +402,9 @@ out:
 }
 
 static int lfsck_namespace_check_exist(const struct lu_env *env,
-                                      struct lfsck_instance *lfsck,
+                                      struct dt_object *dir,
                                       struct dt_object *obj, const char *name)
 {
-       struct dt_object *dir = lfsck->li_obj_dir;
        struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
        int               rc;
        ENTRY;
@@ -711,18 +787,7 @@ lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
        down_write(&com->lc_sem);
        if (new_checked)
                com->lc_new_checked++;
-       ns->ln_items_failed++;
-       if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
-               lfsck_pos_fill(env, com->lc_lfsck,
-                              &ns->ln_pos_first_inconsistent, false);
-
-               CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
-                      "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
-                      lfsck_lfsck2name(com->lc_lfsck),
-                      ns->ln_pos_first_inconsistent.lp_oit_cookie,
-                      PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
-                      ns->ln_pos_first_inconsistent.lp_dir_cookie);
-       }
+       lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
        up_write(&com->lc_sem);
 }
 
@@ -733,14 +798,17 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env,
        struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      rc;
 
-       if (com->lc_new_checked == 0 && !init)
-               return 0;
+       if (!init) {
+               rc = lfsck_checkpoint_generic(env, com);
+               if (rc != 0)
+                       goto log;
+       }
 
        down_write(&com->lc_sem);
        if (init) {
-               ns->ln_pos_latest_start = lfsck->li_pos_current;
+               ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
        } else {
-               ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
+               ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
                ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
                ns->ln_time_last_checkpoint = cfs_time_current_sec();
@@ -751,13 +819,14 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env,
        rc = lfsck_namespace_store(env, com, false);
        up_write(&com->lc_sem);
 
+log:
        CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
               ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
               lfsck->li_pos_current.lp_oit_cookie,
               PFID(&lfsck->li_pos_current.lp_dir_parent),
               lfsck->li_pos_current.lp_dir_cookie, rc);
 
-       return rc;
+       return rc > 0 ? 0 : rc;
 }
 
 static int lfsck_namespace_prep(const struct lu_env *env,
@@ -767,10 +836,9 @@ static int lfsck_namespace_prep(const struct lu_env *env,
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
        struct lfsck_namespace  *ns     = com->lc_file_ram;
        struct lfsck_position   *pos    = &com->lc_pos_start;
+       int                      rc;
 
        if (ns->ln_status == LS_COMPLETED) {
-               int rc;
-
                rc = lfsck_namespace_reset(env, com, false);
                if (rc == 0)
                        rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
@@ -785,8 +853,8 @@ static int lfsck_namespace_prep(const struct lu_env *env,
 
        down_write(&com->lc_sem);
        ns->ln_time_latest_start = cfs_time_current_sec();
-
        spin_lock(&lfsck->li_lock);
+
        if (ns->ln_flags & LF_SCANNED_ONCE) {
                if (!lfsck->li_drop_dryrun ||
                    lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
@@ -829,14 +897,18 @@ static int lfsck_namespace_prep(const struct lu_env *env,
                        *pos = ns->ln_pos_first_inconsistent;
                }
        }
+
        spin_unlock(&lfsck->li_lock);
        up_write(&com->lc_sem);
 
+       rc = lfsck_start_assistant(env, com, lsp);
+
        CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
-              DFID", "LPX64"]\n", lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
-              PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
+              DFID", "LPX64"]: rc = %d\n",
+              lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
+              PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
 
-       return 0;
+       return rc;
 }
 
 static int lfsck_namespace_exec_oit(const struct lu_env *env,
@@ -853,223 +925,41 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env,
 
 static int lfsck_namespace_exec_dir(const struct lu_env *env,
                                    struct lfsck_component *com,
-                                   struct dt_object *obj,
-                                   struct lu_dirent *ent)
+                                   struct lu_dirent *ent, __u16 type)
 {
-       struct lfsck_thread_info   *info     = lfsck_env_info(env);
-       struct lu_attr             *la       = &info->lti_la;
-       struct lfsck_instance      *lfsck    = com->lc_lfsck;
-       struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace     *ns       = com->lc_file_ram;
-       struct linkea_data          ldata    = { 0 };
-       const struct lu_fid        *pfid     = lfsck_dto2fid(lfsck->li_obj_dir);
-       const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
-       const struct lu_name       *cname;
-       struct thandle             *handle   = NULL;
-       bool                        repaired = false;
-       bool                        locked   = false;
-       bool                        remove;
-       bool                        newdata;
-       bool                        log      = false;
-       int                         count    = 0;
-       int                         rc;
-       ENTRY;
-
-       cname = lfsck_name_get_const(env, ent->lde_name, ent->lde_namelen);
-       down_write(&com->lc_sem);
-       com->lc_new_checked++;
-
-       if (ent->lde_attrs & LUDA_UPGRADE) {
-               ns->ln_flags |= LF_UPGRADE;
-               ns->ln_dirent_repaired++;
-               repaired = true;
-       } else if (ent->lde_attrs & LUDA_REPAIR) {
-               ns->ln_flags |= LF_INCONSISTENT;
-               ns->ln_dirent_repaired++;
-               repaired = true;
-       }
-
-       if (ent->lde_name[0] == '.' &&
-           (ent->lde_namelen == 1 ||
-            (ent->lde_namelen == 2 && ent->lde_name[1] == '.') ||
-            fid_seq_is_dot(fid_seq(&ent->lde_fid))))
-               GOTO(out, rc = 0);
-
-       if (!(bk->lb_param & LPF_DRYRUN) &&
-           (com->lc_journal || repaired)) {
-
-again:
-               LASSERT(!locked);
-
-               com->lc_journal = 1;
-               handle = dt_trans_create(env, lfsck->li_next);
-               if (IS_ERR(handle))
-                       GOTO(out, rc = PTR_ERR(handle));
-
-               rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               rc = dt_trans_start(env, lfsck->li_next, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               dt_write_lock(env, obj, MOR_TGT_CHILD);
-               locked = true;
-       }
-
-       rc = lfsck_namespace_check_exist(env, lfsck, obj, ent->lde_name);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       rc = lfsck_links_read(env, obj, &ldata);
-       if (rc == 0) {
-               count = ldata.ld_leh->leh_reccount;
-               rc = linkea_links_find(&ldata, cname, pfid);
-               if ((rc == 0) &&
-                   (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
-                       goto record;
-
-               ns->ln_flags |= LF_INCONSISTENT;
-               /* For dir, if there are more than one linkea entries, or the
-                * linkea entry does not match the name entry, then remove all
-                * and add the correct one. */
-               if (S_ISDIR(lfsck_object_type(obj))) {
-                       remove = true;
-                       newdata = true;
-               } else {
-                       remove = false;
-                       newdata = false;
-               }
-               goto nodata;
-       } else if (unlikely(rc == -EINVAL)) {
-               count = 1;
-               ns->ln_flags |= LF_INCONSISTENT;
-               /* The magic crashed, we are not sure whether there are more
-                * corrupt data in the linkea, so remove all linkea entries. */
-               remove = true;
-               newdata = true;
-               goto nodata;
-       } else if (rc == -ENODATA) {
-               count = 1;
-               ns->ln_flags |= LF_UPGRADE;
-               remove = false;
-               newdata = true;
-
-nodata:
-               if (bk->lb_param & LPF_DRYRUN) {
-                       ns->ln_linkea_repaired++;
-                       log = true;
-                       repaired = true;
-                       goto record;
-               }
-
-               if (!com->lc_journal)
-                       goto again;
-
-               if (remove) {
-                       LASSERT(newdata);
-
-                       rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
-                                         BYPASS_CAPA);
-                       if (rc != 0)
-                               GOTO(stop, rc);
-               }
-
-               if (newdata) {
-                       rc = linkea_data_new(&ldata,
-                                       &lfsck_env_info(env)->lti_linkea_buf);
-                       if (rc != 0)
-                               GOTO(stop, rc);
-               }
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_namespace_req      *lnr;
+       bool                             wakeup = false;
 
-               rc = linkea_add_buf(&ldata, cname, pfid);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               rc = lfsck_links_write(env, obj, &ldata, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
+       lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
+       if (IS_ERR(lnr)) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
 
-               count = ldata.ld_leh->leh_reccount;
-               ns->ln_linkea_repaired++;
-               log = true;
-               repaired = true;
-       } else {
-               GOTO(stop, rc);
+               lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
+               return PTR_ERR(lnr);
        }
 
-record:
-       LASSERT(count > 0);
-
-       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       if ((count == 1) &&
-           (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
-               /* Usually, it is for single linked object or dir, do nothing.*/
-               GOTO(stop, rc);
-
-       /* Following modification will be in another transaction.  */
-       if (handle != NULL) {
-               LASSERT(dt_write_locked(env, obj));
-
-               dt_write_unlock(env, obj);
-               locked = false;
-
-               dt_trans_stop(env, lfsck->li_next, handle);
-               handle = NULL;
-
-               if (log)
-                       CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired "
-                             "linkEA for the object: "DFID", parent "
-                             DFID", name %.*s\n",
-                             lfsck_lfsck2name(lfsck), PFID(cfid), PFID(pfid),
-                             ent->lde_namelen, ent->lde_name);
+       spin_lock(&lad->lad_lock);
+       if (lad->lad_assistant_status < 0) {
+               spin_unlock(&lad->lad_lock);
+               lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
+               return lad->lad_assistant_status;
        }
 
-       ns->ln_mlinked_checked++;
-       rc = lfsck_namespace_update(env, com, cfid,
-                       count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
-
-       GOTO(out, rc);
-
-stop:
-       if (locked)
-               dt_write_unlock(env, obj);
+       list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
+       if (lad->lad_prefetched == 0)
+               wakeup = true;
 
-       if (handle != NULL)
-               dt_trans_stop(env, lfsck->li_next, handle);
+       lad->lad_prefetched++;
+       spin_unlock(&lad->lad_lock);
+       if (wakeup)
+               wake_up_all(&lad->lad_thread.t_ctl_waitq);
 
-out:
-       if (rc < 0) {
-               CDEBUG(D_LFSCK, "%s: namespace LFSCK exec_dir failed, "
-                      "parent "DFID", child name %.*s, child FID "DFID
-                      ": rc = %d\n", lfsck_lfsck2name(lfsck), PFID(pfid),
-                      ent->lde_namelen, ent->lde_name, PFID(cfid), rc);
-
-               ns->ln_items_failed++;
-               if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
-                       lfsck_pos_fill(env, lfsck,
-                                      &ns->ln_pos_first_inconsistent, false);
-               if (!(bk->lb_param & LPF_FAILOUT))
-                       rc = 0;
-       } else {
-               if (repaired) {
-                       ns->ln_items_repaired++;
-                       if (bk->lb_param & LPF_DRYRUN &&
-                           lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
-                               lfsck_pos_fill(env, lfsck,
-                                              &ns->ln_pos_first_inconsistent,
-                                              false);
-               } else {
-                       com->lc_journal = 0;
-               }
-               rc = 0;
-       }
+       down_write(&com->lc_sem);
+       com->lc_new_checked++;
        up_write(&com->lc_sem);
-       return rc;
+
+       return 0;
 }
 
 static int lfsck_namespace_post(const struct lu_env *env,
@@ -1079,11 +969,14 @@ static int lfsck_namespace_post(const struct lu_env *env,
        struct lfsck_instance   *lfsck = com->lc_lfsck;
        struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      rc;
+       ENTRY;
+
+       lfsck_post_generic(env, com, &result);
 
        down_write(&com->lc_sem);
        spin_lock(&lfsck->li_lock);
        if (!init)
-               ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
+               ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
        if (result > 0) {
                ns->ln_status = LS_SCANNING_PHASE2;
                ns->ln_flags |= LF_SCANNED_ONCE;
@@ -1119,7 +1012,7 @@ static int lfsck_namespace_post(const struct lu_env *env,
        CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
               lfsck_lfsck2name(lfsck), rc);
 
-       return rc;
+       RETURN(rc);
 }
 
 static int
@@ -1365,37 +1258,420 @@ out:
        return 0;
 }
 
-static int lfsck_namespace_double_scan_main(void *args)
+static int lfsck_namespace_double_scan(const struct lu_env *env,
+                                      struct lfsck_component *com)
 {
-       struct lfsck_thread_args *lta   = args;
-       const struct lu_env     *env    = &lta->lta_env;
-       struct lfsck_component  *com    = lta->lta_com;
-       struct lfsck_instance   *lfsck  = com->lc_lfsck;
-       struct ptlrpc_thread    *thread = &lfsck->li_thread;
-       struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace  *ns     = com->lc_file_ram;
-       struct dt_object        *obj    = com->lc_obj;
-       const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
-       struct dt_object        *target;
-       struct dt_it            *di;
-       struct dt_key           *key;
-       struct lu_fid            fid;
-       int                      rc;
-       __u8                     flags = 0;
-       ENTRY;
+       struct lfsck_namespace *ns = com->lc_file_ram;
 
-       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
-              lfsck_lfsck2name(lfsck));
+       return lfsck_double_scan_generic(env, com, ns->ln_status);
+}
 
-       com->lc_new_checked = 0;
-       com->lc_new_scanned = 0;
-       com->lc_time_last_checkpoint = cfs_time_current();
-       com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
-                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+static void lfsck_namespace_data_release(const struct lu_env *env,
+                                        struct lfsck_component *com)
+{
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
+       struct lfsck_tgt_desc           *ltd;
+       struct lfsck_tgt_desc           *next;
+
+       LASSERT(lad != NULL);
+       LASSERT(thread_is_init(&lad->lad_thread) ||
+               thread_is_stopped(&lad->lad_thread));
+       LASSERT(list_empty(&lad->lad_req_list));
+
+       com->lc_data = NULL;
+
+       spin_lock(&ltds->ltd_lock);
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
+                                ltd_namespace_phase_list) {
+               list_del_init(&ltd->ltd_namespace_phase_list);
+       }
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+                                ltd_namespace_phase_list) {
+               list_del_init(&ltd->ltd_namespace_phase_list);
+       }
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
+                                ltd_namespace_list) {
+               list_del_init(&ltd->ltd_namespace_list);
+       }
+       spin_unlock(&ltds->ltd_lock);
+
+       OBD_FREE_PTR(lad);
+}
+
+static int lfsck_namespace_in_notify(const struct lu_env *env,
+                                    struct lfsck_component *com,
+                                    struct lfsck_request *lr)
+{
+       struct lfsck_instance           *lfsck = com->lc_lfsck;
+       struct lfsck_namespace          *ns    = com->lc_file_ram;
+       struct lfsck_assistant_data     *lad   = com->lc_data;
+       struct lfsck_tgt_descs          *ltds  = &lfsck->li_mdt_descs;
+       struct lfsck_tgt_desc           *ltd;
+       bool                             fail  = false;
+       ENTRY;
+
+       if (lr->lr_event != LE_PHASE1_DONE &&
+           lr->lr_event != LE_PHASE2_DONE &&
+           lr->lr_event != LE_PEER_EXIT)
+               RETURN(-EINVAL);
+
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
+              "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event,
+              lr->lr_index, lr->lr_status);
+
+       spin_lock(&ltds->ltd_lock);
+       ltd = LTD_TGT(ltds, lr->lr_index);
+       if (ltd == NULL) {
+               spin_unlock(&ltds->ltd_lock);
+
+               RETURN(-ENXIO);
+       }
+
+       list_del_init(&ltd->ltd_namespace_phase_list);
+       switch (lr->lr_event) {
+       case LE_PHASE1_DONE:
+               if (lr->lr_status <= 0) {
+                       ltd->ltd_namespace_done = 1;
+                       list_del_init(&ltd->ltd_namespace_list);
+                       CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
+                              "phase1 for namespace LFSCK: rc = %d.\n",
+                              lfsck_lfsck2name(lfsck),
+                              ltd->ltd_index, lr->lr_status);
+                       ns->ln_flags |= LF_INCOMPLETE;
+                       fail = true;
+                       break;
+               }
+
+               if (list_empty(&ltd->ltd_namespace_list))
+                       list_add_tail(&ltd->ltd_namespace_list,
+                                     &lad->lad_mdt_list);
+               list_add_tail(&ltd->ltd_namespace_phase_list,
+                             &lad->lad_mdt_phase2_list);
+               break;
+       case LE_PHASE2_DONE:
+               ltd->ltd_namespace_done = 1;
+               list_del_init(&ltd->ltd_namespace_list);
+               break;
+       case LE_PEER_EXIT:
+               fail = true;
+               ltd->ltd_namespace_done = 1;
+               list_del_init(&ltd->ltd_namespace_list);
+               if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
+                       CDEBUG(D_LFSCK,
+                              "%s: the peer MDT %x exit namespace LFSCK\n",
+                              lfsck_lfsck2name(lfsck), ltd->ltd_index);
+                       ns->ln_flags |= LF_INCOMPLETE;
+               }
+               break;
+       default:
+               break;
+       }
+       spin_unlock(&ltds->ltd_lock);
+
+       if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
+               struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
+
+               memset(stop, 0, sizeof(*stop));
+               stop->ls_status = lr->lr_status;
+               stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
+               lfsck_stop(env, lfsck->li_bottom, stop);
+       } else if (lfsck_phase2_next_ready(lad)) {
+               wake_up_all(&lad->lad_thread.t_ctl_waitq);
+       }
+
+       RETURN(0);
+}
+
+static int lfsck_namespace_query(const struct lu_env *env,
+                                struct lfsck_component *com)
+{
+       struct lfsck_namespace *ns = com->lc_file_ram;
+
+       return ns->ln_status;
+}
+
+static struct lfsck_operations lfsck_namespace_ops = {
+       .lfsck_reset            = lfsck_namespace_reset,
+       .lfsck_fail             = lfsck_namespace_fail,
+       .lfsck_checkpoint       = lfsck_namespace_checkpoint,
+       .lfsck_prep             = lfsck_namespace_prep,
+       .lfsck_exec_oit         = lfsck_namespace_exec_oit,
+       .lfsck_exec_dir         = lfsck_namespace_exec_dir,
+       .lfsck_post             = lfsck_namespace_post,
+       .lfsck_dump             = lfsck_namespace_dump,
+       .lfsck_double_scan      = lfsck_namespace_double_scan,
+       .lfsck_data_release     = lfsck_namespace_data_release,
+       .lfsck_quit             = lfsck_quit_generic,
+       .lfsck_in_notify        = lfsck_namespace_in_notify,
+       .lfsck_query            = lfsck_namespace_query,
+};
+
+static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
+                                               struct lfsck_component *com,
+                                               struct lfsck_assistant_req *lar)
+{
+       struct lfsck_thread_info   *info     = lfsck_env_info(env);
+       struct lu_attr             *la       = &info->lti_la;
+       struct lfsck_instance      *lfsck    = com->lc_lfsck;
+       struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
+       struct lfsck_namespace     *ns       = com->lc_file_ram;
+       struct linkea_data          ldata    = { 0 };
+       const struct lu_name       *cname;
+       struct thandle             *handle   = NULL;
+       struct lfsck_namespace_req *lnr      =
+                       container_of0(lar, struct lfsck_namespace_req, lnr_lar);
+       struct dt_object           *dir      = lnr->lnr_obj;
+       struct dt_object           *obj      = NULL;
+       const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
+       bool                        repaired = false;
+       bool                        locked   = false;
+       bool                        remove;
+       bool                        newdata;
+       bool                        log      = false;
+       int                         count    = 0;
+       int                         rc;
+       ENTRY;
+
+       if (lnr->lnr_attr & LUDA_UPGRADE) {
+               ns->ln_flags |= LF_UPGRADE;
+               ns->ln_dirent_repaired++;
+               repaired = true;
+       } else if (lnr->lnr_attr & LUDA_REPAIR) {
+               ns->ln_flags |= LF_INCONSISTENT;
+               ns->ln_dirent_repaired++;
+               repaired = true;
+       }
+
+       if (lnr->lnr_name[0] == '.' &&
+           (lnr->lnr_namelen == 1 ||
+            (lnr->lnr_namelen == 2 && lnr->lnr_name[1] == '.') ||
+            fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
+               GOTO(out, rc = 0);
+
+       obj = lfsck_object_find(env, lfsck, &lnr->lnr_fid);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       if (dt_object_exists(obj) == 0) {
+               rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               /* XXX: dangling name entry, will handle it in other patch. */
+               GOTO(out, rc);
+       }
+
+       cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
+       if (!(bk->lb_param & LPF_DRYRUN) &&
+           (com->lc_journal || repaired)) {
+
+again:
+               LASSERT(!locked);
+
+               com->lc_journal = 1;
+               handle = dt_trans_create(env, lfsck->li_next);
+               if (IS_ERR(handle))
+                       GOTO(out, rc = PTR_ERR(handle));
+
+               rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               rc = dt_trans_start(env, lfsck->li_next, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               dt_write_lock(env, obj, MOR_TGT_CHILD);
+               locked = true;
+       }
+
+       rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = lfsck_links_read(env, obj, &ldata);
+       if (rc == 0) {
+               count = ldata.ld_leh->leh_reccount;
+               rc = linkea_links_find(&ldata, cname, pfid);
+               if ((rc == 0) &&
+                   (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
+                       goto record;
+
+               ns->ln_flags |= LF_INCONSISTENT;
+               /* For dir, if there are more than one linkea entries, or the
+                * linkea entry does not match the name entry, then remove all
+                * and add the correct one. */
+               if (S_ISDIR(lfsck_object_type(obj))) {
+                       remove = true;
+                       newdata = true;
+               } else {
+                       remove = false;
+                       newdata = false;
+               }
+               goto nodata;
+       } else if (unlikely(rc == -EINVAL)) {
+               count = 1;
+               ns->ln_flags |= LF_INCONSISTENT;
+               /* The magic crashed, we are not sure whether there are more
+                * corrupt data in the linkea, so remove all linkea entries. */
+               remove = true;
+               newdata = true;
+               goto nodata;
+       } else if (rc == -ENODATA) {
+               count = 1;
+               ns->ln_flags |= LF_UPGRADE;
+               remove = false;
+               newdata = true;
+
+nodata:
+               if (bk->lb_param & LPF_DRYRUN) {
+                       ns->ln_linkea_repaired++;
+                       repaired = true;
+                       log = true;
+                       goto record;
+               }
+
+               if (!com->lc_journal)
+                       goto again;
+
+               if (remove) {
+                       LASSERT(newdata);
+
+                       rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
+                                         BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(stop, rc);
+               }
+
+               if (newdata) {
+                       rc = linkea_data_new(&ldata,
+                                       &lfsck_env_info(env)->lti_linkea_buf);
+                       if (rc != 0)
+                               GOTO(stop, rc);
+               }
+
+               rc = linkea_add_buf(&ldata, cname, pfid);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               rc = lfsck_links_write(env, obj, &ldata, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               count = ldata.ld_leh->leh_reccount;
+               ns->ln_linkea_repaired++;
+               repaired = true;
+               log = true;
+       } else {
+               GOTO(stop, rc);
+       }
+
+record:
+       LASSERT(count > 0);
+
+       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       if ((count == 1) &&
+           (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
+               /* Usually, it is for single linked object or dir, do nothing.*/
+               GOTO(stop, rc);
+
+       /* Following modification will be in another transaction.  */
+       if (handle != NULL) {
+               LASSERT(dt_write_locked(env, obj));
+
+               dt_write_unlock(env, obj);
+               locked = false;
+
+               dt_trans_stop(env, lfsck->li_next, handle);
+               handle = NULL;
+       }
+
+       ns->ln_mlinked_checked++;
+       rc = lfsck_namespace_update(env, com, &lnr->lnr_fid,
+                       count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
+
+       GOTO(out, rc);
+
+stop:
+       if (locked)
+               dt_write_unlock(env, obj);
+
+       if (handle != NULL)
+               dt_trans_stop(env, lfsck->li_next, handle);
+
+out:
+       down_write(&com->lc_sem);
+       if (rc < 0) {
+               CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
+                      "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
+                      PFID(lfsck_dto2fid(lnr->lnr_obj)),
+                      lnr->lnr_namelen, lnr->lnr_name, rc);
+
+               lfsck_namespace_record_failure(env, lfsck, ns);
+               if (!(bk->lb_param & LPF_FAILOUT))
+                       rc = 0;
+       } else {
+               if (log)
+                       CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
+                              "repaired the entry: "DFID", parent "DFID
+                              ", name %.*s\n", lfsck_lfsck2name(lfsck),
+                              PFID(&lnr->lnr_fid),
+                              PFID(lfsck_dto2fid(lnr->lnr_obj)),
+                              lnr->lnr_namelen, lnr->lnr_name);
+
+               if (repaired) {
+                       ns->ln_items_repaired++;
+                       if (bk->lb_param & LPF_DRYRUN &&
+                           lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
+                               lfsck_pos_fill(env, lfsck,
+                                              &ns->ln_pos_first_inconsistent,
+                                              false);
+               } else {
+                       com->lc_journal = 0;
+               }
+               rc = 0;
+       }
+       up_write(&com->lc_sem);
+
+       if (obj != NULL && !IS_ERR(obj))
+               lfsck_object_put(env, obj);
+       return rc;
+}
+
+static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
+                                               struct lfsck_component *com)
+{
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct ptlrpc_thread    *thread = &lfsck->li_thread;
+       struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
+       struct lfsck_namespace  *ns     = com->lc_file_ram;
+       struct dt_object        *obj    = com->lc_obj;
+       const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
+       struct dt_object        *target;
+       struct dt_it            *di;
+       struct dt_key           *key;
+       struct lu_fid            fid;
+       int                      rc;
+       __u8                     flags = 0;
+       ENTRY;
+
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
+              lfsck_lfsck2name(lfsck));
+
+       com->lc_new_checked = 0;
+       com->lc_new_scanned = 0;
+       com->lc_time_last_checkpoint = cfs_time_current();
+       com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
 
        di = iops->init(env, obj, 0, BYPASS_CAPA);
        if (IS_ERR(di))
-               GOTO(out, rc = PTR_ERR(di));
+               RETURN(PTR_ERR(di));
 
        fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
        rc = iops->get(env, di, (const struct dt_key *)&fid);
@@ -1407,9 +1683,6 @@ static int lfsck_namespace_double_scan_main(void *args)
        if (rc != 0)
                GOTO(put, rc);
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
-               GOTO(put, rc = 0);
-
        do {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
                    cfs_fail_val > 0) {
@@ -1497,8 +1770,34 @@ put:
 
 fini:
        iops->fini(env, di);
+       return rc;
+}
+
+static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
+                                              struct lfsck_component *com,
+                                              struct lfsck_position *pos)
+{
+       struct lfsck_assistant_data     *lad = com->lc_data;
+       struct lfsck_namespace_req      *lnr;
+
+       if (list_empty(&lad->lad_req_list))
+               return;
+
+       lnr = list_entry(lad->lad_req_list.next,
+                        struct lfsck_namespace_req,
+                        lnr_lar.lar_list);
+       pos->lp_oit_cookie = lnr->lnr_oit_cookie;
+       pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
+       pos->lp_dir_parent = *lfsck_dto2fid(lnr->lnr_obj);
+}
+
+static int lfsck_namespace_double_scan_result(const struct lu_env *env,
+                                             struct lfsck_component *com,
+                                             int rc)
+{
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct lfsck_namespace  *ns     = com->lc_file_ram;
 
-out:
        down_write(&com->lc_sem);
        ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
@@ -1508,8 +1807,11 @@ out:
 
        if (rc > 0) {
                com->lc_journal = 0;
-               ns->ln_status = LS_COMPLETED;
-               if (!(bk->lb_param & LPF_DRYRUN))
+               if (ns->ln_flags & LF_INCOMPLETE)
+                       ns->ln_status = LS_PARTIAL;
+               else
+                       ns->ln_status = LS_COMPLETED;
+               if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
                        ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
                ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
                ns->ln_success_count++;
@@ -1521,80 +1823,18 @@ out:
                ns->ln_status = LS_FAILED;
        }
 
-       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan finished, status %d: "
-             "rc = %d\n", lfsck_lfsck2name(lfsck), ns->ln_status, rc);
-
        rc = lfsck_namespace_store(env, com, false);
        up_write(&com->lc_sem);
-       if (atomic_dec_and_test(&lfsck->li_double_scan_count))
-               wake_up_all(&thread->t_ctl_waitq);
-
-       lfsck_thread_args_fini(lta);
 
        return rc;
 }
 
-static int lfsck_namespace_double_scan(const struct lu_env *env,
-                                      struct lfsck_component *com)
-{
-       struct lfsck_instance           *lfsck = com->lc_lfsck;
-       struct lfsck_namespace          *ns    = com->lc_file_ram;
-       struct lfsck_thread_args        *lta;
-       struct task_struct              *task;
-       int                              rc;
-       ENTRY;
-
-       if (unlikely(ns->ln_status != LS_SCANNING_PHASE2))
-               RETURN(0);
-
-       lta = lfsck_thread_args_init(lfsck, com, NULL);
-       if (IS_ERR(lta))
-               GOTO(out, rc = PTR_ERR(lta));
-
-       atomic_inc(&lfsck->li_double_scan_count);
-       task = kthread_run(lfsck_namespace_double_scan_main, lta,
-                          "lfsck_namespace");
-       if (IS_ERR(task)) {
-               atomic_dec(&lfsck->li_double_scan_count);
-               lfsck_thread_args_fini(lta);
-               GOTO(out, rc = PTR_ERR(task));
-       }
-
-       RETURN(0);
-
-out:
-       CERROR("%s: cannot start LFSCK namespace thread: rc = %d\n",
-              lfsck_lfsck2name(lfsck), rc);
-       return rc;
-}
-
-static int lfsck_namespace_in_notify(const struct lu_env *env,
-                                    struct lfsck_component *com,
-                                    struct lfsck_request *lr)
-{
-       return 0;
-}
-
-static int lfsck_namespace_query(const struct lu_env *env,
-                                struct lfsck_component *com)
-{
-       struct lfsck_namespace *ns = com->lc_file_ram;
-
-       return ns->ln_status;
-}
-
-static struct lfsck_operations lfsck_namespace_ops = {
-       .lfsck_reset            = lfsck_namespace_reset,
-       .lfsck_fail             = lfsck_namespace_fail,
-       .lfsck_checkpoint       = lfsck_namespace_checkpoint,
-       .lfsck_prep             = lfsck_namespace_prep,
-       .lfsck_exec_oit         = lfsck_namespace_exec_oit,
-       .lfsck_exec_dir         = lfsck_namespace_exec_dir,
-       .lfsck_post             = lfsck_namespace_post,
-       .lfsck_dump             = lfsck_namespace_dump,
-       .lfsck_double_scan      = lfsck_namespace_double_scan,
-       .lfsck_in_notify        = lfsck_namespace_in_notify,
-       .lfsck_query            = lfsck_namespace_query,
+struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
+       .la_handler_p1          = lfsck_namespace_assistant_handler_p1,
+       .la_handler_p2          = lfsck_namespace_assistant_handler_p2,
+       .la_fill_pos            = lfsck_namespace_assistant_fill_pos,
+       .la_double_scan_result  = lfsck_namespace_double_scan_result,
+       .la_req_fini            = lfsck_namespace_assistant_req_fini,
 };
 
 /**
@@ -1900,6 +2140,12 @@ int lfsck_namespace_setup(const struct lu_env *env,
        com->lc_lfsck = lfsck;
        com->lc_type = LFSCK_TYPE_NAMESPACE;
        com->lc_ops = &lfsck_namespace_ops;
+       com->lc_data = lfsck_assistant_data_init(
+                       &lfsck_namespace_assistant_ops,
+                       "lfsck_namespace");
+       if (com->lc_data == NULL)
+               GOTO(out, rc = -ENOMEM);
+
        com->lc_file_size = sizeof(struct lfsck_namespace);
        OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
        if (com->lc_file_ram == NULL)