Whamcloud - gitweb
Land b1_8_dir_ra onto HEAD (20080521_1834)
author fanyong <fanyong>
Wed, 21 May 2008 14:20:44 +0000 (14:20 +0000)
committer fanyong <fanyong>
Wed, 21 May 2008 14:20:44 +0000 (14:20 +0000)
b=11401,15405
i=huanghua
i=tappro

25 files changed:
lustre/ChangeLog
lustre/include/obd.h
lustre/include/obd_class.h
lustre/include/obd_support.h
lustre/kernel_patches/patches/vfs_intent-2.6-rhel4.patch
lustre/llite/Makefile.in
lustre/llite/dcache.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/llite_nfs.c
lustre/llite/lproc_llite.c
lustre/llite/namei.c
lustre/llite/rw26.c
lustre/llite/statahead.c [new file with mode: 0644]
lustre/llite/xattr.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/obdclass/lprocfs_status.c
lustre/tests/sanity.sh

index 5b708d2..216a81a 100644 (file)
@@ -161,6 +161,21 @@ Details    : When MGC is disconnected from MGS long enough, MGS will evict the
             of the error messages complaining that MGS is not connected.
 
 Severity   : major
+Bugzilla   : 15027
+Frequency  : on network error
+Description: panic with double free of request on network error
+Details    : mdc_finish_enqueue finishes the request if any network error
+             occurs, but that is correct only for synchronous enqueue; for
+             async enqueue (via ptlrpcd) it is incorrect, because ptlrpcd
+             wants to finish the request itself.
+
+Severity   : enhancement
+Bugzilla   : 11401
+Description: client-side metadata stat-ahead during readdir (directory readahead)
+Details    : perform client-side metadata stat-ahead when the client detects
+             readdir and sequential stat of dir entries therein
+
+Severity   : major
 Frequency  : on start mds
 Bugzilla   : 14884
 Description: Implement get_info(last_id) in obdfilter.
index 40f1caa..710ee2e 100644 (file)
@@ -1006,6 +1006,18 @@ enum obd_cleanup_stage {
 
 struct lu_context;
 
+static inline int it_to_lock_mode(struct lookup_intent *it)
+{
+        /* CREAT needs to be tested before open (both could be set) */
+        if (it->it_op & IT_CREAT)
+                return LCK_CW;
+        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
+                return LCK_CR;
+        LASSERTF(0, "Invalid it_op: %d\n", it->it_op);
+        return -EINVAL;
+}
+
 struct md_op_data {
         struct lu_fid           op_fid1; /* operation fid1 (usualy parent) */
         struct lu_fid           op_fid2; /* operation fid2 (usualy child) */
@@ -1049,6 +1061,22 @@ struct md_op_data {
         __u32                   op_opc;
 };
 
+struct md_enqueue_info;
+/* metadata stat-ahead */
+typedef int (* md_enqueue_cb_t)(struct ptlrpc_request *req,
+                                struct md_enqueue_info *minfo,
+                                int rc);
+
+struct md_enqueue_info {
+        struct md_op_data       mi_data;
+        struct lookup_intent    mi_it;
+        struct lustre_handle    mi_lockh;
+        struct dentry          *mi_dentry;
+        md_enqueue_cb_t         mi_cb;
+        unsigned int            mi_generation;
+        void                   *mi_cbdata;
+};
+
 struct obd_ops {
         struct module *o_owner;
         int (*o_iocontrol)(unsigned int cmd, struct obd_export *exp, int len,
@@ -1368,6 +1396,14 @@ struct md_ops {
                                  struct obd_capa *, __u32,
                                  struct ptlrpc_request **);
 
+        int (*m_intent_getattr_async)(struct obd_export *,
+                                      struct md_enqueue_info *,
+                                      struct ldlm_enqueue_info *);
+
+        int (*m_revalidate_lock)(struct obd_export *,
+                                 struct lookup_intent *,
+                                 struct lu_fid *);
+
         /*
          * NOTE: If adding ops, add another LPROCFS_MD_OP_INIT() line to
          * lprocfs_alloc_md_stats() in obdclass/lprocfs_status.c. Also, add a
index 76ce36b..f6ea134 100644 (file)
@@ -332,7 +332,7 @@ do {                                                            \
         if (!OBT((exp)->exp_obd) || !MDP((exp)->exp_obd, op)) { \
                 CERROR("obd_" #op ": dev %s/%d no operation\n", \
                        (exp)->exp_obd->obd_name,                \
-                      (exp)->exp_obd->obd_minor);              \
+                       (exp)->exp_obd->obd_minor);              \
                 RETURN(-EOPNOTSUPP);                            \
         }                                                       \
 } while (0)
@@ -2000,6 +2000,31 @@ static inline int md_renew_capa(struct obd_export *exp, struct obd_capa *ocapa,
         RETURN(rc);
 }
 
+static inline int md_intent_getattr_async(struct obd_export *exp,
+                                          struct md_enqueue_info *minfo,
+                                          struct ldlm_enqueue_info *einfo)
+{
+        int rc;
+        ENTRY;
+        EXP_CHECK_MD_OP(exp, intent_getattr_async);
+        EXP_MD_COUNTER_INCREMENT(exp, intent_getattr_async);
+        rc = MDP(exp->exp_obd, intent_getattr_async)(exp, minfo, einfo);
+        RETURN(rc);
+}
+
+static inline int md_revalidate_lock(struct obd_export *exp,
+                                     struct lookup_intent *it,
+                                     struct lu_fid *fid)
+{
+        int rc;
+        ENTRY;
+        EXP_CHECK_MD_OP(exp, revalidate_lock);
+        EXP_MD_COUNTER_INCREMENT(exp, revalidate_lock);
+        rc = MDP(exp->exp_obd, revalidate_lock)(exp, it, fid);
+        RETURN(rc);
+}
+
+
 /* OBD Metadata Support */
 
 extern int obd_init_caches(void);
index 71c5046..26e5eb7 100644 (file)
@@ -246,6 +246,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 
 #define OBD_FAIL_MDC_REVALIDATE_PAUSE    0x800
 #define OBD_FAIL_MDC_ENQUEUE_PAUSE       0x801
+#define OBD_FAIL_MDC_GETATTR_ENQUEUE     0x803
 
 #define OBD_FAIL_MGS                     0x900
 #define OBD_FAIL_MGS_ALL_REQUEST_NET     0x901
index ee655c3..66e65fb 100644 (file)
@@ -1205,8 +1205,9 @@ diff -rup RH_2_6_9_55.orig/fs/stat.c RH_2_6_9_55/fs/stat.c
        int error;
 +      intent_init(&nd.intent, IT_GETATTR);
 
-       error = user_path_walk(name, &nd);
-       if (!error) {
+-      error = user_path_walk(name, &nd);
++      error = user_path_walk_it(name, &nd);
+       if (!error) {
 -              error = vfs_getattr64(nd.mnt, nd.dentry, stat);
 +              error = vfs_getattr64_it(nd.mnt, nd.dentry, &nd.intent, stat);
                path_release(&nd);
@@ -1218,8 +1219,9 @@ diff -rup RH_2_6_9_55.orig/fs/stat.c RH_2_6_9_55/fs/stat.c
        int error;
 +      intent_init(&nd.intent, IT_GETATTR);
 
-       error = user_path_walk_link(name, &nd);
-       if (!error) {
+-      error = user_path_walk_link(name, &nd);
++      error = user_path_walk_link_it(name, &nd);
+       if (!error) {
 -              error = vfs_getattr64(nd.mnt, nd.dentry, stat);
 +              error = vfs_getattr64_it(nd.mnt, nd.dentry, &nd.intent, stat);
                path_release(&nd);
index 5cfb431..8d02c85 100644 (file)
@@ -2,7 +2,7 @@ MODULES := lustre llite_lloop
 lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o
 lustre-objs += llite_fid.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o
 lustre-objs += xattr.o remote_perm.o llite_rmtacl.o llite_capa.o
-lustre-objs += rw26.o super25.o
+lustre-objs += rw26.o super25.o statahead.o
 
 llite_lloop-objs := lloop.o
 
index 878fd8e..4d8b687 100644 (file)
@@ -120,11 +120,21 @@ void ll_set_dd(struct dentry *de)
         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
                atomic_read(&de->d_count));
-        lock_kernel();
+
         if (de->d_fsdata == NULL) {
-                OBD_ALLOC(de->d_fsdata, sizeof(struct ll_dentry_data));
+                struct ll_dentry_data *lld;
+
+                OBD_ALLOC_PTR(lld);
+                if (likely(lld != NULL)) {
+                        cfs_waitq_init(&lld->lld_waitq);
+                        lock_dentry(de);
+                        if (likely(de->d_fsdata == NULL))
+                                de->d_fsdata = lld;
+                        else
+                                OBD_FREE_PTR(lld);
+                        unlock_dentry(de);
+                }
         }
-        unlock_kernel();
 
         EXIT;
 }
@@ -332,12 +342,12 @@ void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
 int ll_revalidate_it(struct dentry *de, int lookup_flags,
                      struct lookup_intent *it)
 {
-        int rc;
         struct md_op_data *op_data;
         struct ptlrpc_request *req = NULL;
         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
         struct obd_export *exp;
         struct inode *parent;
+        int rc, first = 0;
 
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
@@ -359,7 +369,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
 
                 rc = ll_have_md_lock(de->d_parent->d_inode,
                                      MDS_INODELOCK_UPDATE);
-                RETURN(rc);
+                GOTO(out_sa, rc);
         }
 
         exp = ll_i2mdexp(de->d_inode);
@@ -367,12 +377,12 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
         /* Never execute intents for mount points.
          * Attributes will be fixed up in ll_inode_revalidate_it */
         if (d_mountpoint(de))
-                RETURN(1);
+                GOTO(out_sa, rc = 1);
 
         /* Root of the lustre tree. Always valid.
          * Attributes will be fixed up in ll_inode_revalidate_it */
         if (de == de->d_sb->s_root)
-                RETURN(1);
+                GOTO(out_sa, rc = 1);
 
         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
         ll_frob_intent(&it, &lookup_it);
@@ -434,6 +444,9 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
                 }
         }
 
+        if (it->it_op == IT_GETATTR)
+                first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
+
 do_lock:
         it->it_create_mode &= ~current->fs->umask;
         it->it_flags |= O_CHECK_STALE;
@@ -442,6 +455,9 @@ do_lock:
                             &req, ll_md_blocking_ast, 0);
         it->it_flags &= ~O_CHECK_STALE;
         ll_finish_md_op_data(op_data);
+        if (it->it_op == IT_GETATTR && !first)
+                ll_statahead_exit(de, rc);
+
         /* If req is NULL, then md_intent_lock only tried to do a lock match;
          * if all was well, it will return 1 if it found locks, 0 otherwise. */
         if (req == NULL && rc >= 0) {
@@ -564,6 +580,19 @@ do_lookup:
         }
         ll_finish_md_op_data(op_data);
         GOTO(out, rc = 0);
+
+out_sa:
+        /*
+         * For rc == 1 case, should not return directly to prevent losing
+         * statahead windows; for rc == 0 case, the "lookup" will be done later.
+         */
+        if (it && it->it_op == IT_GETATTR && rc == 1) {
+                first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
+                if (!first)
+                        ll_statahead_exit(de, rc);
+        }
+
+        return rc;
 }
 
 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
@@ -747,3 +776,45 @@ struct dentry_operations ll_d_ops = {
         .d_unpin = ll_unpin,
 #endif
 };
+
+static int ll_fini_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
+{
+        ENTRY;
+        /* need lookup */
+        RETURN(0);
+}
+
+struct dentry_operations ll_fini_d_ops = {
+        .d_revalidate = ll_fini_revalidate_nd,
+        .d_release = ll_release,
+};
+
+/*
+ * It is for the following race condition:
+ * When someone (maybe the statahead thread) adds the dentry to the dentry hash
+ * table, the dentry's "d_op" may be NULL; at the same time, another (maybe
+ * "ls -l") process finds that dentry via "do_lookup()" with no "do_revalidate()"
+ * call. This loses the statahead window and may cause other issues. --Fan Yong
+ */
+static int ll_init_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
+{
+        struct l_wait_info lwi = { 0 };
+        struct ll_dentry_data *lld;
+        ENTRY;
+
+        ll_set_dd(dentry);
+        lld = ll_d2d(dentry);
+        if (unlikely(lld == NULL))
+                RETURN(-ENOMEM);
+
+        l_wait_event(lld->lld_waitq, dentry->d_op != &ll_init_d_ops, &lwi);
+        if (likely(dentry->d_op == &ll_d_ops))
+                RETURN(ll_revalidate_nd(dentry, nd));
+        else
+                RETURN(dentry->d_op == &ll_fini_d_ops ? 0 : -EINVAL);
+}
+
+struct dentry_operations ll_init_d_ops = {
+        .d_revalidate = ll_init_revalidate_nd,
+        .d_release = ll_release,
+};
index f88f514..dc8a211 100644 (file)
@@ -27,7 +27,6 @@
  */
 
 #include <linux/fs.h>
-#include <linux/ext2_fs.h>
 #include <linux/pagemap.h>
 #include <linux/mm.h>
 #include <linux/version.h>
@@ -144,7 +143,7 @@ static int ll_dir_readpage(struct file *file, struct page *page)
         int rc;
         ENTRY;
 
-        hash = hash_x_index(page->index);
+        hash = (__u64)hash_x_index(page->index);
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n",
                inode->i_ino, inode->i_generation, inode, (unsigned long)hash);
 
@@ -175,32 +174,33 @@ struct address_space_operations ll_dir_aops = {
         .readpage  = ll_dir_readpage,
 };
 
-static inline unsigned long dir_pages(struct inode *inode)
-{
-        return (i_size_read(inode) + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
-}
-
-static inline unsigned ll_chunk_size(struct inode *inode)
-{
-        return inode->i_sb->s_blocksize;
-}
-
 static void ll_check_page(struct inode *dir, struct page *page)
 {
         /* XXX: check page format later */
         SetPageChecked(page);
 }
 
-static inline void ll_put_page(struct page *page)
+static void ll_release_page(struct page *page, __u64 hash,
+                            __u64 start, __u64 end)
 {
         kunmap(page);
+        lock_page(page);
+        if (likely(page->mapping != NULL)) {
+                ll_truncate_complete_page(page);
+                unlock_page(page);
+        } else {
+                unlock_page(page);
+                CWARN("NULL mapping page %p, truncated by others: "
+                      "hash(%#llx) | start(%#llx) | end(%#llx)\n",
+                      page, hash, start, end);
+        }
         page_cache_release(page);
 }
 
 /*
  * Find, kmap and return page that contains given hash.
  */
-static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
+static struct page *ll_dir_page_locate(struct inode *dir, __u64 hash,
                                        __u64 *start, __u64 *end)
 {
         struct address_space *mapping = dir->i_mapping;
@@ -209,17 +209,17 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
          * radix_tree_gang_lookup() can be used to find a page with starting
          * hash _smaller_ than one we are looking for.
          */
-        unsigned long offset = hash_x_index(hash);
+        unsigned long offset = hash_x_index((__u32)hash);
         struct page *page;
         int found;
 
         TREE_READ_LOCK_IRQ(mapping);
-       found = radix_tree_gang_lookup(&mapping->page_tree,
+        found = radix_tree_gang_lookup(&mapping->page_tree,
                                        (void **)&page, offset, 1);
-       if (found > 0) {
+        if (found > 0) {
                 struct lu_dirpage *dp;
 
-               page_cache_get(page);
+                page_cache_get(page);
                 TREE_READ_UNLOCK_IRQ(mapping);
                 /*
                  * In contrast to find_lock_page() we are sure that directory
@@ -236,11 +236,7 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
                         *end   = le64_to_cpu(dp->ldp_hash_end);
                         LASSERT(*start <= hash);
                         if (hash > *end || (*end != *start && hash == *end)) {
-                                kunmap(page);
-                                lock_page(page);
-                                ll_truncate_complete_page(page);
-                                unlock_page(page);
-                                page_cache_release(page);
+                                ll_release_page(page, hash, *start, *end);
                                 page = NULL;
                         }
                 } else {
@@ -248,15 +244,15 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
                         page = ERR_PTR(-EIO);
                 }
 
-       } else {
+        } else {
                 TREE_READ_UNLOCK_IRQ(mapping);
                 page = NULL;
         }
         return page;
 }
 
-static struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact,
-                                    struct ll_dir_chain *chain)
+struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact,
+                             struct ll_dir_chain *chain)
 {
         ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} };
         struct address_space *mapping = dir->i_mapping;
@@ -278,7 +274,7 @@ static struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact,
                 struct ptlrpc_request *request;
                 struct md_op_data *op_data;
 
-                op_data = ll_prep_md_op_data(NULL, dir, NULL, NULL, 0, 0, 
+                op_data = ll_prep_md_op_data(NULL, dir, NULL, NULL, 0, 0,
                                              LUSTRE_OPC_ANY, NULL);
                 if (IS_ERR(op_data))
                         return (void *)op_data;
@@ -328,17 +324,15 @@ static struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact,
                          * entries with smaller hash values. Stale page should
                          * be invalidated, and new one fetched.
                          */
-                        CWARN("Stale readpage page %p: %#lx != %#lx\n", page,
-                              (unsigned long)hash, (unsigned long)start);
-                        lock_page(page);
-                        ll_truncate_complete_page(page);
-                        unlock_page(page);
-                        page_cache_release(page);
-                } else
+                        CWARN("Stale readpage page %p: %#llx != %#llx\n",
+                              page, hash, start);
+                        ll_release_page(page, hash, start, end);
+                } else {
                         GOTO(hash_collision, page);
+                }
         }
 
-        page = read_cache_page(mapping, hash_x_index(hash),
+        page = read_cache_page(mapping, hash_x_index((__u32)hash),
                                (filler_t*)mapping->a_ops->readpage, NULL);
         if (IS_ERR(page))
                 GOTO(out_unlock, page);
@@ -411,9 +405,9 @@ int ll_readdir(struct file *filp, void *cookie, filldir_t filldir)
                 struct lu_dirent  *ent;
 
                 if (!IS_ERR(page)) {
-                        /* 
+                        /*
                          * If page is empty (end of directoryis reached),
-                         * use this value. 
+                         * use this value.
                          */
                         __u64 hash = DIR_END_OFF;
                         __u64 next;
@@ -610,8 +604,8 @@ end:
         return rc;
 }
 
-int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmmp, 
-                     int *lmm_size, struct ptlrpc_request **request) 
+int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmmp,
+                     int *lmm_size, struct ptlrpc_request **request)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct mdt_body   *body;
@@ -619,7 +613,7 @@ int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmmp,
         struct ptlrpc_request *req = NULL;
         int rc, lmmsize;
         struct obd_capa *oc;
-        
+
         rc = ll_get_max_mdsize(sbi, &lmmsize);
         if (rc)
                 RETURN(rc);
@@ -768,7 +762,7 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file,
                         if (IS_ERR(filename))
                                 RETURN(PTR_ERR(filename));
 
-                        rc = ll_lov_getstripe_ea_info(inode, filename, &lmm, 
+                        rc = ll_lov_getstripe_ea_info(inode, filename, &lmm,
                                                       &lmmsize, &request);
                 } else {
                         rc = ll_dir_getstripe(inode, &lmm, &lmmsize, &request);
@@ -783,7 +777,7 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file,
                 }
 
                 if (rc < 0) {
-                        if (rc == -ENODATA && (cmd == IOC_MDC_GETFILEINFO || 
+                        if (rc == -ENODATA && (cmd == IOC_MDC_GETFILEINFO ||
                                                cmd == LL_IOC_MDC_GETINFO))
                                 GOTO(skip_lmm, rc = 0);
                         else
index 32360dc..7d1765a 100644 (file)
@@ -300,11 +300,17 @@ int ll_file_release(struct inode *inode, struct file *file)
         }
 #endif
 
-        ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
+        if (inode->i_sb->s_root != file->f_dentry)
+                ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
         fd = LUSTRE_FPRIVATE(file);
         LASSERT(fd != NULL);
 
-        /* don't do anything for / */
+        /* The last ref on @file may not belong to the owner pid of statahead.
+         * Different processes can open the same dir; "ll_opendir_key" means:
+         * it is me that should stop the statahead thread. */
+        if (lli->lli_opendir_key == fd)
+                ll_stop_statahead(inode, fd);
+
         if (inode->i_sb->s_root == file->f_dentry) {
                 LUSTRE_FPRIVATE(file) = NULL;
                 ll_file_data_put(fd);
@@ -329,6 +335,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
         struct md_op_data *op_data;
         struct ptlrpc_request *req;
         int rc;
+        ENTRY;
 
         if (!parent)
                 RETURN(-ENOENT);
@@ -465,7 +472,7 @@ int ll_file_open(struct inode *inode, struct file *file)
         struct obd_client_handle **och_p;
         __u64 *och_usecount;
         struct ll_file_data *fd;
-        int rc = 0;
+        int rc = 0, opendir_set = 0;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
@@ -482,7 +489,29 @@ int ll_file_open(struct inode *inode, struct file *file)
         if (fd == NULL)
                 RETURN(-ENOMEM);
 
-        /* don't do anything for / */
+        if (S_ISDIR(inode->i_mode)) {
+                spin_lock(&lli->lli_lock);
+                /* "lli->lli_opendir_pid != 0" means someone has set it.
+                 * "lli->lli_sai != NULL" means the previous statahead has not
+                 *                        been cleaned up. */
+                if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
+                        opendir_set = 1;
+                        lli->lli_opendir_pid = cfs_curproc_pid();
+                        lli->lli_opendir_key = fd;
+                } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
+                        /* Two cases for this:
+                         * (1) The same process open such directory many times.
+                         * (2) The old process opened the directory, and exited
+                         *     before its children processes. Then new process
+                         *     with the same pid opens such directory before the
+                         *     old process's children processes exit.
+                         * Change the owner to the latest one. */
+                        opendir_set = 2;
+                        lli->lli_opendir_key = fd;
+                }
+                spin_unlock(&lli->lli_lock);
+        }
+
         if (inode->i_sb->s_root == file->f_dentry) {
                 LUSTRE_FPRIVATE(file) = fd;
                 RETURN(0);
@@ -632,9 +661,13 @@ out_och_free:
                         (*och_usecount)--;
                 }
                 up(&lli->lli_och_sem);
-out_openerr: ;/* Looks weierd, eh? Just wait for statahead code to insert
-                a statement here <-- remove this comment after statahead
-                landing */
+out_openerr:
+                if (opendir_set == 1) {
+                        lli->lli_opendir_key = NULL;
+                        lli->lli_opendir_pid = 0;
+                } else if (unlikely(opendir_set == 2)) {
+                        ll_stop_statahead(inode, fd);
+                }
         }
 
         return rc;
@@ -686,7 +719,8 @@ int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
                lli->lli_smd->lsm_object_id, i_size_read(inode),
-               (unsigned long long)inode->i_blocks, ll_inode_blksize(inode));
+               (unsigned long long)inode->i_blocks,
+               (unsigned long)ll_inode_blksize(inode));
         RETURN(0);
 }
 
