Whamcloud - gitweb
b=18631
[fs/lustre-release.git] / lustre / ptlrpc / layout.c
index cdc9ce6..4cddc07 100644 (file)
  *
  * Author: Nikita Danilov <nikita@clusterfs.com>
  */
+/*
+ * This file contains the "capsule/pill" abstraction layered above PTLRPC.
+ *
+ * Every struct ptlrpc_request contains a "pill", which points to a description
+ * of the format that the request conforms to.
+ */
 
 #if !defined(__REQ_LAYOUT_USER__)
 
 /* struct ptlrpc_request, lustre_msg* */
 #include <lustre_req_layout.h>
 #include <lustre_acl.h>
+#include <lustre_debug.h>
 
 /*
- * empty set of fields... for suitable definition of emptiness.
+ * RQFs (see below) refer to two struct req_msg_field arrays describing the
+ * client request and server reply, respectively.
  */
+/* empty set of fields... for suitable definition of emptiness. */
 static const struct req_msg_field *empty[] = {
         &RMF_PTLRPC_BODY
 };
@@ -286,6 +295,12 @@ static const struct req_msg_field *obd_set_info_client[] = {
         &RMF_SETINFO_VAL
 };
 
+static const struct req_msg_field *ost_grant_shrink_client[] = {
+        &RMF_PTLRPC_BODY,
+        &RMF_SETINFO_KEY,
+        &RMF_OST_BODY
+};
+
 static const struct req_msg_field *mds_getinfo_client[] = {
         &RMF_PTLRPC_BODY,
         &RMF_GETINFO_KEY,
@@ -497,7 +512,7 @@ static const struct req_msg_field *ost_brw_client[] = {
 static const struct req_msg_field *ost_brw_server[] = {
         &RMF_PTLRPC_BODY,
         &RMF_OST_BODY,
-        &RMF_NIOBUF_REMOTE
+        &RMF_RCS
 };
 
 static const struct req_msg_field *ost_get_info_generic_server[] = {
@@ -609,16 +624,31 @@ struct req_msg_field {
         __u32       rmf_flags;
         const char *rmf_name;
         /**
-         * Field length. (-1) means "variable length".
+         * Field length. (-1) means "variable length".  If the
+         * \a RMF_F_STRUCT_ARRAY flag is set the field is also variable-length,
+         * but the actual size must be a whole multiple of \a rmf_size.
          */
         int         rmf_size;
         void      (*rmf_swabber)(void *);
+        void      (*rmf_dumper)(void *);
         int         rmf_offset[ARRAY_SIZE(req_formats)][RCL_NR];
 };
 
 enum rmf_flags {
+        /**
+         * The field is a string, must be NUL-terminated.
+         */
         RMF_F_STRING = 1 << 0,
-        RMF_F_NO_SIZE_CHECK = 1 << 1
+        /**
+         * The field's buffer size need not match the declared \a rmf_size.
+         */
+        RMF_F_NO_SIZE_CHECK = 1 << 1,
+        /**
+         * The field's buffer size must be a whole multiple of the declared \a
+         * rmf_size and the \a rmf_swabber function must work on the declared \a
+         * rmf_size worth of bytes.
+         */
+        RMF_F_STRUCT_ARRAY = 1 << 2
 };
 
 struct req_capsule;
@@ -626,208 +656,219 @@ struct req_capsule;
 /*
  * Request fields.
  */
-#define DEFINE_MSGF(name, flags, size, swabber) {       \
-        .rmf_name    = (name),                          \
-        .rmf_flags   = (flags),                         \
-        .rmf_size    = (size),                          \
-        .rmf_swabber = (void (*)(void*))(swabber)       \
+#define DEFINE_MSGF(name, flags, size, swabber, dumper) {       \
+        .rmf_name    = (name),                                  \
+        .rmf_flags   = (flags),                                 \
+        .rmf_size    = (size),                                  \
+        .rmf_swabber = (void (*)(void*))(swabber),              \
+        .rmf_dumper  = (void (*)(void*))(dumper)                \
 }
 
 const struct req_msg_field RMF_GENERIC_DATA =
         DEFINE_MSGF("generic_data", 0,
-                    -1, NULL);
+                    -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_GENERIC_DATA);
 
 const struct req_msg_field RMF_MGS_TARGET_INFO =
         DEFINE_MSGF("mgs_target_info", 0,
                     sizeof(struct mgs_target_info),
-                    lustre_swab_mgs_target_info);
+                    lustre_swab_mgs_target_info, NULL);
 EXPORT_SYMBOL(RMF_MGS_TARGET_INFO);
 
 const struct req_msg_field RMF_MGS_SEND_PARAM =
         DEFINE_MSGF("mgs_send_param", 0,
                     sizeof(struct mgs_send_param),
-                    NULL);
+                    NULL, NULL);
 EXPORT_SYMBOL(RMF_MGS_SEND_PARAM);
 
 const struct req_msg_field RMF_SETINFO_VAL =
-        DEFINE_MSGF("setinfo_val", 0, -1, NULL);
+        DEFINE_MSGF("setinfo_val", 0, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_SETINFO_VAL);
 
 const struct req_msg_field RMF_GETINFO_KEY =
-        DEFINE_MSGF("getinfo_key", 0, -1, NULL);
+        DEFINE_MSGF("getinfo_key", 0, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_GETINFO_KEY);
 
 const struct req_msg_field RMF_GETINFO_VALLEN =
         DEFINE_MSGF("getinfo_vallen", 0,
-                    sizeof(__u32), lustre_swab_generic_32s);
+                    sizeof(__u32), lustre_swab_generic_32s, NULL);
 EXPORT_SYMBOL(RMF_GETINFO_VALLEN);
 
 const struct req_msg_field RMF_GETINFO_VAL =
