Whamcloud - gitweb
LU-3534 ptlrpc: mbits is sent within ptlrpc_body 21/15421/6
author Liang Zhen <liang.zhen@intel.com>
Fri, 26 Jun 2015 09:27:37 +0000 (17:27 +0800)
committer Oleg Drokin <oleg.drokin@intel.com>
Mon, 29 Jun 2015 22:12:02 +0000 (22:12 +0000)
ptlrpc uses rq_xid as the matchbits for bulk data, which means it
has to change rq_xid for a bulk resend to prevent bulk data from
different resends from landing in the same buffer.

This patch uses one of the reserved __u64 fields of ptlrpc_body to
transfer the mbits to the peer, so the matchbits are now separate from
the xid. With this change, ptlrpc can keep rq_xid unchanged on resend;
it only updates the matchbits for bulk data.

This protocol change is only applied if both sides of the connection
have OBD_CONNECT_BULK_MBITS; otherwise, ptlrpc still uses the old
approach and updates the xid while resending bulk.

Signed-off-by: Liang Zhen <liang.zhen@intel.com>
Change-Id: Ide0b3f490368babeb6ac1b4ea6953f8f9aacf81a
Reviewed-on: http://review.whamcloud.com/15421
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
15 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_net.h
lustre/ldlm/ldlm_lib.c
lustre/llite/llite_lib.c
lustre/lod/lod_lov.c
lustre/obdclass/lprocfs_status.c
lustre/obdclass/obd_mount.c
lustre/obdclass/obd_mount_server.c
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/wiretest.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index d4fc979..ebd526e 100644 (file)
@@ -1206,8 +1206,11 @@ struct ptlrpc_body_v3 {
        __u64 pb_slv;
        /* VBR: pre-versions */
        __u64 pb_pre_versions[PTLRPC_NUM_VERSIONS];
+       __u64 pb_mbits; /**< match bits for bulk request */
        /* padding for future needs */
-       __u64 pb_padding[4];
+       __u64 pb_padding64_0;
+       __u64 pb_padding64_1;
+       __u64 pb_padding64_2;
        char  pb_jobid[LUSTRE_JOBID_SIZE];
 };
 #define ptlrpc_body     ptlrpc_body_v3
@@ -1234,8 +1237,11 @@ struct ptlrpc_body_v2 {
         __u64 pb_slv;
         /* VBR: pre-versions */
         __u64 pb_pre_versions[PTLRPC_NUM_VERSIONS];
+       __u64 pb_mbits; /**< unused in V2 */
         /* padding for future needs */
-        __u64 pb_padding[4];
+       __u64 pb_padding64_0;
+       __u64 pb_padding64_1;
+       __u64 pb_padding64_2;
 };
 
 extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
@@ -1364,7 +1370,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
 #define OBD_CONNECT_MULTIMODRPCS 0x200000000000000ULL /* support multiple modify
                                                         RPCs in parallel */
 #define OBD_CONNECT_DIR_STRIPE  0x400000000000000ULL /* striped DNE dir */
-
+/** bulk matchbits is sent within ptlrpc_body */
+#define OBD_CONNECT_BULK_MBITS  0x2000000000000000ULL
 /* XXX README XXX:
  * Please DO NOT add flag values here before first ensuring that this same
  * flag value is not in use on some other branch.  Please clear any such
@@ -1409,7 +1416,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                OBD_CONNECT_FLOCK_DEAD | \
                                OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK | \
                                OBD_CONNECT_OPEN_BY_FID | \
-                               OBD_CONNECT_DIR_STRIPE)
+                               OBD_CONNECT_DIR_STRIPE | \
+                               OBD_CONNECT_BULK_MBITS)
 
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
@@ -1427,11 +1435,13 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                OBD_CONNECT_JOBSTATS | \
                                OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LVB_TYPE|\
                                OBD_CONNECT_LAYOUTLOCK | OBD_CONNECT_FID | \
-                               OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK)
+                               OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK | \
+                               OBD_CONNECT_BULK_MBITS)
 #define ECHO_CONNECT_SUPPORTED (0)
 #define MGS_CONNECT_SUPPORTED  (OBD_CONNECT_VERSION | OBD_CONNECT_AT | \
                                OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV | \
-                               OBD_CONNECT_MNE_SWAB | OBD_CONNECT_PINGLESS)
+                               OBD_CONNECT_MNE_SWAB | OBD_CONNECT_PINGLESS |\
+                               OBD_CONNECT_BULK_MBITS)
 
 /* Features required for this version of the client to work with server */
 #define CLIENT_CONNECT_MDT_REQD (OBD_CONNECT_IBITS | OBD_CONNECT_FID | \
index fae4c7e..8c6be2d 100644 (file)
@@ -1024,7 +1024,9 @@ struct ptlrpc_request {
         /** Transaction number */
         __u64 rq_transno;
         /** xid */
-        __u64 rq_xid;
+        __u64                           rq_xid;
+       /** bulk match bits */
+       __u64                            rq_mbits;
        /**
         * List item to for replay list. Not yet commited requests get linked
         * there.
@@ -1444,7 +1446,7 @@ struct ptlrpc_bulk_desc {
        int                    bd_nob;          /* # bytes covered */
        int                    bd_nob_transferred; /* # bytes GOT/PUT */
 
-       __u64                  bd_last_xid;
+       __u64                  bd_last_mbits;
 
        struct ptlrpc_cb_id    bd_cbid;         /* network callback info */
        lnet_nid_t             bd_sender;       /* stash event::sender */
@@ -2352,6 +2354,7 @@ __u32 lustre_msg_get_timeout(struct lustre_msg *msg);
 __u32 lustre_msg_get_service_time(struct lustre_msg *msg);
 char *lustre_msg_get_jobid(struct lustre_msg *msg);
 __u32 lustre_msg_get_cksum(struct lustre_msg *msg);
+__u64 lustre_msg_get_mbits(struct lustre_msg *msg);
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 53, 0)
 __u32 lustre_msg_calc_cksum(struct lustre_msg *msg, int compat18);
 #else
@@ -2373,6 +2376,7 @@ void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout);
 void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time);
 void lustre_msg_set_jobid(struct lustre_msg *msg, char *jobid);
 void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum);
+void lustre_msg_set_mbits(struct lustre_msg *msg, __u64 mbits);
 
 static inline void
 lustre_shrink_reply(struct ptlrpc_request *req, int segment,
index cb80676..cff32cc 100644 (file)
@@ -2962,6 +2962,13 @@ int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
        } else {
                if (req->rq_bulk_read)
                        rc = sptlrpc_svc_wrap_bulk(req, desc);
+
+               if ((exp->exp_connect_data.ocd_connect_flags &
+                    OBD_CONNECT_BULK_MBITS) != 0)
+                       req->rq_mbits = lustre_msg_get_mbits(req->rq_reqmsg);
+               else /* old version, bulk matchbits is rq_xid */
+                       req->rq_mbits = req->rq_xid;
+
                if (rc == 0)
                        rc = ptlrpc_start_bulk_transfer(desc);
        }
index 90176b5..d45e644 100644 (file)
@@ -218,7 +218,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                  OBD_CONNECT_FLOCK_DEAD |
                                  OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK |
                                  OBD_CONNECT_OPEN_BY_FID |
-                                 OBD_CONNECT_DIR_STRIPE;
+                                 OBD_CONNECT_DIR_STRIPE |
+                                 OBD_CONNECT_BULK_MBITS;
 
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
@@ -405,7 +406,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                  OBD_CONNECT_EINPROGRESS |
                                  OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE |
                                  OBD_CONNECT_LAYOUTLOCK |
