Whamcloud - gitweb
new hash-order readdir. Initial implementation
author nikita <nikita>
Thu, 27 Jul 2006 22:31:50 +0000 (22:31 +0000)
committer nikita <nikita>
Thu, 27 Jul 2006 22:31:50 +0000 (22:31 +0000)
13 files changed:
lustre/cmm/cmm_object.c
lustre/include/dt_object.h
lustre/include/lu_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/md_object.h
lustre/kernel_patches/patches/ext3-iam-separate.patch
lustre/kernel_patches/patches/ext3-iam-uapi.patch
lustre/llite/dir.c
lustre/llite/llite_internal.h
lustre/lmv/lmv_obd.c
lustre/mdd/mdd_handler.c
lustre/mdt/mdt_handler.c
lustre/osd/osd_handler.c

index ef5fd16..99586d1 100644 (file)
@@ -308,7 +308,7 @@ static int cml_close(const struct lu_context *ctx, struct md_object *mo)
 }
 
 static int cml_readpage(const struct lu_context *ctxt, struct md_object *mo,
-                        struct lu_rdpg *rdpg)
+                        const struct lu_rdpg *rdpg)
 {
         int rc;
         ENTRY;
@@ -585,6 +585,12 @@ static int cmr_close(const struct lu_context *ctx, struct md_object *mo)
         RETURN(-EFAULT);
 }
 
+static int cmr_readpage(const struct lu_context *ctxt, struct md_object *mo,
+                        const struct lu_rdpg *rdpg)
+{
+        RETURN(-EREMOTE);
+}
+
 static struct md_object_operations cmr_mo_ops = {
         .moo_attr_get      = cmr_attr_get,
         .moo_attr_set      = cmr_attr_set,
@@ -596,7 +602,8 @@ static struct md_object_operations cmr_mo_ops = {
         .moo_ref_add       = cmr_ref_add,
         .moo_ref_del       = cmr_ref_del,
         .moo_open          = cmr_open,
-        .moo_close         = cmr_close
+        .moo_close         = cmr_close,
+        .moo_readpage      = cmr_readpage
 };
 
 /* remote part of md_dir operations */
index 2ae1320..0c992cc 100644 (file)
@@ -232,7 +232,7 @@ struct dt_object_operations {
                             struct dt_object *dt, struct thandle *th);
 
         int (*do_readpage)(const struct lu_context *ctxt,
-                           struct dt_object *dt, struct lu_rdpg *rdpg);
+                           struct dt_object *dt, const struct lu_rdpg *rdpg);
 };
 
 /*
index ddb2947..611d6ce 100644 (file)
@@ -773,13 +773,10 @@ static inline int lu_object_assert_not_exists(const struct lu_context *ctx,
 
 struct lu_rdpg {
         /* input params, should be filled out by mdt */
-        loff_t                  rp_offset;      /* offset, page alligned*/
+        __u32                   rp_hash;        /* hash */
         int                     rp_count;       /* count in bytes       */
         int                     rp_npages;      /* number of pages      */
         struct page           **rp_pages;       /* pointers to pages    */
-
-        /* output params, filled by osd */
-        __u64                   rp_size;        /* file size in total   */
 };
 
 enum lu_xattr_flags {
index c525140..e34d701 100644 (file)
@@ -262,24 +262,24 @@ static inline void fid_le_to_cpu(struct lu_fid *fid)
         fid->f_ver = le32_to_cpu(fid_ver(fid));
 }
 
-#define LU_NAME_LEN 255
-
 /*
- * lustre directory entry. This is used for sending directory content to the
- * client, where it later gets converted to what Linux expects.
+ * Layout of readdir pages, as transmitted on wire.
  */
-struct lu_dir_entry {
-        struct lu_fid            de_fid;               /* file fid */
-        __u16                    de_rec_len;           /* rec len */
-        __u16                    de_name_len;          /* name len */
-       char                     de_name[LU_NAME_LEN]; /* file name */
+struct lu_dirent {
+        struct lu_fid lde_fid;
+        __u32         lde_hash;
+        __u16         lde_reclen;
+        __u16         lde_namelen;
+        char          lde_name[0];
 };
 
-#define LU_DIR_PAD          4
-#define LU_DIR_ROUND        (LU_DIR_PAD - 1)
-
-#define LU_DIR_REC_LEN(len) ((sizeof(struct lu_fid) + 2*sizeof(__u16) + \
-                              len + 1 + LU_DIR_ROUND) & ~LU_DIR_ROUND)
+struct lu_dirpage {
+        __u32            ldp_hash_start;
+        __u32            ldp_hash_end;
+        __u16            ldp_pad0;
+        __u32            ldp_pad1;
+        struct lu_dirent ldp_entries[0];
+};
 
 #define MEA_MAGIC_LAST_CHAR      0xb2221ca1
 #define MEA_MAGIC_ALL_CHARS      0xb222a11c