index 6aed986..3b73342 100644 (file)
@@ -5,7 +5,7 @@
 #ifndef LLITE_INTERNAL_H
 #define LLITE_INTERNAL_H
 
-# include <linux/lustre_acl.h>
+#include <linux/lustre_acl.h>
 
 #ifdef CONFIG_FS_POSIX_ACL
 # include <linux/fs.h>
@@ -42,11 +42,13 @@ struct ll_dentry_data {
         struct obd_client_handle lld_cwd_och;
         struct obd_client_handle lld_mnt_och;
 #ifndef HAVE_VFS_INTENT_PATCHES
-        struct lookup_intent     *lld_it;
+        struct lookup_intent    *lld_it;
 #endif
+        unsigned int             lld_sa_generation;
+        cfs_waitq_t              lld_waitq;
 };
 
-#define ll_d2d(de) ((struct ll_dentry_data*) de->d_fsdata)
+#define ll_d2d(de) ((struct ll_dentry_data*)((de)->d_fsdata))
 
 extern struct file_operations ll_pgcache_seq_fops;
 
@@ -141,6 +143,19 @@ struct ll_inode_info {
         atomic_t                lli_open_count;
         struct obd_capa        *lli_mds_capa;
         struct list_head        lli_oss_capas;
+
+        /* metadata stat-ahead */
+        /*
+         * "opendir_pid" is the token used at lookup/revalidate -- I am the
+         * owner of the dir statahead.
+         */
+        pid_t                   lli_opendir_pid;
+        /*
+         * Since parent-child threads can share the same @file struct,
+         * "opendir_key" is the token at dir close for the case where the parent
+         * exits before the child -- I should clean up the dir readahead. */
+        void                   *lli_opendir_key;
+        struct ll_statahead_info *lli_sai;
 };
 
 /*
@@ -325,6 +340,18 @@ struct ll_sb_info {
         int                       ll_stats_track_id;
         int                       ll_rw_stats_on;
 
+        /* metadata stat-ahead */
+        unsigned int              ll_sa_max;     /* max statahead RPCs */
+        unsigned int              ll_sa_wrong;   /* statahead thread stopped for
+                                                  * low hit ratio */
+        unsigned int              ll_sa_total;   /* statahead thread started
+                                                  * count */
+        unsigned long long        ll_sa_blocked; /* ls count waiting for
+                                                  * statahead */
+        unsigned long long        ll_sa_cached;  /* ls count got in cache */
+        unsigned long long        ll_sa_hit;     /* hit count */
+        unsigned long long        ll_sa_miss;    /* miss count */
+
         dev_t                     ll_sdev_orig; /* save s_dev before assign for
                                                  * clustred nfs */
         struct rmtacl_ctl_table   ll_rct;
@@ -529,21 +556,30 @@ static void lprocfs_llite_init_vars(struct lprocfs_static_vars *lvars)
 
 
 /* llite/dir.c */
+static inline void ll_put_page(struct page *page)
+{
+        kunmap(page);
+        page_cache_release(page);
+}
+
 extern struct file_operations ll_dir_operations;
 extern struct inode_operations ll_dir_inode_operations;
-
+struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact,
+                             struct ll_dir_chain *chain);
 /* llite/namei.c */
 int ll_objects_destroy(struct ptlrpc_request *request,
                        struct inode *dir);
 struct inode *ll_iget(struct super_block *sb, ino_t hash,
                       struct lustre_md *lic);
-struct dentry *ll_find_alias(struct inode *, struct dentry *);
 int ll_md_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
                        void *data, int flag);
 #ifndef HAVE_VFS_INTENT_PATCHES
 struct lookup_intent *ll_convert_intent(struct open_intent *oit,
                                         int lookup_flags);
 #endif
+int ll_lookup_it_finish(struct ptlrpc_request *request,
+                        struct lookup_intent *it, void *data);
+void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
 
 /* llite/rw.c */
 int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to);
@@ -621,6 +657,9 @@ int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmm,
                      int *lmm_size, struct ptlrpc_request **request);
 
 /* llite/dcache.c */
+extern struct dentry_operations ll_init_d_ops;
+extern struct dentry_operations ll_d_ops;
+extern struct dentry_operations ll_fini_d_ops;
 void ll_intent_drop_lock(struct lookup_intent *);
 void ll_intent_release(struct lookup_intent *);
 int ll_drop_dentry(struct dentry *dentry);
@@ -856,6 +895,93 @@ void et_init(struct eacl_table *et);
 void et_fini(struct eacl_table *et);
 #endif
 
+/* statahead.c
+ *
+ * Bounds used for the statahead window size. */
+
+#define LL_SA_RPC_MIN   2       /* initial sai_max given by ll_sai_alloc() */
+#define LL_SA_RPC_DEF   32      /* default ll_sa_max set in ll_init_sbi() */
+#define LL_SA_RPC_MAX   8192    /* largest value the statahead_max proc
+                                 * write handler accepts */
+
+/* Per-inode statahead state, for directories only. It is reached through
+ * lli_sai and freed by the ll_sai_put() that drops the last reference. */
+struct ll_statahead_info {
+        struct inode           *sai_inode;      /* inode this state belongs to;
+                                                 * ll_sai_put() does iput() on
+                                                 * it when freeing */
+        unsigned int            sai_generation; /* generation for statahead */
+        atomic_t                sai_refcount;   /* hold a reference while
+                                                 * accessing this struct */
+        unsigned int            sai_sent;       /* stat requests sent count */
+        unsigned int            sai_replied;    /* stat requests which received
+                                                 * a reply */
+        unsigned int            sai_max;        /* max ahead of lookup */
+        unsigned int            sai_index;      /* index of statahead entry */
+        unsigned int            sai_hit;        /* hit count */
+        unsigned int            sai_miss;       /* miss count:
+                                                 * for the "ls -al" case it
+                                                 * includes hidden dentry
+                                                 * misses;
+                                                 * for the "ls -l" case it does
+                                                 * not include hidden dentry
+                                                 * misses. "sai_miss_hidden" is
+                                                 * used for the latter case.
+                                                 */
+        unsigned int            sai_consecutive_miss; /* consecutive miss */
+        unsigned int            sai_miss_hidden;/* "ls -al", but first dentry
+                                                 * is not a hidden one */
+        unsigned int            sai_skip_hidden;/* skipped hidden dentry count */
+        unsigned int            sai_ls_all:1;   /* "ls -al", do stat-ahead for
+                                                 * hidden entries */
+        cfs_waitq_t             sai_waitq;      /* stat-ahead wait queue */
+        struct ptlrpc_thread    sai_thread;     /* stat-ahead thread */
+        struct list_head        sai_entries;    /* stat-ahead entries */
+};
+
+int do_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup);
+void ll_statahead_exit(struct dentry *dentry, int result);
+void ll_stop_statahead(struct inode *inode, void *key);
+
+/* Wake up everybody sleeping on the dentry's lld_waitq.
+ * The dentry must no longer use ll_init_d_ops (asserted). A dentry that has
+ * no ll_dentry_data attached is left alone. */
+static inline
+void ll_d_wakeup(struct dentry *dentry)
+{
+        struct ll_dentry_data *dd = ll_d2d(dentry);
+
+        LASSERT(dentry->d_op != &ll_init_d_ops);
+
+        if (dd == NULL)
+                return;
+
+        cfs_waitq_broadcast(&dd->lld_waitq);
+}
+
+/*
+ * Gate in front of do_statahead_enter(): decide whether this lookup or
+ * revalidate of *dentryp may interact with the statahead of @dir.
+ *
+ * Returns
+ *   -ENOTSUPP  statahead is switched off (ll_sa_max == 0),
+ *   -EBADF     the caller is not the process recorded in lli_opendir_pid,
+ *   -EAGAIN    the dentry already carries the current statahead generation,
+ *   otherwise  whatever do_statahead_enter() returns.
+ */
+static inline
+int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
+{
+        struct ll_sb_info        *sbi = ll_i2sbi(dir);
+        struct ll_inode_info     *info = ll_i2info(dir);
+        struct ll_dentry_data    *dd = ll_d2d(*dentryp);
+
+        /* statahead disabled for this mount */
+        if (sbi->ll_sa_max == 0)
+                return -ENOTSUPP;
+
+        /* only the process that opened the directory drives statahead */
+        if (cfs_curproc_pid() != info->lli_opendir_pid)
+                return -EBADF;
+
+        /*
+         * A single "ls" of one dentry triggers several "revalidate" or
+         * "lookup" calls (for "getattr", for "getxattr", maybe for others).
+         * On a patchless client the operation intent is not accurate, e.g.
+         * the revalidate for "getattr" and the one for "getxattr" may both
+         * arrive as IT_GETATTR, which would mislead the statahead thread.
+         * Each dentry must interact with the statahead thread only once,
+         * otherwise the statahead window gets confused.
+         * Therefore "lld_sa_generation" is assigned "sai_generation" the
+         * first time a dentry sees IT_GETATTR, and every later IT_GETATTR
+         * is recognised by the equality below and bypasses statahead.
+         */
+        if (dd != NULL && info->lli_sai != NULL) {
+                if (dd->lld_sa_generation == info->lli_sai->sai_generation)
+                        return -EAGAIN;
+        }
+
+        return do_statahead_enter(dir, dentryp, lookup);
+}
+
 /* llite ioctl register support rountine */
 #ifdef __KERNEL__
 enum llioc_iter {
index e5f2380..aa14d11 100644 (file)
@@ -106,6 +106,9 @@ static struct ll_sb_info *ll_init_sbi(void)
                 spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_w_hist.oh_lock);
         }
 
