Whamcloud - gitweb
LU-5 readdir read multiple pages per rpc
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
index cd4ab9d..6b6885e 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -2342,7 +2345,8 @@ static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid)
 }
 
 static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
-                        struct obd_capa *oc, __u64 offset64, struct page *page,
+                        struct obd_capa *oc, __u64 offset64,
+                        struct page **pages, unsigned npages,
                         struct ptlrpc_request **request)
 {
         struct obd_device       *obd = exp->exp_obd;
@@ -2358,6 +2362,11 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
         int                      tgt0_idx = 0;
         int                      rc;
         int                      nr = 0;
+        int                      i;
+        /* number of pages read, in CFS_PAGE_SIZE */
+        int                      nrdpgs;
+        /* number of pages transferred in LU_PAGE_SIZE */
+        int                      nlupgs;
         struct lmv_stripe       *los;
         struct lmv_tgt_desc     *tgt;
         struct lu_dirpage       *dp;
@@ -2435,34 +2444,99 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
         if (IS_ERR(tgt))
                 GOTO(cleanup, rc = PTR_ERR(tgt));
 
-        rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, page, request);
+        rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, pages, npages,
+                         request);
         if (rc)
                 GOTO(cleanup, rc);
-        if (obj) {
-                dp = cfs_kmap(page);
 
-                lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
-                lmv_hash_adjust(&dp->ldp_hash_end,   hash_adj);
-                LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64);
+        nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
+                 >> CFS_PAGE_SHIFT;
+        nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
+        LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
+        LASSERT(nrdpgs > 0 && nrdpgs <= npages);
 
-                for (ent = lu_dirent_start(dp); ent != NULL;
-                     ent = lu_dirent_next(ent))
-                        lmv_hash_adjust(&ent->lde_hash, hash_adj);
+        CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs, npages);
 
-                if (tgt0_idx != nr - 1) {
-                        __u64 end;
+        for (i = 0; i < nrdpgs; i++) {
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                struct lu_dirpage *first;
+                __u64 hash_end = 0;
+                __u32 flags = 0;
+#endif
+                struct lu_dirent *tmp = NULL;
+
+                dp = cfs_kmap(pages[i]);
+                if (obj) {
+                        lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
+                        lmv_hash_adjust(&dp->ldp_hash_end,   hash_adj);
+                        LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64);
 
-                        end = le64_to_cpu(dp->ldp_hash_end);
-                        if (end == MDS_DIR_END_OFF) {
+                        if ((tgt0_idx != nr - 1) &&
+                            (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF))
+                        {
                                 dp->ldp_hash_end = cpu_to_le32(seg_size *
                                                                (tgt0_idx + 1));
                                 CDEBUG(D_INODE,
                                        ""DFID" reset end "LPX64" tgt %d\n",
                                        PFID(&rid),
-                                       (__u64)le64_to_cpu(dp->ldp_hash_end), tgt_idx);
+                                       (__u64)le64_to_cpu(dp->ldp_hash_end),
+                                       tgt_idx);
                         }
                 }
-                cfs_kunmap(page);
+
+                ent = lu_dirent_start(dp);
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                first = dp;
+                hash_end = dp->ldp_hash_end;
+repeat:
+#endif
+                nlupgs--;
+                for (tmp = ent; ent != NULL;
+                     tmp = ent, ent = lu_dirent_next(ent)) {
+                        if (obj)
+                                lmv_hash_adjust(&ent->lde_hash, hash_adj);
+                }
+
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+                dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+                if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
+                        ent = lu_dirent_start(dp);
+
+                        if (obj) {
+                                lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
+                                if ((tgt0_idx != nr - 1) &&
+                                    (le64_to_cpu(dp->ldp_hash_end) ==
+                                     MDS_DIR_END_OFF)) {
+                                        hash_end = cpu_to_le32(seg_size *
+                                                               (tgt0_idx + 1));
+                                        CDEBUG(D_INODE,
+                                            ""DFID" reset end "LPX64" tgt %d\n",
+                                            PFID(&rid),
+                                            (__u64)le64_to_cpu(hash_end),
+                                            tgt_idx);
+                                }
+                        }
+                        hash_end = dp->ldp_hash_end;
+                        flags = dp->ldp_flags;
+
+                        if (tmp) {
+                                /* enlarge the end entry lde_reclen from 0 to
+                                 * first entry of next lu_dirpage, in this way
+                                 * several lu_dirpages can be stored into one
+                                 * client page on client. */
+                                tmp = ((void *)tmp) +
+                                      le16_to_cpu(tmp->lde_reclen);
+                                tmp->lde_reclen =
+                                        cpu_to_le16((char *)(dp->ldp_entries) -
+                                                    (char *)tmp);
+                                goto repeat;
+                        }
+                }
+                first->ldp_hash_end = hash_end;
+                first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
+                first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+#endif
+                cfs_kunmap(pages[i]);
         }
         EXIT;
 cleanup: