From 672ab0e00d6148330561d21ccff9331124914d35 Mon Sep 17 00:00:00 2001 From: Lai Siyao Date: Fri, 14 Mar 2014 20:10:32 +0800 Subject: [PATCH] LU-3270 statahead: small fixes and cleanup small fixes: * when 'unplug' is set for ll_statahead(), sa_put() shouldn't kill the entry found, because its inflight RPC may not finish yet. * remove 'sai_generation', add 'lli_sa_generation' because the former one is not safe to access without lock. * revalidate_statahead_dentry() may fail to wait for statahead entry to become ready, in this case it should not release this entry, because it may be used by inflight statahead RPC. cleanups: * rename ll_statahead_enter() to ll_statahead(). * move dentry 'lld_sa_generation' update to ll_statahead() to simplify code and logic. * other small cleanups. Signed-off-by: Lai Siyao Change-Id: I65759c7dfcbe879b42f14152dbfe5949e3d37ea0 Reviewed-on: http://review.whamcloud.com/9667 Tested-by: Jenkins Reviewed-by: Fan Yong Reviewed-by: James Simmons Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/llite/dcache.c | 5 +- lustre/llite/llite_internal.h | 165 ++++++----------- lustre/llite/namei.c | 29 ++- lustre/llite/statahead.c | 415 +++++++++++++++++++++++------------------- lustre/mdc/mdc_request.c | 2 +- 5 files changed, 305 insertions(+), 311 deletions(-) diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 71fc91e..ad9be91 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -376,7 +376,7 @@ static int ll_revalidate_dentry(struct dentry *dentry, if (lookup_flags & (LOOKUP_OPEN | LOOKUP_CREATE)) return 0; - if (!dentry_need_statahead(dir, dentry)) + if (!dentry_may_statahead(dir, dentry)) return 1; #ifndef HAVE_DCACHE_LOCK @@ -384,8 +384,7 @@ static int ll_revalidate_dentry(struct dentry *dentry, return -ECHILD; #endif - do_statahead_enter(dir, &dentry, dentry->d_inode == NULL); - ll_statahead_mark(dir, dentry); + ll_statahead(dir, &dentry, dentry->d_inode == NULL); return 1; } diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 1a5623d..a3e1ff3 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -182,49 +182,41 @@ struct ll_inode_info { /* for directory */ struct { /* serialize normal readdir and statahead-readdir. */ - struct mutex d_readdir_mutex; + struct mutex lli_readdir_mutex; /* metadata statahead */ /* since parent-child threads can share the same @file * struct, "opendir_key" is the token when dir close for * case of parent exit before child -- it is me should * cleanup the dir readahead. */ - void *d_opendir_key; - struct ll_statahead_info *d_sai; + void *lli_opendir_key; + struct ll_statahead_info *lli_sai; /* protect statahead stuff. */ - spinlock_t d_sa_lock; + spinlock_t lli_sa_lock; /* "opendir_pid" is the token when lookup/revalid * -- I am the owner of dir statahead. */ - pid_t d_opendir_pid; + pid_t lli_opendir_pid; /* stat will try to access statahead entries or start * statahead if this flag is set, and this flag will be * set upon dir open, and cleared when dir is closed, * statahead hit ratio is too low, or start statahead * thread failed. */ - unsigned int d_sa_enabled:1; + unsigned int lli_sa_enabled:1; + /* generation for statahead */ + unsigned int lli_sa_generation; /* directory stripe information */ - struct lmv_stripe_md *d_lsm_md; + struct lmv_stripe_md *lli_lsm_md; /* striped directory size */ - loff_t d_stripe_size; + loff_t lli_stripe_dir_size; /* striped directory nlink */ - __u64 d_stripe_nlink; - } d; - -#define lli_readdir_mutex u.d.d_readdir_mutex -#define lli_opendir_key u.d.d_opendir_key -#define lli_sai u.d.d_sai -#define lli_sa_lock u.d.d_sa_lock -#define lli_sa_enabled u.d.d_sa_enabled -#define lli_opendir_pid u.d.d_opendir_pid -#define lli_lsm_md u.d.d_lsm_md -#define lli_stripe_dir_size u.d.d_stripe_size -#define lli_stripe_dir_nlink u.d.d_stripe_nlink + __u64 lli_stripe_dir_nlink; + }; /* for non-directory */ struct { - struct mutex f_size_mutex; - char *f_symlink_name; - __u64 f_maxbytes; + struct mutex lli_size_mutex; + char *lli_symlink_name; + __u64 lli_maxbytes; /* * struct rw_semaphore { * signed long count; // align d.d_def_acl @@ -232,16 +224,16 @@ struct ll_inode_info { * struct list_head wait_list; * } */ - struct rw_semaphore f_trunc_sem; - struct range_lock_tree f_write_tree; + struct rw_semaphore lli_trunc_sem; + struct range_lock_tree lli_write_tree; - struct rw_semaphore f_glimpse_sem; - cfs_time_t f_glimpse_time; - struct list_head f_agl_list; - __u64 f_agl_index; + struct rw_semaphore lli_glimpse_sem; + cfs_time_t lli_glimpse_time; + struct list_head lli_agl_list; + __u64 lli_agl_index; /* for writepage() only to communicate to fsync */ - int f_async_rc; + int lli_async_rc; /* * whenever a process try to read/write the file, the @@ -251,22 +243,9 @@ struct ll_inode_info { * so the read/write statistics for jobid will not be * accurate if the file is shared by different jobs. */ - char f_jobid[LUSTRE_JOBID_SIZE]; - } f; - -#define lli_size_mutex u.f.f_size_mutex -#define lli_symlink_name u.f.f_symlink_name -#define lli_maxbytes u.f.f_maxbytes -#define lli_trunc_sem u.f.f_trunc_sem -#define lli_write_tree u.f.f_write_tree -#define lli_glimpse_sem u.f.f_glimpse_sem -#define lli_glimpse_time u.f.f_glimpse_time -#define lli_agl_list u.f.f_agl_list -#define lli_agl_index u.f.f_agl_index -#define lli_async_rc u.f.f_async_rc -#define lli_jobid u.f.f_jobid - - } u; + char lli_jobid[LUSTRE_JOBID_SIZE]; + }; + }; /* XXX: For following frequent used members, although they maybe special * used for non-directory object, it is some time-wasting to check @@ -1310,31 +1289,31 @@ void et_fini(struct eacl_table *et); /* per inode struct, for dir only */ struct ll_statahead_info { - struct inode *sai_inode; - atomic_t sai_refcount; /* when access this struct, hold + struct dentry *sai_dentry; + atomic_t sai_refcount; /* when access this struct, hold * refcount */ - unsigned int sai_generation; /* generation for statahead */ - unsigned int sai_max; /* max ahead of lookup */ - __u64 sai_sent; /* stat requests sent count */ - __u64 sai_replied; /* stat requests which received + unsigned int sai_max; /* max ahead of lookup */ + __u64 sai_sent; /* stat requests sent count */ + __u64 sai_replied; /* stat requests which received * reply */ - __u64 sai_index; /* index of statahead entry */ - __u64 sai_index_wait; /* index of entry which is the + __u64 sai_index; /* index of statahead entry */ + __u64 sai_index_wait; /* index of entry which is the * caller is waiting for */ - __u64 sai_hit; /* hit count */ - __u64 sai_miss; /* miss count: - * for "ls -al" case, it - * includes hidden dentry miss; + __u64 sai_hit; /* hit count */ + __u64 sai_miss; /* miss count: + * for "ls -al" case, includes + * hidden dentry miss; * for "ls -l" case, it does not * include hidden dentry miss. * "sai_miss_hidden" is used for * the later case. */ - unsigned int sai_consecutive_miss; /* consecutive miss */ - unsigned int sai_miss_hidden;/* "ls -al", but first dentry - * is not a hidden one */ - unsigned int sai_skip_hidden;/* skipped hidden dentry count */ - unsigned int sai_ls_all:1, /* "ls -al", do stat-ahead for + unsigned int sai_consecutive_miss; /* consecutive miss */ + unsigned int sai_miss_hidden;/* "ls -al", but first dentry + * is not a hidden one */ + unsigned int sai_skip_hidden;/* skipped hidden dentry count + */ + unsigned int sai_ls_all:1, /* "ls -al", do stat-ahead for * hidden entries */ sai_agl_valid:1,/* AGL is valid for the dir */ sai_in_readpage:1;/* statahead is in readdir()*/ @@ -1351,8 +1330,7 @@ struct ll_statahead_info { atomic_t sai_cache_count; /* entry count in cache */ }; -int do_statahead_enter(struct inode *dir, struct dentry **dentry, - int only_unplug); +int ll_statahead(struct inode *dir, struct dentry **dentry, bool unplug); void ll_authorize_statahead(struct inode *dir, void *key); void ll_deauthorize_statahead(struct inode *dir, void *key); @@ -1368,24 +1346,10 @@ static inline int ll_glimpse_size(struct inode *inode) return rc; } -static inline void -ll_statahead_mark(struct inode *dir, struct dentry *dentry) -{ - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = lli->lli_sai; - struct ll_dentry_data *ldd = ll_d2d(dentry); - - /* not the same process, don't mark */ - if (lli->lli_opendir_pid != current_pid()) - return; - - LASSERT(ldd != NULL); - if (sai != NULL) - ldd->lld_sa_generation = sai->sai_generation; -} - +/* dentry may statahead when statahead is enabled and current process has opened + * parent directory, and this dentry hasn't accessed statahead cache before */ static inline bool -dentry_need_statahead(struct inode *dir, struct dentry *dentry) +dentry_may_statahead(struct inode *dir, struct dentry *dentry) { struct ll_inode_info *lli; struct ll_dentry_data *ldd; @@ -1406,38 +1370,27 @@ dentry_need_statahead(struct inode *dir, struct dentry *dentry) if (lli->lli_opendir_pid != current_pid()) return false; - ldd = ll_d2d(dentry); /* - * When stats a dentry, the system trigger more than once "revalidate" - * or "lookup", for "getattr", for "getxattr", and maybe for others. - * Under patchless client mode, the operation intent is not accurate, - * which maybe misguide the statahead thread. For example: - * The "revalidate" call for "getattr" and "getxattr" of a dentry maybe - * have the same operation intent -- "IT_GETATTR". - * In fact, one dentry should has only one chance to interact with the - * statahead thread, otherwise the statahead windows will be confused. + * When stating a dentry, kernel may trigger 'revalidate' or 'lookup' + * multiple times, eg. for 'getattr', 'getxattr' and etc. + * For patchless client, lookup intent is not accurate, which may + * misguide statahead. For example: + * The 'revalidate' call for 'getattr' and 'getxattr' of a dentry will + * have the same intent -- IT_GETATTR, while one dentry should access + * statahead cache once, otherwise statahead windows is messed up. * The solution is as following: - * Assign "lld_sa_generation" with "sai_generation" when a dentry - * "IT_GETATTR" for the first time, and the subsequent "IT_GETATTR" - * will bypass interacting with statahead thread for checking: - * "lld_sa_generation == lli_sai->sai_generation" + * Assign 'lld_sa_generation' with 'lli_sa_generation' when a dentry + * IT_GETATTR for the first time, and subsequent IT_GETATTR will + * bypass interacting with statahead cache by checking + * 'lld_sa_generation == lli->lli_sa_generation'. */ - if (ldd && lli->lli_sai && - ldd->lld_sa_generation == lli->lli_sai->sai_generation) + ldd = ll_d2d(dentry); + if (ldd != NULL && ldd->lld_sa_generation == lli->lli_sa_generation) return false; return true; } -static inline int -ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int only_unplug) -{ - if (!dentry_need_statahead(dir, *dentryp)) - return -EAGAIN; - - return do_statahead_enter(dir, dentryp, only_unplug); -} - /* llite ioctl register support rountine */ enum llioc_iter { LLIOC_CONT = 0, diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index f10df8a..041e2e1 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -536,14 +536,11 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, if (it == NULL || it->it_op == IT_GETXATTR) it = &lookup_it; - if (it->it_op == IT_GETATTR) { - rc = ll_statahead_enter(parent, &dentry, 0); - if (rc == 1) { - if (dentry == save) - GOTO(out, retval = NULL); - GOTO(out, retval = dentry); - } - } + if (it->it_op == IT_GETATTR && dentry_may_statahead(parent, dentry)) { + rc = ll_statahead(parent, &dentry, 0); + if (rc == 1) + RETURN(dentry == save ? NULL : dentry); + } if (it->it_op & IT_CREAT) opc = LUSTRE_OPC_CREATE; @@ -578,16 +575,12 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, } ll_lookup_finish_locks(it, dentry); - if (dentry == save) - GOTO(out, retval = NULL); - else - GOTO(out, retval = dentry); - out: - if (req) - ptlrpc_req_finished(req); - if (it->it_op == IT_GETATTR && (retval == NULL || retval == dentry)) - ll_statahead_mark(parent, dentry); - return retval; + retval = (dentry == save) ? NULL : dentry; + EXIT; + +out: + ptlrpc_req_finished(req); + return retval; } #ifdef HAVE_IOP_ATOMIC_OPEN diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index ea69243..27c5c2e9 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -57,11 +57,11 @@ typedef enum { /* sa_entry is not refcounted: statahead thread allocates it and do async stat, * and in async stat callback ll_statahead_interpret() will add it into - * sai_cb_entries, later statahead thread will call sa_handle_callback() to + * sai_interim_entries, later statahead thread will call sa_handle_callback() to * instantiate entry and move it into sai_entries, and then only scanner process * can access and free it. */ struct sa_entry { - /* link into sai_cb_entries or sai_entries */ + /* link into sai_interim_entries or sai_entries */ struct list_head se_list; /* link into sai hash table locally */ struct list_head se_hash; @@ -91,23 +91,20 @@ static inline int sa_unhashed(struct sa_entry *entry) return list_empty(&entry->se_hash); } -/* - * The entry only can be released by the caller, it is necessary to hold lock. - */ +/* sa_entry is ready to use */ static inline int sa_ready(struct sa_entry *entry) { smp_rmb(); return (entry->se_state != SA_ENTRY_INIT); } +/* hash value to put in sai_cache */ static inline int sa_hash(int val) { return val & LL_SA_CACHE_MASK; } -/* - * Insert entry to hash SA table. - */ +/* hash entry into sai_cache */ static inline void sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry) { @@ -118,9 +115,7 @@ sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry) spin_unlock(&sai->sai_cache_lock[i]); } -/* - * Remove entry from SA table. - */ +/* unhash entry from sai_cache */ static inline void sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry) { @@ -144,11 +139,13 @@ agl_first_entry(struct ll_statahead_info *sai) lli_agl_list); } +/* statahead window is full */ static inline int sa_sent_full(struct ll_statahead_info *sai) { return atomic_read(&sai->sai_cache_count) >= sai->sai_max; } +/* got async stat replies */ static inline int sa_has_callback(struct ll_statahead_info *sai) { return !list_empty(&sai->sai_interim_entries); @@ -172,7 +169,7 @@ static inline int sa_low_hit(struct ll_statahead_info *sai) } /* - * If the given index is behind of statahead window more than + * if the given index is behind of statahead window more than * SA_OMITTED_ENTRY_MAX, then it is old. */ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index) @@ -181,7 +178,7 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index) sai->sai_index); } -/* allocate sa_entry and add it into hash to let scanner process to find it */ +/* allocate sa_entry and hash it to allow scanner process to find it */ static struct sa_entry * sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len) { @@ -210,7 +207,8 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len) entry->se_qstr.len = len; entry->se_qstr.name = dname; - lli = ll_i2info(sai->sai_inode); + lli = ll_i2info(sai->sai_dentry->d_inode); + spin_lock(&lli->lli_sa_lock); INIT_LIST_HEAD(&entry->se_list); sa_rehash(sai, entry); @@ -221,7 +219,7 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len) RETURN(entry); } -/* free sa_entry which should have been unhashed and not in any list */ +/* free sa_entry, which should have been unhashed and not in any list */ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry) { CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n", @@ -235,8 +233,9 @@ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry) atomic_dec(&sai->sai_cache_count); } -/* find sa_entry by name, used by directory scanner, lock is not needed because - * only scanner can remove the entry from hash. +/* + * find sa_entry by name, used by directory scanner, lock is not needed because + * only scanner can remove the entry from cache. */ static struct sa_entry * sa_get(struct ll_statahead_info *sai, const struct qstr *qstr) @@ -253,10 +252,11 @@ sa_get(struct ll_statahead_info *sai, const struct qstr *qstr) return NULL; } +/* unhash and unlink sa_entry, and then free it */ static inline void sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry) { - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode); LASSERT(!sa_unhashed(entry)); LASSERT(!list_empty(&entry->se_list)); @@ -281,7 +281,7 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry) struct sa_entry *tmp, *next; if (entry != NULL && entry->se_state == SA_ENTRY_SUCC) { - struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode); + struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode); sai->sai_hit++; sai->sai_consecutive_miss = 0; @@ -329,12 +329,14 @@ __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret) return (entry->se_index == sai->sai_index_wait); } -/* release resources used in async stat RPC, complete entry information and - * wakeup if necessary */ +/* + * release resources used in async stat RPC, update entry state and wakeup if + * scanner process it waiting on this entry. + */ static void sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret) { - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode); struct md_enqueue_info *minfo = entry->se_minfo; struct ptlrpc_request *req = entry->se_req; bool wakeup; @@ -360,14 +362,12 @@ sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret) wake_up(&sai->sai_waitq); } -/* - * Insert inode into the list of sai_agls. - */ +/* insert inode into the list of sai_agls */ static void ll_agl_add(struct ll_statahead_info *sai, struct inode *inode, int index) { struct ll_inode_info *child = ll_i2info(inode); - struct ll_inode_info *parent = ll_i2info(sai->sai_inode); + struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode); int added = 0; spin_lock(&child->lli_agl_lock); @@ -391,24 +391,20 @@ static void ll_agl_add(struct ll_statahead_info *sai, wake_up(&sai->sai_agl_thread.t_ctl_waitq); } -static struct ll_statahead_info *ll_sai_alloc(void) +/* allocate sai */ +static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry) { struct ll_statahead_info *sai; - int i; + struct ll_inode_info *lli = ll_i2info(dentry->d_inode); + int i; ENTRY; OBD_ALLOC_PTR(sai); if (!sai) RETURN(NULL); + sai->sai_dentry = dget(dentry); atomic_set(&sai->sai_refcount, 1); - - spin_lock(&sai_generation_lock); - sai->sai_generation = ++sai_generation; - if (unlikely(sai_generation == 0)) - sai->sai_generation = ++sai_generation; - spin_unlock(&sai_generation_lock); - sai->sai_max = LL_SA_RPC_MIN; sai->sai_index = 1; init_waitqueue_head(&sai->sai_waitq); @@ -425,9 +421,27 @@ static struct ll_statahead_info *ll_sai_alloc(void) } atomic_set(&sai->sai_cache_count, 0); + spin_lock(&sai_generation_lock); + lli->lli_sa_generation = ++sai_generation; + if (unlikely(sai_generation == 0)) + lli->lli_sa_generation = ++sai_generation; + spin_unlock(&sai_generation_lock); + RETURN(sai); } +/* free sai */ +static inline void ll_sai_free(struct ll_statahead_info *sai) +{ + LASSERT(sai->sai_dentry != NULL); + dput(sai->sai_dentry); + OBD_FREE_PTR(sai); +} + +/* + * take refcount of sai if sai for @dir exists, which means statahead is on for + * this directory. + */ static inline struct ll_statahead_info *ll_sai_get(struct inode *dir) { struct ll_inode_info *lli = ll_i2info(dir); @@ -442,13 +456,17 @@ static inline struct ll_statahead_info *ll_sai_get(struct inode *dir) return sai; } +/* + * put sai refcount after use, if refcount reaches zero, free sai and sa_entries + * attached to it. + */ static void ll_sai_put(struct ll_statahead_info *sai) { - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode); if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) { struct sa_entry *entry, *next; - struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode); + struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode); lli->lli_sai = NULL; spin_unlock(&lli->lli_sa_lock); @@ -465,8 +483,7 @@ static void ll_sai_put(struct ll_statahead_info *sai) LASSERT(atomic_read(&sai->sai_cache_count) == 0); LASSERT(agl_list_empty(sai)); - iput(sai->sai_inode); - OBD_FREE_PTR(sai); + ll_sai_free(sai); atomic_dec(&sbi->ll_sa_running); } } @@ -534,12 +551,14 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) EXIT; } -/* prepare inode for sa entry, add it into agl list, now sa_entry is ready - * to be used by scanner process. */ +/* + * prepare inode for sa entry, add it into agl list, now sa_entry is ready + * to be used by scanner process. + */ static void sa_instantiate(struct ll_statahead_info *sai, struct sa_entry *entry) { - struct inode *dir = sai->sai_inode; + struct inode *dir = sai->sai_dentry->d_inode; struct inode *child; struct md_enqueue_info *minfo; struct lookup_intent *it; @@ -610,12 +629,12 @@ out: sa_make_ready(sai, entry, rc); } -/* once there are async stat replies, instantiate sa_entry */ +/* once there are async stat replies, instantiate sa_entry from replies */ static void sa_handle_callback(struct ll_statahead_info *sai) { struct ll_inode_info *lli; - lli = ll_i2info(sai->sai_inode); + lli = ll_i2info(sai->sai_dentry->d_inode); while (sa_has_callback(sai)) { struct sa_entry *entry; @@ -632,25 +651,13 @@ static void sa_handle_callback(struct ll_statahead_info *sai) sa_instantiate(sai, entry); } - - spin_lock(&lli->lli_agl_lock); - while (!agl_list_empty(sai)) { - struct ll_inode_info *clli; - - clli = agl_first_entry(sai); - list_del_init(&clli->lli_agl_list); - spin_unlock(&lli->lli_agl_lock); - - ll_agl_trigger(&clli->lli_vfs_inode, sai); - - spin_lock(&lli->lli_agl_lock); - } - spin_unlock(&lli->lli_agl_lock); } -/* callback for async stat, because this is called in ptlrpcd context, we only - * put sa_entry in sai_cb_entries list, and let sa_handle_callback() to really - * prepare inode and instantiate sa_entry later. */ +/* + * callback for async stat RPC, because this is called in ptlrpcd context, we + * only put sa_entry in sai_interim_entries, and wake up statahead thread to + * really prepare inode and instantiate sa_entry later. + */ static int ll_statahead_interpret(struct ptlrpc_request *req, struct md_enqueue_info *minfo, int rc) { @@ -710,6 +717,7 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, RETURN(rc); } +/* finish async stat RPC arguments */ static void sa_fini_data(struct md_enqueue_info *minfo, struct ldlm_enqueue_info *einfo) { @@ -721,7 +729,9 @@ static void sa_fini_data(struct md_enqueue_info *minfo, OBD_FREE_PTR(einfo); } -/** +/* + * prepare arguments for async stat RPC. + * * There is race condition between "capa_put" and "ll_statahead_interpret" for * accessing "op_data.op_capa[1,2]" as following: * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling @@ -778,6 +788,7 @@ static int sa_prep_data(struct inode *dir, struct inode *child, return 0; } +/* async stat for file not found in dcache */ static int sa_lookup(struct inode *dir, struct sa_entry *entry) { struct md_enqueue_info *minfo; @@ -802,10 +813,11 @@ static int sa_lookup(struct inode *dir, struct sa_entry *entry) } /** - * similar to ll_revalidate_it(). - * \retval 1 -- dentry valid - * \retval 0 -- will send stat-ahead request - * \retval others -- prepare stat-ahead request failed + * async stat for file found in dcache, similar to .revalidate + * + * \retval 1 dentry valid, no RPC sent + * \retval 0 dentry invalid, will send async stat RPC + * \retval negative number upon error */ static int sa_revalidate(struct inode *dir, struct sa_entry *entry, struct dentry *dentry) @@ -854,6 +866,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry, RETURN(rc); } +/* async stat for file with @name */ static void sa_statahead(struct dentry *parent, const char *name, int len) { struct inode *dir = parent->d_inode; @@ -890,6 +903,7 @@ static void sa_statahead(struct dentry *parent, const char *name, int len) EXIT; } +/* async glimpse (agl) thread main function */ static int ll_agl_thread(void *arg) { struct dentry *parent = (struct dentry *)arg; @@ -961,6 +975,7 @@ static int ll_agl_thread(void *arg) RETURN(0); } +/* start agl thread */ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) { struct ptlrpc_thread *thread = &sai->sai_agl_thread; @@ -987,6 +1002,7 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) EXIT; } +/* statahead thread main function */ static int ll_statahead_thread(void *arg) { struct dentry *parent = (struct dentry *)arg; @@ -994,7 +1010,7 @@ static int ll_statahead_thread(void *arg) struct ll_inode_info *lli = ll_i2info(dir); struct ll_sb_info *sbi = ll_i2sbi(dir); struct ll_statahead_info *sai; - struct ptlrpc_thread *thread; + struct ptlrpc_thread *sa_thread; struct ptlrpc_thread *agl_thread; int first = 0; struct md_op_data *op_data; @@ -1006,9 +1022,9 @@ static int ll_statahead_thread(void *arg) ENTRY; sai = ll_sai_get(dir); - thread = &sai->sai_thread; + sa_thread = &sai->sai_thread; agl_thread = &sai->sai_agl_thread; - thread->t_pid = current_pid(); + sa_thread->t_pid = current_pid(); CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n", sai, parent->d_name.len, parent->d_name.name); @@ -1024,16 +1040,16 @@ static int ll_statahead_thread(void *arg) atomic_inc(&sbi->ll_sa_total); spin_lock(&lli->lli_sa_lock); - if (thread_is_init(thread)) + if (thread_is_init(sa_thread)) /* If someone else has changed the thread state * (e.g. already changed to SVC_STOPPING), we can't just * blindly overwrite that setting. */ - thread_set_flags(thread, SVC_RUNNING); + thread_set_flags(sa_thread, SVC_RUNNING); spin_unlock(&lli->lli_sa_lock); - wake_up(&thread->t_ctl_waitq); + wake_up(&sa_thread->t_ctl_waitq); ll_dir_chain_init(&chain); - while (pos != MDS_DIR_END_OFF && thread_is_running(thread)) { + while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) { struct lu_dirpage *dp; struct lu_dirent *ent; @@ -1051,7 +1067,7 @@ static int ll_statahead_thread(void *arg) dp = page_address(page); for (ent = lu_dirent_start(dp); - ent != NULL && thread_is_running(thread) && + ent != NULL && thread_is_running(sa_thread) && !sa_low_hit(sai); ent = lu_dirent_next(ent)) { __u64 hash; @@ -1101,16 +1117,32 @@ static int ll_statahead_thread(void *arg) /* wait for spare statahead window */ do { - l_wait_event(thread->t_ctl_waitq, + l_wait_event(sa_thread->t_ctl_waitq, !sa_sent_full(sai) || sa_has_callback(sai) || !agl_list_empty(sai) || - !thread_is_running(thread), + !thread_is_running(sa_thread), &lwi); sa_handle_callback(sai); + + spin_lock(&lli->lli_agl_lock); + while (sa_sent_full(sai) && + !agl_list_empty(sai)) { + struct ll_inode_info *clli; + + clli = agl_first_entry(sai); + list_del_init(&clli->lli_agl_list); + spin_unlock(&lli->lli_agl_lock); + + ll_agl_trigger(&clli->lli_vfs_inode, + sai); + + spin_lock(&lli->lli_agl_lock); + } + spin_unlock(&lli->lli_agl_lock); } while (sa_sent_full(sai) && - thread_is_running(thread)); + thread_is_running(sa_thread)); sa_statahead(parent, name, namelen); } @@ -1137,18 +1169,17 @@ static int ll_statahead_thread(void *arg) if (rc < 0) { spin_lock(&lli->lli_sa_lock); - thread_set_flags(thread, SVC_STOPPING); + thread_set_flags(sa_thread, SVC_STOPPING); lli->lli_sa_enabled = 0; spin_unlock(&lli->lli_sa_lock); } /* statahead is finished, but statahead entries need to be cached, wait * for file release to stop me. */ - while (thread_is_running(thread)) { - l_wait_event(thread->t_ctl_waitq, + while (thread_is_running(sa_thread)) { + l_wait_event(sa_thread->t_ctl_waitq, sa_has_callback(sai) || - !agl_list_empty(sai) || - !thread_is_running(thread), + !thread_is_running(sa_thread), &lwi); sa_handle_callback(sai); @@ -1178,7 +1209,7 @@ out: /* in case we're not woken up, timeout wait */ lwi = LWI_TIMEOUT(msecs_to_jiffies(MSEC_PER_SEC >> 3), NULL, NULL); - l_wait_event(thread->t_ctl_waitq, + l_wait_event(sa_thread->t_ctl_waitq, sai->sai_sent == sai->sai_replied, &lwi); } @@ -1186,19 +1217,20 @@ out: sa_handle_callback(sai); spin_lock(&lli->lli_sa_lock); - thread_set_flags(thread, SVC_STOPPED); + thread_set_flags(sa_thread, SVC_STOPPED); spin_unlock(&lli->lli_sa_lock); - wake_up(&sai->sai_waitq); - wake_up(&thread->t_ctl_waitq); - ll_sai_put(sai); CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n", sai, parent->d_name.len, parent->d_name.name); - dput(parent); + + wake_up(&sai->sai_waitq); + wake_up(&sa_thread->t_ctl_waitq); + ll_sai_put(sai); + return rc; } -/* authorize opened dir handle @key to statahead later */ +/* authorize opened dir handle @key to statahead */ void ll_authorize_statahead(struct inode *dir, void *key) { struct ll_inode_info *lli = ll_i2info(dir); @@ -1217,8 +1249,10 @@ void ll_authorize_statahead(struct inode *dir, void *key) spin_unlock(&lli->lli_sa_lock); } -/* deauthorize opened dir handle @key to statahead, but statahead thread may - * still be running, notify it to quit. */ +/* + * deauthorize opened dir handle @key to statahead, and notify statahead thread + * to quit if it's running. + */ void ll_deauthorize_statahead(struct inode *dir, void *key) { struct ll_inode_info *lli = ll_i2info(dir); @@ -1247,20 +1281,21 @@ void ll_deauthorize_statahead(struct inode *dir, void *key) } enum { - /** - * not first dirent, or is "." - */ - LS_NONE_FIRST_DE = 0, - /** - * the first non-hidden dirent - */ - LS_FIRST_DE, - /** - * the first hidden dirent, that is "." - */ - LS_FIRST_DOT_DE + /** + * not first dirent, or is "." + */ + LS_NOT_FIRST_DE = 0, + /** + * the first non-hidden dirent + */ + LS_FIRST_DE, + /** + * the first hidden dirent, that is "." + */ + LS_FIRST_DOT_DE }; +/* file is first dirent under @dir */ static int is_first_dirent(struct inode *dir, struct dentry *dentry) { struct ll_dir_chain chain; @@ -1268,7 +1303,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) struct md_op_data *op_data; int dot_de; struct page *page = NULL; - int rc = LS_NONE_FIRST_DE; + int rc = LS_NOT_FIRST_DE; __u64 pos = 0; ENTRY; @@ -1347,7 +1382,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) if (target->len != namelen || memcmp(target->name, name, namelen) != 0) - rc = LS_NONE_FIRST_DE; + rc = LS_NOT_FIRST_DE; else if (!dot_de) rc = LS_FIRST_DE; else @@ -1380,13 +1415,27 @@ out: return rc; } +/** + * revalidate @dentryp from statahead cache + * + * \param[in] dir parent directory + * \param[in] sai sai structure + * \param[out] dentryp pointer to dentry which will be revalidated + * \param[in] unplug unplug statahead window only (normally for negative + * dentry) + * \retval 1 on success, dentry is saved in @dentryp + * \retval 0 if revalidation failed (no proper lock on client) + * \retval negative number upon error + */ static int revalidate_statahead_dentry(struct inode *dir, struct ll_statahead_info *sai, struct dentry **dentryp, - int only_unplug) + bool unplug) { struct sa_entry *entry = NULL; struct l_wait_info lwi = { 0 }; + struct ll_dentry_data *ldd; + struct ll_inode_info *lli; int rc = 0; ENTRY; @@ -1417,11 +1466,12 @@ static int revalidate_statahead_dentry(struct inode *dir, } } + if (unplug) + GOTO(out, rc = 1); + entry = sa_get(sai, &(*dentryp)->d_name); - if (entry == NULL || only_unplug) { - sa_put(sai, entry); - RETURN(entry ? 1 : -EAGAIN); - } + if (entry == NULL) + GOTO(out, rc = -EAGAIN); /* if statahead is busy in readdir, help it do post-work */ if (!sa_ready(entry) && sai->sai_in_readpage) @@ -1436,8 +1486,12 @@ static int revalidate_statahead_dentry(struct inode *dir, thread_is_stopped(&sai->sai_thread), &lwi); if (rc < 0) { - sa_put(sai, entry); - RETURN(-EAGAIN); + /* + * entry may not be ready, so it may be used by inflight + * statahead RPC, don't free it. + */ + entry = NULL; + GOTO(out, rc = -EAGAIN); } } @@ -1455,11 +1509,12 @@ static int revalidate_statahead_dentry(struct inode *dir, struct dentry *alias; alias = ll_splice_alias(inode, *dentryp); - if (IS_ERR(alias)) { - sa_put(sai, entry); - RETURN(PTR_ERR(alias)); - } + if (IS_ERR(alias)) + GOTO(out, rc = PTR_ERR(alias)); *dentryp = alias; + /* statahead prepared this inode, transfer inode + * refcount from sa_entry to dentry */ + entry->se_inode = NULL; } else if ((*dentryp)->d_inode != inode) { /* revalidate, but inode is recreated */ CDEBUG(D_READA, @@ -1472,12 +1527,8 @@ static int revalidate_statahead_dentry(struct inode *dir, (*dentryp)->d_name.name, PFID(ll_inode2fid((*dentryp)->d_inode)), PFID(ll_inode2fid(inode))); - sa_put(sai, entry); - RETURN(-ESTALE); - } else { - iput(inode); + GOTO(out, rc = -ESTALE); } - entry->se_inode = NULL; if ((bits & MDS_INODELOCK_LOOKUP) && d_lustre_invalid(*dentryp)) @@ -1485,16 +1536,41 @@ static int revalidate_statahead_dentry(struct inode *dir, ll_intent_release(&it); } } - +out: + /* + * statahead cached sa_entry can be used only once, and will be killed + * right after use, so if lookup/revalidate accessed statahead cache, + * set dentry ldd_sa_generation to parent lli_sa_generation, later if we + * stat this file again, we know we've done statahead before, see + * dentry_may_statahead(). + */ + ldd = ll_d2d(*dentryp); + lli = ll_i2info(dir); + /* ldd can be NULL if llite lookup failed. */ + if (ldd != NULL) + ldd->lld_sa_generation = lli->lli_sa_generation; sa_put(sai, entry); + RETURN(rc); } +/** + * start statahead thread + * + * \param[in] dir parent directory + * \param[in] dentry dentry that triggers statahead, normally the first + * dirent under @dir + * \retval -EAGAIN on success, because when this function is + * called, it's already in lookup call, so client should + * do it itself instead of waiting for statahead thread + * to do it asynchronously. + * \retval negative number upon error + */ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) { struct ll_inode_info *lli = ll_i2info(dir); struct ll_statahead_info *sai = NULL; - struct dentry *parent; + struct dentry *parent = dentry->d_parent; struct ptlrpc_thread *thread; struct l_wait_info lwi = { 0 }; struct task_struct *task; @@ -1503,72 +1579,41 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */ rc = is_first_dirent(dir, dentry); - if (rc == LS_NONE_FIRST_DE) + if (rc == LS_NOT_FIRST_DE) /* It is not "ls -{a}l" operation, no need statahead for it. */ - GOTO(out, rc = -EAGAIN); + GOTO(out, rc = -EFAULT); - sai = ll_sai_alloc(); + sai = ll_sai_alloc(parent); if (sai == NULL) GOTO(out, rc = -ENOMEM); sai->sai_ls_all = (rc == LS_FIRST_DOT_DE); - sai->sai_inode = igrab(dir); - if (unlikely(sai->sai_inode == NULL)) { - CWARN("Do not start stat ahead on dying inode "DFID"\n", - PFID(&lli->lli_fid)); - GOTO(out, rc = -ESTALE); - } - - /* get parent reference count here, and put it in ll_statahead_thread */ - parent = dget(dentry->d_parent); - if (unlikely(sai->sai_inode != parent->d_inode)) { - struct ll_inode_info *nlli = ll_i2info(parent->d_inode); - - CWARN("Race condition, someone changed %.*s just now: " - "old parent "DFID", new parent "DFID"\n", - dentry->d_name.len, dentry->d_name.name, - PFID(&lli->lli_fid), PFID(&nlli->lli_fid)); - dput(parent); - iput(sai->sai_inode); - GOTO(out, rc = -EAGAIN); - } - CDEBUG(D_READA, "start statahead thread: sai %p, parent %.*s\n", - sai, parent->d_name.len, parent->d_name.name); - - /* if another process started statahead thread, or deauthorized current - * lli_opendir_key, don't start statahead. */ + /* if current lli_opendir_key was deauthorized, or dir re-opened by + * another process, don't start statahead, otherwise the newly spawned + * statahead thread won't be notified to quit. */ spin_lock(&lli->lli_sa_lock); if (unlikely(lli->lli_sai != NULL || lli->lli_opendir_key == NULL || lli->lli_opendir_pid != current->pid)) { spin_unlock(&lli->lli_sa_lock); - - dput(parent); - iput(sai->sai_inode); - GOTO(out, rc = -EAGAIN); + GOTO(out, rc = -EPERM); } lli->lli_sai = sai; spin_unlock(&lli->lli_sa_lock); atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running); + CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n", + current_pid(), parent->d_name.len, parent->d_name.name); + task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u", lli->lli_opendir_pid); thread = &sai->sai_thread; if (IS_ERR(task)) { rc = PTR_ERR(task); - CERROR("cannot start ll_sa thread: rc = %d\n", rc); - dput(parent); - - spin_lock(&lli->lli_sa_lock); - thread_set_flags(thread, SVC_STOPPED); - thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); - spin_unlock(&lli->lli_sa_lock); - - ll_sai_put(sai); - LASSERT(lli->lli_sai == NULL); - RETURN(-EAGAIN); + CERROR("can't start ll_sa thread, rc: %d\n", rc); + GOTO(out, rc); } l_wait_event(thread->t_ctl_waitq, @@ -1583,29 +1628,35 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) RETURN(-EAGAIN); out: - if (sai != NULL) - OBD_FREE_PTR(sai); - - /* once we start statahead thread failed, disable statahead so - * subsequent won't waste time to try it. */ + /* once we start statahead thread failed, disable statahead so that + * subsequent stat won't waste time to try it. */ spin_lock(&lli->lli_sa_lock); lli->lli_sa_enabled = 0; + lli->lli_sai = NULL; spin_unlock(&lli->lli_sa_lock); + if (sai != NULL) + ll_sai_free(sai); + RETURN(rc); } /** - * Start statahead thread if this is the first dir entry. - * Otherwise if a thread is started already, wait it until it is ahead of me. - * \retval 1 -- find entry with lock in cache, the caller needs to do - * nothing. - * \retval 0 -- find entry in cache, but without lock, the caller needs - * refresh from MDS. - * \retval others -- the caller need to process as non-statahead. + * statahead entry function, this is called when client getattr on a file, it + * will start statahead thread if this is the first dir entry, else revalidate + * dentry from statahead cache. + * + * \param[in] dir parent directory + * \param[out] dentryp dentry to getattr + * \param[in] unplug unplug statahead window only (normally for negative + * dentry) + * \retval 1 on success + * \retval 0 revalidation from statahead cache failed, caller needs + * to getattr from server directly + * \retval negative number on error, caller often ignores this and + * then getattr from server */ -int do_statahead_enter(struct inode *dir, struct dentry **dentryp, - int only_unplug) +int ll_statahead(struct inode *dir, struct dentry **dentryp, bool unplug) { struct ll_statahead_info *sai; @@ -1613,13 +1664,11 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, if (sai != NULL) { int rc; - rc = revalidate_statahead_dentry(dir, sai, dentryp, - only_unplug); + rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug); CDEBUG(D_READA, "revalidate statahead %.*s: %d.\n", (*dentryp)->d_name.len, (*dentryp)->d_name.name, rc); ll_sai_put(sai); return rc; } - return start_statahead_thread(dir, *dentryp); } diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 9ce58a9..09cfac5 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -1539,7 +1539,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, rp_param.rp_hash64), mdc_read_page_remote, &rp_param); if (IS_ERR(page)) { - CERROR("%s: read cache page: "DFID" at "LPU64": rc %ld\n", + CDEBUG(D_INFO, "%s: read cache page: "DFID" at "LPU64": %ld\n", exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rp_param.rp_off, PTR_ERR(page)); GOTO(out_unlock, rc = PTR_ERR(page)); -- 1.8.3.1