X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Fstatahead.c;h=cc290aad82d432f198740bd2ca1be48bf066940e;hp=98c40e354de13e6bbe7a42241f9b7fe9ecfd51c4;hb=8e13d34da223e0fc86a7cd269675b067e353f829;hpb=08aa217ce49aba1ded52e0f7adb8a607035123fd diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 98c40e3..cc290aa 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -27,7 +27,7 @@ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Intel Corporation. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -58,41 +58,38 @@ typedef enum { } se_stat_t; struct ll_sa_entry { - /* link into sai->sai_entries_{sent,received,stated} */ - cfs_list_t se_list; - /* link into sai hash table locally */ - cfs_list_t se_hash; - /* entry reference count */ - cfs_atomic_t se_refcount; - /* entry index in the sai */ - __u64 se_index; - /* low layer ldlm lock handle */ - __u64 se_handle; - /* entry status */ - se_stat_t se_stat; - /* entry size, contains name */ - int se_size; - /* pointer to async getattr enqueue info */ - struct md_enqueue_info *se_minfo; - /* pointer to the async getattr request */ - struct ptlrpc_request *se_req; - /* pointer to the target inode */ - struct inode *se_inode; - /* entry name */ - struct qstr se_qstr; + /* link into sai->sai_entries */ + struct list_head se_link; + /* link into sai->sai_entries_{received,stated} */ + struct list_head se_list; + /* link into sai hash table locally */ + struct list_head se_hash; + /* entry reference count */ + atomic_t se_refcount; + /* entry index in the sai */ + __u64 se_index; + /* low layer ldlm lock handle */ + __u64 se_handle; + /* entry status */ + se_stat_t se_stat; + /* entry size, contains name */ + int se_size; + /* pointer to async getattr enqueue info */ + struct md_enqueue_info *se_minfo; + /* pointer to the async getattr request */ + struct ptlrpc_request *se_req; + /* pointer to the target inode */ + struct inode *se_inode; + /* entry name */ + struct qstr se_qstr; }; static unsigned int sai_generation = 0; static DEFINE_SPINLOCK(sai_generation_lock); -static inline int ll_sa_entry_unlinked(struct ll_sa_entry *entry) -{ - return cfs_list_empty(&entry->se_list); -} - static inline int ll_sa_entry_unhashed(struct ll_sa_entry *entry) { - return cfs_list_empty(&entry->se_hash); + return list_empty(&entry->se_hash); } /* @@ -118,7 +115,7 @@ ll_sa_entry_enhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) int i = ll_sa_entry_hash(entry->se_qstr.hash); spin_lock(&sai->sai_cache_lock[i]); - cfs_list_add_tail(&entry->se_hash, &sai->sai_cache[i]); + list_add_tail(&entry->se_hash, &sai->sai_cache[i]); spin_unlock(&sai->sai_cache_lock[i]); } @@ -131,7 +128,7 @@ ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) int i = ll_sa_entry_hash(entry->se_qstr.hash); spin_lock(&sai->sai_cache_lock[i]); - cfs_list_del_init(&entry->se_hash); + list_del_init(&entry->se_hash); spin_unlock(&sai->sai_cache_lock[i]); } @@ -144,30 +141,30 @@ static inline int agl_should_run(struct ll_statahead_info *sai, static inline struct ll_sa_entry * sa_first_received_entry(struct ll_statahead_info *sai) { - return cfs_list_entry(sai->sai_entries_received.next, - struct ll_sa_entry, se_list); + return list_entry(sai->sai_entries_received.next, + struct ll_sa_entry, se_list); } static inline struct ll_inode_info * agl_first_entry(struct ll_statahead_info *sai) { - return cfs_list_entry(sai->sai_entries_agl.next, - struct ll_inode_info, lli_agl_list); + return list_entry(sai->sai_entries_agl.next, + struct ll_inode_info, lli_agl_list); } static inline int sa_sent_full(struct ll_statahead_info *sai) { - return cfs_atomic_read(&sai->sai_cache_count) >= sai->sai_max; + return atomic_read(&sai->sai_cache_count) >= sai->sai_max; } static inline int sa_received_empty(struct ll_statahead_info *sai) { - return cfs_list_empty(&sai->sai_entries_received); + return list_empty(&sai->sai_entries_received); } static inline int agl_list_empty(struct ll_statahead_info *sai) { - return cfs_list_empty(&sai->sai_entries_agl); + return list_empty(&sai->sai_entries_agl); } /** @@ -193,7 +190,7 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index) } /* - * Insert it into sai_entries_sent tail when init. + * Insert it into sai_entries tail when init. */ static struct ll_sa_entry * ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, @@ -210,51 +207,52 @@ ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, if (unlikely(entry == NULL)) RETURN(ERR_PTR(-ENOMEM)); - CDEBUG(D_READA, "alloc sai entry %.*s(%p) index "LPU64"\n", - len, name, entry, index); + CDEBUG(D_READA, "alloc sa entry %.*s(%p) index "LPU64"\n", + len, name, entry, index); entry->se_index = index; - /* - * Statahead entry reference rules: - * - * 1) When statahead entry is initialized, its reference is set as 2. - * One reference is used by the directory scanner. When the scanner - * searches the statahead cache for the given name, it can perform - * lockless hash lookup (only the scanner can remove entry from hash - * list), and once found, it needn't to call "atomic_inc()" for the - * entry reference. So the performance is improved. After using the - * statahead entry, the scanner will call "atomic_dec()" to drop the - * reference held when initialization. If it is the last reference, - * the statahead entry will be freed. - * - * 2) All other threads, including statahead thread and ptlrpcd thread, - * when they process the statahead entry, the reference for target - * should be held to guarantee the entry will not be released by the - * directory scanner. After processing the entry, these threads will - * drop the entry reference. If it is the last reference, the entry - * will be freed. - * - * The second reference when initializes the statahead entry is used - * by the statahead thread, following the rule 2). - */ - cfs_atomic_set(&entry->se_refcount, 2); - entry->se_stat = SA_ENTRY_INIT; - entry->se_size = entry_size; - dname = (char *)entry + sizeof(struct ll_sa_entry); - memcpy(dname, name, len); - dname[len] = 0; - entry->se_qstr.hash = full_name_hash(name, len); - entry->se_qstr.len = len; - entry->se_qstr.name = dname; - - lli = ll_i2info(sai->sai_inode); + /* + * Statahead entry reference rules: + * + * 1) When statahead entry is initialized, its reference is set as 2. + * One reference is used by the directory scanner. When the scanner + * searches the statahead cache for the given name, it can perform + * lockless hash lookup (only the scanner can remove entry from hash + * list), and once found, it needn't to call "atomic_inc()" for the + * entry reference. So the performance is improved. After using the + * statahead entry, the scanner will call "atomic_dec()" to drop the + * reference held when initialization. If it is the last reference, + * the statahead entry will be freed. + * + * 2) All other threads, including statahead thread and ptlrpcd thread, + * when they process the statahead entry, the reference for target + * should be held to guarantee the entry will not be released by the + * directory scanner. After processing the entry, these threads will + * drop the entry reference. If it is the last reference, the entry + * will be freed. + * + * The second reference when initializes the statahead entry is used + * by the statahead thread, following the rule 2). + */ + atomic_set(&entry->se_refcount, 2); + entry->se_stat = SA_ENTRY_INIT; + entry->se_size = entry_size; + dname = (char *)entry + sizeof(struct ll_sa_entry); + memcpy(dname, name, len); + dname[len] = 0; + entry->se_qstr.hash = full_name_hash(name, len); + entry->se_qstr.len = len; + entry->se_qstr.name = dname; + + lli = ll_i2info(sai->sai_inode); spin_lock(&lli->lli_sa_lock); - cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent); + list_add_tail(&entry->se_link, &sai->sai_entries); + INIT_LIST_HEAD(&entry->se_list); + ll_sa_entry_enhash(sai, entry); spin_unlock(&lli->lli_sa_lock); - cfs_atomic_inc(&sai->sai_cache_count); - ll_sa_entry_enhash(sai, entry); + atomic_inc(&sai->sai_cache_count); RETURN(entry); } @@ -272,7 +270,7 @@ ll_sa_entry_get_byname(struct ll_statahead_info *sai, const struct qstr *qstr) struct ll_sa_entry *entry; int i = ll_sa_entry_hash(qstr->hash); - cfs_list_for_each_entry(entry, &sai->sai_cache[i], se_hash) { + list_for_each_entry(entry, &sai->sai_cache[i], se_hash) { if (entry->se_qstr.hash == qstr->hash && entry->se_qstr.len == qstr->len && memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) @@ -291,17 +289,18 @@ ll_sa_entry_get_byname(struct ll_statahead_info *sai, const struct qstr *qstr) static struct ll_sa_entry * ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index) { - struct ll_sa_entry *entry; + struct ll_sa_entry *entry; - cfs_list_for_each_entry(entry, &sai->sai_entries_sent, se_list) { - if (entry->se_index == index) { - cfs_atomic_inc(&entry->se_refcount); - return entry; + list_for_each_entry(entry, &sai->sai_entries, se_link) { + if (entry->se_index == index) { + LASSERT(atomic_read(&entry->se_refcount) > 0); + atomic_inc(&entry->se_refcount); + return entry; } - if (entry->se_index > index) - break; - } - return NULL; + if (entry->se_index > index) + break; + } + return NULL; } static void ll_sa_entry_cleanup(struct ll_statahead_info *sai, @@ -326,34 +325,39 @@ static void ll_sa_entry_cleanup(struct ll_statahead_info *sai, static void ll_sa_entry_put(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - if (cfs_atomic_dec_and_test(&entry->se_refcount)) { - CDEBUG(D_READA, "free sai entry %.*s(%p) index "LPU64"\n", - entry->se_qstr.len, entry->se_qstr.name, entry, - entry->se_index); + if (atomic_dec_and_test(&entry->se_refcount)) { + CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n", + entry->se_qstr.len, entry->se_qstr.name, entry, + entry->se_index); - LASSERT(ll_sa_entry_unhashed(entry)); - LASSERT(ll_sa_entry_unlinked(entry)); + LASSERT(list_empty(&entry->se_link)); + LASSERT(list_empty(&entry->se_list)); + LASSERT(ll_sa_entry_unhashed(entry)); - ll_sa_entry_cleanup(sai, entry); - if (entry->se_inode) - iput(entry->se_inode); + ll_sa_entry_cleanup(sai, entry); + if (entry->se_inode) + iput(entry->se_inode); - OBD_FREE(entry, entry->se_size); - cfs_atomic_dec(&sai->sai_cache_count); - } + OBD_FREE(entry, entry->se_size); + atomic_dec(&sai->sai_cache_count); + } } static inline void -do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) +do_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + LASSERT(!ll_sa_entry_unhashed(entry)); + LASSERT(!list_empty(&entry->se_link)); + ll_sa_entry_unhash(sai, entry); spin_lock(&lli->lli_sa_lock); entry->se_stat = SA_ENTRY_DEST; - if (likely(!ll_sa_entry_unlinked(entry))) - cfs_list_del_init(&entry->se_list); + list_del_init(&entry->se_link); + if (likely(!list_empty(&entry->se_list))) + list_del_init(&entry->se_list); spin_unlock(&lli->lli_sa_lock); ll_sa_entry_put(sai, entry); @@ -365,52 +369,41 @@ do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) static void ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - struct ll_sa_entry *pos, *next; + struct ll_sa_entry *pos, *next; - if (entry) - do_sai_entry_fini(sai, entry); + if (entry) + do_sa_entry_fini(sai, entry); - /* drop old entry from sent list */ - cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_sent, - se_list) { - if (is_omitted_entry(sai, pos->se_index)) - do_sai_entry_fini(sai, pos); - else - break; - } - - /* drop old entry from stated list */ - cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_stated, - se_list) { - if (is_omitted_entry(sai, pos->se_index)) - do_sai_entry_fini(sai, pos); - else - break; - } + /* drop old entry, only 'scanner' process does this, no need to lock */ + list_for_each_entry_safe(pos, next, &sai->sai_entries, se_link) { + if (!is_omitted_entry(sai, pos->se_index)) + break; + do_sa_entry_fini(sai, pos); + } } /* * Inside lli_sa_lock. */ static void -do_sai_entry_to_stated(struct ll_statahead_info *sai, - struct ll_sa_entry *entry, int rc) +do_sa_entry_to_stated(struct ll_statahead_info *sai, + struct ll_sa_entry *entry, se_stat_t stat) { - struct ll_sa_entry *se; - cfs_list_t *pos = &sai->sai_entries_stated; + struct ll_sa_entry *se; + struct list_head *pos = &sai->sai_entries_stated; - if (!ll_sa_entry_unlinked(entry)) - cfs_list_del_init(&entry->se_list); + if (!list_empty(&entry->se_list)) + list_del_init(&entry->se_list); - cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { - if (se->se_index < entry->se_index) { - pos = &se->se_list; - break; - } - } + list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { + if (se->se_index < entry->se_index) { + pos = &se->se_list; + break; + } + } - cfs_list_add(&entry->se_list, pos); - entry->se_stat = rc; + list_add(&entry->se_list, pos); + entry->se_stat = stat; } /* @@ -420,7 +413,7 @@ do_sai_entry_to_stated(struct ll_statahead_info *sai, */ static int ll_sa_entry_to_stated(struct ll_statahead_info *sai, - struct ll_sa_entry *entry, int rc) + struct ll_sa_entry *entry, se_stat_t stat) { struct ll_inode_info *lli = ll_i2info(sai->sai_inode); int ret = 1; @@ -429,7 +422,7 @@ ll_sa_entry_to_stated(struct ll_statahead_info *sai, spin_lock(&lli->lli_sa_lock); if (likely(entry->se_stat != SA_ENTRY_DEST)) { - do_sai_entry_to_stated(sai, entry, rc); + do_sa_entry_to_stated(sai, entry, stat); ret = 0; } spin_unlock(&lli->lli_sa_lock); @@ -452,33 +445,33 @@ static void ll_agl_add(struct ll_statahead_info *sai, child->lli_agl_index = index; spin_unlock(&child->lli_agl_lock); - LASSERT(cfs_list_empty(&child->lli_agl_list)); + LASSERT(list_empty(&child->lli_agl_list)); igrab(inode); spin_lock(&parent->lli_agl_lock); if (agl_list_empty(sai)) added = 1; - cfs_list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl); + list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl); spin_unlock(&parent->lli_agl_lock); } else { spin_unlock(&child->lli_agl_lock); } if (added > 0) - cfs_waitq_signal(&sai->sai_agl_thread.t_ctl_waitq); + wake_up(&sai->sai_agl_thread.t_ctl_waitq); } static struct ll_statahead_info *ll_sai_alloc(void) { - struct ll_statahead_info *sai; - int i; - ENTRY; + struct ll_statahead_info *sai; + int i; + ENTRY; - OBD_ALLOC_PTR(sai); - if (!sai) - RETURN(NULL); + OBD_ALLOC_PTR(sai); + if (!sai) + RETURN(NULL); - cfs_atomic_set(&sai->sai_refcount, 1); + atomic_set(&sai->sai_refcount, 1); spin_lock(&sai_generation_lock); sai->sai_generation = ++sai_generation; @@ -486,45 +479,45 @@ static struct ll_statahead_info *ll_sai_alloc(void) sai->sai_generation = ++sai_generation; spin_unlock(&sai_generation_lock); - sai->sai_max = LL_SA_RPC_MIN; - sai->sai_index = 1; - cfs_waitq_init(&sai->sai_waitq); - cfs_waitq_init(&sai->sai_thread.t_ctl_waitq); - cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq); + sai->sai_max = LL_SA_RPC_MIN; + sai->sai_index = 1; + init_waitqueue_head(&sai->sai_waitq); + init_waitqueue_head(&sai->sai_thread.t_ctl_waitq); + init_waitqueue_head(&sai->sai_agl_thread.t_ctl_waitq); - CFS_INIT_LIST_HEAD(&sai->sai_entries_sent); - CFS_INIT_LIST_HEAD(&sai->sai_entries_received); - CFS_INIT_LIST_HEAD(&sai->sai_entries_stated); - CFS_INIT_LIST_HEAD(&sai->sai_entries_agl); + INIT_LIST_HEAD(&sai->sai_entries); + INIT_LIST_HEAD(&sai->sai_entries_received); + INIT_LIST_HEAD(&sai->sai_entries_stated); + INIT_LIST_HEAD(&sai->sai_entries_agl); - for (i = 0; i < LL_SA_CACHE_SIZE; i++) { - CFS_INIT_LIST_HEAD(&sai->sai_cache[i]); + for (i = 0; i < LL_SA_CACHE_SIZE; i++) { + INIT_LIST_HEAD(&sai->sai_cache[i]); spin_lock_init(&sai->sai_cache_lock[i]); - } - cfs_atomic_set(&sai->sai_cache_count, 0); + } + atomic_set(&sai->sai_cache_count, 0); - RETURN(sai); + RETURN(sai); } static inline struct ll_statahead_info * ll_sai_get(struct ll_statahead_info *sai) { - cfs_atomic_inc(&sai->sai_refcount); - return sai; + atomic_inc(&sai->sai_refcount); + return sai; } static void ll_sai_put(struct ll_statahead_info *sai) { - struct inode *inode = sai->sai_inode; - struct ll_inode_info *lli = ll_i2info(inode); - ENTRY; + struct inode *inode = sai->sai_inode; + struct ll_inode_info *lli = ll_i2info(inode); + ENTRY; - if (cfs_atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) { - struct ll_sa_entry *entry, *next; + if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) { + struct ll_sa_entry *entry, *next; - if (unlikely(cfs_atomic_read(&sai->sai_refcount) > 0)) { - /* It is race case, the interpret callback just hold - * a reference count */ + if (unlikely(atomic_read(&sai->sai_refcount) > 0)) { + /* It is race case, the interpret callback just hold + * a reference count */ spin_unlock(&lli->lli_sa_lock); RETURN_EXIT; } @@ -543,24 +536,22 @@ static void ll_sai_put(struct ll_statahead_info *sai) PFID(&lli->lli_fid), sai->sai_sent, sai->sai_replied); - cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_sent, se_list) - do_sai_entry_fini(sai, entry); - - LASSERT(sa_received_empty(sai)); + list_for_each_entry_safe(entry, next, + &sai->sai_entries, se_link) + do_sa_entry_fini(sai, entry); - cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_stated, se_list) - do_sai_entry_fini(sai, entry); + LASSERT(list_empty(&sai->sai_entries)); + LASSERT(sa_received_empty(sai)); + LASSERT(list_empty(&sai->sai_entries_stated)); - LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0); - LASSERT(agl_list_empty(sai)); + LASSERT(atomic_read(&sai->sai_cache_count) == 0); + LASSERT(agl_list_empty(sai)); - iput(inode); - OBD_FREE_PTR(sai); - } + iput(inode); + OBD_FREE_PTR(sai); + } - EXIT; + EXIT; } /* Do NOT forget to drop inode refcount when into sai_entries_agl. */ @@ -571,7 +562,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) int rc; ENTRY; - LASSERT(cfs_list_empty(&lli->lli_agl_list)); + LASSERT(list_empty(&lli->lli_agl_list)); /* AGL maybe fall behind statahead with one entry */ if (is_omitted_entry(sai, index + 1)) { @@ -626,8 +617,7 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) EXIT; } -static void do_statahead_interpret(struct ll_statahead_info *sai, - struct ll_sa_entry *target) +static void ll_post_statahead(struct ll_statahead_info *sai) { struct inode *dir = sai->sai_inode; struct inode *child; @@ -641,18 +631,13 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, ENTRY; spin_lock(&lli->lli_sa_lock); - if (target != NULL && target->se_req != NULL && - !cfs_list_empty(&target->se_list)) { - entry = target; - } else if (unlikely(sa_received_empty(sai))) { + if (unlikely(sa_received_empty(sai))) { spin_unlock(&lli->lli_sa_lock); RETURN_EXIT; - } else { - entry = sa_first_received_entry(sai); } - - cfs_atomic_inc(&entry->se_refcount); - cfs_list_del_init(&entry->se_list); + entry = sa_first_received_entry(sai); + atomic_inc(&entry->se_refcount); + list_del_init(&entry->se_list); spin_unlock(&lli->lli_sa_lock); LASSERT(entry->se_handle != 0); @@ -673,14 +658,15 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, /* XXX: No fid in reply, this is probaly cross-ref case. * SA can't handle it yet. */ - if (body->valid & OBD_MD_MDS) - GOTO(out, rc = -EAGAIN); - } else { - /* - * revalidate. - */ - /* unlinked and re-created with the same name */ - if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1))){ + if (body->mbo_valid & OBD_MD_MDS) + GOTO(out, rc = -EAGAIN); + } else { + /* + * revalidate. + */ + /* unlinked and re-created with the same name */ + if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, + &body->mbo_fid1))) { entry->se_inode = NULL; iput(child); child = NULL; @@ -688,16 +674,17 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, } it->d.lustre.it_lock_handle = entry->se_handle; - rc = md_revalidate_lock(ll_i2mdexp(dir), it, NULL, NULL); + rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL); if (rc != 1) GOTO(out, rc = -EAGAIN); - rc = ll_prep_inode(&child, req, dir->i_sb); + rc = ll_prep_inode(&child, req, dir->i_sb, it); if (rc) GOTO(out, rc); - CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n", - child, child->i_ino, child->i_generation); + CDEBUG(D_DLMTRACE, "%s: setting l_data to inode "DFID"(%p)\n", + ll_get_fsname(child->i_sb, NULL, 0), + PFID(ll_inode2fid(child)), child); ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL); entry->se_inode = child; @@ -708,14 +695,15 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, EXIT; out: - /* The "ll_sa_entry_to_stated()" will drop related ldlm ibits lock - * reference count by calling "ll_intent_drop_lock()" in spite of the - * above operations failed or not. Do not worry about calling - * "ll_intent_drop_lock()" more than once. */ - rc = ll_sa_entry_to_stated(sai, entry, rc < 0 ? rc : SA_ENTRY_SUCC); - if (rc == 0 && entry->se_index == sai->sai_index_wait && target == NULL) - cfs_waitq_signal(&sai->sai_waitq); - ll_sa_entry_put(sai, entry); + /* The "ll_sa_entry_to_stated()" will drop related ldlm ibits lock + * reference count by calling "ll_intent_drop_lock()" in spite of the + * above operations failed or not. Do not worry about calling + * "ll_intent_drop_lock()" more than once. */ + rc = ll_sa_entry_to_stated(sai, entry, + rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); + if (rc == 0 && entry->se_index == sai->sai_index_wait) + wake_up(&sai->sai_waitq); + ll_sa_entry_put(sai, entry); } static int ll_statahead_interpret(struct ptlrpc_request *req, @@ -753,31 +741,28 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, GOTO(out, rc = -EIDRM); } - cfs_list_del_init(&entry->se_list); if (rc != 0) { - sai->sai_replied++; - do_sai_entry_to_stated(sai, entry, rc); - spin_unlock(&lli->lli_sa_lock); - if (entry->se_index == sai->sai_index_wait) - cfs_waitq_signal(&sai->sai_waitq); + do_sa_entry_to_stated(sai, entry, SA_ENTRY_INVA); + wakeup = (entry->se_index == sai->sai_index_wait); } else { - entry->se_minfo = minfo; - entry->se_req = ptlrpc_request_addref(req); - /* Release the async ibits lock ASAP to avoid deadlock - * when statahead thread tries to enqueue lock on parent - * for readpage and other tries to enqueue lock on child - * with parent's lock held, for example: unlink. */ - entry->se_handle = it->d.lustre.it_lock_handle; - ll_intent_drop_lock(it); - wakeup = sa_received_empty(sai); - cfs_list_add_tail(&entry->se_list, - &sai->sai_entries_received); - sai->sai_replied++; - spin_unlock(&lli->lli_sa_lock); - if (wakeup) - cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); + entry->se_minfo = minfo; + entry->se_req = ptlrpc_request_addref(req); + /* Release the async ibits lock ASAP to avoid deadlock + * when statahead thread tries to enqueue lock on parent + * for readpage and other tries to enqueue lock on child + * with parent's lock held, for example: unlink. */ + entry->se_handle = it->d.lustre.it_lock_handle; + ll_intent_drop_lock(it); + wakeup = sa_received_empty(sai); + list_add_tail(&entry->se_list, + &sai->sai_entries_received); } - ll_sa_entry_put(sai, entry); + sai->sai_replied++; + spin_unlock(&lli->lli_sa_lock); + + ll_sa_entry_put(sai, entry); + if (wakeup) + wake_up(&sai->sai_thread.t_ctl_waitq); } EXIT; @@ -910,9 +895,6 @@ static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry, if (d_mountpoint(dentry)) RETURN(1); - if (unlikely(dentry == dentry->d_sb->s_root)) - RETURN(1); - entry->se_inode = igrab(inode); rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),NULL); if (rc == 1) { @@ -970,14 +952,14 @@ static void ll_statahead_one(struct dentry *parent, const char* entry_name, if (dentry != NULL) dput(dentry); - if (rc) { - rc1 = ll_sa_entry_to_stated(sai, entry, - rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); - if (rc1 == 0 && entry->se_index == sai->sai_index_wait) - cfs_waitq_signal(&sai->sai_waitq); - } else { - sai->sai_sent++; - } + if (rc) { + rc1 = ll_sa_entry_to_stated(sai, entry, + rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); + if (rc1 == 0 && entry->se_index == sai->sai_index_wait) + wake_up(&sai->sai_waitq); + } else { + sai->sai_sent++; + } sai->sai_index++; /* drop one refcount on entry by ll_sa_entry_alloc */ @@ -998,21 +980,20 @@ static int ll_agl_thread(void *arg) struct l_wait_info lwi = { 0 }; ENTRY; - { - char pname[16]; - snprintf(pname, 15, "ll_agl_%u", plli->lli_opendir_pid); - cfs_daemonize(pname); - } - - CDEBUG(D_READA, "agl thread started: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + thread->t_pid = current_pid(); + CDEBUG(D_READA, "agl thread started: sai %p, parent %.*s\n", + sai, parent->d_name.len, parent->d_name.name); - atomic_inc(&sbi->ll_agl_total); + atomic_inc(&sbi->ll_agl_total); spin_lock(&plli->lli_agl_lock); sai->sai_agl_valid = 1; - thread_set_flags(thread, SVC_RUNNING); + if (thread_is_init(thread)) + /* If someone else has changed the thread state + * (e.g. already changed to SVC_STOPPING), we can't just + * blindly overwrite that setting. */ + thread_set_flags(thread, SVC_RUNNING); spin_unlock(&plli->lli_agl_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); + wake_up(&thread->t_ctl_waitq); while (1) { l_wait_event(thread->t_ctl_waitq, @@ -1028,7 +1009,7 @@ static int ll_agl_thread(void *arg) * so check whether list empty again. */ if (!agl_list_empty(sai)) { clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); + list_del_init(&clli->lli_agl_list); spin_unlock(&plli->lli_agl_lock); ll_agl_trigger(&clli->lli_vfs_inode, sai); } else { @@ -1040,7 +1021,7 @@ static int ll_agl_thread(void *arg) sai->sai_agl_valid = 0; while (!agl_list_empty(sai)) { clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); + list_del_init(&clli->lli_agl_list); spin_unlock(&plli->lli_agl_lock); clli->lli_agl_index = 0; iput(&clli->lli_vfs_inode); @@ -1048,260 +1029,265 @@ static int ll_agl_thread(void *arg) } thread_set_flags(thread, SVC_STOPPED); spin_unlock(&plli->lli_agl_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); + wake_up(&thread->t_ctl_waitq); ll_sai_put(sai); - CDEBUG(D_READA, "agl thread stopped: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + CDEBUG(D_READA, "agl thread stopped: sai %p, parent %.*s\n", + sai, parent->d_name.len, parent->d_name.name); RETURN(0); } static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) { - struct ptlrpc_thread *thread = &sai->sai_agl_thread; - struct l_wait_info lwi = { 0 }; - int rc; - ENTRY; - - CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); - - rc = cfs_create_thread(ll_agl_thread, parent, 0); - if (rc < 0) { - CERROR("can't start ll_agl thread, rc: %d\n", rc); - thread_set_flags(thread, SVC_STOPPED); - RETURN_EXIT; - } + struct ptlrpc_thread *thread = &sai->sai_agl_thread; + struct l_wait_info lwi = { 0 }; + struct ll_inode_info *plli; + struct task_struct *task; + ENTRY; + + CDEBUG(D_READA, "start agl thread: sai %p, parent %.*s\n", + sai, parent->d_name.len, parent->d_name.name); + + plli = ll_i2info(parent->d_inode); + task = kthread_run(ll_agl_thread, parent, + "ll_agl_%u", plli->lli_opendir_pid); + if (IS_ERR(task)) { + CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task)); + thread_set_flags(thread, SVC_STOPPED); + RETURN_EXIT; + } - l_wait_event(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread), - &lwi); - EXIT; + l_wait_event(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread), + &lwi); + EXIT; } static int ll_statahead_thread(void *arg) { - struct dentry *parent = (struct dentry *)arg; - struct inode *dir = parent->d_inode; - struct ll_inode_info *plli = ll_i2info(dir); - struct ll_inode_info *clli; - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); - struct ptlrpc_thread *thread = &sai->sai_thread; - struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread; - struct page *page; - __u64 pos = 0; - int first = 0; - int rc = 0; - struct ll_dir_chain chain; - struct l_wait_info lwi = { 0 }; - ENTRY; - - { - char pname[16]; - snprintf(pname, 15, "ll_sa_%u", plli->lli_opendir_pid); - cfs_daemonize(pname); - } - - CDEBUG(D_READA, "statahead thread started: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); - - if (sbi->ll_flags & LL_SBI_AGL_ENABLED) - ll_start_agl(parent, sai); - - atomic_inc(&sbi->ll_sa_total); + struct dentry *parent = (struct dentry *)arg; + struct inode *dir = parent->d_inode; + struct ll_inode_info *plli = ll_i2info(dir); + struct ll_inode_info *clli; + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); + struct ptlrpc_thread *thread = &sai->sai_thread; + struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread; + int first = 0; + int rc = 0; + struct md_op_data *op_data; + struct ll_dir_chain chain; + struct l_wait_info lwi = { 0 }; + struct page *page = NULL; + __u64 pos = 0; + ENTRY; + + thread->t_pid = current_pid(); + CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n", + sai, parent->d_name.len, parent->d_name.name); + + op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, + LUSTRE_OPC_ANY, dir); + if (IS_ERR(op_data)) + GOTO(out_put, rc = PTR_ERR(op_data)); + + op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages; + + if (sbi->ll_flags & LL_SBI_AGL_ENABLED) + ll_start_agl(parent, sai); + + atomic_inc(&sbi->ll_sa_total); spin_lock(&plli->lli_sa_lock); - thread_set_flags(thread, SVC_RUNNING); + if (thread_is_init(thread)) + /* If someone else has changed the thread state + * (e.g. already changed to SVC_STOPPING), we can't just + * blindly overwrite that setting. */ + thread_set_flags(thread, SVC_RUNNING); spin_unlock(&plli->lli_sa_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); + wake_up(&thread->t_ctl_waitq); ll_dir_chain_init(&chain); - page = ll_get_dir_page(dir, pos, &chain); - - while (1) { - struct lu_dirpage *dp; - struct lu_dirent *ent; - - if (IS_ERR(page)) { - rc = PTR_ERR(page); - CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 - "/"LPU64": [rc %d] [parent %u]\n", - PFID(ll_inode2fid(dir)), pos, sai->sai_index, - rc, plli->lli_opendir_pid); - GOTO(out, rc); - } - - dp = page_address(page); - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) { - __u64 hash; - int namelen; - char *name; - - hash = le64_to_cpu(ent->lde_hash); - if (unlikely(hash < pos)) - /* - * Skip until we find target hash value. - */ - continue; - - namelen = le16_to_cpu(ent->lde_namelen); - if (unlikely(namelen == 0)) - /* - * Skip dummy record. - */ - continue; + page = ll_get_dir_page(dir, op_data, pos, &chain); + while (1) { + struct lu_dirpage *dp; + struct lu_dirent *ent; + + if (IS_ERR(page)) { + rc = PTR_ERR(page); + CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 + "/"LPU64" opendir_pid = %u: rc = %d\n", + PFID(ll_inode2fid(dir)), pos, sai->sai_index, + plli->lli_opendir_pid, rc); + GOTO(out, rc); + } - name = ent->lde_name; - if (name[0] == '.') { - if (namelen == 1) { - /* - * skip "." - */ - continue; - } else if (name[1] == '.' && namelen == 2) { - /* - * skip ".." - */ - continue; - } else if (!sai->sai_ls_all) { - /* - * skip hidden files. - */ - sai->sai_skip_hidden++; - continue; - } - } + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + __u64 hash; + int namelen; + char *name; + + hash = le64_to_cpu(ent->lde_hash); + if (unlikely(hash < pos)) + /* + * Skip until we find target hash value. + */ + continue; + + namelen = le16_to_cpu(ent->lde_namelen); + if (unlikely(namelen == 0)) + /* + * Skip dummy record. + */ + continue; + + name = ent->lde_name; + if (name[0] == '.') { + if (namelen == 1) { + /* + * skip "." + */ + continue; + } else if (name[1] == '.' && namelen == 2) { + /* + * skip ".." + */ + continue; + } else if (!sai->sai_ls_all) { + /* + * skip hidden files. + */ + sai->sai_skip_hidden++; + continue; + } + } - /* - * don't stat-ahead first entry. - */ - if (unlikely(++first == 1)) - continue; + /* + * don't stat-ahead first entry. + */ + if (unlikely(++first == 1)) + continue; keep_it: - l_wait_event(thread->t_ctl_waitq, - !sa_sent_full(sai) || - !sa_received_empty(sai) || - !agl_list_empty(sai) || - !thread_is_running(thread), - &lwi); + l_wait_event(thread->t_ctl_waitq, + !sa_sent_full(sai) || + !sa_received_empty(sai) || + !agl_list_empty(sai) || + !thread_is_running(thread), + &lwi); interpret_it: - while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); + while (!sa_received_empty(sai)) + ll_post_statahead(sai); - if (unlikely(!thread_is_running(thread))) { - ll_release_page(page, 0); - GOTO(out, rc = 0); - } + if (unlikely(!thread_is_running(thread))) { + ll_release_page(dir, page, false); + GOTO(out, rc = 0); + } - /* If no window for metadata statahead, but there are - * some AGL entries to be triggered, then try to help - * to process the AGL entries. */ - if (sa_sent_full(sai)) { + /* If no window for metadata statahead, but there are + * some AGL entries to be triggered, then try to help + * to process the AGL entries. */ + if (sa_sent_full(sai)) { spin_lock(&plli->lli_agl_lock); while (!agl_list_empty(sai)) { clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); + list_del_init(&clli->lli_agl_list); spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, - sai); + ll_agl_trigger(&clli->lli_vfs_inode, + sai); - if (!sa_received_empty(sai)) - goto interpret_it; + if (!sa_received_empty(sai)) + goto interpret_it; - if (unlikely( - !thread_is_running(thread))) { - ll_release_page(page, 0); - GOTO(out, rc = 0); - } + if (unlikely( + !thread_is_running(thread))) { + ll_release_page(dir, page, + false); + GOTO(out, rc = 0); + } - if (!sa_sent_full(sai)) - goto do_it; + if (!sa_sent_full(sai)) + goto do_it; spin_lock(&plli->lli_agl_lock); } spin_unlock(&plli->lli_agl_lock); - goto keep_it; - } - + goto keep_it; + } do_it: - ll_statahead_one(parent, name, namelen); - } - pos = le64_to_cpu(dp->ldp_hash_end); - if (pos == MDS_DIR_END_OFF) { - /* - * End of directory reached. - */ - ll_release_page(page, 0); - while (1) { - l_wait_event(thread->t_ctl_waitq, - !sa_received_empty(sai) || - sai->sai_sent == sai->sai_replied|| - !thread_is_running(thread), - &lwi); - - while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); - - if (unlikely(!thread_is_running(thread))) - GOTO(out, rc = 0); - - if (sai->sai_sent == sai->sai_replied && - sa_received_empty(sai)) - break; - } + ll_statahead_one(parent, name, namelen); + } + + pos = le64_to_cpu(dp->ldp_hash_end); + if (pos == MDS_DIR_END_OFF) { + /* + * End of directory reached. + */ + ll_release_page(dir, page, false); + while (1) { + l_wait_event(thread->t_ctl_waitq, + !sa_received_empty(sai) || + sai->sai_sent == sai->sai_replied || + !thread_is_running(thread), + &lwi); + + while (!sa_received_empty(sai)) + ll_post_statahead(sai); + + if (unlikely(!thread_is_running(thread))) + GOTO(out, rc = 0); + + if (sai->sai_sent == sai->sai_replied && + sa_received_empty(sai)) + break; + } spin_lock(&plli->lli_agl_lock); while (!agl_list_empty(sai) && thread_is_running(thread)) { clli = agl_first_entry(sai); - cfs_list_del_init(&clli->lli_agl_list); + list_del_init(&clli->lli_agl_list); spin_unlock(&plli->lli_agl_lock); ll_agl_trigger(&clli->lli_vfs_inode, sai); spin_lock(&plli->lli_agl_lock); } spin_unlock(&plli->lli_agl_lock); - GOTO(out, rc = 0); - } else if (1) { - /* - * chain is exhausted. - * Normal case: continue to the next page. - */ - ll_release_page(page, le32_to_cpu(dp->ldp_flags) & - LDF_COLLIDE); - sai->sai_in_readpage = 1; - page = ll_get_dir_page(dir, pos, &chain); - sai->sai_in_readpage = 0; - } else { - LASSERT(le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - ll_release_page(page, 1); - /* - * go into overflow page. - */ - } - } - EXIT; - + GOTO(out, rc = 0); + } else { + /* + * chain is exhausted. + * Normal case: continue to the next page. + */ + ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) & + LDF_COLLIDE); + sai->sai_in_readpage = 1; + page = ll_get_dir_page(dir, op_data, pos, &chain); + sai->sai_in_readpage = 0; + } + } out: + EXIT; + ll_dir_chain_fini(&chain); + ll_finish_md_op_data(op_data); +out_put: if (sai->sai_agl_valid) { spin_lock(&plli->lli_agl_lock); thread_set_flags(agl_thread, SVC_STOPPING); spin_unlock(&plli->lli_agl_lock); - cfs_waitq_signal(&agl_thread->t_ctl_waitq); + wake_up(&agl_thread->t_ctl_waitq); - CDEBUG(D_READA, "stop agl thread: [pid %d]\n", - cfs_curproc_pid()); - l_wait_event(agl_thread->t_ctl_waitq, - thread_is_stopped(agl_thread), - &lwi); + CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n", + sai, (unsigned int)agl_thread->t_pid); + l_wait_event(agl_thread->t_ctl_waitq, + thread_is_stopped(agl_thread), + &lwi); } else { /* Set agl_thread flags anyway. */ thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); } - ll_dir_chain_fini(&chain); spin_lock(&plli->lli_sa_lock); if (!sa_received_empty(sai)) { thread_set_flags(thread, SVC_STOPPING); @@ -1309,19 +1295,19 @@ out: /* To release the resources held by received entries. */ while (!sa_received_empty(sai)) - do_statahead_interpret(sai, NULL); + ll_post_statahead(sai); spin_lock(&plli->lli_sa_lock); } thread_set_flags(thread, SVC_STOPPED); spin_unlock(&plli->lli_sa_lock); - cfs_waitq_signal(&sai->sai_waitq); - cfs_waitq_signal(&thread->t_ctl_waitq); + wake_up(&sai->sai_waitq); + wake_up(&thread->t_ctl_waitq); ll_sai_put(sai); - dput(parent); - CDEBUG(D_READA, "statahead thread stopped: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); - return rc; + CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n", + sai, parent->d_name.len, parent->d_name.name); + dput(parent); + return rc; } /** @@ -1349,10 +1335,10 @@ void ll_stop_statahead(struct inode *dir, void *key) if (!thread_is_stopped(thread)) { thread_set_flags(thread, SVC_STOPPING); spin_unlock(&lli->lli_sa_lock); - cfs_waitq_signal(&thread->t_ctl_waitq); + wake_up(&thread->t_ctl_waitq); - CDEBUG(D_READA, "stop statahead thread: [pid %d]\n", - cfs_curproc_pid()); + CDEBUG(D_READA, "stop statahead thread: sai %p pid %u\n", + lli->lli_sai, (unsigned int)thread->t_pid); l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread), &lwi); @@ -1389,115 +1375,121 @@ enum { static int is_first_dirent(struct inode *dir, struct dentry *dentry) { - struct ll_dir_chain chain; - struct qstr *target = &dentry->d_name; - struct page *page; - __u64 pos = 0; - int dot_de; - int rc = LS_NONE_FIRST_DE; - ENTRY; - - ll_dir_chain_init(&chain); - page = ll_get_dir_page(dir, pos, &chain); - - while (1) { - struct lu_dirpage *dp; - struct lu_dirent *ent; - - if (IS_ERR(page)) { - struct ll_inode_info *lli = ll_i2info(dir); - - rc = PTR_ERR(page); - CERROR("error reading dir "DFID" at "LPU64": " - "[rc %d] [parent %u]\n", - PFID(ll_inode2fid(dir)), pos, - rc, lli->lli_opendir_pid); - break; - } - - dp = page_address(page); - for (ent = lu_dirent_start(dp); ent != NULL; - ent = lu_dirent_next(ent)) { - __u64 hash; - int namelen; - char *name; - - hash = le64_to_cpu(ent->lde_hash); - /* The ll_get_dir_page() can return any page containing - * the given hash which may be not the start hash. */ - if (unlikely(hash < pos)) - continue; - - namelen = le16_to_cpu(ent->lde_namelen); - if (unlikely(namelen == 0)) - /* - * skip dummy record. - */ - continue; + struct ll_dir_chain chain; + struct qstr *target = &dentry->d_name; + struct md_op_data *op_data; + int dot_de; + struct page *page = NULL; + int rc = LS_NONE_FIRST_DE; + __u64 pos = 0; + ENTRY; + + op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, + LUSTRE_OPC_ANY, dir); + if (IS_ERR(op_data)) + RETURN(PTR_ERR(op_data)); + /** + *FIXME choose the start offset of the readdir + */ + op_data->op_stripe_offset = 0; + op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages; - name = ent->lde_name; - if (name[0] == '.') { - if (namelen == 1) - /* - * skip "." - */ - continue; - else if (name[1] == '.' && namelen == 2) - /* - * skip ".." - */ - continue; - else - dot_de = 1; - } else { - dot_de = 0; - } + ll_dir_chain_init(&chain); + page = ll_get_dir_page(dir, op_data, 0, &chain); + + while (1) { + struct lu_dirpage *dp; + struct lu_dirent *ent; + + if (IS_ERR(page)) { + struct ll_inode_info *lli = ll_i2info(dir); + + rc = PTR_ERR(page); + CERROR("%s: reading dir "DFID" at "LPU64 + "opendir_pid = %u : rc = %d\n", + ll_get_fsname(dir->i_sb, NULL, 0), + PFID(ll_inode2fid(dir)), pos, + lli->lli_opendir_pid, rc); + break; + } - if (dot_de && target->name[0] != '.') { - CDEBUG(D_READA, "%.*s skip hidden file %.*s\n", - target->len, target->name, - namelen, name); - continue; - } + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + __u64 hash; + int namelen; + char *name; + + hash = le64_to_cpu(ent->lde_hash); + /* The ll_get_dir_page() can return any page containing + * the given hash which may be not the start hash. */ + if (unlikely(hash < pos)) + continue; + + namelen = le16_to_cpu(ent->lde_namelen); + if (unlikely(namelen == 0)) + /* + * skip dummy record. + */ + continue; + + name = ent->lde_name; + if (name[0] == '.') { + if (namelen == 1) + /* + * skip "." + */ + continue; + else if (name[1] == '.' && namelen == 2) + /* + * skip ".." + */ + continue; + else + dot_de = 1; + } else { + dot_de = 0; + } - if (target->len != namelen || - memcmp(target->name, name, namelen) != 0) - rc = LS_NONE_FIRST_DE; - else if (!dot_de) - rc = LS_FIRST_DE; - else - rc = LS_FIRST_DOT_DE; + if (dot_de && target->name[0] != '.') { + CDEBUG(D_READA, "%.*s skip hidden file %.*s\n", + target->len, target->name, + namelen, name); + continue; + } - ll_release_page(page, 0); - GOTO(out, rc); - } - pos = le64_to_cpu(dp->ldp_hash_end); - if (pos == MDS_DIR_END_OFF) { - /* - * End of directory reached. - */ - ll_release_page(page, 0); - break; - } else if (1) { - /* - * chain is exhausted - * Normal case: continue to the next page. - */ - ll_release_page(page, le32_to_cpu(dp->ldp_flags) & - LDF_COLLIDE); - page = ll_get_dir_page(dir, pos, &chain); - } else { - /* - * go into overflow page. - */ - LASSERT(le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - ll_release_page(page, 1); - } - } - EXIT; + if (target->len != namelen || + memcmp(target->name, name, namelen) != 0) + rc = LS_NONE_FIRST_DE; + else if (!dot_de) + rc = LS_FIRST_DE; + else + rc = LS_FIRST_DOT_DE; + ll_release_page(dir, page, false); + GOTO(out, rc); + } + pos = le64_to_cpu(dp->ldp_hash_end); + if (pos == MDS_DIR_END_OFF) { + /* + * End of directory reached. + */ + ll_release_page(dir, page, false); + GOTO(out, rc); + } else { + /* + * chain is exhausted + * Normal case: continue to the next page. + */ + ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) & + LDF_COLLIDE); + page = ll_get_dir_page(dir, op_data, pos, &chain); + } + } + EXIT; out: - ll_dir_chain_fini(&chain); + ll_dir_chain_fini(&chain); + ll_finish_md_op_data(op_data); return rc; } @@ -1522,17 +1514,17 @@ ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry) } else { struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - sai->sai_miss++; - sai->sai_consecutive_miss++; - if (sa_low_hit(sai) && thread_is_running(thread)) { - atomic_inc(&sbi->ll_sa_wrong); - CDEBUG(D_READA, "Statahead for dir "DFID" hit " - "ratio too low: hit/miss "LPU64"/"LPU64 - ", sent/replied "LPU64"/"LPU64", stopping " - "statahead thread: pid %d\n", - PFID(&lli->lli_fid), sai->sai_hit, - sai->sai_miss, sai->sai_sent, - sai->sai_replied, cfs_curproc_pid()); + sai->sai_miss++; + sai->sai_consecutive_miss++; + if (sa_low_hit(sai) && thread_is_running(thread)) { + atomic_inc(&sbi->ll_sa_wrong); + CDEBUG(D_READA, "Statahead for dir "DFID" hit " + "ratio too low: hit/miss "LPU64"/"LPU64 + ", sent/replied "LPU64"/"LPU64", stopping " + "statahead thread\n", + PFID(&lli->lli_fid), sai->sai_hit, + sai->sai_miss, sai->sai_sent, + sai->sai_replied); spin_lock(&lli->lli_sa_lock); if (!thread_is_stopped(thread)) thread_set_flags(thread, SVC_STOPPING); @@ -1541,7 +1533,7 @@ ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry) } if (!thread_is_stopped(thread)) - cfs_waitq_signal(&thread->t_ctl_waitq); + wake_up(&thread->t_ctl_waitq); EXIT; } @@ -1558,21 +1550,23 @@ ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry) int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int only_unplug) { - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = lli->lli_sai; - struct dentry *parent; - struct ll_sa_entry *entry; - struct ptlrpc_thread *thread; - struct l_wait_info lwi = { 0 }; - int rc = 0; - ENTRY; - - LASSERT(lli->lli_opendir_pid == cfs_curproc_pid()); + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = lli->lli_sai; + struct dentry *parent; + struct ll_sa_entry *entry; + struct ptlrpc_thread *thread; + struct l_wait_info lwi = { 0 }; + struct task_struct *task; + int rc = 0; + struct ll_inode_info *plli; + ENTRY; + + LASSERT(lli->lli_opendir_pid == current_pid()); if (sai) { thread = &sai->sai_thread; if (unlikely(thread_is_stopped(thread) && - cfs_list_empty(&sai->sai_entries_stated))) { + list_empty(&sai->sai_entries_stated))) { /* to release resource */ ll_stop_statahead(dir, lli->lli_opendir_key); RETURN(-EAGAIN); @@ -1611,10 +1605,10 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, RETURN(entry ? 1 : -EAGAIN); } - while (!ll_sa_entry_stated(entry) && - sai->sai_in_readpage && - !sa_received_empty(sai)) - do_statahead_interpret(sai, entry); + /* if statahead is busy in readdir, help it do post-work */ + while (!ll_sa_entry_stated(entry) && + sai->sai_in_readpage && !sa_received_empty(sai)) + ll_post_statahead(sai); if (!ll_sa_entry_stated(entry)) { sai->sai_index_wait = entry->se_index; @@ -1642,20 +1636,28 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, ll_inode2fid(inode), &bits); if (rc == 1) { if ((*dentryp)->d_inode == NULL) { - *dentryp = ll_splice_alias(inode, - *dentryp); + struct dentry *alias; + + alias = ll_splice_alias(inode, + *dentryp); + if (IS_ERR(alias)) { + ll_sai_unplug(sai, entry); + RETURN(PTR_ERR(alias)); + } + *dentryp = alias; } else if ((*dentryp)->d_inode != inode) { /* revalidate, but inode is recreated */ CDEBUG(D_READA, - "stale dentry %.*s inode %lu/%u, " - "statahead inode %lu/%u\n", - (*dentryp)->d_name.len, - (*dentryp)->d_name.name, - (*dentryp)->d_inode->i_ino, - (*dentryp)->d_inode->i_generation, - inode->i_ino, - inode->i_generation); - ll_sai_unplug(sai, entry); + "%s: stale dentry %.*s inode " + DFID", statahead inode "DFID + "\n", + ll_get_fsname((*dentryp)->d_inode->i_sb, NULL, 0), + (*dentryp)->d_name.len, + (*dentryp)->d_name.name, + PFID(ll_inode2fid((*dentryp)->d_inode)), + PFID(ll_inode2fid(inode))); + ll_intent_release(&it); + ll_sai_unplug(sai, entry); RETURN(-ESTALE); } else { iput(inode); @@ -1705,19 +1707,32 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, GOTO(out, rc = -EAGAIN); } - CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n", - cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + CDEBUG(D_READA, "start statahead thread: sai %p, parent %.*s\n", + sai, parent->d_name.len, parent->d_name.name); + /* The sai buffer already has one reference taken at allocation time, + * but as soon as we expose the sai by attaching it to the lli that + * default reference can be dropped by another thread calling + * ll_stop_statahead. We need to take a local reference to protect + * the sai buffer while we intend to access it. */ + ll_sai_get(sai); lli->lli_sai = sai; - rc = cfs_create_thread(ll_statahead_thread, parent, 0); - thread = &sai->sai_thread; - if (rc < 0) { - CERROR("can't start ll_sa thread, rc: %d\n", rc); - dput(parent); + + plli = ll_i2info(parent->d_inode); + task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u", + plli->lli_opendir_pid); + thread = &sai->sai_thread; + if (IS_ERR(task)) { + rc = PTR_ERR(task); + CERROR("cannot start ll_sa thread: rc = %d\n", rc); + dput(parent); lli->lli_opendir_key = NULL; thread_set_flags(thread, SVC_STOPPED); thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); + /* Drop both our own local reference and the default + * reference from allocation time. */ ll_sai_put(sai); + ll_sai_put(sai); LASSERT(lli->lli_sai == NULL); RETURN(-EAGAIN); } @@ -1725,6 +1740,7 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, l_wait_event(thread->t_ctl_waitq, thread_is_running(thread) || thread_is_stopped(thread), &lwi); + ll_sai_put(sai); /* * We don't stat-ahead for the first dirent since we are already in