LU-14139 statahead: add stats for batch RPC requests
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2020, 2022, DDN/Whamcloud Storage Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */
/*
 * lustre/ptlrpc/batch.c
 *
 * Batch Metadata Updating on the client
 *
 * Author: Qian Yingjin <qian@ddn.com>
 */

#define DEBUG_SUBSYSTEM S_MDC

#include <linux/module.h>
#include <obd_class.h>
#include <obd.h>
#ifdef HAVE_SERVER_SUPPORT
#include <lustre_update.h>
#else

#define OUT_UPDATE_REPLY_SIZE           4096

static inline struct lustre_msg *
batch_update_repmsg_next(struct batch_update_reply *bur,
                         struct lustre_msg *repmsg)
{
        if (repmsg)
                return (struct lustre_msg *)((char *)repmsg +
                                             lustre_packed_msg_size(repmsg));
        else
                return &bur->burp_repmsg[0];
}
#endif

struct batch_update_buffer {
        struct batch_update_request     *bub_req;
        size_t                           bub_size;
        size_t                           bub_end;
        struct list_head                 bub_item;
};

struct batch_update_args {
        struct batch_update_head        *ba_head;
};

/**
 * Prepare inline update request
 *
 * Prepare a BUT update ptlrpc inline request; such a request usually
 * includes a single update buffer, which does not need bulk transfer.
 */
static int batch_prep_inline_update_req(struct batch_update_head *head,
                                        struct ptlrpc_request *req,
                                        int repsize)
{
        struct batch_update_buffer *buf;
        struct but_update_header *buh;
        int rc;

        buf = list_entry(head->buh_buf_list.next,
                         struct batch_update_buffer, bub_item);
        req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
                             buf->bub_end + sizeof(*buh));

        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
        if (rc != 0)
                RETURN(rc);

        buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
        buh->buh_magic = BUT_HEADER_MAGIC;
        buh->buh_count = 1;
        buh->buh_inline_length = buf->bub_end;
        buh->buh_reply_size = repsize;
        buh->buh_update_count = head->buh_update_count;

        memcpy(buh->buh_inline_data, buf->bub_req, buf->bub_end);

        req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
                             RCL_SERVER, repsize);

        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OUT_PORTAL;
        req->rq_reply_portal = OSC_REPLY_PORTAL;

        RETURN(rc);
}

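/**
 * Prepare the ptlrpc request for a batched update.
 *
 * Pack all update buffers of @head into one BUT request. If there is a
 * single buffer small enough to be carried inline, delegate to
 * batch_prep_inline_update_req(); otherwise describe each buffer in the
 * request header and attach the buffers to a bulk descriptor for transfer.
 */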
static int batch_prep_update_req(struct batch_update_head *head,
                                 struct ptlrpc_request **reqp)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct batch_update_buffer *buf;
        struct but_update_header *buh;
        struct but_update_buffer *bub;
        int page_count = 0;
        int total = 0;
        int repsize;
        int rc;

        ENTRY;

        repsize = head->buh_repsize +
                  cfs_size_round(offsetof(struct batch_update_reply,
                                          burp_repmsg[0]));
        if (repsize < OUT_UPDATE_REPLY_SIZE)
                repsize = OUT_UPDATE_REPLY_SIZE;

        LASSERT(head->buh_buf_count > 0);

        req = ptlrpc_request_alloc(class_exp2cliimp(head->buh_exp),
                                   &RQF_MDS_BATCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (head->buh_buf_count == 1) {
                buf = list_entry(head->buh_buf_list.next,
                                 struct batch_update_buffer, bub_item);

                /* Check whether it can be packed inline */
                if (buf->bub_end + sizeof(struct but_update_header) <
                    OUT_UPDATE_MAX_INLINE_SIZE) {
                        rc = batch_prep_inline_update_req(head, req, repsize);
                        if (rc == 0)
                                *reqp = req;
                        GOTO(out_req, rc);
                }
        }

        req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
                             sizeof(struct but_update_header));
        req_capsule_set_size(&req->rq_pill, &RMF_BUT_BUF, RCL_CLIENT,
                             head->buh_buf_count * sizeof(*bub));

        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
        if (rc != 0)
                GOTO(out_req, rc);

        buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
        buh->buh_magic = BUT_HEADER_MAGIC;
        buh->buh_count = head->buh_buf_count;
        buh->buh_inline_length = 0;
        buh->buh_reply_size = repsize;
        buh->buh_update_count = head->buh_update_count;
        bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
        list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
                bub->bub_size = buf->bub_size;
                bub++;
                /* First *and* last might be partial pages, hence +1 */
                page_count += DIV_ROUND_UP(buf->bub_size, PAGE_SIZE) + 1;
        }

        req->rq_bulk_write = 1;
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                                    MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
                                    PTLRPC_BULK_GET_SOURCE,
                                    MDS_BULK_PORTAL,
                                    &ptlrpc_bulk_kiov_nopin_ops);
        if (desc == NULL)
                GOTO(out_req, rc = -ENOMEM);

        list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
                desc->bd_frag_ops->add_iov_frag(desc, buf->bub_req,
                                                buf->bub_size);
                total += buf->bub_size;
        }
        CDEBUG(D_OTHER, "Total %d in %u\n", total, head->buh_update_count);

        req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
                             RCL_SERVER, repsize);

        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OUT_PORTAL;
        req->rq_reply_portal = OSC_REPLY_PORTAL;
        *reqp = req;

out_req:
        if (rc < 0)
                ptlrpc_req_finished(req);

        RETURN(rc);
}

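/**
 * Return the last (currently filling) update buffer of @head, or NULL if
 * the buffer list is empty.
 */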
static struct batch_update_buffer *
current_batch_update_buffer(struct batch_update_head *head)
{
        if (list_empty(&head->buh_buf_list))
                return NULL;

        return list_entry(head->buh_buf_list.prev, struct batch_update_buffer,
                          bub_item);
}

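/**
 * Allocate a new update buffer of at least @size bytes (rounded up to a
 * whole page) and append it to the buffer list of @head.
 */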
static int batch_update_buffer_create(struct batch_update_head *head,
                                      size_t size)
{
        struct batch_update_buffer *buf;
        struct batch_update_request *bur;

        OBD_ALLOC_PTR(buf);
        if (buf == NULL)
                return -ENOMEM;

        LASSERT(size > 0);
        size = round_up(size, PAGE_SIZE);
        OBD_ALLOC_LARGE(bur, size);
        if (bur == NULL) {
                OBD_FREE_PTR(buf);
                return -ENOMEM;
        }

        bur->burq_magic = BUT_REQUEST_MAGIC;
        bur->burq_count = 0;
        buf->bub_req = bur;
        buf->bub_size = size;
        buf->bub_end = sizeof(*bur);
        INIT_LIST_HEAD(&buf->bub_item);
        list_add_tail(&buf->bub_item, &head->buh_buf_list);
        head->buh_buf_count++;

        return 0;
}

/**
 * Destroy an @object_update_callback.
 */
static void object_update_callback_fini(struct object_update_callback *ouc)
{
        LASSERT(list_empty(&ouc->ouc_item));

        OBD_FREE_PTR(ouc);
}

/**
 * Insert an @object_update_callback into the @batch_update_head.
 *
 * Usually each update in @batch_update_head will have one corresponding
 * callback, and these callbacks will be called in ->rq_interpret_reply.
 */
static int
batch_insert_update_callback(struct batch_update_head *head, void *data,
                             object_update_interpret_t interpret)
{
        struct object_update_callback *ouc;

        OBD_ALLOC_PTR(ouc);
        if (ouc == NULL)
                return -ENOMEM;

        INIT_LIST_HEAD(&ouc->ouc_item);
        ouc->ouc_interpret = interpret;
        ouc->ouc_head = head;
        ouc->ouc_data = data;
        list_add_tail(&ouc->ouc_item, &head->buh_cb_list);

        return 0;
}

/**
 * Allocate and initialize a batch update request.
 *
 * @batch_update_head is used to track the updates being executed on
 * this OBD device. The update buffer is 4KB initially and is enlarged
 * as needed.
 */
static struct batch_update_head *
batch_update_request_create(struct obd_export *exp, struct lu_batch *bh)
{
        struct batch_update_head *head;
        int rc;

        OBD_ALLOC_PTR(head);
        if (head == NULL)
                return ERR_PTR(-ENOMEM);

        INIT_LIST_HEAD(&head->buh_cb_list);
        INIT_LIST_HEAD(&head->buh_buf_list);
        head->buh_exp = exp;
        head->buh_batch = bh;

        rc = batch_update_buffer_create(head, PAGE_SIZE);
        if (rc != 0) {
                OBD_FREE_PTR(head);
                RETURN(ERR_PTR(rc));
        }

        return head;
}

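/**
 * Release all update buffers of @head and then @head itself.
 */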
static void batch_update_request_destroy(struct batch_update_head *head)
{
        struct batch_update_buffer *bub, *tmp;

        if (head == NULL)
                return;

        list_for_each_entry_safe(bub, tmp, &head->buh_buf_list, bub_item) {
                list_del(&bub->bub_item);
                if (bub->bub_req)
                        OBD_FREE_LARGE(bub->bub_req, bub->bub_size);
                OBD_FREE_PTR(bub);
        }

        OBD_FREE_PTR(head);
}

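/**
 * Finish a batch update request.
 *
 * Walk the callback list of @head, match each callback with the next
 * sub-reply in @reply (or -ECANCELED if the server did not process that
 * update), call the interpreter, and finally destroy @head.
 *
 * Return the first error seen, or the passed-in @rc if none.
 */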
static int batch_update_request_fini(struct batch_update_head *head,
                                     struct ptlrpc_request *req,
                                     struct batch_update_reply *reply, int rc)
{
        struct object_update_callback *ouc, *next;
        struct lustre_msg *repmsg = NULL;
        int count = 0;
        int index = 0;

        ENTRY;

        if (reply)
                count = reply->burp_count;

        list_for_each_entry_safe(ouc, next, &head->buh_cb_list, ouc_item) {
                int rc1 = 0;

                list_del_init(&ouc->ouc_item);

                /*
                 * The peer may only have handled some requests (indicated
                 * by @count) in the packaged OUT RPC, so we can only get
                 * results for the handled part.
                 */
                if (index < count) {
                        repmsg = batch_update_repmsg_next(reply, repmsg);
                        if (repmsg == NULL)
                                rc1 = -EPROTO;
                        else
                                rc1 = repmsg->lm_result;
                } else {
                        /*
                         * The peer did not handle these requests, so return
                         * -ECANCELED to the update interpreter for now.
                         */
                        repmsg = NULL;
                        rc1 = -ECANCELED;
                }

                if (ouc->ouc_interpret != NULL)
                        ouc->ouc_interpret(req, repmsg, ouc, rc1);

                index++;
                object_update_callback_fini(ouc);
                if (rc == 0 && rc1 < 0)
                        rc = rc1;
        }

        batch_update_request_destroy(head);

        RETURN(rc);
}

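/**
 * Interpret the reply of a batched update RPC.
 *
 * Drop the modification RPC slot held by the request, unpack the BUT
 * reply (if any) and dispatch the sub-replies to the per-update
 * callbacks via batch_update_request_fini().
 */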
static int batch_update_interpret(const struct lu_env *env,
                                  struct ptlrpc_request *req,
                                  void *args, int rc)
{
        struct batch_update_args *aa = (struct batch_update_args *)args;
        struct batch_update_reply *reply = NULL;

        ENTRY;

        if (aa->ba_head == NULL)
                RETURN(0);

        ptlrpc_put_mod_rpc_slot(req);
        /* Unpack the results from the reply message. */
        if (req->rq_repmsg != NULL && req->rq_replied) {
                reply = req_capsule_server_sized_get(&req->rq_pill,
                                                     &RMF_BUT_REPLY,
                                                     sizeof(*reply));
                if ((reply == NULL ||
                     reply->burp_magic != BUT_REPLY_MAGIC) && rc == 0)
                        rc = -EPROTO;
        }

        rc = batch_update_request_fini(aa->ba_head, req, reply, rc);

        RETURN(rc);
}

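/**
 * Pack and send the batched update RPC built from @head.
 *
 * Depending on the batch flags the request is sent synchronously
 * (BATCH_FL_SYNC), added to the caller's request set (BATCH_FL_RQSET),
 * or handed to ptlrpcd for asynchronous processing. The number of
 * updates in the batch is recorded in the cl_batch_rpc_hist histogram.
 */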
static int batch_send_update_req(const struct lu_env *env,
                                 struct batch_update_head *head)
{
        struct obd_device *obd;
        struct ptlrpc_request *req = NULL;
        struct batch_update_args *aa;
        struct lu_batch *bh;
        int rc;

        ENTRY;

        if (head == NULL)
                RETURN(0);

        obd = class_exp2obd(head->buh_exp);
        bh = head->buh_batch;
        rc = batch_prep_update_req(head, &req);
        if (rc) {
                rc = batch_update_request_fini(head, NULL, NULL, rc);
                RETURN(rc);
        }

        aa = ptlrpc_req_async_args(aa, req);
        aa->ba_head = head;
        req->rq_interpret_reply = batch_update_interpret;

        /*
         * Only acquire modification RPC slot for the batched RPC
         * which contains metadata updates.
         */
        if (!(bh->lbt_flags & BATCH_FL_RDONLY))
                ptlrpc_get_mod_rpc_slot(req);

        if (bh->lbt_flags & BATCH_FL_SYNC) {
                rc = ptlrpc_queue_wait(req);
        } else {
                if ((bh->lbt_flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) ==
                    BATCH_FL_RDONLY) {
                        ptlrpcd_add_req(req);
                } else if (bh->lbt_flags & BATCH_FL_RQSET) {
                        ptlrpc_set_add_req(bh->lbt_rqset, req);
                        ptlrpc_check_set(env, bh->lbt_rqset);
                } else {
                        ptlrpcd_add_req(req);
                }
                req = NULL;
        }

        if (req != NULL)
                ptlrpc_req_finished(req);

        lprocfs_oh_tally_log2(&obd->u.cli.cl_batch_rpc_hist,
                              head->buh_update_count);
        RETURN(rc);
}

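/**
 * Pack one update request (@item) into the current batch.
 *
 * The update is packed by @packer at the tail of the current update
 * buffer; if it does not fit (-E2BIG), a new, larger buffer is created
 * and the packing is retried. A callback with @interpreter is registered
 * for the update, and the whole batch is sent once lbt_max_count updates
 * have accumulated.
 */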
static int batch_update_request_add(struct batch_update_head **headp,
                                    struct md_op_item *item,
                                    md_update_pack_t packer,
                                    object_update_interpret_t interpreter)
{
        struct batch_update_head *head = *headp;
        struct lu_batch *bh = head->buh_batch;
        struct batch_update_buffer *buf;
        struct lustre_msg *reqmsg;
        size_t max_len;
        int rc;

        ENTRY;

        for (;;) {
                buf = current_batch_update_buffer(head);
                LASSERT(buf != NULL);
                max_len = buf->bub_size - buf->bub_end;
                reqmsg = (struct lustre_msg *)((char *)buf->bub_req +
                                                buf->bub_end);
                rc = packer(head, reqmsg, &max_len, item);
                if (rc == -E2BIG) {
                        int rc2;

                        /* Create new batch object update buffer */
                        rc2 = batch_update_buffer_create(head,
                                max_len + offsetof(struct batch_update_request,
                                                   burq_reqmsg[0]) + 1);
                        if (rc2 != 0) {
                                rc = rc2;
                                break;
                        }
                } else {
                        if (rc == 0) {
                                buf->bub_end += max_len;
                                buf->bub_req->burq_count++;
                                head->buh_update_count++;
                                head->buh_repsize += reqmsg->lm_repsize;
                        }
                        break;
                }
        }

        if (rc)
                GOTO(out, rc);

        rc = batch_insert_update_callback(head, item, interpreter);
        if (rc)
                GOTO(out, rc);

        /* Unplug the batch queue if enough update requests have accumulated. */
        if (bh->lbt_max_count && head->buh_update_count >= bh->lbt_max_count) {
                rc = batch_send_update_req(NULL, head);
                *headp = NULL;
        }
out:
        if (rc) {
                batch_update_request_destroy(head);
                *headp = NULL;
        }

        RETURN(rc);
}

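/**
 * Create a batch handle for the client.
 *
 * A typical caller would use the handle roughly as sketched below; the
 * exact flags, packer and interpreter callbacks depend on the caller:
 *
 *      bh = cli_batch_create(exp, BATCH_FL_RDONLY, max_count);
 *      ...
 *      cli_batch_add(exp, bh, item, packer, interpreter);
 *      ...
 *      cli_batch_stop(exp, bh);
 */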
struct lu_batch *cli_batch_create(struct obd_export *exp,
                                  enum lu_batch_flags flags, __u32 max_count)
{
        struct cli_batch *cbh;
        struct lu_batch *bh;

        ENTRY;

        OBD_ALLOC_PTR(cbh);
        if (!cbh)
                RETURN(ERR_PTR(-ENOMEM));

        bh = &cbh->cbh_super;
        bh->lbt_result = 0;
        bh->lbt_flags = flags;
        bh->lbt_max_count = max_count;

        cbh->cbh_head = batch_update_request_create(exp, bh);
        if (IS_ERR(cbh->cbh_head)) {
                bh = (struct lu_batch *)cbh->cbh_head;
                OBD_FREE_PTR(cbh);
        }

        RETURN(bh);
}
EXPORT_SYMBOL(cli_batch_create);

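/**
 * Stop a batch: send any pending updates and free the batch handle.
 */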
int cli_batch_stop(struct obd_export *exp, struct lu_batch *bh)
{
        struct cli_batch *cbh;
        int rc;

        ENTRY;

        cbh = container_of(bh, struct cli_batch, cbh_super);
        rc = batch_send_update_req(NULL, cbh->cbh_head);

        OBD_FREE_PTR(cbh);
        RETURN(rc);
}
EXPORT_SYMBOL(cli_batch_stop);

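/**
 * Flush a batch: send the currently accumulated updates (if any) while
 * keeping the batch handle usable for further cli_batch_add() calls.
 */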
int cli_batch_flush(struct obd_export *exp, struct lu_batch *bh, bool wait)
{
        struct cli_batch *cbh;
        int rc;

        ENTRY;

        cbh = container_of(bh, struct cli_batch, cbh_super);
        if (cbh->cbh_head == NULL)
                RETURN(0);

        rc = batch_send_update_req(NULL, cbh->cbh_head);
        cbh->cbh_head = NULL;

        RETURN(rc);
}
EXPORT_SYMBOL(cli_batch_flush);

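/**
 * Add a metadata operation @item to the batch, creating a new batch
 * update request head first if the previous one was already flushed.
 */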
int cli_batch_add(struct obd_export *exp, struct lu_batch *bh,
                  struct md_op_item *item, md_update_pack_t packer,
                  object_update_interpret_t interpreter)
{
        struct cli_batch *cbh;
        int rc;

        ENTRY;

        cbh = container_of(bh, struct cli_batch, cbh_super);
        if (cbh->cbh_head == NULL) {
                cbh->cbh_head = batch_update_request_create(exp, bh);
                if (IS_ERR(cbh->cbh_head))
                        RETURN(PTR_ERR(cbh->cbh_head));
        }

        rc = batch_update_request_add(&cbh->cbh_head, item,
                                      packer, interpreter);

        RETURN(rc);
}
EXPORT_SYMBOL(cli_batch_add);