Whamcloud - gitweb
LU-11616 llite: replace smp_wb() with full memory barrier
[fs/lustre-release.git] / lustre / llite / statahead.c
index 5e136b1..5c7dbb5 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2014, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -82,6 +78,8 @@ struct sa_entry {
        struct inode           *se_inode;
        /* entry name */
        struct qstr             se_qstr;
+       /* entry fid */
+       struct lu_fid           se_fid;
 };
 
 static unsigned int sai_generation = 0;
@@ -181,7 +179,8 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
 
 /* allocate sa_entry and hash it to allow scanner process to find it */
 static struct sa_entry *
-sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
+sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
+        const char *name, int len, const struct lu_fid *fid)
 {
        struct ll_inode_info *lli;
        struct sa_entry *entry;
@@ -194,7 +193,7 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
        if (unlikely(entry == NULL))
                RETURN(ERR_PTR(-ENOMEM));
 
-       CDEBUG(D_READA, "alloc sa entry %.*s(%p) index "LPU64"\n",
+       CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
               len, name, entry, index);
 
        entry->se_index = index;
@@ -204,9 +203,10 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
        dname = (char *)entry + sizeof(struct sa_entry);
        memcpy(dname, name, len);
        dname[len] = 0;
-       entry->se_qstr.hash = full_name_hash(name, len);
+       entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;
+       entry->se_fid = *fid;
 
        lli = ll_i2info(sai->sai_dentry->d_inode);
 
@@ -223,7 +223,7 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
 /* free sa_entry, which should have been unhashed and not in any list */
 static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
 {
-       CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n",
+       CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
               entry->se_qstr.len, entry->se_qstr.name, entry,
               entry->se_index);
 
@@ -325,11 +325,67 @@ __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
                }
        }
        list_add(&entry->se_list, pos);
-       entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC;
+       /*
+        * LU-9210: ll_statahead_interpret must be able to see this before
+        * we wake it up
+        */
+       smp_store_mb(entry->se_state, ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
 
        return (index == sai->sai_index_wait);
 }
 
+/* finish async stat RPC arguments */
+static void sa_fini_data(struct md_enqueue_info *minfo)
+{
+       ll_unlock_md_op_lsm(&minfo->mi_data);
+       iput(minfo->mi_dir);
+       OBD_FREE_PTR(minfo);
+}
+
+static int ll_statahead_interpret(struct ptlrpc_request *req,
+                                 struct md_enqueue_info *minfo, int rc);
+
+/*
+ * prepare arguments for async stat RPC.
+ */
+static struct md_enqueue_info *
+sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
+{
+       struct md_enqueue_info   *minfo;
+       struct ldlm_enqueue_info *einfo;
+       struct md_op_data        *op_data;
+
+       OBD_ALLOC_PTR(minfo);
+       if (minfo == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
+                                    entry->se_qstr.name, entry->se_qstr.len, 0,
+                                    LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data)) {
+               OBD_FREE_PTR(minfo);
+               return (struct md_enqueue_info *)op_data;
+       }
+
+       if (child == NULL)
+               op_data->op_fid2 = entry->se_fid;
+
+       minfo->mi_it.it_op = IT_GETATTR;
+       minfo->mi_dir = igrab(dir);
+       minfo->mi_cb = ll_statahead_interpret;
+       minfo->mi_cbdata = entry;
+
+       einfo = &minfo->mi_einfo;
+       einfo->ei_type   = LDLM_IBITS;
+       einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
+       einfo->ei_cb_bl  = ll_md_blocking_ast;
+       einfo->ei_cb_cp  = ldlm_completion_ast;
+       einfo->ei_cb_gl  = NULL;
+       einfo->ei_cbdata = NULL;
+
+       return minfo;
+}
+
 /*
  * release resources used in async stat RPC, update entry state and wakeup if
  * scanner process it waiting on this entry.
@@ -346,8 +402,7 @@ sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
        if (minfo) {
                entry->se_minfo = NULL;
                ll_intent_release(&minfo->mi_it);
-               iput(minfo->mi_dir);
-               OBD_FREE_PTR(minfo);
+               sa_fini_data(minfo);
        }
 
        if (req) {
@@ -493,10 +548,11 @@ static void ll_sai_put(struct ll_statahead_info *sai)
 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
-       __u64 index = lli->lli_agl_index;
+       u64 index = lli->lli_agl_index;
+       ktime_t expire;
        int rc;
-       ENTRY;
 
+       ENTRY;
        LASSERT(list_empty(&lli->lli_agl_list));
 
         /* AGL maybe fall behind statahead with one entry */
@@ -506,6 +562,18 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
                 RETURN_EXIT;
         }
 
+       /* In case of restore, the MDT has the right size and has already
+        * sent it back without granting the layout lock, inode is up-to-date.
+        * Then AGL (async glimpse lock) is useless.
+        * Also to glimpse we need the layout, in case of a running restore
+        * the MDT holds the layout lock so the glimpse will block up to the
+        * end of restore (statahead/agl will block) */
+       if (ll_file_test_flag(lli, LLIF_FILE_RESTORING)) {
+               lli->lli_agl_index = 0;
+               iput(inode);
+               RETURN_EXIT;
+       }
+
         /* Someone is in glimpse (sync or async), do nothing. */
        rc = down_write_trylock(&lli->lli_glimpse_sem);
         if (rc == 0) {
@@ -527,8 +595,9 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
          *    relative rare. AGL can ignore such case, and it will not muchly
          *    affect the performance.
          */
-        if (lli->lli_glimpse_time != 0 &&
-            cfs_time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) {
+       expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
+       if (ktime_to_ns(lli->lli_glimpse_time) &&
+           ktime_before(expire, lli->lli_glimpse_time)) {
                up_write(&lli->lli_glimpse_sem);
                 lli->lli_agl_index = 0;
                 iput(inode);
@@ -536,15 +605,15 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
         }
 
         CDEBUG(D_READA, "Handling (init) async glimpse: inode = "
-               DFID", idx = "LPU64"\n", PFID(&lli->lli_fid), index);
+              DFID", idx = %llu\n", PFID(&lli->lli_fid), index);
 
         cl_agl(inode);
         lli->lli_agl_index = 0;
-        lli->lli_glimpse_time = cfs_time_current();
+       lli->lli_glimpse_time = ktime_get();
        up_write(&lli->lli_glimpse_sem);
 
         CDEBUG(D_READA, "Handled (init) async glimpse: inode= "
-               DFID", idx = "LPU64", rc = %d\n",
+              DFID", idx = %llu, rc = %d\n",
                PFID(&lli->lli_fid), index, rc);
 
         iput(inode);
@@ -568,60 +637,47 @@ static void sa_instantiate(struct ll_statahead_info *sai,
        int rc = 0;
        ENTRY;
 
-        LASSERT(entry->se_handle != 0);
-
-        minfo = entry->se_minfo;
-        it = &minfo->mi_it;
-        req = entry->se_req;
-        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-        if (body == NULL)
-                GOTO(out, rc = -EFAULT);
-
-        child = entry->se_inode;
-        if (child == NULL) {
-                /*
-                 * lookup.
-                 */
-                LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
-
-                /* XXX: No fid in reply, this is probaly cross-ref case.
-                 * SA can't handle it yet. */
-               if (body->mbo_valid & OBD_MD_MDS)
-                       GOTO(out, rc = -EAGAIN);
-       } else {
-               /*
-                * revalidate.
-                */
-               /* unlinked and re-created with the same name */
+       LASSERT(entry->se_handle != 0);
+
+       minfo = entry->se_minfo;
+       it = &minfo->mi_it;
+       req = entry->se_req;
+       body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+       if (body == NULL)
+               GOTO(out, rc = -EFAULT);
+
+       child = entry->se_inode;
+       if (child != NULL) {
+               /* revalidate; unlinked and re-created with the same name */
                if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2,
                                        &body->mbo_fid1))) {
-                        entry->se_inode = NULL;
-                        iput(child);
-                        child = NULL;
-                }
-        }
+                       entry->se_inode = NULL;
+                       iput(child);
+                       child = NULL;
+               }
+       }
 
-        it->d.lustre.it_lock_handle = entry->se_handle;
+       it->it_lock_handle = entry->se_handle;
        rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
-        if (rc != 1)
-                GOTO(out, rc = -EAGAIN);
+       if (rc != 1)
+               GOTO(out, rc = -EAGAIN);
 
-        rc = ll_prep_inode(&child, req, dir->i_sb, it);
-        if (rc)
-                GOTO(out, rc);
+       rc = ll_prep_inode(&child, req, dir->i_sb, it);
+       if (rc)
+               GOTO(out, rc);
 
        CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
               ll_get_fsname(child->i_sb, NULL, 0),
               entry->se_qstr.len, entry->se_qstr.name,
               PFID(ll_inode2fid(child)), child);
