Whamcloud - gitweb
LU-12780 llite: avoid ptlrpc_thread for ll_statahead_thread 59/36259/6
authorMr NeilBrown <neilb@suse.de>
Wed, 23 Oct 2019 00:30:49 +0000 (11:30 +1100)
committerOleg Drokin <green@whamcloud.com>
Tue, 25 Feb 2020 05:51:03 +0000 (05:51 +0000)
Instead of ptlrpc_thread use more direct interfaces.

- Instead of waiting for thread startup to complete, perform
  the startup before starting the thread.
- as nothing waits for the thread to finish we cannot use
  kthread_should_stop().  Instead, set the task pointer
  sai_task to NULL when the thread is finishing up.
- As we don't use kthread_should_stop(), we can safely do cleanup
  in the thread, because it is sure to run.
- use wake_up_process to signal the thread that there
  is work to do.
- the wake_up that is currently at the end of sa_put() becomes
  a little more complicated and is move to after the one place
  where sa_put() is called.

Change-Id: If694dafc6864348fe5203a4935f4c128ce5ff255
Signed-off-by: Mr NeilBrown <neilb@suse.de>
Reviewed-on: https://review.whamcloud.com/36259
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Shaun Tancheff <shaun.tancheff@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/llite/llite_internal.h
lustre/llite/statahead.c

index f631f53..2ff0fd0 100644 (file)
@@ -1367,7 +1367,7 @@ struct ll_statahead_info {
                                sai_agl_valid:1,/* AGL is valid for the dir */
                                sai_in_readpage:1;/* statahead is in readdir()*/
        wait_queue_head_t       sai_waitq;      /* stat-ahead wait queue */
                                sai_agl_valid:1,/* AGL is valid for the dir */
                                sai_in_readpage:1;/* statahead is in readdir()*/
        wait_queue_head_t       sai_waitq;      /* stat-ahead wait queue */
-       struct ptlrpc_thread    sai_thread;     /* stat-ahead thread */
+       struct task_struct      *sai_task;      /* stat-ahead thread */
        struct task_struct      *sai_agl_task;  /* AGL thread */
        struct list_head        sai_interim_entries; /* entries which got async
                                                      * stat reply, but not
        struct task_struct      *sai_agl_task;  /* AGL thread */
        struct list_head        sai_interim_entries; /* entries which got async
                                                      * stat reply, but not
index f0d33d7..25b3849 100644 (file)
@@ -36,6 +36,7 @@
 #include <linux/mm.h>
 #include <linux/highmem.h>
 #include <linux/pagemap.h>
 #include <linux/mm.h>
 #include <linux/highmem.h>
 #include <linux/pagemap.h>
+#include <linux/delay.h>
 
 #define DEBUG_SUBSYSTEM S_LLITE
 
 
 #define DEBUG_SUBSYSTEM S_LLITE
 
@@ -302,8 +303,6 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
                        break;
                sa_kill(sai, tmp);
        }
                        break;
                sa_kill(sai, tmp);
        }
-
-       wake_up(&sai->sai_thread.t_ctl_waitq);
 }
 
 /* update state and sort add entry to sai_entries by index, return true if
 }
 
 /* update state and sort add entry to sai_entries by index, return true if
@@ -463,7 +462,6 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
        sai->sai_max = LL_SA_RPC_MIN;
        sai->sai_index = 1;
        init_waitqueue_head(&sai->sai_waitq);
        sai->sai_max = LL_SA_RPC_MIN;
        sai->sai_index = 1;
        init_waitqueue_head(&sai->sai_waitq);
-       init_waitqueue_head(&sai->sai_thread.t_ctl_waitq);
 
        INIT_LIST_HEAD(&sai->sai_interim_entries);
        INIT_LIST_HEAD(&sai->sai_entries);
 
        INIT_LIST_HEAD(&sai->sai_interim_entries);
        INIT_LIST_HEAD(&sai->sai_entries);
@@ -525,7 +523,7 @@ static void ll_sai_put(struct ll_statahead_info *sai)
                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);
 
                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);
 
-               LASSERT(thread_is_stopped(&sai->sai_thread));
+               LASSERT(!sai->sai_task);
                LASSERT(!sai->sai_agl_task);
                LASSERT(sai->sai_sent == sai->sai_replied);
                LASSERT(!sa_has_callback(sai));
                LASSERT(!sai->sai_agl_task);
                LASSERT(sai->sai_sent == sai->sai_replied);
                LASSERT(!sa_has_callback(sai));
@@ -719,7 +717,6 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
        struct ll_statahead_info *sai = lli->lli_sai;
        struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
        __u64 handle = 0;
        struct ll_statahead_info *sai = lli->lli_sai;
        struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
        __u64 handle = 0;
-       wait_queue_head_t *waitq = NULL;
        ENTRY;
 
        if (it_disposition(it, DISP_LOOKUP_NEG))
        ENTRY;
 
        if (it_disposition(it, DISP_LOOKUP_NEG))
@@ -728,7 +725,6 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
        /* because statahead thread will wait for all inflight RPC to finish,
         * sai should be always valid, no need to refcount */
        LASSERT(sai != NULL);
        /* because statahead thread will wait for all inflight RPC to finish,
         * sai should be always valid, no need to refcount */
        LASSERT(sai != NULL);
-       LASSERT(!thread_is_stopped(&sai->sai_thread));
        LASSERT(entry != NULL);
 
        CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
        LASSERT(entry != NULL);
 
        CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
@@ -750,8 +746,10 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
        spin_lock(&lli->lli_sa_lock);
        if (rc != 0) {
                if (__sa_make_ready(sai, entry, rc))
        spin_lock(&lli->lli_sa_lock);
        if (rc != 0) {
                if (__sa_make_ready(sai, entry, rc))
-                       waitq = &sai->sai_waitq;
+                       wake_up(&sai->sai_waitq);
        } else {
        } else {
+               int first = 0;
+
                entry->se_minfo = minfo;
                entry->se_req = ptlrpc_request_addref(req);
                /* Release the async ibits lock ASAP to avoid deadlock
                entry->se_minfo = minfo;
                entry->se_req = ptlrpc_request_addref(req);
                /* Release the async ibits lock ASAP to avoid deadlock
@@ -760,14 +758,14 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
                 * with parent's lock held, for example: unlink. */
                entry->se_handle = handle;
                if (!sa_has_callback(sai))
                 * with parent's lock held, for example: unlink. */
                entry->se_handle = handle;
                if (!sa_has_callback(sai))
-                       waitq = &sai->sai_thread.t_ctl_waitq;
+                       first = 1;
 
                list_add_tail(&entry->se_list, &sai->sai_interim_entries);
 
                list_add_tail(&entry->se_list, &sai->sai_interim_entries);
+               if (first && sai->sai_task)
+                       wake_up_process(sai->sai_task);
        }
        sai->sai_replied++;
 
        }
        sai->sai_replied++;
 
-       if (waitq != NULL)
-               wake_up(waitq);
        spin_unlock(&lli->lli_sa_lock);
 
        RETURN(rc);
        spin_unlock(&lli->lli_sa_lock);
 
        RETURN(rc);
@@ -984,8 +982,7 @@ static int ll_statahead_thread(void *arg)
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_sb_info *sbi = ll_i2sbi(dir);
-       struct ll_statahead_info *sai;
-       struct ptlrpc_thread *sa_thread;
+       struct ll_statahead_info *sai = lli->lli_sai;
        int first = 0;
        struct md_op_data *op_data;
        struct ll_dir_chain chain;
        int first = 0;
        struct md_op_data *op_data;
        struct ll_dir_chain chain;
@@ -994,9 +991,6 @@ static int ll_statahead_thread(void *arg)
        int rc = 0;
        ENTRY;
 
        int rc = 0;
        ENTRY;
 
-       sai = ll_sai_get(dir);
-       sa_thread = &sai->sai_thread;
-       sa_thread->t_pid = current_pid();
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);
 
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);
 
