*/
/*
 * Per-name statahead entry: linked on the sai's sai_entries list and
 * hashed into the per-directory statahead context cache until the
 * scanner process kills it.
 */
struct sa_entry {
	/* link into sai_entries */
	struct list_head se_list;
	/* link into sai hash table locally */
	struct list_head se_hash;
	/* entry index in the sai */
	__u64 se_index;
	/* low layer ldlm lock handle */
	__u64 se_handle;
	/* entry status */
	se_state_t se_state;
	/* allocated size of this entry, including the trailing name */
	int se_size;
	/* pointer to the target inode */
	struct inode *se_inode;
	/* pointer to @sai per process struct */
	struct ll_statahead_info *se_sai;
	/* entry name */
	struct qstr se_qstr;
	/* entry fid */
	struct lu_fid se_fid;
};
static unsigned int sai_generation;
return val & LL_SA_CACHE_MASK;
}
-/* hash entry into sai_cache */
+/* hash entry into sax_cache */
static inline void
-sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
+sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
{
int i = sa_hash(entry->se_qstr.hash);
- spin_lock(&sai->sai_cache_lock[i]);
- list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
- spin_unlock(&sai->sai_cache_lock[i]);
+ spin_lock(&ctx->sax_cache_lock[i]);
+ list_add_tail(&entry->se_hash, &ctx->sax_cache[i]);
+ spin_unlock(&ctx->sax_cache_lock[i]);
}
/* unhash entry from sai_cache */
static inline void
-sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
+sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
{
int i = sa_hash(entry->se_qstr.hash);
- spin_lock(&sai->sai_cache_lock[i]);
+ spin_lock(&ctx->sax_cache_lock[i]);
list_del_init(&entry->se_hash);
- spin_unlock(&sai->sai_cache_lock[i]);
+ spin_unlock(&ctx->sax_cache_lock[i]);
}
static inline int agl_should_run(struct ll_statahead_info *sai,
ENTRY;
- entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
+ entry_size = sizeof(struct sa_entry) +
+ round_up(len + 1 /* for trailing NUL */, 4);
OBD_ALLOC(entry, entry_size);
if (unlikely(!entry))
RETURN(ERR_PTR(-ENOMEM));
len, name, entry, index);
entry->se_index = index;
+ entry->se_sai = sai;
entry->se_state = SA_ENTRY_INIT;
entry->se_size = entry_size;
entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
entry->se_qstr.len = len;
entry->se_qstr.name = dname;
- entry->se_fid = *fid;
- lli = ll_i2info(sai->sai_dentry->d_inode);
+ if (fid)
+ entry->se_fid = *fid;
+ lli = ll_i2info(sai->sai_dentry->d_inode);
spin_lock(&lli->lli_sa_lock);
INIT_LIST_HEAD(&entry->se_list);
- sa_rehash(sai, entry);
+ sa_rehash(lli->lli_sax, entry);
spin_unlock(&lli->lli_sa_lock);
atomic_inc(&sai->sai_cache_count);
}
/* free sa_entry, which should have been unhashed and not in any list */
-static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
+static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
{
CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
entry->se_qstr.len, entry->se_qstr.name, entry,
LASSERT(sa_unhashed(entry));
OBD_FREE(entry, entry->se_size);
- atomic_dec(&sai->sai_cache_count);
}
/*
 * Look up a statahead entry by name in the per-directory cache.
 *
 * The entry is returned after the bucket lock has been dropped; this is
 * safe only because the scanner is the sole process that removes entries
 * from the cache, so an entry it found cannot be freed under it.
 *
 * only scanner can remove the entry from cache.
 */
static struct sa_entry *
sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
{
	struct sa_entry *entry;
	int i = sa_hash(qstr->hash);

	spin_lock(&ctx->sax_cache_lock[i]);
	list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) {
		/* compare hash first to skip most mismatches cheaply */
		if (entry->se_qstr.hash == qstr->hash &&
		    entry->se_qstr.len == qstr->len &&
		    memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
			spin_unlock(&ctx->sax_cache_lock[i]);
			return entry;
		}
	}
	spin_unlock(&ctx->sax_cache_lock[i]);
	return NULL;
}
/*
 * Unhash and unlink @entry, then free it.
 *
 * @locked: the caller already holds lli->lli_sa_lock.  Note that the lock
 * is dropped while the entry is freed and re-acquired before returning,
 * so a @locked caller walking sai_entries must restart the walk after
 * each call (see the list-restart loop in sa_put()).
 */
