Whamcloud - gitweb
* lonal part of portals
authoreeb <eeb>
Wed, 11 May 2005 16:14:34 +0000 (16:14 +0000)
committereeb <eeb>
Wed, 11 May 2005 16:14:34 +0000 (16:14 +0000)
lnet/autoconf/lustre-lnet.m4
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-p30.h
lnet/klnds/qswlnd/qswlnd_cb.c
lnet/klnds/socklnd/socklnd_cb.c
lnet/lnet/Makefile.in
lnet/lnet/api-ni.c
lnet/lnet/autoMakefile.am
lnet/lnet/config.c
lnet/lnet/lib-move.c
lnet/lnet/lo.c [new file with mode: 0644]

index c85d8c9..87d08db 100644 (file)
@@ -747,8 +747,6 @@ portals/knals/iibnal/Makefile
 portals/knals/iibnal/autoMakefile
 portals/knals/vibnal/Makefile
 portals/knals/vibnal/autoMakefile
-portals/knals/lonal/Makefile
-portals/knals/lonal/autoMakefile
 portals/knals/qswnal/Makefile
 portals/knals/qswnal/autoMakefile
 portals/knals/ranal/Makefile
index 4390861..f81e488 100644 (file)
@@ -411,9 +411,13 @@ ptl_ni_decref(ptl_ni_t *ni)
                 ptl_queue_zombie_ni(ni);
 }
 
+extern ptl_nal_t ptl_lonal;
+extern ptl_ni_t  ptl_loni;
+
 extern ptl_err_t ptl_get_apinih (ptl_handle_ni_t *nih);
 
 extern ptl_ni_t *ptl_net2ni (__u32 net);
+extern int ptl_islocalnid (ptl_nid_t nid);
 extern void ptl_enq_event_locked (void *private,
                                   ptl_eq_t *eq, ptl_event_t *ev);
 extern void ptl_finalize (ptl_ni_t *ni, void *private, ptl_msg_t *msg, 
index 4390861..f81e488 100644 (file)
@@ -411,9 +411,13 @@ ptl_ni_decref(ptl_ni_t *ni)
                 ptl_queue_zombie_ni(ni);
 }
 
+extern ptl_nal_t ptl_lonal;
+extern ptl_ni_t  ptl_loni;
+
 extern ptl_err_t ptl_get_apinih (ptl_handle_ni_t *nih);
 
 extern ptl_ni_t *ptl_net2ni (__u32 net);