-                                 OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK;
+                                 OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK |
+                                 OBD_CONNECT_BULK_MBITS;
 
         if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) {
                 /* OBD_CONNECT_CKSUM should always be set, even if checksums are
index 7054876..63b5018 100644 (file)
@@ -259,7 +259,8 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
                                           OBD_CONNECT_LVB_TYPE |
                                           OBD_CONNECT_VERSION |
                                           OBD_CONNECT_PINGLESS |
-                                          OBD_CONNECT_LFSCK;
+                                          OBD_CONNECT_LFSCK |
+                                          OBD_CONNECT_BULK_MBITS;
 
                data->ocd_group = tgt_index;
                ltd = &lod->lod_ost_descs;
@@ -274,7 +275,8 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
                                           OBD_CONNECT_FID |
                                           OBD_CONNECT_AT |
                                           OBD_CONNECT_FULL20 |
-                                          OBD_CONNECT_LFSCK;
+                                          OBD_CONNECT_LFSCK |
+                                          OBD_CONNECT_BULK_MBITS;
                spin_lock(&imp->imp_lock);
                imp->imp_server_timeout = 1;
                spin_unlock(&imp->imp_lock);
index 3b878f6..4a51b73 100644 (file)
@@ -652,6 +652,7 @@ static const char *obd_connect_names[] = {
        "unlink_close",
        "multi_mod_rpcs",
        "dir_stripe",
+       "bulk_mbits",
        "unknown",
        NULL
 };
index 4f247c3..32f78e5 100644 (file)
@@ -455,7 +455,7 @@ int lustre_start_mgc(struct super_block *sb)
        /* We connect to the MGS at setup, and don't disconnect until cleanup */
        data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_AT |
                                  OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV |
-                                 OBD_CONNECT_LVB_TYPE;
+                                 OBD_CONNECT_LVB_TYPE | OBD_CONNECT_BULK_MBITS;
 
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
        data->ocd_connect_flags |= OBD_CONNECT_MNE_SWAB;
index 6fb292f..b8ee012 100644 (file)
@@ -539,7 +539,8 @@ static int lustre_lwp_connect(struct obd_device *lwp)
        data->ocd_connect_flags |= OBD_CONNECT_MDS_MDS | OBD_CONNECT_FID |
                OBD_CONNECT_AT | OBD_CONNECT_LRU_RESIZE |
                OBD_CONNECT_FULL20 | OBD_CONNECT_LVB_TYPE |
-               OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LFSCK;
+               OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LFSCK |
+               OBD_CONNECT_BULK_MBITS;
        OBD_ALLOC_PTR(uuid);
        if (uuid == NULL)
                GOTO(out, rc = -ENOMEM);
index 18de641..a72e53d 100644 (file)
@@ -1343,15 +1343,6 @@ static int after_reply(struct ptlrpc_request *req)
                spin_unlock(&req->rq_lock);
                req->rq_nr_resend++;
 
-               /* allocate new xid to avoid reply reconstruction */
-               if (!req->rq_bulk) {
-                       /* new xid is already allocated for bulk in
-                        * ptlrpc_check_set() */
-                       req->rq_xid = ptlrpc_next_xid();
-                       DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for "
-                                 "resend on EINPROGRESS");
-               }
-
                /* Readjust the timeout for current conditions */
                ptlrpc_at_set_req_timeout(req);
                /* delay resend to give a chance to the server to get ready.
@@ -1813,20 +1804,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                        spin_lock(&req->rq_lock);
                                        req->rq_resend = 1;
                                        spin_unlock(&req->rq_lock);
-                                        if (req->rq_bulk) {
-                                                __u64 old_xid;
-
-                                                if (!ptlrpc_unregister_bulk(req, 1))
-                                                        continue;
-
-                                                /* ensure previous bulk fails */
-                                                old_xid = req->rq_xid;
-                                                req->rq_xid = ptlrpc_next_xid();
-                                                CDEBUG(D_HA, "resend bulk "
-                                                       "old x"LPU64
-                                                       " new x"LPU64"\n",
-                                                       old_xid, req->rq_xid);
-                                        }
+
+                                       if (req->rq_bulk != NULL &&
+                                           !ptlrpc_unregister_bulk(req, 1))
+                                               continue;
                                 }
                                 /*
                                  * rq_wait_ctx is only touched by ptlrpcd,
@@ -2691,14 +2672,7 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
         req->rq_resend = 1;
         req->rq_net_err = 0;
         req->rq_timedout = 0;
-        if (req->rq_bulk) {
-                __u64 old_xid = req->rq_xid;
 
-                /* ensure previous bulk fails */
-                req->rq_xid = ptlrpc_next_xid();
-                CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
-                       old_xid, req->rq_xid);
-        }
         ptlrpc_client_wake_req(req);
        spin_unlock(&req->rq_lock);
 }
