Whamcloud - gitweb
i=liang,b=21103,b=19735:
authorisaac <isaac>
Tue, 1 Dec 2009 13:53:55 +0000 (13:53 +0000)
committerisaac <isaac>
Tue, 1 Dec 2009 13:53:55 +0000 (13:53 +0000)
- multiple-instance support for kptllnd.

lnet/ChangeLog
lnet/include/lnet/ptllnd.h
lnet/klnds/ptllnd/ptllnd.c
lnet/klnds/ptllnd/ptllnd.h
lnet/klnds/ptllnd/ptllnd_cb.c
lnet/klnds/ptllnd/ptllnd_peer.c
lnet/klnds/ptllnd/ptllnd_rx_buf.c
lnet/klnds/ptllnd/ptllnd_tx.c

index 0f430a1..84f18b2 100644 (file)
@@ -17,6 +17,10 @@ Bugzilla   :
 Description: 
 Details    : 
 
+Severity   : enhancement
+Bugzilla   : 19735
+Description: multiple-instance support for kptllnd
+
 Severity   : normal
 Bugzilla   : 20897
 Description: ksocknal_close_conn_locked connection race
index 33fcea1..509625e 100755 (executable)
@@ -75,7 +75,7 @@
 
 /*
  * The Cray Portals has no maximum number of IOVs.  The
- * maximum is limited only my memory and size of the
+ * maximum is limited only by memory and size of the
  * int parameters (2^31-1).
  * Lustre only really require that the underyling
  * implemenation to support at least LNET_MAX_IOV,
index 770bfeb..6562561 100755 (executable)
@@ -247,10 +247,16 @@ kptllnd_cksum (void *ptr, int nob)
 }
 
 void
-kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob)
+kptllnd_init_msg(kptl_msg_t *msg, int type,
+                 lnet_process_id_t target, int body_nob)
 {
         msg->ptlm_type = type;
         msg->ptlm_nob  = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;
+        msg->ptlm_dstpid = target.pid;
+        msg->ptlm_dstnid = target.nid;
+        msg->ptlm_srcpid = the_lnet.ln_pid;
+        msg->ptlm_srcnid = kptllnd_ptl2lnetnid(target.nid,
+                                               kptllnd_data.kptl_portals_id.nid);
         
         LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
 }
@@ -264,12 +270,9 @@ kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer)
         msg->ptlm_credits  = peer->peer_outstanding_credits;
         /* msg->ptlm_nob   Filled in kptllnd_init_msg()  */
         msg->ptlm_cksum    = 0;
-        msg->ptlm_srcnid   = kptllnd_data.kptl_ni->ni_nid;
+        /* msg->ptlm_{src|dst}[pn]id Filled in kptllnd_init_msg */
         msg->ptlm_srcstamp = peer->peer_myincarnation;
-        msg->ptlm_dstnid   = peer->peer_id.nid;
         msg->ptlm_dststamp = peer->peer_incarnation;
-        msg->ptlm_srcpid   = the_lnet.ln_pid;
-        msg->ptlm_dstpid   = peer->peer_id.pid;
 
         if (*kptllnd_tunables.kptl_checksum) {
                 /* NB ptlm_cksum zero while computing cksum */
@@ -411,6 +414,7 @@ kptllnd_msg_unpack(kptl_msg_t *msg, int nob)
 int
 kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
 {
+        kptl_net_t  *net = ni->ni_data;
         struct libcfs_ioctl_data *data = arg;
         int          rc = -EINVAL;
 
@@ -420,7 +424,7 @@ kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
          * Validate that the context block is actually
          * pointing to this interface
          */
-        LASSERT (ni == kptllnd_data.kptl_ni);
+        LASSERT (ni == net->net_ni);
 
         switch(cmd) {
         case IOC_LIBCFS_DEL_PEER: {
@@ -480,12 +484,13 @@ kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
 void
 kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, time_t *when)
 {
+        kptl_net_t        *net = ni->ni_data;
         kptl_peer_t       *peer = NULL;
         lnet_process_id_t  id = {.nid = nid, .pid = LUSTRE_SRV_LNET_PID};
         unsigned long      flags;
 
         /* NB: kptllnd_find_target connects to peer if necessary */
-        if (kptllnd_find_target(&peer, id) != 0)
+        if (kptllnd_find_target(net, id, &peer) != 0)
                 return;
 
         spin_lock_irqsave(&peer->peer_lock, flags);
@@ -498,22 +503,147 @@ kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, time_t *when)
         return;
 }
 
+void
+kptllnd_base_shutdown (void)
+{
+        int               i;
+        ptl_err_t         prc;
+        unsigned long     flags;
+        lnet_process_id_t process_id;
+
+        read_lock(&kptllnd_data.kptl_net_rw_lock);
+        LASSERT (list_empty(&kptllnd_data.kptl_nets));
+        read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+        switch (kptllnd_data.kptl_init) {
+        default:
+                LBUG();
+
+        case PTLLND_INIT_ALL:
+        case PTLLND_INIT_DATA:
+                /* stop receiving */
+                kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
+                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
+                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
+
+                /* lock to interleave cleanly with peer birth/death */
+                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+                LASSERT (kptllnd_data.kptl_shutdown == 0);
+                kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
+                /* no new peers possible now */
+                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+                /* nuke all existing peers */
+                process_id.nid = LNET_NID_ANY;
+                process_id.pid = LNET_PID_ANY;
+                kptllnd_peer_del(process_id);
+
+                read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+                LASSERT (kptllnd_data.kptl_n_active_peers == 0);
+
+                i = 2;
+                while (kptllnd_data.kptl_npeers != 0) {
+                        i++;
+                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
+                               "Waiting for %d peers to terminate\n",
+                               kptllnd_data.kptl_npeers);
+
+                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
+                                               flags);
+
+                        cfs_pause(cfs_time_seconds(1));
+
+                        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock,
+                                          flags);
+                }
+
+                LASSERT (list_empty(&kptllnd_data.kptl_closing_peers));
+                LASSERT (list_empty(&kptllnd_data.kptl_zombie_peers));
+                LASSERT (kptllnd_data.kptl_peers != NULL);
+                for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
+                        LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
+
+                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+                CDEBUG(D_NET, "All peers deleted\n");
+
+                /* Shutdown phase 2: kill the daemons... */
+                kptllnd_data.kptl_shutdown = 2;
+                mb();
+
+                i = 2;
+                while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
+                        /* Wake up all threads*/
+                        wake_up_all(&kptllnd_data.kptl_sched_waitq);
+                        wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
+
+                        i++;
+                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
+                               "Waiting for %d threads to terminate\n",
+                               atomic_read(&kptllnd_data.kptl_nthreads));
+                        cfs_pause(cfs_time_seconds(1));
+                }
+
+                CDEBUG(D_NET, "All Threads stopped\n");
+                LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
+
+                kptllnd_cleanup_tx_descs();
+
+                /* Nothing here now, but libcfs might soon require
+                 * us to explicitly destroy wait queues and semaphores
+                 * that would be done here */
+
+                /* fall through */
+
+        case PTLLND_INIT_NOTHING:
+                CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
+                break;
+        }
+
+        if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
+                prc = PtlEQFree(kptllnd_data.kptl_eqh);
+                if (prc != PTL_OK)
+                        CERROR("Error %s(%d) freeing portals EQ\n",
+                               kptllnd_errtype2str(prc), prc);
+        }
+
+        if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
+                prc = PtlNIFini(kptllnd_data.kptl_nih);
+                if (prc != PTL_OK)
+                        CERROR("Error %s(%d) finalizing portals NI\n",
+                               kptllnd_errtype2str(prc), prc);
+        }
+
+        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
+        LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
+
+        if (kptllnd_data.kptl_rx_cache != NULL)
+                cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
+
+        if (kptllnd_data.kptl_peers != NULL)
+                LIBCFS_FREE(kptllnd_data.kptl_peers,
+                            sizeof (struct list_head) *
+                            kptllnd_data.kptl_peer_hash_size);
+
+        if (kptllnd_data.kptl_nak_msg != NULL)
+                LIBCFS_FREE(kptllnd_data.kptl_nak_msg,
+                            offsetof(kptl_msg_t, ptlm_u));
+
+        memset(&kptllnd_data, 0, sizeof(kptllnd_data));
+        PORTAL_MODULE_UNUSE;
+        return;
+}
+
 int
