Whamcloud - gitweb
Branch b1_6
authorhuanghua <huanghua>
Tue, 4 Sep 2007 05:29:07 +0000 (05:29 +0000)
committerhuanghua <huanghua>
Tue, 4 Sep 2007 05:29:07 +0000 (05:29 +0000)
b=11401
i=adilger
i=tappro
i=green

client-side metadata stat-ahead during readdir (directory readahead)

16 files changed:
lustre/ChangeLog
lustre/include/lustre_mds.h
lustre/llite/Makefile.in
lustre/llite/dcache.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/llite/namei.c
lustre/llite/statahead.c [new file with mode: 0644]
lustre/llite/xattr.c
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/tests/sanity.sh

index 8d68675..df278f0 100644 (file)
@@ -147,6 +147,11 @@ Description: eliminate client locks in face of contention
 Details    : file contention detection and lockless i/o implementation
              for contended files.
 
+Severity   : enhancement
+Bugzilla   : 11401
+Description: client-side metadata stat-ahead during readdir(directory readahead)
+Details    : perform client-side metadata stat-ahead when the client detects
+             readdir and sequential stat of dir entries therein
 --------------------------------------------------------------------------------
 
 2007-08-27         Cluster File Systems, Inc. <info@clusterfs.com>
index 981e444..0759c5f 100644 (file)
@@ -113,6 +113,8 @@ int mds_reint_rec(struct mds_update_record *r, int offset,
 /* mds/mds_lov.c */
 
 /* mdc/mdc_locks.c */
+struct md_enqueue_info;
+
 int it_disposition(struct lookup_intent *it, int flag);
 void it_set_disposition(struct lookup_intent *it, int flag);
 void it_clear_disposition(struct lookup_intent *it, int flag);
@@ -120,6 +122,9 @@ int it_open_error(int phase, struct lookup_intent *it);
 void mdc_set_lock_data(__u64 *lockh, void *data);
 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
                       ldlm_iterator_t it, void *data);
+int mdc_revalidate_lock(struct obd_export *exp,
+                        struct lookup_intent *it,
+                        struct ll_fid *fid);
 int mdc_intent_lock(struct obd_export *exp,
                     struct mdc_op_data *,
                     void *lmm, int lmmsize,
@@ -130,6 +135,9 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 struct lookup_intent *it, struct mdc_op_data *data,
                 struct lustre_handle *lockh, void *lmm, int lmmlen,
                 int extra_lock_flags);
+int mdc_intent_getattr_async(struct obd_export *exp,
+                             struct md_enqueue_info *minfo,
+                             struct ldlm_enqueue_info *einfo);
 
 /* mdc/mdc_request.c */
 int mdc_init_ea_size(struct obd_export *mdc_exp, struct obd_export *lov_exp);
@@ -197,6 +205,18 @@ static inline void mdc_pack_fid(struct ll_fid *fid, obd_id ino, __u32 gen,
         fid->f_type = type;
 }
 
+static inline int it_to_lock_mode(struct lookup_intent *it)
+{
+        /* CREAT needs to be tested before open (both could be set) */
+        if (it->it_op & IT_CREAT)
+                return LCK_CW;
+        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
+                return LCK_CR;
+
+        LBUG();
+        return -EINVAL;
+}
+
 /* ioctls for trying requests */
 #define IOC_REQUEST_TYPE                   'f'
 #define IOC_REQUEST_MIN_NR                 30
@@ -209,4 +229,25 @@ static inline void mdc_pack_fid(struct ll_fid *fid, obd_id ino, __u32 gen,
 #define IOC_REQUEST_CLOSE               _IOWR('f', 35, long)
 #define IOC_REQUEST_MAX_NR               35
 
+/* metadata stat-ahead */
+typedef int (* md_enqueue_cb_t)(struct obd_export *exp,
+                                struct ptlrpc_request *req,
+                                struct md_enqueue_info *minfo,
+                                int rc);
+
+struct md_enqueue_info {
+        struct obd_export      *mi_exp;
+        struct mdc_op_data      mi_data;
+        struct lookup_intent    mi_it;
+        struct lustre_handle    mi_lockh;
+        struct dentry          *mi_dentry;
+        md_enqueue_cb_t         mi_cb;
+        void                   *mi_cbdata;
+};
+
+struct mdc_enqueue_args {
+        struct md_enqueue_info   *ma_mi;
+        struct ldlm_enqueue_info *ma_ei;
+};
+
 #endif
index dfa273b..ff06efd 100644 (file)
@@ -1,5 +1,5 @@
 MODULES := lustre
-lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o xattr.o
+lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o xattr.o statahead.o
 
 ifeq ($(PATCHLEVEL),4)
 lustre-objs += rw24.o super.o
index 1228695..29418ff 100644 (file)
@@ -333,11 +333,11 @@ void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
 int ll_revalidate_it(struct dentry *de, int lookup_flags,
                      struct lookup_intent *it)
 {
-        int rc;
         struct mdc_op_data op_data;
         struct ptlrpc_request *req = NULL;
         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
         struct obd_export *exp;
+        int first = 0, rc;
 
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
@@ -426,11 +426,16 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
                 }
         }
 
+        if (it->it_op == IT_GETATTR)
+                first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
+
 do_lock:
         it->it_create_mode &= ~current->fs->umask;
 
         rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags,
                              &req, ll_mdc_blocking_ast, 0);
+        if (it->it_op == IT_GETATTR && !first)
+                ll_statahead_exit(de, rc);
         /* If req is NULL, then mdc_intent_lock only tried to do a lock match;
          * if all was well, it will return 1 if it found locks, 0 otherwise. */
         if (req == NULL && rc >= 0) {
index 2666a04..60e2762 100644 (file)
@@ -27,7 +27,6 @@
  */
 
 #include <linux/fs.h>
-#include <linux/ext2_fs.h>
 #include <linux/pagemap.h>
 #include <linux/mm.h>
 #include <linux/version.h>
@@ -49,8 +48,6 @@
 #include <lustre_dlm.h>
 #include "llite_internal.h"
 
-typedef struct ext2_dir_entry_2 ext2_dirent;
-
 #ifdef HAVE_PG_FS_MISC
 #define PageChecked(page)        test_bit(PG_fs_misc, &(page)->flags)
 #define SetPageChecked(page)     set_bit(PG_fs_misc, &(page)->flags)
@@ -105,18 +102,6 @@ static inline unsigned ext2_chunk_size(struct inode *inode)
         return inode->i_sb->s_blocksize;
 }
 
-static inline void ext2_put_page(struct page *page)
-{
-        kunmap(page);
-        page_cache_release(page);
-}
-
-static inline unsigned long dir_pages(struct inode *inode)
-{
-        return (inode->i_size+CFS_PAGE_SIZE-1) >> CFS_PAGE_SHIFT;
-}
-
-
 static void ext2_check_page(struct inode *dir, struct page *page)
 {
         unsigned chunk_size = ext2_chunk_size(dir);
@@ -205,7 +190,7 @@ fail:
         SetPageError(page);
 }
 
-static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
+struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
 {
         struct ldlm_res_id res_id =
                 { .name = { dir->i_ino, (__u64)dir->i_generation} };
@@ -264,24 +249,6 @@ fail:
         goto out_unlock;
 }
 
-/*
- * p is at least 6 bytes before the end of page
- */
-static inline ext2_dirent *ext2_next_entry(ext2_dirent *p)
-{
-        return (ext2_dirent *)((char*)p + le16_to_cpu(p->rec_len));
-}
-
-static inline unsigned
-ext2_validate_entry(char *base, unsigned offset, unsigned mask)
-{
-        ext2_dirent *de = (ext2_dirent*)(base + offset);
-        ext2_dirent *p = (ext2_dirent*)(base + (offset&mask));
-        while ((char*)p < (char*)de)
-                p = ext2_next_entry(p);
-        return (char *)p - base;
-}
-
 static unsigned char ext2_filetype_table[EXT2_FT_MAX] = {
         [EXT2_FT_UNKNOWN]       DT_UNKNOWN,
         [EXT2_FT_REG_FILE]      DT_REG,
index 4dd4cd9..b50cfba 100644 (file)
@@ -235,6 +235,9 @@ int ll_file_release(struct inode *inode, struct file *file)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
 
+        if (S_ISDIR(inode->i_mode))
+                ll_stop_statahead(inode);
+
         /* don't do anything for / */
         if (inode->i_sb->s_root == file->f_dentry)
                 RETURN(0);
@@ -262,6 +265,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
         struct inode *inode = file->f_dentry->d_inode;
         struct ptlrpc_request *req;
         int rc;
+        ENTRY;
 
         if (!parent)
                 RETURN(-ENOENT);
@@ -385,6 +389,9 @@ int ll_file_open(struct inode *inode, struct file *file)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
                inode->i_generation, inode, file->f_flags);
 
+        if (S_ISDIR(inode->i_mode) && lli->lli_opendir_pid == 0)
+                lli->lli_opendir_pid = current->pid;
+
         /* don't do anything for / */
         if (inode->i_sb->s_root == file->f_dentry)
                 RETURN(0);
@@ -397,9 +404,10 @@ int ll_file_open(struct inode *inode, struct file *file)
 #endif
 
         fd = ll_file_data_get();
-        if (fd == NULL)
+        if (fd == NULL) {
+                lli->lli_opendir_pid = 0;
                 RETURN(-ENOMEM);
-
+        }
         if (!it || !it->d.lustre.it_disposition) {
                 /* Convert f_flags into access mode. We cannot use file->f_mode,
                  * because everything but O_ACCMODE mask was stripped from it */
@@ -528,6 +536,7 @@ out_och_free:
                         (*och_usecount)--;
                 }
                 up(&lli->lli_och_sem);
+                lli->lli_opendir_pid = 0;
         }
         return rc;
 }
index d5f3d1f..1282a89 100644 (file)
@@ -5,6 +5,7 @@
 #ifndef LLITE_INTERNAL_H
 #define LLITE_INTERNAL_H
 
