spin_unlock(&req->rq_lock);
req->rq_nr_resend++;
- /* allocate new xid to avoid reply reconstruction */
- if (!req->rq_bulk) {
- /* new xid is already allocated for bulk in
- * ptlrpc_check_set() */
- req->rq_xid = ptlrpc_next_xid();
- DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for "
- "resend on EINPROGRESS");
- }
-
/* Readjust the timeout for current conditions */
ptlrpc_at_set_req_timeout(req);
/* delay resend to give a chance to the server to get ready.
spin_lock(&req->rq_lock);
req->rq_resend = 1;
spin_unlock(&req->rq_lock);
- if (req->rq_bulk) {
- __u64 old_xid;
-
- if (!ptlrpc_unregister_bulk(req, 1))
- continue;
-
- /* ensure previous bulk fails */
- old_xid = req->rq_xid;
- req->rq_xid = ptlrpc_next_xid();
- CDEBUG(D_HA, "resend bulk "
- "old x"LPU64
- " new x"LPU64"\n",
- old_xid, req->rq_xid);
- }
+
+ if (req->rq_bulk != NULL &&
+ !ptlrpc_unregister_bulk(req, 1))
+ continue;
}
/*
* rq_wait_ctx is only touched by ptlrpcd,
req->rq_resend = 1;
req->rq_net_err = 0;
req->rq_timedout = 0;
- if (req->rq_bulk) {
- __u64 old_xid = req->rq_xid;
- /* ensure previous bulk fails */
- req->rq_xid = ptlrpc_next_xid();
- CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
- old_xid, req->rq_xid);
- }
ptlrpc_client_wake_req(req);
spin_unlock(&req->rq_lock);
}
}
/**
+ * If request has a new allocated XID (new request or EINPROGRESS resend),
+ * use this XID as matchbits of bulk, otherwise allocate a new matchbits for
+ * request to ensure previous bulk fails and avoid problems with lost replies
+ * and therefore several transfers landing into the same buffer from different
+ * sending attempts.
+ */
+void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *bd = req->rq_bulk;
+
+ LASSERT(bd != NULL);
+
+ if (!req->rq_resend || req->rq_nr_resend != 0) {
+ /* this request has a new xid, just use it as bulk matchbits */
+ req->rq_mbits = req->rq_xid;
+
+ } else { /* needs to generate a new matchbits for resend */
+ __u64 old_mbits = req->rq_mbits;
+
+ if ((bd->bd_import->imp_connect_data.ocd_connect_flags &
+ OBD_CONNECT_BULK_MBITS) != 0)
+ req->rq_mbits = ptlrpc_next_xid();
+ else /* old version transfers rq_xid to peer as matchbits */
+ req->rq_mbits = req->rq_xid = ptlrpc_next_xid();
+
+ CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
+ old_mbits, req->rq_mbits);
+ }
+
+ /* For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so
+ * that server can infer the number of bulks that were prepared,
+ * see LU-1431 */
+ req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
+ LNET_MAX_IOV) - 1;
+}
+
+/**
* Get a glimpse at what next xid value might have been.
* Returns possible next xid.
*/