From 668f48f87bec3999892ce1daad24b6dba9ae362b Mon Sep 17 00:00:00 2001 From: Qian Yingjin Date: Wed, 16 Feb 2022 22:42:14 -0500 Subject: [PATCH] LU-15550 ptlrpc: retry mechanism for overflowed batched RPCs Before send the batched RPC, the client has no idea about the actual reply buffer size. The reply buffer size prepared by a client may be smalller than the reply buffer buffer size in need. We already have the patch to grow the reply buffer properly in most cases. However, when the reply buffer size is growing larger than BUT_MAXREPSIZE (1000 * 1024), the server will return -EOVERFLOW error code. At this time, the server only executed the partial sub requests in the batched RPC. The overflowed sub requests are not handled. In this patch, it adds a retry mechanism for overflowed batched RPC. When found that the reply buffer overflowed, the client will rebuild the batched RPC for the unhandled sub requests, and use work queue mechanism to resend the new batched RPC to the server to re-execute then again. Add the test case sanity test_123f to verify it for large LOV stripes with overstriping. Signed-off-by: Qian Yingjin Change-Id: If84fad32f2026bd34ffb47b3e163f84a9d950dbb Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/46540 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Mikhail Pershin Reviewed-by: Oleg Drokin --- lustre/ptlrpc/batch.c | 145 +++++++++++++++++++++++++++++++++++++++++++++++-- lustre/tests/sanity.sh | 25 +++++++++ 2 files changed, 164 insertions(+), 6 deletions(-) diff --git a/lustre/ptlrpc/batch.c b/lustre/ptlrpc/batch.c index 6381cc0..59050ac 100644 --- a/lustre/ptlrpc/batch.c +++ b/lustre/ptlrpc/batch.c @@ -45,6 +45,17 @@ #define OUT_UPDATE_REPLY_SIZE 4096 static inline struct lustre_msg * +batch_update_reqmsg_next(struct batch_update_request *bur, + struct lustre_msg *reqmsg) +{ + if (reqmsg) + return (struct lustre_msg *)((char *)reqmsg + + lustre_packed_msg_size(reqmsg)); + else + return &bur->burq_reqmsg[0]; +} + +static inline struct lustre_msg * batch_update_repmsg_next(struct batch_update_reply *bur, struct lustre_msg *repmsg) { @@ -67,6 +78,12 @@ struct batch_update_args { struct batch_update_head *ba_head; }; +struct batch_work_resend { + struct work_struct bwr_work; + struct batch_update_head *bwr_head; + int bwr_index; +}; + /** * Prepare inline update request * @@ -328,6 +345,8 @@ static void batch_update_request_destroy(struct batch_update_head *head) OBD_FREE_PTR(head); } +static void cli_batch_resend_work(struct work_struct *data); + static int batch_update_request_fini(struct batch_update_head *head, struct ptlrpc_request *req, struct batch_update_reply *reply, int rc) @@ -345,8 +364,6 @@ static int batch_update_request_fini(struct batch_update_head *head, list_for_each_entry_safe(ouc, next, &head->buh_cb_list, ouc_item) { int rc1 = 0; - list_del_init(&ouc->ouc_item); - /* * The peer may only have handled some requests (indicated by * @count) in the packaged OUT PRC, we can only get results @@ -369,8 +386,24 @@ static int batch_update_request_fini(struct batch_update_head *head, * TODO: resend the unfinished sub request when the * return code is -EOVERFLOW. */ + if (rc == -EOVERFLOW) { + struct batch_work_resend *work; + + OBD_ALLOC_GFP(work, sizeof(*work), GFP_ATOMIC); + if (work == NULL) { + rc1 = -ENOMEM; + } else { + INIT_WORK(&work->bwr_work, + cli_batch_resend_work); + work->bwr_head = head; + work->bwr_index = index; + schedule_work(&work->bwr_work); + RETURN(0); + } + } } + list_del_init(&ouc->ouc_item); if (ouc->ouc_interpret != NULL) ouc->ouc_interpret(req, repmsg, ouc, rc1); @@ -420,6 +453,7 @@ static int batch_send_update_req(const struct lu_env *env, struct ptlrpc_request *req = NULL; struct batch_update_args *aa; struct lu_batch *bh; + __u32 flags = 0; int rc; ENTRY; @@ -429,6 +463,9 @@ static int batch_send_update_req(const struct lu_env *env, obd = class_exp2obd(head->buh_exp); bh = head->buh_batch; + if (bh) + flags = bh->lbt_flags; + rc = batch_prep_update_req(head, &req); if (rc) { rc = batch_update_request_fini(head, NULL, NULL, rc); @@ -443,16 +480,16 @@ static int batch_send_update_req(const struct lu_env *env, * Only acquire modification RPC slot for the batched RPC * which contains metadata updates. */ - if (!(bh->lbt_flags & BATCH_FL_RDONLY)) + if (!(flags & BATCH_FL_RDONLY)) ptlrpc_get_mod_rpc_slot(req); - if (bh->lbt_flags & BATCH_FL_SYNC) { + if (flags & BATCH_FL_SYNC) { rc = ptlrpc_queue_wait(req); } else { - if ((bh->lbt_flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) == + if ((flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) == BATCH_FL_RDONLY) { ptlrpcd_add_req(req); - } else if (bh->lbt_flags & BATCH_FL_RQSET) { + } else if (flags & BATCH_FL_RQSET) { ptlrpc_set_add_req(bh->lbt_rqset, req); ptlrpc_check_set(env, bh->lbt_rqset); } else { @@ -533,6 +570,102 @@ out: RETURN(rc); } +static void cli_batch_resend_work(struct work_struct *data) +{ + struct batch_work_resend *work = container_of(data, + struct batch_work_resend, bwr_work); + struct batch_update_head *obuh = work->bwr_head; + struct object_update_callback *ouc; + struct batch_update_head *head; + struct batch_update_buffer *buf; + struct batch_update_buffer *tmp; + int index = work->bwr_index; + int rc = 0; + int i = 0; + + ENTRY; + + head = batch_update_request_create(obuh->buh_exp, NULL); + if (head == NULL) + GOTO(err_up, rc = -ENOMEM); + + list_for_each_entry_safe(buf, tmp, &obuh->buh_buf_list, bub_item) { + struct batch_update_request *bur = buf->bub_req; + struct batch_update_buffer *newbuf; + struct lustre_msg *reqmsg = NULL; + size_t max_len; + int j; + + if (i + bur->burq_count < index) { + i += bur->burq_count; + continue; + } + + /* reused the allocated buffer */ + if (i >= index) { + list_move_tail(&buf->bub_item, &head->buh_buf_list); + head->buh_update_count += buf->bub_req->burq_count; + head->buh_buf_count++; + continue; + } + + for (j = 0; j < bur->burq_count; j++) { + struct lustre_msg *newmsg; + __u32 msgsz; + + reqmsg = batch_update_reqmsg_next(bur, reqmsg); + if (i + j < index) + continue; +repeat: + newbuf = current_batch_update_buffer(head); + LASSERT(newbuf != NULL); + max_len = newbuf->bub_size - newbuf->bub_end; + newmsg = (struct lustre_msg *)((char *)newbuf->bub_req + + newbuf->bub_end); + msgsz = lustre_packed_msg_size(reqmsg); + if (msgsz >= max_len) { + int rc2; + + /* Create new batch update buffer */ + rc2 = batch_update_buffer_create(head, msgsz + + offsetof(struct batch_update_request, + burq_reqmsg[0]) + 1); + if (rc2 != 0) + GOTO(err_up, rc = rc2); + GOTO(repeat, rc); + } + + memcpy(newmsg, reqmsg, msgsz); + newbuf->bub_end += msgsz; + newbuf->bub_req->burq_count++; + head->buh_update_count++; + } + + i = index; + } + + list_splice_init(&obuh->buh_cb_list, &head->buh_cb_list); + list_for_each_entry(ouc, &head->buh_cb_list, ouc_item) + ouc->ouc_head = head; + + head->buh_repsize = BUT_MAXREPSIZE - SPTLRPC_MAX_PAYLOAD; + rc = batch_send_update_req(NULL, head); + if (rc) + GOTO(err_up, rc); + + batch_update_request_destroy(obuh); + OBD_FREE_PTR(work); + RETURN_EXIT; + +err_up: + batch_update_request_fini(obuh, NULL, NULL, rc); + if (head != NULL) + batch_update_request_fini(head, NULL, NULL, rc); + + OBD_FREE_PTR(work); + RETURN_EXIT; +} + struct lu_batch *cli_batch_create(struct obd_export *exp, enum lu_batch_flags flags, __u32 max_count) { diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index c58379a..c91cc3f 100755 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -13869,6 +13869,31 @@ test_123e() { } run_test 123e "statahead with large wide striping" +test_123f() { + local max + local batch_max + local dir=$DIR/$tdir + + mkdir $dir || error "mkdir $dir failed" + $LFS setstripe -C 1000 $dir || error "setstripe $dir failed" + + touch $dir/$tfile.{0..200} || error "touch 200 files failed" + + max=$($LCTL get_param -n llite.*.statahead_max | head -n 1) + batch_max=$($LCTL get_param -n llite.*.statahead_batch_max | head -n 1) + + $LCTL set_param llite.*.statahead_max=64 + $LCTL set_param llite.*.statahead_batch_max=64 + + ls -l $dir + lctl get_param mdc.*.batch_stats + lctl get_param llite.*.statahead_* + + $LCTL set_param llite.*.statahead_max=$max + $LCTL set_param llite.*.statahead_batch_max=$batch_max +} +run_test 123f "Retry mechanism with large wide striping files" + test_124a() { [ $PARALLEL == "yes" ] && skip "skip parallel run" $LCTL get_param -n mdc.*.connect_flags | grep -q lru_resize || -- 1.8.3.1