Whamcloud - gitweb
b=18631
authornicolas.williams <nicolas.williams>
Thu, 5 Nov 2009 20:56:12 +0000 (20:56 +0000)
committernicolas.williams <nicolas.williams>
Thu, 5 Nov 2009 20:56:12 +0000 (20:56 +0000)
i=robert.read@sun.com
i=tom.wang@sun.com

28 files changed:
lustre/ChangeLog
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_capa.h
lustre/include/lustre_debug.h
lustre/include/lustre_dlm.h
lustre/include/lustre_net.h
lustre/include/lustre_req_layout.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/liblustre/super.c
lustre/mdc/mdc_locks.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_reint.c
lustre/mgc/mgc_request.c
lustre/mgs/mgs_handler.c
lustre/obdclass/debug.c
lustre/obdecho/echo.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_lvb.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpc_module.c
lustre/utils/req-layout.c

index f1fff09..da9379b 100644 (file)
@@ -14,6 +14,12 @@ tbd  Sun Microsystems, Inc.
        * RHEL 4 and RHEL 5/SLES 10 clients behaves differently on 'cd' to a
         removed cwd "./" (refer to Bugzilla 14399).
 
        * RHEL 4 and RHEL 5/SLES 10 clients behaves differently on 'cd' to a
         removed cwd "./" (refer to Bugzilla 14399).
 
+Severity   : normal$
+Bugzilla   : 18631
+Description: Unify req format on client/servers
+Details    : Use new req_capsule API [almost] everywhere instead of old PTLRPC
+             buffers and swabbers approach..
+
 Severity   : normal
 Frequency  : cleanup
 Bugzilla   : 19200
 Severity   : normal
 Frequency  : cleanup
 Bugzilla   : 19200
index 5e205b8..7f76ca0 100644 (file)
@@ -2465,6 +2465,13 @@ extern void lustre_swab_llog_rec(struct llog_rec_hdr  *rec,
 struct lustre_cfg;
 extern void lustre_swab_lustre_cfg(struct lustre_cfg *lcfg);
 
 struct lustre_cfg;
 extern void lustre_swab_lustre_cfg(struct lustre_cfg *lcfg);
 
+/* Functions for dumping PTLRPC fields */
+void dump_rniobuf(struct niobuf_remote *rnb);
+void dump_ioo(struct obd_ioobj *nb);
+void dump_obdo(struct obdo *oa);
+void dump_ost_body(struct ost_body *ob);
+void dump_rcs(__u32 *rc);
+
 /* this will be used when OBD_CONNECT_CHANGE_QS is set */
 struct qunit_data {
         /**
 /* this will be used when OBD_CONNECT_CHANGE_QS is set */
 struct qunit_data {
         /**
@@ -2503,7 +2510,7 @@ struct qunit_data {
 #define QDATA_CLR_CHANGE_QS(qdata)  ((qdata)->qd_flags &= ~LQUOTA_FLAGS_CHG_QS)
 
 extern void lustre_swab_qdata(struct qunit_data *d);
 #define QDATA_CLR_CHANGE_QS(qdata)  ((qdata)->qd_flags &= ~LQUOTA_FLAGS_CHG_QS)
 
 extern void lustre_swab_qdata(struct qunit_data *d);
-extern struct qunit_data *quota_get_qdata(void*req, int is_req, int is_exp);
+extern struct qunit_data *quota_get_qdata(void *req, int is_req, int is_exp);
 extern int quota_copy_qdata(void *request, struct qunit_data *qdata,
                             int is_req, int is_exp);
 
 extern int quota_copy_qdata(void *request, struct qunit_data *qdata,
                             int is_req, int is_exp);
 
index 9b9b21b..44a06b7 100644 (file)
@@ -284,22 +284,6 @@ static inline int capa_opc_supported(struct lustre_capa *capa, __u64 opc)
         return (capa_opc(capa) & opc) == opc;
 }
 
         return (capa_opc(capa) & opc) == opc;
 }
 
-static inline struct lustre_capa *
-lustre_unpack_incoming_capa(struct ptlrpc_request *req, unsigned int offset)
-{
-        struct lustre_capa *capa;
-
-        capa = lustre_swab_reqbuf(req, offset, sizeof(*capa),
-                                  lustre_swab_lustre_capa);
-        if (capa == NULL)
-                CERROR("bufcount %u, bufsize %u\n",
-                       lustre_msg_bufcount(req->rq_reqmsg),
-                       (lustre_msg_bufcount(req->rq_reqmsg) <= offset) ?
-                                -1 : lustre_msg_buflen(req->rq_reqmsg, offset));
-
-        return capa;
-}
-
 struct filter_capa_key {
         struct list_head        k_list;
         struct lustre_capa_key  k_key;
 struct filter_capa_key {
         struct list_head        k_list;
         struct lustre_capa_key  k_key;
index 5ded0ff..73c22bb 100644 (file)
@@ -38,6 +38,7 @@
 #define _LUSTRE_DEBUG_H
 
 #include <lustre_net.h>
 #define _LUSTRE_DEBUG_H
 
 #include <lustre_net.h>
+#include <obd.h>
 
 #if defined(__linux__)
 #include <linux/lustre_debug.h>
 
 #if defined(__linux__)
 #include <linux/lustre_debug.h>
@@ -67,11 +68,8 @@ do { if (offset > ASSERT_MAX_SIZE_MB << 20) {                           \
 }} while(0)
 
 /* lib/debug.c */
 }} while(0)
 
 /* lib/debug.c */
-int dump_lniobuf(struct niobuf_local *lnb);
-int dump_rniobuf(struct niobuf_remote *rnb);
-int dump_ioo(struct obd_ioobj *nb);
+void dump_lniobuf(struct niobuf_local *lnb);
 int dump_req(struct ptlrpc_request *req);
 int dump_req(struct ptlrpc_request *req);
-int dump_obdo(struct obdo *oa);
 void dump_lsm(int level, struct lov_stripe_md *lsm);
 int block_debug_setup(void *addr, int len, __u64 off, __u64 id);
 int block_debug_check(char *who, void *addr, int len, __u64 off, __u64 id);
 void dump_lsm(int level, struct lov_stripe_md *lsm);
 int block_debug_setup(void *addr, int len, __u64 off, __u64 id);
 int block_debug_check(char *who, void *addr, int len, __u64 off, __u64 id);
index c83a841..710f921 100644 (file)
@@ -368,7 +368,7 @@ struct ldlm_valblock_ops {
         int (*lvbo_init)(struct ldlm_resource *res);
         int (*lvbo_update)(struct ldlm_resource *res,
                            struct ptlrpc_request *r,
         int (*lvbo_init)(struct ldlm_resource *res);
         int (*lvbo_update)(struct ldlm_resource *res,
                            struct ptlrpc_request *r,
-                           int buf_idx, int increase);
+                           int increase);
 };
 
 typedef enum {
 };
 
 typedef enum {
@@ -667,7 +667,6 @@ struct ldlm_lock {
          */
         __u32                 l_lvb_len;
         void                 *l_lvb_data;
          */
         __u32                 l_lvb_len;
         void                 *l_lvb_data;
-        void                 *l_lvb_swabber;
 
         void                 *l_ast_data;
         spinlock_t            l_extents_list_lock;
 
         void                 *l_ast_data;
         spinlock_t            l_extents_list_lock;
@@ -915,12 +914,11 @@ ldlm_handle2lock_long(const struct lustre_handle *h, int flags)
 }
 
 static inline int ldlm_res_lvbo_update(struct ldlm_resource *res,
 }
 
 static inline int ldlm_res_lvbo_update(struct ldlm_resource *res,
-                                       struct ptlrpc_request *r, int buf_idx,
-                                       int increase)
+                                       struct ptlrpc_request *r, int increase)
 {
         if (res->lr_namespace->ns_lvbo &&
             res->lr_namespace->ns_lvbo->lvbo_update) {
 {
         if (res->lr_namespace->ns_lvbo &&
             res->lr_namespace->ns_lvbo->lvbo_update) {
-                return res->lr_namespace->ns_lvbo->lvbo_update(res, r, buf_idx,
+                return res->lr_namespace->ns_lvbo->lvbo_update(res, r,
                                                                increase);
         }
         return 0;
                                                                increase);
         }
         return 0;
@@ -1064,8 +1062,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                      struct ldlm_enqueue_info *einfo,
                      const struct ldlm_res_id *res_id,
                      ldlm_policy_data_t *policy, int *flags,
                      struct ldlm_enqueue_info *einfo,
                      const struct ldlm_res_id *res_id,
                      ldlm_policy_data_t *policy, int *flags,
-                     void *lvb, __u32 lvb_len, void *lvb_swabber,
-                     struct lustre_handle *lockh, int async);
+                     void *lvb, __u32 lvb_len, struct lustre_handle *lockh,
+                     int async);
 int ldlm_prep_enqueue_req(struct obd_export *exp,
                           struct ptlrpc_request *req,
                           struct list_head *cancels,
 int ldlm_prep_enqueue_req(struct obd_export *exp,
                           struct ptlrpc_request *req,
                           struct list_head *cancels,
@@ -1080,8 +1078,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                           ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
                           int *flags, void *lvb, __u32 lvb_len,
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                           ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
                           int *flags, void *lvb, __u32 lvb_len,
-                          void *lvb_swabber, struct lustre_handle *lockh,
-                          int rc);
+                          struct lustre_handle *lockh, int rc);
 int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                            const struct ldlm_res_id *res_id,
                            ldlm_type_t type, ldlm_policy_data_t *policy,
 int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                            const struct ldlm_res_id *res_id,
                            ldlm_type_t type, ldlm_policy_data_t *policy,
@@ -1089,7 +1086,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                            ldlm_blocking_callback blocking,
                            ldlm_completion_callback completion,
                            ldlm_glimpse_callback glimpse,
                            ldlm_blocking_callback blocking,
                            ldlm_completion_callback completion,
                            ldlm_glimpse_callback glimpse,
-                           void *data, __u32 lvb_len, void *lvb_swabber,
+                           void *data, __u32 lvb_len,
                            const __u64 *client_cookie,
                            struct lustre_handle *lockh);
 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
                            const __u64 *client_cookie,
                            struct lustre_handle *lockh);
 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
index 2433ea8..af283c5 100644 (file)
@@ -415,7 +415,8 @@ struct ptlrpc_request {
                                  rq_pack_udesc:1,
                                  rq_pack_bulk:1,
                                  /* doesn't expect reply FIXME */
                                  rq_pack_udesc:1,
                                  rq_pack_bulk:1,
                                  /* doesn't expect reply FIXME */
-                                 rq_no_reply:1;
+                                 rq_no_reply:1,
+                                 rq_pill_init:1;     /* pill initialized */
 
         uid_t                    rq_auth_uid;        /* authed uid */
         uid_t                    rq_auth_mapped_uid; /* authed uid mapped to */
 
         uid_t                    rq_auth_uid;        /* authed uid */
         uid_t                    rq_auth_mapped_uid; /* authed uid mapped to */
@@ -1133,10 +1134,6 @@ int lustre_msg_buflen(struct lustre_msg *m, int n);
 void lustre_msg_set_buflen(struct lustre_msg *m, int n, int len);
 int lustre_msg_bufcount(struct lustre_msg *m);
 char *lustre_msg_string (struct lustre_msg *m, int n, int max_len);
 void lustre_msg_set_buflen(struct lustre_msg *m, int n, int len);
 int lustre_msg_bufcount(struct lustre_msg *m);
 char *lustre_msg_string (struct lustre_msg *m, int n, int max_len);
-void *lustre_swab_reqbuf(struct ptlrpc_request *req, int n, int minlen,
-                         void *swabber);
-void *lustre_swab_repbuf(struct ptlrpc_request *req, int n, int minlen,
-                         void *swabber);
 __u32 lustre_msghdr_get_flags(struct lustre_msg *msg);
 void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags);
 __u32 lustre_msg_get_flags(struct lustre_msg *msg);
 __u32 lustre_msghdr_get_flags(struct lustre_msg *msg);
 void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags);
 __u32 lustre_msg_get_flags(struct lustre_msg *msg);
index 8c650e7..0cd88b4 100644 (file)
@@ -75,6 +75,8 @@ void req_capsule_init(struct req_capsule *pill, struct ptlrpc_request *req,
 void req_capsule_fini(struct req_capsule *pill);
 
 void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt);
 void req_capsule_fini(struct req_capsule *pill);
 
 void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt);
+void req_capsule_client_dump(struct req_capsule *pill);
+void req_capsule_server_dump(struct req_capsule *pill);
 void req_capsule_init_area(struct req_capsule *pill);
 int req_capsule_filled_sizes(struct req_capsule *pill, enum req_location loc);
 int  req_capsule_server_pack(struct req_capsule *pill);
 void req_capsule_init_area(struct req_capsule *pill);
 int req_capsule_filled_sizes(struct req_capsule *pill, enum req_location loc);
 int  req_capsule_server_pack(struct req_capsule *pill);
@@ -271,6 +273,7 @@ extern const struct req_msg_field RMF_OST_BODY;
 extern const struct req_msg_field RMF_OBD_IOOBJ;
 extern const struct req_msg_field RMF_OBD_ID;
 extern const struct req_msg_field RMF_NIOBUF_REMOTE;
 extern const struct req_msg_field RMF_OBD_IOOBJ;
 extern const struct req_msg_field RMF_OBD_ID;
 extern const struct req_msg_field RMF_NIOBUF_REMOTE;
+extern const struct req_msg_field RMF_RCS;
 extern const struct req_msg_field RMF_FIEMAP_KEY;
 extern const struct req_msg_field RMF_FIEMAP_VAL;
 
 extern const struct req_msg_field RMF_FIEMAP_KEY;
 extern const struct req_msg_field RMF_FIEMAP_VAL;
 
index dd57661..eda190b 100644 (file)
@@ -1673,7 +1673,7 @@ void ldlm_cancel_locks_for_export_cb(void *obj, void *data)
         LDLM_LOCK_GET(lock);
 
         LDLM_DEBUG(lock, "export %p", exp);
         LDLM_LOCK_GET(lock);
 
         LDLM_DEBUG(lock, "export %p", exp);
-        ldlm_res_lvbo_update(res, NULL, 0, 1);
+        ldlm_res_lvbo_update(res, NULL, 1);
         ldlm_lock_cancel(lock);
         ldlm_reprocess_all(res);
         ldlm_resource_putref(res);
         ldlm_lock_cancel(lock);
         ldlm_reprocess_all(res);
         ldlm_resource_putref(res);
index c59fd3e..a3fc916 100644 (file)
@@ -637,8 +637,7 @@ static int ldlm_cb_interpret(const struct lu_env *env,
                  * been received yet, we need to update lvbo to have the
                  * proper attributes cached. */
                 if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
                  * been received yet, we need to update lvbo to have the
                  * proper attributes cached. */
                 if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
-                        ldlm_res_lvbo_update(lock->l_resource, NULL,
-                                             0, 1);
+                        ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
                 rc = ldlm_handle_ast_error(lock, req, rc,
                                            arg->type == LDLM_BL_CALLBACK
                                            ? "blocking" : "completion");
                 rc = ldlm_handle_ast_error(lock, req, rc,
                                            arg->type == LDLM_BL_CALLBACK
                                            ? "blocking" : "completion");
@@ -949,7 +948,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         else if (rc != 0)
                 rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
         else
         else if (rc != 0)
                 rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
         else
-                rc = ldlm_res_lvbo_update(res, req, REPLY_REC_OFF, 1);
+                rc = ldlm_res_lvbo_update(res, req, 1);
 
         ptlrpc_req_finished(req);
         if (rc == -ERESTART)
 
         ptlrpc_req_finished(req);
         if (rc == -ERESTART)
@@ -1391,7 +1390,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                         if (res != NULL) {
                                 ldlm_resource_getref(res);
                                 LDLM_RESOURCE_ADDREF(res);
                         if (res != NULL) {
                                 ldlm_resource_getref(res);
                                 LDLM_RESOURCE_ADDREF(res);
-                                ldlm_res_lvbo_update(res, NULL, 0, 1);
+                                ldlm_res_lvbo_update(res, NULL, 1);
                         }
                         pres = res;
                 }
                         }
                         pres = res;
                 }
@@ -1544,9 +1543,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                         LDLM_ERROR(lock, "completion AST did not contain "
                                    "expected LVB!");
                 } else {
                         LDLM_ERROR(lock, "completion AST did not contain "
                                    "expected LVB!");
                 } else {
-                        void *lvb = req_capsule_client_swab_get(&req->rq_pill,
-                                                                &RMF_DLM_LVB,
-                                                  (void *)lock->l_lvb_swabber);
+                        void *lvb = req_capsule_client_get(&req->rq_pill,
+                                                           &RMF_DLM_LVB);
                         memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
                 }
         }
                         memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
                 }
         }
index 1eaa0e1..5402c50 100644 (file)
@@ -377,7 +377,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                            ldlm_blocking_callback blocking,
                            ldlm_completion_callback completion,
                            ldlm_glimpse_callback glimpse,
                            ldlm_blocking_callback blocking,
                            ldlm_completion_callback completion,
                            ldlm_glimpse_callback glimpse,
-                           void *data, __u32 lvb_len, void *lvb_swabber,
+                           void *data, __u32 lvb_len,
                            const __u64 *client_cookie,
                            struct lustre_handle *lockh)
 {
                            const __u64 *client_cookie,
                            struct lustre_handle *lockh)
 {
@@ -406,7 +406,6 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         lock->l_flags |= LDLM_FL_LOCAL;
         if (*flags & LDLM_FL_ATOMIC_CB)
                 lock->l_flags |= LDLM_FL_ATOMIC_CB;
         lock->l_flags |= LDLM_FL_LOCAL;
         if (*flags & LDLM_FL_ATOMIC_CB)
                 lock->l_flags |= LDLM_FL_ATOMIC_CB;
-        lock->l_lvb_swabber = lvb_swabber;
         unlock_res_and_lock(lock);
         if (policy != NULL)
                 lock->l_policy_data = *policy;
         unlock_res_and_lock(lock);
         if (policy != NULL)
                 lock->l_policy_data = *policy;
@@ -476,7 +475,7 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                           ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
                           int *flags, void *lvb, __u32 lvb_len,
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                           ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
                           int *flags, void *lvb, __u32 lvb_len,
-                          void *lvb_swabber, struct lustre_handle *lockh,int rc)
+                          struct lustre_handle *lockh,int rc)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         int is_replay = *flags & LDLM_FL_REPLAY;
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         int is_replay = *flags & LDLM_FL_REPLAY;
@@ -509,9 +508,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                                 req_capsule_set_size(&req->rq_pill,
                                                      &RMF_DLM_LVB, RCL_SERVER,
                                                      lvb_len);
                                 req_capsule_set_size(&req->rq_pill,
                                                      &RMF_DLM_LVB, RCL_SERVER,
                                                      lvb_len);
-                            tmplvb = req_capsule_server_swab_get(&req->rq_pill,
-                                                                 &RMF_DLM_LVB,
-                                                                 lvb_swabber);
+                            tmplvb = req_capsule_server_get(&req->rq_pill,
+                                                                 &RMF_DLM_LVB);
                                 if (tmplvb == NULL)
                                         GOTO(cleanup, rc = -EPROTO);
                                 if (lvb != NULL)
                                 if (tmplvb == NULL)
                                         GOTO(cleanup, rc = -EPROTO);
                                 if (lvb != NULL)
