Whamcloud - gitweb
LU-14361 statahead: add statahead advise IOCTL 25/48625/16
authorQian Yingjin <qian@ddn.com>
Thu, 22 Sep 2022 09:24:14 +0000 (05:24 -0400)
committerOleg Drokin <green@whamcloud.com>
Thu, 31 Aug 2023 06:21:21 +0000 (06:21 +0000)
This patch reuse ioctl(LL_IOC_LADVISE2) for statahead advise.
This allows userspace programs to advise the kernel statahead
of the order that they will be traversing a directory, so that
the client can prefetch inode attributes from the MDT, similar
to what posix_fadvise(POSIX_FADV_SEQUENTIAL) does for file data.

After patched mdtest via adding this statahead IOCTL hint, it
can support mdtest benchmark with regularized file naming format:
mdtest.$rank.$i
The usage of this statahead advise IOCTL could be as follows:
open(dir);
ioctl(dir_fd, IOC_LADVISE2, ...);
stat mdtest.0.0;
stat mdtest.0.1;
stat mdtest.0.2;
stat mdtest.0.3;
...
clsoedir(dir);

Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: Iac38e33bfc6d7a0b755c2646ba8053a263e3afc9
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/48625
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/statahead.c
lustre/lmv/lmv_obd.c
lustre/mdt/mdt_handler.c

index 48ead30..9881854 100644 (file)
@@ -631,6 +631,7 @@ struct ll_ioc_lease_id {
 #define LL_IOC_FID2MDTIDX              _IOWR('f', 248, struct lu_fid)
 #define LL_IOC_GETPARENT               _IOWR('f', 249, struct getparent)
 #define LL_IOC_LADVISE                 _IOR('f', 250, struct llapi_lu_ladvise)
+#define LL_IOC_LADVISE2                        _IOW('f', 250, struct llapi_lu_ladvise2)
 #define LL_IOC_HEAT_GET                        _IOWR('f', 251, struct lu_heat)
 #define LL_IOC_HEAT_SET                        _IOW('f', 251, __u64)
 #define LL_IOC_PCC_ATTACH              _IOW('f', 252, struct lu_pcc_attach)
@@ -2610,6 +2611,8 @@ enum lu_ladvise_type {
        LU_LADVISE_DONTNEED     = 2,
        LU_LADVISE_LOCKNOEXPAND = 3,
        LU_LADVISE_LOCKAHEAD    = 4,
+       /* Ahead operations for open|create|stat|read|write. */
+       LU_LADVISE_AHEAD        = 5,
        LU_LADVISE_MAX
 };
 
@@ -2618,6 +2621,7 @@ enum lu_ladvise_type {
        [LU_LADVISE_DONTNEED]           = "dontneed",                   \
        [LU_LADVISE_LOCKNOEXPAND]       = "locknoexpand",               \
        [LU_LADVISE_LOCKAHEAD]          = "lockahead",                  \
+       [LU_LADVISE_AHEAD]              = "ahead",                      \
 }
 
 /* This is the userspace argument for ladvise.  It is currently the same as
@@ -2633,6 +2637,58 @@ struct llapi_lu_ladvise {
        __u32 lla_value4;
 };
 
+struct llapi_lu_ladvise2 {
+       __u16   lla_advice;     /* advice type */
+       __u16   lla_value1;     /* values for different advice types */
+       __u32   lla_value2;
+       __u64   lla_start;
+       __u64   lla_end;
+       __u32   lla_value3;
+       __u32   lla_value4;
+       union {
+               struct {
+                       __u32   lla_value5;
+                       __u32   lla_value6;
+               };
+               char lla_buf[NAME_MAX + 1];
+       };
+};
+
+/* I/O call sequences in a batch access. */
+enum lu_access_flags {
+       ACCESS_FL_NONE  = 0x0,
+       ACCESS_FL_STAT  = 0x01,
+       ACCESS_FL_OPEN  = 0x02,
+       ACCESS_FL_CREAT = 0x04,
+       ACCESS_FL_READ  = 0x08,
+       ACCESS_FL_WRITE = 0x10,
+       ACCESS_FL_OC    = ACCESS_FL_OPEN | ACCESS_FL_CREAT,
+       ACCESS_FL_SOR   = ACCESS_FL_STAT | ACCESS_FL_OPEN | ACCESS_FL_READ,
+       ACCESS_FL_OCW   = ACCESS_FL_OPEN | ACCESS_FL_CREAT | ACCESS_FL_WRITE,
+};
+
+enum lu_ahead_mode {
+       LU_AH_MODE_NONE         = 0,
+       /*
+        * The batch access pattern obeys certain naming rules, such as mdtest
+        * with the file naming format mdtest.$rank.$i.
+        */
+       LU_AH_NAME_INDEX        = 1,
+       /*
+        * Provide a file name list as input to do batch accesses with
+        * irregular file name format.
+        */
+       LU_AH_NAME_ARRAY        = 2,
+       /* Prefetching in readdir() order under a directory. */
+       LU_AH_NAME_READDIR      = 3,
+       LU_AH_MODE_MAX,
+};
+
+#define lla_ahead_mode         lla_value1
+#define lla_access_flags       lla_value2
+#define lla_batch_max          lla_value3
+#define lla_fname              lla_buf
+
 enum ladvise_flag {
        LF_ASYNC        = 0x00000001,
        LF_UNSET        = 0x00000002,
index 8e1a1fb..2856bb7 100644 (file)
@@ -1534,6 +1534,8 @@ static char *ll_getname(const char __user *filename)
        return tmp;
 }
 
+static const char *const ladvise_names[] = LU_LADVISE_NAMES;
+
 #define ll_putname(filename) OBD_FREE(filename, NAME_MAX + 1);
 
 static long ll_dir_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
@@ -2292,6 +2294,58 @@ migrate_free:
 
                RETURN(rc);
        }
