Whamcloud - gitweb
LU-1267 lfsck: rebuild LAST_ID 97/6997/35
authorFan Yong <fan.yong@intel.com>
Sat, 18 Jan 2014 01:04:09 +0000 (09:04 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 23 Jan 2014 23:57:36 +0000 (23:57 +0000)
The /O/<seq>/LAST_ID file records the last object ID (oid) allocated
within the sequence. The LAST_ID file can become corrupted or go
missing while the system is running. The LFSCK layout consistency
verification can detect a lost or corrupted LAST_ID file and rebuild
it by scanning the whole device.

This functionality is also part of LU-14 live replacement of OST.

Introduce the lfsck_out_notify callback: the channel through which
the LFSCK notifies its registered users (MDD/OFD) about LFSCK events.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Iee85056e2fda1ecba9424c9f0e822643e9f029a8
Reviewed-on: http://review.whamcloud.com/6997
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
17 files changed:
lustre/include/lustre/lustre_lfsck_user.h
lustre/include/lustre_export.h
lustre/include/lustre_lfsck.h
lustre/include/obd_support.h
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/mdd/mdd_device.c
lustre/ofd/lproc_ofd.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_fs.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_obd.c
lustre/ofd/ofd_objects.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_scrub.c
lustre/tests/sanity-lfsck.sh

index 8daa152..aca5eb3 100644 (file)
@@ -60,7 +60,7 @@ enum lfsck_type {
 
 #define LFSCK_TYPES_ALL        ((__u16)(~0))
 #define LFSCK_TYPES_DEF        ((__u16)0)
 
 #define LFSCK_TYPES_ALL        ((__u16)(~0))
 #define LFSCK_TYPES_DEF        ((__u16)0)
-#define LFSCK_TYPES_SUPPORTED  LT_NAMESPACE
+#define LFSCK_TYPES_SUPPORTED  (LT_LAYOUT | LT_NAMESPACE)
 
 #define LFSCK_SPEED_NO_LIMIT   0
 #define LFSCK_SPEED_LIMIT_DEF  LFSCK_SPEED_NO_LIMIT
 
 #define LFSCK_SPEED_NO_LIMIT   0
 #define LFSCK_SPEED_LIMIT_DEF  LFSCK_SPEED_NO_LIMIT
index a52eeaf..dc2f64d 100644 (file)
@@ -90,6 +90,7 @@ struct ec_export_data { /* echo client */
 struct filter_export_data {
        struct tg_export_data   fed_ted;
        spinlock_t              fed_lock;       /**< protects fed_mod_list */
 struct filter_export_data {
        struct tg_export_data   fed_ted;
        spinlock_t              fed_lock;       /**< protects fed_mod_list */
+       __u64                   fed_lastid_gen;
        long                    fed_dirty;    /* in bytes */
        long                    fed_grant;    /* in bytes */
        cfs_list_t              fed_mod_list; /* files being modified */
        long                    fed_dirty;    /* in bytes */
        long                    fed_grant;    /* in bytes */
        cfs_list_t              fed_mod_list; /* files being modified */
index a833018..b20eff9 100644 (file)
 #include <lu_object.h>
 #include <dt_object.h>
 
 #include <lu_object.h>
 #include <dt_object.h>
 
+/**
+ * state machine:
+ *
+ *                                     LS_INIT
+ *                                        |
+ *                                  (lfsck|start)
+ *                                        |
+ *                                        v
+ *                                LS_SCANNING_PHASE1
+ *                                     |       ^
+ *                                     |       :
+ *                                     | (lfsck:restart)
+ *                                     |       :
+ *                                     v       :
+ *     -----------------------------------------------------------------
+ *     |                   |^          |^         |^         |^        |^
+ *     |                   |:          |:         |:         |:        |:
+ *     v                   v:          v:         v:         v:        v:
+ * LS_SCANNING_PHASE2  LS_FAILED  LS_STOPPED  LS_PAUSED LS_CRASHED LS_PARTIAL
+ *     |       ^           ^:          ^:         ^:         ^:        ^:
+ *     |       :           |:          |:         |:         |:        |:
+ *     | (lfsck:restart)   |:          |:         |:         |:        |:
+ *     v       :           |v          |v         |v         |v        |v
+ *     -----------------------------------------------------------------
+ *         |
+ *         v
+ *    LS_COMPLETED
+ */
 enum lfsck_status {
        /* The lfsck file is new created, for new MDT, upgrading from old disk,
         * or re-creating the lfsck file manually. */
 enum lfsck_status {
        /* The lfsck file is new created, for new MDT, upgrading from old disk,
         * or re-creating the lfsck file manually. */
@@ -77,8 +105,17 @@ struct lfsck_start_param {
        struct ldlm_namespace   *lsp_namespace;
 };
 
        struct ldlm_namespace   *lsp_namespace;
 };
 
+enum lfsck_events { /* events delivered through the lfsck_out_notify callback */
+       LE_LASTID_REBUILDING    = 1, /* raised when LFSCK finds a LAST_ID file missing, empty or stale */
+       LE_LASTID_REBUILT       = 2, /* NOTE(review): presumably raised once the rebuild has finished - confirm */
+};
+
+typedef int (*lfsck_out_notify)(const struct lu_env *env, void *data,
+                               enum lfsck_events event); /* data: presumably the notify_data given to lfsck_register() - TODO confirm */
+
 int lfsck_register(const struct lu_env *env, struct dt_device *key,
 int lfsck_register(const struct lu_env *env, struct dt_device *key,
-                  struct dt_device *next, bool master);
+                  struct dt_device *next, lfsck_out_notify notify,
+                  void *notify_data, bool master);
 void lfsck_degister(const struct lu_env *env, struct dt_device *key);
 
 int lfsck_start(const struct lu_env *env, struct dt_device *key,
 void lfsck_degister(const struct lu_env *env, struct dt_device *key);
 
 int lfsck_start(const struct lu_env *env, struct dt_device *key,
index fb1b2b1..5ecdaab 100644 (file)
@@ -498,6 +498,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LFSCK_CRASH           0x160a
 #define OBD_FAIL_LFSCK_NO_AUTO         0x160b
 #define OBD_FAIL_LFSCK_NO_DOUBLESCAN   0x160c
 #define OBD_FAIL_LFSCK_CRASH           0x160a
 #define OBD_FAIL_LFSCK_NO_AUTO         0x160b
 #define OBD_FAIL_LFSCK_NO_DOUBLESCAN   0x160c
+#define OBD_FAIL_LFSCK_SKIP_LASTID     0x160d
+#define OBD_FAIL_LFSCK_DELAY4          0x160e
 
 /* UPDATE */
 #define OBD_FAIL_UPDATE_OBJ_NET                        0x1700
 
 /* UPDATE */
 #define OBD_FAIL_UPDATE_OBJ_NET                        0x1700
index 3676341..1c16e06 100644 (file)
@@ -40,6 +40,7 @@
 #include <obd.h>
 #include <lu_object.h>
 #include <dt_object.h>
 #include <obd.h>
 #include <lu_object.h>
 #include <dt_object.h>
+#include <md_object.h>
 #include <lustre_net.h>
 #include <lustre_dlm.h>
 #include <lustre_fid.h>
 #include <lustre_net.h>
 #include <lustre_dlm.h>
 #include <lustre_fid.h>
@@ -65,6 +66,9 @@ enum lfsck_flags {
        /* The server ever restarted during the LFSCK, and may miss to process
         * some objects check/repair. */
        LF_INCOMPLETE           = 0x00000008ULL,
        /* The server ever restarted during the LFSCK, and may miss to process
         * some objects check/repair. */
        LF_INCOMPLETE           = 0x00000008ULL,
+
+       /* The LAST_ID (file) crashed. */
+       LF_CRASHED_LASTID       = 0x00000010ULL,
 };
 
 struct lfsck_position {
 };
 
 struct lfsck_position {
@@ -283,6 +287,9 @@ struct lfsck_operations {
 
        int (*lfsck_double_scan)(const struct lu_env *env,
                                 struct lfsck_component *com);
 
        int (*lfsck_double_scan)(const struct lu_env *env,
                                 struct lfsck_component *com);
+
+       void (*lfsck_data_release)(const struct lu_env *env,
+                                  struct lfsck_component *com);
 };
 
 struct lfsck_component {
 };
 
 struct lfsck_component {
@@ -300,6 +307,7 @@ struct lfsck_component {
        struct lfsck_operations *lc_ops;
        void                    *lc_file_ram;
        void                    *lc_file_disk;
        struct lfsck_operations *lc_ops;
        void                    *lc_file_ram;
        void                    *lc_file_disk;
+       void                    *lc_data;
 
        /* The time for last checkpoint, jiffies */
        cfs_time_t               lc_time_last_checkpoint;
 
        /* The time for last checkpoint, jiffies */
        cfs_time_t               lc_time_last_checkpoint;
@@ -349,6 +357,8 @@ struct lfsck_instance {
        /* The time for next checkpoint, jiffies */
        cfs_time_t                li_time_next_checkpoint;
 
        /* The time for next checkpoint, jiffies */
        cfs_time_t                li_time_next_checkpoint;
 
+       lfsck_out_notify          li_out_notify;
+       void                     *li_out_notify_data;
        struct dt_device         *li_next;
        struct dt_device         *li_bottom;
        struct ldlm_namespace    *li_namespace;
        struct dt_device         *li_next;
        struct dt_device         *li_bottom;
        struct ldlm_namespace    *li_namespace;
@@ -424,6 +434,7 @@ struct lfsck_thread_info {
                /* old LMA for compatibility */
                char                    lti_lma_old[LMA_OLD_SIZE];
        };
                /* old LMA for compatibility */
                char                    lti_lma_old[LMA_OLD_SIZE];
        };
+       struct dt_object_format lti_dof;
        /* lti_ent and lti_key must be conjoint,
         * then lti_ent::lde_name will be lti_key. */
        struct lu_dirent        lti_ent;
        /* lti_ent and lti_key must be conjoint,
         * then lti_ent::lde_name will be lti_key. */
        struct lu_dirent        lti_ent;
@@ -643,6 +654,12 @@ static inline void lfsck_component_put(const struct lu_env *env,
                        OBD_FREE(com->lc_file_ram, com->lc_file_size);
                if (com->lc_file_disk != NULL)
                        OBD_FREE(com->lc_file_disk, com->lc_file_size);
                        OBD_FREE(com->lc_file_ram, com->lc_file_size);
                if (com->lc_file_disk != NULL)
                        OBD_FREE(com->lc_file_disk, com->lc_file_size);
+               if (com->lc_data != NULL) {
+                       LASSERT(com->lc_ops->lfsck_data_release != NULL);
+
+                       com->lc_ops->lfsck_data_release(env, com);
+               }
+
                OBD_FREE_PTR(com);
        }
 }
                OBD_FREE_PTR(com);
        }
 }
index 402cca7..f04950a 100644 (file)
@@ -43,6 +43,7 @@
 #include <lustre_lib.h>
 #include <lustre_net.h>
 #include <lustre/lustre_user.h>
 #include <lustre_lib.h>
 #include <lustre_net.h>
 #include <lustre/lustre_user.h>
+#include <md_object.h>
 
 #include "lfsck_internal.h"
 
 
 #include "lfsck_internal.h"
 
 
 static const char lfsck_layout_name[] = "lfsck_layout";
 
 
 static const char lfsck_layout_name[] = "lfsck_layout";
 
+struct lfsck_layout_seq { /* per-sequence LAST_ID tracking state */
+       struct list_head         lls_list; /* link on lfsck_layout_slave_data::llsd_seq_list */
+       __u64                    lls_seq; /* the sequence this entry describes */
+       __u64                    lls_lastid; /* in-memory last id: loaded from LAST_ID, raised by the scan, written back on store */
+       __u64                    lls_lastid_known; /* largest object id seen so far by the scan */
+       struct dt_object        *lls_lastid_obj; /* the LAST_ID file object of this sequence */
+       unsigned int             lls_dirty:1; /* set when lls_lastid was changed and still has to be stored */
+};
+
+struct lfsck_layout_slave_data { /* layout component private data, reached through com->lc_data */
+       /* list of lfsck_layout_seq entries, kept sorted by ascending lls_seq */
+       struct list_head         llsd_seq_list;
+};
+
 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
                                   const struct lfsck_layout *src)
 {
 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
                                   const struct lfsck_layout *src)
 {
@@ -213,6 +228,335 @@ static int lfsck_layout_init(const struct lu_env *env,
        return rc;
 }
 
        return rc;
 }
 
+/**
+ * Decide whether \a fid belongs to an OST-object.
+ *
+ * The FLDB is consulted first. If that lookup fails, the LMA xattr of
+ * \a obj is checked for the LMAC_FID_ON_OST compat flag. If the LMA does
+ * not have the expected size, the presence of the XATTR_NAME_FID xattr
+ * decides.
+ *
+ * \retval 1   the object is regarded as an OST-object
+ * \retval 0   otherwise
+ */
+static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
+                            struct dt_object *obj, const struct lu_fid *fid)
+{
+       struct seq_server_site  *site   = lu_site2seq(dt->dd_lu_dev.ld_site);
+       struct lu_seq_range      range  = { 0 };
+       struct lustre_mdt_attrs *lma;
+       int                      len;
+
+       fld_range_set_any(&range);
+       if (fld_server_lookup(env, site->ss_server_fld, fid_seq(fid),
+                             &range) == 0)
+               return fld_range_is_ost(&range) ? 1 : 0;
+
+       lma = &lfsck_env_info(env)->lti_lma;
+       len = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
+                          XATTR_NAME_LMA, BYPASS_CAPA);
+       if (len == sizeof(*lma)) {
+               lustre_lma_swab(lma);
+
+               /* Normally the OSD create handler or the OI scrub sets
+                * LMAC_FID_ON_OST on every externally visible OST-object.
+                * The flag is verified here anyway, so that the
+                * otable-based iteration does not depend on whether the
+                * OI scrub succeeded and the LFSCK never repairs
+                * something by mistake. */
+               return !!(lma->lma_compat & LMAC_FID_ON_OST);
+       }
+
+       /* Only the existence of the xattr matters, so no buffer is given. */
+       len = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
+
+       return len > 0;
+}
+
+/**
+ * Find the tracking entry of sequence \a seq on llsd->llsd_seq_list.
+ *
+ * The walk stops at the first entry whose lls_seq is not lower than
+ * \a seq, which relies on the list being sorted by ascending lls_seq.
+ *
+ * \retval the matching entry, or NULL if \a seq is not on the list
+ */
+static struct lfsck_layout_seq *
+lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
+{
+       struct lfsck_layout_seq *cur;
+
+       list_for_each_entry(cur, &llsd->llsd_seq_list, lls_list) {
+               if (cur->lls_seq < seq)
+                       continue;
+
+               /* first entry >= seq: either the match or proof of absence */
+               return cur->lls_seq == seq ? cur : NULL;
+       }
+
+       return NULL;
+}
+
+/**
+ * Link \a lls into llsd->llsd_seq_list, keeping ascending lls_seq order.
+ *
+ * The new entry is placed in front of the first entry with a larger
+ * sequence, or at the end of the list when there is none.
+ */
+static void
+lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
+                       struct lfsck_layout_seq *lls)
+{
+       struct lfsck_layout_seq *cur;
+
+       list_for_each_entry(cur, &llsd->llsd_seq_list, lls_list) {
+               if (cur->lls_seq > lls->lls_seq) {
+                       /* list_add_tail() links the entry right before cur */
+                       list_add_tail(&lls->lls_list, &cur->lls_list);
+                       return;
+               }
+       }
+
+       list_add_tail(&lls->lls_list, &llsd->llsd_seq_list);
+}
+
+/**
+ * Create the LAST_ID file \a obj and initialize it with a zero last id.
+ *
+ * The object is created as a regular file and a 64-bit zero is written at
+ * offset 0, both inside one local transaction on lfsck->li_bottom. If the
+ * object turns out to exist once the write lock is held, it is left alone.
+ * In dry-run mode (LPF_DRYRUN) nothing is modified.
+ *
+ * \param[in] env      execution environment
+ * \param[in] lfsck    LFSCK instance providing the bookmark and the device
+ * \param[in] obj      the LAST_ID object to be created
+ *
+ * \retval 0           success, dry-run, or the object already existed
+ * \retval negative    error code from the transaction or the create/write
+ */
+static int
+lfsck_layout_lastid_create(const struct lu_env *env,
+                          struct lfsck_instance *lfsck,
+                          struct dt_object *obj)
+{
+       struct lfsck_thread_info *info   = lfsck_env_info(env);
+       struct lu_attr           *la     = &info->lti_la;
+       struct dt_object_format  *dof    = &info->lti_dof;
+       struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
+       struct dt_device         *dt     = lfsck->li_bottom;
+       struct thandle           *th;
+       __u64                     lastid = 0;
+       loff_t                    pos    = 0;
+       int                       rc;
+       ENTRY;
+
+       CDEBUG(D_LFSCK, "To create LAST_ID for <seq> "LPX64"\n",
+              fid_seq(lfsck_dto2fid(obj)));
+
+       /* ENTRY has to be balanced by RETURN on every exit, including
+        * this early one, to keep the entry/exit trace consistent. */
+       if (bk->lb_param & LPF_DRYRUN)
+               RETURN(0);
+
+       memset(la, 0, sizeof(*la));
+       la->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
+       la->la_valid = LA_MODE | LA_UID | LA_GID;
+       dof->dof_type = dt_mode_to_dft(S_IFREG);
+
+       th = dt_trans_create(env, dt);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = dt_declare_create(env, obj, la, NULL, dof, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_declare_record_write(env, obj, sizeof(lastid), pos, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dt, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, obj, 0);
+       /* Re-check under the lock: the object may have been created since
+        * the caller looked at it. */
+       if (likely(!dt_object_exists(obj))) {
+               rc = dt_create(env, obj, la, NULL, dof, th);
+               if (rc == 0)
+                       rc = dt_record_write(env, obj,
+                               lfsck_buf_get(env, &lastid, sizeof(lastid)),
+                               &pos, th);
+       }
+       dt_write_unlock(env, obj);
+
+       GOTO(stop, rc);
+
+stop:
+       dt_trans_stop(env, dt, th);
+
+       return rc;
+}
+
+/**
+ * Re-read the LAST_ID file of \a lls and reconcile it with the scan state.
+ *
+ * If the on-disk value is lower than the largest object id already seen
+ * (lls_lastid_known), lls_lastid is set to that known value and the entry
+ * is marked dirty; unless LF_CRASHED_LASTID is already set,
+ * LE_LASTID_REBUILDING is notified and the flag is set. Otherwise, if the
+ * on-disk value is not lower than lls_lastid, it is adopted and the entry
+ * is marked clean.
+ *
+ * \retval 0           the file was read
+ * \retval non-zero    value returned by dt_record_read() on failure
+ */
+static int
+lfsck_layout_lastid_reload(const struct lu_env *env,
+                          struct lfsck_component *com,
+                          struct lfsck_layout_seq *lls)
+{
+       struct lfsck_instance   *lfsck;
+       struct lfsck_layout     *lo;
+       __u64                    ondisk;
+       loff_t                   off    = 0;
+       int                      rc;
+
+       dt_read_lock(env, lls->lls_lastid_obj, 0);
+       rc = dt_record_read(env, lls->lls_lastid_obj,
+                           lfsck_buf_get(env, &ondisk, sizeof(ondisk)), &off);
+       dt_read_unlock(env, lls->lls_lastid_obj);
+       if (unlikely(rc != 0))
+               return rc;
+
+       ondisk = le64_to_cpu(ondisk);
+       if (ondisk >= lls->lls_lastid_known) {
+               /* The file is not behind the scan; take its value if it
+                * is not behind the in-memory one either. */
+               if (ondisk >= lls->lls_lastid) {
+                       lls->lls_lastid = ondisk;
+                       lls->lls_dirty = 0;
+               }
+
+               return 0;
+       }
+
+       /* The file is behind an object id the scan has already seen. */
+       lfsck = com->lc_lfsck;
+       lo = com->lc_file_ram;
+       lls->lls_lastid = lls->lls_lastid_known;
+       lls->lls_dirty = 1;
+       if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
+               LASSERT(lfsck->li_out_notify != NULL);
+
+               lfsck->li_out_notify(env, lfsck->li_out_notify_data,
+                                    LE_LASTID_REBUILDING);
+               lo->ll_flags |= LF_CRASHED_LASTID;
+       }
+
+       return 0;
+}
+
+static int
+lfsck_layout_lastid_store(const struct lu_env *env,
+                         struct lfsck_component *com)
+{
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
+       struct dt_device                *dt     = lfsck->li_bottom;
+       struct lfsck_layout_slave_data  *llsd   = com->lc_data;
+       struct lfsck_layout_seq         *lls;
+       struct thandle                  *th;
+       __u64                            lastid;
+       int                              rc     = 0;
+       int                              rc1    = 0;
+
+       list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
+               loff_t pos = 0;
+
+               /* XXX: Add the code back if we really found related
+                *      inconsistent cases in the future. */
+#if 0
+               if (!lls->lls_dirty) {
+                       /* In OFD, before the pre-creation, the LAST_ID
+                        * file will be updated firstly, which may hide
+                        * some potential crashed cases. For example:
+                        *
+                        * The old obj1's ID is higher than old LAST_ID
+                        * but lower than the new LAST_ID, but the LFSCK
+                        * have not touch the obj1 until the OFD updated
+                        * the LAST_ID. So the LFSCK does not regard it
+                        * as crashed case. But when OFD does not create
+                        * successfully, it will set the LAST_ID as the
+                        * real created objects' ID, then LFSCK needs to
+                        * found related inconsistency. */
+                       rc = lfsck_layout_lastid_reload(env, com, lls);
+                       if (likely(!lls->lls_dirty))
+                               continue;
+               }
+#endif
+
+               CDEBUG(D_LFSCK, "To sync the LAST_ID for <seq> "LPX64
+                      " as <oid> "LPU64"\n", lls->lls_seq, lls->lls_lastid);
+
+               if (bk->lb_param & LPF_DRYRUN) {
+                       lls->lls_dirty = 0;
+                       continue;
+               }
+
+               th = dt_trans_create(env, dt);
+               if (IS_ERR(th)) {
+                       rc1 = PTR_ERR(th);
+                       CERROR("%s: (1) failed to store "LPX64": rc = %d\n",
+                              lfsck_lfsck2name(com->lc_lfsck),
+                              lls->lls_seq, rc1);
+                       continue;
+               }
+
+               rc = dt_declare_record_write(env, lls->lls_lastid_obj,
+                                            sizeof(lastid), pos, th);
+               if (rc != 0)
+                       goto stop;
+
+               rc = dt_trans_start_local(env, dt, th);
+               if (rc != 0)
+                       goto stop;
+
+               lastid = cpu_to_le64(lls->lls_lastid);
+               dt_write_lock(env, lls->lls_lastid_obj, 0);
+               rc = dt_record_write(env, lls->lls_lastid_obj,
+                                    lfsck_buf_get(env, &lastid,
+                                    sizeof(lastid)), &pos, th);
+               dt_write_unlock(env, lls->lls_lastid_obj);
+               if (rc == 0)
+                       lls->lls_dirty = 0;
+
+stop:
+               dt_trans_stop(env, dt, th);
+               if (rc != 0) {
+                       rc1 = rc;
+                       CERROR("%s: (2) failed to store "LPX64": rc = %d\n",
+                              lfsck_lfsck2name(com->lc_lfsck),
+                              lls->lls_seq, rc1);
+               }
+       }
+
+       return rc1;
+}
+
+/**
+ * Locate the LAST_ID file of lls->lls_seq and load its value into \a lls.
+ *
+ * An existing file is read into lls->lls_lastid; a short read is turned
+ * into -EFAULT. A missing file is created through
+ * lfsck_layout_lastid_create(). When the file is missing, or exists but
+ * yields zero bytes, LE_LASTID_REBUILDING is notified and
+ * LF_CRASHED_LASTID is set, unless that flag is set already.
+ *
+ * On success the located object is kept in lls->lls_lastid_obj; on failure
+ * the reference obtained here is dropped.
+ *
+ * NOTE(review): com->lc_sem is released and re-taken for write around the
+ * OBD_FAIL_LFSCK_DELAY4 wait, so the caller is expected to hold it for
+ * write.
+ *
+ * \retval 0           success
+ * \retval negative    error code
+ */
+static int
+lfsck_layout_lastid_load(const struct lu_env *env,
+                        struct lfsck_component *com,
+                        struct lfsck_layout_seq *lls)
+{
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct lfsck_layout     *lo     = com->lc_file_ram;
+       struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
+       struct dt_object        *obj;
+       loff_t                   off    = 0;
+       int                      rc;
+       ENTRY;
+
+       lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
+       obj = dt_locate(env, lfsck->li_bottom, fid);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
+
+       if (dt_object_exists(obj)) {
+               dt_read_lock(env, obj, 0);
+               rc = dt_read(env, obj,
+                       lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
+                       &off);
+               dt_read_unlock(env, obj);
+               if (rc != 0 && rc != sizeof(__u64))
+                       GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
+
+               /* Nothing could be read: the file has to be rebuilt. */
+               if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
+                       LASSERT(lfsck->li_out_notify != NULL);
+
+                       lfsck->li_out_notify(env, lfsck->li_out_notify_data,
+                                            LE_LASTID_REBUILDING);
+                       lo->ll_flags |= LF_CRASHED_LASTID;
+               }
+
+               lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
+               GOTO(out, rc = 0);
+       }
+
+       /* The LAST_ID file does not exist: it is to be rebuilt. */
+       if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
+               LASSERT(lfsck->li_out_notify != NULL);
+
+               lfsck->li_out_notify(env, lfsck->li_out_notify_data,
+                                    LE_LASTID_REBUILDING);
+               lo->ll_flags |= LF_CRASHED_LASTID;
+
+               /* Fault injection: wait up to cfs_fail_val seconds, or
+                * until the LFSCK thread is no longer running, with the
+                * component semaphore released. */
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
+                   cfs_fail_val > 0) {
+                       struct l_wait_info lwi = LWI_TIMEOUT(
+                                       cfs_time_seconds(cfs_fail_val),
+                                       NULL, NULL);
+
+                       up_write(&com->lc_sem);
+                       l_wait_event(lfsck->li_thread.t_ctl_waitq,
+                                    !thread_is_running(&lfsck->li_thread),
+                                    &lwi);
+                       down_write(&com->lc_sem);
+               }
+       }
+
+       rc = lfsck_layout_lastid_create(env, lfsck, obj);
+       GOTO(out, rc);
+
+out:
+       if (rc == 0)
+               lls->lls_lastid_obj = obj;
+       else
+               lfsck_object_put(env, obj);
+
+       return rc;
+}
+
 /* layout APIs */
 /* XXX: Some to be implemented in other patch(es). */
 
 /* layout APIs */
 /* XXX: Some to be implemented in other patch(es). */
 
