X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fllite%2Fstatahead.c;h=d8d2187bb88d33621ad71fe88a74fa9993b36605;hb=b3edbd0f5f9ef10941b98f2b82606916b850374a;hp=3ac64c9941b915b64eeb31ff7f57e292247f9ac8;hpb=ecaba99677b28536f9c376b2b835b554a7792668;p=fs%2Flustre-release.git diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 3ac64c9..d8d2187 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -27,7 +27,7 @@ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -58,38 +58,35 @@ typedef enum { } se_stat_t; struct ll_sa_entry { - /* link into sai->sai_entries_{sent,received,stated} */ - cfs_list_t se_list; - /* link into sai hash table locally */ - cfs_list_t se_hash; - /* entry reference count */ - cfs_atomic_t se_refcount; - /* entry index in the sai */ - __u64 se_index; - /* low layer ldlm lock handle */ - __u64 se_handle; - /* entry status */ - se_stat_t se_stat; - /* entry size, contains name */ - int se_size; - /* pointer to async getattr enqueue info */ - struct md_enqueue_info *se_minfo; - /* pointer to the async getattr request */ - struct ptlrpc_request *se_req; - /* pointer to the target inode */ - struct inode *se_inode; - /* entry name */ - struct qstr se_qstr; + /* link into sai->sai_entries */ + cfs_list_t se_link; + /* link into sai->sai_entries_{received,stated} */ + cfs_list_t se_list; + /* link into sai hash table locally */ + cfs_list_t se_hash; + /* entry reference count */ + cfs_atomic_t se_refcount; + /* entry index in the sai */ + __u64 se_index; + /* low layer ldlm lock handle */ + __u64 se_handle; + /* entry status */ + se_stat_t se_stat; + /* entry size, contains name */ + int se_size; + /* pointer to async getattr enqueue info */ + struct md_enqueue_info *se_minfo; + /* pointer to the async getattr request */ + struct ptlrpc_request *se_req; + /* pointer to the target inode */ + struct inode *se_inode; + /* entry name */ + struct qstr se_qstr; }; static unsigned int sai_generation = 0; static DEFINE_SPINLOCK(sai_generation_lock); -static inline int ll_sa_entry_unlinked(struct ll_sa_entry *entry) -{ - return cfs_list_empty(&entry->se_list); -} - static inline int ll_sa_entry_unhashed(struct ll_sa_entry *entry) { return cfs_list_empty(&entry->se_hash); @@ -115,11 +112,11 @@ static inline int ll_sa_entry_hash(int val) static inline void ll_sa_entry_enhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - int i = ll_sa_entry_hash(entry->se_qstr.hash); + int i = ll_sa_entry_hash(entry->se_qstr.hash); - cfs_spin_lock(&sai->sai_cache_lock[i]); - cfs_list_add_tail(&entry->se_hash, &sai->sai_cache[i]); - cfs_spin_unlock(&sai->sai_cache_lock[i]); + spin_lock(&sai->sai_cache_lock[i]); + cfs_list_add_tail(&entry->se_hash, &sai->sai_cache[i]); + spin_unlock(&sai->sai_cache_lock[i]); } /* @@ -128,11 +125,11 @@ ll_sa_entry_enhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) static inline void ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - int i = ll_sa_entry_hash(entry->se_qstr.hash); + int i = ll_sa_entry_hash(entry->se_qstr.hash); - cfs_spin_lock(&sai->sai_cache_lock[i]); - cfs_list_del_init(&entry->se_hash); - cfs_spin_unlock(&sai->sai_cache_lock[i]); + spin_lock(&sai->sai_cache_lock[i]); + cfs_list_del_init(&entry->se_hash); + spin_unlock(&sai->sai_cache_lock[i]); } static inline int agl_should_run(struct ll_statahead_info *sai, @@ -193,7 +190,7 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index) } /* - * Insert it into sai_entries_sent tail when init. + * Insert it into sai_entries tail when init. */ static struct ll_sa_entry * ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, @@ -210,8 +207,8 @@ ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, if (unlikely(entry == NULL)) RETURN(ERR_PTR(-ENOMEM)); - CDEBUG(D_READA, "alloc sai entry %.*s(%p) index "LPU64"\n", - len, name, entry, index); + CDEBUG(D_READA, "alloc sa entry %.*s(%p) index "LPU64"\n", + len, name, entry, index); entry->se_index = index; @@ -249,14 +246,15 @@ ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, entry->se_qstr.name = dname; lli = ll_i2info(sai->sai_inode); - cfs_spin_lock(&lli->lli_sa_lock); - cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent); - cfs_spin_unlock(&lli->lli_sa_lock); + spin_lock(&lli->lli_sa_lock); + cfs_list_add_tail(&entry->se_link, &sai->sai_entries); + CFS_INIT_LIST_HEAD(&entry->se_list); + ll_sa_entry_enhash(sai, entry); + spin_unlock(&lli->lli_sa_lock); - cfs_atomic_inc(&sai->sai_cache_count); - ll_sa_entry_enhash(sai, entry); + cfs_atomic_inc(&sai->sai_cache_count); - RETURN(entry); + RETURN(entry); } /* @@ -291,17 +289,18 @@ ll_sa_entry_get_byname(struct ll_statahead_info *sai, const struct qstr *qstr) static struct ll_sa_entry * ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index) { - struct ll_sa_entry *entry; + struct ll_sa_entry *entry; - cfs_list_for_each_entry(entry, &sai->sai_entries_sent, se_list) { - if (entry->se_index == index) { - cfs_atomic_inc(&entry->se_refcount); - return entry; + cfs_list_for_each_entry(entry, &sai->sai_entries, se_link) { + if (entry->se_index == index) { + LASSERT(atomic_read(&entry->se_refcount) > 0); + cfs_atomic_inc(&entry->se_refcount); + return entry; } - if (entry->se_index > index) - break; - } - return NULL; + if (entry->se_index > index) + break; + } + return NULL; } static void ll_sa_entry_cleanup(struct ll_statahead_info *sai, @@ -326,37 +325,42 @@ static void ll_sa_entry_cleanup(struct ll_statahead_info *sai, static void ll_sa_entry_put(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - if (cfs_atomic_dec_and_test(&entry->se_refcount)) { - CDEBUG(D_READA, "free sai entry %.*s(%p) index "LPU64"\n", - entry->se_qstr.len, entry->se_qstr.name, entry, - entry->se_index); - - LASSERT(ll_sa_entry_unhashed(entry)); - LASSERT(ll_sa_entry_unlinked(entry)); - - ll_sa_entry_cleanup(sai, entry); - if (entry->se_inode) - iput(entry->se_inode); - - OBD_FREE(entry, entry->se_size); - cfs_atomic_dec(&sai->sai_cache_count); - } + if (cfs_atomic_dec_and_test(&entry->se_refcount)) { + CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n", + entry->se_qstr.len, entry->se_qstr.name, entry, + entry->se_index); + + LASSERT(cfs_list_empty(&entry->se_link)); + LASSERT(cfs_list_empty(&entry->se_list)); + LASSERT(ll_sa_entry_unhashed(entry)); + + ll_sa_entry_cleanup(sai, entry); + if (entry->se_inode) + iput(entry->se_inode); + + OBD_FREE(entry, entry->se_size); + cfs_atomic_dec(&sai->sai_cache_count); + } } static inline void -do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) +do_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - ll_sa_entry_unhash(sai, entry); + LASSERT(!ll_sa_entry_unhashed(entry)); + LASSERT(!cfs_list_empty(&entry->se_link)); - cfs_spin_lock(&lli->lli_sa_lock); - entry->se_stat = SA_ENTRY_DEST; - if (likely(!ll_sa_entry_unlinked(entry))) - cfs_list_del_init(&entry->se_list); - cfs_spin_unlock(&lli->lli_sa_lock); + ll_sa_entry_unhash(sai, entry); - ll_sa_entry_put(sai, entry); + spin_lock(&lli->lli_sa_lock); + entry->se_stat = SA_ENTRY_DEST; + cfs_list_del_init(&entry->se_link); + if (likely(!cfs_list_empty(&entry->se_list))) + cfs_list_del_init(&entry->se_list); + spin_unlock(&lli->lli_sa_lock); + + ll_sa_entry_put(sai, entry); } /* @@ -365,52 +369,41 @@ do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) static void ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - struct ll_sa_entry *pos, *next; - - if (entry) - do_sai_entry_fini(sai, entry); + struct ll_sa_entry *pos, *next; - /* drop old entry from sent list */ - cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_sent, - se_list) { - if (is_omitted_entry(sai, pos->se_index)) - do_sai_entry_fini(sai, pos); - else - break; - } + if (entry) + do_sa_entry_fini(sai, entry); - /* drop old entry from stated list */ - cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_stated, - se_list) { - if (is_omitted_entry(sai, pos->se_index)) - do_sai_entry_fini(sai, pos); - else - break; - } + /* drop old entry, only 'scanner' process does this, no need to lock */ + cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries, se_link) { + if (!is_omitted_entry(sai, pos->se_index)) + break; + do_sa_entry_fini(sai, pos); + } } /* * Inside lli_sa_lock. */ static void -do_sai_entry_to_stated(struct ll_statahead_info *sai, - struct ll_sa_entry *entry, int rc) +do_sa_entry_to_stated(struct ll_statahead_info *sai, + struct ll_sa_entry *entry, se_stat_t stat) { - struct ll_sa_entry *se; - cfs_list_t *pos = &sai->sai_entries_stated; + struct ll_sa_entry *se; + cfs_list_t *pos = &sai->sai_entries_stated; - if (!ll_sa_entry_unlinked(entry)) - cfs_list_del_init(&entry->se_list); + if (!cfs_list_empty(&entry->se_list)) + cfs_list_del_init(&entry->se_list); - cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { - if (se->se_index < entry->se_index) { - pos = &se->se_list; - break; - } - } + cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { + if (se->se_index < entry->se_index) { + pos = &se->se_list; + break; + } + } - cfs_list_add(&entry->se_list, pos); - entry->se_stat = rc; + cfs_list_add(&entry->se_list, pos); + entry->se_stat = stat; } /* @@ -420,21 +413,21 @@ do_sai_entry_to_stated(struct ll_statahead_info *sai, */ static int ll_sa_entry_to_stated(struct ll_statahead_info *sai, - struct ll_sa_entry *entry, int rc) + struct ll_sa_entry *entry, se_stat_t stat) { - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - int ret = 1; + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + int ret = 1; - ll_sa_entry_cleanup(sai, entry); + ll_sa_entry_cleanup(sai, entry); - cfs_spin_lock(&lli->lli_sa_lock); - if (likely(entry->se_stat != SA_ENTRY_DEST)) { - do_sai_entry_to_stated(sai, entry, rc); - ret = 0; - } - cfs_spin_unlock(&lli->lli_sa_lock); + spin_lock(&lli->lli_sa_lock); + if (likely(entry->se_stat != SA_ENTRY_DEST)) { + do_sa_entry_to_stated(sai, entry, stat); + ret = 0; + } + spin_unlock(&lli->lli_sa_lock); - return ret; + return ret; } /* @@ -443,29 +436,29 @@ ll_sa_entry_to_stated(struct ll_statahead_info *sai, static void ll_agl_add(struct ll_statahead_info *sai, struct inode *inode, int index) { - struct ll_inode_info *child = ll_i2info(inode); - struct ll_inode_info *parent = ll_i2info(sai->sai_inode); - int added = 0; - - cfs_spin_lock(&child->lli_agl_lock); - if (child->lli_agl_index == 0) { - child->lli_agl_index = index; - cfs_spin_unlock(&child->lli_agl_lock); - - LASSERT(cfs_list_empty(&child->lli_agl_list)); - - igrab(inode); - cfs_spin_lock(&parent->lli_agl_lock); - if (agl_list_empty(sai)) - added = 1; - cfs_list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl); - cfs_spin_unlock(&parent->lli_agl_lock); - } else { - cfs_spin_unlock(&child->lli_agl_lock); - } - - if (added > 0) - cfs_waitq_signal(&sai->sai_agl_thread.t_ctl_waitq); + struct ll_inode_info *child = ll_i2info(inode); + struct ll_inode_info *parent = ll_i2info(sai->sai_inode); + int added = 0; + + spin_lock(&child->lli_agl_lock); + if (child->lli_agl_index == 0) { + child->lli_agl_index = index; + spin_unlock(&child->lli_agl_lock); + + LASSERT(cfs_list_empty(&child->lli_agl_list)); + + igrab(inode); + spin_lock(&parent->lli_agl_lock); + if (agl_list_empty(sai)) + added = 1; + cfs_list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl); + spin_unlock(&parent->lli_agl_lock); + } else { + spin_unlock(&child->lli_agl_lock); + } + + if (added > 0) + cfs_waitq_signal(&sai->sai_agl_thread.t_ctl_waitq); } static struct ll_statahead_info *ll_sai_alloc(void) @@ -480,11 +473,11 @@ static struct ll_statahead_info *ll_sai_alloc(void) cfs_atomic_set(&sai->sai_refcount, 1); - cfs_spin_lock(&sai_generation_lock); - sai->sai_generation = ++sai_generation; - if (unlikely(sai_generation == 0)) - sai->sai_generation = ++sai_generation; - cfs_spin_unlock(&sai_generation_lock); + spin_lock(&sai_generation_lock); + sai->sai_generation = ++sai_generation; + if (unlikely(sai_generation == 0)) + sai->sai_generation = ++sai_generation; + spin_unlock(&sai_generation_lock); sai->sai_max = LL_SA_RPC_MIN; sai->sai_index = 1; @@ -492,14 +485,14 @@ static struct ll_statahead_info *ll_sai_alloc(void) cfs_waitq_init(&sai->sai_thread.t_ctl_waitq); cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq); - CFS_INIT_LIST_HEAD(&sai->sai_entries_sent); + CFS_INIT_LIST_HEAD(&sai->sai_entries); CFS_INIT_LIST_HEAD(&sai->sai_entries_received); CFS_INIT_LIST_HEAD(&sai->sai_entries_stated); CFS_INIT_LIST_HEAD(&sai->sai_entries_agl); for (i = 0; i < LL_SA_CACHE_SIZE; i++) { CFS_INIT_LIST_HEAD(&sai->sai_cache[i]); - cfs_spin_lock_init(&sai->sai_cache_lock[i]); + spin_lock_init(&sai->sai_cache_lock[i]); } cfs_atomic_set(&sai->sai_cache_count, 0); @@ -525,17 +518,17 @@ static void ll_sai_put(struct ll_statahead_info *sai) if (unlikely(cfs_atomic_read(&sai->sai_refcount) > 0)) { /* It is race case, the interpret callback just hold * a reference count */ - cfs_spin_unlock(&lli->lli_sa_lock); - RETURN_EXIT; - } + spin_unlock(&lli->lli_sa_lock); + RETURN_EXIT; + } - LASSERT(lli->lli_opendir_key == NULL); - LASSERT(thread_is_stopped(&sai->sai_thread)); - LASSERT(thread_is_stopped(&sai->sai_agl_thread)); + LASSERT(lli->lli_opendir_key == NULL); + LASSERT(thread_is_stopped(&sai->sai_thread)); + LASSERT(thread_is_stopped(&sai->sai_agl_thread)); - lli->lli_sai = NULL; - lli->lli_opendir_pid = 0; - cfs_spin_unlock(&lli->lli_sa_lock); + lli->lli_sai = NULL; + lli->lli_opendir_pid = 0; + spin_unlock(&lli->lli_sa_lock); if (sai->sai_sent > sai->sai_replied) CDEBUG(D_READA,"statahead for dir "DFID" does not " @@ -543,15 +536,13 @@ static void ll_sai_put(struct ll_statahead_info *sai) PFID(&lli->lli_fid), sai->sai_sent, sai->sai_replied); - cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_sent, se_list) - do_sai_entry_fini(sai, entry); - - LASSERT(sa_received_empty(sai)); + cfs_list_for_each_entry_safe(entry, next, + &sai->sai_entries, se_link) + do_sa_entry_fini(sai, entry); - cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_stated, se_list) - do_sai_entry_fini(sai, entry); + LASSERT(list_empty(&sai->sai_entries)); + LASSERT(sa_received_empty(sai)); + LASSERT(list_empty(&sai->sai_entries_stated)); LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0); LASSERT(agl_list_empty(sai)); @@ -581,7 +572,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) } /* Someone is in glimpse (sync or async), do nothing. */ - rc = cfs_down_write_trylock(&lli->lli_glimpse_sem); + rc = down_write_trylock(&lli->lli_glimpse_sem); if (rc == 0) { lli->lli_agl_index = 0; iput(inode); @@ -603,7 +594,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) */ if (lli->lli_glimpse_time != 0 && cfs_time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) { - cfs_up_write(&lli->lli_glimpse_sem); + up_write(&lli->lli_glimpse_sem); lli->lli_agl_index = 0; iput(inode); RETURN_EXIT; @@ -615,7 +606,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) cl_agl(inode); lli->lli_agl_index = 0; lli->lli_glimpse_time = cfs_time_current(); - cfs_up_write(&lli->lli_glimpse_sem); + up_write(&lli->lli_glimpse_sem); CDEBUG(D_READA, "Handled (init) async glimpse: inode= " DFID", idx = "LPU64", rc = %d\n", @@ -626,8 +617,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) EXIT; } -static void do_statahead_interpret(struct ll_statahead_info *sai, - struct ll_sa_entry *target) +static void ll_post_statahead(struct ll_statahead_info *sai) { struct inode *dir = sai->sai_inode; struct inode *child; @@ -640,20 +630,15 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, int rc = 0; ENTRY; - cfs_spin_lock(&lli->lli_sa_lock); - if (target != NULL && target->se_req != NULL && - !cfs_list_empty(&target->se_list)) { - entry = target; - } else if (unlikely(sa_received_empty(sai))) { - cfs_spin_unlock(&lli->lli_sa_lock); - RETURN_EXIT; - } else { - entry = sa_first_received_entry(sai); - } - - cfs_atomic_inc(&entry->se_refcount); - cfs_list_del_init(&entry->se_list); - cfs_spin_unlock(&lli->lli_sa_lock); + spin_lock(&lli->lli_sa_lock); + if (unlikely(sa_received_empty(sai))) { + spin_unlock(&lli->lli_sa_lock); + RETURN_EXIT; + } + entry = sa_first_received_entry(sai); + cfs_atomic_inc(&entry->se_refcount); + cfs_list_del_init(&entry->se_list); + spin_unlock(&lli->lli_sa_lock); LASSERT(entry->se_handle != 0); @@ -688,11 +673,11 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, } it->d.lustre.it_lock_handle = entry->se_handle; - rc = md_revalidate_lock(ll_i2mdexp(dir), it, NULL, NULL); + rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL); if (rc != 1) GOTO(out, rc = -EAGAIN); - rc = ll_prep_inode(&child, req, dir->i_sb); + rc = ll_prep_inode(&child, req, dir->i_sb, it); if (rc) GOTO(out, rc); @@ -712,8 +697,9 @@ out: * reference count by calling "ll_intent_drop_lock()" in spite of the * above operations failed or not. Do not worry about calling * "ll_intent_drop_lock()" more than once. */ - rc = ll_sa_entry_to_stated(sai, entry, rc < 0 ? rc : SA_ENTRY_SUCC); - if (rc == 0 && entry->se_index == sai->sai_index_wait && target == NULL) + rc = ll_sa_entry_to_stated(sai, entry, + rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); + if (rc == 0 && entry->se_index == sai->sai_index_wait) cfs_waitq_signal(&sai->sai_waitq); ll_sa_entry_put(sai, entry); } @@ -732,52 +718,49 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, if (it_disposition(it, DISP_LOOKUP_NEG)) rc = -ENOENT; - cfs_spin_lock(&lli->lli_sa_lock); - /* stale entry */ - if (unlikely(lli->lli_sai == NULL || - lli->lli_sai->sai_generation != minfo->mi_generation)) { - cfs_spin_unlock(&lli->lli_sa_lock); - GOTO(out, rc = -ESTALE); - } else { - sai = ll_sai_get(lli->lli_sai); - if (unlikely(!thread_is_running(&sai->sai_thread))) { - sai->sai_replied++; - cfs_spin_unlock(&lli->lli_sa_lock); - GOTO(out, rc = -EBADFD); - } - - entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata); - if (entry == NULL) { - sai->sai_replied++; - cfs_spin_unlock(&lli->lli_sa_lock); - GOTO(out, rc = -EIDRM); - } - - cfs_list_del_init(&entry->se_list); - if (rc != 0) { - sai->sai_replied++; - do_sai_entry_to_stated(sai, entry, rc); - cfs_spin_unlock(&lli->lli_sa_lock); - if (entry->se_index == sai->sai_index_wait) - cfs_waitq_signal(&sai->sai_waitq); + spin_lock(&lli->lli_sa_lock); + /* stale entry */ + if (unlikely(lli->lli_sai == NULL || + lli->lli_sai->sai_generation != minfo->mi_generation)) { + spin_unlock(&lli->lli_sa_lock); + GOTO(out, rc = -ESTALE); + } else { + sai = ll_sai_get(lli->lli_sai); + if (unlikely(!thread_is_running(&sai->sai_thread))) { + sai->sai_replied++; + spin_unlock(&lli->lli_sa_lock); + GOTO(out, rc = -EBADFD); + } + + entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata); + if (entry == NULL) { + sai->sai_replied++; + spin_unlock(&lli->lli_sa_lock); + GOTO(out, rc = -EIDRM); + } + + if (rc != 0) { + do_sa_entry_to_stated(sai, entry, SA_ENTRY_INVA); + wakeup = (entry->se_index == sai->sai_index_wait); } else { - entry->se_minfo = minfo; - entry->se_req = ptlrpc_request_addref(req); - /* Release the async ibits lock ASAP to avoid deadlock - * when statahead thread tries to enqueue lock on parent - * for readpage and other tries to enqueue lock on child - * with parent's lock held, for example: unlink. */ - entry->se_handle = it->d.lustre.it_lock_handle; - ll_intent_drop_lock(it); - wakeup = sa_received_empty(sai); - cfs_list_add_tail(&entry->se_list, - &sai->sai_entries_received); - sai->sai_replied++; - cfs_spin_unlock(&lli->lli_sa_lock); - if (wakeup) - cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); + entry->se_minfo = minfo; + entry->se_req = ptlrpc_request_addref(req); + /* Release the async ibits lock ASAP to avoid deadlock + * when statahead thread tries to enqueue lock on parent + * for readpage and other tries to enqueue lock on child + * with parent's lock held, for example: unlink. */ + entry->se_handle = it->d.lustre.it_lock_handle; + ll_intent_drop_lock(it); + wakeup = sa_received_empty(sai); + cfs_list_add_tail(&entry->se_list, + &sai->sai_entries_received); } - ll_sa_entry_put(sai, entry); + sai->sai_replied++; + spin_unlock(&lli->lli_sa_lock); + + ll_sa_entry_put(sai, entry); + if (wakeup) + cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); } EXIT; @@ -910,9 +893,6 @@ static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry, if (d_mountpoint(dentry)) RETURN(1); - if (unlikely(dentry == dentry->d_sb->s_root)) - RETURN(1); - entry->se_inode = igrab(inode); rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),NULL); if (rc == 1) { @@ -998,20 +978,14 @@ static int ll_agl_thread(void *arg) struct l_wait_info lwi = { 0 }; ENTRY; - { - char pname[16]; - snprintf(pname, 15, "ll_agl_%u", plli->lli_opendir_pid); - cfs_daemonize(pname); - } - CDEBUG(D_READA, "agl thread started: [pid %d] [parent %.*s]\n", cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); atomic_inc(&sbi->ll_agl_total); - cfs_spin_lock(&plli->lli_agl_lock); - sai->sai_agl_valid = 1; - thread_set_flags(thread, SVC_RUNNING); - cfs_spin_unlock(&plli->lli_agl_lock); + spin_lock(&plli->lli_agl_lock); + sai->sai_agl_valid = 1; + thread_set_flags(thread, SVC_RUNNING); + spin_unlock(&plli->lli_agl_lock); cfs_waitq_signal(&thread->t_ctl_waitq); while (1) { @@ -1023,59 +997,62 @@ static int ll_agl_thread(void *arg) if (!thread_is_running(thread)) break; - cfs_spin_lock(&plli->lli_agl_lock); - /* The statahead thread maybe help to process AGL entries, - * so check whether list empty again. */ - if (!agl_list_empty(sai)) { - clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); - cfs_spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, sai); - } else { - cfs_spin_unlock(&plli->lli_agl_lock); - } - } - - cfs_spin_lock(&plli->lli_agl_lock); - sai->sai_agl_valid = 0; - while (!agl_list_empty(sai)) { - clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); - cfs_spin_unlock(&plli->lli_agl_lock); - clli->lli_agl_index = 0; - iput(&clli->lli_vfs_inode); - cfs_spin_lock(&plli->lli_agl_lock); - } - thread_set_flags(thread, SVC_STOPPED); - cfs_spin_unlock(&plli->lli_agl_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); - ll_sai_put(sai); - CDEBUG(D_READA, "agl thread stopped: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); - RETURN(0); + spin_lock(&plli->lli_agl_lock); + /* The statahead thread maybe help to process AGL entries, + * so check whether list empty again. */ + if (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, sai); + } else { + spin_unlock(&plli->lli_agl_lock); + } + } + + spin_lock(&plli->lli_agl_lock); + sai->sai_agl_valid = 0; + while (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); + clli->lli_agl_index = 0; + iput(&clli->lli_vfs_inode); + spin_lock(&plli->lli_agl_lock); + } + thread_set_flags(thread, SVC_STOPPED); + spin_unlock(&plli->lli_agl_lock); + cfs_waitq_signal(&thread->t_ctl_waitq); + ll_sai_put(sai); + CDEBUG(D_READA, "agl thread stopped: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + RETURN(0); } static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) { - struct ptlrpc_thread *thread = &sai->sai_agl_thread; - struct l_wait_info lwi = { 0 }; - int rc; - ENTRY; - - CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); - - rc = cfs_create_thread(ll_agl_thread, parent, 0); - if (rc < 0) { - CERROR("can't start ll_agl thread, rc: %d\n", rc); - thread_set_flags(thread, SVC_STOPPED); - RETURN_EXIT; - } - - l_wait_event(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread), - &lwi); - EXIT; + struct ptlrpc_thread *thread = &sai->sai_agl_thread; + struct l_wait_info lwi = { 0 }; + struct ll_inode_info *plli; + cfs_task_t *task; + ENTRY; + + CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + + plli = ll_i2info(parent->d_inode); + task = kthread_run(ll_agl_thread, parent, + "ll_agl_%u", plli->lli_opendir_pid); + if (IS_ERR(task)) { + CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task)); + thread_set_flags(thread, SVC_STOPPED); + RETURN_EXIT; + } + + l_wait_event(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread), + &lwi); + EXIT; } static int ll_statahead_thread(void *arg) @@ -1096,12 +1073,6 @@ static int ll_statahead_thread(void *arg) struct l_wait_info lwi = { 0 }; ENTRY; - { - char pname[16]; - snprintf(pname, 15, "ll_sa_%u", plli->lli_opendir_pid); - cfs_daemonize(pname); - } - CDEBUG(D_READA, "statahead thread started: [pid %d] [parent %.*s]\n", cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); @@ -1109,12 +1080,12 @@ static int ll_statahead_thread(void *arg) ll_start_agl(parent, sai); atomic_inc(&sbi->ll_sa_total); - cfs_spin_lock(&plli->lli_sa_lock); - thread_set_flags(thread, SVC_RUNNING); - cfs_spin_unlock(&plli->lli_sa_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); + spin_lock(&plli->lli_sa_lock); + thread_set_flags(thread, SVC_RUNNING); + spin_unlock(&plli->lli_sa_lock); + cfs_waitq_signal(&thread->t_ctl_waitq); - ll_dir_chain_init(&chain); + ll_dir_chain_init(&chain); page = ll_get_dir_page(dir, pos, &chain); while (1) { @@ -1188,7 +1159,7 @@ keep_it: interpret_it: while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); + ll_post_statahead(sai); if (unlikely(!thread_is_running(thread))) { ll_release_page(page, 0); @@ -1199,11 +1170,11 @@ interpret_it: * some AGL entries to be triggered, then try to help * to process the AGL entries. */ if (sa_sent_full(sai)) { - cfs_spin_lock(&plli->lli_agl_lock); - while (!agl_list_empty(sai)) { - clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); - cfs_spin_unlock(&plli->lli_agl_lock); + spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); ll_agl_trigger(&clli->lli_vfs_inode, sai); @@ -1219,9 +1190,9 @@ interpret_it: if (!sa_sent_full(sai)) goto do_it; - cfs_spin_lock(&plli->lli_agl_lock); - } - cfs_spin_unlock(&plli->lli_agl_lock); + spin_lock(&plli->lli_agl_lock); + } + spin_unlock(&plli->lli_agl_lock); goto keep_it; } @@ -1243,7 +1214,7 @@ do_it: &lwi); while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); + ll_post_statahead(sai); if (unlikely(!thread_is_running(thread))) GOTO(out, rc = 0); @@ -1253,16 +1224,16 @@ do_it: break; } - cfs_spin_lock(&plli->lli_agl_lock); - while (!agl_list_empty(sai) && - thread_is_running(thread)) { - clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); - cfs_spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, sai); - cfs_spin_lock(&plli->lli_agl_lock); - } - cfs_spin_unlock(&plli->lli_agl_lock); + spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai) && + thread_is_running(thread)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, sai); + spin_lock(&plli->lli_agl_lock); + } + spin_unlock(&plli->lli_agl_lock); GOTO(out, rc = 0); } else if (1) { @@ -1287,9 +1258,9 @@ do_it: out: if (sai->sai_agl_valid) { - cfs_spin_lock(&plli->lli_agl_lock); - thread_set_flags(agl_thread, SVC_STOPPING); - cfs_spin_unlock(&plli->lli_agl_lock); + spin_lock(&plli->lli_agl_lock); + thread_set_flags(agl_thread, SVC_STOPPING); + spin_unlock(&plli->lli_agl_lock); cfs_waitq_signal(&agl_thread->t_ctl_waitq); CDEBUG(D_READA, "stop agl thread: [pid %d]\n", @@ -1302,19 +1273,19 @@ out: thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); } ll_dir_chain_fini(&chain); - cfs_spin_lock(&plli->lli_sa_lock); - if (!sa_received_empty(sai)) { - thread_set_flags(thread, SVC_STOPPING); - cfs_spin_unlock(&plli->lli_sa_lock); - - /* To release the resources held by received entries. */ - while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); - - cfs_spin_lock(&plli->lli_sa_lock); - } - thread_set_flags(thread, SVC_STOPPED); - cfs_spin_unlock(&plli->lli_sa_lock); + spin_lock(&plli->lli_sa_lock); + if (!sa_received_empty(sai)) { + thread_set_flags(thread, SVC_STOPPING); + spin_unlock(&plli->lli_sa_lock); + + /* To release the resources held by received entries. */ + while (!sa_received_empty(sai)) + ll_post_statahead(sai); + + spin_lock(&plli->lli_sa_lock); + } + thread_set_flags(thread, SVC_STOPPED); + spin_unlock(&plli->lli_sa_lock); cfs_waitq_signal(&sai->sai_waitq); cfs_waitq_signal(&thread->t_ctl_waitq); ll_sai_put(sai); @@ -1329,14 +1300,14 @@ out: */ void ll_stop_statahead(struct inode *dir, void *key) { - struct ll_inode_info *lli = ll_i2info(dir); + struct ll_inode_info *lli = ll_i2info(dir); - if (unlikely(key == NULL)) - return; + if (unlikely(key == NULL)) + return; - cfs_spin_lock(&lli->lli_sa_lock); - if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) { - cfs_spin_unlock(&lli->lli_sa_lock); + spin_lock(&lli->lli_sa_lock); + if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) { + spin_unlock(&lli->lli_sa_lock); return; } @@ -1348,28 +1319,28 @@ void ll_stop_statahead(struct inode *dir, void *key) if (!thread_is_stopped(thread)) { thread_set_flags(thread, SVC_STOPPING); - cfs_spin_unlock(&lli->lli_sa_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); - - CDEBUG(D_READA, "stop statahead thread: [pid %d]\n", - cfs_curproc_pid()); - l_wait_event(thread->t_ctl_waitq, - thread_is_stopped(thread), - &lwi); - } else { - cfs_spin_unlock(&lli->lli_sa_lock); - } - - /* - * Put the ref which was held when first statahead_enter. - * It maybe not the last ref for some statahead requests - * maybe inflight. - */ - ll_sai_put(lli->lli_sai); - } else { - lli->lli_opendir_pid = 0; - cfs_spin_unlock(&lli->lli_sa_lock); - } + spin_unlock(&lli->lli_sa_lock); + cfs_waitq_signal(&thread->t_ctl_waitq); + + CDEBUG(D_READA, "stop statahead thread: [pid %d]\n", + cfs_curproc_pid()); + l_wait_event(thread->t_ctl_waitq, + thread_is_stopped(thread), + &lwi); + } else { + spin_unlock(&lli->lli_sa_lock); + } + + /* + * Put the ref which was held when first statahead_enter. + * It maybe not the last ref for some statahead requests + * maybe inflight. + */ + ll_sai_put(lli->lli_sai); + } else { + lli->lli_opendir_pid = 0; + spin_unlock(&lli->lli_sa_lock); + } } enum { @@ -1533,17 +1504,17 @@ ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry) PFID(&lli->lli_fid), sai->sai_hit, sai->sai_miss, sai->sai_sent, sai->sai_replied, cfs_curproc_pid()); - cfs_spin_lock(&lli->lli_sa_lock); - if (!thread_is_stopped(thread)) - thread_set_flags(thread, SVC_STOPPING); - cfs_spin_unlock(&lli->lli_sa_lock); - } - } + spin_lock(&lli->lli_sa_lock); + if (!thread_is_stopped(thread)) + thread_set_flags(thread, SVC_STOPPING); + spin_unlock(&lli->lli_sa_lock); + } + } - if (!thread_is_stopped(thread)) - cfs_waitq_signal(&thread->t_ctl_waitq); + if (!thread_is_stopped(thread)) + cfs_waitq_signal(&thread->t_ctl_waitq); - EXIT; + EXIT; } /** @@ -1565,6 +1536,7 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, struct ptlrpc_thread *thread; struct l_wait_info lwi = { 0 }; int rc = 0; + struct ll_inode_info *plli; ENTRY; LASSERT(lli->lli_opendir_pid == cfs_curproc_pid()); @@ -1611,10 +1583,11 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, RETURN(entry ? 1 : -EAGAIN); } + /* if statahead is busy in readdir, help it do post-work */ while (!ll_sa_entry_stated(entry) && sai->sai_in_readpage && !sa_received_empty(sai)) - do_statahead_interpret(sai, entry); + ll_post_statahead(sai); if (!ll_sa_entry_stated(entry)) { sai->sai_index_wait = entry->se_index; @@ -1644,6 +1617,10 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, if ((*dentryp)->d_inode == NULL) { *dentryp = ll_splice_alias(inode, *dentryp); + if (IS_ERR(*dentryp)) { + ll_sai_unplug(sai, entry); + RETURN(PTR_ERR(*dentryp)); + } } else if ((*dentryp)->d_inode != inode) { /* revalidate, but inode is recreated */ CDEBUG(D_READA, @@ -1709,11 +1686,14 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); lli->lli_sai = sai; - rc = cfs_create_thread(ll_statahead_thread, parent, 0); - thread = &sai->sai_thread; - if (rc < 0) { - CERROR("can't start ll_sa thread, rc: %d\n", rc); - dput(parent); + + plli = ll_i2info(parent->d_inode); + rc = PTR_ERR(kthread_run(ll_statahead_thread, parent, + "ll_sa_%u", plli->lli_opendir_pid)); + thread = &sai->sai_thread; + if (IS_ERR_VALUE(rc)) { + CERROR("can't start ll_sa thread, rc: %d\n", rc); + dput(parent); lli->lli_opendir_key = NULL; thread_set_flags(thread, SVC_STOPPED); thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); @@ -1735,9 +1715,9 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, out: if (sai != NULL) OBD_FREE_PTR(sai); - cfs_spin_lock(&lli->lli_sa_lock); - lli->lli_opendir_key = NULL; - lli->lli_opendir_pid = 0; - cfs_spin_unlock(&lli->lli_sa_lock); - return rc; + spin_lock(&lli->lli_sa_lock); + lli->lli_opendir_key = NULL; + lli->lli_opendir_pid = 0; + spin_unlock(&lli->lli_sa_lock); + return rc; }