4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lnet/ulnds/socklnd/handlers.c
36 * Author: Maxim Patlasov <maxim@clusterfs.com>
41 #include <sys/syscall.h>
/* Drain one notification token from the notifier fd.
 * Called by the poll thread when the notifier descriptor becomes readable;
 * the value read is discarded — only the wakeup side effect matters.
 *
 * \param fd  readable end of the notifier pipe/socket
 * \return    result of the raw read syscall: bytes read (>0), 0 on EOF,
 *            or -1 with errno set on error.
 *
 * Fix: the original line was mis-encoded — "&not" in "&notification"
 * had been collapsed into the single character U+00AC (NOT SIGN).
 * Restored the address-of operator and the local receive buffer. */
int
usocklnd_notifier_handler(int fd)
{
        int notification;

        /* raw syscall(2) read, matching the original's SYS_read usage */
        return syscall(SYS_read, fd, &notification, sizeof(notification));
}
/* Handle a poll exception event on 'conn'.
 * Under uc_lock: connections that have not finished their handshake
 * (still connecting or still sending hello) are killed; anything
 * further along is left to the normal read/write handlers.
 * NOTE(review): this extract is missing lines (return type, braces,
 * final return); code lines below are left byte-identical. */
51 usocklnd_exception_handler(usock_conn_t *conn)
53 pthread_mutex_lock(&conn->uc_lock);
/* kill only conns that have not yet completed the hello exchange */
55 if (conn->uc_state == UC_CONNECTING ||
56 conn->uc_state == UC_SENDING_HELLO)
57 usocklnd_conn_kill_locked(conn);
59 pthread_mutex_unlock(&conn->uc_lock);
/* POLLIN handler: pull incoming bytes off the conn socket and advance
 * the RX state machine (hello handshake or ksock message stream).
 * NOTE(review): many lines are missing from this extract (declarations,
 * braces, returns, the enclosing loop/switch); code lines are left
 * byte-identical. */
