- struct ptlrpc_connection *conn = desc->bd_export->exp_connection;
- int rc;
- int rc2;
- lnet_md_t md;
- __u64 xid;
- ENTRY;
-
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_PUT_NET))
- RETURN(0);
-
- /* NB no locking required until desc is on the network */
- LASSERT (!desc->bd_network_rw);
- LASSERT (desc->bd_type == BULK_PUT_SOURCE ||
- desc->bd_type == BULK_GET_SINK);
- desc->bd_success = 0;
-
- md.user_ptr = &desc->bd_cbid;
- md.eq_handle = ptlrpc_eq_h;
- md.threshold = 2; /* SENT and ACK/REPLY */
- md.options = PTLRPC_MD_OPTIONS;
- ptlrpc_fill_bulk_md(&md, desc);
-
- LASSERT (desc->bd_cbid.cbid_fn == server_bulk_callback);
- LASSERT (desc->bd_cbid.cbid_arg == desc);
-
- /* NB total length may be 0 for a read past EOF, so we send a 0
- * length bulk, since the client expects a bulk event. */
-
- rc = LNetMDBind(md, LNET_UNLINK, &desc->bd_md_h);
- if (rc != 0) {
- CERROR("LNetMDBind failed: %d\n", rc);
- LASSERT (rc == -ENOMEM);
- RETURN(-ENOMEM);
- }
-
- /* Client's bulk and reply matchbits are the same */
- xid = desc->bd_req->rq_xid;
- CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d "
- "id %s xid "LPX64"\n", desc->bd_iov_count,
- desc->bd_nob, desc->bd_portal,
- libcfs_id2str(conn->c_peer), xid);
-
- /* Network is about to get at the memory */
- desc->bd_network_rw = 1;
-
- if (desc->bd_type == BULK_PUT_SOURCE)
- rc = LNetPut (conn->c_self, desc->bd_md_h, LNET_ACK_REQ,
- conn->c_peer, desc->bd_portal, xid, 0, 0);
- else
- rc = LNetGet (conn->c_self, desc->bd_md_h,
- conn->c_peer, desc->bd_portal, xid, 0);
-
- if (rc != 0) {
- /* Can't send, so we unlink the MD bound above. The UNLINK
- * event this creates will signal completion with failure,
- * so we return SUCCESS here! */
- CERROR("Transfer(%s, %d, "LPX64") failed: %d\n",
- libcfs_id2str(conn->c_peer), desc->bd_portal, xid, rc);
- rc2 = LNetMDUnlink(desc->bd_md_h);
- LASSERT (rc2 == 0);
- }
-
- RETURN(0);
+ struct obd_export *exp = desc->bd_export;
+ struct ptlrpc_connection *conn = exp->exp_connection;
+ int rc = 0;
+ __u64 xid;
+ int posted_md;
+ int total_md;
+ lnet_md_t md;
+ ENTRY;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_PUT_NET))
+ RETURN(0);
+
+ /* NB no locking required until desc is on the network */
+ LASSERT(desc->bd_md_count == 0);
+ LASSERT(desc->bd_type == BULK_PUT_SOURCE ||
+ desc->bd_type == BULK_GET_SINK);
+
+ LASSERT(desc->bd_cbid.cbid_fn == server_bulk_callback);
+ LASSERT(desc->bd_cbid.cbid_arg == desc);
+
+ /* NB total length may be 0 for a read past EOF, so we send 0
+ * length bulks, since the client expects bulk events.
+ *
+ * The client may not need all of the bulk XIDs for the RPC. The RPC
+ * used the XID of the highest bulk XID needed, and the server masks
+ * off high bits to get bulk count for this RPC. LU-1431 */
+ xid = desc->bd_req->rq_xid & ~((__u64)desc->bd_md_max_brw - 1);
+ total_md = desc->bd_req->rq_xid - xid + 1;
+
+ desc->bd_md_count = total_md;
+ desc->bd_failure = 0;
+
+ md.user_ptr = &desc->bd_cbid;
+ md.eq_handle = ptlrpc_eq_h;
+ md.threshold = 2; /* SENT and ACK/REPLY */
+
+ for (posted_md = 0; posted_md < total_md; xid++) {
+ md.options = PTLRPC_MD_OPTIONS;
+
+ /* NB it's assumed that source and sink buffer frags are
+ * page-aligned. Otherwise we'd have to send client bulk
+ * sizes over and split server buffer accordingly */
+ ptlrpc_fill_bulk_md(&md, desc, posted_md);
+ rc = LNetMDBind(md, LNET_UNLINK, &desc->bd_mds[posted_md]);
+ if (rc != 0) {
+ CERROR("%s: LNetMDBind failed for MD %u: rc = %d\n",
+ exp->exp_obd->obd_name, posted_md, rc);
+ LASSERT(rc == -ENOMEM);
+ if (posted_md == 0) {
+ desc->bd_md_count = 0;
+ RETURN(-ENOMEM);
+ }
+ break;
+ }
+ /* Network is about to get at the memory */
+ if (desc->bd_type == BULK_PUT_SOURCE)
+ rc = LNetPut(conn->c_self, desc->bd_mds[posted_md],
+ LNET_ACK_REQ, conn->c_peer,
+ desc->bd_portal, xid, 0, 0);
+ else
+ rc = LNetGet(conn->c_self, desc->bd_mds[posted_md],
+ conn->c_peer, desc->bd_portal, xid, 0);
+
+ posted_md++;
+ if (rc != 0) {
+ CERROR("%s: failed bulk transfer with %s:%u x"LPU64": "
+ "rc = %d\n", exp->exp_obd->obd_name,
+ libcfs_id2str(conn->c_peer), desc->bd_portal,
+ xid, rc);
+ break;
+ }
+ }
+
+ if (rc != 0) {
+ /* Can't send, so we unlink the MD bound above. The UNLINK
+ * event this creates will signal completion with failure,
+ * so we return SUCCESS here! */
+ spin_lock(&desc->bd_lock);
+ desc->bd_md_count -= total_md - posted_md;
+ spin_unlock(&desc->bd_lock);
+ LASSERT(desc->bd_md_count >= 0);
+
+ mdunlink_iterate_helper(desc->bd_mds, posted_md);
+ RETURN(0);
+ }
+
+ CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d "
+ "id %s xid "LPX64"-"LPX64"\n", desc->bd_iov_count,
+ desc->bd_nob, desc->bd_portal, libcfs_id2str(conn->c_peer),
+ xid - posted_md, xid - 1);
+
+ RETURN(0);