Whamcloud - gitweb
LU-6895 lfsck: not destroy directory when fix FID-in-dirent
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
index 1aa9e4a..44ead75 100644 (file)
@@ -59,6 +59,7 @@ enum lfsck_nameentry_check {
 
 static struct lfsck_namespace_req *
 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
+                                  struct lfsck_assistant_object *lso,
                                   struct lu_dirent *ent, __u16 type)
 {
        struct lfsck_namespace_req *lnr;
@@ -70,10 +71,9 @@ lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
                return ERR_PTR(-ENOMEM);
 
        INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
-       lnr->lnr_lar.lar_fid = *lfsck_dto2fid(lfsck->li_obj_dir);
+       lnr->lnr_lar.lar_parent = lfsck_assistant_object_get(lso);
        lnr->lnr_lmv = lfsck_lmv_get(lfsck->li_lmv);
        lnr->lnr_fid = ent->lde_fid;
-       lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
        lnr->lnr_dir_cookie = ent->lde_hash;
        lnr->lnr_attr = ent->lde_attrs;
        lnr->lnr_size = size;
@@ -93,6 +93,7 @@ static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
        if (lnr->lnr_lmv != NULL)
                lfsck_lmv_put(env, lnr->lnr_lmv);
 
+       lfsck_assistant_object_put(env, lar->lar_parent);
        OBD_FREE(lnr, lnr->lnr_size);
 }
 
