Whamcloud - gitweb
LU-2388 statahead: race in do_sa_entry_fini()
authorLai Siyao <lai.siyao@intel.com>
Tue, 26 Mar 2013 08:12:11 +0000 (16:12 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 30 Mar 2013 18:40:31 +0000 (14:40 -0400)
Two fixes:
* When iterating sa_entry list in do_sa_entry_fini(), there is
  no lock, as may cause one entry put twice. To fix this, all
  entries are put in one list, and only 'scanner' will drop
  entry from this list.
* sa_entry may be linked to sai_sent_entries, but not hashed
  yet, if ll_sa_entry_fini() is called at this moment, this
  sa_entry may be unhashed.

Also include minor cleanup:
* rename do_sai_entry_fini() to do_sa_entry_fini().
* rename do_sai_entry_to_stated() to do_sa_entry_to_stated().
* rename do_statahead_interpret() to ll_post_statahead() to
  distinguish from ll_statahead_interpret().
* ll_post_statahead() always post handle statahead from received
  list to simplify logic.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: I3d0911d0bd3b940c9650473099604646408200c4
Reviewed-on: http://review.whamcloud.com/5842
Tested-by: Hudson
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Peng Tao <bergwolf@gmail.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/llite/llite_internal.h
lustre/llite/statahead.c

index 52e40b5..957ca24 100644 (file)
@@ -1277,7 +1277,7 @@ struct ll_statahead_info {
         cfs_waitq_t             sai_waitq;      /* stat-ahead wait queue */
         struct ptlrpc_thread    sai_thread;     /* stat-ahead thread */
         struct ptlrpc_thread    sai_agl_thread; /* AGL thread */
         cfs_waitq_t             sai_waitq;      /* stat-ahead wait queue */
         struct ptlrpc_thread    sai_thread;     /* stat-ahead thread */
         struct ptlrpc_thread    sai_agl_thread; /* AGL thread */
-        cfs_list_t              sai_entries_sent;     /* entries sent out */
+       cfs_list_t              sai_entries;    /* entry list */
         cfs_list_t              sai_entries_received; /* entries returned */
         cfs_list_t              sai_entries_stated;   /* entries stated */
         cfs_list_t              sai_entries_agl; /* AGL entries to be sent */
         cfs_list_t              sai_entries_received; /* entries returned */
         cfs_list_t              sai_entries_stated;   /* entries stated */
         cfs_list_t              sai_entries_agl; /* AGL entries to be sent */
index 9eea49e..6d5b7af 100644 (file)
@@ -58,38 +58,35 @@ typedef enum {
 } se_stat_t;
 
 struct ll_sa_entry {
 } se_stat_t;
 
 struct ll_sa_entry {
-        /* link into sai->sai_entries_{sent,received,stated} */
-        cfs_list_t              se_list;
-        /* link into sai hash table locally */
-        cfs_list_t              se_hash;
-        /* entry reference count */
-        cfs_atomic_t            se_refcount;
-        /* entry index in the sai */
-        __u64                   se_index;
-        /* low layer ldlm lock handle */
-        __u64                   se_handle;
-        /* entry status */
-        se_stat_t               se_stat;
-        /* entry size, contains name */
-        int                     se_size;
-        /* pointer to async getattr enqueue info */
-        struct md_enqueue_info *se_minfo;
-        /* pointer to the async getattr request */
-        struct ptlrpc_request  *se_req;
-        /* pointer to the target inode */
-        struct inode           *se_inode;
-        /* entry name */
-        struct qstr             se_qstr;
+       /* link into sai->sai_entries */
+       cfs_list_t              se_link;
+       /* link into sai->sai_entries_{received,stated} */
+       cfs_list_t              se_list;
+       /* link into sai hash table locally */
+       cfs_list_t              se_hash;
+       /* entry reference count */
+       cfs_atomic_t            se_refcount;
+       /* entry index in the sai */
+       __u64                   se_index;
+       /* low layer ldlm lock handle */
+       __u64                   se_handle;
+       /* entry status */
+       se_stat_t               se_stat;
+       /* entry size, contains name */
+       int                     se_size;
+       /* pointer to async getattr enqueue info */
+       struct md_enqueue_info *se_minfo;
+       /* pointer to the async getattr request */
+       struct ptlrpc_request  *se_req;
+       /* pointer to the target inode */
+       struct inode           *se_inode;
+       /* entry name */
+       struct qstr             se_qstr;
 };
 
 static unsigned int sai_generation = 0;
 static DEFINE_SPINLOCK(sai_generation_lock);
 
 };
 
 static unsigned int sai_generation = 0;
 static DEFINE_SPINLOCK(sai_generation_lock);
 
-static inline int ll_sa_entry_unlinked(struct ll_sa_entry *entry)
-{
-        return cfs_list_empty(&entry->se_list);
-}
-
 static inline int ll_sa_entry_unhashed(struct ll_sa_entry *entry)
 {
         return cfs_list_empty(&entry->se_hash);
 static inline int ll_sa_entry_unhashed(struct ll_sa_entry *entry)
 {
         return cfs_list_empty(&entry->se_hash);
@@ -193,7 +190,7 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
 }
 
 /*
 }
 
 /*
- * Insert it into sai_entries_sent tail when init.
+ * Insert it into sai_entries tail when init.
  */
 static struct ll_sa_entry *
 ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index,
  */
 static struct ll_sa_entry *
 ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index,
@@ -210,8 +207,8 @@ ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index,
         if (unlikely(entry == NULL))
                 RETURN(ERR_PTR(-ENOMEM));
 
         if (unlikely(entry == NULL))
                 RETURN(ERR_PTR(-ENOMEM));
 
-        CDEBUG(D_READA, "alloc sai entry %.*s(%p) index "LPU64"\n",
-               len, name, entry, index);
+       CDEBUG(D_READA, "alloc sa entry %.*s(%p) index "LPU64"\n",
+              len, name, entry, index);
 
         entry->se_index = index;
 
 
         entry->se_index = index;
 
@@ -250,11 +247,12 @@ ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index,
 
         lli = ll_i2info(sai->sai_inode);
        spin_lock(&lli->lli_sa_lock);
 
         lli = ll_i2info(sai->sai_inode);
        spin_lock(&lli->lli_sa_lock);
-       cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent);
+       cfs_list_add_tail(&entry->se_link, &sai->sai_entries);
+       CFS_INIT_LIST_HEAD(&entry->se_list);
+       ll_sa_entry_enhash(sai, entry);
        spin_unlock(&lli->lli_sa_lock);
 
        cfs_atomic_inc(&sai->sai_cache_count);
        spin_unlock(&lli->lli_sa_lock);
 
        cfs_atomic_inc(&sai->sai_cache_count);
-       ll_sa_entry_enhash(sai, entry);
 
        RETURN(entry);
 }
 
        RETURN(entry);
 }