+        /* metadata statahead is enabled by default */
+        sbi->ll_sa_max = LL_SA_RPC_DEF;
+
         RETURN(sbi);
 }
 
@@ -1105,6 +1108,13 @@ void ll_clear_inode(struct inode *inode)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
 
+        if (S_ISDIR(inode->i_mode)) {
+                /* these should have been cleared in ll_file_release */
+                LASSERT(lli->lli_sai == NULL);
+                LASSERT(lli->lli_opendir_key == NULL);
+                LASSERT(lli->lli_opendir_pid == 0);
+        }
+
         ll_i2info(inode)->lli_flags &= ~LLIF_MDS_SIZE_LOCK;
         md_change_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
                          null_if_equal, inode);
@@ -2239,6 +2249,7 @@ struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data,
                 op_data->op_capa2 = ll_mdscapa_get(i2);
         } else {
                 fid_zero(&op_data->op_fid2);
+                op_data->op_capa2 = NULL;
         }
 
         op_data->op_name = name;
index 8892862..61f06d6 100644 (file)
@@ -78,8 +78,6 @@ static struct inode *search_inode_for_lustre(struct super_block *sb,
         RETURN(inode);
 }
 
-extern struct dentry_operations ll_d_ops;
-
 static struct dentry *ll_iget_for_nfs(struct super_block *sb,
                                       struct lu_fid *fid,
                                       umode_t mode)
@@ -109,8 +107,20 @@ static struct dentry *ll_iget_for_nfs(struct super_block *sb,
                 iput(inode);
                 RETURN(ERR_PTR(-ENOMEM));
         }
+
         ll_set_dd(result);
-        result->d_op = &ll_d_ops;
+
+        lock_dentry(result);
+        if (unlikely(result->d_op == &ll_init_d_ops)) {
+                result->d_op = &ll_d_ops;
+                unlock_dentry(result);
+                smp_wmb();
+                ll_d_wakeup(result);
+        } else {
+                result->d_op = &ll_d_ops;
+                unlock_dentry(result);
+        }
+
         RETURN(result);
 }
 
index 50a476e..4e785a3 100644 (file)
@@ -460,6 +460,56 @@ static int ll_wr_track_gid(struct file *file, const char *buffer,
         return (ll_wr_track_id(buffer, count, data, STATS_TRACK_GID));
 }
 
