X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fllite%2Fstatahead.c;h=01521efae132837c329abd75d8f50a8dfde51e2c;hb=4e0c8aeb9460e20eb7be9011c24edca35e17340d;hp=5cf4b9c88cc52400505ae673414d6a2f2d9078cd;hpb=d8b486644ceef57c7388e8049b90bde4bf967500;p=fs%2Flustre-release.git diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 5cf4b9c..01521ef 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -27,7 +27,7 @@ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -58,38 +58,35 @@ typedef enum { } se_stat_t; struct ll_sa_entry { - /* link into sai->sai_entries_{sent,received,stated} */ - cfs_list_t se_list; - /* link into sai hash table locally */ - cfs_list_t se_hash; - /* entry reference count */ - cfs_atomic_t se_refcount; - /* entry index in the sai */ - __u64 se_index; - /* low layer ldlm lock handle */ - __u64 se_handle; - /* entry status */ - se_stat_t se_stat; - /* entry size, contains name */ - int se_size; - /* pointer to async getattr enqueue info */ - struct md_enqueue_info *se_minfo; - /* pointer to the async getattr request */ - struct ptlrpc_request *se_req; - /* pointer to the target inode */ - struct inode *se_inode; - /* entry name */ - struct qstr se_qstr; + /* link into sai->sai_entries */ + cfs_list_t se_link; + /* link into sai->sai_entries_{received,stated} */ + cfs_list_t se_list; + /* link into sai hash table locally */ + cfs_list_t se_hash; + /* entry reference count */ + cfs_atomic_t se_refcount; + /* entry index in the sai */ + __u64 se_index; + /* low layer ldlm lock handle */ + __u64 se_handle; + /* entry status */ + se_stat_t se_stat; + /* entry size, contains name */ + int se_size; + /* pointer to async getattr enqueue info */ + struct md_enqueue_info *se_minfo; + /* pointer to the async getattr request */ + struct ptlrpc_request *se_req; + /* pointer to the target inode */ + struct inode *se_inode; + /* entry name */ + struct qstr se_qstr; }; static unsigned int sai_generation = 0; static DEFINE_SPINLOCK(sai_generation_lock); -static inline int ll_sa_entry_unlinked(struct ll_sa_entry *entry) -{ - return cfs_list_empty(&entry->se_list); -} - static inline int ll_sa_entry_unhashed(struct ll_sa_entry *entry) { return cfs_list_empty(&entry->se_hash); @@ -115,11 +112,11 @@ static inline int ll_sa_entry_hash(int val) static inline void ll_sa_entry_enhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - int i = ll_sa_entry_hash(entry->se_qstr.hash); + int i = ll_sa_entry_hash(entry->se_qstr.hash); - cfs_spin_lock(&sai->sai_cache_lock[i]); - cfs_list_add_tail(&entry->se_hash, &sai->sai_cache[i]); - cfs_spin_unlock(&sai->sai_cache_lock[i]); + spin_lock(&sai->sai_cache_lock[i]); + cfs_list_add_tail(&entry->se_hash, &sai->sai_cache[i]); + spin_unlock(&sai->sai_cache_lock[i]); } /* @@ -128,20 +125,17 @@ ll_sa_entry_enhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) static inline void ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - int i = ll_sa_entry_hash(entry->se_qstr.hash); + int i = ll_sa_entry_hash(entry->se_qstr.hash); - cfs_spin_lock(&sai->sai_cache_lock[i]); - cfs_list_del_init(&entry->se_hash); - cfs_spin_unlock(&sai->sai_cache_lock[i]); + spin_lock(&sai->sai_cache_lock[i]); + cfs_list_del_init(&entry->se_hash); + 
spin_unlock(&sai->sai_cache_lock[i]); } static inline int agl_should_run(struct ll_statahead_info *sai, struct inode *inode) { - if (inode != NULL && S_ISREG(inode->i_mode) && - ll_i2info(inode)->lli_has_smd && sai->sai_agl_valid) - return 1; - return 0; + return (inode != NULL && S_ISREG(inode->i_mode) && sai->sai_agl_valid); } static inline struct ll_sa_entry * @@ -196,7 +190,7 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index) } /* - * Insert it into sai_entries_sent tail when init. + * Insert it into sai_entries tail when init. */ static struct ll_sa_entry * ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, @@ -213,8 +207,8 @@ ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, if (unlikely(entry == NULL)) RETURN(ERR_PTR(-ENOMEM)); - CDEBUG(D_READA, "alloc sai entry %.*s(%p) index "LPU64"\n", - len, name, entry, index); + CDEBUG(D_READA, "alloc sa entry %.*s(%p) index "LPU64"\n", + len, name, entry, index); entry->se_index = index; @@ -252,14 +246,15 @@ ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, entry->se_qstr.name = dname; lli = ll_i2info(sai->sai_inode); - cfs_spin_lock(&lli->lli_sa_lock); - cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent); - cfs_spin_unlock(&lli->lli_sa_lock); + spin_lock(&lli->lli_sa_lock); + cfs_list_add_tail(&entry->se_link, &sai->sai_entries); + CFS_INIT_LIST_HEAD(&entry->se_list); + ll_sa_entry_enhash(sai, entry); + spin_unlock(&lli->lli_sa_lock); - cfs_atomic_inc(&sai->sai_cache_count); - ll_sa_entry_enhash(sai, entry); + cfs_atomic_inc(&sai->sai_cache_count); - RETURN(entry); + RETURN(entry); } /* @@ -294,17 +289,18 @@ ll_sa_entry_get_byname(struct ll_statahead_info *sai, const struct qstr *qstr) static struct ll_sa_entry * ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index) { - struct ll_sa_entry *entry; + struct ll_sa_entry *entry; - cfs_list_for_each_entry(entry, &sai->sai_entries_sent, se_list) { - if (entry->se_index == index) { - cfs_atomic_inc(&entry->se_refcount); - return entry; + cfs_list_for_each_entry(entry, &sai->sai_entries, se_link) { + if (entry->se_index == index) { + LASSERT(atomic_read(&entry->se_refcount) > 0); + cfs_atomic_inc(&entry->se_refcount); + return entry; } - if (entry->se_index > index) - break; - } - return NULL; + if (entry->se_index > index) + break; + } + return NULL; } static void ll_sa_entry_cleanup(struct ll_statahead_info *sai, @@ -329,37 +325,42 @@ static void ll_sa_entry_cleanup(struct ll_statahead_info *sai, static void ll_sa_entry_put(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - if (cfs_atomic_dec_and_test(&entry->se_refcount)) { - CDEBUG(D_READA, "free sai entry %.*s(%p) index "LPU64"\n", - entry->se_qstr.len, entry->se_qstr.name, entry, - entry->se_index); - - LASSERT(ll_sa_entry_unhashed(entry)); - LASSERT(ll_sa_entry_unlinked(entry)); - - ll_sa_entry_cleanup(sai, entry); - if (entry->se_inode) - iput(entry->se_inode); - - OBD_FREE(entry, entry->se_size); - cfs_atomic_dec(&sai->sai_cache_count); - } + if (cfs_atomic_dec_and_test(&entry->se_refcount)) { + CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n", + entry->se_qstr.len, entry->se_qstr.name, entry, + entry->se_index); + + LASSERT(cfs_list_empty(&entry->se_link)); + LASSERT(cfs_list_empty(&entry->se_list)); + LASSERT(ll_sa_entry_unhashed(entry)); + + ll_sa_entry_cleanup(sai, entry); + if (entry->se_inode) + iput(entry->se_inode); + + OBD_FREE(entry, entry->se_size); + cfs_atomic_dec(&sai->sai_cache_count); + } } static inline void 
-do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) +do_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - ll_sa_entry_unhash(sai, entry); + LASSERT(!ll_sa_entry_unhashed(entry)); + LASSERT(!cfs_list_empty(&entry->se_link)); - cfs_spin_lock(&lli->lli_sa_lock); - entry->se_stat = SA_ENTRY_DEST; - if (likely(!ll_sa_entry_unlinked(entry))) - cfs_list_del_init(&entry->se_list); - cfs_spin_unlock(&lli->lli_sa_lock); + ll_sa_entry_unhash(sai, entry); - ll_sa_entry_put(sai, entry); + spin_lock(&lli->lli_sa_lock); + entry->se_stat = SA_ENTRY_DEST; + cfs_list_del_init(&entry->se_link); + if (likely(!cfs_list_empty(&entry->se_list))) + cfs_list_del_init(&entry->se_list); + spin_unlock(&lli->lli_sa_lock); + + ll_sa_entry_put(sai, entry); } /* @@ -368,52 +369,41 @@ do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) static void ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - struct ll_sa_entry *pos, *next; + struct ll_sa_entry *pos, *next; - if (entry) - do_sai_entry_fini(sai, entry); - - /* drop old entry from sent list */ - cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_sent, - se_list) { - if (is_omitted_entry(sai, pos->se_index)) - do_sai_entry_fini(sai, pos); - else - break; - } + if (entry) + do_sa_entry_fini(sai, entry); - /* drop old entry from stated list */ - cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_stated, - se_list) { - if (is_omitted_entry(sai, pos->se_index)) - do_sai_entry_fini(sai, pos); - else - break; - } + /* drop old entry, only 'scanner' process does this, no need to lock */ + cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries, se_link) { + if (!is_omitted_entry(sai, pos->se_index)) + break; + do_sa_entry_fini(sai, pos); + } } /* * Inside lli_sa_lock. 
*/ static void -do_sai_entry_to_stated(struct ll_statahead_info *sai, - struct ll_sa_entry *entry, int rc) +do_sa_entry_to_stated(struct ll_statahead_info *sai, + struct ll_sa_entry *entry, se_stat_t stat) { - struct ll_sa_entry *se; - cfs_list_t *pos = &sai->sai_entries_stated; + struct ll_sa_entry *se; + cfs_list_t *pos = &sai->sai_entries_stated; - if (!ll_sa_entry_unlinked(entry)) - cfs_list_del_init(&entry->se_list); + if (!cfs_list_empty(&entry->se_list)) + cfs_list_del_init(&entry->se_list); - cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { - if (se->se_index < entry->se_index) { - pos = &se->se_list; - break; - } - } + cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { + if (se->se_index < entry->se_index) { + pos = &se->se_list; + break; + } + } - cfs_list_add(&entry->se_list, pos); - entry->se_stat = rc; + cfs_list_add(&entry->se_list, pos); + entry->se_stat = stat; } /* @@ -423,21 +413,21 @@ do_sai_entry_to_stated(struct ll_statahead_info *sai, */ static int ll_sa_entry_to_stated(struct ll_statahead_info *sai, - struct ll_sa_entry *entry, int rc) + struct ll_sa_entry *entry, se_stat_t stat) { - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - int ret = 1; + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + int ret = 1; - ll_sa_entry_cleanup(sai, entry); + ll_sa_entry_cleanup(sai, entry); - cfs_spin_lock(&lli->lli_sa_lock); - if (likely(entry->se_stat != SA_ENTRY_DEST)) { - do_sai_entry_to_stated(sai, entry, rc); - ret = 0; - } - cfs_spin_unlock(&lli->lli_sa_lock); + spin_lock(&lli->lli_sa_lock); + if (likely(entry->se_stat != SA_ENTRY_DEST)) { + do_sa_entry_to_stated(sai, entry, stat); + ret = 0; + } + spin_unlock(&lli->lli_sa_lock); - return ret; + return ret; } /* @@ -446,67 +436,67 @@ ll_sa_entry_to_stated(struct ll_statahead_info *sai, static void ll_agl_add(struct ll_statahead_info *sai, struct inode *inode, int index) { - struct ll_inode_info *child = ll_i2info(inode); - struct ll_inode_info *parent = ll_i2info(sai->sai_inode); - int added = 0; - - cfs_spin_lock(&child->lli_agl_lock); - if (child->lli_agl_index == 0) { - child->lli_agl_index = index; - cfs_spin_unlock(&child->lli_agl_lock); - - LASSERT(cfs_list_empty(&child->lli_agl_list)); - - igrab(inode); - cfs_spin_lock(&parent->lli_agl_lock); - if (agl_list_empty(sai)) - added = 1; - cfs_list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl); - cfs_spin_unlock(&parent->lli_agl_lock); - } else { - cfs_spin_unlock(&child->lli_agl_lock); - } - - if (added > 0) - cfs_waitq_signal(&sai->sai_agl_thread.t_ctl_waitq); + struct ll_inode_info *child = ll_i2info(inode); + struct ll_inode_info *parent = ll_i2info(sai->sai_inode); + int added = 0; + + spin_lock(&child->lli_agl_lock); + if (child->lli_agl_index == 0) { + child->lli_agl_index = index; + spin_unlock(&child->lli_agl_lock); + + LASSERT(cfs_list_empty(&child->lli_agl_list)); + + igrab(inode); + spin_lock(&parent->lli_agl_lock); + if (agl_list_empty(sai)) + added = 1; + cfs_list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl); + spin_unlock(&parent->lli_agl_lock); + } else { + spin_unlock(&child->lli_agl_lock); + } + + if (added > 0) + wake_up(&sai->sai_agl_thread.t_ctl_waitq); } static struct ll_statahead_info *ll_sai_alloc(void) { - struct ll_statahead_info *sai; - int i; - ENTRY; - - OBD_ALLOC_PTR(sai); - if (!sai) - RETURN(NULL); - - cfs_atomic_set(&sai->sai_refcount, 1); - - cfs_spin_lock(&sai_generation_lock); - sai->sai_generation = ++sai_generation; - if (unlikely(sai_generation == 0)) - 
sai->sai_generation = ++sai_generation; - cfs_spin_unlock(&sai_generation_lock); - - sai->sai_max = LL_SA_RPC_MIN; - sai->sai_index = 1; - cfs_waitq_init(&sai->sai_waitq); - cfs_waitq_init(&sai->sai_thread.t_ctl_waitq); - cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq); - - CFS_INIT_LIST_HEAD(&sai->sai_entries_sent); - CFS_INIT_LIST_HEAD(&sai->sai_entries_received); - CFS_INIT_LIST_HEAD(&sai->sai_entries_stated); - CFS_INIT_LIST_HEAD(&sai->sai_entries_agl); - - for (i = 0; i < LL_SA_CACHE_SIZE; i++) { - CFS_INIT_LIST_HEAD(&sai->sai_cache[i]); - cfs_spin_lock_init(&sai->sai_cache_lock[i]); - } - cfs_atomic_set(&sai->sai_cache_count, 0); - - RETURN(sai); + struct ll_statahead_info *sai; + int i; + ENTRY; + + OBD_ALLOC_PTR(sai); + if (!sai) + RETURN(NULL); + + cfs_atomic_set(&sai->sai_refcount, 1); + + spin_lock(&sai_generation_lock); + sai->sai_generation = ++sai_generation; + if (unlikely(sai_generation == 0)) + sai->sai_generation = ++sai_generation; + spin_unlock(&sai_generation_lock); + + sai->sai_max = LL_SA_RPC_MIN; + sai->sai_index = 1; + init_waitqueue_head(&sai->sai_waitq); + init_waitqueue_head(&sai->sai_thread.t_ctl_waitq); + init_waitqueue_head(&sai->sai_agl_thread.t_ctl_waitq); + + CFS_INIT_LIST_HEAD(&sai->sai_entries); + CFS_INIT_LIST_HEAD(&sai->sai_entries_received); + CFS_INIT_LIST_HEAD(&sai->sai_entries_stated); + CFS_INIT_LIST_HEAD(&sai->sai_entries_agl); + + for (i = 0; i < LL_SA_CACHE_SIZE; i++) { + CFS_INIT_LIST_HEAD(&sai->sai_cache[i]); + spin_lock_init(&sai->sai_cache_lock[i]); + } + cfs_atomic_set(&sai->sai_cache_count, 0); + + RETURN(sai); } static inline struct ll_statahead_info * @@ -528,17 +518,17 @@ static void ll_sai_put(struct ll_statahead_info *sai) if (unlikely(cfs_atomic_read(&sai->sai_refcount) > 0)) { /* It is race case, the interpret callback just hold * a reference count */ - cfs_spin_unlock(&lli->lli_sa_lock); - RETURN_EXIT; - } + spin_unlock(&lli->lli_sa_lock); + RETURN_EXIT; + } - LASSERT(lli->lli_opendir_key == NULL); - LASSERT(thread_is_stopped(&sai->sai_thread)); - LASSERT(thread_is_stopped(&sai->sai_agl_thread)); + LASSERT(lli->lli_opendir_key == NULL); + LASSERT(thread_is_stopped(&sai->sai_thread)); + LASSERT(thread_is_stopped(&sai->sai_agl_thread)); - lli->lli_sai = NULL; - lli->lli_opendir_pid = 0; - cfs_spin_unlock(&lli->lli_sa_lock); + lli->lli_sai = NULL; + lli->lli_opendir_pid = 0; + spin_unlock(&lli->lli_sa_lock); if (sai->sai_sent > sai->sai_replied) CDEBUG(D_READA,"statahead for dir "DFID" does not " @@ -546,15 +536,13 @@ static void ll_sai_put(struct ll_statahead_info *sai) PFID(&lli->lli_fid), sai->sai_sent, sai->sai_replied); - cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_sent, se_list) - do_sai_entry_fini(sai, entry); + cfs_list_for_each_entry_safe(entry, next, + &sai->sai_entries, se_link) + do_sa_entry_fini(sai, entry); - LASSERT(sa_received_empty(sai)); - - cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_stated, se_list) - do_sai_entry_fini(sai, entry); + LASSERT(list_empty(&sai->sai_entries)); + LASSERT(sa_received_empty(sai)); + LASSERT(list_empty(&sai->sai_entries_stated)); LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0); LASSERT(agl_list_empty(sai)); @@ -584,7 +572,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) } /* Someone is in glimpse (sync or async), do nothing. 
*/ - rc = cfs_down_write_trylock(&lli->lli_glimpse_sem); + rc = down_write_trylock(&lli->lli_glimpse_sem); if (rc == 0) { lli->lli_agl_index = 0; iput(inode); @@ -606,7 +594,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) */ if (lli->lli_glimpse_time != 0 && cfs_time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) { - cfs_up_write(&lli->lli_glimpse_sem); + up_write(&lli->lli_glimpse_sem); lli->lli_agl_index = 0; iput(inode); RETURN_EXIT; @@ -618,7 +606,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) cl_agl(inode); lli->lli_agl_index = 0; lli->lli_glimpse_time = cfs_time_current(); - cfs_up_write(&lli->lli_glimpse_sem); + up_write(&lli->lli_glimpse_sem); CDEBUG(D_READA, "Handled (init) async glimpse: inode= " DFID", idx = "LPU64", rc = %d\n", @@ -629,8 +617,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) EXIT; } -static void do_statahead_interpret(struct ll_statahead_info *sai, - struct ll_sa_entry *target) +static void ll_post_statahead(struct ll_statahead_info *sai) { struct inode *dir = sai->sai_inode; struct inode *child; @@ -643,20 +630,15 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, int rc = 0; ENTRY; - cfs_spin_lock(&lli->lli_sa_lock); - if (target != NULL && target->se_req != NULL && - !cfs_list_empty(&target->se_list)) { - entry = target; - } else if (unlikely(sa_received_empty(sai))) { - cfs_spin_unlock(&lli->lli_sa_lock); - RETURN_EXIT; - } else { - entry = sa_first_received_entry(sai); - } - - cfs_atomic_inc(&entry->se_refcount); - cfs_list_del_init(&entry->se_list); - cfs_spin_unlock(&lli->lli_sa_lock); + spin_lock(&lli->lli_sa_lock); + if (unlikely(sa_received_empty(sai))) { + spin_unlock(&lli->lli_sa_lock); + RETURN_EXIT; + } + entry = sa_first_received_entry(sai); + cfs_atomic_inc(&entry->se_refcount); + cfs_list_del_init(&entry->se_list); + spin_unlock(&lli->lli_sa_lock); LASSERT(entry->se_handle != 0); @@ -691,16 +673,17 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, } it->d.lustre.it_lock_handle = entry->se_handle; - rc = md_revalidate_lock(ll_i2mdexp(dir), it, NULL, NULL); + rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL); if (rc != 1) GOTO(out, rc = -EAGAIN); - rc = ll_prep_inode(&child, req, dir->i_sb); + rc = ll_prep_inode(&child, req, dir->i_sb, it); if (rc) GOTO(out, rc); - CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n", - child, child->i_ino, child->i_generation); + CDEBUG(D_DLMTRACE, "%s: setting l_data to inode "DFID"(%p)\n", + ll_get_fsname(child->i_sb, NULL, 0), + PFID(ll_inode2fid(child)), child); ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL); entry->se_inode = child; @@ -711,14 +694,15 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, EXIT; out: - /* The "ll_sa_entry_to_stated()" will drop related ldlm ibits lock - * reference count by calling "ll_intent_drop_lock()" in spite of the - * above operations failed or not. Do not worry about calling - * "ll_intent_drop_lock()" more than once. */ - rc = ll_sa_entry_to_stated(sai, entry, rc < 0 ? rc : SA_ENTRY_SUCC); - if (rc == 0 && entry->se_index == sai->sai_index_wait && target == NULL) - cfs_waitq_signal(&sai->sai_waitq); - ll_sa_entry_put(sai, entry); + /* The "ll_sa_entry_to_stated()" will drop related ldlm ibits lock + * reference count by calling "ll_intent_drop_lock()" in spite of the + * above operations failed or not. 
Do not worry about calling + * "ll_intent_drop_lock()" more than once. */ + rc = ll_sa_entry_to_stated(sai, entry, + rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); + if (rc == 0 && entry->se_index == sai->sai_index_wait) + wake_up(&sai->sai_waitq); + ll_sa_entry_put(sai, entry); } static int ll_statahead_interpret(struct ptlrpc_request *req, @@ -735,52 +719,49 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, if (it_disposition(it, DISP_LOOKUP_NEG)) rc = -ENOENT; - cfs_spin_lock(&lli->lli_sa_lock); - /* stale entry */ - if (unlikely(lli->lli_sai == NULL || - lli->lli_sai->sai_generation != minfo->mi_generation)) { - cfs_spin_unlock(&lli->lli_sa_lock); - GOTO(out, rc = -ESTALE); - } else { - sai = ll_sai_get(lli->lli_sai); - if (unlikely(!thread_is_running(&sai->sai_thread))) { - sai->sai_replied++; - cfs_spin_unlock(&lli->lli_sa_lock); - GOTO(out, rc = -EBADFD); - } - - entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata); - if (entry == NULL) { - sai->sai_replied++; - cfs_spin_unlock(&lli->lli_sa_lock); - GOTO(out, rc = -EIDRM); - } - - cfs_list_del_init(&entry->se_list); - if (rc != 0) { - sai->sai_replied++; - do_sai_entry_to_stated(sai, entry, rc); - cfs_spin_unlock(&lli->lli_sa_lock); - if (entry->se_index == sai->sai_index_wait) - cfs_waitq_signal(&sai->sai_waitq); + spin_lock(&lli->lli_sa_lock); + /* stale entry */ + if (unlikely(lli->lli_sai == NULL || + lli->lli_sai->sai_generation != minfo->mi_generation)) { + spin_unlock(&lli->lli_sa_lock); + GOTO(out, rc = -ESTALE); + } else { + sai = ll_sai_get(lli->lli_sai); + if (unlikely(!thread_is_running(&sai->sai_thread))) { + sai->sai_replied++; + spin_unlock(&lli->lli_sa_lock); + GOTO(out, rc = -EBADFD); + } + + entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata); + if (entry == NULL) { + sai->sai_replied++; + spin_unlock(&lli->lli_sa_lock); + GOTO(out, rc = -EIDRM); + } + + if (rc != 0) { + do_sa_entry_to_stated(sai, entry, SA_ENTRY_INVA); + wakeup = (entry->se_index == sai->sai_index_wait); } else { - entry->se_minfo = minfo; - entry->se_req = ptlrpc_request_addref(req); - /* Release the async ibits lock ASAP to avoid deadlock - * when statahead thread tries to enqueue lock on parent - * for readpage and other tries to enqueue lock on child - * with parent's lock held, for example: unlink. */ - entry->se_handle = it->d.lustre.it_lock_handle; - ll_intent_drop_lock(it); - wakeup = sa_received_empty(sai); - cfs_list_add_tail(&entry->se_list, - &sai->sai_entries_received); - sai->sai_replied++; - cfs_spin_unlock(&lli->lli_sa_lock); - if (wakeup) - cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); + entry->se_minfo = minfo; + entry->se_req = ptlrpc_request_addref(req); + /* Release the async ibits lock ASAP to avoid deadlock + * when statahead thread tries to enqueue lock on parent + * for readpage and other tries to enqueue lock on child + * with parent's lock held, for example: unlink. 
*/ + entry->se_handle = it->d.lustre.it_lock_handle; + ll_intent_drop_lock(it); + wakeup = sa_received_empty(sai); + cfs_list_add_tail(&entry->se_list, + &sai->sai_entries_received); } - ll_sa_entry_put(sai, entry); + sai->sai_replied++; + spin_unlock(&lli->lli_sa_lock); + + ll_sa_entry_put(sai, entry); + if (wakeup) + wake_up(&sai->sai_thread.t_ctl_waitq); } EXIT; @@ -913,9 +894,6 @@ static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry, if (d_mountpoint(dentry)) RETURN(1); - if (unlikely(dentry == dentry->d_sb->s_root)) - RETURN(1); - entry->se_inode = igrab(inode); rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),NULL); if (rc == 1) { @@ -973,14 +951,14 @@ static void ll_statahead_one(struct dentry *parent, const char* entry_name, if (dentry != NULL) dput(dentry); - if (rc) { - rc1 = ll_sa_entry_to_stated(sai, entry, - rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); - if (rc1 == 0 && entry->se_index == sai->sai_index_wait) - cfs_waitq_signal(&sai->sai_waitq); - } else { - sai->sai_sent++; - } + if (rc) { + rc1 = ll_sa_entry_to_stated(sai, entry, + rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); + if (rc1 == 0 && entry->se_index == sai->sai_index_wait) + wake_up(&sai->sai_waitq); + } else { + sai->sai_sent++; + } sai->sai_index++; /* drop one refcount on entry by ll_sa_entry_alloc */ @@ -1001,21 +979,15 @@ static int ll_agl_thread(void *arg) struct l_wait_info lwi = { 0 }; ENTRY; - { - char pname[16]; - snprintf(pname, 15, "ll_agl_%u", plli->lli_opendir_pid); - cfs_daemonize(pname); - } - - CDEBUG(D_READA, "agl thread started: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + CDEBUG(D_READA, "agl thread started: [pid %d] [parent %.*s]\n", + current_pid(), parent->d_name.len, parent->d_name.name); - atomic_inc(&sbi->ll_agl_total); - cfs_spin_lock(&plli->lli_agl_lock); - sai->sai_agl_valid = 1; - thread_set_flags(thread, SVC_RUNNING); - cfs_spin_unlock(&plli->lli_agl_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); + atomic_inc(&sbi->ll_agl_total); + spin_lock(&plli->lli_agl_lock); + sai->sai_agl_valid = 1; + thread_set_flags(thread, SVC_RUNNING); + spin_unlock(&plli->lli_agl_lock); + wake_up(&thread->t_ctl_waitq); while (1) { l_wait_event(thread->t_ctl_waitq, @@ -1026,305 +998,273 @@ static int ll_agl_thread(void *arg) if (!thread_is_running(thread)) break; - cfs_spin_lock(&plli->lli_agl_lock); - /* The statahead thread maybe help to process AGL entries, - * so check whether list empty again. */ - if (!agl_list_empty(sai)) { - clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); - cfs_spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, sai); - } else { - cfs_spin_unlock(&plli->lli_agl_lock); - } - } - - cfs_spin_lock(&plli->lli_agl_lock); - sai->sai_agl_valid = 0; - while (!agl_list_empty(sai)) { - clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); - cfs_spin_unlock(&plli->lli_agl_lock); - clli->lli_agl_index = 0; - iput(&clli->lli_vfs_inode); - cfs_spin_lock(&plli->lli_agl_lock); - } - thread_set_flags(thread, SVC_STOPPED); - cfs_spin_unlock(&plli->lli_agl_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); - ll_sai_put(sai); - CDEBUG(D_READA, "agl thread stopped: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); - RETURN(0); + spin_lock(&plli->lli_agl_lock); + /* The statahead thread maybe help to process AGL entries, + * so check whether list empty again. 
*/ + if (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, sai); + } else { + spin_unlock(&plli->lli_agl_lock); + } + } + + spin_lock(&plli->lli_agl_lock); + sai->sai_agl_valid = 0; + while (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); + clli->lli_agl_index = 0; + iput(&clli->lli_vfs_inode); + spin_lock(&plli->lli_agl_lock); + } + thread_set_flags(thread, SVC_STOPPED); + spin_unlock(&plli->lli_agl_lock); + wake_up(&thread->t_ctl_waitq); + ll_sai_put(sai); + CDEBUG(D_READA, "agl thread stopped: [pid %d] [parent %.*s]\n", + current_pid(), parent->d_name.len, parent->d_name.name); + RETURN(0); } static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) { - struct ptlrpc_thread *thread = &sai->sai_agl_thread; - struct l_wait_info lwi = { 0 }; - int rc; - ENTRY; - - CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); - - rc = cfs_create_thread(ll_agl_thread, parent, 0); - if (rc < 0) { - CERROR("can't start ll_agl thread, rc: %d\n", rc); - thread_set_flags(thread, SVC_STOPPED); - RETURN_EXIT; - } - - l_wait_event(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread), - &lwi); - EXIT; + struct ptlrpc_thread *thread = &sai->sai_agl_thread; + struct l_wait_info lwi = { 0 }; + struct ll_inode_info *plli; + struct task_struct *task; + ENTRY; + + CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n", + current_pid(), parent->d_name.len, parent->d_name.name); + + plli = ll_i2info(parent->d_inode); + task = kthread_run(ll_agl_thread, parent, + "ll_agl_%u", plli->lli_opendir_pid); + if (IS_ERR(task)) { + CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task)); + thread_set_flags(thread, SVC_STOPPED); + RETURN_EXIT; + } + + l_wait_event(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread), + &lwi); + EXIT; } static int ll_statahead_thread(void *arg) { - struct dentry *parent = (struct dentry *)arg; - struct inode *dir = parent->d_inode; - struct ll_inode_info *plli = ll_i2info(dir); - struct ll_inode_info *clli; - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); - struct ptlrpc_thread *thread = &sai->sai_thread; - struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread; - struct page *page; - __u64 pos = 0; - int first = 0; - int rc = 0; - struct ll_dir_chain chain; - struct l_wait_info lwi = { 0 }; - ENTRY; - - { - char pname[16]; - snprintf(pname, 15, "ll_sa_%u", plli->lli_opendir_pid); - cfs_daemonize(pname); - } - - CDEBUG(D_READA, "statahead thread started: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); - - if (sbi->ll_flags & LL_SBI_AGL_ENABLED) - ll_start_agl(parent, sai); + struct dentry *parent = (struct dentry *)arg; + struct inode *dir = parent->d_inode; + struct ll_inode_info *plli = ll_i2info(dir); + struct ll_inode_info *clli; + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); + struct ptlrpc_thread *thread = &sai->sai_thread; + struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread; + int first = 0; + int rc = 0; + struct md_op_data *op_data; + struct ll_dir_chain chain; + struct l_wait_info lwi = { 0 }; + struct lu_dirent *ent; + ENTRY; + + CDEBUG(D_READA, "statahead thread started: 
[pid %d] [parent %.*s]\n", + current_pid(), parent->d_name.len, parent->d_name.name); + + op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, + LUSTRE_OPC_ANY, dir); + if (IS_ERR(op_data)) + RETURN(PTR_ERR(op_data)); + + op_data->op_hash_offset = 0; + op_data->op_max_pages = + ll_i2sbi(dir)->ll_md_brw_size >> PAGE_CACHE_SHIFT; + + if (sbi->ll_flags & LL_SBI_AGL_ENABLED) + ll_start_agl(parent, sai); + + atomic_inc(&sbi->ll_sa_total); + spin_lock(&plli->lli_sa_lock); + thread_set_flags(thread, SVC_RUNNING); + spin_unlock(&plli->lli_sa_lock); + wake_up(&thread->t_ctl_waitq); + + ll_dir_chain_init(&chain); + for (ent = ll_dir_entry_start(dir, op_data); + ent != NULL && !IS_ERR(ent); + ent = ll_dir_entry_next(dir, op_data, ent)) { + __u64 hash; + int namelen; + char *name; + + hash = le64_to_cpu(ent->lde_hash); + if (unlikely(hash < op_data->op_hash_offset)) + /* + * Skip until we find target hash value. + */ + continue; + + namelen = le16_to_cpu(ent->lde_namelen); + if (unlikely(namelen == 0)) + /* + * Skip dummy record. + */ + continue; + + name = ent->lde_name; + if (name[0] == '.') { + if (namelen == 1) { + /* + * skip "." + */ + continue; + } else if (name[1] == '.' && namelen == 2) { + /* + * skip ".." + */ + continue; + } else if (!sai->sai_ls_all) { + /* + * skip hidden files. + */ + sai->sai_skip_hidden++; + continue; + } + } + + /* + * don't stat-ahead first entry. + */ + if (unlikely(++first == 1)) + continue; - atomic_inc(&sbi->ll_sa_total); - cfs_spin_lock(&plli->lli_sa_lock); - thread_set_flags(thread, SVC_RUNNING); - cfs_spin_unlock(&plli->lli_sa_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); - - ll_dir_chain_init(&chain); - page = ll_get_dir_page(dir, pos, &chain); - - while (1) { - struct lu_dirpage *dp; - struct lu_dirent *ent; - - if (IS_ERR(page)) { - rc = PTR_ERR(page); - CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 - "/"LPU64": [rc %d] [parent %u]\n", - PFID(ll_inode2fid(dir)), pos, sai->sai_index, - rc, plli->lli_opendir_pid); - GOTO(out, rc); - } - - dp = page_address(page); - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) { - __u64 hash; - int namelen; - char *name; +keep_it: + l_wait_event(thread->t_ctl_waitq, + !sa_sent_full(sai) || + !sa_received_empty(sai) || + !agl_list_empty(sai) || + !thread_is_running(thread), + &lwi); - hash = le64_to_cpu(ent->lde_hash); - if (unlikely(hash < pos)) - /* - * Skip until we find target hash value. - */ - continue; +interpret_it: + while (!sa_received_empty(sai)) + ll_post_statahead(sai); - namelen = le16_to_cpu(ent->lde_namelen); - if (unlikely(namelen == 0)) - /* - * Skip dummy record. - */ - continue; + if (unlikely(!thread_is_running(thread))) + GOTO(out, rc = 0); - name = ent->lde_name; - if (name[0] == '.') { - if (namelen == 1) { - /* - * skip "." - */ - continue; - } else if (name[1] == '.' && namelen == 2) { - /* - * skip ".." - */ - continue; - } else if (!sai->sai_ls_all) { - /* - * skip hidden files. - */ - sai->sai_skip_hidden++; - continue; - } - } + /* If no window for metadata statahead, but there are + * some AGL entries to be triggered, then try to help + * to process the AGL entries. */ + if (sa_sent_full(sai)) { + spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, + sai); - /* - * don't stat-ahead first entry. 
- */ - if (unlikely(++first == 1)) - continue; + if (!sa_received_empty(sai)) + goto interpret_it; -keep_it: - l_wait_event(thread->t_ctl_waitq, - !sa_sent_full(sai) || - !sa_received_empty(sai) || - !agl_list_empty(sai) || - !thread_is_running(thread), - &lwi); + if (unlikely( + !thread_is_running(thread))) + GOTO(out, rc = 0); -interpret_it: - while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); + if (!sa_sent_full(sai)) + goto do_it; - if (unlikely(!thread_is_running(thread))) { - ll_release_page(page, 0); - GOTO(out, rc = 0); - } + spin_lock(&plli->lli_agl_lock); + } + spin_unlock(&plli->lli_agl_lock); - /* If no window for metadata statahead, but there are - * some AGL entries to be triggered, then try to help - * to process the AGL entries. */ - if (sa_sent_full(sai)) { - cfs_spin_lock(&plli->lli_agl_lock); - while (!agl_list_empty(sai)) { - clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); - cfs_spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, - sai); - - if (!sa_received_empty(sai)) - goto interpret_it; - - if (unlikely( - !thread_is_running(thread))) { - ll_release_page(page, 0); - GOTO(out, rc = 0); - } - - if (!sa_sent_full(sai)) - goto do_it; - - cfs_spin_lock(&plli->lli_agl_lock); - } - cfs_spin_unlock(&plli->lli_agl_lock); - - goto keep_it; - } + goto keep_it; + } do_it: - ll_statahead_one(parent, name, namelen); - } - pos = le64_to_cpu(dp->ldp_hash_end); - if (pos == MDS_DIR_END_OFF) { - /* - * End of directory reached. - */ - ll_release_page(page, 0); - while (1) { - l_wait_event(thread->t_ctl_waitq, - !sa_received_empty(sai) || - sai->sai_sent == sai->sai_replied|| - !thread_is_running(thread), - &lwi); - - while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); - - if (unlikely(!thread_is_running(thread))) - GOTO(out, rc = 0); - - if (sai->sai_sent == sai->sai_replied && - sa_received_empty(sai)) - break; - } - - cfs_spin_lock(&plli->lli_agl_lock); - while (!agl_list_empty(sai) && - thread_is_running(thread)) { - clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); - cfs_spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, sai); - cfs_spin_lock(&plli->lli_agl_lock); - } - cfs_spin_unlock(&plli->lli_agl_lock); - - GOTO(out, rc = 0); - } else if (1) { - /* - * chain is exhausted. - * Normal case: continue to the next page. - */ - ll_release_page(page, le32_to_cpu(dp->ldp_flags) & - LDF_COLLIDE); - sai->sai_in_readpage = 1; - page = ll_get_dir_page(dir, pos, &chain); - sai->sai_in_readpage = 0; - } else { - LASSERT(le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - ll_release_page(page, 1); - /* - * go into overflow page. - */ - } - } - EXIT; - + ll_statahead_one(parent, name, namelen); + } + + if (ent != NULL && !IS_ERR(ent)) + ll_dir_entry_end(dir, op_data, ent); + + /* + * End of directory reached. 
+ */ + while (1) { + l_wait_event(thread->t_ctl_waitq, + !sa_received_empty(sai) || + sai->sai_sent == sai->sai_replied || + !thread_is_running(thread), + &lwi); + + while (!sa_received_empty(sai)) + ll_post_statahead(sai); + + if (unlikely(!thread_is_running(thread))) + GOTO(out, rc = 0); + + if (sai->sai_sent == sai->sai_replied && + sa_received_empty(sai)) + break; + } + + spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai) && + thread_is_running(thread)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, sai); + spin_lock(&plli->lli_agl_lock); + } + spin_unlock(&plli->lli_agl_lock); out: + EXIT; + ll_finish_md_op_data(op_data); if (sai->sai_agl_valid) { - cfs_spin_lock(&plli->lli_agl_lock); - thread_set_flags(agl_thread, SVC_STOPPING); - cfs_spin_unlock(&plli->lli_agl_lock); - cfs_waitq_signal(&agl_thread->t_ctl_waitq); - - CDEBUG(D_READA, "stop agl thread: [pid %d]\n", - cfs_curproc_pid()); - l_wait_event(agl_thread->t_ctl_waitq, - thread_is_stopped(agl_thread), - &lwi); + spin_lock(&plli->lli_agl_lock); + thread_set_flags(agl_thread, SVC_STOPPING); + spin_unlock(&plli->lli_agl_lock); + wake_up(&agl_thread->t_ctl_waitq); + + CDEBUG(D_READA, "stop agl thread: [pid %d]\n", + current_pid()); + l_wait_event(agl_thread->t_ctl_waitq, + thread_is_stopped(agl_thread), + &lwi); } else { /* Set agl_thread flags anyway. */ thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); } ll_dir_chain_fini(&chain); - cfs_spin_lock(&plli->lli_sa_lock); - if (!sa_received_empty(sai)) { - thread_set_flags(thread, SVC_STOPPING); - cfs_spin_unlock(&plli->lli_sa_lock); - - /* To release the resources held by received entries. */ - while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); - - cfs_spin_lock(&plli->lli_sa_lock); - } - thread_set_flags(thread, SVC_STOPPED); - cfs_spin_unlock(&plli->lli_sa_lock); - cfs_waitq_signal(&sai->sai_waitq); - cfs_waitq_signal(&thread->t_ctl_waitq); + spin_lock(&plli->lli_sa_lock); + if (!sa_received_empty(sai)) { + thread_set_flags(thread, SVC_STOPPING); + spin_unlock(&plli->lli_sa_lock); + + /* To release the resources held by received entries. 
*/ + while (!sa_received_empty(sai)) + ll_post_statahead(sai); + + spin_lock(&plli->lli_sa_lock); + } + thread_set_flags(thread, SVC_STOPPED); + spin_unlock(&plli->lli_sa_lock); + wake_up(&sai->sai_waitq); + wake_up(&thread->t_ctl_waitq); ll_sai_put(sai); dput(parent); - CDEBUG(D_READA, "statahead thread stopped: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); - return rc; + CDEBUG(D_READA, "statahead thread stopped: [pid %d] [parent %.*s]\n", + current_pid(), parent->d_name.len, parent->d_name.name); + return rc; } /** @@ -1332,14 +1272,14 @@ out: */ void ll_stop_statahead(struct inode *dir, void *key) { - struct ll_inode_info *lli = ll_i2info(dir); + struct ll_inode_info *lli = ll_i2info(dir); - if (unlikely(key == NULL)) - return; + if (unlikely(key == NULL)) + return; - cfs_spin_lock(&lli->lli_sa_lock); - if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) { - cfs_spin_unlock(&lli->lli_sa_lock); + spin_lock(&lli->lli_sa_lock); + if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) { + spin_unlock(&lli->lli_sa_lock); return; } @@ -1351,28 +1291,28 @@ void ll_stop_statahead(struct inode *dir, void *key) if (!thread_is_stopped(thread)) { thread_set_flags(thread, SVC_STOPPING); - cfs_spin_unlock(&lli->lli_sa_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); - - CDEBUG(D_READA, "stop statahead thread: [pid %d]\n", - cfs_curproc_pid()); - l_wait_event(thread->t_ctl_waitq, - thread_is_stopped(thread), - &lwi); - } else { - cfs_spin_unlock(&lli->lli_sa_lock); - } - - /* - * Put the ref which was held when first statahead_enter. - * It maybe not the last ref for some statahead requests - * maybe inflight. - */ - ll_sai_put(lli->lli_sai); - } else { - lli->lli_opendir_pid = 0; - cfs_spin_unlock(&lli->lli_sa_lock); - } + spin_unlock(&lli->lli_sa_lock); + wake_up(&thread->t_ctl_waitq); + + CDEBUG(D_READA, "stop statahead thread: [pid %d]\n", + current_pid()); + l_wait_event(thread->t_ctl_waitq, + thread_is_stopped(thread), + &lwi); + } else { + spin_unlock(&lli->lli_sa_lock); + } + + /* + * Put the ref which was held when first statahead_enter. + * It maybe not the last ref for some statahead requests + * maybe inflight. + */ + ll_sai_put(lli->lli_sai); + } else { + lli->lli_opendir_pid = 0; + spin_unlock(&lli->lli_sa_lock); + } } enum { @@ -1392,115 +1332,89 @@ enum { static int is_first_dirent(struct inode *dir, struct dentry *dentry) { - struct ll_dir_chain chain; - struct qstr *target = &dentry->d_name; - struct page *page; - __u64 pos = 0; - int dot_de; - int rc = LS_NONE_FIRST_DE; - ENTRY; - - ll_dir_chain_init(&chain); - page = ll_get_dir_page(dir, pos, &chain); - - while (1) { - struct lu_dirpage *dp; - struct lu_dirent *ent; - - if (IS_ERR(page)) { - struct ll_inode_info *lli = ll_i2info(dir); - - rc = PTR_ERR(page); - CERROR("error reading dir "DFID" at "LPU64": " - "[rc %d] [parent %u]\n", - PFID(ll_inode2fid(dir)), pos, - rc, lli->lli_opendir_pid); - break; - } - - dp = page_address(page); - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) { - __u64 hash; - int namelen; - char *name; - - hash = le64_to_cpu(ent->lde_hash); - /* The ll_get_dir_page() can return any page containing - * the given hash which may be not the start hash. */ - if (unlikely(hash < pos)) - continue; - - namelen = le16_to_cpu(ent->lde_namelen); - if (unlikely(namelen == 0)) - /* - * skip dummy record. - */ - continue; - - name = ent->lde_name; - if (name[0] == '.') { - if (namelen == 1) - /* - * skip "." 
- */ - continue; - else if (name[1] == '.' && namelen == 2) - /* - * skip ".." - */ - continue; - else - dot_de = 1; - } else { - dot_de = 0; - } - - if (dot_de && target->name[0] != '.') { - CDEBUG(D_READA, "%.*s skip hidden file %.*s\n", - target->len, target->name, - namelen, name); - continue; - } - - if (target->len != namelen || - memcmp(target->name, name, namelen) != 0) - rc = LS_NONE_FIRST_DE; - else if (!dot_de) - rc = LS_FIRST_DE; - else - rc = LS_FIRST_DOT_DE; - - ll_release_page(page, 0); - GOTO(out, rc); - } - pos = le64_to_cpu(dp->ldp_hash_end); - if (pos == MDS_DIR_END_OFF) { - /* - * End of directory reached. - */ - ll_release_page(page, 0); - break; - } else if (1) { - /* - * chain is exhausted - * Normal case: continue to the next page. - */ - ll_release_page(page, le32_to_cpu(dp->ldp_flags) & - LDF_COLLIDE); - page = ll_get_dir_page(dir, pos, &chain); - } else { - /* - * go into overflow page. - */ - LASSERT(le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - ll_release_page(page, 1); - } - } + struct ll_dir_chain chain; + struct qstr *target = &dentry->d_name; + struct md_op_data *op_data; + int dot_de; + struct lu_dirent *ent; + int rc = LS_NONE_FIRST_DE; + ENTRY; + + ll_dir_chain_init(&chain); + + op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, + LUSTRE_OPC_ANY, dir); + if (IS_ERR(op_data)) + GOTO(out, rc = PTR_ERR(op_data)); + /** + *FIXME choose the start offset of the readdir + */ + op_data->op_stripe_offset = 0; + op_data->op_hash_offset = 0; + op_data->op_max_pages = + ll_i2sbi(dir)->ll_md_brw_size >> PAGE_CACHE_SHIFT; + + for (ent = ll_dir_entry_start(dir, op_data); + ent != NULL && !IS_ERR(ent); + ent = ll_dir_entry_next(dir, op_data, ent)) { + __u64 hash; + int namelen; + char *name; + + hash = le64_to_cpu(ent->lde_hash); + /* The ll_get_dir_page() can return any page containing + * the given hash which may be not the start hash. */ + if (unlikely(hash < op_data->op_hash_offset)) + continue; + + namelen = le16_to_cpu(ent->lde_namelen); + if (unlikely(namelen == 0)) + /* + * skip dummy record. + */ + continue; + + name = ent->lde_name; + if (name[0] == '.') { + if (namelen == 1) + /* + * skip "." + */ + continue; + else if (name[1] == '.' && namelen == 2) + /* + * skip ".." 
+ */ + continue; + else + dot_de = 1; + } else { + dot_de = 0; + } + + if (dot_de && target->name[0] != '.') { + CDEBUG(D_READA, "%.*s skip hidden file %.*s\n", + target->len, target->name, + namelen, name); + continue; + } + + if (target->len != namelen || + memcmp(target->name, name, namelen) != 0) + rc = LS_NONE_FIRST_DE; + else if (!dot_de) + rc = LS_FIRST_DE; + else + rc = LS_FIRST_DOT_DE; + + break; + } EXIT; - + if (ent != NULL && !IS_ERR(ent)) + ll_dir_entry_end(dir, op_data, ent); + ll_finish_md_op_data(op_data); out: - ll_dir_chain_fini(&chain); + ll_dir_chain_fini(&chain); return rc; } @@ -1529,24 +1443,24 @@ ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry) sai->sai_consecutive_miss++; if (sa_low_hit(sai) && thread_is_running(thread)) { atomic_inc(&sbi->ll_sa_wrong); - CDEBUG(D_READA, "Statahead for dir "DFID" hit " - "ratio too low: hit/miss "LPU64"/"LPU64 - ", sent/replied "LPU64"/"LPU64", stopping " - "statahead thread: pid %d\n", - PFID(&lli->lli_fid), sai->sai_hit, - sai->sai_miss, sai->sai_sent, - sai->sai_replied, cfs_curproc_pid()); - cfs_spin_lock(&lli->lli_sa_lock); - if (!thread_is_stopped(thread)) - thread_set_flags(thread, SVC_STOPPING); - cfs_spin_unlock(&lli->lli_sa_lock); - } - } - - if (!thread_is_stopped(thread)) - cfs_waitq_signal(&thread->t_ctl_waitq); - - EXIT; + CDEBUG(D_READA, "Statahead for dir "DFID" hit " + "ratio too low: hit/miss "LPU64"/"LPU64 + ", sent/replied "LPU64"/"LPU64", stopping " + "statahead thread: pid %d\n", + PFID(&lli->lli_fid), sai->sai_hit, + sai->sai_miss, sai->sai_sent, + sai->sai_replied, current_pid()); + spin_lock(&lli->lli_sa_lock); + if (!thread_is_stopped(thread)) + thread_set_flags(thread, SVC_STOPPING); + spin_unlock(&lli->lli_sa_lock); + } + } + + if (!thread_is_stopped(thread)) + wake_up(&thread->t_ctl_waitq); + + EXIT; } /** @@ -1561,16 +1475,17 @@ ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry) int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int only_unplug) { - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = lli->lli_sai; - struct dentry *parent; - struct ll_sa_entry *entry; - struct ptlrpc_thread *thread; - struct l_wait_info lwi = { 0 }; - int rc = 0; - ENTRY; - - LASSERT(lli->lli_opendir_pid == cfs_curproc_pid()); + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = lli->lli_sai; + struct dentry *parent; + struct ll_sa_entry *entry; + struct ptlrpc_thread *thread; + struct l_wait_info lwi = { 0 }; + int rc = 0; + struct ll_inode_info *plli; + ENTRY; + + LASSERT(lli->lli_opendir_pid == current_pid()); if (sai) { thread = &sai->sai_thread; @@ -1614,10 +1529,11 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, RETURN(entry ? 
1 : -EAGAIN); } + /* if statahead is busy in readdir, help it do post-work */ while (!ll_sa_entry_stated(entry) && sai->sai_in_readpage && !sa_received_empty(sai)) - do_statahead_interpret(sai, entry); + ll_post_statahead(sai); if (!ll_sa_entry_stated(entry)) { sai->sai_index_wait = entry->se_index; @@ -1645,19 +1561,26 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, ll_inode2fid(inode), &bits); if (rc == 1) { if ((*dentryp)->d_inode == NULL) { - *dentryp = ll_splice_alias(inode, - *dentryp); + struct dentry *alias; + + alias = ll_splice_alias(inode, + *dentryp); + if (IS_ERR(alias)) { + ll_sai_unplug(sai, entry); + RETURN(PTR_ERR(alias)); + } + *dentryp = alias; } else if ((*dentryp)->d_inode != inode) { /* revalidate, but inode is recreated */ CDEBUG(D_READA, - "stale dentry %.*s inode %lu/%u, " - "statahead inode %lu/%u\n", - (*dentryp)->d_name.len, - (*dentryp)->d_name.name, - (*dentryp)->d_inode->i_ino, - (*dentryp)->d_inode->i_generation, - inode->i_ino, - inode->i_generation); + "%s: stale dentry %.*s inode " + DFID", statahead inode "DFID + "\n", + ll_get_fsname((*dentryp)->d_inode->i_sb, NULL, 0), + (*dentryp)->d_name.len, + (*dentryp)->d_name.name, + PFID(ll_inode2fid((*dentryp)->d_inode)), + PFID(ll_inode2fid(inode))); ll_sai_unplug(sai, entry); RETURN(-ESTALE); } else { @@ -1708,15 +1631,18 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, GOTO(out, rc = -EAGAIN); } - CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n", + current_pid(), parent->d_name.len, parent->d_name.name); lli->lli_sai = sai; - rc = cfs_create_thread(ll_statahead_thread, parent, 0); - thread = &sai->sai_thread; - if (rc < 0) { - CERROR("can't start ll_sa thread, rc: %d\n", rc); - dput(parent); + + plli = ll_i2info(parent->d_inode); + rc = PTR_ERR(kthread_run(ll_statahead_thread, parent, + "ll_sa_%u", plli->lli_opendir_pid)); + thread = &sai->sai_thread; + if (IS_ERR_VALUE(rc)) { + CERROR("can't start ll_sa thread, rc: %d\n", rc); + dput(parent); lli->lli_opendir_key = NULL; thread_set_flags(thread, SVC_STOPPED); thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); @@ -1738,9 +1664,9 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, out: if (sai != NULL) OBD_FREE_PTR(sai); - cfs_spin_lock(&lli->lli_sa_lock); - lli->lli_opendir_key = NULL; - lli->lli_opendir_pid = 0; - cfs_spin_unlock(&lli->lli_sa_lock); - return rc; + spin_lock(&lli->lli_sa_lock); + lli->lli_opendir_key = NULL; + lli->lli_opendir_pid = 0; + spin_unlock(&lli->lli_sa_lock); + return rc; }
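
For readers following the data-structure rework in the hunks above: the patch drops the old sai_entries_sent list and keeps every live statahead entry on a single sai_entries list (linked through the new se_link field, in se_index order), while se_list is reused only for the received/stated lists and se_hash keeps the entry in a small per-name-hash bucket for lookup. The sketch below is a minimal userspace model of that layout, not the Lustre code itself; the list helpers, the bucket count, and main() are assumptions made purely for illustration.

/*
 * Minimal userspace sketch of the reworked ll_sa_entry layout.
 * Illustrative only -- the list helpers, NBUCKETS and main() are
 * assumptions; the real code uses cfs_list/ldlm infrastructure and
 * takes lli_sa_lock / sai_cache_lock[i] around these operations.
 */
#include <stdio.h>
#include <stdlib.h>

#define NBUCKETS 8      /* stand-in for LL_SA_CACHE_SIZE */

struct list_head { struct list_head *prev, *next; };

static void list_init(struct list_head *h) { h->prev = h->next = h; }
static int  list_empty(const struct list_head *h) { return h->next == h; }
static void list_add_tail(struct list_head *n, struct list_head *h)
{
	n->prev = h->prev; n->next = h;
	h->prev->next = n; h->prev = n;
}
static void list_del_init(struct list_head *n)
{
	n->prev->next = n->next; n->next->prev = n->prev;
	list_init(n);
}

/* mirrors the reworked struct ll_sa_entry fields used here */
struct sa_entry {
	struct list_head se_link;   /* position in sai_entries (index order) */
	struct list_head se_list;   /* received/stated membership, or empty  */
	struct list_head se_hash;   /* bucket chain for lookup by name hash  */
	unsigned long    se_index;
	char             se_name[32];
};

static struct list_head sai_entries;          /* all live entries   */
static struct list_head sai_cache[NBUCKETS];  /* name-hash buckets  */

static unsigned int name_hash(const char *s)
{
	unsigned int h = 5381;
	while (*s)
		h = h * 33 + (unsigned char)*s++;
	return h % NBUCKETS;
}

/* alloc: link into sai_entries tail and hash it, like ll_sa_entry_alloc() */
static struct sa_entry *sa_entry_alloc(unsigned long index, const char *name)
{
	struct sa_entry *e = calloc(1, sizeof(*e));

	if (e == NULL) {
		perror("calloc");
		exit(1);
	}
	e->se_index = index;
	snprintf(e->se_name, sizeof(e->se_name), "%s", name);
	list_init(&e->se_list);
	list_add_tail(&e->se_link, &sai_entries);
	list_add_tail(&e->se_hash, &sai_cache[name_hash(name)]);
	return e;
}

/* fini: unhash, unlink, drop list membership, like do_sa_entry_fini() */
static void sa_entry_fini(struct sa_entry *e)
{
	list_del_init(&e->se_hash);
	list_del_init(&e->se_link);
	if (!list_empty(&e->se_list))
		list_del_init(&e->se_list);
	free(e);
}

int main(void)
{
	struct sa_entry *a, *b;
	int i;

	list_init(&sai_entries);
	for (i = 0; i < NBUCKETS; i++)
		list_init(&sai_cache[i]);

	a = sa_entry_alloc(1, "file-a");
	b = sa_entry_alloc(2, "file-b");
	printf("entries list empty? %d\n", list_empty(&sai_entries));
	sa_entry_fini(a);
	sa_entry_fini(b);
	printf("entries list empty? %d\n", list_empty(&sai_entries));
	return 0;
}

With a single ordered list plus the hash buckets, ll_sai_put() and ll_sa_entry_fini() only need to walk sai_entries once, and do_sa_entry_fini() can assert that an entry is always both hashed and linked before teardown, which is what the added LASSERTs in the patch check.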