Whamcloud - gitweb
LU-14393 protocol: basic batching processing framework
[fs/lustre-release.git] / lustre / ptlrpc / layout.c
index 9d20c18..bbd9d8b 100644 (file)
@@ -584,6 +584,17 @@ static const struct req_msg_field *mds_setattr_server[] = {
         &RMF_CAPA2
 };
 
+static const struct req_msg_field *mds_batch_client[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_BUT_HEADER,
+       &RMF_BUT_BUF,
+};
+
+static const struct req_msg_field *mds_batch_server[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_BUT_REPLY,
+};
+
 static const struct req_msg_field *llog_origin_handle_create_client[] = {
        &RMF_PTLRPC_BODY,
        &RMF_LLOGD_BODY,
@@ -854,6 +865,7 @@ static struct req_format *req_formats[] = {
        &RQF_CONNECT,
        &RQF_LFSCK_NOTIFY,
        &RQF_LFSCK_QUERY,
+       &RQF_MDS_BATCH,
 };
 
 struct req_msg_field {
@@ -1304,6 +1316,20 @@ struct req_msg_field RMF_OST_LADVISE =
                    lustre_swab_ladvise, NULL);
 EXPORT_SYMBOL(RMF_OST_LADVISE);
 
+struct req_msg_field RMF_BUT_REPLY =
+                       DEFINE_MSGF("batch_update_reply", 0, -1,
+                                   lustre_swab_batch_update_reply, NULL);
+EXPORT_SYMBOL(RMF_BUT_REPLY);
+
+struct req_msg_field RMF_BUT_HEADER = DEFINE_MSGF("but_update_header", 0,
+                               -1, lustre_swab_but_update_header, NULL);
+EXPORT_SYMBOL(RMF_BUT_HEADER);
+
+struct req_msg_field RMF_BUT_BUF = DEFINE_MSGF("but_update_buf",
+                       RMF_F_STRUCT_ARRAY, sizeof(struct but_update_buffer),
+                       lustre_swab_but_update_buffer, NULL);
+EXPORT_SYMBOL(RMF_BUT_BUF);
+
 /*
  * Request formats.
  */
@@ -1525,6 +1551,11 @@ struct req_format RQF_MDS_GET_INFO =
                         mds_getinfo_server);
 EXPORT_SYMBOL(RQF_MDS_GET_INFO);
 
+struct req_format RQF_MDS_BATCH =
+       DEFINE_REQ_FMT0("MDS_BATCH", mds_batch_client,
+                       mds_batch_server);
+EXPORT_SYMBOL(RQF_MDS_BATCH);
+
 struct req_format RQF_LDLM_ENQUEUE =
         DEFINE_REQ_FMT0("LDLM_ENQUEUE",
                         ldlm_enqueue_client, ldlm_enqueue_lvb_server);
@@ -1957,17 +1988,62 @@ int req_capsule_server_pack(struct req_capsule *pill)
        LASSERT(fmt != NULL);
 
        count = req_capsule_filled_sizes(pill, RCL_SERVER);
-       rc = lustre_pack_reply(pill->rc_req, count,
-                              pill->rc_area[RCL_SERVER], NULL);
-       if (rc != 0) {
-               DEBUG_REQ(D_ERROR, pill->rc_req,
-                         "Cannot pack %d fields in format '%s'",
-                         count, fmt->rf_name);
+       if (req_capsule_ptlreq(pill)) {
+               rc = lustre_pack_reply(pill->rc_req, count,
+                                      pill->rc_area[RCL_SERVER], NULL);
+               if (rc != 0) {
+                       DEBUG_REQ(D_ERROR, pill->rc_req,
+                                 "Cannot pack %d fields in format '%s'",
+                                  count, fmt->rf_name);
+               }
+       } else { /* SUB request */
+               __u32 msg_len;
+
+               msg_len = lustre_msg_size_v2(count, pill->rc_area[RCL_SERVER]);
+               if (msg_len > pill->rc_reqmsg->lm_repsize) {
+                       /* TODO: Check whether there is enough buffer size */
+                       CDEBUG(D_INFO,
+                              "Overflow pack %d fields in format '%s' for "
+                              "the SUB request with message len %u:%u\n",
+                              count, fmt->rf_name, msg_len,
+                              pill->rc_reqmsg->lm_repsize);
+               }
+
+               rc = 0;
+               lustre_init_msg_v2(pill->rc_repmsg, count,
+                                  pill->rc_area[RCL_SERVER], NULL);
        }
+
        return rc;
 }
 EXPORT_SYMBOL(req_capsule_server_pack);
 
+int req_capsule_client_pack(struct req_capsule *pill)
+{
+       const struct req_format *fmt;
+       int count;
+       int rc = 0;
+
+       LASSERT(pill->rc_loc == RCL_CLIENT);
+       fmt = pill->rc_fmt;
+       LASSERT(fmt != NULL);
+
+       count = req_capsule_filled_sizes(pill, RCL_CLIENT);
+       if (req_capsule_ptlreq(pill)) {
+               struct ptlrpc_request *req = pill->rc_req;
+
+               rc = lustre_pack_request(req, req->rq_import->imp_msg_magic,
+                                        count, pill->rc_area[RCL_CLIENT],
+                                        NULL);
+       } else {
+               /* Sub request in a batch PTLRPC request */
+               lustre_init_msg_v2(pill->rc_reqmsg, count,
+                                  pill->rc_area[RCL_CLIENT], NULL);
+       }
+       return rc;
+}
+EXPORT_SYMBOL(req_capsule_client_pack);
+
 /**
  * Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill
  * corresponding to the given RMF (\a field).
@@ -2160,11 +2236,12 @@ static void *__req_capsule_get(struct req_capsule *pill,
         value = getter(msg, offset, len);
 
         if (value == NULL) {
+               LASSERT(pill->rc_req != NULL);
                 DEBUG_REQ(D_ERROR, pill->rc_req,
-                         "Wrong buffer for field '%s' (%u of %u) in format '%s', %u vs. %u (%s)",
-                         field->rmf_name, offset, lustre_msg_bufcount(msg),
-                         fmt->rf_name, lustre_msg_buflen(msg, offset), len,
-                         rcl_names[loc]);
+                          "Wrong buffer for field '%s' (%u of %u) in format '%s', %u vs. %u (%s)",
+                          field->rmf_name, offset, lustre_msg_bufcount(msg),
+                          fmt->rf_name, lustre_msg_buflen(msg, offset), len,
+                          rcl_names[loc]);
         } else {
                 swabber_dumper_helper(pill, field, loc, offset, value, len,
                                       dump, swabber);
@@ -2393,10 +2470,18 @@ EXPORT_SYMBOL(req_capsule_get_size);
  */
 __u32 req_capsule_msg_size(struct req_capsule *pill, enum req_location loc)
 {
-        return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
-                               pill->rc_fmt->rf_fields[loc].nr,
-                               pill->rc_area[loc]);
+       if (req_capsule_ptlreq(pill)) {
+               return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
+                                      pill->rc_fmt->rf_fields[loc].nr,
+                                      pill->rc_area[loc]);
+       } else { /* SUB request in a batch request */
+               int count;
+
+               count = req_capsule_filled_sizes(pill, loc);
+               return lustre_msg_size_v2(count, pill->rc_area[loc]);
+       }
 }
+EXPORT_SYMBOL(req_capsule_msg_size);
 
 /**
  * While req_capsule_msg_size() computes the size of a PTLRPC request or reply
@@ -2550,16 +2635,17 @@ void req_capsule_shrink(struct req_capsule *pill,
        LASSERTF(newlen <= len, "%s:%s, oldlen=%u, newlen=%u\n",
                 fmt->rf_name, field->rmf_name, len, newlen);
 
+       len = lustre_shrink_msg(msg, offset, newlen, 1);
        if (loc == RCL_CLIENT) {
-               pill->rc_req->rq_reqlen = lustre_shrink_msg(msg, offset, newlen,
-                                                           1);
+               if (req_capsule_ptlreq(pill))
+                       pill->rc_req->rq_reqlen = len;
        } else {
-               pill->rc_req->rq_replen = lustre_shrink_msg(msg, offset, newlen,
-                                                           1);
                /* update also field size in reply lenghts arrays for possible
                 * reply re-pack due to req_capsule_server_grow() call.
                 */
                req_capsule_set_size(pill, field, loc, newlen);
+               if (req_capsule_ptlreq(pill))
+                       pill->rc_req->rq_replen = len;
        }
 }
 EXPORT_SYMBOL(req_capsule_shrink);
@@ -2684,6 +2770,9 @@ int req_check_sepol(struct req_capsule *pill)
        const char *sepol = NULL;
        const char *nm_sepol = NULL;
 
+       if (req_capsule_subreq(pill))
+               return 0;
+
        if (!pill->rc_req)
                return -EPROTO;
 
@@ -2717,3 +2806,32 @@ nm:
 }
 EXPORT_SYMBOL(req_check_sepol);
 #endif
+
+void req_capsule_subreq_init(struct req_capsule *pill,
+                            const struct req_format *fmt,
+                            struct ptlrpc_request *req,
+                            struct lustre_msg *reqmsg,
+                            struct lustre_msg *repmsg,
+                            enum req_location loc)
+{
+       req_capsule_init(pill, req, loc);
+       req_capsule_set(pill, fmt);
+       pill->rc_reqmsg = reqmsg;
+       pill->rc_repmsg = repmsg;
+}
+EXPORT_SYMBOL(req_capsule_subreq_init);
+
+void req_capsule_set_replen(struct req_capsule *pill)
+{
+       if (req_capsule_ptlreq(pill)) {
+               ptlrpc_request_set_replen(pill->rc_req);
+       } else { /* SUB request in a batch request */
+               int count;
+
+               count = req_capsule_filled_sizes(pill, RCL_SERVER);
+               pill->rc_reqmsg->lm_repsize =
+                               lustre_msg_size_v2(count,
+                                                  pill->rc_area[RCL_SERVER]);
+       }
+}
+EXPORT_SYMBOL(req_capsule_set_replen);