Whamcloud - gitweb
b=15692 statahead should hold parent dir's i_mutex to synchronize with other operatio...
authorFan Yong <Yong.Fan@Sun.COM>
Sat, 19 Dec 2009 16:08:46 +0000 (00:08 +0800)
committerRobert Read <rread@sun.com>
Tue, 22 Dec 2009 00:34:01 +0000 (16:34 -0800)
1) statahead should hold parent dir's i_mutex to synchronize with other operations from VFS layer when inserting dentry to dcache
2) drop unused ll_lookup_lock
3) drop unused DCACHE_LUSTRE_INVALID definition checking
4) more debug information
5) other code cleanup

i=robert.read
i=eric.mei

lustre/llite/dcache.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/llite/namei.c
lustre/llite/statahead.c

index dd7e9c5..fe56468 100644 (file)
@@ -51,7 +51,6 @@
 
 #include "llite_internal.h"
 
-spinlock_t ll_lookup_lock = SPIN_LOCK_UNLOCKED;
 
 /* should NOT be called with the dcache lock, see fs/dcache.c */
 static void ll_release(struct dentry *de)
@@ -119,18 +118,12 @@ static int ll_ddelete(struct dentry *de)
 {
         ENTRY;
         LASSERT(de);
-#ifndef DCACHE_LUSTRE_INVALID
-#define DCACHE_LUSTRE_INVALID 0
-#endif
 
         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
                d_unhashed(de) ? "" : "hashed,",
                list_empty(&de->d_subdirs) ? "" : "subdirs");
-#if DCACHE_LUSTRE_INVALID == 0
-#undef DCACHE_LUSTRE_INVALID
-#endif
 
         RETURN(0);
 }
@@ -216,9 +209,7 @@ int ll_drop_dentry(struct dentry *dentry)
                 __d_drop(dentry);
                 unlock_dentry(dentry);
                 spin_unlock(&dcache_lock);
-                spin_unlock(&ll_lookup_lock);
                 dput(dentry);
-                spin_lock(&ll_lookup_lock);
                 spin_lock(&dcache_lock);
                 return 1;
         }
@@ -229,11 +220,7 @@ int ll_drop_dentry(struct dentry *dentry)
                 RETURN (0);
         }
 
-#ifdef DCACHE_LUSTRE_INVALID
         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
-#else
-        if (!d_unhashed(dentry)) {
-#endif
                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
                        "inode %p refc %d\n", dentry->d_name.len,
                        dentry->d_name.name, dentry, dentry->d_parent,
@@ -241,12 +228,9 @@ int ll_drop_dentry(struct dentry *dentry)
                 /* actually we don't unhash the dentry, rather just
                  * mark it inaccessible for to __d_lookup(). otherwise
                  * sys_getcwd() could return -ENOENT -bzzz */
-#ifdef DCACHE_LUSTRE_INVALID
                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
-#endif
                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
                         __d_drop(dentry);
-
         }
         unlock_dentry(dentry);
         return 0;
@@ -266,7 +250,6 @@ void ll_unhash_aliases(struct inode *inode)
                inode->i_ino, inode->i_generation, inode);
 
         head = &inode->i_dentry;
-        spin_lock(&ll_lookup_lock);
         spin_lock(&dcache_lock);
 restart:
         tmp = head;
@@ -289,7 +272,6 @@ restart:
                           goto restart;
         }
         spin_unlock(&dcache_lock);
-        spin_unlock(&ll_lookup_lock);
 
         EXIT;
 }
@@ -312,7 +294,7 @@ int ll_revalidate_it_finish(struct ptlrpc_request *request,
         RETURN(rc);
 }
 
-void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
+void ll_finish_locks(struct lookup_intent *it, struct dentry *dentry)
 {
         LASSERT(it != NULL);
         LASSERT(dentry != NULL);
@@ -378,10 +360,8 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
                 if (it && (it->it_op & IT_CREAT))
                         RETURN(0);
 
-#ifdef DCACHE_LUSTRE_INVALID
                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
                         RETURN(0);
-#endif
 
                 rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE);
                 GOTO(out_sa, rc);
@@ -516,14 +496,12 @@ revalidate_finish:
 
         /* unfortunately ll_intent_lock may cause a callback and revoke our
          * dentry */
-        spin_lock(&ll_lookup_lock);
         spin_lock(&dcache_lock);
         lock_dentry(de);
         __d_drop(de);
         unlock_dentry(de);
         d_rehash_cond(de, 0);
         spin_unlock(&dcache_lock);
