Whamcloud - gitweb
LU-4906 llite: read page from LMV/MDC for readdir 22/10622/8
authorWang Di <di.wang@intel.com>
Fri, 6 Jun 2014 08:49:37 +0000 (01:49 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 17 Jun 2014 14:48:56 +0000 (14:48 +0000)
This patch will change readdir iteration from read entry to read
page. During rm -rf, the statahead thread will enqueue the lock for
each entry in mdc_read_entry, in the mean time, MDT will revoke
the lock when it deletes each entry, which causes ldlm lock ping
pong behavior, and a lot extra RPC.

With this patch, llite will read one page from MDC each time, then
stat ahead thread will hold the page and do getattr for all entries
in this page. Note: it does not need to hold ldlm lock after it
get the page, because stat ahead thread can live with -ENOENT
or ESTALE, if the entry becomes stale during this time.

With this patch, the performance of "rm -rf" will be improved
for current master.

1. without this patch on master

[root@mds tests]# ./createmany -o /mnt/lustre/test1/f- 10000
total: 10000 creates in 11.98 seconds: 835.02 creates/second
[root@mds tests]# ls /mnt/lustre/test1/ | wc
  10000   10000   68890
[root@mds tests]# time rm -rf /mnt/lustre/test1

real 1m4.496s
user 0m0.014s
sys 0m3.832s

2. with this patch on master

[root@mds tests]# ./createmany -o /mnt/lustre/test1/f- 10000
total: 10000 creates in 11.72 seconds: 853.50 creates/second
[root@mds tests]# time rm -rf /mnt/lustre/test1

real 0m8.151s
user 0m0.012s
sys 0m2.242s

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I189f0039b5624455b70d1381b43d452cc8766543
Reviewed-on: http://review.whamcloud.com/10622
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
12 files changed:
lustre/include/lclient.h
lustre/include/lustre_mdc.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/liblustre/dir.c
lustre/llite/dir.c
lustre/llite/llite_internal.h
lustre/llite/llite_nfs.c
lustre/llite/statahead.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_request.c
lustre/obdclass/lprocfs_status.c

index 3016c08..62a79d7 100644 (file)
@@ -510,7 +510,6 @@ enum op_cli_flags {
        CLI_HASH64      = 1 << 2,
        CLI_API32       = 1 << 3,
        CLI_MIGRATE     = 1 << 4,
-       CLI_NEXT_ENTRY  = 1 << 5,
 };
 
 #endif /*LCLIENT_H */
index 040b975..9067b6a 100644 (file)
@@ -107,7 +107,7 @@ static inline void mdc_get_rpc_lock(struct mdc_rpc_lock *lck,
        ENTRY;
 
        if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
-                          it->it_op == IT_LAYOUT))
+                          it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
                return;
 
        /* This would normally block until the existing request finishes.
@@ -145,7 +145,7 @@ static inline void mdc_put_rpc_lock(struct mdc_rpc_lock *lck,
                                    struct lookup_intent *it)
 {
        if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
-                          it->it_op == IT_LAYOUT))
+                          it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
                goto out;
 
        if (lck->rpcl_it == MDC_FAKE_RPCL_IT) { /* OBD_FAIL_MDC_RPCS_SEM */
index 1007438..f8255d0 100644 (file)
@@ -857,13 +857,6 @@ struct md_op_data {
        enum mds_op_bias        op_bias;
 
        /* Used by readdir */
-       __u64                   op_hash_offset;
-
-       /* The offset of dir entry among the same hash entries, which
-        * is used to resolve the hash conflict. */
-       __u32                   op_same_hash_offset;
-
-       /* Used by readdir */
        __u32                   op_npages;
 
        /* used to transfer info between the stacks of MD client
@@ -1099,9 +1092,9 @@ struct md_ops {
        int (*m_fsync)(struct obd_export *, const struct lu_fid *,
                       struct obd_capa *, struct ptlrpc_request **);
 
-       int (*m_read_entry)(struct obd_export *, struct md_op_data *,
-                           struct md_callback *cb_op, struct lu_dirent **ld,
-                           struct page **ppage);
+       int (*m_read_page)(struct obd_export *, struct md_op_data *,
+                          struct md_callback *cb_op, __u64 hash_offset,
+                          struct page **ppage);
 
        int (*m_unlink)(struct obd_export *, struct md_op_data *,
                        struct ptlrpc_request **);
index 962f789..331a9e6 100644 (file)
@@ -1631,17 +1631,18 @@ static inline int md_fsync(struct obd_export *exp, const struct lu_fid *fid,
        RETURN(rc);
 }
 
-static inline int md_read_entry(struct obd_export *exp,
-                               struct md_op_data *op_data,
-                               struct md_callback *cb_op,
-                               struct lu_dirent **ld,
-                               struct page **ppage)
+static inline int md_read_page(struct obd_export *exp,
+                              struct md_op_data *op_data,
+                              struct md_callback *cb_op,
+                              __u64  hash_offset,
+                              struct page **ppage)
 {
        int rc;
        ENTRY;
-       EXP_CHECK_MD_OP(exp, read_entry);
-       EXP_MD_COUNTER_INCREMENT(exp, read_entry);
-       rc = MDP(exp->exp_obd, read_entry)(exp, op_data, cb_op, ld, ppage);
+       EXP_CHECK_MD_OP(exp, read_page);
+       EXP_MD_COUNTER_INCREMENT(exp, read_page);
+       rc = MDP(exp->exp_obd, read_page)(exp, op_data, cb_op, hash_offset,
+                                         ppage);
        RETURN(rc);
 }
 
index b14e1a2..2b41e8f 100644 (file)
@@ -98,7 +98,6 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
         }
         ldlm_lock_dump_handle(D_OTHER, &lockh);
 
-        op_data.op_hash_offset = hash_x_index(page->index, 0);
         op_data.op_npages = 1;
         rc = md_readpage(sbi->ll_md_exp, &op_data, &page, &request);
         if (!rc) {
index 6ef796a..bfc75ea 100644 (file)
  * lmv_adjust_dirpages().
  *
  */
-/**
- * The following three APIs will be used by llite to iterate directory
- * entries from MDC dir page caches.
- *
- * ll_dir_entry_start(next) will lookup(return) entry by op_hash_offset.
- * To avoid extra memory allocation, the @entry will be pointed to
- * the dir entries in MDC page directly, so these pages can not be released
- * until the entry has been accessed in ll_readdir(or statahead).
- *
- * The iterate process will be
- *
- * ll_dir_entry_start: locate the page in MDC, and return the first entry.
- *                    hold the page.
- *
- * ll_dir_entry_next: return the next entry in the current page, if it reaches
- *                   to the end, release current page.
- *
- * ll_dir_entry_end: release the last page.
- **/
-struct lu_dirent *ll_dir_entry_start(struct inode *dir,
-                                    struct md_op_data *op_data,
-                                    struct page **ppage)
+struct page *ll_get_dir_page(struct inode *dir, struct md_op_data *op_data,
+                            __u64 offset, struct ll_dir_chain *chain)
 {
-       struct lu_dirent *entry = NULL;
-       struct md_callback cb_op;
-       int rc;
-       ENTRY;
+       struct md_callback      cb_op;
+       struct page             *page;
+       int                     rc;
 
-       LASSERT(*ppage == NULL);
        cb_op.md_blocking_ast = ll_md_blocking_ast;
-       op_data->op_cli_flags &= ~CLI_NEXT_ENTRY;
-       rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry, ppage);
+       rc = md_read_page(ll_i2mdexp(dir), op_data, &cb_op, offset, &page);
        if (rc != 0)
-               entry = ERR_PTR(rc);
-       RETURN(entry);
+               return ERR_PTR(rc);
+
+       return page;
 }
 
-struct lu_dirent *ll_dir_entry_next(struct inode *dir,
-                                   struct md_op_data *op_data,
-                                   struct lu_dirent *ent,
-                                   struct page **ppage)
+void ll_release_page(struct inode *inode, struct page *page,
+                    bool remove)
 {
-       struct lu_dirent *entry = NULL;
-       struct md_callback cb_op;
-       int rc;
-       ENTRY;
-
-       op_data->op_hash_offset = le64_to_cpu(ent->lde_hash);
-
-       /* release last page */
-       LASSERT(*ppage != NULL);
-       kunmap(*ppage);
-       page_cache_release(*ppage);
-
-       cb_op.md_blocking_ast = ll_md_blocking_ast;
-       op_data->op_cli_flags |= CLI_NEXT_ENTRY;
-       rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry, ppage);
-       if (rc != 0)
-               entry = ERR_PTR(rc);
+       kunmap(page);
+
+       /* Always remove the page for striped dir, because the page is
+        * built from temporarily in LMV layer */
+       if (inode != NULL && S_ISDIR(inode->i_mode) &&
+           ll_i2info(inode)->lli_lsm_md != NULL) {
+               __free_page(page);
+               return;
+       }
 
-       RETURN(entry);
+       if (remove) {
+               lock_page(page);
+               if (likely(page->mapping != NULL))
+                       truncate_complete_page(page->mapping, page);
+               unlock_page(page);
+       }
+       page_cache_release(page);
 }
 
 #ifdef HAVE_DIR_CONTEXT