-        ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
+       ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
 
-        entry->se_inode = child;
+       entry->se_inode = child;
 
-        if (agl_should_run(sai, child))
-                ll_agl_add(sai, child, entry->se_index);
+       if (agl_should_run(sai, child))
+               ll_agl_add(sai, child, entry->se_index);
 
-        EXIT;
+       EXIT;
 
 out:
        /* sa_make_ready() will drop ldlm ibits lock refcount by calling
@@ -668,7 +724,7 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
        struct ll_statahead_info *sai = lli->lli_sai;
        struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
        __u64 handle = 0;
-       bool wakeup;
+       wait_queue_head_t *waitq = NULL;
        ENTRY;
 
        if (it_disposition(it, DISP_LOOKUP_NEG))
@@ -685,20 +741,21 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
 
        if (rc != 0) {
                ll_intent_release(it);
-               iput(dir);
-               OBD_FREE_PTR(minfo);
+               sa_fini_data(minfo);
        } else {
                /* release ibits lock ASAP to avoid deadlock when statahead
                 * thread enqueues lock on parent in readdir and another
                 * process enqueues lock on child with parent lock held, eg.
                 * unlink. */
-               handle = it->d.lustre.it_lock_handle;
+               handle = it->it_lock_handle;
                ll_intent_drop_lock(it);
+               ll_unlock_md_op_lsm(&minfo->mi_data);
        }
 
        spin_lock(&lli->lli_sa_lock);
        if (rc != 0) {
-               wakeup = __sa_make_ready(sai, entry, rc);
+               if (__sa_make_ready(sai, entry, rc))
+                       waitq = &sai->sai_waitq;
        } else {
                entry->se_minfo = minfo;
                entry->se_req = ptlrpc_request_addref(req);
@@ -707,90 +764,34 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
                 * for readpage and other tries to enqueue lock on child
                 * with parent's lock held, for example: unlink. */
                entry->se_handle = handle;
-               wakeup = !sa_has_callback(sai);
+               if (!sa_has_callback(sai))
+                       waitq = &sai->sai_thread.t_ctl_waitq;
+
                list_add_tail(&entry->se_list, &sai->sai_interim_entries);
        }
        sai->sai_replied++;
-       if (wakeup)
-               wake_up(&sai->sai_thread.t_ctl_waitq);
+
+       if (waitq != NULL)
+               wake_up(waitq);
        spin_unlock(&lli->lli_sa_lock);
 
        RETURN(rc);
 }
 
-/* finish async stat RPC arguments */
-static void sa_fini_data(struct md_enqueue_info *minfo,
-                         struct ldlm_enqueue_info *einfo)
-{
-        LASSERT(minfo && einfo);
-        iput(minfo->mi_dir);
-        OBD_FREE_PTR(minfo);
-        OBD_FREE_PTR(einfo);
-}
-
-/*
- * prepare arguments for async stat RPC.
- */
-static int sa_prep_data(struct inode *dir, struct inode *child,
-                       struct sa_entry *entry, struct md_enqueue_info **pmi,
-                       struct ldlm_enqueue_info **pei)
-{
-        struct qstr              *qstr = &entry->se_qstr;
-        struct md_enqueue_info   *minfo;
-        struct ldlm_enqueue_info *einfo;
-        struct md_op_data        *op_data;
-
-        OBD_ALLOC_PTR(einfo);
-        if (einfo == NULL)
-                return -ENOMEM;
-
-        OBD_ALLOC_PTR(minfo);
-        if (minfo == NULL) {
-                OBD_FREE_PTR(einfo);
-                return -ENOMEM;
-        }
-
-        op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, qstr->name,
-                                     qstr->len, 0, LUSTRE_OPC_ANY, NULL);
-        if (IS_ERR(op_data)) {
-                OBD_FREE_PTR(einfo);
-                OBD_FREE_PTR(minfo);
-                return PTR_ERR(op_data);
-        }
-
-       minfo->mi_it.it_op = IT_GETATTR;
-       minfo->mi_dir = igrab(dir);
-       minfo->mi_cb = ll_statahead_interpret;
-       minfo->mi_cbdata = entry;
-
-        einfo->ei_type   = LDLM_IBITS;
-        einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
-        einfo->ei_cb_bl  = ll_md_blocking_ast;
-        einfo->ei_cb_cp  = ldlm_completion_ast;
-        einfo->ei_cb_gl  = NULL;
-        einfo->ei_cbdata = NULL;
-
-        *pmi = minfo;
-        *pei = einfo;
-
-        return 0;
-}
-
 /* async stat for file not found in dcache */
 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 {
        struct md_enqueue_info   *minfo;
-       struct ldlm_enqueue_info *einfo;
        int                       rc;
        ENTRY;
 
-       rc = sa_prep_data(dir, NULL, entry, &minfo, &einfo);
-       if (rc)
-               RETURN(rc);
+       minfo = sa_prep_data(dir, NULL, entry);
+       if (IS_ERR(minfo))
+               RETURN(PTR_ERR(minfo));
 
-       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
+       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
        if (rc < 0)
-               sa_fini_data(minfo, einfo);
+               sa_fini_data(minfo);
 
        RETURN(rc);
 }