@@ -1004,21 +998,8 @@ static int ll_statahead_thread(void *arg)
        if (!op_data)
                GOTO(out, rc = -ENOMEM);
 
        if (!op_data)
                GOTO(out, rc = -ENOMEM);
 
-       if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
-               ll_start_agl(parent, sai);
-
-       atomic_inc(&sbi->ll_sa_total);
-       spin_lock(&lli->lli_sa_lock);
-       if (thread_is_init(sa_thread))
-               /* If someone else has changed the thread state
-                * (e.g. already changed to SVC_STOPPING), we can't just
-                * blindly overwrite that setting. */
-               thread_set_flags(sa_thread, SVC_RUNNING);
-       spin_unlock(&lli->lli_sa_lock);
-       wake_up(&sa_thread->t_ctl_waitq);
-
        ll_dir_chain_init(&chain);
        ll_dir_chain_init(&chain);
-       while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) {
+       while (pos != MDS_DIR_END_OFF && sai->sai_task) {
                struct lu_dirpage *dp;
                struct lu_dirent  *ent;
 
                struct lu_dirpage *dp;
                struct lu_dirent  *ent;
 
@@ -1044,7 +1025,7 @@ static int ll_statahead_thread(void *arg)
 
                dp = page_address(page);
                for (ent = lu_dirent_start(dp);
 
                dp = page_address(page);
                for (ent = lu_dirent_start(dp);
-                    ent != NULL && thread_is_running(sa_thread) &&
+                    ent != NULL && sai->sai_task &&
                     !sa_low_hit(sai);
                     ent = lu_dirent_next(ent)) {
                        __u64 hash;
                     !sa_low_hit(sai);
                     ent = lu_dirent_next(ent)) {
                        __u64 hash;
@@ -1095,21 +1076,19 @@ static int ll_statahead_thread(void *arg)
 
                        fid_le_to_cpu(&fid, &ent->lde_fid);
 
 
                        fid_le_to_cpu(&fid, &ent->lde_fid);
 
-                       /* wait for spare statahead window */
-                       do {
-                               wait_event_idle(sa_thread->t_ctl_waitq,
-                                               !sa_sent_full(sai) ||
-                                               sa_has_callback(sai) ||
-                                               !agl_list_empty(sai) ||
-                                               !thread_is_running(sa_thread));
-
-                               sa_handle_callback(sai);
+                       while (({set_current_state(TASK_IDLE);
+                                sai->sai_task; })) {
+                               if (sa_has_callback(sai)) {
+                                       __set_current_state(TASK_RUNNING);
+                                       sa_handle_callback(sai);
+                               }
 
                                spin_lock(&lli->lli_agl_lock);
                                while (sa_sent_full(sai) &&
                                       !agl_list_empty(sai)) {
                                        struct ll_inode_info *clli;
 
 
                                spin_lock(&lli->lli_agl_lock);
                                while (sa_sent_full(sai) &&
                                       !agl_list_empty(sai)) {
                                        struct ll_inode_info *clli;
 
+                                       __set_current_state(TASK_RUNNING);
                                        clli = agl_first_entry(sai);
                                        list_del_init(&clli->lli_agl_list);
                                        spin_unlock(&lli->lli_agl_lock);
                                        clli = agl_first_entry(sai);
                                        list_del_init(&clli->lli_agl_list);
                                        spin_unlock(&lli->lli_agl_lock);
@@ -1120,8 +1099,12 @@ static int ll_statahead_thread(void *arg)
                                        spin_lock(&lli->lli_agl_lock);
                                }
                                spin_unlock(&lli->lli_agl_lock);
                                        spin_lock(&lli->lli_agl_lock);
                                }
                                spin_unlock(&lli->lli_agl_lock);
-                       } while (sa_sent_full(sai) &&
-                                thread_is_running(sa_thread));
+
+                               if (!sa_sent_full(sai))
+                                       break;
+                               schedule();
+                       }
+                       __set_current_state(TASK_RUNNING);
 
                        sa_statahead(parent, name, namelen, &fid);
                }
 
                        sa_statahead(parent, name, namelen, &fid);
                }
@@ -1148,20 +1131,24 @@ static int ll_statahead_thread(void *arg)
 
        if (rc < 0) {
                spin_lock(&lli->lli_sa_lock);
 
        if (rc < 0) {
                spin_lock(&lli->lli_sa_lock);
-               thread_set_flags(sa_thread, SVC_STOPPING);
+               sai->sai_task = NULL;
                lli->lli_sa_enabled = 0;
                spin_unlock(&lli->lli_sa_lock);
        }
 
        /* statahead is finished, but statahead entries need to be cached, wait
                lli->lli_sa_enabled = 0;
                spin_unlock(&lli->lli_sa_lock);
        }
 
        /* statahead is finished, but statahead entries need to be cached, wait
-        * for file release to stop me. */
-       while (thread_is_running(sa_thread)) {
-               wait_event_idle(sa_thread->t_ctl_waitq,
-                               sa_has_callback(sai) ||
-                               !thread_is_running(sa_thread));
-
-               sa_handle_callback(sai);
+        * for file release to stop me.
+        */
+       while (({set_current_state(TASK_IDLE);
+                sai->sai_task; })) {
+               if (sa_has_callback(sai)) {
+                       __set_current_state(TASK_RUNNING);
+                       sa_handle_callback(sai);
+               } else {
+                       schedule();
+               }
        }
        }
+       __set_current_state(TASK_RUNNING);
 
        EXIT;
 out:
 
        EXIT;
 out:
@@ -1169,25 +1156,21 @@ out:
 
        /* wait for inflight statahead RPCs to finish, and then we can free sai
         * safely because statahead RPC will access sai data */
 
        /* wait for inflight statahead RPCs to finish, and then we can free sai
         * safely because statahead RPC will access sai data */
-       while (sai->sai_sent != sai->sai_replied) {
+       while (sai->sai_sent != sai->sai_replied)
                /* in case we're not woken up, timeout wait */
                /* in case we're not woken up, timeout wait */
-               wait_event_idle_timeout(sa_thread->t_ctl_waitq,
-                                       sai->sai_sent == sai->sai_replied,
-                                       cfs_time_seconds(1) >> 3);
-       }
+               msleep(125);
 
        /* release resources held by statahead RPCs */
        sa_handle_callback(sai);
 
 
        /* release resources held by statahead RPCs */
        sa_handle_callback(sai);
 
-       spin_lock(&lli->lli_sa_lock);
-       thread_set_flags(sa_thread, SVC_STOPPED);
-       spin_unlock(&lli->lli_sa_lock);
-
        CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);
 
        CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);
 
+       spin_lock(&lli->lli_sa_lock);
+       sai->sai_task = NULL;
+       spin_unlock(&lli->lli_sa_lock);
        wake_up(&sai->sai_waitq);
        wake_up(&sai->sai_waitq);
-       wake_up(&sa_thread->t_ctl_waitq);
+
        ll_sai_put(sai);
 
        return rc;
        ll_sai_put(sai);
 
        return rc;
@@ -1232,17 +1215,18 @@ void ll_deauthorize_statahead(struct inode *dir, void *key)
        lli->lli_opendir_pid = 0;
        lli->lli_sa_enabled = 0;
        sai = lli->lli_sai;
        lli->lli_opendir_pid = 0;
        lli->lli_sa_enabled = 0;
        sai = lli->lli_sai;
-       if (sai != NULL && thread_is_running(&sai->sai_thread)) {
+       if (sai && sai->sai_task) {
                /*
                /*
-                * statahead thread may not quit yet because it needs to cache
-                * entries, now it's time to tell it to quit.
+                * statahead thread may not have quit yet because it needs to
+                * cache entries, now it's time to tell it to quit.
                 *
                 *
-                * In case sai is released, wake_up() is called inside spinlock,
-                * so we have to call smp_mb() explicitely to serialize ops.
+                * wake_up_process() provides the necessary barriers
+                * to pair with set_current_state().
                 */
                 */
-               thread_set_flags(&sai->sai_thread, SVC_STOPPING);
-               smp_mb();
-               wake_up(&sai->sai_thread.t_ctl_waitq);
+               struct task_struct *task = sai->sai_task;
+
+               sai->sai_task = NULL;
+               wake_up_process(task);
        }
        spin_unlock(&lli->lli_sa_lock);
 }
        }
        spin_unlock(&lli->lli_sa_lock);
 }
