Whamcloud - gitweb
LU-2915 lfsck: LFSCK 1.5 technical debts (3)
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
index 9d2950c..273daee 100644 (file)
@@ -428,6 +428,38 @@ static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
 }
 
 /**
+ * \retval ve: removed entries
+ */
+static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
+                                    struct linkea_data *ldata,
+                                    struct lu_name *cname,
+                                    struct lu_fid *pfid)
+{
+       struct link_ea_entry    *oldlee;
+       int                      oldlen;
+       int                      removed = 0;
+
+       linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
+       oldlee = ldata->ld_lee;
+       oldlen = ldata->ld_reclen;
+       linkea_next_entry(ldata);
+       while (ldata->ld_lee != NULL) {
+               ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
+                                  ldata->ld_lee->lee_reclen[1];
+               if (unlikely(ldata->ld_reclen == oldlen &&
+                            memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
+                       linkea_del_buf(ldata, cname);
+                       removed++;
+               } else {
+                       linkea_next_entry(ldata);
+               }
+       }
+       ldata->ld_lee = oldlee;
+       ldata->ld_reclen = oldlen;
+       return removed;
+}
+
+/**
  * \retval +ve repaired
  * \retval 0   no need to repair
  * \retval -ve error cases
@@ -449,7 +481,6 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
        struct thandle          *handle   = NULL;
        bool                     locked   = false;
        bool                     update   = false;
-       int                      count;
        int                      rc;
        ENTRY;
 
@@ -458,6 +489,7 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
 again:
                LASSERT(!locked);
 
+               update = false;
                com->lc_journal = 1;
                handle = dt_trans_create(env, lfsck->li_next);
                if (IS_ERR(handle))
@@ -491,13 +523,14 @@ again:
                GOTO(stop, rc);
        }
 
-       ldata.ld_lee = LINKEA_FIRST_ENTRY(ldata);
-       count = ldata.ld_leh->leh_reccount;
-       while (count-- > 0) {
+       linkea_first_entry(&ldata);
+       while (ldata.ld_lee != NULL) {
                struct dt_object *parent = NULL;
 
-               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname,
-                                   pfid);
+               rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
+               if (rc > 0)
+                       update = true;
+
                if (!fid_is_sane(pfid))
                        goto shrink;
 
@@ -514,7 +547,7 @@ again:
                 *      remote object will be processed in LFSCK phase III. */
                if (dt_object_remote(parent)) {
                        lfsck_object_put(env, parent);
-                       ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
+                       linkea_next_entry(&ldata);
                        continue;
                }
 
@@ -536,25 +569,29 @@ again:
                if (rc == 0) {
                        if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
                                lfsck_object_put(env, parent);
-                               ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
+                               linkea_next_entry(&ldata);
                                continue;
                        }
 
                        goto shrink;
                }
 
+               /* If there is no name entry in the parent dir and the object
+                * link count is less than the linkea entries count, then the
+                * linkea entry should be removed. */
                if (ldata.ld_leh->leh_reccount > la->la_nlink)
                        goto shrink;
 
-               /* XXX: For the case of there is linkea entry, but without name
-                *      entry pointing to the object, and the object link count
-                *      isn't less than the count of name entries, then add the
-                *      name entry back to namespace.
-                *
-                *      It is out of LFSCK 1.5 scope, will implement it in the
-                *      future. Keep the linkEA entry. */
+               /* XXX: For the case of there is a linkea entry, but without
+                *      name entry pointing to the object and its hard links
+                *      count is not less than the object name entries count,
+                *      then seems we should add the 'missed' name entry back
+                *      to namespace, but before LFSCK phase III finished, we
+                *      do not know whether the object has some inconsistency
+                *      on other MDTs. So now, do NOT add the name entry back
+                *      to the namespace, but keep the linkEA entry. LU-2914 */
                lfsck_object_put(env, parent);
-               ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
+               linkea_next_entry(&ldata);
                continue;
 
 shrink:
@@ -582,8 +619,21 @@ shrink:
        GOTO(stop, rc);
 
 stop:
-       if (locked)
+       if (locked) {
+       /* XXX: For the case linkea entries count does not match the object hard
+        *      links count, we cannot update the later one simply. Before LFSCK
+        *      phase III finished, we cannot know whether there are some remote
+        *      name entries to be repaired or not. LU-2914 */
+               if (rc == 0 && !lfsck_is_dead_obj(child) &&
+                   ldata.ld_leh != NULL &&
+                   ldata.ld_leh->leh_reccount != la->la_nlink)
+                       CWARN("%.16s: the object "DFID" linkEA entry count %u "
+                             "may not match its hardlink count %u\n",
+                             lfsck_lfsck2name(lfsck), PFID(cfid),
+                             ldata.ld_leh->leh_reccount, la->la_nlink);
+
                dt_write_unlock(env, child);
+       }
 
        if (handle != NULL)
                dt_trans_stop(env, lfsck->li_next, handle);
@@ -592,6 +642,7 @@ stop:
                ns->ln_objs_nlink_repaired++;
                rc = 1;
        }
+
        return rc;
 }
 
@@ -794,6 +845,8 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env,
        struct thandle             *handle   = NULL;
        bool                        repaired = false;
        bool                        locked   = false;
+       bool                        remove;
+       bool                        newdata;
        int                         count    = 0;
        int                         rc;
        ENTRY;