-int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+int ll_dir_read(struct inode *inode, __u64 *ppos, struct md_op_data *op_data,
                struct dir_context *ctx)
 {
 #else
-int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+int ll_dir_read(struct inode *inode, __u64 *ppos, struct md_op_data *op_data,
                void *cookie, filldir_t filldir)
 {
 #endif
-       struct ll_sb_info       *sbi = ll_i2sbi(inode);
-       struct ll_dir_chain     chain;
-       struct lu_dirent        *ent;
-       int                     api32 = ll_need_32bit_api(sbi);
-       int                     hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH;
-       int                     done = 0;
-       int                     rc = 0;
-       __u64                   hash = MDS_DIR_END_OFF;
-       struct page             *page = NULL;
+       struct ll_sb_info    *sbi        = ll_i2sbi(inode);
+       __u64                 pos        = *ppos;
+       bool                  is_api32 = ll_need_32bit_api(sbi);
+       bool                  is_hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH;
+       struct page          *page;
+       struct ll_dir_chain   chain;
+       bool                  done = false;
+       int                   rc = 0;
        ENTRY;
 
-        ll_dir_chain_init(&chain);
-       for (ent = ll_dir_entry_start(inode, op_data, &page);
-            ent != NULL && !IS_ERR(ent) && !done;
-            ent = ll_dir_entry_next(inode, op_data, ent, &page)) {
-               __u16          type;
-               int            namelen;
-               struct lu_fid  fid;
-               __u64          lhash;
-               __u64          ino;
-
-               hash = le64_to_cpu(ent->lde_hash);
-               if (hash < op_data->op_hash_offset)
-                       /*
-                        * Skip until we find target hash
-                        * value.
-                        */
-                       continue;
-               namelen = le16_to_cpu(ent->lde_namelen);
-               if (namelen == 0)
-                       /*
-                        * Skip dummy record.
-                        */
-                       continue;
+       ll_dir_chain_init(&chain);
 
-               if (api32 && hash64)
-                       lhash = hash >> 32;
-               else
-                       lhash = hash;
-               fid_le_to_cpu(&fid, &ent->lde_fid);
-               ino = cl_fid_build_ino(&fid, api32);
-               type = ll_dirent_type_get(ent);
+       page = ll_get_dir_page(inode, op_data, pos, &chain);
 
-#ifdef HAVE_DIR_CONTEXT
-               /* For 'll_nfs_get_name_filldir()', it will try
-                * to access the 'ent' through its 'lde_name',
-                * so the parameter 'name' for 'filldir()' must
-                * be part of the 'ent'. */
-               done = !dir_emit(ctx, ent->lde_name, namelen, ino, type);
-#else
-               done = filldir(cookie, ent->lde_name, namelen, lhash,
-                              ino, type);
-#endif
-               if (done) {
-                       if (op_data->op_hash_offset != MDS_DIR_END_OFF)
-                               op_data->op_hash_offset = hash;
+       while (rc == 0 && !done) {
+               struct lu_dirpage *dp;
+               struct lu_dirent  *ent;
+               __u64 hash;
+               __u64 next;
+
+               if (IS_ERR(page)) {
+                       rc = PTR_ERR(page);
                        break;
                }
-       }
 
-       if (IS_ERR(ent))
-               rc = PTR_ERR(ent);
-       else if (ent == NULL)
-               op_data->op_hash_offset = MDS_DIR_END_OFF;
+               hash = MDS_DIR_END_OFF;
+               dp = page_address(page);
+               for (ent = lu_dirent_start(dp); ent != NULL && !done;
+                    ent = lu_dirent_next(ent)) {
+                       __u16          type;
+                       int            namelen;
+                       struct lu_fid  fid;
+                       __u64          lhash;
+                       __u64          ino;
+
+                       hash = le64_to_cpu(ent->lde_hash);
+                       if (hash < pos)
+                               /*
+                                * Skip until we find target hash
+                                * value.
+                                */
+                               continue;
+
+                       namelen = le16_to_cpu(ent->lde_namelen);
+                       if (namelen == 0)
+                               /*
+                                * Skip dummy record.
+                                */
+                               continue;
+
+                       if (is_api32 && is_hash64)
+                               lhash = hash >> 32;
+                       else
+                               lhash = hash;
+                       fid_le_to_cpu(&fid, &ent->lde_fid);
+                       ino = cl_fid_build_ino(&fid, is_api32);
+                       type = ll_dirent_type_get(ent);
+                       /* For 'll_nfs_get_name_filldir()', it will try
+                        * to access the 'ent' through its 'lde_name',
+                        * so the parameter 'name' for 'filldir()' must
+                        * be part of the 'ent'. */
+                       done = filldir(cookie, ent->lde_name, namelen, lhash,
+                                      ino, type);
+               }
 
-       if (page != NULL) {
-               kunmap(page);
-               page_cache_release(page);
+               if (done) {
+                       pos = hash;
+                       ll_release_page(inode, page, false);
+                       break;
+               }
+
+               next = le64_to_cpu(dp->ldp_hash_end);
+               pos = next;
+               if (pos == MDS_DIR_END_OFF) {
+                       /*
+                        * End of directory reached.
+                        */
+                       done = 1;
+                       ll_release_page(inode, page, false);
+               } else {
+                       /*
+                        * Normal case: continue to the next
+                        * page.
+                        */
+                       ll_release_page(inode, page,
+                                       le32_to_cpu(dp->ldp_flags) &
+                                       LDF_COLLIDE);
+                       next = pos;
+                       page = ll_get_dir_page(inode, op_data, pos,
+                                              &chain);
+               }
        }
 
+       *ppos = pos;
        ll_dir_chain_fini(&chain);
        RETURN(rc);
 }
@@ -338,21 +336,22 @@ static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
                 * object */
                if (fid_is_zero(&op_data->op_fid3)) {
                        rc = ll_dir_get_parent_fid(inode, &op_data->op_fid3);
-                       if (rc != 0)
+                       if (rc != 0) {
+                               ll_finish_md_op_data(op_data);
                                RETURN(rc);
+                       }
                }
        }
-       op_data->op_hash_offset = pos;
        op_data->op_max_pages = sbi->ll_md_brw_pages;
 #ifdef HAVE_DIR_CONTEXT
        ctx->pos = pos;
-       rc = ll_dir_read(inode, op_data, ctx);
+       rc = ll_dir_read(inode, &pos, op_data, ctx);
        pos = ctx->pos;
 #else
-       rc = ll_dir_read(inode, op_data, cookie, filldir);
+       rc = ll_dir_read(inode, &pos, op_data, cookie, filldir);
 #endif
        if (lfd != NULL)
-               lfd->lfd_pos = op_data->op_hash_offset;
+               lfd->lfd_pos = pos;
 
        if (pos == MDS_DIR_END_OFF) {
                if (api32)
@@ -361,9 +360,7 @@ static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
                        pos = LL_DIR_END_OFF;
        } else {
                if (api32 && hash64)
-                       pos = op_data->op_hash_offset >> 32;
-               else
-                       pos = op_data->op_hash_offset;
+                       pos = pos >> 32;
        }
 #ifdef HAVE_DIR_CONTEXT
        ctx->pos = pos;
index 85ea3ab..ab1a43d 100644 (file)
@@ -720,23 +720,18 @@ static void ll_stats_ops_tally(struct ll_sb_info *sbi, int op, int count) {}
 extern const struct file_operations ll_dir_operations;
 extern const struct inode_operations ll_dir_inode_operations;
 #ifdef HAVE_DIR_CONTEXT
-int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+int ll_dir_read(struct inode *inode, __u64 *pos, struct md_op_data *op_data,
                struct dir_context *ctx);
 #else
-int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+int ll_dir_read(struct inode *inode, __u64 *pos, struct md_op_data *op_data,
                void *cookie, filldir_t filldir);
 #endif
 int ll_get_mdt_idx(struct inode *inode);
 int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi, const struct lu_fid *fid);
+struct page *ll_get_dir_page(struct inode *dir, struct md_op_data *op_data,
+                            __u64 offset, struct ll_dir_chain *chain);
+void ll_release_page(struct inode *inode, struct page *page, bool remove);
 
-struct lu_dirent *ll_dir_entry_start(struct inode *dir,
-                                    struct md_op_data *op_data,
-                                    struct page **ppage);
-
-struct lu_dirent *ll_dir_entry_next(struct inode *dir,
-                                   struct md_op_data *op_data,
-                                   struct lu_dirent *ent,
-                                   struct page **ppage);
 /* llite/namei.c */
 extern const struct inode_operations ll_special_inode_operations;
 
@@ -1239,7 +1234,8 @@ struct ll_statahead_info {
         unsigned int            sai_skip_hidden;/* skipped hidden dentry count */
        unsigned int            sai_ls_all:1,   /* "ls -al", do stat-ahead for
                                                 * hidden entries */
-                               sai_agl_valid:1;/* AGL is valid for the dir */
+                               sai_agl_valid:1,/* AGL is valid for the dir */
+                               sai_in_readpage:1;/* statahead is in readdir()*/
        wait_queue_head_t       sai_waitq;      /* stat-ahead wait queue */
        struct ptlrpc_thread    sai_thread;     /* stat-ahead thread */
        struct ptlrpc_thread    sai_agl_thread; /* AGL thread */
index d92424f..6fb3315 100644 (file)
@@ -239,6 +239,7 @@ static int ll_get_name(struct dentry *dentry, char *name,
                .lgd_found = 0,
        };
        struct md_op_data *op_data;
+       __u64   pos = 0;
        int rc;
        ENTRY;
 
@@ -253,13 +254,12 @@ static int ll_get_name(struct dentry *dentry, char *name,
        if (IS_ERR(op_data))
                GOTO(out, rc = PTR_ERR(op_data));
 
-       op_data->op_hash_offset = 0;
        op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
        mutex_lock(&dir->i_mutex);
 #ifdef HAVE_DIR_CONTEXT
-       rc = ll_dir_read(dir, op_data, &lgd.ctx);
+       rc = ll_dir_read(dir, &pos, op_data, &lgd.ctx);
 #else
-       rc = ll_dir_read(dir, op_data, &lgd, ll_nfs_get_name_filldir);
+       rc = ll_dir_read(dir, &pos, op_data, &lgd, ll_nfs_get_name_filldir);
 #endif
        mutex_unlock(&dir->i_mutex);
        ll_finish_md_op_data(op_data);
index ca1d789..5e2a4c7 100644 (file)
@@ -1077,8 +1077,8 @@ static int ll_statahead_thread(void *arg)
        struct md_op_data        *op_data;
        struct ll_dir_chain       chain;
        struct l_wait_info        lwi    = { 0 };
-       struct lu_dirent         *ent;
        struct page             *page = NULL;
+       __u64                   pos = 0;
        ENTRY;
 
        thread->t_pid = current_pid();
@@ -1090,7 +1090,6 @@ static int ll_statahead_thread(void *arg)
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));
 
-       op_data->op_hash_offset = 0;
        op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
 
        if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
@@ -1107,138 +1106,168 @@ static int ll_statahead_thread(void *arg)
        wake_up(&thread->t_ctl_waitq);
 
        ll_dir_chain_init(&chain);
-       for (ent = ll_dir_entry_start(dir, op_data, &page);
-            ent != NULL && !IS_ERR(ent);
-            ent = ll_dir_entry_next(dir, op_data, ent, &page)) {
-               __u64 hash;
-               int namelen;
-               char *name;
-
-               hash = le64_to_cpu(ent->lde_hash);
-               if (unlikely(hash < op_data->op_hash_offset))
-                       /*
-                        * Skip until we find target hash value.
-                        */
-                       continue;
+       page = ll_get_dir_page(dir, op_data, pos, &chain);
+       while (1) {
+               struct lu_dirpage *dp;
+               struct lu_dirent  *ent;
+
+               if (IS_ERR(page)) {
+                       rc = PTR_ERR(page);
+                       CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
+                              "/"LPU64" opendir_pid = %u: rc = %d\n",
+                              PFID(ll_inode2fid(dir)), pos, sai->sai_index,
+                              plli->lli_opendir_pid, rc);
+                       GOTO(out, rc);
+               }
 
-               namelen = le16_to_cpu(ent->lde_namelen);
-               if (unlikely(namelen == 0))
-                       /*
-                        * Skip dummy record.
-                        */
-                       continue;
+               dp = page_address(page);
+               for (ent = lu_dirent_start(dp); ent != NULL;
+                    ent = lu_dirent_next(ent)) {
+                       __u64 hash;
+                       int namelen;
+                       char *name;
 
-               name = ent->lde_name;
-               if (name[0] == '.') {
-                       if (namelen == 1) {
-                               /*
-                                * skip "."
-                                */
-                               continue;
-                       } else if (name[1] == '.' && namelen == 2) {
+                       hash = le64_to_cpu(ent->lde_hash);
+                       if (unlikely(hash < pos))
                                /*
-                                * skip ".."
+                                * Skip until we find target hash value.
                                 */
                                continue;
-                       } else if (!sai->sai_ls_all) {
+
+                       namelen = le16_to_cpu(ent->lde_namelen);
+                       if (unlikely(namelen == 0))
                                /*
-                                * skip hidden files.
+                                * Skip dummy record.
                                 */
-                               sai->sai_skip_hidden++;
                                continue;
+
+                       name = ent->lde_name;
+                       if (name[0] == '.') {
+                               if (namelen == 1) {
+                                       /*
+                                        * skip "."
+                                        */
+                                       continue;
+                               } else if (name[1] == '.' && namelen == 2) {
+                                       /*
+                                        * skip ".."
+                                        */
+                                       continue;
+                               } else if (!sai->sai_ls_all) {
+                                       /*
+                                        * skip hidden files.
+                                        */
+                                       sai->sai_skip_hidden++;
+                                       continue;
+                               }
                        }
-               }
 
-               /*
-                * don't stat-ahead first entry.
-                */
-               if (unlikely(++first == 1))
-                       continue;
+                       /*
+                        * don't stat-ahead first entry.
+                        */
+                       if (unlikely(++first == 1))
+                               continue;
 
 keep_it:
-               l_wait_event(thread->t_ctl_waitq,
-                            !sa_sent_full(sai) ||
-                            !sa_received_empty(sai) ||
-                            !agl_list_empty(sai) ||
-                            !thread_is_running(thread),
-                            &lwi);
+                       l_wait_event(thread->t_ctl_waitq,
+                                    !sa_sent_full(sai) ||
+                                    !sa_received_empty(sai) ||
+                                    !agl_list_empty(sai) ||
+                                    !thread_is_running(thread),
+                                    &lwi);
 
 interpret_it:
-               while (!sa_received_empty(sai))
-                       ll_post_statahead(sai);
+                       while (!sa_received_empty(sai))
+                               ll_post_statahead(sai);
 
-               if (unlikely(!thread_is_running(thread)))
-                       GOTO(out, rc = 0);
+                       if (unlikely(!thread_is_running(thread))) {
+                               ll_release_page(dir, page, false);
+                               GOTO(out, rc = 0);
+                       }
 
-               /* If no window for metadata statahead, but there are
-                * some AGL entries to be triggered, then try to help
-                * to process the AGL entries. */
-               if (sa_sent_full(sai)) {
-                       spin_lock(&plli->lli_agl_lock);
-                       while (!agl_list_empty(sai)) {
-                               clli = agl_first_entry(sai);
-                               list_del_init(&clli->lli_agl_list);
+                       /* If no window for metadata statahead, but there are
+                        * some AGL entries to be triggered, then try to help
+                        * to process the AGL entries. */
+                       if (sa_sent_full(sai)) {
+                               spin_lock(&plli->lli_agl_lock);
+                               while (!agl_list_empty(sai)) {
+                                       clli = agl_first_entry(sai);
+                                       list_del_init(&clli->lli_agl_list);
+                                       spin_unlock(&plli->lli_agl_lock);
+                                       ll_agl_trigger(&clli->lli_vfs_inode,
+                                                      sai);
+
+                                       if (!sa_received_empty(sai))
+                                               goto interpret_it;
+
+                                       if (unlikely(
+                                               !thread_is_running(thread))) {
+                                               ll_release_page(dir, page,
+                                                               false);
+                                               GOTO(out, rc = 0);
+                                       }
+
+                                       if (!sa_sent_full(sai))
+                                               goto do_it;
+
+                                       spin_lock(&plli->lli_agl_lock);
+                               }
                                spin_unlock(&plli->lli_agl_lock);
-                               ll_agl_trigger(&clli->lli_vfs_inode,
-                                              sai);
 
-                               if (!sa_received_empty(sai))
-                                       goto interpret_it;
+                               goto keep_it;
+                       }
+do_it:
+                       ll_statahead_one(parent, name, namelen);
+               }
 
-                               if (unlikely(
-                                       !thread_is_running(thread)))
+               pos = le64_to_cpu(dp->ldp_hash_end);
+               if (pos == MDS_DIR_END_OFF) {
+                       /*
+                        * End of directory reached.
+                        */
+                       ll_release_page(dir, page, false);
+                       while (1) {
+                               l_wait_event(thread->t_ctl_waitq,
+                                            !sa_received_empty(sai) ||
+                                           sai->sai_sent == sai->sai_replied ||
+                                            !thread_is_running(thread),
+                                            &lwi);
+
+                               while (!sa_received_empty(sai))
+                                       ll_post_statahead(sai);
+
+                               if (unlikely(!thread_is_running(thread)))
                                        GOTO(out, rc = 0);
 
-                               if (!sa_sent_full(sai))
-                                       goto do_it;
+                               if (sai->sai_sent == sai->sai_replied &&
+                                   sa_received_empty(sai))
+                                       break;
+                       }
 
+                       spin_lock(&plli->lli_agl_lock);
+                       while (!agl_list_empty(sai) &&
+                              thread_is_running(thread)) {
+                               clli = agl_first_entry(sai);
+                               list_del_init(&clli->lli_agl_list);
+                               spin_unlock(&plli->lli_agl_lock);
+                               ll_agl_trigger(&clli->lli_vfs_inode, sai);
                                spin_lock(&plli->lli_agl_lock);
                        }
                        spin_unlock(&plli->lli_agl_lock);
 
-                       goto keep_it;
-               }
-
-do_it:
-               ll_statahead_one(parent, name, namelen);
-       }
-
-       if (page != NULL) {
-               kunmap(page);
-               page_cache_release(page);
-       }
-
-        /*
-        * End of directory reached.
-        */
-       while (1) {
-               l_wait_event(thread->t_ctl_waitq,
-                            !sa_received_empty(sai) ||
-                            sai->sai_sent == sai->sai_replied ||
-                            !thread_is_running(thread),
-                            &lwi);
-
-               while (!sa_received_empty(sai))
-                       ll_post_statahead(sai);
-
-               if (unlikely(!thread_is_running(thread)))
                        GOTO(out, rc = 0);
-
-               if (sai->sai_sent == sai->sai_replied &&
-                   sa_received_empty(sai))
-                       break;
-       }
-
-       spin_lock(&plli->lli_agl_lock);
-       while (!agl_list_empty(sai) &&
-              thread_is_running(thread)) {
-               clli = agl_first_entry(sai);
-               list_del_init(&clli->lli_agl_list);
-               spin_unlock(&plli->lli_agl_lock);
-               ll_agl_trigger(&clli->lli_vfs_inode, sai);
-               spin_lock(&plli->lli_agl_lock);
+               } else {
+                       /*
+                        * chain is exhausted.
+                        * Normal case: continue to the next page.
+                        */
+                       ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
+                                             LDF_COLLIDE);
+                       sai->sai_in_readpage = 1;
+                       page = ll_get_dir_page(dir, op_data, pos, &chain);
+                       sai->sai_in_readpage = 0;
+               }
        }
-       spin_unlock(&plli->lli_agl_lock);
 out:
        EXIT;
        ll_finish_md_op_data(op_data);
@@ -1349,88 +1378,117 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
        struct qstr          *target = &dentry->d_name;
        struct md_op_data    *op_data;
        int                   dot_de;
-       struct lu_dirent     *ent;
        struct page          *page = NULL;
        int                   rc     = LS_NONE_FIRST_DE;
+       __u64                 pos = 0;
        ENTRY;
 
-       ll_dir_chain_init(&chain);
-
        op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
                                     LUSTRE_OPC_ANY, dir);
        if (IS_ERR(op_data))
-               GOTO(out, rc = PTR_ERR(op_data));
+               RETURN(PTR_ERR(op_data));
        /**
         *FIXME choose the start offset of the readdir
         */
        op_data->op_stripe_offset = 0;
-       op_data->op_hash_offset = 0;
        op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
 
-       for (ent = ll_dir_entry_start(dir, op_data, &page);
-            ent != NULL && !IS_ERR(ent);
-            ent = ll_dir_entry_next(dir, op_data, ent, &page)) {
-               __u64 hash;
-               int namelen;
-               char *name;
-
-               hash = le64_to_cpu(ent->lde_hash);
-               /* The ll_get_dir_page() can return any page containing
-                * the given hash which may be not the start hash. */
-               if (unlikely(hash < op_data->op_hash_offset))
-                       continue;
-
-               namelen = le16_to_cpu(ent->lde_namelen);
-               if (unlikely(namelen == 0))
-                       /*
-                        * skip dummy record.
-                        */
-                       continue;
+       ll_dir_chain_init(&chain);
+       page = ll_get_dir_page(dir, op_data, 0, &chain);
 
-               name = ent->lde_name;
-               if (name[0] == '.') {
-                       if (namelen == 1)
-                               /*
-                                * skip "."
-                                */
+       while (1) {
+               struct lu_dirpage *dp;
+               struct lu_dirent  *ent;
+
+               if (IS_ERR(page)) {
+                       struct ll_inode_info *lli = ll_i2info(dir);
+
+                       rc = PTR_ERR(page);
+                       CERROR("%s: reading dir "DFID" at "LPU64
+                              "opendir_pid = %u : rc = %d\n",
+                              ll_get_fsname(dir->i_sb, NULL, 0),
+                              PFID(ll_inode2fid(dir)), pos,
+                              lli->lli_opendir_pid, rc);
+                       break;
+               }
+
+               dp = page_address(page);
+               for (ent = lu_dirent_start(dp); ent != NULL;
+                    ent = lu_dirent_next(ent)) {
+                       __u64 hash;
+                       int namelen;
+                       char *name;
+
+                       hash = le64_to_cpu(ent->lde_hash);
+                       /* The ll_get_dir_page() can return any page containing
+                        * the given hash which may be not the start hash. */
+                       if (unlikely(hash < pos))
                                continue;
-                       else if (name[1] == '.' && namelen == 2)
+
+                       namelen = le16_to_cpu(ent->lde_namelen);
+                       if (unlikely(namelen == 0))
                                /*
-                                * skip ".."
+                                * skip dummy record.
                                 */
                                continue;
-                       else
-                               dot_de = 1;
-               } else {
-                       dot_de = 0;
-               }
 
-               if (dot_de && target->name[0] != '.') {
-                       CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
-                              target->len, target->name,
-                              namelen, name);
-                       continue;
-               }
+                       name = ent->lde_name;
+                       if (name[0] == '.') {
+                               if (namelen == 1)
+                                       /*
+                                        * skip "."
+                                        */
+                                       continue;
+                               else if (name[1] == '.' && namelen == 2)
+                                       /*
+                                        * skip ".."
+                                        */
+                                       continue;
+                               else
+                                       dot_de = 1;
+                       } else {
+                               dot_de = 0;
+                       }
 
-               if (target->len != namelen ||
-                   memcmp(target->name, name, namelen) != 0)
-                       rc = LS_NONE_FIRST_DE;
-               else if (!dot_de)
-                       rc = LS_FIRST_DE;
-               else
-                       rc = LS_FIRST_DOT_DE;
+                       if (dot_de && target->name[0] != '.') {
+                               CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
+                                      target->len, target->name,
+                                      namelen, name);
+                               continue;
+                       }
 
-               break;
-       }
-        EXIT;
+                       if (target->len != namelen ||
+                           memcmp(target->name, name, namelen) != 0)
+                               rc = LS_NONE_FIRST_DE;
+                       else if (!dot_de)
+                               rc = LS_FIRST_DE;
+                       else
+                               rc = LS_FIRST_DOT_DE;
 
-       if (page != NULL) {
-               kunmap(page);
-               page_cache_release(page);
+                       ll_release_page(dir, page, false);
+                       GOTO(out, rc);
+               }
+               pos = le64_to_cpu(dp->ldp_hash_end);
+               if (pos == MDS_DIR_END_OFF) {
+                       /*
+                        * End of directory reached.
+                        */
+                       ll_release_page(dir, page, false);
+                       GOTO(out, rc);
+               } else {
+                       /*
+                        * chain is exhausted
+                        * Normal case: continue to the next page.
+                        */
+                       ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
+                                             LDF_COLLIDE);
+                       page = ll_get_dir_page(dir, op_data, pos, &chain);
+               }
        }
