Whamcloud - gitweb
LU-15550 ptlrpc: retry mechanism for overflowed batched RPCs
[fs/lustre-release.git] / lustre / ptlrpc / batch.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2020, 2022, DDN/Whamcloud Storage Corporation.
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * lustre/ptlrpc/batch.c
30  *
31  * Batch Metadata Updating on the client
32  *
33  * Author: Qian Yingjin <qian@ddn.com>
34  */
35
36 #define DEBUG_SUBSYSTEM S_MDC
37
38 #include <linux/module.h>
39 #include <obd_class.h>
40 #include <obd.h>
41 #ifdef HAVE_SERVER_SUPPORT
42 #include <lustre_update.h>
43 #else
44
45 #define OUT_UPDATE_REPLY_SIZE           4096
46
47 static inline struct lustre_msg *
48 batch_update_reqmsg_next(struct batch_update_request *bur,
49                          struct lustre_msg *reqmsg)
50 {
51         if (reqmsg)
52                 return (struct lustre_msg *)((char *)reqmsg +
53                                              lustre_packed_msg_size(reqmsg));
54         else
55                 return &bur->burq_reqmsg[0];
56 }
57
58 static inline struct lustre_msg *
59 batch_update_repmsg_next(struct batch_update_reply *bur,
60                          struct lustre_msg *repmsg)
61 {
62         if (repmsg)
63                 return (struct lustre_msg *)((char *)repmsg +
64                                              lustre_packed_msg_size(repmsg));
65         else
66                 return &bur->burp_repmsg[0];
67 }
68 #endif
69
/*
 * One buffer of packed update request messages.
 *
 * bub_req:  the packed batch_update_request payload
 * bub_size: allocated size of bub_req (rounded to whole pages)
 * bub_end:  bytes of bub_req currently in use (offset for the next pack)
 * bub_item: linkage into batch_update_head::buh_buf_list
 */
70 struct batch_update_buffer {
71         struct batch_update_request     *bub_req;
72         size_t                           bub_size;
73         size_t                           bub_end;
74         struct list_head                 bub_item;
75 };
76
/* Async args stashed in the ptlrpc request for batch_update_interpret(). */
77 struct batch_update_args {
78         struct batch_update_head        *ba_head;
79 };
80
/*
 * Deferred-work context used to resend the sub-requests the server did not
 * handle (-EOVERFLOW case); bwr_index is the index of the first unhandled
 * update within bwr_head.
 */
81 struct batch_work_resend {
82         struct work_struct               bwr_work;
83         struct batch_update_head        *bwr_head;
84         int                              bwr_index;
85 };
86
87 /**
88  * Prepare inline update request
89  *
90  * Prepare BUT update ptlrpc inline request, and the request usually includes
91  * one update buffer, which does not need bulk transfer.
92  */
93 static int batch_prep_inline_update_req(struct batch_update_head *head,
94                                         struct ptlrpc_request *req,
95                                         int repsize)
96 {
97         struct batch_update_buffer *buf;
98         struct but_update_header *buh;
99         int rc;
100
        /* An inline request carries exactly one update buffer (the caller
         * has already checked buh_buf_count == 1). */
101         buf = list_entry(head->buh_buf_list.next,
102                           struct batch_update_buffer, bub_item);
        /* Header field sized to hold the header plus the packed updates. */
103         req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
104                              buf->bub_end + sizeof(*buh));
105
106         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
107         if (rc != 0)
108                 RETURN(rc);
109
110         buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
111         buh->buh_magic = BUT_HEADER_MAGIC;
        /* Single buffer, packed inline after the header. */
112         buh->buh_count = 1;
113         buh->buh_inline_length = buf->bub_end;
114         buh->buh_reply_size = repsize;
115         buh->buh_update_count = head->buh_update_count;
116
        /* Copy the packed updates straight into the request message. */
