Whamcloud - gitweb
Landing b_elan4.
author	shaver <shaver>
	Mon, 19 Jan 2004 18:47:33 +0000 (18:47 +0000)
committer	shaver <shaver>
	Mon, 19 Jan 2004 18:47:33 +0000 (18:47 +0000)
lnet/archdep.m4
lnet/klnds/qswlnd/qswlnd.c
lnet/klnds/qswlnd/qswlnd.h
lnet/klnds/qswlnd/qswlnd_cb.c
lustre/portals/archdep.m4
lustre/portals/knals/qswnal/qswnal.c
lustre/portals/knals/qswnal/qswnal.h
lustre/portals/knals/qswnal/qswnal_cb.c
lustre/utils/lconf
lustre/utils/llmount.c

index f3f0fb4..b67e012 100644 (file)
@@ -253,7 +253,14 @@ AC_MSG_CHECKING(if quadrics kernel headers are present)
 if test -d $LINUX/drivers/net/qsnet ; then
   AC_MSG_RESULT(yes)
   QSWNAL="qswnal"
-  with_quadrics="-I$LINUX/drivers/net/qsnet/include"
+  AC_MSG_CHECKING(for multirail EKC)
+  if test -f $LINUX/include/elan/epcomms.h; then
+       AC_MSG_RESULT(supported)
+       with_quadrics="-DMULTIRAIL_EKC=1"
+  else
+       AC_MSG_RESULT(not supported)
+       with_quadrics="-I$LINUX/drivers/net/qsnet/include"
+  fi
   :
 elif test -d $LINUX/drivers/qsnet1 ; then
   AC_MSG_RESULT(yes)
index 70b45c0..127bbce 100644 (file)
@@ -35,6 +35,27 @@ kpr_nal_interface_t kqswnal_router_interface = {
        kprni_notify:   NULL,                   /* we're connectionless */
 };
 
+#if CONFIG_SYSCTL
+#define QSWNAL_SYSCTL  201
+
+#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
+#define QSWNAL_SYSCTL_COPY_SMALL_FWD     2
+
+static ctl_table kqswnal_ctl_table[] = {
+       {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
+        &kqswnal_data.kqn_optimized_gets, sizeof (int),
+        0644, NULL, &proc_dointvec},
+       {QSWNAL_SYSCTL_COPY_SMALL_FWD, "copy_small_fwd",
+        &kqswnal_data.kqn_copy_small_fwd, sizeof (int),
+        0644, NULL, &proc_dointvec},
+       {0}
+};
+
+static ctl_table kqswnal_top_ctl_table[] = {
+       {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
+       {0}
+};
+#endif
 
 static int
 kqswnal_forward(nal_t   *nal,
@@ -178,6 +199,10 @@ kqswnal_finalise (void)
                LASSERT (0);
 
        case KQN_INIT_ALL:
+#if CONFIG_SYSCTL
+                if (kqswnal_data.kqn_sysctl != NULL)
+                        unregister_sysctl_table (kqswnal_data.kqn_sysctl);
+#endif         
                PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
                 kportal_nal_unregister(QSWNAL);
                /* fall through */
@@ -200,21 +225,30 @@ kqswnal_finalise (void)
        kpr_shutdown (&kqswnal_data.kqn_router);
 
        /**********************************************************************/
-       /* flag threads to terminate, wake them and wait for them to die */
+       /* flag threads we've started to terminate and wait for all to ack */
 
        kqswnal_data.kqn_shuttingdown = 1;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);
 
-       while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
-               CDEBUG(D_NET, "waiting for %d threads to terminate\n",
-                      atomic_read (&kqswnal_data.kqn_nthreads));
+       while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
+               CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
+                      atomic_read (&kqswnal_data.kqn_nthreads_running));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }
 
        /**********************************************************************/
        /* close elan comms */
+#if MULTIRAIL_EKC
+       if (kqswnal_data.kqn_eprx_small != NULL)
+               ep_free_rcvr (kqswnal_data.kqn_eprx_small);
 
+       if (kqswnal_data.kqn_eprx_large != NULL)
+               ep_free_rcvr (kqswnal_data.kqn_eprx_large);
+
+       if (kqswnal_data.kqn_eptx != NULL)
+               ep_free_xmtr (kqswnal_data.kqn_eptx);
+#else
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
 
@@ -223,12 +257,29 @@ kqswnal_finalise (void)
 
        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
+#endif
+       /**********************************************************************/
+       /* flag threads to terminate, wake them and wait for them to die */
+
+       kqswnal_data.kqn_shuttingdown = 2;
+       wake_up_all (&kqswnal_data.kqn_sched_waitq);
+
+       while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
+               CDEBUG(D_NET, "waiting for %d threads to terminate\n",
+                      atomic_read (&kqswnal_data.kqn_nthreads));
+               set_current_state (TASK_UNINTERRUPTIBLE);
+               schedule_timeout (HZ);
+       }
 
        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */
 
+#if MULTIRAIL_EKC
+       LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
+#endif
+
        /**********************************************************************/
        /* Complete any blocked forwarding packets with error
         */
@@ -260,27 +311,73 @@ kqswnal_finalise (void)
        /* Unmap message buffers and free all descriptors and buffers
         */
 
+#if MULTIRAIL_EKC
+       /* FTTB, we need to unmap any remaining mapped memory.  When
+        * ep_dvma_release() get fixed (and releases any mappings in the
+        * region), we can delete all the code from here -------->  */
+
+       if (kqswnal_data.kqn_txds != NULL) {
+               int  i;
+
+               for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++) {
+                       kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
+
+                       /* If ktx has a buffer, it got mapped; unmap now.
+                        * NB only the pre-mapped stuff is still mapped
+                        * since all tx descs must be idle */
+
+                       if (ktx->ktx_buffer != NULL)
+                               ep_dvma_unload(kqswnal_data.kqn_ep,
+                                              kqswnal_data.kqn_ep_tx_nmh,
+                                              &ktx->ktx_ebuffer);
+               }
+       }
+
+       if (kqswnal_data.kqn_rxds != NULL) {
+               int   i;
+
+               for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
+                       kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
+
+                       /* If krx_pages[0] got allocated, it got mapped.
+                        * NB subsequent pages get merged */
+
+                       if (krx->krx_pages[0] != NULL)
+                               ep_dvma_unload(kqswnal_data.kqn_ep,
+                                              kqswnal_data.kqn_ep_rx_nmh,
+                                              &krx->krx_elanbuffer);
+               }
+       }
+       /* <----------- to here */
+
+       if (kqswnal_data.kqn_ep_rx_nmh != NULL)
+               ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);
+
+       if (kqswnal_data.kqn_ep_tx_nmh != NULL)
+               ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
+#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
-               elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+               elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);
 
-               elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
+               elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }
 
        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
-               elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+               elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));
 
-               elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
+               elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
+#endif
 
        if (kqswnal_data.kqn_txds != NULL)
        {
@@ -331,7 +428,11 @@ kqswnal_finalise (void)
 static int __init
 kqswnal_initialise (void)
 {
+#if MULTIRAIL_EKC
+       EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
+#else
        ELAN3_DMA_REQUEST dmareq;
+#endif
        int               rc;
        int               i;
        int               elan_page_idx;
@@ -351,9 +452,19 @@ kqswnal_initialise (void)
 
        kqswnal_lib.nal_data = &kqswnal_data;
 
+       memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
+       memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
+#if MULTIRAIL_EKC
+       kqswnal_rpc_failed.Data[0] = -ECONNREFUSED;
+#else
+       kqswnal_rpc_failed.Status = -ECONNREFUSED;
+#endif
        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));
 
+       kqswnal_data.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
+       kqswnal_data.kqn_copy_small_fwd = KQSW_COPY_SMALL_FWD;
+
        kqswnal_data.kqn_cb = &kqswnal_lib;
 
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
@@ -375,24 +486,38 @@ kqswnal_initialise (void)
        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;
 
+#if MULTIRAIL_EKC
+       kqswnal_data.kqn_ep = ep_system();
+       if (kqswnal_data.kqn_ep == NULL) {
+               CERROR("Can't initialise EKC\n");
+               return (-ENODEV);
+       }
+
+       if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
+               CERROR("Can't get elan ID\n");
+               kqswnal_finalise();
+               return (-ENODEV);
+       }
+#else
        /**********************************************************************/
        /* Find the first Elan device */
 
-       kqswnal_data.kqn_epdev = ep_device (0);
-       if (kqswnal_data.kqn_epdev == NULL)
+       kqswnal_data.kqn_ep = ep_device (0);
+       if (kqswnal_data.kqn_ep == NULL)
        {
                CERROR ("Can't get elan device 0\n");
-               return (-ENOMEM);
+               return (-ENODEV);
        }
+#endif
 
        kqswnal_data.kqn_nid_offset = 0;
-       kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_epdev);
-       kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_epdev);
+       kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
+       kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);
        
        /**********************************************************************/
        /* Get the transmitter */
 
-       kqswnal_data.kqn_eptx = ep_alloc_large_xmtr (kqswnal_data.kqn_epdev);
+       kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
@@ -403,9 +528,9 @@ kqswnal_initialise (void)
        /**********************************************************************/
        /* Get the receivers */
 
-       kqswnal_data.kqn_eprx_small = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
-                                                            EP_SVC_LARGE_PORTALS_SMALL,
-                                                            KQSW_EP_ENVELOPES_SMALL);
+       kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
+                                                    EP_MSG_SVC_PORTALS_SMALL,
+                                                    KQSW_EP_ENVELOPES_SMALL);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
@@ -413,9 +538,9 @@ kqswnal_initialise (void)
                return (-ENOMEM);
        }
 
-       kqswnal_data.kqn_eprx_large = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
-                                                            EP_SVC_LARGE_PORTALS_LARGE,
-                                                            KQSW_EP_ENVELOPES_LARGE);
+       kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
+                                                    EP_MSG_SVC_PORTALS_LARGE,
+                                                    KQSW_EP_ENVELOPES_LARGE);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
@@ -427,13 +552,23 @@ kqswnal_initialise (void)
        /* Reserve Elan address space for transmit descriptors NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */
-
+#if MULTIRAIL_EKC
+       kqswnal_data.kqn_ep_tx_nmh = 
+               ep_dvma_reserve(kqswnal_data.kqn_ep,
+                               KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
+                               EP_PERM_WRITE);
+       if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
+               CERROR("Can't reserve tx dma space\n");
+               kqswnal_finalise();
+               return (-ENOMEM);
+       }
+#else
         dmareq.Waitfn   = DDI_DMA_SLEEP;
         dmareq.ElanAddr = (E3_Addr) 0;
         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
 
-       rc = elan3_dma_reserve(kqswnal_data.kqn_epdev->DmaState,
+       rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
                              KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                              &dmareq, &kqswnal_data.kqn_eptxdmahandle);
        if (rc != DDI_SUCCESS)
@@ -442,16 +577,27 @@ kqswnal_initialise (void)
                kqswnal_finalise ();
                return (-ENOMEM);
        }
-
+#endif
        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
-
+#if MULTIRAIL_EKC
+       kqswnal_data.kqn_ep_rx_nmh =
+               ep_dvma_reserve(kqswnal_data.kqn_ep,
+                               KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
+                               KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
+                               EP_PERM_WRITE);
+       if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
+               CERROR("Can't reserve rx dma space\n");
+               kqswnal_finalise();
+               return (-ENOMEM);
+       }
+#else
         dmareq.Waitfn   = DDI_DMA_SLEEP;
         dmareq.ElanAddr = (E3_Addr) 0;
         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
 
-       rc = elan3_dma_reserve (kqswnal_data.kqn_epdev->DmaState,
+       rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                &dmareq, &kqswnal_data.kqn_eprxdmahandle);
@@ -461,7 +607,7 @@ kqswnal_initialise (void)
                kqswnal_finalise ();
                return (-ENOMEM);
        }
-
+#endif
        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */
 
@@ -492,12 +638,17 @@ kqswnal_initialise (void)
                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
-
-               elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+               ep_dvma_load(kqswnal_data.kqn_ep, NULL, 
+                            ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE, 
+                            kqswnal_data.kqn_ep_tx_nmh, basepage,
+                            &all_rails, &ktx->ktx_ebuffer);
+#else
+               elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                                       basepage, &ktx->ktx_ebuffer);
-
+#endif
                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
 
@@ -527,7 +678,11 @@ kqswnal_initialise (void)
        elan_page_idx = 0;
        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
-               E3_Addr       elanaddr;
+#if MULTIRAIL_EKC
+               EP_NMD        elanbuffer;
+#else
+               E3_Addr       elanbuffer;
+#endif
                int           j;
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
 
@@ -554,18 +709,35 @@ kqswnal_initialise (void)
 
                        LASSERT(page_address(krx->krx_pages[j]) != NULL);
 
-                       elan3_dvma_kaddr_load(kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                       ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                                    page_address(krx->krx_pages[j]),
+                                    PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
+                                    elan_page_idx, &all_rails, &elanbuffer);
+                       
+                       if (j == 0) {
+                               krx->krx_elanbuffer = elanbuffer;
+                       } else {
+                               rc = ep_nmd_merge(&krx->krx_elanbuffer,
+                                                 &krx->krx_elanbuffer, 
+                                                 &elanbuffer);
+                               /* NB contiguous mapping */
+                               LASSERT(rc);
+                       }
+#else
+                       elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
                                              kqswnal_data.kqn_eprxdmahandle,
                                              page_address(krx->krx_pages[j]),
                                              PAGE_SIZE, elan_page_idx,
-                                             &elanaddr);
-                       elan_page_idx++;
-
+                                             &elanbuffer);
                        if (j == 0)