@@ -291,17 +289,18 @@ ll_sa_entry_get_byname(struct ll_statahead_info *sai, const struct qstr *qstr)
 static struct ll_sa_entry *
 ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index)
 {
 static struct ll_sa_entry *
 ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index)
 {
-        struct ll_sa_entry *entry;
+       struct ll_sa_entry *entry;
 
 
-        cfs_list_for_each_entry(entry, &sai->sai_entries_sent, se_list) {
-                if (entry->se_index == index) {
-                        cfs_atomic_inc(&entry->se_refcount);
-                        return entry;
+       cfs_list_for_each_entry(entry, &sai->sai_entries, se_link) {
+               if (entry->se_index == index) {
+                       LASSERT(atomic_read(&entry->se_refcount) > 0);
+                       cfs_atomic_inc(&entry->se_refcount);
+                       return entry;
                 }
                 }
-                if (entry->se_index > index)
-                        break;
-        }
-        return NULL;
+               if (entry->se_index > index)
+                       break;
+       }
+       return NULL;
 }
 
 static void ll_sa_entry_cleanup(struct ll_statahead_info *sai,
 }
 
 static void ll_sa_entry_cleanup(struct ll_statahead_info *sai,
@@ -326,33 +325,38 @@ static void ll_sa_entry_cleanup(struct ll_statahead_info *sai,
 static void ll_sa_entry_put(struct ll_statahead_info *sai,
                              struct ll_sa_entry *entry)
 {
 static void ll_sa_entry_put(struct ll_statahead_info *sai,
                              struct ll_sa_entry *entry)
 {
-        if (cfs_atomic_dec_and_test(&entry->se_refcount)) {
-                CDEBUG(D_READA, "free sai entry %.*s(%p) index "LPU64"\n",
-                       entry->se_qstr.len, entry->se_qstr.name, entry,
-                       entry->se_index);
+       if (cfs_atomic_dec_and_test(&entry->se_refcount)) {
+               CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n",
+                      entry->se_qstr.len, entry->se_qstr.name, entry,
+                      entry->se_index);
 
 
-                LASSERT(ll_sa_entry_unhashed(entry));
-                LASSERT(ll_sa_entry_unlinked(entry));
+               LASSERT(cfs_list_empty(&entry->se_link));
+               LASSERT(cfs_list_empty(&entry->se_list));
+               LASSERT(ll_sa_entry_unhashed(entry));
 
 
-                ll_sa_entry_cleanup(sai, entry);
-                if (entry->se_inode)
-                        iput(entry->se_inode);
+               ll_sa_entry_cleanup(sai, entry);
+               if (entry->se_inode)
+                       iput(entry->se_inode);
 
 
-                OBD_FREE(entry, entry->se_size);
-                cfs_atomic_dec(&sai->sai_cache_count);
-        }
+               OBD_FREE(entry, entry->se_size);
+               cfs_atomic_dec(&sai->sai_cache_count);
+       }
 }
 
 static inline void
 }
 
 static inline void
-do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
+do_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
 {
        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
 
 {
        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
 
+       LASSERT(!ll_sa_entry_unhashed(entry));
+       LASSERT(!cfs_list_empty(&entry->se_link));
+
        ll_sa_entry_unhash(sai, entry);
 
        spin_lock(&lli->lli_sa_lock);
        entry->se_stat = SA_ENTRY_DEST;
        ll_sa_entry_unhash(sai, entry);
 
        spin_lock(&lli->lli_sa_lock);
        entry->se_stat = SA_ENTRY_DEST;
-       if (likely(!ll_sa_entry_unlinked(entry)))
+       cfs_list_del_init(&entry->se_link);
+       if (likely(!cfs_list_empty(&entry->se_list)))
                cfs_list_del_init(&entry->se_list);
        spin_unlock(&lli->lli_sa_lock);
 
                cfs_list_del_init(&entry->se_list);
        spin_unlock(&lli->lli_sa_lock);
 
@@ -365,52 +369,41 @@ do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
 static void
 ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
 {
 static void
 ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
 {
-        struct ll_sa_entry *pos, *next;
+       struct ll_sa_entry *pos, *next;
 
 
-        if (entry)
-                do_sai_entry_fini(sai, entry);
+       if (entry)
+               do_sa_entry_fini(sai, entry);
 
 
-        /* drop old entry from sent list */
-        cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_sent,
-                                     se_list) {
-                if (is_omitted_entry(sai, pos->se_index))
-                        do_sai_entry_fini(sai, pos);
-                else
-                        break;
-        }
-
-        /* drop old entry from stated list */
-        cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_stated,
-                                     se_list) {
-                if (is_omitted_entry(sai, pos->se_index))
-                        do_sai_entry_fini(sai, pos);
-                else
-                        break;
-        }
+       /* drop old entry, only 'scanner' process does this, no need to lock */
+       cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries, se_link) {
+               if (!is_omitted_entry(sai, pos->se_index))
+                       break;
+               do_sa_entry_fini(sai, pos);
+       }
 }
 
 /*
  * Inside lli_sa_lock.
  */
 static void
 }
 
 /*
  * Inside lli_sa_lock.
  */
 static void
-do_sai_entry_to_stated(struct ll_statahead_info *sai,
-                       struct ll_sa_entry *entry, int rc)
+do_sa_entry_to_stated(struct ll_statahead_info *sai,
+                     struct ll_sa_entry *entry, se_stat_t stat)
 {
 {
-        struct ll_sa_entry *se;
-        cfs_list_t         *pos = &sai->sai_entries_stated;
+       struct ll_sa_entry *se;
+       cfs_list_t         *pos = &sai->sai_entries_stated;
 
 
-        if (!ll_sa_entry_unlinked(entry))
-                cfs_list_del_init(&entry->se_list);
+       if (!cfs_list_empty(&entry->se_list))
+               cfs_list_del_init(&entry->se_list);
 
 
-        cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
-                if (se->se_index < entry->se_index) {
-                        pos = &se->se_list;
-                        break;
-                }
-        }
+       cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
+               if (se->se_index < entry->se_index) {
+                       pos = &se->se_list;
+                       break;
+               }
+       }
 
 
-        cfs_list_add(&entry->se_list, pos);
-        entry->se_stat = rc;
+       cfs_list_add(&entry->se_list, pos);
+       entry->se_stat = stat;
 }
 
 /*
 }
 
 /*
@@ -420,7 +413,7 @@ do_sai_entry_to_stated(struct ll_statahead_info *sai,
  */
 static int
 ll_sa_entry_to_stated(struct ll_statahead_info *sai,
  */
 static int
 ll_sa_entry_to_stated(struct ll_statahead_info *sai,
-                     struct ll_sa_entry *entry, int rc)
+                     struct ll_sa_entry *entry, se_stat_t stat)
 {
        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
        int                   ret = 1;
 {
        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
        int                   ret = 1;
@@ -429,7 +422,7 @@ ll_sa_entry_to_stated(struct ll_statahead_info *sai,
 
        spin_lock(&lli->lli_sa_lock);
        if (likely(entry->se_stat != SA_ENTRY_DEST)) {
 
        spin_lock(&lli->lli_sa_lock);
        if (likely(entry->se_stat != SA_ENTRY_DEST)) {
-               do_sai_entry_to_stated(sai, entry, rc);
+               do_sa_entry_to_stated(sai, entry, stat);
                ret = 0;
        }
        spin_unlock(&lli->lli_sa_lock);
                ret = 0;
        }
        spin_unlock(&lli->lli_sa_lock);
@@ -492,7 +485,7 @@ static struct ll_statahead_info *ll_sai_alloc(void)
         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
         cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq);
 
         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
         cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq);
 
-        CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
+       CFS_INIT_LIST_HEAD(&sai->sai_entries);
         CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
         CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
         CFS_INIT_LIST_HEAD(&sai->sai_entries_agl);
         CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
         CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
         CFS_INIT_LIST_HEAD(&sai->sai_entries_agl);
@@ -543,15 +536,13 @@ static void ll_sai_put(struct ll_statahead_info *sai)
                               PFID(&lli->lli_fid),
                               sai->sai_sent, sai->sai_replied);
 
                               PFID(&lli->lli_fid),
                               sai->sai_sent, sai->sai_replied);
 
