From fb91164e56e44290fed34332d30ff002fd4f584e Mon Sep 17 00:00:00 2001 From: eeb Date: Mon, 5 Apr 2004 19:26:16 +0000 Subject: [PATCH] * Fix for 2895 --- lnet/klnds/qswlnd/qswlnd.c | 61 +++++++++------ lnet/klnds/qswlnd/qswlnd.h | 4 +- lnet/klnds/qswlnd/qswlnd_cb.c | 128 +++++++++++++++----------------- lnet/router/router.c | 65 ++++++++-------- lustre/portals/knals/qswnal/qswnal.c | 61 +++++++++------ lustre/portals/knals/qswnal/qswnal.h | 4 +- lustre/portals/knals/qswnal/qswnal_cb.c | 128 +++++++++++++++----------------- lustre/portals/router/router.c | 65 ++++++++-------- 8 files changed, 264 insertions(+), 252 deletions(-) diff --git a/lnet/klnds/qswlnd/qswlnd.c b/lnet/klnds/qswlnd/qswlnd.c index a386eef..aeadd31 100644 --- a/lnet/klnds/qswlnd/qswlnd.c +++ b/lnet/klnds/qswlnd/qswlnd.c @@ -222,6 +222,9 @@ kqswnal_cmd (struct portals_cfg *pcfg, void *private) void __exit kqswnal_finalise (void) { + unsigned long flags; + int do_ptl_fini = 0; + switch (kqswnal_data.kqn_init) { default: @@ -237,8 +240,7 @@ kqswnal_finalise (void) /* fall through */ case KQN_INIT_PTL: - PtlNIFini (kqswnal_ni); - lib_fini (&kqswnal_lib); + do_ptl_fini = 1; /* fall through */ case KQN_INIT_DATA: @@ -249,18 +251,24 @@ kqswnal_finalise (void) } /**********************************************************************/ - /* Make router stop her calling me and fail any more call-ins */ + /* Tell router we're shutting down. Any router calls my threads + * make will now fail immediately and the router will stop calling + * into me. */ kpr_shutdown (&kqswnal_data.kqn_router); - + /**********************************************************************/ - /* flag threads we've started to terminate and wait for all to ack */ - + /* Signal the start of shutdown... */ + spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags); kqswnal_data.kqn_shuttingdown = 1; - wake_up_all (&kqswnal_data.kqn_sched_waitq); + spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags); + + wake_up_all(&kqswnal_data.kqn_idletxd_waitq); - while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) { - CDEBUG(D_NET, "waiting for %d threads to start shutting down\n", - atomic_read (&kqswnal_data.kqn_nthreads_running)); + /**********************************************************************/ + /* wait for sends that have allocated a tx desc to launch or give up */ + while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) { + CDEBUG(D_NET, "waiting for %d pending sends\n", + atomic_read (&kqswnal_data.kqn_pending_txs)); set_current_state (TASK_UNINTERRUPTIBLE); schedule_timeout (HZ); } @@ -268,18 +276,27 @@ kqswnal_finalise (void) /**********************************************************************/ /* close elan comms */ #if MULTIRAIL_EKC + /* Shut down receivers first; rx callbacks might try sending... */ if (kqswnal_data.kqn_eprx_small != NULL) ep_free_rcvr (kqswnal_data.kqn_eprx_small); if (kqswnal_data.kqn_eprx_large != NULL) ep_free_rcvr (kqswnal_data.kqn_eprx_large); + /* NB ep_free_rcvr() returns only after we've freed off all receive + * buffers (see shutdown handling in kqswnal_requeue_rx()). This + * means we must have completed any messages we passed to + * lib_parse() or kpr_fwd_start(). */ + if (kqswnal_data.kqn_eptx != NULL) ep_free_xmtr (kqswnal_data.kqn_eptx); - /* freeing the xmtr completes all txs pdq */ + /* NB ep_free_xmtr() returns only after all outstanding transmits + * have called their callback... */ LASSERT(list_empty(&kqswnal_data.kqn_activetxds)); #else + /* "Old" EKC just pretends to shutdown cleanly but actually + * provides no guarantees */ if (kqswnal_data.kqn_eprx_small != NULL) ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small); @@ -298,7 +315,6 @@ kqswnal_finalise (void) #endif /**********************************************************************/ /* flag threads to terminate, wake them and wait for them to die */ - kqswnal_data.kqn_shuttingdown = 2; wake_up_all (&kqswnal_data.kqn_sched_waitq); @@ -316,10 +332,12 @@ kqswnal_finalise (void) #if MULTIRAIL_EKC LASSERT (list_empty (&kqswnal_data.kqn_readyrxds)); + LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds)); + LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds)); #endif /**********************************************************************/ - /* Complete any blocked forwarding packets with error + /* Complete any blocked forwarding packets, with error */ while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq)) @@ -327,23 +345,18 @@ kqswnal_finalise (void) kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next, kpr_fwd_desc_t, kprfd_list); list_del (&fwd->kprfd_list); - kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH); - } - - while (!list_empty (&kqswnal_data.kqn_delayedfwds)) - { - kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, - kpr_fwd_desc_t, kprfd_list); - list_del (&fwd->kprfd_list); - kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH); + kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN); } /**********************************************************************/ - /* Wait for router to complete any packets I sent her - */ + /* finalise router and portals lib */ kpr_deregister (&kqswnal_data.kqn_router); + if (do_ptl_fini) { + PtlNIFini (kqswnal_ni); + lib_fini (&kqswnal_lib); + } /**********************************************************************/ /* Unmap message buffers and free all descriptors and buffers diff --git a/lnet/klnds/qswlnd/qswlnd.h b/lnet/klnds/qswlnd/qswlnd.h index 5e32887..93bf584 100644 --- a/lnet/klnds/qswlnd/qswlnd.h +++ b/lnet/klnds/qswlnd/qswlnd.h @@ -196,8 +196,7 @@ typedef struct { char kqn_init; /* what's been initialised */ char kqn_shuttingdown; /* I'm trying to shut down */ - atomic_t kqn_nthreads; /* # threads not terminated */ - atomic_t kqn_nthreads_running;/* # threads still running */ + atomic_t kqn_nthreads; /* # threads running */ int kqn_optimized_gets; /* optimized GETs? */ int kqn_copy_small_fwd; /* fwd small msgs from pre-allocated buffer? */ @@ -214,6 +213,7 @@ typedef struct spinlock_t kqn_idletxd_lock; /* serialise idle txd access */ wait_queue_head_t kqn_idletxd_waitq; /* sender blocks here waiting for idle txd */ struct list_head kqn_idletxd_fwdq; /* forwarded packets block here waiting for idle txd */ + atomic_t kqn_pending_txs; /* # transmits being prepped */ spinlock_t kqn_sched_lock; /* serialise packet schedulers */ wait_queue_head_t kqn_sched_waitq; /* scheduler blocks here */ diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c b/lnet/klnds/qswlnd/qswlnd_cb.c index 61c88f6..577c578 100644 --- a/lnet/klnds/qswlnd/qswlnd_cb.c +++ b/lnet/klnds/qswlnd/qswlnd_cb.c @@ -426,7 +426,8 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx) list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds); /* anything blocking for a tx descriptor? */ - if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */ + if (!kqswnal_data.kqn_shuttingdown && + !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */ { CDEBUG(D_NET,"wakeup fwd\n"); @@ -460,6 +461,9 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) for (;;) { spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags); + if (kqswnal_data.kqn_shuttingdown) + break; + /* "normal" descriptor is free */ if (!list_empty (&kqswnal_data.kqn_idletxds)) { ktx = list_entry (kqswnal_data.kqn_idletxds.next, @@ -467,14 +471,8 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) break; } - /* "normal" descriptor pool is empty */ - - if (fwd != NULL) { /* forwarded packet => queue for idle txd */ - CDEBUG (D_NET, "blocked fwd [%p]\n", fwd); - list_add_tail (&fwd->kprfd_list, - &kqswnal_data.kqn_idletxd_fwdq); + if (fwd != NULL) /* forwarded packet? */ break; - } /* doing a local transmit */ if (!may_block) { @@ -494,13 +492,20 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) CDEBUG (D_NET, "blocking for tx desc\n"); wait_event (kqswnal_data.kqn_idletxd_waitq, - !list_empty (&kqswnal_data.kqn_idletxds)); + !list_empty (&kqswnal_data.kqn_idletxds) || + kqswnal_data.kqn_shuttingdown); } if (ktx != NULL) { list_del (&ktx->ktx_list); list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds); ktx->ktx_launcher = current->pid; + atomic_inc(&kqswnal_data.kqn_pending_txs); + } else if (fwd != NULL) { + /* queue forwarded packet until idle txd available */ + CDEBUG (D_NET, "blocked fwd [%p]\n", fwd); + list_add_tail (&fwd->kprfd_list, + &kqswnal_data.kqn_idletxd_fwdq); } spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags); @@ -601,6 +606,9 @@ kqswnal_launch (kqswnal_tx_t *ktx) ktx->ktx_launchtime = jiffies; + if (kqswnal_data.kqn_shuttingdown) + return (-ESHUTDOWN); + LASSERT (dest >= 0); /* must be a peer */ if (ktx->ktx_state == KTX_GETTING) { /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The @@ -635,8 +643,6 @@ kqswnal_launch (kqswnal_tx_t *ktx) return (0); case EP_ENOMEM: /* can't allocate ep txd => queue for later */ - LASSERT (in_interrupt()); - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds); @@ -921,7 +927,7 @@ kqswnal_sendmsg (nal_cb_t *nal, LASSERT (payload_kiov == NULL || !in_interrupt ()); /* payload is either all vaddrs or all pages */ LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); - + if (payload_nob > KQSW_MAXPAYLOAD) { CERROR ("request exceeds MTU size "LPSZ" (max %u).\n", payload_nob, KQSW_MAXPAYLOAD); @@ -967,19 +973,17 @@ kqswnal_sendmsg (nal_cb_t *nal, "nid "LPX64" via "LPX64" elanID %d\n", nid, targetnid, ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)); - return (PTL_FAIL); + rc = -EINVAL; + goto out; } /* peer expects RPC completion with GET data */ rc = kqswnal_dma_reply (ktx, payload_niov, payload_iov, payload_kiov, payload_offset, payload_nob); - if (rc == 0) - return (PTL_OK); - - CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); + if (rc != 0) + CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); + goto out; } memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */ @@ -1052,11 +1056,8 @@ kqswnal_sendmsg (nal_cb_t *nal, else rc = kqswnal_map_tx_iov (ktx, 0, md->length, md->md_niov, md->md_iov.iov); - - if (rc < 0) { - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } + if (rc != 0) + goto out; rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1; @@ -1119,25 +1120,26 @@ kqswnal_sendmsg (nal_cb_t *nal, else rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob, payload_niov, payload_iov); - if (rc != 0) { - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } + if (rc != 0) + goto out; } ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; rc = kqswnal_launch (ktx); - if (rc != 0) { /* failed? */ - CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc); + + out: + CDEBUG(rc == 0 ? D_NET : D_ERROR, + "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", + rc == 0 ? "Sent" : "Failed to send", + payload_nob, nid, targetnid, rc); + + if (rc != 0) kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } - CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", - payload_nob, nid, targetnid); - return (PTL_OK); + atomic_dec(&kqswnal_data.kqn_pending_txs); + return (rc == 0 ? PTL_OK : PTL_FAIL); } static ptl_err_t @@ -1204,7 +1206,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) if (kqswnal_nid2elanid (nid) < 0) { CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid); rc = -EHOSTUNREACH; - goto failed; + goto out; } /* copy hdr into pre-mapped buffer */ @@ -1244,20 +1246,20 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) #endif rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov); if (rc != 0) - goto failed; + goto out; } rc = kqswnal_launch (ktx); - if (rc == 0) - return; + out: + if (rc != 0) { + CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc); - failed: - LASSERT (rc != 0); - CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc); + kqswnal_put_idle_tx (ktx); + /* complete now (with failure) */ + kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc); + } - kqswnal_put_idle_tx (ktx); - /* complete now (with failure) */ - kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc); + atomic_dec(&kqswnal_data.kqn_pending_txs); } void @@ -1727,7 +1729,6 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg) return ((int)pid); atomic_inc (&kqswnal_data.kqn_nthreads); - atomic_inc (&kqswnal_data.kqn_nthreads_running); return (0); } @@ -1746,7 +1747,6 @@ kqswnal_scheduler (void *arg) long flags; int rc; int counter = 0; - int shuttingdown = 0; int did_something; kportal_daemonize ("kqswnal_sched"); @@ -1756,18 +1756,6 @@ kqswnal_scheduler (void *arg) for (;;) { - if (kqswnal_data.kqn_shuttingdown != shuttingdown) { - - if (kqswnal_data.kqn_shuttingdown == 2) - break; - - /* During stage 1 of shutdown we are still responsive - * to receives */ - - atomic_dec (&kqswnal_data.kqn_nthreads_running); - shuttingdown = kqswnal_data.kqn_shuttingdown; - } - did_something = 0; if (!list_empty (&kqswnal_data.kqn_readyrxds)) @@ -1784,8 +1772,7 @@ kqswnal_scheduler (void *arg) spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); } - if (!shuttingdown && - !list_empty (&kqswnal_data.kqn_delayedtxds)) + if (!list_empty (&kqswnal_data.kqn_delayedtxds)) { ktx = list_entry(kqswnal_data.kqn_delayedtxds.next, kqswnal_tx_t, ktx_list); @@ -1794,31 +1781,31 @@ kqswnal_scheduler (void *arg) flags); rc = kqswnal_launch (ktx); - if (rc != 0) /* failed: ktx_nid down? */ - { + if (rc != 0) { CERROR("Failed delayed transmit to "LPX64 ": %d\n", ktx->ktx_nid, rc); kqswnal_tx_done (ktx, rc); } + atomic_dec (&kqswnal_data.kqn_pending_txs); did_something = 1; spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); } - if (!shuttingdown & - !list_empty (&kqswnal_data.kqn_delayedfwds)) + if (!list_empty (&kqswnal_data.kqn_delayedfwds)) { fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list); list_del (&fwd->kprfd_list); spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); + /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */ kqswnal_fwd_packet (NULL, fwd); did_something = 1; spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); } - /* nothing to do or hogging CPU */ + /* nothing to do or hogging CPU */ if (!did_something || counter++ == KQSW_RESCHED) { spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags); @@ -1826,8 +1813,13 @@ kqswnal_scheduler (void *arg) counter = 0; if (!did_something) { + if (kqswnal_data.kqn_shuttingdown == 2) { + /* We only exit in stage 2 of shutdown when + * there's nothing left to do */ + break; + } rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq, - kqswnal_data.kqn_shuttingdown != shuttingdown || + kqswnal_data.kqn_shuttingdown == 2 || !list_empty(&kqswnal_data.kqn_readyrxds) || !list_empty(&kqswnal_data.kqn_delayedtxds) || !list_empty(&kqswnal_data.kqn_delayedfwds)); @@ -1839,8 +1831,6 @@ kqswnal_scheduler (void *arg) } } - spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); - kqswnal_thread_fini (); return (0); } diff --git a/lnet/router/router.c b/lnet/router/router.c index d0dbf0a..27aab67 100644 --- a/lnet/router/router.c +++ b/lnet/router/router.c @@ -289,18 +289,9 @@ kpr_shutdown_nal (void *arg) LASSERT (!ne->kpne_shutdown); LASSERT (!in_interrupt()); - write_lock_irqsave (&kpr_rwlock, flags); /* locking a bit spurious... */ + write_lock_irqsave (&kpr_rwlock, flags); ne->kpne_shutdown = 1; - write_unlock_irqrestore (&kpr_rwlock, flags); /* except it's a memory barrier */ - - while (atomic_read (&ne->kpne_refcount) != 0) - { - CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n", - ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount)); - - set_current_state (TASK_UNINTERRUPTIBLE); - schedule_timeout (HZ); - } + write_unlock_irqrestore (&kpr_rwlock, flags); } void @@ -312,15 +303,22 @@ kpr_deregister_nal (void *arg) CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid); LASSERT (ne->kpne_shutdown); /* caller must have issued shutdown already */ - LASSERT (atomic_read (&ne->kpne_refcount) == 0); /* can't be busy */ LASSERT (!in_interrupt()); write_lock_irqsave (&kpr_rwlock, flags); - list_del (&ne->kpne_list); - write_unlock_irqrestore (&kpr_rwlock, flags); + /* Wait until all outstanding messages/notifications have completed */ + while (atomic_read (&ne->kpne_refcount) != 0) + { + CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n", + ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount)); + + set_current_state (TASK_UNINTERRUPTIBLE); + schedule_timeout (HZ); + } + PORTAL_FREE (ne, sizeof (*ne)); PORTAL_MODULE_UNUSE; } @@ -377,12 +375,15 @@ kpr_lookup_target (void *arg, ptl_nid_t target_nid, int nob, CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid, ne->kpne_interface.kprni_nalid); - - if (ne->kpne_shutdown) /* caller is shutting down */ - return (-ENOENT); + LASSERT (!in_interrupt()); read_lock (&kpr_rwlock); + if (ne->kpne_shutdown) { /* caller is shutting down */ + read_unlock (&kpr_rwlock); + return (-ENOENT); + } + /* Search routes for one that has a gateway to target_nid on the callers network */ list_for_each (e, &kpr_routes) { @@ -452,25 +453,26 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd) struct list_head *e; kpr_route_entry_t *re; kpr_nal_entry_t *tmp_ne; + int rc; CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd, target_nid, src_ne->kpne_interface.kprni_nalid); LASSERT (nob == lib_kiov_nob (fwd->kprfd_niov, fwd->kprfd_kiov)); - - atomic_inc (&kpr_queue_depth); - atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */ + LASSERT (!in_interrupt()); + + read_lock (&kpr_rwlock); kpr_fwd_packets++; /* (loose) stats accounting */ kpr_fwd_bytes += nob + sizeof(ptl_hdr_t); - if (src_ne->kpne_shutdown) /* caller is shutting down */ + if (src_ne->kpne_shutdown) { /* caller is shutting down */ + rc = -ESHUTDOWN; goto out; + } fwd->kprfd_router_arg = src_ne; /* stash caller's nal entry */ - read_lock (&kpr_rwlock); - /* Search routes for one that has a gateway to target_nid NOT on the caller's network */ list_for_each (e, &kpr_routes) { @@ -507,7 +509,9 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd) kpr_update_weight (ge, nob); fwd->kprfd_gateway_nid = ge->kpge_nid; - atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */ + atomic_inc (&src_ne->kpne_refcount); /* source and dest nals are */ + atomic_inc (&dst_ne->kpne_refcount); /* busy until fwd completes */ + atomic_inc (&kpr_queue_depth); read_unlock (&kpr_rwlock); @@ -520,18 +524,16 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd) return; } - read_unlock (&kpr_rwlock); + rc = -EHOSTUNREACH; out: kpr_fwd_errors++; - CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd, - target_nid, src_ne->kpne_interface.kprni_nalid); + CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d: %d\n", + fwd, target_nid, src_ne->kpne_interface.kprni_nalid, rc); - /* Can't find anywhere to forward to */ - (fwd->kprfd_callback)(fwd->kprfd_callback_arg, -EHOSTUNREACH); + (fwd->kprfd_callback)(fwd->kprfd_callback_arg, rc); - atomic_dec (&kpr_queue_depth); - atomic_dec (&src_ne->kpne_refcount); + read_unlock (&kpr_rwlock); } void @@ -699,6 +701,7 @@ kpr_get_route (int idx, int *gateway_nalid, ptl_nid_t *gateway_nid, { struct list_head *e; + LASSERT (!in_interrupt()); read_lock(&kpr_rwlock); for (e = kpr_routes.next; e != &kpr_routes; e = e->next) { diff --git a/lustre/portals/knals/qswnal/qswnal.c b/lustre/portals/knals/qswnal/qswnal.c index a386eef..aeadd31 100644 --- a/lustre/portals/knals/qswnal/qswnal.c +++ b/lustre/portals/knals/qswnal/qswnal.c @@ -222,6 +222,9 @@ kqswnal_cmd (struct portals_cfg *pcfg, void *private) void __exit kqswnal_finalise (void) { + unsigned long flags; + int do_ptl_fini = 0; + switch (kqswnal_data.kqn_init) { default: @@ -237,8 +240,7 @@ kqswnal_finalise (void) /* fall through */ case KQN_INIT_PTL: - PtlNIFini (kqswnal_ni); - lib_fini (&kqswnal_lib); + do_ptl_fini = 1; /* fall through */ case KQN_INIT_DATA: @@ -249,18 +251,24 @@ kqswnal_finalise (void) } /**********************************************************************/ - /* Make router stop her calling me and fail any more call-ins */ + /* Tell router we're shutting down. Any router calls my threads + * make will now fail immediately and the router will stop calling + * into me. */ kpr_shutdown (&kqswnal_data.kqn_router); - + /**********************************************************************/ - /* flag threads we've started to terminate and wait for all to ack */ - + /* Signal the start of shutdown... */ + spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags); kqswnal_data.kqn_shuttingdown = 1; - wake_up_all (&kqswnal_data.kqn_sched_waitq); + spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags); + + wake_up_all(&kqswnal_data.kqn_idletxd_waitq); - while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) { - CDEBUG(D_NET, "waiting for %d threads to start shutting down\n", - atomic_read (&kqswnal_data.kqn_nthreads_running)); + /**********************************************************************/ + /* wait for sends that have allocated a tx desc to launch or give up */ + while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) { + CDEBUG(D_NET, "waiting for %d pending sends\n", + atomic_read (&kqswnal_data.kqn_pending_txs)); set_current_state (TASK_UNINTERRUPTIBLE); schedule_timeout (HZ); } @@ -268,18 +276,27 @@ kqswnal_finalise (void) /**********************************************************************/ /* close elan comms */ #if MULTIRAIL_EKC + /* Shut down receivers first; rx callbacks might try sending... */ if (kqswnal_data.kqn_eprx_small != NULL) ep_free_rcvr (kqswnal_data.kqn_eprx_small); if (kqswnal_data.kqn_eprx_large != NULL) ep_free_rcvr (kqswnal_data.kqn_eprx_large); + /* NB ep_free_rcvr() returns only after we've freed off all receive + * buffers (see shutdown handling in kqswnal_requeue_rx()). This + * means we must have completed any messages we passed to + * lib_parse() or kpr_fwd_start(). */ + if (kqswnal_data.kqn_eptx != NULL) ep_free_xmtr (kqswnal_data.kqn_eptx); - /* freeing the xmtr completes all txs pdq */ + /* NB ep_free_xmtr() returns only after all outstanding transmits + * have called their callback... */ LASSERT(list_empty(&kqswnal_data.kqn_activetxds)); #else + /* "Old" EKC just pretends to shutdown cleanly but actually + * provides no guarantees */ if (kqswnal_data.kqn_eprx_small != NULL) ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small); @@ -298,7 +315,6 @@ kqswnal_finalise (void) #endif /**********************************************************************/ /* flag threads to terminate, wake them and wait for them to die */ - kqswnal_data.kqn_shuttingdown = 2; wake_up_all (&kqswnal_data.kqn_sched_waitq); @@ -316,10 +332,12 @@ kqswnal_finalise (void) #if MULTIRAIL_EKC LASSERT (list_empty (&kqswnal_data.kqn_readyrxds)); + LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds)); + LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds)); #endif /**********************************************************************/ - /* Complete any blocked forwarding packets with error + /* Complete any blocked forwarding packets, with error */ while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq)) @@ -327,23 +345,18 @@ kqswnal_finalise (void) kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next, kpr_fwd_desc_t, kprfd_list); list_del (&fwd->kprfd_list); - kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH); - } - - while (!list_empty (&kqswnal_data.kqn_delayedfwds)) - { - kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, - kpr_fwd_desc_t, kprfd_list); - list_del (&fwd->kprfd_list); - kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH); + kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN); } /**********************************************************************/ - /* Wait for router to complete any packets I sent her - */ + /* finalise router and portals lib */ kpr_deregister (&kqswnal_data.kqn_router); + if (do_ptl_fini) { + PtlNIFini (kqswnal_ni); + lib_fini (&kqswnal_lib); + } /**********************************************************************/ /* Unmap message buffers and free all descriptors and buffers diff --git a/lustre/portals/knals/qswnal/qswnal.h b/lustre/portals/knals/qswnal/qswnal.h index 5e32887..93bf584 100644 --- a/lustre/portals/knals/qswnal/qswnal.h +++ b/lustre/portals/knals/qswnal/qswnal.h @@ -196,8 +196,7 @@ typedef struct { char kqn_init; /* what's been initialised */ char kqn_shuttingdown; /* I'm trying to shut down */ - atomic_t kqn_nthreads; /* # threads not terminated */ - atomic_t kqn_nthreads_running;/* # threads still running */ + atomic_t kqn_nthreads; /* # threads running */ int kqn_optimized_gets; /* optimized GETs? */ int kqn_copy_small_fwd; /* fwd small msgs from pre-allocated buffer? */ @@ -214,6 +213,7 @@ typedef struct spinlock_t kqn_idletxd_lock; /* serialise idle txd access */ wait_queue_head_t kqn_idletxd_waitq; /* sender blocks here waiting for idle txd */ struct list_head kqn_idletxd_fwdq; /* forwarded packets block here waiting for idle txd */ + atomic_t kqn_pending_txs; /* # transmits being prepped */ spinlock_t kqn_sched_lock; /* serialise packet schedulers */ wait_queue_head_t kqn_sched_waitq; /* scheduler blocks here */ diff --git a/lustre/portals/knals/qswnal/qswnal_cb.c b/lustre/portals/knals/qswnal/qswnal_cb.c index 61c88f6..577c578 100644 --- a/lustre/portals/knals/qswnal/qswnal_cb.c +++ b/lustre/portals/knals/qswnal/qswnal_cb.c @@ -426,7 +426,8 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx) list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds); /* anything blocking for a tx descriptor? */ - if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */ + if (!kqswnal_data.kqn_shuttingdown && + !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */ { CDEBUG(D_NET,"wakeup fwd\n"); @@ -460,6 +461,9 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) for (;;) { spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags); + if (kqswnal_data.kqn_shuttingdown) + break; + /* "normal" descriptor is free */ if (!list_empty (&kqswnal_data.kqn_idletxds)) { ktx = list_entry (kqswnal_data.kqn_idletxds.next, @@ -467,14 +471,8 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) break; } - /* "normal" descriptor pool is empty */ - - if (fwd != NULL) { /* forwarded packet => queue for idle txd */ - CDEBUG (D_NET, "blocked fwd [%p]\n", fwd); - list_add_tail (&fwd->kprfd_list, - &kqswnal_data.kqn_idletxd_fwdq); + if (fwd != NULL) /* forwarded packet? */ break; - } /* doing a local transmit */ if (!may_block) { @@ -494,13 +492,20 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) CDEBUG (D_NET, "blocking for tx desc\n"); wait_event (kqswnal_data.kqn_idletxd_waitq, - !list_empty (&kqswnal_data.kqn_idletxds)); + !list_empty (&kqswnal_data.kqn_idletxds) || + kqswnal_data.kqn_shuttingdown); } if (ktx != NULL) { list_del (&ktx->ktx_list); list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds); ktx->ktx_launcher = current->pid; + atomic_inc(&kqswnal_data.kqn_pending_txs); + } else if (fwd != NULL) { + /* queue forwarded packet until idle txd available */ + CDEBUG (D_NET, "blocked fwd [%p]\n", fwd); + list_add_tail (&fwd->kprfd_list, + &kqswnal_data.kqn_idletxd_fwdq); } spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags); @@ -601,6 +606,9 @@ kqswnal_launch (kqswnal_tx_t *ktx) ktx->ktx_launchtime = jiffies; + if (kqswnal_data.kqn_shuttingdown) + return (-ESHUTDOWN); + LASSERT (dest >= 0); /* must be a peer */ if (ktx->ktx_state == KTX_GETTING) { /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The @@ -635,8 +643,6 @@ kqswnal_launch (kqswnal_tx_t *ktx) return (0); case EP_ENOMEM: /* can't allocate ep txd => queue for later */ - LASSERT (in_interrupt()); - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds); @@ -921,7 +927,7 @@ kqswnal_sendmsg (nal_cb_t *nal, LASSERT (payload_kiov == NULL || !in_interrupt ()); /* payload is either all vaddrs or all pages */ LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); - + if (payload_nob > KQSW_MAXPAYLOAD) { CERROR ("request exceeds MTU size "LPSZ" (max %u).\n", payload_nob, KQSW_MAXPAYLOAD); @@ -967,19 +973,17 @@ kqswnal_sendmsg (nal_cb_t *nal, "nid "LPX64" via "LPX64" elanID %d\n", nid, targetnid, ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)); - return (PTL_FAIL); + rc = -EINVAL; + goto out; } /* peer expects RPC completion with GET data */ rc = kqswnal_dma_reply (ktx, payload_niov, payload_iov, payload_kiov, payload_offset, payload_nob); - if (rc == 0) - return (PTL_OK); - - CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); + if (rc != 0) + CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); + goto out; } memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */ @@ -1052,11 +1056,8 @@ kqswnal_sendmsg (nal_cb_t *nal, else rc = kqswnal_map_tx_iov (ktx, 0, md->length, md->md_niov, md->md_iov.iov); - - if (rc < 0) { - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } + if (rc != 0) + goto out; rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1; @@ -1119,25 +1120,26 @@ kqswnal_sendmsg (nal_cb_t *nal, else rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob, payload_niov, payload_iov); - if (rc != 0) { - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } + if (rc != 0) + goto out; } ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; rc = kqswnal_launch (ktx); - if (rc != 0) { /* failed? */ - CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc); + + out: + CDEBUG(rc == 0 ? D_NET : D_ERROR, + "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", + rc == 0 ? "Sent" : "Failed to send", + payload_nob, nid, targetnid, rc); + + if (rc != 0) kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } - CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", - payload_nob, nid, targetnid); - return (PTL_OK); + atomic_dec(&kqswnal_data.kqn_pending_txs); + return (rc == 0 ? PTL_OK : PTL_FAIL); } static ptl_err_t @@ -1204,7 +1206,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) if (kqswnal_nid2elanid (nid) < 0) { CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid); rc = -EHOSTUNREACH; - goto failed; + goto out; } /* copy hdr into pre-mapped buffer */ @@ -1244,20 +1246,20 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) #endif rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov); if (rc != 0) - goto failed; + goto out; } rc = kqswnal_launch (ktx); - if (rc == 0) - return; + out: + if (rc != 0) { + CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc); - failed: - LASSERT (rc != 0); - CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc); + kqswnal_put_idle_tx (ktx); + /* complete now (with failure) */ + kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc); + } - kqswnal_put_idle_tx (ktx); - /* complete now (with failure) */ - kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc); + atomic_dec(&kqswnal_data.kqn_pending_txs); } void @@ -1727,7 +1729,6 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg) return ((int)pid); atomic_inc (&kqswnal_data.kqn_nthreads); - atomic_inc (&kqswnal_data.kqn_nthreads_running); return (0); } @@ -1746,7 +1747,6 @@ kqswnal_scheduler (void *arg) long flags; int rc; int counter = 0; - int shuttingdown = 0; int did_something; kportal_daemonize ("kqswnal_sched"); @@ -1756,18 +1756,6 @@ kqswnal_scheduler (void *arg) for (;;) { - if (kqswnal_data.kqn_shuttingdown != shuttingdown) { - - if (kqswnal_data.kqn_shuttingdown == 2) - break; - - /* During stage 1 of shutdown we are still responsive - * to receives */ - - atomic_dec (&kqswnal_data.kqn_nthreads_running); - shuttingdown = kqswnal_data.kqn_shuttingdown; - } - did_something = 0; if (!list_empty (&kqswnal_data.kqn_readyrxds)) @@ -1784,8 +1772,7 @@ kqswnal_scheduler (void *arg) spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); } - if (!shuttingdown && - !list_empty (&kqswnal_data.kqn_delayedtxds)) + if (!list_empty (&kqswnal_data.kqn_delayedtxds)) { ktx = list_entry(kqswnal_data.kqn_delayedtxds.next, kqswnal_tx_t, ktx_list); @@ -1794,31 +1781,31 @@ kqswnal_scheduler (void *arg) flags); rc = kqswnal_launch (ktx); - if (rc != 0) /* failed: ktx_nid down? */ - { + if (rc != 0) { CERROR("Failed delayed transmit to "LPX64 ": %d\n", ktx->ktx_nid, rc); kqswnal_tx_done (ktx, rc); } + atomic_dec (&kqswnal_data.kqn_pending_txs); did_something = 1; spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); } - if (!shuttingdown & - !list_empty (&kqswnal_data.kqn_delayedfwds)) + if (!list_empty (&kqswnal_data.kqn_delayedfwds)) { fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list); list_del (&fwd->kprfd_list); spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); + /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */ kqswnal_fwd_packet (NULL, fwd); did_something = 1; spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); } - /* nothing to do or hogging CPU */ + /* nothing to do or hogging CPU */ if (!did_something || counter++ == KQSW_RESCHED) { spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags); @@ -1826,8 +1813,13 @@ kqswnal_scheduler (void *arg) counter = 0; if (!did_something) { + if (kqswnal_data.kqn_shuttingdown == 2) { + /* We only exit in stage 2 of shutdown when + * there's nothing left to do */ + break; + } rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq, - kqswnal_data.kqn_shuttingdown != shuttingdown || + kqswnal_data.kqn_shuttingdown == 2 || !list_empty(&kqswnal_data.kqn_readyrxds) || !list_empty(&kqswnal_data.kqn_delayedtxds) || !list_empty(&kqswnal_data.kqn_delayedfwds)); @@ -1839,8 +1831,6 @@ kqswnal_scheduler (void *arg) } } - spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); - kqswnal_thread_fini (); return (0); } diff --git a/lustre/portals/router/router.c b/lustre/portals/router/router.c index d0dbf0a..27aab67 100644 --- a/lustre/portals/router/router.c +++ b/lustre/portals/router/router.c @@ -289,18 +289,9 @@ kpr_shutdown_nal (void *arg) LASSERT (!ne->kpne_shutdown); LASSERT (!in_interrupt()); - write_lock_irqsave (&kpr_rwlock, flags); /* locking a bit spurious... */ + write_lock_irqsave (&kpr_rwlock, flags); ne->kpne_shutdown = 1; - write_unlock_irqrestore (&kpr_rwlock, flags); /* except it's a memory barrier */ - - while (atomic_read (&ne->kpne_refcount) != 0) - { - CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n", - ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount)); - - set_current_state (TASK_UNINTERRUPTIBLE); - schedule_timeout (HZ); - } + write_unlock_irqrestore (&kpr_rwlock, flags); } void @@ -312,15 +303,22 @@ kpr_deregister_nal (void *arg) CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid); LASSERT (ne->kpne_shutdown); /* caller must have issued shutdown already */ - LASSERT (atomic_read (&ne->kpne_refcount) == 0); /* can't be busy */ LASSERT (!in_interrupt()); write_lock_irqsave (&kpr_rwlock, flags); - list_del (&ne->kpne_list); - write_unlock_irqrestore (&kpr_rwlock, flags); + /* Wait until all outstanding messages/notifications have completed */ + while (atomic_read (&ne->kpne_refcount) != 0) + { + CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n", + ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount)); + + set_current_state (TASK_UNINTERRUPTIBLE); + schedule_timeout (HZ); + } + PORTAL_FREE (ne, sizeof (*ne)); PORTAL_MODULE_UNUSE; } @@ -377,12 +375,15 @@ kpr_lookup_target (void *arg, ptl_nid_t target_nid, int nob, CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid, ne->kpne_interface.kprni_nalid); - - if (ne->kpne_shutdown) /* caller is shutting down */ - return (-ENOENT); + LASSERT (!in_interrupt()); read_lock (&kpr_rwlock); + if (ne->kpne_shutdown) { /* caller is shutting down */ + read_unlock (&kpr_rwlock); + return (-ENOENT); + } + /* Search routes for one that has a gateway to target_nid on the callers network */ list_for_each (e, &kpr_routes) { @@ -452,25 +453,26 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd) struct list_head *e; kpr_route_entry_t *re; kpr_nal_entry_t *tmp_ne; + int rc; CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd, target_nid, src_ne->kpne_interface.kprni_nalid); LASSERT (nob == lib_kiov_nob (fwd->kprfd_niov, fwd->kprfd_kiov)); - - atomic_inc (&kpr_queue_depth); - atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */ + LASSERT (!in_interrupt()); + + read_lock (&kpr_rwlock); kpr_fwd_packets++; /* (loose) stats accounting */ kpr_fwd_bytes += nob + sizeof(ptl_hdr_t); - if (src_ne->kpne_shutdown) /* caller is shutting down */ + if (src_ne->kpne_shutdown) { /* caller is shutting down */ + rc = -ESHUTDOWN; goto out; + } fwd->kprfd_router_arg = src_ne; /* stash caller's nal entry */ - read_lock (&kpr_rwlock); - /* Search routes for one that has a gateway to target_nid NOT on the caller's network */ list_for_each (e, &kpr_routes) { @@ -507,7 +509,9 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd) kpr_update_weight (ge, nob); fwd->kprfd_gateway_nid = ge->kpge_nid; - atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */ + atomic_inc (&src_ne->kpne_refcount); /* source and dest nals are */ + atomic_inc (&dst_ne->kpne_refcount); /* busy until fwd completes */ + atomic_inc (&kpr_queue_depth); read_unlock (&kpr_rwlock); @@ -520,18 +524,16 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd) return; } - read_unlock (&kpr_rwlock); + rc = -EHOSTUNREACH; out: kpr_fwd_errors++; - CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd, - target_nid, src_ne->kpne_interface.kprni_nalid); + CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d: %d\n", + fwd, target_nid, src_ne->kpne_interface.kprni_nalid, rc); - /* Can't find anywhere to forward to */ - (fwd->kprfd_callback)(fwd->kprfd_callback_arg, -EHOSTUNREACH); + (fwd->kprfd_callback)(fwd->kprfd_callback_arg, rc); - atomic_dec (&kpr_queue_depth); - atomic_dec (&src_ne->kpne_refcount); + read_unlock (&kpr_rwlock); } void @@ -699,6 +701,7 @@ kpr_get_route (int idx, int *gateway_nalid, ptl_nid_t *gateway_nid, { struct list_head *e; + LASSERT (!in_interrupt()); read_lock(&kpr_rwlock); for (e = kpr_routes.next; e != &kpr_routes; e = e->next) { -- 1.8.3.1