Whamcloud - gitweb
b=17167 libcfs: ensure all libcfs exported symbols to have cfs_ prefix
[fs/lustre-release.git] / lnet / ulnds / socklnd / conn.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/socklnd/conn.c
37  *
38  * Author: Maxim Patlasov <maxim@clusterfs.com>
39  */
40
41 #include "usocklnd.h"
42
43 /* Return 1 if the conn is timed out, 0 else */
44 int
45 usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
46 {
47         if (conn->uc_tx_flag && /* sending is in progress */
48             cfs_time_aftereq(current_time, conn->uc_tx_deadline))
49                 return 1;
50
51         if (conn->uc_rx_flag && /* receiving is in progress */
52             cfs_time_aftereq(current_time, conn->uc_rx_deadline))
53                 return 1;
54
55         return 0;
56 }
57
58 void
59 usocklnd_conn_kill(usock_conn_t *conn)
60 {
61         pthread_mutex_lock(&conn->uc_lock);
62         if (conn->uc_state != UC_DEAD)
63                 usocklnd_conn_kill_locked(conn);
64         pthread_mutex_unlock(&conn->uc_lock);
65 }
66
67 /* Mark the conn as DEAD and schedule its deletion */
68 void
69 usocklnd_conn_kill_locked(usock_conn_t *conn)
70 {
71         conn->uc_rx_flag = conn->uc_tx_flag = 0;
72         conn->uc_state = UC_DEAD;
73         usocklnd_add_killrequest(conn);
74 }
75
76 usock_conn_t *
77 usocklnd_conn_allocate()
78 {
79         usock_conn_t        *conn;
80         usock_pollrequest_t *pr;
81
82         LIBCFS_ALLOC (pr, sizeof(*pr));
83         if (pr == NULL)
84                 return NULL;
85
86         LIBCFS_ALLOC (conn, sizeof(*conn));
87         if (conn == NULL) {
88                 LIBCFS_FREE (pr, sizeof(*pr));
89                 return NULL;
90         }
91         memset(conn, 0, sizeof(*conn));
92         conn->uc_preq = pr;
93
94         LIBCFS_ALLOC (conn->uc_rx_hello,
95                       offsetof(ksock_hello_msg_t,
96                                kshm_ips[LNET_MAX_INTERFACES]));
97         if (conn->uc_rx_hello == NULL) {
98                 LIBCFS_FREE (pr, sizeof(*pr));
99                 LIBCFS_FREE (conn, sizeof(*conn));
100                 return NULL;
101         }
102
103         return conn;
104 }
105
106 void
107 usocklnd_conn_free(usock_conn_t *conn)
108 {
109         usock_pollrequest_t *pr = conn->uc_preq;
110
111         if (pr != NULL)
112                 LIBCFS_FREE (pr, sizeof(*pr));
113
114         if (conn->uc_rx_hello != NULL)
115                 LIBCFS_FREE (conn->uc_rx_hello,
116                              offsetof(ksock_hello_msg_t,
117                                       kshm_ips[LNET_MAX_INTERFACES]));
118
119         LIBCFS_FREE (conn, sizeof(*conn));
120 }
121
122 void
123 usocklnd_tear_peer_conn(usock_conn_t *conn)
124 {
125         usock_peer_t     *peer = conn->uc_peer;
126         int               idx = usocklnd_type2idx(conn->uc_type);
127         lnet_ni_t        *ni;
128         lnet_process_id_t id;
129         int               decref_flag  = 0;
130         int               killall_flag = 0;
131         void             *rx_lnetmsg   = NULL; 
132         CFS_LIST_HEAD    (zombie_txs);
133
134         if (peer == NULL) /* nothing to tear */
135                 return;
136
137         pthread_mutex_lock(&peer->up_lock);
138         pthread_mutex_lock(&conn->uc_lock);
139
140         ni = peer->up_ni;
141         id = peer->up_peerid;
142
143         if (peer->up_conns[idx] == conn) {
144                 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
145                         /* change state not to finalize twice */
146                         conn->uc_rx_state = UC_RX_KSM_HEADER;
147                         /* stash lnetmsg while holding locks */
148                         rx_lnetmsg = conn->uc_rx_lnetmsg;
149                 }
150
151                 /* we cannot finilize txs right now (bug #18844) */
152                 cfs_list_splice_init(&conn->uc_tx_list, &zombie_txs);
153
154                 peer->up_conns[idx] = NULL;
155                 conn->uc_peer = NULL;
156                 decref_flag = 1;
157
158                 if(conn->uc_errored && !peer->up_errored)
159                         peer->up_errored = killall_flag = 1;
160
161                 /* prevent queueing new txs to this conn */
162                 conn->uc_errored = 1;
163         }
164
165         pthread_mutex_unlock(&conn->uc_lock);
166
167         if (killall_flag)
168                 usocklnd_del_conns_locked(peer);
169
170         pthread_mutex_unlock(&peer->up_lock);
171
172         if (!decref_flag)
173                 return;
174
175         if (rx_lnetmsg != NULL)
176                 lnet_finalize(ni, rx_lnetmsg, -EIO);
177         
178         usocklnd_destroy_txlist(ni, &zombie_txs);
179
180         usocklnd_conn_decref(conn);
181         usocklnd_peer_decref(peer);
182
183         usocklnd_check_peer_stale(ni, id);
184 }
185
186 /* Remove peer from hash list if all up_conns[i] is NULL &&
187  * hash table is the only consumer of the peer */
188 void
189 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
190 {
191         usock_peer_t *peer;
192
193         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
194         peer = usocklnd_find_peer_locked(ni, id);
195
196         if (peer == NULL) {
197                 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
198                 return;
199         }
200
201         if (cfs_mt_atomic_read(&peer->up_refcount) == 2) {
202                 int i;
203                 for (i = 0; i < N_CONN_TYPES; i++)
204                         LASSERT (peer->up_conns[i] == NULL);
205
206                 cfs_list_del(&peer->up_list);
207
208                 if (peer->up_errored &&
209                     (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
210                         lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
211                                      cfs_time_seconds(peer->up_last_alive));
212
213                 usocklnd_peer_decref(peer);
214         }
215
216         usocklnd_peer_decref(peer);
217         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
218 }
219
220 /* Returns 0 on success, <0 else */
221 int
222 usocklnd_create_passive_conn(lnet_ni_t *ni,
223                              cfs_socket_t *sock, usock_conn_t **connp)
224 {
225         int           rc;
226         __u32         peer_ip;
227         int           peer_port;
228         usock_conn_t *conn;
229
230         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
231         if (rc)
232                 return rc;
233
234         LASSERT (peer_port >= 0); /* uc_peer_port is u16 */
235
236         rc = usocklnd_set_sock_options(sock);
237         if (rc)
238                 return rc;
239
240         conn = usocklnd_conn_allocate();
241         if (conn == NULL)
242                 return -ENOMEM;
243
244         usocklnd_rx_hellomagic_state_transition(conn);
245
246         conn->uc_sock = sock;
247         conn->uc_peer_ip = peer_ip;
248         conn->uc_peer_port = peer_port;
249         conn->uc_state = UC_RECEIVING_HELLO;
250         conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
251         conn->uc_ni = ni;
252         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
253         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
254         pthread_mutex_init(&conn->uc_lock, NULL);
255         cfs_mt_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
256
257         *connp = conn;
258         return 0;
259 }
260
261 /* Returns 0 on success, <0 else */
262 int
263 usocklnd_create_active_conn(usock_peer_t *peer, int type,
264                             usock_conn_t **connp)
265 {
266         int           rc;
267         cfs_socket_t *sock;
268         usock_conn_t *conn;
269         __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
270         __u16         dst_port = lnet_acceptor_port();
271
272         conn = usocklnd_conn_allocate();
273         if (conn == NULL)
274                 return -ENOMEM;
275
276         conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
277                                                         peer->up_peerid.nid);
278         if (conn->uc_tx_hello == NULL) {
279                 usocklnd_conn_free(conn);
280                 return -ENOMEM;
281         }
282
283         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
284                 rc = usocklnd_connect_cli_mode(&sock, dst_ip, dst_port);
285         else
286                 rc = usocklnd_connect_srv_mode(&sock, dst_ip, dst_port);
287
288         if (rc) {
289                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
290                 usocklnd_conn_free(conn);
291                 return rc;
292         }
293
294         conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
295         conn->uc_tx_flag     = 1;
296
297         conn->uc_sock       = sock;
298         conn->uc_peer_ip    = dst_ip;
299         conn->uc_peer_port  = dst_port;
300         conn->uc_type       = type;
301         conn->uc_activeflag = 1;
302         conn->uc_state      = UC_CONNECTING;
303         conn->uc_pt_idx     = usocklnd_ip2pt_idx(dst_ip);
304         conn->uc_ni         = NULL;
305         conn->uc_peerid     = peer->up_peerid;
306         conn->uc_peer       = peer;
307
308         usocklnd_peer_addref(peer);
309         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
310         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
311         pthread_mutex_init(&conn->uc_lock, NULL);
312         cfs_mt_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
313
314         *connp = conn;
315         return 0;
316 }
317
318 /* Returns 0 on success, <0 else */
319 int
320 usocklnd_connect_srv_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
321 {
322         __u16         port;
323         cfs_socket_t *sock;
324         int           rc;
325         int           fatal;
326
327         for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
328              port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
329              port--) {
330                 /* Iterate through reserved ports. */
331                 rc = libcfs_sock_create(&sock, &fatal, 0, port);
332                 if (rc) {
333                         if (fatal)
334                                 return rc;
335                         continue;
336                 }
337
338                 rc = usocklnd_set_sock_options(sock);
339                 if (rc) {
340                         libcfs_sock_release(sock);
341                         return rc;
342                 }
343
344                 rc = libcfs_sock_connect(sock, dst_ip, dst_port);
345                 if (rc == 0) {
346                         *sockp = sock;
347                         return 0;
348                 }
349
350                 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
351                         libcfs_sock_release(sock);
352                         return rc;
353                 }
354
355                 libcfs_sock_release(sock);
356         }
357
358         CERROR("Can't bind to any reserved port\n");
359         return rc;
360 }
361
362 /* Returns 0 on success, <0 else */
363 int
364 usocklnd_connect_cli_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
365 {
366         cfs_socket_t *sock;
367         int           rc;
368         int           fatal;
369
370         rc = libcfs_sock_create(&sock, &fatal, 0, 0);
371         if (rc)
372                 return rc;
373
374         rc = usocklnd_set_sock_options(sock);
375         if (rc) {
376                 libcfs_sock_release(sock);
377                 return rc;
378         }
379
380         rc = libcfs_sock_connect(sock, dst_ip, dst_port);
381         if (rc) {
382                 libcfs_sock_release(sock);
383                 return rc;
384         }
385
386         *sockp = sock;
387         return 0;
388 }
389
390 int
391 usocklnd_set_sock_options(cfs_socket_t *sock)
392 {
393         int rc;
394
395         rc = libcfs_sock_set_nagle(sock, usock_tuns.ut_socknagle);
396         if (rc)
397                 return rc;
398
399         if (usock_tuns.ut_sockbufsiz) {
400                 rc = libcfs_sock_set_bufsiz(sock, usock_tuns.ut_sockbufsiz);
401                 if (rc)
402                         return rc;
403         }
404
405         return libcfs_fcntl_nonblock(sock);
406 }
407
408 usock_tx_t *
409 usocklnd_create_noop_tx(__u64 cookie)
410 {
411         usock_tx_t *tx;
412
413         LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
414         if (tx == NULL)
415                 return NULL;
416
417         tx->tx_size = sizeof(usock_tx_t);
418         tx->tx_lnetmsg = NULL;
419
420         socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
421         tx->tx_msg.ksm_zc_cookies[1] = cookie;
422
423         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
424         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
425                 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
426         tx->tx_iov = tx->tx_iova;
427         tx->tx_niov = 1;
428
429         return tx;
430 }
431
432 usock_tx_t *
433 usocklnd_create_tx(lnet_msg_t *lntmsg)
434 {
435         usock_tx_t   *tx;
436         unsigned int  payload_niov = lntmsg->msg_niov;
437         struct iovec *payload_iov = lntmsg->msg_iov;
438         unsigned int  payload_offset = lntmsg->msg_offset;
439         unsigned int  payload_nob = lntmsg->msg_len;
440         int           size = offsetof(usock_tx_t,
441                                       tx_iova[1 + payload_niov]);
442
443         LIBCFS_ALLOC (tx, size);
444         if (tx == NULL)
445                 return NULL;
446
447         tx->tx_size = size;
448         tx->tx_lnetmsg = lntmsg;
449
450         tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;
451
452         socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
453         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
454         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
455         tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
456         tx->tx_iov = tx->tx_iova;
457
458         tx->tx_niov = 1 +
459                 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
460                                  payload_niov, payload_iov,
461                                  payload_offset, payload_nob);
462
463         return tx;
464 }
465
466 void
467 usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
468                         lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
469 {
470         usock_net_t *net = (usock_net_t *)ni->ni_data;
471
472         hello->kshm_magic       = LNET_PROTO_MAGIC;
473         hello->kshm_version     = KSOCK_PROTO_V2;
474         hello->kshm_nips        = 0;
475         hello->kshm_ctype       = type;
476
477         hello->kshm_dst_incarnation = 0; /* not used */
478         hello->kshm_src_incarnation = net->un_incarnation;
479
480         hello->kshm_src_pid = the_lnet.ln_pid;
481         hello->kshm_src_nid = ni->ni_nid;
482         hello->kshm_dst_nid = peer_nid;
483         hello->kshm_dst_pid = 0; /* not used */
484 }
485
486 usock_tx_t *
487 usocklnd_create_hello_tx(lnet_ni_t *ni,
488                          int type, lnet_nid_t peer_nid)
489 {
490         usock_tx_t        *tx;
491         int                size;
492         ksock_hello_msg_t *hello;
493
494         size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
495         LIBCFS_ALLOC (tx, size);
496         if (tx == NULL)
497                 return NULL;
498
499         tx->tx_size = size;
500         tx->tx_lnetmsg = NULL;
501
502         hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
503         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
504
505         tx->tx_iova[0].iov_base = (void *)hello;
506         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
507                 offsetof(ksock_hello_msg_t, kshm_ips);
508         tx->tx_iov = tx->tx_iova;
509         tx->tx_niov = 1;
510
511         return tx;
512 }
513
514 usock_tx_t *
515 usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
516                             int type, lnet_nid_t peer_nid)
517 {
518         usock_tx_t              *tx;
519         int                      size;
520         lnet_acceptor_connreq_t *cr;
521         ksock_hello_msg_t       *hello;
522
523         size = sizeof(usock_tx_t) +
524                 sizeof(lnet_acceptor_connreq_t) +
525                 offsetof(ksock_hello_msg_t, kshm_ips);
526         LIBCFS_ALLOC (tx, size);
527         if (tx == NULL)
528                 return NULL;
529
530         tx->tx_size = size;
531         tx->tx_lnetmsg = NULL;
532
533         cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
534         memset(cr, 0, sizeof(*cr));
535         cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
536         cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
537         cr->acr_nid     = peer_nid;
538
539         hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
540         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
541
542         tx->tx_iova[0].iov_base = (void *)cr;
543         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
544                 sizeof(lnet_acceptor_connreq_t) +
545                 offsetof(ksock_hello_msg_t, kshm_ips);
546         tx->tx_iov = tx->tx_iova;
547         tx->tx_niov = 1;
548
549         return tx;
550 }
551
552 void
553 usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
554 {
555         lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;
556         int          rc = (tx->tx_resid == 0) ? 0 : -EIO;
557
558         LASSERT (ni != NULL || lnetmsg == NULL);
559
560         LIBCFS_FREE (tx, tx->tx_size);
561
562         if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
563                 lnet_finalize(ni, lnetmsg, rc);
564 }
565
566 void
567 usocklnd_destroy_txlist(lnet_ni_t *ni, cfs_list_t *txlist)
568 {
569         usock_tx_t *tx;
570
571         while (!cfs_list_empty(txlist)) {
572                 tx = cfs_list_entry(txlist->next, usock_tx_t, tx_list);
573                 cfs_list_del(&tx->tx_list);
574
575                 usocklnd_destroy_tx(ni, tx);
576         }
577 }
578
579 void
580 usocklnd_destroy_zcack_list(cfs_list_t *zcack_list)
581 {
582         usock_zc_ack_t *zcack;
583
584         while (!cfs_list_empty(zcack_list)) {
585                 zcack = cfs_list_entry(zcack_list->next, usock_zc_ack_t,
586                                        zc_list);
587                 cfs_list_del(&zcack->zc_list);
588
589                 LIBCFS_FREE (zcack, sizeof(*zcack));
590         }
591 }
592
593 void
594 usocklnd_destroy_peer(usock_peer_t *peer)
595 {
596         usock_net_t *net = peer->up_ni->ni_data;
597         int          i;
598
599         for (i = 0; i < N_CONN_TYPES; i++)
600                 LASSERT (peer->up_conns[i] == NULL);
601
602         LIBCFS_FREE (peer, sizeof (*peer));
603
604         pthread_mutex_lock(&net->un_lock);
605         if(--net->un_peercount == 0)
606                 pthread_cond_signal(&net->un_cond);
607         pthread_mutex_unlock(&net->un_lock);
608 }
609
610 void
611 usocklnd_destroy_conn(usock_conn_t *conn)
612 {
613         LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
614
615         if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
616                 LASSERT (conn->uc_peer != NULL);
617                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
618         }
619
620         if (!cfs_list_empty(&conn->uc_tx_list)) {
621                 LASSERT (conn->uc_peer != NULL);
622                 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
623         }
624
625         usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
626
627         if (conn->uc_peer != NULL)
628                 usocklnd_peer_decref(conn->uc_peer);
629
630         if (conn->uc_ni != NULL)
631                 lnet_ni_decref(conn->uc_ni);
632
633         if (conn->uc_tx_hello)
634                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
635
636         usocklnd_conn_free(conn);
637 }
638
639 int
640 usocklnd_get_conn_type(lnet_msg_t *lntmsg)
641 {
642         int nob;
643
644         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
645                 return SOCKLND_CONN_ANY;
646
647         nob = sizeof(ksock_msg_t) + lntmsg->msg_len;
648
649         if (nob >= usock_tuns.ut_min_bulk)
650                 return SOCKLND_CONN_BULK_OUT;
651         else
652                 return SOCKLND_CONN_CONTROL;
653 }
654
655 int usocklnd_type2idx(int type)
656 {
657         switch (type) {
658         case SOCKLND_CONN_ANY:
659         case SOCKLND_CONN_CONTROL:
660                 return 0;
661         case SOCKLND_CONN_BULK_IN:
662                 return 1;
663         case SOCKLND_CONN_BULK_OUT:
664                 return 2;
665         default:
666                 LBUG();
667         }
668 }
669
670 usock_peer_t *
671 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
672 {
673         cfs_list_t       *peer_list = usocklnd_nid2peerlist(id.nid);
674         cfs_list_t       *tmp;
675         usock_peer_t     *peer;
676
677         cfs_list_for_each (tmp, peer_list) {
678
679                 peer = cfs_list_entry (tmp, usock_peer_t, up_list);
680
681                 if (peer->up_ni != ni)
682                         continue;
683
684                 if (peer->up_peerid.nid != id.nid ||
685                     peer->up_peerid.pid != id.pid)
686                         continue;
687
688                 usocklnd_peer_addref(peer);
689                 return peer;
690         }
691         return (NULL);
692 }
693
694 int
695 usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
696                      usock_peer_t **peerp)
697 {
698         usock_net_t  *net = ni->ni_data;
699         usock_peer_t *peer;
700         int           i;
701
702         LIBCFS_ALLOC (peer, sizeof (*peer));
703         if (peer == NULL)
704                 return -ENOMEM;
705
706         for (i = 0; i < N_CONN_TYPES; i++)
707                 peer->up_conns[i] = NULL;
708
709         peer->up_peerid       = id;
710         peer->up_ni           = ni;
711         peer->up_incrn_is_set = 0;
712         peer->up_errored      = 0;
713         peer->up_last_alive   = 0;
714         cfs_mt_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
715         pthread_mutex_init(&peer->up_lock, NULL);
716
717         pthread_mutex_lock(&net->un_lock);
718         net->un_peercount++;
719         pthread_mutex_unlock(&net->un_lock);
720
721         *peerp = peer;
722         return 0;
723 }
724
725 /* Safely create new peer if needed. Save result in *peerp.
726  * Returns 0 on success, <0 else */
727 int
728 usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
729                              usock_peer_t **peerp)
730 {
731         int           rc;
732         usock_peer_t *peer;
733         usock_peer_t *peer2;
734         usock_net_t  *net = ni->ni_data;
735
736         pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
737         peer = usocklnd_find_peer_locked(ni, id);
738         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
739
740         if (peer != NULL)
741                 goto find_or_create_peer_done;
742
743         rc = usocklnd_create_peer(ni, id, &peer);
744         if (rc)
745                 return rc;
746
747         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
748         peer2 = usocklnd_find_peer_locked(ni, id);
749         if (peer2 == NULL) {
750                 if (net->un_shutdown) {
751                         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
752                         usocklnd_peer_decref(peer); /* should destroy peer */
753                         CERROR("Can't create peer: network shutdown\n");
754                         return -ESHUTDOWN;
755                 }
756
757                 /* peer table will take 1 of my refs on peer */
758                 usocklnd_peer_addref(peer);
759                 cfs_list_add_tail (&peer->up_list,
760                                    usocklnd_nid2peerlist(id.nid));
761         } else {
762                 usocklnd_peer_decref(peer); /* should destroy peer */
763                 peer = peer2;
764         }
765         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
766
767   find_or_create_peer_done:
768         *peerp = peer;
769         return 0;
770 }
771
772 /* NB: both peer and conn locks are held */
773 static int
774 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
775 {
776         if (conn->uc_state == UC_READY &&
777             cfs_list_empty(&conn->uc_tx_list) &&
778             cfs_list_empty(&conn->uc_zcack_list) &&
779             !conn->uc_sending) {
780                 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
781                                                   POLLOUT);
782                 if (rc != 0)
783                         return rc;
784         }
785
786         cfs_list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
787         return 0;
788 }
789
790 /* NB: both peer and conn locks are held
791  * NB: if sending isn't in progress.  the caller *MUST* send tx
792  * immediately after we'll return */
793 static void
794 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
795                     int *send_immediately)
796 {
797         if (conn->uc_state == UC_READY &&
798             cfs_list_empty(&conn->uc_tx_list) &&
799             cfs_list_empty(&conn->uc_zcack_list) &&
800             !conn->uc_sending) {
801                 conn->uc_sending = 1;
802                 *send_immediately = 1;
803                 return;
804         }
805
806         *send_immediately = 0;
807         cfs_list_add_tail(&tx->tx_list, &conn->uc_tx_list);
808 }
809
810 /* Safely create new conn if needed. Save result in *connp.
811  * Returns 0 on success, <0 else */
812 int
813 usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
814                              usock_conn_t **connp,
815                              usock_tx_t *tx, usock_zc_ack_t *zc_ack,
816                              int *send_immediately)
817 {
818         usock_conn_t *conn;
819         int           idx;
820         int           rc;
821         lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
822
823         if (userflag)
824                 type = SOCKLND_CONN_ANY;
825
826         idx = usocklnd_type2idx(type);
827
828         pthread_mutex_lock(&peer->up_lock);
829         if (peer->up_conns[idx] != NULL) {
830                 conn = peer->up_conns[idx];
831                 LASSERT(conn->uc_type == type);
832         } else {
833                 if (userflag) {
834                         CERROR("Refusing to create a connection to "
835                                "userspace process %s\n",
836                                libcfs_id2str(peer->up_peerid));
837                         rc = -EHOSTUNREACH;
838                         goto find_or_create_conn_failed;
839                 }
840
841                 rc = usocklnd_create_active_conn(peer, type, &conn);
842                 if (rc) {
843                         peer->up_errored = 1;
844                         usocklnd_del_conns_locked(peer);
845                         goto find_or_create_conn_failed;
846                 }
847
848                 /* peer takes 1 of conn refcount */
849                 usocklnd_link_conn_to_peer(conn, peer, idx);
850
851                 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
852                 if (rc) {
853                         peer->up_conns[idx] = NULL;
854                         usocklnd_conn_decref(conn); /* should destroy conn */
855                         goto find_or_create_conn_failed;
856                 }
857                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
858         }
859
860         pthread_mutex_lock(&conn->uc_lock);
861         LASSERT(conn->uc_peer == peer);
862
863         LASSERT(tx == NULL || zc_ack == NULL);
864         if (tx != NULL) {
865                 /* usocklnd_tear_peer_conn() could signal us stop queueing */
866                 if (conn->uc_errored) {
867                         rc = -EIO;
868                         pthread_mutex_unlock(&conn->uc_lock);
869                         goto find_or_create_conn_failed;
870                 }
871
872                 usocklnd_enqueue_tx(conn, tx, send_immediately);
873         } else {
874                 rc = usocklnd_enqueue_zcack(conn, zc_ack);
875                 if (rc != 0) {
876                         usocklnd_conn_kill_locked(conn);
877                         pthread_mutex_unlock(&conn->uc_lock);
878                         goto find_or_create_conn_failed;
879                 }
880         }
881         pthread_mutex_unlock(&conn->uc_lock);
882
883         usocklnd_conn_addref(conn);
884         pthread_mutex_unlock(&peer->up_lock);
885
886         *connp = conn;
887         return 0;
888
889   find_or_create_conn_failed:
890         pthread_mutex_unlock(&peer->up_lock);
891         return rc;
892 }
893
894 void
895 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
896 {
897         peer->up_conns[idx] = conn;
898         peer->up_errored    = 0; /* this new fresh conn will try
899                                   * revitalize even stale errored peer */
900 }
901
902 int
903 usocklnd_invert_type(int type)
904 {
905         switch (type)
906         {
907         case SOCKLND_CONN_ANY:
908         case SOCKLND_CONN_CONTROL:
909                 return (type);
910         case SOCKLND_CONN_BULK_IN:
911                 return SOCKLND_CONN_BULK_OUT;
912         case SOCKLND_CONN_BULK_OUT:
913                 return SOCKLND_CONN_BULK_IN;
914         default:
915                 return SOCKLND_CONN_NONE;
916         }
917 }
918
919 void
920 usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
921 {
922         pthread_mutex_lock(&conn->uc_lock);
923         if (conn->uc_state != UC_DEAD)
924                 conn->uc_state = new_state;
925         pthread_mutex_unlock(&conn->uc_lock);
926 }
927
928 /* NB: peer is locked by caller */
929 void
930 usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
931                              usock_conn_t *skip_conn)
932 {
933         int i;
934
935         if (!peer->up_incrn_is_set) {
936                 peer->up_incarnation = incrn;
937                 peer->up_incrn_is_set = 1;
938                 return;
939         }
940
941         if (peer->up_incarnation == incrn)
942                 return;
943
944         peer->up_incarnation = incrn;
945
946         for (i = 0; i < N_CONN_TYPES; i++) {
947                 usock_conn_t *conn = peer->up_conns[i];
948
949                 if (conn == NULL || conn == skip_conn)
950                         continue;
951
952                 pthread_mutex_lock(&conn->uc_lock);
953                 LASSERT (conn->uc_peer == peer);
954                 conn->uc_peer = NULL;
955                 peer->up_conns[i] = NULL;
956                 if (conn->uc_state != UC_DEAD)
957                         usocklnd_conn_kill_locked(conn);
958                 pthread_mutex_unlock(&conn->uc_lock);
959
960                 usocklnd_conn_decref(conn);
961                 usocklnd_peer_decref(peer);
962         }
963 }
964
965 /* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
966  * MAGIC part of hello and set uc_rx_state
967  */
968 void
969 usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
970 {
971         LASSERT(conn->uc_rx_hello != NULL);
972
973         conn->uc_rx_niov = 1;
974         conn->uc_rx_iov = conn->uc_rx_iova;
975         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
976         conn->uc_rx_iov[0].iov_len =
977                 conn->uc_rx_nob_wanted =
978                 conn->uc_rx_nob_left =
979                 sizeof(conn->uc_rx_hello->kshm_magic);
980
981         conn->uc_rx_state = UC_RX_HELLO_MAGIC;
982
983         conn->uc_rx_flag = 1; /* waiting for incoming hello */
984         conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
985 }
986
987 /* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
988  * VERSION part of hello and set uc_rx_state
989  */
990 void
991 usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
992 {
993         LASSERT(conn->uc_rx_hello != NULL);
994
995         conn->uc_rx_niov = 1;
996         conn->uc_rx_iov = conn->uc_rx_iova;
997         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
998         conn->uc_rx_iov[0].iov_len =
999                 conn->uc_rx_nob_wanted =
1000                 conn->uc_rx_nob_left =
1001                 sizeof(conn->uc_rx_hello->kshm_version);
1002
1003         conn->uc_rx_state = UC_RX_HELLO_VERSION;
1004 }
1005
1006 /* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
1007  * the rest  of hello and set uc_rx_state
1008  */
1009 void
1010 usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
1011 {
1012         LASSERT(conn->uc_rx_hello != NULL);
1013
1014         conn->uc_rx_niov = 1;
1015         conn->uc_rx_iov = conn->uc_rx_iova;
1016         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
1017         conn->uc_rx_iov[0].iov_len =
1018                 conn->uc_rx_nob_wanted =
1019                 conn->uc_rx_nob_left =
1020                 offsetof(ksock_hello_msg_t, kshm_ips) -
1021                 offsetof(ksock_hello_msg_t, kshm_src_nid);
1022
1023         conn->uc_rx_state = UC_RX_HELLO_BODY;
1024 }
1025
1026 /* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
1027  * array of IPs and set uc_rx_state
1028  */
1029 void
1030 usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
1031 {
1032         LASSERT(conn->uc_rx_hello != NULL);
1033
1034         conn->uc_rx_niov = 1;
1035         conn->uc_rx_iov = conn->uc_rx_iova;
1036         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
1037         conn->uc_rx_iov[0].iov_len =
1038                 conn->uc_rx_nob_wanted =
1039                 conn->uc_rx_nob_left =
1040                 conn->uc_rx_hello->kshm_nips *
1041                 sizeof(conn->uc_rx_hello->kshm_ips[0]);
1042
1043         conn->uc_rx_state = UC_RX_HELLO_IPS;
1044 }
1045
1046 /* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
1047  * LNET header and set uc_rx_state
1048  */
1049 void
1050 usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
1051 {
1052         conn->uc_rx_niov = 1;
1053         conn->uc_rx_iov = conn->uc_rx_iova;
1054         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
1055         conn->uc_rx_iov[0].iov_len =
1056                 conn->uc_rx_nob_wanted =
1057                 conn->uc_rx_nob_left =
1058                 sizeof(ksock_lnet_msg_t);
1059
1060         conn->uc_rx_state = UC_RX_LNET_HEADER;
1061         conn->uc_rx_flag = 1;
1062 }
1063
1064 /* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
1065  * KSM header and set uc_rx_state
1066  */
1067 void
1068 usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
1069 {
1070         conn->uc_rx_niov = 1;
1071         conn->uc_rx_iov = conn->uc_rx_iova;
1072         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
1073         conn->uc_rx_iov[0].iov_len =
1074                 conn->uc_rx_nob_wanted =
1075                 conn->uc_rx_nob_left =
1076                 offsetof(ksock_msg_t, ksm_u);
1077
1078         conn->uc_rx_state = UC_RX_KSM_HEADER;
1079         conn->uc_rx_flag = 0;
1080 }
1081
1082 /* RX state transition to UC_RX_SKIPPING: update RX part for
1083  * skipping and set uc_rx_state
1084  */
1085 void
1086 usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
1087 {
1088         static char    skip_buffer[4096];
1089
1090         int            nob;
1091         unsigned int   niov = 0;
1092         int            skipped = 0;
1093         int            nob_to_skip = conn->uc_rx_nob_left;
1094
1095         LASSERT(nob_to_skip != 0);
1096
1097         conn->uc_rx_iov = conn->uc_rx_iova;
1098
1099         /* Set up to skip as much as possible now.  If there's more left
1100          * (ran out of iov entries) we'll get called again */
1101
1102         do {
1103                 nob = MIN (nob_to_skip, sizeof(skip_buffer));
1104
1105                 conn->uc_rx_iov[niov].iov_base = skip_buffer;
1106                 conn->uc_rx_iov[niov].iov_len  = nob;
1107                 niov++;
1108                 skipped += nob;
1109                 nob_to_skip -=nob;
1110
1111         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1112                  niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));
1113
1114         conn->uc_rx_niov = niov;
1115         conn->uc_rx_nob_wanted = skipped;
1116
1117         conn->uc_rx_state = UC_RX_SKIPPING;
1118 }