Whamcloud - gitweb
LU-1267 lfsck: framework (3) for MDT-OST consistency
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
index 01bead5..d030528 100644 (file)
@@ -134,16 +134,15 @@ static int lfsck_namespace_load(const struct lu_env *env,
                lfsck_namespace_le_to_cpu(ns,
                                (struct lfsck_namespace *)com->lc_file_disk);
                if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
-                       CWARN("%.16s: invalid lfsck_namespace magic "
-                             "0x%x != 0x%x\n",
-                             lfsck_lfsck2name(com->lc_lfsck),
-                             ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
+                       CWARN("%s: invalid lfsck_namespace magic %#x != %#x\n",
+                             lfsck_lfsck2name(com->lc_lfsck), ns->ln_magic,
+                             LFSCK_NAMESPACE_MAGIC);
                        rc = 1;
                } else {
                        rc = 0;
                }
        } else if (rc != -ENODATA) {
-               CERROR("%.16s: fail to load lfsck_namespace, expected = %d, "
+               CERROR("%s: fail to load lfsck_namespace: expected = %d, "
                       "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck), len, rc);
                if (rc >= 0)
                        rc = 1;
@@ -166,8 +165,8 @@ static int lfsck_namespace_store(const struct lu_env *env,
        handle = dt_trans_create(env, lfsck->li_bottom);
        if (IS_ERR(handle)) {
                rc = PTR_ERR(handle);
-               CERROR("%.16s: fail to create trans for storing "
-                      "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               CERROR("%s: fail to create trans for storing lfsck_namespace: "
+                      "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
                RETURN(rc);
        }
 
@@ -175,15 +174,15 @@ static int lfsck_namespace_store(const struct lu_env *env,
                                  lfsck_buf_get(env, com->lc_file_disk, len),
                                  XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
        if (rc != 0) {
-               CERROR("%.16s: fail to declare trans for storing "
-                      "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               CERROR("%s: fail to declare trans for storing lfsck_namespace: "
+                      "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
                GOTO(out, rc);
        }
 
        rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
        if (rc != 0) {
-               CERROR("%.16s: fail to start trans for storing "
-                      "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               CERROR("%s: fail to start trans for storing lfsck_namespace: "
+                      "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
                GOTO(out, rc);
        }
 
@@ -193,7 +192,7 @@ static int lfsck_namespace_store(const struct lu_env *env,
                          init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
                          handle, BYPASS_CAPA);
        if (rc != 0)
-               CERROR("%.16s: fail to store lfsck_namespace, len = %d, "
+               CERROR("%s: fail to store lfsck_namespace: len = %d, "
                       "rc = %d\n", lfsck_lfsck2name(lfsck), len, rc);
 
        GOTO(out, rc);
@@ -206,7 +205,7 @@ out:
 static int lfsck_namespace_init(const struct lu_env *env,
                                struct lfsck_component *com)
 {
-       struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace *ns = com->lc_file_ram;
        int rc;
 
        memset(ns, 0, sizeof(*ns));
@@ -316,7 +315,7 @@ static int lfsck_namespace_update(const struct lu_env *env,
                rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
                               BYPASS_CAPA);
                if (rc != 0) {
-                       CERROR("%s: fail to insert "DFID", rc = %d\n",
+                       CERROR("%s: fail to insert "DFID": rc = %d\n",
                               lfsck_lfsck2name(com->lc_lfsck), PFID(fid), rc);
                        GOTO(out, rc);
                }
@@ -473,8 +472,7 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
        struct lu_fid            *cfid    = &info->lti_fid2;
        struct lfsck_instance   *lfsck    = com->lc_lfsck;
        struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace  *ns       =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns       = com->lc_file_ram;
        struct linkea_data       ldata    = { 0 };
        struct thandle          *handle   = NULL;
        bool                     locked   = false;
@@ -625,7 +623,7 @@ stop:
                if (rc == 0 && !lfsck_is_dead_obj(child) &&
                    ldata.ld_leh != NULL &&
                    ldata.ld_leh->leh_reccount != la->la_nlink)
-                       CWARN("%.16s: the object "DFID" linkEA entry count %u "
+                       CWARN("%s: the object "DFID" linkEA entry count %u "
                              "may not match its hardlink count %u\n",
                              lfsck_lfsck2name(lfsck), PFID(cfid),
                              ldata.ld_leh->leh_reccount, la->la_nlink);
@@ -650,12 +648,21 @@ static int lfsck_namespace_reset(const struct lu_env *env,
                                 struct lfsck_component *com, bool init)
 {
        struct lfsck_instance   *lfsck = com->lc_lfsck;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns    = com->lc_file_ram;
+       struct dt_object        *root;
        struct dt_object        *dto;
        int                      rc;
        ENTRY;
 
+       root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
+       if (IS_ERR(root))
+               RETURN(PTR_ERR(root));
+
+       if (unlikely(!dt_try_as_dir(env, root))) {
+               lu_object_put(env, &root->do_lu);
+               RETURN(-ENOTDIR);
+       }
+
        down_write(&com->lc_sem);
        if (init) {
                memset(ns, 0, sizeof(*ns));
@@ -670,12 +677,14 @@ static int lfsck_namespace_reset(const struct lu_env *env,
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
 
-       rc = local_object_unlink(env, lfsck->li_bottom, lfsck->li_local_root,
+       rc = local_object_unlink(env, lfsck->li_bottom, root,
                                 lfsck_namespace_name);
        if (rc != 0)
                GOTO(out, rc);
 
-       dto = local_index_find_or_create(env, lfsck->li_los, lfsck->li_local_root,
+       lfsck_object_put(env, com->lc_obj);
+       com->lc_obj = NULL;
+       dto = local_index_find_or_create(env, lfsck->li_los, root,
                                         lfsck_namespace_name,
                                         S_IFREG | S_IRUGO | S_IWUSR,
                                         &dt_lfsck_features);
@@ -693,6 +702,7 @@ static int lfsck_namespace_reset(const struct lu_env *env,
 
 out:
        up_write(&com->lc_sem);
+       lu_object_put(env, &root->do_lu);
        return rc;
 }
 
@@ -700,7 +710,7 @@ static void
 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
                     bool new_checked)
 {
-       struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace *ns = com->lc_file_ram;
 
        down_write(&com->lc_sem);
        if (new_checked)
@@ -716,8 +726,7 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env,
                                      struct lfsck_component *com, bool init)
 {
        struct lfsck_instance   *lfsck = com->lc_lfsck;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      rc;
 
        if (com->lc_new_checked == 0 && !init)
@@ -746,8 +755,7 @@ static int lfsck_namespace_prep(const struct lu_env *env,
                                struct lfsck_component *com)
 {
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
-       struct lfsck_namespace  *ns     =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns     = com->lc_file_ram;
        struct lfsck_position   *pos    = &com->lc_pos_start;
 
        if (ns->ln_status == LS_COMPLETED) {
@@ -833,11 +841,9 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env,
        struct lu_attr             *la       = &info->lti_la;
        struct lfsck_instance      *lfsck    = com->lc_lfsck;
        struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace     *ns       =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace     *ns       = com->lc_file_ram;
        struct linkea_data          ldata    = { 0 };
-       const struct lu_fid        *pfid     =
-                               lu_object_fid(&lfsck->li_obj_dir->do_lu);
+       const struct lu_fid        *pfid     = lfsck_dto2fid(lfsck->li_obj_dir);
        const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
        const struct lu_name       *cname;
        struct thandle             *handle   = NULL;
@@ -1012,10 +1018,16 @@ out:
                if (!(bk->lb_param & LPF_FAILOUT))
                        rc = 0;
        } else {
-               if (repaired)
+               if (repaired) {
                        ns->ln_items_repaired++;
-               else
+                       if (bk->lb_param & LPF_DRYRUN &&
+                           lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
+                               lfsck_pos_fill(env, lfsck,
+                                              &ns->ln_pos_first_inconsistent,
+                                              false);
+               } else {
                        com->lc_journal = 0;
+               }
                rc = 0;
        }
        up_write(&com->lc_sem);
@@ -1027,8 +1039,7 @@ static int lfsck_namespace_post(const struct lu_env *env,
                                int result, bool init)
 {
        struct lfsck_instance   *lfsck = com->lc_lfsck;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      rc;
 
        down_write(&com->lc_sem);
@@ -1080,8 +1091,7 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
 {
        struct lfsck_instance   *lfsck = com->lc_lfsck;
        struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      save  = len;
        int                      ret   = -ENOSPC;
        int                      rc;
@@ -1089,12 +1099,12 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
        down_read(&com->lc_sem);
        rc = snprintf(buf, len,
                      "name: lfsck_namespace\n"
-                     "magic: 0x%x\n"
+                     "magic: %#x\n"
                      "version: %d\n"
                      "status: %s\n",
                      ns->ln_magic,
                      bk->lb_version,
-                     lfsck_status_names[ns->ln_status]);
+                     lfsck_status2names(ns->ln_status));
        if (rc <= 0)
                goto out;
 
@@ -1214,7 +1224,7 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
                                pos.lp_dir_cookie = 0;
                        } else {
                                pos.lp_dir_parent =
-                               *lu_object_fid(&lfsck->li_obj_dir->do_lu);
+                                       *lfsck_dto2fid(lfsck->li_obj_dir);
                        }
                } else {
                        fid_zero(&pos.lp_dir_parent);
@@ -1337,14 +1347,15 @@ out:
        return ret;
 }
 
-static int lfsck_namespace_double_scan(const struct lu_env *env,
-                                      struct lfsck_component *com)
+static int lfsck_namespace_double_scan_main(void *args)
 {
+       struct lfsck_thread_args *lta   = args;
+       const struct lu_env     *env    = &lta->lta_env;
+       struct lfsck_component  *com    = lta->lta_com;
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
        struct ptlrpc_thread    *thread = &lfsck->li_thread;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace  *ns     =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns     = com->lc_file_ram;
        struct dt_object        *obj    = com->lc_obj;
        const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
        struct dt_object        *target;
@@ -1352,17 +1363,18 @@ static int lfsck_namespace_double_scan(const struct lu_env *env,
        struct dt_key           *key;
        struct lu_fid            fid;
        int                      rc;
-       __u8                     flags;
+       __u8                     flags = 0;
        ENTRY;
 
-       lfsck->li_new_scanned = 0;
-       lfsck->li_time_last_checkpoint = cfs_time_current();
-       lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
+       com->lc_new_checked = 0;
+       com->lc_new_scanned = 0;
+       com->lc_time_last_checkpoint = cfs_time_current();
+       com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
                                cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
 
        di = iops->init(env, obj, 0, BYPASS_CAPA);
        if (IS_ERR(di))
-               RETURN(PTR_ERR(di));
+               GOTO(out, rc = PTR_ERR(di));
 
        fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
        rc = iops->get(env, di, (const struct dt_key *)&fid);
@@ -1413,8 +1425,8 @@ static int lfsck_namespace_double_scan(const struct lu_env *env,
                lfsck_object_put(env, target);
 
 checkpoint:
-               lfsck->li_new_scanned++;
                com->lc_new_checked++;
+               com->lc_new_scanned++;
                ns->ln_fid_latest_scanned_phase2 = fid;
                if (rc > 0)
                        ns->ln_objs_repaired_phase2++;
@@ -1432,13 +1444,13 @@ checkpoint:
                if (rc < 0 && bk->lb_param & LPF_FAILOUT)
                        GOTO(put, rc);
 
-               if (unlikely(cfs_time_beforeq(lfsck->li_time_next_checkpoint,
+               if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
                                              cfs_time_current())) &&
                    com->lc_new_checked != 0) {
                        down_write(&com->lc_sem);
                        ns->ln_run_time_phase2 +=
                                cfs_duration_sec(cfs_time_current() +
-                               HALF_SEC - lfsck->li_time_last_checkpoint);
+                               HALF_SEC - com->lc_time_last_checkpoint);
                        ns->ln_time_last_checkpoint = cfs_time_current_sec();
                        ns->ln_objs_checked_phase2 += com->lc_new_checked;
                        com->lc_new_checked = 0;
@@ -1447,13 +1459,13 @@ checkpoint:
                        if (rc != 0)
                                GOTO(put, rc);
 
-                       lfsck->li_time_last_checkpoint = cfs_time_current();
-                       lfsck->li_time_next_checkpoint =
-                               lfsck->li_time_last_checkpoint +
+                       com->lc_time_last_checkpoint = cfs_time_current();
+                       com->lc_time_next_checkpoint =
+                               com->lc_time_last_checkpoint +
                                cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
                }
 
-               lfsck_control_speed(lfsck);
+               lfsck_control_speed_by_self(com);
                if (unlikely(!thread_is_running(thread)))
                        GOTO(put, rc = 0);
 
@@ -1467,6 +1479,8 @@ put:
 
 fini:
        iops->fini(env, di);
+
+out:
        down_write(&com->lc_sem);
 
        ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
@@ -1479,8 +1493,7 @@ fini:
                com->lc_journal = 0;
                ns->ln_status = LS_COMPLETED;
                if (!(bk->lb_param & LPF_DRYRUN))
-                       ns->ln_flags &=
-                       ~(LF_SCANNED_ONCE | LF_INCONSISTENT | LF_UPGRADE);
+                       ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
                ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
                ns->ln_success_count++;
        } else if (rc == 0) {
@@ -1502,9 +1515,45 @@ fini:
        rc = lfsck_namespace_store(env, com, false);
 
        up_write(&com->lc_sem);
+       if (atomic_dec_and_test(&lfsck->li_double_scan_count))
+               wake_up_all(&thread->t_ctl_waitq);
+
+       lfsck_thread_args_fini(lta);
+
        return rc;
 }
 
+static int lfsck_namespace_double_scan(const struct lu_env *env,
+                                      struct lfsck_component *com)
+{
+       struct lfsck_instance           *lfsck = com->lc_lfsck;
+       struct lfsck_namespace          *ns    = com->lc_file_ram;
+       struct lfsck_thread_args        *lta;
+       long                             rc;
+       ENTRY;
+
+       if (unlikely(ns->ln_status != LS_SCANNING_PHASE2))
+               RETURN(0);
+
+       lta = lfsck_thread_args_init(lfsck, com);
+       if (IS_ERR(lta))
+               RETURN(PTR_ERR(lta));
+
+       atomic_inc(&lfsck->li_double_scan_count);
+       rc = PTR_ERR(kthread_run(lfsck_namespace_double_scan_main, lta,
+                                "lfsck_namespace"));
+       if (IS_ERR_VALUE(rc)) {
+               CERROR("%s: cannot start LFSCK namespace thread: rc = %ld\n",
+                      lfsck_lfsck2name(lfsck), rc);
+               atomic_dec(&lfsck->li_double_scan_count);
+               lfsck_thread_args_fini(lta);
+       } else {
+               rc = 0;
+       }
+
+       RETURN(rc);
+}
+
 static struct lfsck_operations lfsck_namespace_ops = {
        .lfsck_reset            = lfsck_namespace_reset,
        .lfsck_fail             = lfsck_namespace_fail,
@@ -1522,6 +1571,7 @@ int lfsck_namespace_setup(const struct lu_env *env,
 {
        struct lfsck_component  *com;
        struct lfsck_namespace  *ns;
+       struct dt_object        *root = NULL;
        struct dt_object        *obj;
        int                      rc;
        ENTRY;
@@ -1548,8 +1598,14 @@ int lfsck_namespace_setup(const struct lu_env *env,
        if (com->lc_file_disk == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       obj = local_index_find_or_create(env, lfsck->li_los,
-                                        lfsck->li_local_root,
+       root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
+       if (IS_ERR(root))
+               GOTO(out, rc = PTR_ERR(root));
+
+       if (unlikely(!dt_try_as_dir(env, root)))
+               GOTO(out, rc = -ENOTDIR);
+
+       obj = local_index_find_or_create(env, lfsck->li_los, root,
                                         lfsck_namespace_name,
                                         S_IFREG | S_IRUGO | S_IWUSR,
                                         &dt_lfsck_features);
@@ -1569,16 +1625,18 @@ int lfsck_namespace_setup(const struct lu_env *env,
        if (rc != 0)
                GOTO(out, rc);
 
-       ns = (struct lfsck_namespace *)com->lc_file_ram;
+       ns = com->lc_file_ram;
        switch (ns->ln_status) {
        case LS_INIT:
        case LS_COMPLETED:
        case LS_FAILED:
        case LS_STOPPED:
+               spin_lock(&lfsck->li_lock);
                cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
+               spin_unlock(&lfsck->li_lock);
                break;
        default:
-               CERROR("%s: unknown lfsck_namespace status: %u\n",
+               CERROR("%s: unknown lfsck_namespace status: rc = %u\n",
                       lfsck_lfsck2name(lfsck), ns->ln_status);
                /* fall through */
        case LS_SCANNING_PHASE1:
@@ -1590,14 +1648,18 @@ int lfsck_namespace_setup(const struct lu_env *env,
                /* fall through */
        case LS_PAUSED:
        case LS_CRASHED:
+               spin_lock(&lfsck->li_lock);
                cfs_list_add_tail(&com->lc_link, &lfsck->li_list_scan);
                cfs_list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
+               spin_unlock(&lfsck->li_lock);
                break;
        }
 
        GOTO(out, rc = 0);
 
 out:
+       if (root != NULL && !IS_ERR(root))
+               lu_object_put(env, &root->do_lu);
        if (rc != 0)
                lfsck_component_cleanup(env, com);
        return rc;