117         memcpy(buh->buh_inline_data, buf->bub_req, buf->bub_end);
118
119         req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
120                              RCL_SERVER, repsize);
121
122         ptlrpc_request_set_replen(req);
123         req->rq_request_portal = OUT_PORTAL;
124         req->rq_reply_portal = OSC_REPLY_PORTAL;
125
        /* rc is 0 here: packing succeeded. */
126         RETURN(rc);
127 }
128
/*
 * Build the MDS_BATCH ptlrpc request for @head.
 *
 * A batch consisting of a single small buffer is packed inline into the
 * request message; anything larger is transferred by bulk (the server
 * pulls each update buffer with a bulk GET).  On success *reqp holds the
 * prepared request; on failure the request is released here.
 */
129 static int batch_prep_update_req(struct batch_update_head *head,
130                                  struct ptlrpc_request **reqp)
131 {
132         struct ptlrpc_request *req;
133         struct ptlrpc_bulk_desc *desc;
134         struct batch_update_buffer *buf;
135         struct but_update_header *buh;
136         struct but_update_buffer *bub;
137         int page_count = 0;
138         int total = 0;
139         int repsize;
140         int rc;
141
142         ENTRY;
143
        /* Reply buffer: accumulated per-update reply sizes plus the batch
         * reply header, with OUT_UPDATE_REPLY_SIZE as a floor. */
144         repsize = head->buh_repsize +
145                   cfs_size_round(offsetof(struct batch_update_reply,
146                                           burp_repmsg[0]));
147         if (repsize < OUT_UPDATE_REPLY_SIZE)
148                 repsize = OUT_UPDATE_REPLY_SIZE;
149
150         LASSERT(head->buh_buf_count > 0);
151
152         req = ptlrpc_request_alloc(class_exp2cliimp(head->buh_exp),
153                                    &RQF_MDS_BATCH);
154         if (req == NULL)
155                 RETURN(-ENOMEM);
156
157         if (head->buh_buf_count == 1) {
158                 buf = list_entry(head->buh_buf_list.next,
159                                  struct batch_update_buffer, bub_item);
160
161                 /* Check whether it can be packed inline */
162                 if (buf->bub_end + sizeof(struct but_update_header) <
163                     OUT_UPDATE_MAX_INLINE_SIZE) {
164                         rc = batch_prep_inline_update_req(head, req, repsize);
165                         if (rc == 0)
166                                 *reqp = req;
167                         GOTO(out_req, rc);
168                 }
169         }
170
        /* Bulk path: header plus one but_update_buffer descriptor per
         * update buffer travel in the request message proper. */
171         req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
172                              sizeof(struct but_update_header));
173         req_capsule_set_size(&req->rq_pill, &RMF_BUT_BUF, RCL_CLIENT,
174                              head->buh_buf_count * sizeof(*bub));
175
176         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
177         if (rc != 0)
178                 GOTO(out_req, rc);
179
180         buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
181         buh->buh_magic = BUT_HEADER_MAGIC;
182         buh->buh_count = head->buh_buf_count;
        /* Zero inline length tells the server the data comes via bulk. */
183         buh->buh_inline_length = 0;
184         buh->buh_reply_size = repsize;
185         buh->buh_update_count = head->buh_update_count;
186         bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
187         list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
188                 bub->bub_size = buf->bub_size;
189                 bub++;
190                 /* First *and* last might be partial pages, hence +1 */
191                 page_count += DIV_ROUND_UP(buf->bub_size, PAGE_SIZE) + 1;
192         }
193
194         req->rq_bulk_write = 1;
195         desc = ptlrpc_prep_bulk_imp(req, page_count,
196                                     MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
197                                     PTLRPC_BULK_GET_SOURCE,
198                                     MDS_BULK_PORTAL,
199                                     &ptlrpc_bulk_kiov_nopin_ops);
200         if (desc == NULL)
201                 GOTO(out_req, rc = -ENOMEM);
202
        /* Attach every update buffer as a bulk fragment. */