@@ -246,12 +590,51 @@ static int lfsck_layout_reset(const struct lu_env *env,
 static void lfsck_layout_fail(const struct lu_env *env,
                              struct lfsck_component *com, bool new_checked)
 {
 static void lfsck_layout_fail(const struct lu_env *env,
                              struct lfsck_component *com, bool new_checked)
 {
+       struct lfsck_layout *lo = com->lc_file_ram;
+
+       down_write(&com->lc_sem);
+       if (new_checked)
+               com->lc_new_checked++;
+       lo->ll_objs_failed_phase1++;
+       if (lo->ll_pos_first_inconsistent == 0) {
+               struct lfsck_instance *lfsck = com->lc_lfsck;
+
+               lo->ll_pos_first_inconsistent =
+                       lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
+                                                       lfsck->li_di_oit);
+       }
+       up_write(&com->lc_sem);
 }
 
 static int lfsck_layout_checkpoint(const struct lu_env *env,
                                   struct lfsck_component *com, bool init)
 {
 }
 
 static int lfsck_layout_checkpoint(const struct lu_env *env,
                                   struct lfsck_component *com, bool init)
 {
-       return 0;
+       struct lfsck_instance   *lfsck = com->lc_lfsck;
+       struct lfsck_layout     *lo    = com->lc_file_ram;
+       int                      rc;
+
+       if (com->lc_new_checked == 0 && !init)
+               return 0;
+
+       down_write(&com->lc_sem);
+
+       if (init) {
+               lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
+       } else {
+               lo->ll_pos_last_checkpoint =
+                                       lfsck->li_pos_current.lp_oit_cookie;
+               lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
+                               HALF_SEC - lfsck->li_time_last_checkpoint);
+               lo->ll_time_last_checkpoint = cfs_time_current_sec();
+               lo->ll_objs_checked_phase1 += com->lc_new_checked;
+               com->lc_new_checked = 0;
+       }
+
+       rc = lfsck_layout_store(env, com);
+
+       up_write(&com->lc_sem);
+
+       return rc;
 }
 
 static int lfsck_layout_master_prep(const struct lu_env *env,
 }
 
 static int lfsck_layout_master_prep(const struct lu_env *env,
@@ -263,6 +646,64 @@ static int lfsck_layout_master_prep(const struct lu_env *env,
 static int lfsck_layout_slave_prep(const struct lu_env *env,
                                   struct lfsck_component *com)
 {
 static int lfsck_layout_slave_prep(const struct lu_env *env,
                                   struct lfsck_component *com)
 {
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct lfsck_layout     *lo     = com->lc_file_ram;
+       struct lfsck_position   *pos    = &com->lc_pos_start;
+
+       /* XXX: For a new scanning, generate OST-objects
+        *      bitmap for orphan detection. */
+
+       fid_zero(&pos->lp_dir_parent);
+       pos->lp_dir_cookie = 0;
+       if (lo->ll_status == LS_COMPLETED ||
+           lo->ll_status == LS_PARTIAL) {
+               int rc;
+
+               rc = lfsck_layout_reset(env, com, false);
+               if (rc != 0)
+                       return rc;
+       }
+
+       down_write(&com->lc_sem);
+
+       lo->ll_time_latest_start = cfs_time_current_sec();
+
+       spin_lock(&lfsck->li_lock);
+       if (lo->ll_flags & LF_SCANNED_ONCE) {
+               if (!lfsck->li_drop_dryrun ||
+                   lo->ll_pos_first_inconsistent == 0) {
+                       lo->ll_status = LS_SCANNING_PHASE2;
+                       list_del_init(&com->lc_link);
+                       list_add_tail(&com->lc_link,
+                                     &lfsck->li_list_double_scan);
+                       pos->lp_oit_cookie = 0;
+               } else {
+                       int i;
+
+                       lo->ll_status = LS_SCANNING_PHASE1;
+                       lo->ll_run_time_phase1 = 0;
+                       lo->ll_run_time_phase2 = 0;
+                       lo->ll_objs_checked_phase1 = 0;
+                       lo->ll_objs_checked_phase2 = 0;
+                       lo->ll_objs_failed_phase1 = 0;
+                       lo->ll_objs_failed_phase2 = 0;
+                       for (i = 0; i < LLIT_MAX; i++)
+                               lo->ll_objs_repaired[i] = 0;
+
+                       pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
+               }
+       } else {
+               lo->ll_status = LS_SCANNING_PHASE1;
+               if (!lfsck->li_drop_dryrun ||
+                   lo->ll_pos_first_inconsistent == 0)
+                       pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
+               else
+                       pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
+       }
+       spin_unlock(&lfsck->li_lock);
+
+       up_write(&com->lc_sem);
+
        return 0;
 }
 
        return 0;
 }
 
@@ -277,7 +718,84 @@ static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
                                       struct lfsck_component *com,
                                       struct dt_object *obj)
 {
                                       struct lfsck_component *com,
                                       struct dt_object *obj)
 {
-       return 0;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_layout             *lo     = com->lc_file_ram;
+       const struct lu_fid             *fid    = lfsck_dto2fid(obj);
+       struct lfsck_layout_slave_data  *llsd   = com->lc_data;
+       struct lfsck_layout_seq         *lls;
+       __u64                            seq;
+       __u64                            oid;
+       int                              rc;
+       ENTRY;
+
+       /* XXX: Update OST-objects bitmap for orphan detection. */
+
+       LASSERT(llsd != NULL);
+
+       down_write(&com->lc_sem);
+       if (fid_is_idif(fid))
+               seq = 0;
+       else if (!fid_is_norm(fid) ||
+                !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
+               GOTO(unlock, rc = 0);
+       else
+               seq = fid_seq(fid);
+       com->lc_new_checked++;
+
+       lls = lfsck_layout_seq_lookup(llsd, seq);
+       if (lls == NULL) {
+               OBD_ALLOC_PTR(lls);
+               if (unlikely(lls == NULL))
+                       GOTO(unlock, rc = -ENOMEM);
+
+               INIT_LIST_HEAD(&lls->lls_list);
+               lls->lls_seq = seq;
+               rc = lfsck_layout_lastid_load(env, com, lls);
+               if (rc != 0) {
+                       lo->ll_objs_failed_phase1++;
+                       OBD_FREE_PTR(lls);
+                       GOTO(unlock, rc);
+               }
+
+               lfsck_layout_seq_insert(llsd, lls);
+       }
+
+       if (unlikely(fid_is_last_id(fid)))
+               GOTO(unlock, rc = 0);
+
+       oid = fid_oid(fid);
+       if (oid > lls->lls_lastid_known)
+               lls->lls_lastid_known = oid;
+
+       if (oid > lls->lls_lastid) {
+               if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
+                       /* OFD may create new objects during LFSCK scanning. */
+                       rc = lfsck_layout_lastid_reload(env, com, lls);
+                       if (unlikely(rc != 0))
+                               CWARN("%s: failed to reload LAST_ID for "LPX64
+                                     ": rc = %d\n",
+                                     lfsck_lfsck2name(com->lc_lfsck),
+                                     lls->lls_seq, rc);
+                       if (oid <= lls->lls_lastid)
+                               GOTO(unlock, rc = 0);
+
+                       LASSERT(lfsck->li_out_notify != NULL);
+
+                       lfsck->li_out_notify(env, lfsck->li_out_notify_data,
+                                            LE_LASTID_REBUILDING);
+                       lo->ll_flags |= LF_CRASHED_LASTID;
+               }
+
+               lls->lls_lastid = oid;
+               lls->lls_dirty = 1;
+       }
+
+       GOTO(unlock, rc = 0);
+
+unlock:
+       up_write(&com->lc_sem);
+
+       return rc;
 }
 
 static int lfsck_layout_exec_dir(const struct lu_env *env,
 }
 
 static int lfsck_layout_exec_dir(const struct lu_env *env,
@@ -299,13 +817,246 @@ static int lfsck_layout_slave_post(const struct lu_env *env,
                                   struct lfsck_component *com,
                                   int result, bool init)
 {
                                   struct lfsck_component *com,
                                   int result, bool init)
 {
-       return 0;
+       /*
+        * Slave-side post-processing for the layout LFSCK component.
+        *
+        * The LAST_ID state is written out first through
+        * lfsck_layout_lastid_store(); if that fails, its error code replaces
+        * the caller supplied \a result. The component status is then derived
+        * from \a result (> 0: continue with phase 2, == 0: paused or
+        * stopped, < 0: failed), the phase-1 statistics are updated unless
+        * \a init is set, and the in-RAM state is saved with
+        * lfsck_layout_store(), whose return value is returned.
+        */
+       struct lfsck_instance   *lfsck = com->lc_lfsck;
+       struct lfsck_layout     *lo    = com->lc_file_ram;
+       int                      rc;
+       bool                     done  = false;
+
+       rc = lfsck_layout_lastid_store(env, com);
+       if (rc != 0)
+               result = rc;
+
+       LASSERT(lfsck->li_out_notify != NULL);
+
+       down_write(&com->lc_sem);
+
+       spin_lock(&lfsck->li_lock);
+       if (!init)
+               lo->ll_pos_last_checkpoint =
+                                       lfsck->li_pos_current.lp_oit_cookie;
+       if (result > 0) {
+               /* Phase 1 finished: queue the component for the double scan. */
+               lo->ll_status = LS_SCANNING_PHASE2;
+               lo->ll_flags |= LF_SCANNED_ONCE;
+               if (lo->ll_flags & LF_CRASHED_LASTID) {
+                       /* Remember to send LE_LASTID_REBUILT once the
+                        * spinlock has been dropped. */
+                       done = true;
+                       lo->ll_flags &= ~LF_CRASHED_LASTID;
+               }
+               lo->ll_flags &= ~LF_UPGRADE;
+               list_del_init(&com->lc_link);
+               list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
+       } else if (result == 0) {
+               /* A paused component stays on its current list; a stopped
+                * one is moved to the idle list. */
+               if (lfsck->li_paused) {
+                       lo->ll_status = LS_PAUSED;
+               } else {
+                       lo->ll_status = LS_STOPPED;
+                       list_del_init(&com->lc_link);
+                       list_add_tail(&com->lc_link, &lfsck->li_list_idle);
+               }
+       } else {
+               lo->ll_status = LS_FAILED;
+               list_del_init(&com->lc_link);
+               list_add_tail(&com->lc_link, &lfsck->li_list_idle);
+       }
+       spin_unlock(&lfsck->li_lock);
+
+       /* The notification is issued after li_lock has been released. */
+       if (done)
+               lfsck->li_out_notify(env, lfsck->li_out_notify_data,
+                                    LE_LASTID_REBUILT);
+
+       if (!init) {
+               /* Fold the time and objects since the last checkpoint into
+                * the phase-1 totals; HALF_SEC is added before the conversion
+                * to seconds. */
+               lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
+                               HALF_SEC - lfsck->li_time_last_checkpoint);
+               lo->ll_time_last_checkpoint = cfs_time_current_sec();
+               lo->ll_objs_checked_phase1 += com->lc_new_checked;
+               com->lc_new_checked = 0;
+       }
+
+       rc = lfsck_layout_store(env, com);
+
+       up_write(&com->lc_sem);
+
+       return rc;
 }
 
 static int lfsck_layout_dump(const struct lu_env *env,
                             struct lfsck_component *com, char *buf, int len)
 {
 }
 
 static int lfsck_layout_dump(const struct lu_env *env,
                             struct lfsck_component *com, char *buf, int len)
 {
-       return 0;
+       struct lfsck_instance   *lfsck = com->lc_lfsck;
+       struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
+       struct lfsck_layout     *lo    = com->lc_file_ram;
+       int                      save  = len;
+       int                      ret   = -ENOSPC;
+       int                      rc;
+
+       down_read(&com->lc_sem);
+       rc = snprintf(buf, len,
+                     "name: lfsck_layout\n"
+                     "magic: %#x\n"
+                     "version: %d\n"
+                     "status: %s\n",
+                     lo->ll_magic,
+                     bk->lb_version,
+                     lfsck_status2names(lo->ll_status));
+       if (rc <= 0)
+               goto out;
+
+       buf += rc;
+       len -= rc;
+       rc = lfsck_bits_dump(&buf, &len, lo->ll_flags, lfsck_flags_names,
+                            "flags");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
+                            "param");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_complete,
+                            "time_since_last_completed");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_time_dump(&buf, &len, lo->ll_time_latest_start,
+                            "time_since_latest_start");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_checkpoint,
+                            "time_since_last_checkpoint");
+       if (rc < 0)
+               goto out;
+
+       rc = snprintf(buf, len,
+                     "latest_start_position: "LPU64"\n"
+                     "last_checkpoint_position: "LPU64"\n"
+                     "first_failure_position: "LPU64"\n",
+                     lo->ll_pos_latest_start,
+                     lo->ll_pos_last_checkpoint,
+                     lo->ll_pos_first_inconsistent);
+       if (rc <= 0)
+               goto out;
+
+       buf += rc;
+       len -= rc;
+
+       rc = snprintf(buf, len,
+                     "success_count: %u\n"
+                     "repaired_dangling: "LPU64"\n"
+                     "repaired_unmatched_pair: "LPU64"\n"
+                     "repaired_multiple_referenced: "LPU64"\n"
+                     "repaired_orphan: "LPU64"\n"
+                     "repaired_inconsistent_owner: "LPU64"\n"
+                     "repaired_others: "LPU64"\n"
+                     "skipped: "LPU64"\n"
+                     "failed_phase1: "LPU64"\n"
+                     "failed_phase2: "LPU64"\n",
+                     lo->ll_success_count,
+                     lo->ll_objs_repaired[LLIT_DANGLING - 1],
+                     lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
+                     lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
+                     lo->ll_objs_repaired[LLIT_ORPHAN - 1],
+                     lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
+                     lo->ll_objs_repaired[LLIT_OTHERS - 1],
+                     lo->ll_objs_skipped,
+                     lo->ll_objs_failed_phase1,
+                     lo->ll_objs_failed_phase2);
+       if (rc <= 0)
+               goto out;
+
+       buf += rc;
+       len -= rc;
+
+       if (lo->ll_status == LS_SCANNING_PHASE1) {
+               __u64 pos;
+               const struct dt_it_ops *iops;
+               cfs_duration_t duration = cfs_time_current() -
+                                         lfsck->li_time_last_checkpoint;
+               __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
+               __u64 speed = checked;
+               __u64 new_checked = com->lc_new_checked * HZ;
+               __u32 rtime = lo->ll_run_time_phase1 +
+                             cfs_duration_sec(duration + HALF_SEC);
+
+               if (duration != 0)
+                       do_div(new_checked, duration);
+               if (rtime != 0)
+                       do_div(speed, rtime);
+               rc = snprintf(buf, len,
+                             "checked_phase1: "LPU64"\n"
+                             "checked_phase2: "LPU64"\n"
+                             "run_time_phase1: %u seconds\n"
+                             "run_time_phase2: %u seconds\n"
+                             "average_speed_phase1: "LPU64" items/sec\n"
+                             "average_speed_phase2: N/A\n"
+                             "real-time_speed_phase1: "LPU64" items/sec\n"
+                             "real-time_speed_phase2: N/A\n",
+                             checked,
+                             lo->ll_objs_checked_phase2,
+                             rtime,
+                             lo->ll_run_time_phase2,
+                             speed,
+                             new_checked);
+               if (rc <= 0)
+                       goto out;
+
+               buf += rc;
+               len -= rc;
+
+               LASSERT(lfsck->li_di_oit != NULL);
+
+               iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
+
+               /* The low layer otable-based iteration position may NOT
+                * exactly match the layout-based directory traversal
+                * cookie. Generally, it is not a serious issue. But the
+                * caller should NOT make assumption on that. */
+               pos = iops->store(env, lfsck->li_di_oit);
+               if (!lfsck->li_current_oit_processed)
+                       pos--;
+               rc = snprintf(buf, len, "current_position: "LPU64"\n", pos);
+               if (rc <= 0)
+                       goto out;
+
+               buf += rc;
+               len -= rc;
+       } else {
+               /* XXX: LS_SCANNING_PHASE2 will be handled in the future. */
+               __u64 speed1 = lo->ll_objs_checked_phase1;
+               __u64 speed2 = lo->ll_objs_checked_phase2;
+
+               if (lo->ll_run_time_phase1 != 0)
+                       do_div(speed1, lo->ll_run_time_phase1);
+               if (lo->ll_run_time_phase2 != 0)
+                       do_div(speed2, lo->ll_run_time_phase2);
+               rc = snprintf(buf, len,
+                             "checked_phase1: "LPU64"\n"
+                             "checked_phase2: "LPU64"\n"
+                             "run_time_phase1: %u seconds\n"
+                             "run_time_phase2: %u seconds\n"
+                             "average_speed_phase1: "LPU64" items/sec\n"
+                             "average_speed_phase2: "LPU64" objs/sec\n"
+                             "real-time_speed_phase1: N/A\n"
+                             "real-time_speed_phase2: N/A\n"
+                             "current_position: N/A\n",
+                             lo->ll_objs_checked_phase1,
+                             lo->ll_objs_checked_phase2,
+                             lo->ll_run_time_phase1,
+                             lo->ll_run_time_phase2,
+                             speed1,
+                             speed2);
+               if (rc <= 0)
+                       goto out;
+
+               buf += rc;
+               len -= rc;
+       }
+       ret = save - len;
+
+out:
+       up_read(&com->lc_sem);
+
+       return ret;
 }
 
 static int lfsck_layout_master_double_scan(const struct lu_env *env,
 }
 
 static int lfsck_layout_master_double_scan(const struct lu_env *env,
@@ -317,7 +1068,81 @@ static int lfsck_layout_master_double_scan(const struct lu_env *env,
 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
                                          struct lfsck_component *com)
 {
 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
                                          struct lfsck_component *com)
 {
-       return 0;
+       /*
+        * Slave-side completion of the second-phase scan: fold the elapsed
+        * time and the newly checked objects into the phase-2 counters, reset
+        * the checkpoint bookkeeping, set the final status and save the
+        * in-RAM state with lfsck_layout_store(), whose return value is
+        * returned.
+        */
+       struct lfsck_instance   *lfsck = com->lc_lfsck;
+       struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
+       struct lfsck_layout     *lo    = com->lc_file_ram;
+       int                      rc    = 1;
+
+       down_write(&com->lc_sem);
+
+       lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
+                               HALF_SEC - lfsck->li_time_last_checkpoint);
+       lo->ll_time_last_checkpoint = cfs_time_current_sec();
+       lo->ll_objs_checked_phase2 += com->lc_new_checked;
+
+       com->lc_new_checked = 0;
+       com->lc_new_scanned = 0;
+       com->lc_time_last_checkpoint = cfs_time_current();
+       com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+
+       /* NOTE: rc still holds its initial value 1 at this point, so only
+        * the "rc > 0" branch can be taken here; the "rc == 0" and "rc < 0"
+        * branches are not reachable as the code stands. */
+       if (rc > 0) {
+               com->lc_journal = 0;
+               if (lo->ll_flags & LF_INCOMPLETE)
+                       lo->ll_status = LS_PARTIAL;
+               else
+                       lo->ll_status = LS_COMPLETED;
+               /* The scanned-once/inconsistent flags are kept for a
+                * dry-run. */
+               if (!(bk->lb_param & LPF_DRYRUN))
+                       lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
+               lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
+               lo->ll_success_count++;
+       } else if (rc == 0) {
+               if (lfsck->li_paused)
+                       lo->ll_status = LS_PAUSED;
+               else
+                       lo->ll_status = LS_STOPPED;
+       } else {
+               lo->ll_status = LS_FAILED;
+       }
+
+       /* Every status except LS_PAUSED moves the component to the idle
+        * list. */
+       if (lo->ll_status != LS_PAUSED) {
+               spin_lock(&lfsck->li_lock);
+               list_del_init(&com->lc_link);
+               list_add_tail(&com->lc_link, &lfsck->li_list_idle);
+               spin_unlock(&lfsck->li_lock);
+       }
+
+       rc = lfsck_layout_store(env, com);
+
+       up_write(&com->lc_sem);
+
+       return rc;
+}
+
+/* The lfsck_data_release method of the master ops table. The body is
+ * empty: this function releases nothing. */
+static void lfsck_layout_master_data_release(const struct lu_env *env,
+                                            struct lfsck_component *com)
+{
+}
+
+/* Release the slave private data: detach it from \a com, free every
+ * lfsck_layout_seq on its list (dropping the reference on the LAST_ID
+ * object of each), then free the container itself. */
+static void lfsck_layout_slave_data_release(const struct lu_env *env,
+                                           struct lfsck_component *com)
+{
+       struct lfsck_layout_slave_data  *llsd   = com->lc_data;
+
+       LASSERT(llsd != NULL);
+
+       com->lc_data = NULL;
+
+       /* Drain the list from the head until nothing is left. */
+       while (!list_empty(&llsd->llsd_seq_list)) {
+               struct lfsck_layout_seq *lls;
+
+               lls = list_entry(llsd->llsd_seq_list.next,
+                                struct lfsck_layout_seq, lls_list);
+               list_del_init(&lls->lls_list);
+               lfsck_object_put(env, lls->lls_lastid_obj);
+               OBD_FREE_PTR(lls);
+       }
+
+       OBD_FREE_PTR(llsd);
 }
 
 static struct lfsck_operations lfsck_layout_master_ops = {
 }
 
 static struct lfsck_operations lfsck_layout_master_ops = {
@@ -330,6 +1155,7 @@ static struct lfsck_operations lfsck_layout_master_ops = {
        .lfsck_post             = lfsck_layout_master_post,
        .lfsck_dump             = lfsck_layout_dump,
        .lfsck_double_scan      = lfsck_layout_master_double_scan,
        .lfsck_post             = lfsck_layout_master_post,
        .lfsck_dump             = lfsck_layout_dump,
        .lfsck_double_scan      = lfsck_layout_master_double_scan,
+       .lfsck_data_release     = lfsck_layout_master_data_release,
 };
 
 static struct lfsck_operations lfsck_layout_slave_ops = {
 };
 
 static struct lfsck_operations lfsck_layout_slave_ops = {
@@ -342,6 +1168,7 @@ static struct lfsck_operations lfsck_layout_slave_ops = {
        .lfsck_post             = lfsck_layout_slave_post,
        .lfsck_dump             = lfsck_layout_dump,
        .lfsck_double_scan      = lfsck_layout_slave_double_scan,
        .lfsck_post             = lfsck_layout_slave_post,
        .lfsck_dump             = lfsck_layout_dump,
        .lfsck_double_scan      = lfsck_layout_slave_double_scan,
+       .lfsck_data_release     = lfsck_layout_slave_data_release,
 };
 
 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
 };
 
 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
@@ -363,10 +1190,19 @@ int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
        atomic_set(&com->lc_ref, 1);
        com->lc_lfsck = lfsck;
        com->lc_type = LT_LAYOUT;
        atomic_set(&com->lc_ref, 1);
        com->lc_lfsck = lfsck;
        com->lc_type = LT_LAYOUT;
-       if (lfsck->li_master)
+       if (lfsck->li_master) {
                com->lc_ops = &lfsck_layout_master_ops;
                com->lc_ops = &lfsck_layout_master_ops;
-       else
+       } else {
+               struct lfsck_layout_slave_data *llsd;
+
                com->lc_ops = &lfsck_layout_slave_ops;
                com->lc_ops = &lfsck_layout_slave_ops;
+               OBD_ALLOC_PTR(llsd);
+               if (llsd == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               INIT_LIST_HEAD(&llsd->llsd_seq_list);
+               com->lc_data = llsd;
+       }
        com->lc_file_size = sizeof(*lo);
        OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
        if (com->lc_file_ram == NULL)
        com->lc_file_size = sizeof(*lo);
        OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
        if (com->lc_file_ram == NULL)
@@ -430,6 +1266,13 @@ int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
                break;
        }
 
                break;
        }
 