@@ -606,9 +604,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                      lvb_len);
 
                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                      lvb_len);
-                tmplvb = req_capsule_server_swab_get(&req->rq_pill,
-                                                     &RMF_DLM_LVB,
-                                                     lvb_swabber);
+                tmplvb = req_capsule_server_get(&req->rq_pill,
+                                                     &RMF_DLM_LVB);
                 if (tmplvb == NULL)
                         GOTO(cleanup, rc = -EPROTO);
                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
                 if (tmplvb == NULL)
                         GOTO(cleanup, rc = -EPROTO);
                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
@@ -758,8 +755,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                      struct ldlm_enqueue_info *einfo,
                      const struct ldlm_res_id *res_id,
                      ldlm_policy_data_t *policy, int *flags,
                      struct ldlm_enqueue_info *einfo,
                      const struct ldlm_res_id *res_id,
                      ldlm_policy_data_t *policy, int *flags,
-                     void *lvb, __u32 lvb_len, void *lvb_swabber,
-                     struct lustre_handle *lockh, int async)
+                     void *lvb, __u32 lvb_len, struct lustre_handle *lockh,
+                     int async)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         struct ldlm_lock      *lock;
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         struct ldlm_lock      *lock;
@@ -794,7 +791,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                 /* for the local lock, add the reference */
                 ldlm_lock_addref_internal(lock, einfo->ei_mode);
                 ldlm_lock2handle(lock, lockh);
                 /* for the local lock, add the reference */
                 ldlm_lock_addref_internal(lock, einfo->ei_mode);
                 ldlm_lock2handle(lock, lockh);
-                lock->l_lvb_swabber = lvb_swabber;
                 if (policy != NULL) {
                         /* INODEBITS_INTEROP: If the server does not support
                          * inodebits, we will request a plain lock in the
                 if (policy != NULL) {
                         /* INODEBITS_INTEROP: If the server does not support
                          * inodebits, we will request a plain lock in the
@@ -880,7 +876,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
         err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
                                     einfo->ei_mode, flags, lvb, lvb_len,
 
         err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
                                     einfo->ei_mode, flags, lvb, lvb_len,
-                                    lvb_swabber, lockh, rc);
+                                    lockh, rc);
 
         /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
          * one reference that we took */
 
         /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
          * one reference that we took */
index 0b2088d..bbb2dfb 100644 (file)
@@ -1410,7 +1410,7 @@ static int llu_file_flock(struct inode *ino,
 
                 if (lmv->tgts[0].ltd_exp != NULL)
                         rc = ldlm_cli_enqueue(lmv->tgts[0].ltd_exp, NULL, &einfo, &res_id,
 
                 if (lmv->tgts[0].ltd_exp != NULL)
                         rc = ldlm_cli_enqueue(lmv->tgts[0].ltd_exp, NULL, &einfo, &res_id,
-                                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
+                                              &flock, &flags, NULL, 0, &lockh, 0);
                 else
                         rc = -ENODEV;
         }
                 else
                         rc = -ENODEV;
         }
index 9b1443f..4fb239f 100644 (file)
@@ -661,7 +661,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 mdc_enter_request(&obddev->u.cli);
         }
         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
                 mdc_enter_request(&obddev->u.cli);
         }
         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
-                              0, NULL, lockh, 0);
+                              0, lockh, 0);
         if (reqp)
                 *reqp = req;
 
         if (reqp)
                 *reqp = req;
 
@@ -943,7 +943,7 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
                 rc = -ETIMEDOUT;
 
         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
                 rc = -ETIMEDOUT;
 
         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
-                                   &flags, NULL, 0, NULL, lockh, rc);
+                                   &flags, NULL, 0, lockh, rc);
         if (rc < 0) {
                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
                 mdc_clear_replay_flag(req, rc);
         if (rc < 0) {
                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
                 mdc_clear_replay_flag(req, rc);
@@ -990,7 +990,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
 
         mdc_enter_request(&obddev->u.cli);
         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
 
         mdc_enter_request(&obddev->u.cli);
         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
-                              0, NULL, &minfo->mi_lockh, 1);
+                              0, &minfo->mi_lockh, 1);
         if (rc < 0) {
                 mdc_exit_request(&obddev->u.cli);
                 RETURN(rc);
         if (rc < 0) {
                 mdc_exit_request(&obddev->u.cli);
                 RETURN(rc);
index fa2205b..c32f9ad 100644 (file)
@@ -712,8 +712,8 @@ static inline int mdt_fid_lock(struct ldlm_namespace *ns,
 
         rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
                                     mode, &flags, mdt_blocking_ast,
 
         rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
                                     mode, &flags, mdt_blocking_ast,
-                                    ldlm_completion_ast,
-                                    NULL, NULL, 0, NULL, client_cookie, lh);
+                                    ldlm_completion_ast, NULL, NULL, 0,
+                                    client_cookie, lh);
         return rc == ELDLM_OK ? 0 : -EIO;
 }
 
         return rc == ELDLM_OK ? 0 : -EIO;
 }
 
index 38502ef..6548035 100644 (file)
@@ -837,7 +837,6 @@ static int mdt_rename_lock(struct mdt_thread_info *info,
                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
                                             LCK_EX, &flags, ldlm_blocking_ast,
                                             ldlm_completion_ast, NULL, NULL, 0,
                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
                                             LCK_EX, &flags, ldlm_blocking_ast,
                                             ldlm_completion_ast, NULL, NULL, 0,
-                                            NULL,
                                             &info->mti_exp->exp_handle.h_cookie,
                                             lh);
         } else {
                                             &info->mti_exp->exp_handle.h_cookie,
                                             lh);
         } else {
@@ -849,9 +848,8 @@ static int mdt_rename_lock(struct mdt_thread_info *info,
                  * This is the case mdt0 is remote node, issue DLM lock like
                  * other clients.
                  */
                  * This is the case mdt0 is remote node, issue DLM lock like
                  * other clients.
                  */
-                rc = ldlm_cli_enqueue(ms->ms_control_exp,
-                                      NULL, &einfo, res_id,
-                                      policy, &flags, NULL, 0, NULL, lh, 0);
+                rc = ldlm_cli_enqueue(ms->ms_control_exp, NULL, &einfo, res_id,
+                                      policy, &flags, NULL, 0, lh, 0);
         }
 
         RETURN(rc);
         }
 
         RETURN(rc);
index 339487f..b7eb9b0 100644 (file)
@@ -786,29 +786,27 @@ static int mgc_set_mgs_param(struct obd_export *exp,
 {
         struct ptlrpc_request *req;
         struct mgs_send_param *req_msp, *rep_msp;
 {
         struct ptlrpc_request *req;
         struct mgs_send_param *req_msp, *rep_msp;
-        int size[] = { sizeof(struct ptlrpc_body), sizeof(*req_msp) };
-        __u32 rep_size[] = { sizeof(struct ptlrpc_body), sizeof(*msp) };
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MGS_VERSION,
-                              MGS_SET_INFO, 2, size, NULL);
+        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                        &RQF_MGS_SET_INFO, LUSTRE_MGS_VERSION,
+                                        MGS_SET_INFO);
         if (!req)
                 RETURN(-ENOMEM);
 
         if (!req)
                 RETURN(-ENOMEM);
 
-        req_msp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*req_msp));
+        req_msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
         if (!req_msp) {
                 ptlrpc_req_finished(req);
                 RETURN(-ENOMEM);
         }
 
         memcpy(req_msp, msp, sizeof(*req_msp));
         if (!req_msp) {
                 ptlrpc_req_finished(req);
                 RETURN(-ENOMEM);
         }
 
         memcpy(req_msp, msp, sizeof(*req_msp));
-        ptlrpc_req_set_repsize(req, 2, rep_size);
+        ptlrpc_request_set_replen(req);
 
         rc = ptlrpc_queue_wait(req);
         if (!rc) {
 
         rc = ptlrpc_queue_wait(req);
         if (!rc) {
-                rep_msp = lustre_swab_repbuf(req, REPLY_REC_OFF,
-                                             sizeof(*rep_msp), NULL);
+                rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
                 memcpy(msp, rep_msp, sizeof(*rep_msp));
         }
 
                 memcpy(msp, rep_msp, sizeof(*rep_msp));
         }
 
@@ -841,8 +839,8 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
         /* We need a callback for every lockholder, so don't try to
            ldlm_lock_match (see rev 1.1.2.11.2.47) */
 
         /* We need a callback for every lockholder, so don't try to
            ldlm_lock_match (see rev 1.1.2.11.2.47) */
 
-        rc = ldlm_cli_enqueue(exp, NULL, &einfo, &cld->cld_resid,
-                              NULL, flags, NULL, 0, NULL, lockh, 0);
+        rc = ldlm_cli_enqueue(exp, NULL, &einfo, &cld->cld_resid, NULL, flags,
+                              NULL, 0, lockh, 0);
         /* A failed enqueue should still call the mgc_blocking_ast,
            where it will be requeued if needed ("grant failed"). */
 
         /* A failed enqueue should still call the mgc_blocking_ast,
            where it will be requeued if needed ("grant failed"). */
 
index d45f690..3c5262e 100644 (file)
@@ -332,7 +332,7 @@ static int mgs_get_cfg_lock(struct obd_device *obd, char *fsname,
                                             LDLM_PLAIN, NULL, LCK_EX,
                                             &flags, ldlm_blocking_ast,
                                             ldlm_completion_ast, NULL,
                                             LDLM_PLAIN, NULL, LCK_EX,
                                             &flags, ldlm_blocking_ast,
                                             ldlm_completion_ast, NULL,
-                                            fsname, 0, NULL, NULL, lockh);
+                                            fsname, 0, NULL, lockh);
         if (rc)
                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
 
         if (rc)
                 CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
 
index a80cd2c..1a62d9d 100644 (file)
 #include <lustre_debug.h>
 #include <lustre_net.h>
 
 #include <lustre_debug.h>
 #include <lustre_net.h>
 
-int dump_ioo(struct obd_ioobj *ioo)
+void dump_lniobuf(struct niobuf_local *nb)
 {
 {
-        CERROR("obd_ioobj: ioo_id="LPD64", ioo_gr="LPD64", ioo_type=%d, "
-               "ioo_bufct=%d\n",
-               ioo->ioo_id, ioo->ioo_gr, ioo->ioo_type, ioo->ioo_bufcnt);
-        return -EINVAL;
-}
-
-int dump_lniobuf(struct niobuf_local *nb)
-{
-        CERROR("niobuf_local: offset="LPD64", len=%d, page=%p, rc=%d\n",
+        CDEBUG(D_RPCTRACE,
+               "niobuf_local: offset="LPD64", len=%d, page=%p, rc=%d\n",
                nb->offset, nb->len, nb->page, nb->rc);
                nb->offset, nb->len, nb->page, nb->rc);
-        CERROR("nb->page: index = %ld\n", nb->page ? cfs_page_index(nb->page) : -1);
-
-        return -EINVAL;
-}
-
-int dump_rniobuf(struct niobuf_remote *nb)
-{
-        CERROR("niobuf_remote: offset="LPU64", len=%d, flags=%x\n",
-               nb->offset, nb->len, nb->flags);
-
-        return -EINVAL;
-}
-
-int dump_obdo(struct obdo *oa)
-{
-        __u32 valid = oa->o_valid;
-
-        CERROR("obdo: o_valid = %08x\n", valid);
-        if (valid & OBD_MD_FLID)
-                CERROR("obdo: o_id = "LPD64"\n", oa->o_id);
-        if (valid & OBD_MD_FLATIME)
-                CERROR("obdo: o_atime = "LPD64"\n", oa->o_atime);
-        if (valid & OBD_MD_FLMTIME)
-                CERROR("obdo: o_mtime = "LPD64"\n", oa->o_mtime);
-        if (valid & OBD_MD_FLCTIME)
-                CERROR("obdo: o_ctime = "LPD64"\n", oa->o_ctime);
-        if (valid & OBD_MD_FLSIZE)
-                CERROR("obdo: o_size = "LPD64"\n", oa->o_size);
-        if (valid & OBD_MD_FLBLOCKS)   /* allocation of space */
-                CERROR("obdo: o_blocks = "LPD64"\n", oa->o_blocks);
-        if (valid & OBD_MD_FLBLKSZ)
-                CERROR("obdo: o_blksize = %d\n", oa->o_blksize);
-        if (valid & (OBD_MD_FLTYPE | OBD_MD_FLMODE))
-                CERROR("obdo: o_mode = %o\n",
-                       oa->o_mode & ((valid & OBD_MD_FLTYPE ?  S_IFMT : 0) |
-                                     (valid & OBD_MD_FLMODE ? ~S_IFMT : 0)));
-        if (valid & OBD_MD_FLUID)
-                CERROR("obdo: o_uid = %u\n", oa->o_uid);
-        if (valid & OBD_MD_FLGID)
-                CERROR("obdo: o_gid = %u\n", oa->o_gid);
-        if (valid & OBD_MD_FLFLAGS)
-                CERROR("obdo: o_flags = %x\n", oa->o_flags);
-        if (valid & OBD_MD_FLNLINK)
-                CERROR("obdo: o_nlink = %u\n", oa->o_nlink);
-        if (valid & OBD_MD_FLGENER)
-                CERROR("obdo: o_generation = %u\n", oa->o_generation);
-
-        return -EINVAL;
+        CDEBUG(D_RPCTRACE, "nb->page: index = %ld\n",
+               nb->page ? cfs_page_index(nb->page) : -1);
 }
 }
+EXPORT_SYMBOL(dump_lniobuf);
 
 void dump_lsm(int level, struct lov_stripe_md *lsm)
 {
 
 void dump_lsm(int level, struct lov_stripe_md *lsm)
 {
@@ -123,22 +71,6 @@ void dump_lsm(int level, struct lov_stripe_md *lsm)
                lsm->lsm_pool_name);
 }
 
                lsm->lsm_pool_name);
 }
 
-/* XXX assumes only a single page in request */
-/*
-int dump_req(struct ptlrpc_request *req)
-{
-        struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
-        struct obd_ioobj *ioo = lustre_msg_buf(req->rq_reqmsg, 1);
-        //struct niobuf *nb = lustre_msg_buf(req->rq_reqmsg, 2);
-
-        dump_obdo(&body->oa);
-        //dump_niobuf(nb);
-        dump_ioo(ioo);
-
-        return -EINVAL;
-}
-*/
-
 #define LPDS sizeof(__u64)
 int block_debug_setup(void *addr, int len, __u64 off, __u64 id)
 {
 #define LPDS sizeof(__u64)
 int block_debug_setup(void *addr, int len, __u64 off, __u64 id)
 {
@@ -192,11 +124,7 @@ int block_debug_check(char *who, void *addr, int end, __u64 off, __u64 id)
 }
 #undef LPDS
 
 }
 #undef LPDS
 
-EXPORT_SYMBOL(dump_lniobuf);
-EXPORT_SYMBOL(dump_rniobuf);
-EXPORT_SYMBOL(dump_ioo);
 //EXPORT_SYMBOL(dump_req);
 //EXPORT_SYMBOL(dump_req);
-EXPORT_SYMBOL(dump_obdo);
 EXPORT_SYMBOL(dump_lsm);
 EXPORT_SYMBOL(block_debug_setup);
 EXPORT_SYMBOL(block_debug_check);
 EXPORT_SYMBOL(dump_lsm);
 EXPORT_SYMBOL(block_debug_setup);
 EXPORT_SYMBOL(block_debug_check);
index 67f9a0b..58b308c 100644 (file)
@@ -566,8 +566,8 @@ static int echo_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 
         rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_PLAIN,
                                     NULL, LCK_NL, &lock_flags, NULL,
 
         rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_PLAIN,
                                     NULL, LCK_NL, &lock_flags, NULL,
-                                    ldlm_completion_ast, NULL, NULL,
-                                    0, NULL, NULL, &obd->u.echo.eo_nl_lock);
+                                    ldlm_completion_ast, NULL, NULL, 0, NULL,
+                                    &obd->u.echo.eo_nl_lock);
         LASSERT (rc == ELDLM_OK);
 
         lprocfs_echo_init_vars(&lvars);
         LASSERT (rc == ELDLM_OK);
 
         lprocfs_echo_init_vars(&lvars);
index c126570..7a64adc 100644 (file)
@@ -1551,7 +1551,7 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid,
         rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_EXTENT,
                                     &policy, LCK_PW, &flags, ldlm_blocking_ast,
                                     ldlm_completion_ast, NULL, NULL, 0, NULL,
         rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_EXTENT,
                                     &policy, LCK_PW, &flags, ldlm_blocking_ast,
                                     ldlm_completion_ast, NULL, NULL, 0, NULL,
-                                    NULL, lockh);
+                                    lockh);
         if (rc != ELDLM_OK)
                 lockh->cookie = 0;
         RETURN(rc);
         if (rc != ELDLM_OK)
                 lockh->cookie = 0;
         RETURN(rc);
@@ -1809,7 +1809,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                          *
                          * Of course, this will all disappear when we switch to
                          * taking liblustre locks on the OST. */
                          *
                          * Of course, this will all disappear when we switch to
                          * taking liblustre locks on the OST. */
-                        ldlm_res_lvbo_update(res, NULL, 0, 1);
+                        ldlm_res_lvbo_update(res, NULL, 1);
                 }
                 RETURN(ELDLM_LOCK_ABORTED);
         }
                 }
                 RETURN(ELDLM_LOCK_ABORTED);
         }
@@ -1836,7 +1836,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
          * sending ast is not handled. This can result in lost client writes.
          */
         if (rc != 0)
          * sending ast is not handled. This can result in lost client writes.
          */
         if (rc != 0)
-                ldlm_res_lvbo_update(res, NULL, 0, 1);
+                ldlm_res_lvbo_update(res, NULL, 1);
 
         lock_res(res);
         *reply_lvb = *res_lvb;
 
         lock_res(res);
         *reply_lvb = *res_lvb;
@@ -3445,7 +3445,7 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
 
         if (res != NULL) {
                 LDLM_RESOURCE_ADDREF(res);
 
         if (res != NULL) {
                 LDLM_RESOURCE_ADDREF(res);
-                rc = ldlm_res_lvbo_update(res, NULL, 0, 0);
+                rc = ldlm_res_lvbo_update(res, NULL, 0);
                 LDLM_RESOURCE_DELREF(res);
                 ldlm_resource_putref(res);
         }
                 LDLM_RESOURCE_DELREF(res);
                 ldlm_resource_putref(res);
         }
index d195627..368842e 100644 (file)
@@ -898,7 +898,7 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
 
                 if (resource != NULL) {
                         LDLM_RESOURCE_ADDREF(resource);
 
                 if (resource != NULL) {
                         LDLM_RESOURCE_ADDREF(resource);
-                        ns->ns_lvbo->lvbo_update(resource, NULL, 0, 1);
+                        ns->ns_lvbo->lvbo_update(resource, NULL, 1);
                         LDLM_RESOURCE_DELREF(resource);
                         ldlm_resource_putref(resource);
                 }
                         LDLM_RESOURCE_DELREF(resource);
                         ldlm_resource_putref(resource);
                 }
