Whamcloud - gitweb
LU-7828 statahead: set sai_index_wait with lli_sa_lock held
[fs/lustre-release.git] / lustre / llite / statahead.c
index 756ae36..384bd20 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2014, Intel Corporation.
+ * Copyright (c) 2011, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -36,6 +36,7 @@
 
 #include <linux/fs.h>
 #include <linux/sched.h>
+#include <linux/kthread.h>
 #include <linux/mm.h>
 #include <linux/highmem.h>
 #include <linux/pagemap.h>
@@ -81,6 +82,8 @@ struct sa_entry {
        struct inode           *se_inode;
        /* entry name */
        struct qstr             se_qstr;
+       /* entry fid */
+       struct lu_fid           se_fid;
 };
 
 static unsigned int sai_generation = 0;
@@ -180,7 +183,8 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
 
 /* allocate sa_entry and hash it to allow scanner process to find it */
 static struct sa_entry *
-sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
+sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len,
+        const struct lu_fid *fid)
 {
        struct ll_inode_info *lli;
        struct sa_entry *entry;
@@ -206,6 +210,7 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len)
        entry->se_qstr.hash = full_name_hash(name, len);
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;
+       entry->se_fid = *fid;
 
        lli = ll_i2info(sai->sai_dentry->d_inode);
 
@@ -312,12 +317,11 @@ __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
 {
        struct sa_entry *se;
        struct list_head *pos = &sai->sai_entries;
+       __u64 index = entry->se_index;
 
        LASSERT(!sa_ready(entry));
        LASSERT(list_empty(&entry->se_list));
 
-       entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC;
-
        list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
                if (se->se_index < entry->se_index) {
                        pos = &se->se_list;
@@ -325,8 +329,9 @@ __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
                }
        }
        list_add(&entry->se_list, pos);
+       entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC;
 
-       return (entry->se_index == sai->sai_index_wait);
+       return (index == sai->sai_index_wait);
 }
 
 /*
@@ -576,29 +581,16 @@ static void sa_instantiate(struct ll_statahead_info *sai,
         if (body == NULL)
                 GOTO(out, rc = -EFAULT);
 
-        child = entry->se_inode;
-        if (child == NULL) {
-                /*
-                 * lookup.
-                 */
-                LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
-
-                /* XXX: No fid in reply, this is probaly cross-ref case.
-                 * SA can't handle it yet. */
-               if (body->mbo_valid & OBD_MD_MDS)
-                       GOTO(out, rc = -EAGAIN);
-       } else {
-               /*
-                * revalidate.
-                */
-               /* unlinked and re-created with the same name */
+       child = entry->se_inode;
+       if (child != NULL) {
+               /* revalidate: name may have been unlinked and re-created */
                if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2,
                                        &body->mbo_fid1))) {
-                        entry->se_inode = NULL;
-                        iput(child);
-                        child = NULL;
-                }
-        }
+                       entry->se_inode = NULL;
+                       iput(child);
+                       child = NULL;
+               }
+       }
 
         it->d.lustre.it_lock_handle = entry->se_handle;
        rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
@@ -667,7 +659,7 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
        struct ll_statahead_info *sai = lli->lli_sai;
        struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
        __u64 handle = 0;
-       bool wakeup;
+       wait_queue_head_t *waitq = NULL;
        ENTRY;
 
        if (it_disposition(it, DISP_LOOKUP_NEG))
@@ -697,7 +689,8 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
 
        spin_lock(&lli->lli_sa_lock);
        if (rc != 0) {
-               wakeup = __sa_make_ready(sai, entry, rc);
+               if (__sa_make_ready(sai, entry, rc))
+                       waitq = &sai->sai_waitq;
        } else {
                entry->se_minfo = minfo;
                entry->se_req = ptlrpc_request_addref(req);
@@ -706,108 +699,80 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
                 * for readpage and other tries to enqueue lock on child
                 * with parent's lock held, for example: unlink. */
                entry->se_handle = handle;
-               wakeup = !sa_has_callback(sai);
+               if (!sa_has_callback(sai))
+                       waitq = &sai->sai_thread.t_ctl_waitq;
+
                list_add_tail(&entry->se_list, &sai->sai_interim_entries);
        }
        sai->sai_replied++;
-       if (wakeup)
-               wake_up(&sai->sai_thread.t_ctl_waitq);
+       if (waitq != NULL)
+               wake_up(waitq);
        spin_unlock(&lli->lli_sa_lock);
 
        RETURN(rc);
 }
 
 /* finish async stat RPC arguments */
