Whamcloud - gitweb
land b1_4_bgl on HEAD (20050404_1913)
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd.c
index 90c9a95..be01f5d 100644 (file)
@@ -1,11 +1,8 @@
 /*
- * Copyright (C) 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2002-2004 Cluster File Systems, Inc.
  *   Author: Eric Barton <eric@bartonsoftware.com>
  *
- * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
- * W. Marcus Miller - Based on ksocknal
- *
- * This file is part of Portals, http://www.sf.net/projects/lustre/
+ * This file is part of Portals, http://www.lustre.org
  *
  * Portals is free software; you can redistribute it and/or
  * modify it under the terms of version 2 of the GNU General Public
 
 #include "qswnal.h"
 
-ptl_handle_ni_t                kqswnal_ni;
 nal_t                  kqswnal_api;
 kqswnal_data_t         kqswnal_data;
+ptl_handle_ni_t         kqswnal_ni;
+kqswnal_tunables_t      kqswnal_tunables;
 
 kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
@@ -39,14 +37,14 @@ kpr_nal_interface_t kqswnal_router_interface = {
 #define QSWNAL_SYSCTL  201
 
 #define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
-#define QSWNAL_SYSCTL_COPY_SMALL_FWD     2
+#define QSWNAL_SYSCTL_OPTIMIZED_PUTS     2
 
 static ctl_table kqswnal_ctl_table[] = {
-       {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
-        &kqswnal_data.kqn_optimized_gets, sizeof (int),
+       {QSWNAL_SYSCTL_OPTIMIZED_PUTS, "optimized_puts",
+        &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
         0644, NULL, &proc_dointvec},
-       {QSWNAL_SYSCTL_COPY_SMALL_FWD, "copy_small_fwd",
-        &kqswnal_data.kqn_copy_small_fwd, sizeof (int),
+       {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
+        &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}
 };
@@ -57,88 +55,13 @@ static ctl_table kqswnal_top_ctl_table[] = {
 };
 #endif
 
-static int
-kqswnal_forward(nal_t   *nal,
-               int     id,
-               void    *args,  size_t args_len,
-               void    *ret,   size_t ret_len)
-{
-       kqswnal_data_t *k = nal->nal_data;
-       nal_cb_t       *nal_cb = k->kqn_cb;
-
-       LASSERT (nal == &kqswnal_api);
-       LASSERT (k == &kqswnal_data);
-       LASSERT (nal_cb == &kqswnal_lib);
-
-       lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
-       return (PTL_OK);
-}
-
-static void
-kqswnal_lock (nal_t *nal, unsigned long *flags)
-{
-       kqswnal_data_t *k = nal->nal_data;
-       nal_cb_t       *nal_cb = k->kqn_cb;
-
-       LASSERT (nal == &kqswnal_api);
-       LASSERT (k == &kqswnal_data);
-       LASSERT (nal_cb == &kqswnal_lib);
-
-       nal_cb->cb_cli(nal_cb,flags);
-}
-
-static void
-kqswnal_unlock(nal_t *nal, unsigned long *flags)
-{
-       kqswnal_data_t *k = nal->nal_data;
-       nal_cb_t       *nal_cb = k->kqn_cb;
-
-       LASSERT (nal == &kqswnal_api);
-       LASSERT (k == &kqswnal_data);
-       LASSERT (nal_cb == &kqswnal_lib);
-
-       nal_cb->cb_sti(nal_cb,flags);
-}
-
-static int
-kqswnal_shutdown(nal_t *nal, int ni)
-{
-       CDEBUG (D_NET, "shutdown\n");
-
-       LASSERT (nal == &kqswnal_api);
-       return (0);
-}
-
-static void
-kqswnal_yield( nal_t *nal )
-{
-       CDEBUG (D_NET, "yield\n");
-
-       if (current->need_resched)
-               schedule();
-       return;
-}
-
-static nal_t *
-kqswnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
-            ptl_pid_t requested_pid)
-{
-       ptl_nid_t mynid = kqswnal_elanid2nid (kqswnal_data.kqn_elanid);
-       int       nnids = kqswnal_data.kqn_nnodes;
-
-        CDEBUG(D_NET, "calling lib_init with nid "LPX64" of %d\n", mynid, nnids);
-
-       lib_init(&kqswnal_lib, mynid, 0, nnids, ptl_size, ac_size);
-
-       return (&kqswnal_api);
-}
-
 int
 kqswnal_get_tx_desc (struct portals_cfg *pcfg)
 {
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
+       ptl_hdr_t         *hdr;
        int                index = pcfg->pcfg_count;
        int                rc = -ENOENT;
 
@@ -149,11 +72,13 @@ kqswnal_get_tx_desc (struct portals_cfg *pcfg)
                        continue;
 
                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
+               hdr = (ptl_hdr_t *)ktx->ktx_buffer;
 
-               pcfg->pcfg_pbuf1 = (char *)ktx;
-               pcfg->pcfg_count = NTOH__u32(ktx->ktx_wire_hdr->type);
-               pcfg->pcfg_size  = NTOH__u32(ktx->ktx_wire_hdr->payload_length);
-               pcfg->pcfg_nid   = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
+               memcpy(pcfg->pcfg_pbuf, ktx,
+                      MIN(sizeof(*ktx), pcfg->pcfg_plen1));
+               pcfg->pcfg_count = le32_to_cpu(hdr->type);
+               pcfg->pcfg_size  = le32_to_cpu(hdr->payload_length);
+               pcfg->pcfg_nid   = le64_to_cpu(hdr->dest_nid);
                pcfg->pcfg_nid2  = ktx->ktx_nid;
                pcfg->pcfg_misc  = ktx->ktx_launcher;
                pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
@@ -182,7 +107,7 @@ kqswnal_cmd (struct portals_cfg *pcfg, void *private)
                        kqswnal_data.kqn_nid_offset);
                kqswnal_data.kqn_nid_offset =
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
-               kqswnal_lib.ni.nid = pcfg->pcfg_nid;
+               kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
                return (0);
                
        default:
@@ -190,26 +115,34 @@ kqswnal_cmd (struct portals_cfg *pcfg, void *private)
        }
 }
 
-void __exit
-kqswnal_finalise (void)
+static void
+kqswnal_shutdown(nal_t *nal)
 {
+       unsigned long flags;
+       kqswnal_tx_t *ktx;
+       kqswnal_rx_t *krx;
+       int           do_lib_fini = 0;
+
+       /* NB The first ref was this module! */
+       if (nal->nal_refct != 0) {
+               PORTAL_MODULE_UNUSE;
+               return;
+       }
+
+       CDEBUG (D_NET, "shutdown\n");
+       LASSERT (nal == &kqswnal_api);
+
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);
 
        case KQN_INIT_ALL:
-#if CONFIG_SYSCTL
-                if (kqswnal_data.kqn_sysctl != NULL)
-                        unregister_sysctl_table (kqswnal_data.kqn_sysctl);
-#endif         
-               PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
-                kportal_nal_unregister(QSWNAL);
+                libcfs_nal_cmd_unregister(QSWNAL);
                /* fall through */
 
-       case KQN_INIT_PTL:
-               PtlNIFini (kqswnal_ni);
-               lib_fini (&kqswnal_lib);
+       case KQN_INIT_LIB:
+               do_lib_fini = 1;
                /* fall through */
 
        case KQN_INIT_DATA:
@@ -220,18 +153,24 @@ kqswnal_finalise (void)
        }
 
        /**********************************************************************/
-       /* Make router stop her calling me and fail any more call-ins */
+       /* Tell router we're shutting down.  Any router calls my threads
+        * make will now fail immediately and the router will stop calling
+        * into me. */
        kpr_shutdown (&kqswnal_data.kqn_router);
-
+       
        /**********************************************************************/
-       /* flag threads we've started to terminate and wait for all to ack */
-
+       /* Signal the start of shutdown... */
+       spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
-       wake_up_all (&kqswnal_data.kqn_sched_waitq);
+       spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
+
+       wake_up_all(&kqswnal_data.kqn_idletxd_waitq);
 
