Whamcloud - gitweb
LU-9679 general: add missing spaces to folded strings.
[fs/lustre-release.git] / lustre / llite / statahead.c
index 22203ce..0f1dfe3 100644 (file)
@@ -325,11 +325,67 @@ __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
                }
        }
        list_add(&entry->se_list, pos);
-       entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC;
+       /*
+        * LU-9210: ll_statahead_interpet must be able to see this before
+        * we wake it up
+        */
+       smp_store_release(&entry->se_state, ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
 
        return (index == sai->sai_index_wait);
 }
 
+/* finish async stat RPC arguments */
+static void sa_fini_data(struct md_enqueue_info *minfo)
+{
+       ll_unlock_md_op_lsm(&minfo->mi_data);
+       iput(minfo->mi_dir);
+       OBD_FREE_PTR(minfo);
+}
+
+static int ll_statahead_interpret(struct ptlrpc_request *req,
+                                 struct md_enqueue_info *minfo, int rc);
+
+/*
+ * prepare arguments for async stat RPC.
+ */
+static struct md_enqueue_info *
+sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
+{
+       struct md_enqueue_info   *minfo;
+       struct ldlm_enqueue_info *einfo;
+       struct md_op_data        *op_data;
+
+       OBD_ALLOC_PTR(minfo);
+       if (minfo == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
+                                    entry->se_qstr.name, entry->se_qstr.len, 0,
+                                    LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data)) {
+               OBD_FREE_PTR(minfo);
+               return (struct md_enqueue_info *)op_data;
+       }
+
+       if (child == NULL)
+               op_data->op_fid2 = entry->se_fid;
+
+       minfo->mi_it.it_op = IT_GETATTR;
+       minfo->mi_dir = igrab(dir);
+       minfo->mi_cb = ll_statahead_interpret;
+       minfo->mi_cbdata = entry;
+
+       einfo = &minfo->mi_einfo;
+       einfo->ei_type   = LDLM_IBITS;
+       einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
+       einfo->ei_cb_bl  = ll_md_blocking_ast;
+       einfo->ei_cb_cp  = ldlm_completion_ast;
+       einfo->ei_cb_gl  = NULL;
+       einfo->ei_cbdata = NULL;
+
+       return minfo;
+}
+
 /*
  * release resources used in async stat RPC, update entry state and wakeup if
  * scanner process it waiting on this entry.
@@ -346,8 +402,7 @@ sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
        if (minfo) {
                entry->se_minfo = NULL;
                ll_intent_release(&minfo->mi_it);
-               iput(minfo->mi_dir);
-               OBD_FREE_PTR(minfo);
+               sa_fini_data(minfo);
        }
 
        if (req) {
@@ -582,14 +637,14 @@ static void sa_instantiate(struct ll_statahead_info *sai,
        int rc = 0;
        ENTRY;
 
-        LASSERT(entry->se_handle != 0);
+       LASSERT(entry->se_handle != 0);
 
-        minfo = entry->se_minfo;
-        it = &minfo->mi_it;
-        req = entry->se_req;
-        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-        if (body == NULL)
-                GOTO(out, rc = -EFAULT);
+       minfo = entry->se_minfo;
+       it = &minfo->mi_it;
+       req = entry->se_req;
+       body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+       if (body == NULL)
+               GOTO(out, rc = -EFAULT);
 
        child = entry->se_inode;
        if (child != NULL) {
@@ -604,25 +659,24 @@ static void sa_instantiate(struct ll_statahead_info *sai,
 
        it->it_lock_handle = entry->se_handle;
        rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
-        if (rc != 1)
-                GOTO(out, rc = -EAGAIN);
+       if (rc != 1)
+               GOTO(out, rc = -EAGAIN);
 
-        rc = ll_prep_inode(&child, req, dir->i_sb, it);
-        if (rc)
-                GOTO(out, rc);
+       rc = ll_prep_inode(&child, req, dir->i_sb, it);
+       if (rc)
+               GOTO(out, rc);
 
        CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
-              ll_get_fsname(child->i_sb, NULL, 0),
-              entry->se_qstr.len, entry->se_qstr.name,
-              PFID(ll_inode2fid(child)), child);
-        ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
+              ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
+              entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
+       ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
 
-        entry->se_inode = child;
+       entry->se_inode = child;
 
-        if (agl_should_run(sai, child))
-                ll_agl_add(sai, child, entry->se_index);
+       if (agl_should_run(sai, child))
+               ll_agl_add(sai, child, entry->se_index);
 
-        EXIT;
+       EXIT;
 
 out:
        /* sa_make_ready() will drop ldlm ibits lock refcount by calling
@@ -638,21 +692,19 @@ static void sa_handle_callback(struct ll_statahead_info *sai)
 
        lli = ll_i2info(sai->sai_dentry->d_inode);
 
+       spin_lock(&lli->lli_sa_lock);
        while (sa_has_callback(sai)) {
                struct sa_entry *entry;
 
-               spin_lock(&lli->lli_sa_lock);
-               if (unlikely(!sa_has_callback(sai))) {
-                       spin_unlock(&lli->lli_sa_lock);
-                       break;
-               }
                entry = list_entry(sai->sai_interim_entries.next,
                                   struct sa_entry, se_list);
                list_del_init(&entry->se_list);
                spin_unlock(&lli->lli_sa_lock);
 
                sa_instantiate(sai, entry);
+               spin_lock(&lli->lli_sa_lock);
        }
+       spin_unlock(&lli->lli_sa_lock);
 }
 
 /*
@@ -686,8 +738,7 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
 
        if (rc != 0) {
                ll_intent_release(it);
-               iput(dir);
-               OBD_FREE_PTR(minfo);
+               sa_fini_data(minfo);
        } else {
                /* release ibits lock ASAP to avoid deadlock when statahead
                 * thread enqueues lock on parent in readdir and another
@@ -695,6 +746,7 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
                 * unlink. */
                handle = it->it_lock_handle;
                ll_intent_drop_lock(it);
