-/* main purpose of LMV blocking ast is to remove split directory LMV
- * presentation object (struct lmv_obj) attached to the lock being revoked. */
-int lmv_blocking_ast(struct ldlm_lock *lock,
- struct ldlm_lock_desc *desc,
- void *data, int flag)
-{
- struct lustre_handle lockh;
- struct lmv_obj *obj;
- int rc;
- ENTRY;
-
- switch (flag) {
- case LDLM_CB_BLOCKING:
- ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc < 0) {
- CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
- RETURN(rc);
- }
- break;
- case LDLM_CB_CANCELING:
- /* time to drop cached attrs for dirobj */
- obj = lock->l_ast_data;
- if (obj) {
- CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
- ", master "DFID"\n",
- lock->l_resource->lr_name.name[3] == 1 ?
- "LOOKUP" : "UPDATE",
- lock->l_resource->lr_name.name[0],
- lock->l_resource->lr_name.name[1],
- PFID(&obj->lo_fid));
- lmv_obj_put(obj);
- }
- break;
- default:
- LBUG();
- }
- RETURN(0);
-}
-
-static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj)
-{
- __u64 val;
-
- val = le64_to_cpu(*hash);
- if (val < hash_adj)
- val += MAX_HASH_SIZE;
- if (val != DIR_END_OFF)
- *hash = cpu_to_le64(val - hash_adj);
-}
-
-static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid)
-{
- __u64 id;
- struct obd_import *imp;
-
- /*
- * XXX Hack: to get nid we assume that underlying obd device is mdc.
- */
- imp = class_exp2cliimp(exp);
- id = imp->imp_connection->c_self + fid_flatten(fid);
-
- CDEBUG(D_INFO, "node rank: %llx "DFID" %llx %llx\n",
- imp->imp_connection->c_self, PFID(fid), id, id ^ (id >> 32));
-
- return id ^ (id >> 32);
-}
-
-static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
- struct obd_capa *oc, __u64 offset64, struct page *page,
- struct ptlrpc_request **request)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- struct lu_fid rid = *fid;
- struct lmv_obj *obj;
- __u64 offset;
- __u64 hash_adj = 0;
- __u32 rank = 0;
- __u64 seg_size = 0;
- __u64 tgt_tmp = 0;
- int tgt = 0;
- int tgt0 = 0;
- int rc;
- int nr = 0;
- ENTRY;
-
- offset = offset64;
-
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
-
- CDEBUG(D_INFO, "READPAGE at %llx from "DFID"\n", offset, PFID(&rid));
-
- obj = lmv_obj_grab(obd, fid);
- if (obj) {
-
- /*
- * This case handle directory lookup in clustered metadata case (i.e.
- * split directory is located on multiple md servers.)
- * each server keeps directory entries for certain range of hashes.
- * E.g. we have N server and suppose hash range is 0 to MAX_HASH.
- * first server will keep records with hashes [ 0 ... MAX_HASH / N - 1],
- * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and
- * so on....
- * readdir can simply start reading entries from 0 - N server in
- * order but that will not scale well as all client will request dir in
- * to server in same order.
- * Following algorithm does optimization:
- * Instead of doing readdir in 1, 2, ...., N order, client with a
- * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order.
- * (every client has rank R)
- * But ll_readdir() expect offset range [0 to MAX_HASH/N) but
- * since client ask dir from MDS{R} client has pages with offsets
- * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
- * on hash values that we get.
- */
-
- struct lmv_inode *loi;
-
- lmv_obj_lock(obj);
-
- nr = obj->lo_objcount;
- LASSERT(nr > 0);
- seg_size = MAX_HASH_SIZE;
- do_div(seg_size, nr);
- loi = obj->lo_inodes;
- rank = lmv_node_rank(lmv_get_export(lmv, loi[0].li_mds),
- fid) % nr;
- tgt_tmp = offset;
- do_div(tgt_tmp, seg_size);
- tgt0 = do_div(tgt_tmp, nr);
- tgt = (tgt0 + rank) % nr;
-
- if (tgt < tgt0)
- /*
- * Wrap around.
- *
- * Last segment has unusual length due to division
- * rounding.
- */
- hash_adj = MAX_HASH_SIZE - seg_size * nr;
- else
- hash_adj = 0;
-
- hash_adj += rank * seg_size;
-
- CDEBUG(D_INFO, "hash_adj: %x %llx %llx/%x -> %llx/%x\n",
- rank, hash_adj, offset, tgt0, offset + hash_adj, tgt);
-
- offset = (offset + hash_adj) & MAX_HASH_SIZE;
- rid = obj->lo_inodes[tgt].li_fid;
- tgt_exp = lmv_get_export(lmv, loi[tgt].li_mds);
-
- CDEBUG(D_INFO, "forward to "DFID" with offset %lu i %d\n",
- PFID(&rid), (unsigned long)offset, tgt);
- } else
- tgt_exp = lmv_find_export(lmv, &rid);
-
- if (IS_ERR(tgt_exp))
- GOTO(cleanup, rc = PTR_ERR(tgt_exp));
-
- rc = md_readpage(tgt_exp, &rid, oc, offset, page, request);
- if (rc)
- GOTO(cleanup, rc);
- if (obj) {
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
-
- dp = cfs_kmap(page);
-
- lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
- lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
- LASSERT(le64_to_cpu(dp->ldp_hash_start) <= offset64);
-
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent))
- lmv_hash_adjust(&ent->lde_hash, hash_adj);
-
- if (tgt0 != nr - 1) {
- __u64 end;
-
- end = le64_to_cpu(dp->ldp_hash_end);
- if (end == DIR_END_OFF) {
- dp->ldp_hash_end = cpu_to_le32(seg_size *
- (tgt0 + 1));
- CDEBUG(D_INFO, ""DFID" reset end %llx tgt %d\n",
- PFID(&rid),
- le64_to_cpu(dp->ldp_hash_end), tgt);
- }
- }
- cfs_kunmap(page);
- }
- /*
- * Here we could remove "." and ".." from all pages which at not from
- * master. But MDS has only "." and ".." for master dir.
- */
- EXIT;
-cleanup:
- if (obj) {
- lmv_obj_unlock(obj);
- lmv_obj_put(obj);
- }
- return rc;
+/*
+ * Adjust a set of pages, each page containing an array of lu_dirpages,
+ * so that each page can be used as a single logical lu_dirpage.
+ *
+ * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
+ * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
+ * struct lu_dirent. It has size up to LU_PAGE_SIZE. The ldp_hash_end
+ * value is used as a cookie to request the next lu_dirpage in a
+ * directory listing that spans multiple pages (two in this example):
+ * ________
+ * | |
+ * .|--------v------- -----.
+ * |s|e|f|p|ent|ent| ... |ent|
+ * '--|-------------- -----' Each CFS_PAGE contains a single
+ * '------. lu_dirpage.
+ * .---------v------- -----.
+ * |s|e|f|p|ent| 0 | ... | 0 |
+ * '----------------- -----'
+ *
+ * However, on hosts where the native VM page size (CFS_PAGE_SIZE) is
+ * larger than LU_PAGE_SIZE, a single host page may contain multiple
+ * lu_dirpages. After reading the lu_dirpages from the MDS, the
+ * ldp_hash_end of the first lu_dirpage refers to the one immediately
+ * after it in the same CFS_PAGE (arrows simplified for brevity, but
+ * in general e0==s1, e1==s2, etc.):
+ *
+ * .-------------------- -----.
+ * |s0|e0|f0|p|ent|ent| ... |ent|
+ * |---v---------------- -----|
+ * |s1|e1|f1|p|ent|ent| ... |ent|
+ * |---v---------------- -----| Here, each CFS_PAGE contains
+ * ... multiple lu_dirpages.
+ * |---v---------------- -----|
+ * |s'|e'|f'|p|ent|ent| ... |ent|
+ * '---|---------------- -----'
+ * v
+ * .----------------------------.
+ * | next CFS_PAGE |
+ * '----------------------------'
+ *
+ * This structure is transformed into a single logical lu_dirpage as follows:
+ *
+ * - Replace e0 with e' so the request for the next lu_dirpage gets the page
+ * labeled 'next CFS_PAGE'.
+ *
+ * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
+ * a hash collision with the next page exists.
+ *
+ * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
+ * to the first entry of the next lu_dirpage.
+ */
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
+{
+ int i;
+
+ for (i = 0; i < ncfspgs; i++) {
+ struct lu_dirpage *dp = cfs_kmap(pages[i]);
+ struct lu_dirpage *first = dp;
+ struct lu_dirent *end_dirent = NULL;
+ struct lu_dirent *ent;
+ __u64 hash_end = dp->ldp_hash_end;
+ __u32 flags = dp->ldp_flags;
+
+ for (; nlupgs > 1; nlupgs--) {
+ ent = lu_dirent_start(dp);
+ for (end_dirent = ent; ent != NULL;
+ end_dirent = ent, ent = lu_dirent_next(ent));
+
+ /* Advance dp to next lu_dirpage. */
+ dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+
+ /* Check if we've reached the end of the CFS_PAGE. */
+ if (!((unsigned long)dp & ~CFS_PAGE_MASK))
+ break;
+
+ /* Save the hash and flags of this lu_dirpage. */
+ hash_end = dp->ldp_hash_end;
+ flags = dp->ldp_flags;
+
+ /* Check if lu_dirpage contains no entries. */
+ if (!end_dirent)
+ break;
+
+ /* Enlarge the end entry lde_reclen from 0 to
+ * first entry of next lu_dirpage. */
+ LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
+ end_dirent->lde_reclen =
+ cpu_to_le16((char *)(dp->ldp_entries) -
+ (char *)end_dirent);
+ }
+
+ first->ldp_hash_end = hash_end;
+ first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
+ first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+
+ cfs_kunmap(pages[i]);
+ }