Whamcloud - gitweb
LU-6020 kerberos: proper sg list initialization
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd.c
index b5e1e39..fa8e8f4 100644 (file)
 /*
- * Copyright (C) 2002 Cluster File Systems, Inc.
- *   Author: Eric Barton <eric@bartonsoftware.com>
+ * GPL HEADER START
  *
- * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
- * W. Marcus Miller - Based on ksocknal
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- * This file is part of Portals, http://www.sf.net/projects/lustre/
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- * Portals is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Portals is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
  * You should have received a copy of the GNU General Public License
- * along with Portals; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
  *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/klnds/qswlnd/qswlnd.c
+ *
+ * Author: Eric Barton <eric@bartonsoftware.com>
  */
 
-#include "qswnal.h"
-
-ptl_handle_ni_t                kqswnal_ni;
-nal_t                  kqswnal_api;
-kqswnal_data_t         kqswnal_data;
-
-kpr_nal_interface_t kqswnal_router_interface = {
-       kprni_nalid:    QSWNAL,
-       kprni_arg:      NULL,
-       kprni_fwd:      kqswnal_fwd_packet,
-};
-
-
-static int
-kqswnal_forward(nal_t   *nal,
-               int     id,
-               void    *args,  size_t args_len,
-               void    *ret,   size_t ret_len)
-{
-       kqswnal_data_t *k = nal->nal_data;
-       nal_cb_t       *nal_cb = k->kqn_cb;
-
-       LASSERT (nal == &kqswnal_api);
-       LASSERT (k == &kqswnal_data);
-       LASSERT (nal_cb == &kqswnal_lib);
-
-       lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
-       return (PTL_OK);
-}
-
-static void
-kqswnal_lock (nal_t *nal, unsigned long *flags)
-{
-       kqswnal_data_t *k = nal->nal_data;
-       nal_cb_t       *nal_cb = k->kqn_cb;
-
-       LASSERT (nal == &kqswnal_api);
-       LASSERT (k == &kqswnal_data);
-       LASSERT (nal_cb == &kqswnal_lib);
-
-       nal_cb->cb_cli(nal_cb,flags);
-}
-
-static void
-kqswnal_unlock(nal_t *nal, unsigned long *flags)
-{
-       kqswnal_data_t *k = nal->nal_data;
-       nal_cb_t       *nal_cb = k->kqn_cb;
-
-       LASSERT (nal == &kqswnal_api);
-       LASSERT (k == &kqswnal_data);
-       LASSERT (nal_cb == &kqswnal_lib);
-
-       nal_cb->cb_sti(nal_cb,flags);
-}
-
-static int
-kqswnal_shutdown(nal_t *nal, int ni)
-{
-       CDEBUG (D_NET, "shutdown\n");
-
-       LASSERT (nal == &kqswnal_api);
-       return (0);
-}
-
-static void
-kqswnal_yield( nal_t *nal )
-{
-       CDEBUG (D_NET, "yield\n");
+#include "qswlnd.h"
 
-       if (current->need_resched)
-               schedule();
-       return;
-}
 
-static nal_t *
-kqswnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
-            ptl_pid_t requested_pid)
+lnd_t the_kqswlnd =
 {
-       ptl_nid_t mynid = kqswnal_elanid2nid (kqswnal_data.kqn_elanid);
-       int       nnids = kqswnal_data.kqn_nnodes;
-
-        CDEBUG(D_NET, "calling lib_init with nid "LPX64" of %d\n", mynid, nnids);
-
-       lib_init(&kqswnal_lib, mynid, 0, nnids, ptl_size, ac_size);
+       .lnd_type       = QSWLND,
+       .lnd_startup    = kqswnal_startup,
+       .lnd_shutdown   = kqswnal_shutdown,
+       .lnd_ctl        = kqswnal_ctl,
+       .lnd_send       = kqswnal_send,
+        .lnd_recv       = kqswnal_recv,
+};
 
-       return (&kqswnal_api);
-}
+kqswnal_data_t         kqswnal_data;
 
 int
