From 75d9259d2bfb63caf12285a7001cb7bdb9639d24 Mon Sep 17 00:00:00 2001 From: phil Date: Fri, 28 Nov 2003 08:12:52 +0000 Subject: [PATCH] merge b_devel into b_eq --- lnet/klnds/qswlnd/qswlnd.c | 1 + lnet/klnds/socklnd/socklnd.c | 50 ++++-- lnet/klnds/socklnd/socklnd_cb.c | 280 +++++++++++++++--------------- lnet/utils/Makefile.am | 6 +- lustre/portals/knals/qswnal/qswnal.c | 1 + lustre/portals/knals/socknal/socknal.c | 50 ++++-- lustre/portals/knals/socknal/socknal_cb.c | 280 +++++++++++++++--------------- lustre/portals/utils/Makefile.am | 6 +- lustre/ptlrpc/llog_server.c | 44 +++++ 9 files changed, 404 insertions(+), 314 deletions(-) diff --git a/lnet/klnds/qswlnd/qswlnd.c b/lnet/klnds/qswlnd/qswlnd.c index 9caf381..70b45c0 100644 --- a/lnet/klnds/qswlnd/qswlnd.c +++ b/lnet/klnds/qswlnd/qswlnd.c @@ -179,6 +179,7 @@ kqswnal_finalise (void) case KQN_INIT_ALL: PORTAL_SYMBOL_UNREGISTER (kqswnal_ni); + kportal_nal_unregister(QSWNAL); /* fall through */ case KQN_INIT_PTL: diff --git a/lnet/klnds/socklnd/socklnd.c b/lnet/klnds/socklnd/socklnd.c index da47785..6f6fa7e 100644 --- a/lnet/klnds/socklnd/socklnd.c +++ b/lnet/klnds/socklnd/socklnd.c @@ -914,15 +914,34 @@ ksocknal_terminate_conn (ksock_conn_t *conn) * destroy it. */ unsigned long flags; ksock_peer_t *peer = conn->ksnc_peer; + ksock_sched_t *sched = conn->ksnc_scheduler; struct timeval now; time_t then = 0; int notify = 0; + LASSERT(conn->ksnc_closing); + + /* wake up the scheduler to "send" all remaining packets to /dev/null */ + spin_lock_irqsave(&sched->kss_lock, flags); + + if (!conn->ksnc_tx_scheduled && + !list_empty(&conn->ksnc_tx_queue)){ + list_add_tail (&conn->ksnc_tx_list, + &sched->kss_tx_conns); + /* a closing conn is always ready to tx */ + conn->ksnc_tx_ready = 1; + conn->ksnc_tx_scheduled = 1; + /* extra ref for scheduler */ + atomic_inc (&conn->ksnc_refcount); + + wake_up (&sched->kss_waitq); + } + + spin_unlock_irqrestore (&sched->kss_lock, flags); + /* serialise with callbacks */ write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags); - LASSERT (conn->ksnc_closing); - /* Remove conn's network callbacks. * NB I _have_ to restore the callback, rather than storing a noop, * since the socket could survive past this module being unloaded!! */ @@ -934,6 +953,8 @@ ksocknal_terminate_conn (ksock_conn_t *conn) * sk_user_data is NULL. */ conn->ksnc_sock->sk->sk_user_data = NULL; + /* OK, so this conn may not be completely disengaged from its + * scheduler yet, but it _has_ committed to terminate... 
*/ conn->ksnc_scheduler->kss_nconns--; if (peer->ksnp_error != 0) { @@ -970,27 +991,20 @@ ksocknal_destroy_conn (ksock_conn_t *conn) LASSERT (conn->ksnc_route == NULL); LASSERT (!conn->ksnc_tx_scheduled); LASSERT (!conn->ksnc_rx_scheduled); - - /* complete queued packets */ - while (!list_empty (&conn->ksnc_tx_queue)) { - ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next, - ksock_tx_t, tx_list); - - CERROR ("Deleting packet %p type %d len %d ("LPX64"->"LPX64")\n", - tx, - NTOH__u32 (tx->tx_hdr->type), - NTOH__u32 (tx->tx_hdr->payload_length), - NTOH__u64 (tx->tx_hdr->src_nid), - NTOH__u64 (tx->tx_hdr->dest_nid)); - - list_del (&tx->tx_list); - ksocknal_tx_done (tx, 0); - } + LASSERT (list_empty(&conn->ksnc_tx_queue)); /* complete current receive if any */ switch (conn->ksnc_rx_state) { case SOCKNAL_RX_BODY: +#if 0 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie); +#else + CERROR ("Refusing to complete a partial receive from " + LPX64", ip %08x\n", conn->ksnc_peer->ksnp_nid, + conn->ksnc_ipaddr); + CERROR ("This may hang communications and " + "prevent modules from unloading\n"); +#endif break; case SOCKNAL_RX_BODY_FWD: ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED); diff --git a/lnet/klnds/socklnd/socklnd_cb.c b/lnet/klnds/socklnd/socklnd_cb.c index b6fdc18..6b1a7d9 100644 --- a/lnet/klnds/socklnd/socklnd_cb.c +++ b/lnet/klnds/socklnd/socklnd_cb.c @@ -691,71 +691,37 @@ ksocknal_tx_launched (ksock_tx_t *tx) ksocknal_tx_done (tx, 0); } -void -ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags) +int +ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) { - ksock_conn_t *conn; - ksock_tx_t *tx; int rc; - - LASSERT (!list_empty (&sched->kss_tx_conns)); - conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list); - list_del (&conn->ksnc_tx_list); - - LASSERT (conn->ksnc_tx_scheduled); - LASSERT (conn->ksnc_tx_ready); - LASSERT (!list_empty (&conn->ksnc_tx_queue)); - tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list); - /* assume transmit will complete now, so dequeue while I've got lock */ - list_del (&tx->tx_list); - - spin_unlock_irqrestore (&sched->kss_lock, *irq_flags); - - LASSERT (tx->tx_resid > 0); - - conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */ - mb(); /* => clear BEFORE trying to write */ - + rc = ksocknal_sendmsg (conn, tx); CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc); + LASSERT (rc != -EAGAIN); - if (rc != 0) { - if (!conn->ksnc_closing) - CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n", - conn, rc, conn->ksnc_peer->ksnp_nid, - conn->ksnc_ipaddr, conn->ksnc_port); - ksocknal_close_conn_and_siblings (conn, rc); - + if (rc == 0) { + /* no errors */ + if (tx->tx_resid != 0) { + /* didn't send everything */ + return (-EAGAIN); + } + ksocknal_tx_launched (tx); - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - - } else if (tx->tx_resid == 0) { - /* everything went; assume more can go, and avoid - * write_space locking */ - conn->ksnc_tx_ready = 1; + return (0); + } - ksocknal_tx_launched (tx); - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - } else { - spin_lock_irqsave (&sched->kss_lock, *irq_flags); + if (!conn->ksnc_closing) + CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n", + conn, rc, conn->ksnc_peer->ksnp_nid, + conn->ksnc_ipaddr, conn->ksnc_port); - /* back onto HEAD of tx_queue */ - list_add (&tx->tx_list, &conn->ksnc_tx_queue); - } + ksocknal_close_conn_and_siblings (conn, rc); + ksocknal_tx_launched (tx); - /* no space to write, or nothing to 
write? */ - if (!conn->ksnc_tx_ready || - list_empty (&conn->ksnc_tx_queue)) { - /* mark not scheduled */ - conn->ksnc_tx_scheduled = 0; - /* drop scheduler's ref */ - ksocknal_put_conn (conn); - } else { - /* stay scheduled */ - list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns); - } -} + return (-EAGAIN); +} void ksocknal_launch_autoconnect_locked (ksock_route_t *route) @@ -893,8 +859,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn) conn->ksnc_tx_deadline = jiffies + ksocknal_data.ksnd_io_timeout * HZ; - mb(); - /* Extend deadline BEFORE tx is enqueued */ + mb(); /* order with list_add_tail */ list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); @@ -1527,25 +1492,13 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip) return (0); } -void -ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) +int +ksocknal_process_receive (ksock_conn_t *conn) { - ksock_conn_t *conn; ksock_fmb_t *fmb; int rc; - - /* NB: sched->ksnc_lock lock held */ - - LASSERT (!list_empty (&sched->kss_rx_conns)); - conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list); - list_del (&conn->ksnc_rx_list); - - spin_unlock_irqrestore (&sched->kss_lock, *irq_flags); - - CDEBUG(D_NET, "sched %p conn %p\n", sched, conn); + LASSERT (atomic_read (&conn->ksnc_refcount) > 0); - LASSERT (conn->ksnc_rx_scheduled); - LASSERT (conn->ksnc_rx_ready); /* doesn't need a forwarding buffer */ if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB) @@ -1553,13 +1506,15 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) get_fmb: fmb = ksocknal_get_idle_fmb (conn); - if (fmb == NULL) { /* conn descheduled waiting for idle fmb */ - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - return; + if (fmb == NULL) { + /* conn descheduled waiting for idle fmb */ + return (0); } - if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */ - goto out; /* come back later for next packet */ + if (ksocknal_init_fmb (conn, fmb)) { + /* packet forwarded */ + return (0); + } try_read: /* NB: sched lock NOT held */ @@ -1570,9 +1525,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) LASSERT (conn->ksnc_rx_nob_wanted > 0); - conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */ - mb(); /* => clear BEFORE trying to read */ - rc = ksocknal_recvmsg(conn); if (rc <= 0) { @@ -1584,17 +1536,16 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n", conn, rc, conn->ksnc_peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port); + ksocknal_close_conn_and_siblings (conn, rc); - goto out; + return (rc == 0 ? 
-ESHUTDOWN : rc); } + if (conn->ksnc_rx_nob_wanted != 0) { + /* short read */ + return (-EAGAIN); + } - if (conn->ksnc_rx_nob_wanted != 0) /* short read */ - goto out; /* try again later */ - - /* got all I wanted, assume there's more - prevent data_ready locking */ - conn->ksnc_rx_ready = 1; - switch (conn->ksnc_rx_state) { case SOCKNAL_RX_HEADER: if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) && @@ -1603,7 +1554,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) ksocknal_fwd_parse (conn); switch (conn->ksnc_rx_state) { case SOCKNAL_RX_HEADER: /* skipped (zero payload) */ - goto out; /* => come back later */ + return (0); /* => come back later */ case SOCKNAL_RX_SLOP: /* skipping packet's body */ goto try_read; /* => go read it */ case SOCKNAL_RX_GET_FMB: /* forwarding */ @@ -1631,7 +1582,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) case SOCKNAL_RX_SLOP: /* starting new packet? */ if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left)) - goto out; /* come back later */ + return (0); /* come back later */ goto try_read; /* try to finish reading slop now */ case SOCKNAL_RX_BODY_FWD: @@ -1650,7 +1601,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) LASSERT (conn->ksnc_rx_nob_left == 0); ksocknal_new_packet (conn, 0); /* on to next packet */ - goto out; /* (later) */ + return (0); /* (later) */ default: break; @@ -1658,20 +1609,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) /* Not Reached */ LBUG (); - - out: - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - - /* no data there to read? */ - if (!conn->ksnc_rx_ready) { - /* let socket callback schedule again */ - conn->ksnc_rx_scheduled = 0; - /* drop scheduler's ref */ - ksocknal_put_conn (conn); - } else { - /* stay scheduled */ - list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns); - } + return (-EINVAL); /* keep gcc happy */ } int @@ -1729,6 +1667,8 @@ ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg, int ksocknal_scheduler (void *arg) { ksock_sched_t *sched = (ksock_sched_t *)arg; + ksock_conn_t *conn; + ksock_tx_t *tx; unsigned long flags; int rc; int nloops = 0; @@ -1761,15 +1701,96 @@ int ksocknal_scheduler (void *arg) /* Ensure I progress everything semi-fairly */ if (!list_empty (&sched->kss_rx_conns)) { + conn = list_entry(sched->kss_rx_conns.next, + ksock_conn_t, ksnc_rx_list); + list_del(&conn->ksnc_rx_list); + + LASSERT(conn->ksnc_rx_scheduled); + LASSERT(conn->ksnc_rx_ready); + + /* clear rx_ready in case receive isn't complete. + * Do it BEFORE we call process_recv, since + * data_ready can set it any time after we release + * kss_lock. */ + conn->ksnc_rx_ready = 0; + spin_unlock_irqrestore(&sched->kss_lock, flags); + + rc = ksocknal_process_receive(conn); + + spin_lock_irqsave(&sched->kss_lock, flags); + + /* I'm the only one that can clear this flag */ + LASSERT(conn->ksnc_rx_scheduled); + + /* Did process_receive get everything it wanted? */ + if (rc == 0) + conn->ksnc_rx_ready = 1; + + if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP || + conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) { + /* Conn blocked for a forwarding buffer. + * It will get queued for my attention when + * one becomes available (and it might just + * already have been!). Meanwhile my ref + * on it stays put. 
*/ + } else if (conn->ksnc_rx_ready) { + /* reschedule for rx */ + list_add_tail (&conn->ksnc_rx_list, + &sched->kss_rx_conns); + } else { + conn->ksnc_rx_scheduled = 0; + /* drop my ref */ + ksocknal_put_conn(conn); + } + did_something = 1; - /* drops & regains kss_lock */ - ksocknal_process_receive (sched, &flags); } if (!list_empty (&sched->kss_tx_conns)) { + conn = list_entry(sched->kss_tx_conns.next, + ksock_conn_t, ksnc_tx_list); + list_del (&conn->ksnc_tx_list); + + LASSERT(conn->ksnc_tx_scheduled); + LASSERT(conn->ksnc_tx_ready); + LASSERT(!list_empty(&conn->ksnc_tx_queue)); + + tx = list_entry(conn->ksnc_tx_queue.next, + ksock_tx_t, tx_list); + /* dequeue now so empty list => more to send */ + list_del(&tx->tx_list); + + /* Clear tx_ready in case send isn't complete. Do + * it BEFORE we call process_transmit, since + * write_space can set it any time after we release + * kss_lock. */ + conn->ksnc_tx_ready = 0; + spin_unlock_irqrestore (&sched->kss_lock, flags); + + rc = ksocknal_process_transmit(conn, tx); + + spin_lock_irqsave (&sched->kss_lock, flags); + + if (rc != -EAGAIN) { + /* error or everything went: assume more can go */ + conn->ksnc_tx_ready = 1; + } else { + /* back onto HEAD of tx_queue */ + list_add (&tx->tx_list, &conn->ksnc_tx_queue); + } + + if (conn->ksnc_tx_ready && + !list_empty (&conn->ksnc_tx_queue)) { + /* reschedule for tx */ + list_add_tail (&conn->ksnc_tx_list, + &sched->kss_tx_conns); + } else { + conn->ksnc_tx_scheduled = 0; + /* drop my ref */ + ksocknal_put_conn (conn); + } + did_something = 1; - /* drops and regains kss_lock */ - ksocknal_process_transmit (sched, &flags); } #if SOCKNAL_ZC if (!list_empty (&sched->kss_zctxdone_list)) { @@ -1833,18 +1854,11 @@ ksocknal_data_ready (struct sock *sk, int n) if (conn == NULL) { /* raced with ksocknal_close_sock */ LASSERT (sk->sk_data_ready != &ksocknal_data_ready); sk->sk_data_ready (sk, n); - goto out; - } - - if (!conn->ksnc_rx_ready) { /* new news */ - /* Set ASAP in case of concurrent calls to me */ - conn->ksnc_rx_ready = 1; - + } else { sched = conn->ksnc_scheduler; spin_lock_irqsave (&sched->kss_lock, flags); - /* Set again (process_receive may have cleared while I blocked for the lock) */ conn->ksnc_rx_ready = 1; if (!conn->ksnc_rx_scheduled) { /* not being progressed */ @@ -1860,7 +1874,6 @@ ksocknal_data_ready (struct sock *sk, int n) spin_unlock_irqrestore (&sched->kss_lock, flags); } - out: read_unlock (&ksocknal_data.ksnd_global_lock); EXIT; @@ -1898,31 +1911,24 @@ ksocknal_write_space (struct sock *sk) if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */ clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags); - if (!conn->ksnc_tx_ready) { /* new news */ - /* Set ASAP in case of concurrent calls to me */ - conn->ksnc_tx_ready = 1; - - sched = conn->ksnc_scheduler; - - spin_lock_irqsave (&sched->kss_lock, flags); + sched = conn->ksnc_scheduler; - /* Set again (process_transmit may have - cleared while I blocked for the lock) */ - conn->ksnc_tx_ready = 1; + spin_lock_irqsave (&sched->kss_lock, flags); - if (!conn->ksnc_tx_scheduled && // not being progressed - !list_empty(&conn->ksnc_tx_queue)){//packets to send - list_add_tail (&conn->ksnc_tx_list, - &sched->kss_tx_conns); - conn->ksnc_tx_scheduled = 1; - /* extra ref for scheduler */ - atomic_inc (&conn->ksnc_refcount); + conn->ksnc_tx_ready = 1; - wake_up (&sched->kss_waitq); - } + if (!conn->ksnc_tx_scheduled && // not being progressed + !list_empty(&conn->ksnc_tx_queue)){//packets to send + list_add_tail (&conn->ksnc_tx_list, 
+ &sched->kss_tx_conns); + conn->ksnc_tx_scheduled = 1; + /* extra ref for scheduler */ + atomic_inc (&conn->ksnc_refcount); - spin_unlock_irqrestore (&sched->kss_lock, flags); + wake_up (&sched->kss_waitq); } + + spin_unlock_irqrestore (&sched->kss_lock, flags); } read_unlock (&ksocknal_data.ksnd_global_lock); diff --git a/lnet/utils/Makefile.am b/lnet/utils/Makefile.am index 0c65269..f1878df 100644 --- a/lnet/utils/Makefile.am +++ b/lnet/utils/Makefile.am @@ -8,10 +8,12 @@ COMPILE = $(CC) -Wall -g -I$(srcdir)/../include LINK = $(CC) -o $@ if LIBLUSTRE -sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck +tmp= else -sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck gmnalnid +tmp=gmnalnid endif + +sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck $(tmp) lib_LIBRARIES = libptlctl.a acceptor_SOURCES = acceptor.c # -lefence diff --git a/lustre/portals/knals/qswnal/qswnal.c b/lustre/portals/knals/qswnal/qswnal.c index 9caf381..70b45c0 100644 --- a/lustre/portals/knals/qswnal/qswnal.c +++ b/lustre/portals/knals/qswnal/qswnal.c @@ -179,6 +179,7 @@ kqswnal_finalise (void) case KQN_INIT_ALL: PORTAL_SYMBOL_UNREGISTER (kqswnal_ni); + kportal_nal_unregister(QSWNAL); /* fall through */ case KQN_INIT_PTL: diff --git a/lustre/portals/knals/socknal/socknal.c b/lustre/portals/knals/socknal/socknal.c index da47785..6f6fa7e 100644 --- a/lustre/portals/knals/socknal/socknal.c +++ b/lustre/portals/knals/socknal/socknal.c @@ -914,15 +914,34 @@ ksocknal_terminate_conn (ksock_conn_t *conn) * destroy it. */ unsigned long flags; ksock_peer_t *peer = conn->ksnc_peer; + ksock_sched_t *sched = conn->ksnc_scheduler; struct timeval now; time_t then = 0; int notify = 0; + LASSERT(conn->ksnc_closing); + + /* wake up the scheduler to "send" all remaining packets to /dev/null */ + spin_lock_irqsave(&sched->kss_lock, flags); + + if (!conn->ksnc_tx_scheduled && + !list_empty(&conn->ksnc_tx_queue)){ + list_add_tail (&conn->ksnc_tx_list, + &sched->kss_tx_conns); + /* a closing conn is always ready to tx */ + conn->ksnc_tx_ready = 1; + conn->ksnc_tx_scheduled = 1; + /* extra ref for scheduler */ + atomic_inc (&conn->ksnc_refcount); + + wake_up (&sched->kss_waitq); + } + + spin_unlock_irqrestore (&sched->kss_lock, flags); + /* serialise with callbacks */ write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags); - LASSERT (conn->ksnc_closing); - /* Remove conn's network callbacks. * NB I _have_ to restore the callback, rather than storing a noop, * since the socket could survive past this module being unloaded!! */ @@ -934,6 +953,8 @@ ksocknal_terminate_conn (ksock_conn_t *conn) * sk_user_data is NULL. */ conn->ksnc_sock->sk->sk_user_data = NULL; + /* OK, so this conn may not be completely disengaged from its + * scheduler yet, but it _has_ committed to terminate... 
*/ conn->ksnc_scheduler->kss_nconns--; if (peer->ksnp_error != 0) { @@ -970,27 +991,20 @@ ksocknal_destroy_conn (ksock_conn_t *conn) LASSERT (conn->ksnc_route == NULL); LASSERT (!conn->ksnc_tx_scheduled); LASSERT (!conn->ksnc_rx_scheduled); - - /* complete queued packets */ - while (!list_empty (&conn->ksnc_tx_queue)) { - ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next, - ksock_tx_t, tx_list); - - CERROR ("Deleting packet %p type %d len %d ("LPX64"->"LPX64")\n", - tx, - NTOH__u32 (tx->tx_hdr->type), - NTOH__u32 (tx->tx_hdr->payload_length), - NTOH__u64 (tx->tx_hdr->src_nid), - NTOH__u64 (tx->tx_hdr->dest_nid)); - - list_del (&tx->tx_list); - ksocknal_tx_done (tx, 0); - } + LASSERT (list_empty(&conn->ksnc_tx_queue)); /* complete current receive if any */ switch (conn->ksnc_rx_state) { case SOCKNAL_RX_BODY: +#if 0 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie); +#else + CERROR ("Refusing to complete a partial receive from " + LPX64", ip %08x\n", conn->ksnc_peer->ksnp_nid, + conn->ksnc_ipaddr); + CERROR ("This may hang communications and " + "prevent modules from unloading\n"); +#endif break; case SOCKNAL_RX_BODY_FWD: ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED); diff --git a/lustre/portals/knals/socknal/socknal_cb.c b/lustre/portals/knals/socknal/socknal_cb.c index b6fdc18..6b1a7d9 100644 --- a/lustre/portals/knals/socknal/socknal_cb.c +++ b/lustre/portals/knals/socknal/socknal_cb.c @@ -691,71 +691,37 @@ ksocknal_tx_launched (ksock_tx_t *tx) ksocknal_tx_done (tx, 0); } -void -ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags) +int +ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) { - ksock_conn_t *conn; - ksock_tx_t *tx; int rc; - - LASSERT (!list_empty (&sched->kss_tx_conns)); - conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list); - list_del (&conn->ksnc_tx_list); - - LASSERT (conn->ksnc_tx_scheduled); - LASSERT (conn->ksnc_tx_ready); - LASSERT (!list_empty (&conn->ksnc_tx_queue)); - tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list); - /* assume transmit will complete now, so dequeue while I've got lock */ - list_del (&tx->tx_list); - - spin_unlock_irqrestore (&sched->kss_lock, *irq_flags); - - LASSERT (tx->tx_resid > 0); - - conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */ - mb(); /* => clear BEFORE trying to write */ - + rc = ksocknal_sendmsg (conn, tx); CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc); + LASSERT (rc != -EAGAIN); - if (rc != 0) { - if (!conn->ksnc_closing) - CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n", - conn, rc, conn->ksnc_peer->ksnp_nid, - conn->ksnc_ipaddr, conn->ksnc_port); - ksocknal_close_conn_and_siblings (conn, rc); - + if (rc == 0) { + /* no errors */ + if (tx->tx_resid != 0) { + /* didn't send everything */ + return (-EAGAIN); + } + ksocknal_tx_launched (tx); - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - - } else if (tx->tx_resid == 0) { - /* everything went; assume more can go, and avoid - * write_space locking */ - conn->ksnc_tx_ready = 1; + return (0); + } - ksocknal_tx_launched (tx); - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - } else { - spin_lock_irqsave (&sched->kss_lock, *irq_flags); + if (!conn->ksnc_closing) + CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n", + conn, rc, conn->ksnc_peer->ksnp_nid, + conn->ksnc_ipaddr, conn->ksnc_port); - /* back onto HEAD of tx_queue */ - list_add (&tx->tx_list, &conn->ksnc_tx_queue); - } + ksocknal_close_conn_and_siblings (conn, rc); + ksocknal_tx_launched 
(tx); - /* no space to write, or nothing to write? */ - if (!conn->ksnc_tx_ready || - list_empty (&conn->ksnc_tx_queue)) { - /* mark not scheduled */ - conn->ksnc_tx_scheduled = 0; - /* drop scheduler's ref */ - ksocknal_put_conn (conn); - } else { - /* stay scheduled */ - list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns); - } -} + return (-EAGAIN); +} void ksocknal_launch_autoconnect_locked (ksock_route_t *route) @@ -893,8 +859,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn) conn->ksnc_tx_deadline = jiffies + ksocknal_data.ksnd_io_timeout * HZ; - mb(); - /* Extend deadline BEFORE tx is enqueued */ + mb(); /* order with list_add_tail */ list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); @@ -1527,25 +1492,13 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip) return (0); } -void -ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) +int +ksocknal_process_receive (ksock_conn_t *conn) { - ksock_conn_t *conn; ksock_fmb_t *fmb; int rc; - - /* NB: sched->ksnc_lock lock held */ - - LASSERT (!list_empty (&sched->kss_rx_conns)); - conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list); - list_del (&conn->ksnc_rx_list); - - spin_unlock_irqrestore (&sched->kss_lock, *irq_flags); - - CDEBUG(D_NET, "sched %p conn %p\n", sched, conn); + LASSERT (atomic_read (&conn->ksnc_refcount) > 0); - LASSERT (conn->ksnc_rx_scheduled); - LASSERT (conn->ksnc_rx_ready); /* doesn't need a forwarding buffer */ if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB) @@ -1553,13 +1506,15 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) get_fmb: fmb = ksocknal_get_idle_fmb (conn); - if (fmb == NULL) { /* conn descheduled waiting for idle fmb */ - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - return; + if (fmb == NULL) { + /* conn descheduled waiting for idle fmb */ + return (0); } - if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */ - goto out; /* come back later for next packet */ + if (ksocknal_init_fmb (conn, fmb)) { + /* packet forwarded */ + return (0); + } try_read: /* NB: sched lock NOT held */ @@ -1570,9 +1525,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) LASSERT (conn->ksnc_rx_nob_wanted > 0); - conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */ - mb(); /* => clear BEFORE trying to read */ - rc = ksocknal_recvmsg(conn); if (rc <= 0) { @@ -1584,17 +1536,16 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n", conn, rc, conn->ksnc_peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port); + ksocknal_close_conn_and_siblings (conn, rc); - goto out; + return (rc == 0 ? 
-ESHUTDOWN : rc); } + if (conn->ksnc_rx_nob_wanted != 0) { + /* short read */ + return (-EAGAIN); + } - if (conn->ksnc_rx_nob_wanted != 0) /* short read */ - goto out; /* try again later */ - - /* got all I wanted, assume there's more - prevent data_ready locking */ - conn->ksnc_rx_ready = 1; - switch (conn->ksnc_rx_state) { case SOCKNAL_RX_HEADER: if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) && @@ -1603,7 +1554,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) ksocknal_fwd_parse (conn); switch (conn->ksnc_rx_state) { case SOCKNAL_RX_HEADER: /* skipped (zero payload) */ - goto out; /* => come back later */ + return (0); /* => come back later */ case SOCKNAL_RX_SLOP: /* skipping packet's body */ goto try_read; /* => go read it */ case SOCKNAL_RX_GET_FMB: /* forwarding */ @@ -1631,7 +1582,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) case SOCKNAL_RX_SLOP: /* starting new packet? */ if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left)) - goto out; /* come back later */ + return (0); /* come back later */ goto try_read; /* try to finish reading slop now */ case SOCKNAL_RX_BODY_FWD: @@ -1650,7 +1601,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) LASSERT (conn->ksnc_rx_nob_left == 0); ksocknal_new_packet (conn, 0); /* on to next packet */ - goto out; /* (later) */ + return (0); /* (later) */ default: break; @@ -1658,20 +1609,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags) /* Not Reached */ LBUG (); - - out: - spin_lock_irqsave (&sched->kss_lock, *irq_flags); - - /* no data there to read? */ - if (!conn->ksnc_rx_ready) { - /* let socket callback schedule again */ - conn->ksnc_rx_scheduled = 0; - /* drop scheduler's ref */ - ksocknal_put_conn (conn); - } else { - /* stay scheduled */ - list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns); - } + return (-EINVAL); /* keep gcc happy */ } int @@ -1729,6 +1667,8 @@ ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg, int ksocknal_scheduler (void *arg) { ksock_sched_t *sched = (ksock_sched_t *)arg; + ksock_conn_t *conn; + ksock_tx_t *tx; unsigned long flags; int rc; int nloops = 0; @@ -1761,15 +1701,96 @@ int ksocknal_scheduler (void *arg) /* Ensure I progress everything semi-fairly */ if (!list_empty (&sched->kss_rx_conns)) { + conn = list_entry(sched->kss_rx_conns.next, + ksock_conn_t, ksnc_rx_list); + list_del(&conn->ksnc_rx_list); + + LASSERT(conn->ksnc_rx_scheduled); + LASSERT(conn->ksnc_rx_ready); + + /* clear rx_ready in case receive isn't complete. + * Do it BEFORE we call process_recv, since + * data_ready can set it any time after we release + * kss_lock. */ + conn->ksnc_rx_ready = 0; + spin_unlock_irqrestore(&sched->kss_lock, flags); + + rc = ksocknal_process_receive(conn); + + spin_lock_irqsave(&sched->kss_lock, flags); + + /* I'm the only one that can clear this flag */ + LASSERT(conn->ksnc_rx_scheduled); + + /* Did process_receive get everything it wanted? */ + if (rc == 0) + conn->ksnc_rx_ready = 1; + + if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP || + conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) { + /* Conn blocked for a forwarding buffer. + * It will get queued for my attention when + * one becomes available (and it might just + * already have been!). Meanwhile my ref + * on it stays put. 
*/ + } else if (conn->ksnc_rx_ready) { + /* reschedule for rx */ + list_add_tail (&conn->ksnc_rx_list, + &sched->kss_rx_conns); + } else { + conn->ksnc_rx_scheduled = 0; + /* drop my ref */ + ksocknal_put_conn(conn); + } + did_something = 1; - /* drops & regains kss_lock */ - ksocknal_process_receive (sched, &flags); } if (!list_empty (&sched->kss_tx_conns)) { + conn = list_entry(sched->kss_tx_conns.next, + ksock_conn_t, ksnc_tx_list); + list_del (&conn->ksnc_tx_list); + + LASSERT(conn->ksnc_tx_scheduled); + LASSERT(conn->ksnc_tx_ready); + LASSERT(!list_empty(&conn->ksnc_tx_queue)); + + tx = list_entry(conn->ksnc_tx_queue.next, + ksock_tx_t, tx_list); + /* dequeue now so empty list => more to send */ + list_del(&tx->tx_list); + + /* Clear tx_ready in case send isn't complete. Do + * it BEFORE we call process_transmit, since + * write_space can set it any time after we release + * kss_lock. */ + conn->ksnc_tx_ready = 0; + spin_unlock_irqrestore (&sched->kss_lock, flags); + + rc = ksocknal_process_transmit(conn, tx); + + spin_lock_irqsave (&sched->kss_lock, flags); + + if (rc != -EAGAIN) { + /* error or everything went: assume more can go */ + conn->ksnc_tx_ready = 1; + } else { + /* back onto HEAD of tx_queue */ + list_add (&tx->tx_list, &conn->ksnc_tx_queue); + } + + if (conn->ksnc_tx_ready && + !list_empty (&conn->ksnc_tx_queue)) { + /* reschedule for tx */ + list_add_tail (&conn->ksnc_tx_list, + &sched->kss_tx_conns); + } else { + conn->ksnc_tx_scheduled = 0; + /* drop my ref */ + ksocknal_put_conn (conn); + } + did_something = 1; - /* drops and regains kss_lock */ - ksocknal_process_transmit (sched, &flags); } #if SOCKNAL_ZC if (!list_empty (&sched->kss_zctxdone_list)) { @@ -1833,18 +1854,11 @@ ksocknal_data_ready (struct sock *sk, int n) if (conn == NULL) { /* raced with ksocknal_close_sock */ LASSERT (sk->sk_data_ready != &ksocknal_data_ready); sk->sk_data_ready (sk, n); - goto out; - } - - if (!conn->ksnc_rx_ready) { /* new news */ - /* Set ASAP in case of concurrent calls to me */ - conn->ksnc_rx_ready = 1; - + } else { sched = conn->ksnc_scheduler; spin_lock_irqsave (&sched->kss_lock, flags); - /* Set again (process_receive may have cleared while I blocked for the lock) */ conn->ksnc_rx_ready = 1; if (!conn->ksnc_rx_scheduled) { /* not being progressed */ @@ -1860,7 +1874,6 @@ ksocknal_data_ready (struct sock *sk, int n) spin_unlock_irqrestore (&sched->kss_lock, flags); } - out: read_unlock (&ksocknal_data.ksnd_global_lock); EXIT; @@ -1898,31 +1911,24 @@ ksocknal_write_space (struct sock *sk) if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */ clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags); - if (!conn->ksnc_tx_ready) { /* new news */ - /* Set ASAP in case of concurrent calls to me */ - conn->ksnc_tx_ready = 1; - - sched = conn->ksnc_scheduler; - - spin_lock_irqsave (&sched->kss_lock, flags); + sched = conn->ksnc_scheduler; - /* Set again (process_transmit may have - cleared while I blocked for the lock) */ - conn->ksnc_tx_ready = 1; + spin_lock_irqsave (&sched->kss_lock, flags); - if (!conn->ksnc_tx_scheduled && // not being progressed - !list_empty(&conn->ksnc_tx_queue)){//packets to send - list_add_tail (&conn->ksnc_tx_list, - &sched->kss_tx_conns); - conn->ksnc_tx_scheduled = 1; - /* extra ref for scheduler */ - atomic_inc (&conn->ksnc_refcount); + conn->ksnc_tx_ready = 1; - wake_up (&sched->kss_waitq); - } + if (!conn->ksnc_tx_scheduled && // not being progressed + !list_empty(&conn->ksnc_tx_queue)){//packets to send + list_add_tail (&conn->ksnc_tx_list, 
+ &sched->kss_tx_conns); + conn->ksnc_tx_scheduled = 1; + /* extra ref for scheduler */ + atomic_inc (&conn->ksnc_refcount); - spin_unlock_irqrestore (&sched->kss_lock, flags); + wake_up (&sched->kss_waitq); } + + spin_unlock_irqrestore (&sched->kss_lock, flags); } read_unlock (&ksocknal_data.ksnd_global_lock); diff --git a/lustre/portals/utils/Makefile.am b/lustre/portals/utils/Makefile.am index 0c65269..f1878df 100644 --- a/lustre/portals/utils/Makefile.am +++ b/lustre/portals/utils/Makefile.am @@ -8,10 +8,12 @@ COMPILE = $(CC) -Wall -g -I$(srcdir)/../include LINK = $(CC) -o $@ if LIBLUSTRE -sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck +tmp= else -sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck gmnalnid +tmp=gmnalnid endif + +sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck $(tmp) lib_LIBRARIES = libptlctl.a acceptor_SOURCES = acceptor.c # -lefence diff --git a/lustre/ptlrpc/llog_server.c b/lustre/ptlrpc/llog_server.c index b2c087f..14247d6 100644 --- a/lustre/ptlrpc/llog_server.c +++ b/lustre/ptlrpc/llog_server.c @@ -237,3 +237,47 @@ int llog_origin_handle_close(struct ptlrpc_request *req) RETURN(rc); } + +#ifdef ENABLE_ORPHANS +int llog_origin_handle_cancel(struct ptlrpc_request *req) +{ + struct obd_device *obd = req->rq_export->exp_obd; + struct obd_device *disk_obd; + struct llog_cookie *logcookies; + struct llog_ctxt *ctxt; + int num_cookies, rc = 0; + struct obd_run_ctxt saved; + struct llog_handle *cathandle; + ENTRY; + + logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies)); + num_cookies = req->rq_reqmsg->buflens[0]/sizeof(*logcookies); + if (logcookies == NULL || num_cookies == 0) { + DEBUG_REQ(D_HA, req, "no cookies sent"); + RETURN(-EFAULT); + } + + ctxt = llog_get_context(obd, logcookies->lgc_subsys); + if (ctxt == NULL) { + CWARN("llog subsys not setup or already cleanup\n"); + RETURN(-ENOENT); + } + down(&ctxt->loc_sem); + disk_obd = ctxt->loc_exp->exp_obd; + cathandle = ctxt->loc_handle; + LASSERT(cathandle); + + push_ctxt(&saved, &disk_obd->obd_ctxt, NULL); + rc = llog_cat_cancel_records(cathandle, num_cookies, logcookies); + if (rc) + CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc); + else + CWARN("cancel %d llog-records\n", num_cookies); + + pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL); + up(&ctxt->loc_sem); + + RETURN(rc); +} +EXPORT_SYMBOL(llog_origin_handle_cancel); +#endif -- 1.8.3.1
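
The heart of the socknal rework above is a hand-off protocol between the socket
callbacks (data_ready()/write_space()) and the scheduler thread: the scheduler
clears the conn's ready flag under kss_lock BEFORE processing, so any readiness
the callback signals while the lock is dropped is preserved and forces a
requeue; otherwise the conn is descheduled and the scheduler's extra connection
ref is dropped.  ksocknal_terminate_conn() reuses the same machinery to drain a
dying conn by forcing it "ready to tx".  The user-space model below is a sketch
of that protocol only: conn_t, sched_t, the single-slot tx_next queue and the
process callback are illustrative stand-ins (a pthread mutex/condvar replacing
the kss_lock spinlock and kss_waitq), not the kernel code.

    #include <errno.h>
    #include <pthread.h>
    #include <stddef.h>

    typedef struct conn conn_t;

    typedef struct {
            pthread_mutex_t lock;     /* models sched->kss_lock */
            pthread_cond_t  waitq;    /* models sched->kss_waitq */
            conn_t         *tx_next;  /* models sched->kss_tx_conns, one slot deep */
    } sched_t;

    struct conn {
            sched_t *sched;
            int      refcount;        /* models ksnc_refcount */
            int      tx_ready;        /* models ksnc_tx_ready */
            int      tx_scheduled;    /* models ksnc_tx_scheduled */
            int      tx_queued;       /* models !list_empty(&ksnc_tx_queue) */
    };

    /* write_space()-style callback: may fire at any time */
    static void callback_write_space(conn_t *c)
    {
            sched_t *s = c->sched;

            pthread_mutex_lock(&s->lock);
            c->tx_ready = 1;                  /* new news */
            if (!c->tx_scheduled && c->tx_queued) {
                    c->tx_scheduled = 1;
                    c->refcount++;            /* extra ref for the scheduler */
                    s->tx_next = c;
                    pthread_cond_signal(&s->waitq);
            }
            pthread_mutex_unlock(&s->lock);
    }

    /* terminate_conn()-style flush: force-feed the queue to the scheduler */
    static void terminate_conn(conn_t *c)
    {
            sched_t *s = c->sched;

            pthread_mutex_lock(&s->lock);
            if (!c->tx_scheduled && c->tx_queued) {
                    c->tx_ready = 1;          /* a closing conn is always ready */
                    c->tx_scheduled = 1;
                    c->refcount++;
                    s->tx_next = c;
                    pthread_cond_signal(&s->waitq);
            }
            pthread_mutex_unlock(&s->lock);
    }

    /* one scheduler pass; the caller guarantees a conn is queued */
    static void sched_tx_pass(sched_t *s, int (*process)(conn_t *))
    {
            conn_t *c;
            int     rc;

            pthread_mutex_lock(&s->lock);
            c = s->tx_next;
            s->tx_next = NULL;
            /* Clear BEFORE processing: the callback can set it again the
             * moment the lock is dropped, and that news must not be lost. */
            c->tx_ready = 0;
            pthread_mutex_unlock(&s->lock);

            rc = process(c);          /* models ksocknal_process_transmit();
                                       * the patch also puts a partially-sent
                                       * tx back on the HEAD of ksnc_tx_queue */

            pthread_mutex_lock(&s->lock);
            if (rc != -EAGAIN)
                    c->tx_ready = 1;  /* error or everything went */
            if (c->tx_ready && c->tx_queued) {
                    s->tx_next = c;   /* stay scheduled */
            } else {
                    c->tx_scheduled = 0;   /* the callback must requeue us */
                    c->refcount--;         /* drop the scheduler's ref */
            }
            pthread_mutex_unlock(&s->lock);
    }

Note the asymmetry this buys: only the scheduler pass ever clears the ready or
scheduled flags, which is exactly what the patch's "I'm the only one that can
clear this flag" LASSERT relies on.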
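
Both process functions now report progress to the scheduler through a small
return-code contract instead of manipulating scheduling state themselves.  The
receive side is sketched below with illustrative types (do_recv() stands in
for ksocknal_recvmsg() and returns bytes received, 0 on EOF, or a negative
errno); the full header/body state machine is elided.

    /* Return contract, as in the patch:
     *   0        packet boundary reached (or descheduled): assume more
     *   -EAGAIN  short read: only data_ready() can make progress now
     *   < 0      fatal: the connection has been closed
     */
    #include <errno.h>

    typedef struct {
            int nob_wanted;   /* models ksnc_rx_nob_wanted */
    } conn_t;

    extern int  do_recv(conn_t *c);                /* assumed helper */
    extern void close_conn(conn_t *c, int error);  /* assumed helper */

    int process_receive(conn_t *conn)
    {
            int rc = do_recv(conn);

            if (rc <= 0) {
                    /* error (rc < 0) or unexpected EOF (rc == 0) */
                    close_conn(conn, rc);
                    return rc == 0 ? -ESHUTDOWN : rc;
            }
            if (conn->nob_wanted != 0)
                    return -EAGAIN;   /* short read: come back on data_ready */

            /* ...advance the header/body state machine here... */
            return 0;                 /* boundary reached: assume more */
    }

The transmit side follows the analogous contract: 0 for a fully-sent tx and
-EAGAIN for a partial send that the scheduler puts back on the head of
ksnc_tx_queue.  The LASSERT in ksocknal_process_transmit() guarantees that
ksocknal_sendmsg() itself never returns -EAGAIN, leaving that value free to
carry the "try again later" meaning.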
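
The new llog_origin_handle_cancel() derives the cookie count from the length
of request buffer 0 and rejects an empty or missing buffer with -EFAULT before
taking ctxt->loc_sem and bracketing llog_cat_cancel_records() with
push_ctxt()/pop_ctxt().  A minimal sketch of the parsing step follows; the
cookie struct is a stand-in, not the real struct llog_cookie layout.

    #include <errno.h>
    #include <stddef.h>

    struct cookie {           /* stand-in for struct llog_cookie */
            int subsys;
            int index;
    };

    int parse_cookies(const void *buf, size_t buflen,
                      const struct cookie **cookies, int *num_cookies)
    {
            /* count is implied by the buffer length, as in the handler */
            int num = (int)(buflen / sizeof(struct cookie));

            if (buf == NULL || num == 0)
                    return -EFAULT;   /* DEBUG_REQ: "no cookies sent" */

            *cookies     = (const struct cookie *)buf;
            *num_cookies = num;
            return 0;
    }

Holding loc_sem across the whole cancel keeps concurrent cancels from
interleaving their catalogue updates, and the push_ctxt()/pop_ctxt() pair
switches to the disk obd's run context so the llog updates are performed
against the backing filesystem.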