Whamcloud - gitweb
LU-3336 lfsck: use rbtree to record OST-object accessing 43/7743/21
authorFan Yong <fan.yong@intel.com>
Tue, 11 Feb 2014 04:53:47 +0000 (12:53 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 25 Feb 2014 00:16:43 +0000 (00:16 +0000)
To find out orphan OST-objects, the LFSCK on OST side maintains
two bitmaps in RAM for the OST-object accessed during the LFSCK.
After the first cycle system scanning, the LFSCK got the bitmap
for the known OST-objects, and got another bitmap for which OST
objects have been referenced by MDT-objects. Then the LFSCK can
know which OST-objects are not referenced by any MDT-object via
comparing the two bitmaps.

Above two bitmaps are organized via a single rbtree. The rbtree
is maintained by LFSCK on the OST side. For every LFSCK scanned
OST-object, it will be recorded in the known-bitmap, for every
OST-object accessed by any RPC during the scanning, it will be
recoreded in the accessed-bitmap.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: If399584a8617e7c368e48922a3582294ac98d5f4
Reviewed-on: http://review.whamcloud.com/7743
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/dt_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_lfsck.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_objects.c
lustre/target/out_handler.c
lustre/target/tgt_internal.h

index 2b21697..cdd321a 100644 (file)
@@ -671,6 +671,7 @@ struct dt_device {
          * single-threaded start-up shut-down procedures.
          */
         cfs_list_t                         dd_txn_callbacks;
+       unsigned int                       dd_record_fid_accessed:1;
 };
 
 int  dt_device_init(struct dt_device *dev, struct lu_device_type *t);
index 0f0bf41..bc288d3 100644 (file)
@@ -463,6 +463,7 @@ enum fid_seq {
        FID_SEQ_QUOTA           = 0x200000005ULL,
        FID_SEQ_QUOTA_GLB       = 0x200000006ULL,
        FID_SEQ_ROOT            = 0x200000007ULL,  /* Located on MDT0 */
+       FID_SEQ_LAYOUT_RBTREE   = 0x200000008ULL,
        FID_SEQ_NORMAL          = 0x200000400ULL,
        FID_SEQ_LOV_DEFAULT     = 0xffffffffffffffffULL
 };
index 2e93420..e992346 100644 (file)
@@ -125,6 +125,7 @@ enum lfsck_events {
        LE_START                = 5,
        LE_STOP                 = 6,
        LE_QUERY                = 7,
+       LE_FID_ACCESSED         = 8,
 };
 
 enum lfsck_event_flags {
@@ -163,4 +164,13 @@ int lfsck_set_windows(struct dt_device *key, int val);
 
 int lfsck_dump(struct dt_device *key, void *buf, int len, enum lfsck_type type);
 
+static inline void lfsck_pack_rfa(struct lfsck_request *lr,
+                                 const struct lu_fid *fid)
+{
+       memset(lr, 0, sizeof(*lr));
+       lr->lr_event = LE_FID_ACCESSED;
+       lr->lr_active = LT_LAYOUT;
+       lr->lr_fid = *fid;
+}
+
 #endif /* _LUSTRE_LFSCK_H */
index 07b6d6b..9dd4d7f 100644 (file)
@@ -34,6 +34,7 @@
 #define DEBUG_SUBSYSTEM S_LFSCK
 
 #include <linux/bitops.h>
+#include <linux/rbtree.h>
 
 #include <lustre/lustre_idl.h>
 #include <lu_object.h>
@@ -77,6 +78,10 @@ struct lfsck_layout_slave_data {
        struct list_head         llsd_master_list;
        spinlock_t               llsd_lock;
        __u64                    llsd_touch_gen;
+       struct dt_object        *llsd_rb_obj;
+       struct rb_root           llsd_rb_root;
+       rwlock_t                 llsd_rb_lock;
+       unsigned int             llsd_rbtree_valid:1;
 };
 
 struct lfsck_layout_object {
@@ -354,6 +359,265 @@ static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
        return 0;
 }
 
+#define LFSCK_RBTREE_BITMAP_SIZE       PAGE_CACHE_SIZE
+#define LFSCK_RBTREE_BITMAP_WIDTH      (LFSCK_RBTREE_BITMAP_SIZE << 3)
+#define LFSCK_RBTREE_BITMAP_MASK       (LFSCK_RBTREE_BITMAP_SIZE - 1)
+
+struct lfsck_rbtree_node {
+       struct rb_node   lrn_node;
+       __u64            lrn_seq;
+       __u32            lrn_first_oid;
+       atomic_t         lrn_known_count;
+       atomic_t         lrn_accessed_count;
+       void            *lrn_known_bitmap;
+       void            *lrn_accessed_bitmap;
+};
+
+static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
+                                  __u64 seq, __u32 oid)
+{
+       if (seq < lrn->lrn_seq)
+               return -1;
+
+       if (seq > lrn->lrn_seq)
+               return 1;
+
+       if (oid < lrn->lrn_first_oid)
+               return -1;
+
+       if (oid >= lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH)
+               return 1;
+
+       return 0;
+}
+
+/* The caller should hold lock. */
+static struct lfsck_rbtree_node *
+lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
+                   const struct lu_fid *fid)
+{
+       struct rb_node           *node = llsd->llsd_rb_root.rb_node;
+       struct lfsck_rbtree_node *lrn;
+       int                       rc;
+
+       while (node != NULL) {
+               lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
+               rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
+               if (rc < 0)
+                       node = node->rb_left;
+               else if (rc > 0)
+                       node = node->rb_right;
+               else
+                       return lrn;
+       }
+
+       return NULL;
+}
+
+static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
+                                                 const struct lu_fid *fid)
+{
+       struct lfsck_rbtree_node *lrn;
+
+       OBD_ALLOC_PTR(lrn);
+       if (lrn == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
+       if (lrn->lrn_known_bitmap == NULL) {
+               OBD_FREE_PTR(lrn);
+
+               return ERR_PTR(-ENOMEM);
+       }
+
+       OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
+       if (lrn->lrn_accessed_bitmap == NULL) {
+               OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
+               OBD_FREE_PTR(lrn);
+
+               return ERR_PTR(-ENOMEM);
+       }
+
+       rb_init_node(&lrn->lrn_node);
+       lrn->lrn_seq = fid_seq(fid);
+       lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
+       atomic_set(&lrn->lrn_known_count, 0);
+       atomic_set(&lrn->lrn_accessed_count, 0);
+
+       return lrn;
+}
+
+static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
+{
+       OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
+       OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
+       OBD_FREE_PTR(lrn);
+}
+
+/* The caller should hold lock. */
+static struct lfsck_rbtree_node *
+lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
+                   struct lfsck_rbtree_node *lrn)
+{
+       struct rb_node           **pos    = &(llsd->llsd_rb_root.rb_node);
+       struct rb_node            *parent = NULL;
+       struct lfsck_rbtree_node  *tmp;
+       int                        rc;
+
+       while (*pos) {
+               parent = *pos;
+               tmp = rb_entry(*pos, struct lfsck_rbtree_node, lrn_node);
+               rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
+               if (rc < 0)
+                       pos = &((*pos)->rb_left);
+               else if (rc > 0)
+                       pos = &((*pos)->rb_right);
+               else
+                       return tmp;
+       }
+
+       rb_link_node(&lrn->lrn_node, parent, pos);
+       rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
+
+       return lrn;
+}
+
+static int lfsck_rbtree_setup(const struct lu_env *env,
+                             struct lfsck_component *com)
+{
+       struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck->li_bottom;
+       struct lfsck_layout_slave_data  *llsd   = com->lc_data;
+       struct dt_object                *obj;
+
+       fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
+       fid->f_oid = lfsck_dev_idx(dev);
+       fid->f_ver = 0;
+       obj = dt_locate(env, dev, fid);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
+
+       /* XXX: Generate an in-RAM object to stand for the layout rbtree.
+        *      Scanning the layout rbtree will be via the iteration over
+        *      the object. In the future, the rbtree may be written onto
+        *      disk with the object.
+        *
+        *      Mark the object to be as exist. */
+       obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
+       llsd->llsd_rb_obj = obj;
+       llsd->llsd_rbtree_valid = 1;
+       dev->dd_record_fid_accessed = 1;
+
+       return 0;
+}
+
+static void lfsck_rbtree_cleanup(const struct lu_env *env,
+                                struct lfsck_component *com)
+{
+       struct lfsck_instance           *lfsck = com->lc_lfsck;
+       struct lfsck_layout_slave_data  *llsd  = com->lc_data;
+       struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
+       struct rb_node                  *next;
+       struct lfsck_rbtree_node        *lrn;
+
+       lfsck->li_bottom->dd_record_fid_accessed = 0;
+       /* Invalid the rbtree, then no others will use it. */
+       write_lock(&llsd->llsd_rb_lock);
+       llsd->llsd_rbtree_valid = 0;
+       write_unlock(&llsd->llsd_rb_lock);
+
+       while (node != NULL) {
+               next = rb_next(node);
+               lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
+               rb_erase(node, &llsd->llsd_rb_root);
+               lfsck_rbtree_free(lrn);
+               node = next;
+       }
+
+       if (llsd->llsd_rb_obj != NULL) {
+               lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
+               llsd->llsd_rb_obj = NULL;
+       }
+}
+
+static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
+                                      struct lfsck_component *com,
+                                      const struct lu_fid *fid,
+                                      bool accessed)
+{
+       struct lfsck_layout_slave_data  *llsd   = com->lc_data;
+       struct lfsck_rbtree_node        *lrn;
+       bool                             insert = false;
+       int                              idx;
+       int                              rc     = 0;
+       ENTRY;
+
+       CDEBUG(D_LFSCK, "%s: update bitmap for "DFID"\n",
+              lfsck_lfsck2name(com->lc_lfsck), PFID(fid));
+
+       if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
+               RETURN_EXIT;
+
+       if (!fid_is_idif(fid) && !fid_is_norm(fid))
+               RETURN_EXIT;
+
+       read_lock(&llsd->llsd_rb_lock);
+       if (!llsd->llsd_rbtree_valid)
+               GOTO(unlock, rc = 0);
+
+       lrn = lfsck_rbtree_search(llsd, fid);
+       if (lrn == NULL) {
+               struct lfsck_rbtree_node *tmp;
+
+               LASSERT(!insert);
+
+               read_unlock(&llsd->llsd_rb_lock);
+               tmp = lfsck_rbtree_new(env, fid);
+               if (IS_ERR(tmp))
+                       GOTO(out, rc = PTR_ERR(tmp));
+
+               insert = true;
+               write_lock(&llsd->llsd_rb_lock);
+               if (!llsd->llsd_rbtree_valid) {
+                       lfsck_rbtree_free(tmp);
+                       GOTO(unlock, rc = 0);
+               }
+
+               lrn = lfsck_rbtree_insert(llsd, tmp);
+               if (lrn != tmp)
+                       lfsck_rbtree_free(tmp);
+       }
+
+       idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
+       /* Any accessed object must be a known object. */
+       if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
+               atomic_inc(&lrn->lrn_known_count);
+       if (accessed) {
+               if (!test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
+                       atomic_inc(&lrn->lrn_accessed_count);
+       }
+
+       GOTO(unlock, rc = 0);
+
+unlock:
+       if (insert)
+               write_unlock(&llsd->llsd_rb_lock);
+       else
+               read_unlock(&llsd->llsd_rb_lock);
+out:
+       if (rc != 0 && accessed) {
+               struct lfsck_layout *lo = com->lc_file_ram;
+
+               CERROR("%s: Fail to update object accessed bitmap, will cause "
+                      "incorrect LFSCK OST-object handling, so disable it to "
+                      "cancel orphan handling for related device. rc = %d.\n",
+                      lfsck_lfsck2name(com->lc_lfsck), rc);
+               lo->ll_flags |= LF_INCOMPLETE;
+               lfsck_rbtree_cleanup(env, com);
+       }
+}
+
 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
                                   const struct lfsck_layout *src)
 {
@@ -2698,15 +2962,19 @@ static int lfsck_layout_slave_prep(const struct lu_env *env,
        struct lfsck_layout_slave_data  *llsd   = com->lc_data;
        int                              rc;
 
-       /* XXX: For a new scanning, generate OST-objects
-        *      bitmap for orphan detection. */
-
        rc = lfsck_layout_prep(env, com);
        if (rc != 0 || lo->ll_status != LS_SCANNING_PHASE1 ||
            !lsp->lsp_index_valid)
                return rc;
 
        rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
+       if (rc == 0 && !(lo->ll_flags & LF_INCOMPLETE)) {
+               LASSERT(!llsd->llsd_rbtree_valid);
+
+               write_lock(&llsd->llsd_rb_lock);
+               rc = lfsck_rbtree_setup(env, com);
+               write_unlock(&llsd->llsd_rb_lock);
+       }
 
        return rc;
 }
@@ -3050,10 +3318,10 @@ static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
        int                              rc;
        ENTRY;
 
-       /* XXX: Update OST-objects bitmap for orphan detection. */
-
        LASSERT(llsd != NULL);
 
+       lfsck_rbtree_update_bitmap(env, com, fid, false);
+
        down_write(&com->lc_sem);
        if (fid_is_idif(fid))
                seq = 0;
@@ -3264,6 +3532,9 @@ static int lfsck_layout_slave_post(const struct lu_env *env,
 
        lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
 
+       if (result <= 0)
+               lfsck_rbtree_cleanup(env, com);
+
        return rc;
 }
 
@@ -3482,8 +3753,10 @@ static int lfsck_layout_slave_double_scan(const struct lu_env *env,
        int                              rc;
        ENTRY;
 
-       if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
+       if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
+               lfsck_rbtree_cleanup(env, com);
                RETURN(0);
+       }
 
        atomic_inc(&lfsck->li_double_scan_count);
 
@@ -3526,6 +3799,7 @@ static int lfsck_layout_slave_double_scan(const struct lu_env *env,
 done:
        rc = lfsck_layout_double_scan_result(env, com, rc);
 
+       lfsck_rbtree_cleanup(env, com);
        if (atomic_dec_and_test(&lfsck->li_double_scan_count))
                wake_up_all(&lfsck->li_thread.t_ctl_waitq);
 
@@ -3590,8 +3864,6 @@ static void lfsck_layout_slave_data_release(const struct lu_env *env,
 
        LASSERT(llsd != NULL);
 
-       com->lc_data = NULL;
-
        list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
                                     lls_list) {
                list_del_init(&lls->lls_list);
@@ -3605,6 +3877,8 @@ static void lfsck_layout_slave_data_release(const struct lu_env *env,
                OBD_FREE_PTR(llst);
        }
 
+       lfsck_rbtree_cleanup(env, com);
+       com->lc_data = NULL;
        OBD_FREE_PTR(llsd);
 }
 
@@ -3624,6 +3898,12 @@ static void lfsck_layout_master_quit(const struct lu_env *env,
                     &lwi);
 }
 
+static void lfsck_layout_slave_quit(const struct lu_env *env,
+                                   struct lfsck_component *com)
+{
+       lfsck_rbtree_cleanup(env, com);
+}
+
 static int lfsck_layout_master_in_notify(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct lfsck_request *lr)
@@ -3714,6 +3994,12 @@ static int lfsck_layout_slave_in_notify(const struct lu_env *env,
        struct lfsck_layout_slave_target *llst;
        ENTRY;
 
+       if (lr->lr_event == LE_FID_ACCESSED) {
+               lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
+
+               RETURN(0);
+       }
+
        if (lr->lr_event != LE_PHASE2_DONE &&
            lr->lr_event != LE_STOP)
                RETURN(-EINVAL);
@@ -3858,6 +4144,7 @@ static struct lfsck_operations lfsck_layout_slave_ops = {
        .lfsck_dump             = lfsck_layout_dump,
        .lfsck_double_scan      = lfsck_layout_slave_double_scan,
        .lfsck_data_release     = lfsck_layout_slave_data_release,
+       .lfsck_quit             = lfsck_layout_slave_quit,
        .lfsck_in_notify        = lfsck_layout_slave_in_notify,
        .lfsck_query            = lfsck_layout_query,
        .lfsck_join             = lfsck_layout_slave_join,
@@ -3911,6 +4198,8 @@ int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
                INIT_LIST_HEAD(&llsd->llsd_seq_list);
                INIT_LIST_HEAD(&llsd->llsd_master_list);
                spin_lock_init(&llsd->llsd_lock);
+               llsd->llsd_rb_root = RB_ROOT;
+               rwlock_init(&llsd->llsd_rb_lock);
                com->lc_data = llsd;
        }
        com->lc_file_size = sizeof(*lo);
index 9dd35f9..3fa1f93 100644 (file)
@@ -1544,6 +1544,7 @@ int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
        case LE_STOP:
        case LE_PHASE1_DONE:
        case LE_PHASE2_DONE:
+       case LE_FID_ACCESSED:
                break;
        default:
                RETURN(-EOPNOTSUPP);
index b22ceec..ab041ce 100644 (file)
@@ -183,7 +183,8 @@ struct ofd_device {
                                  * supporting OBD_CONNECT_GRANT_PARAM? */
                                 ofd_grant_compat_disable:1,
                                 /* Protected by ofd_lastid_rwsem. */
-                                ofd_lastid_rebuilding:1;
+                                ofd_lastid_rebuilding:1,
+                                ofd_record_fid_accessed:1;
        struct seq_server_site   ofd_seq_site;
        /* the limit of SOFT_SYNC RPCs that will trigger a soft sync */
        unsigned int             ofd_soft_sync_limit;
@@ -319,6 +320,7 @@ struct ofd_thread_info {
        /* Space used by the I/O, used by grant code */
        unsigned long                    fti_used;
        struct ost_lvb                   fti_lvb;
+       struct lfsck_request             fti_lr;
 };
 
 extern void target_recovery_fini(struct obd_device *obd);
index f6fd068..3262722 100644 (file)
@@ -43,6 +43,7 @@
 
 #include <dt_object.h>
 #include <lustre/lustre_idl.h>
+#include <lustre_lfsck.h>
 
 #include "ofd_internal.h"
 
@@ -295,6 +296,14 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
                fo = batch[i];
                LASSERT(fo);
 
+               /* Only the new created objects need to be recorded. */
+               if (ofd->ofd_osd->dd_record_fid_accessed) {
+                       lfsck_pack_rfa(&ofd_info(env)->fti_lr,
+                                      lu_object_fid(&fo->ofo_obj.do_lu));
+                       lfsck_in_notify(env, ofd->ofd_osd,
+                                       &ofd_info(env)->fti_lr);
+               }
+
                if (likely(!ofd_object_exists(fo) &&
                           !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
                        next = ofd_object_child(fo);
index 83105a9..0f58654 100644 (file)
@@ -1318,6 +1318,12 @@ int out_handle(struct tgt_session_info *tsi)
                if (IS_ERR(dt_obj))
                        GOTO(out, rc = PTR_ERR(dt_obj));
 
+               if (dt->dd_record_fid_accessed) {
+                       lfsck_pack_rfa(&tti->tti_lr,
+                                      lu_object_fid(&dt_obj->do_lu));
+                       tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
+               }
+
                tti->tti_u.update.tti_dt_object = dt_obj;
                tti->tti_u.update.tti_update = update;
                tti->tti_u.update.tti_update_reply_index = i;
index 2164cda..7241a3e 100644 (file)
@@ -137,6 +137,7 @@ struct tgt_thread_info {
                        struct dt_object        *tti_dt_object;
                } update;
        } tti_u;
+       struct lfsck_request tti_lr;
 };
 
 extern struct lu_context_key tgt_thread_key;