Whamcloud - gitweb
LU-5513 lfsck: repair multiple referenced name entry 47/12247/5
author: Fan Yong <fan.yong@intel.com>
Sun, 17 Aug 2014 23:50:56 +0000 (07:50 +0800)
committer: Andreas Dilger <andreas.dilger@intel.com>
Thu, 16 Oct 2014 17:04:42 +0000 (17:04 +0000)
If more than one MDT-object back-references the same name entry via its
own linkEA entry, but the name entry references only one of those
MDT-objects, then the namespace LFSCK will remove the linkEA entries of
the non-recognized MDT-objects when it double scans them. If such an
MDT-object has no linkEA entries left after the cleanup, it will be
regarded as an orphan and inserted into the .lustre/lost+found/MDTxxxx/
directory with the name:
${FID}-${infix}-${type}-${conflict_version}

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Ie50711c3c17861bd2c0f491052caa9dcea0e02f2
Reviewed-on: http://review.whamcloud.com/12247
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Andreas Dilger <andreas.dilger@intel.com>
lustre/include/obd_support.h
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_namespace.c
lustre/mdd/mdd_dir.c
lustre/obdclass/linkea.c
lustre/tests/sanity-lfsck.sh
lustre/tests/test-framework.sh

index 4c80683..b4098ea 100644 (file)
@@ -521,6 +521,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LFSCK_BAD_PARENT2     0x161f
 #define OBD_FAIL_LFSCK_DANGLING2       0x1620
 #define OBD_FAIL_LFSCK_DANGLING3       0x1621
+#define OBD_FAIL_LFSCK_MUL_REF         0x1622
 
 #define OBD_FAIL_LFSCK_NOTIFY_NET      0x16f0
 #define OBD_FAIL_LFSCK_QUERY_NET       0x16f1
index ab25ede..1c3639b 100644 (file)
@@ -119,6 +119,7 @@ enum lfsck_namespace_inconsistency_type {
        LNIT_BAD_LINKEA         = 1,
        LNIT_UNMATCHED_PAIRS    = 2,
        LNIT_DANGLING           = 3,
+       LNIT_MUL_REF            = 4,
 };
 
 struct lfsck_namespace {
@@ -209,8 +210,12 @@ struct lfsck_namespace {
        /* How many dangling name entries have been found/repaired. */
        __u64   ln_dangling_repaired;
 
+       /* How many multiple referenced name entries have been
+        * found/repaired. */
+       __u64   ln_mul_ref_repaired;
+
        /* For further using. 256-bytes aligned now. */
-       __u64   ln_reserved[28];
+       __u64   ln_reserved[27];
 };
 
 enum lfsck_layout_inconsistency_type {
@@ -650,6 +655,7 @@ struct lfsck_thread_info {
        struct lu_fid           lti_fid2;
        struct lu_fid           lti_fid3;
        struct lu_fid           lti_fid4;
+       struct lu_fid           lti_fid5;
        struct lu_attr          lti_la;
        struct lu_attr          lti_la2;
        struct lu_attr          lti_la3;
index 54b3498..8a688c7 100644 (file)
@@ -128,6 +128,7 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
        dst->ln_unmatched_pairs_repaired =
                                le64_to_cpu(src->ln_unmatched_pairs_repaired);
        dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
+       dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
 }
 
 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
@@ -170,6 +171,7 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
        dst->ln_unmatched_pairs_repaired =
                                cpu_to_le64(src->ln_unmatched_pairs_repaired);
        dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
+       dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
 }
 
 static void lfsck_namespace_record_failure(const struct lu_env *env,
@@ -596,14 +598,228 @@ static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
        return repeated;
 }
 
+/**
+ * Insert the orphan MDT-object into .lustre/lost+found/MDTxxxx/ locally.
+ *
+ * Pick a name ${FID}${infix}-${type}-${conflict_version} under the
+ * .lustre/lost+found/MDTxxxx/ that is unused or already references the
+ * orphan (the version counts up from 0; the visible caller passes an
+ * @infix that begins with '-'). If the orphan has no usable linkEA, set
+ * its linkEA to the entry for that name, re-point ".." of a directory
+ * orphan at the lost+found directory, and insert the name if it is new.
+ * Any non-zero result also sets LF_INCONSISTENT in the namespace flags.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] orphan   pointer to the orphan MDT-object
+ * \param[in] infix    additional information for the orphan name, such as
+ *                     "-" followed by the FID of the original parent
+ * \param[in] type     the reason why the orphan MDT-object is inserted.
+ *                     The rules are as follows:
+ *
+ *  type "D":          The MDT-object is a directory. It may know its parent,
+ *                     but because there is no valid linkEA, the LFSCK cannot
+ *                     know where to put it back into the namespace.
+ *  type "O":          The MDT-object has no linkEA, and there is no name
+ *                     entry that references the MDT-object.
+ *
+ * \see lfsck_layout_recreate_parent() for more types.
+ *
+ * \param[out] count   may be NULL; if the orphan turns out to have linkEA
+ *                     entries once it is locked (e.g. added by others in
+ *                     a race), the number of those linkEA entries
+ *
+ * \retval             positive number for repaired cases (also returned
+ *                     under LPF_DRYRUN when a repair would be needed)
+ * \retval             0 if nothing needs to be repaired
+ * \retval             negative error number on failure
+ */
 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
                                          struct lfsck_component *com,
                                          struct dt_object *orphan,
                                          const char *infix, const char *type,
                                          int *count)
 {
-       /* XXX: TBD */
-       return 0;
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lu_name                  *cname  = &info->lti_name;
+       struct dt_insert_rec            *rec    = &info->lti_dt_rec;
+       struct lu_fid                   *tfid   = &info->lti_fid5;
+       const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
+       const struct lu_fid             *pfid;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck->li_bottom;
+       struct dt_object                *parent;
+       struct thandle                  *th     = NULL;
+       struct lustre_handle             plh    = { 0 };
+       struct lustre_handle             clh    = { 0 };
+       struct linkea_data               ldata  = { 0 };
+       struct lu_buf                    linkea_buf;
+       int                              namelen;
+       int                              idx    = 0;
+       int                              rc     = 0;
+       bool                             exist  = false;
+       ENTRY;
+
+       cname->ln_name = NULL;
+       /* Create .lustre/lost+found/MDTxxxx when needed. */
+       if (unlikely(lfsck->li_lpf_obj == NULL)) {
+               rc = lfsck_create_lpf(env, lfsck);
+               if (rc != 0)
+                       GOTO(log, rc);
+       }
+
+       parent = lfsck->li_lpf_obj;
+       pfid = lfsck_dto2fid(parent);
+
+       /* Hold the update lock on the parent to keep others out. */
+       rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
+                             MDS_INODELOCK_UPDATE, LCK_EX);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       do {
+               namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
+                                  PFID(cfid), infix, type, idx++);
+               rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+                              (const struct dt_key *)info->lti_key,
+                              BYPASS_CAPA);
+               if (rc != 0 && rc != -ENOENT)
+                       GOTO(log, rc);
+
+               if (unlikely(rc == 0 && lu_fid_eq(cfid, tfid)))
+                       exist = true;
+       } while (rc == 0 && !exist); /* taken by another object: next idx */
+
+       cname->ln_name = info->lti_key;
+       cname->ln_namelen = namelen;
+       rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       rc = linkea_add_buf(&ldata, cname, pfid);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
+                             MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
+                             LCK_EX);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+                      ldata.ld_leh->leh_len);
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       if (S_ISDIR(lfsck_object_type(orphan))) {
+               rc = dt_declare_delete(env, orphan,
+                                      (const struct dt_key *)dotdot, th);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               rec->rec_type = S_IFDIR;
+               rec->rec_fid = pfid;
+               rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
+                                      (const struct dt_key *)dotdot, th);
+               if (rc != 0)
+                       GOTO(stop, rc);
+       }
+
+       rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
+                                 XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       if (!exist) { /* the name entry still has to be inserted */
+               rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
+               rec->rec_fid = cfid;
+               rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
+                                      (const struct dt_key *)cname->ln_name,
+                                      th);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               if (S_ISDIR(rec->rec_type)) {
+                       rc = dt_declare_ref_add(env, parent, th);
+                       if (rc != 0)
+                               GOTO(stop, rc);
+               }
+       }
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, orphan, 0);
+       rc = lfsck_links_read(env, orphan, &ldata); /* read under the lock */
+       if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
+                  (rc == 0 && ldata.ld_leh->leh_reccount == 0))) {
+               if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+                       GOTO(unlock, rc = 1);
+
+               if (S_ISDIR(lfsck_object_type(orphan))) {
+                       rc = dt_delete(env, orphan,
+                                      (const struct dt_key *)dotdot, th,
+                                      BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(unlock, rc);
+
+                       rec->rec_type = S_IFDIR;
+                       rec->rec_fid = pfid;
+                       rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
+                                      (const struct dt_key *)dotdot, th,
+                                      BYPASS_CAPA, 1);
+                       if (rc != 0)
+                               GOTO(unlock, rc);
+               }
+
+               rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
+                                 th, BYPASS_CAPA);
+       } else {
+               if (rc == 0 && count != NULL)
+                       *count = ldata.ld_leh->leh_reccount;
+
+               GOTO(unlock, rc);
+       }
+       dt_write_unlock(env, orphan);
+
+       if (rc == 0 && !exist) {
+               rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
+               rec->rec_fid = cfid;
+               rc = dt_insert(env, parent, (const struct dt_rec *)rec,
+                              (const struct dt_key *)cname->ln_name,
+                              th, BYPASS_CAPA, 1);
+               if (rc == 0 && S_ISDIR(rec->rec_type)) {
+                       dt_write_lock(env, parent, 0);
+                       rc = dt_ref_add(env, parent, th);
+                       dt_write_unlock(env, parent);
+               }
+       }
+
+       GOTO(stop, rc = (rc == 0 ? 1 : rc)); /* 0 means repaired: return 1 */
+
+unlock:
+       dt_write_unlock(env, orphan);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       lfsck_ibits_unlock(&clh, LCK_EX);
+       lfsck_ibits_unlock(&plh, LCK_EX);
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
+              "object "DFID", name = %s: rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(cfid),
+              cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
+
+       if (rc != 0) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
+
+               ns->ln_flags |= LF_INCONSISTENT;
+       }
+
+       return rc;
 }
 
 static int lfsck_namespace_insert_normal(const struct lu_env *env,
@@ -866,7 +1082,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
                                        const struct lu_name *cname)
 {
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
-       struct lu_fid                   *tfid   = &info->lti_fid4;
+       struct lu_fid                   *tfid   = &info->lti_fid5;
        struct lu_attr                  *la     = &info->lti_la;
        struct dt_insert_rec            *rec    = &info->lti_dt_rec;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
@@ -1233,16 +1449,19 @@ log:
  * \param[in] obj      pointer to the orphan object to be handled
  * \param[in] pfid     the new fid for the object's ".." name entry
  * \param[in,out] lh   ldlm lock handler for the given @obj
+ * \param[out] type    to tell the caller what the inconsistency is
  *
  * \retval             positive number for repaired cases
  * \retval             0 if nothing to be repaired
  * \retval             negative error number on failure
  */
-static int lfsck_namespace_dsd_orphan(const struct lu_env *env,
-                                     struct lfsck_component *com,
-                                     struct dt_object *obj,
-                                     const struct lu_fid *pfid,
-                                     struct lustre_handle *lh)
+static int
+lfsck_namespace_dsd_orphan(const struct lu_env *env,
+                          struct lfsck_component *com,
+                          struct dt_object *obj,
+                          const struct lu_fid *pfid,
+                          struct lustre_handle *lh,
+                          enum lfsck_namespace_inconsistency_type *type)
 {
        struct lfsck_thread_info *info = lfsck_env_info(env);
        int                       rc;
@@ -1254,6 +1473,7 @@ static int lfsck_namespace_dsd_orphan(const struct lu_env *env,
        if (rc < 0 && rc != -ENODATA)
                RETURN(rc);
 
+       *type = LNIT_MUL_REF;
        /* The unique linkEA is invalid, even if the ".." name entry may be
         * valid, we still cannot know via which name entry this directory
         * will be referenced. Then handle it as pure orphan. */
@@ -1314,7 +1534,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
                        *retry = true;
                else
                        rc = lfsck_namespace_dsd_orphan(env, com, child,
-                                                       pfid, lh);
+                                                       pfid, lh, type);
 
                GOTO(out, rc);
        }
@@ -1352,7 +1572,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
                        *retry = true;
                else
                        rc = lfsck_namespace_dsd_orphan(env, com, child,
-                                                       pfid, lh);
+                                                       pfid, lh, type);
 
                GOTO(out, rc);
        }