+               ll_unlock_md_op_lsm(&minfo->mi_data);
        }
 
        spin_lock(&lli->lli_sa_lock);
@@ -716,7 +768,6 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
        }
        sai->sai_replied++;
 
-       smp_mb();
        if (waitq != NULL)
                wake_up(waitq);
        spin_unlock(&lli->lli_sa_lock);
@@ -724,54 +775,6 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
        RETURN(rc);
 }
 
-/* finish async stat RPC arguments */
-static void sa_fini_data(struct md_enqueue_info *minfo)
-{
-        iput(minfo->mi_dir);
-        OBD_FREE_PTR(minfo);
-}
-
-/*
- * prepare arguments for async stat RPC.
- */
-static struct md_enqueue_info *
-sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
-{
-       struct md_enqueue_info   *minfo;
-       struct ldlm_enqueue_info *einfo;
-       struct md_op_data        *op_data;
-
-       OBD_ALLOC_PTR(minfo);
-       if (minfo == NULL)
-               return ERR_PTR(-ENOMEM);
-
-       op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
-                                    entry->se_qstr.name, entry->se_qstr.len, 0,
-                                    LUSTRE_OPC_ANY, NULL);
-       if (IS_ERR(op_data)) {
-               OBD_FREE_PTR(minfo);
-               return (struct md_enqueue_info *)op_data;
-       }
-
-       if (child == NULL)
-               op_data->op_fid2 = entry->se_fid;
-
-       minfo->mi_it.it_op = IT_GETATTR;
-       minfo->mi_dir = igrab(dir);
-       minfo->mi_cb = ll_statahead_interpret;
-       minfo->mi_cbdata = entry;
-
-       einfo = &minfo->mi_einfo;
-       einfo->ei_type   = LDLM_IBITS;
-       einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
-       einfo->ei_cb_bl  = ll_md_blocking_ast;
-       einfo->ei_cb_cp  = ldlm_completion_ast;
-       einfo->ei_cb_gl  = NULL;
-       einfo->ei_cbdata = NULL;
-
-       return minfo;
-}
-
 /* async stat for file not found in dcache */
 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 {
@@ -813,22 +816,20 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
        if (d_mountpoint(dentry))
                RETURN(1);
 
+       minfo = sa_prep_data(dir, inode, entry);
+       if (IS_ERR(minfo))
+               RETURN(PTR_ERR(minfo));
+
        entry->se_inode = igrab(inode);
        rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
                                NULL);
        if (rc == 1) {
                entry->se_handle = it.it_lock_handle;
                ll_intent_release(&it);
+               sa_fini_data(minfo);
                RETURN(1);
        }
 