-                               krx->krx_elanaddr = elanaddr;
+                               krx->krx_elanbuffer = elanbuffer;
+
+                       /* NB contiguous mapping */
+                       LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
+#endif
+                       elan_page_idx++;
 
-                       /* NB we assume a contiguous  */
-                       LASSERT (elanaddr == krx->krx_elanaddr + j * PAGE_SIZE);
                }
        }
        LASSERT (elan_page_idx ==
@@ -593,10 +765,15 @@ kqswnal_initialise (void)
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
 
                /* NB this enqueue can allocate/sleep (attr == 0) */
+#if MULTIRAIL_EKC
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
-                                     krx->krx_elanaddr,
+                                     &krx->krx_elanbuffer, 0);
+#else
+               rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+                                     krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
-               if (rc != ESUCCESS)
+#endif
+               if (rc != EP_SUCCESS)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_finalise ();
@@ -629,6 +806,11 @@ kqswnal_initialise (void)
                return (rc);
        }
 
+#if CONFIG_SYSCTL
+        /* Press on regardless even if registering sysctl doesn't work */
+        kqswnal_data.kqn_sysctl = register_sysctl_table (kqswnal_top_ctl_table, 0);
+#endif
+
        PORTAL_SYMBOL_REGISTER(kqswnal_ni);
        kqswnal_data.kqn_init = KQN_INIT_ALL;
 
@@ -642,8 +824,8 @@ kqswnal_initialise (void)
 }
 
 
-MODULE_AUTHOR("W. Marcus Miller <marcusm@llnl.gov>");
-MODULE_DESCRIPTION("Kernel Quadrics Switch NAL v1.00");
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
 MODULE_LICENSE("GPL");
 
 module_init (kqswnal_initialise);
index 4cb9ad9..3e4274b 100644 (file)
 #include <linux/config.h>
 #include <linux/module.h>
 
-#include <elan3/elanregs.h>
-#include <elan3/elandev.h>
-#include <elan3/elanvp.h>
-#include <elan3/elan3mmu.h>
-#include <elan3/elanctxt.h>
-#include <elan3/elandebug.h>
-#include <elan3/urom_addrs.h>
-#include <elan3/busops.h>
-#include <elan3/kcomm.h>
+#if MULTIRAIL_EKC
+# include <elan/epcomms.h>
+#else
+# include <elan3/elanregs.h>
+# include <elan3/elandev.h>
+# include <elan3/elanvp.h>
+# include <elan3/elan3mmu.h>
+# include <elan3/elanctxt.h>
+# include <elan3/elandebug.h>
+# include <elan3/urom_addrs.h>
+# include <elan3/busops.h>
+# include <elan3/kcomm.h>
+#endif
 
 #include <linux/kernel.h>
 #include <linux/mm.h>
@@ -66,7 +70,7 @@
 #include <linux/file.h>
 #include <linux/stat.h>
 #include <linux/list.h>
-#include <asm/uaccess.h>
+#include <linux/sysctl.h>
 #include <asm/segment.h>
 
 #define DEBUG_SUBSYSTEM S_QSWNAL
@@ -75,8 +79,6 @@
 #include <portals/p30.h>
 #include <portals/lib-p30.h>
 
-#define KQSW_OPTIMIZE_GETS 1
-
 #define KQSW_CHECKSUM   0
 #if KQSW_CHECKSUM
 typedef unsigned long kqsw_csum_t;
@@ -87,13 +89,6 @@ typedef unsigned long kqsw_csum_t;
 #define KQSW_HDR_SIZE   (sizeof (ptl_hdr_t) + KQSW_CSUM_SIZE)
 
 /*
- *  Elan NAL
- */
-#define EP_SVC_LARGE_PORTALS_SMALL      (0x10)  /* Portals over elan port number (large payloads) */
-#define EP_SVC_LARGE_PORTALS_LARGE      (0x11)  /* Portals over elan port number (small payloads) */
-/* NB small/large message sizes are GLOBAL constants */
-
-/*
  * Performance Tuning defines
  * NB no mention of PAGE_SIZE for interoperability
  */
@@ -113,6 +108,9 @@ typedef unsigned long kqsw_csum_t;
 
 #define KQSW_RESCHED                    100     /* # busy loops that forces scheduler to yield */
 
+#define KQSW_OPTIMIZED_GETS             1       /* optimized gets? */
+#define KQSW_COPY_SMALL_FWD             0       /* copy small fwd messages to pre-mapped buffer? */
+
 /*
  * derived constants
  */
@@ -136,8 +134,12 @@ typedef unsigned long kqsw_csum_t;
 /* Remote memory descriptor */
 typedef struct
 {
-        __u32            kqrmd_neiov;           /* # frags */
-        EP_IOVEC         kqrmd_eiov[0];         /* actual frags */
+        __u32            kqrmd_nfrag;           /* # frags */
+#if MULTIRAIL_EKC
+        EP_NMD           kqrmd_frag[0];         /* actual frags */
+#else
+        EP_IOVEC         kqrmd_frag[0];         /* actual frags */
+#endif
 } kqswnal_remotemd_t;
 
 typedef struct 
@@ -145,11 +147,16 @@ typedef struct
         struct list_head krx_list;              /* enqueue -> thread */
         EP_RCVR         *krx_eprx;              /* port to post receives to */
         EP_RXD          *krx_rxd;               /* receive descriptor (for repost) */
-        E3_Addr          krx_elanaddr;          /* Elan address of buffer (contiguous in elan vm) */
+#if MULTIRAIL_EKC
+        EP_NMD           krx_elanbuffer;        /* contiguous Elan buffer */
+#else
+        E3_Addr          krx_elanbuffer;        /* contiguous Elan buffer */
+#endif
         int              krx_npages;            /* # pages in receive buffer */
         int              krx_nob;               /* Number Of Bytes received into buffer */
-        atomic_t         krx_refcount;          /* who's using me? */
-        int              krx_rpc_completed;     /* I completed peer's RPC */
+        int              krx_rpc_reply_needed;  /* peer waiting for EKC RPC reply */
+        int              krx_rpc_reply_sent;    /* rpc reply sent */
+        atomic_t         krx_refcount;          /* how to tell when rpc is done */
         kpr_fwd_desc_t   krx_fwd;               /* embedded forwarding descriptor */
         struct page     *krx_pages[KQSW_NRXMSGPAGES_LARGE]; /* pages allocated */
         struct iovec     krx_iov[KQSW_NRXMSGPAGES_LARGE]; /* iovec for forwarding */
@@ -159,15 +166,15 @@ typedef struct
 {
         struct list_head  ktx_list;             /* enqueue idle/active */
         struct list_head  ktx_delayed_list;     /* enqueue delayedtxds */
-        int               ktx_isnblk:1;         /* reserved descriptor? */
-        int               ktx_state:7;          /* What I'm doing */
+        unsigned int      ktx_isnblk:1;         /* reserved descriptor? */
+        unsigned int      ktx_state:7;          /* What I'm doing */
+        unsigned int      ktx_firsttmpfrag:1;   /* ktx_frags[0] is in my ebuffer ? 0 : 1 */
         uint32_t          ktx_basepage;         /* page offset in reserved elan tx vaddrs for mapping pages */
         int               ktx_npages;           /* pages reserved for mapping messages */
         int               ktx_nmappedpages;     /* # pages mapped for current message */
         int               ktx_port;             /* destination ep port */
         ptl_nid_t         ktx_nid;              /* destination node */
         void             *ktx_args[2];          /* completion passthru */
-        E3_Addr           ktx_ebuffer;          /* elan address of ktx_buffer */
         char             *ktx_buffer;           /* pre-allocated contiguous buffer for hdr + small payloads */
         unsigned long     ktx_launchtime;       /* when (in jiffies) the transmit was launched */
 
@@ -176,10 +183,13 @@ typedef struct
         ptl_hdr_t        *ktx_wire_hdr;         /* portals header (wire endian) */
 
         int               ktx_nfrag;            /* # message frags */
-        union {
-                EP_IOVEC   iov[EP_MAXFRAG];     /* msg frags (elan vaddrs) */
-                EP_DATAVEC datav[EP_MAXFRAG];   /* DMA frags (eolan vaddrs) */
-        }                 ktx_frags;
+#if MULTIRAIL_EKC
+        EP_NMD            ktx_ebuffer;          /* elan mapping of ktx_buffer */
+        EP_NMD            ktx_frags[EP_MAXFRAG];/* elan mapping of msg frags */
+#else
+        E3_Addr           ktx_ebuffer;          /* elan address of ktx_buffer */
+        EP_IOVEC          ktx_frags[EP_MAXFRAG];/* msg frags (elan vaddrs) */
+#endif
 } kqswnal_tx_t;
 
 #define KTX_IDLE        0                       /* on kqn_(nblk_)idletxds */
@@ -191,8 +201,15 @@ typedef struct
 {
         char               kqn_init;            /* what's been initialised */
         char               kqn_shuttingdown;    /* I'm trying to shut down */
-        atomic_t           kqn_nthreads;        /* # threads still running */
+        atomic_t           kqn_nthreads;        /* # threads not terminated */
+        atomic_t           kqn_nthreads_running;/* # threads still running */
+
+        int                kqn_optimized_gets;  /* optimized GETs? */
+        int                kqn_copy_small_fwd;  /* fwd small msgs from pre-allocated buffer? */
 
+#if CONFIG_SYSCTL
+        struct ctl_table_header *kqn_sysctl;    /* sysctl interface */
+#endif        
         kqswnal_rx_t      *kqn_rxds;            /* all the receive descriptors */
         kqswnal_tx_t      *kqn_txds;            /* all the transmit descriptors */
 
@@ -212,12 +229,18 @@ typedef struct
 
         spinlock_t         kqn_statelock;       /* cb_cli/cb_sti */
         nal_cb_t          *kqn_cb;              /* -> kqswnal_lib */
-        EP_DEV            *kqn_epdev;           /* elan device */
+#if MULTIRAIL_EKC
+        EP_SYS            *kqn_ep;              /* elan system */
+        EP_NMH            *kqn_ep_tx_nmh;       /* elan reserved tx vaddrs */
+        EP_NMH            *kqn_ep_rx_nmh;       /* elan reserved rx vaddrs */
+#else
+        EP_DEV            *kqn_ep;              /* elan device */
+        ELAN3_DMA_HANDLE  *kqn_eptxdmahandle;   /* elan reserved tx vaddrs */
+        ELAN3_DMA_HANDLE  *kqn_eprxdmahandle;   /* elan reserved rx vaddrs */
+#endif
         EP_XMTR           *kqn_eptx;            /* elan transmitter */
         EP_RCVR           *kqn_eprx_small;      /* elan receiver (small messages) */
         EP_RCVR           *kqn_eprx_large;      /* elan receiver (large messages) */
-        ELAN3_DMA_HANDLE  *kqn_eptxdmahandle;   /* elan reserved tx vaddrs */
-        ELAN3_DMA_HANDLE  *kqn_eprxdmahandle;   /* elan reserved rx vaddrs */
         kpr_router_t       kqn_router;          /* connection to Kernel Portals Router module */
 
         ptl_nid_t          kqn_nid_offset;      /* this cluster's NID offset */
@@ -235,11 +258,15 @@ extern nal_cb_t        kqswnal_lib;
 extern nal_t           kqswnal_api;
 extern kqswnal_data_t  kqswnal_data;
 
+/* global pre-prepared replies to keep off the stack */
+extern EP_STATUSBLK    kqswnal_rpc_success;
+extern EP_STATUSBLK    kqswnal_rpc_failed;
+
 extern int kqswnal_thread_start (int (*fn)(void *arg), void *arg);
 extern void kqswnal_rxhandler(EP_RXD *rxd);
 extern int kqswnal_scheduler (void *);
 extern void kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd);
-extern void kqswnal_reply_complete (EP_RXD *rxd);
+extern void kqswnal_dma_reply_complete (EP_RXD *rxd);
 extern void kqswnal_requeue_rx (kqswnal_rx_t *krx);
 
 static inline ptl_nid_t
@@ -281,4 +308,87 @@ static inline kqsw_csum_t kqsw_csum (kqsw_csum_t sum, void *base, int nob)
 }
 #endif
 
+static inline void kqswnal_rx_done (kqswnal_rx_t *krx)
+{
+        LASSERT (atomic_read (&krx->krx_refcount) > 0);
+        if (atomic_dec_and_test (&krx->krx_refcount))
+                kqswnal_requeue_rx(krx);
+}
+
+#if MULTIRAIL_EKC
+
+#if (!defined(EP_RAILMASK_ALL) && !defined(EP_SHUTDOWN))
+/* These are making their way into the EKC subsystem.... */
+# define EP_RAILMASK_ALL    0xFFFF
+# define EP_SHUTDOWN        EP_ABORT
+#else
+/* ...Oh! they've got there already! */
+# error "qswnal.h older than EKC headers"
+#endif
+
+static inline int
+ep_nmd_merge (EP_NMD *merged, EP_NMD *a, EP_NMD *b)
+{
+        if (EP_NMD_NODEID(a) != EP_NMD_NODEID(b)) /* not generated on the same node */
+                return 0;
+
+        if ((EP_NMD_RAILMASK(a) & EP_NMD_RAILMASK(b)) == 0) /* no common rails */
+                return 0;
+
+        if (b->nmd_addr == (a->nmd_addr + a->nmd_len)) {
+                if (merged != NULL) {
+                        merged->nmd_addr = a->nmd_addr;
+                        merged->nmd_len  = a->nmd_len + b->nmd_len;
+                        merged->nmd_attr = EP_NMD_ATTR(EP_NMD_NODEID(a), EP_NMD_RAILMASK(a) & EP_NMD_RAILMASK(b));
+                }
+                return 1;
+        }
+    
+        if (a->nmd_addr == (b->nmd_addr + b->nmd_len)) {
+                if (merged != NULL) {
+                        merged->nmd_addr = b->nmd_addr;
+                        merged->nmd_len   = b->nmd_len + a->nmd_len;
+                        merged->nmd_attr  = EP_NMD_ATTR(EP_NMD_NODEID(b), EP_NMD_RAILMASK(a) & EP_NMD_RAILMASK(b));
+                }
+                return 1;
+        }
+
+        return 0;
+}
+#else
+/* multirail defines these in <elan/epcomms.h> */
+#define EP_MSG_SVC_PORTALS_SMALL      (0x10)  /* Portals over elan port number (small payloads) */
+#define EP_MSG_SVC_PORTALS_LARGE      (0x11)  /* Portals over elan port number (large payloads) */
+/* NB small/large message sizes are GLOBAL constants */
+
+/* A minimal attempt to minimise inline #ifdeffing */
+
+#define EP_SUCCESS      ESUCCESS
+#define EP_ENOMEM      ENOMEM
+
+static inline EP_XMTR *
+ep_alloc_xmtr(EP_DEV *e) 
+{
+        return (ep_alloc_large_xmtr(e));
+}
+
+static inline EP_RCVR *
+ep_alloc_rcvr(EP_DEV *e, int svc, int nenv)
+{
+        return (ep_install_large_rcvr(e, svc, nenv));
+}
+
+static inline void
+ep_free_xmtr(EP_XMTR *x) 
+{
+        ep_free_large_xmtr(x);
+}
+
+static inline void
+ep_free_rcvr(EP_RCVR *r)
+{
+        ep_remove_large_rcvr(r);
+}
+#endif
+
 #endif /* _QSWNAL_H */
index 43926c9..96749cd 100644 (file)
@@ -26,6 +26,9 @@
 
 #include "qswnal.h"
 
+EP_STATUSBLK  kqswnal_rpc_success;
+EP_STATUSBLK  kqswnal_rpc_failed;
+
 /*
  *  LIB functions follow
  *
@@ -128,9 +131,22 @@ kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
 void
 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
 {
+#if MULTIRAIL_EKC
+        int      i;
+#endif
+
         if (ktx->ktx_nmappedpages == 0)
                 return;
-
+        
+#if MULTIRAIL_EKC
+        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
+               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
+
+        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
+                ep_dvma_unload(kqswnal_data.kqn_ep,
+                               kqswnal_data.kqn_ep_tx_nmh,
+                               &ktx->ktx_frags[i]);
+#else
         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
 
@@ -138,9 +154,11 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
 
-        elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                           kqswnal_data.kqn_eptxdmahandle,
                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
+
+#endif
         ktx->ktx_nmappedpages = 0;
 }
 
@@ -152,12 +170,24 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
         char     *ptr;
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
         
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
         LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
-        
+
         do {
                 int  fraglen = kiov->kiov_len;
 
@@ -188,25 +218,40 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                        "%p[%d] loading %p for %d, page %d, %d total\n",
                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             ptr, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
                                        ptr, fraglen,
-                                       basepage, &ktx->ktx_frags.iov[nfrags].Base);
-
-                kunmap (kiov->kiov_page);
-                
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
+                                       basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_frags.iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
+#endif
+
+                kunmap (kiov->kiov_page);
+                
+                /* keep in loop for failure case */
+                ktx->ktx_nmappedpages = nmapped;
 
                 basepage++;
                 kiov++;
@@ -232,8 +277,20 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
-
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
+        
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
         LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
@@ -263,22 +320,38 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
                         ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
                         nmapped);
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             iov->iov_base, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
                                        iov->iov_base, fraglen,
-                                       basepage, &ktx->ktx_frags.iov[nfrags].Base);
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
+                                       basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_frags.iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
+#endif
+
+                /* keep in loop for failure case */
+                ktx->ktx_nmappedpages = nmapped;
 
                 basepage += npages;
                 iov++;
@@ -424,7 +497,6 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
                 break;
 
         case KTX_GETTING:          /* Peer has DMA-ed direct? */
-                LASSERT (KQSW_OPTIMIZE_GETS);
                 msg = (lib_msg_t *)ktx->ktx_args[1];
                 repmsg = NULL;
 
@@ -455,8 +527,8 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 
         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
 
-        if (status != EP_SUCCESS)
-        {
+        if (status != EP_SUCCESS) {
+
                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
                         ktx->ktx_nid, status);
 
@@ -466,8 +538,11 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
         } else if (ktx->ktx_state == KTX_GETTING) {
                 /* RPC completed OK; what did our peer put in the status
                  * block? */
-                LASSERT (KQSW_OPTIMIZE_GETS);
+#if MULTIRAIL_EKC
+                status = ep_txd_statusblk(txd)->Data[0];
+#else
                 status = ep_txd_statusblk(txd)->Status;
+#endif
         } else {
                 status = 0;
         }
@@ -488,21 +563,38 @@ kqswnal_launch (kqswnal_tx_t *ktx)
 
         LASSERT (dest >= 0);                    /* must be a peer */
         if (ktx->ktx_state == KTX_GETTING) {
-                LASSERT (KQSW_OPTIMIZE_GETS);
+                /* NB ktx_frags[0] is the GET hdr + kqswnal_remotemd_t.  The
+                 * other frags are the GET sink which we obviously don't
+                 * send here :) */
+#if MULTIRAIL_EKC
+                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+                                     ktx->ktx_port, attr,
+                                     kqswnal_txhandler, ktx,
+                                     NULL, ktx->ktx_frags, 1);
+#else
                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                      ktx->ktx_port, attr, kqswnal_txhandler,
-                                     ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+                                     ktx, NULL, ktx->ktx_frags, 1);
+#endif
         } else {
+#if MULTIRAIL_EKC
+                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
+                                         ktx->ktx_port, attr,
+                                         kqswnal_txhandler, ktx,
+                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
-                                       ktx->ktx_port, attr, kqswnal_txhandler,
-                                       ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+                                       ktx->ktx_port, attr, 
+                                       kqswnal_txhandler, ktx, 
+                                       ktx->ktx_frags, ktx->ktx_nfrag);
+#endif
         }
 
         switch (rc) {
-        case ESUCCESS: /* success */
+        case EP_SUCCESS: /* success */
                 return (0);
 
-        case ENOMEM: /* can't allocate ep txd => queue for later */
+        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                 LASSERT (in_interrupt());
 
                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
@@ -516,7 +608,7 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         default: /* fatal error */
                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                 kqswnal_notify_peer_down(ktx);
-                return (rc);
+                return (-EHOSTUNREACH);
         }
 }
 
@@ -589,6 +681,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
 
 }                               /* end of print_hdr() */
 
+#if !MULTIRAIL_EKC
 void
 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
 {
@@ -648,6 +741,7 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
         CERROR ("DATAVEC too small\n");
         return (-E2BIG);
 }
+#endif
 
 int
 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
@@ -656,14 +750,17 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
         char               *buffer = (char *)page_address(krx->krx_pages[0]);
         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
-        EP_IOVEC            eiov[EP_MAXFRAG];
-        EP_STATUSBLK        blk;
         int                 rc;
-
-        LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed);
+#if MULTIRAIL_EKC
+        int                 i;
+#else
+        EP_DATAVEC          datav[EP_MAXFRAG];
+        int                 ndatav;
+#endif
+        LASSERT (krx->krx_rpc_reply_needed);
         LASSERT ((iov == NULL) != (kiov == NULL));
 
-        /* see .*_pack_k?iov comment regarding endian-ness */
+        /* see kqswnal_sendmsg comment regarding endian-ness */
         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                 /* msg too small to discover rmd size */
                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
@@ -671,16 +768,16 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                 return (-EINVAL);
         }
         
-        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) {
+        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                 /* rmd doesn't fit in the incoming message */
                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
-                        krx->krx_nob, rmd->kqrmd_neiov,
-                        (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer));
+                        krx->krx_nob, rmd->kqrmd_nfrag,
+                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                 return (-EINVAL);
         }
 
-        /* Ghastly hack part 1, uses the existing procedures to map the source data... */
-        ktx->ktx_nfrag = 0;
+        /* Map the source data... */
+        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
         if (kiov != NULL)
                 rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
         else
@@ -691,32 +788,61 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                 return (rc);
         }
 
-        /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */
-        memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0]));
-
-        rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav,
-                                  ktx->ktx_nfrag, eiov,
-                                  rmd->kqrmd_neiov, rmd->kqrmd_eiov);
-        if (rc < 0) {
-                CERROR ("Can't create datavec: %d\n", rc);
-                return (rc);
+#if MULTIRAIL_EKC
+        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
+                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
+                return (-EINVAL);
         }
-        ktx->ktx_nfrag = rc;
-
-        memset (&blk, 0, sizeof (blk));         /* zero blk.Status */
+        
+        for (i = 0; i < rmd->kqrmd_nfrag; i++)
+                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
+                        CERROR("Can't cope with unequal frags %d(%d):"
+                               " %d local %d remote\n",
+                               i, rmd->kqrmd_nfrag, 
+                               ktx->ktx_frags[i].nmd_len, 
+                               rmd->kqrmd_frag[i].nmd_len);
+                        return (-EINVAL);
+                }
+#else
+        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
+                                      ktx->ktx_nfrag, ktx->ktx_frags,
+                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        if (ndatav < 0) {
+                CERROR ("Can't create datavec: %d\n", ndatav);
+                return (ndatav);
+        }
+#endif
 
-        /* Our caller will start to race with kqswnal_rpc_complete... */
+        /* Our caller will start to race with kqswnal_dma_reply_complete... */
         LASSERT (atomic_read (&krx->krx_refcount) == 1);
         atomic_set (&krx->krx_refcount, 2);
 
-        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx,
-                              &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag);
-        if (rc == ESUCCESS)
+#if MULTIRAIL_EKC
+        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
+                             &kqswnal_rpc_success,
+                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
+        if (rc == EP_SUCCESS)
+                return (0);
+
+        /* Well we tried... */
+        krx->krx_rpc_reply_needed = 0;
+#else
+        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
+                              &kqswnal_rpc_success, datav, ndatav);
+        if (rc == EP_SUCCESS)
                 return (0);
 
+        /* "old" EKC destroys rxd on failed completion */
+        krx->krx_rxd = NULL;
+#endif
+
+        CERROR("can't complete RPC: %d\n", rc);
+
         /* reset refcount back to 1: we're not going to be racing with
-         * kqswnal_rely_complete. */
+         * kqswnal_dma_reply_complete. */
         atomic_set (&krx->krx_refcount, 1);
+
         return (-ECONNABORTED);
 }
 
@@ -785,12 +911,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_NOSPACE);
         }
 
+        ktx->ktx_nid     = targetnid;
         ktx->ktx_args[0] = private;
         ktx->ktx_args[1] = libmsg;
 
-#if KQSW_OPTIMIZE_GETS
         if (type == PTL_MSG_REPLY &&
-            ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) {
+            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
                 if (nid != targetnid ||
                     kqswnal_nid2elanid(nid) != 
                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
@@ -798,7 +924,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                "nid "LPX64" via "LPX64" elanID %d\n",
                                nid, targetnid,
                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
-                        return(PTL_FAIL);
+                        return (PTL_FAIL);
                 }
 
                 /* peer expects RPC completion with GET data */
@@ -806,13 +932,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                         payload_niov, payload_iov, 
                                         payload_kiov, payload_nob);
                 if (rc == 0)
-                        return (0);
+                        return (PTL_OK);
                 
                 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
                 kqswnal_put_idle_tx (ktx);
                 return (PTL_FAIL);
         }
-#endif
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
@@ -838,15 +963,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
 #endif
         
-        /* Set up first frag from pre-mapped buffer (it's at least the
-         * portals header) */
-        ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer;
-        ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE;
-        ktx->ktx_nfrag = 1;
-        ktx->ktx_state = KTX_SENDING;   /* => lib_finalize() on completion */
-
-#if KQSW_OPTIMIZE_GETS
-        if (type == PTL_MSG_GET &&              /* doing a GET */
+        if (kqswnal_data.kqn_optimized_gets &&
+            type == PTL_MSG_GET &&              /* doing a GET */
             nid == targetnid) {                 /* not forwarding */
                 lib_md_t           *md = libmsg->md;
                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
@@ -856,8 +974,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  *
                  * First I set up ktx as if it was going to send this
                  * payload, (it needs to map it anyway).  This fills
-                 * ktx_frags.iov[1] and onward with the network addresses
-                 * of the get sink frags.  I copy these into ktx_buffer,
+                 * ktx_frags[1] and onward with the network addresses
+                 * of the GET sink frags.  I copy these into ktx_buffer,
                  * immediately after the header, and send that as my GET
                  * message.
                  *
@@ -865,6 +983,9 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  * When EKC copes with different endian nodes, I'll fix
                  * this (and eat my hat :) */
 
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_GETTING;
+
                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
                         rc = kqswnal_map_tx_kiov (ktx, md->length,
                                                   md->md_niov, md->md_iov.kiov);
@@ -877,46 +998,73 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                         return (PTL_FAIL);
                 }
 
-                rmd->kqrmd_neiov = ktx->ktx_nfrag - 1;
-                memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1],
-                        rmd->kqrmd_neiov * sizeof (EP_IOVEC));
+                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
 
-                ktx->ktx_nfrag = 1;
-                ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t,
-                                                       kqrmd_eiov[rmd->kqrmd_neiov]);
-                payload_nob = ktx->ktx_frags.iov[0].Len;
-                ktx->ktx_state = KTX_GETTING;
-        } else 
+                payload_nob = offsetof(kqswnal_remotemd_t,
+                                       kqrmd_frag[rmd->kqrmd_nfrag]);
+                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
+
+#if MULTIRAIL_EKC
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_NMD));
+
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + payload_nob);
+#else
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
+                
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
+#endif
+        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
+
+                /* small message: single frag copied into the pre-mapped buffer */
+
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + payload_nob);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
 #endif
-        if (payload_nob > 0) { /* got some payload (something more to do) */
-                /* make a single contiguous message? */
-                if (payload_nob <= KQSW_TX_MAXCONTIG) {
-                        /* copy payload to ktx_buffer, immediately after hdr */
+                if (payload_nob > 0) {
                         if (payload_kiov != NULL)
                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                    payload_niov, payload_kiov, payload_nob);
                         else
                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                   payload_niov, payload_iov, payload_nob);
-                        /* first frag includes payload */
-                        ktx->ktx_frags.iov[0].Len += payload_nob;
-                } else {
-                        if (payload_kiov != NULL)
-                                rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
-                                                          payload_niov, payload_kiov);
-                        else
-                                rc = kqswnal_map_tx_iov (ktx, payload_nob,
-                                                         payload_niov, payload_iov);
-                        if (rc != 0) {
-                                kqswnal_put_idle_tx (ktx);
-                                return (PTL_FAIL);
-                        }
-                } 
-        }
+                }
+        } else {
 
-        ktx->ktx_nid  = targetnid;
+                /* large message: multiple frags: first is hdr in pre-mapped buffer */
+
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+                if (payload_kiov != NULL)
+                        rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
+                                                  payload_niov, payload_kiov);
+                else
+                        rc = kqswnal_map_tx_iov (ktx, payload_nob,
+                                                 payload_niov, payload_iov);
+                if (rc != 0) {
+                        kqswnal_put_idle_tx (ktx);
+                        return (PTL_FAIL);
+                }
+        }
+        
         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
-                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
+                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
         if (rc != 0) {                    /* failed? */
@@ -962,8 +1110,6 @@ kqswnal_send_pages (nal_cb_t     *nal,
                                  payload_niov, NULL, payload_kiov, payload_nob));
 }
 
-int kqswnal_fwd_copy_contig = 0;
-
 void
 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
@@ -984,7 +1130,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 
         LASSERT (niov > 0);
         
-        ktx = kqswnal_get_idle_tx (fwd, FALSE);
+        ktx = kqswnal_get_idle_tx (fwd, 0);
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
 
@@ -1005,20 +1151,31 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 goto failed;
         }
 
-        if ((kqswnal_fwd_copy_contig || niov > 1) &&
+        ktx->ktx_port    = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
+                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
+        ktx->ktx_nid     = nid;
+        ktx->ktx_state   = KTX_FORWARDING;
+        ktx->ktx_args[0] = fwd;
+
+        if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
             nob <= KQSW_TX_BUFFER_SIZE) 
         {
-                /* send from ktx's pre-allocated/mapped contiguous buffer? */
+                /* send from ktx's pre-mapped contiguous buffer? */
                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
-                ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
-                ktx->ktx_frags.iov[0].Len = nob;
-                ktx->ktx_nfrag = 1;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, nob);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = nob;
+#endif
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
         }
         else
         {
                 /* zero copy */
-                ktx->ktx_nfrag = 0;       /* no frags mapped yet */
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
                 rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
                 if (rc != 0)
                         goto failed;
@@ -1026,12 +1183,6 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
         }
 
-        ktx->ktx_port    = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
-                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
-        ktx->ktx_nid     = nid;
-        ktx->ktx_state   = KTX_FORWARDING; /* kpr_put_packet() on completion */
-        ktx->ktx_args[0] = fwd;
-
         rc = kqswnal_launch (ktx);
         if (rc == 0)
                 return;
@@ -1064,7 +1215,7 @@ kqswnal_fwd_callback (void *arg, int error)
 }
 
 void
-kqswnal_reply_complete (EP_RXD *rxd) 
+kqswnal_dma_reply_complete (EP_RXD *rxd) 
 {
         int           status = ep_rxd_status(rxd);
         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
@@ -1075,9 +1226,10 @@ kqswnal_reply_complete (EP_RXD *rxd)
                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
 
         LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
 
-        krx->krx_rpc_completed = 1;
-        kqswnal_requeue_rx (krx);
+        krx->krx_rpc_reply_needed = 0;
+        kqswnal_rx_done (krx);
 
         lib_finalize (&kqswnal_lib, NULL, msg);
         kqswnal_put_idle_tx (ktx);
@@ -1093,67 +1245,76 @@ kqswnal_rpc_complete (EP_RXD *rxd)
                "rxd %p, krx %p, status %d\n", rxd, krx, status);
 
         LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
         
-        krx->krx_rpc_completed = 1;
+        krx->krx_rpc_reply_needed = 0;
         kqswnal_requeue_rx (krx);
 }
 
 void
-kqswnal_requeue_rx (kqswnal_rx_t *krx)
+kqswnal_requeue_rx (kqswnal_rx_t *krx) 
 {
-        EP_STATUSBLK  blk;
-        int           rc;
+        int   rc;
 
-        LASSERT (atomic_read (&krx->krx_refcount) > 0);
-        if (!atomic_dec_and_test (&krx->krx_refcount))
-                return;
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
 
-        if (!ep_rxd_isrpc(krx->krx_rxd) ||
-            krx->krx_rpc_completed) {
+        if (krx->krx_rpc_reply_needed) {
 
-                /* don't actually requeue on shutdown */
-                if (kqswnal_data.kqn_shuttingdown)
+                /* We failed to complete the peer's optimized GET (e.g. we
+                 * couldn't map the source buffers).  We complete the
+                 * peer's EKC rpc now with failure. */
+#if MULTIRAIL_EKC
+                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
+                                     &kqswnal_rpc_failed, NULL, NULL, 0);
+                if (rc == EP_SUCCESS)
                         return;
                 
-                ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx,
-                                    krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE);
-                return;
-        }
-
-        /* Sender wanted an RPC, but we didn't complete it (we must have
-         * dropped the sender's message).  We complete it now with
-         * failure... */
-        memset (&blk, 0, sizeof (blk));
-        blk.Status = -ECONNREFUSED;
-
-        atomic_set (&krx->krx_refcount, 1);
+                CERROR("can't complete RPC: %d\n", rc);
+#else
+                if (krx->krx_rxd != NULL) {
+                        /* We didn't try (and fail) to complete earlier... */
+                        rc = ep_complete_rpc(krx->krx_rxd, 
+                                             kqswnal_rpc_complete, krx,
+                                             &kqswnal_rpc_failed, NULL, 0);
+                        if (rc == EP_SUCCESS)
+                                return;
+
+                        CERROR("can't complete RPC: %d\n", rc);
+                }
+                
+                /* NB the old ep_complete_rpc() frees rxd on failure, so we
+                 * have to requeue from scratch here, unless we're shutting
+                 * down */
+                if (kqswnal_data.kqn_shuttingdown)
+                        return;
 
-        rc = ep_complete_rpc (krx->krx_rxd, 
-                              kqswnal_rpc_complete, krx,
-                              &blk, NULL, 0);
-        if (rc == ESUCCESS) {
-                /* callback will call me again to requeue, having set
-                 * krx_rpc_completed... */
+                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+                                      krx->krx_elanbuffer, 
+                                      krx->krx_npages * PAGE_SIZE, 0);
+                LASSERT (rc == EP_SUCCESS);
+                /* We don't handle failure here; it's incredibly rare
+                 * (never reported?) and only happens with "old" EKC */
                 return;
+#endif
         }
 
-        CERROR("can't complete RPC: %d\n", rc);
-
-        /* we don't actually requeue on shutdown */
-        if (kqswnal_data.kqn_shuttingdown)
-                return;
-
-        /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue
-         * from scratch here... */
-        rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
-                              krx->krx_elanaddr, 
-                              krx->krx_npages * PAGE_SIZE, 0);
-
-        LASSERT (rc == ESUCCESS);
-        /* This needs to be fixed by ep_complete_rpc NOT freeing
-         * krx->krx_rxd on failure so we can just ep_requeue_receive() */
+#if MULTIRAIL_EKC
+        if (kqswnal_data.kqn_shuttingdown) {
+                /* free EKC rxd on shutdown */
+                ep_complete_receive(krx->krx_rxd);
+        } else {
+                /* repost receive */
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   &krx->krx_elanbuffer, 0);
+        }
+#else                
+        /* don't actually requeue on shutdown */
+        if (!kqswnal_data.kqn_shuttingdown) 
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
+#endif
 }
-
+        
 void
 kqswnal_rx (kqswnal_rx_t *krx)
 {
@@ -1162,9 +1323,12 @@ kqswnal_rx (kqswnal_rx_t *krx)
         int             nob;
         int             niov;
 
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+
         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
-                /* NB krx requeued when lib_parse() calls back kqswnal_recv */
+                atomic_set(&krx->krx_refcount, 1);
                 lib_parse (&kqswnal_lib, hdr, krx);
+                kqswnal_rx_done(krx);
                 return;
         }
 
@@ -1212,18 +1376,27 @@ kqswnal_rxhandler(EP_RXD *rxd)
 
         krx->krx_rxd = rxd;
         krx->krx_nob = nob;
-        LASSERT (atomic_read (&krx->krx_refcount) == 0);
-        atomic_set (&krx->krx_refcount, 1);
-        krx->krx_rpc_completed = 0;
+#if MULTIRAIL_EKC
+        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
+#else
+        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
+#endif
         
         /* must receive a whole header to be able to parse */
         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
         {
                 /* receives complete with failure when receiver is removed */
+#if MULTIRAIL_EKC
+                if (status == EP_SHUTDOWN)
+                        LASSERT (kqswnal_data.kqn_shuttingdown);
+                else
+                        CERROR("receive status failed with status %d nob %d\n",
+                               ep_rxd_status(rxd), nob);
+#else
                 if (!kqswnal_data.kqn_shuttingdown)
                         CERROR("receive status failed with status %d nob %d\n",
                                ep_rxd_status(rxd), nob);
-
+#endif
                 kqswnal_requeue_rx (krx);
                 return;
         }
@@ -1417,8 +1590,6 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 #endif
         lib_finalize(nal, private, libmsg);
 
-        kqswnal_requeue_rx (krx);
-
         return (rlen);
 }
 
@@ -1455,6 +1626,7 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg)
                 return ((int)pid);
 
         atomic_inc (&kqswnal_data.kqn_nthreads);
+        atomic_inc (&kqswnal_data.kqn_nthreads_running);
         return (0);
 }
 
@@ -1473,6 +1645,7 @@ kqswnal_scheduler (void *arg)
         long             flags;
         int              rc;
         int              counter = 0;
+        int              shuttingdown = 0;
         int              did_something;
 
         kportal_daemonize ("kqswnal_sched");
@@ -1480,9 +1653,21 @@ kqswnal_scheduler (void *arg)
         
         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
-        while (!kqswnal_data.kqn_shuttingdown)
+        for (;;)
         {
-                did_something = FALSE;
+                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
+
+                        if (kqswnal_data.kqn_shuttingdown == 2)
+                                break;
+                
+                        /* During stage 1 of shutdown we are still responsive
+                         * to receives */
+
+                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
+                        shuttingdown = kqswnal_data.kqn_shuttingdown;
+                }
+
+                did_something = 0;
 
                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
                 {
@@ -1494,11 +1679,12 @@ kqswnal_scheduler (void *arg)
 
                         kqswnal_rx (krx);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedtxds))
                 {
                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                          kqswnal_tx_t, ktx_list);
@@ -1514,11 +1700,12 @@ kqswnal_scheduler (void *arg)
                                 kqswnal_tx_done (ktx, rc);
                         }
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
@@ -1526,7 +1713,7 @@ kqswnal_scheduler (void *arg)
 
                         kqswnal_fwd_packet (NULL, fwd);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
@@ -1539,7 +1726,7 @@ kqswnal_scheduler (void *arg)
 
                         if (!did_something) {
                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown ||
+                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
index f3f0fb4..b67e012 100644 (file)
@@ -253,7 +253,14 @@ AC_MSG_CHECKING(if quadrics kernel headers are present)
 if test -d $LINUX/drivers/net/qsnet ; then
   AC_MSG_RESULT(yes)
   QSWNAL="qswnal"
-  with_quadrics="-I$LINUX/drivers/net/qsnet/include"
+  AC_MSG_CHECKING(for multirail EKC)
+  if test -f $LINUX/include/elan/epcomms.h; then
+       AC_MSG_RESULT(supported)
+       with_quadrics="-DMULTIRAIL_EKC=1"
+  else
+       AC_MSG_RESULT(not supported)
+       with_quadrics="-I$LINUX/drivers/net/qsnet/include"
+  fi
   :
 elif test -d $LINUX/drivers/qsnet1 ; then
   AC_MSG_RESULT(yes)
index 70b45c0..127bbce 100644 (file)
@@ -35,6 +35,27 @@ kpr_nal_interface_t kqswnal_router_interface = {
        kprni_notify:   NULL,                   /* we're connectionless */
 };
 
+#if CONFIG_SYSCTL
+#define QSWNAL_SYSCTL  201
+
+#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
+#define QSWNAL_SYSCTL_COPY_SMALL_FWD     2
+
+static ctl_table kqswnal_ctl_table[] = {
+       {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
+        &kqswnal_data.kqn_optimized_gets, sizeof (int),
+        0644, NULL, &proc_dointvec},
+       {QSWNAL_SYSCTL_COPY_SMALL_FWD, "copy_small_fwd",
+        &kqswnal_data.kqn_copy_small_fwd, sizeof (int),
+        0644, NULL, &proc_dointvec},
+       {0}
+};
+
+static ctl_table kqswnal_top_ctl_table[] = {
+       {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
+       {0}
+};
+#endif
 
 static int
 kqswnal_forward(nal_t   *nal,
@@ -178,6 +199,10 @@ kqswnal_finalise (void)
                LASSERT (0);
 
        case KQN_INIT_ALL:
+#if CONFIG_SYSCTL
+                if (kqswnal_data.kqn_sysctl != NULL)
+                        unregister_sysctl_table (kqswnal_data.kqn_sysctl);
+#endif         
                PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
                 kportal_nal_unregister(QSWNAL);
                /* fall through */
@@ -200,21 +225,30 @@ kqswnal_finalise (void)
        kpr_shutdown (&kqswnal_data.kqn_router);
 
        /**********************************************************************/
-       /* flag threads to terminate, wake them and wait for them to die */
+       /* flag threads we've started to terminate and wait for all to ack */
 
        kqswnal_data.kqn_shuttingdown = 1;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);
 
-       while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
-               CDEBUG(D_NET, "waiting for %d threads to terminate\n",
-                      atomic_read (&kqswnal_data.kqn_nthreads));
+       while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
+               CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
+                      atomic_read (&kqswnal_data.kqn_nthreads_running));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }
 
        /**********************************************************************/
        /* close elan comms */
+#if MULTIRAIL_EKC
+       if (kqswnal_data.kqn_eprx_small != NULL)
+               ep_free_rcvr (kqswnal_data.kqn_eprx_small);
 
+       if (kqswnal_data.kqn_eprx_large != NULL)
+               ep_free_rcvr (kqswnal_data.kqn_eprx_large);
+
+       if (kqswnal_data.kqn_eptx != NULL)
+               ep_free_xmtr (kqswnal_data.kqn_eptx);
+#else
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
 
@@ -223,12 +257,29 @@ kqswnal_finalise (void)
 
        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
+#endif
+       /**********************************************************************/
+       /* flag threads to terminate, wake them and wait for them to die */
+
+       kqswnal_data.kqn_shuttingdown = 2;
+       wake_up_all (&kqswnal_data.kqn_sched_waitq);
+
+       while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
+               CDEBUG(D_NET, "waiting for %d threads to terminate\n",
+                      atomic_read (&kqswnal_data.kqn_nthreads));
+               set_current_state (TASK_UNINTERRUPTIBLE);
+               schedule_timeout (HZ);
+       }
 
        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */
 
+#if MULTIRAIL_EKC
+       LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
+#endif
+
        /**********************************************************************/
        /* Complete any blocked forwarding packets with error
         */
@@ -260,27 +311,73 @@ kqswnal_finalise (void)
        /* Unmap message buffers and free all descriptors and buffers
         */
 
+#if MULTIRAIL_EKC
+       /* FTTB, we need to unmap any remaining mapped memory.  When
+        * ep_dvma_release() get fixed (and releases any mappings in the
+        * region), we can delete all the code from here -------->  */
+
+       if (kqswnal_data.kqn_txds != NULL) {
+               int  i;
+
+               for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++) {
+                       kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
+
+                       /* If ktx has a buffer, it got mapped; unmap now.
+                        * NB only the pre-mapped stuff is still mapped
+                        * since all tx descs must be idle */
+
+                       if (ktx->ktx_buffer != NULL)
+                               ep_dvma_unload(kqswnal_data.kqn_ep,
+                                              kqswnal_data.kqn_ep_tx_nmh,
+                                              &ktx->ktx_ebuffer);
+               }
+       }
+
+       if (kqswnal_data.kqn_rxds != NULL) {
+               int   i;
+
+               for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
+                       kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
+
+                       /* If krx_pages[0] got allocated, it got mapped.
+                        * NB subsequent pages get merged */
+
+                       if (krx->krx_pages[0] != NULL)
+                               ep_dvma_unload(kqswnal_data.kqn_ep,
+                                              kqswnal_data.kqn_ep_rx_nmh,
+                                              &krx->krx_elanbuffer);
+               }
+       }
+       /* <----------- to here */
+
+       if (kqswnal_data.kqn_ep_rx_nmh != NULL)
+               ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);
+
+       if (kqswnal_data.kqn_ep_tx_nmh != NULL)
+               ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
+#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
-               elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+               elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);
 
-               elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
+               elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }
 
        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
-               elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+               elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));
 
-               elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
+               elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
+#endif
 
        if (kqswnal_data.kqn_txds != NULL)
        {
@@ -331,7 +428,11 @@ kqswnal_finalise (void)
 static int __init
 kqswnal_initialise (void)
 {
+#if MULTIRAIL_EKC
+       EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
+#else
        ELAN3_DMA_REQUEST dmareq;
+#endif
        int               rc;
        int               i;
        int               elan_page_idx;
@@ -351,9 +452,19 @@ kqswnal_initialise (void)
 
        kqswnal_lib.nal_data = &kqswnal_data;
 
+       memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
+       memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
+#if MULTIRAIL_EKC
+       kqswnal_rpc_failed.Data[0] = -ECONNREFUSED;
+#else
+       kqswnal_rpc_failed.Status = -ECONNREFUSED;
+#endif
        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));
 
+       kqswnal_data.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
+       kqswnal_data.kqn_copy_small_fwd = KQSW_COPY_SMALL_FWD;
+
        kqswnal_data.kqn_cb = &kqswnal_lib;
 
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
@@ -375,24 +486,38 @@ kqswnal_initialise (void)
        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;
 
+#if MULTIRAIL_EKC
+       kqswnal_data.kqn_ep = ep_system();
+       if (kqswnal_data.kqn_ep == NULL) {
+               CERROR("Can't initialise EKC\n");
+               return (-ENODEV);
+       }
+
+       if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
+               CERROR("Can't get elan ID\n");
+               kqswnal_finalise();
+               return (-ENODEV);
+       }
+#else
        /**********************************************************************/
        /* Find the first Elan device */
 
-       kqswnal_data.kqn_epdev = ep_device (0);
-       if (kqswnal_data.kqn_epdev == NULL)
+       kqswnal_data.kqn_ep = ep_device (0);
+       if (kqswnal_data.kqn_ep == NULL)
        {
                CERROR ("Can't get elan device 0\n");
-               return (-ENOMEM);
+               return (-ENODEV);
        }
+#endif
 
        kqswnal_data.kqn_nid_offset = 0;
-       kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_epdev);
-       kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_epdev);
+       kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
+       kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);
        
        /**********************************************************************/
        /* Get the transmitter */
 
-       kqswnal_data.kqn_eptx = ep_alloc_large_xmtr (kqswnal_data.kqn_epdev);
+       kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
@@ -403,9 +528,9 @@ kqswnal_initialise (void)
        /**********************************************************************/
        /* Get the receivers */
 
-       kqswnal_data.kqn_eprx_small = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
-                                                            EP_SVC_LARGE_PORTALS_SMALL,
-                                                            KQSW_EP_ENVELOPES_SMALL);
+       kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
+                                                    EP_MSG_SVC_PORTALS_SMALL,
+                                                    KQSW_EP_ENVELOPES_SMALL);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
@@ -413,9 +538,9 @@ kqswnal_initialise (void)
                return (-ENOMEM);
        }
 
-       kqswnal_data.kqn_eprx_large = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
-                                                            EP_SVC_LARGE_PORTALS_LARGE,
-                                                            KQSW_EP_ENVELOPES_LARGE);
+       kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
+                                                    EP_MSG_SVC_PORTALS_LARGE,
+                                                    KQSW_EP_ENVELOPES_LARGE);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
@@ -427,13 +552,23 @@ kqswnal_initialise (void)
        /* Reserve Elan address space for transmit descriptors NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */
-
+#if MULTIRAIL_EKC
+       kqswnal_data.kqn_ep_tx_nmh = 
+               ep_dvma_reserve(kqswnal_data.kqn_ep,
+                               KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
+                               EP_PERM_WRITE);
+       if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
+               CERROR("Can't reserve tx dma space\n");
+               kqswnal_finalise();
+               return (-ENOMEM);
+       }
+#else
         dmareq.Waitfn   = DDI_DMA_SLEEP;
         dmareq.ElanAddr = (E3_Addr) 0;
         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
 
-       rc = elan3_dma_reserve(kqswnal_data.kqn_epdev->DmaState,
+       rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
                              KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                              &dmareq, &kqswnal_data.kqn_eptxdmahandle);
        if (rc != DDI_SUCCESS)
@@ -442,16 +577,27 @@ kqswnal_initialise (void)
                kqswnal_finalise ();
                return (-ENOMEM);
        }
-
+#endif
        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
-
+#if MULTIRAIL_EKC
+       kqswnal_data.kqn_ep_rx_nmh =
+               ep_dvma_reserve(kqswnal_data.kqn_ep,
+                               KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
+                               KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
+                               EP_PERM_WRITE);
+       if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
+               CERROR("Can't reserve rx dma space\n");
+               kqswnal_finalise();
+               return (-ENOMEM);
+       }
+#else
         dmareq.Waitfn   = DDI_DMA_SLEEP;
         dmareq.ElanAddr = (E3_Addr) 0;
         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
 
-       rc = elan3_dma_reserve (kqswnal_data.kqn_epdev->DmaState,
+       rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                &dmareq, &kqswnal_data.kqn_eprxdmahandle);
@@ -461,7 +607,7 @@ kqswnal_initialise (void)
                kqswnal_finalise ();
                return (-ENOMEM);
        }
-
+#endif
        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */
 
@@ -492,12 +638,17 @@ kqswnal_initialise (void)
                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
-
-               elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+               ep_dvma_load(kqswnal_data.kqn_ep, NULL, 
+                            ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE, 
+                            kqswnal_data.kqn_ep_tx_nmh, basepage,
+                            &all_rails, &ktx->ktx_ebuffer);
+#else
+               elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                                       basepage, &ktx->ktx_ebuffer);
-
+#endif
                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
 
@@ -527,7 +678,11 @@ kqswnal_initialise (void)
        elan_page_idx = 0;
        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
-               E3_Addr       elanaddr;
+#if MULTIRAIL_EKC
+               EP_NMD        elanbuffer;
+#else
+               E3_Addr       elanbuffer;
+#endif
                int           j;
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
 
@@ -554,18 +709,35 @@ kqswnal_initialise (void)
 
                        LASSERT(page_address(krx->krx_pages[j]) != NULL);
 
-                       elan3_dvma_kaddr_load(kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                       ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                                    page_address(krx->krx_pages[j]),
+                                    PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
+                                    elan_page_idx, &all_rails, &elanbuffer);
+                       
+                       if (j == 0) {
+                               krx->krx_elanbuffer = elanbuffer;
+                       } else {
+                               rc = ep_nmd_merge(&krx->krx_elanbuffer,
+                                                 &krx->krx_elanbuffer, 
+                                                 &elanbuffer);
+                               /* NB contiguous mapping */
+                               LASSERT(rc);
+                       }
+#else
+                       elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
                                              kqswnal_data.kqn_eprxdmahandle,
                                              page_address(krx->krx_pages[j]),
                                              PAGE_SIZE, elan_page_idx,
-                                             &elanaddr);
-                       elan_page_idx++;
-
+                                             &elanbuffer);
                        if (j == 0)
-                               krx->krx_elanaddr = elanaddr;
+                               krx->krx_elanbuffer = elanbuffer;
+
+                       /* NB contiguous mapping */
+                       LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
+#endif
+                       elan_page_idx++;
 
-                       /* NB we assume a contiguous  */
-                       LASSERT (elanaddr == krx->krx_elanaddr + j * PAGE_SIZE);
                }
        }
        LASSERT (elan_page_idx ==
@@ -593,10 +765,15 @@ kqswnal_initialise (void)
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
 
                /* NB this enqueue can allocate/sleep (attr == 0) */
+#if MULTIRAIL_EKC
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
-                                     krx->krx_elanaddr,
+                                     &krx->krx_elanbuffer, 0);
+#else
+               rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+                                     krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
-               if (rc != ESUCCESS)
+#endif
+               if (rc != EP_SUCCESS)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_finalise ();
@@ -629,6 +806,11 @@ kqswnal_initialise (void)
                return (rc);
        }
 
+#if CONFIG_SYSCTL
+        /* Press on regardless even if registering sysctl doesn't work */
+        kqswnal_data.kqn_sysctl = register_sysctl_table (kqswnal_top_ctl_table, 0);
+#endif
+
        PORTAL_SYMBOL_REGISTER(kqswnal_ni);
        kqswnal_data.kqn_init = KQN_INIT_ALL;
 
@@ -642,8 +824,8 @@ kqswnal_initialise (void)
 }
 
 
-MODULE_AUTHOR("W. Marcus Miller <marcusm@llnl.gov>");
-MODULE_DESCRIPTION("Kernel Quadrics Switch NAL v1.00");
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
 MODULE_LICENSE("GPL");
 
 module_init (kqswnal_initialise);
index 4cb9ad9..3e4274b 100644 (file)
 #include <linux/config.h>
 #include <linux/module.h>
 
-#include <elan3/elanregs.h>
-#include <elan3/elandev.h>
-#include <elan3/elanvp.h>
-#include <elan3/elan3mmu.h>
-#include <elan3/elanctxt.h>
-#include <elan3/elandebug.h>
-#include <elan3/urom_addrs.h>
-#include <elan3/busops.h>
-#include <elan3/kcomm.h>
+#if MULTIRAIL_EKC
+# include <elan/epcomms.h>
+#else
+# include <elan3/elanregs.h>
+# include <elan3/elandev.h>
+# include <elan3/elanvp.h>
+# include <elan3/elan3mmu.h>
+# include <elan3/elanctxt.h>
+# include <elan3/elandebug.h>
+# include <elan3/urom_addrs.h>
+# include <elan3/busops.h>
+# include <elan3/kcomm.h>
+#endif
 
 #include <linux/kernel.h>
 #include <linux/mm.h>
