Whamcloud - gitweb
LU-4603 lmv: a few fixes about readdir of striped dir. 91/9191/29
authorwang di <di.wang@intel.com>
Mon, 31 Mar 2014 14:45:46 +0000 (07:45 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 21 May 2014 16:59:44 +0000 (16:59 +0000)
The offset of the directory should be set to EOF as soon as
ll_dir_read() has nothing left to read. Otherwise, "ls"
on an NFS client will return -ELOOP.

Also, ll_dir_entry_start() should be able to start reading
with the entry located at the given offset itself. Otherwise, some
files might be missing when running "ls" on an NFS client.

Skip "." and ".." on slave stripes.

If the user buffer is full in ll_dir_read(), remember
the last entry that could not be filled in op_hash_offset, so that
the next read will locate the entry by >= hash_offset instead
of > hash_offset.

op_same_hash_offset is used to remember the offset of an entry among
the entries that share the same hash, so that mdc_read_entry() can tell
which entry the caller needs.

Signed-off-by: Di Wang <di.wang@intel.com>
Signed-off-by: Li Xi <lixi@ddn.com>
Change-Id: I93f94836ce59f523794581301f3bc3c491c956a5
Reviewed-on: http://review.whamcloud.com/9191
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Li Xi <pkuelelixi@gmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lclient.h
lustre/include/obd.h
lustre/llite/dir.c
lustre/llite/llite_internal.h
lustre/llite/llite_nfs.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_request.c
lustre/tests/sanity.sh

index e483798..4f42eea 100644 (file)
@@ -481,6 +481,7 @@ enum op_cli_flags {
        CLI_HASH64      = 1 << 2,
        CLI_API32       = 1 << 3,
        CLI_MIGRATE     = 1 << 4,
        CLI_HASH64      = 1 << 2,
        CLI_API32       = 1 << 3,
        CLI_MIGRATE     = 1 << 4,
+       CLI_NEXT_ENTRY  = 1 << 5,
 };
 
 #endif /*LCLIENT_H */
 };
 
 #endif /*LCLIENT_H */
index 3abcc4d..94f9d8c 100644 (file)
@@ -984,6 +984,10 @@ struct md_op_data {
        /* Used by readdir */
        __u64                   op_hash_offset;
 
        /* Used by readdir */
        __u64                   op_hash_offset;
 
+       /* The offset of dir entry among the same hash entries, which
+        * is used to resolve the hash conflict. */
+       __u32                   op_same_hash_offset;
+
        /* Used by readdir */
        __u32                   op_npages;
 
        /* Used by readdir */
        __u32                   op_npages;
 
index 04c1da6..22c4a42 100644 (file)
@@ -162,16 +162,18 @@ struct lu_dirent *ll_dir_entry_start(struct inode *dir,
                                     struct md_op_data *op_data,
                                     struct page **ppage)
 {
                                     struct md_op_data *op_data,
                                     struct page **ppage)
 {
-       struct lu_dirent *entry;
+       struct lu_dirent *entry = NULL;
        struct md_callback cb_op;
        int rc;
        struct md_callback cb_op;
        int rc;
+       ENTRY;
 
        LASSERT(*ppage == NULL);
        cb_op.md_blocking_ast = ll_md_blocking_ast;
 
        LASSERT(*ppage == NULL);
        cb_op.md_blocking_ast = ll_md_blocking_ast;
+       op_data->op_cli_flags &= ~CLI_NEXT_ENTRY;
        rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry, ppage);
        if (rc != 0)
                entry = ERR_PTR(rc);
        rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry, ppage);
        if (rc != 0)
                entry = ERR_PTR(rc);
-       return entry;
+       RETURN(entry);
 }
 
 struct lu_dirent *ll_dir_entry_next(struct inode *dir,
 }
 
 struct lu_dirent *ll_dir_entry_next(struct inode *dir,
@@ -179,20 +181,25 @@ struct lu_dirent *ll_dir_entry_next(struct inode *dir,
                                    struct lu_dirent *ent,
                                    struct page **ppage)
 {
                                    struct lu_dirent *ent,
                                    struct page **ppage)
 {
-       struct lu_dirent *entry;
+       struct lu_dirent *entry = NULL;
        struct md_callback cb_op;
        int rc;
        struct md_callback cb_op;
        int rc;
+       ENTRY;
 
 
-       LASSERT(*ppage != NULL);
-       cb_op.md_blocking_ast = ll_md_blocking_ast;
        op_data->op_hash_offset = le64_to_cpu(ent->lde_hash);
        op_data->op_hash_offset = le64_to_cpu(ent->lde_hash);
+
+       /* release last page */
+       LASSERT(*ppage != NULL);
        kunmap(*ppage);
        page_cache_release(*ppage);
        kunmap(*ppage);
        page_cache_release(*ppage);
-       *ppage = NULL;
+
+       cb_op.md_blocking_ast = ll_md_blocking_ast;
+       op_data->op_cli_flags |= CLI_NEXT_ENTRY;
        rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry, ppage);
        if (rc != 0)
                entry = ERR_PTR(rc);
        rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry, ppage);
        if (rc != 0)
                entry = ERR_PTR(rc);
-       return entry;
+
+       RETURN(entry);
 }
 
 #ifdef HAVE_DIR_CONTEXT
 }
 
 #ifdef HAVE_DIR_CONTEXT
@@ -212,7 +219,6 @@ int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
        int                     done = 0;
        int                     rc = 0;
        __u64                   hash = MDS_DIR_END_OFF;
        int                     done = 0;
        int                     rc = 0;
        __u64                   hash = MDS_DIR_END_OFF;
-       __u64                   last_hash = MDS_DIR_END_OFF;
        struct page             *page = NULL;
        ENTRY;
 
        struct page             *page = NULL;
        ENTRY;
 
@@ -260,15 +266,15 @@ int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
 #endif
                if (done) {
                        if (op_data->op_hash_offset != MDS_DIR_END_OFF)
 #endif
                if (done) {
                        if (op_data->op_hash_offset != MDS_DIR_END_OFF)
-                               op_data->op_hash_offset = last_hash;
+                               op_data->op_hash_offset = hash;
                        break;
                        break;
-               } else {
-                       last_hash = hash;
                }
        }
 
        if (IS_ERR(ent))
                rc = PTR_ERR(ent);
                }
        }
 
        if (IS_ERR(ent))
                rc = PTR_ERR(ent);
+       else if (ent == NULL)
+               op_data->op_hash_offset = MDS_DIR_END_OFF;
 
        if (page != NULL) {
                kunmap(page);
 
        if (page != NULL) {
                kunmap(page);
@@ -315,6 +321,27 @@ static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
        if (IS_ERR(op_data))
                GOTO(out, rc = PTR_ERR(op_data));
 
        if (IS_ERR(op_data))
                GOTO(out, rc = PTR_ERR(op_data));
 
+       if (unlikely(op_data->op_mea1 != NULL)) {
+               /* This is only needed for striped dir to fill ..,
+                * see lmv_read_entry */
+               if (filp->f_dentry->d_parent != NULL &&
+                   filp->f_dentry->d_parent->d_inode != NULL) {
+                       __u64 ibits = MDS_INODELOCK_UPDATE;
+                       struct inode *parent =
+                               filp->f_dentry->d_parent->d_inode;
+
+                       if (ll_have_md_lock(parent, &ibits, LCK_MINMODE))
+                               op_data->op_fid3 = *ll_inode2fid(parent);
+               }
+
+               /* If it can not find in cache, do lookup .. on the master
+                * object */
+               if (fid_is_zero(&op_data->op_fid3)) {
+                       rc = ll_dir_get_parent_fid(inode, &op_data->op_fid3);
+                       if (rc != 0)
+                               RETURN(rc);
+               }
+       }
        op_data->op_hash_offset = pos;
        op_data->op_max_pages = sbi->ll_md_brw_pages;
 #ifdef HAVE_DIR_CONTEXT
        op_data->op_hash_offset = pos;
        op_data->op_max_pages = sbi->ll_md_brw_pages;
 #ifdef HAVE_DIR_CONTEXT
index dc3b644..cb6a254 100644 (file)
@@ -897,6 +897,7 @@ extern struct export_operations lustre_export_operations;
 __u32 get_uuid2int(const char *name, int len);
 struct inode *search_inode_for_lustre(struct super_block *sb,
                                      const struct lu_fid *fid);
 __u32 get_uuid2int(const char *name, int len);
 struct inode *search_inode_for_lustre(struct super_block *sb,
                                      const struct lu_fid *fid);
+int ll_dir_get_parent_fid(struct inode *dir, struct lu_fid *parent_fid);
 
 /* llite/symlink.c */
 extern struct inode_operations ll_fast_symlink_inode_operations;
 
 /* llite/symlink.c */
 extern struct inode_operations ll_fast_symlink_inode_operations;
index d942d90..3dd1183 100644 (file)
@@ -292,22 +292,20 @@ static struct dentry *ll_fh_to_parent(struct super_block *sb, struct fid *fid,
         RETURN(ll_iget_for_nfs(sb, &nfs_fid->lnf_parent, NULL));
 }
 
         RETURN(ll_iget_for_nfs(sb, &nfs_fid->lnf_parent, NULL));
 }
 
-static struct dentry *ll_get_parent(struct dentry *dchild)
+int ll_dir_get_parent_fid(struct inode *dir, struct lu_fid *parent_fid)
 {
 {
-        struct ptlrpc_request *req = NULL;
-        struct inode          *dir = dchild->d_inode;
-        struct ll_sb_info     *sbi;
-        struct dentry         *result = NULL;
-        struct mdt_body       *body;
-        static char           dotdot[] = "..";
-        struct md_op_data     *op_data;
-        int                   rc;
-       int                   lmmsize;
-        ENTRY;
+       struct ptlrpc_request   *req = NULL;
+       struct ll_sb_info       *sbi;
+       struct mdt_body         *body;
+       static const char       dotdot[] = "..";
+       struct md_op_data       *op_data;
+       int                     rc;
+       int                     lmmsize;
+       ENTRY;
 
 
-        LASSERT(dir && S_ISDIR(dir->i_mode));
+       LASSERT(dir && S_ISDIR(dir->i_mode));
 
 
-        sbi = ll_s2sbi(dir->i_sb);
+       sbi = ll_s2sbi(dir->i_sb);
 
        CDEBUG(D_INFO, "%s: getting parent for ("DFID")\n",
               ll_get_fsname(dir->i_sb, NULL, 0),
 
        CDEBUG(D_INFO, "%s: getting parent for ("DFID")\n",
               ll_get_fsname(dir->i_sb, NULL, 0),
@@ -315,23 +313,24 @@ static struct dentry *ll_get_parent(struct dentry *dchild)
 
        rc = ll_get_default_mdsize(sbi, &lmmsize);
        if (rc != 0)
 
        rc = ll_get_default_mdsize(sbi, &lmmsize);
        if (rc != 0)
-               RETURN(ERR_PTR(rc));
+               RETURN(rc);
 
        op_data = ll_prep_md_op_data(NULL, dir, NULL, dotdot,
                                     strlen(dotdot), lmmsize,
                                     LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
 
        op_data = ll_prep_md_op_data(NULL, dir, NULL, dotdot,
                                     strlen(dotdot), lmmsize,
                                     LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
-               RETURN((void *)op_data);
+               RETURN(PTR_ERR(op_data));
 
        rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
        ll_finish_md_op_data(op_data);
 
        rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
        ll_finish_md_op_data(op_data);
-       if (rc) {
+       if (rc != 0) {
                CERROR("%s: failure inode "DFID" get parent: rc = %d\n",
                       ll_get_fsname(dir->i_sb, NULL, 0),
                       PFID(ll_inode2fid(dir)), rc);
                CERROR("%s: failure inode "DFID" get parent: rc = %d\n",
                       ll_get_fsname(dir->i_sb, NULL, 0),
                       PFID(ll_inode2fid(dir)), rc);
-               RETURN(ERR_PTR(rc));
+               RETURN(rc);
        }
        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
        }
        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+
        /*
         * LU-3952: MDT may lost the FID of its parent, we should not crash
         * the NFS server, ll_iget_for_nfs() will handle the error.
        /*
         * LU-3952: MDT may lost the FID of its parent, we should not crash
         * the NFS server, ll_iget_for_nfs() will handle the error.
@@ -339,11 +338,27 @@ static struct dentry *ll_get_parent(struct dentry *dchild)
        if (body->valid & OBD_MD_FLID) {
                CDEBUG(D_INFO, "parent for "DFID" is "DFID"\n",
                       PFID(ll_inode2fid(dir)), PFID(&body->fid1));
        if (body->valid & OBD_MD_FLID) {
                CDEBUG(D_INFO, "parent for "DFID" is "DFID"\n",
                       PFID(ll_inode2fid(dir)), PFID(&body->fid1));
+               *parent_fid = body->fid1;
        }
        }
-       result = ll_iget_for_nfs(dir->i_sb, &body->fid1, NULL);
 
        ptlrpc_req_finished(req);
 
        ptlrpc_req_finished(req);
-       RETURN(result);
+       RETURN(0);
+}
+
+static struct dentry *ll_get_parent(struct dentry *dchild)
+{
+       struct lu_fid   parent_fid = { 0 };
+       int             rc;
+       struct dentry   *dentry;
+       ENTRY;
+
+       rc = ll_dir_get_parent_fid(dchild->d_inode, &parent_fid);
+       if (rc != 0)
+               RETURN(ERR_PTR(rc));
+
+       dentry = ll_iget_for_nfs(dchild->d_inode->i_sb, &parent_fid, NULL);
+
+       RETURN(dentry);
 }
 
 struct export_operations lustre_export_operations = {
 }
 
 struct export_operations lustre_export_operations = {
index c18a68b..8a7395f 100644 (file)
@@ -2331,33 +2331,65 @@ static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
 #define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
 #endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
 
 #define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
 #endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
 
+/**
+ * This function reads an entry from a striped directory. Basically, it
+ * reads entries from all of the stripes and chooses the one closest to the
+ * required offset (&op_data->op_hash_offset). A few notes:
+ * 1. skip . and .. for non-zero stripes, because there can be only one .
+ * and .. in a directory.
+ * 2. op_data will be shared by all of stripes, instead of allocating new
+ * one, so need to restore before reusing.
+ * 3. release the entry page if that is not being chosen.
+ *
+ * param[in]exp                obd export refer to LMV
+ * param[in]op_data    hold those MD parameters of read_entry.
+ * param[in]cb_op      ldlm callback being used in enqueue in mdc_read_entry
+ * param[out]ldp       the entry being read.
+ * param[out]ppage     the page holding the entry, note: because the entry
+ *                      will be accessed in upper layer, so we need hold the
+ *                      page until the usages of entry is finished, see
+ *                      ll_dir_entry_next.
+ *
+ * retval              =0 if get entry successfully
+ *                      <0 can not get entry.
+ */
 #define NORMAL_MAX_STRIPES 4
 #define NORMAL_MAX_STRIPES 4
-int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data,
-                  struct md_callback *cb_op, struct lu_dirent **ldp,
-                  struct page **ppage)
+static int lmv_read_striped_entry(struct obd_export *exp,
+                                 struct md_op_data *op_data,
+                                 struct md_callback *cb_op,
+                                 struct lu_dirent **ldp,
+                                 struct page **ppage)
 {
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
        struct lmv_stripe_md    *lsm = op_data->op_mea1;
 {
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
        struct lmv_stripe_md    *lsm = op_data->op_mea1;
+       struct lmv_tgt_desc     *tgt;
        struct lu_dirent        *tmp_ents[NORMAL_MAX_STRIPES];
        struct lu_dirent        **ents = NULL;
        struct lu_dirent        *tmp_ents[NORMAL_MAX_STRIPES];
        struct lu_dirent        **ents = NULL;
+       struct lu_fid           master_fid = op_data->op_fid1;
+       void                    *master_data = op_data->op_data;
+       __u64                   last_idx = op_data->op_stripe_offset;
+       __u64                   hash_offset = op_data->op_hash_offset;
+       __u32                   same_hash_offset = op_data->op_same_hash_offset;
+       __u32                   cli_flags = op_data->op_cli_flags;
        int                     stripe_count;
        __u64                   min_hash;
        int                     stripe_count;
        __u64                   min_hash;
+       int                     min_same_hash_offset = 0;
        int                     min_idx = 0;
        struct page             *min_page = NULL;
        int                     i;
        int                     rc;
        ENTRY;
 
        int                     min_idx = 0;
        struct page             *min_page = NULL;
        int                     i;
        int                     rc;
        ENTRY;
 
+       LASSERT(lsm != NULL);
+
        rc = lmv_check_connect(obd);
        if (rc)
                RETURN(rc);
 
        rc = lmv_check_connect(obd);
        if (rc)
                RETURN(rc);
 
-       if (lsm == NULL)
-               stripe_count = 1;
-       else
-               stripe_count = lsm->lsm_md_stripe_count;
-
+       /* . and .. will be stored on the master object, so we need iterate
+        * the master object as well */
+       stripe_count = lsm->lsm_md_stripe_count;
        if (stripe_count > NORMAL_MAX_STRIPES) {
                OBD_ALLOC(ents, sizeof(ents[0]) * stripe_count);
                if (ents == NULL)
        if (stripe_count > NORMAL_MAX_STRIPES) {
                OBD_ALLOC(ents, sizeof(ents[0]) * stripe_count);
                if (ents == NULL)
@@ -2369,56 +2401,145 @@ int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data,
 
        min_hash = MDS_DIR_END_OFF;
        for (i = 0; i < stripe_count; i++) {
 
        min_hash = MDS_DIR_END_OFF;
        for (i = 0; i < stripe_count; i++) {
-               struct lmv_tgt_desc *tgt;
                struct page *page = NULL;
 
                struct page *page = NULL;
 
-               if (likely(lsm == NULL)) {
-                       tgt = lmv_find_target(lmv, &op_data->op_fid1);
-                       if (IS_ERR(tgt))
-                               GOTO(out, rc = PTR_ERR(tgt));
-                       LASSERT(op_data->op_data != NULL);
-               } else {
-                       tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds);
-                       if (IS_ERR(tgt))
-                               GOTO(out, rc = PTR_ERR(tgt));
-                       op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
-                       op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
-                       op_data->op_stripe_offset = i;
-               }
+               tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds);
+               if (IS_ERR(tgt))
+                       GOTO(out, rc = PTR_ERR(tgt));
 
 
+               if (last_idx != i)
+                       op_data->op_same_hash_offset = 0;
+               else
+                       op_data->op_same_hash_offset = same_hash_offset;
+
+               /* op_data will be shared by each stripe, so we need
+                * reset these value for each stripe */
+               op_data->op_stripe_offset = i;
+               op_data->op_hash_offset = hash_offset;
+               op_data->op_cli_flags = cli_flags;
+               op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
+               op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
+               op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
+
+next:
                rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, &ents[i],
                                   &page);
                if (rc != 0)
                        GOTO(out, rc);
 
                if (ents[i] != NULL &&
                rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, &ents[i],
                                   &page);
                if (rc != 0)
                        GOTO(out, rc);
 
                if (ents[i] != NULL &&
-                   le64_to_cpu(ents[i]->lde_hash) <= min_hash) {
-                       if (min_page != NULL)
-                               page_cache_release(min_page);
-                       min_page = page;
-                       min_hash = le64_to_cpu(ents[i]->lde_hash);
-                       min_idx = i;
+                   (strncmp(ents[i]->lde_name, ".",
+                            le16_to_cpu(ents[i]->lde_namelen)) == 0 ||
+                   strncmp(ents[i]->lde_name, "..",
+                           le16_to_cpu(ents[i]->lde_namelen)) == 0)) {
+                       if (i == 0) {
+                               /* replace . with master FID */
+                               if (le16_to_cpu(ents[i]->lde_namelen) == 1)
+                                       fid_cpu_to_le(&ents[i]->lde_fid,
+                                                     &master_fid);
+                               else
+                                       fid_cpu_to_le(&ents[i]->lde_fid,
+                                                     &op_data->op_fid3);
+                       } else {
+                               /* skip . and .. for other stripes */
+                               op_data->op_cli_flags |= CLI_NEXT_ENTRY;
+                               op_data->op_hash_offset =
+                                       le64_to_cpu(ents[i]->lde_hash);
+                               kunmap(page);
+                               page_cache_release(page);
+                               goto next;
+                       }
+               }
+
+               if (ents[i] != NULL) {
+                       /* If the hash value of read_entry is equal to the
+                        * current min_hash, which is very rare and only
+                        * happens if two entries have the same hash value
+                        * but on different stripes, in this case, we need
+                        * make sure these entries are being read forward,
+                        * not backward, i.e. only reset the min_entry, if
+                        * current stripe is ahead of last entry. Note: if
+                        * there are hash conflict inside the entry, MDC
+                        * (see mdc_read_entry) will resolve them. */
+                       if (le64_to_cpu(ents[i]->lde_hash) < min_hash ||
+                           (le64_to_cpu(ents[i]->lde_hash) == min_hash &&
+                            i >= last_idx)) {
+                               if (min_page != NULL) {
+                                       kunmap(min_page);
+                                       page_cache_release(min_page);
+                               }
+                               min_page = page;
+                               min_hash = le64_to_cpu(ents[i]->lde_hash);
+                               min_same_hash_offset =
+                                          op_data->op_same_hash_offset;
+                               min_idx = i;
+                       } else {
+                               kunmap(page);
+                               page_cache_release(page);
+                       }
                }
        }
 
                }
        }
 
-       if (min_hash != MDS_DIR_END_OFF)
+       if (min_hash != MDS_DIR_END_OFF) {
                *ldp = ents[min_idx];
                *ldp = ents[min_idx];
-       else
+               op_data->op_stripe_offset = min_idx;
+               op_data->op_same_hash_offset = min_same_hash_offset;
+               *ppage = min_page;
+       } else {
                *ldp = NULL;
                *ldp = NULL;
+               *ppage = NULL;
+       }
 out:
 out:
+       /* We do not want to allocate md_op_data during each
+        * dir entry reading, so op_data will be shared by every stripe,
+        * then we need to restore it back to original value before
+        * return to the upper layer */
+       op_data->op_hash_offset = hash_offset;
+       op_data->op_fid1 = master_fid;
+       op_data->op_fid2 = master_fid;
+       op_data->op_data = master_data;
+       op_data->op_cli_flags = cli_flags;
        if (stripe_count > NORMAL_MAX_STRIPES && ents != NULL)
                OBD_FREE(ents, sizeof(ents[0]) * stripe_count);
 
        if (rc != 0 && min_page != NULL) {
                kunmap(min_page);
                page_cache_release(min_page);
        if (stripe_count > NORMAL_MAX_STRIPES && ents != NULL)
                OBD_FREE(ents, sizeof(ents[0]) * stripe_count);
 
        if (rc != 0 && min_page != NULL) {
                kunmap(min_page);
                page_cache_release(min_page);
-       } else {
-               *ppage = min_page;
        }
 
        RETURN(rc);
 }
 
        }
 
        RETURN(rc);
 }
 
+int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data,
+                  struct md_callback *cb_op, struct lu_dirent **ldp,
+                  struct page **ppage)
+{
+       struct obd_device       *obd = exp->exp_obd;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       struct lmv_stripe_md    *lsm = op_data->op_mea1;
+       struct lmv_tgt_desc     *tgt;
+       int                     rc;
+       ENTRY;
+
+       rc = lmv_check_connect(obd);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (unlikely(lsm != NULL)) {
+               rc = lmv_read_striped_entry(exp, op_data, cb_op,
+                                           ldp, ppage);
+               RETURN(rc);
+       }
+
+       tgt = lmv_find_target(lmv, &op_data->op_fid1);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
+       rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, ldp,
+                          ppage);
+       RETURN(rc);
+}
+
 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
                       struct ptlrpc_request **request)
 {
 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
                       struct ptlrpc_request **request)
 {
index 9dc00ce..aca1807 100644 (file)
@@ -1240,8 +1240,8 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
                                LASSERTF(*start <= *hash, "start = "LPX64
                                         ",end = "LPX64",hash = "LPX64"\n",
                                         *start, *end, *hash);
                                LASSERTF(*start <= *hash, "start = "LPX64
                                         ",end = "LPX64",hash = "LPX64"\n",
                                         *start, *end, *hash);
-                       CDEBUG(D_VFSTRACE, "page%lu [%llu %llu], hash"LPU64"\n",
-                              offset, *start, *end, *hash);
+                       CDEBUG(D_VFSTRACE, "offset %lx ["LPX64" "LPX64"],"
+                             " hash "LPX64"\n", offset, *start, *end, *hash);
                        if (*hash > *end) {
                                kunmap(page);
                                mdc_release_page(page, 0);
                        if (*hash > *end) {
                                kunmap(page);
                                mdc_release_page(page, 0);
@@ -1503,7 +1503,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
 {
        struct lookup_intent    it = { .it_op = IT_READDIR };
        struct page             *page;
 {
        struct lookup_intent    it = { .it_op = IT_READDIR };
        struct page             *page;
-       struct inode            *dir = NULL;
+       struct inode            *dir = op_data->op_data;
        struct address_space    *mapping;
        struct lu_dirpage       *dp;
        __u64                   start = 0;
        struct address_space    *mapping;
        struct lu_dirpage       *dp;
        __u64                   start = 0;
@@ -1517,15 +1517,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
 
        *ppage = NULL;
 
 
        *ppage = NULL;
 
-       if (op_data->op_mea1 != NULL) {
-               __u32 index = op_data->op_stripe_offset;
-
-               dir = op_data->op_mea1->lsm_md_oinfo[index].lmo_root;
-       } else {
-               dir = op_data->op_data;
-       }
        LASSERT(dir != NULL);
        LASSERT(dir != NULL);
-
        mapping = dir->i_mapping;
 
        rc = mdc_intent_lock(exp, op_data, &it, &enq_req,
        mapping = dir->i_mapping;
 
        rc = mdc_intent_lock(exp, op_data, &it, &enq_req,
@@ -1650,10 +1642,13 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
        struct lu_dirpage       *dp;
        struct lu_dirent        *ent;
        int                     rc = 0;
        struct lu_dirpage       *dp;
        struct lu_dirent        *ent;
        int                     rc = 0;
+       __u32                   same_hash_count;
+       __u64                   hash_offset = op_data->op_hash_offset;
        ENTRY;
 
        ENTRY;
 
-       CDEBUG(D_INFO, DFID "offset = "LPU64"\n", PFID(&op_data->op_fid1),
-              op_data->op_hash_offset);
+       CDEBUG(D_INFO, DFID " offset = "LPU64", flags %#x\n",
+              PFID(&op_data->op_fid1), op_data->op_hash_offset,
+              op_data->op_cli_flags);
 
        *ppage = NULL;
        *entp = NULL;
 
        *ppage = NULL;
        *entp = NULL;
@@ -1665,6 +1660,9 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
+       /* same_hash_count means how many entries with this
+        * hash value have been read */
+       same_hash_count = op_data->op_same_hash_offset + 1;
        dp = page_address(page);
        for (ent = lu_dirent_start(dp); ent != NULL;
             ent = lu_dirent_next(ent)) {
        dp = page_address(page);
        for (ent = lu_dirent_start(dp); ent != NULL;
             ent = lu_dirent_next(ent)) {
@@ -1672,16 +1670,33 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
                if (le16_to_cpu(ent->lde_namelen) == 0)
                        continue;
 
                if (le16_to_cpu(ent->lde_namelen) == 0)
                        continue;
 
-               if (le64_to_cpu(ent->lde_hash) > op_data->op_hash_offset)
-                       break;
+               if (le64_to_cpu(ent->lde_hash) <
+                               op_data->op_hash_offset)
+                       continue;
+
+               if (unlikely(le64_to_cpu(ent->lde_hash) ==
+                               op_data->op_hash_offset)) {
+                       /* If it is not for next entry, which usually from
+                        * ll_dir_entry_start, return this entry. */
+                       if (!(op_data->op_cli_flags & CLI_NEXT_ENTRY))
+                               break;
+
+                       /* Keep reading until all of entries being read are
+                        * skipped. */
+                       if (same_hash_count > 0) {
+                               same_hash_count--;
+                               continue;
+                       }
+               }
+               break;
        }
 
        /* If it can not find entry in current page, try next page. */
        if (ent == NULL) {
        }
 
        /* If it can not find entry in current page, try next page. */
        if (ent == NULL) {
-               __u64 orig_offset = op_data->op_hash_offset;
-
                if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) {
                if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) {
-                       mdc_release_page(page, 0);
+                       op_data->op_same_hash_offset = 0;
+                       mdc_release_page(page,
+                                le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
                        RETURN(0);
                }
 
                        RETURN(0);
                }
 
@@ -1696,13 +1711,19 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
                        dp = page_address(page);
                        ent = lu_dirent_start(dp);
                }
                        dp = page_address(page);
                        ent = lu_dirent_start(dp);
                }
+       }
 
 
-               op_data->op_hash_offset = orig_offset;
+       /* If the next hash is the same as the current hash, increment
+        * op_same_hash_offset to resolve the hash collision. */
+       if (ent != NULL && op_data->op_cli_flags & CLI_NEXT_ENTRY) {
+               if (unlikely(le64_to_cpu(ent->lde_hash) == hash_offset))
+                       op_data->op_same_hash_offset++;
+               else
+                       op_data->op_same_hash_offset = 0;
        }
 
        *ppage = page;
        *entp = ent;
        }
 
        *ppage = page;
        *entp = ent;
-
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
index 4af836c..8741269 100644 (file)
@@ -1199,16 +1199,71 @@ test_24A() { # LU-3182
        rm -rf $DIR/$tdir
        mkdir -p $DIR/$tdir
        createmany -m $DIR/$tdir/$tfile $NFILES
        rm -rf $DIR/$tdir
        mkdir -p $DIR/$tdir
        createmany -m $DIR/$tdir/$tfile $NFILES
-       local t=`ls $DIR/$tdir | wc -l`
-       local u=`ls $DIR/$tdir | sort -u | wc -l`
-       if [ $t -ne $NFILES -o $u -ne $NFILES ] ; then
-               error "Expected $NFILES files, got $t ($u unique)"
+       local t=$(ls $DIR/$tdir | wc -l)
+       local u=$(ls $DIR/$tdir | sort -u | wc -l)
+       local v=$(ls -ai $DIR/$tdir | sort -u | wc -l)
+       if [ $t -ne $NFILES -o $u -ne $NFILES -o $v -ne $((NFILES + 2)) ] ; then
+               error "Expected $NFILES files, got $t ($u unique $v .&..)"
        fi
 
        rm -rf $DIR/$tdir || error "Can not delete directories"
 }
 run_test 24A "readdir() returns correct number of entries."
 
        fi
 
        rm -rf $DIR/$tdir || error "Can not delete directories"
 }
 run_test 24A "readdir() returns correct number of entries."
 
+# Check the number of entries that "ls -a" reports for a striped directory:
+# 2 lines while it is empty (presumably "." and ".."), then one more line
+# for each file created in it, including a dot-file.
+test_24B() { # LU-4805
+       # Needs at least two MDTs; skip otherwise.
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       local count
+
+       mkdir $DIR/$tdir
+       # NOTE(review): -i0 -c$MDSCOUNT presumably means "start at MDT index 0,
+       # one stripe per MDT" - confirm against the lfs setdirstripe manual.
+       $LFS setdirstripe -i0 -c$MDSCOUNT $DIR/$tdir/striped_dir ||
+               error "create striped dir failed"
+
+       # Empty striped directory: exactly 2 lines are expected.
+       count=$(ls -ai $DIR/$tdir/striped_dir | wc -l)
+       [ $count -eq 2 ] || error "Expected 2, got $count"
+
+       touch $DIR/$tdir/striped_dir/a
+
+       # One regular file added: 3 lines are expected.
+       count=$(ls -ai $DIR/$tdir/striped_dir | wc -l)
+       [ $count -eq 3 ] || error "Expected 3, got $count"
+
+       touch $DIR/$tdir/striped_dir/.f
+
+       # A dot-file is listed by "ls -a" as well: 4 lines are expected.
+       count=$(ls -ai $DIR/$tdir/striped_dir | wc -l)
+       [ $count -eq 4 ] || error "Expected 4, got $count"
+
+       rm -rf $DIR/$tdir || error "Can not delete directories"
+}
+run_test 24B "readdir for striped dir return correct number of entries"
+
+# Check that the ".." entry listed from inside a striped directory carries
+# the inode number of its parent directory, both before and after the
+# striped directory is moved from d0 to d1.
+#
+# NOTE(review): this function changes the working directory and does not
+# change it back, and it does not remove $DIR/$tdir - confirm whether the
+# test framework takes care of either.
+test_24C() {
+       # Needs at least two MDTs; skip otherwise.
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+
+       mkdir $DIR/$tdir
+       mkdir $DIR/$tdir/d0
+       mkdir $DIR/$tdir/d1
+
+       $LFS setdirstripe -i0 -c$MDSCOUNT $DIR/$tdir/d0/striped_dir ||
+               error "create striped dir failed"
+
+       # Work from inside the striped directory so that a bare "ls -a"
+       # below lists its own "." and ".." entries.
+       cd $DIR/$tdir/d0/striped_dir
+
+       # With "ls -i" the first field of each line is the inode number.
+       # NOTE(review): grep "d0"/"d1" matches anywhere in the "ls -l" line,
+       # not only in the file name, and grep "\.\." would also match any
+       # name containing ".." - fine for this freshly created layout.
+       local d0_ino=$(ls -i -l -a $DIR/$tdir | grep "d0" | awk '{print $1}')
+       local d1_ino=$(ls -i -l -a $DIR/$tdir | grep "d1" | awk '{print $1}')
+       local parent_ino=$(ls -i -l -a | grep "\.\." | awk '{print $1}')
+
+       [ "$d0_ino" = "$parent_ino" ] ||
+               error ".. wrong, expect $d0_ino, get $parent_ino"
+
+       mv $DIR/$tdir/d0/striped_dir $DIR/$tdir/d1/ ||
+               error "mv striped dir failed"
+
+       # Listed again from the same working directory, which presumably
+       # still refers to the moved striped directory; ".." is now expected
+       # to report d1's inode number.
+       parent_ino=$(ls -i -l -a | grep "\.\." | awk '{print $1}')
+
+       [ "$d1_ino" = "$parent_ino" ] ||
+               error ".. wrong after mv, expect $d1_ino, get $parent_ino"
+}
+run_test 24C "check .. in striped dir"
+
 test_25a() {
        echo '== symlink sanity ============================================='
 
 test_25a() {
        echo '== symlink sanity ============================================='