-static void sa_fini_data(struct md_enqueue_info *minfo,
-                         struct ldlm_enqueue_info *einfo)
+static void sa_fini_data(struct md_enqueue_info *minfo)
 {
-        LASSERT(minfo && einfo);
         iput(minfo->mi_dir);
-        capa_put(minfo->mi_data.op_capa1);
-        capa_put(minfo->mi_data.op_capa2);
         OBD_FREE_PTR(minfo);
-        OBD_FREE_PTR(einfo);
 }
 
 /*
  * prepare arguments for async stat RPC.
- *
- * There is race condition between "capa_put" and "ll_statahead_interpret" for
- * accessing "op_data.op_capa[1,2]" as following:
- * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
- * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and
- * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid
- * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling
- * "md_intent_getattr_async".
  */
-static int sa_prep_data(struct inode *dir, struct inode *child,
-                       struct sa_entry *entry, struct md_enqueue_info **pmi,
-                       struct ldlm_enqueue_info **pei,
-                       struct obd_capa **pcapa)
+static struct md_enqueue_info *
+sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
 {
-        struct qstr              *qstr = &entry->se_qstr;
-        struct md_enqueue_info   *minfo;
-        struct ldlm_enqueue_info *einfo;
-        struct md_op_data        *op_data;
-
-        OBD_ALLOC_PTR(einfo);
-        if (einfo == NULL)
-                return -ENOMEM;
-
-        OBD_ALLOC_PTR(minfo);
-        if (minfo == NULL) {
-                OBD_FREE_PTR(einfo);
-                return -ENOMEM;
-        }
+       struct md_enqueue_info   *minfo;
+       struct ldlm_enqueue_info *einfo;
+       struct md_op_data        *op_data;
 
-        op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, qstr->name,
-                                     qstr->len, 0, LUSTRE_OPC_ANY, NULL);
-        if (IS_ERR(op_data)) {
-                OBD_FREE_PTR(einfo);
-                OBD_FREE_PTR(minfo);
-                return PTR_ERR(op_data);
-        }
+       OBD_ALLOC_PTR(minfo);
+       if (minfo == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, NULL, 0, 0,
+                                    LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data)) {
+               OBD_FREE_PTR(minfo);
+               return (struct md_enqueue_info *)op_data;
+       }
+
+       if (child == NULL)
+               op_data->op_fid2 = entry->se_fid;
 
        minfo->mi_it.it_op = IT_GETATTR;
        minfo->mi_dir = igrab(dir);
        minfo->mi_cb = ll_statahead_interpret;
        minfo->mi_cbdata = entry;
 
-        einfo->ei_type   = LDLM_IBITS;
-        einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
-        einfo->ei_cb_bl  = ll_md_blocking_ast;
-        einfo->ei_cb_cp  = ldlm_completion_ast;
-        einfo->ei_cb_gl  = NULL;
-        einfo->ei_cbdata = NULL;
+       einfo = &minfo->mi_einfo;
+       einfo->ei_type   = LDLM_IBITS;
+       einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
+       einfo->ei_cb_bl  = ll_md_blocking_ast;
+       einfo->ei_cb_cp  = ldlm_completion_ast;
+       einfo->ei_cb_gl  = NULL;
+       einfo->ei_cbdata = NULL;
 
-        *pmi = minfo;
-        *pei = einfo;
-        pcapa[0] = op_data->op_capa1;
-        pcapa[1] = op_data->op_capa2;
-
-        return 0;
+       return minfo;
 }
 
 /* async stat for file not found in dcache */
 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 {
        struct md_enqueue_info   *minfo;
-       struct ldlm_enqueue_info *einfo;
-       struct obd_capa          *capas[2];
        int                       rc;
        ENTRY;
 
-       rc = sa_prep_data(dir, NULL, entry, &minfo, &einfo, capas);
-       if (rc)
-               RETURN(rc);
+       minfo = sa_prep_data(dir, NULL, entry);
+       if (IS_ERR(minfo))
+               RETURN(PTR_ERR(minfo));
 
-       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
-       if (!rc) {
-               capa_put(capas[0]);
-               capa_put(capas[1]);
-       } else {
-               sa_fini_data(minfo, einfo);
-       }
+       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
+       if (rc < 0)
+               sa_fini_data(minfo);
 
        RETURN(rc);
 }
@@ -826,8 +791,6 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
        struct lookup_intent it = { .it_op = IT_GETATTR,
                                    .d.lustre.it_lock_handle = 0 };
        struct md_enqueue_info *minfo;
-       struct ldlm_enqueue_info *einfo;
-       struct obd_capa *capas[2];
        int rc;
        ENTRY;
 
@@ -846,28 +809,26 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
                RETURN(1);
        }
 