@@ -1392,7 +1612,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
                                                  tfid, cname);
                if (rc == 0)
                        rc = lfsck_namespace_dsd_orphan(env, com, child,
-                                                       pfid, lh);
+                                                       pfid, lh, type);
 
                GOTO(out, rc);
        }
@@ -1591,7 +1811,8 @@ rebuild:
        /* All linkEA entries are invalid and removed, then handle the @child
         * as an orphan.*/
        if (ldata->ld_leh->leh_reccount == 0) {
-               rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh);
+               rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
+                                               type);
 
                RETURN(rc);
        }
@@ -1757,6 +1978,7 @@ lock:
                         *    but no parent references this child
                         *    directory, then handle it as orphan. */
                        lfsck_ibits_unlock(&lh, LCK_EX);
+                       type = LNIT_MUL_REF;
                        snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
                                 "-"DFID, PFID(pfid));
                        rc = lfsck_namespace_insert_orphan(env, com, child,
@@ -1785,7 +2007,8 @@ lock:
                goto lock;
 
        if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
-               rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh);
+               rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh,
+                                               &type);
 
                GOTO(out, rc);
        }
@@ -1815,6 +2038,9 @@ out:
                case LNIT_UNMATCHED_PAIRS:
                        ns->ln_unmatched_pairs_repaired++;
                        break;
