Whamcloud - gitweb
LU-14139 llite: simplify callback handling for async getattr 48/45648/14
authorQian Yingjin <qian@ddn.com>
Thu, 19 Nov 2020 15:15:37 +0000 (23:15 +0800)
committerOleg Drokin <green@whamcloud.com>
Fri, 19 Aug 2022 04:32:24 +0000 (04:32 +0000)
This patch prepares the inode and sets the lock data directly in
the interpret callback of the intent async getattr RPC request (in
ptlrpcd context), simplifying the old implementation that deferred
this work to the statahead thread.

If the statahead entry is a striped directory, it may generate
new RPCs in the ptlrpcd interpret context to obtain the
attributes for slaves of the striped directory:
@ll_prep_inode()->@lmv_revalidate_slaves()
This is dangerous and may result in a deadlock in the ptlrpcd interpret
context, so a work queue is used to handle these extra RPCs.
Add sanity 123d to verify that it works correctly.

According to the benchmark result, the workload "ls -l" to a large
directory on a client without any caching (server and client),
containing 1M files (47001 bytes) shows the results with measured
elapsed time:
- w/o patch: 180 seconds;
- w patch: 181 seconds;

There is no obvious performance regression.

Test-Parameters: testlist=racer,racer,racer
Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: I43aba0f609243f34f7e7b674c7fff5fa417b1c02
Reviewed-on: https://review.whamcloud.com/45648
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_intent.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/statahead.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/tests/sanity.sh

index 5f3a717..09aa504 100644 (file)
@@ -47,6 +47,8 @@ struct lookup_intent {
        __u64                    it_remote_lock_handle;
        struct ptlrpc_request   *it_request;
        unsigned int             it_lock_set:1;
+       unsigned int             it_extra_rpc_check:1;
+       unsigned int             it_extra_rpc_need:1;
 };
 
 static inline int it_disposition(const struct lookup_intent *it, int flag)
index 65cf50b..c9d1518 100644 (file)
@@ -966,20 +966,19 @@ struct md_readdir_info {
        int mr_partial_readdir_rc;
 };
 
-struct md_enqueue_info;
-/* metadata stat-ahead */
-typedef int (* md_enqueue_cb_t)(struct ptlrpc_request *req,
-                                struct md_enqueue_info *minfo,
-                                int rc);
-
-struct md_enqueue_info {
-       struct md_op_data               mi_data;
-       struct lookup_intent            mi_it;
-       struct lustre_handle            mi_lockh;
-       struct inode                   *mi_dir;
-       struct ldlm_enqueue_info        mi_einfo;
-       md_enqueue_cb_t                 mi_cb;
-       void                           *mi_cbdata;
+struct md_op_item;
+typedef int (*md_op_item_cb_t)(struct req_capsule *pill,
+                              struct md_op_item *item,
+                              int rc);
+
+struct md_op_item {
+       struct md_op_data                mop_data;
+       struct lookup_intent             mop_it;
+       struct lustre_handle             mop_lockh;
+       struct ldlm_enqueue_info         mop_einfo;
+       md_op_item_cb_t                  mop_cb;
+       void                            *mop_cbdata;
+       struct inode                    *mop_dir;
 };
 
 struct obd_ops {
@@ -1178,7 +1177,7 @@ struct md_ops {
                          u64, const char *, size_t, struct ptlrpc_request **);
 
        int (*m_intent_getattr_async)(struct obd_export *,
-                                     struct md_enqueue_info *);
+                                     struct md_op_item *);
 
         int (*m_revalidate_lock)(struct obd_export *, struct lookup_intent *,
                                  struct lu_fid *, __u64 *bits);
index c1a9172..30782c4 100644 (file)
@@ -1732,7 +1732,7 @@ static inline int md_init_ea_size(struct obd_export *exp, __u32 ea_size,
 }
 
 static inline int md_intent_getattr_async(struct obd_export *exp,
-                                         struct md_enqueue_info *minfo)
+                                         struct md_op_item *item)
 {
        int rc;
 
@@ -1743,7 +1743,7 @@ static inline int md_intent_getattr_async(struct obd_export *exp,
        lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
                             LPROC_MD_INTENT_GETATTR_ASYNC);
 
-       return MDP(exp->exp_obd, intent_getattr_async)(exp, minfo);
+       return MDP(exp->exp_obd, intent_getattr_async)(exp, item);
 }
 
 static inline int md_revalidate_lock(struct obd_export *exp,
index b3188d0..c585a68 100644 (file)
@@ -1526,15 +1526,11 @@ struct ll_statahead_info {
                                                 * is not a hidden one */
        unsigned int            sai_skip_hidden;/* skipped hidden dentry count
                                                 */
-       unsigned int            sai_ls_all:1,   /* "ls -al", do stat-ahead for
+       unsigned int            sai_ls_all:1;   /* "ls -al", do stat-ahead for
                                                 * hidden entries */
-                               sai_in_readpage:1;/* statahead is in readdir()*/
        wait_queue_head_t       sai_waitq;      /* stat-ahead wait queue */
        struct task_struct      *sai_task;      /* stat-ahead thread */
        struct task_struct      *sai_agl_task;  /* AGL thread */
-       struct list_head        sai_interim_entries; /* entries which got async
-                                                     * stat reply, but not
-                                                     * instantiated */
        struct list_head        sai_entries;    /* completed entries */
        struct list_head        sai_agls;       /* AGLs to be sent */
        struct list_head        sai_cache[LL_SA_CACHE_SIZE];
@@ -1542,6 +1538,12 @@ struct ll_statahead_info {
        atomic_t                sai_cache_count; /* entry count in cache */
 };
 
+struct ll_interpret_work {
+       struct work_struct       lpw_work;
+       struct md_op_item       *lpw_item;
+       struct req_capsule      *lpw_pill;
+};
+
 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentry,
                            bool unplug);
 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl);
