Whamcloud - gitweb
LU-5767 lfsck: use OUT RPC to create remote orphan parent 72/13172/8
authorFan Yong <fan.yong@intel.com>
Wed, 22 Oct 2014 07:00:20 +0000 (15:00 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 19 Jan 2015 17:50:43 +0000 (17:50 +0000)
When the namespace LFSCK tries to repair the missing name entry,
means inserting the lost name entry back to its parent directory,
it may find that the parent MDT-object was also lost. Under such
case, the namespace LFSCK will firstly create the missing parent
MDT-object as an orphan and insert into the
.lustre/lost+found/MDTxxxx/ directory remotely. Then insert the
lost name entry into the orphan parent according to the linkEA.
Originally, the namespace LFSCK uses the LFSCK RPC to handle the
case of creating orphan parent MDT-object on remote MDT. But it
is not the normal way for cross-MDTs modification that usually
is handled via the OUT RPC. This patch replaces the LFSCK RPC
with normal OUT RPC to create orphan parent on remote MDT.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I8b192e51f4c159cbf28e266f22ec487a8c6a68f0
Reviewed-on: http://review.whamcloud.com/13172
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre/lustre_idl.h
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c
lustre/lfsck/lfsck_striped_dir.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/wiretest.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 7e82994..a58e294 100644 (file)
@@ -3560,19 +3560,12 @@ struct lfsck_request {
        __u16           lr_active;
        __u16           lr_param;
        __u16           lr_async_windows;
-       union {
-               __u32   lr_flags2;
-               __u32   lr_layout_version;
-       };
+       __u32           lr_flags2;
        struct lu_fid   lr_fid;
        struct lu_fid   lr_fid2;
-       union {
-               struct lu_fid   lr_fid3;
-               char            lr_pool_name[LOV_MAXPOOLNAME + 1];
-       };
-       __u32           lr_stripe_count;
-       __u32           lr_hash_type;
-       __u64           lr_padding_3;
+       struct lu_fid   lr_fid3;
+       __u64           lr_padding_1;
+       __u64           lr_padding_2;
 };
 
 void lustre_swab_lfsck_request(struct lfsck_request *lr);
@@ -3597,7 +3590,6 @@ enum lfsck_events {
        LE_PEER_EXIT            = 9,
        LE_CONDITIONAL_DESTROY  = 10,
        LE_PAIRS_VERIFY         = 11,
-       LE_CREATE_ORPHAN        = 12,
        LE_SKIP_NLINK_DECLARE   = 13,
        LE_SKIP_NLINK           = 14,
        LE_SET_LMV_MASTER       = 15,
index a4aa737..3484301 100644 (file)
@@ -630,6 +630,7 @@ struct lfsck_instance {
        struct lu_fid             li_global_root_fid; /* /ROOT */
        struct dt_object         *li_bookmark_obj;
        struct dt_object         *li_lpf_obj;
+       struct dt_object         *li_lpf_root_obj;
        struct lu_client_seq     *li_seq;
        struct lfsck_bookmark     li_bookmark_ram;
        struct lfsck_bookmark     li_bookmark_disk;
@@ -800,8 +801,6 @@ struct lfsck_thread_info {
        struct lu_fid           lti_fid;
        struct lu_fid           lti_fid2;
        struct lu_fid           lti_fid3;
-       struct lu_fid           lti_fid4;
-       struct lu_fid           lti_fid5;
        struct lu_attr          lti_la;
        struct lu_attr          lti_la2;
        struct lu_attr          lti_la3;
@@ -850,7 +849,6 @@ void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode);
 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
                              struct lfsck_instance *lfsck,
                              const struct lu_fid *fid);
-int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck);
 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck);
 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
                                           bool unlink);
@@ -942,10 +940,6 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
                        const struct lu_fid *pfid);
 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
                          char *name, struct lu_fid *pfid);
