i=liang,b=21103,b=19735:
[fs/lustre-release.git] lnet/klnds/ptllnd/ptllnd.c
index 770bfeb..6562561 100755
@@ -247,10 +247,16 @@ kptllnd_cksum (void *ptr, int nob)
 }
 
 void
-kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob)
+kptllnd_init_msg(kptl_msg_t *msg, int type,
+                 lnet_process_id_t target, int body_nob)
 {
         msg->ptlm_type = type;
         msg->ptlm_nob  = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;
+        msg->ptlm_dstpid = target.pid;
+        msg->ptlm_dstnid = target.nid;
+        msg->ptlm_srcpid = the_lnet.ln_pid;
+        msg->ptlm_srcnid = kptllnd_ptl2lnetnid(target.nid,
+                                               kptllnd_data.kptl_portals_id.nid);
         
         LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
 }
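
A note on the `ptlm_nob` line kept above: `(offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7` rounds the header-plus-body size up to the next multiple of 8. A minimal stand-alone sketch of that idiom (the `round_up_8` helper is illustrative, not part of ptllnd):

#include <assert.h>

/* Round n up to the next multiple of 8.  Works because 8 is a power of
 * two: adding 7 carries over the low three bits, and ~7 clears them. */
static unsigned int round_up_8(unsigned int n)
{
        return (n + 7) & ~7u;
}

int main(void)
{
        assert(round_up_8(0)  == 0);
        assert(round_up_8(1)  == 8);
        assert(round_up_8(41) == 48);   /* e.g. a 41-byte message */
        assert(round_up_8(48) == 48);   /* already aligned: unchanged */
        return 0;
}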
@@ -264,12 +270,9 @@ kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer)
         msg->ptlm_credits  = peer->peer_outstanding_credits;
         /* msg->ptlm_nob   Filled in kptllnd_init_msg()  */
         msg->ptlm_cksum    = 0;
-        msg->ptlm_srcnid   = kptllnd_data.kptl_ni->ni_nid;
+        /* msg->ptlm_{src|dst}[pn]id Filled in kptllnd_init_msg */
         msg->ptlm_srcstamp = peer->peer_myincarnation;
-        msg->ptlm_dstnid   = peer->peer_id.nid;
         msg->ptlm_dststamp = peer->peer_incarnation;
-        msg->ptlm_srcpid   = the_lnet.ln_pid;
-        msg->ptlm_dstpid   = peer->peer_id.pid;
 
         if (*kptllnd_tunables.kptl_checksum) {
                 /* NB ptlm_cksum zero while computing cksum */
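
On the `ptlm_cksum = 0` line above: the checksum is computed over the header with the checksum field zeroed, then written into that field, so the receiver must zero it again before re-computing. A minimal user-space sketch of the idiom, assuming a trivial additive sum in place of kptllnd_cksum() and an illustrative `struct hdr`:

#include <stddef.h>
#include <stdint.h>

struct hdr {
        uint32_t type;
        uint32_t cksum;         /* NB zero while computing the checksum */
        uint32_t payload;
};

/* Byte-wise additive sum, standing in for the real kptllnd_cksum(). */
static uint32_t cksum(const void *ptr, size_t nob)
{
        const uint8_t *p = ptr;
        uint32_t       sum = 0;

        while (nob-- != 0)
                sum += *p++;
        return sum;
}

static void hdr_seal(struct hdr *h)
{
        h->cksum = 0;                           /* zero before summing */
        h->cksum = cksum(h, sizeof(*h));
}

static int hdr_verify(struct hdr *h)
{
        uint32_t wire = h->cksum;
        int      ok;

        h->cksum = 0;                           /* same rules as sender */
        ok = (cksum(h, sizeof(*h)) == wire);
        h->cksum = wire;                        /* restore the field */
        return ok;
}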
@@ -411,6 +414,7 @@ kptllnd_msg_unpack(kptl_msg_t *msg, int nob)
 int
 kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
 {
+        kptl_net_t  *net = ni->ni_data;
         struct libcfs_ioctl_data *data = arg;
         int          rc = -EINVAL;
 
@@ -420,7 +424,7 @@ kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
          * Validate that the context block is actually
          * pointing to this interface
          */
-        LASSERT (ni == kptllnd_data.kptl_ni);
+        LASSERT (ni == net->net_ni);
 
         switch(cmd) {
         case IOC_LIBCFS_DEL_PEER: {
@@ -480,12 +484,13 @@ kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
 void
 kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, time_t *when)
 {
+        kptl_net_t        *net = ni->ni_data;
         kptl_peer_t       *peer = NULL;
         lnet_process_id_t  id = {.nid = nid, .pid = LUSTRE_SRV_LNET_PID};
         unsigned long      flags;
 
         /* NB: kptllnd_find_target connects to peer if necessary */
-        if (kptllnd_find_target(&peer, id) != 0)
+        if (kptllnd_find_target(net, id, &peer) != 0)
                 return;
 
         spin_lock_irqsave(&peer->peer_lock, flags);
@@ -498,22 +503,147 @@ kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, time_t *when)
         return;
 }
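
The change above tracks the reworked lookup API: kptllnd_find_target() now takes the kptl_net_t first and hands the peer back through an out-parameter instead of taking the peer pointer first. A hedged sketch of that calling convention with stand-in types (none of these definitions are the real ptllnd ones):

#include <errno.h>
#include <stddef.h>

/* Minimal stand-ins; the real types live in ptllnd.h. */
typedef unsigned long long lnet_nid_t;
typedef unsigned int       lnet_pid_t;
typedef struct { lnet_nid_t nid; lnet_pid_t pid; } lnet_process_id_t;

typedef struct kptl_net  { int net_dummy; } kptl_net_t;
typedef struct kptl_peer { lnet_process_id_t peer_id; } kptl_peer_t;

/* Placeholder for the hash-table walk plus connect-on-demand step. */
static kptl_peer_t *lookup_or_connect(kptl_net_t *net, lnet_process_id_t id)
{
        (void)net;
        (void)id;
        return NULL;            /* pretend the peer table is empty */
}

/* 0 or -errno, peer returned via *peerp: the shape of the new
 * kptllnd_find_target(net, id, &peer). */
static int find_target(kptl_net_t *net, lnet_process_id_t id,
                       kptl_peer_t **peerp)
{
        kptl_peer_t *peer = lookup_or_connect(net, id);

        if (peer == NULL)
                return -ENOENT;

        *peerp = peer;          /* caller now owns a reference */
        return 0;
}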
 
+void
+kptllnd_base_shutdown (void)
+{
+        int               i;
+        ptl_err_t         prc;
+        unsigned long     flags;
+        lnet_process_id_t process_id;
+
+        read_lock(&kptllnd_data.kptl_net_rw_lock);
+        LASSERT (list_empty(&kptllnd_data.kptl_nets));
+        read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+        switch (kptllnd_data.kptl_init) {
+        default:
+                LBUG();
+
+        case PTLLND_INIT_ALL:
+        case PTLLND_INIT_DATA:
+                /* stop receiving */
+                kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
+                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
+                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
+
+                /* lock to interleave cleanly with peer birth/death */
+                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+                LASSERT (kptllnd_data.kptl_shutdown == 0);
+                kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
+                /* no new peers possible now */
+                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+                /* nuke all existing peers */
+                process_id.nid = LNET_NID_ANY;
+                process_id.pid = LNET_PID_ANY;
+                kptllnd_peer_del(process_id);
+
+                read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+                LASSERT (kptllnd_data.kptl_n_active_peers == 0);
+
+                i = 2;
+                while (kptllnd_data.kptl_npeers != 0) {
+                        i++;
+                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
+                               "Waiting for %d peers to terminate\n",
+                               kptllnd_data.kptl_npeers);
+
+                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
+                                               flags);
+
+                        cfs_pause(cfs_time_seconds(1));
+
+                        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock,
+                                          flags);
+                }
+
+                LASSERT (list_empty(&kptllnd_data.kptl_closing_peers));
+                LASSERT (list_empty(&kptllnd_data.kptl_zombie_peers));
+                LASSERT (kptllnd_data.kptl_peers != NULL);
+                for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
+                        LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
+
+                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+                CDEBUG(D_NET, "All peers deleted\n");
+
+                /* Shutdown phase 2: kill the daemons... */
+                kptllnd_data.kptl_shutdown = 2;
+                mb();
+
+                i = 2;
+                while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
+                        /* Wake up all threads */
+                        wake_up_all(&kptllnd_data.kptl_sched_waitq);
+                        wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
+
+                        i++;
+                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
+                               "Waiting for %d threads to terminate\n",
+                               atomic_read(&kptllnd_data.kptl_nthreads));
+                        cfs_pause(cfs_time_seconds(1));
+                }
+
+                CDEBUG(D_NET, "All Threads stopped\n");
+                LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
+
+                kptllnd_cleanup_tx_descs();
+
+                /* Nothing here now, but libcfs might soon require
+                 * us to explicitly destroy wait queues and semaphores
+                 * that would be done here */
+
+                /* fall through */
+
+        case PTLLND_INIT_NOTHING:
+                CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
+                break;
+        }
+
+        if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
+                prc = PtlEQFree(kptllnd_data.kptl_eqh);
+                if (prc != PTL_OK)
+                        CERROR("Error %s(%d) freeing portals EQ\n",
+                               kptllnd_errtype2str(prc), prc);
+        }
+
+        if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
+                prc = PtlNIFini(kptllnd_data.kptl_nih);
+                if (prc != PTL_OK)
+                        CERROR("Error %s(%d) finalizing portals NI\n",
+                               kptllnd_errtype2str(prc), prc);
+        }
+
+        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
+        LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
+
+        if (kptllnd_data.kptl_rx_cache != NULL)
+                cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
+
+        if (kptllnd_data.kptl_peers != NULL)
+                LIBCFS_FREE(kptllnd_data.kptl_peers,
+                            sizeof (struct list_head) *
+                            kptllnd_data.kptl_peer_hash_size);
+
+        if (kptllnd_data.kptl_nak_msg != NULL)
+                LIBCFS_FREE(kptllnd_data.kptl_nak_msg,
+                            offsetof(kptl_msg_t, ptlm_u));
+
+        memset(&kptllnd_data, 0, sizeof(kptllnd_data));
+        PORTAL_MODULE_UNUSE;
+        return;
+}
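
The `((i & (-i)) == i)` tests in the wait loops above select D_WARNING only when the iteration count is a power of two, so the "still waiting" message backs off exponentially instead of hitting the log once per second. Stand-alone illustration:

#include <stdio.h>

/* i & -i isolates the lowest set bit (two's complement); it equals i
 * exactly when i has a single bit set, i.e. i is a power of two. */
static int is_pow2(int i)
{
        return (i & (-i)) == i;
}

int main(void)
{
        int i;

        for (i = 3; i <= 33; i++)       /* counter starts past 2, as above */
                if (is_pow2(i))
                        printf("warn at iteration %d\n", i);  /* 4 8 16 32 */
        return 0;
}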
+
 int
-kptllnd_startup (lnet_ni_t *ni)
+kptllnd_base_startup (void)
 {
-        int             rc;
         int             i;
+        int                rc;
         int             spares;
         struct timeval  tv;
+        lnet_process_id_t  target;
         ptl_err_t       ptl_rc;
 
-        LASSERT (ni->ni_lnd == &kptllnd_lnd);
-
-        if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) {
-                CERROR("Only 1 instance supported\n");
-                return -EPERM;
-        }
-
         if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
                 CERROR("max_procs_per_node must be >= 1\n");
                 return -EINVAL;
@@ -531,51 +661,40 @@ kptllnd_startup (lnet_ni_t *ni)
         CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
         CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
 
-        /*
-         * zero pointers, flags etc
-         * put everything into a known state.
-         */
+        /* Zero pointers, flags etc; put everything into a known state. */
         memset (&kptllnd_data, 0, sizeof (kptllnd_data));
+
+        LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
+        if (kptllnd_data.kptl_nak_msg == NULL) {
+                CERROR("Can't allocate NAK msg\n");
+                return -ENOMEM;
+        }
+        memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
+
         kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
         kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;
 
-        /*
-         * Setup the sched locks/lists/waitq
-         */
+        rwlock_init(&kptllnd_data.kptl_net_rw_lock);
+        INIT_LIST_HEAD(&kptllnd_data.kptl_nets);
+
+        /* Setup the sched locks/lists/waitq */
         spin_lock_init(&kptllnd_data.kptl_sched_lock);
         init_waitqueue_head(&kptllnd_data.kptl_sched_waitq);
         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
 
-        /* init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
+        /* Init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
         spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
 
-        /*
-         * Setup the tx locks/lists
-         */
+        /* Setup the tx locks/lists */
         spin_lock_init(&kptllnd_data.kptl_tx_lock);
         INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
         atomic_set(&kptllnd_data.kptl_ntx, 0);
 
-        /*
-         * Uptick the module reference count
-         */
+        /* Uptick the module reference count */
         PORTAL_MODULE_USE;
 
-        /*
-         * Setup pointers between the ni and context data block
-         */
-        kptllnd_data.kptl_ni = ni;
-        ni->ni_data = &kptllnd_data;
-
-        /*
-         * Setup Credits
-         */
-        ni->ni_maxtxcredits   = *kptllnd_tunables.kptl_credits;
-        ni->ni_peertxcredits  = *kptllnd_tunables.kptl_peertxcredits;
-        ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits;
-
         kptllnd_data.kptl_expected_peers =
                 *kptllnd_tunables.kptl_max_nodes *
                 *kptllnd_tunables.kptl_max_procs_per_node;
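
Worth noting in the hunk above: the NAK message buffer is now the very first allocation after kptllnd_data is zeroed, so the common teardown can free it behind a simple NULL check no matter how early startup bails out. The general pattern, as a small sketch (names are illustrative):

#include <stdlib.h>
#include <string.h>

struct ctx { char *nak_msg; };
static struct ctx ctx;

static void ctx_teardown(void)
{
        /* NULL checks make this callable at any point after the memset */
        if (ctx.nak_msg != NULL)
                free(ctx.nak_msg);
        memset(&ctx, 0, sizeof(ctx));
}

static int ctx_setup(void)
{
        memset(&ctx, 0, sizeof(ctx));   /* every pointer starts NULL */

        ctx.nak_msg = malloc(64);       /* first resource acquired */
        if (ctx.nak_msg == NULL)
                return -1;              /* nothing to undo yet */

        /* ... later steps can fail and just call ctx_teardown() ... */
        return 0;
}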
@@ -619,9 +738,7 @@ kptllnd_startup (lnet_ni_t *ni)
                 goto failed;
         }
 
-        /*
-         * Fetch the lower NID
-         */
+        /* Fetch the lower NID */
         ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
                           &kptllnd_data.kptl_portals_id);
         if (ptl_rc != PTL_OK) {
@@ -640,12 +757,6 @@ kptllnd_startup (lnet_ni_t *ni)
                 goto failed;
         }
 
-        ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid);
-
-        CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", 
-               kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
-               libcfs_nid2str(ni->ni_nid));
-
         /* Initialize the incarnation - it must be for-all-time unique, even
          * accounting for the fact that we increment it when we disconnect a
          * peer that's using it */
@@ -654,20 +765,26 @@ kptllnd_startup (lnet_ni_t *ni)
                                         tv.tv_usec;
         CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
 
-        /*
-         * Allocate and setup the peer hash table
-         */
+        target.nid = LNET_NID_ANY;
+        target.pid = LNET_PID_ANY; /* NB target for NAK doesn't matter */
+        kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, target, 0);
+        kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
+        kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
+        kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
+        kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
+
         rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
         init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq);
         atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0);
         INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
         INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
 
+        /* Allocate and setup the peer hash table */
         kptllnd_data.kptl_peer_hash_size =
                 *kptllnd_tunables.kptl_peer_hash_table_size;
         LIBCFS_ALLOC(kptllnd_data.kptl_peers,
-                     (kptllnd_data.kptl_peer_hash_size * 
-                      sizeof(struct list_head)));
+                     sizeof(struct list_head) *
+                     kptllnd_data.kptl_peer_hash_size);
         if (kptllnd_data.kptl_peers == NULL) {
                 CERROR("Failed to allocate space for peer hash table size=%d\n",
                         kptllnd_data.kptl_peer_hash_size);
@@ -677,22 +794,6 @@ kptllnd_startup (lnet_ni_t *ni)
         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
                 INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
 
-        LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
-        if (kptllnd_data.kptl_nak_msg == NULL) {
-                CERROR("Can't allocate NAK msg\n");
-                rc = -ENOMEM;
-                goto failed;
-        }
-        memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
-        kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0);
-        kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
-        kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
-        kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
-        kptllnd_data.kptl_nak_msg->ptlm_srcnid   = ni->ni_nid;
-        kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
-        kptllnd_data.kptl_nak_msg->ptlm_dstpid   = LNET_PID_ANY;
-        kptllnd_data.kptl_nak_msg->ptlm_dstnid   = LNET_NID_ANY;
-
         kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);
 
         kptllnd_data.kptl_rx_cache = 
@@ -721,7 +822,7 @@ kptllnd_startup (lnet_ni_t *ni)
         
         /* Start the scheduler threads for handling incoming requests.  No need
          * to advance the state because this will be automatically cleaned up
-         * now that PTLNAT_INIT_DATA state has been entered */
+         * now that PTLLND_INIT_DATA state has been entered */
         CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
         for (i = 0; i < PTLLND_N_SCHED; i++) {
                 rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
@@ -760,11 +861,59 @@ kptllnd_startup (lnet_ni_t *ni)
         if (*kptllnd_tunables.kptl_checksum)
                 CWARN("Checksumming enabled\n");
         
-        CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n");
+        CDEBUG(D_NET, "<<< kptllnd_base_startup SUCCESS\n");
+        return 0;
+
+ failed:
+        CERROR("kptllnd_base_startup failed: %d\n", rc);
+        kptllnd_base_shutdown();
+        return rc;
+}
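
The failure path above leans on kptllnd_base_shutdown() being safe at every stage: startup advances kptl_init as it goes, and shutdown switches on that state and falls through, undoing only what was reached. A compact sketch of the staged-init idiom (states and step names are mine):

enum init_state { INIT_NOTHING, INIT_DATA, INIT_ALL };

static enum init_state state = INIT_NOTHING;

static int  alloc_stuff(void)   { return 0; }   /* hypothetical step */
static int  start_threads(void) { return 0; }   /* hypothetical step */
static void stop_threads(void)  { }
static void free_stuff(void)    { }

static void base_shutdown(void)
{
        switch (state) {
        case INIT_ALL:
                stop_threads();
                /* fall through */
        case INIT_DATA:
                free_stuff();
                /* fall through */
        case INIT_NOTHING:
                break;
        }
        state = INIT_NOTHING;
}

static int base_startup(void)
{
        int rc;

        if ((rc = alloc_stuff()) != 0)
                goto failed;
        state = INIT_DATA;

        if ((rc = start_threads()) != 0)
                goto failed;
        state = INIT_ALL;
        return 0;

 failed:
        base_shutdown();        /* undoes exactly the stages reached */
        return rc;
}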
+
+int
+kptllnd_startup (lnet_ni_t *ni)
+{
+        int         rc;
+        kptl_net_t *net;
+
+        LASSERT (ni->ni_lnd == &kptllnd_lnd);
+
+        if (kptllnd_data.kptl_init == PTLLND_INIT_NOTHING) {
+                rc = kptllnd_base_startup();
+                if (rc != 0)
+                        return rc;
+        }
+
+        LIBCFS_ALLOC(net, sizeof(*net));
+        ni->ni_data = net;
+        if (net == NULL) {
+                CERROR("Can't allocate kptl_net_t\n");
+                rc = -ENOMEM;
+                goto failed;
+        }
+        memset(net, 0, sizeof(*net));
+        net->net_ni = ni;
+
+        ni->ni_maxtxcredits   = *kptllnd_tunables.kptl_credits;
+        ni->ni_peertxcredits  = *kptllnd_tunables.kptl_peertxcredits;
+        ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits;
+        ni->ni_nid = kptllnd_ptl2lnetnid(ni->ni_nid,
+                                         kptllnd_data.kptl_portals_id.nid);
+        CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n",
+               kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
+               libcfs_nid2str(ni->ni_nid));
+
+        /* NB LNET_NIDNET(ptlm_srcnid) of NAK doesn't matter in case of
+         * multiple NIs */
+        kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid;
+
+        atomic_set(&net->net_refcount, 1);
+        write_lock(&kptllnd_data.kptl_net_rw_lock);
+        list_add_tail(&net->net_list, &kptllnd_data.kptl_nets);
+        write_unlock(&kptllnd_data.kptl_net_rw_lock);
         return 0;
 
  failed:
-        CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc);
         kptllnd_shutdown(ni);
         return rc;
 }
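
kptllnd_startup() above publishes the net with net_refcount preset to 1; the matching kptllnd_shutdown() drops that reference and then waits for peers holding their own references to drain. A user-space sketch of that lifecycle with C11 atomics (the kernel code sleeps a second per loop and uses its own atomic ops):

#include <stdatomic.h>
#include <stdlib.h>

struct net { atomic_int refcount; };

static struct net *net_create(void)
{
        struct net *net = calloc(1, sizeof(*net));

        if (net != NULL)
                atomic_store(&net->refcount, 1);   /* startup's own ref */
        return net;
}

static void net_decref(struct net *net)
{
        atomic_fetch_sub(&net->refcount, 1);
}

static void net_destroy(struct net *net)
{
        net_decref(net);                /* drop the startup reference */
        while (atomic_load(&net->refcount) != 0)
                ;                       /* kernel code sleeps here */
        free(net);
}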
@@ -772,139 +921,53 @@ kptllnd_startup (lnet_ni_t *ni)
 void
 kptllnd_shutdown (lnet_ni_t *ni)
 {
+        kptl_net_t    *net = ni->ni_data;
         int               i;
-        ptl_err_t         prc;
-        lnet_process_id_t process_id;
         unsigned long     flags;
 
+        LASSERT (kptllnd_data.kptl_init == PTLLND_INIT_ALL);
+
         CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
                atomic_read (&libcfs_kmemory));
 
-        LASSERT (ni == kptllnd_data.kptl_ni);
+        if (net == NULL)
+                goto out;
 
-        switch (kptllnd_data.kptl_init) {
-        default:
-                LBUG();
+        LASSERT (ni == net->net_ni);
+        LASSERT (!net->net_shutdown);
+        LASSERT (!list_empty(&net->net_list));
+        LASSERT (atomic_read(&net->net_refcount) != 0);
+        ni->ni_data = NULL;
+        net->net_ni = NULL;
 
-        case PTLLND_INIT_ALL:
-        case PTLLND_INIT_DATA:
-                /* Stop receiving */
-                kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
-                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
-                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
+        write_lock(&kptllnd_data.kptl_net_rw_lock);
+        kptllnd_net_decref(net);
+        list_del_init(&net->net_list);
+        write_unlock(&kptllnd_data.kptl_net_rw_lock);
 
-                /* Hold peertable lock to interleave cleanly with peer birth/death */
-                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
-
-                LASSERT (kptllnd_data.kptl_shutdown == 0);
-                kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
-
-                /* no new peers possible now */
-                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
-                                        flags);
-
-                /* nuke all existing peers */
-                process_id.nid = LNET_NID_ANY;
-                process_id.pid = LNET_PID_ANY;
-                kptllnd_peer_del(process_id);
-
-                read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
-
-                LASSERT (kptllnd_data.kptl_n_active_peers == 0);
-
-                i = 2;
-                while (kptllnd_data.kptl_npeers != 0) {
-                        i++;
-                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
-                               "Waiting for %d peers to terminate\n",
-                               kptllnd_data.kptl_npeers);
-
-                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
-                                               flags);
-
-                        cfs_pause(cfs_time_seconds(1));
-
-                        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock,
-                                          flags);
-                }
+        /* Can't nuke peers here - they are shared among all NIs */
+        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+        net->net_shutdown = 1;   /* Order with peer creation */
+        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+        i = 2;
+        while (atomic_read(&net->net_refcount) != 0) {
+                i++;
+                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
+                       "Waiting for %d references to drop\n",
+                       atomic_read(&net->net_refcount));
+
+                cfs_pause(cfs_time_seconds(1));
+        }

-                LASSERT(list_empty(&kptllnd_data.kptl_closing_peers));
-                LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers));
-                LASSERT (kptllnd_data.kptl_peers != NULL);
-                for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
-                        LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
-
-                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
-                CDEBUG(D_NET, "All peers deleted\n");
-
-                /* Shutdown phase 2: kill the daemons... */
-                kptllnd_data.kptl_shutdown = 2;
-                mb();
-                
-                i = 2;
-                while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
-                        /* Wake up all threads*/
-                        wake_up_all(&kptllnd_data.kptl_sched_waitq);
-                        wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
-
-                        i++;
-                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
-                               "Waiting for %d threads to terminate\n",
-                               atomic_read(&kptllnd_data.kptl_nthreads));
-                        cfs_pause(cfs_time_seconds(1));
-                }
-
-                CDEBUG(D_NET, "All Threads stopped\n");
-                LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
-
-                kptllnd_cleanup_tx_descs();
-
-                /* Nothing here now, but libcfs might soon require
-                 * us to explicitly destroy wait queues and semaphores
-                 * that would be done here */
-
-                /* fall through */
-
-        case PTLLND_INIT_NOTHING:
-                CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
-                break;
-        }
-
-        if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
-                prc = PtlEQFree(kptllnd_data.kptl_eqh);
-                if (prc != PTL_OK)
-                        CERROR("Error %s(%d) freeing portals EQ\n",
-                               kptllnd_errtype2str(prc), prc);
-        }
-
-        if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
-                prc = PtlNIFini(kptllnd_data.kptl_nih);
-                if (prc != PTL_OK)
-                        CERROR("Error %s(%d) finalizing portals NI\n",
-                               kptllnd_errtype2str(prc), prc);
-        }
-        
-        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
-        LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
-
-        if (kptllnd_data.kptl_rx_cache != NULL)
-                cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
-
-        if (kptllnd_data.kptl_peers != NULL)
-                LIBCFS_FREE (kptllnd_data.kptl_peers,
-                             sizeof (struct list_head) *
-                             kptllnd_data.kptl_peer_hash_size);
-
-        if (kptllnd_data.kptl_nak_msg != NULL)
-                LIBCFS_FREE (kptllnd_data.kptl_nak_msg,
-                             offsetof(kptl_msg_t, ptlm_u));
-
-        memset(&kptllnd_data, 0, sizeof(kptllnd_data));
-
+        LIBCFS_FREE(net, sizeof(*net));
+out:
+        /* NB no locking since I don't race with writers */
+        if (list_empty(&kptllnd_data.kptl_nets))
+                kptllnd_base_shutdown();
         CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
                atomic_read (&libcfs_kmemory));
-
-        PORTAL_MODULE_UNUSE;
+        return;
 }
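
The list_empty() check at the end of kptllnd_shutdown() is the "last NI out turns off the lights" step: per-net teardown always runs, but the shared base state is torn down only when the net registry empties. Reduced to its shape (a counter standing in for the kptl_nets list):

static int nets_registered;             /* stands in for kptl_nets */

static void base_shutdown(void) { /* free shared EQ, NI, caches... */ }

static void net_shutdown(void)
{
        /* ... per-net teardown ... */
        nets_registered--;
        if (nets_registered == 0)       /* NB no lock: no racing writers */
                base_shutdown();
}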
 
 int __init