From: Fan Yong Date: Sat, 19 Dec 2009 16:08:46 +0000 (+0800) Subject: b=15692 statahead should hold parent dir's i_mutex to synchronize with other operatio... X-Git-Tag: 1.10.0.33~10 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=e99ff8c12711281dc7e7e6bfd02c96eb956e4a33 b=15692 statahead should hold parent dir's i_mutex to synchronize with other operations from VFS layer when inserting dentry to dcache 1) statahead should hold parent dir's i_mutex to synchronize with other operations from VFS layer when inserting dentry to dcache 2) drop unused ll_lookup_lock 3) drop unused DCACHE_LUSTRE_INVALID definition checking 4) more debug information 5) other code cleanup i=robert.read i=eric.mei --- diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index dd7e9c5..fe56468 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -51,7 +51,6 @@ #include "llite_internal.h" -spinlock_t ll_lookup_lock = SPIN_LOCK_UNLOCKED; /* should NOT be called with the dcache lock, see fs/dcache.c */ static void ll_release(struct dentry *de) @@ -119,18 +118,12 @@ static int ll_ddelete(struct dentry *de) { ENTRY; LASSERT(de); -#ifndef DCACHE_LUSTRE_INVALID -#define DCACHE_LUSTRE_INVALID 0 -#endif CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n", (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"), de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode, d_unhashed(de) ? "" : "hashed,", list_empty(&de->d_subdirs) ? "" : "subdirs"); -#if DCACHE_LUSTRE_INVALID == 0 -#undef DCACHE_LUSTRE_INVALID -#endif RETURN(0); } @@ -216,9 +209,7 @@ int ll_drop_dentry(struct dentry *dentry) __d_drop(dentry); unlock_dentry(dentry); spin_unlock(&dcache_lock); - spin_unlock(&ll_lookup_lock); dput(dentry); - spin_lock(&ll_lookup_lock); spin_lock(&dcache_lock); return 1; } @@ -229,11 +220,7 @@ int ll_drop_dentry(struct dentry *dentry) RETURN (0); } -#ifdef DCACHE_LUSTRE_INVALID if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) { -#else - if (!d_unhashed(dentry)) { -#endif CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p " "inode %p refc %d\n", dentry->d_name.len, dentry->d_name.name, dentry, dentry->d_parent, @@ -241,12 +228,9 @@ int ll_drop_dentry(struct dentry *dentry) /* actually we don't unhash the dentry, rather just * mark it inaccessible for to __d_lookup(). otherwise * sys_getcwd() could return -ENOENT -bzzz */ -#ifdef DCACHE_LUSTRE_INVALID dentry->d_flags |= DCACHE_LUSTRE_INVALID; -#endif if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode)) __d_drop(dentry); - } unlock_dentry(dentry); return 0; @@ -266,7 +250,6 @@ void ll_unhash_aliases(struct inode *inode) inode->i_ino, inode->i_generation, inode); head = &inode->i_dentry; - spin_lock(&ll_lookup_lock); spin_lock(&dcache_lock); restart: tmp = head; @@ -289,7 +272,6 @@ restart: goto restart; } spin_unlock(&dcache_lock); - spin_unlock(&ll_lookup_lock); EXIT; } @@ -312,7 +294,7 @@ int ll_revalidate_it_finish(struct ptlrpc_request *request, RETURN(rc); } -void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry) +void ll_finish_locks(struct lookup_intent *it, struct dentry *dentry) { LASSERT(it != NULL); LASSERT(dentry != NULL); @@ -378,10 +360,8 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, if (it && (it->it_op & IT_CREAT)) RETURN(0); -#ifdef DCACHE_LUSTRE_INVALID if (de->d_flags & DCACHE_LUSTRE_INVALID) RETURN(0); -#endif rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE); GOTO(out_sa, rc); @@ -516,14 +496,12 @@ revalidate_finish: /* unfortunately ll_intent_lock may cause a callback and revoke our * dentry */ - spin_lock(&ll_lookup_lock); spin_lock(&dcache_lock); lock_dentry(de); __d_drop(de); unlock_dentry(de); d_rehash_cond(de, 0); spin_unlock(&dcache_lock); - spin_unlock(&ll_lookup_lock); out: /* We do not free request as it may be reused during following lookup @@ -533,25 +511,16 @@ out: if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE)) ptlrpc_req_finished(req); if (rc == 0) { -#ifdef DCACHE_LUSTRE_INVALID ll_unhash_aliases(de->d_inode); - /* done in ll_unhash_aliases() - dentry->d_flags |= DCACHE_LUSTRE_INVALID; */ -#else - /* We do not want d_invalidate to kill all child dentries too */ - d_drop(de); -#endif } else { CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p " "inode %p refc %d\n", de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode, atomic_read(&de->d_count)); - ll_lookup_finish_locks(it, de); -#ifdef DCACHE_LUSTRE_INVALID + ll_finish_locks(it, de); lock_dentry(de); de->d_flags &= ~DCACHE_LUSTRE_INVALID; unlock_dentry(de); -#endif } RETURN(rc); @@ -575,7 +544,7 @@ do_lookup: /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */ if (it->it_op == IT_GETATTR) lookup_it.it_op = IT_GETATTR; - ll_lookup_finish_locks(it, de); + ll_finish_locks(it, de); it = &lookup_it; } @@ -624,7 +593,8 @@ out_sa: return rc; } -/*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag) +#if 0 +static void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag) { struct inode *inode= de->d_inode; struct ll_sb_info *sbi = ll_i2sbi(inode); @@ -671,7 +641,7 @@ out_sa: return; } -/*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag) +static void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag) { struct ll_sb_info *sbi = ll_i2sbi(de->d_inode); struct ll_dentry_data *ldd = ll_d2d(de); @@ -708,6 +678,7 @@ out_sa: EXIT; return; } +#endif #ifdef HAVE_VFS_INTENT_PATCHES int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd) diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 0032abf..ff45b5f 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -301,20 +301,20 @@ struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact, if (request) ptlrpc_req_finished(request); if (rc < 0) { - CERROR("lock enqueue: rc: %d\n", rc); + CERROR("lock enqueue: "DFID" at "LPU64": rc %d\n", + PFID(ll_inode2fid(dir)), hash, rc); return ERR_PTR(rc); } - } else { - /* for cross-ref object, l_ast_data of the lock may not be set, - * we reset it here */ - md_set_lock_data(ll_i2sbi(dir)->ll_md_exp, &lockh.cookie, - dir, NULL); } + md_set_lock_data(ll_i2sbi(dir)->ll_md_exp, &lockh.cookie, dir, NULL); ldlm_lock_dump_handle(D_OTHER, &lockh); page = ll_dir_page_locate(dir, hash, &start, &end); - if (IS_ERR(page)) + if (IS_ERR(page)) { + CERROR("dir page locate: "DFID" at "LPU64": rc %ld\n", + PFID(ll_inode2fid(dir)), hash, PTR_ERR(page)); GOTO(out_unlock, page); + } if (page != NULL) { /* @@ -348,17 +348,26 @@ struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact, page = read_cache_page(mapping, hash_x_index((unsigned long)hash), (filler_t*)mapping->a_ops->readpage, NULL); - if (IS_ERR(page)) + if (IS_ERR(page)) { + CERROR("read cache page: "DFID" at "LPU64": rc %ld\n", + PFID(ll_inode2fid(dir)), hash, PTR_ERR(page)); GOTO(out_unlock, page); + } wait_on_page(page); (void)kmap(page); - if (!PageUptodate(page)) + if (!PageUptodate(page)) { + CERROR("page not updated: "DFID" at "LPU64": rc %d\n", + PFID(ll_inode2fid(dir)), hash, -5); goto fail; + } if (!PageChecked(page)) ll_check_page(dir, page); - if (PageError(page)) + if (PageError(page)) { + CERROR("page error: "DFID" at "LPU64": rc %d\n", + PFID(ll_inode2fid(dir)), hash, -5); goto fail; + } hash_collision: dp = page_address(page); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 1bc7e0b..f7eb5405 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -2101,14 +2101,12 @@ int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it, here to preserve get_cwd functionality on 2.6. Bug 10503 */ if (!dentry->d_inode->i_nlink) { - spin_lock(&ll_lookup_lock); spin_lock(&dcache_lock); ll_drop_dentry(dentry); spin_unlock(&dcache_lock); - spin_unlock(&ll_lookup_lock); } - ll_lookup_finish_locks(&oit, dentry); + ll_finish_locks(&oit, dentry); } else if (!ll_have_md_lock(dentry->d_inode, ibits)) { struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 930399f..7c120dd 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -389,15 +389,10 @@ struct ll_sb_info { /* metadata stat-ahead */ unsigned int ll_sa_max; /* max statahead RPCs */ - unsigned int ll_sa_wrong; /* statahead thread stopped for - * low hit ratio */ - unsigned int ll_sa_total; /* statahead thread started + atomic_t ll_sa_total; /* statahead thread started * count */ - unsigned long long ll_sa_blocked; /* ls count waiting for - * statahead */ - unsigned long long ll_sa_cached; /* ls count got in cache */ - unsigned long long ll_sa_hit; /* hit count */ - unsigned long long ll_sa_miss; /* miss count */ + atomic_t ll_sa_wrong; /* statahead thread stopped for + * low hit ratio */ dev_t ll_sdev_orig; /* save s_dev before assign for * clustred nfs */ @@ -586,7 +581,6 @@ struct lookup_intent *ll_convert_intent(struct open_intent *oit, #endif int ll_lookup_it_finish(struct ptlrpc_request *request, struct lookup_intent *it, void *data); -void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry); /* llite/rw.c */ int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to); @@ -670,7 +664,6 @@ int ll_fid2path(struct obd_export *exp, void *arg); /** * protect race ll_find_aliases vs ll_revalidate_it vs ll_unhash_aliases */ -extern spinlock_t ll_lookup_lock; extern struct dentry_operations ll_d_ops; void ll_intent_drop_lock(struct lookup_intent *); void ll_intent_release(struct lookup_intent *); @@ -679,7 +672,7 @@ extern void ll_set_dd(struct dentry *de); int ll_drop_dentry(struct dentry *dentry); void ll_unhash_aliases(struct inode *); void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft); -void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry); +void ll_finish_locks(struct lookup_intent *it, struct dentry *dentry); int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name); int ll_revalidate_it_finish(struct ptlrpc_request *request, struct lookup_intent *it, struct dentry *de); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 9b244da..98aae9a 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -135,6 +135,8 @@ static struct ll_sb_info *ll_init_sbi(void) /* metadata statahead is enabled by default */ sbi->ll_sa_max = LL_SA_RPC_DEF; + atomic_set(&sbi->ll_sa_total, 0); + atomic_set(&sbi->ll_sa_wrong, 0); RETURN(sbi); } diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index 37ee5d4..943123e 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -562,18 +562,10 @@ static int ll_rd_statahead_stats(char *page, char **start, off_t off, struct ll_sb_info *sbi = ll_s2sbi(sb); return snprintf(page, count, - "statahead wrong: %u\n" "statahead total: %u\n" - "ls blocked: %llu\n" - "ls cached: %llu\n" - "hit count: %llu\n" - "miss count: %llu\n", - sbi->ll_sa_wrong, - sbi->ll_sa_total, - sbi->ll_sa_blocked, - sbi->ll_sa_cached, - sbi->ll_sa_hit, - sbi->ll_sa_miss); + "statahead wrong: %u\n", + atomic_read(&sbi->ll_sa_total), + atomic_read(&sbi->ll_sa_wrong)); } static int ll_rd_lazystatfs(char *page, char **start, off_t off, diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 187cfea..61fb628 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -154,7 +154,6 @@ static void ll_drop_negative_dentry(struct inode *dir) { struct dentry *dentry, *tmp_alias, *tmp_subdir; - spin_lock(&ll_lookup_lock); spin_lock(&dcache_lock); restart: list_for_each_entry_safe(dentry, tmp_alias, @@ -175,7 +174,6 @@ restart: } } spin_unlock(&dcache_lock); - spin_unlock(&ll_lookup_lock); } @@ -350,7 +348,6 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) struct dentry *dentry; struct dentry *last_discon = NULL; - spin_lock(&ll_lookup_lock); spin_lock(&dcache_lock); list_for_each(tmp, &inode->i_dentry) { dentry = list_entry(tmp, struct dentry, d_alias); @@ -387,7 +384,6 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) ll_dops_init(dentry, 0); d_rehash_cond(dentry, 0); /* avoid taking dcache_lock inside */ spin_unlock(&dcache_lock); - spin_unlock(&ll_lookup_lock); iput(inode); CDEBUG(D_DENTRY, "alias dentry %.*s (%p) parent %p inode %p " "refc %d\n", de->d_name.len, de->d_name.name, de, @@ -404,7 +400,6 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) last_discon->d_flags |= DCACHE_LUSTRE_INVALID; unlock_dentry(last_discon); spin_unlock(&dcache_lock); - spin_unlock(&ll_lookup_lock); ll_dops_init(last_discon, 1); d_rehash(de); d_move(last_discon, de); @@ -415,7 +410,6 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) ll_d_add(de, inode); spin_unlock(&dcache_lock); - spin_unlock(&ll_lookup_lock); return de; } @@ -471,7 +465,7 @@ int ll_lookup_it_finish(struct ptlrpc_request *request, /* we have lookup look - unhide dentry */ if (bits & MDS_INODELOCK_LOOKUP) { lock_dentry(*de); - (*de)->d_flags &= ~(DCACHE_LUSTRE_INVALID); + (*de)->d_flags &= ~DCACHE_LUSTRE_INVALID; unlock_dentry(*de); } } else { @@ -583,7 +577,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, !S_ISDIR(dentry->d_inode->i_mode)) { ll_release_openhandle(dentry, it); } - ll_lookup_finish_locks(it, dentry); + ll_finish_locks(it, dentry); if (dentry == save) GOTO(out, retval = NULL); diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 28082be..2eb1b8b 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -257,7 +257,7 @@ ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index) CDEBUG(D_READA, "alloc sai entry %p index %u\n", entry, index); entry->se_index = index; - entry->se_stat = SA_ENTRY_UNSTATED; + entry->se_stat = SA_ENTRY_UNSTATED; spin_lock(&lli->lli_lock); list_add_tail(&entry->se_list, &sai->sai_entries_sent); @@ -270,10 +270,11 @@ ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index) * delete it from sai_entries_stated head when fini, it need not * to process entry's member. */ -static void ll_sai_entry_fini(struct ll_statahead_info *sai) +static int ll_sai_entry_fini(struct ll_statahead_info *sai) { struct ll_inode_info *lli = ll_i2info(sai->sai_inode); struct ll_sai_entry *entry; + int rc = 0; ENTRY; spin_lock(&lli->lli_lock); @@ -283,13 +284,15 @@ static void ll_sai_entry_fini(struct ll_statahead_info *sai) struct ll_sai_entry, se_list); if (entry->se_index < sai->sai_index_next) { list_del(&entry->se_list); + rc = entry->se_stat; OBD_FREE_PTR(entry); } - } else + } else { LASSERT(sa_is_stopped(sai)); + } spin_unlock(&lli->lli_lock); - EXIT; + RETURN(rc); } /** @@ -311,8 +314,9 @@ ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat, entry->se_req = ptlrpc_request_addref(req); entry->se_minfo = minfo; RETURN(entry); - } else if (entry->se_index > index) + } else if (entry->se_index > index) { RETURN(NULL); + } } } RETURN(NULL); @@ -430,14 +434,20 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) if (body->valid & OBD_MD_MDS) GOTO(out, rc = -EAGAIN); + /* BUG 15962: if statahead insert dentry into dcache (for + * lookup),it should hold parent dir's i_mutex to synchronize + * with other operations from VFS layer. + * E.g.: create/delete/rename/lookup, and so on. */ + mutex_lock(&minfo->mi_dir->i_mutex); rc = ll_lookup_it_finish(req, it, &icbd); + mutex_unlock(&minfo->mi_dir->i_mutex); if (!rc) /* * Here dentry->d_inode might be NULL, * because the entry may have been removed before * we start doing stat ahead. */ - ll_lookup_finish_locks(it, dentry); + ll_finish_locks(it, dentry); if (dentry != save) { minfo->mi_dentry = dentry; @@ -458,7 +468,6 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) GOTO(out, rc); } - spin_lock(&ll_lookup_lock); spin_lock(&dcache_lock); lock_dentry(dentry); __d_drop(dentry); @@ -466,9 +475,8 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) unlock_dentry(dentry); d_rehash_cond(dentry, 0); spin_unlock(&dcache_lock); - spin_unlock(&ll_lookup_lock); - ll_lookup_finish_locks(it, dentry); + ll_finish_locks(it, dentry); } EXIT; @@ -506,8 +514,8 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, sai = ll_sai_get(lli->lli_sai); entry = ll_sai_entry_set(sai, (unsigned int)(long)minfo->mi_cbdata, - rc ? SA_ENTRY_UNSTATED : - SA_ENTRY_STATED, req, minfo); + rc < 0 ? rc : SA_ENTRY_STATED, req, + minfo); LASSERT(entry != NULL); if (likely(sa_is_running(sai))) { ll_sai_entry_to_received(sai, entry); @@ -639,13 +647,13 @@ static int do_sa_revalidate(struct inode *dir, struct dentry *dentry) int rc; ENTRY; - if (inode == NULL) + if (unlikely(inode == NULL)) RETURN(1); if (d_mountpoint(dentry)) RETURN(1); - if (dentry == dentry->d_sb->s_root) + if (unlikely(dentry == dentry->d_sb->s_root)) RETURN(1); rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode)); @@ -722,7 +730,7 @@ out: if (rc) { CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n", se, se->se_index, se->se_stat, rc); - se->se_stat = rc; + se->se_stat = rc < 0 ? rc : SA_ENTRY_STATED; if (ll_sai_entry_to_stated(sai, se)) cfs_waitq_signal(&sai->sai_waitq); } else { @@ -754,7 +762,7 @@ static int ll_statahead_thread(void *arg) cfs_daemonize(pname); } - sbi->ll_sa_total++; + atomic_inc(&sbi->ll_sa_total); spin_lock(&lli->lli_lock); thread->t_flags = SVC_RUNNING; spin_unlock(&lli->lli_lock); @@ -771,9 +779,10 @@ static int ll_statahead_thread(void *arg) if (IS_ERR(page)) { rc = PTR_ERR(page); - CDEBUG(D_READA, "error reading dir "DFID" at "LPU64"/%u: rc %d\n", - PFID(ll_inode2fid(dir)), pos, - sai->sai_index, rc); + CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 + "/%u: [rc %d] [parent %u]\n", + PFID(ll_inode2fid(dir)), pos, sai->sai_index, + rc, lli->lli_opendir_pid); break; } @@ -970,9 +979,13 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) struct lu_dirent *ent; if (IS_ERR(page)) { + struct ll_inode_info *lli = ll_i2info(dir); + rc = PTR_ERR(page); - CERROR("error reading dir "DFID" at "LPU64": rc %d\n", - PFID(ll_inode2fid(dir)), pos, rc); + CERROR("error reading dir "DFID" at "LPU64": " + "[rc %d] [parent %u]\n", + PFID(ll_inode2fid(dir)), pos, + rc, lli->lli_opendir_pid); break; } @@ -1074,8 +1087,6 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) sai = lli->lli_sai; if (sai) { - struct ll_sb_info *sbi; - if (unlikely(sa_is_stopped(sai) && list_empty(&sai->sai_entries_stated))) RETURN(-EBADFD); @@ -1107,18 +1118,33 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) } } - sbi = ll_i2sbi(dir); - if (ll_sai_entry_stated(sai)) { - sbi->ll_sa_cached++; - } else { - sbi->ll_sa_blocked++; - /* - * thread started already, avoid double-stat. - */ + if (!ll_sai_entry_stated(sai)) { + /* BUG 15962: + * + * If statahead insert dentry into dcache (for lookup), + * it should hold parent dir's i_mutex to synchronize + * with other operations from VFS layer. + * E.g.: create/delete/rename/lookup, and so on. + * + * To prevent the dead lock between statahead and its + * parent process, the parent process should release + * such i_mutex before waiting for statahead to fetch + * related dentry attribute from MDS. + * + * It is no matter for parent process to release such + * i_mutex temporary, if someone else create dentry for + * the same item in such interval, we can find it after + * woke up by statahead. */ + if (lookup) { + LASSERT(mutex_is_locked(&dir->i_mutex)); + mutex_unlock(&dir->i_mutex); + } rc = l_wait_event(sai->sai_waitq, ll_sai_entry_stated(sai) || sa_is_stopped(sai), &lwi); + if (lookup) + mutex_lock(&dir->i_mutex); } if (lookup) { @@ -1213,6 +1239,7 @@ void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result) struct ll_statahead_info *sai; struct ll_sb_info *sbi; struct ll_dentry_data *ldd = ll_d2d(dentry); + int rc; ENTRY; LASSERT(dir != NULL); @@ -1222,17 +1249,24 @@ void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result) LASSERT(sai != NULL); sbi = ll_i2sbi(dir); - if (result >= 1) { - sbi->ll_sa_hit++; + rc = ll_sai_entry_fini(sai); + /* rc == -ENOENT means such dentry was removed just between statahead + * readdir and pre-fetched, count it as hit. + * + * result == -ENOENT has two meanings: + * 1. such dentry was removed just between statahead pre-fetched and + * main process stat such dentry. + * 2. main process stat non-exist dentry. + * Since we can distinguish such two cases, just count it as miss. */ + if (result >= 1 || unlikely(rc == -ENOENT)) { sai->sai_hit++; sai->sai_consecutive_miss = 0; sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max); } else { - sbi->ll_sa_miss++; sai->sai_miss++; sai->sai_consecutive_miss++; if (sa_low_hit(sai) && sa_is_running(sai)) { - sbi->ll_sa_wrong++; + atomic_inc(&sbi->ll_sa_wrong); CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio " "too low: hit/miss %u/%u, sent/replied %u/%u, " "stopping statahead thread: pid %d\n", @@ -1248,7 +1282,6 @@ void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result) if (!sa_is_stopped(sai)) cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); - ll_sai_entry_fini(sai); if (likely(ldd != NULL)) ldd->lld_sa_generation = sai->sai_generation;