lnet/ulnds/socklnd/handlers.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *   Author: Maxim Patlasov <maxim@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 */

#include "usocklnd.h"
#include <unistd.h>
#include <syscall.h>

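/* Drain one 'int' notification from fd so poll() stops reporting it
 * readable.  NB: the raw syscall(SYS_read, ...) rather than read() is
 * kept from the original code; presumably it bypasses a read() wrapper
 * installed elsewhere in the userspace runtime. */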
int
usocklnd_notifier_handler(int fd)
{
        int notification;
        return syscall(SYS_read, fd, &notification, sizeof(notification));
}

void
usocklnd_exception_handler(usock_conn_t *conn)
{
        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state == UC_CONNECTING ||
            conn->uc_state == UC_SENDING_HELLO)
                usocklnd_conn_kill_locked(conn);

        pthread_mutex_unlock(&conn->uc_lock);
}

int
usocklnd_read_handler(usock_conn_t *conn)
{
        int rc;
        int continue_reading;
        int state;

  read_again:
        rc = 0;
        pthread_mutex_lock(&conn->uc_lock);
        state = conn->uc_state;

        /* process special case: LNET calls lnd_recv() asynchronously */
        if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) {
                /* usocklnd_recv() hasn't been called yet */
                rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, 0);
                if (rc == 0)
                        conn->uc_rx_state = UC_RX_PARSE_WAIT;
                else
                        usocklnd_conn_kill_locked(conn);

                pthread_mutex_unlock(&conn->uc_lock);
                return rc;
        }

        pthread_mutex_unlock(&conn->uc_lock);
        /* From here on the conn cannot be changed
         * asynchronously, except:
         * 1) usocklnd_send() can work with uc_tx_list and uc_zcack_list,
         * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */

        switch (state) {

        case UC_RECEIVING_HELLO:
        case UC_READY:
                if (conn->uc_rx_nob_wanted != 0) {
                        /* read from conn fd as much wanted data as possible */
                        rc = usocklnd_read_data(conn);
                        if (rc == 0) /* partial read */
                                break;
                        if (rc < 0) { /* error happened or EOF */
                                usocklnd_conn_kill(conn);
                                break;
                        }
                }

                /* process incoming data */
                if (state == UC_READY)
                        rc = usocklnd_read_msg(conn, &continue_reading);
                else /* state == UC_RECEIVING_HELLO */
                        rc = usocklnd_read_hello(conn, &continue_reading);

                if (rc < 0) {
                        usocklnd_conn_kill(conn);
                        break;
                }

                if (continue_reading)
                        goto read_again;

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        return rc;
}

/* Switch on rx_state.
 * Return 0 on success, 1 if a whole packet has been read, else <0.
 * Always set cont_flag: 1 if we're ready to continue reading, else 0.
 * NB: if a whole packet has been read, cont_flag is set to zero to take
 * care of fairness.
 */
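/* The RX states below advance roughly as (implied by the transitions
 * in this switch):
 *   UC_RX_KSM_HEADER -> UC_RX_LNET_HEADER -> UC_RX_PARSE
 *     -> UC_RX_LNET_PAYLOAD -> (UC_RX_SKIPPING) -> UC_RX_KSM_HEADER
 * UC_RX_PARSE_WAIT is only entered from usocklnd_read_handler() while
 * waiting for LNET to call back into lnd_recv(). */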
int
usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
{
        int   rc = 0;
        __u64 cookie;

        *cont_flag = 0;

        /* something new emerged in the RX part - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_KSM_HEADER:
                if (conn->uc_flip) {
                        __swab32s(&conn->uc_rx_msg.ksm_type);
                        __swab32s(&conn->uc_rx_msg.ksm_csum);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_req_cookie);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_ack_cookie);
                }

                /* we never send packets for which zc-acking is required */
                if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
                    conn->uc_rx_msg.ksm_zc_ack_cookie != 0) {
                        conn->uc_errored = 1;
                        return -EPROTO;
                }

                /* zc_req will be processed later, when the
                   lnet payload has been received */

                usocklnd_rx_lnethdr_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_LNET_HEADER:
                if (the_lnet.ln_pid & LNET_PID_USERFLAG) {
                        /* replace dest_nid,pid (ksocknal sets its own) */
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_nid =
                                cpu_to_le64(conn->uc_peer->up_ni->ni_nid);
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid =
                                cpu_to_le32(the_lnet.ln_pid);

                } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
                        /* Userspace peer */
                        lnet_process_id_t *id = &conn->uc_peer->up_peerid;
                        lnet_hdr_t        *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->uc_rx_state = UC_RX_PARSE;
                usocklnd_conn_addref(conn); /* ++ref while parsing */

                rc = lnet_parse(conn->uc_peer->up_ni,
                                &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->uc_peerid.nid, conn, 0);

                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        conn->uc_errored = 1;
                        usocklnd_conn_decref(conn);
                        return -EPROTO;
                }

                /* A race with usocklnd_recv() is possible */
                pthread_mutex_lock(&conn->uc_lock);
                LASSERT (conn->uc_rx_state == UC_RX_PARSE ||
                         conn->uc_rx_state == UC_RX_LNET_PAYLOAD);

                /* check whether usocklnd_recv() got called */
                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD)
                        *cont_flag = 1;
                pthread_mutex_unlock(&conn->uc_lock);
                break;

        case UC_RX_PARSE:
                LBUG(); /* it's an error to be here, because this special
                         * case is handled by the caller */
                break;

        case UC_RX_PARSE_WAIT:
                LBUG(); /* it's an error to be here, because the conn
                         * shouldn't wait for a POLLIN event in this
                         * state */
                break;

        case UC_RX_LNET_PAYLOAD:
                /* payload all received */

                lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);

                cookie = conn->uc_rx_msg.ksm_zc_req_cookie;
                if (cookie != 0)
                        rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);

                if (rc != 0) {
                        /* change state not to finalize twice */
                        conn->uc_rx_state = UC_RX_KSM_HEADER;
                        return -EPROTO;
                }

                /* Fall through */

        case UC_RX_SKIPPING:
                if (conn->uc_rx_nob_left != 0) {
                        usocklnd_rx_skipping_state_transition(conn);
                        *cont_flag = 1;
                } else {
                        usocklnd_rx_ksmhdr_state_transition(conn);
                        rc = 1; /* whole packet is read */
                }

                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}

/* Handle incoming ZC request from sender.
 * NB: it's called only from read_handler, so we're sure that
 * the conn cannot become zombie in the middle of processing */
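/* NB: the ack itself is not written here; the cookie is handed to
 * usocklnd_find_or_create_conn(), which presumably queues it on that
 * conn's zcack list, to be piggybacked onto an outgoing tx or sent as
 * a noop tx by the write handler (see usocklnd_try_piggyback() below). */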
int
usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie)
{
        usock_conn_t   *conn;
        usock_zc_ack_t *zc_ack;
        int             type;
        int             rc;
        int             dummy;

        LIBCFS_ALLOC (zc_ack, sizeof(*zc_ack));
        if (zc_ack == NULL)
                return -ENOMEM;
        zc_ack->zc_cookie = cookie;

        /* Let's assume that CONTROL is the best type for zcack,
         * but userspace clients don't use typed connections */
        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
                type = SOCKLND_CONN_ANY;
        else
                type = SOCKLND_CONN_CONTROL;

        rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack,
                                          &dummy);
        if (rc != 0) {
                LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
                return rc;
        }
        usocklnd_conn_decref(conn);

        return 0;
}

/* Switch on rx_state.
 * Return 0 on success, else return <0
 * Always set cont_flag: 1 if we're ready to continue reading, else 0
 */
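/* Hello RX states advance as (implied by the transitions below):
 *   UC_RX_HELLO_MAGIC -> UC_RX_HELLO_VERSION -> UC_RX_HELLO_BODY
 *     -> UC_RX_HELLO_IPS (skipped when kshm_nips == 0)
 * after which the active- or passive-side completion handler runs. */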
int
usocklnd_read_hello(usock_conn_t *conn, int *cont_flag)
{
        int                rc = 0;
        ksock_hello_msg_t *hello = conn->uc_rx_hello;

        *cont_flag = 0;

        /* something new emerged in hello - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_HELLO_MAGIC:
                if (hello->kshm_magic == LNET_PROTO_MAGIC)
                        conn->uc_flip = 0;
                else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
                        conn->uc_flip = 1;
                else
                        return -EPROTO;

                usocklnd_rx_helloversion_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_HELLO_VERSION:
                if ((!conn->uc_flip &&
                     (hello->kshm_version != KSOCK_PROTO_V2)) ||
                    (conn->uc_flip &&
                     (hello->kshm_version != __swab32(KSOCK_PROTO_V2))))
                        return -EPROTO;

                usocklnd_rx_hellobody_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_HELLO_BODY:
                if (conn->uc_flip) {
                        __swab32s(&hello->kshm_src_pid);
                        __swab64s(&hello->kshm_src_nid);
                        __swab32s(&hello->kshm_dst_pid);
                        __swab64s(&hello->kshm_dst_nid);
                        __swab64s(&hello->kshm_src_incarnation);
                        __swab64s(&hello->kshm_dst_incarnation);
                        __swab32s(&hello->kshm_ctype);
                        __swab32s(&hello->kshm_nips);
                }

                if (hello->kshm_nips > LNET_MAX_INTERFACES) {
                        CERROR("Bad nips %d from ip %u.%u.%u.%u port %d\n",
                               hello->kshm_nips,
                               HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port);
                        return -EPROTO;
                }

                if (hello->kshm_nips) {
                        usocklnd_rx_helloIPs_state_transition(conn);
                        *cont_flag = 1;
                        break;
                }
                /* fall through */

        case UC_RX_HELLO_IPS:
                if (conn->uc_activeflag == 1) /* active conn */
                        rc = usocklnd_activeconn_hellorecv(conn);
                else                          /* passive conn */
                        rc = usocklnd_passiveconn_hellorecv(conn);

                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}

/* All actions that we need after receiving hello on an active conn:
 * 1) Schedule removal if we're a zombie
 * 2) Restart the active conn if we lost the race
 * 3) Else: update the RX part to receive the KSM header
 */
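/* NB: SOCKLND_CONN_NONE in the peer's hello is how the peer (which won
 * the duplicate-connection race by NID, see
 * usocklnd_passiveconn_hellorecv() below) tells us to retry: we then
 * replace this conn with a fresh active one and move its queues over. */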
int
usocklnd_activeconn_hellorecv(usock_conn_t *conn)
{
        int                rc    = 0;
        ksock_hello_msg_t *hello = conn->uc_rx_hello;
        usock_peer_t      *peer  = conn->uc_peer;

        /* An active conn with peer==NULL is a zombie.
         * Don't try to link it to a peer because the conn
         * has already had a chance to proceed at the beginning */
        if (peer == NULL) {
                LASSERT(list_empty(&conn->uc_tx_list) &&
                        list_empty(&conn->uc_zcack_list));

                usocklnd_conn_kill(conn);
                return 0;
        }

        peer->up_last_alive = cfs_time_current();

        /* peer says that we lost the race */
        if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
                /* Start a new active conn, relink txs and zc_acks from
                 * the conn to the new conn, schedule removing the conn.
                 * Actually, we're expecting that a passive conn will
                 * make us zombie soon and take care of our txs and
                 * zc_acks */

                struct list_head tx_list, zcack_list;
                usock_conn_t *conn2;
                int idx = usocklnd_type2idx(conn->uc_type);

                CFS_INIT_LIST_HEAD (&tx_list);
                CFS_INIT_LIST_HEAD (&zcack_list);

                /* Block usocklnd_send() to check peer->up_conns[idx]
                 * and to enqueue more txs */
                pthread_mutex_lock(&peer->up_lock);
                pthread_mutex_lock(&conn->uc_lock);

                /* usocklnd_shutdown() could kill us */
                if (conn->uc_state == UC_DEAD) {
                        pthread_mutex_unlock(&conn->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        return 0;
                }

                LASSERT (peer == conn->uc_peer);
                LASSERT (peer->up_conns[idx] == conn);

                rc = usocklnd_create_active_conn(peer, conn->uc_type, &conn2);
                if (rc) {
                        conn->uc_errored = 1;
                        pthread_mutex_unlock(&conn->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        return rc;
                }

                usocklnd_link_conn_to_peer(conn2, peer, idx);
                conn2->uc_peer = peer;

                /* unlink txs and zcack from the conn */
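                /* NB: list_add(&local_head, &old_head) followed by
                 * list_del_init(&old_head) splices the whole list onto
                 * the local head without walking it; the same idiom is
                 * used in reverse just below to hand the entries to
                 * conn2 */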
                list_add(&tx_list, &conn->uc_tx_list);
                list_del_init(&conn->uc_tx_list);
                list_add(&zcack_list, &conn->uc_zcack_list);
                list_del_init(&conn->uc_zcack_list);

                /* link them to conn2 */
                list_add(&conn2->uc_tx_list, &tx_list);
                list_del_init(&tx_list);
                list_add(&conn2->uc_zcack_list, &zcack_list);
                list_del_init(&zcack_list);

                /* make conn zombie */
                conn->uc_peer = NULL;
                usocklnd_peer_decref(peer);

                /* schedule conn2 for processing */
                rc = usocklnd_add_pollrequest(conn2, POLL_ADD_REQUEST, POLLOUT);
                if (rc) {
                        peer->up_conns[idx] = NULL;
                        usocklnd_conn_decref(conn2); /* should destroy conn */
                } else {
                        usocklnd_conn_kill_locked(conn);
                }

                pthread_mutex_unlock(&conn->uc_lock);
                pthread_mutex_unlock(&peer->up_lock);
                usocklnd_conn_decref(conn);

        } else { /* hello->kshm_ctype != SOCKLND_CONN_NONE */
                if (conn->uc_type != usocklnd_invert_type(hello->kshm_ctype))
                        return -EPROTO;

                pthread_mutex_lock(&peer->up_lock);
                usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation,
                                             conn);
                pthread_mutex_unlock(&peer->up_lock);

                /* safely transit to UC_READY state */
                /* rc == 0 */
                pthread_mutex_lock(&conn->uc_lock);
                if (conn->uc_state != UC_DEAD) {
                        usocklnd_rx_ksmhdr_state_transition(conn);

                        /* POLLIN is already set because we just
                         * received hello, but maybe we have something
                         * to send? */
                        LASSERT (conn->uc_sending == 0);
                        if ( !list_empty(&conn->uc_tx_list) ||
                             !list_empty(&conn->uc_zcack_list) ) {

                                conn->uc_tx_deadline =
                                        cfs_time_shift(usock_tuns.ut_timeout);
                                conn->uc_tx_flag = 1;
                                rc = usocklnd_add_pollrequest(conn,
                                                              POLL_SET_REQUEST,
                                                              POLLIN | POLLOUT);
                        }

                        if (rc == 0)
                                conn->uc_state = UC_READY;
                }
                pthread_mutex_unlock(&conn->uc_lock);
        }

        return rc;
}

/* All actions that we need after receiving hello on a passive conn:
 * 1) Stash peer's nid, pid, incarnation and conn type
 * 2) Cope with the easy case: conn[idx] is empty - just save conn there
 * 3) Resolve race:
 *    a) if our nid is higher - reply with CONN_NONE and make us zombie
 *    b) if peer's nid is higher - postpone race resolution till
 *       READY state
 * 4) Anyhow, send reply hello
 */
int
usocklnd_passiveconn_hellorecv(usock_conn_t *conn)
{
        ksock_hello_msg_t *hello = conn->uc_rx_hello;
        int                type;
        int                idx;
        int                rc;
        usock_peer_t      *peer;
        lnet_ni_t         *ni        = conn->uc_ni;
        __u32              peer_ip   = conn->uc_peer_ip;
        __u16              peer_port = conn->uc_peer_port;

        /* don't know the parent peer yet and not a zombie */
        LASSERT (conn->uc_peer == NULL &&
                 ni != NULL);

        /* don't know peer's nid and incarnation yet */
        if (peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
                /* do not trust liblustre clients */
                conn->uc_peerid.pid = peer_port | LNET_PID_USERFLAG;
                conn->uc_peerid.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
                                                 peer_ip);
                if (hello->kshm_ctype != SOCKLND_CONN_ANY) {
                        lnet_ni_decref(ni);
                        conn->uc_ni = NULL;
                        CERROR("Refusing to accept connection of type=%d from "
                               "userspace process %u.%u.%u.%u:%d\n", hello->kshm_ctype,
                               HIPQUAD(peer_ip), peer_port);
                        return -EINVAL;
                }
        } else {
                conn->uc_peerid.pid = hello->kshm_src_pid;
                conn->uc_peerid.nid = hello->kshm_src_nid;
        }
        conn->uc_type = type = usocklnd_invert_type(hello->kshm_ctype);

        rc = usocklnd_find_or_create_peer(ni, conn->uc_peerid, &peer);
        if (rc) {
                lnet_ni_decref(ni);
                conn->uc_ni = NULL;
                return rc;
        }

        peer->up_last_alive = cfs_time_current();

        idx = usocklnd_type2idx(conn->uc_type);

        /* safely check whether we're first */
        pthread_mutex_lock(&peer->up_lock);

        usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, NULL);

        if (peer->up_conns[idx] == NULL) {
                peer->up_last_alive = cfs_time_current();
                conn->uc_peer = peer;
                conn->uc_ni = NULL;
                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
        } else {
                usocklnd_peer_decref(peer);

                /* Resolve race in favour of higher NID */
                if (conn->uc_peerid.nid < conn->uc_ni->ni_nid) {
                        /* make us zombie */
                        conn->uc_ni = NULL;
                        type = SOCKLND_CONN_NONE;
                }

                /* if conn->uc_peerid.nid > conn->uc_ni->ni_nid,
                 * postpone race resolution till READY state
                 * (hopefully that conn[idx] will die because of
                 * an incoming hello of CONN_NONE type) */
        }
        pthread_mutex_unlock(&peer->up_lock);

        /* allocate and initialize a fake tx with hello */
        conn->uc_tx_hello = usocklnd_create_hello_tx(ni, type,
                                                     conn->uc_peerid.nid);
        if (conn->uc_ni == NULL)
                lnet_ni_decref(ni);

        if (conn->uc_tx_hello == NULL)
                return -ENOMEM;

        /* rc == 0 */
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state == UC_DEAD)
                goto passive_hellorecv_done;

        conn->uc_state = UC_SENDING_HELLO;
        conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
        conn->uc_tx_flag = 1;
        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLOUT);

  passive_hellorecv_done:
        pthread_mutex_unlock(&conn->uc_lock);
        return rc;
}

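/* TX-side counterpart of usocklnd_read_handler(): drives the conn
 * through UC_CONNECTING -> UC_SENDING_HELLO -> UC_READY and, once
 * READY, dequeues at most one tx (with a piggybacked zc-ack when
 * possible) per invocation, which appears to be how fairness between
 * conns is kept (cf. usock_tuns.ut_fair_limit). */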
int
usocklnd_write_handler(usock_conn_t *conn)
{
        usock_tx_t   *tx;
        int           ret;
        int           rc = 0;
        int           state;
        usock_peer_t *peer;
        lnet_ni_t    *ni;

        pthread_mutex_lock(&conn->uc_lock); /* like membar */
        state = conn->uc_state;
        pthread_mutex_unlock(&conn->uc_lock);

        switch (state) {
        case UC_CONNECTING:
                /* hello_tx has already been initialized
                 * in usocklnd_create_active_conn() */
                usocklnd_conn_new_state(conn, UC_SENDING_HELLO);
                /* fall through */

        case UC_SENDING_HELLO:
                rc = usocklnd_send_tx(conn, conn->uc_tx_hello);
                if (rc <= 0) /* error or partial send or connection closed */
                        break;

                /* tx with hello was sent successfully */
                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
                conn->uc_tx_hello = NULL;

                if (conn->uc_activeflag == 1) /* active conn */
                        rc = usocklnd_activeconn_hellosent(conn);
                else                          /* passive conn */
                        rc = usocklnd_passiveconn_hellosent(conn);

                break;

        case UC_READY:
                pthread_mutex_lock(&conn->uc_lock);

                peer = conn->uc_peer;
                LASSERT (peer != NULL);
                ni = peer->up_ni;

                if (list_empty(&conn->uc_tx_list) &&
                    list_empty(&conn->uc_zcack_list)) {
                        LASSERT(usock_tuns.ut_fair_limit > 1);
                        pthread_mutex_unlock(&conn->uc_lock);
                        return 0;
                }

                tx = usocklnd_try_piggyback(&conn->uc_tx_list,
                                            &conn->uc_zcack_list);
                if (tx != NULL)
                        conn->uc_sending = 1;
                else
                        rc = -ENOMEM;

                pthread_mutex_unlock(&conn->uc_lock);

                if (rc)
                        break;

                rc = usocklnd_send_tx(conn, tx);
                if (rc == 0) { /* partial send or connection closed */
                        pthread_mutex_lock(&conn->uc_lock);
                        list_add(&tx->tx_list, &conn->uc_tx_list);
                        conn->uc_sending = 0;
                        pthread_mutex_unlock(&conn->uc_lock);
                        break;
                }
                if (rc < 0) { /* real error */
                        usocklnd_destroy_tx(ni, tx);
                        break;
                }

                /* rc == 1: tx was sent completely */
                usocklnd_destroy_tx(ni, tx);

                pthread_mutex_lock(&conn->uc_lock);
                conn->uc_sending = 0;
                if (conn->uc_state != UC_DEAD &&
                    list_empty(&conn->uc_tx_list) &&
                    list_empty(&conn->uc_zcack_list)) {
                        conn->uc_tx_flag = 0;
                        ret = usocklnd_add_pollrequest(conn,
                                                       POLL_TX_SET_REQUEST, 0);
                        if (ret)
                                rc = ret;
                }
                pthread_mutex_unlock(&conn->uc_lock);

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        if (rc < 0)
                usocklnd_conn_kill(conn);

        return rc;
}

/* Return the first tx from tx_list with a piggybacked zc_ack
 * from zcack_list when possible. If tx_list is empty, return a
 * brand new noop tx for the zc_ack from zcack_list. Return NULL
 * if an error happened */
usock_tx_t *
usocklnd_try_piggyback(struct list_head *tx_list_p,
                       struct list_head *zcack_list_p)
{
        usock_tx_t     *tx;
        usock_zc_ack_t *zc_ack;

        /* assign tx and zc_ack */
        if (list_empty(tx_list_p))
                tx = NULL;
        else {
                tx = list_entry(tx_list_p->next, usock_tx_t, tx_list);
                list_del(&tx->tx_list);

                /* already piggybacked or partially sent */
                if (tx->tx_msg.ksm_zc_ack_cookie ||
                    tx->tx_resid != tx->tx_nob)
                        return tx;
        }

        if (list_empty(zcack_list_p)) {
                /* nothing to piggyback */
                return tx;
        } else {
                zc_ack = list_entry(zcack_list_p->next,
                                    usock_zc_ack_t, zc_list);
                list_del(&zc_ack->zc_list);
        }

        if (tx != NULL)
                /* piggyback the zc-ack cookie */
                tx->tx_msg.ksm_zc_ack_cookie = zc_ack->zc_cookie;
        else
                /* cannot piggyback, need noop */
                tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);

        LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
        return tx;
}
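
/* Design note: piggybacking saves a round of noop traffic - a zc-ack
 * cookie rides for free in the ksock header of a tx that is queued
 * anyway; a dedicated noop tx is created only when there is a pending
 * zc-ack but no outgoing tx to carry it. */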

/* All actions that we need after sending hello on active conn:
 * 1) update RX iov to receive hello
 * 2) state transition to UC_RECEIVING_HELLO
 * 3) notify poll_thread that we're waiting for incoming hello */
int
usocklnd_activeconn_hellosent(usock_conn_t *conn)
{
        int rc = 0;

        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_hellomagic_state_transition(conn);
                conn->uc_state = UC_RECEIVING_HELLO;
                conn->uc_tx_flag = 0;
                rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLIN);
        }

        pthread_mutex_unlock(&conn->uc_lock);

        return rc;
}

/* All actions that we need after sending hello on passive conn:
 * 1) Cope with 1st easy case: conn is already linked to a peer
 * 2) Cope with 2nd easy case: remove zombie conn
 * 3) Resolve race:
 *    a) find the peer
 *    b) link the conn to the peer if conn[idx] is empty
 *    c) if the conn[idx] isn't empty and is in READY state,
 *       remove the conn as duplicated
 *    d) if the conn[idx] isn't empty and isn't in READY state,
 *       override conn[idx] with the conn
 */
int
usocklnd_passiveconn_hellosent(usock_conn_t *conn)
{
        usock_conn_t    *conn2;
        usock_peer_t    *peer;
        struct list_head tx_list;
        struct list_head zcack_list;
        int              idx;
        int              rc = 0;

        /* almost nothing to do if conn is already linked to peer hash table */
        if (conn->uc_peer != NULL)
                goto passive_hellosent_done;

        /* conn->uc_peer == NULL, so the conn isn't accessible via
         * the peer hash list, so nobody can touch the conn but us */

        if (conn->uc_ni == NULL) /* remove zombie conn */
                goto passive_hellosent_connkill;

        /* all code below is race resolution, because normally
         * a passive conn is linked to a peer just after receiving hello */
        CFS_INIT_LIST_HEAD (&tx_list);
        CFS_INIT_LIST_HEAD (&zcack_list);

        /* conn is passive and isn't linked to any peer,
           so its tx and zc_ack lists have to be empty */
        LASSERT (list_empty(&conn->uc_tx_list) &&
                 list_empty(&conn->uc_zcack_list) &&
                 conn->uc_sending == 0);

        rc = usocklnd_find_or_create_peer(conn->uc_ni, conn->uc_peerid, &peer);
        if (rc)
                return rc;

        idx = usocklnd_type2idx(conn->uc_type);

        /* try to link conn to peer */
        pthread_mutex_lock(&peer->up_lock);
        if (peer->up_conns[idx] == NULL) {
                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                conn->uc_peer = peer;
                usocklnd_peer_addref(peer);
        } else {
                conn2 = peer->up_conns[idx];
                pthread_mutex_lock(&conn2->uc_lock);

                if (conn2->uc_state == UC_READY) {
                        /* conn2 is in READY state, so conn is "duplicated" */
                        pthread_mutex_unlock(&conn2->uc_lock);
                        pthread_mutex_unlock(&peer->up_lock);
                        usocklnd_peer_decref(peer);
                        goto passive_hellosent_connkill;
                }

                /* uc_state != UC_READY => switch conn and conn2 */
                /* Relink txs and zc_acks from conn2 to conn.
                 * We're sure that nobody but us can access conn;
                 * nevertheless we take the mutex - if we're wrong,
                 * a deadlock is easier to diagnose than a corrupted
                 * list */
                list_add(&tx_list, &conn2->uc_tx_list);
                list_del_init(&conn2->uc_tx_list);
                list_add(&zcack_list, &conn2->uc_zcack_list);
                list_del_init(&conn2->uc_zcack_list);

                pthread_mutex_lock(&conn->uc_lock);
                list_add_tail(&conn->uc_tx_list, &tx_list);
                list_del_init(&tx_list);
                list_add_tail(&conn->uc_zcack_list, &zcack_list);
                list_del_init(&zcack_list);
                conn->uc_peer = peer;
                pthread_mutex_unlock(&conn->uc_lock);

                conn2->uc_peer = NULL; /* make conn2 zombie */
                pthread_mutex_unlock(&conn2->uc_lock);
                usocklnd_conn_decref(conn2);

                usocklnd_link_conn_to_peer(conn, peer, idx);
                usocklnd_conn_addref(conn);
                conn->uc_peer = peer;
        }

        lnet_ni_decref(conn->uc_ni);
        conn->uc_ni = NULL;
        pthread_mutex_unlock(&peer->up_lock);
        usocklnd_peer_decref(peer);

  passive_hellosent_done:
        /* safely transit to UC_READY state */
        /* rc == 0 */
        pthread_mutex_lock(&conn->uc_lock);
        if (conn->uc_state != UC_DEAD) {
                usocklnd_rx_ksmhdr_state_transition(conn);

                /* we're ready to receive incoming packets and maybe
                   already have something to transmit */
                LASSERT (conn->uc_sending == 0);
                if ( list_empty(&conn->uc_tx_list) &&
                     list_empty(&conn->uc_zcack_list) ) {
                        conn->uc_tx_flag = 0;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN);
                } else {
                        conn->uc_tx_deadline =
                                cfs_time_shift(usock_tuns.ut_timeout);
                        conn->uc_tx_flag = 1;
                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
                                                      POLLIN | POLLOUT);
                }

                if (rc == 0)
                        conn->uc_state = UC_READY;
        }
        pthread_mutex_unlock(&conn->uc_lock);
        return rc;

  passive_hellosent_connkill:
        usocklnd_conn_kill(conn);
        return 0;
}

/* Send as much tx data as possible.
 * Returns 0 or 1 on success, <0 on fatal error.
 * 0 means a partial send or a non-fatal error, 1 - complete.
 * Rely on libcfs_sock_writev() for differentiating fatal and
 * non-fatal errors. An error should be considered non-fatal if:
 * 1) it still makes sense to continue reading, and
 * 2) poll() will set up the POLLHUP|POLLERR flags anyway */
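/* NB: on a partial write the bytes already sent are "consumed" from
 * tx->tx_iov in place (iov_base advanced, iov_len reduced), so the
 * next call resumes exactly where this one stopped; the same idiom is
 * used on the RX side in usocklnd_read_data() below. */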
int
usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx)
{
        struct iovec *iov;
        int           nob;
        int           fd = conn->uc_fd;
        cfs_time_t    t;

        LASSERT (tx->tx_resid != 0);

        do {
                usock_peer_t *peer = conn->uc_peer;

                LASSERT (tx->tx_niov > 0);

                nob = libcfs_sock_writev(fd, tx->tx_iov, tx->tx_niov);
                if (nob < 0)
                        conn->uc_errored = 1;
                if (nob <= 0) /* write queue is flow-controlled or error */
                        return nob;

                LASSERT (nob <= tx->tx_resid);
                tx->tx_resid -= nob;
                t = cfs_time_current();
                conn->uc_tx_deadline =
                        cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));

                if (peer != NULL)
                        peer->up_last_alive = t;

                /* "consume" iov */
                iov = tx->tx_iov;
                do {
                        LASSERT (tx->tx_niov > 0);

                        if (nob < iov->iov_len) {
                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                                iov->iov_len -= nob;
                                break;
                        }

                        nob -= iov->iov_len;
                        tx->tx_iov = ++iov;
                        tx->tx_niov--;
                } while (nob != 0);

        } while (tx->tx_resid != 0);

        return 1; /* send complete */
}

/* Read from the wire as much data as possible.
 * Returns 0 or 1 on success, <0 on error or EOF.
 * 0 means a partial read, 1 - complete */
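/* NB: uc_rx_nob_wanted counts the bytes still needed for the chunk
 * currently being received, while uc_rx_nob_left apparently tracks
 * what remains of the whole message (it drives the UC_RX_SKIPPING
 * decision in usocklnd_read_msg() above), so both are decremented per
 * successful readv(). */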
int
usocklnd_read_data(usock_conn_t *conn)
{
        struct iovec *iov;
        int           nob;
        cfs_time_t    t;

        LASSERT (conn->uc_rx_nob_wanted != 0);

        do {
                usock_peer_t *peer = conn->uc_peer;

                LASSERT (conn->uc_rx_niov > 0);

                nob = libcfs_sock_readv(conn->uc_fd, conn->uc_rx_iov,
                                        conn->uc_rx_niov);
                if (nob <= 0) { /* read nothing or error */
                        conn->uc_errored = 1;
                        return nob;
                }

                LASSERT (nob <= conn->uc_rx_nob_wanted);
                conn->uc_rx_nob_wanted -= nob;
                conn->uc_rx_nob_left -= nob;
                t = cfs_time_current();
                conn->uc_rx_deadline =
                        cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));

                if (peer != NULL)
                        peer->up_last_alive = t;

                /* "consume" iov */
                iov = conn->uc_rx_iov;
                do {
                        LASSERT (conn->uc_rx_niov > 0);

                        if (nob < iov->iov_len) {
                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
                                iov->iov_len -= nob;
                                break;
                        }

                        nob -= iov->iov_len;
                        conn->uc_rx_iov = ++iov;
                        conn->uc_rx_niov--;
                } while (nob != 0);

        } while (conn->uc_rx_nob_wanted != 0);

        return 1; /* read complete */
}