RETURN(rc);
}
+/*
+ * Adjust a set of pages, each page containing an array of lu_dirpages,
+ * so that each page can be used as a single logical lu_dirpage.
+ *
+ * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
+ * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
+ * struct lu_dirent. It has size up to LU_PAGE_SIZE. The ldp_hash_end
+ * value is used as a cookie to request the next lu_dirpage in a
+ * directory listing that spans multiple pages (two in this example):
+ * ________
+ * | |
+ * .|--------v------- -----.
+ * |s|e|f|p|ent|ent| ... |ent|
+ * '--|-------------- -----' Each CFS_PAGE contains a single
+ * '------. lu_dirpage.
+ * .---------v------- -----.
+ * |s|e|f|p|ent| 0 | ... | 0 |
+ * '----------------- -----'
+ *
+ * However, on hosts where the native VM page size (CFS_PAGE_SIZE) is
+ * larger than LU_PAGE_SIZE, a single host page may contain multiple
+ * lu_dirpages. After reading the lu_dirpages from the MDS, the
+ * ldp_hash_end of the first lu_dirpage refers to the one immediately
+ * after it in the same CFS_PAGE (arrows simplified for brevity, but
+ * in general e0==s1, e1==s2, etc.):
+ *
+ * .-------------------- -----.
+ * |s0|e0|f0|p|ent|ent| ... |ent|
+ * |---v---------------- -----|
+ * |s1|e1|f1|p|ent|ent| ... |ent|
+ * |---v---------------- -----| Here, each CFS_PAGE contains
+ * ... multiple lu_dirpages.
+ * |---v---------------- -----|
+ * |s'|e'|f'|p|ent|ent| ... |ent|
+ * '---|---------------- -----'
+ * v
+ * .----------------------------.
+ * | next CFS_PAGE |
+ *
+ * This structure is transformed into a single logical lu_dirpage as follows:
+ *
+ * - Replace e0 with e' so the request for the next lu_dirpage gets the page
+ * labeled 'next CFS_PAGE'.
+ *
+ * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
+ * a hash collision with the next page exists.
+ *
+ * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
+ * to the first entry of the next lu_dirpage.
+ */
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
+{
+ int i;
+
+ for (i = 0; i < ncfspgs; i++) {
+ struct lu_dirpage *dp = cfs_kmap(pages[i]);
+ struct lu_dirpage *first = dp;
+ struct lu_dirent *end_dirent = NULL;
+ struct lu_dirent *ent;
+ __u64 hash_end = dp->ldp_hash_end;
+ __u32 flags = dp->ldp_flags;
+
+ for (; nlupgs > 1; nlupgs--) {
+ ent = lu_dirent_start(dp);
+ for (end_dirent = ent; ent != NULL;
+ end_dirent = ent, ent = lu_dirent_next(ent));
+
+ /* Advance dp to next lu_dirpage. */
+ dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+
+ /* Check if we've reached the end of the CFS_PAGE. */
+ if (!((unsigned long)dp & ~CFS_PAGE_MASK))
+ break;
+
+ /* Save the hash and flags of this lu_dirpage. */
+ hash_end = dp->ldp_hash_end;
+ flags = dp->ldp_flags;
+
+ /* Check if lu_dirpage contains no entries. */
+ if (!end_dirent)
+ break;
+
+ /* Enlarge the end entry lde_reclen from 0 to
+ * first entry of next lu_dirpage. */
+ LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
+ end_dirent->lde_reclen =
+ cpu_to_le16((char *)(dp->ldp_entries) -
+ (char *)end_dirent);
+ }
+
+ first->ldp_hash_end = hash_end;
+ first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
+ first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+
+ cfs_kunmap(pages[i]);
+ }
+}
+#else
+#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
+#endif /* CFS_PAGE_SIZE > LU_PAGE_SIZE */
+
static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
- struct page **pages, struct ptlrpc_request **request)
+ struct page **pages, struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- __u64 offset = op_data->op_offset;
- int rc;
- int i;
- /* number of pages read, in CFS_PAGE_SIZE */
- int nrdpgs;
- /* number of pages transferred in LU_PAGE_SIZE */
- int nlupgs;
- struct lmv_tgt_desc *tgt;
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
- ENTRY;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ __u64 offset = op_data->op_offset;
+ int rc;
+ int ncfspgs; /* pages read in CFS_PAGE_SIZE */
+ int nlupgs; /* pages read in LU_PAGE_SIZE */
+ struct lmv_tgt_desc *tgt;
+ ENTRY;
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n",
offset, PFID(&op_data->op_fid1));
- /*
- * This case handle directory lookup in clustered metadata case (i.e.
- * split directory is located on multiple md servers.)
- * each server keeps directory entries for certain range of hashes.
- * E.g. we have N server and suppose hash range is 0 to MAX_HASH.
- * first server will keep records with hashes [ 0 ... MAX_HASH /N - 1],
- * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and
- * so on....
- * readdir can simply start reading entries from 0 - N server in
- * order but that will not scale well as all client will request dir in
- * to server in same order.
- * Following algorithm does optimization:
- * Instead of doing readdir in 1, 2, ...., N order, client with a
- * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order.
- * (every client has rank R)
- * But ll_readdir() expect offset range [0 to MAX_HASH/N) but
- * since client ask dir from MDS{R} client has pages with offsets
- * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
- * on hash values that we get.
- * Since these codes might be still useful for sharded directory, so
- * Keeping this code for further reference
- if (0) {
- LASSERT(nr > 0);
- seg_size = MAX_HASH_SIZE;
- do_div(seg_size, nr);
- los = obj->lo_stripes;
- tgt = lmv_get_target(lmv, los[0].ls_mds);
- rank = lmv_node_rank(tgt->ltd_exp, fid) % nr;
- tgt_tmp = offset;
- do_div(tgt_tmp, seg_size);
- tgt0_idx = do_div(tgt_tmp, nr);
- tgt_idx = (tgt0_idx + rank) % nr;
-
- if (tgt_idx < tgt0_idx)
- * Wrap around.
- *
- * Last segment has unusual length due to division
- * rounding.
- hash_adj = MAX_HASH_SIZE - seg_size * nr;
- else
- hash_adj = 0;
-
- hash_adj += rank * seg_size;
-
- CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
- LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
- offset, tgt0_idx, offset + hash_adj, tgt_idx);
-
- offset = (offset + hash_adj) & MAX_HASH_SIZE;
- rid = lsm->mea_oinfo[tgt_idx].lmo_fid;
- tgt = lmv_get_target(lmv, lsm->mea_oinfo[tgt_idx].lmo_mds);
-
- CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n",
- PFID(&rid), (unsigned long)offset, tgt_idx);
- }
- */
tgt = lmv_find_target(lmv, &op_data->op_fid1);
if (IS_ERR(tgt))
RETURN(PTR_ERR(tgt));
if (rc != 0)
RETURN(rc);
- nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
+ ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
>> CFS_PAGE_SHIFT;
nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
- LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages);
+ LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages);
- CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs,
+ CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs,
op_data->op_npages);
- for (i = 0; i < nrdpgs; i++) {
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
- struct lu_dirpage *first;
- __u64 hash_end = 0;
- __u32 flags = 0;
-#endif
- struct lu_dirent *tmp = NULL;
+ lmv_adjust_dirpages(pages, ncfspgs, nlupgs);
- dp = cfs_kmap(pages[i]);
- ent = lu_dirent_start(dp);
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
- first = dp;
- hash_end = dp->ldp_hash_end;
-repeat:
-#endif
- nlupgs--;
-
- for (tmp = ent; ent != NULL;
- tmp = ent, ent = lu_dirent_next(ent));
-#if CFS_PAGE_SIZE > LU_PAGE_SIZE
- dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
- if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
- ent = lu_dirent_start(dp);
-
- if (tmp) {
- /* enlarge the end entry lde_reclen from 0 to
- * first entry of next lu_dirpage, in this way
- * several lu_dirpages can be stored into one
- * client page on client. */
- tmp = ((void *)tmp) +
- le16_to_cpu(tmp->lde_reclen);
- tmp->lde_reclen =
- cpu_to_le16((char *)(dp->ldp_entries) -
- (char *)tmp);
- goto repeat;
- }
- }
- first->ldp_hash_end = hash_end;
- first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
- first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
-#else
- SET_BUT_UNUSED(tmp);
-#endif
- cfs_kunmap(pages[i]);
- }
RETURN(rc);
}