/* forward ref */
static void lib_commit_md (lib_nal_t *nal, lib_md_t *md, lib_msg_t *msg);
+static ptl_err_t do_lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr,
+ void *private, int loopback);
static lib_md_t *
lib_match_md(lib_nal_t *nal, int index, int op_mask,
return PTL_OK;
}
+int
+lib_api_loopback (nal_t *apinal, int set, int *enabled)
+{
+ lib_nal_t *nal = apinal->nal_data;
+
+ if (set)
+ nal->libnal_ni.ni_loopback = *enabled;
+ else
+ *enabled = nal->libnal_ni.ni_loopback;
+
+ return PTL_OK;
+}
+
static int
fail_peer (lib_nal_t *nal, ptl_nid_t nid, int outgoing)
{
#endif
ptl_err_t
+lib_lo_rxiov(lib_nal_t *nal,
+ void *private,
+ lib_msg_t *libmsg,
+ unsigned int niov,
+ struct iovec *iov,
+ size_t offset,
+ size_t mlen,
+ size_t rlen)
+{
+ lo_desc_t *lod = (lo_desc_t *)private;
+
+ /* I only handle mapped->mapped matches */
+ LASSERT(lod->lod_type == LOD_IOV);
+ LASSERT(mlen > 0);
+
+ while (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ iov++;
+ niov--;
+ LASSERT(niov > 0);
+ }
+
+ while (lod->lod_offset >= lod->lod_iov.iov->iov_len) {
+ lod->lod_offset -= lod->lod_iov.iov->iov_len;
+ lod->lod_iov.iov++;
+ lod->lod_niov--;
+ LASSERT(lod->lod_niov > 0);
+ }
+
+ do {
+ int fraglen = MIN(iov->iov_len - offset,
+ lod->lod_iov.iov->iov_len - lod->lod_offset);
+
+ LASSERT(niov > 0);
+ LASSERT(lod->lod_niov > 0);
+
+ if (fraglen > mlen)
+ fraglen = mlen;
+
+ memcpy((void *)((unsigned long)iov->iov_base + offset),
+ (void *)((unsigned long)lod->lod_iov.iov->iov_base +
+ lod->lod_offset),
+ fraglen);
+
+ if (offset + fraglen < iov->iov_len) {
+ offset += fraglen;
+ } else {
+ offset = 0;
+ iov++;
+ niov--;
+ }
+
+ if (lod->lod_offset + fraglen < lod->lod_iov.iov->iov_len ) {
+ lod->lod_offset += fraglen;
+ } else {
+ lod->lod_offset = 0;
+ lod->lod_iov.iov++;
+ lod->lod_niov--;
+ }
+
+ mlen -= fraglen;
+ } while (mlen > 0);
+
+ lib_finalize(nal, private, libmsg, PTL_OK);
+ return PTL_OK;
+}
+
+ptl_err_t
+lib_lo_rxkiov(lib_nal_t *nal,
+ void *private,
+ lib_msg_t *libmsg,
+ unsigned int niov,
+ ptl_kiov_t *kiov,
+ size_t offset,
+ size_t mlen,
+ size_t rlen)
+{
+ void *srcaddr = NULL;
+ void *dstaddr = NULL;
+ unsigned long srcfrag = 0;
+ unsigned long dstfrag = 0;
+ unsigned long fraglen;
+ lo_desc_t *lod = (lo_desc_t *)private;
+
+ /* I only handle unmapped->unmapped matches */
+ LASSERT(lod->lod_type == LOD_KIOV);
+
+ if (mlen == 0)
+ return PTL_OK;
+
+ while (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ kiov++;
+ niov--;
+ LASSERT(niov > 0);
+ }
+
+ while (lod->lod_offset >= lod->lod_iov.kiov->kiov_len) {
+ lod->lod_offset -= lod->lod_iov.kiov->kiov_len;
+ lod->lod_iov.kiov++;
+ lod->lod_niov--;
+ LASSERT(lod->lod_niov > 0);
+ }
+
+ do {
+ /* CAVEAT EMPTOR: I kmap 2 pages at once == slight risk of deadlock */
+ LASSERT(niov > 0);
+ if (dstaddr == NULL) {
+ dstaddr = (void *)((unsigned long)kmap(kiov->kiov_page) +
+ kiov->kiov_offset + offset);
+ dstfrag = kiov->kiov_len - offset;
+ }
+
+ LASSERT(lod->lod_niov > 0);
+ if (srcaddr == NULL) {
+ srcaddr = (void *)((unsigned long)kmap(lod->lod_iov.kiov->kiov_page) +
+ lod->lod_iov.kiov->kiov_offset + lod->lod_offset);
+ srcfrag = lod->lod_iov.kiov->kiov_len - lod->lod_offset;
+ }
+
+ fraglen = MIN(srcfrag, dstfrag);
+ if (fraglen > mlen)
+ fraglen = mlen;
+
+ memcpy(dstaddr, srcaddr, fraglen);
+
+ if (fraglen < dstfrag) {
+ dstfrag -= fraglen;
+ dstaddr = (void *)((unsigned long)dstaddr + fraglen);
+ } else {
+ kunmap(kiov->kiov_page);
+ dstaddr = NULL;
+ offset = 0;
+ kiov++;
+ niov--;
+ }
+
+ if (fraglen < srcfrag) {
+ srcfrag -= fraglen;
+ srcaddr = (void *)((unsigned long)srcaddr + fraglen);
+ } else {
+ kunmap(lod->lod_iov.kiov->kiov_page);
+ srcaddr = NULL;
+ lod->lod_offset = 0;
+ lod->lod_iov.kiov++;
+ lod->lod_niov--;
+ }
+
+ mlen -= fraglen;
+ } while (mlen > 0);
+
+ if (dstaddr != NULL)
+ kunmap(kiov->kiov_page);
+
+ if (srcaddr != NULL)
+ kunmap(lod->lod_iov.kiov->kiov_page);
+
+ lib_finalize(nal, private, libmsg, PTL_OK);
+ return PTL_OK;
+}
+
+ptl_err_t
+lib_lo_txiov (lib_nal_t *nal,
+ void *private,
+ lib_msg_t *libmsg,
+ ptl_hdr_t *hdr,
+ int type,
+ ptl_nid_t nid,
+ ptl_pid_t pid,
+ unsigned int payload_niov,
+ struct iovec *payload_iov,
+ size_t payload_offset,
+ size_t payload_nob)
+{
+ lo_desc_t lod = {
+ .lod_type = LOD_IOV,
+ .lod_niov = payload_niov,
+ .lod_offset = payload_offset,
+ .lod_nob = payload_nob,
+ .lod_iov = { .iov = payload_iov } };
+ ptl_err_t rc;
+
+ rc = do_lib_parse(nal, hdr, &lod, 1);
+ if (rc == PTL_OK)
+ lib_finalize(nal, private, libmsg, PTL_OK);
+
+ return rc;
+}
+
+ptl_err_t
+lib_lo_txkiov (lib_nal_t *nal,
+ void *private,
+ lib_msg_t *libmsg,
+ ptl_hdr_t *hdr,
+ int type,
+ ptl_nid_t nid,
+ ptl_pid_t pid,
+ unsigned int payload_niov,
+ ptl_kiov_t *payload_kiov,
+ size_t payload_offset,
+ size_t payload_nob)
+{
+ lo_desc_t lod = {
+ .lod_type = LOD_KIOV,
+ .lod_niov = payload_niov,
+ .lod_offset = payload_offset,
+ .lod_nob = payload_nob,
+ .lod_iov = { .kiov = payload_kiov } };
+ ptl_err_t rc;
+
+ rc = do_lib_parse(nal, hdr, &lod, 1);
+ if (rc == PTL_OK)
+ lib_finalize(nal, private, libmsg, PTL_OK);
+
+ return rc;
+}
+
+ptl_err_t
+lib_lo_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
+ ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
+{
+ if (mlen == 0) {
+ lib_finalize(nal, private, msg, PTL_OK);
+ return PTL_OK;
+ }
+
+ if ((md->options & PTL_MD_KIOV) == 0)
+ return lib_lo_rxiov(nal, private, msg,
+ md->md_niov, md->md_iov.iov,
+ offset, mlen, rlen);
+
+ return lib_lo_rxkiov(nal, private, msg,
+ md->md_niov, md->md_iov.kiov,
+ offset, mlen, rlen);
+}
+
+ptl_err_t
lib_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
{
ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
lib_md_t *md, ptl_size_t offset, ptl_size_t len)
{
- if (len == 0)
- return (nal->libnal_send(nal, private, msg,
- hdr, type, nid, pid,
- 0, NULL,
- offset, len));
-
- if ((md->options & PTL_MD_KIOV) == 0)
- return (nal->libnal_send(nal, private, msg,
- hdr, type, nid, pid,
- md->md_niov, md->md_iov.iov,
- offset, len));
-
- return (nal->libnal_send_pages(nal, private, msg,
- hdr, type, nid, pid,
- md->md_niov, md->md_iov.kiov,
- offset, len));
+ int loopback = (nal->libnal_ni.ni_loopback &&
+ (nid == nal->libnal_ni.ni_pid.nid));
+
+ if (len == 0) {
+ if (loopback)
+ return lib_lo_txiov(nal, private, msg,
+ hdr, type, nid, pid,
+ 0, NULL,
+ offset, len);
+ else
+ return nal->libnal_send(nal, private, msg,
+ hdr, type, nid, pid,
+ 0, NULL,
+ offset, len);
+ }
+
+ if ((md->options & PTL_MD_KIOV) == 0) {
+ if (loopback)
+ return lib_lo_txiov(nal, private, msg,
+ hdr, type, nid, pid,
+ md->md_niov, md->md_iov.iov,
+ offset, len);
+ else
+ return nal->libnal_send(nal, private, msg,
+ hdr, type, nid, pid,
+ md->md_niov, md->md_iov.iov,
+ offset, len);
+ }
+
+ if (loopback)
+ return lib_lo_txkiov(nal, private, msg,
+ hdr, type, nid, pid,
+ md->md_niov, md->md_iov.kiov,
+ offset, len);
+ else
+ return nal->libnal_send_pages(nal, private, msg,
+ hdr, type, nid, pid,
+ md->md_niov, md->md_iov.kiov,
+ offset, len);
}
static void
}
static void
-lib_drop_message (lib_nal_t *nal, void *private, ptl_hdr_t *hdr)
+lib_drop_message (lib_nal_t *nal, void *private, ptl_hdr_t *hdr, int loopback)
{
unsigned long flags;
LIB_UNLOCK(nal, flags);
/* NULL msg => if NAL calls lib_finalize it will be a noop */
- (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
+ if (!loopback)
+ (void) lib_recv(nal, private, NULL, NULL, 0, 0,
+ hdr->payload_length);
}
/*
*
*/
static ptl_err_t
-parse_put(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
+parse_put(lib_nal_t *nal, ptl_hdr_t *hdr, void *private,
+ lib_msg_t *msg, int loopback)
{
lib_ni_t *ni = &nal->libnal_ni;
ptl_size_t mlength = 0;
LIB_UNLOCK(nal, flags);
- rc = lib_recv(nal, private, msg, md, offset, mlength,
- hdr->payload_length);
+ if (loopback)
+ rc = lib_lo_recv(nal, private, msg, md, offset, mlength,
+ hdr->payload_length);
+ else
+ rc = lib_recv(nal, private, msg, md, offset, mlength,
+ hdr->payload_length);
+
if (rc != PTL_OK)
CERROR(LPU64": error on receiving PUT from "LPU64": %d\n",
ni->ni_pid.nid, hdr->src_nid, rc);
}
static ptl_err_t
-parse_get(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
+parse_get(lib_nal_t *nal, ptl_hdr_t *hdr, void *private,
+ lib_msg_t *msg, int loopback)
{
lib_ni_t *ni = &nal->libnal_ni;
ptl_size_t mlength = 0;
ni->ni_pid.nid, hdr->src_nid, rc);
/* Discard any junk after the hdr */
- (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
+ if (!loopback)
+ (void) lib_recv(nal, private, NULL, NULL, 0, 0,
+ hdr->payload_length);
return (rc);
}
static ptl_err_t
-parse_reply(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
+parse_reply(lib_nal_t *nal, ptl_hdr_t *hdr, void *private,
+ lib_msg_t *msg, int loopback)
{
lib_ni_t *ni = &nal->libnal_ni;
lib_md_t *md;
LIB_UNLOCK(nal, flags);
- rc = lib_recv(nal, private, msg, md, 0, length, rlength);
+ if (loopback)
+ rc = lib_lo_recv(nal, private, msg, md, 0, length, rlength);
+ else
+ rc = lib_recv(nal, private, msg, md, 0, length, rlength);
+
if (rc != PTL_OK)
CERROR(LPU64": error on receiving REPLY from "LPU64": %d\n",
ni->ni_pid.nid, hdr->src_nid, rc);
}
static ptl_err_t
-parse_ack(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
+parse_ack(lib_nal_t *nal, ptl_hdr_t *hdr, void *private,
+ lib_msg_t *msg, int loopback)
{
lib_ni_t *ni = &nal->libnal_ni;
lib_md_t *md;
lib_finalize(nal, private, msg, PTL_OK);
/* ...and now discard any junk after the hdr */
- (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
+ if (!loopback)
+ (void) lib_recv(nal, private, NULL, NULL, 0, 0,
+ hdr->payload_length);
return (PTL_OK);
}
ptl_err_t
lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, void *private)
{
+ return do_lib_parse(nal, hdr, private, 0);
+}
+
+ptl_err_t
+do_lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, int loopback)
+{
unsigned long flags;
ptl_err_t rc;
lib_msg_t *msg;
hdr->src_nid);
/* it's good but we don't want it */
- lib_drop_message(nal, private, hdr);
+ lib_drop_message(nal, private, hdr, loopback);
return PTL_OK;
}
": simulated failure\n",
nal->libnal_ni.ni_pid.nid, hdr_type_string (hdr),
hdr->src_nid);
- lib_drop_message(nal, private, hdr);
+ lib_drop_message(nal, private, hdr, loopback);
return PTL_OK;
}
": can't allocate a lib_msg_t\n",
nal->libnal_ni.ni_pid.nid, hdr_type_string (hdr),
hdr->src_nid);
- lib_drop_message(nal, private, hdr);
+ lib_drop_message(nal, private, hdr, loopback);
return PTL_OK;
}
switch (hdr->type) {
case PTL_MSG_ACK:
- rc = parse_ack(nal, hdr, private, msg);
+ rc = parse_ack(nal, hdr, private, msg, loopback);
break;
case PTL_MSG_PUT:
- rc = parse_put(nal, hdr, private, msg);
+ rc = parse_put(nal, hdr, private, msg, loopback);
break;
case PTL_MSG_GET:
- rc = parse_get(nal, hdr, private, msg);
+ rc = parse_get(nal, hdr, private, msg, loopback);
break;
case PTL_MSG_REPLY:
- rc = parse_reply(nal, hdr, private, msg);
+ rc = parse_reply(nal, hdr, private, msg, loopback);
break;
default:
LASSERT(0);
lib_msg_free(nal, msg); /* expects LIB_LOCK held */
LIB_UNLOCK(nal, flags);
- lib_drop_message(nal, private, hdr);
+ lib_drop_message(nal, private, hdr, loopback);
}
}