diff --git a/lnet/klnds/ptllnd/ptllnd.c b/lnet/klnds/ptllnd/ptllnd.c
index f9361f9..717dd47 100755
--- a/lnet/klnds/ptllnd/ptllnd.c
+++ b/lnet/klnds/ptllnd/ptllnd.c
@@ -1,19 +1,41 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
- * Author: PJ Kirner
+ * GPL HEADER START
  *
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- * This file is confidential source code owned by Cluster File Systems.
- * No viewing, modification, compilation, redistribution, or any other
- * form of use is permitted except through a signed license agreement.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- * If you have not signed such an agreement, then you have no rights to
- * this file. Please destroy it immediately and contact CFS.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ * + * lnet/klnds/ptllnd/ptllnd.c + * + * Author: PJ Kirner */ #include "ptllnd.h" @@ -23,6 +45,7 @@ lnd_t kptllnd_lnd = { .lnd_startup = kptllnd_startup, .lnd_shutdown = kptllnd_shutdown, .lnd_ctl = kptllnd_ctl, + .lnd_query = kptllnd_query, .lnd_send = kptllnd_send, .lnd_recv = kptllnd_recv, .lnd_eager_recv = kptllnd_eager_recv, @@ -39,11 +62,11 @@ kptllnd_ptlid2str(ptl_process_id_t id) unsigned long flags; char *str; - spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags); + cfs_spin_lock_irqsave(&kptllnd_data.kptl_ptlid2str_lock, flags); str = strs[idx++]; if (idx >= sizeof(strs)/sizeof(strs[0])) idx = 0; - spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags); + cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_ptlid2str_lock, flags); snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid); return str; @@ -224,10 +247,16 @@ kptllnd_cksum (void *ptr, int nob) } void -kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob) +kptllnd_init_msg(kptl_msg_t *msg, int type, + lnet_process_id_t target, int body_nob) { msg->ptlm_type = type; msg->ptlm_nob = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7; + msg->ptlm_dstpid = target.pid; + msg->ptlm_dstnid = target.nid; + msg->ptlm_srcpid = the_lnet.ln_pid; + msg->ptlm_srcnid = kptllnd_ptl2lnetnid(target.nid, + kptllnd_data.kptl_portals_id.nid); LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size); } @@ -241,12 +270,9 @@ kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer) msg->ptlm_credits = peer->peer_outstanding_credits; /* msg->ptlm_nob Filled in kptllnd_init_msg() */ msg->ptlm_cksum = 0; - msg->ptlm_srcnid = kptllnd_data.kptl_ni->ni_nid; + /* msg->ptlm_{src|dst}[pn]id Filled in kptllnd_init_msg */ msg->ptlm_srcstamp = peer->peer_myincarnation; - msg->ptlm_dstnid = peer->peer_id.nid; msg->ptlm_dststamp = peer->peer_incarnation; - msg->ptlm_srcpid = the_lnet.ln_pid; - msg->ptlm_dstpid = peer->peer_id.pid; if (*kptllnd_tunables.kptl_checksum) { /* NB ptlm_cksum zero while computing cksum */ @@ -388,6 +414,7 @@ kptllnd_msg_unpack(kptl_msg_t *msg, int nob) int kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg) { + kptl_net_t *net = ni->ni_data; struct libcfs_ioctl_data *data = arg; int rc = -EINVAL; @@ -397,7 +424,7 @@ kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg) * Validate that the context block is actually * pointing to this interface */ - LASSERT (ni == kptllnd_data.kptl_ni); + LASSERT (ni == net->net_ni); switch(cmd) { case IOC_LIBCFS_DEL_PEER: { @@ -454,24 +481,176 @@ kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg) return rc; } +void +kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, cfs_time_t *when) +{ + kptl_net_t *net = ni->ni_data; + kptl_peer_t *peer = NULL; + lnet_process_id_t id = {.nid = nid, .pid = LUSTRE_SRV_LNET_PID}; + unsigned long flags; + + /* NB: kptllnd_find_target connects to peer if necessary */ + if (kptllnd_find_target(net, id, &peer) != 0) + return; + + cfs_spin_lock_irqsave(&peer->peer_lock, flags); + if (peer->peer_last_alive != 0) + *when = peer->peer_last_alive; + cfs_spin_unlock_irqrestore(&peer->peer_lock, flags); + kptllnd_peer_decref(peer); + return; +} + +void +kptllnd_base_shutdown (void) +{ + int i; + ptl_err_t prc; + unsigned long flags; + lnet_process_id_t process_id; + + cfs_read_lock(&kptllnd_data.kptl_net_rw_lock); + LASSERT (cfs_list_empty(&kptllnd_data.kptl_nets)); + cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock); + + switch (kptllnd_data.kptl_init) { + default: + LBUG(); + + case PTLLND_INIT_ALL: + case PTLLND_INIT_DATA: + /* stop receiving */ + 
kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool); + LASSERT (cfs_list_empty(&kptllnd_data.kptl_sched_rxq)); + LASSERT (cfs_list_empty(&kptllnd_data.kptl_sched_rxbq)); + + /* lock to interleave cleanly with peer birth/death */ + cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); + LASSERT (kptllnd_data.kptl_shutdown == 0); + kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */ + /* no new peers possible now */ + cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, + flags); + + /* nuke all existing peers */ + process_id.nid = LNET_NID_ANY; + process_id.pid = LNET_PID_ANY; + kptllnd_peer_del(process_id); + + cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); + + LASSERT (kptllnd_data.kptl_n_active_peers == 0); + + i = 2; + while (kptllnd_data.kptl_npeers != 0) { + i++; + CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, + "Waiting for %d peers to terminate\n", + kptllnd_data.kptl_npeers); + + cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, + flags); + + cfs_pause(cfs_time_seconds(1)); + + cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, + flags); + } + + LASSERT (cfs_list_empty(&kptllnd_data.kptl_closing_peers)); + LASSERT (cfs_list_empty(&kptllnd_data.kptl_zombie_peers)); + LASSERT (kptllnd_data.kptl_peers != NULL); + for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) + LASSERT (cfs_list_empty (&kptllnd_data.kptl_peers[i])); + + cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, + flags); + CDEBUG(D_NET, "All peers deleted\n"); + + /* Shutdown phase 2: kill the daemons... */ + kptllnd_data.kptl_shutdown = 2; + cfs_mb(); + + i = 2; + while (cfs_atomic_read (&kptllnd_data.kptl_nthreads) != 0) { + /* Wake up all threads*/ + cfs_waitq_broadcast(&kptllnd_data.kptl_sched_waitq); + cfs_waitq_broadcast(&kptllnd_data.kptl_watchdog_waitq); + + i++; + CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? 
*/ + "Waiting for %d threads to terminate\n", + cfs_atomic_read(&kptllnd_data.kptl_nthreads)); + cfs_pause(cfs_time_seconds(1)); + } + + CDEBUG(D_NET, "All Threads stopped\n"); + LASSERT(cfs_list_empty(&kptllnd_data.kptl_sched_txq)); + + kptllnd_cleanup_tx_descs(); + + /* Nothing here now, but libcfs might soon require + * us to explicitly destroy wait queues and semaphores + * that would be done here */ + + /* fall through */ + + case PTLLND_INIT_NOTHING: + CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n"); + break; + } + + if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) { + prc = PtlEQFree(kptllnd_data.kptl_eqh); + if (prc != PTL_OK) + CERROR("Error %s(%d) freeing portals EQ\n", + kptllnd_errtype2str(prc), prc); + } + + if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) { + prc = PtlNIFini(kptllnd_data.kptl_nih); + if (prc != PTL_OK) + CERROR("Error %s(%d) finalizing portals NI\n", + kptllnd_errtype2str(prc), prc); + } + + LASSERT (cfs_atomic_read(&kptllnd_data.kptl_ntx) == 0); + LASSERT (cfs_list_empty(&kptllnd_data.kptl_idle_txs)); + + if (kptllnd_data.kptl_rx_cache != NULL) + cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache); + + if (kptllnd_data.kptl_peers != NULL) + LIBCFS_FREE(kptllnd_data.kptl_peers, + sizeof (cfs_list_t) * + kptllnd_data.kptl_peer_hash_size); + + if (kptllnd_data.kptl_nak_msg != NULL) + LIBCFS_FREE(kptllnd_data.kptl_nak_msg, + offsetof(kptl_msg_t, ptlm_u)); + + memset(&kptllnd_data, 0, sizeof(kptllnd_data)); + PORTAL_MODULE_UNUSE; + return; +} + int -kptllnd_startup (lnet_ni_t *ni) +kptllnd_base_startup (void) { - int rc; int i; + int rc; int spares; struct timeval tv; + lnet_process_id_t target; ptl_err_t ptl_rc; - LASSERT (ni->ni_lnd == &kptllnd_lnd); - - if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) { - CERROR("Only 1 instance supported\n"); - return -EPERM; + if (*kptllnd_tunables.kptl_max_procs_per_node < 1) { + CERROR("max_procs_per_node must be >= 1\n"); + return -EINVAL; } - if (*kptllnd_tunables.kptl_max_procs_per_node < 1) { - CERROR("max_procs_per_node must be > 1\n"); + if (*kptllnd_tunables.kptl_peertxcredits > PTLLND_MSG_MAX_CREDITS) { + CERROR("peercredits must be <= %d\n", PTLLND_MSG_MAX_CREDITS); return -EINVAL; } @@ -482,35 +661,44 @@ kptllnd_startup (lnet_ni_t *ni) CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0); CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE); - /* - * zero pointers, flags etc - * put everything into a known state. - */ + /* Zero pointers, flags etc; put everything into a known state. 
*/ memset (&kptllnd_data, 0, sizeof (kptllnd_data)); + + LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u)); + if (kptllnd_data.kptl_nak_msg == NULL) { + CERROR("Can't allocate NAK msg\n"); + return -ENOMEM; + } + memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u)); + kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE; kptllnd_data.kptl_nih = PTL_INVALID_HANDLE; - /* - * Uptick the module reference count - */ - PORTAL_MODULE_USE; + cfs_rwlock_init(&kptllnd_data.kptl_net_rw_lock); + CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_nets); - /* - * Setup pointers between the ni and context data block - */ - kptllnd_data.kptl_ni = ni; - ni->ni_data = &kptllnd_data; + /* Setup the sched locks/lists/waitq */ + cfs_spin_lock_init(&kptllnd_data.kptl_sched_lock); + cfs_waitq_init(&kptllnd_data.kptl_sched_waitq); + CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq); + CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq); + CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq); - /* - * Setup Credits - */ - ni->ni_maxtxcredits = *kptllnd_tunables.kptl_credits; - ni->ni_peertxcredits = *kptllnd_tunables.kptl_peercredits; + /* Init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */ + cfs_spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock); + + /* Setup the tx locks/lists */ + cfs_spin_lock_init(&kptllnd_data.kptl_tx_lock); + CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs); + cfs_atomic_set(&kptllnd_data.kptl_ntx, 0); + + /* Uptick the module reference count */ + PORTAL_MODULE_USE; kptllnd_data.kptl_expected_peers = *kptllnd_tunables.kptl_max_nodes * *kptllnd_tunables.kptl_max_procs_per_node; - + /* * Initialize the Network interface instance * We use the default because we don't have any @@ -532,7 +720,8 @@ kptllnd_startup (lnet_ni_t *ni) * Which is ok. 
*/ if (ptl_rc != PTL_OK && ptl_rc != PTL_IFACE_DUP) { - CERROR ("PtlNIInit: error %d\n", ptl_rc); + CERROR ("PtlNIInit: error %s(%d)\n", + kptllnd_errtype2str(ptl_rc), ptl_rc); rc = -EINVAL; goto failed; } @@ -543,18 +732,18 @@ kptllnd_startup (lnet_ni_t *ni) kptllnd_eq_callback, /* handler callback */ &kptllnd_data.kptl_eqh); /* output handle */ if (ptl_rc != PTL_OK) { - CERROR("PtlEQAlloc failed %d\n", ptl_rc); + CERROR("PtlEQAlloc failed %s(%d)\n", + kptllnd_errtype2str(ptl_rc), ptl_rc); rc = -ENOMEM; goto failed; } - /* - * Fetch the lower NID - */ + /* Fetch the lower NID */ ptl_rc = PtlGetId(kptllnd_data.kptl_nih, &kptllnd_data.kptl_portals_id); if (ptl_rc != PTL_OK) { - CERROR ("PtlGetID: error %d\n", ptl_rc); + CERROR ("PtlGetID: error %s(%d)\n", + kptllnd_errtype2str(ptl_rc), ptl_rc); rc = -EINVAL; goto failed; } @@ -568,51 +757,34 @@ kptllnd_startup (lnet_ni_t *ni) goto failed; } - ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid); - - CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", - kptllnd_ptlid2str(kptllnd_data.kptl_portals_id), - libcfs_nid2str(ni->ni_nid)); - /* Initialized the incarnation - it must be for-all-time unique, even * accounting for the fact that we increment it when we disconnect a * peer that's using it */ - do_gettimeofday(&tv); + cfs_gettimeofday(&tv); kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation); - /* - * Setup the sched locks/lists/waitq - */ - spin_lock_init(&kptllnd_data.kptl_sched_lock); - init_waitqueue_head(&kptllnd_data.kptl_sched_waitq); - INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq); - INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq); - INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq); - - /* - * Setup the tx locks/lists - */ - spin_lock_init(&kptllnd_data.kptl_tx_lock); - INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs); - atomic_set(&kptllnd_data.kptl_ntx, 0); - - /* - * Allocate and setup the peer hash table - */ - rwlock_init(&kptllnd_data.kptl_peer_rw_lock); - init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq); - INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers); - INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers); + target.nid = LNET_NID_ANY; + target.pid = LNET_PID_ANY; /* NB target for NAK doesn't matter */ + kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, target, 0); + kptllnd_data.kptl_nak_msg->ptlm_magic = PTLLND_MSG_MAGIC; + kptllnd_data.kptl_nak_msg->ptlm_version = PTLLND_MSG_VERSION; + kptllnd_data.kptl_nak_msg->ptlm_srcpid = the_lnet.ln_pid; + kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation; - spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock); + cfs_rwlock_init(&kptllnd_data.kptl_peer_rw_lock); + cfs_waitq_init(&kptllnd_data.kptl_watchdog_waitq); + cfs_atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0); + CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers); + CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers); + /* Allocate and setup the peer hash table */ kptllnd_data.kptl_peer_hash_size = *kptllnd_tunables.kptl_peer_hash_table_size; LIBCFS_ALLOC(kptllnd_data.kptl_peers, - (kptllnd_data.kptl_peer_hash_size * - sizeof(struct list_head))); + sizeof(cfs_list_t) * + kptllnd_data.kptl_peer_hash_size); if (kptllnd_data.kptl_peers == NULL) { CERROR("Failed to allocate space for peer hash table size=%d\n", kptllnd_data.kptl_peer_hash_size); @@ -620,27 +792,11 @@ kptllnd_startup (lnet_ni_t *ni) goto failed; } for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) - 
INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]); - - LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u)); - if (kptllnd_data.kptl_nak_msg == NULL) { - CERROR("Can't allocate NAK msg\n"); - rc = -ENOMEM; - goto failed; - } - memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u)); - kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0); - kptllnd_data.kptl_nak_msg->ptlm_magic = PTLLND_MSG_MAGIC; - kptllnd_data.kptl_nak_msg->ptlm_version = PTLLND_MSG_VERSION; - kptllnd_data.kptl_nak_msg->ptlm_srcpid = the_lnet.ln_pid; - kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid; - kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation; - kptllnd_data.kptl_nak_msg->ptlm_dstpid = LNET_PID_ANY; - kptllnd_data.kptl_nak_msg->ptlm_dstnid = LNET_NID_ANY; + CFS_INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]); kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool); - kptllnd_data.kptl_rx_cache = + kptllnd_data.kptl_rx_cache = cfs_mem_cache_create("ptllnd_rx", sizeof(kptl_rx_t) + *kptllnd_tunables.kptl_max_msg_size, @@ -666,7 +822,7 @@ kptllnd_startup (lnet_ni_t *ni) /* Start the scheduler threads for handling incoming requests. No need * to advance the state because this will be automatically cleaned up - * now that PTLNAT_INIT_DATA state has been entered */ + * now that PTLLND_INIT_DATA state has been entered */ CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED); for (i = 0; i < PTLLND_N_SCHED; i++) { rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i)); @@ -704,150 +860,114 @@ kptllnd_startup (lnet_ni_t *ni) if (*kptllnd_tunables.kptl_checksum) CWARN("Checksumming enabled\n"); - - CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n"); + + CDEBUG(D_NET, "<<< kptllnd_base_startup SUCCESS\n"); return 0; failed: - CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc); - kptllnd_shutdown(ni); + CERROR("kptllnd_base_startup failed: %d\n", rc); + kptllnd_base_shutdown(); return rc; } -void -kptllnd_shutdown (lnet_ni_t *ni) +int +kptllnd_startup (lnet_ni_t *ni) { - int i; - ptl_err_t prc; - lnet_process_id_t process_id; - unsigned long flags; + int rc; + kptl_net_t *net; - CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n", - atomic_read (&libcfs_kmemory)); - - LASSERT (ni == kptllnd_data.kptl_ni); - - switch (kptllnd_data.kptl_init) { - default: - LBUG(); + LASSERT (ni->ni_lnd == &kptllnd_lnd); - case PTLLND_INIT_ALL: - case PTLLND_INIT_DATA: - /* Stop receiving */ - kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool); - LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq)); - LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq)); + if (kptllnd_data.kptl_init == PTLLND_INIT_NOTHING) { + rc = kptllnd_base_startup(); + if (rc != 0) + return rc; + } - /* Hold peertable lock to interleave cleanly with peer birth/death */ - write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); + LIBCFS_ALLOC(net, sizeof(*net)); + ni->ni_data = net; + if (net == NULL) { + CERROR("Can't allocate kptl_net_t\n"); + rc = -ENOMEM; + goto failed; + } + memset(net, 0, sizeof(*net)); + net->net_ni = ni; + + ni->ni_maxtxcredits = *kptllnd_tunables.kptl_credits; + ni->ni_peertxcredits = *kptllnd_tunables.kptl_peertxcredits; + ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits; + ni->ni_nid = kptllnd_ptl2lnetnid(ni->ni_nid, + kptllnd_data.kptl_portals_id.nid); + CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", + kptllnd_ptlid2str(kptllnd_data.kptl_portals_id), + libcfs_nid2str(ni->ni_nid)); - LASSERT (kptllnd_data.kptl_shutdown == 0); - 
kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */ + /* NB LNET_NIDNET(ptlm_srcnid) of NAK doesn't matter in case of + * multiple NIs */ + kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid; - /* no new peers possible now */ - write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, - flags); + cfs_atomic_set(&net->net_refcount, 1); + cfs_write_lock(&kptllnd_data.kptl_net_rw_lock); + cfs_list_add_tail(&net->net_list, &kptllnd_data.kptl_nets); + cfs_write_unlock(&kptllnd_data.kptl_net_rw_lock); + return 0; - /* nuke all existing peers */ - process_id.nid = LNET_NID_ANY; - process_id.pid = LNET_PID_ANY; - kptllnd_peer_del(process_id); + failed: + kptllnd_shutdown(ni); + return rc; +} - read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); +void +kptllnd_shutdown (lnet_ni_t *ni) +{ + kptl_net_t *net = ni->ni_data; + int i; + unsigned long flags; - LASSERT (kptllnd_data.kptl_n_active_peers == 0); + LASSERT (kptllnd_data.kptl_init == PTLLND_INIT_ALL); - i = 2; - while (kptllnd_data.kptl_npeers != 0) { + CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n", + cfs_atomic_read (&libcfs_kmemory)); + + if (net == NULL) + goto out; + + LASSERT (ni == net->net_ni); + LASSERT (!net->net_shutdown); + LASSERT (!cfs_list_empty(&net->net_list)); + LASSERT (cfs_atomic_read(&net->net_refcount) != 0); + ni->ni_data = NULL; + net->net_ni = NULL; + + cfs_write_lock(&kptllnd_data.kptl_net_rw_lock); + kptllnd_net_decref(net); + cfs_list_del_init(&net->net_list); + cfs_write_unlock(&kptllnd_data.kptl_net_rw_lock); + + /* Can't nuke peers here - they are shared among all NIs */ + cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); + net->net_shutdown = 1; /* Order with peer creation */ + cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); + + i = 2; + while (cfs_atomic_read(&net->net_refcount) != 0) { i++; CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, - "Waiting for %d peers to terminate\n", - kptllnd_data.kptl_npeers); - - read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, - flags); - - cfs_pause(cfs_time_seconds(1)); - - read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, - flags); - } - - LASSERT(list_empty(&kptllnd_data.kptl_closing_peers)); - LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers)); - LASSERT (kptllnd_data.kptl_peers != NULL); - for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) - LASSERT (list_empty (&kptllnd_data.kptl_peers[i])); - - read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); - CDEBUG(D_NET, "All peers deleted\n"); - - /* Shutdown phase 2: kill the daemons... */ - kptllnd_data.kptl_shutdown = 2; - mb(); - - i = 2; - while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) { - /* Wake up all threads*/ - wake_up_all(&kptllnd_data.kptl_sched_waitq); - wake_up_all(&kptllnd_data.kptl_watchdog_waitq); + "Waiting for %d references to drop\n", + cfs_atomic_read(&net->net_refcount)); - i++; - CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? 
 */
-                               "Waiting for %d threads to terminate\n",
-                               atomic_read(&kptllnd_data.kptl_nthreads));
-                        cfs_pause(cfs_time_seconds(1));
+                cfs_pause(cfs_time_seconds(1));
         }
 
-        CDEBUG(D_NET, "All Threads stopped\n");
-        LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
-
-        kptllnd_cleanup_tx_descs();
-
-        /* Nothing here now, but libcfs might soon require
-         * us to explicitly destroy wait queues and semaphores
-         * that would be done here */
-
-        /* fall through */
-
-        case PTLLND_INIT_NOTHING:
-                CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
-                break;
-        }
-
-        if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
-                prc = PtlEQFree(kptllnd_data.kptl_eqh);
-                if (prc != PTL_OK)
-                        CERROR("Error %d freeing portals EQ\n", prc);
-        }
-
-        if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
-                prc = PtlNIFini(kptllnd_data.kptl_nih);
-                if (prc != PTL_OK)
-                        CERROR("Error %d finalizing portals NI\n", prc);
-        }
-
-        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
-        LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
-
-        if (kptllnd_data.kptl_rx_cache != NULL)
-                cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
-
-        if (kptllnd_data.kptl_peers != NULL)
-                LIBCFS_FREE (kptllnd_data.kptl_peers,
-                             sizeof (struct list_head) *
-                             kptllnd_data.kptl_peer_hash_size);
-
-        if (kptllnd_data.kptl_nak_msg != NULL)
-                LIBCFS_FREE (kptllnd_data.kptl_nak_msg,
-                             offsetof(kptl_msg_t, ptlm_u));
-
-        memset(&kptllnd_data, 0, sizeof(kptllnd_data));
-
+        LIBCFS_FREE(net, sizeof(*net));
+out:
+        /* NB no locking since I don't race with writers */
+        if (cfs_list_empty(&kptllnd_data.kptl_nets))
+                kptllnd_base_shutdown();
         CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
-               atomic_read (&libcfs_kmemory));
-
-        PORTAL_MODULE_UNUSE;
+               cfs_atomic_read (&libcfs_kmemory));
+        return;
 }
 
 int __init
@@ -875,7 +995,7 @@ kptllnd_module_fini (void)
         kptllnd_tunables_fini();
 }
 
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Kernel Portals LND v1.00");
MODULE_LICENSE("GPL");
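
The notes below sketch a few idioms this patch relies on. They are standalone
userspace examples with illustrative names, not LND code.

kptllnd_init_msg() rounds the wire message size up to the next multiple of 8
with (n + 7) & ~7, so kptl_msg_t stays 8-byte aligned on the wire:

#include <assert.h>
#include <stddef.h>

/* Round n up to the next multiple of 8. Valid because 8 is a power of
 * two: adding 7 carries past the next boundary unless n is already a
 * multiple of 8, and ~7 clears the low three bits. */
static size_t round_up8(size_t n)
{
        return (n + 7) & ~(size_t)7;
}

int main(void)
{
        assert(round_up8(0)  == 0);
        assert(round_up8(1)  == 8);
        assert(round_up8(8)  == 8);
        assert(round_up8(13) == 16);
        return 0;
}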
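
The shutdown wait loops rate-limit their logging with ((i & (-i)) == i), the
"power of 2?" test flagged in the comments: the counter starts at 2 and is
incremented once per polling pass, so the message goes out at D_WARNING on
passes 4, 8, 16, ... and at the quiet D_NET level otherwise. A sketch of the
test (illustrative names):

#include <stdio.h>

/* i & -i isolates the lowest set bit of i, so the expression equals i
 * exactly when i is a power of two; the wait loops use this to space
 * their warnings exponentially instead of warning every second. */
static int warn_this_pass(unsigned int i)
{
        return (i & -i) == i;
}

int main(void)
{
        for (unsigned int i = 3; i <= 64; i++)
                if (warn_this_pass(i))
                        printf("pass %u: would log at D_WARNING\n", i);
        /* prints passes 4, 8, 16, 32 and 64 */
        return 0;
}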
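
kptllnd_base_startup() builds its incarnation stamp as tv_sec * 1000000 +
tv_usec, i.e. microseconds since the epoch, which stays unique across module
reloads as long as the clock moves forward. A userspace sketch of the same
arithmetic (the kernel code uses cfs_gettimeofday()):

#include <stdint.h>
#include <stdio.h>
#include <sys/time.h>

int main(void)
{
        struct timeval tv;
        uint64_t incarnation;

        gettimeofday(&tv, NULL);
        /* Fold the timeval into one 64-bit microsecond count; the cast
         * must happen before the multiply to avoid 32-bit overflow. */
        incarnation = (uint64_t)tv.tv_sec * 1000000 + tv.tv_usec;
        printf("incarnation=0x%llx\n", (unsigned long long)incarnation);
        return 0;
}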
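
Phase 2 of kptllnd_base_shutdown() follows a common teardown pattern: publish
the shutdown flag (with a memory barrier), broadcast on the wait queues so
every daemon wakes and notices it, then poll until the thread count drains to
zero. A userspace approximation using pthreads; shutdown_phase and nthreads
are illustrative stand-ins for kptl_shutdown and kptl_nthreads:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t lock   = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  wakeup = PTHREAD_COND_INITIALIZER;
static int shutdown_phase;              /* like kptl_shutdown */
static int nthreads;                    /* like kptl_nthreads */

static void *daemon_thread(void *arg)
{
        pthread_mutex_lock(&lock);
        while (shutdown_phase < 2)      /* sleep until told to exit */
                pthread_cond_wait(&wakeup, &lock);
        nthreads--;                     /* last act before terminating */
        pthread_mutex_unlock(&lock);
        return arg;
}

int main(void)
{
        pthread_t tid[4];
        int i, n;

        nthreads = 4;
        for (i = 0; i < 4; i++)
                pthread_create(&tid[i], NULL, daemon_thread, NULL);

        pthread_mutex_lock(&lock);
        shutdown_phase = 2;             /* the mutex doubles as the barrier */
        pthread_mutex_unlock(&lock);

        do {                            /* like the cfs_pause() poll loop */
                pthread_mutex_lock(&lock);
                pthread_cond_broadcast(&wakeup);
                n = nthreads;
                pthread_mutex_unlock(&lock);
                if (n != 0)
                        usleep(10000);
        } while (n != 0);

        for (i = 0; i < 4; i++)
                pthread_join(tid[i], NULL);
        puts("all daemons stopped");
        return 0;
}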