1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see [sun.com URL with a
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/ulnds/socklnd/conn.c
38 * Author: Maxim Patlasov <maxim@clusterfs.com>
43 /* Return 1 if the conn is timed out, 0 else */
/* NOTE(review): lines are elided in this excerpt; each check below
 * presumably returns 1 when the matching deadline has passed — confirm
 * against the full source. */
45 usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
/* TX side: only meaningful while a send is outstanding */
47 if (conn->uc_tx_flag && /* sending is in progress */
48 cfs_time_aftereq(current_time, conn->uc_tx_deadline))
/* RX side: only meaningful while a receive is outstanding */
51 if (conn->uc_rx_flag && /* receiving is in progress */
52 cfs_time_aftereq(current_time, conn->uc_rx_deadline))
/* Kill the conn unless it is already UC_DEAD.
 * Takes uc_lock itself; safe to call from any context that does not
 * already hold the lock. */
59 usocklnd_conn_kill(usock_conn_t *conn)
61 pthread_mutex_lock(&conn->uc_lock);
62 if (conn->uc_state != UC_DEAD)
63 usocklnd_conn_kill_locked(conn);
64 pthread_mutex_unlock(&conn->uc_lock);
67 /* Mark the conn as DEAD and schedule its deletion */
/* NOTE(review): caller must hold conn->uc_lock (see usocklnd_conn_kill). */
69 usocklnd_conn_kill_locked(usock_conn_t *conn)
/* clear both flags so the timeout checker ignores this conn from now on */
71 conn->uc_rx_flag = conn->uc_tx_flag = 0;
72 conn->uc_state = UC_DEAD;
/* hand the conn to the poll thread for actual teardown */
73 usocklnd_add_killrequest(conn);
/* Allocate a zeroed conn plus its poll request and an RX buffer big
 * enough for the largest possible incoming hello (LNET_MAX_INTERFACES
 * ips).  Error paths free everything allocated so far; the success
 * return and some failure returns are elided in this excerpt. */
77 usocklnd_conn_allocate()
80 usock_pollrequest_t *pr;
82 LIBCFS_ALLOC (pr, sizeof(*pr));
86 LIBCFS_ALLOC (conn, sizeof(*conn));
/* conn allocation failed: release the already-allocated poll request */
88 LIBCFS_FREE (pr, sizeof(*pr));
91 memset(conn, 0, sizeof(*conn));
/* RX hello buffer sized for the maximal kshm_ips array */
94 LIBCFS_ALLOC (conn->uc_rx_hello,
95 offsetof(ksock_hello_msg_t,
96 kshm_ips[LNET_MAX_INTERFACES]));
97 if (conn->uc_rx_hello == NULL) {
98 LIBCFS_FREE (pr, sizeof(*pr));
99 LIBCFS_FREE (conn, sizeof(*conn));
/* Free everything usocklnd_conn_allocate() created: the poll request,
 * the RX hello buffer (if still present), and the conn itself.
 * The free of the hello buffer must use the same size expression as
 * the allocation (offsetof ... kshm_ips[LNET_MAX_INTERFACES]). */
107 usocklnd_conn_free(usock_conn_t *conn)
109 usock_pollrequest_t *pr = conn->uc_preq;
112 LIBCFS_FREE (pr, sizeof(*pr));
114 if (conn->uc_rx_hello != NULL)
115 LIBCFS_FREE (conn->uc_rx_hello,
116 offsetof(ksock_hello_msg_t,
117 kshm_ips[LNET_MAX_INTERFACES]));
119 LIBCFS_FREE (conn, sizeof(*conn));
/* Detach the conn from its peer: finalize any in-flight RX lnet msg,
 * destroy queued TX, clear the peer's slot for this conn type, and if
 * the conn errored mark the whole peer errored and kill its other
 * conns.  Drops one conn ref and one peer ref, then checks whether the
 * now-connless peer should be removed from the hash.
 * NOTE(review): several lines (ni assignment, decref conditions,
 * returns) are elided in this excerpt. */
123 usocklnd_tear_peer_conn(usock_conn_t *conn)
125 usock_peer_t *peer = conn->uc_peer;
126 int idx = usocklnd_type2idx(conn->uc_type);
128 lnet_process_id_t id;
130 int killall_flag = 0;
132 if (peer == NULL) /* nothing to tear */
/* lock order: peer lock before conn lock (see find_or_create_conn) */
135 pthread_mutex_lock(&peer->up_lock);
136 pthread_mutex_lock(&conn->uc_lock);
/* snapshot id while holding the lock; used for the stale check below */
139 id = peer->up_peerid;
141 if (peer->up_conns[idx] == conn) {
142 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
143 /* change state not to finalize twice */
144 conn->uc_rx_state = UC_RX_KSM_HEADER;
/* abort the partially-received lnet message */
145 lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
/* queued-but-unsent TXs are failed back to lnet */
148 usocklnd_destroy_txlist(peer->up_ni,
151 peer->up_conns[idx] = NULL;
152 conn->uc_peer = NULL;
/* an errored conn poisons the peer; remember to kill its siblings */
155 if(conn->uc_errored && !peer->up_errored)
156 peer->up_errored = killall_flag = 1;
159 pthread_mutex_unlock(&conn->uc_lock);
/* presumably guarded by killall_flag in the elided lines — confirm */
162 usocklnd_del_conns_locked(peer);
164 pthread_mutex_unlock(&peer->up_lock);
/* drop the refs the peer<->conn link held */
169 usocklnd_conn_decref(conn);
170 usocklnd_peer_decref(peer);
/* peer may now be unreferenced except by the hash table */
172 usocklnd_check_peer_stale(ni, id);
175 /* Remove peer from hash list if all up_conns[i] is NULL &&
176 * hash table is the only consumer of the peer */
/* NOTE(review): refcount==2 below means "my temporary ref from
 * find_peer_locked + the hash table's ref" — no other users. */
178 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
182 pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
183 peer = usocklnd_find_peer_locked(ni, id);
/* peer not found: nothing to do */
186 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
190 if (cfs_atomic_read(&peer->up_refcount) == 2) {
192 for (i = 0; i < N_CONN_TYPES; i++)
193 LASSERT (peer->up_conns[i] == NULL);
/* unlink from the hash while still holding the write lock */
195 list_del(&peer->up_list);
/* tell lnet the errored peer is dead (skip userspace peers) */
197 if (peer->up_errored &&
198 (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
199 lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
200 cfs_time_seconds(peer->up_last_alive));
/* drop the hash table's ref */
202 usocklnd_peer_decref(peer);
/* drop the ref find_peer_locked took for us */
205 usocklnd_peer_decref(peer);
206 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
209 /* Returns 0 on success, <0 else */
/* Build a conn object around an already-accepted fd: record the remote
 * address, set socket options, and arm the RX state machine to expect
 * the peer's hello magic.  Error-path cleanup lines are elided. */
211 usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
218 rc = libcfs_getpeername(fd, &peer_ip, &peer_port);
222 rc = usocklnd_set_sock_options(fd);
226 conn = usocklnd_conn_allocate();
/* passive side receives hello first */
230 usocklnd_rx_hellomagic_state_transition(conn);
233 conn->uc_peer_ip = peer_ip;
234 conn->uc_peer_port = peer_port;
235 conn->uc_state = UC_RECEIVING_HELLO;
/* pick the poll thread this conn will belong to */
236 conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
238 CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
239 CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
240 pthread_mutex_init(&conn->uc_lock, NULL);
241 cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
247 /* Returns 0 on success, <0 else */
/* Actively connect to a peer: prepare the connreq+hello TX, open the
 * socket (client mode for userspace pids, server mode otherwise), and
 * initialize the conn in UC_CONNECTING with the hello send armed. */
249 usocklnd_create_active_conn(usock_peer_t *peer, int type,
250 usock_conn_t **connp)
255 __u32 dst_ip = LNET_NIDADDR(peer->up_peerid.nid);
256 __u16 dst_port = lnet_acceptor_port();
258 conn = usocklnd_conn_allocate();
/* hello must exist before we connect so it can be sent immediately */
262 conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
263 peer->up_peerid.nid);
264 if (conn->uc_tx_hello == NULL) {
265 usocklnd_conn_free(conn);
/* userspace pids can't bind reserved ports; use client mode */
269 if (the_lnet.ln_pid & LNET_PID_USERFLAG)
270 rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
272 rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);
/* connect failed: undo the hello TX and the conn */
275 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
276 usocklnd_conn_free(conn);
/* the hello send is now "in progress" for timeout purposes */
280 conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
281 conn->uc_tx_flag = 1;
284 conn->uc_peer_ip = dst_ip;
285 conn->uc_peer_port = dst_port;
286 conn->uc_type = type;
287 conn->uc_activeflag = 1;
288 conn->uc_state = UC_CONNECTING;
289 conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
/* active conn knows its peer from the start (passive learns it later) */
291 conn->uc_peerid = peer->up_peerid;
292 conn->uc_peer = peer;
293 usocklnd_peer_addref(peer);
294 CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
295 CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
296 pthread_mutex_init(&conn->uc_lock, NULL);
297 cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
303 /* Returns 0 on success, <0 else */
/* Connect from a privileged (reserved) local port, walking down the
 * reserved-port range until bind+connect succeeds.  Ports busy with
 * EADDRINUSE/EADDRNOTAVAIL are skipped; any other error aborts
 * (cleanup/close lines elided in this excerpt). */
305 usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
311 for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
312 port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
314 /* Iterate through reserved ports. */
316 rc = libcfs_sock_create(&fd);
320 rc = libcfs_sock_bind_to_port(fd, port);
326 rc = usocklnd_set_sock_options(fd);
332 rc = libcfs_sock_connect(fd, dst_ip, dst_port);
/* these two just mean "try the next port"; anything else is fatal */
338 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
346 CERROR("Can't bind to any reserved port\n");
350 /* Returns 0 on success, <0 else */
/* Connect from an ephemeral (non-reserved) port — used when running
 * with a userspace pid that may not bind reserved ports. */
352 usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
357 rc = libcfs_sock_create(&fd);
361 rc = usocklnd_set_sock_options(fd);
367 rc = libcfs_sock_connect(fd, dst_ip, dst_port);
/* Apply tunable socket options (nagle, optional buffer size) and put
 * the fd into non-blocking mode.  Returns 0 on success, <0 else. */
378 usocklnd_set_sock_options(int fd)
382 rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
/* 0 means "leave the kernel default buffer size alone" */
386 if (usock_tuns.ut_sockbufsiz) {
387 rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
/* poll-driven I/O requires the fd to be non-blocking */
392 return libcfs_fcntl_nonblock(fd);
/* Initialize the common header of a ksock message: set its type and
 * clear both zero-copy cookies. */
396 usocklnd_init_msg(ksock_msg_t *msg, int type)
398 msg->ksm_type = type;
400 msg->ksm_zc_req_cookie = 0;
401 msg->ksm_zc_ack_cookie = 0;
/* Build a KSOCK_MSG_NOOP tx that carries a zero-copy ack cookie.
 * Only the ksock header up to the lnet hdr is sent (see the offsetof
 * below); allocation-failure return is elided in this excerpt. */
405 usocklnd_create_noop_tx(__u64 cookie)
409 LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
413 tx->tx_size = sizeof(usock_tx_t);
414 tx->tx_lnetmsg = NULL;
416 usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
417 tx->tx_msg.ksm_zc_ack_cookie = cookie;
/* single iov covering just the ksock header portion of the msg */
419 tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
420 tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
421 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
422 tx->tx_iov = tx->tx_iova;
/* Build a KSOCK_MSG_LNET tx for an outgoing lnet message: iov[0] is
 * the ksock header (through ksnm_payload), iov[1..] are extracted from
 * the message's payload iovs.  tx is sized for 1 + payload_niov iovs.
 * Allocation-failure and return lines are elided in this excerpt. */
429 usocklnd_create_tx(lnet_msg_t *lntmsg)
432 unsigned int payload_niov = lntmsg->msg_niov;
433 struct iovec *payload_iov = lntmsg->msg_iov;
434 unsigned int payload_offset = lntmsg->msg_offset;
435 unsigned int payload_nob = lntmsg->msg_len;
/* flexible tail: room for header iov plus one iov per payload frag */
436 int size = offsetof(usock_tx_t,
437 tx_iova[1 + payload_niov]);
439 LIBCFS_ALLOC (tx, size);
/* keep lnetmsg so it can be finalized when the tx completes */
444 tx->tx_lnetmsg = lntmsg;
446 tx->tx_resid = tx->tx_nob =
447 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
450 usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
451 tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
452 tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
453 tx->tx_iova[0].iov_len = offsetof(ksock_msg_t,
454 ksm_u.lnetmsg.ksnm_payload);
455 tx->tx_iov = tx->tx_iova;
/* copy the payload fragments (with offset applied) after the header */
458 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
459 payload_niov, payload_iov,
460 payload_offset, payload_nob);
/* Fill in an outgoing V2 hello: protocol magic/version, conn type,
 * our incarnation/pid/nid and the destination nid.  kshm_nips is 0 —
 * this LND advertises no extra interface IPs. */
466 usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
467 lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
469 usock_net_t *net = (usock_net_t *)ni->ni_data;
471 hello->kshm_magic = LNET_PROTO_MAGIC;
472 hello->kshm_version = KSOCK_PROTO_V2;
473 hello->kshm_nips = 0;
474 hello->kshm_ctype = type;
476 hello->kshm_dst_incarnation = 0; /* not used */
/* lets the peer detect our restart (see cleanup_stale_conns) */
477 hello->kshm_src_incarnation = net->un_incarnation;
479 hello->kshm_src_pid = the_lnet.ln_pid;
480 hello->kshm_src_nid = ni->ni_nid;
481 hello->kshm_dst_nid = peer_nid;
482 hello->kshm_dst_pid = 0; /* not used */
/* Build a tx carrying only a hello message (no acceptor connreq) —
 * presumably the reply sent by the passive side; confirm with callers.
 * The hello lives in the tx's flexible tail, right after tx_iova[1].
 * Allocation-failure and return lines are elided in this excerpt. */
486 usocklnd_create_hello_tx(lnet_ni_t *ni,
487 int type, lnet_nid_t peer_nid)
491 ksock_hello_msg_t *hello;
/* kshm_nips == 0, so only the fixed part up to kshm_ips goes out */
493 size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
494 LIBCFS_ALLOC (tx, size);
499 tx->tx_lnetmsg = NULL;
/* payload area starts just past the (single-entry) iov array */
501 hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
502 usocklnd_init_hello_msg(hello, ni, type, peer_nid);
504 tx->tx_iova[0].iov_base = (void *)hello;
505 tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
506 offsetof(ksock_hello_msg_t, kshm_ips);
507 tx->tx_iov = tx->tx_iova;
/* Build the active-connect tx: an lnet acceptor connreq immediately
 * followed by the hello, sent as one contiguous iov.  Used by
 * usocklnd_create_active_conn().  Allocation-failure and return lines
 * are elided in this excerpt. */
514 usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
515 int type, lnet_nid_t peer_nid)
519 lnet_acceptor_connreq_t *cr;
520 ksock_hello_msg_t *hello;
522 size = sizeof(usock_tx_t) +
523 sizeof(lnet_acceptor_connreq_t) +
524 offsetof(ksock_hello_msg_t, kshm_ips);
525 LIBCFS_ALLOC (tx, size);
530 tx->tx_lnetmsg = NULL;
/* connreq goes first, at the start of the flexible tail */
532 cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
533 memset(cr, 0, sizeof(*cr));
534 cr->acr_magic = LNET_PROTO_ACCEPTOR_MAGIC;
535 cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
536 cr->acr_nid = peer_nid;
/* hello packed directly behind the connreq */
538 hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
539 usocklnd_init_hello_msg(hello, ni, type, peer_nid);
/* single iov covers connreq + fixed hello (kshm_nips == 0) */
541 tx->tx_iova[0].iov_base = (void *)cr;
542 tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
543 sizeof(lnet_acceptor_connreq_t) +
544 offsetof(ksock_hello_msg_t, kshm_ips);
545 tx->tx_iov = tx->tx_iova;
/* Free a tx and, if it wrapped an lnet message, finalize it: rc 0 if
 * fully sent (tx_resid == 0), -EIO otherwise.  ni may be NULL only for
 * txs without an lnetmsg (noop/hello), per the LASSERT. */
552 usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
554 lnet_msg_t *lnetmsg = tx->tx_lnetmsg;
555 int rc = (tx->tx_resid == 0) ? 0 : -EIO;
557 LASSERT (ni != NULL || lnetmsg == NULL);
/* free before finalize: lnetmsg was saved above, tx is not needed */
559 LIBCFS_FREE (tx, tx->tx_size);
561 if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
562 lnet_finalize(ni, lnetmsg, rc);
/* Destroy every tx on the list, finalizing wrapped lnet messages via
 * usocklnd_destroy_tx(). */
566 usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
570 while (!list_empty(txlist)) {
571 tx = list_entry(txlist->next, usock_tx_t, tx_list);
572 list_del(&tx->tx_list);
574 usocklnd_destroy_tx(ni, tx);
/* Free every pending zero-copy ack on the list. */
579 usocklnd_destroy_zcack_list(struct list_head *zcack_list)
581 usock_zc_ack_t *zcack;
583 while (!list_empty(zcack_list)) {
584 zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
585 list_del(&zcack->zc_list);
587 LIBCFS_FREE (zcack, sizeof(*zcack));
/* Free a peer that has no remaining conns and drop the net's peer
 * count; waking un_cond lets a shutdown waiting for peercount==0
 * proceed.  net is read before the free so the signal is safe. */
592 usocklnd_destroy_peer(usock_peer_t *peer)
594 usock_net_t *net = peer->up_ni->ni_data;
597 for (i = 0; i < N_CONN_TYPES; i++)
598 LASSERT (peer->up_conns[i] == NULL);
600 LIBCFS_FREE (peer, sizeof (*peer));
602 pthread_mutex_lock(&net->un_lock);
603 if(--net->un_peercount == 0)
604 pthread_cond_signal(&net->un_cond);
605 pthread_mutex_unlock(&net->un_lock);
609 usocklnd_destroy_conn(usock_conn_t *conn)
611 LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
613 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
614 LASSERT (conn->uc_peer != NULL);
615 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
618 if (!list_empty(&conn->uc_tx_list)) {
619 LASSERT (conn->uc_peer != NULL);
620 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
623 usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
625 if (conn->uc_peer != NULL)
626 usocklnd_peer_decref(conn->uc_peer);
628 if (conn->uc_ni != NULL)
629 lnet_ni_decref(conn->uc_ni);
631 if (conn->uc_tx_hello)
632 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
634 usocklnd_conn_free(conn);
/* Pick the conn type for an outgoing message: userspace pids use the
 * single ANY conn; otherwise messages at or above the bulk threshold
 * go on the BULK_OUT conn and the rest on CONTROL.
 * NOTE(review): line 646 (the payload-size term of nob) is elided. */
638 usocklnd_get_conn_type(lnet_msg_t *lntmsg)
642 if (the_lnet.ln_pid & LNET_PID_USERFLAG)
643 return SOCKLND_CONN_ANY;
/* wire size: ksock header + payload */
645 nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
648 if (nob >= usock_tuns.ut_min_bulk)
649 return SOCKLND_CONN_BULK_OUT;
651 return SOCKLND_CONN_CONTROL;
/* Map a conn type to its slot in peer->up_conns[].  ANY and CONTROL
 * share a slot; the return values themselves are elided here. */
654 int usocklnd_type2idx(int type)
657 case SOCKLND_CONN_ANY:
658 case SOCKLND_CONN_CONTROL:
660 case SOCKLND_CONN_BULK_IN:
662 case SOCKLND_CONN_BULK_OUT:
/* Look up a peer by (ni, id) in its hash chain; on a hit take a ref
 * for the caller.  Caller must hold usock_data.ud_peers_lock (read or
 * write).  Returns the peer or NULL (return lines elided). */
670 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
672 struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
673 struct list_head *tmp;
676 list_for_each (tmp, peer_list) {
678 peer = list_entry (tmp, usock_peer_t, up_list);
/* same nid can appear on different nis; match both */
680 if (peer->up_ni != ni)
683 if (peer->up_peerid.nid != id.nid ||
684 peer->up_peerid.pid != id.pid)
/* found: ref for the caller */
687 usocklnd_peer_addref(peer);
/* Allocate and initialize a new peer for (ni, id): empty conn slots,
 * one ref for the caller, and bump the net's peer count.
 * Allocation-failure and return lines are elided in this excerpt. */
694 usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
695 usock_peer_t **peerp)
697 usock_net_t *net = ni->ni_data;
701 LIBCFS_ALLOC (peer, sizeof (*peer));
705 for (i = 0; i < N_CONN_TYPES; i++)
706 peer->up_conns[i] = NULL;
708 peer->up_peerid = id;
/* incarnation learned from the peer's first hello */
710 peer->up_incrn_is_set = 0;
711 peer->up_errored = 0;
712 peer->up_last_alive = 0;
713 cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
714 pthread_mutex_init(&peer->up_lock, NULL);
/* count the peer so shutdown can wait for all to be destroyed */
716 pthread_mutex_lock(&net->un_lock);
718 pthread_mutex_unlock(&net->un_lock);
724 /* Safely create new peer if needed. Save result in *peerp.
725 * Returns 0 on success, <0 else */
/* Optimistic pattern: lookup under the read lock; if absent, create a
 * candidate, retake the write lock and re-check so a racing creator's
 * peer wins (the peer2 branch handling is elided in this excerpt). */
727 usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
728 usock_peer_t **peerp)
733 usock_net_t *net = ni->ni_data;
735 pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
736 peer = usocklnd_find_peer_locked(ni, id);
737 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
740 goto find_or_create_peer_done;
742 rc = usocklnd_create_peer(ni, id, &peer);
/* re-check under the write lock: somebody may have raced us */
746 pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
747 peer2 = usocklnd_find_peer_locked(ni, id);
749 if (net->un_shutdown) {
750 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
751 usocklnd_peer_decref(peer); /* should destroy peer */
752 CERROR("Can't create peer: network shutdown\n");
756 /* peer table will take 1 of my refs on peer */
757 usocklnd_peer_addref(peer);
758 list_add_tail (&peer->up_list,
759 usocklnd_nid2peerlist(id.nid));
/* lost the race: keep peer2, drop our candidate */
761 usocklnd_peer_decref(peer); /* should destroy peer */
764 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
766 find_or_create_peer_done:
771 /* NB: both peer and conn locks are held */
/* Queue a zero-copy ack; if the conn is idle (READY, nothing queued),
 * presumably also arm POLLOUT via a pollrequest so it gets sent —
 * the condition's last term and the surrounding lines are elided. */
773 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
775 if (conn->uc_state == UC_READY &&
776 list_empty(&conn->uc_tx_list) &&
777 list_empty(&conn->uc_zcack_list) &&
779 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
785 list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
789 /* NB: both peer and conn locks are held
790 * NB: if sending isn't in progress. the caller *MUST* send tx
791 * immediately after we'll return */
/* Either claim the idle conn for an immediate send (uc_sending = 1,
 * *send_immediately = 1) or append the tx to the send queue.  The
 * condition's last term is elided in this excerpt. */
793 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
794 int *send_immediately)
796 if (conn->uc_state == UC_READY &&
797 list_empty(&conn->uc_tx_list) &&
798 list_empty(&conn->uc_zcack_list) &&
800 conn->uc_sending = 1;
801 *send_immediately = 1;
805 *send_immediately = 0;
806 list_add_tail(&tx->tx_list, &conn->uc_tx_list);
809 /* Safely create new conn if needed. Save result in *connp.
810 * Returns 0 on success, <0 else */
/* Under the peer lock: reuse the existing conn of the right type or
 * actively create one (refusing active connects to userspace-pid
 * peers), link it to the peer, register it with a poll thread, then
 * enqueue the tx or zc-ack the caller handed in.  Takes a conn ref
 * for the caller on success. */
812 usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
813 usock_conn_t **connp,
814 usock_tx_t *tx, usock_zc_ack_t *zc_ack,
815 int *send_immediately)
820 lnet_pid_t userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
/* userspace peers use one untyped conn */
823 type = SOCKLND_CONN_ANY;
825 idx = usocklnd_type2idx(type);
827 pthread_mutex_lock(&peer->up_lock);
828 if (peer->up_conns[idx] != NULL) {
829 conn = peer->up_conns[idx];
830 LASSERT(conn->uc_type == type);
/* no existing conn: we cannot actively connect to a userspace pid */
833 CERROR("Refusing to create a connection to "
834 "userspace process %s\n",
835 libcfs_id2str(peer->up_peerid));
837 goto find_or_create_conn_failed;
840 rc = usocklnd_create_active_conn(peer, type, &conn);
/* connect failure poisons the peer and kills its other conns */
842 peer->up_errored = 1;
843 usocklnd_del_conns_locked(peer);
844 goto find_or_create_conn_failed;
847 /* peer takes 1 of conn refcount */
848 usocklnd_link_conn_to_peer(conn, peer, idx);
/* hand the fd to a poll thread; wake it to notice the new conn */
850 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
852 peer->up_conns[idx] = NULL;
853 usocklnd_conn_decref(conn); /* should destroy conn */
854 goto find_or_create_conn_failed;
856 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
859 pthread_mutex_lock(&conn->uc_lock);
860 LASSERT(conn->uc_peer == peer);
/* caller supplies at most one of tx / zc_ack */
862 LASSERT(tx == NULL || zc_ack == NULL);
864 usocklnd_enqueue_tx(conn, tx, send_immediately);
866 rc = usocklnd_enqueue_zcack(conn, zc_ack);
/* zc-ack enqueue failed: the conn is unusable now */
868 usocklnd_conn_kill_locked(conn);
869 pthread_mutex_unlock(&conn->uc_lock);
870 goto find_or_create_conn_failed;
873 pthread_mutex_unlock(&conn->uc_lock);
/* ref for the caller (*connp) */
875 usocklnd_conn_addref(conn);
876 pthread_mutex_unlock(&peer->up_lock);
881 find_or_create_conn_failed:
882 pthread_mutex_unlock(&peer->up_lock);
/* Install the conn into the peer's slot for its type and clear the
 * peer's errored flag.  Caller holds peer->up_lock and has already
 * accounted the ref the peer now holds on the conn. */
887 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
889 peer->up_conns[idx] = conn;
890 peer->up_errored = 0; /* this new fresh conn will try
891 * revitalize even stale errored peer */
/* Return the conn type the peer should see for ours: bulk directions
 * swap, ANY/CONTROL map together (their shared return line is elided),
 * anything else is invalid (CONN_NONE). */
895 usocklnd_invert_type(int type)
899 case SOCKLND_CONN_ANY:
900 case SOCKLND_CONN_CONTROL:
902 case SOCKLND_CONN_BULK_IN:
903 return SOCKLND_CONN_BULK_OUT;
904 case SOCKLND_CONN_BULK_OUT:
905 return SOCKLND_CONN_BULK_IN;
907 return SOCKLND_CONN_NONE;
/* Atomically move the conn to new_state — but never resurrect a conn
 * that has already been marked UC_DEAD. */
912 usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
914 pthread_mutex_lock(&conn->uc_lock);
915 if (conn->uc_state != UC_DEAD)
916 conn->uc_state = new_state;
917 pthread_mutex_unlock(&conn->uc_lock);
920 /* NB: peer is locked by caller */
/* Detect a peer restart via its hello incarnation: on the first hello
 * just record it; if it changed, kill every conn of the peer except
 * skip_conn (the conn the new hello arrived on), dropping the refs
 * each peer<->conn link held. */
922 usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
923 usock_conn_t *skip_conn)
927 if (!peer->up_incrn_is_set) {
928 peer->up_incarnation = incrn;
929 peer->up_incrn_is_set = 1;
/* unchanged incarnation: peer did not restart, nothing stale */
933 if (peer->up_incarnation == incrn)
936 peer->up_incarnation = incrn;
938 for (i = 0; i < N_CONN_TYPES; i++) {
939 usock_conn_t *conn = peer->up_conns[i];
941 if (conn == NULL || conn == skip_conn)
944 pthread_mutex_lock(&conn->uc_lock);
945 LASSERT (conn->uc_peer == peer);
/* unlink both directions before killing */
946 conn->uc_peer = NULL;
947 peer->up_conns[i] = NULL;
948 if (conn->uc_state != UC_DEAD)
949 usocklnd_conn_kill_locked(conn);
950 pthread_mutex_unlock(&conn->uc_lock);
/* drop the refs the now-broken peer<->conn link held */
952 usocklnd_conn_decref(conn);
953 usocklnd_peer_decref(peer);
957 /* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
958 * MAGIC part of hello and set uc_rx_state
/* First step of hello reception: one iov over kshm_magic.  Also starts
 * the RX timeout clock (uc_rx_flag/uc_rx_deadline). */
961 usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
963 LASSERT(conn->uc_rx_hello != NULL);
965 conn->uc_rx_niov = 1;
966 conn->uc_rx_iov = conn->uc_rx_iova;
967 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
968 conn->uc_rx_iov[0].iov_len =
969 conn->uc_rx_nob_wanted =
970 conn->uc_rx_nob_left =
971 sizeof(conn->uc_rx_hello->kshm_magic);
973 conn->uc_rx_state = UC_RX_HELLO_MAGIC;
975 conn->uc_rx_flag = 1; /* waiting for incoming hello */
976 conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
979 /* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
980 * VERSION part of hello and set uc_rx_state
/* Second hello step: one iov over kshm_version. */
983 usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
985 LASSERT(conn->uc_rx_hello != NULL);
987 conn->uc_rx_niov = 1;
988 conn->uc_rx_iov = conn->uc_rx_iova;
989 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
990 conn->uc_rx_iov[0].iov_len =
991 conn->uc_rx_nob_wanted =
992 conn->uc_rx_nob_left =
993 sizeof(conn->uc_rx_hello->kshm_version);
995 conn->uc_rx_state = UC_RX_HELLO_VERSION;
998 /* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
999 * the rest of hello and set uc_rx_state
/* Third hello step: everything between kshm_src_nid and the variable
 * kshm_ips array, in one iov. */
1002 usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
1004 LASSERT(conn->uc_rx_hello != NULL);
1006 conn->uc_rx_niov = 1;
1007 conn->uc_rx_iov = conn->uc_rx_iova;
1008 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
1009 conn->uc_rx_iov[0].iov_len =
1010 conn->uc_rx_nob_wanted =
1011 conn->uc_rx_nob_left =
1012 offsetof(ksock_hello_msg_t, kshm_ips) -
1013 offsetof(ksock_hello_msg_t, kshm_src_nid);
1015 conn->uc_rx_state = UC_RX_HELLO_BODY;
1018 /* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
1019 * array of IPs and set uc_rx_state
/* Final hello step: kshm_nips interface addresses (the caller is
 * expected to skip this state when kshm_nips == 0 — confirm). */
1022 usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
1024 LASSERT(conn->uc_rx_hello != NULL);
1026 conn->uc_rx_niov = 1;
1027 conn->uc_rx_iov = conn->uc_rx_iova;
1028 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
1029 conn->uc_rx_iov[0].iov_len =
1030 conn->uc_rx_nob_wanted =
1031 conn->uc_rx_nob_left =
1032 conn->uc_rx_hello->kshm_nips *
1033 sizeof(conn->uc_rx_hello->kshm_ips[0]);
1035 conn->uc_rx_state = UC_RX_HELLO_IPS;
1038 /* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
1039 * LNET header and set uc_rx_state
/* Arm reception of the embedded lnet message header that follows a
 * KSOCK_MSG_LNET ksock header; a message is now "in progress" so the
 * RX timeout applies (uc_rx_flag = 1). */
1042 usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
1044 conn->uc_rx_niov = 1;
1045 conn->uc_rx_iov = conn->uc_rx_iova;
1046 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
1047 conn->uc_rx_iov[0].iov_len =
1048 conn->uc_rx_nob_wanted =
1049 conn->uc_rx_nob_left =
1050 sizeof(ksock_lnet_msg_t);
1052 conn->uc_rx_state = UC_RX_LNET_HEADER;
1053 conn->uc_rx_flag = 1;
1056 /* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
1057 * KSM header and set uc_rx_state
/* Idle state between messages: expect the fixed ksock header (up to
 * the ksm_u union).  uc_rx_flag = 0 — waiting between messages is not
 * subject to the RX timeout. */
1060 usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
1062 conn->uc_rx_niov = 1;
1063 conn->uc_rx_iov = conn->uc_rx_iova;
1064 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
1065 conn->uc_rx_iov[0].iov_len =
1066 conn->uc_rx_nob_wanted =
1067 conn->uc_rx_nob_left =
1068 offsetof(ksock_msg_t, ksm_u);
1070 conn->uc_rx_state = UC_RX_KSM_HEADER;
1071 conn->uc_rx_flag = 0;
1074 /* RX state transition to UC_RX_SKIPPING: update RX part for
1075 * skipping and set uc_rx_state
/* Discard uc_rx_nob_left bytes by pointing every available rx iov at
 * a shared static scratch buffer.  If the iov array fills before all
 * bytes are covered, the caller re-enters this state for the rest.
 * NOTE(review): skip_buffer is static and written concurrently by all
 * conns — harmless only because the data is discarded. */
1078 usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
1080 static char skip_buffer[4096];
1083 unsigned int niov = 0;
1085 int nob_to_skip = conn->uc_rx_nob_left;
1087 LASSERT(nob_to_skip != 0);
1089 conn->uc_rx_iov = conn->uc_rx_iova;
1091 /* Set up to skip as much as possible now. If there's more left
1092 * (ran out of iov entries) we'll get called again */
1095 nob = MIN (nob_to_skip, sizeof(skip_buffer));
1097 conn->uc_rx_iov[niov].iov_base = skip_buffer;
1098 conn->uc_rx_iov[niov].iov_len = nob;
1103 } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */
1104 niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));
1106 conn->uc_rx_niov = niov;
1107 conn->uc_rx_nob_wanted = skipped;
1109 conn->uc_rx_state = UC_RX_SKIPPING;