*/
-#include "gmnal.h"
+#include "gmlnd.h"
-ptl_err_t
-gmnal_cb_recv(lib_nal_t *libnal, void *private,
- lib_msg_t *libmsg,
- unsigned int niov, struct iovec *iov,
- size_t offset, size_t mlen, size_t rlen)
+int
+gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+ int delayed, unsigned int niov,
+ struct iovec *iov, lnet_kiov_t *kiov,
+ unsigned int offset, unsigned int mlen, unsigned int rlen)
{
+ gmnal_ni_t *gmni = ni->ni_data;
gmnal_rx_t *rx = (gmnal_rx_t*)private;
- gmnal_msg_t *msg = rx->rx_msg;
- size_t nobleft = mlen;
- int rxnob;
- char *buffer;
- size_t nob;
-
- CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], libmsg[%p], "
- "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
- libnal, private, libmsg, niov, iov, offset, mlen, rlen);
+ gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
+ int npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
+ int payload_offset = offsetof(gmnal_msg_t,
+ gmm_u.immediate.gmim_payload[0]);
+ int nob = payload_offset + mlen;
LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+ LASSERT (iov == NULL || kiov == NULL);
- buffer = &msg->gmm_u.immediate.gmim_payload[0];
- rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
-
- if (rx->rx_recv_nob < rxnob) {
- CERROR("Short message from nid "LPD64": got %d, need %d\n",
- msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
- return PTL_FAIL;
+ if (rx->rx_recv_nob < nob) {
+ CERROR("Short message from nid %s: got %d, need %d\n",
+ libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob);
+ gmnal_post_rx(gmni, rx);
+ return -EIO;
}
-
- while (nobleft > 0) {
- LASSERT (niov > 0);
- if (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- } else {
- nob = MIN (iov->iov_len - offset, nobleft);
-
- gm_bcopy(buffer, iov->iov_base + offset, nob);
-
- buffer += nob;
- nobleft -= nob;
- offset = 0;
- }
- niov--;
- iov++;
- }
-
- lib_finalize(libnal, private, libmsg, PTL_OK);
- return PTL_OK;
+ if (kiov != NULL)
+ lnet_copy_kiov2kiov(niov, kiov, offset,
+ npages, rx->rx_buf.nb_kiov, payload_offset,
+ mlen);
+ else
+ lnet_copy_kiov2iov(niov, iov, offset,
+ npages, rx->rx_buf.nb_kiov, payload_offset,
+ mlen);
+
+ lnet_finalize(ni, lntmsg, 0);
+ gmnal_post_rx(gmni, rx);
+ return 0;
}
-ptl_err_t
-gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
- lib_msg_t *libmsg,
- unsigned int nkiov, ptl_kiov_t *kiov,
- size_t offset, size_t mlen, size_t rlen)
+int
+gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
- gmnal_rx_t *rx = (gmnal_rx_t*)private;
- gmnal_msg_t *msg = rx->rx_msg;
- size_t nobleft = mlen;
- int rxnob;
- size_t nob;
- char *ptr;
- void *buffer;
-
- CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
- "libmsg[%p], kniov[%d], kiov [%p], "
- "offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
- libnal, private, libmsg, nkiov, kiov, offset, mlen, rlen);
-
- LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
-
- buffer = &msg->gmm_u.immediate.gmim_payload[0];
- rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
-
- if (rx->rx_recv_nob < rxnob) {
- CERROR("Short message from nid "LPD64": got %d, need %d\n",
- msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
- return PTL_FAIL;
+ lnet_hdr_t *hdr= &lntmsg->msg_hdr;
+ int type = lntmsg->msg_type;
+ lnet_process_id_t target = lntmsg->msg_target;
+ unsigned int niov = lntmsg->msg_niov;
+ struct iovec *iov = lntmsg->msg_iov;
+ lnet_kiov_t *kiov = lntmsg->msg_kiov;
+ unsigned int offset = lntmsg->msg_offset;
+ unsigned int len = lntmsg->msg_len;
+ gmnal_ni_t *gmni = ni->ni_data;
+ gm_status_t gmrc;
+ gmnal_tx_t *tx;
+
+ LASSERT (iov == NULL || kiov == NULL);
+
+ /* I may not block for a tx if I'm responding to an incoming message */
+ tx = gmnal_get_tx(gmni);
+ if (tx == NULL) {
+ if (!gmni->gmni_shutdown)
+ CERROR ("Can't get tx for msg type %d for %s\n",
+ type, libcfs_nid2str(target.nid));
+ return -EIO;
}
-
- while (nobleft > 0) {
- LASSERT (nkiov > 0);
-
- if (offset >= kiov->kiov_len) {
- offset -= kiov->kiov_len;
- } else {
- nob = MIN (kiov->kiov_len - offset, nobleft);
-
- ptr = ((char *)kmap(kiov->kiov_page)) +
- kiov->kiov_offset;
-
- gm_bcopy(buffer, ptr + offset, nob);
-
- kunmap(kiov->kiov_page);
-
- buffer += nob;
- nobleft -= nob;
- offset = 0;
- }
- kiov++;
- nkiov--;
- }
-
- lib_finalize(libnal, private, libmsg, PTL_OK);
- return PTL_OK;
-}
-
-ptl_err_t
-gmnal_cb_send(lib_nal_t *libnal, void *private,
- lib_msg_t *libmsg, ptl_hdr_t *hdr, int type,
- ptl_nid_t nid, ptl_pid_t pid,
- unsigned int niov, struct iovec *iov,
- size_t offset, size_t len)
-{
-
- gmnal_ni_t *gmnalni = libnal->libnal_data;
- size_t nobleft = len;
- void *buffer;
- gmnal_tx_t *tx;
- size_t nob;
- CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] "
- "len["LPSZ"] nid["LPU64"]\n", niov, offset, len, nid);
+ tx->tx_nid = target.nid;
- if ((nid >> 32) != 0) {
- CERROR("Illegal nid: "LPU64"\n", nid);
- return PTL_FAIL;
+ gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
+ &tx->tx_gmlid);
+ if (gmrc != GM_SUCCESS) {
+ CERROR("Can't map Nid %s to a GM local ID: %d\n",
+ libcfs_nid2str(target.nid), gmrc);
+ /* NB tx_lntmsg not set => doesn't finalize */
+ gmnal_tx_done(tx, -EIO);
+ return -EIO;
}
- tx = gmnal_get_tx(gmnalni, 1);
-
- gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
- gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
-
- buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
- while (nobleft > 0) {
- LASSERT (niov > 0);
-
- if (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- } else {
- nob = MIN (iov->iov_len - offset, nobleft);
-
- gm_bcopy(iov->iov_base + offset, buffer, nob);
-
- buffer += nob;
- nobleft -= nob;
- offset = 0;
+ gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf),
+ target.nid, GMNAL_MSG_IMMEDIATE);
+ GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
+ tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);
+
+ if (the_lnet.ln_testprotocompat != 0) {
+ /* single-shot proto test */
+ LNET_LOCK();
+ if ((the_lnet.ln_testprotocompat & 1) != 0) {
+ GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
+ the_lnet.ln_testprotocompat &= ~1;
}
- niov--;
- iov++;
- }
-
- nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
- return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
-}
-
-ptl_err_t
-gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
- lib_msg_t *libmsg, ptl_hdr_t *hdr, int type,
- ptl_nid_t nid, ptl_pid_t pid,
- unsigned int nkiov, ptl_kiov_t *kiov,
- size_t offset, size_t len)
-{
-
- gmnal_ni_t *gmnalni = libnal->libnal_data;
- size_t nobleft = len;
- void *buffer;
- gmnal_tx_t *tx;
- char *ptr;
- size_t nob;
-
- CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
- LPSZ"] len["LPSZ"]\n", nid, nkiov, offset, len);
-
- if ((nid >> 32) != 0) {
- CERROR("Illegal nid: "LPU64"\n", nid);
- return PTL_FAIL;
+ if ((the_lnet.ln_testprotocompat & 2) != 0) {
+ GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
+ LNET_PROTO_MAGIC;
+ the_lnet.ln_testprotocompat &= ~2;
+ }
+ LNET_UNLOCK();
}
- tx = gmnal_get_tx(gmnalni, 1);
-
- gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
- gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
-
- buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
- while (nobleft > 0) {
- LASSERT (nkiov > 0);
-
- if (offset >= kiov->kiov_len) {
- offset -= kiov->kiov_len;
- } else {
- nob = MIN (kiov->kiov_len - offset, nobleft);
-
- ptr = ((char *)kmap(kiov->kiov_page)) +
- kiov->kiov_offset;
-
- gm_bcopy(ptr + offset, buffer, nob);
-
- kunmap(kiov->kiov_page);
+ if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
+ /* whole message fits in tx_buf */
+ char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);
- buffer += nob;
- nobleft -= nob;
- offset = 0;
- }
- nkiov--;
- kiov++;
+ if (iov != NULL)
+ lnet_copy_iov2flat(len, buffer, 0,
+ niov, iov, offset, len);
+ else
+ lnet_copy_kiov2flat(len, buffer, 0,
+ niov, kiov, offset, len);
+
+ tx->tx_msgnob += len;
+ tx->tx_large_nob = 0;
+ } else {
+ /* stash payload pts to copy later */
+ tx->tx_large_nob = len;
+ tx->tx_large_iskiov = (kiov != NULL);
+ tx->tx_large_niov = niov;
+ if (tx->tx_large_iskiov)
+ tx->tx_large_frags.kiov = kiov;
+ else
+ tx->tx_large_frags.iov = iov;
}
- nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
- return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
-}
+ LASSERT(tx->tx_lntmsg == NULL);
+ tx->tx_lntmsg = lntmsg;
+
+ spin_lock(&gmni->gmni_tx_lock);
-int
-gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
-{
- CDEBUG(D_TRACE, "gmnal_cb_dist\n");
+ list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
+ gmnal_check_txqueues_locked(gmni);
- if (dist != NULL)
- *dist = 1;
+ spin_unlock(&gmni->gmni_tx_lock);
- return PTL_OK;
+ return 0;
}