Whamcloud - gitweb
LU-5914 lfsck: dt_lookup() LBUG
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
index 6641db8..8352fb7 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2013, 2014, Intel Corporation.
  */
 /*
  * lustre/lfsck/lfsck_namespace.c
@@ -319,9 +319,19 @@ static int lfsck_namespace_load_bitmap(const struct lu_env *env,
 }
 
 /**
- * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
- * \retval 0: succeed.
- * \retval -ve: failed cases.
+ * Load namespace LFSCK statistics information from the trace file.
+ *
+ * For old releases (Lustre-2.6 or older), the statistics information was
+ * stored as the XATTR_NAME_LFSCK_NAMESPACE_OLD EA. But Lustre-2.7 needs
+ * more statistics information. To avoid confusing an old MDT after a
+ * downgrade, Lustre-2.7 stores the namespace LFSCK statistics information
+ * as the new XATTR_NAME_LFSCK_NAMESPACE EA.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
  */
 static int lfsck_namespace_load(const struct lu_env *env,
                                struct lfsck_component *com)
@@ -341,7 +351,7 @@ static int lfsck_namespace_load(const struct lu_env *env,
                        CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
                               "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
                               ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
-                       rc = 1;
+                       rc = -ESTALE;
                } else {
                        rc = 0;
                }
@@ -350,13 +360,22 @@ static int lfsck_namespace_load(const struct lu_env *env,
                       "expected = %d: rc = %d\n",
                       lfsck_lfsck2name(com->lc_lfsck), len, rc);
                if (rc >= 0)
-                       rc = 1;
+                       rc = -ESTALE;
+       } else {
+               /* Check whether it is old trace file or not.
+                * If yes, it should be reset via returning -ESTALE. */
+               rc = dt_xattr_get(env, com->lc_obj,
+                                 lfsck_buf_get(env, com->lc_file_disk, len),
+                                 XATTR_NAME_LFSCK_NAMESPACE_OLD, BYPASS_CAPA);
+               if (rc >= 0)
+                       rc = -ESTALE;
        }