-kqswnal_get_tx_desc (struct portal_ioctl_data *data)
+kqswnal_get_tx_desc (struct libcfs_ioctl_data *data)
 {
        unsigned long      flags;
-       struct list_head  *tmp;
+       cfs_list_t        *tmp;
        kqswnal_tx_t      *ktx;
+       lnet_hdr_t        *hdr;
        int                index = data->ioc_count;
        int                rc = -ENOENT;
 
-       spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
+       spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
 
-       list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
+       cfs_list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;
-               
-               ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
-
-               data->ioc_pbuf1 = (char *)ktx;
-               data->ioc_count = NTOH__u32(ktx->ktx_wire_hdr->type);
-               data->ioc_size  = NTOH__u32(PTL_HDR_LENGTH(ktx->ktx_wire_hdr));
-               data->ioc_nid   = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
-               data->ioc_nid2  = ktx->ktx_nid;
-               data->ioc_misc  = ktx->ktx_launcher;
-               data->ioc_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
-                                 ((!ktx->ktx_forwarding)              ? 0 : 2) |
-                                 ((!ktx->ktx_isnblk)                  ? 0 : 4);
 
+               ktx = cfs_list_entry (tmp, kqswnal_tx_t, ktx_list);
+               hdr = (lnet_hdr_t *)ktx->ktx_buffer;
+
+               data->ioc_count  = le32_to_cpu(hdr->payload_length);
+               data->ioc_nid    = le64_to_cpu(hdr->dest_nid);
+               data->ioc_u64[0] = ktx->ktx_nid;
+               data->ioc_u32[0] = le32_to_cpu(hdr->type);
+               data->ioc_u32[1] = ktx->ktx_launcher;
+               data->ioc_flags  =
+                        (cfs_list_empty (&ktx->ktx_schedlist) ? 0 : 1) |
+                                        (ktx->ktx_state << 2);
                rc = 0;
                break;
        }
-       
-       spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
+
+       spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
 }
 
 int
-kqswnal_cmd (struct portal_ioctl_data *data, void *private)
+kqswnal_ctl (lnet_ni_t *ni, unsigned int cmd, void *arg)
 {
-       LASSERT (data != NULL);
-       
-       switch (data->ioc_nal_cmd) {
-       case NAL_CMD_GET_TXDESC:
+       struct libcfs_ioctl_data *data = arg;
+
+       LASSERT (ni == kqswnal_data.kqn_ni);
+
+       switch (cmd) {
+       case IOC_LIBCFS_GET_TXDESC:
                return (kqswnal_get_tx_desc (data));
 
-       case NAL_CMD_REGISTER_MYNID:
-               CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
-                       data->ioc_nid - kqswnal_data.kqn_elanid,
-                       kqswnal_data.kqn_nid_offset);
-               kqswnal_data.kqn_nid_offset =
-                       data->ioc_nid - kqswnal_data.kqn_elanid;
-               kqswnal_lib.ni.nid = data->ioc_nid;
-               return (0);
-               
+       case IOC_LIBCFS_REGISTER_MYNID:
+               if (data->ioc_nid == ni->ni_nid)
+                       return 0;
+
+               LASSERT (LNET_NIDNET(data->ioc_nid) == LNET_NIDNET(ni->ni_nid));
+
+               CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID for %s(%s)\n",
+                      libcfs_nid2str(data->ioc_nid),
+                      libcfs_nid2str(ni->ni_nid));
+               return 0;
+
        default:
                return (-EINVAL);
        }
 }
 
-void __exit
-kqswnal_finalise (void)
+void
+kqswnal_shutdown(lnet_ni_t *ni)
 {
+       unsigned long flags;
+       kqswnal_tx_t *ktx;
+       kqswnal_rx_t *krx;
+       
+       CDEBUG (D_NET, "shutdown\n");
+       LASSERT (ni->ni_data == &kqswnal_data);
+       LASSERT (ni == kqswnal_data.kqn_ni);
+
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);
 
        case KQN_INIT_ALL:
-               PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
-               /* fall through */
-
-       case KQN_INIT_PTL:
-               PtlNIFini (kqswnal_ni);
-               lib_fini (&kqswnal_lib);
-               /* fall through */
-
        case KQN_INIT_DATA:
                break;
