/*
* sa_entry is not refcounted: statahead thread allocates it and do async stat,
- * and in async stat callback ll_statahead_interpret() will add it into
- * sai_interim_entries, later statahead thread will call sa_handle_callback() to
- * instantiate entry and move it into sai_entries, and then only scanner process
- * can access and free it.
+ * and in async stat callback ll_statahead_interpret() will prepare the inode
+ * and set lock data in the ptlrpcd context. Then the scanner process will be
+ * woken up if this entry is the one it is waiting for, and only the scanner
+ * process may then access and free the entry.
*/
struct sa_entry {
- /* link into sai_interim_entries or sai_entries */
+ /* link into sai_entries */
struct list_head se_list;
/* link into sai hash table locally */
struct list_head se_hash;
se_state_t se_state;
/* entry size, contains name */
int se_size;
- /* pointer to async getattr enqueue info */
- struct md_enqueue_info *se_minfo;
- /* pointer to the async getattr request */
- struct ptlrpc_request *se_req;
/* pointer to the target inode */
struct inode *se_inode;
/* entry name */
return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
}
-/* got async stat replies */
-static inline int sa_has_callback(struct ll_statahead_info *sai)
-{
- return !list_empty(&sai->sai_interim_entries);
-}
-
static inline int agl_list_empty(struct ll_statahead_info *sai)
{
return list_empty(&sai->sai_agls);
}
/* finish async stat RPC arguments */
-static void sa_fini_data(struct md_enqueue_info *minfo)
+static void sa_fini_data(struct md_op_item *item)
{
- struct md_op_data *op_data = &minfo->mi_data;
+ struct md_op_data *op_data = &item->mop_data;
if (op_data->op_flags & MF_OPNAME_KMALLOCED)
/* allocated via ll_setup_filename called from sa_prep_data */
kfree(op_data->op_name);
- ll_unlock_md_op_lsm(&minfo->mi_data);
- iput(minfo->mi_dir);
- OBD_FREE_PTR(minfo);
+ ll_unlock_md_op_lsm(&item->mop_data);
+ iput(item->mop_dir);
+ OBD_FREE_PTR(item);
}
-static int ll_statahead_interpret(struct ptlrpc_request *req,
- struct md_enqueue_info *minfo, int rc);
+static int ll_statahead_interpret(struct req_capsule *pill,
+ struct md_op_item *item, int rc);
/*
* prepare arguments for async stat RPC.
*/
-static struct md_enqueue_info *
+static struct md_op_item *
sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
{
- struct md_enqueue_info *minfo;
+ struct md_op_item *item;
struct ldlm_enqueue_info *einfo;
- struct md_op_data *op_data;
+ struct md_op_data *op_data;
- OBD_ALLOC_PTR(minfo);
- if (!minfo)
+ OBD_ALLOC_PTR(item);
+ if (!item)
return ERR_PTR(-ENOMEM);
- op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
+ op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
entry->se_qstr.name, entry->se_qstr.len, 0,
LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data)) {
- OBD_FREE_PTR(minfo);
- return (struct md_enqueue_info *)op_data;
+ OBD_FREE_PTR(item);
+ return (struct md_op_item *)op_data;
}
if (!child)
op_data->op_fid2 = entry->se_fid;
- minfo->mi_it.it_op = IT_GETATTR;
- minfo->mi_dir = igrab(dir);
- minfo->mi_cb = ll_statahead_interpret;
- minfo->mi_cbdata = entry;
-
- einfo = &minfo->mi_einfo;
- einfo->ei_type = LDLM_IBITS;
- einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
- einfo->ei_cb_bl = ll_md_blocking_ast;
- einfo->ei_cb_cp = ldlm_completion_ast;
- einfo->ei_cb_gl = NULL;
+ item->mop_it.it_op = IT_GETATTR;
+ item->mop_dir = igrab(dir);
+ item->mop_cb = ll_statahead_interpret;
+ item->mop_cbdata = entry;
+
+ einfo = &item->mop_einfo;
+ einfo->ei_type = LDLM_IBITS;
+ einfo->ei_mode = it_to_lock_mode(&item->mop_it);
+ einfo->ei_cb_bl = ll_md_blocking_ast;
+ einfo->ei_cb_cp = ldlm_completion_ast;
+ einfo->ei_cb_gl = NULL;
einfo->ei_cbdata = NULL;
einfo->ei_req_slot = 1;
- return minfo;
+ return item;
}
/*
sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
{
struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
- struct md_enqueue_info *minfo = entry->se_minfo;
- struct ptlrpc_request *req = entry->se_req;
bool wakeup;
- /* release resources used in RPC */
- if (minfo) {
- entry->se_minfo = NULL;
- ll_intent_release(&minfo->mi_it);
- sa_fini_data(minfo);
- }
-
- if (req) {
- entry->se_req = NULL;
- ptlrpc_req_finished(req);
- }
-
spin_lock(&lli->lli_sa_lock);
wakeup = __sa_make_ready(sai, entry, ret);
spin_unlock(&lli->lli_sa_lock);
sai->sai_index = 1;
init_waitqueue_head(&sai->sai_waitq);
- INIT_LIST_HEAD(&sai->sai_interim_entries);
INIT_LIST_HEAD(&sai->sai_entries);
INIT_LIST_HEAD(&sai->sai_agls);
LASSERT(!sai->sai_task);
LASSERT(!sai->sai_agl_task);
LASSERT(sai->sai_sent == sai->sai_replied);
- LASSERT(!sa_has_callback(sai));
list_for_each_entry_safe(entry, next, &sai->sai_entries,
se_list)
EXIT;
}
-/*
- * prepare inode for sa entry, add it into agl list, now sa_entry is ready
- * to be used by scanner process.
- */
-static void sa_instantiate(struct ll_statahead_info *sai,
- struct sa_entry *entry)
+static int ll_statahead_interpret_common(struct inode *dir,
+ struct ll_statahead_info *sai,
+ struct req_capsule *pill,
+ struct lookup_intent *it,
+ struct sa_entry *entry,
+ struct mdt_body *body)
{
- struct inode *dir = sai->sai_dentry->d_inode;
struct inode *child;
- struct md_enqueue_info *minfo;
- struct lookup_intent *it;
- struct ptlrpc_request *req;
- struct mdt_body *body;
- int rc = 0;
+ int rc;
ENTRY;
- LASSERT(entry->se_handle != 0);
-
- minfo = entry->se_minfo;
- it = &minfo->mi_it;
- req = entry->se_req;
- body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
- if (!body)
- GOTO(out, rc = -EFAULT);
-
child = entry->se_inode;
- /* revalidate; unlinked and re-created with the same name */
- if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) {
- if (child) {
- entry->se_inode = NULL;
- iput(child);
- }
- /* The mdt_body is invalid. Skip this entry */
- GOTO(out, rc = -EAGAIN);
- }
-
- it->it_lock_handle = entry->se_handle;
- rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
- if (rc != 1)
- GOTO(out, rc = -EAGAIN);
-
- rc = ll_prep_inode(&child, &req->rq_pill, dir->i_sb, it);
+ rc = ll_prep_inode(&child, pill, dir->i_sb, it);
if (rc)
GOTO(out, rc);
* inode now to save an extra getxattr.
*/
if (body->mbo_valid & OBD_MD_ENCCTX) {
- void *encctx = req_capsule_server_get(&req->rq_pill,
- &RMF_FILE_ENCCTX);
- __u32 encctxlen = req_capsule_get_size(&req->rq_pill,
- &RMF_FILE_ENCCTX,
+ void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
+ __u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
RCL_SERVER);
if (encctxlen) {
if (agl_should_run(sai, child))
ll_agl_add(sai, child, entry->se_index);
- EXIT;
-
out:
+ RETURN(rc);
+}
+
+static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
+ struct ll_statahead_info *sai,
+ struct md_op_item *item,
+ struct sa_entry *entry,
+ struct ptlrpc_request *req,
+ int rc)
+{
/*
- * sa_make_ready() will drop ldlm ibits lock refcount by calling
+ * First, ll_intent_release() will drop the ldlm ibits lock refcount by calling
* ll_intent_drop_lock() in spite of failures. Do not worry about
* calling ll_intent_drop_lock() more than once.
*/
+ ll_intent_release(&item->mop_it);
+ sa_fini_data(item);
+ if (req)
+ ptlrpc_req_finished(req);
sa_make_ready(sai, entry, rc);
+
+ spin_lock(&lli->lli_sa_lock);
+ sai->sai_replied++;
+ spin_unlock(&lli->lli_sa_lock);
}
-/* once there are async stat replies, instantiate sa_entry from replies */
-static void sa_handle_callback(struct ll_statahead_info *sai)
+static void ll_statahead_interpret_work(struct work_struct *data)
{
- struct ll_inode_info *lli;
+ struct ll_interpret_work *work = container_of(data,
+ struct ll_interpret_work, lpw_work);
+ struct md_op_item *item = work->lpw_item;
+ struct req_capsule *pill = work->lpw_pill;
+ struct inode *dir = item->mop_dir;
+ struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_statahead_info *sai = lli->lli_sai;
+ struct lookup_intent *it;
+ struct sa_entry *entry;
+ struct mdt_body *body;
+ struct inode *child;
+ int rc;
- lli = ll_i2info(sai->sai_dentry->d_inode);
+ ENTRY;
- spin_lock(&lli->lli_sa_lock);
- while (sa_has_callback(sai)) {
- struct sa_entry *entry;
+ entry = (struct sa_entry *)item->mop_cbdata;
+ LASSERT(entry->se_handle != 0);
- entry = list_entry(sai->sai_interim_entries.next,
- struct sa_entry, se_list);
- list_del_init(&entry->se_list);
- spin_unlock(&lli->lli_sa_lock);
+ it = &item->mop_it;
+ body = req_capsule_server_get(pill, &RMF_MDT_BODY);
+ if (!body)
+ GOTO(out, rc = -EFAULT);
- sa_instantiate(sai, entry);
- spin_lock(&lli->lli_sa_lock);
+ child = entry->se_inode;
+ /* revalidate; unlinked and re-created with the same name */
+ if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+ if (child) {
+ entry->se_inode = NULL;
+ iput(child);
+ }
+ /* The mdt_body is invalid. Skip this entry */
+ GOTO(out, rc = -EAGAIN);
}
- spin_unlock(&lli->lli_sa_lock);
+
+ it->it_lock_handle = entry->se_handle;
+ rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
+ if (rc != 1)
+ GOTO(out, rc = -EAGAIN);
+
+ LASSERT(it->it_extra_rpc_check == 0);
+ rc = ll_statahead_interpret_common(dir, sai, pill, it, entry, body);
+out:
+ ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
+ OBD_FREE_PTR(work);
}
/*
- * callback for async stat RPC, because this is called in ptlrpcd context, we
- * only put sa_entry in sai_interim_entries, and wake up statahead thread to
- * really prepare inode and instantiate sa_entry later.
+ * Callback for async stat RPC, this is called in ptlrpcd context. It prepares
+ * the inode and sets lock data directly in the ptlrpcd context. It will wake up
+ * the directory listing process if the dentry is the waiting one.
*/
-static int ll_statahead_interpret(struct ptlrpc_request *req,
- struct md_enqueue_info *minfo, int rc)
+static int ll_statahead_interpret(struct req_capsule *pill,
+ struct md_op_item *item, int rc)
{
- struct lookup_intent *it = &minfo->mi_it;
- struct inode *dir = minfo->mi_dir;
+ struct lookup_intent *it = &item->mop_it;
+ struct inode *dir = item->mop_dir;
struct ll_inode_info *lli = ll_i2info(dir);
struct ll_statahead_info *sai = lli->lli_sai;
- struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
+ struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
+ struct mdt_body *body;
+ struct inode *child;
__u64 handle = 0;
ENTRY;
CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
entry->se_qstr.len, entry->se_qstr.name, rc);
- if (rc != 0) {
- ll_intent_release(it);
- sa_fini_data(minfo);
- } else {
+ if (rc != 0)
+ GOTO(out, rc);
+
+ body = req_capsule_server_get(pill, &RMF_MDT_BODY);
+ if (!body)
+ GOTO(out, rc = -EFAULT);
+
+ child = entry->se_inode;
+ /* revalidate; unlinked and re-created with the same name */
+ if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+ if (child) {
+ entry->se_inode = NULL;
+ iput(child);
+ }
+ /* The mdt_body is invalid. Skip this entry */
+ GOTO(out, rc = -EAGAIN);
+ }
+
+ entry->se_handle = it->it_lock_handle;
+ /*
+ * In ptlrpcd context, it is not allowed to generate new RPCs
+ * especially for striped directories.
+ */
+ it->it_extra_rpc_check = 1;
+ rc = ll_statahead_interpret_common(dir, sai, pill, it, entry, body);
+ if (rc == -EAGAIN && it->it_extra_rpc_need) {
+ struct ll_interpret_work *work;
+
/*
* release ibits lock ASAP to avoid deadlock when statahead
* thread enqueues lock on parent in readdir and another
*/
handle = it->it_lock_handle;
ll_intent_drop_lock(it);
- ll_unlock_md_op_lsm(&minfo->mi_data);
- }
+ ll_unlock_md_op_lsm(&item->mop_data);
+ it->it_extra_rpc_check = 0;
+ it->it_extra_rpc_need = 0;
- spin_lock(&lli->lli_sa_lock);
- if (rc != 0) {
- if (__sa_make_ready(sai, entry, rc))
- wake_up(&sai->sai_waitq);
- } else {
- int first = 0;
-
- entry->se_minfo = minfo;
- entry->se_req = ptlrpc_request_addref(req);
/*
- * Release the async ibits lock ASAP to avoid deadlock
- * when statahead thread tries to enqueue lock on parent
- * for readpage and other tries to enqueue lock on child
- * with parent's lock held, for example: unlink.
+ * If the stat-ahead entry is a striped directory, there are two
+ * solutions:
+ * 1. It can drop the result, let the scanning process do stat()
+ * on the striped directory in synchronous way. By this way, it
+ * can avoid generating new RPCs to obtain the attributes for
+ * slaves of the striped directory in the ptlrpcd context, as
+ * blocking in a ptlrpcd thread is dangerous.
+ * 2. Use work queue or the separate statahead thread to handle
+ * the extra RPCs (@ll_prep_inode->@lmv_revalidate_slaves).
+ * Here we adopt the second solution.
*/
- entry->se_handle = handle;
- if (!sa_has_callback(sai))
- first = 1;
-
- list_add_tail(&entry->se_list, &sai->sai_interim_entries);
- if (first && sai->sai_task)
- wake_up_process(sai->sai_task);
+ OBD_ALLOC_GFP(work, sizeof(*work), GFP_ATOMIC);
+ if (work == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ INIT_WORK(&work->lpw_work, ll_statahead_interpret_work);
+ work->lpw_item = item;
+ work->lpw_pill = pill;
+ ptlrpc_request_addref(pill->rc_req);
+ schedule_work(&work->lpw_work);
+ RETURN(0);
}
- sai->sai_replied++;
-
- spin_unlock(&lli->lli_sa_lock);
+out:
+ ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
RETURN(rc);
}
/* async stat for file not found in dcache */
static int sa_lookup(struct inode *dir, struct sa_entry *entry)
{
- struct md_enqueue_info *minfo;
- int rc;
+ struct md_op_item *item;
+ int rc;
ENTRY;
- minfo = sa_prep_data(dir, NULL, entry);
- if (IS_ERR(minfo))
- RETURN(PTR_ERR(minfo));
+ item = sa_prep_data(dir, NULL, entry);
+ if (IS_ERR(item))
+ RETURN(PTR_ERR(item));
- rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
+ rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
if (rc < 0)
- sa_fini_data(minfo);
+ sa_fini_data(item);
RETURN(rc);
}
struct inode *inode = dentry->d_inode;
struct lookup_intent it = { .it_op = IT_GETATTR,
.it_lock_handle = 0 };
- struct md_enqueue_info *minfo;
+ struct md_op_item *item;
int rc;
ENTRY;
if (d_mountpoint(dentry))
RETURN(1);
- minfo = sa_prep_data(dir, inode, entry);
- if (IS_ERR(minfo))
- RETURN(PTR_ERR(minfo));
+ item = sa_prep_data(dir, inode, entry);
+ if (IS_ERR(item))
+ RETURN(PTR_ERR(item));
entry->se_inode = igrab(inode);
rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
if (rc == 1) {
entry->se_handle = it.it_lock_handle;
ll_intent_release(&it);
- sa_fini_data(minfo);
+ sa_fini_data(item);
RETURN(1);
}
- rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
+ rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
if (rc < 0) {
entry->se_inode = NULL;
iput(inode);
- sa_fini_data(minfo);
+ sa_fini_data(item);
}
RETURN(rc);
break;
}
- sai->sai_in_readpage = 1;
page = ll_get_dir_page(dir, op_data, pos, NULL);
ll_unlock_md_op_lsm(op_data);
- sai->sai_in_readpage = 0;
if (IS_ERR(page)) {
rc = PTR_ERR(page);
CDEBUG(D_READA,
while (({set_current_state(TASK_IDLE);
sai->sai_task; })) {
- if (sa_has_callback(sai)) {
- __set_current_state(TASK_RUNNING);
- sa_handle_callback(sai);
- }
-
spin_lock(&lli->lli_agl_lock);
while (sa_sent_full(sai) &&
!agl_list_empty(sai)) {
/*
* statahead is finished, but statahead entries need to be cached, wait
- * for file release to stop me.
+ * for file release closedir() call to stop me.
*/
while (({set_current_state(TASK_IDLE);
sai->sai_task; })) {
- if (sa_has_callback(sai)) {
- __set_current_state(TASK_RUNNING);
- sa_handle_callback(sai);
- } else {
- schedule();
- }
+ schedule();
}
__set_current_state(TASK_RUNNING);
/* in case we're not woken up, timeout wait */
msleep(125);
- /* release resources held by statahead RPCs */
- sa_handle_callback(sai);
-
CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
sbi->ll_fsname, sai, parent);
if (!entry)
GOTO(out, rc = -EAGAIN);
- /* if statahead is busy in readdir, help it do post-work */
- if (!sa_ready(entry) && sai->sai_in_readpage)
- sa_handle_callback(sai);
-
if (!sa_ready(entry)) {
spin_lock(&lli->lli_sa_lock);
sai->sai_index_wait = entry->se_index;