X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Fstatahead.c;h=6745d086d51d2cc5f056ce8f452ad58899accdff;hp=0f1dfe3ad2e0e1e28f585ee952ed27bd7df32648;hb=aaf0eb86969b41464093d36f1c0a2f128a1d9851;hpb=38e6ee6fb779c34c7ef3eb4a010332385b3b84c1 diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 0f1dfe3..6745d08 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -36,6 +36,7 @@ #include #include #include +#include #define DEBUG_SUBSYSTEM S_LLITE @@ -52,11 +53,13 @@ typedef enum { SA_ENTRY_INVA = 2, /** invalid entry */ } se_state_t; -/* sa_entry is not refcounted: statahead thread allocates it and do async stat, +/* + * sa_entry is not refcounted: statahead thread allocates it and do async stat, * and in async stat callback ll_statahead_interpret() will add it into * sai_interim_entries, later statahead thread will call sa_handle_callback() to * instantiate entry and move it into sai_entries, and then only scanner process - * can access and free it. */ + * can access and free it. + */ struct sa_entry { /* link into sai_interim_entries or sai_entries */ struct list_head se_list; @@ -82,7 +85,7 @@ struct sa_entry { struct lu_fid se_fid; }; -static unsigned int sai_generation = 0; +static unsigned int sai_generation; static DEFINE_SPINLOCK(sai_generation_lock); static inline int sa_unhashed(struct sa_entry *entry) @@ -93,6 +96,7 @@ static inline int sa_unhashed(struct sa_entry *entry) /* sa_entry is ready to use */ static inline int sa_ready(struct sa_entry *entry) { + /* Make sure sa_entry is updated and ready to use */ smp_rmb(); return (entry->se_state != SA_ENTRY_INIT); } @@ -126,9 +130,9 @@ sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry) } static inline int agl_should_run(struct ll_statahead_info *sai, - struct inode *inode) + struct inode *inode) { - return (inode != NULL && S_ISREG(inode->i_mode) && sai->sai_agl_valid); + return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task; } static inline struct ll_inode_info * @@ -163,8 +167,8 @@ static inline int agl_list_empty(struct ll_statahead_info *sai) */ static inline int sa_low_hit(struct ll_statahead_info *sai) { - return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) || - (sai->sai_consecutive_miss > 8)); + return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) || + (sai->sai_consecutive_miss > 8)); } /* @@ -173,8 +177,8 @@ static inline int sa_low_hit(struct ll_statahead_info *sai) */ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index) { - return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX < - sai->sai_index); + return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX < + sai->sai_index); } /* allocate sa_entry and hash it to allow scanner process to find it */ @@ -186,11 +190,12 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index, struct sa_entry *entry; int entry_size; char *dname; + ENTRY; entry_size = sizeof(struct sa_entry) + (len & ~3) + 4; OBD_ALLOC(entry, entry_size); - if (unlikely(entry == NULL)) + if (unlikely(!entry)) RETURN(ERR_PTR(-ENOMEM)); CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n", @@ -269,7 +274,7 @@ sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry) list_del_init(&entry->se_list); spin_unlock(&lli->lli_sa_lock); - if (entry->se_inode != NULL) + if (entry->se_inode) iput(entry->se_inode); sa_free(sai, entry); @@ -281,7 +286,7 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry) { struct sa_entry *tmp, 
*next; - if (entry != NULL && entry->se_state == SA_ENTRY_SUCC) { + if (entry && entry->se_state == SA_ENTRY_SUCC) { struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode); sai->sai_hit++; @@ -292,22 +297,24 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry) sai->sai_consecutive_miss++; } - if (entry != NULL) + if (entry) sa_kill(sai, entry); - /* kill old completed entries, only scanner process does this, no need - * to lock */ + /* + * kill old completed entries, only scanner process does this, no need + * to lock + */ list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) { if (!is_omitted_entry(sai, tmp->se_index)) break; sa_kill(sai, tmp); } - - wake_up(&sai->sai_thread.t_ctl_waitq); } -/* update state and sort add entry to sai_entries by index, return true if - * scanner is waiting on this entry. */ +/* + * update state and sort add entry to sai_entries by index, return true if + * scanner is waiting on this entry. + */ static bool __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret) { @@ -329,7 +336,8 @@ __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret) * LU-9210: ll_statahead_interpet must be able to see this before * we wake it up */ - smp_store_release(&entry->se_state, ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); + smp_store_release(&entry->se_state, + ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); return (index == sai->sai_index_wait); } @@ -356,7 +364,7 @@ sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry) struct md_op_data *op_data; OBD_ALLOC_PTR(minfo); - if (minfo == NULL) + if (!minfo) return ERR_PTR(-ENOMEM); op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, @@ -367,7 +375,7 @@ sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry) return (struct md_enqueue_info *)op_data; } - if (child == NULL) + if (!child) op_data->op_fid2 = entry->se_fid; minfo->mi_it.it_op = IT_GETATTR; @@ -420,11 +428,10 @@ sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret) /* insert inode into the list of sai_agls */ static void ll_agl_add(struct ll_statahead_info *sai, - struct inode *inode, int index) + struct inode *inode, int index) { struct ll_inode_info *child = ll_i2info(inode); struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode); - int added = 0; spin_lock(&child->lli_agl_lock); if (child->lli_agl_index == 0) { @@ -433,18 +440,19 @@ static void ll_agl_add(struct ll_statahead_info *sai, LASSERT(list_empty(&child->lli_agl_list)); - igrab(inode); spin_lock(&parent->lli_agl_lock); - if (agl_list_empty(sai)) - added = 1; - list_add_tail(&child->lli_agl_list, &sai->sai_agls); + /* Re-check under the lock */ + if (agl_should_run(sai, inode)) { + if (agl_list_empty(sai)) + wake_up_process(sai->sai_agl_task); + igrab(inode); + list_add_tail(&child->lli_agl_list, &sai->sai_agls); + } else + child->lli_agl_index = 0; spin_unlock(&parent->lli_agl_lock); } else { spin_unlock(&child->lli_agl_lock); } - - if (added > 0) - wake_up(&sai->sai_agl_thread.t_ctl_waitq); } /* allocate sai */ @@ -453,6 +461,7 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry) struct ll_statahead_info *sai; struct ll_inode_info *lli = ll_i2info(dentry->d_inode); int i; + ENTRY; OBD_ALLOC_PTR(sai); @@ -464,8 +473,6 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry) sai->sai_max = LL_SA_RPC_MIN; sai->sai_index = 1; init_waitqueue_head(&sai->sai_waitq); - init_waitqueue_head(&sai->sai_thread.t_ctl_waitq); - 
init_waitqueue_head(&sai->sai_agl_thread.t_ctl_waitq); INIT_LIST_HEAD(&sai->sai_interim_entries); INIT_LIST_HEAD(&sai->sai_entries); @@ -505,7 +512,7 @@ static inline struct ll_statahead_info *ll_sai_get(struct inode *dir) spin_lock(&lli->lli_sa_lock); sai = lli->lli_sai; - if (sai != NULL) + if (sai) atomic_inc(&sai->sai_refcount); spin_unlock(&lli->lli_sa_lock); @@ -527,8 +534,8 @@ static void ll_sai_put(struct ll_statahead_info *sai) lli->lli_sai = NULL; spin_unlock(&lli->lli_sa_lock); - LASSERT(thread_is_stopped(&sai->sai_thread)); - LASSERT(thread_is_stopped(&sai->sai_agl_thread)); + LASSERT(!sai->sai_task); + LASSERT(!sai->sai_agl_task); LASSERT(sai->sai_sent == sai->sai_replied); LASSERT(!sa_has_callback(sai)); @@ -553,72 +560,76 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) int rc; ENTRY; + LASSERT(list_empty(&lli->lli_agl_list)); - /* AGL maybe fall behind statahead with one entry */ - if (is_omitted_entry(sai, index + 1)) { - lli->lli_agl_index = 0; - iput(inode); - RETURN_EXIT; - } + /* AGL maybe fall behind statahead with one entry */ + if (is_omitted_entry(sai, index + 1)) { + lli->lli_agl_index = 0; + iput(inode); + RETURN_EXIT; + } - /* In case of restore, the MDT has the right size and has already + /* + * In case of restore, the MDT has the right size and has already * sent it back without granting the layout lock, inode is up-to-date. * Then AGL (async glimpse lock) is useless. * Also to glimpse we need the layout, in case of a runninh restore * the MDT holds the layout lock so the glimpse will block up to the - * end of restore (statahead/agl will block) */ + * end of restore (statahead/agl will block) + */ if (ll_file_test_flag(lli, LLIF_FILE_RESTORING)) { lli->lli_agl_index = 0; iput(inode); RETURN_EXIT; } - /* Someone is in glimpse (sync or async), do nothing. */ + /* Someone is in glimpse (sync or async), do nothing. */ rc = down_write_trylock(&lli->lli_glimpse_sem); - if (rc == 0) { - lli->lli_agl_index = 0; - iput(inode); - RETURN_EXIT; - } - - /* - * Someone triggered glimpse within 1 sec before. - * 1) The former glimpse succeeded with glimpse lock granted by OST, and - * if the lock is still cached on client, AGL needs to do nothing. If - * it is cancelled by other client, AGL maybe cannot obtaion new lock - * for no glimpse callback triggered by AGL. - * 2) The former glimpse succeeded, but OST did not grant glimpse lock. - * Under such case, it is quite possible that the OST will not grant - * glimpse lock for AGL also. - * 3) The former glimpse failed, compared with other two cases, it is - * relative rare. AGL can ignore such case, and it will not muchly - * affect the performance. - */ + if (rc == 0) { + lli->lli_agl_index = 0; + iput(inode); + RETURN_EXIT; + } + + /* + * Someone triggered glimpse within 1 sec before. + * 1) The former glimpse succeeded with glimpse lock granted by OST, and + * if the lock is still cached on client, AGL needs to do nothing. If + * it is cancelled by other client, AGL maybe cannot obtaion new lock + * for no glimpse callback triggered by AGL. + * 2) The former glimpse succeeded, but OST did not grant glimpse lock. + * Under such case, it is quite possible that the OST will not grant + * glimpse lock for AGL also. + * 3) The former glimpse failed, compared with other two cases, it is + * relative rare. AGL can ignore such case, and it will not muchly + * affect the performance. 
+ */ expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC); if (ktime_to_ns(lli->lli_glimpse_time) && ktime_before(expire, lli->lli_glimpse_time)) { up_write(&lli->lli_glimpse_sem); - lli->lli_agl_index = 0; - iput(inode); - RETURN_EXIT; - } + lli->lli_agl_index = 0; + iput(inode); + RETURN_EXIT; + } - CDEBUG(D_READA, "Handling (init) async glimpse: inode = " - DFID", idx = %llu\n", PFID(&lli->lli_fid), index); + CDEBUG(D_READA, + "Handling (init) async glimpse: inode = " DFID", idx = %llu\n", + PFID(&lli->lli_fid), index); - cl_agl(inode); - lli->lli_agl_index = 0; + cl_agl(inode); + lli->lli_agl_index = 0; lli->lli_glimpse_time = ktime_get(); up_write(&lli->lli_glimpse_sem); - CDEBUG(D_READA, "Handled (init) async glimpse: inode= " - DFID", idx = %llu, rc = %d\n", - PFID(&lli->lli_fid), index, rc); + CDEBUG(D_READA, + "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n", + PFID(&lli->lli_fid), index, rc); - iput(inode); + iput(inode); - EXIT; + EXIT; } /* @@ -626,7 +637,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) * to be used by scanner process. */ static void sa_instantiate(struct ll_statahead_info *sai, - struct sa_entry *entry) + struct sa_entry *entry) { struct inode *dir = sai->sai_dentry->d_inode; struct inode *child; @@ -635,6 +646,7 @@ static void sa_instantiate(struct ll_statahead_info *sai, struct ptlrpc_request *req; struct mdt_body *body; int rc = 0; + ENTRY; LASSERT(entry->se_handle != 0); @@ -643,18 +655,18 @@ static void sa_instantiate(struct ll_statahead_info *sai, it = &minfo->mi_it; req = entry->se_req; body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - if (body == NULL) + if (!body) GOTO(out, rc = -EFAULT); child = entry->se_inode; - if (child != NULL) { - /* revalidate; unlinked and re-created with the same name */ - if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, - &body->mbo_fid1))) { + /* revalidate; unlinked and re-created with the same name */ + if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) { + if (child) { entry->se_inode = NULL; iput(child); - child = NULL; } + /* The mdt_body is invalid. Skip this entry */ + GOTO(out, rc = -EAGAIN); } it->it_lock_handle = entry->se_handle; @@ -679,9 +691,11 @@ static void sa_instantiate(struct ll_statahead_info *sai, EXIT; out: - /* sa_make_ready() will drop ldlm ibits lock refcount by calling + /* + * sa_make_ready() will drop ldlm ibits lock refcount by calling * ll_intent_drop_lock() in spite of failures. Do not worry about - * calling ll_intent_drop_lock() more than once. */ + * calling ll_intent_drop_lock() more than once. 
+ */ sa_make_ready(sai, entry, rc); } @@ -721,16 +735,17 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, struct ll_statahead_info *sai = lli->lli_sai; struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata; __u64 handle = 0; - wait_queue_head_t *waitq = NULL; + ENTRY; if (it_disposition(it, DISP_LOOKUP_NEG)) rc = -ENOENT; - /* because statahead thread will wait for all inflight RPC to finish, - * sai should be always valid, no need to refcount */ + /* + * because statahead thread will wait for all inflight RPC to finish, + * sai should be always valid, no need to refcount + */ LASSERT(sai != NULL); - LASSERT(!thread_is_stopped(&sai->sai_thread)); LASSERT(entry != NULL); CDEBUG(D_READA, "sa_entry %.*s rc %d\n", @@ -740,10 +755,12 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, ll_intent_release(it); sa_fini_data(minfo); } else { - /* release ibits lock ASAP to avoid deadlock when statahead + /* + * release ibits lock ASAP to avoid deadlock when statahead * thread enqueues lock on parent in readdir and another * process enqueues lock on child with parent lock held, eg. - * unlink. */ + * unlink. + */ handle = it->it_lock_handle; ll_intent_drop_lock(it); ll_unlock_md_op_lsm(&minfo->mi_data); @@ -752,24 +769,28 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, spin_lock(&lli->lli_sa_lock); if (rc != 0) { if (__sa_make_ready(sai, entry, rc)) - waitq = &sai->sai_waitq; + wake_up(&sai->sai_waitq); } else { + int first = 0; + entry->se_minfo = minfo; entry->se_req = ptlrpc_request_addref(req); - /* Release the async ibits lock ASAP to avoid deadlock + /* + * Release the async ibits lock ASAP to avoid deadlock * when statahead thread tries to enqueue lock on parent * for readpage and other tries to enqueue lock on child - * with parent's lock held, for example: unlink. */ + * with parent's lock held, for example: unlink. 
+ */ entry->se_handle = handle; if (!sa_has_callback(sai)) - waitq = &sai->sai_thread.t_ctl_waitq; + first = 1; list_add_tail(&entry->se_list, &sai->sai_interim_entries); + if (first && sai->sai_task) + wake_up_process(sai->sai_task); } sai->sai_replied++; - if (waitq != NULL) - wake_up(waitq); spin_unlock(&lli->lli_sa_lock); RETURN(rc); @@ -780,6 +801,7 @@ static int sa_lookup(struct inode *dir, struct sa_entry *entry) { struct md_enqueue_info *minfo; int rc; + ENTRY; minfo = sa_prep_data(dir, NULL, entry); @@ -808,9 +830,10 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry, .it_lock_handle = 0 }; struct md_enqueue_info *minfo; int rc; + ENTRY; - if (unlikely(inode == NULL)) + if (unlikely(!inode)) RETURN(1); if (d_mountpoint(dentry)) @@ -850,6 +873,7 @@ static void sa_statahead(struct dentry *parent, const char *name, int len, struct dentry *dentry = NULL; struct sa_entry *entry; int rc; + ENTRY; entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid); @@ -865,7 +889,7 @@ static void sa_statahead(struct dentry *parent, const char *name, int len, ll_agl_add(sai, dentry->d_inode, entry->se_index); } - if (dentry != NULL) + if (dentry) dput(dentry); if (rc != 0) @@ -885,39 +909,22 @@ static int ll_agl_thread(void *arg) struct inode *dir = parent->d_inode; struct ll_inode_info *plli = ll_i2info(dir); struct ll_inode_info *clli; - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai; - struct ptlrpc_thread *thread; - ENTRY; - - sai = ll_sai_get(dir); - thread = &sai->sai_agl_thread; - thread->t_pid = current_pid(); - CDEBUG(D_READA, "agl thread started: sai %p, parent %.*s\n", - sai, parent->d_name.len, parent->d_name.name); + /* + * We already own this reference, so it is safe to take it + * without a lock. + */ + struct ll_statahead_info *sai = plli->lli_sai; - atomic_inc(&sbi->ll_agl_total); - spin_lock(&plli->lli_agl_lock); - sai->sai_agl_valid = 1; - if (thread_is_init(thread)) - /* If someone else has changed the thread state - * (e.g. already changed to SVC_STOPPING), we can't just - * blindly overwrite that setting. */ - thread_set_flags(thread, SVC_RUNNING); - spin_unlock(&plli->lli_agl_lock); - wake_up(&thread->t_ctl_waitq); + ENTRY; - while (1) { - wait_event_idle(thread->t_ctl_waitq, - !agl_list_empty(sai) || - !thread_is_running(thread)); - if (!thread_is_running(thread)) - break; + CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n", + sai, parent); + while (({set_current_state(TASK_IDLE); + !kthread_should_stop(); })) { spin_lock(&plli->lli_agl_lock); - /* The statahead thread maybe help to process AGL entries, - * so check whether list empty again. 
*/ if (!agl_list_empty(sai)) { + __set_current_state(TASK_RUNNING); clli = agl_first_entry(sai); list_del_init(&clli->lli_agl_list); spin_unlock(&plli->lli_agl_lock); @@ -925,11 +932,32 @@ static int ll_agl_thread(void *arg) cond_resched(); } else { spin_unlock(&plli->lli_agl_lock); + schedule(); } } + __set_current_state(TASK_RUNNING); + RETURN(0); +} + +static void ll_stop_agl(struct ll_statahead_info *sai) +{ + struct dentry *parent = sai->sai_dentry; + struct ll_inode_info *plli = ll_i2info(parent->d_inode); + struct ll_inode_info *clli; + struct task_struct *agl_task; + + spin_lock(&plli->lli_agl_lock); + agl_task = sai->sai_agl_task; + sai->sai_agl_task = NULL; + spin_unlock(&plli->lli_agl_lock); + if (!agl_task) + return; + + CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n", + sai, (unsigned int)agl_task->pid); + kthread_stop(agl_task); spin_lock(&plli->lli_agl_lock); - sai->sai_agl_valid = 0; while (!agl_list_empty(sai)) { clli = agl_first_entry(sai); list_del_init(&clli->lli_agl_list); @@ -938,37 +966,38 @@ static int ll_agl_thread(void *arg) iput(&clli->lli_vfs_inode); spin_lock(&plli->lli_agl_lock); } - thread_set_flags(thread, SVC_STOPPED); spin_unlock(&plli->lli_agl_lock); - wake_up(&thread->t_ctl_waitq); + CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n", + sai, parent); ll_sai_put(sai); - CDEBUG(D_READA, "agl thread stopped: sai %p, parent %.*s\n", - sai, parent->d_name.len, parent->d_name.name); - RETURN(0); } /* start agl thread */ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) { - struct ptlrpc_thread *thread = &sai->sai_agl_thread; - struct ll_inode_info *plli; - struct task_struct *task; + int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY); + struct ll_inode_info *plli; + struct task_struct *task; + ENTRY; - CDEBUG(D_READA, "start agl thread: sai %p, parent %.*s\n", - sai, parent->d_name.len, parent->d_name.name); + CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n", + sai, parent); plli = ll_i2info(parent->d_inode); - task = kthread_run(ll_agl_thread, parent, - "ll_agl_%u", plli->lli_opendir_pid); + task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d", + plli->lli_opendir_pid); if (IS_ERR(task)) { CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task)); - thread_set_flags(thread, SVC_STOPPED); RETURN_EXIT; } + sai->sai_agl_task = task; + atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total); + /* Get an extra reference that the thread holds */ + ll_sai_get(d_inode(parent)); + + wake_up_process(task); - wait_event_idle(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread)); EXIT; } @@ -979,61 +1008,41 @@ static int ll_statahead_thread(void *arg) struct inode *dir = parent->d_inode; struct ll_inode_info *lli = ll_i2info(dir); struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai; - struct ptlrpc_thread *sa_thread; - struct ptlrpc_thread *agl_thread; + struct ll_statahead_info *sai = lli->lli_sai; int first = 0; struct md_op_data *op_data; - struct ll_dir_chain chain; struct page *page = NULL; __u64 pos = 0; int rc = 0; + ENTRY; - sai = ll_sai_get(dir); - sa_thread = &sai->sai_thread; - agl_thread = &sai->sai_agl_thread; - sa_thread->t_pid = current_pid(); - CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n", - sai, parent->d_name.len, parent->d_name.name); + CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n", + sai, parent); OBD_ALLOC_PTR(op_data); if (!op_data) GOTO(out, rc = -ENOMEM); - if (sbi->ll_flags & 
LL_SBI_AGL_ENABLED) - ll_start_agl(parent, sai); - - atomic_inc(&sbi->ll_sa_total); - spin_lock(&lli->lli_sa_lock); - if (thread_is_init(sa_thread)) - /* If someone else has changed the thread state - * (e.g. already changed to SVC_STOPPING), we can't just - * blindly overwrite that setting. */ - thread_set_flags(sa_thread, SVC_RUNNING); - spin_unlock(&lli->lli_sa_lock); - wake_up(&sa_thread->t_ctl_waitq); - - ll_dir_chain_init(&chain); - while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) { + while (pos != MDS_DIR_END_OFF && sai->sai_task) { struct lu_dirpage *dp; struct lu_dirent *ent; op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0, - LUSTRE_OPC_ANY, dir); + LUSTRE_OPC_ANY, dir); if (IS_ERR(op_data)) { rc = PTR_ERR(op_data); break; } sai->sai_in_readpage = 1; - page = ll_get_dir_page(dir, op_data, pos, &chain); + page = ll_get_dir_page(dir, op_data, pos); ll_unlock_md_op_lsm(op_data); sai->sai_in_readpage = 0; if (IS_ERR(page)) { rc = PTR_ERR(page); - CDEBUG(D_READA, "error reading dir "DFID" at %llu" - "/%llu opendir_pid = %u: rc = %d\n", + CDEBUG(D_READA, + "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n", PFID(ll_inode2fid(dir)), pos, sai->sai_index, lli->lli_opendir_pid, rc); break; @@ -1041,7 +1050,7 @@ static int ll_statahead_thread(void *arg) dp = page_address(page); for (ent = lu_dirent_start(dp); - ent != NULL && thread_is_running(sa_thread) && + ent != NULL && sai->sai_task && !sa_low_hit(sai); ent = lu_dirent_next(ent)) { __u64 hash; @@ -1092,33 +1101,35 @@ static int ll_statahead_thread(void *arg) fid_le_to_cpu(&fid, &ent->lde_fid); - /* wait for spare statahead window */ - do { - wait_event_idle(sa_thread->t_ctl_waitq, - !sa_sent_full(sai) || - sa_has_callback(sai) || - !agl_list_empty(sai) || - !thread_is_running(sa_thread)); - - sa_handle_callback(sai); + while (({set_current_state(TASK_IDLE); + sai->sai_task; })) { + if (sa_has_callback(sai)) { + __set_current_state(TASK_RUNNING); + sa_handle_callback(sai); + } spin_lock(&lli->lli_agl_lock); while (sa_sent_full(sai) && !agl_list_empty(sai)) { struct ll_inode_info *clli; + __set_current_state(TASK_RUNNING); clli = agl_first_entry(sai); list_del_init(&clli->lli_agl_list); spin_unlock(&lli->lli_agl_lock); ll_agl_trigger(&clli->lli_vfs_inode, - sai); + sai); cond_resched(); spin_lock(&lli->lli_agl_lock); } spin_unlock(&lli->lli_agl_lock); - } while (sa_sent_full(sai) && - thread_is_running(sa_thread)); + + if (!sa_sent_full(sai)) + break; + schedule(); + } + __set_current_state(TASK_RUNNING); sa_statahead(parent, name, namelen, &fid); } @@ -1130,74 +1141,61 @@ static int ll_statahead_thread(void *arg) if (sa_low_hit(sai)) { rc = -EFAULT; atomic_inc(&sbi->ll_sa_wrong); - CDEBUG(D_READA, "Statahead for dir "DFID" hit " - "ratio too low: hit/miss %llu/%llu" - ", sent/replied %llu/%llu, stopping " - "statahead thread: pid %d\n", + CDEBUG(D_READA, + "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stoppingstatahead thread: pid %d\n", PFID(&lli->lli_fid), sai->sai_hit, sai->sai_miss, sai->sai_sent, - sai->sai_replied, current_pid()); + sai->sai_replied, current->pid); break; } } - ll_dir_chain_fini(&chain); ll_finish_md_op_data(op_data); if (rc < 0) { spin_lock(&lli->lli_sa_lock); - thread_set_flags(sa_thread, SVC_STOPPING); + sai->sai_task = NULL; lli->lli_sa_enabled = 0; spin_unlock(&lli->lli_sa_lock); } - /* statahead is finished, but statahead entries need to be cached, wait - * for file release to stop me. 
*/ - while (thread_is_running(sa_thread)) { - wait_event_idle(sa_thread->t_ctl_waitq, - sa_has_callback(sai) || - !thread_is_running(sa_thread)); - - sa_handle_callback(sai); + /* + * statahead is finished, but statahead entries need to be cached, wait + * for file release to stop me. + */ + while (({set_current_state(TASK_IDLE); + sai->sai_task; })) { + if (sa_has_callback(sai)) { + __set_current_state(TASK_RUNNING); + sa_handle_callback(sai); + } else { + schedule(); + } } + __set_current_state(TASK_RUNNING); EXIT; out: - if (sai->sai_agl_valid) { - spin_lock(&lli->lli_agl_lock); - thread_set_flags(agl_thread, SVC_STOPPING); - spin_unlock(&lli->lli_agl_lock); - wake_up(&agl_thread->t_ctl_waitq); - - CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n", - sai, (unsigned int)agl_thread->t_pid); - wait_event_idle(agl_thread->t_ctl_waitq, - thread_is_stopped(agl_thread)); - } else { - /* Set agl_thread flags anyway. */ - thread_set_flags(agl_thread, SVC_STOPPED); - } + ll_stop_agl(sai); - /* wait for inflight statahead RPCs to finish, and then we can free sai - * safely because statahead RPC will access sai data */ - while (sai->sai_sent != sai->sai_replied) { + /* + * wait for inflight statahead RPCs to finish, and then we can free sai + * safely because statahead RPC will access sai data + */ + while (sai->sai_sent != sai->sai_replied) /* in case we're not woken up, timeout wait */ - wait_event_idle_timeout(sa_thread->t_ctl_waitq, - sai->sai_sent == sai->sai_replied, - cfs_time_seconds(1) >> 3); - } + msleep(125); /* release resources held by statahead RPCs */ sa_handle_callback(sai); + CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n", + sbi->ll_fsname, sai, parent); + spin_lock(&lli->lli_sa_lock); - thread_set_flags(sa_thread, SVC_STOPPED); + sai->sai_task = NULL; spin_unlock(&lli->lli_sa_lock); - - CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n", - sai, parent->d_name.len, parent->d_name.name); - wake_up(&sai->sai_waitq); - wake_up(&sa_thread->t_ctl_waitq); + ll_sai_put(sai); return rc; @@ -1209,14 +1207,14 @@ void ll_authorize_statahead(struct inode *dir, void *key) struct ll_inode_info *lli = ll_i2info(dir); spin_lock(&lli->lli_sa_lock); - if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL) { + if (!lli->lli_opendir_key && !lli->lli_sai) { /* * if lli_sai is not NULL, it means previous statahead is not * finished yet, we'd better not start a new statahead for now. */ LASSERT(lli->lli_opendir_pid == 0); lli->lli_opendir_key = key; - lli->lli_opendir_pid = current_pid(); + lli->lli_opendir_pid = current->pid; lli->lli_sa_enabled = 1; } spin_unlock(&lli->lli_sa_lock); @@ -1235,24 +1233,25 @@ void ll_deauthorize_statahead(struct inode *dir, void *key) LASSERT(lli->lli_opendir_pid != 0); CDEBUG(D_READA, "deauthorize statahead for "DFID"\n", - PFID(&lli->lli_fid)); + PFID(&lli->lli_fid)); spin_lock(&lli->lli_sa_lock); lli->lli_opendir_key = NULL; lli->lli_opendir_pid = 0; lli->lli_sa_enabled = 0; sai = lli->lli_sai; - if (sai != NULL && thread_is_running(&sai->sai_thread)) { + if (sai && sai->sai_task) { /* - * statahead thread may not quit yet because it needs to cache - * entries, now it's time to tell it to quit. + * statahead thread may not have quit yet because it needs to + * cache entries, now it's time to tell it to quit. * - * In case sai is released, wake_up() is called inside spinlock, - * so we have to call smp_mb() explicitely to serialize ops. + * wake_up_process() provides the necessary barriers + * to pair with set_current_state(). 
*/ - thread_set_flags(&sai->sai_thread, SVC_STOPPING); - smp_mb(); - wake_up(&sai->sai_thread.t_ctl_waitq); + struct task_struct *task = sai->sai_task; + + sai->sai_task = NULL; + wake_up_process(task); } spin_unlock(&lli->lli_sa_lock); } @@ -1275,13 +1274,13 @@ enum { /* file is first dirent under @dir */ static int is_first_dirent(struct inode *dir, struct dentry *dentry) { - struct ll_dir_chain chain; struct qstr *target = &dentry->d_name; struct md_op_data *op_data; int dot_de; struct page *page = NULL; int rc = LS_NOT_FIRST_DE; __u64 pos = 0; + ENTRY; op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, @@ -1292,8 +1291,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) *FIXME choose the start offset of the readdir */ - ll_dir_chain_init(&chain); - page = ll_get_dir_page(dir, op_data, 0, &chain); + page = ll_get_dir_page(dir, op_data, 0); while (1) { struct lu_dirpage *dp; @@ -1318,8 +1316,10 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) char *name; hash = le64_to_cpu(ent->lde_hash); - /* The ll_get_dir_page() can return any page containing - * the given hash which may be not the start hash. */ + /* + * The ll_get_dir_page() can return any page containing + * the given hash which may be not the start hash. + */ if (unlikely(hash < pos)) continue; @@ -1380,14 +1380,14 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) */ ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - page = ll_get_dir_page(dir, op_data, pos, &chain); + page = ll_get_dir_page(dir, op_data, pos); } } EXIT; out: - ll_dir_chain_fini(&chain); ll_finish_md_op_data(op_data); - return rc; + + return rc; } /** @@ -1403,14 +1403,15 @@ out: * \retval negative number upon error */ static int revalidate_statahead_dentry(struct inode *dir, - struct ll_statahead_info *sai, - struct dentry **dentryp, - bool unplug) + struct ll_statahead_info *sai, + struct dentry **dentryp, + bool unplug) { struct sa_entry *entry = NULL; struct ll_dentry_data *ldd; struct ll_inode_info *lli = ll_i2info(dir); int rc = 0; + ENTRY; if ((*dentryp)->d_name.name[0] == '.') { @@ -1444,7 +1445,7 @@ static int revalidate_statahead_dentry(struct inode *dir, GOTO(out, rc = 1); entry = sa_get(sai, &(*dentryp)->d_name); - if (entry == NULL) + if (!entry) GOTO(out, rc = -EAGAIN); /* if statahead is busy in readdir, help it do post-work */ @@ -1482,7 +1483,7 @@ static int revalidate_statahead_dentry(struct inode *dir, rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode), &bits); if (rc == 1) { - if ((*dentryp)->d_inode == NULL) { + if (!(*dentryp)->d_inode) { struct dentry *alias; alias = ll_splice_alias(inode, *dentryp); @@ -1491,20 +1492,18 @@ static int revalidate_statahead_dentry(struct inode *dir, GOTO(out, rc = PTR_ERR(alias)); } *dentryp = alias; - /* statahead prepared this inode, transfer inode - * refcount from sa_entry to dentry */ + /* + * statahead prepared this inode, transfer inode + * refcount from sa_entry to dentry + */ entry->se_inode = NULL; } else if ((*dentryp)->d_inode != inode) { /* revalidate, but inode is recreated */ CDEBUG(D_READA, - "%s: stale dentry %.*s inode " - DFID", statahead inode "DFID - "\n", - ll_i2sbi(inode)->ll_fsname, - (*dentryp)->d_name.len, - (*dentryp)->d_name.name, - PFID(ll_inode2fid((*dentryp)->d_inode)), - PFID(ll_inode2fid(inode))); + "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n", + ll_i2sbi(inode)->ll_fsname, *dentryp, + PFID(ll_inode2fid((*dentryp)->d_inode)), + 
PFID(ll_inode2fid(inode))); ll_intent_release(&it); GOTO(out, rc = -ESTALE); } @@ -1525,9 +1524,13 @@ out: */ ldd = ll_d2d(*dentryp); /* ldd can be NULL if llite lookup failed. */ - if (ldd != NULL) + if (ldd) ldd->lld_sa_generation = lli->lli_sa_generation; sa_put(sai, entry); + spin_lock(&lli->lli_sa_lock); + if (sai->sai_task) + wake_up_process(sai->sai_task); + spin_unlock(&lli->lli_sa_lock); RETURN(rc); } @@ -1538,22 +1541,25 @@ out: * \param[in] dir parent directory * \param[in] dentry dentry that triggers statahead, normally the first * dirent under @dir + * \param[in] agl indicate whether AGL is needed * \retval -EAGAIN on success, because when this function is * called, it's already in lookup call, so client should * do it itself instead of waiting for statahead thread * to do it asynchronously. * \retval negative number upon error */ -static int start_statahead_thread(struct inode *dir, struct dentry *dentry) +static int start_statahead_thread(struct inode *dir, struct dentry *dentry, + bool agl) { + int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY); struct ll_inode_info *lli = ll_i2info(dir); struct ll_statahead_info *sai = NULL; struct dentry *parent = dentry->d_parent; - struct ptlrpc_thread *thread; struct task_struct *task; struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode); int first = LS_FIRST_DE; int rc = 0; + ENTRY; /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */ @@ -1565,23 +1571,23 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) if (unlikely(atomic_inc_return(&sbi->ll_sa_running) > sbi->ll_sa_running_max)) { CDEBUG(D_READA, - "Too many concurrent statahead instances, " - "avoid new statahead instance temporarily.\n"); + "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n"); GOTO(out, rc = -EMFILE); } sai = ll_sai_alloc(parent); - if (sai == NULL) + if (!sai) GOTO(out, rc = -ENOMEM); sai->sai_ls_all = (first == LS_FIRST_DOT_DE); - /* if current lli_opendir_key was deauthorized, or dir re-opened by + /* + * if current lli_opendir_key was deauthorized, or dir re-opened by * another process, don't start statahead, otherwise the newly spawned - * statahead thread won't be notified to quit. */ + * statahead thread won't be notified to quit. 
+ */ spin_lock(&lli->lli_sa_lock); - if (unlikely(lli->lli_sai != NULL || - lli->lli_opendir_key == NULL || + if (unlikely(lli->lli_sai || !lli->lli_opendir_key || lli->lli_opendir_pid != current->pid)) { spin_unlock(&lli->lli_sa_lock); GOTO(out, rc = -EPERM); @@ -1589,12 +1595,11 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) lli->lli_sai = sai; spin_unlock(&lli->lli_sa_lock); - CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n", - current_pid(), parent->d_name.len, parent->d_name.name); + CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n", + current->pid, parent); - task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u", - lli->lli_opendir_pid); - thread = &sai->sai_thread; + task = kthread_create_on_node(ll_statahead_thread, parent, node, + "ll_sa_%u", lli->lli_opendir_pid); if (IS_ERR(task)) { spin_lock(&lli->lli_sa_lock); lli->lli_sai = NULL; @@ -1604,10 +1609,13 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) GOTO(out, rc); } - wait_event_idle(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread)); - ll_sai_put(sai); + if (ll_i2sbi(parent->d_inode)->ll_flags & LL_SBI_AGL_ENABLED && agl) + ll_start_agl(parent, sai); + + atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total); + sai->sai_task = task; + wake_up_process(task); /* * We don't stat-ahead for the first dirent since we are already in * lookup. @@ -1615,14 +1623,16 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) RETURN(-EAGAIN); out: - /* once we start statahead thread failed, disable statahead so that - * subsequent stat won't waste time to try it. */ + /* + * once we start statahead thread failed, disable statahead so that + * subsequent stat won't waste time to try it. + */ spin_lock(&lli->lli_sa_lock); if (lli->lli_opendir_pid == current->pid) lli->lli_sa_enabled = 0; spin_unlock(&lli->lli_sa_lock); - if (sai != NULL) + if (sai) ll_sai_free(sai); if (first != LS_NOT_FIRST_DE) atomic_dec(&sbi->ll_sa_running); @@ -1630,6 +1640,26 @@ out: RETURN(rc); } +/* + * Check whether statahead for @dir was started. + */ +static inline bool ll_statahead_started(struct inode *dir, bool agl) +{ + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai; + + spin_lock(&lli->lli_sa_lock); + sai = lli->lli_sai; + if (sai && (sai->sai_agl_task != NULL) != agl) + CDEBUG(D_READA, + "%s: Statahead AGL hint changed from %d to %d\n", + ll_i2sbi(dir)->ll_fsname, + sai->sai_agl_task != NULL, agl); + spin_unlock(&lli->lli_sa_lock); + + return !!sai; +} + /** * statahead entry function, this is called when client getattr on a file, it * will start statahead thread if this is the first dir entry, else revalidate @@ -1637,6 +1667,26 @@ out: * * \param[in] dir parent directory * \param[out] dentryp dentry to getattr + * \param[in] agl whether start the agl thread + * + * \retval 1 on success + * \retval 0 revalidation from statahead cache failed, caller needs + * to getattr from server directly + * \retval negative number on error, caller often ignores this and + * then getattr from server + */ +int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl) +{ + if (!ll_statahead_started(dir, agl)) + return start_statahead_thread(dir, dentry, agl); + return 0; +} + +/** + * revalidate dentry from statahead cache. 
+ * + * \param[in] dir parent directory + * \param[out] dentryp dentry to getattr * \param[in] unplug unplug statahead window only (normally for negative * dentry) * \retval 1 on success @@ -1645,19 +1695,18 @@ out: * \retval negative number on error, caller often ignores this and * then getattr from server */ -int ll_statahead(struct inode *dir, struct dentry **dentryp, bool unplug) +int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp, + bool unplug) { struct ll_statahead_info *sai; + int rc = 0; sai = ll_sai_get(dir); - if (sai != NULL) { - int rc; - + if (sai) { rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug); - CDEBUG(D_READA, "revalidate statahead %.*s: %d.\n", - (*dentryp)->d_name.len, (*dentryp)->d_name.name, rc); + CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n", + *dentryp, rc); ll_sai_put(sai); - return rc; } - return start_statahead_thread(dir, *dentryp); + return rc; }