-       ll_finish_md_op_data(op_data);
+       EXIT;
 out:
        ll_dir_chain_fini(&chain);
+       ll_finish_md_op_data(op_data);
         return rc;
 }
 
@@ -1546,6 +1604,11 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
                         RETURN(entry ? 1 : -EAGAIN);
                 }
 
+               /* if statahead is busy in readdir, help it do post-work */
+               while (!ll_sa_entry_stated(entry) &&
+                      sai->sai_in_readpage && !sa_received_empty(sai))
+                       ll_post_statahead(sai);
+
                 if (!ll_sa_entry_stated(entry)) {
                         sai->sai_index_wait = entry->se_index;
                         lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
index 8750215..4994986 100644 (file)
@@ -2348,187 +2348,296 @@ static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
 #endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
 
 /**
- * This function will read entry from a striped directory, bascially, it will
- * read entries from all of stripes, and choose one closest to the required
- * offset(&op_data->op_hash_offset). A few notes
- * 1. skip . and .. for non-zero stripes, because there can only have one .
- * and .. in a directory.
- * 2. op_data will be shared by all of stripes, instead of allocating new
- * one, so need to restore before reusing.
- * 3. release the entry page if that is not being chosen.
+ * Get current minimum entry from striped directory
  *
- * param[in]exp                obd export refer to LMV
- * param[in]op_data    hold those MD parameters of read_entry.
- * param[in]cb_op      ldlm callback being used in enqueue in mdc_read_entry
- * param[out]ldp       the entry being read.
- * param[out]ppage     the page holding the entry, note: because the entry
- *                      will be accessed in upper layer, so we need hold the
- *                      page until the usages of entry is finished, see
- *                      ll_dir_entry_next.
+ * This function will search the dir entry, whose hash value is the
+ * closest(>=) to @hash_offset, from all of sub-stripes, and it is
+ * only being called for striped directory.
  *
- * retval              =0 if get entry successfully
- *                      <0 can not get entry.
+ * \param[in] exp              export of LMV
+ * \param[in] op_data          parameters transferred beween client MD stack
+ *                              stripe_information will be included in this
+ *                              parameter
+ * \param[in] cb_op            ldlm callback being used in enqueue in
+ *                              mdc_read_page
+ * \param[in] hash_offset      the hash value, which is used to locate
+ *                              minum(closet) dir entry
+ * \param[in|out] stripe_offset the caller use this to indicate the stripe
+ *                              index of last entry, so to avoid hash conflict
+ *                              between stripes. It will also be used to
+ *                              return the stripe index of current dir entry.
+ * \param[in|out] entp         the minum entry and it also is being used
+ *                              to input the last dir entry to resolve the
+ *                              hash conflict
+ *
+ * \param[out] ppage           the page which holds the minum entry
+ *
+ * \retval                      = 0 get the entry successfully
+ *                              negative errno (< 0) does not get the entry
  */
-#define NORMAL_MAX_STRIPES 4
-static int lmv_read_striped_entry(struct obd_export *exp,
-                                 struct md_op_data *op_data,
-                                 struct md_callback *cb_op,
-                                 struct lu_dirent **ldp,
-                                 struct page **ppage)
+static int lmv_get_min_striped_entry(struct obd_export *exp,
+                                    struct md_op_data *op_data,
+                                    struct md_callback *cb_op,
+                                    __u64 hash_offset, int *stripe_offset,
+                                    struct lu_dirent **entp,
+                                    struct page **ppage)
 {
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
        struct lmv_stripe_md    *lsm = op_data->op_mea1;
        struct lmv_tgt_desc     *tgt;
-       struct lu_dirent        *tmp_ents[NORMAL_MAX_STRIPES];
-       struct lu_dirent        **ents = NULL;
-       struct lu_fid           master_fid = op_data->op_fid1;
-       void                    *master_data = op_data->op_data;
-       __u64                   last_idx = op_data->op_stripe_offset;
-       __u64                   hash_offset = op_data->op_hash_offset;
-       __u32                   same_hash_offset = op_data->op_same_hash_offset;
-       __u32                   cli_flags = op_data->op_cli_flags;
        int                     stripe_count;
-       __u64                   min_hash;
-       int                     min_same_hash_offset = 0;
-       int                     min_idx = 0;
+       struct lu_dirent        *min_ent = NULL;
        struct page             *min_page = NULL;
+       int                     min_idx = 0;
        int                     i;
-       int                     rc;
+       int                     rc = 0;
        ENTRY;
 
-       LASSERT(lsm != NULL);
-
-       rc = lmv_check_connect(obd);
-       if (rc)
-               RETURN(rc);
-
-       /* . and .. will be stored on the master object, so we need iterate
-        * the master object as well */
        stripe_count = lsm->lsm_md_stripe_count;
-       if (stripe_count > NORMAL_MAX_STRIPES) {
-               OBD_ALLOC(ents, sizeof(ents[0]) * stripe_count);
-               if (ents == NULL)
-                       GOTO(out, rc = -ENOMEM);
-       } else {
-               ents = tmp_ents;
-               memset(ents, 0, sizeof(ents[0]) * stripe_count);
-       }
-
-       min_hash = MDS_DIR_END_OFF;
        for (i = 0; i < stripe_count; i++) {
-               struct page *page = NULL;
+               struct lu_dirent        *ent = NULL;
+               struct page             *page = NULL;
+               struct lu_dirpage       *dp;
+               __u64                   stripe_hash = hash_offset;
 
                tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL);
                if (IS_ERR(tgt))
                        GOTO(out, rc = PTR_ERR(tgt));
 
-               if (last_idx != i)
-                       op_data->op_same_hash_offset = 0;
-               else
-                       op_data->op_same_hash_offset = same_hash_offset;
-
                /* op_data will be shared by each stripe, so we need
                 * reset these value for each stripe */
                op_data->op_stripe_offset = i;
-               op_data->op_hash_offset = hash_offset;
-               op_data->op_cli_flags = cli_flags;
                op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
                op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
                op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
-
 next:
-               rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, &ents[i],
-                                  &page);
+               rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash,
+                                 &page);
                if (rc != 0)
                        GOTO(out, rc);
 