index 8aec4f8..03e9e39 100644 (file)
@@ -3213,6 +3213,13 @@ int ll_prep_inode(struct inode **inode, struct req_capsule *pill,
        if (rc != 0)
                GOTO(out, rc);
 
+       if (S_ISDIR(md.body->mbo_mode) && md.lmv && lmv_dir_striped(md.lmv) &&
+           it && it->it_extra_rpc_check) {
+               /* TODO: Check @lsm unchanged via @lsm_md_eq. */
+               it->it_extra_rpc_need = 1;
+               GOTO(out, rc = -EAGAIN);
+       }
+
        /*
         * clear default_lmv only if intent_getattr reply doesn't contain it.
         * but it needs to be done after iget, check this early because
index b09a798..ad31e6e 100644 (file)
@@ -54,13 +54,12 @@ typedef enum {
 
 /*
  * sa_entry is not refcounted: statahead thread allocates it and do async stat,
- * and in async stat callback ll_statahead_interpret() will add it into
- * sai_interim_entries, later statahead thread will call sa_handle_callback() to
- * instantiate entry and move it into sai_entries, and then only scanner process
- * can access and free it.
+ * and in async stat callback ll_statahead_interpret() will prepare the inode
+ * and set lock data in the ptlrpcd context. Then the scanner process will be
+ * woken up if this entry is the waiting one, can access and free it.
  */
 struct sa_entry {
-       /* link into sai_interim_entries or sai_entries */
+       /* link into sai_entries */
        struct list_head        se_list;
        /* link into sai hash table locally */
        struct list_head        se_hash;
@@ -72,10 +71,6 @@ struct sa_entry {
        se_state_t              se_state;
        /* entry size, contains name */
        int                     se_size;
-       /* pointer to async getattr enqueue info */
-       struct md_enqueue_info *se_minfo;
-       /* pointer to the async getattr request */
-       struct ptlrpc_request  *se_req;
        /* pointer to the target inode */
        struct inode           *se_inode;
        /* entry name */
@@ -147,12 +142,6 @@ static inline int sa_sent_full(struct ll_statahead_info *sai)
        return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
 }
 
-/* got async stat replies */
-static inline int sa_has_callback(struct ll_statahead_info *sai)
-{
-       return !list_empty(&sai->sai_interim_entries);
-}
-
 static inline int agl_list_empty(struct ll_statahead_info *sai)
 {
        return list_empty(&sai->sai_agls);
@@ -341,61 +330,61 @@ __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
 }
 
 /* finish async stat RPC arguments */
-static void sa_fini_data(struct md_enqueue_info *minfo)
+static void sa_fini_data(struct md_op_item *item)
 {
-       struct md_op_data *op_data = &minfo->mi_data;
+       struct md_op_data *op_data = &item->mop_data;
 
        if (op_data->op_flags & MF_OPNAME_KMALLOCED)
                /* allocated via ll_setup_filename called from sa_prep_data */
                kfree(op_data->op_name);
-       ll_unlock_md_op_lsm(&minfo->mi_data);
-       iput(minfo->mi_dir);
-       OBD_FREE_PTR(minfo);
+       ll_unlock_md_op_lsm(&item->mop_data);
+       iput(item->mop_dir);
+       OBD_FREE_PTR(item);
 }
 
-static int ll_statahead_interpret(struct ptlrpc_request *req,
-                                 struct md_enqueue_info *minfo, int rc);
+static int ll_statahead_interpret(struct req_capsule *pill,
+                                 struct md_op_item *item, int rc);
 
 /*
  * prepare arguments for async stat RPC.
  */
-static struct md_enqueue_info *
+static struct md_op_item *
 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
 {
-       struct md_enqueue_info   *minfo;
+       struct md_op_item *item;
        struct ldlm_enqueue_info *einfo;
-       struct md_op_data        *op_data;
+       struct md_op_data *op_data;
 
-       OBD_ALLOC_PTR(minfo);
-       if (!minfo)
+       OBD_ALLOC_PTR(item);
+       if (!item)
                return ERR_PTR(-ENOMEM);
 
-       op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
+       op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
                                     entry->se_qstr.name, entry->se_qstr.len, 0,
                                     LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data)) {
-               OBD_FREE_PTR(minfo);
-               return (struct md_enqueue_info *)op_data;
+               OBD_FREE_PTR(item);
+               return (struct md_op_item *)op_data;
        }
 
        if (!child)
                op_data->op_fid2 = entry->se_fid;
 
-       minfo->mi_it.it_op = IT_GETATTR;
-       minfo->mi_dir = igrab(dir);
-       minfo->mi_cb = ll_statahead_interpret;
-       minfo->mi_cbdata = entry;
-
-       einfo = &minfo->mi_einfo;
-       einfo->ei_type   = LDLM_IBITS;
-       einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
-       einfo->ei_cb_bl  = ll_md_blocking_ast;
-       einfo->ei_cb_cp  = ldlm_completion_ast;
-       einfo->ei_cb_gl  = NULL;
+       item->mop_it.it_op = IT_GETATTR;
+       item->mop_dir = igrab(dir);
+       item->mop_cb = ll_statahead_interpret;
+       item->mop_cbdata = entry;
+
+       einfo = &item->mop_einfo;
+       einfo->ei_type = LDLM_IBITS;
+       einfo->ei_mode = it_to_lock_mode(&item->mop_it);
+       einfo->ei_cb_bl = ll_md_blocking_ast;
+       einfo->ei_cb_cp = ldlm_completion_ast;
+       einfo->ei_cb_gl = NULL;
        einfo->ei_cbdata = NULL;
        einfo->ei_req_slot = 1;
 
-       return minfo;
+       return item;
 }
 
 /*
@@ -406,22 +395,8 @@ static void
 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
 {
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
-       struct md_enqueue_info *minfo = entry->se_minfo;
-       struct ptlrpc_request *req = entry->se_req;
        bool wakeup;
 
-       /* release resources used in RPC */
-       if (minfo) {
-               entry->se_minfo = NULL;
-               ll_intent_release(&minfo->mi_it);
-               sa_fini_data(minfo);
-       }
-
-       if (req) {
-               entry->se_req = NULL;
-               ptlrpc_req_finished(req);
-       }
-
        spin_lock(&lli->lli_sa_lock);
        wakeup = __sa_make_ready(sai, entry, ret);
        spin_unlock(&lli->lli_sa_lock);