+               case LNIT_MUL_REF:
+                       ns->ln_mul_ref_repaired++;
+                       break;
                default:
                        break;
                }
@@ -2069,8 +2295,10 @@ out:
                if (rc < 0)
                        return rc;
 
-               if (rc > 0)
+               if (rc > 0) {
+                       ns->ln_mul_ref_repaired++;
                        repaired = true;
+               }
        }
 
        rc = dt_attr_get(env, child, la, BYPASS_CAPA);
@@ -2116,6 +2344,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                      "unknown_inconsistency: "LPU64"\n"
                      "unmatched_pairs_repaired: "LPU64"\n"
                      "dangling_repaired: "LPU64"\n"
+                     "multiple_referenced_repaired: "LPU64"\n"
                      "success_count: %u\n"
                      "run_time_phase1: %u seconds\n"
                      "run_time_phase2: %u seconds\n",
@@ -2135,6 +2364,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                      ns->ln_unknown_inconsistency,
                      ns->ln_unmatched_pairs_repaired,
                      ns->ln_dangling_repaired,
+                     ns->ln_mul_ref_repaired,
                      ns->ln_success_count,
                      time_phase1,
                      time_phase2);
@@ -2313,6 +2543,7 @@ static int lfsck_namespace_prep(const struct lu_env *env,
                        ns->ln_unknown_inconsistency = 0;
                        ns->ln_unmatched_pairs_repaired = 0;
                        ns->ln_dangling_repaired = 0;
+                       ns->ln_mul_ref_repaired = 0;
                        fid_zero(&ns->ln_fid_latest_scanned_phase2);
                        if (list_empty(&com->lc_link_dir))
                                list_add_tail(&com->lc_link_dir,
index b1bff5c..a176ce2 100644 (file)
@@ -1641,6 +1641,9 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                        GOTO(cleanup, rc);
        }
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF))
+               GOTO(cleanup, rc = 0);
+
        if (likely(mdd_cobj != NULL)) {
                rc = mdo_ref_del(env, mdd_cobj, handle);
                if (rc != 0) {
index c3f7e6d..5c2eaea 100644 (file)
@@ -78,9 +78,12 @@ int linkea_entry_pack(struct link_ea_entry *lee, const struct lu_name *lname,
        struct lu_fid   tmpfid;
        int             reclen;
 
-       fid_cpu_to_be(&tmpfid, pfid);
+       tmpfid = *pfid;
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MUL_REF))
+               tmpfid.f_oid--;
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_CRASH))
                tmpfid.f_ver = ~0;