index 1f627b2..c66055e 100644 (file)
@@ -121,8 +121,7 @@ out_dentry:
  *   If 'increase_only' is true, don't allow values to move backwards.
  */
 static int filter_lvbo_update(struct ldlm_resource *res,
  *   If 'increase_only' is true, don't allow values to move backwards.
  */
 static int filter_lvbo_update(struct ldlm_resource *res,
-                              struct ptlrpc_request *r,
-                              int buf_idx, int increase_only)
+                              struct ptlrpc_request *r, int increase_only)
 {
         int rc = 0;
         struct ost_lvb *lvb;
 {
         int rc = 0;
         struct ost_lvb *lvb;
@@ -144,8 +143,7 @@ static int filter_lvbo_update(struct ldlm_resource *res,
                 struct ost_lvb *new;
 
                 /* XXX update always from reply buffer */
                 struct ost_lvb *new;
 
                 /* XXX update always from reply buffer */
-                new = lustre_swab_repbuf(r, buf_idx, sizeof(*new),
-                                         lustre_swab_ost_lvb);
+                new = req_capsule_server_get(&r->rq_pill, &RMF_DLM_LVB);
 
                 if (new == NULL) {
                         CERROR("lustre_swab_buf failed\n");
 
                 if (new == NULL) {
                         CERROR("lustre_swab_buf failed\n");
index 2bd2311..17fcbd8 100644 (file)
@@ -82,8 +82,7 @@ static int osc_interpret_create(const struct lu_env *env,
         ENTRY;
 
         if (req->rq_repmsg) {
         ENTRY;
 
         if (req->rq_repmsg) {
-                body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
-                                          lustre_swab_ost_body);
+                body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                 if (body == NULL && rc == 0)
                         rc = -EPROTO;
         }
                 if (body == NULL && rc == 0)
                         rc = -EPROTO;
         }
@@ -189,7 +188,6 @@ static int oscc_internal_create(struct osc_creator *oscc)
 {
         struct ptlrpc_request *request;
         struct ost_body *body;
 {
         struct ptlrpc_request *request;
         struct ost_body *body;
-        __u32 size[] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         ENTRY;
 
         LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
         ENTRY;
 
         LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
@@ -221,9 +219,9 @@ static int oscc_internal_create(struct osc_creator *oscc)
         oscc->oscc_flags |= OSCC_FLAG_CREATING;
         spin_unlock(&oscc->oscc_lock);
 
         oscc->oscc_flags |= OSCC_FLAG_CREATING;
         spin_unlock(&oscc->oscc_lock);
 
-        request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
-                                  LUSTRE_OST_VERSION, OST_CREATE, 2,
-                                  size, NULL);
+        request = ptlrpc_request_alloc_pack(oscc->oscc_obd->u.cli.cl_import,
+                                            &RQF_OST_CREATE,
+                                            LUSTRE_OST_VERSION, OST_CREATE);
         if (request == NULL) {
                 spin_lock(&oscc->oscc_lock);
                 oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
         if (request == NULL) {
                 spin_lock(&oscc->oscc_lock);
                 oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
@@ -233,7 +231,7 @@ static int oscc_internal_create(struct osc_creator *oscc)
 
         request->rq_request_portal = OST_CREATE_PORTAL;
         ptlrpc_at_set_req_timeout(request);
 
         request->rq_request_portal = OST_CREATE_PORTAL;
         ptlrpc_at_set_req_timeout(request);
-        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        body = req_capsule_client_get(&request->rq_pill, &RMF_OST_BODY);
 
         spin_lock(&oscc->oscc_lock);
         body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
 
         spin_lock(&oscc->oscc_lock);
         body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
@@ -248,7 +246,7 @@ static int oscc_internal_create(struct osc_creator *oscc)
         /* we should not resend create request - anyway we will have delorphan
          * and kill these objects */
         request->rq_no_delay = request->rq_no_resend = 1;
         /* we should not resend create request - anyway we will have delorphan
          * and kill these objects */
         request->rq_no_delay = request->rq_no_resend = 1;
-        ptlrpc_req_set_repsize(request, 2, size);
+        ptlrpc_request_set_replen(request);
 
         request->rq_async_args.pointer_arg[0] = oscc;
         request->rq_interpret_reply = osc_interpret_create;
 
         request->rq_async_args.pointer_arg[0] = oscc;
         request->rq_interpret_reply = osc_interpret_create;
index ba12a1a..a2f2784 100644 (file)
@@ -210,8 +210,7 @@ static int osc_getattr_interpret(const struct lu_env *env,
         if (rc != 0)
                 GOTO(out, rc);
 
         if (rc != 0)
                 GOTO(out, rc);
 
-        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
+        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         if (body) {
                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
         if (body) {
                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
@@ -1138,19 +1137,18 @@ static int check_write_rcs(struct ptlrpc_request *req,
                            int requested_nob, int niocount,
                            obd_count page_count, struct brw_page **pga)
 {
                            int requested_nob, int niocount,
                            obd_count page_count, struct brw_page **pga)
 {
-        int    *remote_rcs, i;
+        int     i;
+        __u32   *remote_rcs;
 
 
-        /* return error if any niobuf was in error */
-        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
-                                        sizeof(*remote_rcs) * niocount, NULL);
+        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
+                                                  sizeof(*remote_rcs) *
+                                                  niocount);
         if (remote_rcs == NULL) {
                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                 return(-EPROTO);
         }
         if (remote_rcs == NULL) {
                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                 return(-EPROTO);
         }
-        if (ptlrpc_rep_need_swab(req))
-                for (i = 0; i < niocount; i++)
-                        __swab32s(&remote_rcs[i]);
 
 
+        /* return error if any niobuf was in error */
         for (i = 0; i < niocount; i++) {
                 if (remote_rcs[i] < 0)
                         return(remote_rcs[i]);
         for (i = 0; i < niocount; i++) {
                 if (remote_rcs[i] < 0)
                         return(remote_rcs[i]);
@@ -1264,6 +1262,8 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         }
 
         pill = &req->rq_pill;
         }
 
         pill = &req->rq_pill;
+        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
+                             sizeof(*ioobj));
         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                              niocount * sizeof(*niobuf));
         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                              niocount * sizeof(*niobuf));
         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
@@ -1290,7 +1290,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         body = req_capsule_client_get(pill, &RMF_OST_BODY);
         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
         body = req_capsule_client_get(pill, &RMF_OST_BODY);
         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
-        LASSERT(body && ioobj && niobuf);
+        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
 
         lustre_set_wire_obdo(&body->oa, oa);
 
 
         lustre_set_wire_obdo(&body->oa, oa);
 
@@ -1337,11 +1337,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         }
 
         LASSERTF((void *)(niobuf - niocount) ==
         }
 
         LASSERTF((void *)(niobuf - niocount) ==
-                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
-                               niocount * sizeof(*niobuf)),
-                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
-                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
-                (void *)(niobuf - niocount));
+                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
+                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
+                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
 
         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
         if (osc_should_shrink_grant(cli))
 
         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
         if (osc_should_shrink_grant(cli))
@@ -1377,7 +1375,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                 }
                 oa->o_cksum = body->oa.o_cksum;
                 /* 1 RC per niobuf */
                 }
                 oa->o_cksum = body->oa.o_cksum;
                 /* 1 RC per niobuf */
-                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
+                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                      sizeof(__u32) * niocount);
         } else {
                 if (unlikely(cli->cl_checksum) &&
                                      sizeof(__u32) * niocount);
         } else {
                 if (unlikely(cli->cl_checksum) &&
@@ -1387,7 +1385,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                 }
                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                 }
-                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
+                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, 0);
                 /* 1 RC for the whole I/O */
         }
         ptlrpc_request_set_replen(req);
                 /* 1 RC for the whole I/O */
         }
         ptlrpc_request_set_replen(req);
@@ -1479,8 +1477,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 RETURN(rc);
 
         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
                 RETURN(rc);
 
         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
-        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
+        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         if (body == NULL) {
                 CDEBUG(D_INFO, "Can't unpack body\n");
                 RETURN(-EPROTO);
         if (body == NULL) {
                 CDEBUG(D_INFO, "Can't unpack body\n");
                 RETURN(-EPROTO);
@@ -3157,8 +3154,7 @@ static int osc_enqueue_interpret(const struct lu_env *env,
         /* Complete obtaining the lock procedure. */
         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                    mode, aa->oa_flags, aa->oa_lvb,
         /* Complete obtaining the lock procedure. */
         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                    mode, aa->oa_flags, aa->oa_lvb,
-                                   sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
-                                   &handle, rc);
+                                   sizeof(*aa->oa_lvb), &handle, rc);
         /* Complete osc stuff. */
         rc = osc_enqueue_fini(req, aa->oa_lvb,
                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
         /* Complete osc stuff. */
         rc = osc_enqueue_fini(req, aa->oa_lvb,
                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
@@ -3325,7 +3321,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
         *flags &= ~LDLM_FL_BLOCK_GRANTED;
 
         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
         *flags &= ~LDLM_FL_BLOCK_GRANTED;
 
         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
-                              sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
+                              sizeof(*lvb), lockh, async);
         if (rqset) {
                 if (!rc) {
                         struct osc_enqueue_args *aa;
         if (rqset) {
                 if (!rc) {
                         struct osc_enqueue_args *aa;
index 2ee5f2b..b979851 100644 (file)
@@ -106,40 +106,51 @@ static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
                        struct obd_trans_info *oti)
 {
         struct ost_body *body, *repbody;
                        struct obd_trans_info *oti)
 {
         struct ost_body *body, *repbody;
-        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         struct lustre_capa *capa = NULL;
         int rc;
         ENTRY;
 
         struct lustre_capa *capa = NULL;
         int rc;
         ENTRY;
 
-        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
+        /* Get the request body */
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
         if (body->oa.o_id == 0)
                 RETURN(-EPROTO);
 
         if (body == NULL)
                 RETURN(-EFAULT);
 
         if (body->oa.o_id == 0)
                 RETURN(-EPROTO);
 
-        if (lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1)) {
+        /* If there's a DLM request, cancel the locks mentioned in it*/
+        if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
                 struct ldlm_request *dlm;
                 struct ldlm_request *dlm;
-                dlm = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*dlm),
-                                         lustre_swab_ldlm_request);
+
+                dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
                 if (dlm == NULL)
                         RETURN (-EFAULT);
                 ldlm_request_cancel(req, dlm, 0);
         }
 
                 if (dlm == NULL)
                         RETURN (-EFAULT);
                 ldlm_request_cancel(req, dlm, 0);
         }
 
-        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
-                capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 2);
+        /* If there's a capability, get it */
+        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+                if (capa == NULL) {
+                        CERROR("Missing capability for OST DESTROY");
+                        RETURN (-EFAULT);
+                }
+        }
 
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        /* Prepare the reply */
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
+        /* Get the log cancellation cookie */
         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
                 oti->oti_logcookies = &body->oa.o_lcookie;
         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
                 oti->oti_logcookies = &body->oa.o_lcookie;
-        repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                 sizeof(*repbody));
+
+        /* Finish the reply */
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+
+        /* Do the destroy and set the reply status accordingly  */
         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa);
         RETURN(0);
 }
         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa);
         RETURN(0);
 }
@@ -148,27 +159,30 @@ static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
 {
         struct ost_body *body, *repbody;
         struct obd_info oinfo = { { { 0 } } };
 {
         struct ost_body *body, *repbody;
         struct obd_info oinfo = { { { 0 } } };
-        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                 sizeof(*repbody));
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         repbody->oa = body->oa;
 
         oinfo.oi_oa = &repbody->oa;
         repbody->oa = body->oa;
 
         oinfo.oi_oa = &repbody->oa;
-        if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
-                oinfo.oi_capa = lustre_unpack_incoming_capa(req,
-                                                   REQ_REC_OFF + 1);
+        if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
+                oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
+                                                       &RMF_CAPA1);
+                if (oinfo.oi_capa == NULL) {
+                        CERROR("Missing capability for OST GETATTR");
+                        RETURN (-EFAULT);
+                }
+        }
+
         req->rq_status = obd_getattr(exp, &oinfo);
         ost_drop_id(exp, &repbody->oa);
         RETURN(0);
         req->rq_status = obd_getattr(exp, &oinfo);
         ost_drop_id(exp, &repbody->oa);
         RETURN(0);
@@ -177,15 +191,14 @@ static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
 static int ost_statfs(struct ptlrpc_request *req)
 {
         struct obd_statfs *osfs;
 static int ost_statfs(struct ptlrpc_request *req)
 {
         struct obd_statfs *osfs;
-        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
-        osfs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*osfs));
+        osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
 
         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
                                     cfs_time_current_64() - HZ, 0);
 
         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
                                     cfs_time_current_64() - HZ, 0);
@@ -201,21 +214,18 @@ static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
                       struct obd_trans_info *oti)
 {
         struct ost_body *body, *repbody;
                       struct obd_trans_info *oti)
 {
         struct ost_body *body, *repbody;
-        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                 sizeof(*repbody));
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         oti->oti_logcookies = &repbody->oa.o_lcookie;
 
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         oti->oti_logcookies = &repbody->oa.o_lcookie;
 
@@ -224,7 +234,7 @@ static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-/*
+/**
  * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
  * lock on the file being truncated.
  */
  * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
  * lock on the file being truncated.
  */
@@ -271,11 +281,10 @@ static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
                                       ldlm_blocking_ast, ldlm_completion_ast,
         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
                                       ldlm_blocking_ast, ldlm_completion_ast,
-                                      ldlm_glimpse_ast, NULL, 0, NULL,
-                                      NULL, lh));
+                                      ldlm_glimpse_ast, NULL, 0, NULL, lh));
 }
 
 }
 
-/*
+/**
  * Helper function for ost_punch(): release lock acquired by
  * ost_punch_lock_get(), if any.
  */
  * Helper function for ost_punch(): release lock acquired by
  * ost_punch_lock_get(), if any.
  */
@@ -293,7 +302,6 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
 {
         struct obd_info oinfo = { { { 0 } } };
         struct ost_body *body, *repbody;
 {
         struct obd_info oinfo = { { { 0 } } };
         struct ost_body *body, *repbody;
-        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
         int rc;
         struct lustre_handle lh = {0,};
         ENTRY;
         int rc;
         struct lustre_handle lh = {0,};
         ENTRY;
@@ -301,9 +309,9 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
 
         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
 
-        /* ost_body is varified and swabbed in ost_hpreq_handler() */
-        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        LASSERT(body != NULL);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+        if (body == NULL)
+                RETURN(-EFAULT);
 
         oinfo.oi_oa = &body->oa;
         oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
 
         oinfo.oi_oa = &body->oa;
         oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
@@ -313,12 +321,11 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
                 RETURN(-EPROTO);
 
             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
                 RETURN(-EPROTO);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                 sizeof(*repbody));
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh);
         if (rc == 0) {
                 if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
         rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh);
         if (rc == 0) {
                 if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
@@ -330,9 +337,14 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
                          */
                         oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
 
                          */
                         oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
 
-                if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
-                        oinfo.oi_capa = lustre_unpack_incoming_capa(req,
-                                                           REQ_REC_OFF + 1);
+                if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
+                        oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
+                                                               &RMF_CAPA1);
+                        if (oinfo.oi_capa == NULL) {
+                                CERROR("Missing capability for OST PUNCH");
+                                RETURN (-EFAULT);
+                        }
+                }
                 req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
                 ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
         }
                 req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
                 ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
         }
@@ -345,24 +357,26 @@ static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
 {
         struct ost_body *body, *repbody;
         struct lustre_capa *capa = NULL;
 {
         struct ost_body *body, *repbody;
         struct lustre_capa *capa = NULL;
-        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
-                capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 1);
+        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+                if (capa == NULL) {
+                        CERROR("Missing capability for OST SYNC");
+                        RETURN (-EFAULT);
+                }
+        }
 
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                 sizeof(*repbody));
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
                                   repbody->oa.o_blocks, capa);
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
                                   repbody->oa.o_blocks, capa);
@@ -374,28 +388,30 @@ static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
                        struct obd_trans_info *oti)
 {
         struct ost_body *body, *repbody;
                        struct obd_trans_info *oti)
 {
         struct ost_body *body, *repbody;
-        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
         int rc;
         struct obd_info oinfo = { { { 0 } } };
         ENTRY;
 
         int rc;
         struct obd_info oinfo = { { { 0 } } };
         ENTRY;
 
-        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         if (body == NULL)
                 RETURN(-EFAULT);
 
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                 sizeof(*repbody));
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         repbody->oa = body->oa;
 
         oinfo.oi_oa = &repbody->oa;
         repbody->oa = body->oa;
 
         oinfo.oi_oa = &repbody->oa;
-        if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
-                oinfo.oi_capa = lustre_unpack_incoming_capa(req,
-                                                   REQ_REC_OFF + 1);
+        if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
+                oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
+                                                       &RMF_CAPA1);
+                if (oinfo.oi_capa == NULL) {
+                        CERROR("Missing capability for OST SETATTR");
+                        RETURN (-EFAULT);
+                }
+        }
         req->rq_status = obd_setattr(exp, &oinfo, oti);
         ost_drop_id(exp, &repbody->oa);
         RETURN(0);
         req->rq_status = obd_setattr(exp, &oinfo, oti);
         ost_drop_id(exp, &repbody->oa);
         RETURN(0);
@@ -461,10 +477,10 @@ static int ost_brw_lock_get(int mode, struct obd_export *exp,
         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
                 RETURN(0);
 
         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
                 RETURN(0);
 
-        /* EXPENSIVE ASSERTION */
         for (i = 1; i < nrbufs; i ++)
         for (i = 1; i < nrbufs; i ++)
-                LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
-                        (nb[i].flags & OBD_BRW_SRVLOCK));
+                if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
+                    (nb[i].flags & OBD_BRW_SRVLOCK))
+                        RETURN(-EFAULT);
 
         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
         policy.l_extent.end   = (nb[nrbufs - 1].offset +
 
         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
         policy.l_extent.end   = (nb[nrbufs - 1].offset +
@@ -473,8 +489,7 @@ static int ost_brw_lock_get(int mode, struct obd_export *exp,
         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
                                       LDLM_EXTENT, &policy, mode, &flags,
                                       ldlm_blocking_ast, ldlm_completion_ast,
         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
                                       LDLM_EXTENT, &policy, mode, &flags,
                                       ldlm_blocking_ast, ldlm_completion_ast,
-                                      ldlm_glimpse_ast, NULL, 0, NULL,
-                                      NULL, lh));
+                                      ldlm_glimpse_ast, NULL, 0, NULL, lh));
 }
 
 static void ost_brw_lock_put(int mode,
 }
 
 static void ost_brw_lock_put(int mode,
@@ -619,7 +634,6 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         struct lustre_capa *capa = NULL;
         struct l_wait_info lwi;
         struct lustre_handle lockh = { 0 };
         struct lustre_capa *capa = NULL;
         struct l_wait_info lwi;
         struct lustre_handle lockh = { 0 };
-        __u32  size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         int niocount, npages, nob = 0, rc, i;
         int no_reply = 0;
         ENTRY;
         int niocount, npages, nob = 0, rc, i;
         int no_reply = 0;
         ENTRY;
@@ -644,21 +658,34 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
          * ost_rw_hpreq_check(). */
 
         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
          * ost_rw_hpreq_check(). */
-        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        LASSERT(body != NULL);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+        if (body == NULL)
+                GOTO(out, rc = -EFAULT);
 
 
-        ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo));
-        LASSERT(ioo != NULL);
+        /*
+         * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
+         * would be useful here and wherever we get &RMF_OBD_IOOBJ and
+         * &RMF_NIOBUF_REMOTE.
+         */
+        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+        if (ioo == NULL)
+                GOTO(out, rc = -EFAULT);
 
         niocount = ioo->ioo_bufcnt;
 
         niocount = ioo->ioo_bufcnt;
