Whamcloud - gitweb
LU-14139 statahead: batched statahead processing 20/40720/37
author: Qian Yingjin <qian@ddn.com>
Fri, 20 Nov 2020 10:38:02 +0000 (18:38 +0800)
committer: Oleg Drokin <green@whamcloud.com>
Tue, 11 Apr 2023 20:04:38 +0000 (20:04 +0000)
Batched metadata processing can give a big performance boost.
This patch implements a batched statahead mechanism, which
can also increase the performance of a directory traversal or
listing, such as with the command 'ls'.

For the batched statahead, one batched getattr() RPC is equivalent to
'N' normal lookup/getattr RPCs. It can pack a number of dentry
names obtained from the readdir() call, together with lock handles
prepared in the client-side lock namespace, into one large batched RPC
transferred via bulk I/O, to obtain ibits DLM locks and the
associated attributes for many files in one go.
When MDS receives a batched getattr() RPC, it executes the sub
requests in it one by one serially.

A tunable parameter named "statahead_batch_max" is defined; it is
the maximum number of items that can be batched and processed within
one aggregate RPC. Once the number of sub requests exceeds this
predefined limit, the batched RPC is packed and triggered.
The batched RPC is also triggered explicitly when the
readdir() call reaches the end position of the directory or
the statahead thread exits abnormally.

Batched metadata processing can get a big performance boost.
The mdtest performance results without and with this patch series are
as follows (first line: without the patch series; second line: with it):
mdtest-easy-stat      720.562369 kIOPS : time 118.695 seconds
mdtest-easy-stat     1218.290192 kIOPS : time 70.656 seconds

In this patch, statahead_batch_max is set to 0, so batched
statahead is disabled by default. It will be enabled once some
subsequent fixes for batched RPC have been merged.

Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: I5a80c2c377093dc8b8e21341f440e3038f017ca8
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/40720
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
21 files changed:
lustre/include/lustre_dlm.h
lustre/include/lustre_req_layout.h
lustre/include/obd.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/llite/statahead.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_batch.c
lustre/mdc/mdc_dev.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/mdt/mdt_batch.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_lib.c
lustre/osc/osc_request.c
lustre/ptlrpc/layout.c
lustre/quota/qsd_request.c
lustre/target/tgt_handler.c

index f58d582..3029ce5 100644 (file)
@@ -1428,6 +1428,8 @@ struct ldlm_callback_suite {
  */
 int ldlm_server_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
                             void *data, int flag);
+int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                    void *data, int flag);
 int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data);
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data);
 int ldlm_glimpse_locks(struct ldlm_resource *res,
@@ -1439,9 +1441,9 @@ int ldlm_glimpse_locks(struct ldlm_resource *res,
  * MDT or OST to pass through LDLM requests to LDLM for handling
  * @{
  */
-int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
-                        const struct ldlm_request *dlm_req,
-                        const struct ldlm_callback_suite *cbs);
+int ldlm_handle_enqueue(struct ldlm_namespace *ns, struct req_capsule *pill,
+                       const struct ldlm_request *dlm_req,
+                       const struct ldlm_callback_suite *cbs);
 int ldlm_handle_convert0(struct ptlrpc_request *req,
                         const struct ldlm_request *dlm_req);
 int ldlm_handle_cancel(struct ptlrpc_request *req);
@@ -1732,10 +1734,10 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                      struct list_head *cancels, int count);
 
 struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len);
-int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
-                        const struct ldlm_request *dlm_req,
-                        const struct ldlm_callback_suite *cbs);
-int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
+int ldlm_handle_enqueue(struct ldlm_namespace *ns, struct req_capsule *pill,
+                       const struct ldlm_request *dlm_req,
+                       const struct ldlm_callback_suite *cbs);
+int ldlm_cli_enqueue_fini(struct obd_export *exp, struct req_capsule *pill,
                          struct ldlm_enqueue_info *einfo, __u8 with_policy,
                          __u64 *flags, void *lvb, __u32 lvb_len,
                          const struct lustre_handle *lockh, int rc,
@@ -1751,6 +1753,14 @@ int ldlm_cli_enqueue_local(const struct lu_env *env,
                           void *data, __u32 lvb_len, enum lvb_type lvb_type,
                           const __u64 *client_cookie,
                           struct lustre_handle *lockh);
+int ldlm_cli_lock_create_pack(struct obd_export *exp,
+                             struct ldlm_request *dlmreq,
+                             struct ldlm_enqueue_info *einfo,
+                             const struct ldlm_res_id *res_id,
+                             union ldlm_policy_data const *policy,
+                             __u64 *flags, void *lvb, __u32 lvb_len,
+                             enum lvb_type lvb_type,
+                             struct lustre_handle *lockh);
 int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits);
 int ldlm_cli_convert(struct ldlm_lock *lock,
                     enum ldlm_cancel_flags cancel_flags);
index d0fd5ab..645c072 100644 (file)
@@ -312,6 +312,7 @@ extern struct req_format RQF_LFSCK_NOTIFY;
 extern struct req_format RQF_LFSCK_QUERY;
 
 /* Batch UpdaTe req_format */
+extern struct req_format RQF_BUT_GETATTR;
 extern struct req_format RQF_MDS_BATCH;
 
 extern struct req_msg_field RMF_GENERIC_DATA;
index d224762..e0fa3ce 100644 (file)
@@ -988,6 +988,8 @@ struct md_op_item {
        struct inode                    *mop_dir;
        struct req_capsule              *mop_pill;
        struct work_struct               mop_work;
+       __u64                            mop_lock_flags;
+       unsigned int                     mop_subpill_allocated:1;
 };
 
 enum lu_batch_flags {
index b481894..440049e 100644 (file)
@@ -1166,6 +1166,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 
        RETURN(lvb_len < 0 ? lvb_len : rc);
 }
+EXPORT_SYMBOL(ldlm_server_completion_ast);
 
 /**
  * Server side ->l_glimpse_ast handler for client locks.
@@ -1277,10 +1278,10 @@ EXPORT_SYMBOL(ldlm_request_lock);
  * Main server-side entry point into LDLM for enqueue. This is called by ptlrpc
  * service threads to carry out client lock enqueueing requests.
  */
-int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
-                        struct ptlrpc_request *req,
-                        const struct ldlm_request *dlm_req,
-                        const struct ldlm_callback_suite *cbs)
+int ldlm_handle_enqueue(struct ldlm_namespace *ns,
+                       struct req_capsule *pill,
+                       const struct ldlm_request *dlm_req,
+                       const struct ldlm_callback_suite *cbs)
 {
        struct ldlm_reply *dlm_rep;
        __u64 flags;
@@ -1289,23 +1290,27 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
        void *cookie = NULL;
        int rc = 0;
        struct ldlm_resource *res = NULL;
+       struct ptlrpc_request *req = pill->rc_req;
        const struct lu_env *env = req->rq_svc_thread->t_env;
 
        ENTRY;
 
        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
 
-       ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
-       flags = ldlm_flags_from_wire(dlm_req->lock_flags);
+       LASSERT(req && req->rq_export);
 
-       LASSERT(req->rq_export);
+       if (req_capsule_ptlreq(pill))
+               ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF,
+                                   LATF_SKIP);
+
+       flags = ldlm_flags_from_wire(dlm_req->lock_flags);
 
        /* for intent enqueue the stat will be updated inside intent policy */
        if (ptlrpc_req2svc(req)->srv_stats != NULL &&
            !(dlm_req->lock_flags & LDLM_FL_HAS_INTENT))
                ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
 
-       if (req->rq_export && req->rq_export->exp_nid_stats &&
+       if (req->rq_export->exp_nid_stats &&
            req->rq_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_ENQUEUE - LDLM_FIRST_OPC);
@@ -1426,13 +1431,13 @@ existing_lock:
                /* based on the assumption that lvb size never changes during
                 * resource life time otherwise it need resource->lr_lock's
                 * protection */
-               req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+               req_capsule_set_size(pill, &RMF_DLM_LVB,
                                     RCL_SERVER, ldlm_lvbo_size(lock));
 
                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                        GOTO(out, rc = -ENOMEM);
 
-               rc = req_capsule_server_pack(&req->rq_pill);
+               rc = req_capsule_server_pack(pill);
                if (rc)
                        GOTO(out, rc);
        }
@@ -1444,12 +1449,12 @@ existing_lock:
                GOTO(out, err);
        }
 
-       dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+       dlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
 
        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
        ldlm_lock2handle(lock, &dlm_rep->lock_handle);
 
-       if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
+       if (lock->l_resource->lr_type == LDLM_EXTENT)
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
 
        /*
@@ -1515,11 +1520,13 @@ existing_lock:
 
        EXIT;
 out:
-       req->rq_status = rc ?: err; /* return either error - b=11190 */
-       if (!req->rq_packed_final) {
-               int rc1 = lustre_pack_reply(req, 1, NULL, NULL);
-               if (rc == 0)
-                       rc = rc1;
+       if (req_capsule_ptlreq(pill)) {
+               req->rq_status = rc ?: err; /* return either error - b=11190 */
+               if (!req->rq_packed_final) {
+                       int rc1 = lustre_pack_reply(req, 1, NULL, NULL);
+                       if (rc == 0)
+                               rc = rc1;
+               }
        }
 
        /*
@@ -1532,18 +1539,17 @@ out:
                           err, rc);
 
                if (rc == 0 &&
-                   req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
+                   req_capsule_has_field(pill, &RMF_DLM_LVB,
                                          RCL_SERVER) &&
                    ldlm_lvbo_size(lock) > 0) {
                        void *buf;
                        int buflen;
 
 retry:
-                       buf = req_capsule_server_get(&req->rq_pill,
-                                                    &RMF_DLM_LVB);
+                       buf = req_capsule_server_get(pill, &RMF_DLM_LVB);
                        LASSERTF(buf != NULL, "req %p, lock %p\n", req, lock);
-                       buflen = req_capsule_get_size(&req->rq_pill,
-                                       &RMF_DLM_LVB, RCL_SERVER);
+                       buflen = req_capsule_get_size(pill, &RMF_DLM_LVB,
+                                                     RCL_SERVER);
                        /*
                         * non-replayed lock, delayed lvb init may
                         * need to be occur now
@@ -1553,13 +1559,12 @@ retry:
 
                                rc2 = ldlm_lvbo_fill(lock, buf, &buflen);
                                if (rc2 >= 0) {
-                                       req_capsule_shrink(&req->rq_pill,
-                                                          &RMF_DLM_LVB,
+                                       req_capsule_shrink(pill, &RMF_DLM_LVB,
                                                           rc2, RCL_SERVER);
                                } else if (rc2 == -ERANGE) {
                                        rc2 = req_capsule_server_grow(
-                                                       &req->rq_pill,
-                                                       &RMF_DLM_LVB, buflen);
+                                                       pill, &RMF_DLM_LVB,
+                                                       buflen);
                                        if (!rc2) {
                                                goto retry;
                                        } else {
@@ -1569,8 +1574,7 @@ retry:
                                                 * to client.
                                                 */
                                                req_capsule_shrink(
-                                                       &req->rq_pill,
-                                                       &RMF_DLM_LVB, 0,
+                                                       pill, &RMF_DLM_LVB, 0,
                                                        RCL_SERVER);
                                        }
                                } else {
@@ -1579,8 +1583,7 @@ retry:
                        } else if (flags & LDLM_FL_REPLAY) {
                                /* no LVB resend upon replay */
                                if (buflen > 0)
-                                       req_capsule_shrink(&req->rq_pill,
-                                                          &RMF_DLM_LVB,
+                                       req_capsule_shrink(pill, &RMF_DLM_LVB,
                                                           0, RCL_SERVER);
                                else
                                        rc = buflen;
@@ -1614,6 +1617,7 @@ retry:
 
        return rc;
 }
+EXPORT_SYMBOL(ldlm_handle_enqueue);
 
 /*
  * Clear the blocking lock, the race is possible between ldlm_handle_convert0()
index 20d39bc..64408cc 100644 (file)
@@ -605,7 +605,7 @@ static bool ldlm_request_slot_needed(struct ldlm_enqueue_info *einfo)
  *
  * Called after receiving reply from server.
  */
-int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
+int ldlm_cli_enqueue_fini(struct obd_export *exp, struct req_capsule *pill,
                          struct ldlm_enqueue_info *einfo,
                          __u8 with_policy, __u64 *ldlm_flags, void *lvb,
                          __u32 lvb_len, const struct lustre_handle *lockh,
@@ -620,13 +620,17 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
        ENTRY;
 
-       if (request_slot)
-               obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
+       if (req_capsule_ptlreq(pill)) {
+               struct ptlrpc_request *req = pill->rc_req;
 
-       ptlrpc_put_mod_rpc_slot(req);
+               if (request_slot)
+                       obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
 
-       if (req && req->rq_svc_thread)
-               env = req->rq_svc_thread->t_env;
+               ptlrpc_put_mod_rpc_slot(req);
+
+               if (req && req->rq_svc_thread)
+                       env = req->rq_svc_thread->t_env;
+       }
 
        lock = ldlm_handle2lock(lockh);
        /* ldlm_cli_enqueue is holding a reference on this lock. */
@@ -648,15 +652,14 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        }
 
        /* Before we return, swab the reply */
-       reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+       reply = req_capsule_server_get(pill, &RMF_DLM_REP);
        if (reply == NULL)
                GOTO(cleanup, rc = -EPROTO);
 
        if (lvb_len > 0) {
                int size = 0;
 
-               size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
-                                           RCL_SERVER);
+               size = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
                if (size < 0) {
                        LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", size);
                        GOTO(cleanup, rc = size);
@@ -671,7 +674,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
        if (rc == ELDLM_LOCK_ABORTED) {
                if (lvb_len > 0 && lvb != NULL)
-                       rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
+                       rc = ldlm_fill_lvb(lock, pill, RCL_SERVER,
                                           lvb, lvb_len);
                GOTO(cleanup, rc = rc ? : ELDLM_LOCK_ABORTED);
        }
@@ -766,7 +769,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                 */
                lock_res_and_lock(lock);
                if (!ldlm_is_granted(lock))
-                       rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
+                       rc = ldlm_fill_lvb(lock, pill, RCL_SERVER,
                                           lock->l_lvb_data, lvb_len);
                unlock_res_and_lock(lock);
                if (rc < 0) {
@@ -1117,8 +1120,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        rc = ptlrpc_queue_wait(req);
 
-       err = ldlm_cli_enqueue_fini(exp, req, einfo, policy ? 1 : 0, flags,
-                                   lvb, lvb_len, lockh, rc, need_req_slot);
+       err = ldlm_cli_enqueue_fini(exp, &req->rq_pill, einfo, policy ? 1 : 0,
+                                   flags, lvb, lvb_len, lockh, rc,
+                                   need_req_slot);
 
        /*
         * If ldlm_cli_enqueue_fini did not find the lock, we need to free
@@ -1141,6 +1145,62 @@ out:
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 
 /**
+ * Client-side IBITS lock create and pack for WBC EX lock request.
+ */
+int ldlm_cli_lock_create_pack(struct obd_export *exp,
+                             struct ldlm_request *dlmreq,
+                             struct ldlm_enqueue_info *einfo,
+                             const struct ldlm_res_id *res_id,
+                             union ldlm_policy_data const *policy,
+                             __u64 *flags, void *lvb, __u32 lvb_len,
+                             enum lvb_type lvb_type,
+                             struct lustre_handle *lockh)
+{
+       const struct ldlm_callback_suite cbs = {
+               .lcs_completion = einfo->ei_cb_cp,
+               .lcs_blocking   = einfo->ei_cb_bl,
+               .lcs_glimpse    = einfo->ei_cb_gl
+       };
+       struct ldlm_namespace *ns;
+       struct ldlm_lock *lock;
+
+       ENTRY;
+
+       LASSERT(exp != NULL);
+       LASSERT(!(*flags & LDLM_FL_REPLAY));
+
+       ns = exp->exp_obd->obd_namespace;
+       lock = ldlm_lock_create(ns, res_id, einfo->ei_type, einfo->ei_mode,
+                               &cbs, einfo->ei_cbdata, lvb_len, lvb_type);
+       if (IS_ERR(lock))
+               RETURN(PTR_ERR(lock));
+
+       if (einfo->ei_cb_created)
+               einfo->ei_cb_created(lock);
+
+       /* For the local lock, add the reference */
+       ldlm_lock_addref_internal(lock, einfo->ei_mode);
+       ldlm_lock2handle(lock, lockh);
+       if (policy != NULL)
+               lock->l_policy_data = *policy;
+
+       LDLM_DEBUG(lock, "client-side enqueue START, flags %#llx", *flags);
+       lock->l_conn_export = exp;
+       lock->l_export = NULL;
+       lock->l_blocking_ast = einfo->ei_cb_bl;
+       lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL |
+                                   LDLM_FL_ATOMIC_CB));
+       lock->l_activity = ktime_get_real_seconds();
+
+       ldlm_lock2desc(lock, &dlmreq->lock_desc);
+       dlmreq->lock_flags = ldlm_flags_to_wire(*flags);
+       dlmreq->lock_handle[0] = *lockh;
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(ldlm_cli_lock_create_pack);
+
+/**
  * Client-side IBITS lock convert.
  *
  * Inform server that lock has been converted instead of canceling.
index ab81f01..569df1f 100644 (file)
@@ -799,6 +799,8 @@ struct ll_sb_info {
        /* metadata stat-ahead */
        unsigned int              ll_sa_running_max;/* max concurrent
                                                     * statahead instances */
+       unsigned int              ll_sa_batch_max;/* max SUB request count in
+                                                  * a batch PTLRPC request */
        unsigned int              ll_sa_max;     /* max statahead RPCs */
        atomic_t                  ll_sa_total;   /* statahead thread started
                                                  * count */
@@ -1541,9 +1543,9 @@ void ll_ra_stats_inc(struct inode *inode, enum ra_stat which);
 
 /* statahead.c */
 
-#define LL_SA_RPC_MIN           2
+#define LL_SA_RPC_MIN           8
 #define LL_SA_RPC_DEF           32
-#define LL_SA_RPC_MAX           512
+#define LL_SA_RPC_MAX           2048
 
 /* XXX: If want to support more concurrent statahead instances,
  *     please consider to decentralize the RPC lists attached
@@ -1552,6 +1554,9 @@ void ll_ra_stats_inc(struct inode *inode, enum ra_stat which);
 #define LL_SA_RUNNING_MAX      256
 #define LL_SA_RUNNING_DEF      16
 
+#define LL_SA_BATCH_MAX                1024
+#define LL_SA_BATCH_DEF                0
+
 #define LL_SA_CACHE_BIT         5
 #define LL_SA_CACHE_SIZE        (1 << LL_SA_CACHE_BIT)
 #define LL_SA_CACHE_MASK        (LL_SA_CACHE_SIZE - 1)
@@ -1592,6 +1597,9 @@ struct ll_statahead_info {
        struct list_head        sai_cache[LL_SA_CACHE_SIZE];
        spinlock_t              sai_cache_lock[LL_SA_CACHE_SIZE];
        atomic_t                sai_cache_count; /* entry count in cache */
+       struct lu_batch         *sai_bh;
+       __u32                   sai_max_batch_count;
+       __u64                   sai_index_end;
 };
 
 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentry,
index 39b3edb..bf57742 100644 (file)
@@ -177,6 +177,7 @@ static struct ll_sb_info *ll_init_sbi(struct lustre_sb_info *lsi)
 
        /* metadata statahead is enabled by default */
        sbi->ll_sa_running_max = LL_SA_RUNNING_DEF;
+       sbi->ll_sa_batch_max = LL_SA_BATCH_DEF;
        sbi->ll_sa_max = LL_SA_RPC_DEF;
        atomic_set(&sbi->ll_sa_total, 0);
        atomic_set(&sbi->ll_sa_wrong, 0);
@@ -347,7 +348,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
                                   OBD_CONNECT2_GETATTR_PFID |
                                   OBD_CONNECT2_DOM_LVB |
                                   OBD_CONNECT2_REP_MBITS |
-                                  OBD_CONNECT2_ATOMIC_OPEN_LOCK;
+                                  OBD_CONNECT2_ATOMIC_OPEN_LOCK |
+                                  OBD_CONNECT2_BATCH_RPC;
 
 #ifdef HAVE_LRU_RESIZE_SUPPORT
        if (test_bit(LL_SBI_LRU_RESIZE, sbi->ll_flags))
index 4887946..14be341 100644 (file)
@@ -759,6 +759,41 @@ static ssize_t statahead_running_max_store(struct kobject *kobj,
 }
 LUSTRE_RW_ATTR(statahead_running_max);
 
+static ssize_t statahead_batch_max_show(struct kobject *kobj,
+                                       struct attribute *attr,
+                                       char *buf)
+{
+       struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+                                             ll_kset.kobj);
+
+       return snprintf(buf, 16, "%u\n", sbi->ll_sa_batch_max);
+}
+
+static ssize_t statahead_batch_max_store(struct kobject *kobj,
+                                        struct attribute *attr,
+                                        const char *buffer,
+                                        size_t count)
+{
+       struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+                                             ll_kset.kobj);
+       unsigned long val;
+       int rc;
+
+       rc = kstrtoul(buffer, 0, &val);
+       if (rc)
+               return rc;
+
+       if (val > LL_SA_BATCH_MAX) {
+               CWARN("%s: statahead_batch_max value %lu limited to maximum %d\n",
+                     sbi->ll_fsname, val, LL_SA_BATCH_MAX);
+               val = LL_SA_BATCH_MAX;
+       }
+
+       sbi->ll_sa_batch_max = val;
+       return count;
+}
+LUSTRE_RW_ATTR(statahead_batch_max);
+
 static ssize_t statahead_max_show(struct kobject *kobj,
                                  struct attribute *attr,
                                  char *buf)