-
-       case KQN_INIT_NOTHING:
-               return;
        }
 
        /**********************************************************************/
-       /* Make router stop her calling me and fail any more call-ins */
-       kpr_shutdown (&kqswnal_data.kqn_router);
-
-       /**********************************************************************/
-       /* flag threads to terminate, wake them and wait for them to die */
-
+       /* Signal the start of shutdown... */
+       spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
-       wake_up_all (&kqswnal_data.kqn_sched_waitq);
+       spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
 
-       while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
-               CDEBUG(D_NET, "waiting for %d threads to terminate\n",
-                      atomic_read (&kqswnal_data.kqn_nthreads));
-               set_current_state (TASK_UNINTERRUPTIBLE);
-               schedule_timeout (HZ);
+       /**********************************************************************/
+       /* wait for sends that have allocated a tx desc to launch or give up */
+       while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
+               CDEBUG(D_NET, "waiting for %d pending sends\n",
+                      atomic_read (&kqswnal_data.kqn_pending_txs));
+               cfs_pause(cfs_time_seconds(1));
        }
 
        /**********************************************************************/
        /* close elan comms */
-
+       /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
-               ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
+               ep_free_rcvr (kqswnal_data.kqn_eprx_small);
 
        if (kqswnal_data.kqn_eprx_large != NULL)
-               ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);
+               ep_free_rcvr (kqswnal_data.kqn_eprx_large);
+
+       /* NB ep_free_rcvr() returns only after we've freed off all receive
+        * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
+        * means we must have completed any messages we passed to
+        * lnet_parse() */
 
        if (kqswnal_data.kqn_eptx != NULL)
-               ep_free_large_xmtr (kqswnal_data.kqn_eptx);
+               ep_free_xmtr (kqswnal_data.kqn_eptx);
 
-       /**********************************************************************/
-       /* No more threads.  No more portals, router or comms callbacks!
-        * I control the horizontals and the verticals...
-        */
+       /* NB ep_free_xmtr() returns only after all outstanding transmits
+        * have called their callback... */
+       LASSERT(cfs_list_empty(&kqswnal_data.kqn_activetxds));
 
        /**********************************************************************/
-       /* Complete any blocked forwarding packets with error
-        */
-
-       while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
-       {
-               kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
-                                                 kpr_fwd_desc_t, kprfd_list);
-               list_del (&fwd->kprfd_list);
-               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
-       }
+       /* flag threads to terminate, wake them and wait for them to die */
+       kqswnal_data.kqn_shuttingdown = 2;
+       wake_up_all (&kqswnal_data.kqn_sched_waitq);
 
-       while (!list_empty (&kqswnal_data.kqn_delayedfwds))
-       {
-               kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
-                                                 kpr_fwd_desc_t, kprfd_list);
-               list_del (&fwd->kprfd_list);
-               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
+       while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
+               CDEBUG(D_NET, "waiting for %d threads to terminate\n",
+                      atomic_read (&kqswnal_data.kqn_nthreads));
+               cfs_pause(cfs_time_seconds(1));
        }
 
        /**********************************************************************/
-       /* Wait for router to complete any packets I sent her
+       /* No more threads.  No more portals, router or comms callbacks!
+        * I control the horizontals and the verticals...
         */
 
-       kpr_deregister (&kqswnal_data.kqn_router);
-
+       LASSERT (cfs_list_empty (&kqswnal_data.kqn_readyrxds));
+       LASSERT (cfs_list_empty (&kqswnal_data.kqn_donetxds));
+       LASSERT (cfs_list_empty (&kqswnal_data.kqn_delayedtxds));
 
        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */
 
-       if (kqswnal_data.kqn_eprxdmahandle != NULL)
-       {
-               elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
-                                 kqswnal_data.kqn_eprxdmahandle, 0,
-                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
-                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);
+       /* FTTB, we need to unmap any remaining mapped memory.  When
+        * ep_dvma_release() get fixed (and releases any mappings in the
+        * region), we can delete all the code from here -------->  */
+
+       for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
+               /* If ktx has a buffer, it got mapped; unmap now.  NB only
+                * the pre-mapped stuff is still mapped since all tx descs
+                * must be idle */
 