@@ -3100,6 +3074,43 @@ __u64 ptlrpc_next_xid(void)
 }
 
 /**
+ * If request has a new allocated XID (new request or EINPROGRESS resend),
+ * use this XID as matchbits of bulk, otherwise allocate a new matchbits for
+ * request to ensure previous bulk fails and avoid problems with lost replies
+ * and therefore several transfers landing into the same buffer from different
+ * sending attempts.
+ */
+void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
+{
+       struct ptlrpc_bulk_desc *bd = req->rq_bulk;
+
+       LASSERT(bd != NULL);
+
+       if (!req->rq_resend || req->rq_nr_resend != 0) {
+               /* this request has a new xid, just use it as bulk matchbits */
+               req->rq_mbits = req->rq_xid;
+
+       } else { /* needs to generate a new matchbits for resend */
+               __u64   old_mbits = req->rq_mbits;
+
+               if ((bd->bd_import->imp_connect_data.ocd_connect_flags &
+                   OBD_CONNECT_BULK_MBITS) != 0)
+                       req->rq_mbits = ptlrpc_next_xid();
+               else /* old version transfers rq_xid to peer as matchbits */
+                       req->rq_mbits = req->rq_xid = ptlrpc_next_xid();
+
+               CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
+                      old_mbits, req->rq_mbits);
+       }
+
+       /* For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so
+        * that server can infer the number of bulks that were prepared,
+        * see LU-1431 */
+       req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
+                         LNET_MAX_IOV) - 1;
+}
+
+/**
  * Get a glimpse at what next xid value might have been.
  * Returns possible next xid.
  */
index 21c1301..c279ab5 100644 (file)
@@ -154,7 +154,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
        struct obd_export        *exp = desc->bd_export;
        struct ptlrpc_connection *conn = exp->exp_connection;
        int                       rc = 0;
-       __u64                     xid;
+       __u64                     mbits;
        int                       posted_md;
        int                       total_md;
        lnet_md_t                 md;
@@ -173,11 +173,11 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
        /* NB total length may be 0 for a read past EOF, so we send 0
         * length bulks, since the client expects bulk events.
         *
-        * The client may not need all of the bulk XIDs for the RPC.  The RPC
-        * used the XID of the highest bulk XID needed, and the server masks
+        * The client may not need all of the bulk mbits for the RPC. The RPC
+        * used the mbits of the highest bulk mbits needed, and the server masks
         * off high bits to get bulk count for this RPC. LU-1431 */
-       xid = desc->bd_req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1);
-       total_md = desc->bd_req->rq_xid - xid + 1;
+       mbits = desc->bd_req->rq_mbits & ~((__u64)desc->bd_md_max_brw - 1);
+       total_md = desc->bd_req->rq_mbits - mbits + 1;
 
        desc->bd_md_count = total_md;
        desc->bd_failure = 0;
@@ -186,7 +186,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
        md.eq_handle = ptlrpc_eq_h;
        md.threshold = 2; /* SENT and ACK/REPLY */
 
-       for (posted_md = 0; posted_md < total_md; xid++) {
+       for (posted_md = 0; posted_md < total_md; mbits++) {
                md.options = PTLRPC_MD_OPTIONS;
 
                /* NB it's assumed that source and sink buffer frags are
@@ -217,17 +217,17 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
                if (ptlrpc_is_bulk_put_source(desc->bd_type))
                        rc = LNetPut(conn->c_self, desc->bd_mds[posted_md],
                                     LNET_ACK_REQ, conn->c_peer,
-                                    desc->bd_portal, xid, 0, 0);
+                                    desc->bd_portal, mbits, 0, 0);
                else
                        rc = LNetGet(conn->c_self, desc->bd_mds[posted_md],
-                                    conn->c_peer, desc->bd_portal, xid, 0);
+                                    conn->c_peer, desc->bd_portal, mbits, 0);
 
                posted_md++;
                if (rc != 0) {
                        CERROR("%s: failed bulk transfer with %s:%u x"LPU64": "
                               "rc = %d\n", exp->exp_obd->obd_name,
                               libcfs_id2str(conn->c_peer), desc->bd_portal,
-                              xid, rc);
+                              mbits, rc);
                        break;
                }
        }
@@ -246,9 +246,9 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
        }
 
        CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d "
-              "id %s xid "LPX64"-"LPX64"\n", desc->bd_iov_count,
+              "id %s mbits "LPX64"-"LPX64"\n", desc->bd_iov_count,
               desc->bd_nob, desc->bd_portal, libcfs_id2str(conn->c_peer),
-              xid - posted_md, xid - 1);
+              mbits - posted_md, mbits - 1);
 
        RETURN(0);
 }
@@ -306,7 +306,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        int rc2;
        int posted_md;
        int total_md;
-       __u64 xid;
+       __u64 mbits;
        lnet_handle_me_t  me_h;
        lnet_md_t         md;
        ENTRY;
@@ -335,39 +335,37 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
        LASSERT(desc->bd_cbid.cbid_arg == desc);
 
-       /* An XID is only used for a single request from the client.
-        * For retried bulk transfers, a new XID will be allocated in
-        * in ptlrpc_check_set() if it needs to be resent, so it is not
-        * using the same RDMA match bits after an error.
-        *
-        * For multi-bulk RPCs, rq_xid is the last XID needed for bulks. The
-        * first bulk XID is power-of-two aligned before rq_xid. LU-1431 */
-       xid = req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1);
+       total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
+       /* rq_mbits is matchbits of the final bulk */
+       mbits = req->rq_mbits - total_md + 1;
+
+       LASSERTF(mbits == (req->rq_mbits & PTLRPC_BULK_OPS_MASK),
+                "first mbits = x"LPU64", last mbits = x"LPU64"\n",
+                mbits, req->rq_mbits);
        LASSERTF(!(desc->bd_registered &&
                   req->rq_send_state != LUSTRE_IMP_REPLAY) ||
-                xid != desc->bd_last_xid,
-                "registered: %d  rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
-                desc->bd_registered, xid, desc->bd_last_xid);
+                mbits != desc->bd_last_mbits,
+                "registered: %d  rq_mbits: "LPU64" bd_last_mbits: "LPU64"\n",
+                desc->bd_registered, mbits, desc->bd_last_mbits);
 
-       total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
        desc->bd_registered = 1;
-       desc->bd_last_xid = xid;
+       desc->bd_last_mbits = mbits;
        desc->bd_md_count = total_md;
        md.user_ptr = &desc->bd_cbid;
        md.eq_handle = ptlrpc_eq_h;
        md.threshold = 1;                       /* PUT or GET */
 
-       for (posted_md = 0; posted_md < total_md; posted_md++, xid++) {
+       for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) {
                md.options = PTLRPC_MD_OPTIONS |
                             (ptlrpc_is_bulk_op_get(desc->bd_type) ?
                              LNET_MD_OP_GET : LNET_MD_OP_PUT);
                ptlrpc_fill_bulk_md(&md, desc, posted_md);
 
-               rc = LNetMEAttach(desc->bd_portal, peer, xid, 0,
+               rc = LNetMEAttach(desc->bd_portal, peer, mbits, 0,
                                  LNET_UNLINK, LNET_INS_AFTER, &me_h);
                if (rc != 0) {
                        CERROR("%s: LNetMEAttach failed x"LPU64"/%d: rc = %d\n",
-                              desc->bd_import->imp_obd->obd_name, xid,
+                              desc->bd_import->imp_obd->obd_name, mbits,
                               posted_md, rc);
                        break;
                }
@@ -377,7 +375,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
                                  &desc->bd_mds[posted_md]);
                if (rc != 0) {
                        CERROR("%s: LNetMDAttach failed x"LPU64"/%d: rc = %d\n",
-                              desc->bd_import->imp_obd->obd_name, xid,
+                              desc->bd_import->imp_obd->obd_name, mbits,
                               posted_md, rc);
                        rc2 = LNetMEUnlink(me_h);
                        LASSERT(rc2 == 0);
@@ -396,15 +394,8 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
                RETURN(-ENOMEM);
        }
 
-       /* Set rq_xid to matchbits of the final bulk so that server can
-        * infer the number of bulks that were prepared */
-       req->rq_xid = --xid;
-       LASSERTF(desc->bd_last_xid == (req->rq_xid & PTLRPC_BULK_OPS_MASK),
-                "bd_last_xid = x"LPU64", rq_xid = x"LPU64"\n",
-                desc->bd_last_xid, req->rq_xid);
-
        spin_lock(&desc->bd_lock);
-       /* Holler if peer manages to touch buffers before he knows the xid */
+       /* Holler if peer manages to touch buffers before he knows the mbits */
        if (desc->bd_md_count != total_md)
                CWARN("%s: Peer %s touched %d buffers while I registered\n",
                      desc->bd_import->imp_obd->obd_name, libcfs_id2str(peer),
@@ -412,10 +403,10 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        spin_unlock(&desc->bd_lock);
 
        CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, "
-              "xid x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count,
+              "mbits x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count,
               ptlrpc_is_bulk_op_get(desc->bd_type) ? "get-source" : "put-sink",
               desc->bd_iov_count, desc->bd_nob,
-              desc->bd_last_xid, req->rq_xid, desc->bd_portal);
+              desc->bd_last_mbits, req->rq_mbits, desc->bd_portal);
 
        RETURN(0);
 }
@@ -711,6 +702,19 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
        lustre_msghdr_set_flags(request->rq_reqmsg,
                                imp->imp_msghdr_flags);
 
+       if (request->rq_nr_resend != 0) {
+               /* resend for EINPROGRESS, allocate new xid to avoid reply
+                * reconstruction */
+               request->rq_xid = ptlrpc_next_xid();
+               DEBUG_REQ(D_RPCTRACE, request, "Allocating new xid for "
+                         "resend on EINPROGRESS");
+       }
+
+       if (request->rq_bulk != NULL) {
+               ptlrpc_set_bulk_mbits(request);
+               lustre_msg_set_mbits(request->rq_reqmsg, request->rq_mbits);
+       }
+
        /** For enabled AT all request should have AT_SUPPORT in the
         * FULL import state when OBD_CONNECT_AT is set */
        LASSERT(AT_OFF || imp->imp_state != LUSTRE_IMP_FULL ||
index 76751a8..ccc15d5 100644 (file)
@@ -1275,6 +1275,23 @@ __u32 lustre_msg_get_cksum(struct lustre_msg *msg)
        }
 }
 
+__u64 lustre_msg_get_mbits(struct lustre_msg *msg)
+{
+       switch (msg->lm_magic) {
+       case LUSTRE_MSG_MAGIC_V2: {
+               struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+               if (pb == NULL) {
+                       CERROR("invalid msg %p: no ptlrpc body!\n", msg);
+                       return 0;
+               }
+               return pb->pb_mbits;
+       }
+       default:
+               CERROR("incorrect message magic: %08x\n", msg->lm_magic);
+               return 0;
+       }
+}
+
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 53, 0)
 /*
  * In 1.6 and 1.8 the checksum was computed only on struct ptlrpc_body as
@@ -1529,6 +1546,20 @@ void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum)
        }
 }
 
+void lustre_msg_set_mbits(struct lustre_msg *msg, __u64 mbits)
+{
+       switch (msg->lm_magic) {
+       case LUSTRE_MSG_MAGIC_V2: {
+               struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
+               LASSERTF(pb != NULL, "invalid msg %p: no ptlrpc body!\n", msg);
+               pb->pb_mbits = mbits;
+               return;
+       }
+       default:
+               LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+       }
+}
 
 void ptlrpc_request_set_replen(struct ptlrpc_request *req)
 {
@@ -1621,9 +1652,12 @@ void lustre_swab_ptlrpc_body(struct ptlrpc_body *b)
         __swab64s (&b->pb_pre_versions[1]);
         __swab64s (&b->pb_pre_versions[2]);
         __swab64s (&b->pb_pre_versions[3]);
+       __swab64s(&b->pb_mbits);
        CLASSERT(offsetof(typeof(*b), pb_padding0) != 0);
        CLASSERT(offsetof(typeof(*b), pb_padding1) != 0);
-       CLASSERT(offsetof(typeof(*b), pb_padding) != 0);
+       CLASSERT(offsetof(typeof(*b), pb_padding64_0) != 0);
+       CLASSERT(offsetof(typeof(*b), pb_padding64_1) != 0);
+       CLASSERT(offsetof(typeof(*b), pb_padding64_2) != 0);
        /* While we need to maintain compatibility between
         * clients and servers without ptlrpc_body_v2 (< 2.3)
         * do not swab any fields beyond pb_jobid, as we are
index 37d52a2..54a522b 100644 (file)
@@ -88,6 +88,7 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
 int ptlrpc_expired_set(void *data);
 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
 void ptlrpc_resend_req(struct ptlrpc_request *request);
+void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req);
 
 /* events.c */
 int ptlrpc_init_portals(void);
index 6ee0dc4..49b97cc 100644 (file)
@@ -752,10 +752,22 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct ptlrpc_body_v3, pb_pre_versions));
        LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions) == 32, "found %lld\n",
                 (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions));
-       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding) == 120, "found %lld\n",
-                (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding));
-       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding) == 32, "found %lld\n",
-                (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_mbits) == 120, "found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body_v3, pb_mbits));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_0) == 128, "found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_0));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_1) == 136, "found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_1));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_2) == 144, "found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_2));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2));
        CLASSERT(LUSTRE_JOBID_SIZE == 32);
        LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_jobid) == 152, "found %lld\n",
                 (long long)(int)offsetof(struct ptlrpc_body_v3, pb_jobid));
@@ -837,10 +849,22 @@ void lustre_assert_wire_constants(void)
                 (int)offsetof(struct ptlrpc_body_v3, pb_pre_versions), (int)offsetof(struct ptlrpc_body_v2, pb_pre_versions));
        LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_pre_versions), "%d != %d\n",
                 (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_pre_versions));
-       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding) == (int)offsetof(struct ptlrpc_body_v2, pb_padding), "%d != %d\n",
-                (int)offsetof(struct ptlrpc_body_v3, pb_padding), (int)offsetof(struct ptlrpc_body_v2, pb_padding));
-       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding), "%d != %d\n",
-                (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_mbits) == (int)offsetof(struct ptlrpc_body_v2, pb_mbits), "%d != %d\n",
+                (int)offsetof(struct ptlrpc_body_v3, pb_mbits), (int)offsetof(struct ptlrpc_body_v2, pb_mbits));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_mbits), "%d != %d\n",
+                (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_mbits));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_0) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_0), "%d != %d\n",
+                (int)offsetof(struct ptlrpc_body_v3, pb_padding64_0), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_0));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_0), "%d != %d\n",
+                (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_0));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_1) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_1), "%d != %d\n",
+                (int)offsetof(struct ptlrpc_body_v3, pb_padding64_1), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_1));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_1), "%d != %d\n",
+                (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_1));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_2) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_2), "%d != %d\n",
+                (int)offsetof(struct ptlrpc_body_v3, pb_padding64_2), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_2));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_2), "%d != %d\n",
+                (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_2));
        LASSERTF(MSG_PTLRPC_BODY_OFF == 0, "found %lld\n",
                 (long long)MSG_PTLRPC_BODY_OFF);
        LASSERTF(REQ_REC_OFF == 1, "found %lld\n",
index 243693a..46cbd53 100644 (file)
@@ -355,7 +355,10 @@ check_ptlrpc_body(void)
        CHECK_MEMBER(ptlrpc_body, pb_slv);
        CHECK_CVALUE(PTLRPC_NUM_VERSIONS);
        CHECK_MEMBER(ptlrpc_body, pb_pre_versions);
-       CHECK_MEMBER(ptlrpc_body, pb_padding);
+       CHECK_MEMBER(ptlrpc_body, pb_mbits);
+       CHECK_MEMBER(ptlrpc_body, pb_padding64_0);
+       CHECK_MEMBER(ptlrpc_body, pb_padding64_1);
+       CHECK_MEMBER(ptlrpc_body, pb_padding64_2);
        CHECK_CVALUE(LUSTRE_JOBID_SIZE);
        CHECK_MEMBER(ptlrpc_body, pb_jobid);
 
@@ -378,7 +381,10 @@ check_ptlrpc_body(void)
        CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_limit);
        CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_slv);
        CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_pre_versions);