@@ -66,7 +70,7 @@
 #include <linux/file.h>
 #include <linux/stat.h>
 #include <linux/list.h>
-#include <asm/uaccess.h>
+#include <linux/sysctl.h>
 #include <asm/segment.h>
 
 #define DEBUG_SUBSYSTEM S_QSWNAL
@@ -75,8 +79,6 @@
 #include <portals/p30.h>
 #include <portals/lib-p30.h>
 
-#define KQSW_OPTIMIZE_GETS 1
-
 #define KQSW_CHECKSUM   0
 #if KQSW_CHECKSUM
 typedef unsigned long kqsw_csum_t;
@@ -87,13 +89,6 @@ typedef unsigned long kqsw_csum_t;
 #define KQSW_HDR_SIZE   (sizeof (ptl_hdr_t) + KQSW_CSUM_SIZE)
 
 /*
- *  Elan NAL
- */
-#define EP_SVC_LARGE_PORTALS_SMALL      (0x10)  /* Portals over elan port number (large payloads) */
-#define EP_SVC_LARGE_PORTALS_LARGE      (0x11)  /* Portals over elan port number (small payloads) */
-/* NB small/large message sizes are GLOBAL constants */
-
-/*
  * Performance Tuning defines
  * NB no mention of PAGE_SIZE for interoperability
  */
@@ -113,6 +108,9 @@ typedef unsigned long kqsw_csum_t;
 
 #define KQSW_RESCHED                    100     /* # busy loops that forces scheduler to yield */
 
+#define KQSW_OPTIMIZED_GETS             1       /* optimized gets? */
+#define KQSW_COPY_SMALL_FWD             0       /* copy small fwd messages to pre-mapped buffer? */
+
 /*
  * derived constants
  */
@@ -136,8 +134,12 @@ typedef unsigned long kqsw_csum_t;
 /* Remote memory descriptor */
 typedef struct
 {
-        __u32            kqrmd_neiov;           /* # frags */
-        EP_IOVEC         kqrmd_eiov[0];         /* actual frags */
+        __u32            kqrmd_nfrag;           /* # frags */
+#if MULTIRAIL_EKC
+        EP_NMD           kqrmd_frag[0];         /* actual frags */
+#else
+        EP_IOVEC         kqrmd_frag[0];         /* actual frags */
+#endif
 } kqswnal_remotemd_t;
 
 typedef struct 
@@ -145,11 +147,16 @@ typedef struct
         struct list_head krx_list;              /* enqueue -> thread */
         EP_RCVR         *krx_eprx;              /* port to post receives to */
         EP_RXD          *krx_rxd;               /* receive descriptor (for repost) */
-        E3_Addr          krx_elanaddr;          /* Elan address of buffer (contiguous in elan vm) */
+#if MULTIRAIL_EKC
+        EP_NMD           krx_elanbuffer;        /* contiguous Elan buffer */
+#else
+        E3_Addr          krx_elanbuffer;        /* contiguous Elan buffer */
+#endif
         int              krx_npages;            /* # pages in receive buffer */
         int              krx_nob;               /* Number Of Bytes received into buffer */
-        atomic_t         krx_refcount;          /* who's using me? */
-        int              krx_rpc_completed;     /* I completed peer's RPC */
+        int              krx_rpc_reply_needed;  /* peer waiting for EKC RPC reply */
+        int              krx_rpc_reply_sent;    /* rpc reply sent */
+        atomic_t         krx_refcount;          /* how to tell when rpc is done */
         kpr_fwd_desc_t   krx_fwd;               /* embedded forwarding descriptor */
         struct page     *krx_pages[KQSW_NRXMSGPAGES_LARGE]; /* pages allocated */
         struct iovec     krx_iov[KQSW_NRXMSGPAGES_LARGE]; /* iovec for forwarding */
@@ -159,15 +166,15 @@ typedef struct
 {
         struct list_head  ktx_list;             /* enqueue idle/active */
         struct list_head  ktx_delayed_list;     /* enqueue delayedtxds */
-        int               ktx_isnblk:1;         /* reserved descriptor? */
-        int               ktx_state:7;          /* What I'm doing */
+        unsigned int      ktx_isnblk:1;         /* reserved descriptor? */
+        unsigned int      ktx_state:7;          /* What I'm doing */
+        unsigned int      ktx_firsttmpfrag:1;   /* ktx_frags[0] is in my ebuffer ? 0 : 1 */
         uint32_t          ktx_basepage;         /* page offset in reserved elan tx vaddrs for mapping pages */
         int               ktx_npages;           /* pages reserved for mapping messages */
         int               ktx_nmappedpages;     /* # pages mapped for current message */
         int               ktx_port;             /* destination ep port */
         ptl_nid_t         ktx_nid;              /* destination node */
         void             *ktx_args[2];          /* completion passthru */
-        E3_Addr           ktx_ebuffer;          /* elan address of ktx_buffer */
         char             *ktx_buffer;           /* pre-allocated contiguous buffer for hdr + small payloads */
         unsigned long     ktx_launchtime;       /* when (in jiffies) the transmit was launched */
 
@@ -176,10 +183,13 @@ typedef struct
         ptl_hdr_t        *ktx_wire_hdr;         /* portals header (wire endian) */
 
         int               ktx_nfrag;            /* # message frags */
-        union {
-                EP_IOVEC   iov[EP_MAXFRAG];     /* msg frags (elan vaddrs) */
-                EP_DATAVEC datav[EP_MAXFRAG];   /* DMA frags (eolan vaddrs) */
-        }                 ktx_frags;
+#if MULTIRAIL_EKC
+        EP_NMD            ktx_ebuffer;          /* elan mapping of ktx_buffer */
+        EP_NMD            ktx_frags[EP_MAXFRAG];/* elan mapping of msg frags */
+#else
+        E3_Addr           ktx_ebuffer;          /* elan address of ktx_buffer */
+        EP_IOVEC          ktx_frags[EP_MAXFRAG];/* msg frags (elan vaddrs) */
+#endif
 } kqswnal_tx_t;
 
 #define KTX_IDLE        0                       /* on kqn_(nblk_)idletxds */
@@ -191,8 +201,15 @@ typedef struct
 {
         char               kqn_init;            /* what's been initialised */
         char               kqn_shuttingdown;    /* I'm trying to shut down */
-        atomic_t           kqn_nthreads;        /* # threads still running */
+        atomic_t           kqn_nthreads;        /* # threads not terminated */
+        atomic_t           kqn_nthreads_running;/* # threads still running */
+
+        int                kqn_optimized_gets;  /* optimized GETs? */
+        int                kqn_copy_small_fwd;  /* fwd small msgs from pre-allocated buffer? */
 
+#if CONFIG_SYSCTL
+        struct ctl_table_header *kqn_sysctl;    /* sysctl interface */
+#endif        
         kqswnal_rx_t      *kqn_rxds;            /* all the receive descriptors */
         kqswnal_tx_t      *kqn_txds;            /* all the transmit descriptors */
 
@@ -212,12 +229,18 @@ typedef struct
 
         spinlock_t         kqn_statelock;       /* cb_cli/cb_sti */
         nal_cb_t          *kqn_cb;              /* -> kqswnal_lib */
-        EP_DEV            *kqn_epdev;           /* elan device */
+#if MULTIRAIL_EKC
+        EP_SYS            *kqn_ep;              /* elan system */
+        EP_NMH            *kqn_ep_tx_nmh;       /* elan reserved tx vaddrs */
+        EP_NMH            *kqn_ep_rx_nmh;       /* elan reserved rx vaddrs */
+#else
+        EP_DEV            *kqn_ep;              /* elan device */
+        ELAN3_DMA_HANDLE  *kqn_eptxdmahandle;   /* elan reserved tx vaddrs */
+        ELAN3_DMA_HANDLE  *kqn_eprxdmahandle;   /* elan reserved rx vaddrs */
+#endif
         EP_XMTR           *kqn_eptx;            /* elan transmitter */
         EP_RCVR           *kqn_eprx_small;      /* elan receiver (small messages) */
         EP_RCVR           *kqn_eprx_large;      /* elan receiver (large messages) */
-        ELAN3_DMA_HANDLE  *kqn_eptxdmahandle;   /* elan reserved tx vaddrs */
-        ELAN3_DMA_HANDLE  *kqn_eprxdmahandle;   /* elan reserved rx vaddrs */
         kpr_router_t       kqn_router;          /* connection to Kernel Portals Router module */
 
         ptl_nid_t          kqn_nid_offset;      /* this cluster's NID offset */
@@ -235,11 +258,15 @@ extern nal_cb_t        kqswnal_lib;
 extern nal_t           kqswnal_api;
 extern kqswnal_data_t  kqswnal_data;
 
+/* global pre-prepared replies to keep off the stack */
+extern EP_STATUSBLK    kqswnal_rpc_success;
+extern EP_STATUSBLK    kqswnal_rpc_failed;
+
 extern int kqswnal_thread_start (int (*fn)(void *arg), void *arg);
 extern void kqswnal_rxhandler(EP_RXD *rxd);
 extern int kqswnal_scheduler (void *);
 extern void kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd);
-extern void kqswnal_reply_complete (EP_RXD *rxd);
+extern void kqswnal_dma_reply_complete (EP_RXD *rxd);
 extern void kqswnal_requeue_rx (kqswnal_rx_t *krx);
 
 static inline ptl_nid_t
@@ -281,4 +308,87 @@ static inline kqsw_csum_t kqsw_csum (kqsw_csum_t sum, void *base, int nob)
 }
 #endif
 
+static inline void kqswnal_rx_done (kqswnal_rx_t *krx)
+{
+        LASSERT (atomic_read (&krx->krx_refcount) > 0);
+        if (atomic_dec_and_test (&krx->krx_refcount))
+                kqswnal_requeue_rx(krx);
+}
+
+#if MULTIRAIL_EKC
+
+#if (!defined(EP_RAILMASK_ALL) && !defined(EP_SHUTDOWN))
+/* These are making their way into the EKC subsystem.... */
+# define EP_RAILMASK_ALL    0xFFFF
+# define EP_SHUTDOWN        EP_ABORT
+#else
+/* ...Oh! they've got there already! */
+# error "qswnal.h older than EKC headers"
+#endif
+
+static inline int
+ep_nmd_merge (EP_NMD *merged, EP_NMD *a, EP_NMD *b)
+{
+        if (EP_NMD_NODEID(a) != EP_NMD_NODEID(b)) /* not generated on the same node */
+                return 0;
+
+        if ((EP_NMD_RAILMASK(a) & EP_NMD_RAILMASK(b)) == 0) /* no common rails */
+                return 0;
+
+        if (b->nmd_addr == (a->nmd_addr + a->nmd_len)) {
+                if (merged != NULL) {
+                        merged->nmd_addr = a->nmd_addr;
+                        merged->nmd_len  = a->nmd_len + b->nmd_len;
+                        merged->nmd_attr = EP_NMD_ATTR(EP_NMD_NODEID(a), EP_NMD_RAILMASK(a) & EP_NMD_RAILMASK(b));
+                }
+                return 1;
+        }
+    
+        if (a->nmd_addr == (b->nmd_addr + b->nmd_len)) {
+                if (merged != NULL) {
+                        merged->nmd_addr = b->nmd_addr;
+                        merged->nmd_len   = b->nmd_len + a->nmd_len;
+                        merged->nmd_attr  = EP_NMD_ATTR(EP_NMD_NODEID(b), EP_NMD_RAILMASK(a) & EP_NMD_RAILMASK(b));
+                }
+                return 1;
+        }
+
+        return 0;
+}
+#else
+/* multirail defines these in <elan/epcomms.h> */
+#define EP_MSG_SVC_PORTALS_SMALL      (0x10)  /* Portals over elan port number (large payloads) */
+#define EP_MSG_SVC_PORTALS_LARGE      (0x11)  /* Portals over elan port number (small payloads) */
+/* NB small/large message sizes are GLOBAL constants */
+
+/* A minimal attempt to minimise inline #ifdeffing */
+
+#define EP_SUCCESS      ESUCCESS
+#define EP_ENOMEM      ENOMEM
+
+static inline EP_XMTR *
+ep_alloc_xmtr(EP_DEV *e) 
+{
+        return (ep_alloc_large_xmtr(e));
+}
+
+static inline EP_RCVR *
+ep_alloc_rcvr(EP_DEV *e, int svc, int nenv)
+{
+        return (ep_install_large_rcvr(e, svc, nenv));
+}
+
+static inline void
+ep_free_xmtr(EP_XMTR *x) 
+{
+        ep_free_large_xmtr(x);
+}
+
+static inline void
+ep_free_rcvr(EP_RCVR *r)
+{
+        ep_remove_large_rcvr(r);
+}
+#endif
+
 #endif /* _QSWNAL_H */
index 43926c9..96749cd 100644 (file)
@@ -26,6 +26,9 @@
 
 #include "qswnal.h"
 
