X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fsocklnd%2Fsocklnd_cb.c;h=c2f7c5767cb26adbcd31046357da8f5458b82efa;hp=efec018e23ac6621cd89bc4269f7e7f2b9e9452b;hb=3e753d381565875c685342e28e76476bee75a4ca;hpb=f6cd596982ed4380e5547181022ad81e4c6d3512 diff --git a/lnet/klnds/socklnd/socklnd_cb.c b/lnet/klnds/socklnd/socklnd_cb.c index efec018..c2f7c57 100644 --- a/lnet/klnds/socklnd/socklnd_cb.c +++ b/lnet/klnds/socklnd/socklnd_cb.c @@ -1,14 +1,14 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: +/* + * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2011, 2014, Intel Corporation. * * Author: Zach Brown * Author: Peter J. Braam * Author: Phil Schwan * Author: Eric Barton * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ + * This file is part of Lustre, https://wiki.hpdd.intel.com/ * * Portals is free software; you can redistribute it and/or * modify it under the terms of version 2 of the GNU General Public @@ -27,22 +27,24 @@ #include "socklnd.h" ksock_tx_t * -ksocknal_alloc_tx (int size) +ksocknal_alloc_tx(int type, int size) { - ksock_tx_t *tx = NULL; + ksock_tx_t *tx = NULL; - if (size == KSOCK_NOOP_TX_SIZE) { - /* searching for a noop tx in free list */ - spin_lock(&ksocknal_data.ksnd_tx_lock); + if (type == KSOCK_MSG_NOOP) { + LASSERT(size == KSOCK_NOOP_TX_SIZE); - if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) { - tx = list_entry(ksocknal_data.ksnd_idle_noop_txs.next, - ksock_tx_t, tx_list); - LASSERT(tx->tx_desc_size == size); - list_del(&tx->tx_list); - } + /* searching for a noop tx in free list */ + spin_lock(&ksocknal_data.ksnd_tx_lock); - spin_unlock(&ksocknal_data.ksnd_tx_lock); + if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) { + tx = list_entry(ksocknal_data.ksnd_idle_noop_txs. 
\ + next, ksock_tx_t, tx_list); + LASSERT(tx->tx_desc_size == size); + list_del(&tx->tx_list); + } + + spin_unlock(&ksocknal_data.ksnd_tx_lock); } if (tx == NULL) @@ -51,43 +53,66 @@ ksocknal_alloc_tx (int size) if (tx == NULL) return NULL; - atomic_set(&tx->tx_refcount, 1); - tx->tx_desc_size = size; - atomic_inc(&ksocknal_data.ksnd_nactive_txs); + atomic_set(&tx->tx_refcount, 1); + tx->tx_zc_aborted = 0; + tx->tx_zc_capable = 0; + tx->tx_zc_checked = 0; + tx->tx_desc_size = size; - return tx; + atomic_inc(&ksocknal_data.ksnd_nactive_txs); + + return tx; } -void -ksocknal_free_tx (ksock_tx_t *tx) +ksock_tx_t * +ksocknal_alloc_tx_noop(__u64 cookie, int nonblk) { - atomic_dec(&ksocknal_data.ksnd_nactive_txs); + ksock_tx_t *tx; + + tx = ksocknal_alloc_tx(KSOCK_MSG_NOOP, KSOCK_NOOP_TX_SIZE); + if (tx == NULL) { + CERROR("Can't allocate noop tx desc\n"); + return NULL; + } - if (tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) { - /* it's a noop tx */ - spin_lock(&ksocknal_data.ksnd_tx_lock); + tx->tx_conn = NULL; + tx->tx_lnetmsg = NULL; + tx->tx_kiov = NULL; + tx->tx_nkiov = 0; + tx->tx_iov = tx->tx_frags.virt.iov; + tx->tx_niov = 1; + tx->tx_nonblk = nonblk; - list_add(&tx->tx_list, &ksocknal_data.ksnd_idle_noop_txs); + tx->tx_msg.ksm_csum = 0; + tx->tx_msg.ksm_type = KSOCK_MSG_NOOP; + tx->tx_msg.ksm_zc_cookies[0] = 0; + tx->tx_msg.ksm_zc_cookies[1] = cookie; - spin_unlock(&ksocknal_data.ksnd_tx_lock); - } else { - LIBCFS_FREE(tx, tx->tx_desc_size); - } + return tx; } + void -ksocknal_init_msg(ksock_msg_t *msg, int type) +ksocknal_free_tx (ksock_tx_t *tx) { - msg->ksm_type = type; - msg->ksm_csum = 0; - msg->ksm_zc_req_cookie = 0; - msg->ksm_zc_ack_cookie = 0; + atomic_dec(&ksocknal_data.ksnd_nactive_txs); + + if (tx->tx_lnetmsg == NULL && tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) { + /* it's a noop tx */ + spin_lock(&ksocknal_data.ksnd_tx_lock); + + list_add(&tx->tx_list, &ksocknal_data.ksnd_idle_noop_txs); + + spin_unlock(&ksocknal_data.ksnd_tx_lock); + } else { + LIBCFS_FREE(tx, tx->tx_desc_size); + } } -int +static int ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx) { - struct iovec *iov = tx->tx_iov; + struct kvec *iov = tx->tx_iov; int nob; int rc; @@ -107,8 +132,8 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx) do { LASSERT (tx->tx_niov > 0); - if (nob < iov->iov_len) { - iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob); + if (nob < (int) iov->iov_len) { + iov->iov_base += nob; iov->iov_len -= nob; return (rc); } @@ -121,7 +146,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx) return (rc); } -int +static int ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx) { lnet_kiov_t *kiov = tx->tx_kiov; @@ -145,13 +170,13 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx) do { LASSERT(tx->tx_nkiov > 0); - if (nob < kiov->kiov_len) { + if (nob < (int)kiov->kiov_len) { kiov->kiov_offset += nob; kiov->kiov_len -= nob; return rc; } - nob -= kiov->kiov_len; + nob -= (int)kiov->kiov_len; tx->tx_kiov = ++kiov; tx->tx_nkiov--; } while (nob != 0); @@ -159,17 +184,18 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx) return (rc); } -int -ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx) +static int +ksocknal_transmit(ksock_conn_t *conn, ksock_tx_t *tx) { - int rc; - int bufnob; + int rc; + int bufnob; - if (ksocknal_data.ksnd_stall_tx != 0) { - cfs_pause(cfs_time_seconds(ksocknal_data.ksnd_stall_tx)); - } + if (ksocknal_data.ksnd_stall_tx != 0) { + set_current_state(TASK_UNINTERRUPTIBLE); + 
schedule_timeout(cfs_time_seconds(ksocknal_data.ksnd_stall_tx)); + } - LASSERT (tx->tx_resid != 0); + LASSERT(tx->tx_resid != 0); rc = ksocknal_connsock_addref(conn); if (rc != 0) { @@ -188,53 +214,34 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx) rc = ksocknal_send_kiov (conn, tx); } - bufnob = SOCK_WMEM_QUEUED(conn->ksnc_sock); + bufnob = conn->ksnc_sock->sk->sk_wmem_queued; if (rc > 0) /* sent something? */ conn->ksnc_tx_bufnob += rc; /* account it */ - if (bufnob < conn->ksnc_tx_bufnob) { - /* allocated send buffer bytes < computed; infer - * something got ACKed */ - conn->ksnc_tx_deadline = - cfs_time_shift(*ksocknal_tunables.ksnd_timeout); - conn->ksnc_peer->ksnp_last_alive = cfs_time_current(); - conn->ksnc_tx_bufnob = bufnob; - mb(); - } + if (bufnob < conn->ksnc_tx_bufnob) { + /* allocated send buffer bytes < computed; infer + * something got ACKed */ + conn->ksnc_tx_deadline = + cfs_time_shift(*ksocknal_tunables.ksnd_timeout); + conn->ksnc_peer->ksnp_last_alive = cfs_time_current(); + conn->ksnc_tx_bufnob = bufnob; + smp_mb(); + } - if (rc <= 0) { /* Didn't write anything? */ - ksock_sched_t *sched; + if (rc <= 0) { /* Didn't write anything? */ if (rc == 0) /* some stacks return 0 instead of -EAGAIN */ rc = -EAGAIN; - if (rc != -EAGAIN) - break; - /* Check if EAGAIN is due to memory pressure */ - - sched = conn->ksnc_scheduler; - spin_lock_bh (&sched->kss_lock); - - if (!SOCK_TEST_NOSPACE(conn->ksnc_sock) && - !conn->ksnc_tx_ready) { - /* SOCK_NOSPACE is set when the socket fills - * and cleared in the write_space callback - * (which also sets ksnc_tx_ready). If - * SOCK_NOSPACE and ksnc_tx_ready are BOTH - * zero, I didn't fill the socket and - * write_space won't reschedule me, so I - * return -ENOMEM to get my caller to retry - * after a timeout */ + if(rc == -EAGAIN && ksocknal_lib_memory_pressure(conn)) rc = -ENOMEM; - } - spin_unlock_bh (&sched->kss_lock); break; } /* socket's wmem_queued now includes 'rc' bytes */ - atomic_sub (rc, &conn->ksnc_tx_nob); + atomic_sub (rc, &conn->ksnc_tx_nob); rc = 0; } while (tx->tx_resid != 0); @@ -243,16 +250,16 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx) return (rc); } -int +static int ksocknal_recv_iov (ksock_conn_t *conn) { - struct iovec *iov = conn->ksnc_rx_iov; + struct kvec *iov = conn->ksnc_rx_iov; int nob; int rc; LASSERT (conn->ksnc_rx_niov > 0); - /* Never touch conn->ksnc_rx_iov or change connection + /* Never touch conn->ksnc_rx_iov or change connection * status inside ksocknal_lib_recv_iov */ rc = ksocknal_lib_recv_iov(conn); @@ -262,21 +269,21 @@ ksocknal_recv_iov (ksock_conn_t *conn) /* received something... 
*/ nob = rc; - conn->ksnc_peer->ksnp_last_alive = cfs_time_current(); - conn->ksnc_rx_deadline = - cfs_time_shift(*ksocknal_tunables.ksnd_timeout); - mb(); /* order with setting rx_started */ - conn->ksnc_rx_started = 1; + conn->ksnc_peer->ksnp_last_alive = cfs_time_current(); + conn->ksnc_rx_deadline = + cfs_time_shift(*ksocknal_tunables.ksnd_timeout); + smp_mb(); /* order with setting rx_started */ + conn->ksnc_rx_started = 1; - conn->ksnc_rx_nob_wanted -= nob; - conn->ksnc_rx_nob_left -= nob; + conn->ksnc_rx_nob_wanted -= nob; + conn->ksnc_rx_nob_left -= nob; do { LASSERT (conn->ksnc_rx_niov > 0); - if (nob < iov->iov_len) { + if (nob < (int)iov->iov_len) { iov->iov_len -= nob; - iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob); + iov->iov_base += nob; return (-EAGAIN); } @@ -288,7 +295,7 @@ ksocknal_recv_iov (ksock_conn_t *conn) return (rc); } -int +static int ksocknal_recv_kiov (ksock_conn_t *conn) { lnet_kiov_t *kiov = conn->ksnc_rx_kiov; @@ -296,7 +303,7 @@ ksocknal_recv_kiov (ksock_conn_t *conn) int rc; LASSERT (conn->ksnc_rx_nkiov > 0); - /* Never touch conn->ksnc_rx_kiov or change connection + /* Never touch conn->ksnc_rx_kiov or change connection * status inside ksocknal_lib_recv_iov */ rc = ksocknal_lib_recv_kiov(conn); @@ -306,19 +313,19 @@ ksocknal_recv_kiov (ksock_conn_t *conn) /* received something... */ nob = rc; - conn->ksnc_peer->ksnp_last_alive = cfs_time_current(); - conn->ksnc_rx_deadline = - cfs_time_shift(*ksocknal_tunables.ksnd_timeout); - mb(); /* order with setting rx_started */ - conn->ksnc_rx_started = 1; + conn->ksnc_peer->ksnp_last_alive = cfs_time_current(); + conn->ksnc_rx_deadline = + cfs_time_shift(*ksocknal_tunables.ksnd_timeout); + smp_mb(); /* order with setting rx_started */ + conn->ksnc_rx_started = 1; - conn->ksnc_rx_nob_wanted -= nob; - conn->ksnc_rx_nob_left -= nob; + conn->ksnc_rx_nob_wanted -= nob; + conn->ksnc_rx_nob_left -= nob; do { LASSERT (conn->ksnc_rx_nkiov > 0); - if (nob < kiov->kiov_len) { + if (nob < (int) kiov->kiov_len) { kiov->kiov_offset += nob; kiov->kiov_len -= nob; return -EAGAIN; @@ -332,7 +339,7 @@ ksocknal_recv_kiov (ksock_conn_t *conn) return 1; } -int +static int ksocknal_receive (ksock_conn_t *conn) { /* Return 1 on success, 0 on EOF, < 0 on error. @@ -341,9 +348,10 @@ ksocknal_receive (ksock_conn_t *conn) int rc; ENTRY; - if (ksocknal_data.ksnd_stall_rx != 0) { - cfs_pause(cfs_time_seconds (ksocknal_data.ksnd_stall_rx)); - } + if (ksocknal_data.ksnd_stall_rx != 0) { + set_current_state(TASK_UNINTERRUPTIBLE); + schedule_timeout(cfs_time_seconds(ksocknal_data.ksnd_stall_rx)); + } rc = ksocknal_connsock_addref(conn); if (rc != 0) { @@ -384,7 +392,7 @@ void ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx) { lnet_msg_t *lnetmsg = tx->tx_lnetmsg; - int rc = (tx->tx_resid == 0) ? 0 : -EIO; + int rc = (tx->tx_resid == 0 && !tx->tx_zc_aborted) ? 
0 : -EIO; ENTRY; LASSERT(ni != NULL || tx->tx_conn != NULL); @@ -403,26 +411,26 @@ ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx) } void -ksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int error) +ksocknal_txlist_done(lnet_ni_t *ni, struct list_head *txlist, int error) { ksock_tx_t *tx; - while (!list_empty (txlist)) { - tx = list_entry (txlist->next, ksock_tx_t, tx_list); + while (!list_empty(txlist)) { + tx = list_entry(txlist->next, ksock_tx_t, tx_list); if (error && tx->tx_lnetmsg != NULL) { - CDEBUG (D_NETERROR, "Deleting packet type %d len %d %s->%s\n", + CNETERR("Deleting packet type %d len %d %s->%s\n", le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type), le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length), libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)), - libcfs_nid2str(le64_to_cpu (tx->tx_lnetmsg->msg_hdr.dest_nid))); + libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.dest_nid))); } else if (error) { - CDEBUG (D_NETERROR, "Deleting noop packet\n"); + CNETERR("Deleting noop packet\n"); } - list_del (&tx->tx_list); + list_del(&tx->tx_list); - LASSERT (atomic_read(&tx->tx_refcount) == 1); + LASSERT (atomic_read(&tx->tx_refcount) == 1); ksocknal_tx_done (ni, tx); } } @@ -432,78 +440,78 @@ ksocknal_check_zc_req(ksock_tx_t *tx) { ksock_conn_t *conn = tx->tx_conn; ksock_peer_t *peer = conn->ksnc_peer; - lnet_kiov_t *kiov = tx->tx_kiov; - int nkiov = tx->tx_nkiov; - /* Set tx_msg.ksm_zc_req_cookie to a unique non-zero cookie and add tx + /* Set tx_msg.ksm_zc_cookies[0] to a unique non-zero cookie and add tx * to ksnp_zc_req_list if some fragment of this message should be sent * zero-copy. Our peer will send an ACK containing this cookie when * she has received this message to tell us we can signal completion. - * tx_msg.ksm_zc_req_cookie remains non-zero while tx is on + * tx_msg.ksm_zc_cookies[0] remains non-zero while tx is on * ksnp_zc_req_list. */ + LASSERT (tx->tx_msg.ksm_type != KSOCK_MSG_NOOP); + LASSERT (tx->tx_zc_capable); - if (conn->ksnc_proto != &ksocknal_protocol_v2x || - !conn->ksnc_zc_capable) - return; - - while (nkiov > 0) { - if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag) - break; - --nkiov; - ++kiov; - } + tx->tx_zc_checked = 1; - if (nkiov == 0) + if (conn->ksnc_proto == &ksocknal_protocol_v1x || + !conn->ksnc_zc_capable) return; /* assign cookie and queue tx to pending list, it will be released when - * a matching ack is received. See ksocknal_handle_zc_ack() */ + * a matching ack is received. 
See ksocknal_handle_zcack() */ ksocknal_tx_addref(tx); - spin_lock(&peer->ksnp_lock); + spin_lock(&peer->ksnp_lock); /* ZC_REQ is going to be pinned to the peer */ tx->tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout); - LASSERT (tx->tx_msg.ksm_zc_req_cookie == 0); - tx->tx_msg.ksm_zc_req_cookie = peer->ksnp_zc_next_cookie++; - list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list); + LASSERT (tx->tx_msg.ksm_zc_cookies[0] == 0); + + tx->tx_msg.ksm_zc_cookies[0] = peer->ksnp_zc_next_cookie++; + + if (peer->ksnp_zc_next_cookie == 0) + peer->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1; - spin_unlock(&peer->ksnp_lock); + list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list); + + spin_unlock(&peer->ksnp_lock); } static void -ksocknal_unzc_req(ksock_tx_t *tx) +ksocknal_uncheck_zc_req(ksock_tx_t *tx) { - ksock_peer_t *peer = tx->tx_conn->ksnc_peer; + ksock_peer_t *peer = tx->tx_conn->ksnc_peer; - spin_lock(&peer->ksnp_lock); + LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP); + LASSERT(tx->tx_zc_capable); - if (tx->tx_msg.ksm_zc_req_cookie == 0) { - /* Not waiting for an ACK */ - spin_unlock(&peer->ksnp_lock); - return; - } + tx->tx_zc_checked = 0; + + spin_lock(&peer->ksnp_lock); - tx->tx_msg.ksm_zc_req_cookie = 0; - list_del(&tx->tx_zc_list); + if (tx->tx_msg.ksm_zc_cookies[0] == 0) { + /* Not waiting for an ACK */ + spin_unlock(&peer->ksnp_lock); + return; + } - spin_unlock(&peer->ksnp_lock); + tx->tx_msg.ksm_zc_cookies[0] = 0; + list_del(&tx->tx_zc_list); - ksocknal_tx_decref(tx); + spin_unlock(&peer->ksnp_lock); + + ksocknal_tx_decref(tx); } -int +static int ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) { int rc; - if (!tx->tx_checked_zc) { - tx->tx_checked_zc = 1; + if (tx->tx_zc_capable && !tx->tx_zc_checked) ksocknal_check_zc_req(tx); - } rc = ksocknal_transmit (conn, tx); @@ -525,23 +533,23 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) counter++; /* exponential backoff warnings */ if ((counter & (-counter)) == counter) CWARN("%u ENOMEM tx %p (%u allocated)\n", - counter, conn, atomic_read(&libcfs_kmemory)); + counter, conn, atomic_read(&libcfs_kmemory)); /* Queue on ksnd_enomem_conns for retry after a timeout */ - spin_lock_bh (&ksocknal_data.ksnd_reaper_lock); + spin_lock_bh(&ksocknal_data.ksnd_reaper_lock); /* enomem list takes over scheduler's ref... 
*/ LASSERT (conn->ksnc_tx_scheduled); - list_add_tail(&conn->ksnc_tx_list, - &ksocknal_data.ksnd_enomem_conns); - if (!cfs_time_aftereq(cfs_time_add(cfs_time_current(), - SOCKNAL_ENOMEM_RETRY), - ksocknal_data.ksnd_reaper_waketime)) - cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq); - - spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock); - return (rc); - } + list_add_tail(&conn->ksnc_tx_list, + &ksocknal_data.ksnd_enomem_conns); + if (!cfs_time_aftereq(cfs_time_add(cfs_time_current(), + SOCKNAL_ENOMEM_RETRY), + ksocknal_data.ksnd_reaper_waketime)) + wake_up(&ksocknal_data.ksnd_reaper_waitq); + + spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock); + return (rc); + } /* Actual error */ LASSERT (rc < 0); @@ -549,25 +557,24 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) if (!conn->ksnc_closing) { switch (rc) { case -ECONNRESET: - LCONSOLE_WARN("Host %u.%u.%u.%u reset our connection " + LCONSOLE_WARN("Host %pI4h reset our connection " "while we were sending data; it may have " "rebooted.\n", - HIPQUAD(conn->ksnc_ipaddr)); + &conn->ksnc_ipaddr); break; default: LCONSOLE_WARN("There was an unexpected network error " - "while writing to %u.%u.%u.%u: %d.\n", - HIPQUAD(conn->ksnc_ipaddr), rc); + "while writing to %pI4h: %d.\n", + &conn->ksnc_ipaddr, rc); break; } - CDEBUG(D_NET, "[%p] Error %d on write to %s" - " ip %d.%d.%d.%d:%d\n", conn, rc, - libcfs_id2str(conn->ksnc_peer->ksnp_id), - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); + CDEBUG(D_NET, "[%p] Error %d on write to %s ip %pI4h:%d\n", + conn, rc, libcfs_id2str(conn->ksnc_peer->ksnp_id), + &conn->ksnc_ipaddr, conn->ksnc_port); } - ksocknal_unzc_req(tx); + if (tx->tx_zc_checked) + ksocknal_uncheck_zc_req(tx); /* it's not an error if conn is being closed */ ksocknal_close_conn_and_siblings (conn, @@ -576,7 +583,7 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) return (rc); } -void +static void ksocknal_launch_connection_locked (ksock_route_t *route) { @@ -589,142 +596,104 @@ ksocknal_launch_connection_locked (ksock_route_t *route) route->ksnr_scheduled = 1; /* scheduling conn for connd */ ksocknal_route_addref(route); /* extra ref for connd */ - spin_lock_bh (&ksocknal_data.ksnd_connd_lock); + spin_lock_bh(&ksocknal_data.ksnd_connd_lock); + + list_add_tail(&route->ksnr_connd_list, + &ksocknal_data.ksnd_connd_routes); + wake_up(&ksocknal_data.ksnd_connd_waitq); + + spin_unlock_bh(&ksocknal_data.ksnd_connd_lock); +} + +void +ksocknal_launch_all_connections_locked (ksock_peer_t *peer) +{ + ksock_route_t *route; - list_add_tail (&route->ksnr_connd_list, - &ksocknal_data.ksnd_connd_routes); - cfs_waitq_signal (&ksocknal_data.ksnd_connd_waitq); + /* called holding write lock on ksnd_global_lock */ + for (;;) { + /* launch any/all connections that need it */ + route = ksocknal_find_connectable_route_locked(peer); + if (route == NULL) + return; - spin_unlock_bh (&ksocknal_data.ksnd_connd_lock); + ksocknal_launch_connection_locked(route); + } } ksock_conn_t * -ksocknal_find_conn_locked (int payload_nob, ksock_peer_t *peer) +ksocknal_find_conn_locked(ksock_peer_t *peer, ksock_tx_t *tx, int nonblk) { - struct list_head *tmp; + struct list_head *tmp; + ksock_conn_t *conn; ksock_conn_t *typed = NULL; - int tnob = 0; ksock_conn_t *fallback = NULL; + int tnob = 0; int fnob = 0; - ksock_conn_t *conn; - - list_for_each (tmp, &peer->ksnp_conns) { - ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list); - int hdr_nob = 0; -#if SOCKNAL_ROUND_ROBIN - const int nob = 0; -#else - int nob = atomic_read(&c->ksnc_tx_nob) + - 
SOCK_WMEM_QUEUED(c->ksnc_sock); -#endif - LASSERT (!c->ksnc_closing); - LASSERT (c->ksnc_proto != NULL); - if (fallback == NULL || nob < fnob) { - fallback = c; - fnob = nob; - } + list_for_each(tmp, &peer->ksnp_conns) { + ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list); + int nob = atomic_read(&c->ksnc_tx_nob) + + c->ksnc_sock->sk->sk_wmem_queued; + int rc; - if (!*ksocknal_tunables.ksnd_typed_conns) - continue; + LASSERT (!c->ksnc_closing); + LASSERT (c->ksnc_proto != NULL && + c->ksnc_proto->pro_match_tx != NULL); - if (payload_nob == 0) { - /* noop packet */ - hdr_nob = offsetof(ksock_msg_t, ksm_u); - } else { - /* lnet packet */ - hdr_nob = (c->ksnc_proto == &ksocknal_protocol_v2x)? - offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload): - sizeof(lnet_hdr_t); - } + rc = c->ksnc_proto->pro_match_tx(c, tx, nonblk); - switch (c->ksnc_type) { + switch (rc) { default: - CERROR("ksnc_type bad: %u\n", c->ksnc_type); LBUG(); - case SOCKLND_CONN_ANY: - break; - case SOCKLND_CONN_BULK_IN: + case SOCKNAL_MATCH_NO: /* protocol rejected the tx */ continue; - case SOCKLND_CONN_BULK_OUT: - if ((hdr_nob + payload_nob) < *ksocknal_tunables.ksnd_min_bulk) - continue; - break; - case SOCKLND_CONN_CONTROL: - if ((hdr_nob + payload_nob) >= *ksocknal_tunables.ksnd_min_bulk) - continue; + + case SOCKNAL_MATCH_YES: /* typed connection */ + if (typed == NULL || tnob > nob || + (tnob == nob && *ksocknal_tunables.ksnd_round_robin && + cfs_time_after(typed->ksnc_tx_last_post, c->ksnc_tx_last_post))) { + typed = c; + tnob = nob; + } break; - } - if (typed == NULL || nob < tnob) { - typed = c; - tnob = nob; + case SOCKNAL_MATCH_MAY: /* fallback connection */ + if (fallback == NULL || fnob > nob || + (fnob == nob && *ksocknal_tunables.ksnd_round_robin && + cfs_time_after(fallback->ksnc_tx_last_post, c->ksnc_tx_last_post))) { + fallback = c; + fnob = nob; + } + break; } } /* prefer the typed selection */ conn = (typed != NULL) ? 
typed : fallback; -#if SOCKNAL_ROUND_ROBIN - if (conn != NULL) { - /* round-robin all else being equal */ - list_del (&conn->ksnc_list); - list_add_tail (&conn->ksnc_list, &peer->ksnp_conns); - } -#endif + if (conn != NULL) + conn->ksnc_tx_last_post = cfs_time_current(); + return conn; } void -ksocknal_next_mono_tx(ksock_conn_t *conn) -{ - ksock_tx_t *tx = conn->ksnc_tx_mono; - - /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */ - LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x); - LASSERT(!list_empty(&conn->ksnc_tx_queue)); - LASSERT(tx != NULL); - - if (tx->tx_list.next == &conn->ksnc_tx_queue) { - /* no more packets queued */ - conn->ksnc_tx_mono = NULL; - } else { - conn->ksnc_tx_mono = list_entry(tx->tx_list.next, ksock_tx_t, tx_list); - LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type == tx->tx_msg.ksm_type); - } -} - -int -ksocknal_piggyback_zcack(ksock_conn_t *conn, __u64 cookie) +ksocknal_tx_prep(ksock_conn_t *conn, ksock_tx_t *tx) { - ksock_tx_t *tx = conn->ksnc_tx_mono; - - /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */ - - if (tx == NULL) - return 0; - - if (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP) { - /* tx is noop zc-ack, can't piggyback zc-ack cookie */ - return 0; - } - - LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_LNET); - LASSERT(tx->tx_msg.ksm_zc_ack_cookie == 0); - - /* piggyback the zc-ack cookie */ - tx->tx_msg.ksm_zc_ack_cookie = cookie; - ksocknal_next_mono_tx(conn); + conn->ksnc_proto->pro_pack(tx); - return 1; + atomic_add (tx->tx_nob, &conn->ksnc_tx_nob); + ksocknal_conn_addref(conn); /* +1 ref for tx */ + tx->tx_conn = conn; } void ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn) { ksock_sched_t *sched = conn->ksnc_scheduler; - ksock_msg_t *msg = &tx->tx_msg; - ksock_tx_t *ztx; + struct ksock_msg *msg = &tx->tx_msg; + ksock_tx_t *ztx = NULL; int bufnob = 0; /* called holding global lock (read or irq-write) and caller may @@ -733,133 +702,89 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn) * ksnc_sock... */ LASSERT(!conn->ksnc_closing); - CDEBUG (D_NET, "Sending to %s ip %d.%d.%d.%d:%d\n", - libcfs_id2str(conn->ksnc_peer->ksnp_id), - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); + CDEBUG(D_NET, "Sending to %s ip %pI4h:%d\n", + libcfs_id2str(conn->ksnc_peer->ksnp_id), + &conn->ksnc_ipaddr, conn->ksnc_port); - tx->tx_checked_zc = 0; - conn->ksnc_proto->pro_pack(tx); + ksocknal_tx_prep(conn, tx); /* Ensure the frags we've been given EXACTLY match the number of * bytes we want to send. Many TCP/IP stacks disregard any total - * size parameters passed to them and just look at the frags. + * size parameters passed to them and just look at the frags. * * We always expect at least 1 mapped fragment containing the * complete ksocknal message header. */ LASSERT (lnet_iov_nob (tx->tx_niov, tx->tx_iov) + - lnet_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob); + lnet_kiov_nob(tx->tx_nkiov, tx->tx_kiov) == + (unsigned int)tx->tx_nob); LASSERT (tx->tx_niov >= 1); LASSERT (tx->tx_resid == tx->tx_nob); CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n", - tx, (tx->tx_lnetmsg != NULL)? tx->tx_lnetmsg->msg_hdr.type: - KSOCK_MSG_NOOP, + tx, (tx->tx_lnetmsg != NULL) ? 
tx->tx_lnetmsg->msg_hdr.type: + KSOCK_MSG_NOOP, tx->tx_nob, tx->tx_niov, tx->tx_nkiov); - atomic_add (tx->tx_nob, &conn->ksnc_tx_nob); - tx->tx_conn = conn; - ksocknal_conn_addref(conn); /* +1 ref for tx */ - - /* - * NB Darwin: SOCK_WMEM_QUEUED()->sock_getsockopt() will take - * a blockable lock(socket lock), so SOCK_WMEM_QUEUED can't be - * put in spinlock. - */ - bufnob = SOCK_WMEM_QUEUED(conn->ksnc_sock); - spin_lock_bh (&sched->kss_lock); - - if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) { - /* First packet starts the timeout */ - conn->ksnc_tx_deadline = - cfs_time_shift(*ksocknal_tunables.ksnd_timeout); - conn->ksnc_tx_bufnob = 0; - mb(); /* order with adding to tx_queue */ - } + bufnob = conn->ksnc_sock->sk->sk_wmem_queued; + spin_lock_bh(&sched->kss_lock); - ztx = NULL; + if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) { + /* First packet starts the timeout */ + conn->ksnc_tx_deadline = + cfs_time_shift(*ksocknal_tunables.ksnd_timeout); + if (conn->ksnc_tx_bufnob > 0) /* something got ACKed */ + conn->ksnc_peer->ksnp_last_alive = cfs_time_current(); + conn->ksnc_tx_bufnob = 0; + smp_mb(); /* order with adding to tx_queue */ + } - if (msg->ksm_type == KSOCK_MSG_NOOP) { - /* The packet is noop ZC ACK, try to piggyback the ack_cookie - * on a normal packet so I don't need to send it */ - LASSERT(msg->ksm_zc_req_cookie == 0); - LASSERT(msg->ksm_zc_ack_cookie != 0); + if (msg->ksm_type == KSOCK_MSG_NOOP) { + /* The packet is noop ZC ACK, try to piggyback the ack_cookie + * on a normal packet so I don't need to send it */ + LASSERT (msg->ksm_zc_cookies[1] != 0); + LASSERT (conn->ksnc_proto->pro_queue_tx_zcack != NULL); - if (conn->ksnc_tx_mono != NULL) { - if (ksocknal_piggyback_zcack(conn, msg->ksm_zc_ack_cookie)) { - /* zc-ack cookie is piggybacked */ - atomic_sub (tx->tx_nob, &conn->ksnc_tx_nob); - ztx = tx; /* Put to freelist later */ - } else { - /* no packet can piggyback zc-ack cookie */ - list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); - } - } else { - /* It's the first mono-packet */ - conn->ksnc_tx_mono = tx; - list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); - } + if (conn->ksnc_proto->pro_queue_tx_zcack(conn, tx, 0)) + ztx = tx; /* ZC ACK piggybacked on ztx release tx later */ } else { /* It's a normal packet - can it piggback a noop zc-ack that * has been queued already? 
*/ - LASSERT(msg->ksm_zc_ack_cookie == 0); - - if (conn->ksnc_proto == &ksocknal_protocol_v2x && /* V2.x packet */ - conn->ksnc_tx_mono != NULL) { - if (conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_NOOP) { - /* There is a noop zc-ack can be piggybacked */ - ztx = conn->ksnc_tx_mono; - - msg->ksm_zc_ack_cookie = ztx->tx_msg.ksm_zc_ack_cookie; - ksocknal_next_mono_tx(conn); + LASSERT (msg->ksm_zc_cookies[1] == 0); + LASSERT (conn->ksnc_proto->pro_queue_tx_msg != NULL); - /* use tx to replace the noop zc-ack packet, ztx will - * be put to freelist later */ - list_add(&tx->tx_list, &ztx->tx_list); - list_del(&ztx->tx_list); - - atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob); - } else { - /* no noop zc-ack packet, just enqueue it */ - LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_LNET); - list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); - } - - } else if (conn->ksnc_proto == &ksocknal_protocol_v2x) { - /* it's the first mono-packet, enqueue it */ - conn->ksnc_tx_mono = tx; - list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); - } else { - /* V1.x packet, just enqueue it */ - list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); - } + ztx = conn->ksnc_proto->pro_queue_tx_msg(conn, tx); + /* ztx will be released later */ } - if (ztx != NULL) - list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs); - - if (conn->ksnc_tx_ready && /* able to send */ - !conn->ksnc_tx_scheduled) { /* not scheduled to send */ - /* +1 ref for scheduler */ - ksocknal_conn_addref(conn); - list_add_tail (&conn->ksnc_tx_list, - &sched->kss_tx_conns); - conn->ksnc_tx_scheduled = 1; - cfs_waitq_signal (&sched->kss_waitq); + if (ztx != NULL) { + atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob); + list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs); } - spin_unlock_bh (&sched->kss_lock); + if (conn->ksnc_tx_ready && /* able to send */ + !conn->ksnc_tx_scheduled) { /* not scheduled to send */ + /* +1 ref for scheduler */ + ksocknal_conn_addref(conn); + list_add_tail(&conn->ksnc_tx_list, + &sched->kss_tx_conns); + conn->ksnc_tx_scheduled = 1; + wake_up(&sched->kss_waitq); + } + + spin_unlock_bh(&sched->kss_lock); } + ksock_route_t * ksocknal_find_connectable_route_locked (ksock_peer_t *peer) { - struct list_head *tmp; - ksock_route_t *route; + cfs_time_t now = cfs_time_current(); + struct list_head *tmp; + ksock_route_t *route; - list_for_each (tmp, &peer->ksnp_routes) { - route = list_entry (tmp, ksock_route_t, ksnr_list); + list_for_each(tmp, &peer->ksnp_routes) { + route = list_entry(tmp, ksock_route_t, ksnr_list); LASSERT (!route->ksnr_connecting || route->ksnr_scheduled); @@ -870,11 +795,17 @@ ksocknal_find_connectable_route_locked (ksock_peer_t *peer) if ((ksocknal_route_mask() & ~route->ksnr_connected) == 0) continue; - /* too soon to retry this guy? 
*/ if (!(route->ksnr_retry_interval == 0 || /* first attempt */ - cfs_time_aftereq (cfs_time_current(), - route->ksnr_timeout))) + cfs_time_aftereq(now, route->ksnr_timeout))) { + CDEBUG(D_NET, + "Too soon to retry route %pI4h " + "(cnted %d, interval %ld, %ld secs later)\n", + &route->ksnr_ipaddr, + route->ksnr_connected, + route->ksnr_retry_interval, + cfs_duration_sec(route->ksnr_timeout - now)); continue; + } return (route); } @@ -885,11 +816,11 @@ ksocknal_find_connectable_route_locked (ksock_peer_t *peer) ksock_route_t * ksocknal_find_connecting_route_locked (ksock_peer_t *peer) { - struct list_head *tmp; + struct list_head *tmp; ksock_route_t *route; - list_for_each (tmp, &peer->ksnp_routes) { - route = list_entry (tmp, ksock_route_t, ksnr_list); + list_for_each(tmp, &peer->ksnp_routes) { + route = list_entry(tmp, ksock_route_t, ksnr_list); LASSERT (!route->ksnr_connecting || route->ksnr_scheduled); @@ -905,44 +836,41 @@ ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id) { ksock_peer_t *peer; ksock_conn_t *conn; - ksock_route_t *route; - rwlock_t *g_lock; + rwlock_t *g_lock; int retry; int rc; LASSERT (tx->tx_conn == NULL); - LASSERT (tx->tx_lnetmsg != NULL); g_lock = &ksocknal_data.ksnd_global_lock; for (retry = 0;; retry = 1) { -#if !SOCKNAL_ROUND_ROBIN - read_lock (g_lock); + read_lock(g_lock); peer = ksocknal_find_peer_locked(ni, id); if (peer != NULL) { if (ksocknal_find_connectable_route_locked(peer) == NULL) { - conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer); + conn = ksocknal_find_conn_locked(peer, tx, tx->tx_nonblk); if (conn != NULL) { /* I've got no routes that need to be * connecting and I do have an actual * connection... */ ksocknal_queue_tx_locked (tx, conn); - read_unlock (g_lock); + read_unlock(g_lock); return (0); } } } /* I'll need a write lock... 
*/ - read_unlock (g_lock); -#endif - write_lock_bh (g_lock); + read_unlock(g_lock); + + write_lock_bh(g_lock); peer = ksocknal_find_peer_locked(ni, id); if (peer != NULL) break; - write_unlock_bh (g_lock); + write_unlock_bh(g_lock); if ((id.pid & LNET_PID_USERFLAG) != 0) { CERROR("Refusing to create a connection to " @@ -965,20 +893,13 @@ ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id) } } - for (;;) { - /* launch any/all connections that need it */ - route = ksocknal_find_connectable_route_locked (peer); - if (route == NULL) - break; - - ksocknal_launch_connection_locked (route); - } + ksocknal_launch_all_connections_locked(peer); - conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer); + conn = ksocknal_find_conn_locked(peer, tx, tx->tx_nonblk); if (conn != NULL) { /* Connection exists; queue message on it */ ksocknal_queue_tx_locked (tx, conn); - write_unlock_bh (g_lock); + write_unlock_bh(g_lock); return (0); } @@ -987,27 +908,28 @@ ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id) /* the message is going to be pinned to the peer */ tx->tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout); - + /* Queue the message until a connection is established */ - list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue); - write_unlock_bh (g_lock); + list_add_tail(&tx->tx_list, &peer->ksnp_tx_queue); + write_unlock_bh(g_lock); return 0; } - write_unlock_bh (g_lock); + write_unlock_bh(g_lock); /* NB Routes may be ignored if connections to them failed recently */ - CDEBUG(D_NETERROR, "No usable routes to %s\n", libcfs_id2str(id)); + CNETERR("No usable routes to %s\n", libcfs_id2str(id)); return (-EHOSTUNREACH); } int ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) { + int mpflag = 1; int type = lntmsg->msg_type; lnet_process_id_t target = lntmsg->msg_target; unsigned int payload_niov = lntmsg->msg_niov; - struct iovec *payload_iov = lntmsg->msg_iov; + struct kvec *payload_iov = lntmsg->msg_iov; lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; @@ -1021,23 +943,27 @@ ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n", payload_nob, payload_niov, libcfs_id2str(target)); - LASSERT (payload_nob == 0 || payload_niov > 0); - LASSERT (payload_niov <= LNET_MAX_IOV); - /* payload is either all vaddrs or all pages */ - LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); - LASSERT (!in_interrupt ()); - - if (payload_iov != NULL) - desc_size = offsetof(ksock_tx_t, - tx_frags.virt.iov[1 + payload_niov]); - else - desc_size = offsetof(ksock_tx_t, - tx_frags.paged.kiov[payload_niov]); - - tx = ksocknal_alloc_tx(desc_size); + LASSERT (payload_nob == 0 || payload_niov > 0); + LASSERT (payload_niov <= LNET_MAX_IOV); + /* payload is either all vaddrs or all pages */ + LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); + LASSERT (!in_interrupt ()); + + if (payload_iov != NULL) + desc_size = offsetof(ksock_tx_t, + tx_frags.virt.iov[1 + payload_niov]); + else + desc_size = offsetof(ksock_tx_t, + tx_frags.paged.kiov[payload_niov]); + + if (lntmsg->msg_vmflush) + mpflag = cfs_memory_pressure_get_and_set(); + tx = ksocknal_alloc_tx(KSOCK_MSG_LNET, desc_size); if (tx == NULL) { CERROR("Can't allocate tx desc type %d size %d\n", type, desc_size); + if (lntmsg->msg_vmflush) + cfs_memory_pressure_restore(mpflag); return (-ENOMEM); } @@ -1059,12 +985,21 @@ ksocknal_send(lnet_ni_t 
*ni, void *private, lnet_msg_t *lntmsg) tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov, payload_niov, payload_kiov, payload_offset, payload_nob); + + if (payload_nob >= *ksocknal_tunables.ksnd_zc_min_payload) + tx->tx_zc_capable = 1; } - ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_LNET); + tx->tx_msg.ksm_csum = 0; + tx->tx_msg.ksm_type = KSOCK_MSG_LNET; + tx->tx_msg.ksm_zc_cookies[0] = 0; + tx->tx_msg.ksm_zc_cookies[1] = 0; /* The first fragment will be set later in pro_pack */ rc = ksocknal_launch_packet(ni, tx, target); + if (!mpflag) + cfs_memory_pressure_restore(mpflag); + if (rc == 0) return (0); @@ -1073,25 +1008,25 @@ ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) } int -ksocknal_thread_start (int (*fn)(void *arg), void *arg) +ksocknal_thread_start(int (*fn)(void *arg), void *arg, char *name) { - long pid = cfs_kernel_thread (fn, arg, 0); + struct task_struct *task = kthread_run(fn, arg, name); - if (pid < 0) - return ((int)pid); + if (IS_ERR(task)) + return PTR_ERR(task); - write_lock_bh (&ksocknal_data.ksnd_global_lock); - ksocknal_data.ksnd_nthreads++; - write_unlock_bh (&ksocknal_data.ksnd_global_lock); - return (0); + write_lock_bh(&ksocknal_data.ksnd_global_lock); + ksocknal_data.ksnd_nthreads++; + write_unlock_bh(&ksocknal_data.ksnd_global_lock); + return 0; } void ksocknal_thread_fini (void) { - write_lock_bh (&ksocknal_data.ksnd_global_lock); + write_lock_bh(&ksocknal_data.ksnd_global_lock); ksocknal_data.ksnd_nthreads--; - write_unlock_bh (&ksocknal_data.ksnd_global_lock); + write_unlock_bh(&ksocknal_data.ksnd_global_lock); } int @@ -1110,19 +1045,20 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip) ksocknal_lib_eager_ack(conn); } - if (nob_to_skip == 0) { /* right at next packet boundary now */ - conn->ksnc_rx_started = 0; - mb (); /* racing with timeout thread */ + if (nob_to_skip == 0) { /* right at next packet boundary now */ + conn->ksnc_rx_started = 0; + smp_mb(); /* racing with timeout thread */ - switch (conn->ksnc_proto->pro_version) { - case KSOCK_PROTO_V2: + switch (conn->ksnc_proto->pro_version) { + case KSOCK_PROTO_V2: + case KSOCK_PROTO_V3: conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER; - conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; + conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space; conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg; - conn->ksnc_rx_nob_wanted = offsetof(ksock_msg_t, ksm_u); - conn->ksnc_rx_nob_left = offsetof(ksock_msg_t, ksm_u); - conn->ksnc_rx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u); + conn->ksnc_rx_nob_wanted = offsetof(struct ksock_msg, ksm_u); + conn->ksnc_rx_nob_left = offsetof(struct ksock_msg, ksm_u); + conn->ksnc_rx_iov[0].iov_len = offsetof(struct ksock_msg, ksm_u); break; case KSOCK_PROTO_V1: @@ -1131,7 +1067,7 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip) conn->ksnc_rx_nob_wanted = sizeof(lnet_hdr_t); conn->ksnc_rx_nob_left = sizeof(lnet_hdr_t); - conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; + conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space; conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg; conn->ksnc_rx_iov[0].iov_len = sizeof (lnet_hdr_t); break; @@ -1152,7 +1088,7 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip) conn->ksnc_rx_state = SOCKNAL_RX_SLOP; conn->ksnc_rx_nob_left = nob_to_skip; - conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; + conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space; skipped = 0; niov = 0; @@ -1166,7 +1102,7 @@ ksocknal_new_packet 
(ksock_conn_t *conn, int nob_to_skip) nob_to_skip -=nob; } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */ - niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec)); + niov < sizeof(conn->ksnc_rx_iov_space) / sizeof(struct kvec)); conn->ksnc_rx_niov = niov; conn->ksnc_rx_kiov = NULL; @@ -1175,104 +1111,17 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip) return (0); } -/* (Sink) handle incoming ZC request from sender */ -static int -ksocknal_handle_zc_req(ksock_peer_t *peer, __u64 cookie) -{ - ksock_conn_t *conn; - ksock_tx_t *tx; - ksock_sched_t *sched; - int rc; - - read_lock (&ksocknal_data.ksnd_global_lock); - - conn = ksocknal_find_conn_locked (0, peer); - if (conn == NULL) { - read_unlock (&ksocknal_data.ksnd_global_lock); - CERROR("Can't find connection to send zcack.\n"); - return -ECONNRESET; - } - - sched = conn->ksnc_scheduler; - - spin_lock_bh (&sched->kss_lock); - rc = ksocknal_piggyback_zcack(conn, cookie); - spin_unlock_bh (&sched->kss_lock); - - read_unlock (&ksocknal_data.ksnd_global_lock); - if (rc) { - /* Ack cookie is piggybacked */ - return 0; - } - - tx = ksocknal_alloc_tx(KSOCK_NOOP_TX_SIZE); - if (tx == NULL) { - CERROR("Can't allocate noop tx desc\n"); - return -ENOMEM; - } - - tx->tx_conn = NULL; - tx->tx_lnetmsg = NULL; - tx->tx_kiov = NULL; - tx->tx_nkiov = 0; - tx->tx_iov = tx->tx_frags.virt.iov; - tx->tx_niov = 1; - - ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP); - tx->tx_msg.ksm_zc_ack_cookie = cookie; /* incoming cookie */ - - read_lock (&ksocknal_data.ksnd_global_lock); - - conn = ksocknal_find_conn_locked (0, peer); - if (conn == NULL) { - read_unlock (&ksocknal_data.ksnd_global_lock); - ksocknal_free_tx(tx); - CERROR("Can't find connection to send zcack.\n"); - return -ECONNRESET; - } - ksocknal_queue_tx_locked(tx, conn); - - read_unlock (&ksocknal_data.ksnd_global_lock); - - return 0; -} - -/* (Sender) handle ZC_ACK from sink */ static int -ksocknal_handle_zc_ack(ksock_peer_t *peer, __u64 cookie) -{ - ksock_tx_t *tx; - struct list_head *ctmp; - - spin_lock(&peer->ksnp_lock); - - list_for_each(ctmp, &peer->ksnp_zc_req_list) { - tx = list_entry (ctmp, ksock_tx_t, tx_zc_list); - if (tx->tx_msg.ksm_zc_req_cookie != cookie) - continue; - - tx->tx_msg.ksm_zc_req_cookie = 0; - list_del(&tx->tx_zc_list); - - spin_unlock(&peer->ksnp_lock); - - ksocknal_tx_decref(tx); - return 0; - } - spin_unlock(&peer->ksnp_lock); - - return -EPROTO; -} - -int ksocknal_process_receive (ksock_conn_t *conn) { - int rc; + lnet_hdr_t *lhdr; + lnet_process_id_t *id; + int rc; - LASSERT (atomic_read(&conn->ksnc_conn_refcount) > 0); + LASSERT (atomic_read(&conn->ksnc_conn_refcount) > 0); - /* NB: sched lock NOT held */ - /* SOCKNAL_RX_LNET_HEADER is here for backward compatability */ + /* NB: sched lock NOT held */ + /* SOCKNAL_RX_LNET_HEADER is here for backward compatibility */ LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER || conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD || conn->ksnc_rx_state == SOCKNAL_RX_LNET_HEADER || @@ -1281,22 +1130,23 @@ ksocknal_process_receive (ksock_conn_t *conn) if (conn->ksnc_rx_nob_wanted != 0) { rc = ksocknal_receive(conn); - if (rc <= 0) { - LASSERT (rc != -EAGAIN); + if (rc <= 0) { + lnet_process_id_t ksnp_id = conn->ksnc_peer->ksnp_id; - if (rc == 0) - CDEBUG (D_NET, "[%p] EOF from %s" - " ip %d.%d.%d.%d:%d\n", conn, - libcfs_id2str(conn->ksnc_peer->ksnp_id), - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); - else if (!conn->ksnc_closing) - CERROR ("[%p] Error %d on read from %s" - " ip 
%d.%d.%d.%d:%d\n", - conn, rc, - libcfs_id2str(conn->ksnc_peer->ksnp_id), - HIPQUAD(conn->ksnc_ipaddr), + LASSERT(rc != -EAGAIN); + + if (rc == 0) + CDEBUG(D_NET, "[%p] EOF from %s " + "ip %pI4h:%d\n", conn, + libcfs_id2str(ksnp_id), + &conn->ksnc_ipaddr, conn->ksnc_port); + else if (!conn->ksnc_closing) + CERROR("[%p] Error %d on read from %s " + "ip %pI4h:%d\n", conn, rc, + libcfs_id2str(ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port); /* it's not an error if conn is being closed */ ksocknal_close_conn_and_siblings (conn, @@ -1314,8 +1164,18 @@ ksocknal_process_receive (ksock_conn_t *conn) if (conn->ksnc_flip) { __swab32s(&conn->ksnc_msg.ksm_type); __swab32s(&conn->ksnc_msg.ksm_csum); - __swab64s(&conn->ksnc_msg.ksm_zc_req_cookie); - __swab64s(&conn->ksnc_msg.ksm_zc_ack_cookie); + __swab64s(&conn->ksnc_msg.ksm_zc_cookies[0]); + __swab64s(&conn->ksnc_msg.ksm_zc_cookies[1]); + } + + if (conn->ksnc_msg.ksm_type != KSOCK_MSG_NOOP && + conn->ksnc_msg.ksm_type != KSOCK_MSG_LNET) { + CERROR("%s: Unknown message type: %x\n", + libcfs_id2str(conn->ksnc_peer->ksnp_id), + conn->ksnc_msg.ksm_type); + ksocknal_new_packet(conn, 0); + ksocknal_close_conn_and_siblings(conn, -EPROTO); + return (-EPROTO); } if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP && @@ -1330,15 +1190,21 @@ ksocknal_process_receive (ksock_conn_t *conn) return (-EIO); } - if (conn->ksnc_msg.ksm_zc_ack_cookie != 0) { - LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x); + if (conn->ksnc_msg.ksm_zc_cookies[1] != 0) { + __u64 cookie = 0; + + LASSERT (conn->ksnc_proto != &ksocknal_protocol_v1x); + + if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP) + cookie = conn->ksnc_msg.ksm_zc_cookies[0]; + + rc = conn->ksnc_proto->pro_handle_zcack(conn, cookie, + conn->ksnc_msg.ksm_zc_cookies[1]); - rc = ksocknal_handle_zc_ack(conn->ksnc_peer, - conn->ksnc_msg.ksm_zc_ack_cookie); if (rc != 0) { - CERROR("%s: Unknown zero copy ACK cookie: "LPU64"\n", + CERROR("%s: Unknown ZC-ACK cookie: %llu, %llu\n", libcfs_id2str(conn->ksnc_peer->ksnp_id), - conn->ksnc_msg.ksm_zc_ack_cookie); + cookie, conn->ksnc_msg.ksm_zc_cookies[1]); ksocknal_new_packet(conn, 0); ksocknal_close_conn_and_siblings(conn, -EPROTO); return (rc); @@ -1349,15 +1215,14 @@ ksocknal_process_receive (ksock_conn_t *conn) ksocknal_new_packet (conn, 0); return 0; /* NOOP is done and just return */ } - LASSERT (conn->ksnc_msg.ksm_type == KSOCK_MSG_LNET); conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER; - conn->ksnc_rx_nob_wanted = sizeof(ksock_lnet_msg_t); - conn->ksnc_rx_nob_left = sizeof(ksock_lnet_msg_t); + conn->ksnc_rx_nob_wanted = sizeof(struct ksock_lnet_msg); + conn->ksnc_rx_nob_left = sizeof(struct ksock_lnet_msg); - conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; + conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space; conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg; - conn->ksnc_rx_iov[0].iov_len = sizeof(ksock_lnet_msg_t); + conn->ksnc_rx_iov[0].iov_len = sizeof(struct ksock_lnet_msg); conn->ksnc_rx_niov = 1; conn->ksnc_rx_kiov = NULL; @@ -1371,8 +1236,8 @@ ksocknal_process_receive (ksock_conn_t *conn) if ((conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) != 0) { /* Userspace peer */ - lnet_process_id_t *id = &conn->ksnc_peer->ksnp_id; - lnet_hdr_t *lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr; + lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr; + id = &conn->ksnc_peer->ksnp_id; /* Substitute process ID assigned at connection time */ lhdr->src_pid = cpu_to_le32(id->pid); @@ -1416,14 +1281,20 @@ ksocknal_process_receive (ksock_conn_t *conn) rc = 
-EIO; } - lnet_finalize(conn->ksnc_peer->ksnp_ni, conn->ksnc_cookie, rc); + if (rc == 0 && conn->ksnc_msg.ksm_zc_cookies[0] != 0) { + LASSERT(conn->ksnc_proto != &ksocknal_protocol_v1x); - if (rc == 0 && conn->ksnc_msg.ksm_zc_req_cookie != 0) { - LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x); - rc = ksocknal_handle_zc_req(conn->ksnc_peer, - conn->ksnc_msg.ksm_zc_req_cookie); + lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr; + id = &conn->ksnc_peer->ksnp_id; + + rc = conn->ksnc_proto->pro_handle_zcreq(conn, + conn->ksnc_msg.ksm_zc_cookies[0], + *ksocknal_tunables.ksnd_nonblk_zcack || + le64_to_cpu(lhdr->src_nid) != id->nid); } + lnet_finalize(conn->ksnc_peer->ksnp_ni, conn->ksnc_cookie, rc); + if (rc != 0) { ksocknal_new_packet(conn, 0); ksocknal_close_conn_and_siblings (conn, rc); @@ -1448,7 +1319,7 @@ ksocknal_process_receive (ksock_conn_t *conn) int ksocknal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed, - unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, + unsigned int niov, struct kvec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, unsigned int rlen) { ksock_conn_t *conn = (ksock_conn_t *)private; @@ -1483,14 +1354,14 @@ ksocknal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed, LASSERT (conn->ksnc_rx_scheduled); - spin_lock_bh (&sched->kss_lock); + spin_lock_bh(&sched->kss_lock); - switch (conn->ksnc_rx_state) { - case SOCKNAL_RX_PARSE_WAIT: - list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns); - cfs_waitq_signal (&sched->kss_waitq); - LASSERT (conn->ksnc_rx_ready); - break; + switch (conn->ksnc_rx_state) { + case SOCKNAL_RX_PARSE_WAIT: + list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns); + wake_up(&sched->kss_waitq); + LASSERT(conn->ksnc_rx_ready); + break; case SOCKNAL_RX_PARSE: /* scheduler hasn't noticed I'm parsing yet */ @@ -1499,62 +1370,58 @@ ksocknal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed, conn->ksnc_rx_state = SOCKNAL_RX_LNET_PAYLOAD; - spin_unlock_bh (&sched->kss_lock); - ksocknal_conn_decref(conn); - return (0); + spin_unlock_bh(&sched->kss_lock); + ksocknal_conn_decref(conn); + return 0; } static inline int ksocknal_sched_cansleep(ksock_sched_t *sched) { - int rc; + int rc; - spin_lock_bh (&sched->kss_lock); + spin_lock_bh(&sched->kss_lock); - rc = (!ksocknal_data.ksnd_shuttingdown && - list_empty(&sched->kss_rx_conns) && - list_empty(&sched->kss_tx_conns)); + rc = (!ksocknal_data.ksnd_shuttingdown && + list_empty(&sched->kss_rx_conns) && + list_empty(&sched->kss_tx_conns)); - spin_unlock_bh (&sched->kss_lock); - return (rc); + spin_unlock_bh(&sched->kss_lock); + return rc; } -int ksocknal_scheduler (void *arg) +int ksocknal_scheduler(void *arg) { - ksock_sched_t *sched = (ksock_sched_t *)arg; - ksock_conn_t *conn; - ksock_tx_t *tx; - int rc; - int nloops = 0; - int id = sched - ksocknal_data.ksnd_schedulers; - char name[16]; + struct ksock_sched_info *info; + ksock_sched_t *sched; + ksock_conn_t *conn; + ksock_tx_t *tx; + int rc; + int nloops = 0; + long id = (long)arg; - snprintf (name, sizeof (name),"socknal_sd%02d", id); - cfs_daemonize (name); - cfs_block_allsigs (); + info = ksocknal_data.ksnd_sched_info[KSOCK_THREAD_CPT(id)]; + sched = &info->ksi_scheds[KSOCK_THREAD_SID(id)]; -#if defined(CONFIG_SMP) && defined(CPU_AFFINITY) - id = ksocknal_sched2cpu(id); - if (cpu_online(id)) { - cpumask_t m = CPU_MASK_NONE; - cpu_set(id, m); - set_cpus_allowed(current, m); - } else { - CERROR ("Can't set CPU affinity for %s to %d\n", name, id); - } -#endif /* CONFIG_SMP && 
CPU_AFFINITY */ + cfs_block_allsigs(); + + rc = cfs_cpt_bind(lnet_cpt_table(), info->ksi_cpt); + if (rc != 0) { + CWARN("Can't set CPU partition affinity to %d: %d\n", + info->ksi_cpt, rc); + } - spin_lock_bh (&sched->kss_lock); + spin_lock_bh(&sched->kss_lock); while (!ksocknal_data.ksnd_shuttingdown) { int did_something = 0; /* Ensure I progress everything semi-fairly */ - if (!list_empty (&sched->kss_rx_conns)) { - conn = list_entry(sched->kss_rx_conns.next, - ksock_conn_t, ksnc_rx_list); - list_del(&conn->ksnc_rx_list); + if (!list_empty(&sched->kss_rx_conns)) { + conn = list_entry(sched->kss_rx_conns.next, + ksock_conn_t, ksnc_rx_list); + list_del(&conn->ksnc_rx_list); LASSERT(conn->ksnc_rx_scheduled); LASSERT(conn->ksnc_rx_ready); @@ -1564,11 +1431,11 @@ int ksocknal_scheduler (void *arg) * data_ready can set it any time after we release * kss_lock. */ conn->ksnc_rx_ready = 0; - spin_unlock_bh (&sched->kss_lock); + spin_unlock_bh(&sched->kss_lock); - rc = ksocknal_process_receive(conn); + rc = ksocknal_process_receive(conn); - spin_lock_bh (&sched->kss_lock); + spin_lock_bh(&sched->kss_lock); /* I'm the only one that can clear this flag */ LASSERT(conn->ksnc_rx_scheduled); @@ -1584,8 +1451,8 @@ int ksocknal_scheduler (void *arg) conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT; } else if (conn->ksnc_rx_ready) { /* reschedule for rx */ - list_add_tail (&conn->ksnc_rx_list, - &sched->kss_rx_conns); + list_add_tail(&conn->ksnc_rx_list, + &sched->kss_rx_conns); } else { conn->ksnc_rx_scheduled = 0; /* drop my ref */ @@ -1595,40 +1462,41 @@ int ksocknal_scheduler (void *arg) did_something = 1; } - if (!list_empty (&sched->kss_tx_conns)) { - CFS_LIST_HEAD (zlist); + if (!list_empty(&sched->kss_tx_conns)) { + struct list_head zlist = LIST_HEAD_INIT(zlist); - if (!list_empty(&sched->kss_zombie_noop_txs)) { - list_add(&zlist, &sched->kss_zombie_noop_txs); - list_del_init(&sched->kss_zombie_noop_txs); + if (!list_empty(&sched->kss_zombie_noop_txs)) { + list_add(&zlist, + &sched->kss_zombie_noop_txs); + list_del_init(&sched->kss_zombie_noop_txs); } - conn = list_entry(sched->kss_tx_conns.next, - ksock_conn_t, ksnc_tx_list); - list_del (&conn->ksnc_tx_list); + conn = list_entry(sched->kss_tx_conns.next, + ksock_conn_t, ksnc_tx_list); + list_del(&conn->ksnc_tx_list); LASSERT(conn->ksnc_tx_scheduled); LASSERT(conn->ksnc_tx_ready); - LASSERT(!list_empty(&conn->ksnc_tx_queue)); + LASSERT(!list_empty(&conn->ksnc_tx_queue)); - tx = list_entry(conn->ksnc_tx_queue.next, - ksock_tx_t, tx_list); + tx = list_entry(conn->ksnc_tx_queue.next, + ksock_tx_t, tx_list); - if (conn->ksnc_tx_mono == tx) - ksocknal_next_mono_tx(conn); + if (conn->ksnc_tx_carrier == tx) + ksocknal_next_tx_carrier(conn); /* dequeue now so empty list => more to send */ - list_del(&tx->tx_list); + list_del(&tx->tx_list); /* Clear tx_ready in case send isn't complete. Do * it BEFORE we call process_transmit, since * write_space can set it any time after we release * kss_lock. 
*/ conn->ksnc_tx_ready = 0; - spin_unlock_bh (&sched->kss_lock); + spin_unlock_bh(&sched->kss_lock); - if (!list_empty(&zlist)) { - /* free zombie noop txs, it's fast because + if (!list_empty(&zlist)) { + /* free zombie noop txs, it's fast because * noop txs are just put in freelist */ ksocknal_txlist_done(NULL, &zlist, 0); } @@ -1637,13 +1505,14 @@ int ksocknal_scheduler (void *arg) if (rc == -ENOMEM || rc == -EAGAIN) { /* Incomplete send: replace tx on HEAD of tx_queue */ - spin_lock_bh (&sched->kss_lock); - list_add (&tx->tx_list, &conn->ksnc_tx_queue); - } else { - /* Complete send; tx -ref */ - ksocknal_tx_decref (tx); - - spin_lock_bh (&sched->kss_lock); + spin_lock_bh(&sched->kss_lock); + list_add(&tx->tx_list, + &conn->ksnc_tx_queue); + } else { + /* Complete send; tx -ref */ + ksocknal_tx_decref(tx); + + spin_lock_bh(&sched->kss_lock); /* assume space for more */ conn->ksnc_tx_ready = 1; } @@ -1652,10 +1521,10 @@ int ksocknal_scheduler (void *arg) /* Do nothing; after a short timeout, this * conn will be reposted on kss_tx_conns. */ } else if (conn->ksnc_tx_ready && - !list_empty (&conn->ksnc_tx_queue)) { + !list_empty(&conn->ksnc_tx_queue)) { /* reschedule for tx */ - list_add_tail (&conn->ksnc_tx_list, - &sched->kss_tx_conns); + list_add_tail(&conn->ksnc_tx_list, + &sched->kss_tx_conns); } else { conn->ksnc_tx_scheduled = 0; /* drop my ref */ @@ -1666,26 +1535,26 @@ int ksocknal_scheduler (void *arg) } if (!did_something || /* nothing to do */ ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */ - spin_unlock_bh (&sched->kss_lock); + spin_unlock_bh(&sched->kss_lock); nloops = 0; if (!did_something) { /* wait for something to do */ - rc = wait_event_interruptible_exclusive( - sched->kss_waitq, - !ksocknal_sched_cansleep(sched)); - LASSERT (rc == 0); - } else { - our_cond_resched(); - } - - spin_lock_bh (&sched->kss_lock); - } - } - - spin_unlock_bh (&sched->kss_lock); - ksocknal_thread_fini (); - return (0); + rc = wait_event_interruptible_exclusive( + sched->kss_waitq, + !ksocknal_sched_cansleep(sched)); + LASSERT (rc == 0); + } else { + cond_resched(); + } + + spin_lock_bh(&sched->kss_lock); + } + } + + spin_unlock_bh(&sched->kss_lock); + ksocknal_thread_fini(); + return 0; } /* @@ -1694,79 +1563,92 @@ int ksocknal_scheduler (void *arg) */ void ksocknal_read_callback (ksock_conn_t *conn) { - ksock_sched_t *sched; - ENTRY; + ksock_sched_t *sched; + ENTRY; - sched = conn->ksnc_scheduler; + sched = conn->ksnc_scheduler; - spin_lock_bh (&sched->kss_lock); + spin_lock_bh(&sched->kss_lock); - conn->ksnc_rx_ready = 1; + conn->ksnc_rx_ready = 1; - if (!conn->ksnc_rx_scheduled) { /* not being progressed */ - list_add_tail(&conn->ksnc_rx_list, - &sched->kss_rx_conns); - conn->ksnc_rx_scheduled = 1; - /* extra ref for scheduler */ - ksocknal_conn_addref(conn); + if (!conn->ksnc_rx_scheduled) { /* not being progressed */ + list_add_tail(&conn->ksnc_rx_list, + &sched->kss_rx_conns); + conn->ksnc_rx_scheduled = 1; + /* extra ref for scheduler */ + ksocknal_conn_addref(conn); - cfs_waitq_signal (&sched->kss_waitq); - } - spin_unlock_bh (&sched->kss_lock); + wake_up (&sched->kss_waitq); + } + spin_unlock_bh(&sched->kss_lock); - EXIT; + EXIT; } /* * Add connection to kss_tx_conns of scheduler * and wakeup the scheduler. 
*/ -void ksocknal_write_callback (ksock_conn_t *conn) +void ksocknal_write_callback(ksock_conn_t *conn) { - ksock_sched_t *sched; - ENTRY; + ksock_sched_t *sched; + ENTRY; - sched = conn->ksnc_scheduler; + sched = conn->ksnc_scheduler; - spin_lock_bh (&sched->kss_lock); + spin_lock_bh(&sched->kss_lock); - conn->ksnc_tx_ready = 1; + conn->ksnc_tx_ready = 1; - if (!conn->ksnc_tx_scheduled && // not being progressed - !list_empty(&conn->ksnc_tx_queue)){//packets to send - list_add_tail (&conn->ksnc_tx_list, - &sched->kss_tx_conns); - conn->ksnc_tx_scheduled = 1; - /* extra ref for scheduler */ - ksocknal_conn_addref(conn); + if (!conn->ksnc_tx_scheduled && /* not being progressed */ + !list_empty(&conn->ksnc_tx_queue)) { /* packets to send */ + list_add_tail(&conn->ksnc_tx_list, &sched->kss_tx_conns); + conn->ksnc_tx_scheduled = 1; + /* extra ref for scheduler */ + ksocknal_conn_addref(conn); - cfs_waitq_signal (&sched->kss_waitq); - } + wake_up(&sched->kss_waitq); + } - spin_unlock_bh (&sched->kss_lock); + spin_unlock_bh(&sched->kss_lock); - EXIT; + EXIT; } -ksock_proto_t * -ksocknal_parse_proto_version (ksock_hello_msg_t *hello) +static ksock_proto_t * +ksocknal_parse_proto_version (struct ksock_hello_msg *hello) { - if ((hello->kshm_magic == LNET_PROTO_MAGIC && - hello->kshm_version == KSOCK_PROTO_V2) || - (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC) && - hello->kshm_version == __swab32(KSOCK_PROTO_V2))) { + __u32 version = 0; + + if (hello->kshm_magic == LNET_PROTO_MAGIC) + version = hello->kshm_version; + else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC)) + version = __swab32(hello->kshm_version); + + if (version != 0) { #if SOCKNAL_VERSION_DEBUG - if (*ksocknal_tunables.ksnd_protocol != 2) + if (*ksocknal_tunables.ksnd_protocol == 1) + return NULL; + + if (*ksocknal_tunables.ksnd_protocol == 2 && + version == KSOCK_PROTO_V3) return NULL; #endif - return &ksocknal_protocol_v2x; + if (version == KSOCK_PROTO_V2) + return &ksocknal_protocol_v2x; + + if (version == KSOCK_PROTO_V3) + return &ksocknal_protocol_v3x; + + return NULL; } if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) { lnet_magicversion_t *hmv = (lnet_magicversion_t *)hello; CLASSERT (sizeof (lnet_magicversion_t) == - offsetof (ksock_hello_msg_t, kshm_src_nid)); + offsetof (struct ksock_hello_msg, kshm_src_nid)); if (hmv->version_major == cpu_to_le16 (KSOCK_PROTO_V1_MAJOR) && hmv->version_minor == cpu_to_le16 (KSOCK_PROTO_V1_MINOR)) @@ -1776,341 +1658,14 @@ ksocknal_parse_proto_version (ksock_hello_msg_t *hello) return NULL; } -static int -ksocknal_send_hello_v1 (ksock_conn_t *conn, ksock_hello_msg_t *hello) -{ - cfs_socket_t *sock = conn->ksnc_sock; - lnet_hdr_t *hdr; - lnet_magicversion_t *hmv; - int rc; - int i; - - CLASSERT(sizeof(lnet_magicversion_t) == offsetof(lnet_hdr_t, src_nid)); - - LIBCFS_ALLOC(hdr, sizeof(*hdr)); - if (hdr == NULL) { - CERROR("Can't allocate lnet_hdr_t\n"); - return -ENOMEM; - } - - hmv = (lnet_magicversion_t *)&hdr->dest_nid; - - /* Re-organize V2.x message header to V1.x (lnet_hdr_t) - * header and send out */ - hmv->magic = cpu_to_le32 (LNET_PROTO_TCP_MAGIC); - hmv->version_major = cpu_to_le16 (KSOCK_PROTO_V1_MAJOR); - hmv->version_minor = cpu_to_le16 (KSOCK_PROTO_V1_MINOR); - - if (the_lnet.ln_testprotocompat != 0) { - /* single-shot proto check */ - LNET_LOCK(); - if ((the_lnet.ln_testprotocompat & 1) != 0) { - hmv->version_major++; /* just different! 
*/ - the_lnet.ln_testprotocompat &= ~1; - } - if ((the_lnet.ln_testprotocompat & 2) != 0) { - hmv->magic = LNET_PROTO_MAGIC; - the_lnet.ln_testprotocompat &= ~2; - } - LNET_UNLOCK(); - } - - hdr->src_nid = cpu_to_le64 (hello->kshm_src_nid); - hdr->src_pid = cpu_to_le32 (hello->kshm_src_pid); - hdr->type = cpu_to_le32 (LNET_MSG_HELLO); - hdr->payload_length = cpu_to_le32 (hello->kshm_nips * sizeof(__u32)); - hdr->msg.hello.type = cpu_to_le32 (hello->kshm_ctype); - hdr->msg.hello.incarnation = cpu_to_le64 (hello->kshm_src_incarnation); - - rc = libcfs_sock_write(sock, hdr, sizeof(*hdr), lnet_acceptor_timeout()); - - if (rc != 0) { - CDEBUG (D_NETERROR, "Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n", - rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); - goto out; - } - - if (hello->kshm_nips == 0) - goto out; - - for (i = 0; i < hello->kshm_nips; i++) { - hello->kshm_ips[i] = __cpu_to_le32 (hello->kshm_ips[i]); - } - - rc = libcfs_sock_write(sock, hello->kshm_ips, - hello->kshm_nips * sizeof(__u32), - lnet_acceptor_timeout()); - if (rc != 0) { - CDEBUG (D_NETERROR, "Error %d sending HELLO payload (%d)" - " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips, - HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); - } -out: - LIBCFS_FREE(hdr, sizeof(*hdr)); - - return rc; -} - -static int -ksocknal_send_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello) -{ - cfs_socket_t *sock = conn->ksnc_sock; - int rc; - - hello->kshm_magic = LNET_PROTO_MAGIC; - hello->kshm_version = KSOCK_PROTO_V2; - - if (the_lnet.ln_testprotocompat != 0) { - /* single-shot proto check */ - LNET_LOCK(); - if ((the_lnet.ln_testprotocompat & 1) != 0) { - hello->kshm_version++; /* just different! */ - the_lnet.ln_testprotocompat &= ~1; - } - LNET_UNLOCK(); - } - - rc = libcfs_sock_write(sock, hello, offsetof(ksock_hello_msg_t, kshm_ips), - lnet_acceptor_timeout()); - - if (rc != 0) { - CDEBUG (D_NETERROR, "Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n", - rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); - return rc; - } - - if (hello->kshm_nips == 0) - return 0; - - rc = libcfs_sock_write(sock, hello->kshm_ips, - hello->kshm_nips * sizeof(__u32), - lnet_acceptor_timeout()); - if (rc != 0) { - CDEBUG (D_NETERROR, "Error %d sending HELLO payload (%d)" - " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips, - HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); - } - - return rc; -} - -static int -ksocknal_recv_hello_v1(ksock_conn_t *conn, ksock_hello_msg_t *hello,int timeout) -{ - cfs_socket_t *sock = conn->ksnc_sock; - lnet_hdr_t *hdr; - int rc; - int i; - - LIBCFS_ALLOC(hdr, sizeof(*hdr)); - if (hdr == NULL) { - CERROR("Can't allocate lnet_hdr_t\n"); - return -ENOMEM; - } - - rc = libcfs_sock_read(sock, &hdr->src_nid, - sizeof (*hdr) - offsetof (lnet_hdr_t, src_nid), - timeout); - if (rc != 0) { - CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n", - rc, HIPQUAD(conn->ksnc_ipaddr)); - LASSERT (rc < 0 && rc != -EALREADY); - goto out; - } - - /* ...and check we got what we expected */ - if (hdr->type != cpu_to_le32 (LNET_MSG_HELLO)) { - CERROR ("Expecting a HELLO hdr," - " but got type %d from %u.%u.%u.%u\n", - le32_to_cpu (hdr->type), - HIPQUAD(conn->ksnc_ipaddr)); - rc = -EPROTO; - goto out; - } - - hello->kshm_src_nid = le64_to_cpu (hdr->src_nid); - hello->kshm_src_pid = le32_to_cpu (hdr->src_pid); - hello->kshm_src_incarnation = le64_to_cpu (hdr->msg.hello.incarnation); - hello->kshm_ctype = le32_to_cpu (hdr->msg.hello.type); - hello->kshm_nips = le32_to_cpu (hdr->payload_length) / - sizeof (__u32); - - if (hello->kshm_nips > 
LNET_MAX_INTERFACES) { - CERROR("Bad nips %d from ip %u.%u.%u.%u\n", - hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr)); - rc = -EPROTO; - goto out; - } - - if (hello->kshm_nips == 0) - goto out; - - rc = libcfs_sock_read(sock, hello->kshm_ips, - hello->kshm_nips * sizeof(__u32), timeout); - if (rc != 0) { - CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n", - rc, HIPQUAD(conn->ksnc_ipaddr)); - LASSERT (rc < 0 && rc != -EALREADY); - goto out; - } - - for (i = 0; i < hello->kshm_nips; i++) { - hello->kshm_ips[i] = __le32_to_cpu(hello->kshm_ips[i]); - - if (hello->kshm_ips[i] == 0) { - CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n", - i, HIPQUAD(conn->ksnc_ipaddr)); - rc = -EPROTO; - break; - } - } -out: - LIBCFS_FREE(hdr, sizeof(*hdr)); - - return rc; -} - -static int -ksocknal_recv_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello, int timeout) -{ - cfs_socket_t *sock = conn->ksnc_sock; - int rc; - int i; - - if (hello->kshm_magic == LNET_PROTO_MAGIC) - conn->ksnc_flip = 0; - else - conn->ksnc_flip = 1; - - rc = libcfs_sock_read(sock, &hello->kshm_src_nid, - offsetof(ksock_hello_msg_t, kshm_ips) - - offsetof(ksock_hello_msg_t, kshm_src_nid), - timeout); - if (rc != 0) { - CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n", - rc, HIPQUAD(conn->ksnc_ipaddr)); - LASSERT (rc < 0 && rc != -EALREADY); - return rc; - } - - if (conn->ksnc_flip) { - __swab32s(&hello->kshm_src_pid); - __swab64s(&hello->kshm_src_nid); - __swab32s(&hello->kshm_dst_pid); - __swab64s(&hello->kshm_dst_nid); - __swab64s(&hello->kshm_src_incarnation); - __swab64s(&hello->kshm_dst_incarnation); - __swab32s(&hello->kshm_ctype); - __swab32s(&hello->kshm_nips); - } - - if (hello->kshm_nips > LNET_MAX_INTERFACES) { - CERROR("Bad nips %d from ip %u.%u.%u.%u\n", - hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr)); - return -EPROTO; - } - - if (hello->kshm_nips == 0) - return 0; - - rc = libcfs_sock_read(sock, hello->kshm_ips, - hello->kshm_nips * sizeof(__u32), timeout); - if (rc != 0) { - CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n", - rc, HIPQUAD(conn->ksnc_ipaddr)); - LASSERT (rc < 0 && rc != -EALREADY); - return rc; - } - - for (i = 0; i < hello->kshm_nips; i++) { - if (conn->ksnc_flip) - __swab32s(&hello->kshm_ips[i]); - - if (hello->kshm_ips[i] == 0) { - CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n", - i, HIPQUAD(conn->ksnc_ipaddr)); - return -EPROTO; - } - } - - return 0; -} - -static void -ksocknal_pack_msg_v1(ksock_tx_t *tx) -{ - /* V1.x has no KSOCK_MSG_NOOP */ - LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP); - LASSERT(tx->tx_lnetmsg != NULL); - - tx->tx_iov[0].iov_base = (void *)&tx->tx_lnetmsg->msg_hdr; - tx->tx_iov[0].iov_len = sizeof(lnet_hdr_t); - - tx->tx_resid = tx->tx_nob = tx->tx_lnetmsg->msg_len + sizeof(lnet_hdr_t); -} - -static void -ksocknal_pack_msg_v2(ksock_tx_t *tx) -{ - tx->tx_iov[0].iov_base = (void *)&tx->tx_msg; - - if (tx->tx_lnetmsg != NULL) { - LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP); - - tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = tx->tx_lnetmsg->msg_hdr; - tx->tx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload); - tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) + - tx->tx_lnetmsg->msg_len; - } else { - LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_NOOP); - - tx->tx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr); - tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr); - } - /* Don't checksum before start sending, because packet can be piggybacked with ACK */ -} - -static void -ksocknal_unpack_msg_v1(ksock_msg_t 
*msg) -{ - msg->ksm_type = KSOCK_MSG_LNET; - msg->ksm_csum = 0; - msg->ksm_zc_req_cookie = 0; - msg->ksm_zc_ack_cookie = 0; -} - -static void -ksocknal_unpack_msg_v2(ksock_msg_t *msg) -{ - return; /* Do nothing */ -} - -ksock_proto_t ksocknal_protocol_v1x = -{ - KSOCK_PROTO_V1, - ksocknal_send_hello_v1, - ksocknal_recv_hello_v1, - ksocknal_pack_msg_v1, - ksocknal_unpack_msg_v1 -}; - -ksock_proto_t ksocknal_protocol_v2x = -{ - KSOCK_PROTO_V2, - ksocknal_send_hello_v2, - ksocknal_recv_hello_v2, - ksocknal_pack_msg_v2, - ksocknal_unpack_msg_v2 -}; - int ksocknal_send_hello (lnet_ni_t *ni, ksock_conn_t *conn, - lnet_nid_t peer_nid, ksock_hello_msg_t *hello) + lnet_nid_t peer_nid, struct ksock_hello_msg *hello) { /* CAVEAT EMPTOR: this byte flips 'ipaddrs' */ ksock_net_t *net = (ksock_net_t *)ni->ni_data; - LASSERT (0 <= hello->kshm_nips && hello->kshm_nips <= LNET_MAX_INTERFACES); + LASSERT (hello->kshm_nips <= LNET_MAX_INTERFACES); /* rely on caller to hold a ref on socket so it wouldn't disappear */ LASSERT (conn->ksnc_proto != NULL); @@ -2125,7 +1680,7 @@ ksocknal_send_hello (lnet_ni_t *ni, ksock_conn_t *conn, return conn->ksnc_proto->pro_send_hello(conn, hello); } -int +static int ksocknal_invert_type(int type) { switch (type) @@ -2143,16 +1698,16 @@ ksocknal_invert_type(int type) } int -ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn, - ksock_hello_msg_t *hello, lnet_process_id_t *peerid, - __u64 *incarnation) +ksocknal_recv_hello(lnet_ni_t *ni, ksock_conn_t *conn, + struct ksock_hello_msg *hello, lnet_process_id_t *peerid, + __u64 *incarnation) { /* Return < 0 fatal error * 0 success * EALREADY lost connection race * EPROTO protocol version mismatch */ - cfs_socket_t *sock = conn->ksnc_sock; + struct socket *sock = conn->ksnc_sock; int active = (conn->ksnc_proto != NULL); int timeout; int proto_match; @@ -2160,16 +1715,17 @@ ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn, ksock_proto_t *proto; lnet_process_id_t recv_id; - /* socket type set on active connections - not set on passive */ - LASSERT (!active == !(conn->ksnc_type != SOCKLND_CONN_NONE)); + /* socket type set on active connections - not set on passive */ + LASSERT(!active == !(conn->ksnc_type != SOCKLND_CONN_NONE)); - timeout = active ? *ksocknal_tunables.ksnd_timeout : - lnet_acceptor_timeout(); + timeout = active ? *ksocknal_tunables.ksnd_timeout : + lnet_acceptor_timeout(); - rc = libcfs_sock_read(sock, &hello->kshm_magic, sizeof (hello->kshm_magic), timeout); + rc = lnet_sock_read(sock, &hello->kshm_magic, + sizeof(hello->kshm_magic), timeout); if (rc != 0) { - CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n", - rc, HIPQUAD(conn->ksnc_ipaddr)); + CERROR("Error %d reading HELLO from %pI4h\n", + rc, &conn->ksnc_ipaddr); LASSERT (rc < 0); return rc; } @@ -2179,18 +1735,17 @@ ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn, hello->kshm_magic != le32_to_cpu (LNET_PROTO_TCP_MAGIC)) { /* Unexpected magic! 
*/ CERROR ("Bad magic(1) %#08x (%#08x expected) from " - "%u.%u.%u.%u\n", __cpu_to_le32 (hello->kshm_magic), - LNET_PROTO_TCP_MAGIC, - HIPQUAD(conn->ksnc_ipaddr)); + "%pI4h\n", __cpu_to_le32 (hello->kshm_magic), + LNET_PROTO_TCP_MAGIC, &conn->ksnc_ipaddr); return -EPROTO; } - rc = libcfs_sock_read(sock, &hello->kshm_version, - sizeof(hello->kshm_version), timeout); + rc = lnet_sock_read(sock, &hello->kshm_version, + sizeof(hello->kshm_version), timeout); if (rc != 0) { - CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n", - rc, HIPQUAD(conn->ksnc_ipaddr)); - LASSERT (rc < 0); + CERROR("Error %d reading HELLO from %pI4h\n", + rc, &conn->ksnc_ipaddr); + LASSERT(rc < 0); return rc; } @@ -2198,19 +1753,19 @@ ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn, if (proto == NULL) { if (!active) { /* unknown protocol from peer, tell peer my protocol */ - conn->ksnc_proto = &ksocknal_protocol_v2x; + conn->ksnc_proto = &ksocknal_protocol_v3x; #if SOCKNAL_VERSION_DEBUG - if (*ksocknal_tunables.ksnd_protocol != 2) + if (*ksocknal_tunables.ksnd_protocol == 2) + conn->ksnc_proto = &ksocknal_protocol_v2x; + else if (*ksocknal_tunables.ksnd_protocol == 1) conn->ksnc_proto = &ksocknal_protocol_v1x; #endif hello->kshm_nips = 0; ksocknal_send_hello(ni, conn, ni->ni_nid, hello); } - CERROR ("Unknown protocol version (%d.x expected)" - " from %u.%u.%u.%u\n", - conn->ksnc_proto->pro_version, - HIPQUAD(conn->ksnc_ipaddr)); + CERROR("Unknown protocol version (%d.x expected) from %pI4h\n", + conn->ksnc_proto->pro_version, &conn->ksnc_ipaddr); return -EPROTO; } @@ -2221,8 +1776,8 @@ ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn, /* receive the rest of hello message anyway */ rc = conn->ksnc_proto->pro_recv_hello(conn, hello, timeout); if (rc != 0) { - CERROR("Error %d reading or checking hello from from %u.%u.%u.%u\n", - rc, HIPQUAD(conn->ksnc_ipaddr)); + CERROR("Error %d reading or checking hello from from %pI4h\n", + rc, &conn->ksnc_ipaddr); LASSERT (rc < 0); return rc; } @@ -2231,7 +1786,7 @@ ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn, if (hello->kshm_src_nid == LNET_NID_ANY) { CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY" - "from %u.%u.%u.%u\n", HIPQUAD(conn->ksnc_ipaddr)); + "from %pI4h\n", &conn->ksnc_ipaddr); return -EPROTO; } @@ -2248,26 +1803,25 @@ ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn, if (!active) { *peerid = recv_id; - /* peer determines type */ - conn->ksnc_type = ksocknal_invert_type(hello->kshm_ctype); - if (conn->ksnc_type == SOCKLND_CONN_NONE) { - CERROR ("Unexpected type %d from %s ip %u.%u.%u.%u\n", - hello->kshm_ctype, libcfs_id2str(*peerid), - HIPQUAD(conn->ksnc_ipaddr)); - return -EPROTO; - } - - return 0; - } + /* peer determines type */ + conn->ksnc_type = ksocknal_invert_type(hello->kshm_ctype); + if (conn->ksnc_type == SOCKLND_CONN_NONE) { + CERROR("Unexpected type %d from %s ip %pI4h\n", + hello->kshm_ctype, libcfs_id2str(*peerid), + &conn->ksnc_ipaddr); + return -EPROTO; + } + return 0; + } if (peerid->pid != recv_id.pid || peerid->nid != recv_id.nid) { LCONSOLE_ERROR_MSG(0x130, "Connected successfully to %s on host" - " %u.%u.%u.%u, but they claimed they were " + " %pI4h, but they claimed they were " "%s; please check your Lustre " "configuration.\n", libcfs_id2str(*peerid), - HIPQUAD(conn->ksnc_ipaddr), + &conn->ksnc_ipaddr, libcfs_id2str(recv_id)); return -EPROTO; } @@ -2277,25 +1831,24 @@ ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn, return proto_match ? 
EALREADY : EPROTO; } - if (ksocknal_invert_type(hello->kshm_ctype) != conn->ksnc_type) { - CERROR ("Mismatched types: me %d, %s ip %u.%u.%u.%u %d\n", - conn->ksnc_type, libcfs_id2str(*peerid), - HIPQUAD(conn->ksnc_ipaddr), - hello->kshm_ctype); - return -EPROTO; - } - - return 0; + if (ksocknal_invert_type(hello->kshm_ctype) != conn->ksnc_type) { + CERROR("Mismatched types: me %d, %s ip %pI4h %d\n", + conn->ksnc_type, libcfs_id2str(*peerid), + &conn->ksnc_ipaddr, + hello->kshm_ctype); + return -EPROTO; + } + return 0; } -void +static int ksocknal_connect (ksock_route_t *route) { - CFS_LIST_HEAD (zombies); + struct list_head zombies = LIST_HEAD_INIT(zombies); ksock_peer_t *peer = route->ksnr_peer; int type; int wanted; - cfs_socket_t *sock; + struct socket *sock; cfs_time_t deadline; int retry_later = 0; int rc = 0; @@ -2303,7 +1856,7 @@ ksocknal_connect (ksock_route_t *route) deadline = cfs_time_add(cfs_time_current(), cfs_time_seconds(*ksocknal_tunables.ksnd_timeout)); - write_lock_bh (&ksocknal_data.ksnd_global_lock); + write_lock_bh(&ksocknal_data.ksnd_global_lock); LASSERT (route->ksnr_scheduled); LASSERT (!route->ksnr_connecting); @@ -2343,7 +1896,7 @@ ksocknal_connect (ksock_route_t *route) type = SOCKLND_CONN_BULK_OUT; } - write_unlock_bh (&ksocknal_data.ksnd_global_lock); + write_unlock_bh(&ksocknal_data.ksnd_global_lock); if (cfs_time_aftereq(cfs_time_current(), deadline)) { rc = -ETIMEDOUT; @@ -2374,7 +1927,7 @@ ksocknal_connect (ksock_route_t *route) CDEBUG(D_NET, "peer %s: conn race, retry later.\n", libcfs_nid2str(peer->ksnp_id.nid)); - write_lock_bh (&ksocknal_data.ksnd_global_lock); + write_lock_bh(&ksocknal_data.ksnd_global_lock); } route->ksnr_scheduled = 0; @@ -2383,14 +1936,27 @@ ksocknal_connect (ksock_route_t *route) if (retry_later) { /* re-queue for attention; this frees me up to handle * the peer's incoming connection request */ + + if (rc == EALREADY || + (rc == 0 && peer->ksnp_accepting > 0)) { + /* We want to introduce a delay before next + * attempt to connect if we lost conn race, + * but the race is resolved quickly usually, + * so min_reconnectms should be good heuristic */ + route->ksnr_retry_interval = + cfs_time_seconds(*ksocknal_tunables.ksnd_min_reconnectms)/1000; + route->ksnr_timeout = cfs_time_add(cfs_time_current(), + route->ksnr_retry_interval); + } + ksocknal_launch_connection_locked(route); } - write_unlock_bh (&ksocknal_data.ksnd_global_lock); - return; + write_unlock_bh(&ksocknal_data.ksnd_global_lock); + return retry_later; failed: - write_lock_bh (&ksocknal_data.ksnd_global_lock); + write_lock_bh(&ksocknal_data.ksnd_global_lock); route->ksnr_scheduled = 0; route->ksnr_connecting = 0; @@ -2408,165 +1974,315 @@ ksocknal_connect (ksock_route_t *route) route->ksnr_timeout = cfs_time_add(cfs_time_current(), route->ksnr_retry_interval); - if (!list_empty(&peer->ksnp_tx_queue) && + if (!list_empty(&peer->ksnp_tx_queue) && peer->ksnp_accepting == 0 && ksocknal_find_connecting_route_locked(peer) == NULL) { + ksock_conn_t *conn; + /* ksnp_tx_queue is queued on a conn on successful - * connection */ - LASSERT (list_empty (&peer->ksnp_conns)); + * connection for V1.x and V2.x */ + if (!list_empty(&peer->ksnp_conns)) { + conn = list_entry(peer->ksnp_conns.next, + ksock_conn_t, ksnc_list); + LASSERT (conn->ksnc_proto == &ksocknal_protocol_v3x); + } /* take all the blocked packets while I've got the lock and * complete below... 
*/ - list_add(&zombies, &peer->ksnp_tx_queue); - list_del_init(&peer->ksnp_tx_queue); + list_splice_init(&peer->ksnp_tx_queue, &zombies); } -#if 0 /* irrelevent with only eager routes */ - if (!route->ksnr_deleted) { - /* make this route least-favourite for re-selection */ - list_del(&route->ksnr_list); - list_add_tail(&route->ksnr_list, &peer->ksnp_routes); - } -#endif - write_unlock_bh (&ksocknal_data.ksnd_global_lock); + write_unlock_bh(&ksocknal_data.ksnd_global_lock); ksocknal_peer_failed(peer); ksocknal_txlist_done(peer->ksnp_ni, &zombies, 1); + return 0; } -static inline int -ksocknal_connd_connect_route_locked(void) +/* + * check whether we need to create more connds. + * It will try to create new thread if it's necessary, @timeout can + * be updated if failed to create, so caller wouldn't keep try while + * running out of resource. + */ +static int +ksocknal_connd_check_start(long sec, long *timeout) { - /* Only handle an outgoing connection request if there is someone left - * to handle incoming connections */ - return !list_empty(&ksocknal_data.ksnd_connd_routes) && - ((ksocknal_data.ksnd_connd_connecting + 1) < - *ksocknal_tunables.ksnd_nconnds); + char name[16]; + int rc; + int total = ksocknal_data.ksnd_connd_starting + + ksocknal_data.ksnd_connd_running; + + if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) { + /* still in initializing */ + return 0; + } + + if (total >= *ksocknal_tunables.ksnd_nconnds_max || + total > ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV) { + /* can't create more connd, or still have enough + * threads to handle more connecting */ + return 0; + } + + if (list_empty(&ksocknal_data.ksnd_connd_routes)) { + /* no pending connecting request */ + return 0; + } + + if (sec - ksocknal_data.ksnd_connd_failed_stamp <= 1) { + /* may run out of resource, retry later */ + *timeout = cfs_time_seconds(1); + return 0; + } + + if (ksocknal_data.ksnd_connd_starting > 0) { + /* serialize starting to avoid flood */ + return 0; + } + + ksocknal_data.ksnd_connd_starting_stamp = sec; + ksocknal_data.ksnd_connd_starting++; + spin_unlock_bh(&ksocknal_data.ksnd_connd_lock); + + /* NB: total is the next id */ + snprintf(name, sizeof(name), "socknal_cd%02d", total); + rc = ksocknal_thread_start(ksocknal_connd, NULL, name); + + spin_lock_bh(&ksocknal_data.ksnd_connd_lock); + if (rc == 0) + return 1; + + /* we tried ... */ + LASSERT(ksocknal_data.ksnd_connd_starting > 0); + ksocknal_data.ksnd_connd_starting--; + ksocknal_data.ksnd_connd_failed_stamp = cfs_time_current_sec(); + + return 1; } -static inline int -ksocknal_connd_ready(void) +/* + * check whether current thread can exit, it will return 1 if there are too + * many threads and no creating in past 120 seconds. + * Also, this function may update @timeout to make caller come back + * again to recheck these conditions. + */ +static int +ksocknal_connd_check_stop(long sec, long *timeout) { - int rc; + int val; + + if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) { + /* still in initializing */ + return 0; + } + + if (ksocknal_data.ksnd_connd_starting > 0) { + /* in progress of starting new thread */ + return 0; + } - spin_lock_bh (&ksocknal_data.ksnd_connd_lock); + if (ksocknal_data.ksnd_connd_running <= + *ksocknal_tunables.ksnd_nconnds) { /* can't shrink */ + return 0; + } - rc = ksocknal_data.ksnd_shuttingdown || - !list_empty(&ksocknal_data.ksnd_connd_connreqs) || - ksocknal_connd_connect_route_locked(); + /* created thread in past 120 seconds? 
*/ + val = (int)(ksocknal_data.ksnd_connd_starting_stamp + + SOCKNAL_CONND_TIMEOUT - sec); - spin_unlock_bh (&ksocknal_data.ksnd_connd_lock); + *timeout = (val > 0) ? cfs_time_seconds(val) : + cfs_time_seconds(SOCKNAL_CONND_TIMEOUT); + if (val > 0) + return 0; - return rc; + /* no creating in past 120 seconds */ + + return ksocknal_data.ksnd_connd_running > + ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV; +} + +/* Go through connd_routes queue looking for a route that we can process + * right now, @timeout_p can be updated if we need to come back later */ +static ksock_route_t * +ksocknal_connd_get_route_locked(signed long *timeout_p) +{ + ksock_route_t *route; + cfs_time_t now; + + now = cfs_time_current(); + + /* connd_routes can contain both pending and ordinary routes */ + list_for_each_entry(route, &ksocknal_data.ksnd_connd_routes, + ksnr_connd_list) { + + if (route->ksnr_retry_interval == 0 || + cfs_time_aftereq(now, route->ksnr_timeout)) + return route; + + if (*timeout_p == MAX_SCHEDULE_TIMEOUT || + (int)*timeout_p > (int)(route->ksnr_timeout - now)) + *timeout_p = (int)(route->ksnr_timeout - now); + } + + return NULL; } int ksocknal_connd (void *arg) { - long id = (long)arg; - char name[16]; - ksock_connreq_t *cr; - ksock_route_t *route; + spinlock_t *connd_lock = &ksocknal_data.ksnd_connd_lock; + ksock_connreq_t *cr; + wait_queue_t wait; + int nloops = 0; + int cons_retry = 0; - snprintf (name, sizeof (name), "socknal_cd%02ld", id); - cfs_daemonize (name); - cfs_block_allsigs (); + cfs_block_allsigs(); - spin_lock_bh (&ksocknal_data.ksnd_connd_lock); + init_waitqueue_entry(&wait, current); - while (!ksocknal_data.ksnd_shuttingdown) { + spin_lock_bh(connd_lock); - if (!list_empty(&ksocknal_data.ksnd_connd_connreqs)) { - /* Connection accepted by the listener */ - cr = list_entry(ksocknal_data.ksnd_connd_connreqs.next, - ksock_connreq_t, ksncr_list); + LASSERT(ksocknal_data.ksnd_connd_starting > 0); + ksocknal_data.ksnd_connd_starting--; + ksocknal_data.ksnd_connd_running++; - list_del(&cr->ksncr_list); - spin_unlock_bh (&ksocknal_data.ksnd_connd_lock); + while (!ksocknal_data.ksnd_shuttingdown) { + ksock_route_t *route = NULL; + long sec = cfs_time_current_sec(); + long timeout = MAX_SCHEDULE_TIMEOUT; + int dropped_lock = 0; - ksocknal_create_conn(cr->ksncr_ni, NULL, - cr->ksncr_sock, SOCKLND_CONN_NONE); - lnet_ni_decref(cr->ksncr_ni); - LIBCFS_FREE(cr, sizeof(*cr)); + if (ksocknal_connd_check_stop(sec, &timeout)) { + /* wakeup another one to check stop */ + wake_up(&ksocknal_data.ksnd_connd_waitq); + break; + } - spin_lock_bh (&ksocknal_data.ksnd_connd_lock); + if (ksocknal_connd_check_start(sec, &timeout)) { + /* created new thread */ + dropped_lock = 1; } - if (ksocknal_connd_connect_route_locked()) { - /* Connection request */ - route = list_entry (ksocknal_data.ksnd_connd_routes.next, - ksock_route_t, ksnr_connd_list); + if (!list_empty(&ksocknal_data.ksnd_connd_connreqs)) { + /* Connection accepted by the listener */ + cr = list_entry(ksocknal_data.ksnd_connd_connreqs. 
\ + next, ksock_connreq_t, ksncr_list); - list_del (&route->ksnr_connd_list); - ksocknal_data.ksnd_connd_connecting++; - spin_unlock_bh (&ksocknal_data.ksnd_connd_lock); + list_del(&cr->ksncr_list); + spin_unlock_bh(connd_lock); + dropped_lock = 1; - ksocknal_connect (route); - ksocknal_route_decref(route); + ksocknal_create_conn(cr->ksncr_ni, NULL, + cr->ksncr_sock, SOCKLND_CONN_NONE); + lnet_ni_decref(cr->ksncr_ni); + LIBCFS_FREE(cr, sizeof(*cr)); - spin_lock_bh (&ksocknal_data.ksnd_connd_lock); - ksocknal_data.ksnd_connd_connecting--; + spin_lock_bh(connd_lock); } - spin_unlock_bh (&ksocknal_data.ksnd_connd_lock); - - wait_event_interruptible_exclusive( - ksocknal_data.ksnd_connd_waitq, - ksocknal_connd_ready()); - - spin_lock_bh (&ksocknal_data.ksnd_connd_lock); - } + /* Only handle an outgoing connection request if there + * is a thread left to handle incoming connections and + * create new connd */ + if (ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV < + ksocknal_data.ksnd_connd_running) { + route = ksocknal_connd_get_route_locked(&timeout); + } + if (route != NULL) { + list_del(&route->ksnr_connd_list); + ksocknal_data.ksnd_connd_connecting++; + spin_unlock_bh(connd_lock); + dropped_lock = 1; + + if (ksocknal_connect(route)) { + /* consecutive retry */ + if (cons_retry++ > SOCKNAL_INSANITY_RECONN) { + CWARN("massive consecutive " + "re-connecting to %pI4h\n", + &route->ksnr_ipaddr); + cons_retry = 0; + } + } else { + cons_retry = 0; + } - spin_unlock_bh (&ksocknal_data.ksnd_connd_lock); + ksocknal_route_decref(route); - ksocknal_thread_fini (); - return (0); + spin_lock_bh(connd_lock); + ksocknal_data.ksnd_connd_connecting--; + } + + if (dropped_lock) { + if (++nloops < SOCKNAL_RESCHED) + continue; + spin_unlock_bh(connd_lock); + nloops = 0; + cond_resched(); + spin_lock_bh(connd_lock); + continue; + } + + /* Nothing to do for 'timeout' */ + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue_exclusive(&ksocknal_data.ksnd_connd_waitq, &wait); + spin_unlock_bh(connd_lock); + + nloops = 0; + schedule_timeout(timeout); + + set_current_state(TASK_RUNNING); + remove_wait_queue(&ksocknal_data.ksnd_connd_waitq, &wait); + spin_lock_bh(connd_lock); + } + ksocknal_data.ksnd_connd_running--; + spin_unlock_bh(connd_lock); + + ksocknal_thread_fini(); + return 0; } -ksock_conn_t * +static ksock_conn_t * ksocknal_find_timed_out_conn (ksock_peer_t *peer) { /* We're called with a shared lock on ksnd_global_lock */ ksock_conn_t *conn; - struct list_head *ctmp; + struct list_head *ctmp; - list_for_each (ctmp, &peer->ksnp_conns) { + list_for_each(ctmp, &peer->ksnp_conns) { int error; - conn = list_entry (ctmp, ksock_conn_t, ksnc_list); + conn = list_entry(ctmp, ksock_conn_t, ksnc_list); /* Don't need the {get,put}connsock dance to deref ksnc_sock */ LASSERT (!conn->ksnc_closing); - /* SOCK_ERROR will reset error code of socket in - * some platform (like Darwin8.x) */ - error = SOCK_ERROR(conn->ksnc_sock); + error = conn->ksnc_sock->sk->sk_err; if (error != 0) { ksocknal_conn_addref(conn); switch (error) { case ECONNRESET: - CDEBUG(D_NETERROR, "A connection with %s " - "(%u.%u.%u.%u:%d) was reset; " - "it may have rebooted.\n", - libcfs_id2str(peer->ksnp_id), - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); + CNETERR("A connection with %s " + "(%pI4h:%d) was reset; " + "it may have rebooted.\n", + libcfs_id2str(peer->ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port); break; case ETIMEDOUT: - CDEBUG(D_NETERROR, "A connection with %s " - "(%u.%u.%u.%u:%d) timed out; the " - "network or 
node may be down.\n", - libcfs_id2str(peer->ksnp_id), - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); + CNETERR("A connection with %s " + "(%pI4h:%d) timed out; the " + "network or node may be down.\n", + libcfs_id2str(peer->ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port); break; default: - CDEBUG(D_NETERROR, "An unexpected network error %d " - "occurred with %s " - "(%u.%u.%u.%u:%d\n", error, - libcfs_id2str(peer->ksnp_id), - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); + CNETERR("An unexpected network error %d " + "occurred with %s " + "(%pI4h:%d\n", error, + libcfs_id2str(peer->ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port); break; } @@ -2578,30 +2294,28 @@ ksocknal_find_timed_out_conn (ksock_peer_t *peer) conn->ksnc_rx_deadline)) { /* Timed out incomplete incoming message */ ksocknal_conn_addref(conn); - CDEBUG(D_NETERROR, "Timeout receiving from %s " - "(%u.%u.%u.%u:%d), state %d wanted %d left %d\n", - libcfs_id2str(peer->ksnp_id), - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port, - conn->ksnc_rx_state, - conn->ksnc_rx_nob_wanted, - conn->ksnc_rx_nob_left); + CNETERR("Timeout receiving from %s (%pI4h:%d), " + "state %d wanted %d left %d\n", + libcfs_id2str(peer->ksnp_id), + &conn->ksnc_ipaddr, + conn->ksnc_port, + conn->ksnc_rx_state, + conn->ksnc_rx_nob_wanted, + conn->ksnc_rx_nob_left); return (conn); } - if ((!list_empty(&conn->ksnc_tx_queue) || - SOCK_WMEM_QUEUED(conn->ksnc_sock) != 0) && + if ((!list_empty(&conn->ksnc_tx_queue) || + conn->ksnc_sock->sk->sk_wmem_queued != 0) && cfs_time_aftereq(cfs_time_current(), conn->ksnc_tx_deadline)) { /* Timed out messages queued for sending or * buffered in the socket's send buffer */ ksocknal_conn_addref(conn); - CDEBUG(D_NETERROR, "Timeout sending data to %s " - "(%u.%u.%u.%u:%d) the network or that " - "node may be down.\n", - libcfs_id2str(peer->ksnp_id), - HIPQUAD(conn->ksnc_ipaddr), - conn->ksnc_port); + CNETERR("Timeout sending data to %s (%pI4h:%d) " + "the network or that node may be down.\n", + libcfs_id2str(peer->ksnp_id), + &conn->ksnc_ipaddr, conn->ksnc_port); return (conn); } } @@ -2613,47 +2327,120 @@ static inline void ksocknal_flush_stale_txs(ksock_peer_t *peer) { ksock_tx_t *tx; - CFS_LIST_HEAD (stale_txs); - - write_lock_bh (&ksocknal_data.ksnd_global_lock); + struct list_head stale_txs = LIST_HEAD_INIT(stale_txs); + + write_lock_bh(&ksocknal_data.ksnd_global_lock); - while (!list_empty (&peer->ksnp_tx_queue)) { - tx = list_entry (peer->ksnp_tx_queue.next, - ksock_tx_t, tx_list); + while (!list_empty(&peer->ksnp_tx_queue)) { + tx = list_entry(peer->ksnp_tx_queue.next, + ksock_tx_t, tx_list); if (!cfs_time_aftereq(cfs_time_current(), tx->tx_deadline)) break; - - list_del (&tx->tx_list); - list_add_tail (&tx->tx_list, &stale_txs); + + list_del(&tx->tx_list); + list_add_tail(&tx->tx_list, &stale_txs); } - write_unlock_bh (&ksocknal_data.ksnd_global_lock); + write_unlock_bh(&ksocknal_data.ksnd_global_lock); ksocknal_txlist_done(peer->ksnp_ni, &stale_txs, 1); } -void +static int +ksocknal_send_keepalive_locked(ksock_peer_t *peer) +__must_hold(&ksocknal_data.ksnd_global_lock) +{ + ksock_sched_t *sched; + ksock_conn_t *conn; + ksock_tx_t *tx; + + /* last_alive will be updated by create_conn */ + if (list_empty(&peer->ksnp_conns)) + return 0; + + if (peer->ksnp_proto != &ksocknal_protocol_v3x) + return 0; + + if (*ksocknal_tunables.ksnd_keepalive <= 0 || + cfs_time_before(cfs_time_current(), + cfs_time_add(peer->ksnp_last_alive, + cfs_time_seconds(*ksocknal_tunables.ksnd_keepalive)))) + return 0; + + if 
(cfs_time_before(cfs_time_current(), + peer->ksnp_send_keepalive)) + return 0; + + /* retry 10 secs later, so we wouldn't put pressure + * on this peer if we failed to send keepalive this time */ + peer->ksnp_send_keepalive = cfs_time_shift(10); + + conn = ksocknal_find_conn_locked(peer, NULL, 1); + if (conn != NULL) { + sched = conn->ksnc_scheduler; + + spin_lock_bh(&sched->kss_lock); + if (!list_empty(&conn->ksnc_tx_queue)) { + spin_unlock_bh(&sched->kss_lock); + /* there is an queued ACK, don't need keepalive */ + return 0; + } + + spin_unlock_bh(&sched->kss_lock); + } + + read_unlock(&ksocknal_data.ksnd_global_lock); + + /* cookie = 1 is reserved for keepalive PING */ + tx = ksocknal_alloc_tx_noop(1, 1); + if (tx == NULL) { + read_lock(&ksocknal_data.ksnd_global_lock); + return -ENOMEM; + } + + if (ksocknal_launch_packet(peer->ksnp_ni, tx, peer->ksnp_id) == 0) { + read_lock(&ksocknal_data.ksnd_global_lock); + return 1; + } + + ksocknal_free_tx(tx); + read_lock(&ksocknal_data.ksnd_global_lock); + + return -EIO; +} + + +static void ksocknal_check_peer_timeouts (int idx) { - struct list_head *peers = &ksocknal_data.ksnd_peers[idx]; - struct list_head *ptmp; + struct list_head *peers = &ksocknal_data.ksnd_peers[idx]; ksock_peer_t *peer; ksock_conn_t *conn; + ksock_tx_t *tx; again: /* NB. We expect to have a look at all the peers and not find any * connections to time out, so we just use a shared lock while we * take a look... */ - read_lock (&ksocknal_data.ksnd_global_lock); + read_lock(&ksocknal_data.ksnd_global_lock); + + list_for_each_entry(peer, peers, ksnp_list) { + ksock_tx_t *tx_stale; + cfs_time_t deadline = 0; + int resid = 0; + int n = 0; + + if (ksocknal_send_keepalive_locked(peer) != 0) { + read_unlock(&ksocknal_data.ksnd_global_lock); + goto again; + } - list_for_each (ptmp, peers) { - peer = list_entry (ptmp, ksock_peer_t, ksnp_list); conn = ksocknal_find_timed_out_conn (peer); if (conn != NULL) { - read_unlock (&ksocknal_data.ksnd_global_lock); + read_unlock(&ksocknal_data.ksnd_global_lock); ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT); @@ -2666,124 +2453,144 @@ ksocknal_check_peer_timeouts (int idx) /* we can't process stale txs right here because we're * holding only shared lock */ - if (!list_empty (&peer->ksnp_tx_queue)) { - ksock_tx_t *tx = list_entry (peer->ksnp_tx_queue.next, - ksock_tx_t, tx_list); + if (!list_empty(&peer->ksnp_tx_queue)) { + ksock_tx_t *tx = + list_entry(peer->ksnp_tx_queue.next, + ksock_tx_t, tx_list); if (cfs_time_aftereq(cfs_time_current(), tx->tx_deadline)) { ksocknal_peer_addref(peer); - read_unlock (&ksocknal_data.ksnd_global_lock); - + read_unlock(&ksocknal_data.ksnd_global_lock); + ksocknal_flush_stale_txs(peer); ksocknal_peer_decref(peer); goto again; } } - } - /* print out warnings about stale ZC_REQs */ - list_for_each_entry(peer, peers, ksnp_list) { - ksock_tx_t *tx; - int n = 0; - - list_for_each_entry(tx, &peer->ksnp_zc_req_list, tx_zc_list) { + if (list_empty(&peer->ksnp_zc_req_list)) + continue; + + tx_stale = NULL; + spin_lock(&peer->ksnp_lock); + list_for_each_entry(tx, &peer->ksnp_zc_req_list, tx_zc_list) { if (!cfs_time_aftereq(cfs_time_current(), tx->tx_deadline)) break; + /* ignore the TX if connection is being closed */ + if (tx->tx_conn->ksnc_closing) + continue; n++; + if (tx_stale == NULL) + tx_stale = tx; } - if (n != 0) { - tx = list_entry (peer->ksnp_zc_req_list.next, - ksock_tx_t, tx_zc_list); - CWARN("Stale ZC_REQs for peer %s detected: %d; the " - "oldest (%p) timed out %ld secs ago\n", - 
libcfs_nid2str(peer->ksnp_id.nid), n, tx, - cfs_duration_sec(cfs_time_current() - - tx->tx_deadline)); - } + if (tx_stale == NULL) { + spin_unlock(&peer->ksnp_lock); + continue; + } + + deadline = tx_stale->tx_deadline; + resid = tx_stale->tx_resid; + conn = tx_stale->tx_conn; + ksocknal_conn_addref(conn); + + spin_unlock(&peer->ksnp_lock); + read_unlock(&ksocknal_data.ksnd_global_lock); + + CERROR("Total %d stale ZC_REQs for peer %s detected; the " + "oldest(%p) timed out %ld secs ago, " + "resid: %d, wmem: %d\n", + n, libcfs_nid2str(peer->ksnp_id.nid), tx_stale, + cfs_duration_sec(cfs_time_current() - deadline), + resid, conn->ksnc_sock->sk->sk_wmem_queued); + + ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT); + ksocknal_conn_decref(conn); + goto again; } - - read_unlock (&ksocknal_data.ksnd_global_lock); + + read_unlock(&ksocknal_data.ksnd_global_lock); } -int -ksocknal_reaper (void *arg) +int ksocknal_reaper(void *arg) { - cfs_waitlink_t wait; - ksock_conn_t *conn; - ksock_sched_t *sched; - struct list_head enomem_conns; + wait_queue_t wait; + ksock_conn_t *conn; + ksock_sched_t *sched; + struct list_head enomem_conns; int nenomem_conns; cfs_duration_t timeout; int i; int peer_index = 0; cfs_time_t deadline = cfs_time_current(); - cfs_daemonize ("socknal_reaper"); cfs_block_allsigs (); - CFS_INIT_LIST_HEAD(&enomem_conns); - cfs_waitlink_init (&wait); + INIT_LIST_HEAD(&enomem_conns); + init_waitqueue_entry(&wait, current); - spin_lock_bh (&ksocknal_data.ksnd_reaper_lock); + spin_lock_bh(&ksocknal_data.ksnd_reaper_lock); while (!ksocknal_data.ksnd_shuttingdown) { - if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) { - conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next, - ksock_conn_t, ksnc_list); - list_del (&conn->ksnc_list); + if (!list_empty(&ksocknal_data.ksnd_deathrow_conns)) { + conn = list_entry(ksocknal_data. \ + ksnd_deathrow_conns.next, + ksock_conn_t, ksnc_list); + list_del(&conn->ksnc_list); - spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock); + spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock); - ksocknal_terminate_conn (conn); - ksocknal_conn_decref(conn); + ksocknal_terminate_conn(conn); + ksocknal_conn_decref(conn); - spin_lock_bh (&ksocknal_data.ksnd_reaper_lock); + spin_lock_bh(&ksocknal_data.ksnd_reaper_lock); continue; } - if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) { - conn = list_entry (ksocknal_data.ksnd_zombie_conns.next, - ksock_conn_t, ksnc_list); - list_del (&conn->ksnc_list); + if (!list_empty(&ksocknal_data.ksnd_zombie_conns)) { + conn = list_entry(ksocknal_data.ksnd_zombie_conns.\ + next, ksock_conn_t, ksnc_list); + list_del(&conn->ksnc_list); - spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock); + spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock); - ksocknal_destroy_conn (conn); + ksocknal_destroy_conn(conn); - spin_lock_bh (&ksocknal_data.ksnd_reaper_lock); + spin_lock_bh(&ksocknal_data.ksnd_reaper_lock); continue; } - if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) { - list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns); - list_del_init(&ksocknal_data.ksnd_enomem_conns); + if (!list_empty(&ksocknal_data.ksnd_enomem_conns)) { + list_add(&enomem_conns, + &ksocknal_data.ksnd_enomem_conns); + list_del_init(&ksocknal_data.ksnd_enomem_conns); } - spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock); + spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock); /* reschedule all the connections that stalled with ENOMEM... 
 */
                nenomem_conns = 0;
-                while (!list_empty (&enomem_conns)) {
-                        conn = list_entry (enomem_conns.next,
-                                           ksock_conn_t, ksnc_tx_list);
-                        list_del (&conn->ksnc_tx_list);
+		while (!list_empty(&enomem_conns)) {
+			conn = list_entry(enomem_conns.next,
+					  ksock_conn_t, ksnc_tx_list);
+			list_del(&conn->ksnc_tx_list);
                         sched = conn->ksnc_scheduler;
-                        spin_lock_bh (&sched->kss_lock);
+			spin_lock_bh(&sched->kss_lock);
-                        LASSERT (conn->ksnc_tx_scheduled);
-                        conn->ksnc_tx_ready = 1;
-                        list_add_tail(&conn->ksnc_tx_list, &sched->kss_tx_conns);
-                        cfs_waitq_signal (&sched->kss_waitq);
+			LASSERT(conn->ksnc_tx_scheduled);
+			conn->ksnc_tx_ready = 1;
+			list_add_tail(&conn->ksnc_tx_list,
+				      &sched->kss_tx_conns);
+			wake_up(&sched->kss_waitq);
-                        spin_unlock_bh (&sched->kss_lock);
+			spin_unlock_bh(&sched->kss_lock);
                         nenomem_conns++;
                 }
@@ -2825,22 +2632,22 @@ ksocknal_reaper (void *arg)
                 ksocknal_data.ksnd_reaper_waketime =
                         cfs_time_add(cfs_time_current(), timeout);
-                set_current_state (TASK_INTERRUPTIBLE);
-                cfs_waitq_add (&ksocknal_data.ksnd_reaper_waitq, &wait);
+		set_current_state(TASK_INTERRUPTIBLE);
+		add_wait_queue(&ksocknal_data.ksnd_reaper_waitq, &wait);
-                if (!ksocknal_data.ksnd_shuttingdown &&
-                    list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
-                    list_empty (&ksocknal_data.ksnd_zombie_conns))
-                        cfs_waitq_timedwait (&wait, CFS_TASK_INTERRUPTIBLE, timeout);
+		if (!ksocknal_data.ksnd_shuttingdown &&
+		    list_empty(&ksocknal_data.ksnd_deathrow_conns) &&
+		    list_empty(&ksocknal_data.ksnd_zombie_conns))
+			schedule_timeout(timeout);
-                set_current_state (TASK_RUNNING);
-                cfs_waitq_del (&ksocknal_data.ksnd_reaper_waitq, &wait);
+		set_current_state(TASK_RUNNING);
+		remove_wait_queue(&ksocknal_data.ksnd_reaper_waitq, &wait);
-                spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
-        }
+		spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
+	}
-        spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
+	spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
-        ksocknal_thread_fini ();
-        return (0);
+	ksocknal_thread_fini();
+	return 0;
 }
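
The connd changes in this patch replace the fixed set of connection daemons with a pool that grows and shrinks on demand (ksocknal_connd_check_start() / ksocknal_connd_check_stop()). The sketch below is a minimal userspace illustration of that sizing heuristic only, not the kernel code: the struct fields are hypothetical stand-ins for the ksocknal_data counters, and CONND_RESV / CONND_TIMEOUT stand in for SOCKNAL_CONND_RESV and SOCKNAL_CONND_TIMEOUT (the 120-second window quoted in the patch comment); the minimum/maximum pool sizes correspond to the ksnd_nconnds / ksnd_nconnds_max tunables.

/*
 * Minimal sketch of the dynamic connd pool heuristic (illustrative only).
 * Fields mirror the counters the patch consults; the constants are
 * hypothetical stand-ins for SOCKNAL_CONND_RESV / SOCKNAL_CONND_TIMEOUT.
 */
#include <stdio.h>

struct connd_state {
	int  starting;       /* threads currently being spawned */
	int  running;        /* threads currently alive */
	int  connecting;     /* threads busy inside ksocknal_connect() */
	int  pending_routes; /* routes queued for connection attempts */
	long starting_stamp; /* time (secs) the last thread was spawned */
};

#define CONND_RESV    1    /* threads reserved for accepting connections */
#define CONND_TIMEOUT 120  /* secs with no growth before shrinking */

/* Grow only if there is queued work and every non-reserved thread is busy. */
static int connd_should_start(const struct connd_state *s, int max)
{
	int total = s->starting + s->running;

	if (total >= max || total > s->connecting + CONND_RESV)
		return 0;	/* at the cap, or spare threads still exist */
	if (s->pending_routes == 0 || s->starting > 0)
		return 0;	/* nothing queued, or already spawning one */
	return 1;
}

/* Shrink only above the minimum and after a quiet CONND_TIMEOUT window. */
static int connd_should_stop(const struct connd_state *s, int min, long now)
{
	if (s->starting > 0 || s->running <= min)
		return 0;
	if (now - s->starting_stamp < CONND_TIMEOUT)
		return 0;	/* a thread was created recently */
	return s->running > s->connecting + CONND_RESV;
}

int main(void)
{
	struct connd_state s = { 0, 4, 4, 2, 0 };

	printf("start another connd? %d\n", connd_should_start(&s, 8));
	printf("let this connd exit? %d\n", connd_should_stop(&s, 2, 200));
	return 0;
}

The property this preserves, as in the patch, is that a new daemon is spawned only when every non-reserved thread is already busy connecting and work is queued, while a daemon exits only when the pool has stayed above its minimum with no growth for the whole timeout window, keeping at least one thread free to handle incoming connection requests.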