-       while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
-               CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
-                      atomic_read (&kqswnal_data.kqn_nthreads_running));
+       /**********************************************************************/
+       /* wait for sends that have allocated a tx desc to launch or give up */
+       while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
+               CDEBUG(D_NET, "waiting for %d pending sends\n",
+                      atomic_read (&kqswnal_data.kqn_pending_txs));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }
@@ -239,18 +178,27 @@ kqswnal_finalise (void)
        /**********************************************************************/
        /* close elan comms */
 #if MULTIRAIL_EKC
+       /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);
 
        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);
 
+       /* NB ep_free_rcvr() returns only after we've freed off all receive
+        * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
+        * means we must have completed any messages we passed to
+        * lib_parse() or kpr_fwd_start(). */
+
        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);
 
-       /* freeing the xmtr completes all txs pdq */
+       /* NB ep_free_xmtr() returns only after all outstanding transmits
+        * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
 #else
+       /* "Old" EKC just pretends to shutdown cleanly but actually
+        * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
 
@@ -269,7 +217,6 @@ kqswnal_finalise (void)
 #endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
-
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);
 
@@ -287,10 +234,12 @@ kqswnal_finalise (void)
 
 #if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
+       LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
+       LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
 #endif
 
        /**********************************************************************/
-       /* Complete any blocked forwarding packets with error
+       /* Complete any blocked forwarding packets, with error
         */
 
        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
@@ -298,23 +247,16 @@ kqswnal_finalise (void)
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
-               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
-       }
-
-       while (!list_empty (&kqswnal_data.kqn_delayedfwds))
-       {
-               kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
-                                                 kpr_fwd_desc_t, kprfd_list);
-               list_del (&fwd->kprfd_list);
-               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
+               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
        }
 
        /**********************************************************************/
-       /* Wait for router to complete any packets I sent her
-        */
+       /* finalise router and portals lib */
 
        kpr_deregister (&kqswnal_data.kqn_router);
 
+       if (do_lib_fini)
+               lib_fini (&kqswnal_lib);
 
        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
@@ -325,37 +267,25 @@ kqswnal_finalise (void)
         * ep_dvma_release() get fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */
 
-       if (kqswnal_data.kqn_txds != NULL) {
-               int  i;
+       for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
+               /* If ktx has a buffer, it got mapped; unmap now.  NB only
+                * the pre-mapped stuff is still mapped since all tx descs
+                * must be idle */
 
-               for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++) {
-                       kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
-
-                       /* If ktx has a buffer, it got mapped; unmap now.
-                        * NB only the pre-mapped stuff is still mapped
-                        * since all tx descs must be idle */
-
-                       if (ktx->ktx_buffer != NULL)
-                               ep_dvma_unload(kqswnal_data.kqn_ep,
-                                              kqswnal_data.kqn_ep_tx_nmh,
-                                              &ktx->ktx_ebuffer);
-               }
+               if (ktx->ktx_buffer != NULL)
+                       ep_dvma_unload(kqswnal_data.kqn_ep,
+                                      kqswnal_data.kqn_ep_tx_nmh,
+                                      &ktx->ktx_ebuffer);
        }
 
-       if (kqswnal_data.kqn_rxds != NULL) {
-               int   i;
-
-               for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
-                       kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
-
-                       /* If krx_pages[0] got allocated, it got mapped.
-                        * NB subsequent pages get merged */
+       for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
+               /* If krx_kiov[0].kiov_page got allocated, it got mapped.  
+                * NB subsequent pages get merged */
 
-                       if (krx->krx_pages[0] != NULL)
-                               ep_dvma_unload(kqswnal_data.kqn_ep,
-                                              kqswnal_data.kqn_ep_rx_nmh,
-                                              &krx->krx_elanbuffer);
-               }
+               if (krx->krx_kiov[0].kiov_page != NULL)
+                       ep_dvma_unload(kqswnal_data.kqn_ep,
+                                      kqswnal_data.kqn_ep_rx_nmh,
+                                      &krx->krx_elanbuffer);
        }
        /* <----------- to here */
 
@@ -388,41 +318,26 @@ kqswnal_finalise (void)
        }
 #endif
 
-       if (kqswnal_data.kqn_txds != NULL)
-       {
-               int   i;
+       while (kqswnal_data.kqn_txds != NULL) {
+               ktx = kqswnal_data.kqn_txds;
 
-               for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
-               {
-                       kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
-
-                       if (ktx->ktx_buffer != NULL)
-                               PORTAL_FREE(ktx->ktx_buffer,
-                                           KQSW_TX_BUFFER_SIZE);
-               }
+               if (ktx->ktx_buffer != NULL)
+                       PORTAL_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
 
-               PORTAL_FREE(kqswnal_data.kqn_txds,
-                           sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
-                                                    KQSW_NNBLK_TXMSGS));
+               kqswnal_data.kqn_txds = ktx->ktx_alloclist;
+               PORTAL_FREE(ktx, sizeof(*ktx));
        }
 
-       if (kqswnal_data.kqn_rxds != NULL)
-       {
-               int   i;
-               int   j;
+       while (kqswnal_data.kqn_rxds != NULL) {
+               int           i;
 
-               for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
-               {
-                       kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
+               krx = kqswnal_data.kqn_rxds;
+               for (i = 0; i < krx->krx_npages; i++)
+                       if (krx->krx_kiov[i].kiov_page != NULL)
+                               __free_page (krx->krx_kiov[i].kiov_page);
 
-                       for (j = 0; j < krx->krx_npages; j++)
-                               if (krx->krx_pages[j] != NULL)
-                                       __free_page (krx->krx_pages[j]);
-               }
-
-               PORTAL_FREE(kqswnal_data.kqn_rxds,
-                           sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
-                                                   KQSW_NRXMSGS_LARGE));
+               kqswnal_data.kqn_rxds = krx->krx_alloclist;
+               PORTAL_FREE(krx, sizeof (*krx));
        }
 
        /* resets flags, pointers to NULL etc */
@@ -434,8 +349,10 @@ kqswnal_finalise (void)
                 atomic_read(&portal_kmemory));
 }
 
-static int __init
-kqswnal_initialise (void)
+static int
+kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
+                ptl_ni_limits_t *requested_limits, 
+                ptl_ni_limits_t *actual_limits)
 {
 #if MULTIRAIL_EKC
        EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
@@ -444,38 +361,29 @@ kqswnal_initialise (void)
 #endif
        int               rc;
        int               i;
+       kqswnal_rx_t     *krx;
+       kqswnal_tx_t     *ktx;
        int               elan_page_idx;
+       ptl_process_id_t  my_process_id;
        int               pkmem = atomic_read(&portal_kmemory);
 
-       LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
+       LASSERT (nal == &kqswnal_api);
 
-       CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
+       if (nal->nal_refct != 0) {
+               if (actual_limits != NULL)
+                       *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
+               /* This module got the first ref */
+               PORTAL_MODULE_USE;
+               return (PTL_OK);
+       }
 
-       kqswnal_api.forward  = kqswnal_forward;
-       kqswnal_api.shutdown = kqswnal_shutdown;
-       kqswnal_api.yield    = kqswnal_yield;
-       kqswnal_api.validate = NULL;            /* our api validate is a NOOP */
-       kqswnal_api.lock     = kqswnal_lock;
-       kqswnal_api.unlock   = kqswnal_unlock;
-       kqswnal_api.nal_data = &kqswnal_data;
+       LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
 
-       kqswnal_lib.nal_data = &kqswnal_data;
+       CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
 
-       memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
-       memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
-#if MULTIRAIL_EKC
-       kqswnal_rpc_failed.Data[0] = -ECONNREFUSED;
-#else
-       kqswnal_rpc_failed.Status = -ECONNREFUSED;
-#endif
        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));
 
-       kqswnal_data.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
-       kqswnal_data.kqn_copy_small_fwd = KQSW_COPY_SMALL_FWD;
-
-       kqswnal_data.kqn_cb = &kqswnal_lib;
-
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
@@ -490,22 +398,28 @@ kqswnal_initialise (void)
        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
 
-       spin_lock_init (&kqswnal_data.kqn_statelock);
+       /* Leave kqn_rpc_success zeroed */
+#if MULTIRAIL_EKC
+       kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
+#else
+       kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
+#endif
 
        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;
-
+       
 #if MULTIRAIL_EKC
        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
-               return (-ENODEV);
+               kqswnal_shutdown(nal);
+               return (PTL_IFACE_INVALID);
        }
 
        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
-               kqswnal_finalise();
-               return (-ENODEV);
+               kqswnal_shutdown(nal);
+               return (PTL_IFACE_INVALID);
        }
 #else
        /**********************************************************************/
@@ -515,7 +429,8 @@ kqswnal_initialise (void)
        if (kqswnal_data.kqn_ep == NULL)
        {
                CERROR ("Can't get elan device 0\n");
-               return (-ENODEV);
+               kqswnal_shutdown(nal);
+               return (PTL_IFACE_INVALID);
        }
 #endif
 
@@ -530,8 +445,8 @@ kqswnal_initialise (void)
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
-               kqswnal_finalise ();
-               return (-ENOMEM);
+               kqswnal_shutdown (nal);
+               return (PTL_NO_SPACE);
        }
 
        /**********************************************************************/
@@ -543,8 +458,8 @@ kqswnal_initialise (void)
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
-               kqswnal_finalise ();
-               return (-ENOMEM);
+               kqswnal_shutdown (nal);
+               return (PTL_NO_SPACE);
        }
 
        kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
@@ -553,8 +468,8 @@ kqswnal_initialise (void)
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
-               kqswnal_finalise ();
-               return (-ENOMEM);
+               kqswnal_shutdown (nal);
+               return (PTL_NO_SPACE);
        }
 
        /**********************************************************************/
@@ -568,8 +483,8 @@ kqswnal_initialise (void)
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
-               kqswnal_finalise();
-               return (-ENOMEM);
+               kqswnal_shutdown(nal);
+               return (PTL_NO_SPACE);
        }
 #else
         dmareq.Waitfn   = DDI_DMA_SLEEP;
@@ -583,8 +498,8 @@ kqswnal_initialise (void)
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
-               kqswnal_finalise ();
-               return (-ENOMEM);
+               kqswnal_shutdown (nal);
+               return (PTL_NO_SPACE);
        }
 #endif
        /**********************************************************************/
@@ -597,8 +512,8 @@ kqswnal_initialise (void)
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
-               kqswnal_finalise();
-               return (-ENOMEM);
+               kqswnal_shutdown(nal);
+               return (PTL_NO_SPACE);
        }
 #else
         dmareq.Waitfn   = DDI_DMA_SLEEP;
@@ -613,35 +528,34 @@ kqswnal_initialise (void)
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
-               kqswnal_finalise ();
-               return (-ENOMEM);
+               kqswnal_shutdown (nal);
+               return (PTL_NO_SPACE);
        }
 #endif
        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */
 
-       PORTAL_ALLOC(kqswnal_data.kqn_txds,
-                    sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
-       if (kqswnal_data.kqn_txds == NULL)
-       {
-               kqswnal_finalise ();
-               return (-ENOMEM);
-       }
-
-       /* clear flags, null pointers etc */
-       memset(kqswnal_data.kqn_txds, 0,
-              sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
+       kqswnal_data.kqn_txds = NULL;
        for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
        {
                int           premapped_pages;
-               kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
                int           basepage = i * KQSW_NTXMSGPAGES;
 
+               PORTAL_ALLOC (ktx, sizeof(*ktx));
+               if (ktx == NULL) {
+                       kqswnal_shutdown (nal);
+                       return (PTL_NO_SPACE);
+               }
+
+               memset(ktx, 0, sizeof(*ktx));   /* NULL pointers; zero flags */
+               ktx->ktx_alloclist = kqswnal_data.kqn_txds;
+               kqswnal_data.kqn_txds = ktx;
+
                PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
-                       kqswnal_finalise ();
-                       return (-ENOMEM);
+                       kqswnal_shutdown (nal);
+                       return (PTL_NO_SPACE);
                }
 
                /* Map pre-allocated buffer NOW, to save latency on transmit */
@@ -664,6 +578,9 @@ kqswnal_initialise (void)
                INIT_LIST_HEAD (&ktx->ktx_delayed_list);
 
                ktx->ktx_state = KTX_IDLE;
+#if MULTIRAIL_EKC
+               ktx->ktx_rail = -1;             /* unset rail */
+#endif
                ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
                list_add_tail (&ktx->ktx_list, 
                               ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
@@ -672,18 +589,7 @@ kqswnal_initialise (void)
 
        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */
-
-       PORTAL_ALLOC (kqswnal_data.kqn_rxds,
-                     sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
-       if (kqswnal_data.kqn_rxds == NULL)
-       {
-               kqswnal_finalise ();
-               return (-ENOMEM);
-       }
-
-       memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
-              sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));
-
+       kqswnal_data.kqn_rxds = NULL;
        elan_page_idx = 0;
        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
@@ -693,7 +599,16 @@ kqswnal_initialise (void)
                E3_Addr       elanbuffer;
 #endif
                int           j;
-               kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
+
+               PORTAL_ALLOC(krx, sizeof(*krx));
+               if (krx == NULL) {
+                       kqswnal_shutdown(nal);
+                       return (PTL_NO_SPACE);
+               }
+
+               memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
+               krx->krx_alloclist = kqswnal_data.kqn_rxds;
+               kqswnal_data.kqn_rxds = krx;
 
                if (i < KQSW_NRXMSGS_SMALL)
                {
@@ -709,18 +624,19 @@ kqswnal_initialise (void)
                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
-                       krx->krx_pages[j] = alloc_page(GFP_KERNEL);
-                       if (krx->krx_pages[j] == NULL)
-                       {
-                               kqswnal_finalise ();
-                               return (-ENOMEM);
+                       struct page *page = alloc_page(GFP_KERNEL);
+                       
+                       if (page == NULL) {
+                               kqswnal_shutdown (nal);
+                               return (PTL_NO_SPACE);
                        }
 
-                       LASSERT(page_address(krx->krx_pages[j]) != NULL);
+                       krx->krx_kiov[j].kiov_page = page;
+                       LASSERT(page_address(page) != NULL);
 
 #if MULTIRAIL_EKC
                        ep_dvma_load(kqswnal_data.kqn_ep, NULL,
-                                    page_address(krx->krx_pages[j]),
+                                    page_address(page),
                                     PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
                                     elan_page_idx, &all_rails, &elanbuffer);
                        
@@ -736,7 +652,7 @@ kqswnal_initialise (void)
 #else
                        elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
                                              kqswnal_data.kqn_eprxdmahandle,
-                                             page_address(krx->krx_pages[j]),
+                                             page_address(page),
                                              PAGE_SIZE, elan_page_idx,
                                              &elanbuffer);
                        if (j == 0)
@@ -756,24 +672,26 @@ kqswnal_initialise (void)
        /**********************************************************************/
        /* Network interface ready to initialise */
 
-        rc = PtlNIInit(kqswnal_init, 32, 4, 0, &kqswnal_ni);
-        if (rc != 0)
+       my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
+       my_process_id.pid = requested_pid;
+
+       rc = lib_init(&kqswnal_lib, nal, my_process_id,
+                     requested_limits, actual_limits);
+        if (rc != PTL_OK)
        {
-               CERROR ("PtlNIInit failed %d\n", rc);
-               kqswnal_finalise ();
-               return (-ENOMEM);
+               CERROR ("lib_init failed %d\n", rc);
+               kqswnal_shutdown (nal);
+               return (rc);
        }
 
-       kqswnal_data.kqn_init = KQN_INIT_PTL;
+       kqswnal_data.kqn_init = KQN_INIT_LIB;
 
        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */
 
-       for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
-       {
-               kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
-
+       for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* NB this enqueue can allocate/sleep (attr == 0) */
+               krx->krx_state = KRX_POSTED;
 #if MULTIRAIL_EKC
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      &krx->krx_elanbuffer, 0);
@@ -785,21 +703,20 @@ kqswnal_initialise (void)
                if (rc != EP_SUCCESS)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
-                       kqswnal_finalise ();
-                       return (-ENOMEM);
+                       kqswnal_shutdown (nal);
+                       return (PTL_FAIL);
                }
        }
 
        /**********************************************************************/
        /* Spawn scheduling threads */
-       for (i = 0; i < smp_num_cpus; i++)
-       {
+       for (i = 0; i < num_online_cpus(); i++) {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
-                       kqswnal_finalise ();
-                       return (rc);
+                       kqswnal_shutdown (nal);
+                       return (PTL_FAIL);
                }
        }
 
@@ -808,19 +725,13 @@ kqswnal_initialise (void)
        rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
        CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
 
-       rc = kportal_nal_register (QSWNAL, &kqswnal_cmd, NULL);
+       rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
        if (rc != 0) {
                CERROR ("Can't initialise command interface (rc = %d)\n", rc);
-               kqswnal_finalise ();
-               return (rc);
+               kqswnal_shutdown (nal);
+               return (PTL_FAIL);
        }
 
-#if CONFIG_SYSCTL
-        /* Press on regardless even if registering sysctl doesn't work */
-        kqswnal_data.kqn_sysctl = register_sysctl_table (kqswnal_top_ctl_table, 0);
-#endif
-
-       PORTAL_SYMBOL_REGISTER(kqswnal_ni);
        kqswnal_data.kqn_init = KQN_INIT_ALL;
 
        printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
@@ -829,9 +740,55 @@ kqswnal_initialise (void)
               kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
               pkmem);
 
-       return (0);
+       return (PTL_OK);
 }
 
+void __exit
+kqswnal_finalise (void)
+{
+#if CONFIG_SYSCTL
+       if (kqswnal_tunables.kqn_sysctl != NULL)
+               unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
+#endif
+       PtlNIFini(kqswnal_ni);
+
+       ptl_unregister_nal(QSWNAL);
+}
+
+static int __init
+kqswnal_initialise (void)
+{
+       int   rc;
+
+       kqswnal_api.nal_ni_init = kqswnal_startup;
+       kqswnal_api.nal_ni_fini = kqswnal_shutdown;
+
+       /* Initialise dynamic tunables to defaults once only */
+       kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
+       kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
+       
+       rc = ptl_register_nal(QSWNAL, &kqswnal_api);
+       if (rc != PTL_OK) {
+               CERROR("Can't register QSWNAL: %d\n", rc);
+               return (-ENOMEM);               /* or something... */
+       }
+
+       /* Pure gateways, and the workaround for 'EKC blocks forever until
+        * the service is active' want the NAL started up at module load
+        * time... */
+       rc = PtlNIInit(QSWNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kqswnal_ni);
+       if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
+               ptl_unregister_nal(QSWNAL);
+               return (-ENODEV);
+       }
+
+#if CONFIG_SYSCTL
+        /* Press on regardless even if registering sysctl doesn't work */
+        kqswnal_tunables.kqn_sysctl = 
+               register_sysctl_table (kqswnal_top_ctl_table, 0);
+#endif
+       return (0);
+}
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
@@ -839,5 +796,3 @@ MODULE_LICENSE("GPL");
 
 module_init (kqswnal_initialise);
 module_exit (kqswnal_finalise);
-
-EXPORT_SYMBOL (kqswnal_ni);