-        remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
-                                   niocount * sizeof(*remote_nb));
-        LASSERT(remote_nb != NULL);
+        remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+        if (remote_nb == NULL)
+                GOTO(out, rc = -EFAULT);
 
 
-        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
-                capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 3);
+        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+                if (capa == NULL) {
+                        CERROR("Missing capability for OST BRW READ");
+                        GOTO(out, rc = -EFAULT);
+                }
+        }
 
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER, 0);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 GOTO(out, rc);
 
         if (rc)
                 GOTO(out, rc);
 
@@ -824,8 +851,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                           remote_nb, npages, local_nb, oti, rc);
 
         if (rc == 0) {
                           remote_nb, npages, local_nb, oti, rc);
 
         if (rc == 0) {
-                repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                         sizeof(*repbody));
+                repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
                 ost_drop_id(exp, &repbody->oa);
         }
                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
                 ost_drop_id(exp, &repbody->oa);
         }
@@ -873,7 +899,6 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         struct lustre_handle     lockh = {0};
         struct lustre_capa      *capa = NULL;
         __u32                   *rcs;
         struct lustre_handle     lockh = {0};
         struct lustre_capa      *capa = NULL;
         __u32                   *rcs;
-        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         int objcount, niocount, npages;
         int rc, i, j;
         obd_count                client_cksum = 0, server_cksum = 0;
         int objcount, niocount, npages;
         int rc, i, j;
         obd_count                client_cksum = 0, server_cksum = 0;
@@ -905,35 +930,47 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
          * ost_rw_hpreq_check(). */
 
         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
          * ost_rw_hpreq_check(). */
-        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        LASSERT(body != NULL);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+        if (body == NULL)
+                GOTO(out, rc = -EFAULT);
 
         if ((body->oa.o_flags & OBD_BRW_MEMALLOC) &&
             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
                 libcfs_memory_pressure_set();
 
 
         if ((body->oa.o_flags & OBD_BRW_MEMALLOC) &&
             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
                 libcfs_memory_pressure_set();
 
-        objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
-                sizeof(*ioo);
-        ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
-                             objcount * sizeof(*ioo));
-        LASSERT(ioo != NULL);
+        objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
+                                        RCL_CLIENT) / sizeof(*ioo);
+        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+        if (ioo == NULL)
+                GOTO(out, rc = -EFAULT);
         for (niocount = i = 0; i < objcount; i++)
                 niocount += ioo[i].ioo_bufcnt;
 
         for (niocount = i = 0; i < objcount; i++)
                 niocount += ioo[i].ioo_bufcnt;
 
-        remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
-                                   niocount * sizeof(*remote_nb));
-        LASSERT(remote_nb != NULL);
+        /*
+         * It'd be nice to have a capsule function to indicate how many elements
+         * there were in a buffer for an RMF that's declared to be an array.
+         * It's easy enough to compute the number of elements here though.
+         */
+        remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+        if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
+            &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
+                GOTO(out, rc = -EFAULT);
 
 
-        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
-                capa = lustre_unpack_incoming_capa(req, REQ_REC_OFF + 3);
+        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+                if (capa == NULL) {
+                        CERROR("Missing capability for OST BRW WRITE");
+                        GOTO(out, rc = -EFAULT);
+                }
+        }
 
 
-        size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs);
-        rc = lustre_pack_reply(req, 3, size, NULL);
+        req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
+                             niocount * sizeof(*rcs));
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc != 0)
                 GOTO(out, rc);
         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
         if (rc != 0)
                 GOTO(out, rc);
         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
-        rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
-                             niocount * sizeof(*rcs));
+        rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
 
         /*
          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
 
         /*
          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
@@ -1060,8 +1097,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         }
         no_reply = rc != 0;
 
         }
         no_reply = rc != 0;
 
-        repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                 sizeof(*repbody));
+        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
 
         if (unlikely(client_cksum != 0 && rc == 0)) {
         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
 
         if (unlikely(client_cksum != 0 && rc == 0)) {
@@ -1189,47 +1225,64 @@ out:
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+/**
+ * Implementation of OST_SET_INFO.
+ *
+ * OST_SET_INFO is like ioctl(): heavily overloaded.  Specifically, it takes a
+ * "key" and a value RPC buffers as arguments, with the value's contents
+ * interpreted according to the key.
+ *
+ * Value types that need swabbing have swabbing done explicitly, either here or
+ * in functions called from here.  This should be corrected: all swabbing should
+ * be done in the capsule abstraction, as that will then allow us to move
+ * swabbing exclusively to the client without having to modify server code
+ * outside the capsule abstraction's implementation itself.  To correct this
+ * will require minor changes to the capsule abstraction; see the comments for
+ * req_capsule_extend() in layout.c.
+ */
 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
 {
         struct ost_body *body = NULL, *repbody;
 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
 {
         struct ost_body *body = NULL, *repbody;
-        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         char *key, *val = NULL;
         int keylen, vallen, rc = 0;
         char *key, *val = NULL;
         int keylen, vallen, rc = 0;
+        int is_grant_shrink = 0;
         ENTRY;
 
         ENTRY;
 
-        key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
+        key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
         if (key == NULL) {
                 DEBUG_REQ(D_HA, req, "no set_info key");
                 RETURN(-EFAULT);
         }
         if (key == NULL) {
                 DEBUG_REQ(D_HA, req, "no set_info key");
                 RETURN(-EFAULT);
         }
-        keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
+        keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
+                                      RCL_CLIENT);
 
 
-        if (KEY_IS(KEY_GRANT_SHRINK)) {
-                rc = lustre_pack_reply(req, 2, size, NULL);
-                if (rc)
-                        RETURN(rc);
-        } else {
-                rc = lustre_pack_reply(req, 1, NULL, NULL);
-                if (rc)
-                        RETURN(rc);
-        }
+        vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
+                                      RCL_CLIENT);
+
+        if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
+                /* In this case the value is actually an RMF_OST_BODY, so we
+                 * transmutate the type of this PTLRPC */
+                req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
+
+        rc = req_capsule_server_pack(&req->rq_pill);
+        if (rc)
+                RETURN(rc);
 
 
-        vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
         if (vallen) {
         if (vallen) {
-                if (KEY_IS(KEY_GRANT_SHRINK)) {
-                        body = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
-                                                  sizeof(*body),
-                                                  lustre_swab_ost_body);
+                if (is_grant_shrink) {
+                        body = req_capsule_client_get(&req->rq_pill,
+                                                      &RMF_OST_BODY);
                         if (!body)
                                 RETURN(-EFAULT);
 
                         if (!body)
                                 RETURN(-EFAULT);
 
-                        repbody = lustre_msg_buf(req->rq_repmsg,
-                                                 REPLY_REC_OFF,
-                                                 sizeof(*repbody));
+                        repbody = req_capsule_server_get(&req->rq_pill,
+                                                         &RMF_OST_BODY);
                         memcpy(repbody, body, sizeof(*body));
                         val = (char*)repbody;
                         memcpy(repbody, body, sizeof(*body));
                         val = (char*)repbody;
-                } else
-                        val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,0);
+                } else {
+                        val = req_capsule_client_get(&req->rq_pill,
+                                                     &RMF_SETINFO_VAL);
+                }
         }
 
         if (KEY_IS(KEY_EVICT_BY_NID)) {
         }
 
         if (KEY_IS(KEY_EVICT_BY_NID)) {
@@ -1237,10 +1290,13 @@ static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
                         obd_export_evict_by_nid(exp->exp_obd, val);
                 GOTO(out, rc = 0);
         } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
                         obd_export_evict_by_nid(exp->exp_obd, val);
                 GOTO(out, rc = 0);
         } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
-                /* Val's are not swabbed automatically */
+                if (vallen < sizeof(__u32))
+                        RETURN(-EFAULT);
                 __swab32s((__u32 *)val);
         }
 
                 __swab32s((__u32 *)val);
         }
 
+        /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
+         * a struct ost_body * value */
         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
 out:
         lustre_msg_set_status(req->rq_repmsg, 0);
         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
 out:
         lustre_msg_set_status(req->rq_repmsg, 0);
@@ -1254,8 +1310,6 @@ static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
         struct req_capsule *pill = &req->rq_pill;
         ENTRY;
 
         struct req_capsule *pill = &req->rq_pill;
         ENTRY;
 
-        req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
-
         /* this common part for get_info rpc */
         key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
         if (key == NULL) {
         /* this common part for get_info rpc */
         key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
         if (key == NULL) {
@@ -1359,7 +1413,7 @@ static int ost_llog_handle_connect(struct obd_export *exp,
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
-        body = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
         rc = obd_llog_connect(exp, body);
         RETURN(rc);
 }
         rc = obd_llog_connect(exp, body);
         RETURN(rc);
 }
@@ -1636,6 +1690,10 @@ int ost_msg_check_version(struct lustre_msg *msg)
         return rc;
 }
 
         return rc;
 }
 
+/**
+ * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
+ * not.
+ */
 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
                                    struct ldlm_lock *lock)
 {
 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
                                    struct ldlm_lock *lock)
 {
@@ -1652,19 +1710,24 @@ static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
 
         /* As the request may be covered by several locks, do not look at
          * o_handle, look at the RPC IO region. */
 
         /* As the request may be covered by several locks, do not look at
          * o_handle, look at the RPC IO region. */
-        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
-                                  lustre_swab_obdo);
-        objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
-                sizeof(*ioo);
-        ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
-                             objcount * sizeof(*ioo));
-        LASSERT(ioo != NULL);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+        if (body == NULL)
+                RETURN(0);
+
+        objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
+                                        RCL_CLIENT) / sizeof(*ioo);
+        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+        if (ioo == NULL)
+                RETURN(0);
+
         for (niocount = i = 0; i < objcount; i++)
                 niocount += ioo[i].ioo_bufcnt;
 
         for (niocount = i = 0; i < objcount; i++)
                 niocount += ioo[i].ioo_bufcnt;
 
-        nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
-                            niocount * sizeof(*nb));
-        LASSERT(nb != NULL);
+        nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+        if (nb == NULL ||
+            niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
+            RCL_CLIENT) / sizeof(*nb)))
+                RETURN(0);
 
         mode = LCK_PW;
         if (opc == OST_READ)
 
         mode = LCK_PW;
         if (opc == OST_READ)
@@ -1690,9 +1753,14 @@ static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
 }
 
 /**
 }
 
 /**
- * Swab buffers needed to call ost_rw_prolong_locks() and call it.
- * Return the value from ost_rw_prolong_locks() which is non-zero if
- * there is a cancelled lock which is waiting for this IO request.
+ * High-priority queue request check for whether the given PTLRPC request (\a
+ * req) is blocking an LDLM lock cancel.
+ *
+ * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
+ * cancel, 0 if it is not, and -EFAULT if the request is malformed.
+ *
+ * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
+ * function looks only at OST_READs and OST_WRITEs.
  */
 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
 {
  */
 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
 {
@@ -1706,21 +1774,25 @@ static int ost_rw_hpreq_check(struct ptlrpc_request *req)
         opc = lustre_msg_get_opc(req->rq_reqmsg);
         LASSERT(opc == OST_READ || opc == OST_WRITE);
 
         opc = lustre_msg_get_opc(req->rq_reqmsg);
         LASSERT(opc == OST_READ || opc == OST_WRITE);
 
-        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        LASSERT(body != NULL);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+        if (body == NULL)
+                RETURN(-EFAULT);
 
 
-        objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
-                sizeof(*ioo);
-        ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
-                             objcount * sizeof(*ioo));
-        LASSERT(ioo != NULL);
+        objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
+                                        RCL_CLIENT) / sizeof(*ioo);
+        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+        if (ioo == NULL)
+                RETURN(-EFAULT);
 
         for (niocount = i = 0; i < objcount; i++)
                 niocount += ioo[i].ioo_bufcnt;
 
         for (niocount = i = 0; i < objcount; i++)
                 niocount += ioo[i].ioo_bufcnt;
-        nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
-                            niocount * sizeof(*nb));
-        LASSERT(nb != NULL);
-        LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK));
+        nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+        if (nb == NULL ||
+            niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
+            RCL_CLIENT) / sizeof(*nb)))
+                RETURN(-EFAULT);
+        if (niocount != 0 && (nb[0].flags & OBD_BRW_SRVLOCK))
+                RETURN(-EFAULT);
 
         mode = LCK_PW;
         if (opc == OST_READ)
 
         mode = LCK_PW;
         if (opc == OST_READ)
@@ -1762,15 +1834,18 @@ static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
         RETURN(opd.opd_lock_match);
 }
 
         RETURN(opd.opd_lock_match);
 }
 
+/**
+ * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
+ */
 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
                                       struct ldlm_lock *lock)
 {
         struct ost_body *body;
         ENTRY;
 
 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
                                       struct ldlm_lock *lock)
 {
         struct ost_body *body;
         ENTRY;
 
-        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
-                                  lustre_swab_obdo);
-        LASSERT(body != NULL);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+        if (body == NULL)
+                RETURN(0);  /* can't return -EFAULT here */
 
         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
 
         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
@@ -1778,11 +1853,17 @@ static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
         RETURN(0);
 }
 
         RETURN(0);
 }
 
+/**
+ * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
+ */
 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
 {
 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
 {
-        struct ost_body *body = lustre_msg_buf(req->rq_reqmsg,
-                                               REQ_REC_OFF, sizeof(*body));
-        LASSERT(body != NULL);
+        struct ost_body *body;
+
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+        if (body == NULL)
+                RETURN(-EFAULT);
+
         LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
                 !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
 
         LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
                 !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
 
@@ -1811,18 +1892,27 @@ static int ost_hpreq_handler(struct ptlrpc_request *req)
                         struct niobuf_remote *nb;
                         struct obd_ioobj *ioo;
                         int objcount, niocount;
                         struct niobuf_remote *nb;
                         struct obd_ioobj *ioo;
                         int objcount, niocount;
-                        int swab, i;
+                        int i;
+
+                        /* RPCs on the H-P queue can be inspected before
+                         * ost_handler() initializes their pills, so we
+                         * initialize that here.  Capsule initialization is
+                         * idempotent, as is setting the pill's format (provided
+                         * it doesn't change).
+                         */
+                        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+                        req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
 
 
-                        body = lustre_swab_reqbuf(req, REQ_REC_OFF,
-                                                  sizeof(*body),
-                                                  lustre_swab_obdo);
-                        if (!body) {
+                        body = req_capsule_client_get(&req->rq_pill,
+                                                      &RMF_OST_BODY);
+                        if (body == NULL) {
                                 CERROR("Missing/short ost_body\n");
                                 RETURN(-EFAULT);
                         }
                                 CERROR("Missing/short ost_body\n");
                                 RETURN(-EFAULT);
                         }
-                        objcount = lustre_msg_buflen(req->rq_reqmsg,
-                                                     REQ_REC_OFF + 1) /
-                                sizeof(*ioo);
+                        objcount = req_capsule_get_size(&req->rq_pill,
+                                                        &RMF_OBD_IOOBJ,
+                                                        RCL_CLIENT) /
+                                                        sizeof(*ioo);
                         if (objcount == 0) {
                                 CERROR("Missing/short ioobj\n");
                                 RETURN(-EFAULT);
                         if (objcount == 0) {
                                 CERROR("Missing/short ioobj\n");
                                 RETURN(-EFAULT);
@@ -1832,18 +1922,14 @@ static int ost_hpreq_handler(struct ptlrpc_request *req)
                                 RETURN(-EFAULT);
                         }
 
                                 RETURN(-EFAULT);
                         }
 
-                        ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
-                                                 objcount * sizeof(*ioo),
-                                                 lustre_swab_obd_ioobj);
-                        if (!ioo) {
+                        ioo = req_capsule_client_get(&req->rq_pill,
+                                                     &RMF_OBD_IOOBJ);
+                        if (ioo == NULL) {
                                 CERROR("Missing/short ioobj\n");
                                 RETURN(-EFAULT);
                         }
 
                                 CERROR("Missing/short ioobj\n");
                                 RETURN(-EFAULT);
                         }
 
-                        swab = ptlrpc_req_need_swab(req);
                         for (niocount = i = 0; i < objcount; i++) {
                         for (niocount = i = 0; i < objcount; i++) {
-                                if (i > 0 && swab)
-                                        lustre_swab_obd_ioobj(&ioo[i]);
                                 if (ioo[i].ioo_bufcnt == 0) {
                                         CERROR("ioo[%d] has zero bufcnt\n", i);
                                         RETURN(-EFAULT);
                                 if (ioo[i].ioo_bufcnt == 0) {
                                         CERROR("ioo[%d] has zero bufcnt\n", i);
                                         RETURN(-EFAULT);
@@ -1851,33 +1937,28 @@ static int ost_hpreq_handler(struct ptlrpc_request *req)
                                 niocount += ioo[i].ioo_bufcnt;
                         }
                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
                                 niocount += ioo[i].ioo_bufcnt;
                         }
                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
-                                DEBUG_REQ(D_ERROR, req, "bulk has too many "
-                                          "pages (%d)", niocount);
+                                DEBUG_REQ(D_RPCTRACE, req,
+                                          "bulk has too many pages (%d)",
+                                          niocount);
                                 RETURN(-EFAULT);
                         }
 
                                 RETURN(-EFAULT);
                         }
 
-                        nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
-                                                niocount * sizeof(*nb),
-                                                lustre_swab_niobuf_remote);
-                        if (!nb) {
+                        nb = req_capsule_client_get(&req->rq_pill,
+                                                    &RMF_NIOBUF_REMOTE);
+                        if (nb == NULL) {
                                 CERROR("Missing/short niobuf\n");
                                 RETURN(-EFAULT);
                         }
 
                                 CERROR("Missing/short niobuf\n");
                                 RETURN(-EFAULT);
                         }
 
