From: Fan Yong
Date: Tue, 8 Nov 2011 02:28:54 +0000 (+0800)
Subject: ORNL-7 replace statahead hacking with its own cache
X-Git-Tag: 2.1.52~7
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=51074e76259ac428a7e7a7ac3f189971bf20122f

ORNL-7 replace statahead hacking with its own cache

The original statahead code hacks the dcache: it inserts dentries into
the dcache and associates them with inodes without holding the parent's
lock, which breaks the VFS-layer synchronization mechanism.

The new statahead does not build dentries. It only pre-fetches inode
attributes and the related ldlm locks, and caches them in a small
statahead cache attached to the parent's inode. The statahead sponsor
can search this small cache, much as it would search the dcache, to
find what it wants. On a cache hit, the statahead sponsor itself builds
the dentry, associates it with the inode, and inserts it into the
dcache.

Signed-off-by: Fan Yong
Change-Id: I6948f8a438a938c51563468d775e676e4185e580
Reviewed-on: http://review.whamcloud.com/1208
Tested-by: Hudson
Tested-by: Maloo
Reviewed-by: Andreas Dilger
Reviewed-by: Lai Siyao
Reviewed-by: Oleg Drokin
--- diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index c70e879..b54e656 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -1084,6 +1084,8 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, const struct ldlm_res_id *, ldlm_type_t type, ldlm_policy_data_t *, ldlm_mode_t mode, struct lustre_handle *, int unref); +ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh, + __u64 *bits); struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, __u32 *flags); void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 4d23a25..45e473f 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1252,11 +1252,10 @@ struct md_enqueue_info { struct md_op_data mi_data; struct lookup_intent mi_it; struct lustre_handle mi_lockh; - struct dentry *mi_dentry; struct inode *mi_dir; md_enqueue_cb_t mi_cb; + __u64 mi_cbdata; unsigned int mi_generation; - void *mi_cbdata; }; struct obd_ops { @@ -1565,7 +1564,7 @@ struct md_ops { struct ldlm_enqueue_info *); int (*m_revalidate_lock)(struct obd_export *, struct lookup_intent *, - struct lu_fid *); + struct lu_fid *, __u64 *bits); /* * NOTE: If adding ops, add another LPROCFS_MD_OP_INIT() line to diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 29c543c..fca1291 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -2243,13 +2243,13 @@ static inline int md_intent_getattr_async(struct obd_export *exp, static inline int md_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, - struct lu_fid *fid) + struct lu_fid *fid, __u64 *bits) { int rc; ENTRY; EXP_CHECK_MD_OP(exp, revalidate_lock); EXP_MD_COUNTER_INCREMENT(exp, revalidate_lock); - rc = MDP(exp->exp_obd, revalidate_lock)(exp, it, fid); + rc = MDP(exp->exp_obd, revalidate_lock)(exp, it, fid, bits); RETURN(rc); } diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index f9184f3..12ff5ce 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -46,6 +46,12 @@ extern cfs_list_t ldlm_srv_namespace_list; extern cfs_semaphore_t ldlm_cli_namespace_lock; extern cfs_list_t ldlm_cli_namespace_list; +static inline int ldlm_res_eq(const struct ldlm_res_id *res0, + const struct ldlm_res_id *res1) +{ + return !memcmp(res0, res1, sizeof(*res0)); +} + static inline 
cfs_atomic_t *ldlm_namespace_nr(ldlm_side_t client) { return client == LDLM_NAMESPACE_SERVER ? diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 6a4d7fd..9378385 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1221,6 +1221,40 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, return rc ? mode : 0; } +ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh, + __u64 *bits) +{ + struct ldlm_lock *lock; + ldlm_mode_t mode = 0; + ENTRY; + + lock = ldlm_handle2lock(lockh); + if (lock != NULL) { + lock_res_and_lock(lock); + if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) + GOTO(out, mode); + + if (lock->l_flags & LDLM_FL_CBPENDING && + lock->l_readers == 0 && lock->l_writers == 0) + GOTO(out, mode); + + if (bits) + *bits = lock->l_policy_data.l_inodebits.bits; + mode = lock->l_granted_mode; + ldlm_lock_addref_internal_nolock(lock, mode); + } + + EXIT; + +out: + if (lock != NULL) { + unlock_res_and_lock(lock); + LDLM_LOCK_PUT(lock); + } + return mode; +} +EXPORT_SYMBOL(ldlm_revalidate_lock_handle); + /* Returns a referenced lock */ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *res_id, diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index eb9cb22..99e77dd 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -427,12 +427,6 @@ static void *ldlm_res_hop_key(cfs_hlist_node_t *hnode) return &res->lr_name; } -static int ldlm_res_eq(const struct ldlm_res_id *res0, - const struct ldlm_res_id *res1) -{ - return !memcmp(res0, res1, sizeof(*res0)); -} - static int ldlm_res_hop_keycmp(const void *key, cfs_hlist_node_t *hnode) { struct ldlm_resource *res; diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index cb24e66..42c1bd4 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -62,10 +62,9 @@ static void ll_release(struct dentry *de) ENTRY; LASSERT(de != NULL); lld = ll_d2d(de); - if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */ - EXIT; - return; - } + if (lld == NULL) /* NFS copies the de->d_op methods (bug 4655) */ + RETURN_EXIT; + if (lld->lld_it) { ll_intent_release(lld->lld_it); OBD_FREE(lld->lld_it, sizeof(*lld->lld_it)); @@ -404,7 +403,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, struct lookup_intent lookup_it = { .it_op = IT_LOOKUP }; struct obd_export *exp; struct inode *parent = de->d_parent->d_inode; - int rc, first = 0; + int rc; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name, @@ -449,13 +448,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, LASSERT(it); if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID)) - GOTO(out_sa, rc = 1); - - op_data = ll_prep_md_op_data(NULL, parent, de->d_inode, - de->d_name.name, de->d_name.len, - 0, LUSTRE_OPC_ANY, NULL); - if (IS_ERR(op_data)) - RETURN(PTR_ERR(op_data)); + RETURN(1); if ((it->it_op == IT_OPEN) && de->d_inode) { struct inode *inode = de->d_inode; @@ -501,17 +494,25 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, if it would be, we'll reopen the open request to MDS later during file open path */ cfs_up(&lli->lli_och_sem); - ll_finish_md_op_data(op_data); RETURN(1); } else { cfs_up(&lli->lli_och_sem); } } - if (it->it_op == IT_GETATTR) - first = ll_statahead_enter(parent, &de, 0); + if (it->it_op == IT_GETATTR) { + rc = ll_statahead_enter(parent, &de, 0); + if (rc == 1) + goto mark; + } do_lock: + op_data = ll_prep_md_op_data(NULL, parent, de->d_inode, + 
de->d_name.name, de->d_name.len, + 0, LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) + RETURN(PTR_ERR(op_data)); + it->it_create_mode &= ~cfs_curproc_umask(); it->it_create_mode |= M_CHECK_STALE; rc = md_intent_lock(exp, op_data, NULL, 0, it, @@ -519,13 +520,6 @@ do_lock: &req, ll_md_blocking_ast, 0); it->it_create_mode &= ~M_CHECK_STALE; ll_finish_md_op_data(op_data); - if (it->it_op == IT_GETATTR && !first) - /* If there are too many locks on client-side, then some - * locks taken by statahead maybe dropped automatically - * before the real "revalidate" using them. */ - ll_statahead_exit(parent, de, req == NULL ? rc : 0); - else if (first == -EEXIST) - ll_statahead_mark(parent, de); /* If req is NULL, then md_intent_lock only tried to do a lock match; * if all was well, it will return 1 if it found locks, 0 otherwise. */ @@ -560,14 +554,7 @@ revalidate_finish: /* unfortunately ll_intent_lock may cause a callback and revoke our * dentry */ - cfs_spin_lock(&ll_lookup_lock); - spin_lock(&dcache_lock); - lock_dentry(de); - __d_drop(de); - unlock_dentry(de); - d_rehash_cond(de, 0); - spin_unlock(&dcache_lock); - cfs_spin_unlock(&ll_lookup_lock); + ll_dentry_rehash(de, 0); out: /* We do not free request as it may be reused during following lookup @@ -588,14 +575,13 @@ out: de->d_name.name, de, de->d_parent, de->d_inode, atomic_read(&de->d_count)); ll_set_lock_data(exp, de->d_inode, it, &bits); - if (de->d_flags & DCACHE_LUSTRE_INVALID && - bits & MDS_INODELOCK_LOOKUP) { - lock_dentry(de); - de->d_flags &= ~DCACHE_LUSTRE_INVALID; - unlock_dentry(de); - } + ll_dentry_reset_flags(de, bits); ll_lookup_finish_locks(it, de); } + +mark: + if (it != NULL && it->it_op == IT_GETATTR && rc > 0) + ll_statahead_mark(parent, de); RETURN(rc); /* @@ -656,15 +642,9 @@ out_sa: * For rc == 1 case, should not return directly to prevent losing * statahead windows; for rc == 0 case, the "lookup" will be done later. 
*/ - if (it && it->it_op == IT_GETATTR && rc == 1) { - first = ll_statahead_enter(parent, &de, 0); - if (first >= 0) - ll_statahead_exit(parent, de, 1); - else if (first == -EEXIST) - ll_statahead_mark(parent, de); - } - - return rc; + if (it != NULL && it->it_op == IT_GETATTR && rc == 1) + ll_statahead_enter(parent, &de, 1); + goto mark; } int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd) diff --git a/lustre/llite/file.c b/lustre/llite/file.c index cb55e3f..e5a417a 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -519,8 +519,8 @@ int ll_file_open(struct inode *inode, struct file *file) fd->fd_file = file; if (S_ISDIR(inode->i_mode)) { cfs_spin_lock(&lli->lli_sa_lock); - if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) { - LASSERT(lli->lli_sai == NULL); + if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0 && + lli->lli_sai == NULL) { lli->lli_opendir_key = fd; lli->lli_opendir_pid = cfs_curproc_pid(); opendir_set = 1; diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index fd2e04a..a7b314b 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -614,6 +614,7 @@ struct lookup_intent *ll_convert_intent(struct open_intent *oit, int lookup_flags); int ll_lookup_it_finish(struct ptlrpc_request *request, struct lookup_intent *it, void *data); +struct dentry *ll_find_alias(struct inode *inode, struct dentry *de); /* llite/rw.c */ int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to); @@ -1107,25 +1108,29 @@ void et_fini(struct eacl_table *et); /* statahead.c */ -#define LL_SA_RPC_MIN 2 -#define LL_SA_RPC_DEF 32 -#define LL_SA_RPC_MAX 8192 +#define LL_SA_RPC_MIN 2 +#define LL_SA_RPC_DEF 32 +#define LL_SA_RPC_MAX 8192 + +#define LL_SA_CACHE_BIT 5 +#define LL_SA_CACHE_SIZE (1 << LL_SA_CACHE_BIT) +#define LL_SA_CACHE_MASK (LL_SA_CACHE_SIZE - 1) /* per inode struct, for dir only */ struct ll_statahead_info { struct inode *sai_inode; - unsigned int sai_generation; /* generation for statahead */ cfs_atomic_t sai_refcount; /* when access this struct, hold * refcount */ - unsigned int sai_sent; /* stat requests sent count */ - unsigned int sai_replied; /* stat requests which received - * reply */ + unsigned int sai_generation; /* generation for statahead */ unsigned int sai_max; /* max ahead of lookup */ - unsigned int sai_index; /* index of statahead entry */ - unsigned int sai_index_next; /* index for the next statahead - * entry to be stated */ - unsigned int sai_hit; /* hit count */ - unsigned int sai_miss; /* miss count: + __u64 sai_sent; /* stat requests sent count */ + __u64 sai_replied; /* stat requests which received + * reply */ + __u64 sai_index; /* index of statahead entry */ + __u64 sai_index_wait; /* index of entry which is the + * caller is waiting for */ + __u64 sai_hit; /* hit count */ + __u64 sai_miss; /* miss count: * for "ls -al" case, it includes * hidden dentry miss; * for "ls -l" case, it does not @@ -1137,48 +1142,43 @@ struct ll_statahead_info { unsigned int sai_miss_hidden;/* "ls -al", but first dentry * is not a hidden one */ unsigned int sai_skip_hidden;/* skipped hidden dentry count */ - unsigned int sai_ls_all:1; /* "ls -al", do stat-ahead for + unsigned int sai_ls_all:1, /* "ls -al", do stat-ahead for * hidden entries */ + sai_in_readpage:1;/* statahead is in readdir()*/ cfs_waitq_t sai_waitq; /* stat-ahead wait queue */ struct ptlrpc_thread sai_thread; /* stat-ahead thread */ cfs_list_t sai_entries_sent; /* entries sent out */ cfs_list_t 
sai_entries_received; /* entries returned */ cfs_list_t sai_entries_stated; /* entries stated */ + cfs_list_t sai_cache[LL_SA_CACHE_SIZE]; + cfs_spinlock_t sai_cache_lock[LL_SA_CACHE_SIZE]; + cfs_atomic_t sai_cache_count; /* entry count in cache */ }; -int do_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup); -void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result); +int do_statahead_enter(struct inode *dir, struct dentry **dentry, + int only_unplug); void ll_stop_statahead(struct inode *dir, void *key); -static inline -void ll_statahead_mark(struct inode *dir, struct dentry *dentry) +static inline void +ll_statahead_mark(struct inode *dir, struct dentry *dentry) { - struct ll_inode_info *lli; - struct ll_dentry_data *ldd = ll_d2d(dentry); - - /* dentry has been move to other directory, no need mark */ - if (unlikely(dir != dentry->d_parent->d_inode)) - return; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = lli->lli_sai; + struct ll_dentry_data *ldd = ll_d2d(dentry); - lli = ll_i2info(dir); /* not the same process, don't mark */ if (lli->lli_opendir_pid != cfs_curproc_pid()) return; - cfs_spin_lock(&lli->lli_sa_lock); - if (likely(lli->lli_sai != NULL && ldd != NULL)) - ldd->lld_sa_generation = lli->lli_sai->sai_generation; - cfs_spin_unlock(&lli->lli_sa_lock); + if (sai != NULL && ldd != NULL) + ldd->lld_sa_generation = sai->sai_generation; } -static inline -int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) +static inline int +ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int only_unplug) { struct ll_inode_info *lli; - struct ll_dentry_data *ldd = ll_d2d(*dentryp); - - if (unlikely(dir == NULL)) - return -EAGAIN; + struct ll_dentry_data *ldd; if (ll_i2sbi(dir)->ll_sa_max == 0) return -ENOTSUPP; @@ -1188,11 +1188,12 @@ int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) if (lli->lli_opendir_pid != cfs_curproc_pid()) return -EAGAIN; + ldd = ll_d2d(*dentryp); /* - * When "ls" a dentry, the system trigger more than once "revalidate" or - * "lookup", for "getattr", for "getxattr", and maybe for others. + * When stats a dentry, the system trigger more than once "revalidate" + * or "lookup", for "getattr", for "getxattr", and maybe for others. * Under patchless client mode, the operation intent is not accurate, - * it maybe misguide the statahead thread. For example: + * which maybe misguide the statahead thread. For example: * The "revalidate" call for "getattr" and "getxattr" of a dentry maybe * have the same operation intent -- "IT_GETATTR". 
* In fact, one dentry should has only one chance to interact with the @@ -1207,7 +1208,7 @@ int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) ldd->lld_sa_generation == lli->lli_sai->sai_generation) return -EAGAIN; - return do_statahead_enter(dir, dentryp, lookup); + return do_statahead_enter(dir, dentryp, only_unplug); } /* llite ioctl register support rountine */ @@ -1354,6 +1355,32 @@ static inline void ll_set_lock_data(struct obd_export *exp, struct inode *inode, *bits = it->d.lustre.it_lock_bits; } +static inline void ll_dentry_rehash(struct dentry *dentry, int locked) +{ + if (!locked) { + cfs_spin_lock(&ll_lookup_lock); + spin_lock(&dcache_lock); + } + lock_dentry(dentry); + __d_drop(dentry); + unlock_dentry(dentry); + d_rehash_cond(dentry, 0); + if (!locked) { + spin_unlock(&dcache_lock); + cfs_spin_unlock(&ll_lookup_lock); + } +} + +static inline void ll_dentry_reset_flags(struct dentry *dentry, __u64 bits) +{ + if (bits & MDS_INODELOCK_LOOKUP && + dentry->d_flags & DCACHE_LUSTRE_INVALID) { + lock_dentry(dentry); + dentry->d_flags &= ~DCACHE_LUSTRE_INVALID; + unlock_dentry(dentry); + } +} + #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0) /* Compatibility for old (1.8) compiled userspace quota code */ struct if_quotactl_18 { diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 9fd4b1a..883f181 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -496,7 +496,6 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, CERROR("md_getattr failed for root: rc = %d\n", err); GOTO(out_lock_cn_cb, err); } - memset(&lmd, 0, sizeof(lmd)); err = md_get_lustre_md(sbi->ll_md_exp, request, sbi->ll_dt_exp, sbi->ll_md_exp, &lmd); if (err) { @@ -2028,8 +2027,6 @@ int ll_prep_inode(struct inode **inode, LASSERT(*inode || sb); sbi = sb ? ll_s2sbi(sb) : ll_i2sbi(*inode); - memset(&md, 0, sizeof(struct lustre_md)); - rc = md_get_lustre_md(sbi->ll_md_exp, req, sbi->ll_dt_exp, sbi->ll_md_exp, &md); if (rc) diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 26c5e6ee..dbf6ee0 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -348,7 +348,7 @@ static void ll_d_add(struct dentry *de, struct inode *inode) * in ll_revalidate_it. After revaliadate inode will be have hashed aliases * and it triggers BUG_ON in d_instantiate_unique (bug #10954). 
*/ -static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) +struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) { struct list_head *tmp; struct dentry *dentry; @@ -385,11 +385,8 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) continue; dget_locked(dentry); - lock_dentry(dentry); - __d_drop(dentry); - unlock_dentry(dentry); ll_dops_init(dentry, 0, 1); - d_rehash_cond(dentry, 0); /* avoid taking dcache_lock inside */ + ll_dentry_rehash(dentry, 1); spin_unlock(&dcache_lock); cfs_spin_unlock(&ll_lookup_lock); iput(inode); @@ -466,12 +463,7 @@ int ll_lookup_it_finish(struct ptlrpc_request *request, ll_dops_init(*de, 1, 1); } /* we have lookup look - unhide dentry */ - if ((*de)->d_flags & DCACHE_LUSTRE_INVALID && - bits & MDS_INODELOCK_LOOKUP) { - lock_dentry(*de); - (*de)->d_flags &= ~DCACHE_LUSTRE_INVALID; - unlock_dentry(*de); - } + ll_dentry_reset_flags(*de, bits); } else { __u64 ibits; @@ -514,7 +506,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, struct md_op_data *op_data; struct it_cb_data icbd; __u32 opc; - int rc, first = 0; + int rc; ENTRY; if (dentry->d_name.len > ll_i2sbi(parent)->ll_namelen) @@ -540,11 +532,11 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, } if (it->it_op == IT_GETATTR) { - first = ll_statahead_enter(parent, &dentry, 1); - if (first >= 0) { - ll_statahead_exit(parent, dentry, first); - if (first == 1) - RETURN(retval = dentry); + rc = ll_statahead_enter(parent, &dentry, 0); + if (rc == 1) { + if (dentry == save) + GOTO(out, retval = NULL); + GOTO(out, retval = dentry); } } @@ -577,9 +569,6 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, GOTO(out, retval = ERR_PTR(rc)); } - if (first == -EEXIST) - ll_statahead_mark(parent, dentry); - if ((it->it_op & IT_OPEN) && dentry->d_inode && !S_ISREG(dentry->d_inode->i_mode) && !S_ISDIR(dentry->d_inode->i_mode)) { @@ -594,6 +583,8 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, out: if (req) ptlrpc_req_finished(req); + if (it->it_op == IT_GETATTR && (retval == NULL || retval == dentry)) + ll_statahead_mark(parent, dentry); return retval; } diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 0d83b1e..c3fa164 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -52,40 +52,92 @@ #include #include "llite_internal.h" -struct ll_sai_entry { +#define SA_OMITTED_ENTRY_MAX 8ULL + +typedef enum { + /** negative values are for error cases */ + SA_ENTRY_INIT = 0, /** init entry */ + SA_ENTRY_SUCC = 1, /** stat succeed */ + SA_ENTRY_INVA = 2, /** invalid entry */ + SA_ENTRY_DEST = 3, /** entry to be destroyed */ +} se_stat_t; + +struct ll_sa_entry { + /* link into sai->sai_entries_{sent,received,stated} */ cfs_list_t se_list; - unsigned int se_index; - int se_stat; - struct ptlrpc_request *se_req; + /* link into sai hash table locally */ + cfs_list_t se_hash; + /* entry reference count */ + cfs_atomic_t se_refcount; + /* entry index in the sai */ + __u64 se_index; + /* low layer ldlm lock handle */ + __u64 se_handle; + /* entry status */ + se_stat_t se_stat; + /* entry size, contains name */ + int se_size; + /* pointer to async getattr enqueue info */ struct md_enqueue_info *se_minfo; -}; - -enum { - SA_ENTRY_UNSTATED = 0, - SA_ENTRY_STATED + /* pointer to the async getattr request */ + struct ptlrpc_request *se_req; + /* pointer to the target inode */ + struct inode *se_inode; + /* entry name 
*/ + struct qstr se_qstr; }; static unsigned int sai_generation = 0; static cfs_spinlock_t sai_generation_lock = CFS_SPIN_LOCK_UNLOCKED; -/** - * Check whether first entry was stated already or not. - * No need to hold lli_sa_lock, for: - * (1) it is me that remove entry from the list - * (2) the statahead thread only add new entry to the list +static inline int ll_sa_entry_unlinked(struct ll_sa_entry *entry) +{ + return cfs_list_empty(&entry->se_list); +} + +static inline int ll_sa_entry_unhashed(struct ll_sa_entry *entry) +{ + return cfs_list_empty(&entry->se_hash); +} + +/* + * The entry only can be released by the caller, it is necessary to hold lock. */ -static int ll_sai_entry_stated(struct ll_statahead_info *sai) +static inline int ll_sa_entry_stated(struct ll_sa_entry *entry) { - struct ll_sai_entry *entry; - int rc = 0; - - if (!cfs_list_empty(&sai->sai_entries_stated)) { - entry = cfs_list_entry(sai->sai_entries_stated.next, - struct ll_sai_entry, se_list); - if (entry->se_index == sai->sai_index_next) - rc = 1; - } - return rc; + smp_rmb(); + return (entry->se_stat != SA_ENTRY_INIT); +} + +static inline int ll_sa_entry_hash(int val) +{ + return val & LL_SA_CACHE_MASK; +} + +/* + * Insert entry to hash SA table. + */ +static inline void +ll_sa_entry_enhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) +{ + int i = ll_sa_entry_hash(entry->se_qstr.hash); + + cfs_spin_lock(&sai->sai_cache_lock[i]); + cfs_list_add_tail(&entry->se_hash, &sai->sai_cache[i]); + cfs_spin_unlock(&sai->sai_cache_lock[i]); +} + +/* + * Remove entry from SA table. + */ +static inline void +ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) +{ + int i = ll_sa_entry_hash(entry->se_qstr.hash); + + cfs_spin_lock(&sai->sai_cache_lock[i]); + cfs_list_del_init(&entry->se_hash); + cfs_spin_unlock(&sai->sai_cache_lock[i]); } static inline int sa_received_empty(struct ll_statahead_info *sai) @@ -95,7 +147,7 @@ static inline int sa_received_empty(struct ll_statahead_info *sai) static inline int sa_not_full(struct ll_statahead_info *sai) { - return !!(sai->sai_index < sai->sai_index_next + sai->sai_max); + return (cfs_atomic_read(&sai->sai_cache_count) < sai->sai_max); } static inline int sa_is_running(struct ll_statahead_info *sai) @@ -117,6 +169,7 @@ static inline int sa_is_stopped(struct ll_statahead_info *sai) * (1) hit ratio less than 80% * or * (2) consecutive miss more than 8 + * then means low hit. */ static inline int sa_low_hit(struct ll_statahead_info *sai) { @@ -124,65 +177,295 @@ static inline int sa_low_hit(struct ll_statahead_info *sai) (sai->sai_consecutive_miss > 8)); } -/** - * process the deleted entry's member and free the entry. - * (1) release intent - * (2) free md_enqueue_info - * (3) drop dentry's ref count - * (4) release request's ref count +/* + * If the given index is behind of statahead window more than + * SA_OMITTED_ENTRY_MAX, then it is old. + */ +static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index) +{ + return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX < + sai->sai_index); +} + +/* + * Insert it into sai_entries_sent tail when init. 
+ */ +static struct ll_sa_entry * +ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, + const char *name, int len) +{ + struct ll_inode_info *lli; + struct ll_sa_entry *entry; + int entry_size; + char *dname; + ENTRY; + + entry_size = sizeof(struct ll_sa_entry) + (len & ~3) + 4; + OBD_ALLOC(entry, entry_size); + if (unlikely(entry == NULL)) + RETURN(ERR_PTR(-ENOMEM)); + + CDEBUG(D_READA, "alloc sai entry %.*s(%p) index "LPU64"\n", + len, name, entry, index); + + entry->se_index = index; + + /* + * Statahead entry reference rules: + * + * 1) When statahead entry is initialized, its reference is set as 2. + * One reference is used by the directory scanner. When the scanner + * searches the statahead cache for the given name, it can perform + * lockless hash lookup (only the scanner can remove entry from hash + * list), and once found, it needn't to call "atomic_inc()" for the + * entry reference. So the performance is improved. After using the + * statahead entry, the scanner will call "atomic_dec()" to drop the + * reference held when initialization. If it is the last reference, + * the statahead entry will be freed. + * + * 2) All other threads, including statahead thread and ptlrpcd thread, + * when they process the statahead entry, the reference for target + * should be held to guarantee the entry will not be released by the + * directory scanner. After processing the entry, these threads will + * drop the entry reference. If it is the last reference, the entry + * will be freed. + * + * The second reference when initializes the statahead entry is used + * by the statahead thread, following the rule 2). + */ + cfs_atomic_set(&entry->se_refcount, 2); + entry->se_stat = SA_ENTRY_INIT; + entry->se_size = entry_size; + dname = (char *)entry + sizeof(struct ll_sa_entry); + memcpy(dname, name, len); + dname[len] = 0; + entry->se_qstr.hash = full_name_hash(name, len); + entry->se_qstr.len = len; + entry->se_qstr.name = dname; + + lli = ll_i2info(sai->sai_inode); + cfs_spin_lock(&lli->lli_sa_lock); + cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent); + cfs_spin_unlock(&lli->lli_sa_lock); + + cfs_atomic_inc(&sai->sai_cache_count); + ll_sa_entry_enhash(sai, entry); + + RETURN(entry); +} + +/* + * Used by the directory scanner to search entry with name. + * + * Only the caller can remove the entry from hash, so it is unnecessary to hold + * hash lock. It is caller's duty to release the init refcount on the entry, so + * it is also unnecessary to increase refcount on the entry. */ -static void ll_sai_entry_cleanup(struct ll_sai_entry *entry, int free) +static struct ll_sa_entry * +ll_sa_entry_get_byname(struct ll_statahead_info *sai, const struct qstr *qstr) +{ + struct ll_sa_entry *entry; + int i = ll_sa_entry_hash(qstr->hash); + + cfs_list_for_each_entry(entry, &sai->sai_cache[i], se_hash) { + if (entry->se_qstr.hash == qstr->hash && + entry->se_qstr.len == qstr->len && + memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) + return entry; + } + return NULL; +} + +/* + * Used by the async getattr request callback to find entry with index. + * + * Inside lli_sa_lock to prevent others to change the list during the search. + * It needs to increase entry refcount before returning to guarantee that the + * entry cannot be freed by others. 
+ */ +static struct ll_sa_entry * +ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index) +{ + struct ll_sa_entry *entry; + + cfs_list_for_each_entry(entry, &sai->sai_entries_sent, se_list) { + if (entry->se_index == index) { + cfs_atomic_inc(&entry->se_refcount); + return entry; + } + if (entry->se_index > index) + break; + } + return NULL; +} + +static void ll_sa_entry_cleanup(struct ll_statahead_info *sai, + struct ll_sa_entry *entry) { struct md_enqueue_info *minfo = entry->se_minfo; struct ptlrpc_request *req = entry->se_req; - ENTRY; if (minfo) { entry->se_minfo = NULL; ll_intent_release(&minfo->mi_it); - dput(minfo->mi_dentry); iput(minfo->mi_dir); OBD_FREE_PTR(minfo); } + if (req) { entry->se_req = NULL; ptlrpc_req_finished(req); } - if (free) { - LASSERT(cfs_list_empty(&entry->se_list)); - OBD_FREE_PTR(entry); +} + +static void ll_sa_entry_put(struct ll_statahead_info *sai, + struct ll_sa_entry *entry) +{ + if (cfs_atomic_dec_and_test(&entry->se_refcount)) { + CDEBUG(D_READA, "free sai entry %.*s(%p) index "LPU64"\n", + entry->se_qstr.len, entry->se_qstr.name, entry, + entry->se_index); + + LASSERT(ll_sa_entry_unhashed(entry)); + LASSERT(ll_sa_entry_unlinked(entry)); + + ll_sa_entry_cleanup(sai, entry); + if (entry->se_inode) + iput(entry->se_inode); + + OBD_FREE(entry, entry->se_size); + cfs_atomic_dec(&sai->sai_cache_count); } +} - EXIT; +static inline void +do_sai_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) +{ + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + + ll_sa_entry_unhash(sai, entry); + + cfs_spin_lock(&lli->lli_sa_lock); + entry->se_stat = SA_ENTRY_DEST; + if (likely(!ll_sa_entry_unlinked(entry))) + cfs_list_del_init(&entry->se_list); + cfs_spin_unlock(&lli->lli_sa_lock); + + ll_sa_entry_put(sai, entry); +} + +/* + * Delete it from sai_entries_stated list when fini. + */ +static void +ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry) +{ + struct ll_sa_entry *pos, *next; + + if (entry) + do_sai_entry_fini(sai, entry); + + /* drop old entry from sent list */ + cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_sent, + se_list) { + if (is_omitted_entry(sai, pos->se_index)) + do_sai_entry_fini(sai, pos); + else + break; + } + + /* drop old entry from stated list */ + cfs_list_for_each_entry_safe(pos, next, &sai->sai_entries_stated, + se_list) { + if (is_omitted_entry(sai, pos->se_index)) + do_sai_entry_fini(sai, pos); + else + break; + } +} + +/* + * Inside lli_sa_lock. + */ +static void +do_sai_entry_to_stated(struct ll_statahead_info *sai, + struct ll_sa_entry *entry, int rc) +{ + struct ll_sa_entry *se; + cfs_list_t *pos = &sai->sai_entries_stated; + + if (!ll_sa_entry_unlinked(entry)) + cfs_list_del_init(&entry->se_list); + + cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { + if (se->se_index < entry->se_index) { + pos = &se->se_list; + break; + } + } + + cfs_list_add(&entry->se_list, pos); + entry->se_stat = rc; +} + +/* + * Move entry to sai_entries_stated and sort with the index. + * \retval 1 -- entry to be destroyed. + * \retval 0 -- entry is inserted into stated list. 
+ */ +static int +ll_sa_entry_to_stated(struct ll_statahead_info *sai, + struct ll_sa_entry *entry, int rc) +{ + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + int ret = 1; + + ll_sa_entry_cleanup(sai, entry); + + cfs_spin_lock(&lli->lli_sa_lock); + if (likely(entry->se_stat != SA_ENTRY_DEST)) { + do_sai_entry_to_stated(sai, entry, rc); + ret = 0; + } + cfs_spin_unlock(&lli->lli_sa_lock); + + return ret; } static struct ll_statahead_info *ll_sai_alloc(void) { struct ll_statahead_info *sai; + int i; + ENTRY; OBD_ALLOC_PTR(sai); if (!sai) - return NULL; + RETURN(NULL); + cfs_atomic_set(&sai->sai_refcount, 1); cfs_spin_lock(&sai_generation_lock); sai->sai_generation = ++sai_generation; if (unlikely(sai_generation == 0)) sai->sai_generation = ++sai_generation; cfs_spin_unlock(&sai_generation_lock); - cfs_atomic_set(&sai->sai_refcount, 1); sai->sai_max = LL_SA_RPC_MIN; cfs_waitq_init(&sai->sai_waitq); cfs_waitq_init(&sai->sai_thread.t_ctl_waitq); CFS_INIT_LIST_HEAD(&sai->sai_entries_sent); CFS_INIT_LIST_HEAD(&sai->sai_entries_received); CFS_INIT_LIST_HEAD(&sai->sai_entries_stated); - return sai; + for (i = 0; i < LL_SA_CACHE_SIZE; i++) { + CFS_INIT_LIST_HEAD(&sai->sai_cache[i]); + cfs_spin_lock_init(&sai->sai_cache_lock[i]); + } + cfs_atomic_set(&sai->sai_cache_count, 0); + + RETURN(sai); } -static inline -struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai) +static inline struct ll_statahead_info * +ll_sai_get(struct ll_statahead_info *sai) { - LASSERT(sai); cfs_atomic_inc(&sai->sai_refcount); return sai; } @@ -190,15 +473,11 @@ struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai) static void ll_sai_put(struct ll_statahead_info *sai) { struct inode *inode = sai->sai_inode; - struct ll_inode_info *lli; + struct ll_inode_info *lli = ll_i2info(inode); ENTRY; - LASSERT(inode != NULL); - lli = ll_i2info(inode); - LASSERT(lli->lli_sai == sai); - if (cfs_atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) { - struct ll_sai_entry *entry, *next; + struct ll_sa_entry *entry, *next; if (unlikely(cfs_atomic_read(&sai->sai_refcount) > 0)) { /* It is race case, the interpret callback just hold @@ -216,323 +495,192 @@ static void ll_sai_put(struct ll_statahead_info *sai) if (sai->sai_sent > sai->sai_replied) CDEBUG(D_READA,"statahead for dir "DFID" does not " - "finish: [sent:%u] [replied:%u]\n", + "finish: [sent:"LPU64"] [replied:"LPU64"]\n", PFID(&lli->lli_fid), sai->sai_sent, sai->sai_replied); cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_sent, se_list) { - cfs_list_del_init(&entry->se_list); - ll_sai_entry_cleanup(entry, 1); - } - cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_received, - se_list) { - cfs_list_del_init(&entry->se_list); - ll_sai_entry_cleanup(entry, 1); - } - cfs_list_for_each_entry_safe(entry, next, - &sai->sai_entries_stated, - se_list) { - cfs_list_del_init(&entry->se_list); - ll_sai_entry_cleanup(entry, 1); - } - iput(inode); - OBD_FREE_PTR(sai); - } - EXIT; -} - -/** - * insert it into sai_entries_sent tail when init. 
- */ -static struct ll_sai_entry * -ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index) -{ - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - struct ll_sai_entry *entry; - ENTRY; - - OBD_ALLOC_PTR(entry); - if (entry == NULL) - RETURN(ERR_PTR(-ENOMEM)); - - CDEBUG(D_READA, "alloc sai entry %p index %u\n", - entry, index); - entry->se_index = index; - entry->se_stat = SA_ENTRY_UNSTATED; - - cfs_spin_lock(&lli->lli_sa_lock); - cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent); - cfs_spin_unlock(&lli->lli_sa_lock); - - RETURN(entry); -} - -/** - * delete it from sai_entries_stated head when fini, it need not - * to process entry's member. - */ -static int ll_sai_entry_fini(struct ll_statahead_info *sai) -{ - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - struct ll_sai_entry *entry; - int rc = 0; - ENTRY; - - cfs_spin_lock(&lli->lli_sa_lock); - sai->sai_index_next++; - if (likely(!cfs_list_empty(&sai->sai_entries_stated))) { - entry = cfs_list_entry(sai->sai_entries_stated.next, - struct ll_sai_entry, se_list); - if (entry->se_index < sai->sai_index_next) { - cfs_list_del_init(&entry->se_list); - rc = entry->se_stat; - OBD_FREE_PTR(entry); - } - } else { - LASSERT(sa_is_stopped(sai)); - } - cfs_spin_unlock(&lli->lli_sa_lock); - - RETURN(rc); -} - -/** - * inside lli_sa_lock. - * \retval NULL : can not find the entry in sai_entries_sent with the index - * \retval entry: find the entry in sai_entries_sent with the index - */ -static struct ll_sai_entry * -ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat, - struct ptlrpc_request *req, struct md_enqueue_info *minfo) -{ - struct ll_sai_entry *entry; - ENTRY; - - if (!cfs_list_empty(&sai->sai_entries_sent)) { - cfs_list_for_each_entry(entry, &sai->sai_entries_sent, - se_list) { - if (entry->se_index == index) { - entry->se_stat = stat; - entry->se_req = ptlrpc_request_addref(req); - entry->se_minfo = minfo; - RETURN(entry); - } else if (entry->se_index > index) { - RETURN(NULL); - } - } - } - RETURN(NULL); -} - -/** - * inside lli_sa_lock. - * Move entry to sai_entries_received and - * insert it into sai_entries_received tail. - */ -static inline void -ll_sai_entry_to_received(struct ll_statahead_info *sai, struct ll_sai_entry *entry) -{ - if (!cfs_list_empty(&entry->se_list)) - cfs_list_del_init(&entry->se_list); - cfs_list_add_tail(&entry->se_list, &sai->sai_entries_received); -} + &sai->sai_entries_sent, se_list) + do_sai_entry_fini(sai, entry); -/** - * Move entry to sai_entries_stated and - * sort with the index. 
- */ -static int -ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry) -{ - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - struct ll_sai_entry *se; - ENTRY; - - ll_sai_entry_cleanup(entry, 0); + LASSERT(sa_received_empty(sai)); - cfs_spin_lock(&lli->lli_sa_lock); - if (!cfs_list_empty(&entry->se_list)) - cfs_list_del_init(&entry->se_list); + cfs_list_for_each_entry_safe(entry, next, + &sai->sai_entries_stated, se_list) + do_sai_entry_fini(sai, entry); - /* stale entry */ - if (unlikely(entry->se_index < sai->sai_index_next)) { - cfs_spin_unlock(&lli->lli_sa_lock); - OBD_FREE_PTR(entry); - RETURN(0); - } + LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0); - cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { - if (se->se_index < entry->se_index) { - cfs_list_add(&entry->se_list, &se->se_list); - cfs_spin_unlock(&lli->lli_sa_lock); - RETURN(1); - } + iput(inode); + OBD_FREE_PTR(sai); } - /* - * I am the first entry. - */ - cfs_list_add(&entry->se_list, &sai->sai_entries_stated); - cfs_spin_unlock(&lli->lli_sa_lock); - RETURN(1); + EXIT; } -/** - * finish lookup/revalidate. - */ -static int do_statahead_interpret(struct ll_statahead_info *sai) +static void do_statahead_interpret(struct ll_statahead_info *sai, + struct ll_sa_entry *target) { - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - struct ll_sai_entry *entry; - struct ptlrpc_request *req; + struct inode *dir = sai->sai_inode; + struct inode *child; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_sa_entry *entry; struct md_enqueue_info *minfo; struct lookup_intent *it; - struct dentry *dentry; - int rc = 0; + struct ptlrpc_request *req; struct mdt_body *body; + int rc = 0; ENTRY; cfs_spin_lock(&lli->lli_sa_lock); - LASSERT(!sa_received_empty(sai)); - entry = cfs_list_entry(sai->sai_entries_received.next, - struct ll_sai_entry, se_list); + if (target != NULL && target->se_req != NULL && + !cfs_list_empty(&target->se_list)) { + entry = target; + } else if (unlikely(sa_received_empty(sai))) { + cfs_spin_unlock(&lli->lli_sa_lock); + RETURN_EXIT; + } else { + entry = cfs_list_entry(sai->sai_entries_received.next, + struct ll_sa_entry, se_list); + } + + cfs_atomic_inc(&entry->se_refcount); cfs_list_del_init(&entry->se_list); cfs_spin_unlock(&lli->lli_sa_lock); - if (unlikely(entry->se_index < sai->sai_index_next)) { - CWARN("Found stale entry: [index %u] [next %u]\n", - entry->se_index, sai->sai_index_next); - ll_sai_entry_cleanup(entry, 1); - RETURN(0); - } - - if (entry->se_stat != SA_ENTRY_STATED) - GOTO(out, rc = entry->se_stat); + LASSERT(entry->se_handle != 0); - req = entry->se_req; minfo = entry->se_minfo; it = &minfo->mi_it; - dentry = minfo->mi_dentry; - + req = entry->se_req; body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); if (body == NULL) GOTO(out, rc = -EFAULT); - if (dentry->d_inode == NULL) { + child = entry->se_inode; + if (child == NULL) { /* * lookup. */ - struct dentry *save = dentry; - struct it_cb_data icbd = { - .icbd_parent = minfo->mi_dir, - .icbd_childp = &dentry - }; - LASSERT(fid_is_zero(&minfo->mi_data.op_fid2)); /* XXX: No fid in reply, this is probaly cross-ref case. * SA can't handle it yet. */ if (body->valid & OBD_MD_MDS) GOTO(out, rc = -EAGAIN); - - /* Here dentry->d_inode might be NULL, because the entry may - * have been removed before we start doing stat ahead. 
*/ - rc = ll_lookup_it_finish(req, it, &icbd); - if (!rc) - ll_lookup_finish_locks(it, dentry); - - if (dentry != save) { - minfo->mi_dentry = dentry; - dput(save); - } } else { /* * revalidate. */ - if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) { - ll_unhash_aliases(dentry->d_inode); - GOTO(out, rc = -EAGAIN); + /* unlinked and re-created with the same name */ + if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1))){ + entry->se_inode = NULL; + iput(child); + child = NULL; } + } - rc = ll_revalidate_it_finish(req, it, dentry); - if (rc) { - ll_unhash_aliases(dentry->d_inode); - GOTO(out, rc); - } + it->d.lustre.it_lock_handle = entry->se_handle; + rc = md_revalidate_lock(ll_i2mdexp(dir), it, NULL, NULL); + if (rc != 1) + GOTO(out, rc = -EAGAIN); + + rc = ll_prep_inode(&child, req, dir->i_sb); + if (rc) + GOTO(out, rc); + + CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n", + child, child->i_ino, child->i_generation); + ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL); + + entry->se_inode = child; - cfs_spin_lock(&ll_lookup_lock); - spin_lock(&dcache_lock); - lock_dentry(dentry); - __d_drop(dentry); - dentry->d_flags &= ~DCACHE_LUSTRE_INVALID; - unlock_dentry(dentry); - d_rehash_cond(dentry, 0); - spin_unlock(&dcache_lock); - cfs_spin_unlock(&ll_lookup_lock); - - ll_lookup_finish_locks(it, dentry); - } EXIT; out: - /* The "ll_sai_entry_to_stated()" will drop related ldlm ibits lock - * reference count with ll_intent_drop_lock() called in spite of the + /* The "ll_sa_entry_to_stated()" will drop related ldlm ibits lock + * reference count by calling "ll_intent_drop_lock()" in spite of the * above operations failed or not. Do not worry about calling * "ll_intent_drop_lock()" more than once. */ - if (likely(ll_sai_entry_to_stated(sai, entry))) + rc = ll_sa_entry_to_stated(sai, entry, rc < 0 ? rc : SA_ENTRY_SUCC); + if (rc == 0 && entry->se_index == sai->sai_index_wait && target == NULL) cfs_waitq_signal(&sai->sai_waitq); - return rc; + ll_sa_entry_put(sai, entry); } static int ll_statahead_interpret(struct ptlrpc_request *req, - struct md_enqueue_info *minfo, - int rc) + struct md_enqueue_info *minfo, int rc) { - struct lookup_intent *it = &minfo->mi_it; - struct dentry *dentry = minfo->mi_dentry; + struct lookup_intent *it = &minfo->mi_it; struct inode *dir = minfo->mi_dir; struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai; - struct ll_sai_entry *entry; + struct ll_statahead_info *sai = NULL; + struct ll_sa_entry *entry; + int wakeup; ENTRY; - CDEBUG(D_READA, "interpret statahead %.*s rc %d\n", - dentry->d_name.len, dentry->d_name.name, rc); + if (it_disposition(it, DISP_LOOKUP_NEG)) + rc = -ENOENT; cfs_spin_lock(&lli->lli_sa_lock); /* stale entry */ if (unlikely(lli->lli_sai == NULL || - lli->lli_sai->sai_generation != minfo->mi_generation)) { + lli->lli_sai->sai_generation != minfo->mi_generation)) { cfs_spin_unlock(&lli->lli_sa_lock); - ll_intent_release(it); - dput(dentry); - iput(dir); - OBD_FREE_PTR(minfo); - RETURN(-ESTALE); + GOTO(out, rc = -ESTALE); } else { sai = ll_sai_get(lli->lli_sai); - entry = ll_sai_entry_set(sai, - (unsigned int)(long)minfo->mi_cbdata, - rc < 0 ? 
rc : SA_ENTRY_STATED, req, - minfo); - LASSERT(entry != NULL); - if (likely(sa_is_running(sai))) { - ll_sai_entry_to_received(sai, entry); + if (unlikely(!sa_is_running(sai))) { + sai->sai_replied++; + cfs_spin_unlock(&lli->lli_sa_lock); + GOTO(out, rc = -EBADFD); + } + + entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata); + if (entry == NULL) { + sai->sai_replied++; + cfs_spin_unlock(&lli->lli_sa_lock); + GOTO(out, rc = -EIDRM); + } + + cfs_list_del_init(&entry->se_list); + if (rc != 0) { sai->sai_replied++; + do_sai_entry_to_stated(sai, entry, rc); cfs_spin_unlock(&lli->lli_sa_lock); - cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); + if (entry->se_index == sai->sai_index_wait) + cfs_waitq_signal(&sai->sai_waitq); } else { - if (!cfs_list_empty(&entry->se_list)) - cfs_list_del_init(&entry->se_list); + entry->se_minfo = minfo; + entry->se_req = ptlrpc_request_addref(req); + /* Release the async ibits lock ASAP to avoid deadlock + * when statahead thread tries to enqueue lock on parent + * for readpage and other tries to enqueue lock on child + * with parent's lock held, for example: unlink. */ + entry->se_handle = it->d.lustre.it_lock_handle; + ll_intent_drop_lock(it); + wakeup = sa_received_empty(sai); + cfs_list_add_tail(&entry->se_list, + &sai->sai_entries_received); sai->sai_replied++; cfs_spin_unlock(&lli->lli_sa_lock); - ll_sai_entry_cleanup(entry, 1); + if (wakeup) + cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); } - ll_sai_put(sai); - RETURN(rc); + ll_sa_entry_put(sai, entry); } + + EXIT; + +out: + if (rc != 0) { + ll_intent_release(it); + iput(dir); + OBD_FREE_PTR(minfo); + } + if (sai != NULL) + ll_sai_put(sai); + return rc; } static void sa_args_fini(struct md_enqueue_info *minfo, @@ -555,8 +703,8 @@ static void sa_args_fini(struct md_enqueue_info *minfo, * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling * "md_intent_getattr_async". */ -static int sa_args_init(struct inode *dir, struct dentry *dentry, - struct md_enqueue_info **pmi, +static int sa_args_init(struct inode *dir, struct inode *child, + struct qstr *qstr, struct md_enqueue_info **pmi, struct ldlm_enqueue_info **pei, struct obd_capa **pcapa) { @@ -575,9 +723,8 @@ static int sa_args_init(struct inode *dir, struct dentry *dentry, return -ENOMEM; } - op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode, - dentry->d_name.name, dentry->d_name.len, - 0, LUSTRE_OPC_ANY, NULL); + op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, qstr->name, + qstr->len, 0, LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) { OBD_FREE_PTR(einfo); OBD_FREE_PTR(minfo); @@ -585,11 +732,10 @@ static int sa_args_init(struct inode *dir, struct dentry *dentry, } minfo->mi_it.it_op = IT_GETATTR; - minfo->mi_dentry = dentry; minfo->mi_dir = igrab(dir); minfo->mi_cb = ll_statahead_interpret; minfo->mi_generation = lli->lli_sai->sai_generation; - minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index; + minfo->mi_cbdata = lli->lli_sai->sai_index; einfo->ei_type = LDLM_IBITS; einfo->ei_mode = it_to_lock_mode(&minfo->mi_it); @@ -606,10 +752,7 @@ static int sa_args_init(struct inode *dir, struct dentry *dentry, return 0; } -/** - * similar to ll_lookup_it(). 
- */ -static int do_sa_lookup(struct inode *dir, struct dentry *dentry) +static int do_sa_lookup(struct inode *dir, struct ll_sa_entry *entry) { struct md_enqueue_info *minfo; struct ldlm_enqueue_info *einfo; @@ -617,7 +760,7 @@ static int do_sa_lookup(struct inode *dir, struct dentry *dentry) int rc; ENTRY; - rc = sa_args_init(dir, dentry, &minfo, &einfo, capas); + rc = sa_args_init(dir, NULL, &entry->se_qstr, &minfo, &einfo, capas); if (rc) RETURN(rc); @@ -638,10 +781,12 @@ static int do_sa_lookup(struct inode *dir, struct dentry *dentry) * \retval 0 -- will send stat-ahead request * \retval others -- prepare stat-ahead request failed */ -static int do_sa_revalidate(struct inode *dir, struct dentry *dentry) +static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry, + struct dentry *dentry) { struct inode *inode = dentry->d_inode; - struct lookup_intent it = { .it_op = IT_GETATTR }; + struct lookup_intent it = { .it_op = IT_GETATTR, + .d.lustre.it_lock_handle = 0 }; struct md_enqueue_info *minfo; struct ldlm_enqueue_info *einfo; struct obd_capa *capas[2]; @@ -657,86 +802,74 @@ static int do_sa_revalidate(struct inode *dir, struct dentry *dentry) if (unlikely(dentry == dentry->d_sb->s_root)) RETURN(1); - rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode)); + entry->se_inode = igrab(inode); + rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),NULL); if (rc == 1) { + entry->se_handle = it.d.lustre.it_lock_handle; ll_intent_release(&it); RETURN(1); } - rc = sa_args_init(dir, dentry, &minfo, &einfo, capas); - if (rc) + rc = sa_args_init(dir, inode, &entry->se_qstr, &minfo, &einfo, capas); + if (rc) { + entry->se_inode = NULL; + iput(inode); RETURN(rc); + } rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo); if (!rc) { capa_put(capas[0]); capa_put(capas[1]); } else { + entry->se_inode = NULL; + iput(inode); sa_args_fini(minfo, einfo); } RETURN(rc); } -static inline void ll_name2qstr(struct qstr *q, const char *name, int namelen) -{ - q->name = name; - q->len = namelen; - q->hash = full_name_hash(name, namelen); -} - -static int ll_statahead_one(struct dentry *parent, const char* entry_name, - int entry_name_len) +static void ll_statahead_one(struct dentry *parent, const char* entry_name, + int entry_name_len) { struct inode *dir = parent->d_inode; struct ll_inode_info *lli = ll_i2info(dir); struct ll_statahead_info *sai = lli->lli_sai; - struct qstr name; struct dentry *dentry = NULL; - struct ll_sai_entry *se; + struct ll_sa_entry *entry; int rc; + int rc1; ENTRY; - if (parent->d_flags & DCACHE_LUSTRE_INVALID) { - CDEBUG(D_READA, "parent dentry@%p %.*s is " - "invalid, skip statahead\n", - parent, parent->d_name.len, parent->d_name.name); - RETURN(-EINVAL); - } - - se = ll_sai_entry_init(sai, sai->sai_index); - if (IS_ERR(se)) - RETURN(PTR_ERR(se)); + entry = ll_sa_entry_alloc(sai, sai->sai_index, entry_name, + entry_name_len); + if (IS_ERR(entry)) + RETURN_EXIT; - ll_name2qstr(&name, entry_name, entry_name_len); - dentry = d_lookup(parent, &name); - if (!dentry) { - dentry = d_alloc(parent, &name); - if (dentry) - rc = do_sa_lookup(dir, dentry); - else - GOTO(out, rc = -ENOMEM); - } else { - rc = do_sa_revalidate(dir, dentry); - } + dentry = d_lookup(parent, &entry->se_qstr); + if (!dentry) + rc = do_sa_lookup(dir, entry); + else + rc = do_sa_revalidate(dir, entry, dentry); - EXIT; + if (dentry != NULL) + dput(dentry); -out: if (rc) { - if (dentry != NULL) - dput(dentry); - se->se_stat = rc < 0 ? 
rc : SA_ENTRY_STATED; - CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n", - se, se->se_index, se->se_stat, rc); - if (ll_sai_entry_to_stated(sai, se)) + rc1 = ll_sa_entry_to_stated(sai, entry, + rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); + if (rc1 == 0 && entry->se_index == sai->sai_index_wait) cfs_waitq_signal(&sai->sai_waitq); } else { sai->sai_sent++; } sai->sai_index++; - return rc; + /* drop one refcount on entry by ll_sa_entry_alloc */ + ll_sa_entry_put(sai, entry); + + EXIT; } static int ll_statahead_thread(void *arg) @@ -779,10 +912,10 @@ static int ll_statahead_thread(void *arg) if (IS_ERR(page)) { rc = PTR_ERR(page); CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 - "/%u: [rc %d] [parent %u]\n", + "/"LPU64": [rc %d] [parent %u]\n", PFID(ll_inode2fid(dir)), pos, sai->sai_index, rc, lli->lli_opendir_pid); - break; + GOTO(out, rc); } dp = page_address(page); @@ -830,19 +963,18 @@ static int ll_statahead_thread(void *arg) /* * don't stat-ahead first entry. */ - if (unlikely(!first)) { - first++; + if (unlikely(++first == 1)) continue; - } keep_de: l_wait_event(thread->t_ctl_waitq, - !sa_is_running(sai) || sa_not_full(sai) || - !sa_received_empty(sai), + sa_not_full(sai) || + !sa_received_empty(sai) || + !sa_is_running(sai), &lwi); - while (!sa_received_empty(sai) && sa_is_running(sai)) - do_statahead_interpret(sai); + while (!sa_received_empty(sai)) + do_statahead_interpret(sai, NULL); if (unlikely(!sa_is_running(sai))) { ll_release_page(page, 0); @@ -855,11 +987,7 @@ keep_de: */ goto keep_de; - rc = ll_statahead_one(parent, name, namelen); - if (rc < 0) { - ll_release_page(page, 0); - GOTO(out, rc); - } + ll_statahead_one(parent, name, namelen); } pos = le64_to_cpu(dp->ldp_hash_end); if (pos == MDS_DIR_END_OFF) { @@ -869,15 +997,18 @@ keep_de: ll_release_page(page, 0); while (1) { l_wait_event(thread->t_ctl_waitq, - !sa_is_running(sai) || !sa_received_empty(sai) || - sai->sai_sent == sai->sai_replied, + sai->sai_sent == sai->sai_replied|| + !sa_is_running(sai), &lwi); - if (!sa_received_empty(sai) && - sa_is_running(sai)) - do_statahead_interpret(sai); - else - GOTO(out, rc); + + while (!sa_received_empty(sai)) + do_statahead_interpret(sai, NULL); + + if ((sai->sai_sent == sai->sai_replied && + sa_received_empty(sai)) || + !sa_is_running(sai)) + GOTO(out, rc = 0); } } else if (1) { /* @@ -887,7 +1018,9 @@ keep_de: ll_release_page(page, le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); lli->lli_sa_pos = pos; + sai->sai_in_readpage = 1; page = ll_get_dir_page(NULL, dir, pos, &chain); + sai->sai_in_readpage = 0; } else { LASSERT(le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); ll_release_page(page, 1); @@ -901,6 +1034,16 @@ keep_de: out: ll_dir_chain_fini(&chain); cfs_spin_lock(&lli->lli_sa_lock); + if (!sa_received_empty(sai)) { + thread->t_flags = SVC_STOPPING; + cfs_spin_unlock(&lli->lli_sa_lock); + + /* To release the resources held by received entries. */ + while (!sa_received_empty(sai)) + do_statahead_interpret(sai, NULL); + + cfs_spin_lock(&lli->lli_sa_lock); + } thread->t_flags = SVC_STOPPED; cfs_spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&sai->sai_waitq); @@ -970,7 +1113,7 @@ enum { */ LS_FIRST_DE, /** - * the first hidden dirent, that is "." + * the first hidden dirent, that is "." 
*/ LS_FIRST_DOT_DE }; @@ -1008,9 +1151,16 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) dp = page_address(page); for (ent = lu_dirent_start(dp); ent != NULL; ent = lu_dirent_next(ent)) { + __u64 hash; int namelen; char *name; + hash = le64_to_cpu(ent->lde_hash); + /* The ll_get_dir_page() can return any page containing + * the given hash which may be not the start hash. */ + if (unlikely(hash < pos)) + continue; + namelen = le16_to_cpu(ent->lde_namelen); if (unlikely(namelen == 0)) /* @@ -1085,38 +1235,83 @@ out: return rc; } +static void +ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry) +{ + struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode); + int hit; + ENTRY; + + if (entry != NULL && entry->se_stat == SA_ENTRY_SUCC) + hit = 1; + else + hit = 0; + + ll_sa_entry_fini(sai, entry); + if (hit) { + sai->sai_hit++; + sai->sai_consecutive_miss = 0; + sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max); + } else { + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + + sai->sai_miss++; + sai->sai_consecutive_miss++; + if (sa_low_hit(sai) && sa_is_running(sai)) { + atomic_inc(&sbi->ll_sa_wrong); + CDEBUG(D_READA, "Statahead for dir "DFID" hit " + "ratio too low: hit/miss "LPU64"/"LPU64 + ", sent/replied "LPU64"/"LPU64", stopping " + "statahead thread: pid %d\n", + PFID(&lli->lli_fid), sai->sai_hit, + sai->sai_miss, sai->sai_sent, + sai->sai_replied, cfs_curproc_pid()); + cfs_spin_lock(&lli->lli_sa_lock); + if (!sa_is_stopped(sai)) + sai->sai_thread.t_flags = SVC_STOPPING; + cfs_spin_unlock(&lli->lli_sa_lock); + } + } + + if (!sa_is_stopped(sai)) + cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); + + EXIT; +} + /** * Start statahead thread if this is the first dir entry. * Otherwise if a thread is started already, wait it until it is ahead of me. - * \retval 0 -- stat ahead thread process such dentry, for lookup, it miss - * \retval 1 -- stat ahead thread process such dentry, for lookup, it hit - * \retval -EEXIST -- stat ahead thread started, and this is the first dentry - * \retval -EBADFD -- statahead thread exit and not dentry available - * \retval -EAGAIN -- try to stat by caller - * \retval others -- error + * \retval 1 -- find entry with lock in cache, the caller needs to do + * nothing. + * \retval 0 -- find entry in cache, but without lock, the caller needs + * refresh from MDS. + * \retval others -- the caller need to process as non-statahead. 
*/ -int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) +int do_statahead_enter(struct inode *dir, struct dentry **dentryp, + int only_unplug) { - struct ll_inode_info *lli; - struct ll_statahead_info *sai; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = lli->lli_sai; struct dentry *parent; - struct l_wait_info lwi = { 0 }; - int rc = 0; + struct ll_sa_entry *entry; + struct l_wait_info lwi = { 0 }; + int rc = 0; ENTRY; - LASSERT(dir != NULL); - lli = ll_i2info(dir); LASSERT(lli->lli_opendir_pid == cfs_curproc_pid()); - sai = lli->lli_sai; if (sai) { if (unlikely(sa_is_stopped(sai) && - cfs_list_empty(&sai->sai_entries_stated))) - RETURN(-EBADFD); + cfs_list_empty(&sai->sai_entries_stated))) { + /* to release resource */ + ll_stop_statahead(dir, lli->lli_opendir_key); + RETURN(-EAGAIN); + } if ((*dentryp)->d_name.name[0] == '.') { - if (likely(sai->sai_ls_all || - sai->sai_miss_hidden >= sai->sai_skip_hidden)) { + if (sai->sai_ls_all || + sai->sai_miss_hidden >= sai->sai_skip_hidden) { /* * Hidden dentry is the first one, or statahead * thread does not skip so many hidden dentries @@ -1137,40 +1332,68 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) * "sai_ls_all" enabled as above. */ sai->sai_miss_hidden++; - RETURN(-ENOENT); + RETURN(-EAGAIN); } } - if (!ll_sai_entry_stated(sai)) { - /* - * thread started already, avoid double-stat. - */ - lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); + entry = ll_sa_entry_get_byname(sai, &(*dentryp)->d_name); + if (entry == NULL || only_unplug) { + ll_sai_unplug(sai, entry); + RETURN(entry ? 1 : -EAGAIN); + } + + while (!ll_sa_entry_stated(entry) && + sai->sai_in_readpage && + !sa_received_empty(sai)) + do_statahead_interpret(sai, entry); + + if (!ll_sa_entry_stated(entry)) { + sai->sai_index_wait = entry->se_index; + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL, + LWI_ON_SIGNAL_NOOP, NULL); rc = l_wait_event(sai->sai_waitq, - ll_sai_entry_stated(sai) || + ll_sa_entry_stated(entry) || sa_is_stopped(sai), &lwi); - if (unlikely(rc == -EINTR)) - RETURN(rc); + if (rc < 0) { + ll_sai_unplug(sai, entry); + RETURN(-EAGAIN); + } } - if (lookup) { - struct dentry *result; - - result = d_lookup((*dentryp)->d_parent, - &(*dentryp)->d_name); - if (result) { - LASSERT(result != *dentryp); - /* BUG 16303: do not drop reference count for - * "*dentryp", VFS will do that by itself. */ - *dentryp = result; - RETURN(1); + if (entry->se_stat == SA_ENTRY_SUCC && + entry->se_inode != NULL) { + struct inode *inode = entry->se_inode; + struct lookup_intent it = { .it_op = IT_GETATTR, + .d.lustre.it_lock_handle = + entry->se_handle }; + struct ll_dentry_data *lld; + __u64 bits; + + rc = md_revalidate_lock(ll_i2mdexp(dir), &it, + ll_inode2fid(inode), &bits); + if (rc == 1) { + if ((*dentryp)->d_inode == NULL) { + *dentryp = ll_find_alias(inode, + *dentryp); + lld = ll_d2d(*dentryp); + if (unlikely(lld == NULL)) + ll_dops_init(*dentryp, 1, 1); + } else { + LASSERT((*dentryp)->d_inode == inode); + + ll_dentry_rehash(*dentryp, 0); + iput(inode); + } + entry->se_inode = NULL; + + ll_dentry_reset_flags(*dentryp, bits); + ll_intent_release(&it); } } - /* - * do nothing for revalidate. - */ - RETURN(0); + + ll_sai_unplug(sai, entry); + RETURN(rc); } /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". 
  */
@@ -1188,7 +1411,6 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
         if (unlikely(sai->sai_inode == NULL)) {
                 CWARN("Do not start stat ahead on dying inode "DFID"\n",
                       PFID(&lli->lli_fid));
-                OBD_FREE_PTR(sai);
                 GOTO(out, rc = -ESTALE);
         }
 
@@ -1203,8 +1425,7 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
                       PFID(&lli->lli_fid), PFID(&nlli->lli_fid));
                 dput(parent);
                 iput(sai->sai_inode);
-                OBD_FREE_PTR(sai);
-                RETURN(-EAGAIN);
+                GOTO(out, rc = -EAGAIN);
         }
 
         lli->lli_sai = sai;
@@ -1230,67 +1451,11 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
         RETURN(-EEXIST);
 
 out:
+        if (sai != NULL)
+                OBD_FREE_PTR(sai);
         cfs_spin_lock(&lli->lli_sa_lock);
         lli->lli_opendir_key = NULL;
         lli->lli_opendir_pid = 0;
         cfs_spin_unlock(&lli->lli_sa_lock);
         return rc;
 }
-
-/**
- * update hit/miss count.
- */
-void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result)
-{
-        struct ll_inode_info     *lli;
-        struct ll_statahead_info *sai;
-        struct ll_sb_info        *sbi;
-        struct ll_dentry_data    *ldd = ll_d2d(dentry);
-        int                       rc;
-        ENTRY;
-
-        LASSERT(dir != NULL);
-        lli = ll_i2info(dir);
-        LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
-        sai = lli->lli_sai;
-        LASSERT(sai != NULL);
-        sbi = ll_i2sbi(dir);
-
-        rc = ll_sai_entry_fini(sai);
-        /* rc == -ENOENT means such dentry was removed just between statahead
-         * readdir and pre-fetched, count it as hit.
-         *
-         * result == -ENOENT has two meanings:
-         * 1. such dentry was removed just between statahead pre-fetched and
-         *    main process stat such dentry.
-         * 2. main process stat non-exist dentry.
-         * We can not distinguish such two cases, just count them as miss. */
-        if (result >= 1 || unlikely(rc == -ENOENT)) {
-                sai->sai_hit++;
-                sai->sai_consecutive_miss = 0;
-                sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
-        } else {
-                sai->sai_miss++;
-                sai->sai_consecutive_miss++;
-                if (sa_low_hit(sai) && sa_is_running(sai)) {
-                        atomic_inc(&sbi->ll_sa_wrong);
-                        CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio "
-                               "too low: hit/miss %u/%u, sent/replied %u/%u, "
-                               "stopping statahead thread: pid %d\n",
-                               PFID(&lli->lli_fid), sai->sai_hit,
-                               sai->sai_miss, sai->sai_sent,
-                               sai->sai_replied, cfs_curproc_pid());
-                        cfs_spin_lock(&lli->lli_sa_lock);
-                        if (!sa_is_stopped(sai))
-                                sai->sai_thread.t_flags = SVC_STOPPING;
-                        cfs_spin_unlock(&lli->lli_sa_lock);
-                }
-        }
-
-        if (!sa_is_stopped(sai))
-                cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
-        if (likely(ldd != NULL))
-                ldd->lld_sa_generation = sai->sai_generation;
-
-        EXIT;
-}
diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c
index 380c6d7..4aef69a 100644
--- a/lustre/lmv/lmv_obd.c
+++ b/lustre/lmv/lmv_obd.c
@@ -3075,7 +3075,7 @@ int lmv_intent_getattr_async(struct obd_export *exp,
 }
 
 int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
-                        struct lu_fid *fid)
+                        struct lu_fid *fid, __u64 *bits)
 {
         struct obd_device       *obd = exp->exp_obd;
         struct lmv_obd          *lmv = &obd->u.lmv;
@@ -3091,7 +3091,7 @@ int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
         if (IS_ERR(tgt))
                 RETURN(PTR_ERR(tgt));
 
-        rc = md_revalidate_lock(tgt->ltd_exp, it, fid);
+        rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
 
         RETURN(rc);
 }
diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h
index 353f295..db5b1e4 100644
--- a/lustre/mdc/mdc_internal.h
+++ b/lustre/mdc/mdc_internal.h
@@ -159,7 +159,7 @@ static inline void mdc_set_capa_size(struct ptlrpc_request *req,
 }
 
 int mdc_revalidate_lock(struct obd_export *exp,
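
The statahead.c hunks above converge the error paths of do_statahead_enter() on a single cleanup site: instead of freeing the half-initialized sai in every failure branch, each branch now jumps to out:, which frees it exactly once when it was never published in lli->lli_sai. A minimal userspace sketch of that pattern follows; allocate/publish details and the helpers dir_is_dying() and parent_mismatch() are invented stand-ins for the real checks.

    #include <errno.h>
    #include <stdlib.h>

    /* Minimal stand-in for the statahead info object; not the Lustre struct. */
    struct sai { int generation; };

    static struct sai *published_sai;   /* plays the role of lli->lli_sai */

    /* Hypothetical failure conditions checked before publishing. */
    static int dir_is_dying(void)    { return 0; }
    static int parent_mismatch(void) { return 0; }

    /*
     * Single-exit error handling: every failure branch goes to "out", and the
     * object is freed in exactly one place if ownership was not transferred.
     */
    static int start_statahead_sketch(void)
    {
            struct sai *sai;
            int rc = 0;

            sai = calloc(1, sizeof(*sai));
            if (sai == NULL)
                    return -ENOMEM;

            if (dir_is_dying()) {
                    rc = -ESTALE;
                    goto out;
            }
            if (parent_mismatch()) {
                    rc = -EAGAIN;
                    goto out;
            }

            published_sai = sai;        /* published: ownership transferred */
            return -EEXIST;             /* as in the patch: caller retries itself */

    out:
            free(sai);                  /* one cleanup site; free(NULL) is a no-op,
                                         * mirroring the "if (sai != NULL)" guard */
            return rc;
    }

    int main(void)
    {
            return start_statahead_sketch() == -EEXIST ? 0 : 1;
    }
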
                         struct lookup_intent *it,
-                        struct lu_fid *fid);
+                        struct lu_fid *fid, __u64 *bits);
 
 int mdc_intent_getattr_async(struct obd_export *exp,
                              struct md_enqueue_info *minfo,
diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c
index 4d496cf..d75508b 100644
--- a/lustre/mdc/mdc_locks.c
+++ b/lustre/mdc/mdc_locks.c
@@ -844,7 +844,7 @@ static int mdc_finish_intent_lock(struct obd_export *exp,
 }
 
 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
-                        struct lu_fid *fid)
+                        struct lu_fid *fid, __u64 *bits)
 {
         /* We could just return 1 immediately, but since we should only
          * be called in revalidate_it if we already have a lock, let's
@@ -855,16 +855,25 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
         ldlm_mode_t mode;
         ENTRY;
 
-        fid_build_reg_res_name(fid, &res_id);
-        policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
+        if (it->d.lustre.it_lock_handle) {
+                lockh.cookie = it->d.lustre.it_lock_handle;
+                mode = ldlm_revalidate_lock_handle(&lockh, bits);
+        } else {
+                fid_build_reg_res_name(fid, &res_id);
+                policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
                                   MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
+                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                                       LDLM_FL_BLOCK_GRANTED, &res_id,
+                                       LDLM_IBITS, &policy,
+                                       LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
+        }
 
-        mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
-                               LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
-                               &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
         if (mode) {
                 it->d.lustre.it_lock_handle = lockh.cookie;
                 it->d.lustre.it_lock_mode = mode;
+        } else {
+                it->d.lustre.it_lock_handle = 0;
+                it->d.lustre.it_lock_mode = 0;
         }
 
         RETURN(!!mode);
@@ -920,7 +929,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                 /* We could just return 1 immediately, but since we should only
                  * be called in revalidate_it if we already have a lock, let's
                  * verify that. */
-                rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2);
+                it->d.lustre.it_lock_handle = 0;
+                rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
                 /* Only return failure if it was not GETATTR by cfid
                    (from inode_revalidate) */
                 if (rc || op_data->op_namelen != 0)
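
mdc_revalidate_lock() now has two paths: when the intent already carries a lock handle (the one the statahead cache saved from the async getattr), it revalidates that handle directly through ldlm_revalidate_lock_handle() and reports the inodebits; otherwise it falls back to the old resource/inodebits match, and on failure it now also clears the stale handle and mode in the intent. The toy model below mirrors only that control flow; every type and function in it (struct intent, revalidate_by_handle(), match_by_fid(), the bit values) is an invented stand-in, not the ldlm API.

    #include <stdio.h>

    typedef unsigned long lock_handle_t;
    typedef unsigned int  lock_mode_t;      /* 0 means "no lock" */

    struct intent {
            int           op_getattr;       /* cf. it->it_op == IT_GETATTR */
            lock_handle_t lock_handle;      /* cf. it->d.lustre.it_lock_handle */
            lock_mode_t   lock_mode;
    };

    /* Assumed behaviour: a non-zero handle revalidates and reports its bits. */
    static lock_mode_t revalidate_by_handle(lock_handle_t h, unsigned long *bits)
    {
            if (h == 0)
                    return 0;
            *bits = 0x2;                    /* pretend UPDATE-like bits */
            return 1;                       /* some granted mode */
    }

    /* Assumed behaviour: a resource match succeeds only for the 0x2 bit here. */
    static lock_mode_t match_by_fid(unsigned long fid, unsigned long wanted_bits,
                                    lock_handle_t *h)
    {
            (void)fid;
            if (wanted_bits != 0x2)
                    return 0;
            *h = 0xf1d0;
            return 1;
    }

    /* Prefer the handle the statahead cache holds; else match by resource,
     * and clear the intent when neither path produces a lock. */
    static int revalidate_lock_sketch(struct intent *it, unsigned long fid,
                                      unsigned long *bits)
    {
            lock_handle_t lockh = it->lock_handle;
            lock_mode_t mode;

            if (lockh != 0) {
                    mode = revalidate_by_handle(lockh, bits);
            } else {
                    unsigned long wanted = it->op_getattr ? 0x2 : 0x1;
                    mode = match_by_fid(fid, wanted, &lockh);
            }

            if (mode) {
                    it->lock_handle = lockh;
                    it->lock_mode = mode;
            } else {
                    it->lock_handle = 0;
                    it->lock_mode = 0;
            }
            return mode != 0;
    }

    int main(void)
    {
            unsigned long bits = 0;
            struct intent it = { .op_getattr = 1, .lock_handle = 0xabc };

            printf("revalidate: %d (bits 0x%lx)\n",
                   revalidate_lock_sketch(&it, 0x42, &bits), bits);
            return 0;
    }
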