#include "gmnal.h"
-int gmnal_cb_recv(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
- unsigned int niov, struct iovec *iov, size_t mlen,
- size_t rlen)
+ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
+ unsigned int niov, struct iovec *iov, size_t offset,
+ size_t mlen, size_t rlen)
{
+ void *buffer = NULL;
gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
int status = PTL_OK;
-
- CDEBUG(D_TRACE, "gmnal_cb_recv nal_cb [%p], private[%p], cookie[%p],
- niov[%d], iov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
- nal_cb, private, cookie, niov, iov, mlen, rlen);
+ CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
+ "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
+ libnal, private, cookie, niov, iov, offset, mlen, rlen);
switch(srxd->type) {
case(GMNAL_SMALL_MESSAGE):
CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
- status = gmnal_small_rx(nal_cb, private, cookie, niov,
- iov, mlen, rlen);
+ /* HP SFS 1380: Proactively change receives to avoid a receive
+ * side occurrence of filling pkmap_count[].
+ */
+ buffer = srxd->buffer;
+ buffer += sizeof(gmnal_msghdr_t);
+ buffer += sizeof(ptl_hdr_t);
+
+ while(niov--) {
+ if (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ } else if (offset > 0) {
+				CDEBUG(D_INFO, "processing [%p] base [%p] "
+				       "len ["LPSZ"], offset ["LPSZ"], "
+				       "len ["LPSZ"]\n", iov,
+				       iov->iov_base + offset, iov->iov_len,
+				       offset, iov->iov_len - offset);
+ gm_bcopy(buffer, iov->iov_base + offset,
+ iov->iov_len - offset);
+ buffer += iov->iov_len - offset;
+ offset = 0;
+ } else {
+ CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n",
+ iov, iov->iov_len);
+ gm_bcopy(buffer, iov->iov_base, iov->iov_len);
+ buffer += iov->iov_len;
+ }
+ iov++;
+ }
+ status = gmnal_small_rx(libnal, private, cookie);
break;
case(GMNAL_LARGE_MESSAGE_INIT):
CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
- status = gmnal_large_rx(nal_cb, private, cookie, niov,
- iov, mlen, rlen);
+ status = gmnal_large_rx(libnal, private, cookie, niov,
+ iov, offset, mlen, rlen);
}
-
CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
return(status);
}
-int gmnal_cb_recv_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
- unsigned int kniov, ptl_kiov_t *kiov, size_t mlen,
- size_t rlen)
+ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
+ lib_msg_t *cookie, unsigned int kniov,
+ ptl_kiov_t *kiov, size_t offset, size_t mlen,
+ size_t rlen)
{
gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
int status = PTL_OK;
- struct iovec *iovec = NULL, *iovec_dup = NULL;
- int i = 0;
+ char *ptr = NULL;
+ void *buffer = NULL;
- CDEBUG(D_TRACE, "gmnal_cb_recv_pages nal_cb [%p],private[%p],
- cookie[%p], kniov[%d], kiov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
- nal_cb, private, cookie, kniov, kiov, mlen, rlen);
+ CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
+ "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
+ libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
if (srxd->type == GMNAL_SMALL_MESSAGE) {
- PORTAL_ALLOC(iovec, sizeof(struct iovec)*kniov);
- if (!iovec) {
- CDEBUG(D_ERROR, "Can't malloc\n");
- return(GMNAL_STATUS_FAIL);
- }
- iovec_dup = iovec;
+ buffer = srxd->buffer;
+ buffer += sizeof(gmnal_msghdr_t);
+ buffer += sizeof(ptl_hdr_t);
/*
* map each page and create an iovec for it
*/
- for (i=0; i<kniov; i++) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
- iovec->iov_len = kiov->kiov_len;
- CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
-
- iovec->iov_base = kmap(kiov->kiov_page) +
- kiov->kiov_offset;
-
- CDEBUG(D_INFO, "iov_base is [%p]\n", iovec->iov_base);
- iovec++;
+ while (kniov--) {
+ /* HP SFS 1380: Proactively change receives to avoid a
+ * receive side occurrence of filling pkmap_count[].
+ */
+ CDEBUG(D_INFO, "processing kniov [%d] [%p]\n",
+ kniov, kiov);
+
+ if (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ } else {
+ CDEBUG(D_INFO, "kniov page [%p] len [%d] "
+ "offset[%d]\n", kiov->kiov_page,
+ kiov->kiov_len, kiov->kiov_offset);
+ CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
+ ptr = ((char *)kmap(kiov->kiov_page)) +
+ kiov->kiov_offset;
+
+ if (offset > 0) {
+ CDEBUG(D_INFO, "processing [%p] base "
+ "[%p] len %d, offset %d, len ["
+ LPSZ"]\n", ptr, ptr + offset,
+ kiov->kiov_len, offset,
+ kiov->kiov_len - offset);
+ gm_bcopy(buffer, ptr + offset,
+ kiov->kiov_len - offset);
+ buffer += kiov->kiov_len - offset;
+ offset = 0;
+ } else {
+ CDEBUG(D_INFO, "processing [%p] len ["
+ LPSZ"]\n", ptr, kiov->kiov_len);
+ gm_bcopy(buffer, ptr, kiov->kiov_len);
+ buffer += kiov->kiov_len;
+ }
+ kunmap(kiov->kiov_page);
+ CDEBUG(D_INFO, "Stored in [%p]\n", ptr);
+ }
kiov++;
}
CDEBUG(D_INFO, "calling gmnal_small_rx\n");
- status = gmnal_small_rx(nal_cb, private, cookie, kniov,
- iovec_dup, mlen, rlen);
- PORTAL_FREE(iovec_dup, sizeof(struct iovec)*kniov);
+ status = gmnal_small_rx(libnal, private, cookie);
}
-
CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
return(status);
}
-int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int niov, struct iovec *iov, size_t len)
+ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
+ ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+ unsigned int niov, struct iovec *iov, size_t offset,
+ size_t len)
{
gmnal_data_t *nal_data;
+ void *buffer = NULL;
+ gmnal_stxd_t *stxd = NULL;
- CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] len["LPSZ"] nid["LPU64"]\n",
- niov, len, nid);
- nal_data = nal_cb->nal_data;
-
+ CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ
+ "] nid["LPU64"]\n", niov, offset, len, nid);
+ nal_data = libnal->libnal_data;
+ if (!nal_data) {
+ CDEBUG(D_ERROR, "no nal_data\n");
+ return(PTL_FAIL);
+ } else {
+ CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
+ }
+
if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
CDEBUG(D_INFO, "This is a small message send\n");
- gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, pid,
- niov, iov, len);
+ /*
+ * HP SFS 1380: With the change to gmnal_small_tx, need to get
+ * the stxd and do relevant setup here
+ */
+ stxd = gmnal_get_stxd(nal_data, 1);
+ CDEBUG(D_INFO, "stxd [%p]\n", stxd);
+ /* Set the offset of the data to copy into the buffer */
+ buffer = stxd->buffer +sizeof(gmnal_msghdr_t)+sizeof(ptl_hdr_t);
+ while(niov--) {
+ if (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ } else if (offset > 0) {
+ CDEBUG(D_INFO, "processing iov [%p] base [%p] "
+ "len ["LPSZ"] to [%p]\n",
+ iov, iov->iov_base + offset,
+ iov->iov_len - offset, buffer);
+ gm_bcopy(iov->iov_base + offset, buffer,
+ iov->iov_len - offset);
+ buffer+= iov->iov_len - offset;
+ offset = 0;
+ } else {
+ CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ
+ "] to [%p]\n", iov, iov->iov_len,buffer);
+ gm_bcopy(iov->iov_base, buffer, iov->iov_len);
+ buffer+= iov->iov_len;
+ }
+ iov++;
+ }
+ gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
+ stxd, len);
} else {
- CDEBUG(D_ERROR, "Large message send it is not supported\n");
- lib_finalize(nal_cb, private, cookie);
+ CDEBUG(D_ERROR, "Large message send is not supported\n");
+ lib_finalize(libnal, private, cookie, PTL_FAIL);
return(PTL_FAIL);
- gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, pid,
- niov, iov, len);
+ gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
+ niov, iov, offset, len);
}
return(PTL_OK);
}
-int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov, ptl_kiov_t *kiov, size_t len)
+ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
+ lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
+ ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov,
+ ptl_kiov_t *kiov, size_t offset, size_t len)
{
- int i = 0;
gmnal_data_t *nal_data;
- struct iovec *iovec = NULL, *iovec_dup = NULL;
+ char *ptr;
+ void *buffer = NULL;
+ gmnal_stxd_t *stxd = NULL;
+ ptl_err_t status = PTL_OK;
+
+ CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
+ LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len);
+ nal_data = libnal->libnal_data;
+ if (!nal_data) {
+ CDEBUG(D_ERROR, "no nal_data\n");
+ return(PTL_FAIL);
+ } else {
+ CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
+ }
+
+ /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
+ * more aggressively. This is the fix for a livelock situation under
+ * load on ia32 that occurs when there are no more available entries in
+ * the pkmap_count array. Just fill the buffer and let gmnal_small_tx
+ * put the headers in after we pass it the stxd pointer.
+ */
+ stxd = gmnal_get_stxd(nal_data, 1);
+ CDEBUG(D_INFO, "stxd [%p]\n", stxd);
+ /* Set the offset of the data to copy into the buffer */
+ buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
- CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] len["LPSZ"]\n", nid, kniov, len);
- nal_data = nal_cb->nal_data;
- PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
- iovec_dup = iovec;
if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
CDEBUG(D_INFO, "This is a small message send\n");
-
- for (i=0; i<kniov; i++) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
- iovec->iov_base = kmap(kiov->kiov_page)
- + kiov->kiov_offset;
-
- iovec->iov_len = kiov->kiov_len;
- iovec++;
+ while(kniov--) {
+ CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
+ if (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ } else {
+ CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
+ kiov->kiov_page, kiov->kiov_len,
+ kiov->kiov_offset);
+
+ ptr = ((char *)kmap(kiov->kiov_page)) +
+ kiov->kiov_offset;
+
+ if (offset > 0) {
+ CDEBUG(D_INFO, "processing [%p] base "
+ "[%p] len ["LPSZ"] to [%p]\n",
+ ptr, ptr + offset,
+ kiov->kiov_len - offset, buffer);
+ gm_bcopy(ptr + offset, buffer,
+ kiov->kiov_len - offset);
+ buffer+= kiov->kiov_len - offset;
+ offset = 0;
+ } else {
+ CDEBUG(D_INFO, "processing kmapped [%p]"
+ " len ["LPSZ"] to [%p]\n",
+ ptr, kiov->kiov_len, buffer);
+ gm_bcopy(ptr, buffer, kiov->kiov_len);
+
+ buffer += kiov->kiov_len;
+ }
+ kunmap(kiov->kiov_page);
+ }
kiov++;
}
- gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid,
- pid, kniov, iovec_dup, len);
+ status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
+ pid, stxd, len);
} else {
+ int i = 0;
+ struct iovec *iovec = NULL, *iovec_dup = NULL;
+ ptl_kiov_t *kiov_dup = kiov;
+
+ PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
+ iovec_dup = iovec;
CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
+		/* return the stxd acquired above before bailing out,
+		 * otherwise it leaks from the tx descriptor pool */
+		gmnal_return_stxd(nal_data, stxd);
+		PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
return(PTL_FAIL);
for (i=0; i<kniov; i++) {
CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
iovec++;
kiov++;
}
- gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid,
- pid, kniov, iovec, len);
- }
- PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
- return(PTL_OK);
-}
-
-int gmnal_cb_read(nal_cb_t *nal_cb, void *private, void *dst,
- user_ptr src, size_t len)
-{
- gm_bcopy(src, dst, len);
- return(PTL_OK);
-}
-
-int gmnal_cb_write(nal_cb_t *nal_cb, void *private, user_ptr dst,
- void *src, size_t len)
-{
- gm_bcopy(src, dst, len);
- return(PTL_OK);
-}
-
-int gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq,
- ptl_event_t *ev)
-{
-
- if (eq->event_callback != NULL) {
- CDEBUG(D_INFO, "found callback\n");
- eq->event_callback(ev);
+ gmnal_large_tx(libnal, private, cookie, hdr, type, nid,
+ pid, kniov, iovec, offset, len);
+ for (i=0; i<kniov; i++) {
+ kunmap(kiov_dup->kiov_page);
+ kiov_dup++;
+ }
+ PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
}
-
- return(PTL_OK);
-}
-
-void *gmnal_cb_malloc(nal_cb_t *nal_cb, size_t len)
-{
- void *ptr = NULL;
- CDEBUG(D_TRACE, "gmnal_cb_malloc len["LPSZ"]\n", len);
- PORTAL_ALLOC(ptr, len);
- return(ptr);
-}
-
-void gmnal_cb_free(nal_cb_t *nal_cb, void *buf, size_t len)
-{
- CDEBUG(D_TRACE, "gmnal_cb_free :: buf[%p] len["LPSZ"]\n", buf, len);
- PORTAL_FREE(buf, len);
- return;
-}
-
-void gmnal_cb_unmap(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov,
- void **addrkey)
-{
- return;
-}
-
-int gmnal_cb_map(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov,
- void**addrkey)
-{
- return(PTL_OK);
-}
-
-void gmnal_cb_printf(nal_cb_t *nal_cb, const char *fmt, ...)
-{
- CDEBUG(D_TRACE, "gmnal_cb_printf\n");
- printk(fmt);
- return;
-}
-
-void gmnal_cb_cli(nal_cb_t *nal_cb, unsigned long *flags)
-{
- gmnal_data_t *nal_data = (gmnal_data_t*)nal_cb->nal_data;
-
- spin_lock_irqsave(&nal_data->cb_lock, *flags);
- return;
-}
-
-void gmnal_cb_sti(nal_cb_t *nal_cb, unsigned long *flags)
-{
- gmnal_data_t *nal_data = (gmnal_data_t*)nal_cb->nal_data;
-
- spin_unlock_irqrestore(&nal_data->cb_lock, *flags);
- return;
+ return(status);
}
-int gmnal_cb_dist(nal_cb_t *nal_cb, ptl_nid_t nid, unsigned long *dist)
+int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
{
CDEBUG(D_TRACE, "gmnal_cb_dist\n");
if (dist)