From d099fdd6cd15d0d00d9b573da5d3bfd3e4bbcb9d Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Fri, 26 Jun 2015 17:27:37 +0800 Subject: [PATCH] LU-3534 ptlrpc: mbits is sent within ptlrpc_body ptlrpc is using rq_xid as matchbits of bulk data, which means it has to change rq_xid for bulk resend to avoid several bulk data landing into the same buffer from different resends. This patch uses one of reserved __u64 of ptlrpc_body to transfer mbits to peer, matchbits is now separated from xid. With this change, ptlrpc can keep rq_xid unchanged on resend, it only updates matchbits for bulk data. This protocol change is only applied if both sides of connection have OBD_CONNECT_BULK_MBITS, otherwise, ptlrpc still uses old approach and update xid while resending bulk. Signed-off-by: Liang Zhen Change-Id: Ide0b3f490368babeb6ac1b4ea6953f8f9aacf81a Reviewed-on: http://review.whamcloud.com/15421 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Alex Zhuravlev Reviewed-by: Oleg Drokin --- lustre/include/lustre/lustre_idl.h | 22 +++++++--- lustre/include/lustre_net.h | 8 +++- lustre/ldlm/ldlm_lib.c | 7 ++++ lustre/llite/llite_lib.c | 6 ++- lustre/lod/lod_lov.c | 6 ++- lustre/obdclass/lprocfs_status.c | 1 + lustre/obdclass/obd_mount.c | 2 +- lustre/obdclass/obd_mount_server.c | 3 +- lustre/ptlrpc/client.c | 71 +++++++++++++++++++-------------- lustre/ptlrpc/niobuf.c | 82 ++++++++++++++++++++------------------ lustre/ptlrpc/pack_generic.c | 36 ++++++++++++++++- lustre/ptlrpc/ptlrpc_internal.h | 1 + lustre/ptlrpc/wiretest.c | 40 +++++++++++++++---- lustre/utils/wirecheck.c | 11 ++++- lustre/utils/wiretest.c | 41 +++++++++++++++---- 15 files changed, 235 insertions(+), 102 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index d4fc979..ebd526e 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1206,8 +1206,11 @@ struct ptlrpc_body_v3 { __u64 pb_slv; /* VBR: pre-versions */ __u64 pb_pre_versions[PTLRPC_NUM_VERSIONS]; + __u64 pb_mbits; /**< match bits for bulk request */ /* padding for future needs */ - __u64 pb_padding[4]; + __u64 pb_padding64_0; + __u64 pb_padding64_1; + __u64 pb_padding64_2; char pb_jobid[LUSTRE_JOBID_SIZE]; }; #define ptlrpc_body ptlrpc_body_v3 @@ -1234,8 +1237,11 @@ struct ptlrpc_body_v2 { __u64 pb_slv; /* VBR: pre-versions */ __u64 pb_pre_versions[PTLRPC_NUM_VERSIONS]; + __u64 pb_mbits; /**< unused in V2 */ /* padding for future needs */ - __u64 pb_padding[4]; + __u64 pb_padding64_0; + __u64 pb_padding64_1; + __u64 pb_padding64_2; }; extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); @@ -1364,7 +1370,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); #define OBD_CONNECT_MULTIMODRPCS 0x200000000000000ULL /* support multiple modify RPCs in parallel */ #define OBD_CONNECT_DIR_STRIPE 0x400000000000000ULL /* striped DNE dir */ - +/** bulk matchbits is sent within ptlrpc_body */ +#define OBD_CONNECT_BULK_MBITS 0x2000000000000000ULL /* XXX README XXX: * Please DO NOT add flag values here before first ensuring that this same * flag value is not in use on some other branch. Please clear any such @@ -1409,7 +1416,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_FLOCK_DEAD | \ OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK | \ OBD_CONNECT_OPEN_BY_FID | \ - OBD_CONNECT_DIR_STRIPE) + OBD_CONNECT_DIR_STRIPE | \ + OBD_CONNECT_BULK_MBITS) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ @@ -1427,11 +1435,13 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_JOBSTATS | \ OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LVB_TYPE|\ OBD_CONNECT_LAYOUTLOCK | OBD_CONNECT_FID | \ - OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK) + OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK | \ + OBD_CONNECT_BULK_MBITS) #define ECHO_CONNECT_SUPPORTED (0) #define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION | OBD_CONNECT_AT | \ OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV | \ - OBD_CONNECT_MNE_SWAB | OBD_CONNECT_PINGLESS) + OBD_CONNECT_MNE_SWAB | OBD_CONNECT_PINGLESS |\ + OBD_CONNECT_BULK_MBITS) /* Features required for this version of the client to work with server */ #define CLIENT_CONNECT_MDT_REQD (OBD_CONNECT_IBITS | OBD_CONNECT_FID | \ diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index fae4c7e..8c6be2d 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -1024,7 +1024,9 @@ struct ptlrpc_request { /** Transaction number */ __u64 rq_transno; /** xid */ - __u64 rq_xid; + __u64 rq_xid; + /** bulk match bits */ + __u64 rq_mbits; /** * List item to for replay list. Not yet commited requests get linked * there. @@ -1444,7 +1446,7 @@ struct ptlrpc_bulk_desc { int bd_nob; /* # bytes covered */ int bd_nob_transferred; /* # bytes GOT/PUT */ - __u64 bd_last_xid; + __u64 bd_last_mbits; struct ptlrpc_cb_id bd_cbid; /* network callback info */ lnet_nid_t bd_sender; /* stash event::sender */ @@ -2352,6 +2354,7 @@ __u32 lustre_msg_get_timeout(struct lustre_msg *msg); __u32 lustre_msg_get_service_time(struct lustre_msg *msg); char *lustre_msg_get_jobid(struct lustre_msg *msg); __u32 lustre_msg_get_cksum(struct lustre_msg *msg); +__u64 lustre_msg_get_mbits(struct lustre_msg *msg); #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 53, 0) __u32 lustre_msg_calc_cksum(struct lustre_msg *msg, int compat18); #else @@ -2373,6 +2376,7 @@ void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout); void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time); void lustre_msg_set_jobid(struct lustre_msg *msg, char *jobid); void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum); +void lustre_msg_set_mbits(struct lustre_msg *msg, __u64 mbits); static inline void lustre_shrink_reply(struct ptlrpc_request *req, int segment, diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index cb80676..cff32cc80 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -2962,6 +2962,13 @@ int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc, } else { if (req->rq_bulk_read) rc = sptlrpc_svc_wrap_bulk(req, desc); + + if ((exp->exp_connect_data.ocd_connect_flags & + OBD_CONNECT_BULK_MBITS) != 0) + req->rq_mbits = lustre_msg_get_mbits(req->rq_reqmsg); + else /* old version, bulk matchbits is rq_xid */ + req->rq_mbits = req->rq_xid; + if (rc == 0) rc = ptlrpc_start_bulk_transfer(desc); } diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 90176b5..d45e644 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -218,7 +218,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, OBD_CONNECT_FLOCK_DEAD | OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK | OBD_CONNECT_OPEN_BY_FID | - OBD_CONNECT_DIR_STRIPE; + OBD_CONNECT_DIR_STRIPE | + OBD_CONNECT_BULK_MBITS; #ifdef HAVE_LRU_RESIZE_SUPPORT if (sbi->ll_flags & LL_SBI_LRU_RESIZE) @@ -405,7 +406,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, OBD_CONNECT_EINPROGRESS | OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE | OBD_CONNECT_LAYOUTLOCK | - OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK; + OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK | + OBD_CONNECT_BULK_MBITS; if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) { /* OBD_CONNECT_CKSUM should always be set, even if checksums are diff --git a/lustre/lod/lod_lov.c b/lustre/lod/lod_lov.c index 7054876..63b5018 100644 --- a/lustre/lod/lod_lov.c +++ b/lustre/lod/lod_lov.c @@ -259,7 +259,8 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod, OBD_CONNECT_LVB_TYPE | OBD_CONNECT_VERSION | OBD_CONNECT_PINGLESS | - OBD_CONNECT_LFSCK; + OBD_CONNECT_LFSCK | + OBD_CONNECT_BULK_MBITS; data->ocd_group = tgt_index; ltd = &lod->lod_ost_descs; @@ -274,7 +275,8 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod, OBD_CONNECT_FID | OBD_CONNECT_AT | OBD_CONNECT_FULL20 | - OBD_CONNECT_LFSCK; + OBD_CONNECT_LFSCK | + OBD_CONNECT_BULK_MBITS; spin_lock(&imp->imp_lock); imp->imp_server_timeout = 1; spin_unlock(&imp->imp_lock); diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 3b878f6..4a51b73 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -652,6 +652,7 @@ static const char *obd_connect_names[] = { "unlink_close", "multi_mod_rpcs", "dir_stripe", + "bulk_mbits", "unknown", NULL }; diff --git a/lustre/obdclass/obd_mount.c b/lustre/obdclass/obd_mount.c index 4f247c3..32f78e5 100644 --- a/lustre/obdclass/obd_mount.c +++ b/lustre/obdclass/obd_mount.c @@ -455,7 +455,7 @@ int lustre_start_mgc(struct super_block *sb) /* We connect to the MGS at setup, and don't disconnect until cleanup */ data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_AT | OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV | - OBD_CONNECT_LVB_TYPE; + OBD_CONNECT_LVB_TYPE | OBD_CONNECT_BULK_MBITS; #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0) data->ocd_connect_flags |= OBD_CONNECT_MNE_SWAB; diff --git a/lustre/obdclass/obd_mount_server.c b/lustre/obdclass/obd_mount_server.c index 6fb292f..b8ee012 100644 --- a/lustre/obdclass/obd_mount_server.c +++ b/lustre/obdclass/obd_mount_server.c @@ -539,7 +539,8 @@ static int lustre_lwp_connect(struct obd_device *lwp) data->ocd_connect_flags |= OBD_CONNECT_MDS_MDS | OBD_CONNECT_FID | OBD_CONNECT_AT | OBD_CONNECT_LRU_RESIZE | OBD_CONNECT_FULL20 | OBD_CONNECT_LVB_TYPE | - OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LFSCK; + OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LFSCK | + OBD_CONNECT_BULK_MBITS; OBD_ALLOC_PTR(uuid); if (uuid == NULL) GOTO(out, rc = -ENOMEM); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 18de641..a72e53d 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -1343,15 +1343,6 @@ static int after_reply(struct ptlrpc_request *req) spin_unlock(&req->rq_lock); req->rq_nr_resend++; - /* allocate new xid to avoid reply reconstruction */ - if (!req->rq_bulk) { - /* new xid is already allocated for bulk in - * ptlrpc_check_set() */ - req->rq_xid = ptlrpc_next_xid(); - DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for " - "resend on EINPROGRESS"); - } - /* Readjust the timeout for current conditions */ ptlrpc_at_set_req_timeout(req); /* delay resend to give a chance to the server to get ready. @@ -1813,20 +1804,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); - if (req->rq_bulk) { - __u64 old_xid; - - if (!ptlrpc_unregister_bulk(req, 1)) - continue; - - /* ensure previous bulk fails */ - old_xid = req->rq_xid; - req->rq_xid = ptlrpc_next_xid(); - CDEBUG(D_HA, "resend bulk " - "old x"LPU64 - " new x"LPU64"\n", - old_xid, req->rq_xid); - } + + if (req->rq_bulk != NULL && + !ptlrpc_unregister_bulk(req, 1)) + continue; } /* * rq_wait_ctx is only touched by ptlrpcd, @@ -2691,14 +2672,7 @@ void ptlrpc_resend_req(struct ptlrpc_request *req) req->rq_resend = 1; req->rq_net_err = 0; req->rq_timedout = 0; - if (req->rq_bulk) { - __u64 old_xid = req->rq_xid; - /* ensure previous bulk fails */ - req->rq_xid = ptlrpc_next_xid(); - CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n", - old_xid, req->rq_xid); - } ptlrpc_client_wake_req(req); spin_unlock(&req->rq_lock); } @@ -3100,6 +3074,43 @@ __u64 ptlrpc_next_xid(void) } /** + * If request has a new allocated XID (new request or EINPROGRESS resend), + * use this XID as matchbits of bulk, otherwise allocate a new matchbits for + * request to ensure previous bulk fails and avoid problems with lost replies + * and therefore several transfers landing into the same buffer from different + * sending attempts. + */ +void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req) +{ + struct ptlrpc_bulk_desc *bd = req->rq_bulk; + + LASSERT(bd != NULL); + + if (!req->rq_resend || req->rq_nr_resend != 0) { + /* this request has a new xid, just use it as bulk matchbits */ + req->rq_mbits = req->rq_xid; + + } else { /* needs to generate a new matchbits for resend */ + __u64 old_mbits = req->rq_mbits; + + if ((bd->bd_import->imp_connect_data.ocd_connect_flags & + OBD_CONNECT_BULK_MBITS) != 0) + req->rq_mbits = ptlrpc_next_xid(); + else /* old version transfers rq_xid to peer as matchbits */ + req->rq_mbits = req->rq_xid = ptlrpc_next_xid(); + + CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n", + old_mbits, req->rq_mbits); + } + + /* For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so + * that server can infer the number of bulks that were prepared, + * see LU-1431 */ + req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) / + LNET_MAX_IOV) - 1; +} + +/** * Get a glimpse at what next xid value might have been. * Returns possible next xid. */ diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 21c1301..c279ab5 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -154,7 +154,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc) struct obd_export *exp = desc->bd_export; struct ptlrpc_connection *conn = exp->exp_connection; int rc = 0; - __u64 xid; + __u64 mbits; int posted_md; int total_md; lnet_md_t md; @@ -173,11 +173,11 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc) /* NB total length may be 0 for a read past EOF, so we send 0 * length bulks, since the client expects bulk events. * - * The client may not need all of the bulk XIDs for the RPC. The RPC - * used the XID of the highest bulk XID needed, and the server masks + * The client may not need all of the bulk mbits for the RPC. The RPC + * used the mbits of the highest bulk mbits needed, and the server masks * off high bits to get bulk count for this RPC. LU-1431 */ - xid = desc->bd_req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1); - total_md = desc->bd_req->rq_xid - xid + 1; + mbits = desc->bd_req->rq_mbits & ~((__u64)desc->bd_md_max_brw - 1); + total_md = desc->bd_req->rq_mbits - mbits + 1; desc->bd_md_count = total_md; desc->bd_failure = 0; @@ -186,7 +186,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc) md.eq_handle = ptlrpc_eq_h; md.threshold = 2; /* SENT and ACK/REPLY */ - for (posted_md = 0; posted_md < total_md; xid++) { + for (posted_md = 0; posted_md < total_md; mbits++) { md.options = PTLRPC_MD_OPTIONS; /* NB it's assumed that source and sink buffer frags are @@ -217,17 +217,17 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc) if (ptlrpc_is_bulk_put_source(desc->bd_type)) rc = LNetPut(conn->c_self, desc->bd_mds[posted_md], LNET_ACK_REQ, conn->c_peer, - desc->bd_portal, xid, 0, 0); + desc->bd_portal, mbits, 0, 0); else rc = LNetGet(conn->c_self, desc->bd_mds[posted_md], - conn->c_peer, desc->bd_portal, xid, 0); + conn->c_peer, desc->bd_portal, mbits, 0); posted_md++; if (rc != 0) { CERROR("%s: failed bulk transfer with %s:%u x"LPU64": " "rc = %d\n", exp->exp_obd->obd_name, libcfs_id2str(conn->c_peer), desc->bd_portal, - xid, rc); + mbits, rc); break; } } @@ -246,9 +246,9 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc) } CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d " - "id %s xid "LPX64"-"LPX64"\n", desc->bd_iov_count, + "id %s mbits "LPX64"-"LPX64"\n", desc->bd_iov_count, desc->bd_nob, desc->bd_portal, libcfs_id2str(conn->c_peer), - xid - posted_md, xid - 1); + mbits - posted_md, mbits - 1); RETURN(0); } @@ -306,7 +306,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req) int rc2; int posted_md; int total_md; - __u64 xid; + __u64 mbits; lnet_handle_me_t me_h; lnet_md_t md; ENTRY; @@ -335,39 +335,37 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req) LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback); LASSERT(desc->bd_cbid.cbid_arg == desc); - /* An XID is only used for a single request from the client. - * For retried bulk transfers, a new XID will be allocated in - * in ptlrpc_check_set() if it needs to be resent, so it is not - * using the same RDMA match bits after an error. - * - * For multi-bulk RPCs, rq_xid is the last XID needed for bulks. The - * first bulk XID is power-of-two aligned before rq_xid. LU-1431 */ - xid = req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1); + total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV; + /* rq_mbits is matchbits of the final bulk */ + mbits = req->rq_mbits - total_md + 1; + + LASSERTF(mbits == (req->rq_mbits & PTLRPC_BULK_OPS_MASK), + "first mbits = x"LPU64", last mbits = x"LPU64"\n", + mbits, req->rq_mbits); LASSERTF(!(desc->bd_registered && req->rq_send_state != LUSTRE_IMP_REPLAY) || - xid != desc->bd_last_xid, - "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n", - desc->bd_registered, xid, desc->bd_last_xid); + mbits != desc->bd_last_mbits, + "registered: %d rq_mbits: "LPU64" bd_last_mbits: "LPU64"\n", + desc->bd_registered, mbits, desc->bd_last_mbits); - total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV; desc->bd_registered = 1; - desc->bd_last_xid = xid; + desc->bd_last_mbits = mbits; desc->bd_md_count = total_md; md.user_ptr = &desc->bd_cbid; md.eq_handle = ptlrpc_eq_h; md.threshold = 1; /* PUT or GET */ - for (posted_md = 0; posted_md < total_md; posted_md++, xid++) { + for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) { md.options = PTLRPC_MD_OPTIONS | (ptlrpc_is_bulk_op_get(desc->bd_type) ? LNET_MD_OP_GET : LNET_MD_OP_PUT); ptlrpc_fill_bulk_md(&md, desc, posted_md); - rc = LNetMEAttach(desc->bd_portal, peer, xid, 0, + rc = LNetMEAttach(desc->bd_portal, peer, mbits, 0, LNET_UNLINK, LNET_INS_AFTER, &me_h); if (rc != 0) { CERROR("%s: LNetMEAttach failed x"LPU64"/%d: rc = %d\n", - desc->bd_import->imp_obd->obd_name, xid, + desc->bd_import->imp_obd->obd_name, mbits, posted_md, rc); break; } @@ -377,7 +375,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req) &desc->bd_mds[posted_md]); if (rc != 0) { CERROR("%s: LNetMDAttach failed x"LPU64"/%d: rc = %d\n", - desc->bd_import->imp_obd->obd_name, xid, + desc->bd_import->imp_obd->obd_name, mbits, posted_md, rc); rc2 = LNetMEUnlink(me_h); LASSERT(rc2 == 0); @@ -396,15 +394,8 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req) RETURN(-ENOMEM); } - /* Set rq_xid to matchbits of the final bulk so that server can - * infer the number of bulks that were prepared */ - req->rq_xid = --xid; - LASSERTF(desc->bd_last_xid == (req->rq_xid & PTLRPC_BULK_OPS_MASK), - "bd_last_xid = x"LPU64", rq_xid = x"LPU64"\n", - desc->bd_last_xid, req->rq_xid); - spin_lock(&desc->bd_lock); - /* Holler if peer manages to touch buffers before he knows the xid */ + /* Holler if peer manages to touch buffers before he knows the mbits */ if (desc->bd_md_count != total_md) CWARN("%s: Peer %s touched %d buffers while I registered\n", desc->bd_import->imp_obd->obd_name, libcfs_id2str(peer), @@ -412,10 +403,10 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req) spin_unlock(&desc->bd_lock); CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, " - "xid x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count, + "mbits x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count, ptlrpc_is_bulk_op_get(desc->bd_type) ? "get-source" : "put-sink", desc->bd_iov_count, desc->bd_nob, - desc->bd_last_xid, req->rq_xid, desc->bd_portal); + desc->bd_last_mbits, req->rq_mbits, desc->bd_portal); RETURN(0); } @@ -711,6 +702,19 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) lustre_msghdr_set_flags(request->rq_reqmsg, imp->imp_msghdr_flags); + if (request->rq_nr_resend != 0) { + /* resend for EINPROGRESS, allocate new xid to avoid reply + * reconstruction */ + request->rq_xid = ptlrpc_next_xid(); + DEBUG_REQ(D_RPCTRACE, request, "Allocating new xid for " + "resend on EINPROGRESS"); + } + + if (request->rq_bulk != NULL) { + ptlrpc_set_bulk_mbits(request); + lustre_msg_set_mbits(request->rq_reqmsg, request->rq_mbits); + } + /** For enabled AT all request should have AT_SUPPORT in the * FULL import state when OBD_CONNECT_AT is set */ LASSERT(AT_OFF || imp->imp_state != LUSTRE_IMP_FULL || diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 76751a8..ccc15d5 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -1275,6 +1275,23 @@ __u32 lustre_msg_get_cksum(struct lustre_msg *msg) } } +__u64 lustre_msg_get_mbits(struct lustre_msg *msg) +{ + switch (msg->lm_magic) { + case LUSTRE_MSG_MAGIC_V2: { + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (pb == NULL) { + CERROR("invalid msg %p: no ptlrpc body!\n", msg); + return 0; + } + return pb->pb_mbits; + } + default: + CERROR("incorrect message magic: %08x\n", msg->lm_magic); + return 0; + } +} + #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 53, 0) /* * In 1.6 and 1.8 the checksum was computed only on struct ptlrpc_body as @@ -1529,6 +1546,20 @@ void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum) } } +void lustre_msg_set_mbits(struct lustre_msg *msg, __u64 mbits) +{ + switch (msg->lm_magic) { + case LUSTRE_MSG_MAGIC_V2: { + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + + LASSERTF(pb != NULL, "invalid msg %p: no ptlrpc body!\n", msg); + pb->pb_mbits = mbits; + return; + } + default: + LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic); + } +} void ptlrpc_request_set_replen(struct ptlrpc_request *req) { @@ -1621,9 +1652,12 @@ void lustre_swab_ptlrpc_body(struct ptlrpc_body *b) __swab64s (&b->pb_pre_versions[1]); __swab64s (&b->pb_pre_versions[2]); __swab64s (&b->pb_pre_versions[3]); + __swab64s(&b->pb_mbits); CLASSERT(offsetof(typeof(*b), pb_padding0) != 0); CLASSERT(offsetof(typeof(*b), pb_padding1) != 0); - CLASSERT(offsetof(typeof(*b), pb_padding) != 0); + CLASSERT(offsetof(typeof(*b), pb_padding64_0) != 0); + CLASSERT(offsetof(typeof(*b), pb_padding64_1) != 0); + CLASSERT(offsetof(typeof(*b), pb_padding64_2) != 0); /* While we need to maintain compatibility between * clients and servers without ptlrpc_body_v2 (< 2.3) * do not swab any fields beyond pb_jobid, as we are diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h index 37d52a2..54a522b 100644 --- a/lustre/ptlrpc/ptlrpc_internal.h +++ b/lustre/ptlrpc/ptlrpc_internal.h @@ -88,6 +88,7 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, int ptlrpc_expired_set(void *data); int ptlrpc_set_next_timeout(struct ptlrpc_request_set *); void ptlrpc_resend_req(struct ptlrpc_request *request); +void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req); /* events.c */ int ptlrpc_init_portals(void); diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 6ee0dc4..49b97cc 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -752,10 +752,22 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ptlrpc_body_v3, pb_pre_versions)); LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions) == 32, "found %lld\n", (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions)); - LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding) == 120, "found %lld\n", - (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding)); - LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding) == 32, "found %lld\n", - (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_mbits) == 120, "found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body_v3, pb_mbits)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_0) == 128, "found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_0)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_1) == 136, "found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_1)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_2) == 144, "found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_2)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2)); CLASSERT(LUSTRE_JOBID_SIZE == 32); LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_jobid) == 152, "found %lld\n", (long long)(int)offsetof(struct ptlrpc_body_v3, pb_jobid)); @@ -837,10 +849,22 @@ void lustre_assert_wire_constants(void) (int)offsetof(struct ptlrpc_body_v3, pb_pre_versions), (int)offsetof(struct ptlrpc_body_v2, pb_pre_versions)); LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_pre_versions), "%d != %d\n", (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_pre_versions)); - LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding) == (int)offsetof(struct ptlrpc_body_v2, pb_padding), "%d != %d\n", - (int)offsetof(struct ptlrpc_body_v3, pb_padding), (int)offsetof(struct ptlrpc_body_v2, pb_padding)); - LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding), "%d != %d\n", - (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_mbits) == (int)offsetof(struct ptlrpc_body_v2, pb_mbits), "%d != %d\n", + (int)offsetof(struct ptlrpc_body_v3, pb_mbits), (int)offsetof(struct ptlrpc_body_v2, pb_mbits)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_mbits), "%d != %d\n", + (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_mbits)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_0) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_0), "%d != %d\n", + (int)offsetof(struct ptlrpc_body_v3, pb_padding64_0), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_0)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_0), "%d != %d\n", + (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_0)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_1) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_1), "%d != %d\n", + (int)offsetof(struct ptlrpc_body_v3, pb_padding64_1), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_1)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_1), "%d != %d\n", + (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_1)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_2) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_2), "%d != %d\n", + (int)offsetof(struct ptlrpc_body_v3, pb_padding64_2), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_2)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_2), "%d != %d\n", + (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_2)); LASSERTF(MSG_PTLRPC_BODY_OFF == 0, "found %lld\n", (long long)MSG_PTLRPC_BODY_OFF); LASSERTF(REQ_REC_OFF == 1, "found %lld\n", diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 243693a..46cbd53 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -355,7 +355,10 @@ check_ptlrpc_body(void) CHECK_MEMBER(ptlrpc_body, pb_slv); CHECK_CVALUE(PTLRPC_NUM_VERSIONS); CHECK_MEMBER(ptlrpc_body, pb_pre_versions); - CHECK_MEMBER(ptlrpc_body, pb_padding); + CHECK_MEMBER(ptlrpc_body, pb_mbits); + CHECK_MEMBER(ptlrpc_body, pb_padding64_0); + CHECK_MEMBER(ptlrpc_body, pb_padding64_1); + CHECK_MEMBER(ptlrpc_body, pb_padding64_2); CHECK_CVALUE(LUSTRE_JOBID_SIZE); CHECK_MEMBER(ptlrpc_body, pb_jobid); @@ -378,7 +381,10 @@ check_ptlrpc_body(void) CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_limit); CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_slv); CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_pre_versions); - CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_padding); + CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_mbits); + CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_padding64_0); + CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_padding64_1); + CHECK_MEMBER_SAME(ptlrpc_body_v3, ptlrpc_body_v2, pb_padding64_2); CHECK_VALUE(MSG_PTLRPC_BODY_OFF); CHECK_VALUE(REQ_REC_OFF); @@ -521,6 +527,7 @@ check_obd_connect_data(void) CHECK_DEFINE_64X(OBD_CONNECT_UNLINK_CLOSE); CHECK_DEFINE_64X(OBD_CONNECT_MULTIMODRPCS); CHECK_DEFINE_64X(OBD_CONNECT_DIR_STRIPE); + CHECK_DEFINE_64X(OBD_CONNECT_BULK_MBITS); CHECK_VALUE_X(OBD_CKSUM_CRC32); CHECK_VALUE_X(OBD_CKSUM_ADLER); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 2d347c7..435dfc6 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -760,10 +760,22 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ptlrpc_body_v3, pb_pre_versions)); LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions) == 32, "found %lld\n", (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions)); - LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding) == 120, "found %lld\n", - (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding)); - LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding) == 32, "found %lld\n", - (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_mbits) == 120, "found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body_v3, pb_mbits)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_0) == 128, "found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_0)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_1) == 136, "found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_1)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_2) == 144, "found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body_v3, pb_padding64_2)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2)); CLASSERT(LUSTRE_JOBID_SIZE == 32); LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_jobid) == 152, "found %lld\n", (long long)(int)offsetof(struct ptlrpc_body_v3, pb_jobid)); @@ -845,10 +857,23 @@ void lustre_assert_wire_constants(void) (int)offsetof(struct ptlrpc_body_v3, pb_pre_versions), (int)offsetof(struct ptlrpc_body_v2, pb_pre_versions)); LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_pre_versions), "%d != %d\n", (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_pre_versions), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_pre_versions)); - LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding) == (int)offsetof(struct ptlrpc_body_v2, pb_padding), "%d != %d\n", - (int)offsetof(struct ptlrpc_body_v3, pb_padding), (int)offsetof(struct ptlrpc_body_v2, pb_padding)); - LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding), "%d != %d\n", - (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_mbits) == (int)offsetof(struct ptlrpc_body_v2, pb_mbits), "%d != %d\n", + (int)offsetof(struct ptlrpc_body_v3, pb_mbits), (int)offsetof(struct ptlrpc_body_v2, pb_mbits)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_mbits), "%d != %d\n", + (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_mbits), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_mbits)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_0) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_0), "%d != %d\n", + (int)offsetof(struct ptlrpc_body_v3, pb_padding64_0), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_0)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_0), "%d != %d\n", + (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_0), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_0)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_1) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_1), "%d != %d\n", + (int)offsetof(struct ptlrpc_body_v3, pb_padding64_1), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_1)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_1), "%d != %d\n", + (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_1), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_1)); + LASSERTF((int)offsetof(struct ptlrpc_body_v3, pb_padding64_2) == (int)offsetof(struct ptlrpc_body_v2, pb_padding64_2), "%d != %d\n", + (int)offsetof(struct ptlrpc_body_v3, pb_padding64_2), (int)offsetof(struct ptlrpc_body_v2, pb_padding64_2)); + LASSERTF((int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2) == (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_2), "%d != %d\n", + (int)sizeof(((struct ptlrpc_body_v3 *)0)->pb_padding64_2), (int)sizeof(((struct ptlrpc_body_v2 *)0)->pb_padding64_2)); + LASSERTF(MSG_PTLRPC_BODY_OFF == 0, "found %lld\n", (long long)MSG_PTLRPC_BODY_OFF); LASSERTF(REQ_REC_OFF == 1, "found %lld\n", -- 1.8.3.1