- /* NB no locking required until desc is on the network */
- LASSERT (desc->bd_nob > 0);
- LASSERT (!desc->bd_network_rw);
- LASSERT (desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
- LASSERT (desc->bd_req != NULL);
- LASSERT (desc->bd_type == BULK_PUT_SINK ||
- desc->bd_type == BULK_GET_SOURCE);
-
- desc->bd_success = 0;
-
- peer = desc->bd_import->imp_connection->c_peer;
-
- md.user_ptr = &desc->bd_cbid;
- md.eq_handle = ptlrpc_eq_h;
- md.threshold = 1; /* PUT or GET */
- md.options = PTLRPC_MD_OPTIONS |
- ((desc->bd_type == BULK_GET_SOURCE) ?
- LNET_MD_OP_GET : LNET_MD_OP_PUT);
- ptlrpc_fill_bulk_md(&md, desc);
-
- LASSERT (desc->bd_cbid.cbid_fn == client_bulk_callback);
- LASSERT (desc->bd_cbid.cbid_arg == desc);
-
- /* XXX Registering the same xid on retried bulk makes my head
- * explode trying to understand how the original request's bulk
- * might interfere with the retried request -eeb */
- LASSERTF (!desc->bd_registered || req->rq_xid != desc->bd_last_xid,
- "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
- desc->bd_registered, req->rq_xid, desc->bd_last_xid);
- desc->bd_registered = 1;
- desc->bd_last_xid = req->rq_xid;
-
- rc = LNetMEAttach(desc->bd_portal, peer,
- req->rq_xid, 0, LNET_UNLINK, LNET_INS_AFTER, &me_h);
- if (rc != 0) {
- CERROR("LNetMEAttach failed: %d\n", rc);
- LASSERT (rc == -ENOMEM);
- RETURN (-ENOMEM);
- }
-
- /* About to let the network at it... */
- desc->bd_network_rw = 1;
- rc = LNetMDAttach(me_h, md, LNET_UNLINK, &desc->bd_md_h);
- if (rc != 0) {
- CERROR("LNetMDAttach failed: %d\n", rc);
- LASSERT (rc == -ENOMEM);
- desc->bd_network_rw = 0;
- rc2 = LNetMEUnlink (me_h);
- LASSERT (rc2 == 0);
- RETURN (-ENOMEM);
- }
-
- CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPX64", "
- "portal %u\n",
- desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
- desc->bd_iov_count, desc->bd_nob,
- req->rq_xid, desc->bd_portal);
- RETURN(0);
+ /* NB no locking required until desc is on the network */
+ LASSERT(desc->bd_nob > 0);
+ LASSERT(desc->bd_md_count == 0);
+ LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
+ LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
+ LASSERT(desc->bd_req != NULL);
+ LASSERT(desc->bd_type == BULK_PUT_SINK ||
+ desc->bd_type == BULK_GET_SOURCE);
+
+ /* cleanup the state of the bulk for it will be reused */
+ if (req->rq_resend || req->rq_send_state == LUSTRE_IMP_REPLAY)
+ desc->bd_nob_transferred = 0;
+ else
+ LASSERT(desc->bd_nob_transferred == 0);
+
+ desc->bd_failure = 0;
+
+ peer = desc->bd_import->imp_connection->c_peer;
+
+ LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
+ LASSERT(desc->bd_cbid.cbid_arg == desc);
+
+ /* An XID is only used for a single request from the client.
+ * For retried bulk transfers, a new XID will be allocated in
+ * in ptlrpc_check_set() if it needs to be resent, so it is not
+ * using the same RDMA match bits after an error.
+ *
+ * For multi-bulk RPCs, rq_xid is the last XID needed for bulks. The
+ * first bulk XID is power-of-two aligned before rq_xid. LU-1431 */
+ xid = req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1);
+ LASSERTF(!(desc->bd_registered &&
+ req->rq_send_state != LUSTRE_IMP_REPLAY) ||
+ xid != desc->bd_last_xid,
+ "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
+ desc->bd_registered, xid, desc->bd_last_xid);
+
+ total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
+ desc->bd_registered = 1;
+ desc->bd_last_xid = xid;
+ desc->bd_md_count = total_md;
+ md.user_ptr = &desc->bd_cbid;
+ md.eq_handle = ptlrpc_eq_h;
+ md.threshold = 1; /* PUT or GET */
+
+ for (posted_md = 0; posted_md < total_md; posted_md++, xid++) {
+ md.options = PTLRPC_MD_OPTIONS |
+ ((desc->bd_type == BULK_GET_SOURCE) ?
+ LNET_MD_OP_GET : LNET_MD_OP_PUT);
+ ptlrpc_fill_bulk_md(&md, desc, posted_md);
+
+ rc = LNetMEAttach(desc->bd_portal, peer, xid, 0,
+ LNET_UNLINK, LNET_INS_AFTER, &me_h);
+ if (rc != 0) {
+ CERROR("%s: LNetMEAttach failed x"LPU64"/%d: rc = %d\n",
+ desc->bd_import->imp_obd->obd_name, xid,
+ posted_md, rc);
+ break;
+ }
+
+ /* About to let the network at it... */
+ rc = LNetMDAttach(me_h, md, LNET_UNLINK,
+ &desc->bd_mds[posted_md]);
+ if (rc != 0) {
+ CERROR("%s: LNetMDAttach failed x"LPU64"/%d: rc = %d\n",
+ desc->bd_import->imp_obd->obd_name, xid,
+ posted_md, rc);
+ rc2 = LNetMEUnlink(me_h);
+ LASSERT(rc2 == 0);
+ break;
+ }
+ }
+
+ if (rc != 0) {
+ LASSERT(rc == -ENOMEM);
+ spin_lock(&desc->bd_lock);
+ desc->bd_md_count -= total_md - posted_md;
+ spin_unlock(&desc->bd_lock);
+ LASSERT(desc->bd_md_count >= 0);
+ mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw);
+ req->rq_status = -ENOMEM;
+ RETURN(-ENOMEM);
+ }
+
+ /* Set rq_xid to matchbits of the final bulk so that server can
+ * infer the number of bulks that were prepared */
+ req->rq_xid = --xid;
+ LASSERTF(desc->bd_last_xid == (req->rq_xid & PTLRPC_BULK_OPS_MASK),
+ "bd_last_xid = x"LPU64", rq_xid = x"LPU64"\n",
+ desc->bd_last_xid, req->rq_xid);
+
+ spin_lock(&desc->bd_lock);
+ /* Holler if peer manages to touch buffers before he knows the xid */
+ if (desc->bd_md_count != total_md)
+ CWARN("%s: Peer %s touched %d buffers while I registered\n",
+ desc->bd_import->imp_obd->obd_name, libcfs_id2str(peer),
+ total_md - desc->bd_md_count);
+ spin_unlock(&desc->bd_lock);
+
+ CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, "
+ "xid x"LPX64"-"LPX64", portal %u\n", desc->bd_md_count,
+ desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
+ desc->bd_iov_count, desc->bd_nob,
+ desc->bd_last_xid, req->rq_xid, desc->bd_portal);
+
+ RETURN(0);