-       minfo = sa_prep_data(dir, inode, entry);
-       if (IS_ERR(minfo)) {
-               entry->se_inode = NULL;
-               iput(inode);
-               RETURN(PTR_ERR(minfo));
-       }
-
        rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
        if (rc < 0) {
                entry->se_inode = NULL;
@@ -887,10 +888,8 @@ static int ll_agl_thread(void *arg)
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct ll_statahead_info *sai;
        struct ptlrpc_thread *thread;
-       struct l_wait_info lwi = { 0 };
        ENTRY;
 
-
        sai = ll_sai_get(dir);
        thread = &sai->sai_agl_thread;
        thread->t_pid = current_pid();
@@ -908,14 +907,12 @@ static int ll_agl_thread(void *arg)
        spin_unlock(&plli->lli_agl_lock);
        wake_up(&thread->t_ctl_waitq);
 
-        while (1) {
-                l_wait_event(thread->t_ctl_waitq,
-                             !agl_list_empty(sai) ||
-                             !thread_is_running(thread),
-                             &lwi);
-
-                if (!thread_is_running(thread))
-                        break;
+       while (1) {
+               wait_event_idle(thread->t_ctl_waitq,
+                               !agl_list_empty(sai) ||
+                               !thread_is_running(thread));
+               if (!thread_is_running(thread))
+                       break;
 
                spin_lock(&plli->lli_agl_lock);
                /* The statahead thread maybe help to process AGL entries,
@@ -954,7 +951,6 @@ static int ll_agl_thread(void *arg)
 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
 {
        struct ptlrpc_thread *thread = &sai->sai_agl_thread;
-       struct l_wait_info    lwi    = { 0 };
        struct ll_inode_info  *plli;
        struct task_struct            *task;
        ENTRY;
@@ -971,9 +967,8 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
                RETURN_EXIT;
        }
 
-       l_wait_event(thread->t_ctl_waitq,
-                    thread_is_running(thread) || thread_is_stopped(thread),
-                    &lwi);
+       wait_event_idle(thread->t_ctl_waitq,
+                       thread_is_running(thread) || thread_is_stopped(thread));
        EXIT;
 }
 
@@ -990,7 +985,6 @@ static int ll_statahead_thread(void *arg)
        int first = 0;
        struct md_op_data *op_data;
        struct ll_dir_chain chain;
-       struct l_wait_info lwi = { 0 };
        struct page *page = NULL;
        __u64 pos = 0;
        int rc = 0;
@@ -1003,10 +997,9 @@ static int ll_statahead_thread(void *arg)
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);
 
-       op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
-                                    LUSTRE_OPC_ANY, dir);
-       if (IS_ERR(op_data))
-               GOTO(out, rc = PTR_ERR(op_data));
+       OBD_ALLOC_PTR(op_data);
+       if (!op_data)
+               GOTO(out, rc = -ENOMEM);
 
        if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
                ll_start_agl(parent, sai);
@@ -1026,8 +1019,16 @@ static int ll_statahead_thread(void *arg)
                struct lu_dirpage *dp;
                struct lu_dirent  *ent;
 
+               op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
+                                    LUSTRE_OPC_ANY, dir);
+               if (IS_ERR(op_data)) {
+                       rc = PTR_ERR(op_data);
+                       break;
+               }
+
                sai->sai_in_readpage = 1;
                page = ll_get_dir_page(dir, op_data, pos, &chain);
+               ll_unlock_md_op_lsm(op_data);
                sai->sai_in_readpage = 0;
                if (IS_ERR(page)) {
                        rc = PTR_ERR(page);
@@ -1093,12 +1094,11 @@ static int ll_statahead_thread(void *arg)
 
                        /* wait for spare statahead window */
                        do {
-                               l_wait_event(sa_thread->t_ctl_waitq,
-                                            !sa_sent_full(sai) ||
-                                            sa_has_callback(sai) ||
-                                            !agl_list_empty(sai) ||
-                                            !thread_is_running(sa_thread),
-                                            &lwi);
+                               wait_event_idle(sa_thread->t_ctl_waitq,
+                                               !sa_sent_full(sai) ||
+                                               sa_has_callback(sai) ||
+                                               !agl_list_empty(sai) ||
+                                               !thread_is_running(sa_thread));
 
                                sa_handle_callback(sai);
 
@@ -1153,10 +1153,9 @@ static int ll_statahead_thread(void *arg)
        /* statahead is finished, but statahead entries need to be cached, wait
         * for file release to stop me. */
        while (thread_is_running(sa_thread)) {
-               l_wait_event(sa_thread->t_ctl_waitq,
-                            sa_has_callback(sai) ||
-                            !thread_is_running(sa_thread),
-                            &lwi);
+               wait_event_idle(sa_thread->t_ctl_waitq,
+                               sa_has_callback(sai) ||
+                               !thread_is_running(sa_thread));
 
                sa_handle_callback(sai);
        }
@@ -1171,9 +1170,8 @@ out:
 
                CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
                       sai, (unsigned int)agl_thread->t_pid);
-               l_wait_event(agl_thread->t_ctl_waitq,
-                            thread_is_stopped(agl_thread),
-                            &lwi);
+               wait_event_idle(agl_thread->t_ctl_waitq,
+                               thread_is_stopped(agl_thread));
        } else {
                /* Set agl_thread flags anyway. */
                thread_set_flags(agl_thread, SVC_STOPPED);
@@ -1183,10 +1181,9 @@ out:
         * safely because statahead RPC will access sai data */
        while (sai->sai_sent != sai->sai_replied) {
                /* in case we're not woken up, timeout wait */
-               lwi = LWI_TIMEOUT(msecs_to_jiffies(MSEC_PER_SEC >> 3),
-                                 NULL, NULL);
-               l_wait_event(sa_thread->t_ctl_waitq,
-                       sai->sai_sent == sai->sai_replied, &lwi);
+               wait_event_idle_timeout(sa_thread->t_ctl_waitq,
+                                       sai->sai_sent == sai->sai_replied,
+                                       cfs_time_seconds(1) >> 3);
        }
 
        /* release resources held by statahead RPCs */
@@ -1306,9 +1303,8 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
                        struct ll_inode_info *lli = ll_i2info(dir);
 
                        rc = PTR_ERR(page);
-                       CERROR("%s: reading dir "DFID" at %llu"
-                              "opendir_pid = %u : rc = %d\n",
-                              ll_get_fsname(dir->i_sb, NULL, 0),
+                       CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
+                              ll_i2sbi(dir)->ll_fsname,
                               PFID(ll_inode2fid(dir)), pos,
                               lli->lli_opendir_pid, rc);
                        break;
@@ -1412,7 +1408,6 @@ static int revalidate_statahead_dentry(struct inode *dir,
                                        bool unplug)
 {
        struct sa_entry *entry = NULL;
-       struct l_wait_info lwi = { 0 };
        struct ll_dentry_data *ldd;
        struct ll_inode_info *lli = ll_i2info(dir);
        int rc = 0;
@@ -1460,10 +1455,9 @@ static int revalidate_statahead_dentry(struct inode *dir,
                spin_lock(&lli->lli_sa_lock);
                sai->sai_index_wait = entry->se_index;
                spin_unlock(&lli->lli_sa_lock);
-               lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
-                                      LWI_ON_SIGNAL_NOOP, NULL);
-               rc = l_wait_event(sai->sai_waitq, sa_ready(entry), &lwi);
-               if (rc < 0) {
+               rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
+                                            cfs_time_seconds(30));
+               if (rc == 0) {
                        /*
                         * entry may not be ready, so it may be used by inflight
                         * statahead RPC, don't free it.
@@ -1473,7 +1467,12 @@ static int revalidate_statahead_dentry(struct inode *dir,
                }
        }
 
-       if (entry->se_state == SA_ENTRY_SUCC && entry->se_inode != NULL) {
+       /*
+        * We need to see the value that was set immediately before we
+        * were woken up.
+        */
+       if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
+           entry->se_inode) {
                struct inode *inode = entry->se_inode;
                struct lookup_intent it = { .it_op = IT_GETATTR,
                                            .it_lock_handle =
@@ -1501,8 +1500,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
                                        "%s: stale dentry %.*s inode "
                                        DFID", statahead inode "DFID
                                        "\n",
-                                       ll_get_fsname((*dentryp)->d_inode->i_sb,
-                                                     NULL, 0),
+                                       ll_i2sbi(inode)->ll_fsname,
                                        (*dentryp)->d_name.len,
                                        (*dentryp)->d_name.name,
                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
@@ -1552,7 +1550,6 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
        struct ll_statahead_info *sai = NULL;
        struct dentry *parent = dentry->d_parent;
        struct ptlrpc_thread *thread;
-       struct l_wait_info lwi = { 0 };
        struct task_struct *task;
        struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
        int first = LS_FIRST_DE;
@@ -1607,9 +1604,8 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
                GOTO(out, rc);
        }
 
-       l_wait_event(thread->t_ctl_waitq,
-                    thread_is_running(thread) || thread_is_stopped(thread),
-                    &lwi);
+       wait_event_idle(thread->t_ctl_waitq,
+                       thread_is_running(thread) || thread_is_stopped(thread));
        ll_sai_put(sai);
 
        /*