Whamcloud - gitweb
LU-5517 lfsck: repair invalid nlink count
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
index f91bdcd..3f9d3d7 100644 (file)
@@ -207,9 +207,8 @@ static void lfsck_namespace_record_failure(const struct lu_env *env,
  * \param[in] env      pointer to the thread context
  * \param[in] com      pointer to the lfsck component
  *
- * \retval             positive number for data corruption
  * \retval             0 for success
- * \retval             negative error number on failure
+ * \retval             negative error number on failure or data corruption
  */
 static int lfsck_namespace_load_bitmap(const struct lu_env *env,
                                       struct lfsck_component *com)
@@ -259,14 +258,8 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env,
        rc = dt_xattr_get(env, obj,
                          lfsck_buf_get(env, bitmap->data, size),
                          XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA);
-       if (rc == -ERANGE || rc == -ENODATA || rc == 0)
-               RETURN(1);
-
-       if (rc < 0)
-               RETURN(rc);
-
        if (rc != size)
-               RETURN(rc);
+               RETURN(rc >= 0 ? -EINVAL : rc);
 
        if (cfs_bitmap_check_empty(bitmap))
                lad->lad_incomplete = 0;
@@ -1061,7 +1054,7 @@ log:
  * \param[in] type     the orphan's type to be created
  *
  *  type "P":          The orphan object to be created was a parent directory
- *                     of some DMT-object which linkEA shows that the @orphan
+ *                     of some MDT-object which linkEA shows that the @orphan
  *                     object is missing.
  *
  * \see lfsck_layout_recreate_parent() for more types.
@@ -1160,7 +1153,7 @@ out:
  * \param[in] type     the orphan's type to be created
  *
  *  type "P":          The orphan object to be created was a parent directory
- *                     of some DMT-object which linkEA shows that the @orphan
+ *                     of some MDT-object which linkEA shows that the @orphan
  *                     object is missing.
  *
  * \see lfsck_layout_recreate_parent() for more types.
@@ -1367,7 +1360,7 @@ log:
  * \param[in] orphan   pointer to the orphan MDT-object
  *
  *  type "P":          The orphan object to be created was a parent directory
- *                     of some DMT-object which linkEA shows that the @orphan
+ *                     of some MDT-object which linkEA shows that the @orphan
  *                     object is missing.
  *
  * \see lfsck_layout_recreate_parent() for more types.
@@ -2674,6 +2667,130 @@ next:
 }
 
 /**
+ * Repair the object's nlink attribute.
+ *
+ * If all the known name entries have been verified, then the object's hard
+ * link attribute should match the object's linkEA entries count unless the
+ * object has too many hard links to be recorded in the linkEA. Such cases
+ * should have been marked in the LFSCK tracing file. Otherwise, trust the
+ * linkEA to update the object's nlink attribute.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the dt_object to be handled
+ * \param[in,out] nlink        pointer to buffer for object's hard link count before
+ *                     and after the repair
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_repair_nlink(const struct lu_env *env,
+                                       struct lfsck_component *com,
+                                       struct dt_object *obj, __u32 *nlink)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lu_attr                  *la     = &info->lti_la3;
+       struct lu_fid                   *tfid   = &info->lti_fid3;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck->li_bottom;
+       const struct lu_fid             *cfid   = lfsck_dto2fid(obj);
+       struct dt_object                *child  = NULL;
+       struct thandle                  *th     = NULL;
+       struct linkea_data               ldata  = { 0 };
+       struct lustre_handle             lh     = { 0 };
+       __u32                            old    = *nlink;
+       int                              rc     = 0;
+       __u8                             flags;
+       ENTRY;
+
+       LASSERT(!dt_object_remote(obj));
+       LASSERT(S_ISREG(lfsck_object_type(obj)));
+
+       child = lfsck_object_find_by_dev(env, dev, cfid);
+       if (IS_ERR(child))
+               GOTO(log, rc = PTR_ERR(child));
+
+       rc = lfsck_ibits_lock(env, lfsck, child, &lh,
+                             MDS_INODELOCK_UPDATE |
+                             MDS_INODELOCK_XATTR, LCK_EX);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       la->la_valid = LA_NLINK;
+       rc = dt_declare_attr_set(env, child, la, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, child, 0);
+       /* If the LFSCK is marked as LF_INCOMPLETE, it means that some MDT
+        * has tried to verify a remote MDT-object that resides on this
+        * MDT, but this MDT failed to respond to that request. So there
+        * may be some remote name entry on another MDT that references
+        * this object with another name, and we cannot know whether this
+        * linkEA is valid or not. So keep it as is; it may be resolved
+        * by the next LFSCK run. */
+       if (ns->ln_flags & LF_INCOMPLETE)
+               GOTO(unlock, rc = 0);
+
+       fid_cpu_to_be(tfid, cfid);
+       rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)&flags,
+                      (const struct dt_key *)tfid, BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       if (flags & LNTF_SKIP_NLINK)
+               GOTO(unlock, rc = 0);
+
+       rc = lfsck_links_read2(env, child, &ldata);
+       if (rc == -ENODATA)
+               GOTO(unlock, rc = 0);
+
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       if (*nlink == ldata.ld_leh->leh_reccount)
+               GOTO(unlock, rc = 0);
+
+       la->la_nlink = *nlink = ldata.ld_leh->leh_reccount;
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               GOTO(unlock, rc = 1);
+
+       rc = dt_attr_set(env, child, la, th, BYPASS_CAPA);
+
+       GOTO(unlock, rc = (rc == 0 ? 1 : rc));
+
+unlock:
+       dt_write_unlock(env, child);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       lfsck_ibits_unlock(&lh, LCK_EX);
+       if (child != NULL && !IS_ERR(child))
+               lfsck_object_put(env, child);
+
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK repaired the object "DFID"'s "
+              "nlink count from %u to %u: rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(cfid), old, *nlink, rc);
+
+       if (rc != 0)
+               ns->ln_flags |= LF_INCONSISTENT;
+
+       return rc;
+}
+
+/**
  * Double scan the directory object for namespace LFSCK.
  *
  * This function will verify the <parent, child> pairs in the namespace tree:
@@ -3227,8 +3344,12 @@ out:
                return rc;
 
        if (la->la_nlink != count) {
-               /* XXX: there will be other patch(es) for MDT-object
-                *      hard links verification. */
+               rc = lfsck_namespace_repair_nlink(env, com, child,
+                                                 &la->la_nlink);
+               if (rc > 0) {
+                       ns->ln_objs_nlink_repaired++;
+                       rc = 0;
+               }
        }
 
        if (repaired) {
@@ -3425,17 +3546,17 @@ static int lfsck_namespace_prep(const struct lu_env *env,
        int                      rc;
 
        rc = lfsck_namespace_load_bitmap(env, com);
-       if (rc > 0 || (rc == 0 && ns->ln_status == LS_COMPLETED)) {
+       if (rc != 0 || ns->ln_status == LS_COMPLETED) {
                rc = lfsck_namespace_reset(env, com, false);
                if (rc == 0)
                        rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
-       }
 
-       if (rc != 0) {
-               CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: rc = %d\n",
-                      lfsck_lfsck2name(lfsck), rc);
+               if (rc != 0) {
+                       CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
+                              "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
 
-               return rc;
+                       return rc;
+               }
        }
 
        down_write(&com->lc_sem);
@@ -3681,8 +3802,9 @@ static int lfsck_namespace_post(const struct lu_env *env,
                list_del_init(&com->lc_link_dir);
                list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
        } else if (result == 0) {
-               ns->ln_status = lfsck->li_status;
-               if (ns->ln_status == 0)
+               if (lfsck->li_status != 0)
+                       ns->ln_status = lfsck->li_status;
+               else
                        ns->ln_status = LS_STOPPED;
                if (ns->ln_status != LS_PAUSED) {
                        list_del_init(&com->lc_link_dir);
@@ -3881,9 +4003,27 @@ out:
 static int lfsck_namespace_double_scan(const struct lu_env *env,
                                       struct lfsck_component *com)
 {
-       struct lfsck_namespace *ns = com->lc_file_ram;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
+       struct lfsck_tgt_desc           *ltd;
+       struct lfsck_tgt_desc           *next;
+       int                              rc;
+
+       rc = lfsck_double_scan_generic(env, com, ns->ln_status);
+       if (thread_is_stopped(&lad->lad_thread)) {
+               LASSERT(list_empty(&lad->lad_req_list));
+               LASSERT(list_empty(&lad->lad_mdt_phase1_list));
 
-       return lfsck_double_scan_generic(env, com, ns->ln_status);
+               spin_lock(&ltds->ltd_lock);
+               list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+                                        ltd_namespace_phase_list) {
+                       list_del_init(&ltd->ltd_namespace_phase_list);
+               }
+               spin_unlock(&ltds->ltd_lock);
+       }
+
+       return rc;
 }
 
 static void lfsck_namespace_data_release(const struct lu_env *env,
@@ -3916,14 +4056,44 @@ static void lfsck_namespace_data_release(const struct lu_env *env,
        }
        spin_unlock(&ltds->ltd_lock);
 
-       CFS_FREE_BITMAP(lad->lad_bitmap);
+       if (likely(lad->lad_bitmap != NULL))
+               CFS_FREE_BITMAP(lad->lad_bitmap);
 
        OBD_FREE_PTR(lad);
 }
 
+static void lfsck_namespace_quit(const struct lu_env *env,
+                                struct lfsck_component *com)
+{
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
+       struct lfsck_tgt_desc           *ltd;
+       struct lfsck_tgt_desc           *next;
+
+       LASSERT(lad != NULL);
+
+       lfsck_quit_generic(env, com);
+
+       LASSERT(thread_is_init(&lad->lad_thread) ||
+               thread_is_stopped(&lad->lad_thread));
+       LASSERT(list_empty(&lad->lad_req_list));
+
+       spin_lock(&ltds->ltd_lock);
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
+                                ltd_namespace_phase_list) {
+               list_del_init(&ltd->ltd_namespace_phase_list);
+       }
+       list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
+                                ltd_namespace_phase_list) {
+               list_del_init(&ltd->ltd_namespace_phase_list);
+       }
+       spin_unlock(&ltds->ltd_lock);
+}
+
 static int lfsck_namespace_in_notify(const struct lu_env *env,
                                     struct lfsck_component *com,
-                                    struct lfsck_request *lr)
+                                    struct lfsck_request *lr,
+                                    struct thandle *th)
 {
        struct lfsck_instance           *lfsck = com->lc_lfsck;
        struct lfsck_namespace          *ns    = com->lc_file_ram;
@@ -3966,6 +4136,70 @@ out_create:
 
                return rc;
        }
