From: isaac Date: Mon, 9 Nov 2009 19:02:06 +0000 (+0000) Subject: i=liang,b=19735: X-Git-Tag: v1_8_1_54~1^3~4 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=1e9acba3f2dd940151c549f72cfcb9bd5b622b95;p=fs%2Flustre-release.git i=liang,b=19735: - multiple-instance support for kptllnd. --- diff --git a/lnet/ChangeLog b/lnet/ChangeLog index ab54322..aa387a7 100644 --- a/lnet/ChangeLog +++ b/lnet/ChangeLog @@ -12,6 +12,10 @@ tbd Sun Microsystems, Inc. mxlnd - MX 1.2.1 or later, ptllnd - Portals 3.3 / UNICOS/lc 1.5.x, 2.0.x +Severity : enhancement +Bugzilla : 19735 +Description: multiple-instance support for kptllnd + Severity : normal Bugzilla : 20897 Description: ksocknal_close_conn_locked connection race diff --git a/lnet/include/lnet/ptllnd.h b/lnet/include/lnet/ptllnd.h index 33fcea1..509625e 100755 --- a/lnet/include/lnet/ptllnd.h +++ b/lnet/include/lnet/ptllnd.h @@ -75,7 +75,7 @@ /* * The Cray Portals has no maximum number of IOVs. The - * maximum is limited only my memory and size of the + * maximum is limited only by memory and size of the * int parameters (2^31-1). * Lustre only really require that the underyling * implemenation to support at least LNET_MAX_IOV, diff --git a/lnet/klnds/ptllnd/ptllnd.c b/lnet/klnds/ptllnd/ptllnd.c index 770bfeb..6562561 100755 --- a/lnet/klnds/ptllnd/ptllnd.c +++ b/lnet/klnds/ptllnd/ptllnd.c @@ -247,10 +247,16 @@ kptllnd_cksum (void *ptr, int nob) } void -kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob) +kptllnd_init_msg(kptl_msg_t *msg, int type, + lnet_process_id_t target, int body_nob) { msg->ptlm_type = type; msg->ptlm_nob = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7; + msg->ptlm_dstpid = target.pid; + msg->ptlm_dstnid = target.nid; + msg->ptlm_srcpid = the_lnet.ln_pid; + msg->ptlm_srcnid = kptllnd_ptl2lnetnid(target.nid, + kptllnd_data.kptl_portals_id.nid); LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size); } @@ -264,12 +270,9 @@ kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer) msg->ptlm_credits = peer->peer_outstanding_credits; /* msg->ptlm_nob Filled in kptllnd_init_msg() */ msg->ptlm_cksum = 0; - msg->ptlm_srcnid = kptllnd_data.kptl_ni->ni_nid; + /* msg->ptlm_{src|dst}[pn]id Filled in kptllnd_init_msg */ msg->ptlm_srcstamp = peer->peer_myincarnation; - msg->ptlm_dstnid = peer->peer_id.nid; msg->ptlm_dststamp = peer->peer_incarnation; - msg->ptlm_srcpid = the_lnet.ln_pid; - msg->ptlm_dstpid = peer->peer_id.pid; if (*kptllnd_tunables.kptl_checksum) { /* NB ptlm_cksum zero while computing cksum */ @@ -411,6 +414,7 @@ kptllnd_msg_unpack(kptl_msg_t *msg, int nob) int kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg) { + kptl_net_t *net = ni->ni_data; struct libcfs_ioctl_data *data = arg; int rc = -EINVAL; @@ -420,7 +424,7 @@ kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg) * Validate that the context block is actually * pointing to this interface */ - LASSERT (ni == kptllnd_data.kptl_ni); + LASSERT (ni == net->net_ni); switch(cmd) { case IOC_LIBCFS_DEL_PEER: { @@ -480,12 +484,13 @@ kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg) void kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, time_t *when) { + kptl_net_t *net = ni->ni_data; kptl_peer_t *peer = NULL; lnet_process_id_t id = {.nid = nid, .pid = LUSTRE_SRV_LNET_PID}; unsigned long flags; /* NB: kptllnd_find_target connects to peer if necessary */ - if (kptllnd_find_target(&peer, id) != 0) + if (kptllnd_find_target(net, id, &peer) != 0) return; spin_lock_irqsave(&peer->peer_lock, flags); @@ -498,22 +503,147 @@ kptllnd_query (lnet_ni_t *ni, 
lnet_nid_t nid, time_t *when) return; } +void +kptllnd_base_shutdown (void) +{ + int i; + ptl_err_t prc; + unsigned long flags; + lnet_process_id_t process_id; + + read_lock(&kptllnd_data.kptl_net_rw_lock); + LASSERT (list_empty(&kptllnd_data.kptl_nets)); + read_unlock(&kptllnd_data.kptl_net_rw_lock); + + switch (kptllnd_data.kptl_init) { + default: + LBUG(); + + case PTLLND_INIT_ALL: + case PTLLND_INIT_DATA: + /* stop receiving */ + kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool); + LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq)); + LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq)); + + /* lock to interleave cleanly with peer birth/death */ + write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); + LASSERT (kptllnd_data.kptl_shutdown == 0); + kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */ + /* no new peers possible now */ + write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); + + /* nuke all existing peers */ + process_id.nid = LNET_NID_ANY; + process_id.pid = LNET_PID_ANY; + kptllnd_peer_del(process_id); + + read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); + + LASSERT (kptllnd_data.kptl_n_active_peers == 0); + + i = 2; + while (kptllnd_data.kptl_npeers != 0) { + i++; + CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, + "Waiting for %d peers to terminate\n", + kptllnd_data.kptl_npeers); + + read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, + flags); + + cfs_pause(cfs_time_seconds(1)); + + read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, + flags); + } + + LASSERT (list_empty(&kptllnd_data.kptl_closing_peers)); + LASSERT (list_empty(&kptllnd_data.kptl_zombie_peers)); + LASSERT (kptllnd_data.kptl_peers != NULL); + for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) + LASSERT (list_empty (&kptllnd_data.kptl_peers[i])); + + read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); + CDEBUG(D_NET, "All peers deleted\n"); + + /* Shutdown phase 2: kill the daemons... */ + kptllnd_data.kptl_shutdown = 2; + mb(); + + i = 2; + while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) { + /* Wake up all threads*/ + wake_up_all(&kptllnd_data.kptl_sched_waitq); + wake_up_all(&kptllnd_data.kptl_watchdog_waitq); + + i++; + CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? 
*/ + "Waiting for %d threads to terminate\n", + atomic_read(&kptllnd_data.kptl_nthreads)); + cfs_pause(cfs_time_seconds(1)); + } + + CDEBUG(D_NET, "All Threads stopped\n"); + LASSERT(list_empty(&kptllnd_data.kptl_sched_txq)); + + kptllnd_cleanup_tx_descs(); + + /* Nothing here now, but libcfs might soon require + * us to explicitly destroy wait queues and semaphores + * that would be done here */ + + /* fall through */ + + case PTLLND_INIT_NOTHING: + CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n"); + break; + } + + if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) { + prc = PtlEQFree(kptllnd_data.kptl_eqh); + if (prc != PTL_OK) + CERROR("Error %s(%d) freeing portals EQ\n", + kptllnd_errtype2str(prc), prc); + } + + if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) { + prc = PtlNIFini(kptllnd_data.kptl_nih); + if (prc != PTL_OK) + CERROR("Error %s(%d) finalizing portals NI\n", + kptllnd_errtype2str(prc), prc); + } + + LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0); + LASSERT (list_empty(&kptllnd_data.kptl_idle_txs)); + + if (kptllnd_data.kptl_rx_cache != NULL) + cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache); + + if (kptllnd_data.kptl_peers != NULL) + LIBCFS_FREE(kptllnd_data.kptl_peers, + sizeof (struct list_head) * + kptllnd_data.kptl_peer_hash_size); + + if (kptllnd_data.kptl_nak_msg != NULL) + LIBCFS_FREE(kptllnd_data.kptl_nak_msg, + offsetof(kptl_msg_t, ptlm_u)); + + memset(&kptllnd_data, 0, sizeof(kptllnd_data)); + PORTAL_MODULE_UNUSE; + return; +} + int -kptllnd_startup (lnet_ni_t *ni) +kptllnd_base_startup (void) { - int rc; int i; + int rc; int spares; struct timeval tv; + lnet_process_id_t target; ptl_err_t ptl_rc; - LASSERT (ni->ni_lnd == &kptllnd_lnd); - - if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) { - CERROR("Only 1 instance supported\n"); - return -EPERM; - } - if (*kptllnd_tunables.kptl_max_procs_per_node < 1) { CERROR("max_procs_per_node must be >= 1\n"); return -EINVAL; @@ -531,51 +661,40 @@ kptllnd_startup (lnet_ni_t *ni) CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0); CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE); - /* - * zero pointers, flags etc - * put everything into a known state. - */ + /* Zero pointers, flags etc; put everything into a known state. 
*/ memset (&kptllnd_data, 0, sizeof (kptllnd_data)); + + LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u)); + if (kptllnd_data.kptl_nak_msg == NULL) { + CERROR("Can't allocate NAK msg\n"); + return -ENOMEM; + } + memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u)); + kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE; kptllnd_data.kptl_nih = PTL_INVALID_HANDLE; - /* - * Setup the sched locks/lists/waitq - */ + rwlock_init(&kptllnd_data.kptl_net_rw_lock); + INIT_LIST_HEAD(&kptllnd_data.kptl_nets); + + /* Setup the sched locks/lists/waitq */ spin_lock_init(&kptllnd_data.kptl_sched_lock); init_waitqueue_head(&kptllnd_data.kptl_sched_waitq); INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq); INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq); INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq); - /* init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */ + /* Init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */ spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock); - /* - * Setup the tx locks/lists - */ + /* Setup the tx locks/lists */ spin_lock_init(&kptllnd_data.kptl_tx_lock); INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs); atomic_set(&kptllnd_data.kptl_ntx, 0); - /* - * Uptick the module reference count - */ + /* Uptick the module reference count */ PORTAL_MODULE_USE; - /* - * Setup pointers between the ni and context data block - */ - kptllnd_data.kptl_ni = ni; - ni->ni_data = &kptllnd_data; - - /* - * Setup Credits - */ - ni->ni_maxtxcredits = *kptllnd_tunables.kptl_credits; - ni->ni_peertxcredits = *kptllnd_tunables.kptl_peertxcredits; - ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits; - kptllnd_data.kptl_expected_peers = *kptllnd_tunables.kptl_max_nodes * *kptllnd_tunables.kptl_max_procs_per_node; @@ -619,9 +738,7 @@ kptllnd_startup (lnet_ni_t *ni) goto failed; } - /* - * Fetch the lower NID - */ + /* Fetch the lower NID */ ptl_rc = PtlGetId(kptllnd_data.kptl_nih, &kptllnd_data.kptl_portals_id); if (ptl_rc != PTL_OK) { @@ -640,12 +757,6 @@ kptllnd_startup (lnet_ni_t *ni) goto failed; } - ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid); - - CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", - kptllnd_ptlid2str(kptllnd_data.kptl_portals_id), - libcfs_nid2str(ni->ni_nid)); - /* Initialized the incarnation - it must be for-all-time unique, even * accounting for the fact that we increment it when we disconnect a * peer that's using it */ @@ -654,20 +765,26 @@ kptllnd_startup (lnet_ni_t *ni) tv.tv_usec; CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation); - /* - * Allocate and setup the peer hash table - */ + target.nid = LNET_NID_ANY; + target.pid = LNET_PID_ANY; /* NB target for NAK doesn't matter */ + kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, target, 0); + kptllnd_data.kptl_nak_msg->ptlm_magic = PTLLND_MSG_MAGIC; + kptllnd_data.kptl_nak_msg->ptlm_version = PTLLND_MSG_VERSION; + kptllnd_data.kptl_nak_msg->ptlm_srcpid = the_lnet.ln_pid; + kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation; + rwlock_init(&kptllnd_data.kptl_peer_rw_lock); init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq); atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0); INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers); INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers); + /* Allocate and setup the peer hash table */ kptllnd_data.kptl_peer_hash_size = *kptllnd_tunables.kptl_peer_hash_table_size; LIBCFS_ALLOC(kptllnd_data.kptl_peers, - (kptllnd_data.kptl_peer_hash_size * - sizeof(struct list_head))); + sizeof(struct 
list_head) * + kptllnd_data.kptl_peer_hash_size); if (kptllnd_data.kptl_peers == NULL) { CERROR("Failed to allocate space for peer hash table size=%d\n", kptllnd_data.kptl_peer_hash_size); @@ -677,22 +794,6 @@ kptllnd_startup (lnet_ni_t *ni) for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]); - LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u)); - if (kptllnd_data.kptl_nak_msg == NULL) { - CERROR("Can't allocate NAK msg\n"); - rc = -ENOMEM; - goto failed; - } - memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u)); - kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0); - kptllnd_data.kptl_nak_msg->ptlm_magic = PTLLND_MSG_MAGIC; - kptllnd_data.kptl_nak_msg->ptlm_version = PTLLND_MSG_VERSION; - kptllnd_data.kptl_nak_msg->ptlm_srcpid = the_lnet.ln_pid; - kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid; - kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation; - kptllnd_data.kptl_nak_msg->ptlm_dstpid = LNET_PID_ANY; - kptllnd_data.kptl_nak_msg->ptlm_dstnid = LNET_NID_ANY; - kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool); kptllnd_data.kptl_rx_cache = @@ -721,7 +822,7 @@ kptllnd_startup (lnet_ni_t *ni) /* Start the scheduler threads for handling incoming requests. No need * to advance the state because this will be automatically cleaned up - * now that PTLNAT_INIT_DATA state has been entered */ + * now that PTLLND_INIT_DATA state has been entered */ CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED); for (i = 0; i < PTLLND_N_SCHED; i++) { rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i)); @@ -760,11 +861,59 @@ kptllnd_startup (lnet_ni_t *ni) if (*kptllnd_tunables.kptl_checksum) CWARN("Checksumming enabled\n"); - CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n"); + CDEBUG(D_NET, "<<< kptllnd_base_startup SUCCESS\n"); + return 0; + + failed: + CERROR("kptllnd_base_startup failed: %d\n", rc); + kptllnd_base_shutdown(); + return rc; +} + +int +kptllnd_startup (lnet_ni_t *ni) +{ + int rc; + kptl_net_t *net; + + LASSERT (ni->ni_lnd == &kptllnd_lnd); + + if (kptllnd_data.kptl_init == PTLLND_INIT_NOTHING) { + rc = kptllnd_base_startup(); + if (rc != 0) + return rc; + } + + LIBCFS_ALLOC(net, sizeof(*net)); + ni->ni_data = net; + if (net == NULL) { + CERROR("Can't allocate kptl_net_t\n"); + rc = -ENOMEM; + goto failed; + } + memset(net, 0, sizeof(*net)); + net->net_ni = ni; + + ni->ni_maxtxcredits = *kptllnd_tunables.kptl_credits; + ni->ni_peertxcredits = *kptllnd_tunables.kptl_peertxcredits; + ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits; + ni->ni_nid = kptllnd_ptl2lnetnid(ni->ni_nid, + kptllnd_data.kptl_portals_id.nid); + CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", + kptllnd_ptlid2str(kptllnd_data.kptl_portals_id), + libcfs_nid2str(ni->ni_nid)); + + /* NB LNET_NIDNET(ptlm_srcnid) of NAK doesn't matter in case of + * multiple NIs */ + kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid; + + atomic_set(&net->net_refcount, 1); + write_lock(&kptllnd_data.kptl_net_rw_lock); + list_add_tail(&net->net_list, &kptllnd_data.kptl_nets); + write_unlock(&kptllnd_data.kptl_net_rw_lock); return 0; failed: - CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc); kptllnd_shutdown(ni); return rc; } @@ -772,139 +921,53 @@ kptllnd_startup (lnet_ni_t *ni) void kptllnd_shutdown (lnet_ni_t *ni) { + kptl_net_t *net = ni->ni_data; int i; - ptl_err_t prc; - lnet_process_id_t process_id; unsigned long flags; + LASSERT (kptllnd_data.kptl_init == PTLLND_INIT_ALL); 
+ CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n", atomic_read (&libcfs_kmemory)); - LASSERT (ni == kptllnd_data.kptl_ni); + if (net == NULL) + goto out; - switch (kptllnd_data.kptl_init) { - default: - LBUG(); + LASSERT (ni == net->net_ni); + LASSERT (!net->net_shutdown); + LASSERT (!list_empty(&net->net_list)); + LASSERT (atomic_read(&net->net_refcount) != 0); + ni->ni_data = NULL; + net->net_ni = NULL; - case PTLLND_INIT_ALL: - case PTLLND_INIT_DATA: - /* Stop receiving */ - kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool); - LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq)); - LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq)); + write_lock(&kptllnd_data.kptl_net_rw_lock); + kptllnd_net_decref(net); + list_del_init(&net->net_list); + write_unlock(&kptllnd_data.kptl_net_rw_lock); - /* Hold peertable lock to interleave cleanly with peer birth/death */ + /* Can't nuke peers here - they are shared among all NIs */ write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); - - LASSERT (kptllnd_data.kptl_shutdown == 0); - kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */ - - /* no new peers possible now */ - write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, - flags); - - /* nuke all existing peers */ - process_id.nid = LNET_NID_ANY; - process_id.pid = LNET_PID_ANY; - kptllnd_peer_del(process_id); - - read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); - - LASSERT (kptllnd_data.kptl_n_active_peers == 0); + net->net_shutdown = 1; /* Order with peer creation */ + write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); i = 2; - while (kptllnd_data.kptl_npeers != 0) { + while (atomic_read(&net->net_refcount) != 0) { i++; CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, - "Waiting for %d peers to terminate\n", - kptllnd_data.kptl_npeers); - - read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, - flags); + "Waiting for %d references to drop\n", + atomic_read(&net->net_refcount)); cfs_pause(cfs_time_seconds(1)); - - read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, - flags); } - LASSERT(list_empty(&kptllnd_data.kptl_closing_peers)); - LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers)); - LASSERT (kptllnd_data.kptl_peers != NULL); - for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) - LASSERT (list_empty (&kptllnd_data.kptl_peers[i])); - - read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); - CDEBUG(D_NET, "All peers deleted\n"); - - /* Shutdown phase 2: kill the daemons... */ - kptllnd_data.kptl_shutdown = 2; - mb(); - - i = 2; - while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) { - /* Wake up all threads*/ - wake_up_all(&kptllnd_data.kptl_sched_waitq); - wake_up_all(&kptllnd_data.kptl_watchdog_waitq); - - i++; - CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? 
*/ - "Waiting for %d threads to terminate\n", - atomic_read(&kptllnd_data.kptl_nthreads)); - cfs_pause(cfs_time_seconds(1)); - } - - CDEBUG(D_NET, "All Threads stopped\n"); - LASSERT(list_empty(&kptllnd_data.kptl_sched_txq)); - - kptllnd_cleanup_tx_descs(); - - /* Nothing here now, but libcfs might soon require - * us to explicitly destroy wait queues and semaphores - * that would be done here */ - - /* fall through */ - - case PTLLND_INIT_NOTHING: - CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n"); - break; - } - - if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) { - prc = PtlEQFree(kptllnd_data.kptl_eqh); - if (prc != PTL_OK) - CERROR("Error %s(%d) freeing portals EQ\n", - kptllnd_errtype2str(prc), prc); - } - - if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) { - prc = PtlNIFini(kptllnd_data.kptl_nih); - if (prc != PTL_OK) - CERROR("Error %s(%d) finalizing portals NI\n", - kptllnd_errtype2str(prc), prc); - } - - LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0); - LASSERT (list_empty(&kptllnd_data.kptl_idle_txs)); - - if (kptllnd_data.kptl_rx_cache != NULL) - cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache); - - if (kptllnd_data.kptl_peers != NULL) - LIBCFS_FREE (kptllnd_data.kptl_peers, - sizeof (struct list_head) * - kptllnd_data.kptl_peer_hash_size); - - if (kptllnd_data.kptl_nak_msg != NULL) - LIBCFS_FREE (kptllnd_data.kptl_nak_msg, - offsetof(kptl_msg_t, ptlm_u)); - - memset(&kptllnd_data, 0, sizeof(kptllnd_data)); - + LIBCFS_FREE(net, sizeof(*net)); +out: + /* NB no locking since I don't race with writers */ + if (list_empty(&kptllnd_data.kptl_nets)) + kptllnd_base_shutdown(); CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n", atomic_read (&libcfs_kmemory)); - - PORTAL_MODULE_UNUSE; + return; } int __init diff --git a/lnet/klnds/ptllnd/ptllnd.h b/lnet/klnds/ptllnd/ptllnd.h index 40569be..c0283fc 100755 --- a/lnet/klnds/ptllnd/ptllnd.h +++ b/lnet/klnds/ptllnd/ptllnd.h @@ -135,6 +135,7 @@ typedef struct /***********************************************************************/ typedef struct kptl_data kptl_data_t; +typedef struct kptl_net kptl_net_t; typedef struct kptl_rx_buffer kptl_rx_buffer_t; typedef struct kptl_peer kptl_peer_t; @@ -243,7 +244,7 @@ enum kptllnd_peer_state struct kptl_peer { struct list_head peer_list; - atomic_t peer_refcount; /* The current refrences */ + atomic_t peer_refcount; /* The current references */ enum kptllnd_peer_state peer_state; spinlock_t peer_lock; /* serialize */ struct list_head peer_noops; /* PTLLND_MSG_TYPE_NOOP txs */ @@ -271,12 +272,14 @@ struct kptl_data int kptl_init; /* initialisation state */ volatile int kptl_shutdown; /* shut down? */ atomic_t kptl_nthreads; /* # live threads */ - lnet_ni_t *kptl_ni; /* _the_ LND instance */ ptl_handle_ni_t kptl_nih; /* network inteface handle */ ptl_process_id_t kptl_portals_id; /* Portals ID of interface */ __u64 kptl_incarnation; /* which one am I */ ptl_handle_eq_t kptl_eqh; /* Event Queue (EQ) */ + rwlock_t kptl_net_rw_lock; /* serialise... */ + struct list_head kptl_nets; /* kptl_net instances */ + spinlock_t kptl_sched_lock; /* serialise... 
*/ wait_queue_head_t kptl_sched_waitq; /* schedulers sleep here */ struct list_head kptl_sched_txq; /* tx requiring attention */ @@ -306,6 +309,14 @@ struct kptl_data spinlock_t kptl_ptlid2str_lock; /* serialise str ops */ }; +struct kptl_net +{ + struct list_head net_list; /* chain on kptl_data:: kptl_nets */ + lnet_ni_t *net_ni; + atomic_t net_refcount; /* # current references */ + int net_shutdown; /* lnd_shutdown called */ +}; + enum { PTLLND_INIT_NOTHING = 0, @@ -317,14 +328,12 @@ extern kptl_tunables_t kptllnd_tunables; extern kptl_data_t kptllnd_data; static inline lnet_nid_t -kptllnd_ptl2lnetnid(ptl_nid_t ptl_nid) +kptllnd_ptl2lnetnid(lnet_nid_t ni_nid, ptl_nid_t ptl_nid) { #ifdef _USING_LUSTRE_PORTALS_ - return LNET_MKNID(LNET_NIDNET(kptllnd_data.kptl_ni->ni_nid), - LNET_NIDADDR(ptl_nid)); + return LNET_MKNID(LNET_NIDNET(ni_nid), LNET_NIDADDR(ptl_nid)); #else - return LNET_MKNID(LNET_NIDNET(kptllnd_data.kptl_ni->ni_nid), - ptl_nid); + return LNET_MKNID(LNET_NIDNET(ni_nid), ptl_nid); #endif } @@ -466,8 +475,10 @@ int kptllnd_peer_connect(kptl_tx_t *tx, lnet_nid_t nid); void kptllnd_peer_check_sends(kptl_peer_t *peer); void kptllnd_peer_check_bucket(int idx, int stamp); void kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag); -int kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target); -kptl_peer_t *kptllnd_peer_handle_hello(ptl_process_id_t initiator, +int kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target, + kptl_peer_t **peerp); +kptl_peer_t *kptllnd_peer_handle_hello(kptl_net_t *net, + ptl_process_id_t initiator, kptl_msg_t *msg); kptl_peer_t *kptllnd_id2peer_locked(lnet_process_id_t id); void kptllnd_peer_alive(kptl_peer_t *peer); @@ -486,6 +497,20 @@ kptllnd_peer_decref (kptl_peer_t *peer) } static inline void +kptllnd_net_addref (kptl_net_t *net) +{ + LASSERT (atomic_read(&net->net_refcount) > 0); + atomic_inc(&net->net_refcount); +} + +static inline void +kptllnd_net_decref (kptl_net_t *net) +{ + LASSERT (atomic_read(&net->net_refcount) > 0); + atomic_dec(&net->net_refcount); +} + +static inline void kptllnd_set_tx_peer(kptl_tx_t *tx, kptl_peer_t *peer) { LASSERT (tx->tx_peer == NULL); @@ -497,7 +522,9 @@ kptllnd_set_tx_peer(kptl_tx_t *tx, kptl_peer_t *peer) static inline struct list_head * kptllnd_nid2peerlist(lnet_nid_t nid) { - unsigned int hash = ((unsigned int)nid) % + /* Only one copy of peer state for all logical peers, so the net part + * of NIDs is ignored; e.g. 
A@ptl0 and A@ptl2 share peer state */ + unsigned int hash = ((unsigned int)LNET_NIDADDR(nid)) % kptllnd_data.kptl_peer_hash_size; return &kptllnd_data.kptl_peers[hash]; @@ -543,7 +570,7 @@ int kptllnd_setup_tx_descs(void); void kptllnd_cleanup_tx_descs(void); void kptllnd_tx_fini(kptl_tx_t *tx); void kptllnd_cancel_txlist(struct list_head *peerq, struct list_head *txs); -void kptllnd_restart_txs(lnet_process_id_t target, struct list_head *restarts); +void kptllnd_restart_txs(kptl_net_t *net, lnet_process_id_t id, struct list_head *restarts); kptl_tx_t *kptllnd_get_idle_tx(enum kptl_tx_type purpose); void kptllnd_tx_callback(ptl_event_t *ev); const char *kptllnd_tx_typestr(int type); @@ -566,7 +593,8 @@ kptllnd_tx_decref(kptl_tx_t *tx) /* * MESSAGE SUPPORT FUNCTIONS */ -void kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob); +void kptllnd_init_msg(kptl_msg_t *msg, int type, + lnet_process_id_t target, int body_nob); void kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer); int kptllnd_msg_unpack(kptl_msg_t *msg, int nob); diff --git a/lnet/klnds/ptllnd/ptllnd_cb.c b/lnet/klnds/ptllnd/ptllnd_cb.c index 8acf9d0..8a0d67a 100644 --- a/lnet/klnds/ptllnd/ptllnd_cb.c +++ b/lnet/klnds/ptllnd/ptllnd_cb.c @@ -320,22 +320,27 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; + kptl_net_t *net = ni->ni_data; kptl_peer_t *peer; kptl_tx_t *tx; int nob; int nfrag; int rc; + LASSERT (net->net_ni == ni); + LASSERT (!net->net_shutdown); LASSERT (payload_nob == 0 || payload_niov > 0); LASSERT (payload_niov <= LNET_MAX_IOV); LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */ LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); LASSERT (!in_interrupt()); - rc = kptllnd_find_target(&peer, target); + rc = kptllnd_find_target(net, target, &peer); if (rc != 0) return rc; + /* NB peer->peer_id does NOT always equal target, be careful with + * which one to use */ switch (type) { default: LBUG(); @@ -365,7 +370,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) tx->tx_lnet_msg = lntmsg; tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr; kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT, - sizeof(kptl_rdma_msg_t)); + target, sizeof(kptl_rdma_msg_t)); CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n", libcfs_id2str(target), @@ -394,8 +399,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) goto out; } - tx->tx_lnet_replymsg = - lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg); + tx->tx_lnet_replymsg = lnet_create_reply_msg(ni, lntmsg); if (tx->tx_lnet_replymsg == NULL) { CERROR("Failed to allocate LNET reply for %s\n", libcfs_id2str(target)); @@ -416,7 +420,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) tx->tx_lnet_msg = lntmsg; tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr; kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET, - sizeof(kptl_rdma_msg_t)); + target, sizeof(kptl_rdma_msg_t)); CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n", libcfs_id2str(target), @@ -468,7 +472,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) } nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]); - kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob); + kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob); CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n", libcfs_id2str(target), diff --git a/lnet/klnds/ptllnd/ptllnd_peer.c b/lnet/klnds/ptllnd/ptllnd_peer.c index 641a73c..5d659d8 
100644 --- a/lnet/klnds/ptllnd/ptllnd_peer.c +++ b/lnet/klnds/ptllnd/ptllnd_peer.c @@ -74,7 +74,6 @@ kptllnd_get_peer_info(int index, read_lock_irqsave(g_lock, flags); for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) { - list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) { peer = list_entry(ptmp, kptl_peer_t, peer_list); @@ -112,7 +111,6 @@ kptllnd_get_peer_info(int index, void kptllnd_peer_add_peertable_locked (kptl_peer_t *peer) { - LASSERT (!kptllnd_data.kptl_shutdown); LASSERT (kptllnd_data.kptl_n_active_peers < kptllnd_data.kptl_expected_peers); @@ -146,7 +144,7 @@ kptllnd_cull_peertable_locked (lnet_process_id_t pid) * in MRU order */ peer = list_entry(tmp, kptl_peer_t, peer_list); - if (peer->peer_id.nid != pid.nid) + if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid)) continue; LASSERT (peer->peer_id.pid != pid.pid); @@ -165,7 +163,7 @@ kptllnd_cull_peertable_locked (lnet_process_id_t pid) } kptl_peer_t * -kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid) +kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid) { unsigned long flags; kptl_peer_t *peer; @@ -204,16 +202,14 @@ kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid) /* Only increase # peers under lock, to guarantee we dont grow it * during shutdown */ - if (kptllnd_data.kptl_shutdown) { - write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, - flags); + if (net->net_shutdown) { + write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); LIBCFS_FREE(peer, sizeof(*peer)); return NULL; } kptllnd_data.kptl_npeers++; write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); - return peer; } @@ -288,8 +284,12 @@ void kptllnd_peer_notify (kptl_peer_t *peer) { unsigned long flags; - time_t last_alive = 0; + kptl_net_t *net; + kptl_net_t **nets; + int i = 0; + int nnets = 0; int error = 0; + time_t last_alive = 0; spin_lock_irqsave(&peer->peer_lock, flags); @@ -304,9 +304,51 @@ kptllnd_peer_notify (kptl_peer_t *peer) spin_unlock_irqrestore(&peer->peer_lock, flags); - if (error != 0) - lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0, - last_alive); + if (error == 0) + return; + + read_lock(&kptllnd_data.kptl_net_rw_lock); + list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) + nnets++; + read_unlock(&kptllnd_data.kptl_net_rw_lock); + + if (nnets == 0) /* shutdown in progress */ + return; + + LIBCFS_ALLOC(nets, nnets * sizeof(*nets)); + if (nets == NULL) { + CERROR("Failed to allocate nets[%d]\n", nnets); + return; + } + memset(nets, 0, nnets * sizeof(*nets)); + + read_lock(&kptllnd_data.kptl_net_rw_lock); + i = 0; + list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) { + LASSERT (i < nnets); + nets[i] = net; + kptllnd_net_addref(net); + i++; + } + read_unlock(&kptllnd_data.kptl_net_rw_lock); + + for (i = 0; i < nnets; i++) { + lnet_nid_t peer_nid; + + net = nets[i]; + if (net == NULL) + break; + + if (!net->net_shutdown) { + peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid, + peer->peer_ptlid.nid); + lnet_notify(net->net_ni, peer_nid, 0, last_alive); + } + + kptllnd_net_decref(net); + } + + LIBCFS_FREE(nets, nnets * sizeof(*nets)); } void @@ -462,7 +504,7 @@ again: peer = list_entry (ptmp, kptl_peer_t, peer_list); if (!(id.nid == LNET_NID_ANY || - (peer->peer_id.nid == id.nid && + (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) && (id.pid == LNET_PID_ANY || peer->peer_id.pid == id.pid)))) continue; @@ -565,7 +607,7 @@ kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag) /* NB "restarts" 
comes from peer_sendq of a single peer */ void -kptllnd_restart_txs (lnet_process_id_t target, struct list_head *restarts) +kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target, struct list_head *restarts) { kptl_tx_t *tx; kptl_tx_t *tmp; @@ -573,7 +615,7 @@ kptllnd_restart_txs (lnet_process_id_t target, struct list_head *restarts) LASSERT (!list_empty(restarts)); - if (kptllnd_find_target(&peer, target) != 0) + if (kptllnd_find_target(net, target, &peer) != 0) peer = NULL; list_for_each_entry_safe (tx, tmp, restarts, tx_list) { @@ -643,7 +685,8 @@ kptllnd_peer_check_sends (kptl_peer_t *peer) CERROR("Can't return credits to %s: can't allocate descriptor\n", libcfs_id2str(peer->peer_id)); } else { - kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0); + kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, + peer->peer_id, 0); kptllnd_post_tx(peer, tx, 0); } @@ -855,13 +898,8 @@ void kptllnd_peer_check_bucket (int idx, int stamp) { struct list_head *peers = &kptllnd_data.kptl_peers[idx]; - struct list_head *ptmp; kptl_peer_t *peer; - kptl_tx_t *tx; unsigned long flags; - int nsend; - int nactive; - int check_sends; CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp); @@ -869,8 +907,12 @@ kptllnd_peer_check_bucket (int idx, int stamp) /* NB. Shared lock while I just look */ read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); - list_for_each (ptmp, peers) { - peer = list_entry (ptmp, kptl_peer_t, peer_list); + list_for_each_entry (peer, peers, peer_list) { + kptl_tx_t *tx; + int check_sends; + int c = -1, oc = -1, sc = -1; + int nsend = -1, nactive = -1; + int sent_hello = -1, state = -1; CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n", libcfs_id2str(peer->peer_id), peer->peer_credits, @@ -888,6 +930,16 @@ kptllnd_peer_check_bucket (int idx, int stamp) tx = kptllnd_find_timed_out_tx(peer); check_sends = peer->peer_retry_noop; + if (tx != NULL) { + c = peer->peer_credits; + sc = peer->peer_sent_credits; + oc = peer->peer_outstanding_credits; + state = peer->peer_state; + sent_hello = peer->peer_sent_hello; + nsend = kptllnd_count_queue(&peer->peer_sendq); + nactive = kptllnd_count_queue(&peer->peer_activeq); + } + spin_unlock(&peer->peer_lock); if (tx == NULL && !check_sends) @@ -905,11 +957,6 @@ kptllnd_peer_check_bucket (int idx, int stamp) goto again; } - spin_lock_irqsave(&peer->peer_lock, flags); - nsend = kptllnd_count_queue(&peer->peer_sendq); - nactive = kptllnd_count_queue(&peer->peer_activeq); - spin_unlock_irqrestore(&peer->peer_lock, flags); - LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n", libcfs_id2str(peer->peer_id), (tx->tx_tposted == 0) ? 
@@ -922,6 +969,11 @@ kptllnd_peer_check_bucket (int idx, int stamp) libcfs_id2str(peer->peer_id), *kptllnd_tunables.kptl_timeout, cfs_duration_sec(jiffies - tx->tx_tposted)); + } else if (state < PEER_STATE_ACTIVE) { + CERROR("Could not connect %s (%d) after %ds; " + "peer might be down\n", + libcfs_id2str(peer->peer_id), state, + *kptllnd_tunables.kptl_timeout); } else { CERROR("Could not get credits for %s after %ds; " "possible Lustre networking issues\n", @@ -930,11 +982,12 @@ kptllnd_peer_check_bucket (int idx, int stamp) } CERROR("%s timed out: cred %d outstanding %d, sent %d, " - "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d " - "%sposted %lu T/O %ds\n", - libcfs_id2str(peer->peer_id), peer->peer_credits, - peer->peer_outstanding_credits, peer->peer_sent_credits, - nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type), + "state %d, sent_hello %d, sendq %d, activeq %d " + "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n", + libcfs_id2str(peer->peer_id), c, oc, sc, + state, sent_hello, nsend, nactive, + tx, kptllnd_tx_typestr(tx->tx_type), + kptllnd_msgtype2str(tx->tx_msg->ptlm_type), tx->tx_active ? "A" : "", PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ? "" : "M", @@ -970,20 +1023,20 @@ kptllnd_id2peer_locked (lnet_process_id_t id) kptl_peer_t *peer; list_for_each (tmp, peers) { - peer = list_entry (tmp, kptl_peer_t, peer_list); LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO || peer->peer_state == PEER_STATE_ACTIVE); - - if (peer->peer_id.nid != id.nid || - peer->peer_id.pid != id.pid) + + /* NB logical LNet peers share one kptl_peer_t */ + if (peer->peer_id.pid != id.pid || + LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid)) continue; kptllnd_peer_addref(peer); CDEBUG(D_NET, "%s -> %s (%d)\n", - libcfs_id2str(id), + libcfs_id2str(id), kptllnd_ptlid2str(peer->peer_ptlid), atomic_read (&peer->peer_refcount)); return peer; @@ -1025,25 +1078,25 @@ kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid) list_for_each (tmp, &kptllnd_data.kptl_closing_peers) { peer = list_entry (tmp, kptl_peer_t, peer_list); - if (peer->peer_id.nid == lpid.nid && + if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) && peer->peer_id.pid == lpid.pid) return peer->peer_last_matchbits_seen; } - + list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) { peer = list_entry (tmp, kptl_peer_t, peer_list); - if (peer->peer_id.nid == lpid.nid && + if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) && peer->peer_id.pid == lpid.pid) return peer->peer_last_matchbits_seen; } - + return PTL_RESERVED_MATCHBITS; } kptl_peer_t * -kptllnd_peer_handle_hello (ptl_process_id_t initiator, - kptl_msg_t *msg) +kptllnd_peer_handle_hello (kptl_net_t *net, + ptl_process_id_t initiator, kptl_msg_t *msg) { rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock; kptl_peer_t *peer; @@ -1156,6 +1209,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, msg->ptlm_srcstamp, peer->peer_incarnation); kptllnd_peer_decref(peer); + peer = NULL; } hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE); @@ -1166,9 +1220,9 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, } kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO, - sizeof(kptl_hello_msg_t)); + lpid, sizeof(kptl_hello_msg_t)); - new_peer = kptllnd_peer_allocate(lpid, initiator); + new_peer = kptllnd_peer_allocate(net, lpid, initiator); if (new_peer == NULL) { kptllnd_tx_decref(hello_tx); return NULL; @@ -1187,7 +1241,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, write_lock_irqsave(g_lock, flags); again: 
- if (kptllnd_data.kptl_shutdown) { + if (net->net_shutdown) { write_unlock_irqrestore(g_lock, flags); CERROR ("Shutdown started, refusing connection from %s\n", @@ -1268,6 +1322,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t initiator, new_peer->peer_last_matchbits_seen = last_matchbits_seen; new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size; + LASSERT (!net->net_shutdown); kptllnd_peer_add_peertable_locked(new_peer); write_unlock_irqrestore(g_lock, flags); @@ -1292,7 +1347,8 @@ kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag) } int -kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target) +kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target, + kptl_peer_t **peerp) { rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock; ptl_process_id_t ptl_id; @@ -1316,8 +1372,8 @@ kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target) return -EHOSTUNREACH; } - /* The new peer is a kernel ptllnd, and kernel ptllnds all have - * the same portals PID */ + /* The new peer is a kernel ptllnd, and kernel ptllnds all have the + * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */ ptl_id.nid = kptllnd_lnet2ptlnid(target.nid); ptl_id.pid = kptllnd_data.kptl_portals_id.pid; @@ -1330,9 +1386,9 @@ kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target) hello_tx->tx_acked = 1; kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO, - sizeof(kptl_hello_msg_t)); + target, sizeof(kptl_hello_msg_t)); - new_peer = kptllnd_peer_allocate(target, ptl_id); + new_peer = kptllnd_peer_allocate(net, target, ptl_id); if (new_peer == NULL) { rc = -ENOMEM; goto unwind_0; @@ -1344,11 +1400,8 @@ kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target) write_lock_irqsave(g_lock, flags); again: - if (kptllnd_data.kptl_shutdown) { - write_unlock_irqrestore(g_lock, flags); - rc = -ESHUTDOWN; - goto unwind_2; - } + /* Called only in lnd_send which can't happen after lnd_shutdown */ + LASSERT (!net->net_shutdown); *peerp = kptllnd_id2peer_locked(target); if (*peerp != NULL) { diff --git a/lnet/klnds/ptllnd/ptllnd_rx_buf.c b/lnet/klnds/ptllnd/ptllnd_rx_buf.c index 90c654a..cacd1257 100644 --- a/lnet/klnds/ptllnd/ptllnd_rx_buf.c +++ b/lnet/klnds/ptllnd/ptllnd_rx_buf.c @@ -299,7 +299,7 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb) * Setup MD */ md.start = rxb->rxb_buffer; - md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages; + md.length = kptllnd_rx_buffer_size(); md.threshold = PTL_MD_THRESH_INF; md.options = PTL_MD_OP_PUT | PTL_MD_LUSTRE_COMPLETION_SEMANTICS | @@ -505,7 +505,7 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev) } void -kptllnd_nak (kptl_rx_t *rx) +kptllnd_nak (ptl_process_id_t dest) { /* Fire-and-forget a stub message that will let the peer know my * protocol magic/version and make her drop/refresh any peer state she @@ -523,34 +523,53 @@ kptllnd_nak (kptl_rx_t *rx) rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh); if (rc != PTL_OK) { CWARN("Can't NAK %s: bind failed %s(%d)\n", - kptllnd_ptlid2str(rx->rx_initiator), - kptllnd_errtype2str(rc), rc); + kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc); return; } - rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator, + rc = PtlPut(mdh, PTL_NOACK_REQ, dest, *kptllnd_tunables.kptl_portal, 0, LNET_MSG_MATCHBITS, 0, 0); - if (rc != PTL_OK) { CWARN("Can't NAK %s: put failed %s(%d)\n", - kptllnd_ptlid2str(rx->rx_initiator), - kptllnd_errtype2str(rc), rc); + kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc); kptllnd_schedule_ptltrace_dump(); } } +kptl_net_t * 
+kptllnd_find_net (lnet_nid_t nid) +{ + kptl_net_t *net; + + read_lock(&kptllnd_data.kptl_net_rw_lock); + list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) { + LASSERT (!net->net_shutdown); + + if (net->net_ni->ni_nid == nid) { + kptllnd_net_addref(net); + read_unlock(&kptllnd_data.kptl_net_rw_lock); + return net; + } + } + read_unlock(&kptllnd_data.kptl_net_rw_lock); + + return NULL; +} + void kptllnd_rx_parse(kptl_rx_t *rx) { kptl_msg_t *msg = rx->rx_msg; int rc = 0; int post_credit = PTLLND_POSTRX_PEER_CREDIT; + kptl_net_t *net = NULL; kptl_peer_t *peer; struct list_head txs; unsigned long flags; lnet_process_id_t srcid; + LASSERT (!in_interrupt()); LASSERT (rx->rx_peer == NULL); INIT_LIST_HEAD(&txs); @@ -570,7 +589,8 @@ kptllnd_rx_parse(kptl_rx_t *rx) (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ? msg->ptlm_version : __swab16(msg->ptlm_version)), PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator)); - kptllnd_nak(rx); + /* NB backward compatibility */ + kptllnd_nak(rx->rx_initiator); goto rx_done; } @@ -590,8 +610,8 @@ kptllnd_rx_parse(kptl_rx_t *rx) jiffies - rx->rx_treceived, cfs_duration_sec(jiffies - rx->rx_treceived)); - if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) { - CERROR("Bad source id %s from %s\n", + if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) { + CERROR("Bad source nid %s from %s\n", libcfs_id2str(srcid), kptllnd_ptlid2str(rx->rx_initiator)); goto rx_done; @@ -619,21 +639,28 @@ kptllnd_rx_parse(kptl_rx_t *rx) goto failed; } - if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid || - msg->ptlm_dstpid != the_lnet.ln_pid) { - CERROR("Bad dstid %s (expected %s) from %s\n", + net = kptllnd_find_net(msg->ptlm_dstnid); + if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) { + CERROR("Bad dstid %s from %s\n", libcfs_id2str((lnet_process_id_t) { .nid = msg->ptlm_dstnid, .pid = msg->ptlm_dstpid}), - libcfs_id2str((lnet_process_id_t) { - .nid = kptllnd_data.kptl_ni->ni_nid, - .pid = the_lnet.ln_pid}), kptllnd_ptlid2str(rx->rx_initiator)); goto rx_done; } + if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) { + lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid), + LNET_NIDADDR(srcid.nid)); + CERROR("Bad source nid %s from %s, %s expected.\n", + libcfs_id2str(srcid), + kptllnd_ptlid2str(rx->rx_initiator), + libcfs_nid2str(nid)); + goto rx_done; + } + if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) { - peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg); + peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg); if (peer == NULL) goto rx_done; } else { @@ -643,7 +670,7 @@ kptllnd_rx_parse(kptl_rx_t *rx) kptllnd_msgtype2str(msg->ptlm_type), libcfs_id2str(srcid)); /* NAK to make the peer reconnect */ - kptllnd_nak(rx); + kptllnd_nak(rx->rx_initiator); goto rx_done; } @@ -671,7 +698,7 @@ kptllnd_rx_parse(kptl_rx_t *rx) CWARN("NAK %s: Unexpected %s message\n", libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type)); - kptllnd_nak(rx); + kptllnd_nak(rx->rx_initiator); rc = -EPROTO; goto failed; } @@ -687,8 +714,12 @@ kptllnd_rx_parse(kptl_rx_t *rx) } } - LASSERT (msg->ptlm_srcnid == peer->peer_id.nid && - msg->ptlm_srcpid == peer->peer_id.pid); + LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) == + LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n", + libcfs_nid2str(msg->ptlm_srcnid), + libcfs_nid2str(peer->peer_id.nid)); + LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n", + msg->ptlm_srcpid, peer->peer_id.pid); spin_lock_irqsave(&peer->peer_lock, flags); @@ -743,12 +774,14 @@ kptllnd_rx_parse(kptl_rx_t *rx) 
case PTLLND_MSG_TYPE_IMMEDIATE: CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n"); - rc = lnet_parse(kptllnd_data.kptl_ni, + rc = lnet_parse(net->net_ni, &msg->ptlm_u.immediate.kptlim_hdr, msg->ptlm_srcnid, rx, 0); - if (rc >= 0) /* kptllnd_recv owns 'rx' now */ + if (rc >= 0) { /* kptllnd_recv owns 'rx' now */ + kptllnd_net_decref(net); return; + } goto failed; case PTLLND_MSG_TYPE_PUT: @@ -771,12 +804,14 @@ kptllnd_rx_parse(kptl_rx_t *rx) spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags); - rc = lnet_parse(kptllnd_data.kptl_ni, + rc = lnet_parse(net->net_ni, &msg->ptlm_u.rdma.kptlrm_hdr, msg->ptlm_srcnid, rx, 1); - if (rc >= 0) /* kptllnd_recv owns 'rx' now */ + if (rc >= 0) { /* kptllnd_recv owns 'rx' now */ + kptllnd_net_decref(net); return; + } goto failed; } @@ -785,8 +820,12 @@ kptllnd_rx_parse(kptl_rx_t *rx) kptllnd_peer_close(peer, rc); if (rx->rx_peer == NULL) /* drop ref on peer */ kptllnd_peer_decref(peer); /* unless rx_done will */ - if (!list_empty(&txs)) - kptllnd_restart_txs(srcid, &txs); + if (!list_empty(&txs)) { + LASSERT (net != NULL); + kptllnd_restart_txs(net, srcid, &txs); + } rx_done: + if (net != NULL) + kptllnd_net_decref(net); kptllnd_rx_done(rx, post_credit); } diff --git a/lnet/klnds/ptllnd/ptllnd_tx.c b/lnet/klnds/ptllnd/ptllnd_tx.c index 96e350b..10be464 100644 --- a/lnet/klnds/ptllnd/ptllnd_tx.c +++ b/lnet/klnds/ptllnd/ptllnd_tx.c @@ -79,6 +79,7 @@ kptllnd_alloc_tx(void) tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA; tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG; tx->tx_msg = NULL; + tx->tx_peer = NULL; tx->tx_frags = NULL; LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg)); @@ -108,14 +109,11 @@ kptllnd_setup_tx_descs() for (i = 0; i < n; i++) { kptl_tx_t *tx = kptllnd_alloc_tx(); - if (tx == NULL) return -ENOMEM; spin_lock(&kptllnd_data.kptl_tx_lock); - list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs); - spin_unlock(&kptllnd_data.kptl_tx_lock); } @@ -365,11 +363,10 @@ kptllnd_tx_fini (kptl_tx_t *tx) /* Must finalize AFTER freeing 'tx' */ if (msg != NULL) - lnet_finalize(kptllnd_data.kptl_ni, msg, - (replymsg == NULL) ? status : 0); + lnet_finalize(NULL, msg, (replymsg == NULL) ? status : 0); if (replymsg != NULL) - lnet_finalize(kptllnd_data.kptl_ni, replymsg, status); + lnet_finalize(NULL, replymsg, status); if (peer != NULL) kptllnd_peer_decref(peer); @@ -456,8 +453,7 @@ kptllnd_tx_callback(ptl_event_t *ev) if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) { if (ev->hdr_data == PTLLND_RDMA_OK) { - lnet_set_reply_msg_len( - kptllnd_data.kptl_ni, + lnet_set_reply_msg_len(NULL, tx->tx_lnet_replymsg, ev->mlength); } else {
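
The heart of the patch is the new refcounted kptl_net_t: each lnd_startup() now allocates one (holding its own reference), and lnd_shutdown() flags it, drops the startup reference, and waits for in-flight users to drain before freeing it, tearing the shared kptllnd_data down only once kptl_nets is empty. Below is a minimal user-space model of that lifecycle, not the kernel implementation: C11 stdatomic and a busy-wait stand in for the kernel's atomic_t and cfs_pause() retry loop, and the kptl_nets list handling is elided.

/*
 * Minimal model of the kptl_net_t lifecycle introduced by this patch.
 * C11 <stdatomic.h> stands in for the kernel atomic_t; the cfs_pause()
 * retry loop and kptl_nets list manipulation are elided.
 */
#include <assert.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct kptl_net {
        atomic_int net_refcount;        /* # current references */
        int        net_shutdown;        /* lnd_shutdown called */
} kptl_net_t;

static void kptllnd_net_addref(kptl_net_t *net)
{
        assert(atomic_load(&net->net_refcount) > 0);   /* as LASSERTed */
        atomic_fetch_add(&net->net_refcount, 1);
}

static void kptllnd_net_decref(kptl_net_t *net)
{
        assert(atomic_load(&net->net_refcount) > 0);
        atomic_fetch_sub(&net->net_refcount, 1);
}

static kptl_net_t *net_startup(void)
{
        kptl_net_t *net = calloc(1, sizeof(*net));

        if (net != NULL)
                atomic_init(&net->net_refcount, 1);    /* startup's own ref */
        return net;
}

static void net_shutdown(kptl_net_t *net)
{
        net->net_shutdown = 1;          /* no new users from now on */
        kptllnd_net_decref(net);        /* drop startup's reference */
        while (atomic_load(&net->net_refcount) != 0)
                ;                       /* kernel code cfs_pause()s here */
        free(net);
}

int main(void)
{
        kptl_net_t *net = net_startup();

        if (net == NULL)
                return 1;
        kptllnd_net_addref(net);        /* e.g. an rx being parsed */
        kptllnd_net_decref(net);        /* rx done */
        net_shutdown(net);
        printf("net freed after last reference dropped\n");
        return 0;
}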
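
With multiple NIs multiplexed over one Portals interface, a physical peer keeps a single kptl_peer_t, which is why the patch switches the peer hash and all peer_id comparisons from the raw NID to LNET_NIDADDR(). The standalone demonstration below shows A@ptl0 and A@ptl2 landing in the same bucket; the NID macros paraphrase LNet's packing (a 32-bit network number above a 32-bit host address) and PEER_HASH_SIZE is an arbitrary stand-in for kptl_peer_hash_size.

/*
 * Demonstrates why peer hashing must ignore the network part of a NID:
 * A@ptl0 and A@ptl2 are the same physical peer and must share one
 * kptl_peer_t.  The macros paraphrase LNet's NID packing; the table
 * size is an arbitrary example value.
 */
#include <stdint.h>
#include <stdio.h>

typedef uint64_t lnet_nid_t;

#define LNET_NIDADDR(nid)       ((uint32_t)((nid) & 0xffffffff))
#define LNET_NIDNET(nid)        ((uint32_t)(((nid) >> 32) & 0xffffffff))
#define LNET_MKNID(net, addr)   ((((uint64_t)(net)) << 32) | (addr))

#define PEER_HASH_SIZE 101      /* stand-in for kptl_peer_hash_size */

/* the patch changed this hash from (unsigned int)nid to
 * LNET_NIDADDR(nid), collapsing all logical NIDs of one host */
static unsigned int nid2peerlist(lnet_nid_t nid)
{
        return (unsigned int)LNET_NIDADDR(nid) % PEER_HASH_SIZE;
}

/* mirrors the reworked helper: graft the NI's network part onto the
 * peer's (Portals) address when mapping back to a per-NI LNet NID */
static lnet_nid_t kptllnd_ptl2lnetnid(lnet_nid_t ni_nid, lnet_nid_t ptl_nid)
{
        return LNET_MKNID(LNET_NIDNET(ni_nid), LNET_NIDADDR(ptl_nid));
}

int main(void)
{
        uint32_t   host   = 42;                     /* peer A's address */
        lnet_nid_t a_ptl0 = LNET_MKNID(0, host);    /* A@ptl0 */
        lnet_nid_t a_ptl2 = LNET_MKNID(2, host);    /* A@ptl2 */

        printf("A@ptl0 -> bucket %u\n", nid2peerlist(a_ptl0));
        printf("A@ptl2 -> bucket %u\n", nid2peerlist(a_ptl2));

        /* notifying the NI on net 2 about peer A: */
        printf("peer NID on net 2: net %u, addr %u\n",
               LNET_NIDNET(kptllnd_ptl2lnetnid(a_ptl2, host)),
               LNET_NIDADDR(kptllnd_ptl2lnetnid(a_ptl2, host)));
        return 0;
}

Both printf lines report the same bucket; that is the invariant the LNET_NIDADDR() comparisons in kptllnd_id2peer_locked() and kptllnd_cull_peertable_locked() preserve.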
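
kptllnd_peer_notify() can no longer notify "the" NI: it has to fan a peer's death out to every net, and it must not hold kptl_net_rw_lock across lnet_notify(). The patch therefore snapshots the net list under the read lock, taking a reference on each entry, and notifies with the lock dropped. A condensed model of that idiom follows; a pthread rwlock and a NULL-terminated array stand in for the kernel rwlock_t and the list_head chain, and printf() stands in for lnet_notify().

/*
 * Condensed model of the snapshot idiom in kptllnd_peer_notify():
 * count nets under the read lock, re-walk taking a reference on each,
 * then notify with the lock dropped.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct kptl_net {
        atomic_int net_refcount;
        int        net_shutdown;
        int        net_id;              /* stand-in for net_ni */
} kptl_net_t;

static pthread_rwlock_t kptl_net_rw_lock = PTHREAD_RWLOCK_INITIALIZER;
static kptl_net_t *kptl_nets[8];        /* NULL-terminated "list" */

static void kptllnd_peer_notify_model(void)
{
        kptl_net_t **nets;
        int          nnets = 0;
        int          i;

        pthread_rwlock_rdlock(&kptl_net_rw_lock);
        while (kptl_nets[nnets] != NULL)
                nnets++;
        pthread_rwlock_unlock(&kptl_net_rw_lock);

        if (nnets == 0)                 /* shutdown in progress */
                return;

        nets = calloc(nnets, sizeof(*nets));
        if (nets == NULL)
                return;

        pthread_rwlock_rdlock(&kptl_net_rw_lock);
        for (i = 0; i < nnets && kptl_nets[i] != NULL; i++) {
                nets[i] = kptl_nets[i];
                atomic_fetch_add(&nets[i]->net_refcount, 1);
        }
        pthread_rwlock_unlock(&kptl_net_rw_lock);

        /* lock dropped: safe to call up into LNet for each NI */
        for (i = 0; i < nnets && nets[i] != NULL; i++) {
                if (!nets[i]->net_shutdown)
                        printf("lnet_notify() NI on net %d\n",
                               nets[i]->net_id);
                atomic_fetch_sub(&nets[i]->net_refcount, 1);
        }
        free(nets);
}

int main(void)
{
        kptl_net_t a = { .net_id = 0 };
        kptl_net_t b = { .net_id = 2 };

        atomic_init(&a.net_refcount, 1);
        atomic_init(&b.net_refcount, 1);
        kptl_nets[0] = &a;
        kptl_nets[1] = &b;

        kptllnd_peer_notify_model();
        return 0;
}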