Whamcloud - gitweb
LU-5513 lfsck: repair multiple referenced name entry
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
index 54b3498..8a688c7 100644 (file)
@@ -128,6 +128,7 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
        dst->ln_unmatched_pairs_repaired =
                                le64_to_cpu(src->ln_unmatched_pairs_repaired);
        dst->ln_dangling_repaired = le64_to_cpu(src->ln_dangling_repaired);
+       dst->ln_mul_ref_repaired = le64_to_cpu(src->ln_mul_ref_repaired);
 }
 
 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
@@ -170,6 +171,7 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
        dst->ln_unmatched_pairs_repaired =
                                cpu_to_le64(src->ln_unmatched_pairs_repaired);
        dst->ln_dangling_repaired = cpu_to_le64(src->ln_dangling_repaired);
+       dst->ln_mul_ref_repaired = cpu_to_le64(src->ln_mul_ref_repaired);
 }
 
 static void lfsck_namespace_record_failure(const struct lu_env *env,
@@ -596,14 +598,228 @@ static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
        return repeated;
 }
 
+/**
+ * Insert orphan into .lustre/lost+found/MDTxxxx/ locally.
+ *
+ * Add the specified orphan MDT-object to the .lustre/lost+found/MDTxxxx/
+ * with the given type to generate the name, the detailed rules for name
+ * have been described as following.
+ *
+ * The function also generates the linkEA corresponding to the name entry
+ * under the .lustre/lost+found/MDTxxxx/ for the orphan MDT-object.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] orphan   pointer to the orphan MDT-object
+ * \param[in] infix    additional information for the orphan name, such as
+ *                     the FID for original
+ * \param[in] type     the type for describing why the orphan MDT-object is
+ *                     created. The rules are as following:
+ *
+ *  type "D":          The MDT-object is a directory, it may knows its parent
+ *                     but because there is no valid linkEA, the LFSCK cannot
+ *                     know where to put it back to the namespace.
+ *  type "O":          The MDT-object has no linkEA, and there is no name
+ *                     entry that references the MDT-object.
+ *
+ * \see lfsck_layout_recreate_parent() for more types.
+ *
+ * The orphan name will be like:
+ * ${FID}-${infix}-${type}-${conflict_version}
+ *
+ * \param[out] count   if some others inserted some linkEA entries by race,
+ *                     then return the linkEA entries count.
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if needs to repair nothing
+ * \retval             negative error number on failure
+ */
 static int lfsck_namespace_insert_orphan(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *orphan,
                                         const char *infix, const char *type,
                                         int *count)
 {
-       /* XXX: TBD */
-       return 0;
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lu_name                  *cname  = &info->lti_name;
+       struct dt_insert_rec            *rec    = &info->lti_dt_rec;
+       struct lu_fid                   *tfid   = &info->lti_fid5;
+       const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
+       const struct lu_fid             *pfid;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck->li_bottom;
+       struct dt_object                *parent;
+       struct thandle                  *th     = NULL;
+       struct lustre_handle             plh    = { 0 };
+       struct lustre_handle             clh    = { 0 };
+       struct linkea_data               ldata  = { 0 };
+       struct lu_buf                    linkea_buf;
+       int                              namelen;
+       int                              idx    = 0;
+       int                              rc     = 0;
+       bool                             exist  = false;
+       ENTRY;
+
+       cname->ln_name = NULL;
+       /* Create .lustre/lost+found/MDTxxxx when needed. */
+       if (unlikely(lfsck->li_lpf_obj == NULL)) {
+               rc = lfsck_create_lpf(env, lfsck);
+               if (rc != 0)
+                       GOTO(log, rc);
+       }
+
+       parent = lfsck->li_lpf_obj;
+       pfid = lfsck_dto2fid(parent);
+
+       /* Hold update lock on the parent to prevent others to access. */
+       rc = lfsck_ibits_lock(env, lfsck, parent, &plh,
+                             MDS_INODELOCK_UPDATE, LCK_EX);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       do {
+               namelen = snprintf(info->lti_key, NAME_MAX, DFID"%s-%s-%d",
+                                  PFID(cfid), infix, type, idx++);
+               rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+                              (const struct dt_key *)info->lti_key,
+                              BYPASS_CAPA);
+               if (rc != 0 && rc != -ENOENT)
+                       GOTO(log, rc);
+
+               if (unlikely(rc == 0 && lu_fid_eq(cfid, tfid)))
+                       exist = true;
+       } while (rc == 0 && !exist);
+
+       cname->ln_name = info->lti_key;
+       cname->ln_namelen = namelen;
+       rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       rc = linkea_add_buf(&ldata, cname, pfid);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       rc = lfsck_ibits_lock(env, lfsck, orphan, &clh,
+                             MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP,
+                             LCK_EX);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+                      ldata.ld_leh->leh_len);
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       if (S_ISDIR(lfsck_object_type(orphan))) {
+               rc = dt_declare_delete(env, orphan,
+                                      (const struct dt_key *)dotdot, th);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               rec->rec_type = S_IFDIR;
+               rec->rec_fid = pfid;
+               rc = dt_declare_insert(env, orphan, (const struct dt_rec *)rec,
+                                      (const struct dt_key *)dotdot, th);
+               if (rc != 0)
+                       GOTO(stop, rc);
+       }
+
+       rc = dt_declare_xattr_set(env, orphan, &linkea_buf,
+                                 XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       if (!exist) {
+               rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
+               rec->rec_fid = cfid;
+               rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
+                                      (const struct dt_key *)cname->ln_name,
+                                      th);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               if (S_ISDIR(rec->rec_type)) {
+                       rc = dt_declare_ref_add(env, parent, th);
+                       if (rc != 0)
+                               GOTO(stop, rc);
+               }
+       }
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, orphan, 0);
+       rc = lfsck_links_read(env, orphan, &ldata);
+       if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
+                  (rc == 0 && ldata.ld_leh->leh_reccount == 0))) {
+               if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+                       GOTO(unlock, rc = 1);
+
+               if (S_ISDIR(lfsck_object_type(orphan))) {
+                       rc = dt_delete(env, orphan,
+                                      (const struct dt_key *)dotdot, th,
+                                      BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(unlock, rc);
+
+                       rec->rec_type = S_IFDIR;
+                       rec->rec_fid = pfid;
+                       rc = dt_insert(env, orphan, (const struct dt_rec *)rec,
+                                      (const struct dt_key *)dotdot, th,
+                                      BYPASS_CAPA, 1);
+                       if (rc != 0)
+                               GOTO(unlock, rc);
+               }
+
+               rc = dt_xattr_set(env, orphan, &linkea_buf, XATTR_NAME_LINK, 0,
+                                 th, BYPASS_CAPA);
+       } else {
+               if (rc == 0 && count != NULL)
+                       *count = ldata.ld_leh->leh_reccount;
+
+               GOTO(unlock, rc);
+       }
+       dt_write_unlock(env, orphan);
+
+       if (rc == 0 && !exist) {
+               rec->rec_type = lfsck_object_type(orphan) & S_IFMT;
+               rec->rec_fid = cfid;
+               rc = dt_insert(env, parent, (const struct dt_rec *)rec,
+                              (const struct dt_key *)cname->ln_name,
+                              th, BYPASS_CAPA, 1);
+               if (rc == 0 && S_ISDIR(rec->rec_type)) {
+                       dt_write_lock(env, parent, 0);
+                       rc = dt_ref_add(env, parent, th);
+                       dt_write_unlock(env, parent);
+               }
+       }
+
+       GOTO(stop, rc = (rc == 0 ? 1 : rc));
+
+unlock:
+       dt_write_unlock(env, orphan);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       lfsck_ibits_unlock(&clh, LCK_EX);
+       lfsck_ibits_unlock(&plh, LCK_EX);
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK insert orphan for the "
+              "object "DFID", name = %s: rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(cfid),
+              cname->ln_name != NULL ? cname->ln_name : "<NULL>", rc);
+
+       if (rc != 0) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
+
+               ns->ln_flags |= LF_INCONSISTENT;
+       }
+
+       return rc;
 }
 
 static int lfsck_namespace_insert_normal(const struct lu_env *env,
@@ -866,7 +1082,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
                                        const struct lu_name *cname)
 {
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
-       struct lu_fid                   *tfid   = &info->lti_fid4;
+       struct lu_fid                   *tfid   = &info->lti_fid5;
        struct lu_attr                  *la     = &info->lti_la;
        struct dt_insert_rec            *rec    = &info->lti_dt_rec;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
@@ -1233,16 +1449,19 @@ log:
  * \param[in] obj      pointer to the orphan object to be handled
  * \param[in] pfid     the new fid for the object's ".." name entry
  * \param[in,out] lh   ldlm lock handler for the given @obj
+ * \param[out] type    to tell the caller what the inconsistency is
  *
  * \retval             positive number for repaired cases
  * \retval             0 if nothing to be repaired
  * \retval             negative error number on failure
  */
-static int lfsck_namespace_dsd_orphan(const struct lu_env *env,
-                                     struct lfsck_component *com,
-                                     struct dt_object *obj,
-                                     const struct lu_fid *pfid,
-                                     struct lustre_handle *lh)
+static int
+lfsck_namespace_dsd_orphan(const struct lu_env *env,
+                          struct lfsck_component *com,
+                          struct dt_object *obj,
+                          const struct lu_fid *pfid,
+                          struct lustre_handle *lh,
+                          enum lfsck_namespace_inconsistency_type *type)
 {
        struct lfsck_thread_info *info = lfsck_env_info(env);
        int                       rc;
@@ -1254,6 +1473,7 @@ static int lfsck_namespace_dsd_orphan(const struct lu_env *env,
        if (rc < 0 && rc != -ENODATA)
                RETURN(rc);
 
+       *type = LNIT_MUL_REF;
        /* The unique linkEA is invalid, even if the ".." name entry may be
         * valid, we still cannot know via which name entry this directory
         * will be referenced. Then handle it as pure orphan. */
@@ -1314,7 +1534,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
                        *retry = true;
                else
                        rc = lfsck_namespace_dsd_orphan(env, com, child,
-                                                       pfid, lh);
+                                                       pfid, lh, type);
 
                GOTO(out, rc);
        }
@@ -1352,7 +1572,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
                        *retry = true;
                else
                        rc = lfsck_namespace_dsd_orphan(env, com, child,
-                                                       pfid, lh);
+                                                       pfid, lh, type);
 
                GOTO(out, rc);
        }
@@ -1392,7 +1612,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
                                                  tfid, cname);
                if (rc == 0)
                        rc = lfsck_namespace_dsd_orphan(env, com, child,
-                                                       pfid, lh);
+                                                       pfid, lh, type);
 
                GOTO(out, rc);
        }
@@ -1591,7 +1811,8 @@ rebuild:
        /* All linkEA entries are invalid and removed, then handle the @child
         * as an orphan.*/
        if (ldata->ld_leh->leh_reccount == 0) {
-               rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh);
+               rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh,
+                                               type);
 
                RETURN(rc);
        }
@@ -1757,6 +1978,7 @@ lock:
                         *    but no parent references this child
                         *    directory, then handle it as orphan. */
                        lfsck_ibits_unlock(&lh, LCK_EX);
+                       type = LNIT_MUL_REF;
                        snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
                                 "-"DFID, PFID(pfid));
                        rc = lfsck_namespace_insert_orphan(env, com, child,
@@ -1785,7 +2007,8 @@ lock:
                goto lock;
 
        if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
-               rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh);
+               rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh,
+                                               &type);
 
                GOTO(out, rc);
        }
@@ -1815,6 +2038,9 @@ out:
                case LNIT_UNMATCHED_PAIRS:
                        ns->ln_unmatched_pairs_repaired++;
                        break;
