Whamcloud - gitweb
LU-14393 protocol: basic batching processing framework
[fs/lustre-release.git] / lustre / ptlrpc / batch.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2020, 2022, DDN/Whamcloud Storage Corporation.
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * lustre/ptlrpc/batch.c
30  *
31  * Batch Metadata Updating on the client
32  *
33  * Author: Qian Yingjin <qian@ddn.com>
34  */
35
36 #define DEBUG_SUBSYSTEM S_MDC
37
38 #include <linux/module.h>
39 #include <lustre_update.h>
40 #include <obd.h>
41
42 struct batch_update_buffer {
43         struct batch_update_request     *bub_req;
44         size_t                           bub_size;
45         size_t                           bub_end;
46         struct list_head                 bub_item;
47 };
48
/*
 * Asynchronous arguments stored in the ptlrpc request by
 * batch_send_update_req() and read back by batch_update_interpret().
 */
struct batch_update_args {
	/* Batch head to complete once the reply is interpreted */
	struct batch_update_head *ba_head;
};
52
53 /**
54  * Prepare inline update request
55  *
56  * Prepare BUT update ptlrpc inline request, and the request usuanlly includes
57  * one update buffer, which does not need bulk transfer.
58  */
59 static int batch_prep_inline_update_req(struct batch_update_head *head,
60                                         struct ptlrpc_request *req,
61                                         int repsize)
62 {
63         struct batch_update_buffer *buf;
64         struct but_update_header *buh;
65         int rc;
66
67         buf = list_entry(head->buh_buf_list.next,
68                           struct batch_update_buffer, bub_item);
69         req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
70                              buf->bub_end + sizeof(*buh));
71
72         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
73         if (rc != 0)
74                 RETURN(rc);
75
76         buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
77         buh->buh_magic = BUT_HEADER_MAGIC;
78         buh->buh_count = 1;
79         buh->buh_inline_length = buf->bub_end;
80         buh->buh_reply_size = repsize;
81         buh->buh_update_count = head->buh_update_count;
82
83         memcpy(buh->buh_inline_data, buf->bub_req, buf->bub_end);
84
85         req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
86                              RCL_SERVER, repsize);
87
88         ptlrpc_request_set_replen(req);
89         req->rq_request_portal = OUT_PORTAL;
90         req->rq_reply_portal = OSC_REPLY_PORTAL;
91
92         RETURN(rc);
93 }
94
95 static int batch_prep_update_req(struct batch_update_head *head,
96                                  struct ptlrpc_request **reqp)
97 {
98         struct ptlrpc_request *req;
99         struct ptlrpc_bulk_desc *desc;
100         struct batch_update_buffer *buf;
101         struct but_update_header *buh;
102         struct but_update_buffer *bub;
103         int page_count = 0;
104         int total = 0;
105         int repsize;
106         int rc;
107
108         ENTRY;
109
110         repsize = head->buh_repsize +
111                   cfs_size_round(offsetof(struct batch_update_reply,
112                                           burp_repmsg[0]));
113         if (repsize < OUT_UPDATE_REPLY_SIZE)
114                 repsize = OUT_UPDATE_REPLY_SIZE;
115
116         LASSERT(head->buh_buf_count > 0);
117
118         req = ptlrpc_request_alloc(class_exp2cliimp(head->buh_exp),
119                                    &RQF_MDS_BATCH);
120         if (req == NULL)
121                 RETURN(-ENOMEM);
122
123         if (head->buh_buf_count == 1) {
124                 buf = list_entry(head->buh_buf_list.next,
125                                  struct batch_update_buffer, bub_item);
126
127                 /* Check whether it can be packed inline */
128                 if (buf->bub_end + sizeof(struct but_update_header) <
129                     OUT_UPDATE_MAX_INLINE_SIZE) {
130                         rc = batch_prep_inline_update_req(head, req, repsize);
131                         if (rc == 0)
132                                 *reqp = req;
133                         GOTO(out_req, rc);
134                 }
135         }
136
137         req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
138                              sizeof(struct but_update_header));
139         req_capsule_set_size(&req->rq_pill, &RMF_BUT_BUF, RCL_CLIENT,
140                              head->buh_buf_count * sizeof(*bub));
141
142         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
143         if (rc != 0)
144                 GOTO(out_req, rc);
145
146         buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
147         buh->buh_magic = BUT_HEADER_MAGIC;
148         buh->buh_count = head->buh_buf_count;
149         buh->buh_inline_length = 0;
150         buh->buh_reply_size = repsize;
151         buh->buh_update_count = head->buh_update_count;
152         bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
153         list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
154                 bub->bub_size = buf->bub_size;
155                 bub++;
156                 /* First *and* last might be partial pages, hence +1 */
157                 page_count += DIV_ROUND_UP(buf->bub_size, PAGE_SIZE) + 1;
158         }
159
160         req->rq_bulk_write = 1;
161         desc = ptlrpc_prep_bulk_imp(req, page_count,
162                                     MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
163                                     PTLRPC_BULK_GET_SOURCE,
164                                     MDS_BULK_PORTAL,
165                                     &ptlrpc_bulk_kiov_nopin_ops);
166         if (desc == NULL)
167                 GOTO(out_req, rc = -ENOMEM);
168
169         list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
170                 desc->bd_frag_ops->add_iov_frag(desc, buf->bub_req,
171                                                 buf->bub_size);
172                 total += buf->bub_size;
173         }
174         CDEBUG(D_OTHER, "Total %d in %u\n", total, head->buh_update_count);
175
176         req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
177                              RCL_SERVER, repsize);
178
179         ptlrpc_request_set_replen(req);
180         req->rq_request_portal = OUT_PORTAL;
181         req->rq_reply_portal = OSC_REPLY_PORTAL;
182         *reqp = req;
183
184 out_req:
185         if (rc < 0)
186                 ptlrpc_req_finished(req);
187
188         RETURN(rc);
189 }
190
191 static struct batch_update_buffer *
192 current_batch_update_buffer(struct batch_update_head *head)
193 {
194         if (list_empty(&head->buh_buf_list))
195                 return NULL;
196
197         return list_entry(head->buh_buf_list.prev, struct batch_update_buffer,
198                           bub_item);
199 }
200
201 static int batch_update_buffer_create(struct batch_update_head *head,
202                                       size_t size)
203 {
204         struct batch_update_buffer *buf;
205         struct batch_update_request *bur;
206
207         OBD_ALLOC_PTR(buf);
208         if (buf == NULL)
209                 return -ENOMEM;
210
211         LASSERT(size > 0);
212         size = round_up(size, PAGE_SIZE);
213         OBD_ALLOC_LARGE(bur, size);
214         if (bur == NULL) {
215                 OBD_FREE_PTR(buf);
216                 return -ENOMEM;
217         }
218
219         bur->burq_magic = BUT_REQUEST_MAGIC;
220         bur->burq_count = 0;
221         buf->bub_req = bur;
222         buf->bub_size = size;
223         buf->bub_end = sizeof(*bur);
224         INIT_LIST_HEAD(&buf->bub_item);
225         list_add_tail(&buf->bub_item, &head->buh_buf_list);
226         head->buh_buf_count++;
227
228         return 0;
229 }
230
231 /**
232  * Destroy an @object_update_callback.
233  */
234 static void object_update_callback_fini(struct object_update_callback *ouc)
235 {
236         LASSERT(list_empty(&ouc->ouc_item));
237
238         OBD_FREE_PTR(ouc);
239 }
240
241 /**
242  * Insert an @object_update_callback into the the @batch_update_head.
243  *
244  * Usually each update in @batch_update_head will have one correspondent
245  * callback, and these callbacks will be called in ->rq_interpret_reply.
246  */
247 static int
248 batch_insert_update_callback(struct batch_update_head *head, void *data,
249                              object_update_interpret_t interpret)
250 {
251         struct object_update_callback *ouc;
252
253         OBD_ALLOC_PTR(ouc);
254         if (ouc == NULL)
255                 return -ENOMEM;
256
257         INIT_LIST_HEAD(&ouc->ouc_item);
258         ouc->ouc_interpret = interpret;
259         ouc->ouc_head = head;
260         ouc->ouc_data = data;
261         list_add_tail(&ouc->ouc_item, &head->buh_cb_list);
262
263         return 0;
264 }
265
266 /**
267  * Allocate and initialize batch update request.
268  *
269  * @batch_update_head is being used to track updates being executed on
270  * this OBD device. The update buffer will be 4K initially, and increased
271  * if needed.
272  */
273 static struct batch_update_head *
274 batch_update_request_create(struct obd_export *exp, struct lu_batch *bh)
275 {
276         struct batch_update_head *head;
277         int rc;
278
279         OBD_ALLOC_PTR(head);
280         if (head == NULL)
281                 return ERR_PTR(-ENOMEM);
282
283         INIT_LIST_HEAD(&head->buh_cb_list);
284         INIT_LIST_HEAD(&head->buh_buf_list);
285         head->buh_exp = exp;
286         head->buh_batch = bh;
287
288         rc = batch_update_buffer_create(head, PAGE_SIZE);
289         if (rc != 0) {
290                 OBD_FREE_PTR(head);
291                 RETURN(ERR_PTR(rc));
292         }
293
294         return head;
295 }
296
297 static void batch_update_request_destroy(struct batch_update_head *head)
298 {
299         struct batch_update_buffer *bub, *tmp;
300
301         if (head == NULL)
302                 return;
303
304         list_for_each_entry_safe(bub, tmp, &head->buh_buf_list, bub_item) {
305                 list_del(&bub->bub_item);
306                 if (bub->bub_req)
307                         OBD_FREE_LARGE(bub->bub_req, bub->bub_size);
308                 OBD_FREE_PTR(bub);
309         }
310
311         OBD_FREE_PTR(head);
312 }
313
314 static int batch_update_request_fini(struct batch_update_head *head,
315                                      struct ptlrpc_request *req,
316                                      struct batch_update_reply *reply, int rc)
317 {
318         struct object_update_callback *ouc, *next;
319         struct lustre_msg *repmsg = NULL;
320         int count = 0;
321         int index = 0;
322
323         ENTRY;
324
325         if (reply)
326                 count = reply->burp_count;
327
328         list_for_each_entry_safe(ouc, next, &head->buh_cb_list, ouc_item) {
329                 int rc1 = 0;
330
331                 list_del_init(&ouc->ouc_item);
332
333                 /*
334                  * The peer may only have handled some requests (indicated by
335                  * @count) in the packaged OUT PRC, we can only get results
336                  * for the handled part.
337                  */
338                 if (index < count) {
339                         repmsg = batch_update_repmsg_next(reply, repmsg);
340                         if (repmsg == NULL)
341                                 rc1 = -EPROTO;
342                         else
343                                 rc1 = repmsg->lm_result;
344                 } else {
345                         /*
346                          * The peer did not handle these request, let us return
347                          * -ECANCELED to the update interpreter for now.
348                          */
349                         repmsg = NULL;
350                         rc1 = -ECANCELED;
351                 }
352
353                 if (ouc->ouc_interpret != NULL)
354                         ouc->ouc_interpret(req, repmsg, ouc, rc1);
355
356                 object_update_callback_fini(ouc);
357                 if (rc == 0 && rc1 < 0)
358                         rc = rc1;
359         }
360
361         batch_update_request_destroy(head);
362
363         RETURN(rc);
364 }
365
366 static int batch_update_interpret(const struct lu_env *env,
367                                   struct ptlrpc_request *req,
368                                   void *args, int rc)
369 {
370         struct batch_update_args *aa = (struct batch_update_args *)args;
371         struct batch_update_reply *reply = NULL;
372
373         ENTRY;
374
375         if (aa->ba_head == NULL)
376                 RETURN(0);
377
378         /* Unpack the results from the reply message. */
379         if (req->rq_repmsg != NULL && req->rq_replied) {
380                 reply = req_capsule_server_sized_get(&req->rq_pill,
381                                                      &RMF_BUT_REPLY,
382                                                      sizeof(*reply));
383                 if ((reply == NULL ||
384                      reply->burp_magic != BUT_REPLY_MAGIC) && rc == 0)
385                         rc = -EPROTO;
386         }
387
388         rc = batch_update_request_fini(aa->ba_head, req, reply, rc);
389
390         RETURN(rc);
391 }
392
393 static int batch_send_update_req(const struct lu_env *env,
394                                  struct batch_update_head *head)
395 {
396         struct lu_batch *bh;
397         struct ptlrpc_request *req = NULL;
398         struct batch_update_args *aa;
399         int rc;
400
401         ENTRY;
402
403         if (head == NULL)
404                 RETURN(0);
405
406         bh = head->buh_batch;
407         rc = batch_prep_update_req(head, &req);
408         if (rc) {
409                 rc = batch_update_request_fini(head, NULL, NULL, rc);
410                 RETURN(rc);
411         }
412
413         aa = ptlrpc_req_async_args(aa, req);
414         aa->ba_head = head;
415         req->rq_interpret_reply = batch_update_interpret;
416
417         if (bh->lbt_flags & BATCH_FL_SYNC) {
418                 rc = ptlrpc_queue_wait(req);
419         } else {
420                 if ((bh->lbt_flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) ==
421                     BATCH_FL_RDONLY) {
422                         ptlrpcd_add_req(req);
423                 } else if (bh->lbt_flags & BATCH_FL_RQSET) {
424                         ptlrpc_set_add_req(bh->lbt_rqset, req);
425                         ptlrpc_check_set(env, bh->lbt_rqset);
426                 } else {
427                         ptlrpcd_add_req(req);
428                 }
429                 req = NULL;
430         }
431
432         if (req != NULL)
433                 ptlrpc_req_finished(req);
434
435         RETURN(rc);
436 }
437
438 static int batch_update_request_add(struct batch_update_head **headp,
439                                     struct md_op_item *item,
440                                     md_update_pack_t packer,
441                                     object_update_interpret_t interpreter)
442 {
443         struct batch_update_head *head = *headp;
444         struct lu_batch *bh = head->buh_batch;
445         struct batch_update_buffer *buf;
446         struct lustre_msg *reqmsg;
447         size_t max_len;
448         int rc;
449
450         ENTRY;
451
452         for (; ;) {
453                 buf = current_batch_update_buffer(head);
454                 LASSERT(buf != NULL);
455                 max_len = buf->bub_size - buf->bub_end;
456                 reqmsg = (struct lustre_msg *)((char *)buf->bub_req +
457                                                 buf->bub_end);
458                 rc = packer(head, reqmsg, &max_len, item);
459                 if (rc == -E2BIG) {
460                         int rc2;
461
462                         /* Create new batch object update buffer */
463                         rc2 = batch_update_buffer_create(head,
464                                 max_len + offsetof(struct batch_update_request,
465                                                    burq_reqmsg[0]) + 1);
466                         if (rc2 != 0) {
467                                 rc = rc2;
468                                 break;
469                         }
470                 } else {
471                         if (rc == 0) {
472                                 buf->bub_end += max_len;
473                                 buf->bub_req->burq_count++;
474                                 head->buh_update_count++;
475                                 head->buh_repsize += reqmsg->lm_repsize;
476                         }
477                         break;
478                 }
479         }
480
481         if (rc)
482                 GOTO(out, rc);
483
484         rc = batch_insert_update_callback(head, item, interpreter);
485         if (rc)
486                 GOTO(out, rc);
487
488         /* Unplug the batch queue if accumulated enough update requests. */
489         if (bh->lbt_max_count && head->buh_update_count >= bh->lbt_max_count) {
490                 rc = batch_send_update_req(NULL, head);
491                 *headp = NULL;
492         }
493 out:
494         if (rc) {
495                 batch_update_request_destroy(head);
496                 *headp = NULL;
497         }
498
499         RETURN(rc);
500 }
501
502 struct lu_batch *cli_batch_create(struct obd_export *exp,
503                                   enum lu_batch_flags flags, __u32 max_count)
504 {
505         struct cli_batch *cbh;
506         struct lu_batch *bh;
507
508         ENTRY;
509
510         OBD_ALLOC_PTR(cbh);
511         if (!cbh)
512                 RETURN(ERR_PTR(-ENOMEM));
513
514         bh = &cbh->cbh_super;
515         bh->lbt_result = 0;
516         bh->lbt_flags = flags;
517         bh->lbt_max_count = max_count;
518
519         cbh->cbh_head = batch_update_request_create(exp, bh);
520         if (IS_ERR(cbh->cbh_head)) {
521                 bh = (struct lu_batch *)cbh->cbh_head;
522                 OBD_FREE_PTR(cbh);
523         }
524
525         RETURN(bh);
526 }
527 EXPORT_SYMBOL(cli_batch_create);
528
529 int cli_batch_stop(struct obd_export *exp, struct lu_batch *bh)
530 {
531         struct cli_batch *cbh;
532         int rc;
533
534         ENTRY;
535
536         cbh = container_of(bh, struct cli_batch, cbh_super);
537         rc = batch_send_update_req(NULL, cbh->cbh_head);
538
539         OBD_FREE_PTR(cbh);
540         RETURN(rc);
541 }
542 EXPORT_SYMBOL(cli_batch_stop);
543
544 int cli_batch_flush(struct obd_export *exp, struct lu_batch *bh, bool wait)
545 {
546         struct cli_batch *cbh;
547         int rc;
548
549         ENTRY;
550
551         cbh = container_of(bh, struct cli_batch, cbh_super);
552         if (cbh->cbh_head == NULL)
553                 RETURN(0);
554
555         rc = batch_send_update_req(NULL, cbh->cbh_head);
556         cbh->cbh_head = NULL;
557
558         RETURN(rc);
559 }
560 EXPORT_SYMBOL(cli_batch_flush);
561
562 int cli_batch_add(struct obd_export *exp, struct lu_batch *bh,
563                   struct md_op_item *item, md_update_pack_t packer,
564                   object_update_interpret_t interpreter)
565 {
566         struct cli_batch *cbh;
567         int rc;
568
569         ENTRY;
570
571         cbh = container_of(bh, struct cli_batch, cbh_super);
572         if (cbh->cbh_head == NULL) {
573                 cbh->cbh_head = batch_update_request_create(exp, bh);
574                 if (IS_ERR(cbh->cbh_head))
575                         RETURN(PTR_ERR(cbh->cbh_head));
576         }
577
578         rc = batch_update_request_add(&cbh->cbh_head, item,
579                                       packer, interpreter);
580
581         RETURN(rc);
582 }
583 EXPORT_SYMBOL(cli_batch_add);