+/* proc read handler for "statahead_max": print the mount's ll_sa_max
+ * followed by a newline into @page (at most @count bytes). */
+static int ll_rd_statahead_max(char *page, char **start, off_t off,
+                               int count, int *eof, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+        int len;
+
+        len = snprintf(page, count, "%u\n", sbi->ll_sa_max);
+
+        return len;
+}
+
+/* proc write handler for "statahead_max".
+ * Parses an integer from @buffer; a value in [0, LL_SA_RPC_MAX] becomes the
+ * new ll_sa_max, anything else is only reported through CERROR and the old
+ * setting is kept. Returns the parse error from lprocfs_write_helper(), or
+ * @count in every other case (including a rejected value). */
+static int ll_wr_statahead_max(struct file *file, const char *buffer,
+                               unsigned long count, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+        int val;
+        int rc;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        if (val < 0 || val > LL_SA_RPC_MAX)
+                CERROR("Bad statahead_max value %d. Valid values are in the "
+                       "range [0, %d]\n", val, LL_SA_RPC_MAX);
+        else
+                sbi->ll_sa_max = val;
+
+        return count;
+}
+
+/* proc read handler for "statahead_stats": dump the per-mount statahead
+ * counters kept in ll_sb_info, one "label: value" line per counter. */
+static int ll_rd_statahead_stats(char *page, char **start, off_t off,
+                                 int count, int *eof, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+        int len;
+
+        len = snprintf(page, count,
+                       "statahead wrong: %u\n"
+                       "statahead total: %u\n"
+                       "ls blocked:      %llu\n"
+                       "ls cached:       %llu\n"
+                       "hit count:       %llu\n"
+                       "miss count:      %llu\n",
+                       sbi->ll_sa_wrong, sbi->ll_sa_total,
+                       sbi->ll_sa_blocked, sbi->ll_sa_cached,
+                       sbi->ll_sa_hit, sbi->ll_sa_miss);
+
+        return len;
+}
+
 static int ll_rd_contention_time(char *page, char **start, off_t off,
                                  int count, int *eof, void *data)
 {
@@ -523,6 +573,8 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "stats_track_pid",  ll_rd_track_pid, ll_wr_track_pid, 0 },
         { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
         { "stats_track_gid",  ll_rd_track_gid, ll_wr_track_gid, 0 },
+        { "statahead_max",    ll_rd_statahead_max, ll_wr_statahead_max, 0 },
+        { "statahead_stats",  ll_rd_statahead_stats, 0, 0 },
         { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0},
         { "lockless_truncate", ll_rd_lockless_truncate,
                                ll_wr_lockless_truncate, 0},
index bfe181f..2631bda 100644 (file)
@@ -37,9 +37,6 @@
 #include <lustre_mdc.h>
 #include "llite_internal.h"
 
-/* methods */
-extern struct dentry_operations ll_d_ops;
-
 /*
  * Check if we have something mounted at the named dchild.
  * In such a case there would always be dentry present.
@@ -317,7 +314,7 @@ static void ll_d_add(struct dentry *de, struct inode *inode)
  * in ll_revalidate_it.  After revaliadate inode will be have hashed aliases
  * and it triggers BUG_ON in d_instantiate_unique (bug #10954).
  */
-struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
+static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
 {
         struct list_head *tmp;
         struct dentry *dentry;
@@ -387,25 +384,58 @@ struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
         return de;
 }
 
-static int lookup_it_finish(struct ptlrpc_request *request,
-                            struct lookup_intent *it,
-                            void *data)
+/* Under the dentry lock, switch @de to ll_init_d_ops unless it already uses
+ * ll_d_ops. *set is written (to 1) only when the switch happened; it is
+ * never cleared here, so the caller must initialise it. */
+static inline void ll_dop_init(struct dentry *de, int *set)
+{
+        lock_dentry(de);
+        if (unlikely(de->d_op == &ll_d_ops)) {
+                /* already fully set up, nothing to mark */
+                unlock_dentry(de);
+                return;
+        }
+
+        de->d_op = &ll_init_d_ops;
+        *set = 1;
+        unlock_dentry(de);
+}
+
+/* Leave the state entered by ll_dop_init().
+ * With @succ set the dentry always ends up with ll_d_ops. Without it, a
+ * dentry still on ll_init_d_ops is moved to ll_fini_d_ops and any other
+ * d_op is left untouched. If the dentry was on ll_init_d_ops, waiters are
+ * woken through ll_d_wakeup() after the dentry lock has been dropped. */
+static inline void ll_dop_fini(struct dentry *de, int succ)
+{
+        int was_init;
+
+        lock_dentry(de);
+        was_init = likely(de->d_op == &ll_init_d_ops);
+        if (succ)
+                de->d_op = &ll_d_ops;
+        else if (was_init)
+                de->d_op = &ll_fini_d_ops;
+        unlock_dentry(de);
+
+        if (was_init) {
+                /* publish the new d_op before waking the sleepers */
+                smp_wmb();
+                ll_d_wakeup(de);
+        }
+}
+
+int ll_lookup_it_finish(struct ptlrpc_request *request,
+                     struct lookup_intent *it, void *data)
 {
         struct it_cb_data *icbd = data;
         struct dentry **de = icbd->icbd_childp;
         struct inode *parent = icbd->icbd_parent;
         struct ll_sb_info *sbi = ll_i2sbi(parent);
         struct inode *inode = NULL;
-        int rc;
+        int set = 0, rc;
+        ENTRY;
+
+        ll_dop_init(*de, &set);
 
         /* NB 1 request reference will be taken away by ll_intent_lock()
          * when I return */
         if (!it_disposition(it, DISP_LOOKUP_NEG)) {
-                ENTRY;
+                struct dentry *save = *de;
 
                 rc = ll_prep_inode(&inode, request, (*de)->d_sb);
-                if (rc)
+                if (rc) {
+                        if (set)
+                                ll_dop_fini(*de, 0);
                         RETURN(rc);
+                }
 
                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
                        inode, inode->i_ino, inode->i_generation);
@@ -422,8 +452,9 @@ static int lookup_it_finish(struct ptlrpc_request *request,
                    Also see bug 7198. */
 
                 *de = ll_find_alias(inode, *de);
+                if (set && *de != save)
+                        ll_dop_fini(save, 0);
         } else {
-                ENTRY;
                 /* Check that parent has UPDATE lock. If there is none, we
                    cannot afford to hash this dentry (done by ll_d_add) as it
                    might get picked up later when UPDATE lock will appear */
@@ -444,7 +475,8 @@ static int lookup_it_finish(struct ptlrpc_request *request,
         }
 
         ll_set_dd(*de);
-        (*de)->d_op = &ll_d_ops;
+
+        ll_dop_fini(*de, 1);
 
         RETURN(0);
 }
@@ -482,6 +514,15 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
                         RETURN(ERR_PTR(rc));
         }
 
+        if (it->it_op == IT_GETATTR) {
+                rc = ll_statahead_enter(parent, &dentry, 1);
+                if (rc >= 0) {
+                        ll_statahead_exit(dentry, rc);
+                        if (rc == 1)
+                                RETURN(retval = dentry);
+                }
+        }
+
         icbd.icbd_childp = &dentry;
         icbd.icbd_parent = parent;
 
@@ -505,7 +546,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
         if (rc < 0)
                 GOTO(out, retval = ERR_PTR(rc));
 
-        rc = lookup_it_finish(req, it, &icbd);
+        rc = ll_lookup_it_finish(req, it, &icbd);
         if (rc != 0) {
                 ll_intent_release(it);
                 GOTO(out, retval = ERR_PTR(rc));
index 9818ec4..7cbc293 100644 (file)
@@ -76,7 +76,7 @@ static void ll_invalidatepage(struct page *page, unsigned long offset)
                 ll_removepage(page);
 }
 #endif
-static int ll_releasepage(struct page *page, int gfp_mask)
+static int ll_releasepage(struct page *page, gfp_t gfp_mask)
 {
         if (PagePrivate(page))
                 ll_removepage(page);
diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c
new file mode 100644 (file)
index 0000000..c2780ca
--- /dev/null
@@ -0,0 +1,963 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2007 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <linux/fs.h>
+#include <linux/sched.h>
+#include <linux/mm.h>
+#include <linux/smp_lock.h>
+#include <linux/highmem.h>
+#include <linux/pagemap.h>
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include <obd_support.h>
+#include <lustre_lite.h>
+#include <lustre_dlm.h>
+#include <linux/lustre_version.h>
+#include "llite_internal.h"
+
+struct ll_sai_entry {
+        struct list_head        se_list;        /* linkage on sai_entries */
+        unsigned int            se_index;       /* sai_index value the entry
+                                                 * was created with */
+        int                     se_stat;        /* SA_ENTRY_* state, or the
+                                                 * non-zero rc stored by
+                                                 * ll_statahead_one() */
+};
+
+enum {
+        SA_ENTRY_UNSTATED = 0,  /* entry queued, no result recorded yet */
+        SA_ENTRY_STATED         /* set by ll_statahead_interpret() once the
+                                 * reply for the entry has been handled */
+};
+
+static unsigned int sai_generation = 0; /* last generation handed out by
+                                         * ll_sai_alloc(); 0 is skipped */
+static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED; /* serialises
+                                         * updates of sai_generation */
+
+/* Allocate and initialise a statahead descriptor.
+ * The new object gets a fresh non-zero generation, a reference count of 1,
+ * sai_max = LL_SA_RPC_MIN, initialised wait queues and an empty entry list.
+ * sai_inode is not set here. Returns NULL when the allocation fails. */
+static struct ll_statahead_info *ll_sai_alloc(void)
+{
+        struct ll_statahead_info *new_sai;
+
+        OBD_ALLOC_PTR(new_sai);
+        if (new_sai == NULL)
+                return NULL;
+
+        /* generation 0 is never handed out: step over it on wrap-around */
+        spin_lock(&sai_generation_lock);
+        if (unlikely(++sai_generation == 0))
+                ++sai_generation;
+        new_sai->sai_generation = sai_generation;
+        spin_unlock(&sai_generation_lock);
+
+        atomic_set(&new_sai->sai_refcount, 1);
+        new_sai->sai_max = LL_SA_RPC_MIN;
+        cfs_waitq_init(&new_sai->sai_waitq);
+        cfs_waitq_init(&new_sai->sai_thread.t_ctl_waitq);
+        CFS_INIT_LIST_HEAD(&new_sai->sai_entries);
+
+        return new_sai;
+}
+
+static inline 
+struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
+{       /* take one more reference on @sai and return the same pointer */
+        LASSERT(sai);                   /* a NULL sai is a caller bug */
+        atomic_inc(&sai->sai_refcount); /* dropped again by ll_sai_put() */
+        return sai;
+}
+
+/* Drop one reference on @sai.
+ * When the count reaches zero (checked together with taking lli_lock) the
+ * directory's lli_sai pointer is cleared, every entry still queued on
+ * sai_entries is freed, @sai itself is freed and iput() is called on the
+ * directory inode. The statahead thread must already be SVC_STOPPED then. */
+static void ll_sai_put(struct ll_statahead_info *sai)
+{
+        struct inode         *dir = sai->sai_inode;
+        struct ll_inode_info *lli = ll_i2info(dir);
+        struct ll_sai_entry  *pos, *tmp;
+        ENTRY;
+
+        if (!atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
+                /* other holders remain */
+                EXIT;
+                return;
+        }
+
+        /* last reference: detach from the inode while holding lli_lock */
+        lli->lli_sai = NULL;
+        spin_unlock(&lli->lli_lock);
+
+        LASSERT(sai->sai_thread.t_flags & SVC_STOPPED);
+
+        if (sai->sai_sent > sai->sai_replied)
+                CDEBUG(D_READA,"statahead for dir "DFID" does not "
+                      "finish: [sent:%u] [replied:%u]\n",
+                      PFID(&lli->lli_fid),
+                      sai->sai_sent, sai->sai_replied);
+
+        list_for_each_entry_safe(pos, tmp, &sai->sai_entries, se_list) {
+                list_del(&pos->se_list);
+                OBD_FREE_PTR(pos);
+        }
+
+        OBD_FREE_PTR(sai);
+        iput(dir);
+
+        EXIT;
+}
+
+/* Allocate an entry carrying @index and @stat and append it, under
+ * lli_lock, to the tail of sai->sai_entries.
+ * Returns the new entry, or ERR_PTR(-ENOMEM) if the allocation fails. */
+static struct ll_sai_entry *
+ll_sai_entry_get(struct ll_statahead_info *sai, unsigned int index, int stat)
+{
+        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+        struct ll_sai_entry  *se;
+        ENTRY;
+
+        OBD_ALLOC_PTR(se);
+        if (se == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+
+        CDEBUG(D_READA, "alloc sai entry %p index %u, stat %d\n",
+               se, index, stat);
+
+        se->se_index = index;
+        se->se_stat  = stat;
+
+        spin_lock(&lli->lli_lock);
+        list_add_tail(&se->se_list, &sai->sai_entries);
+        spin_unlock(&lli->lli_lock);
+
+        RETURN(se);
+}
+
+/* Called with lli_lock held.
+ * Walk sai_entries from the head looking for the entry whose se_index equals
+ * @index, giving up as soon as an entry with a larger index is seen. A
+ * matching entry must still be SA_ENTRY_UNSTATED (asserted) and gets @stat.
+ * return value:
+ *  0: no entry with that index was found
+ *  1: the entry is the first one on the list
+ *  2: the entry is not the first one */
+static int
+ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat)
+{
+        struct list_head    *head = &sai->sai_entries;
+        struct ll_sai_entry *entry;
+        int                  rc = 0;
+        ENTRY;
+
+        if (list_empty(head))
+                RETURN(0);
+
+        list_for_each_entry(entry, head, se_list) {
+                if (entry->se_index == index)
+                        GOTO(out, rc = (entry->se_list.prev == head) ? 1 : 2);
+                if (entry->se_index > index)
+                        break;
+        }
+
+        EXIT;
+
+out:
+        if (rc) {
+                LASSERT(entry->se_stat == SA_ENTRY_UNSTATED);
+                entry->se_stat = stat;
+        }
+
+        return rc;
+}
+
+/* Tell whether the first entry on sai_entries has left SA_ENTRY_UNSTATED.
+ * Returns 0 for an empty list.
+ * lli_lock is not taken, because:
+ * (1) only the caller's side removes entries (ll_sai_entry_put), and
+ * (2) the statahead thread only appends entries at the list tail. */
+static int ll_sai_entry_stated(struct ll_statahead_info *sai)
+{
+        struct ll_sai_entry *head;
+        ENTRY;
+
+        if (list_empty(&sai->sai_entries))
+                RETURN(0);
+
+        head = list_entry(sai->sai_entries.next, struct ll_sai_entry,
+                          se_list);
+
+        RETURN(head->se_stat != SA_ENTRY_UNSTATED);
+}
+
+/* Unlink and free the first entry of sai_entries, if there is one.
+ * The whole operation runs under lli_lock. */
+static void ll_sai_entry_put(struct ll_statahead_info *sai)
+{
+        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+        struct list_head     *queue = &sai->sai_entries;
+        struct ll_sai_entry  *head;
+        ENTRY;
+
+        spin_lock(&lli->lli_lock);
+        if (!list_empty(queue)) {
+                head = list_entry(queue->next, struct ll_sai_entry, se_list);
+                list_del(&head->se_list);
+                OBD_FREE_PTR(head);
+        }
+        spin_unlock(&lli->lli_lock);
+
+        EXIT;
+}
+
+/* Reply callback (installed as mi_cb by sa_args_init()) of an asynchronous
+ * statahead getattr: finish the lookup or revalidate of minfo->mi_dentry.
+ *
+ * If the parent directory has no statahead descriptor any more, or its
+ * generation differs from minfo->mi_generation, the reply is dropped with
+ * -ESTALE. Otherwise the sai entry whose index was stored in mi_cbdata is
+ * marked SA_ENTRY_STATED and, when it is the first entry on the list,
+ * sai_waitq is signalled.
+ * On every path the intent is released, @minfo is freed and one reference
+ * on the dentry is dropped. Returns @rc, or the error met while finishing.
+ *
+ * NOTE(review): "dir == NULL" is tested only after lli (derived from dir)
+ * has been locked, and "body" is used without a NULL check -- confirm that
+ * both are safe. */
+static int ll_statahead_interpret(struct ptlrpc_request *req,
+                                  struct md_enqueue_info *minfo,
+                                  int rc)
+{
+        struct lookup_intent     *it = &minfo->mi_it;
+        struct dentry            *dentry = minfo->mi_dentry;
+        struct inode             *dir = dentry->d_parent->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_statahead_info *sai = NULL;
+        ENTRY;
+
+        CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
+               dentry->d_name.len, dentry->d_name.name, rc);
+
+        spin_lock(&lli->lli_lock);      /* lli_sai is sampled under lli_lock */
+        if (unlikely(lli->lli_sai == NULL ||
+            lli->lli_sai->sai_generation != minfo->mi_generation)) {
+                spin_unlock(&lli->lli_lock);
+                GOTO(out_free, rc = -ESTALE);   /* sai stays NULL on this path */
+        } else {
+                sai = ll_sai_get(lli->lli_sai); /* dropped at "out" below */
+                spin_unlock(&lli->lli_lock);
+        }
+
+        if (rc || dir == NULL)
+                GOTO(out, rc);
+
+        if (dentry->d_inode == NULL) {
+                /* lookup */
+                struct dentry    *save = dentry;
+                struct it_cb_data icbd = {
+                        .icbd_parent   = dir,
+                        .icbd_childp   = &dentry
+                };
+
+                LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
+
+                rc = ll_lookup_it_finish(req, it, &icbd);
+                if (!rc)
+                        /* Here dentry->d_inode might be NULL,
+                         * because the entry may have been removed before
+                         * we start doing stat ahead. */
+                        ll_lookup_finish_locks(it, dentry);
+
+                if (dentry != save)     /* the finish call replaced the dentry
+                                         * through icbd_childp */
+                        dput(save);
+        } else {
+                /* revalidate */
+                struct mdt_body *body;
+
+                body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
+                                      sizeof(*body));
+                if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
+                        ll_unhash_aliases(dentry->d_inode); /* fid in the reply
+                                                             * differs from the
+                                                             * one requested */
+                        GOTO(out, rc = -EAGAIN);
+                }
+
+                rc = ll_revalidate_it_finish(req, it, dentry);
+                if (rc) {
+                        ll_unhash_aliases(dentry->d_inode);
+                        GOTO(out, rc);
+                }
+
+                spin_lock(&dcache_lock);        /* drop and rehash the dentry
+                                                 * under dcache_lock */
+                lock_dentry(dentry);
+                __d_drop(dentry);
+#ifdef DCACHE_LUSTRE_INVALID
+                dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
+#endif
+                unlock_dentry(dentry);
+                d_rehash_cond(dentry, 0);
+                spin_unlock(&dcache_lock);
+
+                ll_lookup_finish_locks(it, dentry);
+        }
+        EXIT;
+
+out:
+        if (sai != NULL) {
+                int first;
+
+                sai->sai_replied++;     /* incremented outside lli_lock */
+                spin_lock(&lli->lli_lock);
+                first = ll_sai_entry_set(sai,
+                                         (unsigned int)(long)minfo->mi_cbdata,
+                                         SA_ENTRY_STATED);
+                spin_unlock(&lli->lli_lock);
+                if (first == 1)
+                        /* wake up the "ls -l" process only when the first entry
+                         * returned. */
+                        cfs_waitq_signal(&sai->sai_waitq);
+                else if (first == 0)
+                        CDEBUG(D_READA, "can't find sai entry for dir "
+                               DFID" generation %u index %u\n",
+                               PFID(&lli->lli_fid),
+                               minfo->mi_generation,
+                               (unsigned int)(long)minfo->mi_cbdata);
+
+                ll_sai_put(sai);
+        }
+out_free:
+        ll_intent_release(it);
+        OBD_FREE_PTR(minfo);    /* "it" points into minfo: release it first */
+
+        dput(dentry);
+        return rc;
+}
+
+static void sa_args_fini(struct md_enqueue_info *minfo,
+                         struct ldlm_enqueue_info *einfo)
+{       /* undo sa_args_init(); called here when md_intent_getattr_async()
+         * returned an error */
+        LASSERT(minfo && einfo);
+        capa_put(minfo->mi_data.op_capa1);      /* drop both capability */
+        capa_put(minfo->mi_data.op_capa2);      /* references of op_data */
+        OBD_FREE_PTR(minfo);
+        OBD_FREE_PTR(einfo);
+}
+
+/* Build the md_enqueue_info / ldlm_enqueue_info pair for one asynchronous
+ * IT_GETATTR statahead request on @dentry inside @dir.
+ *
+ * "capa_put" and "ll_statahead_interpret" race on "op_data.op_capa[1,2]":
+ * the caller releases the capability references only after
+ * "md_intent_getattr_async" has returned, but "ll_statahead_interpret" may
+ * have run by then and freed (poisoned) the md_enqueue_info, so "capa_put"
+ * would read an invalid "ocapa". To avoid that, both capability pointers
+ * are saved in @pcapa here, before "md_intent_getattr_async" is called.
+ *
+ * Returns 0 with *pmi, *pei and pcapa[0..1] filled in, -ENOMEM when an
+ * allocation fails, or the error reported by ll_prep_md_op_data(). */
+static int sa_args_init(struct inode *dir, struct dentry *dentry,
+                        struct md_enqueue_info **pmi,
+                        struct ldlm_enqueue_info **pei,
+                        struct obd_capa **pcapa)
+{
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ldlm_enqueue_info *ei;
+        struct md_enqueue_info   *mi;
+        struct md_op_data        *op;
+
+        OBD_ALLOC_PTR(ei);
+        if (ei == NULL)
+                return -ENOMEM;
+
+        OBD_ALLOC_PTR(mi);
+        if (mi == NULL) {
+                OBD_FREE_PTR(ei);
+                return -ENOMEM;
+        }
+
+        op = ll_prep_md_op_data(&mi->mi_data, dir, dentry->d_inode,
+                                dentry->d_name.name, dentry->d_name.len,
+                                0, LUSTRE_OPC_ANY, NULL);
+        if (IS_ERR(op)) {
+                OBD_FREE_PTR(ei);
+                OBD_FREE_PTR(mi);
+                return PTR_ERR(op);
+        }
+
+        /* request side: intent, callback and the statahead coordinates */
+        mi->mi_it.it_op   = IT_GETATTR;
+        mi->mi_dentry     = dentry;
+        mi->mi_cb         = ll_statahead_interpret;
+        mi->mi_generation = lli->lli_sai->sai_generation;
+        mi->mi_cbdata     = (void *)(long)lli->lli_sai->sai_index;
+
+        /* lock side */
+        ei->ei_type   = LDLM_IBITS;
+        ei->ei_mode   = it_to_lock_mode(&mi->mi_it);
+        ei->ei_cb_bl  = ll_md_blocking_ast;
+        ei->ei_cb_cp  = ldlm_completion_ast;
+        ei->ei_cb_gl  = NULL;
+        ei->ei_cbdata = NULL;
+
+        *pmi = mi;
+        *pei = ei;
+        pcapa[0] = op->op_capa1;
+        pcapa[1] = op->op_capa2;
+
+        return 0;
+}
+
+/* Statahead counterpart of ll_lookup_it(): send an asynchronous getattr
+ * for a freshly allocated @dentry of @dir.
+ * Returns 0 once the request has been handed to md_intent_getattr_async(),
+ * otherwise the error from sa_args_init() or from the send itself (in the
+ * latter case the prepared arguments are torn down again). */
+static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
+{
+        struct ldlm_enqueue_info *ei;
+        struct md_enqueue_info   *mi;
+        struct obd_capa          *caps[2];
+        int                       rc;
+        ENTRY;
+
+        rc = sa_args_init(dir, dentry, &mi, &ei, caps);
+        if (rc)
+                RETURN(rc);
+
+        rc = md_intent_getattr_async(ll_i2mdexp(dir), mi, ei);
+        if (rc) {
+                sa_args_fini(mi, ei);
+        } else {
+                /* mi may already be gone: use the saved pointers */
+                capa_put(caps[0]);
+                capa_put(caps[1]);
+        }
+
+        RETURN(rc);
+}
+
+/* Statahead counterpart of ll_revalidate_it() for a dentry that is already
+ * in the dcache.
+ * return value:
+ *  1      -- dentry valid: it has no inode, is a mount point, is the root
+ *            of the super block, or md_revalidate_lock() returned 1
+ *  0      -- a stat-ahead request has been sent
+ *  others -- preparing or sending the stat-ahead request failed */
+static int do_sa_revalidate(struct dentry *dentry)
+{
+        struct inode             *inode = dentry->d_inode;
+        struct inode             *dir = dentry->d_parent->d_inode;
+        struct lookup_intent      it = { .it_op = IT_GETATTR };
+        struct ldlm_enqueue_info *ei;
+        struct md_enqueue_info   *mi;
+        struct obd_capa          *caps[2];
+        int rc;
+        ENTRY;
+
+        /* nothing to ask the server about in these cases */
+        if (inode == NULL || d_mountpoint(dentry) ||
+            dentry == dentry->d_sb->s_root)
+                RETURN(1);
+
+        rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode));
+        if (rc == 1) {
+                ll_intent_release(&it);
+                RETURN(1);
+        }
+
+        rc = sa_args_init(dir, dentry, &mi, &ei, caps);
+        if (rc)
+                RETURN(rc);
+
+        rc = md_intent_getattr_async(ll_i2mdexp(dir), mi, ei);
+        if (rc) {
+                sa_args_fini(mi, ei);
+        } else {
+                /* mi may already be gone: use the saved pointers */
+                capa_put(caps[0]);
+                capa_put(caps[1]);
+        }
+
+        RETURN(rc);
+}
+
+/* Fill @this for the @namelen bytes at @name: name and len are stored as
+ * given (the bytes are not copied) and hash is computed over the bytes with
+ * init_name_hash() / partial_name_hash() / end_name_hash(). */
+static inline void ll_name2qstr(struct qstr *this, const char *name, int namelen)
+{
+        const unsigned char *cursor = (const unsigned char *)name;
+        unsigned long        hash = init_name_hash();
+        unsigned int         c;
+        int                  left;
+
+        this->name = name;
+        this->len  = namelen;
+
+        for (left = namelen; left > 0; left--) {
+                c = *cursor++;
+                hash = partial_name_hash(c, hash);
+        }
+
+        this->hash = end_name_hash(hash);
+}
+
+/* Run statahead for one name inside @parent.
+ *
+ * Returns -EINVAL, without queueing anything, if the parent dentry is
+ * invalid. Otherwise a sai entry is queued for the current sai_index, and
+ * the name is either revalidated (dentry found in the dcache) or looked up
+ * through a newly allocated dentry. sai_index is advanced in both cases.
+ * Returns 0 when an asynchronous request was sent (sai_sent is bumped).
+ * Any other result is stored in the entry's se_stat, sai_waitq is signalled
+ * and the value is returned (1 means the cached dentry was already valid,
+ * per do_sa_revalidate()). */
+static int ll_statahead_one(struct dentry *parent, const char* entry_name,
+                            int entry_name_len)
+{
+        struct inode             *dir = parent->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_statahead_info *sai = lli->lli_sai;
+        struct ll_sai_entry      *se;
+        struct dentry            *child;
+        struct qstr               qname;
+        int                       rc;
+        ENTRY;
+
+#ifdef DCACHE_LUSTRE_INVALID
+        if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
+#else
+        if (d_unhashed(parent)) {
+#endif
+                CDEBUG(D_READA, "parent dentry@%p %.*s is "
+                       "invalid, skip statahead\n",
+                       parent, parent->d_name.len, parent->d_name.name);
+                RETURN(-EINVAL);
+        }
+
+        se = ll_sai_entry_get(sai, sai->sai_index, SA_ENTRY_UNSTATED);
+        if (IS_ERR(se))
+                RETURN(PTR_ERR(se));
+
+        ll_name2qstr(&qname, entry_name, entry_name_len);
+        child = d_lookup(parent, &qname);
+        if (child != NULL) {
+                /* already cached: revalidate it */
+                rc = do_sa_revalidate(child);
+        } else {
+                child = d_alloc(parent, &qname);
+                if (child == NULL)
+                        GOTO(out, rc = -ENOMEM);
+                rc = do_sa_lookup(dir, child);
+        }
+
+        /* no request in flight for it: give the dentry reference back */
+        if (rc)
+                dput(child);
+
+        EXIT;
+
+out:
+        if (rc == 0) {
+                sai->sai_sent++;
+        } else {
+                CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
+                       se, se->se_index, se->se_stat, rc);
+                se->se_stat = rc;
+                cfs_waitq_signal(&sai->sai_waitq);
+        }
+
+        sai->sai_index++;
+        return rc;
+}
+
+/* 1 if SVC_STOPPING is set in the statahead thread's flags, else 0. */
+static inline int sa_check_stop(struct ll_statahead_info *sai)
+{
+        return (sai->sai_thread.t_flags & SVC_STOPPING) != 0;
+}
+
+/* Tell whether the statahead thread may handle another entry: true while
+ * sai_index is still below sai_hit + sai_miss + sai_max. */
+static inline int sa_not_full(struct ll_statahead_info *sai)
+{
+        return (sai->sai_hit + sai->sai_miss + sai->sai_max) > sai->sai_index;
+}
+
+/* Decide whether statahead is doing too badly to be worth continuing.
+ * Returns 1 when either
+ * (1) more than 8 misses happened in a row, or
+ * (2) there are more than 7 hits, yet the hit ratio is below 80%
+ *     (that is, hits < 4 * misses);
+ * returns 0 otherwise. */
+static inline int sa_low_hit(struct ll_statahead_info *sai)
+{
+        if (sai->sai_consecutive_miss > 8)
+                return 1;
+
+        return sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss;
+}
+
+/* Start-up arguments handed to ll_statahead_thread(). do_statahead_enter()
+ * fills one in on its own stack and passes its address to the new thread. */
+struct ll_sa_thread_args {
+        struct dentry   *sta_parent;    /* dentry of the directory to scan */
+        pid_t            sta_pid;       /* pid of the starter, used in the
+                                         * "ll_sa_%u" thread name */
+};
+
+/* Body of the statahead kernel thread started by do_statahead_enter().
+ *
+ * \a arg is a struct ll_sa_thread_args. The thread takes its own reference
+ * on the parent dentry (dget) and on the directory's statahead info
+ * (ll_sai_get), then walks the directory pages from hash offset 0 and calls
+ * ll_statahead_one() on each entry that is not skipped. Before returning it
+ * marks itself SVC_STOPPED, wakes both wait queues and drops the two
+ * references.
+ *
+ * Returns rc as last set: the error from reading a directory page, the
+ * result of the latest ll_statahead_one() call, or 0 if neither was set. */
+static int ll_statahead_thread(void *arg)
+{
+        struct ll_sa_thread_args *sta = arg;
+        struct dentry            *parent = dget(sta->sta_parent);
+        struct inode             *dir = parent->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_sb_info        *sbi = ll_i2sbi(dir);
+        struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
+        struct ptlrpc_thread     *thread = &sai->sai_thread;
+        struct page              *page;
+        __u64                     pos = 0;
+        int                       first = 0;
+        int                       rc = 0;
+        struct ll_dir_chain       chain;
+        ENTRY;
+
+        /* Name the thread after the pid that started it. */
+        {
+                char pname[16];
+                snprintf(pname, 15, "ll_sa_%u", sta->sta_pid);
+                cfs_daemonize(pname);
+        }
+
+        sbi->ll_sa_total++;
+        /* Announce that we are running; do_statahead_enter() waits on
+         * t_ctl_waitq for this. \a sta is not touched after this point. */
+        spin_lock(&lli->lli_lock);
+        thread->t_flags = SVC_RUNNING;
+        spin_unlock(&lli->lli_lock);
+        cfs_waitq_signal(&thread->t_ctl_waitq);
+        CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
+
+        ll_dir_chain_init(&chain);
+        page = ll_get_dir_page(dir, pos, 0, &chain);
+
+        /* One iteration per directory page. */
+        while (1) {
+                struct lu_dirpage *dp;
+                struct lu_dirent  *ent;
+
+                if (IS_ERR(page)) {
+                        rc = PTR_ERR(page);
+                        CERROR("error reading dir "DFID" at %llu/%u: rc %d\n",
+                               PFID(ll_inode2fid(dir)), pos,
+                               sai->sai_index, rc);
+                        break;
+                }
+
+                dp = page_address(page);
+                for (ent = lu_dirent_start(dp); ent != NULL;
+                     ent = lu_dirent_next(ent)) {
+                        struct l_wait_info lwi = { 0 };
+                        char *name = ent->lde_name;
+                        int namelen = le16_to_cpu(ent->lde_namelen);
+
+                        if (namelen == 0)
+                                /* Skip dummy record. */
+                                continue;
+
+                        if (name[0] == '.') {
+                                if (namelen == 1) {
+                                        /* skip . */
+                                        continue;
+                                } else if (name[1] == '.' && namelen == 2) {
+                                        /* skip .. */
+                                        continue;
+                                } else if (!sai->sai_ls_all) {
+                                        /* skip hidden files, and count how
+                                         * many were skipped */
+                                        sai->sai_skip_hidden++;
+                                        continue;
+                                }
+                        }
+
+                        /* don't stat-ahead first entry */
+                        if (unlikely(!first)) {
+                                first++;
+                                continue;
+                        }
+
+                        /* Sleep until we are asked to stop or there is room
+                         * to run further ahead (see sa_not_full());
+                         * ll_statahead_exit() signals this wait queue. */
+                        l_wait_event(thread->t_ctl_waitq,
+                                     sa_check_stop(sai) || sa_not_full(sai),
+                                     &lwi);
+
+                        if (unlikely(sa_check_stop(sai))) {
+                                ll_put_page(page);
+                                GOTO(out, rc);
+                        }
+
+                        rc = ll_statahead_one(parent, name, namelen);
+                        if (rc < 0) {
+                                /* only a negative result ends the walk */
+                                ll_put_page(page);
+                                GOTO(out, rc);
+                        }
+                }
+                /* Continue from the hash offset where this page ends. */
+                pos = le64_to_cpu(dp->ldp_hash_end);
+                ll_put_page(page);
+                if (pos == DIR_END_OFF) {
+                        /* End of directory reached. */
+                        break;
+                } else if (1 /* chain is exhausted*/) {
+                        /* Normal case: continue to the next page. */
+                        page = ll_get_dir_page(dir, pos, 1, &chain);
+                } else {
+                        /* go into overflow page. Never reached: the
+                         * condition above is the constant 1. */
+                }
+        }
+        EXIT;
+
+out:
+        ll_dir_chain_fini(&chain);
+        /* Publish SVC_STOPPED and wake whoever waits for it: sai_waitq
+         * (do_statahead_enter) and t_ctl_waitq (ll_stop_statahead). */
+        spin_lock(&lli->lli_lock);
+        thread->t_flags = SVC_STOPPED;
+        spin_unlock(&lli->lli_lock);
+        cfs_waitq_signal(&sai->sai_waitq);
+        cfs_waitq_signal(&thread->t_ctl_waitq);
+        ll_sai_put(sai);
+        dput(parent);
+        CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
+               cfs_curproc_pid());
+        return rc;
+}
+
+/* called in ll_file_release()
+ *
+ * Stop statahead for \a inode on behalf of the opener identified by \a key.
+ * Nothing is done unless an opendir pid is recorded and \a key matches
+ * lli_opendir_key. Otherwise the opendir key/pid are cleared and, if a
+ * statahead info exists, its thread is asked to stop (and waited for) unless
+ * it is already SVC_STOPPED; finally one reference on the info is dropped. */
+void ll_stop_statahead(struct inode *inode, void *key)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ptlrpc_thread *thread;
+
+        spin_lock(&lli->lli_lock);
+        if (lli->lli_opendir_pid == 0 ||
+            unlikely(lli->lli_opendir_key != key)) {
+                spin_unlock(&lli->lli_lock);
+                return;
+        }
+
+        lli->lli_opendir_key = NULL;
+        lli->lli_opendir_pid = 0;
+
+        if (lli->lli_sai) {
+                struct l_wait_info lwi = { 0 };
+
+                thread = &lli->lli_sai->sai_thread;
+                if (!(thread->t_flags & SVC_STOPPED)) {
+                        thread->t_flags = SVC_STOPPING;
+                        spin_unlock(&lli->lli_lock);
+                        /* Wake the thread in case it sleeps on t_ctl_waitq,
+                         * then wait on the same queue until it reports
+                         * SVC_STOPPED. */
+                        cfs_waitq_signal(&thread->t_ctl_waitq);
+
+                        CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
+                               cfs_curproc_pid());
+                        l_wait_event(thread->t_ctl_waitq,
+                                     thread->t_flags & SVC_STOPPED,
+                                     &lwi);
+                } else {
+                        spin_unlock(&lli->lli_lock);
+                }
+
+                /* Put the ref which was taken at the first statahead_enter.
+                 * It may not be the last ref, since some statahead requests
+                 * may still be in flight. */
+                ll_sai_put(lli->lli_sai);
+                return;
+        }
+        spin_unlock(&lli->lli_lock);
+}
+
+/* Non-error results of is_first_dirent(). That function computes its result
+ * as LS_FIRST_DE + dot_de, so LS_FIRST_DOT_DE must stay LS_FIRST_DE + 1. */
+enum {
+        LS_NONE_FIRST_DE = 0,   /* not first dirent, or is "." */
+        LS_FIRST_DE,            /* the first non-hidden dirent */
+        LS_FIRST_DOT_DE         /* the first hidden dirent, that is ".xxx" */
+};
+
+/* Check whether \a dentry names the first entry of directory \a dir.
+ *
+ * The directory pages are walked from hash offset 0. Dummy records (zero
+ * name length), "." and ".." are ignored, and hidden entries (".xxx") are
+ * ignored too unless the name of \a dentry itself starts with '.'. The first
+ * entry that is left is compared with the name of \a dentry.
+ *
+ * Returns LS_FIRST_DE or LS_FIRST_DOT_DE if the names match (the latter when
+ * that entry is a hidden one), LS_NONE_FIRST_DE if they do not match or no
+ * such entry exists, or the negative error from reading a directory page. */
+static int is_first_dirent(struct inode *dir, struct dentry *dentry)
+{
+        struct ll_dir_chain chain;
+        struct qstr        *target = &dentry->d_name;
+        struct page        *page;
+        __u64               pos = 0;
+        int                 dot_de;
+        int                 rc = LS_NONE_FIRST_DE;
+        ENTRY;
+
+        ll_dir_chain_init(&chain);
+        page = ll_get_dir_page(dir, pos, 0, &chain);
+
+        /* One iteration per directory page. */
+        while (1) {
+                struct lu_dirpage *dp;
+                struct lu_dirent  *ent;
+
+                if (IS_ERR(page)) {
+                        rc = PTR_ERR(page);
+                        CERROR("error reading dir "DFID" at %llu: rc %d\n",
+                               PFID(ll_inode2fid(dir)), pos, rc);
+                        break;
+                }
+
+                dp = page_address(page);
+                for (ent = lu_dirent_start(dp); ent != NULL;
+                     ent = lu_dirent_next(ent)) {
+                        char *name = ent->lde_name;
+                        int namelen = le16_to_cpu(ent->lde_namelen);
+
+                        if (namelen == 0)
+                                /* Skip dummy record. */
+                                continue;
+
+                        /* dot_de: 1 for a hidden entry, 0 otherwise */
+                        if (name[0] == '.') {
+                                if (namelen == 1)
+                                        /* skip . */
+                                        continue;
+                                else if (name[1] == '.' && namelen == 2)
+                                        /* skip .. */
+                                        continue;
+                                else
+                                        dot_de = 1;
+                        } else {
+                                dot_de = 0;
+                        }
+
+                        /* Hidden entries only count when the target is a
+                         * hidden name itself. */
+                        if (dot_de && target->name[0] != '.') {
+                                CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
+                                       target->len, target->name,
+                                       namelen, name);
+                                continue;
+                        }
+
+                        /* This is the first entry that counts: the answer
+                         * is decided here and the walk ends. */
+                        if (target->len == namelen &&
+                            !strncmp(target->name, name, target->len))
+                                rc = LS_FIRST_DE + dot_de;
+                        else
+                                rc = LS_NONE_FIRST_DE;
+                        ll_put_page(page);
+                        GOTO(out, rc);
+                }
+                /* Continue from the hash offset where this page ends. */
+                pos = le64_to_cpu(dp->ldp_hash_end);
+                ll_put_page(page);
+                if (pos == DIR_END_OFF) {
+                        /* End of directory reached. */
+                        break;
+                } else if (1 /* chain is exhausted*/) {
+                        /* Normal case: continue to the next page. */
+                        page = ll_get_dir_page(dir, pos, 1, &chain);
+                } else {
+                        /* go into overflow page. Never reached: the
+                         * condition above is the constant 1. */
+                }
+        }
+        EXIT;
+
+out:
+        ll_dir_chain_fini(&chain);
+        return rc;
+}
+
+/* Start statahead thread if this is the first dir entry.
+ * Otherwise, if a thread is started already, wait until it is ahead of me.
+ * Must be called by the process recorded in lli_opendir_pid (asserted).
+ * Return value:
+ *  0       -- statahead thread has handled this dentry (or has stopped);
+ *             for lookup it is a miss, for revalidate nothing is done here
+ *  1       -- statahead thread has handled this dentry; for lookup it is a
+ *             hit and *dentryp is replaced by the dentry found in dcache
+ *  -EEXIST -- statahead thread started, and this is the first dentry
+ *  -EBADFD -- statahead thread has exited and no entry is available
+ *  -EBADF  -- this is not the first dentry, so statahead is not started;
+ *             the opendir key/pid are cleared
+ *  -ENOENT -- hidden dentry which the statahead thread has skipped
+ *  others  -- error */
+int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
+{
+        struct ll_sb_info        *sbi = ll_i2sbi(dir);
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_statahead_info *sai = lli->lli_sai;
+        struct ll_sa_thread_args  sta;
+        struct l_wait_info        lwi = { 0 };
+        int                       rc;
+        ENTRY;
+
+        LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
+
+        /* Statahead has been set up for this directory already. */
+        if (sai) {
+                if (unlikely(sai->sai_thread.t_flags & SVC_STOPPED &&
+                             list_empty(&sai->sai_entries)))
+                        RETURN(-EBADFD);
+
+                if ((*dentryp)->d_name.name[0] == '.') {
+                        if (likely(sai->sai_ls_all ||
+                            sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
+                                /* Hidden dentry is the first one, or statahead
+                                 * thread does not skip so many hidden dentries
+                                 * before "sai_ls_all" enabled as below. */
+                        } else {
+                                if (!sai->sai_ls_all)
+                                        /* It may be because hidden dentry is
+                                         * not the first one, "sai_ls_all" was
+                                         * not set, then "ls -al" missed.
+                                         * Enable "sai_ls_all" for such case. */
+                                        sai->sai_ls_all = 1;
+
+                                /* Such "getattr" has been skipped before
+                                 * "sai_ls_all" enabled as above. */
+                                sai->sai_miss_hidden++;
+                                RETURN(-ENOENT);
+                        }
+                }
+
+                if (ll_sai_entry_stated(sai)) {
+                        sbi->ll_sa_cached++;
+                } else {
+                        sbi->ll_sa_blocked++;
+                        /* thread started already, avoid double-stat: wait
+                         * until the entry is stated or the thread stops */
+                        l_wait_event(sai->sai_waitq,
+                                     ll_sai_entry_stated(sai) ||
+                                     sai->sai_thread.t_flags & SVC_STOPPED,
+                                     &lwi);
+                }
+
+                if (lookup) {
+                        struct dentry *result;
+
+                        /* Look for the dentry in dcache; when one is found,
+                         * hand it back in place of the caller's dentry. */
+                        result = d_lookup((*dentryp)->d_parent,
+                                          &(*dentryp)->d_name);
+                        if (result) {
+                                LASSERT(result != *dentryp);
+                                dput(*dentryp);
+                                *dentryp = result;
+                                RETURN(1);
+                        }
+                }
+                /* do nothing for revalidate */
+                RETURN(0);
+        }
+
+        /* I am the "lli_opendir_pid" owner, only I can set "lli_sai". */
+        LASSERT(lli->lli_sai == NULL);
+
+        /* NOTE(review): is_first_dirent() may also return a negative error;
+         * it is not equal to LS_NONE_FIRST_DE, so statahead is started in
+         * that case as well -- confirm this is intended. */
+        rc = is_first_dirent(dir, *dentryp);
+        if (rc == LS_NONE_FIRST_DE) {
+                /* It is not "ls -{a}l" operation, no need statahead for it */
+                spin_lock(&lli->lli_lock);
+                lli->lli_opendir_key = NULL;
+                lli->lli_opendir_pid = 0;
+                spin_unlock(&lli->lli_lock);
+                RETURN(-EBADF);
+        }
+
+        sai = ll_sai_alloc();
+        if (sai == NULL)
+                RETURN(-ENOMEM);
+
+        /* NOTE(review): the result of igrab() is not checked here. */
+        sai->sai_inode  = igrab(dir);
+        sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
+
+        /* Thread arguments live on this stack frame. */
+        sta.sta_parent = (*dentryp)->d_parent;
+        sta.sta_pid    = cfs_curproc_pid();
+
+        lli->lli_sai = sai;
+        rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
+        if (rc < 0) {
+                CERROR("can't start ll_sa thread, rc: %d\n", rc);
+                sai->sai_thread.t_flags = SVC_STOPPED;
+                ll_sai_put(sai);
+                LASSERT(lli->lli_sai == NULL);
+                RETURN(rc);
+        }
+
+        /* Wait until the thread reports that it runs (or has stopped
+         * already). The thread reads "sta" before it sets SVC_RUNNING, so
+         * this wait presumably is also what keeps "sta" valid for it --
+         * assumes t_flags starts with neither bit set, confirm in
+         * ll_sai_alloc(). */
+        l_wait_event(sai->sai_thread.t_ctl_waitq, 
+                     sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED),
+                     &lwi);
+
+        /* We don't stat-ahead for the first dirent since we are already in
+         * lookup, and -EEXIST also indicates that this is the first dirent. */
+        RETURN(-EEXIST);
+}
+
+/* update hit/miss count
+ *
+ * \a result == 1 is counted as a hit, anything else as a miss. Nothing is
+ * done unless the caller is the process recorded in lli_opendir_pid of the
+ * parent directory and that directory has a statahead info. A hit doubles
+ * the statahead window (sai_max), capped at sbi->ll_sa_max. A miss that
+ * makes sa_low_hit() true while the thread is running asks the thread to
+ * stop. */
+void ll_statahead_exit(struct dentry *dentry, int result)
+{
+        struct dentry         *parent = dentry->d_parent;
+        struct ll_inode_info  *lli = ll_i2info(parent->d_inode);
+        struct ll_sb_info     *sbi = ll_i2sbi(parent->d_inode);
+        struct ll_dentry_data *ldd = ll_d2d(dentry);
+
+        if (lli->lli_opendir_pid != cfs_curproc_pid())
+                return;
+
+        if (lli->lli_sai) {
+                struct ll_statahead_info *sai = lli->lli_sai;
+
+                if (result == 1) {
+                        sbi->ll_sa_hit++;
+                        sai->sai_hit++;
+                        sai->sai_consecutive_miss = 0;
+                        sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
+                } else {
+                        sbi->ll_sa_miss++;
+                        sai->sai_miss++;
+                        sai->sai_consecutive_miss++;
+                        if (sa_low_hit(sai) &&
+                            sai->sai_thread.t_flags & SVC_RUNNING) {
+                                sbi->ll_sa_wrong++;
+                                CDEBUG(D_READA, "statahead for dir %.*s hit "
+                                       "ratio too low: hit/miss %u/%u, "
+                                       "sent/replied %u/%u. stopping statahead "
+                                       "thread: pid %d\n",
+                                       parent->d_name.len, parent->d_name.name,
+                                       sai->sai_hit, sai->sai_miss,
+                                       sai->sai_sent, sai->sai_replied,
+                                       cfs_curproc_pid());
+                                /* Re-check under the lock so that a thread
+                                 * which has stopped meanwhile is not marked
+                                 * SVC_STOPPING again. */
+                                spin_lock(&lli->lli_lock);
+                                if (!(sai->sai_thread.t_flags & SVC_STOPPED))
+                                        sai->sai_thread.t_flags = SVC_STOPPING;
+                                spin_unlock(&lli->lli_lock);
+                        }
+                }
+
+                /* Wake the statahead thread: it sleeps on t_ctl_waitq until
+                 * it may go on or has to stop. */
+                cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
+                ll_sai_entry_put(sai);
+
+                /* Record the statahead generation in the dentry data. */
+                if (likely(ldd != NULL))
+                        ldd->lld_sa_generation = sai->sai_generation;
+        }
+}
index 933834f..ffbec6e 100644 (file)
@@ -309,6 +309,8 @@ int ll_getxattr_common(struct inode *inode, const char *name,
                 posix_acl_release(acl);
                 RETURN(rc);
         }
