+static int ll_statahead_thread(void *arg)
+{
+ struct dentry *parent = (struct dentry *)arg;
+ struct inode *dir = parent->d_inode;
+ struct ll_inode_info *plli = ll_i2info(dir);
+ struct ll_inode_info *clli;
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
+ struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai);
+ struct ptlrpc_thread *thread = &sai->sai_thread;
+ struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread;
+ int first = 0;
+ int rc = 0;
+ struct md_op_data *op_data;
+ struct ll_dir_chain chain;
+ struct l_wait_info lwi = { 0 };
+ struct page *page = NULL;
+ __u64 pos = 0;
+ ENTRY;
+
+ thread->t_pid = current_pid();
+ CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
+ sai, parent->d_name.len, parent->d_name.name);
+
+ op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
+ LUSTRE_OPC_ANY, dir);
+ if (IS_ERR(op_data))
+ GOTO(out_put, rc = PTR_ERR(op_data));
+
+ op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
+
+ if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
+ ll_start_agl(parent, sai);
+
+ atomic_inc(&sbi->ll_sa_total);
+ spin_lock(&plli->lli_sa_lock);
+ if (thread_is_init(thread))
+ /* If someone else has changed the thread state
+ * (e.g. already changed to SVC_STOPPING), we can't just
+ * blindly overwrite that setting. */
+ thread_set_flags(thread, SVC_RUNNING);
+ spin_unlock(&plli->lli_sa_lock);
+ wake_up(&thread->t_ctl_waitq);
+
+ ll_dir_chain_init(&chain);
+ page = ll_get_dir_page(dir, op_data, pos, &chain);
+ while (1) {
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
+
+ if (IS_ERR(page)) {
+ rc = PTR_ERR(page);
+ CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
+ "/"LPU64" opendir_pid = %u: rc = %d\n",
+ PFID(ll_inode2fid(dir)), pos, sai->sai_index,
+ plli->lli_opendir_pid, rc);
+ GOTO(out, rc);
+ }
+
+ dp = page_address(page);
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent)) {
+ __u64 hash;
+ int namelen;
+ char *name;
+
+ hash = le64_to_cpu(ent->lde_hash);
+ if (unlikely(hash < pos))
+ /*
+ * Skip until we find target hash value.
+ */
+ continue;
+
+ namelen = le16_to_cpu(ent->lde_namelen);
+ if (unlikely(namelen == 0))
+ /*
+ * Skip dummy record.
+ */
+ continue;
+
+ name = ent->lde_name;
+ if (name[0] == '.') {
+ if (namelen == 1) {
+ /*
+ * skip "."
+ */
+ continue;
+ } else if (name[1] == '.' && namelen == 2) {
+ /*
+ * skip ".."
+ */
+ continue;
+ } else if (!sai->sai_ls_all) {
+ /*
+ * skip hidden files.
+ */
+ sai->sai_skip_hidden++;
+ continue;
+ }
+ }
+
+ /*
+ * don't stat-ahead first entry.
+ */
+ if (unlikely(++first == 1))
+ continue;
+
+keep_it:
+ l_wait_event(thread->t_ctl_waitq,
+ !sa_sent_full(sai) ||
+ !sa_received_empty(sai) ||
+ !agl_list_empty(sai) ||
+ !thread_is_running(thread),
+ &lwi);
+
+interpret_it:
+ while (!sa_received_empty(sai))
+ ll_post_statahead(sai);
+
+ if (unlikely(!thread_is_running(thread))) {
+ ll_release_page(dir, page, false);
+ GOTO(out, rc = 0);
+ }
+
+ /* If no window for metadata statahead, but there are
+ * some AGL entries to be triggered, then try to help
+ * to process the AGL entries. */
+ if (sa_sent_full(sai)) {
+ spin_lock(&plli->lli_agl_lock);
+ while (!agl_list_empty(sai)) {
+ clli = agl_first_entry(sai);
+ list_del_init(&clli->lli_agl_list);
+ spin_unlock(&plli->lli_agl_lock);
+ ll_agl_trigger(&clli->lli_vfs_inode,
+ sai);
+
+ if (!sa_received_empty(sai))
+ goto interpret_it;
+
+ if (unlikely(
+ !thread_is_running(thread))) {
+ ll_release_page(dir, page,
+ false);
+ GOTO(out, rc = 0);
+ }
+
+ if (!sa_sent_full(sai))
+ goto do_it;
+
+ spin_lock(&plli->lli_agl_lock);
+ }
+ spin_unlock(&plli->lli_agl_lock);
+
+ goto keep_it;
+ }
+do_it:
+ ll_statahead_one(parent, name, namelen);
+ }
+
+ pos = le64_to_cpu(dp->ldp_hash_end);
+ if (pos == MDS_DIR_END_OFF) {
+ /*
+ * End of directory reached.
+ */
+ ll_release_page(dir, page, false);
+ while (1) {
+ l_wait_event(thread->t_ctl_waitq,
+ !sa_received_empty(sai) ||
+ sai->sai_sent == sai->sai_replied ||
+ !thread_is_running(thread),
+ &lwi);
+
+ while (!sa_received_empty(sai))
+ ll_post_statahead(sai);
+
+ if (unlikely(!thread_is_running(thread)))
+ GOTO(out, rc = 0);
+
+ if (sai->sai_sent == sai->sai_replied &&
+ sa_received_empty(sai))
+ break;
+ }
+
+ spin_lock(&plli->lli_agl_lock);
+ while (!agl_list_empty(sai) &&
+ thread_is_running(thread)) {
+ clli = agl_first_entry(sai);
+ list_del_init(&clli->lli_agl_list);
+ spin_unlock(&plli->lli_agl_lock);
+ ll_agl_trigger(&clli->lli_vfs_inode, sai);
+ spin_lock(&plli->lli_agl_lock);
+ }
+ spin_unlock(&plli->lli_agl_lock);
+
+ GOTO(out, rc = 0);
+ } else {
+ /*
+ * chain is exhausted.
+ * Normal case: continue to the next page.
+ */
+ ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
+ LDF_COLLIDE);
+ sai->sai_in_readpage = 1;
+ page = ll_get_dir_page(dir, op_data, pos, &chain);
+ sai->sai_in_readpage = 0;
+ }
+ }