Whamcloud - gitweb
LU-14361 statahead: add statahead advise IOCTL
[fs/lustre-release.git] / lustre / llite / statahead.c
index 0253b0d..29e32e9 100644 (file)
@@ -60,23 +60,25 @@ typedef enum {
  */
 struct sa_entry {
        /* link into sai_entries */
-       struct list_head        se_list;
+       struct list_head                 se_list;
        /* link into sai hash table locally */
-       struct list_head        se_hash;
+       struct list_head                 se_hash;
        /* entry index in the sai */
-       __u64                   se_index;
+       __u64                            se_index;
        /* low layer ldlm lock handle */
-       __u64                   se_handle;
+       __u64                            se_handle;
        /* entry status */
-       se_state_t              se_state;
+       se_state_t                       se_state;
        /* entry size, contains name */
-       int                     se_size;
+       int                              se_size;
        /* pointer to the target inode */
-       struct inode           *se_inode;
+       struct inode                    *se_inode;
+       /* pointer to @sai per process struct */
+       struct ll_statahead_info        *se_sai;
        /* entry name */
-       struct qstr             se_qstr;
+       struct qstr                      se_qstr;
        /* entry fid */
-       struct lu_fid           se_fid;
+       struct lu_fid                    se_fid;
 };
 
 static unsigned int sai_generation;
@@ -101,26 +103,26 @@ static inline int sa_hash(int val)
        return val & LL_SA_CACHE_MASK;
 }
 
-/* hash entry into sai_cache */
+/* hash entry into sax_cache */
 static inline void
-sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
+sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
 {
        int i = sa_hash(entry->se_qstr.hash);
 
-       spin_lock(&sai->sai_cache_lock[i]);
-       list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
-       spin_unlock(&sai->sai_cache_lock[i]);
+       spin_lock(&ctx->sax_cache_lock[i]);
+       list_add_tail(&entry->se_hash, &ctx->sax_cache[i]);
+       spin_unlock(&ctx->sax_cache_lock[i]);
 }
 
 /* unhash entry from sai_cache */
 static inline void
-sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
+sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
 {
        int i = sa_hash(entry->se_qstr.hash);
 
-       spin_lock(&sai->sai_cache_lock[i]);
+       spin_lock(&ctx->sax_cache_lock[i]);
        list_del_init(&entry->se_hash);
-       spin_unlock(&sai->sai_cache_lock[i]);
+       spin_unlock(&ctx->sax_cache_lock[i]);
 }
 
 static inline int agl_should_run(struct ll_statahead_info *sai,
@@ -196,7 +198,8 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
 
        ENTRY;
 
-       entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
+       entry_size = sizeof(struct sa_entry) +
+                    round_up(len + 1 /* for trailing NUL */, 4);
        OBD_ALLOC(entry, entry_size);
        if (unlikely(!entry))
                RETURN(ERR_PTR(-ENOMEM));
@@ -205,6 +208,7 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
               len, name, entry, index);
 
        entry->se_index = index;
+       entry->se_sai = sai;
 
        entry->se_state = SA_ENTRY_INIT;
        entry->se_size = entry_size;
@@ -214,13 +218,14 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
        entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;
-       entry->se_fid = *fid;
 
-       lli = ll_i2info(sai->sai_dentry->d_inode);
+       if (fid)
+               entry->se_fid = *fid;
 
+       lli = ll_i2info(sai->sai_dentry->d_inode);
        spin_lock(&lli->lli_sa_lock);
        INIT_LIST_HEAD(&entry->se_list);
-       sa_rehash(sai, entry);
+       sa_rehash(lli->lli_sax, entry);
        spin_unlock(&lli->lli_sa_lock);
 
        atomic_inc(&sai->sai_cache_count);
@@ -229,7 +234,7 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
 }
 
 /* free sa_entry, which should have been unhashed and not in any list */
-static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
+static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
 {
        CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
               entry->se_qstr.len, entry->se_qstr.name, entry,
@@ -239,7 +244,6 @@ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
        LASSERT(sa_unhashed(entry));
 
        OBD_FREE(entry, entry->se_size);
-       atomic_dec(&sai->sai_cache_count);
 }
 
 /*
@@ -247,39 +251,48 @@ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
  * only scanner can remove the entry from cache.
  */
 static struct sa_entry *
-sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
+sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
 {
        struct sa_entry *entry;
        int i = sa_hash(qstr->hash);
 
-       list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
+       spin_lock(&ctx->sax_cache_lock[i]);
+       list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) {
                if (entry->se_qstr.hash == qstr->hash &&
                    entry->se_qstr.len == qstr->len &&
-                   memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
+                   memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
+                       spin_unlock(&ctx->sax_cache_lock[i]);
                        return entry;
+               }
        }
+       spin_unlock(&ctx->sax_cache_lock[i]);
        return NULL;
 }
 
 /* unhash and unlink sa_entry, and then free it */
 static inline void
-sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
+sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
 {
-       struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
+       struct inode *dir = sai->sai_dentry->d_inode;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_context *ctx = lli->lli_sax;
 
        LASSERT(!sa_unhashed(entry));
        LASSERT(!list_empty(&entry->se_list));
        LASSERT(sa_ready(entry));
 
-       sa_unhash(sai, entry);
+       sa_unhash(ctx, entry);
 
-       spin_lock(&lli->lli_sa_lock);
+       if (!locked)
+               spin_lock(&lli->lli_sa_lock);
        list_del_init(&entry->se_list);
        spin_unlock(&lli->lli_sa_lock);
 
        iput(entry->se_inode);
-
-       sa_free(sai, entry);
+       atomic_dec(&sai->sai_cache_count);
+       sa_free(ctx, entry);
+       if (locked)
+               spin_lock(&lli->lli_sa_lock);
 }
 
 /* called by scanner after use, sa_entry will be killed */
@@ -287,7 +300,7 @@ static void
 sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
-       struct sa_entry *tmp, *next;
+       struct sa_entry *tmp;
        bool wakeup = false;
 
        if (entry && entry->se_state == SA_ENTRY_SUCC) {
@@ -309,23 +322,30 @@ sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
                } else {
                        wakeup = true;
                }
-       } else {
+       } else if (sai) {
                sai->sai_miss++;
                sai->sai_consecutive_miss++;
                wakeup = true;
        }
 
        if (entry)
-               sa_kill(sai, entry);
+               sa_kill(sai, entry, false);
 
-       /*
-        * kill old completed entries, only scanner process does this, no need
-        * to lock
-        */
-       list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
-               if (!is_omitted_entry(sai, tmp->se_index))
-                       break;
-               sa_kill(sai, tmp);
+       if (sai) {
+               /*
+                * kill old completed entries. Maybe kicking old entries can
+                * be ignored?
+                */
+               spin_lock(&lli->lli_sa_lock);
+               while ((tmp = list_first_entry_or_null(&sai->sai_entries,
+                               struct sa_entry, se_list))) {
+                       if (!is_omitted_entry(sai, tmp->se_index))
+                               break;
+
+                       /* ll_sa_lock is dropped by sa_kill(), restart list */
+                       sa_kill(sai, tmp, true);
+               }
+               spin_unlock(&lli->lli_sa_lock);
        }
 
        spin_lock(&lli->lli_sa_lock);
@@ -472,12 +492,79 @@ static void ll_agl_add(struct ll_statahead_info *sai,
        }
 }
 
