Whamcloud - gitweb
use one place for syscall.h
[fs/lustre-release.git] / lnet / ulnds / socklnd / conn.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Maxim Patlasov <maxim@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  */
11
12 #include "usocklnd.h"
13
14 /* Return 1 if the conn is timed out, 0 else */
15 int
16 usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
17 {
18         if (conn->uc_tx_flag && /* sending is in progress */
19             cfs_time_aftereq(current_time, conn->uc_tx_deadline))
20                 return 1;
21
22         if (conn->uc_rx_flag && /* receiving is in progress */
23             cfs_time_aftereq(current_time, conn->uc_rx_deadline))
24                 return 1;
25         
26         return 0;
27 }
28
29 void
30 usocklnd_conn_kill(usock_conn_t *conn)
31 {
32         pthread_mutex_lock(&conn->uc_lock);
33         if (conn->uc_state != UC_DEAD)
34                 usocklnd_conn_kill_locked(conn);
35         pthread_mutex_unlock(&conn->uc_lock);        
36 }
37
38 /* Mark the conn as DEAD and schedule its deletion */
39 void
40 usocklnd_conn_kill_locked(usock_conn_t *conn)
41 {
42         conn->uc_rx_flag = conn->uc_tx_flag = 0;
43         conn->uc_state = UC_DEAD;
44         usocklnd_add_killrequest(conn);
45 }
46
47 usock_conn_t *
48 usocklnd_conn_allocate()
49 {
50         usock_conn_t        *conn;
51         usock_pollrequest_t *pr;
52
53         LIBCFS_ALLOC (pr, sizeof(*pr));
54         if (pr == NULL)
55                 return NULL;
56         
57         LIBCFS_ALLOC (conn, sizeof(*conn));
58         if (conn == NULL) {
59                 LIBCFS_FREE (pr, sizeof(*pr));
60                 return NULL;
61         }
62         memset(conn, 0, sizeof(*conn));
63         conn->uc_preq = pr;
64
65         LIBCFS_ALLOC (conn->uc_rx_hello,
66                       offsetof(ksock_hello_msg_t,
67                                kshm_ips[LNET_MAX_INTERFACES]));
68         if (conn->uc_rx_hello == NULL) {
69                 LIBCFS_FREE (pr, sizeof(*pr));
70                 LIBCFS_FREE (conn, sizeof(*conn));
71                 return NULL;
72         }
73
74         return conn;
75 }
76
77 void
78 usocklnd_conn_free(usock_conn_t *conn)
79 {
80         usock_pollrequest_t *pr = conn->uc_preq;
81
82         if (pr != NULL)
83                 LIBCFS_FREE (pr, sizeof(*pr));
84
85         if (conn->uc_rx_hello != NULL)
86                 LIBCFS_FREE (conn->uc_rx_hello,
87                              offsetof(ksock_hello_msg_t,
88                                       kshm_ips[LNET_MAX_INTERFACES]));
89         
90         LIBCFS_FREE (conn, sizeof(*conn));
91 }
92
93 void
94 usocklnd_tear_peer_conn(usock_conn_t *conn)
95 {
96         usock_peer_t     *peer = conn->uc_peer;
97         int               idx = usocklnd_type2idx(conn->uc_type);
98         lnet_ni_t        *ni;
99         lnet_process_id_t id;
100         int               decref_flag  = 0;
101         int               killall_flag = 0;
102         
103         if (peer == NULL) /* nothing to tear */
104                 return;
105         
106         pthread_mutex_lock(&peer->up_lock);
107         pthread_mutex_lock(&conn->uc_lock);        
108
109         ni = peer->up_ni;
110         id = peer->up_peerid;
111
112         if (peer->up_conns[idx] == conn) {
113                 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
114                         /* change state not to finalize twice */
115                         conn->uc_rx_state = UC_RX_KSM_HEADER;
116                         lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);                        
117                 }
118                 
119                 usocklnd_destroy_txlist(peer->up_ni,
120                                         &conn->uc_tx_list);
121
122                 peer->up_conns[idx] = NULL;
123                 conn->uc_peer = NULL;
124                 decref_flag = 1;
125
126                 if(conn->uc_errored && !peer->up_errored)
127                         peer->up_errored = killall_flag = 1;
128         }
129         
130         pthread_mutex_unlock(&conn->uc_lock);
131
132         if (killall_flag)
133                 usocklnd_del_conns_locked(peer);
134
135         pthread_mutex_unlock(&peer->up_lock);
136         
137         if (!decref_flag)
138                 return;
139
140         usocklnd_conn_decref(conn);
141         usocklnd_peer_decref(peer);
142
143         usocklnd_check_peer_stale(ni, id);
144 }
145
146 /* Remove peer from hash list if all up_conns[i] is NULL &&
147  * hash table is the only consumer of the peer */
148 void
149 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
150 {
151         usock_peer_t *peer;
152         
153         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
154         peer = usocklnd_find_peer_locked(ni, id);
155
156         if (peer == NULL) {
157                 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
158                 return;
159         }
160
161         if (cfs_atomic_read(&peer->up_refcount) == 2) {
162                 int i;
163                 for (i = 0; i < N_CONN_TYPES; i++)
164                         LASSERT (peer->up_conns[i] == NULL);
165
166                 list_del(&peer->up_list);                        
167                 
168                 if (peer->up_errored &&
169                     (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
170                         lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
171                                      cfs_time_seconds(peer->up_last_alive));
172                 
173                 usocklnd_peer_decref(peer);
174         }
175
176         usocklnd_peer_decref(peer);
177         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
178 }
179
180 /* Returns 0 on success, <0 else */
181 int
182 usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
183 {
184         int           rc;
185         __u32         peer_ip;
186         __u16         peer_port;
187         usock_conn_t *conn;
188
189         rc = libcfs_getpeername(fd, &peer_ip, &peer_port);
190         if (rc)
191                 return rc;
192
193         rc = usocklnd_set_sock_options(fd);
194         if (rc)
195                 return rc;
196
197         conn = usocklnd_conn_allocate();
198         if (conn == NULL)
199                 return -ENOMEM;
200
201         usocklnd_rx_hellomagic_state_transition(conn);
202         
203         conn->uc_fd = fd;
204         conn->uc_peer_ip = peer_ip;
205         conn->uc_peer_port = peer_port;
206         conn->uc_state = UC_RECEIVING_HELLO;
207         conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
208         conn->uc_ni = ni;
209         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
210         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
211         pthread_mutex_init(&conn->uc_lock, NULL);
212         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
213
214         *connp = conn;
215         return 0;
216 }
217
218 /* Returns 0 on success, <0 else */
219 int
220 usocklnd_create_active_conn(usock_peer_t *peer, int type,
221                             usock_conn_t **connp)
222 {
223         int           rc;
224         int           fd;
225         usock_conn_t *conn;
226         __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
227         __u16         dst_port = lnet_acceptor_port();
228         
229         conn = usocklnd_conn_allocate();
230         if (conn == NULL)
231                 return -ENOMEM;
232
233         conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
234                                                         peer->up_peerid.nid);
235         if (conn->uc_tx_hello == NULL) {
236                 usocklnd_conn_free(conn);
237                 return -ENOMEM;
238         }                
239         
240         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
241                 rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
242         else
243                 rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);
244         
245         if (rc) {
246                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
247                 usocklnd_conn_free(conn);
248                 return rc;
249         }
250         
251         conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
252         conn->uc_tx_flag = 1;
253         
254         conn->uc_fd = fd;
255         conn->uc_peer_ip = dst_ip;
256         conn->uc_peer_port = dst_port;
257         conn->uc_type = type;
258         conn->uc_activeflag = 1;
259         conn->uc_state = UC_CONNECTING;
260         conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
261         conn->uc_ni = NULL;
262         conn->uc_peerid = peer->up_peerid;
263         conn->uc_peer = peer;
264         usocklnd_peer_addref(peer);
265         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
266         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
267         pthread_mutex_init(&conn->uc_lock, NULL);
268         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
269
270         *connp = conn;
271         return 0;
272 }
273
274 /* Returns 0 on success, <0 else */
275 int
276 usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
277 {
278         __u16 port;
279         int   fd;
280         int   rc;
281
282         for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT; 
283              port >= LNET_ACCEPTOR_MIN_RESERVED_PORT; 
284              port--) {
285                 /* Iterate through reserved ports. */
286
287                 rc = libcfs_sock_create(&fd);
288                 if (rc)
289                         return rc;                        
290                                 
291                 rc = libcfs_sock_bind_to_port(fd, port);
292                 if (rc) {
293                         close(fd);
294                         continue;
295                 }
296
297                 rc = usocklnd_set_sock_options(fd);
298                 if (rc) {
299                         close(fd);
300                         return rc;
301                 }
302
303                 rc = libcfs_sock_connect(fd, dst_ip, dst_port);
304                 if (rc == 0) {
305                         *fdp = fd;
306                         return 0;
307                 }
308                 
309                 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
310                         close(fd);
311                         return rc;
312                 }
313
314                 close(fd);
315         }
316
317         CERROR("Can't bind to any reserved port\n");
318         return rc;
319 }
320
321 /* Returns 0 on success, <0 else */
322 int
323 usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
324 {
325         int fd;
326         int rc;
327
328         rc = libcfs_sock_create(&fd);
329         if (rc)
330                 return rc;
331         
332         rc = usocklnd_set_sock_options(fd);
333         if (rc) {
334                 close(fd);
335                 return rc;
336         }
337
338         rc = libcfs_sock_connect(fd, dst_ip, dst_port);
339         if (rc) {
340                 close(fd);
341                 return rc;
342         }
343
344         *fdp = fd;
345         return 0;
346 }
347
348 int
349 usocklnd_set_sock_options(int fd)
350 {
351         int rc;
352
353         rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
354         if (rc)
355                 return rc;
356
357         if (usock_tuns.ut_sockbufsiz) {
358                 rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
359                 if (rc)
360                         return rc;        
361         }
362         
363         return libcfs_fcntl_nonblock(fd);
364 }
365
366 void
367 usocklnd_init_msg(ksock_msg_t *msg, int type)
368 {
369         msg->ksm_type           = type;
370         msg->ksm_csum           = 0;
371         msg->ksm_zc_req_cookie  = 0;
372         msg->ksm_zc_ack_cookie  = 0;
373 }
374
375 usock_tx_t *
376 usocklnd_create_noop_tx(__u64 cookie)
377 {
378         usock_tx_t *tx;
379         
380         LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
381         if (tx == NULL)
382                 return NULL;
383
384         tx->tx_size = sizeof(usock_tx_t);
385         tx->tx_lnetmsg = NULL;
386
387         usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
388         tx->tx_msg.ksm_zc_ack_cookie = cookie;
389         
390         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
391         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
392                 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
393         tx->tx_iov = tx->tx_iova;
394         tx->tx_niov = 1;
395         
396         return tx;
397 }
398         
399 usock_tx_t *
400 usocklnd_create_tx(lnet_msg_t *lntmsg)
401 {
402         usock_tx_t   *tx;
403         unsigned int  payload_niov = lntmsg->msg_niov; 
404         struct iovec *payload_iov = lntmsg->msg_iov; 
405         unsigned int  payload_offset = lntmsg->msg_offset;
406         unsigned int  payload_nob = lntmsg->msg_len;
407         int           size = offsetof(usock_tx_t,
408                                       tx_iova[1 + payload_niov]);
409
410         LIBCFS_ALLOC (tx, size);
411         if (tx == NULL)
412                 return NULL;
413
414         tx->tx_size = size;
415         tx->tx_lnetmsg = lntmsg;
416
417         tx->tx_resid = tx->tx_nob =
418                 offsetof(ksock_msg_t,  ksm_u.lnetmsg.ksnm_payload) +
419                 payload_nob;
420         
421         usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
422         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
423         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
424         tx->tx_iova[0].iov_len = offsetof(ksock_msg_t,
425                                           ksm_u.lnetmsg.ksnm_payload);
426         tx->tx_iov = tx->tx_iova;
427
428         tx->tx_niov = 1 + 
429                 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
430                                  payload_niov, payload_iov,
431                                  payload_offset, payload_nob);
432
433         return tx;
434 }
435
436 void
437 usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
438                         lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
439 {
440         usock_net_t *net = (usock_net_t *)ni->ni_data;
441
442         hello->kshm_magic       = LNET_PROTO_MAGIC;
443         hello->kshm_version     = KSOCK_PROTO_V2;
444         hello->kshm_nips        = 0;
445         hello->kshm_ctype       = type;
446         
447         hello->kshm_dst_incarnation = 0; /* not used */
448         hello->kshm_src_incarnation = net->un_incarnation;
449
450         hello->kshm_src_pid = the_lnet.ln_pid;
451         hello->kshm_src_nid = ni->ni_nid;
452         hello->kshm_dst_nid = peer_nid;
453         hello->kshm_dst_pid = 0; /* not used */
454 }
455
456 usock_tx_t *
457 usocklnd_create_hello_tx(lnet_ni_t *ni,
458                          int type, lnet_nid_t peer_nid)
459 {
460         usock_tx_t        *tx;
461         int                size;
462         ksock_hello_msg_t *hello;
463
464         size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
465         LIBCFS_ALLOC (tx, size);
466         if (tx == NULL)
467                 return NULL;
468
469         tx->tx_size = size;
470         tx->tx_lnetmsg = NULL;
471
472         hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
473         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
474         
475         tx->tx_iova[0].iov_base = (void *)hello;
476         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
477                 offsetof(ksock_hello_msg_t, kshm_ips);
478         tx->tx_iov = tx->tx_iova;
479         tx->tx_niov = 1;
480
481         return tx;
482 }
483
484 usock_tx_t *
485 usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
486                             int type, lnet_nid_t peer_nid)
487 {
488         usock_tx_t              *tx;
489         int                      size;
490         lnet_acceptor_connreq_t *cr;
491         ksock_hello_msg_t       *hello;
492
493         size = sizeof(usock_tx_t) +
494                 sizeof(lnet_acceptor_connreq_t) +
495                 offsetof(ksock_hello_msg_t, kshm_ips);
496         LIBCFS_ALLOC (tx, size);
497         if (tx == NULL)
498                 return NULL;
499
500         tx->tx_size = size;
501         tx->tx_lnetmsg = NULL;
502
503         cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
504         memset(cr, 0, sizeof(*cr));
505         cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
506         cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
507         cr->acr_nid     = peer_nid;
508         
509         hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
510         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
511         
512         tx->tx_iova[0].iov_base = (void *)cr;
513         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
514                 sizeof(lnet_acceptor_connreq_t) +
515                 offsetof(ksock_hello_msg_t, kshm_ips);
516         tx->tx_iov = tx->tx_iova;
517         tx->tx_niov = 1;
518
519         return tx;
520 }
521
522 void
523 usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
524 {
525         lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;
526         int          rc = (tx->tx_resid == 0) ? 0 : -EIO;
527
528         LASSERT (ni != NULL || lnetmsg == NULL);
529
530         LIBCFS_FREE (tx, tx->tx_size);
531         
532         if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
533                 lnet_finalize(ni, lnetmsg, rc);
534 }
535
536 void
537 usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
538 {
539         usock_tx_t *tx;
540
541         while (!list_empty(txlist)) {
542                 tx = list_entry(txlist->next, usock_tx_t, tx_list);
543                 list_del(&tx->tx_list);
544                 
545                 usocklnd_destroy_tx(ni, tx);
546         }
547 }
548
549 void
550 usocklnd_destroy_zcack_list(struct list_head *zcack_list)
551 {
552         usock_zc_ack_t *zcack;
553
554         while (!list_empty(zcack_list)) {
555                 zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
556                 list_del(&zcack->zc_list);
557                 
558                 LIBCFS_FREE (zcack, sizeof(*zcack));
559         }
560 }
561
562 void
563 usocklnd_destroy_peer(usock_peer_t *peer)
564 {
565         usock_net_t *net = peer->up_ni->ni_data;
566         int          i;
567
568         for (i = 0; i < N_CONN_TYPES; i++)
569                 LASSERT (peer->up_conns[i] == NULL);
570
571         LIBCFS_FREE (peer, sizeof (*peer));
572
573         pthread_mutex_lock(&net->un_lock);
574         if(--net->un_peercount == 0)                
575                 pthread_cond_signal(&net->un_cond);
576         pthread_mutex_unlock(&net->un_lock);
577 }
578
579 void
580 usocklnd_destroy_conn(usock_conn_t *conn)
581 {
582         LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
583
584         if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
585                 LASSERT (conn->uc_peer != NULL);
586                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
587         }
588
589         if (!list_empty(&conn->uc_tx_list)) {
590                 LASSERT (conn->uc_peer != NULL);                
591                 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
592         }
593
594         usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
595         
596         if (conn->uc_peer != NULL)
597                 usocklnd_peer_decref(conn->uc_peer);
598
599         if (conn->uc_ni != NULL)
600                 lnet_ni_decref(conn->uc_ni);
601
602         if (conn->uc_tx_hello)
603                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
604
605         usocklnd_conn_free(conn);
606 }
607
608 int
609 usocklnd_get_conn_type(lnet_msg_t *lntmsg)
610 {
611         int nob;
612
613         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
614                 return SOCKLND_CONN_ANY;
615
616         nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
617                 lntmsg->msg_len;
618         
619         if (nob >= usock_tuns.ut_min_bulk)
620                 return SOCKLND_CONN_BULK_OUT;
621         else
622                 return SOCKLND_CONN_CONTROL;
623 }
624
625 int usocklnd_type2idx(int type)
626 {
627         switch (type) {
628         case SOCKLND_CONN_ANY:
629         case SOCKLND_CONN_CONTROL:
630                 return 0;
631         case SOCKLND_CONN_BULK_IN:
632                 return 1;
633         case SOCKLND_CONN_BULK_OUT:
634                 return 2;
635         default:
636                 LBUG();
637         }
638 }
639
640 usock_peer_t *
641 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
642 {
643         struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
644         struct list_head *tmp;
645         usock_peer_t     *peer;
646
647         list_for_each (tmp, peer_list) {
648
649                 peer = list_entry (tmp, usock_peer_t, up_list);
650
651                 if (peer->up_ni != ni)
652                         continue;
653
654                 if (peer->up_peerid.nid != id.nid ||
655                     peer->up_peerid.pid != id.pid)
656                         continue;
657
658                 usocklnd_peer_addref(peer);
659                 return peer;
660         }
661         return (NULL);
662 }
663
664 int
665 usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
666                      usock_peer_t **peerp)
667 {
668         usock_net_t  *net = ni->ni_data;
669         usock_peer_t *peer;
670         int           i;
671
672         LIBCFS_ALLOC (peer, sizeof (*peer));
673         if (peer == NULL)
674                 return -ENOMEM;
675
676         for (i = 0; i < N_CONN_TYPES; i++)
677                 peer->up_conns[i] = NULL;
678
679         peer->up_peerid       = id;
680         peer->up_ni           = ni;
681         peer->up_incrn_is_set = 0;
682         peer->up_errored      = 0;
683         peer->up_last_alive   = 0;
684         cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
685         pthread_mutex_init(&peer->up_lock, NULL);        
686
687         pthread_mutex_lock(&net->un_lock);
688         net->un_peercount++;        
689         pthread_mutex_unlock(&net->un_lock);
690
691         *peerp = peer;
692         return 0;
693 }
694
695 /* Safely create new peer if needed. Save result in *peerp.
696  * Returns 0 on success, <0 else */
697 int
698 usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
699                              usock_peer_t **peerp)
700 {
701         int           rc;
702         usock_peer_t *peer;
703         usock_peer_t *peer2;
704         usock_net_t  *net = ni->ni_data;
705
706         pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
707         peer = usocklnd_find_peer_locked(ni, id);
708         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
709
710         if (peer != NULL)
711                 goto find_or_create_peer_done;
712
713         rc = usocklnd_create_peer(ni, id, &peer);
714         if (rc)
715                 return rc;
716         
717         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
718         peer2 = usocklnd_find_peer_locked(ni, id);
719         if (peer2 == NULL) {
720                 if (net->un_shutdown) {
721                         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
722                         usocklnd_peer_decref(peer); /* should destroy peer */
723                         CERROR("Can't create peer: network shutdown\n");
724                         return -ESHUTDOWN;
725                 }
726                 
727                 /* peer table will take 1 of my refs on peer */
728                 usocklnd_peer_addref(peer);
729                 list_add_tail (&peer->up_list,
730                                usocklnd_nid2peerlist(id.nid));
731         } else {
732                 usocklnd_peer_decref(peer); /* should destroy peer */
733                 peer = peer2;
734         }
735         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
736         
737   find_or_create_peer_done:        
738         *peerp = peer;
739         return 0;
740 }
741
742 /* NB: both peer and conn locks are held */
743 static int
744 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
745 {        
746         if (conn->uc_state == UC_READY &&
747             list_empty(&conn->uc_tx_list) &&
748             list_empty(&conn->uc_zcack_list) &&
749             !conn->uc_sending) {
750                 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
751                                                   POLLOUT);
752                 if (rc != 0)
753                         return rc;
754         }                
755
756         list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
757         return 0;
758 }
759
760 /* NB: both peer and conn locks are held
761  * NB: if sending isn't in progress.  the caller *MUST* send tx
762  * immediately after we'll return */
763 static void
764 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
765                     int *send_immediately)
766 {        
767         if (conn->uc_state == UC_READY &&
768             list_empty(&conn->uc_tx_list) &&
769             list_empty(&conn->uc_zcack_list) &&
770             !conn->uc_sending) {
771                 conn->uc_sending = 1;
772                 *send_immediately = 1;
773                 return;
774         }                
775
776         *send_immediately = 0;
777         list_add_tail(&tx->tx_list, &conn->uc_tx_list);
778 }
779
780 /* Safely create new conn if needed. Save result in *connp.
781  * Returns 0 on success, <0 else */
782 int
783 usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
784                              usock_conn_t **connp,
785                              usock_tx_t *tx, usock_zc_ack_t *zc_ack,
786                              int *send_immediately)
787 {
788         usock_conn_t *conn;
789         int           idx;
790         int           rc;
791         lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
792         
793         if (userflag)
794                 type = SOCKLND_CONN_ANY;
795
796         idx = usocklnd_type2idx(type);
797         
798         pthread_mutex_lock(&peer->up_lock);
799         if (peer->up_conns[idx] != NULL) {
800                 conn = peer->up_conns[idx];
801                 LASSERT(conn->uc_type == type);
802         } else {
803                 if (userflag) {
804                         CERROR("Refusing to create a connection to "
805                                "userspace process %s\n",
806                                libcfs_id2str(peer->up_peerid));
807                         rc = -EHOSTUNREACH;
808                         goto find_or_create_conn_failed;
809                 }
810                 
811                 rc = usocklnd_create_active_conn(peer, type, &conn);
812                 if (rc) {
813                         peer->up_errored = 1;
814                         usocklnd_del_conns_locked(peer);
815                         goto find_or_create_conn_failed;
816                 }
817
818                 /* peer takes 1 of conn refcount */
819                 usocklnd_link_conn_to_peer(conn, peer, idx);
820                 
821                 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
822                 if (rc) {
823                         peer->up_conns[idx] = NULL;
824                         usocklnd_conn_decref(conn); /* should destroy conn */
825                         goto find_or_create_conn_failed;
826                 }
827                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
828         }
829         
830         pthread_mutex_lock(&conn->uc_lock);
831         LASSERT(conn->uc_peer == peer);
832
833         LASSERT(tx == NULL || zc_ack == NULL);
834         if (tx != NULL) {
835                 usocklnd_enqueue_tx(conn, tx, send_immediately);
836         } else {
837                 rc = usocklnd_enqueue_zcack(conn, zc_ack);        
838                 if (rc != 0) {
839                         usocklnd_conn_kill_locked(conn);
840                         pthread_mutex_unlock(&conn->uc_lock);
841                         goto find_or_create_conn_failed;
842                 }
843         }
844         pthread_mutex_unlock(&conn->uc_lock);         
845
846         usocklnd_conn_addref(conn);
847         pthread_mutex_unlock(&peer->up_lock);
848
849         *connp = conn;
850         return 0;
851
852   find_or_create_conn_failed:
853         pthread_mutex_unlock(&peer->up_lock);
854         return rc;
855 }
856
857 void
858 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
859 {
860         peer->up_conns[idx] = conn;        
861         peer->up_errored    = 0; /* this new fresh conn will try
862                                   * revitalize even stale errored peer */
863 }
864
865 int
866 usocklnd_invert_type(int type)
867 {
868         switch (type)
869         {
870         case SOCKLND_CONN_ANY:
871         case SOCKLND_CONN_CONTROL:
872                 return (type);
873         case SOCKLND_CONN_BULK_IN:
874                 return SOCKLND_CONN_BULK_OUT;
875         case SOCKLND_CONN_BULK_OUT:
876                 return SOCKLND_CONN_BULK_IN;
877         default:
878                 return SOCKLND_CONN_NONE;
879         }
880 }
881
882 void
883 usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
884 {
885         pthread_mutex_lock(&conn->uc_lock);
886         if (conn->uc_state != UC_DEAD)
887                 conn->uc_state = new_state;
888         pthread_mutex_unlock(&conn->uc_lock);
889 }
890
891 /* NB: peer is locked by caller */
892 void
893 usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
894                              usock_conn_t *skip_conn)
895 {
896         int i;
897         
898         if (!peer->up_incrn_is_set) {
899                 peer->up_incarnation = incrn;
900                 peer->up_incrn_is_set = 1;
901                 return;
902         }
903
904         if (peer->up_incarnation == incrn)
905                 return;
906
907         peer->up_incarnation = incrn;
908         
909         for (i = 0; i < N_CONN_TYPES; i++) {
910                 usock_conn_t *conn = peer->up_conns[i];
911                 
912                 if (conn == NULL || conn == skip_conn)
913                         continue;
914
915                 pthread_mutex_lock(&conn->uc_lock);        
916                 LASSERT (conn->uc_peer == peer);
917                 conn->uc_peer = NULL;
918                 peer->up_conns[i] = NULL;
919                 if (conn->uc_state != UC_DEAD)
920                         usocklnd_conn_kill_locked(conn);                
921                 pthread_mutex_unlock(&conn->uc_lock);
922
923                 usocklnd_conn_decref(conn);
924                 usocklnd_peer_decref(peer);
925         }
926 }
927
928 /* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
929  * MAGIC part of hello and set uc_rx_state
930  */
931 void
932 usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
933 {
934         LASSERT(conn->uc_rx_hello != NULL);
935
936         conn->uc_rx_niov = 1;
937         conn->uc_rx_iov = conn->uc_rx_iova;
938         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
939         conn->uc_rx_iov[0].iov_len =
940                 conn->uc_rx_nob_wanted =
941                 conn->uc_rx_nob_left =
942                 sizeof(conn->uc_rx_hello->kshm_magic);
943
944         conn->uc_rx_state = UC_RX_HELLO_MAGIC;
945
946         conn->uc_rx_flag = 1; /* waiting for incoming hello */
947         conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
948 }
949
950 /* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
951  * VERSION part of hello and set uc_rx_state
952  */
953 void
954 usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
955 {
956         LASSERT(conn->uc_rx_hello != NULL);
957
958         conn->uc_rx_niov = 1;
959         conn->uc_rx_iov = conn->uc_rx_iova;
960         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
961         conn->uc_rx_iov[0].iov_len =
962                 conn->uc_rx_nob_wanted =
963                 conn->uc_rx_nob_left =
964                 sizeof(conn->uc_rx_hello->kshm_version);
965         
966         conn->uc_rx_state = UC_RX_HELLO_VERSION;
967 }
968
969 /* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
970  * the rest  of hello and set uc_rx_state
971  */
972 void
973 usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
974 {
975         LASSERT(conn->uc_rx_hello != NULL);
976
977         conn->uc_rx_niov = 1;
978         conn->uc_rx_iov = conn->uc_rx_iova;
979         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
980         conn->uc_rx_iov[0].iov_len =
981                 conn->uc_rx_nob_wanted =
982                 conn->uc_rx_nob_left =
983                 offsetof(ksock_hello_msg_t, kshm_ips) -
984                 offsetof(ksock_hello_msg_t, kshm_src_nid);
985         
986         conn->uc_rx_state = UC_RX_HELLO_BODY;
987 }
988
989 /* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
990  * array of IPs and set uc_rx_state
991  */
992 void
993 usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
994 {
995         LASSERT(conn->uc_rx_hello != NULL);
996
997         conn->uc_rx_niov = 1;
998         conn->uc_rx_iov = conn->uc_rx_iova;
999         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
1000         conn->uc_rx_iov[0].iov_len =
1001                 conn->uc_rx_nob_wanted =
1002                 conn->uc_rx_nob_left =
1003                 conn->uc_rx_hello->kshm_nips *
1004                 sizeof(conn->uc_rx_hello->kshm_ips[0]);
1005         
1006         conn->uc_rx_state = UC_RX_HELLO_IPS;
1007 }
1008
1009 /* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
1010  * LNET header and set uc_rx_state
1011  */
1012 void
1013 usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
1014 {
1015         conn->uc_rx_niov = 1;
1016         conn->uc_rx_iov = conn->uc_rx_iova;
1017         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;                
1018         conn->uc_rx_iov[0].iov_len =
1019                 conn->uc_rx_nob_wanted =
1020                 conn->uc_rx_nob_left =
1021                 sizeof(ksock_lnet_msg_t);
1022         
1023         conn->uc_rx_state = UC_RX_LNET_HEADER;
1024         conn->uc_rx_flag = 1;
1025 }
1026
1027 /* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
1028  * KSM header and set uc_rx_state
1029  */
1030 void
1031 usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
1032 {
1033         conn->uc_rx_niov = 1;
1034         conn->uc_rx_iov = conn->uc_rx_iova;
1035         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;                
1036         conn->uc_rx_iov[0].iov_len =
1037                 conn->uc_rx_nob_wanted =
1038                 conn->uc_rx_nob_left =                        
1039                 offsetof(ksock_msg_t, ksm_u);
1040         
1041         conn->uc_rx_state = UC_RX_KSM_HEADER;
1042         conn->uc_rx_flag = 0;
1043 }
1044
1045 /* RX state transition to UC_RX_SKIPPING: update RX part for
1046  * skipping and set uc_rx_state
1047  */
1048 void
1049 usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
1050 {
1051         static char    skip_buffer[4096];
1052
1053         int            nob;
1054         unsigned int   niov = 0;
1055         int            skipped = 0;
1056         int            nob_to_skip = conn->uc_rx_nob_left;
1057         
1058         LASSERT(nob_to_skip != 0);
1059
1060         conn->uc_rx_iov = conn->uc_rx_iova;
1061
1062         /* Set up to skip as much as possible now.  If there's more left
1063          * (ran out of iov entries) we'll get called again */
1064
1065         do {
1066                 nob = MIN (nob_to_skip, sizeof(skip_buffer));
1067
1068                 conn->uc_rx_iov[niov].iov_base = skip_buffer;
1069                 conn->uc_rx_iov[niov].iov_len  = nob;
1070                 niov++;
1071                 skipped += nob;
1072                 nob_to_skip -=nob;
1073
1074         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1075                  niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));
1076
1077         conn->uc_rx_niov = niov;
1078         conn->uc_rx_nob_wanted = skipped;
1079
1080         conn->uc_rx_state = UC_RX_SKIPPING;
1081 }