-        spin_unlock(&ll_lookup_lock);
 
 out:
         /* We do not free request as it may be reused during following lookup
@@ -533,25 +511,16 @@ out:
         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
                 ptlrpc_req_finished(req);
         if (rc == 0) {
-#ifdef DCACHE_LUSTRE_INVALID
                 ll_unhash_aliases(de->d_inode);
-                /* done in ll_unhash_aliases()
-                   dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
-#else
-                /* We do not want d_invalidate to kill all child dentries too */
-                d_drop(de);
-#endif
         } else {
                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
                        "inode %p refc %d\n", de->d_name.len,
                        de->d_name.name, de, de->d_parent, de->d_inode,
                        atomic_read(&de->d_count));
-                ll_lookup_finish_locks(it, de);
-#ifdef DCACHE_LUSTRE_INVALID
+                ll_finish_locks(it, de);
                 lock_dentry(de);
                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
                 unlock_dentry(de);
-#endif
         }
         RETURN(rc);
 
@@ -575,7 +544,7 @@ do_lookup:
                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
                 if (it->it_op == IT_GETATTR)
                         lookup_it.it_op = IT_GETATTR;
-                ll_lookup_finish_locks(it, de);
+                ll_finish_locks(it, de);
                 it = &lookup_it;
         }
 
@@ -624,7 +593,8 @@ out_sa:
         return rc;
 }
 
-/*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
+#if 0
+static void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
 {
         struct inode *inode= de->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
@@ -671,7 +641,7 @@ out_sa:
         return;
 }
 
-/*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
+static void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
 {
         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
         struct ll_dentry_data *ldd = ll_d2d(de);
@@ -708,6 +678,7 @@ out_sa:
         EXIT;
         return;
 }
+#endif
 
 #ifdef HAVE_VFS_INTENT_PATCHES
 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
index 0032abf..ff45b5f 100644 (file)
@@ -301,20 +301,20 @@ struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact,
                 if (request)
                         ptlrpc_req_finished(request);
                 if (rc < 0) {
-                        CERROR("lock enqueue: rc: %d\n", rc);
+                        CERROR("lock enqueue: "DFID" at "LPU64": rc %d\n",
+                               PFID(ll_inode2fid(dir)), hash, rc);
                         return ERR_PTR(rc);
                 }
-        } else {
-                /* for cross-ref object, l_ast_data of the lock may not be set,
-                 * we reset it here */
-                md_set_lock_data(ll_i2sbi(dir)->ll_md_exp, &lockh.cookie,
-                                 dir, NULL);
         }
+        md_set_lock_data(ll_i2sbi(dir)->ll_md_exp, &lockh.cookie, dir, NULL);
         ldlm_lock_dump_handle(D_OTHER, &lockh);
 
         page = ll_dir_page_locate(dir, hash, &start, &end);
-        if (IS_ERR(page))
+        if (IS_ERR(page)) {
+                CERROR("dir page locate: "DFID" at "LPU64": rc %ld\n",
+                       PFID(ll_inode2fid(dir)), hash, PTR_ERR(page));
                 GOTO(out_unlock, page);
+        }
 
         if (page != NULL) {
                 /*
@@ -348,17 +348,26 @@ struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact,
 
         page = read_cache_page(mapping, hash_x_index((unsigned long)hash),
                                (filler_t*)mapping->a_ops->readpage, NULL);
-        if (IS_ERR(page))
+        if (IS_ERR(page)) {
+                CERROR("read cache page: "DFID" at "LPU64": rc %ld\n",
+                       PFID(ll_inode2fid(dir)), hash, PTR_ERR(page));
                 GOTO(out_unlock, page);
+        }
 
         wait_on_page(page);
         (void)kmap(page);
-        if (!PageUptodate(page))
+        if (!PageUptodate(page)) {
+                CERROR("page not updated: "DFID" at "LPU64": rc %d\n",
+                       PFID(ll_inode2fid(dir)), hash, -5);
                 goto fail;
+        }
         if (!PageChecked(page))
                 ll_check_page(dir, page);
-        if (PageError(page))
+        if (PageError(page)) {
+                CERROR("page error: "DFID" at "LPU64": rc %d\n",
+                       PFID(ll_inode2fid(dir)), hash, -5);
                 goto fail;
+        }
 hash_collision:
         dp = page_address(page);
 
index 1bc7e0b..f7eb540 100644 (file)
@@ -2101,14 +2101,12 @@ int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
                    here to preserve get_cwd functionality on 2.6.
                    Bug 10503 */
                 if (!dentry->d_inode->i_nlink) {
-                        spin_lock(&ll_lookup_lock);
                         spin_lock(&dcache_lock);
                         ll_drop_dentry(dentry);
                         spin_unlock(&dcache_lock);
-                        spin_unlock(&ll_lookup_lock);
                 }
 
-                ll_lookup_finish_locks(&oit, dentry);
+                ll_finish_locks(&oit, dentry);
         } else if (!ll_have_md_lock(dentry->d_inode, ibits)) {
 
                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
index 930399f..7c120dd 100644 (file)
@@ -389,15 +389,10 @@ struct ll_sb_info {
 
         /* metadata stat-ahead */
         unsigned int              ll_sa_max;     /* max statahead RPCs */
-        unsigned int              ll_sa_wrong;   /* statahead thread stopped for
-                                                  * low hit ratio */
-        unsigned int              ll_sa_total;   /* statahead thread started
+        atomic_t                  ll_sa_total;   /* statahead thread started
                                                   * count */
-        unsigned long long        ll_sa_blocked; /* ls count waiting for
-                                                  * statahead */
-        unsigned long long        ll_sa_cached;  /* ls count got in cache */
-        unsigned long long        ll_sa_hit;     /* hit count */
-        unsigned long long        ll_sa_miss;    /* miss count */
+        atomic_t                  ll_sa_wrong;   /* statahead thread stopped for
+                                                  * low hit ratio */
 
         dev_t                     ll_sdev_orig; /* save s_dev before assign for
                                                  * clustred nfs */
@@ -586,7 +581,6 @@ struct lookup_intent *ll_convert_intent(struct open_intent *oit,
 #endif
 int ll_lookup_it_finish(struct ptlrpc_request *request,
                         struct lookup_intent *it, void *data);
-void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
 
 /* llite/rw.c */
 int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to);
@@ -670,7 +664,6 @@ int ll_fid2path(struct obd_export *exp, void *arg);
 /**
  * protect race ll_find_aliases vs ll_revalidate_it vs ll_unhash_aliases
  */
-extern spinlock_t ll_lookup_lock;
 extern struct dentry_operations ll_d_ops;
 void ll_intent_drop_lock(struct lookup_intent *);
 void ll_intent_release(struct lookup_intent *);
@@ -679,7 +672,7 @@ extern void ll_set_dd(struct dentry *de);
 int ll_drop_dentry(struct dentry *dentry);
 void ll_unhash_aliases(struct inode *);
 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft);
-void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
+void ll_finish_locks(struct lookup_intent *it, struct dentry *dentry);
 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name);
 int ll_revalidate_it_finish(struct ptlrpc_request *request,
                             struct lookup_intent *it, struct dentry *de);
index 9b244da..98aae9a 100644 (file)
@@ -135,6 +135,8 @@ static struct ll_sb_info *ll_init_sbi(void)
 
         /* metadata statahead is enabled by default */
         sbi->ll_sa_max = LL_SA_RPC_DEF;
+        atomic_set(&sbi->ll_sa_total, 0);
+        atomic_set(&sbi->ll_sa_wrong, 0);
 
         RETURN(sbi);
 }
index 37ee5d4..943123e 100644 (file)
@@ -562,18 +562,10 @@ static int ll_rd_statahead_stats(char *page, char **start, off_t off,
         struct ll_sb_info *sbi = ll_s2sbi(sb);
 
         return snprintf(page, count,
-                        "statahead wrong: %u\n"
                         "statahead total: %u\n"
-                        "ls blocked:      %llu\n"
-                        "ls cached:       %llu\n"
-                        "hit count:       %llu\n"
-                        "miss count:      %llu\n",
-                        sbi->ll_sa_wrong,
-                        sbi->ll_sa_total,
-                        sbi->ll_sa_blocked,
-                        sbi->ll_sa_cached,
-                        sbi->ll_sa_hit,
-                        sbi->ll_sa_miss);
+                        "statahead wrong: %u\n",
+                        atomic_read(&sbi->ll_sa_total),
+                        atomic_read(&sbi->ll_sa_wrong));
 }
 
 static int ll_rd_lazystatfs(char *page, char **start, off_t off,
index 187cfea..61fb628 100644 (file)
@@ -154,7 +154,6 @@ static void ll_drop_negative_dentry(struct inode *dir)
 {
         struct dentry *dentry, *tmp_alias, *tmp_subdir;
 
-        spin_lock(&ll_lookup_lock);
         spin_lock(&dcache_lock);
 restart:
         list_for_each_entry_safe(dentry, tmp_alias,
@@ -175,7 +174,6 @@ restart:
                 }
         }
         spin_unlock(&dcache_lock);
-        spin_unlock(&ll_lookup_lock);
 }
 
 
@@ -350,7 +348,6 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
         struct dentry *dentry;
         struct dentry *last_discon = NULL;
 
-        spin_lock(&ll_lookup_lock);
         spin_lock(&dcache_lock);
         list_for_each(tmp, &inode->i_dentry) {
                 dentry = list_entry(tmp, struct dentry, d_alias);
@@ -387,7 +384,6 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
                 ll_dops_init(dentry, 0);
                 d_rehash_cond(dentry, 0); /* avoid taking dcache_lock inside */
                 spin_unlock(&dcache_lock);
-                spin_unlock(&ll_lookup_lock);
                 iput(inode);
                 CDEBUG(D_DENTRY, "alias dentry %.*s (%p) parent %p inode %p "
                        "refc %d\n", de->d_name.len, de->d_name.name, de,
@@ -404,7 +400,6 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
                 last_discon->d_flags |= DCACHE_LUSTRE_INVALID;
                 unlock_dentry(last_discon);
                 spin_unlock(&dcache_lock);
-                spin_unlock(&ll_lookup_lock);
                 ll_dops_init(last_discon, 1);
                 d_rehash(de);
                 d_move(last_discon, de);
@@ -415,7 +410,6 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
         ll_d_add(de, inode);
 
         spin_unlock(&dcache_lock);
-        spin_unlock(&ll_lookup_lock);
 
         return de;
 }
@@ -471,7 +465,7 @@ int ll_lookup_it_finish(struct ptlrpc_request *request,
                 /* we have lookup look - unhide dentry */
                 if (bits & MDS_INODELOCK_LOOKUP) {
                         lock_dentry(*de);
-                        (*de)->d_flags &= ~(DCACHE_LUSTRE_INVALID);
+                        (*de)->d_flags &= ~DCACHE_LUSTRE_INVALID;
                         unlock_dentry(*de);
                 }
         } else {
@@ -583,7 +577,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
             !S_ISDIR(dentry->d_inode->i_mode)) {
                 ll_release_openhandle(dentry, it);
         }
-        ll_lookup_finish_locks(it, dentry);
+        ll_finish_locks(it, dentry);
 
         if (dentry == save)
                 GOTO(out, retval = NULL);
index 28082be..2eb1b8b 100644 (file)
@@ -257,7 +257,7 @@ ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
         CDEBUG(D_READA, "alloc sai entry %p index %u\n",
                entry, index);
         entry->se_index = index;
-        entry->se_stat  = SA_ENTRY_UNSTATED;
+        entry->se_stat = SA_ENTRY_UNSTATED;
 
         spin_lock(&lli->lli_lock);
         list_add_tail(&entry->se_list, &sai->sai_entries_sent);
@@ -270,10 +270,11 @@ ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
  * delete it from sai_entries_stated head when fini, it need not
  * to process entry's member.
  */
-static void ll_sai_entry_fini(struct ll_statahead_info *sai)
+static int ll_sai_entry_fini(struct ll_statahead_info *sai)
 {
         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
         struct ll_sai_entry  *entry;
+        int rc = 0;
         ENTRY;
 
         spin_lock(&lli->lli_lock);
@@ -283,13 +284,15 @@ static void ll_sai_entry_fini(struct ll_statahead_info *sai)
                                    struct ll_sai_entry, se_list);
                 if (entry->se_index < sai->sai_index_next) {
                         list_del(&entry->se_list);
+                        rc = entry->se_stat;
                         OBD_FREE_PTR(entry);
                 }
-        } else
+        } else {
                 LASSERT(sa_is_stopped(sai));
+        }
         spin_unlock(&lli->lli_lock);
 
-        EXIT;
+        RETURN(rc);
 }
 
 /**
@@ -311,8 +314,9 @@ ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat,
                                 entry->se_req = ptlrpc_request_addref(req);
                                 entry->se_minfo = minfo;
                                 RETURN(entry);
-                        } else if (entry->se_index > index)
+                        } else if (entry->se_index > index) {
                                 RETURN(NULL);
+                        }
                 }
         }
         RETURN(NULL);
@@ -430,14 +434,20 @@ static int do_statahead_interpret(struct ll_statahead_info *sai)
                 if (body->valid & OBD_MD_MDS)
                         GOTO(out, rc = -EAGAIN);
 
+                /* BUG 15962: if statahead insert dentry into dcache (for
+                 * lookup),it should hold parent dir's i_mutex to synchronize
+                 * with other operations from VFS layer.
+                 * E.g.: create/delete/rename/lookup, and so on. */
+                mutex_lock(&minfo->mi_dir->i_mutex);
                 rc = ll_lookup_it_finish(req, it, &icbd);
+                mutex_unlock(&minfo->mi_dir->i_mutex);
                 if (!rc)
                         /*
                          * Here dentry->d_inode might be NULL,
                          * because the entry may have been removed before
                          * we start doing stat ahead.
                          */
-                        ll_lookup_finish_locks(it, dentry);
+                        ll_finish_locks(it, dentry);
 
                 if (dentry != save) {
                         minfo->mi_dentry = dentry;
@@ -458,7 +468,6 @@ static int do_statahead_interpret(struct ll_statahead_info *sai)
                         GOTO(out, rc);
                 }
 
-                spin_lock(&ll_lookup_lock);
                 spin_lock(&dcache_lock);
                 lock_dentry(dentry);
                 __d_drop(dentry);
@@ -466,9 +475,8 @@ static int do_statahead_interpret(struct ll_statahead_info *sai)
                 unlock_dentry(dentry);
                 d_rehash_cond(dentry, 0);
                 spin_unlock(&dcache_lock);
-                spin_unlock(&ll_lookup_lock);
 
-                ll_lookup_finish_locks(it, dentry);
+                ll_finish_locks(it, dentry);
         }
         EXIT;
 
@@ -506,8 +514,8 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
                 sai = ll_sai_get(lli->lli_sai);
                 entry = ll_sai_entry_set(sai,
                                          (unsigned int)(long)minfo->mi_cbdata,
-                                         rc ? SA_ENTRY_UNSTATED :
-                                         SA_ENTRY_STATED, req, minfo);
+                                         rc < 0 ? rc : SA_ENTRY_STATED, req,
+                                         minfo);
                 LASSERT(entry != NULL);
                 if (likely(sa_is_running(sai))) {
                         ll_sai_entry_to_received(sai, entry);
@@ -639,13 +647,13 @@ static int do_sa_revalidate(struct inode *dir, struct dentry *dentry)
         int rc;
         ENTRY;
 
-        if (inode == NULL)
+        if (unlikely(inode == NULL))
                 RETURN(1);
 
         if (d_mountpoint(dentry))
                 RETURN(1);
 
-        if (dentry == dentry->d_sb->s_root)
+        if (unlikely(dentry == dentry->d_sb->s_root))
                 RETURN(1);
 
         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode));
@@ -722,7 +730,7 @@ out:
         if (rc) {
                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
                        se, se->se_index, se->se_stat, rc);
-                se->se_stat = rc;
+                se->se_stat = rc < 0 ? rc : SA_ENTRY_STATED;
                 if (ll_sai_entry_to_stated(sai, se))
                         cfs_waitq_signal(&sai->sai_waitq);
         } else {
@@ -754,7 +762,7 @@ static int ll_statahead_thread(void *arg)
                 cfs_daemonize(pname);
         }
 
-        sbi->ll_sa_total++;
+        atomic_inc(&sbi->ll_sa_total);
         spin_lock(&lli->lli_lock);
         thread->t_flags = SVC_RUNNING;
         spin_unlock(&lli->lli_lock);
@@ -771,9 +779,10 @@ static int ll_statahead_thread(void *arg)
 
                 if (IS_ERR(page)) {
                         rc = PTR_ERR(page);
-                        CDEBUG(D_READA, "error reading dir "DFID" at "LPU64"/%u: rc %d\n",
-                               PFID(ll_inode2fid(dir)), pos,
-                               sai->sai_index, rc);
+                        CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
+                               "/%u: [rc %d] [parent %u]\n",
+                               PFID(ll_inode2fid(dir)), pos, sai->sai_index,
+                               rc, lli->lli_opendir_pid);
                         break;
                 }
 
@@ -970,9 +979,13 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
                 struct lu_dirent  *ent;
 
                 if (IS_ERR(page)) {
+                        struct ll_inode_info *lli = ll_i2info(dir);
+
                         rc = PTR_ERR(page);
-                        CERROR("error reading dir "DFID" at "LPU64": rc %d\n",
-                               PFID(ll_inode2fid(dir)), pos, rc);
+                        CERROR("error reading dir "DFID" at "LPU64": "
+                               "[rc %d] [parent %u]\n",
+                               PFID(ll_inode2fid(dir)), pos,
+                               rc, lli->lli_opendir_pid);
                         break;
                 }
 
@@ -1074,8 +1087,6 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
         sai = lli->lli_sai;
 
         if (sai) {
-                struct ll_sb_info *sbi;
-
                 if (unlikely(sa_is_stopped(sai) &&
                              list_empty(&sai->sai_entries_stated)))
                         RETURN(-EBADFD);
@@ -1107,18 +1118,33 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
                         }
                 }
 
-                sbi = ll_i2sbi(dir);
-                if (ll_sai_entry_stated(sai)) {
-                        sbi->ll_sa_cached++;
-                } else {
-                        sbi->ll_sa_blocked++;
-                        /*
-                         * thread started already, avoid double-stat.
-                         */
+                if (!ll_sai_entry_stated(sai)) {
+                        /* BUG 15962:
+                         *
+                         * If statahead insert dentry into dcache (for lookup),
+                         * it should hold parent dir's i_mutex to synchronize
+                         * with other operations from VFS layer.
+                         * E.g.: create/delete/rename/lookup, and so on.
+                         *
+                         * To prevent the dead lock between statahead and its
+                         * parent process, the parent process should release
+                         * such i_mutex before waiting for statahead to fetch
+                         * related dentry attribute from MDS.
+                         *
+                         * It is no matter for parent process to release such
+                         * i_mutex temporary, if someone else create dentry for
+                         * the same item in such interval, we can find it after
+                         * woke up by statahead. */
+                        if (lookup) {
+                                LASSERT(mutex_is_locked(&dir->i_mutex));
+                                mutex_unlock(&dir->i_mutex);
+                        }
                         rc = l_wait_event(sai->sai_waitq,
                                           ll_sai_entry_stated(sai) ||
                                           sa_is_stopped(sai),
                                           &lwi);
+                        if (lookup)
+                                mutex_lock(&dir->i_mutex);
                 }
 
                 if (lookup) {
@@ -1213,6 +1239,7 @@ void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result)
         struct ll_statahead_info *sai;
         struct ll_sb_info        *sbi;
         struct ll_dentry_data    *ldd = ll_d2d(dentry);
+        int                       rc;
         ENTRY;
 
         LASSERT(dir != NULL);
@@ -1222,17 +1249,24 @@ void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result)
         LASSERT(sai != NULL);
         sbi = ll_i2sbi(dir);
 
-        if (result >= 1) {
-                sbi->ll_sa_hit++;
+        rc = ll_sai_entry_fini(sai);
+        /* rc == -ENOENT means such dentry was removed just between statahead
+         * readdir and pre-fetched, count it as hit.
+         *
+         * result == -ENOENT has two meanings:
+         * 1. such dentry was removed just between statahead pre-fetched and
+         *    main process stat such dentry.
+         * 2. main process stat non-exist dentry.
+         * Since we can distinguish such two cases, just count it as miss. */
+        if (result >= 1 || unlikely(rc == -ENOENT)) {
                 sai->sai_hit++;
                 sai->sai_consecutive_miss = 0;
                 sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
         } else {
-                sbi->ll_sa_miss++;
                 sai->sai_miss++;
                 sai->sai_consecutive_miss++;
                 if (sa_low_hit(sai) && sa_is_running(sai)) {
-                        sbi->ll_sa_wrong++;
+                        atomic_inc(&sbi->ll_sa_wrong);
                         CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio "
                                "too low: hit/miss %u/%u, sent/replied %u/%u, "
                                "stopping statahead thread: pid %d\n",
@@ -1248,7 +1282,6 @@ void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result)
 
         if (!sa_is_stopped(sai))
                 cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
-        ll_sai_entry_fini(sai);
         if (likely(ldd != NULL))
                 ldd->lld_sa_generation = sai->sai_generation;