@@ -783,12 +818,13 @@ static ssize_t statahead_max_store(struct kobject *kobj,
        if (rc)
                return rc;
 
-       if (val <= LL_SA_RPC_MAX)
-               sbi->ll_sa_max = val;
-       else
-               CERROR("Bad statahead_max value %lu. Valid values are in the range [0, %d]\n",
-                      val, LL_SA_RPC_MAX);
+       if (val > LL_SA_RPC_MAX) {
+               CWARN("%s: statahead_max value %lu limited to maximum %d\n",
+                     sbi->ll_fsname, val, LL_SA_RPC_MAX);
+               val = LL_SA_RPC_MAX;
+       }
 
+       sbi->ll_sa_max = val;
        return count;
 }
 LUSTRE_RW_ATTR(statahead_max);
@@ -1829,6 +1865,7 @@ static struct attribute *llite_attrs[] = {
        &lustre_attr_stats_track_ppid.attr,
        &lustre_attr_stats_track_gid.attr,
        &lustre_attr_statahead_running_max.attr,
+       &lustre_attr_statahead_batch_max.attr,
        &lustre_attr_statahead_max.attr,
        &lustre_attr_statahead_agl.attr,
        &lustre_attr_lazystatfs.attr,
index 7d84b54..b546b68 100644 (file)
@@ -142,6 +142,21 @@ static inline int sa_sent_full(struct ll_statahead_info *sai)
        return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
 }
 
