CLI_HASH64 = 1 << 2,
CLI_API32 = 1 << 3,
CLI_MIGRATE = 1 << 4,
- CLI_NEXT_ENTRY = 1 << 5,
};
#endif /*LCLIENT_H */
ENTRY;
if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
- it->it_op == IT_LAYOUT))
+ it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
return;
/* This would normally block until the existing request finishes.
struct lookup_intent *it)
{
if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
- it->it_op == IT_LAYOUT))
+ it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
goto out;
if (lck->rpcl_it == MDC_FAKE_RPCL_IT) { /* OBD_FAIL_MDC_RPCS_SEM */
enum mds_op_bias op_bias;
/* Used by readdir */
- __u64 op_hash_offset;
-
- /* The offset of dir entry among the same hash entries, which
- * is used to resolve the hash conflict. */
- __u32 op_same_hash_offset;
-
- /* Used by readdir */
__u32 op_npages;
/* used to transfer info between the stacks of MD client
int (*m_fsync)(struct obd_export *, const struct lu_fid *,
struct obd_capa *, struct ptlrpc_request **);
- int (*m_read_entry)(struct obd_export *, struct md_op_data *,
- struct md_callback *cb_op, struct lu_dirent **ld,
- struct page **ppage);
+ int (*m_read_page)(struct obd_export *, struct md_op_data *,
+ struct md_callback *cb_op, __u64 hash_offset,
+ struct page **ppage);
int (*m_unlink)(struct obd_export *, struct md_op_data *,
struct ptlrpc_request **);
RETURN(rc);
}
-static inline int md_read_entry(struct obd_export *exp,
- struct md_op_data *op_data,
- struct md_callback *cb_op,
- struct lu_dirent **ld,
- struct page **ppage)
+static inline int md_read_page(struct obd_export *exp,
+ struct md_op_data *op_data,
+ struct md_callback *cb_op,
+ __u64 hash_offset,
+ struct page **ppage)
{
int rc;
ENTRY;
- EXP_CHECK_MD_OP(exp, read_entry);
- EXP_MD_COUNTER_INCREMENT(exp, read_entry);
- rc = MDP(exp->exp_obd, read_entry)(exp, op_data, cb_op, ld, ppage);
+ EXP_CHECK_MD_OP(exp, read_page);
+ EXP_MD_COUNTER_INCREMENT(exp, read_page);
+ rc = MDP(exp->exp_obd, read_page)(exp, op_data, cb_op, hash_offset,
+ ppage);
RETURN(rc);
}
}
ldlm_lock_dump_handle(D_OTHER, &lockh);
- op_data.op_hash_offset = hash_x_index(page->index, 0);
op_data.op_npages = 1;
rc = md_readpage(sbi->ll_md_exp, &op_data, &page, &request);
if (!rc) {
* lmv_adjust_dirpages().
*
*/
-/**
- * The following three APIs will be used by llite to iterate directory
- * entries from MDC dir page caches.
- *
- * ll_dir_entry_start(next) will lookup(return) entry by op_hash_offset.
- * To avoid extra memory allocation, the @entry will be pointed to
- * the dir entries in MDC page directly, so these pages can not be released
- * until the entry has been accessed in ll_readdir(or statahead).
- *
- * The iterate process will be
- *
- * ll_dir_entry_start: locate the page in MDC, and return the first entry.
- * hold the page.
- *
- * ll_dir_entry_next: return the next entry in the current page, if it reaches
- * to the end, release current page.
- *
- * ll_dir_entry_end: release the last page.
- **/
-struct lu_dirent *ll_dir_entry_start(struct inode *dir,
- struct md_op_data *op_data,
- struct page **ppage)
+struct page *ll_get_dir_page(struct inode *dir, struct md_op_data *op_data,
+ __u64 offset, struct ll_dir_chain *chain)
{
- struct lu_dirent *entry = NULL;
- struct md_callback cb_op;
- int rc;
- ENTRY;
+ struct md_callback cb_op;
+ struct page *page;
+ int rc;
- LASSERT(*ppage == NULL);
cb_op.md_blocking_ast = ll_md_blocking_ast;
- op_data->op_cli_flags &= ~CLI_NEXT_ENTRY;
- rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry, ppage);
+ rc = md_read_page(ll_i2mdexp(dir), op_data, &cb_op, offset, &page);
if (rc != 0)
- entry = ERR_PTR(rc);
- RETURN(entry);
+ return ERR_PTR(rc);
+
+ return page;
}
-struct lu_dirent *ll_dir_entry_next(struct inode *dir,
- struct md_op_data *op_data,
- struct lu_dirent *ent,
- struct page **ppage)
+void ll_release_page(struct inode *inode, struct page *page,
+ bool remove)
{
- struct lu_dirent *entry = NULL;
- struct md_callback cb_op;
- int rc;
- ENTRY;
-
- op_data->op_hash_offset = le64_to_cpu(ent->lde_hash);
-
- /* release last page */
- LASSERT(*ppage != NULL);
- kunmap(*ppage);
- page_cache_release(*ppage);
-
- cb_op.md_blocking_ast = ll_md_blocking_ast;
- op_data->op_cli_flags |= CLI_NEXT_ENTRY;
- rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry, ppage);
- if (rc != 0)
- entry = ERR_PTR(rc);
+ kunmap(page);
+
+	/* Always remove the page for a striped dir, because the page is
+	 * built temporarily in the LMV layer */
+ if (inode != NULL && S_ISDIR(inode->i_mode) &&
+ ll_i2info(inode)->lli_lsm_md != NULL) {
+ __free_page(page);
+ return;
+ }
- RETURN(entry);
+ if (remove) {
+ lock_page(page);
+ if (likely(page->mapping != NULL))
+ truncate_complete_page(page->mapping, page);
+ unlock_page(page);
+ }
+ page_cache_release(page);
}
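
/*
 * Illustrative sketch, not part of this patch: ll_release_page() takes a
 * "remove" flag; the callers below pass true when the page carries a hash
 * collision (LDF_COLLIDE), so it is truncated from the cache and re-fetched
 * by the next ll_get_dir_page().  The hypothetical helper below only
 * restates that decision.
 */
#if 0	/* example only */
static bool dir_page_should_drop(const struct lu_dirpage *dp, __u64 next)
{
	/* keep the last page cached; drop collided pages so they are re-read */
	return next != MDS_DIR_END_OFF &&
	       (le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
}
#endif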
#ifdef HAVE_DIR_CONTEXT
-int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+int ll_dir_read(struct inode *inode, __u64 *ppos, struct md_op_data *op_data,
struct dir_context *ctx)
{
#else
-int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+int ll_dir_read(struct inode *inode, __u64 *ppos, struct md_op_data *op_data,
void *cookie, filldir_t filldir)
{
#endif
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct ll_dir_chain chain;
- struct lu_dirent *ent;
- int api32 = ll_need_32bit_api(sbi);
- int hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH;
- int done = 0;
- int rc = 0;
- __u64 hash = MDS_DIR_END_OFF;
- struct page *page = NULL;
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ __u64 pos = *ppos;
+ bool is_api32 = ll_need_32bit_api(sbi);
+ bool is_hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH;
+ struct page *page;
+ struct ll_dir_chain chain;
+ bool done = false;
+ int rc = 0;
ENTRY;
- ll_dir_chain_init(&chain);
- for (ent = ll_dir_entry_start(inode, op_data, &page);
- ent != NULL && !IS_ERR(ent) && !done;
- ent = ll_dir_entry_next(inode, op_data, ent, &page)) {
- __u16 type;
- int namelen;
- struct lu_fid fid;
- __u64 lhash;
- __u64 ino;
-
- hash = le64_to_cpu(ent->lde_hash);
- if (hash < op_data->op_hash_offset)
- /*
- * Skip until we find target hash
- * value.
- */
- continue;
- namelen = le16_to_cpu(ent->lde_namelen);
- if (namelen == 0)
- /*
- * Skip dummy record.
- */
- continue;
+ ll_dir_chain_init(&chain);
- if (api32 && hash64)
- lhash = hash >> 32;
- else
- lhash = hash;
- fid_le_to_cpu(&fid, &ent->lde_fid);
- ino = cl_fid_build_ino(&fid, api32);
- type = ll_dirent_type_get(ent);
+ page = ll_get_dir_page(inode, op_data, pos, &chain);
-#ifdef HAVE_DIR_CONTEXT
- /* For 'll_nfs_get_name_filldir()', it will try
- * to access the 'ent' through its 'lde_name',
- * so the parameter 'name' for 'filldir()' must
- * be part of the 'ent'. */
- done = !dir_emit(ctx, ent->lde_name, namelen, ino, type);
-#else
- done = filldir(cookie, ent->lde_name, namelen, lhash,
- ino, type);
-#endif
- if (done) {
- if (op_data->op_hash_offset != MDS_DIR_END_OFF)
- op_data->op_hash_offset = hash;
+ while (rc == 0 && !done) {
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
+ __u64 hash;
+ __u64 next;
+
+ if (IS_ERR(page)) {
+ rc = PTR_ERR(page);
break;
}
- }
- if (IS_ERR(ent))
- rc = PTR_ERR(ent);
- else if (ent == NULL)
- op_data->op_hash_offset = MDS_DIR_END_OFF;
+ hash = MDS_DIR_END_OFF;
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL && !done;
+ ent = lu_dirent_next(ent)) {
+ __u16 type;
+ int namelen;
+ struct lu_fid fid;
+ __u64 lhash;
+ __u64 ino;
+
+ hash = le64_to_cpu(ent->lde_hash);
+ if (hash < pos)
+ /*
+ * Skip until we find target hash
+ * value.
+ */
+ continue;
+
+ namelen = le16_to_cpu(ent->lde_namelen);
+ if (namelen == 0)
+ /*
+ * Skip dummy record.
+ */
+ continue;
+
+ if (is_api32 && is_hash64)
+ lhash = hash >> 32;
+ else
+ lhash = hash;
+ fid_le_to_cpu(&fid, &ent->lde_fid);
+ ino = cl_fid_build_ino(&fid, is_api32);
+ type = ll_dirent_type_get(ent);
+ /* For 'll_nfs_get_name_filldir()', it will try
+ * to access the 'ent' through its 'lde_name',
+ * so the parameter 'name' for 'filldir()' must
+ * be part of the 'ent'. */
+ done = filldir(cookie, ent->lde_name, namelen, lhash,
+ ino, type);
+ }
- if (page != NULL) {
- kunmap(page);
- page_cache_release(page);
+ if (done) {
+ pos = hash;
+ ll_release_page(inode, page, false);
+ break;
+ }
+
+ next = le64_to_cpu(dp->ldp_hash_end);
+ pos = next;
+ if (pos == MDS_DIR_END_OFF) {
+ /*
+ * End of directory reached.
+ */
+			done = true;
+ ll_release_page(inode, page, false);
+ } else {
+ /*
+ * Normal case: continue to the next
+ * page.
+ */
+ ll_release_page(inode, page,
+ le32_to_cpu(dp->ldp_flags) &
+ LDF_COLLIDE);
+ next = pos;
+ page = ll_get_dir_page(inode, op_data, pos,
+ &chain);
+ }
}
+ *ppos = pos;
ll_dir_chain_fini(&chain);
RETURN(rc);
}
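
/*
 * Illustrative sketch, not part of this patch: ll_dir_read() and its caller
 * map the 64-bit directory hash to the position reported to userspace; with
 * a 32-bit API on a 64-bit-hash filesystem only the top 32 bits are exposed.
 * The hypothetical helper below restates the api32/hash64 logic used above.
 */
#if 0	/* example only */
static __u64 dir_hash_to_pos(__u64 hash, bool is_api32, bool is_hash64)
{
	if (is_api32 && is_hash64)
		return hash >> 32;
	return hash;
}
#endif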
* object */
if (fid_is_zero(&op_data->op_fid3)) {
rc = ll_dir_get_parent_fid(inode, &op_data->op_fid3);
- if (rc != 0)
+ if (rc != 0) {
+ ll_finish_md_op_data(op_data);
RETURN(rc);
+ }
}
}
- op_data->op_hash_offset = pos;
op_data->op_max_pages = sbi->ll_md_brw_pages;
#ifdef HAVE_DIR_CONTEXT
ctx->pos = pos;
- rc = ll_dir_read(inode, op_data, ctx);
+ rc = ll_dir_read(inode, &pos, op_data, ctx);
pos = ctx->pos;
#else
- rc = ll_dir_read(inode, op_data, cookie, filldir);
+ rc = ll_dir_read(inode, &pos, op_data, cookie, filldir);
#endif
if (lfd != NULL)
- lfd->lfd_pos = op_data->op_hash_offset;
+ lfd->lfd_pos = pos;
if (pos == MDS_DIR_END_OFF) {
if (api32)
pos = LL_DIR_END_OFF;
} else {
if (api32 && hash64)
- pos = op_data->op_hash_offset >> 32;
- else
- pos = op_data->op_hash_offset;
+ pos = pos >> 32;
}
#ifdef HAVE_DIR_CONTEXT
ctx->pos = pos;
extern const struct file_operations ll_dir_operations;
extern const struct inode_operations ll_dir_inode_operations;
#ifdef HAVE_DIR_CONTEXT
-int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+int ll_dir_read(struct inode *inode, __u64 *pos, struct md_op_data *op_data,
struct dir_context *ctx);
#else
-int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+int ll_dir_read(struct inode *inode, __u64 *pos, struct md_op_data *op_data,
void *cookie, filldir_t filldir);
#endif
int ll_get_mdt_idx(struct inode *inode);
int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi, const struct lu_fid *fid);
+struct page *ll_get_dir_page(struct inode *dir, struct md_op_data *op_data,
+ __u64 offset, struct ll_dir_chain *chain);
+void ll_release_page(struct inode *inode, struct page *page, bool remove);
-struct lu_dirent *ll_dir_entry_start(struct inode *dir,
- struct md_op_data *op_data,
- struct page **ppage);
-
-struct lu_dirent *ll_dir_entry_next(struct inode *dir,
- struct md_op_data *op_data,
- struct lu_dirent *ent,
- struct page **ppage);
/* llite/namei.c */
extern const struct inode_operations ll_special_inode_operations;
unsigned int sai_skip_hidden;/* skipped hidden dentry count */
unsigned int sai_ls_all:1, /* "ls -al", do stat-ahead for
* hidden entries */
- sai_agl_valid:1;/* AGL is valid for the dir */
+ sai_agl_valid:1,/* AGL is valid for the dir */
+ sai_in_readpage:1;/* statahead is in readdir()*/
wait_queue_head_t sai_waitq; /* stat-ahead wait queue */
struct ptlrpc_thread sai_thread; /* stat-ahead thread */
struct ptlrpc_thread sai_agl_thread; /* AGL thread */
.lgd_found = 0,
};
struct md_op_data *op_data;
+ __u64 pos = 0;
int rc;
ENTRY;
if (IS_ERR(op_data))
GOTO(out, rc = PTR_ERR(op_data));
- op_data->op_hash_offset = 0;
op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
mutex_lock(&dir->i_mutex);
#ifdef HAVE_DIR_CONTEXT
- rc = ll_dir_read(dir, op_data, &lgd.ctx);
+ rc = ll_dir_read(dir, &pos, op_data, &lgd.ctx);
#else
- rc = ll_dir_read(dir, op_data, &lgd, ll_nfs_get_name_filldir);
+ rc = ll_dir_read(dir, &pos, op_data, &lgd, ll_nfs_get_name_filldir);
#endif
mutex_unlock(&dir->i_mutex);
ll_finish_md_op_data(op_data);
struct md_op_data *op_data;
struct ll_dir_chain chain;
struct l_wait_info lwi = { 0 };
- struct lu_dirent *ent;
struct page *page = NULL;
+ __u64 pos = 0;
ENTRY;
thread->t_pid = current_pid();
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
- op_data->op_hash_offset = 0;
op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
wake_up(&thread->t_ctl_waitq);
ll_dir_chain_init(&chain);
- for (ent = ll_dir_entry_start(dir, op_data, &page);
- ent != NULL && !IS_ERR(ent);
- ent = ll_dir_entry_next(dir, op_data, ent, &page)) {
- __u64 hash;
- int namelen;
- char *name;
-
- hash = le64_to_cpu(ent->lde_hash);
- if (unlikely(hash < op_data->op_hash_offset))
- /*
- * Skip until we find target hash value.
- */
- continue;
+ page = ll_get_dir_page(dir, op_data, pos, &chain);
+ while (1) {
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
+
+ if (IS_ERR(page)) {
+ rc = PTR_ERR(page);
+ CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
+ "/"LPU64" opendir_pid = %u: rc = %d\n",
+ PFID(ll_inode2fid(dir)), pos, sai->sai_index,
+ plli->lli_opendir_pid, rc);
+ GOTO(out, rc);
+ }
- namelen = le16_to_cpu(ent->lde_namelen);
- if (unlikely(namelen == 0))
- /*
- * Skip dummy record.
- */
- continue;
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent)) {
+ __u64 hash;
+ int namelen;
+ char *name;
- name = ent->lde_name;
- if (name[0] == '.') {
- if (namelen == 1) {
- /*
- * skip "."
- */
- continue;
- } else if (name[1] == '.' && namelen == 2) {
+ hash = le64_to_cpu(ent->lde_hash);
+ if (unlikely(hash < pos))
/*
- * skip ".."
+ * Skip until we find target hash value.
*/
continue;
- } else if (!sai->sai_ls_all) {
+
+ namelen = le16_to_cpu(ent->lde_namelen);
+ if (unlikely(namelen == 0))
/*
- * skip hidden files.
+ * Skip dummy record.
*/
- sai->sai_skip_hidden++;
continue;
+
+ name = ent->lde_name;
+ if (name[0] == '.') {
+ if (namelen == 1) {
+ /*
+ * skip "."
+ */
+ continue;
+ } else if (name[1] == '.' && namelen == 2) {
+ /*
+ * skip ".."
+ */
+ continue;
+ } else if (!sai->sai_ls_all) {
+ /*
+ * skip hidden files.
+ */
+ sai->sai_skip_hidden++;
+ continue;
+ }
}
- }
- /*
- * don't stat-ahead first entry.
- */
- if (unlikely(++first == 1))
- continue;
+ /*
+ * don't stat-ahead first entry.
+ */
+ if (unlikely(++first == 1))
+ continue;
keep_it:
- l_wait_event(thread->t_ctl_waitq,
- !sa_sent_full(sai) ||
- !sa_received_empty(sai) ||
- !agl_list_empty(sai) ||
- !thread_is_running(thread),
- &lwi);
+ l_wait_event(thread->t_ctl_waitq,
+ !sa_sent_full(sai) ||
+ !sa_received_empty(sai) ||
+ !agl_list_empty(sai) ||
+ !thread_is_running(thread),
+ &lwi);
interpret_it:
- while (!sa_received_empty(sai))
- ll_post_statahead(sai);
+ while (!sa_received_empty(sai))
+ ll_post_statahead(sai);
- if (unlikely(!thread_is_running(thread)))
- GOTO(out, rc = 0);
+ if (unlikely(!thread_is_running(thread))) {
+ ll_release_page(dir, page, false);
+ GOTO(out, rc = 0);
+ }
- /* If no window for metadata statahead, but there are
- * some AGL entries to be triggered, then try to help
- * to process the AGL entries. */
- if (sa_sent_full(sai)) {
- spin_lock(&plli->lli_agl_lock);
- while (!agl_list_empty(sai)) {
- clli = agl_first_entry(sai);
- list_del_init(&clli->lli_agl_list);
+ /* If no window for metadata statahead, but there are
+ * some AGL entries to be triggered, then try to help
+ * to process the AGL entries. */
+ if (sa_sent_full(sai)) {
+ spin_lock(&plli->lli_agl_lock);
+ while (!agl_list_empty(sai)) {
+ clli = agl_first_entry(sai);
+ list_del_init(&clli->lli_agl_list);
+ spin_unlock(&plli->lli_agl_lock);
+ ll_agl_trigger(&clli->lli_vfs_inode,
+ sai);
+
+ if (!sa_received_empty(sai))
+ goto interpret_it;
+
+ if (unlikely(
+ !thread_is_running(thread))) {
+ ll_release_page(dir, page,
+ false);
+ GOTO(out, rc = 0);
+ }
+
+ if (!sa_sent_full(sai))
+ goto do_it;
+
+ spin_lock(&plli->lli_agl_lock);
+ }
spin_unlock(&plli->lli_agl_lock);
- ll_agl_trigger(&clli->lli_vfs_inode,
- sai);
- if (!sa_received_empty(sai))
- goto interpret_it;
+ goto keep_it;
+ }
+do_it:
+ ll_statahead_one(parent, name, namelen);
+ }
- if (unlikely(
- !thread_is_running(thread)))
+ pos = le64_to_cpu(dp->ldp_hash_end);
+ if (pos == MDS_DIR_END_OFF) {
+ /*
+ * End of directory reached.
+ */
+ ll_release_page(dir, page, false);
+ while (1) {
+ l_wait_event(thread->t_ctl_waitq,
+ !sa_received_empty(sai) ||
+ sai->sai_sent == sai->sai_replied ||
+ !thread_is_running(thread),
+ &lwi);
+
+ while (!sa_received_empty(sai))
+ ll_post_statahead(sai);
+
+ if (unlikely(!thread_is_running(thread)))
GOTO(out, rc = 0);
- if (!sa_sent_full(sai))
- goto do_it;
+ if (sai->sai_sent == sai->sai_replied &&
+ sa_received_empty(sai))
+ break;
+ }
+ spin_lock(&plli->lli_agl_lock);
+ while (!agl_list_empty(sai) &&
+ thread_is_running(thread)) {
+ clli = agl_first_entry(sai);
+ list_del_init(&clli->lli_agl_list);
+ spin_unlock(&plli->lli_agl_lock);
+ ll_agl_trigger(&clli->lli_vfs_inode, sai);
spin_lock(&plli->lli_agl_lock);
}
spin_unlock(&plli->lli_agl_lock);
- goto keep_it;
- }
-
-do_it:
- ll_statahead_one(parent, name, namelen);
- }
-
- if (page != NULL) {
- kunmap(page);
- page_cache_release(page);
- }
-
- /*
- * End of directory reached.
- */
- while (1) {
- l_wait_event(thread->t_ctl_waitq,
- !sa_received_empty(sai) ||
- sai->sai_sent == sai->sai_replied ||
- !thread_is_running(thread),
- &lwi);
-
- while (!sa_received_empty(sai))
- ll_post_statahead(sai);
-
- if (unlikely(!thread_is_running(thread)))
GOTO(out, rc = 0);
-
- if (sai->sai_sent == sai->sai_replied &&
- sa_received_empty(sai))
- break;
- }
-
- spin_lock(&plli->lli_agl_lock);
- while (!agl_list_empty(sai) &&
- thread_is_running(thread)) {
- clli = agl_first_entry(sai);
- list_del_init(&clli->lli_agl_list);
- spin_unlock(&plli->lli_agl_lock);
- ll_agl_trigger(&clli->lli_vfs_inode, sai);
- spin_lock(&plli->lli_agl_lock);
+ } else {
+ /*
+ * chain is exhausted.
+ * Normal case: continue to the next page.
+ */
+ ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
+ LDF_COLLIDE);
+ sai->sai_in_readpage = 1;
+ page = ll_get_dir_page(dir, op_data, pos, &chain);
+ sai->sai_in_readpage = 0;
+ }
}
- spin_unlock(&plli->lli_agl_lock);
out:
EXIT;
ll_finish_md_op_data(op_data);
struct qstr *target = &dentry->d_name;
struct md_op_data *op_data;
int dot_de;
- struct lu_dirent *ent;
struct page *page = NULL;
int rc = LS_NONE_FIRST_DE;
+ __u64 pos = 0;
ENTRY;
- ll_dir_chain_init(&chain);
-
op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
LUSTRE_OPC_ANY, dir);
if (IS_ERR(op_data))
- GOTO(out, rc = PTR_ERR(op_data));
+ RETURN(PTR_ERR(op_data));
/**
*FIXME choose the start offset of the readdir
*/
op_data->op_stripe_offset = 0;
- op_data->op_hash_offset = 0;
op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
- for (ent = ll_dir_entry_start(dir, op_data, &page);
- ent != NULL && !IS_ERR(ent);
- ent = ll_dir_entry_next(dir, op_data, ent, &page)) {
- __u64 hash;
- int namelen;
- char *name;
-
- hash = le64_to_cpu(ent->lde_hash);
- /* The ll_get_dir_page() can return any page containing
- * the given hash which may be not the start hash. */
- if (unlikely(hash < op_data->op_hash_offset))
- continue;
-
- namelen = le16_to_cpu(ent->lde_namelen);
- if (unlikely(namelen == 0))
- /*
- * skip dummy record.
- */
- continue;
+ ll_dir_chain_init(&chain);
+ page = ll_get_dir_page(dir, op_data, 0, &chain);
- name = ent->lde_name;
- if (name[0] == '.') {
- if (namelen == 1)
- /*
- * skip "."
- */
+ while (1) {
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
+
+ if (IS_ERR(page)) {
+ struct ll_inode_info *lli = ll_i2info(dir);
+
+ rc = PTR_ERR(page);
+ CERROR("%s: reading dir "DFID" at "LPU64
+			       " opendir_pid = %u: rc = %d\n",
+ ll_get_fsname(dir->i_sb, NULL, 0),
+ PFID(ll_inode2fid(dir)), pos,
+ lli->lli_opendir_pid, rc);
+ break;
+ }
+
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent)) {
+ __u64 hash;
+ int namelen;
+ char *name;
+
+ hash = le64_to_cpu(ent->lde_hash);
+ /* The ll_get_dir_page() can return any page containing
+ * the given hash which may be not the start hash. */
+ if (unlikely(hash < pos))
continue;
- else if (name[1] == '.' && namelen == 2)
+
+ namelen = le16_to_cpu(ent->lde_namelen);
+ if (unlikely(namelen == 0))
/*
- * skip ".."
+ * skip dummy record.
*/
continue;
- else
- dot_de = 1;
- } else {
- dot_de = 0;
- }
- if (dot_de && target->name[0] != '.') {
- CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
- target->len, target->name,
- namelen, name);
- continue;
- }
+ name = ent->lde_name;
+ if (name[0] == '.') {
+ if (namelen == 1)
+ /*
+ * skip "."
+ */
+ continue;
+ else if (name[1] == '.' && namelen == 2)
+ /*
+ * skip ".."
+ */
+ continue;
+ else
+ dot_de = 1;
+ } else {
+ dot_de = 0;
+ }
- if (target->len != namelen ||
- memcmp(target->name, name, namelen) != 0)
- rc = LS_NONE_FIRST_DE;
- else if (!dot_de)
- rc = LS_FIRST_DE;
- else
- rc = LS_FIRST_DOT_DE;
+ if (dot_de && target->name[0] != '.') {
+ CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
+ target->len, target->name,
+ namelen, name);
+ continue;
+ }
- break;
- }
- EXIT;
+ if (target->len != namelen ||
+ memcmp(target->name, name, namelen) != 0)
+ rc = LS_NONE_FIRST_DE;
+ else if (!dot_de)
+ rc = LS_FIRST_DE;
+ else
+ rc = LS_FIRST_DOT_DE;
- if (page != NULL) {
- kunmap(page);
- page_cache_release(page);
+ ll_release_page(dir, page, false);
+ GOTO(out, rc);
+ }
+ pos = le64_to_cpu(dp->ldp_hash_end);
+ if (pos == MDS_DIR_END_OFF) {
+ /*
+ * End of directory reached.
+ */
+ ll_release_page(dir, page, false);
+ GOTO(out, rc);
+ } else {
+ /*
+ * chain is exhausted
+ * Normal case: continue to the next page.
+ */
+ ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
+ LDF_COLLIDE);
+ page = ll_get_dir_page(dir, op_data, pos, &chain);
+ }
}
- ll_finish_md_op_data(op_data);
+ EXIT;
out:
ll_dir_chain_fini(&chain);
+ ll_finish_md_op_data(op_data);
return rc;
}
RETURN(entry ? 1 : -EAGAIN);
}
+ /* if statahead is busy in readdir, help it do post-work */
+ while (!ll_sa_entry_stated(entry) &&
+ sai->sai_in_readpage && !sa_received_empty(sai))
+ ll_post_statahead(sai);
+
if (!ll_sa_entry_stated(entry)) {
sai->sai_index_wait = entry->se_index;
lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
#endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
/**
- * This function will read entry from a striped directory, bascially, it will
- * read entries from all of stripes, and choose one closest to the required
- * offset(&op_data->op_hash_offset). A few notes
- * 1. skip . and .. for non-zero stripes, because there can only have one .
- * and .. in a directory.
- * 2. op_data will be shared by all of stripes, instead of allocating new
- * one, so need to restore before reusing.
- * 3. release the entry page if that is not being chosen.
+ * Get current minimum entry from striped directory
*
- * param[in]exp obd export refer to LMV
- * param[in]op_data hold those MD parameters of read_entry.
- * param[in]cb_op ldlm callback being used in enqueue in mdc_read_entry
- * param[out]ldp the entry being read.
- * param[out]ppage the page holding the entry, note: because the entry
- * will be accessed in upper layer, so we need hold the
- * page until the usages of entry is finished, see
- * ll_dir_entry_next.
+ * This function searches, among all sub-stripes, for the dir entry whose
+ * hash value is the closest (>=) to @hash_offset. It is only called for
+ * striped directories.
*
- * retval =0 if get entry successfully
- * <0 can not get entry.
+ * \param[in] exp export of LMV
+ * \param[in] op_data parameters transferred between the layers of the
+ *                    client MD stack; the stripe information is included
+ *                    in this parameter
+ * \param[in] cb_op ldlm callback being used in enqueue in
+ * mdc_read_page
+ * \param[in] hash_offset the hash value used to locate the minimum
+ *                        (closest) dir entry
+ * \param[in|out] stripe_offset the caller uses this to indicate the stripe
+ *                              index of the last entry, so as to avoid hash
+ *                              conflicts between stripes. It is also used
+ *                              to return the stripe index of the current
+ *                              dir entry.
+ * \param[in|out] entp the minimum entry; it is also used to pass in the
+ *                     last dir entry, to resolve hash conflicts
+ *
+ * \param[out] ppage the page which holds the minimum entry
+ *
+ * \retval 0 the entry is retrieved successfully
+ * \retval negative errno the entry could not be retrieved
*/
-#define NORMAL_MAX_STRIPES 4
-static int lmv_read_striped_entry(struct obd_export *exp,
- struct md_op_data *op_data,
- struct md_callback *cb_op,
- struct lu_dirent **ldp,
- struct page **ppage)
+static int lmv_get_min_striped_entry(struct obd_export *exp,
+ struct md_op_data *op_data,
+ struct md_callback *cb_op,
+ __u64 hash_offset, int *stripe_offset,
+ struct lu_dirent **entp,
+ struct page **ppage)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_stripe_md *lsm = op_data->op_mea1;
struct lmv_tgt_desc *tgt;
- struct lu_dirent *tmp_ents[NORMAL_MAX_STRIPES];
- struct lu_dirent **ents = NULL;
- struct lu_fid master_fid = op_data->op_fid1;
- void *master_data = op_data->op_data;
- __u64 last_idx = op_data->op_stripe_offset;
- __u64 hash_offset = op_data->op_hash_offset;
- __u32 same_hash_offset = op_data->op_same_hash_offset;
- __u32 cli_flags = op_data->op_cli_flags;
int stripe_count;
- __u64 min_hash;
- int min_same_hash_offset = 0;
- int min_idx = 0;
+ struct lu_dirent *min_ent = NULL;
struct page *min_page = NULL;
+ int min_idx = 0;
int i;
- int rc;
+ int rc = 0;
ENTRY;
- LASSERT(lsm != NULL);
-
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
-
- /* . and .. will be stored on the master object, so we need iterate
- * the master object as well */
stripe_count = lsm->lsm_md_stripe_count;
- if (stripe_count > NORMAL_MAX_STRIPES) {
- OBD_ALLOC(ents, sizeof(ents[0]) * stripe_count);
- if (ents == NULL)
- GOTO(out, rc = -ENOMEM);
- } else {
- ents = tmp_ents;
- memset(ents, 0, sizeof(ents[0]) * stripe_count);
- }
-
- min_hash = MDS_DIR_END_OFF;
for (i = 0; i < stripe_count; i++) {
- struct page *page = NULL;
+ struct lu_dirent *ent = NULL;
+ struct page *page = NULL;
+ struct lu_dirpage *dp;
+ __u64 stripe_hash = hash_offset;
tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL);
if (IS_ERR(tgt))
GOTO(out, rc = PTR_ERR(tgt));
- if (last_idx != i)
- op_data->op_same_hash_offset = 0;
- else
- op_data->op_same_hash_offset = same_hash_offset;
-
/* op_data will be shared by each stripe, so we need
* reset these value for each stripe */
op_data->op_stripe_offset = i;
- op_data->op_hash_offset = hash_offset;
- op_data->op_cli_flags = cli_flags;
op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
-
next:
- rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, &ents[i],
- &page);
+ rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash,
+ &page);
if (rc != 0)
GOTO(out, rc);
- if (ents[i] != NULL &&
- (strncmp(ents[i]->lde_name, ".",
- le16_to_cpu(ents[i]->lde_namelen)) == 0 ||
- strncmp(ents[i]->lde_name, "..",
- le16_to_cpu(ents[i]->lde_namelen)) == 0)) {
- if (i == 0) {
- /* replace . with master FID */
- if (le16_to_cpu(ents[i]->lde_namelen) == 1)
- fid_cpu_to_le(&ents[i]->lde_fid,
- &master_fid);
- else
- fid_cpu_to_le(&ents[i]->lde_fid,
- &op_data->op_fid3);
- } else {
- /* skip . and .. for other stripes */
- op_data->op_cli_flags |= CLI_NEXT_ENTRY;
- op_data->op_hash_offset =
- le64_to_cpu(ents[i]->lde_hash);
- kunmap(page);
- page_cache_release(page);
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent)) {
+ /* Skip dummy entry */
+ if (le16_to_cpu(ent->lde_namelen) == 0)
+ continue;
+
+ if (le64_to_cpu(ent->lde_hash) < hash_offset)
+ continue;
+
+ if (le64_to_cpu(ent->lde_hash) == hash_offset &&
+ (*entp == ent || i < *stripe_offset))
+ continue;
+
+ /* skip . and .. for other stripes */
+ if (i != 0 &&
+ (strncmp(ent->lde_name, ".",
+ le16_to_cpu(ent->lde_namelen)) == 0 ||
+ strncmp(ent->lde_name, "..",
+ le16_to_cpu(ent->lde_namelen)) == 0))
+ continue;
+ break;
+ }
+
+ if (ent == NULL) {
+ stripe_hash = le64_to_cpu(dp->ldp_hash_end);
+
+ kunmap(page);
+ page_cache_release(page);
+ page = NULL;
+
+ /* reach the end of current stripe, go to next stripe */
+ if (stripe_hash == MDS_DIR_END_OFF)
+ continue;
+ else
goto next;
- }
}
- if (ents[i] != NULL) {
- /* If the hash value of read_entry is equal to the
- * current min_hash, which is very rare and only
- * happens if two entries have the same hash value
- * but on different stripes, in this case, we need
- * make sure these entries are being reading forward,
- * not backward, i.e. only reset the min_entry, if
- * current stripe is ahead of last entry. Note: if
- * there are hash conflict inside the entry, MDC
- * (see mdc_read_entry) will resolve them. */
- if (le64_to_cpu(ents[i]->lde_hash) < min_hash ||
- (le64_to_cpu(ents[i]->lde_hash) == min_hash &&
- i >= last_idx)) {
- if (min_page != NULL) {
- kunmap(min_page);
- page_cache_release(min_page);
- }
- min_page = page;
- min_hash = le64_to_cpu(ents[i]->lde_hash);
- min_same_hash_offset =
- op_data->op_same_hash_offset;
+ if (min_ent != NULL) {
+ if (le64_to_cpu(min_ent->lde_hash) >
+ le64_to_cpu(ent->lde_hash)) {
+ min_ent = ent;
+ kunmap(min_page);
+ page_cache_release(min_page);
min_idx = i;
+ min_page = page;
} else {
kunmap(page);
page_cache_release(page);
+ page = NULL;
}
+ } else {
+ min_ent = ent;
+ min_page = page;
+ min_idx = i;
}
}
- if (min_hash != MDS_DIR_END_OFF) {
- *ldp = ents[min_idx];
- op_data->op_stripe_offset = min_idx;
- op_data->op_same_hash_offset = min_same_hash_offset;
- *ppage = min_page;
- } else {
- *ldp = NULL;
- *ppage = NULL;
+out:
+ if (*ppage != NULL) {
+ kunmap(*ppage);
+ page_cache_release(*ppage);
}
+ *stripe_offset = min_idx;
+ *entp = min_ent;
+ *ppage = min_page;
+ RETURN(rc);
+}
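
/*
 * Illustrative sketch, not part of this patch: lmv_get_min_striped_entry()
 * above performs one step of a k-way merge: among the current candidate
 * entries of all stripes, pick the one with the smallest hash.  The
 * hypothetical user-space helper below shows the same selection on plain
 * arrays of hashes.
 */
#if 0	/* example only */
static int pick_min_stripe(const __u64 *const *hashes, const int *cur,
			   const int *len, int nr_stripes)
{
	int i;
	int min_idx = -1;

	for (i = 0; i < nr_stripes; i++) {
		if (cur[i] >= len[i])
			continue;	/* this stripe is exhausted */
		if (min_idx < 0 ||
		    hashes[i][cur[i]] < hashes[min_idx][cur[min_idx]])
			min_idx = i;
	}
	return min_idx;			/* -1 when every stripe is exhausted */
}
#endif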
+
+/**
+ * Build dir entry page from a striped directory
+ *
+ * This function reads dir entries from all stripes of a striped directory,
+ * starting from the given @offset, and merges them into a single page in
+ * hash order. A few notes:
+ * 1. skip . and .. for non-zero stripes, because there can be only one .
+ * and .. in a directory.
+ * 2. op_data will be shared by all stripes, instead of allocating a new
+ * one, so it needs to be restored before being reused.
+ * 3. release the per-stripe entry page if its entry is not chosen.
+ *
+ * \param[in] exp obd export referring to LMV
+ * \param[in] op_data holds the MD parameters of the readdir
+ * \param[in] cb_op ldlm callback being used in enqueue in mdc_read_page
+ * \param[in] offset starting hash offset of the dir entries to be read
+ * \param[out] ppage the page holding the merged entries. Note: because
+ *                   the entries will be accessed in the upper layer, the
+ *                   page must be held until use of the entries is
+ *                   finished; it is then released via ll_release_page().
+ *
+ * \retval 0 the page is built successfully
+ * \retval negative errno the page could not be built
+ */
+static int lmv_read_striped_page(struct obd_export *exp,
+ struct md_op_data *op_data,
+ struct md_callback *cb_op,
+ __u64 offset, struct page **ppage)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lu_fid master_fid = op_data->op_fid1;
+ struct inode *master_inode = op_data->op_data;
+ __u64 hash_offset = offset;
+ struct lu_dirpage *dp;
+ struct page *min_ent_page = NULL;
+ struct page *ent_page = NULL;
+ struct lu_dirent *ent;
+ void *area;
+ int ent_idx = 0;
+ struct lu_dirent *min_ent = NULL;
+ struct lu_dirent *last_ent;
+ int left_bytes;
+ int rc;
+ ENTRY;
+
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
+
+	/* Allocate a page, read entries from all stripes, and fill
+	 * the page in hash order */
+ ent_page = alloc_page(GFP_KERNEL);
+ if (ent_page == NULL)
+ RETURN(-ENOMEM);
+
+ /* Initialize the entry page */
+ dp = kmap(ent_page);
+ memset(dp, 0, sizeof(*dp));
+ dp->ldp_hash_start = cpu_to_le64(offset);
+ dp->ldp_flags |= LDF_COLLIDE;
+
+ area = dp + 1;
+ left_bytes = PAGE_CACHE_SIZE - sizeof(*dp);
+ ent = area;
+ last_ent = ent;
+ do {
+ __u16 ent_size;
+
+		/* Find the minimum entry from all sub-stripes */
+ rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset,
+ &ent_idx, &min_ent,
+ &min_ent_page);
+ if (rc != 0)
+ GOTO(out, rc);
+
+		/* If no minimum entry can be found, we have already reached
+		 * the end of this directory */
+ if (min_ent == NULL) {
+ last_ent->lde_reclen = 0;
+ hash_offset = MDS_DIR_END_OFF;
+ GOTO(out, rc);
+ }
+
+ ent_size = le16_to_cpu(min_ent->lde_reclen);
+
+		/* lde_reclen is 0 for the last entry of a page, so compute
+		 * the real size of this entry before copying it into the
+		 * temporary page */
+ if (ent_size == 0)
+ ent_size = lu_dirent_calc_size(
+ le16_to_cpu(min_ent->lde_namelen),
+ le32_to_cpu(min_ent->lde_attrs));
+ if (ent_size > left_bytes) {
+ last_ent->lde_reclen = cpu_to_le16(0);
+ hash_offset = le64_to_cpu(min_ent->lde_hash);
+ GOTO(out, rc);
+ }
+
+ memcpy(ent, min_ent, ent_size);
+
+		/* Replace . with the master FID and .. with the parent FID
+		 * of the master object */
+ if (strncmp(ent->lde_name, ".",
+ le16_to_cpu(ent->lde_namelen)) == 0 &&
+ le16_to_cpu(ent->lde_namelen) == 1)
+ fid_cpu_to_le(&ent->lde_fid, &master_fid);
+ else if (strncmp(ent->lde_name, "..",
+ le16_to_cpu(ent->lde_namelen)) == 0 &&
+ le16_to_cpu(ent->lde_namelen) == 2)
+ fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3);
+
+ left_bytes -= ent_size;
+ ent->lde_reclen = cpu_to_le16(ent_size);
+ last_ent = ent;
+ ent = (void *)ent + ent_size;
+ hash_offset = le64_to_cpu(min_ent->lde_hash);
+ if (hash_offset == MDS_DIR_END_OFF) {
+ last_ent->lde_reclen = 0;
+ break;
+ }
+ } while (1);
out:
+ if (min_ent_page != NULL) {
+ kunmap(min_ent_page);
+ page_cache_release(min_ent_page);
+ }
+
+ if (unlikely(rc != 0)) {
+ __free_page(ent_page);
+ ent_page = NULL;
+ } else {
+ if (ent == area)
+ dp->ldp_flags |= LDF_EMPTY;
+ dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+ dp->ldp_hash_end = cpu_to_le64(hash_offset);
+ }
+
/* We do not want to allocate md_op_data during each
* dir entry reading, so op_data will be shared by every stripe,
* then we need to restore it back to original value before
* return to the upper layer */
- op_data->op_hash_offset = hash_offset;
op_data->op_fid1 = master_fid;
op_data->op_fid2 = master_fid;
- op_data->op_data = master_data;
- op_data->op_cli_flags = cli_flags;
- if (stripe_count > NORMAL_MAX_STRIPES && ents != NULL)
- OBD_FREE(ents, sizeof(ents[0]) * stripe_count);
+ op_data->op_data = master_inode;
- if (rc != 0 && min_page != NULL) {
- kunmap(min_page);
- page_cache_release(min_page);
- }
+ *ppage = ent_page;
RETURN(rc);
}
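
/*
 * Illustrative sketch, not part of this patch: the temporary page built by
 * lmv_read_striped_page() is an ordinary lu_dirpage, so a hypothetical
 * caller can tell from the header alone whether more pages are needed;
 * ldp_hash_end is set to MDS_DIR_END_OFF once the directory end is reached,
 * and LDF_COLLIDE is always set on these pages.
 */
#if 0	/* example only */
static bool striped_dirpage_is_last(const struct lu_dirpage *dp)
{
	return le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF;
}
#endif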
-int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data,
- struct md_callback *cb_op, struct lu_dirent **ldp,
- struct page **ppage)
+int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data,
+ struct md_callback *cb_op, __u64 offset,
+ struct page **ppage)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
RETURN(rc);
if (unlikely(lsm != NULL)) {
- rc = lmv_read_striped_entry(exp, op_data, cb_op,
- ldp, ppage);
+ rc = lmv_read_striped_page(exp, op_data, cb_op, offset, ppage);
RETURN(rc);
}
if (IS_ERR(tgt))
RETURN(PTR_ERR(tgt));
- rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, ldp,
- ppage);
+ rc = md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage);
+
RETURN(rc);
}
.m_setattr = lmv_setattr,
.m_setxattr = lmv_setxattr,
.m_fsync = lmv_fsync,
- .m_read_entry = lmv_read_entry,
+ .m_read_page = lmv_read_page,
.m_unlink = lmv_unlink,
.m_init_ea_size = lmv_init_ea_size,
.m_cancel_unused = lmv_cancel_unused,
/*
* upon hash collision, remove this page,
* otherwise put page reference, and
- * ll_get_dir_page() will issue RPC to fetch
- * the page we want.
+ * mdc_read_page_remote() will issue RPC to
+ * fetch the page we want.
*/
kunmap(page);
mdc_release_page(page,
ENTRY;
LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES);
- if (op_data->op_mea1 != NULL) {
- __u32 index = op_data->op_stripe_offset;
-
- inode = op_data->op_mea1->lsm_md_oinfo[index].lmo_root;
- fid = &op_data->op_mea1->lsm_md_oinfo[index].lmo_fid;
- } else {
- inode = op_data->op_data;
- fid = &op_data->op_fid1;
- }
+ inode = op_data->op_data;
+ fid = &op_data->op_fid1;
LASSERT(inode != NULL);
OBD_ALLOC(page_pool, sizeof(page_pool[0]) * max_pages);
/**
* Read dir page from cache first, if it can not find it, read it from
* server and add into the cache.
+ *
+ * \param[in] exp MDC export
+ * \param[in] op_data client MD stack parameters, transferring parameters
+ *                    between different layers of the client MD stack.
+ * \param[in] cb_op callback required for ldlm lock enqueue during
+ * read page
+ * \param[in] hash_offset the hash offset of the page to be read
+ * \param[out] ppage the page holding the directory entries read
+ *
+ * \retval 0 the page is read successfully
+ * \retval negative errno failed to read the page
*/
static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
- struct md_callback *cb_op, struct page **ppage)
+ struct md_callback *cb_op, __u64 hash_offset,
+ struct page **ppage)
{
struct lookup_intent it = { .it_op = IT_READDIR };
struct page *page;
rc = 0;
mdc_set_lock_data(exp, &it.d.lustre.it_lock_handle, dir, NULL);
- rp_param.rp_off = op_data->op_hash_offset;
+ rp_param.rp_off = hash_offset;
rp_param.rp_hash64 = op_data->op_cli_flags & CLI_HASH64;
page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end,
rp_param.rp_hash64);
if (BITS_PER_LONG == 32 && rp_param.rp_hash64) {
start = le64_to_cpu(dp->ldp_hash_start) >> 32;
end = le64_to_cpu(dp->ldp_hash_end) >> 32;
- rp_param.rp_off = op_data->op_hash_offset >> 32;
+ rp_param.rp_off = hash_offset >> 32;
} else {
start = le64_to_cpu(dp->ldp_hash_start);
end = le64_to_cpu(dp->ldp_hash_end);
- rp_param.rp_off = op_data->op_hash_offset;
+ rp_param.rp_off = hash_offset;
}
if (end == start) {
LASSERT(start == rp_param.rp_off);
#if BITS_PER_LONG == 32
CWARN("Real page-wide hash collision at ["LPU64" "LPU64"] with "
"hash "LPU64"\n", le64_to_cpu(dp->ldp_hash_start),
- le64_to_cpu(dp->ldp_hash_end), op_data->op_hash_offset);
+ le64_to_cpu(dp->ldp_hash_end), hash_offset);
#endif
/*
goto out_unlock;
}
-/**
- * Read one directory entry from the cache.
- */
-int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
- struct md_callback *cb_op, struct lu_dirent **entp,
- struct page **ppage)
-{
- struct page *page = NULL;
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
- int rc = 0;
- __u32 same_hash_count;
- __u64 hash_offset = op_data->op_hash_offset;
- ENTRY;
-
- CDEBUG(D_INFO, DFID " offset = "LPU64", flags %#x\n",
- PFID(&op_data->op_fid1), op_data->op_hash_offset,
- op_data->op_cli_flags);
-
- *ppage = NULL;
- *entp = NULL;
-
- if (op_data->op_hash_offset == MDS_DIR_END_OFF)
- RETURN(0);
-
- rc = mdc_read_page(exp, op_data, cb_op, &page);
- if (rc != 0)
- RETURN(rc);
-
- /* same_hash_count means how many entries with this
- * hash value has been read */
- same_hash_count = op_data->op_same_hash_offset + 1;
- dp = page_address(page);
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent)) {
- /* Skip dummy entry */
- if (le16_to_cpu(ent->lde_namelen) == 0)
- continue;
-
- if (le64_to_cpu(ent->lde_hash) <
- op_data->op_hash_offset)
- continue;
-
- if (unlikely(le64_to_cpu(ent->lde_hash) ==
- op_data->op_hash_offset)) {
- /* If it is not for next entry, which usually from
- * ll_dir_entry_start, return this entry. */
- if (!(op_data->op_cli_flags & CLI_NEXT_ENTRY))
- break;
-
- /* Keep reading until all of entries being read are
- * skipped. */
- if (same_hash_count > 0) {
- same_hash_count--;
- continue;
- }
- }
- break;
- }
-
- /* If it can not find entry in current page, try next page. */
- if (ent == NULL) {
- if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) {
- op_data->op_same_hash_offset = 0;
- mdc_release_page(page,
- le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
- RETURN(0);
- }
-
- op_data->op_hash_offset = le64_to_cpu(dp->ldp_hash_end);
- mdc_release_page(page,
- le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
- rc = mdc_read_page(exp, op_data, cb_op, &page);
- if (rc != 0)
- RETURN(rc);
-
- if (page != NULL) {
- dp = page_address(page);
- ent = lu_dirent_start(dp);
- }
- }
-
- /* If the next hash is the same as the current hash, increase
- * the op_same_hash_offset to resolve the same hash conflict */
- if (ent != NULL && op_data->op_cli_flags & CLI_NEXT_ENTRY) {
- if (unlikely(le64_to_cpu(ent->lde_hash) == hash_offset))
- op_data->op_same_hash_offset++;
- else
- op_data->op_same_hash_offset = 0;
- }
-
- *ppage = page;
- *entp = ent;
- RETURN(rc);
-}
-
#else /* __KERNEL__ */
static struct page
static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
- struct md_callback *cb_op,
+ struct md_callback *cb_op, __u64 hash_offset,
struct page **ppage)
{
struct page *page;
/* No local cache for liblustre, always read entry remotely */
lmo = &op_data->op_mea1->lsm_md_oinfo[op_data->op_stripe_offset];
- page = mdc_read_page_remote(exp, lmo, op_data->op_hash_offset,
+ page = mdc_read_page_remote(exp, lmo, hash_offset,
op_data->op_capa1);
if (IS_ERR(page))
return PTR_ERR(page);
return rc;
}
-int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data,
- struct md_callback *cb_op, struct lu_dirent **entp,
- struct page **ppage)
-{
- struct page *page = NULL;
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
- int rc;
- ENTRY;
-
- rc = mdc_read_page(exp, op_data, cb_op, &page);
- if (rc != 0)
- RETURN(rc);
-
- dp = page_address(page);
- if (le64_to_cpu(dp->ldp_hash_end) < op_data->op_hash_offset)
- GOTO(out, *entp = NULL);
-
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent))
- if (le64_to_cpu(ent->lde_hash) >= op_data->op_hash_offset)
- break;
- *entp = ent;
-out:
-
- OBD_PAGE_FREE(page);
- RETURN(rc);
-}
-
#endif
static int mdc_statfs(const struct lu_env *env,
.m_setxattr = mdc_setxattr,
.m_getxattr = mdc_getxattr,
.m_fsync = mdc_fsync,
- .m_read_entry = mdc_read_entry,
+ .m_read_page = mdc_read_page,
.m_unlink = mdc_unlink,
.m_cancel_unused = mdc_cancel_unused,
.m_init_ea_size = mdc_init_ea_size,
LPROCFS_MD_OP_INIT(num_private_stats, stats, rename);
LPROCFS_MD_OP_INIT(num_private_stats, stats, setattr);
LPROCFS_MD_OP_INIT(num_private_stats, stats, fsync);
- LPROCFS_MD_OP_INIT(num_private_stats, stats, read_entry);
+ LPROCFS_MD_OP_INIT(num_private_stats, stats, read_page);
LPROCFS_MD_OP_INIT(num_private_stats, stats, unlink);
LPROCFS_MD_OP_INIT(num_private_stats, stats, setxattr);
LPROCFS_MD_OP_INIT(num_private_stats, stats, getxattr);