/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/ulnds/socklnd/handlers.c
 *
 * Author: Maxim Patlasov <maxim@clusterfs.com>
 */

#include "usocklnd.h"
#include <unistd.h>
#include <sys/syscall.h>

int
usocklnd_notifier_handler(int fd)
{
        int notification;
        return syscall(SYS_read, fd, &notification, sizeof(notification));
}
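
/* Illustrative counterpart (a sketch, not part of the original file):
 * the handler above drains one int-sized token per wakeup, so the
 * writer side is assumed to post a token like this. The function name
 * and the pipe/socketpair assumption are hypothetical. */
static int
example_notifier_wakeup(int notifier_wr_fd)
{
        int token = 1; /* value is irrelevant; the write itself wakes poll() */

        return syscall(SYS_write, notifier_wr_fd, &token, sizeof(token));
}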

void
usocklnd_exception_handler(usock_conn_t *conn)
{
        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state == UC_CONNECTING ||
            conn->uc_state == UC_SENDING_HELLO)
                usocklnd_conn_kill_locked(conn);

        pthread_mutex_unlock(&conn->uc_lock);
}

int
usocklnd_read_handler(usock_conn_t *conn)
{
        int rc;
        int continue_reading;
        int state;

  read_again:
        rc = 0;
        pthread_mutex_lock(&conn->uc_lock);
        state = conn->uc_state;

        /* process special case: LNET calls lnd_recv() asynchronously */
        if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) {
                /* still don't have usocklnd_recv() called */
                rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, 0);
                if (rc == 0)
                        conn->uc_rx_state = UC_RX_PARSE_WAIT;
                else
                        usocklnd_conn_kill_locked(conn);

                pthread_mutex_unlock(&conn->uc_lock);
                return rc;
        }

        pthread_mutex_unlock(&conn->uc_lock);
        /* From here on, the conn cannot be changed
         * asynchronously, except:
         * 1) usocklnd_send() can work with uc_tx_list and uc_zcack_list,
         * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */

        switch (state) {

        case UC_RECEIVING_HELLO:
        case UC_READY:
                if (conn->uc_rx_nob_wanted != 0) {
                        /* read from conn fd as much wanted data as possible */
                        rc = usocklnd_read_data(conn);
                        if (rc == 0) /* partial read */
                                break;
                        if (rc < 0) { /* error happened or EOF */
                                usocklnd_conn_kill(conn);
                                break;
                        }
                }

                /* process incoming data */
                if (state == UC_READY)
                        rc = usocklnd_read_msg(conn, &continue_reading);
                else /* state == UC_RECEIVING_HELLO */
                        rc = usocklnd_read_hello(conn, &continue_reading);

                if (rc < 0) {
                        usocklnd_conn_kill(conn);
                        break;
                }

                if (continue_reading)
                        goto read_again;

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        return rc;
}
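
/* Context (illustrative sketch only): the handlers in this file are
 * assumed to be driven by usocklnd's poll thread. The real dispatch
 * lives elsewhere in usocklnd and differs in detail; this hypothetical
 * function only shows which revents are assumed to map to which
 * handler. */
static void
example_dispatch_events(usock_conn_t *conn, short revents)
{
        if (revents & (POLLERR | POLLHUP))
                usocklnd_exception_handler(conn);

        if (revents & POLLIN)
                usocklnd_read_handler(conn);  /* return code ignored in sketch */

        if (revents & POLLOUT)
                usocklnd_write_handler(conn); /* return code ignored in sketch */
}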

/* Switch on rx_state.
 * Return 0 on success, 1 if whole packet is read, else return <0
 * Always set cont_flag: 1 if we're ready to continue reading, else 0
 * NB: If whole packet is read, cont_flag will be set to zero to take
 * care of fairness
 */
int
usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
{
        int   rc = 0;
        __u64 cookie;

        *cont_flag = 0;

        /* something new emerged in RX part - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_KSM_HEADER:
                if (conn->uc_flip) {
                        __swab32s(&conn->uc_rx_msg.ksm_type);
                        __swab32s(&conn->uc_rx_msg.ksm_csum);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[0]);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[1]);
                }

                /* we never send packets for which zc-acking is required */
                if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
                    conn->uc_rx_msg.ksm_zc_cookies[1] != 0) {
                        conn->uc_errored = 1;
                        return -EPROTO;
                }

                /* zc_req will be processed later, when
                   lnet payload will be received */

                usocklnd_rx_lnethdr_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_LNET_HEADER:
                if (the_lnet.ln_pid & LNET_PID_USERFLAG) {
                        /* replace dest_nid,pid (ksocknal sets its own) */
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_nid =
                                cpu_to_le64(conn->uc_peer->up_ni->ni_nid);
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid =
                                cpu_to_le32(the_lnet.ln_pid);

                } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
                        /* Userspace peer */
                        lnet_process_id_t *id = &conn->uc_peer->up_peerid;
                        lnet_hdr_t        *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->uc_rx_state = UC_RX_PARSE;
                usocklnd_conn_addref(conn); /* ++ref while parsing */

                rc = lnet_parse(conn->uc_peer->up_ni,
                                &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->uc_peerid.nid, conn, 0);

                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        conn->uc_errored = 1;
                        usocklnd_conn_decref(conn);
                        return -EPROTO;
                }

                /* Race with usocklnd_recv() is possible */
                pthread_mutex_lock(&conn->uc_lock);
                LASSERT (conn->uc_rx_state == UC_RX_PARSE ||
                         conn->uc_rx_state == UC_RX_LNET_PAYLOAD);

                /* check whether usocklnd_recv() got called */
                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD)
                        *cont_flag = 1;
                pthread_mutex_unlock(&conn->uc_lock);
                break;

        case UC_RX_PARSE:
                LBUG(); /* it's an error to be here, because this special
                         * case is handled by the caller */
                break;

        case UC_RX_PARSE_WAIT:
                LBUG(); /* it's an error to be here, because the conn
                         * shouldn't wait for a POLLIN event in this
                         * state */
                break;

        case UC_RX_LNET_PAYLOAD:
                /* payload all received */

                lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);

                cookie = conn->uc_rx_msg.ksm_zc_cookies[0];
                if (cookie != 0)
                        rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);

                if (rc != 0) {
                        /* change state not to finalize twice */
                        conn->uc_rx_state = UC_RX_KSM_HEADER;
                        return -EPROTO;
                }

                /* Fall through */

        case UC_RX_SKIPPING:
                if (conn->uc_rx_nob_left != 0) {
                        usocklnd_rx_skipping_state_transition(conn);
                        *cont_flag = 1;
                } else {
                        usocklnd_rx_ksmhdr_state_transition(conn);
                        rc = 1; /* whole packet is read */
                }

                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}

/* Handle incoming ZC request from sender.
 * NB: it's called only from read_handler, so we're sure that
 * the conn cannot become zombie in the middle of processing */
int
usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie)
{
        usock_conn_t   *conn;
        usock_zc_ack_t *zc_ack;
        int             type;
        int             rc;
        int             dummy;

        LIBCFS_ALLOC (zc_ack, sizeof(*zc_ack));
        if (zc_ack == NULL)
                return -ENOMEM;
        zc_ack->zc_cookie = cookie;

        /* Let's assume that CONTROL is the best type for zcack,
         * but userspace clients don't use typed connections */
        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
                type = SOCKLND_CONN_ANY;
        else
                type = SOCKLND_CONN_CONTROL;

        rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack,
                                          &dummy);
        if (rc != 0) {
                LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
                return rc;
        }
        usocklnd_conn_decref(conn);

        return 0;
}

/* Switch on rx_state.
 * Return 0 on success, else return <0
 * Always set cont_flag: 1 if we're ready to continue reading, else 0
 */
int
usocklnd_read_hello(usock_conn_t *conn, int *cont_flag)
{
        int                rc = 0;
        ksock_hello_msg_t *hello = conn->uc_rx_hello;

        *cont_flag = 0;

        /* something new emerged in hello - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_HELLO_MAGIC:
                if (hello->kshm_magic == LNET_PROTO_MAGIC)
                        conn->uc_flip = 0;
                else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
                        conn->uc_flip = 1;
                else
                        return -EPROTO;

                usocklnd_rx_helloversion_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_HELLO_VERSION:
                if ((!conn->uc_flip &&
                     (hello->kshm_version != KSOCK_PROTO_V2)) ||
                    (conn->uc_flip &&
                     (hello->kshm_version != __swab32(KSOCK_PROTO_V2))))
                        return -EPROTO;

                usocklnd_rx_hellobody_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_HELLO_BODY:
                if (conn->uc_flip) {
                        __swab32s(&hello->kshm_src_pid);
                        __swab64s(&hello->kshm_src_nid);
                        __swab32s(&hello->kshm_dst_pid);
                        __swab64s(&hello->kshm_dst_nid);
                        __swab64s(&hello->kshm_src_incarnation);
                        __swab64s(&hello->kshm_dst_incarnation);
                        __swab32s(&hello->kshm_ctype);
                        __swab32s(&hello->kshm_nips);
                }

                if (hello->kshm_nips > LNET_MAX_INTERFACES) {
                        CERROR("Bad nips %d from ip %u.%u.%u.%u port %d\n",
                               hello->kshm_nips,
                               HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port);
                        return -EPROTO;
                }

                if (hello->kshm_nips) {
                        usocklnd_rx_helloIPs_state_transition(conn);
                        *cont_flag = 1;
                        break;
                }
                /* fall through */

        case UC_RX_HELLO_IPS:
                if (conn->uc_activeflag == 1) /* active conn */
                        rc = usocklnd_activeconn_hellorecv(conn);
                else                          /* passive conn */
                        rc = usocklnd_passiveconn_hellorecv(conn);

                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}
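
/* Standalone restatement of the UC_RX_HELLO_MAGIC logic above (a
 * sketch for clarity, not used by the code): the receiver learns the
 * peer's endianness from the hello magic and records whether every
 * later field must be byte-swapped. */
static int
example_detect_flip(__u32 magic, int *flip)
{
        if (magic == LNET_PROTO_MAGIC)
                *flip = 0;                      /* same byte order */
        else if (magic == __swab32(LNET_PROTO_MAGIC))
                *flip = 1;                      /* peer is opposite-endian */
        else
                return -EPROTO;                 /* not an LNET peer at all */

        return 0;
}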

/* All actions that we need after receiving hello on active conn:
 * 1) Schedule removing if we're zombie
 * 2) Restart active conn if we lost the race
 * 3) Else: update RX part to receive KSM header
 */
int
usocklnd_activeconn_hellorecv(usock_conn_t *conn)
{
        int                rc    = 0;
        ksock_hello_msg_t *hello = conn->uc_rx_hello;
        usock_peer_t      *peer  = conn->uc_peer;

        /* Active conn with peer==NULL is zombie.
         * Don't try to link it to peer because the conn
         * has already had a chance to proceed at the beginning */
        if (peer == NULL) {
                LASSERT(list_empty(&conn->uc_tx_list) &&
                        list_empty(&conn->uc_zcack_list));

                usocklnd_conn_kill(conn);
                return 0;
        }

        peer->up_last_alive = cfs_time_current();

        /* peer says that we lost the race */
        if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
                /* Start new active conn, relink txs and zc_acks from
                 * the conn to new conn, schedule removing the conn.
                 * Actually, we're expecting that a passive conn will
                 * make us zombie soon and take care of our txs and
                 * zc_acks */

                struct list_head tx_list, zcack_list;
                usock_conn_t *conn2;
                int idx = usocklnd_type2idx(conn->uc_type);

                CFS_INIT_LIST_HEAD (&tx_list);
                CFS_INIT_LIST_HEAD (&zcack_list);

                /* Block usocklnd_send() to check peer->up_conns[idx]
                 * and to enqueue more txs */
                pthread_mutex_lock(&peer->up_lock);
                pthread_mutex_lock(&conn->uc_lock);

                /* usocklnd_shutdown() could kill us */
                if (conn->uc_state == UC_DEAD) {
                        pthread_mutex_unlock(&conn->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        return 0;
                }

                LASSERT (peer == conn->uc_peer);
                LASSERT (peer->up_conns[idx] == conn);

                rc = usocklnd_create_active_conn(peer, conn->uc_type, &conn2);
                if (rc) {
                        conn->uc_errored = 1;
                        pthread_mutex_unlock(&conn->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        return rc;
                }

                usocklnd_link_conn_to_peer(conn2, peer, idx);
                conn2->uc_peer = peer;

                /* unlink txs and zcack from the conn */
                list_add(&tx_list, &conn->uc_tx_list);
                list_del_init(&conn->uc_tx_list);
                list_add(&zcack_list, &conn->uc_zcack_list);
                list_del_init(&conn->uc_zcack_list);

                /* link them to conn2 */
                list_add(&conn2->uc_tx_list, &tx_list);
                list_del_init(&tx_list);
                list_add(&conn2->uc_zcack_list, &zcack_list);
                list_del_init(&zcack_list);

                /* make conn zombie */
                conn->uc_peer = NULL;
                usocklnd_peer_decref(peer);

                /* schedule conn2 for processing */
                rc = usocklnd_add_pollrequest(conn2, POLL_ADD_REQUEST, POLLOUT);
                if (rc) {
                        peer->up_conns[idx] = NULL;
                        usocklnd_conn_decref(conn2); /* should destroy conn */
                } else {
                        usocklnd_conn_kill_locked(conn);
                }

                pthread_mutex_unlock(&conn->uc_lock);
                pthread_mutex_unlock(&peer->up_lock);
                usocklnd_conn_decref(conn);

        } else { /* hello->kshm_ctype != SOCKLND_CONN_NONE */
                if (conn->uc_type != usocklnd_invert_type(hello->kshm_ctype))
                        return -EPROTO;

                pthread_mutex_lock(&peer->up_lock);
                usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation,
                                             conn);
                pthread_mutex_unlock(&peer->up_lock);

                /* safely transit to UC_READY state */
                /* rc == 0 */
                pthread_mutex_lock(&conn->uc_lock);
                if (conn->uc_state != UC_DEAD) {
                        usocklnd_rx_ksmhdr_state_transition(conn);

                        /* POLLIN is already set because we just
                         * received hello, but maybe we have something
                         * to send? */
                        LASSERT (conn->uc_sending == 0);
                        if ( !list_empty(&conn->uc_tx_list) ||
                             !list_empty(&conn->uc_zcack_list) ) {

                                conn->uc_tx_deadline =
                                        cfs_time_shift(usock_tuns.ut_timeout);
                                conn->uc_tx_flag = 1;
                                rc = usocklnd_add_pollrequest(conn,
                                                              POLL_SET_REQUEST,
                                                              POLLIN | POLLOUT);
                        }

                        if (rc == 0)
                                conn->uc_state = UC_READY;
                }
                pthread_mutex_unlock(&conn->uc_lock);
        }

        return rc;
}
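
/* Note on the relinking idiom above (also used in
 * usocklnd_passiveconn_hellosent() below): moving a whole list under a
 * new head is done with a list_add()/list_del_init() pair, in the
 * spirit of the kernel's list_splice(). A minimal sketch: */
static void
example_move_list(struct list_head *dst, struct list_head *src)
{
        list_add(dst, src);     /* splice dst into src's ring, after src */
        list_del_init(src);     /* drop the old head; dst now heads all
                                 * elements that were under src */
}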

/* All actions that we need after receiving hello on passive conn:
 * 1) Stash peer's nid, pid, incarnation and conn type
 * 2) Cope with easy case: conn[idx] is empty - just save conn there
 * 3) Resolve race:
 *    a) if our nid is higher - reply with CONN_NONE and make us zombie
 *    b) if peer's nid is higher - postpone race resolution till
 *       READY state
 * 4) Anyhow, send reply hello
 */
int
usocklnd_passiveconn_hellorecv(usock_conn_t *conn)
{
        ksock_hello_msg_t *hello = conn->uc_rx_hello;
        int                type;
        int                idx;
        int                rc;
        usock_peer_t      *peer;
        lnet_ni_t         *ni        = conn->uc_ni;
        __u32              peer_ip   = conn->uc_peer_ip;
        __u16              peer_port = conn->uc_peer_port;

        /* don't know parent peer yet and not zombie */
        LASSERT (conn->uc_peer == NULL &&
                 ni != NULL);

        /* don't know peer's nid and incarnation yet */
        if (peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
                /* do not trust liblustre clients */
                conn->uc_peerid.pid = peer_port | LNET_PID_USERFLAG;
                conn->uc_peerid.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
                                                 peer_ip);
                if (hello->kshm_ctype != SOCKLND_CONN_ANY) {
                        lnet_ni_decref(ni);
                        conn->uc_ni = NULL;
                        CERROR("Refusing to accept connection of type=%d from "
                               "userspace process %u.%u.%u.%u:%d\n", hello->kshm_ctype,
                               HIPQUAD(peer_ip), peer_port);
                        return -EINVAL;
                }
        } else {
                conn->uc_peerid.pid = hello->kshm_src_pid;
                conn->uc_peerid.nid = hello->kshm_src_nid;
        }
        conn->uc_type = type = usocklnd_invert_type(hello->kshm_ctype);

        rc = usocklnd_find_or_create_peer(ni, conn->uc_peerid, &peer);
        if (rc) {
                lnet_ni_decref(ni);
                conn->uc_ni = NULL;
                return rc;
        }

        peer->up_last_alive = cfs_time_current();

        idx = usocklnd_type2idx(conn->uc_type);

        /* safely check whether we're first */
        pthread_mutex_lock(&peer->up_lock);

        usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, NULL);

        if (peer->up_conns[idx] == NULL) {
                peer->up_last_alive = cfs_time_current();
                conn->uc_peer = peer;
                conn->uc_ni = NULL;
                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
        } else {
                usocklnd_peer_decref(peer);

                /* Resolve race in favour of higher NID */
                if (conn->uc_peerid.nid < conn->uc_ni->ni_nid) {
                        /* make us zombie */
                        conn->uc_ni = NULL;
                        type = SOCKLND_CONN_NONE;
                }

                /* if conn->uc_peerid.nid > conn->uc_ni->ni_nid,
                 * postpone race resolution till READY state
                 * (hopefully the conn[idx] will die because of an
                 * incoming hello of CONN_NONE type) */
        }
        pthread_mutex_unlock(&peer->up_lock);

        /* allocate and initialize fake tx with hello */
        conn->uc_tx_hello = usocklnd_create_hello_tx(ni, type,
                                                     conn->uc_peerid.nid);
        if (conn->uc_ni == NULL)
                lnet_ni_decref(ni);

        if (conn->uc_tx_hello == NULL)
                return -ENOMEM;

        /* rc == 0 */
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state == UC_DEAD)
                goto passive_hellorecv_done;

        conn->uc_state = UC_SENDING_HELLO;
        conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
        conn->uc_tx_flag = 1;
        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLOUT);

  passive_hellorecv_done:
        pthread_mutex_unlock(&conn->uc_lock);
        return rc;
}

int
usocklnd_write_handler(usock_conn_t *conn)
{
        usock_tx_t   *tx;
        int           ret;
        int           rc = 0;
        int           state;
        usock_peer_t *peer;
        lnet_ni_t    *ni;

        pthread_mutex_lock(&conn->uc_lock); /* like membar */
        state = conn->uc_state;
        pthread_mutex_unlock(&conn->uc_lock);

        switch (state) {
        case UC_CONNECTING:
                /* hello_tx has already been initialized
                 * in usocklnd_create_active_conn() */
                usocklnd_conn_new_state(conn, UC_SENDING_HELLO);
                /* fall through */

        case UC_SENDING_HELLO:
                rc = usocklnd_send_tx(conn, conn->uc_tx_hello);
                if (rc <= 0) /* error or partial send or connection closed */
                        break;

                /* tx with hello was sent successfully */
                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
                conn->uc_tx_hello = NULL;

                if (conn->uc_activeflag == 1) /* active conn */
                        rc = usocklnd_activeconn_hellosent(conn);
                else                          /* passive conn */
                        rc = usocklnd_passiveconn_hellosent(conn);

                break;

        case UC_READY:
                pthread_mutex_lock(&conn->uc_lock);

                peer = conn->uc_peer;
                LASSERT (peer != NULL);
                ni = peer->up_ni;

                if (list_empty(&conn->uc_tx_list) &&
                    list_empty(&conn->uc_zcack_list)) {
                        LASSERT(usock_tuns.ut_fair_limit > 1);
                        pthread_mutex_unlock(&conn->uc_lock);
                        return 0;
                }

                tx = usocklnd_try_piggyback(&conn->uc_tx_list,
                                            &conn->uc_zcack_list);
                if (tx != NULL)
                        conn->uc_sending = 1;
                else
                        rc = -ENOMEM;

                pthread_mutex_unlock(&conn->uc_lock);

                if (rc)
                        break;

                rc = usocklnd_send_tx(conn, tx);
                if (rc == 0) { /* partial send or connection closed */
                        pthread_mutex_lock(&conn->uc_lock);
                        list_add(&tx->tx_list, &conn->uc_tx_list);
                        conn->uc_sending = 0;
                        pthread_mutex_unlock(&conn->uc_lock);
                        break;
                }
                if (rc < 0) { /* real error */
                        usocklnd_destroy_tx(ni, tx);
                        break;
                }

                /* rc == 1: tx was sent completely */
                usocklnd_destroy_tx(ni, tx);

                pthread_mutex_lock(&conn->uc_lock);
                conn->uc_sending = 0;
                if (conn->uc_state != UC_DEAD &&
                    list_empty(&conn->uc_tx_list) &&
                    list_empty(&conn->uc_zcack_list)) {
                        conn->uc_tx_flag = 0;
                        ret = usocklnd_add_pollrequest(conn,
                                                       POLL_TX_SET_REQUEST, 0);
                        if (ret)
                                rc = ret;
                }
                pthread_mutex_unlock(&conn->uc_lock);

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        if (rc < 0)
                usocklnd_conn_kill(conn);

        return rc;
}

/* Return the first tx from tx_list with piggybacked zc_ack
 * from zcack_list when possible. If tx_list is empty, return
 * brand new noop tx for zc_ack from zcack_list. Return NULL
 * if an error happened */
usock_tx_t *
usocklnd_try_piggyback(struct list_head *tx_list_p,
                       struct list_head *zcack_list_p)
{
        usock_tx_t     *tx;
        usock_zc_ack_t *zc_ack;

        /* assign tx and zc_ack */
        if (list_empty(tx_list_p))
                tx = NULL;
        else {
                tx = list_entry(tx_list_p->next, usock_tx_t, tx_list);
                list_del(&tx->tx_list);

                /* already piggybacked or partially sent */
                if (tx->tx_msg.ksm_zc_cookies[1] != 0 ||
                    tx->tx_resid != tx->tx_nob)
                        return tx;
        }

        if (list_empty(zcack_list_p)) {
                /* nothing to piggyback */
                return tx;
        } else {
                zc_ack = list_entry(zcack_list_p->next,
                                    usock_zc_ack_t, zc_list);
                list_del(&zc_ack->zc_list);
        }

        if (tx != NULL)
                /* piggyback the zc-ack cookie */
                tx->tx_msg.ksm_zc_cookies[1] = zc_ack->zc_cookie;
        else
                /* cannot piggyback, need noop */
                tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);

        LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
        return tx;
}
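
/* Usage sketch (hypothetical helper, for illustration): how a queued
 * zc-ack leaves the node. usocklnd_write_handler() above does this
 * under uc_lock (and additionally marks conn->uc_sending); the cookie
 * ends up either in slot [1] of a data message's ksm_zc_cookies or in
 * a brand-new noop tx. */
static usock_tx_t *
example_drain_one_tx(usock_conn_t *conn)
{
        usock_tx_t *tx;

        pthread_mutex_lock(&conn->uc_lock);
        tx = usocklnd_try_piggyback(&conn->uc_tx_list,
                                    &conn->uc_zcack_list);
        pthread_mutex_unlock(&conn->uc_lock);

        return tx; /* NULL only on allocation failure */
}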

/* All actions that we need after sending hello on active conn:
 * 1) update RX iov to receive hello
 * 2) state transition to UC_RECEIVING_HELLO
 * 3) notify poll_thread that we're waiting for incoming hello */
int
usocklnd_activeconn_hellosent(usock_conn_t *conn)
{
        int rc = 0;

        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_hellomagic_state_transition(conn);
                conn->uc_state = UC_RECEIVING_HELLO;
                conn->uc_tx_flag = 0;
                rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLIN);
        }

        pthread_mutex_unlock(&conn->uc_lock);

        return rc;
}

/* All actions that we need after sending hello on passive conn:
 * 1) Cope with 1st easy case: conn is already linked to a peer
 * 2) Cope with 2nd easy case: remove zombie conn
 * 3) Resolve race:
 *    a) find the peer
 *    b) link the conn to the peer if conn[idx] is empty
 *    c) if the conn[idx] isn't empty and is in READY state,
 *       remove the conn as duplicated
 *    d) if the conn[idx] isn't empty and isn't in READY state,
 *       override conn[idx] with the conn
 */
int
usocklnd_passiveconn_hellosent(usock_conn_t *conn)
{
        usock_conn_t    *conn2;
        usock_peer_t    *peer;
        struct list_head tx_list;
        struct list_head zcack_list;
        int              idx;
        int              rc = 0;

        /* almost nothing to do if conn is already linked to peer hash table */
        if (conn->uc_peer != NULL)
                goto passive_hellosent_done;

        /* conn->uc_peer == NULL, so the conn isn't accessible via
         * peer hash list, so nobody can touch the conn but us */

        if (conn->uc_ni == NULL) /* remove zombie conn */
                goto passive_hellosent_connkill;

        /* all code below is race resolution, because normally
         * passive conn is linked to peer just after receiving hello */
        CFS_INIT_LIST_HEAD (&tx_list);
        CFS_INIT_LIST_HEAD (&zcack_list);

        /* conn is passive and isn't linked to any peer,
           so its tx and zc_ack lists have to be empty */
        LASSERT (list_empty(&conn->uc_tx_list) &&
                 list_empty(&conn->uc_zcack_list) &&
                 conn->uc_sending == 0);

        rc = usocklnd_find_or_create_peer(conn->uc_ni, conn->uc_peerid, &peer);
        if (rc)
                return rc;

        idx = usocklnd_type2idx(conn->uc_type);

        /* try to link conn to peer */
        pthread_mutex_lock(&peer->up_lock);
        if (peer->up_conns[idx] == NULL) {
                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                conn->uc_peer = peer;
                usocklnd_peer_addref(peer);
        } else {
                conn2 = peer->up_conns[idx];
                pthread_mutex_lock(&conn2->uc_lock);

                if (conn2->uc_state == UC_READY) {
                        /* conn2 is in READY state, so conn is "duplicated" */
                        pthread_mutex_unlock(&conn2->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        usocklnd_peer_decref(peer);
                        goto passive_hellosent_connkill;
                }

                /* uc_state != UC_READY => switch conn and conn2 */
                /* Relink txs and zc_acks from conn2 to conn.
                 * We're sure that nobody but us can access the conn;
                 * nevertheless we take the mutex (if we're wrong, a
                 * deadlock is easier to debug than a corrupted list) */
                list_add(&tx_list, &conn2->uc_tx_list);
                list_del_init(&conn2->uc_tx_list);
                list_add(&zcack_list, &conn2->uc_zcack_list);
                list_del_init(&conn2->uc_zcack_list);

                pthread_mutex_lock(&conn->uc_lock);
                list_add_tail(&conn->uc_tx_list, &tx_list);
                list_del_init(&tx_list);
                list_add_tail(&conn->uc_zcack_list, &zcack_list);
                list_del_init(&zcack_list);
                conn->uc_peer = peer;
                pthread_mutex_unlock(&conn->uc_lock);

                conn2->uc_peer = NULL; /* make conn2 zombie */
                pthread_mutex_unlock(&conn2->uc_lock);
                usocklnd_conn_decref(conn2);

                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                conn->uc_peer = peer;
        }

        lnet_ni_decref(conn->uc_ni);
        conn->uc_ni = NULL;
        pthread_mutex_unlock(&peer->up_lock);
        usocklnd_peer_decref(peer);

  passive_hellosent_done:
        /* safely transit to UC_READY state */
        /* rc == 0 */
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_ksmhdr_state_transition(conn);

                /* we're ready to receive incoming packets and may
                   already have something to transmit */
                LASSERT (conn->uc_sending == 0);
                if ( list_empty(&conn->uc_tx_list) &&
                     list_empty(&conn->uc_zcack_list) ) {
                        conn->uc_tx_flag = 0;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN);
                } else {
                        conn->uc_tx_deadline =
                                cfs_time_shift(usock_tuns.ut_timeout);
                        conn->uc_tx_flag = 1;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN | POLLOUT);
                }

                if (rc == 0)
                        conn->uc_state = UC_READY;
        }
        pthread_mutex_unlock(&conn->uc_lock);
        return rc;

  passive_hellosent_connkill:
        usocklnd_conn_kill(conn);
        return 0;
}

/* Send as much tx data as possible.
 * Returns 0 or 1 on success, <0 if fatal error.
 * 0 means partial send or non-fatal error, 1 - complete.
 * Rely on libcfs_sock_writev() for differentiating fatal and
 * non-fatal errors. An error should be considered as non-fatal if:
 * 1) it still makes sense to continue reading &&
 * 2) anyway, poll() will set up POLLHUP|POLLERR flags */
int
usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx)
{
        struct iovec *iov;
        int           nob;
        cfs_time_t    t;

        LASSERT (tx->tx_resid != 0);

        do {
                usock_peer_t *peer = conn->uc_peer;

                LASSERT (tx->tx_niov > 0);

                nob = libcfs_sock_writev(conn->uc_sock,
                                         tx->tx_iov, tx->tx_niov);
                if (nob < 0)
                        conn->uc_errored = 1;
                if (nob <= 0) /* write queue is flow-controlled or error */
                        return nob;

                LASSERT (nob <= tx->tx_resid);
                tx->tx_resid -= nob;
                t = cfs_time_current();
                conn->uc_tx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));

                if (peer != NULL)
                        peer->up_last_alive = t;

                /* "consume" iov */
                iov = tx->tx_iov;
                do {
                        LASSERT (tx->tx_niov > 0);

                        if (nob < iov->iov_len) {
                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                                iov->iov_len -= nob;
                                break;
                        }

                        nob -= iov->iov_len;
                        tx->tx_iov = ++iov;
                        tx->tx_niov--;
                } while (nob != 0);

        } while (tx->tx_resid != 0);

        return 1; /* send complete */
}

/* Read from wire as much data as possible.
 * Returns 0 or 1 on success, <0 if error or EOF.
 * 0 means partial read, 1 - complete */
int
usocklnd_read_data(usock_conn_t *conn)
{
        struct iovec *iov;
        int           nob;
        cfs_time_t    t;

        LASSERT (conn->uc_rx_nob_wanted != 0);

        do {
                usock_peer_t *peer = conn->uc_peer;

                LASSERT (conn->uc_rx_niov > 0);

                nob = libcfs_sock_readv(conn->uc_sock,
                                        conn->uc_rx_iov, conn->uc_rx_niov);
                if (nob <= 0) { /* read nothing or error */
                        if (nob < 0)
                                conn->uc_errored = 1;
                        return nob;
                }

                LASSERT (nob <= conn->uc_rx_nob_wanted);
                conn->uc_rx_nob_wanted -= nob;
                conn->uc_rx_nob_left -= nob;
                t = cfs_time_current();
                conn->uc_rx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));

                if (peer != NULL)
                        peer->up_last_alive = t;

                /* "consume" iov */
                iov = conn->uc_rx_iov;
                do {
                        LASSERT (conn->uc_rx_niov > 0);

                        if (nob < iov->iov_len) {
                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                                iov->iov_len -= nob;
                                break;
                        }

                        nob -= iov->iov_len;
                        conn->uc_rx_iov = ++iov;
                        conn->uc_rx_niov--;
                } while (nob != 0);

        } while (conn->uc_rx_nob_wanted != 0);

        return 1; /* read complete */
}
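
/* The two "consume" loops above (in usocklnd_send_tx() and
 * usocklnd_read_data()) share one pattern: advance an iovec array past
 * the nob bytes just transferred. A hypothetical refactoring of that
 * pattern, shown for clarity only: */
static void
example_consume_iov(struct iovec **iovp, int *niovp, int nob)
{
        struct iovec *iov = *iovp;

        while (nob != 0) {
                LASSERT (*niovp > 0);

                if (nob < iov->iov_len) {
                        /* partial iov: bump base, shrink len, stop */
                        iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                        iov->iov_len -= nob;
                        break;
                }

                /* whole iov consumed: step to the next one */
                nob -= iov->iov_len;
                iov++;
                (*niovp)--;
        }

        *iovp = iov;
}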