/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *   Author: Zach Brown <zab@zabbo.net>
 *   Author: Peter J. Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
ksocknal_alloc_tx (int size)
        ksock_tx_t *tx = NULL;

        if (size == KSOCK_NOOP_TX_SIZE) {
                /* searching for a noop tx in free list */
                spin_lock(&ksocknal_data.ksnd_tx_lock);

                if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
                        tx = list_entry(ksocknal_data.ksnd_idle_noop_txs.next,
                                        ksock_tx_t, tx_list);
                        LASSERT(tx->tx_desc_size == size);
                        list_del(&tx->tx_list);
                }

                spin_unlock(&ksocknal_data.ksnd_tx_lock);
        }

        LIBCFS_ALLOC(tx, size);

        atomic_set(&tx->tx_refcount, 1);
        tx->tx_desc_size = size;
        atomic_inc(&ksocknal_data.ksnd_nactive_txs);
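/* NB all NOOP txs (zero-copy ACK carriers) share one fixed descriptor
 * size, so instead of being freed they are parked on ksnd_idle_noop_txs
 * and recycled above; the common zc-ack path therefore rarely touches
 * the allocator at all (see ksocknal_free_tx() below). */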
ksocknal_free_tx (ksock_tx_t *tx)
        atomic_dec(&ksocknal_data.ksnd_nactive_txs);

        if (tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) {
                spin_lock(&ksocknal_data.ksnd_tx_lock);

                list_add(&tx->tx_list, &ksocknal_data.ksnd_idle_noop_txs);

                spin_unlock(&ksocknal_data.ksnd_tx_lock);
        } else {
                LIBCFS_FREE(tx, tx->tx_desc_size);
        }
ksocknal_init_msg(ksock_msg_t *msg, int type)
        msg->ksm_type = type;
        msg->ksm_csum = 0;
        msg->ksm_zc_req_cookie = 0;
        msg->ksm_zc_ack_cookie = 0;
ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
        struct iovec *iov = tx->tx_iov;

        LASSERT (tx->tx_niov > 0);

        /* Never touch tx->tx_iov inside ksocknal_lib_send_iov() */
        rc = ksocknal_lib_send_iov(conn, tx);

        if (rc <= 0) /* sent nothing? */

        LASSERT (nob <= tx->tx_resid);

        LASSERT (tx->tx_niov > 0);

        if (nob < iov->iov_len) {
                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                iov->iov_len -= nob;
                return rc;
        }
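/* Partial-send bookkeeping, by way of example: with iov_len == 4096 and
 * the stack accepting nob == 1000, iov_base advances by 1000 and iov_len
 * shrinks to 3096 so only the remainder is resent; once nob covers the
 * whole fragment, tx_iov/tx_niov advance to the next one instead. */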
ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
        lnet_kiov_t *kiov = tx->tx_kiov;

        LASSERT (tx->tx_niov == 0);
        LASSERT (tx->tx_nkiov > 0);

        /* Never touch tx->tx_kiov inside ksocknal_lib_send_kiov() */
        rc = ksocknal_lib_send_kiov(conn, tx);

        if (rc <= 0) /* sent nothing? */

        LASSERT (nob <= tx->tx_resid);

        LASSERT (tx->tx_nkiov > 0);

        if (nob < kiov->kiov_len) {
                kiov->kiov_offset += nob;
                kiov->kiov_len -= nob;
                return rc;
        }

        nob -= kiov->kiov_len;
        tx->tx_kiov = ++kiov;
ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)

        if (ksocknal_data.ksnd_stall_tx != 0) {
                cfs_pause(cfs_time_seconds(ksocknal_data.ksnd_stall_tx));
        }

        LASSERT (tx->tx_resid != 0);

        rc = ksocknal_connsock_addref(conn);
        if (rc != 0) {
                LASSERT (conn->ksnc_closing);
                return (-ESHUTDOWN);
        }

        do {
                if (ksocknal_data.ksnd_enomem_tx > 0) {
                        ksocknal_data.ksnd_enomem_tx--;
                        rc = -EAGAIN;
                } else if (tx->tx_niov != 0) {
                        rc = ksocknal_send_iov (conn, tx);
                } else {
                        rc = ksocknal_send_kiov (conn, tx);
                }

                bufnob = SOCK_WMEM_QUEUED(conn->ksnc_sock);
                if (rc > 0)                     /* sent something? */
                        conn->ksnc_tx_bufnob += rc; /* account it */

                if (bufnob < conn->ksnc_tx_bufnob) {
                        /* allocated send buffer bytes < computed; infer
                         * something got ACKed */
                        conn->ksnc_tx_deadline =
                                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
                        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
                        conn->ksnc_tx_bufnob = bufnob;
                }
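                /* Worked example of the inference above: bytes are only ever
                 * added to ksnc_tx_bufnob here, so if the last snapshot was
                 * 64K and SOCK_WMEM_QUEUED() now reports 16K, the kernel must
                 * have released 48K of send buffer, i.e. the peer ACKed it;
                 * that both proves the peer is alive and justifies pushing
                 * ksnc_tx_deadline back. */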
                if (rc <= 0) { /* Didn't write anything? */
                        ksock_sched_t *sched;

                        if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
                                rc = -EAGAIN;

                        /* Check if EAGAIN is due to memory pressure */

                        sched = conn->ksnc_scheduler;
                        spin_lock_bh (&sched->kss_lock);

                        if (!SOCK_TEST_NOSPACE(conn->ksnc_sock) &&
                            !conn->ksnc_tx_ready) {
                                /* SOCK_NOSPACE is set when the socket fills
                                 * and cleared in the write_space callback
                                 * (which also sets ksnc_tx_ready).  If
                                 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
                                 * zero, I didn't fill the socket and
                                 * write_space won't reschedule me, so I
                                 * return -ENOMEM to get my caller to retry */
                                rc = -ENOMEM;
                        }

                        spin_unlock_bh (&sched->kss_lock);
                        break;
                }
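                /* The -EAGAIN/-ENOMEM distinction matters to the caller:
                 * -EAGAIN means the socket filled and write_space() will
                 * reschedule this conn when space appears, while -ENOMEM
                 * means no callback is coming, so ksocknal_process_transmit()
                 * parks the conn on ksnd_enomem_conns for the reaper to
                 * repost after a timeout. */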
                /* socket's wmem_queued now includes 'rc' bytes */
                atomic_sub (rc, &conn->ksnc_tx_nob);

        } while (tx->tx_resid != 0);

        ksocknal_connsock_decref(conn);
ksocknal_recv_iov (ksock_conn_t *conn)
        struct iovec *iov = conn->ksnc_rx_iov;

        LASSERT (conn->ksnc_rx_niov > 0);

        /* Never touch conn->ksnc_rx_iov or change connection
         * status inside ksocknal_lib_recv_iov */
        rc = ksocknal_lib_recv_iov(conn);

        /* received something... */

        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
        conn->ksnc_rx_deadline =
                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
        mb ();                          /* order with setting rx_started */
        conn->ksnc_rx_started = 1;

        conn->ksnc_rx_nob_wanted -= nob;
        conn->ksnc_rx_nob_left -= nob;

        LASSERT (conn->ksnc_rx_niov > 0);

        if (nob < iov->iov_len) {
                iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
                iov->iov_len -= nob;
                return (-EAGAIN);
        }

        conn->ksnc_rx_iov = ++iov;
        conn->ksnc_rx_niov--;
ksocknal_recv_kiov (ksock_conn_t *conn)
        lnet_kiov_t *kiov = conn->ksnc_rx_kiov;

        LASSERT (conn->ksnc_rx_nkiov > 0);

        /* Never touch conn->ksnc_rx_kiov or change connection
         * status inside ksocknal_lib_recv_kiov */
        rc = ksocknal_lib_recv_kiov(conn);

        /* received something... */

        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
        conn->ksnc_rx_deadline =
                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
        mb ();                          /* order with setting rx_started */
        conn->ksnc_rx_started = 1;

        conn->ksnc_rx_nob_wanted -= nob;
        conn->ksnc_rx_nob_left -= nob;

        LASSERT (conn->ksnc_rx_nkiov > 0);

        if (nob < kiov->kiov_len) {
                kiov->kiov_offset += nob;
                kiov->kiov_len -= nob;
                return (-EAGAIN);
        }

        nob -= kiov->kiov_len;
        conn->ksnc_rx_kiov = ++kiov;
        conn->ksnc_rx_nkiov--;
ksocknal_receive (ksock_conn_t *conn)
        /* Return 1 on success, 0 on EOF, < 0 on error.
         * Caller checks ksnc_rx_nob_wanted to determine
         * progress/completion. */

        if (ksocknal_data.ksnd_stall_rx != 0) {
                cfs_pause(cfs_time_seconds (ksocknal_data.ksnd_stall_rx));
        }

        rc = ksocknal_connsock_addref(conn);
        if (rc != 0) {
                LASSERT (conn->ksnc_closing);
                return (-ESHUTDOWN);
        }

        if (conn->ksnc_rx_niov != 0)
                rc = ksocknal_recv_iov (conn);
        else
                rc = ksocknal_recv_kiov (conn);

        if (rc <= 0) {
                /* error/EOF or partial receive */
                if (rc == -EAGAIN) {
                        rc = 1;
                } else if (rc == 0 && conn->ksnc_rx_started) {
                        /* EOF in the middle of a message */
                        rc = -EPROTO;
                }
        }

        /* Completed a fragment */

        if (conn->ksnc_rx_nob_wanted == 0) {
                rc = 1;
        }

        ksocknal_connsock_decref(conn);
        return (rc);
ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx)
        lnet_msg_t *lnetmsg = tx->tx_lnetmsg;
        int         rc = (tx->tx_resid == 0) ? 0 : -EIO;

        LASSERT(ni != NULL || tx->tx_conn != NULL);

        /* fetch ni before dropping the tx's conn ref */
        if (ni == NULL && tx->tx_conn != NULL)
                ni = tx->tx_conn->ksnc_peer->ksnp_ni;

        if (tx->tx_conn != NULL)
                ksocknal_conn_decref(tx->tx_conn);

        ksocknal_free_tx (tx);
        if (lnetmsg != NULL) /* KSOCK_MSG_NOOPs go without lnetmsg */
                lnet_finalize (ni, lnetmsg, rc);
ksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int error)
        while (!list_empty (txlist)) {
                tx = list_entry (txlist->next, ksock_tx_t, tx_list);

                if (error && tx->tx_lnetmsg != NULL) {
                        CDEBUG (D_NETERROR, "Deleting packet type %d len %d %s->%s\n",
                                le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type),
                                le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length),
                                libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)),
                                libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.dest_nid)));
                } else if (error) {
                        CDEBUG (D_NETERROR, "Deleting noop packet\n");
                }

                list_del (&tx->tx_list);

                LASSERT (atomic_read(&tx->tx_refcount) == 1);
                ksocknal_tx_done (ni, tx);
        }
ksocknal_check_zc_req(ksock_tx_t *tx)
        ksock_conn_t *conn = tx->tx_conn;
        ksock_peer_t *peer = conn->ksnc_peer;
        lnet_kiov_t  *kiov = tx->tx_kiov;
        int           nkiov = tx->tx_nkiov;

        /* Set tx_msg.ksm_zc_req_cookie to a unique non-zero cookie and add tx
         * to ksnp_zc_req_list if some fragment of this message should be sent
         * zero-copy.  Our peer will send an ACK containing this cookie when
         * she has received this message to tell us we can signal completion.
         * tx_msg.ksm_zc_req_cookie remains non-zero while tx is on
         * ksnp_zc_req_list. */

        if (conn->ksnc_proto != &ksocknal_protocol_v2x ||
            !conn->ksnc_zc_capable)
                return;

        if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag)

        /* assign cookie and queue tx to pending list, it will be released when
         * a matching ack is received. See ksocknal_handle_zc_ack() */

        ksocknal_tx_addref(tx);

        spin_lock(&peer->ksnp_lock);

        LASSERT (tx->tx_msg.ksm_zc_req_cookie == 0);
        tx->tx_msg.ksm_zc_req_cookie = peer->ksnp_zc_next_cookie++;
        list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list);

        spin_unlock(&peer->ksnp_lock);
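        /* Cookie round trip, end to end: the sender assigns
         * ksnp_zc_next_cookie above and parks the tx (with an extra ref) on
         * ksnp_zc_req_list; the sink echoes the cookie back in
         * ksm_zc_ack_cookie, piggybacked on a queued message or in a
         * dedicated NOOP tx (ksocknal_handle_zc_req()); finally
         * ksocknal_handle_zc_ack() matches the cookie on ksnp_zc_req_list
         * and drops that extra ref. */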
ksocknal_unzc_req(ksock_tx_t *tx)
        ksock_peer_t *peer = tx->tx_conn->ksnc_peer;

        spin_lock(&peer->ksnp_lock);

        if (tx->tx_msg.ksm_zc_req_cookie == 0) {
                /* Not waiting for an ACK */
                spin_unlock(&peer->ksnp_lock);
                return;
        }

        tx->tx_msg.ksm_zc_req_cookie = 0;
        list_del(&tx->tx_zc_list);

        spin_unlock(&peer->ksnp_lock);

        ksocknal_tx_decref(tx);
ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)

        if (!tx->tx_checked_zc) {
                tx->tx_checked_zc = 1;
                ksocknal_check_zc_req(tx);
        }

        rc = ksocknal_transmit (conn, tx);

        CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);

        if (tx->tx_resid == 0) {
                /* Sent everything OK */

        counter++;   /* exponential backoff warnings */
        if ((counter & (-counter)) == counter)
                CWARN("%u ENOMEM tx %p (%u allocated)\n",
                      counter, conn, atomic_read(&libcfs_kmemory));
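        /* (counter & (-counter)) isolates the lowest set bit, so the test
         * above holds exactly when counter is a power of two: the warning
         * fires on the 1st, 2nd, 4th, 8th, ... ENOMEM rather than on every
         * one.  A standalone illustration (not part of the original code):
         */
#if 0
        {
                unsigned int n;

                for (n = 1; n <= 16; n++)
                        if ((n & (-n)) == n)
                                CWARN("would warn at count %u\n", n);
                /* prints for n = 1, 2, 4, 8, 16 */
        }
#endif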
        /* Queue on ksnd_enomem_conns for retry after a timeout */
        spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);

        /* enomem list takes over scheduler's ref... */
        LASSERT (conn->ksnc_tx_scheduled);
        list_add_tail(&conn->ksnc_tx_list,
                      &ksocknal_data.ksnd_enomem_conns);
        if (!cfs_time_aftereq(cfs_time_add(cfs_time_current(),
                                           SOCKNAL_ENOMEM_RETRY),
                              ksocknal_data.ksnd_reaper_waketime))
                cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);

        spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);

        if (!conn->ksnc_closing) {
                switch (rc) {
                case -ECONNRESET:
                        LCONSOLE_WARN("Host %u.%u.%u.%u reset our connection "
                                      "while we were sending data; it may have "
                                      "rebooted.\n",
                                      HIPQUAD(conn->ksnc_ipaddr));
                        break;
                default:
                        LCONSOLE_WARN("There was an unexpected network error "
                                      "while writing to %u.%u.%u.%u: %d.\n",
                                      HIPQUAD(conn->ksnc_ipaddr), rc);
                        break;
                }
        }

        CDEBUG(D_NET, "[%p] Error %d on write to %s"
               " ip %d.%d.%d.%d:%d\n", conn, rc,
               libcfs_id2str(conn->ksnc_peer->ksnp_id),
               HIPQUAD(conn->ksnc_ipaddr),

        ksocknal_unzc_req(tx);

        /* it's not an error if conn is being closed */
        ksocknal_close_conn_and_siblings (conn,
                                          (conn->ksnc_closing) ? 0 : rc);
ksocknal_launch_connection_locked (ksock_route_t *route)

        /* called holding write lock on ksnd_global_lock */

        LASSERT (!route->ksnr_scheduled);
        LASSERT (!route->ksnr_connecting);
        LASSERT ((ksocknal_route_mask() & ~route->ksnr_connected) != 0);

        route->ksnr_scheduled = 1;              /* scheduling conn for connd */
        ksocknal_route_addref(route);           /* extra ref for connd */

        spin_lock_bh (&ksocknal_data.ksnd_connd_lock);

        list_add_tail (&route->ksnr_connd_list,
                       &ksocknal_data.ksnd_connd_routes);
        cfs_waitq_signal (&ksocknal_data.ksnd_connd_waitq);

        spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
ksocknal_find_conn_locked (int payload_nob, ksock_peer_t *peer)
        struct list_head *tmp;
        ksock_conn_t     *typed = NULL;
        int               tnob  = 0;
        ksock_conn_t     *fallback = NULL;
        int               fnob     = 0;
        ksock_conn_t     *conn;

        list_for_each (tmp, &peer->ksnp_conns) {
                ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
#if SOCKNAL_ROUND_ROBIN
                int           nob = 0;
#else
                int           nob = atomic_read(&c->ksnc_tx_nob) +
                                    SOCK_WMEM_QUEUED(c->ksnc_sock);
#endif
                LASSERT (!c->ksnc_closing);
                LASSERT (c->ksnc_proto != NULL);

                if (fallback == NULL || nob < fnob) {
                        fallback = c;
                        fnob     = nob;
                }

                if (!*ksocknal_tunables.ksnd_typed_conns)
                        continue;

                if (payload_nob == 0) {
                        hdr_nob = offsetof(ksock_msg_t, ksm_u);
                } else {
                        hdr_nob = (c->ksnc_proto == &ksocknal_protocol_v2x) ?
                                  offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) :
                                  sizeof(lnet_hdr_t);
                }

                switch (c->ksnc_type) {
                default:
                        CERROR("ksnc_type bad: %u\n", c->ksnc_type);
                        LBUG();
                case SOCKLND_CONN_ANY:
                        break;
                case SOCKLND_CONN_BULK_IN:
                        continue;
                case SOCKLND_CONN_BULK_OUT:
                        if ((hdr_nob + payload_nob) < *ksocknal_tunables.ksnd_min_bulk)
                                continue;
                        break;
                case SOCKLND_CONN_CONTROL:
                        if ((hdr_nob + payload_nob) >= *ksocknal_tunables.ksnd_min_bulk)
                                continue;
                        break;
                }

                if (typed == NULL || nob < tnob) {
                        typed = c;
                        tnob  = nob;
                }
        }

        /* prefer the typed selection */
        conn = (typed != NULL) ? typed : fallback;

#if SOCKNAL_ROUND_ROBIN
        if (conn != NULL) {
                /* round-robin all else being equal */
                list_del (&conn->ksnc_list);
                list_add_tail (&conn->ksnc_list, &peer->ksnp_conns);
        }
#endif
ksocknal_next_mono_tx(ksock_conn_t *conn)
        ksock_tx_t *tx = conn->ksnc_tx_mono;

        /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */
        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x);
        LASSERT(!list_empty(&conn->ksnc_tx_queue));

        if (tx->tx_list.next == &conn->ksnc_tx_queue) {
                /* no more packets queued */
                conn->ksnc_tx_mono = NULL;
        } else {
                conn->ksnc_tx_mono = list_entry(tx->tx_list.next, ksock_tx_t, tx_list);
                LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type == tx->tx_msg.ksm_type);
        }
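/* ksnc_tx_mono tracks the oldest V2.x tx still waiting on ksnc_tx_queue
 * (i.e. not yet handed to the socket), which is therefore still safe to
 * modify: ksocknal_piggyback_zcack() below writes its ack cookie into
 * that message.  Advancing simply walks tx_list in queue order, going
 * NULL at the tail. */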
ksocknal_piggyback_zcack(ksock_conn_t *conn, __u64 cookie)
        ksock_tx_t *tx = conn->ksnc_tx_mono;

        /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */

        if (tx == NULL)
                return 0;

        if (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP) {
                /* tx is noop zc-ack, can't piggyback zc-ack cookie */
                return 0;
        }

        LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_LNET);
        LASSERT(tx->tx_msg.ksm_zc_ack_cookie == 0);

        /* piggyback the zc-ack cookie */
        tx->tx_msg.ksm_zc_ack_cookie = cookie;
        ksocknal_next_mono_tx(conn);

        return 1;
ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
        ksock_sched_t *sched = conn->ksnc_scheduler;
        ksock_msg_t   *msg = &tx->tx_msg;
        ksock_tx_t    *ztx = NULL;
        int            bufnob = 0;

        /* called holding global lock (read or irq-write) and caller may
         * not have dropped this lock between finding conn and calling me,
         * so we don't need the {get,put}connsock dance to deref
         * ksnc_sock... */
        LASSERT(!conn->ksnc_closing);

        CDEBUG (D_NET, "Sending to %s ip %d.%d.%d.%d:%d\n",
                libcfs_id2str(conn->ksnc_peer->ksnp_id),
                HIPQUAD(conn->ksnc_ipaddr),
                conn->ksnc_port);

        tx->tx_checked_zc = 0;
        conn->ksnc_proto->pro_pack(tx);

        /* Ensure the frags we've been given EXACTLY match the number of
         * bytes we want to send.  Many TCP/IP stacks disregard any total
         * size parameters passed to them and just look at the frags.
         *
         * We always expect at least 1 mapped fragment containing the
         * complete ksocknal message header. */
        LASSERT (lnet_iov_nob (tx->tx_niov, tx->tx_iov) +
                 lnet_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
        LASSERT (tx->tx_niov >= 1);
        LASSERT (tx->tx_resid == tx->tx_nob);

        CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n",
                tx, (tx->tx_lnetmsg != NULL) ? tx->tx_lnetmsg->msg_hdr.type :
                                               KSOCK_MSG_NOOP,
                tx->tx_nob, tx->tx_niov, tx->tx_nkiov);

        atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);

        ksocknal_conn_addref(conn); /* +1 ref for tx */

        /*
         * NB Darwin: SOCK_WMEM_QUEUED()->sock_getsockopt() will take
         * a blockable lock(socket lock), so SOCK_WMEM_QUEUED can't be
         * called holding BH lock */
        bufnob = SOCK_WMEM_QUEUED(conn->ksnc_sock);

        spin_lock_bh (&sched->kss_lock);

        if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) {
                /* First packet starts the timeout */
                conn->ksnc_tx_deadline =
                        cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
                conn->ksnc_tx_bufnob = 0;
                mb (); /* order with adding to tx_queue */
        }

        if (msg->ksm_type == KSOCK_MSG_NOOP) {
                /* The packet is noop ZC ACK, try to piggyback the ack_cookie
                 * on a normal packet so I don't need to send it */
                LASSERT(msg->ksm_zc_req_cookie == 0);
                LASSERT(msg->ksm_zc_ack_cookie != 0);

                if (conn->ksnc_tx_mono != NULL) {
                        if (ksocknal_piggyback_zcack(conn, msg->ksm_zc_ack_cookie)) {
                                /* zc-ack cookie is piggybacked */
                                atomic_sub (tx->tx_nob, &conn->ksnc_tx_nob);
                                ztx = tx;       /* Put to freelist later */
                        } else {
                                /* no packet can piggyback zc-ack cookie */
                                list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                        }
                } else {
                        /* It's the first mono-packet */
                        conn->ksnc_tx_mono = tx;
                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                }
        } else {
                /* It's a normal packet - can it piggyback a noop zc-ack that
                 * has been queued already? */
                LASSERT(msg->ksm_zc_ack_cookie == 0);

                if (conn->ksnc_proto == &ksocknal_protocol_v2x &&  /* V2.x packet */
                    conn->ksnc_tx_mono != NULL) {
                        if (conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_NOOP) {
                                /* There is a noop zc-ack that can be piggybacked */
                                ztx = conn->ksnc_tx_mono;

                                msg->ksm_zc_ack_cookie = ztx->tx_msg.ksm_zc_ack_cookie;
                                ksocknal_next_mono_tx(conn);

                                /* use tx to replace the noop zc-ack packet, ztx will
                                 * be put to freelist later */
                                list_add(&tx->tx_list, &ztx->tx_list);
                                list_del(&ztx->tx_list);

                                atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob);
                        } else {
                                /* no noop zc-ack packet, just enqueue it */
                                LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_LNET);
                                list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                        }
                } else if (conn->ksnc_proto == &ksocknal_protocol_v2x) {
                        /* it's the first mono-packet, enqueue it */
                        conn->ksnc_tx_mono = tx;
                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                } else {
                        /* V1.x packet, just enqueue it */
                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                }
        }

        if (ztx != NULL)
                list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs);

        if (conn->ksnc_tx_ready &&      /* able to send */
            !conn->ksnc_tx_scheduled) { /* not scheduled to send */
                /* +1 ref for scheduler */
                ksocknal_conn_addref(conn);
                list_add_tail (&conn->ksnc_tx_list,
                               &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                cfs_waitq_signal (&sched->kss_waitq);
        }

        spin_unlock_bh (&sched->kss_lock);
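        /* Summary of the piggyback cases handled above:
         *   NOOP tx, mono != NULL, cookie piggybacked  -> tx goes to freelist (ztx)
         *   NOOP tx, mono != NULL, mono is also a NOOP -> plain enqueue
         *   NOOP tx, mono == NULL                      -> tx becomes ksnc_tx_mono
         *   LNET tx (V2.x), mono is a queued NOOP      -> steal its ack cookie and
         *                                                 replace it (old NOOP -> ztx)
         *   anything else (incl. all V1.x)             -> plain enqueue */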
ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
        struct list_head *tmp;
        ksock_route_t    *route;

        list_for_each (tmp, &peer->ksnp_routes) {
                route = list_entry (tmp, ksock_route_t, ksnr_list);

                LASSERT (!route->ksnr_connecting || route->ksnr_scheduled);

                if (route->ksnr_scheduled)      /* connections being established */
                        continue;

                /* all route types connected ? */
                if ((ksocknal_route_mask() & ~route->ksnr_connected) == 0)
                        continue;

                /* too soon to retry this guy? */
                if (!(route->ksnr_retry_interval == 0 || /* first attempt */
                      cfs_time_aftereq (cfs_time_current(),
                                        route->ksnr_timeout)))
                        continue;

                return (route);
        }

        return (NULL);
ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
        struct list_head *tmp;
        ksock_route_t    *route;

        list_for_each (tmp, &peer->ksnp_routes) {
                route = list_entry (tmp, ksock_route_t, ksnr_list);

                LASSERT (!route->ksnr_connecting || route->ksnr_scheduled);

                if (route->ksnr_scheduled)
                        return (route);
        }

        return (NULL);
ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id)
        ksock_route_t    *route;

        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lnetmsg != NULL);

        g_lock = &ksocknal_data.ksnd_global_lock;

        for (retry = 0;; retry = 1) {
#if !SOCKNAL_ROUND_ROBIN
                read_lock (g_lock);
                peer = ksocknal_find_peer_locked(ni, id);
                if (peer != NULL) {
                        if (ksocknal_find_connectable_route_locked(peer) == NULL) {
                                conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer);
                                if (conn != NULL) {
                                        /* I've got no routes that need to be
                                         * connecting and I do have an actual
                                         * connection... */
                                        ksocknal_queue_tx_locked (tx, conn);
                                        read_unlock (g_lock);
                                        return (0);
                                }
                        }
                }

                /* I'll need a write lock... */
                read_unlock (g_lock);
#endif
                write_lock_bh (g_lock);

                peer = ksocknal_find_peer_locked(ni, id);
                if (peer != NULL)
                        break;

                write_unlock_bh (g_lock);

                if ((id.pid & LNET_PID_USERFLAG) != 0) {
                        CERROR("Refusing to create a connection to "
                               "userspace process %s\n", libcfs_id2str(id));
                        return -EHOSTUNREACH;
                }

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_id2str(id));
                        return -EHOSTUNREACH;
                }

                rc = ksocknal_add_peer(ni, id,
                                       LNET_NIDADDR(id.nid),
                                       lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_id2str(id), rc);
                        return rc;
                }
        }

        /* launch any/all connections that need it */
        route = ksocknal_find_connectable_route_locked (peer);
        if (route != NULL)
                ksocknal_launch_connection_locked (route);

        conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                ksocknal_queue_tx_locked (tx, conn);
                write_unlock_bh (g_lock);
                return (0);
        }

        if (peer->ksnp_accepting > 0 ||
            ksocknal_find_connecting_route_locked (peer) != NULL) {
                /* Queue the message until a connection is established */
                list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
                write_unlock_bh (g_lock);
                return (0);
        }

        write_unlock_bh (g_lock);

        /* NB Routes may be ignored if connections to them failed recently */
        CDEBUG(D_NETERROR, "No usable routes to %s\n", libcfs_id2str(id));
        return (-EHOSTUNREACH);
ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;

        /* NB 'private' is different depending on what we're sending.
         * Just ignore it... */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
        LASSERT (!in_interrupt ());

        if (payload_iov != NULL)
                desc_size = offsetof(ksock_tx_t,
                                     tx_frags.virt.iov[1 + payload_niov]);
        else
                desc_size = offsetof(ksock_tx_t,
                                     tx_frags.paged.kiov[payload_niov]);

        tx = ksocknal_alloc_tx(desc_size);
        if (tx == NULL) {
                CERROR("Can't allocate tx desc type %d size %d\n",
                       type, desc_size);
                return (-ENOMEM);
        }

        tx->tx_conn = NULL;             /* set when assigned a conn */
        tx->tx_lnetmsg = lntmsg;

        if (payload_iov != NULL) {
                tx->tx_iov = tx->tx_frags.virt.iov;
                tx->tx_niov = 1 +
                              lnet_extract_iov(payload_niov, &tx->tx_iov[1],
                                               payload_niov, payload_iov,
                                               payload_offset, payload_nob);
        } else {
                tx->tx_iov = &tx->tx_frags.paged.iov;
                tx->tx_kiov = tx->tx_frags.paged.kiov;
                tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov,
                                                 payload_niov, payload_kiov,
                                                 payload_offset, payload_nob);
        }

        ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);

        /* The first fragment will be set later in pro_pack */
        rc = ksocknal_launch_packet(ni, tx, target);
        if (rc == 0)
                return (0);

        ksocknal_free_tx(tx);
        return (-EIO);
ksocknal_thread_start (int (*fn)(void *arg), void *arg)
        long pid = cfs_kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        write_lock_bh (&ksocknal_data.ksnd_global_lock);
        ksocknal_data.ksnd_nthreads++;
        write_unlock_bh (&ksocknal_data.ksnd_global_lock);
        return (0);

ksocknal_thread_fini (void)
        write_lock_bh (&ksocknal_data.ksnd_global_lock);
        ksocknal_data.ksnd_nthreads--;
        write_unlock_bh (&ksocknal_data.ksnd_global_lock);
ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
        static char ksocknal_slop_buffer[4096];

        LASSERT(conn->ksnc_proto != NULL);

        if ((*ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0) {
                /* Remind the socket to ack eagerly... */
                ksocknal_lib_eager_ack(conn);
        }

        if (nob_to_skip == 0) {         /* right at next packet boundary now */
                conn->ksnc_rx_started = 0;
                mb ();                  /* racing with timeout thread */

                switch (conn->ksnc_proto->pro_version) {
                case KSOCK_PROTO_V2:
                        conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER;
                        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                        conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg;

                        conn->ksnc_rx_nob_wanted = offsetof(ksock_msg_t, ksm_u);
                        conn->ksnc_rx_nob_left = offsetof(ksock_msg_t, ksm_u);
                        conn->ksnc_rx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u);
                        break;

                case KSOCK_PROTO_V1:
                        /* Receiving bare lnet_hdr_t */
                        conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
                        conn->ksnc_rx_nob_wanted = sizeof(lnet_hdr_t);
                        conn->ksnc_rx_nob_left = sizeof(lnet_hdr_t);

                        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                        conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg;
                        conn->ksnc_rx_iov[0].iov_len = sizeof (lnet_hdr_t);
                        break;
                }

                conn->ksnc_rx_niov = 1;

                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;
                conn->ksnc_rx_csum = ~0;
                return (1);
        }

        /* Set up to skip as much as possible now.  If there's more left
         * (ran out of iov entries) we'll get called again */

        conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
        conn->ksnc_rx_nob_left = nob_to_skip;
        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
        skipped = 0;
        niov = 0;

        do {
                nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));

                conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
                conn->ksnc_rx_iov[niov].iov_len  = nob;
                niov++;
                skipped += nob;
                nob_to_skip -= nob;
        } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
                 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));

        conn->ksnc_rx_niov = niov;
        conn->ksnc_rx_kiov = NULL;
        conn->ksnc_rx_nkiov = 0;
        conn->ksnc_rx_nob_wanted = skipped;
        return (0);
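        /* Example: skipping 10000 bytes of slop builds rx iovs of 4096 +
         * 4096 + 1808 bytes, every one pointing at the same static slop
         * buffer (the data is thrown away, so overlapping reads are
         * harmless); if nob_to_skip ever exceeds what ksnc_rx_iov_space can
         * describe, the SOCKNAL_RX_SLOP state just brings us back here for
         * another round. */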
/* (Sink) handle incoming ZC request from sender */
ksocknal_handle_zc_req(ksock_peer_t *peer, __u64 cookie)
        ksock_sched_t *sched;

        read_lock (&ksocknal_data.ksnd_global_lock);

        conn = ksocknal_find_conn_locked (0, peer);
        if (conn == NULL) {
                read_unlock (&ksocknal_data.ksnd_global_lock);
                CERROR("Can't find connection to send zcack.\n");
                return -ECONNRESET;
        }

        sched = conn->ksnc_scheduler;

        spin_lock_bh (&sched->kss_lock);
        rc = ksocknal_piggyback_zcack(conn, cookie);
        spin_unlock_bh (&sched->kss_lock);

        read_unlock (&ksocknal_data.ksnd_global_lock);

        /* Ack cookie is piggybacked */
        if (rc != 0)
                return 0;

        tx = ksocknal_alloc_tx(KSOCK_NOOP_TX_SIZE);
        if (tx == NULL) {
                CERROR("Can't allocate noop tx desc\n");
                return -ENOMEM;
        }

        tx->tx_conn = NULL;
        tx->tx_lnetmsg = NULL;

        tx->tx_iov = tx->tx_frags.virt.iov;
        tx->tx_niov = 1;

        ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
        tx->tx_msg.ksm_zc_ack_cookie = cookie; /* incoming cookie */

        read_lock (&ksocknal_data.ksnd_global_lock);

        conn = ksocknal_find_conn_locked (0, peer);
        if (conn == NULL) {
                read_unlock (&ksocknal_data.ksnd_global_lock);
                ksocknal_free_tx(tx);
                CERROR("Can't find connection to send zcack.\n");
                return -ECONNRESET;
        }

        ksocknal_queue_tx_locked(tx, conn);

        read_unlock (&ksocknal_data.ksnd_global_lock);

        return 0;
/* (Sender) handle ZC_ACK from sink */
ksocknal_handle_zc_ack(ksock_peer_t *peer, __u64 cookie)
        struct list_head *ctmp;

        spin_lock(&peer->ksnp_lock);

        list_for_each(ctmp, &peer->ksnp_zc_req_list) {
                tx = list_entry (ctmp, ksock_tx_t, tx_zc_list);
                if (tx->tx_msg.ksm_zc_req_cookie != cookie)
                        continue;

                tx->tx_msg.ksm_zc_req_cookie = 0;
                list_del(&tx->tx_zc_list);

                spin_unlock(&peer->ksnp_lock);

                ksocknal_tx_decref(tx);
                return 0;
        }

        spin_unlock(&peer->ksnp_lock);

        return -EPROTO;
ksocknal_process_receive (ksock_conn_t *conn)

        LASSERT (atomic_read(&conn->ksnc_conn_refcount) > 0);

        /* NB: sched lock NOT held */
        /* SOCKNAL_RX_LNET_HEADER is here for backward compatibility */
        LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER ||
                 conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD ||
                 conn->ksnc_rx_state == SOCKNAL_RX_LNET_HEADER ||
                 conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
 again:
        if (conn->ksnc_rx_nob_wanted != 0) {
                rc = ksocknal_receive(conn);

                if (rc <= 0) {
                        LASSERT (rc != -EAGAIN);

                        if (rc == 0)
                                CDEBUG (D_NET, "[%p] EOF from %s"
                                        " ip %d.%d.%d.%d:%d\n", conn,
                                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                        HIPQUAD(conn->ksnc_ipaddr),
                                        conn->ksnc_port);
                        else if (!conn->ksnc_closing)
                                CERROR ("[%p] Error %d on read from %s"
                                        " ip %d.%d.%d.%d:%d\n",
                                        conn, rc,
                                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                        HIPQUAD(conn->ksnc_ipaddr),
                                        conn->ksnc_port);

                        /* it's not an error if conn is being closed */
                        ksocknal_close_conn_and_siblings (conn,
                                                          (conn->ksnc_closing) ? 0 : rc);
                        return (rc == 0 ? -ESHUTDOWN : rc);
                }

                if (conn->ksnc_rx_nob_wanted != 0) {
                        /* short read */
                        return (-EAGAIN);
                }
        }

        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_KSM_HEADER:
                if (conn->ksnc_flip) {
                        __swab32s(&conn->ksnc_msg.ksm_type);
                        __swab32s(&conn->ksnc_msg.ksm_csum);
                        __swab64s(&conn->ksnc_msg.ksm_zc_req_cookie);
                        __swab64s(&conn->ksnc_msg.ksm_zc_ack_cookie);
                }

                if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP &&
                    conn->ksnc_msg.ksm_csum != 0 &&     /* has checksum */
                    conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
                        /* NOOP Checksum error */
                        CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings(conn, -EPROTO);
                        return (-EIO);
                }

                if (conn->ksnc_msg.ksm_zc_ack_cookie != 0) {
                        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x);

                        rc = ksocknal_handle_zc_ack(conn->ksnc_peer,
                                                    conn->ksnc_msg.ksm_zc_ack_cookie);
                        if (rc != 0) {
                                CERROR("%s: Unknown zero copy ACK cookie: "LPU64"\n",
                                       libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                       conn->ksnc_msg.ksm_zc_ack_cookie);
                                ksocknal_new_packet(conn, 0);
                                ksocknal_close_conn_and_siblings(conn, -EPROTO);
                                return (rc);
                        }
                }

                if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP) {
                        ksocknal_new_packet (conn, 0);
                        return 0;       /* NOOP is done; just return */
                }

                LASSERT (conn->ksnc_msg.ksm_type == KSOCK_MSG_LNET);

                conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
                conn->ksnc_rx_nob_wanted = sizeof(ksock_lnet_msg_t);
                conn->ksnc_rx_nob_left = sizeof(ksock_lnet_msg_t);

                conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg;
                conn->ksnc_rx_iov[0].iov_len = sizeof(ksock_lnet_msg_t);

                conn->ksnc_rx_niov = 1;
                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;

                goto again;     /* read lnet header now */

        case SOCKNAL_RX_LNET_HEADER:
                /* unpack message header */
                conn->ksnc_proto->pro_unpack(&conn->ksnc_msg);

                if ((conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) != 0) {
                        /* Userspace peer */
                        lnet_process_id_t *id = &conn->ksnc_peer->ksnp_id;
                        lnet_hdr_t        *lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->ksnc_rx_state = SOCKNAL_RX_PARSE;
                ksocknal_conn_addref(conn);     /* ++ref while parsing */

                rc = lnet_parse(conn->ksnc_peer->ksnp_ni,
                                &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->ksnc_peer->ksnp_id.nid, conn, 0);
                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings (conn, rc);
                        ksocknal_conn_decref(conn);
                        return (-EPROTO);
                }

                /* I'm racing with ksocknal_recv() */
                LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_PARSE ||
                         conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD);

                if (conn->ksnc_rx_state != SOCKNAL_RX_LNET_PAYLOAD)
                        return 0;

                /* ksocknal_recv() got called */
                goto again;

        case SOCKNAL_RX_LNET_PAYLOAD:
                /* payload all received */
                rc = 0;

                if (conn->ksnc_rx_nob_left == 0 &&      /* not truncating */
                    conn->ksnc_msg.ksm_csum != 0 &&     /* has checksum */
                    conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
                        CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
                        rc = -EIO;
                }

                lnet_finalize(conn->ksnc_peer->ksnp_ni, conn->ksnc_cookie, rc);

                if (rc == 0 && conn->ksnc_msg.ksm_zc_req_cookie != 0) {
                        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x);
                        rc = ksocknal_handle_zc_req(conn->ksnc_peer,
                                                    conn->ksnc_msg.ksm_zc_req_cookie);
                }

                if (rc != 0) {
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings (conn, rc);
                        return (-EPROTO);
                }

                /* Fall through */

        case SOCKNAL_RX_SLOP:
                /* starting new packet? */
                if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
                        return 0;       /* come back later */
                goto again;             /* try to finish reading slop now */
        }

        /* Not Reached */
        LBUG ();
        return (-EINVAL);               /* keep gcc happy */
ksocknal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
               unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
               unsigned int offset, unsigned int mlen, unsigned int rlen)
        ksock_conn_t  *conn = (ksock_conn_t *)private;
        ksock_sched_t *sched = conn->ksnc_scheduler;

        LASSERT (mlen <= rlen);
        LASSERT (niov <= LNET_MAX_IOV);

        conn->ksnc_cookie = msg;
        conn->ksnc_rx_nob_wanted = mlen;
        conn->ksnc_rx_nob_left = rlen;

        if (mlen == 0 || iov != NULL) {
                conn->ksnc_rx_nkiov = 0;
                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
                conn->ksnc_rx_niov =
                        lnet_extract_iov(LNET_MAX_IOV, conn->ksnc_rx_iov,
                                         niov, iov, offset, mlen);
        } else {
                conn->ksnc_rx_niov = 0;
                conn->ksnc_rx_iov = NULL;
                conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
                conn->ksnc_rx_nkiov =
                        lnet_extract_kiov(LNET_MAX_IOV, conn->ksnc_rx_kiov,
                                          niov, kiov, offset, mlen);
        }

        LASSERT (mlen ==
                 lnet_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
                 lnet_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));

        LASSERT (conn->ksnc_rx_scheduled);

        spin_lock_bh (&sched->kss_lock);

        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_PARSE_WAIT:
                list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns);
                cfs_waitq_signal (&sched->kss_waitq);
                LASSERT (conn->ksnc_rx_ready);
                break;

        case SOCKNAL_RX_PARSE:
                /* scheduler hasn't noticed I'm parsing yet */
                break;
        }

        conn->ksnc_rx_state = SOCKNAL_RX_LNET_PAYLOAD;

        spin_unlock_bh (&sched->kss_lock);
        ksocknal_conn_decref(conn);
        return (0);
ksocknal_sched_cansleep(ksock_sched_t *sched)

        spin_lock_bh (&sched->kss_lock);

        rc = (!ksocknal_data.ksnd_shuttingdown &&
              list_empty(&sched->kss_rx_conns) &&
              list_empty(&sched->kss_tx_conns));

        spin_unlock_bh (&sched->kss_lock);
        return (rc);
int ksocknal_scheduler (void *arg)
        ksock_sched_t     *sched = (ksock_sched_t *)arg;
        ksock_conn_t      *conn;
        ksock_tx_t        *tx;
        int                rc;
        int                nloops = 0;
        char               name[16];
        int                id = sched - ksocknal_data.ksnd_schedulers;

        snprintf (name, sizeof (name), "socknal_sd%02d", id);
        cfs_daemonize (name);
        cfs_block_allsigs ();

#if defined(CONFIG_SMP) && defined(CPU_AFFINITY)
        id = ksocknal_sched2cpu(id);
        if (cpu_online(id)) {
                cpumask_t m = CPU_MASK_NONE;
                cpu_set(id, m);
                set_cpus_allowed(current, m);
        } else {
                CERROR ("Can't set CPU affinity for %s to %d\n", name, id);
        }
#endif /* CONFIG_SMP && CPU_AFFINITY */

        spin_lock_bh (&sched->kss_lock);

        while (!ksocknal_data.ksnd_shuttingdown) {
                int did_something = 0;

                /* Ensure I progress everything semi-fairly */

                if (!list_empty (&sched->kss_rx_conns)) {
                        conn = list_entry(sched->kss_rx_conns.next,
                                          ksock_conn_t, ksnc_rx_list);
                        list_del(&conn->ksnc_rx_list);

                        LASSERT(conn->ksnc_rx_scheduled);
                        LASSERT(conn->ksnc_rx_ready);

                        /* clear rx_ready in case receive isn't complete.
                         * Do it BEFORE we call process_recv, since
                         * data_ready can set it any time after we release
                         * kss_lock. */
                        conn->ksnc_rx_ready = 0;
                        spin_unlock_bh (&sched->kss_lock);

                        rc = ksocknal_process_receive(conn);

                        spin_lock_bh (&sched->kss_lock);

                        /* I'm the only one that can clear this flag */
                        LASSERT(conn->ksnc_rx_scheduled);

                        /* Did process_receive get everything it wanted? */
                        if (rc == 0)
                                conn->ksnc_rx_ready = 1;

                        if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) {
                                /* Conn blocked waiting for ksocknal_recv()
                                 * I change its state (under lock) to signal
                                 * it can be rescheduled */
                                conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT;
                        } else if (conn->ksnc_rx_ready) {
                                /* reschedule for rx */
                                list_add_tail (&conn->ksnc_rx_list,
                                               &sched->kss_rx_conns);
                        } else {
                                conn->ksnc_rx_scheduled = 0;
                                /* drop my ref */
                                ksocknal_conn_decref(conn);
                        }

                        did_something = 1;
                }

                if (!list_empty (&sched->kss_tx_conns)) {
                        CFS_LIST_HEAD  (zlist);

                        if (!list_empty(&sched->kss_zombie_noop_txs)) {
                                list_add(&zlist, &sched->kss_zombie_noop_txs);
                                list_del_init(&sched->kss_zombie_noop_txs);
                        }

                        conn = list_entry(sched->kss_tx_conns.next,
                                          ksock_conn_t, ksnc_tx_list);
                        list_del (&conn->ksnc_tx_list);

                        LASSERT(conn->ksnc_tx_scheduled);
                        LASSERT(conn->ksnc_tx_ready);
                        LASSERT(!list_empty(&conn->ksnc_tx_queue));

                        tx = list_entry(conn->ksnc_tx_queue.next,
                                        ksock_tx_t, tx_list);

                        if (conn->ksnc_tx_mono == tx)
                                ksocknal_next_mono_tx(conn);

                        /* dequeue now so empty list => more to send */
                        list_del(&tx->tx_list);

                        /* Clear tx_ready in case send isn't complete.  Do
                         * it BEFORE we call process_transmit, since
                         * write_space can set it any time after we release
                         * kss_lock. */
                        conn->ksnc_tx_ready = 0;
                        spin_unlock_bh (&sched->kss_lock);

                        if (!list_empty(&zlist)) {
                                /* free zombie noop txs, it's fast because
                                 * noop txs are just put in freelist */
                                ksocknal_txlist_done(NULL, &zlist, 0);
                        }

                        rc = ksocknal_process_transmit(conn, tx);

                        if (rc == -ENOMEM || rc == -EAGAIN) {
                                /* Incomplete send: replace tx on HEAD of tx_queue */
                                spin_lock_bh (&sched->kss_lock);
                                list_add (&tx->tx_list, &conn->ksnc_tx_queue);
                        } else {
                                /* Complete send; tx -ref */
                                ksocknal_tx_decref (tx);

                                spin_lock_bh (&sched->kss_lock);
                                /* assume space for more */
                                conn->ksnc_tx_ready = 1;
                        }

                        if (rc == -ENOMEM) {
                                /* Do nothing; after a short timeout, this
                                 * conn will be reposted on kss_tx_conns. */
                        } else if (conn->ksnc_tx_ready &&
                                   !list_empty (&conn->ksnc_tx_queue)) {
                                /* reschedule for tx */
                                list_add_tail (&conn->ksnc_tx_list,
                                               &sched->kss_tx_conns);
                        } else {
                                conn->ksnc_tx_scheduled = 0;
                                /* drop my ref */
                                ksocknal_conn_decref(conn);
                        }

                        did_something = 1;
                }

                if (!did_something ||           /* nothing to do */
                    ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
                        spin_unlock_bh (&sched->kss_lock);

                        nloops = 0;

                        if (!did_something) {   /* wait for something to do */
                                rc = wait_event_interruptible_exclusive(
                                        sched->kss_waitq,
                                        !ksocknal_sched_cansleep(sched));
                        }

                        spin_lock_bh (&sched->kss_lock);
                }
        }

        spin_unlock_bh (&sched->kss_lock);
        ksocknal_thread_fini ();
        return (0);
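/* NB scheduler fairness: each pass of the loop above services at most one
 * rx conn and one tx conn before re-checking, and after SOCKNAL_RESCHED
 * consecutive passes kss_lock is dropped and re-taken, so the
 * data_ready/write_space callbacks (which also need kss_lock) are never
 * starved for long. */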
/*
 * Add connection to kss_rx_conns of scheduler
 * and wakeup the scheduler.
 */
void ksocknal_read_callback (ksock_conn_t *conn)
        ksock_sched_t *sched;

        sched = conn->ksnc_scheduler;

        spin_lock_bh (&sched->kss_lock);

        conn->ksnc_rx_ready = 1;

        if (!conn->ksnc_rx_scheduled) { /* not being progressed */
                list_add_tail(&conn->ksnc_rx_list,
                              &sched->kss_rx_conns);
                conn->ksnc_rx_scheduled = 1;
                /* extra ref for scheduler */
                ksocknal_conn_addref(conn);

                cfs_waitq_signal (&sched->kss_waitq);
        }

        spin_unlock_bh (&sched->kss_lock);
/*
 * Add connection to kss_tx_conns of scheduler
 * and wakeup the scheduler.
 */
void ksocknal_write_callback (ksock_conn_t *conn)
        ksock_sched_t *sched;

        sched = conn->ksnc_scheduler;

        spin_lock_bh (&sched->kss_lock);

        conn->ksnc_tx_ready = 1;

        if (!conn->ksnc_tx_scheduled && /* not being progressed */
            !list_empty(&conn->ksnc_tx_queue)) { /* packets to send */
                list_add_tail (&conn->ksnc_tx_list,
                               &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                /* extra ref for scheduler */
                ksocknal_conn_addref(conn);

                cfs_waitq_signal (&sched->kss_waitq);
        }

        spin_unlock_bh (&sched->kss_lock);
ksocknal_parse_proto_version (ksock_hello_msg_t *hello)
        if ((hello->kshm_magic == LNET_PROTO_MAGIC &&
             hello->kshm_version == KSOCK_PROTO_V2) ||
            (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC) &&
             hello->kshm_version == __swab32(KSOCK_PROTO_V2))) {
#if SOCKNAL_VERSION_DEBUG
                if (*ksocknal_tunables.ksnd_protocol != 2)
                        return NULL;
#endif
                return &ksocknal_protocol_v2x;
        }

        if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
                lnet_magicversion_t *hmv = (lnet_magicversion_t *)hello;

                CLASSERT (sizeof (lnet_magicversion_t) ==
                          offsetof (ksock_hello_msg_t, kshm_src_nid));

                if (hmv->version_major == cpu_to_le16 (KSOCK_PROTO_V1_MAJOR) &&
                    hmv->version_minor == cpu_to_le16 (KSOCK_PROTO_V1_MINOR))
                        return &ksocknal_protocol_v1x;
        }

        return NULL;
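/* The V2.x magic/version pair is accepted both as-is and byte-swapped
 * (__swab32()), so a peer of opposite endianness is still recognised;
 * ksocknal_recv_hello_v2() records the mismatch in ksnc_flip and swabs
 * every field it subsequently reads.  The V1.x magic always travels
 * little-endian, hence the le32_to_cpu() comparison. */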
ksocknal_send_hello_v1 (ksock_conn_t *conn, ksock_hello_msg_t *hello)
        cfs_socket_t        *sock = conn->ksnc_sock;
        lnet_hdr_t          *hdr;
        lnet_magicversion_t *hmv;
        int                  rc;
        int                  i;

        CLASSERT(sizeof(lnet_magicversion_t) == offsetof(lnet_hdr_t, src_nid));

        LIBCFS_ALLOC(hdr, sizeof(*hdr));
        if (hdr == NULL) {
                CERROR("Can't allocate lnet_hdr_t\n");
                return -ENOMEM;
        }

        hmv = (lnet_magicversion_t *)&hdr->dest_nid;

        /* Re-organize V2.x message header to V1.x (lnet_hdr_t)
         * header and send out */
        hmv->magic         = cpu_to_le32 (LNET_PROTO_TCP_MAGIC);
        hmv->version_major = cpu_to_le16 (KSOCK_PROTO_V1_MAJOR);
        hmv->version_minor = cpu_to_le16 (KSOCK_PROTO_V1_MINOR);

        if (the_lnet.ln_testprotocompat != 0) {
                /* single-shot proto check */

                if ((the_lnet.ln_testprotocompat & 1) != 0) {
                        hmv->version_major++;   /* just different! */
                        the_lnet.ln_testprotocompat &= ~1;
                }
                if ((the_lnet.ln_testprotocompat & 2) != 0) {
                        hmv->magic = LNET_PROTO_MAGIC;
                        the_lnet.ln_testprotocompat &= ~2;
                }
        }

        hdr->src_nid        = cpu_to_le64 (hello->kshm_src_nid);
        hdr->src_pid        = cpu_to_le32 (hello->kshm_src_pid);
        hdr->type           = cpu_to_le32 (LNET_MSG_HELLO);
        hdr->payload_length = cpu_to_le32 (hello->kshm_nips * sizeof(__u32));
        hdr->msg.hello.type = cpu_to_le32 (hello->kshm_ctype);
        hdr->msg.hello.incarnation = cpu_to_le64 (hello->kshm_src_incarnation);

        rc = libcfs_sock_write(sock, hdr, sizeof(*hdr), lnet_acceptor_timeout());
        if (rc != 0) {
                CDEBUG (D_NETERROR, "Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
                        rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
                goto out;
        }

        if (hello->kshm_nips == 0)
                goto out;

        for (i = 0; i < hello->kshm_nips; i++) {
                hello->kshm_ips[i] = __cpu_to_le32 (hello->kshm_ips[i]);
        }

        rc = libcfs_sock_write(sock, hello->kshm_ips,
                               hello->kshm_nips * sizeof(__u32),
                               lnet_acceptor_timeout());
        if (rc != 0)
                CDEBUG (D_NETERROR, "Error %d sending HELLO payload (%d)"
                        " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips,
                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);

out:
        LIBCFS_FREE(hdr, sizeof(*hdr));

        return rc;
ksocknal_send_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello)
        cfs_socket_t *sock = conn->ksnc_sock;
        int           rc;

        hello->kshm_magic   = LNET_PROTO_MAGIC;
        hello->kshm_version = KSOCK_PROTO_V2;

        if (the_lnet.ln_testprotocompat != 0) {
                /* single-shot proto check */

                if ((the_lnet.ln_testprotocompat & 1) != 0) {
                        hello->kshm_version++;  /* just different! */
                        the_lnet.ln_testprotocompat &= ~1;
                }
        }

        rc = libcfs_sock_write(sock, hello, offsetof(ksock_hello_msg_t, kshm_ips),
                               lnet_acceptor_timeout());
        if (rc != 0) {
                CDEBUG (D_NETERROR, "Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
                        rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
                return rc;
        }

        if (hello->kshm_nips == 0)
                return 0;

        rc = libcfs_sock_write(sock, hello->kshm_ips,
                               hello->kshm_nips * sizeof(__u32),
                               lnet_acceptor_timeout());
        if (rc != 0)
                CDEBUG (D_NETERROR, "Error %d sending HELLO payload (%d)"
                        " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips,
                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);

        return rc;
ksocknal_recv_hello_v1(ksock_conn_t *conn, ksock_hello_msg_t *hello, int timeout)
        cfs_socket_t *sock = conn->ksnc_sock;
        lnet_hdr_t   *hdr;
        int           rc;
        int           i;

        LIBCFS_ALLOC(hdr, sizeof(*hdr));
        if (hdr == NULL) {
                CERROR("Can't allocate lnet_hdr_t\n");
                return -ENOMEM;
        }

        rc = libcfs_sock_read(sock, &hdr->src_nid,
                              sizeof (*hdr) - offsetof (lnet_hdr_t, src_nid),
                              timeout);
        if (rc != 0) {
                CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n",
                        rc, HIPQUAD(conn->ksnc_ipaddr));
                LASSERT (rc < 0 && rc != -EALREADY);
                goto out;
        }

        /* ...and check we got what we expected */
        if (hdr->type != cpu_to_le32 (LNET_MSG_HELLO)) {
                CERROR ("Expecting a HELLO hdr,"
                        " but got type %d from %u.%u.%u.%u\n",
                        le32_to_cpu (hdr->type),
                        HIPQUAD(conn->ksnc_ipaddr));
                rc = -EPROTO;
                goto out;
        }

        hello->kshm_src_nid         = le64_to_cpu (hdr->src_nid);
        hello->kshm_src_pid         = le32_to_cpu (hdr->src_pid);
        hello->kshm_src_incarnation = le64_to_cpu (hdr->msg.hello.incarnation);
        hello->kshm_ctype           = le32_to_cpu (hdr->msg.hello.type);
        hello->kshm_nips            = le32_to_cpu (hdr->payload_length) /
                                      sizeof (__u32);

        if (hello->kshm_nips > LNET_MAX_INTERFACES) {
                CERROR("Bad nips %d from ip %u.%u.%u.%u\n",
                       hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr));
                rc = -EPROTO;
                goto out;
        }

        if (hello->kshm_nips == 0)
                goto out;

        rc = libcfs_sock_read(sock, hello->kshm_ips,
                              hello->kshm_nips * sizeof(__u32), timeout);
        if (rc != 0) {
                CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n",
                        rc, HIPQUAD(conn->ksnc_ipaddr));
                LASSERT (rc < 0 && rc != -EALREADY);
                goto out;
        }

        for (i = 0; i < hello->kshm_nips; i++) {
                hello->kshm_ips[i] = __le32_to_cpu(hello->kshm_ips[i]);

                if (hello->kshm_ips[i] == 0) {
                        CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n",
                               i, HIPQUAD(conn->ksnc_ipaddr));
                        rc = -EPROTO;
                        goto out;
                }
        }
out:
        LIBCFS_FREE(hdr, sizeof(*hdr));

        return rc;
ksocknal_recv_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello, int timeout)
        cfs_socket_t *sock = conn->ksnc_sock;
        int           rc;
        int           i;

        if (hello->kshm_magic == LNET_PROTO_MAGIC)
                conn->ksnc_flip = 0;
        else
                conn->ksnc_flip = 1;

        rc = libcfs_sock_read(sock, &hello->kshm_src_nid,
                              offsetof(ksock_hello_msg_t, kshm_ips) -
                              offsetof(ksock_hello_msg_t, kshm_src_nid),
                              timeout);
        if (rc != 0) {
                CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
                        rc, HIPQUAD(conn->ksnc_ipaddr));
                LASSERT (rc < 0 && rc != -EALREADY);
                return rc;
        }

        if (conn->ksnc_flip) {
                __swab32s(&hello->kshm_src_pid);
                __swab64s(&hello->kshm_src_nid);
                __swab32s(&hello->kshm_dst_pid);
                __swab64s(&hello->kshm_dst_nid);
                __swab64s(&hello->kshm_src_incarnation);
                __swab64s(&hello->kshm_dst_incarnation);
                __swab32s(&hello->kshm_ctype);
                __swab32s(&hello->kshm_nips);
        }

        if (hello->kshm_nips > LNET_MAX_INTERFACES) {
                CERROR("Bad nips %d from ip %u.%u.%u.%u\n",
                       hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr));
                return -EPROTO;
        }

        if (hello->kshm_nips == 0)
                return 0;

        rc = libcfs_sock_read(sock, hello->kshm_ips,
                              hello->kshm_nips * sizeof(__u32), timeout);
        if (rc != 0) {
                CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n",
                        rc, HIPQUAD(conn->ksnc_ipaddr));
                LASSERT (rc < 0 && rc != -EALREADY);
                return rc;
        }

        for (i = 0; i < hello->kshm_nips; i++) {
                if (conn->ksnc_flip)
                        __swab32s(&hello->kshm_ips[i]);

                if (hello->kshm_ips[i] == 0) {
                        CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n",
                               i, HIPQUAD(conn->ksnc_ipaddr));
                        return -EPROTO;
                }
        }

        return 0;
ksocknal_pack_msg_v1(ksock_tx_t *tx)
        /* V1.x has no KSOCK_MSG_NOOP */
        LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
        LASSERT(tx->tx_lnetmsg != NULL);

        tx->tx_iov[0].iov_base = (void *)&tx->tx_lnetmsg->msg_hdr;
        tx->tx_iov[0].iov_len  = sizeof(lnet_hdr_t);

        tx->tx_resid = tx->tx_nob = tx->tx_lnetmsg->msg_len + sizeof(lnet_hdr_t);

ksocknal_pack_msg_v2(ksock_tx_t *tx)
        tx->tx_iov[0].iov_base = (void *)&tx->tx_msg;

        if (tx->tx_lnetmsg != NULL) {
                LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);

                tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = tx->tx_lnetmsg->msg_hdr;
                tx->tx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload);
                tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
                                            tx->tx_lnetmsg->msg_len;
        } else {
                LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_NOOP);

                tx->tx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
                tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
        }
        /* Don't checksum before sending starts, since the packet may still
         * be piggybacked with a zc-ack cookie */

ksocknal_unpack_msg_v1(ksock_msg_t *msg)
        msg->ksm_type = KSOCK_MSG_LNET;
        msg->ksm_csum = 0;
        msg->ksm_zc_req_cookie = 0;
        msg->ksm_zc_ack_cookie = 0;

ksocknal_unpack_msg_v2(ksock_msg_t *msg)
        return; /* Do nothing */
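/* Wire-format summary: V1.x sends a bare lnet_hdr_t (no socklnd
 * envelope, and no NOOP messages at all), while V2.x prefixes every
 * message with the ksock_msg_t header packed above.
 * ksocknal_unpack_msg_v1() therefore synthesises KSOCK_MSG_LNET and
 * zeroed cookies so the receive path can treat both protocols alike. */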
ksock_proto_t  ksocknal_protocol_v1x =
{
        KSOCK_PROTO_V1,
        ksocknal_send_hello_v1,
        ksocknal_recv_hello_v1,
        ksocknal_pack_msg_v1,
        ksocknal_unpack_msg_v1
};

ksock_proto_t  ksocknal_protocol_v2x =
{
        KSOCK_PROTO_V2,
        ksocknal_send_hello_v2,
        ksocknal_recv_hello_v2,
        ksocknal_pack_msg_v2,
        ksocknal_unpack_msg_v2
};
ksocknal_send_hello (lnet_ni_t *ni, ksock_conn_t *conn,
                     lnet_nid_t peer_nid, ksock_hello_msg_t *hello)
        /* CAVEAT EMPTOR: this byte flips 'ipaddrs' */
        ksock_net_t *net = (ksock_net_t *)ni->ni_data;
        lnet_nid_t   srcnid;

        LASSERT (0 <= hello->kshm_nips && hello->kshm_nips <= LNET_MAX_INTERFACES);

        /* No need for getconnsock/putconnsock */
        LASSERT (!conn->ksnc_closing);
        LASSERT (conn->ksnc_proto != NULL);

        srcnid = lnet_ptlcompat_srcnid(ni->ni_nid, peer_nid);

        hello->kshm_src_nid = srcnid;
        hello->kshm_dst_nid = peer_nid;
        hello->kshm_src_pid = the_lnet.ln_pid;

        hello->kshm_src_incarnation = net->ksnn_incarnation;
        hello->kshm_ctype = conn->ksnc_type;

        return conn->ksnc_proto->pro_send_hello(conn, hello);
ksocknal_invert_type(int type)
        switch (type)
        {
        case SOCKLND_CONN_ANY:
        case SOCKLND_CONN_CONTROL:
                return (type);
        case SOCKLND_CONN_BULK_IN:
                return SOCKLND_CONN_BULK_OUT;
        case SOCKLND_CONN_BULK_OUT:
                return SOCKLND_CONN_BULK_IN;
        default:
                return (SOCKLND_CONN_NONE);
        }
ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn,
                     ksock_hello_msg_t *hello, lnet_process_id_t *peerid,
                     __u64 *incarnation)
        /* Return < 0        fatal error
         *        0          success
         *        EALREADY   lost connection race
         *        EPROTO     protocol version mismatch
         */
        cfs_socket_t        *sock = conn->ksnc_sock;
        int                  active = (conn->ksnc_proto != NULL);
        int                  timeout;
        int                  proto_match;
        int                  rc;
        ksock_proto_t       *proto;
        lnet_process_id_t    recv_id;

        /* socket type set on active connections - not set on passive */
        LASSERT (!active == !(conn->ksnc_type != SOCKLND_CONN_NONE));

        timeout = active ? *ksocknal_tunables.ksnd_timeout :
                           lnet_acceptor_timeout();

        rc = libcfs_sock_read(sock, &hello->kshm_magic, sizeof (hello->kshm_magic), timeout);
        if (rc != 0) {
                CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
                        rc, HIPQUAD(conn->ksnc_ipaddr));
                LASSERT (rc < 0);
                return rc;
        }

        if (hello->kshm_magic != LNET_PROTO_MAGIC &&
            hello->kshm_magic != __swab32(LNET_PROTO_MAGIC) &&
            hello->kshm_magic != le32_to_cpu (LNET_PROTO_TCP_MAGIC)) {
                /* Unexpected magic! */
                if (active ||
                    the_lnet.ln_ptlcompat == 0) {
                        CERROR ("Bad magic(1) %#08x (%#08x expected) from "
                                "%u.%u.%u.%u\n", __cpu_to_le32 (hello->kshm_magic),
                                LNET_PROTO_TCP_MAGIC,
                                HIPQUAD(conn->ksnc_ipaddr));
                        return -EPROTO;
                }

                /* When portals compatibility is set, I may be passed a new
                 * connection "blindly" by the acceptor, and I have to
                 * determine if my peer has sent an acceptor connection request
                 * or not.  This isn't a 'hello', so I'll get the acceptor to
                 * look at it... */
                rc = lnet_accept(ni, sock, hello->kshm_magic);
                if (rc != 0)
                        return rc;

                /* ...and if it's OK I'm back to looking for a 'hello'... */
                rc = libcfs_sock_read(sock, &hello->kshm_magic,
                                      sizeof (hello->kshm_magic), timeout);
                if (rc != 0) {
                        CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
                                rc, HIPQUAD(conn->ksnc_ipaddr));
                        LASSERT (rc < 0);
                        return rc;
                }

                /* Only need to check V1.x magic */
                if (hello->kshm_magic != le32_to_cpu (LNET_PROTO_TCP_MAGIC)) {
                        CERROR ("Bad magic(2) %#08x (%#08x expected) from "
                                "%u.%u.%u.%u\n", __cpu_to_le32 (hello->kshm_magic),
                                LNET_PROTO_TCP_MAGIC,
                                HIPQUAD(conn->ksnc_ipaddr));
                        return -EPROTO;
                }
        }

        rc = libcfs_sock_read(sock, &hello->kshm_version,
                              sizeof(hello->kshm_version), timeout);
        if (rc != 0) {
                CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
                        rc, HIPQUAD(conn->ksnc_ipaddr));
                LASSERT (rc < 0);
                return rc;
        }

        proto = ksocknal_parse_proto_version(hello);
        if (proto == NULL) {
                if (!active) {
                        /* unknown protocol from peer, tell peer my protocol */
                        conn->ksnc_proto = &ksocknal_protocol_v2x;
#if SOCKNAL_VERSION_DEBUG
                        if (*ksocknal_tunables.ksnd_protocol != 2)
                                conn->ksnc_proto = &ksocknal_protocol_v1x;
#endif
                        hello->kshm_nips = 0;
                        ksocknal_send_hello(ni, conn, ni->ni_nid, hello);
                }

                CERROR ("Unknown protocol version (%d.x expected)"
                        " from %u.%u.%u.%u\n",
                        conn->ksnc_proto->pro_version,
                        HIPQUAD(conn->ksnc_ipaddr));

                return -EPROTO;
        }

        proto_match = (conn->ksnc_proto == proto);
        conn->ksnc_proto = proto;

        /* receive the rest of hello message anyway */
        rc = conn->ksnc_proto->pro_recv_hello(conn, hello, timeout);
        if (rc != 0) {
                CERROR("Error %d reading or checking hello from %u.%u.%u.%u\n",
                       rc, HIPQUAD(conn->ksnc_ipaddr));
                LASSERT (rc < 0);
                return rc;
        }

        *incarnation = hello->kshm_src_incarnation;

        if (hello->kshm_src_nid == LNET_NID_ANY) {
                CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY "
                       "from %u.%u.%u.%u\n", HIPQUAD(conn->ksnc_ipaddr));
                return -EPROTO;
        }

        if (conn->ksnc_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
                /* Userspace NAL assigns peer process ID from socket */
                recv_id.pid = conn->ksnc_port | LNET_PID_USERFLAG;
                recv_id.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), conn->ksnc_ipaddr);
        } else {
                recv_id.nid = hello->kshm_src_nid;

                if (the_lnet.ln_ptlcompat > 1 && /* portals peers may exist */
                    LNET_NIDNET(recv_id.nid) == 0) /* this is one */
                        recv_id.pid = the_lnet.ln_pid; /* give it a sensible pid */
                else
                        recv_id.pid = hello->kshm_src_pid;
        }

        if (!active) {
                *peerid = recv_id;

                /* peer determines type */
                conn->ksnc_type = ksocknal_invert_type(hello->kshm_ctype);
                if (conn->ksnc_type == SOCKLND_CONN_NONE) {
                        CERROR ("Unexpected type %d from %s ip %u.%u.%u.%u\n",
                                hello->kshm_ctype, libcfs_id2str(*peerid),
                                HIPQUAD(conn->ksnc_ipaddr));
                        return -EPROTO;
                }

                return 0;
        }

        if (peerid->pid != recv_id.pid ||
            !lnet_ptlcompat_matchnid(peerid->nid, recv_id.nid)) {
                LCONSOLE_ERROR_MSG(0x130, "Connected successfully to %s on host"
                                   " %u.%u.%u.%u, but they claimed they were "
                                   "%s; please check your Lustre "
                                   "configuration.\n",
                                   libcfs_id2str(*peerid),
                                   HIPQUAD(conn->ksnc_ipaddr),
                                   libcfs_id2str(recv_id));
                return -EPROTO;
        }

        if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
                /* Possible protocol mismatch or I lost the connection race */
                return proto_match ? EALREADY : EPROTO;
        }

        if (ksocknal_invert_type(hello->kshm_ctype) != conn->ksnc_type) {
                CERROR ("Mismatched types: me %d, %s ip %u.%u.%u.%u %d\n",
                        conn->ksnc_type, libcfs_id2str(*peerid),
                        HIPQUAD(conn->ksnc_ipaddr),
                        hello->kshm_ctype);
                return -EPROTO;
        }

        return 0;
ksocknal_connect (ksock_route_t *route)
        CFS_LIST_HEAD    (zombies);
        ksock_peer_t     *peer = route->ksnr_peer;
        int               type;
        int               wanted;
        cfs_socket_t     *sock;
        cfs_time_t        deadline;
        int               retry_later = 0;
        int               rc = 0;

        deadline = cfs_time_add(cfs_time_current(),
                                cfs_time_seconds(*ksocknal_tunables.ksnd_timeout));

        write_lock_bh (&ksocknal_data.ksnd_global_lock);

        LASSERT (route->ksnr_scheduled);
        LASSERT (!route->ksnr_connecting);

        route->ksnr_connecting = 1;

        for (;;) {
                wanted = ksocknal_route_mask() & ~route->ksnr_connected;

                /* stop connecting if peer/route got closed under me, or
                 * route got connected while queued */
                if (peer->ksnp_closing || route->ksnr_deleted ||
                    wanted == 0) {
                        retry_later = 0;
                        break;
                }

                /* reschedule if peer is connecting to me */
                if (peer->ksnp_accepting > 0) {
                        CDEBUG(D_NET,
                               "peer %s(%d) already connecting to me, retry later.\n",
                               libcfs_nid2str(peer->ksnp_id.nid), peer->ksnp_accepting);
                        retry_later = 1;
                }

                if (retry_later) /* needs reschedule */
                        break;

                if ((wanted & (1 << SOCKLND_CONN_ANY)) != 0) {
                        type = SOCKLND_CONN_ANY;
                } else if ((wanted & (1 << SOCKLND_CONN_CONTROL)) != 0) {
                        type = SOCKLND_CONN_CONTROL;
                } else if ((wanted & (1 << SOCKLND_CONN_BULK_IN)) != 0) {
                        type = SOCKLND_CONN_BULK_IN;
                } else {
                        LASSERT ((wanted & (1 << SOCKLND_CONN_BULK_OUT)) != 0);
                        type = SOCKLND_CONN_BULK_OUT;
                }

                write_unlock_bh (&ksocknal_data.ksnd_global_lock);

                if (cfs_time_aftereq(cfs_time_current(), deadline)) {
                        rc = -ETIMEDOUT;
                        lnet_connect_console_error(rc, peer->ksnp_id.nid,
                                                   route->ksnr_ipaddr,
                                                   route->ksnr_port);
                        goto failed;
                }

                rc = lnet_connect(&sock, peer->ksnp_id.nid,
                                  route->ksnr_myipaddr,
                                  route->ksnr_ipaddr, route->ksnr_port);
                if (rc != 0)
                        goto failed;

                rc = ksocknal_create_conn(peer->ksnp_ni, route, sock, type);
                if (rc < 0) {
                        lnet_connect_console_error(rc, peer->ksnp_id.nid,
                                                   route->ksnr_ipaddr,
                                                   route->ksnr_port);
                        goto failed;
                }

                /* A +ve RC means I have to retry because I lost the connection
                 * race or I have to renegotiate protocol version */
                retry_later = (rc != 0);
                if (retry_later)
                        CDEBUG(D_NET, "peer %s: conn race, retry later.\n",
                               libcfs_nid2str(peer->ksnp_id.nid));

                write_lock_bh (&ksocknal_data.ksnd_global_lock);
        }

        route->ksnr_scheduled = 0;
        route->ksnr_connecting = 0;

        if (retry_later) {
                /* re-queue for attention; this frees me up to handle
                 * the peer's incoming connection request */
                ksocknal_launch_connection_locked(route);
        }

        write_unlock_bh (&ksocknal_data.ksnd_global_lock);
        return;

 failed:
        write_lock_bh (&ksocknal_data.ksnd_global_lock);

        route->ksnr_scheduled = 0;
        route->ksnr_connecting = 0;

        /* This is a retry rather than a new connection */
        route->ksnr_retry_interval *= 2;
        route->ksnr_retry_interval =
                MAX(route->ksnr_retry_interval,
                    cfs_time_seconds(*ksocknal_tunables.ksnd_min_reconnectms)/1000);
        route->ksnr_retry_interval =
                MIN(route->ksnr_retry_interval,
                    cfs_time_seconds(*ksocknal_tunables.ksnd_max_reconnectms)/1000);

        LASSERT (route->ksnr_retry_interval != 0);
        route->ksnr_timeout = cfs_time_add(cfs_time_current(),
                                           route->ksnr_retry_interval);
2442 if (!list_empty(&peer->ksnp_tx_queue) &&
2443 peer->ksnp_accepting == 0 &&
2444 ksocknal_find_connecting_route_locked(peer) == NULL) {
2445 /* ksnp_tx_queue is queued on a conn on successful
2447 LASSERT (list_empty (&peer->ksnp_conns));
2449 /* take all the blocked packets while I've got the lock and
2450 * complete below... */
2451 list_add(&zombies, &peer->ksnp_tx_queue);
2452 list_del_init(&peer->ksnp_tx_queue);
2455 #if 0 /* irrelevent with only eager routes */
2456 if (!route->ksnr_deleted) {
2457 /* make this route least-favourite for re-selection */
2458 list_del(&route->ksnr_list);
2459 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2462 write_unlock_bh (&ksocknal_data.ksnd_global_lock);
2464 ksocknal_peer_failed(peer);
2465 ksocknal_txlist_done(peer->ksnp_ni, &zombies, 1);
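/* Return true if there is a route queued for connection and handling it
 * would still leave at least one connd thread free for incoming
 * connection requests. */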
static int
ksocknal_connd_connect_route_locked(void)
{
        /* Only handle an outgoing connection request if there is someone left
         * to handle incoming connections */
        return !list_empty(&ksocknal_data.ksnd_connd_routes) &&
               ((ksocknal_data.ksnd_connd_connecting + 1) <
                *ksocknal_tunables.ksnd_nconnds);
}
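/* Wake-up predicate for connd threads: shutdown in progress, an accepted
 * connection awaiting its handshake, or an outgoing route we may handle. */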
static int
ksocknal_connd_ready(void)
{
        int            rc;

        spin_lock_bh (&ksocknal_data.ksnd_connd_lock);

        rc = ksocknal_data.ksnd_shuttingdown ||
             !list_empty(&ksocknal_data.ksnd_connd_connreqs) ||
             ksocknal_connd_connect_route_locked();

        spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);

        return rc;
}
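/* A connection daemon thread.  It alternates between finishing handshakes
 * on connections accepted by the listener and initiating connects for
 * queued routes, then sleeps on ksnd_connd_waitq until there is more to
 * do.  ksnd_connd_connecting throttles active connects so some threads
 * always remain free for incoming requests. */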
int
ksocknal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        ksock_connreq_t   *cr;
        ksock_route_t     *route;

        snprintf (name, sizeof (name), "socknal_cd%02ld", id);
        cfs_daemonize (name);
        cfs_block_allsigs ();

        spin_lock_bh (&ksocknal_data.ksnd_connd_lock);

        while (!ksocknal_data.ksnd_shuttingdown) {

                if (!list_empty(&ksocknal_data.ksnd_connd_connreqs)) {
                        /* Connection accepted by the listener */
                        cr = list_entry(ksocknal_data.ksnd_connd_connreqs.next,
                                        ksock_connreq_t, ksncr_list);

                        list_del(&cr->ksncr_list);
                        spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);

                        ksocknal_create_conn(cr->ksncr_ni, NULL,
                                             cr->ksncr_sock, SOCKLND_CONN_NONE);
                        lnet_ni_decref(cr->ksncr_ni);
                        LIBCFS_FREE(cr, sizeof(*cr));

                        spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
                }

                if (ksocknal_connd_connect_route_locked()) {
                        /* Connection request */
                        route = list_entry (ksocknal_data.ksnd_connd_routes.next,
                                            ksock_route_t, ksnr_connd_list);

                        list_del (&route->ksnr_connd_list);
                        ksocknal_data.ksnd_connd_connecting++;
                        spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);

                        ksocknal_connect (route);
                        ksocknal_route_decref(route);

                        spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
                        ksocknal_data.ksnd_connd_connecting--;
                }

                spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);

                wait_event_interruptible_exclusive(
                        ksocknal_data.ksnd_connd_waitq,
                        ksocknal_connd_ready());

                spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
        }

        spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);

        ksocknal_thread_fini ();
        return (0);
}
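/* Scan 'peer's connections for the first that has hit a fatal socket
 * error, stalled an incomplete incoming message past its deadline, or
 * made no transmit progress by its deadline.  The returned conn carries
 * a reference for the caller; NULL means all connections are healthy. */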
static ksock_conn_t *
ksocknal_find_timed_out_conn (ksock_peer_t *peer)
{
        /* We're called with a shared lock on ksnd_global_lock */
        ksock_conn_t      *conn;
        struct list_head  *ctmp;

        list_for_each (ctmp, &peer->ksnp_conns) {
                int     error;

                conn = list_entry (ctmp, ksock_conn_t, ksnc_list);

                /* Don't need the {get,put}connsock dance to deref ksnc_sock */
                LASSERT (!conn->ksnc_closing);

                /* SOCK_ERROR will reset error code of socket in
                 * some platform (like Darwin8.x) */
                error = SOCK_ERROR(conn->ksnc_sock);
                if (error != 0) {
                        ksocknal_conn_addref(conn);

                        switch (error) {
                        case ECONNRESET:
                                CDEBUG(D_NETERROR, "A connection with %s "
                                       "(%u.%u.%u.%u:%d) was reset; "
                                       "it may have rebooted.\n",
                                       libcfs_id2str(peer->ksnp_id),
                                       HIPQUAD(conn->ksnc_ipaddr),
                                       conn->ksnc_port);
                                break;
                        case ETIMEDOUT:
                                CDEBUG(D_NETERROR, "A connection with %s "
                                       "(%u.%u.%u.%u:%d) timed out; the "
                                       "network or node may be down.\n",
                                       libcfs_id2str(peer->ksnp_id),
                                       HIPQUAD(conn->ksnc_ipaddr),
                                       conn->ksnc_port);
                                break;
                        default:
                                CDEBUG(D_NETERROR, "An unexpected network error %d "
                                       "occurred with %s "
                                       "(%u.%u.%u.%u:%d)\n", error,
                                       libcfs_id2str(peer->ksnp_id),
                                       HIPQUAD(conn->ksnc_ipaddr),
                                       conn->ksnc_port);
                                break;
                        }

                        return (conn);
                }

                if (conn->ksnc_rx_started &&
                    cfs_time_aftereq(cfs_time_current(),
                                     conn->ksnc_rx_deadline)) {
                        /* Timed out incomplete incoming message */
                        ksocknal_conn_addref(conn);
                        CDEBUG(D_NETERROR, "Timeout receiving from %s "
                               "(%u.%u.%u.%u:%d), state %d wanted %d left %d\n",
                               libcfs_id2str(peer->ksnp_id),
                               HIPQUAD(conn->ksnc_ipaddr),
                               conn->ksnc_port,
                               conn->ksnc_rx_state,
                               conn->ksnc_rx_nob_wanted,
                               conn->ksnc_rx_nob_left);
                        return (conn);
                }

                if ((!list_empty(&conn->ksnc_tx_queue) ||
                     SOCK_WMEM_QUEUED(conn->ksnc_sock) != 0) &&
                    cfs_time_aftereq(cfs_time_current(),
                                     conn->ksnc_tx_deadline)) {
                        /* Timed out messages queued for sending or
                         * buffered in the socket's send buffer */
                        ksocknal_conn_addref(conn);
                        CDEBUG(D_NETERROR, "Timeout sending data to %s "
                               "(%u.%u.%u.%u:%d); the network or that "
                               "node may be down.\n",
                               libcfs_id2str(peer->ksnp_id),
                               HIPQUAD(conn->ksnc_ipaddr),
                               conn->ksnc_port);
                        return (conn);
                }
        }

        return (NULL);
}
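/* Check one bucket of the peer hash table for timed-out connections.
 * The scan restarts from the top of the bucket whenever the shared lock
 * has to be dropped to close a connection, since the list may have
 * changed in the interim. */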
void
ksocknal_check_peer_timeouts (int idx)
{
        struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
        struct list_head *ptmp;
        ksock_peer_t     *peer;
        ksock_conn_t     *conn;

 again:
        /* NB. We expect to have a look at all the peers and not find any
         * connections to time out, so we just use a shared lock while we
         * take a look... */
        read_lock (&ksocknal_data.ksnd_global_lock);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
                conn = ksocknal_find_timed_out_conn (peer);

                if (conn != NULL) {
                        read_unlock (&ksocknal_data.ksnd_global_lock);

                        ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);

                        /* NB we won't find this one again, but we can't
                         * just proceed with the next peer, since we dropped
                         * ksnd_global_lock and it might be dead already! */
                        ksocknal_conn_decref(conn);
                        goto again;
                }
        }

        read_unlock (&ksocknal_data.ksnd_global_lock);
}
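/* Reaper daemon: terminates and destroys closing connections, reschedules
 * sends that stalled on ENOMEM, and paces peer timeout checks across the
 * hash table.  Illustrative arithmetic (not from the source): with n = 4,
 * p = 1 and ksnd_timeout = 50s, each one-second tick checks
 * hash_size*4/50 buckets, so the whole table is swept every 12.5s, i.e.
 * every connection is examined 4 times per timeout interval. */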
int
ksocknal_reaper (void *arg)
{
        cfs_waitlink_t     wait;
        ksock_conn_t      *conn;
        ksock_sched_t     *sched;
        struct list_head   enomem_conns;
        int                nenomem_conns;
        cfs_duration_t     timeout;
        int                i;
        int                peer_index = 0;
        cfs_time_t         deadline = cfs_time_current();

        cfs_daemonize ("socknal_reaper");
        cfs_block_allsigs ();

        CFS_INIT_LIST_HEAD(&enomem_conns);
        cfs_waitlink_init (&wait);

        spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);

        while (!ksocknal_data.ksnd_shuttingdown) {

                if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
                        conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
                                           ksock_conn_t, ksnc_list);
                        list_del (&conn->ksnc_list);

                        spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);

                        ksocknal_terminate_conn (conn);
                        ksocknal_conn_decref(conn);

                        spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
                        continue;
                }

                if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
                        conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
                                           ksock_conn_t, ksnc_list);
                        list_del (&conn->ksnc_list);

                        spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);

                        ksocknal_destroy_conn (conn);

                        spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
                        continue;
                }

                if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
                        list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
                        list_del_init(&ksocknal_data.ksnd_enomem_conns);
                }

                spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);

                /* reschedule all the connections that stalled with ENOMEM... */
                nenomem_conns = 0;
                while (!list_empty (&enomem_conns)) {
                        conn = list_entry (enomem_conns.next,
                                           ksock_conn_t, ksnc_tx_list);
                        list_del (&conn->ksnc_tx_list);

                        sched = conn->ksnc_scheduler;

                        spin_lock_bh (&sched->kss_lock);

                        LASSERT (conn->ksnc_tx_scheduled);
                        conn->ksnc_tx_ready = 1;
                        list_add_tail(&conn->ksnc_tx_list, &sched->kss_tx_conns);
                        cfs_waitq_signal (&sched->kss_waitq);

                        spin_unlock_bh (&sched->kss_lock);
                        nenomem_conns++;
                }

                /* careful with the jiffy wrap... */
                while ((timeout = cfs_time_sub(deadline,
                                               cfs_time_current())) <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = ksocknal_data.ksnd_peer_hash_size;

                        /* Time to check for timeouts on a few more peers: I do
                         * checks every 'p' seconds on a proportion of the peer
                         * table and I need to check every connection 'n' times
                         * within a timeout interval, to ensure I detect a
                         * timeout on any connection within (n+1)/n times the
                         * timeout interval. */

                        if (*ksocknal_tunables.ksnd_timeout > n * p)
                                chunk = (chunk * n * p) /
                                        *ksocknal_tunables.ksnd_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                ksocknal_check_peer_timeouts (peer_index);
                                peer_index = (peer_index + 1) %
                                             ksocknal_data.ksnd_peer_hash_size;
                        }

                        deadline = cfs_time_add(deadline, cfs_time_seconds(p));
                }

                if (nenomem_conns != 0) {
                        /* Reduce my timeout if I rescheduled ENOMEM conns.
                         * This also prevents me getting woken immediately
                         * if any go back on my enomem list. */
                        timeout = SOCKNAL_ENOMEM_RETRY;
                }
                ksocknal_data.ksnd_reaper_waketime =
                        cfs_time_add(cfs_time_current(), timeout);

                set_current_state (TASK_INTERRUPTIBLE);
                cfs_waitq_add (&ksocknal_data.ksnd_reaper_waitq, &wait);

                if (!ksocknal_data.ksnd_shuttingdown &&
                    list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
                    list_empty (&ksocknal_data.ksnd_zombie_conns))
                        cfs_waitq_timedwait (&wait, CFS_TASK_INTERRUPTIBLE, timeout);

                set_current_state (TASK_RUNNING);
                cfs_waitq_del (&ksocknal_data.ksnd_reaper_waitq, &wait);

                spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
        }

        spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);

        ksocknal_thread_fini ();
        return (0);
}