@@ -847,75 +900,61 @@ again:
        if (rc == 0) {
                count = ldata.ld_leh->leh_reccount;
                rc = linkea_links_find(&ldata, cname, pfid);
-               if (rc == 0) {
-                       /* For dir, if there are more than one linkea entries,
-                        * then remove all the other redundant linkea entries.*/
-                       if (unlikely(count > 1 &&
-                                    S_ISDIR(lfsck_object_type(obj))))
-                               goto unmatch;
-
+               if ((rc == 0) &&
+                   (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
                        goto record;
-               } else {
-
-unmatch:
-                       ns->ln_flags |= LF_INCONSISTENT;
-                       if (bk->lb_param & LPF_DRYRUN) {
-                               repaired = true;
-                               goto record;
-                       }
 
-                       /*For dir, remove the unmatched linkea entry directly.*/
-                       if (S_ISDIR(lfsck_object_type(obj))) {
-                               if (!com->lc_journal)
-                                       goto again;
-
-                               rc = dt_xattr_del(env, obj, XATTR_NAME_LINK,
-                                                 handle, BYPASS_CAPA);
-                               if (rc != 0)
-                                       GOTO(stop, rc);
-
-                               goto nodata;
-                       } else {
-                               goto add;
-                       }
+               ns->ln_flags |= LF_INCONSISTENT;
+               /* For dir, if there are more than one linkea entries, or the
+                * linkea entry does not match the name entry, then remove all
+                * and add the correct one. */
+               if (S_ISDIR(lfsck_object_type(obj))) {
+                       remove = true;
+                       newdata = true;
+               } else {
+                       remove = false;
+                       newdata = false;
                }
+               goto nodata;
        } else if (unlikely(rc == -EINVAL)) {
+               count = 1;
                ns->ln_flags |= LF_INCONSISTENT;
-               if (bk->lb_param & LPF_DRYRUN) {
-                       count = 1;
-                       repaired = true;
-                       goto record;
-               }
-
-               if (!com->lc_journal)
-                       goto again;
-
                /* The magic crashed, we are not sure whether there are more
                 * corrupt data in the linkea, so remove all linkea entries. */
-               rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
-                                 BYPASS_CAPA);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
+               remove = true;
+               newdata = true;
                goto nodata;
        } else if (rc == -ENODATA) {
+               count = 1;
                ns->ln_flags |= LF_UPGRADE;
+               remove = false;
+               newdata = true;
+
+nodata:
                if (bk->lb_param & LPF_DRYRUN) {
-                       count = 1;
                        repaired = true;
                        goto record;
                }
 
-nodata:
-               rc = linkea_data_new(&ldata,
-                                    &lfsck_env_info(env)->lti_linkea_buf);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-add:
                if (!com->lc_journal)
                        goto again;
 
+               if (remove) {
+                       LASSERT(newdata);
+
+                       rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
+                                         BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(stop, rc);
+               }
+
+               if (newdata) {
+                       rc = linkea_data_new(&ldata,
+                                       &lfsck_env_info(env)->lti_linkea_buf);
+                       if (rc != 0)
+                               GOTO(stop, rc);
+               }
+
                rc = linkea_add_buf(&ldata, cname, pfid);
                if (rc != 0)
                        GOTO(stop, rc);
@@ -1366,15 +1405,13 @@ static int lfsck_namespace_double_scan(const struct lu_env *env,
 
                /* XXX: Currently, skip remote object, the consistency for
                 *      remote object will be processed in LFSCK phase III. */
-               if (!dt_object_exists(target) || dt_object_remote(target))
-                       goto obj_put;
-
-               rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
-               if (rc == 0)
-                       rc = lfsck_namespace_double_scan_one(env, com,
-                                                            target, flags);
+               if (dt_object_exists(target) && !dt_object_remote(target)) {
+                       rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
+                       if (rc == 0)
+                               rc = lfsck_namespace_double_scan_one(env, com,
+                                                               target, flags);
+               }
 
-obj_put:
                lfsck_object_put(env, target);
 
 checkpoint:
@@ -1397,27 +1434,27 @@ checkpoint:
                if (rc < 0 && bk->lb_param & LPF_FAILOUT)
                        GOTO(put, rc);
 
-               if (likely(cfs_time_beforeq(cfs_time_current(),
-                                           lfsck->li_time_next_checkpoint)) ||
-                   com->lc_new_checked == 0)
-                       goto speed;
-
-               down_write(&com->lc_sem);
-               ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
+               if (unlikely(cfs_time_beforeq(lfsck->li_time_next_checkpoint,
+                                             cfs_time_current())) &&
+                   com->lc_new_checked != 0) {
+                       down_write(&com->lc_sem);
+                       ns->ln_run_time_phase2 +=
+                               cfs_duration_sec(cfs_time_current() +
                                HALF_SEC - lfsck->li_time_last_checkpoint);
-               ns->ln_time_last_checkpoint = cfs_time_current_sec();
-               ns->ln_objs_checked_phase2 += com->lc_new_checked;
-               com->lc_new_checked = 0;
-               rc = lfsck_namespace_store(env, com, false);
-               up_write(&com->lc_sem);
-               if (rc != 0)
-                       GOTO(put, rc);
-
-               lfsck->li_time_last_checkpoint = cfs_time_current();
-               lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
+                       ns->ln_time_last_checkpoint = cfs_time_current_sec();
+                       ns->ln_objs_checked_phase2 += com->lc_new_checked;
+                       com->lc_new_checked = 0;
+                       rc = lfsck_namespace_store(env, com, false);
+                       up_write(&com->lc_sem);
+                       if (rc != 0)
+                               GOTO(put, rc);
+
+                       lfsck->li_time_last_checkpoint = cfs_time_current();
+                       lfsck->li_time_next_checkpoint =
+                               lfsck->li_time_last_checkpoint +
                                cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+               }
 
-speed:
                lfsck_control_speed(lfsck);
                if (unlikely(!thread_is_running(thread)))
                        GOTO(put, rc = 0);
@@ -1543,7 +1580,7 @@ int lfsck_namespace_setup(const struct lu_env *env,
                cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
                break;
        default:
-               CERROR("%s: unknown status: %u\n",
+               CERROR("%s: unknown lfsck_namespace status: %u\n",
                       lfsck_lfsck2name(lfsck), ns->ln_status);
                /* fall through */
        case LS_SCANNING_PHASE1: