Whamcloud - gitweb
* Fix for 2895
authoreeb <eeb>
Mon, 5 Apr 2004 19:26:16 +0000 (19:26 +0000)
committereeb <eeb>
Mon, 5 Apr 2004 19:26:16 +0000 (19:26 +0000)
lnet/klnds/qswlnd/qswlnd.c
lnet/klnds/qswlnd/qswlnd.h
lnet/klnds/qswlnd/qswlnd_cb.c
lnet/router/router.c
lustre/portals/knals/qswnal/qswnal.c
lustre/portals/knals/qswnal/qswnal.h
lustre/portals/knals/qswnal/qswnal_cb.c
lustre/portals/router/router.c

index a386eef..aeadd31 100644 (file)
@@ -222,6 +222,9 @@ kqswnal_cmd (struct portals_cfg *pcfg, void *private)
 void __exit
 kqswnal_finalise (void)
 {
+       unsigned long flags;
+       int           do_ptl_fini = 0;
+
        switch (kqswnal_data.kqn_init)
        {
        default:
@@ -237,8 +240,7 @@ kqswnal_finalise (void)
                /* fall through */
 
        case KQN_INIT_PTL:
-               PtlNIFini (kqswnal_ni);
-               lib_fini (&kqswnal_lib);
+               do_ptl_fini = 1;
                /* fall through */
 
        case KQN_INIT_DATA:
@@ -249,18 +251,24 @@ kqswnal_finalise (void)
        }
 
        /**********************************************************************/
-       /* Make router stop her calling me and fail any more call-ins */
+       /* Tell router we're shutting down.  Any router calls my threads
+        * make will now fail immediately and the router will stop calling
+        * into me. */
        kpr_shutdown (&kqswnal_data.kqn_router);
-
+       
        /**********************************************************************/
-       /* flag threads we've started to terminate and wait for all to ack */
-
+       /* Signal the start of shutdown... */
+       spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
-       wake_up_all (&kqswnal_data.kqn_sched_waitq);
+       spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
+
+       wake_up_all(&kqswnal_data.kqn_idletxd_waitq);
 
-       while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
-               CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
-                      atomic_read (&kqswnal_data.kqn_nthreads_running));
+       /**********************************************************************/
+       /* wait for sends that have allocated a tx desc to launch or give up */
+       while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
+               CDEBUG(D_NET, "waiting for %d pending sends\n",
+                      atomic_read (&kqswnal_data.kqn_pending_txs));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }
@@ -268,18 +276,27 @@ kqswnal_finalise (void)
        /**********************************************************************/
        /* close elan comms */
 #if MULTIRAIL_EKC
+       /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);
 
        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);
 
+       /* NB ep_free_rcvr() returns only after we've freed off all receive
+        * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
+        * means we must have completed any messages we passed to
+        * lib_parse() or kpr_fwd_start(). */
+
        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);
 
-       /* freeing the xmtr completes all txs pdq */
+       /* NB ep_free_xmtr() returns only after all outstanding transmits
+        * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
 #else
+       /* "Old" EKC just pretends to shutdown cleanly but actually
+        * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
 
@@ -298,7 +315,6 @@ kqswnal_finalise (void)
 #endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
-
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);
 
@@ -316,10 +332,12 @@ kqswnal_finalise (void)
 
 #if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
+       LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
+       LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
 #endif
 
        /**********************************************************************/
-       /* Complete any blocked forwarding packets with error
+       /* Complete any blocked forwarding packets, with error
         */
 
        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
@@ -327,23 +345,18 @@ kqswnal_finalise (void)
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
-               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
-       }
-
-       while (!list_empty (&kqswnal_data.kqn_delayedfwds))
-       {
-               kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
-                                                 kpr_fwd_desc_t, kprfd_list);
-               list_del (&fwd->kprfd_list);
-               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
+               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
        }
 
        /**********************************************************************/
-       /* Wait for router to complete any packets I sent her
-        */
+       /* finalise router and portals lib */
 
        kpr_deregister (&kqswnal_data.kqn_router);
 
+       if (do_ptl_fini) {
+               PtlNIFini (kqswnal_ni);
+               lib_fini (&kqswnal_lib);
+       }
 
        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
index 5e32887..93bf584 100644 (file)
@@ -196,8 +196,7 @@ typedef struct
 {
         char               kqn_init;            /* what's been initialised */
         char               kqn_shuttingdown;    /* I'm trying to shut down */
-        atomic_t           kqn_nthreads;        /* # threads not terminated */
-        atomic_t           kqn_nthreads_running;/* # threads still running */
+        atomic_t           kqn_nthreads;        /* # threads running */
 
         int                kqn_optimized_gets;  /* optimized GETs? */
         int                kqn_copy_small_fwd;  /* fwd small msgs from pre-allocated buffer? */
@@ -214,6 +213,7 @@ typedef struct
         spinlock_t         kqn_idletxd_lock;    /* serialise idle txd access */
         wait_queue_head_t  kqn_idletxd_waitq;   /* sender blocks here waiting for idle txd */
         struct list_head   kqn_idletxd_fwdq;    /* forwarded packets block here waiting for idle txd */
+        atomic_t           kqn_pending_txs;     /* # transmits being prepped */
         
         spinlock_t         kqn_sched_lock;      /* serialise packet schedulers */
         wait_queue_head_t  kqn_sched_waitq;     /* scheduler blocks here */
index 61c88f6..577c578 100644 (file)
@@ -426,7 +426,8 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
 
         /* anything blocking for a tx descriptor? */
-        if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
+        if (!kqswnal_data.kqn_shuttingdown &&
+            !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
         {
                 CDEBUG(D_NET,"wakeup fwd\n");
 
@@ -460,6 +461,9 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
         for (;;) {
                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
 
+                if (kqswnal_data.kqn_shuttingdown)
+                        break;
+
                 /* "normal" descriptor is free */
                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
@@ -467,14 +471,8 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
                         break;
                 }
 
-                /* "normal" descriptor pool is empty */
-
-                if (fwd != NULL) { /* forwarded packet => queue for idle txd */
-                        CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
-                        list_add_tail (&fwd->kprfd_list,
-                                       &kqswnal_data.kqn_idletxd_fwdq);
+                if (fwd != NULL)                /* forwarded packet? */
                         break;
-                }
 
                 /* doing a local transmit */
                 if (!may_block) {
@@ -494,13 +492,20 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
 
                 CDEBUG (D_NET, "blocking for tx desc\n");
                 wait_event (kqswnal_data.kqn_idletxd_waitq,
-                            !list_empty (&kqswnal_data.kqn_idletxds));
+                            !list_empty (&kqswnal_data.kqn_idletxds) ||
+                            kqswnal_data.kqn_shuttingdown);
         }
 
         if (ktx != NULL) {
                 list_del (&ktx->ktx_list);
                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
                 ktx->ktx_launcher = current->pid;
+                atomic_inc(&kqswnal_data.kqn_pending_txs);
+        } else if (fwd != NULL) {
+                /* queue forwarded packet until idle txd available */
+                CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
+                list_add_tail (&fwd->kprfd_list,
+                               &kqswnal_data.kqn_idletxd_fwdq);
         }
 
         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
@@ -601,6 +606,9 @@ kqswnal_launch (kqswnal_tx_t *ktx)
 
         ktx->ktx_launchtime = jiffies;
 
+        if (kqswnal_data.kqn_shuttingdown)
+                return (-ESHUTDOWN);
+
         LASSERT (dest >= 0);                    /* must be a peer */
         if (ktx->ktx_state == KTX_GETTING) {
                 /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
@@ -635,8 +643,6 @@ kqswnal_launch (kqswnal_tx_t *ktx)
                 return (0);
 
         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
-                LASSERT (in_interrupt());
-
                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
@@ -921,7 +927,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         LASSERT (payload_kiov == NULL || !in_interrupt ());
         /* payload is either all vaddrs or all pages */
         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
-        
+
         if (payload_nob > KQSW_MAXPAYLOAD) {
                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                         payload_nob, KQSW_MAXPAYLOAD);
@@ -967,19 +973,17 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                "nid "LPX64" via "LPX64" elanID %d\n",
                                nid, targetnid,
                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
-                        return (PTL_FAIL);
+                        rc = -EINVAL;
+                        goto out;
                 }
 
                 /* peer expects RPC completion with GET data */
                 rc = kqswnal_dma_reply (ktx, payload_niov, 
                                         payload_iov, payload_kiov, 
                                         payload_offset, payload_nob);
-                if (rc == 0)
-                        return (PTL_OK);
-                
-                CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
-                kqswnal_put_idle_tx (ktx);
-                return (PTL_FAIL);
+                if (rc != 0)
+                        CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
+                goto out;
         }
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
@@ -1052,11 +1056,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 else
                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.iov);
-
-                if (rc < 0) {
-                        kqswnal_put_idle_tx (ktx);
-                        return (PTL_FAIL);
-                }
+                if (rc != 0)
+                        goto out;
 
                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
 
@@ -1119,25 +1120,26 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 else
                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_iov);
-                if (rc != 0) {
-                        kqswnal_put_idle_tx (ktx);
-                        return (PTL_FAIL);
-                }
+                if (rc != 0)
+                        goto out;
         }
         
         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
-        if (rc != 0) {                    /* failed? */
-                CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
+
+ out:
+        CDEBUG(rc == 0 ? D_NET : D_ERROR, 
+               "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", 
+               rc == 0 ? "Sent" : "Failed to send",
+               payload_nob, nid, targetnid, rc);
+
+        if (rc != 0)
                 kqswnal_put_idle_tx (ktx);
-                return (PTL_FAIL);
-        }
 
-        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
-               payload_nob, nid, targetnid);
-        return (PTL_OK);
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
+        return (rc == 0 ? PTL_OK : PTL_FAIL);
 }
 
 static ptl_err_t
@@ -1204,7 +1206,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (kqswnal_nid2elanid (nid) < 0) {
                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
                 rc = -EHOSTUNREACH;
-                goto failed;
+                goto out;
         }
 
         /* copy hdr into pre-mapped buffer */
@@ -1244,20 +1246,20 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 #endif
                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                 if (rc != 0)
-                        goto failed;
+                        goto out;
         }
 
         rc = kqswnal_launch (ktx);
-        if (rc == 0)
-                return;
+ out:
+        if (rc != 0) {
+                CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
 
- failed:
-        LASSERT (rc != 0);
-        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
+                kqswnal_put_idle_tx (ktx);
+                /* complete now (with failure) */
+                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+        }
 
-        kqswnal_put_idle_tx (ktx);
-        /* complete now (with failure) */
-        kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
 }
 
 void
@@ -1727,7 +1729,6 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg)
                 return ((int)pid);
 
         atomic_inc (&kqswnal_data.kqn_nthreads);
-        atomic_inc (&kqswnal_data.kqn_nthreads_running);
         return (0);
 }
 
@@ -1746,7 +1747,6 @@ kqswnal_scheduler (void *arg)
         long             flags;
         int              rc;
         int              counter = 0;
-        int              shuttingdown = 0;
         int              did_something;
 
         kportal_daemonize ("kqswnal_sched");
@@ -1756,18 +1756,6 @@ kqswnal_scheduler (void *arg)
 
         for (;;)
         {
-                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
-
-                        if (kqswnal_data.kqn_shuttingdown == 2)
-                                break;
-                
-                        /* During stage 1 of shutdown we are still responsive
-                         * to receives */
-
-                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
-                        shuttingdown = kqswnal_data.kqn_shuttingdown;
-                }
-
                 did_something = 0;
 
                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
@@ -1784,8 +1772,7 @@ kqswnal_scheduler (void *arg)
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!shuttingdown &&
-                    !list_empty (&kqswnal_data.kqn_delayedtxds))
+                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
                 {
                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                          kqswnal_tx_t, ktx_list);
@@ -1794,31 +1781,31 @@ kqswnal_scheduler (void *arg)
                                                flags);
 
                         rc = kqswnal_launch (ktx);
-                        if (rc != 0)          /* failed: ktx_nid down? */
-                        {
+                        if (rc != 0) {
                                 CERROR("Failed delayed transmit to "LPX64
                                        ": %d\n", ktx->ktx_nid, rc);
                                 kqswnal_tx_done (ktx, rc);
                         }
+                        atomic_dec (&kqswnal_data.kqn_pending_txs);
 
                         did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!shuttingdown &
-                    !list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
 
+                        /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
                         kqswnal_fwd_packet (NULL, fwd);
 
                         did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                    /* nothing to do or hogging CPU */
+                /* nothing to do or hogging CPU */
                 if (!did_something || counter++ == KQSW_RESCHED) {
                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                flags);
@@ -1826,8 +1813,13 @@ kqswnal_scheduler (void *arg)
                         counter = 0;
 
                         if (!did_something) {
+                                if (kqswnal_data.kqn_shuttingdown == 2) {
+                                        /* We only exit in stage 2 of shutdown when 
+                                         * there's nothing left to do */
+                                        break;
+                                }
                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
+                                                               kqswnal_data.kqn_shuttingdown == 2 ||
                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
@@ -1839,8 +1831,6 @@ kqswnal_scheduler (void *arg)
                 }
         }
 
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
-
         kqswnal_thread_fini ();
         return (0);
 }
index d0dbf0a..27aab67 100644 (file)
@@ -289,18 +289,9 @@ kpr_shutdown_nal (void *arg)
        LASSERT (!ne->kpne_shutdown);
        LASSERT (!in_interrupt());
 
-       write_lock_irqsave (&kpr_rwlock, flags); /* locking a bit spurious... */
+       write_lock_irqsave (&kpr_rwlock, flags);
        ne->kpne_shutdown = 1;
-       write_unlock_irqrestore (&kpr_rwlock, flags); /* except it's a memory barrier */
-
-       while (atomic_read (&ne->kpne_refcount) != 0)
-       {
-               CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
-                       ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
-
-               set_current_state (TASK_UNINTERRUPTIBLE);
-               schedule_timeout (HZ);
-       }
+       write_unlock_irqrestore (&kpr_rwlock, flags);
 }
 
 void
@@ -312,15 +303,22 @@ kpr_deregister_nal (void *arg)
         CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid);
 
        LASSERT (ne->kpne_shutdown);            /* caller must have issued shutdown already */
-       LASSERT (atomic_read (&ne->kpne_refcount) == 0); /* can't be busy */
        LASSERT (!in_interrupt());
 
        write_lock_irqsave (&kpr_rwlock, flags);
-
        list_del (&ne->kpne_list);
-
        write_unlock_irqrestore (&kpr_rwlock, flags);
 
+        /* Wait until all outstanding messages/notifications have completed */
+       while (atomic_read (&ne->kpne_refcount) != 0)
+       {
+               CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
+                       ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
+
+               set_current_state (TASK_UNINTERRUPTIBLE);
+               schedule_timeout (HZ);
+       }
+
        PORTAL_FREE (ne, sizeof (*ne));
         PORTAL_MODULE_UNUSE;
 }
@@ -377,12 +375,15 @@ kpr_lookup_target (void *arg, ptl_nid_t target_nid, int nob,
 
         CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid, 
                 ne->kpne_interface.kprni_nalid);
-
-       if (ne->kpne_shutdown)          /* caller is shutting down */
-               return (-ENOENT);
+        LASSERT (!in_interrupt());
 
        read_lock (&kpr_rwlock);
 
+       if (ne->kpne_shutdown) {        /* caller is shutting down */
+                read_unlock (&kpr_rwlock);
+               return (-ENOENT);
+        }
+
        /* Search routes for one that has a gateway to target_nid on the callers network */
 
         list_for_each (e, &kpr_routes) {
@@ -452,25 +453,26 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
        struct list_head    *e;
         kpr_route_entry_t   *re;
         kpr_nal_entry_t     *tmp_ne;
+        int                  rc;
 
         CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd,
                 target_nid, src_ne->kpne_interface.kprni_nalid);
 
         LASSERT (nob == lib_kiov_nob (fwd->kprfd_niov, fwd->kprfd_kiov));
-        
-        atomic_inc (&kpr_queue_depth);
-       atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */
+        LASSERT (!in_interrupt());
+
+       read_lock (&kpr_rwlock);
 
         kpr_fwd_packets++;                   /* (loose) stats accounting */
         kpr_fwd_bytes += nob + sizeof(ptl_hdr_t);
 
-       if (src_ne->kpne_shutdown)           /* caller is shutting down */
+       if (src_ne->kpne_shutdown) {         /* caller is shutting down */
+                rc = -ESHUTDOWN;
                goto out;
+        }
 
        fwd->kprfd_router_arg = src_ne;      /* stash caller's nal entry */
 
-       read_lock (&kpr_rwlock);
-
        /* Search routes for one that has a gateway to target_nid NOT on the caller's network */
 
         list_for_each (e, &kpr_routes) {
@@ -507,7 +509,9 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
                 kpr_update_weight (ge, nob);
 
                 fwd->kprfd_gateway_nid = ge->kpge_nid;
-                atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */
+                atomic_inc (&src_ne->kpne_refcount); /* source and dest nals are */
+                atomic_inc (&dst_ne->kpne_refcount); /* busy until fwd completes */
+                atomic_inc (&kpr_queue_depth);
 
                 read_unlock (&kpr_rwlock);
 
@@ -520,18 +524,16 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
                 return;
        }
 
-        read_unlock (&kpr_rwlock);
+        rc = -EHOSTUNREACH;
  out:
         kpr_fwd_errors++;
 
-        CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd,
-                target_nid, src_ne->kpne_interface.kprni_nalid);
+        CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d: %d\n", 
+                fwd, target_nid, src_ne->kpne_interface.kprni_nalid, rc);
 
-       /* Can't find anywhere to forward to */
-       (fwd->kprfd_callback)(fwd->kprfd_callback_arg, -EHOSTUNREACH);
+       (fwd->kprfd_callback)(fwd->kprfd_callback_arg, rc);
 
-        atomic_dec (&kpr_queue_depth);
-       atomic_dec (&src_ne->kpne_refcount);
+        read_unlock (&kpr_rwlock);
 }
 
 void
@@ -699,6 +701,7 @@ kpr_get_route (int idx, int *gateway_nalid, ptl_nid_t *gateway_nid,
 {
        struct list_head  *e;
 
+        LASSERT (!in_interrupt());
        read_lock(&kpr_rwlock);
 
         for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {
index a386eef..aeadd31 100644 (file)
@@ -222,6 +222,9 @@ kqswnal_cmd (struct portals_cfg *pcfg, void *private)
 void __exit
 kqswnal_finalise (void)
 {
+       unsigned long flags;
+       int           do_ptl_fini = 0;
+
        switch (kqswnal_data.kqn_init)
        {
        default:
@@ -237,8 +240,7 @@ kqswnal_finalise (void)
                /* fall through */
 
        case KQN_INIT_PTL:
-               PtlNIFini (kqswnal_ni);
-               lib_fini (&kqswnal_lib);
+               do_ptl_fini = 1;
                /* fall through */
 
        case KQN_INIT_DATA:
@@ -249,18 +251,24 @@ kqswnal_finalise (void)
        }
 
        /**********************************************************************/
-       /* Make router stop her calling me and fail any more call-ins */
+       /* Tell router we're shutting down.  Any router calls my threads
+        * make will now fail immediately and the router will stop calling
+        * into me. */
        kpr_shutdown (&kqswnal_data.kqn_router);
-
+       
        /**********************************************************************/
-       /* flag threads we've started to terminate and wait for all to ack */
-
+       /* Signal the start of shutdown... */
+       spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
-       wake_up_all (&kqswnal_data.kqn_sched_waitq);
+       spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
+
+       wake_up_all(&kqswnal_data.kqn_idletxd_waitq);
 
-       while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
-               CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
-                      atomic_read (&kqswnal_data.kqn_nthreads_running));
+       /**********************************************************************/
+       /* wait for sends that have allocated a tx desc to launch or give up */
+       while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
+               CDEBUG(D_NET, "waiting for %d pending sends\n",
+                      atomic_read (&kqswnal_data.kqn_pending_txs));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }
@@ -268,18 +276,27 @@ kqswnal_finalise (void)
        /**********************************************************************/
        /* close elan comms */
 #if MULTIRAIL_EKC
+       /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);
 
        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);
 
+       /* NB ep_free_rcvr() returns only after we've freed off all receive
+        * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
+        * means we must have completed any messages we passed to
+        * lib_parse() or kpr_fwd_start(). */
+
        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);
 
-       /* freeing the xmtr completes all txs pdq */
+       /* NB ep_free_xmtr() returns only after all outstanding transmits
+        * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
 #else
+       /* "Old" EKC just pretends to shutdown cleanly but actually
+        * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
 
@@ -298,7 +315,6 @@ kqswnal_finalise (void)
 #endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
-
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);
 
@@ -316,10 +332,12 @@ kqswnal_finalise (void)
 
 #if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
+       LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
+       LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
 #endif
 
        /**********************************************************************/
-       /* Complete any blocked forwarding packets with error
+       /* Complete any blocked forwarding packets, with error
         */
 
        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
@@ -327,23 +345,18 @@ kqswnal_finalise (void)
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
-               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
-       }
-
-       while (!list_empty (&kqswnal_data.kqn_delayedfwds))
-       {
-               kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
-                                                 kpr_fwd_desc_t, kprfd_list);
-               list_del (&fwd->kprfd_list);
-               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
+               kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
        }
 
        /**********************************************************************/
-       /* Wait for router to complete any packets I sent her
-        */
+       /* finalise router and portals lib */
 
        kpr_deregister (&kqswnal_data.kqn_router);
 
+       if (do_ptl_fini) {
+               PtlNIFini (kqswnal_ni);
+               lib_fini (&kqswnal_lib);
+       }
 
        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
index 5e32887..93bf584 100644 (file)
@@ -196,8 +196,7 @@ typedef struct
 {
         char               kqn_init;            /* what's been initialised */
         char               kqn_shuttingdown;    /* I'm trying to shut down */
-        atomic_t           kqn_nthreads;        /* # threads not terminated */
-        atomic_t           kqn_nthreads_running;/* # threads still running */
+        atomic_t           kqn_nthreads;        /* # threads running */
 
         int                kqn_optimized_gets;  /* optimized GETs? */
         int                kqn_copy_small_fwd;  /* fwd small msgs from pre-allocated buffer? */
@@ -214,6 +213,7 @@ typedef struct
         spinlock_t         kqn_idletxd_lock;    /* serialise idle txd access */
         wait_queue_head_t  kqn_idletxd_waitq;   /* sender blocks here waiting for idle txd */
         struct list_head   kqn_idletxd_fwdq;    /* forwarded packets block here waiting for idle txd */
+        atomic_t           kqn_pending_txs;     /* # transmits being prepped */
         
         spinlock_t         kqn_sched_lock;      /* serialise packet schedulers */
         wait_queue_head_t  kqn_sched_waitq;     /* scheduler blocks here */
index 61c88f6..577c578 100644 (file)
@@ -426,7 +426,8 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
 
         /* anything blocking for a tx descriptor? */
-        if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
+        if (!kqswnal_data.kqn_shuttingdown &&
+            !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
         {
                 CDEBUG(D_NET,"wakeup fwd\n");
 
@@ -460,6 +461,9 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
         for (;;) {
                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
 
+                if (kqswnal_data.kqn_shuttingdown)
+                        break;
+
                 /* "normal" descriptor is free */
                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
@@ -467,14 +471,8 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
                         break;
                 }
 
-                /* "normal" descriptor pool is empty */
-
-                if (fwd != NULL) { /* forwarded packet => queue for idle txd */
-                        CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
-                        list_add_tail (&fwd->kprfd_list,
-                                       &kqswnal_data.kqn_idletxd_fwdq);
+                if (fwd != NULL)                /* forwarded packet? */
                         break;
-                }
 
                 /* doing a local transmit */
                 if (!may_block) {
@@ -494,13 +492,20 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
 
                 CDEBUG (D_NET, "blocking for tx desc\n");
                 wait_event (kqswnal_data.kqn_idletxd_waitq,
-                            !list_empty (&kqswnal_data.kqn_idletxds));
+                            !list_empty (&kqswnal_data.kqn_idletxds) ||
+                            kqswnal_data.kqn_shuttingdown);
         }
 
         if (ktx != NULL) {
                 list_del (&ktx->ktx_list);
                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
                 ktx->ktx_launcher = current->pid;
+                atomic_inc(&kqswnal_data.kqn_pending_txs);
+        } else if (fwd != NULL) {
+                /* queue forwarded packet until idle txd available */
+                CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
+                list_add_tail (&fwd->kprfd_list,
+                               &kqswnal_data.kqn_idletxd_fwdq);
         }
 
         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
@@ -601,6 +606,9 @@ kqswnal_launch (kqswnal_tx_t *ktx)
 
         ktx->ktx_launchtime = jiffies;
 
+        if (kqswnal_data.kqn_shuttingdown)
+                return (-ESHUTDOWN);
+
         LASSERT (dest >= 0);                    /* must be a peer */
         if (ktx->ktx_state == KTX_GETTING) {
                 /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
@@ -635,8 +643,6 @@ kqswnal_launch (kqswnal_tx_t *ktx)
                 return (0);
 
         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
-                LASSERT (in_interrupt());
-
                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
@@ -921,7 +927,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         LASSERT (payload_kiov == NULL || !in_interrupt ());
         /* payload is either all vaddrs or all pages */
         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
-        
+
         if (payload_nob > KQSW_MAXPAYLOAD) {
                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                         payload_nob, KQSW_MAXPAYLOAD);
@@ -967,19 +973,17 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                "nid "LPX64" via "LPX64" elanID %d\n",
                                nid, targetnid,
                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
-                        return (PTL_FAIL);
+                        rc = -EINVAL;
+                        goto out;
                 }
 
                 /* peer expects RPC completion with GET data */
                 rc = kqswnal_dma_reply (ktx, payload_niov, 
                                         payload_iov, payload_kiov, 
                                         payload_offset, payload_nob);
-                if (rc == 0)
-                        return (PTL_OK);
-                
-                CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
-                kqswnal_put_idle_tx (ktx);
-                return (PTL_FAIL);
+                if (rc != 0)
+                        CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
+                goto out;
         }
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
@@ -1052,11 +1056,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 else
                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.iov);
-
-                if (rc < 0) {
-                        kqswnal_put_idle_tx (ktx);
-                        return (PTL_FAIL);
-                }
+                if (rc != 0)
+                        goto out;
 
                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
 
@@ -1119,25 +1120,26 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 else
                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_iov);
-                if (rc != 0) {
-                        kqswnal_put_idle_tx (ktx);
-                        return (PTL_FAIL);
-                }
+                if (rc != 0)
+                        goto out;
         }
         
         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
-        if (rc != 0) {                    /* failed? */
-                CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
+
+ out:
+        CDEBUG(rc == 0 ? D_NET : D_ERROR, 
+               "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", 
+               rc == 0 ? "Sent" : "Failed to send",
+               payload_nob, nid, targetnid, rc);
+
+        if (rc != 0)
                 kqswnal_put_idle_tx (ktx);
-                return (PTL_FAIL);
-        }
 
-        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
-               payload_nob, nid, targetnid);
-        return (PTL_OK);
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
+        return (rc == 0 ? PTL_OK : PTL_FAIL);
 }
 
 static ptl_err_t
@@ -1204,7 +1206,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (kqswnal_nid2elanid (nid) < 0) {
                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
                 rc = -EHOSTUNREACH;
-                goto failed;
+                goto out;
         }
 
         /* copy hdr into pre-mapped buffer */
@@ -1244,20 +1246,20 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 #endif
                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                 if (rc != 0)
-                        goto failed;
+                        goto out;
         }
 
         rc = kqswnal_launch (ktx);
-        if (rc == 0)
-                return;
+ out:
+        if (rc != 0) {
+                CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
 
- failed:
-        LASSERT (rc != 0);
-        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
+                kqswnal_put_idle_tx (ktx);
+                /* complete now (with failure) */
+                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+        }
 
-        kqswnal_put_idle_tx (ktx);
-        /* complete now (with failure) */
-        kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
 }
 
 void
@@ -1727,7 +1729,6 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg)
                 return ((int)pid);
 
         atomic_inc (&kqswnal_data.kqn_nthreads);
-        atomic_inc (&kqswnal_data.kqn_nthreads_running);
         return (0);
 }
 
@@ -1746,7 +1747,6 @@ kqswnal_scheduler (void *arg)
         long             flags;
         int              rc;
         int              counter = 0;
-        int              shuttingdown = 0;
         int              did_something;
 
         kportal_daemonize ("kqswnal_sched");
@@ -1756,18 +1756,6 @@ kqswnal_scheduler (void *arg)
 
         for (;;)
         {
-                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
-
-                        if (kqswnal_data.kqn_shuttingdown == 2)
-                                break;
-                
-                        /* During stage 1 of shutdown we are still responsive
-                         * to receives */
-
-                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
-                        shuttingdown = kqswnal_data.kqn_shuttingdown;
-                }
-
                 did_something = 0;
 
                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
@@ -1784,8 +1772,7 @@ kqswnal_scheduler (void *arg)
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!shuttingdown &&
-                    !list_empty (&kqswnal_data.kqn_delayedtxds))
+                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
                 {
                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                          kqswnal_tx_t, ktx_list);
@@ -1794,31 +1781,31 @@ kqswnal_scheduler (void *arg)
                                                flags);
 
                         rc = kqswnal_launch (ktx);
-                        if (rc != 0)          /* failed: ktx_nid down? */
-                        {
+                        if (rc != 0) {
                                 CERROR("Failed delayed transmit to "LPX64
                                        ": %d\n", ktx->ktx_nid, rc);
                                 kqswnal_tx_done (ktx, rc);
                         }
+                        atomic_dec (&kqswnal_data.kqn_pending_txs);
 
                         did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!shuttingdown &
-                    !list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
 
+                        /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
                         kqswnal_fwd_packet (NULL, fwd);
 
                         did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                    /* nothing to do or hogging CPU */
+                /* nothing to do or hogging CPU */
                 if (!did_something || counter++ == KQSW_RESCHED) {
                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                flags);
@@ -1826,8 +1813,13 @@ kqswnal_scheduler (void *arg)
                         counter = 0;
 
                         if (!did_something) {
+                                if (kqswnal_data.kqn_shuttingdown == 2) {
+                                        /* We only exit in stage 2 of shutdown when 
+                                         * there's nothing left to do */
+                                        break;
+                                }
                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
+                                                               kqswnal_data.kqn_shuttingdown == 2 ||
                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
@@ -1839,8 +1831,6 @@ kqswnal_scheduler (void *arg)
                 }
         }
 
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
-
         kqswnal_thread_fini ();
         return (0);
 }
index d0dbf0a..27aab67 100644 (file)
@@ -289,18 +289,9 @@ kpr_shutdown_nal (void *arg)
        LASSERT (!ne->kpne_shutdown);
        LASSERT (!in_interrupt());
 
-       write_lock_irqsave (&kpr_rwlock, flags); /* locking a bit spurious... */
+       write_lock_irqsave (&kpr_rwlock, flags);
        ne->kpne_shutdown = 1;
-       write_unlock_irqrestore (&kpr_rwlock, flags); /* except it's a memory barrier */
-
-       while (atomic_read (&ne->kpne_refcount) != 0)
-       {
-               CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
-                       ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
-
-               set_current_state (TASK_UNINTERRUPTIBLE);
-               schedule_timeout (HZ);
-       }
+       write_unlock_irqrestore (&kpr_rwlock, flags);
 }
 
 void