+
        return rc;
 }
 
 static int lfsck_namespace_store(const struct lu_env *env,
-                                struct lfsck_component *com)
+                                struct lfsck_component *com, bool init)
 {
        struct dt_object                *obj    = com->lc_obj;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
@@ -367,6 +386,9 @@ static int lfsck_namespace_store(const struct lu_env *env,
        __u32                            nbits  = 0;
        int                              len    = com->lc_file_size;
        int                              rc;
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
+       struct lu_buf            tbuf   = { &len, sizeof(len) };
+#endif
        ENTRY;
 
        if (lad != NULL) {
@@ -398,6 +420,20 @@ static int lfsck_namespace_store(const struct lu_env *env,
                        GOTO(out, rc);
        }
 
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
+       /* To be compatible with old Lustre-2.x MDT (x <= 6), generate dummy
+        * XATTR_NAME_LFSCK_NAMESPACE_OLD EA, then when downgrade to Lustre-2.x,
+        * the old LFSCK will find "invalid" XATTR_NAME_LFSCK_NAMESPACE_OLD EA,
+        * then reset the namespace LFSCK trace file. */
+       if (init) {
+               rc = dt_declare_xattr_set(env, obj, &tbuf,
+                                         XATTR_NAME_LFSCK_NAMESPACE_OLD,
+                                         LU_XATTR_CREATE, handle);
+               if (rc != 0)
+                       GOTO(out, rc);
+       }
+#endif
+
        rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
        if (rc != 0)
                GOTO(out, rc);
@@ -411,6 +447,13 @@ static int lfsck_namespace_store(const struct lu_env *env,
                                  XATTR_NAME_LFSCK_BITMAP, 0, handle,
                                  BYPASS_CAPA);
 
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 8, 53, 0)
+       if (rc == 0 && init)
+               rc = dt_xattr_set(env, obj, &tbuf,
+                                 XATTR_NAME_LFSCK_NAMESPACE_OLD,
+                                 LU_XATTR_CREATE, handle, BYPASS_CAPA);
+#endif
+
        GOTO(out, rc);
 
 out:
@@ -433,7 +476,7 @@ static int lfsck_namespace_init(const struct lu_env *env,
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
        down_write(&com->lc_sem);
-       rc = lfsck_namespace_store(env, com);
+       rc = lfsck_namespace_store(env, com, true);
        up_write(&com->lc_sem);
        return rc;
 }
@@ -954,10 +997,8 @@ static int lfsck_namespace_insert_orphan(const struct lu_env *env,
                }
        }
 
-       if (rc != 0)
-               GOTO(unlock, rc);
-
-       rc = dt_attr_set(env, orphan, la, th, BYPASS_CAPA);
+       if (rc == 0)
+               rc = dt_attr_set(env, orphan, la, th, BYPASS_CAPA);
 
        GOTO(stop, rc = (rc == 0 ? 1 : rc));
 
@@ -1202,10 +1243,13 @@ static int lfsck_namespace_create_orphan_remote(const struct lu_env *env,
        rc = ptlrpc_queue_wait(req);
        ptlrpc_req_finished(req);
 
-       if (rc == 0)
+       if (rc == 0) {
+               orphan->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
                rc = 1;
-       else if (rc == -EEXIST)
+       } else if (rc == -EEXIST) {
+               orphan->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
                rc = 0;
+       }
 
        GOTO(out, rc);
 
@@ -1929,8 +1973,6 @@ int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
        int                              rc     = 0;
        ENTRY;
 
-       LASSERT(!dt_object_remote(obj));
-
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                GOTO(log, rc = PTR_ERR(th));
@@ -2554,6 +2596,9 @@ lost_parent:
                GOTO(out, rc);
        }
 
+       if (fid_is_zero(pfid))
+               GOTO(out, rc = 0);
+
        /* The ".." name entry is wrong, update it. */
        if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
                if (!lustre_handle_is_used(lh) && retry != NULL) {
@@ -2619,7 +2664,8 @@ lfsck_namespace_dsd_multiple(const struct lu_env *env,
        struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
        struct dt_object         *parent        = NULL;
        struct linkea_data        ldata_new     = { 0 };
-       int                       count         = 0;
+       int                       dirent_count  = 0;
+       int                       linkea_count  = 0;
        int                       rc            = 0;
        bool                      once          = true;
        ENTRY;
@@ -2633,6 +2679,7 @@ again:
                /* Drop invalid linkEA entry. */
                if (!fid_is_sane(tfid)) {
                        linkea_del_buf(ldata, cname);
+                       linkea_count++;
                        continue;
                }
 
@@ -2666,6 +2713,7 @@ again:
                                 * child to be visible via other parent, then
                                 * remove this linkEA entry. */
                                linkea_del_buf(ldata, cname);
+                               linkea_count++;
                                continue;
                        }
 
@@ -2676,6 +2724,7 @@ again:
                if (unlikely(!dt_try_as_dir(env, parent))) {
                        lfsck_object_put(env, parent);
                        linkea_del_buf(ldata, cname);
+                       linkea_count++;
                        continue;
                }
 
@@ -2723,6 +2772,7 @@ rebuild:
                                RETURN(rc);
 
                        linkea_del_buf(ldata, cname);
+                       linkea_count++;
                        linkea_first_entry(ldata);
                        /* There may be some invalid dangling name entries under
                         * other parent directories, remove all of them. */
@@ -2759,13 +2809,13 @@ rebuild:
                                        goto next;
                                }
 
-                               count += rc;
+                               dirent_count += rc;
 
 next:
                                linkea_del_buf(ldata, cname);
                        }
 
-                       ns->ln_dirent_repaired += count;
+                       ns->ln_dirent_repaired += dirent_count;
 
                        RETURN(rc);
                }
@@ -2786,10 +2836,15 @@ next:
                linkea_del_buf(ldata, cname);
        }
 
+       linkea_first_entry(ldata);
        if (ldata->ld_leh->leh_reccount == 1) {
                rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
                                                lh, type, NULL);
 
+               if (rc == 0 && fid_is_zero(pfid) && linkea_count > 0)
+                       rc = lfsck_namespace_rebuild_linkea(env, com, child,
+                                                           ldata);
+
                RETURN(rc);
        }
 
@@ -2802,7 +2857,6 @@ next:
                RETURN(rc);
        }
 
-       linkea_first_entry(ldata);
        /* If the dangling name entry for the orphan directory object has
         * been removed, then just check whether the directory object is
         * still under the .lustre/lost+found/MDTxxxx/ or not. */
@@ -3071,6 +3125,8 @@ lock:
        } else if (lfsck->li_lpf_obj != NULL &&
                   lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
                lpf = true;
+       } else if (unlikely(!fid_is_sane(pfid))) {
+               fid_zero(pfid);
        }
 
        rc = lfsck_links_read(env, child, &ldata);
