From fafa0e31a54281167eebd615da847866eaf03a93 Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Thu, 14 Jun 2012 15:40:39 +0800 Subject: [PATCH] LU-957 scrub: osd-ldiskfs itable based iteration Implement inode table based object iteration in osd-ldiskfs. It is implemented as DT iteration APIs, which can be used by up layer LFSCK to scan the whole device sequentially. Signed-off-by: Fan Yong Change-Id: I3d80bb4a174d47429764e5cca35e4f07be52d50b Reviewed-on: http://review.whamcloud.com/2553 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: Andreas Dilger --- lustre/include/dt_object.h | 27 ++ lustre/obdclass/dt_object.c | 3 + lustre/osd-ldiskfs/osd_handler.c | 3 + lustre/osd-ldiskfs/osd_scrub.c | 706 +++++++++++++++++++++++++++++---------- 4 files changed, 571 insertions(+), 168 deletions(-) diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 7e41e6c..ff143fe 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -213,6 +213,7 @@ enum dt_index_flags { * names to fids). */ extern const struct dt_index_features dt_directory_features; +extern const struct dt_index_features dt_otable_features; /** * This is a general purpose dt allocation hint. @@ -600,6 +601,32 @@ struct dt_index_operations { } dio_it; }; +enum dt_otable_it_valid { + DOIV_ERROR_HANDLE = 0x0001, +}; + +enum dt_otable_it_flags { + /* Exit when fail. */ + DOIF_FAILOUT = 0x0001, + + /* Reset iteration position to the device beginning. */ + DOIF_RESET = 0x0002, + + /* There is up layer component uses the iteration. */ + DOIF_OUTUSED = 0x0004, +}; + +/* otable based iteration needs to use the common DT interation APIs. + * To initialize the iteration, it needs call dio_it::init() firstly. + * Here is how the otable based iteration should prepare arguments to + * call dt_it_ops::init(). + * + * For otable based iteration, the 32-bits 'attr' for dt_it_ops::init() + * is composed of two parts: + * low 16-bits is for valid bits, high 16-bits is for flags bits. */ +#define DT_OTABLE_IT_FLAGS_SHIFT 16 +#define DT_OTABLE_IT_FLAGS_MASK 0xffff0000 + struct dt_device { struct lu_device dd_lu_dev; const struct dt_device_operations *dd_ops; diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index 737ae03..435f068 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -561,3 +561,6 @@ EXPORT_SYMBOL(dt_version_get); const struct dt_index_features dt_directory_features; EXPORT_SYMBOL(dt_directory_features); + +const struct dt_index_features dt_otable_features; +EXPORT_SYMBOL(dt_otable_features); diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index f71b522..6db8e25 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -2536,6 +2536,9 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, else result = -ENOTDIR; ea_dir = 1; + } else if (unlikely(feat == &dt_otable_features)) { + dt->do_index_ops = &osd_otable_ops; + return 0; } else if (!osd_has_index(obj)) { struct osd_directory *dir; diff --git a/lustre/osd-ldiskfs/osd_scrub.c b/lustre/osd-ldiskfs/osd_scrub.c index ccdca14..0e6382f 100644 --- a/lustre/osd-ldiskfs/osd_scrub.c +++ b/lustre/osd-ldiskfs/osd_scrub.c @@ -281,8 +281,8 @@ static int osd_scrub_prep(struct osd_device *dev) RETURN(rc); } -static int osd_scrub_error_handler(struct osd_device *dev, - struct osd_inode_id *lid, int rc) +static int +osd_scrub_error(struct osd_device *dev, struct osd_inode_id *lid, int rc) { struct osd_scrub *scrub = &dev->od_scrub; struct scrub_file *sf = &scrub->os_file; @@ -491,22 +491,92 @@ static void osd_scrub_post(struct osd_scrub *scrub, int result) EXIT; } -#define SCRUB_NEXT_BREAK 1 -#define SCRUB_NEXT_CONTINUE 2 +#define SCRUB_NEXT_BREAK 1 /* exit current loop and process next group */ +#define SCRUB_NEXT_CONTINUE 2 /* skip current object and process next bit */ +#define SCRUB_NEXT_EXIT 3 /* exit all the loops */ +#define SCRUB_NEXT_WAIT 4 /* wait for free cache slot */ + +struct osd_iit_param { + struct super_block *sb; + struct buffer_head *bitmap; + ldiskfs_group_t bg; + __u32 gbase; + __u32 offset; +}; + +typedef int (*osd_iit_next_policy)(struct osd_thread_info *info, + struct osd_device *dev, + struct osd_iit_param *param, + struct osd_idmap_cache **oic, + int noslot); + +typedef int (*osd_iit_exec_policy)(struct osd_thread_info *info, + struct osd_device *dev, + struct osd_iit_param *param, + struct osd_idmap_cache *oic, + int *noslot, int rc); -static int -osd_scrub_next(struct osd_thread_info *info, struct osd_device *dev, - struct osd_scrub *scrub, struct super_block *sb, - ldiskfs_group_t bg, struct buffer_head *bitmap, __u32 gbase, - __u32 *offset, struct osd_idmap_cache **oic) +static inline int osd_scrub_has_window(struct osd_scrub *scrub, + struct osd_otable_cache *ooc) +{ + return scrub->os_pos_current < ooc->ooc_pos_preload + SCRUB_WINDOW_SIZE; +} + +static int osd_iit_next(struct osd_iit_param *param, __u32 *pos) { - struct osd_inconsistent_item *oii; - struct lu_fid *fid; - struct osd_inode_id *lid; - struct inode *inode; - int rc = 0; + param->offset = ldiskfs_find_next_bit(param->bitmap->b_data, + LDISKFS_INODES_PER_GROUP(param->sb), param->offset); + if (param->offset >= LDISKFS_INODES_PER_GROUP(param->sb)) { + *pos = 1 + (param->bg+1) * LDISKFS_INODES_PER_GROUP(param->sb); + return SCRUB_NEXT_BREAK; + } else { + *pos = param->gbase + param->offset; + return 0; + } +} + +static int osd_iit_iget(struct osd_thread_info *info, struct osd_device *dev, + struct lu_fid *fid, struct osd_inode_id *lid, __u32 pos, + struct super_block *sb, struct inode **pinode) +{ + struct inode *inode; + int rc; + + osd_id_gen(lid, pos, OSD_OII_NOGEN); + inode = osd_iget_fid(info, dev, lid, fid); + if (IS_ERR(inode)) { + rc = PTR_ERR(inode); + /* The inode may be removed after bitmap searching, or the + * file is new created without inode initialized yet. */ + if (rc == -ENOENT || rc == -ESTALE) + return SCRUB_NEXT_CONTINUE; + + CERROR("%.16s: fail to read inode, ino# = %u, rc = %d\n", + LDISKFS_SB(sb)->s_es->s_volume_name, pos, rc); + return rc; + } + + *pinode = inode; + return 0; +} + +static int osd_scrub_next(struct osd_thread_info *info, struct osd_device *dev, + struct osd_iit_param *param, + struct osd_idmap_cache **oic, int noslot) +{ + struct osd_scrub *scrub = &dev->od_scrub; + struct ptlrpc_thread *thread = &scrub->os_thread; + struct lu_fid *fid; + struct osd_inode_id *lid; + struct inode *inode; + int rc; + + if (unlikely(!thread_is_running(thread))) + return SCRUB_NEXT_EXIT; if (!cfs_list_empty(&scrub->os_inconsistent_items)) { + struct osd_inconsistent_item *oii; + oii = cfs_list_entry(scrub->os_inconsistent_items.next, struct osd_inconsistent_item, oii_list); *oic = &oii->oii_cache; @@ -514,65 +584,227 @@ osd_scrub_next(struct osd_thread_info *info, struct osd_device *dev, return 0; } + if (noslot != 0) + return SCRUB_NEXT_WAIT; + + rc = osd_iit_next(param, &scrub->os_pos_current); + if (rc != 0) + return rc; + *oic = &scrub->os_oic; fid = &(*oic)->oic_fid; lid = &(*oic)->oic_lid; - *offset = ldiskfs_find_next_bit(bitmap->b_data, - LDISKFS_INODES_PER_GROUP(sb), *offset); - if (*offset >= LDISKFS_INODES_PER_GROUP(sb)) { - brelse(bitmap); - scrub->os_pos_current = 1 + (bg + 1) * - LDISKFS_INODES_PER_GROUP(sb); - return SCRUB_NEXT_BREAK; - } + rc = osd_iit_iget(info, dev, fid, lid, + scrub->os_pos_current, param->sb, &inode); + if (rc != 0) + return rc; - scrub->os_pos_current = gbase + *offset; - osd_id_gen(lid, scrub->os_pos_current, OSD_OII_NOGEN); - inode = osd_iget_fid(info, dev, lid, fid); - if (IS_ERR(inode)) { - rc = PTR_ERR(inode); - /* The inode may be removed after bitmap searching, or the - * file is new created without inode initialized yet. */ - if (rc == -ENOENT || rc == -ESTALE) - rc = SCRUB_NEXT_CONTINUE; - else - CERROR("%.16s: fail to read inode, group = %u, " - "ino# = %u, rc = %d\n", - LDISKFS_SB(sb)->s_es->s_volume_name, - bg, scrub->os_pos_current, rc); - } else { - if (fid_is_igif(fid) || fid_is_idif(fid) || - fid_seq(fid) == FID_SEQ_LLOG || - fid_seq(fid) == FID_SEQ_LOCAL_FILE || - fid_seq_is_rsvd(fid_seq(fid)) || - inode->i_state & I_LUSTRE_NOSCRUB) - rc = SCRUB_NEXT_CONTINUE; + if (!fid_is_norm(fid) || inode->i_state & I_LUSTRE_NOSCRUB) + rc = SCRUB_NEXT_CONTINUE; + iput(inode); + return rc; +} + +static int osd_preload_next(struct osd_thread_info *info, + struct osd_device *dev, struct osd_iit_param *param, + struct osd_idmap_cache **oic, int noslot) +{ + struct osd_otable_cache *ooc = &dev->od_otable_it->ooi_cache; + struct osd_scrub *scrub; + struct ptlrpc_thread *thread; + struct inode *inode; + int rc; + + rc = osd_iit_next(param, &ooc->ooc_pos_preload); + if (rc != 0) + return rc; + + scrub = &dev->od_scrub; + thread = &scrub->os_thread; + if (thread_is_running(thread) && + ooc->ooc_pos_preload >= scrub->os_pos_current) + return SCRUB_NEXT_EXIT; + + rc = osd_iit_iget(info, dev, + &ooc->ooc_cache[ooc->ooc_producer_idx].oic_fid, + &ooc->ooc_cache[ooc->ooc_producer_idx].oic_lid, + ooc->ooc_pos_preload, param->sb, &inode); + /* If succeed, it needs to move forward; otherwise up layer LFSCK may + * ignore the failure, so it still need to skip the inode next time. */ + ooc->ooc_pos_preload = param->gbase + ++(param->offset); + if (rc == 0) iput(inode); - } return rc; } -static inline int osd_scrub_has_window(struct osd_scrub *scrub, - struct osd_otable_cache *ooc) +static int osd_scrub_exec(struct osd_thread_info *info, struct osd_device *dev, + struct osd_iit_param *param, + struct osd_idmap_cache *oic, int *noslot, int rc) { - return scrub->os_pos_current < ooc->ooc_pos_preload + SCRUB_WINDOW_SIZE; + struct l_wait_info lwi = { 0 }; + struct osd_scrub *scrub = &dev->od_scrub; + struct ptlrpc_thread *thread = &scrub->os_thread; + struct osd_otable_it *it = dev->od_otable_it; + struct osd_otable_cache *ooc = it ? &it->ooi_cache : NULL; + + switch (rc) { + case SCRUB_NEXT_CONTINUE: + goto next; + case SCRUB_NEXT_WAIT: + goto wait; + } + + LASSERTF(rc <= 0, "unexpected rc = %d\n", rc); + + if (rc != 0) + rc = osd_scrub_error(dev, &oic->oic_lid, rc); + else + rc = osd_scrub_check_update(info, dev, oic); + if (rc != 0) + return rc; + + rc = osd_scrub_checkpoint(scrub); + if (rc != 0) { + CERROR("%.16s: fail to checkpoint, pos = %u, rc = %d\n", + LDISKFS_SB(param->sb)->s_es->s_volume_name, + scrub->os_pos_current, rc); + /* Continue, as long as the scrub itself can go ahead. */ + } + + if (scrub->os_in_prior) { + scrub->os_in_prior = 0; + return 0; + } + +next: + scrub->os_pos_current = param->gbase + ++(param->offset); + if (it != NULL && it->ooi_waiting && + ooc->ooc_pos_preload < scrub->os_pos_current) { + it->ooi_waiting = 0; + cfs_waitq_broadcast(&thread->t_ctl_waitq); + } + + if (scrub->os_full_speed || rc == SCRUB_NEXT_CONTINUE) + return 0; + +wait: + if (osd_scrub_has_window(scrub, ooc)) { + *noslot = 0; + return 0; + } + + scrub->os_waiting = 1; + l_wait_event(thread->t_ctl_waitq, + osd_scrub_has_window(scrub, ooc) || + !cfs_list_empty(&scrub->os_inconsistent_items) || + !thread_is_running(thread), + &lwi); + scrub->os_waiting = 0; + + if (osd_scrub_has_window(scrub, ooc)) + *noslot = 0; + else + *noslot = 1; + return 0; +} + +static int osd_preload_exec(struct osd_thread_info *info, + struct osd_device *dev, struct osd_iit_param *param, + struct osd_idmap_cache *oic, int *noslot, int rc) +{ + struct osd_otable_cache *ooc = &dev->od_otable_it->ooi_cache; + + if (rc == 0) { + ooc->ooc_cached_items++; + ooc->ooc_producer_idx = (ooc->ooc_producer_idx + 1) & + ~OSD_OTABLE_IT_CACHE_MASK; + } + return rc > 0 ? 0 : rc; +} + +#define SCRUB_IT_ALL 1 + +static int osd_inode_iteration(struct osd_thread_info *info, + struct osd_device *dev, __u32 max, int preload) +{ + osd_iit_next_policy next; + osd_iit_exec_policy exec; + __u32 *pos; + __u32 *count; + struct osd_iit_param param; + __u32 limit; + int noslot = 0; + int rc; + ENTRY; + + if (preload == 0) { + struct osd_scrub *scrub = &dev->od_scrub; + + next = osd_scrub_next; + exec = osd_scrub_exec; + pos = &scrub->os_pos_current; + count = &scrub->os_new_checked; + } else { + struct osd_otable_cache *ooc = &dev->od_otable_it->ooi_cache; + + next = osd_preload_next; + exec = osd_preload_exec; + pos = &ooc->ooc_pos_preload; + count = &ooc->ooc_cached_items; + } + param.sb = osd_sb(dev); + limit = le32_to_cpu(LDISKFS_SB(param.sb)->s_es->s_inodes_count); + + while (*pos <= limit && *count < max) { + struct osd_idmap_cache *oic = NULL; + + param.bg = (*pos - 1) / LDISKFS_INODES_PER_GROUP(param.sb); + param.offset = (*pos - 1) % LDISKFS_INODES_PER_GROUP(param.sb); + param.gbase = 1 + param.bg * LDISKFS_INODES_PER_GROUP(param.sb); + param.bitmap = ldiskfs_read_inode_bitmap(param.sb, param.bg); + if (param.bitmap == NULL) { + CERROR("%.16s: fail to read bitmap for %u, " + "scrub will stop, urgent mode\n", + LDISKFS_SB(param.sb)->s_es->s_volume_name, + (__u32)param.bg); + RETURN(-EIO); + } + + while (param.offset < LDISKFS_INODES_PER_GROUP(param.sb) && + *count < max) { + rc = next(info, dev, ¶m, &oic, noslot); + switch (rc) { + case SCRUB_NEXT_BREAK: + goto next_group; + case SCRUB_NEXT_EXIT: + brelse(param.bitmap); + RETURN(0); + } + + rc = exec(info, dev, ¶m, oic, &noslot, rc); + if (rc != 0) { + brelse(param.bitmap); + RETURN(rc); + } + } + +next_group: + brelse(param.bitmap); + } + + if (*pos > limit) + RETURN(SCRUB_IT_ALL); + RETURN(0); } static int osd_scrub_main(void *args) { - struct lu_env env; - struct osd_thread_info *info; - struct osd_device *dev = (struct osd_device *)args; - struct osd_scrub *scrub = &dev->od_scrub; - struct ptlrpc_thread *thread = &scrub->os_thread; - cfs_list_t *list = &scrub->os_inconsistent_items; - struct l_wait_info lwi = { 0 }; - struct super_block *sb = osd_sb(dev); - struct osd_otable_it *it = NULL; - struct osd_otable_cache *ooc = NULL; - int noslot = 0; - int rc; - __u32 max; + struct lu_env env; + struct osd_device *dev = (struct osd_device *)args; + struct osd_scrub *scrub = &dev->od_scrub; + struct ptlrpc_thread *thread = &scrub->os_thread; + struct super_block *sb = osd_sb(dev); + int rc; ENTRY; cfs_daemonize("OI_scrub"); @@ -583,7 +815,6 @@ static int osd_scrub_main(void *args) GOTO(noenv, rc); } - info = osd_oti_get(&env); rc = osd_scrub_prep(dev); if (rc != 0) { CERROR("%.16s: OI scrub, fail to scrub prep, rc = %d\n", @@ -592,10 +823,10 @@ static int osd_scrub_main(void *args) } if (!scrub->os_full_speed) { - LASSERT(dev->od_otable_it != NULL); + struct l_wait_info lwi = { 0 }; + struct osd_otable_it *it = dev->od_otable_it; + struct osd_otable_cache *ooc = &it->ooi_cache; - it = dev->od_otable_it; - ooc = &it->ooi_cache; l_wait_event(thread->t_ctl_waitq, it->ooi_user_ready || !thread_is_running(thread), &lwi); @@ -609,108 +840,8 @@ static int osd_scrub_main(void *args) CDEBUG(D_LFSCK, "OI scrub: flags = 0x%x, pos = %u\n", scrub->os_start_flags, scrub->os_pos_current); - max = le32_to_cpu(LDISKFS_SB(sb)->s_es->s_inodes_count); - while (scrub->os_pos_current <= max) { - struct buffer_head *bitmap = NULL; - struct osd_idmap_cache *oic = NULL; - ldiskfs_group_t bg = (scrub->os_pos_current - 1) / - LDISKFS_INODES_PER_GROUP(sb); - __u32 offset = (scrub->os_pos_current - 1) % - LDISKFS_INODES_PER_GROUP(sb); - __u32 gbase = 1 + bg * LDISKFS_INODES_PER_GROUP(sb); - - bitmap = ldiskfs_read_inode_bitmap(sb, bg); - if (bitmap == NULL) { - CERROR("%.16s: fail to read bitmap at pos = %u, " - "bg = %u, scrub will stop\n", - LDISKFS_SB(sb)->s_es->s_volume_name, - scrub->os_pos_current, (__u32)bg); - GOTO(post, rc = -EIO); - } - - while (offset < LDISKFS_INODES_PER_GROUP(sb)) { - if (unlikely(!thread_is_running(thread))) { - brelse(bitmap); - GOTO(post, rc = 0); - } - - if (cfs_list_empty(list) && noslot != 0) - goto wait; - - rc = osd_scrub_next(info, dev, scrub, sb, bg, - bitmap, gbase, &offset, &oic); - if (rc == SCRUB_NEXT_BREAK) - break; - else if (rc == SCRUB_NEXT_CONTINUE) - goto next; - - if (rc != 0) - rc = osd_scrub_error_handler(dev, &oic->oic_lid, - rc); - else - rc = osd_scrub_check_update(info, dev, oic); - if (rc != 0) { - brelse(bitmap); - GOTO(post, rc); - } - - rc = osd_scrub_checkpoint(scrub); - if (rc != 0) { - CERROR("%.16s: fail to checkpoint, pos = %u, " - "rc = %d\n", - LDISKFS_SB(sb)->s_es->s_volume_name, - scrub->os_pos_current, rc); - brelse(bitmap); - GOTO(post, rc); - } - - if (scrub->os_in_prior) { - scrub->os_in_prior = 0; - continue; - } - -next: - scrub->os_pos_current = gbase + ++offset; - if (dev->od_otable_it != NULL) { - if (unlikely(it == NULL)) { - it = dev->od_otable_it; - ooc = &it->ooi_cache; - } - - if (it->ooi_waiting && - (ooc->ooc_pos_preload < - scrub->os_pos_current)) { - it->ooi_waiting = 0; - cfs_waitq_broadcast( - &thread->t_ctl_waitq); - } - } - - if (scrub->os_full_speed || rc == SCRUB_NEXT_CONTINUE) - continue; - -wait: - if (osd_scrub_has_window(scrub, ooc)) { - noslot = 0; - continue; - } - - scrub->os_waiting = 1; - l_wait_event(thread->t_ctl_waitq, - osd_scrub_has_window(scrub, ooc) || - !cfs_list_empty(list) || - !thread_is_running(thread), - &lwi); - scrub->os_waiting = 0; - - if (osd_scrub_has_window(scrub, ooc)) - noslot = 0; - else - noslot = 1; - } - } - - GOTO(post, rc = (scrub->os_pos_current > max ? 1 : rc)); + rc = osd_inode_iteration(osd_oti_get(&env), dev, ~0U, 0); + GOTO(post, rc); post: osd_scrub_post(scrub, rc); @@ -718,10 +849,10 @@ post: rc, scrub->os_pos_current); out: - while (!cfs_list_empty(list)) { + while (!cfs_list_empty(&scrub->os_inconsistent_items)) { struct osd_inconsistent_item *oii; - oii = cfs_list_entry(list->next, + oii = cfs_list_entry(scrub->os_inconsistent_items.next, struct osd_inconsistent_item, oii_list); cfs_list_del_init(&oii->oii_list); OBD_FREE_PTR(oii); @@ -938,3 +1069,242 @@ void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev) if (dev->od_oi_table != NULL) osd_oi_fini(osd_oti_get(env), dev); } + +static struct dt_it *osd_otable_it_init(const struct lu_env *env, + struct dt_object *dt, __u32 attr, + struct lustre_capa *capa) +{ + enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT; + enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK; + struct osd_device *dev = osd_dev(dt->do_lu.lo_dev); + struct osd_scrub *scrub = &dev->od_scrub; + struct osd_otable_it *it; + __u32 start = 0; + int rc; + ENTRY; + + /* od_otable_mutex: prevent curcurrent init/fini */ + cfs_mutex_lock(&dev->od_otable_mutex); + if (dev->od_otable_it != NULL) + GOTO(out, it = ERR_PTR(-EALREADY)); + + OBD_ALLOC_PTR(it); + if (it == NULL) + GOTO(out, it = ERR_PTR(-ENOMEM)); + + dev->od_otable_it = it; + it->ooi_dev = dev; + it->ooi_cache.ooc_consumer_idx = -1; + if (flags & DOIF_OUTUSED) + it->ooi_used_outside = 1; + + if (flags & DOIF_RESET) + start |= SS_RESET; + + if (valid & DOIV_ERROR_HANDLE) { + if (flags & DOIF_FAILOUT) + start |= SS_SET_FAILOUT; + else + start |= SS_CLEAR_FAILOUT; + } + + rc = do_osd_scrub_start(dev, start); + if (rc == -EALREADY) { + it->ooi_cache.ooc_pos_preload = scrub->os_pos_current - 1; + } else if (rc < 0) { + dev->od_otable_it = NULL; + OBD_FREE_PTR(it); + GOTO(out, it = ERR_PTR(-EALREADY)); + } else { + it->ooi_cache.ooc_pos_preload = scrub->os_pos_current; + } + + GOTO(out, it); + +out: + cfs_mutex_unlock(&dev->od_otable_mutex); + return (struct dt_it *)it; +} + +static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di) +{ + struct osd_otable_it *it = (struct osd_otable_it *)di; + struct osd_device *dev = it->ooi_dev; + + /* od_otable_mutex: prevent curcurrent init/fini */ + cfs_mutex_lock(&dev->od_otable_mutex); + do_osd_scrub_stop(&dev->od_scrub); + LASSERT(dev->od_otable_it == it); + + dev->od_otable_it = NULL; + cfs_mutex_unlock(&dev->od_otable_mutex); + OBD_FREE_PTR(it); +} + +/** + * Set the OSD layer iteration start position as the specified key. + * + * The LFSCK out of OSD layer does not know the detail of the key, so if there + * are several keys, they cannot be compared out of OSD, so call "::get()" for + * each key, and OSD will select the smallest one by itself. + */ +static int osd_otable_it_get(const struct lu_env *env, + struct dt_it *di, const struct dt_key *key) +{ + struct osd_otable_it *it = (struct osd_otable_it *)di; + struct osd_otable_cache *ooc = &it->ooi_cache; + const char *str = (const char *)key; + __u32 ino; + ENTRY; + + /* Forbid to set iteration position after iteration started. */ + if (it->ooi_user_ready) + RETURN(-EPERM); + + if (str[0] == '\0') + RETURN(-EINVAL); + + if (sscanf(str, "%u", &ino) <= 0) + RETURN(-EINVAL); + + /* Skip the one that has been processed last time. */ + if (ooc->ooc_pos_preload > ++ino) + ooc->ooc_pos_preload = ino; + + RETURN(0); +} + +static int osd_otable_it_preload(const struct lu_env *env, + struct osd_otable_it *it) +{ + struct osd_device *dev = it->ooi_dev; + struct osd_scrub *scrub = &dev->od_scrub; + struct osd_otable_cache *ooc = &it->ooi_cache; + int rc; + ENTRY; + + rc = osd_inode_iteration(osd_oti_get(env), dev, + OSD_OTABLE_IT_CACHE_SIZE, 1); + if (rc == SCRUB_IT_ALL) + it->ooi_all_cached = 1; + + CDEBUG(D_LFSCK, "OSD pre-loaded: max = %u, preload = %u, rc = %d\n", + le32_to_cpu(LDISKFS_SB(osd_sb(dev))->s_es->s_inodes_count), + ooc->ooc_pos_preload, rc); + + if (scrub->os_waiting && osd_scrub_has_window(scrub, ooc)) { + scrub->os_waiting = 0; + cfs_waitq_broadcast(&scrub->os_thread.t_ctl_waitq); + } + + RETURN(rc < 0 ? rc : ooc->ooc_cached_items); +} + +static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di) +{ + struct osd_otable_it *it = (struct osd_otable_it *)di; + struct osd_device *dev = it->ooi_dev; + struct osd_scrub *scrub = &dev->od_scrub; + struct osd_otable_cache *ooc = &it->ooi_cache; + struct ptlrpc_thread *thread = &scrub->os_thread; + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + LASSERT(it->ooi_user_ready); + +again: + if (!thread_is_running(thread) && !it->ooi_used_outside) + RETURN(1); + + if (ooc->ooc_cached_items > 0) { + ooc->ooc_cached_items--; + ooc->ooc_consumer_idx = (ooc->ooc_consumer_idx + 1) & + ~OSD_OTABLE_IT_CACHE_MASK; + RETURN(0); + } + + if (it->ooi_all_cached) { + l_wait_event(thread->t_ctl_waitq, + !thread_is_running(thread), + &lwi); + RETURN(1); + } + + it->ooi_waiting = 1; + l_wait_event(thread->t_ctl_waitq, + ooc->ooc_pos_preload < scrub->os_pos_current || + !thread_is_running(thread), + &lwi); + it->ooi_waiting = 0; + + if (!thread_is_running(thread) && !it->ooi_used_outside) + RETURN(1); + + rc = osd_otable_it_preload(env, it); + if (rc >= 0) + goto again; + + RETURN(rc); +} + +static struct dt_key *osd_otable_it_key(const struct lu_env *env, + const struct dt_it *di) +{ + struct osd_otable_it *it = (struct osd_otable_it *)di; + struct osd_otable_cache *ooc = &it->ooi_cache; + + sprintf(it->ooi_key, "%u", + ooc->ooc_cache[ooc->ooc_consumer_idx].oic_lid.oii_ino); + return (struct dt_key *)it->ooi_key; +} + +static int osd_otable_it_key_size(const struct lu_env *env, + const struct dt_it *di) +{ + return sizeof(((struct osd_otable_it *)di)->ooi_key); +} + +static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) +{ + struct osd_otable_it *it = (struct osd_otable_it *)di; + struct osd_otable_cache *ooc = &it->ooi_cache; + + *(struct lu_fid *)rec = ooc->ooc_cache[ooc->ooc_consumer_idx].oic_fid; + return 0; +} + +static int osd_otable_it_load(const struct lu_env *env, + const struct dt_it *di, __u64 hash) +{ + struct osd_otable_it *it = (struct osd_otable_it *)di; + struct osd_device *dev = it->ooi_dev; + struct osd_otable_cache *ooc = &it->ooi_cache; + struct osd_scrub *scrub = &dev->od_scrub; + + if (it->ooi_user_ready) + return 0; + + if (ooc->ooc_pos_preload < LDISKFS_FIRST_INO(osd_sb(dev))) + ooc->ooc_pos_preload = LDISKFS_FIRST_INO(osd_sb(dev)); + it->ooi_user_ready = 1; + if (!scrub->os_full_speed) + cfs_waitq_broadcast(&scrub->os_thread.t_ctl_waitq); + + /* Unplug OSD layer iteration by the first next() call. */ + return osd_otable_it_next(env, (struct dt_it *)it); +} + +const struct dt_index_operations osd_otable_ops = { + .dio_it = { + .init = osd_otable_it_init, + .fini = osd_otable_it_fini, + .get = osd_otable_it_get, + .next = osd_otable_it_next, + .key = osd_otable_it_key, + .key_size = osd_otable_it_key_size, + .rec = osd_otable_it_rec, + .load = osd_otable_it_load, + } +}; -- 1.8.3.1