+        if (xattr_type == XATTR_ACL_DEFAULT_T && !S_ISDIR(inode->i_mode))
+                RETURN(-ENODATA);
 #endif
 
 do_getxattr:
index f87d2f5..fe3ffe7 100644 (file)
@@ -1009,8 +1009,7 @@ release_lock:
                         body->valid = OBD_MD_FLSIZE;
                 }
                 if (master_valid == 0) {
-                        memcpy(&oit->d.lustre.it_lock_handle,
-                               &master_lockh, sizeof(master_lockh));
+                        oit->d.lustre.it_lock_handle = master_lockh.cookie;
                         oit->d.lustre.it_lock_mode = master_lock_mode;
                 }
                 rc = 0;
index fed558f..1374c1f 100644 (file)
@@ -1061,7 +1061,7 @@ static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
                         GOTO(out, rc = -EINVAL);
 
                 obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
-               rc = lmv_add_target(obd, &tgt_uuid);
+                rc = lmv_add_target(obd, &tgt_uuid);
                 GOTO(out, rc);
         default: {
                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
@@ -1660,8 +1660,8 @@ lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
         ENTRY;
 
         rc = lmv_check_connect(obd);
-       if (rc)
-               RETURN(rc);
+        if (rc)
+                RETURN(rc);
 
 repeat:
         ++loop;
@@ -1755,7 +1755,7 @@ static int lmv_early_cancel(struct lmv_obd *lmv, struct obd_export *tgt_exp,
 
         if (!fid_is_sane(fid))
                 RETURN(0);
-        
+
         if (fid_exp == NULL)
                 fid_exp = lmv_find_export(lmv, fid);
 
@@ -1796,7 +1796,7 @@ static int lmv_early_cancel_stripes(struct obd_export *exp,
                 ldlm_policy_data_t policy = {{0}};
                 struct lu_fid *st_fid;
                 int i;
-                
+
                 policy.l_inodebits.bits = bits;
                 for (i = 0; i < obj->lo_objcount; i++) {
                         st_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
@@ -1839,8 +1839,8 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
         ENTRY;
 
         rc = lmv_check_connect(obd);
-       if (rc)
-               RETURN(rc);
+        if (rc)
+                RETURN(rc);
 
 repeat:
         ++loop;
@@ -1929,8 +1929,8 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
                newlen, new, PFID(&op_data->op_fid2));
 
         rc = lmv_check_connect(obd);
-       if (rc)
-               RETURN(rc);
+        if (rc)
+                RETURN(rc);
 
         if (oldlen == 0) {
                 /*
@@ -2070,8 +2070,8 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
         ENTRY;
 
         rc = lmv_check_connect(obd);
-       if (rc)
-               RETURN(rc);
+        if (rc)
+                RETURN(rc);
 
         obj = lmv_obj_grab(obd, &op_data->op_fid1);
 
@@ -2128,8 +2128,8 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
         ENTRY;
 
         rc = lmv_check_connect(obd);
-       if (rc)
-               RETURN(rc);
+        if (rc)
+                RETURN(rc);
 
         tgt_exp = lmv_find_export(lmv, fid);
         if (IS_ERR(tgt_exp))
@@ -2230,8 +2230,8 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
         offset = offset64;
 
         rc = lmv_check_connect(obd);
-       if (rc)
-               RETURN(rc);
+        if (rc)
+                RETURN(rc);
 
         CDEBUG(D_INFO, "READPAGE at %llx from "DFID"\n", offset, PFID(&rid));
 
@@ -2408,9 +2408,9 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
         int rc, loop = 0;
         ENTRY;
 
-       rc = lmv_check_connect(obd);
-       if (rc)
-               RETURN(rc);
+        rc = lmv_check_connect(obd);
+        if (rc)
+                RETURN(rc);
 
         if (op_data->op_namelen == 0 && op_data->op_mea1 != NULL) {
                 /* mds asks to remove slave objects */
@@ -2902,6 +2902,54 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
         RETURN(rc);
 }
 
+/* Pass an async getattr intent to the export of op_fid2 (else op_fid1). */
+int lmv_intent_getattr_async(struct obd_export *exp,
+                             struct md_enqueue_info *minfo,
+                             struct ldlm_enqueue_info *einfo)
+{
+        struct obd_device   *lmv_dev = exp->exp_obd;
+        const struct lu_fid *fid;
+        struct obd_export   *target;
+        int                  err;
+        ENTRY;
+
+        err = lmv_check_connect(lmv_dev);
+        if (err != 0)
+                RETURN(err);
+
+        fid = fid_is_zero(&minfo->mi_data.op_fid2) ?
+              &minfo->mi_data.op_fid1 : &minfo->mi_data.op_fid2;
+        target = lmv_find_export(&lmv_dev->u.lmv, fid);
+        if (IS_ERR(target))
+                RETURN(PTR_ERR(target));
+
+        err = md_intent_getattr_async(target, minfo, einfo);
+        RETURN(err);
+}
+
+/* Pass a lock revalidation for \a fid to the export found for that fid. */
+int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
+                        struct lu_fid *fid)
+{
+        struct obd_device *lmv_dev = exp->exp_obd;
+        struct obd_export *target;
+        int                err;
+        ENTRY;
+
+        /* Nothing can be looked up before the LMV connect check passes. */
+        err = lmv_check_connect(lmv_dev);
+        if (err != 0)
+                RETURN(err);
+
+        target = lmv_find_export(&lmv_dev->u.lmv, fid);
+        if (IS_ERR(target))
+                RETURN(PTR_ERR(target));
+
+        err = md_revalidate_lock(target, it, fid);
+        RETURN(err);
+}
+
+
 struct obd_ops lmv_obd_ops = {
         .o_owner                = THIS_MODULE,
         .o_setup                = lmv_setup,
@@ -2948,8 +2996,10 @@ struct md_ops lmv_md_ops = {
         .m_free_lustre_md       = lmv_free_lustre_md,
         .m_set_open_replay_data = lmv_set_open_replay_data,
         .m_clear_open_replay_data = lmv_clear_open_replay_data,
+        .m_renew_capa           = lmv_renew_capa,
         .m_get_remote_perm      = lmv_get_remote_perm,
-        .m_renew_capa           = lmv_renew_capa
+        .m_intent_getattr_async = lmv_intent_getattr_async,
+        .m_revalidate_lock      = lmv_revalidate_lock
 };
 
 int __init lmv_init(void)
index bcf08ba..5483a6d 100644 (file)
@@ -100,7 +100,7 @@ int mdc_intent_lock(struct obd_export *exp,
                     ldlm_blocking_callback cb_blocking, int extra_lock_flags);
 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 struct lookup_intent *it, struct md_op_data *op_data,
-                struct lustre_handle *lockh, void *lmm, int lmmlen,
+                struct lustre_handle *lockh, void *lmm, int lmmsize,
                 int extra_lock_flags);
 
 int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
@@ -158,6 +158,14 @@ static inline void mdc_set_capa_size(struct ptlrpc_request *req,
                 ;
 }
 
+int mdc_revalidate_lock(struct obd_export *exp,
+                        struct lookup_intent *it,
+                        struct lu_fid *fid);
+
+int mdc_intent_getattr_async(struct obd_export *exp,
+                             struct md_enqueue_info *minfo,
+                             struct ldlm_enqueue_info *einfo);
+
 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
                            const struct lu_fid *fid, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
index 818a621..a6bc50f 100644 (file)
@@ -458,6 +458,7 @@ void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, int flags,
         if (op_data->op_name) {
                 char *tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
                 LOGL0(op_data->op_name, op_data->op_namelen, tmp);
+
         }
 }
 
index b18f642..fdf6fe9 100644 (file)
@@ -62,20 +62,6 @@ void it_clear_disposition(struct lookup_intent *it, int flag)
 }
 EXPORT_SYMBOL(it_clear_disposition);
 