+/* Batch metadata handle */
+static inline bool sa_has_batch_handle(struct ll_statahead_info *sai)
+{
+       return sai->sai_bh != NULL;
+}
+
+static inline void ll_statahead_flush_nowait(struct ll_statahead_info *sai)
+{
+       if (sa_has_batch_handle(sai)) {
+               sai->sai_index_end = sai->sai_index - 1;
+               (void) md_batch_flush(ll_i2mdexp(sai->sai_dentry->d_inode),
+                                     sai->sai_bh, false);
+       }
+}
+
 static inline int agl_list_empty(struct ll_statahead_info *sai)
 {
        return list_empty(&sai->sai_agls);
@@ -269,19 +284,35 @@ sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
 
 /* called by scanner after use, sa_entry will be killed */
 static void
-sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
+sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
 {
+       struct ll_inode_info *lli = ll_i2info(dir);
        struct sa_entry *tmp, *next;
+       bool wakeup = false;
 
        if (entry && entry->se_state == SA_ENTRY_SUCC) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
 
                sai->sai_hit++;
                sai->sai_consecutive_miss = 0;
-               sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
+               if (sai->sai_max < sbi->ll_sa_max) {
+                       sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
+                       wakeup = true;
+               } else if (sai->sai_max_batch_count > 0) {
+                       if (sai->sai_max >= sai->sai_max_batch_count &&
+                          (sai->sai_index_end - entry->se_index) %
+                          sai->sai_max_batch_count == 0) {
+                               wakeup = true;
+                       } else if (entry->se_index == sai->sai_index_end) {
+                               wakeup = true;
+                       }
+               } else {
+                       wakeup = true;
+               }
        } else {
                sai->sai_miss++;
                sai->sai_consecutive_miss++;
+               wakeup = true;
        }
 
        if (entry)
@@ -296,6 +327,11 @@ sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
                        break;
                sa_kill(sai, tmp);
        }
+
+       spin_lock(&lli->lli_sa_lock);
+       if (wakeup && sai->sai_task)
+               wake_up_process(sai->sai_task);
+       spin_unlock(&lli->lli_sa_lock);
 }
 
 /*
@@ -339,6 +375,8 @@ static void sa_fini_data(struct md_op_item *item)
                kfree(op_data->op_name);
        ll_unlock_md_op_lsm(&item->mop_data);
        iput(item->mop_dir);
+       if (item->mop_subpill_allocated)
+               OBD_FREE_PTR(item->mop_pill);
        OBD_FREE_PTR(item);
 }
 
@@ -369,6 +407,7 @@ sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
        if (!child)
                op_data->op_fid2 = entry->se_fid;
 
+       item->mop_opc = MD_OP_GETATTR;
        item->mop_it.it_op = IT_GETATTR;
        item->mop_dir = igrab(dir);
        item->mop_cb = ll_statahead_interpret;
@@ -672,8 +711,12 @@ static void ll_statahead_interpret_work(struct work_struct *work)
                GOTO(out, rc = -EAGAIN);
 
        rc = ll_prep_inode(&child, pill, dir->i_sb, it);
-       if (rc)
+       if (rc) {
+               CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
+                      ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
+                      entry->se_qstr.name, PFID(&entry->se_fid), rc);
                GOTO(out, rc);
+       }
 
        /* If encryption context was returned by MDT, put it in
         * inode now to save an extra getxattr.
@@ -796,6 +839,19 @@ out:
        RETURN(rc);
 }
 
+static inline int sa_getattr(struct inode *dir, struct md_op_item *item)
+{
+       struct ll_statahead_info *sai = ll_i2info(dir)->lli_sai;
+       int rc;
+
+       if (sa_has_batch_handle(sai))
+               rc = md_batch_add(ll_i2mdexp(dir), sai->sai_bh, item);
+       else
+               rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
+
+       return rc;
+}
+
 /* async stat for file not found in dcache */
 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
 {
@@ -808,7 +864,7 @@ static int sa_lookup(struct inode *dir, struct sa_entry *entry)
        if (IS_ERR(item))
                RETURN(PTR_ERR(item));
 
-       rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
+       rc = sa_getattr(dir, item);
        if (rc < 0)
                sa_fini_data(item);
 
@@ -853,7 +909,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
                RETURN(1);
        }
 
-       rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
+       rc = sa_getattr(dir, item);
        if (rc < 0) {
                entry->se_inode = NULL;
                iput(inode);
@@ -899,6 +955,9 @@ static void sa_statahead(struct dentry *parent, const char *name, int len,
 
        sai->sai_index++;
 
+       if (sa_sent_full(sai))
+               ll_statahead_flush_nowait(sai);
+
        EXIT;
 }
 
@@ -1015,6 +1074,7 @@ static int ll_statahead_thread(void *arg)
        int first = 0;
        struct md_op_data *op_data;
        struct page *page = NULL;
+       struct lu_batch *bh = NULL;
        __u64 pos = 0;
        int rc = 0;
 
@@ -1023,6 +1083,15 @@ static int ll_statahead_thread(void *arg)
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
               sai, parent);
 