index e78ac87..15c7cfe 100644 (file)
@@ -92,7 +92,7 @@ struct md_object_operations {
         int (*moo_close)(const struct lu_context *, struct md_object *);
 
         int (*moo_readpage)(const struct lu_context *, struct md_object *,
-                            struct lu_rdpg *);
+                            const struct lu_rdpg *);
 };
 
 /*
@@ -249,7 +249,7 @@ static inline int mo_close(const struct lu_context *cx, struct md_object *m)
 }
 
 static inline int mo_readpage(const struct lu_context *cx, struct md_object *m,
-                              struct lu_rdpg *rdpg)
+                              const struct lu_rdpg *rdpg)
 {
         LASSERT(m->mo_ops->moo_readpage);
         return m->mo_ops->moo_readpage(cx, m, rdpg);
index f2a76dc..b06605a 100644 (file)
@@ -1,7 +1,7 @@
 Index: iam/fs/ext3/Makefile
 ===================================================================
 --- iam.orig/fs/ext3/Makefile  2006-05-31 20:24:32.000000000 +0400
-+++ iam/fs/ext3/Makefile       2006-07-25 16:59:51.000000000 +0400
++++ iam/fs/ext3/Makefile       2006-07-28 01:53:16.000000000 +0400
 @@ -6,7 +6,7 @@ obj-$(CONFIG_EXT3_FS) += ext3.o
  
  ext3-y        := balloc.o bitmap.o dir.o file.o fsync.o ialloc.o inode.o iopen.o \
@@ -5461,7 +5461,7 @@ Index: iam/fs/ext3/namei.c
 Index: iam/include/linux/lustre_iam.h
 ===================================================================
 --- iam.orig/include/linux/lustre_iam.h        2006-05-31 20:24:32.000000000 +0400
-+++ iam/include/linux/lustre_iam.h     2006-07-25 16:59:51.000000000 +0400
++++ iam/include/linux/lustre_iam.h     2006-07-28 01:53:16.000000000 +0400
 @@ -1,9 +1,68 @@
 +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 + * vim:expandtab:shiftwidth=8:tabstop=8:
index a3db133..146d4af 100644 (file)
@@ -1,7 +1,7 @@
 Index: iam/fs/ext3/Makefile
 ===================================================================
---- iam.orig/fs/ext3/Makefile  2006-07-25 16:59:51.000000000 +0400
-+++ iam/fs/ext3/Makefile       2006-07-25 16:59:51.000000000 +0400
+--- iam.orig/fs/ext3/Makefile  2006-07-28 01:53:16.000000000 +0400
++++ iam/fs/ext3/Makefile       2006-07-28 01:53:16.000000000 +0400
 @@ -6,7 +6,7 @@ obj-$(CONFIG_EXT3_FS) += ext3.o
  
  ext3-y        := balloc.o bitmap.o dir.o file.o fsync.o ialloc.o inode.o iopen.o \
@@ -13,8 +13,8 @@ Index: iam/fs/ext3/Makefile
  ext3-$(CONFIG_EXT3_FS_POSIX_ACL) += acl.o
 Index: iam/fs/ext3/dir.c
 ===================================================================
---- iam.orig/fs/ext3/dir.c     2006-07-25 16:59:51.000000000 +0400
-+++ iam/fs/ext3/dir.c  2006-07-25 16:59:51.000000000 +0400
+--- iam.orig/fs/ext3/dir.c     2006-07-28 01:53:16.000000000 +0400
++++ iam/fs/ext3/dir.c  2006-07-28 01:53:16.000000000 +0400
 @@ -28,6 +28,7 @@
  #include <linux/smp_lock.h>
  #include <linux/slab.h>
@@ -112,8 +112,8 @@ Index: iam/fs/ext3/dir.c
                    (filp->f_version != inode->i_version)) {
 Index: iam/fs/ext3/file.c
 ===================================================================
---- iam.orig/fs/ext3/file.c    2006-07-25 16:59:51.000000000 +0400
-+++ iam/fs/ext3/file.c 2006-07-25 16:59:51.000000000 +0400
+--- iam.orig/fs/ext3/file.c    2006-07-28 01:53:16.000000000 +0400
++++ iam/fs/ext3/file.c 2006-07-28 01:53:16.000000000 +0400
 @@ -23,6 +23,7 @@
  #include <linux/jbd.h>
  #include <linux/ext3_fs.h>
@@ -149,7 +149,7 @@ Index: iam/fs/ext3/file.c
 Index: iam/fs/ext3/iam-uapi.c
 ===================================================================
 --- iam.orig/fs/ext3/iam-uapi.c        2004-04-06 17:27:52.000000000 +0400
-+++ iam/fs/ext3/iam-uapi.c     2006-07-25 16:59:51.000000000 +0400
++++ iam/fs/ext3/iam-uapi.c     2006-07-28 01:53:16.000000000 +0400
 @@ -0,0 +1,361 @@
 +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 + * vim:expandtab:shiftwidth=8:tabstop=8:
@@ -514,8 +514,8 @@ Index: iam/fs/ext3/iam-uapi.c
 +}
 Index: iam/fs/ext3/ioctl.c
 ===================================================================
---- iam.orig/fs/ext3/ioctl.c   2006-07-25 16:59:51.000000000 +0400
-+++ iam/fs/ext3/ioctl.c        2006-07-25 16:59:51.000000000 +0400
+--- iam.orig/fs/ext3/ioctl.c   2006-07-28 01:53:16.000000000 +0400
++++ iam/fs/ext3/ioctl.c        2006-07-28 01:53:16.000000000 +0400
 @@ -250,6 +250,6 @@ flags_err:
  
  
@@ -526,8 +526,8 @@ Index: iam/fs/ext3/ioctl.c
  }
 Index: iam/include/linux/lustre_iam.h
 ===================================================================
---- iam.orig/include/linux/lustre_iam.h        2006-07-25 16:59:51.000000000 +0400
-+++ iam/include/linux/lustre_iam.h     2006-07-25 16:59:51.000000000 +0400
+--- iam.orig/include/linux/lustre_iam.h        2006-07-28 01:53:16.000000000 +0400
++++ iam/include/linux/lustre_iam.h     2006-07-28 01:53:16.000000000 +0400
 @@ -30,9 +30,6 @@
  #ifndef __LINUX_LUSTRE_IAM_H__
  #define __LINUX_LUSTRE_IAM_H__
index 43e828b..eb604c4 100644 (file)
 #define PageChecked(page)        test_bit(PG_checked, &(page)->flags)
 #define SetPageChecked(page)     set_bit(PG_checked, &(page)->flags)
 
+static __u32 hash_x_index(__u32 value)
+{
+        return ((__u32)~0) - value;
+}
+
 /* returns the page unlocked, but with a reference */
 static int ll_dir_readpage(struct file *file, struct page *page)
 {
         struct inode *inode = page->mapping->host;
         struct ptlrpc_request *request;
         struct mdt_body *body;
-        __u64 offset;
-        int rc = 0;
+        __u32 hash;
+        int rc;
         ENTRY;
 
-        offset = (__u64)page->index << PAGE_SHIFT;
-        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off "LPU64"\n",
-               inode->i_ino, inode->i_generation, inode, offset);
+        hash = hash_x_index(page->index);
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n",
+               inode->i_ino, inode->i_generation, inode, (unsigned long)hash);
 
         rc = md_readpage(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode),