-static int it_to_lock_mode(struct lookup_intent *it)
-{
-        ENTRY;
-
-        /* CREAT needs to be tested before open (both could be set) */
-        if (it->it_op & IT_CREAT)
-                return LCK_PW;
-        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
-                return LCK_PR;
-
-        LBUG();
-        RETURN(-EINVAL);
-}
-
 int it_open_error(int phase, struct lookup_intent *it)
 {
         if (it_disposition(it, DISP_OPEN_OPEN)) {
@@ -151,13 +137,11 @@ ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh)
 {
-        struct ldlm_res_id res_id =
-                { .name = {fid_seq(fid),
-                           fid_oid(fid),
-                           fid_ver(fid)} };
+        struct ldlm_res_id res_id;
         ldlm_mode_t rc;
         ENTRY;
 
+        fid_build_reg_res_name(fid, &res_id);
         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
                              &res_id, type, policy, mode, lockh);
         RETURN(rc);
@@ -168,15 +152,13 @@ int mdc_cancel_unused(struct obd_export *exp,
                       ldlm_policy_data_t *policy,
                       ldlm_mode_t mode, int flags, void *opaque)
 {
-        struct ldlm_res_id res_id =
-                { .name = {fid_seq(fid),
-                           fid_oid(fid),
-                           fid_ver(fid)} };
+        struct ldlm_res_id res_id;
         struct obd_device *obd = class_exp2obd(exp);
         int rc;
 
         ENTRY;
 
+        fid_build_reg_res_name(fid, &res_id);
         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
                                              policy, mode, flags, opaque);
         RETURN(rc);
@@ -186,13 +168,10 @@ int mdc_change_cbdata(struct obd_export *exp,
                       const struct lu_fid *fid,
                       ldlm_iterator_t it, void *data)
 {
-        struct ldlm_res_id res_id = { .name = {0} };
+        struct ldlm_res_id res_id;
         ENTRY;
 
-        res_id.name[0] = fid_seq(fid);
-        res_id.name[1] = fid_oid(fid);
-        res_id.name[2] = fid_ver(fid);
-
+        fid_build_reg_res_name(fid, &res_id);
         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
                               &res_id, it, data);
 
@@ -226,7 +205,7 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
  * but this is incredibly unlikely, and questionable whether the client
  * could do MDS recovery under OOM anyways... */
 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
-                               struct mdt_body *body)
+                                struct mdt_body *body)
 {
         int     rc;
 
@@ -380,7 +359,7 @@ static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
 
 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
                                                       struct lookup_intent *it,
-                                                     struct md_op_data *op_data)
+                                                      struct md_op_data *op_data)
 {
         struct ptlrpc_request *req;
         struct obd_device     *obddev = class_exp2obd(exp);
@@ -444,69 +423,19 @@ static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
         RETURN(req);
 }
 
