From 5edb883b44ac707528ce2c0bc812d65b9ffb4a50 Mon Sep 17 00:00:00 2001 From: Qian Yingjin Date: Fri, 11 Dec 2020 18:38:03 +0800 Subject: [PATCH] LU-14139 ptlrpc: grow reply buffer properly for batch request This patch adds support for growing the reply buffer of a batch PtlRPC request. With this support, the statahead sanity-pfl test test_16b will pass for large LOV stripes with overstriping. Signed-off-by: Qian Yingjin Change-Id: Iaa7eb88b49d6ee068ec1fd9666a8bac2839b5041 Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/40945 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Oleg Drokin Reviewed-by: Mikhail Pershin Reviewed-by: Andreas Dilger --- lustre/mdt/mdt_batch.c | 15 ++++----- lustre/mdt/mdt_handler.c | 1 - lustre/mdt/mdt_internal.h | 5 --- lustre/mdt/mdt_lib.c | 14 -------- lustre/ptlrpc/layout.c | 82 +++++++++++++++++++++++++++++++++++++++-------- 5 files changed, 75 insertions(+), 42 deletions(-) diff --git a/lustre/mdt/mdt_batch.c b/lustre/mdt/mdt_batch.c index 3eb15a1..cfd6aa1 100644 --- a/lustre/mdt/mdt_batch.c +++ b/lustre/mdt/mdt_batch.c @@ -246,7 +246,6 @@ int mdt_batch(struct tgt_session_info *tsi) reply->burp_magic = BUT_REPLY_MAGIC; packed_replen = sizeof(*reply); - info->mti_max_repsize = buh->buh_reply_size; info->mti_batch_env = 1; info->mti_pill = pill; tsi->tsi_batch_env = true; @@ -323,22 +322,20 @@ int mdt_batch(struct tgt_session_info *tsi) if (rc) GOTO(out, rc); next: - repmsg->lm_result = rc; - mdt_thread_info_reset(info); /* - * TODO: Check whether overflow reply buffer. - * Fix reply, shrink and/or grow reply buffers. + * As @repmsg may be changed if the reply buffer is + * too small to grow, thus it needs to reload it here. */ + repmsg = pill->rc_repmsg; + repmsg->lm_result = rc; + mdt_thread_info_reset(info); + replen = lustre_packed_msg_size(repmsg); - info->mti_max_repsize -= replen; packed_replen += replen; handled_update_count++; } } - /* - * TODO: Grow/shrink the reply buffer. 
- */ CDEBUG(D_INFO, "reply size %u packed replen %u\n", buh->buh_reply_size, packed_replen); if (buh->buh_reply_size > packed_replen) diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index c2acfbb..c1436e9 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -4425,7 +4425,6 @@ void mdt_thread_info_init(struct ptlrpc_request *req, info->mti_env = req->rq_svc_thread->t_env; info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg); info->mti_big_buf = LU_BUF_NULL; - info->mti_max_repsize = 0; info->mti_batch_env = 0; info->mti_object = NULL; diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 8301cbe2..13df6a2 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -467,11 +467,6 @@ struct mdt_thread_info { */ struct req_capsule mti_sub_pill; - /* - * Max left reply buffer size for the batch request. - */ - __u32 mti_max_repsize; - /* although we have export in req, there are cases when it is not * available, e.g. closing files upon export destroy */ struct obd_export *mti_exp; diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index a239410..e74163d 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -834,13 +834,6 @@ int mdt_fix_reply(struct mdt_thread_info *info) CDEBUG(D_INFO, "Enlarge reply buffer, need extra %d bytes\n", md_size - md_packed); - /* FIXME: Grow reply buffer for the batch request. 
*/ - if (info->mti_batch_env) { - body->mbo_valid &= ~(OBD_MD_FLDIREA | OBD_MD_FLEASIZE); - info->mti_big_lmm_used = 0; - GOTO(check_acl, rc); - } - rc = req_capsule_server_grow(pill, &RMF_MDT_MD, md_size); if (rc) { /* we can't answer with proper LOV EA, drop flags, @@ -874,17 +867,10 @@ int mdt_fix_reply(struct mdt_thread_info *info) info->mti_big_lmm_used = 0; } -check_acl: if (info->mti_big_acl_used) { CDEBUG(D_INFO, "Enlarge reply ACL buffer to %d bytes\n", acl_size); - if (info->mti_batch_env) { - body->mbo_valid &= ~OBD_MD_FLACL; - info->mti_big_acl_used = 0; - RETURN(rc); - } - rc = req_capsule_server_grow(pill, &RMF_ACL, acl_size); if (rc) { body->mbo_valid &= ~OBD_MD_FLACL; diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index ff6ff62..d8057b8 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -2680,24 +2680,65 @@ int req_capsule_server_grow(struct req_capsule *pill, const struct req_msg_field *field, __u32 newlen) { - struct ptlrpc_reply_state *rs = pill->rc_req->rq_reply_state, *nrs; - char *from, *to; - int rc; + struct ptlrpc_request *req = pill->rc_req; + struct ptlrpc_reply_state *rs = req->rq_reply_state, *nrs; + char *from, *to, *sptr = NULL; + __u32 slen = 0, snewlen = 0; __u32 offset, len; + int rc; LASSERT(pill->rc_fmt != NULL); LASSERT(__req_format_is_sane(pill->rc_fmt)); LASSERT(req_capsule_has_field(pill, field, RCL_SERVER)); LASSERT(req_capsule_field_present(pill, field, RCL_SERVER)); - len = req_capsule_get_size(pill, field, RCL_SERVER); - offset = __req_capsule_offset(pill, field, RCL_SERVER); + if (req_capsule_subreq(pill)) { + if (!req_capsule_has_field(&req->rq_pill, &RMF_BUT_REPLY, + RCL_SERVER)) + return -EINVAL; + + if (!req_capsule_field_present(&req->rq_pill, &RMF_BUT_REPLY, + RCL_SERVER)) + return -EINVAL; + + len = req_capsule_get_size(&req->rq_pill, &RMF_BUT_REPLY, + RCL_SERVER); + sptr = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY); + slen = req_capsule_get_size(pill, field, RCL_SERVER); 
+ + LASSERT(len >= (char *)pill->rc_repmsg - sptr + + lustre_packed_msg_size(pill->rc_repmsg)); + if (len >= (char *)pill->rc_repmsg - sptr + + lustre_packed_msg_size(pill->rc_repmsg) - slen + + newlen) { + req_capsule_set_size(pill, field, RCL_SERVER, newlen); + offset = __req_capsule_offset(pill, field, RCL_SERVER); + lustre_grow_msg(pill->rc_repmsg, offset, newlen); + return 0; + } + + /* + * Currently just increase the reply buffer by 2 * newlen. + * TODO: Enlarge the reply buffer properly according to the + * left SUB requests in the batch PTLRPC request. + */ + snewlen = newlen; + req_capsule_set_size(pill, field, RCL_SERVER, snewlen); + newlen = len + cfs_size_round(2 * snewlen); + req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER, + newlen); + offset = __req_capsule_offset(&req->rq_pill, &RMF_BUT_REPLY, + RCL_SERVER); + } else { + len = req_capsule_get_size(pill, field, RCL_SERVER); + offset = __req_capsule_offset(pill, field, RCL_SERVER); + req_capsule_set_size(pill, field, RCL_SERVER, newlen); + } CDEBUG(D_INFO, "Reply packed: %d, allocated: %d, field len %d -> %d\n", lustre_packed_msg_size(rs->rs_msg), rs->rs_repbuf_len, len, newlen); - req_capsule_set_size(pill, field, RCL_SERVER, newlen); /** * There can be enough space in current reply buffer, make sure * that rs_repbuf is not a wrapper but real reply msg, otherwise @@ -2706,21 +2747,26 @@ int req_capsule_server_grow(struct req_capsule *pill, if (rs->rs_msg == rs->rs_repbuf && rs->rs_repbuf_len >= lustre_packed_msg_size(rs->rs_msg) - len + newlen) { - pill->rc_req->rq_replen = lustre_grow_msg(rs->rs_msg, offset, - newlen); + req->rq_replen = lustre_grow_msg(rs->rs_msg, offset, newlen); return 0; } /* Re-allocate replay state */ - pill->rc_req->rq_reply_state = NULL; - rc = req_capsule_server_pack(pill); + req->rq_reply_state = NULL; + rc = req_capsule_server_pack(&req->rq_pill); if (rc) { /* put old values back, the caller should decide what to do */ - req_capsule_set_size(pill, field, 
RCL_SERVER, len); + if (req_capsule_subreq(pill)) { + req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, + RCL_SERVER, len); + req_capsule_set_size(pill, field, RCL_SERVER, slen); + } else { + req_capsule_set_size(pill, field, RCL_SERVER, len); + } pill->rc_req->rq_reply_state = rs; return rc; } - nrs = pill->rc_req->rq_reply_state; + nrs = req->rq_reply_state; LASSERT(lustre_packed_msg_size(nrs->rs_msg) > lustre_packed_msg_size(rs->rs_msg)); @@ -2730,7 +2776,17 @@ int req_capsule_server_grow(struct req_capsule *pill, memcpy(to, from, (char *)rs->rs_msg + lustre_packed_msg_size(rs->rs_msg) - from); lustre_msg_set_buflen(nrs->rs_msg, offset, len); - pill->rc_req->rq_replen = lustre_grow_msg(nrs->rs_msg, offset, newlen); + req->rq_replen = lustre_grow_msg(nrs->rs_msg, offset, newlen); + + if (req_capsule_subreq(pill)) { + char *ptr; + + ptr = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY); + pill->rc_repmsg = (struct lustre_msg *)(ptr + + ((char *)pill->rc_repmsg - sptr)); + offset = __req_capsule_offset(pill, field, RCL_SERVER); + lustre_grow_msg(pill->rc_repmsg, offset, snewlen); + } if (rs->rs_difficult) { /* copy rs data */ -- 1.8.3.1