+#include <linux/ext2_fs.h>
 #ifdef CONFIG_FS_POSIX_ACL
 # include <linux/fs.h>
 #ifdef HAVE_XATTR_ACL
@@ -110,6 +111,10 @@ struct ll_inode_info {
 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
         struct inode            lli_vfs_inode;
 #endif
+
+        /* metadata stat-ahead */
+        pid_t                   lli_opendir_pid;
+        struct ll_statahead_info *lli_sai;
 };
 
 /*
@@ -265,9 +270,19 @@ struct ll_sb_info {
         enum stats_track_type     ll_stats_track_type;
         int                       ll_stats_track_id;
         int                       ll_rw_stats_on;
-
         dev_t                     ll_sdev_orig; /* save s_dev before assign for
                                                  * clustred nfs */
+
+        /* metadata stat-ahead */
+        unsigned int              ll_sa_count; /* current statahead RPCs */
+        unsigned int              ll_sa_max;   /* max statahead RPCs */
+        unsigned int              ll_sa_wrong; /* statahead thread stopped for
+                                                * low hit ratio */
+        unsigned int              ll_sa_total; /* statahead thread started
+                                                * count */
+        unsigned long long        ll_sa_blocked; /* ls count waiting for
+                                                  * statahead */
+        unsigned long long        ll_sa_cached;  /* ls count got in cache */
 };
 
 #define LL_DEFAULT_MAX_RW_CHUNK         (32 * 1024 * 1024)
@@ -443,6 +458,38 @@ static void ll_stats_ops_tally(struct ll_sb_info *sbi, int op, int count) {}
 extern struct file_operations ll_dir_operations;
 extern struct inode_operations ll_dir_inode_operations;
 
+struct page *ll_get_dir_page(struct inode *dir, unsigned long n);
+/*
+ * p is at least 6 bytes before the end of page
+ */
+typedef struct ext2_dir_entry_2 ext2_dirent;
+
+static inline ext2_dirent *ext2_next_entry(ext2_dirent *p)
+{
+        return (ext2_dirent *)((char*)p + le16_to_cpu(p->rec_len));
+}
+
+static inline unsigned
+ext2_validate_entry(char *base, unsigned offset, unsigned mask)
+{
+        ext2_dirent *de = (ext2_dirent*)(base + offset);
+        ext2_dirent *p = (ext2_dirent*)(base + (offset&mask));
+        while ((char*)p < (char*)de)
+                p = ext2_next_entry(p);
+        return (char *)p - base;
+}
+
+static inline void ext2_put_page(struct page *page)
+{
+        kunmap(page);
+        page_cache_release(page);
+}
+
+static inline unsigned long dir_pages(struct inode *inode)
+{
+        return (inode->i_size + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+}
+
 /* llite/namei.c */
 int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir);
 struct inode *ll_iget(struct super_block *sb, ino_t hash,
@@ -458,6 +505,9 @@ int ll_prepare_mdc_op_data(struct mdc_op_data *,
 struct lookup_intent *ll_convert_intent(struct open_intent *oit,
                                         int lookup_flags);
 #endif
+int lookup_it_finish(struct ptlrpc_request *request, int offset,
+                     struct lookup_intent *it, void *data);
+void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
 
 /* llite/rw.c */
 int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to);
@@ -723,4 +773,35 @@ ssize_t ll_getxattr(struct dentry *dentry, const char *name,
 ssize_t ll_listxattr(struct dentry *dentry, char *buffer, size_t size);
 int ll_removexattr(struct dentry *dentry, const char *name);
 
+/* statahead.c */
+
+#define LL_STATAHEAD_MIN  1
+#define LL_STATAHEAD_DEF  32
+#define LL_STATAHEAD_MAX  10000
+
+/* per inode struct, for dir only */
+struct ll_statahead_info {
+        struct inode           *sai_inode;
+        atomic_t                sai_refc;       /* when access this struct, hold
+                                                 * refcount */
+        unsigned int            sai_max;        /* max ahead of lookup */
+        unsigned int            sai_sent;       /* stat requests sent count */
+        unsigned int            sai_replied;    /* stat requests which received
+                                                 * reply */
+        unsigned int            sai_cached;     /* UPDATE lock cached locally
+                                                 * already */
+        unsigned int            sai_hit;        /* hit count */
+        unsigned int            sai_miss;       /* miss count */
+        unsigned int            sai_consecutive_miss; /* consecutive miss */
+        unsigned                sai_ls_all:1;   /* ls -al, do stat-ahead for
+                                                 * hidden entries */
+        struct ptlrpc_thread    sai_thread;     /* stat-ahead thread */
+        struct list_head        sai_entries;    /* stat-ahead entries */
+        unsigned int            sai_entries_nr; /* stat-ahead entries count */
+};
+
+int ll_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup);
+void ll_statahead_exit(struct dentry *dentry, int result);
+void ll_stop_statahead(struct inode *inode);
+
 #endif /* LLITE_INTERNAL_H */
index 0719ac2..8c618c5 100644 (file)
@@ -99,6 +99,9 @@ static struct ll_sb_info *ll_init_sbi(void)
                 spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_w_hist.oh_lock);
         }
 
+        /* metadata statahead is enabled by default */
+        sbi->ll_sa_max = LL_STATAHEAD_DEF;
+
         RETURN(sbi);
 }
 
@@ -1143,6 +1146,12 @@ void ll_clear_inode(struct inode *inode)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
 
+        if (S_ISDIR(inode->i_mode)) {
+                /* these should have been cleared in ll_file_release */
+                LASSERT(lli->lli_sai == NULL);
+                LASSERT(lli->lli_opendir_pid == 0);
+        }
+
         ll_inode2fid(&fid, inode);
         clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags);
         mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode);
index b354a13..29ccf15 100644 (file)
@@ -457,6 +457,59 @@ static int ll_wr_contention_time(struct file *file, const char *buffer,
                 count;
 }
 
+static int ll_rd_statahead_count(char *page, char **start, off_t off,
+                                 int count, int *eof, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+        return snprintf(page, count, "%u\n", sbi->ll_sa_count);
+}
+
+static int ll_rd_statahead_max(char *page, char **start, off_t off,
+                               int count, int *eof, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+        return snprintf(page, count, "%u\n", sbi->ll_sa_max);
+}
+
+static int ll_wr_statahead_max(struct file *file, const char *buffer,
+                               unsigned long count, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+        int val, rc;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+        if (val >= 0 && val <= LL_STATAHEAD_MAX)
+                sbi->ll_sa_max = val;
+        else
+                CERROR("Bad statahead_max value %d. Valid values are in the "
+                       "range [0, %d]\n", val, LL_STATAHEAD_MAX);
+
+        return count;
+}
+
+static int ll_rd_statahead_stats(char *page, char **start, off_t off,
+                                 int count, int *eof, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+        return snprintf(page, count,
+                        "statahead wrong: %u\n"
+                        "statahead total: %u\n"
+                        "ls blocked:      %llu\n"
+                        "ls total:        %llu\n",
+                        sbi->ll_sa_wrong, sbi->ll_sa_total,
+                        sbi->ll_sa_blocked,
+                        sbi->ll_sa_blocked + sbi->ll_sa_cached);
+}
+
 static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "uuid",         ll_rd_sb_uuid,          0, 0 },
         //{ "mntpt_path",   ll_rd_path,             0, 0 },
@@ -479,6 +532,9 @@ static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
         { "stats_track_gid",  ll_rd_track_gid, ll_wr_track_gid, 0 },
         { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0},
+        { "statahead_count", ll_rd_statahead_count, 0, 0 },
+        { "statahead_max",   ll_rd_statahead_max, ll_wr_statahead_max, 0 },
+        { "statahead_stats", ll_rd_statahead_stats, 0, 0 },
         { 0 }
 };
 
