Whamcloud - gitweb
landing b_cmobd_merge on HEAD
[fs/lustre-release.git] / lustre / portals / knals / qswnal / qswnal_cb.c
index 61c88f6..2bcb853 100644 (file)
@@ -426,7 +426,8 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
 
         /* anything blocking for a tx descriptor? */
-        if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
+        if (!kqswnal_data.kqn_shuttingdown &&
+            !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
         {
                 CDEBUG(D_NET,"wakeup fwd\n");
 
@@ -460,6 +461,9 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
         for (;;) {
                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
 
+                if (kqswnal_data.kqn_shuttingdown)
+                        break;
+
                 /* "normal" descriptor is free */
                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
@@ -467,14 +471,8 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
                         break;
                 }
 
-                /* "normal" descriptor pool is empty */
-
-                if (fwd != NULL) { /* forwarded packet => queue for idle txd */
-                        CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
-                        list_add_tail (&fwd->kprfd_list,
-                                       &kqswnal_data.kqn_idletxd_fwdq);
+                if (fwd != NULL)                /* forwarded packet? */
                         break;
-                }
 
                 /* doing a local transmit */
                 if (!may_block) {
@@ -494,13 +492,20 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
 
                 CDEBUG (D_NET, "blocking for tx desc\n");
                 wait_event (kqswnal_data.kqn_idletxd_waitq,
-                            !list_empty (&kqswnal_data.kqn_idletxds));
+                            !list_empty (&kqswnal_data.kqn_idletxds) ||
+                            kqswnal_data.kqn_shuttingdown);
         }
 
         if (ktx != NULL) {
                 list_del (&ktx->ktx_list);
                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
                 ktx->ktx_launcher = current->pid;
+                atomic_inc(&kqswnal_data.kqn_pending_txs);
+        } else if (fwd != NULL) {
+                /* queue forwarded packet until idle txd available */
+                CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
+                list_add_tail (&fwd->kprfd_list,
+                               &kqswnal_data.kqn_idletxd_fwdq);
         }
 
         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
@@ -601,6 +606,9 @@ kqswnal_launch (kqswnal_tx_t *ktx)
 
         ktx->ktx_launchtime = jiffies;
 
+        if (kqswnal_data.kqn_shuttingdown)
+                return (-ESHUTDOWN);
+
         LASSERT (dest >= 0);                    /* must be a peer */
         if (ktx->ktx_state == KTX_GETTING) {
                 /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
@@ -635,8 +643,6 @@ kqswnal_launch (kqswnal_tx_t *ktx)
                 return (0);
 
         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
-                LASSERT (in_interrupt());
-
                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
@@ -921,7 +927,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         LASSERT (payload_kiov == NULL || !in_interrupt ());
         /* payload is either all vaddrs or all pages */
         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
-        
+
         if (payload_nob > KQSW_MAXPAYLOAD) {
                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                         payload_nob, KQSW_MAXPAYLOAD);
@@ -967,19 +973,17 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                "nid "LPX64" via "LPX64" elanID %d\n",
                                nid, targetnid,
                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
-                        return (PTL_FAIL);
+                        rc = -EINVAL;
+                        goto out;
                 }
 
                 /* peer expects RPC completion with GET data */
                 rc = kqswnal_dma_reply (ktx, payload_niov, 
                                         payload_iov, payload_kiov, 
                                         payload_offset, payload_nob);
-                if (rc == 0)
-                        return (PTL_OK);
-                
-                CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
-                kqswnal_put_idle_tx (ktx);
-                return (PTL_FAIL);
+                if (rc != 0)
+                        CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
+                goto out;
         }
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
@@ -1023,7 +1027,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
 
-        if (kqswnal_data.kqn_optimized_gets &&
+        if (kqswnal_tunables.kqn_optimized_gets &&
             type == PTL_MSG_GET &&              /* doing a GET */
             nid == targetnid) {                 /* not forwarding */
                 lib_md_t           *md = libmsg->md;
@@ -1052,11 +1056,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 else
                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.iov);
-
-                if (rc < 0) {
-                        kqswnal_put_idle_tx (ktx);
-                        return (PTL_FAIL);
-                }
+                if (rc != 0)
+                        goto out;
 
                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
 
@@ -1119,25 +1120,26 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 else
                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_iov);