+       fid_cpu_to_be(&tmpfid, &tmpfid);
        memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid));
        memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen);
        reclen = sizeof(struct link_ea_entry) + lname->ln_namelen;
index a3e832e..8fe3c86 100644 (file)
@@ -19,6 +19,8 @@ init_logging
 
 require_dsh_mds || exit 0
 
+LTIME=${LTIME:-120}
+
 SAVED_MDSSIZE=${MDSSIZE}
 SAVED_OSTSIZE=${OSTSIZE}
 SAVED_OSTCOUNT=${OSTCOUNT}
@@ -44,7 +46,7 @@ setupall
        ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17 18 19 20 21"
 
 [[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.6.50) ]] &&
-       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 2e 3 22 23"
+       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 2e 3 22 23 24"
 
 build_test_filter
 
@@ -1707,7 +1709,7 @@ test_18a() {
                # time to guarantee the status sync up.
                wait_update_facet mds${k} "$LCTL get_param -n \
                        mdd.$(facet_svc mds${k}).lfsck_layout |
-                       awk '/^status/ { print \\\$2 }'" "completed" 32 ||
+                       awk '/^status/ { print \\\$2 }'" "completed" $LTIME ||
                        error "(4) MDS${k} is not the expected 'completed'"
        done
 
@@ -1811,7 +1813,7 @@ test_18b() {
                # time to guarantee the status sync up.
                wait_update_facet mds${k} "$LCTL get_param -n \
                        mdd.$(facet_svc mds${k}).lfsck_layout |
-                       awk '/^status/ { print \\\$2 }'" "completed" 32 ||
+                       awk '/^status/ { print \\\$2 }'" "completed" $LTIME ||
                        error "(2) MDS${k} is not the expected 'completed'"
        done
 
@@ -1924,7 +1926,7 @@ test_18c() {
                # time to guarantee the status sync up.
                wait_update_facet mds${k} "$LCTL get_param -n \
                        mdd.$(facet_svc mds${k}).lfsck_layout |
-                       awk '/^status/ { print \\\$2 }'" "completed" 32 ||
+                       awk '/^status/ { print \\\$2 }'" "completed" $LTIME ||
                        error "(2) MDS${k} is not the expected 'completed'"
        done
 
@@ -2028,7 +2030,7 @@ test_18d() {
 
        wait_update_facet mds1 "$LCTL get_param -n \
                mdd.$(facet_svc mds1).lfsck_layout |
-               awk '/^status/ { print \\\$2 }'" "scanning-phase2" 32 ||
+               awk '/^status/ { print \\\$2 }'" "scanning-phase2" $LTIME ||
                error "(3.0) MDS1 is not the expected 'scanning-phase2'"
 
        do_facet $SINGLEMDS $LCTL set_param fail_val=0 fail_loc=0
@@ -2039,7 +2041,7 @@ test_18d() {
                # time to guarantee the status sync up.
                wait_update_facet mds${k} "$LCTL get_param -n \
                        mdd.$(facet_svc mds${k}).lfsck_layout |
-                       awk '/^status/ { print \\\$2 }'" "completed" 32 ||
+                       awk '/^status/ { print \\\$2 }'" "completed" $LTIME ||
                        error "(3) MDS${k} is not the expected 'completed'"
        done
 
@@ -2122,7 +2124,7 @@ test_18e() {
 
        wait_update_facet mds1 "$LCTL get_param -n \
                mdd.$(facet_svc mds1).lfsck_layout |
-               awk '/^status/ { print \\\$2 }'" "scanning-phase2" 32 ||
+               awk '/^status/ { print \\\$2 }'" "scanning-phase2" $LTIME ||
                error "(3) MDS1 is not the expected 'scanning-phase2'"
 
        # to guarantee all updates are synced.
@@ -2140,7 +2142,7 @@ test_18e() {
                # time to guarantee the status sync up.
                wait_update_facet mds${k} "$LCTL get_param -n \
                        mdd.$(facet_svc mds${k}).lfsck_layout |
-                       awk '/^status/ { print \\\$2 }'" "completed" 32 ||
+                       awk '/^status/ { print \\\$2 }'" "completed" $LTIME ||
                        error "(4) MDS${k} is not the expected 'completed'"
        done
 
@@ -2256,19 +2258,19 @@ test_18f() {
                # time to guarantee the status sync up.
                wait_update_facet mds${k} "$LCTL get_param -n \
                        mdd.$(facet_svc mds${k}).lfsck_layout |
-                       awk '/^status/ { print \\\$2 }'" "partial" 32 ||
+                       awk '/^status/ { print \\\$2 }'" "partial" $LTIME ||
                        error "(2) MDS${k} is not the expected 'partial'"
        done
 
        wait_update_facet ost1 "$LCTL get_param -n \
                obdfilter.$(facet_svc ost1).lfsck_layout |
-               awk '/^status/ { print \\\$2 }'" "partial" 32 || {
+               awk '/^status/ { print \\\$2 }'" "partial" $LTIME || {
                error "(3) OST1 is not the expected 'partial'"
        }
 
        wait_update_facet ost2 "$LCTL get_param -n \
                obdfilter.$(facet_svc ost2).lfsck_layout |
-               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               awk '/^status/ { print \\\$2 }'" "completed" $LTIME || {
                error "(4) OST2 is not the expected 'completed'"
        }
 
@@ -2297,7 +2299,7 @@ test_18f() {
                # time to guarantee the status sync up.
                wait_update_facet mds${k} "$LCTL get_param -n \
                        mdd.$(facet_svc mds${k}).lfsck_layout |
-                       awk '/^status/ { print \\\$2 }'" "completed" 32 ||
+                       awk '/^status/ { print \\\$2 }'" "completed" $LTIME ||
                        error "(8) MDS${k} is not the expected 'completed'"
        done
 
@@ -3028,6 +3030,80 @@ test_23c() {
 }
 run_test 23c "LFSCK can repair dangling name entry (3)"
 
+test_24() {
+       [ $MDSCOUNT -lt 2 ] &&
+               skip "We need at least 2 MDSes for this test" && return
+
+       echo "#####"
+       echo "Two MDT-objects back reference the same name entry via their"
+       echo "each own linkEA entry, but the name entry only references one"
+       echo "MDT-object. The namespace LFSCK will remove the linkEA entry"
+       echo "for the MDT-object that is not recognized. If such MDT-object"
+       echo "has no other linkEA entry after the removing, then the LFSCK"
+       echo "will add it as orphan under the .lustre/lost+found/MDTxxxx/."
+       echo "#####"
+
+       check_mount_and_prep
+
+       $LFS mkdir -i 1 $DIR/$tdir/d0 || error "(1) Fail to mkdir d0"
+
+       mkdir $DIR/$tdir/d0/guard || error "(1) Fail to mkdir guard"
+       $LFS path2fid $DIR/$tdir/d0/guard
+
+       mkdir $DIR/$tdir/d0/dummy || error "(2) Fail to mkdir dummy"
+       $LFS path2fid $DIR/$tdir/d0/dummy
+       local pfid=$($LFS path2fid $DIR/$tdir/d0/dummy)
+
+       touch $DIR/$tdir/d0/guard/foo ||
+               error "(3) Fail to touch $DIR/$tdir/d0/guard/foo"
+
+       echo "Inject failure stub on MDT0 to simulate the case that"
+       echo "the $DIR/$tdir/d0/dummy/foo has the 'bad' linkEA entry"
+       echo "that references $DIR/$tdir/d0/guard/foo."
+       echo "Then remove the name entry $DIR/$tdir/d0/dummy/foo."
+       echo "So the MDT-object $DIR/$tdir/d0/dummy/foo will be left"
+       echo "there with the same linkEA entry as another MDT-object"
+       echo "$DIR/$tdir/d0/guard/foo has"
+
+       #define OBD_FAIL_LFSCK_MUL_REF          0x1622
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1622
+       $LFS mkdir -i 0 $DIR/$tdir/d0/dummy/foo ||
+               error "(4) Fail to mkdir $DIR/$tdir/d0/dummy/foo"
+       local cfid=$($LFS path2fid $DIR/$tdir/d0/dummy/foo)
+       rmdir $DIR/$tdir/d0/dummy/foo ||
+               error "(5) Fail to remove $DIR/$tdir/d0/dummy/foo name entry"
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+
+       echo "stat $DIR/$tdir/d0/dummy/foo should fail"
+       stat $DIR/$tdir/d0/dummy/foo > /dev/null 2>&1 &&
+               error "(6) stat successfully unexpectedly"
+
+       echo "Trigger namespace LFSCK to repair multiple-referenced name entry"
+       $START_NAMESPACE -A -r ||
+               error "(7) Fail to start LFSCK for namespace"
+
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               $SHOW_NAMESPACE
+               error "(8) unexpected status"
+       }
+
+       local repaired=$($SHOW_NAMESPACE |
+                        awk '/^multiple_referenced_repaired/ { print $2 }')
+       [ $repaired -eq 1 ] ||
+       error "(9) Fail to repair multiple referenced name entry: $repaired"
+
+       echo "There should be an orphan under .lustre/lost+found/MDT0000/"
+       [ -d $MOUNT/.lustre/lost+found/MDT0000 ] ||
+               error "(10) $MOUNT/.lustre/lost+found/MDT0000/ should be there"
+
+       local cname="$cfid-$pfid-D-0"
+       ls -ail $MOUNT/.lustre/lost+found/MDT0000/$cname ||
+               error "(11) .lustre/lost+found/MDT0000/ should not be empty"
+}
+run_test 24 "LFSCK can repair multiple-referenced name entry"
+
 $LCTL set_param debug=-lfsck > /dev/null || true
 
 # restore MDS/OST size
index 1bc644b..f48347a 100755 (executable)
@@ -6923,6 +6923,10 @@ check_mount_and_prep()
 
        rm -rf $DIR/[df][0-9]* || error "Fail to cleanup the env!"
        mkdir $DIR/$tdir || error "Fail to mkdir $DIR/$tdir."
+       for idx in $(seq $MDSCOUNT); do
+               local name="MDT$(printf '%04x' $((idx - 1)))"
+               rm -rf $MOUNT/.lustre/lost+found/$name/*
+       done
 }
 
 # calcule how many ost-objects to be created.