-kptllnd_startup (lnet_ni_t *ni)
+kptllnd_base_startup (void)
 {
-        int             rc;
         int             i;
+        int                rc;
         int             spares;
         struct timeval  tv;
+        lnet_process_id_t  target;
         ptl_err_t       ptl_rc;
 
-        LASSERT (ni->ni_lnd == &kptllnd_lnd);
-
-        if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) {
-                CERROR("Only 1 instance supported\n");
-                return -EPERM;
-        }
-
         if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
                 CERROR("max_procs_per_node must be >= 1\n");
                 return -EINVAL;
@@ -531,51 +661,40 @@ kptllnd_startup (lnet_ni_t *ni)
         CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
         CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
 
-        /*
-         * zero pointers, flags etc
-         * put everything into a known state.
-         */
+        /* Zero pointers, flags etc; put everything into a known state. */
         memset (&kptllnd_data, 0, sizeof (kptllnd_data));
+
+        LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
+        if (kptllnd_data.kptl_nak_msg == NULL) {
+                CERROR("Can't allocate NAK msg\n");
+                return -ENOMEM;
+        }
+        memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
+
         kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
         kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;
 
-        /*
-         * Setup the sched locks/lists/waitq
-         */
+        rwlock_init(&kptllnd_data.kptl_net_rw_lock);
+        INIT_LIST_HEAD(&kptllnd_data.kptl_nets);
+
+        /* Setup the sched locks/lists/waitq */
         spin_lock_init(&kptllnd_data.kptl_sched_lock);
         init_waitqueue_head(&kptllnd_data.kptl_sched_waitq);
         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
         INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
 
-        /* init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
+        /* Init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
         spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
 
-        /*
-         * Setup the tx locks/lists
-         */
+        /* Setup the tx locks/lists */
         spin_lock_init(&kptllnd_data.kptl_tx_lock);
         INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
         atomic_set(&kptllnd_data.kptl_ntx, 0);
 
-        /*
-         * Uptick the module reference count
-         */
+        /* Uptick the module reference count */
         PORTAL_MODULE_USE;
 
-        /*
-         * Setup pointers between the ni and context data block
-         */
-        kptllnd_data.kptl_ni = ni;
-        ni->ni_data = &kptllnd_data;
-
-        /*
-         * Setup Credits
-         */
-        ni->ni_maxtxcredits   = *kptllnd_tunables.kptl_credits;
-        ni->ni_peertxcredits  = *kptllnd_tunables.kptl_peertxcredits;
-        ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits;
-
         kptllnd_data.kptl_expected_peers =
                 *kptllnd_tunables.kptl_max_nodes *
                 *kptllnd_tunables.kptl_max_procs_per_node;
@@ -619,9 +738,7 @@ kptllnd_startup (lnet_ni_t *ni)
                 goto failed;
         }
 
-        /*
-         * Fetch the lower NID
-         */
+        /* Fetch the lower NID */
         ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
                           &kptllnd_data.kptl_portals_id);
         if (ptl_rc != PTL_OK) {
@@ -640,12 +757,6 @@ kptllnd_startup (lnet_ni_t *ni)
                 goto failed;
         }
 
-        ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid);
-
-        CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", 
-               kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
-               libcfs_nid2str(ni->ni_nid));
-
         /* Initialized the incarnation - it must be for-all-time unique, even
          * accounting for the fact that we increment it when we disconnect a
          * peer that's using it */
@@ -654,20 +765,26 @@ kptllnd_startup (lnet_ni_t *ni)
                                         tv.tv_usec;
         CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
 
-        /*
-         * Allocate and setup the peer hash table
-         */
+        target.nid = LNET_NID_ANY;
+        target.pid = LNET_PID_ANY; /* NB target for NAK doesn't matter */
+        kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, target, 0);
+        kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
+        kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
+        kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
+        kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
+
         rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
         init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq);
         atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0);
         INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
         INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
 
+        /* Allocate and setup the peer hash table */
         kptllnd_data.kptl_peer_hash_size =
                 *kptllnd_tunables.kptl_peer_hash_table_size;
         LIBCFS_ALLOC(kptllnd_data.kptl_peers,
-                     (kptllnd_data.kptl_peer_hash_size * 
-                      sizeof(struct list_head)));
+                     sizeof(struct list_head) *
+                     kptllnd_data.kptl_peer_hash_size);
         if (kptllnd_data.kptl_peers == NULL) {
                 CERROR("Failed to allocate space for peer hash table size=%d\n",
                         kptllnd_data.kptl_peer_hash_size);
@@ -677,22 +794,6 @@ kptllnd_startup (lnet_ni_t *ni)
         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
                 INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
 
-        LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
-        if (kptllnd_data.kptl_nak_msg == NULL) {
-                CERROR("Can't allocate NAK msg\n");
-                rc = -ENOMEM;
-                goto failed;
-        }
-        memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
-        kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0);
-        kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;
-        kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;
-        kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;
-        kptllnd_data.kptl_nak_msg->ptlm_srcnid   = ni->ni_nid;
-        kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
-        kptllnd_data.kptl_nak_msg->ptlm_dstpid   = LNET_PID_ANY;
-        kptllnd_data.kptl_nak_msg->ptlm_dstnid   = LNET_NID_ANY;
-
         kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);
 
         kptllnd_data.kptl_rx_cache = 
@@ -721,7 +822,7 @@ kptllnd_startup (lnet_ni_t *ni)
         
         /* Start the scheduler threads for handling incoming requests.  No need
          * to advance the state because this will be automatically cleaned up
-         * now that PTLNAT_INIT_DATA state has been entered */
+         * now that PTLLND_INIT_DATA state has been entered */
         CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
         for (i = 0; i < PTLLND_N_SCHED; i++) {
                 rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
@@ -760,11 +861,59 @@ kptllnd_startup (lnet_ni_t *ni)
         if (*kptllnd_tunables.kptl_checksum)
                 CWARN("Checksumming enabled\n");
         
-        CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n");
+        CDEBUG(D_NET, "<<< kptllnd_base_startup SUCCESS\n");
+        return 0;
+
+ failed:
+        CERROR("kptllnd_base_startup failed: %d\n", rc);
+        kptllnd_base_shutdown();
+        return rc;
+}
+
+int
+kptllnd_startup (lnet_ni_t *ni)
+{
+        int         rc;
+        kptl_net_t *net;
+
+        LASSERT (ni->ni_lnd == &kptllnd_lnd);
+
+        if (kptllnd_data.kptl_init == PTLLND_INIT_NOTHING) {
+                rc = kptllnd_base_startup();
+                if (rc != 0)
+                        return rc;
+        }
+
+        LIBCFS_ALLOC(net, sizeof(*net));
+        ni->ni_data = net;
+        if (net == NULL) {
+                CERROR("Can't allocate kptl_net_t\n");
+                rc = -ENOMEM;
+                goto failed;
+        }
+        memset(net, 0, sizeof(*net));
+        net->net_ni = ni;
+
+        ni->ni_maxtxcredits   = *kptllnd_tunables.kptl_credits;
+        ni->ni_peertxcredits  = *kptllnd_tunables.kptl_peertxcredits;
+        ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits;
+        ni->ni_nid = kptllnd_ptl2lnetnid(ni->ni_nid,
+                                         kptllnd_data.kptl_portals_id.nid);
+        CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n",
+               kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
+               libcfs_nid2str(ni->ni_nid));
+
+        /* NB LNET_NIDNET(ptlm_srcnid) of NAK doesn't matter in case of
+         * multiple NIs */
+        kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid;
+
+        atomic_set(&net->net_refcount, 1);
+        write_lock(&kptllnd_data.kptl_net_rw_lock);
+        list_add_tail(&net->net_list, &kptllnd_data.kptl_nets);
+        write_unlock(&kptllnd_data.kptl_net_rw_lock);
         return 0;
 
  failed:
-        CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc);
         kptllnd_shutdown(ni);
         return rc;
 }
@@ -772,139 +921,53 @@ kptllnd_startup (lnet_ni_t *ni)
 void
 kptllnd_shutdown (lnet_ni_t *ni)
 {
+        kptl_net_t    *net = ni->ni_data;
         int               i;
-        ptl_err_t         prc;
-        lnet_process_id_t process_id;
         unsigned long     flags;
 
+        LASSERT (kptllnd_data.kptl_init == PTLLND_INIT_ALL);
+
         CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
                atomic_read (&libcfs_kmemory));
 
-        LASSERT (ni == kptllnd_data.kptl_ni);
+        if (net == NULL)
+                goto out;
 
-        switch (kptllnd_data.kptl_init) {
-        default:
-                LBUG();
+        LASSERT (ni == net->net_ni);
+        LASSERT (!net->net_shutdown);
+        LASSERT (!list_empty(&net->net_list));
+        LASSERT (atomic_read(&net->net_refcount) != 0);
+        ni->ni_data = NULL;
+        net->net_ni = NULL;
 
-        case PTLLND_INIT_ALL:
-        case PTLLND_INIT_DATA:
-                /* Stop receiving */
-                kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
-                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
-                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
+        write_lock(&kptllnd_data.kptl_net_rw_lock);
+        kptllnd_net_decref(net);
+        list_del_init(&net->net_list);
+        write_unlock(&kptllnd_data.kptl_net_rw_lock);
 
-                /* Hold peertable lock to interleave cleanly with peer birth/death */
+        /* Can't nuke peers here - they are shared among all NIs */
                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
-
-                LASSERT (kptllnd_data.kptl_shutdown == 0);
-                kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
-
-                /* no new peers possible now */
-                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
-                                        flags);
-
-                /* nuke all existing peers */
-                process_id.nid = LNET_NID_ANY;
-                process_id.pid = LNET_PID_ANY;
-                kptllnd_peer_del(process_id);
-
-                read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
-
-                LASSERT (kptllnd_data.kptl_n_active_peers == 0);
+        net->net_shutdown = 1;   /* Order with peer creation */
+        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
 
                 i = 2;
-                while (kptllnd_data.kptl_npeers != 0) {
+        while (atomic_read(&net->net_refcount) != 0) {
                         i++;
                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
-                               "Waiting for %d peers to terminate\n",
-                               kptllnd_data.kptl_npeers);
-
-                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
-                                               flags);
+                       "Waiting for %d references to drop\n",
+                       atomic_read(&net->net_refcount));
 
                         cfs_pause(cfs_time_seconds(1));
-
-                        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, 
-                                          flags);
                 }
 
-                LASSERT(list_empty(&kptllnd_data.kptl_closing_peers));
-                LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers));
-                LASSERT (kptllnd_data.kptl_peers != NULL);
-                for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
-                        LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
-
-                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
-                CDEBUG(D_NET, "All peers deleted\n");
-
-                /* Shutdown phase 2: kill the daemons... */
-                kptllnd_data.kptl_shutdown = 2;
-                mb();
-                
-                i = 2;
-                while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
-                        /* Wake up all threads*/
-                        wake_up_all(&kptllnd_data.kptl_sched_waitq);
-                        wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
-
-                        i++;
-                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
-                               "Waiting for %d threads to terminate\n",
-                               atomic_read(&kptllnd_data.kptl_nthreads));
-                        cfs_pause(cfs_time_seconds(1));
-                }
-
-                CDEBUG(D_NET, "All Threads stopped\n");
-                LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
-
-                kptllnd_cleanup_tx_descs();
-
-                /* Nothing here now, but libcfs might soon require
-                 * us to explicitly destroy wait queues and semaphores
-                 * that would be done here */
-
-                /* fall through */
-
-        case PTLLND_INIT_NOTHING:
-                CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
-                break;
-        }
-
-        if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
-                prc = PtlEQFree(kptllnd_data.kptl_eqh);
-                if (prc != PTL_OK)
-                        CERROR("Error %s(%d) freeing portals EQ\n",
-                               kptllnd_errtype2str(prc), prc);
-        }
-
-        if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
-                prc = PtlNIFini(kptllnd_data.kptl_nih);
-                if (prc != PTL_OK)
-                        CERROR("Error %s(%d) finalizing portals NI\n",
-                               kptllnd_errtype2str(prc), prc);
-        }
-        
-        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
-        LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
-
-        if (kptllnd_data.kptl_rx_cache != NULL)
-                cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
-
-        if (kptllnd_data.kptl_peers != NULL)
-                LIBCFS_FREE (kptllnd_data.kptl_peers,
-                             sizeof (struct list_head) *
-                             kptllnd_data.kptl_peer_hash_size);
-
-        if (kptllnd_data.kptl_nak_msg != NULL)
-                LIBCFS_FREE (kptllnd_data.kptl_nak_msg,
-                             offsetof(kptl_msg_t, ptlm_u));
-
-        memset(&kptllnd_data, 0, sizeof(kptllnd_data));
-
+        LIBCFS_FREE(net, sizeof(*net));
+out:
+        /* NB no locking since I don't race with writers */
+        if (list_empty(&kptllnd_data.kptl_nets))
+                kptllnd_base_shutdown();
         CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
                atomic_read (&libcfs_kmemory));
-
-        PORTAL_MODULE_UNUSE;
+        return;
 }
 
 int __init
index 5f817cb..e747812 100755 (executable)
@@ -135,6 +135,7 @@ typedef struct
 /***********************************************************************/
 
 typedef struct kptl_data kptl_data_t;
+typedef struct kptl_net kptl_net_t;
 typedef struct kptl_rx_buffer kptl_rx_buffer_t;
 typedef struct kptl_peer kptl_peer_t;
 
@@ -243,7 +244,7 @@ enum kptllnd_peer_state
 struct kptl_peer
 {
         struct list_head        peer_list;
-        atomic_t                peer_refcount;          /* The current refrences */
+        atomic_t                peer_refcount;          /* The current references */
         enum kptllnd_peer_state peer_state;
         spinlock_t              peer_lock;              /* serialize */
         struct list_head        peer_noops;             /* PTLLND_MSG_TYPE_NOOP txs */
@@ -271,12 +272,14 @@ struct kptl_data
         int                     kptl_init;             /* initialisation state */
         volatile int            kptl_shutdown;         /* shut down? */
         atomic_t                kptl_nthreads;         /* # live threads */
-        lnet_ni_t              *kptl_ni;               /* _the_ LND instance */
         ptl_handle_ni_t         kptl_nih;              /* network inteface handle */
         ptl_process_id_t        kptl_portals_id;       /* Portals ID of interface */
         __u64                   kptl_incarnation;      /* which one am I */
         ptl_handle_eq_t         kptl_eqh;              /* Event Queue (EQ) */
 
+        rwlock_t                kptl_net_rw_lock;      /* serialise... */
+        struct list_head        kptl_nets;             /* kptl_net instances */
+
         spinlock_t              kptl_sched_lock;       /* serialise... */
         wait_queue_head_t       kptl_sched_waitq;      /* schedulers sleep here */
         struct list_head        kptl_sched_txq;        /* tx requiring attention */
@@ -306,6 +309,14 @@ struct kptl_data
         spinlock_t              kptl_ptlid2str_lock;   /* serialise str ops */
 };
 
+struct kptl_net
+{
+        struct list_head  net_list;      /* chain on kptl_data:: kptl_nets */
+        lnet_ni_t        *net_ni;
+        atomic_t          net_refcount;  /* # current references */
+        int               net_shutdown;  /* lnd_shutdown called */
+};
+
 enum 
 {
         PTLLND_INIT_NOTHING = 0,
@@ -317,14 +328,12 @@ extern kptl_tunables_t  kptllnd_tunables;
 extern kptl_data_t      kptllnd_data;
 
 static inline lnet_nid_t 
-kptllnd_ptl2lnetnid(ptl_nid_t ptl_nid)
+kptllnd_ptl2lnetnid(lnet_nid_t ni_nid, ptl_nid_t ptl_nid)
 {
 #ifdef _USING_LUSTRE_PORTALS_
-        return LNET_MKNID(LNET_NIDNET(kptllnd_data.kptl_ni->ni_nid), 
-                          LNET_NIDADDR(ptl_nid));
+        return LNET_MKNID(LNET_NIDNET(ni_nid), LNET_NIDADDR(ptl_nid));
 #else
-       return LNET_MKNID(LNET_NIDNET(kptllnd_data.kptl_ni->ni_nid), 
-                          ptl_nid);
+        return LNET_MKNID(LNET_NIDNET(ni_nid), ptl_nid);
 #endif
 }
 
@@ -466,8 +475,10 @@ int  kptllnd_peer_connect(kptl_tx_t *tx, lnet_nid_t nid);
 void kptllnd_peer_check_sends(kptl_peer_t *peer);
 void kptllnd_peer_check_bucket(int idx, int stamp);
 void kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag);
-int  kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target);
-kptl_peer_t *kptllnd_peer_handle_hello(ptl_process_id_t initiator,
+int  kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
+                         kptl_peer_t **peerp);
+kptl_peer_t *kptllnd_peer_handle_hello(kptl_net_t *net,
+                                       ptl_process_id_t initiator,
                                        kptl_msg_t *msg);
 kptl_peer_t *kptllnd_id2peer_locked(lnet_process_id_t id);
 void kptllnd_peer_alive(kptl_peer_t *peer);
@@ -486,6 +497,20 @@ kptllnd_peer_decref (kptl_peer_t *peer)
 }
 
 static inline void
