Whamcloud - gitweb
LU-15550 ptlrpc: retry mechanism for overflowed batched RPCs 40/46540/9
authorQian Yingjin <qian@ddn.com>
Thu, 17 Feb 2022 03:42:14 +0000 (22:42 -0500)
committerOleg Drokin <green@whamcloud.com>
Mon, 1 May 2023 04:08:07 +0000 (04:08 +0000)
Before send the batched RPC, the client has no idea about the
actual reply buffer size. The reply buffer size prepared by a
client may be smalller than the reply buffer buffer size in need.
We already have the patch to grow the reply buffer properly in
most cases.

However, when the reply buffer size is growing larger than
BUT_MAXREPSIZE (1000 * 1024), the server will return -EOVERFLOW
error code. At this time, the server only executed the partial
sub requests in the batched RPC. The overflowed sub requests are
not handled.

In this patch, it adds a retry mechanism for overflowed batched
RPC. When found that the reply buffer overflowed, the client will
rebuild the batched RPC for the unhandled sub requests, and use
work queue mechanism to resend the new batched RPC to the server
to re-execute then again.

Add the test case sanity test_123f to verify it for large LOV
stripes with overstriping.

Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: If84fad32f2026bd34ffb47b3e163f84a9d950dbb
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/46540
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Mikhail Pershin <mpershin@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/ptlrpc/batch.c
lustre/tests/sanity.sh

index 6381cc0..59050ac 100644 (file)
 #define OUT_UPDATE_REPLY_SIZE          4096
 
 static inline struct lustre_msg *
+batch_update_reqmsg_next(struct batch_update_request *bur,
+                        struct lustre_msg *reqmsg)
+{
+       if (reqmsg)
+               return (struct lustre_msg *)((char *)reqmsg +
+                                            lustre_packed_msg_size(reqmsg));
+       else
+               return &bur->burq_reqmsg[0];
+}
+
+static inline struct lustre_msg *
 batch_update_repmsg_next(struct batch_update_reply *bur,
                         struct lustre_msg *repmsg)
 {
@@ -67,6 +78,12 @@ struct batch_update_args {
        struct batch_update_head        *ba_head;
 };
 
+struct batch_work_resend {
+       struct work_struct               bwr_work;
+       struct batch_update_head        *bwr_head;
+       int                              bwr_index;
+};
+
 /**
  * Prepare inline update request
  *
@@ -328,6 +345,8 @@ static void batch_update_request_destroy(struct batch_update_head *head)
        OBD_FREE_PTR(head);
 }
 
+static void cli_batch_resend_work(struct work_struct *data);
+
 static int batch_update_request_fini(struct batch_update_head *head,
                                     struct ptlrpc_request *req,
                                     struct batch_update_reply *reply, int rc)
@@ -345,8 +364,6 @@ static int batch_update_request_fini(struct batch_update_head *head,
        list_for_each_entry_safe(ouc, next, &head->buh_cb_list, ouc_item) {
                int rc1 = 0;
 
-               list_del_init(&ouc->ouc_item);
-
                /*
                 * The peer may only have handled some requests (indicated by
                 * @count) in the packaged OUT PRC, we can only get results
@@ -369,8 +386,24 @@ static int batch_update_request_fini(struct batch_update_head *head,
                         * TODO: resend the unfinished sub request when the
                         * return code is -EOVERFLOW.
                         */
+                       if (rc == -EOVERFLOW) {
+                               struct batch_work_resend *work;
+
+                               OBD_ALLOC_GFP(work, sizeof(*work), GFP_ATOMIC);
+                               if (work == NULL) {
+                                       rc1 = -ENOMEM;
+                               } else {
+                                       INIT_WORK(&work->bwr_work,
+                                                 cli_batch_resend_work);
+                                       work->bwr_head = head;
+                                       work->bwr_index = index;
+                                       schedule_work(&work->bwr_work);
+                                       RETURN(0);
+                               }
+                       }
                }
 
+               list_del_init(&ouc->ouc_item);
                if (ouc->ouc_interpret != NULL)
                        ouc->ouc_interpret(req, repmsg, ouc, rc1);
 