-        DEFINE_MSGF("getinfo_val", 0, -1, NULL);
+        DEFINE_MSGF("getinfo_val", 0, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_GETINFO_VAL);
 
 const struct req_msg_field RMF_SEQ_OPC =
         DEFINE_MSGF("seq_query_opc", 0,
-                    sizeof(__u32), lustre_swab_generic_32s);
+                    sizeof(__u32), lustre_swab_generic_32s, NULL);
 EXPORT_SYMBOL(RMF_SEQ_OPC);
 
 const struct req_msg_field RMF_SEQ_RANGE =
         DEFINE_MSGF("seq_query_range", 0,
-                    sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
+                    sizeof(struct lu_seq_range),
+                    lustre_swab_lu_seq_range, NULL);
 EXPORT_SYMBOL(RMF_SEQ_RANGE);
 
 const struct req_msg_field RMF_FLD_OPC =
         DEFINE_MSGF("fld_query_opc", 0,
-                    sizeof(__u32), lustre_swab_generic_32s);
+                    sizeof(__u32), lustre_swab_generic_32s, NULL);
 EXPORT_SYMBOL(RMF_FLD_OPC);
 
 const struct req_msg_field RMF_FLD_MDFLD =
         DEFINE_MSGF("fld_query_mdfld", 0,
-                    sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
+                    sizeof(struct lu_seq_range),
+                    lustre_swab_lu_seq_range, NULL);
 EXPORT_SYMBOL(RMF_FLD_MDFLD);
 
 const struct req_msg_field RMF_MDT_BODY =
         DEFINE_MSGF("mdt_body", 0,
-                    sizeof(struct mdt_body), lustre_swab_mdt_body);
+                    sizeof(struct mdt_body), lustre_swab_mdt_body, NULL);
 EXPORT_SYMBOL(RMF_MDT_BODY);
 
 const struct req_msg_field RMF_OBD_QUOTACTL =
         DEFINE_MSGF("obd_quotactl", 0,
-                    sizeof(struct obd_quotactl), lustre_swab_obd_quotactl);
+                    sizeof(struct obd_quotactl),
+                    lustre_swab_obd_quotactl, NULL);
 EXPORT_SYMBOL(RMF_OBD_QUOTACTL);
 
 const struct req_msg_field RMF_QUOTA_ADJUST_QUNIT =
         DEFINE_MSGF("quota_adjust_qunit", 0,
                     sizeof(struct quota_adjust_qunit),
-                    lustre_swab_quota_adjust_qunit);
+                    lustre_swab_quota_adjust_qunit, NULL);
 EXPORT_SYMBOL(RMF_QUOTA_ADJUST_QUNIT);
 
 const struct req_msg_field RMF_QUNIT_DATA =
         DEFINE_MSGF("qunit_data", 0,
-                    sizeof(struct qunit_data), NULL);
+                    sizeof(struct qunit_data), lustre_swab_qdata, NULL);
 EXPORT_SYMBOL(RMF_QUNIT_DATA);
 
 const struct req_msg_field RMF_MDT_EPOCH =
         DEFINE_MSGF("mdt_epoch", 0,
-                    sizeof(struct mdt_epoch), lustre_swab_mdt_epoch);
+                    sizeof(struct mdt_epoch), lustre_swab_mdt_epoch, NULL);
 EXPORT_SYMBOL(RMF_MDT_EPOCH);
 
 const struct req_msg_field RMF_PTLRPC_BODY =
         DEFINE_MSGF("ptlrpc_body", 0,
-                    sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body);
+                    sizeof(struct ptlrpc_body), lustre_swab_ptlrpc_body, NULL);
 EXPORT_SYMBOL(RMF_PTLRPC_BODY);
 
 const struct req_msg_field RMF_OBD_STATFS =
         DEFINE_MSGF("obd_statfs", 0,
-                    sizeof(struct obd_statfs), lustre_swab_obd_statfs);
+                    sizeof(struct obd_statfs), lustre_swab_obd_statfs, NULL);
 EXPORT_SYMBOL(RMF_OBD_STATFS);
 
 const struct req_msg_field RMF_SETINFO_KEY =
-        DEFINE_MSGF("setinfo_key", 0, -1, NULL);
+        DEFINE_MSGF("setinfo_key", 0, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_SETINFO_KEY);
 
 const struct req_msg_field RMF_NAME =
-        DEFINE_MSGF("name", RMF_F_STRING, -1, NULL);
+        DEFINE_MSGF("name", RMF_F_STRING, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_NAME);
 
 const struct req_msg_field RMF_SYMTGT =
-        DEFINE_MSGF("symtgt", RMF_F_STRING, -1, NULL);
+        DEFINE_MSGF("symtgt", RMF_F_STRING, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_SYMTGT);
 
 const struct req_msg_field RMF_TGTUUID =
-        DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
+        DEFINE_MSGF("tgtuuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL,
+        NULL);
 EXPORT_SYMBOL(RMF_TGTUUID);
 
 const struct req_msg_field RMF_CLUUID =
-        DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL);
+        DEFINE_MSGF("cluuid", RMF_F_STRING, sizeof(struct obd_uuid) - 1, NULL,
+        NULL);
 EXPORT_SYMBOL(RMF_CLUUID);
 
 const struct req_msg_field RMF_STRING =
-        DEFINE_MSGF("string", RMF_F_STRING, -1, NULL);
+        DEFINE_MSGF("string", RMF_F_STRING, -1, NULL, NULL);
 EXPORT_SYMBOL(RMF_STRING);
 
 const struct req_msg_field RMF_LLOGD_BODY =
         DEFINE_MSGF("llogd_body", 0,
-                    sizeof(struct llogd_body), lustre_swab_llogd_body);
+                    sizeof(struct llogd_body), lustre_swab_llogd_body, NULL);
 EXPORT_SYMBOL(RMF_LLOGD_BODY);
 
 const struct req_msg_field RMF_LLOG_LOG_HDR =
         DEFINE_MSGF("llog_log_hdr", 0,
-                    sizeof(struct llog_log_hdr), lustre_swab_llog_hdr);
+                    sizeof(struct llog_log_hdr), lustre_swab_llog_hdr, NULL);
 EXPORT_SYMBOL(RMF_LLOG_LOG_HDR);
 
 const struct req_msg_field RMF_LLOGD_CONN_BODY =
         DEFINE_MSGF("llogd_conn_body", 0,
                     sizeof(struct llogd_conn_body),
-                    lustre_swab_llogd_conn_body);
+                    lustre_swab_llogd_conn_body, NULL);
 EXPORT_SYMBOL(RMF_LLOGD_CONN_BODY);
 
 /*
  * connection handle received in MDS_CONNECT request.
  *
- * XXX no swabbing?
+ * No swabbing needed because struct lustre_handle contains only a 64-bit cookie
+ * that the client does not interpret at all.
  */
 const struct req_msg_field RMF_CONN =
-        DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL);
+        DEFINE_MSGF("conn", 0, sizeof(struct lustre_handle), NULL, NULL);
 EXPORT_SYMBOL(RMF_CONN);
 
 const struct req_msg_field RMF_CONNECT_DATA =
         DEFINE_MSGF("cdata",
                     RMF_F_NO_SIZE_CHECK /* we allow extra space for interop */,
-                    sizeof(struct obd_connect_data), lustre_swab_connect);
+                    sizeof(struct obd_connect_data), lustre_swab_connect, NULL);
 EXPORT_SYMBOL(RMF_CONNECT_DATA);
 
 const struct req_msg_field RMF_DLM_REQ =
         DEFINE_MSGF("dlm_req", RMF_F_NO_SIZE_CHECK /* ldlm_request_bufsize */,
-                    sizeof(struct ldlm_request), lustre_swab_ldlm_request);
+                    sizeof(struct ldlm_request),
+                    lustre_swab_ldlm_request, NULL);
 EXPORT_SYMBOL(RMF_DLM_REQ);
 
 const struct req_msg_field RMF_DLM_REP =
         DEFINE_MSGF("dlm_rep", 0,
-                    sizeof(struct ldlm_reply), lustre_swab_ldlm_reply);
+                    sizeof(struct ldlm_reply), lustre_swab_ldlm_reply, NULL);
 EXPORT_SYMBOL(RMF_DLM_REP);
 
 const struct req_msg_field RMF_LDLM_INTENT =
         DEFINE_MSGF("ldlm_intent", 0,
-                    sizeof(struct ldlm_intent), lustre_swab_ldlm_intent);
+                    sizeof(struct ldlm_intent), lustre_swab_ldlm_intent, NULL);
 EXPORT_SYMBOL(RMF_LDLM_INTENT);
 
 const struct req_msg_field RMF_DLM_LVB =
-        DEFINE_MSGF("dlm_lvb", 0, sizeof(struct ost_lvb), NULL);
+        DEFINE_MSGF("dlm_lvb", 0, sizeof(struct ost_lvb), lustre_swab_ost_lvb,
+        NULL);
 EXPORT_SYMBOL(RMF_DLM_LVB);
 
 const struct req_msg_field RMF_MDT_MD =
-        DEFINE_MSGF("mdt_md", RMF_F_NO_SIZE_CHECK, MIN_MD_SIZE, NULL);
+        DEFINE_MSGF("mdt_md", RMF_F_NO_SIZE_CHECK, MIN_MD_SIZE, NULL, NULL);
 EXPORT_SYMBOL(RMF_MDT_MD);
 
 const struct req_msg_field RMF_REC_REINT =
         DEFINE_MSGF("rec_reint", 0, sizeof(struct mdt_rec_reint),
-                    lustre_swab_mdt_rec_reint);
+                    lustre_swab_mdt_rec_reint, NULL);
 EXPORT_SYMBOL(RMF_REC_REINT);
 
 /* FIXME: this length should be defined as a macro */
-const struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, -1, NULL);
+const struct req_msg_field RMF_EADATA = DEFINE_MSGF("eadata", 0, -1,
+                                                    NULL, NULL);
 EXPORT_SYMBOL(RMF_EADATA);
 
 const struct req_msg_field RMF_ACL =
         DEFINE_MSGF("acl", RMF_F_NO_SIZE_CHECK,
-                    LUSTRE_POSIX_ACL_MAX_SIZE, NULL);
+                    LUSTRE_POSIX_ACL_MAX_SIZE, NULL, NULL);
 EXPORT_SYMBOL(RMF_ACL);
 
