#define DEBUG_SUBSYSTEM S_PORTALS
-#if 0
-#ifndef __KERNEL__
-# include <stdio.h>
-#else
-# include <libcfs/kp30.h>
-#endif
-#endif
#include <portals/lib-p30.h>
+static int allow_destination_aliases = 0;
+CFS_MODULE_PARM(allow_destination_aliases, "i", int, 0644,
+ "Boolean: don't require strict destination NIDs");
+
/* forward ref */
static void ptl_commit_md (ptl_libmd_t *md, ptl_msg_t *msg);
-static ptl_err_t do_ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr,
- void *private, int loopback);
static ptl_libmd_t *
ptl_match_md(int index, int op_mask, ptl_process_id_t src,
LASSERT (0);
}
-ptl_err_t
-ptl_lo_rxkiov(ptl_ni_t *ni, void *private, ptl_msg_t *libmsg,
- unsigned int niov, ptl_kiov_t *kiov,
- size_t offset, size_t mlen, size_t rlen)
-{
- LASSERT (0);
-}
-
-ptl_err_t
-ptl_lo_txkiov (ptl_ni_t *ni, void *private, ptl_msg_t *libmsg,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int payload_niov, ptl_kiov_t *payload_kiov,
- size_t payload_offset, size_t payload_nob)
-{
- LASSERT (0);
-}
-
#else /* __KERNEL__ */
ptl_size_t
offset = 0;
}
}
-
-#ifndef __KERNEL__
-#if !defined(kmap)
-#define kmap(page) ((page)->addr)
-#endif
-#if !defined(kunmap)
-#define kunmap(page) do {} while(0)
-#endif
-#if !defined(page_address)
-#define page_address(page) ((page)->page_address)
-#endif
#endif
ptl_err_t
-ptl_lo_rxkiov(ptl_ni_t *ni,
- void *private,
- ptl_msg_t *libmsg,
- unsigned int niov,
- ptl_kiov_t *kiov,
- size_t offset,
- size_t mlen,
- size_t rlen)
-{
- void *srcaddr = NULL;
- void *dstaddr = NULL;
- unsigned long srcfrag = 0;
- unsigned long dstfrag = 0;
- unsigned long fraglen;
- lo_desc_t *lod = (lo_desc_t *)private;
-
- /* I only handle unmapped->unmapped matches */
- LASSERT(lod->lod_type == LOD_KIOV);
-
- if (mlen == 0)
- return PTL_OK;
-
- while (offset >= kiov->kiov_len) {
- offset -= kiov->kiov_len;
- kiov++;
- niov--;
- LASSERT(niov > 0);
- }
-
- while (lod->lod_offset >= lod->lod_iov.kiov->kiov_len) {
- lod->lod_offset -= lod->lod_iov.kiov->kiov_len;
- lod->lod_iov.kiov++;
- lod->lod_niov--;
- LASSERT(lod->lod_niov > 0);
- }
-
- do {
- /* CAVEAT EMPTOR:
- * I kmap 2 pages at once == slight risk of deadlock */
- LASSERT(niov > 0);
- if (dstaddr == NULL) {
- dstaddr = (void *)
- ((unsigned long)cfs_kmap(kiov->kiov_page) +
- kiov->kiov_offset + offset);
- dstfrag = kiov->kiov_len - offset;
- }
-
- LASSERT(lod->lod_niov > 0);
- if (srcaddr == NULL) {
- srcaddr = (void *)
- ((unsigned long)cfs_kmap(lod->lod_iov.kiov->kiov_page)+
- lod->lod_iov.kiov->kiov_offset + lod->lod_offset);
- srcfrag = lod->lod_iov.kiov->kiov_len - lod->lod_offset;
- }
-
- fraglen = MIN(srcfrag, dstfrag);
- if (fraglen > mlen)
- fraglen = mlen;
-
- memcpy(dstaddr, srcaddr, fraglen);
-
- if (fraglen < dstfrag) {
- dstfrag -= fraglen;
- dstaddr = (void *)((unsigned long)dstaddr + fraglen);
- } else {
- cfs_kunmap(kiov->kiov_page);
- dstaddr = NULL;
- offset = 0;
- kiov++;
- niov--;
- }
-
- if (fraglen < srcfrag) {
- srcfrag -= fraglen;
- srcaddr = (void *)((unsigned long)srcaddr + fraglen);
- } else {
- cfs_kunmap(lod->lod_iov.kiov->kiov_page);
- srcaddr = NULL;
- lod->lod_offset = 0;
- lod->lod_iov.kiov++;
- lod->lod_niov--;
- }
-
- mlen -= fraglen;
- } while (mlen > 0);
-
- if (dstaddr != NULL)
- cfs_kunmap(kiov->kiov_page);
-
- if (srcaddr != NULL)
- cfs_kunmap(lod->lod_iov.kiov->kiov_page);
-
- ptl_finalize(ni, private, libmsg, PTL_OK);
- return PTL_OK;
-}
-
-ptl_err_t
-ptl_lo_txkiov (ptl_ni_t *ni,
- void *private,
- ptl_msg_t *libmsg,
- ptl_hdr_t *hdr,
- unsigned int payload_niov,
- ptl_kiov_t *payload_kiov,
- size_t payload_offset,
- size_t payload_nob)
-{
- lo_desc_t lod = {
- .lod_type = LOD_KIOV,
- .lod_niov = payload_niov,
- .lod_offset = payload_offset,
- .lod_nob = payload_nob,
- .lod_iov = { .kiov = payload_kiov } };
- ptl_err_t rc;
-
- rc = do_ptl_parse(ni, hdr, &lod, 1);
- if (rc == PTL_OK)
- ptl_finalize(ni, private, libmsg, PTL_OK);
-
- return rc;
-}
-#endif
-
-ptl_err_t
-ptl_lo_rxiov(ptl_ni_t *ni,
- void *private,
- ptl_msg_t *libmsg,
- unsigned int niov,
- struct iovec *iov,
- size_t offset,
- size_t mlen,
- size_t rlen)
-{
- lo_desc_t *lod = (lo_desc_t *)private;
-
- /* I only handle mapped->mapped matches */
- LASSERT(lod->lod_type == LOD_IOV);
- LASSERT(mlen > 0);
-
- while (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- iov++;
- niov--;
- LASSERT(niov > 0);
- }
-
- while (lod->lod_offset >= lod->lod_iov.iov->iov_len) {
- lod->lod_offset -= lod->lod_iov.iov->iov_len;
- lod->lod_iov.iov++;
- lod->lod_niov--;
- LASSERT(lod->lod_niov > 0);
- }
-
- do {
- int fraglen = MIN(iov->iov_len - offset,
- lod->lod_iov.iov->iov_len - lod->lod_offset);
-
- LASSERT(niov > 0);
- LASSERT(lod->lod_niov > 0);
-
- if (fraglen > mlen)
- fraglen = mlen;
-
- memcpy((void *)((unsigned long)iov->iov_base + offset),
- (void *)((unsigned long)lod->lod_iov.iov->iov_base +
- lod->lod_offset),
- fraglen);
-
- if (offset + fraglen < iov->iov_len) {
- offset += fraglen;
- } else {
- offset = 0;
- iov++;
- niov--;
- }
-
- if (lod->lod_offset + fraglen < lod->lod_iov.iov->iov_len ) {
- lod->lod_offset += fraglen;
- } else {
- lod->lod_offset = 0;
- lod->lod_iov.iov++;
- lod->lod_niov--;
- }
-
- mlen -= fraglen;
- } while (mlen > 0);
-
- ptl_finalize(ni, private, libmsg, PTL_OK);
- return PTL_OK;
-}
-
-ptl_err_t
-ptl_lo_txiov (ptl_ni_t *ni,
- void *private,
- ptl_msg_t *libmsg,
- ptl_hdr_t *hdr,
- unsigned int payload_niov,
- struct iovec *payload_iov,
- size_t payload_offset,
- size_t payload_nob)
-{
- lo_desc_t lod = {
- .lod_type = LOD_IOV,
- .lod_niov = payload_niov,
- .lod_offset = payload_offset,
- .lod_nob = payload_nob,
- .lod_iov = { .iov = payload_iov } };
- ptl_err_t rc;
-
- rc = do_ptl_parse(ni, hdr, &lod, 1);
- if (rc == PTL_OK)
- ptl_finalize(ni, private, libmsg, PTL_OK);
-
- return rc;
-}
-
-ptl_err_t
-ptl_lo_recv (ptl_ni_t *ni, void *private, ptl_msg_t *msg, ptl_libmd_t *md,
- ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
-{
- if (mlen == 0) {
- ptl_finalize(ni, private, msg, PTL_OK);
- return PTL_OK;
- }
-
- if ((md->md_options & PTL_MD_KIOV) == 0)
- return ptl_lo_rxiov(ni, private, msg,
- md->md_niov, md->md_iov.iov,
- offset, mlen, rlen);
-
- return ptl_lo_rxkiov(ni, private, msg,
- md->md_niov, md->md_iov.kiov,
- offset, mlen, rlen);
-}
-
-ptl_err_t
ptl_recv (ptl_ni_t *ni, void *private, ptl_msg_t *msg, ptl_libmd_t *md,
ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
{
{
ptl_nid_t gw_nid;
int routing;
- int loopback;
ptl_err_t rc;
/* CAVEAT EMPTOR! ni != NULL == interface pre-determined (ACK) */
return PTL_FAIL;
}
- routing = (gw_nid != ni->ni_nid); /* gateway will forward */
- loopback = (target.nid == ni->ni_nid); /* it's for me! */
-
- if (routing && loopback) { /* very strange */
- CERROR("Inconsistent route table: target %s gw %s ni %s\n",
- libcfs_id2str(target), libcfs_nid2str(gw_nid),
- libcfs_nid2str(ni->ni_nid));
- rc = PTL_FAIL;
- goto out;
+ if (target.nid != ni->ni_nid) {
+ /* will gateway have to forward? */
+ routing = (gw_nid != ni->ni_nid);
+ } else {
+ /* it's for me! */
+ ptl_ni_addref(&ptl_loni);
+ ptl_ni_decref(ni);
+ ni = &ptl_loni;
+ routing = 0;
}
hdr->type = cpu_to_le32(type);
if (routing)
target.nid = gw_nid;
- if (len == 0) {
- if (loopback)
- rc = ptl_lo_txiov(ni, private, msg, hdr,
- 0, NULL, offset, len);
- else
- rc = (ni->ni_nal->nal_send)(ni, private, msg, hdr,
- type, target, routing,
- 0, NULL, offset, len);
- } else if ((md->md_options & PTL_MD_KIOV) == 0) {
- if (loopback)
- rc = ptl_lo_txiov(ni, private, msg, hdr,
- md->md_niov, md->md_iov.iov,
- offset, len);
- else
- rc = (ni->ni_nal->nal_send)
- (ni, private, msg, hdr,
- type, target, routing,
- md->md_niov, md->md_iov.iov,
- offset, len);
- } else {
- if (loopback)
- rc = ptl_lo_txkiov(ni, private, msg, hdr,
- md->md_niov, md->md_iov.kiov,
+ if (len == 0)
+ rc = (ni->ni_nal->nal_send)(ni, private, msg, hdr,
+ type, target, routing,
+ 0, NULL, offset, len);
+ else if ((md->md_options & PTL_MD_KIOV) == 0)
+ rc = (ni->ni_nal->nal_send)(ni, private, msg, hdr,
+ type, target, routing,
+ md->md_niov, md->md_iov.iov,
offset, len);
- else
- rc = (ni->ni_nal->nal_send_pages)
- (ni, private, msg, hdr,
- type, target, routing,
- md->md_niov, md->md_iov.kiov,
- offset, len);
- }
-
+ else
+ rc = (ni->ni_nal->nal_send_pages)(ni, private, msg, hdr,
+ type, target, routing,
+ md->md_niov, md->md_iov.kiov,
+ offset, len);
out:
ptl_ni_decref(ni); /* lose ref from kpr_lookup */
return rc;
}
static void
-ptl_drop_message (ptl_ni_t *ni, void *private, ptl_hdr_t *hdr, int loopback)
+ptl_drop_message (ptl_ni_t *ni, void *private, ptl_hdr_t *hdr)
{
unsigned long flags;
PTL_UNLOCK(flags);
/* NULL msg => if NAL calls ptl_finalize it will be a noop */
- if (!loopback)
- (void) ptl_recv(ni, private, NULL, NULL, 0, 0,
- hdr->payload_length);
+ (void) ptl_recv(ni, private, NULL, NULL, 0, 0,
+ hdr->payload_length);
}
/*
*
*/
static ptl_err_t
-ptl_parse_put(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
- ptl_msg_t *msg, int loopback)
+ptl_parse_put(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, ptl_msg_t *msg)
{
ptl_size_t mlength = 0;
ptl_size_t offset = 0;
PTL_UNLOCK(flags);
- if (loopback)
- rc = ptl_lo_recv(ni, private, msg, md, offset, mlength,
- hdr->payload_length);
- else
- rc = ptl_recv(ni, private, msg, md, offset, mlength,
- hdr->payload_length);
+ rc = ptl_recv(ni, private, msg, md, offset, mlength,
+ hdr->payload_length);
if (rc != PTL_OK)
CERROR("%s: error on receiving PUT from %s: %d\n",
}
static ptl_err_t
-ptl_parse_get(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
- ptl_msg_t *msg, int loopback)
+ptl_parse_get(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, ptl_msg_t *msg)
{
ptl_size_t mlength = 0;
ptl_size_t offset = 0;
libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rc);
/* Discard any junk after the hdr */
- if (!loopback)
- (void) ptl_recv(ni, private, NULL, NULL, 0, 0,
- hdr->payload_length);
-
+ (void) ptl_recv(ni, private, NULL, NULL, 0, 0,
+ hdr->payload_length);
return (rc);
}
static ptl_err_t
-ptl_parse_reply(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
- ptl_msg_t *msg, int loopback)
+ptl_parse_reply(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, ptl_msg_t *msg)
{
ptl_process_id_t src = {.nid = hdr->src_nid,
.pid = hdr->src_pid};
PTL_UNLOCK(flags);
- if (loopback)
- rc = ptl_lo_recv(ni, private, msg, md, 0, length, rlength);
- else
- rc = ptl_recv(ni, private, msg, md, 0, length, rlength);
-
+ rc = ptl_recv(ni, private, msg, md, 0, length, rlength);
if (rc != PTL_OK)
CERROR("%s: error on receiving REPLY from %s: %d\n",
libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rc);
}
static ptl_err_t
-ptl_parse_ack(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
- ptl_msg_t *msg, int loopback)
+ptl_parse_ack(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, ptl_msg_t *msg)
{
ptl_process_id_t src = {.nid = hdr->src_nid,
.pid = hdr->src_pid};
ptl_finalize(ni, private, msg, PTL_OK);
/* ...and now discard any junk after the hdr */
- if (!loopback)
- (void) ptl_recv(ni, private, NULL, NULL, 0, 0,
- hdr->payload_length);
+ (void) ptl_recv(ni, private, NULL, NULL, 0, 0, hdr->payload_length);
return (PTL_OK);
}
ptl_err_t
ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private)
{
- return do_ptl_parse(ni, hdr, private, 0);
-}
-
-ptl_err_t
-do_ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, int loopback)
-{
unsigned long flags;
ptl_err_t rc;
ptl_msg_t *msg;
+ ptl_nid_t dest_nid;
+ __u32 type = le32_to_cpu(hdr->type);
/* NB we return PTL_OK if we manage to parse the header and believe
* it looks OK. Anything that goes wrong with receiving the
- * message after that point is the responsibility of the NAL */
-
- /* convert common fields to host byte order */
- hdr->type = le32_to_cpu(hdr->type);
- hdr->src_nid = le64_to_cpu(hdr->src_nid);
- hdr->src_pid = le32_to_cpu(hdr->src_pid);
- hdr->dest_pid = le32_to_cpu(hdr->dest_pid);
- hdr->payload_length = le32_to_cpu(hdr->payload_length);
+ * message after that point is the responsibility of the NAL.
+ * If we don't think the packet is for us, return PTL_IFACE_DUP */
- switch (hdr->type) {
- case PTL_MSG_HELLO: {
+ if (type == PTL_MSG_HELLO) {
/* dest_nid is really ptl_magicversion_t */
ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
libcfs_nid2str(hdr->src_nid));
/* it's good but we don't want it */
- ptl_drop_message(ni, private, hdr, loopback);
+ ptl_drop_message(ni, private, hdr);
return PTL_OK;
}
return PTL_FAIL;
}
+ dest_nid = le64_to_cpu(hdr->dest_nid);
+ if (dest_nid != ni->ni_nid) {
+
+ if (!ptl_islocalnid(dest_nid))
+ return PTL_IFACE_DUP;
+
+ /* dest_nid is one of my NIs */
+
+ if (!allow_destination_aliases) {
+ /* dest is another local NI; sender should have used
+ * this node's NID on its own network */
+ CERROR ("%s: Dropping message from %s: nid %s is a local alias\n",
+ libcfs_nid2str(ni->ni_nid),
+ libcfs_nid2str(le64_to_cpu(hdr->src_nid)),
+ libcfs_nid2str(dest_nid));
+ return PTL_FAIL;
+ }
+ }
+
+ /* convert common fields to host byte order */
+ hdr->type = type;
+ hdr->src_nid = le64_to_cpu(hdr->src_nid);
+ hdr->src_pid = le32_to_cpu(hdr->src_pid);
+ hdr->dest_nid = dest_nid;
+ hdr->dest_pid = le32_to_cpu(hdr->dest_pid);
+ hdr->payload_length = le32_to_cpu(hdr->payload_length);
+
+ switch (type) {
case PTL_MSG_ACK:
case PTL_MSG_PUT:
case PTL_MSG_GET:
case PTL_MSG_REPLY:
- hdr->dest_nid = le64_to_cpu(hdr->dest_nid);
- if (hdr->dest_nid != ni->ni_nid) {
- CERROR("%s: BAD dest NID in %s message from %s to %s"
- "(not me)\n", libcfs_nid2str(ni->ni_nid),
- hdr_type_string(hdr),
- libcfs_nid2str(hdr->src_nid),
- libcfs_nid2str(hdr->dest_nid));
- return PTL_FAIL;
- }
break;
default:
CERROR("%s: Bad message type 0x%x from %s\n",
libcfs_nid2str(ni->ni_nid), hdr->type,
libcfs_nid2str(hdr->src_nid));
+
return PTL_FAIL;
}
CERROR("%s: Dropping incoming %s from %s: simulated failure\n",
libcfs_nid2str(ni->ni_nid), hdr_type_string (hdr),
libcfs_nid2str(hdr->src_nid));
- ptl_drop_message(ni, private, hdr, loopback);
+ ptl_drop_message(ni, private, hdr);
return PTL_OK;
}
"can't allocate a ptl_msg_t\n",
libcfs_nid2str(ni->ni_nid), hdr_type_string (hdr),
libcfs_nid2str(hdr->src_nid));
- ptl_drop_message(ni, private, hdr, loopback);
+ ptl_drop_message(ni, private, hdr);
return PTL_OK;
}
switch (hdr->type) {
case PTL_MSG_ACK:
- rc = ptl_parse_ack(ni, hdr, private, msg, loopback);
+ rc = ptl_parse_ack(ni, hdr, private, msg);
break;
case PTL_MSG_PUT:
- rc = ptl_parse_put(ni, hdr, private, msg, loopback);
+ rc = ptl_parse_put(ni, hdr, private, msg);
break;
case PTL_MSG_GET:
- rc = ptl_parse_get(ni, hdr, private, msg, loopback);
+ rc = ptl_parse_get(ni, hdr, private, msg);
break;
case PTL_MSG_REPLY:
- rc = ptl_parse_reply(ni, hdr, private, msg, loopback);
+ rc = ptl_parse_reply(ni, hdr, private, msg);
break;
default:
LASSERT(0);
ptl_msg_free(msg); /* expects PTL_LOCK held */
PTL_UNLOCK(flags);
- ptl_drop_message(ni, private, hdr, loopback);
+ ptl_drop_message(ni, private, hdr);
}
}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2004 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <portals/lib-p30.h>
+
+ptl_err_t
+lonal_send (ptl_ni_t *ni,
+ void *private,
+ ptl_msg_t *ptlmsg,
+ ptl_hdr_t *hdr,
+ int type,
+ ptl_process_id_t target,
+ int routing,
+ unsigned int payload_niov,
+ struct iovec *payload_iov,
+ size_t payload_offset,
+ size_t payload_nob)
+{
+ lo_desc_t lod = {
+ .lod_type = LOD_IOV,
+ .lod_niov = payload_niov,
+ .lod_offset = payload_offset,
+ .lod_nob = payload_nob,
+ .lod_iov = { .iov = payload_iov } };
+ ptl_err_t rc;
+
+ LASSERT (!routing);
+
+ rc = ptl_parse(ni, hdr, &lod);
+ if (rc == PTL_OK)
+ ptl_finalize(ni, private, ptlmsg, PTL_OK);
+
+ return rc;
+}
+
+ptl_err_t
+lonal_recv(ptl_ni_t *ni,
+ void *private,
+ ptl_msg_t *ptlmsg,
+ unsigned int niov,
+ struct iovec *iov,
+ size_t offset,
+ size_t mlen,
+ size_t rlen)
+{
+ lo_desc_t *lod = (lo_desc_t *)private;
+
+ /* I only handle mapped->mapped matches */
+ LASSERT(mlen == 0 || lod->lod_type == LOD_IOV);
+
+ if (mlen == 0)
+ goto out;
+
+ while (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ iov++;
+ niov--;
+ LASSERT(niov > 0);
+ }
+
+ while (lod->lod_offset >= lod->lod_iov.iov->iov_len) {
+ lod->lod_offset -= lod->lod_iov.iov->iov_len;
+ lod->lod_iov.iov++;
+ lod->lod_niov--;
+ LASSERT(lod->lod_niov > 0);
+ }
+
+ do {
+ int fraglen = MIN(iov->iov_len - offset,
+ lod->lod_iov.iov->iov_len - lod->lod_offset);
+
+ LASSERT(niov > 0);
+ LASSERT(lod->lod_niov > 0);
+
+ if (fraglen > mlen)
+ fraglen = mlen;
+
+ memcpy((void *)((unsigned long)iov->iov_base + offset),
+ (void *)((unsigned long)lod->lod_iov.iov->iov_base +
+ lod->lod_offset),
+ fraglen);
+
+ if (offset + fraglen < iov->iov_len) {
+ offset += fraglen;
+ } else {
+ offset = 0;
+ iov++;
+ niov--;
+ }
+
+ if (lod->lod_offset + fraglen < lod->lod_iov.iov->iov_len ) {
+ lod->lod_offset += fraglen;
+ } else {
+ lod->lod_offset = 0;
+ lod->lod_iov.iov++;
+ lod->lod_niov--;
+ }
+
+ mlen -= fraglen;
+ } while (mlen > 0);
+
+ out:
+ ptl_finalize(ni, private, ptlmsg, PTL_OK);
+ return PTL_OK;
+}
+
+#ifdef __KERNEL__
+ptl_err_t
+lonal_send_pages (ptl_ni_t *ni,
+ void *private,
+ ptl_msg_t *ptlmsg,
+ ptl_hdr_t *hdr,
+ int type,
+ ptl_process_id_t target,
+ int routing,
+ unsigned int payload_niov,
+ ptl_kiov_t *payload_kiov,
+ size_t payload_offset,
+ size_t payload_nob)
+{
+ lo_desc_t lod = {
+ .lod_type = LOD_KIOV,
+ .lod_niov = payload_niov,
+ .lod_offset = payload_offset,
+ .lod_nob = payload_nob,
+ .lod_iov = { .kiov = payload_kiov } };
+ ptl_err_t rc;
+
+ LASSERT (!routing);
+
+ rc = ptl_parse(ni, hdr, &lod);
+ if (rc == PTL_OK)
+ ptl_finalize(ni, private, ptlmsg, PTL_OK);
+
+ return rc;
+}
+
+ptl_err_t
+lonal_recv_pages(ptl_ni_t *ni,
+ void *private,
+ ptl_msg_t *ptlmsg,
+ unsigned int niov,
+ ptl_kiov_t *kiov,
+ size_t offset,
+ size_t mlen,
+ size_t rlen)
+{
+ void *srcaddr = NULL;
+ void *dstaddr = NULL;
+ unsigned long srcfrag = 0;
+ unsigned long dstfrag = 0;
+ unsigned long fraglen;
+ lo_desc_t *lod = (lo_desc_t *)private;
+
+ /* I only handle unmapped->unmapped matches */
+ LASSERT(mlen == 0 || lod->lod_type == LOD_KIOV);
+
+ if (mlen == 0)
+ goto out;
+
+ while (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ kiov++;
+ niov--;
+ LASSERT(niov > 0);
+ }
+
+ while (lod->lod_offset >= lod->lod_iov.kiov->kiov_len) {
+ lod->lod_offset -= lod->lod_iov.kiov->kiov_len;
+ lod->lod_iov.kiov++;
+ lod->lod_niov--;
+ LASSERT(lod->lod_niov > 0);
+ }
+
+ do {
+ /* CAVEAT EMPTOR: I kmap 2 pages at once == slight risk of deadlock */
+ LASSERT(niov > 0);
+ if (dstaddr == NULL) {
+ dstaddr = (void *)((unsigned long)kmap(kiov->kiov_page) +
+ kiov->kiov_offset + offset);
+ dstfrag = kiov->kiov_len - offset;
+ }
+
+ LASSERT(lod->lod_niov > 0);
+ if (srcaddr == NULL) {
+ srcaddr = (void *)((unsigned long)kmap(lod->lod_iov.kiov->kiov_page) +
+ lod->lod_iov.kiov->kiov_offset + lod->lod_offset);
+ srcfrag = lod->lod_iov.kiov->kiov_len - lod->lod_offset;
+ }
+
+ fraglen = MIN(srcfrag, dstfrag);
+ if (fraglen > mlen)
+ fraglen = mlen;
+
+ memcpy(dstaddr, srcaddr, fraglen);
+
+ if (fraglen < dstfrag) {
+ dstfrag -= fraglen;
+ dstaddr = (void *)((unsigned long)dstaddr + fraglen);
+ } else {
+ kunmap(kiov->kiov_page);
+ dstaddr = NULL;
+ offset = 0;
+ kiov++;
+ niov--;
+ }
+
+ if (fraglen < srcfrag) {
+ srcfrag -= fraglen;
+ srcaddr = (void *)((unsigned long)srcaddr + fraglen);
+ } else {
+ kunmap(lod->lod_iov.kiov->kiov_page);
+ srcaddr = NULL;
+ lod->lod_offset = 0;
+ lod->lod_iov.kiov++;
+ lod->lod_niov--;
+ }
+
+ mlen -= fraglen;
+ } while (mlen > 0);
+
+ if (dstaddr != NULL)
+ kunmap(kiov->kiov_page);
+
+ if (srcaddr != NULL)
+ kunmap(lod->lod_iov.kiov->kiov_page);
+
+ out:
+ ptl_finalize(ni, private, ptlmsg, PTL_OK);
+ return PTL_OK;
+}
+#endif
+
+static int lonal_instanced;
+
+void
+lonal_shutdown(ptl_ni_t *ni)
+{
+ CDEBUG (D_NET, "shutdown\n");
+ LASSERT (ni == &ptl_loni);
+ LASSERT (lonal_instanced);
+
+ lonal_instanced = 0;
+}
+
+ptl_err_t
+lonal_startup (ptl_ni_t *ni)
+{
+ LASSERT (ni->ni_nal == &ptl_lonal);
+ LASSERT (ni == &ptl_loni);
+ LASSERT (!lonal_instanced);
+ lonal_instanced = 1;
+
+ return (PTL_OK);
+}
+
+ptl_nal_t ptl_lonal = {
+ .nal_type = LONAL,
+ .nal_startup = lonal_startup,
+ .nal_shutdown = lonal_shutdown,
+ .nal_send = lonal_send,
+ .nal_recv = lonal_recv,
+#ifdef __KERNEL__
+ .nal_send_pages = lonal_send_pages,
+ .nal_recv_pages = lonal_recv_pages,
+#endif
+};
+
+ptl_ni_t ptl_loni;