-       rc = sa_prep_data(dir, inode, entry, &minfo, &einfo, capas);
-       if (rc) {
+       minfo = sa_prep_data(dir, inode, entry);
+       if (IS_ERR(minfo)) {
                entry->se_inode = NULL;
                iput(inode);
-               RETURN(rc);
+               RETURN(PTR_ERR(minfo));
        }
 
-       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
-       if (!rc) {
-               capa_put(capas[0]);
-               capa_put(capas[1]);
-       } else {
+       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
+       if (rc < 0) {
                entry->se_inode = NULL;
                iput(inode);
-               sa_fini_data(minfo, einfo);
+               sa_fini_data(minfo);
        }
 
        RETURN(rc);
 }
 
 /* async stat for file with @name */
-static void sa_statahead(struct dentry *parent, const char *name, int len)
+static void sa_statahead(struct dentry *parent, const char *name, int len,
+                        const struct lu_fid *fid)
 {
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
@@ -877,7 +838,7 @@ static void sa_statahead(struct dentry *parent, const char *name, int len)
        int rc;
        ENTRY;
 
-       entry = sa_alloc(sai, sai->sai_index, name, len);
+       entry = sa_alloc(sai, sai->sai_index, name, len, fid);
        if (IS_ERR(entry))
                RETURN_EXIT;
 
@@ -1073,6 +1034,7 @@ static int ll_statahead_thread(void *arg)
                        __u64 hash;
                        int namelen;
                        char *name;
+                       struct lu_fid fid;
 
                        hash = le64_to_cpu(ent->lde_hash);
                        if (unlikely(hash < pos))
@@ -1115,6 +1077,8 @@ static int ll_statahead_thread(void *arg)
                        if (unlikely(++first == 1))
                                continue;
 
+                       fid_le_to_cpu(&fid, &ent->lde_fid);
+
                        /* wait for spare statahead window */
                        do {
                                l_wait_event(sa_thread->t_ctl_waitq,
@@ -1144,7 +1108,7 @@ static int ll_statahead_thread(void *arg)
                        } while (sa_sent_full(sai) &&
                                 thread_is_running(sa_thread));
 
-                       sa_statahead(parent, name, namelen);
+                       sa_statahead(parent, name, namelen, &fid);
                }
 
                pos = le64_to_cpu(dp->ldp_hash_end);
@@ -1435,7 +1399,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
        struct sa_entry *entry = NULL;
        struct l_wait_info lwi = { 0 };
        struct ll_dentry_data *ldd;
-       struct ll_inode_info *lli;
+       struct ll_inode_info *lli = ll_i2info(dir);
        int rc = 0;
        ENTRY;
 
@@ -1478,13 +1442,12 @@ static int revalidate_statahead_dentry(struct inode *dir,
                sa_handle_callback(sai);
 
        if (!sa_ready(entry)) {
+               spin_lock(&lli->lli_sa_lock);
                sai->sai_index_wait = entry->se_index;
+               spin_unlock(&lli->lli_sa_lock);
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
-                                       LWI_ON_SIGNAL_NOOP, NULL);
-               rc = l_wait_event(sai->sai_waitq,
-                               sa_ready(entry) ||
-                               thread_is_stopped(&sai->sai_thread),
-                               &lwi);
+                                      LWI_ON_SIGNAL_NOOP, NULL);
+               rc = l_wait_event(sai->sai_waitq, sa_ready(entry), &lwi);
                if (rc < 0) {
                        /*
                         * entry may not be ready, so it may be used by inflight
@@ -1509,8 +1472,10 @@ static int revalidate_statahead_dentry(struct inode *dir,
                                struct dentry *alias;
 
                                alias = ll_splice_alias(inode, *dentryp);
-                               if (IS_ERR(alias))
+                               if (IS_ERR(alias)) {
+                                       ll_intent_release(&it);
                                        GOTO(out, rc = PTR_ERR(alias));
+                               }
                                *dentryp = alias;
                                /* statahead prepared this inode, transfer inode
                                 * refcount from sa_entry to dentry */
@@ -1527,6 +1492,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
                                        (*dentryp)->d_name.name,
                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
                                        PFID(ll_inode2fid(inode)));
+                               ll_intent_release(&it);
                                GOTO(out, rc = -ESTALE);
                        }
 
@@ -1545,7 +1511,6 @@ out:
         * dentry_may_statahead().
         */
        ldd = ll_d2d(*dentryp);
-       lli = ll_i2info(dir);
        /* ldd can be NULL if llite lookup failed. */
        if (ldd != NULL)
                ldd->lld_sa_generation = lli->lli_sa_generation;