+/* FIXME: this should be made to use RMF_F_STRUCT_ARRAY */
 const struct req_msg_field RMF_LOGCOOKIES =
         DEFINE_MSGF("logcookies", RMF_F_NO_SIZE_CHECK /* multiple cookies */,
-                    sizeof(struct llog_cookie), NULL);
+                    sizeof(struct llog_cookie), NULL, NULL);
 EXPORT_SYMBOL(RMF_LOGCOOKIES);
 
 const struct req_msg_field RMF_CAPA1 =
         DEFINE_MSGF("capa", 0, sizeof(struct lustre_capa),
-                    lustre_swab_lustre_capa);
+                    lustre_swab_lustre_capa, NULL);
 EXPORT_SYMBOL(RMF_CAPA1);
 
 const struct req_msg_field RMF_CAPA2 =
         DEFINE_MSGF("capa", 0, sizeof(struct lustre_capa),
-                    lustre_swab_lustre_capa);
+                    lustre_swab_lustre_capa, NULL);
 EXPORT_SYMBOL(RMF_CAPA2);
 
 /*
@@ -835,30 +876,37 @@ EXPORT_SYMBOL(RMF_CAPA2);
  */
 const struct req_msg_field RMF_OST_BODY =
         DEFINE_MSGF("ost_body", 0,
-                    sizeof(struct ost_body), lustre_swab_ost_body);
+                    sizeof(struct ost_body), lustre_swab_ost_body, dump_ost_body);
 EXPORT_SYMBOL(RMF_OST_BODY);
 
 const struct req_msg_field RMF_OBD_IOOBJ =
-        DEFINE_MSGF("obd_ioobj", 0,
-                    sizeof(struct obd_ioobj), lustre_swab_obd_ioobj);
+        DEFINE_MSGF("obd_ioobj", RMF_F_STRUCT_ARRAY,
+                    sizeof(struct obd_ioobj), lustre_swab_obd_ioobj, dump_ioo);
 EXPORT_SYMBOL(RMF_OBD_IOOBJ);
 
 const struct req_msg_field RMF_NIOBUF_REMOTE =
-        DEFINE_MSGF("niobuf_remote", 0, -1, lustre_swab_niobuf_remote);
+        DEFINE_MSGF("niobuf_remote", RMF_F_STRUCT_ARRAY,
+                    sizeof(struct niobuf_remote), lustre_swab_niobuf_remote,
+                    dump_rniobuf);
 EXPORT_SYMBOL(RMF_NIOBUF_REMOTE);
 
+const struct req_msg_field RMF_RCS =
+        DEFINE_MSGF("niobuf_remote", RMF_F_STRUCT_ARRAY, sizeof(__u32),
+                    lustre_swab_generic_32s, dump_rcs);
+EXPORT_SYMBOL(RMF_RCS);
+
 const struct req_msg_field RMF_OBD_ID =
         DEFINE_MSGF("obd_id", 0,
-                    sizeof(obd_id), lustre_swab_ost_last_id);
+                    sizeof(obd_id), lustre_swab_ost_last_id, NULL);
 EXPORT_SYMBOL(RMF_OBD_ID);
 
 const struct req_msg_field RMF_FIEMAP_KEY =
         DEFINE_MSGF("fiemap", 0, sizeof(struct ll_fiemap_info_key),
-                    lustre_swab_fiemap);
+                    lustre_swab_fiemap, NULL);
 EXPORT_SYMBOL(RMF_FIEMAP_KEY);
 
 const struct req_msg_field RMF_FIEMAP_VAL =