+       case LL_IOC_LADVISE2: {
+               struct llapi_lu_ladvise2 *ladvise;
+
+               OBD_ALLOC_PTR(ladvise);
+               if (ladvise == NULL)
+                       RETURN(-ENOMEM);
+
+               if (copy_from_user(ladvise, uarg, sizeof(*ladvise)))
+                       GOTO(out_ladvise, rc = -EFAULT);
+
+               switch (ladvise->lla_advice) {
+               case LU_LADVISE_AHEAD:
+                       if (ladvise->lla_start >= ladvise->lla_end) {
+                               CDEBUG(D_VFSTRACE,
+                                      "%s: Invalid range (%llu %llu) for %s\n",
+                                      sbi->ll_fsname, ladvise->lla_start,
+                                      ladvise->lla_end,
+                                      ladvise_names[ladvise->lla_advice]);
+                               GOTO(out_ladvise, rc = -EINVAL);
+                       }
+
+                       /*
+                        * Currently we only support name indexing format
+                        * ahead operations.
+                        */
+                       if (ladvise->lla_ahead_mode != LU_AH_NAME_INDEX) {
+                               CDEBUG(D_VFSTRACE,
+                                      "%s: Invalid access mode (%d) for %s\n",
+                                      sbi->ll_fsname, ladvise->lla_ahead_mode,
+                                      ladvise_names[ladvise->lla_advice]);
+                               GOTO(out_ladvise, rc = -EINVAL);
+                       }
+
+                       /* Currently we only support stat-ahead operations. */
+                       if (!(ladvise->lla_access_flags & ACCESS_FL_STAT)) {
+                               CDEBUG(D_VFSTRACE,
+                                      "%s: Invalid access flags (%x) for %s\n",
+                                      sbi->ll_fsname,
+                                      ladvise->lla_access_flags,
+                                      ladvise_names[ladvise->lla_advice]);
+                               GOTO(out_ladvise, rc = -EINVAL);
+                       }
+
+                       rc = ll_ioctl_ahead(file, ladvise);
+                       break;
+               default:
+                       rc = -EINVAL;
+               }
+out_ladvise:
+               OBD_FREE_PTR(ladvise);
+               RETURN(rc);
+       }
        case LL_IOC_PCC_DETACH_BY_FID: {
                struct lu_pcc_detach_fid *detach;
                struct lu_fid *fid;
index d365b0b..bf8192a 100644 (file)
@@ -414,7 +414,8 @@ int ll_file_release(struct inode *inode, struct file *file)
 
        /* The last ref on @file, maybe not the the owner pid of statahead,
         * because parent and child process can share the same file handle. */
-       if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd)
+       if (S_ISDIR(inode->i_mode) &&
+           (lli->lli_opendir_key == fd || fd->fd_sai))
                ll_deauthorize_statahead(inode, fd);
 
        if (is_root_inode(inode)) {
index 01d4bb8..ce933ff 100644 (file)
@@ -118,6 +118,13 @@ struct ll_trunc_sem {
        atomic_t        ll_trunc_waiters;
 };
 
+enum ll_sa_pattern {
+       LSA_PATTERN_NONE        = 0x0000,
+       LSA_PATTERN_LIST        = 0x0001,
+       LSA_PATTERN_FNAME       = 0X0002,
+       LSA_PATTERN_MAX,
+};
+
 struct ll_inode_info {
        __u32                           lli_inode_magic;
        rwlock_t                        lli_lock;
@@ -171,6 +178,7 @@ struct ll_inode_info {
                         * cleanup the dir readahead. */
                        void                           *lli_opendir_key;
                        struct ll_statahead_info       *lli_sai;
+                       struct ll_statahead_context    *lli_sax;
                        /* protect statahead stuff. */
                        spinlock_t                      lli_sa_lock;
                        /* "opendir_pid" is the token when lookup/revalid
@@ -195,6 +203,8 @@ struct ll_inode_info {
                                                        lli_def_lsm_obj_set:1;
                        /* generation for statahead */
                        unsigned int                    lli_sa_generation;
+                       /* access pattern for statahead */
+                       enum ll_sa_pattern              lli_sa_pattern;
                        /* rw lock protects lli_lsm_md */
                        struct rw_semaphore             lli_lsm_sem;
                        /* directory stripe information */
@@ -1050,6 +1060,8 @@ struct ll_file_data {
         * -errno is saved here, and will return to user in close().
         */
        int fd_partial_readdir_rc;
+       /* mdtest unique/shared dir stat mode: per process statahead struct. */
+       struct ll_statahead_info *fd_sai;
 };
 
 void llite_tunables_unregister(void);
@@ -1295,6 +1307,8 @@ int ll_ioctl_fssetxattr(struct inode *inode, unsigned int cmd,
                        void __user *uarg);
 #endif
 int ll_ioctl_project(struct file *file, unsigned int cmd, void __user *uarg);
+int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise);
+
 int ll_lov_setstripe_ea_info(struct inode *inode, struct dentry *dentry,
                             __u64 flags, struct lov_user_md *lum,
                             int lum_size);
@@ -1617,11 +1631,11 @@ void ll_ra_stats_inc(struct inode *inode, enum ra_stat which);
 #define LL_SA_BATCH_MAX                1024
 #define LL_SA_BATCH_DEF                64
 
-#define LL_SA_CACHE_BIT         5
+#define LL_SA_CACHE_BIT         6
 #define LL_SA_CACHE_SIZE        (1 << LL_SA_CACHE_BIT)
 #define LL_SA_CACHE_MASK        (LL_SA_CACHE_SIZE - 1)
 
-/* per inode struct, for dir only */
+/* statahead controller, per process struct, for dir only */
 struct ll_statahead_info {
        struct dentry          *sai_dentry;
        atomic_t                sai_refcount;   /* when access this struct, hold
@@ -1654,12 +1668,23 @@ struct ll_statahead_info {
        struct task_struct      *sai_agl_task;  /* AGL thread */
        struct list_head        sai_entries;    /* completed entries */
        struct list_head        sai_agls;       /* AGLs to be sent */
-       struct list_head        sai_cache[LL_SA_CACHE_SIZE];
-       spinlock_t              sai_cache_lock[LL_SA_CACHE_SIZE];
        atomic_t                sai_cache_count; /* entry count in cache */
        struct lu_batch         *sai_bh;
        __u32                   sai_max_batch_count;
        __u64                   sai_index_end;
+
+       __u64                   sai_fstart;
+       __u64                   sai_fend;
+       char                    sai_fname[NAME_MAX];
+};
+
+/* Per inode statahead information */
+struct ll_statahead_context {
+       struct inode            *sax_inode;
+       atomic_t                 sax_refcount;
+       /* Local dcache */
+       struct list_head         sax_cache[LL_SA_CACHE_SIZE];
+       spinlock_t               sax_cache_lock[LL_SA_CACHE_SIZE];
 };
 
 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentry,
@@ -1724,10 +1749,6 @@ dentry_may_statahead(struct inode *dir, struct dentry *dentry)
        if (!lli->lli_sa_enabled)
                return false;
 
-       /* not the same process, don't statahead */
-       if (lli->lli_opendir_pid != current->pid)
-               return false;
-
        /*
         * When stating a dentry, kernel may trigger 'revalidate' or 'lookup'
         * multiple times, eg. for 'getattr', 'getxattr' and etc.
@@ -1747,6 +1768,13 @@ dentry_may_statahead(struct inode *dir, struct dentry *dentry)
            ldd->lld_sa_generation == lli->lli_sa_generation)
                return false;
 
+       if (lli->lli_sa_pattern == LSA_PATTERN_FNAME)
+               return true;
+
+       /* not the same process, don't statahead */
+       if (lli->lli_opendir_pid != current->pid)
+               return false;
+
        return true;
 }
 
index 0253b0d..29e32e9 100644 (file)
@@ -60,23 +60,25 @@ typedef enum {
  */
 struct sa_entry {
        /* link into sai_entries */
-       struct list_head        se_list;
+       struct list_head                 se_list;
        /* link into sai hash table locally */
-       struct list_head        se_hash;
+       struct list_head                 se_hash;
        /* entry index in the sai */
-       __u64                   se_index;
+       __u64                            se_index;
        /* low layer ldlm lock handle */
-       __u64                   se_handle;
+       __u64                            se_handle;
        /* entry status */
-       se_state_t              se_state;
+       se_state_t                       se_state;
        /* entry size, contains name */
-       int                     se_size;
+       int                              se_size;
        /* pointer to the target inode */
-       struct inode           *se_inode;
+       struct inode                    *se_inode;
+       /* pointer to @sai per process struct */
+       struct ll_statahead_info        *se_sai;
        /* entry name */
-       struct qstr             se_qstr;
+       struct qstr                      se_qstr;
        /* entry fid */
-       struct lu_fid           se_fid;
+       struct lu_fid                    se_fid;
 };
 
 static unsigned int sai_generation;
@@ -101,26 +103,26 @@ static inline int sa_hash(int val)
        return val & LL_SA_CACHE_MASK;
 }
 
-/* hash entry into sai_cache */
+/* hash entry into sax_cache */
 static inline void
-sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
+sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
 {
        int i = sa_hash(entry->se_qstr.hash);
 
-       spin_lock(&sai->sai_cache_lock[i]);
-       list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
-       spin_unlock(&sai->sai_cache_lock[i]);
+       spin_lock(&ctx->sax_cache_lock[i]);
+       list_add_tail(&entry->se_hash, &ctx->sax_cache[i]);
+       spin_unlock(&ctx->sax_cache_lock[i]);
 }
 
 /* unhash entry from sai_cache */
 static inline void
-sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
+sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
 {
        int i = sa_hash(entry->se_qstr.hash);
 
-       spin_lock(&sai->sai_cache_lock[i]);
+       spin_lock(&ctx->sax_cache_lock[i]);
        list_del_init(&entry->se_hash);
-       spin_unlock(&sai->sai_cache_lock[i]);
+       spin_unlock(&ctx->sax_cache_lock[i]);
 }
 
 static inline int agl_should_run(struct ll_statahead_info *sai,
@@ -196,7 +198,8 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
 
        ENTRY;
 
-       entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
+       entry_size = sizeof(struct sa_entry) +
+                    round_up(len + 1 /* for trailing NUL */, 4);
        OBD_ALLOC(entry, entry_size);
        if (unlikely(!entry))
                RETURN(ERR_PTR(-ENOMEM));
@@ -205,6 +208,7 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
               len, name, entry, index);
 
        entry->se_index = index;
+       entry->se_sai = sai;
 
        entry->se_state = SA_ENTRY_INIT;
        entry->se_size = entry_size;
@@ -214,13 +218,14 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
        entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;
-       entry->se_fid = *fid;
 
-       lli = ll_i2info(sai->sai_dentry->d_inode);
+       if (fid)
+               entry->se_fid = *fid;
 
+       lli = ll_i2info(sai->sai_dentry->d_inode);
        spin_lock(&lli->lli_sa_lock);
        INIT_LIST_HEAD(&entry->se_list);
-       sa_rehash(sai, entry);
+       sa_rehash(lli->lli_sax, entry);
        spin_unlock(&lli->lli_sa_lock);
 
        atomic_inc(&sai->sai_cache_count);
@@ -229,7 +234,7 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
 }
 
 /* free sa_entry, which should have been unhashed and not in any list */
