cfs_spin_unlock(&sai->sai_cache_lock[i]);
}
+static inline int agl_should_run(struct ll_statahead_info *sai,
+ struct inode *inode)
+{
+ if (inode != NULL && S_ISREG(inode->i_mode) &&
+ ll_i2info(inode)->lli_smd != NULL && sai->sai_agl_valid)
+ return 1;
+ return 0;
+}
+
+static inline struct ll_sa_entry *
+sa_first_received_entry(struct ll_statahead_info *sai)
+{
+ return cfs_list_entry(sai->sai_entries_received.next,
+ struct ll_sa_entry, se_list);
+}
+
+static inline struct ll_inode_info *
+agl_first_entry(struct ll_statahead_info *sai)
+{
+ return cfs_list_entry(sai->sai_entries_agl.next,
+ struct ll_inode_info, lli_agl_list);
+}
+
+static inline int sa_sent_full(struct ll_statahead_info *sai)
+{
+ return cfs_atomic_read(&sai->sai_cache_count) >= sai->sai_max;
+}
+
static inline int sa_received_empty(struct ll_statahead_info *sai)
{
return cfs_list_empty(&sai->sai_entries_received);
}
-static inline int sa_not_full(struct ll_statahead_info *sai)
+static inline int agl_list_empty(struct ll_statahead_info *sai)
{
- return (cfs_atomic_read(&sai->sai_cache_count) < sai->sai_max);
+ return cfs_list_empty(&sai->sai_entries_agl);
}
/**
return ret;
}
+/*
+ * Insert inode into the list of sai_entries_agl.
+ */
+static void ll_agl_add(struct ll_statahead_info *sai,
+ struct inode *inode, int index)
+{
+ struct ll_inode_info *child = ll_i2info(inode);
+ struct ll_inode_info *parent = ll_i2info(sai->sai_inode);
+ int added = 0;
+
+ cfs_spin_lock(&child->lli_agl_lock);
+ if (child->lli_agl_index == 0) {
+ child->lli_agl_index = index;
+ cfs_spin_unlock(&child->lli_agl_lock);
+
+ LASSERT(cfs_list_empty(&child->lli_agl_list));
+
+ igrab(inode);
+ cfs_spin_lock(&parent->lli_agl_lock);
+ if (agl_list_empty(sai))
+ added = 1;
+ cfs_list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl);
+ cfs_spin_unlock(&parent->lli_agl_lock);
+ } else {
+ cfs_spin_unlock(&child->lli_agl_lock);
+ }
+
+ if (added > 0)
+ cfs_waitq_signal(&sai->sai_agl_thread.t_ctl_waitq);
+}
+
static struct ll_statahead_info *ll_sai_alloc(void)
{
struct ll_statahead_info *sai;
RETURN(NULL);
cfs_atomic_set(&sai->sai_refcount, 1);
+
cfs_spin_lock(&sai_generation_lock);
sai->sai_generation = ++sai_generation;
if (unlikely(sai_generation == 0))
sai->sai_generation = ++sai_generation;
cfs_spin_unlock(&sai_generation_lock);
+
sai->sai_max = LL_SA_RPC_MIN;
+ sai->sai_index = 1;
cfs_waitq_init(&sai->sai_waitq);
cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
+ cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq);
+
CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
+ CFS_INIT_LIST_HEAD(&sai->sai_entries_agl);
+
for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
CFS_INIT_LIST_HEAD(&sai->sai_cache[i]);
cfs_spin_lock_init(&sai->sai_cache_lock[i]);
}
LASSERT(lli->lli_opendir_key == NULL);
+ LASSERT(thread_is_stopped(&sai->sai_thread));
+ LASSERT(thread_is_stopped(&sai->sai_agl_thread));
+
lli->lli_sai = NULL;
lli->lli_opendir_pid = 0;
cfs_spin_unlock(&lli->lli_sa_lock);
- LASSERT(thread_is_stopped(&sai->sai_thread));
-
if (sai->sai_sent > sai->sai_replied)
CDEBUG(D_READA,"statahead for dir "DFID" does not "
"finish: [sent:"LPU64"] [replied:"LPU64"]\n",
do_sai_entry_fini(sai, entry);
LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0);
+ LASSERT(agl_list_empty(sai));
iput(inode);
OBD_FREE_PTR(sai);
EXIT;
}
+#ifndef HAVE_AGL_SUPPORT
+# define cl_agl(inode) do {} while (0)
+#endif
+
+/* Do NOT forget to drop inode refcount when into sai_entries_agl. */
+static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ __u64 index = lli->lli_agl_index;
+ int rc;
+ ENTRY;
+
+ LASSERT(cfs_list_empty(&lli->lli_agl_list));
+
+ /* AGL maybe fall behind statahead with one entry */
+ if (is_omitted_entry(sai, index + 1)) {
+ lli->lli_agl_index = 0;
+ iput(inode);
+ RETURN_EXIT;
+ }
+
+ /* Someone is in glimpse (sync or async), do nothing. */
+ rc = cfs_down_write_trylock(&lli->lli_glimpse_sem);
+ if (rc == 0) {
+ lli->lli_agl_index = 0;
+ iput(inode);
+ RETURN_EXIT;
+ }
+
+ /*
+ * Someone triggered glimpse within 1 sec before.
+ * 1) The former glimpse succeeded with glimpse lock granted by OST, and
+ * if the lock is still cached on client, AGL needs to do nothing. If
+ * it is cancelled by other client, AGL maybe cannot obtaion new lock
+ * for no glimpse callback triggered by AGL.
+ * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
+ * Under such case, it is quite possible that the OST will not grant
+ * glimpse lock for AGL also.
+ * 3) The former glimpse failed, compared with other two cases, it is
+ * relative rare. AGL can ignore such case, and it will not muchly
+ * affect the performance.
+ */
+ if (lli->lli_glimpse_time != 0 &&
+ cfs_time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) {
+ cfs_up_write(&lli->lli_glimpse_sem);
+ lli->lli_agl_index = 0;
+ iput(inode);
+ RETURN_EXIT;
+ }
+
+ CDEBUG(D_READA, "Handling (init) async glimpse: inode = "
+ DFID", idx = "LPU64"\n", PFID(&lli->lli_fid), index);
+
+ cl_agl(inode);
+ lli->lli_agl_index = 0;
+ lli->lli_glimpse_time = cfs_time_current();
+ cfs_up_write(&lli->lli_glimpse_sem);
+
+ CDEBUG(D_READA, "Handled (init) async glimpse: inode= "
+ DFID", idx = "LPU64", rc = %d\n",
+ PFID(&lli->lli_fid), index, rc);
+
+ iput(inode);
+
+ EXIT;
+}
+
static void do_statahead_interpret(struct ll_statahead_info *sai,
struct ll_sa_entry *target)
{
cfs_spin_unlock(&lli->lli_sa_lock);
RETURN_EXIT;
} else {
- entry = cfs_list_entry(sai->sai_entries_received.next,
- struct ll_sa_entry, se_list);
+ entry = sa_first_received_entry(sai);
}
cfs_atomic_inc(&entry->se_refcount);
entry->se_inode = child;
+ if (agl_should_run(sai, child))
+ ll_agl_add(sai, child, entry->se_index);
+
EXIT;
out:
* "md_intent_getattr_async".
*/
static int sa_args_init(struct inode *dir, struct inode *child,
- struct qstr *qstr, struct md_enqueue_info **pmi,
+ struct ll_sa_entry *entry, struct md_enqueue_info **pmi,
struct ldlm_enqueue_info **pei,
struct obd_capa **pcapa)
{
- struct ll_inode_info *lli = ll_i2info(dir);
+ struct qstr *qstr = &entry->se_qstr;
+ struct ll_inode_info *lli = ll_i2info(dir);
struct md_enqueue_info *minfo;
struct ldlm_enqueue_info *einfo;
struct md_op_data *op_data;
minfo->mi_dir = igrab(dir);
minfo->mi_cb = ll_statahead_interpret;
minfo->mi_generation = lli->lli_sai->sai_generation;
- minfo->mi_cbdata = lli->lli_sai->sai_index;
+ minfo->mi_cbdata = entry->se_index;
einfo->ei_type = LDLM_IBITS;
einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
int rc;
ENTRY;
- rc = sa_args_init(dir, NULL, &entry->se_qstr, &minfo, &einfo, capas);
+ rc = sa_args_init(dir, NULL, entry, &minfo, &einfo, capas);
if (rc)
RETURN(rc);
RETURN(1);
}
- rc = sa_args_init(dir, inode, &entry->se_qstr, &minfo, &einfo, capas);
+ rc = sa_args_init(dir, inode, entry, &minfo, &einfo, capas);
if (rc) {
entry->se_inode = NULL;
iput(inode);
static void ll_statahead_one(struct dentry *parent, const char* entry_name,
int entry_name_len)
{
- struct inode *dir = parent->d_inode;
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai = lli->lli_sai;
+ struct inode *dir = parent->d_inode;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_info *sai = lli->lli_sai;
struct dentry *dentry = NULL;
struct ll_sa_entry *entry;
int rc;
RETURN_EXIT;
dentry = d_lookup(parent, &entry->se_qstr);
- if (!dentry)
+ if (!dentry) {
rc = do_sa_lookup(dir, entry);
- else
+ } else {
rc = do_sa_revalidate(dir, entry, dentry);
+ if (rc == 1 && agl_should_run(sai, dentry->d_inode))
+ ll_agl_add(sai, dentry->d_inode, entry->se_index);
+ }
if (dentry != NULL)
dput(dentry);
EXIT;
}
+static int ll_agl_thread(void *arg)
+{
+ struct dentry *parent = (struct dentry *)arg;
+ struct inode *dir = parent->d_inode;
+ struct ll_inode_info *plli = ll_i2info(dir);
+ struct ll_inode_info *clli;
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
+ struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai);
+ struct ptlrpc_thread *thread = &sai->sai_agl_thread;
+ struct l_wait_info lwi = { 0 };
+ ENTRY;
+
+ {
+ char pname[16];
+ snprintf(pname, 15, "ll_agl_%u", plli->lli_opendir_pid);
+ cfs_daemonize(pname);
+ }
+
+ CDEBUG(D_READA, "agl thread started: [pid %d] [parent %.*s]\n",
+ cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+
+ atomic_inc(&sbi->ll_agl_total);
+ cfs_spin_lock(&plli->lli_agl_lock);
+ sai->sai_agl_valid = 1;
+ thread_set_flags(thread, SVC_RUNNING);
+ cfs_spin_unlock(&plli->lli_agl_lock);
+ cfs_waitq_signal(&thread->t_ctl_waitq);
+
+ while (1) {
+ l_wait_event(thread->t_ctl_waitq,
+ !agl_list_empty(sai) ||
+ !thread_is_running(thread),
+ &lwi);
+
+ if (!thread_is_running(thread))
+ break;
+
+ cfs_spin_lock(&plli->lli_agl_lock);
+ /* The statahead thread maybe help to process AGL entries,
+ * so check whether list empty again. */
+ if (!agl_list_empty(sai)) {
+ clli = agl_first_entry(sai);
+ cfs_list_del_init(&clli->lli_agl_list);
+ cfs_spin_unlock(&plli->lli_agl_lock);
+ ll_agl_trigger(&clli->lli_vfs_inode, sai);
+ } else {
+ cfs_spin_unlock(&plli->lli_agl_lock);
+ }
+ }
+
+ cfs_spin_lock(&plli->lli_agl_lock);
+ sai->sai_agl_valid = 0;
+ while (!agl_list_empty(sai)) {
+ clli = agl_first_entry(sai);
+ cfs_list_del_init(&clli->lli_agl_list);
+ cfs_spin_unlock(&plli->lli_agl_lock);
+ clli->lli_agl_index = 0;
+ iput(&clli->lli_vfs_inode);
+ cfs_spin_lock(&plli->lli_agl_lock);
+ }
+ thread_set_flags(thread, SVC_STOPPED);
+ cfs_spin_unlock(&plli->lli_agl_lock);
+ cfs_waitq_signal(&thread->t_ctl_waitq);
+ ll_sai_put(sai);
+ CDEBUG(D_READA, "agl thread stopped: [pid %d] [parent %.*s]\n",
+ cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+ RETURN(0);
+}
+
+static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
+{
+ struct ptlrpc_thread *thread = &sai->sai_agl_thread;
+ struct l_wait_info lwi = { 0 };
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n",
+ cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+
+ rc = cfs_create_thread(ll_agl_thread, parent, 0);
+ if (rc < 0) {
+ CERROR("can't start ll_agl thread, rc: %d\n", rc);
+ RETURN_EXIT;
+ }
+
+ l_wait_event(thread->t_ctl_waitq,
+ thread_is_running(thread) || thread_is_stopped(thread),
+ &lwi);
+ EXIT;
+}
+
static int ll_statahead_thread(void *arg)
{
struct dentry *parent = (struct dentry *)arg;
- struct inode *dir = parent->d_inode;
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_sb_info *sbi = ll_i2sbi(dir);
- struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
+ struct inode *dir = parent->d_inode;
+ struct ll_inode_info *plli = ll_i2info(dir);
+ struct ll_inode_info *clli;
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
+ struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai);
struct ptlrpc_thread *thread = &sai->sai_thread;
struct page *page;
- __u64 pos = 0;
- int first = 0;
- int rc = 0;
+ __u64 pos = 0;
+ int first = 0;
+ int rc = 0;
struct ll_dir_chain chain;
+ struct l_wait_info lwi = { 0 };
ENTRY;
{
char pname[16];
- snprintf(pname, 15, "ll_sa_%u", lli->lli_opendir_pid);
+ snprintf(pname, 15, "ll_sa_%u", plli->lli_opendir_pid);
cfs_daemonize(pname);
}
+ CDEBUG(D_READA, "statahead thread started: [pid %d] [parent %.*s]\n",
+ cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+
+ if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
+ ll_start_agl(parent, sai);
+
atomic_inc(&sbi->ll_sa_total);
- cfs_spin_lock(&lli->lli_sa_lock);
+ cfs_spin_lock(&plli->lli_sa_lock);
thread_set_flags(thread, SVC_RUNNING);
- cfs_spin_unlock(&lli->lli_sa_lock);
+ cfs_spin_unlock(&plli->lli_sa_lock);
cfs_waitq_signal(&thread->t_ctl_waitq);
- CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
- lli->lli_sa_pos = 0;
+ plli->lli_sa_pos = 0;
ll_dir_chain_init(&chain);
page = ll_get_dir_page(NULL, dir, pos, &chain);
while (1) {
- struct l_wait_info lwi = { 0 };
struct lu_dirpage *dp;
struct lu_dirent *ent;
CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
"/"LPU64": [rc %d] [parent %u]\n",
PFID(ll_inode2fid(dir)), pos, sai->sai_index,
- rc, lli->lli_opendir_pid);
+ rc, plli->lli_opendir_pid);
GOTO(out, rc);
}
if (unlikely(++first == 1))
continue;
-keep_de:
+keep_it:
l_wait_event(thread->t_ctl_waitq,
- sa_not_full(sai) ||
+ !sa_sent_full(sai) ||
!sa_received_empty(sai) ||
+ !agl_list_empty(sai) ||
!thread_is_running(thread),
&lwi);
+interpret_it:
while (!sa_received_empty(sai))
do_statahead_interpret(sai, NULL);
if (unlikely(!thread_is_running(thread))) {
ll_release_page(page, 0);
- GOTO(out, rc);
+ GOTO(out, rc = 0);
}
- if (!sa_not_full(sai))
- /*
- * do not skip the current de.
- */
- goto keep_de;
+ /* If no window for metadata statahead, but there are
+ * some AGL entries to be triggered, then try to help
+ * to process the AGL entries. */
+ if (sa_sent_full(sai)) {
+ cfs_spin_lock(&plli->lli_agl_lock);
+ while (!agl_list_empty(sai)) {
+ clli = agl_first_entry(sai);
+ cfs_list_del_init(&clli->lli_agl_list);
+ cfs_spin_unlock(&plli->lli_agl_lock);
+ ll_agl_trigger(&clli->lli_vfs_inode,
+ sai);
+
+ if (!sa_received_empty(sai))
+ goto interpret_it;
+
+ if (unlikely(
+ !thread_is_running(thread))) {
+ ll_release_page(page, 0);
+ GOTO(out, rc = 0);
+ }
+
+ if (!sa_sent_full(sai))
+ goto do_it;
+
+ cfs_spin_lock(&plli->lli_agl_lock);
+ }
+ cfs_spin_unlock(&plli->lli_agl_lock);
+
+ goto keep_it;
+ }
+do_it:
ll_statahead_one(parent, name, namelen);
}
pos = le64_to_cpu(dp->ldp_hash_end);
while (!sa_received_empty(sai))
do_statahead_interpret(sai, NULL);
- if ((sai->sai_sent == sai->sai_replied &&
- sa_received_empty(sai)) ||
- !thread_is_running(thread))
+ if (unlikely(!thread_is_running(thread)))
GOTO(out, rc = 0);
+
+ if (sai->sai_sent == sai->sai_replied &&
+ sa_received_empty(sai))
+ break;
+ }
+
+ cfs_spin_lock(&plli->lli_agl_lock);
+ while (!agl_list_empty(sai) &&
+ thread_is_running(thread)) {
+ clli = agl_first_entry(sai);
+ cfs_list_del_init(&clli->lli_agl_list);
+ cfs_spin_unlock(&plli->lli_agl_lock);
+ ll_agl_trigger(&clli->lli_vfs_inode, sai);
+ cfs_spin_lock(&plli->lli_agl_lock);
}
+ cfs_spin_unlock(&plli->lli_agl_lock);
+
+ GOTO(out, rc = 0);
} else if (1) {
/*
* chain is exhausted.
*/
ll_release_page(page, le32_to_cpu(dp->ldp_flags) &
LDF_COLLIDE);
- lli->lli_sa_pos = pos;
+ plli->lli_sa_pos = pos;
sai->sai_in_readpage = 1;
page = ll_get_dir_page(NULL, dir, pos, &chain);
sai->sai_in_readpage = 0;
EXIT;
out:
+ if (sai->sai_agl_valid) {
+ struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread;
+
+ cfs_spin_lock(&plli->lli_agl_lock);
+ thread_set_flags(agl_thread, SVC_STOPPING);
+ cfs_spin_unlock(&plli->lli_agl_lock);
+ cfs_waitq_signal(&agl_thread->t_ctl_waitq);
+
+ CDEBUG(D_READA, "stop agl thread: [pid %d]\n",
+ cfs_curproc_pid());
+ l_wait_event(agl_thread->t_ctl_waitq,
+ thread_is_stopped(agl_thread),
+ &lwi);
+ }
+
ll_dir_chain_fini(&chain);
- cfs_spin_lock(&lli->lli_sa_lock);
+ cfs_spin_lock(&plli->lli_sa_lock);
if (!sa_received_empty(sai)) {
thread_set_flags(thread, SVC_STOPPING);
- cfs_spin_unlock(&lli->lli_sa_lock);
+ cfs_spin_unlock(&plli->lli_sa_lock);
/* To release the resources held by received entries. */
while (!sa_received_empty(sai))
do_statahead_interpret(sai, NULL);
- cfs_spin_lock(&lli->lli_sa_lock);
+ cfs_spin_lock(&plli->lli_sa_lock);
}
thread_set_flags(thread, SVC_STOPPED);
- cfs_spin_unlock(&lli->lli_sa_lock);
+ cfs_spin_unlock(&plli->lli_sa_lock);
cfs_waitq_signal(&sai->sai_waitq);
cfs_waitq_signal(&thread->t_ctl_waitq);
ll_sai_put(sai);
dput(parent);
- CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
- cfs_curproc_pid());
+ CDEBUG(D_READA, "statahead thread stopped: [pid %d] [parent %.*s]\n",
+ cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
return rc;
}
cfs_spin_unlock(&lli->lli_sa_lock);
cfs_waitq_signal(&thread->t_ctl_waitq);
- CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
+ CDEBUG(D_READA, "stop statahead thread: [pid %d]\n",
cfs_curproc_pid());
l_wait_event(thread->t_ctl_waitq,
thread_is_stopped(thread),
static int is_first_dirent(struct inode *dir, struct dentry *dentry)
{
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_dir_chain chain;
- struct qstr *target = &dentry->d_name;
- struct page *page;
- __u64 pos = 0;
- int dot_de;
- int rc = LS_NONE_FIRST_DE;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_dir_chain chain;
+ struct qstr *target = &dentry->d_name;
+ struct page *page;
+ __u64 pos = 0;
+ int dot_de;
+ int rc = LS_NONE_FIRST_DE;
ENTRY;
lli->lli_sa_pos = 0;
{
struct ptlrpc_thread *thread = &sai->sai_thread;
struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
- int hit;
+ int hit;
ENTRY;
if (entry != NULL && entry->se_stat == SA_ENTRY_SUCC)
GOTO(out, rc = -EAGAIN);
}
+ CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n",
+ cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+
lli->lli_sai = sai;
rc = cfs_create_thread(ll_statahead_thread, parent, 0);
thread = &sai->sai_thread;