-        DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap);
+        DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap, NULL);
 EXPORT_SYMBOL(RMF_FIEMAP_VAL);
 
 /*
@@ -1220,7 +1268,7 @@ const struct req_format RQF_OST_STATFS =
 EXPORT_SYMBOL(RQF_OST_STATFS);
 
 const struct req_format RQF_OST_SET_GRANT_INFO =
-        DEFINE_REQ_FMT0("OST_SET_GRANT_INFO", obd_set_info_client,
+        DEFINE_REQ_FMT0("OST_SET_GRANT_INFO", ost_grant_shrink_client,
                          ost_body_only);
 EXPORT_SYMBOL(RQF_OST_SET_GRANT_INFO);
 
@@ -1242,6 +1290,13 @@ EXPORT_SYMBOL(RQF_OST_GET_INFO_FIEMAP);
 
 #if !defined(__REQ_LAYOUT_USER__)
 
+/* Convenience macro */
+#define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)]
+
+/**
+ * Initializes the capsule abstraction by computing and setting the \a rf_idx
+ * field of RQFs and the \a rmf_offset field of RMFs.
+ */
 int req_layout_init(void)
 {
         int i;
@@ -1258,6 +1313,8 @@ int req_layout_init(void)
                                 struct req_msg_field *field;
 
                                 field = (typeof(field))rf->rf_fields[j].d[k];
+                                LASSERT(!(field->rmf_flags & RMF_F_STRUCT_ARRAY)
+                                        || field->rmf_size > 0);
                                 LASSERT(field->rmf_offset[i][j] == 0);
                                 /*
                                  * k + 1 to detect unused format/field
@@ -1276,6 +1333,14 @@ void req_layout_fini(void)
 }
 EXPORT_SYMBOL(req_layout_fini);
 
+/**
+ * Initializes the expected sizes of each RMF in a \a pill (\a rc_area) to -1.
+ *
+ * Actual/expected field sizes are set elsewhere in functions in this file:
+ * req_capsule_init(), req_capsule_server_pack(), req_capsule_set_size() and
+ * req_capsule_msg_size().  The \a rc_area information is used by.
+ * ptlrpc_request_set_replen().
+ */
 void req_capsule_init_area(struct req_capsule *pill)
 {
         int i;
@@ -1287,11 +1352,11 @@ void req_capsule_init_area(struct req_capsule *pill)
 }
 EXPORT_SYMBOL(req_capsule_init_area);
 
-/*
- * Initialize capsule.
+/**
+ * Initialize a pill.
  *
- * @area is an array of REQ_MAX_FIELD_NR elements, used to store sizes of
- * variable-sized fields.
+ * The \a location indicates whether the caller is executing on the client side
+ * (RCL_CLIENT) or server side (RCL_SERVER)..
  */
 void req_capsule_init(struct req_capsule *pill,
                       struct ptlrpc_request *req,
@@ -1299,10 +1364,27 @@ void req_capsule_init(struct req_capsule *pill,
 {
         LASSERT(location == RCL_SERVER || location == RCL_CLIENT);
 
+        /*
+         * Today all capsules are embedded in ptlrpc_request structs,
+         * but just in case that ever isn't the case, we don't reach
+         * into req unless req != NULL and pill is the one embedded in
+         * the req.
+         *
+         * The req->rq_pill_init flag makes it safe to initialize a pill
+         * twice, which might happen in the OST paths as a result of the
+         * high-priority RPC queue getting peeked at before ost_handle()
+         * handles an OST RPC.
+         */
+        if (req != NULL && pill == &req->rq_pill && req->rq_pill_init)
+                return;
+
         memset(pill, 0, sizeof *pill);
         pill->rc_req = req;
         pill->rc_loc = location;
         req_capsule_init_area(pill);
+
+        if (req != NULL && pill == &req->rq_pill)
+                req->rq_pill_init = 1;
 }
 EXPORT_SYMBOL(req_capsule_init);
 
@@ -1327,15 +1409,27 @@ static struct lustre_msg *__req_msg(const struct req_capsule *pill,
         return loc == RCL_CLIENT ? req->rq_reqmsg : req->rq_repmsg;
 }
 
+/**
+ * Set the format (\a fmt) of a \a pill; format changes are not allowed here
+ * (see req_capsule_extend()).
+ */
 void req_capsule_set(struct req_capsule *pill, const struct req_format *fmt)
 {
-        LASSERT(pill->rc_fmt == NULL);
+        LASSERT(pill->rc_fmt == NULL || pill->rc_fmt == fmt);
         LASSERT(__req_format_is_sane(fmt));
 
         pill->rc_fmt = fmt;
 }
 EXPORT_SYMBOL(req_capsule_set);
 
+/**
+ * Fills in any parts of the \a rc_area of a \a pill that haven't been filled in
+ * yet.
+
+ * \a rc_area is an array of REQ_MAX_FIELD_NR elements, used to store sizes of
+ * variable-sized fields.  The field sizes come from the declared \a rmf_size
+ * field of a \a pill's \a rc_fmt's RMF's.
+ */
 int req_capsule_filled_sizes(struct req_capsule *pill,
                            enum req_location loc)
 {
@@ -1349,7 +1443,12 @@ int req_capsule_filled_sizes(struct req_capsule *pill,
                         pill->rc_area[loc][i] =
                                             fmt->rf_fields[loc].d[i]->rmf_size;
                         if (pill->rc_area[loc][i] == -1) {
-                                /* skip the following fields */
+                                /*
+                                 * Skip the following fields.
+                                 *
+                                 * If this LASSERT() trips then you're missing a
+                                 * call to req_capsule_set_size().
+                                 */
                                 LASSERT(loc != RCL_SERVER);
                                 break;
                         }
@@ -1359,6 +1458,13 @@ int req_capsule_filled_sizes(struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_filled_sizes);
 
+/**
+ * Capsule equivalent of lustre_pack_request() and lustre_pack_reply().
+ *
+ * This function uses the \a pill's \a rc_area as filled in by
+ * req_capsule_set_size() or req_capsule_filled_sizes() (the latter is called by
+ * this function).
+ */
 int req_capsule_server_pack(struct req_capsule *pill)
 {
         const struct req_format *fmt;
@@ -1374,13 +1480,17 @@ int req_capsule_server_pack(struct req_capsule *pill)
                                pill->rc_area[RCL_SERVER], NULL);
         if (rc != 0) {
                 DEBUG_REQ(D_ERROR, pill->rc_req,
-                          "Cannot pack %d fields in format `%s': ",
-                          count, fmt->rf_name);
+                       "Cannot pack %d fields in format `%s': ",
+                       count, fmt->rf_name);
         }
         return rc;
 }
 EXPORT_SYMBOL(req_capsule_server_pack);
 
+/**
+ * Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill
+ * corresponding to the given RMF (\a field).
+ */
 static int __req_capsule_offset(const struct req_capsule *pill,
                                 const struct req_msg_field *field,
                                 enum req_location loc)
@@ -1397,17 +1507,99 @@ static int __req_capsule_offset(const struct req_capsule *pill,
         return offset;
 }
 
+/**
+ * Helper for __req_capsule_get(); swabs value / array of values and/or dumps
+ * them if desired.
+ */
+static
+void
+swabber_dumper_helper(struct req_capsule *pill,
+                      const struct req_msg_field *field,
+                      enum req_location loc,
+                      int offset,
+                      void *value, int len, int dump, void (*swabber)( void *))
+{
+        void    *p;
+        int     i;
+        int     n;
+        int     do_swab;
+        int     inout = loc == RCL_CLIENT;
+
+        swabber = swabber ?: field->rmf_swabber;
+
+        if (ptlrpc_buf_need_swab(pill->rc_req, inout, offset) &&
+            swabber != NULL && value != NULL)
+                do_swab = 1;
+        else
+                do_swab = 0;
+
+        if (!(field->rmf_flags & RMF_F_STRUCT_ARRAY)) {
+                if (dump && field->rmf_dumper) {
+                        CDEBUG(D_RPCTRACE, "Dump of %sfield %s follows\n",
+                               do_swab ? "unswabbed " : "", field->rmf_name);
+                        field->rmf_dumper(value);
+                }
+                if (!do_swab)
+                        return;
+                swabber(value);
+                ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset);
+                if (dump) {
+                        CDEBUG(D_RPCTRACE, "Dump of swabbed field %s "
+                               "follows\n", field->rmf_name);
+                        field->rmf_dumper(value);
+                }
+
+                return;
+        }
+
+        /*
+         * We're swabbing an array; swabber() swabs a single array element, so
+         * swab every element.
+         */
+        LASSERT((len % field->rmf_size) == 0);
+        for (p = value, i = 0, n = len / field->rmf_size;
+             i < n;
+             i++, p += field->rmf_size) {
+                if (dump && field->rmf_dumper) {
+                        CDEBUG(D_RPCTRACE, "Dump of %sarray field %s, "
+                               "element %d follows\n",
+                               do_swab ? "unswabbed " : "", field->rmf_name, i);
+                        field->rmf_dumper(p);
+                }
+                if (!do_swab)
+                        continue;
+                swabber(p);
+                if (dump && field->rmf_dumper) {
+                        CDEBUG(D_RPCTRACE, "Dump of swabbed array field %s, "
+                               "element %d follows\n", field->rmf_name, i);
+                        field->rmf_dumper(value);
+                }
+        }
+        if (do_swab)
+                ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset);
+}
+
+/**
+ * Returns the pointer to a PTLRPC request or reply (\a loc) buffer of a \a pill
+ * corresponding to the given RMF (\a field).
+ *
+ * The buffer will be swabbed using the given \a swabber.  If \a swabber == NULL
+ * then the \a rmf_swabber from the RMF will be used.  Soon there will be no
+ * calls to __req_capsule_get() with a non-NULL \a swabber; \a swabber will then
+ * be removed.  Fields with the \a RMF_F_STRUCT_ARRAY flag set will have each
+ * element of the array swabbed.
+ */
 static void *__req_capsule_get(struct req_capsule *pill,
                                const struct req_msg_field *field,
                                enum req_location loc,
-                               void (*swabber)( void *))
+                               void (*swabber)( void *),
+                               int dump)
 {
         const struct req_format *fmt;
         struct lustre_msg       *msg;
         void                    *value;
         int                      len;
         int                      offset;
-        int                      inout = loc == RCL_CLIENT;
 
         void *(*getter)(struct lustre_msg *m, int n, int minlen);
 
@@ -1431,86 +1623,193 @@ static void *__req_capsule_get(struct req_capsule *pill,
         getter = (field->rmf_flags & RMF_F_STRING) ?
                 (typeof(getter))lustre_msg_string : lustre_msg_buf;
 
-        if (pill->rc_area[loc][offset] != -1)
+        if (field->rmf_flags & RMF_F_STRUCT_ARRAY) {
+                /*
+                 * We've already asserted that field->rmf_size > 0 in
+                 * req_layout_init().
+                 */
+                len = lustre_msg_buflen(msg, offset);
+                if ((len % field->rmf_size) != 0) {
+                        CERROR("%s: array field size mismatch "
+                               "%d modulo %d != 0 (%d)\n",
+                               field->rmf_name, len, field->rmf_size, loc);
+                        return NULL;
+                }
+        } else if (pill->rc_area[loc][offset] != -1) {
                 len = pill->rc_area[loc][offset];
-        else
+        } else {
                 len = max(field->rmf_size, 0);
+        }
         value = getter(msg, offset, len);
 
-        swabber = swabber ?: field->rmf_swabber;
-        if (ptlrpc_buf_need_swab(pill->rc_req, inout, offset) &&
-            swabber != NULL && value != NULL) {
-                swabber(value);
-                ptlrpc_buf_set_swabbed(pill->rc_req, inout, offset);
-        }
         if (value == NULL) {
                 DEBUG_REQ(D_ERROR, pill->rc_req,
                           "Wrong buffer for field `%s' (%d of %d) "
                           "in format `%s': %d vs. %d (%s)\n",
-                          field->rmf_name, offset, lustre_msg_bufcount(msg), fmt->rf_name,
-                          lustre_msg_buflen(msg, offset), len,
+                          field->rmf_name, offset, lustre_msg_bufcount(msg),
+                          fmt->rf_name, lustre_msg_buflen(msg, offset), len,
                           rcl_names[loc]);
+        } else {
+                swabber_dumper_helper(pill, field, loc, offset, value, len,
+                                      dump, swabber);
         }
 
         return value;
 }
 
+/**
+ * Dump a request and/or reply
+ */
+void __req_capsule_dump(struct req_capsule *pill, enum req_location loc)
+{
+        const struct    req_format *fmt;
+        const struct    req_msg_field *field;
+        int             len;
+        int             i;
+
+        fmt = pill->rc_fmt;
+
+        DEBUG_REQ(D_RPCTRACE, pill->rc_req, "BEGIN REQ CAPSULE DUMP\n");
+        for (i = 0; i < fmt->rf_fields[loc].nr; ++i) {
+                field = FMT_FIELD(fmt, loc, i);
+                if (field->rmf_dumper == NULL) {
+                        /*
+                         * FIXME Add a default hex dumper for fields that don't
+                         * have a specific dumper
+                         */
+                        len = req_capsule_get_size(pill, field, loc);
+                        CDEBUG(D_RPCTRACE, "Field %s has no dumper function;"
+                               "field size is %d\n", field->rmf_name, len);
+                } else {
+                        /* It's the dumping side-effect that we're interested in */
+                        (void) __req_capsule_get(pill, field, loc, NULL, 1);
+                }
+        }
+        CDEBUG(D_RPCTRACE, "END REQ CAPSULE DUMP\n");
+}
+
+/**
+ * Dump a request.
+ */
+void req_capsule_client_dump(struct req_capsule *pill)
+{
+        __req_capsule_dump(pill, RCL_CLIENT);
+}
+EXPORT_SYMBOL(req_capsule_client_dump);
+
+/**
+ * Dump a reply
+ */
+void req_capsule_server_dump(struct req_capsule *pill)
+{
+        __req_capsule_dump(pill, RCL_SERVER);
+}
+EXPORT_SYMBOL(req_capsule_server_dump);
+
+/**
+ * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC request
+ * buffer corresponding to the given RMF (\a field) of a \a pill.
+ */
 void *req_capsule_client_get(struct req_capsule *pill,
                              const struct req_msg_field *field)
 {
-        return __req_capsule_get(pill, field, RCL_CLIENT, NULL);
+        return __req_capsule_get(pill, field, RCL_CLIENT, NULL, 0);
 }
 EXPORT_SYMBOL(req_capsule_client_get);
 
+/**
+ * Same as req_capsule_client_get(), but with a \a swabber argument.
+ *
+ * Currently unused; will be removed when req_capsule_server_swab_get() is
+ * unused too.
+ */
 void *req_capsule_client_swab_get(struct req_capsule *pill,
                                   const struct req_msg_field *field,
                                   void (*swabber)(void* ))
 {
-        return __req_capsule_get(pill, field, RCL_CLIENT, swabber);
+        return __req_capsule_get(pill, field, RCL_CLIENT, swabber, 0);
 }
 EXPORT_SYMBOL(req_capsule_client_swab_get);
 
+/**
+ * Utility that combines req_capsule_set_size() and req_capsule_client_get().
+ *
+ * First the \a pill's request \a field's size is set (\a rc_area) using
+ * req_capsule_set_size() with the given \a len.  Then the actual buffer is
+ * returned.
+ */
 void *req_capsule_client_sized_get(struct req_capsule *pill,
                                    const struct req_msg_field *field,
                                    int len)
 {
         req_capsule_set_size(pill, field, RCL_CLIENT, len);
-        return __req_capsule_get(pill, field, RCL_CLIENT, NULL);
+        return __req_capsule_get(pill, field, RCL_CLIENT, NULL, 0);
 }
 EXPORT_SYMBOL(req_capsule_client_sized_get);
 
+/**
+ * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC reply
+ * buffer corresponding to the given RMF (\a field) of a \a pill.
+ */
 void *req_capsule_server_get(struct req_capsule *pill,
                              const struct req_msg_field *field)
 {
-        return __req_capsule_get(pill, field, RCL_SERVER, NULL);
+        return __req_capsule_get(pill, field, RCL_SERVER, NULL, 0);
 }
 EXPORT_SYMBOL(req_capsule_server_get);
 
+/**
+ * Same as req_capsule_server_get(), but with a \a swabber argument.
+ *
+ * Ideally all swabbing should be done pursuant to RMF definitions, with no
+ * swabbing done outside this capsule abstraction.
+ */
 void *req_capsule_server_swab_get(struct req_capsule *pill,
                                   const struct req_msg_field *field,
                                   void *swabber)
 {
-        return __req_capsule_get(pill, field, RCL_SERVER, swabber);
+        return __req_capsule_get(pill, field, RCL_SERVER, swabber, 0);
 }
 EXPORT_SYMBOL(req_capsule_server_swab_get);
 
-
+/**
+ * Utility that combines req_capsule_set_size() and req_capsule_server_get().
+ *
+ * First the \a pill's request \a field's size is set (\a rc_area) using
+ * req_capsule_set_size() with the given \a len.  Then the actual buffer is
+ * returned.
+ */
 void *req_capsule_server_sized_get(struct req_capsule *pill,
                                    const struct req_msg_field *field,
                                    int len)
 {
         req_capsule_set_size(pill, field, RCL_SERVER, len);
-        return __req_capsule_get(pill, field, RCL_SERVER, NULL);
+        return __req_capsule_get(pill, field, RCL_SERVER, NULL, 0);
 }
 EXPORT_SYMBOL(req_capsule_server_sized_get);
 
+/**
+ * Returns the buffer of a \a pill corresponding to the given \a field from the
+ * request (if the caller is executing on the server-side) or reply (if the
+ * caller is executing on the client-side).
+ *
+ * This function convienient for use is code that could be executed on the
+ * client and server alike.
+ */
 const void *req_capsule_other_get(struct req_capsule *pill,
                                   const struct req_msg_field *field)
 {
-        return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL);
+        return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL, 0);
 }
 EXPORT_SYMBOL(req_capsule_other_get);
 
