}
LUSTRE_RW_ATTR(statahead_timeout);
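+/*
+ * Threshold of filename matches (lli_sa_match_count) after which the
+ * FN_PREDICT pattern flag is set for a directory; exported as the
+ * statahead_fname_predict_hit tunable.
+ */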
+static ssize_t
+statahead_fname_predict_hit_show(struct kobject *kobj, struct attribute *attr,
+ char *buf)
+{
+ struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+ ll_kset.kobj);
+
+ return snprintf(buf, PAGE_SIZE, "%u\n", sbi->ll_sa_fname_predict_hit);
+}
+
+static ssize_t
+statahead_fname_predict_hit_store(struct kobject *kobj, struct attribute *attr,
+ const char *buffer, size_t count)
+{
+ struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+ ll_kset.kobj);
+ unsigned int val;
+ int rc;
+
+ rc = kstrtouint(buffer, 0, &val);
+ if (rc)
+ return rc;
+
+ sbi->ll_sa_fname_predict_hit = val;
+ return count;
+}
+LUSTRE_RW_ATTR(statahead_fname_predict_hit);
+
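+/*
+ * Threshold of filename matches (lli_sa_match_count) after which
+ * sa_pattern_fname_detect() reports an FNAME statahead pattern; exported
+ * as the statahead_fname_match_hit tunable.
+ */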
+static ssize_t
+statahead_fname_match_hit_show(struct kobject *kobj, struct attribute *attr,
+ char *buf)
+{
+ struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+ ll_kset.kobj);
+
+ return snprintf(buf, PAGE_SIZE, "%u\n", sbi->ll_sa_fname_match_hit);
+}
+
+static ssize_t
+statahead_fname_match_hit_store(struct kobject *kobj, struct attribute *attr,
+ const char *buffer, size_t count)
+{
+ struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+ ll_kset.kobj);
+ unsigned int val;
+ int rc;
+
+ rc = kstrtouint(buffer, 0, &val);
+ if (rc)
+ return rc;
+
+ sbi->ll_sa_fname_match_hit = val;
+ return count;
+}
+LUSTRE_RW_ATTR(statahead_fname_match_hit);
+
static ssize_t statahead_agl_show(struct kobject *kobj,
struct attribute *attr,
char *buf)
&lustre_attr_statahead_max.attr,
&lustre_attr_statahead_min.attr,
&lustre_attr_statahead_timeout.attr,
+ &lustre_attr_statahead_fname_predict_hit.attr,
+ &lustre_attr_statahead_fname_match_hit.attr,
&lustre_attr_statahead_agl.attr,
&lustre_attr_lazystatfs.attr,
&lustre_attr_statfs_max_age.attr,
spin_lock_init(&ctx->sax_cache_lock[i]);
}
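+ /*
+ * Count the statahead contexts allocated on this super block; the
+ * matching atomic_dec() is in ll_sax_free().
+ */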
+ atomic_inc(&ll_i2sbi(dir)->ll_sa_refcnt);
RETURN(ctx);
}
static inline void ll_sax_free(struct ll_statahead_context *ctx)
{
+ struct ll_sb_info *sbi;
+
LASSERT(ctx->sax_inode != NULL);
+ sbi = ll_i2sbi(ctx->sax_inode);
iput(ctx->sax_inode);
+ atomic_dec(&sbi->ll_sa_refcnt);
OBD_FREE_PTR(ctx);
}
rc = ll_prep_inode(&child, pill, dir->i_sb, it);
if (rc) {
- CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
+ CERROR("%s: failed to prep %.*s "DFID" inode@%pK: rc = %d\n",
ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
- entry->se_qstr.name, PFID(&entry->se_fid), rc);
+ entry->se_qstr.name, PFID(&entry->se_fid),
+ entry->se_inode, rc);
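+ /* Drop the entry's inode reference on failure so it is not leaked. */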
+ if (entry->se_inode) {
+ iput(entry->se_inode);
+ entry->se_inode = NULL;
+ }
GOTO(out, rc);
}
plli = ll_i2info(parent->d_inode);
task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
- plli->lli_stat_pid);
+ sai->sai_pid);
if (IS_ERR(task)) {
- CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
+ CERROR("%s: cannot start ll_agl_%u thread for '%pd': rc=%ld\n",
+ ll_i2sbi(d_inode(parent))->ll_fsname, sai->sai_pid,
+ sai->sai_dentry, PTR_ERR(task));
RETURN_EXIT;
}
sai->sai_agl_task = task;
EXIT;
}
-static int ll_statahead_by_list(struct dentry *parent)
+static int ll_statahead_by_list(struct ll_statahead_info *sai,
+ struct dentry *parent)
{
struct inode *dir = parent->d_inode;
struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai = lli->lli_sai;
struct ll_sb_info *sbi = ll_i2sbi(dir);
struct md_op_data *op_data;
struct page *page = NULL;
ENTRY;
+ LASSERTF(sai != NULL, "pattern %#X\n", lli->lli_sa_pattern);
CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
sai, parent);
struct inode *dir = parent->d_inode;
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_sb_info *sbi = ll_i2sbi(dir);
+ enum ll_sa_pattern pattern;
struct lu_batch *bh = NULL;
struct sa_entry *entry;
int tries = 0;
}
sai->sai_bh = bh;
-
- switch (lli->lli_sa_pattern & LSA_PATTERN_MASK) {
+ pattern = lli->lli_sa_pattern & LSA_PATTERN_MASK;
+ switch (pattern) {
case LSA_PATTERN_LIST:
- rc = ll_statahead_by_list(parent);
+ rc = ll_statahead_by_list(sai, parent);
break;
case LSA_PATTERN_ADVISE:
rc = ll_statahead_by_advise(sai, parent);
while (({set_current_state(TASK_IDLE);
/* matches smp_store_release() in ll_deauthorize_statahead() */
smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled; })) {
- schedule();
+ long timeout;
+
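+ /*
+ * Wake up periodically so that a LIST => FNAME pattern change that
+ * missed waking this thread is detected and reported below.
+ */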
+ timeout = schedule_timeout(
+ cfs_time_seconds(sbi->ll_sa_timeout));
+ if (timeout == 0 && pattern == LSA_PATTERN_LIST &&
+ lli->lli_sai == NULL)
+ CERROR("%s: ll_sa_%u LIST => FNAME no wakeup.\n",
+ sbi->ll_fsname, sai->sai_pid);
}
__set_current_state(TASK_RUNNING);
{
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai;
+ int rc;
ENTRY;
spin_lock(&lli->lli_sa_lock);
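+ /* Nothing to do here unless the FNAME statahead pattern was detected. */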
+ if (!(lli->lli_sa_pattern & LSA_PATTERN_FNAME))
+ GOTO(out_unlock, rc = 0);
+
sai = lli->lli_sai;
if (sai) {
- if (sai->sai_pid == current->pid) {
- spin_unlock(&lli->lli_sa_lock);
- RETURN(0);
- }
+ if (sai->sai_pid == current->pid)
+ GOTO(out_unlock, rc = 0);
+
lli->lli_sai = NULL;
lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
}
sai = ll_find_sai_locked(ctx, current->pid);
- if (sai) {
- spin_unlock(&lli->lli_sa_lock);
- RETURN(-EEXIST);
- }
+ if (sai)
+ GOTO(out_unlock, rc = -EEXIST);
lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
spin_unlock(&lli->lli_sa_lock);
RETURN(start_statahead_thread(dir, dentry, true));
+
+out_unlock:
+ spin_unlock(&lli->lli_sa_lock);
+ RETURN(rc);
}
/**
* \retval negative number upon error
*/
static int revalidate_statahead_dentry(struct inode *dir,
+ struct ll_statahead_info *sai,
struct ll_statahead_context *ctx,
struct dentry **dentryp,
bool unplug)
struct sa_entry *entry = NULL;
struct ll_dentry_data *lld;
struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai = lli->lli_sai;
struct ll_statahead_info *info = NULL;
int rc = 0;
LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
+
if (!sa_ready(entry)) {
spin_lock(&lli->lli_sa_lock);
sai->sai_index_wait = entry->se_index;
sa_pattern_fname_detect(struct inode *dir, struct dentry *dchild)
{
struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
struct qstr *dname = &dchild->d_name;
const unsigned char *name = dname->name;
bool rc = false;
int i;
- if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
+ if (sbi->ll_enable_statahead_fname == 0)
return false;
if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)
return true;
lli->lli_sa_match_count++;
lli->lli_sa_fname_index = num;
- if (lli->lli_sa_match_count > LSA_FN_MATCH_HIT)
+ if (lli->lli_sa_match_count >
+ sbi->ll_sa_fname_match_hit)
GOTO(out, rc = true);
return false;
return 0;
}
+/* Check whether this is the shared FNAME statahead pattern. */
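+/*
+ * LSA_PATTERN_SFNAME combines LSA_PATTERN_FNAME and LSA_PATTERN_FN_SHARED,
+ * so both bits must be set here.
+ */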
+static inline bool sa_pattern_shared_fname(struct ll_inode_info *lli)
+{
+ return (lli->lli_sa_pattern & LSA_PATTERN_SFNAME) == LSA_PATTERN_SFNAME;
+}
+
/**
* start statahead thread
*
struct task_struct *task;
struct ll_sb_info *sbi;
int first = LS_FIRST_DE;
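+ /*
+ * On failure, free @sai/@ctx directly unless they must be released via
+ * ll_sai_put()/ll_sax_put(); see the comment at the cleanup code below.
+ */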
+ bool fail_free = true;
int rc = 0;
ENTRY;
/* The workload like directory listing or mdtest unique dir stat() */
if (lli->lli_sa_pattern & LSA_PATTERN_LIST ||
- (lli->lli_sa_pattern & (LSA_PATTERN_FN_SHARED |
- LSA_PATTERN_FNAME)) == LSA_PATTERN_FNAME) {
+ (lli->lli_sa_pattern & LSA_PATTERN_SFNAME) == LSA_PATTERN_FNAME) {
ctx = ll_sax_alloc(dir);
if (!ctx)
GOTO(out, rc = -ENOMEM);
spin_lock(&lli->lli_sa_lock);
if (unlikely(lli->lli_sai || lli->lli_sax ||
((lli->lli_sa_pattern & LSA_PATTERN_LIST) &&
- !lli->lli_opendir_key &&
- lli->lli_stat_pid != current->pid))) {
+ (!lli->lli_opendir_key ||
+ lli->lli_stat_pid != current->pid)))) {
spin_unlock(&lli->lli_sa_lock);
GOTO(out, rc = -EPERM);
}
lli->lli_sai = sai;
lli->lli_sax = ctx;
spin_unlock(&lli->lli_sa_lock);
- } else if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED) {
+ } else if (sa_pattern_shared_fname(lli)) {
/* For mdtest shared dir stat() workload */
- LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FNAME);
ctx = ll_sax_get(dir);
if (ctx == NULL) {
ctx = ll_sax_alloc(dir);
GOTO(out, rc = -ENOMEM);
spin_lock(&lli->lli_sa_lock);
+ if (!sa_pattern_shared_fname(lli)) {
+ spin_unlock(&lli->lli_sa_lock);
+ CERROR("%s: invalid pattern %#X.\n",
+ sbi->ll_fsname, lli->lli_sa_pattern);
+ GOTO(out, rc = -EINVAL);
+ }
+
if (lli->lli_sax) {
struct ll_statahead_context *tmp = ctx;
- if (lli->lli_sa_pattern &
- LSA_PATTERN_FN_SHARED) {
- ctx = lli->lli_sax;
- __ll_sax_get(ctx);
- rc = ll_sax_add_sai(ctx, sai);
- } else {
- CWARN("%s: invalid pattern %#X.\n",
- sbi->ll_fsname,
- lli->lli_sa_pattern);
- rc = -EINVAL;
- }
+ ctx = lli->lli_sax;
+ __ll_sax_get(ctx);
+ rc = ll_sax_add_sai(ctx, sai);
spin_unlock(&lli->lli_sa_lock);
ll_sax_free(tmp);
- if (rc)
+ if (rc) {
+ CERROR("%s: failed to add sai: rc=%d\n",
+ sbi->ll_fsname, rc);
+ ll_sax_put(dir, ctx);
+ ctx = NULL;
GOTO(out, rc);
+ }
} else {
lli->lli_sax = ctx;
rc = ll_sax_add_sai(ctx, sai);
+ if (rc)
+ lli->lli_sax = NULL;
spin_unlock(&lli->lli_sa_lock);
}
} else {
spin_lock(&lli->lli_sa_lock);
- if (!(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)) {
- spin_unlock(&lli->lli_sa_lock);
- GOTO(out, rc = -EINVAL);
- }
-
- rc = ll_sax_add_sai(ctx, sai);
+ if (sa_pattern_shared_fname(lli))
+ rc = ll_sax_add_sai(ctx, sai);
+ else
+ rc = -EINVAL;
spin_unlock(&lli->lli_sa_lock);
+ if (rc) {
+ CERROR("%s: failed to add sai: pattern %#X pid=%d rc=%d\n",
+ sbi->ll_fsname, lli->lli_sa_pattern,
+ sai->sai_pid, rc);
+ ll_sax_put(dir, ctx);
+ ctx = NULL;
+ GOTO(out, rc);
+ }
}
if (rc)
current->pid, parent);
task = kthread_create_on_node(ll_statahead_thread, sai, node,
- "ll_sa_%u", lli->lli_stat_pid);
+ "ll_sa_%u", sai->sai_pid);
if (IS_ERR(task)) {
spin_lock(&lli->lli_sa_lock);
lli->lli_sai = NULL;
spin_unlock(&lli->lli_sa_lock);
rc = PTR_ERR(task);
- CERROR("can't start ll_sa thread, rc: %d\n", rc);
+ CERROR("%s: cannot start ll_sa_%u thread for '%pd': rc = %d\n",
+ sbi->ll_fsname, sai->sai_pid, sai->sai_dentry, rc);
+ /* Use @ll_sai/sax_put to release @sai and @ctx. */
+ fail_free = false;
GOTO(out, rc);
}
lli->lli_sa_enabled = 0;
spin_unlock(&lli->lli_sa_lock);
- if (sai)
- ll_sai_put(sai);
-
- if (ctx)
- ll_sax_put(dir, ctx);
+ /*
+ * ll_sai_free() and ll_sax_free() do not clear @lli_sai and @lli_sax.
+ * When this process never published @sai and @ctx there, free them
+ * directly with those two functions on failure.
+ * Calling ll_sai_put()/ll_sax_put() instead would reset @lli_sai and
+ * @lli_sax to NULL, and other statahead threads still relying on them
+ * would then wrongly access the cleared pointers.
+ */
+ if (fail_free) {
+ if (sai)
+ ll_sai_free(sai);
+ if (ctx)
+ ll_sax_free(ctx);
+ } else {
+ if (sai)
+ ll_sai_put(sai);
+ if (ctx)
+ ll_sax_put(dir, ctx);
+ }
if (rc)
atomic_dec(&sbi->ll_sa_running);
-
RETURN(rc);
}
}
spin_unlock(&lli->lli_sa_lock);
if (ctx) {
- rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
+ rc = revalidate_statahead_dentry(dir, sai, ctx,
+ dentryp, unplug);
CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
*dentryp, rc);
if (sai)
"ll_sa_%u", current->pid);
if (IS_ERR(task)) {
rc = PTR_ERR(task);
- CERROR("%s: cannot start ll_sa thread: rc = %d\n",
- sbi->ll_fsname, rc);
+ CERROR("%s: cannot start ll_sa_%u thread for '%pd': rc = %d\n",
+ sbi->ll_fsname, sai->sai_pid, sai->sai_dentry, rc);
GOTO(out, rc);
}
*/
void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
{
- struct ll_inode_info *lli;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
struct qstr *dname = &dchild->d_name;
- if (ll_i2sbi(dir)->ll_sa_max == 0)
+ if (sbi->ll_sa_max == 0)
return;
- if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
+ if (sbi->ll_enable_statahead_fname == 0)
return;
- lli = ll_i2info(dir);
if (lli->lli_sa_enabled)
return;
}
lli->lli_sa_match_count++;
- if (lli->lli_sa_match_count > LSA_FN_PREDICT_HIT) {
+ if (lli->lli_sa_match_count > sbi->ll_sa_fname_predict_hit) {
spin_lock(&lli->lli_sa_lock);
lli->lli_sa_pattern |= LSA_PATTERN_FN_PREDICT;
spin_unlock(&lli->lli_sa_lock);