-int lfsck_remove_name_entry(const struct lu_env *env,
-                           struct lfsck_instance *lfsck,
-                           struct dt_object *parent,
-                           const char *name, __u32 type);
 int lfsck_update_name_entry(const struct lu_env *env,
                            struct lfsck_instance *lfsck,
                            struct dt_object *parent, const char *name,
index b2fa319..148fbdf 100644 (file)
@@ -1763,12 +1763,8 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
        int                              rc     = 0;
        ENTRY;
 
-       /* Create .lustre/lost+found/MDTxxxx when needed. */
-       if (unlikely(lfsck->li_lpf_obj == NULL)) {
-               rc = lfsck_create_lpf(env, lfsck);
-               if (rc != 0)
-                       GOTO(log, rc);
-       }
+       if (unlikely(lfsck->li_lpf_obj == NULL))
+               GOTO(log, rc = -ENXIO);
 
        if (fid_is_zero(pfid)) {
                struct filter_fid *ff = &info->lti_new_pfid;
index 28bc0a2..1603099 100644 (file)
@@ -460,15 +460,84 @@ const char dotdot[] = "..";
 static const char dotlustre[] = ".lustre";
 static const char lostfound[] = "lost+found";
 
+/**
+ * Remove the name entry from the .lustre/lost+found directory.
+ *
+ * No need to care about the object referenced by the name entry,
+ * either the name entry is invalid or redundant, or the referenced
+ * object has been processed or will be handled by others.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lfsck    pointer to the lfsck instance
+ * \param[in] name     the name for the name entry to be removed
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
+                                      struct lfsck_instance *lfsck,
+                                      const char *name)
+{
+       struct dt_object        *parent = lfsck->li_lpf_root_obj;
+       struct dt_device        *dev    = lfsck->li_next;
+       struct thandle          *th;
+       struct lustre_handle     lh     = { 0 };
+       int                      rc;
+       ENTRY;
+
+       rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
+                             MDS_INODELOCK_UPDATE, LCK_EX);
+       if (rc != 0)
+               RETURN(rc);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(unlock, rc = PTR_ERR(th));
+
+       rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_declare_ref_del(env, parent, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_delete(env, parent, (const struct dt_key *)name, th,
+                      BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, parent, 0);
+       rc = dt_ref_del(env, parent, th);
+       dt_write_unlock(env, parent);
+
+       GOTO(stop, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+unlock:
+       lfsck_ibits_unlock(&lh, LCK_EX);
+
+       CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
+
+       return rc;
+}
+
 static int lfsck_create_lpf_local(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
-                                 struct dt_object *parent,
                                  struct dt_object *child,
                                  struct lu_attr *la,
                                  struct dt_object_format *dof,
                                  const char *name)
 {
        struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
+       struct dt_object        *parent = lfsck->li_lpf_root_obj;
        struct dt_device        *dev    = lfsck->li_bottom;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
@@ -606,13 +675,13 @@ stop:
 
 static int lfsck_create_lpf_remote(const struct lu_env *env,
                                   struct lfsck_instance *lfsck,
-                                  struct dt_object *parent,
                                   struct dt_object *child,
                                   struct lu_attr *la,
                                   struct dt_object_format *dof,
                                   const char *name)
 {
        struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
+       struct dt_object        *parent = lfsck->li_lpf_root_obj;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        const struct lu_fid     *cfid   = lfsck_dto2fid(child);
@@ -789,17 +858,29 @@ stop:
        return rc;
 }
 
-/* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
- * because the MDT0 maybe not reaady for sequence allocation yet. We do that
- * only when it is required, such as orphan OST-objects repairing. */
-int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
+/**
+ * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
+ *
+ * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
+ * orphans and other uncertain inconsistent objects found during the
+ * LFSCK. Such directory will be created by the LFSCK engine on the
+ * local MDT before the LFSCK scanning.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lfsck    pointer to the lfsck instance
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+static int lfsck_create_lpf(const struct lu_env *env,
+                           struct lfsck_instance *lfsck)
 {
        struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
        struct lfsck_thread_info *info  = lfsck_env_info(env);
        struct lu_fid            *cfid  = &info->lti_fid2;
        struct lu_attr           *la    = &info->lti_la;
        struct dt_object_format  *dof   = &info->lti_dof;
-       struct dt_object         *parent = NULL;
+       struct dt_object         *parent = lfsck->li_lpf_root_obj;
        struct dt_object         *child = NULL;
        struct lustre_handle      lh    = { 0 };
        char                      name[8];
@@ -808,40 +889,15 @@ int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        ENTRY;
 
        LASSERT(lfsck->li_master);
-
-       sprintf(name, "MDT%04x", node);
-       if (node == 0) {
-               parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
-                                                 &LU_LPF_FID);
-       } else {
-               struct lfsck_tgt_desc *ltd;
-
-               ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
-               if (unlikely(ltd == NULL))
-                       RETURN(-ENXIO);
-
-               parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
-                                                 &LU_LPF_FID);
-               lfsck_tgt_put(ltd);
-       }
-       if (IS_ERR(parent))
-               RETURN(PTR_ERR(parent));
-
-       if (lfsck->li_lpf_obj != NULL)
-               GOTO(out, rc = 0);
-
-       if (unlikely(!dt_try_as_dir(env, parent)))
-               GOTO(out, rc = -ENOTDIR);
+       LASSERT(parent != NULL);
+       LASSERT(lfsck->li_lpf_obj == NULL);
 
        rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
                              MDS_INODELOCK_UPDATE, LCK_EX);
        if (rc != 0)
-               GOTO(out, rc);
-
-       mutex_lock(&lfsck->li_mutex);
-       if (lfsck->li_lpf_obj != NULL)
-               GOTO(unlock, rc = 0);
+               RETURN(rc);
 
+       snprintf(name, 8, "MDT%04x", node);
        if (fid_is_zero(&bk->lb_lpf_fid)) {
                /* There is corner case that: in former LFSCK scanning we have
                 * created the .lustre/lost+found/MDTxxxx but failed to update
@@ -886,24 +942,18 @@ int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        dof->dof_type = dt_mode_to_dft(S_IFDIR);
 
        if (node == 0)
-               rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
-                                           dof, name);
+               rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
        else
-               rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
-                                            dof, name);
+               rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
        if (rc == 0)
                lfsck->li_lpf_obj = child;
 
        GOTO(unlock, rc);
 
 unlock:
-       mutex_unlock(&lfsck->li_mutex);
        lfsck_ibits_unlock(&lh, LCK_EX);
        if (rc != 0 && child != NULL && !IS_ERR(child))
                lu_object_put(env, &child->do_lu);
-out:
-       if (parent != NULL && !IS_ERR(parent))
-               lu_object_put(env, &parent->do_lu);
 
        return rc;
 }
@@ -917,15 +967,14 @@ out:
  *
  * \param[in] env      pointer to the thread context
  * \param[in] lfsck    pointer to the lfsck instance
- * \param[in] parent   pointer to the lost+found object
  *
  * \retval             0 for success
  * \retval             negative error number on failure
  */
 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
-                                     struct lfsck_instance *lfsck,
-                                     struct dt_object *parent)
+                                     struct lfsck_instance *lfsck)
 {
+       struct dt_object        *parent = lfsck->li_lpf_root_obj;
        struct lu_dirent        *ent    =
                        (struct lu_dirent *)lfsck_env_info(env)->lti_key;
        const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
@@ -967,8 +1016,8 @@ static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
                if (off != 7) {
 
 remove:
-                       rc = lfsck_remove_name_entry(env, lfsck, parent,
-                                                    ent->lde_name, S_IFDIR);
+                       rc = lfsck_lpf_remove_name_entry(env, lfsck,
+                                                        ent->lde_name);
                        if (rc != 0)
                                break;
                }
@@ -1035,7 +1084,6 @@ static int lfsck_update_lpf_entry(const struct lu_env *env,
  *
  * \param[in] env      pointer to the thread context
  * \param[in] lfsck    pointer to the lfsck instance
- * \param[in] parent   pointer to the lost+found object
  * \param[in] child    pointer to the lost+found sub-directory object
  * \param[in] name     the name for lost+found sub-directory object
  * \param[out] fid     pointer to the buffer to hold the FID of the object
@@ -1050,11 +1098,11 @@ static int lfsck_update_lpf_entry(const struct lu_env *env,
  */
 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
-                                 struct dt_object *parent,
                                  struct dt_object *child, const char *name,
                                  struct lu_fid *fid,
                                  enum lfsck_verify_lpf_types type)
 {
+       struct dt_object         *parent  = lfsck->li_lpf_root_obj;
        struct lfsck_thread_info *info    = lfsck_env_info(env);
        char                     *name2   = info->lti_key;
        struct lu_fid            *fid2    = &info->lti_fid3;
@@ -1179,7 +1227,7 @@ linkea:
                       lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
        } else /* if (type == LVLT_BY_NAMEENTRY) */ {
                /* The name entry is wrong, remove it. */
-               rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
+               rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
        }
 
        GOTO(out_put, rc);
@@ -1213,7 +1261,7 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        struct lu_fid            *pfid   = &info->lti_fid;
        struct lu_fid            *cfid   = &info->lti_fid2;
        struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
-       struct dt_object         *parent = NULL;
+       struct dt_object         *parent;
        /* child1's FID is in the bookmark file. */
        struct dt_object         *child1 = NULL;
        /* child2's FID is in the name entry MDTxxxx. */
@@ -1227,6 +1275,9 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
        LASSERT(lfsck->li_master);
 
+       if (lfsck->li_lpf_root_obj != NULL)
+               RETURN(0);
+
        if (node == 0) {
                parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
        } else {
@@ -1246,11 +1297,15 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
        LASSERT(dt_object_exists(parent));
 
-       if (unlikely(!dt_try_as_dir(env, parent)))
+       if (unlikely(!dt_try_as_dir(env, parent))) {
+               lu_object_put(env, &parent->do_lu);
+
                GOTO(put, rc = -ENOTDIR);
+       }
 
+       lfsck->li_lpf_root_obj = parent;
        if (node == 0) {
-               rc = lfsck_scan_lpf_bad_entries(env, lfsck, parent);
+               rc = lfsck_scan_lpf_bad_entries(env, lfsck);
                if (rc != 0)
                        CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
                               "for bad sub-directories: rc = %d\n",
@@ -1316,7 +1371,7 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
        /* Invalid FID in the name entry, remove the name entry. */
        if (!fid_is_norm(cfid)) {
-               rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
+               rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
                if (rc != 0)
                        GOTO(put, rc);
 
@@ -1330,8 +1385,7 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
        if (unlikely(!dt_object_exists(child2) ||
                     dt_object_remote(child2)) ||
                     !S_ISDIR(lfsck_object_type(child2))) {
-               rc = lfsck_remove_name_entry(env, lfsck, parent, name,
-                                            S_IFDIR);
+               rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
                if (rc != 0)
                        GOTO(put, rc);
 
@@ -1342,13 +1396,13 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
                GOTO(put, rc = -ENOTDIR);
 
        if (child1 == NULL) {
-               rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2, name,
+               rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
                                            pfid, LVLT_BY_NAMEENTRY);
        } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
-               rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
+               rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
                                            pfid, LVLT_BY_BOOKMARK);
                if (!lu_fid_eq(pfid, &LU_LPF_FID))
-                       rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2,
+                       rc = lfsck_verify_lpf_pairs(env, lfsck, child2,
                                                    name, pfid,
                                                    LVLT_BY_NAMEENTRY);
        } else {
@@ -1365,22 +1419,26 @@ int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
 
 check_child1:
        if (child1 != NULL)
-               rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
+               rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
                                            pfid, LVLT_BY_BOOKMARK);
 
        GOTO(put, rc);
 
 put:
-       if (lfsck->li_lpf_obj != NULL &&
-           unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj)))
-               rc = -ENOTDIR;
+       if (lfsck->li_lpf_obj != NULL) {
+               if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
+                       lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
+                       lfsck->li_lpf_obj = NULL;
+                       rc = -ENOTDIR;
+               }
+       } else if (rc == 0) {
+               rc = lfsck_create_lpf(env, lfsck);
+       }
 
        if (child2 != NULL && !IS_ERR(child2))
                lu_object_put(env, &child2->do_lu);
        if (child1 != NULL && !IS_ERR(child1))
                lu_object_put(env, &child1->do_lu);
