X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fsocklnd%2Fsocklnd_cb.c;h=2eed8b109ab1f3f6d5a00e42bce249e490d899c0;hp=5815d16f0b4bbd7b2cf4aff7d6d682ba4a5043d7;hb=84f3af43c4bdeb1744736f44cd746dd4b6e8fa6d;hpb=23de47e82bd999ec651f927097922413527cca71 diff --git a/lnet/klnds/socklnd/socklnd_cb.c b/lnet/klnds/socklnd/socklnd_cb.c index 5815d16..2eed8b1 100644 --- a/lnet/klnds/socklnd/socklnd_cb.c +++ b/lnet/klnds/socklnd/socklnd_cb.c @@ -1,13 +1,14 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: +/* + * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. + * + * Copyright (c) 2011, 2017, Intel Corporation. * - * Copyright (C) 2001, 2002 Cluster File Systems, Inc. * Author: Zach Brown * Author: Peter J. Braam * Author: Phil Schwan * Author: Eric Barton * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ + * This file is part of Lustre, https://wiki.hpdd.intel.com/ * * Portals is free software; you can redistribute it and/or * modify it under the terms of version 2 of the GNU General Public @@ -23,221 +24,181 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#include "socknal.h" -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) -# include -#endif +#include "socklnd.h" -/* - * LIB functions follow - * - */ -int -ksocknal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist) +struct ksock_tx * +ksocknal_alloc_tx(int type, int size) { - /* I would guess that if ksocknal_get_peer (nid) == NULL, - and we're not routing, then 'nid' is very distant :) */ - if (nal->libnal_ni.ni_pid.nid == nid) { - *dist = 0; - } else { - *dist = 1; + struct ksock_tx *tx = NULL; + + if (type == KSOCK_MSG_NOOP) { + LASSERT(size == KSOCK_NOOP_TX_SIZE); + + /* searching for a noop tx in free list */ + spin_lock(&ksocknal_data.ksnd_tx_lock); + + if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) { + tx = list_entry(ksocknal_data.ksnd_idle_noop_txs.next, + struct ksock_tx, tx_list); + LASSERT(tx->tx_desc_size == size); + list_del(&tx->tx_list); + } + + spin_unlock(&ksocknal_data.ksnd_tx_lock); } - return 0; + if (tx == NULL) + LIBCFS_ALLOC(tx, size); + + if (tx == NULL) + return NULL; + + atomic_set(&tx->tx_refcount, 1); + tx->tx_zc_aborted = 0; + tx->tx_zc_capable = 0; + tx->tx_zc_checked = 0; + tx->tx_hstatus = LNET_MSG_STATUS_OK; + tx->tx_desc_size = size; + + atomic_inc(&ksocknal_data.ksnd_nactive_txs); + + return tx; } -void -ksocknal_free_ltx (ksock_ltx_t *ltx) +struct ksock_tx * +ksocknal_alloc_tx_noop(__u64 cookie, int nonblk) { - atomic_dec(&ksocknal_data.ksnd_nactive_ltxs); - PORTAL_FREE(ltx, ltx->ltx_desc_size); + struct ksock_tx *tx; + + tx = ksocknal_alloc_tx(KSOCK_MSG_NOOP, KSOCK_NOOP_TX_SIZE); + if (tx == NULL) { + CERROR("Can't allocate noop tx desc\n"); + return NULL; + } + + tx->tx_conn = NULL; + tx->tx_lnetmsg = NULL; + tx->tx_kiov = NULL; + tx->tx_nkiov = 0; + tx->tx_iov = tx->tx_frags.virt.iov; + tx->tx_niov = 1; + tx->tx_nonblk = nonblk; + + tx->tx_msg.ksm_csum = 0; + tx->tx_msg.ksm_type = KSOCK_MSG_NOOP; + tx->tx_msg.ksm_zc_cookies[0] = 0; + tx->tx_msg.ksm_zc_cookies[1] = cookie; + + return tx; } -#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC) -struct page * -ksocknal_kvaddr_to_page (unsigned long vaddr) + +void +ksocknal_free_tx(struct ksock_tx *tx) { - struct page *page; - - if (vaddr >= VMALLOC_START && - vaddr < VMALLOC_END) - page = vmalloc_to_page ((void *)vaddr); -#if CONFIG_HIGHMEM - else if (vaddr >= PKMAP_BASE && - vaddr < 
(PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) - page = vmalloc_to_page ((void *)vaddr); - /* in 2.4 ^ just walks the page tables */ -#endif - else - page = virt_to_page (vaddr); + atomic_dec(&ksocknal_data.ksnd_nactive_txs); + + if (tx->tx_lnetmsg == NULL && tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) { + /* it's a noop tx */ + spin_lock(&ksocknal_data.ksnd_tx_lock); - if (page == NULL || - !VALID_PAGE (page)) - return (NULL); + list_add(&tx->tx_list, &ksocknal_data.ksnd_idle_noop_txs); - return (page); + spin_unlock(&ksocknal_data.ksnd_tx_lock); + } else { + LIBCFS_FREE(tx, tx->tx_desc_size); + } } -#endif -int -ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx) +static int +ksocknal_send_iov(struct ksock_conn *conn, struct ksock_tx *tx) { - struct socket *sock = conn->ksnc_sock; - struct iovec *iov = tx->tx_iov; - int fragsize = iov->iov_len; - unsigned long vaddr = (unsigned long)iov->iov_base; - int more = (tx->tx_niov > 1) || - (tx->tx_nkiov > 0) || - (!list_empty (&conn->ksnc_tx_queue)); -#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC) - int offset = vaddr & (PAGE_SIZE - 1); - int zcsize = MIN (fragsize, PAGE_SIZE - offset); - struct page *page; -#endif - int rc; + struct kvec *iov = tx->tx_iov; + int nob; + int rc; - /* NB we can't trust socket ops to either consume our iovs - * or leave them alone, so we only send 1 frag at a time. */ - LASSERT (fragsize <= tx->tx_resid); LASSERT (tx->tx_niov > 0); - -#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC) - if (zcsize >= ksocknal_data.ksnd_zc_min_frag && - (sock->sk->route_caps & NETIF_F_SG) && - (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) && - (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) { - - CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n", - (void *)vaddr, page, page_address(page), offset, zcsize); - - if (fragsize > zcsize) { - more = 1; - fragsize = zcsize; - } - rc = tcp_sendpage_zccd(sock, page, offset, zcsize, - more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT, - &tx->tx_zccd); - } else -#endif - { - /* NB don't pass tx's iov; sendmsg may or may not update it */ - struct iovec fragiov = { .iov_base = (void *)vaddr, - .iov_len = fragsize}; - struct msghdr msg = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = &fragiov, - .msg_iovlen = 1, - .msg_control = NULL, - .msg_controllen = 0, - .msg_flags = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT - }; - mm_segment_t oldmm = get_fs(); - - set_fs (KERNEL_DS); - rc = sock_sendmsg(sock, &msg, fragsize); - set_fs (oldmm); - } - - if (rc > 0) { - tx->tx_resid -= rc; - - if (rc < iov->iov_len) { - /* didn't send whole iov entry... */ - iov->iov_base = (void *)(vaddr + rc); - iov->iov_len -= rc; - } else { - tx->tx_iov++; - tx->tx_niov--; + /* Never touch tx->tx_iov inside ksocknal_lib_send_iov() */ + rc = ksocknal_lib_send_iov(conn, tx); + + if (rc <= 0) /* sent nothing? 
*/ + return (rc); + + nob = rc; + LASSERT (nob <= tx->tx_resid); + tx->tx_resid -= nob; + + /* "consume" iov */ + do { + LASSERT (tx->tx_niov > 0); + + if (nob < (int) iov->iov_len) { + iov->iov_base += nob; + iov->iov_len -= nob; + return (rc); } - } - + + nob -= iov->iov_len; + tx->tx_iov = ++iov; + tx->tx_niov--; + } while (nob != 0); + return (rc); } -int -ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx) +static int +ksocknal_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx) { - struct socket *sock = conn->ksnc_sock; - ptl_kiov_t *kiov = tx->tx_kiov; - int fragsize = kiov->kiov_len; - struct page *page = kiov->kiov_page; - int offset = kiov->kiov_offset; - int more = (tx->tx_nkiov > 1) || - (!list_empty (&conn->ksnc_tx_queue)); - int rc; - - /* NB we can't trust socket ops to either consume our iovs - * or leave them alone, so we only send 1 frag at a time. */ - LASSERT (fragsize <= tx->tx_resid); - LASSERT (offset + fragsize <= PAGE_SIZE); + lnet_kiov_t *kiov = tx->tx_kiov; + int nob; + int rc; + LASSERT (tx->tx_niov == 0); LASSERT (tx->tx_nkiov > 0); -#if SOCKNAL_ZC - if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag && - (sock->sk->route_caps & NETIF_F_SG) && - (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) { + /* Never touch tx->tx_kiov inside ksocknal_lib_send_kiov() */ + rc = ksocknal_lib_send_kiov(conn, tx); - CDEBUG(D_NET, "page %p + offset %x for %d\n", - page, offset, fragsize); + if (rc <= 0) /* sent nothing? */ + return (rc); - rc = tcp_sendpage_zccd(sock, page, offset, fragsize, - more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT, - &tx->tx_zccd); - } else -#endif - { - char *addr = ((char *)kmap (page)) + offset; - struct iovec fragiov = {.iov_base = addr, - .iov_len = fragsize}; - struct msghdr msg = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = &fragiov, - .msg_iovlen = 1, - .msg_control = NULL, - .msg_controllen = 0, - .msg_flags = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT - }; - mm_segment_t oldmm = get_fs(); - - set_fs (KERNEL_DS); - rc = sock_sendmsg(sock, &msg, fragsize); - set_fs (oldmm); - - kunmap (page); - } + nob = rc; + LASSERT (nob <= tx->tx_resid); + tx->tx_resid -= nob; - if (rc > 0) { - tx->tx_resid -= rc; - - if (rc < fragsize) { - kiov->kiov_offset = offset + rc; - kiov->kiov_len = fragsize - rc; - } else { - tx->tx_kiov++; - tx->tx_nkiov--; + /* "consume" kiov */ + do { + LASSERT(tx->tx_nkiov > 0); + + if (nob < (int)kiov->kiov_len) { + kiov->kiov_offset += nob; + kiov->kiov_len -= nob; + return rc; } - } + + nob -= (int)kiov->kiov_len; + tx->tx_kiov = ++kiov; + tx->tx_nkiov--; + } while (nob != 0); return (rc); } -int -ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx) +static int +ksocknal_transmit(struct ksock_conn *conn, struct ksock_tx *tx) { - int rc; - - if (ksocknal_data.ksnd_stall_tx != 0) { - set_current_state (TASK_UNINTERRUPTIBLE); - schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ); - } + int rc; + int bufnob; - LASSERT (tx->tx_resid != 0); + if (ksocknal_data.ksnd_stall_tx != 0) { + set_current_state(TASK_UNINTERRUPTIBLE); + schedule_timeout(cfs_time_seconds(ksocknal_data.ksnd_stall_tx)); + } - rc = ksocknal_getconnsock (conn); + LASSERT(tx->tx_resid != 0); + + rc = ksocknal_connsock_addref(conn); if (rc != 0) { LASSERT (conn->ksnc_closing); return (-ESHUTDOWN); @@ -254,202 +215,146 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx) rc = ksocknal_send_kiov (conn, tx); } - if (rc <= 0) { - /* Didn't write anything. 
- * - * NB: rc == 0 and rc == -EAGAIN both mean try - * again later (linux stack returns -EAGAIN for - * this, but Adaptech TOE returns 0). - * - * Also, sends never fail with -ENOMEM, just - * -EAGAIN, but with the added bonus that we can't - * expect write_space() to call us back to tell us - * when to try sending again. We use the - * SOCK_NOSPACE flag to diagnose... */ - - LASSERT(rc != -ENOMEM); - - if (rc == 0 || rc == -EAGAIN) { - if (test_bit(SOCK_NOSPACE, - &conn->ksnc_sock->flags)) { - rc = -EAGAIN; - } else { - static int counter; - - counter++; - if ((counter & (-counter)) == counter) - CWARN("%d ENOMEM tx %p\n", - counter, conn); - rc = -ENOMEM; - } - } + bufnob = conn->ksnc_sock->sk->sk_wmem_queued; + if (rc > 0) /* sent something? */ + conn->ksnc_tx_bufnob += rc; /* account it */ + + if (bufnob < conn->ksnc_tx_bufnob) { + /* allocated send buffer bytes < computed; infer + * something got ACKed */ + conn->ksnc_tx_deadline = ktime_get_seconds() + + lnet_get_lnd_timeout(); + conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds(); + conn->ksnc_tx_bufnob = bufnob; + smp_mb(); + } + + if (rc <= 0) { /* Didn't write anything? */ + + if (rc == 0) /* some stacks return 0 instead of -EAGAIN */ + rc = -EAGAIN; + + /* Check if EAGAIN is due to memory pressure */ + if(rc == -EAGAIN && ksocknal_lib_memory_pressure(conn)) + rc = -ENOMEM; + break; } + /* socket's wmem_queued now includes 'rc' bytes */ + atomic_sub (rc, &conn->ksnc_tx_nob); rc = 0; - /* Consider the connection alive since we managed to chuck - * more data into it. Really, we'd like to consider it - * alive only when the peer ACKs something, but - * write_space() only gets called back while SOCK_NOSPACE - * is set. Instead, we presume peer death has occurred if - * the socket doesn't drain within a timout */ - conn->ksnc_tx_deadline = jiffies + - ksocknal_tunables.ksnd_io_timeout * HZ; - conn->ksnc_peer->ksnp_last_alive = jiffies; - } while (tx->tx_resid != 0); - ksocknal_putconnsock (conn); + ksocknal_connsock_decref(conn); return (rc); } -void -ksocknal_eager_ack (ksock_conn_t *conn) +static int +ksocknal_recv_iov(struct ksock_conn *conn) { - int opt = 1; - mm_segment_t oldmm = get_fs(); - struct socket *sock = conn->ksnc_sock; - - /* Remind the socket to ACK eagerly. If I don't, the socket might - * think I'm about to send something it could piggy-back the ACK - * on, introducing delay in completing zero-copy sends in my - * peer. */ - - set_fs(KERNEL_DS); - sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK, - (char *)&opt, sizeof (opt)); - set_fs(oldmm); -} + struct kvec *iov = conn->ksnc_rx_iov; + int nob; + int rc; -int -ksocknal_recv_iov (ksock_conn_t *conn) -{ - struct iovec *iov = conn->ksnc_rx_iov; - int fragsize = iov->iov_len; - unsigned long vaddr = (unsigned long)iov->iov_base; - struct iovec fragiov = { .iov_base = (void *)vaddr, - .iov_len = fragsize}; - struct msghdr msg = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = &fragiov, - .msg_iovlen = 1, - .msg_control = NULL, - .msg_controllen = 0, - .msg_flags = 0 - }; - mm_segment_t oldmm = get_fs(); - int rc; - - /* NB we can't trust socket ops to either consume our iovs - * or leave them alone, so we only receive 1 frag at a time. 
*/ LASSERT (conn->ksnc_rx_niov > 0); - LASSERT (fragsize <= conn->ksnc_rx_nob_wanted); - set_fs (KERNEL_DS); - rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT); - /* NB this is just a boolean............................^ */ - set_fs (oldmm); + /* Never touch conn->ksnc_rx_iov or change connection + * status inside ksocknal_lib_recv_iov */ + rc = ksocknal_lib_recv_iov(conn); if (rc <= 0) return (rc); /* received something... */ - conn->ksnc_peer->ksnp_last_alive = jiffies; - conn->ksnc_rx_deadline = jiffies + - ksocknal_tunables.ksnd_io_timeout * HZ; - mb(); /* order with setting rx_started */ - conn->ksnc_rx_started = 1; - - conn->ksnc_rx_nob_wanted -= rc; - conn->ksnc_rx_nob_left -= rc; - - if (rc < fragsize) { - iov->iov_base = (void *)(vaddr + rc); - iov->iov_len = fragsize - rc; - return (-EAGAIN); - } + nob = rc; - conn->ksnc_rx_iov++; - conn->ksnc_rx_niov--; - return (1); + conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds(); + conn->ksnc_rx_deadline = ktime_get_seconds() + + lnet_get_lnd_timeout(); + smp_mb(); /* order with setting rx_started */ + conn->ksnc_rx_started = 1; + + conn->ksnc_rx_nob_wanted -= nob; + conn->ksnc_rx_nob_left -= nob; + + do { + LASSERT (conn->ksnc_rx_niov > 0); + + if (nob < (int)iov->iov_len) { + iov->iov_len -= nob; + iov->iov_base += nob; + return (-EAGAIN); + } + + nob -= iov->iov_len; + conn->ksnc_rx_iov = ++iov; + conn->ksnc_rx_niov--; + } while (nob != 0); + + return (rc); } -int -ksocknal_recv_kiov (ksock_conn_t *conn) +static int +ksocknal_recv_kiov(struct ksock_conn *conn) { - ptl_kiov_t *kiov = conn->ksnc_rx_kiov; - struct page *page = kiov->kiov_page; - int offset = kiov->kiov_offset; - int fragsize = kiov->kiov_len; - unsigned long vaddr = ((unsigned long)kmap (page)) + offset; - struct iovec fragiov = { .iov_base = (void *)vaddr, - .iov_len = fragsize}; - struct msghdr msg = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = &fragiov, - .msg_iovlen = 1, - .msg_control = NULL, - .msg_controllen = 0, - .msg_flags = 0 - }; - mm_segment_t oldmm = get_fs(); - int rc; - - /* NB we can't trust socket ops to either consume our iovs - * or leave them alone, so we only receive 1 frag at a time. */ - LASSERT (fragsize <= conn->ksnc_rx_nob_wanted); + lnet_kiov_t *kiov = conn->ksnc_rx_kiov; + int nob; + int rc; LASSERT (conn->ksnc_rx_nkiov > 0); - LASSERT (offset + fragsize <= PAGE_SIZE); - set_fs (KERNEL_DS); - rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT); - /* NB this is just a boolean............................^ */ - set_fs (oldmm); + /* Never touch conn->ksnc_rx_kiov or change connection + * status inside ksocknal_lib_recv_iov */ + rc = ksocknal_lib_recv_kiov(conn); - kunmap (page); - if (rc <= 0) return (rc); - + /* received something... 
*/ - conn->ksnc_peer->ksnp_last_alive = jiffies; - conn->ksnc_rx_deadline = jiffies + - ksocknal_tunables.ksnd_io_timeout * HZ; - mb(); /* order with setting rx_started */ - conn->ksnc_rx_started = 1; - - conn->ksnc_rx_nob_wanted -= rc; - conn->ksnc_rx_nob_left -= rc; - - if (rc < fragsize) { - kiov->kiov_offset = offset + rc; - kiov->kiov_len = fragsize - rc; - return (-EAGAIN); - } + nob = rc; + + conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds(); + conn->ksnc_rx_deadline = ktime_get_seconds() + + lnet_get_lnd_timeout(); + smp_mb(); /* order with setting rx_started */ + conn->ksnc_rx_started = 1; + + conn->ksnc_rx_nob_wanted -= nob; + conn->ksnc_rx_nob_left -= nob; + + do { + LASSERT (conn->ksnc_rx_nkiov > 0); + + if (nob < (int) kiov->kiov_len) { + kiov->kiov_offset += nob; + kiov->kiov_len -= nob; + return -EAGAIN; + } + + nob -= kiov->kiov_len; + conn->ksnc_rx_kiov = ++kiov; + conn->ksnc_rx_nkiov--; + } while (nob != 0); - conn->ksnc_rx_kiov++; - conn->ksnc_rx_nkiov--; - return (1); + return 1; } -int -ksocknal_receive (ksock_conn_t *conn) +static int +ksocknal_receive(struct ksock_conn *conn) { /* Return 1 on success, 0 on EOF, < 0 on error. * Caller checks ksnc_rx_nob_wanted to determine * progress/completion. */ int rc; ENTRY; - - if (ksocknal_data.ksnd_stall_rx != 0) { - set_current_state (TASK_UNINTERRUPTIBLE); - schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ); - } - rc = ksocknal_getconnsock (conn); + if (ksocknal_data.ksnd_stall_rx != 0) { + set_current_state(TASK_UNINTERRUPTIBLE); + schedule_timeout(cfs_time_seconds(ksocknal_data.ksnd_stall_rx)); + } + + rc = ksocknal_connsock_addref(conn); if (rc != 0) { LASSERT (conn->ksnc_closing); return (-ESHUTDOWN); @@ -475,109 +380,163 @@ ksocknal_receive (ksock_conn_t *conn) /* Completed a fragment */ if (conn->ksnc_rx_nob_wanted == 0) { - /* Completed a message segment (header or payload) */ - if ((ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0 && - (conn->ksnc_rx_state == SOCKNAL_RX_BODY || - conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) { - /* Remind the socket to ack eagerly... 
*/ - ksocknal_eager_ack(conn); - } rc = 1; break; } } - ksocknal_putconnsock (conn); + ksocknal_connsock_decref(conn); RETURN (rc); } -#if SOCKNAL_ZC void -ksocknal_zc_callback (zccd_t *zcd) +ksocknal_tx_done(struct lnet_ni *ni, struct ksock_tx *tx, int rc) { - ksock_tx_t *tx = KSOCK_ZCCD_2_TX(zcd); - ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler; - unsigned long flags; + struct lnet_msg *lnetmsg = tx->tx_lnetmsg; + enum lnet_msg_hstatus hstatus = tx->tx_hstatus; ENTRY; - /* Schedule tx for cleanup (can't do it now due to lock conflicts) */ + LASSERT(ni != NULL || tx->tx_conn != NULL); - spin_lock_irqsave (&sched->kss_lock, flags); + if (!rc && (tx->tx_resid != 0 || tx->tx_zc_aborted)) { + rc = -EIO; + hstatus = LNET_MSG_STATUS_LOCAL_ERROR; + } - list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list); - wake_up (&sched->kss_waitq); + if (tx->tx_conn != NULL) + ksocknal_conn_decref(tx->tx_conn); - spin_unlock_irqrestore (&sched->kss_lock, flags); - EXIT; + ksocknal_free_tx(tx); + if (lnetmsg != NULL) { /* KSOCK_MSG_NOOP go without lnetmsg */ + if (rc) + CERROR("tx failure rc = %d, hstatus = %d\n", rc, + hstatus); + lnetmsg->msg_health_status = hstatus; + lnet_finalize(lnetmsg, rc); + } + + EXIT; } -#endif void -ksocknal_tx_done (ksock_tx_t *tx, int asynch) +ksocknal_txlist_done(struct lnet_ni *ni, struct list_head *txlist, int error) { - ksock_ltx_t *ltx; - ENTRY; - - if (tx->tx_conn != NULL) { - /* This tx got queued on a conn; do the accounting... */ - atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob); -#if SOCKNAL_ZC - /* zero copy completion isn't always from - * process_transmit() so it needs to keep a ref on - * tx_conn... */ - if (asynch) - ksocknal_put_conn (tx->tx_conn); -#else - LASSERT (!asynch); -#endif - } + struct ksock_tx *tx; + + while (!list_empty(txlist)) { + tx = list_entry(txlist->next, struct ksock_tx, tx_list); + + if (error && tx->tx_lnetmsg != NULL) { + CNETERR("Deleting packet type %d len %d %s->%s\n", + le32_to_cpu(tx->tx_lnetmsg->msg_hdr.type), + le32_to_cpu(tx->tx_lnetmsg->msg_hdr.payload_length), + libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)), + libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.dest_nid))); + } else if (error) { + CNETERR("Deleting noop packet\n"); + } + + list_del(&tx->tx_list); + + if (tx->tx_hstatus == LNET_MSG_STATUS_OK) { + if (error == -ETIMEDOUT) + tx->tx_hstatus = + LNET_MSG_STATUS_LOCAL_TIMEOUT; + else if (error == -ENETDOWN || + error == -EHOSTUNREACH || + error == -ENETUNREACH) + tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_DROPPED; + /* + * for all other errors we don't want to + * retransmit + */ + else if (error) + tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR; + } + + LASSERT(atomic_read(&tx->tx_refcount) == 1); + ksocknal_tx_done(ni, tx, error); + } +} - if (tx->tx_isfwd) { /* was a forwarded packet? */ - kpr_fwd_done (&ksocknal_data.ksnd_router, - KSOCK_TX_2_KPR_FWD_DESC (tx), - (tx->tx_resid == 0) ? 0 : -ECONNABORTED); - EXIT; +static void +ksocknal_check_zc_req(struct ksock_tx *tx) +{ + struct ksock_conn *conn = tx->tx_conn; + struct ksock_peer_ni *peer_ni = conn->ksnc_peer; + + /* Set tx_msg.ksm_zc_cookies[0] to a unique non-zero cookie and add tx + * to ksnp_zc_req_list if some fragment of this message should be sent + * zero-copy. Our peer_ni will send an ACK containing this cookie when + * she has received this message to tell us we can signal completion. + * tx_msg.ksm_zc_cookies[0] remains non-zero while tx is on + * ksnp_zc_req_list. 
*/ + LASSERT (tx->tx_msg.ksm_type != KSOCK_MSG_NOOP); + LASSERT (tx->tx_zc_capable); + + tx->tx_zc_checked = 1; + + if (conn->ksnc_proto == &ksocknal_protocol_v1x || + !conn->ksnc_zc_capable) return; - } - /* local send */ - ltx = KSOCK_TX_2_KSOCK_LTX (tx); + /* assign cookie and queue tx to pending list, it will be released when + * a matching ack is received. See ksocknal_handle_zcack() */ + + ksocknal_tx_addref(tx); - lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie, - (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL); + spin_lock(&peer_ni->ksnp_lock); - ksocknal_free_ltx (ltx); - EXIT; + /* ZC_REQ is going to be pinned to the peer_ni */ + tx->tx_deadline = ktime_get_seconds() + + lnet_get_lnd_timeout(); + + LASSERT (tx->tx_msg.ksm_zc_cookies[0] == 0); + + tx->tx_msg.ksm_zc_cookies[0] = peer_ni->ksnp_zc_next_cookie++; + + if (peer_ni->ksnp_zc_next_cookie == 0) + peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1; + + list_add_tail(&tx->tx_zc_list, &peer_ni->ksnp_zc_req_list); + + spin_unlock(&peer_ni->ksnp_lock); } -void -ksocknal_tx_launched (ksock_tx_t *tx) +static void +ksocknal_uncheck_zc_req(struct ksock_tx *tx) { -#if SOCKNAL_ZC - if (atomic_read (&tx->tx_zccd.zccd_count) != 1) { - ksock_conn_t *conn = tx->tx_conn; - - /* zccd skbufs are still in-flight. First take a ref on - * conn, so it hangs about for ksocknal_tx_done... */ - atomic_inc (&conn->ksnc_refcount); - - /* ...then drop the initial ref on zccd, so the zero copy - * callback can occur */ - zccd_put (&tx->tx_zccd); - return; - } -#endif - /* Any zero-copy-ness (if any) has completed; I can complete the - * transmit now, avoiding an extra schedule */ - ksocknal_tx_done (tx, 0); + struct ksock_peer_ni *peer_ni = tx->tx_conn->ksnc_peer; + + LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP); + LASSERT(tx->tx_zc_capable); + + tx->tx_zc_checked = 0; + + spin_lock(&peer_ni->ksnp_lock); + + if (tx->tx_msg.ksm_zc_cookies[0] == 0) { + /* Not waiting for an ACK */ + spin_unlock(&peer_ni->ksnp_lock); + return; + } + + tx->tx_msg.ksm_zc_cookies[0] = 0; + list_del(&tx->tx_zc_list); + + spin_unlock(&peer_ni->ksnp_lock); + + ksocknal_tx_decref(tx); } -int -ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) +static int +ksocknal_process_transmit(struct ksock_conn *conn, struct ksock_tx *tx) { - unsigned long flags; - int rc; - + int rc; + + if (tx->tx_zc_capable && !tx->tx_zc_checked) + ksocknal_check_zc_req(tx); + rc = ksocknal_transmit (conn, tx); CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc); @@ -586,7 +545,6 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) /* Sent everything OK */ LASSERT (rc == 0); - ksocknal_tx_launched (tx); return (0); } @@ -594,787 +552,582 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) return (rc); if (rc == -ENOMEM) { + static int counter; + + counter++; /* exponential backoff warnings */ + if ((counter & (-counter)) == counter) + CWARN("%u ENOMEM tx %p (%u allocated)\n", + counter, conn, atomic_read(&libcfs_kmemory)); + /* Queue on ksnd_enomem_conns for retry after a timeout */ - spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags); + spin_lock_bh(&ksocknal_data.ksnd_reaper_lock); /* enomem list takes over scheduler's ref... 
*/ LASSERT (conn->ksnc_tx_scheduled); - list_add_tail(&conn->ksnc_tx_list, - &ksocknal_data.ksnd_enomem_conns); - if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY, - ksocknal_data.ksnd_reaper_waketime)) - wake_up (&ksocknal_data.ksnd_reaper_waitq); - - spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags); - return (rc); - } - - /* Actual error */ - LASSERT (rc < 0); - - if (!conn->ksnc_closing) - CERROR("[%p] Error %d on write to "LPX64 - " ip %d.%d.%d.%d:%d\n", conn, rc, - conn->ksnc_peer->ksnp_nid, - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); - - ksocknal_close_conn_and_siblings (conn, rc); - ksocknal_tx_launched (tx); - - return (rc); + list_add_tail(&conn->ksnc_tx_list, + &ksocknal_data.ksnd_enomem_conns); + if (ktime_get_seconds() + SOCKNAL_ENOMEM_RETRY < + ksocknal_data.ksnd_reaper_waketime) + wake_up(&ksocknal_data.ksnd_reaper_waitq); + + spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock); + + /* + * set the health status of the message which determines + * whether we should retry the transmit + */ + tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR; + return (rc); + } + + /* Actual error */ + LASSERT(rc < 0); + + /* + * set the health status of the message which determines + * whether we should retry the transmit + */ + if (rc == -ETIMEDOUT) + tx->tx_hstatus = LNET_MSG_STATUS_REMOTE_TIMEOUT; + else + tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR; + + if (!conn->ksnc_closing) { + switch (rc) { + case -ECONNRESET: + LCONSOLE_WARN("Host %pI4h reset our connection " + "while we were sending data; it may have " + "rebooted.\n", + &conn->ksnc_ipaddr); + break; + default: + LCONSOLE_WARN("There was an unexpected network error " + "while writing to %pI4h: %d.\n", + &conn->ksnc_ipaddr, rc); + break; + } + CDEBUG(D_NET, "[%p] Error %d on write to %s ip %pI4h:%d\n", + conn, rc, libcfs_id2str(conn->ksnc_peer->ksnp_id), + &conn->ksnc_ipaddr, conn->ksnc_port); + } + + if (tx->tx_zc_checked) + ksocknal_uncheck_zc_req(tx); + + /* it's not an error if conn is being closed */ + ksocknal_close_conn_and_siblings(conn, + (conn->ksnc_closing) ? 
0 : rc); + + return rc; } -void -ksocknal_launch_autoconnect_locked (ksock_route_t *route) +static void +ksocknal_launch_connection_locked(struct ksock_route *route) { - unsigned long flags; /* called holding write lock on ksnd_global_lock */ - LASSERT (!route->ksnr_deleted); - LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0); - LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES); + LASSERT (!route->ksnr_scheduled); LASSERT (!route->ksnr_connecting); - - if (ksocknal_tunables.ksnd_typed_conns) - route->ksnr_connecting = - KSNR_TYPED_ROUTES & ~route->ksnr_connected; - else - route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY); - - atomic_inc (&route->ksnr_refcount); /* extra ref for asynchd */ - - spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags); - - list_add_tail (&route->ksnr_connect_list, - &ksocknal_data.ksnd_autoconnectd_routes); - wake_up (&ksocknal_data.ksnd_autoconnectd_waitq); - - spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags); + LASSERT ((ksocknal_route_mask() & ~route->ksnr_connected) != 0); + + route->ksnr_scheduled = 1; /* scheduling conn for connd */ + ksocknal_route_addref(route); /* extra ref for connd */ + + spin_lock_bh(&ksocknal_data.ksnd_connd_lock); + + list_add_tail(&route->ksnr_connd_list, + &ksocknal_data.ksnd_connd_routes); + wake_up(&ksocknal_data.ksnd_connd_waitq); + + spin_unlock_bh(&ksocknal_data.ksnd_connd_lock); } -ksock_peer_t * -ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid) +void +ksocknal_launch_all_connections_locked(struct ksock_peer_ni *peer_ni) { - char ipbuf[PTL_NALFMT_SIZE]; - ptl_nid_t target_nid; - int rc; - ksock_peer_t *peer = ksocknal_find_peer_locked (nid); - - if (peer != NULL) - return (peer); - - if (tx->tx_isfwd) { - CERROR ("Can't send packet to "LPX64 - " %s: routed target is not a peer\n", - nid, portals_nid2str(SOCKNAL, nid, ipbuf)); - return (NULL); - } - - rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob, - &target_nid); - if (rc != 0) { - CERROR ("Can't route to "LPX64" %s: router error %d\n", - nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc); - return (NULL); - } + struct ksock_route *route; - peer = ksocknal_find_peer_locked (target_nid); - if (peer != NULL) - return (peer); + /* called holding write lock on ksnd_global_lock */ + for (;;) { + /* launch any/all connections that need it */ + route = ksocknal_find_connectable_route_locked(peer_ni); + if (route == NULL) + return; - CERROR ("Can't send packet to "LPX64" %s: no peer entry\n", - target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf)); - return (NULL); + ksocknal_launch_connection_locked(route); + } } -ksock_conn_t * -ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) +struct ksock_conn * +ksocknal_find_conn_locked(struct ksock_peer_ni *peer_ni, struct ksock_tx *tx, int nonblk) { - struct list_head *tmp; - ksock_conn_t *typed = NULL; - int tnob = 0; - ksock_conn_t *fallback = NULL; - int fnob = 0; - - /* Find the conn with the shortest tx queue */ - list_for_each (tmp, &peer->ksnp_conns) { - ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list); - int nob = atomic_read(&c->ksnc_tx_nob) + - c->ksnc_sock->sk->sk_wmem_queued; + struct list_head *tmp; + struct ksock_conn *conn; + struct ksock_conn *typed = NULL; + struct ksock_conn *fallback = NULL; + int tnob = 0; + int fnob = 0; + + list_for_each(tmp, &peer_ni->ksnp_conns) { + struct ksock_conn *c = list_entry(tmp, struct ksock_conn, + ksnc_list); + int nob = atomic_read(&c->ksnc_tx_nob) + + 
c->ksnc_sock->sk->sk_wmem_queued; + int rc; LASSERT (!c->ksnc_closing); + LASSERT (c->ksnc_proto != NULL && + c->ksnc_proto->pro_match_tx != NULL); - if (fallback == NULL || nob < fnob) { - fallback = c; - fnob = nob; - } - - if (!ksocknal_tunables.ksnd_typed_conns) - continue; + rc = c->ksnc_proto->pro_match_tx(c, tx, nonblk); - switch (c->ksnc_type) { + switch (rc) { default: LBUG(); - case SOCKNAL_CONN_ANY: - break; - case SOCKNAL_CONN_BULK_IN: + case SOCKNAL_MATCH_NO: /* protocol rejected the tx */ continue; - case SOCKNAL_CONN_BULK_OUT: - if (tx->tx_nob < ksocknal_tunables.ksnd_min_bulk) - continue; - break; - case SOCKNAL_CONN_CONTROL: - if (tx->tx_nob >= ksocknal_tunables.ksnd_min_bulk) - continue; + + case SOCKNAL_MATCH_YES: /* typed connection */ + if (typed == NULL || tnob > nob || + (tnob == nob && *ksocknal_tunables.ksnd_round_robin && + typed->ksnc_tx_last_post > c->ksnc_tx_last_post)) { + typed = c; + tnob = nob; + } break; - } - if (typed == NULL || nob < tnob) { - typed = c; - tnob = nob; + case SOCKNAL_MATCH_MAY: /* fallback connection */ + if (fallback == NULL || fnob > nob || + (fnob == nob && *ksocknal_tunables.ksnd_round_robin && + fallback->ksnc_tx_last_post > c->ksnc_tx_last_post)) { + fallback = c; + fnob = nob; + } + break; } } /* prefer the typed selection */ - return ((typed != NULL) ? typed : fallback); + conn = (typed != NULL) ? typed : fallback; + + if (conn != NULL) + conn->ksnc_tx_last_post = ktime_get_seconds(); + + return conn; +} + +void +ksocknal_tx_prep(struct ksock_conn *conn, struct ksock_tx *tx) +{ + conn->ksnc_proto->pro_pack(tx); + + atomic_add (tx->tx_nob, &conn->ksnc_tx_nob); + ksocknal_conn_addref(conn); /* +1 ref for tx */ + tx->tx_conn = conn; } void -ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn) +ksocknal_queue_tx_locked(struct ksock_tx *tx, struct ksock_conn *conn) { - unsigned long flags; - ksock_sched_t *sched = conn->ksnc_scheduler; + struct ksock_sched *sched = conn->ksnc_scheduler; + struct ksock_msg *msg = &tx->tx_msg; + struct ksock_tx *ztx = NULL; + int bufnob = 0; /* called holding global lock (read or irq-write) and caller may * not have dropped this lock between finding conn and calling me, * so we don't need the {get,put}connsock dance to deref * ksnc_sock... */ LASSERT(!conn->ksnc_closing); - LASSERT(tx->tx_resid == tx->tx_nob); - - CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n", - conn->ksnc_peer->ksnp_nid, - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); - - atomic_add (tx->tx_nob, &conn->ksnc_tx_nob); - tx->tx_conn = conn; -#if SOCKNAL_ZC - zccd_init (&tx->tx_zccd, ksocknal_zc_callback); - /* NB this sets 1 ref on zccd, so the callback can only occur after - * I've released this ref. */ -#endif - spin_lock_irqsave (&sched->kss_lock, flags); - - conn->ksnc_tx_deadline = jiffies + - ksocknal_tunables.ksnd_io_timeout * HZ; - mb(); /* order with list_add_tail */ - - list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); - - if (conn->ksnc_tx_ready && /* able to send */ - !conn->ksnc_tx_scheduled) { /* not scheduled to send */ - /* +1 ref for scheduler */ - atomic_inc (&conn->ksnc_refcount); - list_add_tail (&conn->ksnc_tx_list, - &sched->kss_tx_conns); - conn->ksnc_tx_scheduled = 1; - wake_up (&sched->kss_waitq); + CDEBUG(D_NET, "Sending to %s ip %pI4h:%d\n", + libcfs_id2str(conn->ksnc_peer->ksnp_id), + &conn->ksnc_ipaddr, conn->ksnc_port); + + ksocknal_tx_prep(conn, tx); + + /* Ensure the frags we've been given EXACTLY match the number of + * bytes we want to send. 
Many TCP/IP stacks disregard any total + * size parameters passed to them and just look at the frags. + * + * We always expect at least 1 mapped fragment containing the + * complete ksocknal message header. */ + LASSERT (lnet_iov_nob (tx->tx_niov, tx->tx_iov) + + lnet_kiov_nob(tx->tx_nkiov, tx->tx_kiov) == + (unsigned int)tx->tx_nob); + LASSERT (tx->tx_niov >= 1); + LASSERT (tx->tx_resid == tx->tx_nob); + + CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n", + tx, (tx->tx_lnetmsg != NULL) ? tx->tx_lnetmsg->msg_hdr.type: + KSOCK_MSG_NOOP, + tx->tx_nob, tx->tx_niov, tx->tx_nkiov); + + bufnob = conn->ksnc_sock->sk->sk_wmem_queued; + spin_lock_bh(&sched->kss_lock); + + if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) { + /* First packet starts the timeout */ + conn->ksnc_tx_deadline = ktime_get_seconds() + + lnet_get_lnd_timeout(); + if (conn->ksnc_tx_bufnob > 0) /* something got ACKed */ + conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds(); + conn->ksnc_tx_bufnob = 0; + smp_mb(); /* order with adding to tx_queue */ + } + + if (msg->ksm_type == KSOCK_MSG_NOOP) { + /* The packet is noop ZC ACK, try to piggyback the ack_cookie + * on a normal packet so I don't need to send it */ + LASSERT (msg->ksm_zc_cookies[1] != 0); + LASSERT (conn->ksnc_proto->pro_queue_tx_zcack != NULL); + + if (conn->ksnc_proto->pro_queue_tx_zcack(conn, tx, 0)) + ztx = tx; /* ZC ACK piggybacked on ztx release tx later */ + + } else { + /* It's a normal packet - can it piggback a noop zc-ack that + * has been queued already? */ + LASSERT (msg->ksm_zc_cookies[1] == 0); + LASSERT (conn->ksnc_proto->pro_queue_tx_msg != NULL); + + ztx = conn->ksnc_proto->pro_queue_tx_msg(conn, tx); + /* ztx will be released later */ + } + + if (ztx != NULL) { + atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob); + list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs); } - spin_unlock_irqrestore (&sched->kss_lock, flags); + if (conn->ksnc_tx_ready && /* able to send */ + !conn->ksnc_tx_scheduled) { /* not scheduled to send */ + /* +1 ref for scheduler */ + ksocknal_conn_addref(conn); + list_add_tail(&conn->ksnc_tx_list, + &sched->kss_tx_conns); + conn->ksnc_tx_scheduled = 1; + wake_up(&sched->kss_waitq); + } + + spin_unlock_bh(&sched->kss_lock); } -ksock_route_t * -ksocknal_find_connectable_route_locked (ksock_peer_t *peer) + +struct ksock_route * +ksocknal_find_connectable_route_locked(struct ksock_peer_ni *peer_ni) { - struct list_head *tmp; - ksock_route_t *route; - ksock_route_t *first_lazy = NULL; - int found_connecting_or_connected = 0; - int bits; - - list_for_each (tmp, &peer->ksnp_routes) { - route = list_entry (tmp, ksock_route_t, ksnr_list); - bits = route->ksnr_connected; - - if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES || - (bits & (1 << SOCKNAL_CONN_ANY)) != 0 || - route->ksnr_connecting != 0) { - /* All typed connections have been established, or - * an untyped connection has been established, or - * connections are currently being established */ - found_connecting_or_connected = 1; + time64_t now = ktime_get_seconds(); + struct list_head *tmp; + struct ksock_route *route; + + list_for_each(tmp, &peer_ni->ksnp_routes) { + route = list_entry(tmp, struct ksock_route, ksnr_list); + + LASSERT (!route->ksnr_connecting || route->ksnr_scheduled); + + if (route->ksnr_scheduled) /* connections being established */ continue; - } - /* too soon to retry this guy? */ - if (!time_after_eq (jiffies, route->ksnr_timeout)) + /* all route types connected ? 
*/ + if ((ksocknal_route_mask() & ~route->ksnr_connected) == 0) continue; - - /* eager routes always want to be connected */ - if (route->ksnr_eager) - return (route); - if (first_lazy == NULL) - first_lazy = route; + if (!(route->ksnr_retry_interval == 0 || /* first attempt */ + now >= route->ksnr_timeout)) { + CDEBUG(D_NET, + "Too soon to retry route %pI4h " + "(cnted %d, interval %lld, %lld secs later)\n", + &route->ksnr_ipaddr, + route->ksnr_connected, + route->ksnr_retry_interval, + route->ksnr_timeout - now); + continue; + } + + return (route); } - - /* No eager routes need to be connected. If some connection has - * already been established, or is being established there's nothing to - * do. Otherwise we return the first lazy route we found. If it fails - * to connect, it will go to the end of the list. */ - - if (!list_empty (&peer->ksnp_conns) || - found_connecting_or_connected) - return (NULL); - - return (first_lazy); + + return (NULL); } -ksock_route_t * -ksocknal_find_connecting_route_locked (ksock_peer_t *peer) +struct ksock_route * +ksocknal_find_connecting_route_locked(struct ksock_peer_ni *peer_ni) { - struct list_head *tmp; - ksock_route_t *route; + struct list_head *tmp; + struct ksock_route *route; - list_for_each (tmp, &peer->ksnp_routes) { - route = list_entry (tmp, ksock_route_t, ksnr_list); - - if (route->ksnr_connecting != 0) + list_for_each(tmp, &peer_ni->ksnp_routes) { + route = list_entry(tmp, struct ksock_route, ksnr_list); + + LASSERT (!route->ksnr_connecting || route->ksnr_scheduled); + + if (route->ksnr_scheduled) return (route); } - + return (NULL); } int -ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid) +ksocknal_launch_packet(struct lnet_ni *ni, struct ksock_tx *tx, + struct lnet_process_id id) { - unsigned long flags; - ksock_peer_t *peer; - ksock_conn_t *conn; - ksock_route_t *route; - rwlock_t *g_lock; - - /* Ensure the frags we've been given EXACTLY match the number of - * bytes we want to send. Many TCP/IP stacks disregard any total - * size parameters passed to them and just look at the frags. - * - * We always expect at least 1 mapped fragment containing the - * complete portals header. */ - LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) + - lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob); - LASSERT (tx->tx_niov >= 1); - LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t)); + struct ksock_peer_ni *peer_ni; + struct ksock_conn *conn; + rwlock_t *g_lock; + int retry; + int rc; - CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n", - tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, - tx->tx_nob, tx->tx_niov, tx->tx_nkiov); - - tx->tx_conn = NULL; /* only set when assigned a conn */ - tx->tx_resid = tx->tx_nob; - tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base; + LASSERT (tx->tx_conn == NULL); g_lock = &ksocknal_data.ksnd_global_lock; - read_lock (g_lock); - - peer = ksocknal_find_target_peer_locked (tx, nid); - if (peer == NULL) { - read_unlock (g_lock); - return (-EHOSTUNREACH); - } - if (ksocknal_find_connectable_route_locked(peer) == NULL) { - conn = ksocknal_find_conn_locked (tx, peer); - if (conn != NULL) { - /* I've got no autoconnect routes that need to be - * connecting and I do have an actual connection... 
*/ - ksocknal_queue_tx_locked (tx, conn); - read_unlock (g_lock); - return (0); + for (retry = 0;; retry = 1) { + read_lock(g_lock); + peer_ni = ksocknal_find_peer_locked(ni, id); + if (peer_ni != NULL) { + if (ksocknal_find_connectable_route_locked(peer_ni) == NULL) { + conn = ksocknal_find_conn_locked(peer_ni, tx, tx->tx_nonblk); + if (conn != NULL) { + /* I've got no routes that need to be + * connecting and I do have an actual + * connection... */ + ksocknal_queue_tx_locked (tx, conn); + read_unlock(g_lock); + return (0); + } + } } - } - - /* Making one or more connections; I'll need a write lock... */ - - atomic_inc (&peer->ksnp_refcount); /* +1 ref for me while I unlock */ - read_unlock (g_lock); - write_lock_irqsave (g_lock, flags); - - if (peer->ksnp_closing) { /* peer deleted as I blocked! */ - write_unlock_irqrestore (g_lock, flags); - ksocknal_put_peer (peer); - return (-EHOSTUNREACH); - } - ksocknal_put_peer (peer); /* drop ref I got above */ - for (;;) { - /* launch any/all autoconnections that need it */ - route = ksocknal_find_connectable_route_locked (peer); - if (route == NULL) + /* I'll need a write lock... */ + read_unlock(g_lock); + + write_lock_bh(g_lock); + + peer_ni = ksocknal_find_peer_locked(ni, id); + if (peer_ni != NULL) break; - ksocknal_launch_autoconnect_locked (route); + write_unlock_bh(g_lock); + + if ((id.pid & LNET_PID_USERFLAG) != 0) { + CERROR("Refusing to create a connection to " + "userspace process %s\n", libcfs_id2str(id)); + return -EHOSTUNREACH; + } + + if (retry) { + CERROR("Can't find peer_ni %s\n", libcfs_id2str(id)); + return -EHOSTUNREACH; + } + + rc = ksocknal_add_peer(ni, id, + LNET_NIDADDR(id.nid), + lnet_acceptor_port()); + if (rc != 0) { + CERROR("Can't add peer_ni %s: %d\n", + libcfs_id2str(id), rc); + return rc; + } } - conn = ksocknal_find_conn_locked (tx, peer); + ksocknal_launch_all_connections_locked(peer_ni); + + conn = ksocknal_find_conn_locked(peer_ni, tx, tx->tx_nonblk); if (conn != NULL) { /* Connection exists; queue message on it */ ksocknal_queue_tx_locked (tx, conn); - write_unlock_irqrestore (g_lock, flags); + write_unlock_bh(g_lock); return (0); } - route = ksocknal_find_connecting_route_locked (peer); - if (route != NULL) { - /* At least 1 connection is being established; queue the - * message... 
*/ - list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue); - write_unlock_irqrestore (g_lock, flags); - return (0); + if (peer_ni->ksnp_accepting > 0 || + ksocknal_find_connecting_route_locked (peer_ni) != NULL) { + /* the message is going to be pinned to the peer_ni */ + tx->tx_deadline = ktime_get_seconds() + + lnet_get_lnd_timeout(); + + /* Queue the message until a connection is established */ + list_add_tail(&tx->tx_list, &peer_ni->ksnp_tx_queue); + write_unlock_bh(g_lock); + return 0; } - - write_unlock_irqrestore (g_lock, flags); + + write_unlock_bh(g_lock); + + /* NB Routes may be ignored if connections to them failed recently */ + CNETERR("No usable routes to %s\n", libcfs_id2str(id)); return (-EHOSTUNREACH); } -ptl_err_t -ksocknal_sendmsg(lib_nal_t *nal, - void *private, - lib_msg_t *cookie, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, - ptl_pid_t pid, - unsigned int payload_niov, - struct iovec *payload_iov, - ptl_kiov_t *payload_kiov, - size_t payload_offset, - size_t payload_nob) +int +ksocknal_send(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg) { - ksock_ltx_t *ltx; - int desc_size; - int rc; + int mpflag = 1; + int type = lntmsg->msg_type; + struct lnet_process_id target = lntmsg->msg_target; + unsigned int payload_niov = lntmsg->msg_niov; + struct kvec *payload_iov = lntmsg->msg_iov; + lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; + unsigned int payload_offset = lntmsg->msg_offset; + unsigned int payload_nob = lntmsg->msg_len; + struct ksock_tx *tx; + int desc_size; + int rc; /* NB 'private' is different depending on what we're sending. * Just ignore it... */ - CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64 - " pid %d\n", payload_nob, payload_niov, nid , pid); - - LASSERT (payload_nob == 0 || payload_niov > 0); - LASSERT (payload_niov <= PTL_MD_MAX_IOV); - - /* It must be OK to kmap() if required */ - LASSERT (payload_kiov == NULL || !in_interrupt ()); - /* payload is either all vaddrs or all pages */ - LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); - - if (payload_iov != NULL) - desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]); - else - desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]); - - if (in_interrupt() || - type == PTL_MSG_ACK || - type == PTL_MSG_REPLY) { - /* Can't block if in interrupt or responding to an incoming - * message */ - PORTAL_ALLOC_ATOMIC(ltx, desc_size); - } else { - PORTAL_ALLOC(ltx, desc_size); - } - - if (ltx == NULL) { - CERROR("Can't allocate tx desc type %d size %d %s\n", - type, desc_size, in_interrupt() ? 
"(intr)" : ""); - return (PTL_NO_SPACE); + CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n", + payload_nob, payload_niov, libcfs_id2str(target)); + + LASSERT (payload_nob == 0 || payload_niov > 0); + LASSERT (payload_niov <= LNET_MAX_IOV); + /* payload is either all vaddrs or all pages */ + LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); + LASSERT (!in_interrupt ()); + + if (payload_iov != NULL) + desc_size = offsetof(struct ksock_tx, + tx_frags.virt.iov[1 + payload_niov]); + else + desc_size = offsetof(struct ksock_tx, + tx_frags.paged.kiov[payload_niov]); + + if (lntmsg->msg_vmflush) + mpflag = cfs_memory_pressure_get_and_set(); + tx = ksocknal_alloc_tx(KSOCK_MSG_LNET, desc_size); + if (tx == NULL) { + CERROR("Can't allocate tx desc type %d size %d\n", + type, desc_size); + if (lntmsg->msg_vmflush) + cfs_memory_pressure_restore(mpflag); + return (-ENOMEM); } - atomic_inc(&ksocknal_data.ksnd_nactive_ltxs); - - ltx->ltx_desc_size = desc_size; - - /* We always have 1 mapped frag for the header */ - ltx->ltx_tx.tx_iov = ltx->ltx_iov; - ltx->ltx_iov[0].iov_base = <x->ltx_hdr; - ltx->ltx_iov[0].iov_len = sizeof(*hdr); - ltx->ltx_hdr = *hdr; - - ltx->ltx_private = private; - ltx->ltx_cookie = cookie; - - ltx->ltx_tx.tx_isfwd = 0; - ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob; + tx->tx_conn = NULL; /* set when assigned a conn */ + tx->tx_lnetmsg = lntmsg; if (payload_iov != NULL) { - /* payload is all mapped */ - ltx->ltx_tx.tx_kiov = NULL; - ltx->ltx_tx.tx_nkiov = 0; - - ltx->ltx_tx.tx_niov = - 1 + lib_extract_iov(payload_niov, <x->ltx_iov[1], - payload_niov, payload_iov, - payload_offset, payload_nob); + tx->tx_kiov = NULL; + tx->tx_nkiov = 0; + tx->tx_iov = tx->tx_frags.virt.iov; + tx->tx_niov = 1 + + lnet_extract_iov(payload_niov, &tx->tx_iov[1], + payload_niov, payload_iov, + payload_offset, payload_nob); } else { - /* payload is all pages */ - ltx->ltx_tx.tx_niov = 1; - - ltx->ltx_tx.tx_kiov = ltx->ltx_kiov; - ltx->ltx_tx.tx_nkiov = - lib_extract_kiov(payload_niov, ltx->ltx_kiov, - payload_niov, payload_kiov, - payload_offset, payload_nob); + tx->tx_niov = 1; + tx->tx_iov = &tx->tx_frags.paged.iov; + tx->tx_kiov = tx->tx_frags.paged.kiov; + tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov, + payload_niov, payload_kiov, + payload_offset, payload_nob); + + if (payload_nob >= *ksocknal_tunables.ksnd_zc_min_payload) + tx->tx_zc_capable = 1; } - rc = ksocknal_launch_packet(<x->ltx_tx, nid); - if (rc == 0) - return (PTL_OK); - - ksocknal_free_ltx(ltx); - return (PTL_FAIL); -} + tx->tx_msg.ksm_csum = 0; + tx->tx_msg.ksm_type = KSOCK_MSG_LNET; + tx->tx_msg.ksm_zc_cookies[0] = 0; + tx->tx_msg.ksm_zc_cookies[1] = 0; -ptl_err_t -ksocknal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int payload_niov, struct iovec *payload_iov, - size_t payload_offset, size_t payload_len) -{ - return (ksocknal_sendmsg(nal, private, cookie, - hdr, type, nid, pid, - payload_niov, payload_iov, NULL, - payload_offset, payload_len)); -} + /* The first fragment will be set later in pro_pack */ + rc = ksocknal_launch_packet(ni, tx, target); + if (!mpflag) + cfs_memory_pressure_restore(mpflag); -ptl_err_t -ksocknal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int payload_niov, ptl_kiov_t *payload_kiov, - size_t payload_offset, size_t payload_len) -{ - return (ksocknal_sendmsg(nal, private, cookie, - hdr, type, nid, pid, - payload_niov, 
NULL, payload_kiov, - payload_offset, payload_len)); -} + if (rc == 0) + return (0); -void -ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) -{ - ptl_nid_t nid = fwd->kprfd_gateway_nid; - ksock_ftx_t *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch; - int rc; - - CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd, - fwd->kprfd_gateway_nid, fwd->kprfd_target_nid); - - /* I'm the gateway; must be the last hop */ - if (nid == ksocknal_lib.libnal_ni.ni_pid.nid) - nid = fwd->kprfd_target_nid; - - /* setup iov for hdr */ - ftx->ftx_iov.iov_base = fwd->kprfd_hdr; - ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t); - - ftx->ftx_tx.tx_isfwd = 1; /* This is a forwarding packet */ - ftx->ftx_tx.tx_nob = sizeof(ptl_hdr_t) + fwd->kprfd_nob; - ftx->ftx_tx.tx_niov = 1; - ftx->ftx_tx.tx_iov = &ftx->ftx_iov; - ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov; - ftx->ftx_tx.tx_kiov = fwd->kprfd_kiov; - - rc = ksocknal_launch_packet (&ftx->ftx_tx, nid); - if (rc != 0) - kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc); + ksocknal_free_tx(tx); + return (-EIO); } int -ksocknal_thread_start (int (*fn)(void *arg), void *arg) +ksocknal_thread_start(int (*fn)(void *arg), void *arg, char *name) { - long pid = kernel_thread (fn, arg, 0); + struct task_struct *task = kthread_run(fn, arg, name); - if (pid < 0) - return ((int)pid); + if (IS_ERR(task)) + return PTR_ERR(task); - atomic_inc (&ksocknal_data.ksnd_nthreads); - return (0); + write_lock_bh(&ksocknal_data.ksnd_global_lock); + ksocknal_data.ksnd_nthreads++; + write_unlock_bh(&ksocknal_data.ksnd_global_lock); + return 0; } void ksocknal_thread_fini (void) { - atomic_dec (&ksocknal_data.ksnd_nthreads); + write_lock_bh(&ksocknal_data.ksnd_global_lock); + ksocknal_data.ksnd_nthreads--; + write_unlock_bh(&ksocknal_data.ksnd_global_lock); } -void -ksocknal_fmb_callback (void *arg, int error) +int +ksocknal_new_packet(struct ksock_conn *conn, int nob_to_skip) { - ksock_fmb_t *fmb = (ksock_fmb_t *)arg; - ksock_fmb_pool_t *fmp = fmb->fmb_pool; - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page); - ksock_conn_t *conn = NULL; - ksock_sched_t *sched; - unsigned long flags; - char ipbuf[PTL_NALFMT_SIZE]; - char ipbuf2[PTL_NALFMT_SIZE]; - - if (error != 0) - CERROR("Failed to route packet from " - LPX64" %s to "LPX64" %s: %d\n", - NTOH__u64(hdr->src_nid), - portals_nid2str(SOCKNAL, NTOH__u64(hdr->src_nid), ipbuf), - NTOH__u64(hdr->dest_nid), - portals_nid2str(SOCKNAL, NTOH__u64(hdr->dest_nid), ipbuf2), - error); - else - CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n", - NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid)); - - /* drop peer ref taken on init */ - ksocknal_put_peer (fmb->fmb_peer); - - spin_lock_irqsave (&fmp->fmp_lock, flags); - - list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs); - fmp->fmp_nactive_fmbs--; - - if (!list_empty (&fmp->fmp_blocked_conns)) { - conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next, - ksock_conn_t, ksnc_rx_list); - list_del (&conn->ksnc_rx_list); + static char ksocknal_slop_buffer[4096]; + int nob; + unsigned int niov; + int skipped; + + LASSERT(conn->ksnc_proto != NULL); + + if ((*ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0) { + /* Remind the socket to ack eagerly... 
*/ + ksocknal_lib_eager_ack(conn); } - spin_unlock_irqrestore (&fmp->fmp_lock, flags); + if (nob_to_skip == 0) { /* right at next packet boundary now */ + conn->ksnc_rx_started = 0; + smp_mb(); /* racing with timeout thread */ - if (conn == NULL) - return; - - CDEBUG (D_NET, "Scheduling conn %p\n", conn); - LASSERT (conn->ksnc_rx_scheduled); - LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP); - - conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB; - - sched = conn->ksnc_scheduler; - - spin_lock_irqsave (&sched->kss_lock, flags); - - list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns); - wake_up (&sched->kss_waitq); - - spin_unlock_irqrestore (&sched->kss_lock, flags); -} - -ksock_fmb_t * -ksocknal_get_idle_fmb (ksock_conn_t *conn) -{ - int payload_nob = conn->ksnc_rx_nob_left; - unsigned long flags; - ksock_fmb_pool_t *pool; - ksock_fmb_t *fmb; - - LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB); - LASSERT (kpr_routing(&ksocknal_data.ksnd_router)); - - if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE) - pool = &ksocknal_data.ksnd_small_fmp; - else - pool = &ksocknal_data.ksnd_large_fmp; - - spin_lock_irqsave (&pool->fmp_lock, flags); - - if (!list_empty (&pool->fmp_idle_fmbs)) { - fmb = list_entry(pool->fmp_idle_fmbs.next, - ksock_fmb_t, fmb_list); - list_del (&fmb->fmb_list); - pool->fmp_nactive_fmbs++; - spin_unlock_irqrestore (&pool->fmp_lock, flags); - - return (fmb); - } - - /* deschedule until fmb free */ - - conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP; - - list_add_tail (&conn->ksnc_rx_list, - &pool->fmp_blocked_conns); - - spin_unlock_irqrestore (&pool->fmp_lock, flags); - return (NULL); -} - -int -ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb) -{ - int payload_nob = conn->ksnc_rx_nob_left; - ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid); - int niov = 0; - int nob = payload_nob; - - LASSERT (conn->ksnc_rx_scheduled); - LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB); - LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left); - LASSERT (payload_nob >= 0); - LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE); - LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE); - LASSERT (fmb->fmb_kiov[0].kiov_offset == 0); - - /* Take a ref on the conn's peer to prevent module unload before - * forwarding completes. */ - fmb->fmb_peer = conn->ksnc_peer; - atomic_inc (&conn->ksnc_peer->ksnp_refcount); - - /* Copy the header we just read into the forwarding buffer. If - * there's payload, start reading reading it into the buffer, - * otherwise the forwarding buffer can be kicked off - * immediately. 
*/ - fmb->fmb_hdr = conn->ksnc_hdr; - - while (nob > 0) { - LASSERT (niov < fmb->fmb_pool->fmp_buff_pages); - LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0); - fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob); - nob -= PAGE_SIZE; - niov++; - } - - kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr, - payload_nob, niov, fmb->fmb_kiov, - ksocknal_fmb_callback, fmb); - - if (payload_nob == 0) { /* got complete packet already */ - CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n", - conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid); - - kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd); - - ksocknal_new_packet (conn, 0); /* on to next packet */ - return (1); - } + switch (conn->ksnc_proto->pro_version) { + case KSOCK_PROTO_V2: + case KSOCK_PROTO_V3: + conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER; + conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space; + conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg; - conn->ksnc_cookie = fmb; /* stash fmb for later */ - conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */ - - /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed - * buffer */ - LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t)); - - conn->ksnc_rx_niov = 0; - conn->ksnc_rx_nkiov = niov; - conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov; - memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t)); - - CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn, - NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob); - return (0); -} - -void -ksocknal_fwd_parse (ksock_conn_t *conn) -{ - ksock_peer_t *peer; - ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid); - ptl_nid_t src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid); - int body_len = NTOH__u32 (conn->ksnc_hdr.payload_length); - char str[PTL_NALFMT_SIZE]; - char str2[PTL_NALFMT_SIZE]; - - CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn, - src_nid, dest_nid, conn->ksnc_rx_nob_left); - - LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER); - LASSERT (conn->ksnc_rx_scheduled); - - if (body_len < 0) { /* length corrupt (overflow) */ - CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): " - "packet size %d illegal\n", - src_nid, portals_nid2str(TCPNAL, src_nid, str), - dest_nid, portals_nid2str(TCPNAL, dest_nid, str2), - body_len); - - ksocknal_new_packet (conn, 0); /* on to new packet */ - return; - } - - if (!kpr_routing(&ksocknal_data.ksnd_router)) { /* not forwarding */ - CERROR("dropping packet from "LPX64" (%s) for "LPX64 - " (%s): not forwarding\n", - src_nid, portals_nid2str(TCPNAL, src_nid, str), - dest_nid, portals_nid2str(TCPNAL, dest_nid, str2)); - /* on to new packet (skip this one's body) */ - ksocknal_new_packet (conn, body_len); - return; - } - - if (body_len > PTL_MTU) { /* too big to forward */ - CERROR ("dropping packet from "LPX64" (%s) for "LPX64 - "(%s): packet size %d too big\n", - src_nid, portals_nid2str(TCPNAL, src_nid, str), - dest_nid, portals_nid2str(TCPNAL, dest_nid, str2), - body_len); - /* on to new packet (skip this one's body) */ - ksocknal_new_packet (conn, body_len); - return; - } + conn->ksnc_rx_nob_wanted = offsetof(struct ksock_msg, ksm_u); + conn->ksnc_rx_nob_left = offsetof(struct ksock_msg, ksm_u); + conn->ksnc_rx_iov[0].iov_len = offsetof(struct ksock_msg, ksm_u); + break; - /* should have gone direct */ - peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid); - if (peer != NULL) { - CERROR ("dropping packet from "LPX64" (%s) for "LPX64 - "(%s): target is a peer\n", - src_nid, 
portals_nid2str(TCPNAL, src_nid, str), - dest_nid, portals_nid2str(TCPNAL, dest_nid, str2)); - ksocknal_put_peer (peer); /* drop ref from get above */ - - /* on to next packet (skip this one's body) */ - ksocknal_new_packet (conn, body_len); - return; - } + case KSOCK_PROTO_V1: + /* Receiving bare struct lnet_hdr */ + conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER; + conn->ksnc_rx_nob_wanted = sizeof(struct lnet_hdr); + conn->ksnc_rx_nob_left = sizeof(struct lnet_hdr); - conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB; /* Getting FMB now */ - conn->ksnc_rx_nob_left = body_len; /* stash packet size */ - conn->ksnc_rx_nob_wanted = body_len; /* (no slop) */ -} - -int -ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip) -{ - static char ksocknal_slop_buffer[4096]; + conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space; + conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg; + conn->ksnc_rx_iov[0].iov_len = sizeof(struct lnet_hdr); + break; - int nob; - int niov; - int skipped; - - if (nob_to_skip == 0) { /* right at next packet boundary now */ - conn->ksnc_rx_started = 0; - mb (); /* racing with timeout thread */ - - conn->ksnc_rx_state = SOCKNAL_RX_HEADER; - conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t); - conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t); - - conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; - conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr; - conn->ksnc_rx_iov[0].iov_len = sizeof (ptl_hdr_t); + default: + LBUG (); + } conn->ksnc_rx_niov = 1; conn->ksnc_rx_kiov = NULL; conn->ksnc_rx_nkiov = 0; + conn->ksnc_rx_csum = ~0; return (1); } - /* Set up to skip as much a possible now. If there's more left + /* Set up to skip as much as possible now. If there's more left * (ran out of iov entries) we'll get called again */ conn->ksnc_rx_state = SOCKNAL_RX_SLOP; conn->ksnc_rx_nob_left = nob_to_skip; - conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; + conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space; skipped = 0; niov = 0; @@ -1388,7 +1141,7 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip) nob_to_skip -=nob; } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */ - niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec)); + niov < sizeof(conn->ksnc_rx_iov_space) / sizeof(struct kvec)); conn->ksnc_rx_niov = niov; conn->ksnc_rx_kiov = NULL; @@ -1397,128 +1150,203 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip) return (0); } -int -ksocknal_process_receive (ksock_conn_t *conn) +static int +ksocknal_process_receive(struct ksock_conn *conn) { - ksock_fmb_t *fmb; - int rc; - - LASSERT (atomic_read (&conn->ksnc_refcount) > 0); - - /* doesn't need a forwarding buffer */ - if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB) - goto try_read; - - get_fmb: - fmb = ksocknal_get_idle_fmb (conn); - if (fmb == NULL) { - /* conn descheduled waiting for idle fmb */ - return (0); - } + struct lnet_hdr *lhdr; + struct lnet_process_id *id; + int rc; - if (ksocknal_init_fmb (conn, fmb)) { - /* packet forwarded */ - return (0); - } + LASSERT (atomic_read(&conn->ksnc_conn_refcount) > 0); - try_read: - /* NB: sched lock NOT held */ - LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER || - conn->ksnc_rx_state == SOCKNAL_RX_BODY || - conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD || + /* NB: sched lock NOT held */ + /* SOCKNAL_RX_LNET_HEADER is here for backward compatibility */ + LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER || + conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD || + conn->ksnc_rx_state == 
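
The slop path set up above discards bytes the receiver does not want: ksocknal_new_packet() points the receive iov entries at a static 4 KB scratch buffer and returns 0 so it gets called again if the packet is larger than one iov array can cover. The stand-alone sketch below shows only that trick; setup_skip_iov(), SLOP_SIZE and MAX_IOV are illustrative names, not socklnd symbols, and the real fill loop lives in the elided part of the hunk.

#include <stdio.h>
#include <sys/uio.h>

#define SLOP_SIZE 4096          /* shared scratch buffer, like ksocknal_slop_buffer */
#define MAX_IOV   8             /* bound on scatter entries, like rx_iov_space */

static char slop_buffer[SLOP_SIZE];

/* Build an iovec array that can absorb up to 'nob_to_skip' unwanted bytes.
 * Every entry aliases the same scratch buffer because the data is thrown
 * away anyway.  Returns the number of entries used; *skipped is how many
 * bytes this pass covers (a caller loops if the packet is even larger). */
static int setup_skip_iov(struct iovec *iov, int max_iov,
                          size_t nob_to_skip, size_t *skipped)
{
        int niov = 0;

        *skipped = 0;
        while (nob_to_skip > 0 && niov < max_iov) {
                size_t nob = nob_to_skip < SLOP_SIZE ? nob_to_skip : SLOP_SIZE;

                iov[niov].iov_base = slop_buffer;
                iov[niov].iov_len  = nob;
                niov++;
                *skipped += nob;
                nob_to_skip -= nob;
        }
        return niov;
}

int main(void)
{
        struct iovec iov[MAX_IOV];
        size_t skipped;
        int niov = setup_skip_iov(iov, MAX_IOV, 10000, &skipped);

        printf("using %d iov entries to skip %zu bytes\n", niov, skipped);
        return 0;
}
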
SOCKNAL_RX_LNET_HEADER || conn->ksnc_rx_state == SOCKNAL_RX_SLOP); + again: + if (conn->ksnc_rx_nob_wanted != 0) { + rc = ksocknal_receive(conn); + + if (rc <= 0) { + struct lnet_process_id ksnp_id; + + ksnp_id = conn->ksnc_peer->ksnp_id; + + LASSERT(rc != -EAGAIN); + if (rc == 0) + CDEBUG(D_NET, "[%p] EOF from %s " + "ip %pI4h:%d\n", conn, + libcfs_id2str(ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port); + else if (!conn->ksnc_closing) + CERROR("[%p] Error %d on read from %s " + "ip %pI4h:%d\n", conn, rc, + libcfs_id2str(ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port); + + /* it's not an error if conn is being closed */ + ksocknal_close_conn_and_siblings (conn, + (conn->ksnc_closing) ? 0 : rc); + return (rc == 0 ? -ESHUTDOWN : rc); + } - LASSERT (conn->ksnc_rx_nob_wanted > 0); + if (conn->ksnc_rx_nob_wanted != 0) { + /* short read */ + return (-EAGAIN); + } + } + switch (conn->ksnc_rx_state) { + case SOCKNAL_RX_KSM_HEADER: + if (conn->ksnc_flip) { + __swab32s(&conn->ksnc_msg.ksm_type); + __swab32s(&conn->ksnc_msg.ksm_csum); + __swab64s(&conn->ksnc_msg.ksm_zc_cookies[0]); + __swab64s(&conn->ksnc_msg.ksm_zc_cookies[1]); + } - rc = ksocknal_receive(conn); + if (conn->ksnc_msg.ksm_type != KSOCK_MSG_NOOP && + conn->ksnc_msg.ksm_type != KSOCK_MSG_LNET) { + CERROR("%s: Unknown message type: %x\n", + libcfs_id2str(conn->ksnc_peer->ksnp_id), + conn->ksnc_msg.ksm_type); + ksocknal_new_packet(conn, 0); + ksocknal_close_conn_and_siblings(conn, -EPROTO); + return (-EPROTO); + } - if (rc <= 0) { - LASSERT (rc != -EAGAIN); + if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP && + conn->ksnc_msg.ksm_csum != 0 && /* has checksum */ + conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) { + /* NOOP Checksum error */ + CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n", + libcfs_id2str(conn->ksnc_peer->ksnp_id), + conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum); + ksocknal_new_packet(conn, 0); + ksocknal_close_conn_and_siblings(conn, -EPROTO); + return (-EIO); + } - if (rc == 0) - CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n", - conn, conn->ksnc_peer->ksnp_nid, - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); - else if (!conn->ksnc_closing) - CERROR ("[%p] Error %d on read from "LPX64 - " ip %d.%d.%d.%d:%d\n", - conn, rc, conn->ksnc_peer->ksnp_nid, - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); + if (conn->ksnc_msg.ksm_zc_cookies[1] != 0) { + __u64 cookie = 0; - ksocknal_close_conn_and_siblings (conn, rc); - return (rc == 0 ? 
-ESHUTDOWN : rc); - } + LASSERT (conn->ksnc_proto != &ksocknal_protocol_v1x); - if (conn->ksnc_rx_nob_wanted != 0) { - /* short read */ - return (-EAGAIN); - } - - switch (conn->ksnc_rx_state) { - case SOCKNAL_RX_HEADER: - if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) && - NTOH__u64(conn->ksnc_hdr.dest_nid) != - ksocknal_lib.libnal_ni.ni_pid.nid) { - /* This packet isn't for me */ - ksocknal_fwd_parse (conn); - switch (conn->ksnc_rx_state) { - case SOCKNAL_RX_HEADER: /* skipped (zero payload) */ - return (0); /* => come back later */ - case SOCKNAL_RX_SLOP: /* skipping packet's body */ - goto try_read; /* => go read it */ - case SOCKNAL_RX_GET_FMB: /* forwarding */ - goto get_fmb; /* => go get a fwd msg buffer */ - default: - LBUG (); + if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP) + cookie = conn->ksnc_msg.ksm_zc_cookies[0]; + + rc = conn->ksnc_proto->pro_handle_zcack(conn, cookie, + conn->ksnc_msg.ksm_zc_cookies[1]); + + if (rc != 0) { + CERROR("%s: Unknown ZC-ACK cookie: %llu, %llu\n", + libcfs_id2str(conn->ksnc_peer->ksnp_id), + cookie, conn->ksnc_msg.ksm_zc_cookies[1]); + ksocknal_new_packet(conn, 0); + ksocknal_close_conn_and_siblings(conn, -EPROTO); + return (rc); } - /* Not Reached */ } - /* sets wanted_len, iovs etc */ - rc = lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn); + if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP) { + ksocknal_new_packet (conn, 0); + return 0; /* NOOP is done and just return */ + } + + conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER; + conn->ksnc_rx_nob_wanted = sizeof(struct ksock_lnet_msg); + conn->ksnc_rx_nob_left = sizeof(struct ksock_lnet_msg); + + conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space; + conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg; + conn->ksnc_rx_iov[0].iov_len = sizeof(struct ksock_lnet_msg); + + conn->ksnc_rx_niov = 1; + conn->ksnc_rx_kiov = NULL; + conn->ksnc_rx_nkiov = 0; + + goto again; /* read lnet header now */ + + case SOCKNAL_RX_LNET_HEADER: + /* unpack message header */ + conn->ksnc_proto->pro_unpack(&conn->ksnc_msg); + + if ((conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) != 0) { + /* Userspace peer_ni */ + lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr; + id = &conn->ksnc_peer->ksnp_id; + + /* Substitute process ID assigned at connection time */ + lhdr->src_pid = cpu_to_le32(id->pid); + lhdr->src_nid = cpu_to_le64(id->nid); + } + + conn->ksnc_rx_state = SOCKNAL_RX_PARSE; + ksocknal_conn_addref(conn); /* ++ref while parsing */ - if (rc != PTL_OK) { + rc = lnet_parse(conn->ksnc_peer->ksnp_ni, + &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr, + conn->ksnc_peer->ksnp_id.nid, conn, 0); + if (rc < 0) { /* I just received garbage: give up on this conn */ + ksocknal_new_packet(conn, 0); ksocknal_close_conn_and_siblings (conn, rc); + ksocknal_conn_decref(conn); return (-EPROTO); } - if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */ - conn->ksnc_rx_state = SOCKNAL_RX_BODY; - goto try_read; /* go read the payload */ - } - /* Fall through (completed packet for me) */ + /* I'm racing with ksocknal_recv() */ + LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_PARSE || + conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD); - case SOCKNAL_RX_BODY: - /* payload all received */ - lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK); - /* Fall through */ + if (conn->ksnc_rx_state != SOCKNAL_RX_LNET_PAYLOAD) + return 0; - case SOCKNAL_RX_SLOP: - /* starting new packet? 
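
The NOOP checksum test above compares ksm_csum from the wire against ksnc_rx_csum, which the receive path accumulates fragment by fragment (note the ~0 seed set in ksocknal_new_packet()). The sketch below shows only that accumulate-then-compare flow; csum_update() is a deliberately trivial stand-in, not the checksum socklnd actually computes, and the structure and function names are invented for the example.

#include <stdio.h>
#include <stdint.h>

/* Toy checksum: NOT the socklnd algorithm, just a stand-in for the flow. */
static uint32_t csum_update(uint32_t csum, const void *data, size_t len)
{
        const uint8_t *p = data;

        while (len--)
                csum = csum * 31 + *p++;
        return csum;
}

struct rx_state {
        uint32_t rx_csum;       /* accumulated over every received fragment */
};

/* Called for each fragment as it is read from the socket. */
static void rx_consume(struct rx_state *rx, const void *frag, size_t len)
{
        rx->rx_csum = csum_update(rx->rx_csum, frag, len);
}

/* Called once the whole message is in: compare with the sender's value.
 * A zero wire checksum means the sender didn't checksum at all. */
static int rx_verify(const struct rx_state *rx, uint32_t wire_csum)
{
        if (wire_csum != 0 && wire_csum != rx->rx_csum)
                return -1;      /* mismatch: the connection would be closed */
        return 0;
}

int main(void)
{
        struct rx_state rx = { .rx_csum = ~0U };
        const char msg[] = "payload bytes";
        uint32_t wire;

        /* The sender computes the same thing before transmit. */
        wire = csum_update(~0U, msg, sizeof(msg));

        rx_consume(&rx, msg, sizeof(msg));
        printf("verify: %s\n", rx_verify(&rx, wire) == 0 ? "ok" : "mismatch");
        return 0;
}
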
*/ - if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left)) - return (0); /* come back later */ - goto try_read; /* try to finish reading slop now */ + /* ksocknal_recv() got called */ + goto again; - case SOCKNAL_RX_BODY_FWD: + case SOCKNAL_RX_LNET_PAYLOAD: /* payload all received */ - CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n", - conn, NTOH__u64 (conn->ksnc_hdr.src_nid), - NTOH__u64 (conn->ksnc_hdr.dest_nid), - conn->ksnc_rx_nob_left); + rc = 0; - /* forward the packet. NB ksocknal_init_fmb() put fmb into - * conn->ksnc_cookie */ - fmb = (ksock_fmb_t *)conn->ksnc_cookie; - kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd); + if (conn->ksnc_rx_nob_left == 0 && /* not truncating */ + conn->ksnc_msg.ksm_csum != 0 && /* has checksum */ + conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) { + CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n", + libcfs_id2str(conn->ksnc_peer->ksnp_id), + conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum); + rc = -EIO; + } + + if (rc == 0 && conn->ksnc_msg.ksm_zc_cookies[0] != 0) { + LASSERT(conn->ksnc_proto != &ksocknal_protocol_v1x); - /* no slop in forwarded packets */ - LASSERT (conn->ksnc_rx_nob_left == 0); + lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr; + id = &conn->ksnc_peer->ksnp_id; + + rc = conn->ksnc_proto->pro_handle_zcreq(conn, + conn->ksnc_msg.ksm_zc_cookies[0], + *ksocknal_tunables.ksnd_nonblk_zcack || + le64_to_cpu(lhdr->src_nid) != id->nid); + } + + lnet_finalize(conn->ksnc_cookie, rc); + + if (rc != 0) { + ksocknal_new_packet(conn, 0); + ksocknal_close_conn_and_siblings (conn, rc); + return (-EPROTO); + } + /* Fall through */ - ksocknal_new_packet (conn, 0); /* on to next packet */ - return (0); /* (later) */ + case SOCKNAL_RX_SLOP: + /* starting new packet? */ + if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left)) + return 0; /* come back later */ + goto again; /* try to finish reading slop now */ default: break; @@ -1529,100 +1357,112 @@ ksocknal_process_receive (ksock_conn_t *conn) return (-EINVAL); /* keep gcc happy */ } -ptl_err_t -ksocknal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, struct iovec *iov, - size_t offset, size_t mlen, size_t rlen) +int +ksocknal_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg, + int delayed, unsigned int niov, struct kvec *iov, + lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, + unsigned int rlen) { - ksock_conn_t *conn = (ksock_conn_t *)private; + struct ksock_conn *conn = private; + struct ksock_sched *sched = conn->ksnc_scheduler; LASSERT (mlen <= rlen); - LASSERT (niov <= PTL_MD_MAX_IOV); - + LASSERT (niov <= LNET_MAX_IOV); + conn->ksnc_cookie = msg; conn->ksnc_rx_nob_wanted = mlen; conn->ksnc_rx_nob_left = rlen; - conn->ksnc_rx_nkiov = 0; - conn->ksnc_rx_kiov = NULL; - conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov; - conn->ksnc_rx_niov = - lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov, - niov, iov, offset, mlen); + if (mlen == 0 || iov != NULL) { + conn->ksnc_rx_nkiov = 0; + conn->ksnc_rx_kiov = NULL; + conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov; + conn->ksnc_rx_niov = + lnet_extract_iov(LNET_MAX_IOV, conn->ksnc_rx_iov, + niov, iov, offset, mlen); + } else { + conn->ksnc_rx_niov = 0; + conn->ksnc_rx_iov = NULL; + conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov; + conn->ksnc_rx_nkiov = + lnet_extract_kiov(LNET_MAX_IOV, conn->ksnc_rx_kiov, + niov, kiov, offset, mlen); + } + + LASSERT (mlen == + lnet_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) + + lnet_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov)); + + 
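
The ksm_zc_cookies handling above is the bookkeeping side of zero-copy sends: a transmit that used zero-copy stays pinned until the peer acknowledges its cookie, and an unknown cookie is treated as a protocol error. A reduced model of that pattern follows; struct zc_tx, zc_send() and zc_handle_ack() are invented for the example, and the single unlocked list stands in for socklnd's per-peer, spinlock-protected ksnp_zc_req_list.

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

/* A pending zero-copy transmit, pinned until the peer ACKs its cookie. */
struct zc_tx {
        uint64_t        cookie;
        struct zc_tx   *next;
};

static struct zc_tx *zc_pending;        /* would be per-peer and locked */
static uint64_t      zc_next_cookie = 1;

static uint64_t zc_send(void)
{
        struct zc_tx *tx = malloc(sizeof(*tx));

        tx->cookie = zc_next_cookie++;
        tx->next = zc_pending;
        zc_pending = tx;
        return tx->cookie;      /* carried to the peer in the message header */
}

/* Peer echoed 'cookie' back in a ZC-ACK: the buffers can be released now. */
static int zc_handle_ack(uint64_t cookie)
{
        struct zc_tx **p;

        for (p = &zc_pending; *p != NULL; p = &(*p)->next) {
                if ((*p)->cookie == cookie) {
                        struct zc_tx *tx = *p;

                        *p = tx->next;
                        free(tx);
                        return 0;
                }
        }
        return -1;              /* unknown cookie: protocol error */
}

int main(void)
{
        uint64_t c = zc_send();

        printf("ack %llu -> %d\n", (unsigned long long)c, zc_handle_ack(c));
        printf("ack 99  -> %d\n", zc_handle_ack(99));
        return 0;
}
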
LASSERT (conn->ksnc_rx_scheduled); + + spin_lock_bh(&sched->kss_lock); - LASSERT (mlen == - lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) + - lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov)); + switch (conn->ksnc_rx_state) { + case SOCKNAL_RX_PARSE_WAIT: + list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns); + wake_up(&sched->kss_waitq); + LASSERT(conn->ksnc_rx_ready); + break; - return (PTL_OK); + case SOCKNAL_RX_PARSE: + /* scheduler hasn't noticed I'm parsing yet */ + break; + } + + conn->ksnc_rx_state = SOCKNAL_RX_LNET_PAYLOAD; + + spin_unlock_bh(&sched->kss_lock); + ksocknal_conn_decref(conn); + return 0; } -ptl_err_t -ksocknal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, ptl_kiov_t *kiov, - size_t offset, size_t mlen, size_t rlen) +static inline int +ksocknal_sched_cansleep(struct ksock_sched *sched) { - ksock_conn_t *conn = (ksock_conn_t *)private; + int rc; - LASSERT (mlen <= rlen); - LASSERT (niov <= PTL_MD_MAX_IOV); - - conn->ksnc_cookie = msg; - conn->ksnc_rx_nob_wanted = mlen; - conn->ksnc_rx_nob_left = rlen; + spin_lock_bh(&sched->kss_lock); - conn->ksnc_rx_niov = 0; - conn->ksnc_rx_iov = NULL; - conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov; - conn->ksnc_rx_nkiov = - lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov, - niov, kiov, offset, mlen); + rc = (!ksocknal_data.ksnd_shuttingdown && + list_empty(&sched->kss_rx_conns) && + list_empty(&sched->kss_tx_conns)); - LASSERT (mlen == - lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) + - lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov)); - - return (PTL_OK); + spin_unlock_bh(&sched->kss_lock); + return rc; } -int ksocknal_scheduler (void *arg) +int ksocknal_scheduler(void *arg) { - ksock_sched_t *sched = (ksock_sched_t *)arg; - ksock_conn_t *conn; - ksock_tx_t *tx; - unsigned long flags; - int rc; - int nloops = 0; - int id = sched - ksocknal_data.ksnd_schedulers; - char name[16]; - - snprintf (name, sizeof (name),"ksocknald_%02d", id); - kportal_daemonize (name); - kportal_blockallsigs (); - -#if (CONFIG_SMP && CPU_AFFINITY) - if ((cpu_online_map & (1 << id)) != 0) { -#if 1 - current->cpus_allowed = (1 << id); -#else - set_cpus_allowed (current, 1<kss_lock, flags); + struct ksock_sched_info *info; + struct ksock_sched *sched; + struct ksock_conn *conn; + struct ksock_tx *tx; + int rc; + int nloops = 0; + long id = (long)arg; + + info = ksocknal_data.ksnd_sched_info[KSOCK_THREAD_CPT(id)]; + sched = &info->ksi_scheds[KSOCK_THREAD_SID(id)]; + + cfs_block_allsigs(); + + rc = cfs_cpt_bind(lnet_cpt_table(), info->ksi_cpt); + if (rc != 0) { + CWARN("Can't set CPU partition affinity to %d: %d\n", + info->ksi_cpt, rc); + } + + spin_lock_bh(&sched->kss_lock); while (!ksocknal_data.ksnd_shuttingdown) { int did_something = 0; /* Ensure I progress everything semi-fairly */ - if (!list_empty (&sched->kss_rx_conns)) { - conn = list_entry(sched->kss_rx_conns.next, - ksock_conn_t, ksnc_rx_list); - list_del(&conn->ksnc_rx_list); + if (!list_empty(&sched->kss_rx_conns)) { + conn = list_entry(sched->kss_rx_conns.next, + struct ksock_conn, ksnc_rx_list); + list_del(&conn->ksnc_rx_list); LASSERT(conn->ksnc_rx_scheduled); LASSERT(conn->ksnc_rx_ready); @@ -1632,11 +1472,11 @@ int ksocknal_scheduler (void *arg) * data_ready can set it any time after we release * kss_lock. 
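
ksocknal_recv() above turns the (offset, mlen) window requested by LNet into a fresh scatter list via lnet_extract_iov()/lnet_extract_kiov(). The essence of that extraction is sketched below with userspace struct iovec; extract_iov() is a simplified stand-in written for this example, not the LNet helper itself.

#include <assert.h>
#include <stdio.h>
#include <sys/uio.h>

/* Describe the [offset, offset+len) window of 'src' (nsrc entries) in 'dst',
 * which has room for at most 'dst_max' entries.  Returns entries used. */
static unsigned int extract_iov(unsigned int dst_max, struct iovec *dst,
                                unsigned int nsrc, const struct iovec *src,
                                size_t offset, size_t len)
{
        unsigned int n = 0;

        if (len == 0)
                return 0;

        /* Skip whole source entries covered by the offset. */
        while (offset >= src->iov_len) {
                offset -= src->iov_len;
                src++;
                nsrc--;
                assert(nsrc > 0);
        }

        while (len > 0) {
                size_t frag = src->iov_len - offset;

                if (frag > len)
                        frag = len;
                assert(n < dst_max && nsrc > 0);

                dst[n].iov_base = (char *)src->iov_base + offset;
                dst[n].iov_len  = frag;
                n++;
                len -= frag;
                offset = 0;
                src++;
                nsrc--;
        }
        return n;
}

int main(void)
{
        char a[100], b[100];
        struct iovec src[2] = { { a, sizeof(a) }, { b, sizeof(b) } };
        struct iovec dst[2];
        unsigned int n = extract_iov(2, dst, 2, src, 50, 120);

        printf("%u entries: %zu + %zu bytes\n", n,
               dst[0].iov_len, n > 1 ? dst[1].iov_len : 0);
        return 0;
}
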
*/ conn->ksnc_rx_ready = 0; - spin_unlock_irqrestore(&sched->kss_lock, flags); - - rc = ksocknal_process_receive(conn); - - spin_lock_irqsave(&sched->kss_lock, flags); + spin_unlock_bh(&sched->kss_lock); + + rc = ksocknal_process_receive(conn); + + spin_lock_bh(&sched->kss_lock); /* I'm the only one that can clear this flag */ LASSERT(conn->ksnc_rx_scheduled); @@ -1644,57 +1484,77 @@ int ksocknal_scheduler (void *arg) /* Did process_receive get everything it wanted? */ if (rc == 0) conn->ksnc_rx_ready = 1; - - if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP || - conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) { - /* Conn blocked for a forwarding buffer. - * It will get queued for my attention when - * one becomes available (and it might just - * already have been!). Meanwhile my ref - * on it stays put. */ + + if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) { + /* Conn blocked waiting for ksocknal_recv() + * I change its state (under lock) to signal + * it can be rescheduled */ + conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT; } else if (conn->ksnc_rx_ready) { /* reschedule for rx */ - list_add_tail (&conn->ksnc_rx_list, - &sched->kss_rx_conns); + list_add_tail(&conn->ksnc_rx_list, + &sched->kss_rx_conns); } else { conn->ksnc_rx_scheduled = 0; /* drop my ref */ - ksocknal_put_conn(conn); + ksocknal_conn_decref(conn); } did_something = 1; } - if (!list_empty (&sched->kss_tx_conns)) { - conn = list_entry(sched->kss_tx_conns.next, - ksock_conn_t, ksnc_tx_list); - list_del (&conn->ksnc_tx_list); - + if (!list_empty(&sched->kss_tx_conns)) { + struct list_head zlist = LIST_HEAD_INIT(zlist); + + if (!list_empty(&sched->kss_zombie_noop_txs)) { + list_add(&zlist, + &sched->kss_zombie_noop_txs); + list_del_init(&sched->kss_zombie_noop_txs); + } + + conn = list_entry(sched->kss_tx_conns.next, + struct ksock_conn, ksnc_tx_list); + list_del(&conn->ksnc_tx_list); + LASSERT(conn->ksnc_tx_scheduled); LASSERT(conn->ksnc_tx_ready); - LASSERT(!list_empty(&conn->ksnc_tx_queue)); - - tx = list_entry(conn->ksnc_tx_queue.next, - ksock_tx_t, tx_list); + LASSERT(!list_empty(&conn->ksnc_tx_queue)); + + tx = list_entry(conn->ksnc_tx_queue.next, + struct ksock_tx, tx_list); + + if (conn->ksnc_tx_carrier == tx) + ksocknal_next_tx_carrier(conn); + /* dequeue now so empty list => more to send */ - list_del(&tx->tx_list); - + list_del(&tx->tx_list); + /* Clear tx_ready in case send isn't complete. Do * it BEFORE we call process_transmit, since * write_space can set it any time after we release * kss_lock. */ conn->ksnc_tx_ready = 0; - spin_unlock_irqrestore (&sched->kss_lock, flags); + spin_unlock_bh(&sched->kss_lock); - rc = ksocknal_process_transmit(conn, tx); + if (!list_empty(&zlist)) { + /* free zombie noop txs, it's fast because + * noop txs are just put in freelist */ + ksocknal_txlist_done(NULL, &zlist, 0); + } - spin_lock_irqsave (&sched->kss_lock, flags); + rc = ksocknal_process_transmit(conn, tx); if (rc == -ENOMEM || rc == -EAGAIN) { /* Incomplete send: replace tx on HEAD of tx_queue */ - list_add (&tx->tx_list, &conn->ksnc_tx_queue); - } else { - /* Complete send; assume space for more */ + spin_lock_bh(&sched->kss_lock); + list_add(&tx->tx_list, + &conn->ksnc_tx_queue); + } else { + /* Complete send; tx -ref */ + ksocknal_tx_decref(tx); + + spin_lock_bh(&sched->kss_lock); + /* assume space for more */ conn->ksnc_tx_ready = 1; } @@ -1702,745 +1562,803 @@ int ksocknal_scheduler (void *arg) /* Do nothing; after a short timeout, this * conn will be reposted on kss_tx_conns. 
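
The scheduler body above holds kss_lock only while it manipulates the queues, drops it around the actual socket work, and requeues a connection at the tail when it still has data pending, so one busy peer cannot starve the others. The same shape, reduced to a generic single-lock work queue; scheduler_pass() and struct work are illustrative, not socklnd code.

#include <pthread.h>
#include <stdio.h>

struct work {
        struct work    *next;
        int             remaining;      /* pretend units of I/O left */
};

static struct work     *queue_head, *queue_tail;
static pthread_mutex_t  queue_lock = PTHREAD_MUTEX_INITIALIZER;

static void enqueue(struct work *w)
{
        w->next = NULL;
        if (queue_tail)
                queue_tail->next = w;
        else
                queue_head = w;
        queue_tail = w;
}

/* One scheduler pass: take one item, work on it WITHOUT the lock held,
 * then requeue it at the tail if it still has work outstanding. */
static int scheduler_pass(void)
{
        struct work *w;

        pthread_mutex_lock(&queue_lock);
        w = queue_head;
        if (w) {
                queue_head = w->next;
                if (!queue_head)
                        queue_tail = NULL;
        }
        pthread_mutex_unlock(&queue_lock);

        if (!w)
                return 0;               /* nothing to do */

        w->remaining--;                 /* the "socket I/O" happens unlocked */

        pthread_mutex_lock(&queue_lock);
        if (w->remaining > 0)
                enqueue(w);             /* tail: be fair to everyone else */
        pthread_mutex_unlock(&queue_lock);
        return 1;
}

int main(void)
{
        struct work a = { .remaining = 2 }, b = { .remaining = 1 };
        int passes = 0;

        enqueue(&a);
        enqueue(&b);
        while (scheduler_pass())
                passes++;
        printf("drained in %d passes\n", passes);
        return 0;
}
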
*/ } else if (conn->ksnc_tx_ready && - !list_empty (&conn->ksnc_tx_queue)) { + !list_empty(&conn->ksnc_tx_queue)) { /* reschedule for tx */ - list_add_tail (&conn->ksnc_tx_list, - &sched->kss_tx_conns); + list_add_tail(&conn->ksnc_tx_list, + &sched->kss_tx_conns); } else { conn->ksnc_tx_scheduled = 0; /* drop my ref */ - ksocknal_put_conn (conn); + ksocknal_conn_decref(conn); } - - did_something = 1; - } -#if SOCKNAL_ZC - if (!list_empty (&sched->kss_zctxdone_list)) { - ksock_tx_t *tx = - list_entry(sched->kss_zctxdone_list.next, - ksock_tx_t, tx_list); - did_something = 1; - list_del (&tx->tx_list); - spin_unlock_irqrestore (&sched->kss_lock, flags); - - ksocknal_tx_done (tx, 1); - - spin_lock_irqsave (&sched->kss_lock, flags); + did_something = 1; } -#endif if (!did_something || /* nothing to do */ ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */ - spin_unlock_irqrestore (&sched->kss_lock, flags); + spin_unlock_bh(&sched->kss_lock); nloops = 0; if (!did_something) { /* wait for something to do */ -#if SOCKNAL_ZC - rc = wait_event_interruptible (sched->kss_waitq, - ksocknal_data.ksnd_shuttingdown || - !list_empty(&sched->kss_rx_conns) || - !list_empty(&sched->kss_tx_conns) || - !list_empty(&sched->kss_zctxdone_list)); -#else - rc = wait_event_interruptible (sched->kss_waitq, - ksocknal_data.ksnd_shuttingdown || - !list_empty(&sched->kss_rx_conns) || - !list_empty(&sched->kss_tx_conns)); -#endif - LASSERT (rc == 0); - } else - our_cond_resched(); - - spin_lock_irqsave (&sched->kss_lock, flags); - } - } - - spin_unlock_irqrestore (&sched->kss_lock, flags); - ksocknal_thread_fini (); - return (0); + rc = wait_event_interruptible_exclusive( + sched->kss_waitq, + !ksocknal_sched_cansleep(sched)); + LASSERT (rc == 0); + } else { + cond_resched(); + } + + spin_lock_bh(&sched->kss_lock); + } + } + + spin_unlock_bh(&sched->kss_lock); + ksocknal_thread_fini(); + return 0; } -void -ksocknal_data_ready (struct sock *sk, int n) +/* + * Add connection to kss_rx_conns of scheduler + * and wakeup the scheduler. + */ +void ksocknal_read_callback(struct ksock_conn *conn) { - unsigned long flags; - ksock_conn_t *conn; - ksock_sched_t *sched; - ENTRY; + struct ksock_sched *sched; + ENTRY; - /* interleave correctly with closing sockets... */ - read_lock (&ksocknal_data.ksnd_global_lock); + sched = conn->ksnc_scheduler; - conn = sk->sk_user_data; - if (conn == NULL) { /* raced with ksocknal_terminate_conn */ - LASSERT (sk->sk_data_ready != &ksocknal_data_ready); - sk->sk_data_ready (sk, n); - } else { - sched = conn->ksnc_scheduler; - - spin_lock_irqsave (&sched->kss_lock, flags); + spin_lock_bh(&sched->kss_lock); - conn->ksnc_rx_ready = 1; + conn->ksnc_rx_ready = 1; - if (!conn->ksnc_rx_scheduled) { /* not being progressed */ - list_add_tail(&conn->ksnc_rx_list, - &sched->kss_rx_conns); - conn->ksnc_rx_scheduled = 1; - /* extra ref for scheduler */ - atomic_inc (&conn->ksnc_refcount); + if (!conn->ksnc_rx_scheduled) { /* not being progressed */ + list_add_tail(&conn->ksnc_rx_list, + &sched->kss_rx_conns); + conn->ksnc_rx_scheduled = 1; + /* extra ref for scheduler */ + ksocknal_conn_addref(conn); - wake_up (&sched->kss_waitq); - } - - spin_unlock_irqrestore (&sched->kss_lock, flags); - } + wake_up (&sched->kss_waitq); + } + spin_unlock_bh(&sched->kss_lock); - read_unlock (&ksocknal_data.ksnd_global_lock); - - EXIT; + EXIT; } -void -ksocknal_write_space (struct sock *sk) +/* + * Add connection to kss_tx_conns of scheduler + * and wakeup the scheduler. 
+ */ +void ksocknal_write_callback(struct ksock_conn *conn) { - unsigned long flags; - ksock_conn_t *conn; - ksock_sched_t *sched; + struct ksock_sched *sched; + ENTRY; - /* interleave correctly with closing sockets... */ - read_lock (&ksocknal_data.ksnd_global_lock); + sched = conn->ksnc_scheduler; - conn = sk->sk_user_data; + spin_lock_bh(&sched->kss_lock); - CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n", - sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn, - (conn == NULL) ? "" : (conn->ksnc_tx_ready ? - " ready" : " blocked"), - (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ? - " scheduled" : " idle"), - (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ? - " empty" : " queued")); + conn->ksnc_tx_ready = 1; - if (conn == NULL) { /* raced with ksocknal_terminate_conn */ - LASSERT (sk->sk_write_space != &ksocknal_write_space); - sk->sk_write_space (sk); + if (!conn->ksnc_tx_scheduled && /* not being progressed */ + !list_empty(&conn->ksnc_tx_queue)) { /* packets to send */ + list_add_tail(&conn->ksnc_tx_list, &sched->kss_tx_conns); + conn->ksnc_tx_scheduled = 1; + /* extra ref for scheduler */ + ksocknal_conn_addref(conn); - read_unlock (&ksocknal_data.ksnd_global_lock); - return; - } + wake_up(&sched->kss_waitq); + } - if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */ - clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags); + spin_unlock_bh(&sched->kss_lock); - sched = conn->ksnc_scheduler; + EXIT; +} - spin_lock_irqsave (&sched->kss_lock, flags); +static struct ksock_proto * +ksocknal_parse_proto_version (struct ksock_hello_msg *hello) +{ + __u32 version = 0; - conn->ksnc_tx_ready = 1; + if (hello->kshm_magic == LNET_PROTO_MAGIC) + version = hello->kshm_version; + else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC)) + version = __swab32(hello->kshm_version); - if (!conn->ksnc_tx_scheduled && // not being progressed - !list_empty(&conn->ksnc_tx_queue)){//packets to send - list_add_tail (&conn->ksnc_tx_list, - &sched->kss_tx_conns); - conn->ksnc_tx_scheduled = 1; - /* extra ref for scheduler */ - atomic_inc (&conn->ksnc_refcount); + if (version != 0) { +#if SOCKNAL_VERSION_DEBUG + if (*ksocknal_tunables.ksnd_protocol == 1) + return NULL; - wake_up (&sched->kss_waitq); - } + if (*ksocknal_tunables.ksnd_protocol == 2 && + version == KSOCK_PROTO_V3) + return NULL; +#endif + if (version == KSOCK_PROTO_V2) + return &ksocknal_protocol_v2x; + + if (version == KSOCK_PROTO_V3) + return &ksocknal_protocol_v3x; - spin_unlock_irqrestore (&sched->kss_lock, flags); + return NULL; } - read_unlock (&ksocknal_data.ksnd_global_lock); -} + if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) { + struct lnet_magicversion *hmv; -int -ksocknal_sock_write (struct socket *sock, void *buffer, int nob) -{ - int rc; - mm_segment_t oldmm = get_fs(); - - while (nob > 0) { - struct iovec iov = { - .iov_base = buffer, - .iov_len = nob - }; - struct msghdr msg = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = &iov, - .msg_iovlen = 1, - .msg_control = NULL, - .msg_controllen = 0, - .msg_flags = 0 - }; - - set_fs (KERNEL_DS); - rc = sock_sendmsg (sock, &msg, iov.iov_len); - set_fs (oldmm); - - if (rc < 0) - return (rc); + CLASSERT(sizeof(struct lnet_magicversion) == + offsetof(struct ksock_hello_msg, kshm_src_nid)); - if (rc == 0) { - CERROR ("Unexpected zero rc\n"); - return (-ECONNABORTED); - } + hmv = (struct lnet_magicversion *)hello; - buffer = ((char *)buffer) + rc; - nob -= rc; + if (hmv->version_major == cpu_to_le16 (KSOCK_PROTO_V1_MAJOR) && + 
hmv->version_minor == cpu_to_le16 (KSOCK_PROTO_V1_MINOR)) + return &ksocknal_protocol_v1x; } - - return (0); + + return NULL; } int -ksocknal_sock_read (struct socket *sock, void *buffer, int nob) +ksocknal_send_hello(struct lnet_ni *ni, struct ksock_conn *conn, + lnet_nid_t peer_nid, struct ksock_hello_msg *hello) { - int rc; - mm_segment_t oldmm = get_fs(); - - while (nob > 0) { - struct iovec iov = { - .iov_base = buffer, - .iov_len = nob - }; - struct msghdr msg = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = &iov, - .msg_iovlen = 1, - .msg_control = NULL, - .msg_controllen = 0, - .msg_flags = 0 - }; - - set_fs (KERNEL_DS); - rc = sock_recvmsg (sock, &msg, iov.iov_len, 0); - set_fs (oldmm); - - if (rc < 0) - return (rc); + /* CAVEAT EMPTOR: this byte flips 'ipaddrs' */ + struct ksock_net *net = (struct ksock_net *)ni->ni_data; - if (rc == 0) - return (-ECONNABORTED); + LASSERT(hello->kshm_nips <= LNET_INTERFACES_NUM); - buffer = ((char *)buffer) + rc; - nob -= rc; + /* rely on caller to hold a ref on socket so it wouldn't disappear */ + LASSERT(conn->ksnc_proto != NULL); + + hello->kshm_src_nid = ni->ni_nid; + hello->kshm_dst_nid = peer_nid; + hello->kshm_src_pid = the_lnet.ln_pid; + + hello->kshm_src_incarnation = net->ksnn_incarnation; + hello->kshm_ctype = conn->ksnc_type; + + return conn->ksnc_proto->pro_send_hello(conn, hello); +} + +static int +ksocknal_invert_type(int type) +{ + switch (type) + { + case SOCKLND_CONN_ANY: + case SOCKLND_CONN_CONTROL: + return (type); + case SOCKLND_CONN_BULK_IN: + return SOCKLND_CONN_BULK_OUT; + case SOCKLND_CONN_BULK_OUT: + return SOCKLND_CONN_BULK_IN; + default: + return (SOCKLND_CONN_NONE); } - - return (0); } int -ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, - __u64 *incarnation) +ksocknal_recv_hello(struct lnet_ni *ni, struct ksock_conn *conn, + struct ksock_hello_msg *hello, + struct lnet_process_id *peerid, + __u64 *incarnation) { - int rc; - ptl_hdr_t hdr; - ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid; - char ipbuf[PTL_NALFMT_SIZE]; - char ipbuf2[PTL_NALFMT_SIZE]; - - LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid)); + /* Return < 0 fatal error + * 0 success + * EALREADY lost connection race + * EPROTO protocol version mismatch + */ + struct socket *sock = conn->ksnc_sock; + int active = (conn->ksnc_proto != NULL); + int timeout; + int proto_match; + int rc; + struct ksock_proto *proto; + struct lnet_process_id recv_id; + + /* socket type set on active connections - not set on passive */ + LASSERT(!active == !(conn->ksnc_type != SOCKLND_CONN_NONE)); + + timeout = active ? lnet_get_lnd_timeout() : + lnet_acceptor_timeout(); + + rc = lnet_sock_read(sock, &hello->kshm_magic, + sizeof(hello->kshm_magic), timeout); + if (rc != 0) { + CERROR("Error %d reading HELLO from %pI4h\n", + rc, &conn->ksnc_ipaddr); + LASSERT (rc < 0); + return rc; + } - memset (&hdr, 0, sizeof (hdr)); - hmv->magic = __cpu_to_le32 (PORTALS_PROTO_MAGIC); - hmv->version_major = __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR); - hmv->version_minor = __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR); + if (hello->kshm_magic != LNET_PROTO_MAGIC && + hello->kshm_magic != __swab32(LNET_PROTO_MAGIC) && + hello->kshm_magic != le32_to_cpu (LNET_PROTO_TCP_MAGIC)) { + /* Unexpected magic! 
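
ksocknal_parse_proto_version() above recognises a peer of opposite endianness by testing the hello magic against its byte-swapped form, after which every later field must be swabbed as well. The detection idiom in isolation; PROTO_MAGIC below is a placeholder value, not LNET_PROTO_MAGIC, and parse_version() is invented for the example.

#include <stdint.h>
#include <stdio.h>

#define PROTO_MAGIC 0x45726963u         /* placeholder magic, not LNet's */

static uint32_t swab32(uint32_t v)
{
        return ((v & 0x000000ffu) << 24) | ((v & 0x0000ff00u) << 8) |
               ((v & 0x00ff0000u) >> 8)  | ((v & 0xff000000u) >> 24);
}

/* Return the peer's protocol version in host order, setting *flip when the
 * peer's byte order is the opposite of ours; 0 means "not recognised". */
static uint32_t parse_version(uint32_t magic, uint32_t version, int *flip)
{
        *flip = 0;
        if (magic == PROTO_MAGIC)
                return version;
        if (magic == swab32(PROTO_MAGIC)) {
                *flip = 1;
                return swab32(version); /* later fields need swabbing too */
        }
        return 0;
}

int main(void)
{
        int flip;
        uint32_t v = parse_version(swab32(PROTO_MAGIC), swab32(3), &flip);

        printf("version %u, flip %d\n", v, flip);
        return 0;
}
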
*/ + CERROR ("Bad magic(1) %#08x (%#08x expected) from " + "%pI4h\n", __cpu_to_le32 (hello->kshm_magic), + LNET_PROTO_TCP_MAGIC, &conn->ksnc_ipaddr); + return -EPROTO; + } - hdr.src_nid = __cpu_to_le64 (ksocknal_lib.libnal_ni.ni_pid.nid); - hdr.type = __cpu_to_le32 (PTL_MSG_HELLO); + rc = lnet_sock_read(sock, &hello->kshm_version, + sizeof(hello->kshm_version), timeout); + if (rc != 0) { + CERROR("Error %d reading HELLO from %pI4h\n", + rc, &conn->ksnc_ipaddr); + LASSERT(rc < 0); + return rc; + } + + proto = ksocknal_parse_proto_version(hello); + if (proto == NULL) { + if (!active) { + /* unknown protocol from peer_ni, tell peer_ni my protocol */ + conn->ksnc_proto = &ksocknal_protocol_v3x; +#if SOCKNAL_VERSION_DEBUG + if (*ksocknal_tunables.ksnd_protocol == 2) + conn->ksnc_proto = &ksocknal_protocol_v2x; + else if (*ksocknal_tunables.ksnd_protocol == 1) + conn->ksnc_proto = &ksocknal_protocol_v1x; +#endif + hello->kshm_nips = 0; + ksocknal_send_hello(ni, conn, ni->ni_nid, hello); + } - hdr.msg.hello.type = __cpu_to_le32 (*type); - hdr.msg.hello.incarnation = - __cpu_to_le64 (ksocknal_data.ksnd_incarnation); + CERROR("Unknown protocol version (%d.x expected) from %pI4h\n", + conn->ksnc_proto->pro_version, &conn->ksnc_ipaddr); - /* Assume sufficient socket buffering for this message */ - rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr)); - if (rc != 0) { - CERROR ("Error %d sending HELLO to "LPX64" %s\n", - rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf)); - return (rc); + return -EPROTO; } - rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv)); + proto_match = (conn->ksnc_proto == proto); + conn->ksnc_proto = proto; + + /* receive the rest of hello message anyway */ + rc = conn->ksnc_proto->pro_recv_hello(conn, hello, timeout); if (rc != 0) { - CERROR ("Error %d reading HELLO from "LPX64" %s\n", - rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf)); - return (rc); + CERROR("Error %d reading or checking hello from from %pI4h\n", + rc, &conn->ksnc_ipaddr); + LASSERT (rc < 0); + return rc; } - if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) { - CERROR ("Bad magic %#08x (%#08x expected) from "LPX64" %s\n", - __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid, - portals_nid2str(SOCKNAL, *nid, ipbuf)); - return (-EPROTO); - } + *incarnation = hello->kshm_src_incarnation; - if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) || - hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) { - CERROR ("Incompatible protocol version %d.%d (%d.%d expected)" - " from "LPX64" %s\n", - __le16_to_cpu (hmv->version_major), - __le16_to_cpu (hmv->version_minor), - PORTALS_PROTO_VERSION_MAJOR, - PORTALS_PROTO_VERSION_MINOR, - *nid, portals_nid2str(SOCKNAL, *nid, ipbuf)); - return (-EPROTO); + if (hello->kshm_src_nid == LNET_NID_ANY) { + CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY" + "from %pI4h\n", &conn->ksnc_ipaddr); + return -EPROTO; } -#if (PORTALS_PROTO_VERSION_MAJOR != 0) -# error "This code only understands protocol version 0.x" -#endif - /* version 0 sends magic/version as the dest_nid of a 'hello' header, - * so read the rest of it in now... 
*/ + if (!active && + conn->ksnc_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) { + /* Userspace NAL assigns peer_ni process ID from socket */ + recv_id.pid = conn->ksnc_port | LNET_PID_USERFLAG; + recv_id.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), conn->ksnc_ipaddr); + } else { + recv_id.nid = hello->kshm_src_nid; + recv_id.pid = hello->kshm_src_pid; + } + + if (!active) { + *peerid = recv_id; + + /* peer_ni determines type */ + conn->ksnc_type = ksocknal_invert_type(hello->kshm_ctype); + if (conn->ksnc_type == SOCKLND_CONN_NONE) { + CERROR("Unexpected type %d from %s ip %pI4h\n", + hello->kshm_ctype, libcfs_id2str(*peerid), + &conn->ksnc_ipaddr); + return -EPROTO; + } + return 0; + } + + if (peerid->pid != recv_id.pid || + peerid->nid != recv_id.nid) { + LCONSOLE_ERROR_MSG(0x130, "Connected successfully to %s on host" + " %pI4h, but they claimed they were " + "%s; please check your Lustre " + "configuration.\n", + libcfs_id2str(*peerid), + &conn->ksnc_ipaddr, + libcfs_id2str(recv_id)); + return -EPROTO; + } + + if (hello->kshm_ctype == SOCKLND_CONN_NONE) { + /* Possible protocol mismatch or I lost the connection race */ + return proto_match ? EALREADY : EPROTO; + } + + if (ksocknal_invert_type(hello->kshm_ctype) != conn->ksnc_type) { + CERROR("Mismatched types: me %d, %s ip %pI4h %d\n", + conn->ksnc_type, libcfs_id2str(*peerid), + &conn->ksnc_ipaddr, + hello->kshm_ctype); + return -EPROTO; + } + return 0; +} - rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv)); - if (rc != 0) { - CERROR ("Error %d reading rest of HELLO hdr from "LPX64" %s\n", - rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf)); - return (rc); - } +static int +ksocknal_connect(struct ksock_route *route) +{ + struct list_head zombies = LIST_HEAD_INIT(zombies); + struct ksock_peer_ni *peer_ni = route->ksnr_peer; + int type; + int wanted; + struct socket *sock; + time64_t deadline; + int retry_later = 0; + int rc = 0; - /* ...and check we got what we expected */ - if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) || - hdr.payload_length != __cpu_to_le32 (0)) { - CERROR ("Expecting a HELLO hdr with 0 payload," - " but got type %d with %d payload from "LPX64" %s\n", - __le32_to_cpu (hdr.type), - __le32_to_cpu (hdr.payload_length), *nid, - portals_nid2str(SOCKNAL, *nid, ipbuf)); - return (-EPROTO); - } + deadline = ktime_get_seconds() + lnet_get_lnd_timeout(); - if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) { - CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n"); - return (-EPROTO); - } + write_lock_bh(&ksocknal_data.ksnd_global_lock); - if (*nid == PTL_NID_ANY) { /* don't know peer's nid yet */ - *nid = __le64_to_cpu(hdr.src_nid); - } else if (*nid != __le64_to_cpu (hdr.src_nid)) { - CERROR ("Connected to nid "LPX64" %s, but expecting "LPX64" %s\n", - __le64_to_cpu (hdr.src_nid), - portals_nid2str(SOCKNAL, - __le64_to_cpu(hdr.src_nid), - ipbuf), - *nid, portals_nid2str(SOCKNAL, *nid, ipbuf2)); - return (-EPROTO); - } + LASSERT (route->ksnr_scheduled); + LASSERT (!route->ksnr_connecting); - if (*type == SOCKNAL_CONN_NONE) { - /* I've accepted this connection; peer determines type */ - *type = __le32_to_cpu(hdr.msg.hello.type); - switch (*type) { - case SOCKNAL_CONN_ANY: - case SOCKNAL_CONN_CONTROL: - break; - case SOCKNAL_CONN_BULK_IN: - *type = SOCKNAL_CONN_BULK_OUT; - break; - case SOCKNAL_CONN_BULK_OUT: - *type = SOCKNAL_CONN_BULK_IN; + route->ksnr_connecting = 1; + + for (;;) { + wanted = ksocknal_route_mask() & ~route->ksnr_connected; + + /* stop connecting if peer_ni/route got closed under me, or + * 
route got connected while queued */ + if (peer_ni->ksnp_closing || route->ksnr_deleted || + wanted == 0) { + retry_later = 0; break; - default: - CERROR ("Unexpected type %d from "LPX64" %s\n", - *type, *nid, - portals_nid2str(SOCKNAL, *nid, ipbuf)); - return (-EPROTO); } - } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) { - CERROR ("Mismatched types: me %d "LPX64" %s %d\n", - *type, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf), - __le32_to_cpu(hdr.msg.hello.type)); - return (-EPROTO); - } - *incarnation = __le64_to_cpu(hdr.msg.hello.incarnation); + /* reschedule if peer_ni is connecting to me */ + if (peer_ni->ksnp_accepting > 0) { + CDEBUG(D_NET, + "peer_ni %s(%d) already connecting to me, retry later.\n", + libcfs_nid2str(peer_ni->ksnp_id.nid), peer_ni->ksnp_accepting); + retry_later = 1; + } - return (0); -} + if (retry_later) /* needs reschedule */ + break; -int -ksocknal_setup_sock (struct socket *sock) -{ - mm_segment_t oldmm = get_fs (); - int rc; - int option; - struct linger linger; - -#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) - sock->sk->sk_allocation = GFP_NOFS; -#else - sock->sk->allocation = GFP_NOFS; -#endif + if ((wanted & (1 << SOCKLND_CONN_ANY)) != 0) { + type = SOCKLND_CONN_ANY; + } else if ((wanted & (1 << SOCKLND_CONN_CONTROL)) != 0) { + type = SOCKLND_CONN_CONTROL; + } else if ((wanted & (1 << SOCKLND_CONN_BULK_IN)) != 0) { + type = SOCKLND_CONN_BULK_IN; + } else { + LASSERT ((wanted & (1 << SOCKLND_CONN_BULK_OUT)) != 0); + type = SOCKLND_CONN_BULK_OUT; + } - /* Ensure this socket aborts active sends immediately when we close - * it. */ + write_unlock_bh(&ksocknal_data.ksnd_global_lock); - linger.l_onoff = 0; - linger.l_linger = 0; + if (ktime_get_seconds() >= deadline) { + rc = -ETIMEDOUT; + lnet_connect_console_error(rc, peer_ni->ksnp_id.nid, + route->ksnr_ipaddr, + route->ksnr_port); + goto failed; + } - set_fs (KERNEL_DS); - rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER, - (char *)&linger, sizeof (linger)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't set SO_LINGER: %d\n", rc); - return (rc); - } + rc = lnet_connect(&sock, peer_ni->ksnp_id.nid, + route->ksnr_myipaddr, + route->ksnr_ipaddr, route->ksnr_port); + if (rc != 0) + goto failed; + + rc = ksocknal_create_conn(peer_ni->ksnp_ni, route, sock, type); + if (rc < 0) { + lnet_connect_console_error(rc, peer_ni->ksnp_id.nid, + route->ksnr_ipaddr, + route->ksnr_port); + goto failed; + } - option = -1; - set_fs (KERNEL_DS); - rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2, - (char *)&option, sizeof (option)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't set SO_LINGER2: %d\n", rc); - return (rc); + /* A +ve RC means I have to retry because I lost the connection + * race or I have to renegotiate protocol version */ + retry_later = (rc != 0); + if (retry_later) + CDEBUG(D_NET, "peer_ni %s: conn race, retry later.\n", + libcfs_nid2str(peer_ni->ksnp_id.nid)); + + write_lock_bh(&ksocknal_data.ksnd_global_lock); } -#if SOCKNAL_USE_KEEPALIVES - /* Keepalives: If 3/4 of the timeout elapses, start probing every - * second until the timeout elapses. 
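
The chain of tests above picks the next connection type from the bitmask of types the route still wants (ksocknal_route_mask() & ~ksnr_connected), always in the order ANY, CONTROL, BULK_IN, BULK_OUT; each successful ksocknal_create_conn() clears a bit and the loop runs until nothing is wanted. A compact version of that selection follows; the enumerators and next_conn_type() are illustrative, not the SOCKLND_CONN_* definitions.

#include <stdio.h>

enum conn_type {
        CONN_ANY, CONN_CONTROL, CONN_BULK_IN, CONN_BULK_OUT, CONN_NTYPES
};

/* Pick the next connection type to establish, in fixed priority order.
 * 'wanted' is a bitmask of types not yet connected; returns -1 when done. */
static int next_conn_type(unsigned int wanted)
{
        static const enum conn_type order[] = {
                CONN_ANY, CONN_CONTROL, CONN_BULK_IN, CONN_BULK_OUT
        };
        unsigned int i;

        for (i = 0; i < sizeof(order) / sizeof(order[0]); i++)
                if (wanted & (1u << order[i]))
                        return order[i];
        return -1;
}

int main(void)
{
        unsigned int wanted = (1u << CONN_CONTROL) | (1u << CONN_BULK_OUT);
        int type;

        while ((type = next_conn_type(wanted)) >= 0) {
                printf("connect type %d\n", type);
                wanted &= ~(1u << type);        /* pretend the connect worked */
        }
        return 0;
}
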
*/ + route->ksnr_scheduled = 0; + route->ksnr_connecting = 0; - option = (ksocknal_tunables.ksnd_io_timeout * 3) / 4; - set_fs (KERNEL_DS); - rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE, - (char *)&option, sizeof (option)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc); - return (rc); - } - - option = 1; - set_fs (KERNEL_DS); - rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL, - (char *)&option, sizeof (option)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc); - return (rc); - } - - option = ksocknal_tunables.ksnd_io_timeout / 4; - set_fs (KERNEL_DS); - rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT, - (char *)&option, sizeof (option)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc); - return (rc); - } + if (retry_later) { + /* re-queue for attention; this frees me up to handle + * the peer_ni's incoming connection request */ + + if (rc == EALREADY || + (rc == 0 && peer_ni->ksnp_accepting > 0)) { + /* We want to introduce a delay before next + * attempt to connect if we lost conn race, + * but the race is resolved quickly usually, + * so min_reconnectms should be good heuristic */ + route->ksnr_retry_interval = *ksocknal_tunables.ksnd_min_reconnectms / 1000; + route->ksnr_timeout = ktime_get_seconds() + + route->ksnr_retry_interval; + } - option = 1; - set_fs (KERNEL_DS); - rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, - (char *)&option, sizeof (option)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't set SO_KEEPALIVE: %d\n", rc); - return (rc); + ksocknal_launch_connection_locked(route); } -#endif - return (0); -} -int -ksocknal_connect_peer (ksock_route_t *route, int type) -{ - struct sockaddr_in peer_addr; - mm_segment_t oldmm = get_fs(); - struct timeval tv; - int fd; - struct socket *sock; - int rc; - char ipbuf[PTL_NALFMT_SIZE]; - - rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock); - if (rc != 0) { - CERROR ("Can't create autoconnect socket: %d\n", rc); - return (rc); - } + write_unlock_bh(&ksocknal_data.ksnd_global_lock); + return retry_later; - /* Ugh; have to map_fd for compatibility with sockets passed in - * from userspace. And we actually need the sock->file refcounting - * that this gives you :) */ + failed: + write_lock_bh(&ksocknal_data.ksnd_global_lock); - fd = sock_map_fd (sock); - if (fd < 0) { - sock_release (sock); - CERROR ("sock_map_fd error %d\n", fd); - return (fd); + route->ksnr_scheduled = 0; + route->ksnr_connecting = 0; + + /* This is a retry rather than a new connection */ + route->ksnr_retry_interval *= 2; + route->ksnr_retry_interval = + max_t(time64_t, route->ksnr_retry_interval, + *ksocknal_tunables.ksnd_min_reconnectms / 1000); + route->ksnr_retry_interval = + min_t(time64_t, route->ksnr_retry_interval, + *ksocknal_tunables.ksnd_max_reconnectms / 1000); + + LASSERT(route->ksnr_retry_interval); + route->ksnr_timeout = ktime_get_seconds() + route->ksnr_retry_interval; + + if (!list_empty(&peer_ni->ksnp_tx_queue) && + peer_ni->ksnp_accepting == 0 && + ksocknal_find_connecting_route_locked(peer_ni) == NULL) { + struct ksock_conn *conn; + + /* ksnp_tx_queue is queued on a conn on successful + * connection for V1.x and V2.x */ + if (!list_empty(&peer_ni->ksnp_conns)) { + conn = list_entry(peer_ni->ksnp_conns.next, + struct ksock_conn, ksnc_list); + LASSERT (conn->ksnc_proto == &ksocknal_protocol_v3x); + } + + /* take all the blocked packets while I've got the lock and + * complete below... 
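
On the failed path above the route's retry interval is doubled and then clamped between the min and max reconnect tunables, so a repeatedly failing peer backs off geometrically but never waits longer than the configured bound. The arithmetic on its own; the bounds below are made-up stand-ins for the tunables, not their defaults.

#include <stdio.h>
#include <stdint.h>

#define MIN_RECONNECT_SECS 1            /* stand-in for ksnd_min_reconnectms */
#define MAX_RECONNECT_SECS 60           /* stand-in for ksnd_max_reconnectms */

/* Double the interval after each failure, clamped to [min, max]. */
static int64_t next_retry_interval(int64_t interval)
{
        interval *= 2;
        if (interval < MIN_RECONNECT_SECS)
                interval = MIN_RECONNECT_SECS;
        if (interval > MAX_RECONNECT_SECS)
                interval = MAX_RECONNECT_SECS;
        return interval;
}

int main(void)
{
        int64_t interval = 0;
        int attempt;

        for (attempt = 1; attempt <= 8; attempt++) {
                interval = next_retry_interval(interval);
                printf("attempt %d: wait %lld s\n", attempt,
                       (long long)interval);
        }
        return 0;
}
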
*/ + list_splice_init(&peer_ni->ksnp_tx_queue, &zombies); } - /* NB the fd now owns the ref on sock->file */ - LASSERT (sock->file != NULL); - LASSERT (file_count(sock->file) == 1); + write_unlock_bh(&ksocknal_data.ksnd_global_lock); - /* Set the socket timeouts, so our connection attempt completes in - * finite time */ - tv.tv_sec = ksocknal_tunables.ksnd_io_timeout; - tv.tv_usec = 0; + ksocknal_peer_failed(peer_ni); + ksocknal_txlist_done(peer_ni->ksnp_ni, &zombies, rc); + return 0; +} - set_fs (KERNEL_DS); - rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO, - (char *)&tv, sizeof (tv)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't set send timeout %d: %d\n", - ksocknal_tunables.ksnd_io_timeout, rc); - goto out; - } - - set_fs (KERNEL_DS); - rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO, - (char *)&tv, sizeof (tv)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't set receive timeout %d: %d\n", - ksocknal_tunables.ksnd_io_timeout, rc); - goto out; +/* + * check whether we need to create more connds. + * It will try to create new thread if it's necessary, @timeout can + * be updated if failed to create, so caller wouldn't keep try while + * running out of resource. + */ +static int +ksocknal_connd_check_start(time64_t sec, long *timeout) +{ + char name[16]; + int rc; + int total = ksocknal_data.ksnd_connd_starting + + ksocknal_data.ksnd_connd_running; + + if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) { + /* still in initializing */ + return 0; } - { - int option = 1; - - set_fs (KERNEL_DS); - rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY, - (char *)&option, sizeof (option)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't disable nagle: %d\n", rc); - goto out; - } + if (total >= *ksocknal_tunables.ksnd_nconnds_max || + total > ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV) { + /* can't create more connd, or still have enough + * threads to handle more connecting */ + return 0; } - - if (route->ksnr_buffer_size != 0) { - int option = route->ksnr_buffer_size; - - set_fs (KERNEL_DS); - rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, - (char *)&option, sizeof (option)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't set send buffer %d: %d\n", - route->ksnr_buffer_size, rc); - goto out; - } - set_fs (KERNEL_DS); - rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, - (char *)&option, sizeof (option)); - set_fs (oldmm); - if (rc != 0) { - CERROR ("Can't set receive buffer %d: %d\n", - route->ksnr_buffer_size, rc); - goto out; - } + if (list_empty(&ksocknal_data.ksnd_connd_routes)) { + /* no pending connecting request */ + return 0; } - - memset (&peer_addr, 0, sizeof (peer_addr)); - peer_addr.sin_family = AF_INET; - peer_addr.sin_port = htons (route->ksnr_port); - peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr); - - rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, - sizeof (peer_addr), sock->file->f_flags); - if (rc != 0) { - CERROR ("Error %d connecting to "LPX64" %s\n", rc, - route->ksnr_peer->ksnp_nid, - portals_nid2str(SOCKNAL, - route->ksnr_peer->ksnp_nid, - ipbuf)); - goto out; + + if (sec - ksocknal_data.ksnd_connd_failed_stamp <= 1) { + /* may run out of resource, retry later */ + *timeout = cfs_time_seconds(1); + return 0; } - - rc = ksocknal_create_conn (route, sock, route->ksnr_irq_affinity, type); - if (rc == 0) { - /* Take an extra ref on sock->file to compensate for the - * upcoming close which will lose fd's ref on it. 
*/ - get_file (sock->file); + + if (ksocknal_data.ksnd_connd_starting > 0) { + /* serialize starting to avoid flood */ + return 0; } - out: - sys_close (fd); - return (rc); + ksocknal_data.ksnd_connd_starting_stamp = sec; + ksocknal_data.ksnd_connd_starting++; + spin_unlock_bh(&ksocknal_data.ksnd_connd_lock); + + /* NB: total is the next id */ + snprintf(name, sizeof(name), "socknal_cd%02d", total); + rc = ksocknal_thread_start(ksocknal_connd, NULL, name); + + spin_lock_bh(&ksocknal_data.ksnd_connd_lock); + if (rc == 0) + return 1; + + /* we tried ... */ + LASSERT(ksocknal_data.ksnd_connd_starting > 0); + ksocknal_data.ksnd_connd_starting--; + ksocknal_data.ksnd_connd_failed_stamp = ktime_get_real_seconds(); + + return 1; } -void -ksocknal_autoconnect (ksock_route_t *route) +/* + * check whether current thread can exit, it will return 1 if there are too + * many threads and no creating in past 120 seconds. + * Also, this function may update @timeout to make caller come back + * again to recheck these conditions. + */ +static int +ksocknal_connd_check_stop(time64_t sec, long *timeout) { - LIST_HEAD (zombies); - ksock_tx_t *tx; - ksock_peer_t *peer; - unsigned long flags; - int rc; - int type; - - for (;;) { - for (type = 0; type < SOCKNAL_CONN_NTYPES; type++) - if ((route->ksnr_connecting & (1 << type)) != 0) - break; - LASSERT (type < SOCKNAL_CONN_NTYPES); + int val; - rc = ksocknal_connect_peer (route, type); + if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) { + /* still in initializing */ + return 0; + } - if (rc != 0) - break; - - /* successfully autoconnected: create_conn did the - * route/conn binding and scheduled any blocked packets */ + if (ksocknal_data.ksnd_connd_starting > 0) { + /* in progress of starting new thread */ + return 0; + } - if (route->ksnr_connecting == 0) { - /* No more connections required */ - return; - } + if (ksocknal_data.ksnd_connd_running <= + *ksocknal_tunables.ksnd_nconnds) { /* can't shrink */ + return 0; } - /* Connection attempt failed */ + /* created thread in past 120 seconds? */ + val = (int)(ksocknal_data.ksnd_connd_starting_stamp + + SOCKNAL_CONND_TIMEOUT - sec); - write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags); + *timeout = (val > 0) ? cfs_time_seconds(val) : + cfs_time_seconds(SOCKNAL_CONND_TIMEOUT); + if (val > 0) + return 0; - peer = route->ksnr_peer; - route->ksnr_connecting = 0; + /* no creating in past 120 seconds */ - /* This is a retry rather than a new connection */ - LASSERT (route->ksnr_retry_interval != 0); - route->ksnr_timeout = jiffies + route->ksnr_retry_interval; - route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2, - SOCKNAL_MAX_RECONNECT_INTERVAL); - - if (!list_empty (&peer->ksnp_tx_queue) && - ksocknal_find_connecting_route_locked (peer) == NULL) { - LASSERT (list_empty (&peer->ksnp_conns)); - - /* None of the connections that the blocked packets are - * waiting for have been successful. Complete them now... 
*/ - do { - tx = list_entry (peer->ksnp_tx_queue.next, - ksock_tx_t, tx_list); - list_del (&tx->tx_list); - list_add_tail (&tx->tx_list, &zombies); - } while (!list_empty (&peer->ksnp_tx_queue)); - } + return ksocknal_data.ksnd_connd_running > + ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV; +} - /* make this route least-favourite for re-selection */ - if (!route->ksnr_deleted) { - list_del(&route->ksnr_list); - list_add_tail(&route->ksnr_list, &peer->ksnp_routes); - } - - write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags); - - while (!list_empty (&zombies)) { - char ipbuf[PTL_NALFMT_SIZE]; - char ipbuf2[PTL_NALFMT_SIZE]; - tx = list_entry (zombies.next, ksock_tx_t, tx_list); - - CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n", - NTOH__u32 (tx->tx_hdr->type), - NTOH__u32 (tx->tx_hdr->payload_length), - NTOH__u64 (tx->tx_hdr->src_nid), - portals_nid2str(SOCKNAL, - NTOH__u64(tx->tx_hdr->src_nid), - ipbuf), - NTOH__u64 (tx->tx_hdr->dest_nid), - portals_nid2str(SOCKNAL, - NTOH__u64(tx->tx_hdr->src_nid), - ipbuf2)); - - list_del (&tx->tx_list); - /* complete now */ - ksocknal_tx_done (tx, 0); - } +/* Go through connd_routes queue looking for a route that we can process + * right now, @timeout_p can be updated if we need to come back later */ +static struct ksock_route * +ksocknal_connd_get_route_locked(signed long *timeout_p) +{ + time64_t now = ktime_get_seconds(); + struct ksock_route *route; + + /* connd_routes can contain both pending and ordinary routes */ + list_for_each_entry(route, &ksocknal_data.ksnd_connd_routes, + ksnr_connd_list) { + + if (route->ksnr_retry_interval == 0 || + now >= route->ksnr_timeout) + return route; + + if (*timeout_p == MAX_SCHEDULE_TIMEOUT || + *timeout_p > cfs_time_seconds(route->ksnr_timeout - now)) + *timeout_p = cfs_time_seconds(route->ksnr_timeout - now); + } + + return NULL; } int -ksocknal_autoconnectd (void *arg) +ksocknal_connd(void *arg) { - long id = (long)arg; - char name[16]; - unsigned long flags; - ksock_route_t *route; - int rc; + spinlock_t *connd_lock = &ksocknal_data.ksnd_connd_lock; + struct ksock_connreq *cr; + wait_queue_entry_t wait; + int nloops = 0; + int cons_retry = 0; - snprintf (name, sizeof (name), "ksocknal_ad%02ld", id); - kportal_daemonize (name); - kportal_blockallsigs (); + cfs_block_allsigs(); - spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags); + init_waitqueue_entry(&wait, current); - while (!ksocknal_data.ksnd_shuttingdown) { + spin_lock_bh(connd_lock); - if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) { - route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next, - ksock_route_t, ksnr_connect_list); - - list_del (&route->ksnr_connect_list); - spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags); + LASSERT(ksocknal_data.ksnd_connd_starting > 0); + ksocknal_data.ksnd_connd_starting--; + ksocknal_data.ksnd_connd_running++; - ksocknal_autoconnect (route); - ksocknal_put_route (route); + while (!ksocknal_data.ksnd_shuttingdown) { + struct ksock_route *route = NULL; + time64_t sec = ktime_get_real_seconds(); + long timeout = MAX_SCHEDULE_TIMEOUT; + int dropped_lock = 0; - spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags); - continue; + if (ksocknal_connd_check_stop(sec, &timeout)) { + /* wakeup another one to check stop */ + wake_up(&ksocknal_data.ksnd_connd_waitq); + break; + } + + if (ksocknal_connd_check_start(sec, &timeout)) { + /* created new thread */ + dropped_lock = 1; } - - spin_unlock_irqrestore 
(&ksocknal_data.ksnd_autoconnectd_lock, flags); - rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq, - ksocknal_data.ksnd_shuttingdown || - !list_empty (&ksocknal_data.ksnd_autoconnectd_routes)); + if (!list_empty(&ksocknal_data.ksnd_connd_connreqs)) { + /* Connection accepted by the listener */ + cr = list_entry(ksocknal_data.ksnd_connd_connreqs.next, + struct ksock_connreq, ksncr_list); - spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags); - } + list_del(&cr->ksncr_list); + spin_unlock_bh(connd_lock); + dropped_lock = 1; - spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags); + ksocknal_create_conn(cr->ksncr_ni, NULL, + cr->ksncr_sock, SOCKLND_CONN_NONE); + lnet_ni_decref(cr->ksncr_ni); + LIBCFS_FREE(cr, sizeof(*cr)); - ksocknal_thread_fini (); - return (0); + spin_lock_bh(connd_lock); + } + + /* Only handle an outgoing connection request if there + * is a thread left to handle incoming connections and + * create new connd */ + if (ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV < + ksocknal_data.ksnd_connd_running) { + route = ksocknal_connd_get_route_locked(&timeout); + } + if (route != NULL) { + list_del(&route->ksnr_connd_list); + ksocknal_data.ksnd_connd_connecting++; + spin_unlock_bh(connd_lock); + dropped_lock = 1; + + if (ksocknal_connect(route)) { + /* consecutive retry */ + if (cons_retry++ > SOCKNAL_INSANITY_RECONN) { + CWARN("massive consecutive " + "re-connecting to %pI4h\n", + &route->ksnr_ipaddr); + cons_retry = 0; + } + } else { + cons_retry = 0; + } + + ksocknal_route_decref(route); + + spin_lock_bh(connd_lock); + ksocknal_data.ksnd_connd_connecting--; + } + + if (dropped_lock) { + if (++nloops < SOCKNAL_RESCHED) + continue; + spin_unlock_bh(connd_lock); + nloops = 0; + cond_resched(); + spin_lock_bh(connd_lock); + continue; + } + + /* Nothing to do for 'timeout' */ + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue_exclusive(&ksocknal_data.ksnd_connd_waitq, &wait); + spin_unlock_bh(connd_lock); + + nloops = 0; + schedule_timeout(timeout); + + set_current_state(TASK_RUNNING); + remove_wait_queue(&ksocknal_data.ksnd_connd_waitq, &wait); + spin_lock_bh(connd_lock); + } + ksocknal_data.ksnd_connd_running--; + spin_unlock_bh(connd_lock); + + ksocknal_thread_fini(); + return 0; } -ksock_conn_t * -ksocknal_find_timed_out_conn (ksock_peer_t *peer) +static struct ksock_conn * +ksocknal_find_timed_out_conn(struct ksock_peer_ni *peer_ni) { /* We're called with a shared lock on ksnd_global_lock */ - ksock_conn_t *conn; - struct list_head *ctmp; - ksock_sched_t *sched; + struct ksock_conn *conn; + struct list_head *ctmp; + struct ksock_tx *tx; - list_for_each (ctmp, &peer->ksnp_conns) { - conn = list_entry (ctmp, ksock_conn_t, ksnc_list); - sched = conn->ksnc_scheduler; + list_for_each(ctmp, &peer_ni->ksnp_conns) { + int error; - /* Don't need the {get,put}connsock dance to deref ksnc_sock... 
*/ + conn = list_entry(ctmp, struct ksock_conn, ksnc_list); + + /* Don't need the {get,put}connsock dance to deref ksnc_sock */ LASSERT (!conn->ksnc_closing); - + + error = conn->ksnc_sock->sk->sk_err; + if (error != 0) { + ksocknal_conn_addref(conn); + + switch (error) { + case ECONNRESET: + CNETERR("A connection with %s " + "(%pI4h:%d) was reset; " + "it may have rebooted.\n", + libcfs_id2str(peer_ni->ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port); + break; + case ETIMEDOUT: + CNETERR("A connection with %s " + "(%pI4h:%d) timed out; the " + "network or node may be down.\n", + libcfs_id2str(peer_ni->ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port); + break; + default: + CNETERR("An unexpected network error %d " + "occurred with %s " + "(%pI4h:%d\n", error, + libcfs_id2str(peer_ni->ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port); + break; + } + + return (conn); + } + if (conn->ksnc_rx_started && - time_after_eq (jiffies, conn->ksnc_rx_deadline)) { + ktime_get_seconds() >= conn->ksnc_rx_deadline) { /* Timed out incomplete incoming message */ - atomic_inc (&conn->ksnc_refcount); - CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n", - peer->ksnp_nid, conn, HIPQUAD(conn->ksnc_ipaddr)); + ksocknal_conn_addref(conn); + CNETERR("Timeout receiving from %s (%pI4h:%d), " + "state %d wanted %d left %d\n", + libcfs_id2str(peer_ni->ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port, + conn->ksnc_rx_state, + conn->ksnc_rx_nob_wanted, + conn->ksnc_rx_nob_left); return (conn); } - - if ((!list_empty (&conn->ksnc_tx_queue) || - conn->ksnc_sock->sk->sk_wmem_queued != 0) && - time_after_eq (jiffies, conn->ksnc_tx_deadline)) { - /* Timed out messages queued for sending, or - * messages buffered in the socket's send buffer */ - atomic_inc (&conn->ksnc_refcount); - CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n", - peer->ksnp_nid, - list_empty (&conn->ksnc_tx_queue) ? 
"" : "Q ", - conn->ksnc_sock->sk->sk_wmem_queued, conn, - HIPQUAD(conn->ksnc_ipaddr)); + + if ((!list_empty(&conn->ksnc_tx_queue) || + conn->ksnc_sock->sk->sk_wmem_queued != 0) && + ktime_get_seconds() >= conn->ksnc_tx_deadline) { + /* Timed out messages queued for sending or + * buffered in the socket's send buffer */ + ksocknal_conn_addref(conn); + list_for_each_entry(tx, &conn->ksnc_tx_queue, + tx_list) + tx->tx_hstatus = + LNET_MSG_STATUS_LOCAL_TIMEOUT; + CNETERR("Timeout sending data to %s (%pI4h:%d) " + "the network or that node may be down.\n", + libcfs_id2str(peer_ni->ksnp_id), + &conn->ksnc_ipaddr, conn->ksnc_port); return (conn); } } @@ -2448,148 +2366,298 @@ ksocknal_find_timed_out_conn (ksock_peer_t *peer) return (NULL); } -void -ksocknal_check_peer_timeouts (int idx) +static inline void +ksocknal_flush_stale_txs(struct ksock_peer_ni *peer_ni) +{ + struct ksock_tx *tx; + struct list_head stale_txs = LIST_HEAD_INIT(stale_txs); + + write_lock_bh(&ksocknal_data.ksnd_global_lock); + + while (!list_empty(&peer_ni->ksnp_tx_queue)) { + tx = list_entry(peer_ni->ksnp_tx_queue.next, + struct ksock_tx, tx_list); + + if (ktime_get_seconds() < tx->tx_deadline) + break; + + tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_TIMEOUT; + + list_del(&tx->tx_list); + list_add_tail(&tx->tx_list, &stale_txs); + } + + write_unlock_bh(&ksocknal_data.ksnd_global_lock); + + ksocknal_txlist_done(peer_ni->ksnp_ni, &stale_txs, -ETIMEDOUT); +} + +static int +ksocknal_send_keepalive_locked(struct ksock_peer_ni *peer_ni) +__must_hold(&ksocknal_data.ksnd_global_lock) { - struct list_head *peers = &ksocknal_data.ksnd_peers[idx]; - struct list_head *ptmp; - ksock_peer_t *peer; - ksock_conn_t *conn; + struct ksock_sched *sched; + struct ksock_conn *conn; + struct ksock_tx *tx; + + /* last_alive will be updated by create_conn */ + if (list_empty(&peer_ni->ksnp_conns)) + return 0; + + if (peer_ni->ksnp_proto != &ksocknal_protocol_v3x) + return 0; + + if (*ksocknal_tunables.ksnd_keepalive <= 0 || + ktime_get_seconds() < peer_ni->ksnp_last_alive + + *ksocknal_tunables.ksnd_keepalive) + return 0; + + if (ktime_get_seconds() < peer_ni->ksnp_send_keepalive) + return 0; + + /* retry 10 secs later, so we wouldn't put pressure + * on this peer_ni if we failed to send keepalive this time */ + peer_ni->ksnp_send_keepalive = ktime_get_seconds() + 10; + + conn = ksocknal_find_conn_locked(peer_ni, NULL, 1); + if (conn != NULL) { + sched = conn->ksnc_scheduler; + + spin_lock_bh(&sched->kss_lock); + if (!list_empty(&conn->ksnc_tx_queue)) { + spin_unlock_bh(&sched->kss_lock); + /* there is an queued ACK, don't need keepalive */ + return 0; + } + + spin_unlock_bh(&sched->kss_lock); + } + + read_unlock(&ksocknal_data.ksnd_global_lock); + + /* cookie = 1 is reserved for keepalive PING */ + tx = ksocknal_alloc_tx_noop(1, 1); + if (tx == NULL) { + read_lock(&ksocknal_data.ksnd_global_lock); + return -ENOMEM; + } + + if (ksocknal_launch_packet(peer_ni->ksnp_ni, tx, peer_ni->ksnp_id) == 0) { + read_lock(&ksocknal_data.ksnd_global_lock); + return 1; + } + + ksocknal_free_tx(tx); + read_lock(&ksocknal_data.ksnd_global_lock); + + return -EIO; +} + + +static void +ksocknal_check_peer_timeouts(int idx) +{ + struct list_head *peers = &ksocknal_data.ksnd_peers[idx]; + struct ksock_peer_ni *peer_ni; + struct ksock_conn *conn; + struct ksock_tx *tx; again: /* NB. We expect to have a look at all the peers and not find any * connections to time out, so we just use a shared lock while we * take a look... 
*/ - read_lock (&ksocknal_data.ksnd_global_lock); + read_lock(&ksocknal_data.ksnd_global_lock); + + list_for_each_entry(peer_ni, peers, ksnp_list) { + struct ksock_tx *tx_stale; + time64_t deadline = 0; + int resid = 0; + int n = 0; + + if (ksocknal_send_keepalive_locked(peer_ni) != 0) { + read_unlock(&ksocknal_data.ksnd_global_lock); + goto again; + } + + conn = ksocknal_find_timed_out_conn (peer_ni); - list_for_each (ptmp, peers) { - peer = list_entry (ptmp, ksock_peer_t, ksnp_list); - conn = ksocknal_find_timed_out_conn (peer); - if (conn != NULL) { - read_unlock (&ksocknal_data.ksnd_global_lock); + read_unlock(&ksocknal_data.ksnd_global_lock); - CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n", - peer->ksnp_nid, - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT); - + /* NB we won't find this one again, but we can't - * just proceed with the next peer, since we dropped + * just proceed with the next peer_ni, since we dropped * ksnd_global_lock and it might be dead already! */ - ksocknal_put_conn (conn); + ksocknal_conn_decref(conn); goto again; } + + /* we can't process stale txs right here because we're + * holding only shared lock */ + if (!list_empty(&peer_ni->ksnp_tx_queue)) { + struct ksock_tx *tx; + + tx = list_entry(peer_ni->ksnp_tx_queue.next, + struct ksock_tx, tx_list); + if (ktime_get_seconds() >= tx->tx_deadline) { + ksocknal_peer_addref(peer_ni); + read_unlock(&ksocknal_data.ksnd_global_lock); + + ksocknal_flush_stale_txs(peer_ni); + + ksocknal_peer_decref(peer_ni); + goto again; + } + } + + if (list_empty(&peer_ni->ksnp_zc_req_list)) + continue; + + tx_stale = NULL; + spin_lock(&peer_ni->ksnp_lock); + list_for_each_entry(tx, &peer_ni->ksnp_zc_req_list, tx_zc_list) { + if (ktime_get_seconds() < tx->tx_deadline) + break; + /* ignore the TX if connection is being closed */ + if (tx->tx_conn->ksnc_closing) + continue; + n++; + if (tx_stale == NULL) + tx_stale = tx; + } + + if (tx_stale == NULL) { + spin_unlock(&peer_ni->ksnp_lock); + continue; + } + + deadline = tx_stale->tx_deadline; + resid = tx_stale->tx_resid; + conn = tx_stale->tx_conn; + ksocknal_conn_addref(conn); + + spin_unlock(&peer_ni->ksnp_lock); + read_unlock(&ksocknal_data.ksnd_global_lock); + + CERROR("Total %d stale ZC_REQs for peer_ni %s detected; the " + "oldest(%p) timed out %lld secs ago, " + "resid: %d, wmem: %d\n", + n, libcfs_nid2str(peer_ni->ksnp_id.nid), tx_stale, + ktime_get_seconds() - deadline, + resid, conn->ksnc_sock->sk->sk_wmem_queued); + + ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT); + ksocknal_conn_decref(conn); + goto again; } - read_unlock (&ksocknal_data.ksnd_global_lock); + read_unlock(&ksocknal_data.ksnd_global_lock); } -int -ksocknal_reaper (void *arg) +int ksocknal_reaper(void *arg) { - wait_queue_t wait; - unsigned long flags; - ksock_conn_t *conn; - ksock_sched_t *sched; - struct list_head enomem_conns; - int nenomem_conns; - int timeout; - int i; - int peer_index = 0; - unsigned long deadline = jiffies; - - kportal_daemonize ("ksocknal_reaper"); - kportal_blockallsigs (); - - INIT_LIST_HEAD(&enomem_conns); - init_waitqueue_entry (&wait, current); - - spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags); + wait_queue_entry_t wait; + struct ksock_conn *conn; + struct ksock_sched *sched; + struct list_head enomem_conns; + int nenomem_conns; + time64_t timeout; + int i; + int peer_index = 0; + time64_t deadline = ktime_get_seconds(); + + cfs_block_allsigs (); + + INIT_LIST_HEAD(&enomem_conns); + 
init_waitqueue_entry(&wait, current); + + spin_lock_bh(&ksocknal_data.ksnd_reaper_lock); while (!ksocknal_data.ksnd_shuttingdown) { + if (!list_empty(&ksocknal_data.ksnd_deathrow_conns)) { + conn = list_entry(ksocknal_data.ksnd_deathrow_conns.next, + struct ksock_conn, ksnc_list); + list_del(&conn->ksnc_list); - if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) { - conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next, - ksock_conn_t, ksnc_list); - list_del (&conn->ksnc_list); - - spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags); + spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock); - ksocknal_terminate_conn (conn); - ksocknal_put_conn (conn); + ksocknal_terminate_conn(conn); + ksocknal_conn_decref(conn); - spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags); + spin_lock_bh(&ksocknal_data.ksnd_reaper_lock); continue; } - if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) { - conn = list_entry (ksocknal_data.ksnd_zombie_conns.next, - ksock_conn_t, ksnc_list); - list_del (&conn->ksnc_list); - - spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags); + if (!list_empty(&ksocknal_data.ksnd_zombie_conns)) { + conn = list_entry(ksocknal_data.ksnd_zombie_conns.next, + struct ksock_conn, ksnc_list); + list_del(&conn->ksnc_list); - ksocknal_destroy_conn (conn); + spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock); - spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags); + ksocknal_destroy_conn(conn); + + spin_lock_bh(&ksocknal_data.ksnd_reaper_lock); continue; } - if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) { - list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns); - list_del_init(&ksocknal_data.ksnd_enomem_conns); + if (!list_empty(&ksocknal_data.ksnd_enomem_conns)) { + list_add(&enomem_conns, + &ksocknal_data.ksnd_enomem_conns); + list_del_init(&ksocknal_data.ksnd_enomem_conns); } - spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags); + spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock); /* reschedule all the connections that stalled with ENOMEM... */ nenomem_conns = 0; - while (!list_empty (&enomem_conns)) { - conn = list_entry (enomem_conns.next, - ksock_conn_t, ksnc_tx_list); - list_del (&conn->ksnc_tx_list); + while (!list_empty(&enomem_conns)) { + conn = list_entry(enomem_conns.next, + struct ksock_conn, ksnc_tx_list); + list_del(&conn->ksnc_tx_list); sched = conn->ksnc_scheduler; - spin_lock_irqsave (&sched->kss_lock, flags); + spin_lock_bh(&sched->kss_lock); - LASSERT (conn->ksnc_tx_scheduled); - conn->ksnc_tx_ready = 1; - list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns); - wake_up (&sched->kss_waitq); + LASSERT(conn->ksnc_tx_scheduled); + conn->ksnc_tx_ready = 1; + list_add_tail(&conn->ksnc_tx_list, + &sched->kss_tx_conns); + wake_up(&sched->kss_waitq); - spin_unlock_irqrestore (&sched->kss_lock, flags); + spin_unlock_bh(&sched->kss_lock); nenomem_conns++; } - + /* careful with the jiffy wrap... */ - while ((timeout = (int)(deadline - jiffies)) <= 0) { + while ((timeout = deadline - ktime_get_seconds()) <= 0) { const int n = 4; const int p = 1; int chunk = ksocknal_data.ksnd_peer_hash_size; - + unsigned int lnd_timeout; + /* Time to check for timeouts on a few more peers: I do - * checks every 'p' seconds on a proportion of the peer + * checks every 'p' seconds on a proportion of the peer_ni * table and I need to check every connection 'n' times * within a timeout interval, to ensure I detect a * timeout on any connection within (n+1)/n times the * timeout interval. 
*/ - if (ksocknal_tunables.ksnd_io_timeout > n * p) - chunk = (chunk * n * p) / - ksocknal_tunables.ksnd_io_timeout; - if (chunk == 0) - chunk = 1; + lnd_timeout = lnet_get_lnd_timeout(); + if (lnd_timeout > n * p) + chunk = (chunk * n * p) / lnd_timeout; + if (chunk == 0) + chunk = 1; for (i = 0; i < chunk; i++) { ksocknal_check_peer_timeouts (peer_index); - peer_index = (peer_index + 1) % + peer_index = (peer_index + 1) % ksocknal_data.ksnd_peer_hash_size; } - deadline += p * HZ; + deadline += p; } if (nenomem_conns != 0) { @@ -2598,33 +2666,25 @@ ksocknal_reaper (void *arg) * if any go back on my enomem list. */ timeout = SOCKNAL_ENOMEM_RETRY; } - ksocknal_data.ksnd_reaper_waketime = jiffies + timeout; + ksocknal_data.ksnd_reaper_waketime = ktime_get_seconds() + + timeout; - set_current_state (TASK_INTERRUPTIBLE); - add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait); + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&ksocknal_data.ksnd_reaper_waitq, &wait); - if (!ksocknal_data.ksnd_shuttingdown && - list_empty (&ksocknal_data.ksnd_deathrow_conns) && - list_empty (&ksocknal_data.ksnd_zombie_conns)) - schedule_timeout (timeout); + if (!ksocknal_data.ksnd_shuttingdown && + list_empty(&ksocknal_data.ksnd_deathrow_conns) && + list_empty(&ksocknal_data.ksnd_zombie_conns)) + schedule_timeout(cfs_time_seconds(timeout)); - set_current_state (TASK_RUNNING); - remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait); + set_current_state(TASK_RUNNING); + remove_wait_queue(&ksocknal_data.ksnd_reaper_waitq, &wait); - spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags); - } + spin_lock_bh(&ksocknal_data.ksnd_reaper_lock); + } - spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags); + spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock); - ksocknal_thread_fini (); - return (0); + ksocknal_thread_fini(); + return 0; } - -lib_nal_t ksocknal_lib = { - libnal_data: &ksocknal_data, /* NAL private data */ - libnal_send: ksocknal_send, - libnal_send_pages: ksocknal_send_pages, - libnal_recv: ksocknal_recv, - libnal_recv_pages: ksocknal_recv_pages, - libnal_dist: ksocknal_dist -};
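
For readers following the reaper changes above, here is a minimal standalone userspace sketch of the chunked peer-table scan; it is not part of the patch, and the hash size and LND timeout below are arbitrary example values rather than anything taken from this code or a real configuration.

/*
 * Standalone illustration of the proportional timeout scan in
 * ksocknal_reaper().  Not part of the patch: hash_size and lnd_timeout
 * are made-up example values, and the loop only simulates the
 * round-robin walk over the peer_ni hash table.
 */
#include <stdio.h>

int main(void)
{
	const int n = 4;            /* checks wanted per timeout interval */
	const int p = 1;            /* seconds between reaper passes */
	const int hash_size = 251;  /* example peer_ni hash table size */
	const int lnd_timeout = 50; /* example LND timeout, in seconds */
	int chunk = hash_size;
	int peer_index = 0;
	int hits = 0;
	int i;

	/* Same arithmetic as the reaper: when the timeout is long, scan
	 * only a fraction of the table on each one-second pass. */
	if (lnd_timeout > n * p)
		chunk = (chunk * n * p) / lnd_timeout;
	if (chunk == 0)
		chunk = 1;

	/* Simulate one timeout interval (lnd_timeout / p passes) and count
	 * how often hash bucket 0 gets checked. */
	for (i = 0; i < (lnd_timeout / p) * chunk; i++) {
		if (peer_index == 0)
			hits++;
		peer_index = (peer_index + 1) % hash_size;
	}

	printf("chunk = %d buckets/pass, bucket 0 checked %d times per %d s\n",
	       chunk, hits, lnd_timeout);
	return 0;
}

With these example numbers the scan covers 20 buckets per one-second pass, so each bucket is visited about four times within a 50-second interval, which is what gives the (n+1)/n detection bound cited in the reaper's comment.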