Whamcloud - gitweb
Branch b1_6
authorfanyong <fanyong>
Thu, 24 Apr 2008 04:41:11 +0000 (04:41 +0000)
committerfanyong <fanyong>
Thu, 24 Apr 2008 04:41:11 +0000 (04:41 +0000)
b=15406
i=huanghua
i=tappro
i=vitaly

Back port dir_SA fixes from HEAD (b1_8_dir_ra) to b1_6.

12 files changed:
lustre/include/lustre_mds.h
lustre/kernel_patches/patches/vfs_intent-2.6-rhel4.patch
lustre/llite/dcache.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/llite_nfs.c
lustre/llite/lproc_llite.c
lustre/llite/namei.c
lustre/llite/statahead.c
lustre/mdc/mdc_locks.c
lustre/tests/sanity.sh

index d391367..47d5073 100644 (file)
@@ -243,6 +243,7 @@ struct md_enqueue_info {
         struct lustre_handle    mi_lockh;
         struct dentry          *mi_dentry;
         md_enqueue_cb_t         mi_cb;
+        unsigned int            mi_generation;
         void                   *mi_cbdata;
 };
 
index ee655c3..66e65fb 100644 (file)
@@ -1205,8 +1205,9 @@ diff -rup RH_2_6_9_55.orig/fs/stat.c RH_2_6_9_55/fs/stat.c
        int error;
 +      intent_init(&nd.intent, IT_GETATTR);
 
-       error = user_path_walk(name, &nd);
-       if (!error) {
+-      error = user_path_walk(name, &nd);
++      error = user_path_walk_it(name, &nd);
+       if (!error) {
 -              error = vfs_getattr64(nd.mnt, nd.dentry, stat);
 +              error = vfs_getattr64_it(nd.mnt, nd.dentry, &nd.intent, stat);
                path_release(&nd);
@@ -1218,8 +1219,9 @@ diff -rup RH_2_6_9_55.orig/fs/stat.c RH_2_6_9_55/fs/stat.c
        int error;
 +      intent_init(&nd.intent, IT_GETATTR);
 
-       error = user_path_walk_link(name, &nd);
-       if (!error) {
+-      error = user_path_walk_link(name, &nd);
++      error = user_path_walk_link_it(name, &nd);
+       if (!error) {
 -              error = vfs_getattr64(nd.mnt, nd.dentry, stat);
 +              error = vfs_getattr64_it(nd.mnt, nd.dentry, &nd.intent, stat);
                path_release(&nd);
index 81c09e4..96e01b1 100644 (file)
@@ -118,11 +118,21 @@ void ll_set_dd(struct dentry *de)
         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
                atomic_read(&de->d_count));
-        lock_kernel();
+
         if (de->d_fsdata == NULL) {
-                OBD_ALLOC(de->d_fsdata, sizeof(struct ll_dentry_data));
+                struct ll_dentry_data *lld;
+
+                OBD_ALLOC(lld, sizeof(struct ll_dentry_data));
+                if (likely(lld != NULL)) {
+                        cfs_waitq_init(&lld->lld_waitq);
+                        lock_dentry(de);
+                        if (likely(de->d_fsdata == NULL))
+                                de->d_fsdata = lld;
+                        else
+                                OBD_FREE(lld, sizeof(struct ll_dentry_data));
+                        unlock_dentry(de);
+                }
         }
-        unlock_kernel();
 
         EXIT;
 }
@@ -357,10 +367,9 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
                         RETURN(0);
 #endif
 
-                rc = ll_have_md_lock(de->d_parent->d_inode, 
+                rc = ll_have_md_lock(de->d_parent->d_inode,
                                      MDS_INODELOCK_UPDATE);
-        
-                RETURN(rc);
+                GOTO(out_sa, rc);
         }
 
         exp = ll_i2mdcexp(de->d_inode);
@@ -368,12 +377,12 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
         /* Never execute intents for mount points.
          * Attributes will be fixed up in ll_inode_revalidate_it */
         if (d_mountpoint(de))
-                RETURN(1);
+                GOTO(out_sa, rc = 1);
 
         /* Root of the lustre tree. Always valid.
          * Attributes will be fixed up in ll_inode_revalidate_it */
         if (de == de->d_sb->s_root)
-                RETURN(1);
+                GOTO(out_sa, rc = 1);
 
         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
         ll_frob_intent(&it, &lookup_it);
@@ -401,7 +410,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
                 } else if (it->it_flags & FMODE_EXEC) {
                         och_p = &lli->lli_mds_exec_och;
                         och_usecount = &lli->lli_open_fd_exec_count;
-                 } else {
+                } else {
                         och_p = &lli->lli_mds_read_och;
                         och_usecount = &lli->lli_open_fd_read_count;
                 }
@@ -540,6 +549,19 @@ do_lookup:
                 ll_intent_release(it);
         }
         GOTO(out, rc = 0);
+
+out_sa:
+        /*
+         * For rc == 1 case, should not return directly to prevent losing
+         * statahead windows; for rc == 0 case, the "lookup" will be done later.
+         */
+        if (it && it->it_op == IT_GETATTR && rc == 1) {
+                first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
+                if (!first)
+                        ll_statahead_exit(de, rc);
+        }
+
+        return rc;
 }
 
 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
@@ -728,3 +750,45 @@ struct dentry_operations ll_d_ops = {
         .d_unpin = ll_unpin,
 #endif
 };
+
+static int ll_fini_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
+{
+        ENTRY;
+        /* need lookup */
+        RETURN(0);
+}
+
+struct dentry_operations ll_fini_d_ops = {
+        .d_revalidate = ll_fini_revalidate_nd,
+        .d_release = ll_release,
+};
+
+/*
+ * This handles the following race condition:
+ * When someone (maybe the statahead thread) adds the dentry to the dentry hash
+ * table, its "d_op" may be NULL; at the same time another process (maybe
+ * "ls -l") finds the dentry via "do_lookup()" with no "do_revalidate()" call.
+ * This loses the statahead window and may cause other issues. --Fan Yong
+ */
+static int ll_init_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
+{
+        struct l_wait_info lwi = { 0 };
+        struct ll_dentry_data *lld;
+        ENTRY;
+
+        ll_set_dd(dentry);
+        lld = ll_d2d(dentry);
+        if (unlikely(lld == NULL))
+                RETURN(-ENOMEM);
+
+        l_wait_event(lld->lld_waitq, dentry->d_op != &ll_init_d_ops, &lwi);
+        if (likely(dentry->d_op == &ll_d_ops))
+                RETURN(ll_revalidate_nd(dentry, nd));
+        else
+                RETURN(dentry->d_op == &ll_fini_d_ops ? 0 : -EINVAL);
+}
+
+struct dentry_operations ll_init_d_ops = {
+        .d_revalidate = ll_init_revalidate_nd,
+        .d_release = ll_release,
+};
index 518e40a..d50d49a 100644 (file)
@@ -235,17 +235,26 @@ int ll_file_release(struct inode *inode, struct file *file)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
 
-        if (S_ISDIR(inode->i_mode))
-                ll_stop_statahead(inode);
 
-        /* don't do anything for / */
-        if (inode->i_sb->s_root == file->f_dentry)
-                RETURN(0);
-
-        ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
+        if (inode->i_sb->s_root != file->f_dentry)
+                ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
         fd = LUSTRE_FPRIVATE(file);
         LASSERT(fd != NULL);
 
+        /*
+         * The last ref on @file may not belong to the owner pid of statahead.
+         * Different processes can open the same dir, "ll_opendir_key" means:
+         * it is me that should stop the statahead thread.
+         */
+        if (lli->lli_opendir_key == fd)
+                ll_stop_statahead(inode, fd);
+
+        if (inode->i_sb->s_root == file->f_dentry) {
+                LUSTRE_FPRIVATE(file) = NULL;
+                ll_file_data_put(fd);
+                RETURN(0);
+        }
+        
         if (lsm)
                 lov_test_and_clear_async_rc(lsm);
         lli->lli_async_rc = 0;
@@ -384,19 +393,12 @@ int ll_file_open(struct inode *inode, struct file *file)
         struct obd_client_handle **och_p;
         __u64 *och_usecount;
         struct ll_file_data *fd;
-        int rc = 0;
+        int rc = 0, opendir_set = 0;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
                inode->i_generation, inode, file->f_flags);
 
-        if (S_ISDIR(inode->i_mode) && lli->lli_opendir_pid == 0)
-                lli->lli_opendir_pid = current->pid;
-
-        /* don't do anything for / */
-        if (inode->i_sb->s_root == file->f_dentry)
-                RETURN(0);
-
 #ifdef HAVE_VFS_INTENT_PATCHES
         it = file->f_it;
 #else
@@ -405,10 +407,40 @@ int ll_file_open(struct inode *inode, struct file *file)
 #endif
 
         fd = ll_file_data_get();
-        if (fd == NULL) {
-                lli->lli_opendir_pid = 0;
+        if (fd == NULL)
                 RETURN(-ENOMEM);
+
+        if (S_ISDIR(inode->i_mode)) {
+                spin_lock(&lli->lli_lock);
+                /*
+                 * "lli->lli_opendir_pid != 0" means someone has set it.
+                 * "lli->lli_sai != NULL" means the previous statahead has not
+                 *                        been cleaned up.
+                 */ 
+                if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
+                        opendir_set = 1;
+                        lli->lli_opendir_pid = cfs_curproc_pid();
+                        lli->lli_opendir_key = fd;
+                } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
+                        /* Two cases for this:
+                         * (1) The same process opens the directory many times.
+                         * (2) The old process opened the directory, and exited
+                         *     before its children processes. Then new process
+                         *     with the same pid opens such directory before the
+                         *     old process's children processes exit.
+                         * Change the owner to the latest one.
+                         */
+                        opendir_set = 2;
+                        lli->lli_opendir_key = fd;
+                }
+                spin_unlock(&lli->lli_lock);
         }
+
+        if (inode->i_sb->s_root == file->f_dentry) {
+                LUSTRE_FPRIVATE(file) = fd;
+                RETURN(0);
+        }
+
         if (!it || !it->d.lustre.it_disposition) {
                 /* Convert f_flags into access mode. We cannot use file->f_mode,
                  * because everything but O_ACCMODE mask was stripped from it */
@@ -547,7 +579,12 @@ out_och_free:
                 }
                 up(&lli->lli_och_sem);
 out_openerr:
-                lli->lli_opendir_pid = 0;
+                if (opendir_set == 1) { /* this open claimed ownership */
+                        lli->lli_opendir_key = NULL;
+                        lli->lli_opendir_pid = 0;
+                } else if (unlikely(opendir_set == 2)) { /* key re-pointed */
+                        ll_stop_statahead(inode, fd);
+                }
         }
         return rc;
 }
index 5774f86..3ae815a 100644 (file)
@@ -54,11 +54,12 @@ struct ll_dentry_data {
         struct obd_client_handle lld_cwd_och;
         struct obd_client_handle lld_mnt_och;
 #ifndef HAVE_VFS_INTENT_PATCHES
-        struct lookup_intent     *lld_it;
+        struct lookup_intent    *lld_it;
 #endif
+        cfs_waitq_t              lld_waitq;
 };
 
-#define ll_d2d(de) ((struct ll_dentry_data*) de->d_fsdata)
+#define ll_d2d(de) ((struct ll_dentry_data*)((de)->d_fsdata))
 
 extern struct file_operations ll_pgcache_seq_fops;
 
@@ -113,7 +114,16 @@ struct ll_inode_info {
 #endif
 
         /* metadata stat-ahead */
+        /*
+         * "opendir_pid" is the token checked at lookup/revalidate: the process
+         * with this pid is the owner of dir statahead.
+         */
         pid_t                   lli_opendir_pid;
+        /*
+         * Since parent and child threads can share the same @file struct,
+         * "opendir_key" is the token checked at dir close, for the case where
+         * the parent exits before the child: it marks the one that should
+         * clean up the dir readahead. */
+        void                   *lli_opendir_key;
         struct ll_statahead_info *lli_sai;
 };
 
@@ -276,15 +286,16 @@ struct ll_sb_info {
                                                  * clustred nfs */
 
         /* metadata stat-ahead */
-        unsigned int              ll_sa_count; /* current statahead RPCs */
-        unsigned int              ll_sa_max;   /* max statahead RPCs */
-        unsigned int              ll_sa_wrong; /* statahead thread stopped for
-                                                * low hit ratio */
-        unsigned int              ll_sa_total; /* statahead thread started
-                                                * count */
+        unsigned int              ll_sa_max;     /* max statahead RPCs */
+        unsigned int              ll_sa_wrong;   /* statahead thread stopped for
+                                                  * low hit ratio */
+        unsigned int              ll_sa_total;   /* statahead thread started
+                                                  * count */
         unsigned long long        ll_sa_blocked; /* ls count waiting for
                                                   * statahead */
         unsigned long long        ll_sa_cached;  /* ls count got in cache */
+        unsigned long long        ll_sa_hit;     /* hit count */
+        unsigned long long        ll_sa_miss;    /* miss count */
 };
 
 #define LL_DEFAULT_MAX_RW_CHUNK         (32 * 1024 * 1024)
@@ -613,6 +624,9 @@ int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmm,
                      int *lmm_size, struct ptlrpc_request **request);
 
 /* llite/dcache.c */
+extern struct dentry_operations ll_init_d_ops;
+extern struct dentry_operations ll_d_ops;
+extern struct dentry_operations ll_fini_d_ops;
 void ll_intent_drop_lock(struct lookup_intent *);
 void ll_intent_release(struct lookup_intent *);
 extern void ll_set_dd(struct dentry *de);
@@ -815,34 +829,70 @@ int ll_removexattr(struct dentry *dentry, const char *name);
 
 /* statahead.c */
 
-#define LL_STATAHEAD_MIN  1
-#define LL_STATAHEAD_DEF  32
-#define LL_STATAHEAD_MAX  10000
+#define LL_SA_RPC_MIN   2
+#define LL_SA_RPC_DEF   32
+#define LL_SA_RPC_MAX   8192
 
 /* per inode struct, for dir only */
 struct ll_statahead_info {
         struct inode           *sai_inode;
-        atomic_t                sai_refc;       /* when access this struct, hold
+        unsigned int            sai_generation; /* generation for statahead */
+        atomic_t                sai_refcount;   /* when access this struct, hold
                                                  * refcount */
-        unsigned int            sai_max;        /* max ahead of lookup */
         unsigned int            sai_sent;       /* stat requests sent count */
         unsigned int            sai_replied;    /* stat requests which received
                                                  * reply */
-        unsigned int            sai_cached;     /* UPDATE lock cached locally
-                                                 * already */
+        unsigned int            sai_max;        /* max ahead of lookup */
+        unsigned int            sai_index;      /* index of statahead entry */
         unsigned int            sai_hit;        /* hit count */
-        unsigned int            sai_miss;       /* miss count */
+        unsigned int            sai_miss;       /* miss count:
+                                                 * for "ls -al" case, it includes
+                                                 * hidden dentry miss;
+                                                 * for "ls -l" case, it does not
+                                                 * include hidden dentry miss.
+                                                 * "sai_miss_hidden" is used for
+                                                 * the latter case.
+                                                 */
         unsigned int            sai_consecutive_miss; /* consecutive miss */
-        unsigned                sai_ls_all:1;   /* ls -al, do stat-ahead for
+        unsigned int            sai_miss_hidden;/* "ls -al", but first dentry
+                                                 * is not a hidden one */
+        unsigned int            sai_skip_hidden;/* skipped hidden dentry count */
+        unsigned int            sai_ls_all:1;   /* "ls -al", do stat-ahead for
                                                  * hidden entries */
+        cfs_waitq_t             sai_waitq;      /* stat-ahead wait queue */
         struct ptlrpc_thread    sai_thread;     /* stat-ahead thread */
         struct list_head        sai_entries;    /* stat-ahead entries */
-        unsigned int            sai_entries_nr; /* stat-ahead entries count */
 };
 
-int ll_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup);
+int do_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup);
 void ll_statahead_exit(struct dentry *dentry, int result);
-void ll_stop_statahead(struct inode *inode);
+void ll_stop_statahead(struct inode *inode, void *key);
+
+static inline
+void ll_d_wakeup(struct dentry *dentry)
+{
+        struct ll_dentry_data *lld = ll_d2d(dentry);
+
+        LASSERT(dentry->d_op != &ll_init_d_ops);
+        if (lld != NULL)
+                cfs_waitq_broadcast(&lld->lld_waitq);
+}
+
+static inline
+int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
+{
+        struct ll_sb_info        *sbi = ll_i2sbi(dir);
+        struct ll_inode_info     *lli = ll_i2info(dir);
+
+        if (sbi->ll_sa_max == 0)
+                return -ENOTSUPP;
+
+        /* not the same process, don't statahead */
+        if (lli->lli_opendir_pid != cfs_curproc_pid())
+                return -EBADF;
+
+        return do_statahead_enter(dir, dentryp, lookup);
+}
 
 /* llite ioctl register support rountine */
 #ifdef __KERNEL__
index d28a16e..54fcd63 100644 (file)
@@ -108,7 +108,7 @@ static struct ll_sb_info *ll_init_sbi(void)
         }
 
         /* metadata statahead is enabled by default */
-        sbi->ll_sa_max = LL_STATAHEAD_DEF;
+        sbi->ll_sa_max = LL_SA_RPC_DEF;
 
         RETURN(sbi);
 }
@@ -1233,6 +1233,7 @@ void ll_clear_inode(struct inode *inode)
         if (S_ISDIR(inode->i_mode)) {
                 /* these should have been cleared in ll_file_release */
                 LASSERT(lli->lli_sai == NULL);
+                LASSERT(lli->lli_opendir_key == NULL);
                 LASSERT(lli->lli_opendir_pid == 0);
         }
 
index 6ba424a..03dd480 100644 (file)
@@ -98,8 +98,6 @@ static struct inode * search_inode_for_lustre(struct super_block *sb,
         RETURN(inode);
 }
 
-extern struct dentry_operations ll_d_ops;
-
 static struct dentry *ll_iget_for_nfs(struct super_block *sb, unsigned long ino,
                                       __u32 generation, umode_t mode)
 {
@@ -162,7 +160,18 @@ static struct dentry *ll_iget_for_nfs(struct super_block *sb, unsigned long ino,
 
 #endif
         ll_set_dd(result);
-        result->d_op = &ll_d_ops;
+
+        lock_dentry(result);
+        if (unlikely(result->d_op == &ll_init_d_ops)) {
+                result->d_op = &ll_d_ops;
+                unlock_dentry(result);
+                smp_wmb();
+                ll_d_wakeup(result);
+        } else {
+                result->d_op = &ll_d_ops;
+                unlock_dentry(result);
+        }
+
         RETURN(result);
 }
 
index a50453e..04a7f90 100644 (file)
@@ -463,15 +463,6 @@ static int ll_wr_contention_time(struct file *file, const char *buffer,
                 count;
 }
 
-static int ll_rd_statahead_count(char *page, char **start, off_t off,
-                                 int count, int *eof, void *data)
-{
-        struct super_block *sb = data;
-        struct ll_sb_info *sbi = ll_s2sbi(sb);
-
-        return snprintf(page, count, "%u\n", sbi->ll_sa_count);
-}
-
 static int ll_rd_statahead_max(char *page, char **start, off_t off,
                                int count, int *eof, void *data)
 {
@@ -491,11 +482,12 @@ static int ll_wr_statahead_max(struct file *file, const char *buffer,
         rc = lprocfs_write_helper(buffer, count, &val);
         if (rc)
                 return rc;
-        if (val >= 0 && val <= LL_STATAHEAD_MAX)
+
+        if (val >= 0 && val <= LL_SA_RPC_MAX)
                 sbi->ll_sa_max = val;
         else
                 CERROR("Bad statahead_max value %d. Valid values are in the "
-                       "range [0, %d]\n", val, LL_STATAHEAD_MAX);
+                       "range [0, %d]\n", val, LL_SA_RPC_MAX);
 
         return count;
 }
@@ -510,10 +502,15 @@ static int ll_rd_statahead_stats(char *page, char **start, off_t off,
                         "statahead wrong: %u\n"
                         "statahead total: %u\n"
                         "ls blocked:      %llu\n"
-                        "ls total:        %llu\n",
-                        sbi->ll_sa_wrong, sbi->ll_sa_total,
+                        "ls cached:       %llu\n"
+                        "hit count:       %llu\n"
+                        "miss count:      %llu\n",
+                        sbi->ll_sa_wrong,
+                        sbi->ll_sa_total,
                         sbi->ll_sa_blocked,
-                        sbi->ll_sa_blocked + sbi->ll_sa_cached);
+                        sbi->ll_sa_cached,
+                        sbi->ll_sa_hit,
+                        sbi->ll_sa_miss);
 }
 
 static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
@@ -538,9 +535,8 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
         { "stats_track_gid",  ll_rd_track_gid, ll_wr_track_gid, 0 },
         { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0},
-        { "statahead_count", ll_rd_statahead_count, 0, 0 },
-        { "statahead_max",   ll_rd_statahead_max, ll_wr_statahead_max, 0 },
-        { "statahead_stats", ll_rd_statahead_stats, 0, 0 },
+        { "statahead_max",      ll_rd_statahead_max, ll_wr_statahead_max, 0 },
+        { "statahead_stats",    ll_rd_statahead_stats, 0, 0 },
         { 0 }
 };
 
index e11e89d..2a0733f 100644 (file)
@@ -98,8 +98,6 @@ static int ll_test_inode(struct inode *inode, void *opaque)
         return 1;
 }
 
-extern struct dentry_operations ll_d_ops;
-
 int ll_unlock(__u32 mode, struct lustre_handle *lockh)
 {
         ENTRY;
@@ -444,24 +442,44 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
 }
 
 int lookup_it_finish(struct ptlrpc_request *request, int offset,
-                            struct lookup_intent *it, void *data)
+                     struct lookup_intent *it, void *data)
 {
         struct it_cb_data *icbd = data;
         struct dentry **de = icbd->icbd_childp;
         struct inode *parent = icbd->icbd_parent;
         struct ll_sb_info *sbi = ll_i2sbi(parent);
         struct inode *inode = NULL;
-        int rc;
+        int set = 0, rc;
+        ENTRY;
+
+        lock_dentry(*de);
+        if (likely((*de)->d_op != &ll_d_ops)) {
+                (*de)->d_op = &ll_init_d_ops;
+                set = 1;
+        }
+        unlock_dentry(*de);
 
         /* NB 1 request reference will be taken away by ll_intent_lock()
          * when I return */
         if (!it_disposition(it, DISP_LOOKUP_NEG)) {
-                ENTRY;
+                struct dentry *save = *de;
 
                 rc = ll_prep_inode(sbi->ll_osc_exp, &inode, request, offset,
                                    (*de)->d_sb);
-                if (rc)
+                if (rc) {
+                        if (set) {
+                                lock_dentry(*de);
+                                if (likely((*de)->d_op == &ll_init_d_ops)) {
+                                        (*de)->d_op = &ll_fini_d_ops;
+                                        unlock_dentry(*de);
+                                        smp_wmb();
+                                        ll_d_wakeup(*de);
+                                } else {
+                                        unlock_dentry(*de);
+                                }
+                        }
                         RETURN(rc);
+                }
 
                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
                        inode, inode->i_ino, inode->i_generation);
@@ -476,8 +494,18 @@ int lookup_it_finish(struct ptlrpc_request *request, int offset,
                    ll_glimpse_size or some equivalent themselves anyway.
                    Also see bug 7198. */
                 *de = ll_find_alias(inode, *de);
+                if (set && *de != save) {
+                        lock_dentry(save);
+                        if (likely(save->d_op == &ll_init_d_ops)) {
+                                save->d_op = &ll_fini_d_ops;
+                                unlock_dentry(save);
+                                smp_wmb();
+                                ll_d_wakeup(save);
+                        } else {
+                                unlock_dentry(save);
+                        }
+                }
         } else {
-                ENTRY;
                 /* Check that parent has UPDATE lock. If there is none, we
                    cannot afford to hash this dentry (done by ll_d_add) as it
                    might get picked up later when UPDATE lock will appear */
@@ -497,7 +525,17 @@ int lookup_it_finish(struct ptlrpc_request *request, int offset,
         }
 
         ll_set_dd(*de);
-        (*de)->d_op = &ll_d_ops;
+
+        lock_dentry(*de);
+        if (likely((*de)->d_op == &ll_init_d_ops)) {
+                (*de)->d_op = &ll_d_ops;
+                unlock_dentry(*de);
+                smp_wmb();
+                ll_d_wakeup(*de);
+        } else {
+                (*de)->d_op = &ll_d_ops;
+                unlock_dentry(*de);
+        }
 
         RETURN(0);
 }
index 6c23dd1..24b245d 100644 (file)
@@ -36,7 +36,7 @@
 
 struct ll_sai_entry {
         struct list_head        se_list;
-        int                     se_index;
+        unsigned int            se_index;
         int                     se_stat;
 };
 
@@ -45,6 +45,9 @@ enum {
         SA_ENTRY_STATED
 };
 
+static unsigned int sai_generation = 0;
+static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED;
+
 static struct ll_statahead_info *ll_sai_alloc(void)
 {
         struct ll_statahead_info *sai;
@@ -53,10 +56,14 @@ static struct ll_statahead_info *ll_sai_alloc(void)
         if (!sai)
                 return NULL;
 
-        sai->sai_max = LL_STATAHEAD_MIN;
+        spin_lock(&sai_generation_lock);
+        sai->sai_generation = ++sai_generation;
+        spin_unlock(&sai_generation_lock);
+        atomic_set(&sai->sai_refcount, 1);
+        sai->sai_max = LL_SA_RPC_MIN;
+        cfs_waitq_init(&sai->sai_waitq);
         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
         CFS_INIT_LIST_HEAD(&sai->sai_entries);
-        atomic_set(&sai->sai_refc, 1);
         return sai;
 }
 
@@ -64,18 +71,29 @@ static inline
 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
 {
         LASSERT(sai);
-        atomic_inc(&sai->sai_refc);
+        atomic_inc(&sai->sai_refcount);
         return sai;
 }
 
 static void ll_sai_put(struct ll_statahead_info *sai)
 {
-        struct inode *inode = sai->sai_inode;
+        struct inode         *inode = sai->sai_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
         ENTRY;
 
-        if (atomic_dec_and_lock(&sai->sai_refc, &lli->lli_lock)) {
-                struct ll_sai_entry  *entry, *next;
+        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
+                struct ll_sai_entry *entry, *next;
+
+                lli->lli_sai = NULL;
+                spin_unlock(&lli->lli_lock);
+
+                LASSERT(sai->sai_thread.t_flags & SVC_STOPPED);
+
+                if (sai->sai_sent > sai->sai_replied)
+                        CWARN("statahead for dir %lu/%u does not finish: "
+                              "[sent:%u] [replied:%u]\n",
+                              inode->i_ino, inode->i_generation,
+                              sai->sai_sent, sai->sai_replied);
 
                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
                                          se_list) {
@@ -83,102 +101,110 @@ static void ll_sai_put(struct ll_statahead_info *sai)
                         OBD_FREE_PTR(entry);
                 }
                 OBD_FREE_PTR(sai);
-                lli->lli_sai = NULL;
-                spin_unlock(&lli->lli_lock);
                 iput(inode);
         }
         EXIT;
 }
 
-static struct ll_sai_entry *ll_sai_entry_get(struct ll_statahead_info *sai,
-                                             int index, int stat)
+static struct ll_sai_entry *
+ll_sai_entry_get(struct ll_statahead_info *sai, unsigned int index, int stat)
 {
         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
-        struct ll_sb_info    *sbi = ll_i2sbi(sai->sai_inode);
         struct ll_sai_entry  *entry;
         ENTRY;
 
         OBD_ALLOC_PTR(entry);
         if (entry == NULL)
-                RETURN(NULL);
+                RETURN(ERR_PTR(-ENOMEM));
         
-        CDEBUG(D_READA, "alloc sai entry %p index %d, stat %d\n",
+        CDEBUG(D_READA, "alloc sai entry %p index %u, stat %d\n",
                entry, index, stat);
         entry->se_index = index;
         entry->se_stat  = stat;
 
         spin_lock(&lli->lli_lock);
         list_add_tail(&entry->se_list, &sai->sai_entries);
-        sai->sai_entries_nr++;
-        sbi->ll_sa_count = sai->sai_entries_nr;
         spin_unlock(&lli->lli_lock);
 
-        LASSERT(sai->sai_entries_nr <= sbi->ll_sa_max);
         RETURN(entry);
 }
 
-/* inside lli_lock */
-static void ll_sai_entry_set(struct ll_statahead_info *sai, int index,
-                             int stat)
+/*
+ * inside lli_lock
+ * return value:
+ *  0: can not find the entry with the index
+ *  1: it is the first entry
+ *  2: it is not the first entry
+ */
+static int
+ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat)
 {
         struct ll_sai_entry *entry;
+        int                  rc = 0;
         ENTRY;
 
-        list_for_each_entry(entry, &sai->sai_entries, se_list) {
-                if (entry->se_index == index) {
-                        LASSERT(entry->se_stat == SA_ENTRY_UNSTATED);
-                        entry->se_stat = stat;
-                        CDEBUG(D_READA, "set sai entry %p index %d stat %d\n",
-                               entry, index, stat);
-                        EXIT;
-                        return;
-                }
+        if (list_empty(&sai->sai_entries))
+                RETURN(0);
+
+        entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list);
+        if (entry->se_index == index)
+                GOTO(out, rc = 1);
+
+        while (entry->se_list.next != &sai->sai_entries &&
+               entry->se_index < index) {
+                entry = list_entry(entry->se_list.next, struct ll_sai_entry,
+                                   se_list);
+                if (entry->se_index == index)
+                        GOTO(out, rc = 2);
         }
-        /* Sometimes, this happens when entry has been put and freed */
-        CDEBUG(D_READA, "can't find sai entry index %d\n", index);
+
         EXIT;
+
+out:
+        if (rc) {
+                LASSERT(entry->se_stat == SA_ENTRY_UNSTATED);
+                entry->se_stat = stat;
+        }
+
+        return rc;
 }
 
-/* check first entry was stated already */
+/*
+ * Check whether first entry was stated already or not.
+ * No need to hold lli_lock, for:
+ * (1) it is me that remove entry from the list
+ * (2) the statahead thread only add new entry to the list tail
+ */
 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
 {
-        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
         struct ll_sai_entry  *entry;
         int                   rc = 0;
         ENTRY;
 
-        spin_lock(&lli->lli_lock);
         if (!list_empty(&sai->sai_entries)) {
                 entry = list_entry(sai->sai_entries.next, struct ll_sai_entry,
                                    se_list);
-                CDEBUG(D_READA, "check sai entry %p index %d stat %d\n",
-                       entry, entry->se_index, entry->se_stat);
                 rc = (entry->se_stat != SA_ENTRY_UNSTATED);
         }
-        spin_unlock(&lli->lli_lock);
 
         RETURN(rc);
 }
 
