From: Wang Di
Date: Fri, 6 Jun 2014 08:49:37 +0000 (-0700)
Subject: LU-4906 llite: read page from LMV/MDC for readdir
X-Git-Tag: 2.6.0-RC1~82
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=dee25c13da7c50c5d3279803bdd315e20863a533

LU-4906 llite: read page from LMV/MDC for readdir

This patch changes readdir iteration from reading one entry at a time
to reading a whole page. During "rm -rf", the statahead thread would
enqueue a lock for each entry in mdc_read_entry(); in the meantime the
MDT revokes the lock as it deletes each entry, which causes ldlm lock
ping-pong behavior and a lot of extra RPCs.

With this patch, llite reads one page from the MDC at a time, and the
statahead thread then holds the page and does getattr for all entries
in that page.

Note: the ldlm lock does not need to be held after the page has been
read, because the statahead thread can live with -ENOENT or -ESTALE if
an entry becomes stale in the meantime.

With this patch, the performance of "rm -rf" on current master is
improved:

1. without this patch on master
[root@mds tests]# ./createmany -o /mnt/lustre/test1/f- 10000
total: 10000 creates in 11.98 seconds: 835.02 creates/second
[root@mds tests]# ls /mnt/lustre/test1/ | wc
  10000   10000   68890
[root@mds tests]# time rm -rf /mnt/lustre/test1
real	1m4.496s
user	0m0.014s
sys	0m3.832s

2. with this patch on master
[root@mds tests]# ./createmany -o /mnt/lustre/test1/f- 10000
total: 10000 creates in 11.72 seconds: 853.50 creates/second
[root@mds tests]# time rm -rf /mnt/lustre/test1
real	0m8.151s
user	0m0.012s
sys	0m2.242s

Signed-off-by: wang di
Change-Id: I189f0039b5624455b70d1381b43d452cc8766543
Reviewed-on: http://review.whamcloud.com/10622
Reviewed-by: Andreas Dilger
Tested-by: Jenkins
Tested-by: Maloo
Reviewed-by: John L. Hammond
Reviewed-by: Oleg Drokin
---
diff --git a/lustre/include/lclient.h b/lustre/include/lclient.h
index 3016c08..62a79d7 100644
--- a/lustre/include/lclient.h
+++ b/lustre/include/lclient.h
@@ -510,7 +510,6 @@ enum op_cli_flags {
 	CLI_HASH64	= 1 << 2,
 	CLI_API32	= 1 << 3,
 	CLI_MIGRATE	= 1 << 4,
-	CLI_NEXT_ENTRY	= 1 << 5,
 };
 
 #endif /*LCLIENT_H */
diff --git a/lustre/include/lustre_mdc.h b/lustre/include/lustre_mdc.h
index 040b975..9067b6a 100644
--- a/lustre/include/lustre_mdc.h
+++ b/lustre/include/lustre_mdc.h
@@ -107,7 +107,7 @@ static inline void mdc_get_rpc_lock(struct mdc_rpc_lock *lck,
 	ENTRY;
 
 	if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
-			   it->it_op == IT_LAYOUT))
+			   it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
 		return;
 
 	/* This would normally block until the existing request finishes.
@@ -145,7 +145,7 @@ static inline void mdc_put_rpc_lock(struct mdc_rpc_lock *lck,
 				    struct lookup_intent *it)
 {
 	if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
-			   it->it_op == IT_LAYOUT))
+			   it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
 		goto out;
 
 	if (lck->rpcl_it == MDC_FAKE_RPCL_IT) { /* OBD_FAIL_MDC_RPCS_SEM */
diff --git a/lustre/include/obd.h b/lustre/include/obd.h
index 1007438..f8255d0 100644
--- a/lustre/include/obd.h
+++ b/lustre/include/obd.h
@@ -857,13 +857,6 @@ struct md_op_data {
 	enum mds_op_bias	op_bias;
 
 	/* Used by readdir */
-	__u64			op_hash_offset;
-
-	/* The offset of dir entry among the same hash entries, which
-	 * is used to resolve the hash conflict.
-	 */
-	__u32			op_same_hash_offset;
-
-	/* Used by readdir */
 	__u32			op_npages;
 
 	/* used to transfer info between the stacks of MD client
@@ -1099,9 +1092,9 @@ struct md_ops {
 	int (*m_fsync)(struct obd_export *, const struct lu_fid *,
 		       struct obd_capa *, struct ptlrpc_request **);
 
-	int (*m_read_entry)(struct obd_export *, struct md_op_data *,
-			    struct md_callback *cb_op, struct lu_dirent **ld,
-			    struct page **ppage);
+	int (*m_read_page)(struct obd_export *, struct md_op_data *,
+			   struct md_callback *cb_op, __u64 hash_offset,
+			   struct page **ppage);
 
 	int (*m_unlink)(struct obd_export *, struct md_op_data *,
 			struct ptlrpc_request **);
diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h
index 962f789..331a9e6 100644
--- a/lustre/include/obd_class.h
+++ b/lustre/include/obd_class.h
@@ -1631,17 +1631,18 @@ static inline int md_fsync(struct obd_export *exp, const struct lu_fid *fid,
 	RETURN(rc);
 }
 
-static inline int md_read_entry(struct obd_export *exp,
-				struct md_op_data *op_data,
-				struct md_callback *cb_op,
-				struct lu_dirent **ld,
-				struct page **ppage)
+static inline int md_read_page(struct obd_export *exp,
+			       struct md_op_data *op_data,
+			       struct md_callback *cb_op,
+			       __u64 hash_offset,
+			       struct page **ppage)
 {
 	int rc;
 	ENTRY;
-	EXP_CHECK_MD_OP(exp, read_entry);
-	EXP_MD_COUNTER_INCREMENT(exp, read_entry);
-	rc = MDP(exp->exp_obd, read_entry)(exp, op_data, cb_op, ld, ppage);
+	EXP_CHECK_MD_OP(exp, read_page);
+	EXP_MD_COUNTER_INCREMENT(exp, read_page);
+	rc = MDP(exp->exp_obd, read_page)(exp, op_data, cb_op, hash_offset,
+					  ppage);
 	RETURN(rc);
 }
diff --git a/lustre/liblustre/dir.c b/lustre/liblustre/dir.c
index b14e1a2..2b41e8f 100644
--- a/lustre/liblustre/dir.c
+++ b/lustre/liblustre/dir.c
@@ -98,7 +98,6 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
 	}
 	ldlm_lock_dump_handle(D_OTHER, &lockh);
 
-	op_data.op_hash_offset = hash_x_index(page->index, 0);
 	op_data.op_npages = 1;
 	rc = md_readpage(sbi->ll_md_exp, &op_data, &page, &request);
 	if (!rc) {
diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c
index 6ef796a..bfc75ea 100644
--- a/lustre/llite/dir.c
+++ b/lustre/llite/dir.c
@@ -139,148 +139,146 @@
  * lmv_adjust_dirpages().
  *
  */
-/**
- * The following three APIs will be used by llite to iterate directory
- * entries from MDC dir page caches.
- *
- * ll_dir_entry_start(next) will lookup(return) entry by op_hash_offset.
- * To avoid extra memory allocation, the @entry will be pointed to
- * the dir entries in MDC page directly, so these pages can not be released
- * until the entry has been accessed in ll_readdir(or statahead).
- *
- * The iterate process will be
- *
- * ll_dir_entry_start: locate the page in MDC, and return the first entry.
- *		       hold the page.
- *
- * ll_dir_entry_next: return the next entry in the current page, if it reaches
- *		      to the end, release current page.
- *
- * ll_dir_entry_end: release the last page.
- **/
-struct lu_dirent *ll_dir_entry_start(struct inode *dir,
-				     struct md_op_data *op_data,
-				     struct page **ppage)
+struct page *ll_get_dir_page(struct inode *dir, struct md_op_data *op_data,
+			     __u64 offset, struct ll_dir_chain *chain)
 {
-	struct lu_dirent *entry = NULL;
-	struct md_callback cb_op;
-	int rc;
-	ENTRY;
+	struct md_callback	cb_op;
+	struct page		*page;
+	int			rc;
 
-	LASSERT(*ppage == NULL);
 	cb_op.md_blocking_ast = ll_md_blocking_ast;
-	op_data->op_cli_flags &= ~CLI_NEXT_ENTRY;
-	rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry, ppage);
+	rc = md_read_page(ll_i2mdexp(dir), op_data, &cb_op, offset, &page);
 	if (rc != 0)
-		entry = ERR_PTR(rc);
-	RETURN(entry);
+		return ERR_PTR(rc);
+
+	return page;
 }
 
-struct lu_dirent *ll_dir_entry_next(struct inode *dir,
-				    struct md_op_data *op_data,
-				    struct lu_dirent *ent,
-				    struct page **ppage)
+void ll_release_page(struct inode *inode, struct page *page,
+		     bool remove)
 {
-	struct lu_dirent *entry = NULL;
-	struct md_callback cb_op;
-	int rc;
-	ENTRY;
-
-	op_data->op_hash_offset = le64_to_cpu(ent->lde_hash);
-
-	/* release last page */
-	LASSERT(*ppage != NULL);
-	kunmap(*ppage);
-	page_cache_release(*ppage);
-
-	cb_op.md_blocking_ast = ll_md_blocking_ast;
-	op_data->op_cli_flags |= CLI_NEXT_ENTRY;
-	rc = md_read_entry(ll_i2mdexp(dir), op_data, &cb_op, &entry, ppage);
-	if (rc != 0)
-		entry = ERR_PTR(rc);
+	kunmap(page);
+
+	/* Always remove the page for a striped dir, because the page is
+	 * built temporarily in the LMV layer */
+	if (inode != NULL && S_ISDIR(inode->i_mode) &&
+	    ll_i2info(inode)->lli_lsm_md != NULL) {
+		__free_page(page);
+		return;
+	}
 
-	RETURN(entry);
+	if (remove) {
+		lock_page(page);
+		if (likely(page->mapping != NULL))
+			truncate_complete_page(page->mapping, page);
+		unlock_page(page);
+	}
+	page_cache_release(page);
 }
 
 #ifdef HAVE_DIR_CONTEXT
-int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+int ll_dir_read(struct inode *inode, __u64 *ppos, struct md_op_data *op_data,
 		struct dir_context *ctx)
 {
 #else
-int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
+int ll_dir_read(struct inode *inode, __u64 *ppos, struct md_op_data *op_data,
 		void *cookie, filldir_t filldir)
 {
 #endif
-	struct ll_sb_info	*sbi = ll_i2sbi(inode);
-	struct ll_dir_chain	 chain;
-	struct lu_dirent	*ent;
-	int			 api32 = ll_need_32bit_api(sbi);
-	int			 hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH;
-	int			 done = 0;
-	int			 rc = 0;
-	__u64			 hash = MDS_DIR_END_OFF;
-	struct page		*page = NULL;
+	struct ll_sb_info	*sbi = ll_i2sbi(inode);
+	__u64			 pos = *ppos;
+	bool			 is_api32 = ll_need_32bit_api(sbi);
+	bool			 is_hash64 = sbi->ll_flags & LL_SBI_64BIT_HASH;
+	struct page		*page;
+	struct ll_dir_chain	 chain;
+	bool			 done = false;
+	int			 rc = 0;
 	ENTRY;
 
-	ll_dir_chain_init(&chain);
-	for (ent = ll_dir_entry_start(inode, op_data, &page);
-	     ent != NULL && !IS_ERR(ent) && !done;
-	     ent = ll_dir_entry_next(inode, op_data, ent, &page)) {
-		__u16 type;
-		int namelen;
-		struct lu_fid fid;
-		__u64 lhash;
-		__u64 ino;
-
-		hash = le64_to_cpu(ent->lde_hash);
-		if (hash < op_data->op_hash_offset)
-			/*
-			 * Skip until we find target hash
-			 * value.
-			 */
-			continue;
-		namelen = le16_to_cpu(ent->lde_namelen);
-		if (namelen == 0)
-			/*
-			 * Skip dummy record.
- */ - continue; + ll_dir_chain_init(&chain); - if (api32 && hash64) - lhash = hash >> 32; - else - lhash = hash; - fid_le_to_cpu(&fid, &ent->lde_fid); - ino = cl_fid_build_ino(&fid, api32); - type = ll_dirent_type_get(ent); + page = ll_get_dir_page(inode, op_data, pos, &chain); -#ifdef HAVE_DIR_CONTEXT - /* For 'll_nfs_get_name_filldir()', it will try - * to access the 'ent' through its 'lde_name', - * so the parameter 'name' for 'filldir()' must - * be part of the 'ent'. */ - done = !dir_emit(ctx, ent->lde_name, namelen, ino, type); -#else - done = filldir(cookie, ent->lde_name, namelen, lhash, - ino, type); -#endif - if (done) { - if (op_data->op_hash_offset != MDS_DIR_END_OFF) - op_data->op_hash_offset = hash; + while (rc == 0 && !done) { + struct lu_dirpage *dp; + struct lu_dirent *ent; + __u64 hash; + __u64 next; + + if (IS_ERR(page)) { + rc = PTR_ERR(page); break; } - } - if (IS_ERR(ent)) - rc = PTR_ERR(ent); - else if (ent == NULL) - op_data->op_hash_offset = MDS_DIR_END_OFF; + hash = MDS_DIR_END_OFF; + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL && !done; + ent = lu_dirent_next(ent)) { + __u16 type; + int namelen; + struct lu_fid fid; + __u64 lhash; + __u64 ino; + + hash = le64_to_cpu(ent->lde_hash); + if (hash < pos) + /* + * Skip until we find target hash + * value. + */ + continue; + + namelen = le16_to_cpu(ent->lde_namelen); + if (namelen == 0) + /* + * Skip dummy record. + */ + continue; + + if (is_api32 && is_hash64) + lhash = hash >> 32; + else + lhash = hash; + fid_le_to_cpu(&fid, &ent->lde_fid); + ino = cl_fid_build_ino(&fid, is_api32); + type = ll_dirent_type_get(ent); + /* For 'll_nfs_get_name_filldir()', it will try + * to access the 'ent' through its 'lde_name', + * so the parameter 'name' for 'filldir()' must + * be part of the 'ent'. */ + done = filldir(cookie, ent->lde_name, namelen, lhash, + ino, type); + } - if (page != NULL) { - kunmap(page); - page_cache_release(page); + if (done) { + pos = hash; + ll_release_page(inode, page, false); + break; + } + + next = le64_to_cpu(dp->ldp_hash_end); + pos = next; + if (pos == MDS_DIR_END_OFF) { + /* + * End of directory reached. + */ + done = 1; + ll_release_page(inode, page, false); + } else { + /* + * Normal case: continue to the next + * page. 
+ */ + ll_release_page(inode, page, + le32_to_cpu(dp->ldp_flags) & + LDF_COLLIDE); + next = pos; + page = ll_get_dir_page(inode, op_data, pos, + &chain); + } } + *ppos = pos; ll_dir_chain_fini(&chain); RETURN(rc); } @@ -338,21 +336,22 @@ static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir) * object */ if (fid_is_zero(&op_data->op_fid3)) { rc = ll_dir_get_parent_fid(inode, &op_data->op_fid3); - if (rc != 0) + if (rc != 0) { + ll_finish_md_op_data(op_data); RETURN(rc); + } } } - op_data->op_hash_offset = pos; op_data->op_max_pages = sbi->ll_md_brw_pages; #ifdef HAVE_DIR_CONTEXT ctx->pos = pos; - rc = ll_dir_read(inode, op_data, ctx); + rc = ll_dir_read(inode, &pos, op_data, ctx); pos = ctx->pos; #else - rc = ll_dir_read(inode, op_data, cookie, filldir); + rc = ll_dir_read(inode, &pos, op_data, cookie, filldir); #endif if (lfd != NULL) - lfd->lfd_pos = op_data->op_hash_offset; + lfd->lfd_pos = pos; if (pos == MDS_DIR_END_OFF) { if (api32) @@ -361,9 +360,7 @@ static int ll_readdir(struct file *filp, void *cookie, filldir_t filldir) pos = LL_DIR_END_OFF; } else { if (api32 && hash64) - pos = op_data->op_hash_offset >> 32; - else - pos = op_data->op_hash_offset; + pos = pos >> 32; } #ifdef HAVE_DIR_CONTEXT ctx->pos = pos; diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 85ea3ab..ab1a43d 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -720,23 +720,18 @@ static void ll_stats_ops_tally(struct ll_sb_info *sbi, int op, int count) {} extern const struct file_operations ll_dir_operations; extern const struct inode_operations ll_dir_inode_operations; #ifdef HAVE_DIR_CONTEXT -int ll_dir_read(struct inode *inode, struct md_op_data *op_data, +int ll_dir_read(struct inode *inode, __u64 *pos, struct md_op_data *op_data, struct dir_context *ctx); #else -int ll_dir_read(struct inode *inode, struct md_op_data *op_data, +int ll_dir_read(struct inode *inode, __u64 *pos, struct md_op_data *op_data, void *cookie, filldir_t filldir); #endif int ll_get_mdt_idx(struct inode *inode); int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi, const struct lu_fid *fid); +struct page *ll_get_dir_page(struct inode *dir, struct md_op_data *op_data, + __u64 offset, struct ll_dir_chain *chain); +void ll_release_page(struct inode *inode, struct page *page, bool remove); -struct lu_dirent *ll_dir_entry_start(struct inode *dir, - struct md_op_data *op_data, - struct page **ppage); - -struct lu_dirent *ll_dir_entry_next(struct inode *dir, - struct md_op_data *op_data, - struct lu_dirent *ent, - struct page **ppage); /* llite/namei.c */ extern const struct inode_operations ll_special_inode_operations; @@ -1239,7 +1234,8 @@ struct ll_statahead_info { unsigned int sai_skip_hidden;/* skipped hidden dentry count */ unsigned int sai_ls_all:1, /* "ls -al", do stat-ahead for * hidden entries */ - sai_agl_valid:1;/* AGL is valid for the dir */ + sai_agl_valid:1,/* AGL is valid for the dir */ + sai_in_readpage:1;/* statahead is in readdir()*/ wait_queue_head_t sai_waitq; /* stat-ahead wait queue */ struct ptlrpc_thread sai_thread; /* stat-ahead thread */ struct ptlrpc_thread sai_agl_thread; /* AGL thread */ diff --git a/lustre/llite/llite_nfs.c b/lustre/llite/llite_nfs.c index d92424f..6fb3315 100644 --- a/lustre/llite/llite_nfs.c +++ b/lustre/llite/llite_nfs.c @@ -239,6 +239,7 @@ static int ll_get_name(struct dentry *dentry, char *name, .lgd_found = 0, }; struct md_op_data *op_data; + __u64 pos = 0; int rc; ENTRY; @@ -253,13 +254,12 @@ static int 
ll_get_name(struct dentry *dentry, char *name, if (IS_ERR(op_data)) GOTO(out, rc = PTR_ERR(op_data)); - op_data->op_hash_offset = 0; op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages; mutex_lock(&dir->i_mutex); #ifdef HAVE_DIR_CONTEXT - rc = ll_dir_read(dir, op_data, &lgd.ctx); + rc = ll_dir_read(dir, &pos, op_data, &lgd.ctx); #else - rc = ll_dir_read(dir, op_data, &lgd, ll_nfs_get_name_filldir); + rc = ll_dir_read(dir, &pos, op_data, &lgd, ll_nfs_get_name_filldir); #endif mutex_unlock(&dir->i_mutex); ll_finish_md_op_data(op_data); diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index ca1d789..5e2a4c7 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -1077,8 +1077,8 @@ static int ll_statahead_thread(void *arg) struct md_op_data *op_data; struct ll_dir_chain chain; struct l_wait_info lwi = { 0 }; - struct lu_dirent *ent; struct page *page = NULL; + __u64 pos = 0; ENTRY; thread->t_pid = current_pid(); @@ -1090,7 +1090,6 @@ static int ll_statahead_thread(void *arg) if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); - op_data->op_hash_offset = 0; op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages; if (sbi->ll_flags & LL_SBI_AGL_ENABLED) @@ -1107,138 +1106,168 @@ static int ll_statahead_thread(void *arg) wake_up(&thread->t_ctl_waitq); ll_dir_chain_init(&chain); - for (ent = ll_dir_entry_start(dir, op_data, &page); - ent != NULL && !IS_ERR(ent); - ent = ll_dir_entry_next(dir, op_data, ent, &page)) { - __u64 hash; - int namelen; - char *name; - - hash = le64_to_cpu(ent->lde_hash); - if (unlikely(hash < op_data->op_hash_offset)) - /* - * Skip until we find target hash value. - */ - continue; + page = ll_get_dir_page(dir, op_data, pos, &chain); + while (1) { + struct lu_dirpage *dp; + struct lu_dirent *ent; + + if (IS_ERR(page)) { + rc = PTR_ERR(page); + CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 + "/"LPU64" opendir_pid = %u: rc = %d\n", + PFID(ll_inode2fid(dir)), pos, sai->sai_index, + plli->lli_opendir_pid, rc); + GOTO(out, rc); + } - namelen = le16_to_cpu(ent->lde_namelen); - if (unlikely(namelen == 0)) - /* - * Skip dummy record. - */ - continue; + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + __u64 hash; + int namelen; + char *name; - name = ent->lde_name; - if (name[0] == '.') { - if (namelen == 1) { - /* - * skip "." - */ - continue; - } else if (name[1] == '.' && namelen == 2) { + hash = le64_to_cpu(ent->lde_hash); + if (unlikely(hash < pos)) /* - * skip ".." + * Skip until we find target hash value. */ continue; - } else if (!sai->sai_ls_all) { + + namelen = le16_to_cpu(ent->lde_namelen); + if (unlikely(namelen == 0)) /* - * skip hidden files. + * Skip dummy record. */ - sai->sai_skip_hidden++; continue; + + name = ent->lde_name; + if (name[0] == '.') { + if (namelen == 1) { + /* + * skip "." + */ + continue; + } else if (name[1] == '.' && namelen == 2) { + /* + * skip ".." + */ + continue; + } else if (!sai->sai_ls_all) { + /* + * skip hidden files. + */ + sai->sai_skip_hidden++; + continue; + } } - } - /* - * don't stat-ahead first entry. - */ - if (unlikely(++first == 1)) - continue; + /* + * don't stat-ahead first entry. 
+ */ + if (unlikely(++first == 1)) + continue; keep_it: - l_wait_event(thread->t_ctl_waitq, - !sa_sent_full(sai) || - !sa_received_empty(sai) || - !agl_list_empty(sai) || - !thread_is_running(thread), - &lwi); + l_wait_event(thread->t_ctl_waitq, + !sa_sent_full(sai) || + !sa_received_empty(sai) || + !agl_list_empty(sai) || + !thread_is_running(thread), + &lwi); interpret_it: - while (!sa_received_empty(sai)) - ll_post_statahead(sai); + while (!sa_received_empty(sai)) + ll_post_statahead(sai); - if (unlikely(!thread_is_running(thread))) - GOTO(out, rc = 0); + if (unlikely(!thread_is_running(thread))) { + ll_release_page(dir, page, false); + GOTO(out, rc = 0); + } - /* If no window for metadata statahead, but there are - * some AGL entries to be triggered, then try to help - * to process the AGL entries. */ - if (sa_sent_full(sai)) { - spin_lock(&plli->lli_agl_lock); - while (!agl_list_empty(sai)) { - clli = agl_first_entry(sai); - list_del_init(&clli->lli_agl_list); + /* If no window for metadata statahead, but there are + * some AGL entries to be triggered, then try to help + * to process the AGL entries. */ + if (sa_sent_full(sai)) { + spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, + sai); + + if (!sa_received_empty(sai)) + goto interpret_it; + + if (unlikely( + !thread_is_running(thread))) { + ll_release_page(dir, page, + false); + GOTO(out, rc = 0); + } + + if (!sa_sent_full(sai)) + goto do_it; + + spin_lock(&plli->lli_agl_lock); + } spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, - sai); - if (!sa_received_empty(sai)) - goto interpret_it; + goto keep_it; + } +do_it: + ll_statahead_one(parent, name, namelen); + } - if (unlikely( - !thread_is_running(thread))) + pos = le64_to_cpu(dp->ldp_hash_end); + if (pos == MDS_DIR_END_OFF) { + /* + * End of directory reached. + */ + ll_release_page(dir, page, false); + while (1) { + l_wait_event(thread->t_ctl_waitq, + !sa_received_empty(sai) || + sai->sai_sent == sai->sai_replied || + !thread_is_running(thread), + &lwi); + + while (!sa_received_empty(sai)) + ll_post_statahead(sai); + + if (unlikely(!thread_is_running(thread))) GOTO(out, rc = 0); - if (!sa_sent_full(sai)) - goto do_it; + if (sai->sai_sent == sai->sai_replied && + sa_received_empty(sai)) + break; + } + spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai) && + thread_is_running(thread)) { + clli = agl_first_entry(sai); + list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, sai); spin_lock(&plli->lli_agl_lock); } spin_unlock(&plli->lli_agl_lock); - goto keep_it; - } - -do_it: - ll_statahead_one(parent, name, namelen); - } - - if (page != NULL) { - kunmap(page); - page_cache_release(page); - } - - /* - * End of directory reached. 
- */ - while (1) { - l_wait_event(thread->t_ctl_waitq, - !sa_received_empty(sai) || - sai->sai_sent == sai->sai_replied || - !thread_is_running(thread), - &lwi); - - while (!sa_received_empty(sai)) - ll_post_statahead(sai); - - if (unlikely(!thread_is_running(thread))) GOTO(out, rc = 0); - - if (sai->sai_sent == sai->sai_replied && - sa_received_empty(sai)) - break; - } - - spin_lock(&plli->lli_agl_lock); - while (!agl_list_empty(sai) && - thread_is_running(thread)) { - clli = agl_first_entry(sai); - list_del_init(&clli->lli_agl_list); - spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, sai); - spin_lock(&plli->lli_agl_lock); + } else { + /* + * chain is exhausted. + * Normal case: continue to the next page. + */ + ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) & + LDF_COLLIDE); + sai->sai_in_readpage = 1; + page = ll_get_dir_page(dir, op_data, pos, &chain); + sai->sai_in_readpage = 0; + } } - spin_unlock(&plli->lli_agl_lock); out: EXIT; ll_finish_md_op_data(op_data); @@ -1349,88 +1378,117 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) struct qstr *target = &dentry->d_name; struct md_op_data *op_data; int dot_de; - struct lu_dirent *ent; struct page *page = NULL; int rc = LS_NONE_FIRST_DE; + __u64 pos = 0; ENTRY; - ll_dir_chain_init(&chain); - op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, LUSTRE_OPC_ANY, dir); if (IS_ERR(op_data)) - GOTO(out, rc = PTR_ERR(op_data)); + RETURN(PTR_ERR(op_data)); /** *FIXME choose the start offset of the readdir */ op_data->op_stripe_offset = 0; - op_data->op_hash_offset = 0; op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages; - for (ent = ll_dir_entry_start(dir, op_data, &page); - ent != NULL && !IS_ERR(ent); - ent = ll_dir_entry_next(dir, op_data, ent, &page)) { - __u64 hash; - int namelen; - char *name; - - hash = le64_to_cpu(ent->lde_hash); - /* The ll_get_dir_page() can return any page containing - * the given hash which may be not the start hash. */ - if (unlikely(hash < op_data->op_hash_offset)) - continue; - - namelen = le16_to_cpu(ent->lde_namelen); - if (unlikely(namelen == 0)) - /* - * skip dummy record. - */ - continue; + ll_dir_chain_init(&chain); + page = ll_get_dir_page(dir, op_data, 0, &chain); - name = ent->lde_name; - if (name[0] == '.') { - if (namelen == 1) - /* - * skip "." - */ + while (1) { + struct lu_dirpage *dp; + struct lu_dirent *ent; + + if (IS_ERR(page)) { + struct ll_inode_info *lli = ll_i2info(dir); + + rc = PTR_ERR(page); + CERROR("%s: reading dir "DFID" at "LPU64 + "opendir_pid = %u : rc = %d\n", + ll_get_fsname(dir->i_sb, NULL, 0), + PFID(ll_inode2fid(dir)), pos, + lli->lli_opendir_pid, rc); + break; + } + + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + __u64 hash; + int namelen; + char *name; + + hash = le64_to_cpu(ent->lde_hash); + /* The ll_get_dir_page() can return any page containing + * the given hash which may be not the start hash. */ + if (unlikely(hash < pos)) continue; - else if (name[1] == '.' && namelen == 2) + + namelen = le16_to_cpu(ent->lde_namelen); + if (unlikely(namelen == 0)) /* - * skip ".." + * skip dummy record. */ continue; - else - dot_de = 1; - } else { - dot_de = 0; - } - if (dot_de && target->name[0] != '.') { - CDEBUG(D_READA, "%.*s skip hidden file %.*s\n", - target->len, target->name, - namelen, name); - continue; - } + name = ent->lde_name; + if (name[0] == '.') { + if (namelen == 1) + /* + * skip "." + */ + continue; + else if (name[1] == '.' 
&& namelen == 2) + /* + * skip ".." + */ + continue; + else + dot_de = 1; + } else { + dot_de = 0; + } - if (target->len != namelen || - memcmp(target->name, name, namelen) != 0) - rc = LS_NONE_FIRST_DE; - else if (!dot_de) - rc = LS_FIRST_DE; - else - rc = LS_FIRST_DOT_DE; + if (dot_de && target->name[0] != '.') { + CDEBUG(D_READA, "%.*s skip hidden file %.*s\n", + target->len, target->name, + namelen, name); + continue; + } - break; - } - EXIT; + if (target->len != namelen || + memcmp(target->name, name, namelen) != 0) + rc = LS_NONE_FIRST_DE; + else if (!dot_de) + rc = LS_FIRST_DE; + else + rc = LS_FIRST_DOT_DE; - if (page != NULL) { - kunmap(page); - page_cache_release(page); + ll_release_page(dir, page, false); + GOTO(out, rc); + } + pos = le64_to_cpu(dp->ldp_hash_end); + if (pos == MDS_DIR_END_OFF) { + /* + * End of directory reached. + */ + ll_release_page(dir, page, false); + GOTO(out, rc); + } else { + /* + * chain is exhausted + * Normal case: continue to the next page. + */ + ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) & + LDF_COLLIDE); + page = ll_get_dir_page(dir, op_data, pos, &chain); + } } - ll_finish_md_op_data(op_data); + EXIT; out: ll_dir_chain_fini(&chain); + ll_finish_md_op_data(op_data); return rc; } @@ -1546,6 +1604,11 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, RETURN(entry ? 1 : -EAGAIN); } + /* if statahead is busy in readdir, help it do post-work */ + while (!ll_sa_entry_stated(entry) && + sai->sai_in_readpage && !sa_received_empty(sai)) + ll_post_statahead(sai); + if (!ll_sa_entry_stated(entry)) { sai->sai_index_wait = entry->se_index; lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL, diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 8750215..4994986 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -2348,187 +2348,296 @@ static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs) #endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */ /** - * This function will read entry from a striped directory, bascially, it will - * read entries from all of stripes, and choose one closest to the required - * offset(&op_data->op_hash_offset). A few notes - * 1. skip . and .. for non-zero stripes, because there can only have one . - * and .. in a directory. - * 2. op_data will be shared by all of stripes, instead of allocating new - * one, so need to restore before reusing. - * 3. release the entry page if that is not being chosen. + * Get current minimum entry from striped directory * - * param[in]exp obd export refer to LMV - * param[in]op_data hold those MD parameters of read_entry. - * param[in]cb_op ldlm callback being used in enqueue in mdc_read_entry - * param[out]ldp the entry being read. - * param[out]ppage the page holding the entry, note: because the entry - * will be accessed in upper layer, so we need hold the - * page until the usages of entry is finished, see - * ll_dir_entry_next. + * This function will search the dir entry, whose hash value is the + * closest(>=) to @hash_offset, from all of sub-stripes, and it is + * only being called for striped directory. * - * retval =0 if get entry successfully - * <0 can not get entry. 
+ * \param[in] exp		export of LMV
+ * \param[in] op_data		parameters transferred between the client MD
+ *				stack, stripe information will be included in
+ *				this parameter
+ * \param[in] cb_op		ldlm callback being used in enqueue in
+ *				mdc_read_page
+ * \param[in] hash_offset	the hash value, which is used to locate the
+ *				minimum (closest) dir entry
+ * \param[in|out] stripe_offset	the caller uses this to indicate the stripe
+ *				index of the last entry, so as to avoid hash
+ *				conflicts between stripes.  It will also be
+ *				used to return the stripe index of the current
+ *				dir entry.
+ * \param[in|out] entp		the minimum entry; it is also used to pass in
+ *				the last dir entry to resolve hash conflicts
+ *
+ * \param[out] ppage		the page which holds the minimum entry
+ *
+ * \retval	= 0 if the entry is retrieved successfully
+ *		negative errno (< 0) if the entry cannot be retrieved
+ */
-#define NORMAL_MAX_STRIPES 4
-static int lmv_read_striped_entry(struct obd_export *exp,
-				  struct md_op_data *op_data,
-				  struct md_callback *cb_op,
-				  struct lu_dirent **ldp,
-				  struct page **ppage)
+static int lmv_get_min_striped_entry(struct obd_export *exp,
+				     struct md_op_data *op_data,
+				     struct md_callback *cb_op,
+				     __u64 hash_offset, int *stripe_offset,
+				     struct lu_dirent **entp,
+				     struct page **ppage)
 {
 	struct obd_device	*obd = exp->exp_obd;
 	struct lmv_obd		*lmv = &obd->u.lmv;
 	struct lmv_stripe_md	*lsm = op_data->op_mea1;
 	struct lmv_tgt_desc	*tgt;
-	struct lu_dirent	*tmp_ents[NORMAL_MAX_STRIPES];
-	struct lu_dirent	**ents = NULL;
-	struct lu_fid		master_fid = op_data->op_fid1;
-	void			*master_data = op_data->op_data;
-	__u64			last_idx = op_data->op_stripe_offset;
-	__u64			hash_offset = op_data->op_hash_offset;
-	__u32			same_hash_offset = op_data->op_same_hash_offset;
-	__u32			cli_flags = op_data->op_cli_flags;
 	int			stripe_count;
-	__u64			min_hash;
-	int			min_same_hash_offset = 0;
-	int			min_idx = 0;
+	struct lu_dirent	*min_ent = NULL;
 	struct page		*min_page = NULL;
+	int			min_idx = 0;
 	int			i;
-	int			rc;
+	int			rc = 0;
 	ENTRY;
 
-	LASSERT(lsm != NULL);
-
-	rc = lmv_check_connect(obd);
-	if (rc)
-		RETURN(rc);
-
-	/* . and .. will be stored on the master object, so we need iterate
-	 * the master object as well */
 	stripe_count = lsm->lsm_md_stripe_count;
-	if (stripe_count > NORMAL_MAX_STRIPES) {
-		OBD_ALLOC(ents, sizeof(ents[0]) * stripe_count);
-		if (ents == NULL)
-			GOTO(out, rc = -ENOMEM);
-	} else {
-		ents = tmp_ents;
-		memset(ents, 0, sizeof(ents[0]) * stripe_count);
-	}
-
-	min_hash = MDS_DIR_END_OFF;
 	for (i = 0; i < stripe_count; i++) {
-		struct page *page = NULL;
+		struct lu_dirent	*ent = NULL;
+		struct page		*page = NULL;
+		struct lu_dirpage	*dp;
+		__u64			stripe_hash = hash_offset;
 
 		tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL);
 		if (IS_ERR(tgt))
 			GOTO(out, rc = PTR_ERR(tgt));
 
-		if (last_idx != i)
-			op_data->op_same_hash_offset = 0;
-		else
-			op_data->op_same_hash_offset = same_hash_offset;
-
 		/* op_data will be shared by each stripe, so we need
 		 * reset these value for each stripe */
 		op_data->op_stripe_offset = i;
-		op_data->op_hash_offset = hash_offset;
-		op_data->op_cli_flags = cli_flags;
 		op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
 		op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
 		op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
 next:
-		rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, &ents[i],
-				   &page);
+		rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash,
+				  &page);
 		if (rc != 0)
 			GOTO(out, rc);
 
-		if (ents[i] != NULL &&
-		    (strncmp(ents[i]->lde_name, ".",
-			     le16_to_cpu(ents[i]->lde_namelen)) == 0 ||
-		     strncmp(ents[i]->lde_name, "..",
-			     le16_to_cpu(ents[i]->lde_namelen)) == 0)) {
-			if (i == 0) {
-				/* replace . with master FID */
-				if (le16_to_cpu(ents[i]->lde_namelen) == 1)
-					fid_cpu_to_le(&ents[i]->lde_fid,
-						      &master_fid);
-				else
-					fid_cpu_to_le(&ents[i]->lde_fid,
-						      &op_data->op_fid3);
-			} else {
-				/* skip . and .. for other stripes */
-				op_data->op_cli_flags |= CLI_NEXT_ENTRY;
-				op_data->op_hash_offset =
-					le64_to_cpu(ents[i]->lde_hash);
-				kunmap(page);
-				page_cache_release(page);
+		dp = page_address(page);
+		for (ent = lu_dirent_start(dp); ent != NULL;
+		     ent = lu_dirent_next(ent)) {
+			/* Skip dummy entry */
+			if (le16_to_cpu(ent->lde_namelen) == 0)
+				continue;
+
+			if (le64_to_cpu(ent->lde_hash) < hash_offset)
+				continue;
+
+			if (le64_to_cpu(ent->lde_hash) == hash_offset &&
+			    (*entp == ent || i < *stripe_offset))
+				continue;
+
+			/* skip . and .. for other stripes */
+			if (i != 0 &&
+			    (strncmp(ent->lde_name, ".",
+				     le16_to_cpu(ent->lde_namelen)) == 0 ||
+			     strncmp(ent->lde_name, "..",
+				     le16_to_cpu(ent->lde_namelen)) == 0))
+				continue;
+			break;
+		}
+
+		if (ent == NULL) {
+			stripe_hash = le64_to_cpu(dp->ldp_hash_end);
+
+			kunmap(page);
+			page_cache_release(page);
+			page = NULL;
+
+			/* reach the end of current stripe, go to next stripe */
+			if (stripe_hash == MDS_DIR_END_OFF)
+				continue;
+			else
 				goto next;
-			}
 		}
 
-		if (ents[i] != NULL) {
-			/* If the hash value of read_entry is equal to the
-			 * current min_hash, which is very rare and only
-			 * happens if two entries have the same hash value
-			 * but on different stripes, in this case, we need
-			 * make sure these entries are being reading forward,
-			 * not backward, i.e. only reset the min_entry, if
-			 * current stripe is ahead of last entry. Note: if
-			 * there are hash conflict inside the entry, MDC
-			 * (see mdc_read_entry) will resolve them.
-			 */
-			if (le64_to_cpu(ents[i]->lde_hash) < min_hash ||
-			    (le64_to_cpu(ents[i]->lde_hash) == min_hash &&
-			     i >= last_idx)) {
-				if (min_page != NULL) {
-					kunmap(min_page);
-					page_cache_release(min_page);
-				}
-				min_page = page;
-				min_hash = le64_to_cpu(ents[i]->lde_hash);
-				min_same_hash_offset =
-					op_data->op_same_hash_offset;
+		if (min_ent != NULL) {
+			if (le64_to_cpu(min_ent->lde_hash) >
+			    le64_to_cpu(ent->lde_hash)) {
+				min_ent = ent;
+				kunmap(min_page);
+				page_cache_release(min_page);
 				min_idx = i;
+				min_page = page;
 			} else {
 				kunmap(page);
 				page_cache_release(page);
+				page = NULL;
 			}
+		} else {
+			min_ent = ent;
+			min_page = page;
+			min_idx = i;
 		}
 	}
 
-	if (min_hash != MDS_DIR_END_OFF) {
-		*ldp = ents[min_idx];
-		op_data->op_stripe_offset = min_idx;
-		op_data->op_same_hash_offset = min_same_hash_offset;
-		*ppage = min_page;
-	} else {
-		*ldp = NULL;
-		*ppage = NULL;
+out:
+	if (*ppage != NULL) {
+		kunmap(*ppage);
+		page_cache_release(*ppage);
 	}
+	*stripe_offset = min_idx;
+	*entp = min_ent;
+	*ppage = min_page;
+	RETURN(rc);
+}
+
+/**
+ * Build dir entry page from a striped directory
+ *
+ * This function gets one page of entries starting at @offset from a striped
+ * directory. It reads entries from all of the stripes and chooses the ones
+ * closest to the required offset (@offset). A few notes:
+ * 1. skip . and .. for non-zero stripes, because there can only be one .
+ * and one .. in a directory.
+ * 2. op_data is shared by all of the stripes instead of allocating a new
+ * one, so it needs to be restored before reuse.
+ * 3. release the entry page if it is not being chosen.
+ *
+ * \param[in] exp	obd export referring to LMV
+ * \param[in] op_data	holds the MD parameters of the read
+ * \param[in] cb_op	ldlm callback being used in enqueue in mdc_read_page
+ * \param[in] offset	starting hash offset of the page to be read
+ * \param[out] ppage	the page holding the entries. Note: because the
+ *			entries will be accessed in the upper layer, the
+ *			page must be held until the entries are no longer
+ *			needed, see ll_release_page().
+ * + * retval =0 if get entry successfully + * <0 cannot get entry + */ +static int lmv_read_striped_page(struct obd_export *exp, + struct md_op_data *op_data, + struct md_callback *cb_op, + __u64 offset, struct page **ppage) +{ + struct obd_device *obd = exp->exp_obd; + struct lu_fid master_fid = op_data->op_fid1; + struct inode *master_inode = op_data->op_data; + __u64 hash_offset = offset; + struct lu_dirpage *dp; + struct page *min_ent_page = NULL; + struct page *ent_page = NULL; + struct lu_dirent *ent; + void *area; + int ent_idx = 0; + struct lu_dirent *min_ent = NULL; + struct lu_dirent *last_ent; + int left_bytes; + int rc; + ENTRY; + + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); + + /* Allocate a page and read entries from all of stripes and fill + * the page by hash order */ + ent_page = alloc_page(GFP_KERNEL); + if (ent_page == NULL) + RETURN(-ENOMEM); + + /* Initialize the entry page */ + dp = kmap(ent_page); + memset(dp, 0, sizeof(*dp)); + dp->ldp_hash_start = cpu_to_le64(offset); + dp->ldp_flags |= LDF_COLLIDE; + + area = dp + 1; + left_bytes = PAGE_CACHE_SIZE - sizeof(*dp); + ent = area; + last_ent = ent; + do { + __u16 ent_size; + + /* Find the minum entry from all sub-stripes */ + rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset, + &ent_idx, &min_ent, + &min_ent_page); + if (rc != 0) + GOTO(out, rc); + + /* If it can not get minum entry, it means it already reaches + * the end of this directory */ + if (min_ent == NULL) { + last_ent->lde_reclen = 0; + hash_offset = MDS_DIR_END_OFF; + GOTO(out, rc); + } + + ent_size = le16_to_cpu(min_ent->lde_reclen); + + /* the last entry lde_reclen is 0, but it might not + * the end of this entry of this temporay entry */ + if (ent_size == 0) + ent_size = lu_dirent_calc_size( + le16_to_cpu(min_ent->lde_namelen), + le32_to_cpu(min_ent->lde_attrs)); + if (ent_size > left_bytes) { + last_ent->lde_reclen = cpu_to_le16(0); + hash_offset = le64_to_cpu(min_ent->lde_hash); + GOTO(out, rc); + } + + memcpy(ent, min_ent, ent_size); + + /* Replace . with master FID and Replace .. 
with the parent FID + * of master object */ + if (strncmp(ent->lde_name, ".", + le16_to_cpu(ent->lde_namelen)) == 0 && + le16_to_cpu(ent->lde_namelen) == 1) + fid_cpu_to_le(&ent->lde_fid, &master_fid); + else if (strncmp(ent->lde_name, "..", + le16_to_cpu(ent->lde_namelen)) == 0 && + le16_to_cpu(ent->lde_namelen) == 2) + fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3); + + left_bytes -= ent_size; + ent->lde_reclen = cpu_to_le16(ent_size); + last_ent = ent; + ent = (void *)ent + ent_size; + hash_offset = le64_to_cpu(min_ent->lde_hash); + if (hash_offset == MDS_DIR_END_OFF) { + last_ent->lde_reclen = 0; + break; + } + } while (1); out: + if (min_ent_page != NULL) { + kunmap(min_ent_page); + page_cache_release(min_ent_page); + } + + if (unlikely(rc != 0)) { + __free_page(ent_page); + ent_page = NULL; + } else { + if (ent == area) + dp->ldp_flags |= LDF_EMPTY; + dp->ldp_flags = cpu_to_le32(dp->ldp_flags); + dp->ldp_hash_end = cpu_to_le64(hash_offset); + } + /* We do not want to allocate md_op_data during each * dir entry reading, so op_data will be shared by every stripe, * then we need to restore it back to original value before * return to the upper layer */ - op_data->op_hash_offset = hash_offset; op_data->op_fid1 = master_fid; op_data->op_fid2 = master_fid; - op_data->op_data = master_data; - op_data->op_cli_flags = cli_flags; - if (stripe_count > NORMAL_MAX_STRIPES && ents != NULL) - OBD_FREE(ents, sizeof(ents[0]) * stripe_count); + op_data->op_data = master_inode; - if (rc != 0 && min_page != NULL) { - kunmap(min_page); - page_cache_release(min_page); - } + *ppage = ent_page; RETURN(rc); } -int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data, - struct md_callback *cb_op, struct lu_dirent **ldp, - struct page **ppage) +int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, __u64 offset, + struct page **ppage) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -2542,8 +2651,7 @@ int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); if (unlikely(lsm != NULL)) { - rc = lmv_read_striped_entry(exp, op_data, cb_op, - ldp, ppage); + rc = lmv_read_striped_page(exp, op_data, cb_op, offset, ppage); RETURN(rc); } @@ -2551,8 +2659,8 @@ int lmv_read_entry(struct obd_export *exp, struct md_op_data *op_data, if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); - rc = md_read_entry(tgt->ltd_exp, op_data, cb_op, ldp, - ppage); + rc = md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage); + RETURN(rc); } @@ -3474,7 +3582,7 @@ struct md_ops lmv_md_ops = { .m_setattr = lmv_setattr, .m_setxattr = lmv_setxattr, .m_fsync = lmv_fsync, - .m_read_entry = lmv_read_entry, + .m_read_page = lmv_read_page, .m_unlink = lmv_unlink, .m_init_ea_size = lmv_init_ea_size, .m_cancel_unused = lmv_cancel_unused, diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 37f9032..f9e2b09 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -1221,8 +1221,8 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, /* * upon hash collision, remove this page, * otherwise put page reference, and - * ll_get_dir_page() will issue RPC to fetch - * the page we want. + * mdc_read_page_remote() will issue RPC to + * fetch the page we want. 
 			 */
 			kunmap(page);
 			mdc_release_page(page,
@@ -1380,15 +1380,8 @@ static int mdc_read_page_remote(void *data, struct page *page0)
 	ENTRY;
 
 	LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES);
-	if (op_data->op_mea1 != NULL) {
-		__u32 index = op_data->op_stripe_offset;
-
-		inode = op_data->op_mea1->lsm_md_oinfo[index].lmo_root;
-		fid = &op_data->op_mea1->lsm_md_oinfo[index].lmo_fid;
-	} else {
-		inode = op_data->op_data;
-		fid = &op_data->op_fid1;
-	}
+	inode = op_data->op_data;
+	fid = &op_data->op_fid1;
 	LASSERT(inode != NULL);
 
 	OBD_ALLOC(page_pool, sizeof(page_pool[0]) * max_pages);
@@ -1468,9 +1461,21 @@
 /**
 * Read dir page from cache first, if it can not find it, read it from
 * server and add into the cache.
+ *
+ * \param[in] exp		MDC export
+ * \param[in] op_data		client MD stack parameters, transferring
+ *				parameters between different layers of the
+ *				client MD stack.
+ * \param[in] cb_op		callback required for ldlm lock enqueue during
+ *				read page
+ * \param[in] hash_offset	the hash offset of the page to be read
+ * \param[out] ppage		the page read in
+ *
+ * retval	= 0 if the page is read successfully
+ *		errno (< 0) if reading the page failed
 */
 static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
-			 struct md_callback *cb_op, struct page **ppage)
+			 struct md_callback *cb_op, __u64 hash_offset,
+			 struct page **ppage)
 {
 	struct lookup_intent	it = { .it_op = IT_READDIR };
 	struct page		*page;
@@ -1505,7 +1510,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
 	rc = 0;
 	mdc_set_lock_data(exp, &it.d.lustre.it_lock_handle, dir, NULL);
 
-	rp_param.rp_off = op_data->op_hash_offset;
+	rp_param.rp_off = hash_offset;
 	rp_param.rp_hash64 = op_data->op_cli_flags & CLI_HASH64;
 	page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end,
 			       rp_param.rp_hash64);
@@ -1567,11 +1572,11 @@ hash_collision:
 	if (BITS_PER_LONG == 32 && rp_param.rp_hash64) {
 		start = le64_to_cpu(dp->ldp_hash_start) >> 32;
 		end   = le64_to_cpu(dp->ldp_hash_end) >> 32;
-		rp_param.rp_off = op_data->op_hash_offset >> 32;
+		rp_param.rp_off = hash_offset >> 32;
 	} else {
 		start = le64_to_cpu(dp->ldp_hash_start);
 		end   = le64_to_cpu(dp->ldp_hash_end);
-		rp_param.rp_off = op_data->op_hash_offset;
+		rp_param.rp_off = hash_offset;
 	}
 	if (end == start) {
 		LASSERT(start == rp_param.rp_off);
#if BITS_PER_LONG == 32
 		CWARN("Real page-wide hash collision at ["LPU64" "LPU64"] with "
 		      "hash "LPU64"\n", le64_to_cpu(dp->ldp_hash_start),
-		      le64_to_cpu(dp->ldp_hash_end), op_data->op_hash_offset);
+		      le64_to_cpu(dp->ldp_hash_end), hash_offset);
 #endif
 
 		/*
@@ -1602,102 +1607,6 @@ fail:
 	goto out_unlock;
 }
 
-/**
- * Read one directory entry from the cache.
- */ -int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, - struct md_callback *cb_op, struct lu_dirent **entp, - struct page **ppage) -{ - struct page *page = NULL; - struct lu_dirpage *dp; - struct lu_dirent *ent; - int rc = 0; - __u32 same_hash_count; - __u64 hash_offset = op_data->op_hash_offset; - ENTRY; - - CDEBUG(D_INFO, DFID " offset = "LPU64", flags %#x\n", - PFID(&op_data->op_fid1), op_data->op_hash_offset, - op_data->op_cli_flags); - - *ppage = NULL; - *entp = NULL; - - if (op_data->op_hash_offset == MDS_DIR_END_OFF) - RETURN(0); - - rc = mdc_read_page(exp, op_data, cb_op, &page); - if (rc != 0) - RETURN(rc); - - /* same_hash_count means how many entries with this - * hash value has been read */ - same_hash_count = op_data->op_same_hash_offset + 1; - dp = page_address(page); - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) { - /* Skip dummy entry */ - if (le16_to_cpu(ent->lde_namelen) == 0) - continue; - - if (le64_to_cpu(ent->lde_hash) < - op_data->op_hash_offset) - continue; - - if (unlikely(le64_to_cpu(ent->lde_hash) == - op_data->op_hash_offset)) { - /* If it is not for next entry, which usually from - * ll_dir_entry_start, return this entry. */ - if (!(op_data->op_cli_flags & CLI_NEXT_ENTRY)) - break; - - /* Keep reading until all of entries being read are - * skipped. */ - if (same_hash_count > 0) { - same_hash_count--; - continue; - } - } - break; - } - - /* If it can not find entry in current page, try next page. */ - if (ent == NULL) { - if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) { - op_data->op_same_hash_offset = 0; - mdc_release_page(page, - le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - RETURN(0); - } - - op_data->op_hash_offset = le64_to_cpu(dp->ldp_hash_end); - mdc_release_page(page, - le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - rc = mdc_read_page(exp, op_data, cb_op, &page); - if (rc != 0) - RETURN(rc); - - if (page != NULL) { - dp = page_address(page); - ent = lu_dirent_start(dp); - } - } - - /* If the next hash is the same as the current hash, increase - * the op_same_hash_offset to resolve the same hash conflict */ - if (ent != NULL && op_data->op_cli_flags & CLI_NEXT_ENTRY) { - if (unlikely(le64_to_cpu(ent->lde_hash) == hash_offset)) - op_data->op_same_hash_offset++; - else - op_data->op_same_hash_offset = 0; - } - - *ppage = page; - *entp = ent; - RETURN(rc); -} - #else /* __KERNEL__ */ static struct page @@ -1725,7 +1634,7 @@ static struct page static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, - struct md_callback *cb_op, + struct md_callback *cb_op, __u64 hash_offset, struct page **ppage) { struct page *page; @@ -1734,7 +1643,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, /* No local cache for liblustre, always read entry remotely */ lmo = &op_data->op_mea1->lsm_md_oinfo[op_data->op_stripe_offset]; - page = mdc_read_page_remote(exp, lmo, op_data->op_hash_offset, + page = mdc_read_page_remote(exp, lmo, hash_offset, op_data->op_capa1); if (IS_ERR(page)) return PTR_ERR(page); @@ -1744,35 +1653,6 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, return rc; } -int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, - struct md_callback *cb_op, struct lu_dirent **entp, - struct page **ppage) -{ - struct page *page = NULL; - struct lu_dirpage *dp; - struct lu_dirent *ent; - int rc; - ENTRY; - - rc = mdc_read_page(exp, op_data, cb_op, &page); - if (rc != 0) - RETURN(rc); - - dp = page_address(page); - if 
(le64_to_cpu(dp->ldp_hash_end) < op_data->op_hash_offset) - GOTO(out, *entp = NULL); - - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) - if (le64_to_cpu(ent->lde_hash) >= op_data->op_hash_offset) - break; - *entp = ent; -out: - - OBD_PAGE_FREE(page); - RETURN(rc); -} - #endif static int mdc_statfs(const struct lu_env *env, @@ -3318,7 +3198,7 @@ struct md_ops mdc_md_ops = { .m_setxattr = mdc_setxattr, .m_getxattr = mdc_getxattr, .m_fsync = mdc_fsync, - .m_read_entry = mdc_read_entry, + .m_read_page = mdc_read_page, .m_unlink = mdc_unlink, .m_cancel_unused = mdc_cancel_unused, .m_init_ea_size = mdc_init_ea_size, diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 3be5dfd..25f66b1 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -2136,7 +2136,7 @@ void lprocfs_init_mps_stats(int num_private_stats, struct lprocfs_stats *stats) LPROCFS_MD_OP_INIT(num_private_stats, stats, rename); LPROCFS_MD_OP_INIT(num_private_stats, stats, setattr); LPROCFS_MD_OP_INIT(num_private_stats, stats, fsync); - LPROCFS_MD_OP_INIT(num_private_stats, stats, read_entry); + LPROCFS_MD_OP_INIT(num_private_stats, stats, read_page); LPROCFS_MD_OP_INIT(num_private_stats, stats, unlink); LPROCFS_MD_OP_INIT(num_private_stats, stats, setxattr); LPROCFS_MD_OP_INIT(num_private_stats, stats, getxattr);
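
Editor's note: the page-at-a-time iteration that ll_dir_read() and
ll_statahead_thread() now share (fetch the page covering a hash offset, walk
its entries, then continue at ldp_hash_end until MDS_DIR_END_OFF) can be
distilled into the small user-space C sketch below. This is illustrative
only, not Lustre code: dir_ent, dir_page, read_dir_page(), release_dir_page()
and END_OFF are hypothetical stand-ins for lu_dirent, lu_dirpage,
md_read_page(), ll_release_page() and MDS_DIR_END_OFF, and two in-memory
pages stand in for the MDC page cache.

	#include <stdbool.h>
	#include <stdint.h>
	#include <stdio.h>

	/* Hypothetical, simplified stand-ins; not the Lustre API. */
	struct dir_ent {
		uint64_t hash;		/* collation key, like lde_hash */
		uint16_t namelen;	/* 0 marks a dummy record */
		const char *name;
	};

	struct dir_page {
		uint64_t hash_end;	/* like ldp_hash_end: next start hash */
		bool collide;		/* like LDF_COLLIDE */
		int nr;
		const struct dir_ent *ents;
	};

	#define END_OFF UINT64_MAX	/* plays the role of MDS_DIR_END_OFF */

	/* Two fake cached pages standing in for md_read_page() results. */
	static const struct dir_ent page0_ents[] = {
		{ 0x10, 1, "a" }, { 0x20, 0, "" /* dummy */ }, { 0x30, 1, "b" },
	};
	static const struct dir_ent page1_ents[] = {
		{ 0x40, 1, "c" }, { 0x50, 1, "d" },
	};
	static const struct dir_page pages[] = {
		{ 0x40,    false, 3, page0_ents },
		{ END_OFF, false, 2, page1_ents },
	};

	static const struct dir_page *read_dir_page(uint64_t hash_offset)
	{
		/* return the page whose hash range covers hash_offset */
		for (size_t i = 0; i < sizeof(pages) / sizeof(pages[0]); i++)
			if (hash_offset < pages[i].hash_end)
				return &pages[i];
		return NULL;
	}

	static void release_dir_page(const struct dir_page *page, bool remove)
	{
		/* in llite this kunmap()s and optionally truncates the page */
		(void)page; (void)remove;
	}

	/* Page-at-a-time walk: the shape of the new ll_dir_read() loop. */
	static void walk_dir(uint64_t pos)
	{
		const struct dir_page *page = read_dir_page(pos);

		while (page != NULL) {
			for (int i = 0; i < page->nr; i++) {
				const struct dir_ent *ent = &page->ents[i];

				if (ent->hash < pos)	/* seek to target hash */
					continue;
				if (ent->namelen == 0)	/* skip dummy record */
					continue;
				printf("hash %#llx name %s\n",
				       (unsigned long long)ent->hash, ent->name);
			}
			pos = page->hash_end;
			if (pos == END_OFF) {		/* end of directory */
				release_dir_page(page, false);
				break;
			}
			/* drop a colliding page so the next read refetches it */
			release_dir_page(page, page->collide);
			page = read_dir_page(pos);
		}
	}

	int main(void)
	{
		walk_dir(0);
		return 0;
	}

The collide handling at the bottom of the loop mirrors
ll_release_page(inode, page, le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE) in the
patch: a page whose hash range ends mid-collision is dropped from the cache,
so the next read refetches it instead of resuming in the middle of a run of
entries that share one hash value.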