-               elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
-                                 kqswnal_data.kqn_eprxdmahandle);
+               if (ktx->ktx_buffer != NULL)
+                       ep_dvma_unload(kqswnal_data.kqn_ep,
+                                      kqswnal_data.kqn_ep_tx_nmh,
+                                      &ktx->ktx_ebuffer);
        }
 
-       if (kqswnal_data.kqn_eptxdmahandle != NULL)
-       {
-               elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
-                                 kqswnal_data.kqn_eptxdmahandle, 0,
-                                 KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
-                                                     KQSW_NNBLK_TXMSGS));
+       for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
+               /* If krx_kiov[0].kiov_page got allocated, it got mapped.  
+                * NB subsequent pages get merged */
 
-               elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
-                                 kqswnal_data.kqn_eptxdmahandle);
+               if (krx->krx_kiov[0].kiov_page != NULL)
+                       ep_dvma_unload(kqswnal_data.kqn_ep,
+                                      kqswnal_data.kqn_ep_rx_nmh,
+                                      &krx->krx_elanbuffer);
        }
+       /* <----------- to here */
 
-       if (kqswnal_data.kqn_txds != NULL)
-       {
-               int   i;
+       if (kqswnal_data.kqn_ep_rx_nmh != NULL)
+               ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);
 
-               for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
-               {
-                       kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
+       if (kqswnal_data.kqn_ep_tx_nmh != NULL)
+               ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
 
-                       if (ktx->ktx_buffer != NULL)
-                               PORTAL_FREE(ktx->ktx_buffer,
-                                           KQSW_TX_BUFFER_SIZE);
-               }
+       while (kqswnal_data.kqn_txds != NULL) {
+               ktx = kqswnal_data.kqn_txds;
 
-               PORTAL_FREE(kqswnal_data.kqn_txds,
-                           sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
-                                                    KQSW_NNBLK_TXMSGS));
+               if (ktx->ktx_buffer != NULL)
+                       LIBCFS_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
+
+               kqswnal_data.kqn_txds = ktx->ktx_alloclist;
+               LIBCFS_FREE(ktx, sizeof(*ktx));
        }
 
-       if (kqswnal_data.kqn_rxds != NULL)
-       {
-               int   i;
-               int   j;
+       while (kqswnal_data.kqn_rxds != NULL) {
+               int           i;
 
-               for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
-               {
-                       kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
-
-                       for (j = 0; j < krx->krx_npages; j++)
-                               if (krx->krx_pages[j] != NULL)
-                                       __free_page (krx->krx_pages[j]);
-               }
+               krx = kqswnal_data.kqn_rxds;
+               for (i = 0; i < krx->krx_npages; i++)
+                       if (krx->krx_kiov[i].kiov_page != NULL)
+                               __free_page (krx->krx_kiov[i].kiov_page);
 
-               PORTAL_FREE(kqswnal_data.kqn_rxds,
-                           sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
-                                                   KQSW_NRXMSGS_LARGE));
+               kqswnal_data.kqn_rxds = krx->krx_alloclist;
+               LIBCFS_FREE(krx, sizeof (*krx));
        }
 
        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));
 
-       CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));
+       CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&libcfs_kmemory));
 
-       printk (KERN_INFO "Routing QSW NAL unloaded (final mem %d)\n",
-                atomic_read(&portal_kmemory));
+       module_put(THIS_MODULE);
 }
 
-static int __init
-kqswnal_initialise (void)
+int
+kqswnal_startup (lnet_ni_t *ni)
 {
-       ELAN3_DMA_REQUEST dmareq;
+       EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
        int               rc;
        int               i;
+       kqswnal_rx_t     *krx;
+       kqswnal_tx_t     *ktx;
        int               elan_page_idx;
-       int               pkmem = atomic_read(&portal_kmemory);
-
-       LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
-
-       CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
-
-       kqswnal_api.forward  = kqswnal_forward;
-       kqswnal_api.shutdown = kqswnal_shutdown;
-       kqswnal_api.yield    = kqswnal_yield;
-       kqswnal_api.validate = NULL;            /* our api validate is a NOOP */
-       kqswnal_api.lock     = kqswnal_lock;
-       kqswnal_api.unlock   = kqswnal_unlock;
-       kqswnal_api.nal_data = &kqswnal_data;
-
-       kqswnal_lib.nal_data = &kqswnal_data;
 
+       LASSERT (ni->ni_lnd == &the_kqswlnd);
+
+       /* Only 1 instance supported */
+       if (kqswnal_data.kqn_init != KQN_INIT_NOTHING) {
+                CERROR ("Only 1 instance supported\n");
+                return -EPERM;
+        }
+
+        if (ni->ni_interfaces[0] != NULL) {
+                CERROR("Explicit interface config not supported\n");
+                return -EPERM;
+        }
+
+       if (*kqswnal_tunables.kqn_credits >=
+           *kqswnal_tunables.kqn_ntxmsgs) {
+               LCONSOLE_ERROR_MSG(0x12e, "Configuration error: please set "
+                                  "ntxmsgs(%d) > credits(%d)\n",
+                                  *kqswnal_tunables.kqn_ntxmsgs,
+                                  *kqswnal_tunables.kqn_credits);
+       }
+        
+       CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&libcfs_kmemory));
+       
        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));
 
-       kqswnal_data.kqn_cb = &kqswnal_lib;
+       kqswnal_data.kqn_ni = ni;
+       ni->ni_data = &kqswnal_data;
+       ni->ni_peertxcredits = *kqswnal_tunables.kqn_peercredits;
+       ni->ni_maxtxcredits = *kqswnal_tunables.kqn_credits;
 
-       INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
-       INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
-       INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
-       spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
-       init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
-       INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);
+       CFS_INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
+       CFS_INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
+       spin_lock_init(&kqswnal_data.kqn_idletxd_lock);
 
-       INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
-       INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
-       INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
+       CFS_INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
+       CFS_INIT_LIST_HEAD (&kqswnal_data.kqn_donetxds);
+       CFS_INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
 
-       spin_lock_init (&kqswnal_data.kqn_sched_lock);
+       spin_lock_init(&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
 
-       spin_lock_init (&kqswnal_data.kqn_statelock);
-
        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;
+       try_module_get(THIS_MODULE);
+       
+       kqswnal_data.kqn_ep = ep_system();
+       if (kqswnal_data.kqn_ep == NULL) {
+               CERROR("Can't initialise EKC\n");
+               kqswnal_shutdown(ni);
+               return (-ENODEV);
+       }
 
-       /**********************************************************************/
-       /* Find the first Elan device */
-
-       kqswnal_data.kqn_epdev = ep_device (0);
-       if (kqswnal_data.kqn_epdev == NULL)
-       {
-               CERROR ("Can't get elan device 0\n");
-               return (-ENOMEM);
+       if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
+               CERROR("Can't get elan ID\n");
+               kqswnal_shutdown(ni);
+               return (-ENODEV);
        }
 
-       kqswnal_data.kqn_nid_offset = 0;
-       kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_epdev);
-       kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_epdev);
+       kqswnal_data.kqn_nnodes = ep_numnodes (kqswnal_data.kqn_ep);
+       kqswnal_data.kqn_elanid = ep_nodeid (kqswnal_data.kqn_ep);
+
+       ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), kqswnal_data.kqn_elanid);
        
        /**********************************************************************/
        /* Get the transmitter */
 
-       kqswnal_data.kqn_eptx = ep_alloc_large_xmtr (kqswnal_data.kqn_epdev);
+       kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
-               kqswnal_finalise ();
+               kqswnal_shutdown (ni);
                return (-ENOMEM);
        }
 
        /**********************************************************************/
        /* Get the receivers */
 
-       kqswnal_data.kqn_eprx_small = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
-                                                            EP_SVC_LARGE_PORTALS_SMALL,
-                                                            KQSW_EP_ENVELOPES_SMALL);
+       kqswnal_data.kqn_eprx_small = 
+               ep_alloc_rcvr (kqswnal_data.kqn_ep,
+                              EP_MSG_SVC_PORTALS_SMALL,
+                              *kqswnal_tunables.kqn_ep_envelopes_small);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
-               kqswnal_finalise ();
+               kqswnal_shutdown (ni);
                return (-ENOMEM);
        }
 
-       kqswnal_data.kqn_eprx_large = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
-                                                            EP_SVC_LARGE_PORTALS_LARGE,
-                                                            KQSW_EP_ENVELOPES_LARGE);
+       kqswnal_data.kqn_eprx_large = 
+               ep_alloc_rcvr (kqswnal_data.kqn_ep,
+                              EP_MSG_SVC_PORTALS_LARGE,
+                              *kqswnal_tunables.kqn_ep_envelopes_large);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
-               kqswnal_finalise ();
+               kqswnal_shutdown (ni);
                return (-ENOMEM);
        }
 
        /**********************************************************************/
-       /* Reserve Elan address space for transmit buffers */
-
-        dmareq.Waitfn   = DDI_DMA_SLEEP;
-        dmareq.ElanAddr = (E3_Addr) 0;
-        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
-        dmareq.Perm     = ELAN_PERM_REMOTEREAD;
-
-       rc = elan3_dma_reserve(kqswnal_data.kqn_epdev->DmaState,
-                             KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
-                             &dmareq, &kqswnal_data.kqn_eptxdmahandle);
-       if (rc != DDI_SUCCESS)
-       {
-               CERROR ("Can't reserve rx dma space\n");
-               kqswnal_finalise ();
+       /* Reserve Elan address space for transmit descriptors NB we may
+        * either send the contents of associated buffers immediately, or
+        * map them for the peer to suck/blow... */
+       kqswnal_data.kqn_ep_tx_nmh = 
+               ep_dvma_reserve(kqswnal_data.kqn_ep,
+                               KQSW_NTXMSGPAGES*(*kqswnal_tunables.kqn_ntxmsgs),
+                               EP_PERM_WRITE);
+       if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
+               CERROR("Can't reserve tx dma space\n");
+               kqswnal_shutdown(ni);
                return (-ENOMEM);
        }
 
        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
-
-        dmareq.Waitfn   = DDI_DMA_SLEEP;
-        dmareq.ElanAddr = (E3_Addr) 0;
-        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
-        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
-
-       rc = elan3_dma_reserve (kqswnal_data.kqn_epdev->DmaState,
-                               KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
-                               KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
-                               &dmareq, &kqswnal_data.kqn_eprxdmahandle);
-       if (rc != DDI_SUCCESS)
-       {
-               CERROR ("Can't reserve rx dma space\n");
-               kqswnal_finalise ();
+       kqswnal_data.kqn_ep_rx_nmh =
+               ep_dvma_reserve(kqswnal_data.kqn_ep,
+                               KQSW_NRXMSGPAGES_SMALL * 
+                               (*kqswnal_tunables.kqn_nrxmsgs_small) +
+                               KQSW_NRXMSGPAGES_LARGE * 
+                               (*kqswnal_tunables.kqn_nrxmsgs_large),
+                               EP_PERM_WRITE);
+       if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
+               CERROR("Can't reserve rx dma space\n");
+               kqswnal_shutdown(ni);
                return (-ENOMEM);
        }
 
        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */
 