index 773f828..c47986b 100644 (file)
@@ -373,7 +373,7 @@ static void ll_d_add(struct dentry *de, struct inode *inode)
  * in ll_revalidate_it.  After revaliadate inode will be have hashed aliases
  * and it triggers BUG_ON in d_instantiate_unique (bug #10954).
  */
-struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
+static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
 {
         struct list_head *tmp;
         struct dentry *dentry;
@@ -442,7 +442,7 @@ struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
         return de;
 }
 
-static int lookup_it_finish(struct ptlrpc_request *request, int offset,
+int lookup_it_finish(struct ptlrpc_request *request, int offset,
                             struct lookup_intent *it, void *data)
 {
         struct it_cb_data *icbd = data;
@@ -530,8 +530,17 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
                         RETURN(ERR_PTR(rc));
         }
 
-        icbd.icbd_childp = &dentry;
+        if (it->it_op == IT_GETATTR) {
+                rc = ll_statahead_enter(parent, &dentry, 1);
+                if (rc >= 0) {
+                        ll_statahead_exit(dentry, rc);
+                        if (rc == 1)
+                                RETURN(retval = dentry);
+                }
+        }
+
         icbd.icbd_parent = parent;
+        icbd.icbd_childp = &dentry;
 
         rc = ll_prepare_mdc_op_data(&op_data, parent, NULL, dentry->d_name.name,
                                     dentry->d_name.len, lookup_flags, NULL);
diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c
new file mode 100644 (file)
index 0000000..67af5aa
--- /dev/null
@@ -0,0 +1,860 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2007 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <linux/fs.h>
+#include <linux/sched.h>
+#include <linux/mm.h>
+#include <linux/smp_lock.h>
+#include <linux/highmem.h>
+#include <linux/pagemap.h>
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include <obd_support.h>
+#include <lustre_lite.h>
+#include <lustre_dlm.h>
+#include <linux/lustre_version.h>
+#include "llite_internal.h"
+
+struct ll_sai_entry {
+        struct list_head        se_list;
+        int                     se_index;
+        int                     se_stat;
+};
+
+enum {
+        SA_ENTRY_UNSTATED = 0,
+        SA_ENTRY_STATED
+};
+
+static struct ll_statahead_info *ll_sai_alloc(void)
+{
+        struct ll_statahead_info *sai;
+
+        OBD_ALLOC_PTR(sai);
+        if (!sai)
+                return NULL;
+
+        sai->sai_max = LL_STATAHEAD_MIN;
+        init_waitqueue_head(&sai->sai_thread.t_ctl_waitq);
+        INIT_LIST_HEAD(&sai->sai_entries);
+        atomic_set(&sai->sai_refc, 1);
+        return sai;
+}
+
+static inline 
+struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
+{
+        LASSERT(sai);
+        atomic_inc(&sai->sai_refc);
+        return sai;
+}
+
+static void ll_sai_put(struct ll_statahead_info *sai)
+{
+        struct inode *inode = sai->sai_inode;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        ENTRY;
+
+        if (atomic_dec_and_lock(&sai->sai_refc, &lli->lli_lock)) {
+                struct ll_sai_entry  *entry, *next;
+
+                LASSERT(sai->sai_thread.t_flags & SVC_STOPPED);
+                list_for_each_entry_safe(entry, next, &sai->sai_entries,
+                                         se_list) {
+                        list_del(&entry->se_list);
+                        OBD_FREE_PTR(entry);
+                }
+                OBD_FREE_PTR(sai);
+                lli->lli_sai = NULL;
+                spin_unlock(&lli->lli_lock);
+                iput(inode);
+        }
+        EXIT;
+}
+
+static struct ll_sai_entry *ll_sai_entry_get(struct ll_statahead_info *sai,
+                                             int index, int stat)
+{
+        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+        struct ll_sb_info    *sbi = ll_i2sbi(sai->sai_inode);
+        struct ll_sai_entry  *entry;
+        ENTRY;
+
+        OBD_ALLOC_PTR(entry);
+        if (entry == NULL)
+                RETURN(NULL);
+        
+        CDEBUG(D_READA, "alloc sai entry %p index %d, stat %d\n",
+               entry, index, stat);
+        entry->se_index = index;
+        entry->se_stat  = stat;
+
+        spin_lock(&lli->lli_lock);
+        list_add_tail(&entry->se_list, &sai->sai_entries);
+        sai->sai_entries_nr++;
+        sbi->ll_sa_count = sai->sai_entries_nr;
+        spin_unlock(&lli->lli_lock);
+
+        LASSERT(sai->sai_entries_nr <= sbi->ll_sa_max);
+        RETURN(entry);
+}
+
+static void ll_sai_entry_set(struct ll_statahead_info *sai, int index,
+                             int stat)
+{
+        struct ll_sai_entry *entry;
+        ENTRY;
+
+        list_for_each_entry(entry, &sai->sai_entries, se_list) {
+                if (entry->se_index == index) {
+                        LASSERT(entry->se_stat == SA_ENTRY_UNSTATED);
+                        entry->se_stat = stat;
+                        CDEBUG(D_READA, "set sai entry %p index %d stat %d\n",
+                               entry, index, stat);
+                        EXIT;
+                        return;
+                }
+        }
+        /* Sometimes, this happens when entry has been put and freed */
+        CDEBUG(D_READA, "can't find sai entry index %d\n", index);
+        EXIT;
+}
+
+/* check first entry was stated already */
+static int ll_sai_entry_stated(struct ll_statahead_info *sai)
+{
+        struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+        struct ll_sai_entry  *entry;
+        int                   rc = 0;
+        ENTRY;
+
+        spin_lock(&lli->lli_lock);
+        if (!list_empty(&sai->sai_entries)) {
+                entry = list_entry(sai->sai_entries.next, struct ll_sai_entry,
+                                   se_list);
+                CDEBUG(D_READA, "check sai entry %p index %d stat %d\n",
+                       entry, entry->se_index, entry->se_stat);
+                rc = (entry->se_stat != SA_ENTRY_UNSTATED);
+        }
+        spin_unlock(&lli->lli_lock);
+
+        RETURN(rc);
+}
+
+/* inside lli_lock */
+static void ll_sai_entry_put(struct ll_statahead_info *sai)
+{
+        struct ll_sai_entry  *entry;
+        ENTRY;
+        
+        if (list_empty(&sai->sai_entries)) {
+                EXIT;
+                return;
+        }
+        LASSERT(sai->sai_entries_nr > 0);
+
+        entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list);
+        list_del(&entry->se_list);
+        sai->sai_entries_nr--;
+
+        CDEBUG(D_READA, "free sa entry %p index %d stat %d\n",
+               entry, entry->se_index, entry->se_stat);
+        OBD_FREE_PTR(entry);
+        EXIT;
+}
+
+/* finish lookup/revalidate */
+static int ll_statahead_interpret(struct obd_export *exp,
+                                  struct ptlrpc_request *req,
+                                  struct md_enqueue_info *minfo,
+                                  int rc)
+{
+        struct lookup_intent     *it = &minfo->mi_it;
+        struct dentry            *dentry = minfo->mi_dentry;
+        struct inode             *dir = dentry->d_parent->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_statahead_info *sai;
+        ENTRY;
+
+        CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
+               dentry->d_name.len, dentry->d_name.name, rc);
+        if (rc)
+                GOTO(out, rc);
+
+        if (dentry->d_inode == NULL) {
+                /* lookup */
+                struct dentry    *save = dentry;
+                struct it_cb_data icbd = {
+                        .icbd_parent = dir,
+                        .icbd_childp = &dentry
+                };
+
+                rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
+                if (!rc) {
+                        LASSERT(dentry->d_inode);
+                        if (dentry != save)
+                                dput(save);
+                        ll_lookup_finish_locks(it, dentry);
+                }
+        } else {
+                /* revalidate */
+                struct mds_body *body;
+
+                body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
+                                      sizeof(*body));
+                if (memcmp(&minfo->mi_data.fid2, &body->fid1,
+                           sizeof(body->fid1))) {
+                        ll_unhash_aliases(dentry->d_inode);
+                        GOTO(out, rc = -EAGAIN);
+                }
+
+                rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, dentry);
+                if (rc) {
+                        ll_unhash_aliases(dentry->d_inode);
+                        GOTO(out, rc);
+                }
+
+                spin_lock(&dcache_lock);
+                lock_dentry(dentry);
+                __d_drop(dentry);
+#ifdef DCACHE_LUSTRE_INVALID
+                dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
+#endif
+                unlock_dentry(dentry);
+                __d_rehash(dentry, 0);
+                spin_unlock(&dcache_lock);
+
+                ll_lookup_finish_locks(it, dentry);
+
+        }
+        EXIT;
+out:
+        spin_lock(&lli->lli_lock);
+        sai = lli->lli_sai;
+        if (sai) {
+                lli->lli_sai->sai_replied++;
+                ll_sai_entry_set(lli->lli_sai, (int)minfo->mi_cbdata,
+                                 SA_ENTRY_STATED);
+                wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq);
+        }
+        spin_unlock(&lli->lli_lock);
+        ll_intent_release(it);
+        OBD_FREE_PTR(minfo);
+
+        dput(dentry);
+        return rc;
+}
+
+static void sa_args_fini(struct md_enqueue_info *minfo,
+                         struct ldlm_enqueue_info *einfo)
+{
+        LASSERT(minfo && einfo);
+        OBD_FREE_PTR(minfo);
+        OBD_FREE_PTR(einfo);
+}
+
+static int sa_args_prep(struct inode *dir, struct dentry *dentry,
+                        struct md_enqueue_info **pmi,
+                        struct ldlm_enqueue_info **pei)
+{
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct md_enqueue_info   *minfo;
+        struct ldlm_enqueue_info *einfo;
+
+        OBD_ALLOC_PTR(einfo);
+        if (einfo == NULL)
+                return -ENOMEM;
+
+        OBD_ALLOC_PTR(minfo);
+        if (minfo == NULL) {
+                OBD_FREE_PTR(einfo);
+                return -ENOMEM;
+        }
+
+        minfo->mi_exp = ll_i2mdcexp(dir);
+        minfo->mi_it.it_op = IT_GETATTR;
+        minfo->mi_dentry = dentry;
+        minfo->mi_cb = ll_statahead_interpret;
+        minfo->mi_cbdata = (void *)lli->lli_sai->sai_sent;
+
+        einfo->ei_type   = LDLM_IBITS;
+        einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
+        einfo->ei_cb_bl  = ll_mdc_blocking_ast;
+        einfo->ei_cb_cp  = ldlm_completion_ast;
+        einfo->ei_cb_gl  = NULL;
+        einfo->ei_cbdata = NULL;
+
+        *pmi = minfo;
+        *pei = einfo;
+
+        return 0;
+}
+
+/* similar to ll_lookup_it(). */
+static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
+{
+        struct md_enqueue_info   *minfo;
+        struct ldlm_enqueue_info *einfo;
+        int                       rc;                
+        ENTRY;
+
+        rc = sa_args_prep(dir, dentry, &minfo, &einfo);
+        if (rc)
+                RETURN(rc);
+
+        rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, NULL,
+                                    dentry->d_name.name, dentry->d_name.len, 0,
+                                    NULL);
+        if (rc == 0)
+                rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo);
+
+        if (rc)
+                sa_args_fini(minfo, einfo);
+
+        RETURN(rc);
+}
+
+/* similar to ll_revalidate_it().
+ * return 1: dentry valid.
+ *        0: will send stat-ahead request.
+ *        -errno: prepare stat-ahead request failed. */
+static int do_sa_revalidate(struct dentry *dentry)
+{
+        struct inode             *inode = dentry->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dentry->d_parent->d_inode);
+        struct ll_fid             fid;
+        struct lookup_intent      it = { .it_op = IT_GETATTR };
+        struct md_enqueue_info   *minfo;
+        struct ldlm_enqueue_info *einfo;
+        int rc;
+        ENTRY;
+
+        if (inode == NULL)
+                RETURN(1);
+
+        if (d_mountpoint(dentry))
+                RETURN(1);
+
+        ll_inode2fid(&fid, inode);
+
+        rc = mdc_revalidate_lock(ll_i2mdcexp(inode), &it, &fid);
+        if (rc == 1) {
+                ll_intent_release(&it);
+                lli->lli_sai->sai_cached++;
+                wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq);
+                RETURN(1);
+        }
+
+        rc = sa_args_prep(dentry->d_parent->d_inode, dentry, &minfo, &einfo);
+        if (rc)
+                RETURN(rc);
+
+        rc = ll_prepare_mdc_op_data(&minfo->mi_data, dentry->d_parent->d_inode,
+                                    inode, dentry->d_name.name,
+                                    dentry->d_name.len, 0, NULL);
+        if (rc == 0)
+                rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo);
+
+        if (rc)
+                sa_args_fini(minfo, einfo);
+
+        RETURN(rc);
+}
+
+/* copied from kernel */
+static inline void name2qstr(struct qstr *this, const char *name, int namelen)
+{
+        unsigned long        hash;
+        const unsigned char *p = (const unsigned char *)name;
+        int                  len;
+        unsigned int         c;
+
+        hash = init_name_hash();
+        for (len = 0; len < namelen; len++, p++) {
+                c = *p;
+                hash = partial_name_hash(c, hash);
+        }
+        this->name = name;
+        this->len  = namelen;
+        this->hash = end_name_hash(hash);
+}
+
+static int ll_statahead_one(struct dentry *parent, ext2_dirent *de)
+{
+        struct inode           *dir = parent->d_inode;
+        struct ll_inode_info   *lli = ll_i2info(dir);
+        struct qstr             name;
+        struct dentry          *dentry;
+        struct ll_sai_entry    *se;
+        int                     rc;
+        ENTRY;
+
+        name2qstr(&name, de->name, de->name_len);
+
+        se = ll_sai_entry_get(lli->lli_sai, lli->lli_sai->sai_sent,
+                              SA_ENTRY_UNSTATED);
+
+#ifdef DCACHE_LUSTRE_INVALID
+        if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
+#else
+        if (d_unhashed(parent)) {
+#endif
+                CDEBUG(D_READA, "parent dentry@%p %.*s is "
+                       "invalid, skip statahead\n",
+                       parent, parent->d_name.len, parent->d_name.name);
+                GOTO(out, rc = -EINVAL);
+        }
+
+        dentry = d_lookup(parent, &name);
+        if (!dentry) {
+                struct dentry *dentry = d_alloc(parent, &name);
+
+                rc = -ENOMEM;
+                if (dentry) {
+                        rc = do_sa_lookup(dir, dentry);
+                        if (rc)
+                                dput(dentry);
+                }
+                GOTO(out, rc);
+        }
+
+        rc = do_sa_revalidate(dentry);
+        if (rc)
+                dput(dentry);
+        GOTO(out, rc);
+out:
+        if (rc) {
+                CDEBUG(D_READA, "set sai entry %p index %d stat %d, rc %d\n",
+                       se, se->se_index, se->se_stat, rc);
+                se->se_stat = rc;
+                wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq);
+        }
+        lli->lli_sai->sai_sent++;
+        return rc;
+}
+                
+static inline int sa_check_stop(struct ll_statahead_info *sai)
+{
+        return !!(sai->sai_thread.t_flags & SVC_STOPPING);
+}
+
+static inline int sa_not_full(struct ll_statahead_info *sai)
+{
+        return sai->sai_sent - sai->sai_miss - sai->sai_hit < sai->sai_max;
+}
+
+struct ll_sa_thread_args {
+        struct dentry   *sta_parent;
+        pid_t            sta_pid;
+};
+
+static int ll_statahead_thread(void *arg)
+{
+        struct ll_sa_thread_args *sta = arg;
+        struct dentry            *parent = dget(sta->sta_parent);
+        struct inode             *dir = parent->d_inode;
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_sb_info        *sbi = ll_i2sbi(dir);
+        struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
+        struct ptlrpc_thread     *thread = &sai->sai_thread;
+        struct l_wait_info        lwi = { 0 };
+        unsigned long             index = 0;
+        __u64                     offset = 0;
+        int                       skip = 0;
+        int                       rc = 0;
+        char                      name[16] = "";
+        ENTRY;
+
+        sbi->ll_sa_total++;
+
+        snprintf(name, 15, "ll_sa_%u", sta->sta_pid);
+        cfs_daemonize(name);
+        thread->t_flags = SVC_RUNNING;
+        wake_up(&thread->t_ctl_waitq);
+        CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
+
+        if (sai->sai_ls_all)
+                CDEBUG(D_READA, "do statahead for hidden files\n");
+
+        while (1) {
+                unsigned long npages = dir_pages(dir);
+
+                /* hit ratio < 80% */
+                if ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
+                     (sai->sai_consecutive_miss > 8)) {
+                        sbi->ll_sa_wrong++;
+                        CDEBUG(D_READA, "statahead for dir %.*s hit ratio too "
+                               "low: hit/miss %u/%u, sent/replied %u/%u, "
+                               "cached %u\n",
+                               parent->d_name.len, parent->d_name.name,
+                               sai->sai_hit, sai->sai_miss, sai->sai_sent,
+                               sai->sai_replied, sai->sai_cached);
+                        break;
+                }
+
+                /* reach the end of dir */
+                if (index == npages) {
+                        CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
+                               index, npages);
+                        break;
+                }
+
+                l_wait_event(thread->t_ctl_waitq,
+                             sa_check_stop(sai) || sa_not_full(sai),
+                             &lwi);
+
+                if (sa_check_stop(sai))
+                        break;
+
+                for (; index < npages; index++, offset = 0) {
+                        char *kaddr, *limit;
+                        ext2_dirent *de;
+                        struct page *page;
+
+                        CDEBUG(D_EXT2,"read %lu of dir %lu/%u page %lu"
+                               "/%lu size %llu\n",
+                               CFS_PAGE_SIZE, dir->i_ino, dir->i_generation,
+                               index, npages, dir->i_size);
+
+                        page = ll_get_dir_page(dir, index);
+                        npages = dir_pages(dir);
+
+                        if (IS_ERR(page)) {
+                                rc = PTR_ERR(page);
+                                CERROR("error reading dir %lu/%u page %lu: "
+                                       "rc %d\n",
+                                       dir->i_ino, dir->i_generation, index,
+                                       rc);
+                                GOTO(out, rc);
+                        }
+
+                        kaddr = page_address(page);
+                        de = (ext2_dirent *)(kaddr + offset);
+                        limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
+                        for (; (char*)de <= limit && sa_not_full(sai);
+                             de = ext2_next_entry(de)) {
+                                if (!de->inode)
+                                        continue;
+
+                                /* don't stat-ahead ".", ".." */
+                                if (skip < 2) {
+                                        skip++;
+                                        continue;
+                                }
+
+                                /* don't stat-ahead for hidden files */
+                                if (de->name[0] == '.' && !sai->sai_ls_all)
+                                        continue;
+
+                                /* don't stat-ahead for the first de */
+                                if (skip < 3) {
+                                        skip++;
+                                        continue;
+                                }
+
+                                rc = ll_statahead_one(parent, de);
+                                if (rc < 0) {
+                                        ext2_put_page(page);
+                                        GOTO(out, rc);
+                                }
+                        }
+                        offset = (char *)de - kaddr;
+                        ext2_put_page(page);
+
+                        if ((char *)de <= limit)
+                                /* !sa_not_full() */
+                                break;
+                }
+        }
+        EXIT;
+out:
+        thread->t_flags = SVC_STOPPED;
+        wake_up(&thread->t_ctl_waitq);
+        lli->lli_opendir_pid = 0; /* avoid statahead again */
+        ll_sai_put(sai);
+        dput(parent);
+        CDEBUG(D_READA, "stopped statahead thread, pid %d for %s\n",
+               current->pid, parent->d_name.name);
+        return 0;
+}
+
+/* called in ll_file_release */
+void ll_stop_statahead(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ptlrpc_thread *thread;
+
+        /* don't check pid here. upon fork, if parent closedir before child,
+         * child will not have chance to stop this thread. */
+        lli->lli_opendir_pid = 0;
+
+        spin_lock(&lli->lli_lock);
+        if (lli->lli_sai) {
+                ll_sai_get(lli->lli_sai);
+                spin_unlock(&lli->lli_lock);
+
+                CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
+                       current->pid);
+                thread = &lli->lli_sai->sai_thread;
+                thread->t_flags = SVC_STOPPING;
+                wake_up(&thread->t_ctl_waitq);
+                wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+                ll_sai_put(lli->lli_sai);
+
+                return;
+        }
+        spin_unlock(&lli->lli_lock);
+}
+
+enum {
+        LS_NONE_FIRST_DE = 0,   /* not first dirent, or is "." */
+        LS_FIRST_DE,            /* the first non-hidden dirent */
+        LS_FIRST_DOT_DE         /* the first hidden dirent, that is ".xxx" */
+};
+
+static int is_first_dirent(struct inode *dir, struct dentry *dentry)
+{
+        struct qstr   *d_name = &dentry->d_name;
+        unsigned long  npages = dir_pages(dir);
+        struct page   *page;
+        ext2_dirent   *de;
+        unsigned long  index;
+        __u64          offset = 0;
+        char          *kaddr, *limit;
+        int            dot_de = 1; /* dirent is dotfile till now */
+        int            rc = LS_NONE_FIRST_DE;
+        ENTRY;
+
+        page = ll_get_dir_page(dir, 0);
+        if (IS_ERR(page)) {
+                CERROR("error reading dir %lu/%u page 0: rc %ld\n",
+                       dir->i_ino, dir->i_generation, PTR_ERR(page));
+                RETURN(LS_NONE_FIRST_DE);
+        }
+
+        kaddr = page_address(page);
+        de = (ext2_dirent *)kaddr;
+        if (!(de->name_len == 1 && strncmp(de->name, ".", 1) == 0))
+                CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino);
+        de = ext2_next_entry(de); /* skip ".", or ingore bad entry */
+        if (!(de->name_len == 2 && strncmp(de->name, "..", 2) == 0))
+                CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino);
+        de = ext2_next_entry(de); /* skip "..", or ingore bad entry */
+
+        offset = (char *)de - kaddr;
+
+        for (index = 0; index < npages; offset = 0) {
+                de = (ext2_dirent *)(kaddr + offset);
+                limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
+                for (; (char*)de <= limit; de = ext2_next_entry(de)) {
+                        if (!de->inode)
+                                continue;
+
+                        if (de->name[0] != '.')
+                                dot_de = 0;
+
+                        if (dot_de && d_name->name[0] != '.') {
+                                CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
+                                       d_name->len, d_name->name,
+                                       de->name_len, de->name);
+                                continue;
+                        }
+
+                        if (d_name->len == de->name_len &&
+                            !strncmp(d_name->name, de->name, d_name->len))
+                                rc = LS_FIRST_DE + dot_de;
+                        else
+                                rc = LS_NONE_FIRST_DE;
+                        GOTO(out, rc);
+                }
+
+                if (++index >= npages)
+                        break;
+
+                ext2_put_page(page);
+
+                page = ll_get_dir_page(dir, index);
+                if (IS_ERR(page)) {
+                        CERROR("error reading dir %lu/%u page %lu: rc %ld\n",
+                               dir->i_ino, dir->i_generation, index,
+                               PTR_ERR(page));
+                        RETURN(LS_NONE_FIRST_DE);
+                }
+                kaddr = page_address(page);
+        }
+        CERROR("%.*s not found in dir %.*s!\n", d_name->len, d_name->name,
+               dentry->d_parent->d_name.len, dentry->d_parent->d_name.name);
+        EXIT;
+out:
+        ext2_put_page(page);
+        return rc;
+}
+
+/* start stat-ahead thread if this is the first dir entry, otherwise if a thread
+ * is started already, wait until thread is ahead of me.
+ * Return value: 
+ *    0 -- miss,
+ *    1 -- hit,
+ *    -EEXIST -- stat ahead thread started, and this is the first try.
+ *    other negative value -- error.
+ */
+int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
+{
+        struct ll_sb_info        *sbi = ll_i2sbi(dir);
+        struct ll_inode_info     *lli = ll_i2info(dir);
+        struct ll_statahead_info *sai;
+        struct ll_sa_thread_args  sta;
+        int                       rc;
+        ENTRY;
+
+        if (sbi->ll_sa_max == 0)
+                RETURN(-ENOTSUPP);
+
+        /* not the same process, don't statahead */
+        if (lli->lli_opendir_pid != current->pid)
+                RETURN(-EBADF);
+
+        spin_lock(&lli->lli_lock);
+        if (lli->lli_sai) {
+                sai = ll_sai_get(lli->lli_sai);
+                spin_unlock(&lli->lli_lock);
+
+                if (ll_sai_entry_stated(sai)) {
+                        sbi->ll_sa_cached++;
+                } else {
+                        struct l_wait_info lwi = { 0 };
+
+                        sbi->ll_sa_blocked++;
+                        /* thread started already, avoid double-stat */
+                        l_wait_event(sai->sai_thread.t_ctl_waitq,
+                                     ll_sai_entry_stated(sai) ||
+                                     sai->sai_thread.t_flags & SVC_STOPPED,
+                                     &lwi);
+                }
+
+                ll_sai_put(sai);
+
+                if (lookup) {
+                        struct dentry *result;
+
+                        result = d_lookup((*dentryp)->d_parent,
+                                          &(*dentryp)->d_name);
+                        if (result) {
+                                LASSERT(result != *dentryp);
+                                dput(*dentryp);
+                                *dentryp = result;
+                        }
+                        RETURN(result != NULL);
+                }
+                /* do nothing for revalidate */
+                RETURN(0);
+        }
+        spin_unlock(&lli->lli_lock);
+
+        rc = is_first_dirent(dir, *dentryp);
+        if (!rc) {
+                /* optimization: don't statahead for this pid any longer */
+                spin_lock(&lli->lli_lock);
+                if (lli->lli_sai == NULL)
+                        lli->lli_opendir_pid = 0;
+                spin_unlock(&lli->lli_lock);
+                RETURN(-EBADF);
+        }
+
+        spin_lock(&lli->lli_lock);
+        if (lli->lli_sai == NULL) {
+                lli->lli_sai = ll_sai_alloc();
+                if (lli->lli_sai == NULL) {
+                        spin_unlock(&lli->lli_lock);
+                        RETURN(-ENOMEM);
+                }
+        } else {
+                /* sai is already there */
+                spin_unlock(&lli->lli_lock);
+                RETURN(-EBUSY);
+        }
+        spin_unlock(&lli->lli_lock);
+        
+        sai = lli->lli_sai;
+        sai->sai_inode = igrab(dir);
+        sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
+
+        sta.sta_parent = (*dentryp)->d_parent;
+        sta.sta_pid    = current->pid;
+        rc = kernel_thread(ll_statahead_thread, &sta, 0);
+        if (rc < 0) {
+                CERROR("can't start ll_sa thread, rc: %d\n", rc);
+                ll_sai_put(sai);
+                RETURN(rc);
+        }
+
+        wait_event(sai->sai_thread.t_ctl_waitq, 
+                   sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED));
+        ll_sai_put(sai);
+
+        /* we don't stat-ahead for the first dirent since we are already in
+         * lookup, and -EEXIST also indicates that this is the first dirent.
+         */
+        RETURN(-EEXIST);
+}
+
+/* update hit/miss count */
+void ll_statahead_exit(struct dentry *dentry, int result)
+{
+        struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode);
+        struct ll_sb_info    *sbi = ll_i2sbi(dentry->d_parent->d_inode);
+
+        if (lli->lli_opendir_pid != current->pid)
+                return;
+
+        spin_lock(&lli->lli_lock);
+        if (lli->lli_sai) {
+                struct ll_statahead_info *sai = lli->lli_sai;
+
+                ll_sai_entry_put(sai);
+                if (result == 1) {
+                        sai->sai_hit++;
+                        sai->sai_consecutive_miss = 0;
+                        sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
+                        CDEBUG(D_READA, "statahead %.*s hit(hit/miss %u/%u)\n",
+                               dentry->d_name.len, dentry->d_name.name,
+                               sai->sai_hit, sai->sai_miss);
+                } else {
+                        sai->sai_miss++;
+                        sai->sai_consecutive_miss++;
+                        /* upon miss, it's always because some dentry is added
+                         * by statahead thread, and at the mean time `ls`
+                         * processs finds this dentry, but the d_op for this
+                         * dentry is NULL, then revalidate is not done, and
+                         * ll_statahead_exit() not called for this dentry,
+                         * so statahead thread should be behind of `ls` process,
+                         * put one entry to go ahead.
+                         */
+                        CDEBUG(D_READA, "statahead %.*s miss(hit/miss %u/%u)\n",
+                               dentry->d_name.len, dentry->d_name.name,
+                               sai->sai_hit, sai->sai_miss);
+                        ll_sai_entry_put(sai);
+                }
+                wake_up(&sai->sai_thread.t_ctl_waitq);
+        }
+        spin_unlock(&lli->lli_lock);
+}
index d123ba1..5834f02 100644 (file)
@@ -239,6 +239,8 @@ int ll_getxattr_common(struct inode *inode, const char *name,
                 posix_acl_release(acl);
                 RETURN(rc);
         }