+/**
+ * Set the size of the PTLRPC request/reply (\a loc) buffer for the given \a
+ * field of the given \a pill.
+ *
+ * This function must be used when constructing variable sized fields of a
+ * request or reply.
+ */
 void req_capsule_set_size(struct req_capsule *pill,
                           const struct req_msg_field *field,
                           enum req_location loc, int size)
@@ -1521,16 +1820,29 @@ void req_capsule_set_size(struct req_capsule *pill,
             (field->rmf_size != -1) &&
             !(field->rmf_flags & RMF_F_NO_SIZE_CHECK) &&
             (size > 0)) {
-                CERROR("%s: field size mismatch %d != %d (%d)\n",
-                       field->rmf_name, size, field->rmf_size, loc);
-                LBUG();
+                if ((field->rmf_flags & RMF_F_STRUCT_ARRAY) &&
+                    (size % field->rmf_size != 0)) {
+                        CERROR("%s: array field size mismatch "
+                               "%d %% %d != 0 (%d)\n",
+                               field->rmf_name, size, field->rmf_size, loc);
+                        LBUG();
+                } else if (!(field->rmf_flags & RMF_F_STRUCT_ARRAY) &&
+                    size < field->rmf_size) {
+                        CERROR("%s: field size mismatch %d != %d (%d)\n",
+                               field->rmf_name, size, field->rmf_size, loc);
+                        LBUG();
+                }
         }
 
         pill->rc_area[loc][__req_capsule_offset(pill, field, loc)] = size;
 }
 EXPORT_SYMBOL(req_capsule_set_size);
 
