/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2020, 2022, DDN/Whamcloud Storage Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */
/*
 * lustre/ptlrpc/batch.c
 *
 * Batch Metadata Updating on the client
 *
 * Author: Qian Yingjin <qian@ddn.com>
 */

#define DEBUG_SUBSYSTEM S_MDC

#include <linux/module.h>
#include <obd_class.h>
#include <obd.h>
#ifdef HAVE_SERVER_SUPPORT
#include <lustre_update.h>
#else

#define OUT_UPDATE_REPLY_SIZE           4096

static inline struct lustre_msg *
batch_update_reqmsg_next(struct batch_update_request *bur,
                         struct lustre_msg *reqmsg)
{
        if (reqmsg)
                return (struct lustre_msg *)((char *)reqmsg +
                                             lustre_packed_msg_size(reqmsg));
        else
                return &bur->burq_reqmsg[0];
}

static inline struct lustre_msg *
batch_update_repmsg_next(struct batch_update_reply *bur,
                         struct lustre_msg *repmsg)
{
        if (repmsg)
                return (struct lustre_msg *)((char *)repmsg +
                                             lustre_packed_msg_size(repmsg));
        else
                return &bur->burp_repmsg[0];
}
#endif

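/*
 * A single update buffer in a batched request: @bub_req points to the
 * packed batch_update_request, @bub_size is the allocated (page-aligned)
 * size of that buffer, @bub_end is the offset where the next sub-request
 * message will be packed, and @bub_item links the buffer into
 * batch_update_head::buh_buf_list.
 */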
struct batch_update_buffer {
        struct batch_update_request     *bub_req;
        size_t                           bub_size;
        size_t                           bub_end;
        struct list_head                 bub_item;
};

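/*
 * Per-request async arguments linking the RPC back to its batch head in
 * the interpret callback.
 */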
struct batch_update_args {
        struct batch_update_head        *ba_head;
};

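/*
 * Deferred work item used to resend the sub-requests (starting at
 * @bwr_index) that the server did not handle when a batched RPC fails
 * with -EOVERFLOW.
 */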
struct batch_work_resend {
        struct work_struct               bwr_work;
        struct batch_update_head        *bwr_head;
        int                              bwr_index;
};

/**
 * Prepare an inline update request.
 *
 * Prepare an inline BUT update ptlrpc request: such a request usually
 * contains a single update buffer and does not need bulk transfer.
 */
static int batch_prep_inline_update_req(struct batch_update_head *head,
                                        struct ptlrpc_request *req,
                                        int repsize)
{
        struct batch_update_buffer *buf;
        struct but_update_header *buh;
        int rc;

        buf = list_entry(head->buh_buf_list.next,
                          struct batch_update_buffer, bub_item);
        req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
                             buf->bub_end + sizeof(*buh));

        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
        if (rc != 0)
                RETURN(rc);

        buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
        buh->buh_magic = BUT_HEADER_MAGIC;
        buh->buh_count = 1;
        buh->buh_inline_length = buf->bub_end;
        buh->buh_reply_size = repsize;
        buh->buh_update_count = head->buh_update_count;

        memcpy(buh->buh_inline_data, buf->bub_req, buf->bub_end);

        req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
                             RCL_SERVER, repsize);

        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OUT_PORTAL;
        req->rq_reply_portal = OSC_REPLY_PORTAL;

        RETURN(rc);
}

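/**
 * Prepare a batched update request.
 *
 * If the batch contains a single buffer small enough for inline transfer,
 * pack it inline via batch_prep_inline_update_req(); otherwise describe
 * each buffer in a but_update_buffer array and attach the buffers to a
 * bulk descriptor for a GET-source bulk write.
 */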
static int batch_prep_update_req(struct batch_update_head *head,
                                 struct ptlrpc_request **reqp)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct batch_update_buffer *buf;
        struct but_update_header *buh;
        struct but_update_buffer *bub;
        int repsize = head->buh_repsize;
        int page_count = 0;
        int total = 0;
        int rc;

        ENTRY;
        repsize += round_up(offsetof(struct batch_update_reply, burp_repmsg[0]), 8);
        if (repsize < OUT_UPDATE_REPLY_SIZE)
                repsize = OUT_UPDATE_REPLY_SIZE;

        LASSERT(head->buh_buf_count > 0);

        req = ptlrpc_request_alloc(class_exp2cliimp(head->buh_exp),
                                   &RQF_MDS_BATCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (head->buh_buf_count == 1) {
                buf = list_entry(head->buh_buf_list.next,
                                 struct batch_update_buffer, bub_item);

                /* Check whether it can be packed inline */
                if (buf->bub_end + sizeof(struct but_update_header) <
                    OUT_UPDATE_MAX_INLINE_SIZE) {
                        rc = batch_prep_inline_update_req(head, req, repsize);
                        if (rc == 0)
                                *reqp = req;
                        GOTO(out_req, rc);
                }
        }

        req_capsule_set_size(&req->rq_pill, &RMF_BUT_HEADER, RCL_CLIENT,
                             sizeof(struct but_update_header));
        req_capsule_set_size(&req->rq_pill, &RMF_BUT_BUF, RCL_CLIENT,
                             head->buh_buf_count * sizeof(*bub));

        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_BATCH);
        if (rc != 0)
                GOTO(out_req, rc);

        buh = req_capsule_client_get(&req->rq_pill, &RMF_BUT_HEADER);
        buh->buh_magic = BUT_HEADER_MAGIC;
        buh->buh_count = head->buh_buf_count;
        buh->buh_inline_length = 0;
        buh->buh_reply_size = repsize;
        buh->buh_update_count = head->buh_update_count;
        bub = req_capsule_client_get(&req->rq_pill, &RMF_BUT_BUF);
        list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
                bub->bub_size = buf->bub_size;
                bub++;
                /* First *and* last might be partial pages, hence +1 */
                page_count += DIV_ROUND_UP(buf->bub_size, PAGE_SIZE) + 1;
        }

        req->rq_bulk_write = 1;
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                                    MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
                                    PTLRPC_BULK_GET_SOURCE,
                                    MDS_BULK_PORTAL,
                                    &ptlrpc_bulk_kiov_nopin_ops);
        if (desc == NULL)
                GOTO(out_req, rc = -ENOMEM);

        list_for_each_entry(buf, &head->buh_buf_list, bub_item) {
                desc->bd_frag_ops->add_iov_frag(desc, buf->bub_req,
                                                buf->bub_size);
                total += buf->bub_size;
        }
        CDEBUG(D_OTHER, "Total %d in %u\n", total, head->buh_update_count);

        req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
                             RCL_SERVER, repsize);

        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OUT_PORTAL;
        req->rq_reply_portal = OSC_REPLY_PORTAL;
        *reqp = req;

out_req:
        if (rc < 0)
                ptlrpc_req_put(req);

        RETURN(rc);
}

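/* Return the buffer currently being filled, i.e. the tail of the list. */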
static struct batch_update_buffer *
current_batch_update_buffer(struct batch_update_head *head)
{
        if (list_empty(&head->buh_buf_list))
                return NULL;

        return list_entry(head->buh_buf_list.prev, struct batch_update_buffer,
                          bub_item);
}

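/*
 * Allocate a new update buffer of at least @size bytes (rounded up to a
 * whole page) and append it to @head->buh_buf_list.
 */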
static int batch_update_buffer_create(struct batch_update_head *head,
                                      size_t size)
{
        struct batch_update_buffer *buf;
        struct batch_update_request *bur;

        OBD_ALLOC_PTR(buf);
        if (buf == NULL)
                return -ENOMEM;

        LASSERT(size > 0);
        size = round_up(size, PAGE_SIZE);
        OBD_ALLOC_LARGE(bur, size);
        if (bur == NULL) {
                OBD_FREE_PTR(buf);
                return -ENOMEM;
        }

        bur->burq_magic = BUT_REQUEST_MAGIC;
        bur->burq_count = 0;
        buf->bub_req = bur;
        buf->bub_size = size;
        buf->bub_end = sizeof(*bur);
        INIT_LIST_HEAD(&buf->bub_item);
        list_add_tail(&buf->bub_item, &head->buh_buf_list);
        head->buh_buf_count++;

        return 0;
}

/**
 * Destroy an @object_update_callback.
 */
static void object_update_callback_fini(struct object_update_callback *ouc)
{
        LASSERT(list_empty(&ouc->ouc_item));

        OBD_FREE_PTR(ouc);
}

/**
 * Insert an @object_update_callback into the @batch_update_head.
 *
 * Usually each update in @batch_update_head will have one corresponding
 * callback, and these callbacks will be called in ->rq_interpret_reply.
 */
static int
batch_insert_update_callback(struct batch_update_head *head, void *data,
                             object_update_interpret_t interpret)
{
        struct object_update_callback *ouc;

        OBD_ALLOC_PTR(ouc);
        if (ouc == NULL)
                return -ENOMEM;

        INIT_LIST_HEAD(&ouc->ouc_item);
        ouc->ouc_interpret = interpret;
        ouc->ouc_head = head;
        ouc->ouc_data = data;
        list_add_tail(&ouc->ouc_item, &head->buh_cb_list);

        return 0;
}

/**
 * Allocate and initialize a batch update request.
 *
 * @batch_update_head is used to track updates being executed on
 * this OBD device. The update buffer is one page (PAGE_SIZE, normally
 * 4KB) initially, and further buffers are added as needed.
 */
static struct batch_update_head *
batch_update_request_create(struct obd_export *exp, struct lu_batch *bh)
{
        struct batch_update_head *head;
        int rc;

        OBD_ALLOC_PTR(head);
        if (head == NULL)
                return ERR_PTR(-ENOMEM);

        INIT_LIST_HEAD(&head->buh_cb_list);
        INIT_LIST_HEAD(&head->buh_buf_list);
        head->buh_exp = exp;
        head->buh_batch = bh;

        rc = batch_update_buffer_create(head, PAGE_SIZE);
        if (rc != 0) {
                OBD_FREE_PTR(head);
                RETURN(ERR_PTR(rc));
        }

        return head;
}

static void batch_update_request_destroy(struct batch_update_head *head)
{
        struct batch_update_buffer *bub, *tmp;

        if (head == NULL)
                return;

        list_for_each_entry_safe(bub, tmp, &head->buh_buf_list, bub_item) {
                list_del(&bub->bub_item);
                if (bub->bub_req)
                        OBD_FREE_LARGE(bub->bub_req, bub->bub_size);
                OBD_FREE_PTR(bub);
        }

        OBD_FREE_PTR(head);
}

static void cli_batch_resend_work(struct work_struct *data);

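/**
 * Finish a batch update request: walk the callback list, match each update
 * with its reply message (when the server handled it), invoke the
 * per-update interpret callback, then destroy @head. Updates the server
 * did not handle get -ECANCELED; on -EOVERFLOW the unhandled tail is
 * rescheduled for resend via cli_batch_resend_work().
 */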
static int batch_update_request_fini(struct batch_update_head *head,
                                     struct ptlrpc_request *req,
                                     struct batch_update_reply *reply, int rc)
{
        struct object_update_callback *ouc, *next;
        struct lustre_msg *repmsg = NULL;
        int count = 0;
        int index = 0;

        ENTRY;

        if (reply)
                count = reply->burp_count;

        list_for_each_entry_safe(ouc, next, &head->buh_cb_list, ouc_item) {
                int rc1 = 0;

                /*
                 * The peer may have handled only some of the requests
                 * (indicated by @count) in the packed OUT RPC; we can only
                 * get results for the handled part.
                 */
                if (index < count) {
                        repmsg = batch_update_repmsg_next(reply, repmsg);
                        if (repmsg == NULL)
                                rc1 = -EPROTO;
                        else
                                rc1 = repmsg->lm_result;
                } else {
                        /*
                         * The peer did not handle these requests; return
                         * -ECANCELED to the update interpreter for now.
                         */
                        repmsg = NULL;
                        rc1 = -ECANCELED;
                        /*
                         * TODO: resend the unfinished sub request when the
                         * return code is -EOVERFLOW.
                         */
                        if (rc == -EOVERFLOW) {
                                struct batch_work_resend *work;

                                OBD_ALLOC_GFP(work, sizeof(*work), GFP_ATOMIC);
                                if (work == NULL) {
                                        rc1 = -ENOMEM;
                                } else {
                                        INIT_WORK(&work->bwr_work,
                                                  cli_batch_resend_work);
                                        work->bwr_head = head;
                                        work->bwr_index = index;
                                        schedule_work(&work->bwr_work);
                                        RETURN(0);
                                }
                        }
                }

                list_del_init(&ouc->ouc_item);
                if (ouc->ouc_interpret != NULL)
                        ouc->ouc_interpret(req, repmsg, ouc, rc1);

                index++;
                object_update_callback_fini(ouc);
                if (rc == 0 && rc1 < 0)
                        rc = rc1;
        }

        batch_update_request_destroy(head);

        RETURN(rc);
}

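/**
 * ptlrpc interpret callback for a batched RPC: release the modify RPC
 * slot, unpack and sanity-check the batch reply, and hand it to
 * batch_update_request_fini() to dispatch per-update results.
 */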
static int batch_update_interpret(const struct lu_env *env,
                                  struct ptlrpc_request *req,
                                  void *args, int rc)
{
        struct batch_update_args *aa = (struct batch_update_args *)args;
        struct batch_update_reply *reply = NULL;

        ENTRY;

        if (aa->ba_head == NULL)
                RETURN(0);

        ptlrpc_put_mod_rpc_slot(req);
        /* Unpack the results from the reply message. */
        if (req->rq_repmsg != NULL && req->rq_replied) {
                reply = req_capsule_server_sized_get(&req->rq_pill,
                                                     &RMF_BUT_REPLY,
                                                     sizeof(*reply));
                if ((reply == NULL ||
                     reply->burp_magic != BUT_REPLY_MAGIC) && rc == 0)
                        rc = -EPROTO;
        }

        rc = batch_update_request_fini(aa->ba_head, req, reply, rc);

        RETURN(rc);
}

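/**
 * Pack and send a batched update RPC. Depending on the batch flags the
 * request is sent synchronously (BATCH_FL_SYNC), added to the caller's
 * request set (BATCH_FL_RQSET), or handed to ptlrpcd for asynchronous
 * processing; the reply is handled in batch_update_interpret().
 */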
static int batch_send_update_req(const struct lu_env *env,
                                 struct batch_update_head *head)
{
        struct obd_device *obd;
        struct ptlrpc_request *req = NULL;
        struct batch_update_args *aa;
        struct lu_batch *bh;
        __u32 flags = 0;
        int rc;

        ENTRY;

        if (head == NULL)
                RETURN(0);

        obd = class_exp2obd(head->buh_exp);
        bh = head->buh_batch;
        if (bh)
                flags = bh->lbt_flags;

        rc = batch_prep_update_req(head, &req);
        if (rc) {
                rc = batch_update_request_fini(head, NULL, NULL, rc);
                RETURN(rc);
        }

        aa = ptlrpc_req_async_args(aa, req);
        aa->ba_head = head;
        req->rq_interpret_reply = batch_update_interpret;

        /*
         * Only acquire modification RPC slot for the batched RPC
         * which contains metadata updates.
         */
        if (!(flags & BATCH_FL_RDONLY))
                ptlrpc_get_mod_rpc_slot(req);

        if (flags & BATCH_FL_SYNC) {
                rc = ptlrpc_queue_wait(req);
        } else {
                if (bh && (flags & BATCH_FL_RQSET)) {
                        ptlrpc_set_add_req(bh->lbt_rqset, req);
                        ptlrpc_check_set(env, bh->lbt_rqset);
                } else {
                        ptlrpcd_add_req(req);
                }
                req = NULL;
        }

        if (req != NULL)
                ptlrpc_req_put(req);

        lprocfs_oh_tally_log2(&obd->u.cli.cl_batch_rpc_hist,
                              head->buh_update_count);
        RETURN(rc);
}

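/**
 * Pack one update into the current batch buffer and register its
 * interpret callback. If the update does not fit (-E2BIG from @packer),
 * a new, larger buffer is created and packing is retried. Once the batch
 * reaches lbt_max_count updates it is flushed with batch_send_update_req().
 */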
static int batch_update_request_add(struct batch_update_head **headp,
                                    struct md_op_item *item,
                                    md_update_pack_t packer,
                                    object_update_interpret_t interpreter)
{
        struct batch_update_head *head = *headp;
        struct lu_batch *bh = head->buh_batch;
        struct batch_update_buffer *buf;
        struct lustre_msg *reqmsg;
        size_t max_len;
        int rc;

        ENTRY;

        for (;;) {
                buf = current_batch_update_buffer(head);
                LASSERT(buf != NULL);
                max_len = buf->bub_size - buf->bub_end;
                reqmsg = (struct lustre_msg *)((char *)buf->bub_req +
                                                buf->bub_end);
                rc = packer(head, reqmsg, &max_len, item);
                if (rc == -E2BIG) {
                        int rc2;

                        /* Create a new batch object update buffer */
                        rc2 = batch_update_buffer_create(head,
                                max_len + offsetof(struct batch_update_request,
                                                   burq_reqmsg[0]) + 1);
                        if (rc2 != 0) {
                                rc = rc2;
                                break;
                        }
                } else {
                        if (rc == 0) {
                                buf->bub_end += max_len;
                                buf->bub_req->burq_count++;
                                head->buh_update_count++;
                                head->buh_repsize += reqmsg->lm_repsize;
                        }
                        break;
                }
        }

        if (rc)
                GOTO(out, rc);

        rc = batch_insert_update_callback(head, item, interpreter);
        if (rc)
                GOTO(out, rc);

        /* Unplug the batch queue if it has accumulated enough updates. */
        if (bh->lbt_max_count && head->buh_update_count >= bh->lbt_max_count) {
                rc = batch_send_update_req(NULL, head);
                *headp = NULL;
        }
out:
        if (rc) {
                batch_update_request_destroy(head);
                *headp = NULL;
        }

        RETURN(rc);
}

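/**
 * Workqueue handler that resends the sub-requests a server did not handle
 * after an -EOVERFLOW failure. Sub-requests before @bwr_index already
 * have results; whole buffers past the index are moved to the new head,
 * while the buffer straddling the index has its remaining messages copied
 * one by one. The pending callbacks are transferred to the new head and
 * the batch is sent again.
 */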
static void cli_batch_resend_work(struct work_struct *data)
{
        struct batch_work_resend *work = container_of(data,
                                        struct batch_work_resend, bwr_work);
        struct batch_update_head *obuh = work->bwr_head;
        struct object_update_callback *ouc;
        struct batch_update_head *head;
        struct batch_update_buffer *buf;
        struct batch_update_buffer *tmp;
        int index = work->bwr_index;
        int rc = 0;
        int i = 0;

        ENTRY;

        head = batch_update_request_create(obuh->buh_exp, NULL);
        if (head == NULL)
                GOTO(err_up, rc = -ENOMEM);

        list_for_each_entry_safe(buf, tmp, &obuh->buh_buf_list, bub_item) {
                struct batch_update_request *bur = buf->bub_req;
                struct batch_update_buffer *newbuf;
                struct lustre_msg *reqmsg = NULL;
                size_t max_len;
                int j;

                if (i + bur->burq_count < index) {
                        i += bur->burq_count;
                        continue;
                }

                /* Reuse the already-allocated buffer wholesale */
                if (i >= index) {
                        list_move_tail(&buf->bub_item, &head->buh_buf_list);
                        head->buh_update_count += buf->bub_req->burq_count;
                        head->buh_buf_count++;
                        continue;
                }

                for (j = 0; j < bur->burq_count; j++) {
                        struct lustre_msg *newmsg;
                        __u32 msgsz;

                        reqmsg = batch_update_reqmsg_next(bur, reqmsg);
                        if (i + j < index)
                                continue;
repeat:
                        newbuf = current_batch_update_buffer(head);
                        LASSERT(newbuf != NULL);
                        max_len = newbuf->bub_size - newbuf->bub_end;
                        newmsg = (struct lustre_msg *)((char *)newbuf->bub_req +
                                                       newbuf->bub_end);
                        msgsz = lustre_packed_msg_size(reqmsg);
                        if (msgsz >= max_len) {
                                int rc2;

                                /* Create a new batch update buffer */
                                rc2 = batch_update_buffer_create(head, msgsz +
                                        offsetof(struct batch_update_request,
                                                 burq_reqmsg[0]) + 1);
                                if (rc2 != 0)
                                        GOTO(err_up, rc = rc2);
                                GOTO(repeat, rc);
                        }

                        memcpy(newmsg, reqmsg, msgsz);
                        newbuf->bub_end += msgsz;
                        newbuf->bub_req->burq_count++;
                        head->buh_update_count++;
                }

                i = index;
        }

        list_splice_init(&obuh->buh_cb_list, &head->buh_cb_list);
        list_for_each_entry(ouc, &head->buh_cb_list, ouc_item)
                ouc->ouc_head = head;

        head->buh_repsize = BUT_MAXREPSIZE - SPTLRPC_MAX_PAYLOAD;
        rc = batch_send_update_req(NULL, head);
        if (rc)
                GOTO(err_up, rc);

        batch_update_request_destroy(obuh);
        OBD_FREE_PTR(work);
        RETURN_EXIT;

err_up:
        batch_update_request_fini(obuh, NULL, NULL, rc);
        if (head != NULL)
                batch_update_request_fini(head, NULL, NULL, rc);

        OBD_FREE_PTR(work);
        RETURN_EXIT;
}

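/**
 * Create a batch handle for export @exp. Updates added to the handle are
 * accumulated and sent as a single batched RPC once @max_count updates
 * are queued (or when the batch is flushed or stopped).
 */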
struct lu_batch *cli_batch_create(struct obd_export *exp,
                                  enum lu_batch_flags flags, __u32 max_count)
{
        struct cli_batch *cbh;
        struct lu_batch *bh;

        ENTRY;

        OBD_ALLOC_PTR(cbh);
        if (!cbh)
                RETURN(ERR_PTR(-ENOMEM));

        bh = &cbh->cbh_super;
        bh->lbt_result = 0;
        bh->lbt_flags = flags;
        bh->lbt_max_count = max_count;

        cbh->cbh_head = batch_update_request_create(exp, bh);
        if (IS_ERR(cbh->cbh_head)) {
                bh = (struct lu_batch *)cbh->cbh_head;
                OBD_FREE_PTR(cbh);
        }

        RETURN(bh);
}
EXPORT_SYMBOL(cli_batch_create);

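/* Send any remaining batched updates and free the batch handle. */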
int cli_batch_stop(struct obd_export *exp, struct lu_batch *bh)
{
        struct cli_batch *cbh;
        int rc;

        ENTRY;

        cbh = container_of(bh, struct cli_batch, cbh_super);
        rc = batch_send_update_req(NULL, cbh->cbh_head);

        OBD_FREE_PTR(cbh);
        RETURN(rc);
}
EXPORT_SYMBOL(cli_batch_stop);

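/*
 * Flush the current batch: send the accumulated updates but keep the
 * handle usable; the next cli_batch_add() will allocate a fresh head.
 */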
int cli_batch_flush(struct obd_export *exp, struct lu_batch *bh, bool wait)
{
        struct cli_batch *cbh;
        int rc;

        ENTRY;

        cbh = container_of(bh, struct cli_batch, cbh_super);
        if (cbh->cbh_head == NULL)
                RETURN(0);

        rc = batch_send_update_req(NULL, cbh->cbh_head);
        cbh->cbh_head = NULL;

        RETURN(rc);
}
EXPORT_SYMBOL(cli_batch_flush);

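/**
 * Add one update to the batch: pack it via @packer and register
 * @interpreter to consume its reply. A minimal usage sketch follows
 * (hypothetical caller; my_packer and my_interpret are illustrative
 * names, not part of this file):
 *
 *      struct lu_batch *bh = cli_batch_create(exp, 0, 64);
 *      if (!IS_ERR(bh)) {
 *              rc = cli_batch_add(exp, bh, item, my_packer, my_interpret);
 *              rc = cli_batch_stop(exp, bh);
 *      }
 */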
int cli_batch_add(struct obd_export *exp, struct lu_batch *bh,
                  struct md_op_item *item, md_update_pack_t packer,
                  object_update_interpret_t interpreter)
{
        struct cli_batch *cbh;
        int rc;

        ENTRY;

        cbh = container_of(bh, struct cli_batch, cbh_super);
        if (cbh->cbh_head == NULL) {
                cbh->cbh_head = batch_update_request_create(exp, bh);
                if (IS_ERR(cbh->cbh_head))
                        RETURN(PTR_ERR(cbh->cbh_head));
        }

        rc = batch_update_request_add(&cbh->cbh_head, item,
                                      packer, interpreter);

        RETURN(rc);
}
EXPORT_SYMBOL(cli_batch_add);