Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / ulnds / socklnd / conn.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/socklnd/conn.c
37  *
38  * Author: Maxim Patlasov <maxim@clusterfs.com>
39  */
40
41 #include "usocklnd.h"
42
43 /* Return 1 if the conn is timed out, 0 else */
44 int
45 usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
46 {
47         if (conn->uc_tx_flag && /* sending is in progress */
48             cfs_time_aftereq(current_time, conn->uc_tx_deadline))
49                 return 1;
50
51         if (conn->uc_rx_flag && /* receiving is in progress */
52             cfs_time_aftereq(current_time, conn->uc_rx_deadline))
53                 return 1;
54         
55         return 0;
56 }
57
58 void
59 usocklnd_conn_kill(usock_conn_t *conn)
60 {
61         pthread_mutex_lock(&conn->uc_lock);
62         if (conn->uc_state != UC_DEAD)
63                 usocklnd_conn_kill_locked(conn);
64         pthread_mutex_unlock(&conn->uc_lock);        
65 }
66
67 /* Mark the conn as DEAD and schedule its deletion */
68 void
69 usocklnd_conn_kill_locked(usock_conn_t *conn)
70 {
71         conn->uc_rx_flag = conn->uc_tx_flag = 0;
72         conn->uc_state = UC_DEAD;
73         usocklnd_add_killrequest(conn);
74 }
75
76 usock_conn_t *
77 usocklnd_conn_allocate()
78 {
79         usock_conn_t        *conn;
80         usock_pollrequest_t *pr;
81
82         LIBCFS_ALLOC (pr, sizeof(*pr));
83         if (pr == NULL)
84                 return NULL;
85         
86         LIBCFS_ALLOC (conn, sizeof(*conn));
87         if (conn == NULL) {
88                 LIBCFS_FREE (pr, sizeof(*pr));
89                 return NULL;
90         }
91         memset(conn, 0, sizeof(*conn));
92         conn->uc_preq = pr;
93
94         LIBCFS_ALLOC (conn->uc_rx_hello,
95                       offsetof(ksock_hello_msg_t,
96                                kshm_ips[LNET_MAX_INTERFACES]));
97         if (conn->uc_rx_hello == NULL) {
98                 LIBCFS_FREE (pr, sizeof(*pr));
99                 LIBCFS_FREE (conn, sizeof(*conn));
100                 return NULL;
101         }
102
103         return conn;
104 }
105
106 void
107 usocklnd_conn_free(usock_conn_t *conn)
108 {
109         usock_pollrequest_t *pr = conn->uc_preq;
110
111         if (pr != NULL)
112                 LIBCFS_FREE (pr, sizeof(*pr));
113
114         if (conn->uc_rx_hello != NULL)
115                 LIBCFS_FREE (conn->uc_rx_hello,
116                              offsetof(ksock_hello_msg_t,
117                                       kshm_ips[LNET_MAX_INTERFACES]));
118         
119         LIBCFS_FREE (conn, sizeof(*conn));
120 }
121
122 void
123 usocklnd_tear_peer_conn(usock_conn_t *conn)
124 {
125         usock_peer_t     *peer = conn->uc_peer;
126         int               idx = usocklnd_type2idx(conn->uc_type);
127         lnet_ni_t        *ni;
128         lnet_process_id_t id;
129         int               decref_flag  = 0;
130         int               killall_flag = 0;
131         
132         if (peer == NULL) /* nothing to tear */
133                 return;
134         
135         pthread_mutex_lock(&peer->up_lock);
136         pthread_mutex_lock(&conn->uc_lock);        
137
138         ni = peer->up_ni;
139         id = peer->up_peerid;
140
141         if (peer->up_conns[idx] == conn) {
142                 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
143                         /* change state not to finalize twice */
144                         conn->uc_rx_state = UC_RX_KSM_HEADER;
145                         lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);                        
146                 }
147                 
148                 usocklnd_destroy_txlist(peer->up_ni,
149                                         &conn->uc_tx_list);
150
151                 peer->up_conns[idx] = NULL;
152                 conn->uc_peer = NULL;
153                 decref_flag = 1;
154
155                 if(conn->uc_errored && !peer->up_errored)
156                         peer->up_errored = killall_flag = 1;
157         }
158         
159         pthread_mutex_unlock(&conn->uc_lock);
160
161         if (killall_flag)
162                 usocklnd_del_conns_locked(peer);
163
164         pthread_mutex_unlock(&peer->up_lock);
165         
166         if (!decref_flag)
167                 return;
168
169         usocklnd_conn_decref(conn);
170         usocklnd_peer_decref(peer);
171
172         usocklnd_check_peer_stale(ni, id);
173 }
174
175 /* Remove peer from hash list if all up_conns[i] is NULL &&
176  * hash table is the only consumer of the peer */
177 void
178 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
179 {
180         usock_peer_t *peer;
181         
182         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
183         peer = usocklnd_find_peer_locked(ni, id);
184
185         if (peer == NULL) {
186                 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
187                 return;
188         }
189
190         if (cfs_atomic_read(&peer->up_refcount) == 2) {
191                 int i;
192                 for (i = 0; i < N_CONN_TYPES; i++)
193                         LASSERT (peer->up_conns[i] == NULL);
194
195                 list_del(&peer->up_list);                        
196                 
197                 if (peer->up_errored &&
198                     (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
199                         lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
200                                      cfs_time_seconds(peer->up_last_alive));
201                 
202                 usocklnd_peer_decref(peer);
203         }
204
205         usocklnd_peer_decref(peer);
206         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
207 }
208
209 /* Returns 0 on success, <0 else */
210 int
211 usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
212 {
213         int           rc;
214         __u32         peer_ip;
215         __u16         peer_port;
216         usock_conn_t *conn;
217
218         rc = libcfs_getpeername(fd, &peer_ip, &peer_port);
219         if (rc)
220                 return rc;
221
222         rc = usocklnd_set_sock_options(fd);
223         if (rc)
224                 return rc;
225
226         conn = usocklnd_conn_allocate();
227         if (conn == NULL)
228                 return -ENOMEM;
229
230         usocklnd_rx_hellomagic_state_transition(conn);
231         
232         conn->uc_fd = fd;
233         conn->uc_peer_ip = peer_ip;
234         conn->uc_peer_port = peer_port;
235         conn->uc_state = UC_RECEIVING_HELLO;
236         conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
237         conn->uc_ni = ni;
238         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
239         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
240         pthread_mutex_init(&conn->uc_lock, NULL);
241         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
242
243         *connp = conn;
244         return 0;
245 }
246
247 /* Returns 0 on success, <0 else */
248 int
249 usocklnd_create_active_conn(usock_peer_t *peer, int type,
250                             usock_conn_t **connp)
251 {
252         int           rc;
253         int           fd;
254         usock_conn_t *conn;
255         __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
256         __u16         dst_port = lnet_acceptor_port();
257         
258         conn = usocklnd_conn_allocate();
259         if (conn == NULL)
260                 return -ENOMEM;
261
262         conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
263                                                         peer->up_peerid.nid);
264         if (conn->uc_tx_hello == NULL) {
265                 usocklnd_conn_free(conn);
266                 return -ENOMEM;
267         }                
268         
269         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
270                 rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
271         else
272                 rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);
273         
274         if (rc) {
275                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
276                 usocklnd_conn_free(conn);
277                 return rc;
278         }
279         
280         conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
281         conn->uc_tx_flag = 1;
282         
283         conn->uc_fd = fd;
284         conn->uc_peer_ip = dst_ip;
285         conn->uc_peer_port = dst_port;
286         conn->uc_type = type;
287         conn->uc_activeflag = 1;
288         conn->uc_state = UC_CONNECTING;
289         conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
290         conn->uc_ni = NULL;
291         conn->uc_peerid = peer->up_peerid;
292         conn->uc_peer = peer;
293         usocklnd_peer_addref(peer);
294         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
295         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
296         pthread_mutex_init(&conn->uc_lock, NULL);
297         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
298
299         *connp = conn;
300         return 0;
301 }
302
303 /* Returns 0 on success, <0 else */
304 int
305 usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
306 {
307         __u16 port;
308         int   fd;
309         int   rc;
310
311         for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT; 
312              port >= LNET_ACCEPTOR_MIN_RESERVED_PORT; 
313              port--) {
314                 /* Iterate through reserved ports. */
315
316                 rc = libcfs_sock_create(&fd);
317                 if (rc)
318                         return rc;                        
319                                 
320                 rc = libcfs_sock_bind_to_port(fd, port);
321                 if (rc) {
322                         close(fd);
323                         continue;
324                 }
325
326                 rc = usocklnd_set_sock_options(fd);
327                 if (rc) {
328                         close(fd);
329                         return rc;
330                 }
331
332                 rc = libcfs_sock_connect(fd, dst_ip, dst_port);
333                 if (rc == 0) {
334                         *fdp = fd;
335                         return 0;
336                 }
337                 
338                 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
339                         close(fd);
340                         return rc;
341                 }
342
343                 close(fd);
344         }
345
346         CERROR("Can't bind to any reserved port\n");
347         return rc;
348 }
349
350 /* Returns 0 on success, <0 else */
351 int
352 usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
353 {
354         int fd;
355         int rc;
356
357         rc = libcfs_sock_create(&fd);
358         if (rc)
359                 return rc;
360         
361         rc = usocklnd_set_sock_options(fd);
362         if (rc) {
363                 close(fd);
364                 return rc;
365         }
366
367         rc = libcfs_sock_connect(fd, dst_ip, dst_port);
368         if (rc) {
369                 close(fd);
370                 return rc;
371         }
372
373         *fdp = fd;
374         return 0;
375 }
376
377 int
378 usocklnd_set_sock_options(int fd)
379 {
380         int rc;
381
382         rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
383         if (rc)
384                 return rc;
385
386         if (usock_tuns.ut_sockbufsiz) {
387                 rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
388                 if (rc)
389                         return rc;        
390         }
391         
392         return libcfs_fcntl_nonblock(fd);
393 }
394
395 void
396 usocklnd_init_msg(ksock_msg_t *msg, int type)
397 {
398         msg->ksm_type           = type;
399         msg->ksm_csum           = 0;
400         msg->ksm_zc_req_cookie  = 0;
401         msg->ksm_zc_ack_cookie  = 0;
402 }
403
404 usock_tx_t *
405 usocklnd_create_noop_tx(__u64 cookie)
406 {
407         usock_tx_t *tx;
408         
409         LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
410         if (tx == NULL)
411                 return NULL;
412
413         tx->tx_size = sizeof(usock_tx_t);
414         tx->tx_lnetmsg = NULL;
415
416         usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
417         tx->tx_msg.ksm_zc_ack_cookie = cookie;
418         
419         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
420         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
421                 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
422         tx->tx_iov = tx->tx_iova;
423         tx->tx_niov = 1;
424         
425         return tx;
426 }
427         
428 usock_tx_t *
429 usocklnd_create_tx(lnet_msg_t *lntmsg)
430 {
431         usock_tx_t   *tx;
432         unsigned int  payload_niov = lntmsg->msg_niov; 
433         struct iovec *payload_iov = lntmsg->msg_iov; 
434         unsigned int  payload_offset = lntmsg->msg_offset;
435         unsigned int  payload_nob = lntmsg->msg_len;
436         int           size = offsetof(usock_tx_t,
437                                       tx_iova[1 + payload_niov]);
438
439         LIBCFS_ALLOC (tx, size);
440         if (tx == NULL)
441                 return NULL;
442
443         tx->tx_size = size;
444         tx->tx_lnetmsg = lntmsg;
445
446         tx->tx_resid = tx->tx_nob =
447                 offsetof(ksock_msg_t,  ksm_u.lnetmsg.ksnm_payload) +
448                 payload_nob;
449         
450         usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
451         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
452         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
453         tx->tx_iova[0].iov_len = offsetof(ksock_msg_t,
454                                           ksm_u.lnetmsg.ksnm_payload);
455         tx->tx_iov = tx->tx_iova;
456
457         tx->tx_niov = 1 + 
458                 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
459                                  payload_niov, payload_iov,
460                                  payload_offset, payload_nob);
461
462         return tx;
463 }
464
465 void
466 usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
467                         lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
468 {
469         usock_net_t *net = (usock_net_t *)ni->ni_data;
470
471         hello->kshm_magic       = LNET_PROTO_MAGIC;
472         hello->kshm_version     = KSOCK_PROTO_V2;
473         hello->kshm_nips        = 0;
474         hello->kshm_ctype       = type;
475         
476         hello->kshm_dst_incarnation = 0; /* not used */
477         hello->kshm_src_incarnation = net->un_incarnation;
478
479         hello->kshm_src_pid = the_lnet.ln_pid;
480         hello->kshm_src_nid = ni->ni_nid;
481         hello->kshm_dst_nid = peer_nid;
482         hello->kshm_dst_pid = 0; /* not used */
483 }
484
485 usock_tx_t *
486 usocklnd_create_hello_tx(lnet_ni_t *ni,
487                          int type, lnet_nid_t peer_nid)
488 {
489         usock_tx_t        *tx;
490         int                size;
491         ksock_hello_msg_t *hello;
492
493         size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
494         LIBCFS_ALLOC (tx, size);
495         if (tx == NULL)
496                 return NULL;
497
498         tx->tx_size = size;
499         tx->tx_lnetmsg = NULL;
500
501         hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
502         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
503         
504         tx->tx_iova[0].iov_base = (void *)hello;
505         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
506                 offsetof(ksock_hello_msg_t, kshm_ips);
507         tx->tx_iov = tx->tx_iova;
508         tx->tx_niov = 1;
509
510         return tx;
511 }
512
513 usock_tx_t *
514 usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
515                             int type, lnet_nid_t peer_nid)
516 {
517         usock_tx_t              *tx;
518         int                      size;
519         lnet_acceptor_connreq_t *cr;
520         ksock_hello_msg_t       *hello;
521
522         size = sizeof(usock_tx_t) +
523                 sizeof(lnet_acceptor_connreq_t) +
524                 offsetof(ksock_hello_msg_t, kshm_ips);
525         LIBCFS_ALLOC (tx, size);
526         if (tx == NULL)
527                 return NULL;
528
529         tx->tx_size = size;
530         tx->tx_lnetmsg = NULL;
531
532         cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
533         memset(cr, 0, sizeof(*cr));
534         cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
535         cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
536         cr->acr_nid     = peer_nid;
537         
538         hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
539         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
540         
541         tx->tx_iova[0].iov_base = (void *)cr;
542         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
543                 sizeof(lnet_acceptor_connreq_t) +
544                 offsetof(ksock_hello_msg_t, kshm_ips);
545         tx->tx_iov = tx->tx_iova;
546         tx->tx_niov = 1;
547
548         return tx;
549 }
550
551 void
552 usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
553 {
554         lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;
555         int          rc = (tx->tx_resid == 0) ? 0 : -EIO;
556
557         LASSERT (ni != NULL || lnetmsg == NULL);
558
559         LIBCFS_FREE (tx, tx->tx_size);
560         
561         if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
562                 lnet_finalize(ni, lnetmsg, rc);
563 }
564
565 void
566 usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
567 {
568         usock_tx_t *tx;
569
570         while (!list_empty(txlist)) {
571                 tx = list_entry(txlist->next, usock_tx_t, tx_list);
572                 list_del(&tx->tx_list);
573                 
574                 usocklnd_destroy_tx(ni, tx);
575         }
576 }
577
578 void
579 usocklnd_destroy_zcack_list(struct list_head *zcack_list)
580 {
581         usock_zc_ack_t *zcack;
582
583         while (!list_empty(zcack_list)) {
584                 zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
585                 list_del(&zcack->zc_list);
586                 
587                 LIBCFS_FREE (zcack, sizeof(*zcack));
588         }
589 }
590
591 void
592 usocklnd_destroy_peer(usock_peer_t *peer)
593 {
594         usock_net_t *net = peer->up_ni->ni_data;
595         int          i;
596
597         for (i = 0; i < N_CONN_TYPES; i++)
598                 LASSERT (peer->up_conns[i] == NULL);
599
600         LIBCFS_FREE (peer, sizeof (*peer));
601
602         pthread_mutex_lock(&net->un_lock);
603         if(--net->un_peercount == 0)                
604                 pthread_cond_signal(&net->un_cond);
605         pthread_mutex_unlock(&net->un_lock);
606 }
607
608 void
609 usocklnd_destroy_conn(usock_conn_t *conn)
610 {
611         LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
612
613         if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
614                 LASSERT (conn->uc_peer != NULL);
615                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
616         }
617
618         if (!list_empty(&conn->uc_tx_list)) {
619                 LASSERT (conn->uc_peer != NULL);                
620                 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
621         }
622
623         usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
624         
625         if (conn->uc_peer != NULL)
626                 usocklnd_peer_decref(conn->uc_peer);
627
628         if (conn->uc_ni != NULL)
629                 lnet_ni_decref(conn->uc_ni);
630
631         if (conn->uc_tx_hello)
632                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
633
634         usocklnd_conn_free(conn);
635 }
636
637 int
638 usocklnd_get_conn_type(lnet_msg_t *lntmsg)
639 {
640         int nob;
641
642         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
643                 return SOCKLND_CONN_ANY;
644
645         nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
646                 lntmsg->msg_len;
647         
648         if (nob >= usock_tuns.ut_min_bulk)
649                 return SOCKLND_CONN_BULK_OUT;
650         else
651                 return SOCKLND_CONN_CONTROL;
652 }
653
654 int usocklnd_type2idx(int type)
655 {
656         switch (type) {
657         case SOCKLND_CONN_ANY:
658         case SOCKLND_CONN_CONTROL:
659                 return 0;
660         case SOCKLND_CONN_BULK_IN:
661                 return 1;
662         case SOCKLND_CONN_BULK_OUT:
663                 return 2;
664         default:
665                 LBUG();
666         }
667 }
668
669 usock_peer_t *
670 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
671 {
672         struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
673         struct list_head *tmp;
674         usock_peer_t     *peer;
675
676         list_for_each (tmp, peer_list) {
677
678                 peer = list_entry (tmp, usock_peer_t, up_list);
679
680                 if (peer->up_ni != ni)
681                         continue;
682
683                 if (peer->up_peerid.nid != id.nid ||
684                     peer->up_peerid.pid != id.pid)
685                         continue;
686
687                 usocklnd_peer_addref(peer);
688                 return peer;
689         }
690         return (NULL);
691 }
692
693 int
694 usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
695                      usock_peer_t **peerp)
696 {
697         usock_net_t  *net = ni->ni_data;
698         usock_peer_t *peer;
699         int           i;
700
701         LIBCFS_ALLOC (peer, sizeof (*peer));
702         if (peer == NULL)
703                 return -ENOMEM;
704
705         for (i = 0; i < N_CONN_TYPES; i++)
706                 peer->up_conns[i] = NULL;
707
708         peer->up_peerid       = id;
709         peer->up_ni           = ni;
710         peer->up_incrn_is_set = 0;
711         peer->up_errored      = 0;
712         peer->up_last_alive   = 0;
713         cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
714         pthread_mutex_init(&peer->up_lock, NULL);        
715
716         pthread_mutex_lock(&net->un_lock);
717         net->un_peercount++;        
718         pthread_mutex_unlock(&net->un_lock);
719
720         *peerp = peer;
721         return 0;
722 }
723
724 /* Safely create new peer if needed. Save result in *peerp.
725  * Returns 0 on success, <0 else */
726 int
727 usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
728                              usock_peer_t **peerp)
729 {
730         int           rc;
731         usock_peer_t *peer;
732         usock_peer_t *peer2;
733         usock_net_t  *net = ni->ni_data;
734
735         pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
736         peer = usocklnd_find_peer_locked(ni, id);
737         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
738
739         if (peer != NULL)
740                 goto find_or_create_peer_done;
741
742         rc = usocklnd_create_peer(ni, id, &peer);
743         if (rc)
744                 return rc;
745         
746         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
747         peer2 = usocklnd_find_peer_locked(ni, id);
748         if (peer2 == NULL) {
749                 if (net->un_shutdown) {
750                         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
751                         usocklnd_peer_decref(peer); /* should destroy peer */
752                         CERROR("Can't create peer: network shutdown\n");
753                         return -ESHUTDOWN;
754                 }
755                 
756                 /* peer table will take 1 of my refs on peer */
757                 usocklnd_peer_addref(peer);
758                 list_add_tail (&peer->up_list,
759                                usocklnd_nid2peerlist(id.nid));
760         } else {
761                 usocklnd_peer_decref(peer); /* should destroy peer */
762                 peer = peer2;
763         }
764         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
765         
766   find_or_create_peer_done:        
767         *peerp = peer;
768         return 0;
769 }
770
771 /* NB: both peer and conn locks are held */
772 static int
773 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
774 {        
775         if (conn->uc_state == UC_READY &&
776             list_empty(&conn->uc_tx_list) &&
777             list_empty(&conn->uc_zcack_list) &&
778             !conn->uc_sending) {
779                 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
780                                                   POLLOUT);
781                 if (rc != 0)
782                         return rc;
783         }                
784
785         list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
786         return 0;
787 }
788
789 /* NB: both peer and conn locks are held
790  * NB: if sending isn't in progress.  the caller *MUST* send tx
791  * immediately after we'll return */
792 static void
793 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
794                     int *send_immediately)
795 {        
796         if (conn->uc_state == UC_READY &&
797             list_empty(&conn->uc_tx_list) &&
798             list_empty(&conn->uc_zcack_list) &&
799             !conn->uc_sending) {
800                 conn->uc_sending = 1;
801                 *send_immediately = 1;
802                 return;
803         }                
804
805         *send_immediately = 0;
806         list_add_tail(&tx->tx_list, &conn->uc_tx_list);
807 }
808
809 /* Safely create new conn if needed. Save result in *connp.
810  * Returns 0 on success, <0 else */
811 int
812 usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
813                              usock_conn_t **connp,
814                              usock_tx_t *tx, usock_zc_ack_t *zc_ack,
815                              int *send_immediately)
816 {
817         usock_conn_t *conn;
818         int           idx;
819         int           rc;
820         lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
821         
822         if (userflag)
823                 type = SOCKLND_CONN_ANY;
824
825         idx = usocklnd_type2idx(type);
826         
827         pthread_mutex_lock(&peer->up_lock);
828         if (peer->up_conns[idx] != NULL) {
829                 conn = peer->up_conns[idx];
830                 LASSERT(conn->uc_type == type);
831         } else {
832                 if (userflag) {
833                         CERROR("Refusing to create a connection to "
834                                "userspace process %s\n",
835                                libcfs_id2str(peer->up_peerid));
836                         rc = -EHOSTUNREACH;
837                         goto find_or_create_conn_failed;
838                 }
839                 
840                 rc = usocklnd_create_active_conn(peer, type, &conn);
841                 if (rc) {
842                         peer->up_errored = 1;
843                         usocklnd_del_conns_locked(peer);
844                         goto find_or_create_conn_failed;
845                 }
846
847                 /* peer takes 1 of conn refcount */
848                 usocklnd_link_conn_to_peer(conn, peer, idx);
849                 
850                 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
851                 if (rc) {
852                         peer->up_conns[idx] = NULL;
853                         usocklnd_conn_decref(conn); /* should destroy conn */
854                         goto find_or_create_conn_failed;
855                 }
856                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
857         }
858         
859         pthread_mutex_lock(&conn->uc_lock);
860         LASSERT(conn->uc_peer == peer);
861
862         LASSERT(tx == NULL || zc_ack == NULL);
863         if (tx != NULL) {
864                 usocklnd_enqueue_tx(conn, tx, send_immediately);
865         } else {
866                 rc = usocklnd_enqueue_zcack(conn, zc_ack);        
867                 if (rc != 0) {
868                         usocklnd_conn_kill_locked(conn);
869                         pthread_mutex_unlock(&conn->uc_lock);
870                         goto find_or_create_conn_failed;
871                 }
872         }
873         pthread_mutex_unlock(&conn->uc_lock);         
874
875         usocklnd_conn_addref(conn);
876         pthread_mutex_unlock(&peer->up_lock);
877
878         *connp = conn;
879         return 0;
880
881   find_or_create_conn_failed:
882         pthread_mutex_unlock(&peer->up_lock);
883         return rc;
884 }
885
886 void
887 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
888 {
889         peer->up_conns[idx] = conn;        
890         peer->up_errored    = 0; /* this new fresh conn will try
891                                   * revitalize even stale errored peer */
892 }
893
894 int
895 usocklnd_invert_type(int type)
896 {
897         switch (type)
898         {
899         case SOCKLND_CONN_ANY:
900         case SOCKLND_CONN_CONTROL:
901                 return (type);
902         case SOCKLND_CONN_BULK_IN:
903                 return SOCKLND_CONN_BULK_OUT;
904         case SOCKLND_CONN_BULK_OUT:
905                 return SOCKLND_CONN_BULK_IN;
906         default:
907                 return SOCKLND_CONN_NONE;
908         }
909 }
910
911 void
912 usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
913 {
914         pthread_mutex_lock(&conn->uc_lock);
915         if (conn->uc_state != UC_DEAD)
916                 conn->uc_state = new_state;
917         pthread_mutex_unlock(&conn->uc_lock);
918 }
919
920 /* NB: peer is locked by caller */
921 void
922 usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
923                              usock_conn_t *skip_conn)
924 {
925         int i;
926         
927         if (!peer->up_incrn_is_set) {
928                 peer->up_incarnation = incrn;
929                 peer->up_incrn_is_set = 1;
930                 return;
931         }
932
933         if (peer->up_incarnation == incrn)
934                 return;
935
936         peer->up_incarnation = incrn;
937         
938         for (i = 0; i < N_CONN_TYPES; i++) {
939                 usock_conn_t *conn = peer->up_conns[i];
940                 
941                 if (conn == NULL || conn == skip_conn)
942                         continue;
943
944                 pthread_mutex_lock(&conn->uc_lock);        
945                 LASSERT (conn->uc_peer == peer);
946                 conn->uc_peer = NULL;
947                 peer->up_conns[i] = NULL;
948                 if (conn->uc_state != UC_DEAD)
949                         usocklnd_conn_kill_locked(conn);                
950                 pthread_mutex_unlock(&conn->uc_lock);
951
952                 usocklnd_conn_decref(conn);
953                 usocklnd_peer_decref(peer);
954         }
955 }
956
957 /* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
958  * MAGIC part of hello and set uc_rx_state
959  */
960 void
961 usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
962 {
963         LASSERT(conn->uc_rx_hello != NULL);
964
965         conn->uc_rx_niov = 1;
966         conn->uc_rx_iov = conn->uc_rx_iova;
967         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
968         conn->uc_rx_iov[0].iov_len =
969                 conn->uc_rx_nob_wanted =
970                 conn->uc_rx_nob_left =
971                 sizeof(conn->uc_rx_hello->kshm_magic);
972
973         conn->uc_rx_state = UC_RX_HELLO_MAGIC;
974
975         conn->uc_rx_flag = 1; /* waiting for incoming hello */
976         conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
977 }
978
979 /* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
980  * VERSION part of hello and set uc_rx_state
981  */
982 void
983 usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
984 {
985         LASSERT(conn->uc_rx_hello != NULL);
986
987         conn->uc_rx_niov = 1;
988         conn->uc_rx_iov = conn->uc_rx_iova;
989         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
990         conn->uc_rx_iov[0].iov_len =
991                 conn->uc_rx_nob_wanted =
992                 conn->uc_rx_nob_left =
993                 sizeof(conn->uc_rx_hello->kshm_version);
994         
995         conn->uc_rx_state = UC_RX_HELLO_VERSION;
996 }
997
998 /* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
999  * the rest  of hello and set uc_rx_state
1000  */
1001 void
1002 usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
1003 {
1004         LASSERT(conn->uc_rx_hello != NULL);
1005
1006         conn->uc_rx_niov = 1;
1007         conn->uc_rx_iov = conn->uc_rx_iova;
1008         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
1009         conn->uc_rx_iov[0].iov_len =
1010                 conn->uc_rx_nob_wanted =
1011                 conn->uc_rx_nob_left =
1012                 offsetof(ksock_hello_msg_t, kshm_ips) -
1013                 offsetof(ksock_hello_msg_t, kshm_src_nid);
1014         
1015         conn->uc_rx_state = UC_RX_HELLO_BODY;
1016 }
1017
1018 /* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
1019  * array of IPs and set uc_rx_state
1020  */
1021 void
1022 usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
1023 {
1024         LASSERT(conn->uc_rx_hello != NULL);
1025
1026         conn->uc_rx_niov = 1;
1027         conn->uc_rx_iov = conn->uc_rx_iova;
1028         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
1029         conn->uc_rx_iov[0].iov_len =
1030                 conn->uc_rx_nob_wanted =
1031                 conn->uc_rx_nob_left =
1032                 conn->uc_rx_hello->kshm_nips *
1033                 sizeof(conn->uc_rx_hello->kshm_ips[0]);
1034         
1035         conn->uc_rx_state = UC_RX_HELLO_IPS;
1036 }
1037
1038 /* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
1039  * LNET header and set uc_rx_state
1040  */
1041 void
1042 usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
1043 {
1044         conn->uc_rx_niov = 1;
1045         conn->uc_rx_iov = conn->uc_rx_iova;
1046         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;                
1047         conn->uc_rx_iov[0].iov_len =
1048                 conn->uc_rx_nob_wanted =
1049                 conn->uc_rx_nob_left =
1050                 sizeof(ksock_lnet_msg_t);
1051         
1052         conn->uc_rx_state = UC_RX_LNET_HEADER;
1053         conn->uc_rx_flag = 1;
1054 }
1055
1056 /* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
1057  * KSM header and set uc_rx_state
1058  */
1059 void
1060 usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
1061 {
1062         conn->uc_rx_niov = 1;
1063         conn->uc_rx_iov = conn->uc_rx_iova;
1064         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;                
1065         conn->uc_rx_iov[0].iov_len =
1066                 conn->uc_rx_nob_wanted =
1067                 conn->uc_rx_nob_left =                        
1068                 offsetof(ksock_msg_t, ksm_u);
1069         
1070         conn->uc_rx_state = UC_RX_KSM_HEADER;
1071         conn->uc_rx_flag = 0;
1072 }
1073
1074 /* RX state transition to UC_RX_SKIPPING: update RX part for
1075  * skipping and set uc_rx_state
1076  */
1077 void
1078 usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
1079 {
1080         static char    skip_buffer[4096];
1081
1082         int            nob;
1083         unsigned int   niov = 0;
1084         int            skipped = 0;
1085         int            nob_to_skip = conn->uc_rx_nob_left;
1086         
1087         LASSERT(nob_to_skip != 0);
1088
1089         conn->uc_rx_iov = conn->uc_rx_iova;
1090
1091         /* Set up to skip as much as possible now.  If there's more left
1092          * (ran out of iov entries) we'll get called again */
1093
1094         do {
1095                 nob = MIN (nob_to_skip, sizeof(skip_buffer));
1096
1097                 conn->uc_rx_iov[niov].iov_base = skip_buffer;
1098                 conn->uc_rx_iov[niov].iov_len  = nob;
1099                 niov++;
1100                 skipped += nob;
1101                 nob_to_skip -=nob;
1102
1103         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1104                  niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));
1105
1106         conn->uc_rx_niov = niov;
1107         conn->uc_rx_nob_wanted = skipped;
1108
1109         conn->uc_rx_state = UC_RX_SKIPPING;
1110 }