-static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
+static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
 {
        CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
               entry->se_qstr.len, entry->se_qstr.name, entry,
@@ -239,7 +244,6 @@ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
        LASSERT(sa_unhashed(entry));
 
        OBD_FREE(entry, entry->se_size);
-       atomic_dec(&sai->sai_cache_count);
 }
 
 /*
@@ -247,39 +251,48 @@ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
  * only scanner can remove the entry from cache.
  */
 static struct sa_entry *
-sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
+sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
 {
        struct sa_entry *entry;
        int i = sa_hash(qstr->hash);
 
-       list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
+       spin_lock(&ctx->sax_cache_lock[i]);
+       list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) {
                if (entry->se_qstr.hash == qstr->hash &&
                    entry->se_qstr.len == qstr->len &&
-                   memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
+                   memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
+                       spin_unlock(&ctx->sax_cache_lock[i]);
                        return entry;
+               }
        }
+       spin_unlock(&ctx->sax_cache_lock[i]);
        return NULL;
 }
 
 /* unhash and unlink sa_entry, and then free it */
 static inline void
-sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
+sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
 {
-       struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
+       struct inode *dir = sai->sai_dentry->d_inode;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_context *ctx = lli->lli_sax;
 
        LASSERT(!sa_unhashed(entry));
        LASSERT(!list_empty(&entry->se_list));
        LASSERT(sa_ready(entry));
 
-       sa_unhash(sai, entry);
+       sa_unhash(ctx, entry);
 
-       spin_lock(&lli->lli_sa_lock);
+       if (!locked)
+               spin_lock(&lli->lli_sa_lock);
        list_del_init(&entry->se_list);
        spin_unlock(&lli->lli_sa_lock);
 
        iput(entry->se_inode);
-
-       sa_free(sai, entry);
+       atomic_dec(&sai->sai_cache_count);
+       sa_free(ctx, entry);
+       if (locked)
+               spin_lock(&lli->lli_sa_lock);
 }
 
 /* called by scanner after use, sa_entry will be killed */
@@ -287,7 +300,7 @@ static void
 sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
-       struct sa_entry *tmp, *next;
+       struct sa_entry *tmp;
        bool wakeup = false;
 
        if (entry && entry->se_state == SA_ENTRY_SUCC) {
@@ -309,23 +322,30 @@ sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
                } else {
                        wakeup = true;
                }
-       } else {
+       } else if (sai) {
                sai->sai_miss++;
                sai->sai_consecutive_miss++;
                wakeup = true;
        }
 
        if (entry)
-               sa_kill(sai, entry);
+               sa_kill(sai, entry, false);
 
-       /*
-        * kill old completed entries, only scanner process does this, no need
-        * to lock
-        */
-       list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
-               if (!is_omitted_entry(sai, tmp->se_index))
-                       break;
-               sa_kill(sai, tmp);
+       if (sai) {
+               /*
+                * kill old completed entries. Maybe kicking old entries can
+                * be ignored?
+                */
+               spin_lock(&lli->lli_sa_lock);
+               while ((tmp = list_first_entry_or_null(&sai->sai_entries,
+                               struct sa_entry, se_list))) {
+                       if (!is_omitted_entry(sai, tmp->se_index))
+                               break;
+
+                       /* ll_sa_lock is dropped by sa_kill(), restart list */
+                       sa_kill(sai, tmp, true);
+               }
+               spin_unlock(&lli->lli_sa_lock);
        }
 
        spin_lock(&lli->lli_sa_lock);
@@ -472,12 +492,79 @@ static void ll_agl_add(struct ll_statahead_info *sai,
        }
 }
 
+/* Allocate sax */
+static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
+{
+       struct ll_statahead_context *ctx;
+       int i;
+
+       ENTRY;
+
+       OBD_ALLOC_PTR(ctx);
+       if (ctx == NULL)
+               RETURN(NULL);
+
+       ctx->sax_inode = igrab(dir);
+       atomic_set(&ctx->sax_refcount, 1);
+       for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
+               INIT_LIST_HEAD(&ctx->sax_cache[i]);
+               spin_lock_init(&ctx->sax_cache_lock[i]);
+       }
+
+       RETURN(ctx);
+}
+
+static inline void ll_sax_free(struct ll_statahead_context *ctx)
+{
+       LASSERT(ctx->sax_inode != NULL);
+       iput(ctx->sax_inode);
+       OBD_FREE_PTR(ctx);
+}
+
+static inline void __ll_sax_get(struct ll_statahead_context *ctx)
+{
+       atomic_inc(&ctx->sax_refcount);
+}
+
+static inline struct ll_statahead_context *ll_sax_get(struct inode *dir)
+{
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_context *ctx = NULL;
+
+       spin_lock(&lli->lli_sa_lock);
+       ctx = lli->lli_sax;
+       if (ctx)
+               __ll_sax_get(ctx);
+       spin_unlock(&lli->lli_sa_lock);
+
+       return ctx;
+}
+
+static inline void ll_sax_put(struct inode *dir,
+                             struct ll_statahead_context *ctx)
+{
+       struct ll_inode_info *lli = ll_i2info(dir);
+
+       if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
+               lli->lli_sai = NULL;
+               lli->lli_sax = NULL;
+               if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
+                       lli->lli_opendir_key = NULL;
+                       lli->lli_opendir_pid = 0;
+                       lli->lli_sa_enabled = 0;
+               }
+               lli->lli_sa_pattern = LSA_PATTERN_NONE;
+               spin_unlock(&lli->lli_sa_lock);
+
+               ll_sax_free(ctx);
+       }
+}
+
 /* allocate sai */
 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
 {
        struct ll_statahead_info *sai;
        struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
-       int i;
 
        ENTRY;
 
@@ -494,10 +581,6 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
        INIT_LIST_HEAD(&sai->sai_entries);
        INIT_LIST_HEAD(&sai->sai_agls);
 
-       for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
-               INIT_LIST_HEAD(&sai->sai_cache[i]);
-               spin_lock_init(&sai->sai_cache_lock[i]);
-       }
        atomic_set(&sai->sai_cache_count, 0);
 
        spin_lock(&sai_generation_lock);
@@ -517,21 +600,10 @@ static inline void ll_sai_free(struct ll_statahead_info *sai)
        OBD_FREE_PTR(sai);
 }
 
-/*
- * take refcount of sai if sai for @dir exists, which means statahead is on for
- * this directory.
- */
-static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
+static inline struct ll_statahead_info *
+__ll_sai_get(struct ll_statahead_info *sai)
 {
-       struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = NULL;
-
-       spin_lock(&lli->lli_sa_lock);
-       sai = lli->lli_sai;
-       if (sai)
-               atomic_inc(&sai->sai_refcount);
-       spin_unlock(&lli->lli_sa_lock);
-
+       atomic_inc(&sai->sai_refcount);
        return sai;
 }
 
@@ -544,8 +616,8 @@ static void ll_sai_put(struct ll_statahead_info *sai)
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
 
        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
-               struct sa_entry *entry, *next;
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
+               struct sa_entry *entry, *next;
 
                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);
@@ -556,7 +628,7 @@ static void ll_sai_put(struct ll_statahead_info *sai)
 
                list_for_each_entry_safe(entry, next, &sai->sai_entries,
                                         se_list)
-                       sa_kill(sai, entry);
+                       sa_kill(sai, entry, false);
 
                LASSERT(atomic_read(&sai->sai_cache_count) == 0);
                LASSERT(agl_list_empty(sai));
@@ -677,7 +749,7 @@ static void ll_statahead_interpret_work(struct work_struct *work)
        struct req_capsule *pill = item->mop_pill;
        struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = lli->lli_sai;