+kptllnd_net_addref (kptl_net_t *net)
+{
+        LASSERT (atomic_read(&net->net_refcount) > 0);
+        atomic_inc(&net->net_refcount);
+}
+
+static inline void
+kptllnd_net_decref (kptl_net_t *net)
+{
+        LASSERT (atomic_read(&net->net_refcount) > 0);
+        atomic_dec(&net->net_refcount);
+}
+
+static inline void
 kptllnd_set_tx_peer(kptl_tx_t *tx, kptl_peer_t *peer) 
 {
         LASSERT (tx->tx_peer == NULL);
@@ -497,7 +522,9 @@ kptllnd_set_tx_peer(kptl_tx_t *tx, kptl_peer_t *peer)
 static inline struct list_head *
 kptllnd_nid2peerlist(lnet_nid_t nid)
 {
-        unsigned int hash = ((unsigned int)nid) %
+        /* Only one copy of peer state for all logical peers, so the net part
+         * of NIDs is ignored; e.g. A@ptl0 and A@ptl2 share peer state */
+        unsigned int hash = ((unsigned int)LNET_NIDADDR(nid)) %
                             kptllnd_data.kptl_peer_hash_size;
 
         return &kptllnd_data.kptl_peers[hash];
@@ -543,7 +570,7 @@ int  kptllnd_setup_tx_descs(void);
 void kptllnd_cleanup_tx_descs(void);
 void kptllnd_tx_fini(kptl_tx_t *tx);
 void kptllnd_cancel_txlist(struct list_head *peerq, struct list_head *txs);
-void kptllnd_restart_txs(lnet_process_id_t target, struct list_head *restarts);
+void kptllnd_restart_txs(kptl_net_t *net, lnet_process_id_t id, struct list_head *restarts);
 kptl_tx_t *kptllnd_get_idle_tx(enum kptl_tx_type purpose);
 void kptllnd_tx_callback(ptl_event_t *ev);
 const char *kptllnd_tx_typestr(int type);
@@ -566,7 +593,8 @@ kptllnd_tx_decref(kptl_tx_t *tx)
 /*
  * MESSAGE SUPPORT FUNCTIONS
  */
-void kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob);
+void kptllnd_init_msg(kptl_msg_t *msg, int type,
+                      lnet_process_id_t target, int body_nob);
 void kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer);
 int  kptllnd_msg_unpack(kptl_msg_t *msg, int nob);
 
index 8acf9d0..8a0d67a 100644 (file)
@@ -320,22 +320,27 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
         unsigned int      payload_offset = lntmsg->msg_offset;
         unsigned int      payload_nob = lntmsg->msg_len;
+        kptl_net_t       *net = ni->ni_data;
         kptl_peer_t      *peer;
         kptl_tx_t        *tx;
         int               nob;
         int               nfrag;
         int               rc;
 
+        LASSERT (net->net_ni == ni);
+        LASSERT (!net->net_shutdown);
         LASSERT (payload_nob == 0 || payload_niov > 0);
         LASSERT (payload_niov <= LNET_MAX_IOV);
         LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
         LASSERT (!in_interrupt());
 
-        rc = kptllnd_find_target(&peer, target);
+        rc = kptllnd_find_target(net, target, &peer);
         if (rc != 0)
                 return rc;
         
+        /* NB peer->peer_id does NOT always equal target, be careful with
+         * which one to use */
         switch (type) {
         default:
                 LBUG();
@@ -365,7 +370,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 tx->tx_lnet_msg = lntmsg;
                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
-                                  sizeof(kptl_rdma_msg_t));
+                                 target, sizeof(kptl_rdma_msg_t));
 
                 CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
                        libcfs_id2str(target),
@@ -394,8 +399,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                         goto out;
                 }
 
-                tx->tx_lnet_replymsg =
-                        lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg);
+                tx->tx_lnet_replymsg = lnet_create_reply_msg(ni, lntmsg);
                 if (tx->tx_lnet_replymsg == NULL) {
                         CERROR("Failed to allocate LNET reply for %s\n",
                                libcfs_id2str(target));
@@ -416,7 +420,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 tx->tx_lnet_msg = lntmsg;
                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
-                                  sizeof(kptl_rdma_msg_t));
+                                 target, sizeof(kptl_rdma_msg_t));
 
                 CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
                        libcfs_id2str(target),
@@ -468,7 +472,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         }
         
         nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
-        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);
+        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob);
 
         CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
                libcfs_id2str(target),
index 641a73c..5d659d8 100644 (file)
@@ -74,7 +74,6 @@ kptllnd_get_peer_info(int index,
         read_lock_irqsave(g_lock, flags);
 
         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
-                
                 list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                         peer = list_entry(ptmp, kptl_peer_t, peer_list);
 
@@ -112,7 +111,6 @@ kptllnd_get_peer_info(int index,
 void
 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
 {
-        LASSERT (!kptllnd_data.kptl_shutdown);
         LASSERT (kptllnd_data.kptl_n_active_peers <
                  kptllnd_data.kptl_expected_peers);
 
@@ -146,7 +144,7 @@ kptllnd_cull_peertable_locked (lnet_process_id_t pid)
                  * in MRU order */
                 peer = list_entry(tmp, kptl_peer_t, peer_list);
                         
-                if (peer->peer_id.nid != pid.nid)
+                if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
                         continue;
 
                 LASSERT (peer->peer_id.pid != pid.pid);
@@ -165,7 +163,7 @@ kptllnd_cull_peertable_locked (lnet_process_id_t pid)
 }
 
 kptl_peer_t *
-kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
+kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
 {
         unsigned long    flags;
         kptl_peer_t     *peer;
@@ -204,16 +202,14 @@ kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
 
         /* Only increase # peers under lock, to guarantee we dont grow it
          * during shutdown */
-        if (kptllnd_data.kptl_shutdown) {
-                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
-                                        flags);
+        if (net->net_shutdown) {
+                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
                 LIBCFS_FREE(peer, sizeof(*peer));
                 return NULL;
         }
 
         kptllnd_data.kptl_npeers++;
         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
-        
         return peer;
 }
 
@@ -288,8 +284,12 @@ void
 kptllnd_peer_notify (kptl_peer_t *peer)
 {
         unsigned long flags;
-        time_t        last_alive = 0;
+        kptl_net_t   *net;
+        kptl_net_t  **nets;
+        int           i = 0;
+        int           nnets = 0;
         int           error = 0;
+        time_t        last_alive = 0;
         
         spin_lock_irqsave(&peer->peer_lock, flags);
 
@@ -304,9 +304,51 @@ kptllnd_peer_notify (kptl_peer_t *peer)
         
         spin_unlock_irqrestore(&peer->peer_lock, flags);
 
-        if (error != 0)
-                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
-                             last_alive);
+        if (error == 0)
+                return;
+
+        read_lock(&kptllnd_data.kptl_net_rw_lock);
+        list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
+                nnets++;
+        read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+        if (nnets == 0) /* shutdown in progress */
+                return;
+
+        LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
+        if (nets == NULL) {
+                CERROR("Failed to allocate nets[%d]\n", nnets);
+                return;
+        }
+        memset(nets, 0, nnets * sizeof(*nets));
+
+        read_lock(&kptllnd_data.kptl_net_rw_lock);
+        i = 0;
+        list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
+                LASSERT (i < nnets);
+                nets[i] = net;
+                kptllnd_net_addref(net);
+                i++;
+        }
+        read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+        for (i = 0; i < nnets; i++) {
+                lnet_nid_t peer_nid;
+
+                net = nets[i];
+                if (net == NULL)
+                        break;
+
+                if (!net->net_shutdown) {
+                        peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
+                                                       peer->peer_ptlid.nid);
+                        lnet_notify(net->net_ni, peer_nid, 0, last_alive);
+                }
+
+                kptllnd_net_decref(net);
+        }
+
+        LIBCFS_FREE(nets, nnets * sizeof(*nets));
 }
 
 void
@@ -462,7 +504,7 @@ again:
                         peer = list_entry (ptmp, kptl_peer_t, peer_list);
 
                         if (!(id.nid == LNET_NID_ANY || 
-                              (peer->peer_id.nid == id.nid &&
+                              (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
                                (id.pid == LNET_PID_ANY || 
                                 peer->peer_id.pid == id.pid))))
                                 continue;
@@ -565,7 +607,7 @@ kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
 
 /* NB "restarts" comes from peer_sendq of a single peer */
 void
-kptllnd_restart_txs (lnet_process_id_t target, struct list_head *restarts)
+kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target, struct list_head *restarts)
 {
         kptl_tx_t   *tx;
         kptl_tx_t   *tmp;
@@ -573,7 +615,7 @@ kptllnd_restart_txs (lnet_process_id_t target, struct list_head *restarts)
 
         LASSERT (!list_empty(restarts));
 
-        if (kptllnd_find_target(&peer, target) != 0)
+        if (kptllnd_find_target(net, target, &peer) != 0)
                 peer = NULL;
 
         list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
@@ -643,7 +685,8 @@ kptllnd_peer_check_sends (kptl_peer_t *peer)
                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
                                libcfs_id2str(peer->peer_id));
                 } else {
-                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
+                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
+                                         peer->peer_id, 0);
                         kptllnd_post_tx(peer, tx, 0);
                 }
 
@@ -855,13 +898,8 @@ void
 kptllnd_peer_check_bucket (int idx, int stamp)
 {
         struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
-        struct list_head  *ptmp;
         kptl_peer_t       *peer;
-        kptl_tx_t         *tx;
         unsigned long      flags;
-        int                nsend;
-        int                nactive;
-        int                check_sends;
 
         CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
 
@@ -869,8 +907,12 @@ kptllnd_peer_check_bucket (int idx, int stamp)
         /* NB. Shared lock while I just look */
         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
 
-        list_for_each (ptmp, peers) {
-                peer = list_entry (ptmp, kptl_peer_t, peer_list);
+        list_for_each_entry (peer, peers, peer_list) {
+                kptl_tx_t *tx;
+                int        check_sends;
+                int        c = -1, oc = -1, sc = -1;
+                int        nsend = -1, nactive = -1;
+                int        sent_hello = -1, state = -1;
 
                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                        libcfs_id2str(peer->peer_id), peer->peer_credits, 
@@ -888,6 +930,16 @@ kptllnd_peer_check_bucket (int idx, int stamp)
                 tx = kptllnd_find_timed_out_tx(peer);
                 check_sends = peer->peer_retry_noop;
                 
+                if (tx != NULL) {
+                        c  = peer->peer_credits;
+                        sc = peer->peer_sent_credits;
+                        oc = peer->peer_outstanding_credits;
+                        state      = peer->peer_state;
+                        sent_hello = peer->peer_sent_hello;
+                        nsend   = kptllnd_count_queue(&peer->peer_sendq);
+                        nactive = kptllnd_count_queue(&peer->peer_activeq);
+                }
+
                 spin_unlock(&peer->peer_lock);
                 
                 if (tx == NULL && !check_sends)
@@ -905,11 +957,6 @@ kptllnd_peer_check_bucket (int idx, int stamp)
                         goto again;
                 }
 
-                spin_lock_irqsave(&peer->peer_lock, flags);
-                nsend = kptllnd_count_queue(&peer->peer_sendq);
-                nactive = kptllnd_count_queue(&peer->peer_activeq);
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
-
                 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
                                    libcfs_id2str(peer->peer_id),
                                    (tx->tx_tposted == 0) ? 
@@ -922,6 +969,11 @@ kptllnd_peer_check_bucket (int idx, int stamp)
                                libcfs_id2str(peer->peer_id),
                                *kptllnd_tunables.kptl_timeout,
                                cfs_duration_sec(jiffies - tx->tx_tposted));
+                } else if (state < PEER_STATE_ACTIVE) {
+                        CERROR("Could not connect %s (%d) after %ds; "
+                               "peer might be down\n",
+                               libcfs_id2str(peer->peer_id), state,
+                               *kptllnd_tunables.kptl_timeout);
                } else {
                        CERROR("Could not get credits for %s after %ds; "
                                "possible Lustre networking issues\n",
@@ -930,11 +982,12 @@ kptllnd_peer_check_bucket (int idx, int stamp)
                }
 
                 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
-                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
-                       "%sposted %lu T/O %ds\n",
-                       libcfs_id2str(peer->peer_id), peer->peer_credits,
-                       peer->peer_outstanding_credits, peer->peer_sent_credits,
-                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
+                       "state %d, sent_hello %d, sendq %d, activeq %d "
+                       "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
+                       libcfs_id2str(peer->peer_id), c, oc, sc,
+                       state, sent_hello, nsend, nactive,
+                       tx, kptllnd_tx_typestr(tx->tx_type),
+                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                        tx->tx_active ? "A" : "",
                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                        "" : "M",
@@ -970,20 +1023,20 @@ kptllnd_id2peer_locked (lnet_process_id_t id)
         kptl_peer_t      *peer;
 
         list_for_each (tmp, peers) {
-
                 peer = list_entry (tmp, kptl_peer_t, peer_list);
 
                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
                         peer->peer_state == PEER_STATE_ACTIVE);
-                
-                if (peer->peer_id.nid != id.nid ||
-                    peer->peer_id.pid != id.pid)
+
+                /* NB logical LNet peers share one kptl_peer_t */
+                if (peer->peer_id.pid != id.pid ||
+                    LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
                         continue;
 
                 kptllnd_peer_addref(peer);
 
                 CDEBUG(D_NET, "%s -> %s (%d)\n",
-                       libcfs_id2str(id), 
+                       libcfs_id2str(id),
                        kptllnd_ptlid2str(peer->peer_ptlid),
                        atomic_read (&peer->peer_refcount));
                 return peer;
@@ -1025,25 +1078,25 @@ kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
                 peer = list_entry (tmp, kptl_peer_t, peer_list);
 
-                if (peer->peer_id.nid == lpid.nid &&
+                if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
                     peer->peer_id.pid == lpid.pid)
                         return peer->peer_last_matchbits_seen;
         }
-        
+
         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                 peer = list_entry (tmp, kptl_peer_t, peer_list);
 
-                if (peer->peer_id.nid == lpid.nid &&
+                if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
                     peer->peer_id.pid == lpid.pid)
                         return peer->peer_last_matchbits_seen;
         }
-        
+
         return PTL_RESERVED_MATCHBITS;
 }
 
 kptl_peer_t *
-kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
-                           kptl_msg_t       *msg)
+kptllnd_peer_handle_hello (kptl_net_t *net,
+                           ptl_process_id_t initiator, kptl_msg_t *msg)
 {
         rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
         kptl_peer_t        *peer;
@@ -1156,6 +1209,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                        msg->ptlm_srcstamp, peer->peer_incarnation);
 
                 kptllnd_peer_decref(peer);
+                peer = NULL;
         }
 
         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
@@ -1166,9 +1220,9 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
         }
 
         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
-                         sizeof(kptl_hello_msg_t));
+                         lpid, sizeof(kptl_hello_msg_t));
 
-        new_peer = kptllnd_peer_allocate(lpid, initiator);
+        new_peer = kptllnd_peer_allocate(net, lpid, initiator);
         if (new_peer == NULL) {
                 kptllnd_tx_decref(hello_tx);
                 return NULL;
@@ -1187,7 +1241,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
         write_lock_irqsave(g_lock, flags);
 
  again:
-        if (kptllnd_data.kptl_shutdown) {
+        if (net->net_shutdown) {
                 write_unlock_irqrestore(g_lock, flags);
 
                 CERROR ("Shutdown started, refusing connection from %s\n",
@@ -1268,6 +1322,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
 
+        LASSERT (!net->net_shutdown);
         kptllnd_peer_add_peertable_locked(new_peer);
 
         write_unlock_irqrestore(g_lock, flags);
@@ -1292,7 +1347,8 @@ kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
 }
 
 int
-kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
+kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
+                    kptl_peer_t **peerp)
 {
         rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
         ptl_process_id_t  ptl_id;
@@ -1316,8 +1372,8 @@ kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
                 return -EHOSTUNREACH;
         }
 
-        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
-         * the same portals PID */
+        /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
+         * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
 
@@ -1330,9 +1386,9 @@ kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
 
         hello_tx->tx_acked = 1;
         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
-                         sizeof(kptl_hello_msg_t));
+                         target, sizeof(kptl_hello_msg_t));
 
-        new_peer = kptllnd_peer_allocate(target, ptl_id);
+        new_peer = kptllnd_peer_allocate(net, target, ptl_id);
         if (new_peer == NULL) {
                 rc = -ENOMEM;
                 goto unwind_0;
@@ -1344,11 +1400,8 @@ kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
 
         write_lock_irqsave(g_lock, flags);
  again:
-        if (kptllnd_data.kptl_shutdown) {
-                write_unlock_irqrestore(g_lock, flags);
-                rc = -ESHUTDOWN;
-                goto unwind_2;
-        }
+        /* Called only in lnd_send which can't happen after lnd_shutdown */
+        LASSERT (!net->net_shutdown);
 
         *peerp = kptllnd_id2peer_locked(target);
         if (*peerp != NULL) {
index 90c654a..cacd125 100644 (file)
@@ -299,7 +299,7 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
          * Setup MD
          */
         md.start = rxb->rxb_buffer;
-        md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
+        md.length = kptllnd_rx_buffer_size();
         md.threshold = PTL_MD_THRESH_INF;
         md.options = PTL_MD_OP_PUT |
                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
@@ -505,7 +505,7 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
 }
 
 void
-kptllnd_nak (kptl_rx_t *rx)
+kptllnd_nak (ptl_process_id_t dest)
 {
         /* Fire-and-forget a stub message that will let the peer know my
          * protocol magic/version and make her drop/refresh any peer state she
@@ -523,34 +523,53 @@ kptllnd_nak (kptl_rx_t *rx)
         rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
         if (rc != PTL_OK) {
                 CWARN("Can't NAK %s: bind failed %s(%d)\n",
-                      kptllnd_ptlid2str(rx->rx_initiator),
-                      kptllnd_errtype2str(rc), rc);
+                      kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
                 return;
         }
 
-        rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
+        rc = PtlPut(mdh, PTL_NOACK_REQ, dest,
                     *kptllnd_tunables.kptl_portal, 0,
                     LNET_MSG_MATCHBITS, 0, 0);
-
         if (rc != PTL_OK) {
                 CWARN("Can't NAK %s: put failed %s(%d)\n",
-                      kptllnd_ptlid2str(rx->rx_initiator),
-                      kptllnd_errtype2str(rc), rc);
+                      kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
                 kptllnd_schedule_ptltrace_dump();
         }
 }
 
+kptl_net_t *
+kptllnd_find_net (lnet_nid_t nid)
+{
+        kptl_net_t *net;
+
+        read_lock(&kptllnd_data.kptl_net_rw_lock);
+        list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
+                LASSERT (!net->net_shutdown);
+
+                if (net->net_ni->ni_nid == nid) {
+                        kptllnd_net_addref(net);
+                        read_unlock(&kptllnd_data.kptl_net_rw_lock);
+                        return net;
+                }
+        }
+        read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+        return NULL;
+}
+
 void
 kptllnd_rx_parse(kptl_rx_t *rx)
 {
         kptl_msg_t             *msg = rx->rx_msg;
         int                     rc = 0;
         int                     post_credit = PTLLND_POSTRX_PEER_CREDIT;
+        kptl_net_t             *net = NULL;
         kptl_peer_t            *peer;
         struct list_head        txs;
         unsigned long           flags;
         lnet_process_id_t       srcid;
 
+        LASSERT (!in_interrupt());
         LASSERT (rx->rx_peer == NULL);
 
         INIT_LIST_HEAD(&txs);
@@ -570,7 +589,8 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                        (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
                                msg->ptlm_version : __swab16(msg->ptlm_version)),
                         PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
-                kptllnd_nak(rx);
+                /* NB backward compatibility */
+                kptllnd_nak(rx->rx_initiator);
                 goto rx_done;
         }
         
@@ -590,8 +610,8 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                jiffies - rx->rx_treceived,
                cfs_duration_sec(jiffies - rx->rx_treceived));
 
-        if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
-                CERROR("Bad source id %s from %s\n",
+        if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) {
+                CERROR("Bad source nid %s from %s\n",
                        libcfs_id2str(srcid),
                        kptllnd_ptlid2str(rx->rx_initiator));
                 goto rx_done;
@@ -619,21 +639,28 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                 goto failed;
         }
 
-        if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
-            msg->ptlm_dstpid != the_lnet.ln_pid) {
-                CERROR("Bad dstid %s (expected %s) from %s\n",
+        net = kptllnd_find_net(msg->ptlm_dstnid);
+        if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) {
+                CERROR("Bad dstid %s from %s\n",
                        libcfs_id2str((lnet_process_id_t) {
                                .nid = msg->ptlm_dstnid,
                                .pid = msg->ptlm_dstpid}),
-                       libcfs_id2str((lnet_process_id_t) {
-                               .nid = kptllnd_data.kptl_ni->ni_nid,
-                               .pid = the_lnet.ln_pid}),
                        kptllnd_ptlid2str(rx->rx_initiator));
                 goto rx_done;
         }
 
+        if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) {
+                lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid),
+                                            LNET_NIDADDR(srcid.nid));
+                CERROR("Bad source nid %s from %s, %s expected.\n",
+                       libcfs_id2str(srcid),
+                       kptllnd_ptlid2str(rx->rx_initiator),
+                       libcfs_nid2str(nid));
+                goto rx_done;
+        }
+
         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
-                peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
+                peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg);
                 if (peer == NULL)
                         goto rx_done;
         } else {
@@ -643,7 +670,7 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                               kptllnd_msgtype2str(msg->ptlm_type),
                               libcfs_id2str(srcid));
                         /* NAK to make the peer reconnect */
-                        kptllnd_nak(rx);
+                        kptllnd_nak(rx->rx_initiator);
                         goto rx_done;
                 }
 
@@ -671,7 +698,7 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                         CWARN("NAK %s: Unexpected %s message\n",
                               libcfs_id2str(srcid),
                               kptllnd_msgtype2str(msg->ptlm_type));
-                        kptllnd_nak(rx);
+                        kptllnd_nak(rx->rx_initiator);
                         rc = -EPROTO;
                         goto failed;
                 }
@@ -687,8 +714,12 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                 }
         }
 
-        LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
-                 msg->ptlm_srcpid == peer->peer_id.pid);
+        LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) ==
+                         LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n",
+                  libcfs_nid2str(msg->ptlm_srcnid),
+                  libcfs_nid2str(peer->peer_id.nid));
+        LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n",
+                  msg->ptlm_srcpid, peer->peer_id.pid);
 
         spin_lock_irqsave(&peer->peer_lock, flags);
 
@@ -743,12 +774,14 @@ kptllnd_rx_parse(kptl_rx_t *rx)
 
         case PTLLND_MSG_TYPE_IMMEDIATE:
                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
-                rc = lnet_parse(kptllnd_data.kptl_ni,
+                rc = lnet_parse(net->net_ni,
                                 &msg->ptlm_u.immediate.kptlim_hdr,
                                 msg->ptlm_srcnid,
                                 rx, 0);
-                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
+                if (rc >= 0) {                  /* kptllnd_recv owns 'rx' now */
+                        kptllnd_net_decref(net);
                         return;
+                }
                 goto failed;
                 
         case PTLLND_MSG_TYPE_PUT:
@@ -771,12 +804,14 @@ kptllnd_rx_parse(kptl_rx_t *rx)
 
                 spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
 
-                rc = lnet_parse(kptllnd_data.kptl_ni,
+                rc = lnet_parse(net->net_ni,
                                 &msg->ptlm_u.rdma.kptlrm_hdr,
                                 msg->ptlm_srcnid,
                                 rx, 1);
-                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
+                if (rc >= 0) {                  /* kptllnd_recv owns 'rx' now */
+                        kptllnd_net_decref(net);
                         return;
+                }
                 goto failed;
         }
 
@@ -785,8 +820,12 @@ kptllnd_rx_parse(kptl_rx_t *rx)
         kptllnd_peer_close(peer, rc);
         if (rx->rx_peer == NULL)                /* drop ref on peer */
                 kptllnd_peer_decref(peer);      /* unless rx_done will */
-        if (!list_empty(&txs))
-                kptllnd_restart_txs(srcid, &txs);
+        if (!list_empty(&txs)) {
+                LASSERT (net != NULL);
+                kptllnd_restart_txs(net, srcid, &txs);
+        }
  rx_done:
+        if (net != NULL)
+                kptllnd_net_decref(net);
         kptllnd_rx_done(rx, post_credit);
 }
index 96e350b..10be464 100644 (file)
@@ -79,6 +79,7 @@ kptllnd_alloc_tx(void)
         tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
         tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
         tx->tx_msg = NULL;
+        tx->tx_peer = NULL;
         tx->tx_frags = NULL;
                 
         LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
@@ -108,14 +109,11 @@ kptllnd_setup_tx_descs()
         
         for (i = 0; i < n; i++) {
                 kptl_tx_t *tx = kptllnd_alloc_tx();
-                
                 if (tx == NULL)
                         return -ENOMEM;
                 
                 spin_lock(&kptllnd_data.kptl_tx_lock);
-                
                 list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
-                
                 spin_unlock(&kptllnd_data.kptl_tx_lock);
         }
         
@@ -365,11 +363,10 @@ kptllnd_tx_fini (kptl_tx_t *tx)
 
         /* Must finalize AFTER freeing 'tx' */
         if (msg != NULL)
-                lnet_finalize(kptllnd_data.kptl_ni, msg,
-                              (replymsg == NULL) ? status : 0);
+                lnet_finalize(NULL, msg, (replymsg == NULL) ? status : 0);
 
         if (replymsg != NULL)
-                lnet_finalize(kptllnd_data.kptl_ni, replymsg, status);
+                lnet_finalize(NULL, replymsg, status);
 
         if (peer != NULL)
                 kptllnd_peer_decref(peer);
@@ -456,8 +453,7 @@ kptllnd_tx_callback(ptl_event_t *ev)
 
                 if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
                         if (ev->hdr_data == PTLLND_RDMA_OK) {
-                                lnet_set_reply_msg_len(
-                                        kptllnd_data.kptl_ni,
+                                lnet_set_reply_msg_len(NULL,
                                         tx->tx_lnet_replymsg,
                                         ev->mlength);
                         } else {