+/* Allocate sax */
+static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
+{
+       struct ll_statahead_context *ctx;
+       int i;
+
+       ENTRY;
+
+       OBD_ALLOC_PTR(ctx);
+       if (ctx == NULL)
+               RETURN(NULL);
+
+       ctx->sax_inode = igrab(dir);
+       atomic_set(&ctx->sax_refcount, 1);
+       for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
+               INIT_LIST_HEAD(&ctx->sax_cache[i]);
+               spin_lock_init(&ctx->sax_cache_lock[i]);
+       }
+
+       RETURN(ctx);
+}
+
+static inline void ll_sax_free(struct ll_statahead_context *ctx)
+{
+       LASSERT(ctx->sax_inode != NULL);
+       iput(ctx->sax_inode);
+       OBD_FREE_PTR(ctx);
+}
+
+static inline void __ll_sax_get(struct ll_statahead_context *ctx)
+{
+       atomic_inc(&ctx->sax_refcount);
+}
+
+static inline struct ll_statahead_context *ll_sax_get(struct inode *dir)
+{
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_context *ctx = NULL;
+
+       spin_lock(&lli->lli_sa_lock);
+       ctx = lli->lli_sax;
+       if (ctx)
+               __ll_sax_get(ctx);
+       spin_unlock(&lli->lli_sa_lock);
+
+       return ctx;
+}
+
+static inline void ll_sax_put(struct inode *dir,
+                             struct ll_statahead_context *ctx)
+{
+       struct ll_inode_info *lli = ll_i2info(dir);
+
+       if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
+               lli->lli_sai = NULL;
+               lli->lli_sax = NULL;
+               if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
+                       lli->lli_opendir_key = NULL;
+                       lli->lli_opendir_pid = 0;
+                       lli->lli_sa_enabled = 0;
+               }
+               lli->lli_sa_pattern = LSA_PATTERN_NONE;
+               spin_unlock(&lli->lli_sa_lock);
+
+               ll_sax_free(ctx);
+       }
+}
+
 /* allocate sai */
 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
 {
        struct ll_statahead_info *sai;
        struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
-       int i;
 
        ENTRY;
 
@@ -494,10 +581,6 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
        INIT_LIST_HEAD(&sai->sai_entries);
        INIT_LIST_HEAD(&sai->sai_agls);
 
-       for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
-               INIT_LIST_HEAD(&sai->sai_cache[i]);
-               spin_lock_init(&sai->sai_cache_lock[i]);
-       }
        atomic_set(&sai->sai_cache_count, 0);
 
        spin_lock(&sai_generation_lock);
@@ -517,21 +600,10 @@ static inline void ll_sai_free(struct ll_statahead_info *sai)
        OBD_FREE_PTR(sai);
 }
 
-/*
- * take refcount of sai if sai for @dir exists, which means statahead is on for
- * this directory.
- */
-static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
+static inline struct ll_statahead_info *
+__ll_sai_get(struct ll_statahead_info *sai)
 {
-       struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = NULL;
-
-       spin_lock(&lli->lli_sa_lock);
-       sai = lli->lli_sai;
-       if (sai)
-               atomic_inc(&sai->sai_refcount);
-       spin_unlock(&lli->lli_sa_lock);
-
+       atomic_inc(&sai->sai_refcount);
        return sai;
 }
 
@@ -544,8 +616,8 @@ static void ll_sai_put(struct ll_statahead_info *sai)
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
 
        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
-               struct sa_entry *entry, *next;
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
+               struct sa_entry *entry, *next;
 
                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);
@@ -556,7 +628,7 @@ static void ll_sai_put(struct ll_statahead_info *sai)
 
                list_for_each_entry_safe(entry, next, &sai->sai_entries,
                                         se_list)
-                       sa_kill(sai, entry);
+                       sa_kill(sai, entry, false);
 
                LASSERT(atomic_read(&sai->sai_cache_count) == 0);
                LASSERT(agl_list_empty(sai));
@@ -677,7 +749,7 @@ static void ll_statahead_interpret_work(struct work_struct *work)
        struct req_capsule *pill = item->mop_pill;
        struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = lli->lli_sai;
+       struct ll_statahead_info *sai;
        struct lookup_intent *it;
        struct sa_entry *entry;
        struct mdt_body *body;
@@ -689,6 +761,7 @@ static void ll_statahead_interpret_work(struct work_struct *work)
        entry = (struct sa_entry *)item->mop_cbdata;
        LASSERT(entry->se_handle != 0);
 
+       sai = entry->se_sai;
        it = &item->mop_it;
        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
        if (!body)
@@ -696,7 +769,8 @@ static void ll_statahead_interpret_work(struct work_struct *work)
 
        child = entry->se_inode;
        /* revalidate; unlinked and re-created with the same name */
-       if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+       if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
+                    !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
@@ -764,9 +838,9 @@ static int ll_statahead_interpret(struct md_op_item *item, int rc)
        struct lookup_intent *it = &item->mop_it;
        struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = lli->lli_sai;
        struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
        struct work_struct *work = &item->mop_work;
+       struct ll_statahead_info *sai;
        struct mdt_body *body;
        struct inode *child;
        __u64 handle = 0;
@@ -780,8 +854,9 @@ static int ll_statahead_interpret(struct md_op_item *item, int rc)
         * because statahead thread will wait for all inflight RPC to finish,
         * sai should be always valid, no need to refcount
         */
-       LASSERT(sai != NULL);
        LASSERT(entry != NULL);
+       sai = entry->se_sai;
+       LASSERT(sai != NULL);
 
        CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
               entry->se_qstr.len, entry->se_qstr.name, rc);
@@ -794,8 +869,14 @@ static int ll_statahead_interpret(struct md_op_item *item, int rc)
                GOTO(out, rc = -EFAULT);
 
        child = entry->se_inode;
-       /* revalidate; unlinked and re-created with the same name */
-       if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+       /*
+        * revalidate; unlinked and re-created with the same name.
+        * exclude the case where FID is zero as it was from statahead with
+        * regularized file name pattern and had no idea for the FID of the
+        * children file.
+        */
+       if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
+                    !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
@@ -839,9 +920,9 @@ out:
        RETURN(rc);
 }
 
-static inline int sa_getattr(struct inode *dir, struct md_op_item *item)
+static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir,
+                            struct md_op_item *item)
 {
-       struct ll_statahead_info *sai = ll_i2info(dir)->lli_sai;
        int rc;
 
        if (sa_has_batch_handle(sai))
@@ -864,7 +945,7 @@ static int sa_lookup(struct inode *dir, struct sa_entry *entry)
        if (IS_ERR(item))
                RETURN(PTR_ERR(item));
 
-       rc = sa_getattr(dir, item);
+       rc = sa_getattr(entry->se_sai, dir, item);
        if (rc < 0)
                sa_fini_data(item);
 
@@ -909,7 +990,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
                RETURN(1);
        }
 
-       rc = sa_getattr(dir, item);
+       rc = sa_getattr(entry->se_sai, dir, item);
        if (rc < 0) {
                entry->se_inode = NULL;
                iput(inode);
@@ -920,12 +1001,10 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
 }
 
 /* async stat for file with @name */
-static void sa_statahead(struct dentry *parent, const char *name, int len,
-                        const struct lu_fid *fid)
+static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent,
+                        const char *name, int len, const struct lu_fid *fid)
 {
        struct inode *dir = parent->d_inode;
-       struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = lli->lli_sai;
        struct dentry *dentry = NULL;
        struct sa_entry *entry;
        int rc;
@@ -964,15 +1043,15 @@ static void sa_statahead(struct dentry *parent, const char *name, int len,
 /* async glimpse (agl) thread main function */
 static int ll_agl_thread(void *arg)
 {
-       struct dentry *parent = (struct dentry *)arg;
-       struct inode *dir = parent->d_inode;
-       struct ll_inode_info *plli = ll_i2info(dir);
-       struct ll_inode_info *clli;
        /*
         * We already own this reference, so it is safe to take it
         * without a lock.
         */
-       struct ll_statahead_info *sai = plli->lli_sai;
+       struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
+       struct dentry *parent = sai->sai_dentry;
+       struct inode *dir = parent->d_inode;
+       struct ll_inode_info *plli = ll_i2info(dir);
+       struct ll_inode_info *clli;
 
        ENTRY;
 
@@ -1047,7 +1126,7 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
               sai, parent);
 
        plli = ll_i2info(parent->d_inode);
-       task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d",
+       task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
                                      plli->lli_opendir_pid);
        if (IS_ERR(task)) {
                CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
@@ -1056,26 +1135,23 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
        sai->sai_agl_task = task;
        atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
        /* Get an extra reference that the thread holds */
-       ll_sai_get(d_inode(parent));
+       __ll_sai_get(sai);
 
        wake_up_process(task);
 
        EXIT;
 }
 
-/* statahead thread main function */
-static int ll_statahead_thread(void *arg)
+static int ll_statahead_by_list(struct dentry *parent)
 {
-       struct dentry *parent = (struct dentry *)arg;
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
-       int first = 0;
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct md_op_data *op_data;
        struct page *page = NULL;
-       struct lu_batch *bh = NULL;
        __u64 pos = 0;
+       int first = 0;
        int rc = 0;
 
        ENTRY;
@@ -1083,18 +1159,9 @@ static int ll_statahead_thread(void *arg)
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
               sai, parent);
 
-       sai->sai_max_batch_count = sbi->ll_sa_batch_max;
-       if (sai->sai_max_batch_count) {
-               bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
-                                    sai->sai_max_batch_count);
-               if (IS_ERR(bh))
-                       GOTO(out_stop_agl, rc = PTR_ERR(bh));
-       }
-
-       sai->sai_bh = bh;
        OBD_ALLOC_PTR(op_data);
        if (!op_data)
-               GOTO(out, rc = -ENOMEM);
+               RETURN(-ENOMEM);
 
        /* matches smp_store_release() in ll_deauthorize_statahead() */
        while (pos != MDS_DIR_END_OFF && smp_load_acquire(&sai->sai_task)) {
@@ -1222,7 +1289,7 @@ static int ll_statahead_thread(void *arg)
                                namelen = lltr.len;
                        }
 
-                       sa_statahead(parent, name, namelen, &fid);
+                       sa_statahead(sai, parent, name, namelen, &fid);
                        llcrypt_fname_free_buffer(&lltr);
                }
 
@@ -1243,6 +1310,117 @@ static int ll_statahead_thread(void *arg)
        }
        ll_finish_md_op_data(op_data);
 
+       RETURN(rc);
+}
+
+static int ll_statahead_by_fname(struct ll_statahead_info *sai,
+                                struct dentry *parent)
+{
+       struct inode *dir = parent->d_inode;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
+       size_t max_len;
+       size_t len;
+       char *fname;
+       char *ptr;
+       int rc = 0;
+       __u64 i = 0;
+
+       ENTRY;
+
+       CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n",
+              sbi->ll_fsname, parent, sai->sai_fname);
+
+       OBD_ALLOC(fname, NAME_MAX);
+       if (fname == NULL)
+               RETURN(-ENOMEM);
+
+       len = strlen(sai->sai_fname);
+       memcpy(fname, sai->sai_fname, len);
+       max_len = sizeof(sai->sai_fname) - len;
+       ptr = fname + len;
+
+       /* matches smp_store_release() in ll_deauthorize_statahead() */
+       while (smp_load_acquire(&sai->sai_task)) {
+               size_t numlen;
+
+               numlen = snprintf(ptr, max_len, "%llu",
+                                 sai->sai_fstart + i);
+
+               while (({set_current_state(TASK_IDLE);
+                        /*
+                         * matches smp_store_release() in
+                         * ll_deauthorize_statahead()
+                         */
+                        smp_load_acquire(&sai->sai_task); })) {
+                       spin_lock(&lli->lli_agl_lock);
+                       while (sa_sent_full(sai) && !agl_list_empty(sai)) {
+                               struct ll_inode_info *clli;
+
+                               __set_current_state(TASK_RUNNING);
+                               clli = agl_first_entry(sai);
+                               list_del_init(&clli->lli_agl_list);
+                               spin_unlock(&lli->lli_agl_lock);
+
+                               ll_agl_trigger(&clli->lli_vfs_inode, sai);
+                               cond_resched();
+                               spin_lock(&lli->lli_agl_lock);
+                       }
+                       spin_unlock(&lli->lli_agl_lock);
+
+                       if (!sa_sent_full(sai))
+                               break;
+                       schedule();
+               }
+               __set_current_state(TASK_RUNNING);
+
+               sa_statahead(sai, parent, fname, len + numlen, NULL);
+               if (++i >= sai->sai_fend)
+                       break;
+       }
+
+       OBD_FREE(fname, NAME_MAX);
+       RETURN(rc);
+}
+
+/* statahead thread main function */
+static int ll_statahead_thread(void *arg)
+{
+       struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
+       struct dentry *parent = sai->sai_dentry;
+       struct inode *dir = parent->d_inode;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
+       struct lu_batch *bh = NULL;
+       int rc = 0;
+
+       ENTRY;
+
+       CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
+              sai, parent);
+
+       sai->sai_max_batch_count = sbi->ll_sa_batch_max;
+       if (sai->sai_max_batch_count) {
+               bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
+                                    sai->sai_max_batch_count);
+               if (IS_ERR(bh))
+                       GOTO(out_stop_agl, rc = PTR_ERR(bh));
+       }
+
+       sai->sai_bh = bh;
+
+       switch (lli->lli_sa_pattern) {
+       case LSA_PATTERN_LIST:
+               rc = ll_statahead_by_list(parent);
+               break;
+       case LSA_PATTERN_FNAME:
+               rc = ll_statahead_by_fname(sai, parent);
+               break;
+       default:
+               rc = -EFAULT;
+               break;
+       }
+
        if (rc < 0) {
                spin_lock(&lli->lli_sa_lock);
                sai->sai_task = NULL;
@@ -1264,7 +1442,7 @@ static int ll_statahead_thread(void *arg)
        __set_current_state(TASK_RUNNING);
 
        EXIT;
-out:
+
        if (bh) {
                rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
                sai->sai_bh = NULL;
@@ -1281,8 +1459,8 @@ out_stop_agl:
                /* in case we're not woken up, timeout wait */
                msleep(125);
 
-       CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
-              sbi->ll_fsname, sai, parent);
+       CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n",
+              sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss);
 
        spin_lock(&lli->lli_sa_lock);
        sai->sai_task = NULL;
