From: Mr NeilBrown Date: Wed, 23 Oct 2019 00:30:49 +0000 (+1100) Subject: LU-12780 llite: avoid ptlrpc_thread for ll_statahead_thread X-Git-Tag: 2.13.53~198 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=6bf49037b3a1345877692a6ba6d92f579a8e2442 LU-12780 llite: avoid ptlrpc_thread for ll_statahead_thread Instead of ptlrpc_thread use more direct interfaces. - Instead of waiting for thread startup to complete, perform the startup before starting the thread. - as nothing waits for the thread to finish we cannot use kthread_should_stop(). Instead, set the task pointer sai_task to NULL when the thread is finishing up. - As we don't use kthread_should_stop(), we can safely do cleanup in the thread, because it is sure to run. - use wake_up_process to signal the thread that there is work to do. - the wake_up that is currently at the end of sa_put() becomes a little more complicated and is move to after the one place where sa_put() is called. Change-Id: If694dafc6864348fe5203a4935f4c128ce5ff255 Signed-off-by: Mr NeilBrown Reviewed-on: https://review.whamcloud.com/36259 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: James Simmons Reviewed-by: Shaun Tancheff Reviewed-by: Oleg Drokin --- diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index f631f53..2ff0fd0 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -1367,7 +1367,7 @@ struct ll_statahead_info { sai_agl_valid:1,/* AGL is valid for the dir */ sai_in_readpage:1;/* statahead is in readdir()*/ wait_queue_head_t sai_waitq; /* stat-ahead wait queue */ - struct ptlrpc_thread sai_thread; /* stat-ahead thread */ + struct task_struct *sai_task; /* stat-ahead thread */ struct task_struct *sai_agl_task; /* AGL thread */ struct list_head sai_interim_entries; /* entries which got async * stat reply, but not diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index f0d33d7..25b3849 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -36,6 +36,7 @@ #include #include #include +#include #define DEBUG_SUBSYSTEM S_LLITE @@ -302,8 +303,6 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry) break; sa_kill(sai, tmp); } - - wake_up(&sai->sai_thread.t_ctl_waitq); } /* update state and sort add entry to sai_entries by index, return true if @@ -463,7 +462,6 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry) sai->sai_max = LL_SA_RPC_MIN; sai->sai_index = 1; init_waitqueue_head(&sai->sai_waitq); - init_waitqueue_head(&sai->sai_thread.t_ctl_waitq); INIT_LIST_HEAD(&sai->sai_interim_entries); INIT_LIST_HEAD(&sai->sai_entries); @@ -525,7 +523,7 @@ static void ll_sai_put(struct ll_statahead_info *sai) lli->lli_sai = NULL; spin_unlock(&lli->lli_sa_lock); - LASSERT(thread_is_stopped(&sai->sai_thread)); + LASSERT(!sai->sai_task); LASSERT(!sai->sai_agl_task); LASSERT(sai->sai_sent == sai->sai_replied); LASSERT(!sa_has_callback(sai)); @@ -719,7 +717,6 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, struct ll_statahead_info *sai = lli->lli_sai; struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata; __u64 handle = 0; - wait_queue_head_t *waitq = NULL; ENTRY; if (it_disposition(it, DISP_LOOKUP_NEG)) @@ -728,7 +725,6 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, /* because statahead thread will wait for all inflight RPC to finish, * sai should be always valid, no need to refcount */ LASSERT(sai != NULL); - LASSERT(!thread_is_stopped(&sai->sai_thread)); LASSERT(entry != NULL); CDEBUG(D_READA, "sa_entry %.*s rc %d\n", @@ -750,8 +746,10 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, spin_lock(&lli->lli_sa_lock); if (rc != 0) { if (__sa_make_ready(sai, entry, rc)) - waitq = &sai->sai_waitq; + wake_up(&sai->sai_waitq); } else { + int first = 0; + entry->se_minfo = minfo; entry->se_req = ptlrpc_request_addref(req); /* Release the async ibits lock ASAP to avoid deadlock @@ -760,14 +758,14 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, * with parent's lock held, for example: unlink. */ entry->se_handle = handle; if (!sa_has_callback(sai)) - waitq = &sai->sai_thread.t_ctl_waitq; + first = 1; list_add_tail(&entry->se_list, &sai->sai_interim_entries); + if (first && sai->sai_task) + wake_up_process(sai->sai_task); } sai->sai_replied++; - if (waitq != NULL) - wake_up(waitq); spin_unlock(&lli->lli_sa_lock); RETURN(rc); @@ -984,8 +982,7 @@ static int ll_statahead_thread(void *arg) struct inode *dir = parent->d_inode; struct ll_inode_info *lli = ll_i2info(dir); struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai; - struct ptlrpc_thread *sa_thread; + struct ll_statahead_info *sai = lli->lli_sai; int first = 0; struct md_op_data *op_data; struct ll_dir_chain chain; @@ -994,9 +991,6 @@ static int ll_statahead_thread(void *arg) int rc = 0; ENTRY; - sai = ll_sai_get(dir); - sa_thread = &sai->sai_thread; - sa_thread->t_pid = current_pid(); CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n", sai, parent->d_name.len, parent->d_name.name); @@ -1004,21 +998,8 @@ static int ll_statahead_thread(void *arg) if (!op_data) GOTO(out, rc = -ENOMEM); - if (sbi->ll_flags & LL_SBI_AGL_ENABLED) - ll_start_agl(parent, sai); - - atomic_inc(&sbi->ll_sa_total); - spin_lock(&lli->lli_sa_lock); - if (thread_is_init(sa_thread)) - /* If someone else has changed the thread state - * (e.g. already changed to SVC_STOPPING), we can't just - * blindly overwrite that setting. */ - thread_set_flags(sa_thread, SVC_RUNNING); - spin_unlock(&lli->lli_sa_lock); - wake_up(&sa_thread->t_ctl_waitq); - ll_dir_chain_init(&chain); - while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) { + while (pos != MDS_DIR_END_OFF && sai->sai_task) { struct lu_dirpage *dp; struct lu_dirent *ent; @@ -1044,7 +1025,7 @@ static int ll_statahead_thread(void *arg) dp = page_address(page); for (ent = lu_dirent_start(dp); - ent != NULL && thread_is_running(sa_thread) && + ent != NULL && sai->sai_task && !sa_low_hit(sai); ent = lu_dirent_next(ent)) { __u64 hash; @@ -1095,21 +1076,19 @@ static int ll_statahead_thread(void *arg) fid_le_to_cpu(&fid, &ent->lde_fid); - /* wait for spare statahead window */ - do { - wait_event_idle(sa_thread->t_ctl_waitq, - !sa_sent_full(sai) || - sa_has_callback(sai) || - !agl_list_empty(sai) || - !thread_is_running(sa_thread)); - - sa_handle_callback(sai); + while (({set_current_state(TASK_IDLE); + sai->sai_task; })) { + if (sa_has_callback(sai)) { + __set_current_state(TASK_RUNNING); + sa_handle_callback(sai); + } spin_lock(&lli->lli_agl_lock); while (sa_sent_full(sai) && !agl_list_empty(sai)) { struct ll_inode_info *clli; + __set_current_state(TASK_RUNNING); clli = agl_first_entry(sai); list_del_init(&clli->lli_agl_list); spin_unlock(&lli->lli_agl_lock); @@ -1120,8 +1099,12 @@ static int ll_statahead_thread(void *arg) spin_lock(&lli->lli_agl_lock); } spin_unlock(&lli->lli_agl_lock); - } while (sa_sent_full(sai) && - thread_is_running(sa_thread)); + + if (!sa_sent_full(sai)) + break; + schedule(); + } + __set_current_state(TASK_RUNNING); sa_statahead(parent, name, namelen, &fid); } @@ -1148,20 +1131,24 @@ static int ll_statahead_thread(void *arg) if (rc < 0) { spin_lock(&lli->lli_sa_lock); - thread_set_flags(sa_thread, SVC_STOPPING); + sai->sai_task = NULL; lli->lli_sa_enabled = 0; spin_unlock(&lli->lli_sa_lock); } /* statahead is finished, but statahead entries need to be cached, wait - * for file release to stop me. */ - while (thread_is_running(sa_thread)) { - wait_event_idle(sa_thread->t_ctl_waitq, - sa_has_callback(sai) || - !thread_is_running(sa_thread)); - - sa_handle_callback(sai); + * for file release to stop me. + */ + while (({set_current_state(TASK_IDLE); + sai->sai_task; })) { + if (sa_has_callback(sai)) { + __set_current_state(TASK_RUNNING); + sa_handle_callback(sai); + } else { + schedule(); + } } + __set_current_state(TASK_RUNNING); EXIT; out: @@ -1169,25 +1156,21 @@ out: /* wait for inflight statahead RPCs to finish, and then we can free sai * safely because statahead RPC will access sai data */ - while (sai->sai_sent != sai->sai_replied) { + while (sai->sai_sent != sai->sai_replied) /* in case we're not woken up, timeout wait */ - wait_event_idle_timeout(sa_thread->t_ctl_waitq, - sai->sai_sent == sai->sai_replied, - cfs_time_seconds(1) >> 3); - } + msleep(125); /* release resources held by statahead RPCs */ sa_handle_callback(sai); - spin_lock(&lli->lli_sa_lock); - thread_set_flags(sa_thread, SVC_STOPPED); - spin_unlock(&lli->lli_sa_lock); - CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n", sai, parent->d_name.len, parent->d_name.name); + spin_lock(&lli->lli_sa_lock); + sai->sai_task = NULL; + spin_unlock(&lli->lli_sa_lock); wake_up(&sai->sai_waitq); - wake_up(&sa_thread->t_ctl_waitq); + ll_sai_put(sai); return rc; @@ -1232,17 +1215,18 @@ void ll_deauthorize_statahead(struct inode *dir, void *key) lli->lli_opendir_pid = 0; lli->lli_sa_enabled = 0; sai = lli->lli_sai; - if (sai != NULL && thread_is_running(&sai->sai_thread)) { + if (sai && sai->sai_task) { /* - * statahead thread may not quit yet because it needs to cache - * entries, now it's time to tell it to quit. + * statahead thread may not have quit yet because it needs to + * cache entries, now it's time to tell it to quit. * - * In case sai is released, wake_up() is called inside spinlock, - * so we have to call smp_mb() explicitely to serialize ops. + * wake_up_process() provides the necessary barriers + * to pair with set_current_state(). */ - thread_set_flags(&sai->sai_thread, SVC_STOPPING); - smp_mb(); - wake_up(&sai->sai_thread.t_ctl_waitq); + struct task_struct *task = sai->sai_task; + + sai->sai_task = NULL; + wake_up_process(task); } spin_unlock(&lli->lli_sa_lock); } @@ -1518,6 +1502,10 @@ out: if (ldd != NULL) ldd->lld_sa_generation = lli->lli_sa_generation; sa_put(sai, entry); + spin_lock(&lli->lli_sa_lock); + if (sai->sai_task) + wake_up_process(sai->sai_task); + spin_unlock(&lli->lli_sa_lock); RETURN(rc); } @@ -1539,7 +1527,6 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) struct ll_inode_info *lli = ll_i2info(dir); struct ll_statahead_info *sai = NULL; struct dentry *parent = dentry->d_parent; - struct ptlrpc_thread *thread; struct task_struct *task; struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode); int first = LS_FIRST_DE; @@ -1582,9 +1569,8 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n", current_pid(), parent->d_name.len, parent->d_name.name); - task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u", - lli->lli_opendir_pid); - thread = &sai->sai_thread; + task = kthread_create(ll_statahead_thread, parent, "ll_sa_%u", + lli->lli_opendir_pid); if (IS_ERR(task)) { spin_lock(&lli->lli_sa_lock); lli->lli_sai = NULL; @@ -1594,10 +1580,13 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) GOTO(out, rc); } - wait_event_idle(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread)); - ll_sai_put(sai); + if (ll_i2sbi(parent->d_inode)->ll_flags & LL_SBI_AGL_ENABLED) + ll_start_agl(parent, sai); + atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total); + sai->sai_task = task; + + wake_up_process(task); /* * We don't stat-ahead for the first dirent since we are already in * lookup.