@@ -556,7 +557,7 @@ static int lfsck_namespace_init(const struct lu_env *env,
  *                     trace file
  * \param[in] add      true if add new flags, otherwise remove flags
  *
- * \retval             0 for succeed or nothing to be done
+ * \retval             0 for success or nothing to be done
  * \retval             negative error number on failure
  */
 int lfsck_namespace_trace_update(const struct lu_env *env,
@@ -904,7 +905,7 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env,
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
        struct lu_name                  *cname  = &info->lti_name;
        struct dt_insert_rec            *rec    = &info->lti_dt_rec;
-       struct lu_attr                  *la     = &info->lti_la3;
+       struct lu_attr                  *la     = &info->lti_la2;
        const struct lu_fid             *cfid   = lfsck_dto2fid(orphan);
        const struct lu_fid             *pfid;
        struct lu_fid                    tfid;
@@ -2149,7 +2150,7 @@ log:
               "entry for: parent "DFID", child "DFID", name %s, type "
               "in name entry %o, type claimed by child %o. repair it "
               "by %s with new name2 %s: rc = %d\n", lfsck_lfsck2name(lfsck),
-              PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
+              PFID(lfsck_dto2fid(parent)), PFID(cfid),
               name, type, update ? lfsck_object_type(child) : 0,
               update ? "updating" : "removing", name2, rc);
 
@@ -3812,6 +3813,7 @@ static void lfsck_namespace_close_dir(const struct lu_env *env,
 {
        struct lfsck_namespace          *ns     = com->lc_file_ram;
        struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_assistant_object   *lso    = NULL;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct lfsck_lmv                *llmv   = lfsck->li_lmv;
        struct lfsck_namespace_req      *lnr;
@@ -3830,13 +3832,21 @@ static void lfsck_namespace_close_dir(const struct lu_env *env,
                RETURN_EXIT;
        }
 
+       lso = lfsck_assistant_object_init(env, lfsck_dto2fid(lfsck->li_obj_dir),
+                       NULL, lfsck->li_pos_current.lp_oit_cookie, true);
+       if (IS_ERR(lso)) {
+               OBD_FREE(lnr, size);
+               ns->ln_striped_dirs_skipped++;
+
+               RETURN_EXIT;
+       }
+
        /* Generate a dummy request to indicate that all shards' name entry
         * in this striped directory has been scanned for the first time. */
        INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
-       lnr->lnr_lar.lar_fid = *lfsck_dto2fid(lfsck->li_obj_dir);
+       lnr->lnr_lar.lar_parent = lso;
        lnr->lnr_lmv = lfsck_lmv_get(llmv);
        lnr->lnr_fid = *lfsck_dto2fid(lfsck->li_obj_dir);
-       lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
        lnr->lnr_dir_cookie = MDS_DIR_END_OFF;
        lnr->lnr_size = size;
 
@@ -4120,6 +4130,7 @@ out:
 
 static int lfsck_namespace_exec_dir(const struct lu_env *env,
                                    struct lfsck_component *com,
+                                   struct lfsck_assistant_object *lso,
                                    struct lu_dirent *ent, __u16 type)
 {
        struct lfsck_assistant_data     *lad     = com->lc_data;
@@ -4144,7 +4155,7 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env,
        if (unlikely(lfsck_is_dead_obj(lfsck->li_obj_dir)))
                return 0;
 
-       lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
+       lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, lso, ent, type);
        if (IS_ERR(lnr)) {
                struct lfsck_namespace *ns = com->lc_file_ram;
 
@@ -4345,7 +4356,7 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
                lfsck_pos_dump(m, &pos, "current_position");
        } else if (ns->ln_status == LS_SCANNING_PHASE2) {
                cfs_duration_t duration = cfs_time_current() -
-                                         lfsck->li_time_last_checkpoint;
+                                         com->lc_time_last_checkpoint;
                __u64 checked = ns->ln_objs_checked_phase2 +
                                com->lc_new_checked;
                __u64 speed1 = ns->ln_items_checked;
@@ -4528,7 +4539,7 @@ static int lfsck_namespace_in_notify(const struct lu_env *env,
        struct lfsck_assistant_data     *lad   = com->lc_data;
        struct lfsck_tgt_descs          *ltds  = &lfsck->li_mdt_descs;
        struct lfsck_tgt_desc           *ltd;
-       int                              rc;
+       int                              rc    = 0;
        bool                             fail  = false;
        ENTRY;
 
@@ -4619,7 +4630,10 @@ log:
                if (IS_ERR(obj))
                        RETURN(PTR_ERR(obj));
 
-               rc = lfsck_namespace_notify_lmv_master_local(env, com, obj);
+               if (likely(dt_object_exists(obj)))
+                       rc = lfsck_namespace_notify_lmv_master_local(env, com,
+                                                                    obj);
+
                lfsck_object_put(env, obj);
 
                RETURN(rc > 0 ? 0 : rc);
@@ -4646,7 +4660,7 @@ log:
               lr->lr_index, lr->lr_status, lr->lr_flags2);
 
        spin_lock(&ltds->ltd_lock);
-       ltd = LTD_TGT(ltds, lr->lr_index);
+       ltd = lfsck_ltd2tgt(ltds, lr->lr_index);
        if (ltd == NULL) {
                spin_unlock(&ltds->ltd_lock);
 
@@ -4972,9 +4986,9 @@ log:
        CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant found dangling "
               "reference for: parent "DFID", child "DFID", type %u, "
               "name %s. %s: rc = %d\n", lfsck_lfsck2name(lfsck),
-              PFID(&lnr->lnr_lar.lar_fid), PFID(lfsck_dto2fid(child)),
+              PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
               type, cname->ln_name,
-              create ? "Create the lost OST-object as required" :
+              create ? "Create the lost MDT-object as required" :
                        "Keep the MDT-object there by default", rc);
 
        if (rc <= 0) {
@@ -4995,6 +5009,7 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
        struct lfsck_instance      *lfsck    = com->lc_lfsck;
        struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
        struct lfsck_namespace     *ns       = com->lc_file_ram;
+       struct lfsck_assistant_data *lad     = com->lc_data;
        struct linkea_data          ldata    = { NULL };
        const struct lu_name       *cname;
        struct thandle             *handle   = NULL;
@@ -5002,7 +5017,8 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
                        container_of0(lar, struct lfsck_namespace_req, lnr_lar);
        struct dt_object           *dir      = NULL;
        struct dt_object           *obj      = NULL;
-       const struct lu_fid        *pfid;
+       struct lfsck_assistant_object *lso   = lar->lar_parent;
+       const struct lu_fid        *pfid     = &lso->lso_fid;
        struct dt_device           *dev      = NULL;
        struct lustre_handle        lh       = { 0 };
        bool                        repaired = false;
@@ -5011,23 +5027,16 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
        bool                        newdata;
        bool                        log      = false;
        bool                        bad_hash = false;
+       bool                        bad_linkea = false;
        int                         idx      = 0;
        int                         count    = 0;
        int                         rc       = 0;
        enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
        ENTRY;
 
-       dir = lfsck_object_find_bottom(env, lfsck, &lar->lar_fid);
-       if (IS_ERR(dir))
-               RETURN(PTR_ERR(dir));
-
-       if (unlikely(lfsck_is_dead_obj(dir)))
-               GOTO(put_dir, rc = 0);
-
-       if (unlikely(!dt_try_as_dir(env, dir)))
-               GOTO(put_dir, rc = -ENOTDIR);
+       if (lso->lso_dead)
+               RETURN(0);
 
-       pfid = lfsck_dto2fid(dir);
        la->la_nlink = 0;
        if (lnr->lnr_attr & LUDA_UPGRADE) {
                ns->ln_flags |= LF_UPGRADE;
@@ -5068,18 +5077,18 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
        }
 
        if (unlikely(lnr->lnr_dir_cookie == MDS_DIR_END_OFF)) {
-               rc = lfsck_namespace_striped_dir_rescan(env, com, dir, lnr);
+               rc = lfsck_namespace_striped_dir_rescan(env, com, lnr);
 
-               GOTO(put_dir, rc);
+               RETURN(rc);
        }
 
        if (fid_seq_is_dot(fid_seq(&lnr->lnr_fid)))
                GOTO(out, rc = 0);
 
        if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) {
-               rc = lfsck_namespace_handle_striped_master(env, com, dir, lnr);
+               rc = lfsck_namespace_handle_striped_master(env, com, lnr);
 
-               GOTO(put_dir, rc);
+               RETURN(rc);
        }
 
        idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
@@ -5104,7 +5113,7 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
                        GOTO(out, rc);
                }
 
-               ltd = LTD_TGT(&lfsck->li_mdt_descs, idx);
+               ltd = lfsck_ltd2tgt(&lfsck->li_mdt_descs, idx);
                if (unlikely(ltd == NULL)) {
                        CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
                               "did not join the namespace LFSCK\n",
@@ -5125,6 +5134,15 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
        if (dt_object_exists(obj) == 0) {
 
 dangling:
+               if (dir == NULL) {
+                       dir = lfsck_assistant_object_load(env, lfsck, lso);
+                       if (IS_ERR(dir)) {
+                               rc = PTR_ERR(dir);
+
+                               GOTO(trace, rc == -ENOENT ? 0 : rc);
+                       }
+               }
+
                rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
                if (rc == 0) {
                        if (!lfsck_is_valid_slave_name_entry(env, lnr->lnr_lmv,
@@ -5144,7 +5162,7 @@ dangling:
                GOTO(out, rc);
        }
 
-       if (!(bk->lb_param & LPF_DRYRUN) && repaired) {
+       if (!(bk->lb_param & LPF_DRYRUN) && lad->lad_advance_lock) {
 
 again:
                rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
@@ -5169,10 +5187,6 @@ again:
                dtlocked = true;
        }
 
-       rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
-       if (rc != 0)
-               GOTO(stop, rc);
-
        rc = lfsck_links_read(env, obj, &ldata);
        if (unlikely(rc == -ENOENT)) {
                if (handle != NULL) {
@@ -5202,13 +5216,12 @@ again:
                        goto stop;
                }
 
-               ns->ln_flags |= LF_INCONSISTENT;
-
                /* If the name entry hash does not match the slave striped
                 * directory, and the name entry does not match also, then
                 * it is quite possible that name entry is corrupted. */
                if (!lfsck_is_valid_slave_name_entry(env, lnr->lnr_lmv,
                                        lnr->lnr_name, lnr->lnr_namelen)) {
+                       ns->ln_flags |= LF_INCONSISTENT;
                        type = LNIT_BAD_DIRENT;
 
                        GOTO(stop, rc = 0);
@@ -5219,6 +5232,7 @@ again:
                 * not recognize the name entry, then it is quite possible
                 * that the name entry is corrupted. */
                if ((lfsck_object_type(obj) & S_IFMT) != lnr->lnr_type) {
+                       ns->ln_flags |= LF_INCONSISTENT;
                        type = LNIT_BAD_DIRENT;
 
                        GOTO(stop, rc = 0);
@@ -5241,7 +5255,6 @@ again:
                        type = LNIT_BAD_TYPE;
 
                count = 1;
-               ns->ln_flags |= LF_INCONSISTENT;
                /* The magic crashed, we are not sure whether there are more
                 * corrupt data in the linkea, so remove all linkea entries. */
                remove = true;
@@ -5252,20 +5265,47 @@ again:
                        type = LNIT_BAD_TYPE;
 
                count = 1;
-               ns->ln_flags |= LF_UPGRADE;
                remove = false;
                newdata = true;
 
 nodata:
                if (bk->lb_param & LPF_DRYRUN) {
+                       if (rc == -ENODATA)
+                               ns->ln_flags |= LF_UPGRADE;
+                       else
+                               ns->ln_flags |= LF_INCONSISTENT;
                        ns->ln_linkea_repaired++;
                        repaired = true;
                        log = true;
                        goto stop;
                }
 
-               if (!lustre_handle_is_used(&lh))
+               if (!lustre_handle_is_used(&lh)) {
+                       remove = false;
+                       newdata = false;
+                       type = LNIT_NONE;
+
                        goto again;
+               }
+
+               if (dir == NULL) {
+                       dir = lfsck_assistant_object_load(env, lfsck, lso);
+                       if (IS_ERR(dir)) {
+                               rc = PTR_ERR(dir);
+
+                               GOTO(stop, rc == -ENOENT ? 0 : rc);
+                       }
+               }
+
+               rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               bad_linkea = true;
+               if (!remove && newdata)
+                       ns->ln_flags |= LF_UPGRADE;
+               else if (remove || !(ns->ln_flags & LF_UPGRADE))
+                       ns->ln_flags |= LF_INCONSISTENT;
 
                if (remove) {
                        LASSERT(newdata);
@@ -5345,13 +5385,31 @@ out:
                ns->ln_flags |= LF_INCONSISTENT;
 
                log = false;
+               if (dir == NULL) {
+                       dir = lfsck_assistant_object_load(env, lfsck, lso);
+                       if (IS_ERR(dir)) {
+                               rc = PTR_ERR(dir);
+
+                               GOTO(trace, rc == -ENOENT ? 0 : rc);
+                       }
+               }
+
                rc = lfsck_namespace_repair_bad_name_hash(env, com, dir,
                                                lnr->lnr_lmv, lnr->lnr_name);
-               if (rc >= 0)
+               if (rc == 0)
                        bad_hash = true;
        }
 
        if (rc >= 0) {
+               if (type != LNIT_NONE && dir == NULL) {
+                       dir = lfsck_assistant_object_load(env, lfsck, lso);
+                       if (IS_ERR(dir)) {
+                               rc = PTR_ERR(dir);
+
+                               GOTO(trace, rc == -ENOENT ? 0 : rc);
+                       }
+               }
+
                switch (type) {
                case LNIT_BAD_TYPE:
                        log = false;
@@ -5381,12 +5439,12 @@ out:
                        dt_attr_get(env, obj, la);
        }
 
+trace:
        down_write(&com->lc_sem);
        if (rc < 0) {
                CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
                       "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
-                      lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
-                      PFID(lfsck_dto2fid(dir)),
+                      lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid), PFID(pfid),
                       lnr->lnr_namelen, lnr->lnr_name, rc);
 
                lfsck_namespace_record_failure(env, lfsck, ns);
@@ -5403,8 +5461,7 @@ out:
                        CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
                               "repaired the entry: "DFID", parent "DFID
                               ", name %.*s\n", lfsck_lfsck2name(lfsck),
-                              PFID(&lnr->lnr_fid),
-                              PFID(lfsck_dto2fid(dir)),
+                              PFID(&lnr->lnr_fid), PFID(pfid),
                               lnr->lnr_namelen, lnr->lnr_name);
 
                if (repaired) {
@@ -5456,8 +5513,10 @@ out:
        if (obj != NULL && !IS_ERR(obj))
                lfsck_object_put(env, obj);
 
-put_dir:
-       lfsck_object_put(env, dir);
+       if (dir != NULL && !IS_ERR(dir))
+               lfsck_object_put(env, dir);
+
+       lad->lad_advance_lock = bad_linkea;
 
        return rc;
 }
@@ -5812,6 +5871,7 @@ static int lfsck_namespace_rescan_striped_dir(const struct lu_env *env,
                        (struct lu_dirent *)info->lti_key;
        struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
        struct ptlrpc_thread            *thread = &lfsck->li_thread;
+       struct lfsck_assistant_object   *lso    = NULL;
        struct lfsck_namespace_req      *lnr;
        struct lfsck_assistant_req      *lar;
        int                              rc;
@@ -5846,7 +5906,20 @@ static int lfsck_namespace_rescan_striped_dir(const struct lu_env *env,
                if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
                        goto next;
 
-               lnr = lfsck_namespace_assistant_req_init(lfsck, ent, type);
+               if (lso == NULL) {
+                       lso = lfsck_assistant_object_init(env,
+                               lfsck_dto2fid(dir), NULL,
+                               lfsck->li_pos_current.lp_oit_cookie, true);
+                       if (IS_ERR(lso)) {
+                               if (bk->lb_param & LPF_FAILOUT)
+                                       GOTO(out, rc = PTR_ERR(lso));
+
+                               lso = NULL;
+                               goto next;
+                       }
+               }
+
+               lnr = lfsck_namespace_assistant_req_init(lfsck, lso, ent, type);
                if (IS_ERR(lnr)) {
                        if (bk->lb_param & LPF_FAILOUT)
                                GOTO(out, rc = PTR_ERR(lnr));
@@ -5868,6 +5941,9 @@ next:
        } while (rc == 0);
 
 out:
+       if (lso != NULL && !IS_ERR(lso))
+               lfsck_assistant_object_put(env, lso);
+
        lfsck_close_dir(env, lfsck, rc);
        if (rc <= 0)
                RETURN(rc);
@@ -6078,9 +6154,9 @@ static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
        lnr = list_entry(lad->lad_req_list.next,
                         struct lfsck_namespace_req,
                         lnr_lar.lar_list);
-       pos->lp_oit_cookie = lnr->lnr_oit_cookie;
+       pos->lp_oit_cookie = lnr->lnr_lar.lar_parent->lso_oit_cookie;
        pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
-       pos->lp_dir_parent = lnr->lnr_lar.lar_fid;
+       pos->lp_dir_parent = lnr->lnr_lar.lar_parent->lso_fid;
 }
 
 static int lfsck_namespace_double_scan_result(const struct lu_env *env,
@@ -6092,7 +6168,7 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env,
 
        down_write(&com->lc_sem);
        ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
-                               HALF_SEC - lfsck->li_time_last_checkpoint);
+                                 HALF_SEC - com->lc_time_last_checkpoint);
        ns->ln_time_last_checkpoint = cfs_time_current_sec();
        ns->ln_objs_checked_phase2 += com->lc_new_checked;
        com->lc_new_checked = 0;
@@ -6186,7 +6262,7 @@ static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
 
        down_read(&ltds->ltd_rw_sem);
        cfs_foreach_bit(lad->lad_bitmap, idx) {
-               ltd = LTD_TGT(ltds, idx);
+               ltd = lfsck_ltd2tgt(ltds, idx);
                LASSERT(ltd != NULL);
 
                laia->laia_ltd = ltd;