+extern int ptl_islocalnid (ptl_nid_t nid);
 extern void ptl_enq_event_locked (void *private,
                                   ptl_eq_t *eq, ptl_event_t *ev);
 extern void ptl_finalize (ptl_ni_t *ni, void *private, ptl_msg_t *msg, 
index 2042d39..9a71e80 100644 (file)
@@ -1492,23 +1492,28 @@ void
 kqswnal_parse (kqswnal_rx_t *krx)
 {
         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
-        ptl_nid_t       dest_nid = le64_to_cpu(hdr->dest_nid);
+        ptl_nid_t       dest_nid;
         int             payload_nob;
         int             nob;
         int             niov;
+        int             rc;
 
         LASSERT (atomic_read(&krx->krx_refcount) == 1);
 
-        if (ptl_islocalnid(dest_nid)) {         /* It's for me :) */
-                /* I ignore parse errors since I'm not consuming a byte
+        rc = ptl_parse (kqswnal_data.kqn_ni, hdr, krx);
+        
+        if (rc != PTL_IFACE_DUP) {
+                /* It's for me or there's been some error.
+                 * However I ignore parse errors since I'm not consuming a byte
                  * stream */
-                (void)ptl_parse (kqswnal_data.kqn_ni, hdr, krx);
 
                 /* Drop my ref; any RDMA activity takes an additional ref */
                 kqswnal_rx_decref(krx);
                 return;
         }
 
+        dest_nid = le64_to_cpu(hdr->dest_nid);
+
 #if KQSW_CHECKSUM
         LASSERTF (0, "checksums for forwarded packets not implemented\n");
 #endif
index 760f2e9..1172e72 100644 (file)
@@ -1312,9 +1312,10 @@ ksocknal_process_receive (ksock_conn_t *conn)
         
         switch (conn->ksnc_rx_state) {
         case SOCKNAL_RX_HEADER:
-                if (conn->ksnc_hdr.type != cpu_to_le32(PTL_MSG_HELLO) &&
-                    !ptl_islocalnid(le64_to_cpu(conn->ksnc_hdr.dest_nid))) {
-                        /* This packet isn't for me */
+                rc = ptl_parse(conn->ksnc_peer->ksnp_ni, &conn->ksnc_hdr, conn);
+
+                if (rc == PTL_IFACE_DUP) {
+                        /* This packet isn't for me (still in net byte order) */
                         ksocknal_fwd_parse (conn);
                         switch (conn->ksnc_rx_state) {
                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
@@ -1329,9 +1330,6 @@ ksocknal_process_receive (ksock_conn_t *conn)
                         /* Not Reached */
                 }
 
-                /* sets wanted_len, iovs etc */
-                rc = ptl_parse(conn->ksnc_peer->ksnp_ni, &conn->ksnc_hdr, conn);
-
                 if (rc != PTL_OK) {
                         /* I just received garbage: give up on this conn */
                         ksocknal_close_conn_and_siblings (conn, rc);
index e298027..851e6c9 100644 (file)
@@ -5,7 +5,7 @@ router_objs += $(ROUTER)r_proc.o
 
 portals-objs := api-errno.o api-ni.o config.o
 portals-objs += lib-me.o lib-msg.o lib-eq.o lib-md.o
-portals-objs += lib-move.o module.o
+portals-objs += lib-move.o module.o lo.o
 portals-objs += $(router_objs)
 
 default: all
index bc4ef60..79b47cc 100644 (file)
@@ -139,7 +139,6 @@ ptl_find_nal_by_type (int type)
         return NULL;
 }
 
-
 void
 ptl_register_nal (ptl_nal_t *nal)
 {
@@ -553,6 +552,25 @@ ptl_net2ni (__u32 net)
         return NULL;
 }
 
+int
+ptl_islocalnid (ptl_nid_t nid)
+{
+        struct list_head *tmp;
+        ptl_ni_t         *ni;
+        unsigned long     flags;
+
+        PTL_LOCK(flags);
+        list_for_each (tmp, &ptl_apini.apini_nis) {
+                ni = list_entry(tmp, ptl_ni_t, ni_list);
+
+                if (ni->ni_nid == nid)
+                        return 1;
+        }
+        
+        PTL_UNLOCK(flags);
+        return 0;
+}
+
 void
 ptl_queue_zombie_ni (ptl_ni_t *ni)
 {
@@ -654,7 +672,7 @@ ptl_startup_nalnis (void)
         rc = ptl_parse_networks(&nilist, networks);
         if (rc != PTL_OK) 
                 goto failed;
-        
+
         while (!list_empty(&nilist)) {
                 ni = list_entry(nilist.next, ptl_ni_t, ni_list);
                 nal_type = PTL_NETNAL(PTL_NIDNET(ni->ni_nid));
@@ -743,10 +761,14 @@ PtlInit(int *max_interfaces)
         pthread_cond_init(&ptl_apini.apini_cond, NULL);
         pthread_mutex_init(&ptl_apini.apini_nal_mutex);
         pthread_mutex_init(&ptl_apini.apini_api_mutex);
+#endif
 
-        /* Kernel NALs register themselves when their module loads, and
-         * unregister themselves when their module is unloaded.  Userspace NALs
-         * are plugged in explicitly here... */
+        /* NALs in separate modules register themselves when their module
+         * loads, and unregister themselves when their module is unloaded.
+         * Otherwise they are plugged in explicitly here... */
+
+        ptl_register_nal (&ptl_lonal);
+#ifndef __KERNEL__
         ptl_register_nal (&tcpnal_nal);
 #endif
         ptl_apini.apini_init = 1;
index a82dfe3..224b868 100644 (file)
@@ -1,6 +1,6 @@
 my_sources =    api-errno.c api-ni.c config.c \
                lib-me.c lib-msg.c lib-eq.c \
-               lib-md.c lib-move. \
+               lib-md.c lib-move.c lo.c \
                $(top_srcdir)/portals/router/router.c
 
 if !CRAY_PORTALS
@@ -23,7 +23,7 @@ macos_PROGRAMS := portals
 
 portals_SOURCES := api-errno.c api-ni.c config.c
 portals_SOURCES += lib-me.c lib-msg.c lib-eq.c lib-md.c
-portals_SOURCES += lib-move.c module.c
+portals_SOURCES += lib-move.c module.c lo.c
 
 portals_CFLAGS := $(EXTRA_KCFLAGS)
 portals_LDFLAGS := $(EXTRA_KLDFLAGS)
index 441b452..1f8d105 100644 (file)
@@ -131,7 +131,10 @@ ptl_nis_conflict(ptl_ni_t *ni1, ptl_ni_t *ni2)
                 }
                 return 0;
         }
-        
+#if 0
+        /* leave this commented out so the same interface can be included explicitly in 2
+         * networks. */
+
         for (i = 0; i < PTL_MAX_INTERFACES; i++) {
                 if (ni1->ni_interfaces[i] == NULL)
                         break;
@@ -155,7 +158,7 @@ ptl_nis_conflict(ptl_ni_t *ni1, ptl_ni_t *ni2)
                         return 1;
                 }
         }
-        
+#endif   
         return 0;
 }
 
@@ -165,8 +168,7 @@ ptl_check_ni_conflicts(ptl_ni_t *ni, struct list_head *nilist)
         struct list_head *tmp;
         ptl_ni_t         *ni2;
 
-        /* Yes! ni just added to this list.  
-         * Check its network is unique and its interfaces don't conflict */
+        /* Yes! ni _has_ just been added to this list. */
         LASSERT (ni == list_entry(nilist->prev, ptl_ni_t, ni_list));
         
         list_for_each (tmp, nilist) {
@@ -206,6 +208,12 @@ ptl_parse_networks(struct list_head *nilist, char *networks)
         memcpy (tokens, networks, tokensize);
        str = tokens;
 
+        /* Add in the loopback network */
+        /* zero counters/flags, NULL pointers... */
+        memset(&ptl_loni, 0, sizeof(ptl_loni));
+        ptl_loni.ni_nid = PTL_MKNID(PTL_MKNET(LONAL, 0), 0);
+        list_add_tail(&ptl_loni.ni_list, nilist);
+        
         while (str != NULL && *str != 0) {
                 char      *comma = strchr(str, ',');
                 char      *bracket = strchr(str, '(');
index 24b26ff..408bded 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_PORTALS
 
-#if 0
-#ifndef __KERNEL__
-# include <stdio.h>
-#else
-# include <libcfs/kp30.h>
-#endif
-#endif
 #include <portals/lib-p30.h>
 
+static int allow_destination_aliases = 0;
+CFS_MODULE_PARM(allow_destination_aliases, "i", int, 0644,
+                "Boolean: don't require strict destination NIDs");
+
 /* forward ref */
 static void ptl_commit_md (ptl_libmd_t *md, ptl_msg_t *msg);
-static ptl_err_t do_ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr, 
-                              void *private, int loopback);
 
 static ptl_libmd_t *
 ptl_match_md(int index, int op_mask, ptl_process_id_t src,
@@ -419,23 +414,6 @@ ptl_extract_kiov (int dst_niov, ptl_kiov_t *dst,
         LASSERT (0);
 }
 
-ptl_err_t
-ptl_lo_rxkiov(ptl_ni_t *ni, void *private, ptl_msg_t *libmsg,
-              unsigned int niov, ptl_kiov_t *kiov,
-              size_t offset, size_t mlen, size_t rlen)
-{
-        LASSERT (0);
-}
-
-ptl_err_t
-ptl_lo_txkiov (ptl_ni_t *ni, void *private, ptl_msg_t *libmsg,
-               ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-               unsigned int payload_niov, ptl_kiov_t *payload_kiov,
-               size_t payload_offset, size_t payload_nob)
-{
-        LASSERT (0);
-}
-
 #else /* __KERNEL__ */
 
 ptl_size_t
@@ -571,255 +549,9 @@ ptl_extract_kiov (int dst_niov, ptl_kiov_t *dst,
                 offset = 0;
         }
 }
-
-#ifndef __KERNEL__
-#if !defined(kmap)
-#define kmap(page) ((page)->addr)
-#endif
-#if !defined(kunmap)
-#define kunmap(page) do {} while(0)
-#endif
-#if !defined(page_address)
-#define page_address(page) ((page)->page_address)
-#endif
 #endif
 
 ptl_err_t
-ptl_lo_rxkiov(ptl_ni_t     *ni,
-              void         *private,
-              ptl_msg_t    *libmsg,
-              unsigned int  niov,
-              ptl_kiov_t   *kiov,
-              size_t        offset,
-              size_t        mlen,
-              size_t        rlen)
-{
-        void          *srcaddr = NULL;
-        void          *dstaddr = NULL;
-        unsigned long  srcfrag = 0;
-        unsigned long  dstfrag = 0;
-        unsigned long  fraglen;
-        lo_desc_t     *lod = (lo_desc_t *)private;
-
-        /* I only handle unmapped->unmapped matches */
-        LASSERT(lod->lod_type == LOD_KIOV);
-
-        if (mlen == 0)
-                return PTL_OK;
-
-        while (offset >= kiov->kiov_len) {
-                offset -= kiov->kiov_len;
-                kiov++;
-                niov--;
-                LASSERT(niov > 0);
-        }
-
-        while (lod->lod_offset >= lod->lod_iov.kiov->kiov_len) {
-                lod->lod_offset -= lod->lod_iov.kiov->kiov_len;
-                lod->lod_iov.kiov++;
-                lod->lod_niov--;
-                LASSERT(lod->lod_niov > 0);
-        }
-
-        do {
-                /* CAVEAT EMPTOR:
-                 * I kmap 2 pages at once == slight risk of deadlock */
-                LASSERT(niov > 0);
-                if (dstaddr == NULL) {
-                        dstaddr = (void *)
-                                ((unsigned long)cfs_kmap(kiov->kiov_page) +
-                                 kiov->kiov_offset + offset);
-                        dstfrag = kiov->kiov_len -  offset;
-                }
-
-                LASSERT(lod->lod_niov > 0);
-                if (srcaddr == NULL) {
-                        srcaddr = (void *)
-                         ((unsigned long)cfs_kmap(lod->lod_iov.kiov->kiov_page)+
-                          lod->lod_iov.kiov->kiov_offset + lod->lod_offset);
-                        srcfrag = lod->lod_iov.kiov->kiov_len - lod->lod_offset;
-                }
-
-                fraglen = MIN(srcfrag, dstfrag);
-                if (fraglen > mlen)
-                        fraglen = mlen;
-
-                memcpy(dstaddr, srcaddr, fraglen);
-
-                if (fraglen < dstfrag) {
-                        dstfrag -= fraglen;
-                        dstaddr = (void *)((unsigned long)dstaddr + fraglen);
-                } else {
-                        cfs_kunmap(kiov->kiov_page);
-                        dstaddr = NULL;
-                        offset = 0;
-                        kiov++;
-                        niov--;
-                }
-
-                if (fraglen < srcfrag) {
-                        srcfrag -= fraglen;
-                        srcaddr = (void *)((unsigned long)srcaddr + fraglen);
-                } else {
-                        cfs_kunmap(lod->lod_iov.kiov->kiov_page);
-                        srcaddr = NULL;
-                        lod->lod_offset = 0;
-                        lod->lod_iov.kiov++;
-                        lod->lod_niov--;
-                }
-
-                mlen -= fraglen;
-        } while (mlen > 0);
-
-        if (dstaddr != NULL)
-                cfs_kunmap(kiov->kiov_page);
-
-        if (srcaddr != NULL)
-                cfs_kunmap(lod->lod_iov.kiov->kiov_page);
-
-        ptl_finalize(ni, private, libmsg, PTL_OK);
-        return PTL_OK;
-}
-
-ptl_err_t
-ptl_lo_txkiov (ptl_ni_t     *ni,
-               void         *private,
-               ptl_msg_t    *libmsg,
-               ptl_hdr_t    *hdr,
-               unsigned int  payload_niov,
-               ptl_kiov_t   *payload_kiov,
-               size_t        payload_offset,
-               size_t        payload_nob)
-{
-        lo_desc_t lod = {
-                .lod_type     = LOD_KIOV,
-                .lod_niov     = payload_niov,
-                .lod_offset   = payload_offset,
-                .lod_nob      = payload_nob,
-                .lod_iov      = { .kiov = payload_kiov } };
-        ptl_err_t   rc;
-
-        rc = do_ptl_parse(ni, hdr, &lod, 1);
-        if (rc == PTL_OK)
-                ptl_finalize(ni, private, libmsg, PTL_OK);
-
-        return rc;
-}
-#endif
-
-ptl_err_t
-ptl_lo_rxiov(ptl_ni_t     *ni,
-             void         *private,
-             ptl_msg_t    *libmsg,
-             unsigned int  niov,
-             struct iovec *iov,
-             size_t        offset,
-             size_t        mlen,
-             size_t        rlen)
-{
-        lo_desc_t *lod = (lo_desc_t *)private;
-
-        /* I only handle mapped->mapped matches */
-        LASSERT(lod->lod_type == LOD_IOV);
-        LASSERT(mlen > 0);
-
-        while (offset >= iov->iov_len) {
-                offset -= iov->iov_len;
-                iov++;
-                niov--;
-                LASSERT(niov > 0);
-        }
-
-        while (lod->lod_offset >= lod->lod_iov.iov->iov_len) {
-                lod->lod_offset -= lod->lod_iov.iov->iov_len;
-                lod->lod_iov.iov++;
-                lod->lod_niov--;
-                LASSERT(lod->lod_niov > 0);
-        }
-
-        do {
-                int fraglen = MIN(iov->iov_len - offset,
-                                  lod->lod_iov.iov->iov_len - lod->lod_offset);
-
-                LASSERT(niov > 0);
-                LASSERT(lod->lod_niov > 0);
-
-                if (fraglen > mlen)
-                        fraglen = mlen;
-
-                memcpy((void *)((unsigned long)iov->iov_base + offset),
-                       (void *)((unsigned long)lod->lod_iov.iov->iov_base +
-                                lod->lod_offset),
-                       fraglen);
-
-                if (offset + fraglen < iov->iov_len) {
-                        offset += fraglen;
-                } else {
-                        offset = 0;
-                        iov++;
-                        niov--;
-                }
-
-                if (lod->lod_offset + fraglen < lod->lod_iov.iov->iov_len ) {
-                        lod->lod_offset += fraglen;
-                } else {
-                        lod->lod_offset = 0;
-                        lod->lod_iov.iov++;
-                        lod->lod_niov--;
-                }
-
-                mlen -= fraglen;
-        } while (mlen > 0);
-
-        ptl_finalize(ni, private, libmsg, PTL_OK);
-        return PTL_OK;
-}
-
-ptl_err_t
-ptl_lo_txiov (ptl_ni_t     *ni,
-              void         *private,
-              ptl_msg_t    *libmsg,
-              ptl_hdr_t    *hdr,
-              unsigned int  payload_niov,
-              struct iovec *payload_iov,
-              size_t        payload_offset,
-              size_t        payload_nob)
-{
-        lo_desc_t lod = {
-                .lod_type    = LOD_IOV,
-                .lod_niov    = payload_niov,
-                .lod_offset  = payload_offset,
-                .lod_nob     = payload_nob,
-                .lod_iov     = { .iov = payload_iov } };
-        ptl_err_t rc;
-
-        rc = do_ptl_parse(ni, hdr, &lod, 1);
-        if (rc == PTL_OK)
-                ptl_finalize(ni, private, libmsg, PTL_OK);
-
-        return rc;
-}
-
-ptl_err_t
-ptl_lo_recv (ptl_ni_t *ni, void *private, ptl_msg_t *msg, ptl_libmd_t *md,
-             ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
-{
-        if (mlen == 0) {
-                ptl_finalize(ni, private, msg, PTL_OK);
-                return PTL_OK;
-        }
-
-        if ((md->md_options & PTL_MD_KIOV) == 0)
-                return ptl_lo_rxiov(ni, private, msg,
-                                    md->md_niov, md->md_iov.iov,
-                                    offset, mlen, rlen);
-
-        return ptl_lo_rxkiov(ni, private, msg,
-                             md->md_niov, md->md_iov.kiov,
-                             offset, mlen, rlen);
-}
-
-ptl_err_t
 ptl_recv (ptl_ni_t *ni, void *private, ptl_msg_t *msg, ptl_libmd_t *md,
           ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
 {
@@ -845,7 +577,6 @@ ptl_send (ptl_ni_t *ni, void *private, ptl_msg_t *msg,
 {
         ptl_nid_t gw_nid;
         int       routing;
-        int       loopback;
         ptl_err_t rc;
 
         /* CAVEAT EMPTOR! ni != NULL == interface pre-determined (ACK) */
@@ -856,15 +587,15 @@ ptl_send (ptl_ni_t *ni, void *private, ptl_msg_t *msg,
                 return PTL_FAIL;
         }
 
-        routing = (gw_nid != ni->ni_nid);       /* gateway will forward */
-        loopback = (target.nid == ni->ni_nid);  /* it's for me! */
-
-        if (routing && loopback) {              /* very strange */
-                CERROR("Inconsistent route table: target %s gw %s ni %s\n",
-                       libcfs_id2str(target), libcfs_nid2str(gw_nid), 
-                       libcfs_nid2str(ni->ni_nid));
-                rc = PTL_FAIL;
-                goto out;
+        if (target.nid != ni->ni_nid) {
+                /* will gateway have to forward? */
+                routing = (gw_nid != ni->ni_nid);
+        } else {
+                /* it's for me! */
+                ptl_ni_addref(&ptl_loni);
+                ptl_ni_decref(ni);
+                ni = &ptl_loni;
+                routing = 0;
         }
 
         hdr->type           = cpu_to_le32(type);
@@ -881,38 +612,20 @@ ptl_send (ptl_ni_t *ni, void *private, ptl_msg_t *msg,
         if (routing)
                 target.nid = gw_nid;
         
-        if (len == 0) {
-                if (loopback)
-                        rc = ptl_lo_txiov(ni, private, msg, hdr,
-                                          0, NULL, offset, len);
-                else
-                        rc = (ni->ni_nal->nal_send)(ni, private, msg, hdr, 
-                                                    type, target, routing,
-                                                    0, NULL, offset, len);
-        } else if ((md->md_options & PTL_MD_KIOV) == 0) {
-                if (loopback)
-                        rc = ptl_lo_txiov(ni, private, msg, hdr, 
-                                          md->md_niov, md->md_iov.iov,
-                                          offset, len);
-                else
-                        rc = (ni->ni_nal->nal_send)
-                                     (ni, private, msg, hdr, 
-                                      type, target, routing,
-                                      md->md_niov, md->md_iov.iov,
-                                      offset, len);
-        } else {
-                if (loopback)
-                        rc =  ptl_lo_txkiov(ni, private, msg, hdr, 
-                                            md->md_niov, md->md_iov.kiov,
+        if (len == 0)
+                rc = (ni->ni_nal->nal_send)(ni, private, msg, hdr, 
+                                            type, target, routing,
+                                            0, NULL, offset, len);
+        else if ((md->md_options & PTL_MD_KIOV) == 0)
+                rc = (ni->ni_nal->nal_send)(ni, private, msg, hdr, 
+                                            type, target, routing,
+                                            md->md_niov, md->md_iov.iov,
                                             offset, len);
-                else
-                        rc = (ni->ni_nal->nal_send_pages)
-                                     (ni, private, msg, hdr, 
-                                      type, target, routing,
-                                      md->md_niov, md->md_iov.kiov,
-                                      offset, len);
-        }
-
+        else
+                rc = (ni->ni_nal->nal_send_pages)(ni, private, msg, hdr, 
+                                                  type, target, routing,
+                                                  md->md_niov, md->md_iov.kiov,
+                                                  offset, len);
  out:
         ptl_ni_decref(ni);                      /* lose ref from kpr_lookup */
         return rc;
@@ -943,7 +656,7 @@ ptl_commit_md (ptl_libmd_t *md, ptl_msg_t *msg)
 }
 
 static void
-ptl_drop_message (ptl_ni_t *ni, void *private, ptl_hdr_t *hdr, int loopback)
+ptl_drop_message (ptl_ni_t *ni, void *private, ptl_hdr_t *hdr)
 {
         unsigned long flags;
 
@@ -957,9 +670,8 @@ ptl_drop_message (ptl_ni_t *ni, void *private, ptl_hdr_t *hdr, int loopback)
         PTL_UNLOCK(flags);
 
         /* NULL msg => if NAL calls ptl_finalize it will be a noop */
-        if (!loopback)
-                (void) ptl_recv(ni, private, NULL, NULL, 0, 0,
-                                hdr->payload_length);
+        (void) ptl_recv(ni, private, NULL, NULL, 0, 0,
+                        hdr->payload_length);
 }
 
 /*
@@ -970,8 +682,7 @@ ptl_drop_message (ptl_ni_t *ni, void *private, ptl_hdr_t *hdr, int loopback)
  *
  */
 static ptl_err_t
-ptl_parse_put(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
-              ptl_msg_t *msg, int loopback)
+ptl_parse_put(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, ptl_msg_t *msg)
 {
         ptl_size_t       mlength = 0;
         ptl_size_t       offset = 0;
@@ -1010,12 +721,8 @@ ptl_parse_put(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
 
         PTL_UNLOCK(flags);
 
-        if (loopback)
-                rc = ptl_lo_recv(ni, private, msg, md, offset, mlength,
-                                 hdr->payload_length);
-        else
-                rc = ptl_recv(ni, private, msg, md, offset, mlength,
-                              hdr->payload_length);
+        rc = ptl_recv(ni, private, msg, md, offset, mlength,
+                      hdr->payload_length);
 
         if (rc != PTL_OK)
                 CERROR("%s: error on receiving PUT from %s: %d\n",
@@ -1025,8 +732,7 @@ ptl_parse_put(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
 }
 
 static ptl_err_t
-ptl_parse_get(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
-              ptl_msg_t *msg, int loopback)
+ptl_parse_get(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, ptl_msg_t *msg)
 {
         ptl_size_t       mlength = 0;
         ptl_size_t       offset = 0;
@@ -1075,16 +781,13 @@ ptl_parse_get(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rc);
 
         /* Discard any junk after the hdr */
-        if (!loopback)
-                (void) ptl_recv(ni, private, NULL, NULL, 0, 0,
-                                hdr->payload_length);
-
+        (void) ptl_recv(ni, private, NULL, NULL, 0, 0,
+                        hdr->payload_length);
         return (rc);
 }
 
 static ptl_err_t
-ptl_parse_reply(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
-                ptl_msg_t *msg, int loopback)
+ptl_parse_reply(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, ptl_msg_t *msg)
 {
         ptl_process_id_t src = {.nid = hdr->src_nid,
                                 .pid = hdr->src_pid};
@@ -1147,11 +850,7 @@ ptl_parse_reply(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
 
         PTL_UNLOCK(flags);
 
-        if (loopback)
-                rc = ptl_lo_recv(ni, private, msg, md, 0, length, rlength);
-        else
-                rc = ptl_recv(ni, private, msg, md, 0, length, rlength);
-
+        rc = ptl_recv(ni, private, msg, md, 0, length, rlength);
         if (rc != PTL_OK)
                 CERROR("%s: error on receiving REPLY from %s: %d\n",
                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rc);
@@ -1160,8 +859,7 @@ ptl_parse_reply(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
 }
 
 static ptl_err_t
-ptl_parse_ack(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, 
-              ptl_msg_t *msg, int loopback)
+ptl_parse_ack(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, ptl_msg_t *msg)
 {
         ptl_process_id_t src = {.nid = hdr->src_nid,
                                 .pid = hdr->src_pid};
@@ -1210,9 +908,7 @@ ptl_parse_ack(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private,
         ptl_finalize(ni, private, msg, PTL_OK);
 
         /* ...and now discard any junk after the hdr */
-        if (!loopback)
-                (void) ptl_recv(ni, private, NULL, NULL, 0, 0,
-                                hdr->payload_length);
+        (void) ptl_recv(ni, private, NULL, NULL, 0, 0, hdr->payload_length);
 
        return (PTL_OK);
 }
@@ -1298,29 +994,18 @@ ptl_print_hdr(ptl_hdr_t * hdr)
 ptl_err_t
 ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private)
 {
-        return do_ptl_parse(ni, hdr, private, 0);
-}
-
-ptl_err_t
-do_ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, int loopback)
-{
         unsigned long  flags;
         ptl_err_t      rc;
         ptl_msg_t     *msg;
+        ptl_nid_t      dest_nid;
+        __u32          type = le32_to_cpu(hdr->type);
 
         /* NB we return PTL_OK if we manage to parse the header and believe
          * it looks OK.  Anything that goes wrong with receiving the
-         * message after that point is the responsibility of the NAL */
-
-        /* convert common fields to host byte order */
-        hdr->type = le32_to_cpu(hdr->type);
-        hdr->src_nid = le64_to_cpu(hdr->src_nid);
-        hdr->src_pid = le32_to_cpu(hdr->src_pid);
-        hdr->dest_pid = le32_to_cpu(hdr->dest_pid);
-        hdr->payload_length = le32_to_cpu(hdr->payload_length);
+         * message after that point is the responsibility of the NAL.
+         * If we don't think the packet is for us, return PTL_IFACE_DUP */
 
-        switch (hdr->type) {
-        case PTL_MSG_HELLO: {
+        if (type == PTL_MSG_HELLO) {
                 /* dest_nid is really ptl_magicversion_t */
                 ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
 
@@ -1338,7 +1023,7 @@ do_ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, int loopback)
                                libcfs_nid2str(hdr->src_nid));
 
                         /* it's good but we don't want it */
-                        ptl_drop_message(ni, private, hdr, loopback);
+                        ptl_drop_message(ni, private, hdr);
                         return PTL_OK;
                 }
 
@@ -1351,25 +1036,45 @@ do_ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, int loopback)
                 return PTL_FAIL;
         }
 
+        dest_nid = le64_to_cpu(hdr->dest_nid);
+        if (dest_nid != ni->ni_nid) {
+
+                if (!ptl_islocalnid(dest_nid))
+                        return PTL_IFACE_DUP;
+
+                /* dest_nid is one of my NIs */
+                
+                if (!allow_destination_aliases) {
+                        /* dest is another local NI; sender should have used
+                         * this node's NID on its own network */
+                        CERROR ("%s: Dropping message from %s: nid %s is a local alias\n",
+                                libcfs_nid2str(ni->ni_nid),
+                                libcfs_nid2str(le64_to_cpu(hdr->src_nid)),
+                                libcfs_nid2str(dest_nid));
+                        return PTL_FAIL;
+                }
+        }
+        
+        /* convert common fields to host byte order */
+        hdr->type = type;
+        hdr->src_nid = le64_to_cpu(hdr->src_nid);
+        hdr->src_pid = le32_to_cpu(hdr->src_pid);
+        hdr->dest_nid = dest_nid;
+        hdr->dest_pid = le32_to_cpu(hdr->dest_pid);
+        hdr->payload_length = le32_to_cpu(hdr->payload_length);
+
+        switch (type) {
         case PTL_MSG_ACK:
         case PTL_MSG_PUT:
         case PTL_MSG_GET:
         case PTL_MSG_REPLY:
-                hdr->dest_nid = le64_to_cpu(hdr->dest_nid);
-                if (hdr->dest_nid != ni->ni_nid) {
-                        CERROR("%s: BAD dest NID in %s message from %s to %s"
-                               "(not me)\n", libcfs_nid2str(ni->ni_nid), 
-                               hdr_type_string(hdr), 
-                               libcfs_nid2str(hdr->src_nid),
-                               libcfs_nid2str(hdr->dest_nid));
-                        return PTL_FAIL;
-                }
                 break;
 
         default:
                 CERROR("%s: Bad message type 0x%x from %s\n",
                        libcfs_nid2str(ni->ni_nid), hdr->type, 
                        libcfs_nid2str(hdr->src_nid));
+
                 return PTL_FAIL;
         }
 
@@ -1382,7 +1087,7 @@ do_ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, int loopback)
                 CERROR("%s: Dropping incoming %s from %s: simulated failure\n",
                        libcfs_nid2str(ni->ni_nid), hdr_type_string (hdr),
                        libcfs_nid2str(hdr->src_nid));
-                ptl_drop_message(ni, private, hdr, loopback);
+                ptl_drop_message(ni, private, hdr);
                 return PTL_OK;
         }
 
@@ -1392,22 +1097,22 @@ do_ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, int loopback)
                        "can't allocate a ptl_msg_t\n",
                        libcfs_nid2str(ni->ni_nid), hdr_type_string (hdr),
                        libcfs_nid2str(hdr->src_nid));
-                ptl_drop_message(ni, private, hdr, loopback);
+                ptl_drop_message(ni, private, hdr);
                 return PTL_OK;
         }
 
         switch (hdr->type) {
         case PTL_MSG_ACK:
-                rc = ptl_parse_ack(ni, hdr, private, msg, loopback);
+                rc = ptl_parse_ack(ni, hdr, private, msg);
                 break;
         case PTL_MSG_PUT:
-                rc = ptl_parse_put(ni, hdr, private, msg, loopback);
+                rc = ptl_parse_put(ni, hdr, private, msg);
                 break;
         case PTL_MSG_GET:
-                rc = ptl_parse_get(ni, hdr, private, msg, loopback);
+                rc = ptl_parse_get(ni, hdr, private, msg);
                 break;
         case PTL_MSG_REPLY:
-                rc = ptl_parse_reply(ni, hdr, private, msg, loopback);
+                rc = ptl_parse_reply(ni, hdr, private, msg);
                 break;
         default:
                 LASSERT(0);
@@ -1424,7 +1129,7 @@ do_ptl_parse(ptl_ni_t *ni, ptl_hdr_t *hdr, void *private, int loopback)
                         ptl_msg_free(msg); /* expects PTL_LOCK held */
                         PTL_UNLOCK(flags);
 
-                        ptl_drop_message(ni, private, hdr, loopback);
+                        ptl_drop_message(ni, private, hdr);
                 }
         }
 
diff --git a/lnet/lnet/lo.c b/lnet/lnet/lo.c
new file mode 100644 (file)
index 0000000..b58df75
--- /dev/null
@@ -0,0 +1,287 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2004 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <portals/lib-p30.h>
+
+ptl_err_t
+lonal_send (ptl_ni_t        *ni,
+           void            *private,
+           ptl_msg_t       *ptlmsg,
+           ptl_hdr_t       *hdr,
+           int              type,
+           ptl_process_id_t target,
+           int              routing,
+           unsigned int     payload_niov,
+           struct iovec    *payload_iov,
+           size_t           payload_offset,
+           size_t           payload_nob)
+{
+        lo_desc_t lod = {
+                .lod_type    = LOD_IOV,
+                .lod_niov    = payload_niov,
+                .lod_offset  = payload_offset,
+                .lod_nob     = payload_nob,
+                .lod_iov     = { .iov = payload_iov } };
+        ptl_err_t rc;
+
+        LASSERT (!routing);
+
+        rc = ptl_parse(ni, hdr, &lod);
+        if (rc == PTL_OK)
+                ptl_finalize(ni, private, ptlmsg, PTL_OK);
+        
+        return rc;
+}
+
+ptl_err_t
+lonal_recv(ptl_ni_t     *ni,
+          void         *private,
+          ptl_msg_t    *ptlmsg,
+          unsigned int  niov,
+          struct iovec *iov,
+          size_t        offset,
+          size_t        mlen,
+          size_t        rlen)
+{
+        lo_desc_t *lod = (lo_desc_t *)private;
+
+        /* I only handle mapped->mapped matches */
+        LASSERT(mlen == 0 || lod->lod_type == LOD_IOV);
+
+        if (mlen == 0)
+                goto out;
+
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
+                LASSERT(niov > 0);
+        }
+        
+        while (lod->lod_offset >= lod->lod_iov.iov->iov_len) {
+                lod->lod_offset -= lod->lod_iov.iov->iov_len;
+                lod->lod_iov.iov++;
+                lod->lod_niov--;
+                LASSERT(lod->lod_niov > 0);
+        }
+        
+        do {
+                int fraglen = MIN(iov->iov_len - offset,
+                                  lod->lod_iov.iov->iov_len - lod->lod_offset);
+
+                LASSERT(niov > 0);
+                LASSERT(lod->lod_niov > 0);
+
+                if (fraglen > mlen)
+                        fraglen = mlen;
+                
+                memcpy((void *)((unsigned long)iov->iov_base + offset),
+                       (void *)((unsigned long)lod->lod_iov.iov->iov_base +
+                                lod->lod_offset),
+                       fraglen);
+
+                if (offset + fraglen < iov->iov_len) {
+                        offset += fraglen;
+                } else {
+                        offset = 0;
+                        iov++;
+                        niov--;
+                }
+
+                if (lod->lod_offset + fraglen < lod->lod_iov.iov->iov_len ) {
+                        lod->lod_offset += fraglen;
+                } else {
+                        lod->lod_offset = 0;
+                        lod->lod_iov.iov++;
+                        lod->lod_niov--;
+                }
+
+                mlen -= fraglen;
+        } while (mlen > 0);
+        
+ out:
+        ptl_finalize(ni, private, ptlmsg, PTL_OK);
+        return PTL_OK;
+}
+
+#ifdef __KERNEL__
+ptl_err_t
+lonal_send_pages (ptl_ni_t        *ni,
+                 void            *private,
+                 ptl_msg_t       *ptlmsg,
+                 ptl_hdr_t       *hdr,
+                 int              type,
+                 ptl_process_id_t target,
+                 int              routing,
+                 unsigned int     payload_niov,
+                 ptl_kiov_t      *payload_kiov,
+                 size_t           payload_offset,
+                 size_t           payload_nob)
+{
+        lo_desc_t lod = {
+                .lod_type     = LOD_KIOV,
+                .lod_niov     = payload_niov,
+                .lod_offset   = payload_offset,
+                .lod_nob      = payload_nob,
+                .lod_iov      = { .kiov = payload_kiov } };
+        ptl_err_t   rc;
+
+        LASSERT (!routing);
+
+        rc = ptl_parse(ni, hdr, &lod);
+        if (rc == PTL_OK)
+                ptl_finalize(ni, private, ptlmsg, PTL_OK);
+        
+        return rc;
+}
+
+ptl_err_t
+lonal_recv_pages(ptl_ni_t     *ni,
+                void         *private,
+                ptl_msg_t    *ptlmsg,
+                unsigned int  niov,
+                ptl_kiov_t   *kiov,
+                size_t        offset,
+                size_t        mlen,
+                size_t        rlen)
+{
+        void          *srcaddr = NULL;
+        void          *dstaddr = NULL;
+        unsigned long  srcfrag = 0;
+        unsigned long  dstfrag = 0;
+        unsigned long  fraglen;
+        lo_desc_t    *lod = (lo_desc_t *)private;
+
+        /* I only handle unmapped->unmapped matches */
+        LASSERT(mlen == 0 || lod->lod_type == LOD_KIOV);
+
+        if (mlen == 0)
+                goto out;
+
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT(niov > 0);
+        }
+
+        while (lod->lod_offset >= lod->lod_iov.kiov->kiov_len) {
+                lod->lod_offset -= lod->lod_iov.kiov->kiov_len;
+                lod->lod_iov.kiov++;
+                lod->lod_niov--;
+                LASSERT(lod->lod_niov > 0);
+        }
+
+        do {
+        /* CAVEAT EMPTOR: I kmap 2 pages at once == slight risk of deadlock */
+                LASSERT(niov > 0);
+                if (dstaddr == NULL) {
+                        dstaddr = (void *)((unsigned long)kmap(kiov->kiov_page) +
+                                           kiov->kiov_offset + offset);
+                        dstfrag = kiov->kiov_len -  offset;
+                }
+
+                LASSERT(lod->lod_niov > 0);
+                if (srcaddr == NULL) {
+                        srcaddr = (void *)((unsigned long)kmap(lod->lod_iov.kiov->kiov_page) +
+                                           lod->lod_iov.kiov->kiov_offset + lod->lod_offset);
+                        srcfrag = lod->lod_iov.kiov->kiov_len - lod->lod_offset;
+                }
+                
+                fraglen = MIN(srcfrag, dstfrag);
+                if (fraglen > mlen)
+                        fraglen = mlen;
+                
+                memcpy(dstaddr, srcaddr, fraglen);
+                
+                if (fraglen < dstfrag) {
+                        dstfrag -= fraglen;
+                        dstaddr = (void *)((unsigned long)dstaddr + fraglen);
+                } else {
+                        kunmap(kiov->kiov_page);
+                        dstaddr = NULL;
+                        offset = 0;
+                        kiov++;
+                        niov--;
+                }
+
+                if (fraglen < srcfrag) {
+                        srcfrag -= fraglen;
+                        srcaddr = (void *)((unsigned long)srcaddr + fraglen);
+                } else {
+                        kunmap(lod->lod_iov.kiov->kiov_page);
+                        srcaddr = NULL;
+                        lod->lod_offset = 0;
+                        lod->lod_iov.kiov++;
+                        lod->lod_niov--;
+                }
+
+                mlen -= fraglen;
+        } while (mlen > 0);
+
+        if (dstaddr != NULL)
+                kunmap(kiov->kiov_page);
+
+        if (srcaddr != NULL)
+                kunmap(lod->lod_iov.kiov->kiov_page);
+
+ out:
+        ptl_finalize(ni, private, ptlmsg, PTL_OK);
+        return PTL_OK;
+}
+#endif
+
+static int lonal_instanced;
+
+void
+lonal_shutdown(ptl_ni_t *ni)
+{
+       CDEBUG (D_NET, "shutdown\n");
+       LASSERT (ni == &ptl_loni);
+        LASSERT (lonal_instanced);
+        
+        lonal_instanced = 0;
+}
+
+ptl_err_t
+lonal_startup (ptl_ni_t *ni)
+{
+       LASSERT (ni->ni_nal == &ptl_lonal);
+       LASSERT (ni == &ptl_loni);
+       LASSERT (!lonal_instanced);
+        lonal_instanced = 1;
+
+       return (PTL_OK);
+}
+
+ptl_nal_t ptl_lonal = {
+        .nal_type       = LONAL,
+        .nal_startup    = lonal_startup,
+        .nal_shutdown   = lonal_shutdown,
+        .nal_send       = lonal_send,
+        .nal_recv       = lonal_recv,
+#ifdef __KERNEL__
+        .nal_send_pages = lonal_send_pages,
+        .nal_recv_pages = lonal_recv_pages,
+#endif
+};
+
+ptl_ni_t ptl_loni;