+       if (lo->ll_flags & LF_CRASHED_LASTID) {
+               LASSERT(lfsck->li_out_notify != NULL);
+
+               lfsck->li_out_notify(env, lfsck->li_out_notify_data,
+                                    LE_LASTID_REBUILDING);
+       }
+
        GOTO(out, rc = 0);
 
 out:
        GOTO(out, rc = 0);
 
 out:
index 37efbcc..bc703ea 100644 (file)
@@ -77,6 +77,7 @@ const char *lfsck_flags_names[] = {
        "inconsistent",
        "upgrade",
        "incomplete",
        "inconsistent",
        "upgrade",
        "incomplete",
+       "crashed_lastid",
        NULL
 };
 
        NULL
 };
 
@@ -1119,7 +1120,8 @@ int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
 EXPORT_SYMBOL(lfsck_stop);
 
 int lfsck_register(const struct lu_env *env, struct dt_device *key,
 EXPORT_SYMBOL(lfsck_stop);
 
 int lfsck_register(const struct lu_env *env, struct dt_device *key,
-                  struct dt_device *next, bool master)
+                  struct dt_device *next, lfsck_out_notify notify,
+                  void *notify_data, bool master)
 {
        struct lfsck_instance   *lfsck;
        struct dt_object        *root  = NULL;
 {
        struct lfsck_instance   *lfsck;
        struct dt_object        *root  = NULL;
@@ -1145,6 +1147,8 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
        CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
        atomic_set(&lfsck->li_ref, 1);
        init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
        CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
        atomic_set(&lfsck->li_ref, 1);
        init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
+       lfsck->li_out_notify = notify;
+       lfsck->li_out_notify_data = notify_data;
        lfsck->li_next = next;
        lfsck->li_bottom = key;
 
        lfsck->li_next = next;
        lfsck->li_bottom = key;
 
index 1d049ea..3a90cff 100644 (file)
@@ -893,6 +893,12 @@ out_put:
        return 0;
 }
 
        return 0;
 }
 
+/* LFSCK event callback registered by the MDD. It ignores \a data and
+ * \a event and always returns 0. */
+static int mdd_lfsck_out_notify(const struct lu_env *env, void *data,
+                               enum lfsck_events event)
+{
+       return 0;
+}
+
 static int mdd_prepare(const struct lu_env *env,
                        struct lu_device *pdev,
                        struct lu_device *cdev)
 static int mdd_prepare(const struct lu_env *env,
                        struct lu_device *pdev,
                        struct lu_device *cdev)
@@ -965,7 +971,8 @@ static int mdd_prepare(const struct lu_env *env,
        if (rc != 0)
                GOTO(out_changelog, rc);
 
        if (rc != 0)
                GOTO(out_changelog, rc);
 
-       rc = lfsck_register(env, mdd->mdd_bottom, mdd->mdd_child, true);
+       rc = lfsck_register(env, mdd->mdd_bottom, mdd->mdd_child,
+                           mdd_lfsck_out_notify, mdd, true);
        if (rc != 0) {
                CERROR("%s: failed to initialize lfsck: rc = %d\n",
                       mdd2obd_dev(mdd)->obd_name, rc);
        if (rc != 0) {
                CERROR("%s: failed to initialize lfsck: rc = %d\n",
                       mdd2obd_dev(mdd)->obd_name, rc);
index 4354497..e303fd6 100644 (file)
@@ -41,6 +41,7 @@
 #include <obd.h>
 #include <lprocfs_status.h>
 #include <linux/seq_file.h>
 #include <obd.h>
 #include <lprocfs_status.h>
 #include <linux/seq_file.h>
+#include <lustre_lfsck.h>
 
 #include "ofd_internal.h"
 
 
 #include "ofd_internal.h"
 
@@ -488,6 +489,45 @@ int lprocfs_ofd_wr_soft_sync_limit(struct file *file, const char *buffer,
        return lprocfs_wr_uint(file, buffer, count, &ofd->ofd_soft_sync_limit);
 }
 
        return lprocfs_wr_uint(file, buffer, count, &ofd->ofd_soft_sync_limit);
 }
 
+/* procfs read handler for "lfsck_speed_limit": marks the read as complete
+ * and returns the result of lfsck_get_speed() on the OFD's OSD device, with
+ * \a page and \a count passed as the buffer and its size. */
+static int lprocfs_rd_lfsck_speed_limit(char *page, char **start, off_t off,
+                                       int count, int *eof, void *data)
+{
+       struct obd_device       *obd = data;
+       struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
+
+       *eof = 1;
+
+       return lfsck_get_speed(ofd->ofd_osd, page, count);
+}
+
+/* procfs write handler for "lfsck_speed_limit": parse the written value
+ * with lprocfs_write_helper() and hand it to lfsck_set_speed() on the OFD's
+ * OSD device. Returns \a count on success, otherwise the non-zero code of
+ * the step that failed. */
+static int lprocfs_wr_lfsck_speed_limit(struct file *file, const char *buffer,
+                                       unsigned long count, void *data)
+{
+       struct obd_device       *obd = data;
+       struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
+       __u32                    limit;
+       int                      rc;
+
+       rc = lprocfs_write_helper(buffer, count, &limit);
+       if (rc == 0)
+               rc = lfsck_set_speed(ofd->ofd_osd, limit);
+       if (rc != 0)
+               return rc;
+
+       return count;
+}
+
+/* procfs read handler for "lfsck_layout": marks the read as complete and
+ * returns the result of lfsck_dump() for the LT_LAYOUT component of the
+ * OFD's OSD device, with \a page and \a count passed as the buffer and its
+ * size. */
+static int lprocfs_rd_lfsck_layout(char *page, char **start, off_t off,
+                                  int count, int *eof, void *data)
+{
+       struct obd_device       *obd = data;
+       struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
+
+       *eof = 1;
+
+       return lfsck_dump(ofd->ofd_osd, page, count, LT_LAYOUT);
+}
+
 static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
        { "uuid",                lprocfs_rd_uuid, 0, 0 },
        { "blocksize",           lprocfs_rd_blksize, 0, 0 },
 static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
        { "uuid",                lprocfs_rd_uuid, 0, 0 },
        { "blocksize",           lprocfs_rd_blksize, 0, 0 },
@@ -537,6 +577,9 @@ static struct lprocfs_vars lprocfs_ofd_obd_vars[] = {
                                  lprocfs_wr_job_interval, 0},
        { "soft_sync_limit",     lprocfs_ofd_rd_soft_sync_limit,
                                 lprocfs_ofd_wr_soft_sync_limit, 0},
                                  lprocfs_wr_job_interval, 0},
        { "soft_sync_limit",     lprocfs_ofd_rd_soft_sync_limit,
                                 lprocfs_ofd_wr_soft_sync_limit, 0},
+       { "lfsck_speed_limit",  lprocfs_rd_lfsck_speed_limit,
+                               lprocfs_wr_lfsck_speed_limit, 0 },
+       { "lfsck_layout",       lprocfs_rd_lfsck_layout, 0, 0 },
        { 0 }
 };
 
        { 0 }
 };
 
