* lmv_adjust_dirpages().
*
*/
-
-/* returns the page unlocked, but with a reference */
-static int ll_dir_filler(void *_hash, struct page *page0)
+/**
+ * The following three APIs are used by llite to iterate directory
+ * entries from the MDC dir page cache.
+ *
+ * ll_dir_entry_start (and _next) look up and return the entry at
+ * op_hash_offset. To avoid extra memory allocation, the returned entry
+ * points directly into the dir page cached by MDC, so that page cannot
+ * be released until the entry has been consumed by ll_readdir()
+ * (or statahead).
+ *
+ * The iteration process is:
+ *
+ * ll_dir_entry_start: locate the page in MDC, return the first entry and
+ * hold the page.
+ *
+ * ll_dir_entry_next: return the next entry in the current page; if the end
+ * of the page is reached, release the current page.
+ *
+ * ll_dir_entry_end: release the last page.
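+ *
+ * A minimal caller-side sketch (illustration only; it simply mirrors the
+ * loop in ll_dir_read() below and assumes @op_data was prepared with
+ * ll_prep_md_op_data(), with op_hash_offset and op_max_pages already set):
+ *
+ *	for (ent = ll_dir_entry_start(dir, op_data);
+ *	     ent != NULL && !IS_ERR(ent);
+ *	     ent = ll_dir_entry_next(dir, op_data, ent)) {
+ *		... consume ent->lde_hash, ent->lde_name, ent->lde_fid ...
+ *	}
+ *	if (IS_ERR(ent))
+ *		rc = PTR_ERR(ent);
+ *	else if (ent != NULL)
+ *		ll_dir_entry_end(dir, op_data, ent);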
+ */
+struct lu_dirent *ll_dir_entry_start(struct inode *dir,
+ struct md_op_data *op_data)
{
- struct inode *inode = page0->mapping->host;
- int hash64 = ll_i2sbi(inode)->ll_flags & LL_SBI_64BIT_HASH;
- struct obd_export *exp = ll_i2sbi(inode)->ll_md_exp;
- struct ptlrpc_request *request;
- struct mdt_body *body;
- struct md_op_data *op_data;
- __u64 hash = *((__u64 *)_hash);
- struct page **page_pool;
- struct page *page;
- struct lu_dirpage *dp;
- int max_pages = ll_i2sbi(inode)->ll_md_brw_size >> PAGE_CACHE_SHIFT;
- int nrdpgs = 0; /* number of pages read actually */
- int npages;
- int i;
+ struct lu_dirent *entry;
+ struct md_callback cb_op;
int rc;
- ENTRY;
-
- CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p) hash "LPU64"\n",
- PFID(ll_inode2fid(inode)), inode, hash);
-
- LASSERT(max_pages > 0 && max_pages <= MD_MAX_BRW_PAGES);
-
- op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
- LUSTRE_OPC_ANY, NULL);
- if (IS_ERR(op_data))
- RETURN(PTR_ERR(op_data));
-
- OBD_ALLOC(page_pool, sizeof(page) * max_pages);
- if (page_pool != NULL) {
- page_pool[0] = page0;
- } else {
- page_pool = &page0;
- max_pages = 1;
- }
- for (npages = 1; npages < max_pages; npages++) {
- page = page_cache_alloc_cold(inode->i_mapping);
- if (!page)
- break;
- page_pool[npages] = page;
- }
-
- op_data->op_npages = npages;
- op_data->op_offset = hash;
- rc = md_readpage(exp, op_data, page_pool, &request);
- ll_finish_md_op_data(op_data);
- if (rc == 0) {
- body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
- /* Checked by mdc_readpage() */
- LASSERT(body != NULL);
-
- if (body->valid & OBD_MD_FLSIZE)
- cl_isize_write(inode, body->size);
-
- nrdpgs = (request->rq_bulk->bd_nob_transferred +
- PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
- SetPageUptodate(page0);
- }
- unlock_page(page0);
- ptlrpc_req_finished(request);
-
- CDEBUG(D_VFSTRACE, "read %d/%d pages\n", nrdpgs, npages);
-
- for (i = 1; i < npages; i++) {
- unsigned long offset;
- int ret;
- page = page_pool[i];
-
- if (rc < 0 || i >= nrdpgs) {
- page_cache_release(page);
- continue;
- }
-
- SetPageUptodate(page);
-
- dp = kmap(page);
- hash = le64_to_cpu(dp->ldp_hash_start);
- kunmap(page);
-
- offset = hash_x_index(hash, hash64);
-
- prefetchw(&page->flags);
- ret = add_to_page_cache_lru(page, inode->i_mapping, offset,
- GFP_KERNEL);
- if (ret == 0)
- unlock_page(page);
- else
- CDEBUG(D_VFSTRACE, "page %lu add to page cache failed:"
- " %d\n", offset, ret);
- page_cache_release(page);
- }
-
- if (page_pool != &page0)
- OBD_FREE(page_pool, sizeof(struct page *) * max_pages);
-
- RETURN(rc);
+ cb_op.md_blocking_ast = ll_md_blocking_ast;
+ rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry);
+ if (rc != 0)
+ entry = ERR_PTR(rc);
+ return entry;
}
-static void ll_check_page(struct inode *dir, struct page *page)
+struct lu_dirent *ll_dir_entry_next(struct inode *dir,
+ struct md_op_data *op_data,
+ struct lu_dirent *ent)
{
- /* XXX: check page format later */
- SetPageChecked(page);
-}
+ struct lu_dirent *entry;
+ struct md_callback cb_op;
+ int rc;
-void ll_release_page(struct page *page, int remove)
-{
- kunmap(page);
- if (remove) {
- lock_page(page);
- if (likely(page->mapping != NULL))
- truncate_complete_page(page->mapping, page);
- unlock_page(page);
- }
- page_cache_release(page);
+ cb_op.md_blocking_ast = ll_md_blocking_ast;
+ op_data->op_hash_offset = le64_to_cpu(ent->lde_hash);
+ rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry);
+ if (rc != 0)
+ entry = ERR_PTR(rc);
+ return entry;
}
-/*
- * Find, kmap and return page that contains given hash.
- */
-static struct page *ll_dir_page_locate(struct inode *dir, __u64 *hash,
- __u64 *start, __u64 *end)
+void ll_dir_entry_end(struct inode *dir, struct md_op_data *op_data,
+ struct lu_dirent *ent)
{
- int hash64 = ll_i2sbi(dir)->ll_flags & LL_SBI_64BIT_HASH;
- struct address_space *mapping = dir->i_mapping;
- /*
- * Complement of hash is used as an index so that
- * radix_tree_gang_lookup() can be used to find a page with starting
- * hash _smaller_ than one we are looking for.
- */
- unsigned long offset = hash_x_index(*hash, hash64);
- struct page *page;
- int found;
-
- spin_lock_irq(&mapping->tree_lock);
- found = radix_tree_gang_lookup(&mapping->page_tree,
- (void **)&page, offset, 1);
- if (found > 0) {
- struct lu_dirpage *dp;
-
- page_cache_get(page);
- spin_unlock_irq(&mapping->tree_lock);
- /*
- * In contrast to find_lock_page() we are sure that directory
- * page cannot be truncated (while DLM lock is held) and,
- * hence, can avoid restart.
- *
- * In fact, page cannot be locked here at all, because
- * ll_dir_filler() does synchronous io.
- */
- wait_on_page_locked(page);
- if (PageUptodate(page)) {
- dp = kmap(page);
- if (BITS_PER_LONG == 32 && hash64) {
- *start = le64_to_cpu(dp->ldp_hash_start) >> 32;
- *end = le64_to_cpu(dp->ldp_hash_end) >> 32;
- *hash = *hash >> 32;
- } else {
- *start = le64_to_cpu(dp->ldp_hash_start);
- *end = le64_to_cpu(dp->ldp_hash_end);
- }
- LASSERTF(*start <= *hash, "start = "LPX64",end = "
- LPX64",hash = "LPX64"\n", *start, *end, *hash);
- CDEBUG(D_VFSTRACE, "page %lu [%llu %llu], hash "LPU64"\n",
- offset, *start, *end, *hash);
- if (*hash > *end) {
- ll_release_page(page, 0);
- page = NULL;
- } else if (*end != *start && *hash == *end) {
- /*
- * upon hash collision, remove this page,
- * otherwise put page reference, and
- * ll_get_dir_page() will issue RPC to fetch
- * the page we want.
- */
- ll_release_page(page,
- le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
- page = NULL;
- }
- } else {
- page_cache_release(page);
- page = ERR_PTR(-EIO);
- }
+ struct lu_dirent *entry;
+ struct md_callback cb_op;
- } else {
- spin_unlock_irq(&mapping->tree_lock);
- page = NULL;
- }
- return page;
+ cb_op.md_blocking_ast = ll_md_blocking_ast;
+ op_data->op_cli_flags = CLI_READENT_END;
+ md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry);
+ return;
}
-struct page *ll_get_dir_page(struct inode *dir, __u64 hash,
- struct ll_dir_chain *chain)
+int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+ void *cookie, filldir_t filldir)
{
- ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} };
- struct address_space *mapping = dir->i_mapping;
- struct lustre_handle lockh;
- struct lu_dirpage *dp;
- struct page *page;
- ldlm_mode_t mode;
- int rc;
- __u64 start = 0;
- __u64 end = 0;
- __u64 lhash = hash;
- struct ll_inode_info *lli = ll_i2info(dir);
- int hash64 = ll_i2sbi(dir)->ll_flags & LL_SBI_64BIT_HASH;
-
- mode = LCK_PR;
- rc = md_lock_match(ll_i2sbi(dir)->ll_md_exp, LDLM_FL_BLOCK_GRANTED,
- ll_inode2fid(dir), LDLM_IBITS, &policy, mode, &lockh);
- if (!rc) {
- struct ldlm_enqueue_info einfo = {
- .ei_type = LDLM_IBITS,
- .ei_mode = mode,
- .ei_cb_bl = ll_md_blocking_ast,
- .ei_cb_cp = ldlm_completion_ast,
- };
- struct lookup_intent it = { .it_op = IT_READDIR };
- struct ptlrpc_request *request;
- struct md_op_data *op_data;
-
- op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
- LUSTRE_OPC_ANY, NULL);
- if (IS_ERR(op_data))
- return (void *)op_data;
-
- rc = md_enqueue(ll_i2sbi(dir)->ll_md_exp, &einfo, &it,
- op_data, &lockh, NULL, 0, NULL, 0);
-
- ll_finish_md_op_data(op_data);
-
- request = (struct ptlrpc_request *)it.d.lustre.it_data;
- if (request)
- ptlrpc_req_finished(request);
- if (rc < 0) {
- CERROR("lock enqueue: "DFID" at "LPU64": rc %d\n",
- PFID(ll_inode2fid(dir)), hash, rc);
- return ERR_PTR(rc);
- }
-
- CDEBUG(D_INODE, "setting lr_lvb_inode to inode "DFID"(%p)\n",
- PFID(ll_inode2fid(dir)), dir);
- md_set_lock_data(ll_i2sbi(dir)->ll_md_exp,
- &it.d.lustre.it_lock_handle, dir, NULL);
- } else {
- /* for cross-ref object, l_ast_data of the lock may not be set,
- * we reset it here */
- md_set_lock_data(ll_i2sbi(dir)->ll_md_exp, &lockh.cookie,
- dir, NULL);
- }
- ldlm_lock_dump_handle(D_OTHER, &lockh);
-
- mutex_lock(&lli->lli_readdir_mutex);
- page = ll_dir_page_locate(dir, &lhash, &start, &end);
- if (IS_ERR(page)) {
- CERROR("dir page locate: "DFID" at "LPU64": rc %ld\n",
- PFID(ll_inode2fid(dir)), lhash, PTR_ERR(page));
- GOTO(out_unlock, page);
- } else if (page != NULL) {
- /*
- * XXX nikita: not entirely correct handling of a corner case:
- * suppose hash chain of entries with hash value HASH crosses
- * border between pages P0 and P1. First both P0 and P1 are
- * cached, seekdir() is called for some entry from the P0 part
- * of the chain. Later P0 goes out of cache. telldir(HASH)
- * happens and finds P1, as it starts with matching hash
- * value. Remaining entries from P0 part of the chain are
- * skipped. (Is that really a bug?)
- *
- * Possible solutions: 0. don't cache P1 is such case, handle
- * it as an "overflow" page. 1. invalidate all pages at
- * once. 2. use HASH|1 as an index for P1.
- */
- GOTO(hash_collision, page);
- }
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_dir_chain chain;
+ struct lu_dirent *ent;
+ int api32 = ll_need_32bit_api(sbi);
+ int hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH;
+ int done = 0;
+ int rc = 0;
+ __u64 hash = MDS_DIR_END_OFF;
+ __u64 last_hash = MDS_DIR_END_OFF;
+ ENTRY;
- page = read_cache_page(mapping, hash_x_index(hash, hash64),
- ll_dir_filler, &lhash);
- if (IS_ERR(page)) {
- CERROR("read cache page: "DFID" at "LPU64": rc %ld\n",
- PFID(ll_inode2fid(dir)), hash, PTR_ERR(page));
- GOTO(out_unlock, page);
- }
+ ll_dir_chain_init(&chain);
+ for (ent = ll_dir_entry_start(inode, op_data);
+ ent != NULL && !IS_ERR(ent) && !done;
+ ent = ll_dir_entry_next(inode, op_data, ent)) {
+ __u16 type;
+ int namelen;
+ struct lu_fid fid;
+ __u64 lhash;
+ __u64 ino;
+
+ hash = le64_to_cpu(ent->lde_hash);
+ if (hash < op_data->op_hash_offset)
+ /*
+ * Skip until we find target hash
+ * value.
+ */
+ continue;
+ namelen = le16_to_cpu(ent->lde_namelen);
+ if (namelen == 0)
+ /*
+ * Skip dummy record.
+ */
+ continue;
- wait_on_page_locked(page);
- (void)kmap(page);
- if (!PageUptodate(page)) {
- CERROR("page not updated: "DFID" at "LPU64": rc %d\n",
- PFID(ll_inode2fid(dir)), hash, -5);
- goto fail;
+ if (api32 && hash64)
+ lhash = hash >> 32;
+ else
+ lhash = hash;
+ fid_le_to_cpu(&fid, &ent->lde_fid);
+ ino = cl_fid_build_ino(&fid, api32);
+ type = ll_dirent_type_get(ent);
+ /* For 'll_nfs_get_name_filldir()', it will try
+ * to access the 'ent' through its 'lde_name',
+ * so the parameter 'name' for 'filldir()' must
+ * be part of the 'ent'. */
+ done = filldir(cookie, ent->lde_name, namelen, lhash,
+ ino, type);
+ if (done) {
+ if (op_data->op_hash_offset != MDS_DIR_END_OFF)
+ op_data->op_hash_offset = last_hash;
+ break;
+ } else {
+ last_hash = hash;
+ }
}
- if (!PageChecked(page))
- ll_check_page(dir, page);
- if (PageError(page)) {
- CERROR("page error: "DFID" at "LPU64": rc %d\n",
- PFID(ll_inode2fid(dir)), hash, -5);
- goto fail;
- }
-hash_collision:
- dp = page_address(page);
- if (BITS_PER_LONG == 32 && hash64) {
- start = le64_to_cpu(dp->ldp_hash_start) >> 32;
- end = le64_to_cpu(dp->ldp_hash_end) >> 32;
- lhash = hash >> 32;
- } else {
- start = le64_to_cpu(dp->ldp_hash_start);
- end = le64_to_cpu(dp->ldp_hash_end);
- lhash = hash;
- }
- if (end == start) {
- LASSERT(start == lhash);
- CWARN("Page-wide hash collision: "LPU64"\n", end);
- if (BITS_PER_LONG == 32 && hash64)
- CWARN("Real page-wide hash collision at ["LPU64" "LPU64
- "] with hash "LPU64"\n",
- le64_to_cpu(dp->ldp_hash_start),
- le64_to_cpu(dp->ldp_hash_end), hash);
- /*
- * Fetch whole overflow chain...
- *
- * XXX not yet.
- */
- goto fail;
- }
-out_unlock:
- mutex_unlock(&lli->lli_readdir_mutex);
- ldlm_lock_decref(&lockh, mode);
- return page;
-
-fail:
- ll_release_page(page, 1);
- page = ERR_PTR(-EIO);
- goto out_unlock;
-}
-int ll_dir_read(struct inode *inode, __u64 *_pos, void *cookie,
- filldir_t filldir)
-{
- struct ll_inode_info *info = ll_i2info(inode);
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- __u64 pos = *_pos;
- int api32 = ll_need_32bit_api(sbi);
- int hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH;
- struct page *page;
- struct ll_dir_chain chain;
- int done = 0;
- int rc = 0;
- ENTRY;
+ if (IS_ERR(ent))
+ rc = PTR_ERR(ent);
+ else if (ent != NULL)
+ ll_dir_entry_end(inode, op_data, ent);
- ll_dir_chain_init(&chain);
-
- page = ll_get_dir_page(inode, pos, &chain);
-
- while (rc == 0 && !done) {
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
-
- if (!IS_ERR(page)) {
- /*
- * If page is empty (end of directory is reached),
- * use this value.
- */
- __u64 hash = MDS_DIR_END_OFF;
- __u64 next;
-
- dp = page_address(page);
- for (ent = lu_dirent_start(dp); ent != NULL && !done;
- ent = lu_dirent_next(ent)) {
- __u16 type;
- int namelen;
- struct lu_fid fid;
- __u64 lhash;
- __u64 ino;
-
- /*
- * XXX: implement correct swabbing here.
- */
-
- hash = le64_to_cpu(ent->lde_hash);
- if (hash < pos)
- /*
- * Skip until we find target hash
- * value.
- */
- continue;
-
- namelen = le16_to_cpu(ent->lde_namelen);
- if (namelen == 0)
- /*
- * Skip dummy record.
- */
- continue;
-
- if (api32 && hash64)
- lhash = hash >> 32;
- else
- lhash = hash;
- fid_le_to_cpu(&fid, &ent->lde_fid);
- ino = cl_fid_build_ino(&fid, api32);
- type = ll_dirent_type_get(ent);
- /* For 'll_nfs_get_name_filldir()', it will try
- * to access the 'ent' through its 'lde_name',
- * so the parameter 'name' for 'filldir()' must
- * be part of the 'ent'. */
- done = filldir(cookie, ent->lde_name, namelen,
- lhash, ino, type);
- }
- next = le64_to_cpu(dp->ldp_hash_end);
- if (!done) {
- pos = next;
- if (pos == MDS_DIR_END_OFF) {
- /*
- * End of directory reached.
- */
- done = 1;
- ll_release_page(page, 0);
- } else if (1 /* chain is exhausted*/) {
- /*
- * Normal case: continue to the next
- * page.
- */
- ll_release_page(page,
- le32_to_cpu(dp->ldp_flags) &
- LDF_COLLIDE);
- next = pos;
- page = ll_get_dir_page(inode, pos,
- &chain);
- } else {
- /*
- * go into overflow page.
- */
- LASSERT(le32_to_cpu(dp->ldp_flags) &
- LDF_COLLIDE);
- ll_release_page(page, 1);
- }
- } else {
- pos = hash;
- ll_release_page(page, 0);
- }
- } else {
- rc = PTR_ERR(page);
- CERROR("error reading dir "DFID" at %lu: rc %d\n",
- PFID(&info->lli_fid), (unsigned long)pos, rc);
- }
- }
-
- *_pos = pos;
ll_dir_chain_fini(&chain);
RETURN(rc);
}
struct inode *inode = filp->f_dentry->d_inode;
struct ll_file_data *lfd = LUSTRE_FPRIVATE(filp);
struct ll_sb_info *sbi = ll_i2sbi(inode);
- __u64 pos;
+ __u64 pos = lfd->lfd_pos;
int hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH;
int api32 = ll_need_32bit_api(sbi);
+ struct md_op_data *op_data;
int rc;
#ifdef HAVE_TOUCH_ATIME_1ARG
struct path path;
else
pos = 0;
- CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p) pos %lu/%llu "
- " 32bit_api %d\n", PFID(ll_inode2fid(inode)),
+ CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p) pos/size "
+ "%lu/%llu 32bit_api %d\n", PFID(ll_inode2fid(inode)),
inode, (unsigned long)pos, i_size_read(inode), api32);
if (pos == MDS_DIR_END_OFF)
*/
GOTO(out, rc = 0);
- rc = ll_dir_read(inode, &pos, cookie, filldir);
+ op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0,
+ LUSTRE_OPC_ANY, inode);
+ if (IS_ERR(op_data))
+ GOTO(out, rc = PTR_ERR(op_data));
+
+ op_data->op_hash_offset = pos;
+ op_data->op_max_pages = sbi->ll_md_brw_size >> PAGE_CACHE_SHIFT;
+ rc = ll_dir_read(inode, op_data, cookie, filldir);
if (lfd != NULL)
- lfd->lfd_pos = pos;
- if (pos == MDS_DIR_END_OFF) {
- if (api32)
- filp->f_pos = LL_DIR_END_OFF_32BIT;
- else
- filp->f_pos = LL_DIR_END_OFF;
- } else {
- if (api32 && hash64)
- filp->f_pos = pos >> 32;
- else
- filp->f_pos = pos;
- }
+ lfd->lfd_pos = op_data->op_hash_offset;
+
+ if (pos == MDS_DIR_END_OFF) {
+ if (api32)
+ filp->f_pos = LL_DIR_END_OFF_32BIT;
+ else
+ filp->f_pos = LL_DIR_END_OFF;
+ } else {
+ if (api32 && hash64)
+ filp->f_pos = op_data->op_hash_offset >> 32;
+ else
+ filp->f_pos = op_data->op_hash_offset;
+ }
+
+ ll_finish_md_op_data(op_data);
filp->f_version = inode->i_version;
#ifdef HAVE_TOUCH_ATIME_1ARG
#ifdef HAVE_F_PATH_MNT