list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
/* anything blocking for a tx descriptor? */
- if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
+ if (!kqswnal_data.kqn_shuttingdown &&
+ !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
{
CDEBUG(D_NET,"wakeup fwd\n");
for (;;) {
spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
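+ /* once shutdown has started, leave the loop with no descriptor (ktx stays NULL) */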
+ if (kqswnal_data.kqn_shuttingdown)
+ break;
+
/* "normal" descriptor is free */
if (!list_empty (&kqswnal_data.kqn_idletxds)) {
ktx = list_entry (kqswnal_data.kqn_idletxds.next,
                  kqswnal_tx_t, ktx_list);
break;
}
- /* "normal" descriptor pool is empty */
-
- if (fwd != NULL) { /* forwarded packet => queue for idle txd */
- CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
- list_add_tail (&fwd->kprfd_list,
- &kqswnal_data.kqn_idletxd_fwdq);
+ if (fwd != NULL) /* forwarded packet? */
break;
- }
/* doing a local transmit */
if (!may_block) {
CDEBUG (D_NET, "blocking for tx desc\n");
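+ /* also wake up if shutdown starts while we're blocked waiting for a descriptor */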
wait_event (kqswnal_data.kqn_idletxd_waitq,
- !list_empty (&kqswnal_data.kqn_idletxds));
+ !list_empty (&kqswnal_data.kqn_idletxds) ||
+ kqswnal_data.kqn_shuttingdown);
}
if (ktx != NULL) {
list_del (&ktx->ktx_list);
list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
ktx->ktx_launcher = current->pid;
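+ /* count this tx as pending until the send/forward path is finished with it */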
+ atomic_inc(&kqswnal_data.kqn_pending_txs);
+ } else if (fwd != NULL) {
+ /* queue forwarded packet until idle txd available */
+ CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
+ list_add_tail (&fwd->kprfd_list,
+ &kqswnal_data.kqn_idletxd_fwdq);
}
spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
ktx->ktx_launchtime = jiffies;
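+ /* refuse to launch anything new once shutdown has started */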
+ if (kqswnal_data.kqn_shuttingdown)
+ return (-ESHUTDOWN);
+
LASSERT (dest >= 0); /* must be a peer */
if (ktx->ktx_state == KTX_GETTING) {
/* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The
return (0);
case EP_ENOMEM: /* can't allocate ep txd => queue for later */
- LASSERT (in_interrupt());
-
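+ /* queue on kqn_delayedtxds; the scheduler thread retries the launch later */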
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
LASSERT (payload_kiov == NULL || !in_interrupt ());
/* payload is either all vaddrs or all pages */
LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
-
+
if (payload_nob > KQSW_MAXPAYLOAD) {
CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
payload_nob, KQSW_MAXPAYLOAD);
"nid "LPX64" via "LPX64" elanID %d\n",
nid, targetnid,
ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
- return (PTL_FAIL);
+ rc = -EINVAL;
+ goto out;
}
/* peer expects RPC completion with GET data */
rc = kqswnal_dma_reply (ktx, payload_niov,
payload_iov, payload_kiov,
payload_offset, payload_nob);
- if (rc == 0)
- return (PTL_OK);
-
- CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
+ if (rc != 0)
+ CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
+ goto out;
}
memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif
- if (kqswnal_data.kqn_optimized_gets &&
+ if (kqswnal_tunables.kqn_optimized_gets &&
type == PTL_MSG_GET && /* doing a GET */
nid == targetnid) { /* not forwarding */
lib_md_t *md = libmsg->md;
else
rc = kqswnal_map_tx_iov (ktx, 0, md->length,
md->md_niov, md->md_iov.iov);
-
- if (rc < 0) {
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
+ if (rc != 0)
+ goto out;
rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
else
rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
payload_niov, payload_iov);
- if (rc != 0) {
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
+ if (rc != 0)
+ goto out;
}
ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
rc = kqswnal_launch (ktx);
- if (rc != 0) { /* failed? */
- CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
+
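+ /* common exit: log the result, free the tx on failure, drop the pending count */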
+ out:
+ CDEBUG(rc == 0 ? D_NET : D_ERROR,
+ "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n",
+ rc == 0 ? "Sent" : "Failed to send",
+ payload_nob, nid, targetnid, rc);
+
+ if (rc != 0)
kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
- CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n",
- payload_nob, nid, targetnid);
- return (PTL_OK);
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
+ return (rc == 0 ? PTL_OK : PTL_FAIL);
}
static ptl_err_t
if (kqswnal_nid2elanid (nid) < 0) {
CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
rc = -EHOSTUNREACH;
- goto failed;
+ goto out;
}
/* copy hdr into pre-mapped buffer */
#endif
rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
if (rc != 0)
- goto failed;
+ goto out;
}
rc = kqswnal_launch (ktx);
- if (rc == 0)
- return;
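+ /* common exit: on failure free the tx and complete the fwd with the error */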
+ out:
+ if (rc != 0) {
+ CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
- failed:
- LASSERT (rc != 0);
- CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
+ kqswnal_put_idle_tx (ktx);
+ /* complete now (with failure) */
+ kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+ }
- kqswnal_put_idle_tx (ktx);
- /* complete now (with failure) */
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
}
void
return ((int)pid);
atomic_inc (&kqswnal_data.kqn_nthreads);
- atomic_inc (&kqswnal_data.kqn_nthreads_running);
return (0);
}
long flags;
int rc;
int counter = 0;
- int shuttingdown = 0;
int did_something;
kportal_daemonize ("kqswnal_sched");
for (;;)
{
- if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
-
- if (kqswnal_data.kqn_shuttingdown == 2)
- break;
-
- /* During stage 1 of shutdown we are still responsive
- * to receives */
-
- atomic_dec (&kqswnal_data.kqn_nthreads_running);
- shuttingdown = kqswnal_data.kqn_shuttingdown;
- }
-
did_something = 0;
if (!list_empty (&kqswnal_data.kqn_readyrxds))
spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
}
- if (!shuttingdown &&
- !list_empty (&kqswnal_data.kqn_delayedtxds))
+ if (!list_empty (&kqswnal_data.kqn_delayedtxds))
{
ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
kqswnal_tx_t, ktx_list);
flags);
rc = kqswnal_launch (ktx);
- if (rc != 0) /* failed: ktx_nid down? */
- {
+ if (rc != 0) {
CERROR("Failed delayed transmit to "LPX64
": %d\n", ktx->ktx_nid, rc);
kqswnal_tx_done (ktx, rc);
}
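+ /* the delayed tx has now been launched or finalised: drop the pending count */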
+ atomic_dec (&kqswnal_data.kqn_pending_txs);
did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- if (!shuttingdown &
- !list_empty (&kqswnal_data.kqn_delayedfwds))
+ if (!list_empty (&kqswnal_data.kqn_delayedfwds))
{
fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
list_del (&fwd->kprfd_list);
spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+ /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
kqswnal_fwd_packet (NULL, fwd);
did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- /* nothing to do or hogging CPU */
+ /* nothing to do or hogging CPU */
if (!did_something || counter++ == KQSW_RESCHED) {
spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
flags);
counter = 0;
if (!did_something) {
+ if (kqswnal_data.kqn_shuttingdown == 2) {
+ /* We only exit in stage 2 of shutdown when
+ * there's nothing left to do */
+ break;
+ }
rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
- kqswnal_data.kqn_shuttingdown != shuttingdown ||
+ kqswnal_data.kqn_shuttingdown == 2 ||
!list_empty(&kqswnal_data.kqn_readyrxds) ||
!list_empty(&kqswnal_data.kqn_delayedtxds) ||
!list_empty(&kqswnal_data.kqn_delayedfwds));
LASSERT (rc == 0);
- } else if (current->need_resched)
+ } else if (need_resched())
schedule ();
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
}
- spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
-
kqswnal_thread_fini ();
return (0);
}