static inline void
sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
{
	struct inode *dir = sai->sai_dentry->d_inode;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_context *ctx = lli->lli_sax;

	LASSERT(!sa_unhashed(entry));
	LASSERT(!list_empty(&entry->se_list));
	LASSERT(sa_ready(entry));

	sa_unhash(ctx, entry);

	if (!locked)
		spin_lock(&lli->lli_sa_lock);
	list_del_init(&entry->se_list);
	/* drop the lock before the potentially expensive iput()/free */
	spin_unlock(&lli->lli_sa_lock);
	iput(entry->se_inode);
	atomic_dec(&sai->sai_cache_count);
	sa_free(ctx, entry);
	/* restore the lock state expected by @locked callers */
	if (locked)
		spin_lock(&lli->lli_sa_lock);
}
/* called by scanner after use, sa_entry will be killed */
sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
{
struct ll_inode_info *lli = ll_i2info(dir);
- struct sa_entry *tmp, *next;
+ struct sa_entry *tmp;
bool wakeup = false;
if (entry && entry->se_state == SA_ENTRY_SUCC) {
} else {
wakeup = true;
}
- } else {
+ } else if (sai) {
sai->sai_miss++;
sai->sai_consecutive_miss++;
wakeup = true;
}
if (entry)
- sa_kill(sai, entry);
+ sa_kill(sai, entry, false);
- /*
- * kill old completed entries, only scanner process does this, no need
- * to lock
- */
- list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
- if (!is_omitted_entry(sai, tmp->se_index))
- break;
- sa_kill(sai, tmp);
+ if (sai) {
+ /*
+ * kill old completed entries. Maybe kicking old entries can
+ * be ignored?
+ */
+ spin_lock(&lli->lli_sa_lock);
+ while ((tmp = list_first_entry_or_null(&sai->sai_entries,
+ struct sa_entry, se_list))) {
+ if (!is_omitted_entry(sai, tmp->se_index))
+ break;
+
+ /* ll_sa_lock is dropped by sa_kill(), restart list */
+ sa_kill(sai, tmp, true);
+ }
+ spin_unlock(&lli->lli_sa_lock);
}
spin_lock(&lli->lli_sa_lock);
}
}
+/* Allocate sax */
+static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
+{
+ struct ll_statahead_context *ctx;
+ int i;
+
+ ENTRY;
+
+ OBD_ALLOC_PTR(ctx);
+ if (ctx == NULL)
+ RETURN(NULL);
+
+ ctx->sax_inode = igrab(dir);
+ atomic_set(&ctx->sax_refcount, 1);
+ for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
+ INIT_LIST_HEAD(&ctx->sax_cache[i]);
+ spin_lock_init(&ctx->sax_cache_lock[i]);
+ }
+
+ RETURN(ctx);
+}
+
/* free a statahead context; the last reference must already be gone */
static inline void ll_sax_free(struct ll_statahead_context *ctx)
{
	LASSERT(ctx->sax_inode != NULL);
	/* drop the directory inode reference taken in ll_sax_alloc() */
	iput(ctx->sax_inode);
	OBD_FREE_PTR(ctx);
}
+
/*
 * Take an extra reference on @ctx.  The caller must guarantee @ctx stays
 * alive across this call (holds its own reference, or lli_sa_lock which
 * protects lli->lli_sax).
 */
static inline void __ll_sax_get(struct ll_statahead_context *ctx)
{
	atomic_inc(&ctx->sax_refcount);
}
+
+static inline struct ll_statahead_context *ll_sax_get(struct inode *dir)
+{
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_context *ctx = NULL;
+
+ spin_lock(&lli->lli_sa_lock);
+ ctx = lli->lli_sax;
+ if (ctx)
+ __ll_sax_get(ctx);
+ spin_unlock(&lli->lli_sa_lock);
+
+ return ctx;
+}
+
/*
 * Drop a reference on statahead context @ctx of directory @dir.  On the
 * last put, detach the context from the inode, reset the statahead
 * pattern state, and free the context.
 */
static inline void ll_sax_put(struct inode *dir,
			      struct ll_statahead_context *ctx)
{
	struct ll_inode_info *lli = ll_i2info(dir);

	if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
		/*
		 * NOTE(review): lli_sai is cleared here without a matching
		 * ll_sai_put() -- presumably the sai reference has already
		 * been dropped by this point; confirm against callers.
		 */
		lli->lli_sai = NULL;
		lli->lli_sax = NULL;
		if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
			/* FNAME statahead also owns the opendir state */
			lli->lli_opendir_key = NULL;
			lli->lli_opendir_pid = 0;
			lli->lli_sa_enabled = 0;
		}
		lli->lli_sa_pattern = LSA_PATTERN_NONE;
		spin_unlock(&lli->lli_sa_lock);

		ll_sax_free(ctx);
	}
}
+
/* allocate sai */
static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
{
struct ll_statahead_info *sai;
struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
- int i;
ENTRY;
INIT_LIST_HEAD(&sai->sai_entries);
INIT_LIST_HEAD(&sai->sai_agls);
- for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
- INIT_LIST_HEAD(&sai->sai_cache[i]);
- spin_lock_init(&sai->sai_cache_lock[i]);
- }
atomic_set(&sai->sai_cache_count, 0);
spin_lock(&sai_generation_lock);
OBD_FREE_PTR(sai);
}
-/*
- * take refcount of sai if sai for @dir exists, which means statahead is on for
- * this directory.
- */
-static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
/*
 * Take an extra reference on @sai and return it.  The caller must
 * guarantee @sai stays alive across this call (holds its own reference,
 * or lli_sa_lock which protects lli->lli_sai).
 */
static inline struct ll_statahead_info *
__ll_sai_get(struct ll_statahead_info *sai)
{
	atomic_inc(&sai->sai_refcount);
	return sai;
}
struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
- struct sa_entry *entry, *next;
struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
+ struct sa_entry *entry, *next;
lli->lli_sai = NULL;
spin_unlock(&lli->lli_sa_lock);
list_for_each_entry_safe(entry, next, &sai->sai_entries,
se_list)
- sa_kill(sai, entry);
+ sa_kill(sai, entry, false);
LASSERT(atomic_read(&sai->sai_cache_count) == 0);
LASSERT(agl_list_empty(sai));
struct req_capsule *pill = item->mop_pill;
struct inode *dir = item->mop_dir;
struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai = lli->lli_sai;
+ struct ll_statahead_info *sai;
struct lookup_intent *it;
struct sa_entry *entry;
struct mdt_body *body;
entry = (struct sa_entry *)item->mop_cbdata;
LASSERT(entry->se_handle != 0);
+ sai = entry->se_sai;
it = &item->mop_it;
body = req_capsule_server_get(pill, &RMF_MDT_BODY);
if (!body)
child = entry->se_inode;
/* revalidate; unlinked and re-created with the same name */
- if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+ if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
+ !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
if (child) {
entry->se_inode = NULL;
iput(child);
struct lookup_intent *it = &item->mop_it;
struct inode *dir = item->mop_dir;
struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai = lli->lli_sai;
struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
struct work_struct *work = &item->mop_work;
+ struct ll_statahead_info *sai;
struct mdt_body *body;
struct inode *child;
__u64 handle = 0;
* because statahead thread will wait for all inflight RPC to finish,
* sai should be always valid, no need to refcount
*/
- LASSERT(sai != NULL);
LASSERT(entry != NULL);
+ sai = entry->se_sai;
+ LASSERT(sai != NULL);
CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
entry->se_qstr.len, entry->se_qstr.name, rc);
GOTO(out, rc = -EFAULT);
child = entry->se_inode;
- /* revalidate; unlinked and re-created with the same name */
- if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+ /*
+ * revalidate; unlinked and re-created with the same name.
+ * exclude the case where FID is zero as it was from statahead with
+ * regularized file name pattern and had no idea for the FID of the
+ * children file.
+ */
+ if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
+ !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
if (child) {
entry->se_inode = NULL;
iput(child);
RETURN(rc);
}
-static inline int sa_getattr(struct inode *dir, struct md_op_item *item)
+static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir,
+ struct md_op_item *item)
{
- struct ll_statahead_info *sai = ll_i2info(dir)->lli_sai;
int rc;
if (sa_has_batch_handle(sai))
if (IS_ERR(item))
RETURN(PTR_ERR(item));
- rc = sa_getattr(dir, item);
+ rc = sa_getattr(entry->se_sai, dir, item);
if (rc < 0)
sa_fini_data(item);
RETURN(1);
}
- rc = sa_getattr(dir, item);
+ rc = sa_getattr(entry->se_sai, dir, item);
if (rc < 0) {
entry->se_inode = NULL;
iput(inode);
}
/* async stat for file with @name */
-static void sa_statahead(struct dentry *parent, const char *name, int len,
- const struct lu_fid *fid)
+static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent,
+ const char *name, int len, const struct lu_fid *fid)
{
struct inode *dir = parent->d_inode;
- struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_statahead_info *sai = lli->lli_sai;
struct dentry *dentry = NULL;
struct sa_entry *entry;
int rc;
/* async glimpse (agl) thread main function */
static int ll_agl_thread(void *arg)
{
- struct dentry *parent = (struct dentry *)arg;
- struct inode *dir = parent->d_inode;
- struct ll_inode_info *plli = ll_i2info(dir);
- struct ll_inode_info *clli;
/*
* We already own this reference, so it is safe to take it
* without a lock.
*/
- struct ll_statahead_info *sai = plli->lli_sai;
+ struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
+ struct dentry *parent = sai->sai_dentry;
+ struct inode *dir = parent->d_inode;
+ struct ll_inode_info *plli = ll_i2info(dir);
+ struct ll_inode_info *clli;
ENTRY;
sai, parent);
plli = ll_i2info(parent->d_inode);
- task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d",
+ task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
plli->lli_opendir_pid);
if (IS_ERR(task)) {
CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
sai->sai_agl_task = task;
atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
/* Get an extra reference that the thread holds */
- ll_sai_get(d_inode(parent));
+ __ll_sai_get(sai);
wake_up_process(task);
EXIT;
}
-/* statahead thread main function */
-static int ll_statahead_thread(void *arg)
+static int ll_statahead_by_list(struct dentry *parent)
{
- struct dentry *parent = (struct dentry *)arg;
struct inode *dir = parent->d_inode;
struct ll_inode_info *lli = ll_i2info(dir);
- struct ll_sb_info *sbi = ll_i2sbi(dir);
struct ll_statahead_info *sai = lli->lli_sai;
- int first = 0;
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
struct md_op_data *op_data;
struct page *page = NULL;
- struct lu_batch *bh = NULL;
__u64 pos = 0;
+ int first = 0;
int rc = 0;
ENTRY;
CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
sai, parent);
- sai->sai_max_batch_count = sbi->ll_sa_batch_max;
- if (sai->sai_max_batch_count) {
- bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
- sai->sai_max_batch_count);
- if (IS_ERR(bh))
- GOTO(out_stop_agl, rc = PTR_ERR(bh));
- }
-
- sai->sai_bh = bh;
OBD_ALLOC_PTR(op_data);
if (!op_data)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
/* matches smp_store_release() in ll_deauthorize_statahead() */
while (pos != MDS_DIR_END_OFF && smp_load_acquire(&sai->sai_task)) {
namelen = lltr.len;
}
- sa_statahead(parent, name, namelen, &fid);
+ sa_statahead(sai, parent, name, namelen, &fid);
llcrypt_fname_free_buffer(&lltr);
}
}
ll_finish_md_op_data(op_data);
+ RETURN(rc);
+}
+
+static int ll_statahead_by_fname(struct ll_statahead_info *sai,
+ struct dentry *parent)
+{
+ struct inode *dir = parent->d_inode;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
+ size_t max_len;
+ size_t len;
+ char *fname;
+ char *ptr;
+ int rc = 0;
+ __u64 i = 0;
+
+ ENTRY;
+
+ CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n",
+ sbi->ll_fsname, parent, sai->sai_fname);
+
+ OBD_ALLOC(fname, NAME_MAX);
+ if (fname == NULL)
+ RETURN(-ENOMEM);
+
+ len = strlen(sai->sai_fname);
+ memcpy(fname, sai->sai_fname, len);
+ max_len = sizeof(sai->sai_fname) - len;
+ ptr = fname + len;
+
+ /* matches smp_store_release() in ll_deauthorize_statahead() */
+ while (smp_load_acquire(&sai->sai_task)) {
+ size_t numlen;
+
+ numlen = snprintf(ptr, max_len, "%llu",
+ sai->sai_fstart + i);
+
+ while (({set_current_state(TASK_IDLE);
+ /*
+ * matches smp_store_release() in
+ * ll_deauthorize_statahead()
+ */
+ smp_load_acquire(&sai->sai_task); })) {
+ spin_lock(&lli->lli_agl_lock);
+ while (sa_sent_full(sai) && !agl_list_empty(sai)) {
+ struct ll_inode_info *clli;
+
+ __set_current_state(TASK_RUNNING);
+ clli = agl_first_entry(sai);
+ list_del_init(&clli->lli_agl_list);
+ spin_unlock(&lli->lli_agl_lock);
+
+ ll_agl_trigger(&clli->lli_vfs_inode, sai);
+ cond_resched();
+ spin_lock(&lli->lli_agl_lock);
+ }
+ spin_unlock(&lli->lli_agl_lock);
+
+ if (!sa_sent_full(sai))
+ break;
+ schedule();
+ }
+ __set_current_state(TASK_RUNNING);
+
+ sa_statahead(sai, parent, fname, len + numlen, NULL);
+ if (++i >= sai->sai_fend)
+ break;
+ }
+
+ OBD_FREE(fname, NAME_MAX);
+ RETURN(rc);
+}
+
+/* statahead thread main function */
+static int ll_statahead_thread(void *arg)
+{
+ struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
+ struct dentry *parent = sai->sai_dentry;
+ struct inode *dir = parent->d_inode;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
+ struct lu_batch *bh = NULL;
+ int rc = 0;
+
+ ENTRY;
+
+ CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
+ sai, parent);
+
+ sai->sai_max_batch_count = sbi->ll_sa_batch_max;
+ if (sai->sai_max_batch_count) {
+ bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
+ sai->sai_max_batch_count);
+ if (IS_ERR(bh))
+ GOTO(out_stop_agl, rc = PTR_ERR(bh));
+ }
+
+ sai->sai_bh = bh;
+
+ switch (lli->lli_sa_pattern) {
+ case LSA_PATTERN_LIST:
+ rc = ll_statahead_by_list(parent);
+ break;
+ case LSA_PATTERN_FNAME:
+ rc = ll_statahead_by_fname(sai, parent);
+ break;
+ default:
+ rc = -EFAULT;
+ break;
+ }
+
if (rc < 0) {
spin_lock(&lli->lli_sa_lock);
sai->sai_task = NULL;
__set_current_state(TASK_RUNNING);
EXIT;
-out:
+
if (bh) {
rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
sai->sai_bh = NULL;
/* in case we're not woken up, timeout wait */
msleep(125);
- CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
- sbi->ll_fsname, sai, parent);
+ CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n",
+ sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss);
spin_lock(&lli->lli_sa_lock);
sai->sai_task = NULL;
atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
ll_sai_put(sai);
+ ll_sax_put(dir, lli->lli_sax);
return rc;
}
* if lli_sai is not NULL, it means previous statahead is not
* finished yet, we'd better not start a new statahead for now.
*/
- LASSERT(lli->lli_opendir_pid == 0);
lli->lli_opendir_key = key;
lli->lli_opendir_pid = current->pid;
lli->lli_sa_enabled = 1;
spin_unlock(&lli->lli_sa_lock);
}
/*
 * Deauthorize FNAME-pattern statahead for @dir: wake up and detach the
 * statahead thread recorded in file handle @key, then drop the handle's
 * references on the sai and on the directory's statahead context.
 */