+       struct ll_statahead_info *sai;
        struct lookup_intent *it;
        struct sa_entry *entry;
        struct mdt_body *body;
@@ -689,6 +761,7 @@ static void ll_statahead_interpret_work(struct work_struct *work)
        entry = (struct sa_entry *)item->mop_cbdata;
        LASSERT(entry->se_handle != 0);
 
+       sai = entry->se_sai;
        it = &item->mop_it;
        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
        if (!body)
@@ -696,7 +769,8 @@ static void ll_statahead_interpret_work(struct work_struct *work)
 
        child = entry->se_inode;
        /* revalidate; unlinked and re-created with the same name */
-       if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+       if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
+                    !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
@@ -764,9 +838,9 @@ static int ll_statahead_interpret(struct md_op_item *item, int rc)
        struct lookup_intent *it = &item->mop_it;
        struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = lli->lli_sai;
        struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
        struct work_struct *work = &item->mop_work;
+       struct ll_statahead_info *sai;
        struct mdt_body *body;
        struct inode *child;
        __u64 handle = 0;
@@ -780,8 +854,9 @@ static int ll_statahead_interpret(struct md_op_item *item, int rc)
         * because statahead thread will wait for all inflight RPC to finish,
         * sai should be always valid, no need to refcount
         */
-       LASSERT(sai != NULL);
        LASSERT(entry != NULL);
+       sai = entry->se_sai;
+       LASSERT(sai != NULL);
 
        CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
               entry->se_qstr.len, entry->se_qstr.name, rc);
@@ -794,8 +869,14 @@ static int ll_statahead_interpret(struct md_op_item *item, int rc)
                GOTO(out, rc = -EFAULT);
 
        child = entry->se_inode;
-       /* revalidate; unlinked and re-created with the same name */
-       if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+       /*
+        * revalidate; unlinked and re-created with the same name.
+        * exclude the case where FID is zero as it was from statahead with
+        * regularized file name pattern and had no idea for the FID of the
+        * children file.
+        */
+       if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
+                    !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
@@ -839,9 +920,9 @@ out:
        RETURN(rc);
 }
 
-static inline int sa_getattr(struct inode *dir, struct md_op_item *item)
+static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir,
+                            struct md_op_item *item)
 {
-       struct ll_statahead_info *sai = ll_i2info(dir)->lli_sai;
        int rc;
 
        if (sa_has_batch_handle(sai))
@@ -864,7 +945,7 @@ static int sa_lookup(struct inode *dir, struct sa_entry *entry)
        if (IS_ERR(item))
                RETURN(PTR_ERR(item));
 
-       rc = sa_getattr(dir, item);
+       rc = sa_getattr(entry->se_sai, dir, item);
        if (rc < 0)
                sa_fini_data(item);
 
@@ -909,7 +990,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
                RETURN(1);
        }
 
-       rc = sa_getattr(dir, item);
+       rc = sa_getattr(entry->se_sai, dir, item);
        if (rc < 0) {
                entry->se_inode = NULL;
                iput(inode);
@@ -920,12 +1001,10 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
 }
 
 /* async stat for file with @name */
-static void sa_statahead(struct dentry *parent, const char *name, int len,
-                        const struct lu_fid *fid)
+static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent,
+                        const char *name, int len, const struct lu_fid *fid)
 {
        struct inode *dir = parent->d_inode;
-       struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = lli->lli_sai;
        struct dentry *dentry = NULL;
        struct sa_entry *entry;
        int rc;
@@ -964,15 +1043,15 @@ static void sa_statahead(struct dentry *parent, const char *name, int len,
 /* async glimpse (agl) thread main function */
 static int ll_agl_thread(void *arg)
 {
-       struct dentry *parent = (struct dentry *)arg;
-       struct inode *dir = parent->d_inode;
-       struct ll_inode_info *plli = ll_i2info(dir);
-       struct ll_inode_info *clli;
        /*
         * We already own this reference, so it is safe to take it
         * without a lock.
         */
-       struct ll_statahead_info *sai = plli->lli_sai;
+       struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
+       struct dentry *parent = sai->sai_dentry;
+       struct inode *dir = parent->d_inode;
+       struct ll_inode_info *plli = ll_i2info(dir);
+       struct ll_inode_info *clli;
 
        ENTRY;
 
@@ -1047,7 +1126,7 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
               sai, parent);
 
        plli = ll_i2info(parent->d_inode);
-       task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d",
+       task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
                                      plli->lli_opendir_pid);
        if (IS_ERR(task)) {
                CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
@@ -1056,26 +1135,23 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
        sai->sai_agl_task = task;
        atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
        /* Get an extra reference that the thread holds */
-       ll_sai_get(d_inode(parent));
+       __ll_sai_get(sai);
 
        wake_up_process(task);
 
        EXIT;
 }
 
-/* statahead thread main function */
-static int ll_statahead_thread(void *arg)
+static int ll_statahead_by_list(struct dentry *parent)
 {
-       struct dentry *parent = (struct dentry *)arg;
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
-       int first = 0;
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct md_op_data *op_data;
        struct page *page = NULL;
-       struct lu_batch *bh = NULL;
        __u64 pos = 0;
+       int first = 0;
        int rc = 0;
 
        ENTRY;
@@ -1083,18 +1159,9 @@ static int ll_statahead_thread(void *arg)
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
               sai, parent);
 
-       sai->sai_max_batch_count = sbi->ll_sa_batch_max;
-       if (sai->sai_max_batch_count) {
-               bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
-                                    sai->sai_max_batch_count);
-               if (IS_ERR(bh))
-                       GOTO(out_stop_agl, rc = PTR_ERR(bh));
-       }
-
-       sai->sai_bh = bh;
        OBD_ALLOC_PTR(op_data);
        if (!op_data)
-               GOTO(out, rc = -ENOMEM);
+               RETURN(-ENOMEM);
 
        /* matches smp_store_release() in ll_deauthorize_statahead() */
        while (pos != MDS_DIR_END_OFF && smp_load_acquire(&sai->sai_task)) {
@@ -1222,7 +1289,7 @@ static int ll_statahead_thread(void *arg)
                                namelen = lltr.len;
                        }
 
-                       sa_statahead(parent, name, namelen, &fid);
+                       sa_statahead(sai, parent, name, namelen, &fid);
                        llcrypt_fname_free_buffer(&lltr);
                }
 