+        if (xattr_type == XATTR_ACL_DEFAULT_T && !S_ISDIR(inode->i_mode))
+                RETURN(-ENODATA);
 #endif
 
 do_getxattr:
index 8b7c6af..8bed138 100644 (file)
@@ -315,7 +315,8 @@ void mdc_getattr_pack(struct ptlrpc_request *req, int offset, int valid,
                 char *tmp;
                 tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1,
                                      data->namelen + 1);
-                LOGL0(data->name, data->namelen, tmp);
+                memcpy(tmp, data->name, data->namelen);
+                data->name = tmp;
         }
 }
 
@@ -396,8 +397,8 @@ void mdc_exit_request(struct client_obd *cli)
 
         spin_lock(&cli->cl_loi_list_lock);
         cli->cl_r_in_flight--;
-        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                 
+        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
                         /* No free request slots anymore */
                         break;
index 7962b0a..6e08a73 100644 (file)
@@ -59,18 +59,6 @@ void it_clear_disposition(struct lookup_intent *it, int flag)
 }
 EXPORT_SYMBOL(it_clear_disposition);
 
-static int it_to_lock_mode(struct lookup_intent *it)
-{
-        /* CREAT needs to be tested before open (both could be set) */
-        if (it->it_op & IT_CREAT)
-                return LCK_CW;
-        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
-                return LCK_CR;
-
-        LBUG();
-        RETURN(-EINVAL);
-}
-
 int it_open_error(int phase, struct lookup_intent *it)
 {
         if (it_disposition(it, DISP_OPEN_OPEN)) {
@@ -195,25 +183,28 @@ static int round_up(int val)
  * but this is incredibly unlikely, and questionable whether the client
  * could do MDS recovery under OOM anyways... */
 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
-                                struct mds_body *body, int size[6])
+                                struct mds_body *body)
 {
-        int new_size, old_size;
+        int old_len, new_size, old_size;
+        struct lustre_msg *old_msg = req->rq_reqmsg;
         struct lustre_msg *new_msg;
 
+        old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
         /* save old size */
-        old_size = lustre_msg_size(lustre_request_magic(req), 6, size);
-
-        size[DLM_INTENT_REC_OFF + 2] = body->eadatasize;
-        new_size = lustre_msg_size(lustre_request_magic(req), 6, size);
+        old_size = lustre_msg_size(lustre_request_magic(req),
+                                   req->rq_reqmsg->lm_bufcount,
+                                   req->rq_reqmsg->lm_buflens);
+
+        lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
+                              body->eadatasize);
+        new_size = lustre_msg_size(lustre_request_magic(req),
+                                   req->rq_reqmsg->lm_bufcount,
+                                   req->rq_reqmsg->lm_buflens);
         OBD_ALLOC(new_msg, new_size);
         if (new_msg != NULL) {
-                struct lustre_msg *old_msg = req->rq_reqmsg;
-
-                DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
+                DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
                           body->eadatasize);
                 memcpy(new_msg, old_msg, old_size);
