* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
}
}
list_add(&entry->se_list, pos);
- entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC;
+ /*
+ * LU-9210: ll_statahead_interpret must be able to see this before
+ * we wake it up
+ */
+ smp_store_release(&entry->se_state, ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
return (index == sai->sai_index_wait);
}
+/*
+ * finish async stat RPC arguments: tear down what sa_prep_data() set up.
+ * ll_unlock_md_op_lsm() presumably drops the lsm lock taken when the
+ * op_data was prepared (NOTE(review): confirm pairing with
+ * ll_prep_md_op_data), iput() drops the igrab() reference on the parent
+ * dir, then the md_enqueue_info itself is freed.
+ */
+static void sa_fini_data(struct md_enqueue_info *minfo)
+{
+ ll_unlock_md_op_lsm(&minfo->mi_data);
+ iput(minfo->mi_dir);
+ OBD_FREE_PTR(minfo);
+}
+
+static int ll_statahead_interpret(struct ptlrpc_request *req,
+ struct md_enqueue_info *minfo, int rc);
+
+/*
+ * prepare arguments for async stat RPC: allocate an md_enqueue_info,
+ * fill in the IT_GETATTR intent, the interpret callback
+ * (ll_statahead_interpret) and the LDLM_IBITS enqueue info.
+ * Undone by sa_fini_data() on completion or error.
+ *
+ * \param dir    parent directory inode, pinned here via igrab()
+ * \param child  inode of the entry when already cached, or NULL to stat
+ *               by FID (op_fid2 is then taken from the sa_entry)
+ * \param entry  statahead entry this RPC is issued for
+ *
+ * \retval filled md_enqueue_info on success, ERR_PTR() on failure
+ */
+static struct md_enqueue_info *
+sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
+{
+ struct md_enqueue_info *minfo;
+ struct ldlm_enqueue_info *einfo;
+ struct md_op_data *op_data;
+
+ OBD_ALLOC_PTR(minfo);
+ if (minfo == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
+ entry->se_qstr.name, entry->se_qstr.len, 0,
+ LUSTRE_OPC_ANY, NULL);
+ if (IS_ERR(op_data)) {
+ OBD_FREE_PTR(minfo);
+ return (struct md_enqueue_info *)op_data;
+ }
+
+ /* no cached inode: ask the MDS to stat by the FID saved in the entry */
+ if (child == NULL)
+ op_data->op_fid2 = entry->se_fid;
+
+ minfo->mi_it.it_op = IT_GETATTR;
+ /* NOTE(review): igrab() returns NULL if dir is being evicted and the
+ * result is not checked here -- callers appear to guarantee dir stays
+ * live for the RPC lifetime; confirm */
+ minfo->mi_dir = igrab(dir);
+ minfo->mi_cb = ll_statahead_interpret;
+ minfo->mi_cbdata = entry;
+
+ einfo = &minfo->mi_einfo;
+ einfo->ei_type = LDLM_IBITS;
+ einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
+ einfo->ei_cb_bl = ll_md_blocking_ast;
+ einfo->ei_cb_cp = ldlm_completion_ast;
+ einfo->ei_cb_gl = NULL;
+ einfo->ei_cbdata = NULL;
+
+ return minfo;
+}
+
/*
* release resources used in async stat RPC, update entry state and wakeup if
* scanner process it waiting on this entry.
if (minfo) {
entry->se_minfo = NULL;
ll_intent_release(&minfo->mi_it);
- iput(minfo->mi_dir);
- OBD_FREE_PTR(minfo);
+ sa_fini_data(minfo);
}
if (req) {
static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
{
struct ll_inode_info *lli = ll_i2info(inode);
- __u64 index = lli->lli_agl_index;
+ u64 index = lli->lli_agl_index;
+ ktime_t expire;
int rc;
- ENTRY;
+ ENTRY;
LASSERT(list_empty(&lli->lli_agl_list));
/* AGL maybe fall behind statahead with one entry */
* relative rare. AGL can ignore such case, and it will not muchly
* affect the performance.
*/
- if (lli->lli_glimpse_time != 0 &&
- cfs_time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) {
+ expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
+ if (ktime_to_ns(lli->lli_glimpse_time) &&
+ ktime_before(expire, lli->lli_glimpse_time)) {
up_write(&lli->lli_glimpse_sem);
lli->lli_agl_index = 0;
iput(inode);
cl_agl(inode);
lli->lli_agl_index = 0;
- lli->lli_glimpse_time = cfs_time_current();
+ lli->lli_glimpse_time = ktime_get();
up_write(&lli->lli_glimpse_sem);
CDEBUG(D_READA, "Handled (init) async glimpse: inode= "
int rc = 0;
ENTRY;
- LASSERT(entry->se_handle != 0);
+ LASSERT(entry->se_handle != 0);
- minfo = entry->se_minfo;
- it = &minfo->mi_it;
- req = entry->se_req;
- body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
- if (body == NULL)
- GOTO(out, rc = -EFAULT);
+ minfo = entry->se_minfo;
+ it = &minfo->mi_it;
+ req = entry->se_req;
+ body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+ if (body == NULL)
+ GOTO(out, rc = -EFAULT);
child = entry->se_inode;
if (child != NULL) {
it->it_lock_handle = entry->se_handle;
rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
- if (rc != 1)
- GOTO(out, rc = -EAGAIN);
+ if (rc != 1)
+ GOTO(out, rc = -EAGAIN);
- rc = ll_prep_inode(&child, req, dir->i_sb, it);
- if (rc)
- GOTO(out, rc);
+ rc = ll_prep_inode(&child, req, dir->i_sb, it);
+ if (rc)
+ GOTO(out, rc);
CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
- ll_get_fsname(child->i_sb, NULL, 0),
- entry->se_qstr.len, entry->se_qstr.name,
- PFID(ll_inode2fid(child)), child);
- ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
+ ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
+ entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
+ ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
- entry->se_inode = child;
+ entry->se_inode = child;
- if (agl_should_run(sai, child))
- ll_agl_add(sai, child, entry->se_index);
+ if (agl_should_run(sai, child))
+ ll_agl_add(sai, child, entry->se_index);
- EXIT;
+ EXIT;
out:
/* sa_make_ready() will drop ldlm ibits lock refcount by calling
lli = ll_i2info(sai->sai_dentry->d_inode);
+ spin_lock(&lli->lli_sa_lock);
while (sa_has_callback(sai)) {
struct sa_entry *entry;
- spin_lock(&lli->lli_sa_lock);
- if (unlikely(!sa_has_callback(sai))) {
- spin_unlock(&lli->lli_sa_lock);
- break;
- }
entry = list_entry(sai->sai_interim_entries.next,
struct sa_entry, se_list);
list_del_init(&entry->se_list);
spin_unlock(&lli->lli_sa_lock);
sa_instantiate(sai, entry);
+ spin_lock(&lli->lli_sa_lock);
}
+ spin_unlock(&lli->lli_sa_lock);
}
/*
if (rc != 0) {
ll_intent_release(it);
- iput(dir);
- OBD_FREE_PTR(minfo);
+ sa_fini_data(minfo);
} else {
/* release ibits lock ASAP to avoid deadlock when statahead
* thread enqueues lock on parent in readdir and another
* unlink. */
handle = it->it_lock_handle;
ll_intent_drop_lock(it);
+ ll_unlock_md_op_lsm(&minfo->mi_data);
}
spin_lock(&lli->lli_sa_lock);
}
sai->sai_replied++;
- smp_mb();
if (waitq != NULL)
wake_up(waitq);
spin_unlock(&lli->lli_sa_lock);
RETURN(rc);
}
-/* finish async stat RPC arguments */
-static void sa_fini_data(struct md_enqueue_info *minfo)
-{
- iput(minfo->mi_dir);
- OBD_FREE_PTR(minfo);
-}
-
-/*
- * prepare arguments for async stat RPC.
- */
-static struct md_enqueue_info *
-sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
-{
- struct md_enqueue_info *minfo;
- struct ldlm_enqueue_info *einfo;
- struct md_op_data *op_data;
-
- OBD_ALLOC_PTR(minfo);
- if (minfo == NULL)
- return ERR_PTR(-ENOMEM);
-
- op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
- entry->se_qstr.name, entry->se_qstr.len, 0,
- LUSTRE_OPC_ANY, NULL);
- if (IS_ERR(op_data)) {
- OBD_FREE_PTR(minfo);
- return (struct md_enqueue_info *)op_data;
- }
-
- if (child == NULL)
- op_data->op_fid2 = entry->se_fid;
-
- minfo->mi_it.it_op = IT_GETATTR;
- minfo->mi_dir = igrab(dir);
- minfo->mi_cb = ll_statahead_interpret;
- minfo->mi_cbdata = entry;
-
- einfo = &minfo->mi_einfo;
- einfo->ei_type = LDLM_IBITS;
- einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
- einfo->ei_cb_bl = ll_md_blocking_ast;
- einfo->ei_cb_cp = ldlm_completion_ast;
- einfo->ei_cb_gl = NULL;
- einfo->ei_cbdata = NULL;
-
- return minfo;
-}
-
/* async stat for file not found in dcache */
static int sa_lookup(struct inode *dir, struct sa_entry *entry)
{
if (d_mountpoint(dentry))
RETURN(1);
+ minfo = sa_prep_data(dir, inode, entry);
+ if (IS_ERR(minfo))
+ RETURN(PTR_ERR(minfo));
+
entry->se_inode = igrab(inode);
rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
NULL);
if (rc == 1) {
entry->se_handle = it.it_lock_handle;
ll_intent_release(&it);
+ sa_fini_data(minfo);
RETURN(1);
}
- minfo = sa_prep_data(dir, inode, entry);
- if (IS_ERR(minfo)) {
- entry->se_inode = NULL;
- iput(inode);
- RETURN(PTR_ERR(minfo));
- }
-
rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
if (rc < 0) {
entry->se_inode = NULL;
struct ll_sb_info *sbi = ll_i2sbi(dir);
struct ll_statahead_info *sai;
struct ptlrpc_thread *thread;
- struct l_wait_info lwi = { 0 };
ENTRY;
-
sai = ll_sai_get(dir);
thread = &sai->sai_agl_thread;
thread->t_pid = current_pid();
spin_unlock(&plli->lli_agl_lock);
wake_up(&thread->t_ctl_waitq);
- while (1) {
- l_wait_event(thread->t_ctl_waitq,
- !agl_list_empty(sai) ||
- !thread_is_running(thread),
- &lwi);
-
- if (!thread_is_running(thread))
- break;
+ while (1) {
+ wait_event_idle(thread->t_ctl_waitq,
+ !agl_list_empty(sai) ||
+ !thread_is_running(thread));
+ if (!thread_is_running(thread))
+ break;
spin_lock(&plli->lli_agl_lock);
/* The statahead thread maybe help to process AGL entries,
list_del_init(&clli->lli_agl_list);
spin_unlock(&plli->lli_agl_lock);
ll_agl_trigger(&clli->lli_vfs_inode, sai);
+ cond_resched();
} else {
spin_unlock(&plli->lli_agl_lock);
}
static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
{
struct ptlrpc_thread *thread = &sai->sai_agl_thread;
- struct l_wait_info lwi = { 0 };
struct ll_inode_info *plli;
struct task_struct *task;
ENTRY;
RETURN_EXIT;
}
- l_wait_event(thread->t_ctl_waitq,
- thread_is_running(thread) || thread_is_stopped(thread),
- &lwi);
+ wait_event_idle(thread->t_ctl_waitq,
+ thread_is_running(thread) || thread_is_stopped(thread));
EXIT;
}
int first = 0;
struct md_op_data *op_data;
struct ll_dir_chain chain;
- struct l_wait_info lwi = { 0 };
struct page *page = NULL;
__u64 pos = 0;
int rc = 0;
CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
sai, parent->d_name.len, parent->d_name.name);
- op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
- LUSTRE_OPC_ANY, dir);
- if (IS_ERR(op_data))
- GOTO(out, rc = PTR_ERR(op_data));
+ OBD_ALLOC_PTR(op_data);
+ if (!op_data)
+ GOTO(out, rc = -ENOMEM);
if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
ll_start_agl(parent, sai);
struct lu_dirpage *dp;
struct lu_dirent *ent;
+ op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
+ LUSTRE_OPC_ANY, dir);
+ if (IS_ERR(op_data)) {
+ rc = PTR_ERR(op_data);
+ break;
+ }
+
sai->sai_in_readpage = 1;
page = ll_get_dir_page(dir, op_data, pos, &chain);
+ ll_unlock_md_op_lsm(op_data);
sai->sai_in_readpage = 0;
if (IS_ERR(page)) {
rc = PTR_ERR(page);
/* wait for spare statahead window */
do {
- l_wait_event(sa_thread->t_ctl_waitq,
- !sa_sent_full(sai) ||
- sa_has_callback(sai) ||
- !agl_list_empty(sai) ||
- !thread_is_running(sa_thread),
- &lwi);
+ wait_event_idle(sa_thread->t_ctl_waitq,
+ !sa_sent_full(sai) ||
+ sa_has_callback(sai) ||
+ !agl_list_empty(sai) ||
+ !thread_is_running(sa_thread));
sa_handle_callback(sai);
ll_agl_trigger(&clli->lli_vfs_inode,
sai);
-
+ cond_resched();
spin_lock(&lli->lli_agl_lock);
}
spin_unlock(&lli->lli_agl_lock);
/* statahead is finished, but statahead entries need to be cached, wait
* for file release to stop me. */
while (thread_is_running(sa_thread)) {
- l_wait_event(sa_thread->t_ctl_waitq,
- sa_has_callback(sai) ||
- !thread_is_running(sa_thread),
- &lwi);
+ wait_event_idle(sa_thread->t_ctl_waitq,
+ sa_has_callback(sai) ||
+ !thread_is_running(sa_thread));
sa_handle_callback(sai);
}
CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
sai, (unsigned int)agl_thread->t_pid);
- l_wait_event(agl_thread->t_ctl_waitq,
- thread_is_stopped(agl_thread),
- &lwi);
+ wait_event_idle(agl_thread->t_ctl_waitq,
+ thread_is_stopped(agl_thread));
} else {
/* Set agl_thread flags anyway. */
thread_set_flags(agl_thread, SVC_STOPPED);
* safely because statahead RPC will access sai data */
while (sai->sai_sent != sai->sai_replied) {
/* in case we're not woken up, timeout wait */
- lwi = LWI_TIMEOUT(msecs_to_jiffies(MSEC_PER_SEC >> 3),
- NULL, NULL);
- l_wait_event(sa_thread->t_ctl_waitq,
- sai->sai_sent == sai->sai_replied, &lwi);
+ wait_event_idle_timeout(sa_thread->t_ctl_waitq,
+ sai->sai_sent == sai->sai_replied,
+ cfs_time_seconds(1) >> 3);
}
/* release resources held by statahead RPCs */
struct ll_inode_info *lli = ll_i2info(dir);
rc = PTR_ERR(page);
- CERROR("%s: reading dir "DFID" at %llu"
- "opendir_pid = %u : rc = %d\n",
- ll_get_fsname(dir->i_sb, NULL, 0),
+ CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
+ ll_i2sbi(dir)->ll_fsname,
PFID(ll_inode2fid(dir)), pos,
lli->lli_opendir_pid, rc);
break;
bool unplug)
{
struct sa_entry *entry = NULL;
- struct l_wait_info lwi = { 0 };
struct ll_dentry_data *ldd;
struct ll_inode_info *lli = ll_i2info(dir);
int rc = 0;
spin_lock(&lli->lli_sa_lock);
sai->sai_index_wait = entry->se_index;
spin_unlock(&lli->lli_sa_lock);
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
- LWI_ON_SIGNAL_NOOP, NULL);
- rc = l_wait_event(sai->sai_waitq, sa_ready(entry), &lwi);
- if (rc < 0) {
+ rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
+ cfs_time_seconds(30));
+ if (rc == 0) {
/*
* entry may not be ready, so it may be used by inflight
* statahead RPC, don't free it.
}
}
- if (entry->se_state == SA_ENTRY_SUCC && entry->se_inode != NULL) {
+ /*
+ * We need to see the value that was set immediately before we
+ * were woken up.
+ */
+ if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
+ entry->se_inode) {
struct inode *inode = entry->se_inode;
struct lookup_intent it = { .it_op = IT_GETATTR,
.it_lock_handle =
"%s: stale dentry %.*s inode "
DFID", statahead inode "DFID
"\n",
- ll_get_fsname((*dentryp)->d_inode->i_sb,
- NULL, 0),
+ ll_i2sbi(inode)->ll_fsname,
(*dentryp)->d_name.len,
(*dentryp)->d_name.name,
PFID(ll_inode2fid((*dentryp)->d_inode)),
struct ll_statahead_info *sai = NULL;
struct dentry *parent = dentry->d_parent;
struct ptlrpc_thread *thread;
- struct l_wait_info lwi = { 0 };
struct task_struct *task;
- int rc;
+ struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
+ int first = LS_FIRST_DE;
+ int rc = 0;
ENTRY;
/* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
- rc = is_first_dirent(dir, dentry);
- if (rc == LS_NOT_FIRST_DE)
+ first = is_first_dirent(dir, dentry);
+ if (first == LS_NOT_FIRST_DE)
/* It is not "ls -{a}l" operation, no need statahead for it. */
GOTO(out, rc = -EFAULT);
+ if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
+ sbi->ll_sa_running_max)) {
+ CDEBUG(D_READA,
+ "Too many concurrent statahead instances, "
+ "avoid new statahead instance temporarily.\n");
+ GOTO(out, rc = -EMFILE);
+ }
+
sai = ll_sai_alloc(parent);
if (sai == NULL)
GOTO(out, rc = -ENOMEM);
- sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
+ sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
/* if current lli_opendir_key was deauthorized, or dir re-opened by
* another process, don't start statahead, otherwise the newly spawned
lli->lli_sai = sai;
spin_unlock(&lli->lli_sa_lock);
- atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running);
-
CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n",
current_pid(), parent->d_name.len, parent->d_name.name);
lli->lli_opendir_pid);
thread = &sai->sai_thread;
if (IS_ERR(task)) {
+ spin_lock(&lli->lli_sa_lock);
+ lli->lli_sai = NULL;
+ spin_unlock(&lli->lli_sa_lock);
rc = PTR_ERR(task);
CERROR("can't start ll_sa thread, rc: %d\n", rc);
GOTO(out, rc);
}
- l_wait_event(thread->t_ctl_waitq,
- thread_is_running(thread) || thread_is_stopped(thread),
- &lwi);
+ wait_event_idle(thread->t_ctl_waitq,
+ thread_is_running(thread) || thread_is_stopped(thread));
ll_sai_put(sai);
/*
/* once we start statahead thread failed, disable statahead so that
* subsequent stat won't waste time to try it. */
spin_lock(&lli->lli_sa_lock);
- lli->lli_sa_enabled = 0;
- lli->lli_sai = NULL;
+ if (lli->lli_opendir_pid == current->pid)
+ lli->lli_sa_enabled = 0;
spin_unlock(&lli->lli_sa_lock);
if (sai != NULL)
ll_sai_free(sai);
+ if (first != LS_NOT_FIRST_DE)
+ atomic_dec(&sbi->ll_sa_running);
RETURN(rc);
}