1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Maxim Patlasov <maxim@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
14 /* Return 1 if the conn is timed out, 0 else */
/* NOTE(review): this listing is elided (original line numbers jump), so the
 * return statements for both branches and the final return are not visible.
 * A branch is timed out when its in-progress flag is set AND current_time
 * has reached the corresponding deadline. */
16 usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
18 if (conn->uc_tx_flag && /* sending is in progress */
19 cfs_time_aftereq(current_time, conn->uc_tx_deadline))
22 if (conn->uc_rx_flag && /* receiving is in progress */
23 cfs_time_aftereq(current_time, conn->uc_rx_deadline))
/* Kill a connection unless it is already DEAD.
 * Takes uc_lock itself; callers must NOT hold it (see the _locked variant). */
30 usocklnd_conn_kill(usock_conn_t *conn)
32 pthread_mutex_lock(&conn->uc_lock);
33 if (conn->uc_state != UC_DEAD)
34 usocklnd_conn_kill_locked(conn);
35 pthread_mutex_unlock(&conn->uc_lock);
38 /* Mark the conn as DEAD and schedule its deletion */
/* Caller must hold conn->uc_lock. Clears both in-progress flags so the
 * timeout checker stops tracking this conn, then queues a kill request. */
40 usocklnd_conn_kill_locked(usock_conn_t *conn)
42 conn->uc_rx_flag = conn->uc_tx_flag = 0;
43 conn->uc_state = UC_DEAD;
44 usocklnd_add_killrequest(conn);
/* Allocate a conn together with its poll-request and RX hello buffer.
 * NOTE(review): listing is elided — NULL-checks after each LIBCFS_ALLOC and
 * the success/failure returns are not visible here.  On any partial failure
 * the earlier allocations are freed (see the visible LIBCFS_FREE calls). */
48 usocklnd_conn_allocate()
51 usock_pollrequest_t *pr;
53 LIBCFS_ALLOC (pr, sizeof(*pr));
57 LIBCFS_ALLOC (conn, sizeof(*conn));
59 LIBCFS_FREE (pr, sizeof(*pr));
62 memset(conn, 0, sizeof(*conn));
/* RX hello buffer is sized for the maximum possible kshm_ips[] payload */
65 LIBCFS_ALLOC (conn->uc_rx_hello,
66 offsetof(ksock_hello_msg_t,
67 kshm_ips[LNET_MAX_INTERFACES]));
68 if (conn->uc_rx_hello == NULL) {
69 LIBCFS_FREE (pr, sizeof(*pr));
70 LIBCFS_FREE (conn, sizeof(*conn));
/* Free everything usocklnd_conn_allocate() allocated: the pollrequest, the
 * RX hello buffer (sized exactly as at allocation) and the conn itself. */
78 usocklnd_conn_free(usock_conn_t *conn)
80 usock_pollrequest_t *pr = conn->uc_preq;
83 LIBCFS_FREE (pr, sizeof(*pr));
85 if (conn->uc_rx_hello != NULL)
86 LIBCFS_FREE (conn->uc_rx_hello,
87 offsetof(ksock_hello_msg_t,
88 kshm_ips[LNET_MAX_INTERFACES]));
90 LIBCFS_FREE (conn, sizeof(*conn));
/* Detach a conn from its peer and release the references between them.
 * Lock order: peer->up_lock is taken BEFORE conn->uc_lock — the same order
 * must be used everywhere to avoid deadlock.
 * NOTE(review): listing is elided; early-return for peer == NULL and some
 * closing braces are not visible here. */
94 usocklnd_tear_peer_conn(usock_conn_t *conn)
96 usock_peer_t *peer = conn->uc_peer;
97 int idx = usocklnd_type2idx(conn->uc_type);
101 int killall_flag = 0;
103 if (peer == NULL) /* nothing to tear */
106 pthread_mutex_lock(&peer->up_lock);
107 pthread_mutex_lock(&conn->uc_lock);
/* snapshot peer id while locked; used for the stale check after unlock */
110 id = peer->up_peerid;
112 if (peer->up_conns[idx] == conn) {
113 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
114 /* change state not to finalize twice */
115 conn->uc_rx_state = UC_RX_KSM_HEADER;
/* abort the in-flight receive with -EIO */
116 lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
119 usocklnd_destroy_txlist(peer->up_ni,
122 peer->up_conns[idx] = NULL;
123 conn->uc_peer = NULL;
/* an errored conn marks the whole peer errored and kills its siblings */
126 if(conn->uc_errored && !peer->up_errored)
127 peer->up_errored = killall_flag = 1;
130 pthread_mutex_unlock(&conn->uc_lock);
133 usocklnd_del_conns_locked(peer);
135 pthread_mutex_unlock(&peer->up_lock);
140 usocklnd_conn_decref(conn);
141 usocklnd_peer_decref(peer);
/* peer may now be unreferenced except by the hash table — maybe reap it */
143 usocklnd_check_peer_stale(ni, id);
146 /* Remove peer from hash list if all up_conns[i] is NULL &&
147 * hash table is the only consumer of the peer */
/* refcount == 2 here means: 1 ref held by the hash table + 1 ref taken by
 * the find above — nobody else uses the peer, so it can be unhashed.
 * NOTE(review): listing is elided; the peer == NULL early-return path after
 * the lookup is not visible. */
149 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
153 pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
154 peer = usocklnd_find_peer_locked(ni, id);
157 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
161 if (cfs_atomic_read(&peer->up_refcount) == 2) {
163 for (i = 0; i < N_CONN_TYPES; i++)
164 LASSERT (peer->up_conns[i] == NULL);
166 list_del(&peer->up_list);
/* tell LNet the peer is down — but never for userspace peers */
168 if (peer->up_errored &&
169 (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
170 lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
171 cfs_time_seconds(peer->up_last_alive));
/* drop the hash table's ref */
173 usocklnd_peer_decref(peer);
/* drop the ref taken by the find above */
176 usocklnd_peer_decref(peer);
177 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
180 /* Returns 0 on success, <0 else */
/* Build a conn for an incoming (accepted) socket: record the remote
 * address, prime the RX machine to read the hello magic, and init lists,
 * lock and refcount.  NOTE(review): listing is elided; error-path returns
 * after getpeername/set_sock_options/allocate are not visible here. */
182 usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
189 rc = libcfs_getpeername(fd, &peer_ip, &peer_port);
193 rc = usocklnd_set_sock_options(fd);
197 conn = usocklnd_conn_allocate();
/* passive side waits for the peer's hello first */
201 usocklnd_rx_hellomagic_state_transition(conn);
204 conn->uc_peer_ip = peer_ip;
205 conn->uc_peer_port = peer_port;
206 conn->uc_state = UC_RECEIVING_HELLO;
207 conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
209 CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
210 CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
211 pthread_mutex_init(&conn->uc_lock, NULL);
212 cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
218 /* Returns 0 on success, <0 else */
/* Build an outgoing conn to 'peer': create the connreq+hello TX, open the
 * socket (client mode for userspace pids, server mode otherwise), and set
 * the conn up in UC_CONNECTING state holding a ref on the peer.
 * NOTE(review): listing is elided; the error returns after each failing
 * call are not visible here. */
220 usocklnd_create_active_conn(usock_peer_t *peer, int type,
221 usock_conn_t **connp)
226 __u32 dst_ip = LNET_NIDADDR(peer->up_peerid.nid);
227 __u16 dst_port = lnet_acceptor_port();
229 conn = usocklnd_conn_allocate();
233 conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
234 peer->up_peerid.nid);
235 if (conn->uc_tx_hello == NULL) {
236 usocklnd_conn_free(conn);
/* userspace processes can't bind reserved ports — use client mode */
240 if (the_lnet.ln_pid & LNET_PID_USERFLAG)
241 rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
243 rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);
246 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
247 usocklnd_conn_free(conn);
/* start the hello-send timeout clock now */
251 conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
252 conn->uc_tx_flag = 1;
255 conn->uc_peer_ip = dst_ip;
256 conn->uc_peer_port = dst_port;
257 conn->uc_type = type;
258 conn->uc_activeflag = 1;
259 conn->uc_state = UC_CONNECTING;
260 conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
262 conn->uc_peerid = peer->up_peerid;
263 conn->uc_peer = peer;
264 usocklnd_peer_addref(peer);
265 CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
266 CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
267 pthread_mutex_init(&conn->uc_lock, NULL);
268 cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
274 /* Returns 0 on success, <0 else */
/* Connect from a privileged (reserved) local port: walk the reserved port
 * range downward, trying bind+connect on each, retrying only when bind or
 * connect fails with EADDRINUSE/EADDRNOTAVAIL.
 * NOTE(review): listing is elided; the per-iteration socket close on
 * failure and the success return (*fdp = fd) are not visible here. */
276 usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
282 for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
283 port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
285 /* Iterate through reserved ports. */
287 rc = libcfs_sock_create(&fd);
291 rc = libcfs_sock_bind_to_port(fd, port);
297 rc = usocklnd_set_sock_options(fd);
303 rc = libcfs_sock_connect(fd, dst_ip, dst_port);
/* any error other than "port busy" is fatal — stop scanning */
309 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
317 CERROR("Can't bind to any reserved port\n");
321 /* Returns 0 on success, <0 else */
/* Connect from an ephemeral (unprivileged) port — used by userspace-pid
 * processes which may not bind reserved ports.  NOTE(review): listing is
 * elided; error handling and the success return are not visible here. */
323 usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
328 rc = libcfs_sock_create(&fd);
332 rc = usocklnd_set_sock_options(fd);
338 rc = libcfs_sock_connect(fd, dst_ip, dst_port);
/* Apply tunable socket options (Nagle, optional buffer size) and switch
 * the fd to non-blocking mode.  Returns 0 on success, <0 on error. */
349 usocklnd_set_sock_options(int fd)
353 rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
/* 0 means "leave the system default buffer size" */
357 if (usock_tuns.ut_sockbufsiz) {
358 rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
363 return libcfs_fcntl_nonblock(fd);
/* Initialize the common header of a ksock message: set its type and clear
 * both zero-copy cookies (callers set a cookie afterwards if needed). */
367 usocklnd_init_msg(ksock_msg_t *msg, int type)
369 msg->ksm_type = type;
371 msg->ksm_zc_req_cookie = 0;
372 msg->ksm_zc_ack_cookie = 0;
/* Build a KSOCK_MSG_NOOP tx carrying a zero-copy ACK cookie.  The wire
 * payload is only the ksock_msg_t header (up to ksnm_hdr).
 * NOTE(review): listing is elided; the NULL-check after LIBCFS_ALLOC and
 * the return statement are not visible here. */
376 usocklnd_create_noop_tx(__u64 cookie)
380 LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
384 tx->tx_size = sizeof(usock_tx_t);
385 tx->tx_lnetmsg = NULL;
387 usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
388 tx->tx_msg.ksm_zc_ack_cookie = cookie;
/* single iov covering just the message header */
390 tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
391 tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
392 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
393 tx->tx_iov = tx->tx_iova;
/* Build a KSOCK_MSG_LNET tx for an outgoing LNet message: iov[0] covers
 * the ksock header (through ksnm_payload), iov[1..] cover the payload
 * extracted from the lnet message's iovecs.
 * NOTE(review): listing is elided; the allocation NULL-check, tx_size
 * assignment and return are not visible here. */
400 usocklnd_create_tx(lnet_msg_t *lntmsg)
403 unsigned int payload_niov = lntmsg->msg_niov;
404 struct iovec *payload_iov = lntmsg->msg_iov;
405 unsigned int payload_offset = lntmsg->msg_offset;
406 unsigned int payload_nob = lntmsg->msg_len;
/* tx is sized for header iov + one iov per payload fragment */
407 int size = offsetof(usock_tx_t,
408 tx_iova[1 + payload_niov]);
410 LIBCFS_ALLOC (tx, size);
415 tx->tx_lnetmsg = lntmsg;
417 tx->tx_resid = tx->tx_nob =
418 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
421 usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
422 tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
423 tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
424 tx->tx_iova[0].iov_len = offsetof(ksock_msg_t,
425 ksm_u.lnetmsg.ksnm_payload);
426 tx->tx_iov = tx->tx_iova;
429 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
430 payload_niov, payload_iov,
431 payload_offset, payload_nob);
/* Fill in a V2-protocol hello message for this NI/peer pair.  Fields the
 * protocol does not use here (dst incarnation, dst pid) are zeroed. */
437 usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
438 lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
440 usock_net_t *net = (usock_net_t *)ni->ni_data;
442 hello->kshm_magic = LNET_PROTO_MAGIC;
443 hello->kshm_version = KSOCK_PROTO_V2;
444 hello->kshm_nips = 0;
445 hello->kshm_ctype = type;
447 hello->kshm_dst_incarnation = 0; /* not used */
448 hello->kshm_src_incarnation = net->un_incarnation;
450 hello->kshm_src_pid = the_lnet.ln_pid;
451 hello->kshm_src_nid = ni->ni_nid;
452 hello->kshm_dst_nid = peer_nid;
453 hello->kshm_dst_pid = 0; /* not used */
/* Build a tx containing only a hello message (no acceptor connreq).  The
 * hello is stored in the tx's flexible area just past tx_iova[1] and sent
 * via a single iov.  NOTE(review): listing is elided; the allocation
 * NULL-check, tx_size assignment and return are not visible here. */
457 usocklnd_create_hello_tx(lnet_ni_t *ni,
458 int type, lnet_nid_t peer_nid)
462 ksock_hello_msg_t *hello;
/* hello is sent without any kshm_ips entries, hence offsetof(.., kshm_ips) */
464 size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
465 LIBCFS_ALLOC (tx, size);
470 tx->tx_lnetmsg = NULL;
472 hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
473 usocklnd_init_hello_msg(hello, ni, type, peer_nid);
475 tx->tx_iova[0].iov_base = (void *)hello;
476 tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
477 offsetof(ksock_hello_msg_t, kshm_ips);
478 tx->tx_iov = tx->tx_iova;
/* Build a tx for an *active* connect: an lnet acceptor connreq immediately
 * followed by the hello message, both sent as one contiguous iov.
 * NOTE(review): listing is elided; the allocation NULL-check, tx_size
 * assignment and return are not visible here. */
485 usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
486 int type, lnet_nid_t peer_nid)
490 lnet_acceptor_connreq_t *cr;
491 ksock_hello_msg_t *hello;
493 size = sizeof(usock_tx_t) +
494 sizeof(lnet_acceptor_connreq_t) +
495 offsetof(ksock_hello_msg_t, kshm_ips);
496 LIBCFS_ALLOC (tx, size);
501 tx->tx_lnetmsg = NULL;
/* connreq lives first in the flexible area past tx_iova[1] */
503 cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
504 memset(cr, 0, sizeof(*cr));
505 cr->acr_magic = LNET_PROTO_ACCEPTOR_MAGIC;
506 cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
507 cr->acr_nid = peer_nid;
/* hello is laid out directly after the connreq */
509 hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
510 usocklnd_init_hello_msg(hello, ni, type, peer_nid);
512 tx->tx_iova[0].iov_base = (void *)cr;
513 tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
514 sizeof(lnet_acceptor_connreq_t) +
515 offsetof(ksock_hello_msg_t, kshm_ips);
516 tx->tx_iov = tx->tx_iova;
/* Free a tx and, if it carried an LNet message, finalize it: rc is 0 only
 * when the whole tx was sent (tx_resid == 0), -EIO otherwise.  ni may be
 * NULL only for txs without an lnetmsg (NOOP/hello), per the LASSERT. */
523 usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
525 lnet_msg_t *lnetmsg = tx->tx_lnetmsg;
526 int rc = (tx->tx_resid == 0) ? 0 : -EIO;
528 LASSERT (ni != NULL || lnetmsg == NULL);
/* free before finalize: lnetmsg was saved above, tx isn't touched again */
530 LIBCFS_FREE (tx, tx->tx_size);
532 if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
533 lnet_finalize(ni, lnetmsg, rc);
/* Drain a tx list, destroying (and finalizing, where applicable) each tx. */
537 usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
541 while (!list_empty(txlist)) {
542 tx = list_entry(txlist->next, usock_tx_t, tx_list);
543 list_del(&tx->tx_list);
545 usocklnd_destroy_tx(ni, tx);
/* Drain a zero-copy-ack list, freeing each pending ack entry. */
550 usocklnd_destroy_zcack_list(struct list_head *zcack_list)
552 usock_zc_ack_t *zcack;
554 while (!list_empty(zcack_list)) {
555 zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
556 list_del(&zcack->zc_list);
558 LIBCFS_FREE (zcack, sizeof(*zcack));
/* Free a peer that has no remaining conns, and wake anyone waiting in
 * shutdown for the net's peer count to reach zero. */
563 usocklnd_destroy_peer(usock_peer_t *peer)
565 usock_net_t *net = peer->up_ni->ni_data;
568 for (i = 0; i < N_CONN_TYPES; i++)
569 LASSERT (peer->up_conns[i] == NULL);
571 LIBCFS_FREE (peer, sizeof (*peer));
573 pthread_mutex_lock(&net->un_lock);
574 if(--net->un_peercount == 0)
575 pthread_cond_signal(&net->un_cond);
576 pthread_mutex_unlock(&net->un_lock);
580 usocklnd_destroy_conn(usock_conn_t *conn)
582 LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
584 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
585 LASSERT (conn->uc_peer != NULL);
586 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
589 if (!list_empty(&conn->uc_tx_list)) {
590 LASSERT (conn->uc_peer != NULL);
591 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
594 usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
596 if (conn->uc_peer != NULL)
597 usocklnd_peer_decref(conn->uc_peer);
599 if (conn->uc_ni != NULL)
600 lnet_ni_decref(conn->uc_ni);
602 if (conn->uc_tx_hello)
603 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
605 usocklnd_conn_free(conn);
/* Pick a conn type for an outgoing message: userspace pids always use ANY;
 * otherwise messages at/above the bulk threshold go BULK_OUT, smaller ones
 * CONTROL.  NOTE(review): listing is elided; the term added to nob at the
 * elided line (presumably the payload length) is not visible here. */
609 usocklnd_get_conn_type(lnet_msg_t *lntmsg)
613 if (the_lnet.ln_pid & LNET_PID_USERFLAG)
614 return SOCKLND_CONN_ANY;
616 nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
619 if (nob >= usock_tuns.ut_min_bulk)
620 return SOCKLND_CONN_BULK_OUT;
622 return SOCKLND_CONN_CONTROL;
/* Map a conn type to its slot index in peer->up_conns[].
 * NOTE(review): listing is elided; the return values for each case (and
 * any default) are not visible here. */
625 int usocklnd_type2idx(int type)
628 case SOCKLND_CONN_ANY:
629 case SOCKLND_CONN_CONTROL:
631 case SOCKLND_CONN_BULK_IN:
633 case SOCKLND_CONN_BULK_OUT:
/* Look a peer up in its hash chain by (ni, id); on a match take a ref and
 * return it.  Caller must hold ud_peers_lock (read or write).
 * NOTE(review): listing is elided; the 'continue' statements for the
 * mismatch branches and the return paths are not visible here. */
641 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
643 struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
644 struct list_head *tmp;
647 list_for_each (tmp, peer_list) {
649 peer = list_entry (tmp, usock_peer_t, up_list);
651 if (peer->up_ni != ni)
654 if (peer->up_peerid.nid != id.nid ||
655 peer->up_peerid.pid != id.pid)
/* found: caller gets one ref on the peer */
658 usocklnd_peer_addref(peer);
/* Allocate and initialize a new peer for (ni, id); the caller receives the
 * single initial reference.  Also bumps the net's peer count under un_lock.
 * NOTE(review): listing is elided; the allocation NULL-check, remaining
 * field initializers (e.g. up_ni) and the return are not visible here. */
665 usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
666 usock_peer_t **peerp)
668 usock_net_t *net = ni->ni_data;
672 LIBCFS_ALLOC (peer, sizeof (*peer));
676 for (i = 0; i < N_CONN_TYPES; i++)
677 peer->up_conns[i] = NULL;
679 peer->up_peerid = id;
681 peer->up_incrn_is_set = 0;
682 peer->up_errored = 0;
683 peer->up_last_alive = 0;
684 cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
685 pthread_mutex_init(&peer->up_lock, NULL);
/* account the new peer against the net (shutdown waits on this count) */
687 pthread_mutex_lock(&net->un_lock);
689 pthread_mutex_unlock(&net->un_lock);
695 /* Safely create new peer if needed. Save result in *peerp.
696 * Returns 0 on success, <0 else */
/* Classic optimistic find-then-create: look up under the read lock first;
 * if absent, create a peer, retake the lock in write mode and re-check for
 * a racing insertion (peer2) before hashing the new peer.
 * NOTE(review): listing is elided; the peer2-won branch, *peerp assignment
 * and returns are not visible here. */
698 usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
699 usock_peer_t **peerp)
704 usock_net_t *net = ni->ni_data;
706 pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
707 peer = usocklnd_find_peer_locked(ni, id);
708 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
711 goto find_or_create_peer_done;
713 rc = usocklnd_create_peer(ni, id, &peer);
717 pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
718 peer2 = usocklnd_find_peer_locked(ni, id);
720 if (net->un_shutdown) {
721 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
722 usocklnd_peer_decref(peer); /* should destroy peer */
723 CERROR("Can't create peer: network shutdown\n");
727 /* peer table will take 1 of my refs on peer */
728 usocklnd_peer_addref(peer);
729 list_add_tail (&peer->up_list,
730 usocklnd_nid2peerlist(id.nid));
/* raced: somebody else inserted first — drop our freshly created peer */
732 usocklnd_peer_decref(peer); /* should destroy peer */
735 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
737 find_or_create_peer_done:
742 /* NB: both peer and conn locks are held */
/* Queue a zero-copy ack on the conn; if the conn is READY and idle (no
 * queued tx/zcacks and the elided condition), also request POLLOUT so the
 * pollthread picks it up.  NOTE(review): listing is elided; the third
 * condition of the if, the rc handling and returns are not visible here. */
744 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
746 if (conn->uc_state == UC_READY &&
747 list_empty(&conn->uc_tx_list) &&
748 list_empty(&conn->uc_zcack_list) &&
750 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
756 list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
760 /* NB: both peer and conn locks are held
761 * NB: if sending isn't in progress. the caller *MUST* send tx
762 * immediately after we'll return */
/* If the conn is READY and idle, claim the send slot (uc_sending) and tell
 * the caller to send inline; otherwise queue the tx for the pollthread.
 * NOTE(review): listing is elided; the third condition of the if (likely
 * !uc_sending) is not visible here. */
764 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
765 int *send_immediately)
767 if (conn->uc_state == UC_READY &&
768 list_empty(&conn->uc_tx_list) &&
769 list_empty(&conn->uc_zcack_list) &&
771 conn->uc_sending = 1;
772 *send_immediately = 1;
776 *send_immediately = 0;
777 list_add_tail(&tx->tx_list, &conn->uc_tx_list);
780 /* Safely create new conn if needed. Save result in *connp.
781 * Returns 0 on success, <0 else */
/* Under peer->up_lock: reuse the existing conn for this type slot, or
 * create an active conn (refusing outbound connects to userspace-pid
 * peers), link it to the peer and register it with the pollthread.  Then,
 * under conn->uc_lock, enqueue the tx or zc_ack supplied by the caller.
 * NOTE(review): listing is elided; several branch/return lines (including
 * the userspace-pid check that precedes the CERROR) are not visible. */
783 usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
784 usock_conn_t **connp,
785 usock_tx_t *tx, usock_zc_ack_t *zc_ack,
786 int *send_immediately)
791 lnet_pid_t userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
/* userspace peers multiplex everything over one ANY-type conn */
794 type = SOCKLND_CONN_ANY;
796 idx = usocklnd_type2idx(type);
798 pthread_mutex_lock(&peer->up_lock);
799 if (peer->up_conns[idx] != NULL) {
800 conn = peer->up_conns[idx];
801 LASSERT(conn->uc_type == type);
804 CERROR("Refusing to create a connection to "
805 "userspace process %s\n",
806 libcfs_id2str(peer->up_peerid));
808 goto find_or_create_conn_failed;
811 rc = usocklnd_create_active_conn(peer, type, &conn);
813 peer->up_errored = 1;
814 usocklnd_del_conns_locked(peer);
815 goto find_or_create_conn_failed;
818 /* peer takes 1 of conn refcount */
819 usocklnd_link_conn_to_peer(conn, peer, idx);
821 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
823 peer->up_conns[idx] = NULL;
824 usocklnd_conn_decref(conn); /* should destroy conn */
825 goto find_or_create_conn_failed;
827 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
830 pthread_mutex_lock(&conn->uc_lock);
831 LASSERT(conn->uc_peer == peer);
/* caller supplies at most one of tx / zc_ack */
833 LASSERT(tx == NULL || zc_ack == NULL);
835 usocklnd_enqueue_tx(conn, tx, send_immediately);
837 rc = usocklnd_enqueue_zcack(conn, zc_ack);
/* zcack enqueue failed: the conn can't make progress — kill it */
839 usocklnd_conn_kill_locked(conn);
840 pthread_mutex_unlock(&conn->uc_lock);
841 goto find_or_create_conn_failed;
844 pthread_mutex_unlock(&conn->uc_lock);
/* caller gets its own ref on the conn */
846 usocklnd_conn_addref(conn);
847 pthread_mutex_unlock(&peer->up_lock);
852 find_or_create_conn_failed:
853 pthread_mutex_unlock(&peer->up_lock);
/* Install conn into the peer's type slot; caller holds peer->up_lock and
 * has already arranged the refcount transfer. */
858 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
860 peer->up_conns[idx] = conn;
861 peer->up_errored = 0; /* this new fresh conn will try
862 * revitalize even stale errored peer */
/* Return the conn type expected from the remote side for a given local
 * type (BULK_IN <-> BULK_OUT); unknown types map to CONN_NONE.
 * NOTE(review): listing is elided; the return for the ANY/CONTROL cases
 * is not visible here. */
866 usocklnd_invert_type(int type)
870 case SOCKLND_CONN_ANY:
871 case SOCKLND_CONN_CONTROL:
873 case SOCKLND_CONN_BULK_IN:
874 return SOCKLND_CONN_BULK_OUT;
875 case SOCKLND_CONN_BULK_OUT:
876 return SOCKLND_CONN_BULK_IN;
878 return SOCKLND_CONN_NONE;
/* Atomically move the conn to new_state — unless it is already DEAD, in
 * which case the state is left alone (DEAD is terminal). */
883 usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
885 pthread_mutex_lock(&conn->uc_lock);
886 if (conn->uc_state != UC_DEAD)
887 conn->uc_state = new_state;
888 pthread_mutex_unlock(&conn->uc_lock);
891 /* NB: peer is locked by caller */
/* On receiving a hello with incarnation 'incrn': record it on first use;
 * if it differs from the stored one the peer rebooted, so detach and kill
 * every conn except skip_conn (the one that delivered the new hello).
 * NOTE(review): listing is elided; the early returns after setting /
 * matching the incarnation are not visible here. */
893 usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
894 usock_conn_t *skip_conn)
898 if (!peer->up_incrn_is_set) {
899 peer->up_incarnation = incrn;
900 peer->up_incrn_is_set = 1;
904 if (peer->up_incarnation == incrn)
907 peer->up_incarnation = incrn;
909 for (i = 0; i < N_CONN_TYPES; i++) {
910 usock_conn_t *conn = peer->up_conns[i];
912 if (conn == NULL || conn == skip_conn)
915 pthread_mutex_lock(&conn->uc_lock);
916 LASSERT (conn->uc_peer == peer);
917 conn->uc_peer = NULL;
918 peer->up_conns[i] = NULL;
919 if (conn->uc_state != UC_DEAD)
920 usocklnd_conn_kill_locked(conn);
921 pthread_mutex_unlock(&conn->uc_lock);
/* drop the refs that linked conn and peer to each other */
923 usocklnd_conn_decref(conn);
924 usocklnd_peer_decref(peer);
928 /* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
929 * MAGIC part of hello and set uc_rx_state
/* Single iov pointed at kshm_magic; wanted/left both equal its size.
 * Also arms the RX timeout: flag set + deadline pushed out. */
932 usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
934 LASSERT(conn->uc_rx_hello != NULL);
936 conn->uc_rx_niov = 1;
937 conn->uc_rx_iov = conn->uc_rx_iova;
938 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
939 conn->uc_rx_iov[0].iov_len =
940 conn->uc_rx_nob_wanted =
941 conn->uc_rx_nob_left =
942 sizeof(conn->uc_rx_hello->kshm_magic);
944 conn->uc_rx_state = UC_RX_HELLO_MAGIC;
946 conn->uc_rx_flag = 1; /* waiting for incoming hello */
947 conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
950 /* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
951 * VERSION part of hello and set uc_rx_state
/* Single iov pointed at kshm_version (the RX deadline was already armed by
 * the MAGIC transition). */
954 usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
956 LASSERT(conn->uc_rx_hello != NULL);
958 conn->uc_rx_niov = 1;
959 conn->uc_rx_iov = conn->uc_rx_iova;
960 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
961 conn->uc_rx_iov[0].iov_len =
962 conn->uc_rx_nob_wanted =
963 conn->uc_rx_nob_left =
964 sizeof(conn->uc_rx_hello->kshm_version);
966 conn->uc_rx_state = UC_RX_HELLO_VERSION;
969 /* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
970 * the rest of hello and set uc_rx_state
/* Receive the fixed-size middle of the hello: everything from kshm_src_nid
 * up to (but excluding) the variable-length kshm_ips array. */
973 usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
975 LASSERT(conn->uc_rx_hello != NULL);
977 conn->uc_rx_niov = 1;
978 conn->uc_rx_iov = conn->uc_rx_iova;
979 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
980 conn->uc_rx_iov[0].iov_len =
981 conn->uc_rx_nob_wanted =
982 conn->uc_rx_nob_left =
983 offsetof(ksock_hello_msg_t, kshm_ips) -
984 offsetof(ksock_hello_msg_t, kshm_src_nid);
986 conn->uc_rx_state = UC_RX_HELLO_BODY;
989 /* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
990 * array of IPs and set uc_rx_state
/* Receive kshm_nips IP entries; the count was read in the BODY phase.
 * Callers must ensure kshm_nips != 0 (a zero-length read makes no sense). */
993 usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
995 LASSERT(conn->uc_rx_hello != NULL);
997 conn->uc_rx_niov = 1;
998 conn->uc_rx_iov = conn->uc_rx_iova;
999 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
1000 conn->uc_rx_iov[0].iov_len =
1001 conn->uc_rx_nob_wanted =
1002 conn->uc_rx_nob_left =
1003 conn->uc_rx_hello->kshm_nips *
1004 sizeof(conn->uc_rx_hello->kshm_ips[0]);
1006 conn->uc_rx_state = UC_RX_HELLO_IPS;
1009 /* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
1010 * LNET header and set uc_rx_state
/* Receive the embedded ksock_lnet_msg_t; sets uc_rx_flag so the timeout
 * checker tracks this receive. */
1013 usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
1015 conn->uc_rx_niov = 1;
1016 conn->uc_rx_iov = conn->uc_rx_iova;
1017 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
1018 conn->uc_rx_iov[0].iov_len =
1019 conn->uc_rx_nob_wanted =
1020 conn->uc_rx_nob_left =
1021 sizeof(ksock_lnet_msg_t);
1023 conn->uc_rx_state = UC_RX_LNET_HEADER;
1024 conn->uc_rx_flag = 1;
1027 /* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
1028 * KSM header and set uc_rx_state
/* Idle/baseline RX state: wait for the next message's ksock header (the
 * part of ksock_msg_t before the union).  uc_rx_flag is cleared because
 * no message is in progress between messages. */
1031 usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
1033 conn->uc_rx_niov = 1;
1034 conn->uc_rx_iov = conn->uc_rx_iova;
1035 conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
1036 conn->uc_rx_iov[0].iov_len =
1037 conn->uc_rx_nob_wanted =
1038 conn->uc_rx_nob_left =
1039 offsetof(ksock_msg_t, ksm_u);
1041 conn->uc_rx_state = UC_RX_KSM_HEADER;
1042 conn->uc_rx_flag = 0;
1045 /* RX state transition to UC_RX_SKIPPING: update RX part for
1046 * skipping and set uc_rx_state
/* Discard uc_rx_nob_left bytes by pointing every available iov entry at a
 * shared static scratch buffer.  The buffer is write-only garbage, so
 * sharing it across threads is harmless.
 * NOTE(review): listing is elided; the loop's nob_to_skip/skipped updates
 * and niov increment are not visible here. */
1049 usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
1051 static char skip_buffer[4096];
1054 unsigned int niov = 0;
1056 int nob_to_skip = conn->uc_rx_nob_left;
1058 LASSERT(nob_to_skip != 0);
1060 conn->uc_rx_iov = conn->uc_rx_iova;
1062 /* Set up to skip as much as possible now. If there's more left
1063 * (ran out of iov entries) we'll get called again */
1066 nob = MIN (nob_to_skip, sizeof(skip_buffer));
1068 conn->uc_rx_iov[niov].iov_base = skip_buffer;
1069 conn->uc_rx_iov[niov].iov_len = nob;
1074 } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */
1075 niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));
1077 conn->uc_rx_niov = niov;
1078 conn->uc_rx_nob_wanted = skipped;
1080 conn->uc_rx_state = UC_RX_SKIPPING;