Whamcloud - gitweb
LU-1600 lnet: another race in lnet_nid2peer_locked
[fs/lustre-release.git] / lnet / ulnds / socklnd / handlers.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/ulnds/socklnd/handlers.c
35  *
36  * Author: Maxim Patlasov <maxim@clusterfs.com>
37  */
38
39 #include "usocklnd.h"
40 #include <unistd.h>
41 #include <sys/syscall.h>
42
int
usocklnd_notifier_handler(int fd)
{
        int notification;
        /* Drain one pending notification token from the notifier fd so
         * poll() stops reporting it readable.  The raw syscall is used
         * instead of read() — presumably to bypass a userspace (liblustre)
         * read wrapper; TODO confirm.  Returns the syscall result:
         * bytes read on success, -1 on error. */
        return syscall(SYS_read, fd, &notification, sizeof(notification));
}
49
50 void
51 usocklnd_exception_handler(usock_conn_t *conn)
52 {
53         pthread_mutex_lock(&conn->uc_lock);
54
55         if (conn->uc_state == UC_CONNECTING ||
56             conn->uc_state == UC_SENDING_HELLO)
57                 usocklnd_conn_kill_locked(conn);
58
59         pthread_mutex_unlock(&conn->uc_lock);
60 }
61
/* POLLIN handler: drive the RX side of the conn.
 * Returns 0 or a positive value on success, <0 on error (conn is killed
 * before returning in that case). */
int
usocklnd_read_handler(usock_conn_t *conn)
{
        int rc;
        int continue_reading;
        int state;

  read_again:
        rc = 0;
        pthread_mutex_lock(&conn->uc_lock);
        state = conn->uc_state;

        /* process special case: LNET calls lnd_recv() asynchronously */
        if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) {
                /* still don't have usocklnd_recv() called: stop polling
                 * for POLLIN until the payload iov is set up */
                rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, 0);
                if (rc == 0)
                        conn->uc_rx_state = UC_RX_PARSE_WAIT;
                else
                        usocklnd_conn_kill_locked(conn);

                pthread_mutex_unlock(&conn->uc_lock);
                return rc;
        }

        pthread_mutex_unlock(&conn->uc_lock);
        /* From here and below the conn cannot be changed
         * asynchronously, except:
         * 1) usocklnd_send() can work with uc_tx_list and uc_zcack_list,
         * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */

        switch (state) {

        case UC_RECEIVING_HELLO:
        case UC_READY:
                if (conn->uc_rx_nob_wanted != 0) {
                        /* read from conn fd as much wanted data as possible */
                        rc = usocklnd_read_data(conn);
                        if (rc == 0) /* partial read */
                                break;
                        if (rc < 0) {/* error happened or EOF */
                                usocklnd_conn_kill(conn);
                                break;
                        }
                }

                /* process incoming data */
                if (state == UC_READY )
                        rc = usocklnd_read_msg(conn, &continue_reading);
                else /* state == UC_RECEIVING_HELLO */
                        rc = usocklnd_read_hello(conn, &continue_reading);

                if (rc < 0) {
                        usocklnd_conn_kill(conn);
                        break;
                }

                /* keep consuming data until a partial read or a whole
                 * packet boundary (fairness) */
                if (continue_reading)
                        goto read_again;

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        return rc;
}
133
/* Switch on rx_state.
 * Return 0 on success, 1 if whole packet is read, else return <0
 * Always set cont_flag: 1 if we're ready to continue reading, else 0
 * NB: If whole packet is read, cont_flag will be set to zero to take
 * care of fairness
 */
int
usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
{
        int   rc = 0;
        __u64 cookie;

       *cont_flag = 0;

        /* something new emerged in RX part - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_KSM_HEADER:
                /* ksock message header fully received; byte-swap if the
                 * peer has opposite endianness */
                if (conn->uc_flip) {
                        __swab32s(&conn->uc_rx_msg.ksm_type);
                        __swab32s(&conn->uc_rx_msg.ksm_csum);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[0]);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[1]);
                }

                /* we never send packets for which zc-acking is required */
                if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
                    conn->uc_rx_msg.ksm_zc_cookies[1] != 0) {
                        conn->uc_errored = 1;
                        return -EPROTO;
                }

                /* zc_req will be processed later, when
                   lnet payload will be received */

                usocklnd_rx_lnethdr_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_LNET_HEADER:
                if (the_lnet.ln_pid & LNET_PID_USERFLAG) {
                        /* replace dest_nid,pid (ksocknal sets its own) */
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_nid =
                                cpu_to_le64(conn->uc_peer->up_ni->ni_nid);
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid =
                                cpu_to_le32(the_lnet.ln_pid);

                } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
                        /* Userspace peer */
                        lnet_process_id_t *id = &conn->uc_peer->up_peerid;
                        lnet_hdr_t        *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->uc_rx_state = UC_RX_PARSE;
                usocklnd_conn_addref(conn); /* ++ref while parsing */

                /* hand the header to LNET; it will call back lnd_recv()
                 * (possibly asynchronously) to set up the payload iov */
                rc = lnet_parse(conn->uc_peer->up_ni,
                                &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->uc_peerid.nid, conn, 0);

                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        conn->uc_errored = 1;
                        usocklnd_conn_decref(conn);
                        return -EPROTO;
                }

                /* Race with usocklnd_recv() is possible */
                pthread_mutex_lock(&conn->uc_lock);
                LASSERT (conn->uc_rx_state == UC_RX_PARSE ||
                         conn->uc_rx_state == UC_RX_LNET_PAYLOAD);

                /* check whether usocklnd_recv() got called */
                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD)
                        *cont_flag = 1;
                pthread_mutex_unlock(&conn->uc_lock);
                break;

        case UC_RX_PARSE:
                LBUG(); /* it's error to be here, because this special
                         * case is handled by caller */
                break;

        case UC_RX_PARSE_WAIT:
                LBUG(); /* it's error to be here, because the conn
                         * shouldn't wait for POLLIN event in this
                         * state */
                break;

        case UC_RX_LNET_PAYLOAD:
                /* payload all received */

                lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);

                /* sender asked for a zero-copy ack: queue it */
                cookie = conn->uc_rx_msg.ksm_zc_cookies[0];
                if (cookie != 0)
                        rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);

                if (rc != 0) {
                        /* change state not to finalize twice */
                        conn->uc_rx_state = UC_RX_KSM_HEADER;
                        return -EPROTO;
                }

                /* Fall through */

        case UC_RX_SKIPPING:
                if (conn->uc_rx_nob_left != 0) {
                        /* more payload bytes to discard */
                        usocklnd_rx_skipping_state_transition(conn);
                        *cont_flag = 1;
                } else {
                        /* packet done: rearm for the next ksock header;
                         * cont_flag stays 0 for fairness */
                        usocklnd_rx_ksmhdr_state_transition(conn);
                        rc = 1; /* whole packet is read */
                }

                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}