-                        swab = ptlrpc_req_need_swab(req);
-                        if (swab) {
-                                /* swab remaining niobufs */
-                                for (i = 1; i < niocount; i++)
-                                        lustre_swab_niobuf_remote(&nb[i]);
-                        }
-
                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
                                 req->rq_ops = &ost_hpreq_rw;
                 } else if (opc == OST_PUNCH) {
                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
                                 req->rq_ops = &ost_hpreq_rw;
                 } else if (opc == OST_PUNCH) {
-                        body = lustre_swab_reqbuf(req, REQ_REC_OFF,
-                                                  sizeof(*body),
-                                                  lustre_swab_obdo);
-                        if (!body) {
+                        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+                        req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
+
+                        body = req_capsule_client_get(&req->rq_pill,
+                                                      &RMF_OST_BODY);
+                        if (body == NULL) {
                                 CERROR("Missing/short ost_body\n");
                                 RETURN(-EFAULT);
                         }
                                 CERROR("Missing/short ost_body\n");
                                 RETURN(-EFAULT);
                         }
@@ -1973,6 +2054,7 @@ int ost_handle(struct ptlrpc_request *req)
                 break;
         case OST_CREATE:
                 CDEBUG(D_INODE, "create\n");
                 break;
         case OST_CREATE:
                 CDEBUG(D_INODE, "create\n");
+                req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
                         RETURN(0);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
                         RETURN(0);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
@@ -1983,6 +2065,7 @@ int ost_handle(struct ptlrpc_request *req)
                 break;
         case OST_DESTROY:
                 CDEBUG(D_INODE, "destroy\n");
                 break;
         case OST_DESTROY:
                 CDEBUG(D_INODE, "destroy\n");
+                req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
                         RETURN(0);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
                         RETURN(0);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
@@ -1991,17 +2074,20 @@ int ost_handle(struct ptlrpc_request *req)
                 break;
         case OST_GETATTR:
                 CDEBUG(D_INODE, "getattr\n");
                 break;
         case OST_GETATTR:
                 CDEBUG(D_INODE, "getattr\n");
+                req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
                         RETURN(0);
                 rc = ost_getattr(req->rq_export, req);
                 break;
         case OST_SETATTR:
                 CDEBUG(D_INODE, "setattr\n");
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
                         RETURN(0);
                 rc = ost_getattr(req->rq_export, req);
                 break;
         case OST_SETATTR:
                 CDEBUG(D_INODE, "setattr\n");
+                req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
                         RETURN(0);
                 rc = ost_setattr(req->rq_export, req, oti);
                 break;
         case OST_WRITE:
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
                         RETURN(0);
                 rc = ost_setattr(req->rq_export, req, oti);
                 break;
         case OST_WRITE:
+                req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
                 CDEBUG(D_INODE, "write\n");
                 /* req->rq_request_portal would be nice, if it was set */
                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
                 CDEBUG(D_INODE, "write\n");
                 /* req->rq_request_portal would be nice, if it was set */
                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
@@ -2022,6 +2108,7 @@ int ost_handle(struct ptlrpc_request *req)
                 /* ost_brw_write sends its own replies */
                 RETURN(rc);
         case OST_READ:
                 /* ost_brw_write sends its own replies */
                 RETURN(rc);
         case OST_READ:
+                req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
                 CDEBUG(D_INODE, "read\n");
                 /* req->rq_request_portal would be nice, if it was set */
                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
                 CDEBUG(D_INODE, "read\n");
                 /* req->rq_request_portal would be nice, if it was set */
                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
@@ -2039,6 +2126,7 @@ int ost_handle(struct ptlrpc_request *req)
                 RETURN(rc);
         case OST_PUNCH:
                 CDEBUG(D_INODE, "punch\n");
                 RETURN(rc);
         case OST_PUNCH:
                 CDEBUG(D_INODE, "punch\n");
+                req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
                         RETURN(0);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
                         RETURN(0);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
@@ -2047,12 +2135,14 @@ int ost_handle(struct ptlrpc_request *req)
                 break;
         case OST_STATFS:
                 CDEBUG(D_INODE, "statfs\n");
                 break;
         case OST_STATFS:
                 CDEBUG(D_INODE, "statfs\n");
+                req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
                         RETURN(0);
                 rc = ost_statfs(req);
                 break;
         case OST_SYNC:
                 CDEBUG(D_INODE, "sync\n");
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
                         RETURN(0);
                 rc = ost_statfs(req);
                 break;
         case OST_SYNC:
                 CDEBUG(D_INODE, "sync\n");
+                req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
                         RETURN(0);
                 rc = ost_sync(req->rq_export, req);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
                         RETURN(0);
                 rc = ost_sync(req->rq_export, req);
@@ -2064,6 +2154,7 @@ int ost_handle(struct ptlrpc_request *req)
                 break;
         case OST_GET_INFO:
                 DEBUG_REQ(D_INODE, req, "get_info");
                 break;
         case OST_GET_INFO:
                 DEBUG_REQ(D_INODE, req, "get_info");
+                req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
                 rc = ost_get_info(req->rq_export, req);
                 break;
 #ifdef HAVE_QUOTA_SUPPORT
                 rc = ost_get_info(req->rq_export, req);
                 break;
 #ifdef HAVE_QUOTA_SUPPORT
index cdc9ce6..4cddc07 100644 (file)
  *
  * Author: Nikita Danilov <nikita@clusterfs.com>
  */
  *
  * Author: Nikita Danilov <nikita@clusterfs.com>
  */
+/*
+ * This file contains the "capsule/pill" abstraction layered above PTLRPC.
+ *
+ * Every struct ptlrpc_request contains a "pill", which points to a description
+ * of the format that the request conforms to.
+ */
 
 #if !defined(__REQ_LAYOUT_USER__)
 
 
 #if !defined(__REQ_LAYOUT_USER__)
 
 /* struct ptlrpc_request, lustre_msg* */
 #include <lustre_req_layout.h>
 #include <lustre_acl.h>
 /* struct ptlrpc_request, lustre_msg* */
 #include <lustre_req_layout.h>
 #include <lustre_acl.h>
+#include <lustre_debug.h>
 
 /*
 
 /*
- * empty set of fields... for suitable definition of emptiness.
+ * RQFs (see below) refer to two struct req_msg_field arrays describing the
+ * client request and server reply, respectively.
  */
  */
+/* empty set of fields... for suitable definition of emptiness. */
 static const struct req_msg_field *empty[] = {
         &RMF_PTLRPC_BODY
 };
 static const struct req_msg_field *empty[] = {
         &RMF_PTLRPC_BODY
 };
@@ -286,6 +295,12 @@ static const struct req_msg_field *obd_set_info_client[] = {
         &RMF_SETINFO_VAL
 };
 
         &RMF_SETINFO_VAL
 };
 
+static const struct req_msg_field *ost_grant_shrink_client[] = {
+        &RMF_PTLRPC_BODY,
+        &RMF_SETINFO_KEY,
+        &RMF_OST_BODY
+};
+
 static const struct req_msg_field *mds_getinfo_client[] = {
         &RMF_PTLRPC_BODY,
         &RMF_GETINFO_KEY,
 static const struct req_msg_field *mds_getinfo_client[] = {
         &RMF_PTLRPC_BODY,
         &RMF_GETINFO_KEY,
@@ -497,7 +512,7 @@ static const struct req_msg_field *ost_brw_client[] = {
 static const struct req_msg_field *ost_brw_server[] = {
         &RMF_PTLRPC_BODY,
         &RMF_OST_BODY,
 static const struct req_msg_field *ost_brw_server[] = {
         &RMF_PTLRPC_BODY,
         &RMF_OST_BODY,
-        &RMF_NIOBUF_REMOTE
+        &RMF_RCS
 };
 
 static const struct req_msg_field *ost_get_info_generic_server[] = {
 };
 
 static const struct req_msg_field *ost_get_info_generic_server[] = {
@@ -609,16 +624,31 @@ struct req_msg_field {
         __u32       rmf_flags;
         const char *rmf_name;
         /**
         __u32       rmf_flags;
         const char *rmf_name;
         /**
-         * Field length. (-1) means "variable length".
+         * Field length. (-1) means "variable length".  If the
+         * \a RMF_F_STRUCT_ARRAY flag is set the field is also variable-length,
+         * but the actual size must be a whole multiple of \a rmf_size.
          */
         int         rmf_size;
         void      (*rmf_swabber)(void *);
          */
         int         rmf_size;
         void      (*rmf_swabber)(void *);
+        void      (*rmf_dumper)(void *);
         int         rmf_offset[ARRAY_SIZE(req_formats)][RCL_NR];
 };
 
 enum rmf_flags {
         int         rmf_offset[ARRAY_SIZE(req_formats)][RCL_NR];
 };
 
 enum rmf_flags {
+        /**
+         * The field is a string, must be NUL-terminated.
+         */
         RMF_F_STRING = 1 << 0,
         RMF_F_STRING = 1 << 0,
-        RMF_F_NO_SIZE_CHECK = 1 << 1
+        /**
+         * The field's buffer size need not match the declared \a rmf_size.
+         */
+        RMF_F_NO_SIZE_CHECK = 1 << 1,
+        /**
+         * The field's buffer size must be a whole multiple of the declared \a
+         * rmf_size and the \a rmf_swabber function must work on the declared \a
+         * rmf_size worth of bytes.
+         */
+        RMF_F_STRUCT_ARRAY = 1 << 2
 };
 
 struct req_capsule;
 };
 
 struct req_capsule;
@@ -626,208 +656,219 @@ struct req_capsule;
 /*
  * Request fields.
  */
 /*
  * Request fields.
  */
-#define DEFINE_MSGF(name, flags, size, swabber) {       \
-        .rmf_name    = (name),                          \
-        .rmf_flags   = (flags),                         \
-        .rmf_size    = (size),                          \
-        .rmf_swabber = (void (*)(void*))(swabber)       \
+#define DEFINE_MSGF(name, flags, size, swabber, dumper) {       \
+        .rmf_name    = (name),                                  \
+        .rmf_flags   = (flags),                                 \
+        .rmf_size    = (size),                                  \
+        .rmf_swabber = (void (*)(void*))(swabber),              \
+        .rmf_dumper  = (void (*)(void*))(dumper)                \
 }
 
 const struct req_msg_field RMF_GENERIC_DATA =
         DEFINE_MSGF("generic_data", 0,
 }
 
 const struct req_msg_field RMF_GENERIC_DATA =
         DEFINE_MSGF("generic_data", 0,
-                    -1, NULL);
+                    -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_GENERIC_DATA);
 
 const struct req_msg_field RMF_MGS_TARGET_INFO =
         DEFINE_MSGF("mgs_target_info", 0,
                     sizeof(struct mgs_target_info),
 EXPORT_SYMBOL(RMF_GENERIC_DATA);
 
 const struct req_msg_field RMF_MGS_TARGET_INFO =
         DEFINE_MSGF("mgs_target_info", 0,
                     sizeof(struct mgs_target_info),
-                    lustre_swab_mgs_target_info);
+                    lustre_swab_mgs_target_info, NULL);
 EXPORT_SYMBOL(RMF_MGS_TARGET_INFO);
 
 const struct req_msg_field RMF_MGS_SEND_PARAM =
         DEFINE_MSGF("mgs_send_param", 0,
                     sizeof(struct mgs_send_param),
 EXPORT_SYMBOL(RMF_MGS_TARGET_INFO);
 
 const struct req_msg_field RMF_MGS_SEND_PARAM =
         DEFINE_MSGF("mgs_send_param", 0,
                     sizeof(struct mgs_send_param),
-                    NULL);
+                    NULL, NULL);
 EXPORT_SYMBOL(RMF_MGS_SEND_PARAM);
 
 const struct req_msg_field RMF_SETINFO_VAL =
 EXPORT_SYMBOL(RMF_MGS_SEND_PARAM);
 
 const struct req_msg_field RMF_SETINFO_VAL =
-        DEFINE_MSGF("setinfo_val", 0, -1, NULL);
+        DEFINE_MSGF("setinfo_val", 0, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_SETINFO_VAL);
 
 const struct req_msg_field RMF_GETINFO_KEY =
 EXPORT_SYMBOL(RMF_SETINFO_VAL);
 
 const struct req_msg_field RMF_GETINFO_KEY =
-        DEFINE_MSGF("getinfo_key", 0, -1, NULL);
+        DEFINE_MSGF("getinfo_key", 0, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_GETINFO_KEY);
 
 const struct req_msg_field RMF_GETINFO_VALLEN =
         DEFINE_MSGF("getinfo_vallen", 0,
 EXPORT_SYMBOL(RMF_GETINFO_KEY);
 
 const struct req_msg_field RMF_GETINFO_VALLEN =
         DEFINE_MSGF("getinfo_vallen", 0,
-                    sizeof(__u32), lustre_swab_generic_32s);
+                    sizeof(__u32), lustre_swab_generic_32s, NULL);
 EXPORT_SYMBOL(RMF_GETINFO_VALLEN);
 
 const struct req_msg_field RMF_GETINFO_VAL =
 EXPORT_SYMBOL(RMF_GETINFO_VALLEN);
 
 const struct req_msg_field RMF_GETINFO_VAL =
-        DEFINE_MSGF("getinfo_val", 0, -1, NULL);
+        DEFINE_MSGF("getinfo_val", 0, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_GETINFO_VAL);
 
 const struct req_msg_field RMF_SEQ_OPC =
         DEFINE_MSGF("seq_query_opc", 0,
 EXPORT_SYMBOL(RMF_GETINFO_VAL);
 
 const struct req_msg_field RMF_SEQ_OPC =
         DEFINE_MSGF("seq_query_opc", 0,
-                    sizeof(__u32), lustre_swab_generic_32s);
+                    sizeof(__u32), lustre_swab_generic_32s, NULL);
 EXPORT_SYMBOL(RMF_SEQ_OPC);
 
 const struct req_msg_field RMF_SEQ_RANGE =
         DEFINE_MSGF("seq_query_range", 0,
 EXPORT_SYMBOL(RMF_SEQ_OPC);
 
 const struct req_msg_field RMF_SEQ_RANGE =
         DEFINE_MSGF("seq_query_range", 0,
-                    sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
+                    sizeof(struct lu_seq_range),
+                    lustre_swab_lu_seq_range, NULL);
 EXPORT_SYMBOL(RMF_SEQ_RANGE);
 
 const struct req_msg_field RMF_FLD_OPC =
         DEFINE_MSGF("fld_query_opc", 0,
 EXPORT_SYMBOL(RMF_SEQ_RANGE);
 
 const struct req_msg_field RMF_FLD_OPC =
         DEFINE_MSGF("fld_query_opc", 0,
-                    sizeof(__u32), lustre_swab_generic_32s);
+                    sizeof(__u32), lustre_swab_generic_32s, NULL);
 EXPORT_SYMBOL(RMF_FLD_OPC);
 
 const struct req_msg_field RMF_FLD_MDFLD =
         DEFINE_MSGF("fld_query_mdfld", 0,
 EXPORT_SYMBOL(RMF_FLD_OPC);
 
 const struct req_msg_field RMF_FLD_MDFLD =
         DEFINE_MSGF("fld_query_mdfld", 0,
-                    sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
+                    sizeof(struct lu_seq_range),
+                    lustre_swab_lu_seq_range, NULL);
 EXPORT_SYMBOL(RMF_FLD_MDFLD);
 
 const struct req_msg_field RMF_MDT_BODY =
         DEFINE_MSGF("mdt_body", 0,
 EXPORT_SYMBOL(RMF_FLD_MDFLD);
 
 const struct req_msg_field RMF_MDT_BODY =
         DEFINE_MSGF("mdt_body", 0,
-                    sizeof(struct mdt_body), lustre_swab_mdt_body);
+                    sizeof(struct mdt_body), lustre_swab_mdt_body, NULL);
 EXPORT_SYMBOL(RMF_MDT_BODY);
 
 const struct req_msg_field RMF_OBD_QUOTACTL =
         DEFINE_MSGF("obd_quotactl", 0,
 EXPORT_SYMBOL(RMF_MDT_BODY);
 
 const struct req_msg_field RMF_OBD_QUOTACTL =
         DEFINE_MSGF("obd_quotactl", 0,
-                    sizeof(struct obd_quotactl), lustre_swab_obd_quotactl);
+                    sizeof(struct obd_quotactl),
+                    lustre_swab_obd_quotactl, NULL);
 EXPORT_SYMBOL(RMF_OBD_QUOTACTL);
 
 const struct req_msg_field RMF_QUOTA_ADJUST_QUNIT =
         DEFINE_MSGF("quota_adjust_qunit", 0,
                     sizeof(struct quota_adjust_qunit),
 EXPORT_SYMBOL(RMF_OBD_QUOTACTL);
 
 const struct req_msg_field RMF_QUOTA_ADJUST_QUNIT =
         DEFINE_MSGF("quota_adjust_qunit", 0,
                     sizeof(struct quota_adjust_qunit),
-                    lustre_swab_quota_adjust_qunit);
+                    lustre_swab_quota_adjust_qunit, NULL);
 EXPORT_SYMBOL(RMF_QUOTA_ADJUST_QUNIT);
 
 const struct req_msg_field RMF_QUNIT_DATA =
         DEFINE_MSGF("qunit_data", 0,
 EXPORT_SYMBOL(RMF_QUOTA_ADJUST_QUNIT);
 
 const struct req_msg_field RMF_QUNIT_DATA =
         DEFINE_MSGF("qunit_data", 0,
-                    sizeof(struct qunit_data), NULL);
+                    sizeof(struct qunit_data), lustre_swab_qdata, NULL);
 EXPORT_SYMBOL(RMF_QUNIT_DATA);
 
 const struct req_msg_field RMF_MDT_EPOCH =
         DEFINE_MSGF("mdt_epoch", 0,
 EXPORT_SYMBOL(RMF_QUNIT_DATA);
 
 const struct req_msg_field RMF_MDT_EPOCH =
         DEFINE_MSGF("mdt_epoch", 0,
-                    sizeof(struct mdt_epoch), lustre_swab_mdt_epoch);
+                    sizeof(struct mdt_epoch), lustre_swab_mdt_epoch, NULL);
 EXPORT_SYMBOL(RMF_MDT_EPOCH);
 
 const struct req_msg_field RMF_PTLRPC_BODY =
         DEFINE_MSGF("ptlrpc_body", 0,
 EXPORT_SYMBOL(RMF_MDT_EPOCH);
 
 const struct req_msg_field RMF_PTLRPC_BODY =
         DEFINE_MSGF("ptlrpc_body", 0,
-                    sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body);
+                    sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body, NULL);
 EXPORT_SYMBOL(RMF_PTLRPC_BODY);
 
 const struct req_msg_field RMF_OBD_STATFS =
         DEFINE_MSGF("obd_statfs", 0,
 EXPORT_SYMBOL(RMF_PTLRPC_BODY);
 
 const struct req_msg_field RMF_OBD_STATFS =
         DEFINE_MSGF("obd_statfs", 0,
-                    sizeof(struct obd_statfs), lustre_swab_obd_statfs);
+                    sizeof(struct obd_statfs), lustre_swab_obd_statfs, NULL);
 EXPORT_SYMBOL(RMF_OBD_STATFS);
 
 const struct req_msg_field RMF_SETINFO_KEY =
 EXPORT_SYMBOL(RMF_OBD_STATFS);
 
 const struct req_msg_field RMF_SETINFO_KEY =
-        DEFINE_MSGF("setinfo_key", 0, -1, NULL);
+        DEFINE_MSGF("setinfo_key", 0, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_SETINFO_KEY);
 
 const struct req_msg_field RMF_NAME =
 EXPORT_SYMBOL(RMF_SETINFO_KEY);
 
 const struct req_msg_field RMF_NAME =
-        DEFINE_MSGF("name", RMF_F_STRING, -1, NULL);
+        DEFINE_MSGF("name", RMF_F_STRING, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_NAME);
 
 const struct req_msg_field RMF_SYMTGT =
 EXPORT_SYMBOL(RMF_NAME);
 
 const struct req_msg_field RMF_SYMTGT =
-        DEFINE_MSGF("symtgt", RMF_F_STRING, -1, NULL);
+        DEFINE_MSGF("symtgt", RMF_F_STRING, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_SYMTGT);
 
 const struct req_msg_field RMF_TGTUUID =
 EXPORT_SYMBOL(RMF_SYMTGT);
 
 const struct req_msg_field RMF_TGTUUID =
-        DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
+        DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL,
+        NULL);
 EXPORT_SYMBOL(RMF_TGTUUID);
 
 const struct req_msg_field RMF_CLUUID =
 EXPORT_SYMBOL(RMF_TGTUUID);
 
 const struct req_msg_field RMF_CLUUID =
-        DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
+        DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL,
+        NULL);
 EXPORT_SYMBOL(RMF_CLUUID);
 
 const struct req_msg_field RMF_STRING =
 EXPORT_SYMBOL(RMF_CLUUID);
 
 const struct req_msg_field RMF_STRING =
-        DEFINE_MSGF("string", RMF_F_STRING, -1, NULL);
+        DEFINE_MSGF("string", RMF_F_STRING, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_STRING);
 
 const struct req_msg_field RMF_LLOGD_BODY =
         DEFINE_MSGF("llogd_body", 0,
 EXPORT_SYMBOL(RMF_STRING);
 
 const struct req_msg_field RMF_LLOGD_BODY =
         DEFINE_MSGF("llogd_body", 0,
-                    sizeof(struct llogd_body), lustre_swab_llogd_body);
+                    sizeof(struct llogd_body), lustre_swab_llogd_body, NULL);
 EXPORT_SYMBOL(RMF_LLOGD_BODY);
 
 const struct req_msg_field RMF_LLOG_LOG_HDR =
         DEFINE_MSGF("llog_log_hdr", 0,
 EXPORT_SYMBOL(RMF_LLOGD_BODY);
 
 const struct req_msg_field RMF_LLOG_LOG_HDR =
         DEFINE_MSGF("llog_log_hdr", 0,
-                    sizeof(struct llog_log_hdr), lustre_swab_llog_hdr);
+                    sizeof(struct llog_log_hdr), lustre_swab_llog_hdr, NULL);
 EXPORT_SYMBOL(RMF_LLOG_LOG_HDR);
 
 const struct req_msg_field RMF_LLOGD_CONN_BODY =
         DEFINE_MSGF("llogd_conn_body", 0,
                     sizeof(struct llogd_conn_body),
 EXPORT_SYMBOL(RMF_LLOG_LOG_HDR);
 
 const struct req_msg_field RMF_LLOGD_CONN_BODY =
         DEFINE_MSGF("llogd_conn_body", 0,
                     sizeof(struct llogd_conn_body),
-                    lustre_swab_llogd_conn_body);
+                    lustre_swab_llogd_conn_body, NULL);
 EXPORT_SYMBOL(RMF_LLOGD_CONN_BODY);
 
 /*
  * connection handle received in MDS_CONNECT request.
  *
 EXPORT_SYMBOL(RMF_LLOGD_CONN_BODY);
 
 /*
  * connection handle received in MDS_CONNECT request.
  *
- * XXX no swabbing?
+ * No swabbing needed because struct lustre_handle contains only a 64-bit cookie
+ * that the client does not interpret at all.
  */
 const struct req_msg_field RMF_CONN =
  */
 const struct req_msg_field RMF_CONN =
-        DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL);
+        DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL, NULL);
 EXPORT_SYMBOL(RMF_CONN);
 
 const struct req_msg_field RMF_CONNECT_DATA =
         DEFINE_MSGF("cdata",
                     RMF_F_NO_SIZE_CHECK /* we allow extra space for interop */,
 EXPORT_SYMBOL(RMF_CONN);
 
 const struct req_msg_field RMF_CONNECT_DATA =
         DEFINE_MSGF("cdata",
                     RMF_F_NO_SIZE_CHECK /* we allow extra space for interop */,
-                    sizeof(struct obd_connect_data), lustre_swab_connect);
+                    sizeof(struct obd_connect_data), lustre_swab_connect, NULL);
 EXPORT_SYMBOL(RMF_CONNECT_DATA);
 
 const struct req_msg_field RMF_DLM_REQ =
         DEFINE_MSGF("dlm_req", RMF_F_NO_SIZE_CHECK /* ldlm_request_bufsize */,
 EXPORT_SYMBOL(RMF_CONNECT_DATA);
 
 const struct req_msg_field RMF_DLM_REQ =
         DEFINE_MSGF("dlm_req", RMF_F_NO_SIZE_CHECK /* ldlm_request_bufsize */,
-                    sizeof(struct ldlm_request), lustre_swab_ldlm_request);
+                    sizeof(struct ldlm_request),
+                    lustre_swab_ldlm_request, NULL);
 EXPORT_SYMBOL(RMF_DLM_REQ);
 
 const struct req_msg_field RMF_DLM_REP =
         DEFINE_MSGF("dlm_rep", 0,
 EXPORT_SYMBOL(RMF_DLM_REQ);
 
 const struct req_msg_field RMF_DLM_REP =
         DEFINE_MSGF("dlm_rep", 0,
-                    sizeof(struct ldlm_reply), lustre_swab_ldlm_reply);
+                    sizeof(struct ldlm_reply), lustre_swab_ldlm_reply, NULL);
 EXPORT_SYMBOL(RMF_DLM_REP);
 
 const struct req_msg_field RMF_LDLM_INTENT =
         DEFINE_MSGF("ldlm_intent", 0,
 EXPORT_SYMBOL(RMF_DLM_REP);
 
 const struct req_msg_field RMF_LDLM_INTENT =
         DEFINE_MSGF("ldlm_intent", 0,
-                    sizeof(struct ldlm_intent), lustre_swab_ldlm_intent);
+                    sizeof(struct ldlm_intent), lustre_swab_ldlm_intent, NULL);
 EXPORT_SYMBOL(RMF_LDLM_INTENT);
 
 const struct req_msg_field RMF_DLM_LVB =
 EXPORT_SYMBOL(RMF_LDLM_INTENT);
 
 const struct req_msg_field RMF_DLM_LVB =