-               if (ents[i] != NULL &&
-                   (strncmp(ents[i]->lde_name, ".",
-                            le16_to_cpu(ents[i]->lde_namelen)) == 0 ||
-                   strncmp(ents[i]->lde_name, "..",
-                           le16_to_cpu(ents[i]->lde_namelen)) == 0)) {
-                       if (i == 0) {
-                               /* replace . with master FID */
-                               if (le16_to_cpu(ents[i]->lde_namelen) == 1)
-                                       fid_cpu_to_le(&ents[i]->lde_fid,
-                                                     &master_fid);
-                               else
-                                       fid_cpu_to_le(&ents[i]->lde_fid,
-                                                     &op_data->op_fid3);
-                       } else {
-                               /* skip . and .. for other stripes */
-                               op_data->op_cli_flags |= CLI_NEXT_ENTRY;
-                               op_data->op_hash_offset =
-                                       le64_to_cpu(ents[i]->lde_hash);
-                               kunmap(page);
-                               page_cache_release(page);
+               dp = page_address(page);
+               for (ent = lu_dirent_start(dp); ent != NULL;
+                    ent = lu_dirent_next(ent)) {
+                       /* Skip dummy entry */
+                       if (le16_to_cpu(ent->lde_namelen) == 0)
+                               continue;
+
+                       if (le64_to_cpu(ent->lde_hash) < hash_offset)
+                               continue;
+
+                       if (le64_to_cpu(ent->lde_hash) == hash_offset &&
+                           (*entp == ent || i < *stripe_offset))
+                               continue;
+
+                       /* skip . and .. for other stripes */
+                       if (i != 0 &&
+                           (strncmp(ent->lde_name, ".",
+                                    le16_to_cpu(ent->lde_namelen)) == 0 ||
+                            strncmp(ent->lde_name, "..",
+                                    le16_to_cpu(ent->lde_namelen)) == 0))
+                               continue;
+                       break;
+               }
+
+               if (ent == NULL) {
+                       stripe_hash = le64_to_cpu(dp->ldp_hash_end);
+
+                       kunmap(page);
+                       page_cache_release(page);
+                       page = NULL;
+
+                       /* reach the end of current stripe, go to next stripe */
+                       if (stripe_hash == MDS_DIR_END_OFF)
+                               continue;
+                       else
                                goto next;
-                       }
                }
 
-               if (ents[i] != NULL) {
-                       /* If the hash value of read_entry is equal to the
-                        * current min_hash, which is very rare and only
-                        * happens if two entries have the same hash value
-                        * but on different stripes, in this case, we need
-                        * make sure these entries are being reading forward,
-                        * not backward, i.e. only reset the min_entry, if
-                        * current stripe is ahead of last entry. Note: if
-                        * there are hash conflict inside the entry, MDC
-                        * (see mdc_read_entry) will resolve them. */
-                       if (le64_to_cpu(ents[i]->lde_hash) < min_hash ||
-                           (le64_to_cpu(ents[i]->lde_hash) == min_hash &&
-                            i >= last_idx)) {
-                               if (min_page != NULL) {
-                                       kunmap(min_page);
-                                       page_cache_release(min_page);
-                               }
-                               min_page = page;
-                               min_hash = le64_to_cpu(ents[i]->lde_hash);
-                               min_same_hash_offset =
-                                          op_data->op_same_hash_offset;
+               if (min_ent != NULL) {
+                       if (le64_to_cpu(min_ent->lde_hash) >
+                           le64_to_cpu(ent->lde_hash)) {
+                               min_ent = ent;
+                               kunmap(min_page);
+                               page_cache_release(min_page);
                                min_idx = i;
+                               min_page = page;
                        } else {
                                kunmap(page);
                                page_cache_release(page);
+                               page = NULL;
                        }
+               } else {
+                       min_ent = ent;
+                       min_page = page;
+                       min_idx = i;
                }
        }
 
-       if (min_hash != MDS_DIR_END_OFF) {
-               *ldp = ents[min_idx];
-               op_data->op_stripe_offset = min_idx;
-               op_data->op_same_hash_offset = min_same_hash_offset;
-               *ppage = min_page;
-       } else {
-               *ldp = NULL;
-               *ppage = NULL;
+out:
+       if (*ppage != NULL) {
+               kunmap(*ppage);
+               page_cache_release(*ppage);
        }
+       *stripe_offset = min_idx;
+       *entp = min_ent;
+       *ppage = min_page;
+       RETURN(rc);
+}
+
+/**
+ * Build dir entry page from a striped directory
+ *
+ * This function gets one entry by @offset from a striped directory. It will
+ * read entries from all of stripes, and choose one closest to the required
+ * offset(&offset). A few notes
+ * 1. skip . and .. for non-zero stripes, because there can only have one .
+ * and .. in a directory.
+ * 2. op_data will be shared by all of stripes, instead of allocating new
+ * one, so need to restore before reusing.
+ * 3. release the entry page if that is not being chosen.
+ *
+ * \param[in] exp      obd export refer to LMV
+ * \param[in] op_data  hold those MD parameters of read_entry
+ * \param[in] cb_op    ldlm callback being used in enqueue in mdc_read_entry
+ * \param[out] ldp     the entry being read
+ * \param[out] ppage   the page holding the entry. Note: because the entry
+ *                      will be accessed in upper layer, so we need hold the
+ *                      page until the usages of entry is finished, see
+ *                      ll_dir_entry_next.
+ *
+ * retval              =0 if get entry successfully
+ *                      <0 cannot get entry
+ */
+static int lmv_read_striped_page(struct obd_export *exp,
+                                struct md_op_data *op_data,
+                                struct md_callback *cb_op,
+                                __u64 offset, struct page **ppage)
+{
+       struct obd_device       *obd = exp->exp_obd;
+       struct lu_fid           master_fid = op_data->op_fid1;
+       struct inode            *master_inode = op_data->op_data;
+       __u64                   hash_offset = offset;
+       struct lu_dirpage       *dp;
+       struct page             *min_ent_page = NULL;
+       struct page             *ent_page = NULL;
+       struct lu_dirent        *ent;
+       void                    *area;
+       int                     ent_idx = 0;
+       struct lu_dirent        *min_ent = NULL;
+       struct lu_dirent        *last_ent;
+       int                     left_bytes;
+       int                     rc;
+       ENTRY;
+
+       rc = lmv_check_connect(obd);
+       if (rc)
+               RETURN(rc);
+
+       /* Allocate a page and read entries from all of stripes and fill
+        * the page by hash order */
+       ent_page = alloc_page(GFP_KERNEL);
+       if (ent_page == NULL)
+               RETURN(-ENOMEM);
+
+       /* Initialize the entry page */
+       dp = kmap(ent_page);
+       memset(dp, 0, sizeof(*dp));
+       dp->ldp_hash_start = cpu_to_le64(offset);
+       dp->ldp_flags |= LDF_COLLIDE;
+
+       area = dp + 1;
+       left_bytes = PAGE_CACHE_SIZE - sizeof(*dp);
+       ent = area;
+       last_ent = ent;
+       do {
+               __u16   ent_size;
+
+               /* Find the minum entry from all sub-stripes */
+               rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset,
+                                              &ent_idx, &min_ent,
+                                              &min_ent_page);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               /* If it can not get minum entry, it means it already reaches
+                * the end of this directory */
+               if (min_ent == NULL) {
+                       last_ent->lde_reclen = 0;
+                       hash_offset = MDS_DIR_END_OFF;
+                       GOTO(out, rc);
+               }
+
+               ent_size = le16_to_cpu(min_ent->lde_reclen);
+
+               /* the last entry lde_reclen is 0, but it might not
+                * the end of this entry of this temporay entry */
+               if (ent_size == 0)
+                       ent_size = lu_dirent_calc_size(
+                                       le16_to_cpu(min_ent->lde_namelen),
+                                       le32_to_cpu(min_ent->lde_attrs));
+               if (ent_size > left_bytes) {
+                       last_ent->lde_reclen = cpu_to_le16(0);
+                       hash_offset = le64_to_cpu(min_ent->lde_hash);
+                       GOTO(out, rc);
+               }
+
+               memcpy(ent, min_ent, ent_size);
+
+               /* Replace . with master FID and Replace .. with the parent FID
+                * of master object */
+               if (strncmp(ent->lde_name, ".",
+                           le16_to_cpu(ent->lde_namelen)) == 0 &&
+                   le16_to_cpu(ent->lde_namelen) == 1)
+                       fid_cpu_to_le(&ent->lde_fid, &master_fid);
+               else if (strncmp(ent->lde_name, "..",
+                                  le16_to_cpu(ent->lde_namelen)) == 0 &&
+                          le16_to_cpu(ent->lde_namelen) == 2)
+                       fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3);
+
+               left_bytes -= ent_size;
+               ent->lde_reclen = cpu_to_le16(ent_size);
+               last_ent = ent;
+               ent = (void *)ent + ent_size;
+               hash_offset = le64_to_cpu(min_ent->lde_hash);
+               if (hash_offset == MDS_DIR_END_OFF) {
+                       last_ent->lde_reclen = 0;
+                       break;
+               }
+       } while (1);
 out:
+       if (min_ent_page != NULL) {
+               kunmap(min_ent_page);
+               page_cache_release(min_ent_page);
+       }
+
+       if (unlikely(rc != 0)) {
+               __free_page(ent_page);
+               ent_page = NULL;
+       } else {
+               if (ent == area)
+                       dp->ldp_flags |= LDF_EMPTY;
+               dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+               dp->ldp_hash_end = cpu_to_le64(hash_offset);
+       }
+
        /* We do not want to allocate md_op_data during each
         * dir entry reading, so op_data will be shared by every stripe,
         * then we need to restore it back to original value before
         * return to the upper layer */
-       op_data->op_hash_offset = hash_offset;
        op_data->op_fid1 = master_fid;
        op_data->op_fid2 = master_fid;
-       op_data->op_data = master_data;
-       op_data->op_cli_flags = cli_flags;
-       if (stripe_count > NORMAL_MAX_STRIPES && ents != NULL)
-               OBD_FREE(ents, sizeof(ents[0]) * stripe_count);
+       op_data->op_data = master_inode;
 
-       if (rc != 0 && min_page != NULL) {
-               kunmap(min_page);
-               page_cache_release(min_page);
-       }
+       *ppage = ent_page;
 
        RETURN(rc);
 }
 