-                lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2,
-                                      body->eadatasize);
 
                 spin_lock(&req->rq_lock);
                 req->rq_reqmsg = new_msg;
@@ -222,70 +213,55 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
 
                 OBD_FREE(old_msg, old_size);
         } else {
+                lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
                 body->valid &= ~OBD_MD_FLEASIZE;
                 body->eadatasize = 0;
         }
 }
 
-/* We always reserve enough space in the reply packet for a stripe MD, because
- * we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
-                struct lookup_intent *it, struct mdc_op_data *op_data,
-                struct lustre_handle *lockh, void *lmm, int lmmsize,
-                int extra_lock_flags)
+static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
+                                                   struct lookup_intent *it,
+                                                   struct mdc_op_data *data,
+                                                   void *lmm, int lmmsize)
 {
         struct ptlrpc_request *req;
-        struct obd_device *obddev = class_exp2obd(exp);
-        struct ldlm_res_id res_id =
-                { .name = {op_data->fid1.id, op_data->fid1.generation} };
-        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
-        struct ldlm_request *lockreq;
         struct ldlm_intent *lit;
-        struct ldlm_reply *lockrep;
+        struct obd_device *obddev = class_exp2obd(exp);
         int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*lockreq),
+                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
-                        0, 0, 0, 0 };
+                        [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
+                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
+                        /* As an optimization, we allocate an RPC request buffer
+                         * for at least a default-sized LOV EA even if we aren't
+                         * sending one.  We grow the whole request to the next
+                         * power-of-two size since we get that much from a slab
+                         * allocation anyways. This avoids an allocation below
+                         * in the common case where we need to save a
+                         * default-sized LOV EA for open replay. */
+                        [DLM_INTENT_REC_OFF+2]= max(lmmsize,
+                                         obddev->u.cli.cl_default_mds_easize) };
         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                           [DLM_LOCKREPLY_OFF]   = sizeof(*lockrep),
