4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2020, 2022, DDN/Whamcloud Storage Corporation.
26 * This file is part of Lustre, http://www.lustre.org/
29 * lustre/ptlrpc/batch.c
31 * Batch Metadata Updating on the client
33 * Author: Qian Yingjin <qian@ddn.com>
36 #define DEBUG_SUBSYSTEM S_MDC
38 #include <linux/module.h>
39 #include <obd_class.h>
41 #ifdef HAVE_SERVER_SUPPORT
42 #include <lustre_update.h>
45 #define OUT_UPDATE_REPLY_SIZE 4096
47 static inline struct lustre_msg *
48 batch_update_reqmsg_next(struct batch_update_request *bur,
49 struct lustre_msg *reqmsg)
52 return (struct lustre_msg *)((char *)reqmsg +
53 lustre_packed_msg_size(reqmsg));
55 return &bur->burq_reqmsg[0];
58 static inline struct lustre_msg *
59 batch_update_repmsg_next(struct batch_update_reply *bur,
60 struct lustre_msg *repmsg)
63 return (struct lustre_msg *)((char *)repmsg +
64 lustre_packed_msg_size(repmsg));
66 return &bur->burp_repmsg[0];
70 struct batch_update_buffer {
71 struct batch_update_request *bub_req;
74 struct list_head bub_item;
77 struct batch_update_args {
78 struct batch_update_head *ba_head;
81 struct batch_work_resend {
82 struct work_struct bwr_work;
83 struct batch_update_head *bwr_head;
88 * Prepare inline update request
90 * Prepare BUT update ptlrpc inline request, and the request usually includes
91 * one update buffer, which does not need bulk transfer.
93 static int batch_prep_inline_update_req(struct batch_update_head *head,
94 struct ptlrpc_request *req,
97 struct batch_update_buffer *buf;
98 struct but_update_header *buh;
101 buf = list_entry(head->buh_buf_list.next,
102 struct batch_update_buffer, bub_item);
103 req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
104 buf->bub_end + sizeof(*buh));
106 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
110 buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
111 buh->buh_magic = BUT_HEADER_MAGIC;
113 buh->buh_inline_length = buf->bub_end;
114 buh->buh_reply_size = repsize;
115 buh->buh_update_count = head->buh_update_count;
117 memcpy(buh->buh_inline_data, buf->bub_req, buf->bub_end);
119 req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
120 RCL_SERVER, repsize);
122 ptlrpc_request_set_replen(req);
123 req->rq_request_portal = OUT_PORTAL;
124 req->rq_reply_portal = OSC_REPLY_PORTAL;
129 static int batch_prep_update_req(struct batch_update_head *head,
130 struct ptlrpc_request **reqp)
132 struct ptlrpc_request *req;
133 struct ptlrpc_bulk_desc *desc;
134 struct batch_update_buffer *buf;
135 struct but_update_header *buh;
136 struct but_update_buffer *bub;
144 repsize = head->buh_repsize +
145 cfs_size_round(offsetof(struct batch_update_reply,
147 if (repsize < OUT_UPDATE_REPLY_SIZE)
148 repsize = OUT_UPDATE_REPLY_SIZE;
150 LASSERT(head->buh_buf_count > 0);
152 req = ptlrpc_request_alloc(class_exp2cliimp(head->buh_exp),
157 if (head->buh_buf_count == 1) {
158 buf = list_entry(head->buh_buf_list.next,
159 struct batch_update_buffer, bub_item);
161 /* Check whether it can be packed inline */
162 if (buf->bub_end + sizeof(struct but_update_header) <
163 OUT_UPDATE_MAX_INLINE_SIZE) {
164 rc = batch_prep_inline_update_req(head, req, repsize);
171 req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
172 sizeof(struct but_update_header));
173 req_capsule_set_size(&req->rq_pill, &RMF_BUT_BUF, RCL_CLIENT,
174 head->buh_buf_count * sizeof(*bub));
176 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
180 buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
181 buh->buh_magic = BUT_HEADER_MAGIC;
182 buh->buh_count = head->buh_buf_count;
183 buh->buh_inline_length = 0;
184 buh->buh_reply_size = repsize;
185 buh->buh_update_count = head->buh_update_count;
186 bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
187 list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
188 bub->bub_size = buf->bub_size;
190 /* First *and* last might be partial pages, hence +1 */
191 page_count += DIV_ROUND_UP(buf->bub_size, PAGE_SIZE) + 1;
194 req->rq_bulk_write = 1;
195 desc = ptlrpc_prep_bulk_imp(req, page_count,
196 MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
197 PTLRPC_BULK_GET_SOURCE,
199 &ptlrpc_bulk_kiov_nopin_ops);
201 GOTO(out_req, rc = -ENOMEM);
203 list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
204 desc->bd_frag_ops->add_iov_frag(desc, buf->bub_req,
206 total += buf->bub_size;
208 CDEBUG(D_OTHER, "Total %d in %u\n", total, head->buh_update_count);
210 req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
211 RCL_SERVER, repsize);
213 ptlrpc_request_set_replen(req);
214 req->rq_request_portal = OUT_PORTAL;
215 req->rq_reply_portal = OSC_REPLY_PORTAL;
220 ptlrpc_req_finished(req);
225 static struct batch_update_buffer *
226 current_batch_update_buffer(struct batch_update_head *head)
228 if (list_empty(&head->buh_buf_list))
231 return list_entry(head->buh_buf_list.prev, struct batch_update_buffer,
235 static int batch_update_buffer_create(struct batch_update_head *head,
238 struct batch_update_buffer *buf;
239 struct batch_update_request *bur;
246 size = round_up(size, PAGE_SIZE);
247 OBD_ALLOC_LARGE(bur, size);
253 bur->burq_magic = BUT_REQUEST_MAGIC;
256 buf->bub_size = size;
257 buf->bub_end = sizeof(*bur);
258 INIT_LIST_HEAD(&buf->bub_item);
259 list_add_tail(&buf->bub_item, &head->buh_buf_list);
260 head->buh_buf_count++;
266 * Destroy an @object_update_callback.
268 static void object_update_callback_fini(struct object_update_callback *ouc)
270 LASSERT(list_empty(&ouc->ouc_item));
276 * Insert an @object_update_callback into the @batch_update_head.
278 * Usually each update in @batch_update_head will have one correspondent
279 * callback, and these callbacks will be called in ->rq_interpret_reply.
282 batch_insert_update_callback(struct batch_update_head *head, void *data,
283 object_update_interpret_t interpret)
285 struct object_update_callback *ouc;
291 INIT_LIST_HEAD(&ouc->ouc_item);
292 ouc->ouc_interpret = interpret;
293 ouc->ouc_head = head;
294 ouc->ouc_data = data;
295 list_add_tail(&ouc->ouc_item, &head->buh_cb_list);
301 * Allocate and initialize batch update request.
303 * @batch_update_head is being used to track updates being executed on
304 * this OBD device. The update buffer will be 4K initially, and increased
307 static struct batch_update_head *
308 batch_update_request_create(struct obd_export *exp, struct lu_batch *bh)
310 struct batch_update_head *head;
315 return ERR_PTR(-ENOMEM);
317 INIT_LIST_HEAD(&head->buh_cb_list);
318 INIT_LIST_HEAD(&head->buh_buf_list);
320 head->buh_batch = bh;
322 rc = batch_update_buffer_create(head, PAGE_SIZE);
331 static void batch_update_request_destroy(struct batch_update_head *head)
333 struct batch_update_buffer *bub, *tmp;
338 list_for_each_entry_safe(bub, tmp, &head->buh_buf_list, bub_item) {
339 list_del(&bub->bub_item);
341 OBD_FREE_LARGE(bub->bub_req, bub->bub_size);
348 static void cli_batch_resend_work(struct work_struct *data);
350 static int batch_update_request_fini(struct batch_update_head *head,
351 struct ptlrpc_request *req,
352 struct batch_update_reply *reply, int rc)
354 struct object_update_callback *ouc, *next;
355 struct lustre_msg *repmsg = NULL;
362 count = reply->burp_count;
364 list_for_each_entry_safe(ouc, next, &head->buh_cb_list, ouc_item) {
368 * The peer may only have handled some requests (indicated by
369 * @count) in the packaged OUT PRC, we can only get results
370 * for the handled part.
373 repmsg = batch_update_repmsg_next(reply, repmsg);
377 rc1 = repmsg->lm_result;
380 * The peer did not handle these request, let us return
381 * -ECANCELED to the update interpreter for now.
386 * TODO: resend the unfinished sub request when the
387 * return code is -EOVERFLOW.
389 if (rc == -EOVERFLOW) {
390 struct batch_work_resend *work;
392 OBD_ALLOC_GFP(work, sizeof(*work), GFP_ATOMIC);
396 INIT_WORK(&work->bwr_work,
397 cli_batch_resend_work);
398 work->bwr_head = head;
399 work->bwr_index = index;
400 schedule_work(&work->bwr_work);
406 list_del_init(&ouc->ouc_item);
407 if (ouc->ouc_interpret != NULL)
408 ouc->ouc_interpret(req, repmsg, ouc, rc1);
411 object_update_callback_fini(ouc);
412 if (rc == 0 && rc1 < 0)
416 batch_update_request_destroy(head);
421 static int batch_update_interpret(const struct lu_env *env,
422 struct ptlrpc_request *req,
425 struct batch_update_args *aa = (struct batch_update_args *)args;
426 struct batch_update_reply *reply = NULL;
430 if (aa->ba_head == NULL)
433 ptlrpc_put_mod_rpc_slot(req);
434 /* Unpack the results from the reply message. */
435 if (req->rq_repmsg != NULL && req->rq_replied) {
436 reply = req_capsule_server_sized_get(&req->rq_pill,
439 if ((reply == NULL ||
440 reply->burp_magic != BUT_REPLY_MAGIC) && rc == 0)
444 rc = batch_update_request_fini(aa->ba_head, req, reply, rc);
449 static int batch_send_update_req(const struct lu_env *env,
450 struct batch_update_head *head)
452 struct obd_device *obd;
453 struct ptlrpc_request *req = NULL;
454 struct batch_update_args *aa;
464 obd = class_exp2obd(head->buh_exp);
465 bh = head->buh_batch;
467 flags = bh->lbt_flags;
469 rc = batch_prep_update_req(head, &req);
471 rc = batch_update_request_fini(head, NULL, NULL, rc);
475 aa = ptlrpc_req_async_args(aa, req);
477 req->rq_interpret_reply = batch_update_interpret;
480 * Only acquire modification RPC slot for the batched RPC
481 * which contains metadata updates.
483 if (!(flags & BATCH_FL_RDONLY))
484 ptlrpc_get_mod_rpc_slot(req);
486 if (flags & BATCH_FL_SYNC) {
487 rc = ptlrpc_queue_wait(req);
489 if ((flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) ==
491 ptlrpcd_add_req(req);
492 } else if (flags & BATCH_FL_RQSET) {
493 ptlrpc_set_add_req(bh->lbt_rqset, req);
494 ptlrpc_check_set(env, bh->lbt_rqset);
496 ptlrpcd_add_req(req);
502 ptlrpc_req_finished(req);
504 lprocfs_oh_tally_log2(&obd->u.cli.cl_batch_rpc_hist,
505 head->buh_update_count);
509 static int batch_update_request_add(struct batch_update_head **headp,
510 struct md_op_item *item,
511 md_update_pack_t packer,
512 object_update_interpret_t interpreter)
514 struct batch_update_head *head = *headp;
515 struct lu_batch *bh = head->buh_batch;
516 struct batch_update_buffer *buf;
517 struct lustre_msg *reqmsg;
524 buf = current_batch_update_buffer(head);
525 LASSERT(buf != NULL);
526 max_len = buf->bub_size - buf->bub_end;
527 reqmsg = (struct lustre_msg *)((char *)buf->bub_req +
529 rc = packer(head, reqmsg, &max_len, item);
533 /* Create new batch object update buffer */
534 rc2 = batch_update_buffer_create(head,
535 max_len + offsetof(struct batch_update_request,
536 burq_reqmsg[0]) + 1);
543 buf->bub_end += max_len;
544 buf->bub_req->burq_count++;
545 head->buh_update_count++;
546 head->buh_repsize += reqmsg->lm_repsize;
555 rc = batch_insert_update_callback(head, item, interpreter);
559 /* Unplug the batch queue if accumulated enough update requests. */
560 if (bh->lbt_max_count && head->buh_update_count >= bh->lbt_max_count) {
561 rc = batch_send_update_req(NULL, head);
566 batch_update_request_destroy(head);
573 static void cli_batch_resend_work(struct work_struct *data)
575 struct batch_work_resend *work = container_of(data,
576 struct batch_work_resend, bwr_work);
577 struct batch_update_head *obuh = work->bwr_head;
578 struct object_update_callback *ouc;
579 struct batch_update_head *head;
580 struct batch_update_buffer *buf;
581 struct batch_update_buffer *tmp;
582 int index = work->bwr_index;
588 head = batch_update_request_create(obuh->buh_exp, NULL);
590 GOTO(err_up, rc = -ENOMEM);
592 list_for_each_entry_safe(buf, tmp, &obuh->buh_buf_list, bub_item) {
593 struct batch_update_request *bur = buf->bub_req;
594 struct batch_update_buffer *newbuf;
595 struct lustre_msg *reqmsg = NULL;
599 if (i + bur->burq_count < index) {
600 i += bur->burq_count;
604 /* reused the allocated buffer */
606 list_move_tail(&buf->bub_item, &head->buh_buf_list);
607 head->buh_update_count += buf->bub_req->burq_count;
608 head->buh_buf_count++;
612 for (j = 0; j < bur->burq_count; j++) {
613 struct lustre_msg *newmsg;
616 reqmsg = batch_update_reqmsg_next(bur, reqmsg);
620 newbuf = current_batch_update_buffer(head);
621 LASSERT(newbuf != NULL);
622 max_len = newbuf->bub_size - newbuf->bub_end;
623 newmsg = (struct lustre_msg *)((char *)newbuf->bub_req +
625 msgsz = lustre_packed_msg_size(reqmsg);
626 if (msgsz >= max_len) {
629 /* Create new batch update buffer */
630 rc2 = batch_update_buffer_create(head, msgsz +
631 offsetof(struct batch_update_request,
632 burq_reqmsg[0]) + 1);
634 GOTO(err_up, rc = rc2);
638 memcpy(newmsg, reqmsg, msgsz);
639 newbuf->bub_end += msgsz;
640 newbuf->bub_req->burq_count++;
641 head->buh_update_count++;
647 list_splice_init(&obuh->buh_cb_list, &head->buh_cb_list);
648 list_for_each_entry(ouc, &head->buh_cb_list, ouc_item)
649 ouc->ouc_head = head;
651 head->buh_repsize = BUT_MAXREPSIZE - SPTLRPC_MAX_PAYLOAD;
652 rc = batch_send_update_req(NULL, head);
656 batch_update_request_destroy(obuh);
661 batch_update_request_fini(obuh, NULL, NULL, rc);
663 batch_update_request_fini(head, NULL, NULL, rc);
669 struct lu_batch *cli_batch_create(struct obd_export *exp,
670 enum lu_batch_flags flags, __u32 max_count)
672 struct cli_batch *cbh;
679 RETURN(ERR_PTR(-ENOMEM));
681 bh = &cbh->cbh_super;
683 bh->lbt_flags = flags;
684 bh->lbt_max_count = max_count;
686 cbh->cbh_head = batch_update_request_create(exp, bh);
687 if (IS_ERR(cbh->cbh_head)) {
688 bh = (struct lu_batch *)cbh->cbh_head;
694 EXPORT_SYMBOL(cli_batch_create);
696 int cli_batch_stop(struct obd_export *exp, struct lu_batch *bh)
698 struct cli_batch *cbh;
703 cbh = container_of(bh, struct cli_batch, cbh_super);
704 rc = batch_send_update_req(NULL, cbh->cbh_head);
709 EXPORT_SYMBOL(cli_batch_stop);
711 int cli_batch_flush(struct obd_export *exp, struct lu_batch *bh, bool wait)
713 struct cli_batch *cbh;
718 cbh = container_of(bh, struct cli_batch, cbh_super);
719 if (cbh->cbh_head == NULL)
722 rc = batch_send_update_req(NULL, cbh->cbh_head);
723 cbh->cbh_head = NULL;
727 EXPORT_SYMBOL(cli_batch_flush);
729 int cli_batch_add(struct obd_export *exp, struct lu_batch *bh,
730 struct md_op_item *item, md_update_pack_t packer,
731 object_update_interpret_t interpreter)
733 struct cli_batch *cbh;
738 cbh = container_of(bh, struct cli_batch, cbh_super);
739 if (cbh->cbh_head == NULL) {
740 cbh->cbh_head = batch_update_request_create(exp, bh);
741 if (IS_ERR(cbh->cbh_head))
742 RETURN(PTR_ERR(cbh->cbh_head));
745 rc = batch_update_request_add(&cbh->cbh_head, item,
746 packer, interpreter);
750 EXPORT_SYMBOL(cli_batch_add);