-int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data,
-                  struct md_callback *cb_op, struct lu_dirent **ldp,
-                  struct page **ppage)
+int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data,
+                 struct md_callback *cb_op, __u64 offset,
+                 struct page **ppage)
 {
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
@@ -2542,8 +2651,7 @@ int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data,
                RETURN(rc);
 
        if (unlikely(lsm != NULL)) {
-               rc = lmv_read_striped_entry(exp, op_data, cb_op,
-                                           ldp, ppage);
+               rc = lmv_read_striped_page(exp, op_data, cb_op, offset, ppage);
                RETURN(rc);
        }
 
@@ -2551,8 +2659,8 @@ int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data,
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
 
-       rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, ldp,
-                          ppage);
+       rc = md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage);
+
        RETURN(rc);
 }
 
@@ -3474,7 +3582,7 @@ struct md_ops lmv_md_ops = {
         .m_setattr              = lmv_setattr,
         .m_setxattr             = lmv_setxattr,
        .m_fsync                = lmv_fsync,
-       .m_read_entry           = lmv_read_entry,
+       .m_read_page            = lmv_read_page,
         .m_unlink               = lmv_unlink,
         .m_init_ea_size         = lmv_init_ea_size,
         .m_cancel_unused        = lmv_cancel_unused,
index 37f9032..f9e2b09 100644 (file)
@@ -1221,8 +1221,8 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
                                /*
                                 * upon hash collision, remove this page,
                                 * otherwise put page reference, and
-                                * ll_get_dir_page() will issue RPC to fetch
-                                * the page we want.
+                                * mdc_read_page_remote() will issue RPC to
+                                * fetch the page we want.
                                 */
                                kunmap(page);
                                mdc_release_page(page,
@@ -1380,15 +1380,8 @@ static int mdc_read_page_remote(void *data, struct page *page0)
        ENTRY;
 
        LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES);
-       if (op_data->op_mea1 != NULL) {
-               __u32 index = op_data->op_stripe_offset;
-
-               inode = op_data->op_mea1->lsm_md_oinfo[index].lmo_root;
-               fid = &op_data->op_mea1->lsm_md_oinfo[index].lmo_fid;
-       } else {
-               inode = op_data->op_data;
-               fid = &op_data->op_fid1;
-       }
+       inode = op_data->op_data;
+       fid = &op_data->op_fid1;
        LASSERT(inode != NULL);
 
        OBD_ALLOC(page_pool, sizeof(page_pool[0]) * max_pages);
@@ -1468,9 +1461,21 @@ static int mdc_read_page_remote(void *data, struct page *page0)
 /**
  * Read dir page from cache first, if it can not find it, read it from
  * server and add into the cache.
+ *
+ * \param[in] exp      MDC export
+ * \param[in] op_data  client MD stack parameters, transfering parameters
+ *                      between different layers on client MD stack.
+ * \param[in] cb_op    callback required for ldlm lock enqueue during
+ *                      read page
+ * \param[in] hash_offset the hash offset of the page to be read
+ * \param[in] ppage    the page to be read
+ *
+ * retval              = 0 get the page successfully
+ *                      errno(<0) get the page failed
  */
 static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
-                        struct md_callback *cb_op, struct page **ppage)
+                        struct md_callback *cb_op, __u64 hash_offset,
+                        struct page **ppage)
 {
        struct lookup_intent    it = { .it_op = IT_READDIR };
        struct page             *page;
@@ -1505,7 +1510,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
        rc = 0;
        mdc_set_lock_data(exp, &it.d.lustre.it_lock_handle, dir, NULL);
 
-       rp_param.rp_off = op_data->op_hash_offset;
+       rp_param.rp_off = hash_offset;
        rp_param.rp_hash64 = op_data->op_cli_flags & CLI_HASH64;
        page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end,
                               rp_param.rp_hash64);
