/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 * Author: Maxim Patlasov <maxim@clusterfs.com>
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 */
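
/* Handler for the notifier fd: drain one pending notification token
 * with a raw read(2) syscall. Presumably other threads write to this
 * fd to wake the poller; here we only consume the wakeup. */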
int
usocklnd_notifier_handler(int fd)
{
        int notification;

        return syscall(SYS_read, fd, &notification, sizeof(notification));
}
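
/* Exception handler: called when poll() reports an exceptional event
 * (e.g. POLLERR/POLLHUP) on the conn's fd. A conn that is still
 * handshaking cannot be recovered, so kill it outright. */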
void
usocklnd_exception_handler(usock_conn_t *conn)
{
        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state == UC_CONNECTING ||
            conn->uc_state == UC_SENDING_HELLO)
                usocklnd_conn_kill_locked(conn);

        pthread_mutex_unlock(&conn->uc_lock);
}
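
/* Read handler: service a POLLIN event on the conn. Depending on
 * conn->uc_state it either pumps the hello handshake or the regular
 * ksock message stream, looping while whole messages keep arriving. */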
int
usocklnd_read_handler(usock_conn_t *conn)
{
        int rc = 0;
        int continue_reading;
        int state;

  read_again:
        pthread_mutex_lock(&conn->uc_lock);
        state = conn->uc_state;

        /* process special case: LNET calls lnd_recv() asynchronously */
        if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) {
                /* still don't have usocklnd_recv() called */
                rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, 0);
                if (rc == 0)
                        conn->uc_rx_state = UC_RX_PARSE_WAIT;
                else
                        usocklnd_conn_kill_locked(conn);

                pthread_mutex_unlock(&conn->uc_lock);
                return rc;
        }

        pthread_mutex_unlock(&conn->uc_lock);
        /* From here and below the conn cannot be changed
         * asynchronously, except:
         * 1) usocklnd_send() can work with uc_tx_list and uc_zcack_list,
         * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */

        switch (state) {

        case UC_READY:
        case UC_RECEIVING_HELLO:
                if (conn->uc_rx_nob_wanted != 0) {
                        /* read from conn fd as much wanted data as possible */
                        rc = usocklnd_read_data(conn);
                        if (rc == 0) /* partial read */
                                break;
                        if (rc < 0) { /* error happened or EOF */
                                usocklnd_conn_kill(conn);
                                break;
                        }
                }

                /* process incoming data */
                if (state == UC_READY)
                        rc = usocklnd_read_msg(conn, &continue_reading);
                else /* state == UC_RECEIVING_HELLO */
                        rc = usocklnd_read_hello(conn, &continue_reading);

                if (rc < 0) {
                        usocklnd_conn_kill(conn);
                        break;
                }

                if (continue_reading)
                        goto read_again;

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        return rc;
}

/* Switch on rx_state.
 * Return 0 on success, 1 if a whole packet was read, else return <0
 * Always set cont_flag: 1 if we're ready to continue reading, else 0
 * NB: If a whole packet was read, cont_flag is set to zero to give
 * other conns a chance to be processed */
int
usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
{
        int   rc = 0;
        __u64 cookie;

        *cont_flag = 0;

        /* something new emerged in the RX part - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_KSM_HEADER:
                if (conn->uc_flip) {
                        __swab32s(&conn->uc_rx_msg.ksm_type);
                        __swab32s(&conn->uc_rx_msg.ksm_csum);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_req_cookie);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_ack_cookie);
                }

                /* we never send packets for which zc-acking is required */
                if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
                    conn->uc_rx_msg.ksm_zc_ack_cookie != 0) {
                        conn->uc_errored = 1;
                        return -EPROTO;
                }

                /* zc_req will be processed later, when
                 * the lnet payload has been received */
                usocklnd_rx_lnethdr_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_LNET_HEADER:
                if (the_lnet.ln_pid & LNET_PID_USERFLAG) {
                        /* replace dest_nid,pid (ksocknal sets its own) */
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_nid =
                                cpu_to_le64(conn->uc_peer->up_ni->ni_nid);
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid =
                                cpu_to_le32(the_lnet.ln_pid);

                } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
                        /* the peer is a userspace process */
                        lnet_process_id_t *id = &conn->uc_peer->up_peerid;
                        lnet_hdr_t *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->uc_rx_state = UC_RX_PARSE;
                usocklnd_conn_addref(conn); /* ++ref while parsing */

                rc = lnet_parse(conn->uc_peer->up_ni,
                                &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->uc_peerid.nid, conn, 0);
                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        conn->uc_errored = 1;
                        usocklnd_conn_decref(conn);
                        return -EPROTO;
                }

                /* Race with usocklnd_recv() is possible */
                pthread_mutex_lock(&conn->uc_lock);
                LASSERT (conn->uc_rx_state == UC_RX_PARSE ||
                         conn->uc_rx_state == UC_RX_LNET_PAYLOAD);

                /* check whether usocklnd_recv() got called */
                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD)
                        *cont_flag = 1;
                pthread_mutex_unlock(&conn->uc_lock);
                break;

        case UC_RX_PARSE:
                LBUG(); /* it's an error to be here, because this special
                         * case is handled by the caller */
                break;

        case UC_RX_PARSE_WAIT:
                LBUG(); /* it's an error to be here, because the conn
                         * shouldn't wait for a POLLIN event in this
                         * state */
                break;

        case UC_RX_LNET_PAYLOAD:
                /* payload all received */

                lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);

                cookie = conn->uc_rx_msg.ksm_zc_req_cookie;
                if (cookie != 0)
                        rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);

                if (rc != 0) {
                        /* change state not to finalize twice */
                        conn->uc_rx_state = UC_RX_KSM_HEADER;
                        return rc;
                }

                if (conn->uc_rx_nob_left != 0) {
                        /* skip the rest of the packet */
                        usocklnd_rx_skipping_state_transition(conn);
                        *cont_flag = 1;
                } else {
                        usocklnd_rx_ksmhdr_state_transition(conn);
                        rc = 1; /* whole packet is read */
                }
                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}
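
/* RX state machine summary (reconstructed from the cases above as a
 * reading aid, not part of the original source):
 *
 *   UC_RX_KSM_HEADER   -> UC_RX_LNET_HEADER  (KSOCK_MSG_LNET arrived)
 *   UC_RX_LNET_HEADER  -> UC_RX_PARSE        (header handed to lnet_parse())
 *   UC_RX_PARSE        -> UC_RX_LNET_PAYLOAD (usocklnd_recv() supplied iovs)
 *   UC_RX_PARSE        -> UC_RX_PARSE_WAIT   (lnd_recv() not called yet)
 *   UC_RX_LNET_PAYLOAD -> UC_RX_KSM_HEADER   (payload finalized or skipped)
 */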

/* Handle an incoming ZC request from the sender.
 * NB: this is called only from the read_handler, so we're sure that
 * the conn cannot become a zombie in the middle of processing */
int
usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie)
{
        usock_conn_t *conn;
        usock_zc_ack_t *zc_ack;
        int type;
        int rc;
        int dummy;

        LIBCFS_ALLOC (zc_ack, sizeof(*zc_ack));
        if (zc_ack == NULL)
                return -ENOMEM;
        zc_ack->zc_cookie = cookie;

        /* Let's assume that CONTROL is the best type for the zc_ack,
         * but userspace clients don't use typed connections */
        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
                type = SOCKLND_CONN_ANY;
        else
                type = SOCKLND_CONN_CONTROL;

        rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack,
                                          &dummy);
        if (rc != 0) {
                LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
                return rc;
        }
        usocklnd_conn_decref(conn);

        return 0;
}
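
/* NB: the zc_ack allocated above is handed to
 * usocklnd_find_or_create_conn() for enqueueing on a conn's
 * uc_zcack_list; it is consumed later by usocklnd_try_piggyback()
 * on the TX path (see below). */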

/* Switch on rx_state.
 * Return 0 on success, else return <0
 * Always set cont_flag: 1 if we're ready to continue reading, else 0
 */
int
usocklnd_read_hello(usock_conn_t *conn, int *cont_flag)
{
        int rc = 0;
        ksock_hello_msg_t *hello = conn->uc_rx_hello;

        *cont_flag = 0;

        /* something new emerged in hello - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_HELLO_MAGIC:
                if (hello->kshm_magic == LNET_PROTO_MAGIC)
                        conn->uc_flip = 0;
                else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
                        conn->uc_flip = 1;
                else
                        return -EPROTO;

                usocklnd_rx_helloversion_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_HELLO_VERSION:
                if ((!conn->uc_flip &&
                     (hello->kshm_version != KSOCK_PROTO_V2)) ||
                    (conn->uc_flip &&
                     (hello->kshm_version != __swab32(KSOCK_PROTO_V2))))
                        return -EPROTO;

                usocklnd_rx_hellobody_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_HELLO_BODY:
                if (conn->uc_flip) {
                        ksock_hello_msg_t *hello = conn->uc_rx_hello;
                        __swab32s(&hello->kshm_src_pid);
                        __swab64s(&hello->kshm_src_nid);
                        __swab32s(&hello->kshm_dst_pid);
                        __swab64s(&hello->kshm_dst_nid);
                        __swab64s(&hello->kshm_src_incarnation);
                        __swab64s(&hello->kshm_dst_incarnation);
                        __swab32s(&hello->kshm_ctype);
                        __swab32s(&hello->kshm_nips);
                }

                if (conn->uc_rx_hello->kshm_nips > LNET_MAX_INTERFACES) {
                        CERROR("Bad nips %d from ip %u.%u.%u.%u port %d\n",
                               conn->uc_rx_hello->kshm_nips,
                               HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port);
                        return -EPROTO;
                }

                if (conn->uc_rx_hello->kshm_nips) {
                        usocklnd_rx_helloIPs_state_transition(conn);
                        *cont_flag = 1;
                        break;
                }
                /* fall through */

        case UC_RX_HELLO_IPS:
                if (conn->uc_activeflag == 1) /* active conn */
                        rc = usocklnd_activeconn_hellorecv(conn);
                else /* passive conn */
                        rc = usocklnd_passiveconn_hellorecv(conn);

                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}
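
/* Hello RX state machine summary (a reading aid): MAGIC -> VERSION ->
 * BODY -> [IPS ->] usocklnd_{active,passive}conn_hellorecv(), with
 * conn->uc_flip recording whether the peer's byte order differs. */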

/* All actions that we need after receiving hello on an active conn:
 * 1) Schedule its removal if we're a zombie
 * 2) Restart the active conn if we lost the race
 * 3) Else: update the RX part to receive a KSM header
 */
int
usocklnd_activeconn_hellorecv(usock_conn_t *conn)
{
        int rc = 0;
        ksock_hello_msg_t *hello = conn->uc_rx_hello;
        usock_peer_t *peer = conn->uc_peer;

        /* An active conn with peer==NULL is a zombie.
         * Don't try to link it to a peer because the conn
         * has already had a chance to proceed at the beginning */
        if (peer == NULL) {
                LASSERT(list_empty(&conn->uc_tx_list) &&
                        list_empty(&conn->uc_zcack_list));

                usocklnd_conn_kill(conn);
                return 0;
        }

        peer->up_last_alive = cfs_time_current();

        /* peer says that we lost the race */
        if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
                /* Start a new active conn, relink txs and zc_acks from
                 * the conn to the new conn, schedule removing the conn.
                 * Actually, we're expecting that a passive conn will
                 * make us a zombie soon and take care of our txs and
                 * zc_acks */

                struct list_head tx_list, zcack_list;
                usock_conn_t *conn2;
                int idx = usocklnd_type2idx(conn->uc_type);

                CFS_INIT_LIST_HEAD (&tx_list);
                CFS_INIT_LIST_HEAD (&zcack_list);

                /* Block usocklnd_send() to check peer->up_conns[idx]
                 * and to enqueue more txs */
                pthread_mutex_lock(&peer->up_lock);
                pthread_mutex_lock(&conn->uc_lock);

                /* usocklnd_shutdown() could kill us */
                if (conn->uc_state == UC_DEAD) {
                        pthread_mutex_unlock(&conn->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        return 0;
                }

                LASSERT (peer == conn->uc_peer);
                LASSERT (peer->up_conns[idx] == conn);

                rc = usocklnd_create_active_conn(peer, conn->uc_type, &conn2);
                if (rc != 0) {
                        conn->uc_errored = 1;
                        pthread_mutex_unlock(&conn->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        return rc;
                }

                usocklnd_link_conn_to_peer(conn2, peer, idx);
                conn2->uc_peer = peer;

                /* unlink txs and zcack from the conn */
                list_add(&tx_list, &conn->uc_tx_list);
                list_del_init(&conn->uc_tx_list);
                list_add(&zcack_list, &conn->uc_zcack_list);
                list_del_init(&conn->uc_zcack_list);

                /* link them to conn2 */
                list_add(&conn2->uc_tx_list, &tx_list);
                list_del_init(&tx_list);
                list_add(&conn2->uc_zcack_list, &zcack_list);
                list_del_init(&zcack_list);

                /* make conn zombie */
                conn->uc_peer = NULL;
                usocklnd_peer_decref(peer);

                /* schedule conn2 for processing */
                rc = usocklnd_add_pollrequest(conn2, POLL_ADD_REQUEST, POLLOUT);
                if (rc != 0) {
                        peer->up_conns[idx] = NULL;
                        usocklnd_conn_decref(conn2); /* should destroy conn2 */
                } else {
                        usocklnd_conn_kill_locked(conn);
                }

                pthread_mutex_unlock(&conn->uc_lock);
                pthread_mutex_unlock(&peer->up_lock);
                usocklnd_conn_decref(conn);

        } else { /* hello->kshm_ctype != SOCKLND_CONN_NONE */
                if (conn->uc_type != usocklnd_invert_type(hello->kshm_ctype))
                        return -EPROTO;

                pthread_mutex_lock(&peer->up_lock);
                usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation,
                                             conn);
                pthread_mutex_unlock(&peer->up_lock);

                /* safely transit to UC_READY state */
                pthread_mutex_lock(&conn->uc_lock);
                if (conn->uc_state != UC_DEAD) {
                        usocklnd_rx_ksmhdr_state_transition(conn);

                        /* POLLIN is already set because we just
                         * received hello, but maybe we have something
                         * to transmit too */
                        LASSERT (conn->uc_sending == 0);
                        if (!list_empty(&conn->uc_tx_list) ||
                            !list_empty(&conn->uc_zcack_list)) {

                                conn->uc_tx_deadline =
                                        cfs_time_shift(usock_tuns.ut_timeout);
                                conn->uc_tx_flag = 1;
                                rc = usocklnd_add_pollrequest(conn,
                                                              POLL_SET_REQUEST,
                                                              POLLIN | POLLOUT);
                        }

                        if (rc == 0)
                                conn->uc_state = UC_READY;
                }
                pthread_mutex_unlock(&conn->uc_lock);
        }

        return rc;
}
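
/* NB: the active/passive connection race above and below is resolved
 * by comparing NIDs: the connection initiated by the higher NID wins,
 * and the loser is answered with a hello of type SOCKLND_CONN_NONE. */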

/* All actions that we need after receiving hello on a passive conn:
 * 1) Stash peer's nid, pid, incarnation and conn type
 * 2) Cope with easy case: conn[idx] is empty - just save conn there
 * 3) Resolve race:
 *    a) if our nid is higher - reply with CONN_NONE and make us zombie
 *    b) if peer's nid is higher - postpone race resolution till
 *       READY state
 * 4) Anyhow, send reply hello
 */
int
usocklnd_passiveconn_hellorecv(usock_conn_t *conn)
{
        ksock_hello_msg_t *hello = conn->uc_rx_hello;
        int type;
        int idx;
        int rc;
        usock_peer_t *peer;
        lnet_ni_t *ni = conn->uc_ni;
        __u32 peer_ip = conn->uc_peer_ip;
        __u16 peer_port = conn->uc_peer_port;

        /* we don't know the parent peer yet and we aren't a zombie */
        LASSERT (conn->uc_peer == NULL &&
                 ni != NULL);

        /* don't know peer's nid and incarnation yet */
        if (peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
                /* do not trust liblustre clients */
                conn->uc_peerid.pid = peer_port | LNET_PID_USERFLAG;
                conn->uc_peerid.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
                                                 peer_ip);
                if (hello->kshm_ctype != SOCKLND_CONN_ANY) {
                        lnet_ni_decref(ni);
                        conn->uc_ni = NULL;
                        CERROR("Refusing to accept connection of type=%d from "
                               "userspace process %u.%u.%u.%u:%d\n",
                               hello->kshm_ctype,
                               HIPQUAD(peer_ip), peer_port);
                        return -EINVAL;
                }
        } else {
                conn->uc_peerid.pid = hello->kshm_src_pid;
                conn->uc_peerid.nid = hello->kshm_src_nid;
        }

        conn->uc_type = type = usocklnd_invert_type(hello->kshm_ctype);

        rc = usocklnd_find_or_create_peer(ni, conn->uc_peerid, &peer);
        if (rc) {
                lnet_ni_decref(ni);
                conn->uc_ni = NULL;
                return rc;
        }

        peer->up_last_alive = cfs_time_current();

        idx = usocklnd_type2idx(conn->uc_type);

        /* safely check whether we're first */
        pthread_mutex_lock(&peer->up_lock);

        usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, NULL);

        if (peer->up_conns[idx] == NULL) {
                peer->up_last_alive = cfs_time_current();
                conn->uc_peer = peer;

                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
        } else {
                usocklnd_peer_decref(peer);

                /* Resolve race in favour of higher NID */
                if (conn->uc_peerid.nid < conn->uc_ni->ni_nid) {
                        /* reply with CONN_NONE: the peer should restart
                         * its active conn and make us a zombie */
                        type = SOCKLND_CONN_NONE;
                }

                /* if conn->uc_peerid.nid > conn->uc_ni->ni_nid,
                 * postpone race resolution till READY state
                 * (hopefully conn[idx] will die because of an
                 * incoming hello of CONN_NONE type) */
        }

        pthread_mutex_unlock(&peer->up_lock);

        /* allocate and initialize fake tx with hello */
        conn->uc_tx_hello = usocklnd_create_hello_tx(ni, type,
                                                     conn->uc_peerid.nid);
        if (conn->uc_ni == NULL)
                return -ENETDOWN;

        if (conn->uc_tx_hello == NULL)
                return -ENOMEM;

        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state == UC_DEAD)
                goto passive_hellorecv_done;

        conn->uc_state = UC_SENDING_HELLO;
        conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
        conn->uc_tx_flag = 1;
        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLOUT);

  passive_hellorecv_done:
        pthread_mutex_unlock(&conn->uc_lock);

        return rc;
}
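
/* Write handler: service a POLLOUT event on the conn. Depending on
 * conn->uc_state it either pushes the hello tx of the handshake or
 * drains the regular tx queue; fairness between conns relies on
 * usock_tuns.ut_fair_limit (see the UC_READY case). */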
int
usocklnd_write_handler(usock_conn_t *conn)
{
        usock_tx_t *tx;
        lnet_ni_t *ni;
        usock_peer_t *peer;
        int rc = 0;
        int ret;
        int state;

        pthread_mutex_lock(&conn->uc_lock); /* like membar */
        state = conn->uc_state;
        pthread_mutex_unlock(&conn->uc_lock);

        switch (state) {
        case UC_CONNECTING:
                /* hello_tx has already been initialized
                 * in usocklnd_create_active_conn() */
                usocklnd_conn_new_state(conn, UC_SENDING_HELLO);
                /* fall through */

        case UC_SENDING_HELLO:
                rc = usocklnd_send_tx(conn, conn->uc_tx_hello);
                if (rc <= 0) /* error or partial send or connection closed */
                        break;

                /* tx with hello was sent successfully */
                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
                conn->uc_tx_hello = NULL;

                if (conn->uc_activeflag == 1) /* active conn */
                        rc = usocklnd_activeconn_hellosent(conn);
                else /* passive conn */
                        rc = usocklnd_passiveconn_hellosent(conn);

                break;

        case UC_READY:
                pthread_mutex_lock(&conn->uc_lock);

                peer = conn->uc_peer;
                LASSERT (peer != NULL);
                ni = peer->up_ni;

                if (list_empty(&conn->uc_tx_list) &&
                    list_empty(&conn->uc_zcack_list)) {
                        LASSERT(usock_tuns.ut_fair_limit > 1);
                        pthread_mutex_unlock(&conn->uc_lock);
                        return 0;
                }

                tx = usocklnd_try_piggyback(&conn->uc_tx_list,
                                            &conn->uc_zcack_list);
                if (tx != NULL)
                        conn->uc_sending = 1;
                else
                        rc = -ENOMEM;

                pthread_mutex_unlock(&conn->uc_lock);

                if (rc != 0)
                        break;

                rc = usocklnd_send_tx(conn, tx);
                if (rc == 0) { /* partial send or connection closed */
                        pthread_mutex_lock(&conn->uc_lock);
                        list_add(&tx->tx_list, &conn->uc_tx_list);
                        conn->uc_sending = 0;
                        pthread_mutex_unlock(&conn->uc_lock);
                        break;
                }
                if (rc < 0) { /* real error */
                        usocklnd_destroy_tx(ni, tx);
                        break;
                }

                /* rc == 1: tx was sent completely */
                usocklnd_destroy_tx(ni, tx);

                pthread_mutex_lock(&conn->uc_lock);
                conn->uc_sending = 0;
                if (conn->uc_state != UC_DEAD &&
                    list_empty(&conn->uc_tx_list) &&
                    list_empty(&conn->uc_zcack_list)) {
                        conn->uc_tx_flag = 0;
                        ret = usocklnd_add_pollrequest(conn,
                                                       POLL_TX_SET_REQUEST, 0);
                        if (ret != 0)
                                rc = ret;
                }
                pthread_mutex_unlock(&conn->uc_lock);

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        if (rc < 0)
                usocklnd_conn_kill(conn);

        return rc;
}

/* Return the first tx from tx_list with a zc_ack from zcack_list
 * piggybacked on it when possible. If tx_list is empty, return a
 * brand new noop tx for a zc_ack from zcack_list. Return NULL
 * if an error happened */
usock_tx_t *
usocklnd_try_piggyback(struct list_head *tx_list_p,
                       struct list_head *zcack_list_p)
{
        usock_tx_t *tx;
        usock_zc_ack_t *zc_ack;

        /* assign tx and zc_ack */
        if (list_empty(tx_list_p))
                tx = NULL;
        else {
                tx = list_entry(tx_list_p->next, usock_tx_t, tx_list);
                list_del(&tx->tx_list);

                /* already piggybacked or partially sent */
                if (tx->tx_msg.ksm_zc_ack_cookie ||
                    tx->tx_resid != tx->tx_nob)
                        return tx;
        }

        if (list_empty(zcack_list_p)) {
                /* nothing to piggyback */
                return tx;
        } else {
                zc_ack = list_entry(zcack_list_p->next,
                                    usock_zc_ack_t, zc_list);
                list_del(&zc_ack->zc_list);
        }

        if (tx != NULL)
                /* piggyback the zc-ack cookie */
                tx->tx_msg.ksm_zc_ack_cookie = zc_ack->zc_cookie;
        else
                /* cannot piggyback, need a noop tx */
                tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);

        LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
        return tx;
}
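
/* NB: a noop tx carries no lnet payload; its only job is to deliver
 * a zc-ack cookie when there is no outgoing message to piggyback it
 * on. */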

/* All actions that we need after sending hello on an active conn:
 * 1) update the RX iov to receive hello
 * 2) state transition to UC_RECEIVING_HELLO
 * 3) notify the poll thread that we're waiting for an incoming hello */
int
usocklnd_activeconn_hellosent(usock_conn_t *conn)
{
        int rc = 0;

        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_hellomagic_state_transition(conn);
                conn->uc_state = UC_RECEIVING_HELLO;
                conn->uc_tx_flag = 0;
                rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLIN);
        }

        pthread_mutex_unlock(&conn->uc_lock);

        return rc;
}

/* All actions that we need after sending hello on a passive conn:
 * 1) Cope with 1st easy case: conn is already linked to a peer
 * 2) Cope with 2nd easy case: remove zombie conn
 * 3) Resolve race:
 *    a) find or create a peer
 *    b) link the conn to the peer if conn[idx] is empty
 *    c) if the conn[idx] isn't empty and is in READY state,
 *       remove the conn as duplicated
 *    d) if the conn[idx] isn't empty and isn't in READY state,
 *       override conn[idx] with the conn
 */
int
usocklnd_passiveconn_hellosent(usock_conn_t *conn)
{
        usock_conn_t *conn2;
        usock_peer_t *peer;
        struct list_head tx_list;
        struct list_head zcack_list;
        int idx;
        int rc = 0;

        /* almost nothing to do if conn is already linked to peer hash table */
        if (conn->uc_peer != NULL)
                goto passive_hellosent_done;

        /* conn->uc_peer == NULL, so the conn isn't accessible via
         * the peer hash list, so nobody can touch the conn but us */

        if (conn->uc_ni == NULL) /* remove zombie conn */
                goto passive_hellosent_connkill;

        /* all code below is race resolution, because normally
         * a passive conn is linked to a peer just after receiving hello */
        CFS_INIT_LIST_HEAD (&tx_list);
        CFS_INIT_LIST_HEAD (&zcack_list);

        /* conn is passive and isn't linked to any peer,
         * so its tx and zc_ack lists have to be empty */
        LASSERT (list_empty(&conn->uc_tx_list) &&
                 list_empty(&conn->uc_zcack_list) &&
                 conn->uc_sending == 0);

        rc = usocklnd_find_or_create_peer(conn->uc_ni, conn->uc_peerid, &peer);
        if (rc)
                return rc;

        idx = usocklnd_type2idx(conn->uc_type);

        /* try to link conn to peer */
        pthread_mutex_lock(&peer->up_lock);
        if (peer->up_conns[idx] == NULL) {
                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                conn->uc_peer = peer;
                usocklnd_peer_addref(peer);
        } else {
                conn2 = peer->up_conns[idx];
                pthread_mutex_lock(&conn2->uc_lock);

                if (conn2->uc_state == UC_READY) {
                        /* conn2 is in READY state, so conn is "duplicated" */
                        pthread_mutex_unlock(&conn2->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        usocklnd_peer_decref(peer);
                        goto passive_hellosent_connkill;
                }

                /* uc_state != UC_READY => switch conn and conn2 */
                /* Relink txs and zc_acks from conn2 to conn.
                 * We're sure that nobody but us can access the conn;
                 * nevertheless we take the mutex - if we're wrong,
                 * a deadlock is easier to spot than a corrupted list */
                list_add(&tx_list, &conn2->uc_tx_list);
                list_del_init(&conn2->uc_tx_list);
                list_add(&zcack_list, &conn2->uc_zcack_list);
                list_del_init(&conn2->uc_zcack_list);

                pthread_mutex_lock(&conn->uc_lock);
                list_add_tail(&conn->uc_tx_list, &tx_list);
                list_del_init(&tx_list);
                list_add_tail(&conn->uc_zcack_list, &zcack_list);
                list_del_init(&zcack_list);
                conn->uc_peer = peer;
                pthread_mutex_unlock(&conn->uc_lock);

                conn2->uc_peer = NULL; /* make conn2 zombie */
                pthread_mutex_unlock(&conn2->uc_lock);
                usocklnd_conn_decref(conn2);

                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                conn->uc_peer = peer;
        }

        lnet_ni_decref(conn->uc_ni);
        conn->uc_ni = NULL;
        pthread_mutex_unlock(&peer->up_lock);
        usocklnd_peer_decref(peer);

  passive_hellosent_done:
        /* safely transit to UC_READY state */
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_ksmhdr_state_transition(conn);

                /* we're ready to receive incoming packets and maybe
                 * already have something to transmit */
                LASSERT (conn->uc_sending == 0);
                if (list_empty(&conn->uc_tx_list) &&
                    list_empty(&conn->uc_zcack_list)) {
                        conn->uc_tx_flag = 0;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN);
                } else {
                        conn->uc_tx_deadline =
                                cfs_time_shift(usock_tuns.ut_timeout);
                        conn->uc_tx_flag = 1;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN | POLLOUT);
                }

                if (rc == 0)
                        conn->uc_state = UC_READY;
        }
        pthread_mutex_unlock(&conn->uc_lock);

        return rc;

  passive_hellosent_connkill:
        usocklnd_conn_kill(conn);
        return 0;
}

/* Send as much tx data as possible.
 * Returns 0 or 1 on success, <0 on fatal error.
 * 0 means partial send or non-fatal error, 1 - complete send.
 * Rely on libcfs_sock_writev() for differentiating fatal and
 * non-fatal errors. An error should be considered non-fatal if:
 * 1) it still makes sense to continue reading &&
 * 2) poll() will set up the POLLHUP|POLLERR flags anyway */
int
usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx)
{
        struct iovec *iov;
        int nob;
        int fd = conn->uc_fd;
        cfs_time_t t;

        LASSERT (tx->tx_resid != 0);

        do {
                usock_peer_t *peer = conn->uc_peer;

                LASSERT (tx->tx_niov > 0);

                nob = libcfs_sock_writev(fd, tx->tx_iov, tx->tx_niov);
                if (nob < 0)
                        conn->uc_errored = 1;
                if (nob <= 0) /* write queue is flow-controlled or error */
                        return nob;

                LASSERT (nob <= tx->tx_resid);
                tx->tx_resid -= nob;
                t = cfs_time_current();
                conn->uc_tx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));

                if (peer != NULL)
                        peer->up_last_alive = t;

                /* "consume" the iov */
                iov = tx->tx_iov;
                do {
                        LASSERT (tx->tx_niov > 0);

                        if (nob < iov->iov_len) {
                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                                iov->iov_len -= nob;
                                break;
                        }

                        nob -= iov->iov_len;
                        tx->tx_iov = ++iov;
                        tx->tx_niov--;
                } while (nob != 0);

        } while (tx->tx_resid != 0);

        return 1; /* send complete */
}
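
/* NB: the return convention above maps directly onto the UC_READY
 * logic in usocklnd_write_handler(): 0 requeues the tx (partial
 * send), <0 destroys it and kills the conn, 1 frees it and clears
 * POLLOUT once the queues are drained. */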

/* Read from the wire as much data as possible.
 * Returns 0 or 1 on success, <0 on error or EOF.
 * 0 means partial read, 1 - complete */
int
usocklnd_read_data(usock_conn_t *conn)
{
        struct iovec *iov;
        int nob;
        cfs_time_t t;

        LASSERT (conn->uc_rx_nob_wanted != 0);

        do {
                usock_peer_t *peer = conn->uc_peer;

                LASSERT (conn->uc_rx_niov > 0);

                nob = libcfs_sock_readv(conn->uc_fd, conn->uc_rx_iov, conn->uc_rx_niov);
                if (nob <= 0) { /* read nothing or error */
                        conn->uc_errored = 1;
                        return (nob < 0) ? nob : -ECONNRESET; /* EOF counts as an error here */
                }

                LASSERT (nob <= conn->uc_rx_nob_wanted);
                conn->uc_rx_nob_wanted -= nob;
                conn->uc_rx_nob_left -= nob;
                t = cfs_time_current();
                conn->uc_rx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));

                if (peer != NULL)
                        peer->up_last_alive = t;

                /* "consume" the iov */
                iov = conn->uc_rx_iov;
                do {
                        LASSERT (conn->uc_rx_niov > 0);

                        if (nob < iov->iov_len) {
                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                                iov->iov_len -= nob;
                                break;
                        }

                        nob -= iov->iov_len;
                        conn->uc_rx_iov = ++iov;
                        conn->uc_rx_niov--;
                } while (nob != 0);

        } while (conn->uc_rx_nob_wanted != 0);

        return 1; /* read complete */
}
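
/* The "consume iov" pattern above is shared by the TX and RX paths.
 * A standalone sketch of it (hypothetical helper, not part of the
 * original source):
 *
 *     static struct iovec *
 *     iov_advance(struct iovec *iov, int *niov, int nob)
 *     {
 *             while (nob != 0) {
 *                     LASSERT(*niov > 0);
 *                     if (nob < iov->iov_len) {
 *                             // partial: advance within this fragment
 *                             iov->iov_base = (char *)iov->iov_base + nob;
 *                             iov->iov_len -= nob;
 *                             break;
 *                     }
 *                     // whole fragment consumed: step to the next one
 *                     nob -= iov->iov_len;
 *                     iov++;
 *                     (*niov)--;
 *             }
 *             return iov;
 *     }
 */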