-       if (parent != NULL && !IS_ERR(parent))
-               lu_object_put(env, &parent->do_lu);
 
        return rc;
 }
@@ -1493,6 +1551,11 @@ void lfsck_instance_cleanup(const struct lu_env *env,
                lfsck->li_lpf_obj = NULL;
        }
 
+       if (lfsck->li_lpf_root_obj != NULL) {
+               lu_object_put(env, &lfsck->li_lpf_root_obj->do_lu);
+               lfsck->li_lpf_root_obj = NULL;
+       }
+
        if (lfsck->li_los != NULL) {
                local_oid_storage_fini(env, lfsck->li_los);
                lfsck->li_los = NULL;
@@ -2941,7 +3004,6 @@ int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
        case LE_FID_ACCESSED:
        case LE_PEER_EXIT:
        case LE_CONDITIONAL_DESTROY:
-       case LE_CREATE_ORPHAN:
        case LE_SKIP_NLINK_DECLARE:
        case LE_SKIP_NLINK:
        case LE_SET_LMV_MASTER:
index 744b29e..887c145 100644 (file)
@@ -835,10 +835,10 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env,
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
        struct lu_name                  *cname  = &info->lti_name;
        struct dt_insert_rec            *rec    = &info->lti_dt_rec;
-       struct lu_fid                   *tfid   = &info->lti_fid5;
        struct lu_attr                  *la     = &info->lti_la3;
        const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
        const struct lu_fid             *pfid;
+       struct lu_fid                    tfid;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct dt_device                *dev    = lfsck->li_bottom;
        struct dt_object                *parent;
@@ -854,12 +854,8 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env,
        ENTRY;
 
        cname->ln_name = NULL;
-       /* Create .lustre/lost+found/MDTxxxx when needed. */
-       if (unlikely(lfsck->li_lpf_obj == NULL)) {
-               rc = lfsck_create_lpf(env, lfsck);
-               if (rc != 0)
-                       GOTO(log, rc);
-       }
+       if (unlikely(lfsck->li_lpf_obj == NULL))
+               GOTO(log, rc = -ENXIO);
 
        parent = lfsck->li_lpf_obj;
        pfid = lfsck_dto2fid(parent);
@@ -873,13 +869,13 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env,
        do {
                namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
                                   PFID(cfid), infix, type, idx++);
-               rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+               rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                               (const struct dt_key *)info->lti_key,
                               BYPASS_CAPA);
                if (rc != 0 && rc != -ENOENT)
                        GOTO(log, rc);
 
-               if (unlikely(rc == 0 && lu_fid_eq(cfid, tfid)))
+               if (unlikely(rc == 0 && lu_fid_eq(cfid, &tfid)))
                        exist = true;
        } while (rc == 0 && !exist);
 
@@ -1154,119 +1150,7 @@ log:
 }
 
 /**
- * Create the specified orphan MDT-object on remote MDT.
- *
- * The LFSCK instance on this MDT will send LFSCK RPC to remote MDT to
- * ask the remote LFSCK instance to create the specified orphan object
- * under .lustre/lost+found/MDTxxxx/ directory with the name:
- * ${FID}-P-${conflict_version}.
- *
- * \param[in] env      pointer to the thread context
- * \param[in] com      pointer to the lfsck component
- * \param[in] orphan   pointer to the orphan MDT-object
- * \param[in] type     the orphan's type to be created
- *
- *  type "P":          The orphan object to be created was a parent directory
- *                     of some MDT-object which linkEA shows that the @orphan
- *                     object is missing.
- *
- * \see lfsck_layout_recreate_parent() for more types.
- *
- * \param[in] lmv      pointer to master LMV EA that will be set to the orphan
- *
- * \retval             positive number for repaired cases
- * \retval             0 if needs to repair nothing
- * \retval             negative error number on failure
- */
-static int lfsck_namespace_create_orphan_remote(const struct lu_env *env,
-                                               struct lfsck_component *com,
-                                               struct dt_object *orphan,
-                                               __u32 type,
-                                               struct lmv_mds_md_v1 *lmv)
-{
-       struct lfsck_thread_info        *info   = lfsck_env_info(env);
-       struct lfsck_request            *lr     = &info->lti_lr;
-       struct lu_seq_range             *range  = &info->lti_range;
-       const struct lu_fid             *fid    = lfsck_dto2fid(orphan);
-       struct lfsck_namespace          *ns     = com->lc_file_ram;
-       struct lfsck_instance           *lfsck  = com->lc_lfsck;
-       struct seq_server_site          *ss     =
-                       lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
-       struct lfsck_tgt_desc           *ltd    = NULL;
-       struct ptlrpc_request           *req    = NULL;
-       int                              rc;
-       ENTRY;
-
-       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
-               GOTO(out, rc = 1);
-
-       fld_range_set_mdt(range);
-       rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, range->lsr_index);
-       if (ltd == NULL) {
-               ns->ln_flags |= LF_INCOMPLETE;
-
-               GOTO(out, rc = -ENODEV);
-       }
-
-       req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
-                                  &RQF_LFSCK_NOTIFY);
-       if (req == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-       rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
-       if (rc != 0) {
-               ptlrpc_request_free(req);
-
-               GOTO(out, rc);
-       }
-
-       lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
-       memset(lr, 0, sizeof(*lr));
-       lr->lr_event = LE_CREATE_ORPHAN;
-       lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
-       lr->lr_active = LFSCK_TYPE_NAMESPACE;
-       lr->lr_fid = *fid;
-       lr->lr_type = type;
-       if (lmv != NULL) {
-               lr->lr_hash_type = lmv->lmv_hash_type;
-               lr->lr_stripe_count = lmv->lmv_stripe_count;
-               lr->lr_layout_version = lmv->lmv_layout_version;
-               memcpy(lr->lr_pool_name, lmv->lmv_pool_name,
-                      sizeof(lr->lr_pool_name));
-       }
-
-       ptlrpc_request_set_replen(req);
-       rc = ptlrpc_queue_wait(req);
-       ptlrpc_req_finished(req);
-
-       if (rc == 0) {
-               orphan->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
-               rc = 1;
-       } else if (rc == -EEXIST) {
-               orphan->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
-               rc = 0;
-       }
-
-       GOTO(out, rc);
-
-out:
-       CDEBUG(D_LFSCK, "%s: namespace LFSCK create object "
-              DFID" on the MDT %x remotely: rc = %d\n",
-              lfsck_lfsck2name(lfsck), PFID(fid),
-              ltd != NULL ? ltd->ltd_index : -1, rc);
-
-       if (ltd != NULL)
-               lfsck_tgt_put(ltd);
-
-       return rc;
-}
-
-/**
- * Create the specified orphan MDT-object locally.
+ * Create the specified orphan directory.
  *
  * For the case that the parent MDT-object stored in some MDT-object's
  * linkEA entry is lost, the LFSCK will re-create the parent object as
@@ -1276,24 +1160,15 @@ out:
  * \param[in] env      pointer to the thread context
  * \param[in] com      pointer to the lfsck component
  * \param[in] orphan   pointer to the orphan MDT-object to be created
- * \param[in] type     the orphan's type to be created
- *
- *  type "P":          The orphan object to be created was a parent directory
- *                     of some MDT-object which linkEA shows that the @orphan
- *                     object is missing.
- *
- * \see lfsck_layout_recreate_parent() for more types.
- *
  * \param[in] lmv      pointer to master LMV EA that will be set to the orphan
  *
  * \retval             positive number for repaired cases
  * \retval             negative error number on failure
  */
-static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
-                                              struct lfsck_component *com,
-                                              struct dt_object *orphan,
-                                              __u32 type,
-                                              struct lmv_mds_md_v1 *lmv)
+static int lfsck_namespace_create_orphan_dir(const struct lu_env *env,
+                                            struct lfsck_component *com,
+                                            struct dt_object *orphan,
+                                            struct lmv_mds_md_v1 *lmv)
 {
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
        struct lu_attr                  *la     = &info->lti_la;
@@ -1301,12 +1176,12 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
        struct dt_object_format         *dof    = &info->lti_dof;
        struct lu_name                  *cname  = &info->lti_name2;
        struct dt_insert_rec            *rec    = &info->lti_dt_rec;
-       struct lu_fid                   *tfid   = &info->lti_fid;
        struct lmv_mds_md_v1            *lmv2   = &info->lti_lmv2;
        const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
-       const struct lu_fid             *pfid;
+       struct lu_fid                    tfid;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
-       struct dt_device                *dev    = lfsck->li_bottom;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+       struct dt_device                *dev;
        struct dt_object                *parent = NULL;
        struct dt_object                *child  = NULL;
        struct thandle                  *th     = NULL;
@@ -1321,26 +1196,45 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
        ENTRY;
 
        LASSERT(!dt_object_exists(orphan));
-       LASSERT(!dt_object_remote(orphan));
-
-       /* @orphan maybe not attached to lfsck->li_bottom */
-       child = lfsck_object_find_by_dev(env, dev, cfid);
-       if (IS_ERR(child))
-               GOTO(log, rc = PTR_ERR(child));
 
        cname->ln_name = NULL;
        if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
                GOTO(log, rc = 1);
 
-       /* Create .lustre/lost+found/MDTxxxx when needed. */
-       if (unlikely(lfsck->li_lpf_obj == NULL)) {
-               rc = lfsck_create_lpf(env, lfsck);
+       if (dt_object_remote(orphan)) {
+               LASSERT(lfsck->li_lpf_root_obj != NULL);
+
+               idx = lfsck_find_mdt_idx_by_fid(env, lfsck, cfid);
+               if (idx < 0)
+                       GOTO(log, rc = idx);
+
+               snprintf(name, 8, "MDT%04x", idx);
+               rc = dt_lookup(env, lfsck->li_lpf_root_obj,
+                              (struct dt_rec *)&tfid,
+                              (const struct dt_key *)name, BYPASS_CAPA);
                if (rc != 0)
-                       GOTO(log, rc);
+                       GOTO(log, rc = (rc == -ENOENT ? -ENXIO : rc));
+
+               parent = lfsck_object_find_bottom(env, lfsck, &tfid);
+               if (IS_ERR(parent))
+                       GOTO(log, rc = PTR_ERR(parent));
+
+               if (unlikely(!dt_try_as_dir(env, parent)))
+                       GOTO(log, rc = -ENOTDIR);
+       } else {
+               if (unlikely(lfsck->li_lpf_obj == NULL))
+                       GOTO(log, rc = -ENXIO);
+
+               parent = lfsck->li_lpf_obj;
        }
 
-       parent = lfsck->li_lpf_obj;
-       pfid = lfsck_dto2fid(parent);
+       dev = lfsck_find_dev_by_fid(env, lfsck, cfid);
+       if (IS_ERR(dev))
+               GOTO(log, rc = PTR_ERR(dev));
+
+       child = lfsck_object_find_by_dev(env, dev, cfid);
+       if (IS_ERR(child))
+               GOTO(log, rc = PTR_ERR(child));
 
        /* Hold update lock on the parent to prevent others to access. */
        rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
@@ -1348,10 +1242,11 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
        if (rc != 0)
                GOTO(log, rc);
 
+       idx = 0;
        do {
                namelen = snprintf(name, 31, DFID"-P-%d",
                                   PFID(cfid), idx++);
-               rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+               rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                               (const struct dt_key *)name, BYPASS_CAPA);
                if (rc != 0 && rc != -ENOENT)
                        GOTO(unlock1, rc);
@@ -1361,7 +1256,7 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
        cname->ln_namelen = namelen;
 
        memset(la, 0, sizeof(*la));
-       la->la_mode = type | (S_ISDIR(type) ? 0700 : 0600);
+       la->la_mode = S_IFDIR | 0700;
        la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
                       LA_ATIME | LA_MTIME | LA_CTIME;
 
@@ -1369,13 +1264,13 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
                                  la->la_mode & S_IFMT);
 
        memset(dof, 0, sizeof(*dof));
-       dof->dof_type = dt_mode_to_dft(type);
+       dof->dof_type = dt_mode_to_dft(S_IFDIR);
 
        rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
        if (rc != 0)
                GOTO(unlock1, rc);
 
-       rc = linkea_add_buf(&ldata, cname, pfid);
+       rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
        if (rc != 0)
                GOTO(unlock1, rc);
 
@@ -1383,13 +1278,38 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
        if (IS_ERR(th))
                GOTO(unlock1, rc = PTR_ERR(th));
 
+       /* Sync the remote transaction to guarantee that the subsequent
+        * lock against the @orphan can find the @orphan in time. */
+       if (dt_object_remote(orphan))
+               th->th_sync = 1;
+
        rc = dt_declare_create(env, child, la, hint, dof, th);
-       if (rc == 0 && S_ISDIR(type))
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       if (unlikely(!dt_try_as_dir(env, child)))
+               GOTO(stop, rc = -ENOTDIR);
+
+       rec->rec_type = S_IFDIR;
+       rec->rec_fid = cfid;
+       rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
+                              (const struct dt_key *)dot, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rec->rec_fid = lfsck_dto2fid(parent);
+       rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
+                              (const struct dt_key *)dotdot, th);
+       if (rc == 0)
                rc = dt_declare_ref_add(env, child, th);
 
        if (rc != 0)
                GOTO(stop, rc);
 
+       rc = dt_declare_ref_add(env, child, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
        if (lmv != NULL) {
                lmv->lmv_magic = LMV_MAGIC;
                lmv->lmv_master_mdt_index = lfsck_dev_idx(dev);
@@ -1408,11 +1328,10 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop, rc);
 
-       rec->rec_type = type;
        rec->rec_fid = cfid;
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
-       if (rc == 0 && S_ISDIR(type))
+       if (rc == 0)
                rc = dt_declare_ref_add(env, parent, th);
 
        if (rc != 0)
@@ -1427,28 +1346,22 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
        if (rc != 0)
                GOTO(unlock2, rc);
 
-       if (S_ISDIR(type)) {
-               if (unlikely(!dt_try_as_dir(env, child)))
-                       GOTO(unlock2, rc = -ENOTDIR);
-
-               rec->rec_type = S_IFDIR;
-               rec->rec_fid = cfid;
-               rc = dt_insert(env, child, (const struct dt_rec *)rec,
-                              (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
-               if (rc != 0)
-                       GOTO(unlock2, rc);
+       rec->rec_fid = cfid;
+       rc = dt_insert(env, child, (const struct dt_rec *)rec,
+                      (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
+       if (rc != 0)
+               GOTO(unlock2, rc);
 
-               rec->rec_fid = pfid;
-               rc = dt_insert(env, child, (const struct dt_rec *)rec,
-                              (const struct dt_key *)dotdot, th,
-                              BYPASS_CAPA, 1);
-               if (rc != 0)
-                       GOTO(unlock2, rc);
+       rec->rec_fid = lfsck_dto2fid(parent);
+       rc = dt_insert(env, child, (const struct dt_rec *)rec,
+                      (const struct dt_key *)dotdot, th,
+                      BYPASS_CAPA, 1);
+       if (rc != 0)
+               GOTO(unlock2, rc);
 
-               rc = dt_ref_add(env, child, th);
-               if (rc != 0)
-                       GOTO(unlock2, rc);
-       }
+       rc = dt_ref_add(env, child, th);
+       if (rc != 0)
+               GOTO(unlock2, rc);
 
        if (lmv != NULL) {
                rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV, 0,
@@ -1463,11 +1376,10 @@ static int lfsck_namespace_create_orphan_local(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop, rc);
 
-       rec->rec_type = type;
        rec->rec_fid = cfid;
        rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
-       if (rc == 0 && S_ISDIR(type)) {
+       if (rc == 0) {
                dt_write_lock(env, parent, 0);
                rc = dt_ref_add(env, parent, th);
                dt_write_unlock(env, parent);
@@ -1485,55 +1397,16 @@ unlock1:
        lfsck_ibits_unlock(&lh, LCK_EX);
 
 log:
-       CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan locally for "
-              "the object "DFID", name = %s, type %o: rc = %d\n",
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK create orphan dir for "
+              "the object "DFID", name = %s: rc = %d\n",
               lfsck_lfsck2name(lfsck), PFID(cfid),
-              cname->ln_name != NULL ? cname->ln_name : "<NULL>", type, rc);
+              cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
 
        if (child != NULL && !IS_ERR(child))
                lfsck_object_put(env, child);
 
-       return rc;
-}
-
-/**
- * Create the specified orphan MDT-object.
- *
- * For the case that the parent MDT-object stored in some MDT-object's
- * linkEA entry is lost, the LFSCK will re-create the parent object as
- * an orphan and insert it into .lustre/lost+found/MDTxxxx/ directory
- * with the name: ${FID}-P-${conflict_version}.
- *
- * \param[in] env      pointer to the thread context
- * \param[in] com      pointer to the lfsck component
- * \param[in] orphan   pointer to the orphan MDT-object
- *
- *  type "P":          The orphan object to be created was a parent directory
- *                     of some MDT-object which linkEA shows that the @orphan
- *                     object is missing.
- *
- * \see lfsck_layout_recreate_parent() for more types.
- *
- * \param[in] lmv      pointer to master LMV EA that will be set to the orphan
- *
- * \retval             positive number for repaired cases
- * \retval             0 if needs to repair nothing
- * \retval             negative error number on failure
- */
-static int lfsck_namespace_create_orphan(const struct lu_env *env,
-                                        struct lfsck_component *com,
-                                        struct dt_object *orphan,
-                                        struct lmv_mds_md_v1 *lmv)
-{
-       struct lfsck_namespace *ns = com->lc_file_ram;
-       int                     rc;
-
-       if (dt_object_remote(orphan))
-               rc = lfsck_namespace_create_orphan_remote(env, com, orphan,
-                                                         S_IFDIR, lmv);
-       else
-               rc = lfsck_namespace_create_orphan_local(env, com, orphan,
-                                                        S_IFDIR, lmv);
+       if (parent != NULL && !IS_ERR(parent) && parent != lfsck->li_lpf_obj)
+               lfsck_object_put(env, parent);
 
        if (rc != 0)
                ns->ln_flags |= LF_INCONSISTENT;
@@ -1783,9 +1656,9 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
                                        const struct lu_name *cname)
 {
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
-       struct lu_fid                   *tfid   = &info->lti_fid5;
        struct lu_attr                  *la     = &info->lti_la;
        struct dt_insert_rec            *rec    = &info->lti_dt_rec;
+       struct lu_fid                    tfid;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct dt_device                *dev    = lfsck->li_next;
        const char                      *name   = cname->ln_name;
@@ -1824,7 +1697,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
                goto replace;
        }
 
-       rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+       rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                       (const struct dt_key *)name, BYPASS_CAPA);
        if (rc == -ENOENT) {
                exist = false;
@@ -1835,7 +1708,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
                GOTO(log, rc);
 
        /* Someone changed the name entry, cannot replace it. */
-       if (!lu_fid_eq(cfid, tfid))
+       if (!lu_fid_eq(cfid, &tfid))
                GOTO(log, rc = 0);
 
        /* lock the object to be destroyed. */
@@ -2048,15 +1921,14 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env,
                                  const char *name, const char *name2,
                                  __u16 type, bool update, bool dec)
 {
-       struct lfsck_thread_info        *info   = lfsck_env_info(env);
-       struct dt_insert_rec            *rec    = &info->lti_dt_rec;
-       const struct lu_fid             *cfid   = lfsck_dto2fid(child);
-       struct lu_fid                   *tfid   = &info->lti_fid5;
-       struct lfsck_instance           *lfsck  = com->lc_lfsck;
-       struct dt_device                *dev    = lfsck->li_next;
-       struct thandle                  *th     = NULL;
-       struct lustre_handle             lh     = { 0 };
-       int                              rc     = 0;
+       struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
+       const struct lu_fid     *cfid   = lfsck_dto2fid(child);
+       struct lu_fid            tfid;
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct dt_device        *dev    = lfsck->li_next;
+       struct thandle          *th     = NULL;
+       struct lustre_handle     lh     = { 0 };
+       int                      rc     = 0;
        ENTRY;
 
        if (unlikely(!dt_try_as_dir(env, parent)))
@@ -2096,7 +1968,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env,
                GOTO(stop, rc);
 
        dt_write_lock(env, parent, 0);
-       rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+       rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                       (const struct dt_key *)name, BYPASS_CAPA);
        /* Someone has removed the bad name entry by race. */
        if (rc == -ENOENT)
@@ -2107,7 +1979,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env,
 
        /* Someone has removed the bad name entry and reused it for other
         * object by race. */
-       if (!lu_fid_eq(tfid, cfid))
+       if (!lu_fid_eq(&tfid, cfid))
                GOTO(unlock2, rc = 0);
 
        if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
@@ -2381,7 +2253,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
        struct lfsck_thread_info *info          = lfsck_env_info(env);
        struct lu_name           *cname         = &info->lti_name;
        const struct lu_fid      *cfid          = lfsck_dto2fid(child);
-       struct lu_fid            *tfid          = &info->lti_fid3;
+       struct lu_fid             tfid;
        struct lfsck_namespace   *ns            = com->lc_file_ram;
        struct lfsck_instance    *lfsck         = com->lc_lfsck;
        struct dt_object         *parent        = NULL;
@@ -2389,9 +2261,9 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
        int                       rc            = 0;
        ENTRY;
 
-       lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid, info->lti_key);
+       lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid, info->lti_key);
        /* The unique linkEA entry with bad parent will be handled as orphan. */
-       if (!fid_is_sane(tfid)) {
+       if (!fid_is_sane(&tfid)) {
                if (!lustre_handle_is_used(lh) && retry != NULL)
                        *retry = true;
                else
@@ -2401,7 +2273,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
                GOTO(out, rc);
        }
 
-       parent = lfsck_object_find_bottom(env, lfsck, tfid);
+       parent = lfsck_object_find_bottom(env, lfsck, &tfid);
        if (IS_ERR(parent))
                GOTO(out, rc = PTR_ERR(parent));
 
@@ -2443,7 +2315,7 @@ lost_parent:
                        /* It is an invalid name entry, we
                         * cannot trust the parent also. */
                        rc = lfsck_namespace_shrink_linkea(env, com, child,
-                                               ldata, cname, tfid, true);
+                                               ldata, cname, &tfid, true);
                        if (rc < 0)
                                GOTO(out, rc);
 
@@ -2456,7 +2328,7 @@ lost_parent:
                }
 
                /* Create the lost parent as an orphan. */
-               rc = lfsck_namespace_create_orphan(env, com, parent, lmv);
+               rc = lfsck_namespace_create_orphan_dir(env, com, parent, lmv);
                if (rc >= 0) {
                        /* Add the missing name entry to the parent. */
                        rc = lfsck_namespace_insert_normal(env, com, parent,
@@ -2472,7 +2344,7 @@ lost_parent:
                                 * current system to be consistent. */
                                rc = lfsck_namespace_shrink_linkea(env,
                                                com, child, ldata,
-                                               cname, tfid, true);
+                                               cname, &tfid, true);
                                if (rc >= 0) {
                                        snprintf(info->lti_tmpbuf,
                                                 sizeof(info->lti_tmpbuf),
@@ -2498,7 +2370,7 @@ lost_parent:
                GOTO(out, rc);
        }
 
-       rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+       rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                       (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
        if (rc == -ENOENT) {
                /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
@@ -2528,7 +2400,7 @@ lost_parent:
                /* It is an invalid name entry, drop it. */
                if (unlikely(rc > 0)) {
                        rc = lfsck_namespace_shrink_linkea(env, com, child,
-                                               ldata, cname, tfid, true);
+                                               ldata, cname, &tfid, true);
                        if (rc >= 0) {
                                snprintf(info->lti_tmpbuf,
                                         sizeof(info->lti_tmpbuf),
@@ -2560,7 +2432,7 @@ lost_parent:
                         * internal status of create operation. Under such
                         * case, nothing to be done. */
                        rc = lfsck_namespace_shrink_linkea_cond(env, com,
-                                       parent, child, ldata, cname, tfid);
+                                       parent, child, ldata, cname, &tfid);
                        if (rc >= 0) {
                                snprintf(info->lti_tmpbuf,
                                         sizeof(info->lti_tmpbuf),
@@ -2576,7 +2448,7 @@ lost_parent:
        if (rc != 0)
                GOTO(out, rc);
 
-       if (!lu_fid_eq(tfid, cfid)) {
+       if (!lu_fid_eq(&tfid, cfid)) {
                if (!lustre_handle_is_used(lh) && retry != NULL) {
                        *retry = true;
 
@@ -2588,7 +2460,7 @@ lost_parent:
                 * may be created by the LFSCK for repairing dangling
                 * name entry. Try to replace it. */
                rc = lfsck_namespace_replace_cond(env, com, parent, child,
-                                                 tfid, cname);
+                                                 &tfid, cname);
                if (rc == 0)
                        rc = lfsck_namespace_dsd_orphan(env, com, child,
                                                        pfid, lh, type);
@@ -2657,8 +2529,8 @@ lfsck_namespace_dsd_multiple(const struct lu_env *env,
        struct lfsck_thread_info *info          = lfsck_env_info(env);
        struct lu_name           *cname         = &info->lti_name;
        const struct lu_fid      *cfid          = lfsck_dto2fid(child);
-       struct lu_fid            *tfid          = &info->lti_fid3;
-       struct lu_fid            *pfid2         = &info->lti_fid4;
+       struct lu_fid            *pfid2         = &info->lti_fid3;
+       struct lu_fid             tfid;
        struct lfsck_namespace   *ns            = com->lc_file_ram;
        struct lfsck_instance    *lfsck         = com->lc_lfsck;
        struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
@@ -2672,12 +2544,12 @@ lfsck_namespace_dsd_multiple(const struct lu_env *env,
 
 again:
        while (ldata->ld_lee != NULL) {
-               lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid,
+               lfsck_namespace_unpack_linkea_entry(ldata, cname, &tfid,
                                                    info->lti_key);
                /* Drop repeated linkEA entries. */
-               lfsck_namespace_filter_linkea_entry(ldata, cname, tfid, true);
+               lfsck_namespace_filter_linkea_entry(ldata, cname, &tfid, true);
                /* Drop invalid linkEA entry. */
-               if (!fid_is_sane(tfid)) {
+               if (!fid_is_sane(&tfid)) {
                        linkea_del_buf(ldata, cname);
                        linkea_count++;
                        continue;
@@ -2696,12 +2568,12 @@ again:
                 * When the LFSCK runs again, if the dangling name is still
                 * there, the LFSCK should move the orphan directory object
                 * back to the normal namespace. */
-               if (!lpf && !lu_fid_eq(pfid, tfid) && once) {
+               if (!lpf && !lu_fid_eq(pfid, &tfid) && once) {
                        linkea_next_entry(ldata);
                        continue;
                }
 
-               parent = lfsck_object_find_bottom(env, lfsck, tfid);
+               parent = lfsck_object_find_bottom(env, lfsck, &tfid);
                if (IS_ERR(parent))
                        RETURN(PTR_ERR(parent));
 
@@ -2728,7 +2600,7 @@ again:
                        continue;
                }
 
-               rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+               rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                               (const struct dt_key *)cname->ln_name,
                               BYPASS_CAPA);
                *pfid2 = *lfsck_dto2fid(parent);
@@ -2744,7 +2616,7 @@ again:
                        RETURN(rc);
                }
 
-               if (lu_fid_eq(tfid, cfid)) {
+               if (lu_fid_eq(&tfid, cfid)) {
                        lfsck_object_put(env, parent);
                        if (!lu_fid_eq(pfid, pfid2)) {
                                *type = LNIT_UNMATCHED_PAIRS;
@@ -2778,12 +2650,12 @@ rebuild:
                         * other parent directories, remove all of them. */
                        while (ldata->ld_lee != NULL) {
                                lfsck_namespace_unpack_linkea_entry(ldata,
-                                               cname, tfid, info->lti_key);
-                               if (!fid_is_sane(tfid))
+                                               cname, &tfid, info->lti_key);
+                               if (!fid_is_sane(&tfid))
                                        goto next;
 
                                parent = lfsck_object_find_bottom(env, lfsck,
-                                                                 tfid);
+                                                                 &tfid);
                                if (IS_ERR(parent)) {
                                        rc = PTR_ERR(parent);
                                        if (rc != -ENOENT &&
@@ -2825,7 +2697,7 @@ next:
                 * created by the LFSCK for repairing dangling name entry.
                 * Try to replace it. */
                rc = lfsck_namespace_replace_cond(env, com, parent, child,
-                                                 tfid, cname);
+                                                 &tfid, cname);
                lfsck_object_put(env, parent);
                if (rc < 0)
                        RETURN(rc);
@@ -3372,7 +3244,7 @@ lost_parent:
                                }
 
                                /* Create the lost parent as an orphan. */
-                               rc = lfsck_namespace_create_orphan(env, com,
+                               rc = lfsck_namespace_create_orphan_dir(env, com,
                                                                parent, NULL);
                                if (rc < 0) {
                                        lfsck_object_put(env, parent);
@@ -4537,50 +4409,6 @@ static int lfsck_namespace_in_notify(const struct lu_env *env,
        ENTRY;
 
        switch (lr->lr_event) {
-       case LE_CREATE_ORPHAN: {
-               struct dt_object *orphan = NULL;
-               struct lmv_mds_md_v1 *lmv;
-
-               CDEBUG(D_LFSCK, "%s: namespace LFSCK handling notify from "
-                      "MDT %x to create orphan"DFID" with type %o\n",
-                      lfsck_lfsck2name(lfsck), lr->lr_index,
-                      PFID(&lr->lr_fid), lr->lr_type);
-
-               orphan = lfsck_object_find(env, lfsck, &lr->lr_fid);
-               if (IS_ERR(orphan))
-                       GOTO(out_create, rc = PTR_ERR(orphan));
-
-               if (dt_object_exists(orphan))
-                       GOTO(out_create, rc = -EEXIST);
-
-               if (lr->lr_stripe_count > 0) {
-                       lmv = &lfsck_env_info(env)->lti_lmv;
-                       memset(lmv, 0, sizeof(*lmv));
-                       lmv->lmv_hash_type = lr->lr_hash_type;
-                       lmv->lmv_stripe_count = lr->lr_stripe_count;
-                       lmv->lmv_layout_version = lr->lr_layout_version;
-                       memcpy(lmv->lmv_pool_name, lr->lr_pool_name,
-                              sizeof(lmv->lmv_pool_name));
-               } else {
-                       lmv = NULL;
-               }
-
-               rc = lfsck_namespace_create_orphan_local(env, com, orphan,
-                                                        lr->lr_type, lmv);
-
-               GOTO(out_create, rc = (rc == 1) ? 0 : rc);
-
-out_create:
-               CDEBUG(D_LFSCK, "%s: namespace LFSCK handled notify from "
-                      "MDT %x to create orphan"DFID" with type %o: rc = %d\n",
-                      lfsck_lfsck2name(lfsck), lr->lr_index,
-                      PFID(&lr->lr_fid), lr->lr_type, rc);
-
-               if (orphan != NULL && !IS_ERR(orphan))
-                       lfsck_object_put(env, orphan);
-
-               return rc;
-       }
        case LE_SKIP_NLINK_DECLARE: {
                struct dt_object        *obj   = com->lc_obj;
                struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
@@ -6342,82 +6170,6 @@ int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
 }
 
 /**
- * Remove the name entry from the parent directory.
- *
- * No need to care about the object referenced by the name entry,
- * either the name entry is invalid or redundant, or the referenced
- * object has been processed has been or will be handled by others.
- *
- * \param[in] env      pointer to the thread context
- * \param[in] lfsck    pointer to the lfsck instance
- * \param[in] parent   pointer to the lost+found object
- * \param[in] name     the name for the name entry to be removed
- * \param[in] type     the type for the name entry to be removed
- *
- * \retval             0 for success
- * \retval             negative error number on failure
- */
-int lfsck_remove_name_entry(const struct lu_env *env,
-                           struct lfsck_instance *lfsck,
-                           struct dt_object *parent,
-                           const char *name, __u32 type)
-{
-       struct dt_device        *dev    = lfsck->li_next;
-       struct thandle          *th;
-       struct lustre_handle     lh     = { 0 };
-       int                      rc;
-       ENTRY;
-
-       rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
-                             MDS_INODELOCK_UPDATE, LCK_EX);
-       if (rc != 0)
-               RETURN(rc);
-
-       th = dt_trans_create(env, dev);
-       if (IS_ERR(th))
-               GOTO(unlock, rc = PTR_ERR(th));
-
-       rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       if (S_ISDIR(type)) {
-               rc = dt_declare_ref_del(env, parent, th);
-               if (rc != 0)
-                       GOTO(stop, rc);
-       }
-
-       rc = dt_trans_start(env, dev, th);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       rc = dt_delete(env, parent, (const struct dt_key *)name, th,
-                      BYPASS_CAPA);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       if (S_ISDIR(type)) {
-               dt_write_lock(env, parent, 0);
-               rc = dt_ref_del(env, parent, th);
-               dt_write_unlock(env, parent);
-       }
-
-       GOTO(stop, rc);
-
-stop:
-       dt_trans_stop(env, dev, th);
-
-unlock:
-       lfsck_ibits_unlock(&lh, LCK_EX);
-
-       CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s "
-              "with type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
-              PFID(lfsck_dto2fid(parent)), name, type, rc);
-
-       return rc;
-}
-
-/**
  * Update the object's name entry with the given FID.
  *
  * \param[in] env      pointer to the thread context
index 709d4a2..b0655d0 100644 (file)
@@ -1695,8 +1695,8 @@ int lfsck_namespace_verify_stripe_slave(const struct lu_env *env,
        char                            *name   = info->lti_key;
        char                            *name2;
        struct lu_fid                   *pfid   = &info->lti_fid3;
-       struct lu_fid                   *tfid   = &info->lti_fid4;
        const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
+       struct lu_fid                    tfid;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct lmv_mds_md_v1            *clmv   = &llmv->ll_lmv;
        struct lmv_mds_md_v1            *plmv   = &info->lti_lmv;
@@ -1780,16 +1780,16 @@ int lfsck_namespace_verify_stripe_slave(const struct lu_env *env,
                 PFID(cfid), clmv->lmv_master_mdt_index);
        name2 = info->lti_tmpbuf2;
 
-       rc = lfsck_links_get_first(env, obj, name, tfid);
-       if (rc == 0 && strcmp(name, name2) == 0 && lu_fid_eq(pfid, tfid)) {
+       rc = lfsck_links_get_first(env, obj, name, &tfid);
+       if (rc == 0 && strcmp(name, name2) == 0 && lu_fid_eq(pfid, &tfid)) {
                llmv->ll_lmv_verified = 1;
 
                GOTO(out, rc);
        }
 
-       rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+       rc = dt_lookup(env, parent, (struct dt_rec *)&tfid,
                       (const struct dt_key *)name2, BYPASS_CAPA);
-       if (rc != 0 || !lu_fid_eq(cfid, tfid))
+       if (rc != 0 || !lu_fid_eq(cfid, &tfid))
                rc = lfsck_namespace_trace_update(env, com, cfid,
                                                  LNTF_UNCERTAIN_LMV, true);
        else
index f02221f..3e47d8a 100644 (file)
@@ -2673,9 +2673,8 @@ void lustre_swab_lfsck_request(struct lfsck_request *lr)
        lustre_swab_lu_fid(&lr->lr_fid);
        lustre_swab_lu_fid(&lr->lr_fid2);
        lustre_swab_lu_fid(&lr->lr_fid3);
-       __swab32s(&lr->lr_stripe_count);
-       __swab32s(&lr->lr_hash_type);
-       CLASSERT(offsetof(typeof(*lr), lr_padding_3) != 0);
+       CLASSERT(offsetof(typeof(*lr), lr_padding_1) != 0);
+       CLASSERT(offsetof(typeof(*lr), lr_padding_2) != 0);
 }
 EXPORT_SYMBOL(lustre_swab_lfsck_request);
 
index 6ce0e50..d843e95 100644 (file)
@@ -4682,18 +4682,14 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct lfsck_request, lr_fid3));
        LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_fid3) == 16, "found %lld\n",
                 (long long)(int)sizeof(((struct lfsck_request *)0)->lr_fid3));
-       LASSERTF((int)offsetof(struct lfsck_request, lr_stripe_count) == 80, "found %lld\n",
-                (long long)(int)offsetof(struct lfsck_request, lr_stripe_count));
-       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_stripe_count) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_stripe_count));
-       LASSERTF((int)offsetof(struct lfsck_request, lr_hash_type) == 84, "found %lld\n",
-                (long long)(int)offsetof(struct lfsck_request, lr_hash_type));
-       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_hash_type) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_hash_type));
-       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_3) == 88, "found %lld\n",
-                (long long)(int)offsetof(struct lfsck_request, lr_padding_3));
-       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_padding_3) == 8, "found %lld\n",
-                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_padding_3));
+       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_1) == 80, "found %lld\n",
+                (long long)(int)offsetof(struct lfsck_request, lr_padding_1));
+       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_padding_1) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_padding_1));
+       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_2) == 88, "found %lld\n",
+                (long long)(int)offsetof(struct lfsck_request, lr_padding_2));
+       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_padding_2) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_padding_2));
        LASSERTF(LFSCK_TYPE_SCRUB == 0x00000000UL, "found 0x%.8xUL\n",
                (unsigned)LFSCK_TYPE_SCRUB);
        LASSERTF(LFSCK_TYPE_LAYOUT == 0x00000001UL, "found 0x%.8xUL\n",
@@ -4722,8 +4718,6 @@ void lustre_assert_wire_constants(void)
                 (long long)LE_CONDITIONAL_DESTROY);
        LASSERTF(LE_PAIRS_VERIFY == 11, "found %lld\n",
                 (long long)LE_PAIRS_VERIFY);
-       LASSERTF(LE_CREATE_ORPHAN == 12, "found %lld\n",
-                (long long)LE_CREATE_ORPHAN);
        LASSERTF(LE_SKIP_NLINK_DECLARE == 13, "found %lld\n",
                 (long long)LE_SKIP_NLINK_DECLARE);
        LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n",
index a2efeac..fc94019 100644 (file)
@@ -2132,9 +2132,8 @@ static void check_lfsck_request(void)
        CHECK_MEMBER(lfsck_request, lr_fid);
        CHECK_MEMBER(lfsck_request, lr_fid2);
        CHECK_MEMBER(lfsck_request, lr_fid3);
-       CHECK_MEMBER(lfsck_request, lr_stripe_count);
-       CHECK_MEMBER(lfsck_request, lr_hash_type);
-       CHECK_MEMBER(lfsck_request, lr_padding_3);
+       CHECK_MEMBER(lfsck_request, lr_padding_1);
+       CHECK_MEMBER(lfsck_request, lr_padding_2);
 
        CHECK_VALUE_X(LFSCK_TYPE_SCRUB);
        CHECK_VALUE_X(LFSCK_TYPE_LAYOUT);
@@ -2151,7 +2150,6 @@ static void check_lfsck_request(void)
        CHECK_VALUE(LE_PEER_EXIT);
        CHECK_VALUE(LE_CONDITIONAL_DESTROY);
        CHECK_VALUE(LE_PAIRS_VERIFY);
-       CHECK_VALUE(LE_CREATE_ORPHAN);
        CHECK_VALUE(LE_SKIP_NLINK_DECLARE);
        CHECK_VALUE(LE_SKIP_NLINK);
        CHECK_VALUE(LE_SET_LMV_MASTER);
index 131e33d..2121298 100644 (file)
@@ -4691,18 +4691,14 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct lfsck_request, lr_fid3));
        LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_fid3) == 16, "found %lld\n",
                 (long long)(int)sizeof(((struct lfsck_request *)0)->lr_fid3));
-       LASSERTF((int)offsetof(struct lfsck_request, lr_stripe_count) == 80, "found %lld\n",
-                (long long)(int)offsetof(struct lfsck_request, lr_stripe_count));
-       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_stripe_count) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_stripe_count));
-       LASSERTF((int)offsetof(struct lfsck_request, lr_hash_type) == 84, "found %lld\n",
-                (long long)(int)offsetof(struct lfsck_request, lr_hash_type));
-       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_hash_type) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_hash_type));
-       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_3) == 88, "found %lld\n",
-                (long long)(int)offsetof(struct lfsck_request, lr_padding_3));
-       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_padding_3) == 8, "found %lld\n",
-                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_padding_3));
+       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_1) == 80, "found %lld\n",
+                (long long)(int)offsetof(struct lfsck_request, lr_padding_1));
+       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_padding_1) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_padding_1));
+       LASSERTF((int)offsetof(struct lfsck_request, lr_padding_2) == 88, "found %lld\n",
+                (long long)(int)offsetof(struct lfsck_request, lr_padding_2));
+       LASSERTF((int)sizeof(((struct lfsck_request *)0)->lr_padding_2) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct lfsck_request *)0)->lr_padding_2));
        LASSERTF(LFSCK_TYPE_SCRUB == 0x00000000UL, "found 0x%.8xUL\n",
                (unsigned)LFSCK_TYPE_SCRUB);
        LASSERTF(LFSCK_TYPE_LAYOUT == 0x00000001UL, "found 0x%.8xUL\n",
@@ -4731,8 +4727,6 @@ void lustre_assert_wire_constants(void)
                 (long long)LE_CONDITIONAL_DESTROY);
        LASSERTF(LE_PAIRS_VERIFY == 11, "found %lld\n",
                 (long long)LE_PAIRS_VERIFY);
-       LASSERTF(LE_CREATE_ORPHAN == 12, "found %lld\n",
-                (long long)LE_CREATE_ORPHAN);
        LASSERTF(LE_SKIP_NLINK_DECLARE == 13, "found %lld\n",
                 (long long)LE_SKIP_NLINK_DECLARE);
        LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n",