Whamcloud - gitweb
LU-200 1.8.6 client supports 64-bit dir name hash under interoperability mode
authornasf <yong.fan@whamcloud.com>
Wed, 18 May 2011 06:35:02 +0000 (14:35 +0800)
committerJohann Lombardi <johann@whamcloud.com>
Wed, 18 May 2011 09:14:00 +0000 (02:14 -0700)
1.8.6 client supports 64-bit dir name hash under interoperability mode.

Signed-off-by: nasf <yong.fan@whamcloud.com>
Change-Id: If8130cfe809424502f40ff32dcb89deb8b82d720
Reviewed-on: http://review.whamcloud.com/410
Tested-by: Hudson
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
lustre/include/lustre/lustre_idl.h
lustre/llite/dir.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/llite_nfs.c
lustre/llite/namei.c
lustre/obdclass/lprocfs_status.c
lustre/ptlrpc/wiretest.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 7d28f95..ece8717 100644 (file)
@@ -359,6 +359,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb, int msgsize);
 #define OBD_CONNECT_MAX_EASIZE    0x800000000ULL /* preserved for large EA */
 #define OBD_CONNECT_FULL20       0x1000000000ULL /* it is 2.0 client */
 #define OBD_CONNECT_LAYOUTLOCK   0x2000000000ULL /* client supports layout lock */
+#define OBD_CONNECT_64BITHASH    0x4000000000ULL /* client supports 64-bits
+                                                  * directory hash */
 /* also update obd_connect_names[] for lprocfs_rd_connect_flags()
  * and lustre/utils/wirecheck.c */
 
index d101b30..0f32502 100644 (file)
@@ -434,9 +434,14 @@ static inline void ll_dir_chain_fini(struct ll_dir_chain *chain)
 {
 }
 
-static inline __u32 hash_x_index(__u32 value)
+static inline unsigned long hash_x_index(__u64 hash)
 {
-        return ((__u32)~0) - value;
+#ifdef __KERNEL__
+# if BITS_PER_LONG == 32
+        hash >>= 32;
+# endif
+#endif
+        return ~0UL - hash;
 }
 
 /**
@@ -569,6 +574,7 @@ static inline int lu_dirent_size(struct lu_dirent *ent)
 }
 
 #define DIR_END_OFF              0xfffffffffffffffeULL
+#define DIR_END_OFF_32BIT        0xfffffffeUL
 
 #ifdef HAVE_RW_TREE_LOCK
 #define TREE_READ_LOCK_IRQ(mapping)     read_lock_irq(&(mapping)->tree_lock)
@@ -589,7 +595,11 @@ static int ll_dir_readpage_20(struct file *file, struct page *page)
         int rc;
         ENTRY;
 
-        hash = hash_x_index(page->index);
+        /*XXX: statahead is disabled by force under interoperability mode.
+         *     So file must not be NULL here. Fix me when enable statahead
+         *     under interoperability mode. */
+        LASSERT(file != NULL);
+        hash = ((struct ll_file_data *)LUSTRE_FPRIVATE(file))->fd_dir.lfd_next;
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n",
                inode->i_ino, inode->i_generation, inode, (unsigned long)hash);
 
@@ -627,7 +637,7 @@ static void ll_check_page(struct inode *dir, struct page *page)
 /*
  * Find, kmap and return page that contains given hash.
  */
-static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
+static struct page *ll_dir_page_locate(struct inode *dir, __u64 *hash,
                                        __u64 *start, __u64 *end)
 {
         struct address_space *mapping = dir->i_mapping;
@@ -636,7 +646,7 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
          * radix_tree_gang_lookup() can be used to find a page with starting
          * hash _smaller_ than one we are looking for.
          */
-        unsigned long offset = hash_x_index(hash);
+        unsigned long offset = hash_x_index(*hash);
         struct page *page;
         int found;
         ENTRY;
@@ -660,10 +670,17 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
                 wait_on_page(page);
                 if (PageUptodate(page)) {
                         dp = kmap(page);
+#if BITS_PER_LONG == 32
+                        *start = le64_to_cpu(dp->ldp_hash_start) >> 32;
+                        *end   = le64_to_cpu(dp->ldp_hash_end) >> 32;
+                        *hash  = *hash >> 32;
+#else
                         *start = le64_to_cpu(dp->ldp_hash_start);
                         *end   = le64_to_cpu(dp->ldp_hash_end);
-                        LASSERT(*start <= hash);
-                        if (hash > *end || (*end != *start && hash == *end)) {
+#endif
+                        LASSERTF(*start <= *hash, "start = "LPX64",end = "
+                                 LPX64",hash = "LPX64"\n", *start, *end, *hash);
+                        if (*hash > *end || (*end != *start && *hash == *end)) {
                                 kunmap(page);
                                 lock_page(page);
                                 ll_truncate_complete_page(page);
@@ -683,7 +700,8 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
         RETURN(page);
 }
 
-static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact,
+static struct page *ll_get_dir_page_20(struct file *filp, struct inode *dir,
+                                       __u64 hash, int exact,
                                        struct ll_dir_chain *chain)
 {
         struct ldlm_res_id res_id;
@@ -697,6 +715,7 @@ static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact,
         int rc;
         __u64 start = 0;
         __u64 end = 0;
+        __u64 lhash = hash;
         ENTRY;
  
         fid_build_reg_res_name(ll_inode_lu_fid(dir), &res_id);
@@ -725,7 +744,7 @@ static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact,
         }
         ldlm_lock_dump_handle(D_OTHER, &lockh);
 
-        page = ll_dir_page_locate(dir, hash, &start, &end);
+        page = ll_dir_page_locate(dir, &lhash, &start, &end);
         if (IS_ERR(page))
                 GOTO(out_unlock, page);
 
@@ -752,7 +771,7 @@ static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact,
                          * be invalidated, and new one fetched.
                          */
                         CDEBUG(D_INFO, "Stale readpage page %p: %#lx != %#lx\n",
-                              page, (unsigned long)hash, (unsigned long)start);
+                              page, (unsigned long)lhash, (unsigned long)start);
                         lock_page(page);
                         ll_truncate_complete_page(page);
                         unlock_page(page);
@@ -763,7 +782,7 @@ static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact,
         }
 
         page = read_cache_page(mapping, hash_x_index(hash),
-                               (filler_t*)ll_dir_readpage_20, NULL);
+                               (filler_t*)ll_dir_readpage_20, filp);
         if (IS_ERR(page))
                 GOTO(out_unlock, page);
 
@@ -778,11 +797,23 @@ static struct page *ll_get_dir_page_20(struct inode *dir, __u64 hash, int exact,
 hash_collision:
         dp = page_address(page);
 
+#if BITS_PER_LONG == 32
+        start = le64_to_cpu(dp->ldp_hash_start) >> 32;
+        end   = le64_to_cpu(dp->ldp_hash_end) >> 32;
+        lhash = hash >> 32;
+#else
         start = le64_to_cpu(dp->ldp_hash_start);
         end   = le64_to_cpu(dp->ldp_hash_end);
+        lhash = hash;
+#endif
         if (end == start) {
-                LASSERT(start == hash);
-                CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end);
+                LASSERT(start == lhash);
+                CWARN("Page-wide hash collision: "LPU64"\n", end);
+#if BITS_PER_LONG == 32
+                CWARN("Real page-wide hash collision at ["LPU64" "LPU64"] with "
+                      "hash "LPU64"\n", le64_to_cpu(dp->ldp_hash_start),
+                      le64_to_cpu(dp->ldp_hash_end), hash);
+#endif
                 /*
                  * Fetch whole overflow chain...
                  *
@@ -803,8 +834,9 @@ fail:
 static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir)
 {
         struct inode         *inode = filp->f_dentry->d_inode;
-        __u64                 pos   = filp->f_pos;
         struct ll_sb_info    *sbi   = ll_i2sbi(inode);
+        struct ll_file_data  *fd    = LUSTRE_FPRIVATE(filp);
+        __u64                 pos   = fd->fd_dir.lfd_pos;
         struct page          *page;
         struct ll_dir_chain   chain;
         int rc;
@@ -830,7 +862,9 @@ static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir)
         shift = 0;
         ll_dir_chain_init(&chain);
 
-        page = ll_get_dir_page_20(inode, pos, 0, &chain);
+        fd->fd_dir.lfd_next = pos;
+        page = ll_get_dir_page_20(filp, inode, pos, 0, &chain);
+
 
         while (rc == 0 && !done) {
                 struct lu_dirpage *dp;
@@ -851,10 +885,9 @@ static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir)
                                 int            namelen;
                                 struct lu_fid  fid;
                                 __u64          ino;
+                                __u64          lhash;
 
-                                hash    = le64_to_cpu(ent->lde_hash);
-                                namelen = le16_to_cpu(ent->lde_namelen);
-
+                                hash = le64_to_cpu(ent->lde_hash);
                                 if (hash < pos)
                                         /*
                                          * Skip until we find target hash
@@ -862,41 +895,46 @@ static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir)
                                          */
                                         continue;
 
+                                namelen = le16_to_cpu(ent->lde_namelen);
                                 if (namelen == 0)
                                         /*
                                          * Skip dummy record.
                                          */
                                         continue;
 
-                                fid  = ent->lde_fid;
                                 name = ent->lde_name;
-                                fid_le_to_cpu(&fid, &fid);
-                                if (need_32bit)
+                                fid_le_to_cpu(&fid, &ent->lde_fid);
+                                if (need_32bit) {
+                                        lhash = hash >> 32;
                                         ino = ll_fid_build_ino32((struct ll_fid *)&fid);
-                                else
+                                } else {
+                                        lhash = hash;
                                         ino = ll_fid_build_ino((struct ll_fid *)&fid);
+                                }
 
                                 type = ll_dirent_type_get(ent);
                                 done = filldir(cookie, name, namelen,
-                                               (loff_t)hash, ino, type);
+                                               lhash, ino, type);
                         }
                         next = le64_to_cpu(dp->ldp_hash_end);
                         ll_put_page(page);
                         if (!done) {
                                 pos = next;
-                                if (pos == DIR_END_OFF)
+                                if (pos == DIR_END_OFF) {
                                         /*
                                          * End of directory reached.
                                          */
                                         done = 1;
-                                else if (1 /* chain is exhausted*/)
+                                } else if (1 /* chain is exhausted*/) {
                                         /*
                                          * Normal case: continue to the next
                                          * page.
                                          */
-                                        page = ll_get_dir_page_20(inode, pos, 1,
+                                        fd->fd_dir.lfd_next = pos;
+                                        page = ll_get_dir_page_20(filp, inode,
+                                                                  pos, 1,
                                                                   &chain);
-                                else {
+                                else {
                                         /*
                                          * go into overflow page.
                                          */
@@ -912,7 +950,15 @@ static int ll_readdir_20(struct file *filp, void *cookie, filldir_t filldir)
                 }
         }
 
-        filp->f_pos = (loff_t)(__s32)pos;
+        fd->fd_dir.lfd_pos = pos;
+        if (need_32bit) {
+                if (pos == DIR_END_OFF)
+                        filp->f_pos = DIR_END_OFF_32BIT;
+                else
+                        filp->f_pos = pos >> 32;
+        } else {
+                filp->f_pos = pos;
+        }
         filp->f_version = inode->i_version;
         touch_atime(filp->f_vfsmnt, filp->f_dentry);
 
@@ -1612,8 +1658,63 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file,
         }
 }
 
+static loff_t ll_dir_seek(struct file *file, loff_t offset, int origin)
+{
+        struct inode *inode = file->f_mapping->host;
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        int need_32bit = ll_need_32bit_api(sbi);
+        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+        loff_t ret = -EINVAL;
+        ENTRY;
+
+        if (!(sbi->ll_mdc_exp->exp_connect_flags & OBD_CONNECT_FID))
+                return default_llseek(file, offset, origin);
+
+        mutex_lock(&inode->i_mutex);
+        switch (origin) {
+                case 2:
+                        offset += inode->i_size;
+                        break;
+                case 1:
+                        if ((need_32bit && file->f_pos == DIR_END_OFF_32BIT) ||
+                            (!need_32bit && file->f_pos == DIR_END_OFF)) {
+                                if (offset == 0)
+                                        GOTO(out, ret = file->f_pos);
+                                else if (offset > 0)
+                                        GOTO(out, ret);
+                        }
+                        offset += file->f_pos;
+                        break;
+        }
+
+        if (need_32bit && offset >= 0 && offset <= DIR_END_OFF_32BIT) {
+                if (offset != file->f_pos) {
+                        if (offset == DIR_END_OFF_32BIT)
+                                fd->fd_dir.lfd_pos = DIR_END_OFF;
+                        else
+                                fd->fd_dir.lfd_pos = offset << 32;
+                        file->f_pos = offset;
+                        file->f_version = 0;
+                }
+                ret = offset;
+        } else if (!need_32bit && (offset >= 0 || offset == DIR_END_OFF)) {
+                if (offset != file->f_pos) {
+                        fd->fd_dir.lfd_pos = offset;
+                        file->f_pos = offset;
+                        file->f_version = 0;
+                }
+                ret = offset;
+        }
+        EXIT;
+
+out:
+        mutex_unlock(&inode->i_mutex);
+        return ret;
+}
+
 struct file_operations ll_dir_operations = {
         .open     = ll_file_open,
+        .llseek   = ll_dir_seek,
         .release  = ll_file_release,
         .read     = generic_read_dir,
         .readdir  = ll_readdir,
index a3458b7..5bf3b3e 100644 (file)
@@ -565,6 +565,11 @@ struct ll_readahead_state {
         unsigned long ras_consecutive_stride_requests;
 };
 
+struct ll_file_dir {
+        __u64 lfd_pos;
+        __u64 lfd_next;
+};
+
 extern cfs_mem_cache_t *ll_file_data_slab;
 extern struct rw_semaphore ll_sb_sem;
 struct lustre_handle;
@@ -573,6 +578,7 @@ struct ll_file_data {
         int fd_omode;
         struct lustre_handle fd_cwlockh;
         unsigned long fd_gid;
+        struct ll_file_dir fd_dir;
         __u32 fd_flags;
 };
 
@@ -1138,7 +1144,7 @@ int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
         return do_statahead_enter(dir, dentryp, lookup);
 }
 
-static void inline ll_dops_init(struct dentry *de, int block)
+static void inline ll_dops_init(struct dentry *de, int block, int init_sa)
 {
         struct ll_dentry_data *lld = ll_d2d(de);
 
@@ -1147,7 +1153,7 @@ static void inline ll_dops_init(struct dentry *de, int block)
                 lld = ll_d2d(de);
         }
 
-        if (lld != NULL)
+        if (lld != NULL && init_sa != 0)
                 lld->lld_sa_generation = 0;
 
         de->d_op = &ll_d_ops;
index 95fc4c0..7a47ce1 100644 (file)
@@ -281,8 +281,9 @@ static int client_common_fill_super(struct super_block *sb,
         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_IBITS      |
                                   OBD_CONNECT_JOIN    | OBD_CONNECT_ATTRFID    |
                                   OBD_CONNECT_NODEVOH | OBD_CONNECT_CANCELSET  |
-                                  OBD_CONNECT_AT      | OBD_CONNECT_FID |
-                                  OBD_CONNECT_VBR     | OBD_CONNECT_LOV_V3;
+                                  OBD_CONNECT_AT      | OBD_CONNECT_FID        |
+                                  OBD_CONNECT_VBR     | OBD_CONNECT_LOV_V3     |
+                                  OBD_CONNECT_64BITHASH;
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
                 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
index fc5e9b5..d75c4e6 100644 (file)
@@ -135,7 +135,7 @@ static struct dentry *ll_iget_for_nfs(struct super_block *sb,
         result = d_obtain_alias(inode);
         if (!result)
                 RETURN(ERR_PTR(-ENOMEM));
-        ll_dops_init(result, 1);
+        ll_dops_init(result, 1, 0);
 
         RETURN(result);
 }
index 7535b00..3e79725 100644 (file)
@@ -462,7 +462,7 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
                 lock_dentry(dentry);
                 __d_drop(dentry);
                 unlock_dentry(dentry);
-                ll_dops_init(dentry, 0);
+                ll_dops_init(dentry, 0, 1);
                 d_rehash_cond(dentry, 0); /* avoid taking dcache_lock inside */
                 spin_unlock(&dcache_lock);
                 spin_unlock(&ll_lookup_lock);
@@ -481,7 +481,7 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
                 last_discon->d_flags |= DCACHE_LUSTRE_INVALID;
                 spin_unlock(&dcache_lock);
                 spin_unlock(&ll_lookup_lock);
-                ll_dops_init(last_discon, 1);
+                ll_dops_init(last_discon, 1, 1);
                 d_rehash(de);
                 d_move(last_discon, de);
                 iput(inode);
@@ -532,7 +532,7 @@ int lookup_it_finish(struct ptlrpc_request *request, int offset,
                    ll_glimpse_size or some equivalent themselves anyway.
                    Also see bug 7198. */
 
-                ll_dops_init(*de, 1);
+                ll_dops_init(*de, 1, 1);
                 *de = ll_find_alias(inode, *de);
                 if (*de != save) {
                         struct ll_dentry_data *lld = ll_d2d(*de);
@@ -552,7 +552,7 @@ int lookup_it_finish(struct ptlrpc_request *request, int offset,
                         unlock_dentry(*de);
                 }
         } else {
-                ll_dops_init(*de, 1);
+                ll_dops_init(*de, 1, 1);
                 /* Check that parent has UPDATE lock. If there is none, we
                    cannot afford to hash this dentry (done by ll_d_add) as it
                    might get picked up later when UPDATE lock will appear */
@@ -730,7 +730,7 @@ static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry,
                         if ((nd->flags & LOOKUP_CREATE ) && !(nd->flags & LOOKUP_OPEN)) {
                                 /* We are sure this is new dentry, so we need to create
                                    our private data and set the dentry ops */ 
-                                ll_dops_init(dentry, 1);
+                                ll_dops_init(dentry, 1, 1);
                                 RETURN(NULL);
                         }
                         it = ll_convert_intent(&nd->intent.open, nd->flags);
index 40410c0..e9e4761 100644 (file)
@@ -752,6 +752,7 @@ static const char *obd_connect_names[] = {
         "large_ea",
         "full20",
         "layout_lock",
+        "64bithash",
         NULL
 };
 
index 329d01e..00ba531 100644 (file)
@@ -564,6 +564,7 @@ void lustre_assert_wire_constants(void)
         CLASSERT(OBD_CONNECT_MAX_EASIZE == 0x800000000ULL);
         CLASSERT(OBD_CONNECT_FULL20 == 0x1000000000ULL);
         CLASSERT(OBD_CONNECT_LAYOUTLOCK == 0x2000000000ULL);
+        CLASSERT(OBD_CONNECT_64BITHASH == 0x4000000000ULL);
 
         /* Checks for struct obdo */
         LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",
index f978997..1dd1276 100644 (file)
@@ -249,6 +249,7 @@ static void check_obd_connect_data(void)
         CHECK_CDEFINE(OBD_CONNECT_MAX_EASIZE);
         CHECK_CDEFINE(OBD_CONNECT_FULL20);
         CHECK_CDEFINE(OBD_CONNECT_LAYOUTLOCK);
+        CHECK_CDEFINE(OBD_CONNECT_64BITHASH);
 }
 
 static void
index 0138e9f..91f02fc 100644 (file)
@@ -562,6 +562,7 @@ void lustre_assert_wire_constants(void)
         CLASSERT(OBD_CONNECT_MAX_EASIZE == 0x800000000ULL);
         CLASSERT(OBD_CONNECT_FULL20 == 0x1000000000ULL);
         CLASSERT(OBD_CONNECT_LAYOUTLOCK == 0x2000000000ULL);
+        CLASSERT(OBD_CONNECT_64BITHASH == 0x4000000000ULL);
 
         /* Checks for struct obdo */
         LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",