260
261 /* Handle incoming ZC request from sender.
262  * NB: it's called only from read_handler, so we're sure that
263  * the conn cannot become zombie in the middle of processing */
264 int
265 usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie)
266 {
267         usock_conn_t   *conn;
268         usock_zc_ack_t *zc_ack;
269         int             type;
270         int             rc;
271         int             dummy;
272
273         LIBCFS_ALLOC (zc_ack, sizeof(*zc_ack));
274         if (zc_ack == NULL)
275                 return -ENOMEM;
276         zc_ack->zc_cookie = cookie;
277
278         /* Let's assume that CONTROL is the best type for zcack,
279          * but userspace clients don't use typed connections */
280         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
281                 type = SOCKLND_CONN_ANY;
282         else
283                 type = SOCKLND_CONN_CONTROL;
284
285         rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack,
286                                           &dummy);
287         if (rc != 0) {
288                 LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
289                 return rc;
290         }
291         usocklnd_conn_decref(conn);
292
293         return 0;
294 }
295
296 /* Switch on rx_state.
297  * Return 0 on success, else return <0
298  * Always set cont_flag: 1 if we're ready to continue reading, else 0
299  */
300 int
301 usocklnd_read_hello(usock_conn_t *conn, int *cont_flag)
302 {
303         int                rc = 0;
304         ksock_hello_msg_t *hello = conn->uc_rx_hello;
305
306         *cont_flag = 0;
307
308         /* smth. new emerged in hello - let's process it */
309         switch (conn->uc_rx_state) {
310         case UC_RX_HELLO_MAGIC:
311                 if (hello->kshm_magic == LNET_PROTO_MAGIC)
312                         conn->uc_flip = 0;
313                 else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
314                         conn->uc_flip = 1;
315                 else
316                         return -EPROTO;
317
318                 usocklnd_rx_helloversion_state_transition(conn);
319                 *cont_flag = 1;
320                 break;
321
322         case UC_RX_HELLO_VERSION:
323                 if ((!conn->uc_flip &&
324                      (hello->kshm_version != KSOCK_PROTO_V2)) ||
325                     (conn->uc_flip &&
326                      (hello->kshm_version != __swab32(KSOCK_PROTO_V2))))
327                         return -EPROTO;
328
329                 usocklnd_rx_hellobody_state_transition(conn);
330                 *cont_flag = 1;
331                 break;
332
333         case UC_RX_HELLO_BODY:
334                 if (conn->uc_flip) {
335                         ksock_hello_msg_t *hello = conn->uc_rx_hello;
336                         __swab32s(&hello->kshm_src_pid);
337                         __swab64s(&hello->kshm_src_nid);
338                         __swab32s(&hello->kshm_dst_pid);
339                         __swab64s(&hello->kshm_dst_nid);
340                         __swab64s(&hello->kshm_src_incarnation);
341                         __swab64s(&hello->kshm_dst_incarnation);
342                         __swab32s(&hello->kshm_ctype);
343                         __swab32s(&hello->kshm_nips);
344                 }
345
346                 if (conn->uc_rx_hello->kshm_nips > LNET_MAX_INTERFACES) {
347                         CERROR("Bad nips %d from ip %u.%u.%u.%u port %d\n",
348                                conn->uc_rx_hello->kshm_nips,
349                                HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port);
350                         return -EPROTO;
351                 }
352
353                 if (conn->uc_rx_hello->kshm_nips) {
354                         usocklnd_rx_helloIPs_state_transition(conn);
355                         *cont_flag = 1;
356                         break;
357                 }
358                 /* fall through */
359
360         case UC_RX_HELLO_IPS:
361                 if (conn->uc_activeflag == 1) /* active conn */
362                         rc = usocklnd_activeconn_hellorecv(conn);
363                 else                          /* passive conn */
364                         rc = usocklnd_passiveconn_hellorecv(conn);
365
366                 break;
367
368         default:
369                 LBUG(); /* unknown state */
370         }
371
372         return rc;
373 }
374
375 /* All actions that we need after receiving hello on active conn:
376  * 1) Schedule removing if we're zombie
377  * 2) Restart active conn if we lost the race
378  * 3) Else: update RX part to receive KSM header
379  */
380 int
381 usocklnd_activeconn_hellorecv(usock_conn_t *conn)
382 {
383         int                rc    = 0;
384         ksock_hello_msg_t *hello = conn->uc_rx_hello;
385         usock_peer_t      *peer  = conn->uc_peer;
386
387         /* Active conn with peer==NULL is zombie.
388          * Don't try to link it to peer because the conn
389          * has already had a chance to proceed at the beginning */
390         if (peer == NULL) {
391                 LASSERT(cfs_list_empty(&conn->uc_tx_list) &&
392                         cfs_list_empty(&conn->uc_zcack_list));
393
394                 usocklnd_conn_kill(conn);
395                 return 0;
396         }
397
398         peer->up_last_alive = cfs_time_current();
399
400         /* peer says that we lost the race */
401         if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
402                 /* Start new active conn, relink txs and zc_acks from
403                  * the conn to new conn, schedule removing the conn.
404                  * Actually, we're expecting that a passive conn will
405                  * make us zombie soon and take care of our txs and
406                  * zc_acks */
407
408                 cfs_list_t tx_list, zcack_list;
409                 usock_conn_t *conn2;
410                 int idx = usocklnd_type2idx(conn->uc_type);
411
412                 CFS_INIT_LIST_HEAD (&tx_list);
413                 CFS_INIT_LIST_HEAD (&zcack_list);
414
415                 /* Block usocklnd_send() to check peer->up_conns[idx]
416                  * and to enqueue more txs */
417                 pthread_mutex_lock(&peer->up_lock);
418                 pthread_mutex_lock(&conn->uc_lock);
419
420                 /* usocklnd_shutdown() could kill us */
421                 if (conn->uc_state == UC_DEAD) {
422                         pthread_mutex_unlock(&conn->uc_lock);
423                         pthread_mutex_unlock(&peer->up_lock);
424                         return 0;
425                 }
426
427                 LASSERT (peer == conn->uc_peer);
428                 LASSERT (peer->up_conns[idx] == conn);
429
430                 rc = usocklnd_create_active_conn(peer, conn->uc_type, &conn2);
431                 if (rc) {
432                         conn->uc_errored = 1;
433                         pthread_mutex_unlock(&conn->uc_lock);
434                         pthread_mutex_unlock(&peer->up_lock);
435                         return rc;
436                 }
437
438                 usocklnd_link_conn_to_peer(conn2, peer, idx);
439                 conn2->uc_peer = peer;
440
441                 /* unlink txs and zcack from the conn */
442                 cfs_list_add(&tx_list, &conn->uc_tx_list);
443                 cfs_list_del_init(&conn->uc_tx_list);
444                 cfs_list_add(&zcack_list, &conn->uc_zcack_list);
445                 cfs_list_del_init(&conn->uc_zcack_list);
446
447                 /* link they to the conn2 */
448                 cfs_list_add(&conn2->uc_tx_list, &tx_list);
449                 cfs_list_del_init(&tx_list);
450                 cfs_list_add(&conn2->uc_zcack_list, &zcack_list);
451                 cfs_list_del_init(&zcack_list);
452
453                 /* make conn zombie */
454                 conn->uc_peer = NULL;
455                 usocklnd_peer_decref(peer);
456
457                 /* schedule conn2 for processing */
458                 rc = usocklnd_add_pollrequest(conn2, POLL_ADD_REQUEST, POLLOUT);
459                 if (rc) {
460                         peer->up_conns[idx] = NULL;
461                         usocklnd_conn_decref(conn2); /* should destroy conn */
462                 } else {
463                         usocklnd_conn_kill_locked(conn);
464                 }
465
466                 pthread_mutex_unlock(&conn->uc_lock);
467                 pthread_mutex_unlock(&peer->up_lock);
468                 usocklnd_conn_decref(conn);
469
470         } else { /* hello->kshm_ctype != SOCKLND_CONN_NONE */
471                 if (conn->uc_type != usocklnd_invert_type(hello->kshm_ctype))
472                         return -EPROTO;
473
474                 pthread_mutex_lock(&peer->up_lock);
475                 usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation,
476                                              conn);
477                 pthread_mutex_unlock(&peer->up_lock);
478
479                 /* safely transit to UC_READY state */
480                 /* rc == 0 */
481                 pthread_mutex_lock(&conn->uc_lock);
482                 if (conn->uc_state != UC_DEAD) {
483                         usocklnd_rx_ksmhdr_state_transition(conn);
484
485                         /* POLLIN is already set because we just
486                          * received hello, but maybe we've smth. to
487                          * send? */
488                         LASSERT (conn->uc_sending == 0);
489                         if ( !cfs_list_empty(&conn->uc_tx_list) ||
490                              !cfs_list_empty(&conn->uc_zcack_list) ) {
491
492                                 conn->uc_tx_deadline =
493                                         cfs_time_shift(usock_tuns.ut_timeout);
494                                 conn->uc_tx_flag = 1;
495                                 rc = usocklnd_add_pollrequest(conn,
496                                                               POLL_SET_REQUEST,
497                                                               POLLIN | POLLOUT);
498                         }
499
500                         if (rc == 0)
501                                 conn->uc_state = UC_READY;
502                 }
503                 pthread_mutex_unlock(&conn->uc_lock);
504         }
505
506         return rc;
507 }
508
509 /* All actions that we need after receiving hello on passive conn:
510  * 1) Stash peer's nid, pid, incarnation and conn type
511  * 2) Cope with easy case: conn[idx] is empty - just save conn there
512  * 3) Resolve race:
513  *    a) if our nid is higher - reply with CONN_NONE and make us zombie
514  *    b) if peer's nid is higher - postpone race resolution till
515  *       READY state
516  * 4) Anyhow, send reply hello
517 */
518 int
519 usocklnd_passiveconn_hellorecv(usock_conn_t *conn)
520 {
521         ksock_hello_msg_t *hello = conn->uc_rx_hello;
522         int                type;
523         int                idx;
524         int                rc;
525         usock_peer_t      *peer;
526         lnet_ni_t         *ni        = conn->uc_ni;
527         __u32              peer_ip   = conn->uc_peer_ip;
528         __u16              peer_port = conn->uc_peer_port;
529
530         /* don't know parent peer yet and not zombie */
531         LASSERT (conn->uc_peer == NULL &&
532                  ni != NULL);
533
534         /* don't know peer's nid and incarnation yet */
535         if (peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
536                 /* do not trust liblustre clients */
537                 conn->uc_peerid.pid = peer_port | LNET_PID_USERFLAG;
538                 conn->uc_peerid.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
539                                                  peer_ip);
540                 if (hello->kshm_ctype != SOCKLND_CONN_ANY) {
541                         lnet_ni_decref(ni);
542                         conn->uc_ni = NULL;
543                         CERROR("Refusing to accept connection of type=%d from "
544                                "userspace process %u.%u.%u.%u:%d\n", hello->kshm_ctype,
545                                HIPQUAD(peer_ip), peer_port);
546                         return -EINVAL;
547                 }
548         } else {
549                 conn->uc_peerid.pid = hello->kshm_src_pid;
550                 conn->uc_peerid.nid = hello->kshm_src_nid;
551         }
552         conn->uc_type = type = usocklnd_invert_type(hello->kshm_ctype);
553
554         rc = usocklnd_find_or_create_peer(ni, conn->uc_peerid, &peer);
555         if (rc) {
556                 lnet_ni_decref(ni);
557                 conn->uc_ni = NULL;
558                 return rc;
559         }
560
561         peer->up_last_alive = cfs_time_current();
562
563         idx = usocklnd_type2idx(conn->uc_type);
564
565         /* safely check whether we're first */
566         pthread_mutex_lock(&peer->up_lock);
567
568         usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, NULL);
569
570         if (peer->up_conns[idx] == NULL) {
571                 peer->up_last_alive = cfs_time_current();
572                 conn->uc_peer = peer;
573                 conn->uc_ni = NULL;
574                 usocklnd_link_conn_to_peer(conn, peer, idx);
575                 usocklnd_conn_addref(conn);
576         } else {
577                 usocklnd_peer_decref(peer);
578
579                 /* Resolve race in favour of higher NID */
580                 if (conn->uc_peerid.nid < conn->uc_ni->ni_nid) {
581                         /* make us zombie */
582                         conn->uc_ni = NULL;
583                         type = SOCKLND_CONN_NONE;
584                 }
585
586                 /* if conn->uc_peerid.nid > conn->uc_ni->ni_nid,
587                  * postpone race resolution till READY state
588                  * (hopefully that conn[idx] will die because of
589                  * incoming hello of CONN_NONE type) */
590         }
591         pthread_mutex_unlock(&peer->up_lock);
592
593         /* allocate and initialize fake tx with hello */
594         conn->uc_tx_hello = usocklnd_create_hello_tx(ni, type,
595                                                      conn->uc_peerid.nid);
596         if (conn->uc_ni == NULL)
597                 lnet_ni_decref(ni);
598
599         if (conn->uc_tx_hello == NULL)
600                 return -ENOMEM;
601
602         /* rc == 0 */
603         pthread_mutex_lock(&conn->uc_lock);
604         if (conn->uc_state == UC_DEAD)
605                 goto passive_hellorecv_done;
606
607         conn->uc_state = UC_SENDING_HELLO;
608         conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
609         conn->uc_tx_flag = 1;
610         rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLOUT);
611
612   passive_hellorecv_done:
613         pthread_mutex_unlock(&conn->uc_lock);
614         return rc;
615 }
616
/* POLLOUT handler: drive the TX side of the conn according to its state.
 * Returns 0 or a positive value on success, <0 on error (the conn is
 * killed before returning in that case). */