-                         offset, page, &request);
+                         hash, page, &request);
         if (!rc) {
                 body = lustre_msg_buf(request->rq_repmsg, 0, sizeof (*body));
                 LASSERT (body != NULL);         /* checked by md_readpage() */
@@ -110,13 +115,87 @@ static inline void ll_put_page(struct page *page)
         page_cache_release(page);
 }
 
-static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
+/*
+ * Find, kmap and return page that contains given hash.
+ */
+static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
+                                       __u32 *start, __u32 *end)
+{
+        struct address_space *mapping = dir->i_mapping;
+        /*
+         * Complement of hash is used as an index so that
+         * radix_tree_gang_lookup() can be used to find a page with starting
+         * hash _smaller_ than one we are looking for.
+         */
+        unsigned long offset = hash_x_index(hash);
+        struct page *page;
+        int found;
+
+       spin_lock_irq(&mapping->tree_lock);
+       found = radix_tree_gang_lookup(&mapping->page_tree,
+                                       (void **)&page, offset, 1);
+       if (found > 0) {
+                struct lu_dirpage *dp;
+
+               page_cache_get(page);
+                spin_unlock_irq(&mapping->tree_lock);
+                /*
+                 * In contrast to find_lock_page() we are sure that directory
+                 * page cannot be truncated (while DLM lock is held) and,
+                 * hence, can avoid restart.
+                 *
+                 * In fact, page cannot be locked here at all, because
+                 * ll_dir_readpage() does synchronous io.
+                 */
+                wait_on_page(page);
+                if (PageUptodate(page)) {
+                        dp = kmap(page);
+                        *start = le32_to_cpu(dp->ldp_hash_start);
+                        *end   = le32_to_cpu(dp->ldp_hash_end);
+                        LASSERT(*start <= hash);
+                        if (hash > *end || (*end != *start && hash == *end)) {
+                                kunmap(page);
+                                page_cache_release(page);
+                                page = NULL;
+                        }
+                } else {
+                        page_cache_release(page);
+                        page = ERR_PTR(-EIO);
+                }
+
+       } else {
+                spin_unlock_irq(&mapping->tree_lock);
+                page = NULL;
+        }
+        return page;
+}
+
+/*
+ * Chain of hash overflow pages.
+ */
+struct ll_dir_chain {
+        /* XXX something. Later */
+};
+
+static void ll_dir_chain_init(struct ll_dir_chain *chain)
+{
+}
+
+static void ll_dir_chain_fini(struct ll_dir_chain *chain)
+{
+}
+
+static struct page *ll_get_dir_page(struct inode *dir, __u32 hash, int exact,
+                                    struct ll_dir_chain *chain)
 {
         ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} };
         struct address_space *mapping = dir->i_mapping;
         struct lustre_handle lockh;
         struct page *page;
+        struct lu_dirpage *dp;
         int rc;
+        __u32 start;
+        __u32 end;
 
         rc = md_lock_match(ll_i2sbi(dir)->ll_md_exp, LDLM_FL_BLOCK_GRANTED,
                            ll_inode2fid(dir), LDLM_IBITS, &policy, LCK_CR, &lockh);
@@ -142,7 +221,27 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
         }
         ldlm_lock_dump_handle(D_OTHER, &lockh);
 
-        page = read_cache_page(mapping, n,
+        page = ll_dir_page_locate(dir, hash, &start, &end);
+        if (IS_ERR(page))
+                GOTO(out_unlock, page);
+
+        if (page != NULL) {
+                if (exact && hash != start) {
+                        /*
+                         * readdir asked for a page starting _exactly_ from
+                         * given hash, but cache contains stale page, with
+                         * entries with smaller hash values. Stale page should
+                         * be invalidated, and new one fetched.
+                         */
+                        CWARN("Stale readpage page: %#lx != %#lx\n",
+                              (unsigned long)hash, (unsigned long)start);
+                        truncate_complete_page(mapping, page);
+                        page_cache_release(page);
+                } else
+                        GOTO(hash_collision, page);
+        }
+
+        page = read_cache_page(mapping, hash_x_index(hash),
                                (filler_t*)mapping->a_ops->readpage, NULL);
         if (IS_ERR(page))
                 GOTO(out_unlock, page);
@@ -155,7 +254,21 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
                 ll_check_page(dir, page);
         if (PageError(page))
                 goto fail;
-
+        dp = page_address(page);
+
+        start = le32_to_cpu(dp->ldp_hash_start);
+        end   = le32_to_cpu(dp->ldp_hash_end);
+hash_collision:
+        if (end == start) {
+                LASSERT(start == hash);
+                CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end);
+                /*
+                 * Fetch whole overflow chain...
+                 *
+                 * XXX not yet.
+                 */
+                goto fail;
+        }
 out_unlock:
         ldlm_lock_decref(&lockh, LCK_CR);
         return page;
