4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2020, 2022, DDN/Whamcloud Storage Corporation.
26 * This file is part of Lustre, http://www.lustre.org/
29 * lustre/ptlrpc/batch.c
31 * Batch Metadata Updating on the client
33 * Author: Qian Yingjin <qian@ddn.com>
36 #define DEBUG_SUBSYSTEM S_MDC
38 #include <linux/module.h>
39 #include <lustre_update.h>
/*
 * One packed update-request buffer.  Buffers are chained on
 * batch_update_head::buh_buf_list through bub_item; bub_req points at the
 * packed batch_update_request payload.
 * NOTE(review): some field declarations appear to be missing from this view
 * (e.g. the size/end offsets used elsewhere as bub_size/bub_end) — confirm
 * against the full source.
 */
42 struct batch_update_buffer {
43 struct batch_update_request *bub_req;
46 struct list_head bub_item;
/*
 * Async arguments stashed in the ptlrpc request (via
 * ptlrpc_req_async_args()) so batch_update_interpret() can find the
 * batch_update_head when the reply arrives.
 */
49 struct batch_update_args {
50 struct batch_update_head *ba_head;
54 * Prepare inline update request
56 * Prepare BUT update ptlrpc inline request, and the request usually includes
57 * one update buffer, which does not need bulk transfer.
59 static int batch_prep_inline_update_req(struct batch_update_head *head,
60 struct ptlrpc_request *req,
63 struct batch_update_buffer *buf;
64 struct but_update_header *buh;
/* Single-buffer case: take the first (only) buffer on the list. */
67 buf = list_entry(head->buh_buf_list.next,
68 struct batch_update_buffer, bub_item);
/* Header and the packed update payload share one request-message field. */
69 req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
70 buf->bub_end + sizeof(*buh));
72 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
76 buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
77 buh->buh_magic = BUT_HEADER_MAGIC;
79 buh->buh_inline_length = buf->bub_end;
80 buh->buh_reply_size = repsize;
81 buh->buh_update_count = head->buh_update_count;
/* Copy the packed updates straight into the message; no bulk descriptor. */
83 memcpy(buh->buh_inline_data, buf->bub_req, buf->bub_end);
85 req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
88 ptlrpc_request_set_replen(req);
/* Batched updates are serviced on the OUT portal. */
89 req->rq_request_portal = OUT_PORTAL;
90 req->rq_reply_portal = OSC_REPLY_PORTAL;
/*
 * Build the MDS_BATCH ptlrpc request for all accumulated update buffers.
 * A single small buffer is sent inline via batch_prep_inline_update_req();
 * otherwise the buffers are attached to a bulk descriptor (GET_SOURCE, the
 * server pulls them).  On success *reqp is set to the prepared request.
 */
95 static int batch_prep_update_req(struct batch_update_head *head,
96 struct ptlrpc_request **reqp)
98 struct ptlrpc_request *req;
99 struct ptlrpc_bulk_desc *desc;
100 struct batch_update_buffer *buf;
101 struct but_update_header *buh;
102 struct but_update_buffer *bub;
/*
 * Reply size: accumulated per-update reply sizes plus the (rounded)
 * reply header, clamped up to the minimum OUT reply size.
 */
110 repsize = head->buh_repsize +
111 cfs_size_round(offsetof(struct batch_update_reply,
113 if (repsize < OUT_UPDATE_REPLY_SIZE)
114 repsize = OUT_UPDATE_REPLY_SIZE;
116 LASSERT(head->buh_buf_count > 0);
118 req = ptlrpc_request_alloc(class_exp2cliimp(head->buh_exp),
/* One small buffer may fit inline and skip bulk transfer entirely. */
123 if (head->buh_buf_count == 1) {
124 buf = list_entry(head->buh_buf_list.next,
125 struct batch_update_buffer, bub_item);
127 /* Check whether it can be packed inline */
128 if (buf->bub_end + sizeof(struct but_update_header) <
129 OUT_UPDATE_MAX_INLINE_SIZE) {
130 rc = batch_prep_inline_update_req(head, req, repsize);
/* Bulk path: header field plus one but_update_buffer per buffer. */
137 req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
138 sizeof(struct but_update_header));
139 req_capsule_set_size(&req->rq_pill, &RMF_BUT_BUF, RCL_CLIENT,
140 head->buh_buf_count * sizeof(*bub));
142 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
146 buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
147 buh->buh_magic = BUT_HEADER_MAGIC;
148 buh->buh_count = head->buh_buf_count;
/* buh_inline_length == 0 tells the server the payload comes via bulk. */
149 buh->buh_inline_length = 0;
150 buh->buh_reply_size = repsize;
151 buh->buh_update_count = head->buh_update_count;
152 bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
/* Describe every buffer and count the pages needed for the bulk I/O. */
153 list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
154 bub->bub_size = buf->bub_size;
156 /* First *and* last might be partial pages, hence +1 */
157 page_count += DIV_ROUND_UP(buf->bub_size, PAGE_SIZE) + 1;
/* Server-side GET: the target reads (pulls) the buffers from us. */
160 req->rq_bulk_write = 1;
161 desc = ptlrpc_prep_bulk_imp(req, page_count,
162 MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
163 PTLRPC_BULK_GET_SOURCE,
165 &ptlrpc_bulk_kiov_nopin_ops);
167 GOTO(out_req, rc = -ENOMEM);
/* Attach each buffer to the bulk descriptor as an iov fragment. */
169 list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
170 desc->bd_frag_ops->add_iov_frag(desc, buf->bub_req,
172 total += buf->bub_size;
174 CDEBUG(D_OTHER, "Total %d in %u\n", total, head->buh_update_count);
176 req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
177 RCL_SERVER, repsize);
179 ptlrpc_request_set_replen(req);
180 req->rq_request_portal = OUT_PORTAL;
181 req->rq_reply_portal = OSC_REPLY_PORTAL;
/* Error path: drop the request reference on failure. */
186 ptlrpc_req_finished(req);
/*
 * Return the buffer currently being filled: the tail of buh_buf_list
 * (new buffers are added with list_add_tail()).
 */
191 static struct batch_update_buffer *
192 current_batch_update_buffer(struct batch_update_head *head)
194 if (list_empty(&head->buh_buf_list))
197 return list_entry(head->buh_buf_list.prev, struct batch_update_buffer,
/*
 * Allocate a new update buffer of at least @size bytes (rounded up to a
 * whole number of pages) and append it to @head's buffer list.
 */
201 static int batch_update_buffer_create(struct batch_update_head *head,
204 struct batch_update_buffer *buf;
205 struct batch_update_request *bur;
/* Page-align the allocation; these buffers may be sent via bulk pages. */
212 size = round_up(size, PAGE_SIZE);
213 OBD_ALLOC_LARGE(bur, size);
219 bur->burq_magic = BUT_REQUEST_MAGIC;
222 buf->bub_size = size;
/* bub_end starts just past the request header; updates are packed after. */
223 buf->bub_end = sizeof(*bur);
224 INIT_LIST_HEAD(&buf->bub_item);
225 list_add_tail(&buf->bub_item, &head->buh_buf_list);
226 head->buh_buf_count++;
232 * Destroy an @object_update_callback.
/* Caller must have removed the callback from its list already. */
234 static void object_update_callback_fini(struct object_update_callback *ouc)
236 LASSERT(list_empty(&ouc->ouc_item));
242 * Insert an @object_update_callback into the @batch_update_head.
244 * Usually each update in @batch_update_head will have one correspondent
245 * callback, and these callbacks will be called in ->rq_interpret_reply.
248 batch_insert_update_callback(struct batch_update_head *head, void *data,
249 object_update_interpret_t interpret)
251 struct object_update_callback *ouc;
257 INIT_LIST_HEAD(&ouc->ouc_item);
258 ouc->ouc_interpret = interpret;
259 ouc->ouc_head = head;
260 ouc->ouc_data = data;
/* Callbacks are consumed in FIFO order, matching the packed updates. */
261 list_add_tail(&ouc->ouc_item, &head->buh_cb_list);
267 * Allocate and initialize batch update request.
269 * @batch_update_head is being used to track updates being executed on
270 * this OBD device. The update buffer will be 4K initially, and increased
273 static struct batch_update_head *
274 batch_update_request_create(struct obd_export *exp, struct lu_batch *bh)
276 struct batch_update_head *head;
281 return ERR_PTR(-ENOMEM);
283 INIT_LIST_HEAD(&head->buh_cb_list);
284 INIT_LIST_HEAD(&head->buh_buf_list);
286 head->buh_batch = bh;
/* Seed the head with a first page-sized update buffer. */
288 rc = batch_update_buffer_create(head, PAGE_SIZE);
/*
 * Free every update buffer attached to @head (and, presumably, @head
 * itself in lines not visible here — confirm against the full source).
 */
297 static void batch_update_request_destroy(struct batch_update_head *head)
299 struct batch_update_buffer *bub, *tmp;
/* _safe variant: entries are deleted while walking the list. */
304 list_for_each_entry_safe(bub, tmp, &head->buh_buf_list, bub_item) {
305 list_del(&bub->bub_item);
307 OBD_FREE_LARGE(bub->bub_req, bub->bub_size);
/*
 * Deliver per-update results to the registered callbacks and tear the
 * batch head down.  @reply may be NULL (e.g. send failure); handled
 * updates get their individual result, unhandled ones get -ECANCELED.
 * Returns @rc, or the first negative per-update result if @rc was 0.
 */
314 static int batch_update_request_fini(struct batch_update_head *head,
315 struct ptlrpc_request *req,
316 struct batch_update_reply *reply, int rc)
318 struct object_update_callback *ouc, *next;
319 struct lustre_msg *repmsg = NULL;
326 count = reply->burp_count;
/* Walk callbacks in the same FIFO order the updates were packed. */
328 list_for_each_entry_safe(ouc, next, &head->buh_cb_list, ouc_item) {
331 list_del_init(&ouc->ouc_item);
334 * The peer may only have handled some requests (indicated by
335 * @count) in the packaged OUT RPC, we can only get results
336 * for the handled part.
/* Step to this update's sub-reply inside the batched reply. */
339 repmsg = batch_update_repmsg_next(reply, repmsg);
343 rc1 = repmsg->lm_result;
346 * The peer did not handle these requests, let us return
347 * -ECANCELED to the update interpreter for now.
353 if (ouc->ouc_interpret != NULL)
354 ouc->ouc_interpret(req, repmsg, ouc, rc1);
356 object_update_callback_fini(ouc);
/* Remember the first per-update failure when the RPC itself succeeded. */
357 if (rc == 0 && rc1 < 0)
361 batch_update_request_destroy(head);
/*
 * ->rq_interpret_reply handler for a batched update RPC: release the
 * modification RPC slot, validate/unpack the reply, then dispatch the
 * per-update results via batch_update_request_fini().
 */
366 static int batch_update_interpret(const struct lu_env *env,
367 struct ptlrpc_request *req,
370 struct batch_update_args *aa = (struct batch_update_args *)args;
371 struct batch_update_reply *reply = NULL;
375 if (aa->ba_head == NULL)
378 ptlrpc_put_mod_rpc_slot(req);
379 /* Unpack the results from the reply message. */
380 if (req->rq_repmsg != NULL && req->rq_replied) {
381 reply = req_capsule_server_sized_get(&req->rq_pill,
/* A missing or bad-magic reply is an error only if rc was clean. */
384 if ((reply == NULL ||
385 reply->burp_magic != BUT_REPLY_MAGIC) && rc == 0)
389 rc = batch_update_request_fini(aa->ba_head, req, reply, rc);
/*
 * Prepare and send the batched update RPC for @head.  Depending on the
 * batch flags the request is sent synchronously (BATCH_FL_SYNC), via a
 * caller-provided request set (BATCH_FL_RQSET), or handed to ptlrpcd.
 * On prep failure the callbacks are completed with the error via
 * batch_update_request_fini().
 */
394 static int batch_send_update_req(const struct lu_env *env,
395 struct batch_update_head *head)
398 struct ptlrpc_request *req = NULL;
399 struct batch_update_args *aa;
407 bh = head->buh_batch;
408 rc = batch_prep_update_req(head, &req);
/* Prep failed: deliver the error to every pending callback. */
410 rc = batch_update_request_fini(head, NULL, NULL, rc);
/* Stash the head so batch_update_interpret() can find it on reply. */
414 aa = ptlrpc_req_async_args(aa, req);
416 req->rq_interpret_reply = batch_update_interpret;
419 * Only acquire modification RPC slot for the batched RPC
420 * which contains metadata updates.
422 if (!(bh->lbt_flags & BATCH_FL_RDONLY))
423 ptlrpc_get_mod_rpc_slot(req);
425 if (bh->lbt_flags & BATCH_FL_SYNC) {
426 rc = ptlrpc_queue_wait(req);
428 if ((bh->lbt_flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) ==
430 ptlrpcd_add_req(req);
431 } else if (bh->lbt_flags & BATCH_FL_RQSET) {
432 ptlrpc_set_add_req(bh->lbt_rqset, req);
433 ptlrpc_check_set(env, bh->lbt_rqset);
435 ptlrpcd_add_req(req);
441 ptlrpc_req_finished(req);
/*
 * Pack one update @item into the current buffer of @head, register its
 * reply callback, and flush the batch if it has accumulated
 * lbt_max_count updates.  If the current buffer is too small, a new
 * buffer sized for the update is created and the pack is retried
 * (retry lines not fully visible here — confirm against full source).
 */
446 static int batch_update_request_add(struct batch_update_head **headp,
447 struct md_op_item *item,
448 md_update_pack_t packer,
449 object_update_interpret_t interpreter)
451 struct batch_update_head *head = *headp;
452 struct lu_batch *bh = head->buh_batch;
453 struct batch_update_buffer *buf;
454 struct lustre_msg *reqmsg;
461 buf = current_batch_update_buffer(head);
462 LASSERT(buf != NULL);
/* Space left in the current buffer; packer updates max_len to used size. */
463 max_len = buf->bub_size - buf->bub_end;
464 reqmsg = (struct lustre_msg *)((char *)buf->bub_req +
466 rc = packer(head, reqmsg, &max_len, item);
470 /* Create new batch object update buffer */
471 rc2 = batch_update_buffer_create(head,
472 max_len + offsetof(struct batch_update_request,
473 burq_reqmsg[0]) + 1);
/* Account the packed update in the buffer and head totals. */
480 buf->bub_end += max_len;
481 buf->bub_req->burq_count++;
482 head->buh_update_count++;
483 head->buh_repsize += reqmsg->lm_repsize;
492 rc = batch_insert_update_callback(head, item, interpreter);
496 /* Unplug the batch queue if accumulated enough update requests. */
497 if (bh->lbt_max_count && head->buh_update_count >= bh->lbt_max_count) {
498 rc = batch_send_update_req(NULL, head);
/* Error path: tear down the whole head, failing pending updates. */
503 batch_update_request_destroy(head);
/*
 * Public API: create a client batch handle on export @exp.  @flags are
 * lu_batch_flags (sync/rdonly/rqset behavior); @max_count, when non-zero,
 * auto-flushes the batch once that many updates accumulate.
 * Returns the embedded lu_batch, or ERR_PTR on failure.
 */
510 struct lu_batch *cli_batch_create(struct obd_export *exp,
511 enum lu_batch_flags flags, __u32 max_count)
513 struct cli_batch *cbh;
520 RETURN(ERR_PTR(-ENOMEM));
522 bh = &cbh->cbh_super;
524 bh->lbt_flags = flags;
525 bh->lbt_max_count = max_count;
527 cbh->cbh_head = batch_update_request_create(exp, bh);
528 if (IS_ERR(cbh->cbh_head)) {
/* Propagate the ERR_PTR from the head as the function's return value. */
529 bh = (struct lu_batch *)cbh->cbh_head;
535 EXPORT_SYMBOL(cli_batch_create);
/*
 * Public API: flush any pending updates for @bh and stop the batch
 * (freeing of the cli_batch container is not visible in this view).
 */
537 int cli_batch_stop(struct obd_export *exp, struct lu_batch *bh)
539 struct cli_batch *cbh;
544 cbh = container_of(bh, struct cli_batch, cbh_super);
545 rc = batch_send_update_req(NULL, cbh->cbh_head);
550 EXPORT_SYMBOL(cli_batch_stop);
/*
 * Public API: send the currently accumulated updates of @bh, then clear
 * cbh_head so the next cli_batch_add() starts a fresh request.  No-op
 * when nothing has been batched yet.
 */
552 int cli_batch_flush(struct obd_export *exp, struct lu_batch *bh, bool wait)
554 struct cli_batch *cbh;
559 cbh = container_of(bh, struct cli_batch, cbh_super);
560 if (cbh->cbh_head == NULL)
563 rc = batch_send_update_req(NULL, cbh->cbh_head);
/* Ownership of the head passed to the send path; forget it here. */
564 cbh->cbh_head = NULL;
568 EXPORT_SYMBOL(cli_batch_flush);
/*
 * Public API: add one metadata update @item to batch @bh.  @packer
 * serializes the item into the update buffer; @interpreter is called
 * with the item's result when the batched RPC completes.  Lazily
 * (re)creates the batch head after a flush.
 */
570 int cli_batch_add(struct obd_export *exp, struct lu_batch *bh,
571 struct md_op_item *item, md_update_pack_t packer,
572 object_update_interpret_t interpreter)
574 struct cli_batch *cbh;
579 cbh = container_of(bh, struct cli_batch, cbh_super);
580 if (cbh->cbh_head == NULL) {
581 cbh->cbh_head = batch_update_request_create(exp, bh);
582 if (IS_ERR(cbh->cbh_head))
583 RETURN(PTR_ERR(cbh->cbh_head));
586 rc = batch_update_request_add(&cbh->cbh_head, item,
587 packer, interpreter);
591 EXPORT_SYMBOL(cli_batch_add);