@@ -1243,6 +1310,117 @@ static int ll_statahead_thread(void *arg)
        }
        ll_finish_md_op_data(op_data);
 
+       RETURN(rc);
+}
+
+static int ll_statahead_by_fname(struct ll_statahead_info *sai,
+                                struct dentry *parent)
+{
+       struct inode *dir = parent->d_inode;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
+       size_t max_len;
+       size_t len;
+       char *fname;
+       char *ptr;
+       int rc = 0;
+       __u64 i = 0;
+
+       ENTRY;
+
+       CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n",
+              sbi->ll_fsname, parent, sai->sai_fname);
+
+       OBD_ALLOC(fname, NAME_MAX);
+       if (fname == NULL)
+               RETURN(-ENOMEM);
+
+       len = strlen(sai->sai_fname);
+       memcpy(fname, sai->sai_fname, len);
+       max_len = sizeof(sai->sai_fname) - len;
+       ptr = fname + len;
+
+       /* matches smp_store_release() in ll_deauthorize_statahead() */
+       while (smp_load_acquire(&sai->sai_task)) {
+               size_t numlen;
+
+               numlen = snprintf(ptr, max_len, "%llu",
+                                 sai->sai_fstart + i);
+
+               while (({set_current_state(TASK_IDLE);
+                        /*
+                         * matches smp_store_release() in
+                         * ll_deauthorize_statahead()
+                         */
+                        smp_load_acquire(&sai->sai_task); })) {
+                       spin_lock(&lli->lli_agl_lock);
+                       while (sa_sent_full(sai) && !agl_list_empty(sai)) {
+                               struct ll_inode_info *clli;
+
+                               __set_current_state(TASK_RUNNING);
+                               clli = agl_first_entry(sai);
+                               list_del_init(&clli->lli_agl_list);
+                               spin_unlock(&lli->lli_agl_lock);
+
+                               ll_agl_trigger(&clli->lli_vfs_inode, sai);
+                               cond_resched();
+                               spin_lock(&lli->lli_agl_lock);
+                       }
+                       spin_unlock(&lli->lli_agl_lock);
+
+                       if (!sa_sent_full(sai))
+                               break;
+                       schedule();
+               }
+               __set_current_state(TASK_RUNNING);
+
+               sa_statahead(sai, parent, fname, len + numlen, NULL);
+               if (++i >= sai->sai_fend)
+                       break;
+       }
+
+       OBD_FREE(fname, NAME_MAX);
+       RETURN(rc);
+}
+
+/* statahead thread main function */
+static int ll_statahead_thread(void *arg)
+{
+       struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
+       struct dentry *parent = sai->sai_dentry;
+       struct inode *dir = parent->d_inode;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
+       struct lu_batch *bh = NULL;
+       int rc = 0;
+
+       ENTRY;
+
+       CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
+              sai, parent);
+
+       sai->sai_max_batch_count = sbi->ll_sa_batch_max;
+       if (sai->sai_max_batch_count) {
+               bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
+                                    sai->sai_max_batch_count);
+               if (IS_ERR(bh))
+                       GOTO(out_stop_agl, rc = PTR_ERR(bh));
+       }
+
+       sai->sai_bh = bh;
+
+       switch (lli->lli_sa_pattern) {
+       case LSA_PATTERN_LIST:
+               rc = ll_statahead_by_list(parent);
+               break;
+       case LSA_PATTERN_FNAME:
+               rc = ll_statahead_by_fname(sai, parent);
+               break;
+       default:
+               rc = -EFAULT;
+               break;
+       }
+
        if (rc < 0) {
                spin_lock(&lli->lli_sa_lock);
                sai->sai_task = NULL;
@@ -1264,7 +1442,7 @@ static int ll_statahead_thread(void *arg)
        __set_current_state(TASK_RUNNING);
 
        EXIT;
-out:
+
        if (bh) {
                rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
                sai->sai_bh = NULL;
@@ -1281,8 +1459,8 @@ out_stop_agl:
                /* in case we're not woken up, timeout wait */
                msleep(125);
 
-       CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
-              sbi->ll_fsname, sai, parent);
+       CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n",
+              sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss);
 
        spin_lock(&lli->lli_sa_lock);
        sai->sai_task = NULL;
@@ -1293,6 +1471,7 @@ out_stop_agl:
        atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
 
        ll_sai_put(sai);
+       ll_sax_put(dir, lli->lli_sax);
 
        return rc;
 }
@@ -1308,7 +1487,6 @@ void ll_authorize_statahead(struct inode *dir, void *key)
                 * if lli_sai is not NULL, it means previous statahead is not
                 * finished yet, we'd better not start a new statahead for now.
                 */
-               LASSERT(lli->lli_opendir_pid == 0);
                lli->lli_opendir_key = key;
                lli->lli_opendir_pid = current->pid;
                lli->lli_sa_enabled = 1;
@@ -1316,6 +1494,29 @@ void ll_authorize_statahead(struct inode *dir, void *key)
        spin_unlock(&lli->lli_sa_lock);
 }
 