@@ -807,9 +808,8 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
 {
        struct inode *inode = dentry->d_inode;
        struct lookup_intent it = { .it_op = IT_GETATTR,
-                                   .d.lustre.it_lock_handle = 0 };
+                                   .it_lock_handle = 0 };
        struct md_enqueue_info *minfo;
-       struct ldlm_enqueue_info *einfo;
        int rc;
        ENTRY;
 
@@ -819,34 +819,33 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
        if (d_mountpoint(dentry))
                RETURN(1);
 
+       minfo = sa_prep_data(dir, inode, entry);
+       if (IS_ERR(minfo))
+               RETURN(PTR_ERR(minfo));
+
        entry->se_inode = igrab(inode);
        rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
                                NULL);
        if (rc == 1) {
-               entry->se_handle = it.d.lustre.it_lock_handle;
+               entry->se_handle = it.it_lock_handle;
                ll_intent_release(&it);
+               sa_fini_data(minfo);
                RETURN(1);
        }
 
-       rc = sa_prep_data(dir, inode, entry, &minfo, &einfo);
-       if (rc) {
-               entry->se_inode = NULL;
-               iput(inode);
-               RETURN(rc);
-       }
-
-       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
+       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
        if (rc < 0) {
                entry->se_inode = NULL;
                iput(inode);
-               sa_fini_data(minfo, einfo);
+               sa_fini_data(minfo);
        }
 
        RETURN(rc);
 }
 
 /* async stat for file with @name */
-static void sa_statahead(struct dentry *parent, const char *name, int len)
+static void sa_statahead(struct dentry *parent, const char *name, int len,
+                        const struct lu_fid *fid)
 {
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
@@ -856,7 +855,7 @@ static void sa_statahead(struct dentry *parent, const char *name, int len)
        int rc;
        ENTRY;
 
-       entry = sa_alloc(sai, sai->sai_index, name, len);
+       entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
        if (IS_ERR(entry))
                RETURN_EXIT;
 
@@ -930,6 +929,7 @@ static int ll_agl_thread(void *arg)
                        list_del_init(&clli->lli_agl_list);
                        spin_unlock(&plli->lli_agl_lock);
                        ll_agl_trigger(&clli->lli_vfs_inode, sai);
+                       cond_resched();
                } else {
                        spin_unlock(&plli->lli_agl_lock);
                }
@@ -1007,13 +1007,10 @@ static int ll_statahead_thread(void *arg)
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);
 
-       op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
-                                    LUSTRE_OPC_ANY, dir);
+       OBD_ALLOC_PTR(op_data);
        if (IS_ERR(op_data))
                GOTO(out, rc = PTR_ERR(op_data));
 
-       op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
-
        if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
                ll_start_agl(parent, sai);
 
@@ -1032,13 +1029,21 @@ static int ll_statahead_thread(void *arg)
                struct lu_dirpage *dp;
                struct lu_dirent  *ent;
 
+               op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
+                                    LUSTRE_OPC_ANY, dir);
+               if (IS_ERR(op_data)) {
+                       rc = PTR_ERR(op_data);
+                       break;
+               }
+
                sai->sai_in_readpage = 1;
                page = ll_get_dir_page(dir, op_data, pos, &chain);
+               ll_unlock_md_op_lsm(op_data);
                sai->sai_in_readpage = 0;
                if (IS_ERR(page)) {
                        rc = PTR_ERR(page);
-                       CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
-                              "/"LPU64" opendir_pid = %u: rc = %d\n",
+                       CDEBUG(D_READA, "error reading dir "DFID" at %llu"
+                              "/%llu opendir_pid = %u: rc = %d\n",
                               PFID(ll_inode2fid(dir)), pos, sai->sai_index,
                               lli->lli_opendir_pid, rc);
                        break;
@@ -1052,6 +1057,7 @@ static int ll_statahead_thread(void *arg)
                        __u64 hash;
                        int namelen;
                        char *name;
+                       struct lu_fid fid;
 
                        hash = le64_to_cpu(ent->lde_hash);
                        if (unlikely(hash < pos))
@@ -1094,6 +1100,8 @@ static int ll_statahead_thread(void *arg)
                        if (unlikely(++first == 1))
                                continue;
 
+                       fid_le_to_cpu(&fid, &ent->lde_fid);
+
                        /* wait for spare statahead window */
                        do {
                                l_wait_event(sa_thread->t_ctl_waitq,
@@ -1116,14 +1124,14 @@ static int ll_statahead_thread(void *arg)
 
                                        ll_agl_trigger(&clli->lli_vfs_inode,
                                                        sai);
-
+                                       cond_resched();
                                        spin_lock(&lli->lli_agl_lock);
                                }
                                spin_unlock(&lli->lli_agl_lock);
                        } while (sa_sent_full(sai) &&
                                 thread_is_running(sa_thread));
 
-                       sa_statahead(parent, name, namelen);
+                       sa_statahead(parent, name, namelen, &fid);
                }
 
                pos = le64_to_cpu(dp->ldp_hash_end);
@@ -1134,8 +1142,8 @@ static int ll_statahead_thread(void *arg)
                        rc = -EFAULT;
                        atomic_inc(&sbi->ll_sa_wrong);
                        CDEBUG(D_READA, "Statahead for dir "DFID" hit "
-                              "ratio too low: hit/miss "LPU64"/"LPU64
-                              ", sent/replied "LPU64"/"LPU64", stopping "
+                              "ratio too low: hit/miss %llu/%llu"
+                              ", sent/replied %llu/%llu, stopping "
                               "statahead thread: pid %d\n",
                               PFID(&lli->lli_fid), sai->sai_hit,
                               sai->sai_miss, sai->sai_sent,
@@ -1252,8 +1260,12 @@ void ll_deauthorize_statahead(struct inode *dir, void *key)
                /*
                 * statahead thread may not quit yet because it needs to cache
                 * entries, now it's time to tell it to quit.
+                *
+                * In case sai is released, wake_up() is called inside spinlock,
+                * so we have to call smp_mb() explicitly to serialize ops.
                 */
                thread_set_flags(&sai->sai_thread, SVC_STOPPING);
+               smp_mb();
                wake_up(&sai->sai_thread.t_ctl_waitq);
        }
        spin_unlock(&lli->lli_sa_lock);
@@ -1293,7 +1305,6 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
        /**
         *FIXME choose the start offset of the readdir
         */
-       op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
 
        ll_dir_chain_init(&chain);
        page = ll_get_dir_page(dir, op_data, 0, &chain);
@@ -1306,7 +1317,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
                        struct ll_inode_info *lli = ll_i2info(dir);
 
                        rc = PTR_ERR(page);
-                       CERROR("%s: reading dir "DFID" at "LPU64
+                       CERROR("%s: reading dir "DFID" at %llu "
                               "opendir_pid = %u : rc = %d\n",
                               ll_get_fsname(dir->i_sb, NULL, 0),
                               PFID(ll_inode2fid(dir)), pos,
@@ -1414,7 +1425,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
        struct sa_entry *entry = NULL;
        struct l_wait_info lwi = { 0 };
        struct ll_dentry_data *ldd;
-       struct ll_inode_info *lli;
+       struct ll_inode_info *lli = ll_i2info(dir);
        int rc = 0;
        ENTRY;
 
@@ -1457,9 +1468,11 @@ static int revalidate_statahead_dentry(struct inode *dir,
                sa_handle_callback(sai);
 
        if (!sa_ready(entry)) {
+               spin_lock(&lli->lli_sa_lock);
                sai->sai_index_wait = entry->se_index;
+               spin_unlock(&lli->lli_sa_lock);
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
-                                       LWI_ON_SIGNAL_NOOP, NULL);
+                                      LWI_ON_SIGNAL_NOOP, NULL);
                rc = l_wait_event(sai->sai_waitq, sa_ready(entry), &lwi);
                if (rc < 0) {
                        /*
@@ -1471,10 +1484,15 @@ static int revalidate_statahead_dentry(struct inode *dir,
                }
        }
 
-       if (entry->se_state == SA_ENTRY_SUCC && entry->se_inode != NULL) {
+       /*
+        * We need to see the value that was set immediately before we
+        * were woken up.
+        */
+       if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
+           entry->se_inode) {
                struct inode *inode = entry->se_inode;
                struct lookup_intent it = { .it_op = IT_GETATTR,
-                                           .d.lustre.it_lock_handle =
+                                           .it_lock_handle =
                                                entry->se_handle };
                __u64 bits;
 
@@ -1485,8 +1503,10 @@ static int revalidate_statahead_dentry(struct inode *dir,
                                struct dentry *alias;
 
                                alias = ll_splice_alias(inode, *dentryp);
-                               if (IS_ERR(alias))
+                               if (IS_ERR(alias)) {
+                                       ll_intent_release(&it);
                                        GOTO(out, rc = PTR_ERR(alias));
+                               }
                                *dentryp = alias;
                                /* statahead prepared this inode, transfer inode
                                 * refcount from sa_entry to dentry */
@@ -1503,6 +1523,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
                                        (*dentryp)->d_name.name,
                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
                                        PFID(ll_inode2fid(inode)));
+                               ll_intent_release(&it);
                                GOTO(out, rc = -ESTALE);
                        }
 
@@ -1521,7 +1542,6 @@ out:
         * dentry_may_statahead().
         */
        ldd = ll_d2d(*dentryp);
-       lli = ll_i2info(dir);
        /* ldd can be NULL if llite lookup failed. */
        if (ldd != NULL)
                ldd->lld_sa_generation = lli->lli_sa_generation;
@@ -1550,20 +1570,30 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
        struct ptlrpc_thread *thread;
        struct l_wait_info lwi = { 0 };
        struct task_struct *task;
-       int rc;
+       struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
+       int first = LS_FIRST_DE;
+       int rc = 0;
        ENTRY;
 
        /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
-       rc = is_first_dirent(dir, dentry);
-       if (rc == LS_NOT_FIRST_DE)
+       first = is_first_dirent(dir, dentry);
+       if (first == LS_NOT_FIRST_DE)
                /* It is not "ls -{a}l" operation, no need statahead for it. */
                GOTO(out, rc = -EFAULT);
 
+       if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
+                                      sbi->ll_sa_running_max)) {
+               CDEBUG(D_READA,
+                      "Too many concurrent statahead instances, "
+                      "avoid new statahead instance temporarily.\n");
+               GOTO(out, rc = -EMFILE);
+       }
+
        sai = ll_sai_alloc(parent);
        if (sai == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
+       sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
 
        /* if current lli_opendir_key was deauthorized, or dir re-opened by
         * another process, don't start statahead, otherwise the newly spawned
@@ -1578,8 +1608,6 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
        lli->lli_sai = sai;
        spin_unlock(&lli->lli_sa_lock);
 
-       atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running);
-
        CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n",
               current_pid(), parent->d_name.len, parent->d_name.name);
 
@@ -1587,6 +1615,9 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
                           lli->lli_opendir_pid);
        thread = &sai->sai_thread;
        if (IS_ERR(task)) {
+               spin_lock(&lli->lli_sa_lock);
+               lli->lli_sai = NULL;
+               spin_unlock(&lli->lli_sa_lock);
                rc = PTR_ERR(task);
                CERROR("can't start ll_sa thread, rc: %d\n", rc);
                GOTO(out, rc);
@@ -1607,12 +1638,14 @@ out:
        /* once we start statahead thread failed, disable statahead so that
         * subsequent stat won't waste time to try it. */
        spin_lock(&lli->lli_sa_lock);
-       lli->lli_sa_enabled = 0;
-       lli->lli_sai = NULL;
+       if (lli->lli_opendir_pid == current->pid)
+               lli->lli_sa_enabled = 0;
        spin_unlock(&lli->lli_sa_lock);
 
        if (sai != NULL)
                ll_sai_free(sai);
+       if (first != LS_NOT_FIRST_DE)
+               atomic_dec(&sbi->ll_sa_running);
 
        RETURN(rc);
 }