+       sai->sai_max_batch_count = sbi->ll_sa_batch_max;
+       if (sai->sai_max_batch_count) {
+               bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
+                                    sai->sai_max_batch_count);
+               if (IS_ERR(bh))
+                       GOTO(out_stop_agl, rc = PTR_ERR(bh));
+       }
+
+       sai->sai_bh = bh;
        OBD_ALLOC_PTR(op_data);
        if (!op_data)
                GOTO(out, rc = -ENOMEM);
@@ -1183,6 +1252,8 @@ static int ll_statahead_thread(void *arg)
                spin_unlock(&lli->lli_sa_lock);
        }
 
+       ll_statahead_flush_nowait(sai);
+
        /*
         * statahead is finished, but statahead entries need to be cached, wait
         * for file release closedir() call to stop me.
@@ -1196,6 +1267,12 @@ static int ll_statahead_thread(void *arg)
 
        EXIT;
 out:
+       if (bh) {
+               rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
+               sai->sai_bh = NULL;
+       }
+
+out_stop_agl:
        ll_stop_agl(sai);
 
        /*
@@ -1567,11 +1644,7 @@ out:
         */
        if (lld_is_init(*dentryp))
                ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
-       sa_put(sai, entry);
-       spin_lock(&lli->lli_sa_lock);
-       if (sai->sai_task)
-               wake_up_process(sai->sai_task);
-       spin_unlock(&lli->lli_sa_lock);
+       sa_put(dir, sai, entry);
 
        RETURN(rc);
 }
index c5d6a9e..dbbfd89 100644 (file)
@@ -3997,9 +3997,33 @@ static int lmv_batch_flush(struct obd_export *exp, struct lu_batch *bh,
 static inline struct lmv_tgt_desc *
 lmv_batch_locate_tgt(struct lmv_obd *lmv, struct md_op_item *item)
 {
+       struct md_op_data *op_data = &item->mop_data;
        struct lmv_tgt_desc *tgt;
 
        switch (item->mop_opc) {
+       case MD_OP_GETATTR: {
+               struct lmv_tgt_desc *ptgt;
+
+               if (!fid_is_sane(&op_data->op_fid2))
+                       RETURN(ERR_PTR(-EINVAL));
+
+               ptgt = lmv_locate_tgt(lmv, op_data);
+               if (IS_ERR(ptgt))
+                       RETURN(ptgt);
+
+               tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
+               if (IS_ERR(tgt))
+                       RETURN(tgt);
+
+               /*
+                * A remote object needs two RPCs (lookup, then getattr).
+                * Given that complexity, statahead is not supported for now.
+                */
+               if (tgt != ptgt)
+                       RETURN(ERR_PTR(-EREMOTE));
+
+               break;
+       }
        default:
                tgt = ERR_PTR(-ENOTSUPP);
        }
index e37a91e..d997e42 100644 (file)
 
 #include "mdc_internal.h"
 
+/*
+ * Pack the DLM enqueue part of a batched sub-request.
+ *
+ * Fetches the RMF_DLM_REQ buffer from @pill, installs the MDC glimpse
+ * callback in the item's enqueue info when none is set, builds the
+ * resource name from @fid and hands everything to
+ * ldlm_cli_lock_create_pack().
+ *
+ * \param[in] exp      export passed to ldlm_cli_lock_create_pack()
+ * \param[in] pill     sub-request capsule holding the RMF_DLM_REQ buffer
+ * \param[in] policy   lock policy passed to ldlm_cli_lock_create_pack()
+ * \param[in] fid      FID the lock resource name is built from
+ * \param[in,out] item operation item; supplies the enqueue info and
+ *                     provides storage for the lock flags and lock handle
+ *
+ * \retval -EFAULT     the RMF_DLM_REQ buffer is not available
+ * \retval other       result of ldlm_cli_lock_create_pack()
+ */
+static int mdc_ldlm_lock_pack(struct obd_export *exp,
+                             struct req_capsule *pill,
+                             union ldlm_policy_data *policy,
+                             struct lu_fid *fid, struct md_op_item *item)
+{
+       struct ldlm_request *dlmreq;
+       struct ldlm_res_id res_id;
+       struct ldlm_enqueue_info *einfo = &item->mop_einfo;
+       int rc;
+
+       ENTRY;
+
+       dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
+       /*
+        * Other users of req_capsule_client_get() test the result against
+        * NULL, which IS_ERR() alone does not catch.
+        */
+       if (dlmreq == NULL)
+               RETURN(-EFAULT);
+       if (IS_ERR(dlmreq))
+               RETURN(PTR_ERR(dlmreq));
+
+       /* With Data-on-MDT the glimpse callback is needed too.
+        * It is set here in advance but not in mdc_finish_enqueue()
+        * to avoid possible races. It is safe to have glimpse handler
+        * for non-DOM locks and costs nothing.
+        */
+       if (einfo->ei_cb_gl == NULL)
+               einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
+
+       fid_build_reg_res_name(fid, &res_id);
+       rc = ldlm_cli_lock_create_pack(exp, dlmreq, einfo, &res_id,
+                                      policy, &item->mop_lock_flags,
+                                      NULL, 0, LVB_T_NONE, &item->mop_lockh);
+
+       RETURN(rc);
+}
+
+/*
+ * Pack one getattr intent as a BUT_GETATTR sub-request into @reqmsg.
+ *
+ * The client-side message is sized first (name, optional security xattr
+ * name); if it does not fit, nothing is packed. Otherwise the intent,
+ * the getattr body and the DLM lock request are packed and the reply
+ * field sizes are preset.
+ *
+ * \param[in] head              batch head; its export is used for locking
+ * \param[in] reqmsg            message the sub-request is packed into
+ * \param[in,out] max_pack_size on entry the size limit for this
+ *                              sub-request; set to the client message size
+ *                              on success and when -E2BIG is returned
+ * \param[in,out] item          operation item (intent, op data, lock state)
+ *
+ * \retval 0       sub-request packed
+ * \retval -E2BIG  client message size is not below *max_pack_size
+ * \retval other   non-zero result of mdc_ldlm_lock_pack()
+ */
+static int mdc_batch_getattr_pack(struct batch_update_head *head,
+                                 struct lustre_msg *reqmsg,
+                                 size_t *max_pack_size,
+                                 struct md_op_item *item)
+{
+       struct obd_export *exp = head->buh_exp;
+       struct lookup_intent *it = &item->mop_it;
+       struct md_op_data *op_data = &item->mop_data;
+       u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
+                   OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
+                   OBD_MD_DEFAULT_MEA;
+       union ldlm_policy_data policy = {
+               .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
+       };
+       struct ldlm_intent *lit;
+       bool have_secctx = false;
+       struct req_capsule pill;
+       __u32 easize;
+       __u32 size;
+       int rc;
+
+       ENTRY;
+
+       req_capsule_subreq_init(&pill, &RQF_BUT_GETATTR, NULL,
+                               reqmsg, NULL, RCL_CLIENT);
+
+       /* send name of security xattr to get upon intent */
+       if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
+           req_capsule_has_field(&pill, &RMF_FILE_SECCTX_NAME,
+                                 RCL_CLIENT) &&
+           op_data->op_file_secctx_name_size > 0 &&
+           op_data->op_file_secctx_name != NULL) {
+               have_secctx = true;
+               req_capsule_set_size(&pill, &RMF_FILE_SECCTX_NAME, RCL_CLIENT,
+                                    op_data->op_file_secctx_name_size);
+       }
+
+       req_capsule_set_size(&pill, &RMF_NAME, RCL_CLIENT,
+                            op_data->op_namelen + 1);
+
+       size = req_capsule_msg_size(&pill, RCL_CLIENT);
+       if (unlikely(size >= *max_pack_size)) {
+               /* report the size that was needed back to the caller */
+               *max_pack_size = size;
+               /* leave through RETURN() to pair with ENTRY above */
+               RETURN(-E2BIG);
+       }
+
+       req_capsule_client_pack(&pill);
+       /* pack the intent */
+       lit = req_capsule_client_get(&pill, &RMF_LDLM_INTENT);
+       lit->opc = (__u64)it->it_op;
 
-static md_update_pack_t mdc_update_packers[MD_OP_MAX];
+       easize = MAX_MD_SIZE_OLD; /* obd->u.cli.cl_default_mds_easize; */
 
-static object_update_interpret_t mdc_update_interpreters[MD_OP_MAX];
+       /* pack the intended request */
+       mdc_getattr_pack(&pill, valid, it->it_flags, op_data, easize);
+
+       item->mop_lock_flags |= LDLM_FL_HAS_INTENT;
+       /* exp and op_data alias head->buh_exp and &item->mop_data */
+       rc = mdc_ldlm_lock_pack(exp, &pill, &policy,
+                               &op_data->op_fid1, item);
+       if (rc)
+               RETURN(rc);
+
+       /* preset the sizes of the reply fields */
+       req_capsule_set_size(&pill, &RMF_MDT_MD, RCL_SERVER, easize);
+       req_capsule_set_size(&pill, &RMF_ACL, RCL_SERVER,
+                            LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+       req_capsule_set_size(&pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
+                            sizeof(struct lmv_user_md));
+
+       if (have_secctx) {
+               char *secctx_name;
+
+               secctx_name = req_capsule_client_get(&pill,
+                                                    &RMF_FILE_SECCTX_NAME);
+               memcpy(secctx_name, op_data->op_file_secctx_name,
+                      op_data->op_file_secctx_name_size);
+
+               req_capsule_set_size(&pill, &RMF_FILE_SECCTX,
+                                    RCL_SERVER, easize);
+
+               CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
+                      op_data->op_file_secctx_name_size,
+                      op_data->op_file_secctx_name);
+       } else {
+               req_capsule_set_size(&pill, &RMF_FILE_SECCTX, RCL_SERVER, 0);
+       }
+
+       if (exp_connect_encrypt(exp) && it->it_op & (IT_LOOKUP | IT_GETATTR))
+               req_capsule_set_size(&pill, &RMF_FILE_ENCCTX,
+                                    RCL_SERVER, easize);
+       else
+               req_capsule_set_size(&pill, &RMF_FILE_ENCCTX,
+                                    RCL_SERVER, 0);
+
+       req_capsule_set_replen(&pill);
+       reqmsg->lm_opc = BUT_GETATTR;
+       *max_pack_size = size;
+       RETURN(rc);
+}
+
+static md_update_pack_t mdc_update_packers[MD_OP_MAX] = { /* by opcode */
+       [MD_OP_GETATTR] = mdc_batch_getattr_pack,
+};
+
+/*
+ * Reply handler for one BUT_GETATTR sub-request.
+ *
+ * Binds the item's capsule to the sub-reply @repmsg, finishes the DLM
+ * enqueue and, when that yields zero, the MDC enqueue. The item callback
+ * is always invoked with the resulting code and its return value is
+ * passed back to the caller.
+ */
+static int mdc_batch_getattr_interpret(struct ptlrpc_request *req,
+                                      struct lustre_msg *repmsg,
+                                      struct object_update_callback *ouc,
+                                      int rc)
+{
+       struct md_op_item *mop = (struct md_op_item *)ouc->ouc_data;
+       struct batch_update_head *buh = ouc->ouc_head;
+       struct obd_export *exp = buh->buh_exp;
+       struct req_capsule *subpill = mop->mop_pill;
+       struct lustre_handle *lockh = &mop->mop_lockh;
+
+       req_capsule_subreq_init(subpill, &RQF_BUT_GETATTR, req,
+                               NULL, repmsg, RCL_CLIENT);
+
+       rc = ldlm_cli_enqueue_fini(exp, subpill, &mop->mop_einfo, 1,
+                                  &mop->mop_lock_flags, NULL, 0, lockh,
+                                  rc, false);
+       if (rc != 0)
+               GOTO(done, rc);
+
+       rc = mdc_finish_enqueue(exp, subpill, &mop->mop_einfo, &mop->mop_it,
+                               lockh, rc);
+done:
+       return mop->mop_cb(mop, rc);
+}
+
+object_update_interpret_t mdc_update_interpreters[MD_OP_MAX] = { /* by opcode */
+       [MD_OP_GETATTR] = mdc_batch_getattr_interpret,
+};
 
 int mdc_batch_add(struct obd_export *exp, struct lu_batch *bh,
                  struct md_op_item *item)
@@ -59,6 +218,11 @@ int mdc_batch_add(struct obd_export *exp, struct lu_batch *bh,
                RETURN(-EFAULT);
        }
 
+       OBD_ALLOC_PTR(item->mop_pill);
+       if (item->mop_pill == NULL)
+               RETURN(-ENOMEM);
+
+       item->mop_subpill_allocated = 1;
        RETURN(cli_batch_add(exp, bh, item, mdc_update_packers[opc],
                             mdc_update_interpreters[opc]));
 }
index 956ff9d..e96e49f 100644 (file)
@@ -681,8 +681,8 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
 
        /* Complete obtaining the lock procedure. */
-       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
-                                  aa->oa_lvb, aa->oa_lvb ?
+       rc = ldlm_cli_enqueue_fini(aa->oa_exp, &req->rq_pill, &einfo, 1,
+                                  aa->oa_flags, aa->oa_lvb, aa->oa_lvb ?
                                   sizeof(*aa->oa_lvb) : 0, lockh, rc, true);
        /* Complete mdc stuff. */
        rc = mdc_enqueue_fini(aa->oa_exp, req, aa->oa_upcall, aa->oa_cookie,
index 6925741..8d71d1b 100644 (file)
@@ -193,6 +193,12 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
 int mdc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data);
 int mdc_fill_lvb(struct req_capsule *pill, struct ost_lvb *lvb);
 
+int mdc_finish_enqueue(struct obd_export *exp,
+                      struct req_capsule *pill,
+                      struct ldlm_enqueue_info *einfo,
+                      struct lookup_intent *it,
+                      struct lustre_handle *lockh, int rc);
+
 /* the minimum inline repsize should be PAGE_SIZE at least */
 #define MDC_DOM_DEF_INLINE_REPSIZE max(8192UL, PAGE_SIZE)
 #define MDC_DOM_MAX_INLINE_REPSIZE XATTR_SIZE_MAX
index 994785e..cce8225 100644 (file)
@@ -673,13 +673,13 @@ static struct ptlrpc_request *mdc_enqueue_pack(struct obd_export *exp,
        RETURN(req);
 }
 
-static int mdc_finish_enqueue(struct obd_export *exp,
-                             struct ptlrpc_request *req,
-                             struct ldlm_enqueue_info *einfo,
-                             struct lookup_intent *it,
-                             struct lustre_handle *lockh, int rc)
+int mdc_finish_enqueue(struct obd_export *exp,
+                      struct req_capsule *pill,
+                      struct ldlm_enqueue_info *einfo,
+                      struct lookup_intent *it,
+                      struct lustre_handle *lockh, int rc)
 {
-       struct req_capsule *pill = &req->rq_pill;
+       struct ptlrpc_request *req = pill->rc_req;
        struct ldlm_request *lockreq;
        struct ldlm_reply *lockrep;
        struct ldlm_lock *lock;
@@ -1074,7 +1074,7 @@ resend:
                goto resend;
        }
 
-       rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+       rc = mdc_finish_enqueue(exp, &req->rq_pill, einfo, it, lockh, rc);
        if (rc < 0) {
                if (lustre_handle_is_used(lockh)) {
                        ldlm_lock_decref(lockh, einfo->ei_mode);
@@ -1377,6 +1377,7 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
        struct ldlm_enqueue_info *einfo = &item->mop_einfo;
        struct lookup_intent *it = &item->mop_it;
        struct lustre_handle *lockh = &item->mop_lockh;
+       struct req_capsule *pill = &req->rq_pill;
        struct ldlm_reply *lockrep;
        __u64 flags = LDLM_FL_HAS_INTENT;
 
@@ -1384,7 +1385,7 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
                rc = -ETIMEDOUT;
 
-       rc = ldlm_cli_enqueue_fini(exp, req, einfo, 1, &flags, NULL, 0,
+       rc = ldlm_cli_enqueue_fini(exp, pill, einfo, 1, &flags, NULL, 0,
                                   lockh, rc, true);
        if (rc < 0) {
                CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
@@ -1393,13 +1394,13 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
                GOTO(out, rc);
        }
 
-       lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+       lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);
 
        lockrep->lock_policy_res2 =
                ptlrpc_status_ntoh(lockrep->lock_policy_res2);
 
-       rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
+       rc = mdc_finish_enqueue(exp, pill, einfo, it, lockh, rc);
        if (rc)
                GOTO(out, rc);
 
@@ -1407,7 +1408,7 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
        EXIT;
 
 out:
-       item->mop_pill = &req->rq_pill;
+       item->mop_pill = pill;
        item->mop_cb(item, rc);
        return 0;
 }
index 5e6049b..3eb15a1 100644 (file)
 #include <lustre_mds.h>
 #include "mdt_internal.h"
 
+static struct ldlm_callback_suite mdt_dlm_cbs = { /* for batched enqueue */
+       .lcs_completion = ldlm_server_completion_ast,
+       .lcs_blocking   = tgt_blocking_ast,
+       .lcs_glimpse    = ldlm_server_glimpse_ast
+};
+
 static int mdt_batch_unpack(struct mdt_thread_info *info, __u32 opc)
 {
        int rc = 0;
 
        switch (opc) {
+       case BUT_GETATTR:
+               info->mti_dlm_req = req_capsule_client_get(info->mti_pill,
+                                                          &RMF_DLM_REQ);
+               if (info->mti_dlm_req == NULL)
+                       RETURN(-EFAULT);
+               break;
        default:
                rc = -EOPNOTSUPP;
                CERROR("%s: Unexpected opcode %d: rc = %d\n",
@@ -80,6 +92,20 @@ static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc)
        RETURN(rc);
 }
 
+/*
+ * Handle a BUT_GETATTR sub-request by running the DLM enqueue on the
+ * thread's sub-request capsule with the DLM request stored in the thread
+ * info, using the mdt_dlm_cbs callback suite.
+ *
+ * Returns whatever ldlm_handle_enqueue() returns.
+ */
+static int mdt_batch_getattr(struct tgt_session_info *tsi)
+{
+       struct mdt_thread_info *mti = mdt_th_info(tsi->tsi_env);
+       int rc;
+
+       ENTRY;
+
+       rc = ldlm_handle_enqueue(mti->mti_exp->exp_obd->obd_namespace,
+                                &mti->mti_sub_pill, mti->mti_dlm_req,
+                                &mdt_dlm_cbs);
+
+       RETURN(rc);
+}
+
 /* Batch UpdaTe Request with a format known in advance */
 #define TGT_BUT_HDL(flags, opc, fn)                    \
 [opc - BUT_FIRST_OPC] = {                              \
@@ -93,7 +119,9 @@ static int mdt_batch_reconstruct(struct tgt_session_info *tsi, long opc)
        .th_hp          = NULL,                         \
 }
 
-static struct tgt_handler mdt_batch_handlers[BUT_LAST_OPC];
+static struct tgt_handler mdt_batch_handlers[] = {
+TGT_BUT_HDL(HAS_KEY | HAS_REPLY,       BUT_GETATTR,    mdt_batch_getattr),
+};
 
 static struct tgt_handler *mdt_batch_handler_find(__u32 opc)
 {
index 3fe7a60..6d01604 100644 (file)
@@ -2930,7 +2930,9 @@ static void mdt_preset_secctx_size(struct mdt_thread_info *info)
                        /* pre-set size in server part with max size */
                        req_capsule_set_size(pill, &RMF_FILE_SECCTX,
                                             RCL_SERVER,
-                                            OBD_MAX_DEFAULT_EA_SIZE);
+                                            req_capsule_ptlreq(pill) ?
+                                            OBD_MAX_DEFAULT_EA_SIZE :
+                                            MAX_MD_SIZE_OLD);
                else
                        req_capsule_set_size(pill, &RMF_FILE_SECCTX,
                                             RCL_SERVER, 0);
@@ -4229,7 +4231,8 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
                /* Pack reply. */
                if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
-                                            DEF_REP_MD_SIZE);
+                                            req_capsule_ptlreq(pill) ?
+                                            DEF_REP_MD_SIZE : MAX_MD_SIZE_OLD);
 
                if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_LOGCOOKIES,
@@ -4953,7 +4956,8 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
                RETURN(-EPROTO);
        }
 
-       req_capsule_extend(pill, it_format);
+       if (!info->mti_batch_env)
+               req_capsule_extend(pill, it_format);
 
        rc = mdt_unpack_req_pack_rep(info, it_handler_flags);
        if (rc < 0)
@@ -4971,7 +4975,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
        rc = (*it_handler)(it_opc, info, lockp, flags);
 
        /* Check whether the reply has been packed successfully. */
-       if (req->rq_repmsg != NULL) {
+       if (info->mti_batch_env || req->rq_repmsg != NULL) {
                rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
                rep->lock_policy_res2 =
                        ptlrpc_status_hton(rep->lock_policy_res2);
@@ -5013,14 +5017,30 @@ static int mdt_intent_policy(const struct lu_env *env,
 
        tsi = tgt_ses_info(env);
 
-       info = tsi2mdt_info(tsi);
+       info = mdt_th_info(env);
        LASSERT(info != NULL);
-       pill = info->mti_pill;
+
+       /* Check whether it is a sub request processing in a batch request */
+       if (info->mti_batch_env) {
+               pill = info->mti_pill;
+               LASSERT(pill == &info->mti_sub_pill);
+       } else {
+               info = tsi2mdt_info(tsi);
+               pill = info->mti_pill;
+       }
+
        LASSERT(pill->rc_req == req);
        ldesc = &info->mti_dlm_req->lock_desc;
 
-       if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
-               req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC);
+       if (info->mti_batch_env ||
+           req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
+               /*
+                * For batch processing environment, the request format has
+                * already been set.
+                */
+               if (!info->mti_batch_env)
+                       req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC);
+
                it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
                if (it != NULL) {
                        mdt_ptlrpc_stats_update(req, it->opc);
@@ -5063,7 +5083,8 @@ static int mdt_intent_policy(const struct lu_env *env,
                        rc = err_serious(rc);
        }
 
-       mdt_thread_info_fini(info);
+       if (!info->mti_batch_env)
+               mdt_thread_info_fini(info);
        RETURN(rc);
 }
 
index b147ae1..4201b4d 100644 (file)
@@ -831,6 +831,14 @@ int mdt_fix_reply(struct mdt_thread_info *info)
                 LASSERT(md_size > md_packed);
                 CDEBUG(D_INFO, "Enlarge reply buffer, need extra %d bytes\n",
                        md_size - md_packed);
+
+               /* FIXME: Grow reply buffer for the batch request. */
+               if (info->mti_batch_env) {
+                       body->mbo_valid &= ~(OBD_MD_FLDIREA | OBD_MD_FLEASIZE);
+                       info->mti_big_lmm_used = 0;
+                       GOTO(check_acl, rc);
+               }
+
                 rc = req_capsule_server_grow(pill, &RMF_MDT_MD, md_size);
                 if (rc) {
                         /* we can't answer with proper LOV EA, drop flags,
@@ -864,10 +872,17 @@ int mdt_fix_reply(struct mdt_thread_info *info)
                info->mti_big_lmm_used = 0;
        }
 
+check_acl:
        if (info->mti_big_acl_used) {
                CDEBUG(D_INFO, "Enlarge reply ACL buffer to %d bytes\n",
                       acl_size);
 
+               if (info->mti_batch_env) {
+                       body->mbo_valid &= ~OBD_MD_FLACL;
+                       info->mti_big_acl_used = 0;
+                       RETURN(rc);
+               }
+
                rc = req_capsule_server_grow(pill, &RMF_ACL, acl_size);
                if (rc) {
                        body->mbo_valid &= ~OBD_MD_FLACL;
index 4b96bec..1a8cf65 100644 (file)
@@ -2976,8 +2976,9 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
        }
 
        /* Complete obtaining the lock procedure. */
-       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
-                                  lvb, lvb_len, lockh, rc, false);
+       rc = ldlm_cli_enqueue_fini(aa->oa_exp, &req->rq_pill, &einfo, 1,
+                                  aa->oa_flags, lvb, lvb_len, lockh, rc,
+                                  false);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_speculative, rc);
index bbd9d8b..ff6ff62 100644 (file)
@@ -766,6 +766,26 @@ static const struct req_msg_field *obd_lfsck_reply[] = {
        &RMF_LFSCK_REPLY,
 };
 
+static const struct req_msg_field *mds_batch_getattr_client[] = { /* request */
+       &RMF_DLM_REQ,
+       &RMF_LDLM_INTENT,
+       &RMF_MDT_BODY,     /* coincides with mds_getattr_name_client[] */
+       &RMF_CAPA1,
+       &RMF_NAME,
+       &RMF_FILE_SECCTX_NAME
+};
+
+static const struct req_msg_field *mds_batch_getattr_server[] = { /* reply */
+       &RMF_DLM_REP,
+       &RMF_MDT_BODY,
+       &RMF_MDT_MD,
+       &RMF_ACL,
+       &RMF_CAPA1,
+       &RMF_FILE_SECCTX,
+       &RMF_DEFAULT_MDT_MD,
+       &RMF_FILE_ENCCTX,
+};
+
 static struct req_format *req_formats[] = {
        &RQF_OBD_PING,
        &RQF_OBD_SET_INFO,
@@ -865,6 +885,7 @@ static struct req_format *req_formats[] = {
        &RQF_CONNECT,
        &RQF_LFSCK_NOTIFY,
        &RQF_LFSCK_QUERY,
+       &RQF_BUT_GETATTR,
        &RQF_MDS_BATCH,
 };
 
@@ -1806,6 +1827,11 @@ struct req_format RQF_OST_LADVISE =
        DEFINE_REQ_FMT0("OST_LADVISE", ost_ladvise, ost_body_only);
 EXPORT_SYMBOL(RQF_OST_LADVISE);
 
+struct req_format RQF_BUT_GETATTR = /* batched getattr sub-request format */
+       DEFINE_REQ_FMT0("MDS_BATCH_GETATTR", mds_batch_getattr_client,
+                       mds_batch_getattr_server);
+EXPORT_SYMBOL(RQF_BUT_GETATTR);
+
 /* Convenience macro */
 #define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)]
 
index de5726d..26c308a 100644 (file)
@@ -171,7 +171,7 @@ static int qsd_intent_interpret(const struct lu_env *env,
        req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
        req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
 
-       rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, &einfo, 0, &flags,
+       rc = ldlm_cli_enqueue_fini(aa->aa_exp, &req->rq_pill, &einfo, 0, &flags,
                                   aa->aa_lvb, sizeof(*(aa->aa_lvb)),
                                   lockh, rc, false);
        if (rc < 0) {
index eed6089..012cc78 100644 (file)
@@ -1357,8 +1357,8 @@ EXPORT_SYMBOL(tgt_sync);
  * \retval     0 on success
  * \retval     negative number on error
  */
-static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                           void *data, int flag)
+int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                    void *data, int flag)
 {
        struct lu_env            env;
        struct lu_target        *tgt;
@@ -1423,6 +1423,7 @@ err:
        rc = ldlm_server_blocking_ast(lock, desc, data, flag);
        RETURN(rc);
 }
+EXPORT_SYMBOL(tgt_blocking_ast);
 
 static struct ldlm_callback_suite tgt_dlm_cbs = {
        .lcs_completion = ldlm_server_completion_ast,
@@ -1441,8 +1442,8 @@ int tgt_enqueue(struct tgt_session_info *tsi)
         * tsi->tsi_dlm_cbs was set by the *_req_handle() function.
         */
        LASSERT(tsi->tsi_dlm_req != NULL);
-       rc = ldlm_handle_enqueue0(tsi->tsi_exp->exp_obd->obd_namespace, req,
-                                 tsi->tsi_dlm_req, &tgt_dlm_cbs);
+       rc = ldlm_handle_enqueue(tsi->tsi_exp->exp_obd->obd_namespace,
+                                &req->rq_pill, tsi->tsi_dlm_req, &tgt_dlm_cbs);
        if (rc)
                RETURN(err_serious(rc));