Whamcloud - gitweb
LU-14139 ptlrpc: grow reply buffer properly for batch request 45/40945/18
author Qian Yingjin <qian@ddn.com>
Fri, 11 Dec 2020 10:38:03 +0000 (18:38 +0800)
committer Oleg Drokin <green@whamcloud.com>
Sat, 22 Apr 2023 17:29:15 +0000 (17:29 +0000)
This patch adds support for growing the reply buffer of a batch
PtlRPC request.
With this support, the statahead sanity-pfl test (test_16b) will
pass for large LOV stripes with overstriping.

Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: Iaa7eb88b49d6ee068ec1fd9666a8bac2839b5041
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/40945
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Mikhail Pershin <mpershin@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/mdt/mdt_batch.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/ptlrpc/layout.c

index 3eb15a1..cfd6aa1 100644 (file)
@@ -246,7 +246,6 @@ int mdt_batch(struct tgt_session_info *tsi)
 
        reply->burp_magic = BUT_REPLY_MAGIC;
        packed_replen = sizeof(*reply);
-       info->mti_max_repsize = buh->buh_reply_size;
        info->mti_batch_env = 1;
        info->mti_pill = pill;
        tsi->tsi_batch_env = true;
@@ -323,22 +322,20 @@ int mdt_batch(struct tgt_session_info *tsi)
                        if (rc)
                                GOTO(out, rc);
 next:
-                       repmsg->lm_result = rc;
-                       mdt_thread_info_reset(info);
                        /*
-                        * TODO: Check whether overflow reply buffer.
-                        * Fix reply, shrink and/or grow reply buffers.
+                        * The reply buffer may have been reallocated to grow,
+                        * which moves @repmsg, so it must be reloaded here.
                         */
+                       repmsg = pill->rc_repmsg;
+                       repmsg->lm_result = rc;
+                       mdt_thread_info_reset(info);
+
                        replen = lustre_packed_msg_size(repmsg);
-                       info->mti_max_repsize -= replen;
                        packed_replen += replen;
                        handled_update_count++;
                }
        }
 
-       /*
-        * TODO: Grow/shrink the reply buffer.
-        */
        CDEBUG(D_INFO, "reply size %u packed replen %u\n",
               buh->buh_reply_size, packed_replen);
        if (buh->buh_reply_size > packed_replen)
index c2acfbb..c1436e9 100644 (file)
@@ -4425,7 +4425,6 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
        info->mti_env = req->rq_svc_thread->t_env;
        info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
        info->mti_big_buf = LU_BUF_NULL;
-       info->mti_max_repsize = 0;
        info->mti_batch_env = 0;
        info->mti_object = NULL;
 
index 8301cbe..13df6a2 100644 (file)
@@ -467,11 +467,6 @@ struct mdt_thread_info {
         */
        struct req_capsule          mti_sub_pill;
 
-       /*
-        * Max left reply buffer size for the batch request.
-        */
-       __u32                       mti_max_repsize;
-
        /* although we have export in req, there are cases when it is not
         * available, e.g. closing files upon export destroy */
        struct obd_export          *mti_exp;
index a239410..e74163d 100644 (file)
@@ -834,13 +834,6 @@ int mdt_fix_reply(struct mdt_thread_info *info)
                 CDEBUG(D_INFO, "Enlarge reply buffer, need extra %d bytes\n",
                        md_size - md_packed);
 
-               /* FIXME: Grow reply buffer for the batch request. */
-               if (info->mti_batch_env) {
-                       body->mbo_valid &= ~(OBD_MD_FLDIREA | OBD_MD_FLEASIZE);
-                       info->mti_big_lmm_used = 0;
-                       GOTO(check_acl, rc);
-               }
-
                 rc = req_capsule_server_grow(pill, &RMF_MDT_MD, md_size);
                 if (rc) {
                         /* we can't answer with proper LOV EA, drop flags,
@@ -874,17 +867,10 @@ int mdt_fix_reply(struct mdt_thread_info *info)
                info->mti_big_lmm_used = 0;
        }
 
-check_acl:
        if (info->mti_big_acl_used) {
                CDEBUG(D_INFO, "Enlarge reply ACL buffer to %d bytes\n",
                       acl_size);
 
-               if (info->mti_batch_env) {
-                       body->mbo_valid &= ~OBD_MD_FLACL;
-                       info->mti_big_acl_used = 0;
-                       RETURN(rc);
-               }
-
                rc = req_capsule_server_grow(pill, &RMF_ACL, acl_size);
                if (rc) {
                        body->mbo_valid &= ~OBD_MD_FLACL;
index ff6ff62..d8057b8 100644 (file)
@@ -2680,24 +2680,65 @@ int req_capsule_server_grow(struct req_capsule *pill,
                            const struct req_msg_field *field,
                            __u32 newlen)
 {
-       struct ptlrpc_reply_state *rs = pill->rc_req->rq_reply_state, *nrs;
-       char *from, *to;
-       int rc;
+       struct ptlrpc_request *req = pill->rc_req;
+       struct ptlrpc_reply_state *rs = req->rq_reply_state, *nrs;
+       char *from, *to, *sptr = NULL;
+       __u32 slen = 0, snewlen = 0;
        __u32 offset, len;
+       int rc;
 
        LASSERT(pill->rc_fmt != NULL);
        LASSERT(__req_format_is_sane(pill->rc_fmt));
        LASSERT(req_capsule_has_field(pill, field, RCL_SERVER));
        LASSERT(req_capsule_field_present(pill, field, RCL_SERVER));
 
-       len = req_capsule_get_size(pill, field, RCL_SERVER);
-       offset = __req_capsule_offset(pill, field, RCL_SERVER);
+       if (req_capsule_subreq(pill)) {
+               if (!req_capsule_has_field(&req->rq_pill, &RMF_BUT_REPLY,
+                                          RCL_SERVER))
+                       return -EINVAL;
+
+               if (!req_capsule_field_present(&req->rq_pill, &RMF_BUT_REPLY,
+                                              RCL_SERVER))
+                       return -EINVAL;
+
+               len = req_capsule_get_size(&req->rq_pill, &RMF_BUT_REPLY,
+                                          RCL_SERVER);
+               sptr = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
+               slen = req_capsule_get_size(pill, field, RCL_SERVER);
+
+               LASSERT(len >= (char *)pill->rc_repmsg - sptr +
+                              lustre_packed_msg_size(pill->rc_repmsg));
+               if (len >= (char *)pill->rc_repmsg - sptr +
+                          lustre_packed_msg_size(pill->rc_repmsg) - slen +
+                          newlen) {
+                       req_capsule_set_size(pill, field, RCL_SERVER, newlen);
+                       offset = __req_capsule_offset(pill, field, RCL_SERVER);
+                       lustre_grow_msg(pill->rc_repmsg, offset, newlen);
+                       return 0;
+               }
+
+               /*
+                * Currently just increase the reply buffer by 2 * newlen.
+                * TODO: Enlarge the reply buffer properly according to the
+                * left SUB requests in the batch PTLRPC request.
+                */
+               snewlen = newlen;
+               req_capsule_set_size(pill, field, RCL_SERVER, snewlen);
+               newlen = len + cfs_size_round(2 * snewlen);
+               req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY, RCL_SERVER,
+                                    newlen);
+               offset = __req_capsule_offset(&req->rq_pill, &RMF_BUT_REPLY,
+                                             RCL_SERVER);
+       } else {
+               len = req_capsule_get_size(pill, field, RCL_SERVER);
+               offset = __req_capsule_offset(pill, field, RCL_SERVER);
+               req_capsule_set_size(pill, field, RCL_SERVER, newlen);
+       }
 
        CDEBUG(D_INFO, "Reply packed: %d, allocated: %d, field len %d -> %d\n",
               lustre_packed_msg_size(rs->rs_msg), rs->rs_repbuf_len,
                                      len, newlen);
 
-       req_capsule_set_size(pill, field, RCL_SERVER, newlen);
        /**
         * There can be enough space in current reply buffer, make sure
         * that rs_repbuf is not a wrapper but real reply msg, otherwise
@@ -2706,21 +2747,26 @@ int req_capsule_server_grow(struct req_capsule *pill,
        if (rs->rs_msg == rs->rs_repbuf &&
            rs->rs_repbuf_len >=
            lustre_packed_msg_size(rs->rs_msg) - len + newlen) {
-               pill->rc_req->rq_replen = lustre_grow_msg(rs->rs_msg, offset,
-                                                         newlen);
+               req->rq_replen = lustre_grow_msg(rs->rs_msg, offset, newlen);
                return 0;
        }
 
        /* Re-allocate replay state */
-       pill->rc_req->rq_reply_state = NULL;
-       rc = req_capsule_server_pack(pill);
+       req->rq_reply_state = NULL;
+       rc = req_capsule_server_pack(&req->rq_pill);
        if (rc) {
                /* put old values back, the caller should decide what to do */
-               req_capsule_set_size(pill, field, RCL_SERVER, len);
+               if (req_capsule_subreq(pill)) {
+                       req_capsule_set_size(&req->rq_pill, &RMF_BUT_REPLY,
+                                            RCL_SERVER, len);
+                       req_capsule_set_size(pill, field, RCL_SERVER, slen);
+               } else {
+                       req_capsule_set_size(pill, field, RCL_SERVER, len);
+               }
                pill->rc_req->rq_reply_state = rs;
                return rc;
        }
-       nrs = pill->rc_req->rq_reply_state;
+       nrs = req->rq_reply_state;
        LASSERT(lustre_packed_msg_size(nrs->rs_msg) >
                lustre_packed_msg_size(rs->rs_msg));
 
@@ -2730,7 +2776,17 @@ int req_capsule_server_grow(struct req_capsule *pill,
        memcpy(to, from,
               (char *)rs->rs_msg + lustre_packed_msg_size(rs->rs_msg) - from);
        lustre_msg_set_buflen(nrs->rs_msg, offset, len);
-       pill->rc_req->rq_replen = lustre_grow_msg(nrs->rs_msg, offset, newlen);
+       req->rq_replen = lustre_grow_msg(nrs->rs_msg, offset, newlen);
+
+       if (req_capsule_subreq(pill)) {
+               char *ptr;
+
+               ptr = req_capsule_server_get(&req->rq_pill, &RMF_BUT_REPLY);
+               pill->rc_repmsg = (struct lustre_msg *)(ptr +
+                                 ((char *)pill->rc_repmsg - sptr));
+               offset = __req_capsule_offset(pill, field, RCL_SERVER);
+               lustre_grow_msg(pill->rc_repmsg, offset, snewlen);
+       }
 
         if (rs->rs_difficult) {
                 /* copy rs data */