+static void ll_deauthorize_statahead_fname(struct inode *dir, void *key)
+{
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_file_data *fd = (struct ll_file_data *)key;
+       struct ll_statahead_info *sai = fd->fd_sai;
+
+       if (sai == NULL)
+               return;
+
+       spin_lock(&lli->lli_sa_lock);
+       if (sai->sai_task) {
+               struct task_struct *task = sai->sai_task;
+
+               sai->sai_task = NULL;
+               wake_up_process(task);
+       }
+       fd->fd_sai = NULL;
+       spin_unlock(&lli->lli_sa_lock);
+       ll_sai_put(sai);
+       LASSERT(lli->lli_sax != NULL);
+       ll_sax_put(dir, lli->lli_sax);
+}
+
 /*
  * deauthorize opened dir handle @key to statahead, and notify statahead thread
  * to quit if it's running.
@@ -1325,12 +1526,17 @@ void ll_deauthorize_statahead(struct inode *dir, void *key)
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai;
 
-       LASSERT(lli->lli_opendir_key == key);
        LASSERT(lli->lli_opendir_pid != 0);
 
        CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
               PFID(&lli->lli_fid));
 
+       if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
+               ll_deauthorize_statahead_fname(dir, key);
+               return;
+       }
+
+       LASSERT(lli->lli_opendir_key == key);
        spin_lock(&lli->lli_sa_lock);
        lli->lli_opendir_key = NULL;
        lli->lli_opendir_pid = 0;
@@ -1523,17 +1729,18 @@ out:
  * \retval             negative number upon error
  */
 static int revalidate_statahead_dentry(struct inode *dir,
-                                      struct ll_statahead_info *sai,
+                                      struct ll_statahead_context *ctx,
                                       struct dentry **dentryp,
                                       bool unplug)
 {
        struct sa_entry *entry = NULL;
        struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_info *sai = lli->lli_sai;
        int rc = 0;
 
        ENTRY;
 
-       if ((*dentryp)->d_name.name[0] == '.') {
+       if (sai && (*dentryp)->d_name.name[0] == '.') {
                if (sai->sai_ls_all ||
                    sai->sai_miss_hidden >= sai->sai_skip_hidden) {
                        /*
@@ -1563,10 +1770,16 @@ static int revalidate_statahead_dentry(struct inode *dir,
        if (unplug)
                GOTO(out, rc = 1);
 
-       entry = sa_get(sai, &(*dentryp)->d_name);
+       entry = sa_get(ctx, &(*dentryp)->d_name);
        if (!entry)
                GOTO(out, rc = -EAGAIN);
 
+       if (lli->lli_sa_pattern == LSA_PATTERN_LIST)
+               LASSERT(sai == entry->se_sai);
+       else if (lli->lli_sa_pattern == LSA_PATTERN_FNAME)
+               sai = entry->se_sai;
+
+       LASSERT(sai != NULL);
        if (!sa_ready(entry)) {
                spin_lock(&lli->lli_sa_lock);
                sai->sai_index_wait = entry->se_index;
@@ -1666,6 +1879,7 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
        int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = NULL;
+       struct ll_statahead_context *ctx = NULL;
        struct dentry *parent = dentry->d_parent;
        struct task_struct *task;
        struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
@@ -1691,6 +1905,10 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
        if (!sai)
                GOTO(out, rc = -ENOMEM);
 
+       ctx = ll_sax_alloc(dir);
+       if (!ctx)
+               GOTO(out, rc = -ENOMEM);
+
        sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
 
        /*
@@ -1700,17 +1918,20 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
         */
        spin_lock(&lli->lli_sa_lock);
        if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
-                    lli->lli_opendir_pid != current->pid)) {
+                    lli->lli_opendir_pid != current->pid ||
+                    lli->lli_sa_pattern != LSA_PATTERN_NONE)) {
                spin_unlock(&lli->lli_sa_lock);
                GOTO(out, rc = -EPERM);
        }
        lli->lli_sai = sai;
+       lli->lli_sax = ctx;
+       lli->lli_sa_pattern = LSA_PATTERN_LIST;
        spin_unlock(&lli->lli_sa_lock);
 
        CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
               current->pid, parent);
 
