Whamcloud - gitweb
LU-3270 statahead: small fixes and cleanup 67/9667/12
author Lai Siyao <lai.siyao@intel.com>
Fri, 14 Mar 2014 12:10:32 +0000 (20:10 +0800)
committer Oleg Drokin <oleg.drokin@intel.com>
Wed, 19 Nov 2014 19:15:43 +0000 (19:15 +0000)
small fixes:
* when 'unplug' is set for ll_statahead(), sa_put() shouldn't kill
  the entry found, because its inflight RPC may not have finished yet.
* remove 'sai_generation', add 'lli_sa_generation' because the
  former one is not safe to access without lock.
* revalidate_statahead_dentry() may fail to wait for statahead
  entry to become ready, in this case it should not release this
  entry, because it may be used by inflight statahead RPC.

cleanups:
* rename ll_statahead_enter() to ll_statahead().
* move dentry 'lld_sa_generation' update to ll_statahead() to
  simplify code and logic.
* other small cleanups.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: I65759c7dfcbe879b42f14152dbfe5949e3d37ea0
Reviewed-on: http://review.whamcloud.com/9667
Tested-by: Jenkins
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: James Simmons <uja.ornl@gmail.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/llite/dcache.c
lustre/llite/llite_internal.h
lustre/llite/namei.c
lustre/llite/statahead.c
lustre/mdc/mdc_request.c

index 71fc91e..ad9be91 100644 (file)
@@ -376,7 +376,7 @@ static int ll_revalidate_dentry(struct dentry *dentry,
        if (lookup_flags & (LOOKUP_OPEN | LOOKUP_CREATE))
                return 0;
 
        if (lookup_flags & (LOOKUP_OPEN | LOOKUP_CREATE))
                return 0;
 
-       if (!dentry_need_statahead(dir, dentry))
+       if (!dentry_may_statahead(dir, dentry))
                return 1;
 
 #ifndef HAVE_DCACHE_LOCK
                return 1;
 
 #ifndef HAVE_DCACHE_LOCK
@@ -384,8 +384,7 @@ static int ll_revalidate_dentry(struct dentry *dentry,
                return -ECHILD;
 #endif
 
                return -ECHILD;
 #endif
 
-       do_statahead_enter(dir, &dentry, dentry->d_inode == NULL);
-       ll_statahead_mark(dir, dentry);
+       ll_statahead(dir, &dentry, dentry->d_inode == NULL);
        return 1;
 }
 
        return 1;
 }
 
index 1a5623d..a3e1ff3 100644 (file)
@@ -182,49 +182,41 @@ struct ll_inode_info {
                /* for directory */
                struct {
                        /* serialize normal readdir and statahead-readdir. */
                /* for directory */
                struct {
                        /* serialize normal readdir and statahead-readdir. */
-                       struct mutex                    d_readdir_mutex;
+                       struct mutex                    lli_readdir_mutex;
 
                        /* metadata statahead */
                        /* since parent-child threads can share the same @file
                         * struct, "opendir_key" is the token when dir close for
                         * case of parent exit before child -- it is me should
                         * cleanup the dir readahead. */
 
                        /* metadata statahead */
                        /* since parent-child threads can share the same @file
                         * struct, "opendir_key" is the token when dir close for
                         * case of parent exit before child -- it is me should
                         * cleanup the dir readahead. */
-                       void                           *d_opendir_key;
-                       struct ll_statahead_info       *d_sai;
+                       void                           *lli_opendir_key;
+                       struct ll_statahead_info       *lli_sai;
                        /* protect statahead stuff. */
                        /* protect statahead stuff. */
-                       spinlock_t                      d_sa_lock;
+                       spinlock_t                      lli_sa_lock;
                        /* "opendir_pid" is the token when lookup/revalid
                         * -- I am the owner of dir statahead. */
                        /* "opendir_pid" is the token when lookup/revalid
                         * -- I am the owner of dir statahead. */
-                       pid_t                           d_opendir_pid;
+                       pid_t                           lli_opendir_pid;
                        /* stat will try to access statahead entries or start
                         * statahead if this flag is set, and this flag will be
                         * set upon dir open, and cleared when dir is closed,
                         * statahead hit ratio is too low, or start statahead
                         * thread failed. */
                        /* stat will try to access statahead entries or start
                         * statahead if this flag is set, and this flag will be
                         * set upon dir open, and cleared when dir is closed,
                         * statahead hit ratio is too low, or start statahead
                         * thread failed. */
-                       unsigned int                    d_sa_enabled:1;
+                       unsigned int                    lli_sa_enabled:1;
+                       /* generation for statahead */
+                       unsigned int                    lli_sa_generation;
                        /* directory stripe information */
                        /* directory stripe information */
-                       struct lmv_stripe_md            *d_lsm_md;
+                       struct lmv_stripe_md            *lli_lsm_md;
                        /* striped directory size */
                        /* striped directory size */
-                       loff_t                          d_stripe_size;
+                       loff_t                          lli_stripe_dir_size;
                        /* striped directory nlink */
                        /* striped directory nlink */
-                       __u64                           d_stripe_nlink;
-               } d;
-
-#define lli_readdir_mutex       u.d.d_readdir_mutex
-#define lli_opendir_key         u.d.d_opendir_key
-#define lli_sai                 u.d.d_sai
-#define lli_sa_lock             u.d.d_sa_lock
-#define lli_sa_enabled         u.d.d_sa_enabled
-#define lli_opendir_pid         u.d.d_opendir_pid
-#define lli_lsm_md             u.d.d_lsm_md
-#define lli_stripe_dir_size    u.d.d_stripe_size
-#define lli_stripe_dir_nlink   u.d.d_stripe_nlink
+                       __u64                           lli_stripe_dir_nlink;
+               };
 
                /* for non-directory */
                struct {
 
                /* for non-directory */
                struct {
-                       struct mutex                    f_size_mutex;
-                       char                            *f_symlink_name;
-                       __u64                           f_maxbytes;
+                       struct mutex                    lli_size_mutex;
+                       char                           *lli_symlink_name;
+                       __u64                           lli_maxbytes;
                        /*
                         * struct rw_semaphore {
                         *    signed long       count;     // align d.d_def_acl
                        /*
                         * struct rw_semaphore {
                         *    signed long       count;     // align d.d_def_acl
@@ -232,16 +224,16 @@ struct ll_inode_info {
                         *    struct list_head wait_list;
                         * }
                         */
                         *    struct list_head wait_list;
                         * }
                         */
-                       struct rw_semaphore             f_trunc_sem;
-                       struct range_lock_tree          f_write_tree;
+                       struct rw_semaphore             lli_trunc_sem;
+                       struct range_lock_tree          lli_write_tree;
 
 
-                       struct rw_semaphore             f_glimpse_sem;
-                       cfs_time_t                      f_glimpse_time;
-                       struct list_head                        f_agl_list;
-                       __u64                           f_agl_index;
+                       struct rw_semaphore             lli_glimpse_sem;
+                       cfs_time_t                      lli_glimpse_time;
+                       struct list_head                lli_agl_list;
+                       __u64                           lli_agl_index;
 
                        /* for writepage() only to communicate to fsync */
 
                        /* for writepage() only to communicate to fsync */
-                       int                             f_async_rc;
+                       int                             lli_async_rc;
 
                        /*
                         * whenever a process try to read/write the file, the
 
                        /*
                         * whenever a process try to read/write the file, the
@@ -251,22 +243,9 @@ struct ll_inode_info {
                         * so the read/write statistics for jobid will not be
                         * accurate if the file is shared by different jobs.
                         */
                         * so the read/write statistics for jobid will not be
                         * accurate if the file is shared by different jobs.
                         */
-                       char                     f_jobid[LUSTRE_JOBID_SIZE];
-               } f;
-
-#define lli_size_mutex          u.f.f_size_mutex
-#define lli_symlink_name        u.f.f_symlink_name
-#define lli_maxbytes            u.f.f_maxbytes
-#define lli_trunc_sem           u.f.f_trunc_sem
-#define lli_write_tree          u.f.f_write_tree
-#define lli_glimpse_sem        u.f.f_glimpse_sem
-#define lli_glimpse_time       u.f.f_glimpse_time
-#define lli_agl_list           u.f.f_agl_list
-#define lli_agl_index          u.f.f_agl_index
-#define lli_async_rc           u.f.f_async_rc
-#define lli_jobid              u.f.f_jobid
-
-       } u;
+                       char                    lli_jobid[LUSTRE_JOBID_SIZE];
+               };
+       };
 
         /* XXX: For following frequent used members, although they maybe special
          *      used for non-directory object, it is some time-wasting to check
 
         /* XXX: For following frequent used members, although they maybe special
          *      used for non-directory object, it is some time-wasting to check
@@ -1310,31 +1289,31 @@ void et_fini(struct eacl_table *et);
 
 /* per inode struct, for dir only */
 struct ll_statahead_info {
 
 /* per inode struct, for dir only */
 struct ll_statahead_info {
-       struct inode            *sai_inode;
-       atomic_t                sai_refcount;   /* when access this struct, hold
+       struct dentry          *sai_dentry;
+       atomic_t                sai_refcount;   /* when access this struct, hold
                                                 * refcount */
                                                 * refcount */
-       unsigned int            sai_generation; /* generation for statahead */
-       unsigned int            sai_max;        /* max ahead of lookup */
-       __u64                   sai_sent;       /* stat requests sent count */
-       __u64                   sai_replied;    /* stat requests which received
+       unsigned int            sai_max;        /* max ahead of lookup */
+       __u64                   sai_sent;       /* stat requests sent count */
+       __u64                   sai_replied;    /* stat requests which received
                                                 * reply */
                                                 * reply */
-       __u64                   sai_index;      /* index of statahead entry */
-       __u64                   sai_index_wait; /* index of entry which is the
+       __u64                   sai_index;      /* index of statahead entry */
+       __u64                   sai_index_wait; /* index of entry which is the
                                                 * caller is waiting for */
                                                 * caller is waiting for */
-       __u64                   sai_hit;        /* hit count */
-       __u64                   sai_miss;       /* miss count:
-                                                * for "ls -al" case, it
-                                                * includes hidden dentry miss;
+       __u64                   sai_hit;        /* hit count */
+       __u64                   sai_miss;       /* miss count:
+                                                * for "ls -al" case, includes
+                                                * hidden dentry miss;
                                                 * for "ls -l" case, it does not
                                                 * include hidden dentry miss.
                                                 * "sai_miss_hidden" is used for
                                                 * the later case.
                                                 */
                                                 * for "ls -l" case, it does not
                                                 * include hidden dentry miss.
                                                 * "sai_miss_hidden" is used for
                                                 * the later case.
                                                 */
-        unsigned int            sai_consecutive_miss; /* consecutive miss */
-        unsigned int            sai_miss_hidden;/* "ls -al", but first dentry
-                                                 * is not a hidden one */
-        unsigned int            sai_skip_hidden;/* skipped hidden dentry count */
-       unsigned int            sai_ls_all:1,   /* "ls -al", do stat-ahead for
+       unsigned int            sai_consecutive_miss; /* consecutive miss */
+       unsigned int            sai_miss_hidden;/* "ls -al", but first dentry
+                                                * is not a hidden one */
+       unsigned int            sai_skip_hidden;/* skipped hidden dentry count
+                                                */
+       unsigned int            sai_ls_all:1,   /* "ls -al", do stat-ahead for
                                                 * hidden entries */
                                sai_agl_valid:1,/* AGL is valid for the dir */
                                sai_in_readpage:1;/* statahead is in readdir()*/
                                                 * hidden entries */
                                sai_agl_valid:1,/* AGL is valid for the dir */
                                sai_in_readpage:1;/* statahead is in readdir()*/
@@ -1351,8 +1330,7 @@ struct ll_statahead_info {
        atomic_t                sai_cache_count; /* entry count in cache */
 };
 
        atomic_t                sai_cache_count; /* entry count in cache */
 };
 
-int do_statahead_enter(struct inode *dir, struct dentry **dentry,
-                       int only_unplug);
+int ll_statahead(struct inode *dir, struct dentry **dentry, bool unplug);
 void ll_authorize_statahead(struct inode *dir, void *key);
 void ll_deauthorize_statahead(struct inode *dir, void *key);
 
 void ll_authorize_statahead(struct inode *dir, void *key);
 void ll_deauthorize_statahead(struct inode *dir, void *key);
 
@@ -1368,24 +1346,10 @@ static inline int ll_glimpse_size(struct inode *inode)
        return rc;
 }
 
        return rc;
 }
 
-static inline void
-ll_statahead_mark(struct inode *dir, struct dentry *dentry)
-{
-       struct ll_inode_info     *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = lli->lli_sai;
-       struct ll_dentry_data    *ldd = ll_d2d(dentry);
-
-       /* not the same process, don't mark */
-       if (lli->lli_opendir_pid != current_pid())
-               return;
-
-       LASSERT(ldd != NULL);
-       if (sai != NULL)
-               ldd->lld_sa_generation = sai->sai_generation;
-}
-
+/* dentry may statahead when statahead is enabled and current process has opened
+ * parent directory, and this dentry hasn't accessed statahead cache before */
 static inline bool
 static inline bool
-dentry_need_statahead(struct inode *dir, struct dentry *dentry)
+dentry_may_statahead(struct inode *dir, struct dentry *dentry)
 {
        struct ll_inode_info  *lli;
        struct ll_dentry_data *ldd;
 {
        struct ll_inode_info  *lli;
        struct ll_dentry_data *ldd;
@@ -1406,38 +1370,27 @@ dentry_need_statahead(struct inode *dir, struct dentry *dentry)
        if (lli->lli_opendir_pid != current_pid())
                return false;
 
        if (lli->lli_opendir_pid != current_pid())
                return false;
 
-       ldd = ll_d2d(dentry);
        /*
        /*
-        * When stats a dentry, the system trigger more than once "revalidate"
-        * or "lookup", for "getattr", for "getxattr", and maybe for others.
-        * Under patchless client mode, the operation intent is not accurate,
-        * which maybe misguide the statahead thread. For example:
-        * The "revalidate" call for "getattr" and "getxattr" of a dentry maybe
-        * have the same operation intent -- "IT_GETATTR".
-        * In fact, one dentry should has only one chance to interact with the
-        * statahead thread, otherwise the statahead windows will be confused.
+        * When stating a dentry, kernel may trigger 'revalidate' or 'lookup'
+        * multiple times, e.g. for 'getattr', 'getxattr', etc.
+        * For patchless client, lookup intent is not accurate, which may
+        * misguide statahead. For example:
+        * The 'revalidate' call for 'getattr' and 'getxattr' of a dentry will
+        * have the same intent -- IT_GETATTR, while one dentry should access
+        * statahead cache only once, otherwise statahead window is messed up.
         * The solution is as following:
         * The solution is as following:
-        * Assign "lld_sa_generation" with "sai_generation" when a dentry
-        * "IT_GETATTR" for the first time, and the subsequent "IT_GETATTR"
-        * will bypass interacting with statahead thread for checking:
-        * "lld_sa_generation == lli_sai->sai_generation"
+        * Assign 'lld_sa_generation' with 'lli_sa_generation' when a dentry
+        * IT_GETATTR for the first time, and subsequent IT_GETATTR will
+        * bypass interacting with statahead cache by checking
+        * 'lld_sa_generation == lli->lli_sa_generation'.
         */
         */
-       if (ldd && lli->lli_sai &&
-           ldd->lld_sa_generation == lli->lli_sai->sai_generation)
+       ldd = ll_d2d(dentry);
+       if (ldd != NULL && ldd->lld_sa_generation == lli->lli_sa_generation)
                return false;
 
        return true;
 }
 
                return false;
 
        return true;
 }
 
-static inline int
-ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int only_unplug)
-{
-       if (!dentry_need_statahead(dir, *dentryp))
-               return -EAGAIN;
-
-       return do_statahead_enter(dir, dentryp, only_unplug);
-}
-
 /* llite ioctl register support routine */
 enum llioc_iter {
         LLIOC_CONT = 0,
 /* llite ioctl register support routine */
 enum llioc_iter {
         LLIOC_CONT = 0,
index f10df8a..041e2e1 100644 (file)
@@ -536,14 +536,11 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
        if (it == NULL || it->it_op == IT_GETXATTR)
                it = &lookup_it;
 
        if (it == NULL || it->it_op == IT_GETXATTR)
                it = &lookup_it;
 
-        if (it->it_op == IT_GETATTR) {
-                rc = ll_statahead_enter(parent, &dentry, 0);
-                if (rc == 1) {
-                        if (dentry == save)
-                                GOTO(out, retval = NULL);
-                        GOTO(out, retval = dentry);
-                }
-        }
+       if (it->it_op == IT_GETATTR && dentry_may_statahead(parent, dentry)) {
+               rc = ll_statahead(parent, &dentry, 0);
+               if (rc == 1)
+                       RETURN(dentry == save ? NULL : dentry);
+       }
 
        if (it->it_op & IT_CREAT)
                opc = LUSTRE_OPC_CREATE;
 
        if (it->it_op & IT_CREAT)
                opc = LUSTRE_OPC_CREATE;
@@ -578,16 +575,12 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
         }
         ll_lookup_finish_locks(it, dentry);
 
         }
         ll_lookup_finish_locks(it, dentry);
 
-        if (dentry == save)
-                GOTO(out, retval = NULL);
-        else
-                GOTO(out, retval = dentry);
- out:
-        if (req)
-                ptlrpc_req_finished(req);
-        if (it->it_op == IT_GETATTR && (retval == NULL || retval == dentry))
-                ll_statahead_mark(parent, dentry);
-        return retval;
+       retval = (dentry == save) ? NULL : dentry;
+       EXIT;
+
+out:
+       ptlrpc_req_finished(req);
+       return retval;
 }
 
 #ifdef HAVE_IOP_ATOMIC_OPEN
 }
 
 #ifdef HAVE_IOP_ATOMIC_OPEN
index ea69243..27c5c2e 100644 (file)
@@ -57,11 +57,11 @@ typedef enum {
 
 /* sa_entry is not refcounted: statahead thread allocates it and do async stat,
  * and in async stat callback ll_statahead_interpret() will add it into
 
 /* sa_entry is not refcounted: statahead thread allocates it and do async stat,
  * and in async stat callback ll_statahead_interpret() will add it into
- * sai_cb_entries, later statahead thread will call sa_handle_callback() to
+ * sai_interim_entries, later statahead thread will call sa_handle_callback() to
  * instantiate entry and move it into sai_entries, and then only scanner process
  * can access and free it. */
 struct sa_entry {
  * instantiate entry and move it into sai_entries, and then only scanner process
  * can access and free it. */
 struct sa_entry {
-       /* link into sai_cb_entries or sai_entries */
+       /* link into sai_interim_entries or sai_entries */
        struct list_head        se_list;
        /* link into sai hash table locally */
        struct list_head        se_hash;
        struct list_head        se_list;
        /* link into sai hash table locally */
        struct list_head        se_hash;
@@ -91,23 +91,20 @@ static inline int sa_unhashed(struct sa_entry *entry)
        return list_empty(&entry->se_hash);
 }
 
        return list_empty(&entry->se_hash);
 }
 
-/*
- * The entry only can be released by the caller, it is necessary to hold lock.
- */
+/* sa_entry is ready to use */
 static inline int sa_ready(struct sa_entry *entry)
 {
        smp_rmb();
        return (entry->se_state != SA_ENTRY_INIT);
 }
 
 static inline int sa_ready(struct sa_entry *entry)
 {
        smp_rmb();
        return (entry->se_state != SA_ENTRY_INIT);
 }
 
+/* hash value to put in sai_cache */
 static inline int sa_hash(int val)
 {
        return val & LL_SA_CACHE_MASK;
 }
 
 static inline int sa_hash(int val)
 {
        return val & LL_SA_CACHE_MASK;
 }
 
-/*
- * Insert entry to hash SA table.
- */
+/* hash entry into sai_cache */
 static inline void
 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
 {
 static inline void
 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
 {
@@ -118,9 +115,7 @@ sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
        spin_unlock(&sai->sai_cache_lock[i]);
 }
 
        spin_unlock(&sai->sai_cache_lock[i]);
 }
 
-/*
- * Remove entry from SA table.
- */
+/* unhash entry from sai_cache */
 static inline void
 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
 {
 static inline void
 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
 {
@@ -144,11 +139,13 @@ agl_first_entry(struct ll_statahead_info *sai)
                          lli_agl_list);
 }
 
                          lli_agl_list);
 }
 
+/* statahead window is full */
 static inline int sa_sent_full(struct ll_statahead_info *sai)
 {
        return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
 }
 
 static inline int sa_sent_full(struct ll_statahead_info *sai)
 {
        return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
 }
 
+/* got async stat replies */
 static inline int sa_has_callback(struct ll_statahead_info *sai)
 {
        return !list_empty(&sai->sai_interim_entries);
 static inline int sa_has_callback(struct ll_statahead_info *sai)
 {
        return !list_empty(&sai->sai_interim_entries);
@@ -172,7 +169,7 @@ static inline int sa_low_hit(struct ll_statahead_info *sai)
 }
 
 /*
 }
 
 /*
- * If the given index is behind of statahead window more than
+ * if the given index is behind the statahead window by more than
  * SA_OMITTED_ENTRY_MAX, then it is old.
  */
 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
  * SA_OMITTED_ENTRY_MAX, then it is old.
  */
 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
@@ -181,7 +178,7 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
                  sai->sai_index);
 }
 
                  sai->sai_index);
 }
 
-/* allocate sa_entry and add it into hash to let scanner process to find it */
+/* allocate sa_entry and hash it to allow scanner process to find it */
 static struct sa_entry *
 sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
 {
 static struct sa_entry *
 sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
 {
@@ -210,7 +207,8 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;
 
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;
 
-       lli = ll_i2info(sai->sai_inode);
+       lli = ll_i2info(sai->sai_dentry->d_inode);
+
        spin_lock(&lli->lli_sa_lock);
        INIT_LIST_HEAD(&entry->se_list);
        sa_rehash(sai, entry);
        spin_lock(&lli->lli_sa_lock);
        INIT_LIST_HEAD(&entry->se_list);
        sa_rehash(sai, entry);
@@ -221,7 +219,7 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
        RETURN(entry);
 }
 
        RETURN(entry);
 }
 