+       case LE_SKIP_NLINK_DECLARE: {
+               struct dt_object        *obj   = com->lc_obj;
+               struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
+               __u8                     flags = 0;
+
+               LASSERT(th != NULL);
+
+               rc = dt_declare_delete(env, obj,
+                                      (const struct dt_key *)key, th);
+               if (rc == 0)
+                       rc = dt_declare_insert(env, obj,
+                                              (const struct dt_rec *)&flags,
+                                              (const struct dt_key *)key, th);
+
+               RETURN(rc);
+       }
+       case LE_SKIP_NLINK: {
+               struct dt_object        *obj   = com->lc_obj;
+               struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
+               __u8                     flags = 0;
+               bool                     exist = false;
+               ENTRY;
+
+               LASSERT(th != NULL);
+
+               fid_cpu_to_be(key, &lr->lr_fid);
+               rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
+                              (const struct dt_key *)key, BYPASS_CAPA);
+               if (rc == 0) {
+                       if (flags & LNTF_SKIP_NLINK)
+                               RETURN(0);
+
+                       exist = true;
+               } else if (rc != -ENOENT) {
+                       GOTO(log, rc);
+               }
+
+               flags |= LNTF_SKIP_NLINK;
+               if (exist) {
+                       rc = dt_delete(env, obj, (const struct dt_key *)key,
+                                      th, BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(log, rc);
+               }
+
+               rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
+                              (const struct dt_key *)key, th, BYPASS_CAPA, 1);
+
+               GOTO(log, rc);
+
+log:
+               CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID
+                      " to be skipped for namespace double scan: rc = %d\n",
+                      lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc);
+
+               if (rc != 0)
+                       /* If we cannot record this object in the LFSCK tracing,
+                        * we have to mark the LFSCK as LF_INCOMPLETE, then the
+                        * LFSCK will skip nlink attribute verification for
+                        * all objects. */
+                       ns->ln_flags |= LF_INCOMPLETE;
+
+               return 0;
+       }
        case LE_PHASE1_DONE:
        case LE_PHASE2_DONE:
        case LE_PEER_EXIT:
@@ -4063,7 +4297,7 @@ static struct lfsck_operations lfsck_namespace_ops = {
        .lfsck_dump             = lfsck_namespace_dump,
        .lfsck_double_scan      = lfsck_namespace_double_scan,
        .lfsck_data_release     = lfsck_namespace_data_release,
-       .lfsck_quit             = lfsck_quit_generic,
+       .lfsck_quit             = lfsck_namespace_quit,
        .lfsck_in_notify        = lfsck_namespace_in_notify,
        .lfsck_query            = lfsck_namespace_query,
 };
@@ -4514,6 +4748,33 @@ nodata:
                        GOTO(stop, rc);
 
                rc = lfsck_links_write(env, obj, &ldata, handle);
+               if (unlikely(rc == -ENOSPC) &&
+                   S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) {
+                       if (handle != NULL) {
+                               LASSERT(dt_write_locked(env, obj));
+
+                               dt_write_unlock(env, obj);
+                               dtlocked = false;
+
+                               dt_trans_stop(env, dev, handle);
+                               handle = NULL;
+
+                               lfsck_ibits_unlock(&lh, LCK_EX);
+                       }
+
+                       rc = lfsck_namespace_trace_update(env, com,
+                                       &lnr->lnr_fid, LNTF_SKIP_NLINK, true);
+                       if (rc != 0)
+                               /* If we cannot record this object in the
+                                * LFSCK tracing, we have to mark the LFSCK
+                                * as LF_INCOMPLETE, then the LFSCK will
+                                * skip nlink attribute verification for
+                                * all objects. */
+                               ns->ln_flags |= LF_INCOMPLETE;
+
+                       GOTO(out, rc = 0);
+               }
+
                if (rc != 0)
                        GOTO(stop, rc);
 
@@ -4832,8 +5093,9 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env,
                ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
                ns->ln_success_count++;
        } else if (rc == 0) {
-               ns->ln_status = lfsck->li_status;
-               if (ns->ln_status == 0)
+               if (lfsck->li_status != 0)
+                       ns->ln_status = lfsck->li_status;
+               else
                        ns->ln_status = LS_STOPPED;
        } else {
                ns->ln_status = LS_FAILED;
@@ -4937,7 +5199,7 @@ out:
        if (rc != 0)
                CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail "
                       "to sync failure with MDTs, and related MDTs "
-                      "may handle orphan un-properly: rc = %d\n",
+                      "may handle orphan improperly: rc = %d\n",
                       lfsck_lfsck2name(lfsck), rc);
 
        EXIT;