-/* inside lli_lock */
 static void ll_sai_entry_put(struct ll_statahead_info *sai)
 {
+        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
         struct ll_sai_entry  *entry;
         ENTRY;
         
-        if (list_empty(&sai->sai_entries)) {
-                EXIT;
-                return;
+        spin_lock(&lli->lli_lock);
+        if (!list_empty(&sai->sai_entries)) {
+                entry = list_entry(sai->sai_entries.next,
+                                   struct ll_sai_entry, se_list);
+                list_del(&entry->se_list);
+                OBD_FREE_PTR(entry);
         }
-        LASSERT(sai->sai_entries_nr > 0);
-
-        entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list);
-        list_del(&entry->se_list);
-        sai->sai_entries_nr--;
+        spin_unlock(&lli->lli_lock);
 
-        CDEBUG(D_READA, "free sa entry %p index %d stat %d\n",
-               entry, entry->se_index, entry->se_stat);
-        OBD_FREE_PTR(entry);
         EXIT;
 }
 
@@ -192,11 +218,22 @@ static int ll_statahead_interpret(struct obd_export *exp,
         struct dentry            *dentry = minfo->mi_dentry;
         struct inode             *dir = dentry->d_parent->d_inode;
         struct ll_inode_info     *lli = ll_i2info(dir);
-        struct ll_statahead_info *sai;
+        struct ll_statahead_info *sai = NULL;
         ENTRY;
 
         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
                dentry->d_name.len, dentry->d_name.name, rc);
+
+        spin_lock(&lli->lli_lock);
+        if (unlikely(lli->lli_sai == NULL ||
+            lli->lli_sai->sai_generation != minfo->mi_generation)) {
+                spin_unlock(&lli->lli_lock);
+                GOTO(out_free, rc = -ESTALE);
+        } else {
+                sai = ll_sai_get(lli->lli_sai);
+                spin_unlock(&lli->lli_lock);
+        }
+
         if (rc || dir == NULL)
                 GOTO(out, rc);
 
@@ -204,21 +241,21 @@ static int ll_statahead_interpret(struct obd_export *exp,
                 /* lookup */
                 struct dentry    *save = dentry;
                 struct it_cb_data icbd = {
-                        .icbd_parent = dir,
-                        .icbd_childp = &dentry
+                        .icbd_parent   = dir,
+                        .icbd_childp   = &dentry
                 };
 
                 rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
-                if (!rc) {
+                if (!rc)
                         /* 
                          * Here dentry->d_inode might be NULL,
                          * because the entry may have been removed before
                          * we start doing stat ahead.
                          */
-                        if (dentry != save)
-                                dput(save);
                         ll_lookup_finish_locks(it, dentry);
-                }
+
+                if (dentry != save)
+                        dput(save);
         } else {
                 /* revalidate */
                 struct mds_body *body;
@@ -248,19 +285,35 @@ static int ll_statahead_interpret(struct obd_export *exp,
                 spin_unlock(&dcache_lock);
 
                 ll_lookup_finish_locks(it, dentry);
-
         }
         EXIT;
+
 out:
-        spin_lock(&lli->lli_lock);
-        sai = lli->lli_sai;
-        if (sai) {
-                lli->lli_sai->sai_replied++;
-                ll_sai_entry_set(lli->lli_sai, (long)minfo->mi_cbdata,
-                                 SA_ENTRY_STATED);
-                cfs_waitq_signal(&lli->lli_sai->sai_thread.t_ctl_waitq);
+        if (sai != NULL) {
+                int first;
+
+                sai->sai_replied++;
+                spin_lock(&lli->lli_lock);
+                first = ll_sai_entry_set(sai,
+                                         (unsigned int)(long)minfo->mi_cbdata,
+                                         SA_ENTRY_STATED);
+                /*
+                 * Wake up the "ls -l" process only when the first entry
+                 * has returned.
+                 */
+                spin_unlock(&lli->lli_lock);
+                if (first == 1)
+                        cfs_waitq_signal(&sai->sai_waitq);
+                else if (first == 0)
+                        CDEBUG(D_READA, "can't find sai entry for dir "
+                               "%lu/%u generation %u index %d\n",
+                               dir->i_ino, dir->i_generation,
+                               minfo->mi_generation,
+                               (unsigned int)(long)minfo->mi_cbdata);
+
+                ll_sai_put(sai);
         }
-        spin_unlock(&lli->lli_lock);
+out_free:
         ll_intent_release(it);
         OBD_FREE_PTR(minfo);
 
@@ -298,7 +351,8 @@ static int sa_args_prep(struct inode *dir, struct dentry *dentry,
         minfo->mi_it.it_op = IT_GETATTR;
         minfo->mi_dentry = dentry;
         minfo->mi_cb = ll_statahead_interpret;
-        minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_sent;
+        minfo->mi_generation = lli->lli_sai->sai_generation;
+        minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
 
         einfo->ei_type   = LDLM_IBITS;
         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
@@ -337,14 +391,16 @@ static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
         RETURN(rc);
 }
 
-/* similar to ll_revalidate_it().
- * return 1: dentry valid.
- *        0: will send stat-ahead request.
- *        -errno: prepare stat-ahead request failed. */
+/* 
+ * similar to ll_revalidate_it().
+ * return value:
+ *  1      -- dentry valid
+ *  0      -- will send stat-ahead request
+ *  others -- prepare stat-ahead request failed
+ */
 static int do_sa_revalidate(struct dentry *dentry)
 {
         struct inode             *inode = dentry->d_inode;
-        struct ll_inode_info     *lli = ll_i2info(dentry->d_parent->d_inode);
         struct ll_fid             fid;
         struct lookup_intent      it = { .it_op = IT_GETATTR };
         struct md_enqueue_info   *minfo;
@@ -358,13 +414,14 @@ static int do_sa_revalidate(struct dentry *dentry)
         if (d_mountpoint(dentry))
                 RETURN(1);
 
+        if (dentry == dentry->d_sb->s_root)
+                RETURN(1);
+
         ll_inode2fid(&fid, inode);
 
         rc = mdc_revalidate_lock(ll_i2mdcexp(inode), &it, &fid);
         if (rc == 1) {
                 ll_intent_release(&it);
-                lli->lli_sai->sai_cached++;
-                cfs_waitq_signal(&lli->lli_sai->sai_thread.t_ctl_waitq);
                 RETURN(1);
         }
 
@@ -384,21 +441,17 @@ static int do_sa_revalidate(struct dentry *dentry)
         RETURN(rc);
 }
 
-/* copied from kernel */
-static inline void name2qstr(struct qstr *this, const char *name, int namelen)
+static inline void ll_name2qstr(struct qstr *this, const char *name, int namelen)
 {
-        unsigned long        hash;
-        const unsigned char *p = (const unsigned char *)name;
-        int                  len;
-        unsigned int         c;
-
-        hash = init_name_hash();
-        for (len = 0; len < namelen; len++, p++) {
-                c = *p;
-                hash = partial_name_hash(c, hash);
-        }
+        unsigned long hash = init_name_hash();
+        unsigned int  c;
+
         this->name = name;
         this->len  = namelen;
+        for (; namelen > 0; namelen--, name++) {
+                c = *(const unsigned char *)name;
+                hash = partial_name_hash(c, hash);
+        }
         this->hash = end_name_hash(hash);
 }
 
@@ -412,11 +465,6 @@ static int ll_statahead_one(struct dentry *parent, ext2_dirent *de)
         int                     rc;
         ENTRY;
 
-        name2qstr(&name, de->name, de->name_len);
-
-        se = ll_sai_entry_get(lli->lli_sai, lli->lli_sai->sai_sent,
-                              SA_ENTRY_UNSTATED);
-
 #ifdef DCACHE_LUSTRE_INVALID
         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
 #else
@@ -425,34 +473,44 @@ static int ll_statahead_one(struct dentry *parent, ext2_dirent *de)
                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
                        "invalid, skip statahead\n",
                        parent, parent->d_name.len, parent->d_name.name);
-                GOTO(out, rc = -EINVAL);
+                RETURN(-EINVAL);
         }
 
+        se = ll_sai_entry_get(lli->lli_sai, lli->lli_sai->sai_index,
+                              SA_ENTRY_UNSTATED);
+        if (IS_ERR(se))
+                RETURN(PTR_ERR(se));
+
+        ll_name2qstr(&name, de->name, de->name_len);
         dentry = d_lookup(parent, &name);
         if (!dentry) {
-                struct dentry *dentry = d_alloc(parent, &name);
-
-                rc = -ENOMEM;
+                dentry = d_alloc(parent, &name);
                 if (dentry) {
                         rc = do_sa_lookup(dir, dentry);
                         if (rc)
                                 dput(dentry);
+                } else {
+                        GOTO(out, rc = -ENOMEM);
                 }
-                GOTO(out, rc);
+        } else {
+                rc = do_sa_revalidate(dentry);
+                if (rc)
+                        dput(dentry);
         }
 