63 usocklnd_read_handler(usock_conn_t *conn)
71 pthread_mutex_lock(&conn->uc_lock);
72 state = conn->uc_state;
74 /* process special case: LNET calls lnd_recv() asynchronously */
75 if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) {
76 /* still don't have usocklnd_recv() called */
77 rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, 0);
/* stop polling for POLLIN until usocklnd_recv() arrives */
79 conn->uc_rx_state = UC_RX_PARSE_WAIT;
/* presumably the add_pollrequest() error path — confirm against full source */
81 usocklnd_conn_kill_locked(conn);
83 pthread_mutex_unlock(&conn->uc_lock);
87 pthread_mutex_unlock(&conn->uc_lock);
88 /* From here and below the conn cannot be changed
89 * asynchronously, except:
90 * 1) usocklnd_send() can work with uc_tx_list and uc_zcack_list,
91 * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */
95 case UC_RECEIVING_HELLO:
97 if (conn->uc_rx_nob_wanted != 0) {
98 /* read from conn fd as much wanted data as possible */
99 rc = usocklnd_read_data(conn);
100 if (rc == 0) /* partial read */
102 if (rc < 0) {/* error happened or EOF */
103 usocklnd_conn_kill(conn);
108 /* process incoming data */
109 if (state == UC_READY )
110 rc = usocklnd_read_msg(conn, &continue_reading);
111 else /* state == UC_RECEIVING_HELLO */
112 rc = usocklnd_read_hello(conn, &continue_reading);
/* negative rc from either reader is fatal for the conn */
115 usocklnd_conn_kill(conn);
/* loop again while the reader reports more data may be pending */
119 if (continue_reading)
134 /* Switch on rx_state.
135 * Return 0 on success, 1 if whole packet is read, else return <0
136 * Always set cont_flag: 1 if we're ready to continue reading, else 0
137 * NB: If whole packet is read, cont_flag will be set to zero to take
NOTE(review): lines are missing throughout this extract (breaks, returns,
closing braces); code lines are left byte-identical.
141 usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
148 /* something new emerged in RX part - let's process it */
149 switch (conn->uc_rx_state) {
150 case UC_RX_KSM_HEADER:
/* byte-swap the ksock message header if the peer's endianness differs */
152 __swab32s(&conn->uc_rx_msg.ksm_type);
153 __swab32s(&conn->uc_rx_msg.ksm_csum);
154 __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[0]);
155 __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[1]);
158 /* we never send packets for which zc-acking is required */
159 if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
160 conn->uc_rx_msg.ksm_zc_cookies[1] != 0) {
161 conn->uc_errored = 1;
165 /* zc_req will be processed later, when
166 lnet payload will be received */
167 usocklnd_rx_lnethdr_state_transition(conn);
172 case UC_RX_LNET_HEADER:
173 if (the_lnet.ln_pid & LNET_PID_USERFLAG) {
174 /* replace dest_nid,pid (ksocknal sets its own) */
175 conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_nid =
176 cpu_to_le64(conn->uc_peer->up_ni->ni_nid);
177 conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid =
178 cpu_to_le32(the_lnet.ln_pid);
180 } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
182 lnet_process_id_t *id = &conn->uc_peer->up_peerid;
183 lnet_hdr_t *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr;
185 /* Substitute process ID assigned at connection time */
186 lhdr->src_pid = cpu_to_le32(id->pid);
187 lhdr->src_nid = cpu_to_le64(id->nid);
190 conn->uc_rx_state = UC_RX_PARSE;
191 usocklnd_conn_addref(conn); /* ++ref while parsing */
/* hand the header to LNET; it will call back into lnd_recv() */
193 rc = lnet_parse(conn->uc_peer->up_ni,
194 &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
195 conn->uc_peerid.nid, conn, 0);
198 /* I just received garbage: give up on this conn */
199 conn->uc_errored = 1;
200 usocklnd_conn_decref(conn);
204 /* Race with usocklnd_recv() is possible */
205 pthread_mutex_lock(&conn->uc_lock);
206 LASSERT (conn->uc_rx_state == UC_RX_PARSE ||
207 conn->uc_rx_state == UC_RX_LNET_PAYLOAD);
209 /* check whether usocklnd_recv() got called */
210 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD)
212 pthread_mutex_unlock(&conn->uc_lock);
216 LBUG(); /* it's error to be here, because this special
217 * case is handled by caller */
220 case UC_RX_PARSE_WAIT:
221 LBUG(); /* it's error to be here, because the conn
222 * shouldn't wait for POLLIN event in this
226 case UC_RX_LNET_PAYLOAD:
227 /* payload all received */
/* tell LNET the message completed successfully */
229 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);
231 cookie = conn->uc_rx_msg.ksm_zc_cookies[0];
/* presumably only when a nonzero zc cookie was received — confirm */
233 rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);
236 /* change state not to finalize twice */
237 conn->uc_rx_state = UC_RX_KSM_HEADER;
244 if (conn->uc_rx_nob_left != 0) {
/* trailing bytes to discard before the next ksock header */
245 usocklnd_rx_skipping_state_transition(conn);
248 usocklnd_rx_ksmhdr_state_transition(conn);
249 rc = 1; /* whole packet is read */
255 LBUG(); /* unknown state */
261 /* Handle incoming ZC request from sender.
262 * NB: it's called only from read_handler, so we're sure that
263 * the conn cannot become zombie in the middle of processing */
NOTE(review): lines are missing from this extract (declarations, braces,
return statements); code lines are left byte-identical.
265 usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie)
268 usock_zc_ack_t *zc_ack;
/* allocate the descriptor that carries the zc cookie back to the sender */
273 LIBCFS_ALLOC (zc_ack, sizeof(*zc_ack));
276 zc_ack->zc_cookie = cookie;
278 /* Let's assume that CONTROL is the best type for zcack,
279 * but userspace clients don't use typed connections */
280 if (the_lnet.ln_pid & LNET_PID_USERFLAG)
281 type = SOCKLND_CONN_ANY;
283 type = SOCKLND_CONN_CONTROL;
/* queue zc_ack on an existing or new conn; ownership transfers on success */
285 rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack,
/* presumably the failure path: free the unconsumed zc_ack — confirm */
288 LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
291 usocklnd_conn_decref(conn);
296 /* Switch on rx_state.
297 * Return 0 on success, else return <0
298 * Always set cont_flag: 1 if we're ready to continue reading, else 0
NOTE(review): lines are missing from this extract (magic/version error
paths, breaks, closing braces); code lines are left byte-identical.
301 usocklnd_read_hello(usock_conn_t *conn, int *cont_flag)
304 ksock_hello_msg_t *hello = conn->uc_rx_hello;
308 /* something new emerged in hello - let's process it */
309 switch (conn->uc_rx_state) {
310 case UC_RX_HELLO_MAGIC:
/* detect peer endianness from the magic value */
311 if (hello->kshm_magic == LNET_PROTO_MAGIC)
313 else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
318 usocklnd_rx_helloversion_state_transition(conn);
322 case UC_RX_HELLO_VERSION:
/* only KSOCK_PROTO_V2 (in either byte order) is acceptable */
323 if ((!conn->uc_flip &&
324 (hello->kshm_version != KSOCK_PROTO_V2)) ||
326 (hello->kshm_version != __swab32(KSOCK_PROTO_V2))))
329 usocklnd_rx_hellobody_state_transition(conn);
333 case UC_RX_HELLO_BODY:
335 ksock_hello_msg_t *hello = conn->uc_rx_hello;
/* byte-swap the hello body if the peer is opposite-endian */
336 __swab32s(&hello->kshm_src_pid);
337 __swab64s(&hello->kshm_src_nid);
338 __swab32s(&hello->kshm_dst_pid);
339 __swab64s(&hello->kshm_dst_nid);
340 __swab64s(&hello->kshm_src_incarnation);
341 __swab64s(&hello->kshm_dst_incarnation);
342 __swab32s(&hello->kshm_ctype);
343 __swab32s(&hello->kshm_nips);
346 if (conn->uc_rx_hello->kshm_nips > LNET_MAX_INTERFACES) {
347 CERROR("Bad nips %d from ip %u.%u.%u.%u port %d\n",
348 conn->uc_rx_hello->kshm_nips,
349 HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port);
/* a non-zero IP list means we still have to read the IPs payload */
353 if (conn->uc_rx_hello->kshm_nips) {
354 usocklnd_rx_helloIPs_state_transition(conn);
360 case UC_RX_HELLO_IPS:
361 if (conn->uc_activeflag == 1) /* active conn */
362 rc = usocklnd_activeconn_hellorecv(conn);
363 else /* passive conn */
364 rc = usocklnd_passiveconn_hellorecv(conn);
369 LBUG(); /* unknown state */
375 /* All actions that we need after receiving hello on active conn:
376 * 1) Schedule removing if we're zombie
377 * 2) Restart active conn if we lost the race
378 * 3) Else: update RX part to receive KSM header
NOTE(review): lines are missing from this extract (declarations, braces,
returns, error-path lines); code lines are left byte-identical.
381 usocklnd_activeconn_hellorecv(usock_conn_t *conn)
384 ksock_hello_msg_t *hello = conn->uc_rx_hello;
385 usock_peer_t *peer = conn->uc_peer;
387 /* Active conn with peer==NULL is zombie.
388 * Don't try to link it to peer because the conn
389 * has already had a chance to proceed at the beginning */
391 LASSERT(cfs_list_empty(&conn->uc_tx_list) &&
392 cfs_list_empty(&conn->uc_zcack_list));
394 usocklnd_conn_kill(conn);
398 peer->up_last_alive = cfs_time_current();
400 /* peer says that we lost the race */
401 if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
402 /* Start new active conn, relink txs and zc_acks from
403 * the conn to new conn, schedule removing the conn.
404 * Actually, we're expecting that a passive conn will
405 * make us zombie soon and take care of our txs and
408 cfs_list_t tx_list, zcack_list;
410 int idx = usocklnd_type2idx(conn->uc_type);
412 CFS_INIT_LIST_HEAD (&tx_list);
413 CFS_INIT_LIST_HEAD (&zcack_list);
415 /* Block usocklnd_send() to check peer->up_conns[idx]
416 * and to enqueue more txs */
417 pthread_mutex_lock(&peer->up_lock);
418 pthread_mutex_lock(&conn->uc_lock);
420 /* usocklnd_shutdown() could kill us */
421 if (conn->uc_state == UC_DEAD) {
422 pthread_mutex_unlock(&conn->uc_lock);
423 pthread_mutex_unlock(&peer->up_lock);
427 LASSERT (peer == conn->uc_peer);
428 LASSERT (peer->up_conns[idx] == conn);
430 rc = usocklnd_create_active_conn(peer, conn->uc_type, &conn2);
/* presumably the create-failure path — confirm against full source */
432 conn->uc_errored = 1;
433 pthread_mutex_unlock(&conn->uc_lock);
434 pthread_mutex_unlock(&peer->up_lock);
438 usocklnd_link_conn_to_peer(conn2, peer, idx);
439 conn2->uc_peer = peer;
441 /* unlink txs and zcack from the conn */
442 cfs_list_add(&tx_list, &conn->uc_tx_list);
443 cfs_list_del_init(&conn->uc_tx_list);
444 cfs_list_add(&zcack_list, &conn->uc_zcack_list);
445 cfs_list_del_init(&conn->uc_zcack_list);
447 /* link them to conn2 */
448 cfs_list_add(&conn2->uc_tx_list, &tx_list);
449 cfs_list_del_init(&tx_list);
450 cfs_list_add(&conn2->uc_zcack_list, &zcack_list);
451 cfs_list_del_init(&zcack_list);
453 /* make conn zombie */
454 conn->uc_peer = NULL;
455 usocklnd_peer_decref(peer);
457 /* schedule conn2 for processing */
458 rc = usocklnd_add_pollrequest(conn2, POLL_ADD_REQUEST, POLLOUT);
/* presumably the pollrequest-failure path: drop conn2 and kill conn */
460 peer->up_conns[idx] = NULL;
461 usocklnd_conn_decref(conn2); /* should destroy conn */
463 usocklnd_conn_kill_locked(conn);
466 pthread_mutex_unlock(&conn->uc_lock);
467 pthread_mutex_unlock(&peer->up_lock);
468 usocklnd_conn_decref(conn);
470 } else { /* hello->kshm_ctype != SOCKLND_CONN_NONE */
/* peer must answer with the mirror of our conn type */
471 if (conn->uc_type != usocklnd_invert_type(hello->kshm_ctype))
474 pthread_mutex_lock(&peer->up_lock);
475 usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation,
477 pthread_mutex_unlock(&peer->up_lock);
479 /* safely transit to UC_READY state */
481 pthread_mutex_lock(&conn->uc_lock);
482 if (conn->uc_state != UC_DEAD) {
483 usocklnd_rx_ksmhdr_state_transition(conn);
485 /* POLLIN is already set because we just
486 * received hello, but maybe we've smth. to
488 LASSERT (conn->uc_sending == 0);
489 if ( !cfs_list_empty(&conn->uc_tx_list) ||
490 !cfs_list_empty(&conn->uc_zcack_list) ) {
/* pending traffic: arm the tx timeout and request POLLOUT */
492 conn->uc_tx_deadline =
493 cfs_time_shift(usock_tuns.ut_timeout);
494 conn->uc_tx_flag = 1;
495 rc = usocklnd_add_pollrequest(conn,
501 conn->uc_state = UC_READY;
503 pthread_mutex_unlock(&conn->uc_lock);
509 /* All actions that we need after receiving hello on passive conn:
510 * 1) Stash peer's nid, pid, incarnation and conn type
511 * 2) Cope with easy case: conn[idx] is empty - just save conn there
513 * a) if our nid is higher - reply with CONN_NONE and make us zombie
514 * b) if peer's nid is higher - postpone race resolution till
516 * 4) Anyhow, send reply hello
NOTE(review): lines are missing from this extract (declarations, braces,
returns, several error paths); code lines are left byte-identical.
519 usocklnd_passiveconn_hellorecv(usock_conn_t *conn)
521 ksock_hello_msg_t *hello = conn->uc_rx_hello;
526 lnet_ni_t *ni = conn->uc_ni;
527 __u32 peer_ip = conn->uc_peer_ip;
528 __u16 peer_port = conn->uc_peer_port;
530 /* don't know parent peer yet and not zombie */
531 LASSERT (conn->uc_peer == NULL &&
534 /* don't know peer's nid and incarnation yet */
535 if (peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
536 /* do not trust liblustre clients */
537 conn->uc_peerid.pid = peer_port | LNET_PID_USERFLAG;
538 conn->uc_peerid.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
540 if (hello->kshm_ctype != SOCKLND_CONN_ANY) {
543 CERROR("Refusing to accept connection of type=%d from "
544 "userspace process %u.%u.%u.%u:%d\n", hello->kshm_ctype,
545 HIPQUAD(peer_ip), peer_port);
/* privileged-port peer: trust the identity carried in the hello */
549 conn->uc_peerid.pid = hello->kshm_src_pid;
550 conn->uc_peerid.nid = hello->kshm_src_nid;
552 conn->uc_type = type = usocklnd_invert_type(hello->kshm_ctype);
554 rc = usocklnd_find_or_create_peer(ni, conn->uc_peerid, &peer);
561 peer->up_last_alive = cfs_time_current();
563 idx = usocklnd_type2idx(conn->uc_type);
565 /* safely check whether we're first */
566 pthread_mutex_lock(&peer->up_lock);
568 usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, NULL);
570 if (peer->up_conns[idx] == NULL) {
571 peer->up_last_alive = cfs_time_current();
572 conn->uc_peer = peer;
574 usocklnd_link_conn_to_peer(conn, peer, idx);
575 usocklnd_conn_addref(conn);
/* a conn of this type already exists: resolve the connect race */
577 usocklnd_peer_decref(peer);
579 /* Resolve race in favour of higher NID */
580 if (conn->uc_peerid.nid < conn->uc_ni->ni_nid) {
/* we win: tell the peer to back off with a CONN_NONE reply */
583 type = SOCKLND_CONN_NONE;
586 /* if conn->uc_peerid.nid > conn->uc_ni->ni_nid,
587 * postpone race resolution till READY state
588 * (hopefully that conn[idx] will die because of
589 * incoming hello of CONN_NONE type) */
591 pthread_mutex_unlock(&peer->up_lock);
593 /* allocate and initialize fake tx with hello */
594 conn->uc_tx_hello = usocklnd_create_hello_tx(ni, type,
595 conn->uc_peerid.nid);
596 if (conn->uc_ni == NULL)
599 if (conn->uc_tx_hello == NULL)
603 pthread_mutex_lock(&conn->uc_lock);
604 if (conn->uc_state == UC_DEAD)
605 goto passive_hellorecv_done;
/* arm the reply-hello send: deadline, tx flag, and POLLOUT request */
607 conn->uc_state = UC_SENDING_HELLO;
608 conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
609 conn->uc_tx_flag = 1;
610 rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLOUT);
612 passive_hellorecv_done:
613 pthread_mutex_unlock(&conn->uc_lock);
/* POLLOUT handler: advance the TX side — send the hello during the
 * handshake, then drain uc_tx_list (with piggybacked zc-acks) once the
 * conn is ready.
 * NOTE(review): lines are missing from this extract (declarations,
 * switch header, braces, returns); code lines are left byte-identical. */
618 usocklnd_write_handler(usock_conn_t *conn)
627 pthread_mutex_lock(&conn->uc_lock); /* like membar */
628 state = conn->uc_state;
629 pthread_mutex_unlock(&conn->uc_lock);
633 /* hello_tx has already been initialized
634 * in usocklnd_create_active_conn() */
635 usocklnd_conn_new_state(conn, UC_SENDING_HELLO);
638 case UC_SENDING_HELLO:
639 rc = usocklnd_send_tx(conn, conn->uc_tx_hello);
640 if (rc <= 0) /* error or partial send or connection closed */
643 /* tx with hello was sent successfully */
644 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
645 conn->uc_tx_hello = NULL;
647 if (conn->uc_activeflag == 1) /* active conn */
648 rc = usocklnd_activeconn_hellosent(conn);
649 else /* passive conn */
650 rc = usocklnd_passiveconn_hellosent(conn);
655 pthread_mutex_lock(&conn->uc_lock);
657 peer = conn->uc_peer;
658 LASSERT (peer != NULL);
/* nothing queued: yield the poll slot (fairness) */
661 if (cfs_list_empty(&conn->uc_tx_list) &&
662 cfs_list_empty(&conn->uc_zcack_list)) {
663 LASSERT(usock_tuns.ut_fair_limit > 1);
664 pthread_mutex_unlock(&conn->uc_lock);
/* pick the next tx, folding a pending zc-ack into it when possible */
668 tx = usocklnd_try_piggyback(&conn->uc_tx_list,
669 &conn->uc_zcack_list);
671 conn->uc_sending = 1;
675 pthread_mutex_unlock(&conn->uc_lock);
680 rc = usocklnd_send_tx(conn, tx);
681 if (rc == 0) { /* partial send or connection closed */
/* requeue the unfinished tx for the next POLLOUT */
682 pthread_mutex_lock(&conn->uc_lock);
683 cfs_list_add(&tx->tx_list, &conn->uc_tx_list);
684 conn->uc_sending = 0;
685 pthread_mutex_unlock(&conn->uc_lock);
688 if (rc < 0) { /* real error */
689 usocklnd_destroy_tx(ni, tx);
693 /* rc == 1: tx was sent completely */
694 usocklnd_destroy_tx(ni, tx);
696 pthread_mutex_lock(&conn->uc_lock);
697 conn->uc_sending = 0;
/* queue drained on a live conn: stop polling for POLLOUT */
698 if (conn->uc_state != UC_DEAD &&
699 cfs_list_empty(&conn->uc_tx_list) &&
700 cfs_list_empty(&conn->uc_zcack_list)) {
701 conn->uc_tx_flag = 0;
702 ret = usocklnd_add_pollrequest(conn,
703 POLL_TX_SET_REQUEST, 0);
707 pthread_mutex_unlock(&conn->uc_lock);
719 usocklnd_conn_kill(conn);
724 /* Return the first tx from tx_list with piggybacked zc_ack
725 * from zcack_list when possible. If tx_list is empty, return
726 * brand new noop tx for zc_ack from zcack_list. Return NULL
727 * if an error happened */
NOTE(review): lines are missing from this extract (declarations, braces,
returns); code lines are left byte-identical.
729 usocklnd_try_piggyback(cfs_list_t *tx_list_p,
730 cfs_list_t *zcack_list_p)
733 usock_zc_ack_t *zc_ack;
735 /* assign tx and zc_ack */
736 if (cfs_list_empty(tx_list_p))
/* dequeue the head tx */
739 tx = cfs_list_entry(tx_list_p->next, usock_tx_t, tx_list);
740 cfs_list_del(&tx->tx_list);
742 /* already piggybacked or partially sent */
743 if (tx->tx_msg.ksm_zc_cookies[1] != 0 ||
744 tx->tx_resid != tx->tx_nob)
748 if (cfs_list_empty(zcack_list_p)) {
749 /* nothing to piggyback */
/* dequeue the head zc-ack */
752 zc_ack = cfs_list_entry(zcack_list_p->next,
753 usock_zc_ack_t, zc_list);
754 cfs_list_del(&zc_ack->zc_list);
758 /* piggyback the zc-ack cookie */
759 tx->tx_msg.ksm_zc_cookies[1] = zc_ack->zc_cookie;
761 /* cannot piggyback, need noop */
762 tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);
/* the cookie is now owned by the tx; free the ack descriptor */
764 LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
768 /* All actions that we need after sending hello on active conn:
769 * 1) update RX iov to receive hello
770 * 2) state transition to UC_RECEIVING_HELLO
771 * 3) notify poll_thread that we're waiting for incoming hello */
NOTE(review): lines are missing from this extract (return type, braces,
return statement); code lines are left byte-identical.
773 usocklnd_activeconn_hellosent(usock_conn_t *conn)
777 pthread_mutex_lock(&conn->uc_lock);
/* usocklnd_shutdown() may have marked us UC_DEAD meanwhile */
779 if (conn->uc_state != UC_DEAD) {
780 usocklnd_rx_hellomagic_state_transition(conn);
781 conn->uc_state = UC_RECEIVING_HELLO;
782 conn->uc_tx_flag = 0;
/* switch the poller from POLLOUT to POLLIN for the reply hello */
783 rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLIN);
786 pthread_mutex_unlock(&conn->uc_lock);
791 /* All actions that we need after sending hello on passive conn:
792 * 1) Cope with 1st easy case: conn is already linked to a peer
793 * 2) Cope with 2nd easy case: remove zombie conn
796 * b) link the conn to the peer if conn[idx] is empty
797 * c) if the conn[idx] isn't empty and is in READY state,
798 * remove the conn as duplicated
799 * d) if the conn[idx] isn't empty and isn't in READY state,
800 * override conn[idx] with the conn
NOTE(review): lines are missing from this extract (declarations, braces,
returns, error paths); code lines are left byte-identical.
803 usocklnd_passiveconn_hellosent(usock_conn_t *conn)
808 cfs_list_t zcack_list;
812 /* almost nothing to do if conn is already linked to peer hash table */
813 if (conn->uc_peer != NULL)
814 goto passive_hellosent_done;
816 /* conn->uc_peer == NULL, so the conn isn't accessible via
817 * peer hash list, so nobody can touch the conn but us */
819 if (conn->uc_ni == NULL) /* remove zombie conn */
820 goto passive_hellosent_connkill;
822 /* all code below is race resolution, because normally
823 * passive conn is linked to peer just after receiving hello */
824 CFS_INIT_LIST_HEAD (&tx_list);
825 CFS_INIT_LIST_HEAD (&zcack_list);
827 /* conn is passive and isn't linked to any peer,
828 so its tx and zc_ack lists have to be empty */
829 LASSERT (cfs_list_empty(&conn->uc_tx_list) &&
830 cfs_list_empty(&conn->uc_zcack_list) &&
831 conn->uc_sending == 0);
833 rc = usocklnd_find_or_create_peer(conn->uc_ni, conn->uc_peerid, &peer);
837 idx = usocklnd_type2idx(conn->uc_type);
839 /* try to link conn to peer */
840 pthread_mutex_lock(&peer->up_lock);
841 if (peer->up_conns[idx] == NULL) {
842 usocklnd_link_conn_to_peer(conn, peer, idx);
843 usocklnd_conn_addref(conn);
844 conn->uc_peer = peer;
845 usocklnd_peer_addref(peer);
/* slot occupied: inspect the rival conn2 */
847 conn2 = peer->up_conns[idx];
848 pthread_mutex_lock(&conn2->uc_lock);
850 if (conn2->uc_state == UC_READY) {
851 /* conn2 is in READY state, so conn is "duplicated" */
852 pthread_mutex_unlock(&conn2->uc_lock);
853 pthread_mutex_unlock(&peer->up_lock);
854 usocklnd_peer_decref(peer);
855 goto passive_hellosent_connkill;
858 /* uc_state != UC_READY => switch conn and conn2 */
859 /* Relink txs and zc_acks from conn2 to conn.
860 * We're sure that nobody but us can access to conn,
861 * nevertheless we use mutex (if we're wrong, a
862 * deadlock is easier to spot than a corrupted list) */
863 cfs_list_add(&tx_list, &conn2->uc_tx_list);
864 cfs_list_del_init(&conn2->uc_tx_list);
865 cfs_list_add(&zcack_list, &conn2->uc_zcack_list);
866 cfs_list_del_init(&conn2->uc_zcack_list);
868 pthread_mutex_lock(&conn->uc_lock);
869 cfs_list_add_tail(&conn->uc_tx_list, &tx_list);
870 cfs_list_del_init(&tx_list);
871 cfs_list_add_tail(&conn->uc_zcack_list, &zcack_list);
872 cfs_list_del_init(&zcack_list);
873 conn->uc_peer = peer;
874 pthread_mutex_unlock(&conn->uc_lock);
876 conn2->uc_peer = NULL; /* make conn2 zombie */
877 pthread_mutex_unlock(&conn2->uc_lock);
878 usocklnd_conn_decref(conn2);
880 usocklnd_link_conn_to_peer(conn, peer, idx);
881 usocklnd_conn_addref(conn);
882 conn->uc_peer = peer;
885 lnet_ni_decref(conn->uc_ni);
887 pthread_mutex_unlock(&peer->up_lock);
888 usocklnd_peer_decref(peer);
890 passive_hellosent_done:
891 /* safely transit to UC_READY state */
893 pthread_mutex_lock(&conn->uc_lock);
894 if (conn->uc_state != UC_DEAD) {
895 usocklnd_rx_ksmhdr_state_transition(conn);
897 /* we're ready to receive incoming packets and maybe
898 already have something to transmit */
899 LASSERT (conn->uc_sending == 0);
900 if ( cfs_list_empty(&conn->uc_tx_list) &&
901 cfs_list_empty(&conn->uc_zcack_list) ) {
/* nothing queued: poll for POLLIN only */
902 conn->uc_tx_flag = 0;
903 rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
/* pending traffic: arm tx timeout, poll for both directions */
906 conn->uc_tx_deadline =
907 cfs_time_shift(usock_tuns.ut_timeout);
908 conn->uc_tx_flag = 1;
909 rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
914 conn->uc_state = UC_READY;
916 pthread_mutex_unlock(&conn->uc_lock);
919 passive_hellosent_connkill:
920 usocklnd_conn_kill(conn);
924 /* Send as much tx data as possible.
925 * Returns 0 or 1 on success, <0 if fatal error.
926 * 0 means partial send or non-fatal error, 1 - complete.
927 * Rely on libcfs_sock_writev() for differentiating fatal and
928 * non-fatal errors. An error should be considered as non-fatal if:
929 * 1) it still makes sense to continue reading &&
930 * 2) anyway, poll() will set up POLLHUP|POLLERR flags */
NOTE(review): lines are missing from this extract (declarations, loop
header, iov-advance lines, braces); code lines are left byte-identical.
932 usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx)
938 LASSERT (tx->tx_resid != 0);
941 usock_peer_t *peer = conn->uc_peer;
943 LASSERT (tx->tx_niov > 0);
/* gathered write of the remaining iov fragments */
945 nob = libcfs_sock_writev(conn->uc_sock,
946 tx->tx_iov, tx->tx_niov);
948 conn->uc_errored = 1;
949 if (nob <= 0) /* write queue is flow-controlled or error */
952 LASSERT (nob <= tx->tx_resid);
/* progress was made: refresh the tx deadline and peer liveness */
954 t = cfs_time_current();
955 conn->uc_tx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));
958 peer->up_last_alive = t;
963 LASSERT (tx->tx_niov > 0);
/* consume 'nob' sent bytes from the iov: partial fragment case */
965 if (nob < iov->iov_len) {
966 iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
976 } while (tx->tx_resid != 0);
978 return 1; /* send complete */
981 /* Read from wire as much data as possible.
982 * Returns 0 or 1 on success, <0 if error or EOF.
983 * 0 means partial read, 1 - complete */
NOTE(review): lines are missing from this extract (declarations, loop
header, braces, a return); code lines are left byte-identical.
985 usocklnd_read_data(usock_conn_t *conn)
991 LASSERT (conn->uc_rx_nob_wanted != 0);
994 usock_peer_t *peer = conn->uc_peer;
996 LASSERT (conn->uc_rx_niov > 0);
/* scattered read into the remaining rx iov fragments */
998 nob = libcfs_sock_readv(conn->uc_sock,
999 conn->uc_rx_iov, conn->uc_rx_niov);
1000 if (nob <= 0) {/* read nothing or error */
1002 conn->uc_errored = 1;
1006 LASSERT (nob <= conn->uc_rx_nob_wanted);
/* account received bytes and refresh deadline/liveness */
1007 conn->uc_rx_nob_wanted -= nob;
1008 conn->uc_rx_nob_left -= nob;
1009 t = cfs_time_current();
1010 conn->uc_rx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));
1013 peer->up_last_alive = t;
/* consume 'nob' received bytes from the iov */
1016 iov = conn->uc_rx_iov;
1018 LASSERT (conn->uc_rx_niov > 0);
1020 if (nob < iov->iov_len) {
/* partial fragment: advance base pointer, shrink length */
1021 iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
1022 iov->iov_len -= nob;
/* whole fragment consumed: step to the next iov entry */
1026 nob -= iov->iov_len;
1027 conn->uc_rx_iov = ++iov;
1031 } while (conn->uc_rx_nob_wanted != 0);
1033 return 1; /* read complete */