#include <linux/mm.h>
#include <linux/highmem.h>
#include <linux/pagemap.h>
+#include <linux/delay.h>
#define DEBUG_SUBSYSTEM S_LLITE
break;
sa_kill(sai, tmp);
}
-
- wake_up(&sai->sai_thread.t_ctl_waitq);
}
/* update state and sort add entry to sai_entries by index, return true if
sai->sai_max = LL_SA_RPC_MIN;
sai->sai_index = 1;
init_waitqueue_head(&sai->sai_waitq);
- init_waitqueue_head(&sai->sai_thread.t_ctl_waitq);
INIT_LIST_HEAD(&sai->sai_interim_entries);
INIT_LIST_HEAD(&sai->sai_entries);
lli->lli_sai = NULL;
spin_unlock(&lli->lli_sa_lock);
- LASSERT(thread_is_stopped(&sai->sai_thread));
+ LASSERT(!sai->sai_task);
LASSERT(!sai->sai_agl_task);
LASSERT(sai->sai_sent == sai->sai_replied);
LASSERT(!sa_has_callback(sai));
struct ll_statahead_info *sai = lli->lli_sai;
struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
__u64 handle = 0;
- wait_queue_head_t *waitq = NULL;
ENTRY;
if (it_disposition(it, DISP_LOOKUP_NEG))
/* because statahead thread will wait for all inflight RPC to finish,
* sai should be always valid, no need to refcount */
LASSERT(sai != NULL);
- LASSERT(!thread_is_stopped(&sai->sai_thread));
LASSERT(entry != NULL);
CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
spin_lock(&lli->lli_sa_lock);
if (rc != 0) {
if (__sa_make_ready(sai, entry, rc))
- waitq = &sai->sai_waitq;
+ wake_up(&sai->sai_waitq);
} else {
+ int first = 0;
+
entry->se_minfo = minfo;
entry->se_req = ptlrpc_request_addref(req);
/* Release the async ibits lock ASAP to avoid deadlock
* with parent's lock held, for example: unlink. */
entry->se_handle = handle;
if (!sa_has_callback(sai))
- waitq = &sai->sai_thread.t_ctl_waitq;
+ first = 1;
list_add_tail(&entry->se_list, &sai->sai_interim_entries);
+ if (first && sai->sai_task)
+ wake_up_process(sai->sai_task);
}
sai->sai_replied++;
- if (waitq != NULL)
- wake_up(waitq);
spin_unlock(&lli->lli_sa_lock);
RETURN(rc);
struct inode *dir = parent->d_inode;
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_sb_info *sbi = ll_i2sbi(dir);
- struct ll_statahead_info *sai;
- struct ptlrpc_thread *sa_thread;
+ struct ll_statahead_info *sai = lli->lli_sai;
int first = 0;
struct md_op_data *op_data;
struct ll_dir_chain chain;
int rc = 0;
ENTRY;
- sai = ll_sai_get(dir);
- sa_thread = &sai->sai_thread;
- sa_thread->t_pid = current_pid();
CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
sai, parent->d_name.len, parent->d_name.name);
if (!op_data)
GOTO(out, rc = -ENOMEM);
- if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
- ll_start_agl(parent, sai);
-
- atomic_inc(&sbi->ll_sa_total);
- spin_lock(&lli->lli_sa_lock);
- if (thread_is_init(sa_thread))
- /* If someone else has changed the thread state
- * (e.g. already changed to SVC_STOPPING), we can't just
- * blindly overwrite that setting. */
- thread_set_flags(sa_thread, SVC_RUNNING);
- spin_unlock(&lli->lli_sa_lock);
- wake_up(&sa_thread->t_ctl_waitq);
-
ll_dir_chain_init(&chain);
- while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) {
+ while (pos != MDS_DIR_END_OFF && sai->sai_task) {
struct lu_dirpage *dp;
struct lu_dirent *ent;
dp = page_address(page);
for (ent = lu_dirent_start(dp);
- ent != NULL && thread_is_running(sa_thread) &&
+ ent != NULL && sai->sai_task &&
!sa_low_hit(sai);
ent = lu_dirent_next(ent)) {
__u64 hash;
fid_le_to_cpu(&fid, &ent->lde_fid);
- /* wait for spare statahead window */
- do {
- wait_event_idle(sa_thread->t_ctl_waitq,
- !sa_sent_full(sai) ||
- sa_has_callback(sai) ||
- !agl_list_empty(sai) ||
- !thread_is_running(sa_thread));
-
- sa_handle_callback(sai);
+ while (({set_current_state(TASK_IDLE);
+ sai->sai_task; })) {
+ if (sa_has_callback(sai)) {
+ __set_current_state(TASK_RUNNING);
+ sa_handle_callback(sai);
+ }
spin_lock(&lli->lli_agl_lock);
while (sa_sent_full(sai) &&
!agl_list_empty(sai)) {
struct ll_inode_info *clli;
+ __set_current_state(TASK_RUNNING);
clli = agl_first_entry(sai);
list_del_init(&clli->lli_agl_list);
spin_unlock(&lli->lli_agl_lock);
spin_lock(&lli->lli_agl_lock);
}
spin_unlock(&lli->lli_agl_lock);
- } while (sa_sent_full(sai) &&
- thread_is_running(sa_thread));
+
+ if (!sa_sent_full(sai))
+ break;
+ schedule();
+ }
+ __set_current_state(TASK_RUNNING);
sa_statahead(parent, name, namelen, &fid);
}
if (rc < 0) {
spin_lock(&lli->lli_sa_lock);
- thread_set_flags(sa_thread, SVC_STOPPING);
+ sai->sai_task = NULL;
lli->lli_sa_enabled = 0;
spin_unlock(&lli->lli_sa_lock);
}
/* statahead is finished, but statahead entries need to be cached, wait
- * for file release to stop me. */
- while (thread_is_running(sa_thread)) {
- wait_event_idle(sa_thread->t_ctl_waitq,
- sa_has_callback(sai) ||
- !thread_is_running(sa_thread));
-
- sa_handle_callback(sai);
+ * for file release to stop me.
+ */
+ while (({set_current_state(TASK_IDLE);
+ sai->sai_task; })) {
+ if (sa_has_callback(sai)) {
+ __set_current_state(TASK_RUNNING);
+ sa_handle_callback(sai);
+ } else {
+ schedule();
+ }
}
+ __set_current_state(TASK_RUNNING);
EXIT;
out:
/* wait for inflight statahead RPCs to finish, and then we can free sai
* safely because statahead RPC will access sai data */
- while (sai->sai_sent != sai->sai_replied) {
+ while (sai->sai_sent != sai->sai_replied)
/* in case we're not woken up, timeout wait */
- wait_event_idle_timeout(sa_thread->t_ctl_waitq,
- sai->sai_sent == sai->sai_replied,
- cfs_time_seconds(1) >> 3);
- }
+ msleep(125);
/* release resources held by statahead RPCs */
sa_handle_callback(sai);
- spin_lock(&lli->lli_sa_lock);
- thread_set_flags(sa_thread, SVC_STOPPED);
- spin_unlock(&lli->lli_sa_lock);
-
CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n",
sai, parent->d_name.len, parent->d_name.name);
+ spin_lock(&lli->lli_sa_lock);
+ sai->sai_task = NULL;
+ spin_unlock(&lli->lli_sa_lock);
wake_up(&sai->sai_waitq);
- wake_up(&sa_thread->t_ctl_waitq);
+
ll_sai_put(sai);
return rc;
lli->lli_opendir_pid = 0;
lli->lli_sa_enabled = 0;
sai = lli->lli_sai;
- if (sai != NULL && thread_is_running(&sai->sai_thread)) {
+ if (sai && sai->sai_task) {
/*
- * statahead thread may not quit yet because it needs to cache
- * entries, now it's time to tell it to quit.
+ * statahead thread may not have quit yet because it needs to
+ * cache entries, now it's time to tell it to quit.
*
- * In case sai is released, wake_up() is called inside spinlock,
- * so we have to call smp_mb() explicitely to serialize ops.
+ * wake_up_process() provides the necessary barriers
+ * to pair with set_current_state().
*/
- thread_set_flags(&sai->sai_thread, SVC_STOPPING);
- smp_mb();
- wake_up(&sai->sai_thread.t_ctl_waitq);
+ struct task_struct *task = sai->sai_task;
+
+ sai->sai_task = NULL;
+ wake_up_process(task);
}
spin_unlock(&lli->lli_sa_lock);
}
if (ldd != NULL)
ldd->lld_sa_generation = lli->lli_sa_generation;
sa_put(sai, entry);
+ spin_lock(&lli->lli_sa_lock);
+ if (sai->sai_task)
+ wake_up_process(sai->sai_task);
+ spin_unlock(&lli->lli_sa_lock);
RETURN(rc);
}
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai = NULL;
struct dentry *parent = dentry->d_parent;
- struct ptlrpc_thread *thread;
struct task_struct *task;
struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
int first = LS_FIRST_DE;
CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n",
current_pid(), parent->d_name.len, parent->d_name.name);
- task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
- lli->lli_opendir_pid);
- thread = &sai->sai_thread;
+ task = kthread_create(ll_statahead_thread, parent, "ll_sa_%u",
+ lli->lli_opendir_pid);
if (IS_ERR(task)) {
spin_lock(&lli->lli_sa_lock);
lli->lli_sai = NULL;
GOTO(out, rc);
}
- wait_event_idle(thread->t_ctl_waitq,
- thread_is_running(thread) || thread_is_stopped(thread));
- ll_sai_put(sai);
+ if (ll_i2sbi(parent->d_inode)->ll_flags & LL_SBI_AGL_ENABLED)
+ ll_start_agl(parent, sai);
+ atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total);
+ sai->sai_task = task;
+
+ wake_up_process(task);
/*
* We don't stat-ahead for the first dirent since we are already in
* lookup.