ctx->sax_inode = igrab(dir);
atomic_set(&ctx->sax_refcount, 1);
+ INIT_LIST_HEAD(&ctx->sax_sai_list);
for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
INIT_LIST_HEAD(&ctx->sax_cache[i]);
spin_lock_init(&ctx->sax_cache_lock[i]);
struct ll_inode_info *lli = ll_i2info(dir);
if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
+ LASSERT(list_empty(&ctx->sax_sai_list));
lli->lli_sai = NULL;
lli->lli_sax = NULL;
if (lli->lli_sa_pattern & (LSA_PATTERN_ADVISE |
LSA_PATTERN_FNAME)) {
lli->lli_opendir_key = NULL;
- lli->lli_opendir_pid = 0;
+ lli->lli_stat_pid = 0;
lli->lli_sa_enabled = 0;
}
lli->lli_sa_pattern = LSA_PATTERN_NONE;
sai->sai_index = 1;
init_waitqueue_head(&sai->sai_waitq);
+ INIT_LIST_HEAD(&sai->sai_item);
INIT_LIST_HEAD(&sai->sai_entries);
INIT_LIST_HEAD(&sai->sai_agls);
struct sa_entry *entry, *next;
lli->lli_sai = NULL;
+ list_del_init(&sai->sai_item);
spin_unlock(&lli->lli_sa_lock);
LASSERT(!sai->sai_task);
plli = ll_i2info(parent->d_inode);
task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
- plli->lli_opendir_pid);
+ plli->lli_stat_pid);
if (IS_ERR(task)) {
CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
RETURN_EXIT;
if (IS_ERR(page)) {
rc = PTR_ERR(page);
CDEBUG(D_READA,
- "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
+ "error reading dir "DFID" at %llu /%llu stat_pid = %u: rc = %d\n",
PFID(ll_inode2fid(dir)), pos, sai->sai_index,
- lli->lli_opendir_pid, rc);
+ lli->lli_stat_pid, rc);
break;
}
if (rc < 0) {
spin_lock(&lli->lli_sa_lock);
sai->sai_task = NULL;
- lli->lli_sa_enabled = 0;
spin_unlock(&lli->lli_sa_lock);
}
* finished yet, we'd better not start a new statahead for now.
*/
lli->lli_opendir_key = key;
- lli->lli_opendir_pid = current->pid;
+ lli->lli_stat_pid = current->pid;
lli->lli_sa_enabled = 1;
+ lli->lli_sa_pattern |= LSA_PATTERN_OPENDIR;
}
spin_unlock(&lli->lli_sa_lock);
}
return;
}
- LASSERT(lli->lli_opendir_pid != 0);
+ LASSERT(lli->lli_stat_pid != 0);
LASSERT(lli->lli_opendir_key == key);
spin_lock(&lli->lli_sa_lock);
lli->lli_opendir_key = NULL;
- lli->lli_opendir_pid = 0;
+ lli->lli_stat_pid = 0;
lli->lli_sa_enabled = 0;
lli->lli_sa_pattern = LSA_PATTERN_NONE;
lli->lli_sa_fname_index = 0;
struct ll_inode_info *lli = ll_i2info(dir);
rc = PTR_ERR(page);
- CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
+ CERROR("%s: reading dir "DFID" at %llu stat_pid = %u : rc = %d\n",
ll_i2sbi(dir)->ll_fsname,
PFID(ll_inode2fid(dir)), pos,
- lli->lli_opendir_pid, rc);
+ lli->lli_stat_pid, rc);
break;
}
return rc;
}
+static struct ll_statahead_info *
+ll_find_sai_locked(struct ll_statahead_context *ctx, pid_t pid)
+{
+ struct ll_statahead_info *sai;
+
+ list_for_each_entry(sai, &ctx->sax_sai_list, sai_item) {
+ if (sai->sai_pid == pid)
+ return sai;
+ }
+ return NULL;
+}
+
+static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
+ bool agl);
+
+static int ll_shared_statahead_check(struct inode *dir, struct dentry *dentry,
+ struct ll_statahead_context *ctx)
+{
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_info *sai;
+
+ ENTRY;
+
+ spin_lock(&lli->lli_sa_lock);
+ sai = lli->lli_sai;
+ if (sai) {
+ if (sai->sai_pid == current->pid) {
+ spin_unlock(&lli->lli_sa_lock);
+ RETURN(0);
+ }
+ lli->lli_sai = NULL;
+ lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
+ }
+
+ LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED);
+ sai = ll_find_sai_locked(ctx, current->pid);
+ if (sai) {
+ spin_unlock(&lli->lli_sa_lock);
+ RETURN(-EEXIST);
+ }
+
+ spin_unlock(&lli->lli_sa_lock);
+
+ RETURN(start_statahead_thread(dir, dentry, true));
+}
+
/**
* revalidate @dentryp from statahead cache
*
GOTO(out, rc = 1);
entry = sa_get(ctx, &(*dentryp)->d_name);
- if (!entry)
- GOTO(out, rc = -EAGAIN);
+ if (!entry) {
+ if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
+ rc = ll_shared_statahead_check(dir, *dentryp, ctx);
+ GOTO(out, rc = rc == 0 ? -EAGAIN : rc);
+ }
- if (lli->lli_sa_pattern & LSA_PATTERN_LIST ||
- lli->lli_sa_pattern & LSA_PATTERN_FNAME)
+ if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
LASSERT(sai == entry->se_sai);
- else if (lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
+ else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME ||
+ lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
sai = entry->se_sai;
- LASSERT(sai != NULL);
+ LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
+ lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
if (!sa_ready(entry)) {
spin_lock(&lli->lli_sa_lock);
sai->sai_index_wait = entry->se_index;
{
struct ll_inode_info *lli = ll_i2info(dir);
- if (lli->lli_opendir_pid == 0)
+ if (lli->lli_stat_pid == 0)
+ return false;
+
+ /* Directory listing needs to call opendir()/readdir()/stat(). */
+ if (!(lli->lli_sa_pattern & LSA_PATTERN_OPENDIR))
return false;
if (lli->lli_sa_enabled == 0)
* time to try it.
*/
spin_lock(&lli->lli_sa_lock);
- if (lli->lli_opendir_pid == current->pid) {
+ if (lli->lli_stat_pid == current->pid) {
lli->lli_sa_enabled = 0;
lli->lli_sa_pattern |= LSA_PATTERN_LS_NOT_FIRST_DE;
}
if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
return false;
+ if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)
+ return true;
/*
* Parse the format of the file name to determine whether it matches
- * the supported file name pattern for statahead (i.e. mdtest.$i).
+ * the supported file name pattern for statahead (i.e. mdtest.$rank.$i).
*/
i = dname->len - 1;
if (isdigit(name[i])) {
long num;
int ret;
+ if (lli->lli_stat_pid == 0) {
+ lli->lli_stat_pid = current->pid;
+ } else if (lli->lli_stat_pid != current->pid) {
+ /*
+ * More than two processes (MPI ranks) doing stat()
+ * calls under this directory, consider it as a mdtest
+ * shared dir stat() workload.
+ */
+ spin_lock(&lli->lli_sa_lock);
+ lli->lli_stat_pid = current->pid;
+ if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
+ lli->lli_sai = NULL;
+ rc = false;
+ } else {
+ lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
+ rc = true;
+ }
+ lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
+ spin_unlock(&lli->lli_sa_lock);
+ return rc;
+ }
+
while (--i >= 0 && isdigit(name[i]))
; /* do nothing */
i++;
lli->lli_sa_match_count++;
lli->lli_sa_fname_index = num;
- if (lli->lli_sa_match_count > LSA_FN_MATCH_HIT) {
- spin_lock(&lli->lli_sa_lock);
- lli->lli_sa_pattern |= LSA_PATTERN_FN_UNIQUE;
- spin_unlock(&lli->lli_sa_lock);
+ if (lli->lli_sa_match_count > LSA_FN_MATCH_HIT)
GOTO(out, rc = true);
- }
return false;
}
sa_pattern_fname_detect(dir, dchild);
}
+static inline int ll_sax_add_sai(struct ll_statahead_context *ctx,
+ struct ll_statahead_info *sai)
+{
+ if (ll_find_sai_locked(ctx, sai->sai_pid) != NULL)
+ return -EEXIST;
+
+ list_add_tail(&sai->sai_item, &ctx->sax_sai_list);
+ return 0;
+}
+
/**
* start statahead thread
*
if (!sai)
GOTO(out, rc = -ENOMEM);
- ctx = ll_sax_alloc(dir);
- if (!ctx)
- GOTO(out, rc = -ENOMEM);
-
sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
+ sai->sai_pid = current->pid;
if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
struct qstr *dname = &dentry->d_name;
const unsigned char *name = dname->name;
- int rc;
+ long num;
int i;
if (dname->len >= sizeof(sai->sai_fname))
while (--i >= 0 && isdigit(name[i]))
; /* do nothing */
i++;
+ rc = kstrtol(&name[i], 0, &num);
+ if (rc)
+ GOTO(out, rc);
memcpy(sai->sai_fname, dname->name, i);
sai->sai_fname[i] = '\0';
- sai->sai_fname_index = lli->lli_sa_fname_index;
+ sai->sai_fname_index = num;
/* The front part of the file name is zeroed padding. */
if (name[i] == '0')
sai->sai_fname_zeroed_len = dname->len - i;
}
- /*
- * if current lli_opendir_key was deauthorized, or dir re-opened by
- * another process, don't start statahead, otherwise the newly spawned
- * statahead thread won't be notified to quit.
- */
- spin_lock(&lli->lli_sa_lock);
- if (unlikely(lli->lli_sai ||
- ((lli->lli_sa_pattern & LSA_PATTERN_LIST) &&
- !lli->lli_opendir_key &&
- lli->lli_opendir_pid != current->pid))) {
+ /* The workload like directory listing or mdtest unique dir stat() */
+ if (lli->lli_sa_pattern & LSA_PATTERN_LIST ||
+ (lli->lli_sa_pattern & (LSA_PATTERN_FN_SHARED |
+ LSA_PATTERN_FNAME)) == LSA_PATTERN_FNAME) {
+ ctx = ll_sax_alloc(dir);
+ if (!ctx)
+ GOTO(out, rc = -ENOMEM);
+
+ /*
+ * if current lli_opendir_key was deauthorized, or dir
+ * re-opened by another process, don't start statahead,
+ * otherwise the newly spawned statahead thread won't be
+ * notified to quit.
+ */
+ spin_lock(&lli->lli_sa_lock);
+ if (unlikely(lli->lli_sai || lli->lli_sax ||
+ ((lli->lli_sa_pattern & LSA_PATTERN_LIST) &&
+ !lli->lli_opendir_key &&
+ lli->lli_stat_pid != current->pid))) {
+ spin_unlock(&lli->lli_sa_lock);
+ GOTO(out, rc = -EPERM);
+ }
+ rc = ll_sax_add_sai(ctx, sai);
+ if (rc) {
+ spin_unlock(&lli->lli_sa_lock);
+ GOTO(out, rc);
+ }
+ lli->lli_sai = sai;
+ lli->lli_sax = ctx;
spin_unlock(&lli->lli_sa_lock);
- GOTO(out, rc = -EPERM);
+ } else if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED) {
+ /* For mdtest shared dir stat() workload */
+ LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FNAME);
+ ctx = ll_sax_get(dir);
+ if (ctx == NULL) {
+ ctx = ll_sax_alloc(dir);
+ if (ctx == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ spin_lock(&lli->lli_sa_lock);
+ if (lli->lli_sax) {
+ struct ll_statahead_context *tmp = ctx;
+
+ if (lli->lli_sa_pattern &
+ LSA_PATTERN_FN_SHARED) {
+ ctx = lli->lli_sax;
+ __ll_sax_get(ctx);
+ rc = ll_sax_add_sai(ctx, sai);
+ } else {
+ CWARN("%s: invalid pattern %#X.\n",
+ sbi->ll_fsname,
+ lli->lli_sa_pattern);
+ rc = -EINVAL;
+ }
+
+ spin_unlock(&lli->lli_sa_lock);
+ ll_sax_free(tmp);
+ if (rc)
+ GOTO(out, rc);
+ } else {
+ lli->lli_sax = ctx;
+ rc = ll_sax_add_sai(ctx, sai);
+ spin_unlock(&lli->lli_sa_lock);
+ }
+ } else {
+ spin_lock(&lli->lli_sa_lock);
+ if (!(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)) {
+ spin_unlock(&lli->lli_sa_lock);
+ GOTO(out, rc = -EINVAL);
+ }
+
+ rc = ll_sax_add_sai(ctx, sai);
+ spin_unlock(&lli->lli_sa_lock);
+ }
+
+ if (rc)
+ GOTO(out, rc);
+ } else {
+ CERROR("%s: unsupported statahead pattern %#X.\n",
+ sbi->ll_fsname, lli->lli_sa_pattern);
+ GOTO(out, rc = -EOPNOTSUPP);
}
- lli->lli_sai = sai;
- lli->lli_sax = ctx;
- spin_unlock(&lli->lli_sa_lock);
CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
current->pid, parent);
task = kthread_create_on_node(ll_statahead_thread, sai, node,
- "ll_sa_%u", lli->lli_opendir_pid);
+ "ll_sa_%u", lli->lli_stat_pid);
if (IS_ERR(task)) {
spin_lock(&lli->lli_sa_lock);
lli->lli_sai = NULL;
* subsequent stat won't waste time to try it.
*/
spin_lock(&lli->lli_sa_lock);
- if (lli->lli_opendir_pid == current->pid)
+ if (lli->lli_stat_pid == current->pid)
lli->lli_sa_enabled = 0;
spin_unlock(&lli->lli_sa_lock);
if (ctx)
ll_sax_free(ctx);
- if (first != LS_NOT_FIRST_DE)
+ if (rc)
atomic_dec(&sbi->ll_sa_running);
RETURN(rc);