-        DEFINE_MSGF("dlm_lvb", 0, sizeof(struct ost_lvb), NULL);
+        DEFINE_MSGF("dlm_lvb", 0, sizeof(struct ost_lvb), lustre_swab_ost_lvb,
+        NULL);
 EXPORT_SYMBOL(RMF_DLM_LVB);
 
 const struct req_msg_field RMF_MDT_MD =
 EXPORT_SYMBOL(RMF_DLM_LVB);
 
 const struct req_msg_field RMF_MDT_MD =
-        DEFINE_MSGF("mdt_md", RMF_F_NO_SIZE_CHECK, MIN_MD_SIZE, NULL);
+        DEFINE_MSGF("mdt_md", RMF_F_NO_SIZE_CHECK, MIN_MD_SIZE, NULL, NULL);
 EXPORT_SYMBOL(RMF_MDT_MD);
 
 const struct req_msg_field RMF_REC_REINT =
         DEFINE_MSGF("rec_reint", 0, sizeof(struct mdt_rec_reint),
 EXPORT_SYMBOL(RMF_MDT_MD);
 
 const struct req_msg_field RMF_REC_REINT =
         DEFINE_MSGF("rec_reint", 0, sizeof(struct mdt_rec_reint),
-                    lustre_swab_mdt_rec_reint);
+                    lustre_swab_mdt_rec_reint, NULL);
 EXPORT_SYMBOL(RMF_REC_REINT);
 
 /* FIXME: this length should be defined as a macro */
 EXPORT_SYMBOL(RMF_REC_REINT);
 
 /* FIXME: this length should be defined as a macro */
-const struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, -1, NULL);
+const struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, -1,
+                                                    NULL, NULL);
 EXPORT_SYMBOL(RMF_EADATA);
 
 const struct req_msg_field RMF_ACL =
         DEFINE_MSGF("acl", RMF_F_NO_SIZE_CHECK,
 EXPORT_SYMBOL(RMF_EADATA);
 
 const struct req_msg_field RMF_ACL =
         DEFINE_MSGF("acl", RMF_F_NO_SIZE_CHECK,
-                    LUSTRE_POSIX_ACL_MAX_SIZE, NULL);
+                    LUSTRE_POSIX_ACL_MAX_SIZE, NULL, NULL);
 EXPORT_SYMBOL(RMF_ACL);
 
 EXPORT_SYMBOL(RMF_ACL);
 
+/* FIXME: this should be made to use RMF_F_STRUCT_ARRAY */
 const struct req_msg_field RMF_LOGCOOKIES =
         DEFINE_MSGF("logcookies", RMF_F_NO_SIZE_CHECK /* multiple cookies */,
 const struct req_msg_field RMF_LOGCOOKIES =
         DEFINE_MSGF("logcookies", RMF_F_NO_SIZE_CHECK /* multiple cookies */,
-                    sizeof(struct llog_cookie), NULL);
+                    sizeof(struct llog_cookie), NULL, NULL);
 EXPORT_SYMBOL(RMF_LOGCOOKIES);
 
 const struct req_msg_field RMF_CAPA1 =
         DEFINE_MSGF("capa", 0, sizeof(struct lustre_capa),
 EXPORT_SYMBOL(RMF_LOGCOOKIES);
 
 const struct req_msg_field RMF_CAPA1 =
         DEFINE_MSGF("capa", 0, sizeof(struct lustre_capa),
-                    lustre_swab_lustre_capa);
+                    lustre_swab_lustre_capa, NULL);
 EXPORT_SYMBOL(RMF_CAPA1);
 
 const struct req_msg_field RMF_CAPA2 =
         DEFINE_MSGF("capa", 0, sizeof(struct lustre_capa),
 EXPORT_SYMBOL(RMF_CAPA1);
 
 const struct req_msg_field RMF_CAPA2 =
         DEFINE_MSGF("capa", 0, sizeof(struct lustre_capa),
-                    lustre_swab_lustre_capa);
+                    lustre_swab_lustre_capa, NULL);
 EXPORT_SYMBOL(RMF_CAPA2);
 
 /*
 EXPORT_SYMBOL(RMF_CAPA2);
 
 /*
@@ -835,30 +876,37 @@ EXPORT_SYMBOL(RMF_CAPA2);
  */
 const struct req_msg_field RMF_OST_BODY =
         DEFINE_MSGF("ost_body", 0,
  */
 const struct req_msg_field RMF_OST_BODY =
         DEFINE_MSGF("ost_body", 0,
-                    sizeof(struct ost_body), lustre_swab_ost_body);
+                    sizeof(struct ost_body), lustre_swab_ost_body, dump_ost_body);
 EXPORT_SYMBOL(RMF_OST_BODY);
 
 const struct req_msg_field RMF_OBD_IOOBJ =
 EXPORT_SYMBOL(RMF_OST_BODY);
 
 const struct req_msg_field RMF_OBD_IOOBJ =
-        DEFINE_MSGF("obd_ioobj", 0,
-                    sizeof(struct obd_ioobj), lustre_swab_obd_ioobj);
+        DEFINE_MSGF("obd_ioobj", RMF_F_STRUCT_ARRAY,
+                    sizeof(struct obd_ioobj), lustre_swab_obd_ioobj, dump_ioo);
 EXPORT_SYMBOL(RMF_OBD_IOOBJ);
 
 const struct req_msg_field RMF_NIOBUF_REMOTE =
 EXPORT_SYMBOL(RMF_OBD_IOOBJ);
 
 const struct req_msg_field RMF_NIOBUF_REMOTE =
-        DEFINE_MSGF("niobuf_remote", 0, -1, lustre_swab_niobuf_remote);
+        DEFINE_MSGF("niobuf_remote", RMF_F_STRUCT_ARRAY,
+                    sizeof(struct niobuf_remote), lustre_swab_niobuf_remote,
+                    dump_rniobuf);
 EXPORT_SYMBOL(RMF_NIOBUF_REMOTE);
 
 EXPORT_SYMBOL(RMF_NIOBUF_REMOTE);
 
+const struct req_msg_field RMF_RCS =
+        DEFINE_MSGF("niobuf_remote", RMF_F_STRUCT_ARRAY, sizeof(__u32),
+                    lustre_swab_generic_32s, dump_rcs);
+EXPORT_SYMBOL(RMF_RCS);
+
 const struct req_msg_field RMF_OBD_ID =
         DEFINE_MSGF("obd_id", 0,
 const struct req_msg_field RMF_OBD_ID =
         DEFINE_MSGF("obd_id", 0,
-                    sizeof(obd_id), lustre_swab_ost_last_id);
+                    sizeof(obd_id), lustre_swab_ost_last_id, NULL);
 EXPORT_SYMBOL(RMF_OBD_ID);
 
 const struct req_msg_field RMF_FIEMAP_KEY =
         DEFINE_MSGF("fiemap", 0, sizeof(struct ll_fiemap_info_key),
 EXPORT_SYMBOL(RMF_OBD_ID);
 
 const struct req_msg_field RMF_FIEMAP_KEY =
         DEFINE_MSGF("fiemap", 0, sizeof(struct ll_fiemap_info_key),
-                    lustre_swab_fiemap);
+                    lustre_swab_fiemap, NULL);
 EXPORT_SYMBOL(RMF_FIEMAP_KEY);
 
 const struct req_msg_field RMF_FIEMAP_VAL =
 EXPORT_SYMBOL(RMF_FIEMAP_KEY);
 
 const struct req_msg_field RMF_FIEMAP_VAL =
