From 1288681bb8aa76f43af5c53ec3ef68ecc8d412d7 Mon Sep 17 00:00:00 2001 From: Qian Yingjin Date: Thu, 22 Sep 2022 05:24:14 -0400 Subject: [PATCH] LU-14361 statahead: add statahead advise IOCTL This patch reuse ioctl(LL_IOC_LADVISE2) for statahead advise. This allows userspace programs to advise the kernel statahead of the order that they will be traversing a directory, so that the client can prefetch inode attributes from the MDT, similar to what posix_fadvise(POSIX_FADV_SEQUENTIAL) does for file data. After patched mdtest via adding this statahead IOCTL hint, it can support mdtest benchmark with regularized file naming format: mdtest.$rank.$i The usage of this statahead advise IOCTL could be as follows: open(dir); ioctl(dir_fd, IOC_LADVISE2, ...); stat mdtest.0.0; stat mdtest.0.1; stat mdtest.0.2; stat mdtest.0.3; ... clsoedir(dir); Signed-off-by: Qian Yingjin Change-Id: Iac38e33bfc6d7a0b755c2646ba8053a263e3afc9 Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/48625 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Lai Siyao Reviewed-by: Oleg Drokin --- lustre/include/uapi/linux/lustre/lustre_user.h | 56 +++ lustre/llite/dir.c | 54 +++ lustre/llite/file.c | 3 +- lustre/llite/llite_internal.h | 44 +- lustre/llite/statahead.c | 610 ++++++++++++++++++++----- lustre/lmv/lmv_obd.c | 42 +- lustre/mdt/mdt_handler.c | 1 + 7 files changed, 673 insertions(+), 137 deletions(-) diff --git a/lustre/include/uapi/linux/lustre/lustre_user.h b/lustre/include/uapi/linux/lustre/lustre_user.h index 48ead30..9881854 100644 --- a/lustre/include/uapi/linux/lustre/lustre_user.h +++ b/lustre/include/uapi/linux/lustre/lustre_user.h @@ -631,6 +631,7 @@ struct ll_ioc_lease_id { #define LL_IOC_FID2MDTIDX _IOWR('f', 248, struct lu_fid) #define LL_IOC_GETPARENT _IOWR('f', 249, struct getparent) #define LL_IOC_LADVISE _IOR('f', 250, struct llapi_lu_ladvise) +#define LL_IOC_LADVISE2 _IOW('f', 250, struct llapi_lu_ladvise2) #define LL_IOC_HEAT_GET _IOWR('f', 251, struct lu_heat) #define LL_IOC_HEAT_SET _IOW('f', 251, __u64) #define LL_IOC_PCC_ATTACH _IOW('f', 252, struct lu_pcc_attach) @@ -2610,6 +2611,8 @@ enum lu_ladvise_type { LU_LADVISE_DONTNEED = 2, LU_LADVISE_LOCKNOEXPAND = 3, LU_LADVISE_LOCKAHEAD = 4, + /* Ahead operations for open|create|stat|read|write. */ + LU_LADVISE_AHEAD = 5, LU_LADVISE_MAX }; @@ -2618,6 +2621,7 @@ enum lu_ladvise_type { [LU_LADVISE_DONTNEED] = "dontneed", \ [LU_LADVISE_LOCKNOEXPAND] = "locknoexpand", \ [LU_LADVISE_LOCKAHEAD] = "lockahead", \ + [LU_LADVISE_AHEAD] = "ahead", \ } /* This is the userspace argument for ladvise. It is currently the same as @@ -2633,6 +2637,58 @@ struct llapi_lu_ladvise { __u32 lla_value4; }; +struct llapi_lu_ladvise2 { + __u16 lla_advice; /* advice type */ + __u16 lla_value1; /* values for different advice types */ + __u32 lla_value2; + __u64 lla_start; + __u64 lla_end; + __u32 lla_value3; + __u32 lla_value4; + union { + struct { + __u32 lla_value5; + __u32 lla_value6; + }; + char lla_buf[NAME_MAX + 1]; + }; +}; + +/* I/O call sequences in a batch access. */ +enum lu_access_flags { + ACCESS_FL_NONE = 0x0, + ACCESS_FL_STAT = 0x01, + ACCESS_FL_OPEN = 0x02, + ACCESS_FL_CREAT = 0x04, + ACCESS_FL_READ = 0x08, + ACCESS_FL_WRITE = 0x10, + ACCESS_FL_OC = ACCESS_FL_OPEN | ACCESS_FL_CREAT, + ACCESS_FL_SOR = ACCESS_FL_STAT | ACCESS_FL_OPEN | ACCESS_FL_READ, + ACCESS_FL_OCW = ACCESS_FL_OPEN | ACCESS_FL_CREAT | ACCESS_FL_WRITE, +}; + +enum lu_ahead_mode { + LU_AH_MODE_NONE = 0, + /* + * The batch access pattern obeys certain naming rules, such as mdtest + * with the file naming format mdtest.$rank.$i. + */ + LU_AH_NAME_INDEX = 1, + /* + * Provide a file name list as input to do batch accesses with + * irregular file name format. + */ + LU_AH_NAME_ARRAY = 2, + /* Prefetching in readdir() order under a directory. */ + LU_AH_NAME_READDIR = 3, + LU_AH_MODE_MAX, +}; + +#define lla_ahead_mode lla_value1 +#define lla_access_flags lla_value2 +#define lla_batch_max lla_value3 +#define lla_fname lla_buf + enum ladvise_flag { LF_ASYNC = 0x00000001, LF_UNSET = 0x00000002, diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 8e1a1fb..2856bb7 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -1534,6 +1534,8 @@ static char *ll_getname(const char __user *filename) return tmp; } +static const char *const ladvise_names[] = LU_LADVISE_NAMES; + #define ll_putname(filename) OBD_FREE(filename, NAME_MAX + 1); static long ll_dir_ioctl(struct file *file, unsigned int cmd, unsigned long arg) @@ -2292,6 +2294,58 @@ migrate_free: RETURN(rc); } + case LL_IOC_LADVISE2: { + struct llapi_lu_ladvise2 *ladvise; + + OBD_ALLOC_PTR(ladvise); + if (ladvise == NULL) + RETURN(-ENOMEM); + + if (copy_from_user(ladvise, uarg, sizeof(*ladvise))) + GOTO(out_ladvise, rc = -EFAULT); + + switch (ladvise->lla_advice) { + case LU_LADVISE_AHEAD: + if (ladvise->lla_start >= ladvise->lla_end) { + CDEBUG(D_VFSTRACE, + "%s: Invalid range (%llu %llu) for %s\n", + sbi->ll_fsname, ladvise->lla_start, + ladvise->lla_end, + ladvise_names[ladvise->lla_advice]); + GOTO(out_ladvise, rc = -EINVAL); + } + + /* + * Currently we only support name indexing format + * ahead operations. + */ + if (ladvise->lla_ahead_mode != LU_AH_NAME_INDEX) { + CDEBUG(D_VFSTRACE, + "%s: Invalid access mode (%d) for %s\n", + sbi->ll_fsname, ladvise->lla_ahead_mode, + ladvise_names[ladvise->lla_advice]); + GOTO(out_ladvise, rc = -EINVAL); + } + + /* Currently we only support stat-ahead operations. */ + if (!(ladvise->lla_access_flags & ACCESS_FL_STAT)) { + CDEBUG(D_VFSTRACE, + "%s: Invalid access flags (%x) for %s\n", + sbi->ll_fsname, + ladvise->lla_access_flags, + ladvise_names[ladvise->lla_advice]); + GOTO(out_ladvise, rc = -EINVAL); + } + + rc = ll_ioctl_ahead(file, ladvise); + break; + default: + rc = -EINVAL; + } +out_ladvise: + OBD_FREE_PTR(ladvise); + RETURN(rc); + } case LL_IOC_PCC_DETACH_BY_FID: { struct lu_pcc_detach_fid *detach; struct lu_fid *fid; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index d365b0b..bf8192a 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -414,7 +414,8 @@ int ll_file_release(struct inode *inode, struct file *file) /* The last ref on @file, maybe not the the owner pid of statahead, * because parent and child process can share the same file handle. */ - if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd) + if (S_ISDIR(inode->i_mode) && + (lli->lli_opendir_key == fd || fd->fd_sai)) ll_deauthorize_statahead(inode, fd); if (is_root_inode(inode)) { diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 01d4bb8..ce933ff 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -118,6 +118,13 @@ struct ll_trunc_sem { atomic_t ll_trunc_waiters; }; +enum ll_sa_pattern { + LSA_PATTERN_NONE = 0x0000, + LSA_PATTERN_LIST = 0x0001, + LSA_PATTERN_FNAME = 0X0002, + LSA_PATTERN_MAX, +}; + struct ll_inode_info { __u32 lli_inode_magic; rwlock_t lli_lock; @@ -171,6 +178,7 @@ struct ll_inode_info { * cleanup the dir readahead. */ void *lli_opendir_key; struct ll_statahead_info *lli_sai; + struct ll_statahead_context *lli_sax; /* protect statahead stuff. */ spinlock_t lli_sa_lock; /* "opendir_pid" is the token when lookup/revalid @@ -195,6 +203,8 @@ struct ll_inode_info { lli_def_lsm_obj_set:1; /* generation for statahead */ unsigned int lli_sa_generation; + /* access pattern for statahead */ + enum ll_sa_pattern lli_sa_pattern; /* rw lock protects lli_lsm_md */ struct rw_semaphore lli_lsm_sem; /* directory stripe information */ @@ -1050,6 +1060,8 @@ struct ll_file_data { * -errno is saved here, and will return to user in close(). */ int fd_partial_readdir_rc; + /* mdtest unique/shared dir stat mode: per process statahead struct. */ + struct ll_statahead_info *fd_sai; }; void llite_tunables_unregister(void); @@ -1295,6 +1307,8 @@ int ll_ioctl_fssetxattr(struct inode *inode, unsigned int cmd, void __user *uarg); #endif int ll_ioctl_project(struct file *file, unsigned int cmd, void __user *uarg); +int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise); + int ll_lov_setstripe_ea_info(struct inode *inode, struct dentry *dentry, __u64 flags, struct lov_user_md *lum, int lum_size); @@ -1617,11 +1631,11 @@ void ll_ra_stats_inc(struct inode *inode, enum ra_stat which); #define LL_SA_BATCH_MAX 1024 #define LL_SA_BATCH_DEF 64 -#define LL_SA_CACHE_BIT 5 +#define LL_SA_CACHE_BIT 6 #define LL_SA_CACHE_SIZE (1 << LL_SA_CACHE_BIT) #define LL_SA_CACHE_MASK (LL_SA_CACHE_SIZE - 1) -/* per inode struct, for dir only */ +/* statahead controller, per process struct, for dir only */ struct ll_statahead_info { struct dentry *sai_dentry; atomic_t sai_refcount; /* when access this struct, hold @@ -1654,12 +1668,23 @@ struct ll_statahead_info { struct task_struct *sai_agl_task; /* AGL thread */ struct list_head sai_entries; /* completed entries */ struct list_head sai_agls; /* AGLs to be sent */ - struct list_head sai_cache[LL_SA_CACHE_SIZE]; - spinlock_t sai_cache_lock[LL_SA_CACHE_SIZE]; atomic_t sai_cache_count; /* entry count in cache */ struct lu_batch *sai_bh; __u32 sai_max_batch_count; __u64 sai_index_end; + + __u64 sai_fstart; + __u64 sai_fend; + char sai_fname[NAME_MAX]; +}; + +/* Per inode statahead information */ +struct ll_statahead_context { + struct inode *sax_inode; + atomic_t sax_refcount; + /* Local dcache */ + struct list_head sax_cache[LL_SA_CACHE_SIZE]; + spinlock_t sax_cache_lock[LL_SA_CACHE_SIZE]; }; int ll_revalidate_statahead(struct inode *dir, struct dentry **dentry, @@ -1724,10 +1749,6 @@ dentry_may_statahead(struct inode *dir, struct dentry *dentry) if (!lli->lli_sa_enabled) return false; - /* not the same process, don't statahead */ - if (lli->lli_opendir_pid != current->pid) - return false; - /* * When stating a dentry, kernel may trigger 'revalidate' or 'lookup' * multiple times, eg. for 'getattr', 'getxattr' and etc. @@ -1747,6 +1768,13 @@ dentry_may_statahead(struct inode *dir, struct dentry *dentry) ldd->lld_sa_generation == lli->lli_sa_generation) return false; + if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) + return true; + + /* not the same process, don't statahead */ + if (lli->lli_opendir_pid != current->pid) + return false; + return true; } diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 0253b0d..29e32e9a 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -60,23 +60,25 @@ typedef enum { */ struct sa_entry { /* link into sai_entries */ - struct list_head se_list; + struct list_head se_list; /* link into sai hash table locally */ - struct list_head se_hash; + struct list_head se_hash; /* entry index in the sai */ - __u64 se_index; + __u64 se_index; /* low layer ldlm lock handle */ - __u64 se_handle; + __u64 se_handle; /* entry status */ - se_state_t se_state; + se_state_t se_state; /* entry size, contains name */ - int se_size; + int se_size; /* pointer to the target inode */ - struct inode *se_inode; + struct inode *se_inode; + /* pointer to @sai per process struct */ + struct ll_statahead_info *se_sai; /* entry name */ - struct qstr se_qstr; + struct qstr se_qstr; /* entry fid */ - struct lu_fid se_fid; + struct lu_fid se_fid; }; static unsigned int sai_generation; @@ -101,26 +103,26 @@ static inline int sa_hash(int val) return val & LL_SA_CACHE_MASK; } -/* hash entry into sai_cache */ +/* hash entry into sax_cache */ static inline void -sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry) +sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry) { int i = sa_hash(entry->se_qstr.hash); - spin_lock(&sai->sai_cache_lock[i]); - list_add_tail(&entry->se_hash, &sai->sai_cache[i]); - spin_unlock(&sai->sai_cache_lock[i]); + spin_lock(&ctx->sax_cache_lock[i]); + list_add_tail(&entry->se_hash, &ctx->sax_cache[i]); + spin_unlock(&ctx->sax_cache_lock[i]); } /* unhash entry from sai_cache */ static inline void -sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry) +sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry) { int i = sa_hash(entry->se_qstr.hash); - spin_lock(&sai->sai_cache_lock[i]); + spin_lock(&ctx->sax_cache_lock[i]); list_del_init(&entry->se_hash); - spin_unlock(&sai->sai_cache_lock[i]); + spin_unlock(&ctx->sax_cache_lock[i]); } static inline int agl_should_run(struct ll_statahead_info *sai, @@ -196,7 +198,8 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index, ENTRY; - entry_size = sizeof(struct sa_entry) + (len & ~3) + 4; + entry_size = sizeof(struct sa_entry) + + round_up(len + 1 /* for trailing NUL */, 4); OBD_ALLOC(entry, entry_size); if (unlikely(!entry)) RETURN(ERR_PTR(-ENOMEM)); @@ -205,6 +208,7 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index, len, name, entry, index); entry->se_index = index; + entry->se_sai = sai; entry->se_state = SA_ENTRY_INIT; entry->se_size = entry_size; @@ -214,13 +218,14 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index, entry->se_qstr.hash = ll_full_name_hash(parent, name, len); entry->se_qstr.len = len; entry->se_qstr.name = dname; - entry->se_fid = *fid; - lli = ll_i2info(sai->sai_dentry->d_inode); + if (fid) + entry->se_fid = *fid; + lli = ll_i2info(sai->sai_dentry->d_inode); spin_lock(&lli->lli_sa_lock); INIT_LIST_HEAD(&entry->se_list); - sa_rehash(sai, entry); + sa_rehash(lli->lli_sax, entry); spin_unlock(&lli->lli_sa_lock); atomic_inc(&sai->sai_cache_count); @@ -229,7 +234,7 @@ sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index, } /* free sa_entry, which should have been unhashed and not in any list */ -static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry) +static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry) { CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n", entry->se_qstr.len, entry->se_qstr.name, entry, @@ -239,7 +244,6 @@ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry) LASSERT(sa_unhashed(entry)); OBD_FREE(entry, entry->se_size); - atomic_dec(&sai->sai_cache_count); } /* @@ -247,39 +251,48 @@ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry) * only scanner can remove the entry from cache. */ static struct sa_entry * -sa_get(struct ll_statahead_info *sai, const struct qstr *qstr) +sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr) { struct sa_entry *entry; int i = sa_hash(qstr->hash); - list_for_each_entry(entry, &sai->sai_cache[i], se_hash) { + spin_lock(&ctx->sax_cache_lock[i]); + list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) { if (entry->se_qstr.hash == qstr->hash && entry->se_qstr.len == qstr->len && - memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) + memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) { + spin_unlock(&ctx->sax_cache_lock[i]); return entry; + } } + spin_unlock(&ctx->sax_cache_lock[i]); return NULL; } /* unhash and unlink sa_entry, and then free it */ static inline void -sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry) +sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked) { - struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode); + struct inode *dir = sai->sai_dentry->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_context *ctx = lli->lli_sax; LASSERT(!sa_unhashed(entry)); LASSERT(!list_empty(&entry->se_list)); LASSERT(sa_ready(entry)); - sa_unhash(sai, entry); + sa_unhash(ctx, entry); - spin_lock(&lli->lli_sa_lock); + if (!locked) + spin_lock(&lli->lli_sa_lock); list_del_init(&entry->se_list); spin_unlock(&lli->lli_sa_lock); iput(entry->se_inode); - - sa_free(sai, entry); + atomic_dec(&sai->sai_cache_count); + sa_free(ctx, entry); + if (locked) + spin_lock(&lli->lli_sa_lock); } /* called by scanner after use, sa_entry will be killed */ @@ -287,7 +300,7 @@ static void sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry) { struct ll_inode_info *lli = ll_i2info(dir); - struct sa_entry *tmp, *next; + struct sa_entry *tmp; bool wakeup = false; if (entry && entry->se_state == SA_ENTRY_SUCC) { @@ -309,23 +322,30 @@ sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry) } else { wakeup = true; } - } else { + } else if (sai) { sai->sai_miss++; sai->sai_consecutive_miss++; wakeup = true; } if (entry) - sa_kill(sai, entry); + sa_kill(sai, entry, false); - /* - * kill old completed entries, only scanner process does this, no need - * to lock - */ - list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) { - if (!is_omitted_entry(sai, tmp->se_index)) - break; - sa_kill(sai, tmp); + if (sai) { + /* + * kill old completed entries. Maybe kicking old entries can + * be ignored? + */ + spin_lock(&lli->lli_sa_lock); + while ((tmp = list_first_entry_or_null(&sai->sai_entries, + struct sa_entry, se_list))) { + if (!is_omitted_entry(sai, tmp->se_index)) + break; + + /* ll_sa_lock is dropped by sa_kill(), restart list */ + sa_kill(sai, tmp, true); + } + spin_unlock(&lli->lli_sa_lock); } spin_lock(&lli->lli_sa_lock); @@ -472,12 +492,79 @@ static void ll_agl_add(struct ll_statahead_info *sai, } } +/* Allocate sax */ +static struct ll_statahead_context *ll_sax_alloc(struct inode *dir) +{ + struct ll_statahead_context *ctx; + int i; + + ENTRY; + + OBD_ALLOC_PTR(ctx); + if (ctx == NULL) + RETURN(NULL); + + ctx->sax_inode = igrab(dir); + atomic_set(&ctx->sax_refcount, 1); + for (i = 0; i < LL_SA_CACHE_SIZE; i++) { + INIT_LIST_HEAD(&ctx->sax_cache[i]); + spin_lock_init(&ctx->sax_cache_lock[i]); + } + + RETURN(ctx); +} + +static inline void ll_sax_free(struct ll_statahead_context *ctx) +{ + LASSERT(ctx->sax_inode != NULL); + iput(ctx->sax_inode); + OBD_FREE_PTR(ctx); +} + +static inline void __ll_sax_get(struct ll_statahead_context *ctx) +{ + atomic_inc(&ctx->sax_refcount); +} + +static inline struct ll_statahead_context *ll_sax_get(struct inode *dir) +{ + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_context *ctx = NULL; + + spin_lock(&lli->lli_sa_lock); + ctx = lli->lli_sax; + if (ctx) + __ll_sax_get(ctx); + spin_unlock(&lli->lli_sa_lock); + + return ctx; +} + +static inline void ll_sax_put(struct inode *dir, + struct ll_statahead_context *ctx) +{ + struct ll_inode_info *lli = ll_i2info(dir); + + if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) { + lli->lli_sai = NULL; + lli->lli_sax = NULL; + if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) { + lli->lli_opendir_key = NULL; + lli->lli_opendir_pid = 0; + lli->lli_sa_enabled = 0; + } + lli->lli_sa_pattern = LSA_PATTERN_NONE; + spin_unlock(&lli->lli_sa_lock); + + ll_sax_free(ctx); + } +} + /* allocate sai */ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry) { struct ll_statahead_info *sai; struct ll_inode_info *lli = ll_i2info(dentry->d_inode); - int i; ENTRY; @@ -494,10 +581,6 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry) INIT_LIST_HEAD(&sai->sai_entries); INIT_LIST_HEAD(&sai->sai_agls); - for (i = 0; i < LL_SA_CACHE_SIZE; i++) { - INIT_LIST_HEAD(&sai->sai_cache[i]); - spin_lock_init(&sai->sai_cache_lock[i]); - } atomic_set(&sai->sai_cache_count, 0); spin_lock(&sai_generation_lock); @@ -517,21 +600,10 @@ static inline void ll_sai_free(struct ll_statahead_info *sai) OBD_FREE_PTR(sai); } -/* - * take refcount of sai if sai for @dir exists, which means statahead is on for - * this directory. - */ -static inline struct ll_statahead_info *ll_sai_get(struct inode *dir) +static inline struct ll_statahead_info * +__ll_sai_get(struct ll_statahead_info *sai) { - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = NULL; - - spin_lock(&lli->lli_sa_lock); - sai = lli->lli_sai; - if (sai) - atomic_inc(&sai->sai_refcount); - spin_unlock(&lli->lli_sa_lock); - + atomic_inc(&sai->sai_refcount); return sai; } @@ -544,8 +616,8 @@ static void ll_sai_put(struct ll_statahead_info *sai) struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode); if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) { - struct sa_entry *entry, *next; struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode); + struct sa_entry *entry, *next; lli->lli_sai = NULL; spin_unlock(&lli->lli_sa_lock); @@ -556,7 +628,7 @@ static void ll_sai_put(struct ll_statahead_info *sai) list_for_each_entry_safe(entry, next, &sai->sai_entries, se_list) - sa_kill(sai, entry); + sa_kill(sai, entry, false); LASSERT(atomic_read(&sai->sai_cache_count) == 0); LASSERT(agl_list_empty(sai)); @@ -677,7 +749,7 @@ static void ll_statahead_interpret_work(struct work_struct *work) struct req_capsule *pill = item->mop_pill; struct inode *dir = item->mop_dir; struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = lli->lli_sai; + struct ll_statahead_info *sai; struct lookup_intent *it; struct sa_entry *entry; struct mdt_body *body; @@ -689,6 +761,7 @@ static void ll_statahead_interpret_work(struct work_struct *work) entry = (struct sa_entry *)item->mop_cbdata; LASSERT(entry->se_handle != 0); + sai = entry->se_sai; it = &item->mop_it; body = req_capsule_server_get(pill, &RMF_MDT_BODY); if (!body) @@ -696,7 +769,8 @@ static void ll_statahead_interpret_work(struct work_struct *work) child = entry->se_inode; /* revalidate; unlinked and re-created with the same name */ - if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) { + if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) && + !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) { if (child) { entry->se_inode = NULL; iput(child); @@ -764,9 +838,9 @@ static int ll_statahead_interpret(struct md_op_item *item, int rc) struct lookup_intent *it = &item->mop_it; struct inode *dir = item->mop_dir; struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = lli->lli_sai; struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata; struct work_struct *work = &item->mop_work; + struct ll_statahead_info *sai; struct mdt_body *body; struct inode *child; __u64 handle = 0; @@ -780,8 +854,9 @@ static int ll_statahead_interpret(struct md_op_item *item, int rc) * because statahead thread will wait for all inflight RPC to finish, * sai should be always valid, no need to refcount */ - LASSERT(sai != NULL); LASSERT(entry != NULL); + sai = entry->se_sai; + LASSERT(sai != NULL); CDEBUG(D_READA, "sa_entry %.*s rc %d\n", entry->se_qstr.len, entry->se_qstr.name, rc); @@ -794,8 +869,14 @@ static int ll_statahead_interpret(struct md_op_item *item, int rc) GOTO(out, rc = -EFAULT); child = entry->se_inode; - /* revalidate; unlinked and re-created with the same name */ - if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) { + /* + * revalidate; unlinked and re-created with the same name. + * exclude the case where FID is zero as it was from statahead with + * regularized file name pattern and had no idea for the FID of the + * children file. + */ + if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) && + !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) { if (child) { entry->se_inode = NULL; iput(child); @@ -839,9 +920,9 @@ out: RETURN(rc); } -static inline int sa_getattr(struct inode *dir, struct md_op_item *item) +static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir, + struct md_op_item *item) { - struct ll_statahead_info *sai = ll_i2info(dir)->lli_sai; int rc; if (sa_has_batch_handle(sai)) @@ -864,7 +945,7 @@ static int sa_lookup(struct inode *dir, struct sa_entry *entry) if (IS_ERR(item)) RETURN(PTR_ERR(item)); - rc = sa_getattr(dir, item); + rc = sa_getattr(entry->se_sai, dir, item); if (rc < 0) sa_fini_data(item); @@ -909,7 +990,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry, RETURN(1); } - rc = sa_getattr(dir, item); + rc = sa_getattr(entry->se_sai, dir, item); if (rc < 0) { entry->se_inode = NULL; iput(inode); @@ -920,12 +1001,10 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry, } /* async stat for file with @name */ -static void sa_statahead(struct dentry *parent, const char *name, int len, - const struct lu_fid *fid) +static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent, + const char *name, int len, const struct lu_fid *fid) { struct inode *dir = parent->d_inode; - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = lli->lli_sai; struct dentry *dentry = NULL; struct sa_entry *entry; int rc; @@ -964,15 +1043,15 @@ static void sa_statahead(struct dentry *parent, const char *name, int len, /* async glimpse (agl) thread main function */ static int ll_agl_thread(void *arg) { - struct dentry *parent = (struct dentry *)arg; - struct inode *dir = parent->d_inode; - struct ll_inode_info *plli = ll_i2info(dir); - struct ll_inode_info *clli; /* * We already own this reference, so it is safe to take it * without a lock. */ - struct ll_statahead_info *sai = plli->lli_sai; + struct ll_statahead_info *sai = (struct ll_statahead_info *)arg; + struct dentry *parent = sai->sai_dentry; + struct inode *dir = parent->d_inode; + struct ll_inode_info *plli = ll_i2info(dir); + struct ll_inode_info *clli; ENTRY; @@ -1047,7 +1126,7 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) sai, parent); plli = ll_i2info(parent->d_inode); - task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d", + task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d", plli->lli_opendir_pid); if (IS_ERR(task)) { CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task)); @@ -1056,26 +1135,23 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) sai->sai_agl_task = task; atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total); /* Get an extra reference that the thread holds */ - ll_sai_get(d_inode(parent)); + __ll_sai_get(sai); wake_up_process(task); EXIT; } -/* statahead thread main function */ -static int ll_statahead_thread(void *arg) +static int ll_statahead_by_list(struct dentry *parent) { - struct dentry *parent = (struct dentry *)arg; struct inode *dir = parent->d_inode; struct ll_inode_info *lli = ll_i2info(dir); - struct ll_sb_info *sbi = ll_i2sbi(dir); struct ll_statahead_info *sai = lli->lli_sai; - int first = 0; + struct ll_sb_info *sbi = ll_i2sbi(dir); struct md_op_data *op_data; struct page *page = NULL; - struct lu_batch *bh = NULL; __u64 pos = 0; + int first = 0; int rc = 0; ENTRY; @@ -1083,18 +1159,9 @@ static int ll_statahead_thread(void *arg) CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n", sai, parent); - sai->sai_max_batch_count = sbi->ll_sa_batch_max; - if (sai->sai_max_batch_count) { - bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY, - sai->sai_max_batch_count); - if (IS_ERR(bh)) - GOTO(out_stop_agl, rc = PTR_ERR(bh)); - } - - sai->sai_bh = bh; OBD_ALLOC_PTR(op_data); if (!op_data) - GOTO(out, rc = -ENOMEM); + RETURN(-ENOMEM); /* matches smp_store_release() in ll_deauthorize_statahead() */ while (pos != MDS_DIR_END_OFF && smp_load_acquire(&sai->sai_task)) { @@ -1222,7 +1289,7 @@ static int ll_statahead_thread(void *arg) namelen = lltr.len; } - sa_statahead(parent, name, namelen, &fid); + sa_statahead(sai, parent, name, namelen, &fid); llcrypt_fname_free_buffer(&lltr); } @@ -1243,6 +1310,117 @@ static int ll_statahead_thread(void *arg) } ll_finish_md_op_data(op_data); + RETURN(rc); +} + +static int ll_statahead_by_fname(struct ll_statahead_info *sai, + struct dentry *parent) +{ + struct inode *dir = parent->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_sb_info *sbi = ll_i2sbi(dir); + size_t max_len; + size_t len; + char *fname; + char *ptr; + int rc = 0; + __u64 i = 0; + + ENTRY; + + CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n", + sbi->ll_fsname, parent, sai->sai_fname); + + OBD_ALLOC(fname, NAME_MAX); + if (fname == NULL) + RETURN(-ENOMEM); + + len = strlen(sai->sai_fname); + memcpy(fname, sai->sai_fname, len); + max_len = sizeof(sai->sai_fname) - len; + ptr = fname + len; + + /* matches smp_store_release() in ll_deauthorize_statahead() */ + while (smp_load_acquire(&sai->sai_task)) { + size_t numlen; + + numlen = snprintf(ptr, max_len, "%llu", + sai->sai_fstart + i); + + while (({set_current_state(TASK_IDLE); + /* + * matches smp_store_release() in + * ll_deauthorize_statahead() + */ + smp_load_acquire(&sai->sai_task); })) { + spin_lock(&lli->lli_agl_lock); + while (sa_sent_full(sai) && !agl_list_empty(sai)) { + struct ll_inode_info *clli; + + __set_current_state(TASK_RUNNING); + clli = agl_first_entry(sai); + list_del_init(&clli->lli_agl_list); + spin_unlock(&lli->lli_agl_lock); + + ll_agl_trigger(&clli->lli_vfs_inode, sai); + cond_resched(); + spin_lock(&lli->lli_agl_lock); + } + spin_unlock(&lli->lli_agl_lock); + + if (!sa_sent_full(sai)) + break; + schedule(); + } + __set_current_state(TASK_RUNNING); + + sa_statahead(sai, parent, fname, len + numlen, NULL); + if (++i >= sai->sai_fend) + break; + } + + OBD_FREE(fname, NAME_MAX); + RETURN(rc); +} + +/* statahead thread main function */ +static int ll_statahead_thread(void *arg) +{ + struct ll_statahead_info *sai = (struct ll_statahead_info *)arg; + struct dentry *parent = sai->sai_dentry; + struct inode *dir = parent->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct lu_batch *bh = NULL; + int rc = 0; + + ENTRY; + + CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n", + sai, parent); + + sai->sai_max_batch_count = sbi->ll_sa_batch_max; + if (sai->sai_max_batch_count) { + bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY, + sai->sai_max_batch_count); + if (IS_ERR(bh)) + GOTO(out_stop_agl, rc = PTR_ERR(bh)); + } + + sai->sai_bh = bh; + + switch (lli->lli_sa_pattern) { + case LSA_PATTERN_LIST: + rc = ll_statahead_by_list(parent); + break; + case LSA_PATTERN_FNAME: + rc = ll_statahead_by_fname(sai, parent); + break; + default: + rc = -EFAULT; + break; + } + if (rc < 0) { spin_lock(&lli->lli_sa_lock); sai->sai_task = NULL; @@ -1264,7 +1442,7 @@ static int ll_statahead_thread(void *arg) __set_current_state(TASK_RUNNING); EXIT; -out: + if (bh) { rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh); sai->sai_bh = NULL; @@ -1281,8 +1459,8 @@ out_stop_agl: /* in case we're not woken up, timeout wait */ msleep(125); - CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n", - sbi->ll_fsname, sai, parent); + CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n", + sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss); spin_lock(&lli->lli_sa_lock); sai->sai_task = NULL; @@ -1293,6 +1471,7 @@ out_stop_agl: atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total); ll_sai_put(sai); + ll_sax_put(dir, lli->lli_sax); return rc; } @@ -1308,7 +1487,6 @@ void ll_authorize_statahead(struct inode *dir, void *key) * if lli_sai is not NULL, it means previous statahead is not * finished yet, we'd better not start a new statahead for now. */ - LASSERT(lli->lli_opendir_pid == 0); lli->lli_opendir_key = key; lli->lli_opendir_pid = current->pid; lli->lli_sa_enabled = 1; @@ -1316,6 +1494,29 @@ void ll_authorize_statahead(struct inode *dir, void *key) spin_unlock(&lli->lli_sa_lock); } +static void ll_deauthorize_statahead_fname(struct inode *dir, void *key) +{ + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_file_data *fd = (struct ll_file_data *)key; + struct ll_statahead_info *sai = fd->fd_sai; + + if (sai == NULL) + return; + + spin_lock(&lli->lli_sa_lock); + if (sai->sai_task) { + struct task_struct *task = sai->sai_task; + + sai->sai_task = NULL; + wake_up_process(task); + } + fd->fd_sai = NULL; + spin_unlock(&lli->lli_sa_lock); + ll_sai_put(sai); + LASSERT(lli->lli_sax != NULL); + ll_sax_put(dir, lli->lli_sax); +} + /* * deauthorize opened dir handle @key to statahead, and notify statahead thread * to quit if it's running. @@ -1325,12 +1526,17 @@ void ll_deauthorize_statahead(struct inode *dir, void *key) struct ll_inode_info *lli = ll_i2info(dir); struct ll_statahead_info *sai; - LASSERT(lli->lli_opendir_key == key); LASSERT(lli->lli_opendir_pid != 0); CDEBUG(D_READA, "deauthorize statahead for "DFID"\n", PFID(&lli->lli_fid)); + if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) { + ll_deauthorize_statahead_fname(dir, key); + return; + } + + LASSERT(lli->lli_opendir_key == key); spin_lock(&lli->lli_sa_lock); lli->lli_opendir_key = NULL; lli->lli_opendir_pid = 0; @@ -1523,17 +1729,18 @@ out: * \retval negative number upon error */ static int revalidate_statahead_dentry(struct inode *dir, - struct ll_statahead_info *sai, + struct ll_statahead_context *ctx, struct dentry **dentryp, bool unplug) { struct sa_entry *entry = NULL; struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = lli->lli_sai; int rc = 0; ENTRY; - if ((*dentryp)->d_name.name[0] == '.') { + if (sai && (*dentryp)->d_name.name[0] == '.') { if (sai->sai_ls_all || sai->sai_miss_hidden >= sai->sai_skip_hidden) { /* @@ -1563,10 +1770,16 @@ static int revalidate_statahead_dentry(struct inode *dir, if (unplug) GOTO(out, rc = 1); - entry = sa_get(sai, &(*dentryp)->d_name); + entry = sa_get(ctx, &(*dentryp)->d_name); if (!entry) GOTO(out, rc = -EAGAIN); + if (lli->lli_sa_pattern == LSA_PATTERN_LIST) + LASSERT(sai == entry->se_sai); + else if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) + sai = entry->se_sai; + + LASSERT(sai != NULL); if (!sa_ready(entry)) { spin_lock(&lli->lli_sa_lock); sai->sai_index_wait = entry->se_index; @@ -1666,6 +1879,7 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry, int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY); struct ll_inode_info *lli = ll_i2info(dir); struct ll_statahead_info *sai = NULL; + struct ll_statahead_context *ctx = NULL; struct dentry *parent = dentry->d_parent; struct task_struct *task; struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode); @@ -1691,6 +1905,10 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry, if (!sai) GOTO(out, rc = -ENOMEM); + ctx = ll_sax_alloc(dir); + if (!ctx) + GOTO(out, rc = -ENOMEM); + sai->sai_ls_all = (first == LS_FIRST_DOT_DE); /* @@ -1700,17 +1918,20 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry, */ spin_lock(&lli->lli_sa_lock); if (unlikely(lli->lli_sai || !lli->lli_opendir_key || - lli->lli_opendir_pid != current->pid)) { + lli->lli_opendir_pid != current->pid || + lli->lli_sa_pattern != LSA_PATTERN_NONE)) { spin_unlock(&lli->lli_sa_lock); GOTO(out, rc = -EPERM); } lli->lli_sai = sai; + lli->lli_sax = ctx; + lli->lli_sa_pattern = LSA_PATTERN_LIST; spin_unlock(&lli->lli_sa_lock); CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n", current->pid, parent); - task = kthread_create_on_node(ll_statahead_thread, parent, node, + task = kthread_create_on_node(ll_statahead_thread, sai, node, "ll_sa_%u", lli->lli_opendir_pid); if (IS_ERR(task)) { spin_lock(&lli->lli_sa_lock); @@ -1746,6 +1967,10 @@ out: if (sai) ll_sai_free(sai); + + if (ctx) + ll_sax_free(ctx); + if (first != LS_NOT_FIRST_DE) atomic_dec(&sbi->ll_sa_running); @@ -1758,9 +1983,11 @@ out: static inline bool ll_statahead_started(struct inode *dir, bool agl) { struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_context *ctx; struct ll_statahead_info *sai; spin_lock(&lli->lli_sa_lock); + ctx = lli->lli_sax; sai = lli->lli_sai; if (sai && (sai->sai_agl_task != NULL) != agl) CDEBUG(D_READA, @@ -1769,7 +1996,7 @@ static inline bool ll_statahead_started(struct inode *dir, bool agl) sai->sai_agl_task != NULL, agl); spin_unlock(&lli->lli_sa_lock); - return !!sai; + return !!ctx; } /** @@ -1810,15 +2037,162 @@ int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl) int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp, bool unplug) { - struct ll_statahead_info *sai; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_context *ctx; + struct ll_statahead_info *sai = NULL; int rc = 0; - sai = ll_sai_get(dir); - if (sai) { - rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug); + spin_lock(&lli->lli_sa_lock); + ctx = lli->lli_sax; + if (ctx) { + sai = lli->lli_sai; + if (sai) { + atomic_inc(&sai->sai_refcount); + } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) { + spin_unlock(&lli->lli_sa_lock); + return 0; + } + __ll_sax_get(ctx); + } + spin_unlock(&lli->lli_sa_lock); + if (ctx) { + rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug); CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n", *dentryp, rc); - ll_sai_put(sai); + if (sai) + ll_sai_put(sai); + ll_sax_put(dir, ctx); } return rc; } + +int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise) +{ + int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY); + struct ll_file_data *fd = file->private_data; + struct dentry *dentry = file_dentry(file); + struct inode *dir = dentry->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai = NULL; + struct ll_statahead_context *ctx = NULL; + struct task_struct *task; + bool agl = true; + int rc; + + ENTRY; + + if (sbi->ll_sa_max == 0) + RETURN(0); + + if (!S_ISDIR(dir->i_mode)) + RETURN(-EINVAL); + + if (fd->fd_sai) { + rc = -EALREADY; + CWARN("%s: already set statahead hint for dir %pd: rc = %d\n", + sbi->ll_fsname, dentry, rc); + RETURN(rc); + } + + if (unlikely(atomic_inc_return(&sbi->ll_sa_running) > + sbi->ll_sa_running_max)) { + CDEBUG(D_READA, + "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n"); + GOTO(out, rc = -EMFILE); + } + + sai = ll_sai_alloc(dentry); + if (sai == NULL) + GOTO(out, rc = -ENOMEM); + + sai->sai_fstart = ladvise->lla_start; + sai->sai_fend = ladvise->lla_end; + sai->sai_ls_all = 0; + sai->sai_max = sbi->ll_sa_max; + strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname)); + + ctx = ll_sax_get(dir); + if (ctx == NULL) { + ctx = ll_sax_alloc(dir); + if (ctx == NULL) + GOTO(out, rc = -ENOMEM); + + spin_lock(&lli->lli_sa_lock); + if (unlikely(lli->lli_sax)) { + struct ll_statahead_context *tmp = ctx; + + if (lli->lli_sa_pattern == LSA_PATTERN_NONE || + lli->lli_sa_pattern == LSA_PATTERN_FNAME) { + lli->lli_sa_pattern = LSA_PATTERN_FNAME; + ctx = lli->lli_sax; + __ll_sax_get(ctx); + fd->fd_sai = __ll_sai_get(sai); + rc = 0; + } else { + rc = -EINVAL; + CWARN("%s: pattern %X is not FNAME: rc = %d\n", + sbi->ll_fsname, lli->lli_sa_pattern, rc); + } + + spin_unlock(&lli->lli_sa_lock); + ll_sax_free(tmp); + if (rc) + GOTO(out, rc); + } else { + lli->lli_sa_pattern = LSA_PATTERN_FNAME; + lli->lli_sax = ctx; + fd->fd_sai = __ll_sai_get(sai); + spin_unlock(&lli->lli_sa_lock); + } + } else { + spin_lock(&lli->lli_sa_lock); + if (!(lli->lli_sa_pattern == LSA_PATTERN_FNAME || + lli->lli_sa_pattern == LSA_PATTERN_NONE)) { + spin_unlock(&lli->lli_sa_lock); + GOTO(out, rc = -EINVAL); + } + + lli->lli_sa_pattern = LSA_PATTERN_FNAME; + fd->fd_sai = __ll_sai_get(sai); + spin_unlock(&lli->lli_sa_lock); + } + + __ll_sax_get(ctx); + CDEBUG(D_READA, + "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n", + current->pid, dentry, sai, ctx); + + task = kthread_create_on_node(ll_statahead_thread, sai, node, + "ll_sa_%u", current->pid); + if (IS_ERR(task)) { + rc = PTR_ERR(task); + CERROR("%s: cannot start ll_sa thread: rc = %d\n", + sbi->ll_fsname, rc); + GOTO(out, rc); + } + + if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl) + ll_start_agl(dentry, sai); + + atomic_inc(&sbi->ll_sa_total); + sai->sai_task = task; + wake_up_process(task); + + RETURN(0); +out: + if (fd->fd_sai) { + ll_sai_put(sai); + ll_sax_put(dir, ctx); + fd->fd_sai = NULL; + } + + if (sai) + ll_sai_free(sai); + + if (ctx) + ll_sax_free(ctx); + + atomic_dec(&sbi->ll_sa_running); + RETURN(rc); +} diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 7f68caf..3be890b 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -3826,23 +3826,31 @@ static int lmv_intent_getattr_async(struct obd_export *exp, ENTRY; - if (!fid_is_sane(&op_data->op_fid2)) + if (!(fid_is_sane(&op_data->op_fid2) || + fid_is_zero(&op_data->op_fid2))) RETURN(-EINVAL); ptgt = lmv_locate_tgt(lmv, op_data); if (IS_ERR(ptgt)) RETURN(PTR_ERR(ptgt)); - ctgt = lmv_fid2tgt(lmv, &op_data->op_fid2); - if (IS_ERR(ctgt)) - RETURN(PTR_ERR(ctgt)); - /* - * remote object needs two RPCs to lookup and getattr, considering the - * complexity don't support statahead for now. + * Zeroed FID @op_fid2 means that the intent getattr() comes from + * statahead by regularized file names. Currently only do statahead + * for the children files located same as the parent directory. */ - if (ctgt != ptgt) - RETURN(-EREMOTE); + if (!fid_is_zero(&op_data->op_fid2)) { + ctgt = lmv_fid2tgt(lmv, &op_data->op_fid2); + if (IS_ERR(ctgt)) + RETURN(PTR_ERR(ctgt)); + + /* + * remote object needs two RPCs to lookup and getattr, + * considering the complexity don't support statahead for now. + */ + if (ctgt != ptgt) + RETURN(-EREMOTE); + } rc = md_intent_getattr_async(ptgt->ltd_exp, item); @@ -4087,13 +4095,27 @@ lmv_batch_locate_tgt(struct lmv_obd *lmv, struct md_op_item *item) case MD_OP_GETATTR: { struct lmv_tgt_desc *ptgt; - if (!fid_is_sane(&op_data->op_fid2)) + if (!(fid_is_sane(&op_data->op_fid2) || + fid_is_zero(&op_data->op_fid2))) RETURN(ERR_PTR(-EINVAL)); ptgt = lmv_locate_tgt(lmv, op_data); if (IS_ERR(ptgt)) RETURN(ptgt); + /* + * Zeroed @op_fid2 means that it is a statahead populating call + * in the file name pattern which is using file name format to + * prefetch the attributes. Thus it has no idea about the FID of + * the children file. The children file is considered to be + * located on the same storage target with the parent directory + * or the stripped directory. + */ + if (fid_is_zero(&op_data->op_fid2)) { + tgt = ptgt; + break; + } + tgt = lmv_fid2tgt(lmv, &op_data->op_fid2); if (IS_ERR(tgt)) RETURN(tgt); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index f059339..2041ca1 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -2364,6 +2364,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, */ if (mdt_object_remote(child)) GOTO(out_child, rc = -EREMOTE); + try_bits |= child_bits; child_bits = 0; } -- 1.8.3.1