@@ -166,78 +279,166 @@ fail:
         goto out_unlock;
 }
 
-static inline struct lu_dir_entry *ll_next_entry(struct lu_dir_entry *p)
+static loff_t ll_llseek(struct file *filp, loff_t off, int whence)
 {
-        return (struct lu_dir_entry *)((char *)p + le16_to_cpu(p->de_rec_len));
+        if (off != 0 || whence != 1 /* SEEK_CUR */) {
+                /*
+                 * Except when telldir() is going on, reset readdir to the
+                 * beginning of hash collision chain.
+                 */
+                struct ll_file_data *fd = LUSTRE_FPRIVATE(filp);
+
+                fd->fd_dir.lfd_dup = 0;
+        }
+        return default_llseek(filp, off, whence);
 }
 
-int ll_readdir(struct file *filp, void *dirent, filldir_t filldir)
+static struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp)
 {
-        struct inode *inode = filp->f_dentry->d_inode;
-        loff_t pos = filp->f_pos;
-        unsigned offset = pos & ~PAGE_CACHE_MASK;
-        unsigned long n = pos >> PAGE_CACHE_SHIFT;
-        unsigned long npages = dir_pages(inode);
-        int rc = 0;
-        ENTRY;
-
-        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %llu/%llu\n",
-               inode->i_ino, inode->i_generation, inode, pos, inode->i_size);
+        return dp->ldp_entries;
+}
 
-        if (pos > inode->i_size - LU_DIR_REC_LEN(1))
-                RETURN(0);
+static struct lu_dirent *lu_dirent_next(struct lu_dirent *ent)
+{
+        struct lu_dirent *next;
 
-        for ( ; n < npages; n++, offset = 0) {
-                char *kaddr, *limit;
-                struct lu_dir_entry *de;
-                struct page *page;
+        if (ent->lde_reclen != 0)
+                next = ((void *)ent) + le16_to_cpu(ent->lde_reclen);
+        else
+                next = NULL;
+        return next;
+}
 
-                CDEBUG(D_VFSTRACE,"read %lu of dir %lu/%u page %lu/%lu size %llu\n",
-                       PAGE_CACHE_SIZE, inode->i_ino, inode->i_generation,
-                       n, npages, inode->i_size);
-                page = ll_get_dir_page(inode, n);
+int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
+{
+        struct inode         *inode = filp->f_dentry->d_inode;
+        struct ll_inode_info *info  = ll_i2info(inode);
+        struct ll_file_data  *fd    = LUSTRE_FPRIVATE(filp);
+        struct ll_sb_info    *sbi   = ll_i2sbi(inode);
+        __u32                 pos   = filp->f_pos;
+        struct page          *page;
+        struct ll_dir_chain   chain;
+        __u32 prevhash;
+        int rc;
+        int dup;
+        int done;
+        int shift;
+        ENTRY;
 
-                /* size might have been updated by md_readpage */
-                npages = dir_pages(inode);
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %lu/%llu\n",
+               inode->i_ino, inode->i_generation, inode,
+               (unsigned long)pos, inode->i_size);
 
-                if (IS_ERR(page)) {
-                        rc = PTR_ERR(page);
-                        CERROR("error reading dir %lu/%u page %lu: rc %d\n",
-                               inode->i_ino, inode->i_generation, n, rc);
-                        continue;
-                }
+        if (pos == ~0)
+                /*
+                 * end-of-file.
+                 */
+                RETURN(0);
 
-                kaddr = page_address(page);
-                
-                de = (struct lu_dir_entry *)(kaddr + offset);
-                limit = kaddr + PAGE_CACHE_SIZE - LU_DIR_REC_LEN(1);
-                for ( ;(char*)de <= limit; de = ll_next_entry(de)) {
-                        if (fid_oid(&de->de_fid) && fid_seq(&de->de_fid)) {
-                                struct ll_sb_info *sbi = ll_i2sbi(inode);
-                                int over;
-
-                                rc = 0; /* no error if we return something */
-
-                                offset = (char *)de - kaddr;
-                                fid_le_to_cpu(&de->de_fid);
-                                over = filldir(dirent, de->de_name, de->de_name_len,
-                                               (n << PAGE_CACHE_SHIFT) | offset,
-                                               ll_fid_build_ino(sbi, &de->de_fid),
-                                               0);
-                                if (over) {
-                                        ll_put_page(page);
-                                        GOTO(done, rc);
+        rc    = 0;
+        dup   = 0;
+        done  = 0;
+        shift = 0;
+        prevhash = ~0; /* impossible hash value */
+        ll_dir_chain_init(&chain);
+
+        page = ll_get_dir_page(inode, pos, 0, &chain);
+
+        while (rc == 0 && !done) {
+                struct lu_dirpage *dp;
+                struct lu_dirent  *ent;
+
+                if (!IS_ERR(page)) {
+                        __u32 hash; /* no, Richard, it _is_ initialized */
+                        __u32 next;
+
+                        dp = page_address(page);
+                        for (ent = lu_dirent_start(dp); ent != NULL && !done;
+                             ent = lu_dirent_next(ent)) {
+                                char          *name;
+                                int            namelen;
+                                struct lu_fid  fid;
+                                ino_t          ino;
+
+                                /*
+                                 * XXX: implement correct swabbing here.
+                                 */
+
+                                hash    = le32_to_cpu(ent->lde_hash);
+                                namelen = le16_to_cpu(ent->lde_namelen);
+
+                                if (hash < pos)
+                                        /*
+                                         * Skip until we find target hash
+                                         * value.
+                                         */
+                                        continue;
+
+                                if (namelen == 0)
+                                        /*
+                                         * Skip dummy record.
+                                         */
+                                        continue;
+                                /*
+                                 * Keep track of how far we get into duplicate
+                                 * hash segment.
+                                 */
+                                if (hash == prevhash)
+                                        dup++;
+                                prevhash = hash;
+
+                                if (hash == fd->fd_dir.lfd_duppos &&
+                                    fd->fd_dir.lfd_dup > 0) {
+                                        fd->fd_dir.lfd_dup--;
+                                        continue;
                                 }
+
+                                fid  = ent->lde_fid;
+                                name = ent->lde_name;
+                                fid_le_to_cpu(&fid);
+                                ino  = ll_fid_build_ino(sbi, &fid);
+
+                                done = filldir(cookie, name, namelen,
+                                               hash, ino, DT_UNKNOWN);
                         }
+                        next = le32_to_cpu(dp->ldp_hash_end);
+                        ll_put_page(page);
+                        if (!done) {
+                                pos = next;
+                                if (pos == ~0)
+                                        /*
+                                         * End of directory reached.
+                                         */
+                                        done = 1;
+                                else if (1 /* chain is exhausted*/)
+                                        /*
+                                         * Normal case: continue to the next
+                                         * page.
+                                         */
+                                        page = ll_get_dir_page(inode, pos, 1,
+                                                               &chain);
+                                else {
+                                        /*
+                                         * go into overflow page.
+                                         */
+                                }
+                        } else
+                                pos = hash;
+                } else {
+                        rc = PTR_ERR(page);
+                        CERROR("error reading dir "DFID3" at %lu: rc %d\n",
+                               PFID3(&info->lli_fid), (unsigned long)pos, rc);
                 }
-                ll_put_page(page);
         }
 
-done:
-        filp->f_pos = (n << PAGE_CACHE_SHIFT) | offset;
+        filp->f_pos = pos;
         filp->f_version = inode->i_version;
+        fd->fd_dir.lfd_dup    = dup;
+        fd->fd_dir.lfd_duppos = prevhash;
         touch_atime(filp->f_vfsmnt, filp->f_dentry);
 
+        ll_dir_chain_fini(&chain);
+
         RETURN(rc);
 }
 
@@ -798,6 +999,7 @@ struct file_operations ll_dir_operations = {
         .release  = ll_dir_release,
         .read     = generic_read_dir,
         .readdir  = ll_readdir,
+        .llseek   = ll_llseek,
         .ioctl    = ll_dir_ioctl
 };
 
index 4b26a70..5da3837 100644 (file)
@@ -14,7 +14,7 @@
 #include <lustre_debug.h>
 #include <lustre_ver.h>
 #include <lustre_disk.h>  /* for s2sbi */
+
 /*
 struct lustre_intent_data {
         __u64 it_lock_handle[2];
@@ -217,7 +217,7 @@ struct ll_readahead_state {
         unsigned long   ras_consecutive_pages;
         /*
          * number of read requests after the last read-ahead window reset
-         * As window is reset on each seek, this is effectively the number 
+         * As window is reset on each seek, this is effectively the number
          * on consecutive read request and is used to trigger read-ahead.
          */
         unsigned long   ras_consecutive_requests;
@@ -244,7 +244,7 @@ struct ll_readahead_state {
          */
         unsigned long   ras_requests;
         /*
-         * Page index with respect to the current request, these value 
+         * Page index with respect to the current request, these value
          * will not be accurate when dealing with reads issued via mmap.
          */
         unsigned long   ras_request_index;
@@ -256,6 +256,11 @@ struct ll_readahead_state {
         struct list_head ras_read_beads;
 };
 
+struct ll_file_dir {
+        int           lfd_dup;
+        loff_t        lfd_duppos;
+};
+
 extern kmem_cache_t *ll_file_data_slab;
 struct lustre_handle;
 struct ll_file_data {
@@ -264,6 +269,7 @@ struct ll_file_data {
         __u32 fd_flags;
         struct lustre_handle fd_cwlockh;
         unsigned long fd_gid;
+        struct ll_file_dir fd_dir;
 };
 
 struct lov_stripe_md;
@@ -444,7 +450,7 @@ int ll_iocontrol(struct inode *inode, struct file *file,
                  unsigned int cmd, unsigned long arg);
 void ll_umount_begin(struct super_block *sb);
 int ll_remount_fs(struct super_block *sb, int *flags, char *data);
-int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req, 
+int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
                   int offset, struct super_block *);
 void lustre_dump_dentry(struct dentry *, int recur);
 void lustre_dump_inode(struct inode *);
index 66969a5..518fccc 100644 (file)
@@ -1768,10 +1768,14 @@ static int lmv_readpage(struct obd_export *exp, struct lu_fid *fid,
         rc = md_readpage(lmv->tgts[i].ltd_exp, &rid,
                          offset, page, request);
 
-        if (rc == 0 && !lu_fid_eq(&rid, fid))
+        if (0 && rc == 0 && !lu_fid_eq(&rid, fid))
                 /* this page isn't from master object. To avoid "." and ".."
                  * duplication in directory, we have to remove them from all
-                 * slave objects */
+                 * slave objects
+                 *
+                 * XXX this is not needed for cmd3 readdir, because only
+                 * master directory has dot and dotdot.
+                 */
                 lmv_remove_dots(page);
 
         RETURN(rc);
index 7f2eba5..e5e14a4 100644 (file)
@@ -621,22 +621,8 @@ static int mdd_dir_is_empty(const struct lu_context *ctx,
                 result = iops->get(ctx, it, (const void *)"");
                 if (result > 0) {
                         int i;
-                        for (result = 0, i = 0; result == 0 && i < 3; ++i) {
+                        for (result = 0, i = 0; result == 0 && i < 3; ++i)
                                 result = iops->next(ctx, it);
-#if 0
-                                if (result == 0) {
-                                        struct lu_fid *fid;
-                                        char          *name;
-                                        int            len;
-
-                                        fid  = (void *)iops->rec(ctx, it);
-                                        name = (void *)iops->key(ctx, it);
-                                        len  = iops->key_size(ctx, it);
-                                        CERROR("entry: "DFID3": \"%*.*s\"\n",
-                                               PFID3(fid), len, len, name);
-                                }
-#endif
-                        }
                         iops->put(ctx, it);
                         if (result == 0)
                                 result = -ENOTEMPTY;
@@ -1223,7 +1209,7 @@ static int mdd_close(const struct lu_context *ctxt, struct md_object *obj)
 }
 
 static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj,
-                        struct lu_rdpg *rdpg)
+                        const struct lu_rdpg *rdpg)
 {
         struct dt_object *next;
         int rc;
index 71b2e3e..9baac52 100644 (file)
@@ -355,7 +355,7 @@ skip_packing:
                         if (rc < 0) {
                                 if (rc == -ENODATA || rc == -EOPNOTSUPP)
                                         rc = 0;
-                                else 
+                                else
                                         CERROR("got acl size: %d\n", rc);
                         } else {
                                 repbody->aclsize = rc;
@@ -470,7 +470,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 if (lock) {
                         res_id = &lock->l_resource->lr_name;
                         LDLM_DEBUG(lock, "we will return this lock client\n");
-                        LASSERTF(fid_res_name_eq(mdt_object_fid(child), 
+                        LASSERTF(fid_res_name_eq(mdt_object_fid(child),
                                                 &lock->l_resource->lr_name),
                                 "Lock res_id: %lu/%lu/%lu, Fid: "DFID3".\n",
                          (unsigned long)res_id->name[0],
@@ -557,7 +557,7 @@ static int mdt_sendpage(struct mdt_thread_info *info,
 
         for (i = 0, tmpcount = rdpg->rp_count;
                 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
-                tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
+                tmpsize = min(tmpcount, CFS_PAGE_SIZE);
                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
         }
 
@@ -607,15 +607,12 @@ static int mdt_readpage(struct mdt_thread_info *info)
         int                rc;
         int                i;
         ENTRY;
-        RETURN(-EOPNOTSUPP);
 
         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
                 RETURN(-ENOMEM);
 
-        reqbody = req_capsule_client_get(&info->mti_pill,
-                                         &RMF_MDT_BODY);
-        repbody = req_capsule_server_get(&info->mti_pill,
-                                         &RMF_MDT_BODY);
+        reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
+        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
         if (reqbody == NULL || repbody == NULL)
                 RETURN(-EFAULT);
 
@@ -624,14 +621,17 @@ static int mdt_readpage(struct mdt_thread_info *info)
          * reqbody->size contains offset of where to start to read and
          * reqbody->nlink contains number bytes to read.
          */
-        rdpg->rp_offset = reqbody->size;
-        rdpg->rp_count = reqbody->nlink;
-        rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT;
-        LASSERT((rdpg->rp_offset & (PAGE_SIZE - 1)) == 0);
-
+        rdpg->rp_hash = reqbody->size;
+        if ((__u64)rdpg->rp_hash != reqbody->size) {
+                CERROR("Invalid hash: %#llx != %#llx\n",
+                       (__u64)rdpg->rp_hash, reqbody->size);
+                RETURN(-EFAULT);
+        }
+        rdpg->rp_count  = reqbody->nlink;
+        rdpg->rp_npages = rdpg->rp_count + CFS_PAGE_SIZE - 1 >> CFS_PAGE_SHIFT;
         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
         if (rdpg->rp_pages == NULL)
-                GOTO(out, rc = -ENOMEM);
+                RETURN(-ENOMEM);
 
         for (i = 0; i < rdpg->rp_npages; ++i) {
                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
@@ -644,9 +644,6 @@ static int mdt_readpage(struct mdt_thread_info *info)
         if (rc)
                 GOTO(free_rdpg, rc);
 
-        repbody->size = rdpg->rp_size;
-        repbody->valid = OBD_MD_FLSIZE;
-
         /* send pages to client */
         rc = mdt_sendpage(info, rdpg);
 
@@ -655,8 +652,7 @@ free_rdpg:
         for (i = 0; i < rdpg->rp_npages; i++)
                 if (rdpg->rp_pages[i] != NULL)
                         __free_pages(rdpg->rp_pages[i], 0);
-         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
-out:
+        OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
         return rc;
 }
 
index 8fac73b..c6dc005 100644 (file)
@@ -823,8 +823,8 @@ static void osd_object_ref_del(const struct lu_context *ctxt,
         LASSERT(osd_invariant(obj));
 }
 
-int osd_xattr_get(const struct lu_context *ctxt, struct dt_object *dt,
-                  void *buf, int size, const char *name)
+static int osd_xattr_get(const struct lu_context *ctxt, struct dt_object *dt,
+                         void *buf, int size, const char *name)
 {
         struct inode           *inode  = osd_dt_obj(dt)->oo_inode;
         struct osd_thread_info *info   = lu_context_key_get(ctxt, &osd_key);
@@ -836,9 +836,9 @@ int osd_xattr_get(const struct lu_context *ctxt, struct dt_object *dt,
         return inode->i_op->getxattr(dentry, name, buf, size);
 }
 
-int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt,
-                  const void *buf, int size, const char *name, int fl,
-                  struct thandle *handle)
+static int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt,
+                         const void *buf, int size, const char *name, int fl,
+                         struct thandle *handle)
 {
         int fs_flags;
 
@@ -860,8 +860,8 @@ int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt,
         return inode->i_op->setxattr(dentry, name, buf, size, fs_flags);
 }
 
-int osd_xattr_list(const struct lu_context *ctxt, struct dt_object *dt,
-                   void *buf, int size)
+static int osd_xattr_list(const struct lu_context *ctxt, struct dt_object *dt,
+                          void *buf, int size)
 {
         struct inode           *inode  = osd_dt_obj(dt)->oo_inode;
         struct osd_thread_info *info   = lu_context_key_get(ctxt, &osd_key);
@@ -873,8 +873,8 @@ int osd_xattr_list(const struct lu_context *ctxt, struct dt_object *dt,
         return inode->i_op->listxattr(dentry, buf, size);
 }
 
-int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt,
-                  const char *name, struct thandle *handle)
+static int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt,
+                         const char *name, struct thandle *handle)
 {
         struct inode           *inode  = osd_dt_obj(dt)->oo_inode;
         struct osd_thread_info *info   = lu_context_key_get(ctxt, &osd_key);
@@ -886,26 +886,89 @@ int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt,
         return inode->i_op->removexattr(dentry, name);
 }
 
+static int osd_dir_page_build(const struct lu_context *ctx, int first,
+                              void *area, int nob,
+                              struct dt_it_ops  *iops, struct dt_it *it,
+                              __u32 *start, __u32 *end,
+                              struct lu_dirent **last)
+{       /*
+         * Pack directory entries taken from iterator @it into the @nob bytes
+         * at @area, as a sequence of struct lu_dirent records.
+         *
+         * @ctx:   context; used to fetch the per-thread scratch fid.
+         * @first: non-zero for the first page of a reply: room for a struct
+         *         lu_dirpage header is left at the start of @area, and
+         *         *@start is set to the hash of the first record stored.
+         * @area:  destination buffer.
+         * @nob:   number of bytes available at @area.
+         * @iops:  iterator operations used to read key, record and to advance.
+         * @it:    iterator, already positioned on the entry to emit first.
+         * @start: out: hash of the first stored record (written only when
+         *         @first is set and at least one record fits).
+         * @end:   out: hash of the last entry examined, including an entry
+         *         that did not fit and therefore was not stored.
+         * @last:  in/out: the most recently stored record; it is enlarged to
+         *         absorb the unused tail of the buffer when the next entry
+         *         does not fit.
+         *
+         * Returns 0 when packing stopped because the next entry did not fit
+         * (iops->next() is not called for that entry, so the iterator stays
+         * on it), otherwise the first non-zero value returned by
+         * iops->next().
+         *
+         * NOTE(review): the "does not fit" branch reads *@last. If the very
+         * first entry examined does not fit, this function has not written
+         * *@last yet, so the LASSERT there depends on the value supplied by
+         * the caller -- confirm callers initialize it.
+         */
+        int result;
+        struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
+        struct lu_fid          *fid  = &info->oti_fid; /* scratch fid */
+        struct lu_dirent       *ent;
+
+        if (first) {    /* skip the space reserved for the page header */
+                area += sizeof (struct lu_dirpage);
+                nob  -= sizeof (struct lu_dirpage);
+        }
+
+        LASSERT(nob > sizeof *ent); /* NOTE(review): int vs. size_t compare;
+                                     * a negative nob converts to a large
+                                     * unsigned value and passes */
+
+        ent  = area;
+        result = 0;
+        do {
+                char  *name;
+                int    len;
+                int    recsize;
+                __u32  hash;
+
+                name = (char *)iops->key(ctx, it);
+                len  = iops->key_size(ctx, it);
+
+                *fid  = *(struct lu_fid *)iops->rec(ctx, it);
+                fid_cpu_to_le(fid);
+
+                /* record header plus name, rounded up to a multiple of 4 */
+                recsize = (sizeof *ent + len + 3) & ~3;
+                /*
+                 * XXX an interface is needed to obtain a hash.
+                 *
+                 * XXX this is horrible, most horrible hack.
+                 *
+                 * The hash is read from the __u32 located
+                 * sizeof(__u16) + sizeof(__u32) bytes before the key
+                 * returned by the iterator; presumably this relies on the
+                 * layout of the underlying index entry -- TODO confirm.
+                 */
+                hash = *(__u32 *)(name - sizeof(__u16) - sizeof(__u32));
+                *end = hash; /* updated even if the entry is not stored */
+                if (nob >= recsize) {
+                        ent->lde_fid = *fid;
+                        ent->lde_hash = hash; /* NOTE(review): stored without
+                                               * a cpu_to_le conversion,
+                                               * unlike the two length
+                                               * fields below -- confirm */
+                        ent->lde_namelen = cpu_to_le16(len);
+                        ent->lde_reclen  = cpu_to_le16(recsize);
+                        memcpy(ent->lde_name, name, len); /* no NUL added */
+                        if (first && ent == area)
+                                *start = hash;
+                        *last = ent;
+                        ent = (void *)ent + recsize; /* byte-wise advance */
+                        nob -= recsize;
+                        result = iops->next(ctx, it);
+                } else {
+                        /*
+                         * record doesn't fit into page, enlarge previous one.
+                         *
+                         * The remaining nob bytes are added to the record
+                         * length of the last stored entry, so that the
+                         * records cover the buffer up to its end.
+                         */
+                        LASSERT(*last != NULL);
+                        (*last)->lde_reclen =
+                                cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
+                                            nob);
+                        break;
+                }
+        } while (result == 0);
+        return result;
+}
 
-int osd_readpage(const struct lu_context *ctxt,
-                 struct dt_object *dt, struct lu_rdpg *rdpg)
+static int osd_readpage(const struct lu_context *ctxt,
+                        struct dt_object *dt, const struct lu_rdpg *rdpg)
 {
+        struct dt_it      *it;
         struct osd_object *obj = osd_dt_obj(dt);
-        int i, rc, tmpcount, tmpsize = 0;
-        struct dt_it_ops *iops;
-        struct dt_it *it;
+        struct dt_it_ops  *iops;
+        int i;
+        int rc;
+        int nob;
 
         LASSERT(lu_object_exists(ctxt, &dt->do_lu));
         LASSERT(osd_invariant(obj));
 
         LASSERT(rdpg->rp_pages != NULL);
 
-        /* check input params */
-        if ((rdpg->rp_offset & (obj->oo_inode->i_blksize - 1)) != 0) {
-                CERROR("offset "LPU64" not on a block boundary of %lu\n",
-                       rdpg->rp_offset, obj->oo_inode->i_blksize);
+        if (rdpg->rp_count <= 0)
                 return -EFAULT;
-        }
 
         if (rdpg->rp_count & (obj->oo_inode->i_blksize - 1)) {
                 CERROR("size %u is not multiple of blocksize %lu\n",
@@ -913,67 +976,58 @@ int osd_readpage(const struct lu_context *ctxt,
                 return -EFAULT;
         }
 
-        /* prepare output */
-        rdpg->rp_size = obj->oo_inode->i_size;
-
         /*
-         * iterating directory and fill pages from @rdpg
+         * iterating through directory and fill pages from @rdpg
          */
         iops = &dt->do_index_ops->dio_it;
         it = iops->init(ctxt, dt);
         if (it == NULL)
                 return -ENOMEM;
-
+        /*
+         * XXX position iterator at rdpg->rp_hash
+         */
         rc = iops->get(ctxt, it, (const void *)"");
         if (rc > 0) {
-                for (i = 0, tmpcount = rdpg->rp_count;
-                     i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
-                        struct lu_dir_entry *entry, *last;
-                        int page_space = PAGE_SIZE;
-
-                        tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
-                        entry = kmap(rdpg->rp_pages[i]);
-                        last = entry;
-
-                        for (rc = 0; rc == 0; ) {
-                                rc = iops->next(ctxt, it);
-
-                                if (rc == 0) {
-                                        struct lu_fid *fid;
-                                        char          *name;
-                                        int            len;
-
-                                        fid  = (void *)iops->rec(ctxt, it);
-                                        name = (void *)iops->key(ctxt, it);
-                                        len  = iops->key_size(ctxt, it);
-
-                                        entry->de_fid = *fid;
-                                        fid_cpu_to_le(&entry->de_fid);
-
-                                        entry->de_name_len = cpu_to_le16(len + 1);
-                                        entry->de_rec_len = cpu_to_le16(LU_DIR_REC_LEN(len + 1));
-
-                                        strncpy(entry->de_name, name, len);
-                                        entry->de_name[len] = '\0';
-
-                                        page_space -= LU_DIR_REC_LEN(len + 1);
-                                        last = entry;
-
-                                        entry = (struct lu_dir_entry *)((char *)entry +
-                                                                        LU_DIR_REC_LEN(len + 1));
-                                }
-                        }
-                        /* last entry fills whole space in the page */
-                        if (page_space < PAGE_SIZE)
-                                last->de_rec_len += page_space;
-                        kunmap(rdpg->rp_pages[i]);
+                struct page      *pg; /* no, Richard, it _is_ initialized */
+                struct lu_dirent *last;
+                __u32             hash_start;
+                __u32             hash_end;
+
+                for (i = 0, rc = 0, nob = rdpg->rp_count;
+                     rc == 0 && nob > 0; i++, nob -= CFS_PAGE_SIZE) {
+
+                        LASSERT(i < rdpg->rp_npages);
+                        pg = rdpg->rp_pages[i];
+                        rc = osd_dir_page_build(ctxt, !i, kmap(pg),
+                                                min_t(int, nob, CFS_PAGE_SIZE),
+                                                iops, it,
+                                                &hash_start, &hash_end, &last);
+                        kunmap(pg);
                 }
                 iops->put(ctxt, it);
-
-                rc = 0;
-        } else if (rc == 0) {
+                if (rc > 0) {
+                        /*
+                         * end of directory.
+                         */
+                        hash_end = ~0ul;
+                        rc = 0;
+                }
+                if (rc == 0) {
+                        struct lu_dirpage *dp;
+
+                        dp = kmap(rdpg->rp_pages[0]);
+                        dp->ldp_hash_start = hash_start;
+                        dp->ldp_hash_end   = hash_end;
+                        kunmap(rdpg->rp_pages[0]);
+                        kmap(pg);
+                        LASSERT(page_address(pg) <= (void *)last &&
+                                (void *)last < page_address(pg) + CFS_PAGE_SIZE);
+                        last->lde_reclen = 0;
+                        kunmap(pg);
+                }
+        } else if (rc == 0)
                 rc = -EIO;
-        }
+        iops->put(ctxt, it);
         iops->fini(ctxt, it);
 
         return rc;
@@ -990,6 +1044,7 @@ static struct dt_object_operations osd_obj_ops = {
         .do_ref_del    = osd_object_ref_del,
         .do_xattr_get  = osd_xattr_get,
         .do_xattr_set  = osd_xattr_set,
+        .do_xattr_del  = osd_xattr_del,
         .do_xattr_list = osd_xattr_list,
         .do_readpage   = osd_readpage
 };