Whamcloud - gitweb
* Always hold kss_lock while setting/inspecting ksnc_{rx,tx}_ready.
author	eeb <eeb>
Tue, 25 Nov 2003 17:22:29 +0000 (17:22 +0000)
committer	eeb <eeb>
Tue, 25 Nov 2003 17:22:29 +0000 (17:22 +0000)
*    Removed kss_lock ops from ksocknal_process_{transmit,receive}();
     ksocknal_scheduler() does this locking now (a sketch of the new
     discipline follows the file list).

*    Included the --single_socket lconf flag.

lnet/klnds/socklnd/socklnd.c
lnet/klnds/socklnd/socklnd_cb.c
lustre/portals/knals/socknal/socknal.c
lustre/portals/knals/socknal/socknal_cb.c
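
The scheduling pattern the bullets above describe, reduced to its core: the
scheduler clears ksnc_rx_ready / ksnc_tx_ready under kss_lock before doing the
socket I/O, and the socket callbacks set the flag (also under kss_lock) and
requeue the conn if it is not already scheduled, so a readiness notification
that races with the I/O cannot be lost.  The following is a minimal user-space
sketch of that discipline, using a pthread mutex in place of the kss_lock
spinlock; the struct, field and function names echo the diff but are
simplified stand-ins, not the real socknal types or API.

/* Minimal analogue of the rx side of the new locking discipline.  The
 * "ready" flag is only read or written while holding the scheduler lock,
 * so a data_ready() racing with process_receive() is never lost. */
#include <pthread.h>
#include <stdbool.h>

struct conn {
        bool rx_ready;          /* callback saw data since the scheduler last looked */
        bool rx_scheduled;      /* conn is on (or being serviced from) the rx list */
        int  refcount;          /* simplified stand-in for ksnc_refcount */
};

struct sched {
        pthread_mutex_t lock;   /* stands in for kss_lock */
        /* rx conn list and waitqueue elided */
};

/* stands in for ksocknal_process_receive(): does socket I/O with NO lock
 * held; returns 0 when it got everything it wanted, non-zero otherwise */
extern int process_receive(struct conn *c);

void scheduler_handle_rx(struct sched *s, struct conn *c)
{
        int rc;

        /* conn was just removed from the rx list while holding the lock */
        pthread_mutex_lock(&s->lock);
        /* clear BEFORE the read, under the lock: a concurrent data_ready()
         * either sees rx_scheduled and just sets rx_ready again, or runs
         * after the re-check below and requeues the conn itself */
        c->rx_ready = false;
        pthread_mutex_unlock(&s->lock);

        rc = process_receive(c);        /* socket I/O, no lock held */

        pthread_mutex_lock(&s->lock);
        if (rc == 0)
                c->rx_ready = true;     /* got everything; assume more may follow */
        if (c->rx_ready) {
                /* requeue conn on the scheduler's rx list (elided) */
        } else {
                c->rx_scheduled = false;        /* callback must reschedule us */
                c->refcount--;                  /* drop the scheduler's ref */
        }
        pthread_mutex_unlock(&s->lock);
}

/* stands in for ksocknal_data_ready(): always take the lock, always set
 * the flag; schedule and take a ref only if not already scheduled */
void data_ready(struct sched *s, struct conn *c)
{
        pthread_mutex_lock(&s->lock);
        c->rx_ready = true;
        if (!c->rx_scheduled) {
                c->rx_scheduled = true;
                c->refcount++;          /* scheduler's ref */
                /* add conn to the rx list and wake the scheduler (elided) */
        }
        pthread_mutex_unlock(&s->lock);
}

With both sides touching the flag only under the lock, the old mb()-based
"clear BEFORE trying to read/write" ordering and the set-twice dance in the
socket callbacks become unnecessary, which is exactly what the hunks below
delete.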

lnet/klnds/socklnd/socklnd.c
index da47785..6f6fa7e 100644
@@ -914,15 +914,34 @@ ksocknal_terminate_conn (ksock_conn_t *conn)
          * destroy it. */
         unsigned long   flags;
         ksock_peer_t   *peer = conn->ksnc_peer;
+        ksock_sched_t  *sched = conn->ksnc_scheduler;
         struct timeval  now;
         time_t          then = 0;
         int             notify = 0;
 
+        LASSERT(conn->ksnc_closing);
+
+        /* wake up the scheduler to "send" all remaining packets to /dev/null */
+        spin_lock_irqsave(&sched->kss_lock, flags);
+
+        if (!conn->ksnc_tx_scheduled &&
+            !list_empty(&conn->ksnc_tx_queue)){
+                list_add_tail (&conn->ksnc_tx_list,
+                               &sched->kss_tx_conns);
+                /* a closing conn is always ready to tx */
+                conn->ksnc_tx_ready = 1;
+                conn->ksnc_tx_scheduled = 1;
+                /* extra ref for scheduler */
+                atomic_inc (&conn->ksnc_refcount);
+
+                wake_up (&sched->kss_waitq);
+        }
+
+        spin_unlock_irqrestore (&sched->kss_lock, flags);
+
         /* serialise with callbacks */
         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
 
-        LASSERT (conn->ksnc_closing);
-        
         /* Remove conn's network callbacks.
          * NB I _have_ to restore the callback, rather than storing a noop,
          * since the socket could survive past this module being unloaded!! */
@@ -934,6 +953,8 @@ ksocknal_terminate_conn (ksock_conn_t *conn)
          * sk_user_data is NULL. */
         conn->ksnc_sock->sk->sk_user_data = NULL;
 
+        /* OK, so this conn may not be completely disengaged from its
+         * scheduler yet, but it _has_ committed to terminate... */
         conn->ksnc_scheduler->kss_nconns--;
 
         if (peer->ksnp_error != 0) {
@@ -970,27 +991,20 @@ ksocknal_destroy_conn (ksock_conn_t *conn)
         LASSERT (conn->ksnc_route == NULL);
         LASSERT (!conn->ksnc_tx_scheduled);
         LASSERT (!conn->ksnc_rx_scheduled);
-
-        /* complete queued packets */
-        while (!list_empty (&conn->ksnc_tx_queue)) {
-                ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next,
-                                             ksock_tx_t, tx_list);
-
-                CERROR ("Deleting packet %p type %d len %d ("LPX64"->"LPX64")\n",
-                        tx,
-                        NTOH__u32 (tx->tx_hdr->type),
-                        NTOH__u32 (tx->tx_hdr->payload_length),
-                        NTOH__u64 (tx->tx_hdr->src_nid),
-                        NTOH__u64 (tx->tx_hdr->dest_nid));
-
-                list_del (&tx->tx_list);
-                ksocknal_tx_done (tx, 0);
-        }
+        LASSERT (list_empty(&conn->ksnc_tx_queue));
 
         /* complete current receive if any */
         switch (conn->ksnc_rx_state) {
         case SOCKNAL_RX_BODY:
+#if 0
                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
+#else
+                CERROR ("Refusing to complete a partial receive from "
+                        LPX64", ip %08x\n", conn->ksnc_peer->ksnp_nid,
+                        conn->ksnc_ipaddr);
+                CERROR ("This may hang communications and "
+                        "prevent modules from unloading\n");
+#endif
                 break;
         case SOCKNAL_RX_BODY_FWD:
                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
lnet/klnds/socklnd/socklnd_cb.c
index b6fdc18..c33d852 100644
@@ -691,71 +691,37 @@ ksocknal_tx_launched (ksock_tx_t *tx)
         ksocknal_tx_done (tx, 0);
 }
 
-void
-ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
+int
+ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
 {
-        ksock_conn_t  *conn;
-        ksock_tx_t    *tx;
         int            rc;
-        
-        LASSERT (!list_empty (&sched->kss_tx_conns));
-        conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list);
-        list_del (&conn->ksnc_tx_list);
-
-        LASSERT (conn->ksnc_tx_scheduled);
-        LASSERT (conn->ksnc_tx_ready);
-        LASSERT (!list_empty (&conn->ksnc_tx_queue));
-        tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
-        /* assume transmit will complete now, so dequeue while I've got lock */
-        list_del (&tx->tx_list);
-
-        spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
-
-        LASSERT (tx->tx_resid > 0);
-
-        conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */
-        mb();                   /* => clear BEFORE trying to write */
-
+       
         rc = ksocknal_sendmsg (conn, tx);
 
         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
+        LASSERT (rc != -EAGAIN);
 
-        if (rc != 0) {
-                if (!conn->ksnc_closing)
-                        CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
-                                conn, rc, conn->ksnc_peer->ksnp_nid,
-                                conn->ksnc_ipaddr, conn->ksnc_port);
-                ksocknal_close_conn_and_siblings (conn, rc);
-
+        if (rc == 0) {
+                /* no errors */
+                if (tx->tx_resid != 0) {
+                        /* didn't send everything */
+                        return (-EAGAIN);
+                }
+                
                 ksocknal_tx_launched (tx);
-                spin_lock_irqsave (&sched->kss_lock, *irq_flags);
-
-        } else if (tx->tx_resid == 0) {  
-                /* everything went; assume more can go, and avoid
-                 * write_space locking */
-                conn->ksnc_tx_ready = 1;
+                return (0);
+        }
 
-                ksocknal_tx_launched (tx);
-                spin_lock_irqsave (&sched->kss_lock, *irq_flags);
-        } else {
-                spin_lock_irqsave (&sched->kss_lock, *irq_flags);
+        if (!conn->ksnc_closing)
+                CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
+                        conn, rc, conn->ksnc_peer->ksnp_nid,
+                        conn->ksnc_ipaddr, conn->ksnc_port);
 
-                /* back onto HEAD of tx_queue */
-                list_add (&tx->tx_list, &conn->ksnc_tx_queue);
-        }
+        ksocknal_close_conn_and_siblings (conn, rc);
+        ksocknal_tx_launched (tx);
 
-        /* no space to write, or nothing to write? */
-        if (!conn->ksnc_tx_ready ||
-            list_empty (&conn->ksnc_tx_queue)) {
-                /* mark not scheduled */
-                conn->ksnc_tx_scheduled = 0;
-                /* drop scheduler's ref */
-                ksocknal_put_conn (conn);
-        } else {
-                /* stay scheduled */
-                list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
-        }
-}
+        return (-EAGAIN);
+} 
 
 void
 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
@@ -893,8 +859,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 
         conn->ksnc_tx_deadline = jiffies + 
                                  ksocknal_data.ksnd_io_timeout * HZ;
-        mb();
-        /* Extend deadline BEFORE tx is enqueued */
+        mb();                                   /* order with list_add_tail */
 
         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                 
@@ -1527,25 +1492,13 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
         return (0);
 }
 
-void
-ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
+int
+ksocknal_process_receive (ksock_conn_t *conn)
 {
-        ksock_conn_t *conn;
         ksock_fmb_t  *fmb;
         int           rc;
-
-        /* NB: sched->ksnc_lock lock held */
-
-        LASSERT (!list_empty (&sched->kss_rx_conns));
-        conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list);
-        list_del (&conn->ksnc_rx_list);
-
-        spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
-
-        CDEBUG(D_NET, "sched %p conn %p\n", sched, conn);
+        
         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
-        LASSERT (conn->ksnc_rx_scheduled);
-        LASSERT (conn->ksnc_rx_ready);
 
         /* doesn't need a forwarding buffer */
         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
@@ -1553,13 +1506,15 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 
  get_fmb:
         fmb = ksocknal_get_idle_fmb (conn);
-        if (fmb == NULL) {      /* conn descheduled waiting for idle fmb */
-                spin_lock_irqsave (&sched->kss_lock, *irq_flags);
-                return;
+        if (fmb == NULL) {
+                /* conn descheduled waiting for idle fmb */
+                return (0);
         }
 
-        if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */
-                goto out;               /* come back later for next packet */
+        if (ksocknal_init_fmb (conn, fmb)) {
+                /* packet forwarded */
+                return (0);
+        }
 
  try_read:
         /* NB: sched lock NOT held */
@@ -1570,9 +1525,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 
         LASSERT (conn->ksnc_rx_nob_wanted > 0);
 
-        conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */
-        mb();                   /* => clear BEFORE trying to read */
-
         rc = ksocknal_recvmsg(conn);
 
         if (rc <= 0) {
@@ -1584,17 +1536,16 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                         CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
                                 conn, rc, conn->ksnc_peer->ksnp_nid,
                                 conn->ksnc_ipaddr, conn->ksnc_port);
+
                 ksocknal_close_conn_and_siblings (conn, rc);
-                goto out;
+                return (rc == 0 ? -ESHUTDOWN : rc);
         }
 
+        if (conn->ksnc_rx_nob_wanted != 0) {
+                /* short read */
+                return (-EAGAIN);
+        }
         
-        if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
-                goto out;                       /* try again later */
-
-        /* got all I wanted, assume there's more - prevent data_ready locking */
-        conn->ksnc_rx_ready = 1;
-
         switch (conn->ksnc_rx_state) {
         case SOCKNAL_RX_HEADER:
                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
@@ -1603,7 +1554,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                         ksocknal_fwd_parse (conn);
                         switch (conn->ksnc_rx_state) {
                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
-                                goto out;       /* => come back later */
+                                return (0);     /* => come back later */
                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
                                 goto try_read;  /* => go read it */
                         case SOCKNAL_RX_GET_FMB: /* forwarding */
@@ -1631,7 +1582,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
         case SOCKNAL_RX_SLOP:
                 /* starting new packet? */
                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
-                        goto out;       /* come back later */
+                        return (0);     /* come back later */
                 goto try_read;          /* try to finish reading slop now */
 
         case SOCKNAL_RX_BODY_FWD:
@@ -1650,7 +1601,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                 LASSERT (conn->ksnc_rx_nob_left == 0);
 
                 ksocknal_new_packet (conn, 0);  /* on to next packet */
-                goto out;                       /* (later) */
+                return (0);                     /* (later) */
 
         default:
                 break;
@@ -1658,20 +1609,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 
         /* Not Reached */
         LBUG ();
-
- out:
-        spin_lock_irqsave (&sched->kss_lock, *irq_flags);
-
-        /* no data there to read? */
-        if (!conn->ksnc_rx_ready) {
-                /* let socket callback schedule again */
-                conn->ksnc_rx_scheduled = 0;
-                /* drop scheduler's ref */
-                ksocknal_put_conn (conn);   
-        } else {
-                /* stay scheduled */
-                list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
-        }
+        return (-EINVAL);                       /* keep gcc happy */
 }
 
 int
@@ -1729,6 +1667,8 @@ ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
 int ksocknal_scheduler (void *arg)
 {
         ksock_sched_t     *sched = (ksock_sched_t *)arg;
+        ksock_conn_t      *conn;
+        ksock_tx_t        *tx;
         unsigned long      flags;
         int                rc;
         int                nloops = 0;
@@ -1761,15 +1701,94 @@ int ksocknal_scheduler (void *arg)
                 /* Ensure I progress everything semi-fairly */
 
                 if (!list_empty (&sched->kss_rx_conns)) {
+                        conn = list_entry(sched->kss_rx_conns.next,
+                                          ksock_conn_t, ksnc_rx_list);
+                        list_del(&conn->ksnc_rx_list);
+
+                        LASSERT(conn->ksnc_rx_scheduled);
+                        LASSERT(conn->ksnc_rx_ready);
+
+                        /* clear rx_ready in case receive isn't complete.
+                         * Do it BEFORE we call process_recv, since
+                         * data_ready can set it any time after we release
+                         * kss_lock. */
+                        conn->ksnc_rx_ready = 0;
+                        spin_unlock_irqrestore(&sched->kss_lock, flags);
+                        
+                        rc = ksocknal_process_receive(conn);
+                        
+                        spin_lock_irqsave(&sched->kss_lock, flags);
+
+                        /* I'm the only one that can clear this flag */
+                        LASSERT(conn->ksnc_rx_scheduled);
+
+                        /* Did process_receive get everything it wanted? */
+                        if (rc == 0)
+                                conn->ksnc_rx_ready = 1;
+                        
+                        if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP) {
+                                /* Conn is blocking for a forwarding
+                                 * buffer; it will get queued for me when
+                                 * one becomes available.  Meanwhile my ref
+                                 * on it stays put. */
+                        } else if (conn->ksnc_rx_ready) {
+                                /* reschedule for rx */
+                                list_add_tail (&conn->ksnc_rx_list,
+                                               &sched->kss_rx_conns);
+                        } else {
+                                conn->ksnc_rx_scheduled = 0;
+                                /* drop my ref */
+                                ksocknal_put_conn(conn);
+                        }
+
                         did_something = 1;
-                        /* drops & regains kss_lock */
-                        ksocknal_process_receive (sched, &flags);
                 }
 
                 if (!list_empty (&sched->kss_tx_conns)) {
+                        conn = list_entry(sched->kss_tx_conns.next,
+                                          ksock_conn_t, ksnc_tx_list);
+                        list_del (&conn->ksnc_tx_list);
+                        
+                        LASSERT(conn->ksnc_tx_scheduled);
+                        LASSERT(conn->ksnc_tx_ready);
+                        LASSERT(!list_empty(&conn->ksnc_tx_queue));
+                        
+                        tx = list_entry(conn->ksnc_tx_queue.next,
+                                        ksock_tx_t, tx_list);
+                        /* dequeue now so empty list => more to send */
+                        list_del(&tx->tx_list);
+                        
+                        /* Clear tx_ready in case send isn't complete.  Do
+                         * it BEFORE we call process_transmit, since
+                         * write_space can set it any time after we release
+                         * kss_lock. */
+                        conn->ksnc_tx_ready = 0;
+                        spin_unlock_irqrestore (&sched->kss_lock, flags);
+                        
+                        rc = ksocknal_process_transmit(conn, tx);
+                        
+                        spin_lock_irqsave (&sched->kss_lock, flags);
+
+                        if (rc != -EAGAIN) {
+                                /* error or everything went: assume more can go */
+                                conn->ksnc_tx_ready = 1;
+                        } else {
+                                 /* back onto HEAD of tx_queue */
+                                list_add (&tx->tx_list, &conn->ksnc_tx_queue);
+                        }
+                        
+                        if (conn->ksnc_tx_ready &&
+                            !list_empty (&conn->ksnc_tx_queue)) {
+                                /* reschedule for tx */
+                                list_add_tail (&conn->ksnc_tx_list, 
+                                               &sched->kss_tx_conns);
+                        } else {
+                                conn->ksnc_tx_scheduled = 0;
+                                /* drop my ref */
+                                ksocknal_put_conn (conn);
+                        }
+                                
                         did_something = 1;
-                        /* drops and regains kss_lock */
-                        ksocknal_process_transmit (sched, &flags);
                 }
 #if SOCKNAL_ZC
                 if (!list_empty (&sched->kss_zctxdone_list)) {
@@ -1833,18 +1852,11 @@ ksocknal_data_ready (struct sock *sk, int n)
         if (conn == NULL) {             /* raced with ksocknal_close_sock */
                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
                 sk->sk_data_ready (sk, n);
-                goto out;
-        }
-
-        if (!conn->ksnc_rx_ready) {        /* new news */
-                /* Set ASAP in case of concurrent calls to me */
-                conn->ksnc_rx_ready = 1;
-
+        } else {
                 sched = conn->ksnc_scheduler;
 
                 spin_lock_irqsave (&sched->kss_lock, flags);
 
-                /* Set again (process_receive may have cleared while I blocked for the lock) */
                 conn->ksnc_rx_ready = 1;
 
                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
@@ -1860,7 +1872,6 @@ ksocknal_data_ready (struct sock *sk, int n)
                 spin_unlock_irqrestore (&sched->kss_lock, flags);
         }
 
- out:
         read_unlock (&ksocknal_data.ksnd_global_lock);
 
         EXIT;
@@ -1898,31 +1909,24 @@ ksocknal_write_space (struct sock *sk)
         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
 
-                if (!conn->ksnc_tx_ready) {      /* new news */
-                        /* Set ASAP in case of concurrent calls to me */
-                        conn->ksnc_tx_ready = 1;
-
-                        sched = conn->ksnc_scheduler;
-
-                        spin_lock_irqsave (&sched->kss_lock, flags);
+                sched = conn->ksnc_scheduler;
 
-                        /* Set again (process_transmit may have
-                           cleared while I blocked for the lock) */
-                        conn->ksnc_tx_ready = 1;
+                spin_lock_irqsave (&sched->kss_lock, flags);
 
-                        if (!conn->ksnc_tx_scheduled && // not being progressed
-                            !list_empty(&conn->ksnc_tx_queue)){//packets to send
-                                list_add_tail (&conn->ksnc_tx_list,
-                                               &sched->kss_tx_conns);
-                                conn->ksnc_tx_scheduled = 1;
-                                /* extra ref for scheduler */
-                                atomic_inc (&conn->ksnc_refcount);
+                conn->ksnc_tx_ready = 1;
 
-                                wake_up (&sched->kss_waitq);
-                        }
+                if (!conn->ksnc_tx_scheduled && // not being progressed
+                    !list_empty(&conn->ksnc_tx_queue)){//packets to send
+                        list_add_tail (&conn->ksnc_tx_list,
+                                       &sched->kss_tx_conns);
+                        conn->ksnc_tx_scheduled = 1;
+                        /* extra ref for scheduler */
+                        atomic_inc (&conn->ksnc_refcount);
 
-                        spin_unlock_irqrestore (&sched->kss_lock, flags);
+                        wake_up (&sched->kss_waitq);
                 }
+
+                spin_unlock_irqrestore (&sched->kss_lock, flags);
         }
 
         read_unlock (&ksocknal_data.ksnd_global_lock);
The lustre/portals/knals/socknal/socknal.c and socknal_cb.c diffs are
byte-for-byte identical to the two above (same index hashes
da47785..6f6fa7e and b6fdc18..c33d852).