#define SA_OMITTED_ENTRY_MAX 8ULL
-typedef enum {
+enum sa_entry_state {
/** negative values are for error cases */
SA_ENTRY_INIT = 0, /** init entry */
SA_ENTRY_SUCC = 1, /** stat succeed */
SA_ENTRY_INVA = 2, /** invalid entry */
-} se_state_t;
+};
/*
* sa_entry is not refcounted: statahead thread allocates it and do async stat,
/* low layer ldlm lock handle */
__u64 se_handle;
/* entry status */
- se_state_t se_state;
+ enum sa_entry_state se_state;
/* entry size, contains name */
int se_size;
/* pointer to the target inode */
}
/* unhash entry from sai_cache */
-static inline void
-sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
+static inline int sa_unhash(struct ll_statahead_context *ctx,
+ struct sa_entry *entry, bool inuse_check)
{
+ struct ll_statahead_info *sai = entry->se_sai;
 int i = sa_hash(entry->se_qstr.hash);
+ int rc = 0;
+
+ /* Lockless fast path: refuse to unhash while the sai is in use. */
+ if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
+ return -EAGAIN;
 spin_lock(&ctx->sax_cache_lock[i]);
+ /*
+ * Re-check under the hash lock: sa_get() bumps sai_inuse_count
+ * while holding this lock, closing the race with the check above.
+ */
+ if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
+ rc = -EAGAIN;
+ else
+ list_del_init(&entry->se_hash);
 spin_unlock(&ctx->sax_cache_lock[i]);
+
+ return rc;
}
static inline int agl_should_run(struct ll_statahead_info *sai,
/**
* (1) hit ratio less than 80%
* or
- * (2) consecutive miss more than 8
+ * (2) consecutive miss more than 32
* then means low hit.
*/
static inline int sa_low_hit(struct ll_statahead_info *sai)
{
- return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
- (sai->sai_consecutive_miss > 8));
+ return ((sai->sai_hit > 32 && sai->sai_hit < 4 * sai->sai_miss) ||
+ (sai->sai_consecutive_miss > 32));
}
/*
}
/*
- * find sa_entry by name, used by directory scanner, lock is not needed because
- * only scanner can remove the entry from cache.
+ * Find sa_entry by name, used by directory scanner. If @sai_pid is not the
+ * PID of the scanner (which means it may do statahead wrongly), return
+ * -EINVAL immediately.
*/
-static struct sa_entry *
-sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
+static struct sa_entry *sa_get(struct ll_statahead_context *ctx,
+ const struct qstr *qstr,
+ struct ll_statahead_info **info)
{
struct sa_entry *entry;
int i = sa_hash(qstr->hash);
if (entry->se_qstr.hash == qstr->hash &&
entry->se_qstr.len == qstr->len &&
memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
+ struct ll_statahead_info *sai = entry->se_sai;
+
+ if (sai->sai_pid != current->pid) {
+ CDEBUG(D_CACHE,
+ "%s: wrong pid=%d:%d for entry %.*s\n",
+ ll_i2sbi(ctx->sax_inode)->ll_fsname,
+ sai->sai_pid, current->pid,
+ entry->se_qstr.len, entry->se_qstr.name);
+ entry = ERR_PTR(-EINVAL);
+ *info = sai;
+ }
+
+ atomic_inc(&sai->sai_inuse_count);
spin_unlock(&ctx->sax_cache_lock[i]);
return entry;
}
}
/* unhash and unlink sa_entry, and then free it */
-static inline void
-sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
+static inline int sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry,
+ bool locked, bool inuse_check)
{
struct inode *dir = sai->sai_dentry->d_inode;
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_context *ctx = lli->lli_sax;
+ int rc;
- LASSERT(!sa_unhashed(entry));
LASSERT(!list_empty(&entry->se_list));
LASSERT(sa_ready(entry));
+ /* May fail with -EAGAIN when @inuse_check and the sai is in use. */
+ rc = sa_unhash(ctx, entry, inuse_check);
+ if (rc)
+ return rc;
if (!locked)
spin_lock(&lli->lli_sa_lock);
iput(entry->se_inode);
atomic_dec(&sai->sai_cache_count);
+
sa_free(ctx, entry);
if (locked)
spin_lock(&lli->lli_sa_lock);
+
+ return 0;
+}
+
+/* sa_kill() variant that refuses to kill an in-use entry (returns -EAGAIN). */
+static inline int sa_kill_try(struct ll_statahead_info *sai,
+ struct sa_entry *entry, bool locked)
+{
+ return sa_kill(sai, entry, locked, true);
}
/* called by scanner after use, sa_entry will be killed */
struct ll_inode_info *lli = ll_i2info(dir);
struct sa_entry *tmp;
bool wakeup = false;
+ bool inuse = false;
if (entry && entry->se_state == SA_ENTRY_SUCC) {
struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
wakeup = true;
}
- if (entry)
- sa_kill(sai, entry, false);
+ if (entry) {
+ inuse = true;
+ sa_kill(sai, entry, false, false);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_STATAHEAD_PAUSE, cfs_fail_val);
+ }
- if (sai) {
+ spin_lock(&lli->lli_sa_lock);
+ if (inuse) {
/*
* kill old completed entries. Maybe kicking old entries can
* be ignored?
*/
- spin_lock(&lli->lli_sa_lock);
while ((tmp = list_first_entry_or_null(&sai->sai_entries,
struct sa_entry, se_list))) {
if (!is_omitted_entry(sai, tmp->se_index))
break;
/* ll_sa_lock is dropped by sa_kill(), restart list */
- sa_kill(sai, tmp, true);
+ sa_kill(sai, tmp, true, false);
}
- spin_unlock(&lli->lli_sa_lock);
}
-
- spin_lock(&lli->lli_sa_lock);
if (wakeup && sai->sai_task)
wake_up_process(sai->sai_task);
+ if (inuse)
+ atomic_dec(&sai->sai_inuse_count);
spin_unlock(&lli->lli_sa_lock);
}
INIT_LIST_HEAD(&sai->sai_agls);
atomic_set(&sai->sai_cache_count, 0);
-
+ atomic_set(&sai->sai_inuse_count, 0);
spin_lock(&sai_generation_lock);
lli->lli_sa_generation = ++sai_generation;
if (unlikely(sai_generation == 0))
if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
- struct sa_entry *entry, *next;
lli->lli_sai = NULL;
list_del_init(&sai->sai_item);
LASSERT(!sai->sai_agl_task);
LASSERT(sai->sai_sent == sai->sai_replied);
- list_for_each_entry_safe(entry, next, &sai->sai_entries,
- se_list)
- sa_kill(sai, entry, false);
-
LASSERT(atomic_read(&sai->sai_cache_count) == 0);
LASSERT(agl_list_empty(sai));
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_sb_info *sbi = ll_i2sbi(dir);
struct lu_batch *bh = NULL;
+ struct sa_entry *entry;
+ int tries = 0;
int rc = 0;
ENTRY;
atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
+ /* Kill all locally cached entries. */
+ spin_lock(&lli->lli_sa_lock);
+ while ((entry = list_first_entry_or_null(&sai->sai_entries,
+ struct sa_entry, se_list))) {
+ /*
+ * If the entry is being used by the user process, wait for the
+ * in-use entry to be finished and then restart killing the
+ * locally cached entries.
+ */
+ if (sa_kill_try(sai, entry, true)) {
+ spin_unlock(&lli->lli_sa_lock);
+ msleep(125);
+ if (++tries % 1024 == 0) {
+ CWARN("%s: statahead thread waited %lums for inuse entry "DFID" to be finished\n",
+ sbi->ll_fsname, tries * 125/MSEC_PER_SEC,
+ PFID(&entry->se_fid));
+ }
+ spin_lock(&lli->lli_sa_lock);
+ }
+ }
+ spin_unlock(&lli->lli_sa_lock);
+
ll_sai_put(sai);
ll_sax_put(dir, lli->lli_sax);
lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
}
- LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED);
sai = ll_find_sai_locked(ctx, current->pid);
if (sai) {
spin_unlock(&lli->lli_sa_lock);
RETURN(-EEXIST);
}
+ lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
spin_unlock(&lli->lli_sa_lock);
RETURN(start_statahead_thread(dir, dentry, true));
struct sa_entry *entry = NULL;
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai = lli->lli_sai;
+ struct ll_statahead_info *info = NULL;
int rc = 0;
ENTRY;
if (unplug)
GOTO(out, rc = 1);
- entry = sa_get(ctx, &(*dentryp)->d_name);
- if (!entry) {
+ entry = sa_get(ctx, &(*dentryp)->d_name, &info);
+ if (entry == ERR_PTR(-EINVAL)) {
+ sai = info;
+ spin_lock(&lli->lli_sa_lock);
+ if (sai->sai_task) {
+ struct task_struct *task = sai->sai_task;
+
+ /*
+ * Matches smp_load_acquire() in
+ * ll_statahead_thread().
+ * Notify the statahead thread to stop immediately.
+ */
+ smp_store_release(&sai->sai_task, NULL);
+ wake_up_process(task);
+ }
+ atomic_dec(&sai->sai_inuse_count);
+ spin_unlock(&lli->lli_sa_lock);
+ RETURN(-EINVAL);
+ } else if (entry == NULL) {
if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
rc = ll_shared_statahead_check(dir, *dentryp, ctx);
GOTO(out, rc = rc == 0 ? -EAGAIN : rc);
else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME ||
lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
sai = entry->se_sai;
+ else
+ sai = entry->se_sai;
LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
sai->sai_ls_all = 0;
sai->sai_max = sbi->ll_sa_max;
strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
+ sai->sai_pid = current->pid;
ctx = ll_sax_get(dir);
if (ctx == NULL) {
* Quick check whether the last character is digit.
*/
if (!isdigit(dname->name[dname->len - 1])) {
+ lli->lli_sa_pattern &= ~LSA_PATTERN_FN_PREDICT;
lli->lli_sa_match_count = 0;
return;
}
}
run_test 123j "-ENOENT error from batched statahead be handled correctly"
+test_123k() {
+ MDTEST=${MDTEST:=$(which mdtest 2> /dev/null || true)}
+ [[ -n "$MDTEST" ]] || skip_env "mdtest not found"
+
+ $LCTL get_param -n mdc.*.connect_flags | grep -q batch_rpc ||
+ skip "Server does not support batch RPC"
+
+ local enabled
+
+ enabled=$($LCTL get_param -n llite.*.enable_statahead_fname | head -n 1)
+ stack_trap "$LCTL set_param llite.*.enable_statahead_fname=$enabled"
+ $LCTL set_param llite.*.enable_statahead_fname=1
+
+ local np=2
+
+ mpi_run -np $np $MDTEST -C -F -n 1000 -d $DIR/$tdir
+ cancel_lru_locks mdc
+ $LCTL set_param llite.*.statahead_stats=clear
+ $LCTL set_param mdc.*.batch_stats=0
+ mpi_run -np $np $MDTEST -T -F -n 1000 -d $DIR/$tdir
+ #umount_client $MOUNT || error "failed to umount client"
+ echo "Sleep to wait statahead thread (ll_sa_xxx) to exit ..."
+ wait_update_facet client "pgrep ll_sa" "" 35 ||
+ error "ll_sa thread is still running"
+
+ $LCTL get_param -n llite.*.statahead_stats
+ $LCTL get_param -n mdc.*.batch_stats
+ ps -el | grep ll_sa
+
+ local count=$($LCTL get_param -n llite.*.statahead_stats |
+ awk '/fname.total:/ {print $2}')
+
+ [ $count == $np ] || error "File name pattern statahead not trigger"
+ count=$($LCTL get_param -n llite.*.statahead_stats |
+ awk '/hit.total:/ {print $2}')
+ # Hit ratio should be >= 75%
+ [ $count -gt $((np * 1000 * 75 / 100)) ] ||
+ error "hit total is too low: $count"
+}
+run_test 123k "Verify statahead work with mdtest shared stat() mode"
+
+test_123l() {
+ local dir=$DIR/$tdir
+ local cmd="touch $dir/$tfile.{000000..000100}"
+
+ $LCTL get_param -n mdc.*.connect_flags | grep -q batch_rpc ||
+ skip "Server does not support batch RPC"
+
+ stack_trap "rm -rf $dir"
+ mkdir -p $dir || error "failed to mkdir $dir"
+ eval $cmd
+
+ cancel_lru_locks mdc
+ $LCTL set_param llite.*.statahead_stats=clear
+ $LCTL set_param mdc.*.batch_stats=0
+
+ local max
+ local batch_max
+ local enabled
+
+ enabled=$($LCTL get_param -n llite.*.enable_statahead_fname | head -n 1)
+ max=$($LCTL get_param -n llite.*.statahead_max | head -n 1)
+ batch_max=$($LCTL get_param -n llite.*.statahead_batch_max | head -n 1)
+ stack_trap "$LCTL set_param llite.*.statahead_max=$max"
+ stack_trap "$LCTL set_param llite.*.statahead_batch_max=$batch_max"
+ stack_trap "$LCTL set_param llite.*.enable_statahead_fname=$enabled"
+ $LCTL set_param llite.*.enable_statahead_fname=1
+ $LCTL set_param llite.*.statahead_max=1024
+ $LCTL set_param llite.*.statahead_batch_max=32
+ $LCTL get_param -n llite.*.statahead_stats
+ #define OBD_FAIL_LLITE_STATAHEAD_PAUSE 0x1433
+ $LCTL set_param fail_loc=0x80001433 fail_val=35
+ ls $dir/* > /dev/null
+ $LCTL get_param -n llite.*.statahead_stats
+ $LCTL get_param -n mdc.*.batch_stats
+
+ echo "Sleep to wait the statahead thread (ll_sa_xxx) to exit ..."
+ wait_update_facet client "pgrep ll_sa" "" 35 ||
+ error "ll_sa thread is still running"
+ $LCTL get_param -n llite.*.statahead_stats
+}
+run_test 123l "Avoid panic when revalidate a local cached entry"
+
test_124a() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
$LCTL get_param -n mdc.*.connect_flags | grep -q lru_resize ||