From: Lai Siyao Date: Tue, 26 Mar 2013 08:12:11 +0000 (+0800) Subject: LU-2388 statahead: race in do_sa_entry_fini() X-Git-Tag: 2.3.64~61 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=633d4edb4e27ff3af7026e8d4e6770cec042a931;ds=sidebyside LU-2388 statahead: race in do_sa_entry_fini() Two fixes: * When iterating the sa_entry list in do_sa_entry_fini(), there is no lock held, which may cause one entry to be put twice. To fix this, all entries are kept in one list, and only the 'scanner' will drop entries from this list. * An sa_entry may be linked to sai_sent_entries but not hashed yet; if ll_sa_entry_fini() is called at this moment, this sa_entry may be unhashed. Also includes minor cleanups: * rename do_sai_entry_fini() to do_sa_entry_fini(). * rename do_sai_entry_to_stated() to do_sa_entry_to_stated(). * rename do_statahead_interpret() to ll_post_statahead() to distinguish it from ll_statahead_interpret(). * ll_post_statahead() always post-handles statahead entries from the received list, to simplify the logic. 
Signed-off-by: Lai Siyao Change-Id: I3d0911d0bd3b940c9650473099604646408200c4 Reviewed-on: http://review.whamcloud.com/5842 Tested-by: Hudson Reviewed-by: Fan Yong Reviewed-by: Peng Tao Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 52e40b5..957ca24 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -1277,7 +1277,7 @@ struct ll_statahead_info { cfs_waitq_t sai_waitq; /* stat-ahead wait queue */ struct ptlrpc_thread sai_thread; /* stat-ahead thread */ struct ptlrpc_thread sai_agl_thread; /* AGL thread */ - cfs_list_t sai_entries_sent; /* entries sent out */ + cfs_list_t sai_entries; /* entry list */ cfs_list_t sai_entries_received; /* entries returned */ cfs_list_t sai_entries_stated; /* entries stated */ cfs_list_t sai_entries_agl; /* AGL entries to be sent */ diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 9eea49e..6d5b7af 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -58,38 +58,35 @@ typedef enum { } se_stat_t; struct ll_sa_entry { - /* link into sai->sai_entries_{sent,received,stated} */ - cfs_list_t se_list; - /* link into sai hash table locally */ - cfs_list_t se_hash; - /* entry reference count */ - cfs_atomic_t se_refcount; - /* entry index in the sai */ - __u64 se_index; - /* low layer ldlm lock handle */ - __u64 se_handle; - /* entry status */ - se_stat_t se_stat; - /* entry size, contains name */ - int se_size; - /* pointer to async getattr enqueue info */ - struct md_enqueue_info *se_minfo; - /* pointer to the async getattr request */ - struct ptlrpc_request *se_req; - /* pointer to the target inode */ - struct inode *se_inode; - /* entry name */ - struct qstr se_qstr; + /* link into sai->sai_entries */ + cfs_list_t se_link; + /* link into sai->sai_entries_{received,stated} */ + cfs_list_t se_list; + /* link into sai hash table locally */ + cfs_list_t se_hash; + /* entry reference count */ + 
cfs_atomic_t se_refcount; + /* entry index in the sai */ + __u64 se_index; + /* low layer ldlm lock handle */ + __u64 se_handle; + /* entry status */ + se_stat_t se_stat; + /* entry size, contains name */ + int se_size; + /* pointer to async getattr enqueue info */ + struct md_enqueue_info *se_minfo; + /* pointer to the async getattr request */ + struct ptlrpc_request *se_req; + /* pointer to the target inode */ + struct inode *se_inode; + /* entry name */ + struct qstr se_qstr; }; static unsigned int sai_generation = 0; static DEFINE_SPINLOCK(sai_generation_lock); -static inline int ll_sa_entry_unlinked(struct ll_sa_entry *entry) -{ - return cfs_list_empty(&entry->se_list); -} - static inline int ll_sa_entry_unhashed(struct ll_sa_entry *entry) { return cfs_list_empty(&entry->se_hash); @@ -193,7 +190,7 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index) } /* - * Insert it into sai_entries_sent tail when init. + * Insert it into sai_entries tail when init. */ static struct ll_sa_entry * ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, @@ -210,8 +207,8 @@ ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, if (unlikely(entry == NULL)) RETURN(ERR_PTR(-ENOMEM)); - CDEBUG(D_READA, "alloc sai entry %.*s(%p) index "LPU64"\n", - len, name, entry, index); + CDEBUG(D_READA, "alloc sa entry %.*s(%p) index "LPU64"\n", + len, name, entry, index); entry->se_index = index; @@ -250,11 +247,12 @@ ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, lli = ll_i2info(sai->sai_inode); spin_lock(&lli->lli_sa_lock); - cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent); + cfs_list_add_tail(&entry->se_link, &sai->sai_entries); + CFS_INIT_LIST_HEAD(&entry->se_list); + ll_sa_entry_enhash(sai, entry); spin_unlock(&lli->lli_sa_lock); cfs_atomic_inc(&sai->sai_cache_count); - ll_sa_entry_enhash(sai, entry); RETURN(entry); } @@ -291,17 +289,18 @@ ll_sa_entry_get_byname(struct ll_statahead_info *sai, const struct qstr *qstr) 
static struct ll_sa_entry * ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index) { - struct ll_sa_entry *entry; + struct ll_sa_entry *entry; - cfs_list_for_each_entry(entry, &sai->sai_entries_sent, se_list) { - if (entry->se_index == index) { - cfs_atomic_inc(&entry->se_refcount); - return entry; + cfs_list_for_each_entry(entry, &sai->sai_entries, se_link) { + if (entry->se_index == index) { + LASSERT(atomic_read(&entry->se_refcount) > 0); + cfs_atomic_inc(&entry->se_refcount); + return entry; } - if (entry->se_index > index) - break; - } - return NULL; + if (entry->se_index > index) + break; + } + return NULL; } static void ll_sa_entry_cleanup(struct ll_statahead_info *sai, @@ -326,33 +325,38 @@ static void ll_sa_entry_cleanup(struct ll_statahead_info *sai, static void ll_sa_entry_put(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - if (cfs_atomic_dec_and_test(&entry->se_refcount)) { - CDEBUG(D_READA, "free sai entry %.*s(%p) index "LPU64"\n", - entry->se_qstr.len, entry->se_qstr.name, entry, - entry->se_index); + if (cfs_atomic_dec_and_test(&entry->se_refcount)) { + CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n", + entry->se_qstr.len, entry->se_qstr.name, entry, + entry->se_index); - LASSERT(ll_sa_entry_unhashed(entry)); - LASSERT(ll_sa_entry_unlinked(entry)); + LASSERT(cfs_list_empty(&entry->se_link)); + LASSERT(cfs_list_empty(&entry->se_list)); + LASSERT(ll_sa_entry_unhashed(entry)); - ll_sa_entry_cleanup(sai, entry); - if (entry->se_inode) - iput(entry->se_inode); + ll_sa_entry_cleanup(sai, entry); + if (entry->se_inode) + iput(entry->se_inode); - OBD_FREE(entry, entry->se_size); - cfs_atomic_dec(&sai->sai_cache_count); - } + OBD_FREE(entry, entry->se_size); + cfs_atomic_dec(&sai->sai_cache_count); + } } static inline void -do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) +do_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { struct ll_inode_info *lli = 
ll_i2info(sai->sai_inode); + LASSERT(!ll_sa_entry_unhashed(entry)); + LASSERT(!cfs_list_empty(&entry->se_link)); + ll_sa_entry_unhash(sai, entry); spin_lock(&lli->lli_sa_lock); entry->se_stat = SA_ENTRY_DEST; - if (likely(!ll_sa_entry_unlinked(entry))) + cfs_list_del_init(&entry->se_link); + if (likely(!cfs_list_empty(&entry->se_list))) cfs_list_del_init(&entry->se_list); spin_unlock(&lli->lli_sa_lock); @@ -365,52 +369,41 @@ do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) static void ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - struct ll_sa_entry *pos, *next; + struct ll_sa_entry *pos, *next; - if (entry) - do_sai_entry_fini(sai, entry); + if (entry) + do_sa_entry_fini(sai, entry); - /* drop old entry from sent list */ - cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_sent, - se_list) { - if (is_omitted_entry(sai, pos->se_index)) - do_sai_entry_fini(sai, pos); - else - break; - } - - /* drop old entry from stated list */ - cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_stated, - se_list) { - if (is_omitted_entry(sai, pos->se_index)) - do_sai_entry_fini(sai, pos); - else - break; - } + /* drop old entry, only 'scanner' process does this, no need to lock */ + cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries, se_link) { + if (!is_omitted_entry(sai, pos->se_index)) + break; + do_sa_entry_fini(sai, pos); + } } /* * Inside lli_sa_lock. 
*/ static void -do_sai_entry_to_stated(struct ll_statahead_info *sai, - struct ll_sa_entry *entry, int rc) +do_sa_entry_to_stated(struct ll_statahead_info *sai, + struct ll_sa_entry *entry, se_stat_t stat) { - struct ll_sa_entry *se; - cfs_list_t *pos = &sai->sai_entries_stated; + struct ll_sa_entry *se; + cfs_list_t *pos = &sai->sai_entries_stated; - if (!ll_sa_entry_unlinked(entry)) - cfs_list_del_init(&entry->se_list); + if (!cfs_list_empty(&entry->se_list)) + cfs_list_del_init(&entry->se_list); - cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { - if (se->se_index < entry->se_index) { - pos = &se->se_list; - break; - } - } + cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { + if (se->se_index < entry->se_index) { + pos = &se->se_list; + break; + } + } - cfs_list_add(&entry->se_list, pos); - entry->se_stat = rc; + cfs_list_add(&entry->se_list, pos); + entry->se_stat = stat; } /* @@ -420,7 +413,7 @@ do_sai_entry_to_stated(struct ll_statahead_info *sai, */ static int ll_sa_entry_to_stated(struct ll_statahead_info *sai, - struct ll_sa_entry *entry, int rc) + struct ll_sa_entry *entry, se_stat_t stat) { struct ll_inode_info *lli = ll_i2info(sai->sai_inode); int ret = 1; @@ -429,7 +422,7 @@ ll_sa_entry_to_stated(struct ll_statahead_info *sai, spin_lock(&lli->lli_sa_lock); if (likely(entry->se_stat != SA_ENTRY_DEST)) { - do_sai_entry_to_stated(sai, entry, rc); + do_sa_entry_to_stated(sai, entry, stat); ret = 0; } spin_unlock(&lli->lli_sa_lock); @@ -492,7 +485,7 @@ static struct ll_statahead_info *ll_sai_alloc(void) cfs_waitq_init(&sai->sai_thread.t_ctl_waitq); cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq); - CFS_INIT_LIST_HEAD(&sai->sai_entries_sent); + CFS_INIT_LIST_HEAD(&sai->sai_entries); CFS_INIT_LIST_HEAD(&sai->sai_entries_received); CFS_INIT_LIST_HEAD(&sai->sai_entries_stated); CFS_INIT_LIST_HEAD(&sai->sai_entries_agl); @@ -543,15 +536,13 @@ static void ll_sai_put(struct ll_statahead_info *sai) 
PFID(&lli->lli_fid), sai->sai_sent, sai->sai_replied); - cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_sent, se_list) - do_sai_entry_fini(sai, entry); - - LASSERT(sa_received_empty(sai)); + cfs_list_for_each_entry_safe(entry, next, + &sai->sai_entries, se_link) + do_sa_entry_fini(sai, entry); - cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_stated, se_list) - do_sai_entry_fini(sai, entry); + LASSERT(list_empty(&sai->sai_entries)); + LASSERT(sa_received_empty(sai)); + LASSERT(list_empty(&sai->sai_entries_stated)); LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0); LASSERT(agl_list_empty(sai)); @@ -626,8 +617,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) EXIT; } -static void do_statahead_interpret(struct ll_statahead_info *sai, - struct ll_sa_entry *target) +static void ll_post_statahead(struct ll_statahead_info *sai) { struct inode *dir = sai->sai_inode; struct inode *child; @@ -641,16 +631,11 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, ENTRY; spin_lock(&lli->lli_sa_lock); - if (target != NULL && target->se_req != NULL && - !cfs_list_empty(&target->se_list)) { - entry = target; - } else if (unlikely(sa_received_empty(sai))) { + if (unlikely(sa_received_empty(sai))) { spin_unlock(&lli->lli_sa_lock); RETURN_EXIT; - } else { - entry = sa_first_received_entry(sai); } - + entry = sa_first_received_entry(sai); cfs_atomic_inc(&entry->se_refcount); cfs_list_del_init(&entry->se_list); spin_unlock(&lli->lli_sa_lock); @@ -712,8 +697,9 @@ out: * reference count by calling "ll_intent_drop_lock()" in spite of the * above operations failed or not. Do not worry about calling * "ll_intent_drop_lock()" more than once. */ - rc = ll_sa_entry_to_stated(sai, entry, rc < 0 ? rc : SA_ENTRY_SUCC); - if (rc == 0 && entry->se_index == sai->sai_index_wait && target == NULL) + rc = ll_sa_entry_to_stated(sai, entry, + rc < 0 ? 
SA_ENTRY_INVA : SA_ENTRY_SUCC); + if (rc == 0 && entry->se_index == sai->sai_index_wait) cfs_waitq_signal(&sai->sai_waitq); ll_sa_entry_put(sai, entry); } @@ -753,31 +739,28 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, GOTO(out, rc = -EIDRM); } - cfs_list_del_init(&entry->se_list); if (rc != 0) { - sai->sai_replied++; - do_sai_entry_to_stated(sai, entry, rc); - spin_unlock(&lli->lli_sa_lock); - if (entry->se_index == sai->sai_index_wait) - cfs_waitq_signal(&sai->sai_waitq); + do_sa_entry_to_stated(sai, entry, SA_ENTRY_INVA); + wakeup = (entry->se_index == sai->sai_index_wait); } else { - entry->se_minfo = minfo; - entry->se_req = ptlrpc_request_addref(req); - /* Release the async ibits lock ASAP to avoid deadlock - * when statahead thread tries to enqueue lock on parent - * for readpage and other tries to enqueue lock on child - * with parent's lock held, for example: unlink. */ - entry->se_handle = it->d.lustre.it_lock_handle; - ll_intent_drop_lock(it); - wakeup = sa_received_empty(sai); - cfs_list_add_tail(&entry->se_list, - &sai->sai_entries_received); - sai->sai_replied++; - spin_unlock(&lli->lli_sa_lock); - if (wakeup) - cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); + entry->se_minfo = minfo; + entry->se_req = ptlrpc_request_addref(req); + /* Release the async ibits lock ASAP to avoid deadlock + * when statahead thread tries to enqueue lock on parent + * for readpage and other tries to enqueue lock on child + * with parent's lock held, for example: unlink. 
*/ + entry->se_handle = it->d.lustre.it_lock_handle; + ll_intent_drop_lock(it); + wakeup = sa_received_empty(sai); + cfs_list_add_tail(&entry->se_list, + &sai->sai_entries_received); } - ll_sa_entry_put(sai, entry); + sai->sai_replied++; + spin_unlock(&lli->lli_sa_lock); + + ll_sa_entry_put(sai, entry); + if (wakeup) + cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); } EXIT; @@ -1188,7 +1171,7 @@ keep_it: interpret_it: while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); + ll_post_statahead(sai); if (unlikely(!thread_is_running(thread))) { ll_release_page(page, 0); @@ -1243,7 +1226,7 @@ do_it: &lwi); while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); + ll_post_statahead(sai); if (unlikely(!thread_is_running(thread))) GOTO(out, rc = 0); @@ -1309,7 +1292,7 @@ out: /* To release the resources held by received entries. */ while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); + ll_post_statahead(sai); spin_lock(&plli->lli_sa_lock); } @@ -1611,10 +1594,11 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, RETURN(entry ? 1 : -EAGAIN); } + /* if statahead is busy in readdir, help it do post-work */ while (!ll_sa_entry_stated(entry) && sai->sai_in_readpage && !sa_received_empty(sai)) - do_statahead_interpret(sai, entry); + ll_post_statahead(sai); if (!ll_sa_entry_stated(entry)) { sai->sai_index_wait = entry->se_index;