-        DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap);
+        DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap, NULL);
 EXPORT_SYMBOL(RMF_FIEMAP_VAL);
 
 /*
 EXPORT_SYMBOL(RMF_FIEMAP_VAL);
 
 /*
@@ -1220,7 +1268,7 @@ const struct req_format RQF_OST_STATFS =
 EXPORT_SYMBOL(RQF_OST_STATFS);
 
 const struct req_format RQF_OST_SET_GRANT_INFO =
 EXPORT_SYMBOL(RQF_OST_STATFS);
 
 const struct req_format RQF_OST_SET_GRANT_INFO =
-        DEFINE_REQ_FMT0("OST_SET_GRANT_INFO", obd_set_info_client,
+        DEFINE_REQ_FMT0("OST_SET_GRANT_INFO", ost_grant_shrink_client,
                          ost_body_only);
 EXPORT_SYMBOL(RQF_OST_SET_GRANT_INFO);
 
                          ost_body_only);
 EXPORT_SYMBOL(RQF_OST_SET_GRANT_INFO);
 
@@ -1242,6 +1290,13 @@ EXPORT_SYMBOL(RQF_OST_GET_INFO_FIEMAP);
 
 #if !defined(__REQ_LAYOUT_USER__)
 
 
 #if !defined(__REQ_LAYOUT_USER__)
 
+/* Convenience macro */
+#define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)]
+
+/**
+ * Initializes the capsule abstraction by computing and setting the \a rf_idx
+ * field of RQFs and the \a rmf_offset field of RMFs.
+ */
 int req_layout_init(void)
 {
         int i;
 int req_layout_init(void)
 {
         int i;
@@ -1258,6 +1313,8 @@ int req_layout_init(void)
                                 struct req_msg_field *field;
 
                                 field = (typeof(field))rf->rf_fields[j].d[k];
                                 struct req_msg_field *field;
 
                                 field = (typeof(field))rf->rf_fields[j].d[k];
+                                LASSERT(!(field->rmf_flags & RMF_F_STRUCT_ARRAY)
+                                        || field->rmf_size > 0);
                                 LASSERT(field->rmf_offset[i][j] == 0);
                                 /*
                                  * k + 1 to detect unused format/field
                                 LASSERT(field->rmf_offset[i][j] == 0);
                                 /*
                                  * k + 1 to detect unused format/field
@@ -1276,6 +1333,14 @@ void req_layout_fini(void)
 }
 EXPORT_SYMBOL(req_layout_fini);
 
 }
 EXPORT_SYMBOL(req_layout_fini);
 
+/**
+ * Initializes the expected sizes of each RMF in a \a pill (\a rc_area) to -1.
+ *
+ * Actual/expected field sizes are set elsewhere in functions in this file:
+ * req_capsule_init(), req_capsule_server_pack(), req_capsule_set_size() and
+ * req_capsule_msg_size().  The \a rc_area information is used by.
+ * ptlrpc_request_set_replen().
+ */
 void req_capsule_init_area(struct req_capsule *pill)
 {
         int i;
 void req_capsule_init_area(struct req_capsule *pill)
 {
         int i;
@@ -1287,11 +1352,11 @@ void req_capsule_init_area(struct req_capsule *pill)
 }
 EXPORT_SYMBOL(req_capsule_init_area);
 
 }
 EXPORT_SYMBOL(req_capsule_init_area);
 
-/*
- * Initialize capsule.
+/**
+ * Initialize a pill.
  *
  *
- * @area is an array of REQ_MAX_FIELD_NR elements, used to store sizes of
- * variable-sized fields.
+ * The \a location indicates whether the caller is executing on the client side
+ * (RCL_CLIENT) or server side (RCL_SERVER)..
  */
 void req_capsule_init(struct req_capsule *pill,
                       struct ptlrpc_request *req,
  */
 void req_capsule_init(struct req_capsule *pill,
                       struct ptlrpc_request *req,
@@ -1299,10 +1364,27 @@ void req_capsule_init(struct req_capsule *pill,
 {
         LASSERT(location == RCL_SERVER || location == RCL_CLIENT);
 
 {
         LASSERT(location == RCL_SERVER || location == RCL_CLIENT);
 
+        /*
+         * Today all capsules are embedded in ptlrpc_request structs,
+         * but just in case that ever isn't the case, we don't reach
+         * into req unless req != NULL and pill is the one embedded in
+         * the req.
+         *
+         * The req->rq_pill_init flag makes it safe to initialize a pill
+         * twice, which might happen in the OST paths as a result of the
+         * high-priority RPC queue getting peeked at before ost_handle()
+         * handles an OST RPC.
+         */
+        if (req != NULL && pill == &req->rq_pill && req->rq_pill_init)
+                return;
+
         memset(pill, 0, sizeof *pill);
         pill->rc_req = req;
         pill->rc_loc = location;
         req_capsule_init_area(pill);
         memset(pill, 0, sizeof *pill);
         pill->rc_req = req;
         pill->rc_loc = location;
         req_capsule_init_area(pill);
+
+        if (req != NULL && pill == &req->rq_pill)
+                req->rq_pill_init = 1;
 }
 EXPORT_SYMBOL(req_capsule_init);
 
 }
 EXPORT_SYMBOL(req_capsule_init);
 
@@ -1327,15 +1409,27 @@ static struct lustre_msg *__req_msg(const struct req_capsule *pill,
         return loc == RCL_CLIENT ? req->rq_reqmsg : req->rq_repmsg;
 }
 
         return loc == RCL_CLIENT ? req->rq_reqmsg : req->rq_repmsg;
 }
 
+/**
+ * Set the format (\a fmt) of a \a pill; format changes are not allowed here
+ * (see req_capsule_extend()).
+ */
 void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt)
 {
 void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt)
 {
-        LASSERT(pill->rc_fmt == NULL);
+        LASSERT(pill->rc_fmt == NULL || pill->rc_fmt == fmt);
         LASSERT(__req_format_is_sane(fmt));
 
         pill->rc_fmt = fmt;
 }
 EXPORT_SYMBOL(req_capsule_set);
 
         LASSERT(__req_format_is_sane(fmt));
 
         pill->rc_fmt = fmt;
 }
 EXPORT_SYMBOL(req_capsule_set);
 
+/**
+ * Fills in any parts of the \a rc_area of a \a pill that haven't been filled in
+ * yet.
+
+ * \a rc_area is an array of REQ_MAX_FIELD_NR elements, used to store sizes of
+ * variable-sized fields.  The field sizes come from the declared \a rmf_size
+ * field of a \a pill's \a rc_fmt's RMF's.
+ */
 int req_capsule_filled_sizes(struct req_capsule *pill,
                            enum req_location loc)
 {
 int req_capsule_filled_sizes(struct req_capsule *pill,
                            enum req_location loc)
 {
@@ -1349,7 +1443,12 @@ int req_capsule_filled_sizes(struct req_capsule *pill,
                         pill->rc_area[loc][i] =
                                             fmt->rf_fields[loc].d[i]->rmf_size;
                         if (pill->rc_area[loc][i] == -1) {
                         pill->rc_area[loc][i] =
                                             fmt->rf_fields[loc].d[i]->rmf_size;
                         if (pill->rc_area[loc][i] == -1) {
-                                /* skip the following fields */
+                                /*
+                                 * Skip the following fields.
+                                 *
+                                 * If this LASSERT() trips then you're missing a
+                                 * call to req_capsule_set_size().
+                                 */
                                 LASSERT(loc != RCL_SERVER);
                                 break;
                         }
                                 LASSERT(loc != RCL_SERVER);
                                 break;
                         }
@@ -1359,6 +1458,13 @@ int req_capsule_filled_sizes(struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_filled_sizes);
 
 }
 EXPORT_SYMBOL(req_capsule_filled_sizes);
 
+/**
+ * Capsule equivalent of lustre_pack_request() and lustre_pack_reply().
+ *
+ * This function uses the \a pill's \a rc_area as filled in by
+ * req_capsule_set_size() or req_capsule_filled_sizes() (the latter is called by
+ * this function).
+ */
 int req_capsule_server_pack(struct req_capsule *pill)
 {
         const struct req_format *fmt;
 int req_capsule_server_pack(struct req_capsule *pill)
 {
         const struct req_format *fmt;
@@ -1374,13 +1480,17 @@ int req_capsule_server_pack(struct req_capsule *pill)
                                pill->rc_area[RCL_SERVER], NULL);
         if (rc != 0) {
                 DEBUG_REQ(D_ERROR, pill->rc_req,
                                pill->rc_area[RCL_SERVER], NULL);
         if (rc != 0) {
                 DEBUG_REQ(D_ERROR, pill->rc_req,
-                          "Cannot pack %d fields in format `%s': ",
-                          count, fmt->rf_name);
+                       "Cannot pack %d fields in format `%s': ",
+                       count, fmt->rf_name);
         }
         return rc;
 }
 EXPORT_SYMBOL(req_capsule_server_pack);
 
         }
         return rc;
 }
 EXPORT_SYMBOL(req_capsule_server_pack);
 
+/**
+ * Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill
+ * corresponding to the given RMF (\a field).
+ */
 static int __req_capsule_offset(const struct req_capsule *pill,
                                 const struct req_msg_field *field,
                                 enum req_location loc)
 static int __req_capsule_offset(const struct req_capsule *pill,
                                 const struct req_msg_field *field,
                                 enum req_location loc)
@@ -1397,17 +1507,99 @@ static int __req_capsule_offset(const struct req_capsule *pill,
         return offset;
 }
 
         return offset;
 }
 
+/**
+ * Helper for __req_capsule_get(); swabs value / array of values and/or dumps
+ * them if desired.
+ */
+static
+void
+swabber_dumper_helper(struct req_capsule *pill,
+                      const struct req_msg_field *field,
+                      enum req_location loc,
+                      int offset,
+                      void *value, int len, int dump, void (*swabber)( void *))
+{
+        void    *p;
+        int     i;
+        int     n;
+        int     do_swab;
+        int     inout = loc == RCL_CLIENT;
+
+        swabber = swabber ?: field->rmf_swabber;
+
+        if (ptlrpc_buf_need_swab(pill->rc_req, inout, offset) &&
+            swabber != NULL && value != NULL)
+                do_swab = 1;
+        else
+                do_swab = 0;
+
+        if (!(field->rmf_flags & RMF_F_STRUCT_ARRAY)) {
+                if (dump && field->rmf_dumper) {
+                        CDEBUG(D_RPCTRACE, "Dump of %sfield %s follows\n",
+                               do_swab ? "unswabbed " : "", field->rmf_name);
+                        field->rmf_dumper(value);
+                }
+                if (!do_swab)
+                        return;
+                swabber(value);
+                ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset);
+                if (dump) {
+                        CDEBUG(D_RPCTRACE, "Dump of swabbed field %s "
+                               "follows\n", field->rmf_name);
+                        field->rmf_dumper(value);
+                }
+
+                return;
+        }
+
+        /*
+         * We're swabbing an array; swabber() swabs a single array element, so
+         * swab every element.
+         */
+        LASSERT((len % field->rmf_size) == 0);
+        for (p = value, i = 0, n = len / field->rmf_size;
+             i < n;
+             i++, p += field->rmf_size) {
+                if (dump && field->rmf_dumper) {
+                        CDEBUG(D_RPCTRACE, "Dump of %sarray field %s, "
+                               "element %d follows\n",
+                               do_swab ? "unswabbed " : "", field->rmf_name, i);
+                        field->rmf_dumper(p);
+                }
+                if (!do_swab)
+                        continue;
+                swabber(p);
+                if (dump && field->rmf_dumper) {
+                        CDEBUG(D_RPCTRACE, "Dump of swabbed array field %s, "
+                               "element %d follows\n", field->rmf_name, i);
+                        field->rmf_dumper(value);
+                }
+        }
+        if (do_swab)
+                ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset);
+}
+
+/**
+ * Returns the pointer to a PTLRPC request or reply (\a loc) buffer of a \a pill
+ * corresponding to the given RMF (\a field).
+ *
+ * The buffer will be swabbed using the given \a swabber.  If \a swabber == NULL
+ * then the \a rmf_swabber from the RMF will be used.  Soon there will be no
+ * calls to __req_capsule_get() with a non-NULL \a swabber; \a swabber will then
+ * be removed.  Fields with the \a RMF_F_STRUCT_ARRAY flag set will have each
+ * element of the array swabbed.
+ */
 static void *__req_capsule_get(struct req_capsule *pill,
                                const struct req_msg_field *field,
                                enum req_location loc,
 static void *__req_capsule_get(struct req_capsule *pill,
                                const struct req_msg_field *field,
                                enum req_location loc,
-                               void (*swabber)( void *))
+                               void (*swabber)( void *),
+                               int dump)
 {
         const struct req_format *fmt;
         struct lustre_msg       *msg;
         void                    *value;
         int                      len;
         int                      offset;
 {
         const struct req_format *fmt;
         struct lustre_msg       *msg;
         void                    *value;
         int                      len;
         int                      offset;
-        int                      inout = loc == RCL_CLIENT;
 
         void *(*getter)(struct lustre_msg *m, int n, int minlen);
 
 
         void *(*getter)(struct lustre_msg *m, int n, int minlen);
 
@@ -1431,86 +1623,193 @@ static void *__req_capsule_get(struct req_capsule *pill,
         getter = (field->rmf_flags & RMF_F_STRING) ?
                 (typeof(getter))lustre_msg_string : lustre_msg_buf;
 
         getter = (field->rmf_flags & RMF_F_STRING) ?
                 (typeof(getter))lustre_msg_string : lustre_msg_buf;
 
-        if (pill->rc_area[loc][offset] != -1)
+        if (field->rmf_flags & RMF_F_STRUCT_ARRAY) {
+                /*
+                 * We've already asserted that field->rmf_size > 0 in
+                 * req_layout_init().
+                 */
+                len = lustre_msg_buflen(msg, offset);
+                if ((len % field->rmf_size) != 0) {
+                        CERROR("%s: array field size mismatch "
+                               "%d modulo %d != 0 (%d)\n",
+                               field->rmf_name, len, field->rmf_size, loc);
+                        return NULL;
+                }
+        } else if (pill->rc_area[loc][offset] != -1) {
                 len = pill->rc_area[loc][offset];
                 len = pill->rc_area[loc][offset];
-        else
+        } else {
                 len = max(field->rmf_size, 0);
                 len = max(field->rmf_size, 0);
+        }
         value = getter(msg, offset, len);
 
         value = getter(msg, offset, len);
 
-        swabber = swabber ?: field->rmf_swabber;
-        if (ptlrpc_buf_need_swab(pill->rc_req, inout, offset) &&
-            swabber != NULL && value != NULL) {
-                swabber(value);
-                ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset);
-        }
         if (value == NULL) {
                 DEBUG_REQ(D_ERROR, pill->rc_req,
                           "Wrong buffer for field `%s' (%d of %d) "
                           "in format `%s': %d vs. %d (%s)\n",
         if (value == NULL) {
                 DEBUG_REQ(D_ERROR, pill->rc_req,
                           "Wrong buffer for field `%s' (%d of %d) "
                           "in format `%s': %d vs. %d (%s)\n",
-                          field->rmf_name, offset, lustre_msg_bufcount(msg), fmt->rf_name,
-                          lustre_msg_buflen(msg, offset), len,
+                          field->rmf_name, offset, lustre_msg_bufcount(msg),
+                          fmt->rf_name, lustre_msg_buflen(msg, offset), len,
                           rcl_names[loc]);
                           rcl_names[loc]);
+        } else {
+                swabber_dumper_helper(pill, field, loc, offset, value, len,
+                                      dump, swabber);
         }
 
         return value;
 }
 
         }
 
         return value;
 }
 
+/**
+ * Dump a request and/or reply
+ */
+void __req_capsule_dump(struct req_capsule *pill, enum req_location loc)
+{
+        const struct    req_format *fmt;
+        const struct    req_msg_field *field;
+        int             len;
+        int             i;
+
+        fmt = pill->rc_fmt;
+
+        DEBUG_REQ(D_RPCTRACE, pill->rc_req, "BEGIN REQ CAPSULE DUMP\n");
+        for (i = 0; i < fmt->rf_fields[loc].nr; ++i) {
+                field = FMT_FIELD(fmt, loc, i);
+                if (field->rmf_dumper == NULL) {
+                        /*
+                         * FIXME Add a default hex dumper for fields that don't
+                         * have a specific dumper
+                         */
+                        len = req_capsule_get_size(pill, field, loc);
+                        CDEBUG(D_RPCTRACE, "Field %s has no dumper function;"
+                               "field size is %d\n", field->rmf_name, len);
+                } else {
+                        /* It's the dumping side-effect that we're interested in */
+                        (void) __req_capsule_get(pill, field, loc, NULL, 1);
+                }
+        }
+        CDEBUG(D_RPCTRACE, "END REQ CAPSULE DUMP\n");
+}
+
+/**
+ * Dump a request.
+ */
+void req_capsule_client_dump(struct req_capsule *pill)
+{
+        __req_capsule_dump(pill, RCL_CLIENT);
+}
+EXPORT_SYMBOL(req_capsule_client_dump);
+
+/**
+ * Dump a reply
+ */
+void req_capsule_server_dump(struct req_capsule *pill)
+{
+        __req_capsule_dump(pill, RCL_SERVER);
+}
+EXPORT_SYMBOL(req_capsule_server_dump);
+
+/**
+ * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC request
+ * buffer corresponding to the given RMF (\a field) of a \a pill.
+ */
 void *req_capsule_client_get(struct req_capsule *pill,
                              const struct req_msg_field *field)
 {
 void *req_capsule_client_get(struct req_capsule *pill,
                              const struct req_msg_field *field)
 {
-        return __req_capsule_get(pill, field, RCL_CLIENT, NULL);
+        return __req_capsule_get(pill, field, RCL_CLIENT, NULL, 0);
 }
 EXPORT_SYMBOL(req_capsule_client_get);
 
 }
 EXPORT_SYMBOL(req_capsule_client_get);
 
+/**
+ * Same as req_capsule_client_get(), but with a \a swabber argument.
+ *
+ * Currently unused; will be removed when req_capsule_server_swab_get() is
+ * unused too.
+ */
 void *req_capsule_client_swab_get(struct req_capsule *pill,
                                   const struct req_msg_field *field,
                                   void (*swabber)(void* ))
 {
 void *req_capsule_client_swab_get(struct req_capsule *pill,
                                   const struct req_msg_field *field,
                                   void (*swabber)(void* ))
 {
-        return __req_capsule_get(pill, field, RCL_CLIENT, swabber);
+        return __req_capsule_get(pill, field, RCL_CLIENT, swabber, 0);
 }
 EXPORT_SYMBOL(req_capsule_client_swab_get);
 
 }
 EXPORT_SYMBOL(req_capsule_client_swab_get);
 
+/**
+ * Utility that combines req_capsule_set_size() and req_capsule_client_get().
+ *
+ * First the \a pill's request \a field's size is set (\a rc_area) using
+ * req_capsule_set_size() with the given \a len.  Then the actual buffer is
+ * returned.
+ */
 void *req_capsule_client_sized_get(struct req_capsule *pill,
                                    const struct req_msg_field *field,
                                    int len)
 {
         req_capsule_set_size(pill, field, RCL_CLIENT, len);
 void *req_capsule_client_sized_get(struct req_capsule *pill,
                                    const struct req_msg_field *field,
                                    int len)
 {
         req_capsule_set_size(pill, field, RCL_CLIENT, len);
-        return __req_capsule_get(pill, field, RCL_CLIENT, NULL);
+        return __req_capsule_get(pill, field, RCL_CLIENT, NULL, 0);
 }
 EXPORT_SYMBOL(req_capsule_client_sized_get);
 
 }
 EXPORT_SYMBOL(req_capsule_client_sized_get);
 
+/**
+ * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC reply
+ * buffer corresponding to the given RMF (\a field) of a \a pill.
+ */
 void *req_capsule_server_get(struct req_capsule *pill,
                              const struct req_msg_field *field)
 {
 void *req_capsule_server_get(struct req_capsule *pill,
                              const struct req_msg_field *field)
 {
-        return __req_capsule_get(pill, field, RCL_SERVER, NULL);
+        return __req_capsule_get(pill, field, RCL_SERVER, NULL, 0);
 }
 EXPORT_SYMBOL(req_capsule_server_get);
 
 }
 EXPORT_SYMBOL(req_capsule_server_get);
 
+/**
+ * Same as req_capsule_server_get(), but with a \a swabber argument.
+ *
+ * Ideally all swabbing should be done pursuant to RMF definitions, with no
+ * swabbing done outside this capsule abstraction.
+ */
 void *req_capsule_server_swab_get(struct req_capsule *pill,
                                   const struct req_msg_field *field,
                                   void *swabber)
 {
 void *req_capsule_server_swab_get(struct req_capsule *pill,
                                   const struct req_msg_field *field,
                                   void *swabber)
 {
-        return __req_capsule_get(pill, field, RCL_SERVER, swabber);
+        return __req_capsule_get(pill, field, RCL_SERVER, swabber, 0);
 }
 EXPORT_SYMBOL(req_capsule_server_swab_get);
 
 }
 EXPORT_SYMBOL(req_capsule_server_swab_get);
 
-
+/**
+ * Utility that combines req_capsule_set_size() and req_capsule_server_get().
+ *
+ * First the \a pill's request \a field's size is set (\a rc_area) using
+ * req_capsule_set_size() with the given \a len.  Then the actual buffer is
+ * returned.
+ */
 void *req_capsule_server_sized_get(struct req_capsule *pill,
                                    const struct req_msg_field *field,
                                    int len)
 {
         req_capsule_set_size(pill, field, RCL_SERVER, len);
 void *req_capsule_server_sized_get(struct req_capsule *pill,
                                    const struct req_msg_field *field,
                                    int len)
 {
         req_capsule_set_size(pill, field, RCL_SERVER, len);
-        return __req_capsule_get(pill, field, RCL_SERVER, NULL);
+        return __req_capsule_get(pill, field, RCL_SERVER, NULL, 0);
 }
 EXPORT_SYMBOL(req_capsule_server_sized_get);
 
 }
 EXPORT_SYMBOL(req_capsule_server_sized_get);
 
+/**
+ * Returns the buffer of a \a pill corresponding to the given \a field from the
+ * request (if the caller is executing on the server-side) or reply (if the
+ * caller is executing on the client-side).
+ *
+ * This function convienient for use is code that could be executed on the
+ * client and server alike.
+ */
 const void *req_capsule_other_get(struct req_capsule *pill,
                                   const struct req_msg_field *field)
 {
 const void *req_capsule_other_get(struct req_capsule *pill,
                                   const struct req_msg_field *field)
 {
-        return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL);
+        return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL, 0);
 }
 EXPORT_SYMBOL(req_capsule_other_get);
 
 }
 EXPORT_SYMBOL(req_capsule_other_get);
 
+/**
+ * Set the size of the PTLRPC request/reply (\a loc) buffer for the given \a
+ * field of the given \a pill.
+ *
+ * This function must be used when constructing variable sized fields of a
+ * request or reply.
+ */
 void req_capsule_set_size(struct req_capsule *pill,
                           const struct req_msg_field *field,
                           enum req_location loc, int size)
 void req_capsule_set_size(struct req_capsule *pill,
                           const struct req_msg_field *field,
                           enum req_location loc, int size)
@@ -1521,16 +1820,29 @@ void req_capsule_set_size(struct req_capsule *pill,
             (field->rmf_size != -1) &&
             !(field->rmf_flags & RMF_F_NO_SIZE_CHECK) &&
             (size > 0)) {
             (field->rmf_size != -1) &&
             !(field->rmf_flags & RMF_F_NO_SIZE_CHECK) &&
             (size > 0)) {
-                CERROR("%s: field size mismatch %d != %d (%d)\n",
-                       field->rmf_name, size, field->rmf_size, loc);
-                LBUG();
+                if ((field->rmf_flags & RMF_F_STRUCT_ARRAY) &&
+                    (size % field->rmf_size != 0)) {
+                        CERROR("%s: array field size mismatch "
+                               "%d %% %d != 0 (%d)\n",
+                               field->rmf_name, size, field->rmf_size, loc);
+                        LBUG();
+                } else if (!(field->rmf_flags & RMF_F_STRUCT_ARRAY) &&
+                    size < field->rmf_size) {
+                        CERROR("%s: field size mismatch %d != %d (%d)\n",
+                               field->rmf_name, size, field->rmf_size, loc);
+                        LBUG();
+                }
         }
 
         pill->rc_area[loc][__req_capsule_offset(pill, field, loc)] = size;
 }
 EXPORT_SYMBOL(req_capsule_set_size);
 
         }
 
         pill->rc_area[loc][__req_capsule_offset(pill, field, loc)] = size;
 }
 EXPORT_SYMBOL(req_capsule_set_size);
 
-/* NB: this function doesn't correspond with req_capsule_set_size(), which
+/**
+ * Return the actual PTLRPC buffer length of a request or reply (\a loc)
+ * for the given \a pill's given \a field.
+ *
+ * NB: this function doesn't correspond with req_capsule_set_size(), which
  * actually sets the size in pill.rc_area[loc][offset], but this function
  * returns the message buflen[offset], maybe we should use another name.
  */
  * actually sets the size in pill.rc_area[loc][offset], but this function
  * returns the message buflen[offset], maybe we should use another name.
  */
@@ -1545,6 +1857,13 @@ int req_capsule_get_size(const struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_get_size);
 
 }
 EXPORT_SYMBOL(req_capsule_get_size);
 