-                cfs_list_for_each_entry_safe(entry, next,
-                                             &sai->sai_entries_sent, se_list)
-                        do_sai_entry_fini(sai, entry);
-
-                LASSERT(sa_received_empty(sai));
+               cfs_list_for_each_entry_safe(entry, next,
+                                            &sai->sai_entries, se_link)
+                       do_sa_entry_fini(sai, entry);
 
 
-                cfs_list_for_each_entry_safe(entry, next,
-                                             &sai->sai_entries_stated, se_list)
-                        do_sai_entry_fini(sai, entry);
+               LASSERT(list_empty(&sai->sai_entries));
+               LASSERT(sa_received_empty(sai));
+               LASSERT(list_empty(&sai->sai_entries_stated));
 
                 LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0);
                 LASSERT(agl_list_empty(sai));
 
                 LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0);
                 LASSERT(agl_list_empty(sai));
@@ -626,8 +617,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
         EXIT;
 }
 
         EXIT;
 }
 
-static void do_statahead_interpret(struct ll_statahead_info *sai,
-                                   struct ll_sa_entry *target)
+static void ll_post_statahead(struct ll_statahead_info *sai)
 {
         struct inode           *dir   = sai->sai_inode;
         struct inode           *child;
 {
         struct inode           *dir   = sai->sai_inode;
         struct inode           *child;
@@ -641,16 +631,11 @@ static void do_statahead_interpret(struct ll_statahead_info *sai,
         ENTRY;
 
        spin_lock(&lli->lli_sa_lock);
         ENTRY;
 
        spin_lock(&lli->lli_sa_lock);
-       if (target != NULL && target->se_req != NULL &&
-           !cfs_list_empty(&target->se_list)) {
-               entry = target;
-       } else if (unlikely(sa_received_empty(sai))) {
+       if (unlikely(sa_received_empty(sai))) {
                spin_unlock(&lli->lli_sa_lock);
                RETURN_EXIT;
                spin_unlock(&lli->lli_sa_lock);
                RETURN_EXIT;
-       } else {
-               entry = sa_first_received_entry(sai);
        }
        }
-
+       entry = sa_first_received_entry(sai);
        cfs_atomic_inc(&entry->se_refcount);
        cfs_list_del_init(&entry->se_list);
        spin_unlock(&lli->lli_sa_lock);
        cfs_atomic_inc(&entry->se_refcount);
        cfs_list_del_init(&entry->se_list);
        spin_unlock(&lli->lli_sa_lock);
@@ -712,8 +697,9 @@ out:
          * reference count by calling "ll_intent_drop_lock()" in spite of the
          * above operations failed or not. Do not worry about calling
          * "ll_intent_drop_lock()" more than once. */
          * reference count by calling "ll_intent_drop_lock()" in spite of the
          * above operations failed or not. Do not worry about calling
          * "ll_intent_drop_lock()" more than once. */
-        rc = ll_sa_entry_to_stated(sai, entry, rc < 0 ? rc : SA_ENTRY_SUCC);
-        if (rc == 0 && entry->se_index == sai->sai_index_wait && target == NULL)
+       rc = ll_sa_entry_to_stated(sai, entry,
+                                  rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
+       if (rc == 0 && entry->se_index == sai->sai_index_wait)
                 cfs_waitq_signal(&sai->sai_waitq);
         ll_sa_entry_put(sai, entry);
 }
                 cfs_waitq_signal(&sai->sai_waitq);
         ll_sa_entry_put(sai, entry);
 }
@@ -753,31 +739,28 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
                        GOTO(out, rc = -EIDRM);
                }
 
                        GOTO(out, rc = -EIDRM);
                }
 
-               cfs_list_del_init(&entry->se_list);
                if (rc != 0) {
                if (rc != 0) {
-                       sai->sai_replied++;
-                       do_sai_entry_to_stated(sai, entry, rc);
-                       spin_unlock(&lli->lli_sa_lock);
-                        if (entry->se_index == sai->sai_index_wait)
-                                cfs_waitq_signal(&sai->sai_waitq);
+                       do_sa_entry_to_stated(sai, entry, SA_ENTRY_INVA);
+                       wakeup = (entry->se_index == sai->sai_index_wait);
                 } else {
                 } else {
-                        entry->se_minfo = minfo;
-                        entry->se_req = ptlrpc_request_addref(req);
-                        /* Release the async ibits lock ASAP to avoid deadlock
-                         * when statahead thread tries to enqueue lock on parent
-                         * for readpage and other tries to enqueue lock on child
-                         * with parent's lock held, for example: unlink. */
-                        entry->se_handle = it->d.lustre.it_lock_handle;
-                        ll_intent_drop_lock(it);
-                        wakeup = sa_received_empty(sai);
-                        cfs_list_add_tail(&entry->se_list,
-                                          &sai->sai_entries_received);
-                        sai->sai_replied++;
-                       spin_unlock(&lli->lli_sa_lock);
-                        if (wakeup)
-                                cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
+                       entry->se_minfo = minfo;
+                       entry->se_req = ptlrpc_request_addref(req);
+                       /* Release the async ibits lock ASAP to avoid deadlock
+                        * when statahead thread tries to enqueue lock on parent
+                        * for readpage and other tries to enqueue lock on child
+                        * with parent's lock held, for example: unlink. */
+                       entry->se_handle = it->d.lustre.it_lock_handle;
+                       ll_intent_drop_lock(it);
+                       wakeup = sa_received_empty(sai);
+                       cfs_list_add_tail(&entry->se_list,
+                                         &sai->sai_entries_received);
                 }
                 }
-                ll_sa_entry_put(sai, entry);
+               sai->sai_replied++;
+               spin_unlock(&lli->lli_sa_lock);
+
+               ll_sa_entry_put(sai, entry);
+               if (wakeup)
+                       cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
         }
 
         EXIT;
         }
 
         EXIT;
@@ -1188,7 +1171,7 @@ keep_it:
 
 interpret_it:
                         while (!sa_received_empty(sai))
 
 interpret_it:
                         while (!sa_received_empty(sai))
-                                do_statahead_interpret(sai, NULL);
+                               ll_post_statahead(sai);
 
                         if (unlikely(!thread_is_running(thread))) {
                                 ll_release_page(page, 0);
 
                         if (unlikely(!thread_is_running(thread))) {
                                 ll_release_page(page, 0);
@@ -1243,7 +1226,7 @@ do_it:
                                              &lwi);
 
                                 while (!sa_received_empty(sai))
                                              &lwi);
 
                                 while (!sa_received_empty(sai))
-                                        do_statahead_interpret(sai, NULL);
+                                       ll_post_statahead(sai);
 
                                 if (unlikely(!thread_is_running(thread)))
                                         GOTO(out, rc = 0);
 
                                 if (unlikely(!thread_is_running(thread)))
                                         GOTO(out, rc = 0);
@@ -1309,7 +1292,7 @@ out:
 
                /* To release the resources held by received entries. */
                while (!sa_received_empty(sai))
 
                /* To release the resources held by received entries. */
                while (!sa_received_empty(sai))
-                       do_statahead_interpret(sai, NULL);
+                       ll_post_statahead(sai);
 
                spin_lock(&plli->lli_sa_lock);
        }
 
                spin_lock(&plli->lli_sa_lock);
        }
@@ -1611,10 +1594,11 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
                         RETURN(entry ? 1 : -EAGAIN);
                 }
 
                         RETURN(entry ? 1 : -EAGAIN);
                 }
 
+               /* if statahead is busy in readdir, help it do post-work */
                 while (!ll_sa_entry_stated(entry) &&
                        sai->sai_in_readpage &&
                        !sa_received_empty(sai))
                 while (!ll_sa_entry_stated(entry) &&
                        sai->sai_in_readpage &&
                        !sa_received_empty(sai))
-                        do_statahead_interpret(sai, entry);
+                       ll_post_statahead(sai);
 
                 if (!ll_sa_entry_stated(entry)) {
                         sai->sai_index_wait = entry->se_index;
 
                 if (!ll_sa_entry_stated(entry)) {
                         sai->sai_index_wait = entry->se_index;