-/* We always reserve enough space in the reply packet for a stripe MD, because
- * we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
-                struct lookup_intent *it, struct md_op_data *op_data,
-                struct lustre_handle *lockh, void *lmm, int lmmsize,
-                int extra_lock_flags)
+static int mdc_finish_enqueue(struct obd_export *exp,
+                              struct ptlrpc_request *req,
+                              struct ldlm_enqueue_info *einfo,
+                              struct lookup_intent *it,
+                              struct lustre_handle *lockh,
+                              int rc)
 {
-        struct obd_device     *obddev = class_exp2obd(exp);
-        struct ptlrpc_request *req;
-        struct req_capsule    *pill;
-        struct ldlm_request   *lockreq;
-        struct ldlm_reply     *lockrep;
-        int                    flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
-        int                    rc;
-        struct ldlm_res_id res_id =
-                { .name = {fid_seq(&op_data->op_fid1),
-                           fid_oid(&op_data->op_fid1),
-                           fid_ver(&op_data->op_fid1)} };
-        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
+        struct req_capsule  *pill = &req->rq_pill;
+        struct ldlm_request *lockreq;
+        struct ldlm_reply   *lockrep;
         ENTRY;
 
-        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
-
-        if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
-                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-
-        if (it->it_op & IT_OPEN) {
-                int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
-                                              op_data->op_data);
-
-                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
-                                           einfo->ei_cbdata);
-                if (!joinfile) {
-                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                        einfo->ei_cbdata = NULL;
-                        lmm = NULL;
-                } else
-                        it->it_flags &= ~O_JOIN_FILE;
-        } else if (it->it_op & IT_UNLINK)
-                req = mdc_intent_unlink_pack(exp, it, op_data);
-        else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
-                req = mdc_intent_getattr_pack(exp, it, op_data);
-        else if (it->it_op == IT_READDIR)
-                req = ldlm_enqueue_pack(exp);
-        else {
-                LBUG();
-                RETURN(-EINVAL);
-        }
-
-        if (IS_ERR(req))
-                RETURN(PTR_ERR(req));
-        pill = &req->rq_pill;
-
-        /* It is important to obtain rpc_lock first (if applicable), so that
-         * threads that are serialised with rpc_lock are not polluting our
-         * rpcs in flight counter */
-        mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
-        mdc_enter_request(&obddev->u.cli);
-        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
-                              0, NULL, lockh, 0);
-        mdc_exit_request(&obddev->u.cli);
-        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
-
+        LASSERT(rc >= 0);
         /* Similarly, if we're going to replay this request, we don't want to
          * actually get a lock, just perform the intent. */
         if (req->rq_transno || req->rq_replay) {
@@ -518,12 +447,6 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 einfo->ei_mode = 0;
                 memset(lockh, 0, sizeof(*lockh));
                 rc = 0;
-        } else if (rc != 0) {
-                CERROR("ldlm_cli_enqueue: %d\n", rc);
-                LASSERTF(rc < 0, "rc %d\n", rc);
-                mdc_clear_replay_flag(req, rc);
-                ptlrpc_req_finished(req);
-                RETURN(rc);
         } else { /* rc = 0 */
                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
                 LASSERT(lock);
@@ -597,6 +520,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                                 RETURN(-EPROTO);
 
                         if (body->valid & OBD_MD_FLMODEASIZE) {
+                                struct obd_device *obddev = class_exp2obd(exp);
+
                                 if (obddev->u.cli.cl_max_mds_easize <
                                     body->max_mdsize) {
                                         obddev->u.cli.cl_max_mds_easize =
@@ -623,6 +548,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                          * (for example error one).
                          */
                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
+                                void *lmm;
                                 if (req_capsule_get_size(pill, &RMF_EADATA,
                                                          RCL_CLIENT) <
                                     body->eadatasize) {
@@ -671,6 +597,193 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 
         RETURN(rc);
 }
+
+/*
+ * Pack the intent request selected by it->it_op, enqueue an IBITS lock on the
+ * resource built from op_data->op_fid1, and pass the reply on to
+ * mdc_finish_enqueue().
+ *
+ * We always reserve enough space in the reply packet for a stripe MD, because
+ * we don't know in advance the file type.
+ *
+ * The lock starts out as MDS_INODELOCK_LOOKUP and is switched to
+ * MDS_INODELOCK_UPDATE for unlink/getattr/readdir intents and for opens that
+ * are not join-file opens.
+ *
+ * Returns a negative errno when packing or ldlm_cli_enqueue() fails (in the
+ * latter case the request is released here), otherwise the result of
+ * mdc_finish_enqueue().
+ */
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                struct lookup_intent *it, struct md_op_data *op_data,
+                struct lustre_handle *lockh, void *lmm, int lmmsize,
+                int extra_lock_flags)
+{
+        struct obd_device     *obddev = class_exp2obd(exp);
+        struct ptlrpc_request *req;
+        int                    flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
+        int                    rc;
+        struct ldlm_res_id res_id;
+        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
+        ENTRY;
+
+        LASSERTF(einfo->ei_type == LDLM_IBITS, "lock type %d\n", einfo->ei_type);
+
+        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
+
+        if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+
+        if (it->it_op & IT_OPEN) {
+                int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
+                                              op_data->op_data);
+
+                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
+                                           einfo->ei_cbdata);
+                if (!joinfile) {
+                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                        einfo->ei_cbdata = NULL;
+                } else
+                        it->it_flags &= ~O_JOIN_FILE;
+        } else if (it->it_op & IT_UNLINK)
+                req = mdc_intent_unlink_pack(exp, it, op_data);
+        else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
+                req = mdc_intent_getattr_pack(exp, it, op_data);
+        else if (it->it_op == IT_READDIR)
+                req = ldlm_enqueue_pack(exp);
+        else {
+                LBUG();
+                RETURN(-EINVAL);
+        }
+
+        /* the pack helpers report failure through ERR_PTR() */
+        if (IS_ERR(req))
+                RETURN(PTR_ERR(req));
+
+        /* It is important to obtain rpc_lock first (if applicable), so that
+         * threads that are serialised with rpc_lock are not polluting our
+         * rpcs in flight counter */
+        mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+        mdc_enter_request(&obddev->u.cli);
+        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
+                              0, NULL, lockh, 0);
+        mdc_exit_request(&obddev->u.cli);
+        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+        if (rc < 0) {
+                /* synchronous path: nobody else will finish the request */
+                CERROR("ldlm_cli_enqueue: %d\n", rc);
+                mdc_clear_replay_flag(req, rc);
+                ptlrpc_req_finished(req);
+                RETURN(rc);
+        }
+        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+
+        RETURN(rc);
+}
+
+/*
+ * Second half of intent processing, run once the enqueue reply is in hand.
+ *
+ * Reports any error recorded in the intent, detects a stale fid when a
+ * fid/name pair was being revalidated, takes the extra request references
+ * consumed by the later create/open phases and, when a matching lock is
+ * already held, cancels the new one and stores the old handle in @lockh and
+ * in the intent.
+ *
+ * Returns 0 on success, otherwise the error recorded in the intent
+ * (it_status or an it_open_error() result) or -ESTALE.
+ */
+static int mdc_finish_intent_lock(struct obd_export *exp,
+                                  struct ptlrpc_request *request,
+                                  struct md_op_data *op_data,
+                                  struct lookup_intent *it,
+                                  struct lustre_handle *lockh)
+{
+        struct lustre_handle old_lock;
+        struct mdt_body *mdt_body;
+        struct ldlm_lock *lock;
+        int rc;
+        ENTRY;
+
+        LASSERT(request != NULL);
+        LASSERT(request != LP_POISON);
+        LASSERT(request->rq_repmsg != LP_POISON);
+
+        if (!it_disposition(it, DISP_IT_EXECD)) {
+                /* The server failed before it even started executing the
+                 * intent, i.e. because it couldn't unpack the request. */
+                LASSERT(it->d.lustre.it_status != 0);
+                RETURN(it->d.lustre.it_status);
+        }
+        rc = it_open_error(DISP_IT_EXECD, it);
+        if (rc)
+                RETURN(rc);
+
+        mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
+        LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
+
+        /* If we were revalidating a fid/name pair, mark the intent in
+         * case we fail and get called again from lookup */
+        if (fid_is_sane(&op_data->op_fid2) &&
+            (it->it_flags & O_CHECK_STALE) &&
+            it->it_op != IT_GETATTR) {
+                it_set_disposition(it, DISP_ENQ_COMPLETE);
+
+                /* Also: did we find the same inode? */
+                /* server can return one of two fids:
+                 * op_fid2 - new allocated fid - if file is created.
+                 * op_fid3 - existent fid - if file only open.
+                 * op_fid3 is saved in lmv_intent_open */
+                if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
+                    (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
+                        /* log both candidate fids against the returned one */
+                        CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
+                               "\n", PFID(&op_data->op_fid2),
+                               PFID(&op_data->op_fid3), PFID(&mdt_body->fid1));
+                        RETURN(-ESTALE);
+                }
+        }
+
+        rc = it_open_error(DISP_LOOKUP_EXECD, it);
+        if (rc)
+                RETURN(rc);
+
+        /* keep requests around for the multiple phases of the call
+         * this shows the DISP_XX must guarantee we make it into the call
+         */
+        if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
+            it_disposition(it, DISP_OPEN_CREATE) &&
+            !it_open_error(DISP_OPEN_CREATE, it)) {
+                it_set_disposition(it, DISP_ENQ_CREATE_REF);
+                ptlrpc_request_addref(request); /* balanced in ll_create_node */
+        }
+        if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
+            it_disposition(it, DISP_OPEN_OPEN) &&
+            !it_open_error(DISP_OPEN_OPEN, it)) {
+                it_set_disposition(it, DISP_ENQ_OPEN_REF);
+                ptlrpc_request_addref(request); /* balanced in ll_file_open */
+                /* BUG 11546 - eviction in the middle of open rpc processing */
+                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
+        }
+
+        if (it->it_op & IT_CREAT) {
+                /* XXX this belongs in ll_create_it */
+        } else if (it->it_op == IT_OPEN) {
+                LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
+        } else {
+                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
+        }
+
+        /* If we already have a matching lock, then cancel the new
+         * one.  We have to set the data here instead of in
+         * mdc_enqueue, because we need to use the child's inode as
+         * the l_ast_data to match, and that's not available until
+         * intent_finish has performed the iget().) */
+        lock = ldlm_handle2lock(lockh);
+        if (lock) {
+                /* copy the policy out before the lock reference is dropped */
+                ldlm_policy_data_t policy = lock->l_policy_data;
+                LDLM_DEBUG(lock, "matching against this");
+
+                LASSERTF(fid_res_name_eq(&mdt_body->fid1,
+                                         &lock->l_resource->lr_name),
+                         "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
+                         (unsigned long)lock->l_resource->lr_name.name[0],
+                         (unsigned long)lock->l_resource->lr_name.name[1],
+                         (unsigned long)lock->l_resource->lr_name.name[2],
+                         (unsigned long)fid_seq(&mdt_body->fid1),
+                         (unsigned long)fid_oid(&mdt_body->fid1),
+                         (unsigned long)fid_ver(&mdt_body->fid1));
+                LDLM_LOCK_PUT(lock);
+
+                memcpy(&old_lock, lockh, sizeof(*lockh));
+                if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
+                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
+                        ldlm_lock_decref_and_cancel(lockh,
+                                                    it->d.lustre.it_lock_mode);
+                        memcpy(lockh, &old_lock, sizeof(old_lock));
+                        it->d.lustre.it_lock_handle = lockh->cookie;
+                }
+        }
+        CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
+               op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
+               it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
+        RETURN(rc);
+}
+
 /*
  * This long block is all about fixing up the lock and request state
  * so that it is correct as of the moment _before_ the operation was
@@ -704,11 +817,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                     ldlm_blocking_callback cb_blocking,
                     int extra_lock_flags)
 {
-        struct ptlrpc_request *request;
-        struct lustre_handle old_lock;
         struct lustre_handle lockh;
-        struct mdt_body *mdt_body;
-        struct ldlm_lock *lock;
         int rc = 0;
         ENTRY;
         LASSERT(it);
@@ -742,8 +851,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                                       &op_data->op_fid2, LDLM_IBITS, &policy,
                                       LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
                 if (mode) {
-                        memcpy(&it->d.lustre.it_lock_handle, &lockh,
-                               sizeof(lockh));
+                        it->d.lustre.it_lock_handle = lockh.cookie;
                         it->d.lustre.it_lock_mode = mode;
                 }
 
@@ -778,7 +886,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                                  lmm, lmmsize, extra_lock_flags);
                 if (rc < 0)
                         RETURN(rc);
-                memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
+                it->d.lustre.it_lock_handle = lockh.cookie;
         } else if (!fid_is_sane(&op_data->op_fid2) ||
                    !(it->it_flags & O_CHECK_STALE)) {
                 /* DISP_ENQ_COMPLETE set means there is extra reference on
@@ -787,109 +895,125 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                  * lookup, so we clear DISP_ENQ_COMPLETE */
                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
         }
-        request = *reqp = it->d.lustre.it_data;
-        LASSERT(request != NULL);
-        LASSERT(request != LP_POISON);
-        LASSERT(request->rq_repmsg != LP_POISON);
+        *reqp = it->d.lustre.it_data;
+        rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
+        RETURN(rc);
+}
 
