From 9eba5aee8e06d950fc433434749ccaa6d219e68f Mon Sep 17 00:00:00 2001 From: Lai Siyao Date: Sun, 7 Sep 2014 11:09:20 -0400 Subject: [PATCH 1/1] LU-3270 statahead: statahead thread wait for RPCs to finish The statahead thread should wait for in-flight stat RPCs to finish, because the statahead RPC callback may access data allocated in the statahead thread's context. ll_sa_entry_fini() should keep an old entry if its stat RPC has not finished yet. Simplify sai refcounting: * a newly allocated sai holds one refcount, which is dropped after the statahead thread has been started. * the statahead thread holds one refcount. * the agl thread holds one refcount. * a stat process calls do_statahead_enter(), which tries to get the sai; if it is valid, the process revalidates from the statahead cache and puts the refcount after use. Signed-off-by: Lai Siyao Change-Id: I55a4fe66a5f6c04595d3bc84f0cd3750f20e0ee4 Reviewed-on: http://review.whamcloud.com/9663 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Fan Yong Reviewed-by: James Simmons Reviewed-by: Oleg Drokin --- lustre/include/obd.h | 1 - lustre/llite/dcache.c | 2 +- lustre/llite/file.c | 50 +- lustre/llite/llite_internal.h | 79 +-- lustre/llite/llite_lib.c | 25 +- lustre/llite/statahead.c | 1077 +++++++++++++++++++---------------------- 6 files changed, 583 insertions(+), 651 deletions(-) diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 9c0c680..2a2bb37 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -867,7 +867,6 @@ struct md_enqueue_info { struct inode *mi_dir; md_enqueue_cb_t mi_cb; __u64 mi_cbdata; - unsigned int mi_generation; }; struct obd_ops { diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 6838af2..c8b9099 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -377,7 +377,7 @@ static int ll_revalidate_dentry(struct dentry *dentry, if (lookup_flags & (LOOKUP_OPEN | LOOKUP_CREATE)) return 0; - if (d_need_statahead(dir, dentry) <= 0) + if (!dentry_need_statahead(dir, dentry)) return 1; #ifndef HAVE_DCACHE_LOCK 
diff --git a/lustre/llite/file.c b/lustre/llite/file.c index a17ae91..8e3d07c 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -383,12 +383,10 @@ int ll_file_release(struct inode *inode, struct file *file) fd = LUSTRE_FPRIVATE(file); LASSERT(fd != NULL); - /* The last ref on @file, maybe not the the owner pid of statahead. - * Different processes can open the same dir, "ll_opendir_key" means: - * it is me that should stop the statahead thread. */ - if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd && - lli->lli_opendir_pid != 0) - ll_stop_statahead(inode, lli->lli_opendir_key); + /* The last ref on @file, maybe not the the owner pid of statahead, + * because parent and child process can share the same file handle. */ + if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd) + ll_deauthorize_statahead(inode, fd); if (inode->i_sb->s_root == file->f_dentry) { LUSTRE_FPRIVATE(file) = NULL; @@ -555,36 +553,28 @@ static int ll_local_open(struct file *file, struct lookup_intent *it, */ int ll_file_open(struct inode *inode, struct file *file) { - struct ll_inode_info *lli = ll_i2info(inode); - struct lookup_intent *it, oit = { .it_op = IT_OPEN, - .it_flags = file->f_flags }; - struct obd_client_handle **och_p = NULL; - __u64 *och_usecount = NULL; - struct ll_file_data *fd; - int rc = 0, opendir_set = 0; - ENTRY; + struct ll_inode_info *lli = ll_i2info(inode); + struct lookup_intent *it, oit = { .it_op = IT_OPEN, + .it_flags = file->f_flags }; + struct obd_client_handle **och_p = NULL; + __u64 *och_usecount = NULL; + struct ll_file_data *fd; + int rc = 0; + ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), flags %o\n", PFID(ll_inode2fid(inode)), inode, file->f_flags); - it = file->private_data; /* XXX: compat macro */ - file->private_data = NULL; /* prevent ll_local_open assertion */ + it = file->private_data; /* XXX: compat macro */ + file->private_data = NULL; /* prevent ll_local_open assertion */ fd = ll_file_data_get(); if (fd == NULL) 
GOTO(out_openerr, rc = -ENOMEM); fd->fd_file = file; - if (S_ISDIR(inode->i_mode)) { - spin_lock(&lli->lli_sa_lock); - if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL && - lli->lli_opendir_pid == 0) { - lli->lli_opendir_key = fd; - lli->lli_opendir_pid = current_pid(); - opendir_set = 1; - } - spin_unlock(&lli->lli_sa_lock); - } + if (S_ISDIR(inode->i_mode)) + ll_authorize_statahead(inode, fd); if (inode->i_sb->s_root == file->f_dentry) { LUSTRE_FPRIVATE(file) = fd; @@ -735,10 +725,10 @@ out_och_free: mutex_unlock(&lli->lli_och_mutex); out_openerr: - if (opendir_set != 0) - ll_stop_statahead(inode, lli->lli_opendir_key); - if (fd != NULL) - ll_file_data_put(fd); + if (lli->lli_opendir_key == fd) + ll_deauthorize_statahead(inode, fd); + if (fd != NULL) + ll_file_data_put(fd); } else { ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1); } diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 16dc8ef..86c86a0 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -178,18 +178,24 @@ struct ll_inode_info { /* serialize normal readdir and statahead-readdir. */ struct mutex d_readdir_mutex; - /* metadata statahead */ - /* since parent-child threads can share the same @file - * struct, "opendir_key" is the token when dir close for - * case of parent exit before child -- it is me should - * cleanup the dir readahead. */ - void *d_opendir_key; - struct ll_statahead_info *d_sai; - /* protect statahead stuff. */ + /* metadata statahead */ + /* since parent-child threads can share the same @file + * struct, "opendir_key" is the token when dir close for + * case of parent exit before child -- it is me should + * cleanup the dir readahead. */ + void *d_opendir_key; + struct ll_statahead_info *d_sai; + /* protect statahead stuff. */ spinlock_t d_sa_lock; /* "opendir_pid" is the token when lookup/revalid * -- I am the owner of dir statahead. 
*/ - pid_t d_opendir_pid; + pid_t d_opendir_pid; + /* stat will try to access statahead entries or start + * statahead if this flag is set, and this flag will be + * set upon dir open, and cleared when dir is closed, + * statahead hit ratio is too low, or start statahead + * thread failed. */ + unsigned int d_sa_enabled:1; /* directory stripe information */ struct lmv_stripe_md *d_lsm_md; /* striped directory size */ @@ -202,6 +208,7 @@ struct ll_inode_info { #define lli_opendir_key u.d.d_opendir_key #define lli_sai u.d.d_sai #define lli_sa_lock u.d.d_sa_lock +#define lli_sa_enabled u.d.d_sa_enabled #define lli_opendir_pid u.d.d_opendir_pid #define lli_lsm_md u.d.d_lsm_md #define lli_stripe_dir_size u.d.d_stripe_size @@ -539,20 +546,22 @@ struct ll_sb_info { int ll_rw_stats_on; /* metadata stat-ahead */ - unsigned int ll_sa_max; /* max statahead RPCs */ - atomic_t ll_sa_total; /* statahead thread started + unsigned int ll_sa_max; /* max statahead RPCs */ + atomic_t ll_sa_total; /* statahead thread started * count */ - atomic_t ll_sa_wrong; /* statahead thread stopped for + atomic_t ll_sa_wrong; /* statahead thread stopped for * low hit ratio */ - atomic_t ll_agl_total; /* AGL thread started count */ + atomic_t ll_sa_running; /* running statahead thread + * count */ + atomic_t ll_agl_total; /* AGL thread started count */ - dev_t ll_sdev_orig; /* save s_dev before assign for + dev_t ll_sdev_orig; /* save s_dev before assign for * clustred nfs */ - struct rmtacl_ctl_table ll_rct; - struct eacl_table ll_et; + struct rmtacl_ctl_table ll_rct; + struct eacl_table ll_et; /* root squash */ - struct root_squash_info ll_squash; + struct root_squash_info ll_squash; }; #define LL_DEFAULT_MAX_RW_CHUNK (32 * 1024 * 1024) @@ -1253,7 +1262,8 @@ struct ll_statahead_info { int do_statahead_enter(struct inode *dir, struct dentry **dentry, int only_unplug); -void ll_stop_statahead(struct inode *dir, void *key); +void ll_authorize_statahead(struct inode *dir, void *key); +void 
ll_deauthorize_statahead(struct inode *dir, void *key); static inline int ll_glimpse_size(struct inode *inode) { @@ -1283,25 +1293,29 @@ ll_statahead_mark(struct inode *dir, struct dentry *dentry) ldd->lld_sa_generation = sai->sai_generation; } -static inline int -d_need_statahead(struct inode *dir, struct dentry *dentryp) +static inline bool +dentry_need_statahead(struct inode *dir, struct dentry *dentry) { struct ll_inode_info *lli; struct ll_dentry_data *ldd; if (ll_i2sbi(dir)->ll_sa_max == 0) - return -EAGAIN; + return false; lli = ll_i2info(dir); + + /* statahead is not allowed for this dir, there may be three causes: + * 1. dir is not opened. + * 2. statahead hit ratio is too low. + * 3. previous stat started statahead thread failed. */ + if (!lli->lli_sa_enabled) + return false; + /* not the same process, don't statahead */ if (lli->lli_opendir_pid != current_pid()) - return -EAGAIN; + return false; - /* statahead has been stopped */ - if (lli->lli_opendir_key == NULL) - return -EAGAIN; - - ldd = ll_d2d(dentryp); + ldd = ll_d2d(dentry); /* * When stats a dentry, the system trigger more than once "revalidate" * or "lookup", for "getattr", for "getxattr", and maybe for others. 
@@ -1319,19 +1333,16 @@ d_need_statahead(struct inode *dir, struct dentry *dentryp) */ if (ldd && lli->lli_sai && ldd->lld_sa_generation == lli->lli_sai->sai_generation) - return -EAGAIN; + return false; - return 1; + return true; } static inline int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int only_unplug) { - int ret; - - ret = d_need_statahead(dir, *dentryp); - if (ret <= 0) - return ret; + if (!dentry_need_statahead(dir, *dentryp)) + return -EAGAIN; return do_statahead_enter(dir, dentryp, only_unplug); } diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 66a2d72..3f8eea2 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -139,6 +139,7 @@ static struct ll_sb_info *ll_init_sbi(void) sbi->ll_sa_max = LL_SA_RPC_DEF; atomic_set(&sbi->ll_sa_total, 0); atomic_set(&sbi->ll_sa_wrong, 0); + atomic_set(&sbi->ll_sa_running, 0); atomic_set(&sbi->ll_agl_total, 0); sbi->ll_flags |= LL_SBI_AGL_ENABLED; @@ -773,22 +774,27 @@ static void client_common_put_super(struct super_block *sb) void ll_kill_super(struct super_block *sb) { - struct ll_sb_info *sbi; - - ENTRY; + struct ll_sb_info *sbi; + ENTRY; /* not init sb ?*/ - if (!(sb->s_flags & MS_ACTIVE)) - return; + if (!(sb->s_flags & MS_ACTIVE)) + return; - sbi = ll_s2sbi(sb); - /* we need restore s_dev from changed for clustred NFS before put_super - * because new kernels have cached s_dev and change sb->s_dev in - * put_super not affected real removing devices */ + sbi = ll_s2sbi(sb); + /* we need restore s_dev from changed for clustred NFS before put_super + * because new kernels have cached s_dev and change sb->s_dev in + * put_super not affected real removing devices */ if (sbi) { sb->s_dev = sbi->ll_sdev_orig; sbi->ll_umounting = 1; + + /* wait running statahead threads to quit */ + while (atomic_read(&sbi->ll_sa_running) > 0) + schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE, + HZ >> 3); } + EXIT; } @@ -982,6 +988,7 @@ void ll_lli_init(struct ll_inode_info 
*lli) lli->lli_sai = NULL; spin_lock_init(&lli->lli_sa_lock); lli->lli_opendir_pid = 0; + lli->lli_sa_enabled = 0; } else { mutex_init(&lli->lli_size_mutex); lli->lli_symlink_name = NULL; diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index cc290aa..ead6f4c 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -193,8 +193,8 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index) * Insert it into sai_entries tail when init. */ static struct ll_sa_entry * -ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, - const char *name, int len) +ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, + int len) { struct ll_inode_info *lli; struct ll_sa_entry *entry; @@ -303,25 +303,6 @@ ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index) return NULL; } -static void ll_sa_entry_cleanup(struct ll_statahead_info *sai, - struct ll_sa_entry *entry) -{ - struct md_enqueue_info *minfo = entry->se_minfo; - struct ptlrpc_request *req = entry->se_req; - - if (minfo) { - entry->se_minfo = NULL; - ll_intent_release(&minfo->mi_it); - iput(minfo->mi_dir); - OBD_FREE_PTR(minfo); - } - - if (req) { - entry->se_req = NULL; - ptlrpc_req_finished(req); - } -} - static void ll_sa_entry_put(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { @@ -334,7 +315,6 @@ static void ll_sa_entry_put(struct ll_statahead_info *sai, LASSERT(list_empty(&entry->se_list)); LASSERT(ll_sa_entry_unhashed(entry)); - ll_sa_entry_cleanup(sai, entry); if (entry->se_inode) iput(entry->se_inode); @@ -378,7 +358,10 @@ ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) list_for_each_entry_safe(pos, next, &sai->sai_entries, se_link) { if (!is_omitted_entry(sai, pos->se_index)) break; - do_sa_entry_fini(sai, pos); + /* keep those whose statahead RPC not finished */ + if (pos->se_stat == SA_ENTRY_SUCC || + pos->se_stat == SA_ENTRY_INVA) + do_sa_entry_fini(sai, pos); } } @@ -386,11 
+369,13 @@ ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) * Inside lli_sa_lock. */ static void -do_sa_entry_to_stated(struct ll_statahead_info *sai, - struct ll_sa_entry *entry, se_stat_t stat) +__sa_entry_post_stat(struct ll_statahead_info *sai, struct ll_sa_entry *entry, + se_stat_t stat) { struct ll_sa_entry *se; - struct list_head *pos = &sai->sai_entries_stated; + struct list_head *pos = &sai->sai_entries_stated; + + LASSERT(entry->se_stat == SA_ENTRY_INIT); if (!list_empty(&entry->se_list)) list_del_init(&entry->se_list); @@ -411,23 +396,30 @@ do_sa_entry_to_stated(struct ll_statahead_info *sai, * \retval 1 -- entry to be destroyed. * \retval 0 -- entry is inserted into stated list. */ -static int -ll_sa_entry_to_stated(struct ll_statahead_info *sai, - struct ll_sa_entry *entry, se_stat_t stat) +static void +sa_entry_post_stat(struct ll_statahead_info *sai, struct ll_sa_entry *entry, + se_stat_t stat) { struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - int ret = 1; + struct md_enqueue_info *minfo = entry->se_minfo; + struct ptlrpc_request *req = entry->se_req; + + /* release resources used in RPC */ + if (minfo) { + entry->se_minfo = NULL; + ll_intent_release(&minfo->mi_it); + iput(minfo->mi_dir); + OBD_FREE_PTR(minfo); + } - ll_sa_entry_cleanup(sai, entry); + if (req) { + entry->se_req = NULL; + ptlrpc_req_finished(req); + } spin_lock(&lli->lli_sa_lock); - if (likely(entry->se_stat != SA_ENTRY_DEST)) { - do_sa_entry_to_stated(sai, entry, stat); - ret = 0; - } + __sa_entry_post_stat(sai, entry, stat); spin_unlock(&lli->lli_sa_lock); - - return ret; } /* @@ -499,59 +491,47 @@ static struct ll_statahead_info *ll_sai_alloc(void) RETURN(sai); } -static inline struct ll_statahead_info * -ll_sai_get(struct ll_statahead_info *sai) +static inline struct ll_statahead_info *ll_sai_get(struct inode *dir) { - atomic_inc(&sai->sai_refcount); + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = NULL; + + 
spin_lock(&lli->lli_sa_lock); + sai = lli->lli_sai; + if (sai != NULL) + atomic_inc(&sai->sai_refcount); + spin_unlock(&lli->lli_sa_lock); + return sai; } static void ll_sai_put(struct ll_statahead_info *sai) { - struct inode *inode = sai->sai_inode; - struct ll_inode_info *lli = ll_i2info(inode); - ENTRY; + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) { struct ll_sa_entry *entry, *next; - - if (unlikely(atomic_read(&sai->sai_refcount) > 0)) { - /* It is race case, the interpret callback just hold - * a reference count */ - spin_unlock(&lli->lli_sa_lock); - RETURN_EXIT; - } - - LASSERT(lli->lli_opendir_key == NULL); - LASSERT(thread_is_stopped(&sai->sai_thread)); - LASSERT(thread_is_stopped(&sai->sai_agl_thread)); + struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode); lli->lli_sai = NULL; - lli->lli_opendir_pid = 0; spin_unlock(&lli->lli_sa_lock); - if (sai->sai_sent > sai->sai_replied) - CDEBUG(D_READA,"statahead for dir "DFID" does not " - "finish: [sent:"LPU64"] [replied:"LPU64"]\n", - PFID(&lli->lli_fid), - sai->sai_sent, sai->sai_replied); + LASSERT(thread_is_stopped(&sai->sai_thread)); + LASSERT(thread_is_stopped(&sai->sai_agl_thread)); + LASSERT(sai->sai_sent == sai->sai_replied); - list_for_each_entry_safe(entry, next, - &sai->sai_entries, se_link) + list_for_each_entry_safe(entry, next, &sai->sai_entries, + se_link) do_sa_entry_fini(sai, entry); - LASSERT(list_empty(&sai->sai_entries)); - LASSERT(sa_received_empty(sai)); - LASSERT(list_empty(&sai->sai_entries_stated)); - LASSERT(atomic_read(&sai->sai_cache_count) == 0); LASSERT(agl_list_empty(sai)); + LASSERT(atomic_read(&sai->sai_refcount) == 0); - iput(inode); + iput(sai->sai_inode); OBD_FREE_PTR(sai); + atomic_dec(&sbi->ll_sa_running); } - - EXIT; } /* Do NOT forget to drop inode refcount when into sai_entries_agl. 
*/ @@ -617,12 +597,12 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) EXIT; } -static void ll_post_statahead(struct ll_statahead_info *sai) +/* prepare inode for received statahead entry, and add it into agl list */ +static void sa_post_one(struct ll_statahead_info *sai, + struct ll_sa_entry *entry) { struct inode *dir = sai->sai_inode; struct inode *child; - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_sa_entry *entry; struct md_enqueue_info *minfo; struct lookup_intent *it; struct ptlrpc_request *req; @@ -630,16 +610,6 @@ static void ll_post_statahead(struct ll_statahead_info *sai) int rc = 0; ENTRY; - spin_lock(&lli->lli_sa_lock); - if (unlikely(sa_received_empty(sai))) { - spin_unlock(&lli->lli_sa_lock); - RETURN_EXIT; - } - entry = sa_first_received_entry(sai); - atomic_inc(&entry->se_refcount); - list_del_init(&entry->se_list); - spin_unlock(&lli->lli_sa_lock); - LASSERT(entry->se_handle != 0); minfo = entry->se_minfo; @@ -695,87 +665,103 @@ static void ll_post_statahead(struct ll_statahead_info *sai) EXIT; out: - /* The "ll_sa_entry_to_stated()" will drop related ldlm ibits lock + /* The "sa_entry_post_stat()" will drop related ldlm ibits lock * reference count by calling "ll_intent_drop_lock()" in spite of the * above operations failed or not. Do not worry about calling * "ll_intent_drop_lock()" more than once. */ - rc = ll_sa_entry_to_stated(sai, entry, - rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); - if (rc == 0 && entry->se_index == sai->sai_index_wait) + sa_entry_post_stat(sai, entry, rc < 0 ? 
SA_ENTRY_INVA : SA_ENTRY_SUCC); + if (entry->se_index == sai->sai_index_wait) wake_up(&sai->sai_waitq); ll_sa_entry_put(sai, entry); } -static int ll_statahead_interpret(struct ptlrpc_request *req, - struct md_enqueue_info *minfo, int rc) +static void ll_post_statahead(struct ll_statahead_info *sai) { - struct lookup_intent *it = &minfo->mi_it; - struct inode *dir = minfo->mi_dir; - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = NULL; - struct ll_sa_entry *entry; - int wakeup; - ENTRY; + struct ll_inode_info *lli; - if (it_disposition(it, DISP_LOOKUP_NEG)) - rc = -ENOENT; + lli = ll_i2info(sai->sai_inode); - spin_lock(&lli->lli_sa_lock); - /* stale entry */ - if (unlikely(lli->lli_sai == NULL || - lli->lli_sai->sai_generation != minfo->mi_generation)) { - spin_unlock(&lli->lli_sa_lock); - GOTO(out, rc = -ESTALE); - } else { - sai = ll_sai_get(lli->lli_sai); - if (unlikely(!thread_is_running(&sai->sai_thread))) { - sai->sai_replied++; - spin_unlock(&lli->lli_sa_lock); - GOTO(out, rc = -EBADFD); - } + while (!sa_received_empty(sai)) { + struct ll_sa_entry *entry; - entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata); - if (entry == NULL) { - sai->sai_replied++; + spin_lock(&lli->lli_sa_lock); + if (unlikely(sa_received_empty(sai))) { spin_unlock(&lli->lli_sa_lock); - GOTO(out, rc = -EIDRM); + break; } - - if (rc != 0) { - do_sa_entry_to_stated(sai, entry, SA_ENTRY_INVA); - wakeup = (entry->se_index == sai->sai_index_wait); - } else { - entry->se_minfo = minfo; - entry->se_req = ptlrpc_request_addref(req); - /* Release the async ibits lock ASAP to avoid deadlock - * when statahead thread tries to enqueue lock on parent - * for readpage and other tries to enqueue lock on child - * with parent's lock held, for example: unlink. 
*/ - entry->se_handle = it->d.lustre.it_lock_handle; - ll_intent_drop_lock(it); - wakeup = sa_received_empty(sai); - list_add_tail(&entry->se_list, - &sai->sai_entries_received); - } - sai->sai_replied++; + entry = sa_first_received_entry(sai); + atomic_inc(&entry->se_refcount); + list_del_init(&entry->se_list); spin_unlock(&lli->lli_sa_lock); - ll_sa_entry_put(sai, entry); - if (wakeup) - wake_up(&sai->sai_thread.t_ctl_waitq); - } + sa_post_one(sai, entry); + } - EXIT; + spin_lock(&lli->lli_agl_lock); + while (!agl_list_empty(sai)) { + struct ll_inode_info *clli; -out: - if (rc != 0) { - ll_intent_release(it); - iput(dir); - OBD_FREE_PTR(minfo); - } - if (sai != NULL) - ll_sai_put(sai); - return rc; + clli = agl_first_entry(sai); + list_del_init(&clli->lli_agl_list); + spin_unlock(&lli->lli_agl_lock); + + ll_agl_trigger(&clli->lli_vfs_inode, sai); + + spin_lock(&lli->lli_agl_lock); + } + spin_unlock(&lli->lli_agl_lock); +} + +static int ll_statahead_interpret(struct ptlrpc_request *req, + struct md_enqueue_info *minfo, int rc) +{ + struct lookup_intent *it = &minfo->mi_it; + struct inode *dir = minfo->mi_dir; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai; + struct ll_sa_entry *entry; + int wakeup; + ENTRY; + + if (it_disposition(it, DISP_LOOKUP_NEG)) + rc = -ENOENT; + + sai = ll_sai_get(dir); + LASSERT(sai != NULL); + LASSERT(!thread_is_stopped(&sai->sai_thread)); + + spin_lock(&lli->lli_sa_lock); + entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata); + LASSERT(entry != NULL); + if (rc != 0) { + __sa_entry_post_stat(sai, entry, SA_ENTRY_INVA); + wakeup = (entry->se_index == sai->sai_index_wait); + } else { + entry->se_minfo = minfo; + entry->se_req = ptlrpc_request_addref(req); + /* Release the async ibits lock ASAP to avoid deadlock + * when statahead thread tries to enqueue lock on parent + * for readpage and other tries to enqueue lock on child + * with parent's lock held, for example: unlink. 
*/ + entry->se_handle = it->d.lustre.it_lock_handle; + ll_intent_drop_lock(it); + wakeup = sa_received_empty(sai); + list_add_tail(&entry->se_list, &sai->sai_entries_received); + } + sai->sai_replied++; + spin_unlock(&lli->lli_sa_lock); + + ll_sa_entry_put(sai, entry); + if (wakeup) + wake_up(&sai->sai_thread.t_ctl_waitq); + + if (rc != 0) { + ll_intent_release(it); + iput(dir); + OBD_FREE_PTR(minfo); + } + ll_sai_put(sai); + RETURN(rc); } static void sa_args_fini(struct md_enqueue_info *minfo, @@ -804,7 +790,6 @@ static int sa_args_init(struct inode *dir, struct inode *child, struct obd_capa **pcapa) { struct qstr *qstr = &entry->se_qstr; - struct ll_inode_info *lli = ll_i2info(dir); struct md_enqueue_info *minfo; struct ldlm_enqueue_info *einfo; struct md_op_data *op_data; @@ -827,11 +812,10 @@ static int sa_args_init(struct inode *dir, struct inode *child, return PTR_ERR(op_data); } - minfo->mi_it.it_op = IT_GETATTR; - minfo->mi_dir = igrab(dir); - minfo->mi_cb = ll_statahead_interpret; - minfo->mi_generation = lli->lli_sai->sai_generation; - minfo->mi_cbdata = entry->se_index; + minfo->mi_it.it_op = IT_GETATTR; + minfo->mi_dir = igrab(dir); + minfo->mi_cb = ll_statahead_interpret; + minfo->mi_cbdata = entry->se_index; einfo->ei_type = LDLM_IBITS; einfo->ei_mode = it_to_lock_mode(&minfo->mi_it); @@ -923,63 +907,64 @@ static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry, RETURN(rc); } -static void ll_statahead_one(struct dentry *parent, const char* entry_name, - int entry_name_len) +static void ll_statahead_one(struct dentry *parent, const char *name, + const int namelen) { - struct inode *dir = parent->d_inode; - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = lli->lli_sai; - struct dentry *dentry = NULL; - struct ll_sa_entry *entry; - int rc; - int rc1; - ENTRY; + struct inode *dir = parent->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = lli->lli_sai; + struct dentry 
*dentry = NULL; + struct ll_sa_entry *entry; + int rc; + ENTRY; - entry = ll_sa_entry_alloc(sai, sai->sai_index, entry_name, - entry_name_len); - if (IS_ERR(entry)) - RETURN_EXIT; + entry = ll_sa_entry_alloc(sai, sai->sai_index, name,namelen); + if (IS_ERR(entry)) + RETURN_EXIT; - dentry = d_lookup(parent, &entry->se_qstr); - if (!dentry) { - rc = do_sa_lookup(dir, entry); - } else { - rc = do_sa_revalidate(dir, entry, dentry); - if (rc == 1 && agl_should_run(sai, dentry->d_inode)) - ll_agl_add(sai, dentry->d_inode, entry->se_index); - } + dentry = d_lookup(parent, &entry->se_qstr); + if (!dentry) { + rc = do_sa_lookup(dir, entry); + } else { + rc = do_sa_revalidate(dir, entry, dentry); + if (rc == 1 && agl_should_run(sai, dentry->d_inode)) + ll_agl_add(sai, dentry->d_inode, entry->se_index); + } - if (dentry != NULL) - dput(dentry); + if (dentry != NULL) + dput(dentry); if (rc) { - rc1 = ll_sa_entry_to_stated(sai, entry, - rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); - if (rc1 == 0 && entry->se_index == sai->sai_index_wait) + sa_entry_post_stat(sai, entry, + rc < 0 ? 
SA_ENTRY_INVA : SA_ENTRY_SUCC); + if (entry->se_index == sai->sai_index_wait) wake_up(&sai->sai_waitq); } else { sai->sai_sent++; } - sai->sai_index++; - /* drop one refcount on entry by ll_sa_entry_alloc */ - ll_sa_entry_put(sai, entry); + sai->sai_index++; + /* drop one refcount on entry by ll_sa_entry_alloc */ + ll_sa_entry_put(sai, entry); - EXIT; + EXIT; } static int ll_agl_thread(void *arg) { - struct dentry *parent = (struct dentry *)arg; - struct inode *dir = parent->d_inode; - struct ll_inode_info *plli = ll_i2info(dir); - struct ll_inode_info *clli; - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); - struct ptlrpc_thread *thread = &sai->sai_agl_thread; - struct l_wait_info lwi = { 0 }; - ENTRY; + struct dentry *parent = (struct dentry *)arg; + struct inode *dir = parent->d_inode; + struct ll_inode_info *plli = ll_i2info(dir); + struct ll_inode_info *clli; + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai; + struct ptlrpc_thread *thread; + struct l_wait_info lwi = { 0 }; + ENTRY; + + sai = ll_sai_get(dir); + thread = &sai->sai_agl_thread; thread->t_pid = current_pid(); CDEBUG(D_READA, "agl thread started: sai %p, parent %.*s\n", sai, parent->d_name.len, parent->d_name.name); @@ -1064,23 +1049,25 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) static int ll_statahead_thread(void *arg) { - struct dentry *parent = (struct dentry *)arg; - struct inode *dir = parent->d_inode; - struct ll_inode_info *plli = ll_i2info(dir); - struct ll_inode_info *clli; - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); - struct ptlrpc_thread *thread = &sai->sai_thread; - struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread; - int first = 0; - int rc = 0; - struct md_op_data *op_data; - struct ll_dir_chain chain; - struct l_wait_info lwi = { 0 }; - struct page *page = NULL; - __u64 pos = 0; + struct dentry *parent = 
(struct dentry *)arg; + struct inode *dir = parent->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai; + struct ptlrpc_thread *thread; + struct ptlrpc_thread *agl_thread; + int first = 0; + struct md_op_data *op_data; + struct ll_dir_chain chain; + struct l_wait_info lwi = { 0 }; + struct page *page = NULL; + __u64 pos = 0; + int rc = 0; ENTRY; + sai = ll_sai_get(dir); + thread = &sai->sai_thread; + agl_thread = &sai->sai_agl_thread; thread->t_pid = current_pid(); CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n", sai, parent->d_name.len, parent->d_name.name); @@ -1088,7 +1075,7 @@ static int ll_statahead_thread(void *arg) op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, LUSTRE_OPC_ANY, dir); if (IS_ERR(op_data)) - GOTO(out_put, rc = PTR_ERR(op_data)); + GOTO(out, rc = PTR_ERR(op_data)); op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages; @@ -1096,32 +1083,36 @@ static int ll_statahead_thread(void *arg) ll_start_agl(parent, sai); atomic_inc(&sbi->ll_sa_total); - spin_lock(&plli->lli_sa_lock); + spin_lock(&lli->lli_sa_lock); if (thread_is_init(thread)) /* If someone else has changed the thread state * (e.g. already changed to SVC_STOPPING), we can't just * blindly overwrite that setting. 
*/ thread_set_flags(thread, SVC_RUNNING); - spin_unlock(&plli->lli_sa_lock); + spin_unlock(&lli->lli_sa_lock); wake_up(&thread->t_ctl_waitq); ll_dir_chain_init(&chain); - page = ll_get_dir_page(dir, op_data, pos, &chain); - while (1) { + while (pos != MDS_DIR_END_OFF && thread_is_running(thread)) { struct lu_dirpage *dp; struct lu_dirent *ent; + sai->sai_in_readpage = 1; + page = ll_get_dir_page(dir, op_data, pos, &chain); + sai->sai_in_readpage = 0; if (IS_ERR(page)) { rc = PTR_ERR(page); CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 "/"LPU64" opendir_pid = %u: rc = %d\n", PFID(ll_inode2fid(dir)), pos, sai->sai_index, - plli->lli_opendir_pid, rc); - GOTO(out, rc); + lli->lli_opendir_pid, rc); + break; } dp = page_address(page); - for (ent = lu_dirent_start(dp); ent != NULL; + for (ent = lu_dirent_start(dp); + ent != NULL && thread_is_running(thread) && + !sa_low_hit(sai); ent = lu_dirent_next(ent)) { __u64 hash; int namelen; @@ -1168,115 +1159,67 @@ static int ll_statahead_thread(void *arg) if (unlikely(++first == 1)) continue; -keep_it: - l_wait_event(thread->t_ctl_waitq, - !sa_sent_full(sai) || - !sa_received_empty(sai) || - !agl_list_empty(sai) || - !thread_is_running(thread), - &lwi); + /* wait for spare statahead window */ + do { + l_wait_event(thread->t_ctl_waitq, + !sa_sent_full(sai) || + !sa_received_empty(sai) || + !agl_list_empty(sai) || + !thread_is_running(thread), + &lwi); -interpret_it: - while (!sa_received_empty(sai)) ll_post_statahead(sai); + } while (sa_sent_full(sai) && + thread_is_running(thread)); - if (unlikely(!thread_is_running(thread))) { - ll_release_page(dir, page, false); - GOTO(out, rc = 0); - } - - /* If no window for metadata statahead, but there are - * some AGL entries to be triggered, then try to help - * to process the AGL entries. 
*/ - if (sa_sent_full(sai)) { - spin_lock(&plli->lli_agl_lock); - while (!agl_list_empty(sai)) { - clli = agl_first_entry(sai); - list_del_init(&clli->lli_agl_list); - spin_unlock(&plli->lli_agl_lock); - ll_agl_trigger(&clli->lli_vfs_inode, - sai); - - if (!sa_received_empty(sai)) - goto interpret_it; - - if (unlikely( - !thread_is_running(thread))) { - ll_release_page(dir, page, - false); - GOTO(out, rc = 0); - } - - if (!sa_sent_full(sai)) - goto do_it; - - spin_lock(&plli->lli_agl_lock); - } - spin_unlock(&plli->lli_agl_lock); - - goto keep_it; - } -do_it: ll_statahead_one(parent, name, namelen); } pos = le64_to_cpu(dp->ldp_hash_end); - if (pos == MDS_DIR_END_OFF) { - /* - * End of directory reached. - */ - ll_release_page(dir, page, false); - while (1) { - l_wait_event(thread->t_ctl_waitq, - !sa_received_empty(sai) || - sai->sai_sent == sai->sai_replied || - !thread_is_running(thread), - &lwi); + ll_release_page(dir, page, + le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - while (!sa_received_empty(sai)) - ll_post_statahead(sai); - - if (unlikely(!thread_is_running(thread))) - GOTO(out, rc = 0); + if (sa_low_hit(sai)) { + rc = -EFAULT; + atomic_inc(&sbi->ll_sa_wrong); + CDEBUG(D_READA, "Statahead for dir "DFID" hit " + "ratio too low: hit/miss "LPU64"/"LPU64 + ", sent/replied "LPU64"/"LPU64", stopping " + "statahead thread: pid %d\n", + PFID(&lli->lli_fid), sai->sai_hit, + sai->sai_miss, sai->sai_sent, + sai->sai_replied, current_pid()); + break; + } + } + ll_dir_chain_fini(&chain); + ll_finish_md_op_data(op_data); - if (sai->sai_sent == sai->sai_replied && - sa_received_empty(sai)) - break; - } + if (rc < 0) { + spin_lock(&lli->lli_sa_lock); + thread_set_flags(thread, SVC_STOPPING); + lli->lli_sa_enabled = 0; + spin_unlock(&lli->lli_sa_lock); + } - spin_lock(&plli->lli_agl_lock); - while (!agl_list_empty(sai) && - thread_is_running(thread)) { - clli = agl_first_entry(sai); - list_del_init(&clli->lli_agl_list); - spin_unlock(&plli->lli_agl_lock); - 
ll_agl_trigger(&clli->lli_vfs_inode, sai); - spin_lock(&plli->lli_agl_lock); - } - spin_unlock(&plli->lli_agl_lock); + /* statahead is finished, but statahead entries need to be cached, wait + * for file release to stop me. */ + while (thread_is_running(thread)) { + l_wait_event(thread->t_ctl_waitq, + !sa_received_empty(sai) || + !agl_list_empty(sai) || + !thread_is_running(thread), + &lwi); - GOTO(out, rc = 0); - } else { - /* - * chain is exhausted. - * Normal case: continue to the next page. - */ - ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) & - LDF_COLLIDE); - sai->sai_in_readpage = 1; - page = ll_get_dir_page(dir, op_data, pos, &chain); - sai->sai_in_readpage = 0; - } + ll_post_statahead(sai); } -out: + EXIT; - ll_dir_chain_fini(&chain); - ll_finish_md_op_data(op_data); -out_put: - if (sai->sai_agl_valid) { - spin_lock(&plli->lli_agl_lock); +out: + if (sai->sai_agl_valid) { + spin_lock(&lli->lli_agl_lock); thread_set_flags(agl_thread, SVC_STOPPING); - spin_unlock(&plli->lli_agl_lock); + spin_unlock(&lli->lli_agl_lock); wake_up(&agl_thread->t_ctl_waitq); CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n", @@ -1284,23 +1227,27 @@ out_put: l_wait_event(agl_thread->t_ctl_waitq, thread_is_stopped(agl_thread), &lwi); - } else { - /* Set agl_thread flags anyway. */ - thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); - } - spin_lock(&plli->lli_sa_lock); - if (!sa_received_empty(sai)) { - thread_set_flags(thread, SVC_STOPPING); - spin_unlock(&plli->lli_sa_lock); - - /* To release the resources held by received entries. */ - while (!sa_received_empty(sai)) - ll_post_statahead(sai); + } else { + /* Set agl_thread flags anyway. 
*/ + thread_set_flags(agl_thread, SVC_STOPPED); + } - spin_lock(&plli->lli_sa_lock); + /* wait for inflight statahead RPCs to finish, and then we can free sai + * safely because statahead RPC will access sai data */ + while (sai->sai_sent != sai->sai_replied) { + /* in case we're not woken up, timeout wait */ + lwi = LWI_TIMEOUT(HZ >> 3, NULL, NULL); + l_wait_event(thread->t_ctl_waitq, + sai->sai_sent == sai->sai_replied, &lwi); } + + /* release resources held by received entries. */ + ll_post_statahead(sai); + + spin_lock(&lli->lli_sa_lock); thread_set_flags(thread, SVC_STOPPED); - spin_unlock(&plli->lli_sa_lock); + spin_unlock(&lli->lli_sa_lock); + wake_up(&sai->sai_waitq); wake_up(&thread->t_ctl_waitq); ll_sai_put(sai); @@ -1310,52 +1257,52 @@ out_put: return rc; } -/** - * called in ll_file_release(). - */ -void ll_stop_statahead(struct inode *dir, void *key) +/* authorize opened dir handle @key to statahead later */ +void ll_authorize_statahead(struct inode *dir, void *key) { struct ll_inode_info *lli = ll_i2info(dir); - if (unlikely(key == NULL)) - return; - spin_lock(&lli->lli_sa_lock); - if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) { - spin_unlock(&lli->lli_sa_lock); - return; - } + if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL) { + /* + * if lli_sai is not NULL, it means previous statahead is not + * finished yet, we'd better not start a new statahead for now. + */ + LASSERT(lli->lli_opendir_pid == 0); + lli->lli_opendir_key = key; + lli->lli_opendir_pid = current_pid(); + lli->lli_sa_enabled = 1; + } + spin_unlock(&lli->lli_sa_lock); +} - lli->lli_opendir_key = NULL; +/* deauthorize opened dir handle @key to statahead, but statahead thread may + * still be running, notify it to quit. 
*/ +void ll_deauthorize_statahead(struct inode *dir, void *key) +{ + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai; - if (lli->lli_sai) { - struct l_wait_info lwi = { 0 }; - struct ptlrpc_thread *thread = &lli->lli_sai->sai_thread; + LASSERT(lli->lli_opendir_key == key); + LASSERT(lli->lli_opendir_pid != 0); - if (!thread_is_stopped(thread)) { - thread_set_flags(thread, SVC_STOPPING); - spin_unlock(&lli->lli_sa_lock); - wake_up(&thread->t_ctl_waitq); - - CDEBUG(D_READA, "stop statahead thread: sai %p pid %u\n", - lli->lli_sai, (unsigned int)thread->t_pid); - l_wait_event(thread->t_ctl_waitq, - thread_is_stopped(thread), - &lwi); - } else { - spin_unlock(&lli->lli_sa_lock); - } + CDEBUG(D_READA, "deauthorize statahead for "DFID"\n", + PFID(&lli->lli_fid)); + spin_lock(&lli->lli_sa_lock); + lli->lli_opendir_key = NULL; + lli->lli_opendir_pid = 0; + lli->lli_sa_enabled = 0; + sai = lli->lli_sai; + if (sai != NULL && thread_is_running(&sai->sai_thread)) { /* - * Put the ref which was held when first statahead_enter. - * It maybe not the last ref for some statahead requests - * maybe inflight. + * statahead thread may not quit yet because it needs to cache + * stated entries, now it's time to tell it to quit. 
*/ - ll_sai_put(lli->lli_sai); - } else { - lli->lli_opendir_pid = 0; - spin_unlock(&lli->lli_sa_lock); + thread_set_flags(&sai->sai_thread, SVC_STOPPING); + wake_up(&sai->sai_thread.t_ctl_waitq); } + spin_unlock(&lli->lli_sa_lock); } enum { @@ -1496,264 +1443,242 @@ out: static void ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { - struct ptlrpc_thread *thread = &sai->sai_thread; - struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode); - int hit; - ENTRY; - - if (entry != NULL && entry->se_stat == SA_ENTRY_SUCC) - hit = 1; - else - hit = 0; - - ll_sa_entry_fini(sai, entry); - if (hit) { - sai->sai_hit++; - sai->sai_consecutive_miss = 0; - sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max); - } else { - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + if (entry != NULL && entry->se_stat == SA_ENTRY_SUCC) { + struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode); + sai->sai_hit++; + sai->sai_consecutive_miss = 0; + sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max); + } else { sai->sai_miss++; sai->sai_consecutive_miss++; - if (sa_low_hit(sai) && thread_is_running(thread)) { - atomic_inc(&sbi->ll_sa_wrong); - CDEBUG(D_READA, "Statahead for dir "DFID" hit " - "ratio too low: hit/miss "LPU64"/"LPU64 - ", sent/replied "LPU64"/"LPU64", stopping " - "statahead thread\n", - PFID(&lli->lli_fid), sai->sai_hit, - sai->sai_miss, sai->sai_sent, - sai->sai_replied); - spin_lock(&lli->lli_sa_lock); - if (!thread_is_stopped(thread)) - thread_set_flags(thread, SVC_STOPPING); - spin_unlock(&lli->lli_sa_lock); - } } - - if (!thread_is_stopped(thread)) - wake_up(&thread->t_ctl_waitq); - - EXIT; + ll_sa_entry_fini(sai, entry); + wake_up(&sai->sai_thread.t_ctl_waitq); } -/** - * Start statahead thread if this is the first dir entry. - * Otherwise if a thread is started already, wait it until it is ahead of me. - * \retval 1 -- find entry with lock in cache, the caller needs to do - * nothing. 
- * \retval 0 -- find entry in cache, but without lock, the caller needs - * refresh from MDS. - * \retval others -- the caller need to process as non-statahead. - */ -int do_statahead_enter(struct inode *dir, struct dentry **dentryp, - int only_unplug) +static int revalidate_statahead_dentry(struct inode *dir, + struct ll_statahead_info *sai, + struct dentry **dentryp, + int only_unplug) { - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = lli->lli_sai; - struct dentry *parent; - struct ll_sa_entry *entry; - struct ptlrpc_thread *thread; - struct l_wait_info lwi = { 0 }; - struct task_struct *task; - int rc = 0; - struct ll_inode_info *plli; + struct ll_sa_entry *entry = NULL; + struct l_wait_info lwi = { 0 }; + int rc = 0; ENTRY; - LASSERT(lli->lli_opendir_pid == current_pid()); - - if (sai) { - thread = &sai->sai_thread; - if (unlikely(thread_is_stopped(thread) && - list_empty(&sai->sai_entries_stated))) { - /* to release resource */ - ll_stop_statahead(dir, lli->lli_opendir_key); - RETURN(-EAGAIN); - } + if ((*dentryp)->d_name.name[0] == '.') { + if (sai->sai_ls_all || + sai->sai_miss_hidden >= sai->sai_skip_hidden) { + /* + * Hidden dentry is the first one, or statahead + * thread does not skip so many hidden dentries + * before "sai_ls_all" enabled as below. + */ + } else { + if (!sai->sai_ls_all) + /* + * It maybe because hidden dentry is not + * the first one, "sai_ls_all" was not + * set, then "ls -al" missed. Enable + * "sai_ls_all" for such case. + */ + sai->sai_ls_all = 1; - if ((*dentryp)->d_name.name[0] == '.') { - if (sai->sai_ls_all || - sai->sai_miss_hidden >= sai->sai_skip_hidden) { - /* - * Hidden dentry is the first one, or statahead - * thread does not skip so many hidden dentries - * before "sai_ls_all" enabled as below. - */ - } else { - if (!sai->sai_ls_all) - /* - * It maybe because hidden dentry is not - * the first one, "sai_ls_all" was not - * set, then "ls -al" missed. Enable - * "sai_ls_all" for such case. 
- */ - sai->sai_ls_all = 1; - - /* - * Such "getattr" has been skipped before - * "sai_ls_all" enabled as above. - */ - sai->sai_miss_hidden++; - RETURN(-EAGAIN); - } - } + /* + * Such "getattr" has been skipped before + * "sai_ls_all" enabled as above. + */ + sai->sai_miss_hidden++; + RETURN(-EAGAIN); + } + } - entry = ll_sa_entry_get_byname(sai, &(*dentryp)->d_name); - if (entry == NULL || only_unplug) { - ll_sai_unplug(sai, entry); - RETURN(entry ? 1 : -EAGAIN); - } + entry = ll_sa_entry_get_byname(sai, &(*dentryp)->d_name); + if (entry == NULL || only_unplug) { + ll_sai_unplug(sai, entry); + RETURN(entry ? 1 : -EAGAIN); + } - /* if statahead is busy in readdir, help it do post-work */ - while (!ll_sa_entry_stated(entry) && - sai->sai_in_readpage && !sa_received_empty(sai)) - ll_post_statahead(sai); - - if (!ll_sa_entry_stated(entry)) { - sai->sai_index_wait = entry->se_index; - lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL, - LWI_ON_SIGNAL_NOOP, NULL); - rc = l_wait_event(sai->sai_waitq, - ll_sa_entry_stated(entry) || - thread_is_stopped(thread), - &lwi); - if (rc < 0) { - ll_sai_unplug(sai, entry); - RETURN(-EAGAIN); - } - } + /* if statahead is busy in readdir, help it do post-work */ + if (!ll_sa_entry_stated(entry) && sai->sai_in_readpage) + ll_post_statahead(sai); + + if (!ll_sa_entry_stated(entry)) { + sai->sai_index_wait = entry->se_index; + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL, + LWI_ON_SIGNAL_NOOP, NULL); + rc = l_wait_event(sai->sai_waitq, + ll_sa_entry_stated(entry) || + thread_is_stopped(&sai->sai_thread), + &lwi); + if (rc < 0) { + ll_sai_unplug(sai, entry); + RETURN(-EAGAIN); + } + } - if (entry->se_stat == SA_ENTRY_SUCC && - entry->se_inode != NULL) { - struct inode *inode = entry->se_inode; - struct lookup_intent it = { .it_op = IT_GETATTR, - .d.lustre.it_lock_handle = - entry->se_handle }; - __u64 bits; - - rc = md_revalidate_lock(ll_i2mdexp(dir), &it, - ll_inode2fid(inode), &bits); - if (rc == 1) { - if ((*dentryp)->d_inode == 
NULL) { - struct dentry *alias; - - alias = ll_splice_alias(inode, - *dentryp); - if (IS_ERR(alias)) { - ll_sai_unplug(sai, entry); - RETURN(PTR_ERR(alias)); - } - *dentryp = alias; - } else if ((*dentryp)->d_inode != inode) { - /* revalidate, but inode is recreated */ - CDEBUG(D_READA, - "%s: stale dentry %.*s inode " - DFID", statahead inode "DFID - "\n", - ll_get_fsname((*dentryp)->d_inode->i_sb, NULL, 0), - (*dentryp)->d_name.len, - (*dentryp)->d_name.name, - PFID(ll_inode2fid((*dentryp)->d_inode)), - PFID(ll_inode2fid(inode))); - ll_intent_release(&it); + if (entry->se_stat == SA_ENTRY_SUCC && entry->se_inode != NULL) { + struct inode *inode = entry->se_inode; + struct lookup_intent it = { .it_op = IT_GETATTR, + .d.lustre.it_lock_handle = + entry->se_handle }; + __u64 bits; + + rc = md_revalidate_lock(ll_i2mdexp(dir), &it, + ll_inode2fid(inode), &bits); + if (rc == 1) { + if ((*dentryp)->d_inode == NULL) { + struct dentry *alias; + + alias = ll_splice_alias(inode, *dentryp); + if (IS_ERR(alias)) { ll_sai_unplug(sai, entry); - RETURN(-ESTALE); - } else { - iput(inode); + RETURN(PTR_ERR(alias)); } - entry->se_inode = NULL; + *dentryp = alias; + } else if ((*dentryp)->d_inode != inode) { + /* revalidate, but inode is recreated */ + CDEBUG(D_READA, + "%s: stale dentry %.*s inode " + DFID", statahead inode "DFID + "\n", + ll_get_fsname((*dentryp)->d_inode->i_sb, + NULL, 0), + (*dentryp)->d_name.len, + (*dentryp)->d_name.name, + PFID(ll_inode2fid((*dentryp)->d_inode)), + PFID(ll_inode2fid(inode))); + ll_sai_unplug(sai, entry); + RETURN(-ESTALE); + } else { + iput(inode); + } + entry->se_inode = NULL; - if ((bits & MDS_INODELOCK_LOOKUP) && - d_lustre_invalid(*dentryp)) - d_lustre_revalidate(*dentryp); - ll_intent_release(&it); - } - } + if ((bits & MDS_INODELOCK_LOOKUP) && + d_lustre_invalid(*dentryp)) + d_lustre_revalidate(*dentryp); + ll_intent_release(&it); + } + } - ll_sai_unplug(sai, entry); - RETURN(rc); - } + ll_sai_unplug(sai, entry); + RETURN(rc); +} - /* I 
am the "lli_opendir_pid" owner, only me can set "lli_sai". */ - rc = is_first_dirent(dir, *dentryp); - if (rc == LS_NONE_FIRST_DE) - /* It is not "ls -{a}l" operation, no need statahead for it. */ - GOTO(out, rc = -EAGAIN); +static int start_statahead_thread(struct inode *dir, struct dentry *dentry) +{ + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = NULL; + struct dentry *parent; + struct ptlrpc_thread *thread; + struct l_wait_info lwi = { 0 }; + struct task_struct *task; + int rc; + ENTRY; - sai = ll_sai_alloc(); - if (sai == NULL) - GOTO(out, rc = -ENOMEM); + /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */ + rc = is_first_dirent(dir, dentry); + if (rc == LS_NONE_FIRST_DE) + /* It is not "ls -{a}l" operation, no need statahead for it. */ + GOTO(out, rc = -EAGAIN); + + sai = ll_sai_alloc(); + if (sai == NULL) + GOTO(out, rc = -ENOMEM); + + sai->sai_ls_all = (rc == LS_FIRST_DOT_DE); + sai->sai_inode = igrab(dir); + if (unlikely(sai->sai_inode == NULL)) { + CWARN("Do not start stat ahead on dying inode "DFID"\n", + PFID(&lli->lli_fid)); + GOTO(out, rc = -ESTALE); + } - sai->sai_ls_all = (rc == LS_FIRST_DOT_DE); - sai->sai_inode = igrab(dir); - if (unlikely(sai->sai_inode == NULL)) { - CWARN("Do not start stat ahead on dying inode "DFID"\n", - PFID(&lli->lli_fid)); - GOTO(out, rc = -ESTALE); - } + /* get parent reference count here, and put it in ll_statahead_thread */ + parent = dget(dentry->d_parent); + if (unlikely(sai->sai_inode != parent->d_inode)) { + struct ll_inode_info *nlli = ll_i2info(parent->d_inode); - /* get parent reference count here, and put it in ll_statahead_thread */ - parent = dget((*dentryp)->d_parent); - if (unlikely(sai->sai_inode != parent->d_inode)) { - struct ll_inode_info *nlli = ll_i2info(parent->d_inode); - - CWARN("Race condition, someone changed %.*s just now: " - "old parent "DFID", new parent "DFID"\n", - (*dentryp)->d_name.len, (*dentryp)->d_name.name, - PFID(&lli->lli_fid), 
PFID(&nlli->lli_fid)); - dput(parent); - iput(sai->sai_inode); - GOTO(out, rc = -EAGAIN); - } + CWARN("Race condition, someone changed %.*s just now: " + "old parent "DFID", new parent "DFID"\n", + dentry->d_name.len, dentry->d_name.name, + PFID(&lli->lli_fid), PFID(&nlli->lli_fid)); + dput(parent); + iput(sai->sai_inode); + GOTO(out, rc = -EAGAIN); + } CDEBUG(D_READA, "start statahead thread: sai %p, parent %.*s\n", sai, parent->d_name.len, parent->d_name.name); - /* The sai buffer already has one reference taken at allocation time, - * but as soon as we expose the sai by attaching it to the lli that - * default reference can be dropped by another thread calling - * ll_stop_statahead. We need to take a local reference to protect - * the sai buffer while we intend to access it. */ - ll_sai_get(sai); lli->lli_sai = sai; - plli = ll_i2info(parent->d_inode); task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u", - plli->lli_opendir_pid); + lli->lli_opendir_pid); thread = &sai->sai_thread; if (IS_ERR(task)) { rc = PTR_ERR(task); CERROR("cannot start ll_sa thread: rc = %d\n", rc); dput(parent); - lli->lli_opendir_key = NULL; - thread_set_flags(thread, SVC_STOPPED); - thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); - /* Drop both our own local reference and the default - * reference from allocation time. */ - ll_sai_put(sai); + lli->lli_opendir_key = NULL; + thread_set_flags(thread, SVC_STOPPED); + thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); ll_sai_put(sai); - LASSERT(lli->lli_sai == NULL); - RETURN(-EAGAIN); - } + LASSERT(lli->lli_sai == NULL); + RETURN(-EAGAIN); + } - l_wait_event(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread), - &lwi); + l_wait_event(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread), + &lwi); + atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running); ll_sai_put(sai); - /* - * We don't stat-ahead for the first dirent since we are already in - * lookup. 
- */ - RETURN(-EAGAIN); + /* + * We don't stat-ahead for the first dirent since we are already in + * lookup. + */ + RETURN(-EAGAIN); out: - if (sai != NULL) - OBD_FREE_PTR(sai); + if (sai != NULL) + OBD_FREE_PTR(sai); spin_lock(&lli->lli_sa_lock); lli->lli_opendir_key = NULL; lli->lli_opendir_pid = 0; + lli->lli_sa_enabled = 0; spin_unlock(&lli->lli_sa_lock); - return rc; + + RETURN(rc); +} + +/** + * Start statahead thread if this is the first dir entry. + * Otherwise if a thread is started already, wait it until it is ahead of me. + * \retval 1 -- find entry with lock in cache, the caller needs to do + * nothing. + * \retval 0 -- find entry in cache, but without lock, the caller needs + * refresh from MDS. + * \retval others -- the caller need to process as non-statahead. + */ +int do_statahead_enter(struct inode *dir, struct dentry **dentryp, + int only_unplug) +{ + struct ll_statahead_info *sai; + + sai = ll_sai_get(dir); + if (sai != NULL) { + int rc; + + rc = revalidate_statahead_dentry(dir, sai, dentryp, + only_unplug); + CDEBUG(D_READA, "revalidate statahead %.*s: %d.\n", + (*dentryp)->d_name.len, (*dentryp)->d_name.name, rc); + ll_sai_put(sai); + return rc; + } + + return start_statahead_thread(dir, *dentryp); } -- 1.8.3.1