* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
#include <linux/smp_lock.h>
#include <asm/uaccess.h>
#include <linux/buffer_head.h> // for wait_on_buffer
+#include <linux/pagevec.h>
#define DEBUG_SUBSYSTEM S_LLITE
*
* page format
*
- *
- *
- *
+ * Pages in an MDS_READPAGE RPC are packed in LU_PAGE_SIZE units, and each
+ * page contains a lu_dirpage header which describes the start/end hash, and
+ * whether this page is empty (contains no dir entry) or its hash collides
+ * with the next page. After the client receives the reply, several pages
+ * will be integrated into a dir page of CFS_PAGE_SIZE (if CFS_PAGE_SIZE is
+ * greater than LU_PAGE_SIZE), and the lu_dirpage for this integrated page
+ * will be adjusted.
*
*/
/* returns the page unlocked, but with a reference */
-static int ll_dir_readpage(struct file *file, struct page *page)
+static int ll_dir_readpage(struct file *file, struct page *page0)
{
- struct inode *inode = page->mapping->host;
+ struct inode *inode = page0->mapping->host;
+ int hash64 = ll_i2sbi(inode)->ll_flags & LL_SBI_64BIT_HASH;
+ struct obd_export *exp = ll_i2sbi(inode)->ll_md_exp;
struct ptlrpc_request *request;
struct mdt_body *body;
struct obd_capa *oc;
__u64 hash;
+ struct page **page_pool;
+ struct page *page;
+#ifndef HAVE_ADD_TO_PAGE_CACHE_LRU
+ struct pagevec lru_pvec;
+#endif
+ struct lu_dirpage *dp;
+ int max_pages = ll_i2sbi(inode)->ll_md_brw_size >> CFS_PAGE_SHIFT;
+ int nrdpgs = 0; /* number of pages actually read */
+ int npages;
+ int i;
int rc;
ENTRY;
hash = lli->lli_sa_pos;
cfs_spin_unlock(&lli->lli_sa_lock);
}
- CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n",
- inode->i_ino, inode->i_generation, inode, (unsigned long)hash);
+ CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) hash "LPU64"\n",
+ inode->i_ino, inode->i_generation, inode, hash);
+
+ LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES);
+
+ OBD_ALLOC(page_pool, sizeof(page) * max_pages);
+ if (page_pool != NULL) {
+ page_pool[0] = page0;
+ } else {
+ page_pool = &page0;
+ max_pages = 1;
+ }
+ for (npages = 1; npages < max_pages; npages++) {
+ page = page_cache_alloc_cold(inode->i_mapping);
+ if (!page)
+ break;
+ page_pool[npages] = page;
+ }
oc = ll_mdscapa_get(inode);
- rc = md_readpage(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode),
- oc, hash, page, &request);
+ rc = md_readpage(exp, ll_inode2fid(inode), oc, hash, page_pool, npages,
+ &request);
capa_put(oc);
- if (!rc) {
+ if (rc == 0) {
body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
/* Checked by mdc_readpage() */
LASSERT(body != NULL);
if (body->valid & OBD_MD_FLSIZE)
cl_isize_write(inode, body->size);
- SetPageUptodate(page);
+
+ nrdpgs = (request->rq_bulk->bd_nob_transferred+CFS_PAGE_SIZE-1)
+ >> CFS_PAGE_SHIFT;
+ SetPageUptodate(page0);
}
+ unlock_page(page0);
ptlrpc_req_finished(request);
- unlock_page(page);
+ CDEBUG(D_VFSTRACE, "read %d/%d pages\n", nrdpgs, npages);
+
+ ll_pagevec_init(&lru_pvec, 0);
+ for (i = 1; i < npages; i++) {
+ unsigned long offset;
+ int ret;
+
+ page = page_pool[i];
+
+ if (rc < 0 || i >= nrdpgs) {
+ page_cache_release(page);
+ continue;
+ }
+
+ SetPageUptodate(page);
+
+ dp = cfs_kmap(page);
+ hash = le64_to_cpu(dp->ldp_hash_start);
+ cfs_kunmap(page);
+
+ offset = hash_x_index(hash, hash64);
+
+ prefetchw(&page->flags);
+ ret = ll_add_to_page_cache_lru(page, inode->i_mapping, offset,
+ GFP_KERNEL);
+ if (ret == 0) {
+ unlock_page(page);
+ page_cache_get(page);
+ if (ll_pagevec_add(&lru_pvec, page) == 0)
+ ll_pagevec_lru_add_file(&lru_pvec);
+ } else {
+ CDEBUG(D_VFSTRACE, "page %lu add to page cache failed:"
+ " %d\n", offset, ret);
+ }
+ page_cache_release(page);
+ }
+ ll_pagevec_lru_add_file(&lru_pvec);
+
+ if (page_pool != &page0)
+ OBD_FREE(page_pool, sizeof(struct page *) * max_pages);
EXIT;
return rc;
}
*/
wait_on_page(page);
if (PageUptodate(page)) {
- dp = kmap(page);
+ dp = cfs_kmap(page);
if (BITS_PER_LONG == 32 && hash64) {
*start = le64_to_cpu(dp->ldp_hash_start) >> 32;
*end = le64_to_cpu(dp->ldp_hash_end) >> 32;
}
LASSERTF(*start <= *hash, "start = "LPX64",end = "
LPX64",hash = "LPX64"\n", *start, *end, *hash);
+ CDEBUG(D_VFSTRACE, "page %lu [%llu %llu], hash "LPU64"\n",
+ offset, *start, *end, *hash);
if (*hash > *end || (*end != *start && *hash == *end)) {
- ll_release_page(page, *hash, *start, *end);
+ /*
+ * Upon hash collision, remove this page; otherwise
+ * just put the page reference, and ll_get_dir_page()
+ * will issue an RPC to fetch the page we want.
+ */
+ if (dp->ldp_flags & cpu_to_le32(LDF_COLLIDE)) {
+ ll_release_page(page, *hash, *start, *end);
+ } else {
+ cfs_kunmap(page);
+ page_cache_release(page);
+ }
page = NULL;
}
} else {