/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/ulnds/socklnd/conn.c
 *
 * Author: Maxim Patlasov <maxim@clusterfs.com>
 */

#include "usocklnd.h"
/* Return 1 if the conn has timed out, 0 otherwise */
int
usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
{
        if (conn->uc_tx_flag && /* sending is in progress */
            cfs_time_aftereq(current_time, conn->uc_tx_deadline))
                return 1;

        if (conn->uc_rx_flag && /* receiving is in progress */
            cfs_time_aftereq(current_time, conn->uc_rx_deadline))
                return 1;

        return 0;
}

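/* Kill the conn unless it is already dead; uc_lock is taken and
 * dropped internally */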
void
usocklnd_conn_kill(usock_conn_t *conn)
{
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state != UC_DEAD)
                usocklnd_conn_kill_locked(conn);
        pthread_mutex_unlock(&conn->uc_lock);
}

/* Mark the conn as DEAD and schedule its deletion */
void
usocklnd_conn_kill_locked(usock_conn_t *conn)
{
        conn->uc_rx_flag = conn->uc_tx_flag = 0;
        conn->uc_state = UC_DEAD;
        usocklnd_add_killrequest(conn);
}

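/* Allocate a conn together with its embedded poll request and rx
 * hello buffer; returns NULL if any allocation fails */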
usock_conn_t *
usocklnd_conn_allocate()
{
        usock_conn_t        *conn;
        usock_pollrequest_t *pr;

        LIBCFS_ALLOC (pr, sizeof(*pr));
        if (pr == NULL)
                return NULL;

        LIBCFS_ALLOC (conn, sizeof(*conn));
        if (conn == NULL) {
                LIBCFS_FREE (pr, sizeof(*pr));
                return NULL;
        }
        memset(conn, 0, sizeof(*conn));
        conn->uc_preq = pr;

        LIBCFS_ALLOC (conn->uc_rx_hello,
                      offsetof(ksock_hello_msg_t,
                               kshm_ips[LNET_MAX_INTERFACES]));
        if (conn->uc_rx_hello == NULL) {
                LIBCFS_FREE (pr, sizeof(*pr));
                LIBCFS_FREE (conn, sizeof(*conn));
                return NULL;
        }

        return conn;
}

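/* Free the conn and everything allocated for it by
 * usocklnd_conn_allocate() */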
void
usocklnd_conn_free(usock_conn_t *conn)
{
        usock_pollrequest_t *pr = conn->uc_preq;

        if (pr != NULL)
                LIBCFS_FREE (pr, sizeof(*pr));

        if (conn->uc_rx_hello != NULL)
                LIBCFS_FREE (conn->uc_rx_hello,
                             offsetof(ksock_hello_msg_t,
                                      kshm_ips[LNET_MAX_INTERFACES]));

        LIBCFS_FREE (conn, sizeof(*conn));
}

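/* Unlink the conn from its peer: the rx lnet msg in flight (if any)
 * and queued txs are finalized with -EIO once the locks are dropped;
 * an errored conn marks the peer errored and kills its other conns */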
void
usocklnd_tear_peer_conn(usock_conn_t *conn)
{
        usock_peer_t     *peer = conn->uc_peer;
        int               idx = usocklnd_type2idx(conn->uc_type);
        lnet_ni_t        *ni;
        lnet_process_id_t id;
        int               decref_flag  = 0;
        int               killall_flag = 0;
        void             *rx_lnetmsg   = NULL;
        CFS_LIST_HEAD    (zombie_txs);

        if (peer == NULL) /* nothing to tear */
                return;

        pthread_mutex_lock(&peer->up_lock);
        pthread_mutex_lock(&conn->uc_lock);

        ni = peer->up_ni;
        id = peer->up_peerid;

        if (peer->up_conns[idx] == conn) {
                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
                        /* change state not to finalize twice */
                        conn->uc_rx_state = UC_RX_KSM_HEADER;
                        /* stash lnetmsg while holding locks */
                        rx_lnetmsg = conn->uc_rx_lnetmsg;
                }

                /* we cannot finalize txs right now (bug #18844) */
                cfs_list_splice_init(&conn->uc_tx_list, &zombie_txs);

                peer->up_conns[idx] = NULL;
                conn->uc_peer = NULL;
                decref_flag = 1;

                if (conn->uc_errored && !peer->up_errored)
                        peer->up_errored = killall_flag = 1;

                /* prevent queueing new txs to this conn */
                conn->uc_errored = 1;
        }

        pthread_mutex_unlock(&conn->uc_lock);

        if (killall_flag)
                usocklnd_del_conns_locked(peer);

        pthread_mutex_unlock(&peer->up_lock);

        if (!decref_flag)
                return;

        if (rx_lnetmsg != NULL)
                lnet_finalize(ni, rx_lnetmsg, -EIO);

        usocklnd_destroy_txlist(ni, &zombie_txs);

        usocklnd_conn_decref(conn);
        usocklnd_peer_decref(peer);

        if (killall_flag)
                usocklnd_check_peer_stale(ni, id);
}

/* Remove peer from hash list if all up_conns[i] are NULL &&
 * hash table is the only consumer of the peer */
void
usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
{
        usock_peer_t *peer;

        pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
        peer = usocklnd_find_peer_locked(ni, id);

        if (peer == NULL) {
                pthread_rwlock_unlock(&usock_data.ud_peers_lock);
                return;
        }

        /* refcount == 2 means: one ref for the hash table and one we
         * have just taken in usocklnd_find_peer_locked() */
        if (cfs_mt_atomic_read(&peer->up_refcount) == 2) {
                int i;

                for (i = 0; i < N_CONN_TYPES; i++)
                        LASSERT (peer->up_conns[i] == NULL);

                cfs_list_del(&peer->up_list);

                if (peer->up_errored &&
                    (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
                        lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
                                     cfs_time_seconds(peer->up_last_alive));

                /* drop the ref the hash table was holding */
                usocklnd_peer_decref(peer);
        }

        usocklnd_peer_decref(peer);
        pthread_rwlock_unlock(&usock_data.ud_peers_lock);
}

/* Returns 0 on success, <0 else */
int
usocklnd_create_passive_conn(lnet_ni_t *ni,
                             cfs_socket_t *sock, usock_conn_t **connp)
{
        int           rc;
        __u32         peer_ip;
        int           peer_port;
        usock_conn_t *conn;

        rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
        if (rc)
                return rc;

        LASSERT (peer_port >= 0); /* uc_peer_port is u16 */

        rc = usocklnd_set_sock_options(sock);
        if (rc)
                return rc;

        conn = usocklnd_conn_allocate();
        if (conn == NULL)
                return -ENOMEM;

        usocklnd_rx_hellomagic_state_transition(conn);

        conn->uc_sock = sock;
        conn->uc_peer_ip = peer_ip;
        conn->uc_peer_port = peer_port;
        conn->uc_state = UC_RECEIVING_HELLO;
        conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
        conn->uc_ni = ni;
        CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
        CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
        pthread_mutex_init(&conn->uc_lock, NULL);
        cfs_mt_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */

        *connp = conn;
        return 0;
}

/* Returns 0 on success, <0 else */
int
usocklnd_create_active_conn(usock_peer_t *peer, int type,
                            usock_conn_t **connp)
{
        int           rc;
        cfs_socket_t *sock;
        usock_conn_t *conn;
        __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
        __u16         dst_port = lnet_acceptor_port();

        conn = usocklnd_conn_allocate();
        if (conn == NULL)
                return -ENOMEM;

        conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
                                                        peer->up_peerid.nid);
        if (conn->uc_tx_hello == NULL) {
                usocklnd_conn_free(conn);
                return -ENOMEM;
        }

        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
                rc = usocklnd_connect_cli_mode(&sock, dst_ip, dst_port);
        else
                rc = usocklnd_connect_srv_mode(&sock, dst_ip, dst_port);

        if (rc) {
                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
                usocklnd_conn_free(conn);
                return rc;
        }

        conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
        conn->uc_tx_flag = 1;

        conn->uc_sock = sock;
        conn->uc_peer_ip = dst_ip;
        conn->uc_peer_port = dst_port;
        conn->uc_type = type;
        conn->uc_activeflag = 1;
        conn->uc_state = UC_CONNECTING;
        conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);

        conn->uc_peerid = peer->up_peerid;
        conn->uc_peer = peer;

        usocklnd_peer_addref(peer);
        CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
        CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
        pthread_mutex_init(&conn->uc_lock, NULL);
        cfs_mt_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */

        *connp = conn;
        return 0;
}

/* Returns 0 on success, <0 else */
int
usocklnd_connect_srv_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
{
        int           port;
        cfs_socket_t *sock;
        int           rc;
        int           fatal;

        for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
             port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
             port--) {
                /* Iterate through reserved ports. */
                rc = libcfs_sock_create(&sock, &fatal, 0, port);
                if (rc) {
                        if (fatal)
                                return rc;
                        continue; /* non-fatal: try the next port */
                }

                rc = usocklnd_set_sock_options(sock);
                if (rc) {
                        libcfs_sock_release(sock);
                        return rc;
                }

                rc = libcfs_sock_connect(sock, dst_ip, dst_port);
                if (rc == 0) {
                        *sockp = sock;
                        return 0;
                }

                if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
                        libcfs_sock_release(sock);
                        return rc;
                }

                libcfs_sock_release(sock); /* port busy: try the next one */
        }

        CERROR("Can't bind to any reserved port\n");
        return rc;
}

/* Returns 0 on success, <0 else */
int
usocklnd_connect_cli_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
{
        cfs_socket_t *sock;
        int           rc;
        int           fatal;

        rc = libcfs_sock_create(&sock, &fatal, 0, 0);
        if (rc)
                return rc;

        rc = usocklnd_set_sock_options(sock);
        if (rc) {
                libcfs_sock_release(sock);
                return rc;
        }

        rc = libcfs_sock_connect(sock, dst_ip, dst_port);
        if (rc) {
                libcfs_sock_release(sock);
                return rc;
        }

        *sockp = sock;
        return 0;
}

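/* Apply the Nagle and socket buffer size tunables to the socket and
 * make it non-blocking */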
int
usocklnd_set_sock_options(cfs_socket_t *sock)
{
        int rc;

        rc = libcfs_sock_set_nagle(sock, usock_tuns.ut_socknagle);
        if (rc)
                return rc;

        if (usock_tuns.ut_sockbufsiz) {
                rc = libcfs_sock_set_bufsiz(sock, usock_tuns.ut_sockbufsiz);
                if (rc)
                        return rc;
        }

        return libcfs_fcntl_nonblock(sock);
}

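/* Allocate a NOOP tx carrying a zero-copy ack cookie; returns NULL
 * on allocation failure */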
usock_tx_t *
usocklnd_create_noop_tx(__u64 cookie)
{
        usock_tx_t *tx;

        LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
        if (tx == NULL)
                return NULL;

        tx->tx_size = sizeof(usock_tx_t);
        tx->tx_lnetmsg = NULL;

        socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
        tx->tx_msg.ksm_zc_cookies[1] = cookie;

        tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
        tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
                offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
        tx->tx_iov = tx->tx_iova;
        tx->tx_niov = 1;

        return tx;
}

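/* Wrap an lnet msg into a tx: iov[0] covers the ksock msg header,
 * the remaining iovs map the lnet payload; returns NULL on
 * allocation failure */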
usock_tx_t *
usocklnd_create_tx(lnet_msg_t *lntmsg)
{
        usock_tx_t   *tx;
        unsigned int  payload_niov = lntmsg->msg_niov;
        struct iovec *payload_iov = lntmsg->msg_iov;
        unsigned int  payload_offset = lntmsg->msg_offset;
        unsigned int  payload_nob = lntmsg->msg_len;
        int           size = offsetof(usock_tx_t,
                                      tx_iova[1 + payload_niov]);

        LIBCFS_ALLOC (tx, size);
        if (tx == NULL)
                return NULL;

        tx->tx_size = size;
        tx->tx_lnetmsg = lntmsg;

        tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;

        socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
        tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
        tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
        tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
        tx->tx_iov = tx->tx_iova;

        tx->tx_niov = 1 +
                lnet_extract_iov(payload_niov, &tx->tx_iov[1],
                                 payload_niov, payload_iov,
                                 payload_offset, payload_nob);

        return tx;
}

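/* Fill in a V2 hello msg to be sent to peer_nid */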
void
usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
                        lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
{
        usock_net_t *net = (usock_net_t *)ni->ni_data;

        hello->kshm_magic   = LNET_PROTO_MAGIC;
        hello->kshm_version = KSOCK_PROTO_V2;
        hello->kshm_nips    = 0;
        hello->kshm_ctype   = type;

        hello->kshm_dst_incarnation = 0; /* not used */
        hello->kshm_src_incarnation = net->un_incarnation;

        hello->kshm_src_pid = the_lnet.ln_pid;
        hello->kshm_src_nid = ni->ni_nid;
        hello->kshm_dst_nid = peer_nid;
        hello->kshm_dst_pid = 0; /* not used */
}

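/* Allocate a tx carrying a bare hello msg (no acceptor connection
 * request); returns NULL on allocation failure */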
usock_tx_t *
usocklnd_create_hello_tx(lnet_ni_t *ni,
                         int type, lnet_nid_t peer_nid)
{
        usock_tx_t        *tx;
        int                size;
        ksock_hello_msg_t *hello;

        size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
        LIBCFS_ALLOC (tx, size);
        if (tx == NULL)
                return NULL;

        tx->tx_size = size;
        tx->tx_lnetmsg = NULL;

        hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
        usocklnd_init_hello_msg(hello, ni, type, peer_nid);

        tx->tx_iova[0].iov_base = (void *)hello;
        tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
                offsetof(ksock_hello_msg_t, kshm_ips);
        tx->tx_iov = tx->tx_iova;
        tx->tx_niov = 1;

        return tx;
}

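/* Allocate a tx carrying an acceptor connection request followed by
 * a hello msg; returns NULL on allocation failure */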
usock_tx_t *
usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
                            int type, lnet_nid_t peer_nid)
{
        usock_tx_t              *tx;
        int                      size;
        lnet_acceptor_connreq_t *cr;
        ksock_hello_msg_t       *hello;

        size = sizeof(usock_tx_t) +
                sizeof(lnet_acceptor_connreq_t) +
                offsetof(ksock_hello_msg_t, kshm_ips);
        LIBCFS_ALLOC (tx, size);
        if (tx == NULL)
                return NULL;

        tx->tx_size = size;
        tx->tx_lnetmsg = NULL;

        cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
        memset(cr, 0, sizeof(*cr));
        cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
        cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
        cr->acr_nid     = peer_nid;

        hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
        usocklnd_init_hello_msg(hello, ni, type, peer_nid);

        tx->tx_iova[0].iov_base = (void *)cr;
        tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
                sizeof(lnet_acceptor_connreq_t) +
                offsetof(ksock_hello_msg_t, kshm_ips);
        tx->tx_iov = tx->tx_iova;
        tx->tx_niov = 1;

        return tx;
}

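/* Free the tx and finalize its lnet msg (if any): 0 if the tx was
 * sent completely, -EIO otherwise */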
void
usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
{
        lnet_msg_t *lnetmsg = tx->tx_lnetmsg;
        int         rc = (tx->tx_resid == 0) ? 0 : -EIO;

        LASSERT (ni != NULL || lnetmsg == NULL);

        LIBCFS_FREE (tx, tx->tx_size);

        if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
                lnet_finalize(ni, lnetmsg, rc);
}

void
usocklnd_destroy_txlist(lnet_ni_t *ni, cfs_list_t *txlist)
{
        usock_tx_t *tx;

        while (!cfs_list_empty(txlist)) {
                tx = cfs_list_entry(txlist->next, usock_tx_t, tx_list);
                cfs_list_del(&tx->tx_list);

                usocklnd_destroy_tx(ni, tx);
        }
}

void
usocklnd_destroy_zcack_list(cfs_list_t *zcack_list)
{
        usock_zc_ack_t *zcack;

        while (!cfs_list_empty(zcack_list)) {
                zcack = cfs_list_entry(zcack_list->next, usock_zc_ack_t,
                                       zc_list);
                cfs_list_del(&zcack->zc_list);

                LIBCFS_FREE (zcack, sizeof(*zcack));
        }
}

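/* Free the peer and signal the net's shutdown waiter when the last
 * peer of the net is gone */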
void
usocklnd_destroy_peer(usock_peer_t *peer)
{
        usock_net_t *net = peer->up_ni->ni_data;
        int          i;

        for (i = 0; i < N_CONN_TYPES; i++)
                LASSERT (peer->up_conns[i] == NULL);

        LIBCFS_FREE (peer, sizeof (*peer));

        pthread_mutex_lock(&net->un_lock);
        if (--net->un_peercount == 0)
                pthread_cond_signal(&net->un_cond);
        pthread_mutex_unlock(&net->un_lock);
}

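/* Drop everything the conn still holds: the rx lnet msg in flight,
 * queued txs, zc-acks, peer/ni refs and the hello tx */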
void
usocklnd_destroy_conn(usock_conn_t *conn)
{
        LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);

        if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
                LASSERT (conn->uc_peer != NULL);
                lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
        }

        if (!cfs_list_empty(&conn->uc_tx_list)) {
                LASSERT (conn->uc_peer != NULL);
                usocklnd_destroy_txlist(conn->uc_peer->up_ni,
                                        &conn->uc_tx_list);
        }

        usocklnd_destroy_zcack_list(&conn->uc_zcack_list);

        if (conn->uc_peer != NULL)
                usocklnd_peer_decref(conn->uc_peer);

        if (conn->uc_ni != NULL)
                lnet_ni_decref(conn->uc_ni);

        if (conn->uc_tx_hello)
                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);

        usocklnd_conn_free(conn);
}

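/* Pick a conn type for lntmsg: userspace peers always use CONN_ANY;
 * msgs of at least ut_min_bulk bytes go as BULK_OUT, smaller ones
 * as CONTROL */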
int
usocklnd_get_conn_type(lnet_msg_t *lntmsg)
{
        int nob;

        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
                return SOCKLND_CONN_ANY;

        nob = sizeof(ksock_msg_t) + lntmsg->msg_len;

        if (nob >= usock_tuns.ut_min_bulk)
                return SOCKLND_CONN_BULK_OUT;
        else
                return SOCKLND_CONN_CONTROL;
}

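/* Map a conn type to its index in peer->up_conns[] */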
int usocklnd_type2idx(int type)
{
        switch (type) {
        case SOCKLND_CONN_ANY:
        case SOCKLND_CONN_CONTROL:
                return 0;
        case SOCKLND_CONN_BULK_IN:
                return 1;
        case SOCKLND_CONN_BULK_OUT:
                return 2;
        default:
                LBUG();
                return -1; /* unreachable: LBUG() aborts */
        }
}

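/* Look up a peer by id and take a ref on it; the caller must hold
 * ud_peers_lock */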
usock_peer_t *
usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
{
        cfs_list_t   *peer_list = usocklnd_nid2peerlist(id.nid);
        cfs_list_t   *tmp;
        usock_peer_t *peer;

        cfs_list_for_each (tmp, peer_list) {
                peer = cfs_list_entry (tmp, usock_peer_t, up_list);

                if (peer->up_ni != ni)
                        continue;

                if (peer->up_peerid.nid != id.nid ||
                    peer->up_peerid.pid != id.pid)
                        continue;

                usocklnd_peer_addref(peer);
                return peer;
        }

        return NULL;
}

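/* Allocate and initialize a new peer; returns 0 on success or
 * -ENOMEM */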
int
usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
                     usock_peer_t **peerp)
{
        usock_net_t  *net = ni->ni_data;
        usock_peer_t *peer;
        int           i;

        LIBCFS_ALLOC (peer, sizeof (*peer));
        if (peer == NULL)
                return -ENOMEM;

        for (i = 0; i < N_CONN_TYPES; i++)
                peer->up_conns[i] = NULL;

        peer->up_peerid       = id;
        peer->up_ni           = ni;
        peer->up_incrn_is_set = 0;
        peer->up_errored      = 0;
        peer->up_last_alive   = 0;
        cfs_mt_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
        pthread_mutex_init(&peer->up_lock, NULL);

        pthread_mutex_lock(&net->un_lock);
        net->un_peercount++;
        pthread_mutex_unlock(&net->un_lock);

        *peerp = peer;
        return 0;
}

/* Safely create new peer if needed. Save result in *peerp.
 * Returns 0 on success, <0 else */
int
usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
                             usock_peer_t **peerp)
{
        int           rc;
        usock_peer_t *peer;
        usock_peer_t *peer2;
        usock_net_t  *net = ni->ni_data;

        pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
        peer = usocklnd_find_peer_locked(ni, id);
        pthread_rwlock_unlock(&usock_data.ud_peers_lock);

        if (peer != NULL)
                goto find_or_create_peer_done;

        rc = usocklnd_create_peer(ni, id, &peer);
        if (rc)
                return rc;

        pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
        peer2 = usocklnd_find_peer_locked(ni, id);
        if (peer2 == NULL) {
                if (net->un_shutdown) {
                        pthread_rwlock_unlock(&usock_data.ud_peers_lock);
                        usocklnd_peer_decref(peer); /* should destroy peer */
                        CERROR("Can't create peer: network shutdown\n");
                        return -ESHUTDOWN;
                }

                /* peer table will take 1 of my refs on peer */
                usocklnd_peer_addref(peer);
                cfs_list_add_tail (&peer->up_list,
                                   usocklnd_nid2peerlist(id.nid));
        } else {
                usocklnd_peer_decref(peer); /* should destroy peer */
                peer = peer2;
        }
        pthread_rwlock_unlock(&usock_data.ud_peers_lock);

  find_or_create_peer_done:
        *peerp = peer;
        return 0;
}

/* NB: both peer and conn locks are held */
int
usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
{
        if (conn->uc_state == UC_READY &&
            cfs_list_empty(&conn->uc_tx_list) &&
            cfs_list_empty(&conn->uc_zcack_list) &&
            !conn->uc_sending) {
                int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
                                                  POLLOUT);
                if (rc != 0)
                        return rc;
        }

        cfs_list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
        return 0;
}

/* NB: both peer and conn locks are held
 * NB: if sending isn't in progress, the caller *MUST* send tx
 * immediately after we return */
void
usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
                    int *send_immediately)
{
        if (conn->uc_state == UC_READY &&
            cfs_list_empty(&conn->uc_tx_list) &&
            cfs_list_empty(&conn->uc_zcack_list) &&
            !conn->uc_sending) {
                conn->uc_sending = 1;
                *send_immediately = 1;
                return;
        }

        *send_immediately = 0;
        cfs_list_add_tail(&tx->tx_list, &conn->uc_tx_list);
}

/* Safely create new conn if needed. Save result in *connp.
 * Returns 0 on success, <0 else */
int
usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
                             usock_conn_t **connp,
                             usock_tx_t *tx, usock_zc_ack_t *zc_ack,
                             int *send_immediately)
{
        usock_conn_t *conn;
        int           idx;
        int           rc;
        lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;

        if (userflag)
                type = SOCKLND_CONN_ANY;

        idx = usocklnd_type2idx(type);

        pthread_mutex_lock(&peer->up_lock);
        if (peer->up_conns[idx] != NULL) {
                conn = peer->up_conns[idx];
                LASSERT(conn->uc_type == type);
        } else {
                if (userflag) {
                        CERROR("Refusing to create a connection to "
                               "userspace process %s\n",
                               libcfs_id2str(peer->up_peerid));
                        rc = -EHOSTUNREACH;
                        goto find_or_create_conn_failed;
                }

                rc = usocklnd_create_active_conn(peer, type, &conn);
                if (rc) {
                        peer->up_errored = 1;
                        usocklnd_del_conns_locked(peer);
                        goto find_or_create_conn_failed;
                }

                /* peer takes 1 of conn refcount */
                usocklnd_link_conn_to_peer(conn, peer, idx);

                rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
                if (rc) {
                        peer->up_conns[idx] = NULL;
                        usocklnd_conn_decref(conn); /* should destroy conn */
                        goto find_or_create_conn_failed;
                }
                usocklnd_wakeup_pollthread(conn->uc_pt_idx);
        }

        pthread_mutex_lock(&conn->uc_lock);
        LASSERT(conn->uc_peer == peer);

        LASSERT(tx == NULL || zc_ack == NULL);
        if (tx != NULL) {
                /* usocklnd_tear_peer_conn() could signal us to stop queueing */
                if (conn->uc_errored) {
                        rc = -EIO;
                        pthread_mutex_unlock(&conn->uc_lock);
                        goto find_or_create_conn_failed;
                }

                usocklnd_enqueue_tx(conn, tx, send_immediately);
        } else {
                rc = usocklnd_enqueue_zcack(conn, zc_ack);
                if (rc != 0) {
                        usocklnd_conn_kill_locked(conn);
                        pthread_mutex_unlock(&conn->uc_lock);
                        goto find_or_create_conn_failed;
                }
        }
        pthread_mutex_unlock(&conn->uc_lock);

        usocklnd_conn_addref(conn);
        pthread_mutex_unlock(&peer->up_lock);

        *connp = conn;
        return 0;

  find_or_create_conn_failed:
        pthread_mutex_unlock(&peer->up_lock);
        return rc;
}

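/* Put the conn into its slot in peer->up_conns[]; called with
 * peer->up_lock held */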
void
usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
{
        peer->up_conns[idx] = conn;
        peer->up_errored    = 0; /* this fresh conn will try to
                                  * revitalize even a stale errored peer */
}

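/* Return the conn type expected from the peer in reply: BULK_IN and
 * BULK_OUT map to each other, ANY and CONTROL map to themselves */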
int
usocklnd_invert_type(int type)
{
        switch (type) {
        case SOCKLND_CONN_ANY:
        case SOCKLND_CONN_CONTROL:
                return type;
        case SOCKLND_CONN_BULK_IN:
                return SOCKLND_CONN_BULK_OUT;
        case SOCKLND_CONN_BULK_OUT:
                return SOCKLND_CONN_BULK_IN;
        default:
                return SOCKLND_CONN_NONE;
        }
}

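/* Set a new conn state unless the conn is already dead */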
void
usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
{
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state != UC_DEAD)
                conn->uc_state = new_state;
        pthread_mutex_unlock(&conn->uc_lock);
}

/* NB: peer is locked by caller */
void
usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
                             usock_conn_t *skip_conn)
{
        int i;

        if (!peer->up_incrn_is_set) {
                peer->up_incarnation = incrn;
                peer->up_incrn_is_set = 1;
                return;
        }

        if (peer->up_incarnation == incrn)
                return;

        peer->up_incarnation = incrn;

        for (i = 0; i < N_CONN_TYPES; i++) {
                usock_conn_t *conn = peer->up_conns[i];

                if (conn == NULL || conn == skip_conn)
                        continue;

                pthread_mutex_lock(&conn->uc_lock);
                LASSERT (conn->uc_peer == peer);
                conn->uc_peer = NULL;
                peer->up_conns[i] = NULL;
                if (conn->uc_state != UC_DEAD)
                        usocklnd_conn_kill_locked(conn);
                pthread_mutex_unlock(&conn->uc_lock);

                usocklnd_conn_decref(conn);
                usocklnd_peer_decref(peer);
        }
}

/* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
 * MAGIC part of hello and set uc_rx_state
 */
void
usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
{
        LASSERT(conn->uc_rx_hello != NULL);

        conn->uc_rx_niov = 1;
        conn->uc_rx_iov = conn->uc_rx_iova;
        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
        conn->uc_rx_iov[0].iov_len =
                conn->uc_rx_nob_wanted =
                conn->uc_rx_nob_left =
                sizeof(conn->uc_rx_hello->kshm_magic);

        conn->uc_rx_state = UC_RX_HELLO_MAGIC;

        conn->uc_rx_flag = 1; /* waiting for incoming hello */
        conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
}

/* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
 * VERSION part of hello and set uc_rx_state
 */
void
usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
{
        LASSERT(conn->uc_rx_hello != NULL);

        conn->uc_rx_niov = 1;
        conn->uc_rx_iov = conn->uc_rx_iova;
        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
        conn->uc_rx_iov[0].iov_len =
                conn->uc_rx_nob_wanted =
                conn->uc_rx_nob_left =
                sizeof(conn->uc_rx_hello->kshm_version);

        conn->uc_rx_state = UC_RX_HELLO_VERSION;
}

/* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
 * the rest of hello and set uc_rx_state
 */
void
usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
{
        LASSERT(conn->uc_rx_hello != NULL);

        conn->uc_rx_niov = 1;
        conn->uc_rx_iov = conn->uc_rx_iova;
        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
        conn->uc_rx_iov[0].iov_len =
                conn->uc_rx_nob_wanted =
                conn->uc_rx_nob_left =
                offsetof(ksock_hello_msg_t, kshm_ips) -
                offsetof(ksock_hello_msg_t, kshm_src_nid);

        conn->uc_rx_state = UC_RX_HELLO_BODY;
}

/* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
 * array of IPs and set uc_rx_state
 */
void
usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
{
        LASSERT(conn->uc_rx_hello != NULL);

        conn->uc_rx_niov = 1;
        conn->uc_rx_iov = conn->uc_rx_iova;
        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
        conn->uc_rx_iov[0].iov_len =
                conn->uc_rx_nob_wanted =
                conn->uc_rx_nob_left =
                conn->uc_rx_hello->kshm_nips *
                sizeof(conn->uc_rx_hello->kshm_ips[0]);

        conn->uc_rx_state = UC_RX_HELLO_IPS;
}

/* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
 * LNET header and set uc_rx_state
 */
void
usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
{
        conn->uc_rx_niov = 1;
        conn->uc_rx_iov = conn->uc_rx_iova;
        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
        conn->uc_rx_iov[0].iov_len =
                conn->uc_rx_nob_wanted =
                conn->uc_rx_nob_left =
                sizeof(ksock_lnet_msg_t);

        conn->uc_rx_state = UC_RX_LNET_HEADER;
        conn->uc_rx_flag = 1;
}

/* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
 * KSM header and set uc_rx_state
 */
void
usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
{
        conn->uc_rx_niov = 1;
        conn->uc_rx_iov = conn->uc_rx_iova;
        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
        conn->uc_rx_iov[0].iov_len =
                conn->uc_rx_nob_wanted =
                conn->uc_rx_nob_left =
                offsetof(ksock_msg_t, ksm_u);

        conn->uc_rx_state = UC_RX_KSM_HEADER;
        conn->uc_rx_flag = 0;
}

/* RX state transition to UC_RX_SKIPPING: update RX part for
 * skipping and set uc_rx_state
 */
void
usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
{
        static char skip_buffer[4096];

        int          skipped = 0;
        unsigned int niov = 0;
        int          nob;
        int          nob_to_skip = conn->uc_rx_nob_left;

        LASSERT(nob_to_skip != 0);

        conn->uc_rx_iov = conn->uc_rx_iova;

        /* Set up to skip as much as possible now. If there's more left
         * (ran out of iov entries) we'll get called again */
        do {
                nob = MIN (nob_to_skip, sizeof(skip_buffer));

                conn->uc_rx_iov[niov].iov_base = skip_buffer;
                conn->uc_rx_iov[niov].iov_len  = nob;
                niov++;
                skipped += nob;
                nob_to_skip -= nob;

        } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
                 niov < sizeof(conn->uc_rx_iova) / sizeof(struct iovec));

        conn->uc_rx_niov = niov;
        conn->uc_rx_nob_wanted = skipped;

        conn->uc_rx_state = UC_RX_SKIPPING;
}