#define SA_OMITTED_ENTRY_MAX 8ULL
-typedef enum {
+enum sa_entry_state {
/** negative values are for error cases */
SA_ENTRY_INIT = 0, /** init entry */
SA_ENTRY_SUCC = 1, /** stat succeed */
SA_ENTRY_INVA = 2, /** invalid entry */
-} se_state_t;
+};
/*
* sa_entry is not refcounted: statahead thread allocates it and do async stat,
/* low layer ldlm lock handle */
__u64 se_handle;
/* entry status */
- se_state_t se_state;
+ enum sa_entry_state se_state;
/* entry size, contains name */
int se_size;
/* pointer to the target inode */
}
/* unhash entry from sai_cache */
-static inline void
-sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
+static inline int sa_unhash(struct ll_statahead_context *ctx,
+ struct sa_entry *entry, bool inuse_check)
{
+ struct ll_statahead_info *sai = entry->se_sai;
int i = sa_hash(entry->se_qstr.hash);
+ int rc = 0;
+
+ if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
+ return -EAGAIN;
spin_lock(&ctx->sax_cache_lock[i]);
- list_del_init(&entry->se_hash);
+ if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
+ rc = -EAGAIN;
+ else
+ list_del_init(&entry->se_hash);
spin_unlock(&ctx->sax_cache_lock[i]);
+
+ return rc;
}
static inline int agl_should_run(struct ll_statahead_info *sai,
/**
* (1) hit ratio less than 80%
* or
- * (2) consecutive miss more than 8
+ * (2) consecutive miss more than 32
* then means low hit.
*/
static inline int sa_low_hit(struct ll_statahead_info *sai)
{
- return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
- (sai->sai_consecutive_miss > 8));
+ return ((sai->sai_hit > 32 && sai->sai_hit < 4 * sai->sai_miss) ||
+ (sai->sai_consecutive_miss > 32));
}
/*
}
/*
- * find sa_entry by name, used by directory scanner, lock is not needed because
- * only scanner can remove the entry from cache.
+ * Find sa_entry by name, used by directory scanner. If @sai_pid is not the PID
+ * of the scanner (which means it may do statahead wrongly), return -EINVAL
+ * immediately.
*/
-static struct sa_entry *
-sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
+static struct sa_entry *sa_get(struct ll_statahead_context *ctx,
+ const struct qstr *qstr,
+ struct ll_statahead_info **info)
{
struct sa_entry *entry;
int i = sa_hash(qstr->hash);
if (entry->se_qstr.hash == qstr->hash &&
entry->se_qstr.len == qstr->len &&
memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
+ struct ll_statahead_info *sai = entry->se_sai;
+
+ if (sai->sai_pid != current->pid) {
+ CDEBUG(D_CACHE,
+ "%s: wrong pid=%d:%d for entry %.*s\n",
+ ll_i2sbi(ctx->sax_inode)->ll_fsname,
+ sai->sai_pid, current->pid,
+ entry->se_qstr.len, entry->se_qstr.name);
+ entry = ERR_PTR(-EINVAL);
+ *info = sai;
+ }
+
+ atomic_inc(&sai->sai_inuse_count);
spin_unlock(&ctx->sax_cache_lock[i]);
return entry;
}
}
/* unhash and unlink sa_entry, and then free it */
-static inline void
-sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
+static inline int sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry,
+ bool locked, bool inuse_check)
{
struct inode *dir = sai->sai_dentry->d_inode;
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_context *ctx = lli->lli_sax;
+ int rc;
- LASSERT(!sa_unhashed(entry));
LASSERT(!list_empty(&entry->se_list));
LASSERT(sa_ready(entry));
- sa_unhash(ctx, entry);
+ rc = sa_unhash(ctx, entry, inuse_check);
+ if (rc)
+ return rc;
if (!locked)
spin_lock(&lli->lli_sa_lock);
iput(entry->se_inode);
atomic_dec(&sai->sai_cache_count);
+
sa_free(ctx, entry);
if (locked)
spin_lock(&lli->lli_sa_lock);
+
+ return 0;
+}
+
+static inline int sa_kill_try(struct ll_statahead_info *sai,
+ struct sa_entry *entry, bool locked)
+{
+ return sa_kill(sai, entry, locked, true);
}
/* called by scanner after use, sa_entry will be killed */
struct ll_inode_info *lli = ll_i2info(dir);
struct sa_entry *tmp;
bool wakeup = false;
+ bool inuse = false;
if (entry && entry->se_state == SA_ENTRY_SUCC) {
struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
wakeup = true;
}
- if (entry)
- sa_kill(sai, entry, false);
+ if (entry) {
+ inuse = true;
+ sa_kill(sai, entry, false, false);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_STATAHEAD_PAUSE, cfs_fail_val);
+ }
- if (sai) {
+ spin_lock(&lli->lli_sa_lock);
+ if (inuse) {
/*
* kill old completed entries. Maybe kicking old entries can
* be ignored?
*/
- spin_lock(&lli->lli_sa_lock);
while ((tmp = list_first_entry_or_null(&sai->sai_entries,
struct sa_entry, se_list))) {
if (!is_omitted_entry(sai, tmp->se_index))
break;
/* ll_sa_lock is dropped by sa_kill(), restart list */
- sa_kill(sai, tmp, true);
+ sa_kill(sai, tmp, true, false);
}
- spin_unlock(&lli->lli_sa_lock);
}
-
- spin_lock(&lli->lli_sa_lock);
if (wakeup && sai->sai_task)
wake_up_process(sai->sai_task);
+ if (inuse)
+ atomic_dec(&sai->sai_inuse_count);
spin_unlock(&lli->lli_sa_lock);
}
INIT_LIST_HEAD(&sai->sai_agls);
atomic_set(&sai->sai_cache_count, 0);
-
+ atomic_set(&sai->sai_inuse_count, 0);
spin_lock(&sai_generation_lock);
lli->lli_sa_generation = ++sai_generation;
if (unlikely(sai_generation == 0))
if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
- struct sa_entry *entry, *next;
lli->lli_sai = NULL;
list_del_init(&sai->sai_item);
LASSERT(!sai->sai_agl_task);
LASSERT(sai->sai_sent == sai->sai_replied);
- list_for_each_entry_safe(entry, next, &sai->sai_entries,
- se_list)
- sa_kill(sai, entry, false);
-
LASSERT(atomic_read(&sai->sai_cache_count) == 0);
LASSERT(agl_list_empty(sai));
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_sb_info *sbi = ll_i2sbi(dir);
struct lu_batch *bh = NULL;
+ struct sa_entry *entry;
+ int tries = 0;
int rc = 0;
ENTRY;
atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
+ /* Kill all local cached entry. */
+ spin_lock(&lli->lli_sa_lock);
+ while ((entry = list_first_entry_or_null(&sai->sai_entries,
+ struct sa_entry, se_list))) {
+		/*
+		 * If the entry is being used by the user process, wait for
+		 * the in-use entry to be released, then restart killing the
+		 * locally cached entries.
+		 */
+ if (sa_kill_try(sai, entry, true)) {
+ spin_unlock(&lli->lli_sa_lock);
+ msleep(125);
+ if (++tries % 1024 == 0) {
+ CWARN("%s: statahead thread waited %lums for inuse entry "DFID" to be finished\n",
+ sbi->ll_fsname, tries * 125/MSEC_PER_SEC,
+ PFID(&entry->se_fid));
+ }
+ spin_lock(&lli->lli_sa_lock);
+ }
+ }
+ spin_unlock(&lli->lli_sa_lock);
+
ll_sai_put(sai);
ll_sax_put(dir, lli->lli_sax);
lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
}
- LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED);
sai = ll_find_sai_locked(ctx, current->pid);
if (sai) {
spin_unlock(&lli->lli_sa_lock);
RETURN(-EEXIST);
}
+ lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
spin_unlock(&lli->lli_sa_lock);
RETURN(start_statahead_thread(dir, dentry, true));
struct sa_entry *entry = NULL;
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai = lli->lli_sai;
+ struct ll_statahead_info *info = NULL;
int rc = 0;
ENTRY;
if (unplug)
GOTO(out, rc = 1);
- entry = sa_get(ctx, &(*dentryp)->d_name);
- if (!entry) {
+ entry = sa_get(ctx, &(*dentryp)->d_name, &info);
+ if (entry == ERR_PTR(-EINVAL)) {
+ sai = info;
+ spin_lock(&lli->lli_sa_lock);
+ if (sai->sai_task) {
+ struct task_struct *task = sai->sai_task;
+
+ /*
+ * matches smp_load_acquire() in
+ * ll_statahead_thread().
+ * Notify to stop statahead thread immediately.
+ */
+ smp_store_release(&sai->sai_task, NULL);
+ wake_up_process(task);
+ }
+ atomic_dec(&sai->sai_inuse_count);
+ spin_unlock(&lli->lli_sa_lock);
+ RETURN(-EINVAL);
+ } else if (entry == NULL) {
if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
rc = ll_shared_statahead_check(dir, *dentryp, ctx);
GOTO(out, rc = rc == 0 ? -EAGAIN : rc);
else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME ||
lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
sai = entry->se_sai;
+ else
+ sai = entry->se_sai;
LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
sai->sai_ls_all = 0;
sai->sai_max = sbi->ll_sa_max;
strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
+ sai->sai_pid = current->pid;
ctx = ll_sax_get(dir);
if (ctx == NULL) {
* Quick check whether the last character is digit.
*/
if (!isdigit(dname->name[dname->len - 1])) {
+ lli->lli_sa_pattern &= ~LSA_PATTERN_FN_PREDICT;
lli->lli_sa_match_count = 0;
return;
}