static void ll_deauthorize_statahead_fname(struct inode *dir, void *key)
{
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_file_data *fd = (struct ll_file_data *)key;
	struct ll_statahead_info *sai = fd->fd_sai;

	/* no statahead was ever started through this handle */
	if (sai == NULL)
		return;

	spin_lock(&lli->lli_sa_lock);
	if (sai->sai_task) {
		struct task_struct *task = sai->sai_task;

		/* clearing sai_task tells the statahead thread to quit */
		sai->sai_task = NULL;
		wake_up_process(task);
	}
	fd->fd_sai = NULL;
	spin_unlock(&lli->lli_sa_lock);
	ll_sai_put(sai);
	LASSERT(lli->lli_sax != NULL);
	ll_sax_put(dir, lli->lli_sax);
}
+
/*
* deauthorize opened dir handle @key to statahead, and notify statahead thread
* to quit if it's running.
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai;
- LASSERT(lli->lli_opendir_key == key);
LASSERT(lli->lli_opendir_pid != 0);
CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
PFID(&lli->lli_fid));
+ if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
+ ll_deauthorize_statahead_fname(dir, key);
+ return;
+ }
+
+ LASSERT(lli->lli_opendir_key == key);
spin_lock(&lli->lli_sa_lock);
lli->lli_opendir_key = NULL;
lli->lli_opendir_pid = 0;
* \retval negative number upon error
*/
static int revalidate_statahead_dentry(struct inode *dir,
- struct ll_statahead_info *sai,
+ struct ll_statahead_context *ctx,
struct dentry **dentryp,
bool unplug)
{
struct sa_entry *entry = NULL;
struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_info *sai = lli->lli_sai;
int rc = 0;
ENTRY;
- if ((*dentryp)->d_name.name[0] == '.') {
+ if (sai && (*dentryp)->d_name.name[0] == '.') {
if (sai->sai_ls_all ||
sai->sai_miss_hidden >= sai->sai_skip_hidden) {
/*
if (unplug)
GOTO(out, rc = 1);
- entry = sa_get(sai, &(*dentryp)->d_name);
+ entry = sa_get(ctx, &(*dentryp)->d_name);
if (!entry)
GOTO(out, rc = -EAGAIN);
+ if (lli->lli_sa_pattern == LSA_PATTERN_LIST)
+ LASSERT(sai == entry->se_sai);
+ else if (lli->lli_sa_pattern == LSA_PATTERN_FNAME)
+ sai = entry->se_sai;
+
+ LASSERT(sai != NULL);
if (!sa_ready(entry)) {
spin_lock(&lli->lli_sa_lock);
sai->sai_index_wait = entry->se_index;
int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai = NULL;
+ struct ll_statahead_context *ctx = NULL;
struct dentry *parent = dentry->d_parent;
struct task_struct *task;
struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
if (!sai)
GOTO(out, rc = -ENOMEM);
+ ctx = ll_sax_alloc(dir);
+ if (!ctx)
+ GOTO(out, rc = -ENOMEM);
+
sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
/*
*/
spin_lock(&lli->lli_sa_lock);
if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
- lli->lli_opendir_pid != current->pid)) {
+ lli->lli_opendir_pid != current->pid ||
+ lli->lli_sa_pattern != LSA_PATTERN_NONE)) {
spin_unlock(&lli->lli_sa_lock);
GOTO(out, rc = -EPERM);
}
lli->lli_sai = sai;
+ lli->lli_sax = ctx;
+ lli->lli_sa_pattern = LSA_PATTERN_LIST;
spin_unlock(&lli->lli_sa_lock);
CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
current->pid, parent);
- task = kthread_create_on_node(ll_statahead_thread, parent, node,
+ task = kthread_create_on_node(ll_statahead_thread, sai, node,
"ll_sa_%u", lli->lli_opendir_pid);
if (IS_ERR(task)) {
spin_lock(&lli->lli_sa_lock);
if (sai)
ll_sai_free(sai);
+
+ if (ctx)
+ ll_sax_free(ctx);
+
if (first != LS_NOT_FIRST_DE)
atomic_dec(&sbi->ll_sa_running);
static inline bool ll_statahead_started(struct inode *dir, bool agl)
{
struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_context *ctx;
struct ll_statahead_info *sai;
spin_lock(&lli->lli_sa_lock);
+ ctx = lli->lli_sax;
sai = lli->lli_sai;
if (sai && (sai->sai_agl_task != NULL) != agl)
CDEBUG(D_READA,
sai->sai_agl_task != NULL, agl);
spin_unlock(&lli->lli_sa_lock);
- return !!sai;
+ return !!ctx;
}
/**
int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
bool unplug)
{
- struct ll_statahead_info *sai;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_context *ctx;
+ struct ll_statahead_info *sai = NULL;
int rc = 0;
- sai = ll_sai_get(dir);
- if (sai) {
- rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
+ spin_lock(&lli->lli_sa_lock);
+ ctx = lli->lli_sax;
+ if (ctx) {
+ sai = lli->lli_sai;
+ if (sai) {
+ atomic_inc(&sai->sai_refcount);
+ } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) {
+ spin_unlock(&lli->lli_sa_lock);
+ return 0;
+ }
+ __ll_sax_get(ctx);
+ }
+ spin_unlock(&lli->lli_sa_lock);
+ if (ctx) {
+ rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
*dentryp, rc);
- ll_sai_put(sai);
+ if (sai)
+ ll_sai_put(sai);
+ ll_sax_put(dir, ctx);
}
return rc;
}
+
+int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
+{
+ int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
+ struct ll_file_data *fd = file->private_data;
+ struct dentry *dentry = file_dentry(file);
+ struct inode *dir = dentry->d_inode;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
+ struct ll_statahead_info *sai = NULL;
+ struct ll_statahead_context *ctx = NULL;
+ struct task_struct *task;
+ bool agl = true;
+ int rc;
+
+ ENTRY;
+
+ if (sbi->ll_sa_max == 0)
+ RETURN(0);
+
+ if (!S_ISDIR(dir->i_mode))
+ RETURN(-EINVAL);
+
+ if (fd->fd_sai) {
+ rc = -EALREADY;
+ CWARN("%s: already set statahead hint for dir %pd: rc = %d\n",
+ sbi->ll_fsname, dentry, rc);
+ RETURN(rc);
+ }
+
+ if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
+ sbi->ll_sa_running_max)) {
+ CDEBUG(D_READA,
+ "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
+ GOTO(out, rc = -EMFILE);
+ }
+
+ sai = ll_sai_alloc(dentry);
+ if (sai == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ sai->sai_fstart = ladvise->lla_start;
+ sai->sai_fend = ladvise->lla_end;
+ sai->sai_ls_all = 0;
+ sai->sai_max = sbi->ll_sa_max;
+ strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
+
+ ctx = ll_sax_get(dir);
+ if (ctx == NULL) {
+ ctx = ll_sax_alloc(dir);
+ if (ctx == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ spin_lock(&lli->lli_sa_lock);
+ if (unlikely(lli->lli_sax)) {
+ struct ll_statahead_context *tmp = ctx;
+
+ if (lli->lli_sa_pattern == LSA_PATTERN_NONE ||
+ lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
+ lli->lli_sa_pattern = LSA_PATTERN_FNAME;
+ ctx = lli->lli_sax;
+ __ll_sax_get(ctx);
+ fd->fd_sai = __ll_sai_get(sai);
+ rc = 0;
+ } else {
+ rc = -EINVAL;
+ CWARN("%s: pattern %X is not FNAME: rc = %d\n",
+ sbi->ll_fsname, lli->lli_sa_pattern, rc);
+ }
+
+ spin_unlock(&lli->lli_sa_lock);
+ ll_sax_free(tmp);
+ if (rc)
+ GOTO(out, rc);
+ } else {
+ lli->lli_sa_pattern = LSA_PATTERN_FNAME;
+ lli->lli_sax = ctx;
+ fd->fd_sai = __ll_sai_get(sai);
+ spin_unlock(&lli->lli_sa_lock);
+ }
+ } else {
+ spin_lock(&lli->lli_sa_lock);
+ if (!(lli->lli_sa_pattern == LSA_PATTERN_FNAME ||
+ lli->lli_sa_pattern == LSA_PATTERN_NONE)) {
+ spin_unlock(&lli->lli_sa_lock);
+ GOTO(out, rc = -EINVAL);
+ }
+
+ lli->lli_sa_pattern = LSA_PATTERN_FNAME;
+ fd->fd_sai = __ll_sai_get(sai);
+ spin_unlock(&lli->lli_sa_lock);
+ }
+
+ __ll_sax_get(ctx);
+ CDEBUG(D_READA,
+ "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n",
+ current->pid, dentry, sai, ctx);
+
+ task = kthread_create_on_node(ll_statahead_thread, sai, node,
+ "ll_sa_%u", current->pid);
+ if (IS_ERR(task)) {
+ rc = PTR_ERR(task);
+ CERROR("%s: cannot start ll_sa thread: rc = %d\n",
+ sbi->ll_fsname, rc);
+ GOTO(out, rc);
+ }
+
+ if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
+ ll_start_agl(dentry, sai);
+
+ atomic_inc(&sbi->ll_sa_total);
+ sai->sai_task = task;
+ wake_up_process(task);
+
+ RETURN(0);
+out:
+ if (fd->fd_sai) {
+ ll_sai_put(sai);
+ ll_sax_put(dir, ctx);
+ fd->fd_sai = NULL;
+ }
+
+ if (sai)
+ ll_sai_free(sai);
+
+ if (ctx)
+ ll_sax_free(ctx);
+
+ atomic_dec(&sbi->ll_sa_running);
+ RETURN(rc);
+}