-/* NB: this function doesn't correspond with req_capsule_set_size(), which
+/**
+ * Return the actual PTLRPC buffer length of a request or reply (\a loc)
+ * for the given \a pill's given \a field.
+ *
+ * NB: this function doesn't correspond with req_capsule_set_size(), which
  * actually sets the size in pill.rc_area[loc][offset], but this function
  * returns the message buflen[offset], maybe we should use another name.
  */
@@ -1545,6 +1857,13 @@ int req_capsule_get_size(const struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_get_size);
 
+/**
+ * Wrapper around lustre_msg_size() that returns the PTLRPC size needed for the
+ * given \a pill's request or reply (\a loc) given the field size recorded in
+ * the \a pill's rc_area.
+ *
+ * See also req_capsule_set_size().
+ */
 int req_capsule_msg_size(struct req_capsule *pill, enum req_location loc)
 {
         return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
@@ -1552,11 +1871,26 @@ int req_capsule_msg_size(struct req_capsule *pill, enum req_location loc)
                                pill->rc_area[loc]);
 }
 
+/**
+ * While req_capsule_msg_size() computes the size of a PTLRPC request or reply
+ * (\a loc) given a \a pill's \a rc_area, this function computes the size of a
+ * PTLRPC request or reply given only an RQF (\a fmt).
+ *
+ * This function should not be used for formats which contain variable size
+ * fields.
+ */
 int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
                          enum req_location loc)
 {
         int size, i = 0;
 
+        /*
+         * This function should probably LASSERT() that fmt has no fields with
+         * RMF_F_STRUCT_ARRAY in rmf_flags, since we can't know here how many
+         * elements in the array there will ultimately be, but then, we could
+         * assume that there will be at least one element, and that's just what
+         * we do.
+         */
         size = lustre_msg_hdr_size(magic, fmt->rf_fields[loc].nr);
         if (size < 0)
                 return size;
@@ -1567,8 +1901,25 @@ int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
         return size;
 }
 
-#define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)]
-
+/**
+ * Changes the format of an RPC.
+ *
+ * The pill must already have been initialized, which means that it already has
+ * a request format.  The new format \a fmt must be an extension of the pill's
+ * old format.  Specifically: the new format must have as many request and reply
+ * fields as the old one, and all fields shared by the old and new format must
+ * be at least as large in the new format.
+ *
+ * The new format's fields may be of different "type" than the old format, but
+ * only for fields that are "opaque" blobs: fields which have a) have no
+ * \a rmf_swabber, b) \a rmf_flags == 0 or RMF_F_NO_SIZE_CHECK, and c) \a
+ * rmf_size == -1 or \a rmf_flags == RMF_F_NO_SIZE_CHECK.  For example,
+ * OBD_SET_INFO has a key field and an opaque value field that gets interpreted
+ * according to the key field.  When the value, according to the key, contains a
+ * structure (or array thereof) to be swabbed, the format should be changed to
+ * one where the value field has \a rmf_size/rmf_flags/rmf_swabber set
+ * accordingly.
+ */
 void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
 {
         int i;
@@ -1586,6 +1937,14 @@ void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
         for (i = 0; i < RCL_NR; ++i) {
                 LASSERT(fmt->rf_fields[i].nr >= old->rf_fields[i].nr);
                 for (j = 0; j < old->rf_fields[i].nr - 1; ++j) {
+                        const struct req_msg_field *ofield = FMT_FIELD(old, i, j);
+
+                        /* "opaque" fields can be transmogrified */
+                        if (ofield->rmf_swabber == NULL &&
+                            (ofield->rmf_flags & ~RMF_F_NO_SIZE_CHECK) == 0 &&
+                            (ofield->rmf_size == -1 ||
+                            ofield->rmf_flags == RMF_F_NO_SIZE_CHECK))
+                                continue;
                         LASSERT(FMT_FIELD(fmt, i, j) == FMT_FIELD(old, i, j));
                 }
                 /*
@@ -1599,6 +1958,11 @@ void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
 }
 EXPORT_SYMBOL(req_capsule_extend);
 
+/**
+ * This function returns a non-zero value if the given \a field is present in
+ * the format (\a rc_fmt) of \a pill's PTLRPC request or reply (\a loc), else it
+ * returns 0.
+ */
 int req_capsule_has_field(const struct req_capsule *pill,
                           const struct req_msg_field *field,
                           enum req_location loc)
@@ -1609,6 +1973,10 @@ int req_capsule_has_field(const struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_has_field);
 
+/**
+ * Returns a non-zero value if the given \a field is present in the given \a
+ * pill's PTLRPC request or reply (\a loc), else it returns 0.
+ */
 int req_capsule_field_present(const struct req_capsule *pill,
                               const struct req_msg_field *field,
                               enum req_location loc)
@@ -1623,6 +1991,12 @@ int req_capsule_field_present(const struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_field_present);
 
+/**
+ * This function shrinks the size of the _buffer_ of the \a pill's PTLRPC
+ * request or reply (\a loc).
+ *
+ * This is not the opposite of req_capsule_extend().
+ */
 void req_capsule_shrink(struct req_capsule *pill,
                         const struct req_msg_field *field,
                         unsigned int newlen,