index afe056e..ea5d166 100644 (file)
@@ -381,6 +381,38 @@ static struct lu_object *ofd_object_alloc(const struct lu_env *env,
 
 extern int ost_handle(struct ptlrpc_request *req);
 
 
 extern int ost_handle(struct ptlrpc_request *req);
 
+/**
+ * LFSCK event callback registered by the OFD.
+ *
+ * LE_LASTID_REBUILDING sets ofd_lastid_rebuilding. LE_LASTID_REBUILT frees
+ * the cached sequences with ofd_seqs_free(), clears the flag and bumps
+ * ofd_lastid_gen. Both updates are made under the write side of
+ * ofd_lastid_rwsem.
+ *
+ * \param[in] env      execution environment
+ * \param[in] data     the ofd_device that was passed at registration
+ * \param[in] event    the LFSCK event being reported
+ *
+ * \retval             0 for a handled event
+ * \retval             -EINVAL for an unknown event
+ */
+static int ofd_lfsck_out_notify(const struct lu_env *env, void *data,
+                               enum lfsck_events event)
+{
+       struct ofd_device *ofd = data;
+       struct obd_device *obd = ofd_obd(ofd);
+
+       switch (event) {
+       case LE_LASTID_REBUILDING:
+               CWARN("%s: Found crashed LAST_ID, deny creating new OST-object "
+                     "on the device until the LAST_ID rebuilt successfully.\n",
+                     obd->obd_name);
+               down_write(&ofd->ofd_lastid_rwsem);
+               ofd->ofd_lastid_rebuilding = 1;
+               up_write(&ofd->ofd_lastid_rwsem);
+               break;
+       case LE_LASTID_REBUILT:
+               down_write(&ofd->ofd_lastid_rwsem);
+               ofd_seqs_free(env, ofd);
+               ofd->ofd_lastid_rebuilding = 0;
+               ofd->ofd_lastid_gen++;
+               up_write(&ofd->ofd_lastid_rwsem);
+               break;
+       default:
+               /* Print the event and the returned error as separate
+                * values, so the event number is not shown as "rc". */
+               CERROR("%s: unknown lfsck event %d: rc = %d\n",
+                      obd->obd_name, event, -EINVAL);
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
 static int ofd_prepare(const struct lu_env *env, struct lu_device *pdev,
                       struct lu_device *dev)
 {
 static int ofd_prepare(const struct lu_env *env, struct lu_device *pdev,
                       struct lu_device *dev)
 {
@@ -402,7 +434,8 @@ static int ofd_prepare(const struct lu_env *env, struct lu_device *pdev,
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
-       rc = lfsck_register(env, ofd->ofd_osd, &ofd->ofd_dt_dev, false);
+       rc = lfsck_register(env, ofd->ofd_osd, ofd->ofd_osd,
+                           ofd_lfsck_out_notify, ofd, false);
        if (rc != 0) {
                CERROR("%s: failed to initialize lfsck: rc = %d\n",
                       obd->obd_name, rc);
        if (rc != 0) {
                CERROR("%s: failed to initialize lfsck: rc = %d\n",
                       obd->obd_name, rc);
@@ -1181,7 +1214,8 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
        struct ost_body         *repbody;
        const struct obdo       *oa = &tsi->tsi_ost_body->oa;
        struct obdo             *rep_oa;
        struct ost_body         *repbody;
        const struct obdo       *oa = &tsi->tsi_ost_body->oa;
        struct obdo             *rep_oa;
-       struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
+       struct obd_export       *exp = tsi->tsi_exp;
+       struct ofd_device       *ofd = ofd_exp(exp);
        obd_seq                  seq = ostid_seq(&oa->o_oi);
        obd_id                   oid = ostid_id(&oa->o_oi);
        struct ofd_seq          *oseq;
        obd_seq                  seq = ostid_seq(&oa->o_oi);
        obd_id                   oid = ostid_id(&oa->o_oi);
        struct ofd_seq          *oseq;
@@ -1197,6 +1231,13 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
        if (repbody == NULL)
                RETURN(-ENOMEM);
 
        if (repbody == NULL)
                RETURN(-ENOMEM);
 
+       down_read(&ofd->ofd_lastid_rwsem);
+       /* Currently, for safe, we do not distinguish which LAST_ID is broken,
+        * we may do that in the future.
+        * Return -ENOSPC until the LAST_ID rebuilt. */
+       if (unlikely(ofd->ofd_lastid_rebuilding))
+               GOTO(out_sem, rc = -ENOSPC);
+
        rep_oa = &repbody->oa;
        rep_oa->o_oi = oa->o_oi;
 
        rep_oa = &repbody->oa;
        rep_oa->o_oi = oa->o_oi;
 
@@ -1209,7 +1250,7 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
        if (IS_ERR(oseq)) {
                CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n",
                       ofd_name(ofd), seq, PTR_ERR(oseq));
        if (IS_ERR(oseq)) {
                CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n",
                       ofd_name(ofd), seq, PTR_ERR(oseq));
-               RETURN(-EINVAL);
+               GOTO(out_sem, rc = -EINVAL);
        }
 
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
        }
 
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
@@ -1228,9 +1269,11 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
        /* former ofd_handle_precreate */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            (oa->o_flags & OBD_FL_DELORPHAN)) {
        /* former ofd_handle_precreate */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            (oa->o_flags & OBD_FL_DELORPHAN)) {
+               exp->exp_filter_data.fed_lastid_gen = ofd->ofd_lastid_gen;
+
                /* destroy orphans */
                if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
                /* destroy orphans */
                if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
-                   tsi->tsi_exp->exp_conn_cnt) {
+                   exp->exp_conn_cnt) {
                        CERROR("%s: dropping old orphan cleanup request\n",
                               ofd_name(ofd));
                        GOTO(out_nolock, rc = 0);
                        CERROR("%s: dropping old orphan cleanup request\n",
                               ofd_name(ofd));
                        GOTO(out_nolock, rc = 0);
@@ -1251,7 +1294,7 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
                        /* FIXME: should reset precreate_next_id on MDS */
                        rc = 0;
                } else if (diff < 0) {
                        /* FIXME: should reset precreate_next_id on MDS */
                        rc = 0;
                } else if (diff < 0) {
-                       rc = ofd_orphans_destroy(tsi->tsi_env, tsi->tsi_exp,
+                       rc = ofd_orphans_destroy(tsi->tsi_env, exp,
                                                 ofd, rep_oa);
                        oseq->os_destroys_in_progress = 0;
                } else {
                                                 ofd, rep_oa);
                        oseq->os_destroys_in_progress = 0;
                } else {
@@ -1259,9 +1302,15 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
                        oseq->os_destroys_in_progress = 0;
                }
        } else {
                        oseq->os_destroys_in_progress = 0;
                }
        } else {
+               if (unlikely(exp->exp_filter_data.fed_lastid_gen !=
+                            ofd->ofd_lastid_gen)) {
+                       ofd_obd_disconnect(exp);
+                       GOTO(out_nolock, rc = -ENOTCONN);
+               }
+
                mutex_lock(&oseq->os_create_lock);
                if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
                mutex_lock(&oseq->os_create_lock);
                if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
-                   tsi->tsi_exp->exp_conn_cnt) {
+                   exp->exp_conn_cnt) {
                        CERROR("%s: dropping old precreate request\n",
                               ofd_name(ofd));
                        GOTO(out, rc = 0);
                        CERROR("%s: dropping old precreate request\n",
                               ofd_name(ofd));
                        GOTO(out, rc = 0);
@@ -1366,7 +1415,7 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
                ostid_set_id(&rep_oa->o_oi, ofd_seq_last_oid(oseq));
        }
        EXIT;
                ostid_set_id(&rep_oa->o_oi, ofd_seq_last_oid(oseq));
        }
        EXIT;
-       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_CREATE,
+       ofd_counter_incr(exp, LPROC_OFD_STATS_CREATE,
                         tsi->tsi_jobid, 1);
 out:
        mutex_unlock(&oseq->os_create_lock);
                         tsi->tsi_jobid, 1);
 out:
        mutex_unlock(&oseq->os_create_lock);
@@ -1375,6 +1424,9 @@ out_nolock:
                rep_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
 
        ofd_seq_put(tsi->tsi_env, oseq);
                rep_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
 
        ofd_seq_put(tsi->tsi_env, oseq);
+
+out_sem:
+       up_read(&ofd->ofd_lastid_rwsem);
        return rc;
 }
 
        return rc;
 }
 
@@ -2070,6 +2122,7 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        spin_lock_init(&m->ofd_batch_lock);
        rwlock_init(&obd->u.filter.fo_sptlrpc_lock);
        sptlrpc_rule_set_init(&obd->u.filter.fo_sptlrpc_rset);
        spin_lock_init(&m->ofd_batch_lock);
        rwlock_init(&obd->u.filter.fo_sptlrpc_lock);
        sptlrpc_rule_set_init(&obd->u.filter.fo_sptlrpc_rset);
+       init_rwsem(&m->ofd_lastid_rwsem);
 
        obd->u.filter.fo_fl_oss_capa = 0;
        CFS_INIT_LIST_HEAD(&obd->u.filter.fo_capa_keys);
 
        obd->u.filter.fo_fl_oss_capa = 0;
        CFS_INIT_LIST_HEAD(&obd->u.filter.fo_capa_keys);
index bba1618..e5fd706 100644 (file)
@@ -220,22 +220,11 @@ static int ofd_fld_fini(const struct lu_env *env,
        RETURN(0);
 }
 
        RETURN(0);
 }
 
-void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
+void ofd_seqs_free(const struct lu_env *env, struct ofd_device *ofd)
 {
        struct ofd_seq  *oseq;
        struct ofd_seq  *tmp;
        cfs_list_t       dispose;
 {
        struct ofd_seq  *oseq;
        struct ofd_seq  *tmp;
        cfs_list_t       dispose;
-       int             rc;
-
-       ofd_deregister_seq_exp(ofd);
-
-       rc = ofd_fid_fini(env, ofd);
-       if (rc != 0)
-               CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc);
-
-       rc = ofd_fld_fini(env, ofd);
-       if (rc != 0)
-               CERROR("%s: fld fini error: rc = %d\n", ofd_name(ofd), rc);
 
        CFS_INIT_LIST_HEAD(&dispose);
        write_lock(&ofd->ofd_seq_list_lock);
 
        CFS_INIT_LIST_HEAD(&dispose);
        write_lock(&ofd->ofd_seq_list_lock);
@@ -248,9 +237,25 @@ void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
                oseq = container_of0(dispose.next, struct ofd_seq, os_list);
                ofd_seq_delete(env, oseq);
        }
                oseq = container_of0(dispose.next, struct ofd_seq, os_list);
                ofd_seq_delete(env, oseq);
        }