-       PORTAL_ALLOC(kqswnal_data.kqn_txds,
-                    sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
-       if (kqswnal_data.kqn_txds == NULL)
-       {
-               kqswnal_finalise ();
-               return (-ENOMEM);
-       }
-
-       /* clear flags, null pointers etc */
-       memset(kqswnal_data.kqn_txds, 0,
-              sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
-       for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
+       kqswnal_data.kqn_txds = NULL;
+       for (i = 0; i < (*kqswnal_tunables.kqn_ntxmsgs); i++)
        {
                int           premapped_pages;
-               kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
                int           basepage = i * KQSW_NTXMSGPAGES;
 
-               PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
+               LIBCFS_ALLOC (ktx, sizeof(*ktx));
+               if (ktx == NULL) {
+                       kqswnal_shutdown (ni);
+                       return (-ENOMEM);
+               }
+
+               memset(ktx, 0, sizeof(*ktx));   /* NULL pointers; zero flags */
+               ktx->ktx_alloclist = kqswnal_data.kqn_txds;
+               kqswnal_data.kqn_txds = ktx;
+
+               LIBCFS_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
-                       kqswnal_finalise ();
+                       kqswnal_shutdown (ni);
                        return (-ENOMEM);
                }
 
                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
-
-               elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
-                                      kqswnal_data.kqn_eptxdmahandle,
-                                      ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
-                                      basepage, &ktx->ktx_ebuffer);
+               ep_dvma_load(kqswnal_data.kqn_ep, NULL, 
+                            ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE, 
+                            kqswnal_data.kqn_ep_tx_nmh, basepage,
+                            &all_rails, &ktx->ktx_ebuffer);
 
                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
 
-               INIT_LIST_HEAD (&ktx->ktx_delayed_list);
+               CFS_INIT_LIST_HEAD (&ktx->ktx_schedlist);
 
-               ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
-               list_add_tail (&ktx->ktx_list, 
-                              ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
-                                                &kqswnal_data.kqn_idletxds);
+               ktx->ktx_state = KTX_IDLE;
+               ktx->ktx_rail = -1;             /* unset rail */
+
+               cfs_list_add_tail (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
        }
 
        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */
-
-       PORTAL_ALLOC (kqswnal_data.kqn_rxds,
-                     sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
-       if (kqswnal_data.kqn_rxds == NULL)
-       {
-               kqswnal_finalise ();
-               return (-ENOMEM);
-       }
-
-       memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
-              sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));
-
+       kqswnal_data.kqn_rxds = NULL;
        elan_page_idx = 0;
-       for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
+       for (i = 0; i < *kqswnal_tunables.kqn_nrxmsgs_small + *kqswnal_tunables.kqn_nrxmsgs_large; i++)
        {
-               E3_Addr       elanaddr;
+               EP_NMD        elanbuffer;
                int           j;
-               kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
 
-               if (i < KQSW_NRXMSGS_SMALL)
+               LIBCFS_ALLOC(krx, sizeof(*krx));
+               if (krx == NULL) {
+                       kqswnal_shutdown(ni);
+                       return (-ENOMEM);
+               }
+
+               memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
+               krx->krx_alloclist = kqswnal_data.kqn_rxds;
+               kqswnal_data.kqn_rxds = krx;
+
+               if (i < *kqswnal_tunables.kqn_nrxmsgs_small)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
@@ -540,108 +474,94 @@ kqswnal_initialise (void)
                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
-                       krx->krx_pages[j] = alloc_page(GFP_KERNEL);
-                       if (krx->krx_pages[j] == NULL)
-                       {
-                               kqswnal_finalise ();
+                       struct page *page = alloc_page(GFP_KERNEL);
+                       
+                       if (page == NULL) {
+                               kqswnal_shutdown (ni);
                                return (-ENOMEM);
                        }
 
-                       LASSERT(page_address(krx->krx_pages[j]) != NULL);
-
-                       elan3_dvma_kaddr_load(kqswnal_data.kqn_epdev->DmaState,
-                                             kqswnal_data.kqn_eprxdmahandle,
-                                             page_address(krx->krx_pages[j]),
-                                             PAGE_SIZE, elan_page_idx,
-                                             &elanaddr);
+                       krx->krx_kiov[j] = (lnet_kiov_t) {.kiov_page = page,
+                                                         .kiov_offset = 0,
+                                                         .kiov_len = PAGE_SIZE};
+                       LASSERT(page_address(page) != NULL);
+
+                       ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                                    page_address(page),
+                                    PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
+                                    elan_page_idx, &all_rails, &elanbuffer);
+                       
+                       if (j == 0) {
+                               krx->krx_elanbuffer = elanbuffer;
+                       } else {
+                               rc = ep_nmd_merge(&krx->krx_elanbuffer,
+                                                 &krx->krx_elanbuffer, 
+                                                 &elanbuffer);
+                               /* NB contiguous mapping */
+                               LASSERT(rc);
+                       }
                        elan_page_idx++;
 
-                       if (j == 0)
-                               krx->krx_elanaddr = elanaddr;
-
-                       /* NB we assume a contiguous  */
-                       LASSERT (elanaddr == krx->krx_elanaddr + j * PAGE_SIZE);
                }
        }
        LASSERT (elan_page_idx ==
-                (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
-                (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));
-
-       /**********************************************************************/
-       /* Network interface ready to initialise */
-
-        rc = PtlNIInit(kqswnal_init, 32, 4, 0, &kqswnal_ni);
-        if (rc != 0)
-       {
-               CERROR ("PtlNIInit failed %d\n", rc);
-               kqswnal_finalise ();
-               return (-ENOMEM);
-       }
-
-       kqswnal_data.kqn_init = KQN_INIT_PTL;
+                (*kqswnal_tunables.kqn_nrxmsgs_small * KQSW_NRXMSGPAGES_SMALL) +
+                (*kqswnal_tunables.kqn_nrxmsgs_large * KQSW_NRXMSGPAGES_LARGE));
 
        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */
 
-       for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
-       {
-               kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
-
+       for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* NB this enqueue can allocate/sleep (attr == 0) */
+               krx->krx_state = KRX_POSTED;
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
-                                     krx->krx_elanaddr,
-                                     krx->krx_npages * PAGE_SIZE, 0);
-               if (rc != 0)
-               {
+                                     &krx->krx_elanbuffer, 0);
+               if (rc != EP_SUCCESS) {
                        CERROR ("failed ep_queue_receive %d\n", rc);
-                       kqswnal_finalise ();
-                       return (-ENOMEM);
+                       kqswnal_shutdown (ni);
+                       return (-EIO);
                }
        }
 
        /**********************************************************************/
        /* Spawn scheduling threads */
-       for (i = 0; i < smp_num_cpus; i++)
-       {
-               rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
+       for (i = 0; i < num_online_cpus(); i++) {
+               rc = kqswnal_thread_start(kqswnal_scheduler, NULL,
+                                         "kqswnal_sched");
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
-                       kqswnal_finalise ();
-                       return (rc);
+                       kqswnal_shutdown (ni);
+                       return (-ESRCH);
                }
        }
 
-       /**********************************************************************/
-       /* Connect to the router */
-       rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
-       CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
-
-       rc = kportal_nal_register (QSWNAL, &kqswnal_cmd, NULL);
-       if (rc != 0) {
-               CERROR ("Can't initialise command interface (rc = %d)\n", rc);
-               kqswnal_finalise ();
-               return (rc);
-       }
-
-       PORTAL_SYMBOL_REGISTER(kqswnal_ni);
        kqswnal_data.kqn_init = KQN_INIT_ALL;
+       return (0);
+}
 
-       printk(KERN_INFO "Routing QSW NAL loaded on node %d of %d "
-              "(Routing %s, initial mem %d)\n", 
-              kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
-              kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
-              pkmem);
+void __exit
+kqswnal_finalise (void)
+{
+       lnet_unregister_lnd(&the_kqswlnd);
+       kqswnal_tunables_fini();
+}
+
+static int __init
+kqswnal_initialise (void)
+{
+       int   rc = kqswnal_tunables_init();
+       
+       if (rc != 0)
+               return rc;
 
+       lnet_register_lnd(&the_kqswlnd);
        return (0);
 }
 
-
-MODULE_AUTHOR("W. Marcus Miller <marcusm@llnl.gov>");
-MODULE_DESCRIPTION("Kernel Quadrics Switch NAL v1.00");
+MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
+MODULE_DESCRIPTION("Kernel Quadrics/Elan LND v1.01");
 MODULE_LICENSE("GPL");
 
 module_init (kqswnal_initialise);
 module_exit (kqswnal_finalise);
-
-EXPORT_SYMBOL (kqswnal_ni);