From: Fan Yong Date: Sat, 12 Jan 2013 11:47:12 +0000 (+0800) Subject: LU-1866 lfsck: enhance otable-based iteration X-Git-Tag: 2.3.61~50 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=6e6357dbf9a14aaea459f460dbe4f93e52c814d4 LU-1866 lfsck: enhance otable-based iteration 1) Use ::load() method to unplug OSD layer iteration. 2) Use ::put() method to wakeup the LFSCK main engine if it is blocked at low layer for ::next() call. Test-Parameters: envdefinitions=ENABLE_QUOTA=yes testlist=sanity-scrub Signed-off-by: Fan Yong Change-Id: I28e5689ad3a4ee96a3f17a64e8d6dca553a86d77 Reviewed-on: http://review.whamcloud.com/4906 Reviewed-by: Andreas Dilger Tested-by: Hudson Tested-by: Maloo Tested-by: Andreas Dilger Reviewed-by: Alex Zhuravlev --- diff --git a/lustre/mdd/mdd_lfsck.c b/lustre/mdd/mdd_lfsck.c index 0847524..6c69793 100644 --- a/lustre/mdd/mdd_lfsck.c +++ b/lustre/mdd/mdd_lfsck.c @@ -125,17 +125,12 @@ static int mdd_lfsck_main(void *args) CDEBUG(D_LFSCK, "LFSCK: flags = 0x%x, pid = %d\n", lfsck->ml_args, cfs_curproc_pid()); - /* XXX: Prepare before wakeup the sponsor. - * Each lfsck component should call iops->get() API with - * every bookmark, then low layer module can decide the - * start point for current iteration. */ - spin_lock(&lfsck->ml_lock); thread_set_flags(thread, SVC_RUNNING); spin_unlock(&lfsck->ml_lock); cfs_waitq_broadcast(&thread->t_ctl_waitq); - /* Call iops->load() to finish the choosing start point. */ + /* The call iops->load() will unplug low layer iteration. */ rc = iops->load(&env, di, 0); if (rc != 0) GOTO(out, rc); diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index 24a96d9..1dec8c2 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -216,11 +216,9 @@ struct osd_otable_cache { struct osd_otable_it { struct osd_device *ooi_dev; + pid_t ooi_pid; struct osd_otable_cache ooi_cache; - /* For osd_otable_it_key. */ - __u8 ooi_key[16]; - /* The following bits can be updated/checked w/o lock protection. * If more bits will be introduced in the future and need lock to * protect, please add comment. */ @@ -230,7 +228,9 @@ struct osd_otable_it { * filled into cache. */ ooi_user_ready:1, /* The user out of OSD is * ready to iterate. */ - ooi_waiting:1; /* it::next is waiting. */ + ooi_waiting:1, /* it::next is waiting. */ + ooi_stopping:1; /* Someone is stopping + * the iteration. */ }; extern const int osd_dto_credits_noquota[]; diff --git a/lustre/osd-ldiskfs/osd_scrub.c b/lustre/osd-ldiskfs/osd_scrub.c index 1c0c78f..ed6868a 100644 --- a/lustre/osd-ldiskfs/osd_scrub.c +++ b/lustre/osd-ldiskfs/osd_scrub.c @@ -50,6 +50,8 @@ #define HALF_SEC (CFS_HZ >> 1) +#define OSD_OTABLE_MAX_HASH 0x00000000ffffffffULL + #define SCRUB_NEXT_BREAK 1 /* exit current loop and process next group */ #define SCRUB_NEXT_CONTINUE 2 /* skip current object and process next bit */ #define SCRUB_NEXT_EXIT 3 /* exit all the loops */ @@ -837,7 +839,7 @@ static int osd_preload_exec(struct osd_thread_info *info, #define SCRUB_IT_CRASH 2 static int osd_inode_iteration(struct osd_thread_info *info, - struct osd_device *dev, __u32 max, int preload) + struct osd_device *dev, __u32 max, bool preload) { osd_iit_next_policy next; osd_iit_exec_policy exec; @@ -849,7 +851,7 @@ static int osd_inode_iteration(struct osd_thread_info *info, int rc; ENTRY; - if (preload == 0) { + if (!preload) { struct osd_scrub *scrub = &dev->od_scrub; next = osd_scrub_next; @@ -904,10 +906,18 @@ static int osd_inode_iteration(struct osd_thread_info *info, brelse(param.bitmap); RETURN(rc); } + + if (preload && dev->od_otable_it->ooi_stopping) { + brelse(param.bitmap); + RETURN(0); + } } next_group: brelse(param.bitmap); + + if (preload && dev->od_otable_it->ooi_stopping) + RETURN(0); } if (*pos > limit) @@ -925,7 +935,7 @@ static int osd_otable_it_preload(const struct lu_env *env, ENTRY; rc = osd_inode_iteration(osd_oti_get(env), dev, - OSD_OTABLE_IT_CACHE_SIZE, 1); + OSD_OTABLE_IT_CACHE_SIZE, true); if (rc == SCRUB_IT_ALL) it->ooi_all_cached = 1; @@ -984,7 +994,7 @@ static int osd_scrub_main(void *args) CDEBUG(D_LFSCK, "OI scrub: flags = 0x%x, pos = %u\n", scrub->os_start_flags, scrub->os_pos_current); - rc = osd_inode_iteration(osd_oti_get(&env), dev, ~0U, 0); + rc = osd_inode_iteration(osd_oti_get(&env), dev, ~0U, false); if (unlikely(rc == SCRUB_IT_CRASH)) GOTO(out, rc = -EINVAL); GOTO(post, rc); @@ -1709,6 +1719,7 @@ static struct dt_it *osd_otable_it_init(const struct lu_env *env, dev->od_otable_it = it; it->ooi_dev = dev; + it->ooi_pid = cfs_curproc_pid(); it->ooi_cache.ooc_consumer_idx = -1; if (flags & DOIF_OUTUSED) it->ooi_used_outside = 1; @@ -1724,16 +1735,14 @@ static struct dt_it *osd_otable_it_init(const struct lu_env *env, } rc = do_osd_scrub_start(dev, start); - if (rc == -EALREADY) { - it->ooi_cache.ooc_pos_preload = scrub->os_pos_current - 1; - } else if (rc < 0) { + if (rc < 0 && rc != -EALREADY) { dev->od_otable_it = NULL; OBD_FREE_PTR(it); - GOTO(out, it = ERR_PTR(-EALREADY)); - } else { - it->ooi_cache.ooc_pos_preload = scrub->os_pos_current; + GOTO(out, it = ERR_PTR(rc)); } + it->ooi_cache.ooc_pos_preload = scrub->os_pos_current; + GOTO(out, it); out: @@ -1756,50 +1765,36 @@ static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di) OBD_FREE_PTR(it); } -/** - * XXX: Temporary used to notify otable iteration to be paused. - */ -static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di) +static int osd_otable_it_get(const struct lu_env *env, + struct dt_it *di, const struct dt_key *key) { - struct osd_device *dev = ((struct osd_otable_it *)di)->ooi_dev; - - /* od_otable_mutex: prevent curcurrent init/fini */ - mutex_lock(&dev->od_otable_mutex); - dev->od_scrub.os_paused = 1; - mutex_unlock(&dev->od_otable_mutex); + return 0; } /** - * Set the OSD layer iteration start position as the specified key. + * It is hack here: * - * The LFSCK out of OSD layer does not know the detail of the key, so if there - * are several keys, they cannot be compared out of OSD, so call "::get()" for - * each key, and OSD will select the smallest one by itself. + * Sometimes the otable-based iteration driver (LFSCK) may be blocked in OSD + * layer when someone wants to stop/pause the iteration. Under such case, we + * need some mechanism to notify the event and wakeup the blocker. */ -static int osd_otable_it_get(const struct lu_env *env, - struct dt_it *di, const struct dt_key *key) +static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di) { - struct osd_otable_it *it = (struct osd_otable_it *)di; - struct osd_otable_cache *ooc = &it->ooi_cache; - const char *str = (const char *)key; - __u32 ino; - ENTRY; - - /* Forbid to set iteration position after iteration started. */ - if (it->ooi_user_ready) - RETURN(-EPERM); - - if (str[0] == '\0') - RETURN(-EINVAL); - - if (sscanf(str, "%u", &ino) <= 0) - RETURN(-EINVAL); + struct osd_otable_it *it = (struct osd_otable_it *)di; + struct osd_device *dev = it->ooi_dev; - /* Skip the one that has been processed last time. */ - if (ooc->ooc_pos_preload > ++ino) - ooc->ooc_pos_preload = ino; + /* od_otable_mutex: prevent curcurrent init/fini */ + mutex_lock(&dev->od_otable_mutex); + if (it->ooi_pid == cfs_curproc_pid()) { + dev->od_scrub.os_paused = 1; + } else { + struct ptlrpc_thread *thread = &dev->od_scrub.os_thread; - RETURN(0); + it->ooi_stopping = 1; + if (it->ooi_waiting) + cfs_waitq_broadcast(&thread->t_ctl_waitq); + } + mutex_unlock(&dev->od_otable_mutex); } static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di) @@ -1836,13 +1831,17 @@ again: it->ooi_waiting = 1; l_wait_event(thread->t_ctl_waitq, ooc->ooc_pos_preload < scrub->os_pos_current || - !thread_is_running(thread), + !thread_is_running(thread) || + it->ooi_stopping, &lwi); it->ooi_waiting = 0; if (!thread_is_running(thread) && !it->ooi_used_outside) RETURN(1); + if (it->ooi_stopping) + RETURN(0); + rc = osd_otable_it_preload(env, it); if (rc >= 0) goto again; @@ -1853,18 +1852,13 @@ again: static struct dt_key *osd_otable_it_key(const struct lu_env *env, const struct dt_it *di) { - struct osd_otable_it *it = (struct osd_otable_it *)di; - struct osd_otable_cache *ooc = &it->ooi_cache; - - sprintf(it->ooi_key, "%u", - ooc->ooc_cache[ooc->ooc_consumer_idx].oic_lid.oii_ino); - return (struct dt_key *)it->ooi_key; + return NULL; } static int osd_otable_it_key_size(const struct lu_env *env, const struct dt_it *di) { - return sizeof(((struct osd_otable_it *)di)->ooi_key); + return sizeof(__u64); } static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di, @@ -1877,6 +1871,23 @@ static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di, return 0; } +static __u64 osd_otable_it_store(const struct lu_env *env, + const struct dt_it *di) +{ + struct osd_otable_it *it = (struct osd_otable_it *)di; + struct osd_otable_cache *ooc = &it->ooi_cache; + __u64 hash; + + if (it->ooi_user_ready) + hash = ooc->ooc_pos_preload; + else + hash = ooc->ooc_cache[ooc->ooc_consumer_idx].oic_lid.oii_ino; + return hash; +} + +/** + * Set the OSD layer iteration start position as the specified hash. + */ static int osd_otable_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) { @@ -1884,31 +1895,52 @@ static int osd_otable_it_load(const struct lu_env *env, struct osd_device *dev = it->ooi_dev; struct osd_otable_cache *ooc = &it->ooi_cache; struct osd_scrub *scrub = &dev->od_scrub; + int rc; + ENTRY; + /* Forbid to set iteration position after iteration started. */ if (it->ooi_user_ready) - return 0; + RETURN(-EPERM); + + if (hash > OSD_OTABLE_MAX_HASH) + hash = OSD_OTABLE_MAX_HASH; + + /* Skip the one that has been processed last time. */ + if (ooc->ooc_pos_preload > hash) + ooc->ooc_pos_preload = hash; if (ooc->ooc_pos_preload <= LDISKFS_FIRST_INO(osd_sb(dev))) ooc->ooc_pos_preload = LDISKFS_FIRST_INO(osd_sb(dev)) + 1; + it->ooi_user_ready = 1; if (!scrub->os_full_speed) cfs_waitq_broadcast(&scrub->os_thread.t_ctl_waitq); /* Unplug OSD layer iteration by the first next() call. */ - return osd_otable_it_next(env, (struct dt_it *)it); + rc = osd_otable_it_next(env, (struct dt_it *)it); + + RETURN(rc); +} + +static int osd_otable_it_key_rec(const struct lu_env *env, + const struct dt_it *di, void *key_rec) +{ + return 0; } const struct dt_index_operations osd_otable_ops = { .dio_it = { .init = osd_otable_it_init, .fini = osd_otable_it_fini, - .put = osd_otable_it_put, .get = osd_otable_it_get, + .put = osd_otable_it_put, .next = osd_otable_it_next, - .key = osd_otable_it_key, + .key = osd_otable_it_key, .key_size = osd_otable_it_key_size, .rec = osd_otable_it_rec, + .store = osd_otable_it_store, .load = osd_otable_it_load, + .key_rec = osd_otable_it_key_rec, } };