* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
} se_stat_t;
struct ll_sa_entry {
- /* link into sai->sai_entries_{sent,received,stated} */
- cfs_list_t se_list;
- /* link into sai hash table locally */
- cfs_list_t se_hash;
- /* entry reference count */
- cfs_atomic_t se_refcount;
- /* entry index in the sai */
- __u64 se_index;
- /* low layer ldlm lock handle */
- __u64 se_handle;
- /* entry status */
- se_stat_t se_stat;
- /* entry size, contains name */
- int se_size;
- /* pointer to async getattr enqueue info */
- struct md_enqueue_info *se_minfo;
- /* pointer to the async getattr request */
- struct ptlrpc_request *se_req;
- /* pointer to the target inode */
- struct inode *se_inode;
- /* entry name */
- struct qstr se_qstr;
+ /* link into sai->sai_entries */
+ cfs_list_t se_link;
+ /* link into sai->sai_entries_{received,stated} */
+ cfs_list_t se_list;
+ /* link into sai hash table locally */
+ cfs_list_t se_hash;
+ /* entry reference count */
+ cfs_atomic_t se_refcount;
+ /* entry index in the sai */
+ __u64 se_index;
+ /* low layer ldlm lock handle */
+ __u64 se_handle;
+ /* entry status */
+ se_stat_t se_stat;
+ /* entry size, contains name */
+ int se_size;
+ /* pointer to async getattr enqueue info */
+ struct md_enqueue_info *se_minfo;
+ /* pointer to the async getattr request */
+ struct ptlrpc_request *se_req;
+ /* pointer to the target inode */
+ struct inode *se_inode;
+ /* entry name */
+ struct qstr se_qstr;
};
static unsigned int sai_generation = 0;
static DEFINE_SPINLOCK(sai_generation_lock);
-static inline int ll_sa_entry_unlinked(struct ll_sa_entry *entry)
-{
- return cfs_list_empty(&entry->se_list);
-}
-
static inline int ll_sa_entry_unhashed(struct ll_sa_entry *entry)
{
return cfs_list_empty(&entry->se_hash);
}
/*
- * Insert it into sai_entries_sent tail when init.
+ * Insert the new entry at the tail of sai_entries when it is initialized.
*/
static struct ll_sa_entry *
ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index,
if (unlikely(entry == NULL))
RETURN(ERR_PTR(-ENOMEM));
- CDEBUG(D_READA, "alloc sai entry %.*s(%p) index "LPU64"\n",
- len, name, entry, index);
+ CDEBUG(D_READA, "alloc sa entry %.*s(%p) index "LPU64"\n",
+ len, name, entry, index);
entry->se_index = index;
lli = ll_i2info(sai->sai_inode);
spin_lock(&lli->lli_sa_lock);
- cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent);
+ cfs_list_add_tail(&entry->se_link, &sai->sai_entries);
+ CFS_INIT_LIST_HEAD(&entry->se_list);
+ ll_sa_entry_enhash(sai, entry);
spin_unlock(&lli->lli_sa_lock);
cfs_atomic_inc(&sai->sai_cache_count);
- ll_sa_entry_enhash(sai, entry);
RETURN(entry);
}
static struct ll_sa_entry *
ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index)
{
- struct ll_sa_entry *entry;
+ struct ll_sa_entry *entry;
- cfs_list_for_each_entry(entry, &sai->sai_entries_sent, se_list) {
- if (entry->se_index == index) {
- cfs_atomic_inc(&entry->se_refcount);
- return entry;
+ cfs_list_for_each_entry(entry, &sai->sai_entries, se_link) {
+ if (entry->se_index == index) {
+ LASSERT(atomic_read(&entry->se_refcount) > 0);
+ cfs_atomic_inc(&entry->se_refcount);
+ return entry;
}
- if (entry->se_index > index)
- break;
- }
- return NULL;
+ if (entry->se_index > index)
+ break;
+ }
+ return NULL;
}
static void ll_sa_entry_cleanup(struct ll_statahead_info *sai,
static void ll_sa_entry_put(struct ll_statahead_info *sai,
struct ll_sa_entry *entry)
{
- if (cfs_atomic_dec_and_test(&entry->se_refcount)) {
- CDEBUG(D_READA, "free sai entry %.*s(%p) index "LPU64"\n",
- entry->se_qstr.len, entry->se_qstr.name, entry,
- entry->se_index);
+ if (cfs_atomic_dec_and_test(&entry->se_refcount)) {
+ CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n",
+ entry->se_qstr.len, entry->se_qstr.name, entry,
+ entry->se_index);
- LASSERT(ll_sa_entry_unhashed(entry));
- LASSERT(ll_sa_entry_unlinked(entry));
+ LASSERT(cfs_list_empty(&entry->se_link));
+ LASSERT(cfs_list_empty(&entry->se_list));
+ LASSERT(ll_sa_entry_unhashed(entry));
- ll_sa_entry_cleanup(sai, entry);
- if (entry->se_inode)
- iput(entry->se_inode);
+ ll_sa_entry_cleanup(sai, entry);
+ if (entry->se_inode)
+ iput(entry->se_inode);
- OBD_FREE(entry, entry->se_size);
- cfs_atomic_dec(&sai->sai_cache_count);
- }
+ OBD_FREE(entry, entry->se_size);
+ cfs_atomic_dec(&sai->sai_cache_count);
+ }
}
static inline void
-do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
+do_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
{
struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
+ LASSERT(!ll_sa_entry_unhashed(entry));
+ LASSERT(!cfs_list_empty(&entry->se_link));
+
ll_sa_entry_unhash(sai, entry);
spin_lock(&lli->lli_sa_lock);
entry->se_stat = SA_ENTRY_DEST;
- if (likely(!ll_sa_entry_unlinked(entry)))
+ cfs_list_del_init(&entry->se_link);
+ if (likely(!cfs_list_empty(&entry->se_list)))
cfs_list_del_init(&entry->se_list);
spin_unlock(&lli->lli_sa_lock);
static void
ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
{
- struct ll_sa_entry *pos, *next;
+ struct ll_sa_entry *pos, *next;
- if (entry)
- do_sai_entry_fini(sai, entry);
+ if (entry)
+ do_sa_entry_fini(sai, entry);
- /* drop old entry from sent list */
- cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_sent,
- se_list) {
- if (is_omitted_entry(sai, pos->se_index))
- do_sai_entry_fini(sai, pos);
- else
- break;
- }
-
- /* drop old entry from stated list */
- cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_stated,
- se_list) {
- if (is_omitted_entry(sai, pos->se_index))
- do_sai_entry_fini(sai, pos);
- else
- break;
- }
+ /* drop old entries; only the 'scanner' process does this, so no
+ * locking is needed */
+ cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries, se_link) {
+ if (!is_omitted_entry(sai, pos->se_index))
+ break;
+ do_sa_entry_fini(sai, pos);
+ }
}
/*
* Inside lli_sa_lock.
*/
static void
-do_sai_entry_to_stated(struct ll_statahead_info *sai,
- struct ll_sa_entry *entry, int rc)
+do_sa_entry_to_stated(struct ll_statahead_info *sai,
+ struct ll_sa_entry *entry, se_stat_t stat)
{
- struct ll_sa_entry *se;
- cfs_list_t *pos = &sai->sai_entries_stated;
+ struct ll_sa_entry *se;
+ cfs_list_t *pos = &sai->sai_entries_stated;
- if (!ll_sa_entry_unlinked(entry))
- cfs_list_del_init(&entry->se_list);
+ if (!cfs_list_empty(&entry->se_list))
+ cfs_list_del_init(&entry->se_list);
- cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
- if (se->se_index < entry->se_index) {
- pos = &se->se_list;
- break;
- }
- }
+ cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
+ if (se->se_index < entry->se_index) {
+ pos = &se->se_list;
+ break;
+ }
+ }
- cfs_list_add(&entry->se_list, pos);
- entry->se_stat = rc;
+ cfs_list_add(&entry->se_list, pos);
+ entry->se_stat = stat;
}
/*
*/
static int
ll_sa_entry_to_stated(struct ll_statahead_info *sai,
- struct ll_sa_entry *entry, int rc)
+ struct ll_sa_entry *entry, se_stat_t stat)
{
struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
int ret = 1;
spin_lock(&lli->lli_sa_lock);
if (likely(entry->se_stat != SA_ENTRY_DEST)) {
- do_sai_entry_to_stated(sai, entry, rc);
+ do_sa_entry_to_stated(sai, entry, stat);
ret = 0;
}
spin_unlock(&lli->lli_sa_lock);
cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq);
- CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
+ CFS_INIT_LIST_HEAD(&sai->sai_entries);
CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
CFS_INIT_LIST_HEAD(&sai->sai_entries_agl);
PFID(&lli->lli_fid),
sai->sai_sent, sai->sai_replied);
- cfs_list_for_each_entry_safe(entry, next,
- &sai->sai_entries_sent, se_list)
- do_sai_entry_fini(sai, entry);
+ cfs_list_for_each_entry_safe(entry, next,
+ &sai->sai_entries, se_link)
+ do_sa_entry_fini(sai, entry);
- LASSERT(sa_received_empty(sai));
-
- cfs_list_for_each_entry_safe(entry, next,
- &sai->sai_entries_stated, se_list)
- do_sai_entry_fini(sai, entry);
+ LASSERT(list_empty(&sai->sai_entries));
+ LASSERT(sa_received_empty(sai));
+ LASSERT(list_empty(&sai->sai_entries_stated));
LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0);
LASSERT(agl_list_empty(sai));
EXIT;
}
-static void do_statahead_interpret(struct ll_statahead_info *sai,
- struct ll_sa_entry *target)
+static void ll_post_statahead(struct ll_statahead_info *sai)
{
struct inode *dir = sai->sai_inode;
struct inode *child;
ENTRY;
spin_lock(&lli->lli_sa_lock);
- if (target != NULL && target->se_req != NULL &&
- !cfs_list_empty(&target->se_list)) {
- entry = target;
- } else if (unlikely(sa_received_empty(sai))) {
+ if (unlikely(sa_received_empty(sai))) {
spin_unlock(&lli->lli_sa_lock);
RETURN_EXIT;
- } else {
- entry = sa_first_received_entry(sai);
}
-
+ entry = sa_first_received_entry(sai);
cfs_atomic_inc(&entry->se_refcount);
cfs_list_del_init(&entry->se_list);
spin_unlock(&lli->lli_sa_lock);
}
it->d.lustre.it_lock_handle = entry->se_handle;
- rc = md_revalidate_lock(ll_i2mdexp(dir), it, NULL, NULL);
+ rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
if (rc != 1)
GOTO(out, rc = -EAGAIN);
* reference count by calling "ll_intent_drop_lock()" in spite of the
* above operations failed or not. Do not worry about calling
* "ll_intent_drop_lock()" more than once. */
- rc = ll_sa_entry_to_stated(sai, entry, rc < 0 ? rc : SA_ENTRY_SUCC);
- if (rc == 0 && entry->se_index == sai->sai_index_wait && target == NULL)
+ rc = ll_sa_entry_to_stated(sai, entry,
+ rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
+ if (rc == 0 && entry->se_index == sai->sai_index_wait)
cfs_waitq_signal(&sai->sai_waitq);
ll_sa_entry_put(sai, entry);
}
GOTO(out, rc = -EIDRM);
}
- cfs_list_del_init(&entry->se_list);
if (rc != 0) {
- sai->sai_replied++;
- do_sai_entry_to_stated(sai, entry, rc);
- spin_unlock(&lli->lli_sa_lock);
- if (entry->se_index == sai->sai_index_wait)
- cfs_waitq_signal(&sai->sai_waitq);
+ do_sa_entry_to_stated(sai, entry, SA_ENTRY_INVA);
+ wakeup = (entry->se_index == sai->sai_index_wait);
} else {
- entry->se_minfo = minfo;
- entry->se_req = ptlrpc_request_addref(req);
- /* Release the async ibits lock ASAP to avoid deadlock
- * when statahead thread tries to enqueue lock on parent
- * for readpage and other tries to enqueue lock on child
- * with parent's lock held, for example: unlink. */
- entry->se_handle = it->d.lustre.it_lock_handle;
- ll_intent_drop_lock(it);
- wakeup = sa_received_empty(sai);
- cfs_list_add_tail(&entry->se_list,
- &sai->sai_entries_received);
- sai->sai_replied++;
- spin_unlock(&lli->lli_sa_lock);
- if (wakeup)
- cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
+ entry->se_minfo = minfo;
+ entry->se_req = ptlrpc_request_addref(req);
+ /* Release the async ibits lock ASAP to avoid deadlock
+ * when statahead thread tries to enqueue lock on parent
+ * for readpage and other tries to enqueue lock on child
+ * with parent's lock held, for example: unlink. */
+ entry->se_handle = it->d.lustre.it_lock_handle;
+ ll_intent_drop_lock(it);
+ wakeup = sa_received_empty(sai);
+ cfs_list_add_tail(&entry->se_list,
+ &sai->sai_entries_received);
}
- ll_sa_entry_put(sai, entry);
+ sai->sai_replied++;
+ spin_unlock(&lli->lli_sa_lock);
+
+ ll_sa_entry_put(sai, entry);
+ if (wakeup)
+ cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
}
EXIT;
if (d_mountpoint(dentry))
RETURN(1);
- if (unlikely(dentry == dentry->d_sb->s_root))
- RETURN(1);
-
entry->se_inode = igrab(inode);
rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),NULL);
if (rc == 1) {
struct l_wait_info lwi = { 0 };
ENTRY;
- {
- char pname[16];
- snprintf(pname, 15, "ll_agl_%u", plli->lli_opendir_pid);
- cfs_daemonize(pname);
- }
-
CDEBUG(D_READA, "agl thread started: [pid %d] [parent %.*s]\n",
cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
{
- struct ptlrpc_thread *thread = &sai->sai_agl_thread;
- struct l_wait_info lwi = { 0 };
- int rc;
- ENTRY;
+ struct ptlrpc_thread *thread = &sai->sai_agl_thread;
+ struct l_wait_info lwi = { 0 };
+ struct ll_inode_info *plli;
+ cfs_task_t *task;
+ ENTRY;
- CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n",
- cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
+ CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n",
+ cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
- rc = cfs_create_thread(ll_agl_thread, parent, 0);
- if (rc < 0) {
- CERROR("can't start ll_agl thread, rc: %d\n", rc);
- thread_set_flags(thread, SVC_STOPPED);
- RETURN_EXIT;
- }
+ plli = ll_i2info(parent->d_inode);
+ task = kthread_run(ll_agl_thread, parent,
+ "ll_agl_%u", plli->lli_opendir_pid);
+ if (IS_ERR(task)) {
+ CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
+ thread_set_flags(thread, SVC_STOPPED);
+ RETURN_EXIT;
+ }
- l_wait_event(thread->t_ctl_waitq,
- thread_is_running(thread) || thread_is_stopped(thread),
- &lwi);
- EXIT;
+ l_wait_event(thread->t_ctl_waitq,
+ thread_is_running(thread) || thread_is_stopped(thread),
+ &lwi);
+ EXIT;
}
static int ll_statahead_thread(void *arg)
struct l_wait_info lwi = { 0 };
ENTRY;
- {
- char pname[16];
- snprintf(pname, 15, "ll_sa_%u", plli->lli_opendir_pid);
- cfs_daemonize(pname);
- }
-
CDEBUG(D_READA, "statahead thread started: [pid %d] [parent %.*s]\n",
cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
interpret_it:
while (!sa_received_empty(sai))
- do_statahead_interpret(sai, NULL);
+ ll_post_statahead(sai);
if (unlikely(!thread_is_running(thread))) {
ll_release_page(page, 0);
&lwi);
while (!sa_received_empty(sai))
- do_statahead_interpret(sai, NULL);
+ ll_post_statahead(sai);
if (unlikely(!thread_is_running(thread)))
GOTO(out, rc = 0);
/* To release the resources held by received entries. */
while (!sa_received_empty(sai))
- do_statahead_interpret(sai, NULL);
+ ll_post_statahead(sai);
spin_lock(&plli->lli_sa_lock);
}
struct ptlrpc_thread *thread;
struct l_wait_info lwi = { 0 };
int rc = 0;
+ struct ll_inode_info *plli;
ENTRY;
LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
RETURN(entry ? 1 : -EAGAIN);
}
+ /* if statahead is busy in readdir, help it do post-work */
while (!ll_sa_entry_stated(entry) &&
sai->sai_in_readpage &&
!sa_received_empty(sai))
- do_statahead_interpret(sai, entry);
+ ll_post_statahead(sai);
if (!ll_sa_entry_stated(entry)) {
sai->sai_index_wait = entry->se_index;
if ((*dentryp)->d_inode == NULL) {
*dentryp = ll_splice_alias(inode,
*dentryp);
+ if (IS_ERR(*dentryp)) {
+ ll_sai_unplug(sai, entry);
+ RETURN(PTR_ERR(*dentryp));
+ }
} else if ((*dentryp)->d_inode != inode) {
/* revalidate, but inode is recreated */
CDEBUG(D_READA,
cfs_curproc_pid(), parent->d_name.len, parent->d_name.name);
lli->lli_sai = sai;
- rc = cfs_create_thread(ll_statahead_thread, parent, 0);
- thread = &sai->sai_thread;
- if (rc < 0) {
- CERROR("can't start ll_sa thread, rc: %d\n", rc);
- dput(parent);
+
+ plli = ll_i2info(parent->d_inode);
+ rc = PTR_ERR(kthread_run(ll_statahead_thread, parent,
+ "ll_sa_%u", plli->lli_opendir_pid));
+ thread = &sai->sai_thread;
+ if (IS_ERR_VALUE(rc)) {
+ CERROR("can't start ll_sa thread, rc: %d\n", rc);
+ dput(parent);
lli->lli_opendir_key = NULL;
thread_set_flags(thread, SVC_STOPPED);
thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED);