From 23c2768cfacd6f9fb9dbabbfae9fc7ba5e7b129d Mon Sep 17 00:00:00 2001
From: eeb
Date: Fri, 29 Oct 2004 18:02:09 +0000
Subject: [PATCH] * Added socknal multi-frag I/O

---
 lnet/klnds/socklnd/socklnd.h              |  12 +
 lnet/klnds/socklnd/socklnd_cb.c           | 370 ++++++++++++++++++------------
 lustre/portals/knals/socknal/socknal.h    |  12 +
 lustre/portals/knals/socknal/socknal_cb.c | 370 ++++++++++++++++++------------
 4 files changed, 482 insertions(+), 282 deletions(-)

diff --git a/lnet/klnds/socklnd/socklnd.h b/lnet/klnds/socklnd/socklnd.h
index b8bbefd..20cd3d9 100644
--- a/lnet/klnds/socklnd/socklnd.h
+++ b/lnet/klnds/socklnd/socklnd.h
@@ -104,6 +104,11 @@
 
 #define SOCKNAL_TX_LOW_WATER(sk)  (((sk)->sk_sndbuf*8)/10)
 
+#define SOCKNAL_SINGLE_FRAG_TX      0   /* disable multi-fragment sends */
+#define SOCKNAL_SINGLE_FRAG_RX      0   /* disable multi-fragment receives */
+#define SOCKNAL_RISK_KMAP_DEADLOCK  0   /* risk kmap deadlock on multi-frag I/O
+                                         * (backs off to single-frag if disabled) */
+
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,72))
 # define sk_allocation  allocation
 # define sk_data_ready  data_ready
@@ -349,6 +354,13 @@ typedef struct ksock_conn
         atomic_t            ksnc_tx_nob;        /* # bytes queued */
         int                 ksnc_tx_ready;      /* write space */
         int                 ksnc_tx_scheduled;  /* being progressed */
+
+#if !SOCKNAL_SINGLE_FRAG_RX
+        struct iovec        ksnc_rx_scratch_iov[PTL_MD_MAX_IOV];
+#endif
+#if !SOCKNAL_SINGLE_FRAG_TX
+        struct iovec        ksnc_tx_scratch_iov[PTL_MD_MAX_IOV];
+#endif
 } ksock_conn_t;
 
 #define KSNR_TYPED_ROUTES   ((1 << SOCKNAL_CONN_CONTROL) | \
diff --git a/lnet/klnds/socklnd/socklnd_cb.c b/lnet/klnds/socklnd/socklnd_cb.c
index 762133e..ed91f94 100644
--- a/lnet/klnds/socklnd/socklnd_cb.c
+++ b/lnet/klnds/socklnd/socklnd_cb.c
@@ -84,21 +84,17 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         struct iovec  *iov = tx->tx_iov;
-        int            fragsize = iov->iov_len;
-        unsigned long  vaddr = (unsigned long)iov->iov_base;
-        int            more = (tx->tx_niov > 1) ||
-                              (tx->tx_nkiov > 0) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
+        unsigned long  vaddr = (unsigned long)iov->iov_base;
         int            offset = vaddr & (PAGE_SIZE - 1);
-        int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
+        int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
         struct page   *page;
 #endif
+        int            nob;
         int            rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
+         * or leave them alone. */
         LASSERT (tx->tx_niov > 0);
 
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
@@ -106,52 +102,74 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
+                int msgflg = MSG_DONTWAIT;
 
                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
                        (void *)vaddr, page, page_address(page), offset, zcsize);
 
-                if (fragsize > zcsize) {
-                        more = 1;
-                        fragsize = zcsize;
-                }
-
-                rc = tcp_sendpage_zccd(sock, page, offset, zcsize,
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
-                                       &tx->tx_zccd);
+                if (!list_empty (&conn->ksnc_tx_queue) ||
+                    zcsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
+
+                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
         } else
 #endif
         {
-                /* NB don't pass tx's iov; sendmsg may or may not update it */
-                struct iovec fragiov = { .iov_base = (void *)vaddr,
-                                         .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX
+                struct iovec  scratch;
+                struct iovec *scratchiov = &scratch;
+                int           niov = 1;
+#else
+                struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+                int           niov = tx->tx_niov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t oldmm = get_fs();
+                int          i;
 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i] = tx->tx_iov[i];
+                        nob += scratchiov[i].iov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_MORE;
+
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
         }
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
+        if (rc <= 0) /* sent nothing? */
+                return (rc);
 
-                if (rc < iov->iov_len) {
-                        /* didn't send whole iov entry... */
-                        iov->iov_base = (void *)(vaddr + rc);
-                        iov->iov_len -= rc;
-                } else {
-                        tx->tx_iov++;
-                        tx->tx_niov--;
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        /* "consume" iov */
+        do {
+                LASSERT (tx->tx_niov > 0);
+
+                if (nob < iov->iov_len) {
+                        iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+                        iov->iov_len -= nob;
+                        return (rc);
                 }
-        }
+
+                nob -= iov->iov_len;
+                tx->tx_iov = ++iov;
+                tx->tx_niov--;
+        } while (nob != 0);
 
         return (rc);
 }
@@ -161,66 +179,94 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         ptl_kiov_t    *kiov = tx->tx_kiov;
-        int            fragsize = kiov->kiov_len;
-        struct page   *page = kiov->kiov_page;
-        int            offset = kiov->kiov_offset;
-        int            more = (tx->tx_nkiov > 1) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
         int            rc;
-
+        int            nob;
+
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+         * or leave them alone. */
         LASSERT (tx->tx_niov == 0);
         LASSERT (tx->tx_nkiov > 0);
 
 #if SOCKNAL_ZC
-        if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag &&
+        if (kiov->kiov_len >= ksocknal_tunables.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
+                struct page *page = kiov->kiov_page;
+                int          offset = kiov->kiov_offset;
+                int          fragsize = kiov->kiov_len;
+                int          msgflg = MSG_DONTWAIT;
 
                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
-                       page, offset, fragsize);
+                       page, offset, kiov->kiov_len);
 
-                rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    fragsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
+
+                rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
                                        &tx->tx_zccd);
         } else
 #endif
         {
-                char *addr = ((char *)kmap (page)) + offset;
-                struct iovec fragiov = {.iov_base = addr,
-                                        .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
+                struct iovec  scratch;
+                struct iovec *scratchiov = &scratch;
+                int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+                struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+                int           niov = tx->tx_nkiov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t oldmm = get_fs();
+                int          i;
 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
+                                                 kiov[i].kiov_offset;
+                        nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_MORE;
+
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
 
-                kunmap (page);
+                for (i = 0; i < niov; i++)
+                        kunmap(kiov[i].kiov_page);
         }
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
-
-                if (rc < fragsize) {
-                        kiov->kiov_offset = offset + rc;
-                        kiov->kiov_len = fragsize - rc;
-                } else {
-                        tx->tx_kiov++;
-                        tx->tx_nkiov--;
+        if (rc <= 0) /* sent nothing? */
+                return (rc);
+
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        do {
+                LASSERT(tx->tx_nkiov > 0);
+
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return rc;
                 }
-        }
+
+                nob -= kiov->kiov_len;
+                tx->tx_kiov = ++kiov;
+                tx->tx_nkiov--;
+        } while (nob != 0);
 
         return (rc);
 }
@@ -269,35 +315,35 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                         mb();
                 }
 
-                if (rc <= 0) {
-                        /* Didn't write anything.
-                         *
-                         * NB: rc == 0 and rc == -EAGAIN both mean try
-                         * again later (linux stack returns -EAGAIN for
-                         * this, but Adaptech TOE returns 0).
-                         *
-                         * Also, sends never fail with -ENOMEM, just
-                         * -EAGAIN, but with the added bonus that we can't
-                         * expect write_space() to call us back to tell us
-                         * when to try sending again.  We use the
-                         * SOCK_NOSPACE flag to diagnose...  */
-
-                        LASSERT(rc != -ENOMEM);
-
-                        if (rc == 0 || rc == -EAGAIN) {
-                                if (test_bit(SOCK_NOSPACE,
-                                             &conn->ksnc_sock->flags)) {
-                                        rc = -EAGAIN;
-                                } else {
-                                        static int counter;
-
-                                        counter++;
-                                        if ((counter & (-counter)) == counter)
-                                                CWARN("%d ENOMEM tx %p\n",
-                                                      counter, conn);
-                                        rc = -ENOMEM;
-                                }
+                if (rc <= 0) { /* Didn't write anything? */
+                        unsigned long  flags;
+                        ksock_sched_t *sched;
+
+                        if (rc == 0)    /* some stacks return 0 instead of -EAGAIN */
+                                rc = -EAGAIN;
+
+                        if (rc != -EAGAIN)
+                                break;
+
+                        /* Check if EAGAIN is due to memory pressure */
+
+                        sched = conn->ksnc_scheduler;
+                        spin_lock_irqsave(&sched->kss_lock, flags);
+
+                        if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
+                            !conn->ksnc_tx_ready) {
+                                /* SOCK_NOSPACE is set when the socket fills
+                                 * and cleared in the write_space callback
+                                 * (which also sets ksnc_tx_ready).  If
+                                 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
+                                 * zero, I didn't fill the socket and
+                                 * write_space won't reschedule me, so I
+                                 * return -ENOMEM to get my caller to retry
+                                 * after a timeout */
+                                rc = -ENOMEM;
                         }
+
+                        spin_unlock_irqrestore(&sched->kss_lock, flags);
                         break;
                 }
 
@@ -332,114 +378,151 @@ ksocknal_eager_ack (ksock_conn_t *conn)
 int
 ksocknal_recv_iov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_niov;
+#endif
         struct iovec *iov = conn->ksnc_rx_iov;
-        int           fragsize = iov->iov_len;
-        unsigned long vaddr = (unsigned long)iov->iov_base;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (conn->ksnc_rx_niov > 0);
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
+         * or leave them alone. */
+        LASSERT (niov > 0);
+
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i] = iov[i];
+                nob += scratchiov[i].iov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean..........................^ */
        set_fs (oldmm);
 
         if (rc <= 0)
                 return (rc);
 
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies +
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_niov > 0);
 
-        if (rc < fragsize) {
-                iov->iov_base = (void *)(vaddr + rc);
-                iov->iov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < iov->iov_len) {
+                        iov->iov_len -= nob;
+                        iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
+                        return (-EAGAIN);
+                }
+
+                nob -= iov->iov_len;
+                conn->ksnc_rx_iov = ++iov;
+                conn->ksnc_rx_niov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_iov++;
-        conn->ksnc_rx_niov--;
-        return (1);
+        return (rc);
 }
 
 int
 ksocknal_recv_kiov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_nkiov;
+#endif
         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
-        struct page  *page = kiov->kiov_page;
-        int           offset = kiov->kiov_offset;
-        int           fragsize = kiov->kiov_len;
-        unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
-        /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
         LASSERT (conn->ksnc_rx_nkiov > 0);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+
+        /* NB we can't trust socket ops to either consume our iovs
+         * or leave them alone. */
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
+                nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean.......................^ */
         set_fs (oldmm);
 
-        kunmap (page);
-
+        for (i = 0; i < niov; i++)
+                kunmap(kiov[i].kiov_page);
+
         if (rc <= 0)
                 return (rc);
 
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies +
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_nkiov > 0);
 
-        if (rc < fragsize) {
-                kiov->kiov_offset = offset + rc;
-                kiov->kiov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return -EAGAIN;
+                }
+
+                nob -= kiov->kiov_len;
+                conn->ksnc_rx_kiov = ++kiov;
+                conn->ksnc_rx_nkiov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_kiov++;
-        conn->ksnc_rx_nkiov--;
-        return (1);
+        return 1;
 }
 
 int
@@ -599,6 +682,12 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                 return (rc);
 
         if (rc == -ENOMEM) {
+                static int counter;
+
+                counter++;   /* exponential backoff warnings */
+                if ((counter & (-counter)) == counter)
+                        CWARN("%d ENOMEM tx %p\n", counter, conn);
+
                 /* Queue on ksnd_enomem_conns for retry after a timeout */
                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
@@ -1853,12 +1942,11 @@ ksocknal_write_space (struct sock *sk)
         }
 
         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
-                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
-
                 sched = conn->ksnc_scheduler;
                 spin_lock_irqsave (&sched->kss_lock, flags);
 
+                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
+
                 conn->ksnc_tx_ready = 1;
 
                 if (!conn->ksnc_tx_scheduled &&  // not being progressed
diff --git a/lustre/portals/knals/socknal/socknal.h b/lustre/portals/knals/socknal/socknal.h
index b8bbefd..20cd3d9 100644
--- a/lustre/portals/knals/socknal/socknal.h
+++ b/lustre/portals/knals/socknal/socknal.h
@@ -104,6 +104,11 @@
 
 #define SOCKNAL_TX_LOW_WATER(sk)  (((sk)->sk_sndbuf*8)/10)
 
+#define SOCKNAL_SINGLE_FRAG_TX      0   /* disable multi-fragment sends */
+#define SOCKNAL_SINGLE_FRAG_RX      0   /* disable multi-fragment receives */
+#define SOCKNAL_RISK_KMAP_DEADLOCK  0   /* risk kmap deadlock on multi-frag I/O
+                                         * (backs off to single-frag if disabled) */
+
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,72))
 # define sk_allocation  allocation
 # define sk_data_ready  data_ready
@@ -349,6 +354,13 @@ typedef struct ksock_conn
         atomic_t            ksnc_tx_nob;        /* # bytes queued */
         int                 ksnc_tx_ready;      /* write space */
         int                 ksnc_tx_scheduled;  /* being progressed */
+
+#if !SOCKNAL_SINGLE_FRAG_RX
+        struct iovec        ksnc_rx_scratch_iov[PTL_MD_MAX_IOV];
+#endif
+#if !SOCKNAL_SINGLE_FRAG_TX
+        struct iovec        ksnc_tx_scratch_iov[PTL_MD_MAX_IOV];
+#endif
 } ksock_conn_t;
 
 #define KSNR_TYPED_ROUTES   ((1 << SOCKNAL_CONN_CONTROL) | \
diff --git a/lustre/portals/knals/socknal/socknal_cb.c b/lustre/portals/knals/socknal/socknal_cb.c
index 762133e..ed91f94 100644
--- a/lustre/portals/knals/socknal/socknal_cb.c
+++ b/lustre/portals/knals/socknal/socknal_cb.c
@@ -84,21 +84,17 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         struct iovec  *iov = tx->tx_iov;
-        int            fragsize = iov->iov_len;
-        unsigned long  vaddr = (unsigned long)iov->iov_base;
-        int            more = (tx->tx_niov > 1) ||
-                              (tx->tx_nkiov > 0) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
+        unsigned long  vaddr = (unsigned long)iov->iov_base;
         int            offset = vaddr & (PAGE_SIZE - 1);
-        int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
+        int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
         struct page   *page;
 #endif
+        int            nob;
         int            rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
+         * or leave them alone. */
         LASSERT (tx->tx_niov > 0);
 
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
@@ -106,52 +102,74 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
+                int msgflg = MSG_DONTWAIT;
 
                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
                        (void *)vaddr, page, page_address(page), offset, zcsize);
 
-                if (fragsize > zcsize) {
-                        more = 1;
-                        fragsize = zcsize;
-                }
-
-                rc = tcp_sendpage_zccd(sock, page, offset, zcsize,
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
-                                       &tx->tx_zccd);
+                if (!list_empty (&conn->ksnc_tx_queue) ||
+                    zcsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
+
+                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
         } else
 #endif
         {
-                /* NB don't pass tx's iov; sendmsg may or may not update it */
-                struct iovec fragiov = { .iov_base = (void *)vaddr,
-                                         .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX
+                struct iovec  scratch;
+                struct iovec *scratchiov = &scratch;
+                int           niov = 1;
+#else
+                struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+                int           niov = tx->tx_niov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t oldmm = get_fs();
+                int          i;
 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i] = tx->tx_iov[i];
+                        nob += scratchiov[i].iov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_MORE;
+
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
         }
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
+        if (rc <= 0) /* sent nothing? */
+                return (rc);
 
-                if (rc < iov->iov_len) {
-                        /* didn't send whole iov entry... */
-                        iov->iov_base = (void *)(vaddr + rc);
-                        iov->iov_len -= rc;
-                } else {
-                        tx->tx_iov++;
-                        tx->tx_niov--;
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        /* "consume" iov */
+        do {
+                LASSERT (tx->tx_niov > 0);
+
+                if (nob < iov->iov_len) {
+                        iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+                        iov->iov_len -= nob;
+                        return (rc);
                 }
-        }
+
+                nob -= iov->iov_len;
+                tx->tx_iov = ++iov;
+                tx->tx_niov--;
+        } while (nob != 0);
 
         return (rc);
 }
@@ -161,66 +179,94 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         ptl_kiov_t    *kiov = tx->tx_kiov;
-        int            fragsize = kiov->kiov_len;
-        struct page   *page = kiov->kiov_page;
-        int            offset = kiov->kiov_offset;
-        int            more = (tx->tx_nkiov > 1) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
         int            rc;
-
+        int            nob;
+
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+         * or leave them alone. */
         LASSERT (tx->tx_niov == 0);
         LASSERT (tx->tx_nkiov > 0);
 
 #if SOCKNAL_ZC
-        if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag &&
+        if (kiov->kiov_len >= ksocknal_tunables.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
+                struct page *page = kiov->kiov_page;
+                int          offset = kiov->kiov_offset;
+                int          fragsize = kiov->kiov_len;
+                int          msgflg = MSG_DONTWAIT;
 
                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
-                       page, offset, fragsize);
+                       page, offset, kiov->kiov_len);
 
-                rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    fragsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
+
+                rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
                                        &tx->tx_zccd);
         } else
 #endif
         {
-                char *addr = ((char *)kmap (page)) + offset;
-                struct iovec fragiov = {.iov_base = addr,
-                                        .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
+                struct iovec  scratch;
+                struct iovec *scratchiov = &scratch;
+                int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+                struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+                int           niov = tx->tx_nkiov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t oldmm = get_fs();
+                int          i;
 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
+                                                 kiov[i].kiov_offset;
+                        nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_MORE;
+
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
 
-                kunmap (page);
+                for (i = 0; i < niov; i++)
+                        kunmap(kiov[i].kiov_page);
         }
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
-
-                if (rc < fragsize) {
-                        kiov->kiov_offset = offset + rc;
-                        kiov->kiov_len = fragsize - rc;
-                } else {
-                        tx->tx_kiov++;
-                        tx->tx_nkiov--;
+        if (rc <= 0) /* sent nothing? */
+                return (rc);
+
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        do {
+                LASSERT(tx->tx_nkiov > 0);
+
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return rc;
                 }
-        }
+
+                nob -= kiov->kiov_len;
+                tx->tx_kiov = ++kiov;
+                tx->tx_nkiov--;
+        } while (nob != 0);
 
         return (rc);
 }
@@ -269,35 +315,35 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                         mb();
                 }
 
-                if (rc <= 0) {
-                        /* Didn't write anything.
-                         *
-                         * NB: rc == 0 and rc == -EAGAIN both mean try
-                         * again later (linux stack returns -EAGAIN for
-                         * this, but Adaptech TOE returns 0).
-                         *
-                         * Also, sends never fail with -ENOMEM, just
-                         * -EAGAIN, but with the added bonus that we can't
-                         * expect write_space() to call us back to tell us
-                         * when to try sending again.  We use the
-                         * SOCK_NOSPACE flag to diagnose...  */
-
-                        LASSERT(rc != -ENOMEM);
-
-                        if (rc == 0 || rc == -EAGAIN) {
-                                if (test_bit(SOCK_NOSPACE,
-                                             &conn->ksnc_sock->flags)) {
-                                        rc = -EAGAIN;
-                                } else {
-                                        static int counter;
-
-                                        counter++;
-                                        if ((counter & (-counter)) == counter)
-                                                CWARN("%d ENOMEM tx %p\n",
-                                                      counter, conn);
-                                        rc = -ENOMEM;
-                                }
+                if (rc <= 0) { /* Didn't write anything? */
+                        unsigned long  flags;
+                        ksock_sched_t *sched;
+
+                        if (rc == 0)    /* some stacks return 0 instead of -EAGAIN */
+                                rc = -EAGAIN;
+
+                        if (rc != -EAGAIN)
+                                break;
+
+                        /* Check if EAGAIN is due to memory pressure */
+
+                        sched = conn->ksnc_scheduler;
+                        spin_lock_irqsave(&sched->kss_lock, flags);
+
+                        if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
+                            !conn->ksnc_tx_ready) {
+                                /* SOCK_NOSPACE is set when the socket fills
+                                 * and cleared in the write_space callback
+                                 * (which also sets ksnc_tx_ready).  If
+                                 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
+                                 * zero, I didn't fill the socket and
+                                 * write_space won't reschedule me, so I
+                                 * return -ENOMEM to get my caller to retry
+                                 * after a timeout */
+                                rc = -ENOMEM;
                         }
+
+                        spin_unlock_irqrestore(&sched->kss_lock, flags);
                         break;
                 }
 
@@ -332,114 +378,151 @@ ksocknal_eager_ack (ksock_conn_t *conn)
 int
 ksocknal_recv_iov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_niov;
+#endif
         struct iovec *iov = conn->ksnc_rx_iov;
-        int           fragsize = iov->iov_len;
-        unsigned long vaddr = (unsigned long)iov->iov_base;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (conn->ksnc_rx_niov > 0);
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
+         * or leave them alone. */
+        LASSERT (niov > 0);
+
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i] = iov[i];
+                nob += scratchiov[i].iov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean..........................^ */
         set_fs (oldmm);
 
         if (rc <= 0)
                 return (rc);
 
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies +
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_niov > 0);
 
-        if (rc < fragsize) {
-                iov->iov_base = (void *)(vaddr + rc);
-                iov->iov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < iov->iov_len) {
+                        iov->iov_len -= nob;
+                        iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
+                        return (-EAGAIN);
+                }
+
+                nob -= iov->iov_len;
+                conn->ksnc_rx_iov = ++iov;
+                conn->ksnc_rx_niov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_iov++;
-        conn->ksnc_rx_niov--;
-        return (1);
+        return (rc);
 }
 
 int
 ksocknal_recv_kiov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_nkiov;
+#endif
         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
-        struct page  *page = kiov->kiov_page;
-        int           offset = kiov->kiov_offset;
-        int           fragsize = kiov->kiov_len;
-        unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
-        /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
         LASSERT (conn->ksnc_rx_nkiov > 0);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+
+        /* NB we can't trust socket ops to either consume our iovs
+         * or leave them alone. */
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
+                nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean.......................^ */
         set_fs (oldmm);
 
-        kunmap (page);
-
+        for (i = 0; i < niov; i++)
+                kunmap(kiov[i].kiov_page);
+
         if (rc <= 0)
                 return (rc);
 
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies +
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_nkiov > 0);
 
-        if (rc < fragsize) {
-                kiov->kiov_offset = offset + rc;
-                kiov->kiov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return -EAGAIN;
+                }
+
+                nob -= kiov->kiov_len;
+                conn->ksnc_rx_kiov = ++kiov;
+                conn->ksnc_rx_nkiov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_kiov++;
-        conn->ksnc_rx_nkiov--;
-        return (1);
+        return 1;
 }
 
 int
@@ -599,6 +682,12 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                 return (rc);
 
         if (rc == -ENOMEM) {
+                static int counter;
+
+                counter++;   /* exponential backoff warnings */
+                if ((counter & (-counter)) == counter)
+                        CWARN("%d ENOMEM tx %p\n", counter, conn);
+
                 /* Queue on ksnd_enomem_conns for retry after a timeout */
                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
@@ -1853,12 +1942,11 @@ ksocknal_write_space (struct sock *sk)
         }
 
         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
-                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
-
                 sched = conn->ksnc_scheduler;
                 spin_lock_irqsave (&sched->kss_lock, flags);
 
+                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
+
                 conn->ksnc_tx_ready = 1;
 
                 if (!conn->ksnc_tx_scheduled &&  // not being progressed
-- 
1.8.3.1
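
The send paths above set MSG_MORE whenever a given sendmsg()/tcp_sendpage_zccd() call is known not to be the last one (more transmits queued on ksnc_tx_queue, or fewer bytes in this call than tx_resid), so TCP can coalesce the remainder into fewer segments. A minimal userspace analogue of that decision, with hypothetical names (send_frags, resid) that are not part of the patch:

#include <stddef.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Send 'niov' fragments of a payload of which 'resid' bytes remain
 * overall; set MSG_MORE when this call cannot drain the payload. */
static ssize_t
send_frags(int fd, struct iovec *iov, int niov, size_t resid)
{
        struct msghdr msg;
        size_t        nob = 0;
        int           i;

        memset(&msg, 0, sizeof(msg));
        msg.msg_iov    = iov;
        msg.msg_iovlen = niov;

        for (i = 0; i < niov; i++)
                nob += iov[i].iov_len;

        return sendmsg(fd, &msg, MSG_DONTWAIT |
                                 (nob < resid ? MSG_MORE : 0));
}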
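
The '"consume" iov' loops added to ksocknal_send_iov()/ksocknal_recv_iov() (and their kiov twins) all follow one pattern: advance the fragment array past whole fragments that were fully transferred and trim the first partially-transferred fragment in place. The same pattern as a free-standing sketch; consume_iov is a hypothetical helper name, not in the patch:

#include <assert.h>
#include <stddef.h>
#include <sys/uio.h>

/* Advance *iovp past 'nob' transferred bytes: drop whole fragments,
 * trim the first partial one in place.  Returns the number of
 * fragments still outstanding. */
static int
consume_iov(struct iovec **iovp, int niov, size_t nob)
{
        struct iovec *iov = *iovp;

        while (nob != 0) {
                assert(niov > 0);

                if (nob < iov->iov_len) {       /* partial fragment */
                        iov->iov_base = (char *)iov->iov_base + nob;
                        iov->iov_len -= nob;
                        break;
                }

                nob -= iov->iov_len;            /* whole fragment consumed */
                iov++;
                niov--;
        }

        *iovp = iov;
        return niov;
}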
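
ksocknal_transmit() above now distinguishes two causes of EAGAIN under the scheduler lock: if SOCK_NOSPACE is clear and write_space() has not already set ksnc_tx_ready, no callback will ever reschedule the send, so the EAGAIN is presumed to come from memory pressure and is reported as -ENOMEM for a timed retry. The decision restated as a pure function (hypothetical name, outside the kernel; a sketch of the logic only):

#include <errno.h>
#include <stdbool.h>

/* On EAGAIN: if the socket is not full (SOCK_NOSPACE clear) and
 * write_space() has not already signalled readiness, nobody will wake
 * the sender, so treat the failure as memory pressure. */
static int
classify_eagain(bool sock_nospace, bool tx_ready)
{
        if (!sock_nospace && !tx_ready)
                return -ENOMEM;         /* caller retries after a timeout */
        return -EAGAIN;                 /* write_space() will reschedule */
}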
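
The ENOMEM warning moved into ksocknal_process_transmit() is rate-limited by (counter & (-counter)) == counter, which is true exactly when counter is a power of two (the two's-complement trick isolates the lowest set bit), so the CWARN fires on occurrences 1, 2, 4, 8, ... A small standalone demonstration, not part of the patch:

#include <stdio.h>

int main(void)
{
        int counter = 0;
        int i;

        for (i = 0; i < 1000; i++) {
                counter++;
                /* lowest set bit == counter  <=>  counter is a power of 2 */
                if ((counter & (-counter)) == counter)
                        printf("warn at occurrence %d\n", counter);
        }
        return 0;                       /* prints 1 2 4 8 ... 512 */
}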