Whamcloud - gitweb
b543c7a43fa0ad7f57734b17c74ce5f28d939b4d
[fs/lustre-release.git] / lnet / ulnds / socklnd / conn.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/ulnds/socklnd/conn.c
35  *
36  * Author: Maxim Patlasov <maxim@clusterfs.com>
37  */
38
39 #include "usocklnd.h"
40
41 /* Return 1 if the conn is timed out, 0 else */
42 int
43 usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
44 {
45         if (conn->uc_tx_flag && /* sending is in progress */
46             cfs_time_aftereq(current_time, conn->uc_tx_deadline))
47                 return 1;
48
49         if (conn->uc_rx_flag && /* receiving is in progress */
50             cfs_time_aftereq(current_time, conn->uc_rx_deadline))
51                 return 1;
52
53         return 0;
54 }
55
56 void
57 usocklnd_conn_kill(usock_conn_t *conn)
58 {
59         pthread_mutex_lock(&conn->uc_lock);
60         if (conn->uc_state != UC_DEAD)
61                 usocklnd_conn_kill_locked(conn);
62         pthread_mutex_unlock(&conn->uc_lock);
63 }
64
65 /* Mark the conn as DEAD and schedule its deletion */
66 void
67 usocklnd_conn_kill_locked(usock_conn_t *conn)
68 {
69         conn->uc_rx_flag = conn->uc_tx_flag = 0;
70         conn->uc_state = UC_DEAD;
71         usocklnd_add_killrequest(conn);
72 }
73
74 usock_conn_t *
75 usocklnd_conn_allocate()
76 {
77         usock_conn_t        *conn;
78         usock_pollrequest_t *pr;
79
80         LIBCFS_ALLOC (pr, sizeof(*pr));
81         if (pr == NULL)
82                 return NULL;
83
84         LIBCFS_ALLOC (conn, sizeof(*conn));
85         if (conn == NULL) {
86                 LIBCFS_FREE (pr, sizeof(*pr));
87                 return NULL;
88         }
89         memset(conn, 0, sizeof(*conn));
90         conn->uc_preq = pr;
91
92         LIBCFS_ALLOC (conn->uc_rx_hello,
93                       offsetof(ksock_hello_msg_t,
94                                kshm_ips[LNET_MAX_INTERFACES]));
95         if (conn->uc_rx_hello == NULL) {
96                 LIBCFS_FREE (pr, sizeof(*pr));
97                 LIBCFS_FREE (conn, sizeof(*conn));
98                 return NULL;
99         }
100
101         return conn;
102 }
103
104 void
105 usocklnd_conn_free(usock_conn_t *conn)
106 {
107         usock_pollrequest_t *pr = conn->uc_preq;
108
109         if (pr != NULL)
110                 LIBCFS_FREE (pr, sizeof(*pr));
111
112         if (conn->uc_rx_hello != NULL)
113                 LIBCFS_FREE (conn->uc_rx_hello,
114                              offsetof(ksock_hello_msg_t,
115                                       kshm_ips[LNET_MAX_INTERFACES]));
116
117         LIBCFS_FREE (conn, sizeof(*conn));
118 }
119
120 void
121 usocklnd_tear_peer_conn(usock_conn_t *conn)
122 {
123         usock_peer_t     *peer = conn->uc_peer;
124         int               idx = usocklnd_type2idx(conn->uc_type);
125         lnet_ni_t        *ni;
126         lnet_process_id_t id;
127         int               decref_flag  = 0;
128         int               killall_flag = 0;
129         void             *rx_lnetmsg   = NULL; 
130         CFS_LIST_HEAD    (zombie_txs);
131
132         if (peer == NULL) /* nothing to tear */
133                 return;
134
135         pthread_mutex_lock(&peer->up_lock);
136         pthread_mutex_lock(&conn->uc_lock);
137
138         ni = peer->up_ni;
139         id = peer->up_peerid;
140
141         if (peer->up_conns[idx] == conn) {
142                 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
143                         /* change state not to finalize twice */
144                         conn->uc_rx_state = UC_RX_KSM_HEADER;
145                         /* stash lnetmsg while holding locks */
146                         rx_lnetmsg = conn->uc_rx_lnetmsg;
147                 }
148
149                 /* we cannot finilize txs right now (bug #18844) */
150                 cfs_list_splice_init(&conn->uc_tx_list, &zombie_txs);
151
152                 peer->up_conns[idx] = NULL;
153                 conn->uc_peer = NULL;
154                 decref_flag = 1;
155
156                 if(conn->uc_errored && !peer->up_errored)
157                         peer->up_errored = killall_flag = 1;
158
159                 /* prevent queueing new txs to this conn */
160                 conn->uc_errored = 1;
161         }
162
163         pthread_mutex_unlock(&conn->uc_lock);
164
165         if (killall_flag)
166                 usocklnd_del_conns_locked(peer);
167
168         pthread_mutex_unlock(&peer->up_lock);
169
170         if (!decref_flag)
171                 return;
172
173         if (rx_lnetmsg != NULL)
174                 lnet_finalize(ni, rx_lnetmsg, -EIO);
175         
176         usocklnd_destroy_txlist(ni, &zombie_txs);
177
178         usocklnd_conn_decref(conn);
179         usocklnd_peer_decref(peer);
180
181         usocklnd_check_peer_stale(ni, id);
182 }
183
184 /* Remove peer from hash list if all up_conns[i] is NULL &&
185  * hash table is the only consumer of the peer */
186 void
187 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
188 {
189         usock_peer_t *peer;
190
191         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
192         peer = usocklnd_find_peer_locked(ni, id);
193
194         if (peer == NULL) {
195                 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
196                 return;
197         }
198
199         if (mt_atomic_read(&peer->up_refcount) == 2) {
200                 int i;
201                 for (i = 0; i < N_CONN_TYPES; i++)
202                         LASSERT (peer->up_conns[i] == NULL);
203
204                 cfs_list_del(&peer->up_list);
205
206                 if (peer->up_errored &&
207                     (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
208                         lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
209                                      cfs_time_seconds(peer->up_last_alive));
210
211                 usocklnd_peer_decref(peer);
212         }
213
214         usocklnd_peer_decref(peer);
215         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
216 }
217
218 /* Returns 0 on success, <0 else */
219 int
220 usocklnd_create_passive_conn(lnet_ni_t *ni,
221                              cfs_socket_t *sock, usock_conn_t **connp)
222 {
223         int           rc;
224         __u32         peer_ip;
225         int           peer_port;
226         usock_conn_t *conn;
227
228         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
229         if (rc)
230                 return rc;
231
232         LASSERT (peer_port >= 0); /* uc_peer_port is u16 */
233
234         rc = usocklnd_set_sock_options(sock);
235         if (rc)
236                 return rc;
237
238         conn = usocklnd_conn_allocate();
239         if (conn == NULL)
240                 return -ENOMEM;
241
242         usocklnd_rx_hellomagic_state_transition(conn);
243
244         conn->uc_sock = sock;
245         conn->uc_peer_ip = peer_ip;
246         conn->uc_peer_port = peer_port;
247         conn->uc_state = UC_RECEIVING_HELLO;
248         conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
249         conn->uc_ni = ni;
250         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
251         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
252         pthread_mutex_init(&conn->uc_lock, NULL);
253         mt_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
254
255         *connp = conn;
256         return 0;
257 }
258
259 /* Returns 0 on success, <0 else */
260 int
261 usocklnd_create_active_conn(usock_peer_t *peer, int type,
262                             usock_conn_t **connp)
263 {
264         int           rc;
265         cfs_socket_t *sock;
266         usock_conn_t *conn;
267         __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
268         __u16         dst_port = lnet_acceptor_port();
269
270         conn = usocklnd_conn_allocate();
271         if (conn == NULL)
272                 return -ENOMEM;
273
274         conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
275                                                         peer->up_peerid.nid);
276         if (conn->uc_tx_hello == NULL) {
277                 usocklnd_conn_free(conn);
278                 return -ENOMEM;
279         }
280
281         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
282                 rc = usocklnd_connect_cli_mode(&sock, dst_ip, dst_port);
283         else
284                 rc = usocklnd_connect_srv_mode(&sock, dst_ip, dst_port);
285
286         if (rc) {
287                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
288                 usocklnd_conn_free(conn);
289                 return rc;
290         }
291
292         conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
293         conn->uc_tx_flag     = 1;
294
295         conn->uc_sock       = sock;
296         conn->uc_peer_ip    = dst_ip;
297         conn->uc_peer_port  = dst_port;
298         conn->uc_type       = type;
299         conn->uc_activeflag = 1;
300         conn->uc_state      = UC_CONNECTING;
301         conn->uc_pt_idx     = usocklnd_ip2pt_idx(dst_ip);
302         conn->uc_ni         = NULL;
303         conn->uc_peerid     = peer->up_peerid;
304         conn->uc_peer       = peer;
305
306         usocklnd_peer_addref(peer);
307         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
308         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
309         pthread_mutex_init(&conn->uc_lock, NULL);
310         mt_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
311
312         *connp = conn;
313         return 0;
314 }
315
316 /* Returns 0 on success, <0 else */
317 int
318 usocklnd_connect_srv_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
319 {
320         __u16         port;
321         cfs_socket_t *sock;
322         int           rc;
323         int           fatal;
324
325         for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
326              port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
327              port--) {
328                 /* Iterate through reserved ports. */
329                 rc = libcfs_sock_create(&sock, &fatal, 0, port);
330                 if (rc) {
331                         if (fatal)
332                                 return rc;
333                         continue;
334                 }
335
336                 rc = usocklnd_set_sock_options(sock);
337                 if (rc) {
338                         libcfs_sock_release(sock);
339                         return rc;
340                 }
341
342                 rc = libcfs_sock_connect(sock, dst_ip, dst_port);
343                 if (rc == 0) {
344                         *sockp = sock;
345                         return 0;
346                 }
347
348                 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
349                         libcfs_sock_release(sock);
350                         return rc;
351                 }
352
353                 libcfs_sock_release(sock);
354         }
355
356         CERROR("Can't bind to any reserved port\n");
357         return rc;
358 }
359
360 /* Returns 0 on success, <0 else */
361 int
362 usocklnd_connect_cli_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
363 {
364         cfs_socket_t *sock;
365         int           rc;
366         int           fatal;
367
368         rc = libcfs_sock_create(&sock, &fatal, 0, 0);
369         if (rc)
370                 return rc;
371
372         rc = usocklnd_set_sock_options(sock);
373         if (rc) {
374                 libcfs_sock_release(sock);
375                 return rc;
376         }
377
378         rc = libcfs_sock_connect(sock, dst_ip, dst_port);
379         if (rc) {
380                 libcfs_sock_release(sock);
381                 return rc;
382         }
383
384         *sockp = sock;
385         return 0;
386 }
387
388 int
389 usocklnd_set_sock_options(cfs_socket_t *sock)
390 {
391         int rc;
392
393         rc = libcfs_sock_set_nagle(sock, usock_tuns.ut_socknagle);
394         if (rc)
395                 return rc;
396
397         if (usock_tuns.ut_sockbufsiz) {
398                 rc = libcfs_sock_set_bufsiz(sock, usock_tuns.ut_sockbufsiz);
399                 if (rc)
400                         return rc;
401         }
402
403         return libcfs_fcntl_nonblock(sock);
404 }
405
406 usock_tx_t *
407 usocklnd_create_noop_tx(__u64 cookie)
408 {
409         usock_tx_t *tx;
410
411         LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
412         if (tx == NULL)
413                 return NULL;
414
415         tx->tx_size = sizeof(usock_tx_t);
416         tx->tx_lnetmsg = NULL;
417
418         socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
419         tx->tx_msg.ksm_zc_cookies[1] = cookie;
420
421         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
422         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
423                 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
424         tx->tx_iov = tx->tx_iova;
425         tx->tx_niov = 1;
426
427         return tx;
428 }
429
430 usock_tx_t *
431 usocklnd_create_tx(lnet_msg_t *lntmsg)
432 {
433         usock_tx_t   *tx;
434         unsigned int  payload_niov = lntmsg->msg_niov;
435         struct iovec *payload_iov = lntmsg->msg_iov;
436         unsigned int  payload_offset = lntmsg->msg_offset;
437         unsigned int  payload_nob = lntmsg->msg_len;
438         int           size = offsetof(usock_tx_t,
439                                       tx_iova[1 + payload_niov]);
440
441         LIBCFS_ALLOC (tx, size);
442         if (tx == NULL)
443                 return NULL;
444
445         tx->tx_size = size;
446         tx->tx_lnetmsg = lntmsg;
447
448         tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;
449
450         socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
451         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
452         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
453         tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
454         tx->tx_iov = tx->tx_iova;
455
456         tx->tx_niov = 1 +
457                 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
458                                  payload_niov, payload_iov,
459                                  payload_offset, payload_nob);
460
461         return tx;
462 }
463
464 void
465 usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
466                         lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
467 {
468         usock_net_t *net = (usock_net_t *)ni->ni_data;
469
470         hello->kshm_magic       = LNET_PROTO_MAGIC;
471         hello->kshm_version     = KSOCK_PROTO_V2;
472         hello->kshm_nips        = 0;
473         hello->kshm_ctype       = type;
474
475         hello->kshm_dst_incarnation = 0; /* not used */
476         hello->kshm_src_incarnation = net->un_incarnation;
477
478         hello->kshm_src_pid = the_lnet.ln_pid;
479         hello->kshm_src_nid = ni->ni_nid;
480         hello->kshm_dst_nid = peer_nid;
481         hello->kshm_dst_pid = 0; /* not used */
482 }
483
484 usock_tx_t *
485 usocklnd_create_hello_tx(lnet_ni_t *ni,
486                          int type, lnet_nid_t peer_nid)
487 {
488         usock_tx_t        *tx;
489         int                size;
490         ksock_hello_msg_t *hello;
491
492         size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
493         LIBCFS_ALLOC (tx, size);
494         if (tx == NULL)
495                 return NULL;
496
497         tx->tx_size = size;
498         tx->tx_lnetmsg = NULL;
499
500         hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
501         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
502
503         tx->tx_iova[0].iov_base = (void *)hello;
504         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
505                 offsetof(ksock_hello_msg_t, kshm_ips);
506         tx->tx_iov = tx->tx_iova;
507         tx->tx_niov = 1;
508
509         return tx;
510 }
511
512 usock_tx_t *
513 usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
514                             int type, lnet_nid_t peer_nid)
515 {
516         usock_tx_t              *tx;
517         int                      size;
518         lnet_acceptor_connreq_t *cr;
519         ksock_hello_msg_t       *hello;
520
521         size = sizeof(usock_tx_t) +
522                 sizeof(lnet_acceptor_connreq_t) +
523                 offsetof(ksock_hello_msg_t, kshm_ips);
524         LIBCFS_ALLOC (tx, size);
525         if (tx == NULL)
526                 return NULL;
527
528         tx->tx_size = size;
529         tx->tx_lnetmsg = NULL;
530
531         cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
532         memset(cr, 0, sizeof(*cr));
533         cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
534         cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
535         cr->acr_nid     = peer_nid;
536
537         hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
538         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
539
540         tx->tx_iova[0].iov_base = (void *)cr;
541         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
542                 sizeof(lnet_acceptor_connreq_t) +
543                 offsetof(ksock_hello_msg_t, kshm_ips);
544         tx->tx_iov = tx->tx_iova;
545         tx->tx_niov = 1;
546
547         return tx;
548 }
549
550 void
551 usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
552 {
553         lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;
554         int          rc = (tx->tx_resid == 0) ? 0 : -EIO;
555
556         LASSERT (ni != NULL || lnetmsg == NULL);
557
558         LIBCFS_FREE (tx, tx->tx_size);
559
560         if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
561                 lnet_finalize(ni, lnetmsg, rc);
562 }
563
564 void
565 usocklnd_destroy_txlist(lnet_ni_t *ni, cfs_list_t *txlist)
566 {
567         usock_tx_t *tx;
568
569         while (!cfs_list_empty(txlist)) {
570                 tx = cfs_list_entry(txlist->next, usock_tx_t, tx_list);
571                 cfs_list_del(&tx->tx_list);
572
573                 usocklnd_destroy_tx(ni, tx);
574         }
575 }
576
577 void
578 usocklnd_destroy_zcack_list(cfs_list_t *zcack_list)
579 {
580         usock_zc_ack_t *zcack;
581
582         while (!cfs_list_empty(zcack_list)) {
583                 zcack = cfs_list_entry(zcack_list->next, usock_zc_ack_t,
584                                        zc_list);
585                 cfs_list_del(&zcack->zc_list);
586
587                 LIBCFS_FREE (zcack, sizeof(*zcack));
588         }
589 }
590
591 void
592 usocklnd_destroy_peer(usock_peer_t *peer)
593 {
594         usock_net_t *net = peer->up_ni->ni_data;
595         int          i;
596
597         for (i = 0; i < N_CONN_TYPES; i++)
598                 LASSERT (peer->up_conns[i] == NULL);
599
600         LIBCFS_FREE (peer, sizeof (*peer));
601
602         pthread_mutex_lock(&net->un_lock);
603         if(--net->un_peercount == 0)
604                 pthread_cond_signal(&net->un_cond);
605         pthread_mutex_unlock(&net->un_lock);
606 }
607
608 void
609 usocklnd_destroy_conn(usock_conn_t *conn)
610 {
611         LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
612
613         if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
614                 LASSERT (conn->uc_peer != NULL);
615                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
616         }
617
618         if (!cfs_list_empty(&conn->uc_tx_list)) {
619                 LASSERT (conn->uc_peer != NULL);
620                 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
621         }
622
623         usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
624
625         if (conn->uc_peer != NULL)
626                 usocklnd_peer_decref(conn->uc_peer);
627
628         if (conn->uc_ni != NULL)
629                 lnet_ni_decref(conn->uc_ni);
630
631         if (conn->uc_tx_hello)
632                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
633
634         usocklnd_conn_free(conn);
635 }
636
637 int
638 usocklnd_get_conn_type(lnet_msg_t *lntmsg)
639 {
640         int nob;
641
642         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
643                 return SOCKLND_CONN_ANY;
644
645         nob = sizeof(ksock_msg_t) + lntmsg->msg_len;
646
647         if (nob >= usock_tuns.ut_min_bulk)
648                 return SOCKLND_CONN_BULK_OUT;
649         else
650                 return SOCKLND_CONN_CONTROL;
651 }
652
653 int usocklnd_type2idx(int type)
654 {
655         switch (type) {
656         case SOCKLND_CONN_ANY:
657         case SOCKLND_CONN_CONTROL:
658                 return 0;
659         case SOCKLND_CONN_BULK_IN:
660                 return 1;
661         case SOCKLND_CONN_BULK_OUT:
662                 return 2;
663         default:
664                 LBUG();
665         }
666 }
667
668 usock_peer_t *
669 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
670 {
671         cfs_list_t       *peer_list = usocklnd_nid2peerlist(id.nid);
672         cfs_list_t       *tmp;
673         usock_peer_t     *peer;
674
675         cfs_list_for_each (tmp, peer_list) {
676
677                 peer = cfs_list_entry (tmp, usock_peer_t, up_list);
678
679                 if (peer->up_ni != ni)
680                         continue;
681
682                 if (peer->up_peerid.nid != id.nid ||
683                     peer->up_peerid.pid != id.pid)
684                         continue;
685
686                 usocklnd_peer_addref(peer);
687                 return peer;
688         }
689         return (NULL);
690 }
691
692 int
693 usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
694                      usock_peer_t **peerp)
695 {
696         usock_net_t  *net = ni->ni_data;
697         usock_peer_t *peer;
698         int           i;
699
700         LIBCFS_ALLOC (peer, sizeof (*peer));
701         if (peer == NULL)
702                 return -ENOMEM;
703
704         for (i = 0; i < N_CONN_TYPES; i++)
705                 peer->up_conns[i] = NULL;
706
707         peer->up_peerid       = id;
708         peer->up_ni           = ni;
709         peer->up_incrn_is_set = 0;
710         peer->up_errored      = 0;
711         peer->up_last_alive   = 0;
712         mt_atomic_set(&peer->up_refcount, 1); /* 1 ref for caller */
713         pthread_mutex_init(&peer->up_lock, NULL);
714
715         pthread_mutex_lock(&net->un_lock);
716         net->un_peercount++;
717         pthread_mutex_unlock(&net->un_lock);
718
719         *peerp = peer;
720         return 0;
721 }
722
723 /* Safely create new peer if needed. Save result in *peerp.
724  * Returns 0 on success, <0 else */
725 int
726 usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
727                              usock_peer_t **peerp)
728 {
729         int           rc;
730         usock_peer_t *peer;
731         usock_peer_t *peer2;
732         usock_net_t  *net = ni->ni_data;
733
734         pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
735         peer = usocklnd_find_peer_locked(ni, id);
736         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
737
738         if (peer != NULL)
739                 goto find_or_create_peer_done;
740
741         rc = usocklnd_create_peer(ni, id, &peer);
742         if (rc)
743                 return rc;
744
745         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
746         peer2 = usocklnd_find_peer_locked(ni, id);
747         if (peer2 == NULL) {
748                 if (net->un_shutdown) {
749                         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
750                         usocklnd_peer_decref(peer); /* should destroy peer */
751                         CERROR("Can't create peer: network shutdown\n");
752                         return -ESHUTDOWN;
753                 }
754
755                 /* peer table will take 1 of my refs on peer */
756                 usocklnd_peer_addref(peer);
757                 cfs_list_add_tail (&peer->up_list,
758                                    usocklnd_nid2peerlist(id.nid));
759         } else {
760                 usocklnd_peer_decref(peer); /* should destroy peer */
761                 peer = peer2;
762         }
763         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
764
765   find_or_create_peer_done:
766         *peerp = peer;
767         return 0;
768 }
769
770 /* NB: both peer and conn locks are held */
771 static int
772 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
773 {
774         if (conn->uc_state == UC_READY &&
775             cfs_list_empty(&conn->uc_tx_list) &&
776             cfs_list_empty(&conn->uc_zcack_list) &&
777             !conn->uc_sending) {
778                 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
779                                                   POLLOUT);
780                 if (rc != 0)
781                         return rc;
782         }
783
784         cfs_list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
785         return 0;
786 }
787
788 /* NB: both peer and conn locks are held
789  * NB: if sending isn't in progress.  the caller *MUST* send tx
790  * immediately after we'll return */
791 static void
792 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
793                     int *send_immediately)
794 {
795         if (conn->uc_state == UC_READY &&
796             cfs_list_empty(&conn->uc_tx_list) &&
797             cfs_list_empty(&conn->uc_zcack_list) &&
798             !conn->uc_sending) {
799                 conn->uc_sending = 1;
800                 *send_immediately = 1;
801                 return;
802         }
803
804         *send_immediately = 0;
805         cfs_list_add_tail(&tx->tx_list, &conn->uc_tx_list);
806 }
807
808 /* Safely create new conn if needed. Save result in *connp.
809  * Returns 0 on success, <0 else */
810 int
811 usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
812                              usock_conn_t **connp,
813                              usock_tx_t *tx, usock_zc_ack_t *zc_ack,
814                              int *send_immediately)
815 {
816         usock_conn_t *conn;
817         int           idx;
818         int           rc;
819         lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
820
821         if (userflag)
822                 type = SOCKLND_CONN_ANY;
823
824         idx = usocklnd_type2idx(type);
825
826         pthread_mutex_lock(&peer->up_lock);
827         if (peer->up_conns[idx] != NULL) {
828                 conn = peer->up_conns[idx];
829                 LASSERT(conn->uc_type == type);
830         } else {
831                 if (userflag) {
832                         CERROR("Refusing to create a connection to "
833                                "userspace process %s\n",
834                                libcfs_id2str(peer->up_peerid));
835                         rc = -EHOSTUNREACH;
836                         goto find_or_create_conn_failed;
837                 }
838
839                 rc = usocklnd_create_active_conn(peer, type, &conn);
840                 if (rc) {
841                         peer->up_errored = 1;
842                         usocklnd_del_conns_locked(peer);
843                         goto find_or_create_conn_failed;
844                 }
845
846                 /* peer takes 1 of conn refcount */
847                 usocklnd_link_conn_to_peer(conn, peer, idx);
848
849                 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
850                 if (rc) {
851                         peer->up_conns[idx] = NULL;
852                         usocklnd_conn_decref(conn); /* should destroy conn */
853                         goto find_or_create_conn_failed;
854                 }
855                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
856         }
857
858         pthread_mutex_lock(&conn->uc_lock);
859         LASSERT(conn->uc_peer == peer);
860
861         LASSERT(tx == NULL || zc_ack == NULL);
862         if (tx != NULL) {
863                 /* usocklnd_tear_peer_conn() could signal us stop queueing */
864                 if (conn->uc_errored) {
865                         rc = -EIO;
866                         pthread_mutex_unlock(&conn->uc_lock);
867                         goto find_or_create_conn_failed;
868                 }
869
870                 usocklnd_enqueue_tx(conn, tx, send_immediately);
871         } else {
872                 rc = usocklnd_enqueue_zcack(conn, zc_ack);
873                 if (rc != 0) {
874                         usocklnd_conn_kill_locked(conn);
875                         pthread_mutex_unlock(&conn->uc_lock);
876                         goto find_or_create_conn_failed;
877                 }
878         }
879         pthread_mutex_unlock(&conn->uc_lock);
880
881         usocklnd_conn_addref(conn);
882         pthread_mutex_unlock(&peer->up_lock);
883
884         *connp = conn;
885         return 0;
886
887   find_or_create_conn_failed:
888         pthread_mutex_unlock(&peer->up_lock);
889         return rc;
890 }
891
892 void
893 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
894 {
895         peer->up_conns[idx] = conn;
896         peer->up_errored    = 0; /* this new fresh conn will try
897                                   * revitalize even stale errored peer */
898 }
899
900 int
901 usocklnd_invert_type(int type)
902 {
903         switch (type)
904         {
905         case SOCKLND_CONN_ANY:
906         case SOCKLND_CONN_CONTROL:
907                 return (type);
908         case SOCKLND_CONN_BULK_IN:
909                 return SOCKLND_CONN_BULK_OUT;
910         case SOCKLND_CONN_BULK_OUT:
911                 return SOCKLND_CONN_BULK_IN;
912         default:
913                 return SOCKLND_CONN_NONE;
914         }
915 }
916
917 void
918 usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
919 {
920         pthread_mutex_lock(&conn->uc_lock);
921         if (conn->uc_state != UC_DEAD)
922                 conn->uc_state = new_state;
923         pthread_mutex_unlock(&conn->uc_lock);
924 }
925
926 /* NB: peer is locked by caller */
927 void
928 usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
929                              usock_conn_t *skip_conn)
930 {
931         int i;
932
933         if (!peer->up_incrn_is_set) {
934                 peer->up_incarnation = incrn;
935                 peer->up_incrn_is_set = 1;
936                 return;
937         }
938
939         if (peer->up_incarnation == incrn)
940                 return;
941
942         peer->up_incarnation = incrn;
943
944         for (i = 0; i < N_CONN_TYPES; i++) {
945                 usock_conn_t *conn = peer->up_conns[i];
946
947                 if (conn == NULL || conn == skip_conn)
948                         continue;
949
950                 pthread_mutex_lock(&conn->uc_lock);
951                 LASSERT (conn->uc_peer == peer);
952                 conn->uc_peer = NULL;
953                 peer->up_conns[i] = NULL;
954                 if (conn->uc_state != UC_DEAD)
955                         usocklnd_conn_kill_locked(conn);
956                 pthread_mutex_unlock(&conn->uc_lock);
957
958                 usocklnd_conn_decref(conn);
959                 usocklnd_peer_decref(peer);
960         }
961 }
962
963 /* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
964  * MAGIC part of hello and set uc_rx_state
965  */
966 void
967 usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
968 {
969         LASSERT(conn->uc_rx_hello != NULL);
970
971         conn->uc_rx_niov = 1;
972         conn->uc_rx_iov = conn->uc_rx_iova;
973         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
974         conn->uc_rx_iov[0].iov_len =
975                 conn->uc_rx_nob_wanted =
976                 conn->uc_rx_nob_left =
977                 sizeof(conn->uc_rx_hello->kshm_magic);
978
979         conn->uc_rx_state = UC_RX_HELLO_MAGIC;
980
981         conn->uc_rx_flag = 1; /* waiting for incoming hello */
982         conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
983 }
984
985 /* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
986  * VERSION part of hello and set uc_rx_state
987  */
988 void
989 usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
990 {
991         LASSERT(conn->uc_rx_hello != NULL);
992
993         conn->uc_rx_niov = 1;
994         conn->uc_rx_iov = conn->uc_rx_iova;
995         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
996         conn->uc_rx_iov[0].iov_len =
997                 conn->uc_rx_nob_wanted =
998                 conn->uc_rx_nob_left =
999                 sizeof(conn->uc_rx_hello->kshm_version);
1000
1001         conn->uc_rx_state = UC_RX_HELLO_VERSION;
1002 }
1003
1004 /* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
1005  * the rest  of hello and set uc_rx_state
1006  */
1007 void
1008 usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
1009 {
1010         LASSERT(conn->uc_rx_hello != NULL);
1011
1012         conn->uc_rx_niov = 1;
1013         conn->uc_rx_iov = conn->uc_rx_iova;
1014         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
1015         conn->uc_rx_iov[0].iov_len =
1016                 conn->uc_rx_nob_wanted =
1017                 conn->uc_rx_nob_left =
1018                 offsetof(ksock_hello_msg_t, kshm_ips) -
1019                 offsetof(ksock_hello_msg_t, kshm_src_nid);
1020
1021         conn->uc_rx_state = UC_RX_HELLO_BODY;
1022 }
1023
1024 /* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
1025  * array of IPs and set uc_rx_state
1026  */
1027 void
1028 usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
1029 {
1030         LASSERT(conn->uc_rx_hello != NULL);
1031
1032         conn->uc_rx_niov = 1;
1033         conn->uc_rx_iov = conn->uc_rx_iova;
1034         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
1035         conn->uc_rx_iov[0].iov_len =
1036                 conn->uc_rx_nob_wanted =
1037                 conn->uc_rx_nob_left =
1038                 conn->uc_rx_hello->kshm_nips *
1039                 sizeof(conn->uc_rx_hello->kshm_ips[0]);
1040
1041         conn->uc_rx_state = UC_RX_HELLO_IPS;
1042 }
1043
1044 /* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
1045  * LNET header and set uc_rx_state
1046  */
1047 void
1048 usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
1049 {
1050         conn->uc_rx_niov = 1;
1051         conn->uc_rx_iov = conn->uc_rx_iova;
1052         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
1053         conn->uc_rx_iov[0].iov_len =
1054                 conn->uc_rx_nob_wanted =
1055                 conn->uc_rx_nob_left =
1056                 sizeof(ksock_lnet_msg_t);
1057
1058         conn->uc_rx_state = UC_RX_LNET_HEADER;
1059         conn->uc_rx_flag = 1;
1060 }
1061
1062 /* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
1063  * KSM header and set uc_rx_state
1064  */
1065 void
1066 usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
1067 {
1068         conn->uc_rx_niov = 1;
1069         conn->uc_rx_iov = conn->uc_rx_iova;
1070         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
1071         conn->uc_rx_iov[0].iov_len =
1072                 conn->uc_rx_nob_wanted =
1073                 conn->uc_rx_nob_left =
1074                 offsetof(ksock_msg_t, ksm_u);
1075
1076         conn->uc_rx_state = UC_RX_KSM_HEADER;
1077         conn->uc_rx_flag = 0;
1078 }
1079
1080 /* RX state transition to UC_RX_SKIPPING: update RX part for
1081  * skipping and set uc_rx_state
1082  */
1083 void
1084 usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
1085 {
1086         static char    skip_buffer[4096];
1087
1088         int            nob;
1089         unsigned int   niov = 0;
1090         int            skipped = 0;
1091         int            nob_to_skip = conn->uc_rx_nob_left;
1092
1093         LASSERT(nob_to_skip != 0);
1094
1095         conn->uc_rx_iov = conn->uc_rx_iova;
1096
1097         /* Set up to skip as much as possible now.  If there's more left
1098          * (ran out of iov entries) we'll get called again */
1099
1100         do {
1101                 nob = MIN (nob_to_skip, sizeof(skip_buffer));
1102
1103                 conn->uc_rx_iov[niov].iov_base = skip_buffer;
1104                 conn->uc_rx_iov[niov].iov_len  = nob;
1105                 niov++;
1106                 skipped += nob;
1107                 nob_to_skip -=nob;
1108
1109         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1110                  niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));
1111
1112         conn->uc_rx_niov = niov;
1113         conn->uc_rx_nob_wanted = skipped;
1114
1115         conn->uc_rx_state = UC_RX_SKIPPING;
1116 }