@@ -420,6 +453,7 @@ static int batch_send_update_req(const struct lu_env *env,
        struct ptlrpc_request *req = NULL;
        struct batch_update_args *aa;
        struct lu_batch *bh;
+       __u32 flags = 0;
        int rc;
 
        ENTRY;
@@ -429,6 +463,9 @@ static int batch_send_update_req(const struct lu_env *env,
 
        obd = class_exp2obd(head->buh_exp);
        bh = head->buh_batch;
+       if (bh)
+               flags = bh->lbt_flags;
+
        rc = batch_prep_update_req(head, &req);
        if (rc) {
                rc = batch_update_request_fini(head, NULL, NULL, rc);
@@ -443,16 +480,16 @@ static int batch_send_update_req(const struct lu_env *env,
         * Only acquire modification RPC slot for the batched RPC
         * which contains metadata updates.
         */
-       if (!(bh->lbt_flags & BATCH_FL_RDONLY))
+       if (!(flags & BATCH_FL_RDONLY))
                ptlrpc_get_mod_rpc_slot(req);
 
-       if (bh->lbt_flags & BATCH_FL_SYNC) {
+       if (flags & BATCH_FL_SYNC) {
                rc = ptlrpc_queue_wait(req);
        } else {
-               if ((bh->lbt_flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) ==
+               if ((flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) ==
                    BATCH_FL_RDONLY) {
                        ptlrpcd_add_req(req);
-               } else if (bh->lbt_flags & BATCH_FL_RQSET) {
+               } else if (flags & BATCH_FL_RQSET) {
                        ptlrpc_set_add_req(bh->lbt_rqset, req);
                        ptlrpc_check_set(env, bh->lbt_rqset);
                } else {
@@ -533,6 +570,102 @@ out:
        RETURN(rc);
 }
 
+static void cli_batch_resend_work(struct work_struct *data)
+{
+       struct batch_work_resend *work = container_of(data,
+                                       struct batch_work_resend, bwr_work);
+       struct batch_update_head *obuh = work->bwr_head;
+       struct object_update_callback *ouc;
+       struct batch_update_head *head;
+       struct batch_update_buffer *buf;
+       struct batch_update_buffer *tmp;
+       int index = work->bwr_index;
+       int rc = 0;
+       int i = 0;
+
+       ENTRY;
+
+       head = batch_update_request_create(obuh->buh_exp, NULL);
+       if (head == NULL)
+               GOTO(err_up, rc = -ENOMEM);
+
+       list_for_each_entry_safe(buf, tmp, &obuh->buh_buf_list, bub_item) {
+               struct batch_update_request *bur = buf->bub_req;
+               struct batch_update_buffer *newbuf;
+               struct lustre_msg *reqmsg = NULL;
+               size_t max_len;
+               int j;
+
+               if (i + bur->burq_count < index) {
+                       i += bur->burq_count;
+                       continue;
+               }
+
+               /* reused the allocated buffer */
+               if (i >= index) {
+                       list_move_tail(&buf->bub_item, &head->buh_buf_list);
+                       head->buh_update_count += buf->bub_req->burq_count;
+                       head->buh_buf_count++;
+                       continue;
+               }
+
+               for (j = 0; j < bur->burq_count; j++) {
+                       struct lustre_msg *newmsg;
+                       __u32 msgsz;
+
+                       reqmsg = batch_update_reqmsg_next(bur, reqmsg);
+                       if (i + j < index)
+                               continue;
+repeat:
+                       newbuf = current_batch_update_buffer(head);
+                       LASSERT(newbuf != NULL);
+                       max_len = newbuf->bub_size - newbuf->bub_end;
+                       newmsg = (struct lustre_msg *)((char *)newbuf->bub_req +
+                                                      newbuf->bub_end);
+                       msgsz = lustre_packed_msg_size(reqmsg);
+                       if (msgsz >= max_len) {
+                               int rc2;
+
+                               /* Create new batch update buffer */
+                               rc2 = batch_update_buffer_create(head, msgsz +
+                                       offsetof(struct batch_update_request,
+                                                burq_reqmsg[0]) + 1);
+                               if (rc2 != 0)
+                                       GOTO(err_up, rc = rc2);
+                               GOTO(repeat, rc);
+                       }
+
+                       memcpy(newmsg, reqmsg, msgsz);
+                       newbuf->bub_end += msgsz;
+                       newbuf->bub_req->burq_count++;
+                       head->buh_update_count++;
+               }
+
+               i = index;
+       }
+
+       list_splice_init(&obuh->buh_cb_list, &head->buh_cb_list);
+       list_for_each_entry(ouc, &head->buh_cb_list, ouc_item)
+               ouc->ouc_head = head;
+
+       head->buh_repsize = BUT_MAXREPSIZE - SPTLRPC_MAX_PAYLOAD;
+       rc = batch_send_update_req(NULL, head);
+       if (rc)
+               GOTO(err_up, rc);
+
+       batch_update_request_destroy(obuh);
+       OBD_FREE_PTR(work);
+       RETURN_EXIT;
+
+err_up:
+       batch_update_request_fini(obuh, NULL, NULL, rc);
+       if (head != NULL)
+               batch_update_request_fini(head, NULL, NULL, rc);
+
+       OBD_FREE_PTR(work);
+       RETURN_EXIT;
+}
+
 struct lu_batch *cli_batch_create(struct obd_export *exp,
                                  enum lu_batch_flags flags, __u32 max_count)
 {
index c58379a..c91cc3f 100755 (executable)
@@ -13869,6 +13869,31 @@ test_123e() {
 }
 run_test 123e "statahead with large wide striping"
 
+test_123f() {
+       local max
+       local batch_max
+       local dir=$DIR/$tdir
+
+       mkdir $dir || error "mkdir $dir failed"
+       $LFS setstripe -C 1000 $dir || error "setstripe $dir failed"
+
+       touch $dir/$tfile.{0..200} || error "touch 200 files failed"
+
+       max=$($LCTL get_param -n llite.*.statahead_max | head -n 1)
+       batch_max=$($LCTL get_param -n llite.*.statahead_batch_max | head -n 1)
+
+       $LCTL set_param llite.*.statahead_max=64
+       $LCTL set_param llite.*.statahead_batch_max=64
+
+       ls -l $dir
+       lctl get_param mdc.*.batch_stats
+       lctl get_param llite.*.statahead_*
+
+       $LCTL set_param llite.*.statahead_max=$max
+       $LCTL set_param llite.*.statahead_batch_max=$batch_max
+}
+run_test 123f "Retry mechanism with large wide striping files"
+
 test_124a() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        $LCTL get_param -n mdc.*.connect_flags | grep -q lru_resize ||