-        if (!it_disposition(it, DISP_IT_EXECD)) {
-                /* The server failed before it even started executing the
-                 * intent, i.e. because it couldn't unpack the request. */
-                LASSERT(it->d.lustre.it_status != 0);
-                RETURN(it->d.lustre.it_status);
-        }
-        rc = it_open_error(DISP_IT_EXECD, it);
-        if (rc)
-                RETURN(rc);
+/*
+ * Reply callback installed by mdc_intent_getattr_async().
+ *
+ * Recovers the export, md_enqueue_info and ldlm_enqueue_info stashed in
+ * rq_async_args, completes the enqueue via ldlm_cli_enqueue_fini(),
+ * mdc_finish_enqueue() and mdc_finish_intent_lock(), then frees @einfo and
+ * reports the final rc through minfo->mi_cb().  Always returns 0; the
+ * outcome is delivered only through the callback.
+ */
+static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
+                                              void *unused, int rc)
+{
+        struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
+        struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
+        struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
+        struct lookup_intent     *it;
+        struct lustre_handle     *lockh;
+        struct obd_device        *obddev;
+        int                       flags = LDLM_FL_HAS_INTENT;
+        ENTRY;
 
-        mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
-        LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
+        it    = &minfo->mi_it;
+        lockh = &minfo->mi_lockh;
 
-        /* If we were revalidating a fid/name pair, mark the intent in
-         * case we fail and get called again from lookup */
-        if (fid_is_sane(&op_data->op_fid2) &&
-            (it->it_flags & O_CHECK_STALE) &&
-            it->it_op != IT_GETATTR) {
-                it_set_disposition(it, DISP_ENQ_COMPLETE);
+        obddev = class_exp2obd(exp);
 
-                /* Also: did we find the same inode? */
-                /* sever can return one of two fids:
-                 * op_fid2 - new allocated fid - if file is created.
-                 * op_fid3 - existent fid - if file only open.
-                 * op_fid3 is saved in lmv_intent_open */
-                if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
-                    (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
-                        CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
-                               "\n", PFID(&op_data->op_fid2),
-                               PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
-                        RETURN(-ESTALE);
-                }
+        /* balances the mdc_enter_request() done in mdc_intent_getattr_async() */
+        mdc_exit_request(&obddev->u.cli);
+        /* fault injection: replace the incoming rc with a timeout */
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
+                rc = -ETIMEDOUT;
+
+        rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
+                                   &flags, NULL, 0, NULL, lockh, rc);
+        if (rc < 0) {
+                /* req is not released here, unlike the failure path of
+                 * mdc_enqueue(); it is still passed to mi_cb() below */
+                CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
+                mdc_clear_replay_flag(req, rc);
+                GOTO(out, rc);
         }
 
-        rc = it_open_error(DISP_LOOKUP_EXECD, it);
+        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
         if (rc)
-                RETURN(rc);
+                GOTO(out, rc);
 
-        /* keep requests around for the multiple phases of the call
-         * this shows the DISP_XX must guarantee we make it into the call
-         */
-        if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
-            it_disposition(it, DISP_OPEN_CREATE) &&
-            !it_open_error(DISP_OPEN_CREATE, it)) {
-                it_set_disposition(it, DISP_ENQ_CREATE_REF);
-                ptlrpc_request_addref(request); /* balanced in ll_create_node */
-        }
-        if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
-            it_disposition(it, DISP_OPEN_OPEN) &&
-            !it_open_error(DISP_OPEN_OPEN, it)) {
-                it_set_disposition(it, DISP_ENQ_OPEN_REF);
-                ptlrpc_request_addref(request); /* balanced in ll_file_open */
-                /* BUG 11546 - eviction in the middle of open rpc processing */
-                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
-        }
+        /* same bookkeeping mdc_intent_lock() does after mdc_enqueue() */
+        it->d.lustre.it_lock_handle = lockh->cookie;
 
-        if (it->it_op & IT_CREAT) {
-                /* XXX this belongs in ll_create_it */
-        } else if (it->it_op == IT_OPEN) {
-                LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
-        } else {
-                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
+        rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
+        EXIT;
+
+out:
+        /* reached on every path: einfo is freed here and the caller's
+         * callback always sees the final rc */
+        OBD_FREE_PTR(einfo);
+        minfo->mi_cb(req, minfo, rc);
+        return 0;
+}
+
+/*
+ * Submit a getattr intent enqueue without waiting for the reply.
+ *
+ * The request is packed from minfo->mi_data and minfo->mi_it, enqueued in
+ * async mode and queued with ptlrpcd_add_req().  The reply is handled by
+ * mdc_intent_getattr_async_interpret(), which receives @exp, @minfo and
+ * @einfo through rq_async_args.
+ *
+ * Returns 0 once the request has been queued.  Returns a negative errno if
+ * the request cannot be packed, or if the enqueue fails, in which case the
+ * request is released here.
+ */
+int mdc_intent_getattr_async(struct obd_export *exp,
+                             struct md_enqueue_info *minfo,
+                             struct ldlm_enqueue_info *einfo)
+{
+        struct md_op_data       *op_data = &minfo->mi_data;
+        struct lookup_intent    *it = &minfo->mi_it;
+        struct ptlrpc_request   *req;
+        struct obd_device       *obddev = class_exp2obd(exp);
+        struct ldlm_res_id       res_id;
+        ldlm_policy_data_t       policy = {
+                                        .l_inodebits = { MDS_INODELOCK_LOOKUP }
+                                 };
+        int                      rc;
+        int                      flags = LDLM_FL_HAS_INTENT;
+        ENTRY;
+
+        CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
+               op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
+               ldlm_it2str(it->it_op), it->it_flags);
+
+        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
+        req = mdc_intent_getattr_pack(exp, it, op_data);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+        /* mdc_enqueue() checks this helper with IS_ERR(); do the same so an
+         * encoded error pointer is never passed on as a request */
+        if (IS_ERR(req))
+                RETURN(PTR_ERR(req));
+
+        mdc_enter_request(&obddev->u.cli);
+        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
+                              0, NULL, &minfo->mi_lockh, 1);
+        if (rc < 0) {
+                mdc_exit_request(&obddev->u.cli);
+                /* the request was never queued to ptlrpcd, so release it
+                 * here as mdc_enqueue() does on its failure path */
+                ptlrpc_req_finished(req);
+                RETURN(rc);
         }
 
-        /* If we already have a matching lock, then cancel the new
-         * one.  We have to set the data here instead of in
-         * mdc_enqueue, because we need to use the child's inode as
-         * the l_ast_data to match, and that's not available until
-         * intent_finish has performed the iget().) */
-        lock = ldlm_handle2lock(&lockh);
-        if (lock) {
-                ldlm_policy_data_t policy = lock->l_policy_data;
-                LDLM_DEBUG(lock, "matching against this");
+        /* everything the interpret callback needs travels with the request */
+        req->rq_async_args.pointer_arg[0] = exp;
+        req->rq_async_args.pointer_arg[1] = minfo;
+        req->rq_async_args.pointer_arg[2] = einfo;
+        req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
+        ptlrpcd_add_req(req);
 
-                LASSERTF(fid_res_name_eq(&mdt_body->fid1,
-                                         &lock->l_resource->lr_name),
-                         "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
-                         (unsigned long)lock->l_resource->lr_name.name[0],
-                         (unsigned long)lock->l_resource->lr_name.name[1],
-                         (unsigned long)lock->l_resource->lr_name.name[2],
-                         (unsigned long)fid_seq(&mdt_body->fid1),
-                         (unsigned long)fid_oid(&mdt_body->fid1),
-                         (unsigned long)fid_ver(&mdt_body->fid1));
-                LDLM_LOCK_PUT(lock);
+        RETURN(0);
+}
 
-                memcpy(&old_lock, &lockh, sizeof(lockh));
-                if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
-                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
-                        ldlm_lock_decref_and_cancel(&lockh,
-                                                    it->d.lustre.it_lock_mode);
-                        memcpy(&lockh, &old_lock, sizeof(old_lock));
-                        memcpy(&it->d.lustre.it_lock_handle, &lockh,
-                               sizeof(lockh));
-                }
+/*
+ * Look for an already-held IBITS lock on the resource built from @fid.
+ *
+ * When ldlm_lock_match() finds one (in any of CR/CW/PR/PW mode), its handle
+ * cookie and mode are stored in @it.
+ *
+ * Returns 1 if a lock was matched, 0 otherwise.
+ */
+int mdc_revalidate_lock(struct obd_export *exp,
+                        struct lookup_intent *it,
+                        struct lu_fid *fid)
+{
+        /* We could just return 1 immediately, but since we should only
+         * be called in revalidate_it if we already have a lock, let's
+         * verify that. */
+        struct ldlm_res_id res_id;
+        struct lustre_handle lockh;
+        ldlm_policy_data_t policy;
+        ldlm_mode_t mode;
+        ENTRY;
+
+        fid_build_reg_res_name(fid, &res_id);
+        /* As not all attributes are kept under update lock, e.g.
+         * owner/group/acls are under lookup lock, we need both
+         * ibits for GETATTR. */
+        policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
+                MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
+                MDS_INODELOCK_LOOKUP;
+
+        mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                               LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
+                               &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
+        if (mode) {
+                /* remember which lock satisfied the revalidation */
+                it->d.lustre.it_lock_handle = lockh.cookie;
+                it->d.lustre.it_lock_mode = mode;
         }
-        CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
-               op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
-               it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
 
-        RETURN(rc);
+        /* collapse the matched mode to a 0/1 answer */
+        RETURN(!!mode);
 }
index b58bdcf..7148cf9 100644 (file)
@@ -136,11 +136,11 @@ int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid,
 /*
  * This function now is known to always saying that it will receive 4 buffers
  * from server. Even for cases when acl_size and md_size is zero, RPC header
- * willcontain 4 fields and RPC itself will contain zero size fields. This is
+ * will contain 4 fields and RPC itself will contain zero size fields. This is
  * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed
  * and thus zero, it shirinks it, making zero size. The same story about
  * md_size. And this is course of problem when client waits for smaller number
- * of fields. This issue will be fixed later when client gets awar of RPC
+ * of fields. This issue will be fixed later when client gets aware of RPC
  * layouts.  --umka
  */
 static int mdc_getattr_common(struct obd_export *exp,
@@ -1831,8 +1831,10 @@ struct md_ops mdc_md_ops = {
         .m_free_lustre_md   = mdc_free_lustre_md,
         .m_set_open_replay_data = mdc_set_open_replay_data,
         .m_clear_open_replay_data = mdc_clear_open_replay_data,
+        .m_renew_capa       = mdc_renew_capa,
         .m_get_remote_perm  = mdc_get_remote_perm,
-        .m_renew_capa       = mdc_renew_capa
+        .m_intent_getattr_async = mdc_intent_getattr_async,
+        .m_revalidate_lock      = mdc_revalidate_lock
 };
 
 extern quota_interface_t mdc_quota_interface;
index 293c90d..f57b55c 100644 (file)
@@ -1200,6 +1200,8 @@ int lprocfs_alloc_md_stats(struct obd_device *obd,
         LPROCFS_MD_OP_INIT(num_private_stats, stats, cancel_unused);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, renew_capa);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, get_remote_perm);
+        LPROCFS_MD_OP_INIT(num_private_stats, stats, intent_getattr_async);
+        LPROCFS_MD_OP_INIT(num_private_stats, stats, revalidate_lock);
 
         for (i = num_private_stats; i < num_stats; i++) {
                 if (stats->ls_percpu[0]->lp_cntr[i].lc_name == NULL) {
index 2b567ad..4e7ee54 100644 (file)
@@ -80,6 +80,7 @@ FAIL_ON_ERROR=false
 
+# Tear down the test setup via cleanupall; exits with status 20 if a process
+# matching "ll_sa" is running or if cleanupall fails.
 cleanup() {
        echo -n "cln.."
+       # ll_sa is presumably the statahead thread name -- TODO confirm
+       # against statahead.c
+       pgrep ll_sa > /dev/null && { echo "There are ll_sa thread not exit!"; exit 20; }
        cleanupall ${FORCE} $* || { echo "FAILed to clean up"; exit 20; }
 }
 setup() {
@@ -4785,6 +4786,87 @@ test_121() { #bug #10589
 }
 run_test 121 "read cancel race ========="
 
+# Statahead timing check (bug 11401).  For growing directory sizes, time
+# "ls -l" once with statahead_max at its current value and once with it set
+# to 0; flag an error if the statahead run is more than 2 seconds slower.
+# The directory is removed at the end and the removal is timed as well.
+test_123a() { # was test 123, statahead(bug 11401)
+        # no "processor : 1" entry in /proc/cpuinfo => treat as uniprocessor
+        if [ -z "$(grep "processor.*: 1" /proc/cpuinfo)" ]; then
+                log "testing on UP system. Performance may be not as good as expected."
+        fi
+
+        remount_client $MOUNT
+        mkdir -p $DIR/$tdir
+        error=0
+        # 4th column of the last "df -i -P" line, taken as the free inode
+        # count; cap at 100000, otherwise leave 1000 inodes spare
+        NUMFREE=`df -i -P $DIR | tail -n 1 | awk '{ print $4 }'`
+        [ $NUMFREE -gt 100000 ] && NUMFREE=100000 || NUMFREE=$((NUMFREE-1000))
+        MULT=10
+        # i = target number of files, j = number already created;
+        # each pass creates the missing (i - j) files, then i grows by MULT
+        for ((i=1, j=0; i<=$NUMFREE; j=$i, i=$((i * MULT)) )); do
+                createmany -o $DIR/$tdir/$tfile $j $((i - j))
+
+                # pass 1: statahead at its current setting
+                # (cancel_lru_locks presumably drops cached locks first)
+                lctl get_param -n llite.*.statahead_max | grep '[0-9]'
+                cancel_lru_locks mdc
+                cancel_lru_locks osc
+                stime=`date +%s`
+                ls -l $DIR/$tdir > /dev/null
+                etime=`date +%s`
+                delta_sa=$((etime - stime))
+                log "ls $i files with statahead:    $delta_sa sec"
+               lctl get_param -n llite.*.statahead_stats
+
+                # pass 2: same listing with statahead_max saved and set to 0
+                max=`lctl get_param -n llite.*.statahead_max | head -n 1`
+                lctl set_param -n llite.*.statahead_max 0
+                lctl get_param llite.*.statahead_max
+                cancel_lru_locks mdc
+                cancel_lru_locks osc
+                stime=`date +%s`
+                ls -l $DIR/$tdir > /dev/null
+                etime=`date +%s`
+                delta=$((etime - stime))
+                log "ls $i files without statahead: $delta sec"
+
+                # restore the saved value before judging the result;
+                # 2 seconds of slack on the one-second timer
+                lctl set_param llite.*.statahead_max=$max
+                if [ $delta_sa -gt $(($delta + 2)) ]; then
+                        log "ls $i files is slower with statahead!"
+                        error=1
+                fi
+
+                # stop once a listing is slow; between 9 and 20 seconds
+                # shrink the growth factor instead
+                [ $delta -gt 20 ] && break
+                [ $delta -gt 8 ] && MULT=$((50 / delta))
+                [ "$SLOW" = "no" -a $delta -gt 3 ] && break
+        done
+        log "ls done"
+
+        stime=`date +%s`
+        rm -r $DIR/$tdir
+        sync
+        etime=`date +%s`
+        delta=$((etime - stime))
+        log "rm -r $DIR/$tdir/: $delta seconds"
+        log "rm done"
+        lctl get_param -n llite.*.statahead_stats
+        # wait for commitment of removal
+        sleep 2
+        # the slowdown is reported only after cleanup has been done
+        [ $error -ne 0 ] && error "statahead is slow!"
+        return 0
+}
+run_test 123a "verify statahead work"
+
+# Regression test for bug 15027: list a 1000-file directory while the
+# OBD_FAIL_MDC_GETATTR_ENQUEUE fail_loc is set; the run_test description
+# states the goal as "not panic".
+test_123b () { # statahead(bug 15027)
+       mkdir -p $DIR/$tdir
+       createmany -o $DIR/$tdir/$tfile-%d 1000
+       
+        cancel_lru_locks mdc
+        cancel_lru_locks osc
+
+#define OBD_FAIL_MDC_GETATTR_ENQUEUE     0x803
+        # 0x803 plus the 0x80000000 bit -- presumably the one-shot flag,
+        # TODO confirm against obd_support.h
+        sysctl -w lustre.fail_loc=0x80000803
+        ls -lR $DIR/$tdir > /dev/null
+        log "ls done"
+        # clear the fail_loc again before cleaning up
+        sysctl -w lustre.fail_loc=0x0
+        lctl get_param -n llite.*.statahead_stats
+        rm -r $DIR/$tdir
+        sync
+
+}
+run_test 123b "not panic with network error in statahead enqueue (bug 15027)"
+
 test_124a() {
        [ -z "`lctl get_param -n mdc.*.connect_flags | grep lru_resize`" ] && \
                skip "no lru resize on server" && return 0