Whamcloud - gitweb
4d6e01d63e7ef807919f4339571a697bd37f1c0f
[fs/lustre-release.git] / lnet / ulnds / socklnd / conn.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/socklnd/conn.c
37  *
38  * Author: Maxim Patlasov <maxim@clusterfs.com>
39  */
40
41 #include "usocklnd.h"
42
43 /* Return 1 if the conn is timed out, 0 else */
44 int
45 usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
46 {
47         if (conn->uc_tx_flag && /* sending is in progress */
48             cfs_time_aftereq(current_time, conn->uc_tx_deadline))
49                 return 1;
50
51         if (conn->uc_rx_flag && /* receiving is in progress */
52             cfs_time_aftereq(current_time, conn->uc_rx_deadline))
53                 return 1;
54
55         return 0;
56 }
57
58 void
59 usocklnd_conn_kill(usock_conn_t *conn)
60 {
61         pthread_mutex_lock(&conn->uc_lock);
62         if (conn->uc_state != UC_DEAD)
63                 usocklnd_conn_kill_locked(conn);
64         pthread_mutex_unlock(&conn->uc_lock);
65 }
66
67 /* Mark the conn as DEAD and schedule its deletion */
68 void
69 usocklnd_conn_kill_locked(usock_conn_t *conn)
70 {
71         conn->uc_rx_flag = conn->uc_tx_flag = 0;
72         conn->uc_state = UC_DEAD;
73         usocklnd_add_killrequest(conn);
74 }
75
76 usock_conn_t *
77 usocklnd_conn_allocate()
78 {
79         usock_conn_t        *conn;
80         usock_pollrequest_t *pr;
81
82         LIBCFS_ALLOC (pr, sizeof(*pr));
83         if (pr == NULL)
84                 return NULL;
85
86         LIBCFS_ALLOC (conn, sizeof(*conn));
87         if (conn == NULL) {
88                 LIBCFS_FREE (pr, sizeof(*pr));
89                 return NULL;
90         }
91         memset(conn, 0, sizeof(*conn));
92         conn->uc_preq = pr;
93
94         LIBCFS_ALLOC (conn->uc_rx_hello,
95                       offsetof(ksock_hello_msg_t,
96                                kshm_ips[LNET_MAX_INTERFACES]));
97         if (conn->uc_rx_hello == NULL) {
98                 LIBCFS_FREE (pr, sizeof(*pr));
99                 LIBCFS_FREE (conn, sizeof(*conn));
100                 return NULL;
101         }
102
103         return conn;
104 }
105
106 void
107 usocklnd_conn_free(usock_conn_t *conn)
108 {
109         usock_pollrequest_t *pr = conn->uc_preq;
110
111         if (pr != NULL)
112                 LIBCFS_FREE (pr, sizeof(*pr));
113
114         if (conn->uc_rx_hello != NULL)
115                 LIBCFS_FREE (conn->uc_rx_hello,
116                              offsetof(ksock_hello_msg_t,
117                                       kshm_ips[LNET_MAX_INTERFACES]));
118
119         LIBCFS_FREE (conn, sizeof(*conn));
120 }
121
122 void
123 usocklnd_tear_peer_conn(usock_conn_t *conn)
124 {
125         usock_peer_t     *peer = conn->uc_peer;
126         int               idx = usocklnd_type2idx(conn->uc_type);
127         lnet_ni_t        *ni;
128         lnet_process_id_t id;
129         int               decref_flag  = 0;
130         int               killall_flag = 0;
131
132         if (peer == NULL) /* nothing to tear */
133                 return;
134
135         pthread_mutex_lock(&peer->up_lock);
136         pthread_mutex_lock(&conn->uc_lock);
137
138         ni = peer->up_ni;
139         id = peer->up_peerid;
140
141         if (peer->up_conns[idx] == conn) {
142                 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
143                         /* change state not to finalize twice */
144                         conn->uc_rx_state = UC_RX_KSM_HEADER;
145                         lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
146                 }
147
148                 usocklnd_destroy_txlist(peer->up_ni,
149                                         &conn->uc_tx_list);
150
151                 peer->up_conns[idx] = NULL;
152                 conn->uc_peer = NULL;
153                 decref_flag = 1;
154
155                 if(conn->uc_errored && !peer->up_errored)
156                         peer->up_errored = killall_flag = 1;
157         }
158
159         pthread_mutex_unlock(&conn->uc_lock);
160
161         if (killall_flag)
162                 usocklnd_del_conns_locked(peer);
163
164         pthread_mutex_unlock(&peer->up_lock);
165
166         if (!decref_flag)
167                 return;
168
169         usocklnd_conn_decref(conn);
170         usocklnd_peer_decref(peer);
171
172         usocklnd_check_peer_stale(ni, id);
173 }
174
175 /* Remove peer from hash list if all up_conns[i] is NULL &&
176  * hash table is the only consumer of the peer */
177 void
178 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
179 {
180         usock_peer_t *peer;
181
182         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
183         peer = usocklnd_find_peer_locked(ni, id);
184
185         if (peer == NULL) {
186                 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
187                 return;
188         }
189
190         if (cfs_atomic_read(&peer->up_refcount) == 2) {
191                 int i;
192                 for (i = 0; i < N_CONN_TYPES; i++)
193                         LASSERT (peer->up_conns[i] == NULL);
194
195                 list_del(&peer->up_list);
196
197                 if (peer->up_errored &&
198                     (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
199                         lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
200                                      cfs_time_seconds(peer->up_last_alive));
201
202                 usocklnd_peer_decref(peer);
203         }
204
205         usocklnd_peer_decref(peer);
206         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
207 }
208
209 /* Returns 0 on success, <0 else */
210 int
211 usocklnd_create_passive_conn(lnet_ni_t *ni,
212                              cfs_socket_t *sock, usock_conn_t **connp)
213 {
214         int           rc;
215         __u32         peer_ip;
216         int           peer_port;
217         usock_conn_t *conn;
218
219         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
220         if (rc)
221                 return rc;
222
223         LASSERT (peer_port >= 0); /* uc_peer_port is u16 */
224
225         rc = usocklnd_set_sock_options(sock);
226         if (rc)
227                 return rc;
228
229         conn = usocklnd_conn_allocate();
230         if (conn == NULL)
231                 return -ENOMEM;
232
233         usocklnd_rx_hellomagic_state_transition(conn);
234
235         conn->uc_sock = sock;
236         conn->uc_peer_ip = peer_ip;
237         conn->uc_peer_port = peer_port;
238         conn->uc_state = UC_RECEIVING_HELLO;
239         conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
240         conn->uc_ni = ni;
241         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
242         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
243         pthread_mutex_init(&conn->uc_lock, NULL);
244         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
245
246         *connp = conn;
247         return 0;
248 }
249
250 /* Returns 0 on success, <0 else */
251 int
252 usocklnd_create_active_conn(usock_peer_t *peer, int type,
253                             usock_conn_t **connp)
254 {
255         int           rc;
256         cfs_socket_t *sock;
257         usock_conn_t *conn;
258         __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
259         __u16         dst_port = lnet_acceptor_port();
260
261         conn = usocklnd_conn_allocate();
262         if (conn == NULL)
263                 return -ENOMEM;
264
265         conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
266                                                         peer->up_peerid.nid);
267         if (conn->uc_tx_hello == NULL) {
268                 usocklnd_conn_free(conn);
269                 return -ENOMEM;
270         }
271
272         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
273                 rc = usocklnd_connect_cli_mode(&sock, dst_ip, dst_port);
274         else
275                 rc = usocklnd_connect_srv_mode(&sock, dst_ip, dst_port);
276
277         if (rc) {
278                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
279                 usocklnd_conn_free(conn);
280                 return rc;
281         }
282
283         conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
284         conn->uc_tx_flag     = 1;
285
286         conn->uc_sock       = sock;
287         conn->uc_peer_ip    = dst_ip;
288         conn->uc_peer_port  = dst_port;
289         conn->uc_type       = type;
290         conn->uc_activeflag = 1;
291         conn->uc_state      = UC_CONNECTING;
292         conn->uc_pt_idx     = usocklnd_ip2pt_idx(dst_ip);
293         conn->uc_ni         = NULL;
294         conn->uc_peerid     = peer->up_peerid;
295         conn->uc_peer       = peer;
296
297         usocklnd_peer_addref(peer);
298         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
299         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
300         pthread_mutex_init(&conn->uc_lock, NULL);
301         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
302
303         *connp = conn;
304         return 0;
305 }
306
307 /* Returns 0 on success, <0 else */
308 int
309 usocklnd_connect_srv_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
310 {
311         __u16         port;
312         cfs_socket_t *sock;
313         int           rc;
314         int           fatal;
315
316         for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
317              port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
318              port--) {
319                 /* Iterate through reserved ports. */
320                 rc = libcfs_sock_create(&sock, &fatal, 0, port);
321                 if (rc) {
322                         if (fatal)
323                                 return rc;
324                         continue;
325                 }
326
327                 rc = usocklnd_set_sock_options(sock);
328                 if (rc) {
329                         libcfs_sock_release(sock);
330                         return rc;
331                 }
332
333                 rc = libcfs_sock_connect(sock, dst_ip, dst_port);
334                 if (rc == 0) {
335                         *sockp = sock;
336                         return 0;
337                 }
338
339                 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
340                         libcfs_sock_release(sock);
341                         return rc;
342                 }
343
344                 libcfs_sock_release(sock);
345         }
346
347         CERROR("Can't bind to any reserved port\n");
348         return rc;
349 }
350
351 /* Returns 0 on success, <0 else */
352 int
353 usocklnd_connect_cli_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
354 {
355         cfs_socket_t *sock;
356         int           rc;
357         int           fatal;
358
359         rc = libcfs_sock_create(&sock, &fatal, 0, 0);
360         if (rc)
361                 return rc;
362
363         rc = usocklnd_set_sock_options(sock);
364         if (rc) {
365                 libcfs_sock_release(sock);
366                 return rc;
367         }
368
369         rc = libcfs_sock_connect(sock, dst_ip, dst_port);
370         if (rc) {
371                 libcfs_sock_release(sock);
372                 return rc;
373         }
374
375         *sockp = sock;
376         return 0;
377 }
378
379 int
380 usocklnd_set_sock_options(cfs_socket_t *sock)
381 {
382         int rc;
383
384         rc = libcfs_sock_set_nagle(sock, usock_tuns.ut_socknagle);
385         if (rc)
386                 return rc;
387
388         if (usock_tuns.ut_sockbufsiz) {
389                 rc = libcfs_sock_set_bufsiz(sock, usock_tuns.ut_sockbufsiz);
390                 if (rc)
391                         return rc;
392         }
393
394         return libcfs_fcntl_nonblock(sock);
395 }
396
397 usock_tx_t *
398 usocklnd_create_noop_tx(__u64 cookie)
399 {
400         usock_tx_t *tx;
401
402         LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
403         if (tx == NULL)
404                 return NULL;
405
406         tx->tx_size = sizeof(usock_tx_t);
407         tx->tx_lnetmsg = NULL;
408
409         socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
410         tx->tx_msg.ksm_zc_cookies[1] = cookie;
411
412         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
413         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
414                 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
415         tx->tx_iov = tx->tx_iova;
416         tx->tx_niov = 1;
417
418         return tx;
419 }
420
421 usock_tx_t *
422 usocklnd_create_tx(lnet_msg_t *lntmsg)
423 {
424         usock_tx_t   *tx;
425         unsigned int  payload_niov = lntmsg->msg_niov;
426         struct iovec *payload_iov = lntmsg->msg_iov;
427         unsigned int  payload_offset = lntmsg->msg_offset;
428         unsigned int  payload_nob = lntmsg->msg_len;
429         int           size = offsetof(usock_tx_t,
430                                       tx_iova[1 + payload_niov]);
431
432         LIBCFS_ALLOC (tx, size);
433         if (tx == NULL)
434                 return NULL;
435
436         tx->tx_size = size;
437         tx->tx_lnetmsg = lntmsg;
438
439         tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;
440
441         socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
442         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
443         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
444         tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
445         tx->tx_iov = tx->tx_iova;
446
447         tx->tx_niov = 1 +
448                 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
449                                  payload_niov, payload_iov,
450                                  payload_offset, payload_nob);
451
452         return tx;
453 }
454
455 void
456 usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
457                         lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
458 {
459         usock_net_t *net = (usock_net_t *)ni->ni_data;
460
461         hello->kshm_magic       = LNET_PROTO_MAGIC;
462         hello->kshm_version     = KSOCK_PROTO_V2;
463         hello->kshm_nips        = 0;
464         hello->kshm_ctype       = type;
465
466         hello->kshm_dst_incarnation = 0; /* not used */
467         hello->kshm_src_incarnation = net->un_incarnation;
468
469         hello->kshm_src_pid = the_lnet.ln_pid;
470         hello->kshm_src_nid = ni->ni_nid;
471         hello->kshm_dst_nid = peer_nid;
472         hello->kshm_dst_pid = 0; /* not used */
473 }
474
475 usock_tx_t *
476 usocklnd_create_hello_tx(lnet_ni_t *ni,
477                          int type, lnet_nid_t peer_nid)
478 {
479         usock_tx_t        *tx;
480         int                size;
481         ksock_hello_msg_t *hello;
482
483         size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
484         LIBCFS_ALLOC (tx, size);
485         if (tx == NULL)
486                 return NULL;
487
488         tx->tx_size = size;
489         tx->tx_lnetmsg = NULL;
490
491         hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
492         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
493
494         tx->tx_iova[0].iov_base = (void *)hello;
495         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
496                 offsetof(ksock_hello_msg_t, kshm_ips);
497         tx->tx_iov = tx->tx_iova;
498         tx->tx_niov = 1;
499
500         return tx;
501 }
502
503 usock_tx_t *
504 usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
505                             int type, lnet_nid_t peer_nid)
506 {
507         usock_tx_t              *tx;
508         int                      size;
509         lnet_acceptor_connreq_t *cr;
510         ksock_hello_msg_t       *hello;
511
512         size = sizeof(usock_tx_t) +
513                 sizeof(lnet_acceptor_connreq_t) +
514                 offsetof(ksock_hello_msg_t, kshm_ips);
515         LIBCFS_ALLOC (tx, size);
516         if (tx == NULL)
517                 return NULL;
518
519         tx->tx_size = size;
520         tx->tx_lnetmsg = NULL;
521
522         cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
523         memset(cr, 0, sizeof(*cr));
524         cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
525         cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
526         cr->acr_nid     = peer_nid;
527
528         hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
529         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
530
531         tx->tx_iova[0].iov_base = (void *)cr;
532         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
533                 sizeof(lnet_acceptor_connreq_t) +
534                 offsetof(ksock_hello_msg_t, kshm_ips);
535         tx->tx_iov = tx->tx_iova;
536         tx->tx_niov = 1;
537
538         return tx;
539 }
540
541 void
542 usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
543 {
544         lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;
545         int          rc = (tx->tx_resid == 0) ? 0 : -EIO;
546
547         LASSERT (ni != NULL || lnetmsg == NULL);
548
549         LIBCFS_FREE (tx, tx->tx_size);
550
551         if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
552                 lnet_finalize(ni, lnetmsg, rc);
553 }
554
555 void
556 usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
557 {
558         usock_tx_t *tx;
559
560         while (!list_empty(txlist)) {
561                 tx = list_entry(txlist->next, usock_tx_t, tx_list);
562                 list_del(&tx->tx_list);
563
564                 usocklnd_destroy_tx(ni, tx);
565         }
566 }
567
568 void
569 usocklnd_destroy_zcack_list(struct list_head *zcack_list)
570 {
571         usock_zc_ack_t *zcack;
572
573         while (!list_empty(zcack_list)) {
574                 zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
575                 list_del(&zcack->zc_list);
576
577                 LIBCFS_FREE (zcack, sizeof(*zcack));
578         }
579 }
580
581 void
582 usocklnd_destroy_peer(usock_peer_t *peer)
583 {
584         usock_net_t *net = peer->up_ni->ni_data;
585         int          i;
586
587         for (i = 0; i < N_CONN_TYPES; i++)
588                 LASSERT (peer->up_conns[i] == NULL);
589
590         LIBCFS_FREE (peer, sizeof (*peer));
591
592         pthread_mutex_lock(&net->un_lock);
593         if(--net->un_peercount == 0)
594                 pthread_cond_signal(&net->un_cond);
595         pthread_mutex_unlock(&net->un_lock);
596 }
597
598 void
599 usocklnd_destroy_conn(usock_conn_t *conn)
600 {
601         LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
602
603         if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
604                 LASSERT (conn->uc_peer != NULL);
605                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
606         }
607
608         if (!list_empty(&conn->uc_tx_list)) {
609                 LASSERT (conn->uc_peer != NULL);
610                 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
611         }
612
613         usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
614
615         if (conn->uc_peer != NULL)
616                 usocklnd_peer_decref(conn->uc_peer);
617
618         if (conn->uc_ni != NULL)
619                 lnet_ni_decref(conn->uc_ni);
620
621         if (conn->uc_tx_hello)
622                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
623
624         usocklnd_conn_free(conn);
625 }
626
627 int
628 usocklnd_get_conn_type(lnet_msg_t *lntmsg)
629 {
630         int nob;
631
632         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
633                 return SOCKLND_CONN_ANY;
634
635         nob = sizeof(ksock_msg_t) + lntmsg->msg_len;
636
637         if (nob >= usock_tuns.ut_min_bulk)
638                 return SOCKLND_CONN_BULK_OUT;
639         else
640                 return SOCKLND_CONN_CONTROL;
641 }
642
643 int usocklnd_type2idx(int type)
644 {
645         switch (type) {
646         case SOCKLND_CONN_ANY:
647         case SOCKLND_CONN_CONTROL:
648                 return 0;
649         case SOCKLND_CONN_BULK_IN:
650                 return 1;
651         case SOCKLND_CONN_BULK_OUT:
652                 return 2;
653         default:
654                 LBUG();
655         }
656 }
657
658 usock_peer_t *
659 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
660 {
661         struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
662         struct list_head *tmp;
663         usock_peer_t     *peer;
664
665         list_for_each (tmp, peer_list) {
666
667                 peer = list_entry (tmp, usock_peer_t, up_list);
668
669                 if (peer->up_ni != ni)
670                         continue;
671
672                 if (peer->up_peerid.nid != id.nid ||
673                     peer->up_peerid.pid != id.pid)
674                         continue;
675
676                 usocklnd_peer_addref(peer);
677                 return peer;
678         }
679         return (NULL);
680 }
681
682 int
683 usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
684                      usock_peer_t **peerp)
685 {
686         usock_net_t  *net = ni->ni_data;
687         usock_peer_t *peer;
688         int           i;
689
690         LIBCFS_ALLOC (peer, sizeof (*peer));
691         if (peer == NULL)
692                 return -ENOMEM;
693
694         for (i = 0; i < N_CONN_TYPES; i++)
695                 peer->up_conns[i] = NULL;
696
697         peer->up_peerid       = id;
698         peer->up_ni           = ni;
699         peer->up_incrn_is_set = 0;
700         peer->up_errored      = 0;
701         peer->up_last_alive   = 0;
702         cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
703         pthread_mutex_init(&peer->up_lock, NULL);
704
705         pthread_mutex_lock(&net->un_lock);
706         net->un_peercount++;
707         pthread_mutex_unlock(&net->un_lock);
708
709         *peerp = peer;
710         return 0;
711 }
712
713 /* Safely create new peer if needed. Save result in *peerp.
714  * Returns 0 on success, <0 else */
715 int
716 usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
717                              usock_peer_t **peerp)
718 {
719         int           rc;
720         usock_peer_t *peer;
721         usock_peer_t *peer2;
722         usock_net_t  *net = ni->ni_data;
723
724         pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
725         peer = usocklnd_find_peer_locked(ni, id);
726         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
727
728         if (peer != NULL)
729                 goto find_or_create_peer_done;
730
731         rc = usocklnd_create_peer(ni, id, &peer);
732         if (rc)
733                 return rc;
734
735         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
736         peer2 = usocklnd_find_peer_locked(ni, id);
737         if (peer2 == NULL) {
738                 if (net->un_shutdown) {
739                         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
740                         usocklnd_peer_decref(peer); /* should destroy peer */
741                         CERROR("Can't create peer: network shutdown\n");
742                         return -ESHUTDOWN;
743                 }
744
745                 /* peer table will take 1 of my refs on peer */
746                 usocklnd_peer_addref(peer);
747                 list_add_tail (&peer->up_list,
748                                usocklnd_nid2peerlist(id.nid));
749         } else {
750                 usocklnd_peer_decref(peer); /* should destroy peer */
751                 peer = peer2;
752         }
753         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
754
755   find_or_create_peer_done:
756         *peerp = peer;
757         return 0;
758 }
759
760 /* NB: both peer and conn locks are held */
761 static int
762 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
763 {
764         if (conn->uc_state == UC_READY &&
765             list_empty(&conn->uc_tx_list) &&
766             list_empty(&conn->uc_zcack_list) &&
767             !conn->uc_sending) {
768                 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
769                                                   POLLOUT);
770                 if (rc != 0)
771                         return rc;
772         }
773
774         list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
775         return 0;
776 }
777
778 /* NB: both peer and conn locks are held
779  * NB: if sending isn't in progress.  the caller *MUST* send tx
780  * immediately after we'll return */
781 static void
782 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
783                     int *send_immediately)
784 {
785         if (conn->uc_state == UC_READY &&
786             list_empty(&conn->uc_tx_list) &&
787             list_empty(&conn->uc_zcack_list) &&
788             !conn->uc_sending) {
789                 conn->uc_sending = 1;
790                 *send_immediately = 1;
791                 return;
792         }
793
794         *send_immediately = 0;
795         list_add_tail(&tx->tx_list, &conn->uc_tx_list);
796 }
797
798 /* Safely create new conn if needed. Save result in *connp.
799  * Returns 0 on success, <0 else */
800 int
801 usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
802                              usock_conn_t **connp,
803                              usock_tx_t *tx, usock_zc_ack_t *zc_ack,
804                              int *send_immediately)
805 {
806         usock_conn_t *conn;
807         int           idx;
808         int           rc;
809         lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
810
811         if (userflag)
812                 type = SOCKLND_CONN_ANY;
813
814         idx = usocklnd_type2idx(type);
815
816         pthread_mutex_lock(&peer->up_lock);
817         if (peer->up_conns[idx] != NULL) {
818                 conn = peer->up_conns[idx];
819                 LASSERT(conn->uc_type == type);
820         } else {
821                 if (userflag) {
822                         CERROR("Refusing to create a connection to "
823                                "userspace process %s\n",
824                                libcfs_id2str(peer->up_peerid));
825                         rc = -EHOSTUNREACH;
826                         goto find_or_create_conn_failed;
827                 }
828
829                 rc = usocklnd_create_active_conn(peer, type, &conn);
830                 if (rc) {
831                         peer->up_errored = 1;
832                         usocklnd_del_conns_locked(peer);
833                         goto find_or_create_conn_failed;
834                 }
835
836                 /* peer takes 1 of conn refcount */
837                 usocklnd_link_conn_to_peer(conn, peer, idx);
838
839                 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
840                 if (rc) {
841                         peer->up_conns[idx] = NULL;
842                         usocklnd_conn_decref(conn); /* should destroy conn */
843                         goto find_or_create_conn_failed;
844                 }
845                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
846         }
847
848         pthread_mutex_lock(&conn->uc_lock);
849         LASSERT(conn->uc_peer == peer);
850
851         LASSERT(tx == NULL || zc_ack == NULL);
852         if (tx != NULL) {
853                 usocklnd_enqueue_tx(conn, tx, send_immediately);
854         } else {
855                 rc = usocklnd_enqueue_zcack(conn, zc_ack);
856                 if (rc != 0) {
857                         usocklnd_conn_kill_locked(conn);
858                         pthread_mutex_unlock(&conn->uc_lock);
859                         goto find_or_create_conn_failed;
860                 }
861         }
862         pthread_mutex_unlock(&conn->uc_lock);
863
864         usocklnd_conn_addref(conn);
865         pthread_mutex_unlock(&peer->up_lock);
866
867         *connp = conn;
868         return 0;
869
870   find_or_create_conn_failed:
871         pthread_mutex_unlock(&peer->up_lock);
872         return rc;
873 }
874
875 void
876 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
877 {
878         peer->up_conns[idx] = conn;
879         peer->up_errored    = 0; /* this new fresh conn will try
880                                   * revitalize even stale errored peer */
881 }
882
883 int
884 usocklnd_invert_type(int type)
885 {
886         switch (type)
887         {
888         case SOCKLND_CONN_ANY:
889         case SOCKLND_CONN_CONTROL:
890                 return (type);
891         case SOCKLND_CONN_BULK_IN:
892                 return SOCKLND_CONN_BULK_OUT;
893         case SOCKLND_CONN_BULK_OUT:
894                 return SOCKLND_CONN_BULK_IN;
895         default:
896                 return SOCKLND_CONN_NONE;
897         }
898 }
899
900 void
901 usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
902 {
903         pthread_mutex_lock(&conn->uc_lock);
904         if (conn->uc_state != UC_DEAD)
905                 conn->uc_state = new_state;
906         pthread_mutex_unlock(&conn->uc_lock);
907 }
908
909 /* NB: peer is locked by caller */
910 void
911 usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
912                              usock_conn_t *skip_conn)
913 {
914         int i;
915
916         if (!peer->up_incrn_is_set) {
917                 peer->up_incarnation = incrn;
918                 peer->up_incrn_is_set = 1;
919                 return;
920         }
921
922         if (peer->up_incarnation == incrn)
923                 return;
924
925         peer->up_incarnation = incrn;
926
927         for (i = 0; i < N_CONN_TYPES; i++) {
928                 usock_conn_t *conn = peer->up_conns[i];
929
930                 if (conn == NULL || conn == skip_conn)
931                         continue;
932
933                 pthread_mutex_lock(&conn->uc_lock);
934                 LASSERT (conn->uc_peer == peer);
935                 conn->uc_peer = NULL;
936                 peer->up_conns[i] = NULL;
937                 if (conn->uc_state != UC_DEAD)
938                         usocklnd_conn_kill_locked(conn);
939                 pthread_mutex_unlock(&conn->uc_lock);
940
941                 usocklnd_conn_decref(conn);
942                 usocklnd_peer_decref(peer);
943         }
944 }
945
946 /* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
947  * MAGIC part of hello and set uc_rx_state
948  */
949 void
950 usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
951 {
952         LASSERT(conn->uc_rx_hello != NULL);
953
954         conn->uc_rx_niov = 1;
955         conn->uc_rx_iov = conn->uc_rx_iova;
956         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
957         conn->uc_rx_iov[0].iov_len =
958                 conn->uc_rx_nob_wanted =
959                 conn->uc_rx_nob_left =
960                 sizeof(conn->uc_rx_hello->kshm_magic);
961
962         conn->uc_rx_state = UC_RX_HELLO_MAGIC;
963
964         conn->uc_rx_flag = 1; /* waiting for incoming hello */
965         conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
966 }
967
968 /* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
969  * VERSION part of hello and set uc_rx_state
970  */
971 void
972 usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
973 {
974         LASSERT(conn->uc_rx_hello != NULL);
975
976         conn->uc_rx_niov = 1;
977         conn->uc_rx_iov = conn->uc_rx_iova;
978         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
979         conn->uc_rx_iov[0].iov_len =
980                 conn->uc_rx_nob_wanted =
981                 conn->uc_rx_nob_left =
982                 sizeof(conn->uc_rx_hello->kshm_version);
983
984         conn->uc_rx_state = UC_RX_HELLO_VERSION;
985 }
986
987 /* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
988  * the rest  of hello and set uc_rx_state
989  */
990 void
991 usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
992 {
993         LASSERT(conn->uc_rx_hello != NULL);
994
995         conn->uc_rx_niov = 1;
996         conn->uc_rx_iov = conn->uc_rx_iova;
997         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
998         conn->uc_rx_iov[0].iov_len =
999                 conn->uc_rx_nob_wanted =
1000                 conn->uc_rx_nob_left =
1001                 offsetof(ksock_hello_msg_t, kshm_ips) -
1002                 offsetof(ksock_hello_msg_t, kshm_src_nid);
1003
1004         conn->uc_rx_state = UC_RX_HELLO_BODY;
1005 }
1006
1007 /* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
1008  * array of IPs and set uc_rx_state
1009  */
1010 void
1011 usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
1012 {
1013         LASSERT(conn->uc_rx_hello != NULL);
1014
1015         conn->uc_rx_niov = 1;
1016         conn->uc_rx_iov = conn->uc_rx_iova;
1017         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
1018         conn->uc_rx_iov[0].iov_len =
1019                 conn->uc_rx_nob_wanted =
1020                 conn->uc_rx_nob_left =
1021                 conn->uc_rx_hello->kshm_nips *
1022                 sizeof(conn->uc_rx_hello->kshm_ips[0]);
1023
1024         conn->uc_rx_state = UC_RX_HELLO_IPS;
1025 }
1026
1027 /* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
1028  * LNET header and set uc_rx_state
1029  */
1030 void
1031 usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
1032 {
1033         conn->uc_rx_niov = 1;
1034         conn->uc_rx_iov = conn->uc_rx_iova;
1035         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
1036         conn->uc_rx_iov[0].iov_len =
1037                 conn->uc_rx_nob_wanted =
1038                 conn->uc_rx_nob_left =
1039                 sizeof(ksock_lnet_msg_t);
1040
1041         conn->uc_rx_state = UC_RX_LNET_HEADER;
1042         conn->uc_rx_flag = 1;
1043 }
1044
1045 /* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
1046  * KSM header and set uc_rx_state
1047  */
1048 void
1049 usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
1050 {
1051         conn->uc_rx_niov = 1;
1052         conn->uc_rx_iov = conn->uc_rx_iova;
1053         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
1054         conn->uc_rx_iov[0].iov_len =
1055                 conn->uc_rx_nob_wanted =
1056                 conn->uc_rx_nob_left =
1057                 offsetof(ksock_msg_t, ksm_u);
1058
1059         conn->uc_rx_state = UC_RX_KSM_HEADER;
1060         conn->uc_rx_flag = 0;
1061 }
1062
1063 /* RX state transition to UC_RX_SKIPPING: update RX part for
1064  * skipping and set uc_rx_state
1065  */
1066 void
1067 usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
1068 {
1069         static char    skip_buffer[4096];
1070
1071         int            nob;
1072         unsigned int   niov = 0;
1073         int            skipped = 0;
1074         int            nob_to_skip = conn->uc_rx_nob_left;
1075
1076         LASSERT(nob_to_skip != 0);
1077
1078         conn->uc_rx_iov = conn->uc_rx_iova;
1079
1080         /* Set up to skip as much as possible now.  If there's more left
1081          * (ran out of iov entries) we'll get called again */
1082
1083         do {
1084                 nob = MIN (nob_to_skip, sizeof(skip_buffer));
1085
1086                 conn->uc_rx_iov[niov].iov_base = skip_buffer;
1087                 conn->uc_rx_iov[niov].iov_len  = nob;
1088                 niov++;
1089                 skipped += nob;
1090                 nob_to_skip -=nob;
1091
1092         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1093                  niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));
1094
1095         conn->uc_rx_niov = niov;
1096         conn->uc_rx_nob_wanted = skipped;
1097
1098         conn->uc_rx_state = UC_RX_SKIPPING;
1099 }