From c40ce65134a14965ff8df57ddc0d9527630a96cd Mon Sep 17 00:00:00 2001 From: fanyong Date: Thu, 24 Apr 2008 04:41:11 +0000 Subject: [PATCH] Branch b1_6 b=15406 i=huanghua i=tappro i=vitaly Back port dir_SA fixes from HEAD (b1_8_dir_ra) to b1_6. --- lustre/include/lustre_mds.h | 1 + .../patches/vfs_intent-2.6-rhel4.patch | 10 +- lustre/llite/dcache.c | 82 ++- lustre/llite/file.c | 73 ++- lustre/llite/llite_internal.h | 90 ++- lustre/llite/llite_lib.c | 3 +- lustre/llite/llite_nfs.c | 15 +- lustre/llite/lproc_llite.c | 30 +- lustre/llite/namei.c | 54 +- lustre/llite/statahead.c | 690 +++++++++++---------- lustre/mdc/mdc_locks.c | 2 +- lustre/tests/sanity.sh | 6 +- 12 files changed, 656 insertions(+), 400 deletions(-) diff --git a/lustre/include/lustre_mds.h b/lustre/include/lustre_mds.h index d391367..47d5073 100644 --- a/lustre/include/lustre_mds.h +++ b/lustre/include/lustre_mds.h @@ -243,6 +243,7 @@ struct md_enqueue_info { struct lustre_handle mi_lockh; struct dentry *mi_dentry; md_enqueue_cb_t mi_cb; + unsigned int mi_generation; void *mi_cbdata; }; diff --git a/lustre/kernel_patches/patches/vfs_intent-2.6-rhel4.patch b/lustre/kernel_patches/patches/vfs_intent-2.6-rhel4.patch index ee655c3..66e65fb 100644 --- a/lustre/kernel_patches/patches/vfs_intent-2.6-rhel4.patch +++ b/lustre/kernel_patches/patches/vfs_intent-2.6-rhel4.patch @@ -1205,8 +1205,9 @@ diff -rup RH_2_6_9_55.orig/fs/stat.c RH_2_6_9_55/fs/stat.c int error; + intent_init(&nd.intent, IT_GETATTR); - error = user_path_walk(name, &nd); - if (!error) { +- error = user_path_walk(name, &nd); ++ error = user_path_walk_it(name, &nd); + if (!error) { - error = vfs_getattr64(nd.mnt, nd.dentry, stat); + error = vfs_getattr64_it(nd.mnt, nd.dentry, &nd.intent, stat); path_release(&nd); @@ -1218,8 +1219,9 @@ diff -rup RH_2_6_9_55.orig/fs/stat.c RH_2_6_9_55/fs/stat.c int error; + intent_init(&nd.intent, IT_GETATTR); - error = user_path_walk_link(name, &nd); - if (!error) { +- error = user_path_walk_link(name, &nd); 
++ error = user_path_walk_link_it(name, &nd); + if (!error) { - error = vfs_getattr64(nd.mnt, nd.dentry, stat); + error = vfs_getattr64_it(nd.mnt, nd.dentry, &nd.intent, stat); path_release(&nd); diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 81c09e4..96e01b1 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -118,11 +118,21 @@ void ll_set_dd(struct dentry *de) CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n", de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode, atomic_read(&de->d_count)); - lock_kernel(); + if (de->d_fsdata == NULL) { - OBD_ALLOC(de->d_fsdata, sizeof(struct ll_dentry_data)); + struct ll_dentry_data *lld; + + OBD_ALLOC(lld, sizeof(struct ll_dentry_data)); + if (likely(lld != NULL)) { + cfs_waitq_init(&lld->lld_waitq); + lock_dentry(de); + if (likely(de->d_fsdata == NULL)) + de->d_fsdata = lld; + else + OBD_FREE(lld, sizeof(struct ll_dentry_data)); + unlock_dentry(de); + } } - unlock_kernel(); EXIT; } @@ -357,10 +367,9 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, RETURN(0); #endif - rc = ll_have_md_lock(de->d_parent->d_inode, + rc = ll_have_md_lock(de->d_parent->d_inode, MDS_INODELOCK_UPDATE); - - RETURN(rc); + GOTO(out_sa, rc); } exp = ll_i2mdcexp(de->d_inode); @@ -368,12 +377,12 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, /* Never execute intents for mount points. * Attributes will be fixed up in ll_inode_revalidate_it */ if (d_mountpoint(de)) - RETURN(1); + GOTO(out_sa, rc = 1); /* Root of the lustre tree. Always valid. 
* Attributes will be fixed up in ll_inode_revalidate_it */ if (de == de->d_sb->s_root) - RETURN(1); + GOTO(out_sa, rc = 1); OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5); ll_frob_intent(&it, &lookup_it); @@ -401,7 +410,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, } else if (it->it_flags & FMODE_EXEC) { och_p = &lli->lli_mds_exec_och; och_usecount = &lli->lli_open_fd_exec_count; - } else { + } else { och_p = &lli->lli_mds_read_och; och_usecount = &lli->lli_open_fd_read_count; } @@ -540,6 +549,19 @@ do_lookup: ll_intent_release(it); } GOTO(out, rc = 0); + +out_sa: + /* + * For rc == 1 case, should not return directly to prevent losing + * statahead windows; for rc == 0 case, the "lookup" will be done later. + */ + if (it && it->it_op == IT_GETATTR && rc == 1) { + first = ll_statahead_enter(de->d_parent->d_inode, &de, 0); + if (!first) + ll_statahead_exit(de, rc); + } + + return rc; } /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag) @@ -728,3 +750,45 @@ struct dentry_operations ll_d_ops = { .d_unpin = ll_unpin, #endif }; + +static int ll_fini_revalidate_nd(struct dentry *dentry, struct nameidata *nd) +{ + ENTRY; + /* need lookup */ + RETURN(0); +} + +struct dentry_operations ll_fini_d_ops = { + .d_revalidate = ll_fini_revalidate_nd, + .d_release = ll_release, +}; + +/* + * It is for the following race condition: + * When someone (maybe statahead thread) adds the dentry to the dentry hash + * table, the dentry's "d_op" maybe NULL, at the same time, another (maybe + * "ls -l") process finds such dentry by "do_lookup()" without "do_revalidate()" + * called. It causes statahead window lost, and maybe other issues. 
--Fan Yong + */ +static int ll_init_revalidate_nd(struct dentry *dentry, struct nameidata *nd) +{ + struct l_wait_info lwi = { 0 }; + struct ll_dentry_data *lld; + ENTRY; + + ll_set_dd(dentry); + lld = ll_d2d(dentry); + if (unlikely(lld == NULL)) + RETURN(-ENOMEM); + + l_wait_event(lld->lld_waitq, dentry->d_op != &ll_init_d_ops, &lwi); + if (likely(dentry->d_op == &ll_d_ops)) + RETURN(ll_revalidate_nd(dentry, nd)); + else + RETURN(dentry->d_op == &ll_fini_d_ops ? 0 : -EINVAL); +} + +struct dentry_operations ll_init_d_ops = { + .d_revalidate = ll_init_revalidate_nd, + .d_release = ll_release, +}; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 518e40a..d50d49a 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -235,17 +235,26 @@ int ll_file_release(struct inode *inode, struct file *file) CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, inode->i_generation, inode); - if (S_ISDIR(inode->i_mode)) - ll_stop_statahead(inode); - /* don't do anything for / */ - if (inode->i_sb->s_root == file->f_dentry) - RETURN(0); - - ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1); + if (inode->i_sb->s_root != file->f_dentry) + ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1); fd = LUSTRE_FPRIVATE(file); LASSERT(fd != NULL); + /* + * The last ref on @file, maybe not the owner pid of statahead. + * Different processes can open the same dir, "ll_opendir_key" means: + * it is me that should stop the statahead thread. 
+ */ + if (lli->lli_opendir_key == fd) + ll_stop_statahead(inode, fd); + + if (inode->i_sb->s_root == file->f_dentry) { + LUSTRE_FPRIVATE(file) = NULL; + ll_file_data_put(fd); + RETURN(0); + } + if (lsm) lov_test_and_clear_async_rc(lsm); lli->lli_async_rc = 0; @@ -384,19 +393,12 @@ int ll_file_open(struct inode *inode, struct file *file) struct obd_client_handle **och_p; __u64 *och_usecount; struct ll_file_data *fd; - int rc = 0; + int rc = 0, opendir_set = 0; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino, inode->i_generation, inode, file->f_flags); - if (S_ISDIR(inode->i_mode) && lli->lli_opendir_pid == 0) - lli->lli_opendir_pid = current->pid; - - /* don't do anything for / */ - if (inode->i_sb->s_root == file->f_dentry) - RETURN(0); - #ifdef HAVE_VFS_INTENT_PATCHES it = file->f_it; #else @@ -405,10 +407,40 @@ int ll_file_open(struct inode *inode, struct file *file) #endif fd = ll_file_data_get(); - if (fd == NULL) { - lli->lli_opendir_pid = 0; + if (fd == NULL) RETURN(-ENOMEM); + + if (S_ISDIR(inode->i_mode)) { + spin_lock(&lli->lli_lock); + /* + * "lli->lli_opendir_pid != 0" means someone has set it. + * "lli->lli_sai != NULL" means the previous statahead has not + * been cleanup. + */ + if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) { + opendir_set = 1; + lli->lli_opendir_pid = cfs_curproc_pid(); + lli->lli_opendir_key = fd; + } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) { + /* Two cases for this: + * (1) The same process open such directory many times. + * (2) The old process opened the directory, and exited + * before its children processes. Then new process + * with the same pid opens such directory before the + * old process's children processes exit. + * Change the owner to the latest one. 
+ */ + opendir_set = 2; + lli->lli_opendir_key = fd; + } + spin_unlock(&lli->lli_lock); } + + if (inode->i_sb->s_root == file->f_dentry) { + LUSTRE_FPRIVATE(file) = fd; + RETURN(0); + } + if (!it || !it->d.lustre.it_disposition) { /* Convert f_flags into access mode. We cannot use file->f_mode, * because everything but O_ACCMODE mask was stripped from it */ @@ -547,7 +579,12 @@ out_och_free: } up(&lli->lli_och_sem); out_openerr: - lli->lli_opendir_pid = 0; + if (opendir_set == 1) { /* this open claimed dir ownership: undo it */ + lli->lli_opendir_key = NULL; + lli->lli_opendir_pid = 0; + } else if (unlikely(opendir_set == 2)) { /* this open took over the key */ + ll_stop_statahead(inode, fd); + } } return rc; } diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 5774f86..3ae815a 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -54,11 +54,12 @@ struct ll_dentry_data { struct obd_client_handle lld_cwd_och; struct obd_client_handle lld_mnt_och; #ifndef HAVE_VFS_INTENT_PATCHES - struct lookup_intent *lld_it; + struct lookup_intent *lld_it; #endif + cfs_waitq_t lld_waitq; }; -#define ll_d2d(de) ((struct ll_dentry_data*) de->d_fsdata) +#define ll_d2d(de) ((struct ll_dentry_data*)((de)->d_fsdata)) extern struct file_operations ll_pgcache_seq_fops; @@ -113,7 +114,16 @@ struct ll_inode_info { #endif /* metadata stat-ahead */ + /* + * "opendir_pid" is the token when lookup/revalid -- I am the owner of + * dir statahead. + */ pid_t lli_opendir_pid; + /* + * since parent-child threads can share the same @file struct, + * "opendir_key" is the token when dir close for case of parent exit + * before child -- it is me should cleanup the dir readahead. 
*/ + void *lli_opendir_key; struct ll_statahead_info *lli_sai; }; @@ -276,15 +286,16 @@ struct ll_sb_info { * clustred nfs */ /* metadata stat-ahead */ - unsigned int ll_sa_count; /* current statahead RPCs */ - unsigned int ll_sa_max; /* max statahead RPCs */ - unsigned int ll_sa_wrong; /* statahead thread stopped for - * low hit ratio */ - unsigned int ll_sa_total; /* statahead thread started - * count */ + unsigned int ll_sa_max; /* max statahead RPCs */ + unsigned int ll_sa_wrong; /* statahead thread stopped for + * low hit ratio */ + unsigned int ll_sa_total; /* statahead thread started + * count */ unsigned long long ll_sa_blocked; /* ls count waiting for * statahead */ unsigned long long ll_sa_cached; /* ls count got in cache */ + unsigned long long ll_sa_hit; /* hit count */ + unsigned long long ll_sa_miss; /* miss count */ }; #define LL_DEFAULT_MAX_RW_CHUNK (32 * 1024 * 1024) @@ -613,6 +624,9 @@ int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmm, int *lmm_size, struct ptlrpc_request **request); /* llite/dcache.c */ +extern struct dentry_operations ll_init_d_ops; +extern struct dentry_operations ll_d_ops; +extern struct dentry_operations ll_fini_d_ops; void ll_intent_drop_lock(struct lookup_intent *); void ll_intent_release(struct lookup_intent *); extern void ll_set_dd(struct dentry *de); @@ -815,34 +829,70 @@ int ll_removexattr(struct dentry *dentry, const char *name); /* statahead.c */ -#define LL_STATAHEAD_MIN 1 -#define LL_STATAHEAD_DEF 32 -#define LL_STATAHEAD_MAX 10000 +#define LL_SA_RPC_MIN 2 +#define LL_SA_RPC_DEF 32 +#define LL_SA_RPC_MAX 8192 /* per inode struct, for dir only */ struct ll_statahead_info { struct inode *sai_inode; - atomic_t sai_refc; /* when access this struct, hold + unsigned int sai_generation; /* generation for statahead */ + atomic_t sai_refcount; /* when access this struct, hold * refcount */ - unsigned int sai_max; /* max ahead of lookup */ unsigned int sai_sent; /* stat requests sent count */ unsigned int 
sai_replied; /* stat requests which received * reply */ - unsigned int sai_cached; /* UPDATE lock cached locally - * already */ + unsigned int sai_max; /* max ahead of lookup */ + unsigned int sai_index; /* index of statahead entry */ unsigned int sai_hit; /* hit count */ - unsigned int sai_miss; /* miss count */ + unsigned int sai_miss; /* miss count: + * for "ls -al" case, it includes + * hidden dentry miss; + * for "ls -l" case, it does not + * include hidden dentry miss. + * "sai_miss_hidden" is used for + * the later case. + */ unsigned int sai_consecutive_miss; /* consecutive miss */ - unsigned sai_ls_all:1; /* ls -al, do stat-ahead for + unsigned int sai_miss_hidden;/* "ls -al", but first dentry + * is not a hidden one */ + unsigned int sai_skip_hidden;/* skipped hidden dentry count */ + unsigned int sai_ls_all:1; /* "ls -al", do stat-ahead for * hidden entries */ + cfs_waitq_t sai_waitq; /* stat-ahead wait queue */ struct ptlrpc_thread sai_thread; /* stat-ahead thread */ struct list_head sai_entries; /* stat-ahead entries */ - unsigned int sai_entries_nr; /* stat-ahead entries count */ }; -int ll_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup); +int do_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup); void ll_statahead_exit(struct dentry *dentry, int result); -void ll_stop_statahead(struct inode *inode); +void ll_stop_statahead(struct inode *inode, void *key); + +static inline +void ll_d_wakeup(struct dentry *dentry) +{ + struct ll_dentry_data *lld = ll_d2d(dentry); + + LASSERT(dentry->d_op != &ll_init_d_ops); + if (lld != NULL) + cfs_waitq_broadcast(&lld->lld_waitq); +} + +static inline +int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) +{ + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_inode_info *lli = ll_i2info(dir); + + if (sbi->ll_sa_max == 0) + return -ENOTSUPP; + + /* not the same process, don't statahead */ + if (lli->lli_opendir_pid != cfs_curproc_pid()) + return 
-EBADF; + + return do_statahead_enter(dir, dentryp, lookup); +} /* llite ioctl register support rountine */ #ifdef __KERNEL__ diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index d28a16e..54fcd63 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -108,7 +108,7 @@ static struct ll_sb_info *ll_init_sbi(void) } /* metadata statahead is enabled by default */ - sbi->ll_sa_max = LL_STATAHEAD_DEF; + sbi->ll_sa_max = LL_SA_RPC_DEF; RETURN(sbi); } @@ -1233,6 +1233,7 @@ void ll_clear_inode(struct inode *inode) if (S_ISDIR(inode->i_mode)) { /* these should have been cleared in ll_file_release */ LASSERT(lli->lli_sai == NULL); + LASSERT(lli->lli_opendir_key == NULL); LASSERT(lli->lli_opendir_pid == 0); } diff --git a/lustre/llite/llite_nfs.c b/lustre/llite/llite_nfs.c index 6ba424a..03dd480 100644 --- a/lustre/llite/llite_nfs.c +++ b/lustre/llite/llite_nfs.c @@ -98,8 +98,6 @@ static struct inode * search_inode_for_lustre(struct super_block *sb, RETURN(inode); } -extern struct dentry_operations ll_d_ops; - static struct dentry *ll_iget_for_nfs(struct super_block *sb, unsigned long ino, __u32 generation, umode_t mode) { @@ -162,7 +160,18 @@ static struct dentry *ll_iget_for_nfs(struct super_block *sb, unsigned long ino, #endif ll_set_dd(result); - result->d_op = &ll_d_ops; + + lock_dentry(result); + if (unlikely(result->d_op == &ll_init_d_ops)) { + result->d_op = &ll_d_ops; + unlock_dentry(result); + smp_wmb(); + ll_d_wakeup(result); + } else { + result->d_op = &ll_d_ops; + unlock_dentry(result); + } + RETURN(result); } diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index a50453e..04a7f90 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -463,15 +463,6 @@ static int ll_wr_contention_time(struct file *file, const char *buffer, count; } -static int ll_rd_statahead_count(char *page, char **start, off_t off, - int count, int *eof, void *data) -{ - struct super_block *sb = data; - struct ll_sb_info 
*sbi = ll_s2sbi(sb); - - return snprintf(page, count, "%u\n", sbi->ll_sa_count); -} - static int ll_rd_statahead_max(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -491,11 +482,12 @@ static int ll_wr_statahead_max(struct file *file, const char *buffer, rc = lprocfs_write_helper(buffer, count, &val); if (rc) return rc; - if (val >= 0 && val <= LL_STATAHEAD_MAX) + + if (val >= 0 && val <= LL_SA_RPC_MAX) sbi->ll_sa_max = val; else CERROR("Bad statahead_max value %d. Valid values are in the " - "range [0, %d]\n", val, LL_STATAHEAD_MAX); + "range [0, %d]\n", val, LL_SA_RPC_MAX); return count; } @@ -510,10 +502,15 @@ static int ll_rd_statahead_stats(char *page, char **start, off_t off, "statahead wrong: %u\n" "statahead total: %u\n" "ls blocked: %llu\n" - "ls total: %llu\n", - sbi->ll_sa_wrong, sbi->ll_sa_total, + "ls cached: %llu\n" + "hit count: %llu\n" + "miss count: %llu\n", + sbi->ll_sa_wrong, + sbi->ll_sa_total, sbi->ll_sa_blocked, - sbi->ll_sa_blocked + sbi->ll_sa_cached); + sbi->ll_sa_cached, + sbi->ll_sa_hit, + sbi->ll_sa_miss); } static struct lprocfs_vars lprocfs_llite_obd_vars[] = { @@ -538,9 +535,8 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = { { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 }, { "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 }, { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0}, - { "statahead_count", ll_rd_statahead_count, 0, 0 }, - { "statahead_max", ll_rd_statahead_max, ll_wr_statahead_max, 0 }, - { "statahead_stats", ll_rd_statahead_stats, 0, 0 }, + { "statahead_max", ll_rd_statahead_max, ll_wr_statahead_max, 0 }, + { "statahead_stats", ll_rd_statahead_stats, 0, 0 }, { 0 } }; diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index e11e89d..2a0733f 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -98,8 +98,6 @@ static int ll_test_inode(struct inode *inode, void *opaque) return 1; } -extern struct dentry_operations ll_d_ops; - int 
ll_unlock(__u32 mode, struct lustre_handle *lockh) { ENTRY; @@ -444,24 +442,44 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) } int lookup_it_finish(struct ptlrpc_request *request, int offset, - struct lookup_intent *it, void *data) + struct lookup_intent *it, void *data) { struct it_cb_data *icbd = data; struct dentry **de = icbd->icbd_childp; struct inode *parent = icbd->icbd_parent; struct ll_sb_info *sbi = ll_i2sbi(parent); struct inode *inode = NULL; - int rc; + int set = 0, rc; + ENTRY; + + lock_dentry(*de); + if (likely((*de)->d_op != &ll_d_ops)) { + (*de)->d_op = &ll_init_d_ops; + set = 1; + } + unlock_dentry(*de); /* NB 1 request reference will be taken away by ll_intent_lock() * when I return */ if (!it_disposition(it, DISP_LOOKUP_NEG)) { - ENTRY; + struct dentry *save = *de; rc = ll_prep_inode(sbi->ll_osc_exp, &inode, request, offset, (*de)->d_sb); - if (rc) + if (rc) { + if (set) { + lock_dentry(*de); + if (likely((*de)->d_op == &ll_init_d_ops)) { + (*de)->d_op = &ll_fini_d_ops; + unlock_dentry(*de); + smp_wmb(); + ll_d_wakeup(*de); + } else { + unlock_dentry(*de); + } + } RETURN(rc); + } CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n", inode, inode->i_ino, inode->i_generation); @@ -476,8 +494,18 @@ int lookup_it_finish(struct ptlrpc_request *request, int offset, ll_glimpse_size or some equivalent themselves anyway. Also see bug 7198. */ *de = ll_find_alias(inode, *de); + if (set && *de != save) { + lock_dentry(save); + if (likely(save->d_op == &ll_init_d_ops)) { + save->d_op = &ll_fini_d_ops; + unlock_dentry(save); + smp_wmb(); + ll_d_wakeup(save); + } else { + unlock_dentry(save); + } + } } else { - ENTRY; /* Check that parent has UPDATE lock. 
If there is none, we cannot afford to hash this dentry (done by ll_d_add) as it might get picked up later when UPDATE lock will appear */ @@ -497,7 +525,17 @@ int lookup_it_finish(struct ptlrpc_request *request, int offset, } ll_set_dd(*de); - (*de)->d_op = &ll_d_ops; + + lock_dentry(*de); + if (likely((*de)->d_op == &ll_init_d_ops)) { + (*de)->d_op = &ll_d_ops; + unlock_dentry(*de); + smp_wmb(); + ll_d_wakeup(*de); + } else { + (*de)->d_op = &ll_d_ops; + unlock_dentry(*de); + } RETURN(0); } diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 6c23dd1..24b245d 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -36,7 +36,7 @@ struct ll_sai_entry { struct list_head se_list; - int se_index; + unsigned int se_index; int se_stat; }; @@ -45,6 +45,9 @@ enum { SA_ENTRY_STATED }; +static unsigned int sai_generation = 0; +static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED; + static struct ll_statahead_info *ll_sai_alloc(void) { struct ll_statahead_info *sai; @@ -53,10 +56,14 @@ static struct ll_statahead_info *ll_sai_alloc(void) if (!sai) return NULL; - sai->sai_max = LL_STATAHEAD_MIN; + spin_lock(&sai_generation_lock); + sai->sai_generation = ++sai_generation; + spin_unlock(&sai_generation_lock); + atomic_set(&sai->sai_refcount, 1); + sai->sai_max = LL_SA_RPC_MIN; + cfs_waitq_init(&sai->sai_waitq); cfs_waitq_init(&sai->sai_thread.t_ctl_waitq); CFS_INIT_LIST_HEAD(&sai->sai_entries); - atomic_set(&sai->sai_refc, 1); return sai; } @@ -64,18 +71,29 @@ static inline struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai) { LASSERT(sai); - atomic_inc(&sai->sai_refc); + atomic_inc(&sai->sai_refcount); return sai; } static void ll_sai_put(struct ll_statahead_info *sai) { - struct inode *inode = sai->sai_inode; + struct inode *inode = sai->sai_inode; struct ll_inode_info *lli = ll_i2info(inode); ENTRY; - if (atomic_dec_and_lock(&sai->sai_refc, &lli->lli_lock)) { - struct ll_sai_entry *entry, *next; + if 
(atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) { + struct ll_sai_entry *entry, *next; + + lli->lli_sai = NULL; + spin_unlock(&lli->lli_lock); + + LASSERT(sai->sai_thread.t_flags & SVC_STOPPED); + + if (sai->sai_sent > sai->sai_replied) + CWARN("statahead for dir %lu/%u does not finish: " + "[sent:%u] [replied:%u]\n", + inode->i_ino, inode->i_generation, + sai->sai_sent, sai->sai_replied); list_for_each_entry_safe(entry, next, &sai->sai_entries, se_list) { @@ -83,102 +101,110 @@ static void ll_sai_put(struct ll_statahead_info *sai) OBD_FREE_PTR(entry); } OBD_FREE_PTR(sai); - lli->lli_sai = NULL; - spin_unlock(&lli->lli_lock); iput(inode); } EXIT; } -static struct ll_sai_entry *ll_sai_entry_get(struct ll_statahead_info *sai, - int index, int stat) +static struct ll_sai_entry * +ll_sai_entry_get(struct ll_statahead_info *sai, unsigned int index, int stat) { struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode); struct ll_sai_entry *entry; ENTRY; OBD_ALLOC_PTR(entry); if (entry == NULL) - RETURN(NULL); + RETURN(ERR_PTR(-ENOMEM)); - CDEBUG(D_READA, "alloc sai entry %p index %d, stat %d\n", + CDEBUG(D_READA, "alloc sai entry %p index %u, stat %d\n", entry, index, stat); entry->se_index = index; entry->se_stat = stat; spin_lock(&lli->lli_lock); list_add_tail(&entry->se_list, &sai->sai_entries); - sai->sai_entries_nr++; - sbi->ll_sa_count = sai->sai_entries_nr; spin_unlock(&lli->lli_lock); - LASSERT(sai->sai_entries_nr <= sbi->ll_sa_max); RETURN(entry); } -/* inside lli_lock */ -static void ll_sai_entry_set(struct ll_statahead_info *sai, int index, - int stat) +/* + * inside lli_lock + * return value: + * 0: can not find the entry with the index + * 1: it is the first entry + * 2: it is not the first entry + */ +static int +ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat) { struct ll_sai_entry *entry; + int rc = 0; ENTRY; - list_for_each_entry(entry, &sai->sai_entries, se_list) { - 
if (entry->se_index == index) { - LASSERT(entry->se_stat == SA_ENTRY_UNSTATED); - entry->se_stat = stat; - CDEBUG(D_READA, "set sai entry %p index %d stat %d\n", - entry, index, stat); - EXIT; - return; - } + if (list_empty(&sai->sai_entries)) + RETURN(0); + + entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list); + if (entry->se_index == index) + GOTO(out, rc = 1); + + while (entry->se_list.next != &sai->sai_entries && + entry->se_index < index) { + entry = list_entry(entry->se_list.next, struct ll_sai_entry, + se_list); + if (entry->se_index == index) + GOTO(out, rc = 2); } - /* Sometimes, this happens when entry has been put and freed */ - CDEBUG(D_READA, "can't find sai entry index %d\n", index); + EXIT; + +out: + if (rc) { + LASSERT(entry->se_stat == SA_ENTRY_UNSTATED); + entry->se_stat = stat; + } + + return rc; } -/* check first entry was stated already */ +/* + * Check whether first entry was stated already or not. + * No need to hold lli_lock, for: + * (1) it is me that remove entry from the list + * (2) the statahead thread only add new entry to the list tail + */ static int ll_sai_entry_stated(struct ll_statahead_info *sai) { - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); struct ll_sai_entry *entry; int rc = 0; ENTRY; - spin_lock(&lli->lli_lock); if (!list_empty(&sai->sai_entries)) { entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list); - CDEBUG(D_READA, "check sai entry %p index %d stat %d\n", - entry, entry->se_index, entry->se_stat); rc = (entry->se_stat != SA_ENTRY_UNSTATED); } - spin_unlock(&lli->lli_lock); RETURN(rc); } -/* inside lli_lock */ static void ll_sai_entry_put(struct ll_statahead_info *sai) { + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); struct ll_sai_entry *entry; ENTRY; - if (list_empty(&sai->sai_entries)) { - EXIT; - return; + spin_lock(&lli->lli_lock); + if (!list_empty(&sai->sai_entries)) { + entry = list_entry(sai->sai_entries.next, + struct ll_sai_entry, se_list); + 
list_del(&entry->se_list); + OBD_FREE_PTR(entry); } - LASSERT(sai->sai_entries_nr > 0); - - entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list); - list_del(&entry->se_list); - sai->sai_entries_nr--; + spin_unlock(&lli->lli_lock); - CDEBUG(D_READA, "free sa entry %p index %d stat %d\n", - entry, entry->se_index, entry->se_stat); - OBD_FREE_PTR(entry); EXIT; } @@ -192,11 +218,22 @@ static int ll_statahead_interpret(struct obd_export *exp, struct dentry *dentry = minfo->mi_dentry; struct inode *dir = dentry->d_parent->d_inode; struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai; + struct ll_statahead_info *sai = NULL; ENTRY; CDEBUG(D_READA, "interpret statahead %.*s rc %d\n", dentry->d_name.len, dentry->d_name.name, rc); + + spin_lock(&lli->lli_lock); + if (unlikely(lli->lli_sai == NULL || + lli->lli_sai->sai_generation != minfo->mi_generation)) { + spin_unlock(&lli->lli_lock); + GOTO(out_free, rc = -ESTALE); + } else { + sai = ll_sai_get(lli->lli_sai); + spin_unlock(&lli->lli_lock); + } + if (rc || dir == NULL) GOTO(out, rc); @@ -204,21 +241,21 @@ static int ll_statahead_interpret(struct obd_export *exp, /* lookup */ struct dentry *save = dentry; struct it_cb_data icbd = { - .icbd_parent = dir, - .icbd_childp = &dentry + .icbd_parent = dir, + .icbd_childp = &dentry }; rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd); - if (!rc) { + if (!rc) /* * Here dentry->d_inode might be NULL, * because the entry may have been removed before * we start doing stat ahead. 
+ */ - if (dentry != save) - dput(save); ll_lookup_finish_locks(it, dentry); - } + + if (dentry != save) + dput(save); } else { /* revalidate */ struct mds_body *body; @@ -248,19 +285,35 @@ static int ll_statahead_interpret(struct obd_export *exp, spin_unlock(&dcache_lock); ll_lookup_finish_locks(it, dentry); - } EXIT; + out: - spin_lock(&lli->lli_lock); - sai = lli->lli_sai; - if (sai) { - lli->lli_sai->sai_replied++; - ll_sai_entry_set(lli->lli_sai, (long)minfo->mi_cbdata, - SA_ENTRY_STATED); - cfs_waitq_signal(&lli->lli_sai->sai_thread.t_ctl_waitq); + if (sai != NULL) { + int first; + + sai->sai_replied++; + spin_lock(&lli->lli_lock); + first = ll_sai_entry_set(sai, + (unsigned int)(long)minfo->mi_cbdata, + SA_ENTRY_STATED); + /* + * wake up the "ls -l" process only when the first entry + * returned. + */ + spin_unlock(&lli->lli_lock); + if (first == 1) + cfs_waitq_signal(&sai->sai_waitq); + else if (first == 0) + CDEBUG(D_READA, "can't find sai entry for dir " + "%lu/%u generation %u index %u\n", + dir->i_ino, dir->i_generation, + minfo->mi_generation, + (unsigned int)(long)minfo->mi_cbdata); + + ll_sai_put(sai); } - spin_unlock(&lli->lli_lock); +out_free: ll_intent_release(it); OBD_FREE_PTR(minfo); @@ -298,7 +351,8 @@ static int sa_args_prep(struct inode *dir, struct dentry *dentry, minfo->mi_it.it_op = IT_GETATTR; minfo->mi_dentry = dentry; minfo->mi_cb = ll_statahead_interpret; - minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_sent; + minfo->mi_generation = lli->lli_sai->sai_generation; + minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index; einfo->ei_type = LDLM_IBITS; einfo->ei_mode = it_to_lock_mode(&minfo->mi_it); @@ -337,14 +391,16 @@ static int do_sa_lookup(struct inode *dir, struct dentry *dentry) RETURN(rc); } -/* similar to ll_revalidate_it(). - * return 1: dentry valid. - * 0: will send stat-ahead request. - * -errno: prepare stat-ahead request failed. */ +/* + * similar to ll_revalidate_it(). 
+ * return value: + * 1 -- dentry valid + * 0 -- will send stat-ahead request + * others -- prepare stat-ahead request failed + */ static int do_sa_revalidate(struct dentry *dentry) { struct inode *inode = dentry->d_inode; - struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode); struct ll_fid fid; struct lookup_intent it = { .it_op = IT_GETATTR }; struct md_enqueue_info *minfo; @@ -358,13 +414,14 @@ static int do_sa_revalidate(struct dentry *dentry) if (d_mountpoint(dentry)) RETURN(1); + if (dentry == dentry->d_sb->s_root) + RETURN(1); + ll_inode2fid(&fid, inode); rc = mdc_revalidate_lock(ll_i2mdcexp(inode), &it, &fid); if (rc == 1) { ll_intent_release(&it); - lli->lli_sai->sai_cached++; - cfs_waitq_signal(&lli->lli_sai->sai_thread.t_ctl_waitq); RETURN(1); } @@ -384,21 +441,17 @@ static int do_sa_revalidate(struct dentry *dentry) RETURN(rc); } -/* copied from kernel */ -static inline void name2qstr(struct qstr *this, const char *name, int namelen) +static inline void ll_name2qstr(struct qstr *this, const char *name, int namelen) { - unsigned long hash; - const unsigned char *p = (const unsigned char *)name; - int len; - unsigned int c; - - hash = init_name_hash(); - for (len = 0; len < namelen; len++, p++) { - c = *p; - hash = partial_name_hash(c, hash); - } + unsigned long hash = init_name_hash(); + unsigned int c; + this->name = name; this->len = namelen; + for (; namelen > 0; namelen--, name++) { + c = *(const unsigned char *)name; + hash = partial_name_hash(c, hash); + } this->hash = end_name_hash(hash); } @@ -412,11 +465,6 @@ static int ll_statahead_one(struct dentry *parent, ext2_dirent *de) int rc; ENTRY; - name2qstr(&name, de->name, de->name_len); - - se = ll_sai_entry_get(lli->lli_sai, lli->lli_sai->sai_sent, - SA_ENTRY_UNSTATED); - #ifdef DCACHE_LUSTRE_INVALID if (parent->d_flags & DCACHE_LUSTRE_INVALID) { #else @@ -425,34 +473,44 @@ static int ll_statahead_one(struct dentry *parent, ext2_dirent *de) CDEBUG(D_READA, "parent dentry@%p %.*s is " 
"invalid, skip statahead\n", parent, parent->d_name.len, parent->d_name.name); - GOTO(out, rc = -EINVAL); + RETURN(-EINVAL); } + se = ll_sai_entry_get(lli->lli_sai, lli->lli_sai->sai_index, + SA_ENTRY_UNSTATED); + if (IS_ERR(se)) + RETURN(PTR_ERR(se)); + + ll_name2qstr(&name, de->name, de->name_len); dentry = d_lookup(parent, &name); if (!dentry) { - struct dentry *dentry = d_alloc(parent, &name); - - rc = -ENOMEM; + dentry = d_alloc(parent, &name); if (dentry) { rc = do_sa_lookup(dir, dentry); if (rc) dput(dentry); + } else { + GOTO(out, rc = -ENOMEM); } - GOTO(out, rc); + } else { + rc = do_sa_revalidate(dentry); + if (rc) + dput(dentry); } - rc = do_sa_revalidate(dentry); - if (rc) - dput(dentry); - GOTO(out, rc); + EXIT; + out: if (rc) { - CDEBUG(D_READA, "set sai entry %p index %d stat %d, rc %d\n", + CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n", se, se->se_index, se->se_stat, rc); se->se_stat = rc; - cfs_waitq_signal(&lli->lli_sai->sai_thread.t_ctl_waitq); + cfs_waitq_signal(&lli->lli_sai->sai_waitq); + } else { + lli->lli_sai->sai_sent++; } - lli->lli_sai->sai_sent++; + + lli->lli_sai->sai_index++; return rc; } @@ -463,7 +521,17 @@ static inline int sa_check_stop(struct ll_statahead_info *sai) static inline int sa_not_full(struct ll_statahead_info *sai) { - return sai->sai_sent - sai->sai_miss - sai->sai_hit < sai->sai_max; + return sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max; +} + +/* (1) hit ratio less than 80% + * or + * (2) consecutive miss more than 8 + */ +static inline int sa_low_hit(struct ll_statahead_info *sai) +{ + return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) || + (sai->sai_consecutive_miss > 8)); } struct ll_sa_thread_args { @@ -480,155 +548,141 @@ static int ll_statahead_thread(void *arg) struct ll_sb_info *sbi = ll_i2sbi(dir); struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai); struct ptlrpc_thread *thread = &sai->sai_thread; - struct l_wait_info lwi = { 0 }; unsigned long index = 0; - 
__u64 offset = 0; - int skip = 0; + int first = 0; int rc = 0; char name[16] = ""; ENTRY; sbi->ll_sa_total++; - snprintf(name, 15, "ll_sa_%u", sta->sta_pid); cfs_daemonize(name); + spin_lock(&lli->lli_lock); thread->t_flags = SVC_RUNNING; + spin_unlock(&lli->lli_lock); cfs_waitq_signal(&thread->t_ctl_waitq); CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name); - if (sai->sai_ls_all) - CDEBUG(D_READA, "do statahead for hidden files\n"); - while (1) { - unsigned long npages = dir_pages(dir); - - /* hit ratio < 80% */ - if ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) || - (sai->sai_consecutive_miss > 8)) { - sbi->ll_sa_wrong++; - CDEBUG(D_READA, "statahead for dir %.*s hit ratio too " - "low: hit/miss %u/%u, sent/replied %u/%u, " - "cached %u\n", - parent->d_name.len, parent->d_name.name, - sai->sai_hit, sai->sai_miss, sai->sai_sent, - sai->sai_replied, sai->sai_cached); - break; - } + struct l_wait_info lwi = { 0 }; + unsigned long npages; + char *kaddr, *limit; + ext2_dirent *de; + struct page *page; + npages = dir_pages(dir); /* reach the end of dir */ - if (index == npages) { + if (index >= npages) { CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n", index, npages); break; } - l_wait_event(thread->t_ctl_waitq, - sa_check_stop(sai) || sa_not_full(sai), - &lwi); - - if (sa_check_stop(sai)) + page = ll_get_dir_page(dir, index); + if (IS_ERR(page)) { + rc = PTR_ERR(page); + CERROR("error reading dir %lu/%u page %lu/%u: rc %d\n", + dir->i_ino, dir->i_generation, index, + sai->sai_index, rc); break; + } - for (; index < npages; index++, offset = 0) { - char *kaddr, *limit; - ext2_dirent *de; - struct page *page; - - CDEBUG(D_EXT2,"read %lu of dir %lu/%u page %lu" - "/%lu size %llu\n", - CFS_PAGE_SIZE, dir->i_ino, dir->i_generation, - index, npages, dir->i_size); - - page = ll_get_dir_page(dir, index); - npages = dir_pages(dir); - - if (IS_ERR(page)) { - rc = PTR_ERR(page); - CERROR("error reading dir %lu/%u page %lu: " - "rc %d\n", - 
dir->i_ino, dir->i_generation, index, - rc); - GOTO(out, rc); + kaddr = page_address(page); + limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1); + de = (ext2_dirent *)kaddr; + if (!index) { + de = ext2_next_entry(de); /* skip "." */ + de = ext2_next_entry(de); /* skip ".." */ + } + + for (; (char*)de <= limit; de = ext2_next_entry(de)) { + if (!de->inode) + continue; + + if (de->name[0] == '.' && !sai->sai_ls_all) { + /* skip hidden files */ + sai->sai_skip_hidden++; + continue; } - kaddr = page_address(page); - de = (ext2_dirent *)(kaddr + offset); - limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1); - for (; (char*)de <= limit && sa_not_full(sai); - de = ext2_next_entry(de)) { - if (!de->inode) - continue; - - /* don't stat-ahead ".", ".." */ - if (skip < 2) { - skip++; - continue; - } - - /* don't stat-ahead for hidden files */ - if (de->name[0] == '.' && !sai->sai_ls_all) - continue; - - /* don't stat-ahead for the first de */ - if (skip < 3) { - skip++; - continue; - } - - rc = ll_statahead_one(parent, de); - if (rc < 0) { - ext2_put_page(page); - GOTO(out, rc); - } + /* don't stat-ahead first entry */ + if (unlikely(!first)) { + first++; + continue; + } + + l_wait_event(thread->t_ctl_waitq, + sa_check_stop(sai) || sa_not_full(sai), + &lwi); + + if (unlikely(sa_check_stop(sai))) { + ext2_put_page(page); + GOTO(out, rc); } - offset = (char *)de - kaddr; - ext2_put_page(page); - if ((char *)de <= limit) - /* !sa_not_full() */ - break; + rc = ll_statahead_one(parent, de); + if (rc < 0) { + ext2_put_page(page); + GOTO(out, rc); + } } + ext2_put_page(page); + index++; } EXIT; out: spin_lock(&lli->lli_lock); thread->t_flags = SVC_STOPPED; - cfs_waitq_signal(&thread->t_ctl_waitq); - lli->lli_opendir_pid = 0; /* avoid statahead again */ spin_unlock(&lli->lli_lock); - + cfs_waitq_signal(&sai->sai_waitq); + cfs_waitq_signal(&thread->t_ctl_waitq); ll_sai_put(sai); dput(parent); - CDEBUG(D_READA, "stopped statahead thread, pid %d for %s\n", - current->pid, 
parent->d_name.name); - return 0; + CDEBUG(D_READA, "statahead thread stopped, pid %d\n", + cfs_curproc_pid()); + return rc; } -/* called in ll_file_release */ -void ll_stop_statahead(struct inode *inode) +/* called in ll_file_release() */ +void ll_stop_statahead(struct inode *inode, void *key) { struct ll_inode_info *lli = ll_i2info(inode); struct ptlrpc_thread *thread; spin_lock(&lli->lli_lock); - /* don't check pid here. upon fork, if parent closedir before child, - * child will not have chance to stop this thread. */ + if (lli->lli_opendir_pid == 0 || + unlikely(lli->lli_opendir_key != key)) { + spin_unlock(&lli->lli_lock); + return; + } + + lli->lli_opendir_key = NULL; lli->lli_opendir_pid = 0; - if (lli->lli_sai && (lli->lli_sai->sai_thread.t_flags & SVC_RUNNING)) { + if (lli->lli_sai) { struct l_wait_info lwi = { 0 }; - ll_sai_get(lli->lli_sai); + thread = &lli->lli_sai->sai_thread; - thread->t_flags = SVC_STOPPING; - cfs_waitq_signal(&thread->t_ctl_waitq); - spin_unlock(&lli->lli_lock); + if (!(thread->t_flags & SVC_STOPPED)) { + thread->t_flags = SVC_STOPPING; + spin_unlock(&lli->lli_lock); + cfs_waitq_signal(&thread->t_ctl_waitq); - CDEBUG(D_READA, "stopping statahead thread, pid %d\n", - current->pid); - l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED, - &lwi); - ll_sai_put(lli->lli_sai); + CDEBUG(D_READA, "stopping statahead thread, pid %d\n", + cfs_curproc_pid()); + l_wait_event(thread->t_ctl_waitq, + thread->t_flags & SVC_STOPPED, + &lwi); + } else { + spin_unlock(&lli->lli_lock); + } + /* + * Put the ref which was held when first statahead_enter. + * It maybe not the last ref for some statahead requests + * maybe inflight. 
+ */ + ll_sai_put(lli->lli_sai); return; } spin_unlock(&lli->lli_lock); @@ -643,42 +697,45 @@ enum { static int is_first_dirent(struct inode *dir, struct dentry *dentry) { struct qstr *d_name = &dentry->d_name; - unsigned long npages = dir_pages(dir); + unsigned long npages, index = 0; struct page *page; ext2_dirent *de; - unsigned long index; - __u64 offset = 0; char *kaddr, *limit; - int dot_de = 1; /* dirent is dotfile till now */ - int rc = LS_NONE_FIRST_DE; + int rc = LS_NONE_FIRST_DE, dot_de; ENTRY; - page = ll_get_dir_page(dir, 0); - if (IS_ERR(page)) { - CERROR("error reading dir %lu/%u page 0: rc %ld\n", - dir->i_ino, dir->i_generation, PTR_ERR(page)); - RETURN(LS_NONE_FIRST_DE); - } - - kaddr = page_address(page); - de = (ext2_dirent *)kaddr; - if (!(de->name_len == 1 && strncmp(de->name, ".", 1) == 0)) - CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino); - de = ext2_next_entry(de); /* skip ".", or ingore bad entry */ - if (!(de->name_len == 2 && strncmp(de->name, "..", 2) == 0)) - CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino); - de = ext2_next_entry(de); /* skip "..", or ingore bad entry */ + while (1) { + npages = dir_pages(dir); + /* reach the end of dir */ + if (index >= npages) { + CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n", + index, npages); + break; + } - offset = (char *)de - kaddr; + page = ll_get_dir_page(dir, index); + if (IS_ERR(page)) { + rc = PTR_ERR(page); + CERROR("error reading dir %lu/%u page %lu: rc %d\n", + dir->i_ino, dir->i_generation, index, rc); + break; + } - for (index = 0; index < npages; offset = 0) { - de = (ext2_dirent *)(kaddr + offset); + kaddr = page_address(page); limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1); + de = (ext2_dirent *)kaddr; + if (!index) { + de = ext2_next_entry(de); /* skip "." */ + de = ext2_next_entry(de); /* skip ".." 
*/ + } + for (; (char*)de <= limit; de = ext2_next_entry(de)) { if (!de->inode) continue; - if (de->name[0] != '.') + if (de->name[0] == '.') + dot_de = 1; + else dot_de = 0; if (dot_de && d_name->name[0] != '.') { @@ -693,76 +750,79 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) rc = LS_FIRST_DE + dot_de; else rc = LS_NONE_FIRST_DE; - GOTO(out, rc); + ext2_put_page(page); + RETURN(rc); } - - if (++index >= npages) - break; - ext2_put_page(page); - - page = ll_get_dir_page(dir, index); - if (IS_ERR(page)) { - CERROR("error reading dir %lu/%u page %lu: rc %ld\n", - dir->i_ino, dir->i_generation, index, - PTR_ERR(page)); - RETURN(LS_NONE_FIRST_DE); - } - kaddr = page_address(page); + index++; } - CERROR("%.*s not found in dir %.*s!\n", d_name->len, d_name->name, - dentry->d_parent->d_name.len, dentry->d_parent->d_name.name); - EXIT; -out: - ext2_put_page(page); - return rc; + RETURN(rc); } -/* start stat-ahead thread if this is the first dir entry, otherwise if a thread - * is started already, wait until thread is ahead of me. +/* Start statahead thread if this is the first dir entry. + * Otherwise if a thread is started already, wait it until it is ahead of me. * Return value: - * 0 -- miss, - * 1 -- hit, - * -EEXIST -- stat ahead thread started, and this is the first try. - * other negative value -- error. 
+ * 0 -- miss + * 1 -- hit + * -EEXIST -- stat ahead thread started, and this is the first dentry + * -EBADFD -- statahead thread exit and not dentry available + * others -- error */ -int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) +int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) { struct ll_sb_info *sbi = ll_i2sbi(dir); struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai; + struct ll_statahead_info *sai = lli->lli_sai; struct ll_sa_thread_args sta; struct l_wait_info lwi = { 0 }; int rc; ENTRY; - if (sbi->ll_sa_max == 0) - RETURN(-ENOTSUPP); - - /* not the same process, don't statahead */ - if (lli->lli_opendir_pid != current->pid) - RETURN(-EBADF); + LASSERT(lli->lli_opendir_pid == cfs_curproc_pid()); - spin_lock(&lli->lli_lock); - if (lli->lli_sai) { - sai = ll_sai_get(lli->lli_sai); - spin_unlock(&lli->lli_lock); + if (sai) { + if (unlikely(sai->sai_thread.t_flags & SVC_STOPPED && + list_empty(&sai->sai_entries))) + RETURN(-EBADFD); + + if ((*dentryp)->d_name.name[0] == '.') { + if (likely(sai->sai_ls_all || + sai->sai_miss_hidden >= sai->sai_skip_hidden)) { + /* + * Hidden dentry is the first one, or statahead + * thread does not skip so many hidden dentries + * before "sai_ls_all" enabled as below. + */ + } else { + if (!sai->sai_ls_all) + /* + * It maybe because hidden dentry is not + * the first one, "sai_ls_all" was not + * set, then "ls -al" missed. Enable + * "sai_ls_all" for such case. + */ + sai->sai_ls_all = 1; + + /* + * Such "getattr" has been skipped before + * "sai_ls_all" enabled as above. 
+ */ + sai->sai_miss_hidden++; + RETURN(-ENOENT); + } + } if (ll_sai_entry_stated(sai)) { sbi->ll_sa_cached++; } else { - struct l_wait_info lwi = { 0 }; - sbi->ll_sa_blocked++; /* thread started already, avoid double-stat */ - l_wait_event(sai->sai_thread.t_ctl_waitq, + l_wait_event(sai->sai_waitq, ll_sai_entry_stated(sai) || sai->sai_thread.t_flags & SVC_STOPPED, &lwi); } - ll_sai_put(sai); - if (lookup) { struct dentry *result; @@ -772,57 +832,52 @@ int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) LASSERT(result != *dentryp); dput(*dentryp); *dentryp = result; + RETURN(1); } - RETURN(result != NULL); } /* do nothing for revalidate */ RETURN(0); } - spin_unlock(&lli->lli_lock); + + /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */ + LASSERT(lli->lli_sai == NULL); rc = is_first_dirent(dir, *dentryp); - if (!rc) { - /* optimization: don't statahead for this pid any longer */ + if (rc == LS_NONE_FIRST_DE) { + /* It is not "ls -{a}l" operation, no need statahead for it */ spin_lock(&lli->lli_lock); - if (lli->lli_sai == NULL) - lli->lli_opendir_pid = 0; + lli->lli_opendir_key = NULL; + lli->lli_opendir_pid = 0; spin_unlock(&lli->lli_lock); RETURN(-EBADF); } - spin_lock(&lli->lli_lock); - if (lli->lli_sai == NULL) { - lli->lli_sai = ll_sai_alloc(); - if (lli->lli_sai == NULL) { - spin_unlock(&lli->lli_lock); - RETURN(-ENOMEM); - } - } else { - /* sai is already there */ - spin_unlock(&lli->lli_lock); - RETURN(-EBUSY); - } - spin_unlock(&lli->lli_lock); + sai = ll_sai_alloc(); + if (sai == NULL) + RETURN(-ENOMEM); - sai = lli->lli_sai; - sai->sai_inode = igrab(dir); + sai->sai_inode = igrab(dir); sai->sai_ls_all = (rc == LS_FIRST_DOT_DE); sta.sta_parent = (*dentryp)->d_parent; - sta.sta_pid = current->pid; - rc = kernel_thread(ll_statahead_thread, &sta, 0); + sta.sta_pid = cfs_curproc_pid(); + + lli->lli_sai = sai; + rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0); if (rc < 0) { CERROR("can't start ll_sa thread, rc: 
%d\n", rc); + sai->sai_thread.t_flags = SVC_STOPPED; ll_sai_put(sai); + LASSERT(lli->lli_sai == NULL); RETURN(rc); } l_wait_event(sai->sai_thread.t_ctl_waitq, sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED), &lwi); - ll_sai_put(sai); - /* we don't stat-ahead for the first dirent since we are already in + /* + * We don't stat-ahead for the first dirent since we are already in * lookup, and -EEXIST also indicates that this is the first dirent. */ RETURN(-EEXIST); @@ -831,41 +886,42 @@ int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) /* update hit/miss count */ void ll_statahead_exit(struct dentry *dentry, int result) { - struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode); - struct ll_sb_info *sbi = ll_i2sbi(dentry->d_parent->d_inode); + struct dentry *parent = dentry->d_parent; + struct ll_inode_info *lli = ll_i2info(parent->d_inode); + struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode); - if (lli->lli_opendir_pid != current->pid) + if (lli->lli_opendir_pid != cfs_curproc_pid()) return; - spin_lock(&lli->lli_lock); if (lli->lli_sai) { struct ll_statahead_info *sai = lli->lli_sai; - ll_sai_entry_put(sai); if (result == 1) { + sbi->ll_sa_hit++; sai->sai_hit++; sai->sai_consecutive_miss = 0; sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max); - CDEBUG(D_READA, "statahead %.*s hit(hit/miss %u/%u)\n", - dentry->d_name.len, dentry->d_name.name, - sai->sai_hit, sai->sai_miss); } else { + sbi->ll_sa_miss++; sai->sai_miss++; sai->sai_consecutive_miss++; - /* upon miss, it's always because some dentry is added - * by statahead thread, and at the mean time `ls` - * processs finds this dentry, but the d_op for this - * dentry is NULL, then revalidate is not done, and - * ll_statahead_exit() not called for this dentry, - * so statahead thread should be behind of `ls` process, - * put one entry to go ahead. 
- */ - CDEBUG(D_READA, "statahead %.*s miss(hit/miss %u/%u)\n", - dentry->d_name.len, dentry->d_name.name, - sai->sai_hit, sai->sai_miss); - ll_sai_entry_put(sai); + if (sa_low_hit(sai)) { + sbi->ll_sa_wrong++; + CDEBUG(D_READA, "statahead for dir %.*s hit " + "ratio too low: hit/miss %u/%u, " + "sent/replied %u/%u. stopping statahead " + "thread: pid %d\n", + parent->d_name.len, parent->d_name.name, + sai->sai_hit, sai->sai_miss, + sai->sai_sent, sai->sai_replied, + cfs_curproc_pid()); + spin_lock(&lli->lli_lock); + if (!(sai->sai_thread.t_flags & SVC_STOPPED)) + sai->sai_thread.t_flags = SVC_STOPPING; + spin_unlock(&lli->lli_lock); + } } cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); + ll_sai_entry_put(sai); } - spin_unlock(&lli->lli_lock); } diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 85c9580..91d923d 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -838,7 +838,7 @@ static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req, rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode, &flags, NULL, 0, NULL, lockh, rc); if (rc < 0) { - CERROR("ldlm_cli_enqueue: %d\n", rc); + CERROR("ldlm_cli_enqueue_fini: %d\n", rc); mdc_clear_replay_flag(req, rc); GOTO(out, rc); } diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 07b039b..0d37861 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -4664,7 +4664,9 @@ test_123a() { # was test 123, statahead(bug 11401) max=`lctl get_param -n llite.*.statahead_max | head -n 1` lctl set_param -n llite.*.statahead_max 0 lctl get_param llite.*.statahead_max - + cancel_lru_locks mdc + cancel_lru_locks osc + stime=`date +%s` ls -l $DIR/$tdir > /dev/null etime=`date +%s` delta=$((etime - stime)) @@ -4678,7 +4680,7 @@ test_123a() { # was test 123, statahead(bug 11401) [ $delta -gt 20 ] && break [ $delta -gt 8 ] && MULT=$((50 / delta)) - [ "$SLOW" = "no" -a $delta -ge 3 ] && break + [ "$SLOW" = "no" -a $delta -gt 3 ] && break done log "ls 
done" -- 1.8.3.1