-/* free sa_entry which should have been unhashed and not in any list */
+/* free sa_entry, which should have been unhashed and not in any list */
 static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
 {
        CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n",
 static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
 {
        CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n",
@@ -235,8 +233,9 @@ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
        atomic_dec(&sai->sai_cache_count);
 }
 
        atomic_dec(&sai->sai_cache_count);
 }
 
-/* find sa_entry by name, used by directory scanner, lock is not needed because
- * only scanner can remove the entry from hash.
+/*
+ * find sa_entry by name, used by directory scanner, lock is not needed because
+ * only scanner can remove the entry from cache.
  */
 static struct sa_entry *
 sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
  */
 static struct sa_entry *
 sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
@@ -253,10 +252,11 @@ sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
        return NULL;
 }
 
        return NULL;
 }
 
+/* unhash and unlink sa_entry, and then free it */
 static inline void
 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
 {
 static inline void
 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
 {
-       struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+       struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
 
        LASSERT(!sa_unhashed(entry));
        LASSERT(!list_empty(&entry->se_list));
 
        LASSERT(!sa_unhashed(entry));
        LASSERT(!list_empty(&entry->se_list));
@@ -281,7 +281,7 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
        struct sa_entry *tmp, *next;
 
        if (entry != NULL && entry->se_state == SA_ENTRY_SUCC) {
        struct sa_entry *tmp, *next;
 
        if (entry != NULL && entry->se_state == SA_ENTRY_SUCC) {
-               struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
+               struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
 
                sai->sai_hit++;
                sai->sai_consecutive_miss = 0;
 
                sai->sai_hit++;
                sai->sai_consecutive_miss = 0;
@@ -329,12 +329,14 @@ __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
        return (entry->se_index == sai->sai_index_wait);
 }
 
        return (entry->se_index == sai->sai_index_wait);
 }
 
-/* release resources used in async stat RPC, complete entry information and
- * wakeup if necessary */
+/*
+ * release resources used in async stat RPC, update entry state and wakeup if
+ * scanner process is waiting on this entry.
+ */
 static void
 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
 {
 static void
 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
 {
-       struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+       struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
        struct md_enqueue_info *minfo = entry->se_minfo;
        struct ptlrpc_request *req = entry->se_req;
        bool wakeup;
        struct md_enqueue_info *minfo = entry->se_minfo;
        struct ptlrpc_request *req = entry->se_req;
        bool wakeup;
@@ -360,14 +362,12 @@ sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
                wake_up(&sai->sai_waitq);
 }
 
                wake_up(&sai->sai_waitq);
 }
 
-/*
- * Insert inode into the list of sai_agls.
- */
+/* insert inode into the list of sai_agls */
 static void ll_agl_add(struct ll_statahead_info *sai,
                        struct inode *inode, int index)
 {
        struct ll_inode_info *child  = ll_i2info(inode);
 static void ll_agl_add(struct ll_statahead_info *sai,
                        struct inode *inode, int index)
 {
        struct ll_inode_info *child  = ll_i2info(inode);
-       struct ll_inode_info *parent = ll_i2info(sai->sai_inode);
+       struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
        int                   added  = 0;
 
        spin_lock(&child->lli_agl_lock);
        int                   added  = 0;
 
        spin_lock(&child->lli_agl_lock);
@@ -391,24 +391,20 @@ static void ll_agl_add(struct ll_statahead_info *sai,
                wake_up(&sai->sai_agl_thread.t_ctl_waitq);
 }
 
                wake_up(&sai->sai_agl_thread.t_ctl_waitq);
 }
 
-static struct ll_statahead_info *ll_sai_alloc(void)
+/* allocate sai */
+static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
 {
        struct ll_statahead_info *sai;
 {
        struct ll_statahead_info *sai;
-       int                       i;
+       struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
+       int i;
        ENTRY;
 
        OBD_ALLOC_PTR(sai);
        if (!sai)
                RETURN(NULL);
 
        ENTRY;
 
        OBD_ALLOC_PTR(sai);
        if (!sai)
                RETURN(NULL);
 
+       sai->sai_dentry = dget(dentry);
        atomic_set(&sai->sai_refcount, 1);
        atomic_set(&sai->sai_refcount, 1);
-
-       spin_lock(&sai_generation_lock);
-       sai->sai_generation = ++sai_generation;
-       if (unlikely(sai_generation == 0))
-               sai->sai_generation = ++sai_generation;
-       spin_unlock(&sai_generation_lock);
-
        sai->sai_max = LL_SA_RPC_MIN;
        sai->sai_index = 1;
        init_waitqueue_head(&sai->sai_waitq);
        sai->sai_max = LL_SA_RPC_MIN;
        sai->sai_index = 1;
        init_waitqueue_head(&sai->sai_waitq);
@@ -425,9 +421,27 @@ static struct ll_statahead_info *ll_sai_alloc(void)
        }
        atomic_set(&sai->sai_cache_count, 0);
 
        }
        atomic_set(&sai->sai_cache_count, 0);
 
+       spin_lock(&sai_generation_lock);
+       lli->lli_sa_generation = ++sai_generation;
+       if (unlikely(sai_generation == 0))
+               lli->lli_sa_generation = ++sai_generation;
+       spin_unlock(&sai_generation_lock);
+
        RETURN(sai);
 }
 
        RETURN(sai);
 }
 
+/* free sai: drop the reference held on sai_dentry, then free the sai itself */
+static inline void ll_sai_free(struct ll_statahead_info *sai)
+{
+       LASSERT(sai->sai_dentry != NULL);
+       dput(sai->sai_dentry);
+       OBD_FREE_PTR(sai);
+}
+
+/*
+ * take refcount of sai if sai for @dir exists, which means statahead is on for
+ * this directory.
+ */
 static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
 static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
@@ -442,13 +456,17 @@ static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
        return sai;
 }
 
        return sai;
 }
 
+/*
+ * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
+ * attached to it.
+ */
 static void ll_sai_put(struct ll_statahead_info *sai)
 {
 static void ll_sai_put(struct ll_statahead_info *sai)
 {
-       struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+       struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
 
        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
                struct sa_entry *entry, *next;
 
        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
                struct sa_entry *entry, *next;
-               struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
+               struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
 
                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);
 
                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);
@@ -465,8 +483,7 @@ static void ll_sai_put(struct ll_statahead_info *sai)
                LASSERT(atomic_read(&sai->sai_cache_count) == 0);
                LASSERT(agl_list_empty(sai));
 
                LASSERT(atomic_read(&sai->sai_cache_count) == 0);
                LASSERT(agl_list_empty(sai));
 
-               iput(sai->sai_inode);
-               OBD_FREE_PTR(sai);
+               ll_sai_free(sai);
                atomic_dec(&sbi->ll_sa_running);
        }
 }
                atomic_dec(&sbi->ll_sa_running);
        }
 }
@@ -534,12 +551,14 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
         EXIT;
 }
 
         EXIT;
 }
 
-/* prepare inode for sa entry, add it into agl list, now sa_entry is ready
- * to be used by scanner process. */
+/*
+ * prepare inode for sa entry, add it into agl list, now sa_entry is ready
+ * to be used by scanner process.
+ */
 static void sa_instantiate(struct ll_statahead_info *sai,
                                 struct sa_entry *entry)
 {
 static void sa_instantiate(struct ll_statahead_info *sai,
                                 struct sa_entry *entry)
 {
-       struct inode *dir = sai->sai_inode;
+       struct inode *dir = sai->sai_dentry->d_inode;
        struct inode *child;
        struct md_enqueue_info *minfo;
        struct lookup_intent *it;
        struct inode *child;
        struct md_enqueue_info *minfo;
        struct lookup_intent *it;
@@ -610,12 +629,12 @@ out:
        sa_make_ready(sai, entry, rc);
 }
 
        sa_make_ready(sai, entry, rc);
 }
 
-/* once there are async stat replies, instantiate sa_entry */
+/* once there are async stat replies, instantiate sa_entry from replies */
 static void sa_handle_callback(struct ll_statahead_info *sai)
 {
        struct ll_inode_info *lli;
 
 static void sa_handle_callback(struct ll_statahead_info *sai)
 {
        struct ll_inode_info *lli;
 
-       lli = ll_i2info(sai->sai_inode);
+       lli = ll_i2info(sai->sai_dentry->d_inode);
 
        while (sa_has_callback(sai)) {
                struct sa_entry *entry;
 
        while (sa_has_callback(sai)) {
                struct sa_entry *entry;
@@ -632,25 +651,13 @@ static void sa_handle_callback(struct ll_statahead_info *sai)
 
                sa_instantiate(sai, entry);
        }
 
                sa_instantiate(sai, entry);
        }
-
-       spin_lock(&lli->lli_agl_lock);
-       while (!agl_list_empty(sai)) {
-               struct ll_inode_info *clli;
-
-               clli = agl_first_entry(sai);
-               list_del_init(&clli->lli_agl_list);
-               spin_unlock(&lli->lli_agl_lock);
-
-               ll_agl_trigger(&clli->lli_vfs_inode, sai);
-
-               spin_lock(&lli->lli_agl_lock);
-       }
-       spin_unlock(&lli->lli_agl_lock);
 }
 
 }
 
-/* callback for async stat, because this is called in ptlrpcd context, we only
- * put sa_entry in sai_cb_entries list, and let sa_handle_callback() to really
- * prepare inode and instantiate sa_entry later. */
+/*
+ * callback for async stat RPC, because this is called in ptlrpcd context, we
+ * only put sa_entry in sai_interim_entries, and wake up statahead thread to
+ * really prepare inode and instantiate sa_entry later.
+ */
 static int ll_statahead_interpret(struct ptlrpc_request *req,
                                  struct md_enqueue_info *minfo, int rc)
 {
 static int ll_statahead_interpret(struct ptlrpc_request *req,
                                  struct md_enqueue_info *minfo, int rc)
 {
@@ -710,6 +717,7 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+/* finish async stat RPC arguments */
 static void sa_fini_data(struct md_enqueue_info *minfo,
                          struct ldlm_enqueue_info *einfo)
 {
 static void sa_fini_data(struct md_enqueue_info *minfo,
                          struct ldlm_enqueue_info *einfo)
 {
@@ -721,7 +729,9 @@ static void sa_fini_data(struct md_enqueue_info *minfo,
         OBD_FREE_PTR(einfo);
 }
 
         OBD_FREE_PTR(einfo);
 }
 
-/**
+/*
+ * prepare arguments for async stat RPC.
+ *
  * There is race condition between "capa_put" and "ll_statahead_interpret" for
  * accessing "op_data.op_capa[1,2]" as following:
  * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
  * There is race condition between "capa_put" and "ll_statahead_interpret" for
  * accessing "op_data.op_capa[1,2]" as following:
  * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
@@ -778,6 +788,7 @@ static int sa_prep_data(struct inode *dir, struct inode *child,
         return 0;
 }
 
         return 0;
 }
 
+/* async stat for file not found in dcache */
 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 {
        struct md_enqueue_info   *minfo;
 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 {
        struct md_enqueue_info   *minfo;
@@ -802,10 +813,11 @@ static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 }
 
 /**
 }
 
 /**
- * similar to ll_revalidate_it().
- * \retval      1 -- dentry valid
- * \retval      0 -- will send stat-ahead request
- * \retval others -- prepare stat-ahead request failed
+ * async stat for file found in dcache, similar to .revalidate
+ *
+ * \retval     1 dentry valid, no RPC sent
+ * \retval     0 dentry invalid, will send async stat RPC
+ * \retval     negative number upon error
  */
 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
                         struct dentry *dentry)
  */
 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
                         struct dentry *dentry)
@@ -854,6 +866,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+/* async stat for file with @name */
 static void sa_statahead(struct dentry *parent, const char *name, int len)
 {
        struct inode *dir = parent->d_inode;
 static void sa_statahead(struct dentry *parent, const char *name, int len)
 {
        struct inode *dir = parent->d_inode;
@@ -890,6 +903,7 @@ static void sa_statahead(struct dentry *parent, const char *name, int len)
        EXIT;
 }
 
        EXIT;
 }
 
+/* async glimpse (agl) thread main function */
 static int ll_agl_thread(void *arg)
 {
        struct dentry *parent = (struct dentry *)arg;
 static int ll_agl_thread(void *arg)
 {
        struct dentry *parent = (struct dentry *)arg;
@@ -961,6 +975,7 @@ static int ll_agl_thread(void *arg)
        RETURN(0);
 }
 
        RETURN(0);
 }
 
+/* start agl thread */
 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
 {
        struct ptlrpc_thread *thread = &sai->sai_agl_thread;
 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
 {
        struct ptlrpc_thread *thread = &sai->sai_agl_thread;
@@ -987,6 +1002,7 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
        EXIT;
 }
 
        EXIT;
 }
 
+/* statahead thread main function */
 static int ll_statahead_thread(void *arg)
 {
        struct dentry *parent = (struct dentry *)arg;
 static int ll_statahead_thread(void *arg)
 {
        struct dentry *parent = (struct dentry *)arg;
@@ -994,7 +1010,7 @@ static int ll_statahead_thread(void *arg)
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct ll_statahead_info *sai;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct ll_statahead_info *sai;
-       struct ptlrpc_thread *thread;
+       struct ptlrpc_thread *sa_thread;
        struct ptlrpc_thread *agl_thread;
        int first = 0;
        struct md_op_data *op_data;
        struct ptlrpc_thread *agl_thread;
        int first = 0;
        struct md_op_data *op_data;
@@ -1006,9 +1022,9 @@ static int ll_statahead_thread(void *arg)
        ENTRY;
 
        sai = ll_sai_get(dir);
        ENTRY;
 
        sai = ll_sai_get(dir);
-       thread = &sai->sai_thread;
+       sa_thread = &sai->sai_thread;
        agl_thread = &sai->sai_agl_thread;
        agl_thread = &sai->sai_agl_thread;
-       thread->t_pid = current_pid();
+       sa_thread->t_pid = current_pid();
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);
 
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);
 
@@ -1024,16 +1040,16 @@ static int ll_statahead_thread(void *arg)
 
        atomic_inc(&sbi->ll_sa_total);
        spin_lock(&lli->lli_sa_lock);
 
        atomic_inc(&sbi->ll_sa_total);
        spin_lock(&lli->lli_sa_lock);
-       if (thread_is_init(thread))
+       if (thread_is_init(sa_thread))
                /* If someone else has changed the thread state
                 * (e.g. already changed to SVC_STOPPING), we can't just
                 * blindly overwrite that setting. */
                /* If someone else has changed the thread state
                 * (e.g. already changed to SVC_STOPPING), we can't just
                 * blindly overwrite that setting. */
-               thread_set_flags(thread, SVC_RUNNING);
+               thread_set_flags(sa_thread, SVC_RUNNING);
        spin_unlock(&lli->lli_sa_lock);
        spin_unlock(&lli->lli_sa_lock);
-       wake_up(&thread->t_ctl_waitq);
+       wake_up(&sa_thread->t_ctl_waitq);
 
        ll_dir_chain_init(&chain);
 
        ll_dir_chain_init(&chain);
-       while (pos != MDS_DIR_END_OFF && thread_is_running(thread)) {
+       while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) {
                struct lu_dirpage *dp;
                struct lu_dirent  *ent;
 
                struct lu_dirpage *dp;
                struct lu_dirent  *ent;
 
@@ -1051,7 +1067,7 @@ static int ll_statahead_thread(void *arg)
 
                dp = page_address(page);
                for (ent = lu_dirent_start(dp);
 
                dp = page_address(page);
                for (ent = lu_dirent_start(dp);
-                    ent != NULL && thread_is_running(thread) &&
+                    ent != NULL && thread_is_running(sa_thread) &&
                     !sa_low_hit(sai);
                     ent = lu_dirent_next(ent)) {
                        __u64 hash;
                     !sa_low_hit(sai);
                     ent = lu_dirent_next(ent)) {
                        __u64 hash;
@@ -1101,16 +1117,32 @@ static int ll_statahead_thread(void *arg)
 
                        /* wait for spare statahead window */
                        do {
 
                        /* wait for spare statahead window */
                        do {
-                               l_wait_event(thread->t_ctl_waitq,
+                               l_wait_event(sa_thread->t_ctl_waitq,
                                             !sa_sent_full(sai) ||
                                             sa_has_callback(sai) ||
                                             !agl_list_empty(sai) ||
                                             !sa_sent_full(sai) ||
                                             sa_has_callback(sai) ||
                                             !agl_list_empty(sai) ||
-                                            !thread_is_running(thread),
+                                            !thread_is_running(sa_thread),
                                             &lwi);
 
                                sa_handle_callback(sai);
                                             &lwi);
 
                                sa_handle_callback(sai);
+
+                               spin_lock(&lli->lli_agl_lock);
+                               while (sa_sent_full(sai) &&
+                                      !agl_list_empty(sai)) {
+                                       struct ll_inode_info *clli;
+
+                                       clli = agl_first_entry(sai);
+                                       list_del_init(&clli->lli_agl_list);
+                                       spin_unlock(&lli->lli_agl_lock);
+
+                                       ll_agl_trigger(&clli->lli_vfs_inode,
+                                                       sai);
+
+                                       spin_lock(&lli->lli_agl_lock);
+                               }
+                               spin_unlock(&lli->lli_agl_lock);
                        } while (sa_sent_full(sai) &&
                        } while (sa_sent_full(sai) &&
-                                thread_is_running(thread));
+                                thread_is_running(sa_thread));
 
                        sa_statahead(parent, name, namelen);
                }
 
                        sa_statahead(parent, name, namelen);
                }
@@ -1137,18 +1169,17 @@ static int ll_statahead_thread(void *arg)
 
        if (rc < 0) {
                spin_lock(&lli->lli_sa_lock);
 
        if (rc < 0) {
                spin_lock(&lli->lli_sa_lock);
-               thread_set_flags(thread, SVC_STOPPING);
+               thread_set_flags(sa_thread, SVC_STOPPING);
                lli->lli_sa_enabled = 0;
                spin_unlock(&lli->lli_sa_lock);
        }
 
        /* statahead is finished, but statahead entries need to be cached, wait
         * for file release to stop me. */
                lli->lli_sa_enabled = 0;
                spin_unlock(&lli->lli_sa_lock);
        }
 
        /* statahead is finished, but statahead entries need to be cached, wait
         * for file release to stop me. */
-       while (thread_is_running(thread)) {
-               l_wait_event(thread->t_ctl_waitq,
+       while (thread_is_running(sa_thread)) {
+               l_wait_event(sa_thread->t_ctl_waitq,
                             sa_has_callback(sai) ||
                             sa_has_callback(sai) ||
-                            !agl_list_empty(sai) ||
-                            !thread_is_running(thread),
+                            !thread_is_running(sa_thread),
                             &lwi);
 
                sa_handle_callback(sai);
                             &lwi);
 
                sa_handle_callback(sai);
@@ -1178,7 +1209,7 @@ out:
                /* in case we're not woken up, timeout wait */
                lwi = LWI_TIMEOUT(msecs_to_jiffies(MSEC_PER_SEC >> 3),
                                  NULL, NULL);
                /* in case we're not woken up, timeout wait */
                lwi = LWI_TIMEOUT(msecs_to_jiffies(MSEC_PER_SEC >> 3),
                                  NULL, NULL);
-               l_wait_event(thread->t_ctl_waitq,
+               l_wait_event(sa_thread->t_ctl_waitq,
                        sai->sai_sent == sai->sai_replied, &lwi);
        }
 
                        sai->sai_sent == sai->sai_replied, &lwi);
        }
 
@@ -1186,19 +1217,20 @@ out:
        sa_handle_callback(sai);
 
        spin_lock(&lli->lli_sa_lock);
        sa_handle_callback(sai);
 
        spin_lock(&lli->lli_sa_lock);
-       thread_set_flags(thread, SVC_STOPPED);
+       thread_set_flags(sa_thread, SVC_STOPPED);
        spin_unlock(&lli->lli_sa_lock);
 
        spin_unlock(&lli->lli_sa_lock);
 
-       wake_up(&sai->sai_waitq);
-       wake_up(&thread->t_ctl_waitq);
-        ll_sai_put(sai);
        CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);
        CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);
-       dput(parent);
+
+       wake_up(&sai->sai_waitq);
+       wake_up(&sa_thread->t_ctl_waitq);
+       ll_sai_put(sai);
+
        return rc;
 }
 
        return rc;
 }
 
-/* authorize opened dir handle @key to statahead later */
+/* authorize opened dir handle @key to statahead */
 void ll_authorize_statahead(struct inode *dir, void *key)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
 void ll_authorize_statahead(struct inode *dir, void *key)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
@@ -1217,8 +1249,10 @@ void ll_authorize_statahead(struct inode *dir, void *key)
        spin_unlock(&lli->lli_sa_lock);
 }
 
        spin_unlock(&lli->lli_sa_lock);
 }
 
-/* deauthorize opened dir handle @key to statahead, but statahead thread may
- * still be running, notify it to quit. */
+/*
+ * deauthorize opened dir handle @key to statahead, and notify statahead thread
+ * to quit if it's running.
+ */
 void ll_deauthorize_statahead(struct inode *dir, void *key)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
 void ll_deauthorize_statahead(struct inode *dir, void *key)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
@@ -1247,20 +1281,21 @@ void ll_deauthorize_statahead(struct inode *dir, void *key)
 }
 
 enum {
 }
 
 enum {
-        /**
-         * not first dirent, or is "."
-         */
-        LS_NONE_FIRST_DE = 0,
-        /**
-         * the first non-hidden dirent
-         */
-        LS_FIRST_DE,
-        /**
-         * the first hidden dirent, that is "."
-         */
-        LS_FIRST_DOT_DE
+       /**
+        * not first dirent, or is "."
+        */
+       LS_NOT_FIRST_DE = 0,
+       /**
+        * the first non-hidden dirent
+        */
+       LS_FIRST_DE,
+       /**
+        * the first hidden dirent, that is "."
+        */
+       LS_FIRST_DOT_DE
 };
 
 };
 
+/* check whether @dentry is the first dirent under @dir */
 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
 {
        struct ll_dir_chain   chain;
 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
 {
        struct ll_dir_chain   chain;
@@ -1268,7 +1303,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
        struct md_op_data    *op_data;
        int                   dot_de;
        struct page          *page = NULL;
        struct md_op_data    *op_data;
        int                   dot_de;
        struct page          *page = NULL;
-       int                   rc     = LS_NONE_FIRST_DE;
+       int                   rc = LS_NOT_FIRST_DE;
        __u64                 pos = 0;
        ENTRY;
 
        __u64                 pos = 0;
        ENTRY;
 
@@ -1347,7 +1382,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
 
                        if (target->len != namelen ||
                            memcmp(target->name, name, namelen) != 0)
 
                        if (target->len != namelen ||
                            memcmp(target->name, name, namelen) != 0)
-                               rc = LS_NONE_FIRST_DE;
+                               rc = LS_NOT_FIRST_DE;
                        else if (!dot_de)
                                rc = LS_FIRST_DE;
                        else
                        else if (!dot_de)
                                rc = LS_FIRST_DE;
                        else
@@ -1380,13 +1415,27 @@ out:
         return rc;
 }
 
         return rc;
 }
 
+/**
+ * revalidate @dentryp from statahead cache
+ *
+ * \param[in] dir      parent directory
+ * \param[in] sai      sai structure
+ * \param[out] dentryp pointer to dentry which will be revalidated
+ * \param[in] unplug   unplug statahead window only (normally for negative
+ *                     dentry)
+ * \retval             1 on success, dentry is saved in @dentryp
+ * \retval             0 if revalidation failed (no proper lock on client)
+ * \retval             negative number upon error
+ */
 static int revalidate_statahead_dentry(struct inode *dir,
                                        struct ll_statahead_info *sai,
                                        struct dentry **dentryp,
 static int revalidate_statahead_dentry(struct inode *dir,
                                        struct ll_statahead_info *sai,
                                        struct dentry **dentryp,
-                                       int only_unplug)
+                                       bool unplug)
 {
        struct sa_entry *entry = NULL;
        struct l_wait_info lwi = { 0 };
 {
        struct sa_entry *entry = NULL;
        struct l_wait_info lwi = { 0 };
+       struct ll_dentry_data *ldd;
+       struct ll_inode_info *lli;
        int rc = 0;
        ENTRY;
 
        int rc = 0;
        ENTRY;
 
@@ -1417,11 +1466,12 @@ static int revalidate_statahead_dentry(struct inode *dir,
                }
        }
 
                }
        }
 
+       if (unplug)
+               GOTO(out, rc = 1);
+
        entry = sa_get(sai, &(*dentryp)->d_name);
        entry = sa_get(sai, &(*dentryp)->d_name);
-       if (entry == NULL || only_unplug) {
-               sa_put(sai, entry);
-               RETURN(entry ? 1 : -EAGAIN);
-       }
+       if (entry == NULL)
+               GOTO(out, rc = -EAGAIN);
 
        /* if statahead is busy in readdir, help it do post-work */
        if (!sa_ready(entry) && sai->sai_in_readpage)
 
        /* if statahead is busy in readdir, help it do post-work */
        if (!sa_ready(entry) && sai->sai_in_readpage)
@@ -1436,8 +1486,12 @@ static int revalidate_statahead_dentry(struct inode *dir,
                                thread_is_stopped(&sai->sai_thread),
                                &lwi);
                if (rc < 0) {
                                thread_is_stopped(&sai->sai_thread),
                                &lwi);
                if (rc < 0) {
-                       sa_put(sai, entry);
-                       RETURN(-EAGAIN);
+                       /*
+                        * entry may not be ready, so it may be used by inflight
+                        * statahead RPC, don't free it.
+                        */
+                       entry = NULL;
+                       GOTO(out, rc = -EAGAIN);
                }
        }
 
                }
        }
 
@@ -1455,11 +1509,12 @@ static int revalidate_statahead_dentry(struct inode *dir,
                                struct dentry *alias;
 
                                alias = ll_splice_alias(inode, *dentryp);
                                struct dentry *alias;
 
                                alias = ll_splice_alias(inode, *dentryp);
-                               if (IS_ERR(alias)) {
-                                       sa_put(sai, entry);
-                                       RETURN(PTR_ERR(alias));
-                               }
+                               if (IS_ERR(alias))
+                                       GOTO(out, rc = PTR_ERR(alias));
                                *dentryp = alias;
                                *dentryp = alias;
+                               /* statahead prepared this inode, transfer inode
+                                * refcount from sa_entry to dentry */
+                               entry->se_inode = NULL;
                        } else if ((*dentryp)->d_inode != inode) {
                                /* revalidate, but inode is recreated */
                                CDEBUG(D_READA,
                        } else if ((*dentryp)->d_inode != inode) {
                                /* revalidate, but inode is recreated */
                                CDEBUG(D_READA,
@@ -1472,12 +1527,8 @@ static int revalidate_statahead_dentry(struct inode *dir,
                                        (*dentryp)->d_name.name,
                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
                                        PFID(ll_inode2fid(inode)));
                                        (*dentryp)->d_name.name,
                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
                                        PFID(ll_inode2fid(inode)));
-                               sa_put(sai, entry);
-                               RETURN(-ESTALE);
-                       } else {
-                               iput(inode);
+                               GOTO(out, rc = -ESTALE);
                        }
                        }
-                       entry->se_inode = NULL;
 
                        if ((bits & MDS_INODELOCK_LOOKUP) &&
                            d_lustre_invalid(*dentryp))
 
                        if ((bits & MDS_INODELOCK_LOOKUP) &&
                            d_lustre_invalid(*dentryp))
@@ -1485,16 +1536,41 @@ static int revalidate_statahead_dentry(struct inode *dir,
                        ll_intent_release(&it);
                }
        }
                        ll_intent_release(&it);
                }
        }
-
+out:
+       /*
+        * statahead cached sa_entry can be used only once, and will be killed
+        * right after use, so if lookup/revalidate accessed statahead cache,
+        * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
+        * stat this file again, we know we've done statahead before, see
+        * dentry_may_statahead().
+        */
+       ldd = ll_d2d(*dentryp);
+       lli = ll_i2info(dir);
+       /* ldd can be NULL if llite lookup failed. */
+       if (ldd != NULL)
+               ldd->lld_sa_generation = lli->lli_sa_generation;
        sa_put(sai, entry);
        sa_put(sai, entry);
+
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+/**
+ * start statahead thread
+ *
+ * \param[in] dir      parent directory
+ * \param[in] dentry   dentry that triggers statahead, normally the first
+ *                     dirent under @dir
+ * \retval             -EAGAIN on success, because when this function is
+ *                     called, it's already in lookup call, so client should
+ *                     do it itself instead of waiting for statahead thread
+ *                     to do it asynchronously.
+ * \retval             negative number upon error
+ */
 static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = NULL;
 static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = NULL;
-       struct dentry *parent;
+       struct dentry *parent = dentry->d_parent;
        struct ptlrpc_thread *thread;
        struct l_wait_info lwi = { 0 };
        struct task_struct *task;
        struct ptlrpc_thread *thread;
        struct l_wait_info lwi = { 0 };
        struct task_struct *task;
@@ -1503,72 +1579,41 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
 
        /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
        rc = is_first_dirent(dir, dentry);
 
        /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
        rc = is_first_dirent(dir, dentry);
-       if (rc == LS_NONE_FIRST_DE)
+       if (rc == LS_NOT_FIRST_DE)
                /* It is not "ls -{a}l" operation, no need statahead for it. */
                /* It is not "ls -{a}l" operation, no need statahead for it. */
-               GOTO(out, rc = -EAGAIN);
+               GOTO(out, rc = -EFAULT);
 
 
-       sai = ll_sai_alloc();
+       sai = ll_sai_alloc(parent);
        if (sai == NULL)
                GOTO(out, rc = -ENOMEM);
 
        sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
        if (sai == NULL)
                GOTO(out, rc = -ENOMEM);
 
        sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
-       sai->sai_inode = igrab(dir);
-       if (unlikely(sai->sai_inode == NULL)) {
-               CWARN("Do not start stat ahead on dying inode "DFID"\n",
-                       PFID(&lli->lli_fid));
-               GOTO(out, rc = -ESTALE);
-       }
-
-       /* get parent reference count here, and put it in ll_statahead_thread */
-       parent = dget(dentry->d_parent);
-       if (unlikely(sai->sai_inode != parent->d_inode)) {
-               struct ll_inode_info *nlli = ll_i2info(parent->d_inode);
-
-               CWARN("Race condition, someone changed %.*s just now: "
-                       "old parent "DFID", new parent "DFID"\n",
-                       dentry->d_name.len, dentry->d_name.name,
-                       PFID(&lli->lli_fid), PFID(&nlli->lli_fid));
-               dput(parent);
-               iput(sai->sai_inode);
-               GOTO(out, rc = -EAGAIN);
-       }
 
 
-       CDEBUG(D_READA, "start statahead thread: sai %p, parent %.*s\n",
-              sai, parent->d_name.len, parent->d_name.name);
-
-       /* if another process started statahead thread, or deauthorized current
-        * lli_opendir_key, don't start statahead. */
+       /* if current lli_opendir_key was deauthorized, or dir re-opened by
+        * another process, don't start statahead, otherwise the newly spawned
+        * statahead thread won't be notified to quit. */
        spin_lock(&lli->lli_sa_lock);
        if (unlikely(lli->lli_sai != NULL ||
                     lli->lli_opendir_key == NULL ||
                     lli->lli_opendir_pid != current->pid)) {
                spin_unlock(&lli->lli_sa_lock);
        spin_lock(&lli->lli_sa_lock);
        if (unlikely(lli->lli_sai != NULL ||
                     lli->lli_opendir_key == NULL ||
                     lli->lli_opendir_pid != current->pid)) {
                spin_unlock(&lli->lli_sa_lock);
-
-               dput(parent);
-               iput(sai->sai_inode);
-               GOTO(out, rc = -EAGAIN);
+               GOTO(out, rc = -EPERM);
        }
        lli->lli_sai = sai;
        spin_unlock(&lli->lli_sa_lock);
 
        atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running);
 
        }
        lli->lli_sai = sai;
        spin_unlock(&lli->lli_sa_lock);
 
        atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running);
 
+       CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n",
+              current_pid(), parent->d_name.len, parent->d_name.name);
+
        task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
                           lli->lli_opendir_pid);
        thread = &sai->sai_thread;
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
        task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
                           lli->lli_opendir_pid);
        thread = &sai->sai_thread;
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
-               CERROR("cannot start ll_sa thread: rc = %d\n", rc);
-               dput(parent);
-
-               spin_lock(&lli->lli_sa_lock);
-               thread_set_flags(thread, SVC_STOPPED);
-               thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED);
-               spin_unlock(&lli->lli_sa_lock);
-
-               ll_sai_put(sai);
-               LASSERT(lli->lli_sai == NULL);
-               RETURN(-EAGAIN);
+               CERROR("can't start ll_sa thread, rc: %d\n", rc);
+               GOTO(out, rc);
        }
 
        l_wait_event(thread->t_ctl_waitq,
        }
 
        l_wait_event(thread->t_ctl_waitq,
@@ -1583,29 +1628,35 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
        RETURN(-EAGAIN);
 
 out:
        RETURN(-EAGAIN);
 
 out:
-       if (sai != NULL)
-               OBD_FREE_PTR(sai);
-
-       /* once we start statahead thread failed, disable statahead so
-        * subsequent won't waste time to try it. */
+       /* once starting the statahead thread has failed, disable statahead so
+        * that subsequent stat calls won't waste time trying it. */
        spin_lock(&lli->lli_sa_lock);
        lli->lli_sa_enabled = 0;
        spin_lock(&lli->lli_sa_lock);
        lli->lli_sa_enabled = 0;
+       lli->lli_sai = NULL;
        spin_unlock(&lli->lli_sa_lock);
 
        spin_unlock(&lli->lli_sa_lock);
 
+       if (sai != NULL)
+               ll_sai_free(sai);
+
        RETURN(rc);
 }
 
 /**
        RETURN(rc);
 }
 
 /**
- * Start statahead thread if this is the first dir entry.
- * Otherwise if a thread is started already, wait it until it is ahead of me.
- * \retval 1       -- find entry with lock in cache, the caller needs to do
- *                   nothing.
- * \retval 0       -- find entry in cache, but without lock, the caller needs
- *                   refresh from MDS.
- * \retval others  -- the caller need to process as non-statahead.
+ * Statahead entry function. It is called when the client does getattr on a
+ * file: it starts the statahead thread if this is the first dir entry,
+ * otherwise it revalidates the dentry from the statahead cache.
+ *
+ * \param[in]  dir     parent directory
+ * \param[out] dentryp dentry to getattr
+ * \param[in]  unplug  unplug statahead window only (normally for negative
+ *                     dentry)
+ * \retval             1 on success
+ * \retval             0 revalidation from statahead cache failed, caller needs
+ *                     to getattr from server directly
+ * \retval             negative number on error; the caller often ignores this
+ *                     and then does getattr from the server
  */
  */
-int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
-                      int only_unplug)
+int ll_statahead(struct inode *dir, struct dentry **dentryp, bool unplug)
 {
        struct ll_statahead_info *sai;
 
 {
        struct ll_statahead_info *sai;
 
@@ -1613,13 +1664,11 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
        if (sai != NULL) {
                int rc;
 
        if (sai != NULL) {
                int rc;
 
-               rc = revalidate_statahead_dentry(dir, sai, dentryp,
-                                                only_unplug);
+               rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
                CDEBUG(D_READA, "revalidate statahead %.*s: %d.\n",
                        (*dentryp)->d_name.len, (*dentryp)->d_name.name, rc);
                ll_sai_put(sai);
                return rc;
        }
                CDEBUG(D_READA, "revalidate statahead %.*s: %d.\n",
                        (*dentryp)->d_name.len, (*dentryp)->d_name.name, rc);
                ll_sai_put(sai);
                return rc;
        }
-
        return start_statahead_thread(dir, *dentryp);
 }
        return start_statahead_thread(dir, *dentryp);
 }
index 9ce58a9..09cfac5 100644 (file)
@@ -1539,7 +1539,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
                                            rp_param.rp_hash64),
                               mdc_read_page_remote, &rp_param);
        if (IS_ERR(page)) {
                                            rp_param.rp_hash64),
                               mdc_read_page_remote, &rp_param);
        if (IS_ERR(page)) {
-               CERROR("%s: read cache page: "DFID" at "LPU64": rc %ld\n",
+               CDEBUG(D_INFO, "%s: read cache page: "DFID" at "LPU64": %ld\n",
                       exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
                       rp_param.rp_off, PTR_ERR(page));
                GOTO(out_unlock, rc = PTR_ERR(page));
                       exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
                       rp_param.rp_off, PTR_ERR(page));
                GOTO(out_unlock, rc = PTR_ERR(page));