@@ -1518,6 +1502,10 @@ out:
        if (ldd != NULL)
                ldd->lld_sa_generation = lli->lli_sa_generation;
        sa_put(sai, entry);
        if (ldd != NULL)
                ldd->lld_sa_generation = lli->lli_sa_generation;
        sa_put(sai, entry);
+       spin_lock(&lli->lli_sa_lock);
+       if (sai->sai_task)
+               wake_up_process(sai->sai_task);
+       spin_unlock(&lli->lli_sa_lock);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -1539,7 +1527,6 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = NULL;
        struct dentry *parent = dentry->d_parent;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = NULL;
        struct dentry *parent = dentry->d_parent;
-       struct ptlrpc_thread *thread;
        struct task_struct *task;
        struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
        int first = LS_FIRST_DE;
        struct task_struct *task;
        struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
        int first = LS_FIRST_DE;
@@ -1582,9 +1569,8 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
        CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n",
               current_pid(), parent->d_name.len, parent->d_name.name);
 
        CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n",
               current_pid(), parent->d_name.len, parent->d_name.name);
 
-       task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
-                          lli->lli_opendir_pid);
-       thread = &sai->sai_thread;
+       task = kthread_create(ll_statahead_thread, parent, "ll_sa_%u",
+                             lli->lli_opendir_pid);
        if (IS_ERR(task)) {
                spin_lock(&lli->lli_sa_lock);
                lli->lli_sai = NULL;
        if (IS_ERR(task)) {
                spin_lock(&lli->lli_sa_lock);
                lli->lli_sai = NULL;
@@ -1594,10 +1580,13 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
                GOTO(out, rc);
        }
 
                GOTO(out, rc);
        }
 
-       wait_event_idle(thread->t_ctl_waitq,
-                       thread_is_running(thread) || thread_is_stopped(thread));
-       ll_sai_put(sai);
+       if (ll_i2sbi(parent->d_inode)->ll_flags & LL_SBI_AGL_ENABLED)
+               ll_start_agl(parent, sai);
 
 
+       atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total);
+       sai->sai_task = task;
+
+       wake_up_process(task);
        /*
         * We don't stat-ahead for the first dirent since we are already in
         * lookup.
        /*
         * We don't stat-ahead for the first dirent since we are already in
         * lookup.