@@ -1567,11 +1572,11 @@ hash_collision:
        if (BITS_PER_LONG == 32 && rp_param.rp_hash64) {
                start = le64_to_cpu(dp->ldp_hash_start) >> 32;
                end   = le64_to_cpu(dp->ldp_hash_end) >> 32;
-               rp_param.rp_off = op_data->op_hash_offset >> 32;
+               rp_param.rp_off = hash_offset >> 32;
        } else {
                start = le64_to_cpu(dp->ldp_hash_start);
                end   = le64_to_cpu(dp->ldp_hash_end);
-               rp_param.rp_off = op_data->op_hash_offset;
+               rp_param.rp_off = hash_offset;
        }
        if (end == start) {
                LASSERT(start == rp_param.rp_off);
@@ -1579,7 +1584,7 @@ hash_collision:
 #if BITS_PER_LONG == 32
                CWARN("Real page-wide hash collision at ["LPU64" "LPU64"] with "
                      "hash "LPU64"\n", le64_to_cpu(dp->ldp_hash_start),
-                     le64_to_cpu(dp->ldp_hash_end), op_data->op_hash_offset);
+                     le64_to_cpu(dp->ldp_hash_end), hash_offset);
 #endif
 
                /*
@@ -1602,102 +1607,6 @@ fail:
        goto out_unlock;
 }
 
-/**
- * Read one directory entry from the cache.
- */
-int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
-                  struct md_callback *cb_op, struct lu_dirent **entp,
-                  struct page **ppage)
-{
-       struct page             *page = NULL;
-       struct lu_dirpage       *dp;
-       struct lu_dirent        *ent;
-       int                     rc = 0;
-       __u32                   same_hash_count;
-       __u64                   hash_offset = op_data->op_hash_offset;
-       ENTRY;
-
-       CDEBUG(D_INFO, DFID " offset = "LPU64", flags %#x\n",
-              PFID(&op_data->op_fid1), op_data->op_hash_offset,
-              op_data->op_cli_flags);
-
-       *ppage = NULL;
-       *entp = NULL;
-
-       if (op_data->op_hash_offset == MDS_DIR_END_OFF)
-               RETURN(0);
-
-       rc = mdc_read_page(exp, op_data, cb_op, &page);
-       if (rc != 0)
-               RETURN(rc);
-
-       /* same_hash_count means how many entries with this
-        * hash value has been read */
-       same_hash_count = op_data->op_same_hash_offset + 1;
-       dp = page_address(page);
-       for (ent = lu_dirent_start(dp); ent != NULL;
-            ent = lu_dirent_next(ent)) {
-               /* Skip dummy entry */
-               if (le16_to_cpu(ent->lde_namelen) == 0)
-                       continue;
-
-               if (le64_to_cpu(ent->lde_hash) <
-                               op_data->op_hash_offset)
-                       continue;
-
-               if (unlikely(le64_to_cpu(ent->lde_hash) ==
-                               op_data->op_hash_offset)) {
-                       /* If it is not for next entry, which usually from
-                        * ll_dir_entry_start, return this entry. */
-                       if (!(op_data->op_cli_flags & CLI_NEXT_ENTRY))
-                               break;
-
-                       /* Keep reading until all of entries being read are
-                        * skipped. */
-                       if (same_hash_count > 0) {
-                               same_hash_count--;
-                               continue;
-                       }
-               }
-               break;
-       }
-
-       /* If it can not find entry in current page, try next page. */
-       if (ent == NULL) {
-               if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) {
-                       op_data->op_same_hash_offset = 0;
-                       mdc_release_page(page,
-                                le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
-                       RETURN(0);
-               }
-
-               op_data->op_hash_offset = le64_to_cpu(dp->ldp_hash_end);
-               mdc_release_page(page,
-                                le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
-               rc = mdc_read_page(exp, op_data, cb_op, &page);
-               if (rc != 0)
-                       RETURN(rc);
-
-               if (page != NULL) {
-                       dp = page_address(page);
-                       ent = lu_dirent_start(dp);
-               }
-       }
-
-       /* If the next hash is the same as the current hash, increase
-        * the op_same_hash_offset to resolve the same hash conflict */
-       if (ent != NULL && op_data->op_cli_flags & CLI_NEXT_ENTRY) {
-               if (unlikely(le64_to_cpu(ent->lde_hash) == hash_offset))
-                       op_data->op_same_hash_offset++;
-               else
-                       op_data->op_same_hash_offset = 0;
-       }
-
-       *ppage = page;
-       *entp = ent;
-       RETURN(rc);
-}
-
 #else /* __KERNEL__ */
 
 static struct page
@@ -1725,7 +1634,7 @@ static struct page
 
 
 static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
-                       struct md_callback *cb_op,
+                       struct md_callback *cb_op, __u64 hash_offset,
                        struct page **ppage)
 {
        struct page *page;
@@ -1734,7 +1643,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
 
        /* No local cache for liblustre, always read entry remotely */
        lmo = &op_data->op_mea1->lsm_md_oinfo[op_data->op_stripe_offset];
-       page = mdc_read_page_remote(exp, lmo, op_data->op_hash_offset,
+       page = mdc_read_page_remote(exp, lmo, hash_offset,
                                    op_data->op_capa1);
        if (IS_ERR(page))
                return PTR_ERR(page);
@@ -1744,35 +1653,6 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
        return rc;
 }
 
-int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
-                  struct md_callback *cb_op, struct lu_dirent **entp,
-                  struct page **ppage)
-{
-       struct page             *page = NULL;
-       struct lu_dirpage       *dp;
-       struct lu_dirent        *ent;
-       int                     rc;
-       ENTRY;
-
-       rc = mdc_read_page(exp, op_data, cb_op, &page);
-       if (rc != 0)
-               RETURN(rc);
-
-       dp = page_address(page);
-       if (le64_to_cpu(dp->ldp_hash_end) < op_data->op_hash_offset)
-               GOTO(out, *entp = NULL);
-
-       for (ent = lu_dirent_start(dp); ent != NULL;
-            ent = lu_dirent_next(ent))
-               if (le64_to_cpu(ent->lde_hash) >= op_data->op_hash_offset)
-                       break;
-       *entp = ent;
-out:
-
-       OBD_PAGE_FREE(page);
-       RETURN(rc);
-}
-
 #endif
 
 static int mdc_statfs(const struct lu_env *env,
@@ -3318,7 +3198,7 @@ struct md_ops mdc_md_ops = {
         .m_setxattr         = mdc_setxattr,
         .m_getxattr         = mdc_getxattr,
        .m_fsync                = mdc_fsync,
-       .m_read_entry           = mdc_read_entry,
+       .m_read_page            = mdc_read_page,
         .m_unlink           = mdc_unlink,
         .m_cancel_unused    = mdc_cancel_unused,
         .m_init_ea_size     = mdc_init_ea_size,
index 3be5dfd..25f66b1 100644 (file)
@@ -2136,7 +2136,7 @@ void lprocfs_init_mps_stats(int num_private_stats, struct lprocfs_stats *stats)
         LPROCFS_MD_OP_INIT(num_private_stats, stats, rename);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, setattr);
        LPROCFS_MD_OP_INIT(num_private_stats, stats, fsync);
-       LPROCFS_MD_OP_INIT(num_private_stats, stats, read_entry);
+       LPROCFS_MD_OP_INIT(num_private_stats, stats, read_page);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, unlink);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, setxattr);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, getxattr);