Whamcloud - gitweb
LU-1866 lfsck: enhance otable-based iteration
authorFan Yong <yong.fan@whamcloud.com>
Sat, 12 Jan 2013 11:47:12 +0000 (19:47 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 2 Feb 2013 22:12:50 +0000 (17:12 -0500)
1) Use ::load() method to unplug OSD layer iteration.

2) Use ::put() method to wakeup the LFSCK main engine
   if it is blocked at low layer for ::next() call.

Test-Parameters: envdefinitions=ENABLE_QUOTA=yes testlist=sanity-scrub
Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I28e5689ad3a4ee96a3f17a64e8d6dca553a86d77
Reviewed-on: http://review.whamcloud.com/4906
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Tested-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
lustre/mdd/mdd_lfsck.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_scrub.c

index 0847524..6c69793 100644 (file)
@@ -125,17 +125,12 @@ static int mdd_lfsck_main(void *args)
        CDEBUG(D_LFSCK, "LFSCK: flags = 0x%x, pid = %d\n",
               lfsck->ml_args, cfs_curproc_pid());
 
-       /* XXX: Prepare before wakeup the sponsor.
-        *      Each lfsck component should call iops->get() API with
-        *      every bookmark, then low layer module can decide the
-        *      start point for current iteration. */
-
        spin_lock(&lfsck->ml_lock);
        thread_set_flags(thread, SVC_RUNNING);
        spin_unlock(&lfsck->ml_lock);
        cfs_waitq_broadcast(&thread->t_ctl_waitq);
 
-       /* Call iops->load() to finish the choosing start point. */
+       /* The call iops->load() will unplug low layer iteration. */
        rc = iops->load(&env, di, 0);
        if (rc != 0)
                GOTO(out, rc);
index 24a96d9..1dec8c2 100644 (file)
@@ -216,11 +216,9 @@ struct osd_otable_cache {
 
 struct osd_otable_it {
        struct osd_device       *ooi_dev;
+       pid_t                    ooi_pid;
        struct osd_otable_cache  ooi_cache;
 
-       /* For osd_otable_it_key. */
-       __u8                     ooi_key[16];
-
        /* The following bits can be updated/checked w/o lock protection.
         * If more bits will be introduced in the future and need lock to
         * protect, please add comment. */
@@ -230,7 +228,9 @@ struct osd_otable_it {
                                                    * filled into cache. */
                                 ooi_user_ready:1, /* The user out of OSD is
                                                    * ready to iterate. */
-                                ooi_waiting:1; /* it::next is waiting. */
+                                ooi_waiting:1, /* it::next is waiting. */
+                                ooi_stopping:1; /* Someone is stopping
+                                                 * the iteration. */
 };
 
 extern const int osd_dto_credits_noquota[];
index 1c0c78f..ed6868a 100644 (file)
@@ -50,6 +50,8 @@
 
 #define HALF_SEC       (CFS_HZ >> 1)
 
+#define OSD_OTABLE_MAX_HASH            0x00000000ffffffffULL
+
 #define SCRUB_NEXT_BREAK       1 /* exit current loop and process next group */
 #define SCRUB_NEXT_CONTINUE    2 /* skip current object and process next bit */
 #define SCRUB_NEXT_EXIT        3 /* exit all the loops */
@@ -837,7 +839,7 @@ static int osd_preload_exec(struct osd_thread_info *info,
 #define SCRUB_IT_CRASH 2
 
 static int osd_inode_iteration(struct osd_thread_info *info,
-                              struct osd_device *dev, __u32 max, int preload)
+                              struct osd_device *dev, __u32 max, bool preload)
 {
        osd_iit_next_policy   next;
        osd_iit_exec_policy   exec;
@@ -849,7 +851,7 @@ static int osd_inode_iteration(struct osd_thread_info *info,
        int                   rc;
        ENTRY;
 
-       if (preload == 0) {
+       if (!preload) {
                struct osd_scrub *scrub = &dev->od_scrub;
 
                next = osd_scrub_next;
@@ -904,10 +906,18 @@ static int osd_inode_iteration(struct osd_thread_info *info,
                                brelse(param.bitmap);
                                RETURN(rc);
                        }
+
+                       if (preload && dev->od_otable_it->ooi_stopping) {
+                               brelse(param.bitmap);
+                               RETURN(0);
+                       }
                }
 
 next_group:
                brelse(param.bitmap);
+
+               if (preload && dev->od_otable_it->ooi_stopping)
+                       RETURN(0);
        }
 
        if (*pos > limit)
@@ -925,7 +935,7 @@ static int osd_otable_it_preload(const struct lu_env *env,
        ENTRY;
 
        rc = osd_inode_iteration(osd_oti_get(env), dev,
-                                OSD_OTABLE_IT_CACHE_SIZE, 1);
+                                OSD_OTABLE_IT_CACHE_SIZE, true);
        if (rc == SCRUB_IT_ALL)
                it->ooi_all_cached = 1;
 
@@ -984,7 +994,7 @@ static int osd_scrub_main(void *args)
        CDEBUG(D_LFSCK, "OI scrub: flags = 0x%x, pos = %u\n",
               scrub->os_start_flags, scrub->os_pos_current);
 
-       rc = osd_inode_iteration(osd_oti_get(&env), dev, ~0U, 0);
+       rc = osd_inode_iteration(osd_oti_get(&env), dev, ~0U, false);
        if (unlikely(rc == SCRUB_IT_CRASH))
                GOTO(out, rc = -EINVAL);
        GOTO(post, rc);
@@ -1709,6 +1719,7 @@ static struct dt_it *osd_otable_it_init(const struct lu_env *env,
 
        dev->od_otable_it = it;
        it->ooi_dev = dev;
+       it->ooi_pid = cfs_curproc_pid();
        it->ooi_cache.ooc_consumer_idx = -1;
        if (flags & DOIF_OUTUSED)
                it->ooi_used_outside = 1;
@@ -1724,16 +1735,14 @@ static struct dt_it *osd_otable_it_init(const struct lu_env *env,
        }
 
        rc = do_osd_scrub_start(dev, start);
-       if (rc == -EALREADY) {
-               it->ooi_cache.ooc_pos_preload = scrub->os_pos_current - 1;
-       } else if (rc < 0) {
+       if (rc < 0 && rc != -EALREADY) {
                dev->od_otable_it = NULL;
                OBD_FREE_PTR(it);
-               GOTO(out, it = ERR_PTR(-EALREADY));
-       } else {
-               it->ooi_cache.ooc_pos_preload = scrub->os_pos_current;
+               GOTO(out, it = ERR_PTR(rc));
        }
 
+       it->ooi_cache.ooc_pos_preload = scrub->os_pos_current;
+
        GOTO(out, it);
 
 out:
@@ -1756,50 +1765,36 @@ static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
        OBD_FREE_PTR(it);
 }
 
-/**
- * XXX: Temporary used to notify otable iteration to be paused.
- */
-static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
+static int osd_otable_it_get(const struct lu_env *env,
+                            struct dt_it *di, const struct dt_key *key)
 {
-       struct osd_device *dev = ((struct osd_otable_it *)di)->ooi_dev;
-
-       /* od_otable_mutex: prevent curcurrent init/fini */
-       mutex_lock(&dev->od_otable_mutex);
-       dev->od_scrub.os_paused = 1;
-       mutex_unlock(&dev->od_otable_mutex);
+       return 0;
 }
 
 /**
- * Set the OSD layer iteration start position as the specified key.
+ * It is hack here:
  *
- * The LFSCK out of OSD layer does not know the detail of the key, so if there
- * are several keys, they cannot be compared out of OSD, so call "::get()" for
- * each key, and OSD will select the smallest one by itself.
+ * Sometimes the otable-based iteration driver (LFSCK) may be blocked in OSD
+ * layer when someone wants to stop/pause the iteration. Under such case, we
+ * need some mechanism to notify the event and wakeup the blocker.
  */
-static int osd_otable_it_get(const struct lu_env *env,
-                            struct dt_it *di, const struct dt_key *key)
+static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
 {
-       struct osd_otable_it    *it  = (struct osd_otable_it *)di;
-       struct osd_otable_cache *ooc = &it->ooi_cache;
-       const char              *str = (const char *)key;
-       __u32                    ino;
-       ENTRY;
-
-       /* Forbid to set iteration position after iteration started. */
-       if (it->ooi_user_ready)
-               RETURN(-EPERM);
-
-       if (str[0] == '\0')
-               RETURN(-EINVAL);
-
-       if (sscanf(str, "%u", &ino) <= 0)
-               RETURN(-EINVAL);
+       struct osd_otable_it *it  = (struct osd_otable_it *)di;
+       struct osd_device    *dev = it->ooi_dev;
 
-       /* Skip the one that has been processed last time. */
-       if (ooc->ooc_pos_preload > ++ino)
-               ooc->ooc_pos_preload = ino;
+       /* od_otable_mutex: prevent curcurrent init/fini */
+       mutex_lock(&dev->od_otable_mutex);
+       if (it->ooi_pid == cfs_curproc_pid()) {
+               dev->od_scrub.os_paused = 1;
+       } else {
+               struct ptlrpc_thread *thread = &dev->od_scrub.os_thread;
 
-       RETURN(0);
+               it->ooi_stopping = 1;
+               if (it->ooi_waiting)
+                       cfs_waitq_broadcast(&thread->t_ctl_waitq);
+       }
+       mutex_unlock(&dev->od_otable_mutex);
 }
 
 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
@@ -1836,13 +1831,17 @@ again:
        it->ooi_waiting = 1;
        l_wait_event(thread->t_ctl_waitq,
                     ooc->ooc_pos_preload < scrub->os_pos_current ||
-                    !thread_is_running(thread),
+                    !thread_is_running(thread) ||
+                    it->ooi_stopping,
                     &lwi);
        it->ooi_waiting = 0;
 
        if (!thread_is_running(thread) && !it->ooi_used_outside)
                RETURN(1);
 
+       if (it->ooi_stopping)
+               RETURN(0);
+
        rc = osd_otable_it_preload(env, it);
        if (rc >= 0)
                goto again;
@@ -1853,18 +1852,13 @@ again:
 static struct dt_key *osd_otable_it_key(const struct lu_env *env,
                                        const struct dt_it *di)
 {
-       struct osd_otable_it    *it  = (struct osd_otable_it *)di;
-       struct osd_otable_cache *ooc = &it->ooi_cache;
-
-       sprintf(it->ooi_key, "%u",
-               ooc->ooc_cache[ooc->ooc_consumer_idx].oic_lid.oii_ino);
-       return (struct dt_key *)it->ooi_key;
+       return NULL;
 }
 
 static int osd_otable_it_key_size(const struct lu_env *env,
                                  const struct dt_it *di)
 {
-       return sizeof(((struct osd_otable_it *)di)->ooi_key);
+       return sizeof(__u64);
 }
 
 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
@@ -1877,6 +1871,23 @@ static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
        return 0;
 }
 
+static __u64 osd_otable_it_store(const struct lu_env *env,
+                                const struct dt_it *di)
+{
+       struct osd_otable_it    *it  = (struct osd_otable_it *)di;
+       struct osd_otable_cache *ooc = &it->ooi_cache;
+       __u64                    hash;
+
+       if (it->ooi_user_ready)
+               hash = ooc->ooc_pos_preload;
+       else
+               hash = ooc->ooc_cache[ooc->ooc_consumer_idx].oic_lid.oii_ino;
+       return hash;
+}
+
+/**
+ * Set the OSD layer iteration start position as the specified hash.
+ */
 static int osd_otable_it_load(const struct lu_env *env,
                              const struct dt_it *di, __u64 hash)
 {
@@ -1884,31 +1895,52 @@ static int osd_otable_it_load(const struct lu_env *env,
        struct osd_device       *dev   = it->ooi_dev;
        struct osd_otable_cache *ooc   = &it->ooi_cache;
        struct osd_scrub        *scrub = &dev->od_scrub;
+       int                      rc;
+       ENTRY;
 
+       /* Forbid to set iteration position after iteration started. */
        if (it->ooi_user_ready)
-               return 0;
+               RETURN(-EPERM);
+
+       if (hash > OSD_OTABLE_MAX_HASH)
+               hash = OSD_OTABLE_MAX_HASH;
+
+       /* Skip the one that has been processed last time. */
+       if (ooc->ooc_pos_preload > hash)
+               ooc->ooc_pos_preload = hash;
 
        if (ooc->ooc_pos_preload <= LDISKFS_FIRST_INO(osd_sb(dev)))
                ooc->ooc_pos_preload = LDISKFS_FIRST_INO(osd_sb(dev)) + 1;
+
        it->ooi_user_ready = 1;
        if (!scrub->os_full_speed)
                cfs_waitq_broadcast(&scrub->os_thread.t_ctl_waitq);
 
        /* Unplug OSD layer iteration by the first next() call. */
-       return osd_otable_it_next(env, (struct dt_it *)it);
+       rc = osd_otable_it_next(env, (struct dt_it *)it);
+
+       RETURN(rc);
+}
+
+static int osd_otable_it_key_rec(const struct lu_env *env,
+                                const struct dt_it *di, void *key_rec)
+{
+       return 0;
 }
 
 const struct dt_index_operations osd_otable_ops = {
        .dio_it = {
                .init     = osd_otable_it_init,
                .fini     = osd_otable_it_fini,
-               .put      = osd_otable_it_put,
                .get      = osd_otable_it_get,
+               .put      = osd_otable_it_put,
                .next     = osd_otable_it_next,
-               .key      = osd_otable_it_key,
+               .key      = osd_otable_it_key,
                .key_size = osd_otable_it_key_size,
                .rec      = osd_otable_it_rec,
+               .store    = osd_otable_it_store,
                .load     = osd_otable_it_load,
+               .key_rec  = osd_otable_it_key_rec,
        }
 };