memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));
- tx->tx_rdma_md.start = tx->tx_rdma_frags;
+ tx->tx_rdma_md.start = tx->tx_frags;
tx->tx_rdma_md.user_ptr = &tx->tx_rdma_eventarg;
tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
tx->tx_rdma_md.options = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
break;
case TX_TYPE_GET_RESPONSE: /* active: I put */
- tx->tx_rdma_md.threshold = 1; /* SEND */
+ tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1; /* SEND + ACK? */
break;
}
if (iov != NULL) {
tx->tx_rdma_md.options |= PTL_MD_IOVEC;
tx->tx_rdma_md.length =
- lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+ lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
niov, iov, offset, nob);
return;
}
tx->tx_rdma_md.options |= PTL_MD_KIOV;
tx->tx_rdma_md.length =
- lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->kiov,
+ lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
niov, kiov, offset, nob);
#else
if (iov != NULL) {
tx->tx_rdma_md.options |= PTL_MD_IOVEC;
tx->tx_rdma_md.length =
- kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+ kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
niov, iov, offset, nob);
return;
}
tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
tx->tx_rdma_md.length =
- kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+ kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
niov, kiov, offset, nob);
#endif
}
ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md,
PTL_UNLINK, &mdh);
if (ptlrc != PTL_OK) {
- CERROR("PtlMDBind(%s) failed: %d\n",
- libcfs_id2str(peer->peer_id), ptlrc);
+ CERROR("PtlMDBind(%s) failed: %s(%d)\n",
+ libcfs_id2str(peer->peer_id),
+ kptllnd_errtype2str(ptlrc), ptlrc);
tx->tx_status = -EIO;
kptllnd_tx_decref(tx);
return -EIO;
spin_unlock_irqrestore(&peer->peer_lock, flags);
+ tx->tx_tposted = jiffies;
+
if (type == TX_TYPE_GET_RESPONSE)
ptlrc = PtlPut(mdh,
- PTL_NOACK_REQ,
+ tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
rx->rx_initiator,
*kptllnd_tunables.kptl_portal,
0, /* acl cookie */
0); /* offset */
if (ptlrc != PTL_OK) {
- CERROR("Ptl%s failed: %d\n",
- (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get", ptlrc);
+ CERROR("Ptl%s failed: %s(%d)\n",
+ (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get",
+ kptllnd_errtype2str(ptlrc), ptlrc);
kptllnd_peer_close(peer, -EIO);
/* Everything (including this RDMA) queued on the peer will
lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
unsigned int payload_offset = lntmsg->msg_offset;
unsigned int payload_nob = lntmsg->msg_len;
+ kptl_peer_t *peer;
kptl_tx_t *tx;
int nob;
+ int nfrag;
+ int rc;
LASSERT (payload_nob == 0 || payload_niov > 0);
LASSERT (payload_niov <= LNET_MAX_IOV);
LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
LASSERT (!in_interrupt());
+ rc = kptllnd_find_target(&peer, target);
+ if (rc != 0)
+ return rc;
+
switch (type) {
default:
LBUG();
case LNET_MSG_REPLY:
case LNET_MSG_PUT:
- /* Is the payload small enough not to need RDMA? */
+ /* Should the payload avoid RDMA? */
nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
- if (nob <= *kptllnd_tunables.kptl_max_msg_size)
+ if (payload_kiov == NULL &&
+ nob <= peer->peer_max_msg_size)
break;
tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
CERROR("Can't send %s to %s: can't allocate descriptor\n",
lnet_msgtyp2str(type),
libcfs_id2str(target));
- return -ENOMEM;
+ rc = -ENOMEM;
+ goto out;
}
kptllnd_init_rdma_md(tx, payload_niov,
tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
sizeof(kptl_rdma_msg_t));
- kptllnd_tx_launch(tx, target);
- return 0;
+
+ CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
+ libcfs_id2str(target),
+ le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
+
+ kptllnd_tx_launch(peer, tx, 0);
+ goto out;
case LNET_MSG_GET:
/* routed gets don't RDMA */
nob = lntmsg->msg_md->md_length;
nob = offsetof(kptl_msg_t,
ptlm_u.immediate.kptlim_payload[nob]);
- if (nob <= *kptllnd_tunables.kptl_max_msg_size)
+ if (nob <= peer->peer_max_msg_size)
break;
tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
if (tx == NULL) {
CERROR("Can't send GET to %s: can't allocate descriptor\n",
libcfs_id2str(target));
- return -ENOMEM;
+ rc = -ENOMEM;
+ goto out;
}
tx->tx_lnet_replymsg =
CERROR("Failed to allocate LNET reply for %s\n",
libcfs_id2str(target));
kptllnd_tx_decref(tx);
- return -ENOMEM;
+ rc = -ENOMEM;
+ goto out;
}
if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
sizeof(kptl_rdma_msg_t));
- kptllnd_tx_launch(tx, target);
- return 0;
+
+ CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
+ libcfs_id2str(target),
+ le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
+
+ kptllnd_tx_launch(peer, tx, 0);
+ goto out;
case LNET_MSG_ACK:
CDEBUG(D_NET, "LNET_MSG_ACK\n");
break;
}
+ /* I don't have to handle kiovs */
+ LASSERT (payload_nob == 0 || payload_iov != NULL);
+
tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
if (tx == NULL) {
CERROR("Can't send %s to %s: can't allocate descriptor\n",
lnet_msgtyp2str(type), libcfs_id2str(target));
- return -ENOMEM;
+ rc = -ENOMEM;
+ goto out;
}
tx->tx_lnet_msg = lntmsg;
tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;
- if (payload_kiov != NULL)
- lnet_copy_kiov2flat(*kptllnd_tunables.kptl_max_msg_size,
- tx->tx_msg->ptlm_u.immediate.kptlim_payload,
- 0,
- payload_niov, payload_kiov,
- payload_offset, payload_nob);
- else
- lnet_copy_iov2flat(*kptllnd_tunables.kptl_max_msg_size,
- tx->tx_msg->ptlm_u.immediate.kptlim_payload,
- 0,
- payload_niov, payload_iov,
- payload_offset, payload_nob);
+ if (payload_nob == 0) {
+ nfrag = 0;
+ } else {
+ tx->tx_frags->iov[0].iov_base = tx->tx_msg;
+ tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
+ ptlm_u.immediate.kptlim_payload);
+ /* NB relying on lustre not asking for PTL_MD_MAX_IOV
+ * fragments!! */
+#ifdef _USING_LUSTRE_PORTALS_
+ nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1,
+ &tx->tx_frags->iov[1],
+ payload_niov, payload_iov,
+ payload_offset, payload_nob);
+#else
+ nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
+ &tx->tx_frags->iov[1],
+ payload_niov, payload_iov,
+ payload_offset, payload_nob);
+#endif
+ }
+
nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);
- kptllnd_tx_launch(tx, target);
- return 0;
+
+ CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
+ libcfs_id2str(target),
+ lnet_msgtyp2str(lntmsg->msg_type),
+ (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ?
+ le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :
+ (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ?
+ le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
+ tx);
+
+ kptllnd_tx_launch(peer, tx, nfrag);
+
+ out:
+ kptllnd_peer_decref(peer);
+ return rc;
}
int
int id = (long)arg;
char name[16];
wait_queue_t waitlink;
+ int stamp = 0;
int peer_index = 0;
unsigned long deadline = jiffies;
int timeout;
chunk = 1;
for (i = 0; i < chunk; i++) {
- kptllnd_peer_check_bucket(peer_index);
+ kptllnd_peer_check_bucket(peer_index, stamp);
peer_index = (peer_index + 1) %
kptllnd_data.kptl_peer_hash_size;
}
deadline += p * HZ;
+ stamp++;
continue;
}