@@ -3717,7 +3773,7 @@ static int lfsck_namespace_reset(const struct lu_env *env,
        lad->lad_incomplete = 0;
        CFS_RESET_BITMAP(lad->lad_bitmap);
 
-       rc = lfsck_namespace_store(env, com);
+       rc = lfsck_namespace_store(env, com, true);
 
        GOTO(out, rc);
 
@@ -3854,7 +3910,7 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env,
                com->lc_new_checked = 0;
        }
 
-       rc = lfsck_namespace_store(env, com);
+       rc = lfsck_namespace_store(env, com, false);
        up_write(&com->lc_sem);
 
 log:
@@ -4092,9 +4148,28 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env,
                                    struct lfsck_component *com,
                                    struct lu_dirent *ent, __u16 type)
 {
-       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_assistant_data     *lad     = com->lc_data;
+       struct lfsck_instance           *lfsck   = com->lc_lfsck;
        struct lfsck_namespace_req      *lnr;
-       bool                             wakeup = false;
+       struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
+       struct ptlrpc_thread            *mthread = &lfsck->li_thread;
+       struct ptlrpc_thread            *athread = &lad->lad_thread;
+       struct l_wait_info               lwi     = { 0 };
+       bool                             wakeup  = false;
+
+       l_wait_event(mthread->t_ctl_waitq,
+                    bk->lb_async_windows == 0 ||
+                    lad->lad_prefetched < bk->lb_async_windows ||
+                    !thread_is_running(mthread) ||
+                    thread_is_stopped(athread),
+                    &lwi);
+
+       if (unlikely(!thread_is_running(mthread)) ||
+                    thread_is_stopped(athread))
+               return 0;
+
+       if (unlikely(lfsck_is_dead_obj(lfsck->li_obj_dir)))
+               return 0;
 
        lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
        if (IS_ERR(lnr)) {
@@ -4174,7 +4249,7 @@ static int lfsck_namespace_post(const struct lu_env *env,
                com->lc_new_checked = 0;
        }
 
-       rc = lfsck_namespace_store(env, com);
+       rc = lfsck_namespace_store(env, com, false);
        up_write(&com->lc_sem);
 
        CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
@@ -4246,8 +4321,8 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
                                          lfsck->li_time_last_checkpoint;
                __u64 checked = ns->ln_items_checked + com->lc_new_checked;
                __u64 speed = checked;
-               __u64 new_checked = msecs_to_jiffies(com->lc_new_checked *
-                                                    MSEC_PER_SEC);
+               __u64 new_checked = com->lc_new_checked *
+                                   msecs_to_jiffies(MSEC_PER_SEC);
                __u32 rtime = ns->ln_run_time_phase1 +
                              cfs_duration_sec(duration + HALF_SEC);
 
@@ -4301,8 +4376,8 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
                                com->lc_new_checked;
                __u64 speed1 = ns->ln_items_checked;
                __u64 speed2 = checked;
-               __u64 new_checked = msecs_to_jiffies(com->lc_new_checked *
-                                                    MSEC_PER_SEC);
+               __u64 new_checked = com->lc_new_checked *
+                                   msecs_to_jiffies(MSEC_PER_SEC);
                __u32 rtime = ns->ln_run_time_phase2 +
                              cfs_duration_sec(duration + HALF_SEC);
 
@@ -5952,7 +6027,7 @@ checkpoint:
                        ns->ln_time_last_checkpoint = cfs_time_current_sec();
                        ns->ln_objs_checked_phase2 += com->lc_new_checked;
                        com->lc_new_checked = 0;
-                       rc = lfsck_namespace_store(env, com);
+                       rc = lfsck_namespace_store(env, com, false);
                        up_write(&com->lc_sem);
                        if (rc != 0)
                                GOTO(put, rc);
@@ -6034,7 +6109,7 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env,
                ns->ln_status = LS_FAILED;
        }
 
-       rc = lfsck_namespace_store(env, com);
+       rc = lfsck_namespace_store(env, com, false);
        up_write(&com->lc_sem);
 
        return rc;
@@ -6485,10 +6560,10 @@ int lfsck_namespace_setup(const struct lu_env *env,
                GOTO(out, rc);
 
        rc = lfsck_namespace_load(env, com);
-       if (rc > 0)
-               rc = lfsck_namespace_reset(env, com, true);
-       else if (rc == -ENODATA)
+       if (rc == -ENODATA)
                rc = lfsck_namespace_init(env, com);
+       else if (rc < 0)
+               rc = lfsck_namespace_reset(env, com, true);
        if (rc != 0)
                GOTO(out, rc);