+               case LNIT_MUL_REF:
+                       ns->ln_mul_ref_repaired++;
+                       break;
                default:
                        break;
                }
@@ -2069,8 +2295,10 @@ out:
                if (rc < 0)
                        return rc;
 
-               if (rc > 0)
+               if (rc > 0) {
+                       ns->ln_mul_ref_repaired++;
                        repaired = true;
+               }
        }
 
        rc = dt_attr_get(env, child, la, BYPASS_CAPA);
@@ -2116,6 +2344,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                      "unknown_inconsistency: "LPU64"\n"
                      "unmatched_pairs_repaired: "LPU64"\n"
                      "dangling_repaired: "LPU64"\n"
+                     "multiple_referenced_repaired: "LPU64"\n"
                      "success_count: %u\n"
                      "run_time_phase1: %u seconds\n"
                      "run_time_phase2: %u seconds\n",
@@ -2135,6 +2364,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                      ns->ln_unknown_inconsistency,
                      ns->ln_unmatched_pairs_repaired,
                      ns->ln_dangling_repaired,
+                     ns->ln_mul_ref_repaired,
                      ns->ln_success_count,
                      time_phase1,
                      time_phase2);
@@ -2313,6 +2543,7 @@ static int lfsck_namespace_prep(const struct lu_env *env,
                        ns->ln_unknown_inconsistency = 0;
                        ns->ln_unmatched_pairs_repaired = 0;
                        ns->ln_dangling_repaired = 0;
+                       ns->ln_mul_ref_repaired = 0;
                        fid_zero(&ns->ln_fid_latest_scanned_phase2);
                        if (list_empty(&com->lc_link_dir))
                                list_add_tail(&com->lc_link_dir,