+}
+
+void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
+{
+       int rc;
+
+       ofd_deregister_seq_exp(ofd);
+
+       rc = ofd_fid_fini(env, ofd);
+       if (rc != 0)
+               CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc);
+
+       rc = ofd_fld_fini(env, ofd);
+       if (rc != 0)
+               CERROR("%s: fld fini error: rc = %d\n", ofd_name(ofd), rc);
+
+       ofd_seqs_free(env, ofd);
 
        LASSERT(cfs_list_empty(&ofd->ofd_seq_list));
 
        LASSERT(cfs_list_empty(&ofd->ofd_seq_list));
-       return;
 }
 
 /**
 }
 
 /**
index e19d357..b22ceec 100644 (file)
@@ -181,10 +181,15 @@ struct ofd_device {
                                 ofd_syncjournal:1,
                                 /* shall we grant space to clients not
                                  * supporting OBD_CONNECT_GRANT_PARAM? */
                                 ofd_syncjournal:1,
                                 /* shall we grant space to clients not
                                  * supporting OBD_CONNECT_GRANT_PARAM? */
-                                ofd_grant_compat_disable:1;
+                                ofd_grant_compat_disable:1,
+                                /* Protected by ofd_lastid_rwsem. */
+                                ofd_lastid_rebuilding:1;
        struct seq_server_site   ofd_seq_site;
        /* the limit of SOFT_SYNC RPCs that will trigger a soft sync */
        unsigned int             ofd_soft_sync_limit;
        struct seq_server_site   ofd_seq_site;
        /* the limit of SOFT_SYNC RPCs that will trigger a soft sync */
        unsigned int             ofd_soft_sync_limit;
+       /* Protect ::ofd_lastid_rebuilding */
+       struct rw_semaphore      ofd_lastid_rwsem;
+       __u64                    ofd_lastid_gen;
 };
 
 static inline struct ofd_device *ofd_dev(struct lu_device *d)
 };
 
 static inline struct ofd_device *ofd_dev(struct lu_device *d)
@@ -340,6 +345,7 @@ int ofd_destroy_by_fid(const struct lu_env *env, struct ofd_device *ofd,
                       const struct lu_fid *fid, int orphan);
 int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
               struct obd_statfs *osfs, __u64 max_age, __u32 flags);
                       const struct lu_fid *fid, int orphan);
 int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
               struct obd_statfs *osfs, __u64 max_age, __u32 flags);
+int ofd_obd_disconnect(struct obd_export *exp);
 
 /* ofd_fs.c */
 obd_id ofd_seq_last_oid(struct ofd_seq *oseq);
 
 /* ofd_fs.c */
 obd_id ofd_seq_last_oid(struct ofd_seq *oseq);
@@ -357,6 +363,7 @@ int ofd_precreate_batch(struct ofd_device *ofd, int batch);
 struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
                             obd_seq seq);
 void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd);
 struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
                             obd_seq seq);
 void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd);
+void ofd_seqs_free(const struct lu_env *env, struct ofd_device *ofd);
 
 /* ofd_io.c */
 int ofd_preprw(const struct lu_env *env,int cmd, struct obd_export *exp,
 
 /* ofd_io.c */
 int ofd_preprw(const struct lu_env *env,int cmd, struct obd_export *exp,
index 9137427..ec55834 100644 (file)
@@ -309,7 +309,7 @@ out:
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static int ofd_obd_disconnect(struct obd_export *exp)
+int ofd_obd_disconnect(struct obd_export *exp)
 {
        struct ofd_device       *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
        struct lu_env            env;
 {
        struct ofd_device       *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
        struct lu_env            env;
@@ -500,7 +500,15 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp,
 int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
                         struct obd_statfs *osfs, __u64 max_age, int *from_cache)
 {
 int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
                         struct obd_statfs *osfs, __u64 max_age, int *from_cache)
 {
-       int rc;
+       int rc = 0;
+       ENTRY;
+
+       down_read(&ofd->ofd_lastid_rwsem);
+       /* Currently, to be safe, we do not distinguish which LAST_ID is
+        * broken; we may do that in the future.
+        * Return -ENOSPC until the LAST_ID is rebuilt. */
+       if (unlikely(ofd->ofd_lastid_rebuilding))
+               GOTO(out, rc = -ENOSPC);
 
        spin_lock(&ofd->ofd_osfs_lock);
        if (cfs_time_before_64(ofd->ofd_osfs_age, max_age) || max_age == 0) {
 
        spin_lock(&ofd->ofd_osfs_lock);
        if (cfs_time_before_64(ofd->ofd_osfs_age, max_age) || max_age == 0) {
@@ -527,7 +535,7 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
                 * call it fairly often as space fills up */
                rc = dt_statfs(env, ofd->ofd_osd, osfs);
                if (unlikely(rc))
                 * call it fairly often as space fills up */
                rc = dt_statfs(env, ofd->ofd_osd, osfs);
                if (unlikely(rc))
-                       return rc;
+                       GOTO(out, rc);
 
                spin_lock(&ofd->ofd_grant_lock);
                spin_lock(&ofd->ofd_osfs_lock);
 
                spin_lock(&ofd->ofd_grant_lock);
                spin_lock(&ofd->ofd_osfs_lock);
@@ -574,7 +582,13 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
                if (from_cache)
                        *from_cache = 1;
        }
                if (from_cache)
                        *from_cache = 1;
        }
-       return 0;
+
+       GOTO(out, rc);
+
+out:
+       up_read(&ofd->ofd_lastid_rwsem);
+
+       return rc;
 }
 
 int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
 }
 
 int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
@@ -822,11 +836,18 @@ int ofd_echo_create(const struct lu_env *env, struct obd_export *exp,
 
        CDEBUG(D_INFO, "ofd_create("DOSTID")\n", POSTID(&oa->o_oi));
 
 
        CDEBUG(D_INFO, "ofd_create("DOSTID")\n", POSTID(&oa->o_oi));
 
+       down_read(&ofd->ofd_lastid_rwsem);
+       /* Currently, to be safe, we do not distinguish which LAST_ID is
+        * broken; we may do that in the future.
+        * Return -ENOSPC until the LAST_ID is rebuilt. */
+       if (unlikely(ofd->ofd_lastid_rebuilding))
+               GOTO(out_sem, rc = -ENOSPC);
+
        oseq = ofd_seq_load(env, ofd, seq);
        if (IS_ERR(oseq)) {
                CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n",
                       ofd_name(ofd), seq, PTR_ERR(oseq));
        oseq = ofd_seq_load(env, ofd, seq);
        if (IS_ERR(oseq)) {
                CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n",
                       ofd_name(ofd), seq, PTR_ERR(oseq));
-               RETURN(-EINVAL);
+               GOTO(out_sem, rc = -EINVAL);
        }
 
        mutex_lock(&oseq->os_create_lock);
        }
 
        mutex_lock(&oseq->os_create_lock);
@@ -860,6 +881,9 @@ out:
                lsm->lsm_oi = oa->o_oi;
        }
        ofd_seq_put(env, oseq);
                lsm->lsm_oi = oa->o_oi;
        }
        ofd_seq_put(env, oseq);
+
+out_sem:
+       up_read(&ofd->ofd_lastid_rwsem);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
index ee9ebe2..cd8c26a 100644 (file)
@@ -266,6 +266,31 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
        CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
               ofd_name(ofd), PFID(fid), nr);
 
        CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
               ofd_name(ofd), PFID(fid), nr);
 
+       LASSERT(nr > 0);
+
+        /* When the LFSCK scans the whole device to verify the LAST_ID file
+         * consistency, it first loads the last_id into RAM, then compares
+         * the last_id with each OST-object's ID. If the latter is larger,
+         * it regards the LAST_ID file as crashed. But during the LFSCK
+         * scanning, the OFD may continue to create new OST-objects. Those newly
+         * created OST-objects will have larger IDs than the ones LFSCK knows.
+         * So from the LFSCK view, it needs to re-load the last_id from the disk
+         * file, and if the latest last_id is still smaller than the object's
+         * ID, then the LAST_ID file has really crashed.
+         *
+         * For the above mechanism to work, before the OFD pre-creates
+         * OST-objects, it must update the LAST_ID file first; otherwise the
+         * LFSCK may not see the latest last_id even though new objects exist. */
+       if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
+               tmp = cpu_to_le64(id + nr - 1);
+               dt_write_lock(env, oseq->os_lastid_obj, 0);
+               rc = dt_record_write(env, oseq->os_lastid_obj,
+                                    &info->fti_buf, &info->fti_off, th);
+               dt_write_unlock(env, oseq->os_lastid_obj);
+               if (rc != 0)
+                       GOTO(trans_stop, rc);
+       }
+
        for (i = 0; i < nr; i++) {
                fo = batch[i];
                LASSERT(fo);
        for (i = 0; i < nr; i++) {
                fo = batch[i];
                LASSERT(fo);
@@ -284,11 +309,24 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
        }
 
        objects = i;
        }
 
        objects = i;
-       if (objects > 0) {
+       /* Not all of the wanted objects have been created;
+        * set the LAST_ID to the last one actually created. */
+       if (unlikely(objects < nr)) {
+               int rc1;
+
+               info->fti_off = 0;
                tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
                tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
-               rc = dt_record_write(env, oseq->os_lastid_obj,
-                                    &info->fti_buf, &info->fti_off, th);
+               dt_write_lock(env, oseq->os_lastid_obj, 0);
+               rc1 = dt_record_write(env, oseq->os_lastid_obj,
+                                     &info->fti_buf, &info->fti_off, th);
+               dt_write_unlock(env, oseq->os_lastid_obj);
+               if (rc1 != 0)
+                       CERROR("%s: fail to reset the LAST_ID for seq ("LPX64
+                              ") from "LPU64" to "LPU64"\n", ofd_name(ofd),
+                              ostid_seq(&oseq->os_oi), id + nr - 1,
+                              ofd_seq_last_oid(oseq));
        }
        }
+
 trans_stop:
        ofd_trans_stop(env, ofd, th, rc);
 out:
 trans_stop:
        ofd_trans_stop(env, ofd, th, rc);
 out:
index 2288b64..6f369d0 100644 (file)
@@ -353,7 +353,7 @@ check_oi:
                LASSERTF(rc == -ESTALE || rc == -ENOENT, "rc = %d\n", rc);
 
                rc = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD);
                LASSERTF(rc == -ESTALE || rc == -ENOENT, "rc = %d\n", rc);
 
                rc = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD);