@@ -1293,6 +1471,7 @@ out_stop_agl:
        atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
 
        ll_sai_put(sai);
+       ll_sax_put(dir, lli->lli_sax);
 
        return rc;
 }
@@ -1308,7 +1487,6 @@ void ll_authorize_statahead(struct inode *dir, void *key)
                 * if lli_sai is not NULL, it means previous statahead is not
                 * finished yet, we'd better not start a new statahead for now.
                 */
-               LASSERT(lli->lli_opendir_pid == 0);
                lli->lli_opendir_key = key;
                lli->lli_opendir_pid = current->pid;
                lli->lli_sa_enabled = 1;
@@ -1316,6 +1494,29 @@ void ll_authorize_statahead(struct inode *dir, void *key)
        spin_unlock(&lli->lli_sa_lock);
 }
 
+static void ll_deauthorize_statahead_fname(struct inode *dir, void *key)
+{
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_file_data *fd = (struct ll_file_data *)key;
+       struct ll_statahead_info *sai = fd->fd_sai;
+
+       if (sai == NULL)
+               return;
+
+       spin_lock(&lli->lli_sa_lock);
+       if (sai->sai_task) {
+               struct task_struct *task = sai->sai_task;
+
+               sai->sai_task = NULL;
+               wake_up_process(task);
+       }
+       fd->fd_sai = NULL;
+       spin_unlock(&lli->lli_sa_lock);
+       ll_sai_put(sai);
+       LASSERT(lli->lli_sax != NULL);
+       ll_sax_put(dir, lli->lli_sax);
+}
+
 /*
  * deauthorize opened dir handle @key to statahead, and notify statahead thread
  * to quit if it's running.
@@ -1325,12 +1526,17 @@ void ll_deauthorize_statahead(struct inode *dir, void *key)
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai;
 
-       LASSERT(lli->lli_opendir_key == key);
        LASSERT(lli->lli_opendir_pid != 0);
 
        CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
               PFID(&lli->lli_fid));
 
+       if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
+               ll_deauthorize_statahead_fname(dir, key);
+               return;
+       }
+
+       LASSERT(lli->lli_opendir_key == key);
        spin_lock(&lli->lli_sa_lock);
        lli->lli_opendir_key = NULL;
        lli->lli_opendir_pid = 0;
@@ -1523,17 +1729,18 @@ out:
  * \retval             negative number upon error
  */
 static int revalidate_statahead_dentry(struct inode *dir,
-                                      struct ll_statahead_info *sai,
+                                      struct ll_statahead_context *ctx,
                                       struct dentry **dentryp,
                                       bool unplug)
 {
        struct sa_entry *entry = NULL;
        struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_info *sai = lli->lli_sai;
        int rc = 0;
 
        ENTRY;
 
-       if ((*dentryp)->d_name.name[0] == '.') {
+       if (sai && (*dentryp)->d_name.name[0] == '.') {
                if (sai->sai_ls_all ||
                    sai->sai_miss_hidden >= sai->sai_skip_hidden) {
                        /*
@@ -1563,10 +1770,16 @@ static int revalidate_statahead_dentry(struct inode *dir,
        if (unplug)
                GOTO(out, rc = 1);
 
-       entry = sa_get(sai, &(*dentryp)->d_name);
+       entry = sa_get(ctx, &(*dentryp)->d_name);
        if (!entry)
                GOTO(out, rc = -EAGAIN);
 
+       if (lli->lli_sa_pattern == LSA_PATTERN_LIST)
+               LASSERT(sai == entry->se_sai);
+       else if (lli->lli_sa_pattern == LSA_PATTERN_FNAME)
+               sai = entry->se_sai;
+
+       LASSERT(sai != NULL);
        if (!sa_ready(entry)) {
                spin_lock(&lli->lli_sa_lock);
                sai->sai_index_wait = entry->se_index;
@@ -1666,6 +1879,7 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
        int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = NULL;
+       struct ll_statahead_context *ctx = NULL;
        struct dentry *parent = dentry->d_parent;
        struct task_struct *task;
        struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
@@ -1691,6 +1905,10 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
        if (!sai)
                GOTO(out, rc = -ENOMEM);
 
+       ctx = ll_sax_alloc(dir);
+       if (!ctx)
+               GOTO(out, rc = -ENOMEM);
+
        sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
 
        /*
@@ -1700,17 +1918,20 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
         */
        spin_lock(&lli->lli_sa_lock);
        if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
-                    lli->lli_opendir_pid != current->pid)) {
+                    lli->lli_opendir_pid != current->pid ||
+                    lli->lli_sa_pattern != LSA_PATTERN_NONE)) {
                spin_unlock(&lli->lli_sa_lock);
                GOTO(out, rc = -EPERM);
        }
        lli->lli_sai = sai;
+       lli->lli_sax = ctx;
+       lli->lli_sa_pattern = LSA_PATTERN_LIST;
        spin_unlock(&lli->lli_sa_lock);
 
        CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
               current->pid, parent);
 
-       task = kthread_create_on_node(ll_statahead_thread, parent, node,
+       task = kthread_create_on_node(ll_statahead_thread, sai, node,
                                      "ll_sa_%u", lli->lli_opendir_pid);
        if (IS_ERR(task)) {
                spin_lock(&lli->lli_sa_lock);
@@ -1746,6 +1967,10 @@ out:
 
        if (sai)
                ll_sai_free(sai);
+
+       if (ctx)
+               ll_sax_free(ctx);
+
        if (first != LS_NOT_FIRST_DE)
                atomic_dec(&sbi->ll_sa_running);
 
@@ -1758,9 +1983,11 @@ out:
 static inline bool ll_statahead_started(struct inode *dir, bool agl)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_context *ctx;
        struct ll_statahead_info *sai;
 
        spin_lock(&lli->lli_sa_lock);
+       ctx = lli->lli_sax;
        sai = lli->lli_sai;
        if (sai && (sai->sai_agl_task != NULL) != agl)
                CDEBUG(D_READA,
@@ -1769,7 +1996,7 @@ static inline bool ll_statahead_started(struct inode *dir, bool agl)
                       sai->sai_agl_task != NULL, agl);
        spin_unlock(&lli->lli_sa_lock);
 
-       return !!sai;
+       return !!ctx;
 }
 
 /**
@@ -1810,15 +2037,162 @@ int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
                            bool unplug)
 {
-       struct ll_statahead_info *sai;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_context *ctx;
+       struct ll_statahead_info *sai = NULL;
        int rc = 0;
 
-       sai = ll_sai_get(dir);
-       if (sai) {
-               rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
+       spin_lock(&lli->lli_sa_lock);
+       ctx = lli->lli_sax;
+       if (ctx) {
+               sai = lli->lli_sai;
+               if (sai) {
+                       atomic_inc(&sai->sai_refcount);
+               } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) {
+                       spin_unlock(&lli->lli_sa_lock);
+                       return 0;
+               }
+               __ll_sax_get(ctx);
+       }
+       spin_unlock(&lli->lli_sa_lock);
+       if (ctx) {
+               rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
                CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
                       *dentryp, rc);
-               ll_sai_put(sai);
+               if (sai)
+                       ll_sai_put(sai);
+               ll_sax_put(dir, ctx);
        }
        return rc;
 }
+
+int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
+{
+       int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
+       struct ll_file_data *fd = file->private_data;
+       struct dentry *dentry = file_dentry(file);
+       struct inode *dir = dentry->d_inode;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
+       struct ll_statahead_info *sai = NULL;
+       struct ll_statahead_context *ctx = NULL;
+       struct task_struct *task;
+       bool agl = true;
+       int rc;
+
+       ENTRY;
+
+       if (sbi->ll_sa_max == 0)
+               RETURN(0);
+
+       if (!S_ISDIR(dir->i_mode))
+               RETURN(-EINVAL);
+
+       if (fd->fd_sai) {
+               rc = -EALREADY;
+               CWARN("%s: already set statahead hint for dir %pd: rc = %d\n",
+                     sbi->ll_fsname, dentry, rc);
+               RETURN(rc);
+       }
+
+       if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
+                                      sbi->ll_sa_running_max)) {
+               CDEBUG(D_READA,
+                      "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
+               GOTO(out, rc = -EMFILE);
+       }
+
+       sai = ll_sai_alloc(dentry);
+       if (sai == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       sai->sai_fstart = ladvise->lla_start;
+       sai->sai_fend = ladvise->lla_end;
+       sai->sai_ls_all = 0;
+       sai->sai_max = sbi->ll_sa_max;
+       strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
+
+       ctx = ll_sax_get(dir);
+       if (ctx == NULL) {
+               ctx = ll_sax_alloc(dir);
+               if (ctx == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               spin_lock(&lli->lli_sa_lock);
+               if (unlikely(lli->lli_sax)) {
+                       struct ll_statahead_context *tmp = ctx;
+
+                       if (lli->lli_sa_pattern == LSA_PATTERN_NONE ||
+                           lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
+                               lli->lli_sa_pattern = LSA_PATTERN_FNAME;
+                               ctx = lli->lli_sax;
+                               __ll_sax_get(ctx);
+                               fd->fd_sai = __ll_sai_get(sai);
+                               rc = 0;
+                       } else {
+                               rc = -EINVAL;
+                               CWARN("%s: pattern %X is not FNAME: rc = %d\n",
+                                     sbi->ll_fsname, lli->lli_sa_pattern, rc);
+                       }
+
+                       spin_unlock(&lli->lli_sa_lock);
+                       ll_sax_free(tmp);
+                       if (rc)
+                               GOTO(out, rc);
+               } else {
+                       lli->lli_sa_pattern = LSA_PATTERN_FNAME;
+                       lli->lli_sax = ctx;
+                       fd->fd_sai = __ll_sai_get(sai);
+                       spin_unlock(&lli->lli_sa_lock);
+               }
+       } else {
+               spin_lock(&lli->lli_sa_lock);
+               if (!(lli->lli_sa_pattern == LSA_PATTERN_FNAME ||
+                     lli->lli_sa_pattern == LSA_PATTERN_NONE)) {
+                       spin_unlock(&lli->lli_sa_lock);
+                       GOTO(out, rc = -EINVAL);
+               }
+
+               lli->lli_sa_pattern = LSA_PATTERN_FNAME;
+               fd->fd_sai = __ll_sai_get(sai);
+               spin_unlock(&lli->lli_sa_lock);
+       }
+
+       __ll_sax_get(ctx);
+       CDEBUG(D_READA,
+              "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n",
+              current->pid, dentry, sai, ctx);
+
+       task = kthread_create_on_node(ll_statahead_thread, sai, node,
+                                     "ll_sa_%u", current->pid);
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CERROR("%s: cannot start ll_sa thread: rc = %d\n",
+                      sbi->ll_fsname, rc);
+               GOTO(out, rc);
+       }
+
+       if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
+               ll_start_agl(dentry, sai);
+
+       atomic_inc(&sbi->ll_sa_total);
+       sai->sai_task = task;
+       wake_up_process(task);
+
+       RETURN(0);
+out:
+       if (fd->fd_sai) {
+               ll_sai_put(sai);
+               ll_sax_put(dir, ctx);
+               fd->fd_sai = NULL;
+       }
+
+       if (sai)
+               ll_sai_free(sai);
+
+       if (ctx)
+               ll_sax_free(ctx);
+
+       atomic_dec(&sbi->ll_sa_running);
+       RETURN(rc);
+}