+                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
-                                                   cl_max_mds_easize, 0 };
-        int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
-        int repbufcnt = 4, rc;
-        void *eadata;
-        ENTRY;
-
-        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
-//        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
-//                          ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
-
-        if (it->it_op & IT_OPEN) {
+                                                        cl_max_mds_easize,
+                           [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
                 CFS_LIST_HEAD(cancels);
                 int count = 0;
                 int mode;
+        int rc;
                 
                 it->it_create_mode |= S_IFREG;
 
-                size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
-                size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
-                /* As an optimization, we allocate an RPC request buffer for
-                 * at least a default-sized LOV EA even if we aren't sending
-                 * one.  We grow the whole request to the next power-of-two
-                 * size since we get that much from a slab allocation anyways.
-                 * This avoids an allocation below in the common case where
-                 * we need to save a default-sized LOV EA for open replay. */
-                size[DLM_INTENT_REC_OFF + 2] = max(lmmsize,
-                                          obddev->u.cli.cl_default_mds_easize);
-                rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6,
-                                     size);
+        rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size);
                 if (rc & (rc - 1))
                         size[DLM_INTENT_REC_OFF + 2] =
-                                 min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
+                         min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc,
                                      obddev->u.cli.cl_max_mds_easize);
 
                 /* If inode is known, cancel conflicting OPEN locks. */
-                if (op_data->fid2.id) {
+        if (data->fid2.id) {
                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
                                 mode = LCK_CW;
 #ifdef FMODE_EXEC
@@ -294,9 +270,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 #endif
                         else 
                                 mode = LCK_CR;
-                        count = mdc_resource_get_unused(exp, &op_data->fid2,
-                                                        &cancels, mode,
-                                                        MDS_INODELOCK_OPEN);
+                count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
+                                                mode, MDS_INODELOCK_OPEN);
                 }
 
                 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
@@ -304,25 +279,19 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                         mode = LCK_EX;
                 else
                         mode = LCK_CR;
-                count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
-                                                 mode, MDS_INODELOCK_UPDATE);
+        count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
+                                         MDS_INODELOCK_UPDATE);
                 if (it->it_flags & O_JOIN_FILE) {
+                __u64 head_size = (*(__u64 *)data->data);
                         /* join is like an unlink of the tail */
-                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                        size[DLM_INTENT_REC_OFF + 3] =
-                                                 sizeof(struct mds_rec_join);
-                        req = ldlm_prep_enqueue_req(exp, 7, size, &cancels,
-                                                    count);
-                        mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data,
-                                      (*(__u64 *)op_data->data));
+                size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join);
+                req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count);
+                mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, head_size);
                 } else {
-                        req = ldlm_prep_enqueue_req(exp, 6, size, &cancels,
-                                                    count);
+                req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count);
                 }
 
-                if (!req)
-                        RETURN(-ENOMEM);
-
+        if (req) {
                 spin_lock(&req->rq_lock);
                 req->rq_replay = 1;
                 spin_unlock(&req->rq_lock);
@@ -333,76 +302,110 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
+                mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
                               it->it_create_mode, 0, it->it_flags,
                               lmm, lmmsize);
 
-                repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
-        } else if (it->it_op & IT_UNLINK) {
-                size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
-                size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
-                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
-                if (!req)
-                        RETURN(-ENOMEM);
+                ptlrpc_req_set_repsize(req, 5, repsize);
+        }
+        return req;
+}
+
+static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
+                                                     struct lookup_intent *it,
+                                                     struct mdc_op_data *data)
+{
+        struct ptlrpc_request *req;
+        struct ldlm_intent *lit;
+        struct obd_device *obddev = class_exp2obd(exp);
+        int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
+                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
+                        [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_unlink),
+                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
+        int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
+                           [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
+                           [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
+                                                        cl_max_mds_easize,
+                           [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
+                                                        cl_max_mds_cookiesize };
 
+        req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
+        if (req) {
                 /* pack the intent */
                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
                                      sizeof(*lit));
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
+                mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
 
-                repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
-        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
-                obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
-                                  OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
-                                  OBD_MD_FLDIREA;
-                size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
-                size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
+                ptlrpc_req_set_repsize(req, 5, repsize);
+        }
+        return req;
+}
 
-                if (it->it_op & IT_GETATTR)
-                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
+                                                     struct lookup_intent *it,
+                                                     struct mdc_op_data *data)
+{
+        struct ptlrpc_request *req;
+        struct ldlm_intent *lit;
+        struct obd_device *obddev = class_exp2obd(exp);
+        int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
+                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
+                        [DLM_INTENT_REC_OFF]  = sizeof(struct mds_body),
+                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
+        int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
+                           [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
+                           [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
+                                                        cl_max_mds_easize,
+                           [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
+        obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
+                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
 
                 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
-                if (!req)
-                        RETURN(-ENOMEM);
-
+        if (req) {
                 /* pack the intent */
                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
                                      sizeof(*lit));
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
-                                 it->it_flags, op_data);
-
-                repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
-        } else if (it->it_op == IT_READDIR) {
-                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
-                if (!req)
-                        RETURN(-ENOMEM);
-
-                repbufcnt = 2;
-        } else {
-                LBUG();
-                RETURN(-EINVAL);
+                mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
+                                 data);
+                ptlrpc_req_set_repsize(req, 5, repsize);
         }
+        return req;
+}
 
-        /* get ready for the reply */
-        ptlrpc_req_set_repsize(req, repbufcnt, repsize);
+static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
+{
+        struct ptlrpc_request *req;
+        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
+        int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply) };
+
+        req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+        if (req)
+                ptlrpc_req_set_repsize(req, 2, repsize);
+        return req;
+}
 
-         /* It is important to obtain rpc_lock first (if applicable), so that
-          * threads that are serialised with rpc_lock are not polluting our
-          * rpcs in flight counter */
-        mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
-        mdc_enter_request(&obddev->u.cli);
-        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
-                              0, NULL, lockh, 0);
-        mdc_exit_request(&obddev->u.cli);
-        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+static int mdc_finish_enqueue(struct obd_export *exp,
+                              struct ptlrpc_request *req,
+                              struct ldlm_enqueue_info *einfo,
+                              struct lookup_intent *it,
+                              struct lustre_handle *lockh,
+                              int rc)
+{
+        struct ldlm_request *lockreq;
+        struct ldlm_reply *lockrep;
+        ENTRY;
 
         /* Similarly, if we're going to replay this request, we don't want to
          * actually get a lock, just perform the intent. */
@@ -456,16 +459,17 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
          * It's important that we do this first!  Otherwise we might exit the
          * function without doing so, and try to replay a failed create
          * (bug 3440) */
-        if (it->it_op & IT_OPEN && req->rq_replay &&
-            (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
+        if ((it->it_op & IT_OPEN) &&
+            req->rq_replay &&
+            (!it_disposition(it, DISP_OPEN_OPEN) ||
+             it->d.lustre.it_status != 0))
                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
 
         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
 
         /* We know what to expect, so we do any byte flipping required here */
-        LASSERT(repbufcnt == 5 || repbufcnt == 2);
-        if (repbufcnt == 5) {
+        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                 struct mds_body *body;
 
                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
@@ -484,6 +488,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                         mdc_set_open_replay_data(NULL, req);
 
                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
+                        void *eadata;
+
                         /* The eadata is opaque; just check that it is there.
                          * Eventually, obd_unpackmd() will check the contents */
                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
@@ -493,6 +499,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                                 RETURN (-EPROTO);
                         }
                         if (body->valid & OBD_MD_FLMODEASIZE) {
+                                struct obd_device *obddev = class_exp2obd(exp);
+
                                 if (obddev->u.cli.cl_max_mds_easize < 
                                                         body->max_mdsize) {
                                         obddev->u.cli.cl_max_mds_easize = 
@@ -514,10 +522,11 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                          * reallocate it here to hold the actual LOV EA. */
                         if (it->it_op & IT_OPEN) {
                                 int offset = DLM_INTENT_REC_OFF + 2;
+                                void *lmm;
 
                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
                                     body->eadatasize)
-                                        mdc_realloc_openmsg(req, body, size);
+                                        mdc_realloc_openmsg(req, body);
 
                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
                                                      body->eadatasize);
@@ -529,63 +538,74 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 
         RETURN(rc);
 }
-EXPORT_SYMBOL(mdc_enqueue);
 
-/* 
- * This long block is all about fixing up the lock and request state
- * so that it is correct as of the moment _before_ the operation was
- * applied; that way, the VFS will think that everything is normal and
- * call Lustre's regular VFS methods.
- *
- * If we're performing a creation, that means that unless the creation
- * failed with EEXIST, we should fake up a negative dentry.
- *
- * For everything else, we want to lookup to succeed.
- *
- * One additional note: if CREATE or OPEN succeeded, we add an extra
- * reference to the request because we need to keep it around until
- * ll_create/ll_open gets called.
- *
- * The server will return to us, in it_disposition, an indication of
- * exactly what d.lustre.it_status refers to.
- *
- * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
- * otherwise if DISP_OPEN_CREATE is set, then it status is the
- * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
- * DISP_LOOKUP_POS will be set, indicating whether the child lookup
- * was successful.
- *
- * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
- * child lookup.
- */
-int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
-                    void *lmm, int lmmsize, struct lookup_intent *it,
-                    int lookup_flags, struct ptlrpc_request **reqp,
-                    ldlm_blocking_callback cb_blocking, int extra_lock_flags)
+/* We always reserve enough space in the reply packet for a stripe MD, because
+ * we don't know in advance the file type. */
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                struct lookup_intent *it, struct mdc_op_data *data,
+                struct lustre_handle *lockh, void *lmm, int lmmsize,
+                int extra_lock_flags)
 {
-        struct lustre_handle lockh;
-        struct ptlrpc_request *request;
-        int rc = 0;
-        struct mds_body *mds_body;
-        struct lustre_handle old_lock;
-        struct ldlm_lock *lock;
+        struct ptlrpc_request *req;
+        struct obd_device *obddev = class_exp2obd(exp);
+        struct ldlm_res_id res_id =
+                { .name = {data->fid1.id, data->fid1.generation} };
+        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
+        int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
+        int rc;
         ENTRY;
-        LASSERT(it);
 
-        CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
-               op_data->namelen, op_data->name, op_data->fid1.id,
-               ldlm_it2str(it->it_op), it->it_flags);
+        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
+        if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
 
-        if (op_data->fid2.id &&
-            (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
+        if (it->it_op & IT_OPEN) {
+                req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
+                if (it->it_flags & O_JOIN_FILE) {
+                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                }
+        } else if (it->it_op & IT_UNLINK) {
+                req = mdc_intent_unlink_pack(exp, it, data);
+        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
+                req = mdc_intent_lookup_pack(exp, it, data);
+        } else if (it->it_op == IT_READDIR) {
+                req = mdc_intent_readdir_pack(exp);
+        } else {
+                CERROR("bad it_op %x\n", it->it_op);
+                RETURN(-EINVAL);
+        }
+
+        if (!req)
+                RETURN(-ENOMEM);
+
+         /* It is important to obtain rpc_lock first (if applicable), so that
+          * threads that are serialised with rpc_lock are not polluting our
+          * rpcs in flight counter */
+        mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+        mdc_enter_request(&obddev->u.cli);
+        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
+                              0, NULL, lockh, 0);
+        mdc_exit_request(&obddev->u.cli);
+        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+
+        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(mdc_enqueue);
+
+int mdc_revalidate_lock(struct obd_export *exp,
+                        struct lookup_intent *it,
+                        struct ll_fid *fid)
+{
                 /* We could just return 1 immediately, but since we should only
                  * be called in revalidate_it if we already have a lock, let's
                  * verify that. */
-                struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
-                                                     op_data->fid2.generation}};
+        struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
                 struct lustre_handle lockh;
                 ldlm_policy_data_t policy;
                 int mode = LCK_CR;
+        int rc;
 
                 /* As not all attributes are kept under update lock, e.g. 
                    owner/group/acls are under lookup lock, we need both 
@@ -594,62 +614,44 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
                         MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
                         MDS_INODELOCK_LOOKUP;
 
-                rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                     LDLM_FL_BLOCK_GRANTED, &res_id,
-                                     LDLM_IBITS, &policy, LCK_CR, &lockh);
+        rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED,
+                             &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
                 if (!rc) {
                         mode = LCK_CW;
                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                             LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy,LCK_CW,&lockh);
+                                     LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
+                                     &policy, LCK_CW, &lockh);
                 }
                 if (!rc) {
                         mode = LCK_PR;
                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                                             LDLM_FL_BLOCK_GRANTED, &res_id,
-                                             LDLM_IBITS, &policy,LCK_PR,&lockh);
+                                     LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
+                                     &policy, LCK_PR, &lockh);
                 }
                 if (rc) {
-                        memcpy(&it->d.lustre.it_lock_handle, &lockh,
-                               sizeof(lockh));
+                memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
                         it->d.lustre.it_lock_mode = mode;
                 }
 
-                /* Only return failure if it was not GETATTR by cfid
-                   (from inode_revalidate) */
-                if (rc || op_data->namelen != 0)
-                        RETURN(rc);
-        }
+        return rc;
+}
+EXPORT_SYMBOL(mdc_revalidate_lock);
 
-        /* lookup_it may be called only after revalidate_it has run, because
-         * revalidate_it cannot return errors, only zero.  Returning zero causes
-         * this call to lookup, which *can* return an error.
-         *
-         * We only want to execute the request associated with the intent one
-         * time, however, so don't send the request again.  Instead, skip past
-         * this and use the request from revalidate.  In this case, revalidate
-         * never dropped its reference, so the refcounts are all OK */
-        if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
-                struct ldlm_enqueue_info einfo =
-                        { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
-                          ldlm_completion_ast, NULL, NULL };
+static int mdc_finish_intent_lock(struct obd_export *exp,
+                                  struct ptlrpc_request *req,
+                                  struct mdc_op_data *data,
+                                  struct lookup_intent *it,
+                                  struct lustre_handle *lockh)
+{
+        struct mds_body *mds_body;
+        struct lustre_handle old_lock;
+        struct ldlm_lock *lock;
+        int rc;
+        ENTRY;
 
-                rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
-                                 lmm, lmmsize, extra_lock_flags);
-                if (rc < 0)
-                        RETURN(rc);
-                memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
-        } else if (!op_data->fid2.id) {
-                /* DISP_ENQ_COMPLETE set means there is extra reference on
-                 * request referenced from this intent, saved for subsequent
-                 * lookup.  This path is executed when we proceed to this
-                 * lookup, so we clear DISP_ENQ_COMPLETE */
-                it_clear_disposition(it, DISP_ENQ_COMPLETE);
-        }
-        request = *reqp = it->d.lustre.it_data;
-        LASSERT(request != NULL);
-        LASSERT(request != LP_POISON);
-        LASSERT(request->rq_repmsg != LP_POISON);
+        LASSERT(req != NULL);
+        LASSERT(req != LP_POISON);
+        LASSERT(req->rq_repmsg != LP_POISON);
 
         if (!it_disposition(it, DISP_IT_EXECD)) {
                 /* The server failed before it even started executing the
@@ -661,19 +663,18 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
         if (rc)
                 RETURN(rc);
 
-        mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
+        mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
                                   sizeof(*mds_body));
         LASSERT(mds_body != NULL);           /* mdc_enqueue checked */
-        LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
+        LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* mdc_enqueue swabbed */
 
         /* If we were revalidating a fid/name pair, mark the intent in
          * case we fail and get called again from lookup */
-        if (op_data->fid2.id && (it->it_op != IT_GETATTR)) {
+        if (data->fid2.id && (it->it_op != IT_GETATTR)) {
                 it_set_disposition(it, DISP_ENQ_COMPLETE);
                 /* Also: did we find the same inode? */
-                if (memcmp(&op_data->fid2, &mds_body->fid1,
-                           sizeof(op_data->fid2)))
-                        RETURN (-ESTALE);
+                if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2))) 
+                        RETURN(-ESTALE);
         }
 
         rc = it_open_error(DISP_LOOKUP_EXECD, it);
@@ -687,13 +688,13 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
             it_disposition(it, DISP_OPEN_CREATE) &&
             !it_open_error(DISP_OPEN_CREATE, it)) {
                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
-                ptlrpc_request_addref(request); /* balanced in ll_create_node */
+                ptlrpc_request_addref(req); /* balanced in ll_create_node */
         }
         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
             it_disposition(it, DISP_OPEN_OPEN) &&
             !it_open_error(DISP_OPEN_OPEN, it)) {
                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
-                ptlrpc_request_addref(request); /* balanced in ll_file_open */
+                ptlrpc_request_addref(req); /* balanced in ll_file_open */
                 /* BUG 11546 - eviction in the middle of open rpc processing */
                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
         }
@@ -711,25 +712,199 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
          * mdc_enqueue, because we need to use the child's inode as
          * the l_ast_data to match, and that's not available until
          * intent_finish has performed the iget().) */
-        lock = ldlm_handle2lock(&lockh);
+        lock = ldlm_handle2lock(lockh);
         if (lock) {
                 ldlm_policy_data_t policy = lock->l_policy_data;
+
                 LDLM_DEBUG(lock, "matching against this");
                 LDLM_LOCK_PUT(lock);
-                memcpy(&old_lock, &lockh, sizeof(lockh));
+                memcpy(&old_lock, lockh, sizeof(*lockh));
                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
-                        ldlm_lock_decref_and_cancel(&lockh,
+                        ldlm_lock_decref_and_cancel(lockh,
                                                     it->d.lustre.it_lock_mode);
-                        memcpy(&lockh, &old_lock, sizeof(old_lock));
-                        memcpy(&it->d.lustre.it_lock_handle, &lockh,
-                               sizeof(lockh));
+                        memcpy(lockh, &old_lock, sizeof(old_lock));
+                        memcpy(&it->d.lustre.it_lock_handle, lockh,
+                               sizeof(*lockh));
                 }
         }
+
         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
-               op_data->namelen, op_data->name, ldlm_it2str(it->it_op),
+               data->namelen, data->name, ldlm_it2str(it->it_op),
                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
+        RETURN(rc);
+}
+
+/* 
+ * This long block is all about fixing up the lock and request state
+ * so that it is correct as of the moment _before_ the operation was
+ * applied; that way, the VFS will think that everything is normal and
+ * call Lustre's regular VFS methods.
+ *
+ * If we're performing a creation, that means that unless the creation
+ * failed with EEXIST, we should fake up a negative dentry.
+ *
+ * For everything else, we want to lookup to succeed.
+ *
+ * One additional note: if CREATE or OPEN succeeded, we add an extra
+ * reference to the request because we need to keep it around until
+ * ll_create/ll_open gets called.
+ *
+ * The server will return to us, in it_disposition, an indication of
+ * exactly what d.lustre.it_status refers to.
+ *
+ * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
+ * otherwise if DISP_OPEN_CREATE is set, then it status is the
+ * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
+ * DISP_LOOKUP_POS will be set, indicating whether the child lookup
+ * was successful.
+ *
+ * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
+ * child lookup.
+ */
+int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
+                    void *lmm, int lmmsize, struct lookup_intent *it,
+                    int lookup_flags, struct ptlrpc_request **reqp,
+                    ldlm_blocking_callback cb_blocking, int extra_lock_flags)
+{
+        struct lustre_handle lockh;
+        int rc;
+        ENTRY;
+
+        LASSERT(it);
+
+        CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
+               op_data->namelen, op_data->name, op_data->fid1.id,
+               ldlm_it2str(it->it_op), it->it_flags);
+
+        if (op_data->fid2.id &&
+            (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
+                rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
+                /* Only return failure if it was not GETATTR by cfid
+                   (from inode_revalidate) */
+                if (rc || op_data->namelen != 0)
+                        RETURN(rc);
+        }
+
+        /* lookup_it may be called only after revalidate_it has run, because
+         * revalidate_it cannot return errors, only zero.  Returning zero causes
+         * this call to lookup, which *can* return an error.
+         *
+         * We only want to execute the request associated with the intent one
+         * time, however, so don't send the request again.  Instead, skip past
+         * this and use the request from revalidate.  In this case, revalidate
+         * never dropped its reference, so the refcounts are all OK */
+        if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
+                struct ldlm_enqueue_info einfo =
+                        { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
+                          ldlm_completion_ast, NULL, NULL };
+
+                rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
+                                 lmm, lmmsize, extra_lock_flags);
+                if (rc < 0)
+                        RETURN(rc);
+                memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
+        } else if (!op_data->fid2.id) {
+                /* DISP_ENQ_COMPLETE set means there is extra reference on
+                 * request referenced from this intent, saved for subsequent
+                 * lookup.  This path is executed when we proceed to this
+                 * lookup, so we clear DISP_ENQ_COMPLETE */
+                it_clear_disposition(it, DISP_ENQ_COMPLETE);
+        }
+
+        *reqp = it->d.lustre.it_data;
+        rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
 
         RETURN(rc);
 }
 EXPORT_SYMBOL(mdc_intent_lock);
+
+static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
+                                              void *unused, int rc)
+{
+        struct mdc_enqueue_args  *ma;
+        struct md_enqueue_info   *minfo;
+        struct ldlm_enqueue_info *einfo;
+        struct obd_export        *exp;
+        struct lookup_intent     *it;
+        struct lustre_handle     *lockh;
+        struct obd_device        *obddev;
+        int                       flags = LDLM_FL_HAS_INTENT;
+        ENTRY;
+
+        ma = (struct mdc_enqueue_args *)&req->rq_async_args;
+        minfo = ma->ma_mi;
+        einfo = ma->ma_ei;
+
+        exp   = minfo->mi_exp;
+        it    = &minfo->mi_it;
+        lockh = &minfo->mi_lockh;
+
+        obddev = class_exp2obd(exp);
+
+        mdc_exit_request(&obddev->u.cli);
+
+        rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
+                                   &flags, NULL, 0, NULL, lockh, rc);
+
+        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+        if (rc)
+                GOTO(out, rc);
+
+        memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
+
+        rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
+        GOTO(out, rc);
+out:
+        OBD_FREE_PTR(einfo);
+        minfo->mi_cb(exp, req, minfo, rc);
+
+        return 0;
+}
+
+int mdc_intent_getattr_async(struct obd_export *exp,
+                             struct md_enqueue_info *minfo,
+                             struct ldlm_enqueue_info *einfo)
+{
+        struct mdc_op_data      *op_data = &minfo->mi_data;
+        struct lookup_intent    *it = &minfo->mi_it;
+        struct ptlrpc_request   *req;
+        struct obd_device       *obddev = class_exp2obd(exp);
+        struct ldlm_res_id       res_id = {
+                                        .name = {op_data->fid1.id,
+                                                 op_data->fid1.generation}
+                                 };
+        ldlm_policy_data_t       policy = {
+                                        .l_inodebits = { MDS_INODELOCK_LOOKUP }
+                                 };
+        struct mdc_enqueue_args *aa;
+        int                      rc;
+        int                      flags = LDLM_FL_HAS_INTENT;
+        ENTRY;
+
+        CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
+               op_data->namelen, op_data->name, op_data->fid1.id,
+               ldlm_it2str(it->it_op), it->it_flags);
+
+        req = mdc_intent_lookup_pack(exp, it, op_data);
+        if (!req)
+                RETURN(-ENOMEM);
+
+        mdc_enter_request(&obddev->u.cli);
+        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
+                              0, NULL, &minfo->mi_lockh, 1);
+        if (rc < 0) {
+                mdc_exit_request(&obddev->u.cli);
+                RETURN(rc);
+        }
+
+        CLASSERT(sizeof(*aa) < sizeof(req->rq_async_args));
+        aa = (struct mdc_enqueue_args *)&req->rq_async_args;
+        aa->ma_mi = minfo;
+        aa->ma_ei = einfo;
+        req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
+        ptlrpcd_add_req(req);
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(mdc_intent_getattr_async);
index 0846aca..f73c236 100644 (file)
@@ -108,6 +108,7 @@ static
 int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, 
                        unsigned int acl_size, struct ptlrpc_request *req)
 {
+        struct obd_device *obddev = class_exp2obd(exp);
         struct mds_body *body;
         void *eadata;
         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
@@ -128,7 +129,9 @@ int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
 
         ptlrpc_req_set_repsize(req, bufcount, size);
 
+        mdc_enter_request(&obddev->u.cli);
         rc = ptlrpc_queue_wait(req);
+        mdc_exit_request(&obddev->u.cli);
         if (rc != 0)
                 RETURN (rc);
 
@@ -236,6 +239,7 @@ int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid,
                      const char *input, int input_size, int output_size,
                      int flags, struct ptlrpc_request **request)
 {
+        struct obd_device *obddev = class_exp2obd(exp);
         struct ptlrpc_request *req;
         int size[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) };
         // int size[3] = {sizeof(struct mds_body)}, bufcnt = 1;
@@ -287,11 +291,15 @@ int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid,
         /* make rpc */
         if (opcode == MDS_SETXATTR)
                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+        else
+                mdc_enter_request(&obddev->u.cli);
 
         rc = ptlrpc_queue_wait(req);
 
         if (opcode == MDS_SETXATTR)
                 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+        else
+                mdc_exit_request(&obddev->u.cli);
 
         if (rc != 0)
                 GOTO(err_out, rc);
index 2a7c03a..7faa80a 100644 (file)
@@ -90,6 +90,7 @@ FAIL_ON_ERROR=false
 
 cleanup() {
        echo -n "cln.."
+       pgrep ll_sa > /dev/null && { echo "There are ll_sa thread not exit!"; exit 20; }
        cleanupall ${FORCE} $* || { echo "FAILed to clean up"; exit 20; }
 }
 setup() {
@@ -4046,6 +4047,63 @@ test_122() { #bug #11544
 }
 run_test 122 "fail client bulk callback (shouldn't LBUG) ======="
 
+test_123() # statahead(bug 11401)
+{
+        if [ -z "$(grep "processor.*: 1" /proc/cpuinfo)" ]; then
+                log "testing on UP system. Performance may be not as good as expected."
+        fi
+
+        mkdir -p $DIR/$tdir
+
+        for ((i=1, j=0; i<=10000; j=$i, i=$((i * 10)) )); do
+                createmany -o $DIR/$tdir/$tfile $j $((i - j))
+
+                grep '[0-9]' $LPROC/llite/*/statahead_max
+                cancel_lru_locks mdc
+                stime=`date +%s`
+                ls -l $DIR/$tdir > /dev/null
+                etime=`date +%s`
+                delta_sa=$((etime - stime))
+                log "ls $i files with statahead:    $delta_sa sec"
+
+                for client in $LPROC/llite/*; do
+                        max=`cat $client/statahead_max`
+                        cat $client/statahead_stats
+                        echo 0 > $client/statahead_max
+                done
+
+                grep '[0-9]' $LPROC/llite/*/statahead_max
+                cancel_lru_locks mdc
+                stime=`date +%s`
+                ls -l $DIR/$tdir > /dev/null
+                etime=`date +%s`
+                delta=$((etime - stime))
+                log "ls $i files without statahead: $delta sec"
+
+                for client in /proc/fs/lustre/llite/*; do
+                        cat $client/statahead_stats
+                        echo $max > $client/statahead_max
+                done
+
+                if [ $delta_sa -gt $delta ]; then
+                        log "ls $i files is slower with statahead!"
+                fi
+        done
+        log "ls done"
+
+        stime=`date +%s`
+        rm -r $DIR/$tdir
+        sync
+        etime=`date +%s`
+        delta=$((etime - stime))
+        log "rm -r $DIR/$tdir/: $delta seconds"
+        log "rm done"
+        cat /proc/fs/lustre/llite/*/statahead_stats
+        # wait for commitment of removal
+        sleep 2
+}
+run_test 123 "verify statahead work"
+
 TMPDIR=$OLDTMPDIR
 TMP=$OLDTMP
 HOME=$OLDHOME