/* link into sai hash table locally */
cfs_list_t se_hash;
/* entry reference count */
- cfs_atomic_t se_refcount;
+ atomic_t se_refcount;
/* entry index in the sai */
__u64 se_index;
/* low layer ldlm lock handle */
/* Return non-zero when the statahead cache already holds sai_max entries,
 * i.e. the statahead thread must stop issuing new stat RPCs until the
 * scanner consumes some cached entries.
 * (Patch hunk: cfs_atomic_read wrapper replaced by native atomic_read.) */
static inline int sa_sent_full(struct ll_statahead_info *sai)
{
-	return cfs_atomic_read(&sai->sai_cache_count) >= sai->sai_max;
+	return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
}
static inline int sa_received_empty(struct ll_statahead_info *sai)
entry->se_index = index;
- /*
- * Statahead entry reference rules:
- *
- * 1) When statahead entry is initialized, its reference is set as 2.
- * One reference is used by the directory scanner. When the scanner
- * searches the statahead cache for the given name, it can perform
- * lockless hash lookup (only the scanner can remove entry from hash
- * list), and once found, it needn't to call "atomic_inc()" for the
- * entry reference. So the performance is improved. After using the
- * statahead entry, the scanner will call "atomic_dec()" to drop the
- * reference held when initialization. If it is the last reference,
- * the statahead entry will be freed.
- *
- * 2) All other threads, including statahead thread and ptlrpcd thread,
- * when they process the statahead entry, the reference for target
- * should be held to guarantee the entry will not be released by the
- * directory scanner. After processing the entry, these threads will
- * drop the entry reference. If it is the last reference, the entry
- * will be freed.
- *
- * The second reference when initializes the statahead entry is used
- * by the statahead thread, following the rule 2).
- */
- cfs_atomic_set(&entry->se_refcount, 2);
- entry->se_stat = SA_ENTRY_INIT;
- entry->se_size = entry_size;
- dname = (char *)entry + sizeof(struct ll_sa_entry);
- memcpy(dname, name, len);
- dname[len] = 0;
- entry->se_qstr.hash = full_name_hash(name, len);
- entry->se_qstr.len = len;
- entry->se_qstr.name = dname;
-
- lli = ll_i2info(sai->sai_inode);
+ /*
+ * Statahead entry reference rules:
+ *
+ * 1) When statahead entry is initialized, its reference is set as 2.
+ * One reference is used by the directory scanner. When the scanner
+ * searches the statahead cache for the given name, it can perform
+ * lockless hash lookup (only the scanner can remove entry from hash
+ * list), and once found, it needn't to call "atomic_inc()" for the
+ * entry reference. So the performance is improved. After using the
+ * statahead entry, the scanner will call "atomic_dec()" to drop the
+ * reference held when initialization. If it is the last reference,
+ * the statahead entry will be freed.
+ *
+ * 2) All other threads, including statahead thread and ptlrpcd thread,
+ * when they process the statahead entry, the reference for target
+ * should be held to guarantee the entry will not be released by the
+ * directory scanner. After processing the entry, these threads will
+ * drop the entry reference. If it is the last reference, the entry
+ * will be freed.
+ *
+ * The second reference when initializes the statahead entry is used
+ * by the statahead thread, following the rule 2).
+ */
+ atomic_set(&entry->se_refcount, 2);
+ entry->se_stat = SA_ENTRY_INIT;
+ entry->se_size = entry_size;
+ dname = (char *)entry + sizeof(struct ll_sa_entry);
+ memcpy(dname, name, len);
+ dname[len] = 0;
+ entry->se_qstr.hash = full_name_hash(name, len);
+ entry->se_qstr.len = len;
+ entry->se_qstr.name = dname;
+
+ lli = ll_i2info(sai->sai_inode);
spin_lock(&lli->lli_sa_lock);
cfs_list_add_tail(&entry->se_link, &sai->sai_entries);
CFS_INIT_LIST_HEAD(&entry->se_list);
ll_sa_entry_enhash(sai, entry);
spin_unlock(&lli->lli_sa_lock);
- cfs_atomic_inc(&sai->sai_cache_count);
+ atomic_inc(&sai->sai_cache_count);
RETURN(entry);
}
cfs_list_for_each_entry(entry, &sai->sai_entries, se_link) {
if (entry->se_index == index) {
LASSERT(atomic_read(&entry->se_refcount) > 0);
- cfs_atomic_inc(&entry->se_refcount);
+ atomic_inc(&entry->se_refcount);
return entry;
}
if (entry->se_index > index)
/* Drop one reference on a statahead entry.  On the final put the entry is
 * destroyed: the cached inode reference (if any) is released via iput(),
 * the variable-size entry buffer is freed, and the sai cache counter is
 * decremented so sa_sent_full() sees the freed slot.
 * (Patch hunk: cfs_atomic_* wrappers replaced by native atomic_*.) */
static void ll_sa_entry_put(struct ll_statahead_info *sai,
			    struct ll_sa_entry *entry)
{
-	if (cfs_atomic_dec_and_test(&entry->se_refcount)) {
+	if (atomic_dec_and_test(&entry->se_refcount)) {
		CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n",
		       entry->se_qstr.len, entry->se_qstr.name, entry,
		       entry->se_index);

		iput(entry->se_inode);
		OBD_FREE(entry, entry->se_size);
-		cfs_atomic_dec(&sai->sai_cache_count);
+		atomic_dec(&sai->sai_cache_count);
	}
}
if (!sai)
RETURN(NULL);
- cfs_atomic_set(&sai->sai_refcount, 1);
+ atomic_set(&sai->sai_refcount, 1);
spin_lock(&sai_generation_lock);
sai->sai_generation = ++sai_generation;
CFS_INIT_LIST_HEAD(&sai->sai_cache[i]);
spin_lock_init(&sai->sai_cache_lock[i]);
}
- cfs_atomic_set(&sai->sai_cache_count, 0);
+ atomic_set(&sai->sai_cache_count, 0);
RETURN(sai);
}
/* Take an extra reference on the statahead info and return it; paired
 * with ll_sai_put().
 * (Patch hunk: cfs_atomic_inc replaced by native atomic_inc; body
 * reindented.) */
static inline struct ll_statahead_info *
ll_sai_get(struct ll_statahead_info *sai)
{
-	cfs_atomic_inc(&sai->sai_refcount);
-	return sai;
+	atomic_inc(&sai->sai_refcount);
+	return sai;
}
/* Drop one reference on the statahead info.  The final put is detected by
 * atomic_dec_and_lock() on lli_sa_lock; under that lock the refcount is
 * re-checked to close the race with an RPC interpret callback that may
 * have just taken a reference (in which case we bail out via RETURN_EXIT).
 * Otherwise teardown proceeds: the entry lists must already be drained
 * (LASSERTs), the pinned directory inode is released with iput(), and the
 * sai buffer itself is freed.
 * (Patch hunk: cfs_atomic_* -> native atomic_*; body reindented.) */
static void ll_sai_put(struct ll_statahead_info *sai)
{
-	struct inode *inode = sai->sai_inode;
-	struct ll_inode_info *lli = ll_i2info(inode);
-	ENTRY;
+	struct inode *inode = sai->sai_inode;
+	struct ll_inode_info *lli = ll_i2info(inode);
+	ENTRY;

-	if (cfs_atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
-		struct ll_sa_entry *entry, *next;
+	if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
+		struct ll_sa_entry *entry, *next;

-		if (unlikely(cfs_atomic_read(&sai->sai_refcount) > 0)) {
-			/* It is race case, the interpret callback just hold
-			 * a reference count */
+		if (unlikely(atomic_read(&sai->sai_refcount) > 0)) {
+			/* It is race case, the interpret callback just hold
+			 * a reference count */
			spin_unlock(&lli->lli_sa_lock);
			RETURN_EXIT;
		}

		LASSERT(sa_received_empty(sai));
		LASSERT(list_empty(&sai->sai_entries_stated));
-		LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0);
-		LASSERT(agl_list_empty(sai));
+		LASSERT(atomic_read(&sai->sai_cache_count) == 0);
+		LASSERT(agl_list_empty(sai));

-		iput(inode);
-		OBD_FREE_PTR(sai);
-	}
+		iput(inode);
+		OBD_FREE_PTR(sai);
+	}

-	EXIT;
+	EXIT;
}
/* Do NOT forget to drop inode refcount when into sai_entries_agl. */
RETURN_EXIT;
}
entry = sa_first_received_entry(sai);
- cfs_atomic_inc(&entry->se_refcount);
+ atomic_inc(&entry->se_refcount);
cfs_list_del_init(&entry->se_list);
spin_unlock(&lli->lli_sa_lock);
struct l_wait_info lwi = { 0 };
ENTRY;
- CDEBUG(D_READA, "agl thread started: [pid %d] [parent %.*s]\n",
- current_pid(), parent->d_name.len, parent->d_name.name);
+ thread->t_pid = current_pid();
+ CDEBUG(D_READA, "agl thread started: sai %p, parent %.*s\n",
+ sai, parent->d_name.len, parent->d_name.name);
atomic_inc(&sbi->ll_agl_total);
spin_lock(&plli->lli_agl_lock);
sai->sai_agl_valid = 1;
- thread_set_flags(thread, SVC_RUNNING);
+ if (thread_is_init(thread))
+ /* If someone else has changed the thread state
+ * (e.g. already changed to SVC_STOPPING), we can't just
+ * blindly overwrite that setting. */
+ thread_set_flags(thread, SVC_RUNNING);
spin_unlock(&plli->lli_agl_lock);
wake_up(&thread->t_ctl_waitq);
spin_unlock(&plli->lli_agl_lock);
wake_up(&thread->t_ctl_waitq);
ll_sai_put(sai);
- CDEBUG(D_READA, "agl thread stopped: [pid %d] [parent %.*s]\n",
- current_pid(), parent->d_name.len, parent->d_name.name);
+ CDEBUG(D_READA, "agl thread stopped: sai %p, parent %.*s\n",
+ sai, parent->d_name.len, parent->d_name.name);
RETURN(0);
}
struct task_struct *task;
ENTRY;
- CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n",
- current_pid(), parent->d_name.len, parent->d_name.name);
+ CDEBUG(D_READA, "start agl thread: sai %p, parent %.*s\n",
+ sai, parent->d_name.len, parent->d_name.name);
plli = ll_i2info(parent->d_inode);
task = kthread_run(ll_agl_thread, parent,
struct ll_dir_chain chain;
struct l_wait_info lwi = { 0 };
struct lu_dirent *ent;
+ struct page *page = NULL;
ENTRY;
- CDEBUG(D_READA, "statahead thread started: [pid %d] [parent %.*s]\n",
- current_pid(), parent->d_name.len, parent->d_name.name);
+ thread->t_pid = current_pid();
+ CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
+ sai, parent->d_name.len, parent->d_name.name);
op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
LUSTRE_OPC_ANY, dir);
atomic_inc(&sbi->ll_sa_total);
spin_lock(&plli->lli_sa_lock);
- thread_set_flags(thread, SVC_RUNNING);
+ if (thread_is_init(thread))
+ /* If someone else has changed the thread state
+ * (e.g. already changed to SVC_STOPPING), we can't just
+ * blindly overwrite that setting. */
+ thread_set_flags(thread, SVC_RUNNING);
spin_unlock(&plli->lli_sa_lock);
wake_up(&thread->t_ctl_waitq);
ll_dir_chain_init(&chain);
- for (ent = ll_dir_entry_start(dir, op_data);
+ for (ent = ll_dir_entry_start(dir, op_data, &page);
ent != NULL && !IS_ERR(ent);
- ent = ll_dir_entry_next(dir, op_data, ent)) {
+ ent = ll_dir_entry_next(dir, op_data, ent, &page)) {
__u64 hash;
int namelen;
char *name;
ll_statahead_one(parent, name, namelen);
}
- if (ent != NULL && !IS_ERR(ent))
- ll_dir_entry_end(dir, op_data, ent);
+ if (page != NULL) {
+ kunmap(page);
+ page_cache_release(page);
+ }
/*
* End of directory reached.
spin_unlock(&plli->lli_agl_lock);
wake_up(&agl_thread->t_ctl_waitq);
- CDEBUG(D_READA, "stop agl thread: [pid %d]\n",
- current_pid());
+ CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
+ sai, (unsigned int)agl_thread->t_pid);
l_wait_event(agl_thread->t_ctl_waitq,
thread_is_stopped(agl_thread),
&lwi);
wake_up(&thread->t_ctl_waitq);
ll_sai_put(sai);
dput(parent);
- CDEBUG(D_READA, "statahead thread stopped: [pid %d] [parent %.*s]\n",
- current_pid(), parent->d_name.len, parent->d_name.name);
+ CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n",
+ sai, parent->d_name.len, parent->d_name.name);
return rc;
}
spin_unlock(&lli->lli_sa_lock);
wake_up(&thread->t_ctl_waitq);
- CDEBUG(D_READA, "stop statahead thread: [pid %d]\n",
- current_pid());
+ CDEBUG(D_READA, "stop statahead thread: sai %p pid %u\n",
+ lli->lli_sai, (unsigned int)thread->t_pid);
l_wait_event(thread->t_ctl_waitq,
thread_is_stopped(thread),
&lwi);
struct md_op_data *op_data;
int dot_de;
struct lu_dirent *ent;
+ struct page *page = NULL;
int rc = LS_NONE_FIRST_DE;
ENTRY;
op_data->op_max_pages =
ll_i2sbi(dir)->ll_md_brw_size >> PAGE_CACHE_SHIFT;
- for (ent = ll_dir_entry_start(dir, op_data);
+ for (ent = ll_dir_entry_start(dir, op_data, &page);
ent != NULL && !IS_ERR(ent);
- ent = ll_dir_entry_next(dir, op_data, ent)) {
+ ent = ll_dir_entry_next(dir, op_data, ent, &page)) {
__u64 hash;
int namelen;
char *name;
break;
}
EXIT;
- if (ent != NULL && !IS_ERR(ent))
- ll_dir_entry_end(dir, op_data, ent);
+
+ if (page != NULL) {
+ kunmap(page);
+ page_cache_release(page);
+ }
ll_finish_md_op_data(op_data);
out:
ll_dir_chain_fini(&chain);
} else {
struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
- sai->sai_miss++;
- sai->sai_consecutive_miss++;
- if (sa_low_hit(sai) && thread_is_running(thread)) {
- atomic_inc(&sbi->ll_sa_wrong);
+ sai->sai_miss++;
+ sai->sai_consecutive_miss++;
+ if (sa_low_hit(sai) && thread_is_running(thread)) {
+ atomic_inc(&sbi->ll_sa_wrong);
CDEBUG(D_READA, "Statahead for dir "DFID" hit "
"ratio too low: hit/miss "LPU64"/"LPU64
", sent/replied "LPU64"/"LPU64", stopping "
- "statahead thread: pid %d\n",
+ "statahead thread\n",
PFID(&lli->lli_fid), sai->sai_hit,
sai->sai_miss, sai->sai_sent,
- sai->sai_replied, current_pid());
+ sai->sai_replied);
spin_lock(&lli->lli_sa_lock);
if (!thread_is_stopped(thread))
thread_set_flags(thread, SVC_STOPPING);
struct ll_sa_entry *entry;
struct ptlrpc_thread *thread;
struct l_wait_info lwi = { 0 };
+ struct task_struct *task;
int rc = 0;
struct ll_inode_info *plli;
ENTRY;
RETURN(entry ? 1 : -EAGAIN);
}
- /* if statahead is busy in readdir, help it do post-work */
- while (!ll_sa_entry_stated(entry) &&
- sai->sai_in_readpage &&
- !sa_received_empty(sai))
- ll_post_statahead(sai);
-
if (!ll_sa_entry_stated(entry)) {
sai->sai_index_wait = entry->se_index;
lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
(*dentryp)->d_name.name,
PFID(ll_inode2fid((*dentryp)->d_inode)),
PFID(ll_inode2fid(inode)));
- ll_sai_unplug(sai, entry);
+ ll_intent_release(&it);
+ ll_sai_unplug(sai, entry);
RETURN(-ESTALE);
} else {
iput(inode);
GOTO(out, rc = -EAGAIN);
}
- CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n",
- current_pid(), parent->d_name.len, parent->d_name.name);
+ CDEBUG(D_READA, "start statahead thread: sai %p, parent %.*s\n",
+ sai, parent->d_name.len, parent->d_name.name);
+ /* The sai buffer already has one reference taken at allocation time,
+ * but as soon as we expose the sai by attaching it to the lli that
+ * default reference can be dropped by another thread calling
+ * ll_stop_statahead. We need to take a local reference to protect
+ * the sai buffer while we intend to access it. */
+ ll_sai_get(sai);
lli->lli_sai = sai;
plli = ll_i2info(parent->d_inode);
- rc = PTR_ERR(kthread_run(ll_statahead_thread, parent,
- "ll_sa_%u", plli->lli_opendir_pid));
+ task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
+ plli->lli_opendir_pid);
thread = &sai->sai_thread;
- if (IS_ERR_VALUE(rc)) {
- CERROR("can't start ll_sa thread, rc: %d\n", rc);
+ if (IS_ERR(task)) {
+ rc = PTR_ERR(task);
+ CERROR("cannot start ll_sa thread: rc = %d\n", rc);
dput(parent);
lli->lli_opendir_key = NULL;
thread_set_flags(thread, SVC_STOPPED);
thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED);
+ /* Drop both our own local reference and the default
+ * reference from allocation time. */
ll_sai_put(sai);
+ ll_sai_put(sai);
LASSERT(lli->lli_sai == NULL);
RETURN(-EAGAIN);
}
l_wait_event(thread->t_ctl_waitq,
thread_is_running(thread) || thread_is_stopped(thread),
&lwi);
+ ll_sai_put(sai);
/*
* We don't stat-ahead for the first dirent since we are already in