From f739b9c14354676792449a613476fcbcebec805f Mon Sep 17 00:00:00 2001 From: nasf Date: Wed, 18 May 2011 14:35:02 +0800 Subject: [PATCH] LU-200 1.8.6 client supports 64-bit dir name hash under interoperability mode 1.8.6 client supports 64-bit dir name hash under interoperability mode. Signed-off-by: nasf Change-Id: If8130cfe809424502f40ff32dcb89deb8b82d720 Reviewed-on: http://review.whamcloud.com/410 Tested-by: Hudson Reviewed-by: Johann Lombardi --- lustre/include/lustre/lustre_idl.h | 2 + lustre/llite/dir.c | 157 ++++++++++++++++++++++++++++++------- lustre/llite/llite_internal.h | 10 ++- lustre/llite/llite_lib.c | 5 +- lustre/llite/llite_nfs.c | 2 +- lustre/llite/namei.c | 10 +-- lustre/obdclass/lprocfs_status.c | 1 + lustre/ptlrpc/wiretest.c | 1 + lustre/utils/wirecheck.c | 1 + lustre/utils/wiretest.c | 1 + 10 files changed, 152 insertions(+), 38 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 7d28f95..ece8717 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -359,6 +359,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb, int msgsize); #define OBD_CONNECT_MAX_EASIZE 0x800000000ULL /* preserved for large EA */ #define OBD_CONNECT_FULL20 0x1000000000ULL /* it is 2.0 client */ #define OBD_CONNECT_LAYOUTLOCK 0x2000000000ULL /* client supports layout lock */ +#define OBD_CONNECT_64BITHASH 0x4000000000ULL /* client supports 64-bits + * directory hash */ /* also update obd_connect_names[] for lprocfs_rd_connect_flags() * and lustre/utils/wirecheck.c */ diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index d101b30..0f32502 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -434,9 +434,14 @@ static inline void ll_dir_chain_fini(struct ll_dir_chain *chain) { } -static inline __u32 hash_x_index(__u32 value) +static inline unsigned long hash_x_index(__u64 hash) { - return ((__u32)~0) - value; +#ifdef __KERNEL__ +# if BITS_PER_LONG == 32 + hash >>= 32; +# endif +#endif + return ~0UL - hash; } /** @@ -569,6 +574,7 @@ static inline int lu_dirent_size(struct lu_dirent *ent) } #define DIR_END_OFF 0xfffffffffffffffeULL +#define DIR_END_OFF_32BIT 0xfffffffeUL #ifdef HAVE_RW_TREE_LOCK #define TREE_READ_LOCK_IRQ(mapping) read_lock_irq(&(mapping)->tree_lock) @@ -589,7 +595,11 @@ static int ll_dir_readpage_20(struct file *file, struct page *page) int rc; ENTRY; - hash = hash_x_index(page->index); + /*XXX: statahead is disabled by force under interoperability mode. + * So file must not be NULL here. Fix me when enable statahead + * under interoperability mode. */ + LASSERT(file != NULL); + hash = ((struct ll_file_data *)LUSTRE_FPRIVATE(file))->fd_dir.lfd_next; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n", inode->i_ino, inode->i_generation, inode, (unsigned long)hash); @@ -627,7 +637,7 @@ static void ll_check_page(struct inode *dir, struct page *page) /* * Find, kmap and return page that contains given hash. */ -static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash, +static struct page *ll_dir_page_locate(struct inode *dir, __u64 *hash, __u64 *start, __u64 *end) { struct address_space *mapping = dir->i_mapping; @@ -636,7 +646,7 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash, * radix_tree_gang_lookup() can be used to find a page with starting * hash _smaller_ than one we are looking for. */ - unsigned long offset = hash_x_index(hash); + unsigned long offset = hash_x_index(*hash); struct page *page; int found; ENTRY; @@ -660,10 +670,17 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash, wait_on_page(page); if (PageUptodate(page)) { dp = kmap(page); +#if BITS_PER_LONG == 32 + *start = le64_to_cpu(dp->ldp_hash_start) >> 32; + *end = le64_to_cpu(dp->ldp_hash_end) >> 32; + *hash = *hash >> 32; +#else *start = le64_to_cpu(dp->ldp_hash_start); *end = le64_to_cpu(dp->ldp_hash_end); - LASSERT(*start <= hash); - if (hash > *end || (*end != *start && hash == *end)) { +#endif + LASSERTF(*start <= *hash, "start = "LPX64",end = " + LPX64",hash = "LPX64"\n", *start, *end, *hash); + if (*hash > *end || (*end != *start && *hash == *end)) { kunmap(page); lock_page(page); ll_truncate_complete_page(page); @@ -683,7 +700,8 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash, RETURN(page); } -static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact, +static struct page *ll_get_dir_page_20(struct file *filp, struct inode *dir, + __u64 hash, int exact, struct ll_dir_chain *chain) { struct ldlm_res_id res_id; @@ -697,6 +715,7 @@ static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact, int rc; __u64 start = 0; __u64 end = 0; + __u64 lhash = hash; ENTRY; fid_build_reg_res_name(ll_inode_lu_fid(dir), &res_id); @@ -725,7 +744,7 @@ static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact, } ldlm_lock_dump_handle(D_OTHER, &lockh); - page = ll_dir_page_locate(dir, hash, &start, &end); + page = ll_dir_page_locate(dir, &lhash, &start, &end); if (IS_ERR(page)) GOTO(out_unlock, page); @@ -752,7 +771,7 @@ static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact, * be invalidated, and new one fetched. */ CDEBUG(D_INFO, "Stale readpage page %p: %#lx != %#lx\n", - page, (unsigned long)hash, (unsigned long)start); + page, (unsigned long)lhash, (unsigned long)start); lock_page(page); ll_truncate_complete_page(page); unlock_page(page); @@ -763,7 +782,7 @@ static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact, } page = read_cache_page(mapping, hash_x_index(hash), - (filler_t*)ll_dir_readpage_20, NULL); + (filler_t*)ll_dir_readpage_20, filp); if (IS_ERR(page)) GOTO(out_unlock, page); @@ -778,11 +797,23 @@ static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact, hash_collision: dp = page_address(page); +#if BITS_PER_LONG == 32 + start = le64_to_cpu(dp->ldp_hash_start) >> 32; + end = le64_to_cpu(dp->ldp_hash_end) >> 32; + lhash = hash >> 32; +#else start = le64_to_cpu(dp->ldp_hash_start); end = le64_to_cpu(dp->ldp_hash_end); + lhash = hash; +#endif if (end == start) { - LASSERT(start == hash); - CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end); + LASSERT(start == lhash); + CWARN("Page-wide hash collision: "LPU64"\n", end); +#if BITS_PER_LONG == 32 + CWARN("Real page-wide hash collision at ["LPU64" "LPU64"] with " + "hash "LPU64"\n", le64_to_cpu(dp->ldp_hash_start), + le64_to_cpu(dp->ldp_hash_end), hash); +#endif /* * Fetch whole overflow chain... * @@ -803,8 +834,9 @@ fail: static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir) { struct inode *inode = filp->f_dentry->d_inode; - __u64 pos = filp->f_pos; struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(filp); + __u64 pos = fd->fd_dir.lfd_pos; struct page *page; struct ll_dir_chain chain; int rc; @@ -830,7 +862,9 @@ static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir) shift = 0; ll_dir_chain_init(&chain); - page = ll_get_dir_page_20(inode, pos, 0, &chain); + fd->fd_dir.lfd_next = pos; + page = ll_get_dir_page_20(filp, inode, pos, 0, &chain); + while (rc == 0 && !done) { struct lu_dirpage *dp; @@ -851,10 +885,9 @@ static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir) int namelen; struct lu_fid fid; __u64 ino; + __u64 lhash; - hash = le64_to_cpu(ent->lde_hash); - namelen = le16_to_cpu(ent->lde_namelen); - + hash = le64_to_cpu(ent->lde_hash); if (hash < pos) /* * Skip until we find target hash @@ -862,41 +895,46 @@ static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir) */ continue; + namelen = le16_to_cpu(ent->lde_namelen); if (namelen == 0) /* * Skip dummy record. */ continue; - fid = ent->lde_fid; name = ent->lde_name; - fid_le_to_cpu(&fid, &fid); - if (need_32bit) + fid_le_to_cpu(&fid, &ent->lde_fid); + if (need_32bit) { + lhash = hash >> 32; ino = ll_fid_build_ino32((struct ll_fid *)&fid); - else + } else { + lhash = hash; ino = ll_fid_build_ino((struct ll_fid *)&fid); + } type = ll_dirent_type_get(ent); done = filldir(cookie, name, namelen, - (loff_t)hash, ino, type); + lhash, ino, type); } next = le64_to_cpu(dp->ldp_hash_end); ll_put_page(page); if (!done) { pos = next; - if (pos == DIR_END_OFF) + if (pos == DIR_END_OFF) { /* * End of directory reached. */ done = 1; - else if (1 /* chain is exhausted*/) + } else if (1 /* chain is exhausted*/) { /* * Normal case: continue to the next * page. */ - page = ll_get_dir_page_20(inode, pos, 1, + fd->fd_dir.lfd_next = pos; + page = ll_get_dir_page_20(filp, inode, + pos, 1, &chain); - else { + } else { /* * go into overflow page. */ @@ -912,7 +950,15 @@ static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir) } } - filp->f_pos = (loff_t)(__s32)pos; + fd->fd_dir.lfd_pos = pos; + if (need_32bit) { + if (pos == DIR_END_OFF) + filp->f_pos = DIR_END_OFF_32BIT; + else + filp->f_pos = pos >> 32; + } else { + filp->f_pos = pos; + } filp->f_version = inode->i_version; touch_atime(filp->f_vfsmnt, filp->f_dentry); @@ -1612,8 +1658,63 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file, } } +static loff_t ll_dir_seek(struct file *file, loff_t offset, int origin) +{ + struct inode *inode = file->f_mapping->host; + struct ll_sb_info *sbi = ll_i2sbi(inode); + int need_32bit = ll_need_32bit_api(sbi); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + loff_t ret = -EINVAL; + ENTRY; + + if (!(sbi->ll_mdc_exp->exp_connect_flags & OBD_CONNECT_FID)) + return default_llseek(file, offset, origin); + + mutex_lock(&inode->i_mutex); + switch (origin) { + case 2: + offset += inode->i_size; + break; + case 1: + if ((need_32bit && file->f_pos == DIR_END_OFF_32BIT) || + (!need_32bit && file->f_pos == DIR_END_OFF)) { + if (offset == 0) + GOTO(out, ret = file->f_pos); + else if (offset > 0) + GOTO(out, ret); + } + offset += file->f_pos; + break; + } + + if (need_32bit && offset >= 0 && offset <= DIR_END_OFF_32BIT) { + if (offset != file->f_pos) { + if (offset == DIR_END_OFF_32BIT) + fd->fd_dir.lfd_pos = DIR_END_OFF; + else + fd->fd_dir.lfd_pos = offset << 32; + file->f_pos = offset; + file->f_version = 0; + } + ret = offset; + } else if (!need_32bit && (offset >= 0 || offset == DIR_END_OFF)) { + if (offset != file->f_pos) { + fd->fd_dir.lfd_pos = offset; + file->f_pos = offset; + file->f_version = 0; + } + ret = offset; + } + EXIT; + +out: + mutex_unlock(&inode->i_mutex); + return ret; +} + struct file_operations ll_dir_operations = { .open = ll_file_open, + .llseek = ll_dir_seek, .release = ll_file_release, .read = generic_read_dir, .readdir = ll_readdir, diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index a3458b7..5bf3b3e 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -565,6 +565,11 @@ struct ll_readahead_state { unsigned long ras_consecutive_stride_requests; }; +struct ll_file_dir { + __u64 lfd_pos; + __u64 lfd_next; +}; + extern cfs_mem_cache_t *ll_file_data_slab; extern struct rw_semaphore ll_sb_sem; struct lustre_handle; @@ -573,6 +578,7 @@ struct ll_file_data { int fd_omode; struct lustre_handle fd_cwlockh; unsigned long fd_gid; + struct ll_file_dir fd_dir; __u32 fd_flags; }; @@ -1138,7 +1144,7 @@ int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) return do_statahead_enter(dir, dentryp, lookup); } -static void inline ll_dops_init(struct dentry *de, int block) +static void inline ll_dops_init(struct dentry *de, int block, int init_sa) { struct ll_dentry_data *lld = ll_d2d(de); @@ -1147,7 +1153,7 @@ static void inline ll_dops_init(struct dentry *de, int block) lld = ll_d2d(de); } - if (lld != NULL) + if (lld != NULL && init_sa != 0) lld->lld_sa_generation = 0; de->d_op = &ll_d_ops; diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 95fc4c0..7a47ce1 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -281,8 +281,9 @@ static int client_common_fill_super(struct super_block *sb, data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_IBITS | OBD_CONNECT_JOIN | OBD_CONNECT_ATTRFID | OBD_CONNECT_NODEVOH | OBD_CONNECT_CANCELSET | - OBD_CONNECT_AT | OBD_CONNECT_FID | - OBD_CONNECT_VBR | OBD_CONNECT_LOV_V3; + OBD_CONNECT_AT | OBD_CONNECT_FID | + OBD_CONNECT_VBR | OBD_CONNECT_LOV_V3 | + OBD_CONNECT_64BITHASH; #ifdef HAVE_LRU_RESIZE_SUPPORT if (sbi->ll_flags & LL_SBI_LRU_RESIZE) data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE; diff --git a/lustre/llite/llite_nfs.c b/lustre/llite/llite_nfs.c index fc5e9b5..d75c4e6 100644 --- a/lustre/llite/llite_nfs.c +++ b/lustre/llite/llite_nfs.c @@ -135,7 +135,7 @@ static struct dentry *ll_iget_for_nfs(struct super_block *sb, result = d_obtain_alias(inode); if (!result) RETURN(ERR_PTR(-ENOMEM)); - ll_dops_init(result, 1); + ll_dops_init(result, 1, 0); RETURN(result); } diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 7535b00..3e79725 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -462,7 +462,7 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) lock_dentry(dentry); __d_drop(dentry); unlock_dentry(dentry); - ll_dops_init(dentry, 0); + ll_dops_init(dentry, 0, 1); d_rehash_cond(dentry, 0); /* avoid taking dcache_lock inside */ spin_unlock(&dcache_lock); spin_unlock(&ll_lookup_lock); @@ -481,7 +481,7 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) last_discon->d_flags |= DCACHE_LUSTRE_INVALID; spin_unlock(&dcache_lock); spin_unlock(&ll_lookup_lock); - ll_dops_init(last_discon, 1); + ll_dops_init(last_discon, 1, 1); d_rehash(de); d_move(last_discon, de); iput(inode); @@ -532,7 +532,7 @@ int lookup_it_finish(struct ptlrpc_request *request, int offset, ll_glimpse_size or some equivalent themselves anyway. Also see bug 7198. */ - ll_dops_init(*de, 1); + ll_dops_init(*de, 1, 1); *de = ll_find_alias(inode, *de); if (*de != save) { struct ll_dentry_data *lld = ll_d2d(*de); @@ -552,7 +552,7 @@ int lookup_it_finish(struct ptlrpc_request *request, int offset, unlock_dentry(*de); } } else { - ll_dops_init(*de, 1); + ll_dops_init(*de, 1, 1); /* Check that parent has UPDATE lock. If there is none, we cannot afford to hash this dentry (done by ll_d_add) as it might get picked up later when UPDATE lock will appear */ @@ -730,7 +730,7 @@ static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry, if ((nd->flags & LOOKUP_CREATE ) && !(nd->flags & LOOKUP_OPEN)) { /* We are sure this is new dentry, so we need to create our private data and set the dentry ops */ - ll_dops_init(dentry, 1); + ll_dops_init(dentry, 1, 1); RETURN(NULL); } it = ll_convert_intent(&nd->intent.open, nd->flags); diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 40410c0..e9e4761 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -752,6 +752,7 @@ static const char *obd_connect_names[] = { "large_ea", "full20", "layout_lock", + "64bithash", NULL }; diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 329d01e..00ba531 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -564,6 +564,7 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_MAX_EASIZE == 0x800000000ULL); CLASSERT(OBD_CONNECT_FULL20 == 0x1000000000ULL); CLASSERT(OBD_CONNECT_LAYOUTLOCK == 0x2000000000ULL); + CLASSERT(OBD_CONNECT_64BITHASH == 0x4000000000ULL); /* Checks for struct obdo */ LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n", diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index f978997..1dd1276 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -249,6 +249,7 @@ static void check_obd_connect_data(void) CHECK_CDEFINE(OBD_CONNECT_MAX_EASIZE); CHECK_CDEFINE(OBD_CONNECT_FULL20); CHECK_CDEFINE(OBD_CONNECT_LAYOUTLOCK); + CHECK_CDEFINE(OBD_CONNECT_64BITHASH); } static void diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 0138e9f..91f02fc 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -562,6 +562,7 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_MAX_EASIZE == 0x800000000ULL); CLASSERT(OBD_CONNECT_FULL20 == 0x1000000000ULL); CLASSERT(OBD_CONNECT_LAYOUTLOCK == 0x2000000000ULL); + CLASSERT(OBD_CONNECT_64BITHASH == 0x4000000000ULL); /* Checks for struct obdo */ LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n", -- 1.8.3.1