#include "gmnal.h"
-ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- unsigned int niov, struct iovec *iov, size_t offset,
- size_t mlen, size_t rlen)
+ptl_err_t
+gmnal_cb_recv(lib_nal_t *libnal, void *private,
+ lib_msg_t *libmsg,
+ unsigned int niov, struct iovec *iov,
+ size_t offset, size_t mlen, size_t rlen)
{
- void *buffer = NULL;
- gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
- int status = PTL_OK;
- size_t msglen = mlen;
- size_t nob;
-
- CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
+ gmnal_rx_t *rx = (gmnal_rx_t*)private;
+ gmnal_msg_t *msg = rx->rx_msg;
+ size_t nobleft = mlen;
+ int rxnob;
+ char *buffer;
+ size_t nob;
+
+ CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], libmsg[%p], "
"niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
- libnal, private, cookie, niov, iov, offset, mlen, rlen);
-
- switch(srxd->type) {
- case(GMNAL_SMALL_MESSAGE):
- CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
- /* HP SFS 1380: Proactively change receives to avoid a receive
- * side occurrence of filling pkmap_count[].
- */
- buffer = srxd->buffer;
- buffer += GMNAL_MSGHDR_SIZE;
- buffer += sizeof(ptl_hdr_t);
-
- while(niov--) {
- if (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- } else {
- nob = MIN (iov->iov_len - offset, msglen);
- CDEBUG(D_INFO, "processing iov [%p] base [%p] "
- "offset [%d] len ["LPSZ"] to [%p] left "
- "["LPSZ"]\n", iov, iov->iov_base,
- offset, nob, buffer, msglen);
- gm_bcopy(buffer, iov->iov_base + offset, nob);
- buffer += nob;
- msglen -= nob;
- offset = 0;
- }
- iov++;
- }
- status = gmnal_small_rx(libnal, private, cookie);
- break;
- case(GMNAL_LARGE_MESSAGE_INIT):
- CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
- status = gmnal_large_rx(libnal, private, cookie, niov,
- iov, offset, mlen, rlen);
- }
-
- CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
- return(status);
+ libnal, private, libmsg, niov, iov, offset, mlen, rlen);
+
+ LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+
+ buffer = &msg->gmm_u.immediate.gmim_payload[0];
+ rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
+
+ if (rx->rx_recv_nob < rxnob) {
+ CERROR("Short message from nid "LPD64": got %d, need %d\n",
+ msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
+ return PTL_FAIL;
+ }
+
+ while (nobleft > 0) {
+ LASSERT (niov > 0);
+
+ if (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ } else {
+ nob = MIN (iov->iov_len - offset, nobleft);
+
+ gm_bcopy(buffer, iov->iov_base + offset, nob);
+
+ buffer += nob;
+ nobleft -= nob;
+ offset = 0;
+ }
+ niov--;
+ iov++;
+ }
+
+ lib_finalize(libnal, private, libmsg, PTL_OK);
+ return PTL_OK;
}
-ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
- lib_msg_t *cookie, unsigned int kniov,
- ptl_kiov_t *kiov, size_t offset, size_t mlen,
- size_t rlen)
+ptl_err_t
+gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
+ lib_msg_t *libmsg,
+ unsigned int nkiov, ptl_kiov_t *kiov,
+ size_t offset, size_t mlen, size_t rlen)
{
- gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
- int status = PTL_OK;
- char *ptr = NULL;
- void *buffer = NULL;
-
+ gmnal_rx_t *rx = (gmnal_rx_t*)private;
+ gmnal_msg_t *msg = rx->rx_msg;
+ size_t nobleft = mlen;
+ int rxnob;
+ size_t nob;
+ char *ptr;
+ void *buffer;
CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
- "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
- libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
-
- if (srxd->type == GMNAL_SMALL_MESSAGE) {
- size_t msglen = mlen;
- size_t nob;
-
- buffer = srxd->buffer;
- buffer += GMNAL_MSGHDR_SIZE;
- buffer += sizeof(ptl_hdr_t);
-
- /*
- * map each page and create an iovec for it
- */
- while (kniov--) {
- /* HP SFS 1380: Proactively change receives to avoid a
- * receive side occurrence of filling pkmap_count[].
- */
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n",
- kniov, kiov);
-
- if (offset >= kiov->kiov_len) {
- offset -= kiov->kiov_len;
- } else {
- nob = MIN (kiov->kiov_len - offset, msglen);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] "
- "offset[%d]\n", kiov->kiov_page,
- kiov->kiov_len, kiov->kiov_offset);
- ptr = ((char *)kmap(kiov->kiov_page)) +
- kiov->kiov_offset;
-
- CDEBUG(D_INFO, "processing ptr [%p] offset [%d] "
- "len ["LPSZ"] from [%p] left ["LPSZ"]\n",
- ptr, offset, nob, buffer, msglen);
- gm_bcopy(buffer, ptr + offset, nob);
- kunmap(kiov->kiov_page);
- buffer += nob;
- msglen -= nob;
- offset = 0;
- }
- kiov++;
- }
- CDEBUG(D_INFO, "calling gmnal_small_rx\n");
- status = gmnal_small_rx(libnal, private, cookie);
- }
+ "libmsg[%p], kniov[%d], kiov [%p], "
+ "offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
+ libnal, private, libmsg, nkiov, kiov, offset, mlen, rlen);
- CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
- return(status);
-}
+ LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+ buffer = &msg->gmm_u.immediate.gmim_payload[0];
+ rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
-ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int niov, struct iovec *iov, size_t offset,
- size_t len)
-{
+ if (rx->rx_recv_nob < rxnob) {
+ CERROR("Short message from nid "LPD64": got %d, need %d\n",
+ msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
+ return PTL_FAIL;
+ }
+
+ while (nobleft > 0) {
+ LASSERT (nkiov > 0);
- gmnal_data_t *nal_data;
- void *buffer = NULL;
- gmnal_stxd_t *stxd = NULL;
+ if (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ } else {
+ nob = MIN (kiov->kiov_len - offset, nobleft);
+ ptr = ((char *)kmap(kiov->kiov_page)) +
+ kiov->kiov_offset;
- CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ
- "] nid["LPU64"]\n", niov, offset, len, nid);
- nal_data = libnal->libnal_data;
- if (!nal_data) {
- CERROR("no nal_data\n");
- return(PTL_FAIL);
- } else {
- CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
- }
+ gm_bcopy(buffer, ptr + offset, nob);
- if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
- size_t msglen = len;
- size_t nob;
-
- CDEBUG(D_INFO, "This is a small message send\n");
- /*
- * HP SFS 1380: With the change to gmnal_small_tx, need to get
- * the stxd and do relevant setup here
- */
- stxd = gmnal_get_stxd(nal_data, 1);
- CDEBUG(D_INFO, "stxd [%p]\n", stxd);
- /* Set the offset of the data to copy into the buffer */
- buffer = stxd->buffer + GMNAL_MSGHDR_SIZE + sizeof(ptl_hdr_t);
- while(niov--) {
- if (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- } else {
- nob = MIN (iov->iov_len - offset, msglen);
- CDEBUG(D_INFO, "processing iov [%p] base [%p]"
- " offset [%d] len ["LPSZ"] to [%p] left"
- " ["LPSZ"]\n", iov, iov->iov_base,
- offset, nob, buffer, msglen);
- gm_bcopy(iov->iov_base + offset, buffer, nob);
- buffer += nob;
- msglen -= nob;
- offset = 0;
- }
- iov++;
+ kunmap(kiov->kiov_page);
+
+ buffer += nob;
+ nobleft -= nob;
+ offset = 0;
}
- gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
- stxd, len);
- } else {
- CERROR("Large message send is not supported\n");
- lib_finalize(libnal, private, cookie, PTL_FAIL);
- return(PTL_FAIL);
- gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
- niov, iov, offset, len);
+ kiov++;
+ nkiov--;
}
- return(PTL_OK);
+
+ lib_finalize(libnal, private, libmsg, PTL_OK);
+ return PTL_OK;
+}
+
+ptl_err_t
+gmnal_cb_send(lib_nal_t *libnal, void *private,
+ lib_msg_t *libmsg, ptl_hdr_t *hdr, int type,
+ ptl_nid_t nid, ptl_pid_t pid,
+ unsigned int niov, struct iovec *iov,
+ size_t offset, size_t len)
+{
+
+ gmnal_ni_t *gmnalni = libnal->libnal_data;
+ size_t nobleft = len;
+ void *buffer;
+ gmnal_tx_t *tx;
+ size_t nob;
+
+ CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] "
+ "len["LPSZ"] nid["LPU64"]\n", niov, offset, len, nid);
+
+ if ((nid >> 32) != 0) {
+ CERROR("Illegal nid: "LPU64"\n", nid);
+ return PTL_FAIL;
+ }
+
+ tx = gmnal_get_tx(gmnalni, 1);
+
+ gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
+ gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
+
+ buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
+ while (nobleft > 0) {
+ LASSERT (niov > 0);
+
+ if (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ } else {
+ nob = MIN (iov->iov_len - offset, nobleft);
+
+ gm_bcopy(iov->iov_base + offset, buffer, nob);
+
+ buffer += nob;
+ nobleft -= nob;
+ offset = 0;
+ }
+ niov--;
+ iov++;
+ }
+
+ nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
+ return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
}
-ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
- lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
- ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov,
- ptl_kiov_t *kiov, size_t offset, size_t len)
+ptl_err_t
+gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
+ lib_msg_t *libmsg, ptl_hdr_t *hdr, int type,
+ ptl_nid_t nid, ptl_pid_t pid,
+ unsigned int nkiov, ptl_kiov_t *kiov,
+ size_t offset, size_t len)
{
- gmnal_data_t *nal_data;
+ gmnal_ni_t *gmnalni = libnal->libnal_data;
+ size_t nobleft = len;
+ void *buffer;
+ gmnal_tx_t *tx;
char *ptr;
- void *buffer = NULL;
- gmnal_stxd_t *stxd = NULL;
- ptl_err_t status = PTL_OK;
+ size_t nob;
CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
- LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len);
- nal_data = libnal->libnal_data;
- if (!nal_data) {
- CERROR("no nal_data\n");
- return(PTL_FAIL);
- } else {
- CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
- }
+ LPSZ"] len["LPSZ"]\n", nid, nkiov, offset, len);
- /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
- * more aggressively. This is the fix for a livelock situation under
- * load on ia32 that occurs when there are no more available entries in
- * the pkmap_count array. Just fill the buffer and let gmnal_small_tx
- * put the headers in after we pass it the stxd pointer.
- */
- stxd = gmnal_get_stxd(nal_data, 1);
- CDEBUG(D_INFO, "stxd [%p]\n", stxd);
- /* Set the offset of the data to copy into the buffer */
- buffer = stxd->buffer + GMNAL_MSGHDR_SIZE + sizeof(ptl_hdr_t);
-
- if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
- size_t msglen = len;
- size_t nob;
-
- CDEBUG(D_INFO, "This is a small message send\n");
-
- while(kniov--) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
- if (offset >= kiov->kiov_len) {
- offset -= kiov->kiov_len;
- } else {
- nob = MIN (kiov->kiov_len - offset, msglen);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
-
- ptr = ((char *)kmap(kiov->kiov_page)) +
- kiov->kiov_offset;
-
- CDEBUG(D_INFO, "processing ptr [%p] offset [%d]"
- " len ["LPSZ"] to [%p] left ["LPSZ"]\n",
- ptr, offset, nob, buffer, msglen);
- gm_bcopy(ptr + offset, buffer, nob);
- kunmap(kiov->kiov_page);
- buffer += nob;
- msglen -= nob;
- offset = 0;
- }
- kiov++;
- }
- status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
- pid, stxd, len);
- } else {
- int i = 0;
- struct iovec *iovec = NULL, *iovec_dup = NULL;
- ptl_kiov_t *kiov_dup = kiov;
-
- PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
- iovec_dup = iovec;
- CERROR("Large message send it is not supported yet\n");
- PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
- return(PTL_FAIL);
- for (i=0; i<kniov; i++) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
-
- iovec->iov_base = kmap(kiov->kiov_page)
- + kiov->kiov_offset;
- iovec->iov_len = kiov->kiov_len;
- iovec++;
- kiov++;
- }
- gmnal_large_tx(libnal, private, cookie, hdr, type, nid,
- pid, kniov, iovec, offset, len);
- for (i=0; i<kniov; i++) {
- kunmap(kiov_dup->kiov_page);
- kiov_dup++;
- }
- PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
- }
- return(status);
+ if ((nid >> 32) != 0) {
+ CERROR("Illegal nid: "LPU64"\n", nid);
+ return PTL_FAIL;
+ }
+
+ tx = gmnal_get_tx(gmnalni, 1);
+
+ gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
+ gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
+
+ buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
+ while (nobleft > 0) {
+ LASSERT (nkiov > 0);
+
+ if (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ } else {
+ nob = MIN (kiov->kiov_len - offset, nobleft);
+
+ ptr = ((char *)kmap(kiov->kiov_page)) +
+ kiov->kiov_offset;
+
+ gm_bcopy(ptr + offset, buffer, nob);
+
+ kunmap(kiov->kiov_page);
+
+ buffer += nob;
+ nobleft -= nob;
+ offset = 0;
+ }
+ nkiov--;
+ kiov++;
+ }
+
+ nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
+ return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
}
-int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
+int
+gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
{
CDEBUG(D_TRACE, "gmnal_cb_dist\n");
- if (dist)
- *dist = 27;
- return(PTL_OK);
+
+ if (dist != NULL)
+ *dist = 1;
+
+ return PTL_OK;
}