-                if (rc != 0) {
-                        kqswnal_put_idle_tx (ktx);
-                        return (PTL_FAIL);
-                }
+                if (rc != 0)
+                        goto out;
         }
         
         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
-        if (rc != 0) {                    /* failed? */
-                CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
+
+ out:
+        CDEBUG(rc == 0 ? D_NET : D_ERROR, 
+               "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", 
+               rc == 0 ? "Sent" : "Failed to send",
+               payload_nob, nid, targetnid, rc);
+
+        if (rc != 0)
                 kqswnal_put_idle_tx (ktx);
-                return (PTL_FAIL);
-        }
 
-        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
-               payload_nob, nid, targetnid);
-        return (PTL_OK);
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
+        return (rc == 0 ? PTL_OK : PTL_FAIL);
 }
 
 static ptl_err_t
@@ -1204,7 +1206,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (kqswnal_nid2elanid (nid) < 0) {
                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
                 rc = -EHOSTUNREACH;
-                goto failed;
+                goto out;
         }
 
         /* copy hdr into pre-mapped buffer */
@@ -1244,20 +1246,20 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 #endif
                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                 if (rc != 0)
-                        goto failed;
+                        goto out;
         }
 
         rc = kqswnal_launch (ktx);
-        if (rc == 0)
-                return;
+ out:
+        if (rc != 0) {
+                CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
 
- failed:
-        LASSERT (rc != 0);
-        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
+                kqswnal_put_idle_tx (ktx);
+                /* complete now (with failure) */
+                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+        }
 
-        kqswnal_put_idle_tx (ktx);
-        /* complete now (with failure) */
-        kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
 }
 
 void
@@ -1727,7 +1729,6 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg)
                 return ((int)pid);
 
         atomic_inc (&kqswnal_data.kqn_nthreads);
-        atomic_inc (&kqswnal_data.kqn_nthreads_running);
         return (0);
 }
 
@@ -1746,7 +1747,6 @@ kqswnal_scheduler (void *arg)
         long             flags;
         int              rc;
         int              counter = 0;
-        int              shuttingdown = 0;
         int              did_something;
 
         kportal_daemonize ("kqswnal_sched");
@@ -1756,18 +1756,6 @@ kqswnal_scheduler (void *arg)
 
         for (;;)
         {
-                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
-
-                        if (kqswnal_data.kqn_shuttingdown == 2)
-                                break;
-                
-                        /* During stage 1 of shutdown we are still responsive
-                         * to receives */
-
-                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
-                        shuttingdown = kqswnal_data.kqn_shuttingdown;
-                }
-
                 did_something = 0;
 
                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
@@ -1784,8 +1772,7 @@ kqswnal_scheduler (void *arg)
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!shuttingdown &&
-                    !list_empty (&kqswnal_data.kqn_delayedtxds))
+                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
                 {
                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                          kqswnal_tx_t, ktx_list);
@@ -1794,31 +1781,31 @@ kqswnal_scheduler (void *arg)
                                                flags);
 
                         rc = kqswnal_launch (ktx);
-                        if (rc != 0)          /* failed: ktx_nid down? */
-                        {
+                        if (rc != 0) {
                                 CERROR("Failed delayed transmit to "LPX64
                                        ": %d\n", ktx->ktx_nid, rc);
                                 kqswnal_tx_done (ktx, rc);
                         }
+                        atomic_dec (&kqswnal_data.kqn_pending_txs);
 
                         did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!shuttingdown &
-                    !list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
 
+                        /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
                         kqswnal_fwd_packet (NULL, fwd);
 
                         did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                    /* nothing to do or hogging CPU */
+                /* nothing to do or hogging CPU */
                 if (!did_something || counter++ == KQSW_RESCHED) {
                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                flags);
@@ -1826,21 +1813,24 @@ kqswnal_scheduler (void *arg)
                         counter = 0;
 
                         if (!did_something) {
+                                if (kqswnal_data.kqn_shuttingdown == 2) {
+                                        /* We only exit in stage 2 of shutdown when 
+                                         * there's nothing left to do */
+                                        break;
+                                }
                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
+                                                               kqswnal_data.kqn_shuttingdown == 2 ||
                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
                                 LASSERT (rc == 0);
-                        } else if (current->need_resched)
+                        } else if (need_resched())
                                 schedule ();
 
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
         }
 
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
-
         kqswnal_thread_fini ();
         return (0);
 }