- multiple-instance support for kptllnd.
mxlnd - MX 1.2.1 or later,
ptllnd - Portals 3.3 / UNICOS/lc 1.5.x, 2.0.x
+Severity : enhancement
+Bugzilla : 19735
+Description: multiple-instance support for kptllnd
+
Severity : normal
Bugzilla : 20897
Description: ksocknal_close_conn_locked connection race
/*
* The Cray Portals has no maximum number of IOVs. The
- * maximum is limited only my memory and size of the
+ * maximum is limited only by memory and size of the
* int parameters (2^31-1).
* Lustre only really requires that the underlying
* implementation support at least LNET_MAX_IOV,
}
void
-kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob)
+kptllnd_init_msg(kptl_msg_t *msg, int type,
+ lnet_process_id_t target, int body_nob)
{
msg->ptlm_type = type;
msg->ptlm_nob = (offsetof(kptl_msg_t, ptlm_u) + body_nob + 7) & ~7;
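+ /* NB src/dst ids are now filled in here for every message type, so
+ * callers (e.g. kptllnd_msg_pack) no longer need to set them */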
+ msg->ptlm_dstpid = target.pid;
+ msg->ptlm_dstnid = target.nid;
+ msg->ptlm_srcpid = the_lnet.ln_pid;
+ msg->ptlm_srcnid = kptllnd_ptl2lnetnid(target.nid,
+ kptllnd_data.kptl_portals_id.nid);
LASSERT(msg->ptlm_nob <= *kptllnd_tunables.kptl_max_msg_size);
}
msg->ptlm_credits = peer->peer_outstanding_credits;
/* msg->ptlm_nob Filled in kptllnd_init_msg() */
msg->ptlm_cksum = 0;
- msg->ptlm_srcnid = kptllnd_data.kptl_ni->ni_nid;
+ /* msg->ptlm_{src|dst}[pn]id Filled in kptllnd_init_msg */
msg->ptlm_srcstamp = peer->peer_myincarnation;
- msg->ptlm_dstnid = peer->peer_id.nid;
msg->ptlm_dststamp = peer->peer_incarnation;
- msg->ptlm_srcpid = the_lnet.ln_pid;
- msg->ptlm_dstpid = peer->peer_id.pid;
if (*kptllnd_tunables.kptl_checksum) {
/* NB ptlm_cksum zero while computing cksum */
int
kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
{
+ kptl_net_t *net = ni->ni_data;
struct libcfs_ioctl_data *data = arg;
int rc = -EINVAL;
* Validate that the context block is actually
* pointing to this interface
*/
- LASSERT (ni == kptllnd_data.kptl_ni);
+ LASSERT (ni == net->net_ni);
switch(cmd) {
case IOC_LIBCFS_DEL_PEER: {
void
kptllnd_query (lnet_ni_t *ni, lnet_nid_t nid, time_t *when)
{
+ kptl_net_t *net = ni->ni_data;
kptl_peer_t *peer = NULL;
lnet_process_id_t id = {.nid = nid, .pid = LUSTRE_SRV_LNET_PID};
unsigned long flags;
/* NB: kptllnd_find_target connects to peer if necessary */
- if (kptllnd_find_target(&peer, id) != 0)
+ if (kptllnd_find_target(net, id, &peer) != 0)
return;
spin_lock_irqsave(&peer->peer_lock, flags);
return;
}
+void
+kptllnd_base_shutdown (void)
+{
+ int i;
+ ptl_err_t prc;
+ unsigned long flags;
+ lnet_process_id_t process_id;
+
+ read_lock(&kptllnd_data.kptl_net_rw_lock);
+ LASSERT (list_empty(&kptllnd_data.kptl_nets));
+ read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+ switch (kptllnd_data.kptl_init) {
+ default:
+ LBUG();
+
+ case PTLLND_INIT_ALL:
+ case PTLLND_INIT_DATA:
+ /* stop receiving */
+ kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
+ LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
+ LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
+
+ /* lock to interleave cleanly with peer birth/death */
+ write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+ LASSERT (kptllnd_data.kptl_shutdown == 0);
+ kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
+ /* no new peers possible now */
+ write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+ /* nuke all existing peers */
+ process_id.nid = LNET_NID_ANY;
+ process_id.pid = LNET_PID_ANY;
+ kptllnd_peer_del(process_id);
+
+ read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+ LASSERT (kptllnd_data.kptl_n_active_peers == 0);
+
+ i = 2;
+ while (kptllnd_data.kptl_npeers != 0) {
+ i++;
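+ /* log at D_WARNING only when i is a power of 2, else D_NET */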
+ CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
+ "Waiting for %d peers to terminate\n",
+ kptllnd_data.kptl_npeers);
+
+ read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
+ flags);
+
+ cfs_pause(cfs_time_seconds(1));
+
+ read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock,
+ flags);
+ }
+
+ LASSERT (list_empty(&kptllnd_data.kptl_closing_peers));
+ LASSERT (list_empty(&kptllnd_data.kptl_zombie_peers));
+ LASSERT (kptllnd_data.kptl_peers != NULL);
+ for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
+ LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
+
+ read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+ CDEBUG(D_NET, "All peers deleted\n");
+
+ /* Shutdown phase 2: kill the daemons... */
+ kptllnd_data.kptl_shutdown = 2;
+ mb();
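+ /* make the shutdown flag visible before waking the threads below */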
+
+ i = 2;
+ while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
+ /* Wake up all threads */
+ wake_up_all(&kptllnd_data.kptl_sched_waitq);
+ wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
+
+ i++;
+ CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
+ "Waiting for %d threads to terminate\n",
+ atomic_read(&kptllnd_data.kptl_nthreads));
+ cfs_pause(cfs_time_seconds(1));
+ }
+
+ CDEBUG(D_NET, "All Threads stopped\n");
+ LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
+
+ kptllnd_cleanup_tx_descs();
+
+ /* Nothing here now, but libcfs might soon require
+ * us to explicitly destroy wait queues and semaphores;
+ * that would be done here */
+
+ /* fall through */
+
+ case PTLLND_INIT_NOTHING:
+ CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
+ break;
+ }
+
+ if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
+ prc = PtlEQFree(kptllnd_data.kptl_eqh);
+ if (prc != PTL_OK)
+ CERROR("Error %s(%d) freeing portals EQ\n",
+ kptllnd_errtype2str(prc), prc);
+ }
+
+ if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
+ prc = PtlNIFini(kptllnd_data.kptl_nih);
+ if (prc != PTL_OK)
+ CERROR("Error %s(%d) finalizing portals NI\n",
+ kptllnd_errtype2str(prc), prc);
+ }
+
+ LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
+ LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
+
+ if (kptllnd_data.kptl_rx_cache != NULL)
+ cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
+
+ if (kptllnd_data.kptl_peers != NULL)
+ LIBCFS_FREE(kptllnd_data.kptl_peers,
+ sizeof (struct list_head) *
+ kptllnd_data.kptl_peer_hash_size);
+
+ if (kptllnd_data.kptl_nak_msg != NULL)
+ LIBCFS_FREE(kptllnd_data.kptl_nak_msg,
+ offsetof(kptl_msg_t, ptlm_u));
+
+ memset(&kptllnd_data, 0, sizeof(kptllnd_data));
+ PORTAL_MODULE_UNUSE;
+ return;
+}
+
int
-kptllnd_startup (lnet_ni_t *ni)
+kptllnd_base_startup (void)
{
- int rc;
int i;
+ int rc;
int spares;
struct timeval tv;
+ lnet_process_id_t target;
ptl_err_t ptl_rc;
- LASSERT (ni->ni_lnd == &kptllnd_lnd);
-
- if (kptllnd_data.kptl_init != PTLLND_INIT_NOTHING) {
- CERROR("Only 1 instance supported\n");
- return -EPERM;
- }
-
if (*kptllnd_tunables.kptl_max_procs_per_node < 1) {
CERROR("max_procs_per_node must be >= 1\n");
return -EINVAL;
CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
- /*
- * zero pointers, flags etc
- * put everything into a known state.
- */
+ /* Zero pointers, flags etc; put everything into a known state. */
memset (&kptllnd_data, 0, sizeof (kptllnd_data));
+
+ LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
+ if (kptllnd_data.kptl_nak_msg == NULL) {
+ CERROR("Can't allocate NAK msg\n");
+ return -ENOMEM;
+ }
+ memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
+
kptllnd_data.kptl_eqh = PTL_INVALID_HANDLE;
kptllnd_data.kptl_nih = PTL_INVALID_HANDLE;
- /*
- * Setup the sched locks/lists/waitq
- */
+ rwlock_init(&kptllnd_data.kptl_net_rw_lock);
+ INIT_LIST_HEAD(&kptllnd_data.kptl_nets);
+
+ /* Setup the sched locks/lists/waitq */
spin_lock_init(&kptllnd_data.kptl_sched_lock);
init_waitqueue_head(&kptllnd_data.kptl_sched_waitq);
INIT_LIST_HEAD(&kptllnd_data.kptl_sched_txq);
INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxq);
INIT_LIST_HEAD(&kptllnd_data.kptl_sched_rxbq);
- /* init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
+ /* Init kptl_ptlid2str_lock before any call to kptllnd_ptlid2str */
spin_lock_init(&kptllnd_data.kptl_ptlid2str_lock);
- /*
- * Setup the tx locks/lists
- */
+ /* Setup the tx locks/lists */
spin_lock_init(&kptllnd_data.kptl_tx_lock);
INIT_LIST_HEAD(&kptllnd_data.kptl_idle_txs);
atomic_set(&kptllnd_data.kptl_ntx, 0);
- /*
- * Uptick the module reference count
- */
+ /* Uptick the module reference count */
PORTAL_MODULE_USE;
- /*
- * Setup pointers between the ni and context data block
- */
- kptllnd_data.kptl_ni = ni;
- ni->ni_data = &kptllnd_data;
-
- /*
- * Setup Credits
- */
- ni->ni_maxtxcredits = *kptllnd_tunables.kptl_credits;
- ni->ni_peertxcredits = *kptllnd_tunables.kptl_peertxcredits;
- ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits;
-
kptllnd_data.kptl_expected_peers =
*kptllnd_tunables.kptl_max_nodes *
*kptllnd_tunables.kptl_max_procs_per_node;
goto failed;
}
- /*
- * Fetch the lower NID
- */
+ /* Fetch the lower NID */
ptl_rc = PtlGetId(kptllnd_data.kptl_nih,
&kptllnd_data.kptl_portals_id);
if (ptl_rc != PTL_OK) {
goto failed;
}
- ni->ni_nid = kptllnd_ptl2lnetnid(kptllnd_data.kptl_portals_id.nid);
-
- CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n",
- kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
- libcfs_nid2str(ni->ni_nid));
-
/* Initialise the incarnation - it must be for-all-time unique, even
* accounting for the fact that we increment it when we disconnect a
* peer that's using it */
tv.tv_usec;
CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);
- /*
- * Allocate and setup the peer hash table
- */
+ target.nid = LNET_NID_ANY;
+ target.pid = LNET_PID_ANY; /* NB target for NAK doesn't matter */
+ kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, target, 0);
+ kptllnd_data.kptl_nak_msg->ptlm_magic = PTLLND_MSG_MAGIC;
+ kptllnd_data.kptl_nak_msg->ptlm_version = PTLLND_MSG_VERSION;
+ kptllnd_data.kptl_nak_msg->ptlm_srcpid = the_lnet.ln_pid;
+ kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
+
rwlock_init(&kptllnd_data.kptl_peer_rw_lock);
init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq);
atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0);
INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);
INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);
+ /* Allocate and setup the peer hash table */
kptllnd_data.kptl_peer_hash_size =
*kptllnd_tunables.kptl_peer_hash_table_size;
LIBCFS_ALLOC(kptllnd_data.kptl_peers,
- (kptllnd_data.kptl_peer_hash_size *
- sizeof(struct list_head)));
+ sizeof(struct list_head) *
+ kptllnd_data.kptl_peer_hash_size);
if (kptllnd_data.kptl_peers == NULL) {
CERROR("Failed to allocate space for peer hash table size=%d\n",
kptllnd_data.kptl_peer_hash_size);
for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);
- LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));
- if (kptllnd_data.kptl_nak_msg == NULL) {
- CERROR("Can't allocate NAK msg\n");
- rc = -ENOMEM;
- goto failed;
- }
- memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));
- kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0);
- kptllnd_data.kptl_nak_msg->ptlm_magic = PTLLND_MSG_MAGIC;
- kptllnd_data.kptl_nak_msg->ptlm_version = PTLLND_MSG_VERSION;
- kptllnd_data.kptl_nak_msg->ptlm_srcpid = the_lnet.ln_pid;
- kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid;
- kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;
- kptllnd_data.kptl_nak_msg->ptlm_dstpid = LNET_PID_ANY;
- kptllnd_data.kptl_nak_msg->ptlm_dstnid = LNET_NID_ANY;
-
kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);
kptllnd_data.kptl_rx_cache =
/* Start the scheduler threads for handling incoming requests. No need
* to advance the state because this will be automatically cleaned up
- * now that PTLNAT_INIT_DATA state has been entered */
+ * now that PTLLND_INIT_DATA state has been entered */
CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);
for (i = 0; i < PTLLND_N_SCHED; i++) {
rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));
if (*kptllnd_tunables.kptl_checksum)
CWARN("Checksumming enabled\n");
- CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n");
+ CDEBUG(D_NET, "<<< kptllnd_base_startup SUCCESS\n");
+ return 0;
+
+ failed:
+ CERROR("kptllnd_base_startup failed: %d\n", rc);
+ kptllnd_base_shutdown();
+ return rc;
+}
+
+int
+kptllnd_startup (lnet_ni_t *ni)
+{
+ int rc;
+ kptl_net_t *net;
+
+ LASSERT (ni->ni_lnd == &kptllnd_lnd);
+
+ if (kptllnd_data.kptl_init == PTLLND_INIT_NOTHING) {
+ rc = kptllnd_base_startup();
+ if (rc != 0)
+ return rc;
+ }
+
+ LIBCFS_ALLOC(net, sizeof(*net));
+ ni->ni_data = net;
+ if (net == NULL) {
+ CERROR("Can't allocate kptl_net_t\n");
+ rc = -ENOMEM;
+ goto failed;
+ }
+ memset(net, 0, sizeof(*net));
+ net->net_ni = ni;
+
+ ni->ni_maxtxcredits = *kptllnd_tunables.kptl_credits;
+ ni->ni_peertxcredits = *kptllnd_tunables.kptl_peertxcredits;
+ ni->ni_peerrtrcredits = *kptllnd_tunables.kptl_peerrtrcredits;
+ ni->ni_nid = kptllnd_ptl2lnetnid(ni->ni_nid,
+ kptllnd_data.kptl_portals_id.nid);
+ CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n",
+ kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),
+ libcfs_nid2str(ni->ni_nid));
+
+ /* NB LNET_NIDNET(ptlm_srcnid) of NAK doesn't matter in case of
+ * multiple NIs */
+ kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid;
+
+ atomic_set(&net->net_refcount, 1);
+ write_lock(&kptllnd_data.kptl_net_rw_lock);
+ list_add_tail(&net->net_list, &kptllnd_data.kptl_nets);
+ write_unlock(&kptllnd_data.kptl_net_rw_lock);
return 0;
failed:
- CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc);
kptllnd_shutdown(ni);
return rc;
}
void
kptllnd_shutdown (lnet_ni_t *ni)
{
+ kptl_net_t *net = ni->ni_data;
int i;
- ptl_err_t prc;
- lnet_process_id_t process_id;
unsigned long flags;
+ LASSERT (kptllnd_data.kptl_init == PTLLND_INIT_ALL);
+
CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",
atomic_read (&libcfs_kmemory));
- LASSERT (ni == kptllnd_data.kptl_ni);
+ if (net == NULL)
+ goto out;
- switch (kptllnd_data.kptl_init) {
- default:
- LBUG();
+ LASSERT (ni == net->net_ni);
+ LASSERT (!net->net_shutdown);
+ LASSERT (!list_empty(&net->net_list));
+ LASSERT (atomic_read(&net->net_refcount) != 0);
+ ni->ni_data = NULL;
+ net->net_ni = NULL;
- case PTLLND_INIT_ALL:
- case PTLLND_INIT_DATA:
- /* Stop receiving */
- kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);
- LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));
- LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));
+ write_lock(&kptllnd_data.kptl_net_rw_lock);
+ kptllnd_net_decref(net);
+ list_del_init(&net->net_list);
+ write_unlock(&kptllnd_data.kptl_net_rw_lock);
- /* Hold peertable lock to interleave cleanly with peer birth/death */
+ /* Can't nuke peers here - they are shared among all NIs */
write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
-
- LASSERT (kptllnd_data.kptl_shutdown == 0);
- kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */
-
- /* no new peers possible now */
- write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
- flags);
-
- /* nuke all existing peers */
- process_id.nid = LNET_NID_ANY;
- process_id.pid = LNET_PID_ANY;
- kptllnd_peer_del(process_id);
-
- read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
-
- LASSERT (kptllnd_data.kptl_n_active_peers == 0);
+ net->net_shutdown = 1; /* Order with peer creation */
+ write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
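+ /* wait for everyone else (e.g. kptllnd_find_net callers) to drop their
+ * references on this net; the startup reference was dropped above */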
i = 2;
- while (kptllnd_data.kptl_npeers != 0) {
+ while (atomic_read(&net->net_refcount) != 0) {
i++;
CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
- "Waiting for %d peers to terminate\n",
- kptllnd_data.kptl_npeers);
-
- read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
- flags);
+ "Waiting for %d references to drop\n",
+ atomic_read(&net->net_refcount));
cfs_pause(cfs_time_seconds(1));
-
- read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock,
- flags);
}
- LASSERT(list_empty(&kptllnd_data.kptl_closing_peers));
- LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers));
- LASSERT (kptllnd_data.kptl_peers != NULL);
- for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)
- LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));
-
- read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
- CDEBUG(D_NET, "All peers deleted\n");
-
- /* Shutdown phase 2: kill the daemons... */
- kptllnd_data.kptl_shutdown = 2;
- mb();
-
- i = 2;
- while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {
- /* Wake up all threads*/
- wake_up_all(&kptllnd_data.kptl_sched_waitq);
- wake_up_all(&kptllnd_data.kptl_watchdog_waitq);
-
- i++;
- CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
- "Waiting for %d threads to terminate\n",
- atomic_read(&kptllnd_data.kptl_nthreads));
- cfs_pause(cfs_time_seconds(1));
- }
-
- CDEBUG(D_NET, "All Threads stopped\n");
- LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));
-
- kptllnd_cleanup_tx_descs();
-
- /* Nothing here now, but libcfs might soon require
- * us to explicitly destroy wait queues and semaphores
- * that would be done here */
-
- /* fall through */
-
- case PTLLND_INIT_NOTHING:
- CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");
- break;
- }
-
- if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {
- prc = PtlEQFree(kptllnd_data.kptl_eqh);
- if (prc != PTL_OK)
- CERROR("Error %s(%d) freeing portals EQ\n",
- kptllnd_errtype2str(prc), prc);
- }
-
- if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {
- prc = PtlNIFini(kptllnd_data.kptl_nih);
- if (prc != PTL_OK)
- CERROR("Error %s(%d) finalizing portals NI\n",
- kptllnd_errtype2str(prc), prc);
- }
-
- LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
- LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));
-
- if (kptllnd_data.kptl_rx_cache != NULL)
- cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);
-
- if (kptllnd_data.kptl_peers != NULL)
- LIBCFS_FREE (kptllnd_data.kptl_peers,
- sizeof (struct list_head) *
- kptllnd_data.kptl_peer_hash_size);
-
- if (kptllnd_data.kptl_nak_msg != NULL)
- LIBCFS_FREE (kptllnd_data.kptl_nak_msg,
- offsetof(kptl_msg_t, ptlm_u));
-
- memset(&kptllnd_data, 0, sizeof(kptllnd_data));
-
+ LIBCFS_FREE(net, sizeof(*net));
+out:
+ /* NB no locking since I don't race with writers */
+ if (list_empty(&kptllnd_data.kptl_nets))
+ kptllnd_base_shutdown();
CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",
atomic_read (&libcfs_kmemory));
-
- PORTAL_MODULE_UNUSE;
+ return;
}
int __init
/***********************************************************************/
typedef struct kptl_data kptl_data_t;
+typedef struct kptl_net kptl_net_t;
typedef struct kptl_rx_buffer kptl_rx_buffer_t;
typedef struct kptl_peer kptl_peer_t;
struct kptl_peer
{
struct list_head peer_list;
- atomic_t peer_refcount; /* The current refrences */
+ atomic_t peer_refcount; /* The current references */
enum kptllnd_peer_state peer_state;
spinlock_t peer_lock; /* serialize */
struct list_head peer_noops; /* PTLLND_MSG_TYPE_NOOP txs */
int kptl_init; /* initialisation state */
volatile int kptl_shutdown; /* shut down? */
atomic_t kptl_nthreads; /* # live threads */
- lnet_ni_t *kptl_ni; /* _the_ LND instance */
ptl_handle_ni_t kptl_nih; /* network interface handle */
ptl_process_id_t kptl_portals_id; /* Portals ID of interface */
__u64 kptl_incarnation; /* which one am I */
ptl_handle_eq_t kptl_eqh; /* Event Queue (EQ) */
+ rwlock_t kptl_net_rw_lock; /* serialise... */
+ struct list_head kptl_nets; /* kptl_net instances */
+
spinlock_t kptl_sched_lock; /* serialise... */
wait_queue_head_t kptl_sched_waitq; /* schedulers sleep here */
struct list_head kptl_sched_txq; /* tx requiring attention */
spinlock_t kptl_ptlid2str_lock; /* serialise str ops */
};
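+/* Per-NI state. net_refcount starts at 1 in kptllnd_startup(); users such as
+ * kptllnd_find_net() take extra references, and kptllnd_shutdown() drops the
+ * startup reference then waits for the count to reach zero before freeing */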
+struct kptl_net
+{
+ struct list_head net_list; /* chain on kptl_data::kptl_nets */
+ lnet_ni_t *net_ni;
+ atomic_t net_refcount; /* # current references */
+ int net_shutdown; /* lnd_shutdown called */
+};
+
enum
{
PTLLND_INIT_NOTHING = 0,
extern kptl_data_t kptllnd_data;
static inline lnet_nid_t
-kptllnd_ptl2lnetnid(ptl_nid_t ptl_nid)
+kptllnd_ptl2lnetnid(lnet_nid_t ni_nid, ptl_nid_t ptl_nid)
{
#ifdef _USING_LUSTRE_PORTALS_
- return LNET_MKNID(LNET_NIDNET(kptllnd_data.kptl_ni->ni_nid),
- LNET_NIDADDR(ptl_nid));
+ return LNET_MKNID(LNET_NIDNET(ni_nid), LNET_NIDADDR(ptl_nid));
#else
- return LNET_MKNID(LNET_NIDNET(kptllnd_data.kptl_ni->ni_nid),
- ptl_nid);
+ return LNET_MKNID(LNET_NIDNET(ni_nid), ptl_nid);
#endif
}
void kptllnd_peer_check_sends(kptl_peer_t *peer);
void kptllnd_peer_check_bucket(int idx, int stamp);
void kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag);
-int kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target);
-kptl_peer_t *kptllnd_peer_handle_hello(ptl_process_id_t initiator,
+int kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
+ kptl_peer_t **peerp);
+kptl_peer_t *kptllnd_peer_handle_hello(kptl_net_t *net,
+ ptl_process_id_t initiator,
kptl_msg_t *msg);
kptl_peer_t *kptllnd_id2peer_locked(lnet_process_id_t id);
void kptllnd_peer_alive(kptl_peer_t *peer);
}
static inline void
+kptllnd_net_addref (kptl_net_t *net)
+{
+ LASSERT (atomic_read(&net->net_refcount) > 0);
+ atomic_inc(&net->net_refcount);
+}
+
+static inline void
+kptllnd_net_decref (kptl_net_t *net)
+{
+ LASSERT (atomic_read(&net->net_refcount) > 0);
+ atomic_dec(&net->net_refcount);
+}
+
+static inline void
kptllnd_set_tx_peer(kptl_tx_t *tx, kptl_peer_t *peer)
{
LASSERT (tx->tx_peer == NULL);
static inline struct list_head *
kptllnd_nid2peerlist(lnet_nid_t nid)
{
- unsigned int hash = ((unsigned int)nid) %
+ /* Only one copy of peer state for all logical peers, so the net part
+ * of NIDs is ignored; e.g. A@ptl0 and A@ptl2 share peer state */
+ unsigned int hash = ((unsigned int)LNET_NIDADDR(nid)) %
kptllnd_data.kptl_peer_hash_size;
return &kptllnd_data.kptl_peers[hash];
void kptllnd_cleanup_tx_descs(void);
void kptllnd_tx_fini(kptl_tx_t *tx);
void kptllnd_cancel_txlist(struct list_head *peerq, struct list_head *txs);
-void kptllnd_restart_txs(lnet_process_id_t target, struct list_head *restarts);
+void kptllnd_restart_txs(kptl_net_t *net, lnet_process_id_t target,
+ struct list_head *restarts);
kptl_tx_t *kptllnd_get_idle_tx(enum kptl_tx_type purpose);
void kptllnd_tx_callback(ptl_event_t *ev);
const char *kptllnd_tx_typestr(int type);
/*
* MESSAGE SUPPORT FUNCTIONS
*/
-void kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob);
+void kptllnd_init_msg(kptl_msg_t *msg, int type,
+ lnet_process_id_t target, int body_nob);
void kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer);
int kptllnd_msg_unpack(kptl_msg_t *msg, int nob);
lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
unsigned int payload_offset = lntmsg->msg_offset;
unsigned int payload_nob = lntmsg->msg_len;
+ kptl_net_t *net = ni->ni_data;
kptl_peer_t *peer;
kptl_tx_t *tx;
int nob;
int nfrag;
int rc;
+ LASSERT (net->net_ni == ni);
+ LASSERT (!net->net_shutdown);
LASSERT (payload_nob == 0 || payload_niov > 0);
LASSERT (payload_niov <= LNET_MAX_IOV);
LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
LASSERT (!in_interrupt());
- rc = kptllnd_find_target(&peer, target);
+ rc = kptllnd_find_target(net, target, &peer);
if (rc != 0)
return rc;
+ /* NB peer->peer_id does NOT always equal target, be careful with
+ * which one to use */
switch (type) {
default:
LBUG();
tx->tx_lnet_msg = lntmsg;
tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
- sizeof(kptl_rdma_msg_t));
+ target, sizeof(kptl_rdma_msg_t));
CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
libcfs_id2str(target),
goto out;
}
- tx->tx_lnet_replymsg =
- lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg);
+ tx->tx_lnet_replymsg = lnet_create_reply_msg(ni, lntmsg);
if (tx->tx_lnet_replymsg == NULL) {
CERROR("Failed to allocate LNET reply for %s\n",
libcfs_id2str(target));
tx->tx_lnet_msg = lntmsg;
tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
- sizeof(kptl_rdma_msg_t));
+ target, sizeof(kptl_rdma_msg_t));
CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
libcfs_id2str(target),
}
nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
- kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);
+ kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob);
CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
libcfs_id2str(target),
read_lock_irqsave(g_lock, flags);
for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
-
list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
peer = list_entry(ptmp, kptl_peer_t, peer_list);
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
- LASSERT (!kptllnd_data.kptl_shutdown);
LASSERT (kptllnd_data.kptl_n_active_peers <
kptllnd_data.kptl_expected_peers);
* in MRU order */
peer = list_entry(tmp, kptl_peer_t, peer_list);
- if (peer->peer_id.nid != pid.nid)
+ if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
continue;
LASSERT (peer->peer_id.pid != pid.pid);
}
kptl_peer_t *
-kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
+kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
{
unsigned long flags;
kptl_peer_t *peer;
/* Only increase # peers under lock, to guarantee we dont grow it
* during shutdown */
- if (kptllnd_data.kptl_shutdown) {
- write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
- flags);
+ if (net->net_shutdown) {
+ write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
LIBCFS_FREE(peer, sizeof(*peer));
return NULL;
}
kptllnd_data.kptl_npeers++;
write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
-
return peer;
}
kptllnd_peer_notify (kptl_peer_t *peer)
{
unsigned long flags;
- time_t last_alive = 0;
+ kptl_net_t *net;
+ kptl_net_t **nets;
+ int i = 0;
+ int nnets = 0;
int error = 0;
+ time_t last_alive = 0;
spin_lock_irqsave(&peer->peer_lock, flags);
spin_unlock_irqrestore(&peer->peer_lock, flags);
- if (error != 0)
- lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
- last_alive);
+ if (error == 0)
+ return;
+
+ read_lock(&kptllnd_data.kptl_net_rw_lock);
+ list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
+ nnets++;
+ read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+ if (nnets == 0) /* shutdown in progress */
+ return;
+
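+ /* take a ref'd snapshot of the net list so lnet_notify() can be
+ * called below without holding kptl_net_rw_lock */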
+ LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
+ if (nets == NULL) {
+ CERROR("Failed to allocate nets[%d]\n", nnets);
+ return;
+ }
+ memset(nets, 0, nnets * sizeof(*nets));
+
+ read_lock(&kptllnd_data.kptl_net_rw_lock);
+ i = 0;
+ list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
+ LASSERT (i < nnets);
+ nets[i] = net;
+ kptllnd_net_addref(net);
+ i++;
+ }
+ read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+ for (i = 0; i < nnets; i++) {
+ lnet_nid_t peer_nid;
+
+ net = nets[i];
+ if (net == NULL)
+ break;
+
+ if (!net->net_shutdown) {
+ peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
+ peer->peer_ptlid.nid);
+ lnet_notify(net->net_ni, peer_nid, 0, last_alive);
+ }
+
+ kptllnd_net_decref(net);
+ }
+
+ LIBCFS_FREE(nets, nnets * sizeof(*nets));
}
void
peer = list_entry (ptmp, kptl_peer_t, peer_list);
if (!(id.nid == LNET_NID_ANY ||
- (peer->peer_id.nid == id.nid &&
+ (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
(id.pid == LNET_PID_ANY ||
peer->peer_id.pid == id.pid))))
continue;
/* NB "restarts" comes from peer_sendq of a single peer */
void
-kptllnd_restart_txs (lnet_process_id_t target, struct list_head *restarts)
+kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target,
+ struct list_head *restarts)
{
kptl_tx_t *tx;
kptl_tx_t *tmp;
LASSERT (!list_empty(restarts));
- if (kptllnd_find_target(&peer, target) != 0)
+ if (kptllnd_find_target(net, target, &peer) != 0)
peer = NULL;
list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
CERROR("Can't return credits to %s: can't allocate descriptor\n",
libcfs_id2str(peer->peer_id));
} else {
- kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
+ kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
+ peer->peer_id, 0);
kptllnd_post_tx(peer, tx, 0);
}
kptllnd_peer_check_bucket (int idx, int stamp)
{
struct list_head *peers = &kptllnd_data.kptl_peers[idx];
- struct list_head *ptmp;
kptl_peer_t *peer;
- kptl_tx_t *tx;
unsigned long flags;
- int nsend;
- int nactive;
- int check_sends;
CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
/* NB. Shared lock while I just look */
read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
- list_for_each (ptmp, peers) {
- peer = list_entry (ptmp, kptl_peer_t, peer_list);
+ list_for_each_entry (peer, peers, peer_list) {
+ kptl_tx_t *tx;
+ int check_sends;
+ int c = -1, oc = -1, sc = -1;
+ int nsend = -1, nactive = -1;
+ int sent_hello = -1, state = -1;
CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
libcfs_id2str(peer->peer_id), peer->peer_credits,
tx = kptllnd_find_timed_out_tx(peer);
check_sends = peer->peer_retry_noop;
+ if (tx != NULL) {
+ c = peer->peer_credits;
+ sc = peer->peer_sent_credits;
+ oc = peer->peer_outstanding_credits;
+ state = peer->peer_state;
+ sent_hello = peer->peer_sent_hello;
+ nsend = kptllnd_count_queue(&peer->peer_sendq);
+ nactive = kptllnd_count_queue(&peer->peer_activeq);
+ }
+
spin_unlock(&peer->peer_lock);
if (tx == NULL && !check_sends)
goto again;
}
- spin_lock_irqsave(&peer->peer_lock, flags);
- nsend = kptllnd_count_queue(&peer->peer_sendq);
- nactive = kptllnd_count_queue(&peer->peer_activeq);
- spin_unlock_irqrestore(&peer->peer_lock, flags);
-
LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
libcfs_id2str(peer->peer_id),
(tx->tx_tposted == 0) ?
libcfs_id2str(peer->peer_id),
*kptllnd_tunables.kptl_timeout,
cfs_duration_sec(jiffies - tx->tx_tposted));
+ } else if (state < PEER_STATE_ACTIVE) {
+ CERROR("Could not connect %s (%d) after %ds; "
+ "peer might be down\n",
+ libcfs_id2str(peer->peer_id), state,
+ *kptllnd_tunables.kptl_timeout);
} else {
CERROR("Could not get credits for %s after %ds; "
"possible Lustre networking issues\n",
}
CERROR("%s timed out: cred %d outstanding %d, sent %d, "
- "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
- "%sposted %lu T/O %ds\n",
- libcfs_id2str(peer->peer_id), peer->peer_credits,
- peer->peer_outstanding_credits, peer->peer_sent_credits,
- nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
+ "state %d, sent_hello %d, sendq %d, activeq %d "
+ "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
+ libcfs_id2str(peer->peer_id), c, oc, sc,
+ state, sent_hello, nsend, nactive,
+ tx, kptllnd_tx_typestr(tx->tx_type),
+ kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
tx->tx_active ? "A" : "",
PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
"" : "M",
kptl_peer_t *peer;
list_for_each (tmp, peers) {
-
peer = list_entry (tmp, kptl_peer_t, peer_list);
LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
peer->peer_state == PEER_STATE_ACTIVE);
-
- if (peer->peer_id.nid != id.nid ||
- peer->peer_id.pid != id.pid)
+
+ /* NB logical LNet peers share one kptl_peer_t */
+ if (peer->peer_id.pid != id.pid ||
+ LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
continue;
kptllnd_peer_addref(peer);
CDEBUG(D_NET, "%s -> %s (%d)\n",
- libcfs_id2str(id),
+ libcfs_id2str(id),
kptllnd_ptlid2str(peer->peer_ptlid),
atomic_read (&peer->peer_refcount));
return peer;
list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
peer = list_entry (tmp, kptl_peer_t, peer_list);
- if (peer->peer_id.nid == lpid.nid &&
+ if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
peer->peer_id.pid == lpid.pid)
return peer->peer_last_matchbits_seen;
}
-
+
list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
peer = list_entry (tmp, kptl_peer_t, peer_list);
- if (peer->peer_id.nid == lpid.nid &&
+ if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
peer->peer_id.pid == lpid.pid)
return peer->peer_last_matchbits_seen;
}
-
+
return PTL_RESERVED_MATCHBITS;
}
kptl_peer_t *
-kptllnd_peer_handle_hello (ptl_process_id_t initiator,
- kptl_msg_t *msg)
+kptllnd_peer_handle_hello (kptl_net_t *net,
+ ptl_process_id_t initiator, kptl_msg_t *msg)
{
rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
kptl_peer_t *peer;
msg->ptlm_srcstamp, peer->peer_incarnation);
kptllnd_peer_decref(peer);
+ peer = NULL;
}
hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
}
kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
- sizeof(kptl_hello_msg_t));
+ lpid, sizeof(kptl_hello_msg_t));
- new_peer = kptllnd_peer_allocate(lpid, initiator);
+ new_peer = kptllnd_peer_allocate(net, lpid, initiator);
if (new_peer == NULL) {
kptllnd_tx_decref(hello_tx);
return NULL;
write_lock_irqsave(g_lock, flags);
again:
- if (kptllnd_data.kptl_shutdown) {
+ if (net->net_shutdown) {
write_unlock_irqrestore(g_lock, flags);
CERROR ("Shutdown started, refusing connection from %s\n",
new_peer->peer_last_matchbits_seen = last_matchbits_seen;
new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
+ LASSERT (!net->net_shutdown);
kptllnd_peer_add_peertable_locked(new_peer);
write_unlock_irqrestore(g_lock, flags);
}
int
-kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
+kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
+ kptl_peer_t **peerp)
{
rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock;
ptl_process_id_t ptl_id;
return -EHOSTUNREACH;
}
- /* The new peer is a kernel ptllnd, and kernel ptllnds all have
- * the same portals PID */
+ /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
+ * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
hello_tx->tx_acked = 1;
kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
- sizeof(kptl_hello_msg_t));
+ target, sizeof(kptl_hello_msg_t));
- new_peer = kptllnd_peer_allocate(target, ptl_id);
+ new_peer = kptllnd_peer_allocate(net, target, ptl_id);
if (new_peer == NULL) {
rc = -ENOMEM;
goto unwind_0;
write_lock_irqsave(g_lock, flags);
again:
- if (kptllnd_data.kptl_shutdown) {
- write_unlock_irqrestore(g_lock, flags);
- rc = -ESHUTDOWN;
- goto unwind_2;
- }
+ /* Called only in lnd_send which can't happen after lnd_shutdown */
+ LASSERT (!net->net_shutdown);
*peerp = kptllnd_id2peer_locked(target);
if (*peerp != NULL) {
* Setup MD
*/
md.start = rxb->rxb_buffer;
- md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
+ md.length = kptllnd_rx_buffer_size();
md.threshold = PTL_MD_THRESH_INF;
md.options = PTL_MD_OP_PUT |
PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
}
void
-kptllnd_nak (kptl_rx_t *rx)
+kptllnd_nak (ptl_process_id_t dest)
{
/* Fire-and-forget a stub message that will let the peer know my
* protocol magic/version and make her drop/refresh any peer state she
rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
if (rc != PTL_OK) {
CWARN("Can't NAK %s: bind failed %s(%d)\n",
- kptllnd_ptlid2str(rx->rx_initiator),
- kptllnd_errtype2str(rc), rc);
+ kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
return;
}
- rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
+ rc = PtlPut(mdh, PTL_NOACK_REQ, dest,
*kptllnd_tunables.kptl_portal, 0,
LNET_MSG_MATCHBITS, 0, 0);
-
if (rc != PTL_OK) {
CWARN("Can't NAK %s: put failed %s(%d)\n",
- kptllnd_ptlid2str(rx->rx_initiator),
- kptllnd_errtype2str(rc), rc);
+ kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
kptllnd_schedule_ptltrace_dump();
}
}
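+
+/* NB returns the net with a reference held; the caller must drop it with
+ * kptllnd_net_decref() when done (see kptllnd_rx_parse) */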
+kptl_net_t *
+kptllnd_find_net (lnet_nid_t nid)
+{
+ kptl_net_t *net;
+
+ read_lock(&kptllnd_data.kptl_net_rw_lock);
+ list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
+ LASSERT (!net->net_shutdown);
+
+ if (net->net_ni->ni_nid == nid) {
+ kptllnd_net_addref(net);
+ read_unlock(&kptllnd_data.kptl_net_rw_lock);
+ return net;
+ }
+ }
+ read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+ return NULL;
+}
+
void
kptllnd_rx_parse(kptl_rx_t *rx)
{
kptl_msg_t *msg = rx->rx_msg;
int rc = 0;
int post_credit = PTLLND_POSTRX_PEER_CREDIT;
+ kptl_net_t *net = NULL;
kptl_peer_t *peer;
struct list_head txs;
unsigned long flags;
lnet_process_id_t srcid;
+ LASSERT (!in_interrupt());
LASSERT (rx->rx_peer == NULL);
INIT_LIST_HEAD(&txs);
(__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
msg->ptlm_version : __swab16(msg->ptlm_version)),
PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
- kptllnd_nak(rx);
+ /* NB backward compatibility */
+ kptllnd_nak(rx->rx_initiator);
goto rx_done;
}
jiffies - rx->rx_treceived,
cfs_duration_sec(jiffies - rx->rx_treceived));
- if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
- CERROR("Bad source id %s from %s\n",
+ if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) {
+ CERROR("Bad source nid %s from %s\n",
libcfs_id2str(srcid),
kptllnd_ptlid2str(rx->rx_initiator));
goto rx_done;
goto failed;
}
- if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
- msg->ptlm_dstpid != the_lnet.ln_pid) {
- CERROR("Bad dstid %s (expected %s) from %s\n",
+ net = kptllnd_find_net(msg->ptlm_dstnid);
+ if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) {
+ CERROR("Bad dstid %s from %s\n",
libcfs_id2str((lnet_process_id_t) {
.nid = msg->ptlm_dstnid,
.pid = msg->ptlm_dstpid}),
- libcfs_id2str((lnet_process_id_t) {
- .nid = kptllnd_data.kptl_ni->ni_nid,
- .pid = the_lnet.ln_pid}),
kptllnd_ptlid2str(rx->rx_initiator));
goto rx_done;
}
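+ /* the source must be on the same LNet network as the NI this
+ * message was addressed to */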
+ if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) {
+ lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid),
+ LNET_NIDADDR(srcid.nid));
+ CERROR("Bad source nid %s from %s, %s expected.\n",
+ libcfs_id2str(srcid),
+ kptllnd_ptlid2str(rx->rx_initiator),
+ libcfs_nid2str(nid));
+ goto rx_done;
+ }
+
if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
- peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
+ peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg);
if (peer == NULL)
goto rx_done;
} else {
kptllnd_msgtype2str(msg->ptlm_type),
libcfs_id2str(srcid));
/* NAK to make the peer reconnect */
- kptllnd_nak(rx);
+ kptllnd_nak(rx->rx_initiator);
goto rx_done;
}
CWARN("NAK %s: Unexpected %s message\n",
libcfs_id2str(srcid),
kptllnd_msgtype2str(msg->ptlm_type));
- kptllnd_nak(rx);
+ kptllnd_nak(rx->rx_initiator);
rc = -EPROTO;
goto failed;
}
}
}
- LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
- msg->ptlm_srcpid == peer->peer_id.pid);
+ LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) ==
+ LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n",
+ libcfs_nid2str(msg->ptlm_srcnid),
+ libcfs_nid2str(peer->peer_id.nid));
+ LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n",
+ msg->ptlm_srcpid, peer->peer_id.pid);
spin_lock_irqsave(&peer->peer_lock, flags);
case PTLLND_MSG_TYPE_IMMEDIATE:
CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
- rc = lnet_parse(kptllnd_data.kptl_ni,
+ rc = lnet_parse(net->net_ni,
&msg->ptlm_u.immediate.kptlim_hdr,
msg->ptlm_srcnid,
rx, 0);
- if (rc >= 0) /* kptllnd_recv owns 'rx' now */
+ if (rc >= 0) { /* kptllnd_recv owns 'rx' now */
+ kptllnd_net_decref(net);
return;
+ }
goto failed;
case PTLLND_MSG_TYPE_PUT:
spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
- rc = lnet_parse(kptllnd_data.kptl_ni,
+ rc = lnet_parse(net->net_ni,
&msg->ptlm_u.rdma.kptlrm_hdr,
msg->ptlm_srcnid,
rx, 1);
- if (rc >= 0) /* kptllnd_recv owns 'rx' now */
+ if (rc >= 0) { /* kptllnd_recv owns 'rx' now */
+ kptllnd_net_decref(net);
return;
+ }
goto failed;
}
kptllnd_peer_close(peer, rc);
if (rx->rx_peer == NULL) /* drop ref on peer */
kptllnd_peer_decref(peer); /* unless rx_done will */
- if (!list_empty(&txs))
- kptllnd_restart_txs(srcid, &txs);
+ if (!list_empty(&txs)) {
+ LASSERT (net != NULL);
+ kptllnd_restart_txs(net, srcid, &txs);
+ }
rx_done:
+ if (net != NULL)
+ kptllnd_net_decref(net);
kptllnd_rx_done(rx, post_credit);
}
tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
tx->tx_msg = NULL;
+ tx->tx_peer = NULL;
tx->tx_frags = NULL;
LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
for (i = 0; i < n; i++) {
kptl_tx_t *tx = kptllnd_alloc_tx();
-
if (tx == NULL)
return -ENOMEM;
spin_lock(&kptllnd_data.kptl_tx_lock);
-
list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
-
spin_unlock(&kptllnd_data.kptl_tx_lock);
}
/* Must finalize AFTER freeing 'tx' */
if (msg != NULL)
- lnet_finalize(kptllnd_data.kptl_ni, msg,
- (replymsg == NULL) ? status : 0);
+ lnet_finalize(NULL, msg, (replymsg == NULL) ? status : 0);
if (replymsg != NULL)
- lnet_finalize(kptllnd_data.kptl_ni, replymsg, status);
+ lnet_finalize(NULL, replymsg, status);
if (peer != NULL)
kptllnd_peer_decref(peer);
if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
if (ev->hdr_data == PTLLND_RDMA_OK) {
- lnet_set_reply_msg_len(
- kptllnd_data.kptl_ni,
+ lnet_set_reply_msg_len(NULL,
tx->tx_lnet_replymsg,
ev->mlength);
} else {