-       task = kthread_create_on_node(ll_statahead_thread, parent, node,
+       task = kthread_create_on_node(ll_statahead_thread, sai, node,
                                      "ll_sa_%u", lli->lli_opendir_pid);
        if (IS_ERR(task)) {
                spin_lock(&lli->lli_sa_lock);
@@ -1746,6 +1967,10 @@ out:
 
        if (sai)
                ll_sai_free(sai);
+
+       if (ctx)
+               ll_sax_free(ctx);
+
        if (first != LS_NOT_FIRST_DE)
                atomic_dec(&sbi->ll_sa_running);
 
@@ -1758,9 +1983,11 @@ out:
 static inline bool ll_statahead_started(struct inode *dir, bool agl)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_context *ctx;
        struct ll_statahead_info *sai;
 
        spin_lock(&lli->lli_sa_lock);
+       ctx = lli->lli_sax;
        sai = lli->lli_sai;
        if (sai && (sai->sai_agl_task != NULL) != agl)
                CDEBUG(D_READA,
@@ -1769,7 +1996,7 @@ static inline bool ll_statahead_started(struct inode *dir, bool agl)
                       sai->sai_agl_task != NULL, agl);
        spin_unlock(&lli->lli_sa_lock);
 
-       return !!sai;
+       return !!ctx;
 }
 
 /**
@@ -1810,15 +2037,162 @@ int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
                            bool unplug)
 {
-       struct ll_statahead_info *sai;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_context *ctx;
+       struct ll_statahead_info *sai = NULL;
        int rc = 0;
 
-       sai = ll_sai_get(dir);
-       if (sai) {
-               rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
+       spin_lock(&lli->lli_sa_lock);
+       ctx = lli->lli_sax;
+       if (ctx) {
+               sai = lli->lli_sai;
+               if (sai) {
+                       atomic_inc(&sai->sai_refcount);
+               } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) {
+                       spin_unlock(&lli->lli_sa_lock);
+                       return 0;
+               }
+               __ll_sax_get(ctx);
+       }
+       spin_unlock(&lli->lli_sa_lock);
+       if (ctx) {
+               rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
                CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
                       *dentryp, rc);
-               ll_sai_put(sai);
+               if (sai)
+                       ll_sai_put(sai);
+               ll_sax_put(dir, ctx);
        }
        return rc;
 }
+
+int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
+{
+       int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
+       struct ll_file_data *fd = file->private_data;
+       struct dentry *dentry = file_dentry(file);
+       struct inode *dir = dentry->d_inode;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
+       struct ll_statahead_info *sai = NULL;
+       struct ll_statahead_context *ctx = NULL;
+       struct task_struct *task;
+       bool agl = true;
+       int rc;
+
+       ENTRY;
+
+       if (sbi->ll_sa_max == 0)
+               RETURN(0);
+
+       if (!S_ISDIR(dir->i_mode))
+               RETURN(-EINVAL);
+
+       if (fd->fd_sai) {
+               rc = -EALREADY;
+               CWARN("%s: already set statahead hint for dir %pd: rc = %d\n",
+                     sbi->ll_fsname, dentry, rc);
+               RETURN(rc);
+       }
+
+       if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
+                                      sbi->ll_sa_running_max)) {
+               CDEBUG(D_READA,
+                      "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
+               GOTO(out, rc = -EMFILE);
+       }
+
+       sai = ll_sai_alloc(dentry);
+       if (sai == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       sai->sai_fstart = ladvise->lla_start;
+       sai->sai_fend = ladvise->lla_end;
+       sai->sai_ls_all = 0;
+       sai->sai_max = sbi->ll_sa_max;
+       strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
+
+       ctx = ll_sax_get(dir);
+       if (ctx == NULL) {
+               ctx = ll_sax_alloc(dir);
+               if (ctx == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               spin_lock(&lli->lli_sa_lock);
+               if (unlikely(lli->lli_sax)) {
+                       struct ll_statahead_context *tmp = ctx;
+
+                       if (lli->lli_sa_pattern == LSA_PATTERN_NONE ||
+                           lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
+                               lli->lli_sa_pattern = LSA_PATTERN_FNAME;
+                               ctx = lli->lli_sax;
+                               __ll_sax_get(ctx);
+                               fd->fd_sai = __ll_sai_get(sai);
+                               rc = 0;
+                       } else {
+                               rc = -EINVAL;
+                               CWARN("%s: pattern %X is not FNAME: rc = %d\n",
+                                     sbi->ll_fsname, lli->lli_sa_pattern, rc);
+                       }
+
+                       spin_unlock(&lli->lli_sa_lock);
+                       ll_sax_free(tmp);
+                       if (rc)
+                               GOTO(out, rc);
+               } else {
+                       lli->lli_sa_pattern = LSA_PATTERN_FNAME;
+                       lli->lli_sax = ctx;
+                       fd->fd_sai = __ll_sai_get(sai);
+                       spin_unlock(&lli->lli_sa_lock);
+               }
+       } else {
+               spin_lock(&lli->lli_sa_lock);
+               if (!(lli->lli_sa_pattern == LSA_PATTERN_FNAME ||
+                     lli->lli_sa_pattern == LSA_PATTERN_NONE)) {
+                       spin_unlock(&lli->lli_sa_lock);
+                       GOTO(out, rc = -EINVAL);
+               }
+
+               lli->lli_sa_pattern = LSA_PATTERN_FNAME;
+               fd->fd_sai = __ll_sai_get(sai);
+               spin_unlock(&lli->lli_sa_lock);
+       }
+
+       __ll_sax_get(ctx);
+       CDEBUG(D_READA,
+              "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n",
+              current->pid, dentry, sai, ctx);
+
+       task = kthread_create_on_node(ll_statahead_thread, sai, node,
+                                     "ll_sa_%u", current->pid);
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CERROR("%s: cannot start ll_sa thread: rc = %d\n",
+                      sbi->ll_fsname, rc);
+               GOTO(out, rc);
+       }
+
+       if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
+               ll_start_agl(dentry, sai);
+
+       atomic_inc(&sbi->ll_sa_total);
+       sai->sai_task = task;
+       wake_up_process(task);
+
+       RETURN(0);
+out:
+       if (fd->fd_sai) {
+               ll_sai_put(sai);
+               ll_sax_put(dir, ctx);
+               fd->fd_sai = NULL;
+       }
+
+       if (sai)
+               ll_sai_free(sai);
+
+       if (ctx)
+               ll_sax_free(ctx);
+
+       atomic_dec(&sbi->ll_sa_running);
+       RETURN(rc);
+}
index 7f68caf..3be890b 100644 (file)
@@ -3826,23 +3826,31 @@ static int lmv_intent_getattr_async(struct obd_export *exp,
 
        ENTRY;
 
-       if (!fid_is_sane(&op_data->op_fid2))
+       if (!(fid_is_sane(&op_data->op_fid2) ||
+             fid_is_zero(&op_data->op_fid2)))
                RETURN(-EINVAL);
 
        ptgt = lmv_locate_tgt(lmv, op_data);
        if (IS_ERR(ptgt))
                RETURN(PTR_ERR(ptgt));
 
-       ctgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
-       if (IS_ERR(ctgt))
-               RETURN(PTR_ERR(ctgt));
-
        /*
-        * remote object needs two RPCs to lookup and getattr, considering the
-        * complexity don't support statahead for now.
+        * Zeroed FID @op_fid2 means that the intent getattr() comes from
+        * statahead by regularized file names. Currently only do statahead
+        * for the children files located same as the parent directory.
         */
-       if (ctgt != ptgt)
-               RETURN(-EREMOTE);
+       if (!fid_is_zero(&op_data->op_fid2)) {
+               ctgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
+               if (IS_ERR(ctgt))
+                       RETURN(PTR_ERR(ctgt));
+
+               /*
+                * remote object needs two RPCs to lookup and getattr,
+                * considering the complexity don't support statahead for now.
+                */
+               if (ctgt != ptgt)
+                       RETURN(-EREMOTE);
+       }
 
        rc = md_intent_getattr_async(ptgt->ltd_exp, item);
 
@@ -4087,13 +4095,27 @@ lmv_batch_locate_tgt(struct lmv_obd *lmv, struct md_op_item *item)
        case MD_OP_GETATTR: {
                struct lmv_tgt_desc *ptgt;
 
-               if (!fid_is_sane(&op_data->op_fid2))
+               if (!(fid_is_sane(&op_data->op_fid2) ||
+                     fid_is_zero(&op_data->op_fid2)))
                        RETURN(ERR_PTR(-EINVAL));
 
                ptgt = lmv_locate_tgt(lmv, op_data);
                if (IS_ERR(ptgt))
                        RETURN(ptgt);
 
+               /*
+                * Zeroed @op_fid2 means that it is a statahead populating call
+                * in the file name pattern which is using file name format to
+                * prefetch the attributes. Thus it has no idea about the FID of
+                * the children file. The children file is considered to be
+                * located on the same storage target with the parent directory
+                * or the stripped directory.
+                */
+               if (fid_is_zero(&op_data->op_fid2)) {
+                       tgt = ptgt;
+                       break;
+               }
+
                tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
                if (IS_ERR(tgt))
                        RETURN(tgt);
index f059339..2041ca1 100644 (file)
@@ -2364,6 +2364,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         */
                        if (mdt_object_remote(child))
                                GOTO(out_child, rc = -EREMOTE);
+
                        try_bits |= child_bits;
                        child_bits = 0;
                }