-       CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_padding);
+       CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_mbits);
+       CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_padding64_0);
+       CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_padding64_1);
+       CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_padding64_2);
 
        CHECK_VALUE(MSG_PTLRPC_BODY_OFF);
        CHECK_VALUE(REQ_REC_OFF);
@@ -521,6 +527,7 @@ check_obd_connect_data(void)
        CHECK_DEFINE_64X(OBD_CONNECT_UNLINK_CLOSE);
        CHECK_DEFINE_64X(OBD_CONNECT_MULTIMODRPCS);
        CHECK_DEFINE_64X(OBD_CONNECT_DIR_STRIPE);
+       CHECK_DEFINE_64X(OBD_CONNECT_BULK_MBITS);
 
        CHECK_VALUE_X(OBD_CKSUM_CRC32);
        CHECK_VALUE_X(OBD_CKSUM_ADLER);
index 2d347c7..435dfc6 100644 (file)
@@ -760,10 +760,22 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct ptlrpc_body_v3, pb_pre_versions));
        LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions) == 32, "found %lld\n",
                 (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions));
-       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding) == 120, "found %lld\n",
-                (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding));
-       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding) == 32, "found %lld\n",
-                (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_mbits) == 120, "found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body_v3, pb_mbits));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_0) == 128, "found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_0));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_1) == 136, "found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_1));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_2) == 144, "found %lld\n",
+                (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_2));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2) == 8, "found %lld\n",
+                (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2));
        CLASSERT(LUSTRE_JOBID_SIZE == 32);
        LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_jobid) == 152, "found %lld\n",
                 (long long)(int)offsetof(struct ptlrpc_body_v3, pb_jobid));
@@ -845,10 +857,23 @@ void lustre_assert_wire_constants(void)
                 (int)offsetof(struct ptlrpc_body_v3, pb_pre_versions), (int)offsetof(struct ptlrpc_body_v2, pb_pre_versions));
        LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_pre_versions), "%d != %d\n",
                 (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_pre_versions));
-       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding) == (int)offsetof(struct ptlrpc_body_v2, pb_padding), "%d != %d\n",
-                (int)offsetof(struct ptlrpc_body_v3, pb_padding), (int)offsetof(struct ptlrpc_body_v2, pb_padding));
-       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding), "%d != %d\n",
-                (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_mbits) == (int)offsetof(struct ptlrpc_body_v2, pb_mbits), "%d != %d\n",
+                (int)offsetof(struct ptlrpc_body_v3, pb_mbits), (int)offsetof(struct ptlrpc_body_v2, pb_mbits));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_mbits), "%d != %d\n",
+                (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_mbits));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_0) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_0), "%d != %d\n",
+                (int)offsetof(struct ptlrpc_body_v3, pb_padding64_0), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_0));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_0), "%d != %d\n",
+                (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_0));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_1) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_1), "%d != %d\n",
+                (int)offsetof(struct ptlrpc_body_v3, pb_padding64_1), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_1));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_1), "%d != %d\n",
+                (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_1));
+       LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_2) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_2), "%d != %d\n",
+                (int)offsetof(struct ptlrpc_body_v3, pb_padding64_2), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_2));
+       LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_2), "%d != %d\n",
+                (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_2));
+
        LASSERTF(MSG_PTLRPC_BODY_OFF == 0, "found %lld\n",
                 (long long)MSG_PTLRPC_BODY_OFF);
        LASSERTF(REQ_REC_OFF == 1, "found %lld\n",