@@ -478,7 +453,6 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
        sai->sai_index = 1;
        init_waitqueue_head(&sai->sai_waitq);
 
-       INIT_LIST_HEAD(&sai->sai_interim_entries);
        INIT_LIST_HEAD(&sai->sai_entries);
        INIT_LIST_HEAD(&sai->sai_agls);
 
@@ -541,7 +515,6 @@ static void ll_sai_put(struct ll_statahead_info *sai)
                LASSERT(!sai->sai_task);
                LASSERT(!sai->sai_agl_task);
                LASSERT(sai->sai_sent == sai->sai_replied);
-               LASSERT(!sa_has_callback(sai));
 
                list_for_each_entry_safe(entry, next, &sai->sai_entries,
                                         se_list)
@@ -636,49 +609,20 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
        EXIT;
 }
 
-/*
- * prepare inode for sa entry, add it into agl list, now sa_entry is ready
- * to be used by scanner process.
- */
-static void sa_instantiate(struct ll_statahead_info *sai,
-                          struct sa_entry *entry)
+static int ll_statahead_interpret_common(struct inode *dir,
+                                        struct ll_statahead_info *sai,
+                                        struct req_capsule *pill,
+                                        struct lookup_intent *it,
+                                        struct sa_entry *entry,
+                                        struct mdt_body *body)
 {
-       struct inode *dir = sai->sai_dentry->d_inode;
        struct inode *child;
-       struct md_enqueue_info *minfo;
-       struct lookup_intent *it;
-       struct ptlrpc_request *req;
-       struct mdt_body *body;
-       int rc = 0;
+       int rc;
 
        ENTRY;
 
-       LASSERT(entry->se_handle != 0);
-
-       minfo = entry->se_minfo;
-       it = &minfo->mi_it;
-       req = entry->se_req;
-       body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-       if (!body)
-               GOTO(out, rc = -EFAULT);
-
        child = entry->se_inode;
-       /* revalidate; unlinked and re-created with the same name */
-       if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) {
-               if (child) {
-                       entry->se_inode = NULL;
-                       iput(child);
-               }
-               /* The mdt_body is invalid. Skip this entry */
-               GOTO(out, rc = -EAGAIN);
-       }
-
-       it->it_lock_handle = entry->se_handle;
-       rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
-       if (rc != 1)
-               GOTO(out, rc = -EAGAIN);
-
-       rc = ll_prep_inode(&child, &req->rq_pill, dir->i_sb, it);
+       rc = ll_prep_inode(&child, pill, dir->i_sb, it);
        if (rc)
                GOTO(out, rc);
 
@@ -686,10 +630,8 @@ static void sa_instantiate(struct ll_statahead_info *sai,
         * inode now to save an extra getxattr.
         */
        if (body->mbo_valid & OBD_MD_ENCCTX) {
-               void *encctx = req_capsule_server_get(&req->rq_pill,
-                                                     &RMF_FILE_ENCCTX);
-               __u32 encctxlen = req_capsule_get_size(&req->rq_pill,
-                                                      &RMF_FILE_ENCCTX,
+               void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
+               __u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
                                                       RCL_SERVER);
 
                if (encctxlen) {
@@ -716,52 +658,96 @@ static void sa_instantiate(struct ll_statahead_info *sai,
        if (agl_should_run(sai, child))
                ll_agl_add(sai, child, entry->se_index);
 
-       EXIT;
-
 out:
+       RETURN(rc);
+}
+
+static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
+                                       struct ll_statahead_info *sai,
+                                       struct md_op_item *item,
+                                       struct sa_entry *entry,
+                                       struct ptlrpc_request *req,
+                                       int rc)
+{
        /*
-        * sa_make_ready() will drop ldlm ibits lock refcount by calling
+        * First it will drop ldlm ibits lock refcount by calling
         * ll_intent_drop_lock() in spite of failures. Do not worry about
         * calling ll_intent_drop_lock() more than once.
         */
+       ll_intent_release(&item->mop_it);
+       sa_fini_data(item);
+       if (req)
+               ptlrpc_req_finished(req);
        sa_make_ready(sai, entry, rc);
+
+       spin_lock(&lli->lli_sa_lock);
+       sai->sai_replied++;
+       spin_unlock(&lli->lli_sa_lock);
 }
 
-/* once there are async stat replies, instantiate sa_entry from replies */
-static void sa_handle_callback(struct ll_statahead_info *sai)
+static void ll_statahead_interpret_work(struct work_struct *data)
 {
-       struct ll_inode_info *lli;
+       struct ll_interpret_work *work = container_of(data,
+                                       struct ll_interpret_work, lpw_work);
+       struct md_op_item *item = work->lpw_item;
+       struct req_capsule *pill = work->lpw_pill;
+       struct inode *dir = item->mop_dir;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_statahead_info *sai = lli->lli_sai;
+       struct lookup_intent *it;
+       struct sa_entry *entry;
+       struct mdt_body *body;
+       struct inode *child;
+       int rc;
 
-       lli = ll_i2info(sai->sai_dentry->d_inode);
+       ENTRY;
 
-       spin_lock(&lli->lli_sa_lock);
-       while (sa_has_callback(sai)) {
-               struct sa_entry *entry;
+       entry = (struct sa_entry *)item->mop_cbdata;
+       LASSERT(entry->se_handle != 0);
 
-               entry = list_entry(sai->sai_interim_entries.next,
-                                  struct sa_entry, se_list);
-               list_del_init(&entry->se_list);
-               spin_unlock(&lli->lli_sa_lock);
+       it = &item->mop_it;
+       body = req_capsule_server_get(pill, &RMF_MDT_BODY);
+       if (!body)
+               GOTO(out, rc = -EFAULT);
 
-               sa_instantiate(sai, entry);
-               spin_lock(&lli->lli_sa_lock);
+       child = entry->se_inode;
+       /* revalidate; unlinked and re-created with the same name */
+       if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+               if (child) {
+                       entry->se_inode = NULL;
+                       iput(child);
+               }
+               /* The mdt_body is invalid. Skip this entry */
+               GOTO(out, rc = -EAGAIN);
        }
-       spin_unlock(&lli->lli_sa_lock);
+
+       it->it_lock_handle = entry->se_handle;
+       rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
+       if (rc != 1)
+               GOTO(out, rc = -EAGAIN);
+
+       LASSERT(it->it_extra_rpc_check == 0);
+       rc = ll_statahead_interpret_common(dir, sai, pill, it, entry, body);
+out:
+       ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
+       OBD_FREE_PTR(work);
 }
 
 /*
- * callback for async stat RPC, because this is called in ptlrpcd context, we
- * only put sa_entry in sai_interim_entries, and wake up statahead thread to
- * really prepare inode and instantiate sa_entry later.
+ * Callback for async stat RPC, this is called in ptlrpcd context. It prepares
+ * the inode and set lock data directly in the ptlrpcd context. It will wake up
+ * the directory listing process if the dentry is the waiting one.
  */
-static int ll_statahead_interpret(struct ptlrpc_request *req,
-                                 struct md_enqueue_info *minfo, int rc)
+static int ll_statahead_interpret(struct req_capsule *pill,
+                                 struct md_op_item *item, int rc)
 {
-       struct lookup_intent *it = &minfo->mi_it;
-       struct inode *dir = minfo->mi_dir;
+       struct lookup_intent *it = &item->mop_it;
+       struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
-       struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
+       struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
+       struct mdt_body *body;
+       struct inode *child;
        __u64 handle = 0;
 
        ENTRY;
@@ -779,10 +765,34 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
        CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
               entry->se_qstr.len, entry->se_qstr.name, rc);
 
-       if (rc != 0) {
-               ll_intent_release(it);
-               sa_fini_data(minfo);
-       } else {
+       if (rc != 0)
+               GOTO(out, rc);
+
+       body = req_capsule_server_get(pill, &RMF_MDT_BODY);
+       if (!body)
+               GOTO(out, rc = -EFAULT);
+
+       child = entry->se_inode;
+       /* revalidate; unlinked and re-created with the same name */
+       if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
+               if (child) {
+                       entry->se_inode = NULL;
+                       iput(child);
+               }
+               /* The mdt_body is invalid. Skip this entry */
+               GOTO(out, rc = -EAGAIN);
+       }
+
+       entry->se_handle = it->it_lock_handle;
+       /*
+        * In ptlrpcd context, it is not allowed to generate new RPCs
+        * especially for striped directories.
+        */
+       it->it_extra_rpc_check = 1;
+       rc = ll_statahead_interpret_common(dir, sai, pill, it, entry, body);
+       if (rc == -EAGAIN && it->it_extra_rpc_need) {
+               struct ll_interpret_work *work;
+
                /*
                 * release ibits lock ASAP to avoid deadlock when statahead
                 * thread enqueues lock on parent in readdir and another
@@ -791,54 +801,54 @@ static int ll_statahead_interpret(struct ptlrpc_request *req,
                 */
                handle = it->it_lock_handle;
                ll_intent_drop_lock(it);
-               ll_unlock_md_op_lsm(&minfo->mi_data);
-       }
+               ll_unlock_md_op_lsm(&item->mop_data);
+               it->it_extra_rpc_check = 0;
+               it->it_extra_rpc_need = 0;
 
-       spin_lock(&lli->lli_sa_lock);
-       if (rc != 0) {
-               if (__sa_make_ready(sai, entry, rc))
-                       wake_up(&sai->sai_waitq);
-       } else {
-               int first = 0;
-
-               entry->se_minfo = minfo;
-               entry->se_req = ptlrpc_request_addref(req);
                /*
-                * Release the async ibits lock ASAP to avoid deadlock
-                * when statahead thread tries to enqueue lock on parent
-                * for readpage and other tries to enqueue lock on child
-                * with parent's lock held, for example: unlink.
+                * If the stat-ahead entry is a striped directory, there are two
+                * solutions:
+                * 1. It can drop the result, let the scanning process do stat()
+                * on the striped directory in synchronous way. By this way, it
+                * can avoid to generate new RPCs to obtain the attributes for
+                * slaves of the striped directory in the ptlrpcd context as it
+                * is dangerous of blocking in ptlrpcd thread.
+                * 2. Use work queue or the separate statahead thread to handle
+                * the extra RPCs (@ll_prep_inode->@lmv_revalidate_slaves).
+                * Here we adopt the second solution.
                 */
-               entry->se_handle = handle;
-               if (!sa_has_callback(sai))
-                       first = 1;
-
-               list_add_tail(&entry->se_list, &sai->sai_interim_entries);
-               if (first && sai->sai_task)
-                       wake_up_process(sai->sai_task);
+               OBD_ALLOC_GFP(work, sizeof(*work), GFP_ATOMIC);
+               if (work == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               INIT_WORK(&work->lpw_work, ll_statahead_interpret_work);
+               work->lpw_item = item;
+               work->lpw_pill = pill;
+               ptlrpc_request_addref(pill->rc_req);
+               schedule_work(&work->lpw_work);
+               RETURN(0);
        }
-       sai->sai_replied++;
-
-       spin_unlock(&lli->lli_sa_lock);
 
+out:
+       ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
        RETURN(rc);
 }
 
 /* async stat for file not found in dcache */
 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 {
-       struct md_enqueue_info   *minfo;
-       int                       rc;
+       struct md_op_item *item;
+       int rc;
 
        ENTRY;
 
-       minfo = sa_prep_data(dir, NULL, entry);
-       if (IS_ERR(minfo))
-               RETURN(PTR_ERR(minfo));
+       item = sa_prep_data(dir, NULL, entry);
+       if (IS_ERR(item))
+               RETURN(PTR_ERR(item));
 
-       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
+       rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
        if (rc < 0)
-               sa_fini_data(minfo);
+               sa_fini_data(item);
 
        RETURN(rc);
 }
@@ -856,7 +866,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
        struct inode *inode = dentry->d_inode;
        struct lookup_intent it = { .it_op = IT_GETATTR,
                                    .it_lock_handle = 0 };
-       struct md_enqueue_info *minfo;
+       struct md_op_item *item;
        int rc;
 
        ENTRY;
@@ -867,9 +877,9 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
        if (d_mountpoint(dentry))
                RETURN(1);
 
-       minfo = sa_prep_data(dir, inode, entry);
-       if (IS_ERR(minfo))
-               RETURN(PTR_ERR(minfo));
+       item = sa_prep_data(dir, inode, entry);
+       if (IS_ERR(item))
+               RETURN(PTR_ERR(item));
 
        entry->se_inode = igrab(inode);
        rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
@@ -877,15 +887,15 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
        if (rc == 1) {
                entry->se_handle = it.it_lock_handle;
                ll_intent_release(&it);
-               sa_fini_data(minfo);
+               sa_fini_data(item);
                RETURN(1);
        }
 
-       rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
+       rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
        if (rc < 0) {
                entry->se_inode = NULL;
                iput(inode);
-               sa_fini_data(minfo);
+               sa_fini_data(item);
        }
 
        RETURN(rc);
@@ -1066,10 +1076,8 @@ static int ll_statahead_thread(void *arg)
                        break;
                }
 
-               sai->sai_in_readpage = 1;
                page = ll_get_dir_page(dir, op_data, pos, NULL);
                ll_unlock_md_op_lsm(op_data);
-               sai->sai_in_readpage = 0;
                if (IS_ERR(page)) {
                        rc = PTR_ERR(page);
                        CDEBUG(D_READA,
@@ -1135,11 +1143,6 @@ static int ll_statahead_thread(void *arg)
 
                        while (({set_current_state(TASK_IDLE);
                                 sai->sai_task; })) {
-                               if (sa_has_callback(sai)) {
-                                       __set_current_state(TASK_RUNNING);
-                                       sa_handle_callback(sai);
-                               }
-
                                spin_lock(&lli->lli_agl_lock);
                                while (sa_sent_full(sai) &&
                                       !agl_list_empty(sai)) {
@@ -1216,16 +1219,11 @@ static int ll_statahead_thread(void *arg)
 
        /*
         * statahead is finished, but statahead entries need to be cached, wait
-        * for file release to stop me.
+        * for file release closedir() call to stop me.
         */
        while (({set_current_state(TASK_IDLE);
                 sai->sai_task; })) {
-               if (sa_has_callback(sai)) {
-                       __set_current_state(TASK_RUNNING);
-                       sa_handle_callback(sai);
-               } else {
-                       schedule();
-               }
+               schedule();
        }
        __set_current_state(TASK_RUNNING);
 
@@ -1241,9 +1239,6 @@ out:
                /* in case we're not woken up, timeout wait */
                msleep(125);
 
-       /* release resources held by statahead RPCs */
-       sa_handle_callback(sai);
-
        CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
               sbi->ll_fsname, sai, parent);
 
@@ -1526,10 +1521,6 @@ static int revalidate_statahead_dentry(struct inode *dir,
        if (!entry)
                GOTO(out, rc = -EAGAIN);
 
-       /* if statahead is busy in readdir, help it do post-work */
-       if (!sa_ready(entry) && sai->sai_in_readpage)
-               sa_handle_callback(sai);
-
        if (!sa_ready(entry)) {
                spin_lock(&lli->lli_sa_lock);
                sai->sai_index_wait = entry->se_index;
index da6e9e0..a65802d 100644 (file)
@@ -3691,9 +3691,9 @@ static int lmv_clear_open_replay_data(struct obd_export *exp,
 }
 
 static int lmv_intent_getattr_async(struct obd_export *exp,
-                                   struct md_enqueue_info *minfo)
+                                   struct md_op_item *item)
 {
-       struct md_op_data *op_data = &minfo->mi_data;
+       struct md_op_data *op_data = &item->mop_data;
        struct obd_device *obd = exp->exp_obd;
        struct lmv_obd *lmv = &obd->u.lmv;
        struct lmv_tgt_desc *ptgt;
@@ -3720,7 +3720,7 @@ static int lmv_intent_getattr_async(struct obd_export *exp,
        if (ctgt != ptgt)
                RETURN(-EREMOTE);
 
-       rc = md_intent_getattr_async(ptgt->ltd_exp, minfo);
+       rc = md_intent_getattr_async(ptgt->ltd_exp, item);
 
        RETURN(rc);
 }
index 20a81bf..b66825c 100644 (file)
@@ -129,8 +129,7 @@ int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
                         struct lu_fid *fid, __u64 *bits);
 
-int mdc_intent_getattr_async(struct obd_export *exp,
-                            struct md_enqueue_info *minfo);
+int mdc_intent_getattr_async(struct obd_export *exp, struct md_op_item *item);
 
 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
                              const struct lu_fid *fid, enum ldlm_type type,
index 8612405..4ad1de4 100644 (file)
@@ -47,8 +47,8 @@
 #include "mdc_internal.h"
 
 struct mdc_getattr_args {
-       struct obd_export               *ga_exp;
-       struct md_enqueue_info          *ga_minfo;
+       struct obd_export       *ga_exp;
+       struct md_op_item       *ga_item;
 };
 
 int it_open_error(int phase, struct lookup_intent *it)
@@ -1373,10 +1373,10 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
 {
        struct mdc_getattr_args *ga = args;
        struct obd_export *exp = ga->ga_exp;
-       struct md_enqueue_info *minfo = ga->ga_minfo;
-       struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
-       struct lookup_intent *it = &minfo->mi_it;
-       struct lustre_handle *lockh = &minfo->mi_lockh;
+       struct md_op_item *item = ga->ga_item;
+       struct ldlm_enqueue_info *einfo = &item->mop_einfo;
+       struct lookup_intent *it = &item->mop_it;
+       struct lustre_handle *lockh = &item->mop_lockh;
        struct ldlm_reply *lockrep;
        __u64 flags = LDLM_FL_HAS_INTENT;
 
@@ -1403,19 +1403,19 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
        if (rc)
                GOTO(out, rc);
 
-       rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
+       rc = mdc_finish_intent_lock(exp, req, &item->mop_data, it, lockh);
        EXIT;
 
 out:
-       minfo->mi_cb(req, minfo, rc);
+       item->mop_cb(&req->rq_pill, item, rc);
        return 0;
 }
 
 int mdc_intent_getattr_async(struct obd_export *exp,
-                            struct md_enqueue_info *minfo)
+                            struct md_op_item *item)
 {
-       struct md_op_data *op_data = &minfo->mi_data;
-       struct lookup_intent *it = &minfo->mi_it;
+       struct md_op_data *op_data = &item->mop_data;
+       struct lookup_intent *it = &item->mop_it;
        struct ptlrpc_request *req;
        struct mdc_getattr_args *ga;
        struct ldlm_res_id res_id;
@@ -1445,11 +1445,11 @@ int mdc_intent_getattr_async(struct obd_export *exp,
         * to avoid possible races. It is safe to have glimpse handler
         * for non-DOM locks and costs nothing.
         */
-       if (minfo->mi_einfo.ei_cb_gl == NULL)
-               minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
+       if (item->mop_einfo.ei_cb_gl == NULL)
+               item->mop_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
 
-       rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
-                             &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
+       rc = ldlm_cli_enqueue(exp, &req, &item->mop_einfo, &res_id, &policy,
+                             &flags, NULL, 0, LVB_T_NONE, &item->mop_lockh, 1);
        if (rc < 0) {
                ptlrpc_req_finished(req);
                RETURN(rc);
@@ -1457,7 +1457,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
 
        ga = ptlrpc_req_async_args(ga, req);
        ga->ga_exp = exp;
-       ga->ga_minfo = minfo;
+       ga->ga_item = item;
 
        req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
        ptlrpcd_add_req(req);
index 2338abc..4a6cbe4 100755 (executable)
@@ -13177,6 +13177,28 @@ test_123c() {
 }
 run_test 123c "Can not initialize inode warning on DNE statahead"
 
+test_123d() {
+       local num=100
+       local swrong
+       local ewrong
+
+       test_mkdir -c -1 $DIR/$tdir || error "test_mkdir $DIR/$tdir failed"
+       $LFS setdirstripe -D -c $MDSCOUNT $DIR/$tdir ||
+               error "setdirstripe $DIR/$tdir failed"
+       createmany -d $DIR/$tdir/$tfile $num || error "createmany $num failed"
+       remount_client $MOUNT
+       $LCTL get_param llite.*.statahead_max
+       swrong=$(lctl get_param -n llite.*.statahead_stats |
+               grep "statahead wrong:" | awk '{print $3}')
+       ls -l $DIR/$tdir || error "ls -l $DIR/$tdir failed"
+       $LCTL get_param -n llite.*.statahead_stats
+       ewrong=$(lctl get_param -n llite.*.statahead_stats |
+               grep "statahead wrong:" | awk '{print $3}')
+       [[ $swrong -eq $ewrong ]] ||
+               log "statahead was stopped, maybe too many locks held!"
+}
+run_test 123d "Statahead on striped directories works correctly"
+
 test_124a() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        $LCTL get_param -n mdc.*.connect_flags | grep -q lru_resize ||