Whamcloud - gitweb
LU-14393 protocol: basic batching processing framework
[fs/lustre-release.git] / lustre / mdt / mdt_batch.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2020, DDN Storage Corporation.
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * lustre/mdt/mdt_batch.c
30  *
31  * Batch Metadata Updating on the server (MDT)
32  *
33  * Author: Qian Yingjin <qian@ddn.com>
34  */
35
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 #include <lustre_mds.h>
41 #include "mdt_internal.h"
42
43 static int mdt_batch_unpack(struct mdt_thread_info *info, __u32 opc)
44 {
45         int rc = 0;
46
47         switch (opc) {
48         default:
49                 rc = -EOPNOTSUPP;
50                 CERROR("%s: Unexpected opcode %d: rc = %d\n",
51                        mdt_obd_name(info->mti_mdt), opc, rc);
52                 break;
53         }
54
55         RETURN(rc);
56 }
57
/*
 * Pack the reply message of the current batched sub request.
 *
 * Placeholder for now: nothing is packed and success is always reported.
 *
 * @info: MDT thread info of the batch RPC (not used yet)
 *
 * Return: always 0.
 */
static int mdt_batch_pack_repmsg(struct mdt_thread_info *info)
{
	int rc = 0;

	(void)info;
	return rc;
}
62
/*
 * Build one designated-initializer entry of the batch handler table for a
 * Batch UpdaTe Request whose format is known in advance.
 *
 * @flags: handler flags, stored in th_flags
 * @opc:   BUT_* opcode token. It selects the table slot
 *         (opc - BUT_FIRST_OPC), gives the handler name (#opc) and names
 *         the request format RQF_<opc>.
 * @fn:    action callback, stored in th_act
 */
#define TGT_BUT_HDL(flags, opc, fn)				\
[(opc) - BUT_FIRST_OPC] = {					\
	.th_name	= #opc,					\
	.th_opc		= (opc),				\
	.th_flags	= (flags),				\
	.th_act		= (fn),					\
	.th_fmt		= &RQF_ ## opc,				\
	.th_version	= LUSTRE_MDS_VERSION,			\
	.th_fail_id	= 0,					\
	.th_hp		= NULL,					\
}
75
76 static struct tgt_handler mdt_batch_handlers[BUT_LAST_OPC];
77
78 static struct tgt_handler *mdt_batch_handler_find(__u32 opc)
79 {
80         struct tgt_handler *h;
81
82         h = NULL;
83         if (opc >= BUT_FIRST_OPC && opc < BUT_LAST_OPC) {
84                 h = &mdt_batch_handlers[opc - BUT_FIRST_OPC];
85                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
86                          h->th_opc, opc);
87         } else {
88                 h = NULL; /* unsupported opc */
89         }
90         return h;
91 }
92
93 int mdt_batch(struct tgt_session_info *tsi)
94 {
95         struct mdt_thread_info *info = tsi2mdt_info(tsi);
96         struct req_capsule *pill = &info->mti_sub_pill;
97         struct ptlrpc_request *req = tgt_ses_req(tsi);
98         struct but_update_header *buh;
99         struct but_update_buffer *bub = NULL;
100         struct batch_update_reply *reply = NULL;
101         struct ptlrpc_bulk_desc *desc = NULL;
102         struct lustre_msg *repmsg = NULL;
103         __u32 handled_update_count = 0;
104         __u32 update_buf_count;
105         __u32 packed_replen;
106         void **update_bufs;
107         int buh_size;
108         int rc;
109         int i;
110
111         ENTRY;
112
113         buh_size = req_capsule_get_size(&req->rq_pill, &RMF_BUT_HEADER,
114                                         RCL_CLIENT);
115         if (buh_size <= 0)
116                 RETURN(err_serious(-EPROTO));
117
118         buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
119         if (buh == NULL)
120                 RETURN(err_serious(-EPROTO));
121
122         if (buh->buh_magic != BUT_HEADER_MAGIC) {
123                 CERROR("%s: invalid update header magic %x expect %x: "
124                        "rc = %d\n", tgt_name(tsi->tsi_tgt), buh->buh_magic,
125                        BUT_HEADER_MAGIC, -EPROTO);
126                 RETURN(err_serious(-EPROTO));
127         }
128
129         update_buf_count = buh->buh_count;
130         if (update_buf_count == 0)
131                 RETURN(err_serious(-EPROTO));
132
133         OBD_ALLOC_PTR_ARRAY(update_bufs, update_buf_count);
134         if (update_bufs == NULL)
135                 RETURN(err_serious(-ENOMEM));
136
137         if (buh->buh_inline_length > 0) {
138                 update_bufs[0] = buh->buh_inline_data;
139         } else {
140                 struct but_update_buffer *tmp;
141                 int page_count = 0;
142
143                 bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
144                 if (bub == NULL)
145                         GOTO(out, rc = err_serious(-EPROTO));
146
147                 for (i = 0; i < update_buf_count; i++)
148                         /* First *and* last might be partial pages, hence +1 */
149                         page_count += DIV_ROUND_UP(bub[i].bub_size,
150                                                    PAGE_SIZE) + 1;
151
152                 desc = ptlrpc_prep_bulk_exp(req, page_count,
153                                             PTLRPC_BULK_OPS_COUNT,
154                                             PTLRPC_BULK_GET_SINK,
155                                             MDS_BULK_PORTAL,
156                                             &ptlrpc_bulk_kiov_nopin_ops);
157                 if (desc == NULL)
158                         GOTO(out, rc = err_serious(-ENOMEM));
159
160                 tmp = bub;
161                 for (i = 0; i < update_buf_count; i++, tmp++) {
162                         if (tmp->bub_size >= OUT_MAXREQSIZE)
163                                 GOTO(out, rc = err_serious(-EPROTO));
164
165                         OBD_ALLOC_LARGE(update_bufs[i], tmp->bub_size);
166                         if (update_bufs[i] == NULL)
167                                 GOTO(out, rc = err_serious(-ENOMEM));
168
169                         desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
170                                                         tmp->bub_size);
171                 }
172
173                 req->rq_bulk_write = 1;
174                 rc = sptlrpc_svc_prep_bulk(req, desc);
175                 if (rc != 0)
176                         GOTO(out, rc = err_serious(rc));
177
178                 rc = target_bulk_io(req->rq_export, desc);
179                 if (rc < 0)
180                         GOTO(out, rc = err_serious(rc));
181         }
182
183         req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER,
184                              buh->buh_reply_size);
185         rc = req_capsule_server_pack(&req->rq_pill);
186         if (rc != 0) {
187                 DEBUG_REQ(D_ERROR, req, "%s: Can't pack response: rc = %d\n",
188                        tgt_name(tsi->tsi_tgt), rc);
189                 GOTO(out, rc);
190         }
191
192         /* Prepare the update reply buffer */
193         reply = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
194         if (reply == NULL)
195                 GOTO(out, rc = -EPROTO);
196
197         reply->burp_magic = BUT_REPLY_MAGIC;
198         packed_replen = sizeof(*reply);
199         info->mti_max_repsize = buh->buh_reply_size;
200         info->mti_batch_env = 1;
201         info->mti_pill = pill;
202
203         /* Walk through sub requests in the batch request to execute them. */
204         for (i = 0; i < update_buf_count; i++) {
205                 struct batch_update_request *bur;
206                 struct lustre_msg *reqmsg = NULL;
207                 struct tgt_handler *h;
208                 int update_count;
209                 int j;
210
211                 bur = update_bufs[i];
212                 update_count = bur->burq_count;
213                 for (j = 0; j < update_count; j++) {
214                         __u32 replen;
215
216                         reqmsg = batch_update_reqmsg_next(bur, reqmsg);
217                         repmsg = batch_update_repmsg_next(reply, repmsg);
218
219                         if (handled_update_count > buh->buh_update_count)
220                                 GOTO(out, rc = -EOVERFLOW);
221
222                         LASSERT(reqmsg != NULL && repmsg != NULL);
223                         LASSERTF(reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2,
224                                  "Invalid reqmsg magic %x expected %x\n",
225                                  reqmsg->lm_magic, LUSTRE_MSG_MAGIC_V2);
226
227                         h = mdt_batch_handler_find(reqmsg->lm_opc);
228                         if (unlikely(h == NULL)) {
229                                 CERROR("%s: unsupported opc: 0x%x\n",
230                                        tgt_name(tsi->tsi_tgt), reqmsg->lm_opc);
231                                 GOTO(out, rc = -ENOTSUPP);
232                         }
233
234                         /* TODO: Check resend case only for modifying RPC */
235
236                         LASSERT(h->th_fmt != NULL);
237                         req_capsule_subreq_init(pill, h->th_fmt, req,
238                                                 reqmsg, repmsg, RCL_SERVER);
239
240                         rc = mdt_batch_unpack(info, reqmsg->lm_opc);
241                         if (rc) {
242                                 CERROR("%s: Can't unpack subreq, rc = %d\n",
243                                        mdt_obd_name(info->mti_mdt), rc);
244                                 GOTO(out, rc);
245                         }
246
247                         rc = mdt_batch_pack_repmsg(info);
248                         if (rc)
249                                 GOTO(out, rc);
250
251                         rc = h->th_act(tsi);
252                         if (rc)
253                                 GOTO(out, rc);
254
255                         repmsg->lm_result = rc;
256                         mdt_thread_info_reset(info);
257                         /*
258                          * TODO: Check whether overflow reply buffer.
259                          * Fix reply, shrink and/or grow reply buffers.
260                          */
261                         replen = lustre_packed_msg_size(repmsg);
262                         info->mti_max_repsize -= replen;
263                         packed_replen += replen;
264                         handled_update_count++;
265                 }
266         }
267
268         /*
269          * TODO: Grow/shrink the reply buffer.
270          */
271         CDEBUG(D_INFO, "reply size %u packed replen %u\n",
272                buh->buh_reply_size, packed_replen);
273         if (buh->buh_reply_size > packed_replen)
274                 req_capsule_shrink(&req->rq_pill, &RMF_BUT_REPLY,
275                                    packed_replen, RCL_SERVER);
276 out:
277         if (reply != NULL)
278                 reply->burp_count = handled_update_count;
279
280         if (update_bufs != NULL) {
281                 if (bub != NULL) {
282                         for (i = 0; i < update_buf_count; i++, bub++) {
283                                 if (update_bufs[i] != NULL)
284                                         OBD_FREE_LARGE(update_bufs[i],
285                                                        bub->bub_size);
286                         }
287                 }
288
289                 OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
290         }
291
292         if (desc != NULL)
293                 ptlrpc_free_bulk(desc);
294
295         mdt_thread_info_fini(info);
296         RETURN(rc);
297 }
298