void __exit
kqswnal_finalise (void)
{
+ unsigned long flags;
+ int do_ptl_fini = 0;
+
switch (kqswnal_data.kqn_init)
{
default:
/* fall through */
case KQN_INIT_PTL:
- PtlNIFini (kqswnal_ni);
- lib_fini (&kqswnal_lib);
+ do_ptl_fini = 1;
/* fall through */
case KQN_INIT_DATA:
}
/**********************************************************************/
- /* Make router stop her calling me and fail any more call-ins */
+ /* Tell router we're shutting down. Any router calls my threads
+ * make will now fail immediately and the router will stop calling
+ * into me. */
kpr_shutdown (&kqswnal_data.kqn_router);
-
+
/**********************************************************************/
- /* flag threads we've started to terminate and wait for all to ack */
-
+ /* Signal the start of shutdown... */
+ spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
kqswnal_data.kqn_shuttingdown = 1;
- wake_up_all (&kqswnal_data.kqn_sched_waitq);
+ spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
+
+ wake_up_all(&kqswnal_data.kqn_idletxd_waitq);
- while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
- CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
- atomic_read (&kqswnal_data.kqn_nthreads_running));
+ /**********************************************************************/
+ /* wait for sends that have allocated a tx desc to launch or give up */
+ while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
+ CDEBUG(D_NET, "waiting for %d pending sends\n",
+ atomic_read (&kqswnal_data.kqn_pending_txs));
set_current_state (TASK_UNINTERRUPTIBLE);
schedule_timeout (HZ);
}
/**********************************************************************/
/* close elan comms */
#if MULTIRAIL_EKC
+ /* Shut down receivers first; rx callbacks might try sending... */
if (kqswnal_data.kqn_eprx_small != NULL)
ep_free_rcvr (kqswnal_data.kqn_eprx_small);
if (kqswnal_data.kqn_eprx_large != NULL)
ep_free_rcvr (kqswnal_data.kqn_eprx_large);
+ /* NB ep_free_rcvr() returns only after we've freed off all receive
+ * buffers (see shutdown handling in kqswnal_requeue_rx()). This
+ * means we must have completed any messages we passed to
+ * lib_parse() or kpr_fwd_start(). */
+
if (kqswnal_data.kqn_eptx != NULL)
ep_free_xmtr (kqswnal_data.kqn_eptx);
- /* freeing the xmtr completes all txs pdq */
+ /* NB ep_free_xmtr() returns only after all outstanding transmits
+ * have called their callback... */
LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
+ /* "Old" EKC just pretends to shutdown cleanly but actually
+ * provides no guarantees */
if (kqswnal_data.kqn_eprx_small != NULL)
ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
#endif
/**********************************************************************/
/* flag threads to terminate, wake them and wait for them to die */
-
kqswnal_data.kqn_shuttingdown = 2;
wake_up_all (&kqswnal_data.kqn_sched_waitq);
#if MULTIRAIL_EKC
LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
+ LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
+ LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
#endif
/**********************************************************************/
- /* Complete any blocked forwarding packets with error
+ /* Complete any blocked forwarding packets, with error
*/
while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
kpr_fwd_desc_t, kprfd_list);
list_del (&fwd->kprfd_list);
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
- }
-
- while (!list_empty (&kqswnal_data.kqn_delayedfwds))
- {
- kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
- kpr_fwd_desc_t, kprfd_list);
- list_del (&fwd->kprfd_list);
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
+ kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
}
/**********************************************************************/
- /* Wait for router to complete any packets I sent her
- */
+ /* finalise router and portals lib */
kpr_deregister (&kqswnal_data.kqn_router);
+ if (do_ptl_fini) {
+ PtlNIFini (kqswnal_ni);
+ lib_fini (&kqswnal_lib);
+ }
/**********************************************************************/
/* Unmap message buffers and free all descriptors and buffers
{
char kqn_init; /* what's been initialised */
char kqn_shuttingdown; /* I'm trying to shut down */
- atomic_t kqn_nthreads; /* # threads not terminated */
- atomic_t kqn_nthreads_running;/* # threads still running */
+ atomic_t kqn_nthreads; /* # threads running */
int kqn_optimized_gets; /* optimized GETs? */
int kqn_copy_small_fwd; /* fwd small msgs from pre-allocated buffer? */
spinlock_t kqn_idletxd_lock; /* serialise idle txd access */
wait_queue_head_t kqn_idletxd_waitq; /* sender blocks here waiting for idle txd */
struct list_head kqn_idletxd_fwdq; /* forwarded packets block here waiting for idle txd */
+ atomic_t kqn_pending_txs; /* # transmits being prepped */
spinlock_t kqn_sched_lock; /* serialise packet schedulers */
wait_queue_head_t kqn_sched_waitq; /* scheduler blocks here */
list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
/* anything blocking for a tx descriptor? */
- if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
+ if (!kqswnal_data.kqn_shuttingdown &&
+ !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
{
CDEBUG(D_NET,"wakeup fwd\n");
for (;;) {
spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
+ if (kqswnal_data.kqn_shuttingdown)
+ break;
+
/* "normal" descriptor is free */
if (!list_empty (&kqswnal_data.kqn_idletxds)) {
ktx = list_entry (kqswnal_data.kqn_idletxds.next,
break;
}
- /* "normal" descriptor pool is empty */
-
- if (fwd != NULL) { /* forwarded packet => queue for idle txd */
- CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
- list_add_tail (&fwd->kprfd_list,
- &kqswnal_data.kqn_idletxd_fwdq);
+ if (fwd != NULL) /* forwarded packet? */
break;
- }
/* doing a local transmit */
if (!may_block) {
CDEBUG (D_NET, "blocking for tx desc\n");
wait_event (kqswnal_data.kqn_idletxd_waitq,
- !list_empty (&kqswnal_data.kqn_idletxds));
+ !list_empty (&kqswnal_data.kqn_idletxds) ||
+ kqswnal_data.kqn_shuttingdown);
}
if (ktx != NULL) {
list_del (&ktx->ktx_list);
list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
ktx->ktx_launcher = current->pid;
+ atomic_inc(&kqswnal_data.kqn_pending_txs);
+ } else if (fwd != NULL) {
+ /* queue forwarded packet until idle txd available */
+ CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
+ list_add_tail (&fwd->kprfd_list,
+ &kqswnal_data.kqn_idletxd_fwdq);
}
spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
ktx->ktx_launchtime = jiffies;
+ if (kqswnal_data.kqn_shuttingdown)
+ return (-ESHUTDOWN);
+
LASSERT (dest >= 0); /* must be a peer */
if (ktx->ktx_state == KTX_GETTING) {
/* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The
return (0);
case EP_ENOMEM: /* can't allocate ep txd => queue for later */
- LASSERT (in_interrupt());
-
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
LASSERT (payload_kiov == NULL || !in_interrupt ());
/* payload is either all vaddrs or all pages */
LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
-
+
if (payload_nob > KQSW_MAXPAYLOAD) {
CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
payload_nob, KQSW_MAXPAYLOAD);
"nid "LPX64" via "LPX64" elanID %d\n",
nid, targetnid,
ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
- return (PTL_FAIL);
+ rc = -EINVAL;
+ goto out;
}
/* peer expects RPC completion with GET data */
rc = kqswnal_dma_reply (ktx, payload_niov,
payload_iov, payload_kiov,
payload_offset, payload_nob);
- if (rc == 0)
- return (PTL_OK);
-
- CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
+ if (rc != 0)
+ CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
+ goto out;
}
memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
else
rc = kqswnal_map_tx_iov (ktx, 0, md->length,
md->md_niov, md->md_iov.iov);
-
- if (rc < 0) {
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
+ if (rc != 0)
+ goto out;
rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
else
rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
payload_niov, payload_iov);
- if (rc != 0) {
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
+ if (rc != 0)
+ goto out;
}
ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
rc = kqswnal_launch (ktx);
- if (rc != 0) { /* failed? */
- CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
+
+ out:
+ CDEBUG(rc == 0 ? D_NET : D_ERROR,
+ "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n",
+ rc == 0 ? "Sent" : "Failed to send",
+ payload_nob, nid, targetnid, rc);
+
+ if (rc != 0)
kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
- CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n",
- payload_nob, nid, targetnid);
- return (PTL_OK);
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
+ return (rc == 0 ? PTL_OK : PTL_FAIL);
}
static ptl_err_t
if (kqswnal_nid2elanid (nid) < 0) {
CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
rc = -EHOSTUNREACH;
- goto failed;
+ goto out;
}
/* copy hdr into pre-mapped buffer */
#endif
rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
if (rc != 0)
- goto failed;
+ goto out;
}
rc = kqswnal_launch (ktx);
- if (rc == 0)
- return;
+ out:
+ if (rc != 0) {
+ CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
- failed:
- LASSERT (rc != 0);
- CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
+ kqswnal_put_idle_tx (ktx);
+ /* complete now (with failure) */
+ kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+ }
- kqswnal_put_idle_tx (ktx);
- /* complete now (with failure) */
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
}
void
return ((int)pid);
atomic_inc (&kqswnal_data.kqn_nthreads);
- atomic_inc (&kqswnal_data.kqn_nthreads_running);
return (0);
}
long flags;
int rc;
int counter = 0;
- int shuttingdown = 0;
int did_something;
kportal_daemonize ("kqswnal_sched");
for (;;)
{
- if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
-
- if (kqswnal_data.kqn_shuttingdown == 2)
- break;
-
- /* During stage 1 of shutdown we are still responsive
- * to receives */
-
- atomic_dec (&kqswnal_data.kqn_nthreads_running);
- shuttingdown = kqswnal_data.kqn_shuttingdown;
- }
-
did_something = 0;
if (!list_empty (&kqswnal_data.kqn_readyrxds))
spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
}
- if (!shuttingdown &&
- !list_empty (&kqswnal_data.kqn_delayedtxds))
+ if (!list_empty (&kqswnal_data.kqn_delayedtxds))
{
ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
kqswnal_tx_t, ktx_list);
flags);
rc = kqswnal_launch (ktx);
- if (rc != 0) /* failed: ktx_nid down? */
- {
+ if (rc != 0) {
CERROR("Failed delayed transmit to "LPX64
": %d\n", ktx->ktx_nid, rc);
kqswnal_tx_done (ktx, rc);
}
+ atomic_dec (&kqswnal_data.kqn_pending_txs);
did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- if (!shuttingdown &
- !list_empty (&kqswnal_data.kqn_delayedfwds))
+ if (!list_empty (&kqswnal_data.kqn_delayedfwds))
{
fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
list_del (&fwd->kprfd_list);
spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+ /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
kqswnal_fwd_packet (NULL, fwd);
did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- /* nothing to do or hogging CPU */
+ /* nothing to do or hogging CPU */
if (!did_something || counter++ == KQSW_RESCHED) {
spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
flags);
counter = 0;
if (!did_something) {
+ if (kqswnal_data.kqn_shuttingdown == 2) {
+ /* We only exit in stage 2 of shutdown when
+ * there's nothing left to do */
+ break;
+ }
rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
- kqswnal_data.kqn_shuttingdown != shuttingdown ||
+ kqswnal_data.kqn_shuttingdown == 2 ||
!list_empty(&kqswnal_data.kqn_readyrxds) ||
!list_empty(&kqswnal_data.kqn_delayedtxds) ||
!list_empty(&kqswnal_data.kqn_delayedfwds));
}
}
- spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
-
kqswnal_thread_fini ();
return (0);
}
LASSERT (!ne->kpne_shutdown);
LASSERT (!in_interrupt());
- write_lock_irqsave (&kpr_rwlock, flags); /* locking a bit spurious... */
+ write_lock_irqsave (&kpr_rwlock, flags);
ne->kpne_shutdown = 1;
- write_unlock_irqrestore (&kpr_rwlock, flags); /* except it's a memory barrier */
-
- while (atomic_read (&ne->kpne_refcount) != 0)
- {
- CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
- ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
-
- set_current_state (TASK_UNINTERRUPTIBLE);
- schedule_timeout (HZ);
- }
+ write_unlock_irqrestore (&kpr_rwlock, flags);
}
void
CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid);
LASSERT (ne->kpne_shutdown); /* caller must have issued shutdown already */
- LASSERT (atomic_read (&ne->kpne_refcount) == 0); /* can't be busy */
LASSERT (!in_interrupt());
write_lock_irqsave (&kpr_rwlock, flags);
-
list_del (&ne->kpne_list);
-
write_unlock_irqrestore (&kpr_rwlock, flags);
+ /* Wait until all outstanding messages/notifications have completed */
+ while (atomic_read (&ne->kpne_refcount) != 0)
+ {
+ CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
+ ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
+
+ set_current_state (TASK_UNINTERRUPTIBLE);
+ schedule_timeout (HZ);
+ }
+
PORTAL_FREE (ne, sizeof (*ne));
PORTAL_MODULE_UNUSE;
}
CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid,
ne->kpne_interface.kprni_nalid);
-
- if (ne->kpne_shutdown) /* caller is shutting down */
- return (-ENOENT);
+ LASSERT (!in_interrupt());
read_lock (&kpr_rwlock);
+ if (ne->kpne_shutdown) { /* caller is shutting down */
+ read_unlock (&kpr_rwlock);
+ return (-ENOENT);
+ }
+
/* Search routes for one that has a gateway to target_nid on the callers network */
list_for_each (e, &kpr_routes) {
struct list_head *e;
kpr_route_entry_t *re;
kpr_nal_entry_t *tmp_ne;
+ int rc;
CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd,
target_nid, src_ne->kpne_interface.kprni_nalid);
LASSERT (nob == lib_kiov_nob (fwd->kprfd_niov, fwd->kprfd_kiov));
-
- atomic_inc (&kpr_queue_depth);
- atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */
+ LASSERT (!in_interrupt());
+
+ read_lock (&kpr_rwlock);
kpr_fwd_packets++; /* (loose) stats accounting */
kpr_fwd_bytes += nob + sizeof(ptl_hdr_t);
- if (src_ne->kpne_shutdown) /* caller is shutting down */
+ if (src_ne->kpne_shutdown) { /* caller is shutting down */
+ rc = -ESHUTDOWN;
goto out;
+ }
fwd->kprfd_router_arg = src_ne; /* stash caller's nal entry */
- read_lock (&kpr_rwlock);
-
/* Search routes for one that has a gateway to target_nid NOT on the caller's network */
list_for_each (e, &kpr_routes) {
kpr_update_weight (ge, nob);
fwd->kprfd_gateway_nid = ge->kpge_nid;
- atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */
+ atomic_inc (&src_ne->kpne_refcount); /* source and dest nals are */
+ atomic_inc (&dst_ne->kpne_refcount); /* busy until fwd completes */
+ atomic_inc (&kpr_queue_depth);
read_unlock (&kpr_rwlock);
return;
}
- read_unlock (&kpr_rwlock);
+ rc = -EHOSTUNREACH;
out:
kpr_fwd_errors++;
- CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd,
- target_nid, src_ne->kpne_interface.kprni_nalid);
+ CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d: %d\n",
+ fwd, target_nid, src_ne->kpne_interface.kprni_nalid, rc);
- /* Can't find anywhere to forward to */
- (fwd->kprfd_callback)(fwd->kprfd_callback_arg, -EHOSTUNREACH);
+ (fwd->kprfd_callback)(fwd->kprfd_callback_arg, rc);
- atomic_dec (&kpr_queue_depth);
- atomic_dec (&src_ne->kpne_refcount);
+ read_unlock (&kpr_rwlock);
}
void
{
struct list_head *e;
+ LASSERT (!in_interrupt());
read_lock(&kpr_rwlock);
for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {
void __exit
kqswnal_finalise (void)
{
+ unsigned long flags;
+ int do_ptl_fini = 0;
+
switch (kqswnal_data.kqn_init)
{
default:
/* fall through */
case KQN_INIT_PTL:
- PtlNIFini (kqswnal_ni);
- lib_fini (&kqswnal_lib);
+ do_ptl_fini = 1;
/* fall through */
case KQN_INIT_DATA:
}
/**********************************************************************/
- /* Make router stop her calling me and fail any more call-ins */
+ /* Tell router we're shutting down. Any router calls my threads
+ * make will now fail immediately and the router will stop calling
+ * into me. */
kpr_shutdown (&kqswnal_data.kqn_router);
-
+
/**********************************************************************/
- /* flag threads we've started to terminate and wait for all to ack */
-
+ /* Signal the start of shutdown... */
+ spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
kqswnal_data.kqn_shuttingdown = 1;
- wake_up_all (&kqswnal_data.kqn_sched_waitq);
+ spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
+
+ wake_up_all(&kqswnal_data.kqn_idletxd_waitq);
- while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
- CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
- atomic_read (&kqswnal_data.kqn_nthreads_running));
+ /**********************************************************************/
+ /* wait for sends that have allocated a tx desc to launch or give up */
+ while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
+ CDEBUG(D_NET, "waiting for %d pending sends\n",
+ atomic_read (&kqswnal_data.kqn_pending_txs));
set_current_state (TASK_UNINTERRUPTIBLE);
schedule_timeout (HZ);
}
/**********************************************************************/
/* close elan comms */
#if MULTIRAIL_EKC
+ /* Shut down receivers first; rx callbacks might try sending... */
if (kqswnal_data.kqn_eprx_small != NULL)
ep_free_rcvr (kqswnal_data.kqn_eprx_small);
if (kqswnal_data.kqn_eprx_large != NULL)
ep_free_rcvr (kqswnal_data.kqn_eprx_large);
+ /* NB ep_free_rcvr() returns only after we've freed off all receive
+ * buffers (see shutdown handling in kqswnal_requeue_rx()). This
+ * means we must have completed any messages we passed to
+ * lib_parse() or kpr_fwd_start(). */
+
if (kqswnal_data.kqn_eptx != NULL)
ep_free_xmtr (kqswnal_data.kqn_eptx);
- /* freeing the xmtr completes all txs pdq */
+ /* NB ep_free_xmtr() returns only after all outstanding transmits
+ * have called their callback... */
LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
+ /* "Old" EKC just pretends to shutdown cleanly but actually
+ * provides no guarantees */
if (kqswnal_data.kqn_eprx_small != NULL)
ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
#endif
/**********************************************************************/
/* flag threads to terminate, wake them and wait for them to die */
-
kqswnal_data.kqn_shuttingdown = 2;
wake_up_all (&kqswnal_data.kqn_sched_waitq);
#if MULTIRAIL_EKC
LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
+ LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
+ LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
#endif
/**********************************************************************/
- /* Complete any blocked forwarding packets with error
+ /* Complete any blocked forwarding packets, with error
*/
while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
kpr_fwd_desc_t, kprfd_list);
list_del (&fwd->kprfd_list);
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
- }
-
- while (!list_empty (&kqswnal_data.kqn_delayedfwds))
- {
- kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
- kpr_fwd_desc_t, kprfd_list);
- list_del (&fwd->kprfd_list);
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
+ kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
}
/**********************************************************************/
- /* Wait for router to complete any packets I sent her
- */
+ /* finalise router and portals lib */
kpr_deregister (&kqswnal_data.kqn_router);
+ if (do_ptl_fini) {
+ PtlNIFini (kqswnal_ni);
+ lib_fini (&kqswnal_lib);
+ }
/**********************************************************************/
/* Unmap message buffers and free all descriptors and buffers
{
char kqn_init; /* what's been initialised */
char kqn_shuttingdown; /* I'm trying to shut down */
- atomic_t kqn_nthreads; /* # threads not terminated */
- atomic_t kqn_nthreads_running;/* # threads still running */
+ atomic_t kqn_nthreads; /* # threads running */
int kqn_optimized_gets; /* optimized GETs? */
int kqn_copy_small_fwd; /* fwd small msgs from pre-allocated buffer? */
spinlock_t kqn_idletxd_lock; /* serialise idle txd access */
wait_queue_head_t kqn_idletxd_waitq; /* sender blocks here waiting for idle txd */
struct list_head kqn_idletxd_fwdq; /* forwarded packets block here waiting for idle txd */
+ atomic_t kqn_pending_txs; /* # transmits being prepped */
spinlock_t kqn_sched_lock; /* serialise packet schedulers */
wait_queue_head_t kqn_sched_waitq; /* scheduler blocks here */
list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
/* anything blocking for a tx descriptor? */
- if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
+ if (!kqswnal_data.kqn_shuttingdown &&
+ !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
{
CDEBUG(D_NET,"wakeup fwd\n");
for (;;) {
spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
+ if (kqswnal_data.kqn_shuttingdown)
+ break;
+
/* "normal" descriptor is free */
if (!list_empty (&kqswnal_data.kqn_idletxds)) {
ktx = list_entry (kqswnal_data.kqn_idletxds.next,
break;
}
- /* "normal" descriptor pool is empty */
-
- if (fwd != NULL) { /* forwarded packet => queue for idle txd */
- CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
- list_add_tail (&fwd->kprfd_list,
- &kqswnal_data.kqn_idletxd_fwdq);
+ if (fwd != NULL) /* forwarded packet? */
break;
- }
/* doing a local transmit */
if (!may_block) {
CDEBUG (D_NET, "blocking for tx desc\n");
wait_event (kqswnal_data.kqn_idletxd_waitq,
- !list_empty (&kqswnal_data.kqn_idletxds));
+ !list_empty (&kqswnal_data.kqn_idletxds) ||
+ kqswnal_data.kqn_shuttingdown);
}
if (ktx != NULL) {
list_del (&ktx->ktx_list);
list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
ktx->ktx_launcher = current->pid;
+ atomic_inc(&kqswnal_data.kqn_pending_txs);
+ } else if (fwd != NULL) {
+ /* queue forwarded packet until idle txd available */
+ CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
+ list_add_tail (&fwd->kprfd_list,
+ &kqswnal_data.kqn_idletxd_fwdq);
}
spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
ktx->ktx_launchtime = jiffies;
+ if (kqswnal_data.kqn_shuttingdown)
+ return (-ESHUTDOWN);
+
LASSERT (dest >= 0); /* must be a peer */
if (ktx->ktx_state == KTX_GETTING) {
/* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The
return (0);
case EP_ENOMEM: /* can't allocate ep txd => queue for later */
- LASSERT (in_interrupt());
-
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
LASSERT (payload_kiov == NULL || !in_interrupt ());
/* payload is either all vaddrs or all pages */
LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
-
+
if (payload_nob > KQSW_MAXPAYLOAD) {
CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
payload_nob, KQSW_MAXPAYLOAD);
"nid "LPX64" via "LPX64" elanID %d\n",
nid, targetnid,
ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
- return (PTL_FAIL);
+ rc = -EINVAL;
+ goto out;
}
/* peer expects RPC completion with GET data */
rc = kqswnal_dma_reply (ktx, payload_niov,
payload_iov, payload_kiov,
payload_offset, payload_nob);
- if (rc == 0)
- return (PTL_OK);
-
- CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
+ if (rc != 0)
+ CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
+ goto out;
}
memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
else
rc = kqswnal_map_tx_iov (ktx, 0, md->length,
md->md_niov, md->md_iov.iov);
-
- if (rc < 0) {
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
+ if (rc != 0)
+ goto out;
rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
else
rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
payload_niov, payload_iov);
- if (rc != 0) {
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
+ if (rc != 0)
+ goto out;
}
ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
rc = kqswnal_launch (ktx);
- if (rc != 0) { /* failed? */
- CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
+
+ out:
+ CDEBUG(rc == 0 ? D_NET : D_ERROR,
+ "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n",
+ rc == 0 ? "Sent" : "Failed to send",
+ payload_nob, nid, targetnid, rc);
+
+ if (rc != 0)
kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
- CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n",
- payload_nob, nid, targetnid);
- return (PTL_OK);
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
+ return (rc == 0 ? PTL_OK : PTL_FAIL);
}
static ptl_err_t
if (kqswnal_nid2elanid (nid) < 0) {
CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
rc = -EHOSTUNREACH;
- goto failed;
+ goto out;
}
/* copy hdr into pre-mapped buffer */
#endif
rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
if (rc != 0)
- goto failed;
+ goto out;
}
rc = kqswnal_launch (ktx);
- if (rc == 0)
- return;
+ out:
+ if (rc != 0) {
+ CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
- failed:
- LASSERT (rc != 0);
- CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
+ kqswnal_put_idle_tx (ktx);
+ /* complete now (with failure) */
+ kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+ }
- kqswnal_put_idle_tx (ktx);
- /* complete now (with failure) */
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
}
void
return ((int)pid);
atomic_inc (&kqswnal_data.kqn_nthreads);
- atomic_inc (&kqswnal_data.kqn_nthreads_running);
return (0);
}
long flags;
int rc;
int counter = 0;
- int shuttingdown = 0;
int did_something;
kportal_daemonize ("kqswnal_sched");
for (;;)
{
- if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
-
- if (kqswnal_data.kqn_shuttingdown == 2)
- break;
-
- /* During stage 1 of shutdown we are still responsive
- * to receives */
-
- atomic_dec (&kqswnal_data.kqn_nthreads_running);
- shuttingdown = kqswnal_data.kqn_shuttingdown;
- }
-
did_something = 0;
if (!list_empty (&kqswnal_data.kqn_readyrxds))
spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
}
- if (!shuttingdown &&
- !list_empty (&kqswnal_data.kqn_delayedtxds))
+ if (!list_empty (&kqswnal_data.kqn_delayedtxds))
{
ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
kqswnal_tx_t, ktx_list);
flags);
rc = kqswnal_launch (ktx);
- if (rc != 0) /* failed: ktx_nid down? */
- {
+ if (rc != 0) {
CERROR("Failed delayed transmit to "LPX64
": %d\n", ktx->ktx_nid, rc);
kqswnal_tx_done (ktx, rc);
}
+ atomic_dec (&kqswnal_data.kqn_pending_txs);
did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- if (!shuttingdown &
- !list_empty (&kqswnal_data.kqn_delayedfwds))
+ if (!list_empty (&kqswnal_data.kqn_delayedfwds))
{
fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
list_del (&fwd->kprfd_list);
spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+ /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
kqswnal_fwd_packet (NULL, fwd);
did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- /* nothing to do or hogging CPU */
+ /* nothing to do or hogging CPU */
if (!did_something || counter++ == KQSW_RESCHED) {
spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
flags);
counter = 0;
if (!did_something) {
+ if (kqswnal_data.kqn_shuttingdown == 2) {
+ /* We only exit in stage 2 of shutdown when
+ * there's nothing left to do */
+ break;
+ }
rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
- kqswnal_data.kqn_shuttingdown != shuttingdown ||
+ kqswnal_data.kqn_shuttingdown == 2 ||
!list_empty(&kqswnal_data.kqn_readyrxds) ||
!list_empty(&kqswnal_data.kqn_delayedtxds) ||
!list_empty(&kqswnal_data.kqn_delayedfwds));
}
}
- spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
-
kqswnal_thread_fini ();
return (0);
}
LASSERT (!ne->kpne_shutdown);
LASSERT (!in_interrupt());
- write_lock_irqsave (&kpr_rwlock, flags); /* locking a bit spurious... */
+ write_lock_irqsave (&kpr_rwlock, flags);
ne->kpne_shutdown = 1;
- write_unlock_irqrestore (&kpr_rwlock, flags); /* except it's a memory barrier */
-
- while (atomic_read (&ne->kpne_refcount) != 0)
- {
- CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
- ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
-
- set_current_state (TASK_UNINTERRUPTIBLE);
- schedule_timeout (HZ);
- }
+ write_unlock_irqrestore (&kpr_rwlock, flags);
}
void
CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid);
LASSERT (ne->kpne_shutdown); /* caller must have issued shutdown already */
- LASSERT (atomic_read (&ne->kpne_refcount) == 0); /* can't be busy */
LASSERT (!in_interrupt());
write_lock_irqsave (&kpr_rwlock, flags);
-
list_del (&ne->kpne_list);
-
write_unlock_irqrestore (&kpr_rwlock, flags);
+ /* Wait until all outstanding messages/notifications have completed */
+ while (atomic_read (&ne->kpne_refcount) != 0)
+ {
+ CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
+ ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
+
+ set_current_state (TASK_UNINTERRUPTIBLE);
+ schedule_timeout (HZ);
+ }
+
PORTAL_FREE (ne, sizeof (*ne));
PORTAL_MODULE_UNUSE;
}
CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid,
ne->kpne_interface.kprni_nalid);
-
- if (ne->kpne_shutdown) /* caller is shutting down */
- return (-ENOENT);
+ LASSERT (!in_interrupt());
read_lock (&kpr_rwlock);
+ if (ne->kpne_shutdown) { /* caller is shutting down */
+ read_unlock (&kpr_rwlock);
+ return (-ENOENT);
+ }
+
/* Search routes for one that has a gateway to target_nid on the callers network */
list_for_each (e, &kpr_routes) {
struct list_head *e;
kpr_route_entry_t *re;
kpr_nal_entry_t *tmp_ne;
+ int rc;
CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd,
target_nid, src_ne->kpne_interface.kprni_nalid);
LASSERT (nob == lib_kiov_nob (fwd->kprfd_niov, fwd->kprfd_kiov));
-
- atomic_inc (&kpr_queue_depth);
- atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */
+ LASSERT (!in_interrupt());
+
+ read_lock (&kpr_rwlock);
kpr_fwd_packets++; /* (loose) stats accounting */
kpr_fwd_bytes += nob + sizeof(ptl_hdr_t);
- if (src_ne->kpne_shutdown) /* caller is shutting down */
+ if (src_ne->kpne_shutdown) { /* caller is shutting down */
+ rc = -ESHUTDOWN;
goto out;
+ }
fwd->kprfd_router_arg = src_ne; /* stash caller's nal entry */
- read_lock (&kpr_rwlock);
-
/* Search routes for one that has a gateway to target_nid NOT on the caller's network */
list_for_each (e, &kpr_routes) {
kpr_update_weight (ge, nob);
fwd->kprfd_gateway_nid = ge->kpge_nid;
- atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */
+ atomic_inc (&src_ne->kpne_refcount); /* source and dest nals are */
+ atomic_inc (&dst_ne->kpne_refcount); /* busy until fwd completes */
+ atomic_inc (&kpr_queue_depth);
read_unlock (&kpr_rwlock);
return;
}
- read_unlock (&kpr_rwlock);
+ rc = -EHOSTUNREACH;
out:
kpr_fwd_errors++;
- CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd,
- target_nid, src_ne->kpne_interface.kprni_nalid);
+ CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d: %d\n",
+ fwd, target_nid, src_ne->kpne_interface.kprni_nalid, rc);
- /* Can't find anywhere to forward to */
- (fwd->kprfd_callback)(fwd->kprfd_callback_arg, -EHOSTUNREACH);
+ (fwd->kprfd_callback)(fwd->kprfd_callback_arg, rc);
- atomic_dec (&kpr_queue_depth);
- atomic_dec (&src_ne->kpne_refcount);
+ read_unlock (&kpr_rwlock);
}
void
{
struct list_head *e;
+ LASSERT (!in_interrupt());
read_lock(&kpr_rwlock);
for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {