+ dp = kmap(page);
+ hash = le64_to_cpu(dp->ldp_hash_start);
+ kunmap(page);
+
+ offset = hash_x_index(hash, rp->rp_hash64);
+
+ prefetchw(&page->flags);
+ ret = add_to_page_cache_lru(page, inode->i_mapping, offset,
+ GFP_KERNEL);
+ if (ret == 0)
+ unlock_page(page);
+ else
+ CDEBUG(D_VFSTRACE, "page %lu add to page cache failed:"
+ " rc = %d\n", offset, ret);
+ page_cache_release(page);
+ }
+
+ if (page_pool != &page0)
+ OBD_FREE(page_pool, sizeof(page_pool[0]) * max_pages);
+
+ RETURN(rc);
+}
+
+/**
+ * Read dir page from cache first, if it can not find it, read it from
+ * server and add into the cache.
+ *
+ * \param[in] exp MDC export
+ * \param[in] op_data client MD stack parameters, transfering parameters
+ * between different layers on client MD stack.
+ * \param[in] cb_op callback required for ldlm lock enqueue during
+ * read page
+ * \param[in] hash_offset the hash offset of the page to be read
+ * \param[in] ppage the page to be read
+ *
+ * retval = 0 get the page successfully
+ * errno(<0) get the page failed
+ */
+static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
+ struct md_callback *cb_op, __u64 hash_offset,
+ struct page **ppage)
+{
+ struct lookup_intent it = { .it_op = IT_READDIR };
+ struct page *page;
+ struct inode *dir = op_data->op_data;
+ struct address_space *mapping;
+ struct lu_dirpage *dp;
+ __u64 start = 0;
+ __u64 end = 0;
+ struct lustre_handle lockh;
+ struct ptlrpc_request *enq_req = NULL;
+ struct readpage_param rp_param;
+ int rc;
+
+ ENTRY;
+
+ *ppage = NULL;
+
+ LASSERT(dir != NULL);
+ mapping = dir->i_mapping;
+
+ rc = mdc_intent_lock(exp, op_data, &it, &enq_req,
+ cb_op->md_blocking_ast, 0);
+ if (enq_req != NULL)
+ ptlrpc_req_finished(enq_req);
+
+ if (rc < 0) {
+ CERROR("%s: "DFID" lock enqueue fails: rc = %d\n",
+ exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rc);
+ RETURN(rc);
+ }
+
+ rc = 0;
+ mdc_set_lock_data(exp, &it.d.lustre.it_lock_handle, dir, NULL);
+
+ rp_param.rp_off = hash_offset;
+ rp_param.rp_hash64 = op_data->op_cli_flags & CLI_HASH64;
+ page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end,
+ rp_param.rp_hash64);
+ if (IS_ERR(page)) {
+ CERROR("%s: dir page locate: "DFID" at "LPU64": rc %ld\n",
+ exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
+ rp_param.rp_off, PTR_ERR(page));
+ GOTO(out_unlock, rc = PTR_ERR(page));
+ } else if (page != NULL) {
+ /*
+ * XXX nikita: not entirely correct handling of a corner case:
+ * suppose hash chain of entries with hash value HASH crosses
+ * border between pages P0 and P1. First both P0 and P1 are
+ * cached, seekdir() is called for some entry from the P0 part
+ * of the chain. Later P0 goes out of cache. telldir(HASH)
+ * happens and finds P1, as it starts with matching hash
+ * value. Remaining entries from P0 part of the chain are
+ * skipped. (Is that really a bug?)
+ *
+ * Possible solutions: 0. don't cache P1 is such case, handle
+ * it as an "overflow" page. 1. invalidate all pages at
+ * once. 2. use HASH|1 as an index for P1.
+ */
+ GOTO(hash_collision, page);
+ }
+
+ rp_param.rp_exp = exp;
+ rp_param.rp_mod = op_data;
+ page = read_cache_page(mapping,
+ hash_x_index(rp_param.rp_off,
+ rp_param.rp_hash64),
+ mdc_read_page_remote, &rp_param);
+ if (IS_ERR(page)) {
+ CDEBUG(D_INFO, "%s: read cache page: "DFID" at "LPU64": %ld\n",
+ exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
+ rp_param.rp_off, PTR_ERR(page));
+ GOTO(out_unlock, rc = PTR_ERR(page));
+ }
+
+ wait_on_page_locked(page);
+ (void)kmap(page);
+ if (!PageUptodate(page)) {
+ CERROR("%s: page not updated: "DFID" at "LPU64": rc %d\n",
+ exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
+ rp_param.rp_off, -5);
+ goto fail;
+ }
+ if (!PageChecked(page))
+ SetPageChecked(page);
+ if (PageError(page)) {
+ CERROR("%s: page error: "DFID" at "LPU64": rc %d\n",
+ exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
+ rp_param.rp_off, -5);
+ goto fail;
+ }
+
+hash_collision:
+ dp = page_address(page);
+ if (BITS_PER_LONG == 32 && rp_param.rp_hash64) {
+ start = le64_to_cpu(dp->ldp_hash_start) >> 32;
+ end = le64_to_cpu(dp->ldp_hash_end) >> 32;
+ rp_param.rp_off = hash_offset >> 32;
+ } else {
+ start = le64_to_cpu(dp->ldp_hash_start);
+ end = le64_to_cpu(dp->ldp_hash_end);
+ rp_param.rp_off = hash_offset;
+ }
+ if (end == start) {
+ LASSERT(start == rp_param.rp_off);
+ CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end);
+#if BITS_PER_LONG == 32
+ CWARN("Real page-wide hash collision at ["LPU64" "LPU64"] with "
+ "hash "LPU64"\n", le64_to_cpu(dp->ldp_hash_start),
+ le64_to_cpu(dp->ldp_hash_end), hash_offset);
+#endif
+
+ /*
+ * Fetch whole overflow chain...
+ *
+ * XXX not yet.
+ */
+ goto fail;
+ }
+ *ppage = page;
+out_unlock:
+ lockh.cookie = it.d.lustre.it_lock_handle;
+ ldlm_lock_decref(&lockh, it.d.lustre.it_lock_mode);
+ it.d.lustre.it_lock_handle = 0;
+ return rc;
+fail:
+ kunmap(page);
+ mdc_release_page(page, 1);
+ rc = -EIO;
+ goto out_unlock;