+EP_STATUSBLK  kqswnal_rpc_success;
+EP_STATUSBLK  kqswnal_rpc_failed;
+
 /*
  *  LIB functions follow
  *
@@ -128,9 +131,22 @@ kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
 void
 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
 {
+#if MULTIRAIL_EKC
+        int      i;
+#endif
+
         if (ktx->ktx_nmappedpages == 0)
                 return;
-
+        
+#if MULTIRAIL_EKC
+        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
+               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
+
+        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
+                ep_dvma_unload(kqswnal_data.kqn_ep,
+                               kqswnal_data.kqn_ep_tx_nmh,
+                               &ktx->ktx_frags[i]);
+#else
         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
 
@@ -138,9 +154,11 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
 
-        elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                           kqswnal_data.kqn_eptxdmahandle,
                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
+
+#endif
         ktx->ktx_nmappedpages = 0;
 }
 
@@ -152,12 +170,24 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
         char     *ptr;
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
         
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
         LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
-        
+
         do {
                 int  fraglen = kiov->kiov_len;
 
@@ -188,25 +218,40 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                        "%p[%d] loading %p for %d, page %d, %d total\n",
                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             ptr, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
                                        ptr, fraglen,
-                                       basepage, &ktx->ktx_frags.iov[nfrags].Base);
-
-                kunmap (kiov->kiov_page);
-                
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
+                                       basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_frags.iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
+#endif
+
+                kunmap (kiov->kiov_page);
+                
+                /* keep in loop for failure case */
+                ktx->ktx_nmappedpages = nmapped;
 
                 basepage++;
                 kiov++;
@@ -232,8 +277,20 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
-
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
+        
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
         LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
@@ -263,22 +320,38 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
                         ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
                         nmapped);
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             iov->iov_base, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
                                        iov->iov_base, fraglen,
-                                       basepage, &ktx->ktx_frags.iov[nfrags].Base);
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
+                                       basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_frags.iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
+#endif
+
+                /* keep in loop for failure case */
+                ktx->ktx_nmappedpages = nmapped;
 
                 basepage += npages;
                 iov++;
@@ -424,7 +497,6 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
                 break;
 
         case KTX_GETTING:          /* Peer has DMA-ed direct? */
-                LASSERT (KQSW_OPTIMIZE_GETS);
                 msg = (lib_msg_t *)ktx->ktx_args[1];
                 repmsg = NULL;
 
@@ -455,8 +527,8 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 
         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
 
-        if (status != EP_SUCCESS)
-        {
+        if (status != EP_SUCCESS) {
+
                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
                         ktx->ktx_nid, status);
 
@@ -466,8 +538,11 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
         } else if (ktx->ktx_state == KTX_GETTING) {
                 /* RPC completed OK; what did our peer put in the status
                  * block? */
-                LASSERT (KQSW_OPTIMIZE_GETS);
+#if MULTIRAIL_EKC
+                status = ep_txd_statusblk(txd)->Data[0];
+#else
                 status = ep_txd_statusblk(txd)->Status;
+#endif
         } else {
                 status = 0;
         }
@@ -488,21 +563,38 @@ kqswnal_launch (kqswnal_tx_t *ktx)
 
         LASSERT (dest >= 0);                    /* must be a peer */
         if (ktx->ktx_state == KTX_GETTING) {
-                LASSERT (KQSW_OPTIMIZE_GETS);
+                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
+                 * other frags are the GET sink which we obviously don't
+                 * send here :) */
+#if MULTIRAIL_EKC
+                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+                                     ktx->ktx_port, attr,
+                                     kqswnal_txhandler, ktx,
+                                     NULL, ktx->ktx_frags, 1);
+#else
                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                      ktx->ktx_port, attr, kqswnal_txhandler,
-                                     ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+                                     ktx, NULL, ktx->ktx_frags, 1);
+#endif
         } else {
+#if MULTIRAIL_EKC
+                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
+                                         ktx->ktx_port, attr,
+                                         kqswnal_txhandler, ktx,
+                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
-                                       ktx->ktx_port, attr, kqswnal_txhandler,
-                                       ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+                                       ktx->ktx_port, attr, 
+                                       kqswnal_txhandler, ktx, 
+                                       ktx->ktx_frags, ktx->ktx_nfrag);
+#endif
         }
 
         switch (rc) {
-        case ESUCCESS: /* success */
+        case EP_SUCCESS: /* success */
                 return (0);
 
-        case ENOMEM: /* can't allocate ep txd => queue for later */
+        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                 LASSERT (in_interrupt());
 
                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
@@ -516,7 +608,7 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         default: /* fatal error */
                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                 kqswnal_notify_peer_down(ktx);
-                return (rc);
+                return (-EHOSTUNREACH);
         }
 }
 
@@ -589,6 +681,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
 
 }                               /* end of print_hdr() */
 
+#if !MULTIRAIL_EKC
 void
 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
 {
@@ -648,6 +741,7 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
         CERROR ("DATAVEC too small\n");
         return (-E2BIG);
 }
+#endif
 
 int
 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
@@ -656,14 +750,17 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
         char               *buffer = (char *)page_address(krx->krx_pages[0]);
         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
-        EP_IOVEC            eiov[EP_MAXFRAG];
-        EP_STATUSBLK        blk;
         int                 rc;
-
-        LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed);
+#if MULTIRAIL_EKC
+        int                 i;
+#else
+        EP_DATAVEC          datav[EP_MAXFRAG];
+        int                 ndatav;
+#endif
+        LASSERT (krx->krx_rpc_reply_needed);
         LASSERT ((iov == NULL) != (kiov == NULL));
 
-        /* see .*_pack_k?iov comment regarding endian-ness */
+        /* see kqswnal_sendmsg comment regarding endian-ness */
         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                 /* msg too small to discover rmd size */
                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
@@ -671,16 +768,16 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                 return (-EINVAL);
         }
         
-        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) {
+        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                 /* rmd doesn't fit in the incoming message */
                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
-                        krx->krx_nob, rmd->kqrmd_neiov,
-                        (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer));
+                        krx->krx_nob, rmd->kqrmd_nfrag,
+                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                 return (-EINVAL);
         }
 
-        /* Ghastly hack part 1, uses the existing procedures to map the source data... */
-        ktx->ktx_nfrag = 0;
+        /* Map the source data... */
+        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
         if (kiov != NULL)
                 rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
         else
@@ -691,32 +788,61 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                 return (rc);
         }
 
-        /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */
-        memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0]));
-
-        rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav,
-                                  ktx->ktx_nfrag, eiov,
-                                  rmd->kqrmd_neiov, rmd->kqrmd_eiov);
-        if (rc < 0) {
-                CERROR ("Can't create datavec: %d\n", rc);
-                return (rc);
+#if MULTIRAIL_EKC
+        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
+                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
+                return (-EINVAL);
         }
-        ktx->ktx_nfrag = rc;
-
-        memset (&blk, 0, sizeof (blk));         /* zero blk.Status */
+        
+        for (i = 0; i < rmd->kqrmd_nfrag; i++)
+                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
+                        CERROR("Can't cope with unequal frags %d(%d):"
+                               " %d local %d remote\n",
+                               i, rmd->kqrmd_nfrag, 
+                               ktx->ktx_frags[i].nmd_len, 
+                               rmd->kqrmd_frag[i].nmd_len);
+                        return (-EINVAL);
+                }
+#else
+        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
+                                      ktx->ktx_nfrag, ktx->ktx_frags,
+                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        if (ndatav < 0) {
+                CERROR ("Can't create datavec: %d\n", ndatav);
+                return (ndatav);
+        }
+#endif
 
-        /* Our caller will start to race with kqswnal_rpc_complete... */
+        /* Our caller will start to race with kqswnal_dma_reply_complete... */
         LASSERT (atomic_read (&krx->krx_refcount) == 1);
         atomic_set (&krx->krx_refcount, 2);
 
-        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx,
-                              &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag);
-        if (rc == ESUCCESS)
+#if MULTIRAIL_EKC
+        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
+                             &kqswnal_rpc_success,
+                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
+        if (rc == EP_SUCCESS)
+                return (0);
+
+        /* Well we tried... */
+        krx->krx_rpc_reply_needed = 0;
+#else
+        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
+                              &kqswnal_rpc_success, datav, ndatav);
+        if (rc == EP_SUCCESS)
                 return (0);
 
+        /* "old" EKC destroys rxd on failed completion */
+        krx->krx_rxd = NULL;
+#endif
+
+        CERROR("can't complete RPC: %d\n", rc);
+
         /* reset refcount back to 1: we're not going to be racing with
-         * kqswnal_rely_complete. */
+         * kqswnal_dma_reply_complete. */
         atomic_set (&krx->krx_refcount, 1);
+
         return (-ECONNABORTED);
 }
 
@@ -785,12 +911,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_NOSPACE);
         }
 
+        ktx->ktx_nid     = targetnid;
         ktx->ktx_args[0] = private;
         ktx->ktx_args[1] = libmsg;
 
-#if KQSW_OPTIMIZE_GETS
         if (type == PTL_MSG_REPLY &&
-            ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) {
+            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
                 if (nid != targetnid ||
                     kqswnal_nid2elanid(nid) != 
                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
@@ -798,7 +924,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                "nid "LPX64" via "LPX64" elanID %d\n",
                                nid, targetnid,
                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
-                        return(PTL_FAIL);
+                        return (PTL_FAIL);
                 }
 
                 /* peer expects RPC completion with GET data */
@@ -806,13 +932,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                         payload_niov, payload_iov, 
                                         payload_kiov, payload_nob);
                 if (rc == 0)
-                        return (0);
+                        return (PTL_OK);
                 
                 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
                 kqswnal_put_idle_tx (ktx);
                 return (PTL_FAIL);
         }
-#endif
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
@@ -838,15 +963,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
 #endif
         
-        /* Set up first frag from pre-mapped buffer (it's at least the
-         * portals header) */
-        ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer;
-        ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE;
-        ktx->ktx_nfrag = 1;
-        ktx->ktx_state = KTX_SENDING;   /* => lib_finalize() on completion */
-
-#if KQSW_OPTIMIZE_GETS
-        if (type == PTL_MSG_GET &&              /* doing a GET */
+        if (kqswnal_data.kqn_optimized_gets &&
+            type == PTL_MSG_GET &&              /* doing a GET */
             nid == targetnid) {                 /* not forwarding */
                 lib_md_t           *md = libmsg->md;
                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
@@ -856,8 +974,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  *
                  * First I set up ktx as if it was going to send this
                  * payload, (it needs to map it anyway).  This fills
-                 * ktx_frags.iov[1] and onward with the network addresses
-                 * of the get sink frags.  I copy these into ktx_buffer,
+                 * ktx_frags[1] and onward with the network addresses
+                 * of the GET sink frags.  I copy these into ktx_buffer,
                  * immediately after the header, and send that as my GET
                  * message.
                  *
@@ -865,6 +983,9 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  * When EKC copes with different endian nodes, I'll fix
                  * this (and eat my hat :) */
 
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_GETTING;
+
                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
                         rc = kqswnal_map_tx_kiov (ktx, md->length,
                                                   md->md_niov, md->md_iov.kiov);
@@ -877,46 +998,73 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                         return (PTL_FAIL);
                 }
 
-                rmd->kqrmd_neiov = ktx->ktx_nfrag - 1;
-                memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1],
-                        rmd->kqrmd_neiov * sizeof (EP_IOVEC));
+                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
 
-                ktx->ktx_nfrag = 1;
-                ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t,
-                                                       kqrmd_eiov[rmd->kqrmd_neiov]);
-                payload_nob = ktx->ktx_frags.iov[0].Len;
-                ktx->ktx_state = KTX_GETTING;
-        } else 
+                payload_nob = offsetof(kqswnal_remotemd_t,
+                                       kqrmd_frag[rmd->kqrmd_nfrag]);
+                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
+
+#if MULTIRAIL_EKC
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_NMD));
+
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + payload_nob);
+#else
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
+                
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
+#endif
+        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
+
+                /* small message: single frag copied into the pre-mapped buffer */
+
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + payload_nob);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
 #endif
-        if (payload_nob > 0) { /* got some payload (something more to do) */
-                /* make a single contiguous message? */
-                if (payload_nob <= KQSW_TX_MAXCONTIG) {
-                        /* copy payload to ktx_buffer, immediately after hdr */
+                if (payload_nob > 0) {
                         if (payload_kiov != NULL)
                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                    payload_niov, payload_kiov, payload_nob);
                         else
                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                   payload_niov, payload_iov, payload_nob);
-                        /* first frag includes payload */
-                        ktx->ktx_frags.iov[0].Len += payload_nob;
-                } else {
-                        if (payload_kiov != NULL)
-                                rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
-                                                          payload_niov, payload_kiov);
-                        else
-                                rc = kqswnal_map_tx_iov (ktx, payload_nob,
-                                                         payload_niov, payload_iov);
-                        if (rc != 0) {
-                                kqswnal_put_idle_tx (ktx);
-                                return (PTL_FAIL);
-                        }
-                } 
-        }
+                }
+        } else {
 
-        ktx->ktx_nid  = targetnid;
+                /* large message: multiple frags: first is hdr in pre-mapped buffer */
+
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+                if (payload_kiov != NULL)
+                        rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
+                                                  payload_niov, payload_kiov);
+                else
+                        rc = kqswnal_map_tx_iov (ktx, payload_nob,
+                                                 payload_niov, payload_iov);
+                if (rc != 0) {
+                        kqswnal_put_idle_tx (ktx);
+                        return (PTL_FAIL);
+                }
+        }
+        
         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
-                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
+                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
         if (rc != 0) {                    /* failed? */
@@ -962,8 +1110,6 @@ kqswnal_send_pages (nal_cb_t     *nal,
                                  payload_niov, NULL, payload_kiov, payload_nob));
 }
 
-int kqswnal_fwd_copy_contig = 0;
-
 void
 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
@@ -984,7 +1130,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 
         LASSERT (niov > 0);
         
-        ktx = kqswnal_get_idle_tx (fwd, FALSE);
+        ktx = kqswnal_get_idle_tx (fwd, 0);
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
 
@@ -1005,20 +1151,31 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 goto failed;
         }
 
-        if ((kqswnal_fwd_copy_contig || niov > 1) &&
+        ktx->ktx_port    = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
+                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
+        ktx->ktx_nid     = nid;
+        ktx->ktx_state   = KTX_FORWARDING;
+        ktx->ktx_args[0] = fwd;
+
+        if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
             nob <= KQSW_TX_BUFFER_SIZE) 
         {
-                /* send from ktx's pre-allocated/mapped contiguous buffer? */
+                /* send from ktx's pre-mapped contiguous buffer? */
                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
-                ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
-                ktx->ktx_frags.iov[0].Len = nob;
-                ktx->ktx_nfrag = 1;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, nob);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = nob;
+#endif
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
         }
         else
         {
                 /* zero copy */
-                ktx->ktx_nfrag = 0;       /* no frags mapped yet */
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
                 rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
                 if (rc != 0)
                         goto failed;
@@ -1026,12 +1183,6 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
         }
 
-        ktx->ktx_port    = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
-                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
-        ktx->ktx_nid     = nid;
-        ktx->ktx_state   = KTX_FORWARDING; /* kpr_put_packet() on completion */
-        ktx->ktx_args[0] = fwd;
-
         rc = kqswnal_launch (ktx);
         if (rc == 0)
                 return;
@@ -1064,7 +1215,7 @@ kqswnal_fwd_callback (void *arg, int error)
 }
 
 void
-kqswnal_reply_complete (EP_RXD *rxd) 
+kqswnal_dma_reply_complete (EP_RXD *rxd) 
 {
         int           status = ep_rxd_status(rxd);
         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
@@ -1075,9 +1226,10 @@ kqswnal_reply_complete (EP_RXD *rxd)
                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
 
         LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
 
-        krx->krx_rpc_completed = 1;
-        kqswnal_requeue_rx (krx);
+        krx->krx_rpc_reply_needed = 0;
+        kqswnal_rx_done (krx);
 
         lib_finalize (&kqswnal_lib, NULL, msg);
         kqswnal_put_idle_tx (ktx);
@@ -1093,67 +1245,76 @@ kqswnal_rpc_complete (EP_RXD *rxd)
                "rxd %p, krx %p, status %d\n", rxd, krx, status);
 
         LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
         
-        krx->krx_rpc_completed = 1;
+        krx->krx_rpc_reply_needed = 0;
         kqswnal_requeue_rx (krx);
 }
 
 void
-kqswnal_requeue_rx (kqswnal_rx_t *krx)
+kqswnal_requeue_rx (kqswnal_rx_t *krx) 
 {
-        EP_STATUSBLK  blk;
-        int           rc;
+        int   rc;
 
-        LASSERT (atomic_read (&krx->krx_refcount) > 0);
-        if (!atomic_dec_and_test (&krx->krx_refcount))
-                return;
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
 
-        if (!ep_rxd_isrpc(krx->krx_rxd) ||
-            krx->krx_rpc_completed) {
+        if (krx->krx_rpc_reply_needed) {
 
-                /* don't actually requeue on shutdown */
-                if (kqswnal_data.kqn_shuttingdown)
+                /* We failed to complete the peer's optimized GET (e.g. we
+                 * couldn't map the source buffers).  We complete the
+                 * peer's EKC rpc now with failure. */
+#if MULTIRAIL_EKC
+                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
+                                     &kqswnal_rpc_failed, NULL, NULL, 0);
+                if (rc == EP_SUCCESS)
                         return;
                 
-                ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx,
-                                    krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE);
-                return;
-        }
-
-        /* Sender wanted an RPC, but we didn't complete it (we must have
-         * dropped the sender's message).  We complete it now with
-         * failure... */
-        memset (&blk, 0, sizeof (blk));
-        blk.Status = -ECONNREFUSED;
-
-        atomic_set (&krx->krx_refcount, 1);
+                CERROR("can't complete RPC: %d\n", rc);
+#else
+                if (krx->krx_rxd != NULL) {
+                        /* We didn't try (and fail) to complete earlier... */
+                        rc = ep_complete_rpc(krx->krx_rxd, 
+                                             kqswnal_rpc_complete, krx,
+                                             &kqswnal_rpc_failed, NULL, 0);
+                        if (rc == EP_SUCCESS)
+                                return;
+
+                        CERROR("can't complete RPC: %d\n", rc);
+                }
+                
+                /* NB the old ep_complete_rpc() frees rxd on failure, so we
+                 * have to requeue from scratch here, unless we're shutting
+                 * down */
+                if (kqswnal_data.kqn_shuttingdown)
+                        return;
 
-        rc = ep_complete_rpc (krx->krx_rxd, 
-                              kqswnal_rpc_complete, krx,
-                              &blk, NULL, 0);
-        if (rc == ESUCCESS) {
-                /* callback will call me again to requeue, having set
-                 * krx_rpc_completed... */
+                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+                                      krx->krx_elanbuffer, 
+                                      krx->krx_npages * PAGE_SIZE, 0);
+                LASSERT (rc == EP_SUCCESS);
+                /* We don't handle failure here; it's incredibly rare
+                 * (never reported?) and only happens with "old" EKC */
                 return;
+#endif
         }
 
-        CERROR("can't complete RPC: %d\n", rc);
-
-        /* we don't actually requeue on shutdown */
-        if (kqswnal_data.kqn_shuttingdown)
-                return;
-
-        /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue
-         * from scratch here... */
-        rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
-                              krx->krx_elanaddr, 
-                              krx->krx_npages * PAGE_SIZE, 0);
-
-        LASSERT (rc == ESUCCESS);
-        /* This needs to be fixed by ep_complete_rpc NOT freeing
-         * krx->krx_rxd on failure so we can just ep_requeue_receive() */
+#if MULTIRAIL_EKC
+        if (kqswnal_data.kqn_shuttingdown) {
+                /* free EKC rxd on shutdown */
+                ep_complete_receive(krx->krx_rxd);
+        } else {
+                /* repost receive */
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   &krx->krx_elanbuffer, 0);
+        }
+#else                
+        /* don't actually requeue on shutdown */
+        if (!kqswnal_data.kqn_shuttingdown) 
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
+#endif
 }
-
+        
 void
 kqswnal_rx (kqswnal_rx_t *krx)
 {
@@ -1162,9 +1323,12 @@ kqswnal_rx (kqswnal_rx_t *krx)
         int             nob;
         int             niov;
 
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+
         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
-                /* NB krx requeued when lib_parse() calls back kqswnal_recv */
+                atomic_set(&krx->krx_refcount, 1);
                 lib_parse (&kqswnal_lib, hdr, krx);
+                kqswnal_rx_done(krx);
                 return;
         }
 
@@ -1212,18 +1376,27 @@ kqswnal_rxhandler(EP_RXD *rxd)
 
         krx->krx_rxd = rxd;
         krx->krx_nob = nob;
-        LASSERT (atomic_read (&krx->krx_refcount) == 0);
-        atomic_set (&krx->krx_refcount, 1);
-        krx->krx_rpc_completed = 0;
+#if MULTIRAIL_EKC
+        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
+#else
+        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
+#endif
         
         /* must receive a whole header to be able to parse */
         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
         {
                 /* receives complete with failure when receiver is removed */
+#if MULTIRAIL_EKC
+                if (status == EP_SHUTDOWN)
+                        LASSERT (kqswnal_data.kqn_shuttingdown);
+                else
+                        CERROR("receive status failed with status %d nob %d\n",
+                               ep_rxd_status(rxd), nob);
+#else
                 if (!kqswnal_data.kqn_shuttingdown)
                         CERROR("receive status failed with status %d nob %d\n",
                                ep_rxd_status(rxd), nob);
-
+#endif
                 kqswnal_requeue_rx (krx);
                 return;
         }
@@ -1417,8 +1590,6 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 #endif
         lib_finalize(nal, private, libmsg);
 
-        kqswnal_requeue_rx (krx);
-
         return (rlen);
 }
 
@@ -1455,6 +1626,7 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg)
                 return ((int)pid);
 
         atomic_inc (&kqswnal_data.kqn_nthreads);
+        atomic_inc (&kqswnal_data.kqn_nthreads_running);
         return (0);
 }
 
@@ -1473,6 +1645,7 @@ kqswnal_scheduler (void *arg)
         long             flags;
         int              rc;
         int              counter = 0;
+        int              shuttingdown = 0;
         int              did_something;
 
         kportal_daemonize ("kqswnal_sched");
@@ -1480,9 +1653,21 @@ kqswnal_scheduler (void *arg)
         
         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
-        while (!kqswnal_data.kqn_shuttingdown)
+        for (;;)
         {
-                did_something = FALSE;
+                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
+
+                        if (kqswnal_data.kqn_shuttingdown == 2)
+                                break;
+                
+                        /* During stage 1 of shutdown we are still responsive
+                         * to receives */
+
+                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
+                        shuttingdown = kqswnal_data.kqn_shuttingdown;
+                }
+
+                did_something = 0;
 
                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
                 {
@@ -1494,11 +1679,12 @@ kqswnal_scheduler (void *arg)
 
                         kqswnal_rx (krx);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedtxds))
                 {
                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                          kqswnal_tx_t, ktx_list);
@@ -1514,11 +1700,12 @@ kqswnal_scheduler (void *arg)
                                 kqswnal_tx_done (ktx, rc);
                         }
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
@@ -1526,7 +1713,7 @@ kqswnal_scheduler (void *arg)
 
                         kqswnal_fwd_packet (NULL, fwd);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
@@ -1539,7 +1726,7 @@ kqswnal_scheduler (void *arg)
 
                         if (!did_something) {
                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown ||
+                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
index 6ebb29f..f5ceb59 100755 (executable)
@@ -841,10 +841,19 @@ def if2addr(iface):
     ip = string.split(addr, ':')[1]
     return ip
 
+def sys_get_elan_position_file():
+    procfiles = ["/proc/elan/device0/position",
+                 "/proc/qsnet/elan4/device0/position",
+                 "/proc/qsnet/elan3/device0/position"]
+    for p in procfiles:
+        if os.access(p, os.R_OK):
+            return p
+    return ""
+
 def sys_get_local_nid(net_type, wildcard, cluster_id):
     """Return the local nid."""
     local = ""
-    if os.access('/proc/elan/device0/position', os.R_OK):
+    if sys_get_elan_position_file():
         local = sys_get_local_address('elan', '*', cluster_id)
     else:
         local = sys_get_local_address(net_type, wildcard, cluster_id)
@@ -863,9 +872,12 @@ def sys_get_local_address(net_type, wildcard, cluster_id):
             host = socket.gethostname()
             local = socket.gethostbyname(host)
     elif net_type == 'elan':
-        # awk '/NodeId/ { print $2 }' '/proc/elan/device0/position'
+        # awk '/NodeId/ { print $2 }' 'sys_get_elan_position_file()'
+        f = sys_get_elan_position_file()
+        if not f:
+            panic ("unable to determine local Elan ID")
         try:
-            fp = open('/proc/elan/device0/position', 'r')
+            fp = open(f, 'r')
             lines = fp.readlines()
             fp.close()
             for l in lines:
@@ -2334,7 +2346,12 @@ def sys_tweak_socknal ():
         sysctl("socknal/typed", 0)
 
 def sys_optimize_elan ():
-        run ("echo 0 > /proc/elan/config/eventint_punt_loops")
+    procfiles = ["/proc/elan/config/eventint_punt_loops",
+                 "/proc/qsnet/elan3/config/eventint_punt_loops",
+                 "/proc/qsnet/elan4/config/elan4_mainint_punt_loops"]
+    for p in procfiles:
+        if os.access(p, os.R_OK):
+            run ("echo 0 > " + p)
 
 def sys_set_ptldebug(ptldebug):
     if config.ptldebug:
index d5994e7..2f35ded 100644 (file)
@@ -140,8 +140,27 @@ parse_options(char * options, struct lustre_mount_data *lmd)
 }
 
 int
+get_local_elan_id(char *fname, char *buf)
+{
+        FILE *fp = fopen(fname, "r");
+        int   rc;
+
+        if (fp == NULL)
+                return -1;
+
+        rc = fscanf(fp, "NodeId %255s", buf);
+
+        fclose(fp);
+
+        return (rc == 1) ? 0 : -1;
+}
+
+int
 set_local(struct lustre_mount_data *lmd)
 {
+        /* XXX ClusterID?
+         * XXX PtlGetId() will be safer if portals is loaded and
+         * initialised correctly at this time... */
         char buf[256];
         ptl_nid_t nid;
         int rc;
@@ -159,19 +178,26 @@ set_local(struct lustre_mount_data *lmd)
                         return rc;
                 }
         } else if (lmd->lmd_nal == QSWNAL) {
-                FILE *fp;
-                fp = fopen("/proc/elan/device0/position", "r");
-                if (fp == NULL) {
-                        perror("mount: /proc/elan/device0/position");
-                        return -1;
-                }
-                rc = fscanf(fp, "%*s %255s", buf);
-                fclose(fp);
-                if (rc != 1) {
-                        fprintf(stderr, "mount: problem read elan NID");
+#if MULTIRAIL_EKC
+                char *pfiles[] = {"/proc/qsnet/elan3/device0/position",
+                                  "/proc/qsnet/elan4/device0/position",
+                                  NULL};
+#else
+                char *pfiles[] = {"/proc/elan/device0/position",
+                                  NULL};
+#endif
+                int   i = 0;
+
+                do {
+                        rc = get_local_elan_id(pfiles[i], buf);
+                } while (rc != 0 &&
+                         pfiles[++i] != NULL);
+                
+                if (rc != 0) {
+                        fprintf(stderr, "mount: can't read elan ID"
+                                " from /proc\n");
                         return -1;
                 }
-                
         }
 
         if (ptl_parse_nid (&nid, buf) != 0) {