From 51b1da4054704d0e115ddb2c8857c5a84c7ed958 Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Fri, 16 Dec 2011 15:34:25 +0800 Subject: [PATCH] LU-925 agl: trigger async glimpse lock when statahead Client will send async glimpse lock RPCs to OSTs for file size attribute before stat files to accelerate traversing large dir. Signed-off-by: Fan Yong Change-Id: I9bf850abdd3c02c8470ddfcd91a3dc3ef7819c6d Reviewed-on: http://review.whamcloud.com/1692 Reviewed-by: Lai Siyao Tested-by: Hudson Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/llite/file.c | 6 +- lustre/llite/llite_internal.h | 31 +++- lustre/llite/llite_lib.c | 10 + lustre/llite/llite_mmap.c | 2 +- lustre/llite/lproc_llite.c | 39 +++- lustre/llite/namei.c | 2 +- lustre/llite/statahead.c | 413 ++++++++++++++++++++++++++++++++++++------ 7 files changed, 437 insertions(+), 66 deletions(-) diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 50763ed..42d0164 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1888,7 +1888,7 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin) if (origin == 2) { /* SEEK_END */ int rc; - rc = cl_glimpse_size(inode); + rc = ll_glimpse_size(inode); if (rc != 0) RETURN(rc); @@ -2351,11 +2351,11 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it, RETURN(0); } - /* cl_glimpse_size will prefer locally cached writes if they extend + /* ll_glimpse_size will prefer locally cached writes if they extend * the file */ if (rc == 0) - rc = cl_glimpse_size(inode); + rc = ll_glimpse_size(inode); RETURN(rc); } diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 4803907..7cfba13 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -172,6 +172,7 @@ struct ll_inode_info { /* the most recent timestamps obtained from mds */ struct ost_lvb lli_lvb; + cfs_spinlock_t lli_agl_lock; /* Try to make the d::member and f::member are aligned. Before using * these members, make clear whether it is directory or not. */ @@ -224,6 +225,11 @@ struct ll_inode_info { /* for writepage() only to communicate to fsync */ int f_async_rc; int f_write_rc; + + cfs_rw_semaphore_t f_glimpse_sem; + cfs_time_t f_glimpse_time; + cfs_list_t f_agl_list; + __u64 f_agl_index; } f; #define lli_size_sem u.f.f_size_sem @@ -234,6 +240,10 @@ struct ll_inode_info { #define lli_write_sem u.f.f_write_sem #define lli_async_rc u.f.f_async_rc #define lli_write_rc u.f.f_write_rc +#define lli_glimpse_sem u.f.f_glimpse_sem +#define lli_glimpse_time u.f.f_glimpse_time +#define lli_agl_list u.f.f_agl_list +#define lli_agl_index u.f.f_agl_index } u; @@ -369,6 +379,7 @@ enum stats_track_type { #define LL_SBI_SOM_PREVIEW 0x1000 /* SOM preview mount option */ #define LL_SBI_32BIT_API 0x2000 /* generate 32 bit inodes. */ #define LL_SBI_64BIT_HASH 0x4000 /* support 64-bits dir hash/offset */ +#define LL_SBI_AGL_ENABLED 0x8000 /* enable agl */ /* default value for ll_sb_info->contention_time */ #define SBI_DEFAULT_CONTENTION_SECONDS 60 @@ -447,8 +458,8 @@ struct ll_sb_info { unsigned int ll_offset_process_count; struct ll_rw_process_info ll_rw_offset_info[LL_OFFSET_HIST_MAX]; unsigned int ll_rw_offset_entry_count; - enum stats_track_type ll_stats_track_type; int ll_stats_track_id; + enum stats_track_type ll_stats_track_type; int ll_rw_stats_on; /* metadata stat-ahead */ @@ -457,6 +468,7 @@ struct ll_sb_info { * count */ atomic_t ll_sa_wrong; /* statahead thread stopped for * low hit ratio */ + atomic_t ll_agl_total; /* AGL thread started count */ dev_t ll_sdev_orig; /* save s_dev before assign for * clustred nfs */ @@ -1196,12 +1208,15 @@ struct ll_statahead_info { unsigned int sai_skip_hidden;/* skipped hidden dentry count */ unsigned int sai_ls_all:1, /* "ls -al", do stat-ahead for * hidden entries */ - sai_in_readpage:1;/* statahead is in readdir()*/ + sai_in_readpage:1,/* statahead is in readdir()*/ + sai_agl_valid:1;/* AGL is valid for the dir */ cfs_waitq_t sai_waitq; /* stat-ahead wait queue */ struct ptlrpc_thread sai_thread; /* stat-ahead thread */ + struct ptlrpc_thread sai_agl_thread; /* AGL thread */ cfs_list_t sai_entries_sent; /* entries sent out */ cfs_list_t sai_entries_received; /* entries returned */ cfs_list_t sai_entries_stated; /* entries stated */ + cfs_list_t sai_entries_agl; /* AGL entries to be sent */ cfs_list_t sai_cache[LL_SA_CACHE_SIZE]; cfs_spinlock_t sai_cache_lock[LL_SA_CACHE_SIZE]; cfs_atomic_t sai_cache_count; /* entry count in cache */ @@ -1211,6 +1226,18 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentry, int only_unplug); void ll_stop_statahead(struct inode *dir, void *key); +static inline int ll_glimpse_size(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + int rc; + + cfs_down_read(&lli->lli_glimpse_sem); + rc = cl_glimpse_size(inode); + lli->lli_glimpse_time = cfs_time_current(); + cfs_up_read(&lli->lli_glimpse_sem); + return rc; +} + static inline void ll_statahead_mark(struct inode *dir, struct dentry *dentry) { diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 70d0fcb..41624bb 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -142,6 +142,8 @@ static struct ll_sb_info *ll_init_sbi(void) sbi->ll_sa_max = LL_SA_RPC_DEF; cfs_atomic_set(&sbi->ll_sa_total, 0); cfs_atomic_set(&sbi->ll_sa_wrong, 0); + cfs_atomic_set(&sbi->ll_agl_total, 0); + sbi->ll_flags |= LL_SBI_AGL_ENABLED; RETURN(sbi); } @@ -875,6 +877,7 @@ void ll_lli_init(struct ll_inode_info *lli) lli->lli_open_fd_write_count = 0; lli->lli_open_fd_exec_count = 0; cfs_sema_init(&lli->lli_och_sem, 1); + cfs_spin_lock_init(&lli->lli_agl_lock); lli->lli_smd = NULL; lli->lli_clob = NULL; @@ -896,6 +899,10 @@ void ll_lli_init(struct ll_inode_info *lli) cfs_sema_init(&lli->lli_write_sem, 1); lli->lli_async_rc = 0; lli->lli_write_rc = 0; + cfs_init_rwsem(&lli->lli_glimpse_sem); + lli->lli_glimpse_time = 0; + CFS_INIT_LIST_HEAD(&lli->lli_agl_list); + lli->lli_agl_index = 0; } } @@ -1168,6 +1175,9 @@ void ll_clear_inode(struct inode *inode) lli->lli_inode_magic = LLI_INODE_DEAD; ll_clear_inode_capas(inode); + if (!S_ISDIR(inode->i_mode)) + LASSERT(cfs_list_empty(&lli->lli_agl_list)); + /* * XXX This has to be done before lsm is freed below, because * cl_object still uses inode lsm. diff --git a/lustre/llite/llite_mmap.c b/lustre/llite/llite_mmap.c index 821f96f..9de415d 100644 --- a/lustre/llite/llite_mmap.c +++ b/lustre/llite/llite_mmap.c @@ -603,7 +603,7 @@ int ll_file_mmap(struct file *file, struct vm_area_struct * vma) vma->vm_ops = &ll_file_vm_ops; vma->vm_ops->open(vma); /* update the inode's size and mtime */ - rc = cl_glimpse_size(inode); + rc = ll_glimpse_size(inode); } RETURN(rc); diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index 0845ba7..3bfe608 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -30,6 +30,9 @@ * Use is subject to license terms. */ /* + * Copyright (c) 2011 Whamcloud, Inc. + */ +/* * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. */ @@ -562,6 +565,35 @@ static int ll_wr_statahead_max(struct file *file, const char *buffer, return count; } +static int ll_rd_statahead_agl(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct super_block *sb = data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + + return snprintf(page, count, "%u\n", + sbi->ll_flags & LL_SBI_AGL_ENABLED ? 1 : 0); +} + +static int ll_wr_statahead_agl(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct super_block *sb = data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + int val, rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + if (val) + sbi->ll_flags |= LL_SBI_AGL_ENABLED; + else + sbi->ll_flags &= ~LL_SBI_AGL_ENABLED; + + return count; +} + static int ll_rd_statahead_stats(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -570,9 +602,11 @@ static int ll_rd_statahead_stats(char *page, char **start, off_t off, return snprintf(page, count, "statahead total: %u\n" - "statahead wrong: %u\n", + "statahead wrong: %u\n" + "agl total: %u\n", atomic_read(&sbi->ll_sa_total), - atomic_read(&sbi->ll_sa_wrong)); + atomic_read(&sbi->ll_sa_wrong), + atomic_read(&sbi->ll_agl_total)); } static int ll_rd_lazystatfs(char *page, char **start, off_t off, @@ -630,6 +664,7 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = { { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 }, { "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 }, { "statahead_max", ll_rd_statahead_max, ll_wr_statahead_max, 0 }, + { "statahead_agl", ll_rd_statahead_agl, ll_wr_statahead_agl, 0 }, { "statahead_stats", ll_rd_statahead_stats, 0, 0 }, { "lazystatfs", ll_rd_lazystatfs, ll_wr_lazystatfs, 0 }, { 0 } diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 52945c4..363bcb4 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -469,7 +469,7 @@ int ll_lookup_it_finish(struct ptlrpc_request *request, 2.4 and vfs_getattr_it->ll_getattr()->ll_inode_revalidate_it() in 2.6 Everybody else who needs correct file size would call - cl_glimpse_size or some equivalent themselves anyway. + ll_glimpse_size or some equivalent themselves anyway. Also see bug 7198. */ ll_dops_init(*de, 1, 1); *de = ll_find_alias(inode, *de); diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index cb1883b..537b44d 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -140,14 +140,42 @@ ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) cfs_spin_unlock(&sai->sai_cache_lock[i]); } +static inline int agl_should_run(struct ll_statahead_info *sai, + struct inode *inode) +{ + if (inode != NULL && S_ISREG(inode->i_mode) && + ll_i2info(inode)->lli_smd != NULL && sai->sai_agl_valid) + return 1; + return 0; +} + +static inline struct ll_sa_entry * +sa_first_received_entry(struct ll_statahead_info *sai) +{ + return cfs_list_entry(sai->sai_entries_received.next, + struct ll_sa_entry, se_list); +} + +static inline struct ll_inode_info * +agl_first_entry(struct ll_statahead_info *sai) +{ + return cfs_list_entry(sai->sai_entries_agl.next, + struct ll_inode_info, lli_agl_list); +} + +static inline int sa_sent_full(struct ll_statahead_info *sai) +{ + return cfs_atomic_read(&sai->sai_cache_count) >= sai->sai_max; +} + static inline int sa_received_empty(struct ll_statahead_info *sai) { return cfs_list_empty(&sai->sai_entries_received); } -static inline int sa_not_full(struct ll_statahead_info *sai) +static inline int agl_list_empty(struct ll_statahead_info *sai) { - return (cfs_atomic_read(&sai->sai_cache_count) < sai->sai_max); + return cfs_list_empty(&sai->sai_entries_agl); } /** @@ -417,6 +445,37 @@ ll_sa_entry_to_stated(struct ll_statahead_info *sai, return ret; } +/* + * Insert inode into the list of sai_entries_agl. + */ +static void ll_agl_add(struct ll_statahead_info *sai, + struct inode *inode, int index) +{ + struct ll_inode_info *child = ll_i2info(inode); + struct ll_inode_info *parent = ll_i2info(sai->sai_inode); + int added = 0; + + cfs_spin_lock(&child->lli_agl_lock); + if (child->lli_agl_index == 0) { + child->lli_agl_index = index; + cfs_spin_unlock(&child->lli_agl_lock); + + LASSERT(cfs_list_empty(&child->lli_agl_list)); + + igrab(inode); + cfs_spin_lock(&parent->lli_agl_lock); + if (agl_list_empty(sai)) + added = 1; + cfs_list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl); + cfs_spin_unlock(&parent->lli_agl_lock); + } else { + cfs_spin_unlock(&child->lli_agl_lock); + } + + if (added > 0) + cfs_waitq_signal(&sai->sai_agl_thread.t_ctl_waitq); +} + static struct ll_statahead_info *ll_sai_alloc(void) { struct ll_statahead_info *sai; @@ -428,17 +487,24 @@ static struct ll_statahead_info *ll_sai_alloc(void) RETURN(NULL); cfs_atomic_set(&sai->sai_refcount, 1); + cfs_spin_lock(&sai_generation_lock); sai->sai_generation = ++sai_generation; if (unlikely(sai_generation == 0)) sai->sai_generation = ++sai_generation; cfs_spin_unlock(&sai_generation_lock); + sai->sai_max = LL_SA_RPC_MIN; + sai->sai_index = 1; cfs_waitq_init(&sai->sai_waitq); cfs_waitq_init(&sai->sai_thread.t_ctl_waitq); + cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq); + CFS_INIT_LIST_HEAD(&sai->sai_entries_sent); CFS_INIT_LIST_HEAD(&sai->sai_entries_received); CFS_INIT_LIST_HEAD(&sai->sai_entries_stated); + CFS_INIT_LIST_HEAD(&sai->sai_entries_agl); + for (i = 0; i < LL_SA_CACHE_SIZE; i++) { CFS_INIT_LIST_HEAD(&sai->sai_cache[i]); cfs_spin_lock_init(&sai->sai_cache_lock[i]); @@ -472,12 +538,13 @@ static void ll_sai_put(struct ll_statahead_info *sai) } LASSERT(lli->lli_opendir_key == NULL); + LASSERT(thread_is_stopped(&sai->sai_thread)); + LASSERT(thread_is_stopped(&sai->sai_agl_thread)); + lli->lli_sai = NULL; lli->lli_opendir_pid = 0; cfs_spin_unlock(&lli->lli_sa_lock); - LASSERT(thread_is_stopped(&sai->sai_thread)); - if (sai->sai_sent > sai->sai_replied) CDEBUG(D_READA,"statahead for dir "DFID" does not " "finish: [sent:"LPU64"] [replied:"LPU64"]\n", @@ -495,6 +562,7 @@ static void ll_sai_put(struct ll_statahead_info *sai) do_sai_entry_fini(sai, entry); LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0); + LASSERT(agl_list_empty(sai)); iput(inode); OBD_FREE_PTR(sai); @@ -503,6 +571,73 @@ static void ll_sai_put(struct ll_statahead_info *sai) EXIT; } +#ifndef HAVE_AGL_SUPPORT +# define cl_agl(inode) do {} while (0) +#endif + +/* Do NOT forget to drop inode refcount when into sai_entries_agl. */ +static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) +{ + struct ll_inode_info *lli = ll_i2info(inode); + __u64 index = lli->lli_agl_index; + int rc; + ENTRY; + + LASSERT(cfs_list_empty(&lli->lli_agl_list)); + + /* AGL maybe fall behind statahead with one entry */ + if (is_omitted_entry(sai, index + 1)) { + lli->lli_agl_index = 0; + iput(inode); + RETURN_EXIT; + } + + /* Someone is in glimpse (sync or async), do nothing. */ + rc = cfs_down_write_trylock(&lli->lli_glimpse_sem); + if (rc == 0) { + lli->lli_agl_index = 0; + iput(inode); + RETURN_EXIT; + } + + /* + * Someone triggered glimpse within 1 sec before. + * 1) The former glimpse succeeded with glimpse lock granted by OST, and + * if the lock is still cached on client, AGL needs to do nothing. If + * it is cancelled by other client, AGL maybe cannot obtaion new lock + * for no glimpse callback triggered by AGL. + * 2) The former glimpse succeeded, but OST did not grant glimpse lock. + * Under such case, it is quite possible that the OST will not grant + * glimpse lock for AGL also. + * 3) The former glimpse failed, compared with other two cases, it is + * relative rare. AGL can ignore such case, and it will not muchly + * affect the performance. + */ + if (lli->lli_glimpse_time != 0 && + cfs_time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) { + cfs_up_write(&lli->lli_glimpse_sem); + lli->lli_agl_index = 0; + iput(inode); + RETURN_EXIT; + } + + CDEBUG(D_READA, "Handling (init) async glimpse: inode = " + DFID", idx = "LPU64"\n", PFID(&lli->lli_fid), index); + + cl_agl(inode); + lli->lli_agl_index = 0; + lli->lli_glimpse_time = cfs_time_current(); + cfs_up_write(&lli->lli_glimpse_sem); + + CDEBUG(D_READA, "Handled (init) async glimpse: inode= " + DFID", idx = "LPU64", rc = %d\n", + PFID(&lli->lli_fid), index, rc); + + iput(inode); + + EXIT; +} + static void do_statahead_interpret(struct ll_statahead_info *sai, struct ll_sa_entry *target) { @@ -525,8 +660,7 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, cfs_spin_unlock(&lli->lli_sa_lock); RETURN_EXIT; } else { - entry = cfs_list_entry(sai->sai_entries_received.next, - struct ll_sa_entry, se_list); + entry = sa_first_received_entry(sai); } cfs_atomic_inc(&entry->se_refcount); @@ -580,6 +714,9 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, entry->se_inode = child; + if (agl_should_run(sai, child)) + ll_agl_add(sai, child, entry->se_index); + EXIT; out: @@ -689,11 +826,12 @@ static void sa_args_fini(struct md_enqueue_info *minfo, * "md_intent_getattr_async". */ static int sa_args_init(struct inode *dir, struct inode *child, - struct qstr *qstr, struct md_enqueue_info **pmi, + struct ll_sa_entry *entry, struct md_enqueue_info **pmi, struct ldlm_enqueue_info **pei, struct obd_capa **pcapa) { - struct ll_inode_info *lli = ll_i2info(dir); + struct qstr *qstr = &entry->se_qstr; + struct ll_inode_info *lli = ll_i2info(dir); struct md_enqueue_info *minfo; struct ldlm_enqueue_info *einfo; struct md_op_data *op_data; @@ -720,7 +858,7 @@ static int sa_args_init(struct inode *dir, struct inode *child, minfo->mi_dir = igrab(dir); minfo->mi_cb = ll_statahead_interpret; minfo->mi_generation = lli->lli_sai->sai_generation; - minfo->mi_cbdata = lli->lli_sai->sai_index; + minfo->mi_cbdata = entry->se_index; einfo->ei_type = LDLM_IBITS; einfo->ei_mode = it_to_lock_mode(&minfo->mi_it); @@ -745,7 +883,7 @@ static int do_sa_lookup(struct inode *dir, struct ll_sa_entry *entry) int rc; ENTRY; - rc = sa_args_init(dir, NULL, &entry->se_qstr, &minfo, &einfo, capas); + rc = sa_args_init(dir, NULL, entry, &minfo, &einfo, capas); if (rc) RETURN(rc); @@ -795,7 +933,7 @@ static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry, RETURN(1); } - rc = sa_args_init(dir, inode, &entry->se_qstr, &minfo, &einfo, capas); + rc = sa_args_init(dir, inode, entry, &minfo, &einfo, capas); if (rc) { entry->se_inode = NULL; iput(inode); @@ -818,9 +956,9 @@ static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry, static void ll_statahead_one(struct dentry *parent, const char* entry_name, int entry_name_len) { - struct inode *dir = parent->d_inode; - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = lli->lli_sai; + struct inode *dir = parent->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = lli->lli_sai; struct dentry *dentry = NULL; struct ll_sa_entry *entry; int rc; @@ -833,10 +971,13 @@ static void ll_statahead_one(struct dentry *parent, const char* entry_name, RETURN_EXIT; dentry = d_lookup(parent, &entry->se_qstr); - if (!dentry) + if (!dentry) { rc = do_sa_lookup(dir, entry); - else + } else { rc = do_sa_revalidate(dir, entry, dentry); + if (rc == 1 && agl_should_run(sai, dentry->d_inode)) + ll_agl_add(sai, dentry->d_inode, entry->se_index); + } if (dentry != NULL) dput(dentry); @@ -857,40 +998,137 @@ static void ll_statahead_one(struct dentry *parent, const char* entry_name, EXIT; } +static int ll_agl_thread(void *arg) +{ + struct dentry *parent = (struct dentry *)arg; + struct inode *dir = parent->d_inode; + struct ll_inode_info *plli = ll_i2info(dir); + struct ll_inode_info *clli; + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); + struct ptlrpc_thread *thread = &sai->sai_agl_thread; + struct l_wait_info lwi = { 0 }; + ENTRY; + + { + char pname[16]; + snprintf(pname, 15, "ll_agl_%u", plli->lli_opendir_pid); + cfs_daemonize(pname); + } + + CDEBUG(D_READA, "agl thread started: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + + atomic_inc(&sbi->ll_agl_total); + cfs_spin_lock(&plli->lli_agl_lock); + sai->sai_agl_valid = 1; + thread_set_flags(thread, SVC_RUNNING); + cfs_spin_unlock(&plli->lli_agl_lock); + cfs_waitq_signal(&thread->t_ctl_waitq); + + while (1) { + l_wait_event(thread->t_ctl_waitq, + !agl_list_empty(sai) || + !thread_is_running(thread), + &lwi); + + if (!thread_is_running(thread)) + break; + + cfs_spin_lock(&plli->lli_agl_lock); + /* The statahead thread maybe help to process AGL entries, + * so check whether list empty again. */ + if (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + cfs_spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, sai); + } else { + cfs_spin_unlock(&plli->lli_agl_lock); + } + } + + cfs_spin_lock(&plli->lli_agl_lock); + sai->sai_agl_valid = 0; + while (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + cfs_spin_unlock(&plli->lli_agl_lock); + clli->lli_agl_index = 0; + iput(&clli->lli_vfs_inode); + cfs_spin_lock(&plli->lli_agl_lock); + } + thread_set_flags(thread, SVC_STOPPED); + cfs_spin_unlock(&plli->lli_agl_lock); + cfs_waitq_signal(&thread->t_ctl_waitq); + ll_sai_put(sai); + CDEBUG(D_READA, "agl thread stopped: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + RETURN(0); +} + +static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) +{ + struct ptlrpc_thread *thread = &sai->sai_agl_thread; + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + + rc = cfs_create_thread(ll_agl_thread, parent, 0); + if (rc < 0) { + CERROR("can't start ll_agl thread, rc: %d\n", rc); + RETURN_EXIT; + } + + l_wait_event(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread), + &lwi); + EXIT; +} + static int ll_statahead_thread(void *arg) { struct dentry *parent = (struct dentry *)arg; - struct inode *dir = parent->d_inode; - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai); + struct inode *dir = parent->d_inode; + struct ll_inode_info *plli = ll_i2info(dir); + struct ll_inode_info *clli; + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); struct ptlrpc_thread *thread = &sai->sai_thread; struct page *page; - __u64 pos = 0; - int first = 0; - int rc = 0; + __u64 pos = 0; + int first = 0; + int rc = 0; struct ll_dir_chain chain; + struct l_wait_info lwi = { 0 }; ENTRY; { char pname[16]; - snprintf(pname, 15, "ll_sa_%u", lli->lli_opendir_pid); + snprintf(pname, 15, "ll_sa_%u", plli->lli_opendir_pid); cfs_daemonize(pname); } + CDEBUG(D_READA, "statahead thread started: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + + if (sbi->ll_flags & LL_SBI_AGL_ENABLED) + ll_start_agl(parent, sai); + atomic_inc(&sbi->ll_sa_total); - cfs_spin_lock(&lli->lli_sa_lock); + cfs_spin_lock(&plli->lli_sa_lock); thread_set_flags(thread, SVC_RUNNING); - cfs_spin_unlock(&lli->lli_sa_lock); + cfs_spin_unlock(&plli->lli_sa_lock); cfs_waitq_signal(&thread->t_ctl_waitq); - CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name); - lli->lli_sa_pos = 0; + plli->lli_sa_pos = 0; ll_dir_chain_init(&chain); page = ll_get_dir_page(NULL, dir, pos, &chain); while (1) { - struct l_wait_info lwi = { 0 }; struct lu_dirpage *dp; struct lu_dirent *ent; @@ -899,7 +1137,7 @@ static int ll_statahead_thread(void *arg) CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 "/"LPU64": [rc %d] [parent %u]\n", PFID(ll_inode2fid(dir)), pos, sai->sai_index, - rc, lli->lli_opendir_pid); + rc, plli->lli_opendir_pid); GOTO(out, rc); } @@ -951,27 +1189,55 @@ static int ll_statahead_thread(void *arg) if (unlikely(++first == 1)) continue; -keep_de: +keep_it: l_wait_event(thread->t_ctl_waitq, - sa_not_full(sai) || + !sa_sent_full(sai) || !sa_received_empty(sai) || + !agl_list_empty(sai) || !thread_is_running(thread), &lwi); +interpret_it: while (!sa_received_empty(sai)) do_statahead_interpret(sai, NULL); if (unlikely(!thread_is_running(thread))) { ll_release_page(page, 0); - GOTO(out, rc); + GOTO(out, rc = 0); } - if (!sa_not_full(sai)) - /* - * do not skip the current de. - */ - goto keep_de; + /* If no window for metadata statahead, but there are + * some AGL entries to be triggered, then try to help + * to process the AGL entries. */ + if (sa_sent_full(sai)) { + cfs_spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + cfs_spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, + sai); + + if (!sa_received_empty(sai)) + goto interpret_it; + + if (unlikely( + !thread_is_running(thread))) { + ll_release_page(page, 0); + GOTO(out, rc = 0); + } + + if (!sa_sent_full(sai)) + goto do_it; + + cfs_spin_lock(&plli->lli_agl_lock); + } + cfs_spin_unlock(&plli->lli_agl_lock); + + goto keep_it; + } +do_it: ll_statahead_one(parent, name, namelen); } pos = le64_to_cpu(dp->ldp_hash_end); @@ -990,11 +1256,26 @@ keep_de: while (!sa_received_empty(sai)) do_statahead_interpret(sai, NULL); - if ((sai->sai_sent == sai->sai_replied && - sa_received_empty(sai)) || - !thread_is_running(thread)) + if (unlikely(!thread_is_running(thread))) GOTO(out, rc = 0); + + if (sai->sai_sent == sai->sai_replied && + sa_received_empty(sai)) + break; + } + + cfs_spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai) && + thread_is_running(thread)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + cfs_spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, sai); + cfs_spin_lock(&plli->lli_agl_lock); } + cfs_spin_unlock(&plli->lli_agl_lock); + + GOTO(out, rc = 0); } else if (1) { /* * chain is exhausted. @@ -1002,7 +1283,7 @@ keep_de: */ ll_release_page(page, le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - lli->lli_sa_pos = pos; + plli->lli_sa_pos = pos; sai->sai_in_readpage = 1; page = ll_get_dir_page(NULL, dir, pos, &chain); sai->sai_in_readpage = 0; @@ -1017,26 +1298,41 @@ keep_de: EXIT; out: + if (sai->sai_agl_valid) { + struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread; + + cfs_spin_lock(&plli->lli_agl_lock); + thread_set_flags(agl_thread, SVC_STOPPING); + cfs_spin_unlock(&plli->lli_agl_lock); + cfs_waitq_signal(&agl_thread->t_ctl_waitq); + + CDEBUG(D_READA, "stop agl thread: [pid %d]\n", + cfs_curproc_pid()); + l_wait_event(agl_thread->t_ctl_waitq, + thread_is_stopped(agl_thread), + &lwi); + } + ll_dir_chain_fini(&chain); - cfs_spin_lock(&lli->lli_sa_lock); + cfs_spin_lock(&plli->lli_sa_lock); if (!sa_received_empty(sai)) { thread_set_flags(thread, SVC_STOPPING); - cfs_spin_unlock(&lli->lli_sa_lock); + cfs_spin_unlock(&plli->lli_sa_lock); /* To release the resources held by received entries. */ while (!sa_received_empty(sai)) do_statahead_interpret(sai, NULL); - cfs_spin_lock(&lli->lli_sa_lock); + cfs_spin_lock(&plli->lli_sa_lock); } thread_set_flags(thread, SVC_STOPPED); - cfs_spin_unlock(&lli->lli_sa_lock); + cfs_spin_unlock(&plli->lli_sa_lock); cfs_waitq_signal(&sai->sai_waitq); cfs_waitq_signal(&thread->t_ctl_waitq); ll_sai_put(sai); dput(parent); - CDEBUG(D_READA, "statahead thread stopped, pid %d\n", - cfs_curproc_pid()); + CDEBUG(D_READA, "statahead thread stopped: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); return rc; } @@ -1067,7 +1363,7 @@ void ll_stop_statahead(struct inode *dir, void *key) cfs_spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&thread->t_ctl_waitq); - CDEBUG(D_READA, "stopping statahead thread, pid %d\n", + CDEBUG(D_READA, "stop statahead thread: [pid %d]\n", cfs_curproc_pid()); l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread), @@ -1105,13 +1401,13 @@ enum { static int is_first_dirent(struct inode *dir, struct dentry *dentry) { - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_dir_chain chain; - struct qstr *target = &dentry->d_name; - struct page *page; - __u64 pos = 0; - int dot_de; - int rc = LS_NONE_FIRST_DE; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_dir_chain chain; + struct qstr *target = &dentry->d_name; + struct page *page; + __u64 pos = 0; + int dot_de; + int rc = LS_NONE_FIRST_DE; ENTRY; lli->lli_sa_pos = 0; @@ -1225,7 +1521,7 @@ ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { struct ptlrpc_thread *thread = &sai->sai_thread; struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode); - int hit; + int hit; ENTRY; if (entry != NULL && entry->se_stat == SA_ENTRY_SUCC) @@ -1416,6 +1712,9 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, GOTO(out, rc = -EAGAIN); } + CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + lli->lli_sai = sai; rc = cfs_create_thread(ll_statahead_thread, parent, 0); thread = &sai->sai_thread; -- 1.8.3.1