From b8c8ce3d1082929360ad168db85ec687b3e07026 Mon Sep 17 00:00:00 2001 From: ericm Date: Thu, 9 Apr 2009 20:50:31 +0000 Subject: [PATCH] branch: HEAD OSD doing batch read of dir entries in compatiblility mode iterator. b=17560 o=pravin i=oleg,rahul,pravin,rread --- lustre/mdd/mdd_orphans.c | 2 +- lustre/osd/osd_handler.c | 98 ++++++++++++++++++++++++++++++++--------------- lustre/osd/osd_internal.h | 28 ++++++++++---- 3 files changed, 90 insertions(+), 38 deletions(-) diff --git a/lustre/mdd/mdd_orphans.c b/lustre/mdd/mdd_orphans.c index 6030725..f4ea74b 100644 --- a/lustre/mdd/mdd_orphans.c +++ b/lustre/mdd/mdd_orphans.c @@ -374,7 +374,7 @@ static int orph_index_iterate(const struct lu_env *env, iops = &dor->do_index_ops->dio_it; it = iops->init(env, dor, BYPASS_CAPA); if (it != NULL) { - result = iops->get(env, it, (const void *)""); + result = iops->load(env, it, 0); if (result > 0) { /* main cycle */ do { diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 268c7b9..57c930c 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -3072,9 +3072,12 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env, obj_dentry->d_sb = osd_sb(osd_obj2dev(obj)); obj_dentry->d_name.hash = 0; - it->oie_namelen = 0; + it->oie_rd_dirent = 0; + it->oie_it_dirent = 0; it->oie_curr_pos = 0; it->oie_next_pos = 0; + it->oie_dirent = NULL; + it->oie_buf = info->oti_it_ea_buf; it->oie_obj = obj; it->oie_file.f_dentry = obj_dentry; it->oie_file.f_mapping = obj->oo_inode->i_mapping; @@ -3118,9 +3121,11 @@ static int osd_it_ea_get(const struct lu_env *env, ENTRY; LASSERT(((const char *)key)[0] == '\0'); - it->oie_namelen = 0; it->oie_curr_pos = 0; it->oie_next_pos = 0; + it->oie_rd_dirent = 0; + it->oie_it_dirent = 0; + it->oie_dirent = NULL; RETURN(+1); } @@ -3147,22 +3152,27 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, loff_t offset, ino_t ino, unsigned int d_type) { - struct osd_it_ea *it = (struct osd_it_ea *)buf; - struct dirent64 *dirent = &it->oie_dirent64; - + struct osd_it_ea *it = (struct osd_it_ea *)buf; + struct osd_it_ea_dirent *ent = it->oie_dirent; ENTRY; - if (it->oie_namelen) - RETURN(-ENOENT); - if (namelen == 0 || namelen > LDISKFS_NAME_LEN) + /* this should never happen */ + if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) { + CERROR("ldiskfs return invalid namelen %d\n", namelen); RETURN(-EIO); + } - strncpy(dirent->d_name, name, LDISKFS_NAME_LEN); - dirent->d_name[namelen] = 0; - dirent->d_ino = ino; - it->oie_namelen = namelen; - it->oie_curr_pos = offset; + if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen > + OSD_IT_EA_BUFSIZE) + RETURN(1); + ent->oied_ino = ino; + ent->oied_off = offset; + ent->oied_namelen = namelen; + memcpy(ent->oied_name, name, namelen); + + it->oie_rd_dirent++; + it->oie_dirent = (void *) ent + size_round(sizeof(*ent) + namelen); RETURN(0); } @@ -3175,7 +3185,7 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, * \retval 0, on success * \retval -ve, on error */ -int osd_ldiskfs_it_fill(const struct dt_it *di) +static int osd_ldiskfs_it_fill(const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; struct osd_object *obj = it->oie_obj; @@ -3183,7 +3193,8 @@ int osd_ldiskfs_it_fill(const struct dt_it *di) int result = 0; ENTRY; - it->oie_namelen = 0; + it->oie_dirent = it->oie_buf; + it->oie_rd_dirent = 0; it->oie_file.f_pos = it->oie_curr_pos; down_read(&obj->oo_ext_idx_sem); @@ -3193,8 +3204,12 @@ int osd_ldiskfs_it_fill(const struct dt_it *di) up_read(&obj->oo_ext_idx_sem); it->oie_next_pos = it->oie_file.f_pos; - if (it->oie_namelen == 0) + if (it->oie_rd_dirent == 0) { result = -EIO; + } else { + it->oie_dirent = it->oie_buf; + it->oie_it_dirent = 1; + } RETURN(result); } @@ -3216,12 +3231,21 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di) int rc; ENTRY; - it->oie_curr_pos = it->oie_next_pos; - if (it->oie_curr_pos == LDISKFS_HTREE_EOF) - rc = +1; - else - rc = osd_ldiskfs_it_fill(di); + if (it->oie_it_dirent < it->oie_rd_dirent) { + it->oie_dirent = (void *) it->oie_dirent + + size_round(sizeof(struct osd_it_ea_dirent) + + it->oie_dirent->oied_namelen); + it->oie_it_dirent++; + RETURN(0); + } else { + it->oie_curr_pos = it->oie_next_pos; + + if (it->oie_curr_pos == LDISKFS_HTREE_EOF) + rc = +1; + else + rc = osd_ldiskfs_it_fill(di); + } RETURN(rc); } @@ -3238,7 +3262,7 @@ static struct dt_key *osd_it_ea_key(const struct lu_env *env, { struct osd_it_ea *it = (struct osd_it_ea *)di; ENTRY; - RETURN((struct dt_key *)it->oie_dirent64.d_name); + RETURN((struct dt_key *)it->oie_dirent->oied_name); } /** @@ -3252,7 +3276,7 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; ENTRY; - RETURN(it->oie_namelen); + RETURN(it->oie_dirent->oied_namelen); } /** @@ -3279,7 +3303,7 @@ static struct dt_rec *osd_it_ea_rec(const struct lu_env *env, ENTRY; dev = osd_dev(ldev); - id->oii_ino = it->oie_dirent64.d_ino; + id->oii_ino = it->oie_dirent->oied_ino; id->oii_gen = OSD_OII_NOGEN; inode = osd_iget(info, dev, id); if (!IS_ERR(inode)) { @@ -3310,7 +3334,7 @@ static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di) { struct osd_it_ea *it = (struct osd_it_ea *)di; ENTRY; - RETURN(it->oie_curr_pos); + RETURN(it->oie_dirent->oied_off); } /** @@ -3406,15 +3430,29 @@ static void *osd_key_init(const struct lu_context *ctx, struct osd_thread_info *info; OBD_ALLOC_PTR(info); - if (info != NULL) - info->oti_env = container_of(ctx, struct lu_env, le_ctx); - else + if (info != NULL) { + OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + if (info->oti_it_ea_buf != NULL) { + info->oti_env = container_of(ctx, struct lu_env, + le_ctx); + } else { + OBD_FREE_PTR(info); + info = ERR_PTR(-ENOMEM); + } + } else { info = ERR_PTR(-ENOMEM); + } return info; } -/* context key destructor: osd_key_fini */ -LU_KEY_FINI(osd, struct osd_thread_info); +static void osd_key_fini(const struct lu_context *ctx, + struct lu_context_key *key, void* data) +{ + struct osd_thread_info *info = data; + + OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE); + OBD_FREE_PTR(info); +} static void osd_key_exit(const struct lu_context *ctx, struct lu_context_key *key, void *data) diff --git a/lustre/osd/osd_internal.h b/lustre/osd/osd_internal.h index 8007fbd..df1600b 100644 --- a/lustre/osd/osd_internal.h +++ b/lustre/osd/osd_internal.h @@ -128,6 +128,15 @@ struct osd_device { __u32 od_iop_mode; }; +struct osd_it_ea_dirent { + __u64 oied_ino; + __u64 oied_off; + unsigned short oied_namelen; + char oied_name[0]; +} __attribute__((packed)); + +#define OSD_IT_EA_BUFSIZE CFS_PAGE_SIZE + /** * This is iterator's in-memory data structure in interoperability * mode (i.e. iterator over ldiskfs style directory) @@ -136,15 +145,18 @@ struct osd_it_ea { struct osd_object *oie_obj; /** used in ldiskfs iterator, to stored file pointer */ struct file oie_file; - /** used in ldiskfs iterator, to store directory entry */ - struct dirent64 oie_dirent64; /** current file position */ - __u64 oie_curr_pos; + __u64 oie_curr_pos; /** next file position */ - __u64 oie_next_pos; - /** namelen of the file */ - __u8 oie_namelen; - + __u64 oie_next_pos; + /** how many entries have been read-cached from storage */ + int oie_rd_dirent; + /** current entry is being iterated by caller */ + int oie_it_dirent; + /** current processing entry */ + struct osd_it_ea_dirent *oie_dirent; + /** buffer to hold entries, size == OSD_IT_EA_BUFSIZE */ + void *oie_buf; }; /** @@ -201,6 +213,8 @@ struct osd_thread_info { struct osd_it_ea oti_it_ea; }; + /** pre-allocated buffer used by oti_it_ea, size OSD_IT_EA_BUFSIZE */ + void *oti_it_ea_buf; /** IAM iterator for index operation. */ struct iam_iterator oti_idx_it; -- 1.8.3.1