/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 *
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/ulnds/socklnd/handlers.c
 *
 * Author: Maxim Patlasov <maxim@clusterfs.com>
 */

#include "usocklnd.h"
#include <sys/syscall.h>

int
usocklnd_notifier_handler(int fd)
{
        int notification;

        return syscall(SYS_read, fd, &notification, sizeof(notification));
}

void
usocklnd_exception_handler(usock_conn_t *conn)
{
        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state == UC_CONNECTING ||
            conn->uc_state == UC_SENDING_HELLO)
                usocklnd_conn_kill_locked(conn);

        pthread_mutex_unlock(&conn->uc_lock);
}
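
/* Rough map of the conn state machine driven by the handlers in this
 * file (reconstructed from the code below; the authoritative state
 * definitions live in usocklnd.h):
 *
 *   active conn:  UC_CONNECTING -> UC_SENDING_HELLO ->
 *                 UC_RECEIVING_HELLO -> UC_READY
 *   passive conn: UC_RECEIVING_HELLO -> UC_SENDING_HELLO -> UC_READY
 *   any state  -> UC_DEAD (usocklnd_conn_kill*() / shutdown)
 */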

int
usocklnd_read_handler(usock_conn_t *conn)
{
        int rc;
        int continue_reading;
        int state;

read_again:
        rc = 0;
        pthread_mutex_lock(&conn->uc_lock);
        state = conn->uc_state;

        /* process special case: LNET calls lnd_recv() asynchronously */
        if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) {
                /* still don't have usocklnd_recv() called */
                rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, 0);
                if (rc == 0)
                        conn->uc_rx_state = UC_RX_PARSE_WAIT;
                else
                        usocklnd_conn_kill_locked(conn);

                pthread_mutex_unlock(&conn->uc_lock);
                return rc;
        }

        pthread_mutex_unlock(&conn->uc_lock);
        /* From here and below the conn cannot be changed
         * asynchronously, except:
         * 1) usocklnd_send() can work with uc_tx_list and uc_zcack_list,
         * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */

        switch (state) {

        case UC_RECEIVING_HELLO:
        case UC_READY:
                if (conn->uc_rx_nob_wanted != 0) {
                        /* read from conn fd as much wanted data as possible */
                        rc = usocklnd_read_data(conn);
                        if (rc == 0) /* partial read */
                                break;

                        if (rc < 0) { /* error happened or EOF */
                                usocklnd_conn_kill(conn);
                                break;
                        }
                }

                /* process incoming data */
                if (state == UC_READY)
                        rc = usocklnd_read_msg(conn, &continue_reading);
                else /* state == UC_RECEIVING_HELLO */
                        rc = usocklnd_read_hello(conn, &continue_reading);

                if (rc < 0) {
                        usocklnd_conn_kill(conn);
                        break;
                }

                if (continue_reading)
                        goto read_again;
                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}
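
/* RX side of a UC_READY conn, as driven by usocklnd_read_msg() below
 * (reconstructed from this file):
 *
 *   UC_RX_KSM_HEADER -> UC_RX_LNET_HEADER -> UC_RX_PARSE
 *       [-> UC_RX_PARSE_WAIT, while lnd_recv() hasn't been called yet]
 *   -> UC_RX_LNET_PAYLOAD -> UC_RX_KSM_HEADER again, skipping any
 *      leftover payload first (usocklnd_rx_skipping_state_transition()).
 */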

/* Switch on rx_state.
 * Return 0 on success, 1 if whole packet is read, else return <0.
 * Always set cont_flag: 1 if we're ready to continue reading, else 0.
 * NB: If whole packet is read, cont_flag will be set to zero to take
 * a fairness break between packets (return to the poll loop instead
 * of reading further) */
int
usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
{
        int   rc = 0;
        __u64 cookie;

        *cont_flag = 0;

        /* something new emerged in RX part - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_KSM_HEADER:
                if (conn->uc_flip) {
                        __swab32s(&conn->uc_rx_msg.ksm_type);
                        __swab32s(&conn->uc_rx_msg.ksm_csum);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[0]);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[1]);
                }

                /* we never send packets for which zc-acking is required */
                if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
                    conn->uc_rx_msg.ksm_zc_cookies[1] != 0) {
                        conn->uc_errored = 1;
                        return -EPROTO;
                }

                /* zc_req will be processed later, when
                   the lnet payload is received */
                usocklnd_rx_lnethdr_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_LNET_HEADER:
                if (the_lnet.ln_pid & LNET_PID_USERFLAG) {
                        /* replace dest_nid,pid (ksocknal sets its own) */
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_nid =
                                cpu_to_le64(conn->uc_peer->up_ni->ni_nid);
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid =
                                cpu_to_le32(the_lnet.ln_pid);

                } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
                        lnet_process_id_t *id = &conn->uc_peer->up_peerid;
                        lnet_hdr_t *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->uc_rx_state = UC_RX_PARSE;
                usocklnd_conn_addref(conn); /* ++ref while parsing */

                rc = lnet_parse(conn->uc_peer->up_ni,
                                &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->uc_peerid.nid, conn, 0);

                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        conn->uc_errored = 1;
                        usocklnd_conn_decref(conn);
                        return -EPROTO;
                }

                /* Race with usocklnd_recv() is possible */
                pthread_mutex_lock(&conn->uc_lock);
                LASSERT (conn->uc_rx_state == UC_RX_PARSE ||
                         conn->uc_rx_state == UC_RX_LNET_PAYLOAD);

                /* check whether usocklnd_recv() got called */
                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD)
                        *cont_flag = 1;
                pthread_mutex_unlock(&conn->uc_lock);
                break;

        case UC_RX_PARSE:
                LBUG(); /* it's an error to be here, because this special
                         * case is handled by the caller */
                break;

        case UC_RX_PARSE_WAIT:
                LBUG(); /* it's an error to be here, because the conn
                         * shouldn't wait for POLLIN event in this
                         * state */
                break;

        case UC_RX_LNET_PAYLOAD:
                /* payload all received */
                *cont_flag = 0;

                lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);

                cookie = conn->uc_rx_msg.ksm_zc_cookies[0];
                if (cookie != 0)
                        rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);

                if (rc != 0) {
                        /* change state not to finalize twice */
                        conn->uc_rx_state = UC_RX_KSM_HEADER;
                        return rc;
                }

                /* fall through */

        case UC_RX_SKIPPING:
                if (conn->uc_rx_nob_left != 0) {
                        usocklnd_rx_skipping_state_transition(conn);
                        *cont_flag = 1;
                } else {
                        usocklnd_rx_ksmhdr_state_transition(conn);
                        rc = 1; /* whole packet is read */
                }
                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}
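
#if 0
/* Illustrative sketch (not part of the build): how a caller can drive
 * the usocklnd_read_msg() contract above.  In this file that caller is
 * usocklnd_read_handler(); the helper below is hypothetical. */
static int
example_drive_read_msg(usock_conn_t *conn)
{
        int cont = 1;
        int rc = 0;

        while (cont) {
                rc = usocklnd_read_msg(conn, &cont);
                if (rc < 0) {            /* error: kill the conn */
                        usocklnd_conn_kill(conn);
                        break;
                }
                if (rc == 1)             /* whole packet read, cont == 0 */
                        rc = 0;
        }
        return rc;
}
#endif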

/* Handle incoming ZC request from sender.
 * NB: it's called only from read_handler, so we're sure that
 * the conn cannot become zombie in the middle of processing */
int
usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie)
{
        usock_conn_t   *conn;
        usock_zc_ack_t *zc_ack;
        int             type;
        int             rc;

        LIBCFS_ALLOC (zc_ack, sizeof(*zc_ack));
        if (zc_ack == NULL)
                return -ENOMEM;
        zc_ack->zc_cookie = cookie;

        /* Let's assume that CONTROL is the best type for zcack,
         * but userspace clients don't use typed connections */
        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
                type = SOCKLND_CONN_ANY;
        else
                type = SOCKLND_CONN_CONTROL;

        rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack,
                                          NULL);
        if (rc != 0) {
                LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
                return rc;
        }
        usocklnd_conn_decref(conn);

        return 0;
}

/* Switch on rx_state.
 * Return 0 on success, else return <0.
 * Always set cont_flag: 1 if we're ready to continue reading, else 0 */
int
usocklnd_read_hello(usock_conn_t *conn, int *cont_flag)
{
        int                rc = 0;
        ksock_hello_msg_t *hello = conn->uc_rx_hello;

        *cont_flag = 0;

        /* something new emerged in hello - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_HELLO_MAGIC:
                if (hello->kshm_magic == LNET_PROTO_MAGIC)
                        conn->uc_flip = 0;
                else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
                        conn->uc_flip = 1;
                else
                        return -EPROTO;

                usocklnd_rx_helloversion_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_HELLO_VERSION:
                if ((!conn->uc_flip &&
                     (hello->kshm_version != KSOCK_PROTO_V2)) ||
                    (conn->uc_flip &&
                     (hello->kshm_version != __swab32(KSOCK_PROTO_V2))))
                        return -EPROTO;

                usocklnd_rx_hellobody_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_HELLO_BODY:
                if (conn->uc_flip) {
                        ksock_hello_msg_t *hello = conn->uc_rx_hello;
                        __swab32s(&hello->kshm_src_pid);
                        __swab64s(&hello->kshm_src_nid);
                        __swab32s(&hello->kshm_dst_pid);
                        __swab64s(&hello->kshm_dst_nid);
                        __swab64s(&hello->kshm_src_incarnation);
                        __swab64s(&hello->kshm_dst_incarnation);
                        __swab32s(&hello->kshm_ctype);
                        __swab32s(&hello->kshm_nips);
                }

                if (conn->uc_rx_hello->kshm_nips > LNET_MAX_INTERFACES) {
                        CERROR("Bad nips %d from ip %u.%u.%u.%u port %d\n",
                               conn->uc_rx_hello->kshm_nips,
                               HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port);
                        return -EPROTO;
                }

                if (conn->uc_rx_hello->kshm_nips) {
                        usocklnd_rx_helloIPs_state_transition(conn);
                        *cont_flag = 1;
                        break;
                }
                /* fall through */

        case UC_RX_HELLO_IPS:
                if (conn->uc_activeflag == 1) /* active conn */
                        rc = usocklnd_activeconn_hellorecv(conn);
                else /* passive conn */
                        rc = usocklnd_passiveconn_hellorecv(conn);

                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}
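
/* Hello reception order, as the switch above drives it (summary
 * reconstructed from this file): magic word (possibly byte-flipped)
 * -> protocol version (KSOCK_PROTO_V2) -> fixed hello body, swabbed
 * as a whole when uc_flip is set -> optional IP list (at most
 * LNET_MAX_INTERFACES entries) -> active/passive completion. */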

/* All actions that we need after receiving hello on active conn:
 * 1) Schedule removing if we're zombie
 * 2) Restart active conn if we lost the race
 * 3) Else: update RX part to receive KSM header */
int
usocklnd_activeconn_hellorecv(usock_conn_t *conn)
{
        int                rc = 0;
        ksock_hello_msg_t *hello = conn->uc_rx_hello;
        usock_peer_t      *peer = conn->uc_peer;

        /* Active conn with peer==NULL is zombie.
         * Don't try to link it to peer because the conn
         * has already had a chance to proceed at the beginning */
        if (peer == NULL) {
                LASSERT(list_empty(&conn->uc_tx_list) &&
                        list_empty(&conn->uc_zcack_list));

                usocklnd_conn_kill(conn);
                return 0;
        }

        peer->up_last_alive = cfs_time_current();

        /* peer says that we lost the race */
        if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
                /* Start new active conn, relink txs and zc_acks from
                 * the conn to new conn, schedule removing the conn.
                 * Actually, we're expecting that a passive conn will
                 * make us zombie soon and take care of our txs and
                 * zc_acks */

                struct list_head tx_list, zcack_list;
                usock_conn_t    *conn2;
                int              idx = usocklnd_type2idx(conn->uc_type);

                CFS_INIT_LIST_HEAD (&tx_list);
                CFS_INIT_LIST_HEAD (&zcack_list);

                /* Block usocklnd_send() to check peer->up_conns[idx]
                 * and to enqueue more txs */
                pthread_mutex_lock(&peer->up_lock);
                pthread_mutex_lock(&conn->uc_lock);

                /* usocklnd_shutdown() could kill us */
                if (conn->uc_state == UC_DEAD) {
                        pthread_mutex_unlock(&conn->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        return 0;
                }

                LASSERT (peer == conn->uc_peer);
                LASSERT (peer->up_conns[idx] == conn);

                rc = usocklnd_create_active_conn(peer, conn->uc_type, &conn2);
                if (rc) {
                        conn->uc_errored = 1;
                        pthread_mutex_unlock(&conn->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        return rc;
                }

                usocklnd_link_conn_to_peer(conn2, peer, idx);
                conn2->uc_peer = peer;

                /* unlink txs and zcack from the conn */
                list_add(&tx_list, &conn->uc_tx_list);
                list_del_init(&conn->uc_tx_list);
                list_add(&zcack_list, &conn->uc_zcack_list);
                list_del_init(&conn->uc_zcack_list);

                /* link them to conn2 */
                list_add(&conn2->uc_tx_list, &tx_list);
                list_del_init(&tx_list);
                list_add(&conn2->uc_zcack_list, &zcack_list);
                list_del_init(&zcack_list);
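
                /* NB: each list_add() + list_del_init() pair above is a
                 * manual list splice: inserting the local list head into
                 * the ring and then unlinking the old head moves the
                 * whole queue onto the local list; the reverse pairs
                 * then move it onto conn2. */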

                /* make conn zombie */
                conn->uc_peer = NULL;
                usocklnd_peer_decref(peer);

                /* schedule conn2 for processing */
                rc = usocklnd_add_pollrequest(conn2, POLL_ADD_REQUEST, POLLOUT);
                if (rc != 0) {
                        peer->up_conns[idx] = NULL;
                        usocklnd_conn_decref(conn2); /* should destroy conn */
                } else {
                        usocklnd_conn_kill_locked(conn);
                }

                pthread_mutex_unlock(&conn->uc_lock);
                pthread_mutex_unlock(&peer->up_lock);
                usocklnd_conn_decref(conn);

        } else { /* hello->kshm_ctype != SOCKLND_CONN_NONE */
                if (conn->uc_type != usocklnd_invert_type(hello->kshm_ctype))
                        return -EPROTO;

                pthread_mutex_lock(&peer->up_lock);
                usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation,
                                             conn);
                pthread_mutex_unlock(&peer->up_lock);

                /* safely transit to UC_READY state */
                pthread_mutex_lock(&conn->uc_lock);
                if (conn->uc_state != UC_DEAD) {
                        usocklnd_rx_ksmhdr_state_transition(conn);

                        /* POLLIN is already set because we just
                         * received hello, but maybe we have something
                         * to send? */
                        LASSERT (conn->uc_sending == 0);
                        if ( !list_empty(&conn->uc_tx_list) ||
                             !list_empty(&conn->uc_zcack_list) ) {

                                conn->uc_tx_deadline =
                                        cfs_time_shift(usock_tuns.ut_timeout);
                                conn->uc_tx_flag = 1;
                                rc = usocklnd_add_pollrequest(conn,
                                                              POLL_SET_REQUEST,
                                                              POLLIN | POLLOUT);
                        }

                        if (rc == 0)
                                conn->uc_state = UC_READY;
                }
                pthread_mutex_unlock(&conn->uc_lock);
        }

        return rc;
}

/* All actions that we need after receiving hello on passive conn:
 * 1) Stash peer's nid, pid, incarnation and conn type
 * 2) Cope with easy case: conn[idx] is empty - just save conn there
 * 3) Resolve race if conn[idx] is occupied:
 *    a) if our nid is higher - reply with CONN_NONE and make us zombie
 *    b) if peer's nid is higher - postpone race resolution till
 *       READY state
 * 4) Anyhow, send reply hello */
int
usocklnd_passiveconn_hellorecv(usock_conn_t *conn)
{
        ksock_hello_msg_t *hello = conn->uc_rx_hello;
        int                type;
        int                idx;
        int                rc = 0;
        usock_peer_t      *peer;
        lnet_ni_t         *ni = conn->uc_ni;
        __u32              peer_ip = conn->uc_peer_ip;
        __u16              peer_port = conn->uc_peer_port;

        /* don't know parent peer yet and not zombie */
        LASSERT (conn->uc_peer == NULL &&
                 conn->uc_ni != NULL);

        /* don't know peer's nid and incarnation yet */
        if (peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
                /* do not trust liblustre clients */
                conn->uc_peerid.pid = peer_port | LNET_PID_USERFLAG;
                conn->uc_peerid.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
                                                 peer_ip);
                if (hello->kshm_ctype != SOCKLND_CONN_ANY) {
                        lnet_ni_decref(ni);
                        conn->uc_ni = NULL;
                        CERROR("Refusing to accept connection of type=%d from "
                               "userspace process %u.%u.%u.%u:%d\n",
                               hello->kshm_ctype,
                               HIPQUAD(peer_ip), peer_port);
                        return -EINVAL;
                }
        } else {
                conn->uc_peerid.pid = hello->kshm_src_pid;
                conn->uc_peerid.nid = hello->kshm_src_nid;
        }

        conn->uc_type = type = usocklnd_invert_type(hello->kshm_ctype);

        rc = usocklnd_find_or_create_peer(ni, conn->uc_peerid, &peer);
        if (rc)
                return rc;

        peer->up_last_alive = cfs_time_current();

        idx = usocklnd_type2idx(conn->uc_type);

        /* safely check whether we're first */
        pthread_mutex_lock(&peer->up_lock);

        usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, NULL);

        if (peer->up_conns[idx] == NULL) {
                peer->up_last_alive = cfs_time_current();
                conn->uc_peer = peer;
                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
        } else {
                usocklnd_peer_decref(peer);

                /* Resolve race in favour of higher NID */
                if (conn->uc_peerid.nid < conn->uc_ni->ni_nid) {
                        /* make us zombie */
                        conn->uc_ni = NULL;
                        type = SOCKLND_CONN_NONE;
                }

                /* if conn->uc_peerid.nid > conn->uc_ni->ni_nid,
                 * postpone race resolution till READY state
                 * (hopefully that conn[idx] will die because of
                 * incoming hello of CONN_NONE type) */
        }

        pthread_mutex_unlock(&peer->up_lock);

        /* allocate and initialize fake tx with hello */
        conn->uc_tx_hello = usocklnd_create_hello_tx(ni, type,
                                                     conn->uc_peerid.nid);
        if (conn->uc_ni == NULL) /* we became zombie above */
                lnet_ni_decref(ni);

        if (conn->uc_tx_hello == NULL)
                return -ENOMEM;

        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state == UC_DEAD)
                goto passive_hellorecv_done;

        conn->uc_state = UC_SENDING_HELLO;
        conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
        conn->uc_tx_flag = 1;
        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLOUT);

passive_hellorecv_done:
        pthread_mutex_unlock(&conn->uc_lock);
        return rc;
}
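
#if 0
/* Illustrative sketch (not part of the build) of the NID tie-break
 * used above: the connection initiated by the higher NID wins the
 * race, so the passive side replies SOCKLND_CONN_NONE (and goes
 * zombie) exactly when the peer's NID is the lower one.  The helper
 * name is hypothetical. */
static int
example_passive_conn_loses_race(lnet_nid_t my_nid, lnet_nid_t peer_nid)
{
        return peer_nid < my_nid; /* reply CONN_NONE, become zombie */
}
#endif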

int
usocklnd_write_handler(usock_conn_t *conn)
{
        usock_tx_t   *tx;
        int           rc = 0;
        int           ret;
        int           state;
        usock_peer_t *peer;
        lnet_ni_t    *ni;

        pthread_mutex_lock(&conn->uc_lock); /* like membar */
        state = conn->uc_state;
        pthread_mutex_unlock(&conn->uc_lock);

        switch (state) {
        case UC_CONNECTING:
                /* hello_tx has already been initialized
                 * in usocklnd_create_active_conn() */
                usocklnd_conn_new_state(conn, UC_SENDING_HELLO);
                /* fall through */

        case UC_SENDING_HELLO:
                rc = usocklnd_send_tx(conn, conn->uc_tx_hello);
                if (rc <= 0) /* error or partial send or connection closed */
                        break;

                /* tx with hello was sent successfully */
                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
                conn->uc_tx_hello = NULL;

                if (conn->uc_activeflag == 1) /* active conn */
                        rc = usocklnd_activeconn_hellosent(conn);
                else /* passive conn */
                        rc = usocklnd_passiveconn_hellosent(conn);

                break;

        case UC_READY:
                pthread_mutex_lock(&conn->uc_lock);

                peer = conn->uc_peer;
                LASSERT (peer != NULL);
                ni = peer->up_ni;

                if (list_empty(&conn->uc_tx_list) &&
                    list_empty(&conn->uc_zcack_list)) {
                        LASSERT(usock_tuns.ut_fair_limit > 1);
                        pthread_mutex_unlock(&conn->uc_lock);
                        return 0;
                }

                tx = usocklnd_try_piggyback(&conn->uc_tx_list,
                                            &conn->uc_zcack_list);
                if (tx != NULL)
                        conn->uc_sending = 1;
                else
                        rc = -ENOMEM;

                pthread_mutex_unlock(&conn->uc_lock);

                if (rc != 0) /* no memory for noop tx */
                        break;

                rc = usocklnd_send_tx(conn, tx);
                if (rc == 0) { /* partial send or connection closed */
                        pthread_mutex_lock(&conn->uc_lock);
                        list_add(&tx->tx_list, &conn->uc_tx_list);
                        conn->uc_sending = 0;
                        pthread_mutex_unlock(&conn->uc_lock);
                        break;
                }
                if (rc < 0) { /* real error */
                        usocklnd_destroy_tx(ni, tx);
                        break;
                }

                /* rc == 1: tx was sent completely */
                usocklnd_destroy_tx(ni, tx);

                pthread_mutex_lock(&conn->uc_lock);
                conn->uc_sending = 0;
                if (conn->uc_state != UC_DEAD &&
                    list_empty(&conn->uc_tx_list) &&
                    list_empty(&conn->uc_zcack_list)) {
                        conn->uc_tx_flag = 0;
                        ret = usocklnd_add_pollrequest(conn,
                                                       POLL_TX_SET_REQUEST, 0);
                        if (ret != 0)
                                rc = ret;
                }
                pthread_mutex_unlock(&conn->uc_lock);

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        if (rc < 0)
                usocklnd_conn_kill(conn);

        return rc;
}

/* Return the first tx from tx_list with piggybacked zc_ack
 * from zcack_list when possible. If tx_list is empty, return
 * brand new noop tx for zc_ack from zcack_list. Return NULL
 * if an error happened */
usock_tx_t *
usocklnd_try_piggyback(struct list_head *tx_list_p,
                       struct list_head *zcack_list_p)
{
        usock_tx_t     *tx;
        usock_zc_ack_t *zc_ack;

        /* assign tx and zc_ack */
        if (list_empty(tx_list_p))
                tx = NULL;
        else {
                tx = list_entry(tx_list_p->next, usock_tx_t, tx_list);
                list_del(&tx->tx_list);

                /* already piggybacked or partially sent */
                if (tx->tx_msg.ksm_zc_cookies[1] != 0 ||
                    tx->tx_resid != tx->tx_nob)
                        return tx;
        }

        if (list_empty(zcack_list_p)) {
                /* nothing to piggyback */
                return tx;
        } else {
                zc_ack = list_entry(zcack_list_p->next,
                                    usock_zc_ack_t, zc_list);
                list_del(&zc_ack->zc_list);
        }

        if (tx != NULL)
                /* piggyback the zc-ack cookie */
                tx->tx_msg.ksm_zc_cookies[1] = zc_ack->zc_cookie;
        else
                /* cannot piggyback, need noop */
                tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);

        LIBCFS_FREE (zc_ack, sizeof(*zc_ack));

        return tx;
}
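
/* Possible outcomes of usocklnd_try_piggyback(), as reconstructed from
 * the code above:
 * 1) tx pending, zcack_list empty: return the tx as is;
 * 2) tx pending, but already cookied or partially sent: return it as
 *    is, leaving any zc-ack queued;
 * 3) fresh tx plus pending zc-ack: piggyback the cookie in
 *    ksm_zc_cookies[1] and free the zc-ack;
 * 4) no tx pending: wrap the first zc-ack in a brand new noop tx
 *    (NULL on allocation failure). */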

/* All actions that we need after sending hello on active conn:
 * 1) update RX iov to receive hello
 * 2) state transition to UC_RECEIVING_HELLO
 * 3) notify poll_thread that we're waiting for incoming hello */
int
usocklnd_activeconn_hellosent(usock_conn_t *conn)
{
        int rc = 0;

        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_hellomagic_state_transition(conn);
                conn->uc_state = UC_RECEIVING_HELLO;
                conn->uc_tx_flag = 0;
                rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLIN);
        }

        pthread_mutex_unlock(&conn->uc_lock);

        return rc;
}

/* All actions that we need after sending hello on passive conn:
 * 1) Cope with 1st easy case: conn is already linked to a peer
 * 2) Cope with 2nd easy case: remove zombie conn
 * 3) Resolve race:
 *    a) find or create the peer
 *    b) link the conn to the peer if conn[idx] is empty
 *    c) if the conn[idx] isn't empty and is in READY state,
 *       remove the conn as duplicated
 *    d) if the conn[idx] isn't empty and isn't in READY state,
 *       override conn[idx] with the conn */
int
usocklnd_passiveconn_hellosent(usock_conn_t *conn)
{
        usock_conn_t    *conn2;
        usock_peer_t    *peer;
        struct list_head tx_list;
        struct list_head zcack_list;
        int              idx;
        int              rc = 0;

        /* almost nothing to do if conn is already linked to peer hash table */
        if (conn->uc_peer != NULL)
                goto passive_hellosent_done;

        /* conn->uc_peer == NULL, so the conn isn't accessible via
         * peer hash list, so nobody can touch the conn but us */

        if (conn->uc_ni == NULL) /* remove zombie conn */
                goto passive_hellosent_connkill;

        /* all code below is race resolution, because normally
         * passive conn is linked to peer just after receiving hello */
        CFS_INIT_LIST_HEAD (&tx_list);
        CFS_INIT_LIST_HEAD (&zcack_list);

        /* conn is passive and isn't linked to any peer,
           so its tx and zc_ack lists have to be empty */
        LASSERT (list_empty(&conn->uc_tx_list) &&
                 list_empty(&conn->uc_zcack_list) &&
                 conn->uc_sending == 0);

        rc = usocklnd_find_or_create_peer(conn->uc_ni, conn->uc_peerid, &peer);
        if (rc)
                return rc;

        idx = usocklnd_type2idx(conn->uc_type);

        /* try to link conn to peer */
        pthread_mutex_lock(&peer->up_lock);
        if (peer->up_conns[idx] == NULL) {
                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                conn->uc_peer = peer;
                usocklnd_peer_addref(peer);
        } else {
                conn2 = peer->up_conns[idx];
                pthread_mutex_lock(&conn2->uc_lock);

                if (conn2->uc_state == UC_READY) {
                        /* conn2 is in READY state, so conn is "duplicated" */
                        pthread_mutex_unlock(&conn2->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        usocklnd_peer_decref(peer);
                        goto passive_hellosent_connkill;
                }

                /* uc_state != UC_READY => switch conn and conn2 */
                /* Relink txs and zc_acks from conn2 to conn.
                 * We're sure that nobody but us can access the conn,
                 * nevertheless we take the mutexes (if we're wrong,
                 * a deadlock is easier to spot than a corrupted list) */
                list_add(&tx_list, &conn2->uc_tx_list);
                list_del_init(&conn2->uc_tx_list);
                list_add(&zcack_list, &conn2->uc_zcack_list);
                list_del_init(&conn2->uc_zcack_list);

                pthread_mutex_lock(&conn->uc_lock);
                list_add_tail(&conn->uc_tx_list, &tx_list);
                list_del_init(&tx_list);
                list_add_tail(&conn->uc_zcack_list, &zcack_list);
                list_del_init(&zcack_list);
                conn->uc_peer = peer;
                pthread_mutex_unlock(&conn->uc_lock);

                conn2->uc_peer = NULL; /* make conn2 zombie */
                pthread_mutex_unlock(&conn2->uc_lock);
                usocklnd_conn_decref(conn2);

                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                conn->uc_peer = peer;
                usocklnd_peer_addref(peer);
        }

        lnet_ni_decref(conn->uc_ni);
        conn->uc_ni = NULL;
        pthread_mutex_unlock(&peer->up_lock);
        usocklnd_peer_decref(peer);

passive_hellosent_done:
        /* safely transit to UC_READY state */
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_ksmhdr_state_transition(conn);

                /* we're ready to receive incoming packets and maybe
                   already have something to transmit */
                LASSERT (conn->uc_sending == 0);
                if ( list_empty(&conn->uc_tx_list) &&
                     list_empty(&conn->uc_zcack_list) ) {
                        conn->uc_tx_flag = 0;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN);
                } else {
                        conn->uc_tx_deadline =
                                cfs_time_shift(usock_tuns.ut_timeout);
                        conn->uc_tx_flag = 1;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN | POLLOUT);
                }

                if (rc == 0)
                        conn->uc_state = UC_READY;
        }
        pthread_mutex_unlock(&conn->uc_lock);
        return rc;

passive_hellosent_connkill:
        usocklnd_conn_kill(conn);
        return 0;
}

/* Send as much tx data as possible.
 * Returns 0 or 1 on success, <0 if fatal error.
 * 0 means partial send or non-fatal error, 1 - complete.
 * Rely on libcfs_sock_writev() for differentiating fatal and
 * non-fatal errors. An error should be considered as non-fatal if:
 * 1) it still makes sense to continue reading &&
 * 2) anyway, poll() will set up POLLHUP|POLLERR flags */
int
usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx)
{
        struct iovec *iov;
        int           nob;
        int           fd = conn->uc_fd;
        cfs_time_t    t;

        LASSERT (tx->tx_resid != 0);

        do {
                usock_peer_t *peer = conn->uc_peer;

                LASSERT (tx->tx_niov > 0);

                nob = libcfs_sock_writev(fd, tx->tx_iov, tx->tx_niov);
                if (nob < 0)
                        conn->uc_errored = 1;
                if (nob <= 0) /* write queue is flow-controlled or error */
                        return nob;

                LASSERT (nob <= tx->tx_resid);
                tx->tx_resid -= nob;
                t = cfs_time_current();
                conn->uc_tx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));

                if (peer != NULL)
                        peer->up_last_alive = t;

                /* "consume" iov */
                iov = tx->tx_iov;
                do {
                        LASSERT (tx->tx_niov > 0);

                        if (nob < iov->iov_len) {
                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                                iov->iov_len -= nob;
                                break;
                        }

                        nob -= iov->iov_len;
                        tx->tx_iov = ++iov;
                        tx->tx_niov--;
                } while (nob != 0);

        } while (tx->tx_resid != 0);

        return 1; /* send complete */
}
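
#if 0
/* Illustrative sketch (not part of the build): the iov bookkeeping
 * that both usocklnd_send_tx() above and usocklnd_read_data() below
 * perform after a partial transfer: consume 'nob' bytes from the
 * front of an iov array, adjusting the first partially-consumed entry
 * in place.  The helper name is hypothetical. */
static void
example_consume_iov(struct iovec **iovp, int *niovp, int nob)
{
        struct iovec *iov = *iovp;

        while (nob > 0) {
                LASSERT(*niovp > 0);

                if ((size_t)nob < iov->iov_len) {
                        /* partially consumed entry: advance its base */
                        iov->iov_base = (char *)iov->iov_base + nob;
                        iov->iov_len -= nob;
                        break;
                }

                /* fully consumed entry: drop it from the array */
                nob -= iov->iov_len;
                iov++;
                (*niovp)--;
        }

        *iovp = iov;
}
#endif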

/* Read from wire as much data as possible.
 * Returns 0 or 1 on success, <0 if error or EOF.
 * 0 means partial read, 1 - complete */
int
usocklnd_read_data(usock_conn_t *conn)
{
        struct iovec *iov;
        int           nob;
        cfs_time_t    t;

        LASSERT (conn->uc_rx_nob_wanted != 0);

        do {
                usock_peer_t *peer = conn->uc_peer;

                LASSERT (conn->uc_rx_niov > 0);

                nob = libcfs_sock_readv(conn->uc_fd, conn->uc_rx_iov, conn->uc_rx_niov);
                if (nob <= 0) { /* read nothing or error */
                        conn->uc_errored = 1;
                        return nob;
                }

                LASSERT (nob <= conn->uc_rx_nob_wanted);
                conn->uc_rx_nob_wanted -= nob;
                conn->uc_rx_nob_left -= nob;
                t = cfs_time_current();
                conn->uc_rx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));

                if (peer != NULL)
                        peer->up_last_alive = t;

                /* "consume" iov */
                iov = conn->uc_rx_iov;
                do {
                        LASSERT (conn->uc_rx_niov > 0);

                        if (nob < iov->iov_len) {
                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                                iov->iov_len -= nob;
                                break;
                        }

                        nob -= iov->iov_len;
                        conn->uc_rx_iov = ++iov;
                        conn->uc_rx_niov--;
                } while (nob != 0);

        } while (conn->uc_rx_nob_wanted != 0);

        return 1; /* read complete */
}