@@ -312,15 +303,22 @@ kpr_deregister_nal (void *arg)
         CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid);
 
        LASSERT (ne->kpne_shutdown);            /* caller must have issued shutdown already */
-       LASSERT (atomic_read (&ne->kpne_refcount) == 0); /* can't be busy */
        LASSERT (!in_interrupt());
 
        write_lock_irqsave (&kpr_rwlock, flags);
-
        list_del (&ne->kpne_list);
-
        write_unlock_irqrestore (&kpr_rwlock, flags);
 
+        /* Wait until all outstanding messages/notifications have completed */
+       while (atomic_read (&ne->kpne_refcount) != 0)
+       {
+               CDEBUG (D_NET, "Waiting for refcount on NAL %d to reach zero (%d)\n",
+                       ne->kpne_interface.kprni_nalid, atomic_read (&ne->kpne_refcount));
+
+               set_current_state (TASK_UNINTERRUPTIBLE);
+               schedule_timeout (HZ);
+       }
+
        PORTAL_FREE (ne, sizeof (*ne));
         PORTAL_MODULE_UNUSE;
 }
@@ -377,12 +375,15 @@ kpr_lookup_target (void *arg, ptl_nid_t target_nid, int nob,
 
         CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid, 
                 ne->kpne_interface.kprni_nalid);
-
-       if (ne->kpne_shutdown)          /* caller is shutting down */
-               return (-ENOENT);
+        LASSERT (!in_interrupt());
 
        read_lock (&kpr_rwlock);
 
+       if (ne->kpne_shutdown) {        /* caller is shutting down */
+                read_unlock (&kpr_rwlock);
+               return (-ENOENT);
+        }
+
        /* Search routes for one that has a gateway to target_nid on the callers network */
 
         list_for_each (e, &kpr_routes) {
@@ -452,25 +453,26 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
        struct list_head    *e;
         kpr_route_entry_t   *re;
         kpr_nal_entry_t     *tmp_ne;
+        int                  rc;
 
         CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd,
                 target_nid, src_ne->kpne_interface.kprni_nalid);
 
         LASSERT (nob == lib_kiov_nob (fwd->kprfd_niov, fwd->kprfd_kiov));
-        
-        atomic_inc (&kpr_queue_depth);
-       atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */
+        LASSERT (!in_interrupt());
+
+       read_lock (&kpr_rwlock);
 
         kpr_fwd_packets++;                   /* (loose) stats accounting */
         kpr_fwd_bytes += nob + sizeof(ptl_hdr_t);
 
-       if (src_ne->kpne_shutdown)           /* caller is shutting down */
+       if (src_ne->kpne_shutdown) {         /* caller is shutting down */
+                rc = -ESHUTDOWN;
                goto out;
+        }
 
        fwd->kprfd_router_arg = src_ne;      /* stash caller's nal entry */
 
-       read_lock (&kpr_rwlock);
-
        /* Search routes for one that has a gateway to target_nid NOT on the caller's network */
 
         list_for_each (e, &kpr_routes) {
@@ -507,7 +509,9 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
                 kpr_update_weight (ge, nob);
 
                 fwd->kprfd_gateway_nid = ge->kpge_nid;
-                atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */
+                atomic_inc (&src_ne->kpne_refcount); /* source and dest nals are */
+                atomic_inc (&dst_ne->kpne_refcount); /* busy until fwd completes */
+                atomic_inc (&kpr_queue_depth);
 
                 read_unlock (&kpr_rwlock);
 
@@ -520,18 +524,16 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
                 return;
        }
 
-        read_unlock (&kpr_rwlock);
+        rc = -EHOSTUNREACH;
  out:
         kpr_fwd_errors++;
 
-        CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd,
-                target_nid, src_ne->kpne_interface.kprni_nalid);
+        CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d: %d\n", 
+                fwd, target_nid, src_ne->kpne_interface.kprni_nalid, rc);
 
-       /* Can't find anywhere to forward to */
-       (fwd->kprfd_callback)(fwd->kprfd_callback_arg, -EHOSTUNREACH);
+       (fwd->kprfd_callback)(fwd->kprfd_callback_arg, rc);
 
-        atomic_dec (&kpr_queue_depth);
-       atomic_dec (&src_ne->kpne_refcount);
+        read_unlock (&kpr_rwlock);
 }
 
 void
@@ -699,6 +701,7 @@ kpr_get_route (int idx, int *gateway_nalid, ptl_nid_t *gateway_nid,
 {
        struct list_head  *e;
 
+        LASSERT (!in_interrupt());
        read_lock(&kpr_rwlock);
 
         for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {