Whamcloud - gitweb
LU-1617 build: skip generated files in .gitignore
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd.c
old mode 100755 (executable)
new mode 100644 (file)
index 770bfeb..42d82f7
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -26,7 +24,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -62,11 +60,11 @@ kptllnd_ptlid2str(ptl_process_id_t id)
         unsigned long  flags;
         char          *str;
         
-        spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags);
+        cfs_spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags);
         str = strs[idx++];
         if (idx >= sizeof(strs)/sizeof(strs[0]))
                 idx = 0;
-        spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags);
+        cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags);
 
         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
         return str;
@@ -247,10 +245,16 @@ kptllnd_cksum (void *ptr, int nob)
 }
 
 void
-kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob)
+kptllnd_init_msg(kptl_msg_t *msg, int type,
+                 lnet_process_id_t target, int body_nob)
 {
         msg->ptlm_type = type;
         msg->ptlm_nob  = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;
+        msg->ptlm_dstpid = target.pid;
+        msg->ptlm_dstnid = target.nid;
+        msg->ptlm_srcpid = the_lnet.ln_pid;
+        msg->ptlm_srcnid = kptllnd_ptl2lnetnid(target.nid,
+                                               kptllnd_data.kptl_portals_id.nid);
         
         LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
 }
@@ -264,12 +268,9 @@ kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer)
         msg->ptlm_credits  = peer->peer_outstanding_credits;
         /* msg->ptlm_nob   Filled in kptllnd_init_msg()  */
         msg->ptlm_cksum    = 0;
-        msg->ptlm_srcnid   = kptllnd_data.kptl_ni->ni_nid;
+        /* msg->ptlm_{src|dst}[pn]id Filled in kptllnd_init_msg */
         msg->ptlm_srcstamp = peer->peer_myincarnation;
-        msg->ptlm_dstnid   = peer->peer_id.nid;
         msg->ptlm_dststamp = peer->peer_incarnation;
-        msg->ptlm_srcpid   = the_lnet.ln_pid;
-        msg->ptlm_dstpid   = peer->peer_id.pid;
 
         if (*kptllnd_tunables.kptl_checksum) {
                 /* NB ptlm_cksum zero while computing cksum */
@@ -411,6 +412,7 @@ kptllnd_msg_unpack(kptl_msg_t *msg, int nob)
 int
 kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
 {
+        kptl_net_t  *net = ni->ni_data;
         struct libcfs_ioctl_data *data = arg;
         int          rc = -EINVAL;
 
@@ -420,7 +422,7 @@ kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
          * Validate that the context block is actually
          * pointing to this interface
          */
-        LASSERT (ni == kptllnd_data.kptl_ni);
+        LASSERT (ni == net->net_ni);
 
         switch(cmd) {
         case IOC_LIBCFS_DEL_PEER: {
@@ -478,42 +480,168 @@ kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
 }
 
 void
-kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, time_t *when)
+kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, cfs_time_t *when)
 {
+        kptl_net_t        *net = ni->ni_data;
         kptl_peer_t       *peer = NULL;
         lnet_process_id_t  id = {.nid = nid, .pid = LUSTRE_SRV_LNET_PID};
         unsigned long      flags;
 
         /* NB: kptllnd_find_target connects to peer if necessary */
-        if (kptllnd_find_target(&peer, id) != 0)
+        if (kptllnd_find_target(net, id, &peer) != 0)
                 return;
 
-        spin_lock_irqsave(&peer->peer_lock, flags);
+        cfs_spin_lock_irqsave(&peer->peer_lock, flags);
         if (peer->peer_last_alive != 0)
-                *when = cfs_time_current_sec() -
-                        cfs_duration_sec(cfs_time_current() -
-                                         peer->peer_last_alive);
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
+                *when = peer->peer_last_alive;
+        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
         kptllnd_peer_decref(peer);
         return;
 }
 
+void
+kptllnd_base_shutdown (void)
+{
+        int               i;
+        ptl_err_t         prc;
+        unsigned long     flags;
+        lnet_process_id_t process_id;
+
+        cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
+        LASSERT (cfs_list_empty(&kptllnd_data.kptl_nets));
+        cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+        switch (kptllnd_data.kptl_init) {
+        default:
+                LBUG();
+
+        case PTLLND_INIT_ALL:
+        case PTLLND_INIT_DATA:
+                /* stop receiving */
+                kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
+                LASSERT (cfs_list_empty(&kptllnd_data.kptl_sched_rxq));
+                LASSERT (cfs_list_empty(&kptllnd_data.kptl_sched_rxbq));
+
+                /* lock to interleave cleanly with peer birth/death */
+                cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+                LASSERT (kptllnd_data.kptl_shutdown == 0);
+                kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
+                /* no new peers possible now */
+                cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
+                                            flags);
+
+                /* nuke all existing peers */
+                process_id.nid = LNET_NID_ANY;
+                process_id.pid = LNET_PID_ANY;
+                kptllnd_peer_del(process_id);
+
+                cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+                LASSERT (kptllnd_data.kptl_n_active_peers == 0);
+
+                i = 2;
+                while (kptllnd_data.kptl_npeers != 0) {
+                        i++;
+                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
+                               "Waiting for %d peers to terminate\n",
+                               kptllnd_data.kptl_npeers);
+
+                        cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
+                                                   flags);
+
+                        cfs_pause(cfs_time_seconds(1));
+
+                        cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock,
+                                              flags);
+                }
+
+                LASSERT (cfs_list_empty(&kptllnd_data.kptl_closing_peers));
+                LASSERT (cfs_list_empty(&kptllnd_data.kptl_zombie_peers));
+                LASSERT (kptllnd_data.kptl_peers != NULL);
+                for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
+                        LASSERT (cfs_list_empty (&kptllnd_data.kptl_peers[i]));
+
+                cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
+                                           flags);
+                CDEBUG(D_NET, "All peers deleted\n");
+
+                /* Shutdown phase 2: kill the daemons... */
+                kptllnd_data.kptl_shutdown = 2;
+                cfs_mb();
+
+                i = 2;
+                while (cfs_atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
+                        /* Wake up all threads*/
+                        cfs_waitq_broadcast(&kptllnd_data.kptl_sched_waitq);
+                        cfs_waitq_broadcast(&kptllnd_data.kptl_watchdog_waitq);
+
+                        i++;
+                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
+                               "Waiting for %d threads to terminate\n",
+                               cfs_atomic_read(&kptllnd_data.kptl_nthreads));
+                        cfs_pause(cfs_time_seconds(1));
+                }
+
+                CDEBUG(D_NET, "All Threads stopped\n");
+                LASSERT(cfs_list_empty(&kptllnd_data.kptl_sched_txq));
+
+                kptllnd_cleanup_tx_descs();
+
+                /* Nothing here now, but libcfs might soon require
+                 * us to explicitly destroy wait queues and semaphores
+                 * that would be done here */
+
+                /* fall through */
+
+        case PTLLND_INIT_NOTHING:
+                CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
+                break;
+        }
+
+        if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
+                prc = PtlEQFree(kptllnd_data.kptl_eqh);
+                if (prc != PTL_OK)
+                        CERROR("Error %s(%d) freeing portals EQ\n",
+                               kptllnd_errtype2str(prc), prc);
+        }
+
+        if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
+                prc = PtlNIFini(kptllnd_data.kptl_nih);
+                if (prc != PTL_OK)
+                        CERROR("Error %s(%d) finalizing portals NI\n",
+                               kptllnd_errtype2str(prc), prc);
+        }
+
+        LASSERT (cfs_atomic_read(&kptllnd_data.kptl_ntx) == 0);
+        LASSERT (cfs_list_empty(&kptllnd_data.kptl_idle_txs));
+
+        if (kptllnd_data.kptl_rx_cache != NULL)
+                cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
+
+        if (kptllnd_data.kptl_peers != NULL)
+                LIBCFS_FREE(kptllnd_data.kptl_peers,
+                            sizeof (cfs_list_t) *
+                            kptllnd_data.kptl_peer_hash_size);
+
+        if (kptllnd_data.kptl_nak_msg != NULL)
+                LIBCFS_FREE(kptllnd_data.kptl_nak_msg,
+                            offsetof(kptl_msg_t, ptlm_u));
+
+        memset(&kptllnd_data, 0, sizeof(kptllnd_data));
+        PORTAL_MODULE_UNUSE;
+        return;
+}
+
 int
-kptllnd_startup (lnet_ni_t *ni)
+kptllnd_base_startup (void)
 {
-        int             rc;
         int             i;
+        int                rc;
         int             spares;
         struct timeval  tv;
+        lnet_process_id_t  target;
         ptl_err_t       ptl_rc;
 
-        LASSERT (ni->ni_lnd == &kptllnd_lnd);
-
-        if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) {
-                CERROR("Only 1 instance supported\n");
-                return -EPERM;
-        }
-
         if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
                 CERROR("max_procs_per_node must be >= 1\n");
                 return -EINVAL;
@@ -531,55 +659,44 @@ kptllnd_startup (lnet_ni_t *ni)
         CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
         CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
 
-        /*
-         * zero pointers, flags etc
-         * put everything into a known state.
-         */
+        /* Zero pointers, flags etc; put everything into a known state. */
         memset (&kptllnd_data, 0, sizeof (kptllnd_data));
+
+        LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
+        if (kptllnd_data.kptl_nak_msg == NULL) {
+                CERROR("Can't allocate NAK msg\n");
+                return -ENOMEM;
+        }
+        memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
+
         kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
         kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;
 
-        /*
-         * Setup the sched locks/lists/waitq
-         */
-        spin_lock_init(&kptllnd_data.kptl_sched_lock);
-        init_waitqueue_head(&kptllnd_data.kptl_sched_waitq);
-        INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
-        INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
-        INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
+        cfs_rwlock_init(&kptllnd_data.kptl_net_rw_lock);
+        CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_nets);
 
-        /* init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
-        spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
-
-        /*
-         * Setup the tx locks/lists
-         */
-        spin_lock_init(&kptllnd_data.kptl_tx_lock);
-        INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
-        atomic_set(&kptllnd_data.kptl_ntx, 0);
+        /* Setup the sched locks/lists/waitq */
+        cfs_spin_lock_init(&kptllnd_data.kptl_sched_lock);
+        cfs_waitq_init(&kptllnd_data.kptl_sched_waitq);
+        CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
+        CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
+        CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
 
-        /*
-         * Uptick the module reference count
-         */
-        PORTAL_MODULE_USE;
+        /* Init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
+        cfs_spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
 
-        /*
-         * Setup pointers between the ni and context data block
-         */
-        kptllnd_data.kptl_ni = ni;
-        ni->ni_data = &kptllnd_data;
+        /* Setup the tx locks/lists */
+        cfs_spin_lock_init(&kptllnd_data.kptl_tx_lock);
+        CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
+        cfs_atomic_set(&kptllnd_data.kptl_ntx, 0);
 
-        /*
-         * Setup Credits
-         */
-        ni->ni_maxtxcredits   = *kptllnd_tunables.kptl_credits;
-        ni->ni_peertxcredits  = *kptllnd_tunables.kptl_peertxcredits;
-        ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits;
+        /* Uptick the module reference count */
+        PORTAL_MODULE_USE;
 
         kptllnd_data.kptl_expected_peers =
                 *kptllnd_tunables.kptl_max_nodes *
                 *kptllnd_tunables.kptl_max_procs_per_node;
-        
+
         /*
          * Initialize the Network interface instance
          * We use the default because we don't have any
@@ -619,9 +736,7 @@ kptllnd_startup (lnet_ni_t *ni)
                 goto failed;
         }
 
-        /*
-         * Fetch the lower NID
-         */
+        /* Fetch the lower NID */
         ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
                           &kptllnd_data.kptl_portals_id);
         if (ptl_rc != PTL_OK) {
@@ -640,34 +755,33 @@ kptllnd_startup (lnet_ni_t *ni)
                 goto failed;
         }
 
-        ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid);
-
-        CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", 
-               kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
-               libcfs_nid2str(ni->ni_nid));
-
         /* Initialized the incarnation - it must be for-all-time unique, even
          * accounting for the fact that we increment it when we disconnect a
          * peer that's using it */
-        do_gettimeofday(&tv);
+        cfs_gettimeofday(&tv);
         kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) +
                                         tv.tv_usec;
         CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
 
-        /*
-         * Allocate and setup the peer hash table
-         */
-        rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
-        init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq);
-        atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0);
-        INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
-        INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
+        target.nid = LNET_NID_ANY;
+        target.pid = LNET_PID_ANY; /* NB target for NAK doesn't matter */
+        kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, target, 0);
+        kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
+        kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
+        kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
+        kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
 
+        cfs_rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
+        cfs_waitq_init(&kptllnd_data.kptl_watchdog_waitq);
+        CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
+        CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
+
+        /* Allocate and setup the peer hash table */
         kptllnd_data.kptl_peer_hash_size =
                 *kptllnd_tunables.kptl_peer_hash_table_size;
         LIBCFS_ALLOC(kptllnd_data.kptl_peers,
-                     (kptllnd_data.kptl_peer_hash_size * 
-                      sizeof(struct list_head)));
+                     sizeof(cfs_list_t) *
+                     kptllnd_data.kptl_peer_hash_size);
         if (kptllnd_data.kptl_peers == NULL) {
                 CERROR("Failed to allocate space for peer hash table size=%d\n",
                         kptllnd_data.kptl_peer_hash_size);
@@ -675,27 +789,11 @@ kptllnd_startup (lnet_ni_t *ni)
                 goto failed;
         }
         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
-                INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
-
-        LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
-        if (kptllnd_data.kptl_nak_msg == NULL) {
-                CERROR("Can't allocate NAK msg\n");
-                rc = -ENOMEM;
-                goto failed;
-        }
-        memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
-        kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0);
-        kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
-        kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
-        kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
-        kptllnd_data.kptl_nak_msg->ptlm_srcnid   = ni->ni_nid;
-        kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
-        kptllnd_data.kptl_nak_msg->ptlm_dstpid   = LNET_PID_ANY;
-        kptllnd_data.kptl_nak_msg->ptlm_dstnid   = LNET_NID_ANY;
+                CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
 
         kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);
 
-        kptllnd_data.kptl_rx_cache = 
+        kptllnd_data.kptl_rx_cache =
                 cfs_mem_cache_create("ptllnd_rx",
                                      sizeof(kptl_rx_t) + 
                                      *kptllnd_tunables.kptl_max_msg_size,
@@ -721,7 +819,7 @@ kptllnd_startup (lnet_ni_t *ni)
         
         /* Start the scheduler threads for handling incoming requests.  No need
          * to advance the state because this will be automatically cleaned up
-         * now that PTLNAT_INIT_DATA state has been entered */
+         * now that PTLLND_INIT_DATA state has been entered */
         CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
         for (i = 0; i < PTLLND_N_SCHED; i++) {
                 rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
@@ -759,152 +857,114 @@ kptllnd_startup (lnet_ni_t *ni)
 
         if (*kptllnd_tunables.kptl_checksum)
                 CWARN("Checksumming enabled\n");
-        
-        CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n");
+
+        CDEBUG(D_NET, "<<< kptllnd_base_startup SUCCESS\n");
         return 0;
 
  failed:
-        CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc);
-        kptllnd_shutdown(ni);
+        CERROR("kptllnd_base_startup failed: %d\n", rc);
+        kptllnd_base_shutdown();
         return rc;
 }
 
-void
-kptllnd_shutdown (lnet_ni_t *ni)
+int
+kptllnd_startup (lnet_ni_t *ni)
 {
-        int               i;
-        ptl_err_t         prc;
-        lnet_process_id_t process_id;
-        unsigned long     flags;
-
-        CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
-               atomic_read (&libcfs_kmemory));
+        int         rc;
+        kptl_net_t *net;
 
-        LASSERT (ni == kptllnd_data.kptl_ni);
+        LASSERT (ni->ni_lnd == &kptllnd_lnd);
 
-        switch (kptllnd_data.kptl_init) {
-        default:
-                LBUG();
+        if (kptllnd_data.kptl_init == PTLLND_INIT_NOTHING) {
+                rc = kptllnd_base_startup();
+                if (rc != 0)
+                        return rc;
+        }
 
-        case PTLLND_INIT_ALL:
-        case PTLLND_INIT_DATA:
-                /* Stop receiving */
-                kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
-                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
-                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
+        LIBCFS_ALLOC(net, sizeof(*net));
+        ni->ni_data = net;
+        if (net == NULL) {
+                CERROR("Can't allocate kptl_net_t\n");
+                rc = -ENOMEM;
+                goto failed;
+        }
+        memset(net, 0, sizeof(*net));
+        net->net_ni = ni;
 
-                /* Hold peertable lock to interleave cleanly with peer birth/death */
-                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+        ni->ni_maxtxcredits   = *kptllnd_tunables.kptl_credits;
+        ni->ni_peertxcredits  = *kptllnd_tunables.kptl_peertxcredits;
+        ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits;
+        ni->ni_nid = kptllnd_ptl2lnetnid(ni->ni_nid,
+                                         kptllnd_data.kptl_portals_id.nid);
+        CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n",
+               kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
+               libcfs_nid2str(ni->ni_nid));
 
-                LASSERT (kptllnd_data.kptl_shutdown == 0);
-                kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
+        /* NB LNET_NIDNET(ptlm_srcnid) of NAK doesn't matter in case of
+         * multiple NIs */
+        kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid;
 
-                /* no new peers possible now */
-                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
-                                        flags);
+        cfs_atomic_set(&net->net_refcount, 1);
+        cfs_write_lock(&kptllnd_data.kptl_net_rw_lock);
+        cfs_list_add_tail(&net->net_list, &kptllnd_data.kptl_nets);
+        cfs_write_unlock(&kptllnd_data.kptl_net_rw_lock);
+        return 0;
 
-                /* nuke all existing peers */
-                process_id.nid = LNET_NID_ANY;
-                process_id.pid = LNET_PID_ANY;
-                kptllnd_peer_del(process_id);
+ failed:
+        kptllnd_shutdown(ni);
+        return rc;
+}
 
-                read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+void
+kptllnd_shutdown (lnet_ni_t *ni)
+{
+        kptl_net_t    *net = ni->ni_data;
+        int               i;
+        unsigned long     flags;
 
-                LASSERT (kptllnd_data.kptl_n_active_peers == 0);
+        LASSERT (kptllnd_data.kptl_init == PTLLND_INIT_ALL);
 
-                i = 2;
-                while (kptllnd_data.kptl_npeers != 0) {
+        CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
+               cfs_atomic_read (&libcfs_kmemory));
+
+        if (net == NULL)
+                goto out;
+
+        LASSERT (ni == net->net_ni);
+        LASSERT (!net->net_shutdown);
+        LASSERT (!cfs_list_empty(&net->net_list));
+        LASSERT (cfs_atomic_read(&net->net_refcount) != 0);
+        ni->ni_data = NULL;
+        net->net_ni = NULL;
+
+        cfs_write_lock(&kptllnd_data.kptl_net_rw_lock);
+        kptllnd_net_decref(net);
+        cfs_list_del_init(&net->net_list);
+        cfs_write_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+        /* Can't nuke peers here - they are shared among all NIs */
+        cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+        net->net_shutdown = 1;   /* Order with peer creation */
+        cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+        i = 2;
+        while (cfs_atomic_read(&net->net_refcount) != 0) {
                         i++;
                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
-                               "Waiting for %d peers to terminate\n",
-                               kptllnd_data.kptl_npeers);
-
-                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
-                                               flags);
-
-                        cfs_pause(cfs_time_seconds(1));
-
-                        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, 
-                                          flags);
-                }
-
-                LASSERT(list_empty(&kptllnd_data.kptl_closing_peers));
-                LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers));
-                LASSERT (kptllnd_data.kptl_peers != NULL);
-                for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
-                        LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
-
-                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
-                CDEBUG(D_NET, "All peers deleted\n");
-
-                /* Shutdown phase 2: kill the daemons... */
-                kptllnd_data.kptl_shutdown = 2;
-                mb();
-                
-                i = 2;
-                while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
-                        /* Wake up all threads*/
-                        wake_up_all(&kptllnd_data.kptl_sched_waitq);
-                        wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
+                       "Waiting for %d references to drop\n",
+                       cfs_atomic_read(&net->net_refcount));
 
-                        i++;
-                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
-                               "Waiting for %d threads to terminate\n",
-                               atomic_read(&kptllnd_data.kptl_nthreads));
-                        cfs_pause(cfs_time_seconds(1));
+                       cfs_pause(cfs_time_seconds(1));
                 }
 
-                CDEBUG(D_NET, "All Threads stopped\n");
-                LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
-
-                kptllnd_cleanup_tx_descs();
-
-                /* Nothing here now, but libcfs might soon require
-                 * us to explicitly destroy wait queues and semaphores
-                 * that would be done here */
-
-                /* fall through */
-
-        case PTLLND_INIT_NOTHING:
-                CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
-                break;
-        }
-
-        if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
-                prc = PtlEQFree(kptllnd_data.kptl_eqh);
-                if (prc != PTL_OK)
-                        CERROR("Error %s(%d) freeing portals EQ\n",
-                               kptllnd_errtype2str(prc), prc);
-        }
-
-        if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
-                prc = PtlNIFini(kptllnd_data.kptl_nih);
-                if (prc != PTL_OK)
-                        CERROR("Error %s(%d) finalizing portals NI\n",
-                               kptllnd_errtype2str(prc), prc);
-        }
-        
-        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
-        LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
-
-        if (kptllnd_data.kptl_rx_cache != NULL)
-                cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
-
-        if (kptllnd_data.kptl_peers != NULL)
-                LIBCFS_FREE (kptllnd_data.kptl_peers,
-                             sizeof (struct list_head) *
-                             kptllnd_data.kptl_peer_hash_size);
-
-        if (kptllnd_data.kptl_nak_msg != NULL)
-                LIBCFS_FREE (kptllnd_data.kptl_nak_msg,
-                             offsetof(kptl_msg_t, ptlm_u));
-
-        memset(&kptllnd_data, 0, sizeof(kptllnd_data));
-
+        LIBCFS_FREE(net, sizeof(*net));
+out:
+        /* NB no locking since I don't race with writers */
+        if (cfs_list_empty(&kptllnd_data.kptl_nets))
+                kptllnd_base_shutdown();
         CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
-               atomic_read (&libcfs_kmemory));
-
-        PORTAL_MODULE_UNUSE;
+               cfs_atomic_read (&libcfs_kmemory));
+        return;
 }
 
 int __init