4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2020, 2022, DDN/Whamcloud Storage Corporation.
26 * This file is part of Lustre, http://www.lustre.org/
29 * lustre/ptlrpc/batch.c
31 * Batch Metadata Updating on the client
33 * Author: Qian Yingjin <qian@ddn.com>
36 #define DEBUG_SUBSYSTEM S_MDC
38 #include <linux/module.h>
39 #include <obd_class.h>
41 #ifdef HAVE_SERVER_SUPPORT
42 #include <lustre_update.h>
45 #define OUT_UPDATE_REPLY_SIZE 4096
/*
 * Return the reply message following @repmsg inside the packed batch
 * reply @bur.  Replies are laid out back to back, each advancing by
 * lustre_packed_msg_size().
 *
 * NOTE(review): this excerpt is missing lines (the embedded original
 * line numbers jump, e.g. 49->52->53->55); the guard that returns
 * &bur->burp_repmsg[0] for the first iteration (repmsg == NULL) is
 * presumably on one of the elided lines — confirm against the full file.
 */
47 static inline struct lustre_msg *
48 batch_update_repmsg_next(struct batch_update_reply *bur,
49 struct lustre_msg *repmsg)
52 return (struct lustre_msg *)((char *)repmsg +
53 lustre_packed_msg_size(repmsg));
55 return &bur->burp_repmsg[0];
/*
 * One client-side buffer of packed update requests.  Buffers are linked
 * on batch_update_head::buh_buf_list through @bub_item (see
 * batch_update_buffer_create() below).
 *
 * NOTE(review): further fields (e.g. bub_size/bub_end, used elsewhere in
 * this file) appear to be on lines elided from this excerpt.
 */
59 struct batch_update_buffer {
60 struct batch_update_request *bub_req;
63 struct list_head bub_item;
/*
 * Async-args cookie stashed in the ptlrpc request and recovered in
 * batch_update_interpret() via ptlrpc_req_async_args().
 */
66 struct batch_update_args {
67 struct batch_update_head *ba_head;
71 * Prepare inline update request
73 * Prepare BUT update ptlrpc inline request, and the request usually includes
74 * one update buffer, which does not need bulk transfer.
/*
 * Pack a small batch as an inline BUT request: the single update buffer
 * is copied straight into the header's buh_inline_data, so no bulk
 * transfer is set up.
 *
 * @head: batch to send; the first (and, per the caller's check in
 *        batch_prep_update_req(), only) buffer on buh_buf_list is used.
 * @req:  preallocated ptlrpc request to fill.
 *
 * NOTE(review): a @repsize parameter and the rc/RETURN handling are
 * referenced below but their lines are elided from this excerpt.
 */
76 static int batch_prep_inline_update_req(struct batch_update_head *head,
77 struct ptlrpc_request *req,
80 struct batch_update_buffer *buf;
81 struct but_update_header *buh;
/* Client-side header field sized to carry the whole inline payload. */
84 buf = list_entry(head->buh_buf_list.next,
85 struct batch_update_buffer, bub_item);
86 req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
87 buf->bub_end + sizeof(*buh));
89 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
93 buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
94 buh->buh_magic = BUT_HEADER_MAGIC;
96 buh->buh_inline_length = buf->bub_end;
97 buh->buh_reply_size = repsize;
98 buh->buh_update_count = head->buh_update_count;
/* Copy the packed updates inline; bub_end is the used length. */
100 memcpy(buh->buh_inline_data, buf->bub_req, buf->bub_end);
102 req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
103 RCL_SERVER, repsize);
105 ptlrpc_request_set_replen(req);
106 req->rq_request_portal = OUT_PORTAL;
107 req->rq_reply_portal = OSC_REPLY_PORTAL;
/*
 * Build the ptlrpc request for a batch: either an inline request (single
 * small buffer — delegated to batch_prep_inline_update_req()) or a bulk
 * GET_SOURCE transfer carrying every buffer on buh_buf_list.
 *
 * @head: batch to send (must hold at least one buffer, see LASSERT).
 * @reqp: out parameter receiving the prepared request.
 *
 * NOTE(review): many lines are elided from this excerpt (rc checks,
 * closing braces, the declarations of repsize/page_count/total, the
 * out_req label); comments below only describe what is visible.
 */
112 static int batch_prep_update_req(struct batch_update_head *head,
113 struct ptlrpc_request **reqp)
115 struct ptlrpc_request *req;
116 struct ptlrpc_bulk_desc *desc;
117 struct batch_update_buffer *buf;
118 struct but_update_header *buh;
119 struct but_update_buffer *bub;
/* Reply buffer: accumulated per-update reply sizes, floor 4K. */
127 repsize = head->buh_repsize +
128 cfs_size_round(offsetof(struct batch_update_reply,
130 if (repsize < OUT_UPDATE_REPLY_SIZE)
131 repsize = OUT_UPDATE_REPLY_SIZE;
133 LASSERT(head->buh_buf_count > 0);
135 req = ptlrpc_request_alloc(class_exp2cliimp(head->buh_exp),
/* A single buffer that fits under the inline limit needs no bulk. */
140 if (head->buh_buf_count == 1) {
141 buf = list_entry(head->buh_buf_list.next,
142 struct batch_update_buffer, bub_item);
144 /* Check whether it can be packed inline */
145 if (buf->bub_end + sizeof(struct but_update_header) <
146 OUT_UPDATE_MAX_INLINE_SIZE) {
147 rc = batch_prep_inline_update_req(head, req, repsize);
/* Bulk path: header plus one but_update_buffer descriptor per buffer. */
154 req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
155 sizeof(struct but_update_header));
156 req_capsule_set_size(&req->rq_pill, &RMF_BUT_BUF, RCL_CLIENT,
157 head->buh_buf_count * sizeof(*bub));
159 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
163 buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
164 buh->buh_magic = BUT_HEADER_MAGIC;
165 buh->buh_count = head->buh_buf_count;
166 buh->buh_inline_length = 0;
167 buh->buh_reply_size = repsize;
168 buh->buh_update_count = head->buh_update_count;
169 bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
/* Record each buffer's size and count the pages to pin for bulk. */
170 list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
171 bub->bub_size = buf->bub_size;
173 /* First *and* last might be partial pages, hence +1 */
174 page_count += DIV_ROUND_UP(buf->bub_size, PAGE_SIZE) + 1;
/* Server pulls the data from us (GET_SOURCE), so this is a write. */
177 req->rq_bulk_write = 1;
178 desc = ptlrpc_prep_bulk_imp(req, page_count,
179 MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
180 PTLRPC_BULK_GET_SOURCE,
182 &ptlrpc_bulk_kiov_nopin_ops);
184 GOTO(out_req, rc = -ENOMEM);
/* Attach every buffer as an iov fragment of the bulk descriptor. */
186 list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
187 desc->bd_frag_ops->add_iov_frag(desc, buf->bub_req,
189 total += buf->bub_size;
191 CDEBUG(D_OTHER, "Total %d in %u\n", total, head->buh_update_count);
193 req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
194 RCL_SERVER, repsize);
196 ptlrpc_request_set_replen(req);
197 req->rq_request_portal = OUT_PORTAL;
198 req->rq_reply_portal = OSC_REPLY_PORTAL;
/* Error path: drop the allocated request. */
203 ptlrpc_req_finished(req);
/*
 * Return the buffer currently being filled — the tail of buh_buf_list —
 * or (presumably, on an elided line) NULL when the list is empty.
 */
208 static struct batch_update_buffer *
209 current_batch_update_buffer(struct batch_update_head *head)
211 if (list_empty(&head->buh_buf_list))
214 return list_entry(head->buh_buf_list.prev, struct batch_update_buffer,
/*
 * Allocate a new update buffer of at least @size bytes (rounded up to a
 * whole number of pages), initialize its request header and append it to
 * @head's buffer list.
 *
 * NOTE(review): the @size parameter declaration, allocation-failure
 * checks and the allocation of @buf itself are on lines elided from this
 * excerpt.
 */
218 static int batch_update_buffer_create(struct batch_update_head *head,
221 struct batch_update_buffer *buf;
222 struct batch_update_request *bur;
229 size = round_up(size, PAGE_SIZE);
230 OBD_ALLOC_LARGE(bur, size);
236 bur->burq_magic = BUT_REQUEST_MAGIC;
/* bub_end starts just past the request header; updates pack after it. */
239 buf->bub_size = size;
240 buf->bub_end = sizeof(*bur);
241 INIT_LIST_HEAD(&buf->bub_item);
242 list_add_tail(&buf->bub_item, &head->buh_buf_list);
243 head->buh_buf_count++;
249 * Destroy an @object_update_callback.
/* Free a callback that has already been unlinked from buh_cb_list. */
251 static void object_update_callback_fini(struct object_update_callback *ouc)
253 LASSERT(list_empty(&ouc->ouc_item));
259 * Insert an @object_update_callback into the @batch_update_head.
261 * Usually each update in @batch_update_head will have one correspondent
262 * callback, and these callbacks will be called in ->rq_interpret_reply.
/*
 * Allocate an object_update_callback for one queued update and append it
 * to @head->buh_cb_list; @interpret will be invoked with @data from
 * ->rq_interpret_reply (see batch_update_request_fini()).
 *
 * NOTE(review): the return type, OUC allocation and its failure check
 * are on lines elided from this excerpt.
 */
265 batch_insert_update_callback(struct batch_update_head *head, void *data,
266 object_update_interpret_t interpret)
268 struct object_update_callback *ouc;
274 INIT_LIST_HEAD(&ouc->ouc_item);
275 ouc->ouc_interpret = interpret;
276 ouc->ouc_head = head;
277 ouc->ouc_data = data;
278 list_add_tail(&ouc->ouc_item, &head->buh_cb_list);
284 * Allocate and initialize batch update request.
286 * @batch_update_head is being used to track updates being executed on
287 * this OBD device. The update buffer will be 4K initially, and increased
/*
 * Allocate and initialize a batch_update_head for @exp/@bh with one
 * initial PAGE_SIZE update buffer.  Returns ERR_PTR(-ENOMEM) on
 * allocation failure.
 *
 * NOTE(review): the allocation of @head, assignment of buh_exp and the
 * error handling for batch_update_buffer_create() are on elided lines.
 */
290 static struct batch_update_head *
291 batch_update_request_create(struct obd_export *exp, struct lu_batch *bh)
293 struct batch_update_head *head;
298 return ERR_PTR(-ENOMEM);
300 INIT_LIST_HEAD(&head->buh_cb_list);
301 INIT_LIST_HEAD(&head->buh_buf_list);
303 head->buh_batch = bh;
305 rc = batch_update_buffer_create(head, PAGE_SIZE);
/*
 * Tear down a batch head: unlink and free every update buffer (the
 * request payload via OBD_FREE_LARGE).  Freeing of @head itself is
 * presumably on a line elided from this excerpt.
 */
314 static void batch_update_request_destroy(struct batch_update_head *head)
316 struct batch_update_buffer *bub, *tmp;
321 list_for_each_entry_safe(bub, tmp, &head->buh_buf_list, bub_item) {
322 list_del(&bub->bub_item);
324 OBD_FREE_LARGE(bub->bub_req, bub->bub_size);
/*
 * Finish a batch: walk every registered callback, pair it with the
 * corresponding sub-reply unpacked from @reply (when available), invoke
 * the interpreter, then destroy the whole head.
 *
 * @reply may be NULL (e.g. send failure); @rc is the overall RPC result.
 * Per the visible logic, callbacks beyond @count — the number of updates
 * the server actually handled — get -ECANCELED.
 *
 * NOTE(review): loop-control lines (the index/count comparison, rc1
 * assignment of -ECANCELED, the final RETURN) are elided here.
 */
331 static int batch_update_request_fini(struct batch_update_head *head,
332 struct ptlrpc_request *req,
333 struct batch_update_reply *reply, int rc)
335 struct object_update_callback *ouc, *next;
336 struct lustre_msg *repmsg = NULL;
343 count = reply->burp_count;
345 list_for_each_entry_safe(ouc, next, &head->buh_cb_list, ouc_item) {
348 list_del_init(&ouc->ouc_item);
351 * The peer may only have handled some requests (indicated by
352 * @count) in the packed OUT RPC, we can only get results
353 * for the handled part.
356 repmsg = batch_update_repmsg_next(reply, repmsg);
360 rc1 = repmsg->lm_result;
363 * The peer did not handle these requests, let us return
364 * -ECANCELED to the update interpreter for now.
370 if (ouc->ouc_interpret != NULL)
371 ouc->ouc_interpret(req, repmsg, ouc, rc1);
373 object_update_callback_fini(ouc);
/* First sub-request failure becomes the overall return code. */
374 if (rc == 0 && rc1 < 0)
378 batch_update_request_destroy(head);
/*
 * ->rq_interpret_reply handler for a batched RPC: release the
 * modification-RPC slot, validate/unpack the batch reply and hand it to
 * batch_update_request_fini() for per-update callback dispatch.
 *
 * NOTE(review): the args/rc parameters, the -EPROTO assignment after the
 * magic check and the RETURN are on lines elided from this excerpt.
 */
383 static int batch_update_interpret(const struct lu_env *env,
384 struct ptlrpc_request *req,
387 struct batch_update_args *aa = (struct batch_update_args *)args;
388 struct batch_update_reply *reply = NULL;
392 if (aa->ba_head == NULL)
395 ptlrpc_put_mod_rpc_slot(req);
396 /* Unpack the results from the reply message. */
397 if (req->rq_repmsg != NULL && req->rq_replied) {
398 reply = req_capsule_server_sized_get(&req->rq_pill,
/* Malformed reply (missing or bad magic) overrides a zero rc. */
401 if ((reply == NULL ||
402 reply->burp_magic != BUT_REPLY_MAGIC) && rc == 0)
406 rc = batch_update_request_fini(aa->ba_head, req, reply, rc);
/*
 * Send the accumulated batch RPC for @head.  Depending on the batch
 * flags this is synchronous (BATCH_FL_SYNC via ptlrpc_queue_wait), added
 * to the caller's request set (BATCH_FL_RQSET), or queued on ptlrpcd.
 * On prep failure the callbacks are completed with the error via
 * batch_update_request_fini().
 *
 * @env may be NULL for the async paths (callers pass NULL below).
 *
 * NOTE(review): declarations of bh/rc, several braces/RETURNs and the
 * condition on line "447" are partially elided from this excerpt.
 */
411 static int batch_send_update_req(const struct lu_env *env,
412 struct batch_update_head *head)
414 struct obd_device *obd;
415 struct ptlrpc_request *req = NULL;
416 struct batch_update_args *aa;
425 obd = class_exp2obd(head->buh_exp);
426 bh = head->buh_batch;
427 rc = batch_prep_update_req(head, &req);
/* Prep failed: complete every callback with the error and bail. */
429 rc = batch_update_request_fini(head, NULL, NULL, rc);
433 aa = ptlrpc_req_async_args(aa, req);
435 req->rq_interpret_reply = batch_update_interpret;
438 * Only acquire modification RPC slot for the batched RPC
439 * which contains metadata updates.
441 if (!(bh->lbt_flags & BATCH_FL_RDONLY))
442 ptlrpc_get_mod_rpc_slot(req);
444 if (bh->lbt_flags & BATCH_FL_SYNC) {
445 rc = ptlrpc_queue_wait(req);
447 if ((bh->lbt_flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) ==
449 ptlrpcd_add_req(req);
450 } else if (bh->lbt_flags & BATCH_FL_RQSET) {
451 ptlrpc_set_add_req(bh->lbt_rqset, req);
452 ptlrpc_check_set(env, bh->lbt_rqset);
454 ptlrpcd_add_req(req);
460 ptlrpc_req_finished(req);
/* Histogram of updates-per-batch-RPC for /proc statistics. */
462 lprocfs_oh_tally_log2(&obd->u.cli.cl_batch_rpc_hist,
463 head->buh_update_count);
/*
 * Pack one update (@item, via @packer) into the current buffer of
 * *@headp and register @interpreter for its reply.  If the current
 * buffer is too small (packer fails with the visible -E2BIG-style path),
 * a new, larger buffer is created and the pack is presumably retried on
 * an elided line.  When the batch reaches lbt_max_count updates it is
 * flushed immediately with batch_send_update_req().
 *
 * NOTE(review): declarations of max_len/rc/rc2, the retry loop and
 * several error branches are elided from this excerpt.
 */
467 static int batch_update_request_add(struct batch_update_head **headp,
468 struct md_op_item *item,
469 md_update_pack_t packer,
470 object_update_interpret_t interpreter)
472 struct batch_update_head *head = *headp;
473 struct lu_batch *bh = head->buh_batch;
474 struct batch_update_buffer *buf;
475 struct lustre_msg *reqmsg;
482 buf = current_batch_update_buffer(head);
483 LASSERT(buf != NULL);
/* Pack at the tail of the buffer; max_len is the remaining room. */
484 max_len = buf->bub_size - buf->bub_end;
485 reqmsg = (struct lustre_msg *)((char *)buf->bub_req +
487 rc = packer(head, reqmsg, &max_len, item);
491 /* Create new batch object update buffer */
492 rc2 = batch_update_buffer_create(head,
493 max_len + offsetof(struct batch_update_request,
494 burq_reqmsg[0]) + 1);
/* Success: account the packed update in buffer and head counters. */
501 buf->bub_end += max_len;
502 buf->bub_req->burq_count++;
503 head->buh_update_count++;
504 head->buh_repsize += reqmsg->lm_repsize;
513 rc = batch_insert_update_callback(head, item, interpreter);
517 /* Unplug the batch queue if accumulated enough update requests. */
518 if (bh->lbt_max_count && head->buh_update_count >= bh->lbt_max_count) {
519 rc = batch_send_update_req(NULL, head);
/* Error path: discard the whole batch. */
524 batch_update_request_destroy(head);
/*
 * Public entry: create a client batch handle.
 *
 * @exp:       export the batched RPCs will be sent through.
 * @flags:     lu_batch_flags (SYNC/RDONLY/RQSET behavior, see
 *             batch_send_update_req()).
 * @max_count: auto-flush threshold; 0 presumably disables it (per the
 *             check in batch_update_request_add()).
 *
 * NOTE(review): allocation of @cbh and the error/return tail are on
 * lines elided from this excerpt.
 */
531 struct lu_batch *cli_batch_create(struct obd_export *exp,
532 enum lu_batch_flags flags, __u32 max_count)
534 struct cli_batch *cbh;
541 RETURN(ERR_PTR(-ENOMEM));
543 bh = &cbh->cbh_super;
545 bh->lbt_flags = flags;
546 bh->lbt_max_count = max_count;
548 cbh->cbh_head = batch_update_request_create(exp, bh);
/* Propagate the ERR_PTR from request_create as the returned handle. */
549 if (IS_ERR(cbh->cbh_head)) {
550 bh = (struct lu_batch *)cbh->cbh_head;
556 EXPORT_SYMBOL(cli_batch_create);
/*
 * Public entry: flush the remaining batched updates and release the
 * handle.  (Freeing of @cbh is presumably on a line elided from this
 * excerpt.)
 */
558 int cli_batch_stop(struct obd_export *exp, struct lu_batch *bh)
560 struct cli_batch *cbh;
565 cbh = container_of(bh, struct cli_batch, cbh_super);
566 rc = batch_send_update_req(NULL, cbh->cbh_head);
571 EXPORT_SYMBOL(cli_batch_stop);
/*
 * Public entry: send whatever has accumulated on the batch and detach
 * the head so the next cli_batch_add() starts a fresh one.  A NULL head
 * means nothing is pending.  (The @wait parameter's use is not visible
 * in this excerpt.)
 */
573 int cli_batch_flush(struct obd_export *exp, struct lu_batch *bh, bool wait)
575 struct cli_batch *cbh;
580 cbh = container_of(bh, struct cli_batch, cbh_super);
581 if (cbh->cbh_head == NULL)
584 rc = batch_send_update_req(NULL, cbh->cbh_head);
585 cbh->cbh_head = NULL;
589 EXPORT_SYMBOL(cli_batch_flush);
/*
 * Public entry: queue one metadata update on the batch.  Lazily
 * (re)creates the batch head — e.g. after cli_batch_flush() — then
 * delegates packing and callback registration to
 * batch_update_request_add().
 */
591 int cli_batch_add(struct obd_export *exp, struct lu_batch *bh,
592 struct md_op_item *item, md_update_pack_t packer,
593 object_update_interpret_t interpreter)
595 struct cli_batch *cbh;
600 cbh = container_of(bh, struct cli_batch, cbh_super);
601 if (cbh->cbh_head == NULL) {
602 cbh->cbh_head = batch_update_request_create(exp, bh);
603 if (IS_ERR(cbh->cbh_head))
604 RETURN(PTR_ERR(cbh->cbh_head));
607 rc = batch_update_request_add(&cbh->cbh_head, item,
608 packer, interpreter);
612 EXPORT_SYMBOL(cli_batch_add);