-               /* XXX: There are three possible cases:
+               /* XXX: There are some possible cases:
                 *      1. rc = 0.
                 *         Backup/restore caused the OI invalid.
                 *      2. rc = 0.
                 *      1. rc = 0.
                 *         Backup/restore caused the OI invalid.
                 *      2. rc = 0.
index 303d769..cce33af 100644 (file)
@@ -815,6 +815,26 @@ static int osd_scrub_get_fid(struct osd_thread_info *info,
                }
 
                *fid = lma->lma_self_fid;
                }
 
                *fid = lma->lma_self_fid;
+               if (unlikely(fid_is_last_id(fid))) {
+                       if (scrub) {
+                               if (lma->lma_compat & LMAC_FID_ON_OST)
+                                       rc = SCRUB_NEXT_OSTOBJ;
+                               else
+                                       rc = osd_scrub_check_local_fldb(info,
+                                                               dev, fid);
+                       }
+
+                       /* XXX: For upper layer iteration, LAST_ID is a visible
+                        *      object to be checked and repaired, so return
+                        *      it directly.
+                        *
+                        *      In fact, the OSD layer otable-based iteration
+                        *      should not care about the FID type; it is the
+                        *      upper layer user's duty (LFSCK) to handle that.
+                        *      It will be fixed by a future patch. */
+                       return rc;
+               }
+
                if (fid_is_internal(&lma->lma_self_fid)) {
                        if (!scrub)
                                rc = SCRUB_NEXT_CONTINUE;
                if (fid_is_internal(&lma->lma_self_fid)) {
                        if (!scrub)
                                rc = SCRUB_NEXT_CONTINUE;
@@ -830,7 +850,7 @@ static int osd_scrub_get_fid(struct osd_thread_info *info,
                if (lma->lma_compat & LMAC_FID_ON_OST)
                        return SCRUB_NEXT_OSTOBJ;
 
                if (lma->lma_compat & LMAC_FID_ON_OST)
                        return SCRUB_NEXT_OSTOBJ;
 
-               if (fid_is_idif(fid) || fid_is_last_id(fid))
+               if (fid_is_idif(fid))
                        return SCRUB_NEXT_OSTOBJ_OLD;
 
                if (lma->lma_incompat & LMAI_AGENT)
                        return SCRUB_NEXT_OSTOBJ_OLD;
 
                if (lma->lma_incompat & LMAI_AGENT)
index 4d203d6..36fd071 100644 (file)
@@ -17,8 +17,12 @@ init_test_env $@
 . ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
 init_logging
 
 . ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
 init_logging
 
+# remove the check when ZFS backend iteration is ready
 [ $(facet_fstype $SINGLEMDS) != "ldiskfs" ] &&
        skip "test LFSCK only for ldiskfs" && exit 0
 [ $(facet_fstype $SINGLEMDS) != "ldiskfs" ] &&
        skip "test LFSCK only for ldiskfs" && exit 0
+[ $(facet_fstype ost1) != ldiskfs ] &&
+       skip "test LFSCK only for ldiskfs" && exit 0
+
 require_dsh_mds || exit 0
 
 MCREATE=${MCREATE:-mcreate}
 require_dsh_mds || exit 0
 
 MCREATE=${MCREATE:-mcreate}
@@ -38,17 +42,24 @@ check_and_setup_lustre
 [[ $(lustre_version_code $SINGLEMDS) -le $(version_code 2.4.90) ]] &&
        ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2c"
 
 [[ $(lustre_version_code $SINGLEMDS) -le $(version_code 2.4.90) ]] &&
        ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2c"
 
+[[ $(lustre_version_code ost1) -lt $(version_code 2.5.50) ]] &&
+       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11"
+
 build_test_filter
 
 $LCTL set_param debug=+lfsck > /dev/null || true
 
 MDT_DEV="${FSNAME}-MDT0000"
 build_test_filter
 
 $LCTL set_param debug=+lfsck > /dev/null || true
 
 MDT_DEV="${FSNAME}-MDT0000"
+OST_DEV="${FSNAME}-OST0000"
 MDT_DEVNAME=$(mdsdevname ${SINGLEMDS//mds/})
 START_NAMESPACE="do_facet $SINGLEMDS \
                $LCTL lfsck_start -M ${MDT_DEV} -t namespace"
 MDT_DEVNAME=$(mdsdevname ${SINGLEMDS//mds/})
 START_NAMESPACE="do_facet $SINGLEMDS \
                $LCTL lfsck_start -M ${MDT_DEV} -t namespace"
+START_LAYOUT_ON_OST="do_facet ost1 $LCTL lfsck_start -M ${OST_DEV} -t layout"
 STOP_LFSCK="do_facet $SINGLEMDS $LCTL lfsck_stop -M ${MDT_DEV}"
 SHOW_NAMESPACE="do_facet $SINGLEMDS \
                $LCTL get_param -n mdd.${MDT_DEV}.lfsck_namespace"
 STOP_LFSCK="do_facet $SINGLEMDS $LCTL lfsck_stop -M ${MDT_DEV}"
 SHOW_NAMESPACE="do_facet $SINGLEMDS \
                $LCTL get_param -n mdd.${MDT_DEV}.lfsck_namespace"
+SHOW_LAYOUT_ON_OST="do_facet ost1 \
+               $LCTL get_param -n obdfilter.${OST_DEV}.lfsck_layout"
 MOUNT_OPTS_SCRUB="-o user_xattr"
 MOUNT_OPTS_NOSCRUB="-o user_xattr,noscrub"
 
 MOUNT_OPTS_SCRUB="-o user_xattr"
 MOUNT_OPTS_NOSCRUB="-o user_xattr,noscrub"
 
@@ -998,6 +1009,152 @@ test_10()
 }
 run_test 10 "System is available during LFSCK scanning"
 
 }
 run_test 10 "System is available during LFSCK scanning"
 
+# remove LAST_ID
+ost_remove_lastid() {
+       local ost=$1
+       local idx=$2
+       local rcmd="do_facet ost${ost}"
+
+       echo "remove LAST_ID on ost${ost}: idx=${idx}"
+
+       # step 1: local mount
+       mount_fstype ost${ost} || return 1
+       # step 2: remove the specified LAST_ID
+       ${rcmd} rm -fv $(facet_mntpt ost${ost})/O/${idx}/LAST_ID
+       # step 3: umount
+       unmount_fstype ost${ost} || return 2
+}
+
+test_11a() {
+       echo "stopall"
+       stopall > /dev/null
+       echo "formatall"
+       formatall > /dev/null
+       echo "setupall"
+       setupall > /dev/null
+
+       mkdir -p $DIR/$tdir
+       $SETSTRIPE -c 1 -i 0 $DIR/$tdir
+       createmany -o $DIR/$tdir/f 64
+
+       echo "stopall"
+       stopall > /dev/null
+
+       ost_remove_lastid 1 0 || error "(1) Fail to remove LAST_ID"
+
+       echo "start ost1"
+       start ost1 $(ostdevname 1) $MOUNT_OPTS_NOSCRUB > /dev/null ||
+               error "(2) Fail to start ost1"
+
+       local STATUS=$($SHOW_LAYOUT_ON_OST | awk '/^status/ { print $2 }')
+       [ "$STATUS" == "init" ] ||
+               error "(3) Expect 'init', but got '$STATUS'"
+
+       #define OBD_FAIL_LFSCK_DELAY4           0x160e
+       do_facet ost1 $LCTL set_param fail_val=3
+       do_facet ost1 $LCTL set_param fail_loc=0x160e
+
+       echo "trigger LFSCK for layout on ost1 to rebuild the LAST_ID(s)"
+       $START_LAYOUT_ON_OST || error "(4) Fail to start LFSCK on OST!"
+
+       wait_update_facet ost1 "$LCTL get_param -n \
+               obdfilter.${OST_DEV}.lfsck_layout |
+               awk '/^flags/ { print \\\$2 }'" "crashed_lastid" 60 || {
+               $SHOW_LAYOUT_ON_OST
+               return 5
+       }
+
+       do_facet ost1 $LCTL set_param fail_val=0
+       do_facet ost1 $LCTL set_param fail_loc=0
+
+       wait_update_facet ost1 "$LCTL get_param -n \
+               obdfilter.${OST_DEV}.lfsck_layout |
+               awk '/^status/ { print \\\$2 }'" "completed" 3 || {
+               $SHOW_LAYOUT_ON_OST
+               return 6
+       }
+
+       echo "the LAST_ID(s) should have been rebuilt"
+       FLAGS=$($SHOW_LAYOUT_ON_OST | awk '/^flags/ { print $2 }')
+       [ -z "$FLAGS" ] || error "(7) Expect empty flags, but got '$FLAGS'"
+}
+run_test 11a "LFSCK can rebuild lost last_id"
+
+test_11b() {
+       echo "stopall"
+       stopall > /dev/null
+       echo "formatall"
+       formatall > /dev/null
+       echo "setupall"
+       setupall > /dev/null
+
+       mkdir -p $DIR/$tdir
+       $SETSTRIPE -c 1 -i 0 $DIR/$tdir
+
+       echo "set fail_loc=0x160d to skip the updating LAST_ID on-disk"
+       #define OBD_FAIL_LFSCK_SKIP_LASTID      0x160d
+       do_facet ost1 $LCTL set_param fail_loc=0x160d
+       createmany -o $DIR/$tdir/f 64
+       local lastid1=$(do_facet ost1 "lctl get_param -n \
+               obdfilter.${ost1_svc}.last_id" | grep 0x100000000 |
+               awk -F: '{ print $2 }')
+
+       umount_client $MOUNT
+       echo "stop ost1"
+       stop ost1 || error "(1) Fail to stop ost1"
+
+       #define OBD_FAIL_OST_ENOSPC              0x215
+       do_facet ost1 $LCTL set_param fail_loc=0x215
+
+       echo "start ost1"
+       start ost1 $(ostdevname 1) $OST_MOUNT_OPTS ||
+               error "(2) Fail to start ost1"
+
+       local STATUS=$($SHOW_LAYOUT_ON_OST | awk '/^status/ { print $2 }')
+       [ "$STATUS" == "init" ] ||
+               error "(3) Expect 'init', but got '$STATUS'"
+
+       for ((i = 0; i < 60; i++)); do
+               lastid2=$(do_facet ost1 "lctl get_param -n \
+                       obdfilter.${ost1_svc}.last_id" | grep 0x100000000 |
+                       awk -F: '{ print $2 }')
+               [ ! -z $lastid2 ] && break;
+               sleep 1
+       done
+
+       echo "the on-disk LAST_ID should be smaller than the expected one"
+       [ $lastid1 -gt $lastid2 ] ||
+               error "(4) expect lastid1 [ $lastid1 ] > lastid2 [ $lastid2 ]"
+
+       echo "trigger LFSCK for layout on ost1 to rebuild the on-disk LAST_ID"
+       $START_LAYOUT_ON_OST || error "(5) Fail to start LFSCK on OST!"
+
+       wait_update_facet ost1 "$LCTL get_param -n \
+               obdfilter.${OST_DEV}.lfsck_layout |
+               awk '/^status/ { print \\\$2 }'" "completed" 3 || {
+               $SHOW_LAYOUT_ON_OST
+               return 6
+       }
+
+       echo "stop ost1"
+       stop ost1 || error "(7) Fail to stop ost1"
+
+       echo "start ost1"
+       start ost1 $(ostdevname 1) $OST_MOUNT_OPTS ||
+               error "(8) Fail to start ost1"
+
+       echo "the on-disk LAST_ID should have been rebuilt"
+       wait_update_facet ost1 "$LCTL get_param -n \
+               obdfilter.${ost1_svc}.last_id | grep 0x100000000 |
+               awk -F: '{ print \\\$2 }'" "$lastid1" 60 || {
+               $LCTL get_param -n obdfilter.${ost1_svc}.last_id
+               error "(9) expect lastid1 0x100000000:$lastid1"
+       }
+
+       do_facet ost1 $LCTL set_param fail_loc=0
+}
+run_test 11b "LFSCK can rebuild crashed last_id"
+
 $LCTL set_param debug=-lfsck > /dev/null || true
 
 # restore MDS/OST size
 $LCTL set_param debug=-lfsck > /dev/null || true
 
 # restore MDS/OST size