int
usocklnd_write_handler(usock_conn_t *conn)
{
        usock_tx_t   *tx;
        int           ret;
        int           rc = 0;
        int           state;
        usock_peer_t *peer;
        lnet_ni_t    *ni;

        pthread_mutex_lock(&conn->uc_lock); /* like membar */
        state = conn->uc_state;
        pthread_mutex_unlock(&conn->uc_lock);

        switch (state) {
        case UC_CONNECTING:
                /* hello_tx has already been initialized
                 * in usocklnd_create_active_conn() */
                usocklnd_conn_new_state(conn, UC_SENDING_HELLO);
                /* fall through */

        case UC_SENDING_HELLO:
                rc = usocklnd_send_tx(conn, conn->uc_tx_hello);
                if (rc <= 0) /* error or partial send or connection closed */
                        break;

                /* tx with hello was sent successfully */
                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
                conn->uc_tx_hello = NULL;

                if (conn->uc_activeflag == 1) /* active conn */
                        rc = usocklnd_activeconn_hellosent(conn);
                else                          /* passive conn */
                        rc = usocklnd_passiveconn_hellosent(conn);

                break;

        case UC_READY:
                pthread_mutex_lock(&conn->uc_lock);

                peer = conn->uc_peer;
                LASSERT (peer != NULL);
                ni = peer->up_ni;

                if (cfs_list_empty(&conn->uc_tx_list) &&
                    cfs_list_empty(&conn->uc_zcack_list)) {
                        /* nothing to send; POLLOUT can fire spuriously
                         * only while fairness batching is in effect */
                        LASSERT(usock_tuns.ut_fair_limit > 1);
                        pthread_mutex_unlock(&conn->uc_lock);
                        return 0;
                }

                /* pick next tx, piggybacking a pending zc-ack on it
                 * when possible */
                tx = usocklnd_try_piggyback(&conn->uc_tx_list,
                                            &conn->uc_zcack_list);
                if (tx != NULL)
                        conn->uc_sending = 1;
                else
                        rc = -ENOMEM;

                pthread_mutex_unlock(&conn->uc_lock);

                if (rc)
                        break;

                rc = usocklnd_send_tx(conn, tx);
                if (rc == 0) { /* partial send or connection closed */
                        /* requeue the unfinished tx at the list head */
                        pthread_mutex_lock(&conn->uc_lock);
                        cfs_list_add(&tx->tx_list, &conn->uc_tx_list);
                        conn->uc_sending = 0;
                        pthread_mutex_unlock(&conn->uc_lock);
                        break;
                }
                if (rc < 0) { /* real error */
                        usocklnd_destroy_tx(ni, tx);
                        break;
                }

                /* rc == 1: tx was sent completely */
                usocklnd_destroy_tx(ni, tx);

                pthread_mutex_lock(&conn->uc_lock);
                conn->uc_sending = 0;
                if (conn->uc_state != UC_DEAD &&
                    cfs_list_empty(&conn->uc_tx_list) &&
                    cfs_list_empty(&conn->uc_zcack_list)) {
                        /* queues drained: stop polling for POLLOUT */
                        conn->uc_tx_flag = 0;
                        ret = usocklnd_add_pollrequest(conn,
                                                      POLL_TX_SET_REQUEST, 0);
                        if (ret)
                                rc = ret;
                }
                pthread_mutex_unlock(&conn->uc_lock);

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        if (rc < 0)
                usocklnd_conn_kill(conn);

        return rc;
}
723
724 /* Return the first tx from tx_list with piggybacked zc_ack
725  * from zcack_list when possible. If tx_list is empty, return
726  * brand new noop tx for zc_ack from zcack_list. Return NULL
727  * if an error happened */
728 usock_tx_t *
729 usocklnd_try_piggyback(cfs_list_t *tx_list_p,
730                        cfs_list_t *zcack_list_p)
731 {
732         usock_tx_t     *tx;
733         usock_zc_ack_t *zc_ack;
734
735         /* assign tx and zc_ack */
736         if (cfs_list_empty(tx_list_p))
737                 tx = NULL;
738         else {
739                 tx = cfs_list_entry(tx_list_p->next, usock_tx_t, tx_list);
740                 cfs_list_del(&tx->tx_list);
741
742                 /* already piggybacked or partially send */
743                 if (tx->tx_msg.ksm_zc_cookies[1] != 0 ||
744                     tx->tx_resid != tx->tx_nob)
745                         return tx;
746         }
747
748         if (cfs_list_empty(zcack_list_p)) {
749                 /* nothing to piggyback */
750                 return tx;
751         } else {
752                 zc_ack = cfs_list_entry(zcack_list_p->next,
753                                         usock_zc_ack_t, zc_list);
754                 cfs_list_del(&zc_ack->zc_list);
755         }
756
757         if (tx != NULL)
758                 /* piggyback the zc-ack cookie */
759                 tx->tx_msg.ksm_zc_cookies[1] = zc_ack->zc_cookie;
760         else
761                 /* cannot piggyback, need noop */
762                 tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);
763
764         LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
765         return tx;
766 }
767
768 /* All actions that we need after sending hello on active conn:
769  * 1) update RX iov to receive hello
770  * 2) state transition to UC_RECEIVING_HELLO
771  * 3) notify poll_thread that we're waiting for incoming hello */
772 int
773 usocklnd_activeconn_hellosent(usock_conn_t *conn)
774 {
775         int rc = 0;
776
777         pthread_mutex_lock(&conn->uc_lock);
778
779         if (conn->uc_state != UC_DEAD) {
780                 usocklnd_rx_hellomagic_state_transition(conn);
781                 conn->uc_state = UC_RECEIVING_HELLO;
782                 conn->uc_tx_flag = 0;
783                 rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLIN);
784         }
785
786         pthread_mutex_unlock(&conn->uc_lock);
787
788         return rc;
789 }
790
/* All actions that we need after sending hello on passive conn:
 * 1) Cope with 1st easy case: conn is already linked to a peer
 * 2) Cope with 2nd easy case: remove zombie conn
 * 3) Resolve race:
 *    a) find the peer
 *    b) link the conn to the peer if conn[idx] is empty
 *    c) if the conn[idx] isn't empty and is in READY state,
 *       remove the conn as duplicated
 *    d) if the conn[idx] isn't empty and isn't in READY state,
 *       override conn[idx] with the conn
 */
int
usocklnd_passiveconn_hellosent(usock_conn_t *conn)
{
        usock_conn_t    *conn2;
        usock_peer_t    *peer;
        cfs_list_t       tx_list;    /* local holder for txs taken over from conn2 */
        cfs_list_t       zcack_list; /* local holder for zc-acks taken over from conn2 */
        int              idx;
        int              rc = 0;

        /* almost nothing to do if conn is already linked to peer hash table */
        if (conn->uc_peer != NULL)
                goto passive_hellosent_done;

        /* conn->uc_peer == NULL, so the conn isn't accessible via
         * peer hash list, so nobody can touch the conn but us */

        if (conn->uc_ni == NULL) /* remove zombie conn */
                goto passive_hellosent_connkill;

        /* all code below is race resolution, because normally
         * passive conn is linked to peer just after receiving hello */
        CFS_INIT_LIST_HEAD (&tx_list);
        CFS_INIT_LIST_HEAD (&zcack_list);

        /* conn is passive and isn't linked to any peer,
           so its tx and zc_ack lists have to be empty */
        LASSERT (cfs_list_empty(&conn->uc_tx_list) &&
                 cfs_list_empty(&conn->uc_zcack_list) &&
                 conn->uc_sending == 0);

        rc = usocklnd_find_or_create_peer(conn->uc_ni, conn->uc_peerid, &peer);
        if (rc)
                return rc;

        idx = usocklnd_type2idx(conn->uc_type);

        /* try to link conn to peer */
        pthread_mutex_lock(&peer->up_lock);
        if (peer->up_conns[idx] == NULL) {
                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                /* NOTE(review): uc_peer assigned without conn->uc_lock;
                 * presumably safe because uc_peer was NULL above so nobody
                 * else can reach this conn yet -- confirm */
                conn->uc_peer = peer;
                usocklnd_peer_addref(peer);
        } else {
                conn2 = peer->up_conns[idx];
                pthread_mutex_lock(&conn2->uc_lock);

                if (conn2->uc_state == UC_READY) {
                        /* conn2 is in READY state, so conn is "duplicated" */
                        pthread_mutex_unlock(&conn2->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        usocklnd_peer_decref(peer);
                        goto passive_hellosent_connkill;
                }

                /* uc_state != UC_READY => switch conn and conn2 */
                /* Relink txs and zc_acks from conn2 to conn.
                 * We're sure that nobody but us can access the conn;
                 * nevertheless we take the mutex -- if that belief is
                 * ever wrong, a deadlock is easier to notice than a
                 * corrupted list */
                /* head transplant: insert the local head into conn2's
                 * list, then unlink conn2's head -- tx_list (resp.
                 * zcack_list) becomes the new head of the chain */
                cfs_list_add(&tx_list, &conn2->uc_tx_list);
                cfs_list_del_init(&conn2->uc_tx_list);
                cfs_list_add(&zcack_list, &conn2->uc_zcack_list);
                cfs_list_del_init(&conn2->uc_zcack_list);

                pthread_mutex_lock(&conn->uc_lock);
                /* splice the transplanted entries onto conn's own lists
                 * (asserted empty above) */
                cfs_list_add_tail(&conn->uc_tx_list, &tx_list);
                cfs_list_del_init(&tx_list);
                cfs_list_add_tail(&conn->uc_zcack_list, &zcack_list);
                cfs_list_del_init(&zcack_list);
                conn->uc_peer = peer;
                pthread_mutex_unlock(&conn->uc_lock);

                conn2->uc_peer = NULL; /* make conn2 zombie */
                pthread_mutex_unlock(&conn2->uc_lock);
                usocklnd_conn_decref(conn2);

                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                /* NOTE(review): uc_peer was already set to peer under
                 * conn->uc_lock just above; this assignment looks
                 * redundant -- harmless, but confirm before removing */
                conn->uc_peer = peer;
        }

        /* hello exchange is over: drop the NI reference held by the conn */
        lnet_ni_decref(conn->uc_ni);
        conn->uc_ni = NULL;
        pthread_mutex_unlock(&peer->up_lock);
        usocklnd_peer_decref(peer);

  passive_hellosent_done:
        /* safely transit to UC_READY state */
        /* rc == 0 */
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_ksmhdr_state_transition(conn);

                /* we're ready to receive incoming packets and maybe
                   already have something to transmit */
                LASSERT (conn->uc_sending == 0);
                if ( cfs_list_empty(&conn->uc_tx_list) &&
                     cfs_list_empty(&conn->uc_zcack_list) ) {
                        /* nothing pending: poll for input only */
                        conn->uc_tx_flag = 0;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                 POLLIN);
                } else {
                        /* pending txs/zc-acks: arm the tx timeout and
                         * poll for output as well */
                        conn->uc_tx_deadline =
                                cfs_time_shift(usock_tuns.ut_timeout);
                        conn->uc_tx_flag = 1;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN | POLLOUT);
                }

                if (rc == 0)
                        conn->uc_state = UC_READY;
        }
        pthread_mutex_unlock(&conn->uc_lock);
        return rc;

  passive_hellosent_connkill:
        usocklnd_conn_kill(conn);
        return 0;
}
923
924 /* Send as much tx data as possible.
925  * Returns 0 or 1 on succsess, <0 if fatal error.
926  * 0 means partial send or non-fatal error, 1 - complete.
927  * Rely on libcfs_sock_writev() for differentiating fatal and
928  * non-fatal errors. An error should be considered as non-fatal if:
929  * 1) it still makes sense to continue reading &&
930  * 2) anyway, poll() will set up POLLHUP|POLLERR flags */
931 int
932 usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx)
933 {
934         struct iovec *iov;
935         int           nob;
936         cfs_time_t    t;
937
938         LASSERT (tx->tx_resid != 0);
939
940         do {
941                 usock_peer_t *peer = conn->uc_peer;
942
943                 LASSERT (tx->tx_niov > 0);
944
945                 nob = libcfs_sock_writev(conn->uc_sock,
946                                          tx->tx_iov, tx->tx_niov);
947                 if (nob < 0)
948                         conn->uc_errored = 1;
949                 if (nob <= 0) /* write queue is flow-controlled or error */
950                         return nob;
951
952                 LASSERT (nob <= tx->tx_resid);
953                 tx->tx_resid -= nob;
954                 t = cfs_time_current();
955                 conn->uc_tx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));
956
957                 if(peer != NULL)
958                         peer->up_last_alive = t;
959
960                 /* "consume" iov */
961                 iov = tx->tx_iov;
962                 do {
963                         LASSERT (tx->tx_niov > 0);
964
965                         if (nob < iov->iov_len) {
966                                 iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
967                                 iov->iov_len -= nob;
968                                 break;
969                         }
970
971                         nob -= iov->iov_len;
972                         tx->tx_iov = ++iov;
973                         tx->tx_niov--;
974                 } while (nob != 0);
975
976         } while (tx->tx_resid != 0);
977
978         return 1; /* send complete */
979 }
980
981 /* Read from wire as much data as possible.
982  * Returns 0 or 1 on succsess, <0 if error or EOF.
983  * 0 means partial read, 1 - complete */
984 int
985 usocklnd_read_data(usock_conn_t *conn)
986 {
987         struct iovec *iov;
988         int           nob;
989         cfs_time_t    t;
990
991         LASSERT (conn->uc_rx_nob_wanted != 0);
992
993         do {
994                 usock_peer_t *peer = conn->uc_peer;
995
996                 LASSERT (conn->uc_rx_niov > 0);
997
998                 nob = libcfs_sock_readv(conn->uc_sock,
999                                         conn->uc_rx_iov, conn->uc_rx_niov);
1000                 if (nob <= 0) {/* read nothing or error */
1001                         if (nob < 0)
1002                                 conn->uc_errored = 1;
1003                         return nob;
1004                 }
1005
1006                 LASSERT (nob <= conn->uc_rx_nob_wanted);
1007                 conn->uc_rx_nob_wanted -= nob;
1008                 conn->uc_rx_nob_left -= nob;
1009                 t = cfs_time_current();
1010                 conn->uc_rx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));
1011
1012                 if(peer != NULL)
1013                         peer->up_last_alive = t;
1014
1015                 /* "consume" iov */
1016                 iov = conn->uc_rx_iov;
1017                 do {
1018                         LASSERT (conn->uc_rx_niov > 0);
1019
1020                         if (nob < iov->iov_len) {
1021                                 iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
1022                                 iov->iov_len -= nob;
1023                                 break;
1024                         }
1025
1026                         nob -= iov->iov_len;
1027                         conn->uc_rx_iov = ++iov;
1028                         conn->uc_rx_niov--;
1029                 } while (nob != 0);
1030
1031         } while (conn->uc_rx_nob_wanted != 0);
1032
1033         return 1; /* read complete */
1034 }