From 94c416e9620e3a4883585e4697efa5641d9d54b1 Mon Sep 17 00:00:00 2001 From: eeb Date: Wed, 26 Nov 2003 21:42:28 +0000 Subject: [PATCH] * Fixed socknal forwarding race in the new version of ksocknal_process_receive(). --- lnet/klnds/socklnd/socklnd.c | 50 ++++-- lnet/klnds/socklnd/socklnd_cb.c | 280 +++++++++++++++--------------- lustre/portals/knals/socknal/socknal.c | 50 ++++-- lustre/portals/knals/socknal/socknal_cb.c | 280 +++++++++++++++--------------- 4 files changed, 350 insertions(+), 310 deletions(-) diff --git a/lnet/klnds/socklnd/socklnd.c b/lnet/klnds/socklnd/socklnd.c index da47785..6f6fa7e 100644 --- a/lnet/klnds/socklnd/socklnd.c +++ b/lnet/klnds/socklnd/socklnd.c @@ -914,15 +914,34 @@ ksocknal_terminate_conn (ksock_conn_t *conn) * destroy it. */ unsigned long flags; ksock_peer_t *peer = conn->ksnc_peer; + ksock_sched_t *sched = conn->ksnc_scheduler; struct timeval now; time_t then = 0; int notify = 0; + LASSERT(conn->ksnc_closing); + + /* wake up the scheduler to "send" all remaining packets to /dev/null */ + spin_lock_irqsave(&sched->kss_lock, flags); + + if (!conn->ksnc_tx_scheduled && + !list_empty(&conn->ksnc_tx_queue)){ + list_add_tail (&conn->ksnc_tx_list, + &sched->kss_tx_conns); + /* a closing conn is always ready to tx */ + conn->ksnc_tx_ready = 1; + conn->ksnc_tx_scheduled = 1; + /* extra ref for scheduler */ + atomic_inc (&conn->ksnc_refcount); + + wake_up (&sched->kss_waitq); + } + + spin_unlock_irqrestore (&sched->kss_lock, flags); + /* serialise with callbacks */ write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags); - LASSERT (conn->ksnc_closing); - /* Remove conn's network callbacks. * NB I _have_ to restore the callback, rather than storing a noop, * since the socket could survive past this module being unloaded!! */ @@ -934,6 +953,8 @@ ksocknal_terminate_conn (ksock_conn_t *conn) * sk_user_data is NULL. */ conn->ksnc_sock->sk->sk_user_data = NULL; + /* OK, so this conn may not be completely disengaged from its + * scheduler yet, but it _has_ committed to terminate... 
*/ conn->ksnc_scheduler->kss_nconns--; if (peer->ksnp_error != 0) { @@ -970,27 +991,20 @@ ksocknal_destroy_conn (ksock_conn_t *conn) LASSERT (conn->ksnc_route == NULL); LASSERT (!conn->ksnc_tx_scheduled); LASSERT (!conn->ksnc_rx_scheduled); - - /* complete queued packets */ - while (!list_empty (&conn->ksnc_tx_queue)) { - ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next, - ksock_tx_t, tx_list); - - CERROR ("Deleting packet %p type %d len %d ("LPX64"->"LPX64")\n", - tx, - NTOH__u32 (tx->tx_hdr->type), - NTOH__u32 (tx->tx_hdr->payload_length), - NTOH__u64 (tx->tx_hdr->src_nid), - NTOH__u64 (tx->tx_hdr->dest_nid)); - - list_del (&tx->tx_list); - ksocknal_tx_done (tx, 0); - } + LASSERT (list_empty(&conn->ksnc_tx_queue)); /* complete current receive if any */ switch (conn->ksnc_rx_state) { case SOCKNAL_RX_BODY: +#if 0 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie); +#else + CERROR ("Refusing to complete a partial receive from " + LPX64", ip %08x\n", conn->ksnc_peer->ksnp_nid, + conn->ksnc_ipaddr); + CERROR ("This may hang communications and " + "prevent modules from unloading\n"); +#endif break; case SOCKNAL_RX_BODY_FWD: ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED); diff --git a/lnet/klnds/socklnd/socklnd_cb.c b/lnet/klnds/socklnd/socklnd_cb.c index b6fdc18..6b1a7d9 100644 --- a/lnet/klnds/socklnd/socklnd_cb.c +++ b/lnet/klnds/socklnd/socklnd_cb.c @@ -691,71 +691,37 @@ ksocknal_tx_launched (ksock_tx_t *tx) ksocknal_tx_done (tx, 0); } -void -ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags) +int +ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) { - ksock_conn_t *conn; - ksock_tx_t *tx; int rc; - - LASSERT (!list_empty (&sched->kss_tx_conns)); - conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list); - list_del (&conn->ksnc_tx_list); - - LASSERT (conn->ksnc_tx_scheduled); - LASSERT (conn->ksnc_tx_ready); - LASSERT (!list_empty (&conn->ksnc_tx_queue)); - tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list); - /* assume transmit will complete now, so dequeue while I've got lock */ - list_del (&tx->tx_list); - - spin_unlock_irqrestore (&sched->kss_lock, *irq_flags); - - LASSERT (tx->tx_resid > 0); - - conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */ - mb(); /* => clear BEFORE trying to write */ - + rc = ksocknal_sendmsg (conn, tx); CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc); + LASSERT (rc != -EAGAIN); - if (rc != 0) { - if (!conn->ksnc_closing) - CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n", - conn, rc, conn->ksnc_peer->ksnp_nid, - conn->ksnc_ipaddr, conn->ksnc_port); - ksocknal_close_conn_and_siblings (conn, rc); - + if (rc == 0) { + /* no errors */ + if (tx->tx_resid != 0) { + /* didn't send everything */ + return (-EAGAIN); + } + ksocknal_tx_launched (tx); - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - - } else if (tx->tx_resid == 0) { - /* everything went; assume more can go, and avoid - * write_space locking */ - conn->ksnc_tx_ready = 1; + return (0); + } - ksocknal_tx_launched (tx); - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - } else { - spin_lock_irqsave (&sched->kss_lock, *irq_flags); + if (!conn->ksnc_closing) + CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n", + conn, rc, conn->ksnc_peer->ksnp_nid, + conn->ksnc_ipaddr, conn->ksnc_port); - /* back onto HEAD of tx_queue */ - list_add (&tx->tx_list, &conn->ksnc_tx_queue); - } + ksocknal_close_conn_and_siblings (conn, rc); + ksocknal_tx_launched (tx); - /* no space to write, or nothing to 
write? */
- if (!conn->ksnc_tx_ready ||
- list_empty (&conn->ksnc_tx_queue)) {
- /* mark not scheduled */
- conn->ksnc_tx_scheduled = 0;
- /* drop scheduler's ref */
- ksocknal_put_conn (conn);
- } else {
- /* stay scheduled */
- list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
- }
-}
+ return (rc);
+}
 void
 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
@@ -893,8 +859,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 conn->ksnc_tx_deadline = jiffies +
 ksocknal_data.ksnd_io_timeout * HZ;
- mb();
- /* Extend deadline BEFORE tx is enqueued */
+ mb(); /* order with list_add_tail */
 list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
@@ -1527,25 +1492,13 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
 return (0);
 }
-void
-ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
+int
+ksocknal_process_receive (ksock_conn_t *conn)
 {
- ksock_conn_t *conn;
 ksock_fmb_t *fmb;
 int rc;
-
- /* NB: sched->ksnc_lock lock held */
-
- LASSERT (!list_empty (&sched->kss_rx_conns));
- conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list);
- list_del (&conn->ksnc_rx_list);
-
- spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
-
- CDEBUG(D_NET, "sched %p conn %p\n", sched, conn);
+ LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
- LASSERT (conn->ksnc_rx_scheduled);
- LASSERT (conn->ksnc_rx_ready);
 /* doesn't need a forwarding buffer */
 if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
@@ -1553,13 +1506,15 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 get_fmb:
 fmb = ksocknal_get_idle_fmb (conn);
- if (fmb == NULL) { /* conn descheduled waiting for idle fmb */
- spin_lock_irqsave (&sched->kss_lock, *irq_flags);
- return;
+ if (fmb == NULL) {
+ /* conn descheduled waiting for idle fmb */
+ return (0);
 }
- if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */
- goto out; /* come back later for next packet */
+ if (ksocknal_init_fmb (conn, fmb)) {
+ /* packet forwarded */
+ return (0);
+ }
 try_read:
 /* NB: sched lock NOT held */
@@ -1570,9 +1525,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 LASSERT (conn->ksnc_rx_nob_wanted > 0);
- conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */
- mb(); /* => clear BEFORE trying to read */
-
 rc = ksocknal_recvmsg(conn);
 if (rc <= 0) {
@@ -1584,17 +1536,16 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
 conn, rc, conn->ksnc_peer->ksnp_nid,
 conn->ksnc_ipaddr, conn->ksnc_port);
+
 ksocknal_close_conn_and_siblings (conn, rc);
- goto out;
+ return (rc == 0 ? 
-ESHUTDOWN : rc); } + if (conn->ksnc_rx_nob_wanted != 0) { + /* short read */ + return (-EAGAIN); + } - if (conn->ksnc_rx_nob_wanted != 0) /* short read */ - goto out; /* try again later */ - - /* got all I wanted, assume there's more - prevent data_ready locking */ - conn->ksnc_rx_ready = 1; - switch (conn->ksnc_rx_state) { case SOCKNAL_RX_HEADER: if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) && @@ -1603,7 +1554,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) ksocknal_fwd_parse (conn); switch (conn->ksnc_rx_state) { case SOCKNAL_RX_HEADER: /* skipped (zero payload) */ - goto out; /* => come back later */ + return (0); /* => come back later */ case SOCKNAL_RX_SLOP: /* skipping packet's body */ goto try_read; /* => go read it */ case SOCKNAL_RX_GET_FMB: /* forwarding */ @@ -1631,7 +1582,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) case SOCKNAL_RX_SLOP: /* starting new packet? */ if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left)) - goto out; /* come back later */ + return (0); /* come back later */ goto try_read; /* try to finish reading slop now */ case SOCKNAL_RX_BODY_FWD: @@ -1650,7 +1601,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) LASSERT (conn->ksnc_rx_nob_left == 0); ksocknal_new_packet (conn, 0); /* on to next packet */ - goto out; /* (later) */ + return (0); /* (later) */ default: break; @@ -1658,20 +1609,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) /* Not Reached */ LBUG (); - - out: - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - - /* no data there to read? */ - if (!conn->ksnc_rx_ready) { - /* let socket callback schedule again */ - conn->ksnc_rx_scheduled = 0; - /* drop scheduler's ref */ - ksocknal_put_conn (conn); - } else { - /* stay scheduled */ - list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns); - } + return (-EINVAL); /* keep gcc happy */ } int @@ -1729,6 +1667,8 @@ ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg, int ksocknal_scheduler (void *arg) { ksock_sched_t *sched = (ksock_sched_t *)arg; + ksock_conn_t *conn; + ksock_tx_t *tx; unsigned long flags; int rc; int nloops = 0; @@ -1761,15 +1701,96 @@ int ksocknal_scheduler (void *arg) /* Ensure I progress everything semi-fairly */ if (!list_empty (&sched->kss_rx_conns)) { + conn = list_entry(sched->kss_rx_conns.next, + ksock_conn_t, ksnc_rx_list); + list_del(&conn->ksnc_rx_list); + + LASSERT(conn->ksnc_rx_scheduled); + LASSERT(conn->ksnc_rx_ready); + + /* clear rx_ready in case receive isn't complete. + * Do it BEFORE we call process_recv, since + * data_ready can set it any time after we release + * kss_lock. */ + conn->ksnc_rx_ready = 0; + spin_unlock_irqrestore(&sched->kss_lock, flags); + + rc = ksocknal_process_receive(conn); + + spin_lock_irqsave(&sched->kss_lock, flags); + + /* I'm the only one that can clear this flag */ + LASSERT(conn->ksnc_rx_scheduled); + + /* Did process_receive get everything it wanted? */ + if (rc == 0) + conn->ksnc_rx_ready = 1; + + if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP || + conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) { + /* Conn blocked for a forwarding buffer. + * It will get queued for my attention when + * one becomes available (and it might just + * already have been!). Meanwhile my ref + * on it stays put. 
*/ + } else if (conn->ksnc_rx_ready) { + /* reschedule for rx */ + list_add_tail (&conn->ksnc_rx_list, + &sched->kss_rx_conns); + } else { + conn->ksnc_rx_scheduled = 0; + /* drop my ref */ + ksocknal_put_conn(conn); + } + did_something = 1; - /* drops & regains kss_lock */ - ksocknal_process_receive (sched, &flags); } if (!list_empty (&sched->kss_tx_conns)) { + conn = list_entry(sched->kss_tx_conns.next, + ksock_conn_t, ksnc_tx_list); + list_del (&conn->ksnc_tx_list); + + LASSERT(conn->ksnc_tx_scheduled); + LASSERT(conn->ksnc_tx_ready); + LASSERT(!list_empty(&conn->ksnc_tx_queue)); + + tx = list_entry(conn->ksnc_tx_queue.next, + ksock_tx_t, tx_list); + /* dequeue now so empty list => more to send */ + list_del(&tx->tx_list); + + /* Clear tx_ready in case send isn't complete. Do + * it BEFORE we call process_transmit, since + * write_space can set it any time after we release + * kss_lock. */ + conn->ksnc_tx_ready = 0; + spin_unlock_irqrestore (&sched->kss_lock, flags); + + rc = ksocknal_process_transmit(conn, tx); + + spin_lock_irqsave (&sched->kss_lock, flags); + + if (rc != -EAGAIN) { + /* error or everything went: assume more can go */ + conn->ksnc_tx_ready = 1; + } else { + /* back onto HEAD of tx_queue */ + list_add (&tx->tx_list, &conn->ksnc_tx_queue); + } + + if (conn->ksnc_tx_ready && + !list_empty (&conn->ksnc_tx_queue)) { + /* reschedule for tx */ + list_add_tail (&conn->ksnc_tx_list, + &sched->kss_tx_conns); + } else { + conn->ksnc_tx_scheduled = 0; + /* drop my ref */ + ksocknal_put_conn (conn); + } + did_something = 1; - /* drops and regains kss_lock */ - ksocknal_process_transmit (sched, &flags); } #if SOCKNAL_ZC if (!list_empty (&sched->kss_zctxdone_list)) { @@ -1833,18 +1854,11 @@ ksocknal_data_ready (struct sock *sk, int n) if (conn == NULL) { /* raced with ksocknal_close_sock */ LASSERT (sk->sk_data_ready != &ksocknal_data_ready); sk->sk_data_ready (sk, n); - goto out; - } - - if (!conn->ksnc_rx_ready) { /* new news */ - /* Set ASAP in case of concurrent calls to me */ - conn->ksnc_rx_ready = 1; - + } else { sched = conn->ksnc_scheduler; spin_lock_irqsave (&sched->kss_lock, flags); - /* Set again (process_receive may have cleared while I blocked for the lock) */ conn->ksnc_rx_ready = 1; if (!conn->ksnc_rx_scheduled) { /* not being progressed */ @@ -1860,7 +1874,6 @@ ksocknal_data_ready (struct sock *sk, int n) spin_unlock_irqrestore (&sched->kss_lock, flags); } - out: read_unlock (&ksocknal_data.ksnd_global_lock); EXIT; @@ -1898,31 +1911,24 @@ ksocknal_write_space (struct sock *sk) if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */ clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags); - if (!conn->ksnc_tx_ready) { /* new news */ - /* Set ASAP in case of concurrent calls to me */ - conn->ksnc_tx_ready = 1; - - sched = conn->ksnc_scheduler; - - spin_lock_irqsave (&sched->kss_lock, flags); + sched = conn->ksnc_scheduler; - /* Set again (process_transmit may have - cleared while I blocked for the lock) */ - conn->ksnc_tx_ready = 1; + spin_lock_irqsave (&sched->kss_lock, flags); - if (!conn->ksnc_tx_scheduled && // not being progressed - !list_empty(&conn->ksnc_tx_queue)){//packets to send - list_add_tail (&conn->ksnc_tx_list, - &sched->kss_tx_conns); - conn->ksnc_tx_scheduled = 1; - /* extra ref for scheduler */ - atomic_inc (&conn->ksnc_refcount); + conn->ksnc_tx_ready = 1; - wake_up (&sched->kss_waitq); - } + if (!conn->ksnc_tx_scheduled && // not being progressed + !list_empty(&conn->ksnc_tx_queue)){//packets to send + list_add_tail (&conn->ksnc_tx_list, 
+ &sched->kss_tx_conns); + conn->ksnc_tx_scheduled = 1; + /* extra ref for scheduler */ + atomic_inc (&conn->ksnc_refcount); - spin_unlock_irqrestore (&sched->kss_lock, flags); + wake_up (&sched->kss_waitq); } + + spin_unlock_irqrestore (&sched->kss_lock, flags); } read_unlock (&ksocknal_data.ksnd_global_lock); diff --git a/lustre/portals/knals/socknal/socknal.c b/lustre/portals/knals/socknal/socknal.c index da47785..6f6fa7e 100644 --- a/lustre/portals/knals/socknal/socknal.c +++ b/lustre/portals/knals/socknal/socknal.c @@ -914,15 +914,34 @@ ksocknal_terminate_conn (ksock_conn_t *conn) * destroy it. */ unsigned long flags; ksock_peer_t *peer = conn->ksnc_peer; + ksock_sched_t *sched = conn->ksnc_scheduler; struct timeval now; time_t then = 0; int notify = 0; + LASSERT(conn->ksnc_closing); + + /* wake up the scheduler to "send" all remaining packets to /dev/null */ + spin_lock_irqsave(&sched->kss_lock, flags); + + if (!conn->ksnc_tx_scheduled && + !list_empty(&conn->ksnc_tx_queue)){ + list_add_tail (&conn->ksnc_tx_list, + &sched->kss_tx_conns); + /* a closing conn is always ready to tx */ + conn->ksnc_tx_ready = 1; + conn->ksnc_tx_scheduled = 1; + /* extra ref for scheduler */ + atomic_inc (&conn->ksnc_refcount); + + wake_up (&sched->kss_waitq); + } + + spin_unlock_irqrestore (&sched->kss_lock, flags); + /* serialise with callbacks */ write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags); - LASSERT (conn->ksnc_closing); - /* Remove conn's network callbacks. * NB I _have_ to restore the callback, rather than storing a noop, * since the socket could survive past this module being unloaded!! */ @@ -934,6 +953,8 @@ ksocknal_terminate_conn (ksock_conn_t *conn) * sk_user_data is NULL. */ conn->ksnc_sock->sk->sk_user_data = NULL; + /* OK, so this conn may not be completely disengaged from its + * scheduler yet, but it _has_ committed to terminate... 
*/ conn->ksnc_scheduler->kss_nconns--; if (peer->ksnp_error != 0) { @@ -970,27 +991,20 @@ ksocknal_destroy_conn (ksock_conn_t *conn) LASSERT (conn->ksnc_route == NULL); LASSERT (!conn->ksnc_tx_scheduled); LASSERT (!conn->ksnc_rx_scheduled); - - /* complete queued packets */ - while (!list_empty (&conn->ksnc_tx_queue)) { - ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next, - ksock_tx_t, tx_list); - - CERROR ("Deleting packet %p type %d len %d ("LPX64"->"LPX64")\n", - tx, - NTOH__u32 (tx->tx_hdr->type), - NTOH__u32 (tx->tx_hdr->payload_length), - NTOH__u64 (tx->tx_hdr->src_nid), - NTOH__u64 (tx->tx_hdr->dest_nid)); - - list_del (&tx->tx_list); - ksocknal_tx_done (tx, 0); - } + LASSERT (list_empty(&conn->ksnc_tx_queue)); /* complete current receive if any */ switch (conn->ksnc_rx_state) { case SOCKNAL_RX_BODY: +#if 0 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie); +#else + CERROR ("Refusing to complete a partial receive from " + LPX64", ip %08x\n", conn->ksnc_peer->ksnp_nid, + conn->ksnc_ipaddr); + CERROR ("This may hang communications and " + "prevent modules from unloading\n"); +#endif break; case SOCKNAL_RX_BODY_FWD: ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED); diff --git a/lustre/portals/knals/socknal/socknal_cb.c b/lustre/portals/knals/socknal/socknal_cb.c index b6fdc18..6b1a7d9 100644 --- a/lustre/portals/knals/socknal/socknal_cb.c +++ b/lustre/portals/knals/socknal/socknal_cb.c @@ -691,71 +691,37 @@ ksocknal_tx_launched (ksock_tx_t *tx) ksocknal_tx_done (tx, 0); } -void -ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags) +int +ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) { - ksock_conn_t *conn; - ksock_tx_t *tx; int rc; - - LASSERT (!list_empty (&sched->kss_tx_conns)); - conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list); - list_del (&conn->ksnc_tx_list); - - LASSERT (conn->ksnc_tx_scheduled); - LASSERT (conn->ksnc_tx_ready); - LASSERT (!list_empty (&conn->ksnc_tx_queue)); - tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list); - /* assume transmit will complete now, so dequeue while I've got lock */ - list_del (&tx->tx_list); - - spin_unlock_irqrestore (&sched->kss_lock, *irq_flags); - - LASSERT (tx->tx_resid > 0); - - conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */ - mb(); /* => clear BEFORE trying to write */ - + rc = ksocknal_sendmsg (conn, tx); CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc); + LASSERT (rc != -EAGAIN); - if (rc != 0) { - if (!conn->ksnc_closing) - CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n", - conn, rc, conn->ksnc_peer->ksnp_nid, - conn->ksnc_ipaddr, conn->ksnc_port); - ksocknal_close_conn_and_siblings (conn, rc); - + if (rc == 0) { + /* no errors */ + if (tx->tx_resid != 0) { + /* didn't send everything */ + return (-EAGAIN); + } + ksocknal_tx_launched (tx); - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - - } else if (tx->tx_resid == 0) { - /* everything went; assume more can go, and avoid - * write_space locking */ - conn->ksnc_tx_ready = 1; + return (0); + } - ksocknal_tx_launched (tx); - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - } else { - spin_lock_irqsave (&sched->kss_lock, *irq_flags); + if (!conn->ksnc_closing) + CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n", + conn, rc, conn->ksnc_peer->ksnp_nid, + conn->ksnc_ipaddr, conn->ksnc_port); - /* back onto HEAD of tx_queue */ - list_add (&tx->tx_list, &conn->ksnc_tx_queue); - } + ksocknal_close_conn_and_siblings (conn, rc); + ksocknal_tx_launched 
(tx);
- /* no space to write, or nothing to write? */
- if (!conn->ksnc_tx_ready ||
- list_empty (&conn->ksnc_tx_queue)) {
- /* mark not scheduled */
- conn->ksnc_tx_scheduled = 0;
- /* drop scheduler's ref */
- ksocknal_put_conn (conn);
- } else {
- /* stay scheduled */
- list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
- }
-}
+ return (rc);
+}
 void
 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
@@ -893,8 +859,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 conn->ksnc_tx_deadline = jiffies +
 ksocknal_data.ksnd_io_timeout * HZ;
- mb();
- /* Extend deadline BEFORE tx is enqueued */
+ mb(); /* order with list_add_tail */
 list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
@@ -1527,25 +1492,13 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
 return (0);
 }
-void
-ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
+int
+ksocknal_process_receive (ksock_conn_t *conn)
 {
- ksock_conn_t *conn;
 ksock_fmb_t *fmb;
 int rc;
-
- /* NB: sched->ksnc_lock lock held */
-
- LASSERT (!list_empty (&sched->kss_rx_conns));
- conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list);
- list_del (&conn->ksnc_rx_list);
-
- spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
-
- CDEBUG(D_NET, "sched %p conn %p\n", sched, conn);
+ LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
- LASSERT (conn->ksnc_rx_scheduled);
- LASSERT (conn->ksnc_rx_ready);
 /* doesn't need a forwarding buffer */
 if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
@@ -1553,13 +1506,15 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 get_fmb:
 fmb = ksocknal_get_idle_fmb (conn);
- if (fmb == NULL) { /* conn descheduled waiting for idle fmb */
- spin_lock_irqsave (&sched->kss_lock, *irq_flags);
- return;
+ if (fmb == NULL) {
+ /* conn descheduled waiting for idle fmb */
+ return (0);
 }
- if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */
- goto out; /* come back later for next packet */
+ if (ksocknal_init_fmb (conn, fmb)) {
+ /* packet forwarded */
+ return (0);
+ }
 try_read:
 /* NB: sched lock NOT held */
@@ -1570,9 +1525,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 LASSERT (conn->ksnc_rx_nob_wanted > 0);
- conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */
- mb(); /* => clear BEFORE trying to read */
-
 rc = ksocknal_recvmsg(conn);
 if (rc <= 0) {
@@ -1584,17 +1536,16 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
 conn, rc, conn->ksnc_peer->ksnp_nid,
 conn->ksnc_ipaddr, conn->ksnc_port);
+
 ksocknal_close_conn_and_siblings (conn, rc);
- goto out;
+ return (rc == 0 ? 
-ESHUTDOWN : rc); } + if (conn->ksnc_rx_nob_wanted != 0) { + /* short read */ + return (-EAGAIN); + } - if (conn->ksnc_rx_nob_wanted != 0) /* short read */ - goto out; /* try again later */ - - /* got all I wanted, assume there's more - prevent data_ready locking */ - conn->ksnc_rx_ready = 1; - switch (conn->ksnc_rx_state) { case SOCKNAL_RX_HEADER: if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) && @@ -1603,7 +1554,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) ksocknal_fwd_parse (conn); switch (conn->ksnc_rx_state) { case SOCKNAL_RX_HEADER: /* skipped (zero payload) */ - goto out; /* => come back later */ + return (0); /* => come back later */ case SOCKNAL_RX_SLOP: /* skipping packet's body */ goto try_read; /* => go read it */ case SOCKNAL_RX_GET_FMB: /* forwarding */ @@ -1631,7 +1582,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) case SOCKNAL_RX_SLOP: /* starting new packet? */ if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left)) - goto out; /* come back later */ + return (0); /* come back later */ goto try_read; /* try to finish reading slop now */ case SOCKNAL_RX_BODY_FWD: @@ -1650,7 +1601,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) LASSERT (conn->ksnc_rx_nob_left == 0); ksocknal_new_packet (conn, 0); /* on to next packet */ - goto out; /* (later) */ + return (0); /* (later) */ default: break; @@ -1658,20 +1609,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) /* Not Reached */ LBUG (); - - out: - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - - /* no data there to read? */ - if (!conn->ksnc_rx_ready) { - /* let socket callback schedule again */ - conn->ksnc_rx_scheduled = 0; - /* drop scheduler's ref */ - ksocknal_put_conn (conn); - } else { - /* stay scheduled */ - list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns); - } + return (-EINVAL); /* keep gcc happy */ } int @@ -1729,6 +1667,8 @@ ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg, int ksocknal_scheduler (void *arg) { ksock_sched_t *sched = (ksock_sched_t *)arg; + ksock_conn_t *conn; + ksock_tx_t *tx; unsigned long flags; int rc; int nloops = 0; @@ -1761,15 +1701,96 @@ int ksocknal_scheduler (void *arg) /* Ensure I progress everything semi-fairly */ if (!list_empty (&sched->kss_rx_conns)) { + conn = list_entry(sched->kss_rx_conns.next, + ksock_conn_t, ksnc_rx_list); + list_del(&conn->ksnc_rx_list); + + LASSERT(conn->ksnc_rx_scheduled); + LASSERT(conn->ksnc_rx_ready); + + /* clear rx_ready in case receive isn't complete. + * Do it BEFORE we call process_recv, since + * data_ready can set it any time after we release + * kss_lock. */ + conn->ksnc_rx_ready = 0; + spin_unlock_irqrestore(&sched->kss_lock, flags); + + rc = ksocknal_process_receive(conn); + + spin_lock_irqsave(&sched->kss_lock, flags); + + /* I'm the only one that can clear this flag */ + LASSERT(conn->ksnc_rx_scheduled); + + /* Did process_receive get everything it wanted? */ + if (rc == 0) + conn->ksnc_rx_ready = 1; + + if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP || + conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) { + /* Conn blocked for a forwarding buffer. + * It will get queued for my attention when + * one becomes available (and it might just + * already have been!). Meanwhile my ref + * on it stays put. 
*/ + } else if (conn->ksnc_rx_ready) { + /* reschedule for rx */ + list_add_tail (&conn->ksnc_rx_list, + &sched->kss_rx_conns); + } else { + conn->ksnc_rx_scheduled = 0; + /* drop my ref */ + ksocknal_put_conn(conn); + } + did_something = 1; - /* drops & regains kss_lock */ - ksocknal_process_receive (sched, &flags); } if (!list_empty (&sched->kss_tx_conns)) { + conn = list_entry(sched->kss_tx_conns.next, + ksock_conn_t, ksnc_tx_list); + list_del (&conn->ksnc_tx_list); + + LASSERT(conn->ksnc_tx_scheduled); + LASSERT(conn->ksnc_tx_ready); + LASSERT(!list_empty(&conn->ksnc_tx_queue)); + + tx = list_entry(conn->ksnc_tx_queue.next, + ksock_tx_t, tx_list); + /* dequeue now so empty list => more to send */ + list_del(&tx->tx_list); + + /* Clear tx_ready in case send isn't complete. Do + * it BEFORE we call process_transmit, since + * write_space can set it any time after we release + * kss_lock. */ + conn->ksnc_tx_ready = 0; + spin_unlock_irqrestore (&sched->kss_lock, flags); + + rc = ksocknal_process_transmit(conn, tx); + + spin_lock_irqsave (&sched->kss_lock, flags); + + if (rc != -EAGAIN) { + /* error or everything went: assume more can go */ + conn->ksnc_tx_ready = 1; + } else { + /* back onto HEAD of tx_queue */ + list_add (&tx->tx_list, &conn->ksnc_tx_queue); + } + + if (conn->ksnc_tx_ready && + !list_empty (&conn->ksnc_tx_queue)) { + /* reschedule for tx */ + list_add_tail (&conn->ksnc_tx_list, + &sched->kss_tx_conns); + } else { + conn->ksnc_tx_scheduled = 0; + /* drop my ref */ + ksocknal_put_conn (conn); + } + did_something = 1; - /* drops and regains kss_lock */ - ksocknal_process_transmit (sched, &flags); } #if SOCKNAL_ZC if (!list_empty (&sched->kss_zctxdone_list)) { @@ -1833,18 +1854,11 @@ ksocknal_data_ready (struct sock *sk, int n) if (conn == NULL) { /* raced with ksocknal_close_sock */ LASSERT (sk->sk_data_ready != &ksocknal_data_ready); sk->sk_data_ready (sk, n); - goto out; - } - - if (!conn->ksnc_rx_ready) { /* new news */ - /* Set ASAP in case of concurrent calls to me */ - conn->ksnc_rx_ready = 1; - + } else { sched = conn->ksnc_scheduler; spin_lock_irqsave (&sched->kss_lock, flags); - /* Set again (process_receive may have cleared while I blocked for the lock) */ conn->ksnc_rx_ready = 1; if (!conn->ksnc_rx_scheduled) { /* not being progressed */ @@ -1860,7 +1874,6 @@ ksocknal_data_ready (struct sock *sk, int n) spin_unlock_irqrestore (&sched->kss_lock, flags); } - out: read_unlock (&ksocknal_data.ksnd_global_lock); EXIT; @@ -1898,31 +1911,24 @@ ksocknal_write_space (struct sock *sk) if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */ clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags); - if (!conn->ksnc_tx_ready) { /* new news */ - /* Set ASAP in case of concurrent calls to me */ - conn->ksnc_tx_ready = 1; - - sched = conn->ksnc_scheduler; - - spin_lock_irqsave (&sched->kss_lock, flags); + sched = conn->ksnc_scheduler; - /* Set again (process_transmit may have - cleared while I blocked for the lock) */ - conn->ksnc_tx_ready = 1; + spin_lock_irqsave (&sched->kss_lock, flags); - if (!conn->ksnc_tx_scheduled && // not being progressed - !list_empty(&conn->ksnc_tx_queue)){//packets to send - list_add_tail (&conn->ksnc_tx_list, - &sched->kss_tx_conns); - conn->ksnc_tx_scheduled = 1; - /* extra ref for scheduler */ - atomic_inc (&conn->ksnc_refcount); + conn->ksnc_tx_ready = 1; - wake_up (&sched->kss_waitq); - } + if (!conn->ksnc_tx_scheduled && // not being progressed + !list_empty(&conn->ksnc_tx_queue)){//packets to send + list_add_tail (&conn->ksnc_tx_list, 
+ &sched->kss_tx_conns); + conn->ksnc_tx_scheduled = 1; + /* extra ref for scheduler */ + atomic_inc (&conn->ksnc_refcount); - spin_unlock_irqrestore (&sched->kss_lock, flags); + wake_up (&sched->kss_waitq); } + + spin_unlock_irqrestore (&sched->kss_lock, flags); } read_unlock (&ksocknal_data.ksnd_global_lock); -- 1.8.3.1
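
Note (illustration only, not part of the patch): both halves of this change converge on the same lossless-wakeup handshake between the socket callbacks (ksocknal_data_ready / ksocknal_write_space) and ksocknal_scheduler(). The callback always sets ksnc_rx_ready / ksnc_tx_ready under kss_lock and takes an extra ref when it schedules the conn; the scheduler clears the ready flag under that same lock BEFORE processing, then re-tests it afterwards to decide between rescheduling and dropping its ref. The self-contained userspace sketch below shows that handshake in miniature with pthreads; every name in it (struct conn, data_ready, process_receive, sched_loop, producer) is invented for the sketch and does not exist in the socknal source.

/* Userspace analogue of the callback/scheduler handshake above.
 * Build: cc -pthread sketch.c */
#include <pthread.h>
#include <stdio.h>

struct conn {
        pthread_mutex_t lock;         /* stands in for sched->kss_lock */
        pthread_cond_t  waitq;        /* stands in for sched->kss_waitq */
        int             rx_ready;     /* data arrived since last look */
        int             rx_scheduled; /* scheduler currently owns the conn */
        int             refcount;     /* stands in for ksnc_refcount */
        int             packets;      /* pending fake packets */
        int             done;         /* producer finished */
};

/* socket callback analogue: may fire at any moment */
static void data_ready(struct conn *c)
{
        pthread_mutex_lock(&c->lock);
        c->packets++;
        c->rx_ready = 1;                  /* always record the event */
        if (!c->rx_scheduled) {           /* not being progressed */
                c->rx_scheduled = 1;
                c->refcount++;            /* extra ref for the scheduler */
                pthread_cond_signal(&c->waitq);
        }
        pthread_mutex_unlock(&c->lock);
}

/* "receive" one packet; returns 0 == got everything it wanted */
static int process_receive(struct conn *c)
{
        pthread_mutex_lock(&c->lock);
        if (c->packets > 0)
                c->packets--;
        pthread_mutex_unlock(&c->lock);
        return 0;
}

static void *sched_loop(void *arg)
{
        struct conn *c = arg;
        int rc;

        pthread_mutex_lock(&c->lock);
        for (;;) {
                while (!c->rx_scheduled) {
                        if (c->done && c->packets == 0) {
                                pthread_mutex_unlock(&c->lock);
                                return NULL;
                        }
                        pthread_cond_wait(&c->waitq, &c->lock);
                }
                /* clear rx_ready BEFORE processing: data_ready can set it
                 * again the instant the lock drops, and that must not be
                 * lost */
                c->rx_ready = 0;
                pthread_mutex_unlock(&c->lock);

                rc = process_receive(c);

                pthread_mutex_lock(&c->lock);
                if (rc == 0)
                        c->rx_ready = 1;     /* assume there may be more */
                if (!(c->rx_ready && c->packets > 0)) {
                        c->rx_scheduled = 0; /* callback may schedule again */
                        c->refcount--;       /* drop the scheduler's ref */
                }
        }
}

static void *producer(void *arg)
{
        struct conn *c = arg;
        int i;

        for (i = 0; i < 100000; i++)
                data_ready(c);
        pthread_mutex_lock(&c->lock);
        c->done = 1;
        pthread_cond_signal(&c->waitq);
        pthread_mutex_unlock(&c->lock);
        return NULL;
}

int main(void)
{
        struct conn c = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
                          0, 0, 1 /* main's own ref */, 0, 0 };
        pthread_t s, p;

        pthread_create(&s, NULL, sched_loop, &c);
        pthread_create(&p, NULL, producer, &c);
        pthread_join(p, NULL);
        pthread_join(s, NULL);
        printf("packets left %d, refcount %d\n", c.packets, c.refcount);
        return 0;
}

Because the ready flag is cleared and re-tested only while the lock is held, a data_ready that fires at any point during processing either finds the conn still scheduled (and merely re-raises the flag, which the scheduler reads before descheduling) or schedules the conn itself and takes its own ref, so no notification is lost. This is the property the scheduler now provides centrally, in place of the old clear-plus-mb() inside ksocknal_process_receive()/ksocknal_process_transmit(), which is where the forwarding race named in the subject line lived.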