203         list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
204                 desc->bd_frag_ops->add_iov_frag(desc, buf->bub_req,
205                                                 buf->bub_size);
206                 total += buf->bub_size;
207         }
208         CDEBUG(D_OTHER, "Total %d in %u\n", total, head->buh_update_count);
209
210         req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
211                              RCL_SERVER, repsize);
212
213         ptlrpc_request_set_replen(req);
214         req->rq_request_portal = OUT_PORTAL;
215         req->rq_reply_portal = OSC_REPLY_PORTAL;
216         *reqp = req;
217
218 out_req:
219         if (rc < 0)
220                 ptlrpc_req_finished(req);
221
222         RETURN(rc);
223 }
224
225 static struct batch_update_buffer *
226 current_batch_update_buffer(struct batch_update_head *head)
227 {
228         if (list_empty(&head->buh_buf_list))
229                 return NULL;
230
231         return list_entry(head->buh_buf_list.prev, struct batch_update_buffer,
232                           bub_item);
233 }
234
235 static int batch_update_buffer_create(struct batch_update_head *head,
236                                       size_t size)
237 {
238         struct batch_update_buffer *buf;
239         struct batch_update_request *bur;
240
241         OBD_ALLOC_PTR(buf);
242         if (buf == NULL)
243                 return -ENOMEM;
244
245         LASSERT(size > 0);
246         size = round_up(size, PAGE_SIZE);
247         OBD_ALLOC_LARGE(bur, size);
248         if (bur == NULL) {
249                 OBD_FREE_PTR(buf);
250                 return -ENOMEM;
251         }
252
253         bur->burq_magic = BUT_REQUEST_MAGIC;
254         bur->burq_count = 0;
255         buf->bub_req = bur;
256         buf->bub_size = size;
257         buf->bub_end = sizeof(*bur);
258         INIT_LIST_HEAD(&buf->bub_item);
259         list_add_tail(&buf->bub_item, &head->buh_buf_list);
260         head->buh_buf_count++;
261
262         return 0;
263 }
264
265 /**
266  * Destroy an @object_update_callback.
267  */
268 static void object_update_callback_fini(struct object_update_callback *ouc)
269 {
270         LASSERT(list_empty(&ouc->ouc_item));
271
272         OBD_FREE_PTR(ouc);
273 }
274
275 /**
276  * Insert an @object_update_callback into the @batch_update_head.
277  *
278  * Usually each update in @batch_update_head will have one correspondent
279  * callback, and these callbacks will be called in ->rq_interpret_reply.
280  */
281 static int
282 batch_insert_update_callback(struct batch_update_head *head, void *data,
283                              object_update_interpret_t interpret)
284 {
285         struct object_update_callback *ouc;
286
287         OBD_ALLOC_PTR(ouc);
288         if (ouc == NULL)
289                 return -ENOMEM;
290
291         INIT_LIST_HEAD(&ouc->ouc_item);
292         ouc->ouc_interpret = interpret;
293         ouc->ouc_head = head;
294         ouc->ouc_data = data;
295         list_add_tail(&ouc->ouc_item, &head->buh_cb_list);
296
297         return 0;
298 }
299
300 /**
301  * Allocate and initialize batch update request.
302  *
303  * @batch_update_head is being used to track updates being executed on
304  * this OBD device. The update buffer will be 4K initially, and increased
305  * if needed.
306  */
307 static struct batch_update_head *
308 batch_update_request_create(struct obd_export *exp, struct lu_batch *bh)
309 {
310         struct batch_update_head *head;
311         int rc;
312
313         OBD_ALLOC_PTR(head);
314         if (head == NULL)
315                 return ERR_PTR(-ENOMEM);
316
317         INIT_LIST_HEAD(&head->buh_cb_list);
318         INIT_LIST_HEAD(&head->buh_buf_list);
319         head->buh_exp = exp;
320         head->buh_batch = bh;
321
322         rc = batch_update_buffer_create(head, PAGE_SIZE);
323         if (rc != 0) {
324                 OBD_FREE_PTR(head);
325                 RETURN(ERR_PTR(rc));
326         }
327
328         return head;
329 }
330
331 static void batch_update_request_destroy(struct batch_update_head *head)
332 {
333         struct batch_update_buffer *bub, *tmp;
334
335         if (head == NULL)
336                 return;
337
338         list_for_each_entry_safe(bub, tmp, &head->buh_buf_list, bub_item) {
339                 list_del(&bub->bub_item);
340                 if (bub->bub_req)
341                         OBD_FREE_LARGE(bub->bub_req, bub->bub_size);
342                 OBD_FREE_PTR(bub);
343         }
344
345         OBD_FREE_PTR(head);
346 }
347
348 static void cli_batch_resend_work(struct work_struct *data);
349
/*
 * Complete a batched update: run every queued callback with its result and
 * release @head.
 *
 * @reply is the unpacked batch reply, or NULL when the RPC itself failed;
 * only the first @reply->burp_count sub-requests were handled by the peer.
 * Unhandled sub-requests are completed with -ECANCELED, unless @rc is
 * -EOVERFLOW, in which case a resend of the unhandled tail is scheduled
 * and ownership of @head passes to the worker (so @head is NOT destroyed
 * here).
 *
 * Returns @rc, or the first sub-request error when @rc is 0.
 */
350 static int batch_update_request_fini(struct batch_update_head *head,
351                                      struct ptlrpc_request *req,
352                                      struct batch_update_reply *reply, int rc)
353 {
354         struct object_update_callback *ouc, *next;
355         struct lustre_msg *repmsg = NULL;
356         int count = 0;
357         int index = 0;
358
359         ENTRY;
360
361         if (reply)
362                 count = reply->burp_count;
363
364         list_for_each_entry_safe(ouc, next, &head->buh_cb_list, ouc_item) {
365                 int rc1 = 0;
366
367                 /*
368                  * The peer may only have handled some requests (indicated by
369                  * @count) in the packaged OUT RPC, we can only get results
370                  * for the handled part.
371                  */
372                 if (index < count) {
373                         repmsg = batch_update_repmsg_next(reply, repmsg);
374                         if (repmsg == NULL)
375                                 rc1 = -EPROTO;
376                         else
377                                 rc1 = repmsg->lm_result;
378                 } else {
379                         /*
380                          * The peer did not handle these requests, let us
381                          * return -ECANCELED to the update interpreter for
382                          * now.
383                          */
384                         repmsg = NULL;
385                         rc1 = -ECANCELED;
386                         /*
387                          * Resend the unfinished sub requests when the
388                          * return code is -EOVERFLOW.
389                          */
390                         if (rc == -EOVERFLOW) {
391                                 struct batch_work_resend *work;
392
                                /* GFP_ATOMIC: may run in ptlrpc interpret
                                 * context where sleeping is not allowed. */
393                                 OBD_ALLOC_GFP(work, sizeof(*work), GFP_ATOMIC);
394                                 if (work == NULL) {
395                                         rc1 = -ENOMEM;
396                                 } else {
397                                         INIT_WORK(&work->bwr_work,
398                                                   cli_batch_resend_work);
399                                         work->bwr_head = head;
400                                         work->bwr_index = index;
401                                         schedule_work(&work->bwr_work);
                                        /* @head is now owned by the worker:
                                         * skip the destroy below. */
402                                         RETURN(0);
403                                 }
404                         }
405                 }
406
407                 list_del_init(&ouc->ouc_item);
408                 if (ouc->ouc_interpret != NULL)
409                         ouc->ouc_interpret(req, repmsg, ouc, rc1);
410
411                 index++;
412                 object_update_callback_fini(ouc);
                /* Keep the first sub-request error as the overall result. */
413                 if (rc == 0 && rc1 < 0)
414                         rc = rc1;
415         }
416
417         batch_update_request_destroy(head);
418
419         RETURN(rc);
420 }
420
/*
 * rq_interpret_reply handler for a batched RPC: validate and unpack the
 * batch reply, then complete the queued update callbacks (which also
 * frees the batch head).
 */
421 static int batch_update_interpret(const struct lu_env *env,
422                                   struct ptlrpc_request *req,
423                                   void *args, int rc)
424 {
425         struct batch_update_args *aa = (struct batch_update_args *)args;
426         struct batch_update_reply *reply = NULL;
427
428         ENTRY;
429
430         if (aa->ba_head == NULL)
431                 RETURN(0);
432
        /*
         * NOTE(review): the mod RPC slot is released unconditionally here,
         * while batch_send_update_req() only acquires it for non-RDONLY
         * batches — confirm ptlrpc_put_mod_rpc_slot() tolerates being
         * called on a request that never took the slot.
         */
433         ptlrpc_put_mod_rpc_slot(req);
434         /* Unpack the results from the reply message. */
435         if (req->rq_repmsg != NULL && req->rq_replied) {
436                 reply = req_capsule_server_sized_get(&req->rq_pill,
437                                                      &RMF_BUT_REPLY,
438                                                      sizeof(*reply));
439                 if ((reply == NULL ||
440                      reply->burp_magic != BUT_REPLY_MAGIC) && rc == 0)
441                         rc = -EPROTO;
442         }
443
        /* Runs every callback and releases the batch head. */
444         rc = batch_update_request_fini(aa->ba_head, req, reply, rc);
445
446         RETURN(rc);
447 }
448
449 static int batch_send_update_req(const struct lu_env *env,
450                                  struct batch_update_head *head)
451 {
452         struct obd_device *obd;
453         struct ptlrpc_request *req = NULL;
454         struct batch_update_args *aa;
455         struct lu_batch *bh;
456         __u32 flags = 0;
457         int rc;
458
459         ENTRY;
460
461         if (head == NULL)
462                 RETURN(0);
463
464         obd = class_exp2obd(head->buh_exp);
465         bh = head->buh_batch;
466         if (bh)
467                 flags = bh->lbt_flags;
468
469         rc = batch_prep_update_req(head, &req);
470         if (rc) {
471                 rc = batch_update_request_fini(head, NULL, NULL, rc);
472                 RETURN(rc);
473         }
474
475         aa = ptlrpc_req_async_args(aa, req);
476         aa->ba_head = head;
477         req->rq_interpret_reply = batch_update_interpret;
478
479         /*
480          * Only acquire modification RPC slot for the batched RPC
481          * which contains metadata updates.
482          */
483         if (!(flags & BATCH_FL_RDONLY))
484                 ptlrpc_get_mod_rpc_slot(req);
485
486         if (flags & BATCH_FL_SYNC) {
487                 rc = ptlrpc_queue_wait(req);
488         } else {
489                 if ((flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) ==
490                     BATCH_FL_RDONLY) {
491                         ptlrpcd_add_req(req);
492                 } else if (flags & BATCH_FL_RQSET) {
493                         ptlrpc_set_add_req(bh->lbt_rqset, req);
494                         ptlrpc_check_set(env, bh->lbt_rqset);
495                 } else {
496                         ptlrpcd_add_req(req);
497                 }
498                 req = NULL;
499         }
500
501         if (req != NULL)
502                 ptlrpc_req_finished(req);
503
504         lprocfs_oh_tally_log2(&obd->u.cli.cl_batch_rpc_hist,
505                               head->buh_update_count);
506         RETURN(rc);
507 }
508
509 static int batch_update_request_add(struct batch_update_head **headp,
510                                     struct md_op_item *item,
511                                     md_update_pack_t packer,
512                                     object_update_interpret_t interpreter)
513 {
514         struct batch_update_head *head = *headp;
515         struct lu_batch *bh = head->buh_batch;
516         struct batch_update_buffer *buf;
517         struct lustre_msg *reqmsg;
518         size_t max_len;
519         int rc;
520
521         ENTRY;
522
523         for (; ;) {
524                 buf = current_batch_update_buffer(head);
525                 LASSERT(buf != NULL);
526                 max_len = buf->bub_size - buf->bub_end;
527                 reqmsg = (struct lustre_msg *)((char *)buf->bub_req +
528                                                 buf->bub_end);
529                 rc = packer(head, reqmsg, &max_len, item);
530                 if (rc == -E2BIG) {
531                         int rc2;
532
533                         /* Create new batch object update buffer */
534                         rc2 = batch_update_buffer_create(head,
535                                 max_len + offsetof(struct batch_update_request,
536                                                    burq_reqmsg[0]) + 1);
537                         if (rc2 != 0) {
538                                 rc = rc2;
539                                 break;
540                         }
541                 } else {
542                         if (rc == 0) {
543                                 buf->bub_end += max_len;
544                                 buf->bub_req->burq_count++;
545                                 head->buh_update_count++;
546                                 head->buh_repsize += reqmsg->lm_repsize;
547                         }
548                         break;
549                 }
550         }
551
552         if (rc)
553                 GOTO(out, rc);
554
555         rc = batch_insert_update_callback(head, item, interpreter);
556         if (rc)
557                 GOTO(out, rc);
558
559         /* Unplug the batch queue if accumulated enough update requests. */
560         if (bh->lbt_max_count && head->buh_update_count >= bh->lbt_max_count) {
561                 rc = batch_send_update_req(NULL, head);
562                 *headp = NULL;
563         }
564 out:
565         if (rc) {
566                 batch_update_request_destroy(head);
567                 *headp = NULL;
568         }
569
570         RETURN(rc);
571 }
572
573 static void cli_batch_resend_work(struct work_struct *data)
574 {
575         struct batch_work_resend *work = container_of(data,
576                                         struct batch_work_resend, bwr_work);
577         struct batch_update_head *obuh = work->bwr_head;
578         struct object_update_callback *ouc;
579         struct batch_update_head *head;
580         struct batch_update_buffer *buf;
581         struct batch_update_buffer *tmp;
582         int index = work->bwr_index;
583         int rc = 0;
584         int i = 0;
585
586         ENTRY;
587
588         head = batch_update_request_create(obuh->buh_exp, NULL);
589         if (head == NULL)
590                 GOTO(err_up, rc = -ENOMEM);
591
592         list_for_each_entry_safe(buf, tmp, &obuh->buh_buf_list, bub_item) {
593                 struct batch_update_request *bur = buf->bub_req;
594                 struct batch_update_buffer *newbuf;
595                 struct lustre_msg *reqmsg = NULL;
596                 size_t max_len;
597                 int j;
598
599                 if (i + bur->burq_count < index) {
600                         i += bur->burq_count;
601                         continue;
602                 }
603
604                 /* reused the allocated buffer */
605                 if (i >= index) {
606                         list_move_tail(&buf->bub_item, &head->buh_buf_list);
607                         head->buh_update_count += buf->bub_req->burq_count;
608                         head->buh_buf_count++;
609                         continue;
610                 }
611
612                 for (j = 0; j < bur->burq_count; j++) {
613                         struct lustre_msg *newmsg;
614                         __u32 msgsz;
615
616                         reqmsg = batch_update_reqmsg_next(bur, reqmsg);
617                         if (i + j < index)
618                                 continue;
619 repeat:
620                         newbuf = current_batch_update_buffer(head);
621                         LASSERT(newbuf != NULL);
622                         max_len = newbuf->bub_size - newbuf->bub_end;
623                         newmsg = (struct lustre_msg *)((char *)newbuf->bub_req +
624                                                        newbuf->bub_end);
625                         msgsz = lustre_packed_msg_size(reqmsg);
626                         if (msgsz >= max_len) {
627                                 int rc2;
628
629                                 /* Create new batch update buffer */
630                                 rc2 = batch_update_buffer_create(head, msgsz +
631                                         offsetof(struct batch_update_request,
632                                                  burq_reqmsg[0]) + 1);
633                                 if (rc2 != 0)
634                                         GOTO(err_up, rc = rc2);
635                                 GOTO(repeat, rc);
636                         }
637
638                         memcpy(newmsg, reqmsg, msgsz);
639                         newbuf->bub_end += msgsz;
640                         newbuf->bub_req->burq_count++;
641                         head->buh_update_count++;
642                 }
643
644                 i = index;
645         }
646
647         list_splice_init(&obuh->buh_cb_list, &head->buh_cb_list);
648         list_for_each_entry(ouc, &head->buh_cb_list, ouc_item)
649                 ouc->ouc_head = head;
650
651         head->buh_repsize = BUT_MAXREPSIZE - SPTLRPC_MAX_PAYLOAD;
652         rc = batch_send_update_req(NULL, head);
653         if (rc)
654                 GOTO(err_up, rc);
655
656         batch_update_request_destroy(obuh);
657         OBD_FREE_PTR(work);
658         RETURN_EXIT;
659
660 err_up:
661         batch_update_request_fini(obuh, NULL, NULL, rc);
662         if (head != NULL)
663                 batch_update_request_fini(head, NULL, NULL, rc);
664
665         OBD_FREE_PTR(work);
666         RETURN_EXIT;
667 }
668
669 struct lu_batch *cli_batch_create(struct obd_export *exp,
670                                   enum lu_batch_flags flags, __u32 max_count)
671 {
672         struct cli_batch *cbh;
673         struct lu_batch *bh;
674
675         ENTRY;
676
677         OBD_ALLOC_PTR(cbh);
678         if (!cbh)
679                 RETURN(ERR_PTR(-ENOMEM));
680
681         bh = &cbh->cbh_super;
682         bh->lbt_result = 0;
683         bh->lbt_flags = flags;
684         bh->lbt_max_count = max_count;
685
686         cbh->cbh_head = batch_update_request_create(exp, bh);
687         if (IS_ERR(cbh->cbh_head)) {
688                 bh = (struct lu_batch *)cbh->cbh_head;
689                 OBD_FREE_PTR(cbh);
690         }
691
692         RETURN(bh);
693 }
694 EXPORT_SYMBOL(cli_batch_create);
695
696 int cli_batch_stop(struct obd_export *exp, struct lu_batch *bh)
697 {
698         struct cli_batch *cbh;
699         int rc;
700
701         ENTRY;
702
703         cbh = container_of(bh, struct cli_batch, cbh_super);
704         rc = batch_send_update_req(NULL, cbh->cbh_head);
705
706         OBD_FREE_PTR(cbh);
707         RETURN(rc);
708 }
709 EXPORT_SYMBOL(cli_batch_stop);
710
711 int cli_batch_flush(struct obd_export *exp, struct lu_batch *bh, bool wait)
712 {
713         struct cli_batch *cbh;
714         int rc;
715
716         ENTRY;
717
718         cbh = container_of(bh, struct cli_batch, cbh_super);
719         if (cbh->cbh_head == NULL)
720                 RETURN(0);
721
722         rc = batch_send_update_req(NULL, cbh->cbh_head);
723         cbh->cbh_head = NULL;
724
725         RETURN(rc);
726 }
727 EXPORT_SYMBOL(cli_batch_flush);
728
729 int cli_batch_add(struct obd_export *exp, struct lu_batch *bh,
730                   struct md_op_item *item, md_update_pack_t packer,
731                   object_update_interpret_t interpreter)
732 {
733         struct cli_batch *cbh;
734         int rc;
735
736         ENTRY;
737
738         cbh = container_of(bh, struct cli_batch, cbh_super);
739         if (cbh->cbh_head == NULL) {
740                 cbh->cbh_head = batch_update_request_create(exp, bh);
741                 if (IS_ERR(cbh->cbh_head))
742                         RETURN(PTR_ERR(cbh->cbh_head));
743         }
744
745         rc = batch_update_request_add(&cbh->cbh_head, item,
746                                       packer, interpreter);
747
748         RETURN(rc);
749 }
750 EXPORT_SYMBOL(cli_batch_add);