+/**
+ * Wrapper around lustre_msg_size() that returns the PTLRPC size needed for the
+ * given \a pill's request or reply (\a loc) given the field size recorded in
+ * the \a pill's rc_area.
+ *
+ * See also req_capsule_set_size().
+ */
 int req_capsule_msg_size(struct req_capsule *pill, enum req_location loc)
 {
         return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
 int req_capsule_msg_size(struct req_capsule *pill, enum req_location loc)
 {
         return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
@@ -1552,11 +1871,26 @@ int req_capsule_msg_size(struct req_capsule *pill, enum req_location loc)
                                pill->rc_area[loc]);
 }
 
                                pill->rc_area[loc]);
 }
 
+/**
+ * While req_capsule_msg_size() computes the size of a PTLRPC request or reply
+ * (\a loc) given a \a pill's \a rc_area, this function computes the size of a
+ * PTLRPC request or reply given only an RQF (\a fmt).
+ *
+ * This function should not be used for formats which contain variable size
+ * fields.
+ */
 int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
                          enum req_location loc)
 {
         int size, i = 0;
 
 int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
                          enum req_location loc)
 {
         int size, i = 0;
 
+        /*
+         * This function should probably LASSERT() that fmt has no fields with
+         * RMF_F_STRUCT_ARRAY in rmf_flags, since we can't know here how many
+         * elements in the array there will ultimately be, but then, we could
+         * assume that there will be at least one element, and that's just what
+         * we do.
+         */
         size = lustre_msg_hdr_size(magic, fmt->rf_fields[loc].nr);
         if (size < 0)
                 return size;
         size = lustre_msg_hdr_size(magic, fmt->rf_fields[loc].nr);
         if (size < 0)
                 return size;
@@ -1567,8 +1901,25 @@ int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
         return size;
 }
 
         return size;
 }
 
-#define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)]
-
+/**
+ * Changes the format of an RPC.
+ *
+ * The pill must already have been initialized, which means that it already has
+ * a request format.  The new format \a fmt must be an extension of the pill's
+ * old format.  Specifically: the new format must have as many request and reply
+ * fields as the old one, and all fields shared by the old and new format must
+ * be at least as large in the new format.
+ *
+ * The new format's fields may be of different "type" than the old format, but
+ * only for fields that are "opaque" blobs: fields which have a) have no
+ * \a rmf_swabber, b) \a rmf_flags == 0 or RMF_F_NO_SIZE_CHECK, and c) \a
+ * rmf_size == -1 or \a rmf_flags == RMF_F_NO_SIZE_CHECK.  For example,
+ * OBD_SET_INFO has a key field and an opaque value field that gets interpreted
+ * according to the key field.  When the value, according to the key, contains a
+ * structure (or array thereof) to be swabbed, the format should be changed to
+ * one where the value field has \a rmf_size/rmf_flags/rmf_swabber set
+ * accordingly.
+ */
 void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
 {
         int i;
 void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
 {
         int i;
@@ -1586,6 +1937,14 @@ void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
         for (i = 0; i < RCL_NR; ++i) {
                 LASSERT(fmt->rf_fields[i].nr >= old->rf_fields[i].nr);
                 for (j = 0; j < old->rf_fields[i].nr - 1; ++j) {
         for (i = 0; i < RCL_NR; ++i) {
                 LASSERT(fmt->rf_fields[i].nr >= old->rf_fields[i].nr);
                 for (j = 0; j < old->rf_fields[i].nr - 1; ++j) {
+                        const struct req_msg_field *ofield = FMT_FIELD(old, i, j);
+
+                        /* "opaque" fields can be transmogrified */
+                        if (ofield->rmf_swabber == NULL &&
+                            (ofield->rmf_flags & ~RMF_F_NO_SIZE_CHECK) == 0 &&
+                            (ofield->rmf_size == -1 ||
+                            ofield->rmf_flags == RMF_F_NO_SIZE_CHECK))
+                                continue;
                         LASSERT(FMT_FIELD(fmt, i, j) == FMT_FIELD(old, i, j));
                 }
                 /*
                         LASSERT(FMT_FIELD(fmt, i, j) == FMT_FIELD(old, i, j));
                 }
                 /*
@@ -1599,6 +1958,11 @@ void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
 }
 EXPORT_SYMBOL(req_capsule_extend);
 
 }
 EXPORT_SYMBOL(req_capsule_extend);
 
+/**
+ * This function returns a non-zero value if the given \a field is present in
+ * the format (\a rc_fmt) of \a pill's PTLRPC request or reply (\a loc), else it
+ * returns 0.
+ */
 int req_capsule_has_field(const struct req_capsule *pill,
                           const struct req_msg_field *field,
                           enum req_location loc)
 int req_capsule_has_field(const struct req_capsule *pill,
                           const struct req_msg_field *field,
                           enum req_location loc)
@@ -1609,6 +1973,10 @@ int req_capsule_has_field(const struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_has_field);
 
 }
 EXPORT_SYMBOL(req_capsule_has_field);
 
+/**
+ * Returns a non-zero value if the given \a field is present in the given \a
+ * pill's PTLRPC request or reply (\a loc), else it returns 0.
+ */
 int req_capsule_field_present(const struct req_capsule *pill,
                               const struct req_msg_field *field,
                               enum req_location loc)
 int req_capsule_field_present(const struct req_capsule *pill,
                               const struct req_msg_field *field,
                               enum req_location loc)
@@ -1623,6 +1991,12 @@ int req_capsule_field_present(const struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_field_present);
 
 }
 EXPORT_SYMBOL(req_capsule_field_present);
 
+/**
+ * This function shrinks the size of the _buffer_ of the \a pill's PTLRPC
+ * request or reply (\a loc).
+ *
+ * This is not the opposite of req_capsule_extend().
+ */
 void req_capsule_shrink(struct req_capsule *pill,
                         const struct req_msg_field *field,
                         unsigned int newlen,
 void req_capsule_shrink(struct req_capsule *pill,
                         const struct req_msg_field *field,
                         unsigned int newlen,
index b5e3e53..6ec0e5b 100644 (file)
@@ -773,26 +773,6 @@ static inline void *__lustre_swab_buf(struct lustre_msg *msg, int index,
         return ptr;
 }
 
         return ptr;
 }
 
-void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size,
-                         void *swabber)
-{
-        if (!ptlrpc_buf_need_swab(req, 1, index))
-                return lustre_msg_buf(req->rq_reqmsg, index, min_size);
-
-        lustre_set_req_swabbed(req, index);
-        return __lustre_swab_buf(req->rq_reqmsg, index, min_size, swabber);
-}
-
-void *lustre_swab_repbuf(struct ptlrpc_request *req, int index,
-                         int min_size, void *swabber)
-{
-        if (!ptlrpc_buf_need_swab(req, 0, index))
-                return lustre_msg_buf(req->rq_repmsg, index, min_size);
-
-        lustre_set_rep_swabbed(req, index);
-        return __lustre_swab_buf(req->rq_repmsg, index, min_size, swabber);
-}
-
 static inline struct ptlrpc_body *lustre_msg_ptlrpc_body(struct lustre_msg *msg)
 {
         return lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF,
 static inline struct ptlrpc_body *lustre_msg_ptlrpc_body(struct lustre_msg *msg)
 {
         return lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF,
@@ -2084,14 +2064,94 @@ void lustre_swab_qdata(struct qunit_data *d)
         __swab64s (&d->padding);
 }
 
         __swab64s (&d->padding);
 }
 
+/* Dump functions */
+void dump_ioo(struct obd_ioobj *ioo)
+{
+        CDEBUG(D_RPCTRACE,
+               "obd_ioobj: ioo_id="LPD64", ioo_gr="LPD64", ioo_type=%d, "
+               "ioo_bufct=%d\n", ioo->ioo_id, ioo->ioo_gr, ioo->ioo_type,
+               ioo->ioo_bufcnt);
+}
+
+void dump_rniobuf(struct niobuf_remote *nb)
+{
+        CDEBUG(D_RPCTRACE, "niobuf_remote: offset="LPU64", len=%d, flags=%x\n",
+               nb->offset, nb->len, nb->flags);
+}
+
+void dump_obdo(struct obdo *oa)
+{
+        __u32 valid = oa->o_valid;
+
+        CDEBUG(D_RPCTRACE, "obdo: o_valid = %08x\n", valid);
+        if (valid & OBD_MD_FLID)
+                CDEBUG(D_RPCTRACE, "obdo: o_id = "LPD64"\n", oa->o_id);
+        if (valid & OBD_MD_FLGROUP)
+                CDEBUG(D_RPCTRACE, "obdo: o_gr = "LPD64"\n", oa->o_gr);
+        if (valid & OBD_MD_FLFID)
+                CDEBUG(D_RPCTRACE, "obdo: o_fid = "LPD64"\n", oa->o_fid);
+        if (valid & OBD_MD_FLSIZE)
+                CDEBUG(D_RPCTRACE, "obdo: o_size = "LPD64"\n", oa->o_size);
+        if (valid & OBD_MD_FLMTIME)
+                CDEBUG(D_RPCTRACE, "obdo: o_mtime = "LPD64"\n", oa->o_mtime);
+        if (valid & OBD_MD_FLATIME)
+                CDEBUG(D_RPCTRACE, "obdo: o_atime = "LPD64"\n", oa->o_atime);
+        if (valid & OBD_MD_FLCTIME)
+                CDEBUG(D_RPCTRACE, "obdo: o_ctime = "LPD64"\n", oa->o_ctime);
+        if (valid & OBD_MD_FLBLOCKS)   /* allocation of space */
+                CDEBUG(D_RPCTRACE, "obdo: o_blocks = "LPD64"\n", oa->o_blocks);
+        if (valid & OBD_MD_FLGRANT)
+                CDEBUG(D_RPCTRACE, "obdo: o_grant = "LPD64"\n", oa->o_grant);
+        if (valid & OBD_MD_FLBLKSZ)
+                CDEBUG(D_RPCTRACE, "obdo: o_blksize = %d\n", oa->o_blksize);
+        if (valid & (OBD_MD_FLTYPE | OBD_MD_FLMODE))
+                CDEBUG(D_RPCTRACE, "obdo: o_mode = %o\n",
+                       oa->o_mode & ((valid & OBD_MD_FLTYPE ?  S_IFMT : 0) |
+                                     (valid & OBD_MD_FLMODE ? ~S_IFMT : 0)));
+        if (valid & OBD_MD_FLUID)
+                CDEBUG(D_RPCTRACE, "obdo: o_uid = %u\n", oa->o_uid);
+        if (valid & OBD_MD_FLGID)
+                CDEBUG(D_RPCTRACE, "obdo: o_gid = %u\n", oa->o_gid);
+        if (valid & OBD_MD_FLFLAGS)
+                CDEBUG(D_RPCTRACE, "obdo: o_flags = %x\n", oa->o_flags);
+        if (valid & OBD_MD_FLNLINK)
+                CDEBUG(D_RPCTRACE, "obdo: o_nlink = %u\n", oa->o_nlink);
+        else if (valid & OBD_MD_FLCKSUM)
+                CDEBUG(D_RPCTRACE, "obdo: o_checksum (o_nlink) = %u\n", oa->o_nlink);
+        if (valid & OBD_MD_FLGENER)
+                CDEBUG(D_RPCTRACE, "obdo: o_generation = %u\n",
+                       oa->o_generation);
+        if (valid & OBD_MD_FLEASIZE)
+                CDEBUG(D_RPCTRACE, "obdo: o_easize = %u\n", oa->o_easize);
+        else if (valid & OBD_MD_FLEPOCH)
+                CDEBUG(D_RPCTRACE, "obdo: o_epoc (o_easize) = %u\n", oa->o_easize);
+        if (valid & OBD_MD_FLID)
+                CDEBUG(D_RPCTRACE, "obdo: o_stripe_idx = %u\n", oa->o_stripe_idx);
+        if (valid & OBD_MD_FLHANDLE)
+                CDEBUG(D_RPCTRACE, "obdo: o_handle = "LPD64"\n", oa->o_handle.cookie);
+        if (valid & OBD_MD_FLCOOKIE)
+                CDEBUG(D_RPCTRACE, "obdo: o_lcookie = "
+                       "(llog_cookie dumping not yet implemented)\n");
+}
+
+void dump_ost_body(struct ost_body *ob)
+{
+        dump_obdo(&ob->oa);
+}
+
+void dump_rcs(__u32 *rc)
+{
+        CDEBUG(D_RPCTRACE, "rmf_rcs: %d\n", *rc);
+}
+
 #ifdef __KERNEL__
 
 /**
  * got qdata from request(req/rep)
  */
 #ifdef __KERNEL__
 
 /**
  * got qdata from request(req/rep)
  */
-struct qunit_data *quota_get_qdata(void *request, int is_req, int is_exp)
+struct qunit_data *quota_get_qdata(void *r, int is_req, int is_exp)
 {
 {
-        struct ptlrpc_request *req = (struct ptlrpc_request *)request;
+        struct ptlrpc_request *req = (struct ptlrpc_request *)r;
         struct qunit_data *qdata;
         __u64  flags = is_exp ? req->rq_export->exp_connect_flags :
                        req->rq_import->imp_connect_data.ocd_connect_flags;
         struct qunit_data *qdata;
         __u64  flags = is_exp ? req->rq_export->exp_connect_flags :
                        req->rq_import->imp_connect_data.ocd_connect_flags;
@@ -2103,13 +2163,9 @@ struct qunit_data *quota_get_qdata(void *request, int is_req, int is_exp)
         LASSERT(flags & OBD_CONNECT_CHANGE_QS);
 
         if (is_req == QUOTA_REQUEST)
         LASSERT(flags & OBD_CONNECT_CHANGE_QS);
 
         if (is_req == QUOTA_REQUEST)
-                qdata = lustre_swab_reqbuf(req, REQ_REC_OFF,
-                                           sizeof(struct qunit_data),
-                                           lustre_swab_qdata);
+                qdata = req_capsule_client_get(&req->rq_pill, &RMF_QUNIT_DATA);
         else
         else
-                qdata = lustre_swab_repbuf(req, REPLY_REC_OFF,
-                                           sizeof(struct qunit_data),
-                                           lustre_swab_qdata);
+                qdata = req_capsule_server_get(&req->rq_pill, &RMF_QUNIT_DATA);
         if (qdata == NULL)
                 return ERR_PTR(-EPROTO);
 
         if (qdata == NULL)
                 return ERR_PTR(-EPROTO);
 
@@ -2121,10 +2177,10 @@ EXPORT_SYMBOL(quota_get_qdata);
 /**
  * copy qdata to request(req/rep)
  */
 /**
  * copy qdata to request(req/rep)
  */
-int quota_copy_qdata(void *request, struct qunit_data *qdata,
-                     int is_req, int is_exp)
+int quota_copy_qdata(void *r, struct qunit_data *qdata, int is_req,
+                     int is_exp)
 {
 {
-        struct ptlrpc_request *req = (struct ptlrpc_request *)request;
+        struct ptlrpc_request *req = (struct ptlrpc_request *)r;
         void *target;
         __u64  flags = is_exp ? req->rq_export->exp_connect_flags :
                 req->rq_import->imp_connect_data.ocd_connect_flags;
         void *target;
         __u64  flags = is_exp ? req->rq_export->exp_connect_flags :
                 req->rq_import->imp_connect_data.ocd_connect_flags;
@@ -2137,14 +2193,13 @@ int quota_copy_qdata(void *request, struct qunit_data *qdata,
         LASSERT(flags & OBD_CONNECT_CHANGE_QS);
 
         if (is_req == QUOTA_REQUEST)
         LASSERT(flags & OBD_CONNECT_CHANGE_QS);
 
         if (is_req == QUOTA_REQUEST)
-                target = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
-                                        sizeof(struct qunit_data));
+                target = req_capsule_client_get(&req->rq_pill, &RMF_QUNIT_DATA);
         else
         else
-                target = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                        sizeof(struct qunit_data));
+                target = req_capsule_server_get(&req->rq_pill, &RMF_QUNIT_DATA);
         if (target == NULL)
                 return -EPROTO;
 
         if (target == NULL)
                 return -EPROTO;
 
+        LASSERT(target != qdata);
         memcpy(target, qdata, sizeof(*qdata));
         return 0;
 }
         memcpy(target, qdata, sizeof(*qdata));
         return 0;
 }
index cab4334..a9012e7 100644 (file)
@@ -238,8 +238,6 @@ EXPORT_SYMBOL(lustre_msg_string);
 EXPORT_SYMBOL(ptlrpc_buf_set_swabbed);
 EXPORT_SYMBOL(ptlrpc_buf_need_swab);
 EXPORT_SYMBOL(lustre_swab_ptlrpc_body);
 EXPORT_SYMBOL(ptlrpc_buf_set_swabbed);
 EXPORT_SYMBOL(ptlrpc_buf_need_swab);
 EXPORT_SYMBOL(lustre_swab_ptlrpc_body);
-EXPORT_SYMBOL(lustre_swab_reqbuf);
-EXPORT_SYMBOL(lustre_swab_repbuf);
 EXPORT_SYMBOL(lustre_swab_obdo);
 EXPORT_SYMBOL(lustre_swab_obd_statfs);
 EXPORT_SYMBOL(lustre_swab_obd_ioobj);
 EXPORT_SYMBOL(lustre_swab_obdo);
 EXPORT_SYMBOL(lustre_swab_obd_statfs);
 EXPORT_SYMBOL(lustre_swab_obd_ioobj);
@@ -272,6 +270,11 @@ EXPORT_SYMBOL(lustre_swab_ldlm_resource_desc);
 EXPORT_SYMBOL(lustre_swab_ldlm_lock_desc);
 EXPORT_SYMBOL(lustre_swab_ldlm_request);
 EXPORT_SYMBOL(lustre_swab_ldlm_reply);
 EXPORT_SYMBOL(lustre_swab_ldlm_lock_desc);
 EXPORT_SYMBOL(lustre_swab_ldlm_request);
 EXPORT_SYMBOL(lustre_swab_ldlm_reply);
+EXPORT_SYMBOL(dump_ioo);
+EXPORT_SYMBOL(dump_rniobuf);
+EXPORT_SYMBOL(dump_obdo);
+EXPORT_SYMBOL(dump_ost_body);
+EXPORT_SYMBOL(dump_rcs);
 EXPORT_SYMBOL(lustre_swab_qdata);
 EXPORT_SYMBOL(lustre_swab_quota_adjust_qunit);
 EXPORT_SYMBOL(lustre_msg_get_flags);
 EXPORT_SYMBOL(lustre_swab_qdata);
 EXPORT_SYMBOL(lustre_swab_quota_adjust_qunit);
 EXPORT_SYMBOL(lustre_msg_get_flags);
index 74bdb0b..4b7c685 100644 (file)
 #define lustre_swab_ost_body NULL
 #define lustre_swab_ost_last_id NULL
 #define lustre_swab_fiemap NULL
 #define lustre_swab_ost_body NULL
 #define lustre_swab_ost_last_id NULL
 #define lustre_swab_fiemap NULL
+#define lustre_swab_qdata NULL
+#define lustre_swab_ost_lvb NULL
+#define dump_rniobuf NULL
+#define dump_ioo NULL
+#define dump_obdo NULL
+#define dump_ost_body NULL
+#define dump_rcs NULL
 
 /*
  * Yes, include .c file.
 
 /*
  * Yes, include .c file.