/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright 2008 Sun Microsystems, Inc. All rights reserved * * Author: Zach Brown * Author: Peter J. Braam * Author: Phil Schwan * Author: Eric Barton * * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ * * Portals is free software; you can redistribute it and/or * modify it under the terms of version 2 of the GNU General Public * License as published by the Free Software Foundation. * * Portals is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Portals; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #include "socklnd.h" ksock_tx_t * ksocknal_alloc_tx (int size) { ksock_tx_t *tx = NULL; if (size == KSOCK_NOOP_TX_SIZE) { /* searching for a noop tx in free list */ spin_lock(&ksocknal_data.ksnd_tx_lock); if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) { tx = list_entry(ksocknal_data.ksnd_idle_noop_txs.next, ksock_tx_t, tx_list); LASSERT(tx->tx_desc_size == size); list_del(&tx->tx_list); } spin_unlock(&ksocknal_data.ksnd_tx_lock); } if (tx == NULL) LIBCFS_ALLOC(tx, size); if (tx == NULL) return NULL; atomic_set(&tx->tx_refcount, 1); tx->tx_desc_size = size; atomic_inc(&ksocknal_data.ksnd_nactive_txs); return tx; } void ksocknal_free_tx (ksock_tx_t *tx) { atomic_dec(&ksocknal_data.ksnd_nactive_txs); if (tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) { /* it's a noop tx */ spin_lock(&ksocknal_data.ksnd_tx_lock); list_add(&tx->tx_list, &ksocknal_data.ksnd_idle_noop_txs); spin_unlock(&ksocknal_data.ksnd_tx_lock); } else { LIBCFS_FREE(tx, tx->tx_desc_size); } } void ksocknal_init_msg(ksock_msg_t *msg, int type) { msg->ksm_type = type; msg->ksm_csum = 0; msg->ksm_zc_req_cookie = 0; msg->ksm_zc_ack_cookie = 0; } int ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx) { struct iovec *iov = tx->tx_iov; int nob; int rc; LASSERT (tx->tx_niov > 0); /* Never touch tx->tx_iov inside ksocknal_lib_send_iov() */ rc = ksocknal_lib_send_iov(conn, tx); if (rc <= 0) /* sent nothing? */ return (rc); nob = rc; LASSERT (nob <= tx->tx_resid); tx->tx_resid -= nob; /* "consume" iov */ do { LASSERT (tx->tx_niov > 0); if (nob < (int) iov->iov_len) { iov->iov_base = (void *)((char *)iov->iov_base + nob); iov->iov_len -= nob; return (rc); } nob -= iov->iov_len; tx->tx_iov = ++iov; tx->tx_niov--; } while (nob != 0); return (rc); } int ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx) { lnet_kiov_t *kiov = tx->tx_kiov; int nob; int rc; LASSERT (tx->tx_niov == 0); LASSERT (tx->tx_nkiov > 0); /* Never touch tx->tx_kiov inside ksocknal_lib_send_kiov() */ rc = ksocknal_lib_send_kiov(conn, tx); if (rc <= 0) /* sent nothing? 
*/ return (rc); nob = rc; LASSERT (nob <= tx->tx_resid); tx->tx_resid -= nob; /* "consume" kiov */ do { LASSERT(tx->tx_nkiov > 0); if (nob < (int)kiov->kiov_len) { kiov->kiov_offset += nob; kiov->kiov_len -= nob; return rc; } nob -= (int)kiov->kiov_len; tx->tx_kiov = ++kiov; tx->tx_nkiov--; } while (nob != 0); return (rc); } int ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx) { int rc; int bufnob; if (ksocknal_data.ksnd_stall_tx != 0) { cfs_pause(cfs_time_seconds(ksocknal_data.ksnd_stall_tx)); } LASSERT (tx->tx_resid != 0); rc = ksocknal_connsock_addref(conn); if (rc != 0) { LASSERT (conn->ksnc_closing); return (-ESHUTDOWN); } do { if (ksocknal_data.ksnd_enomem_tx > 0) { /* testing... */ ksocknal_data.ksnd_enomem_tx--; rc = -EAGAIN; } else if (tx->tx_niov != 0) { rc = ksocknal_send_iov (conn, tx); } else { rc = ksocknal_send_kiov (conn, tx); } bufnob = SOCK_WMEM_QUEUED(conn->ksnc_sock); if (rc > 0) /* sent something? */ conn->ksnc_tx_bufnob += rc; /* account it */ if (bufnob < conn->ksnc_tx_bufnob) { /* allocated send buffer bytes < computed; infer * something got ACKed */ conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout); conn->ksnc_peer->ksnp_last_alive = cfs_time_current(); conn->ksnc_tx_bufnob = bufnob; mb(); } if (rc <= 0) { /* Didn't write anything? */ ksock_sched_t *sched; if (rc == 0) /* some stacks return 0 instead of -EAGAIN */ rc = -EAGAIN; if (rc != -EAGAIN) break; /* Check if EAGAIN is due to memory pressure */ sched = conn->ksnc_scheduler; spin_lock_bh (&sched->kss_lock); if (!SOCK_TEST_NOSPACE(conn->ksnc_sock) && !conn->ksnc_tx_ready) { /* SOCK_NOSPACE is set when the socket fills * and cleared in the write_space callback * (which also sets ksnc_tx_ready). If * SOCK_NOSPACE and ksnc_tx_ready are BOTH * zero, I didn't fill the socket and * write_space won't reschedule me, so I * return -ENOMEM to get my caller to retry * after a timeout */ rc = -ENOMEM; } spin_unlock_bh (&sched->kss_lock); break; } /* socket's wmem_queued now includes 'rc' bytes */ atomic_sub (rc, &conn->ksnc_tx_nob); rc = 0; } while (tx->tx_resid != 0); ksocknal_connsock_decref(conn); return (rc); } int ksocknal_recv_iov (ksock_conn_t *conn) { struct iovec *iov = conn->ksnc_rx_iov; int nob; int rc; LASSERT (conn->ksnc_rx_niov > 0); /* Never touch conn->ksnc_rx_iov or change connection * status inside ksocknal_lib_recv_iov */ rc = ksocknal_lib_recv_iov(conn); if (rc <= 0) return (rc); /* received something... */ nob = rc; conn->ksnc_peer->ksnp_last_alive = cfs_time_current(); conn->ksnc_rx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout); mb(); /* order with setting rx_started */ conn->ksnc_rx_started = 1; conn->ksnc_rx_nob_wanted -= nob; conn->ksnc_rx_nob_left -= nob; do { LASSERT (conn->ksnc_rx_niov > 0); if (nob < (int)iov->iov_len) { iov->iov_len -= nob; iov->iov_base = (void *)((char *)iov->iov_base + nob); return (-EAGAIN); } nob -= iov->iov_len; conn->ksnc_rx_iov = ++iov; conn->ksnc_rx_niov--; } while (nob != 0); return (rc); } int ksocknal_recv_kiov (ksock_conn_t *conn) { lnet_kiov_t *kiov = conn->ksnc_rx_kiov; int nob; int rc; LASSERT (conn->ksnc_rx_nkiov > 0); /* Never touch conn->ksnc_rx_kiov or change connection * status inside ksocknal_lib_recv_iov */ rc = ksocknal_lib_recv_kiov(conn); if (rc <= 0) return (rc); /* received something... 
*/ nob = rc; conn->ksnc_peer->ksnp_last_alive = cfs_time_current(); conn->ksnc_rx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout); mb(); /* order with setting rx_started */ conn->ksnc_rx_started = 1; conn->ksnc_rx_nob_wanted -= nob; conn->ksnc_rx_nob_left -= nob; do { LASSERT (conn->ksnc_rx_nkiov > 0); if (nob < (int) kiov->kiov_len) { kiov->kiov_offset += nob; kiov->kiov_len -= nob; return -EAGAIN; } nob -= kiov->kiov_len; conn->ksnc_rx_kiov = ++kiov; conn->ksnc_rx_nkiov--; } while (nob != 0); return 1; } int ksocknal_receive (ksock_conn_t *conn) { /* Return 1 on success, 0 on EOF, < 0 on error. * Caller checks ksnc_rx_nob_wanted to determine * progress/completion. */ int rc; ENTRY; if (ksocknal_data.ksnd_stall_rx != 0) { cfs_pause(cfs_time_seconds (ksocknal_data.ksnd_stall_rx)); } rc = ksocknal_connsock_addref(conn); if (rc != 0) { LASSERT (conn->ksnc_closing); return (-ESHUTDOWN); } for (;;) { if (conn->ksnc_rx_niov != 0) rc = ksocknal_recv_iov (conn); else rc = ksocknal_recv_kiov (conn); if (rc <= 0) { /* error/EOF or partial receive */ if (rc == -EAGAIN) { rc = 1; } else if (rc == 0 && conn->ksnc_rx_started) { /* EOF in the middle of a message */ rc = -EPROTO; } break; } /* Completed a fragment */ if (conn->ksnc_rx_nob_wanted == 0) { rc = 1; break; } } ksocknal_connsock_decref(conn); RETURN (rc); } void ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx) { lnet_msg_t *lnetmsg = tx->tx_lnetmsg; int rc = (tx->tx_resid == 0) ? 0 : -EIO; ENTRY; LASSERT(ni != NULL || tx->tx_conn != NULL); if (tx->tx_conn != NULL) ksocknal_conn_decref(tx->tx_conn); if (ni == NULL && tx->tx_conn != NULL) ni = tx->tx_conn->ksnc_peer->ksnp_ni; ksocknal_free_tx (tx); if (lnetmsg != NULL) /* KSOCK_MSG_NOOP go without lnetmsg */ lnet_finalize (ni, lnetmsg, rc); EXIT; } void ksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int error) { ksock_tx_t *tx; while (!list_empty (txlist)) { tx = list_entry (txlist->next, ksock_tx_t, tx_list); if (error && tx->tx_lnetmsg != NULL) { CDEBUG (D_NETERROR, "Deleting packet type %d len %d %s->%s\n", le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type), le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length), libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)), libcfs_nid2str(le64_to_cpu (tx->tx_lnetmsg->msg_hdr.dest_nid))); } else if (error) { CDEBUG (D_NETERROR, "Deleting noop packet\n"); } list_del (&tx->tx_list); LASSERT (atomic_read(&tx->tx_refcount) == 1); ksocknal_tx_done (ni, tx); } } static void ksocknal_check_zc_req(ksock_tx_t *tx) { ksock_conn_t *conn = tx->tx_conn; ksock_peer_t *peer = conn->ksnc_peer; lnet_kiov_t *kiov = tx->tx_kiov; int nkiov = tx->tx_nkiov; /* Set tx_msg.ksm_zc_req_cookie to a unique non-zero cookie and add tx * to ksnp_zc_req_list if some fragment of this message should be sent * zero-copy. Our peer will send an ACK containing this cookie when * she has received this message to tell us we can signal completion. * tx_msg.ksm_zc_req_cookie remains non-zero while tx is on * ksnp_zc_req_list. */ if (conn->ksnc_proto != &ksocknal_protocol_v2x || !conn->ksnc_zc_capable) return; while (nkiov > 0) { if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag) break; --nkiov; ++kiov; } if (nkiov == 0) return; /* assign cookie and queue tx to pending list, it will be released when * a matching ack is received. 
See ksocknal_handle_zc_ack() */ ksocknal_tx_addref(tx); spin_lock(&peer->ksnp_lock); /* ZC_REQ is going to be pinned to the peer */ tx->tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout); LASSERT (tx->tx_msg.ksm_zc_req_cookie == 0); tx->tx_msg.ksm_zc_req_cookie = peer->ksnp_zc_next_cookie++; list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list); spin_unlock(&peer->ksnp_lock); } static void ksocknal_unzc_req(ksock_tx_t *tx) { ksock_peer_t *peer = tx->tx_conn->ksnc_peer; spin_lock(&peer->ksnp_lock); if (tx->tx_msg.ksm_zc_req_cookie == 0) { /* Not waiting for an ACK */ spin_unlock(&peer->ksnp_lock); return; } tx->tx_msg.ksm_zc_req_cookie = 0; list_del(&tx->tx_zc_list); spin_unlock(&peer->ksnp_lock); ksocknal_tx_decref(tx); } int ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx) { int rc; if (!tx->tx_checked_zc) { tx->tx_checked_zc = 1; ksocknal_check_zc_req(tx); } rc = ksocknal_transmit (conn, tx); CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc); if (tx->tx_resid == 0) { /* Sent everything OK */ LASSERT (rc == 0); return (0); } if (rc == -EAGAIN) return (rc); if (rc == -ENOMEM) { static int counter; counter++; /* exponential backoff warnings */ if ((counter & (-counter)) == counter) CWARN("%u ENOMEM tx %p (%u allocated)\n", counter, conn, atomic_read(&libcfs_kmemory)); /* Queue on ksnd_enomem_conns for retry after a timeout */ spin_lock_bh (&ksocknal_data.ksnd_reaper_lock); /* enomem list takes over scheduler's ref... */ LASSERT (conn->ksnc_tx_scheduled); list_add_tail(&conn->ksnc_tx_list, &ksocknal_data.ksnd_enomem_conns); if (!cfs_time_aftereq(cfs_time_add(cfs_time_current(), SOCKNAL_ENOMEM_RETRY), ksocknal_data.ksnd_reaper_waketime)) cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq); spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock); return (rc); } /* Actual error */ LASSERT (rc < 0); if (!conn->ksnc_closing) { switch (rc) { case -ECONNRESET: LCONSOLE_WARN("Host %u.%u.%u.%u reset our connection " "while we were sending data; it may have " "rebooted.\n", HIPQUAD(conn->ksnc_ipaddr)); break; default: LCONSOLE_WARN("There was an unexpected network error " "while writing to %u.%u.%u.%u: %d.\n", HIPQUAD(conn->ksnc_ipaddr), rc); break; } CDEBUG(D_NET, "[%p] Error %d on write to %s" " ip %d.%d.%d.%d:%d\n", conn, rc, libcfs_id2str(conn->ksnc_peer->ksnp_id), HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); } ksocknal_unzc_req(tx); /* it's not an error if conn is being closed */ ksocknal_close_conn_and_siblings (conn, (conn->ksnc_closing) ? 
0 : rc); return (rc); } void ksocknal_launch_connection_locked (ksock_route_t *route) { /* called holding write lock on ksnd_global_lock */ LASSERT (!route->ksnr_scheduled); LASSERT (!route->ksnr_connecting); LASSERT ((ksocknal_route_mask() & ~route->ksnr_connected) != 0); route->ksnr_scheduled = 1; /* scheduling conn for connd */ ksocknal_route_addref(route); /* extra ref for connd */ spin_lock_bh (&ksocknal_data.ksnd_connd_lock); list_add_tail (&route->ksnr_connd_list, &ksocknal_data.ksnd_connd_routes); cfs_waitq_signal (&ksocknal_data.ksnd_connd_waitq); spin_unlock_bh (&ksocknal_data.ksnd_connd_lock); } ksock_conn_t * ksocknal_find_conn_locked (int payload_nob, ksock_peer_t *peer) { struct list_head *tmp; ksock_conn_t *typed = NULL; int tnob = 0; ksock_conn_t *fallback = NULL; int fnob = 0; ksock_conn_t *conn; list_for_each (tmp, &peer->ksnp_conns) { ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list); int hdr_nob = 0; #if SOCKNAL_ROUND_ROBIN const int nob = 0; #else int nob = atomic_read(&c->ksnc_tx_nob) + SOCK_WMEM_QUEUED(c->ksnc_sock); #endif LASSERT (!c->ksnc_closing); LASSERT (c->ksnc_proto != NULL); if (fallback == NULL || nob < fnob) { fallback = c; fnob = nob; } if (!*ksocknal_tunables.ksnd_typed_conns) continue; if (payload_nob == 0) { /* noop packet */ hdr_nob = offsetof(ksock_msg_t, ksm_u); } else { /* lnet packet */ hdr_nob = (c->ksnc_proto == &ksocknal_protocol_v2x)? sizeof(ksock_msg_t) : sizeof(lnet_hdr_t); } switch (c->ksnc_type) { default: CERROR("ksnc_type bad: %u\n", c->ksnc_type); LBUG(); case SOCKLND_CONN_ANY: break; case SOCKLND_CONN_BULK_IN: continue; case SOCKLND_CONN_BULK_OUT: if ((hdr_nob + payload_nob) < *ksocknal_tunables.ksnd_min_bulk) continue; break; case SOCKLND_CONN_CONTROL: if ((hdr_nob + payload_nob) >= *ksocknal_tunables.ksnd_min_bulk) continue; break; } if (typed == NULL || nob < tnob) { typed = c; tnob = nob; } } /* prefer the typed selection */ conn = (typed != NULL) ? 
typed : fallback; #if SOCKNAL_ROUND_ROBIN if (conn != NULL) { /* round-robin all else being equal */ list_del (&conn->ksnc_list); list_add_tail (&conn->ksnc_list, &peer->ksnp_conns); } #endif return conn; } void ksocknal_next_mono_tx(ksock_conn_t *conn) { ksock_tx_t *tx = conn->ksnc_tx_mono; /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */ LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x); LASSERT(!list_empty(&conn->ksnc_tx_queue)); LASSERT(tx != NULL); if (tx->tx_list.next == &conn->ksnc_tx_queue) { /* no more packets queued */ conn->ksnc_tx_mono = NULL; } else { conn->ksnc_tx_mono = list_entry(tx->tx_list.next, ksock_tx_t, tx_list); LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type == tx->tx_msg.ksm_type); } } int ksocknal_piggyback_zcack(ksock_conn_t *conn, __u64 cookie) { ksock_tx_t *tx = conn->ksnc_tx_mono; /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */ if (tx == NULL) return 0; if (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP) { /* tx is noop zc-ack, can't piggyback zc-ack cookie */ return 0; } LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_LNET); LASSERT(tx->tx_msg.ksm_zc_ack_cookie == 0); /* piggyback the zc-ack cookie */ tx->tx_msg.ksm_zc_ack_cookie = cookie; ksocknal_next_mono_tx(conn); return 1; } void ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn) { ksock_sched_t *sched = conn->ksnc_scheduler; ksock_msg_t *msg = &tx->tx_msg; ksock_tx_t *ztx; int bufnob = 0; /* called holding global lock (read or irq-write) and caller may * not have dropped this lock between finding conn and calling me, * so we don't need the {get,put}connsock dance to deref * ksnc_sock... */ LASSERT(!conn->ksnc_closing); CDEBUG (D_NET, "Sending to %s ip %d.%d.%d.%d:%d\n", libcfs_id2str(conn->ksnc_peer->ksnp_id), HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); tx->tx_checked_zc = 0; conn->ksnc_proto->pro_pack(tx); /* Ensure the frags we've been given EXACTLY match the number of * bytes we want to send. Many TCP/IP stacks disregard any total * size parameters passed to them and just look at the frags. * * We always expect at least 1 mapped fragment containing the * complete ksocknal message header. */ LASSERT (lnet_iov_nob (tx->tx_niov, tx->tx_iov) + lnet_kiov_nob(tx->tx_nkiov, tx->tx_kiov) == (unsigned int)tx->tx_nob); LASSERT (tx->tx_niov >= 1); LASSERT (tx->tx_resid == tx->tx_nob); CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n", tx, (tx->tx_lnetmsg != NULL)? tx->tx_lnetmsg->msg_hdr.type: KSOCK_MSG_NOOP, tx->tx_nob, tx->tx_niov, tx->tx_nkiov); atomic_add (tx->tx_nob, &conn->ksnc_tx_nob); tx->tx_conn = conn; ksocknal_conn_addref(conn); /* +1 ref for tx */ /* * FIXME: SOCK_WMEM_QUEUED and SOCK_ERROR could block in __DARWIN8__ * but they're used inside spinlocks a lot. 
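 * Here SOCK_WMEM_QUEUED is sampled once, before kss_lock is taken below.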
*/ bufnob = SOCK_WMEM_QUEUED(conn->ksnc_sock); spin_lock_bh (&sched->kss_lock); if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) { /* First packet starts the timeout */ conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout); conn->ksnc_tx_bufnob = 0; mb(); /* order with adding to tx_queue */ } ztx = NULL; if (msg->ksm_type == KSOCK_MSG_NOOP) { /* The packet is noop ZC ACK, try to piggyback the ack_cookie * on a normal packet so I don't need to send it */ LASSERT(msg->ksm_zc_req_cookie == 0); LASSERT(msg->ksm_zc_ack_cookie != 0); if (conn->ksnc_tx_mono != NULL) { if (ksocknal_piggyback_zcack(conn, msg->ksm_zc_ack_cookie)) { /* zc-ack cookie is piggybacked */ atomic_sub (tx->tx_nob, &conn->ksnc_tx_nob); ztx = tx; /* Put to freelist later */ } else { /* no packet can piggyback zc-ack cookie */ list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); } } else { /* It's the first mono-packet */ conn->ksnc_tx_mono = tx; list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); } } else { /* It's a normal packet - can it piggback a noop zc-ack that * has been queued already? */ LASSERT(msg->ksm_zc_ack_cookie == 0); if (conn->ksnc_proto == &ksocknal_protocol_v2x && /* V2.x packet */ conn->ksnc_tx_mono != NULL) { if (conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_NOOP) { /* There is a noop zc-ack can be piggybacked */ ztx = conn->ksnc_tx_mono; msg->ksm_zc_ack_cookie = ztx->tx_msg.ksm_zc_ack_cookie; ksocknal_next_mono_tx(conn); /* use tx to replace the noop zc-ack packet, ztx will * be put to freelist later */ list_add(&tx->tx_list, &ztx->tx_list); list_del(&ztx->tx_list); atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob); } else { /* no noop zc-ack packet, just enqueue it */ LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_LNET); list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); } } else if (conn->ksnc_proto == &ksocknal_protocol_v2x) { /* it's the first mono-packet, enqueue it */ conn->ksnc_tx_mono = tx; list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); } else { /* V1.x packet, just enqueue it */ list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue); } } if (ztx != NULL) list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs); if (conn->ksnc_tx_ready && /* able to send */ !conn->ksnc_tx_scheduled) { /* not scheduled to send */ /* +1 ref for scheduler */ ksocknal_conn_addref(conn); list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns); conn->ksnc_tx_scheduled = 1; cfs_waitq_signal (&sched->kss_waitq); } spin_unlock_bh (&sched->kss_lock); } ksock_route_t * ksocknal_find_connectable_route_locked (ksock_peer_t *peer) { struct list_head *tmp; ksock_route_t *route; list_for_each (tmp, &peer->ksnp_routes) { route = list_entry (tmp, ksock_route_t, ksnr_list); LASSERT (!route->ksnr_connecting || route->ksnr_scheduled); if (route->ksnr_scheduled) /* connections being established */ continue; /* all route types connected ? */ if ((ksocknal_route_mask() & ~route->ksnr_connected) == 0) continue; /* too soon to retry this guy? 
*/ if (!(route->ksnr_retry_interval == 0 || /* first attempt */ cfs_time_aftereq (cfs_time_current(), route->ksnr_timeout))) continue; return (route); } return (NULL); } ksock_route_t * ksocknal_find_connecting_route_locked (ksock_peer_t *peer) { struct list_head *tmp; ksock_route_t *route; list_for_each (tmp, &peer->ksnp_routes) { route = list_entry (tmp, ksock_route_t, ksnr_list); LASSERT (!route->ksnr_connecting || route->ksnr_scheduled); if (route->ksnr_scheduled) return (route); } return (NULL); } int ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id) { ksock_peer_t *peer; ksock_conn_t *conn; ksock_route_t *route; rwlock_t *g_lock; int retry; int rc; LASSERT (tx->tx_conn == NULL); LASSERT (tx->tx_lnetmsg != NULL); g_lock = &ksocknal_data.ksnd_global_lock; for (retry = 0;; retry = 1) { #if !SOCKNAL_ROUND_ROBIN read_lock (g_lock); peer = ksocknal_find_peer_locked(ni, id); if (peer != NULL) { if (ksocknal_find_connectable_route_locked(peer) == NULL) { conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer); if (conn != NULL) { /* I've got no routes that need to be * connecting and I do have an actual * connection... */ ksocknal_queue_tx_locked (tx, conn); read_unlock (g_lock); return (0); } } } /* I'll need a write lock... */ read_unlock (g_lock); #endif write_lock_bh (g_lock); peer = ksocknal_find_peer_locked(ni, id); if (peer != NULL) break; write_unlock_bh (g_lock); if ((id.pid & LNET_PID_USERFLAG) != 0) { CERROR("Refusing to create a connection to " "userspace process %s\n", libcfs_id2str(id)); return -EHOSTUNREACH; } if (retry) { CERROR("Can't find peer %s\n", libcfs_id2str(id)); return -EHOSTUNREACH; } rc = ksocknal_add_peer(ni, id, LNET_NIDADDR(id.nid), lnet_acceptor_port()); if (rc != 0) { CERROR("Can't add peer %s: %d\n", libcfs_id2str(id), rc); return rc; } } for (;;) { /* launch any/all connections that need it */ route = ksocknal_find_connectable_route_locked (peer); if (route == NULL) break; ksocknal_launch_connection_locked (route); } conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer); if (conn != NULL) { /* Connection exists; queue message on it */ ksocknal_queue_tx_locked (tx, conn); write_unlock_bh (g_lock); return (0); } if (peer->ksnp_accepting > 0 || ksocknal_find_connecting_route_locked (peer) != NULL) { /* the message is going to be pinned to the peer */ tx->tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout); /* Queue the message until a connection is established */ list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue); write_unlock_bh (g_lock); return 0; } write_unlock_bh (g_lock); /* NB Routes may be ignored if connections to them failed recently */ CDEBUG(D_NETERROR, "No usable routes to %s\n", libcfs_id2str(id)); return (-EHOSTUNREACH); } int ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) { int type = lntmsg->msg_type; lnet_process_id_t target = lntmsg->msg_target; unsigned int payload_niov = lntmsg->msg_niov; struct iovec *payload_iov = lntmsg->msg_iov; lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; ksock_tx_t *tx; int desc_size; int rc; /* NB 'private' is different depending on what we're sending. * Just ignore it... 
*/ CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n", payload_nob, payload_niov, libcfs_id2str(target)); LASSERT (payload_nob == 0 || payload_niov > 0); LASSERT (payload_niov <= LNET_MAX_IOV); /* payload is either all vaddrs or all pages */ LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); LASSERT (!in_interrupt ()); if (payload_iov != NULL) desc_size = offsetof(ksock_tx_t, tx_frags.virt.iov[1 + payload_niov]); else desc_size = offsetof(ksock_tx_t, tx_frags.paged.kiov[payload_niov]); tx = ksocknal_alloc_tx(desc_size); if (tx == NULL) { CERROR("Can't allocate tx desc type %d size %d\n", type, desc_size); return (-ENOMEM); } tx->tx_conn = NULL; /* set when assigned a conn */ tx->tx_lnetmsg = lntmsg; if (payload_iov != NULL) { tx->tx_kiov = NULL; tx->tx_nkiov = 0; tx->tx_iov = tx->tx_frags.virt.iov; tx->tx_niov = 1 + lnet_extract_iov(payload_niov, &tx->tx_iov[1], payload_niov, payload_iov, payload_offset, payload_nob); } else { tx->tx_niov = 1; tx->tx_iov = &tx->tx_frags.paged.iov; tx->tx_kiov = tx->tx_frags.paged.kiov; tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov, payload_niov, payload_kiov, payload_offset, payload_nob); } ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_LNET); /* The first fragment will be set later in pro_pack */ rc = ksocknal_launch_packet(ni, tx, target); if (rc == 0) return (0); ksocknal_free_tx(tx); return (-EIO); } int ksocknal_thread_start (int (*fn)(void *arg), void *arg) { long pid = cfs_kernel_thread (fn, arg, 0); if (pid < 0) return ((int)pid); write_lock_bh (&ksocknal_data.ksnd_global_lock); ksocknal_data.ksnd_nthreads++; write_unlock_bh (&ksocknal_data.ksnd_global_lock); return (0); } void ksocknal_thread_fini (void) { write_lock_bh (&ksocknal_data.ksnd_global_lock); ksocknal_data.ksnd_nthreads--; write_unlock_bh (&ksocknal_data.ksnd_global_lock); } int ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip) { static char ksocknal_slop_buffer[4096]; int nob; unsigned int niov; int skipped; LASSERT(conn->ksnc_proto != NULL); if ((*ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0) { /* Remind the socket to ack eagerly... */ ksocknal_lib_eager_ack(conn); } if (nob_to_skip == 0) { /* right at next packet boundary now */ conn->ksnc_rx_started = 0; mb (); /* racing with timeout thread */ switch (conn->ksnc_proto->pro_version) { case KSOCK_PROTO_V2: conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER; conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg; conn->ksnc_rx_nob_wanted = offsetof(ksock_msg_t, ksm_u); conn->ksnc_rx_nob_left = offsetof(ksock_msg_t, ksm_u); conn->ksnc_rx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u); break; case KSOCK_PROTO_V1: /* Receiving bare lnet_hdr_t */ conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER; conn->ksnc_rx_nob_wanted = sizeof(lnet_hdr_t); conn->ksnc_rx_nob_left = sizeof(lnet_hdr_t); conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg; conn->ksnc_rx_iov[0].iov_len = sizeof (lnet_hdr_t); break; default: LBUG (); } conn->ksnc_rx_niov = 1; conn->ksnc_rx_kiov = NULL; conn->ksnc_rx_nkiov = 0; conn->ksnc_rx_csum = ~0; return (1); } /* Set up to skip as much as possible now. 
If there's more left * (ran out of iov entries) we'll get called again */ conn->ksnc_rx_state = SOCKNAL_RX_SLOP; conn->ksnc_rx_nob_left = nob_to_skip; conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; skipped = 0; niov = 0; do { nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer)); conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer; conn->ksnc_rx_iov[niov].iov_len = nob; niov++; skipped += nob; nob_to_skip -=nob; } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */ niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec)); conn->ksnc_rx_niov = niov; conn->ksnc_rx_kiov = NULL; conn->ksnc_rx_nkiov = 0; conn->ksnc_rx_nob_wanted = skipped; return (0); } /* (Sink) handle incoming ZC request from sender */ static int ksocknal_handle_zc_req(ksock_peer_t *peer, __u64 cookie) { ksock_conn_t *conn; ksock_tx_t *tx; ksock_sched_t *sched; int rc; read_lock (&ksocknal_data.ksnd_global_lock); conn = ksocknal_find_conn_locked (0, peer); if (conn == NULL) { read_unlock (&ksocknal_data.ksnd_global_lock); CERROR("Can't find connection to send zcack.\n"); return -ECONNRESET; } sched = conn->ksnc_scheduler; spin_lock_bh (&sched->kss_lock); rc = ksocknal_piggyback_zcack(conn, cookie); spin_unlock_bh (&sched->kss_lock); read_unlock (&ksocknal_data.ksnd_global_lock); if (rc) { /* Ack cookie is piggybacked */ return 0; } tx = ksocknal_alloc_tx(KSOCK_NOOP_TX_SIZE); if (tx == NULL) { CERROR("Can't allocate noop tx desc\n"); return -ENOMEM; } tx->tx_conn = NULL; tx->tx_lnetmsg = NULL; tx->tx_kiov = NULL; tx->tx_nkiov = 0; tx->tx_iov = tx->tx_frags.virt.iov; tx->tx_niov = 1; ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP); tx->tx_msg.ksm_zc_ack_cookie = cookie; /* incoming cookie */ read_lock (&ksocknal_data.ksnd_global_lock); conn = ksocknal_find_conn_locked (0, peer); if (conn == NULL) { read_unlock (&ksocknal_data.ksnd_global_lock); ksocknal_free_tx(tx); CERROR("Can't find connection to send zcack.\n"); return -ECONNRESET; } ksocknal_queue_tx_locked(tx, conn); read_unlock (&ksocknal_data.ksnd_global_lock); return 0; } /* (Sender) handle ZC_ACK from sink */ static int ksocknal_handle_zc_ack(ksock_peer_t *peer, __u64 cookie) { ksock_tx_t *tx; struct list_head *ctmp; spin_lock(&peer->ksnp_lock); list_for_each(ctmp, &peer->ksnp_zc_req_list) { tx = list_entry (ctmp, ksock_tx_t, tx_zc_list); if (tx->tx_msg.ksm_zc_req_cookie != cookie) continue; tx->tx_msg.ksm_zc_req_cookie = 0; list_del(&tx->tx_zc_list); spin_unlock(&peer->ksnp_lock); ksocknal_tx_decref(tx); return 0; } spin_unlock(&peer->ksnp_lock); return -EPROTO; } int ksocknal_process_receive (ksock_conn_t *conn) { int rc; LASSERT (atomic_read(&conn->ksnc_conn_refcount) > 0); /* NB: sched lock NOT held */ /* SOCKNAL_RX_LNET_HEADER is here for backward compatability */ LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER || conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD || conn->ksnc_rx_state == SOCKNAL_RX_LNET_HEADER || conn->ksnc_rx_state == SOCKNAL_RX_SLOP); again: if (conn->ksnc_rx_nob_wanted != 0) { rc = ksocknal_receive(conn); if (rc <= 0) { LASSERT (rc != -EAGAIN); if (rc == 0) CDEBUG (D_NET, "[%p] EOF from %s" " ip %d.%d.%d.%d:%d\n", conn, libcfs_id2str(conn->ksnc_peer->ksnp_id), HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); else if (!conn->ksnc_closing) CERROR ("[%p] Error %d on read from %s" " ip %d.%d.%d.%d:%d\n", conn, rc, libcfs_id2str(conn->ksnc_peer->ksnp_id), HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); /* it's not an error if conn is being closed */ ksocknal_close_conn_and_siblings (conn, (conn->ksnc_closing) ? 
0 : rc); return (rc == 0 ? -ESHUTDOWN : rc); } if (conn->ksnc_rx_nob_wanted != 0) { /* short read */ return (-EAGAIN); } } switch (conn->ksnc_rx_state) { case SOCKNAL_RX_KSM_HEADER: if (conn->ksnc_flip) { __swab32s(&conn->ksnc_msg.ksm_type); __swab32s(&conn->ksnc_msg.ksm_csum); __swab64s(&conn->ksnc_msg.ksm_zc_req_cookie); __swab64s(&conn->ksnc_msg.ksm_zc_ack_cookie); } if (conn->ksnc_msg.ksm_type != KSOCK_MSG_NOOP && conn->ksnc_msg.ksm_type != KSOCK_MSG_LNET) { CERROR("%s: Unknown message type: %x\n", libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_msg.ksm_type); ksocknal_new_packet(conn, 0); ksocknal_close_conn_and_siblings(conn, -EPROTO); return (-EPROTO); } if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP && conn->ksnc_msg.ksm_csum != 0 && /* has checksum */ conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) { /* NOOP Checksum error */ CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n", libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum); ksocknal_new_packet(conn, 0); ksocknal_close_conn_and_siblings(conn, -EPROTO); return (-EIO); } if (conn->ksnc_msg.ksm_zc_ack_cookie != 0) { LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x); rc = ksocknal_handle_zc_ack(conn->ksnc_peer, conn->ksnc_msg.ksm_zc_ack_cookie); if (rc != 0) { CERROR("%s: Unknown zero copy ACK cookie: "LPU64"\n", libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_msg.ksm_zc_ack_cookie); ksocknal_new_packet(conn, 0); ksocknal_close_conn_and_siblings(conn, -EPROTO); return (rc); } } if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP) { ksocknal_new_packet (conn, 0); return 0; /* NOOP is done and just return */ } conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER; conn->ksnc_rx_nob_wanted = sizeof(ksock_lnet_msg_t); conn->ksnc_rx_nob_left = sizeof(ksock_lnet_msg_t); conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg; conn->ksnc_rx_iov[0].iov_len = sizeof(ksock_lnet_msg_t); conn->ksnc_rx_niov = 1; conn->ksnc_rx_kiov = NULL; conn->ksnc_rx_nkiov = 0; goto again; /* read lnet header now */ case SOCKNAL_RX_LNET_HEADER: /* unpack message header */ conn->ksnc_proto->pro_unpack(&conn->ksnc_msg); if ((conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) != 0) { /* Userspace peer */ lnet_process_id_t *id = &conn->ksnc_peer->ksnp_id; lnet_hdr_t *lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr; /* Substitute process ID assigned at connection time */ lhdr->src_pid = cpu_to_le32(id->pid); lhdr->src_nid = cpu_to_le64(id->nid); } conn->ksnc_rx_state = SOCKNAL_RX_PARSE; ksocknal_conn_addref(conn); /* ++ref while parsing */ rc = lnet_parse(conn->ksnc_peer->ksnp_ni, &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr, conn->ksnc_peer->ksnp_id.nid, conn, 0); if (rc < 0) { /* I just received garbage: give up on this conn */ ksocknal_new_packet(conn, 0); ksocknal_close_conn_and_siblings (conn, rc); ksocknal_conn_decref(conn); return (-EPROTO); } /* I'm racing with ksocknal_recv() */ LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_PARSE || conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD); if (conn->ksnc_rx_state != SOCKNAL_RX_LNET_PAYLOAD) return 0; /* ksocknal_recv() got called */ goto again; case SOCKNAL_RX_LNET_PAYLOAD: /* payload all received */ rc = 0; if (conn->ksnc_rx_nob_left == 0 && /* not truncating */ conn->ksnc_msg.ksm_csum != 0 && /* has checksum */ conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) { CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n", libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum); rc = -EIO; } 
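                /* Payload is complete (or truncated): hand the message back to
                 * LNET with the checksum verdict in 'rc'.  If the sender asked
                 * for a zero-copy ACK (non-zero ksm_zc_req_cookie), queue one
                 * via ksocknal_handle_zc_req(). */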
lnet_finalize(conn->ksnc_peer->ksnp_ni, conn->ksnc_cookie, rc); if (rc == 0 && conn->ksnc_msg.ksm_zc_req_cookie != 0) { LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x); rc = ksocknal_handle_zc_req(conn->ksnc_peer, conn->ksnc_msg.ksm_zc_req_cookie); } if (rc != 0) { ksocknal_new_packet(conn, 0); ksocknal_close_conn_and_siblings (conn, rc); return (-EPROTO); } /* Fall through */ case SOCKNAL_RX_SLOP: /* starting new packet? */ if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left)) return 0; /* come back later */ goto again; /* try to finish reading slop now */ default: break; } /* Not Reached */ LBUG (); return (-EINVAL); /* keep gcc happy */ } int ksocknal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed, unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, unsigned int rlen) { ksock_conn_t *conn = (ksock_conn_t *)private; ksock_sched_t *sched = conn->ksnc_scheduler; LASSERT (mlen <= rlen); LASSERT (niov <= LNET_MAX_IOV); conn->ksnc_cookie = msg; conn->ksnc_rx_nob_wanted = mlen; conn->ksnc_rx_nob_left = rlen; if (mlen == 0 || iov != NULL) { conn->ksnc_rx_nkiov = 0; conn->ksnc_rx_kiov = NULL; conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov; conn->ksnc_rx_niov = lnet_extract_iov(LNET_MAX_IOV, conn->ksnc_rx_iov, niov, iov, offset, mlen); } else { conn->ksnc_rx_niov = 0; conn->ksnc_rx_iov = NULL; conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov; conn->ksnc_rx_nkiov = lnet_extract_kiov(LNET_MAX_IOV, conn->ksnc_rx_kiov, niov, kiov, offset, mlen); } LASSERT (mlen == lnet_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) + lnet_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov)); LASSERT (conn->ksnc_rx_scheduled); spin_lock_bh (&sched->kss_lock); switch (conn->ksnc_rx_state) { case SOCKNAL_RX_PARSE_WAIT: list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns); cfs_waitq_signal (&sched->kss_waitq); LASSERT (conn->ksnc_rx_ready); break; case SOCKNAL_RX_PARSE: /* scheduler hasn't noticed I'm parsing yet */ break; } conn->ksnc_rx_state = SOCKNAL_RX_LNET_PAYLOAD; spin_unlock_bh (&sched->kss_lock); ksocknal_conn_decref(conn); return (0); } static inline int ksocknal_sched_cansleep(ksock_sched_t *sched) { int rc; spin_lock_bh (&sched->kss_lock); rc = (!ksocknal_data.ksnd_shuttingdown && list_empty(&sched->kss_rx_conns) && list_empty(&sched->kss_tx_conns)); spin_unlock_bh (&sched->kss_lock); return (rc); } int ksocknal_scheduler (void *arg) { ksock_sched_t *sched = (ksock_sched_t *)arg; ksock_conn_t *conn; ksock_tx_t *tx; int rc; int nloops = 0; int id = (int)(sched - ksocknal_data.ksnd_schedulers); char name[16]; snprintf (name, sizeof (name),"socknal_sd%02d", id); cfs_daemonize (name); cfs_block_allsigs (); #if defined(CONFIG_SMP) && defined(CPU_AFFINITY) id = ksocknal_sched2cpu(id); if (cpu_online(id)) { cpumask_t m = CPU_MASK_NONE; cpu_set(id, m); set_cpus_allowed(current, m); } else { CERROR ("Can't set CPU affinity for %s to %d\n", name, id); } #endif /* CONFIG_SMP && CPU_AFFINITY */ spin_lock_bh (&sched->kss_lock); while (!ksocknal_data.ksnd_shuttingdown) { int did_something = 0; /* Ensure I progress everything semi-fairly */ if (!list_empty (&sched->kss_rx_conns)) { conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list); list_del(&conn->ksnc_rx_list); LASSERT(conn->ksnc_rx_scheduled); LASSERT(conn->ksnc_rx_ready); /* clear rx_ready in case receive isn't complete. * Do it BEFORE we call process_recv, since * data_ready can set it any time after we release * kss_lock. 
*/ conn->ksnc_rx_ready = 0; spin_unlock_bh (&sched->kss_lock); rc = ksocknal_process_receive(conn); spin_lock_bh (&sched->kss_lock); /* I'm the only one that can clear this flag */ LASSERT(conn->ksnc_rx_scheduled); /* Did process_receive get everything it wanted? */ if (rc == 0) conn->ksnc_rx_ready = 1; if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) { /* Conn blocked waiting for ksocknal_recv() * I change its state (under lock) to signal * it can be rescheduled */ conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT; } else if (conn->ksnc_rx_ready) { /* reschedule for rx */ list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns); } else { conn->ksnc_rx_scheduled = 0; /* drop my ref */ ksocknal_conn_decref(conn); } did_something = 1; } if (!list_empty (&sched->kss_tx_conns)) { CFS_LIST_HEAD (zlist); if (!list_empty(&sched->kss_zombie_noop_txs)) { list_add(&zlist, &sched->kss_zombie_noop_txs); list_del_init(&sched->kss_zombie_noop_txs); } conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list); list_del (&conn->ksnc_tx_list); LASSERT(conn->ksnc_tx_scheduled); LASSERT(conn->ksnc_tx_ready); LASSERT(!list_empty(&conn->ksnc_tx_queue)); tx = list_entry(conn->ksnc_tx_queue.next, ksock_tx_t, tx_list); if (conn->ksnc_tx_mono == tx) ksocknal_next_mono_tx(conn); /* dequeue now so empty list => more to send */ list_del(&tx->tx_list); /* Clear tx_ready in case send isn't complete. Do * it BEFORE we call process_transmit, since * write_space can set it any time after we release * kss_lock. */ conn->ksnc_tx_ready = 0; spin_unlock_bh (&sched->kss_lock); if (!list_empty(&zlist)) { /* free zombie noop txs, it's fast because * noop txs are just put in freelist */ ksocknal_txlist_done(NULL, &zlist, 0); } rc = ksocknal_process_transmit(conn, tx); if (rc == -ENOMEM || rc == -EAGAIN) { /* Incomplete send: replace tx on HEAD of tx_queue */ spin_lock_bh (&sched->kss_lock); list_add (&tx->tx_list, &conn->ksnc_tx_queue); } else { /* Complete send; tx -ref */ ksocknal_tx_decref (tx); spin_lock_bh (&sched->kss_lock); /* assume space for more */ conn->ksnc_tx_ready = 1; } if (rc == -ENOMEM) { /* Do nothing; after a short timeout, this * conn will be reposted on kss_tx_conns. */ } else if (conn->ksnc_tx_ready && !list_empty (&conn->ksnc_tx_queue)) { /* reschedule for tx */ list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns); } else { conn->ksnc_tx_scheduled = 0; /* drop my ref */ ksocknal_conn_decref(conn); } did_something = 1; } if (!did_something || /* nothing to do */ ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */ spin_unlock_bh (&sched->kss_lock); nloops = 0; if (!did_something) { /* wait for something to do */ cfs_wait_event_interruptible_exclusive( sched->kss_waitq, !ksocknal_sched_cansleep(sched), rc); LASSERT (rc == 0); } else { our_cond_resched(); } spin_lock_bh (&sched->kss_lock); } } spin_unlock_bh (&sched->kss_lock); ksocknal_thread_fini (); return (0); } /* * Add connection to kss_rx_conns of scheduler * and wakeup the scheduler. */ void ksocknal_read_callback (ksock_conn_t *conn) { ksock_sched_t *sched; ENTRY; sched = conn->ksnc_scheduler; spin_lock_bh (&sched->kss_lock); conn->ksnc_rx_ready = 1; if (!conn->ksnc_rx_scheduled) { /* not being progressed */ list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns); conn->ksnc_rx_scheduled = 1; /* extra ref for scheduler */ ksocknal_conn_addref(conn); cfs_waitq_signal (&sched->kss_waitq); } spin_unlock_bh (&sched->kss_lock); EXIT; } /* * Add connection to kss_tx_conns of scheduler * and wakeup the scheduler. 
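 * Called from the socket's write_space upcall when send-buffer space becomes available.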
*/ void ksocknal_write_callback (ksock_conn_t *conn) { ksock_sched_t *sched; ENTRY; sched = conn->ksnc_scheduler; spin_lock_bh (&sched->kss_lock); conn->ksnc_tx_ready = 1; if (!conn->ksnc_tx_scheduled && // not being progressed !list_empty(&conn->ksnc_tx_queue)){//packets to send list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns); conn->ksnc_tx_scheduled = 1; /* extra ref for scheduler */ ksocknal_conn_addref(conn); cfs_waitq_signal (&sched->kss_waitq); } spin_unlock_bh (&sched->kss_lock); EXIT; } ksock_proto_t * ksocknal_parse_proto_version (ksock_hello_msg_t *hello) { if ((hello->kshm_magic == LNET_PROTO_MAGIC && hello->kshm_version == KSOCK_PROTO_V2) || (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC) && hello->kshm_version == __swab32(KSOCK_PROTO_V2))) { #if SOCKNAL_VERSION_DEBUG if (*ksocknal_tunables.ksnd_protocol != 2) return NULL; #endif return &ksocknal_protocol_v2x; } if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) { lnet_magicversion_t *hmv = (lnet_magicversion_t *)hello; CLASSERT (sizeof (lnet_magicversion_t) == offsetof (ksock_hello_msg_t, kshm_src_nid)); if (hmv->version_major == cpu_to_le16 (KSOCK_PROTO_V1_MAJOR) && hmv->version_minor == cpu_to_le16 (KSOCK_PROTO_V1_MINOR)) return &ksocknal_protocol_v1x; } return NULL; } static int ksocknal_send_hello_v1 (ksock_conn_t *conn, ksock_hello_msg_t *hello) { cfs_socket_t *sock = conn->ksnc_sock; lnet_hdr_t *hdr; lnet_magicversion_t *hmv; int rc; int i; CLASSERT(sizeof(lnet_magicversion_t) == offsetof(lnet_hdr_t, src_nid)); LIBCFS_ALLOC(hdr, sizeof(*hdr)); if (hdr == NULL) { CERROR("Can't allocate lnet_hdr_t\n"); return -ENOMEM; } hmv = (lnet_magicversion_t *)&hdr->dest_nid; /* Re-organize V2.x message header to V1.x (lnet_hdr_t) * header and send out */ hmv->magic = cpu_to_le32 (LNET_PROTO_TCP_MAGIC); hmv->version_major = cpu_to_le16 (KSOCK_PROTO_V1_MAJOR); hmv->version_minor = cpu_to_le16 (KSOCK_PROTO_V1_MINOR); if (the_lnet.ln_testprotocompat != 0) { /* single-shot proto check */ LNET_LOCK(); if ((the_lnet.ln_testprotocompat & 1) != 0) { hmv->version_major++; /* just different! 
*/ the_lnet.ln_testprotocompat &= ~1; } if ((the_lnet.ln_testprotocompat & 2) != 0) { hmv->magic = LNET_PROTO_MAGIC; the_lnet.ln_testprotocompat &= ~2; } LNET_UNLOCK(); } hdr->src_nid = cpu_to_le64 (hello->kshm_src_nid); hdr->src_pid = cpu_to_le32 (hello->kshm_src_pid); hdr->type = cpu_to_le32 (LNET_MSG_HELLO); hdr->payload_length = cpu_to_le32 (hello->kshm_nips * sizeof(__u32)); hdr->msg.hello.type = cpu_to_le32 (hello->kshm_ctype); hdr->msg.hello.incarnation = cpu_to_le64 (hello->kshm_src_incarnation); rc = libcfs_sock_write(sock, hdr, sizeof(*hdr), lnet_acceptor_timeout()); if (rc != 0) { CDEBUG (D_NETERROR, "Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n", rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); goto out; } if (hello->kshm_nips == 0) goto out; for (i = 0; i < (int) hello->kshm_nips; i++) { hello->kshm_ips[i] = __cpu_to_le32 (hello->kshm_ips[i]); } rc = libcfs_sock_write(sock, hello->kshm_ips, hello->kshm_nips * sizeof(__u32), lnet_acceptor_timeout()); if (rc != 0) { CDEBUG (D_NETERROR, "Error %d sending HELLO payload (%d)" " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); } out: LIBCFS_FREE(hdr, sizeof(*hdr)); return rc; } static int ksocknal_send_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello) { cfs_socket_t *sock = conn->ksnc_sock; int rc; hello->kshm_magic = LNET_PROTO_MAGIC; hello->kshm_version = KSOCK_PROTO_V2; if (the_lnet.ln_testprotocompat != 0) { /* single-shot proto check */ LNET_LOCK(); if ((the_lnet.ln_testprotocompat & 1) != 0) { hello->kshm_version++; /* just different! */ the_lnet.ln_testprotocompat &= ~1; } LNET_UNLOCK(); } rc = libcfs_sock_write(sock, hello, offsetof(ksock_hello_msg_t, kshm_ips), lnet_acceptor_timeout()); if (rc != 0) { CDEBUG (D_NETERROR, "Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n", rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); return rc; } if (hello->kshm_nips == 0) return 0; rc = libcfs_sock_write(sock, hello->kshm_ips, hello->kshm_nips * sizeof(__u32), lnet_acceptor_timeout()); if (rc != 0) { CDEBUG (D_NETERROR, "Error %d sending HELLO payload (%d)" " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port); } return rc; } static int ksocknal_recv_hello_v1(ksock_conn_t *conn, ksock_hello_msg_t *hello,int timeout) { cfs_socket_t *sock = conn->ksnc_sock; lnet_hdr_t *hdr; int rc; int i; LIBCFS_ALLOC(hdr, sizeof(*hdr)); if (hdr == NULL) { CERROR("Can't allocate lnet_hdr_t\n"); return -ENOMEM; } rc = libcfs_sock_read(sock, &hdr->src_nid, sizeof (*hdr) - offsetof (lnet_hdr_t, src_nid), timeout); if (rc != 0) { CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n", rc, HIPQUAD(conn->ksnc_ipaddr)); LASSERT (rc < 0 && rc != -EALREADY); goto out; } /* ...and check we got what we expected */ if (hdr->type != cpu_to_le32 (LNET_MSG_HELLO)) { CERROR ("Expecting a HELLO hdr," " but got type %d from %u.%u.%u.%u\n", le32_to_cpu (hdr->type), HIPQUAD(conn->ksnc_ipaddr)); rc = -EPROTO; goto out; } hello->kshm_src_nid = le64_to_cpu (hdr->src_nid); hello->kshm_src_pid = le32_to_cpu (hdr->src_pid); hello->kshm_src_incarnation = le64_to_cpu (hdr->msg.hello.incarnation); hello->kshm_ctype = le32_to_cpu (hdr->msg.hello.type); hello->kshm_nips = le32_to_cpu (hdr->payload_length) / sizeof (__u32); if (hello->kshm_nips > LNET_MAX_INTERFACES) { CERROR("Bad nips %d from ip %u.%u.%u.%u\n", hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr)); rc = -EPROTO; goto out; } if (hello->kshm_nips == 0) goto out; rc = libcfs_sock_read(sock, hello->kshm_ips, hello->kshm_nips * 
sizeof(__u32), timeout); if (rc != 0) { CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n", rc, HIPQUAD(conn->ksnc_ipaddr)); LASSERT (rc < 0 && rc != -EALREADY); goto out; } for (i = 0; i < (int) hello->kshm_nips; i++) { hello->kshm_ips[i] = __le32_to_cpu(hello->kshm_ips[i]); if (hello->kshm_ips[i] == 0) { CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n", i, HIPQUAD(conn->ksnc_ipaddr)); rc = -EPROTO; break; } } out: LIBCFS_FREE(hdr, sizeof(*hdr)); return rc; } static int ksocknal_recv_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello, int timeout) { cfs_socket_t *sock = conn->ksnc_sock; int rc; int i; if (hello->kshm_magic == LNET_PROTO_MAGIC) conn->ksnc_flip = 0; else conn->ksnc_flip = 1; rc = libcfs_sock_read(sock, &hello->kshm_src_nid, offsetof(ksock_hello_msg_t, kshm_ips) - offsetof(ksock_hello_msg_t, kshm_src_nid), timeout); if (rc != 0) { CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n", rc, HIPQUAD(conn->ksnc_ipaddr)); LASSERT (rc < 0 && rc != -EALREADY); return rc; } if (conn->ksnc_flip) { __swab32s(&hello->kshm_src_pid); __swab64s(&hello->kshm_src_nid); __swab32s(&hello->kshm_dst_pid); __swab64s(&hello->kshm_dst_nid); __swab64s(&hello->kshm_src_incarnation); __swab64s(&hello->kshm_dst_incarnation); __swab32s(&hello->kshm_ctype); __swab32s(&hello->kshm_nips); } if (hello->kshm_nips > LNET_MAX_INTERFACES) { CERROR("Bad nips %d from ip %u.%u.%u.%u\n", hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr)); return -EPROTO; } if (hello->kshm_nips == 0) return 0; rc = libcfs_sock_read(sock, hello->kshm_ips, hello->kshm_nips * sizeof(__u32), timeout); if (rc != 0) { CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n", rc, HIPQUAD(conn->ksnc_ipaddr)); LASSERT (rc < 0 && rc != -EALREADY); return rc; } for (i = 0; i < (int) hello->kshm_nips; i++) { if (conn->ksnc_flip) __swab32s(&hello->kshm_ips[i]); if (hello->kshm_ips[i] == 0) { CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n", i, HIPQUAD(conn->ksnc_ipaddr)); return -EPROTO; } } return 0; } static void ksocknal_pack_msg_v1(ksock_tx_t *tx) { /* V1.x has no KSOCK_MSG_NOOP */ LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP); LASSERT(tx->tx_lnetmsg != NULL); tx->tx_iov[0].iov_base = (void *)&tx->tx_lnetmsg->msg_hdr; tx->tx_iov[0].iov_len = sizeof(lnet_hdr_t); tx->tx_resid = tx->tx_nob = tx->tx_lnetmsg->msg_len + sizeof(lnet_hdr_t); } static void ksocknal_pack_msg_v2(ksock_tx_t *tx) { tx->tx_iov[0].iov_base = (void *)&tx->tx_msg; if (tx->tx_lnetmsg != NULL) { LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP); tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = tx->tx_lnetmsg->msg_hdr; tx->tx_iov[0].iov_len = sizeof(ksock_msg_t); tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + tx->tx_lnetmsg->msg_len; } else { LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_NOOP); tx->tx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr); tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr); } /* Don't checksum before start sending, because packet can be piggybacked with ACK */ } static void ksocknal_unpack_msg_v1(ksock_msg_t *msg) { msg->ksm_type = KSOCK_MSG_LNET; msg->ksm_csum = 0; msg->ksm_zc_req_cookie = 0; msg->ksm_zc_ack_cookie = 0; } static void ksocknal_unpack_msg_v2(ksock_msg_t *msg) { return; /* Do nothing */ } ksock_proto_t ksocknal_protocol_v1x = { KSOCK_PROTO_V1, ksocknal_send_hello_v1, ksocknal_recv_hello_v1, ksocknal_pack_msg_v1, ksocknal_unpack_msg_v1 }; ksock_proto_t ksocknal_protocol_v2x = { KSOCK_PROTO_V2, ksocknal_send_hello_v2, ksocknal_recv_hello_v2, ksocknal_pack_msg_v2, ksocknal_unpack_msg_v2 }; int ksocknal_send_hello (lnet_ni_t *ni, 
ksock_conn_t *conn, lnet_nid_t peer_nid, ksock_hello_msg_t *hello) { /* CAVEAT EMPTOR: this byte flips 'ipaddrs' */ ksock_net_t *net = (ksock_net_t *)ni->ni_data; LASSERT (hello->kshm_nips <= LNET_MAX_INTERFACES); /* rely on caller to hold a ref on socket so it wouldn't disappear */ LASSERT (conn->ksnc_proto != NULL); hello->kshm_src_nid = ni->ni_nid; hello->kshm_dst_nid = peer_nid; hello->kshm_src_pid = the_lnet.ln_pid; hello->kshm_src_incarnation = net->ksnn_incarnation; hello->kshm_ctype = conn->ksnc_type; return conn->ksnc_proto->pro_send_hello(conn, hello); } int ksocknal_invert_type(int type) { switch (type) { case SOCKLND_CONN_ANY: case SOCKLND_CONN_CONTROL: return (type); case SOCKLND_CONN_BULK_IN: return SOCKLND_CONN_BULK_OUT; case SOCKLND_CONN_BULK_OUT: return SOCKLND_CONN_BULK_IN; default: return (SOCKLND_CONN_NONE); } } int ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn, ksock_hello_msg_t *hello, lnet_process_id_t *peerid, __u64 *incarnation) { /* Return < 0 fatal error * 0 success * EALREADY lost connection race * EPROTO protocol version mismatch */ cfs_socket_t *sock = conn->ksnc_sock; int active = (conn->ksnc_proto != NULL); int timeout; int proto_match; int rc; ksock_proto_t *proto; lnet_process_id_t recv_id; /* socket type set on active connections - not set on passive */ LASSERT (!active == !(conn->ksnc_type != SOCKLND_CONN_NONE)); timeout = active ? *ksocknal_tunables.ksnd_timeout : lnet_acceptor_timeout(); rc = libcfs_sock_read(sock, &hello->kshm_magic, sizeof (hello->kshm_magic), timeout); if (rc != 0) { CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n", rc, HIPQUAD(conn->ksnc_ipaddr)); LASSERT (rc < 0); return rc; } if (hello->kshm_magic != LNET_PROTO_MAGIC && hello->kshm_magic != __swab32(LNET_PROTO_MAGIC) && hello->kshm_magic != le32_to_cpu (LNET_PROTO_TCP_MAGIC)) { /* Unexpected magic! 
*/ CERROR ("Bad magic(1) %#08x (%#08x expected) from " "%u.%u.%u.%u\n", __cpu_to_le32 (hello->kshm_magic), LNET_PROTO_TCP_MAGIC, HIPQUAD(conn->ksnc_ipaddr)); return -EPROTO; } rc = libcfs_sock_read(sock, &hello->kshm_version, sizeof(hello->kshm_version), timeout); if (rc != 0) { CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n", rc, HIPQUAD(conn->ksnc_ipaddr)); LASSERT (rc < 0); return rc; } proto = ksocknal_parse_proto_version(hello); if (proto == NULL) { if (!active) { /* unknown protocol from peer, tell peer my protocol */ conn->ksnc_proto = &ksocknal_protocol_v2x; #if SOCKNAL_VERSION_DEBUG if (*ksocknal_tunables.ksnd_protocol != 2) conn->ksnc_proto = &ksocknal_protocol_v1x; #endif hello->kshm_nips = 0; ksocknal_send_hello(ni, conn, ni->ni_nid, hello); } CERROR ("Unknown protocol version (%d.x expected)" " from %u.%u.%u.%u\n", conn->ksnc_proto->pro_version, HIPQUAD(conn->ksnc_ipaddr)); return -EPROTO; } proto_match = (conn->ksnc_proto == proto); conn->ksnc_proto = proto; /* receive the rest of hello message anyway */ rc = conn->ksnc_proto->pro_recv_hello(conn, hello, timeout); if (rc != 0) { CERROR("Error %d reading or checking hello from from %u.%u.%u.%u\n", rc, HIPQUAD(conn->ksnc_ipaddr)); LASSERT (rc < 0); return rc; } *incarnation = hello->kshm_src_incarnation; if (hello->kshm_src_nid == LNET_NID_ANY) { CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY" "from %u.%u.%u.%u\n", HIPQUAD(conn->ksnc_ipaddr)); return -EPROTO; } if (!active && conn->ksnc_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) { /* Userspace NAL assigns peer process ID from socket */ recv_id.pid = conn->ksnc_port | LNET_PID_USERFLAG; recv_id.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), conn->ksnc_ipaddr); } else { recv_id.nid = hello->kshm_src_nid; recv_id.pid = hello->kshm_src_pid; } if (!active) { *peerid = recv_id; /* peer determines type */ conn->ksnc_type = ksocknal_invert_type(hello->kshm_ctype); if (conn->ksnc_type == SOCKLND_CONN_NONE) { CERROR ("Unexpected type %d from %s ip %u.%u.%u.%u\n", hello->kshm_ctype, libcfs_id2str(*peerid), HIPQUAD(conn->ksnc_ipaddr)); return -EPROTO; } return 0; } if (peerid->pid != recv_id.pid || peerid->nid != recv_id.nid) { LCONSOLE_ERROR_MSG(0x130, "Connected successfully to %s on host" " %u.%u.%u.%u, but they claimed they were " "%s; please check your Lustre " "configuration.\n", libcfs_id2str(*peerid), HIPQUAD(conn->ksnc_ipaddr), libcfs_id2str(recv_id)); return -EPROTO; } if (hello->kshm_ctype == SOCKLND_CONN_NONE) { /* Possible protocol mismatch or I lost the connection race */ return proto_match ? 
EALREADY : EPROTO; } if (ksocknal_invert_type(hello->kshm_ctype) != conn->ksnc_type) { CERROR ("Mismatched types: me %d, %s ip %u.%u.%u.%u %d\n", conn->ksnc_type, libcfs_id2str(*peerid), HIPQUAD(conn->ksnc_ipaddr), hello->kshm_ctype); return -EPROTO; } return 0; } void ksocknal_connect (ksock_route_t *route) { CFS_LIST_HEAD (zombies); ksock_peer_t *peer = route->ksnr_peer; int type; int wanted; cfs_socket_t *sock; cfs_time_t deadline; int retry_later = 0; int rc = 0; deadline = cfs_time_add(cfs_time_current(), cfs_time_seconds(*ksocknal_tunables.ksnd_timeout)); write_lock_bh (&ksocknal_data.ksnd_global_lock); LASSERT (route->ksnr_scheduled); LASSERT (!route->ksnr_connecting); route->ksnr_connecting = 1; for (;;) { wanted = ksocknal_route_mask() & ~route->ksnr_connected; /* stop connecting if peer/route got closed under me, or * route got connected while queued */ if (peer->ksnp_closing || route->ksnr_deleted || wanted == 0) { retry_later = 0; break; } /* reschedule if peer is connecting to me */ if (peer->ksnp_accepting > 0) { CDEBUG(D_NET, "peer %s(%d) already connecting to me, retry later.\n", libcfs_nid2str(peer->ksnp_id.nid), peer->ksnp_accepting); retry_later = 1; } if (retry_later) /* needs reschedule */ break; if ((wanted & (1 << SOCKLND_CONN_ANY)) != 0) { type = SOCKLND_CONN_ANY; } else if ((wanted & (1 << SOCKLND_CONN_CONTROL)) != 0) { type = SOCKLND_CONN_CONTROL; } else if ((wanted & (1 << SOCKLND_CONN_BULK_IN)) != 0) { type = SOCKLND_CONN_BULK_IN; } else { LASSERT ((wanted & (1 << SOCKLND_CONN_BULK_OUT)) != 0); type = SOCKLND_CONN_BULK_OUT; } write_unlock_bh (&ksocknal_data.ksnd_global_lock); if (cfs_time_aftereq(cfs_time_current(), deadline)) { rc = -ETIMEDOUT; lnet_connect_console_error(rc, peer->ksnp_id.nid, route->ksnr_ipaddr, route->ksnr_port); goto failed; } rc = lnet_connect(&sock, peer->ksnp_id.nid, route->ksnr_myipaddr, route->ksnr_ipaddr, route->ksnr_port); if (rc != 0) goto failed; rc = ksocknal_create_conn(peer->ksnp_ni, route, sock, type); if (rc < 0) { lnet_connect_console_error(rc, peer->ksnp_id.nid, route->ksnr_ipaddr, route->ksnr_port); goto failed; } /* A +ve RC means I have to retry because I lost the connection * race or I have to renegotiate protocol version */ retry_later = (rc != 0); if (retry_later) CDEBUG(D_NET, "peer %s: conn race, retry later.\n", libcfs_nid2str(peer->ksnp_id.nid)); write_lock_bh (&ksocknal_data.ksnd_global_lock); } route->ksnr_scheduled = 0; route->ksnr_connecting = 0; if (retry_later) { /* re-queue for attention; this frees me up to handle * the peer's incoming connection request */ ksocknal_launch_connection_locked(route); } write_unlock_bh (&ksocknal_data.ksnd_global_lock); return; failed: write_lock_bh (&ksocknal_data.ksnd_global_lock); route->ksnr_scheduled = 0; route->ksnr_connecting = 0; /* This is a retry rather than a new connection */ route->ksnr_retry_interval *= 2; route->ksnr_retry_interval = MAX(route->ksnr_retry_interval, cfs_time_seconds(*ksocknal_tunables.ksnd_min_reconnectms)/1000); route->ksnr_retry_interval = MIN(route->ksnr_retry_interval, cfs_time_seconds(*ksocknal_tunables.ksnd_max_reconnectms)/1000); LASSERT (route->ksnr_retry_interval != 0); route->ksnr_timeout = cfs_time_add(cfs_time_current(), route->ksnr_retry_interval); if (!list_empty(&peer->ksnp_tx_queue) && peer->ksnp_accepting == 0 && ksocknal_find_connecting_route_locked(peer) == NULL) { /* ksnp_tx_queue is queued on a conn on successful * connection */ LASSERT (list_empty (&peer->ksnp_conns)); /* take all the blocked packets while I've got the lock 

static inline int
ksocknal_connd_connect_route_locked(void)
{
        /* Only handle an outgoing connection request if there is someone left
         * to handle incoming connections */
        return !list_empty(&ksocknal_data.ksnd_connd_routes) &&
                ((ksocknal_data.ksnd_connd_connecting + 1) <
                 *ksocknal_tunables.ksnd_nconnds);
}

static inline int
ksocknal_connd_ready(void)
{
        int           rc;

        spin_lock_bh (&ksocknal_data.ksnd_connd_lock);

        rc = ksocknal_data.ksnd_shuttingdown ||
             !list_empty(&ksocknal_data.ksnd_connd_connreqs) ||
             ksocknal_connd_connect_route_locked();

        spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);

        return rc;
}

int
ksocknal_connd (void *arg)
{
        long               id = (long)(long_ptr_t)arg;
        char               name[16];
        ksock_connreq_t   *cr;
        ksock_route_t     *route;
        int                rc = 0;

        snprintf (name, sizeof (name), "socknal_cd%02ld", id);
        cfs_daemonize (name);
        cfs_block_allsigs ();

        spin_lock_bh (&ksocknal_data.ksnd_connd_lock);

        while (!ksocknal_data.ksnd_shuttingdown) {

                if (!list_empty(&ksocknal_data.ksnd_connd_connreqs)) {
                        /* Connection accepted by the listener */
                        cr = list_entry(ksocknal_data.ksnd_connd_connreqs.next,
                                        ksock_connreq_t, ksncr_list);

                        list_del(&cr->ksncr_list);
                        spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);

                        ksocknal_create_conn(cr->ksncr_ni, NULL,
                                             cr->ksncr_sock, SOCKLND_CONN_NONE);
                        lnet_ni_decref(cr->ksncr_ni);
                        LIBCFS_FREE(cr, sizeof(*cr));

                        spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
                }

                if (ksocknal_connd_connect_route_locked()) {
                        /* Connection request */
                        route = list_entry (ksocknal_data.ksnd_connd_routes.next,
                                            ksock_route_t, ksnr_connd_list);

                        list_del (&route->ksnr_connd_list);
                        ksocknal_data.ksnd_connd_connecting++;
                        spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);

                        ksocknal_connect (route);
                        ksocknal_route_decref(route);

                        spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
                        ksocknal_data.ksnd_connd_connecting--;
                }

                spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);

                cfs_wait_event_interruptible_exclusive(
                        ksocknal_data.ksnd_connd_waitq,
                        ksocknal_connd_ready(), rc);

                spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
        }

        spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);

        ksocknal_thread_fini ();
        return (0);
}
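
/*
 * Illustrative sketch (not compiled): the admission rule applied by
 * ksocknal_connd_connect_route_locked() above - a connd may start an
 * outgoing connect only while at least one other daemon stays free to
 * service accepted (incoming) connections.  The ex_* name is hypothetical.
 */
#if 0
static int
ex_may_take_route(int have_queued_routes, int nconnecting, int nconnds)
{
        /* reserve one daemon for incoming connection requests */
        return have_queued_routes && (nconnecting + 1) < nconnds;
}

/* e.g. with nconnds = 4, at most 3 daemons may sit in ksocknal_connect()
 * at once; the fourth keeps draining ksnd_connd_connreqs */
#endif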

ksock_conn_t *
ksocknal_find_timed_out_conn (ksock_peer_t *peer)
{
        /* We're called with a shared lock on ksnd_global_lock */
        ksock_conn_t      *conn;
        struct list_head  *ctmp;

        list_for_each (ctmp, &peer->ksnp_conns) {
                int     error;

                conn = list_entry (ctmp, ksock_conn_t, ksnc_list);

                /* Don't need the {get,put}connsock dance to deref ksnc_sock */
                LASSERT (!conn->ksnc_closing);

                /* SOCK_ERROR will reset the error code of the socket on
                 * some platforms (like Darwin8.x) */
                error = SOCK_ERROR(conn->ksnc_sock);
                if (error != 0) {
                        ksocknal_conn_addref(conn);

                        switch (error) {
                        case ECONNRESET:
                                CDEBUG(D_NETERROR, "A connection with %s "
                                       "(%u.%u.%u.%u:%d) was reset; "
                                       "it may have rebooted.\n",
                                       libcfs_id2str(peer->ksnp_id),
                                       HIPQUAD(conn->ksnc_ipaddr),
                                       conn->ksnc_port);
                                break;
                        case ETIMEDOUT:
                                CDEBUG(D_NETERROR, "A connection with %s "
                                       "(%u.%u.%u.%u:%d) timed out; the "
                                       "network or node may be down.\n",
                                       libcfs_id2str(peer->ksnp_id),
                                       HIPQUAD(conn->ksnc_ipaddr),
                                       conn->ksnc_port);
                                break;
                        default:
                                CDEBUG(D_NETERROR, "An unexpected network "
                                       "error %d occurred with %s "
                                       "(%u.%u.%u.%u:%d)\n", error,
                                       libcfs_id2str(peer->ksnp_id),
                                       HIPQUAD(conn->ksnc_ipaddr),
                                       conn->ksnc_port);
                                break;
                        }

                        return (conn);
                }

                if (conn->ksnc_rx_started &&
                    cfs_time_aftereq(cfs_time_current(),
                                     conn->ksnc_rx_deadline)) {
                        /* Timed out incomplete incoming message */
                        ksocknal_conn_addref(conn);
                        CDEBUG(D_NETERROR, "Timeout receiving from %s "
                               "(%u.%u.%u.%u:%d), state %d wanted %d left %d\n",
                               libcfs_id2str(peer->ksnp_id),
                               HIPQUAD(conn->ksnc_ipaddr),
                               conn->ksnc_port,
                               conn->ksnc_rx_state,
                               conn->ksnc_rx_nob_wanted,
                               conn->ksnc_rx_nob_left);
                        return (conn);
                }

                if ((!list_empty(&conn->ksnc_tx_queue) ||
                     SOCK_WMEM_QUEUED(conn->ksnc_sock) != 0) &&
                    cfs_time_aftereq(cfs_time_current(),
                                     conn->ksnc_tx_deadline)) {
                        /* Timed out messages queued for sending or
                         * buffered in the socket's send buffer */
                        ksocknal_conn_addref(conn);
                        CDEBUG(D_NETERROR, "Timeout sending data to %s "
                               "(%u.%u.%u.%u:%d); the network or that "
                               "node may be down.\n",
                               libcfs_id2str(peer->ksnp_id),
                               HIPQUAD(conn->ksnc_ipaddr),
                               conn->ksnc_port);
                        return (conn);
                }
        }

        return (NULL);
}

static inline void
ksocknal_flush_stale_txs(ksock_peer_t *peer)
{
        ksock_tx_t        *tx;
        CFS_LIST_HEAD      (stale_txs);

        write_lock_bh (&ksocknal_data.ksnd_global_lock);

        while (!list_empty (&peer->ksnp_tx_queue)) {
                tx = list_entry (peer->ksnp_tx_queue.next,
                                 ksock_tx_t, tx_list);

                if (!cfs_time_aftereq(cfs_time_current(),
                                      tx->tx_deadline))
                        break;

                list_del (&tx->tx_list);
                list_add_tail (&tx->tx_list, &stale_txs);
        }

        write_unlock_bh (&ksocknal_data.ksnd_global_lock);

        ksocknal_txlist_done(peer->ksnp_ni, &stale_txs, 1);
}
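
/*
 * Illustrative sketch (not compiled): the pattern ksocknal_flush_stale_txs()
 * follows - collect deadline-expired entries onto a private list while the
 * lock is held, then complete them after dropping it so completion never
 * runs under ksnd_global_lock.  A minimal singly-linked list and the ex_*
 * names are hypothetical stand-ins for the kernel list_head machinery.
 */
#if 0
#include <stddef.h>

struct ex_tx {
        struct ex_tx *next;
        long          deadline;
};

/* Detach every tx at the head of 'queue' whose deadline has passed and
 * return them as a private chain; 'queue' keeps the still-fresh tail. */
static struct ex_tx *
ex_collect_stale(struct ex_tx **queue, long now)
{
        struct ex_tx  *stale = NULL;
        struct ex_tx **tail  = &stale;

        /* lock(queue) would be held here */
        while (*queue != NULL && (*queue)->deadline <= now) {
                struct ex_tx *tx = *queue;

                *queue = tx->next;      /* unlink from the shared queue */
                tx->next = NULL;
                *tail = tx;             /* append to the private list */
                tail = &tx->next;
        }
        /* unlock(queue); the caller completes 'stale' without the lock */
        return stale;
}
#endif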

void
ksocknal_check_peer_timeouts (int idx)
{
        struct list_head  *peers = &ksocknal_data.ksnd_peers[idx];
        struct list_head  *ptmp;
        ksock_peer_t      *peer;
        ksock_conn_t      *conn;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * connections to time out, so we just use a shared lock while we
         * take a look... */
        read_lock (&ksocknal_data.ksnd_global_lock);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
                conn = ksocknal_find_timed_out_conn (peer);

                if (conn != NULL) {
                        read_unlock (&ksocknal_data.ksnd_global_lock);

                        ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);

                        /* NB we won't find this one again, but we can't
                         * just proceed with the next peer, since we dropped
                         * ksnd_global_lock and it might be dead already! */
                        ksocknal_conn_decref(conn);
                        goto again;
                }

                /* we can't process stale txs right here because we're
                 * holding only shared lock */
                if (!list_empty (&peer->ksnp_tx_queue)) {
                        ksock_tx_t *tx = list_entry (peer->ksnp_tx_queue.next,
                                                     ksock_tx_t, tx_list);

                        if (cfs_time_aftereq(cfs_time_current(),
                                             tx->tx_deadline)) {
                                ksocknal_peer_addref(peer);
                                read_unlock (&ksocknal_data.ksnd_global_lock);

                                ksocknal_flush_stale_txs(peer);

                                ksocknal_peer_decref(peer);
                                goto again;
                        }
                }
        }

        /* print out warnings about stale ZC_REQs */
        cfs_list_for_each_entry_typed(peer, peers, ksock_peer_t, ksnp_list) {
                ksock_tx_t *tx;
                int         n = 0;

                cfs_list_for_each_entry_typed(tx, &peer->ksnp_zc_req_list,
                                              ksock_tx_t, tx_zc_list) {
                        if (!cfs_time_aftereq(cfs_time_current(),
                                              tx->tx_deadline))
                                break;
                        n++;
                }

                if (n != 0) {
                        tx = list_entry (peer->ksnp_zc_req_list.next,
                                         ksock_tx_t, tx_zc_list);
                        CWARN("Stale ZC_REQs for peer %s detected: %d; the "
                              "oldest (%p) timed out %ld secs ago\n",
                              libcfs_nid2str(peer->ksnp_id.nid), n, tx,
                              cfs_duration_sec(cfs_time_current() -
                                               tx->tx_deadline));
                }
        }

        read_unlock (&ksocknal_data.ksnd_global_lock);
}
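
/*
 * Illustrative sketch (not compiled): the rescan pattern used by
 * ksocknal_check_peer_timeouts() above.  Any action (closing a conn,
 * flushing stale txs) needs the shared lock dropped, after which the list
 * may have changed, so the scan simply restarts from the head.  This stays
 * cheap because timeouts are expected to be rare.  All ex_* helpers are
 * hypothetical stand-ins for the locking and per-peer checks.
 */
#if 0
struct ex_peer {
        struct ex_peer *next;
};

extern void ex_read_lock(void);
extern void ex_read_unlock(void);
extern int  ex_peer_needs_action(struct ex_peer *p);
extern void ex_do_action(struct ex_peer *p);    /* must run unlocked */

static void
ex_scan_peers(struct ex_peer *head)
{
        struct ex_peer *p;
 again:
        ex_read_lock();
        for (p = head; p != NULL; p = p->next) {
                if (ex_peer_needs_action(p)) {
                        ex_read_unlock();
                        ex_do_action(p);
                        goto again;     /* list may have changed; rescan */
                }
        }
        ex_read_unlock();
}
#endif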

int
ksocknal_reaper (void *arg)
{
        cfs_waitlink_t     wait;
        ksock_conn_t      *conn;
        ksock_sched_t     *sched;
        struct list_head   enomem_conns;
        int                nenomem_conns;
        cfs_duration_t     timeout;
        int                i;
        int                peer_index = 0;
        cfs_time_t         deadline = cfs_time_current();

        cfs_daemonize ("socknal_reaper");
        cfs_block_allsigs ();

        CFS_INIT_LIST_HEAD(&enomem_conns);
        cfs_waitlink_init (&wait);

        spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);

        while (!ksocknal_data.ksnd_shuttingdown) {

                if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
                        conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
                                           ksock_conn_t, ksnc_list);
                        list_del (&conn->ksnc_list);

                        spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);

                        ksocknal_terminate_conn (conn);
                        ksocknal_conn_decref(conn);

                        spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
                        continue;
                }

                if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
                        conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
                                           ksock_conn_t, ksnc_list);
                        list_del (&conn->ksnc_list);

                        spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);

                        ksocknal_destroy_conn (conn);

                        spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
                        continue;
                }

                if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
                        list_add(&enomem_conns,
                                 &ksocknal_data.ksnd_enomem_conns);
                        list_del_init(&ksocknal_data.ksnd_enomem_conns);
                }

                spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);

                /* reschedule all the connections that stalled with ENOMEM... */
                nenomem_conns = 0;
                while (!list_empty (&enomem_conns)) {
                        conn = list_entry (enomem_conns.next,
                                           ksock_conn_t, ksnc_tx_list);
                        list_del (&conn->ksnc_tx_list);

                        sched = conn->ksnc_scheduler;

                        spin_lock_bh (&sched->kss_lock);

                        LASSERT (conn->ksnc_tx_scheduled);
                        conn->ksnc_tx_ready = 1;
                        list_add_tail(&conn->ksnc_tx_list,
                                      &sched->kss_tx_conns);
                        cfs_waitq_signal (&sched->kss_waitq);

                        spin_unlock_bh (&sched->kss_lock);
                        nenomem_conns++;
                }

                /* careful with the jiffy wrap... */
                while ((timeout = cfs_time_sub(deadline,
                                               cfs_time_current())) <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = ksocknal_data.ksnd_peer_hash_size;

                        /* Time to check for timeouts on a few more peers: I do
                         * checks every 'p' seconds on a proportion of the peer
                         * table and I need to check every connection 'n' times
                         * within a timeout interval, to ensure I detect a
                         * timeout on any connection within (n+1)/n times the
                         * timeout interval. */

                        if (*ksocknal_tunables.ksnd_timeout > n * p)
                                chunk = (chunk * n * p) /
                                        *ksocknal_tunables.ksnd_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                ksocknal_check_peer_timeouts (peer_index);
                                peer_index = (peer_index + 1) %
                                             ksocknal_data.ksnd_peer_hash_size;
                        }

                        deadline = cfs_time_add(deadline, cfs_time_seconds(p));
                }

                if (nenomem_conns != 0) {
                        /* Reduce my timeout if I rescheduled ENOMEM conns.
                         * This also prevents me getting woken immediately
                         * if any go back on my enomem list. */
                        timeout = SOCKNAL_ENOMEM_RETRY;
                }
                ksocknal_data.ksnd_reaper_waketime =
                        cfs_time_add(cfs_time_current(), timeout);

                set_current_state (TASK_INTERRUPTIBLE);
                cfs_waitq_add (&ksocknal_data.ksnd_reaper_waitq, &wait);

                if (!ksocknal_data.ksnd_shuttingdown &&
                    list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
                    list_empty (&ksocknal_data.ksnd_zombie_conns))
                        cfs_waitq_timedwait (&wait, CFS_TASK_INTERRUPTIBLE,
                                             timeout);

                set_current_state (TASK_RUNNING);
                cfs_waitq_del (&ksocknal_data.ksnd_reaper_waitq, &wait);

                spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
        }

        spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);

        ksocknal_thread_fini ();
        return (0);
}
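
/*
 * Illustrative sketch (not compiled): the peer-table chunking arithmetic in
 * ksocknal_reaper() above.  With n = 4 checks per timeout interval and a
 * pass every p = 1 second, each pass must cover hash_size * n * p / timeout
 * buckets (at least one).  For example, a hash size of 251 (an example
 * value only) and a 50s timeout give 251 * 4 / 50 = 20 buckets per second,
 * so every bucket is visited roughly four times per timeout interval.
 * The ex_* name is hypothetical.
 */
#if 0
static int
ex_peer_check_chunk(int hash_size, int timeout, int n, int p)
{
        int chunk = hash_size;

        if (timeout > n * p)
                chunk = (chunk * n * p) / timeout;
        if (chunk == 0)
                chunk = 1;              /* always make some progress */
        return chunk;
}
#endif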