-        rc = do_sa_revalidate(dentry);
-        if (rc)
-                dput(dentry);
-        GOTO(out, rc);
+        EXIT;
+
 out:
         if (rc) {
-                CDEBUG(D_READA, "set sai entry %p index %d stat %d, rc %d\n",
+                CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
                        se, se->se_index, se->se_stat, rc);
                 se->se_stat = rc;
-                cfs_waitq_signal(&lli->lli_sai->sai_thread.t_ctl_waitq);
+                cfs_waitq_signal(&lli->lli_sai->sai_waitq);
+        } else {
+                lli->lli_sai->sai_sent++;
         }
-        lli->lli_sai->sai_sent++;
+
+        lli->lli_sai->sai_index++;
         return rc;
 }
                 
@@ -463,7 +521,17 @@ static inline int sa_check_stop(struct ll_statahead_info *sai)
 
 static inline int sa_not_full(struct ll_statahead_info *sai)
 {
-        return sai->sai_sent - sai->sai_miss - sai->sai_hit < sai->sai_max;
+        return sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max;
+}
+
+/* (1) hit ratio less than 80%
+ * or
+ * (2) consecutive miss more than 8
+ */
+static inline int sa_low_hit(struct ll_statahead_info *sai)
+{
+        return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
+                (sai->sai_consecutive_miss > 8));
 }
 
 struct ll_sa_thread_args {
@@ -480,155 +548,141 @@ static int ll_statahead_thread(void *arg)
         struct ll_sb_info        *sbi = ll_i2sbi(dir);
         struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
         struct ptlrpc_thread     *thread = &sai->sai_thread;
-        struct l_wait_info        lwi = { 0 };
         unsigned long             index = 0;
-        __u64                     offset = 0;
-        int                       skip = 0;
+        int                       first = 0;
         int                       rc = 0;
         char                      name[16] = "";
         ENTRY;
 
         sbi->ll_sa_total++;
-
         snprintf(name, 15, "ll_sa_%u", sta->sta_pid);
         cfs_daemonize(name);
+        spin_lock(&lli->lli_lock);
         thread->t_flags = SVC_RUNNING;
+        spin_unlock(&lli->lli_lock);
         cfs_waitq_signal(&thread->t_ctl_waitq);
         CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
 
-        if (sai->sai_ls_all)
-                CDEBUG(D_READA, "do statahead for hidden files\n");
-
         while (1) {
-                unsigned long npages = dir_pages(dir);
-
-                /* hit ratio < 80% */
-                if ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
-                     (sai->sai_consecutive_miss > 8)) {
-                        sbi->ll_sa_wrong++;
-                        CDEBUG(D_READA, "statahead for dir %.*s hit ratio too "
-                               "low: hit/miss %u/%u, sent/replied %u/%u, "
-                               "cached %u\n",
-                               parent->d_name.len, parent->d_name.name,
-                               sai->sai_hit, sai->sai_miss, sai->sai_sent,
-                               sai->sai_replied, sai->sai_cached);
-                        break;
-                }
+                struct l_wait_info lwi = { 0 };
+                unsigned long npages;
+                char *kaddr, *limit;
+                ext2_dirent *de;
+                struct page *page;
 
+                npages = dir_pages(dir);
                 /* reach the end of dir */
-                if (index == npages) {
+                if (index >= npages) {
                         CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
                                index, npages);
                         break;
                 }
 
-                l_wait_event(thread->t_ctl_waitq,
-                             sa_check_stop(sai) || sa_not_full(sai),
-                             &lwi);
-
-                if (sa_check_stop(sai))
+                page = ll_get_dir_page(dir, index);
+                if (IS_ERR(page)) {
+                        rc = PTR_ERR(page);
+                        CERROR("error reading dir %lu/%u page %lu/%u: rc %d\n",
+                               dir->i_ino, dir->i_generation, index,
+                               sai->sai_index, rc);
                         break;
+                }
 
-                for (; index < npages; index++, offset = 0) {
-                        char *kaddr, *limit;
-                        ext2_dirent *de;
-                        struct page *page;
-
-                        CDEBUG(D_EXT2,"read %lu of dir %lu/%u page %lu"
-                               "/%lu size %llu\n",
-                               CFS_PAGE_SIZE, dir->i_ino, dir->i_generation,
-                               index, npages, dir->i_size);
-
-                        page = ll_get_dir_page(dir, index);
-                        npages = dir_pages(dir);
-
-                        if (IS_ERR(page)) {
-                                rc = PTR_ERR(page);
-                                CERROR("error reading dir %lu/%u page %lu: "
-                                       "rc %d\n",
-                                       dir->i_ino, dir->i_generation, index,
-                                       rc);
-                                GOTO(out, rc);
+                kaddr = page_address(page);
+                limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
+                de = (ext2_dirent *)kaddr;
+                if (!index) {
+                        de = ext2_next_entry(de); /* skip "." */
+                        de = ext2_next_entry(de); /* skip ".." */
+                }
+
+                for (; (char*)de <= limit; de = ext2_next_entry(de)) {
+                        if (!de->inode)
+                                continue;
+
+                        if (de->name[0] == '.' && !sai->sai_ls_all) {
+                                /* skip hidden files */
+                                sai->sai_skip_hidden++;
+                                continue;
                         }
 
-                        kaddr = page_address(page);
-                        de = (ext2_dirent *)(kaddr + offset);
-                        limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
-                        for (; (char*)de <= limit && sa_not_full(sai);
-                             de = ext2_next_entry(de)) {
-                                if (!de->inode)
-                                        continue;
-
-                                /* don't stat-ahead ".", ".." */
-                                if (skip < 2) {
-                                        skip++;
-                                        continue;
-                                }
-
-                                /* don't stat-ahead for hidden files */
-                                if (de->name[0] == '.' && !sai->sai_ls_all)
-                                        continue;
-
-                                /* don't stat-ahead for the first de */
-                                if (skip < 3) {
-                                        skip++;
-                                        continue;
-                                }
-
-                                rc = ll_statahead_one(parent, de);
-                                if (rc < 0) {
-                                        ext2_put_page(page);
-                                        GOTO(out, rc);
-                                }
+                        /* don't stat-ahead first entry */
+                        if (unlikely(!first)) {
+                                first++;
+                                continue;
+                        }
+
+                        l_wait_event(thread->t_ctl_waitq,
+                                     sa_check_stop(sai) || sa_not_full(sai),
+                                     &lwi);
+
+                        if (unlikely(sa_check_stop(sai))) {
+                                ext2_put_page(page);
+                                GOTO(out, rc);
                         }
-                        offset = (char *)de - kaddr;
-                        ext2_put_page(page);
 
-                        if ((char *)de <= limit)
-                                /* !sa_not_full() */
-                                break;
+                        rc = ll_statahead_one(parent, de);
+                        if (rc < 0) {
+                                ext2_put_page(page);
+                                GOTO(out, rc);
+                        }
                 }
+                ext2_put_page(page);
+                index++;
         }
         EXIT;
 out:
         spin_lock(&lli->lli_lock);
         thread->t_flags = SVC_STOPPED;
-        cfs_waitq_signal(&thread->t_ctl_waitq);
-        lli->lli_opendir_pid = 0; /* avoid statahead again */
         spin_unlock(&lli->lli_lock);
-
+        cfs_waitq_signal(&sai->sai_waitq);
+        cfs_waitq_signal(&thread->t_ctl_waitq);
         ll_sai_put(sai);
         dput(parent);
-        CDEBUG(D_READA, "stopped statahead thread, pid %d for %s\n",
-               current->pid, parent->d_name.name);
-        return 0;
+        CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
+               cfs_curproc_pid());
+        return rc;
 }
 
-/* called in ll_file_release */
-void ll_stop_statahead(struct inode *inode)
+/* called in ll_file_release() */
+void ll_stop_statahead(struct inode *inode, void *key)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         struct ptlrpc_thread *thread;
 
         spin_lock(&lli->lli_lock);
-        /* don't check pid here. upon fork, if parent closedir before child,
-         * child will not have chance to stop this thread. */
+        if (lli->lli_opendir_pid == 0 ||
+            unlikely(lli->lli_opendir_key != key)) {
+                spin_unlock(&lli->lli_lock);
+                return;
+        }
+
+        lli->lli_opendir_key = NULL;
         lli->lli_opendir_pid = 0;
 
-        if (lli->lli_sai && (lli->lli_sai->sai_thread.t_flags & SVC_RUNNING)) {
+        if (lli->lli_sai) {
                 struct l_wait_info lwi = { 0 };
-                ll_sai_get(lli->lli_sai);
+
                 thread = &lli->lli_sai->sai_thread;
-                thread->t_flags = SVC_STOPPING;
-                cfs_waitq_signal(&thread->t_ctl_waitq);
-                spin_unlock(&lli->lli_lock);
+                if (!(thread->t_flags & SVC_STOPPED)) {
+                        thread->t_flags = SVC_STOPPING;
+                        spin_unlock(&lli->lli_lock);
+                        cfs_waitq_signal(&thread->t_ctl_waitq);
 
-                CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
-                       current->pid);
-                l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED,
-                             &lwi);
-                ll_sai_put(lli->lli_sai);
+                        CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
+                               cfs_curproc_pid());
+                        l_wait_event(thread->t_ctl_waitq,
+                                     thread->t_flags & SVC_STOPPED,
+                                     &lwi);
+                } else {
+                        spin_unlock(&lli->lli_lock);
+                }
 
+                /*
+                 * Put the ref that was taken at the first statahead_enter.
+                 * It may not be the last ref, because some statahead
+                 * requests may still be in flight.
+                 */
+                ll_sai_put(lli->lli_sai);
                 return;
         }
         spin_unlock(&lli->lli_lock);
@@ -643,42 +697,45 @@ enum {
 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
 {
         struct qstr   *d_name = &dentry->d_name;
-        unsigned long  npages = dir_pages(dir);
+        unsigned long  npages, index = 0;
         struct page   *page;
         ext2_dirent   *de;
-        unsigned long  index;
-        __u64          offset = 0;
         char          *kaddr, *limit;
-        int            dot_de = 1; /* dirent is dotfile till now */
-        int            rc = LS_NONE_FIRST_DE;
+        int            rc = LS_NONE_FIRST_DE, dot_de;
         ENTRY;
 
-        page = ll_get_dir_page(dir, 0);
-        if (IS_ERR(page)) {
-                CERROR("error reading dir %lu/%u page 0: rc %ld\n",
-                       dir->i_ino, dir->i_generation, PTR_ERR(page));
-                RETURN(LS_NONE_FIRST_DE);
-        }
-
-        kaddr = page_address(page);
-        de = (ext2_dirent *)kaddr;
-        if (!(de->name_len == 1 && strncmp(de->name, ".", 1) == 0))
-                CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino);
-        de = ext2_next_entry(de); /* skip ".", or ingore bad entry */
-        if (!(de->name_len == 2 && strncmp(de->name, "..", 2) == 0))
-                CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino);
-        de = ext2_next_entry(de); /* skip "..", or ingore bad entry */
+        while (1) {
+                npages = dir_pages(dir);
+                /* reach the end of dir */
+                if (index >= npages) {
+                        CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
+                               index, npages);
+                        break;
+                }
 
-        offset = (char *)de - kaddr;
+                page = ll_get_dir_page(dir, index);
+                if (IS_ERR(page)) {
+                        rc = PTR_ERR(page);
+                        CERROR("error reading dir %lu/%u page %lu: rc %d\n",
+                               dir->i_ino, dir->i_generation, index, rc);
+                        break;
+                }
 
-        for (index = 0; index < npages; offset = 0) {
-                de = (ext2_dirent *)(kaddr + offset);
+                kaddr = page_address(page);
                 limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
+                de = (ext2_dirent *)kaddr;
+                if (!index) {
+                        de = ext2_next_entry(de); /* skip "." */
+                        de = ext2_next_entry(de); /* skip ".." */
+                }
+
                 for (; (char*)de <= limit; de = ext2_next_entry(de)) {
                         if (!de->inode)
                                 continue;
 
-                        if (de->name[0] != '.')
+                        if (de->name[0] == '.')
+                                dot_de = 1;
+                        else
                                 dot_de = 0;
 
                         if (dot_de && d_name->name[0] != '.') {
@@ -693,76 +750,79 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
                                 rc = LS_FIRST_DE + dot_de;
                         else
                                 rc = LS_NONE_FIRST_DE;
-                        GOTO(out, rc);
+                        ext2_put_page(page);
+                        RETURN(rc);
                 }
-
-                if (++index >= npages)
-                        break;
-
                 ext2_put_page(page);
-
-                page = ll_get_dir_page(dir, index);
-                if (IS_ERR(page)) {
-                        CERROR("error reading dir %lu/%u page %lu: rc %ld\n",
-                               dir->i_ino, dir->i_generation, index,
-                               PTR_ERR(page));
-                        RETURN(LS_NONE_FIRST_DE);
-                }
-                kaddr = page_address(page);
+                index++;
         }
-        CERROR("%.*s not found in dir %.*s!\n", d_name->len, d_name->name,
-               dentry->d_parent->d_name.len, dentry->d_parent->d_name.name);
-        EXIT;
-out:
-        ext2_put_page(page);
-        return rc;
+        RETURN(rc);
 }
 
-/* start stat-ahead thread if this is the first dir entry, otherwise if a thread
- * is started already, wait until thread is ahead of me.
+/* Start statahead thread if this is the first dir entry.
+ * Otherwise, if a thread is started already, wait until it is ahead of me.
  * Return value: 
- *    0 -- miss,
- *    1 -- hit,
- *    -EEXIST -- stat ahead thread started, and this is the first try.
- *    other negative value -- error.
+ *  0       -- miss
+ *  1       -- hit
+ *  -EEXIST -- stat ahead thread started, and this is the first dentry
+ *  -EBADFD -- statahead thread exited and no dentry is available
+ *  others  -- error
  */
-int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
+int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
 {
         struct ll_sb_info        *sbi = ll_i2sbi(dir);
         struct ll_inode_info     *lli = ll_i2info(dir);
-        struct ll_statahead_info *sai;
+        struct ll_statahead_info *sai = lli->lli_sai;
         struct ll_sa_thread_args  sta;
         struct l_wait_info        lwi = { 0 };
         int                       rc;
         ENTRY;
 
-        if (sbi->ll_sa_max == 0)
-                RETURN(-ENOTSUPP);
-
-        /* not the same process, don't statahead */
-        if (lli->lli_opendir_pid != current->pid)
-                RETURN(-EBADF);
+        LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
 
-        spin_lock(&lli->lli_lock);
-        if (lli->lli_sai) {
-                sai = ll_sai_get(lli->lli_sai);
-                spin_unlock(&lli->lli_lock);
+        if (sai) {
+                if (unlikely(sai->sai_thread.t_flags & SVC_STOPPED &&
+                             list_empty(&sai->sai_entries)))
+                        RETURN(-EBADFD);
+
+                if ((*dentryp)->d_name.name[0] == '.') {
+                        if (likely(sai->sai_ls_all ||
+                            sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
+                                /*
+                                 * Hidden dentry is the first one, or statahead
+                                 * thread does not skip so many hidden dentries
+                                 * before "sai_ls_all" enabled as below.
+                                 */
+                        } else {
+                                if (!sai->sai_ls_all)
+                                        /*
+                                         * Maybe the hidden dentry was not the
+                                         * first one, so "sai_ls_all" was not
+                                         * set and "ls -al" missed. Enable
+                                         * "sai_ls_all" for such a case.
+                                         */
+                                        sai->sai_ls_all = 1;
+
+                                /*
+                                 * Such "getattr" has been skipped before
+                                 * "sai_ls_all" enabled as above.
+                                 */
+                                sai->sai_miss_hidden++;
+                                RETURN(-ENOENT);
+                        }
+                }
 
                 if (ll_sai_entry_stated(sai)) {
                         sbi->ll_sa_cached++;
                 } else {
-                        struct l_wait_info lwi = { 0 };
-
                         sbi->ll_sa_blocked++;
                         /* thread started already, avoid double-stat */
-                        l_wait_event(sai->sai_thread.t_ctl_waitq,
+                        l_wait_event(sai->sai_waitq,
                                      ll_sai_entry_stated(sai) ||
                                      sai->sai_thread.t_flags & SVC_STOPPED,
                                      &lwi);
                 }
 
-                ll_sai_put(sai);
-
                 if (lookup) {
                         struct dentry *result;
 
@@ -772,57 +832,52 @@ int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
                                 LASSERT(result != *dentryp);
                                 dput(*dentryp);
                                 *dentryp = result;
+                                RETURN(1);
                         }
-                        RETURN(result != NULL);
                 }
                 /* do nothing for revalidate */
                 RETURN(0);
         }
-        spin_unlock(&lli->lli_lock);
+
+        /* I am the "lli_opendir_pid" owner; only I can set "lli_sai". */
+        LASSERT(lli->lli_sai == NULL);
 
         rc = is_first_dirent(dir, *dentryp);
-        if (!rc) {
-                /* optimization: don't statahead for this pid any longer */
+        if (rc == LS_NONE_FIRST_DE) {
+                /* It is not "ls -{a}l" operation, no need statahead for it */
                 spin_lock(&lli->lli_lock);
-                if (lli->lli_sai == NULL)
-                        lli->lli_opendir_pid = 0;
+                lli->lli_opendir_key = NULL;
+                lli->lli_opendir_pid = 0;
                 spin_unlock(&lli->lli_lock);
                 RETURN(-EBADF);
         }
 
-        spin_lock(&lli->lli_lock);
-        if (lli->lli_sai == NULL) {
-                lli->lli_sai = ll_sai_alloc();
-                if (lli->lli_sai == NULL) {
-                        spin_unlock(&lli->lli_lock);
-                        RETURN(-ENOMEM);
-                }
-        } else {
-                /* sai is already there */
-                spin_unlock(&lli->lli_lock);
-                RETURN(-EBUSY);
-        }
-        spin_unlock(&lli->lli_lock);
+        sai = ll_sai_alloc();
+        if (sai == NULL)
+                RETURN(-ENOMEM);
         
-        sai = lli->lli_sai;
-        sai->sai_inode = igrab(dir);
+        sai->sai_inode  = igrab(dir);
         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
 
         sta.sta_parent = (*dentryp)->d_parent;
-        sta.sta_pid    = current->pid;
-        rc = kernel_thread(ll_statahead_thread, &sta, 0);
+        sta.sta_pid    = cfs_curproc_pid();
+
+        lli->lli_sai = sai;
+        rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
         if (rc < 0) {
                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
+                sai->sai_thread.t_flags = SVC_STOPPED;
                 ll_sai_put(sai);
+                LASSERT(lli->lli_sai == NULL);
                 RETURN(rc);
         }
 
         l_wait_event(sai->sai_thread.t_ctl_waitq, 
                      sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED),
                      &lwi);
-        ll_sai_put(sai);
 
-        /* we don't stat-ahead for the first dirent since we are already in
+        /*
+         * We don't stat-ahead for the first dirent since we are already in
          * lookup, and -EEXIST also indicates that this is the first dirent.
          */
         RETURN(-EEXIST);
@@ -831,41 +886,42 @@ int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
 /* update hit/miss count */
 void ll_statahead_exit(struct dentry *dentry, int result)
 {
-        struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode);
-        struct ll_sb_info    *sbi = ll_i2sbi(dentry->d_parent->d_inode);
+        struct dentry        *parent = dentry->d_parent;
+        struct ll_inode_info *lli = ll_i2info(parent->d_inode);
+        struct ll_sb_info    *sbi = ll_i2sbi(parent->d_inode);
 
-        if (lli->lli_opendir_pid != current->pid)
+        if (lli->lli_opendir_pid != cfs_curproc_pid())
                 return;
 
-        spin_lock(&lli->lli_lock);
         if (lli->lli_sai) {
                 struct ll_statahead_info *sai = lli->lli_sai;
 
-                ll_sai_entry_put(sai);
                 if (result == 1) {
+                        sbi->ll_sa_hit++;
                         sai->sai_hit++;
                         sai->sai_consecutive_miss = 0;
                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
-                        CDEBUG(D_READA, "statahead %.*s hit(hit/miss %u/%u)\n",
-                               dentry->d_name.len, dentry->d_name.name,
-                               sai->sai_hit, sai->sai_miss);
                 } else {
+                        sbi->ll_sa_miss++;
                         sai->sai_miss++;
                         sai->sai_consecutive_miss++;
-                        /* upon miss, it's always because some dentry is added
-                         * by statahead thread, and at the mean time `ls`
-                         * processs finds this dentry, but the d_op for this
-                         * dentry is NULL, then revalidate is not done, and
-                         * ll_statahead_exit() not called for this dentry,
-                         * so statahead thread should be behind of `ls` process,
-                         * put one entry to go ahead.
-                         */
-                        CDEBUG(D_READA, "statahead %.*s miss(hit/miss %u/%u)\n",
-                               dentry->d_name.len, dentry->d_name.name,
-                               sai->sai_hit, sai->sai_miss);
-                        ll_sai_entry_put(sai);
+                        if (sa_low_hit(sai)) {
+                                sbi->ll_sa_wrong++;
+                                CDEBUG(D_READA, "statahead for dir %.*s hit "
+                                       "ratio too low: hit/miss %u/%u, "
+                                       "sent/replied %u/%u. stopping statahead "
+                                       "thread: pid %d\n",
+                                       parent->d_name.len, parent->d_name.name,
+                                       sai->sai_hit, sai->sai_miss,
+                                       sai->sai_sent, sai->sai_replied,
+                                       cfs_curproc_pid());
+                                spin_lock(&lli->lli_lock);
+                                if (!(sai->sai_thread.t_flags & SVC_STOPPED))
+                                        sai->sai_thread.t_flags = SVC_STOPPING;
+                                spin_unlock(&lli->lli_lock);
+                        }
                 }
                 cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
+                ll_sai_entry_put(sai);
         }
-        spin_unlock(&lli->lli_lock);
 }
index 85c9580..91d923d 100644 (file)
@@ -838,7 +838,7 @@ static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
                                    &flags, NULL, 0, NULL, lockh, rc);
         if (rc < 0) {
-                CERROR("ldlm_cli_enqueue: %d\n", rc);
+                CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
                 mdc_clear_replay_flag(req, rc);
                 GOTO(out, rc);
         }
index 07b039b..0d37861 100644 (file)
@@ -4664,7 +4664,9 @@ test_123a() { # was test 123, statahead(bug 11401)
                 max=`lctl get_param -n llite.*.statahead_max | head -n 1`
                 lctl set_param -n llite.*.statahead_max 0
                 lctl get_param llite.*.statahead_max
-
+                cancel_lru_locks mdc
+                cancel_lru_locks osc
+                stime=`date +%s`
                 ls -l $DIR/$tdir > /dev/null
                 etime=`date +%s`
                 delta=$((etime - stime))
@@ -4678,7 +4680,7 @@ test_123a() { # was test 123, statahead(bug 11401)
 
                 [ $delta -gt 20 ] && break
                 [ $delta -gt 8 ] && MULT=$((50 / delta))
-                [ "$SLOW" = "no" -a $delta -ge 3 ] && break            
+                [ "$SLOW" = "no" -a $delta -gt 3 ] && break
         done
         log "ls done"