Whamcloud - gitweb
Introduce .gitignore files.
[fs/lustre-release.git] / lnet / ulnds / socklnd / conn.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/socklnd/conn.c
37  *
38  * Author: Maxim Patlasov <maxim@clusterfs.com>
39  */
40
41 #include "usocklnd.h"
42
43 /* Return 1 if the conn is timed out, 0 else */
44 int
45 usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
46 {
47         if (conn->uc_tx_flag && /* sending is in progress */
48             cfs_time_aftereq(current_time, conn->uc_tx_deadline))
49                 return 1;
50
51         if (conn->uc_rx_flag && /* receiving is in progress */
52             cfs_time_aftereq(current_time, conn->uc_rx_deadline))
53                 return 1;
54
55         return 0;
56 }
57
58 void
59 usocklnd_conn_kill(usock_conn_t *conn)
60 {
61         pthread_mutex_lock(&conn->uc_lock);
62         if (conn->uc_state != UC_DEAD)
63                 usocklnd_conn_kill_locked(conn);
64         pthread_mutex_unlock(&conn->uc_lock);
65 }
66
67 /* Mark the conn as DEAD and schedule its deletion */
68 void
69 usocklnd_conn_kill_locked(usock_conn_t *conn)
70 {
71         conn->uc_rx_flag = conn->uc_tx_flag = 0;
72         conn->uc_state = UC_DEAD;
73         usocklnd_add_killrequest(conn);
74 }
75
76 usock_conn_t *
77 usocklnd_conn_allocate()
78 {
79         usock_conn_t        *conn;
80         usock_pollrequest_t *pr;
81
82         LIBCFS_ALLOC (pr, sizeof(*pr));
83         if (pr == NULL)
84                 return NULL;
85
86         LIBCFS_ALLOC (conn, sizeof(*conn));
87         if (conn == NULL) {
88                 LIBCFS_FREE (pr, sizeof(*pr));
89                 return NULL;
90         }
91         memset(conn, 0, sizeof(*conn));
92         conn->uc_preq = pr;
93
94         LIBCFS_ALLOC (conn->uc_rx_hello,
95                       offsetof(ksock_hello_msg_t,
96                                kshm_ips[LNET_MAX_INTERFACES]));
97         if (conn->uc_rx_hello == NULL) {
98                 LIBCFS_FREE (pr, sizeof(*pr));
99                 LIBCFS_FREE (conn, sizeof(*conn));
100                 return NULL;
101         }
102
103         return conn;
104 }
105
106 void
107 usocklnd_conn_free(usock_conn_t *conn)
108 {
109         usock_pollrequest_t *pr = conn->uc_preq;
110
111         if (pr != NULL)
112                 LIBCFS_FREE (pr, sizeof(*pr));
113
114         if (conn->uc_rx_hello != NULL)
115                 LIBCFS_FREE (conn->uc_rx_hello,
116                              offsetof(ksock_hello_msg_t,
117                                       kshm_ips[LNET_MAX_INTERFACES]));
118
119         LIBCFS_FREE (conn, sizeof(*conn));
120 }
121
122 void
123 usocklnd_tear_peer_conn(usock_conn_t *conn)
124 {
125         usock_peer_t     *peer = conn->uc_peer;
126         int               idx = usocklnd_type2idx(conn->uc_type);
127         lnet_ni_t        *ni;
128         lnet_process_id_t id;
129         int               decref_flag  = 0;
130         int               killall_flag = 0;
131         void             *rx_lnetmsg   = NULL; 
132         CFS_LIST_HEAD    (zombie_txs);
133
134         if (peer == NULL) /* nothing to tear */
135                 return;
136
137         pthread_mutex_lock(&peer->up_lock);
138         pthread_mutex_lock(&conn->uc_lock);
139
140         ni = peer->up_ni;
141         id = peer->up_peerid;
142
143         if (peer->up_conns[idx] == conn) {
144                 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
145                         /* change state not to finalize twice */
146                         conn->uc_rx_state = UC_RX_KSM_HEADER;
147                         /* stash lnetmsg while holding locks */
148                         rx_lnetmsg = conn->uc_rx_lnetmsg;
149                 }
150
151                 /* we cannot finilize txs right now (bug #18844) */
152                 list_splice_init(&conn->uc_tx_list, &zombie_txs);
153
154                 peer->up_conns[idx] = NULL;
155                 conn->uc_peer = NULL;
156                 decref_flag = 1;
157
158                 if(conn->uc_errored && !peer->up_errored)
159                         peer->up_errored = killall_flag = 1;
160
161                 /* prevent queueing new txs to this conn */
162                 conn->uc_errored = 1;
163         }
164
165         pthread_mutex_unlock(&conn->uc_lock);
166
167         if (killall_flag)
168                 usocklnd_del_conns_locked(peer);
169
170         pthread_mutex_unlock(&peer->up_lock);
171
172         if (!decref_flag)
173                 return;
174
175         if (rx_lnetmsg != NULL)
176                 lnet_finalize(ni, rx_lnetmsg, -EIO);
177         
178         usocklnd_destroy_txlist(ni, &zombie_txs);
179
180         usocklnd_conn_decref(conn);
181         usocklnd_peer_decref(peer);
182
183         usocklnd_check_peer_stale(ni, id);
184 }
185
186 /* Remove peer from hash list if all up_conns[i] is NULL &&
187  * hash table is the only consumer of the peer */
188 void
189 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
190 {
191         usock_peer_t *peer;
192
193         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
194         peer = usocklnd_find_peer_locked(ni, id);
195
196         if (peer == NULL) {
197                 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
198                 return;
199         }
200
201         if (cfs_atomic_read(&peer->up_refcount) == 2) {
202                 int i;
203                 for (i = 0; i < N_CONN_TYPES; i++)
204                         LASSERT (peer->up_conns[i] == NULL);
205
206                 list_del(&peer->up_list);
207
208                 if (peer->up_errored &&
209                     (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
210                         lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
211                                      cfs_time_seconds(peer->up_last_alive));
212
213                 usocklnd_peer_decref(peer);
214         }
215
216         usocklnd_peer_decref(peer);
217         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
218 }
219
220 /* Returns 0 on success, <0 else */
221 int
222 usocklnd_create_passive_conn(lnet_ni_t *ni,
223                              cfs_socket_t *sock, usock_conn_t **connp)
224 {
225         int           rc;
226         __u32         peer_ip;
227         int           peer_port;
228         usock_conn_t *conn;
229
230         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
231         if (rc)
232                 return rc;
233
234         LASSERT (peer_port >= 0); /* uc_peer_port is u16 */
235
236         rc = usocklnd_set_sock_options(sock);
237         if (rc)
238                 return rc;
239
240         conn = usocklnd_conn_allocate();
241         if (conn == NULL)
242                 return -ENOMEM;
243
244         usocklnd_rx_hellomagic_state_transition(conn);
245
246         conn->uc_sock = sock;
247         conn->uc_peer_ip = peer_ip;
248         conn->uc_peer_port = peer_port;
249         conn->uc_state = UC_RECEIVING_HELLO;
250         conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
251         conn->uc_ni = ni;
252         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
253         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
254         pthread_mutex_init(&conn->uc_lock, NULL);
255         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
256
257         *connp = conn;
258         return 0;
259 }
260
261 /* Returns 0 on success, <0 else */
262 int
263 usocklnd_create_active_conn(usock_peer_t *peer, int type,
264                             usock_conn_t **connp)
265 {
266         int           rc;
267         cfs_socket_t *sock;
268         usock_conn_t *conn;
269         __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
270         __u16         dst_port = lnet_acceptor_port();
271
272         conn = usocklnd_conn_allocate();
273         if (conn == NULL)
274                 return -ENOMEM;
275
276         conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
277                                                         peer->up_peerid.nid);
278         if (conn->uc_tx_hello == NULL) {
279                 usocklnd_conn_free(conn);
280                 return -ENOMEM;
281         }
282
283         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
284                 rc = usocklnd_connect_cli_mode(&sock, dst_ip, dst_port);
285         else
286                 rc = usocklnd_connect_srv_mode(&sock, dst_ip, dst_port);
287
288         if (rc) {
289                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
290                 usocklnd_conn_free(conn);
291                 return rc;
292         }
293
294         conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
295         conn->uc_tx_flag     = 1;
296
297         conn->uc_sock       = sock;
298         conn->uc_peer_ip    = dst_ip;
299         conn->uc_peer_port  = dst_port;
300         conn->uc_type       = type;
301         conn->uc_activeflag = 1;
302         conn->uc_state      = UC_CONNECTING;
303         conn->uc_pt_idx     = usocklnd_ip2pt_idx(dst_ip);
304         conn->uc_ni         = NULL;
305         conn->uc_peerid     = peer->up_peerid;
306         conn->uc_peer       = peer;
307
308         usocklnd_peer_addref(peer);
309         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
310         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
311         pthread_mutex_init(&conn->uc_lock, NULL);
312         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
313
314         *connp = conn;
315         return 0;
316 }
317
318 /* Returns 0 on success, <0 else */
319 int
320 usocklnd_connect_srv_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
321 {
322         __u16         port;
323         cfs_socket_t *sock;
324         int           rc;
325         int           fatal;
326
327         for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
328              port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
329              port--) {
330                 /* Iterate through reserved ports. */
331                 rc = libcfs_sock_create(&sock, &fatal, 0, port);
332                 if (rc) {
333                         if (fatal)
334                                 return rc;
335                         continue;
336                 }
337
338                 rc = usocklnd_set_sock_options(sock);
339                 if (rc) {
340                         libcfs_sock_release(sock);
341                         return rc;
342                 }
343
344                 rc = libcfs_sock_connect(sock, dst_ip, dst_port);
345                 if (rc == 0) {
346                         *sockp = sock;
347                         return 0;
348                 }
349
350                 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
351                         libcfs_sock_release(sock);
352                         return rc;
353                 }
354
355                 libcfs_sock_release(sock);
356         }
357
358         CERROR("Can't bind to any reserved port\n");
359         return rc;
360 }
361
362 /* Returns 0 on success, <0 else */
363 int
364 usocklnd_connect_cli_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
365 {
366         cfs_socket_t *sock;
367         int           rc;
368         int           fatal;
369
370         rc = libcfs_sock_create(&sock, &fatal, 0, 0);
371         if (rc)
372                 return rc;
373
374         rc = usocklnd_set_sock_options(sock);
375         if (rc) {
376                 libcfs_sock_release(sock);
377                 return rc;
378         }
379
380         rc = libcfs_sock_connect(sock, dst_ip, dst_port);
381         if (rc) {
382                 libcfs_sock_release(sock);
383                 return rc;
384         }
385
386         *sockp = sock;
387         return 0;
388 }
389
390 int
391 usocklnd_set_sock_options(cfs_socket_t *sock)
392 {
393         int rc;
394
395         rc = libcfs_sock_set_nagle(sock, usock_tuns.ut_socknagle);
396         if (rc)
397                 return rc;
398
399         if (usock_tuns.ut_sockbufsiz) {
400                 rc = libcfs_sock_set_bufsiz(sock, usock_tuns.ut_sockbufsiz);
401                 if (rc)
402                         return rc;
403         }
404
405         return libcfs_fcntl_nonblock(sock);
406 }
407
408 usock_tx_t *
409 usocklnd_create_noop_tx(__u64 cookie)
410 {
411         usock_tx_t *tx;
412
413         LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
414         if (tx == NULL)
415                 return NULL;
416
417         tx->tx_size = sizeof(usock_tx_t);
418         tx->tx_lnetmsg = NULL;
419
420         socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
421         tx->tx_msg.ksm_zc_cookies[1] = cookie;
422
423         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
424         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
425                 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
426         tx->tx_iov = tx->tx_iova;
427         tx->tx_niov = 1;
428
429         return tx;
430 }
431
432 usock_tx_t *
433 usocklnd_create_tx(lnet_msg_t *lntmsg)
434 {
435         usock_tx_t   *tx;
436         unsigned int  payload_niov = lntmsg->msg_niov;
437         struct iovec *payload_iov = lntmsg->msg_iov;
438         unsigned int  payload_offset = lntmsg->msg_offset;
439         unsigned int  payload_nob = lntmsg->msg_len;
440         int           size = offsetof(usock_tx_t,
441                                       tx_iova[1 + payload_niov]);
442
443         LIBCFS_ALLOC (tx, size);
444         if (tx == NULL)
445                 return NULL;
446
447         tx->tx_size = size;
448         tx->tx_lnetmsg = lntmsg;
449
450         tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;
451
452         socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
453         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
454         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
455         tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
456         tx->tx_iov = tx->tx_iova;
457
458         tx->tx_niov = 1 +
459                 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
460                                  payload_niov, payload_iov,
461                                  payload_offset, payload_nob);
462
463         return tx;
464 }
465
466 void
467 usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
468                         lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
469 {
470         usock_net_t *net = (usock_net_t *)ni->ni_data;
471
472         hello->kshm_magic       = LNET_PROTO_MAGIC;
473         hello->kshm_version     = KSOCK_PROTO_V2;
474         hello->kshm_nips        = 0;
475         hello->kshm_ctype       = type;
476
477         hello->kshm_dst_incarnation = 0; /* not used */
478         hello->kshm_src_incarnation = net->un_incarnation;
479
480         hello->kshm_src_pid = the_lnet.ln_pid;
481         hello->kshm_src_nid = ni->ni_nid;
482         hello->kshm_dst_nid = peer_nid;
483         hello->kshm_dst_pid = 0; /* not used */
484 }
485
486 usock_tx_t *
487 usocklnd_create_hello_tx(lnet_ni_t *ni,
488                          int type, lnet_nid_t peer_nid)
489 {
490         usock_tx_t        *tx;
491         int                size;
492         ksock_hello_msg_t *hello;
493
494         size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
495         LIBCFS_ALLOC (tx, size);
496         if (tx == NULL)
497                 return NULL;
498
499         tx->tx_size = size;
500         tx->tx_lnetmsg = NULL;
501
502         hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
503         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
504
505         tx->tx_iova[0].iov_base = (void *)hello;
506         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
507                 offsetof(ksock_hello_msg_t, kshm_ips);
508         tx->tx_iov = tx->tx_iova;
509         tx->tx_niov = 1;
510
511         return tx;
512 }
513
514 usock_tx_t *
515 usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
516                             int type, lnet_nid_t peer_nid)
517 {
518         usock_tx_t              *tx;
519         int                      size;
520         lnet_acceptor_connreq_t *cr;
521         ksock_hello_msg_t       *hello;
522
523         size = sizeof(usock_tx_t) +
524                 sizeof(lnet_acceptor_connreq_t) +
525                 offsetof(ksock_hello_msg_t, kshm_ips);
526         LIBCFS_ALLOC (tx, size);
527         if (tx == NULL)
528                 return NULL;
529
530         tx->tx_size = size;
531         tx->tx_lnetmsg = NULL;
532
533         cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
534         memset(cr, 0, sizeof(*cr));
535         cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
536         cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
537         cr->acr_nid     = peer_nid;
538
539         hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
540         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
541
542         tx->tx_iova[0].iov_base = (void *)cr;
543         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
544                 sizeof(lnet_acceptor_connreq_t) +
545                 offsetof(ksock_hello_msg_t, kshm_ips);
546         tx->tx_iov = tx->tx_iova;
547         tx->tx_niov = 1;
548
549         return tx;
550 }
551
552 void
553 usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
554 {
555         lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;
556         int          rc = (tx->tx_resid == 0) ? 0 : -EIO;
557
558         LASSERT (ni != NULL || lnetmsg == NULL);
559
560         LIBCFS_FREE (tx, tx->tx_size);
561
562         if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
563                 lnet_finalize(ni, lnetmsg, rc);
564 }
565
566 void
567 usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
568 {
569         usock_tx_t *tx;
570
571         while (!list_empty(txlist)) {
572                 tx = list_entry(txlist->next, usock_tx_t, tx_list);
573                 list_del(&tx->tx_list);
574
575                 usocklnd_destroy_tx(ni, tx);
576         }
577 }
578
579 void
580 usocklnd_destroy_zcack_list(struct list_head *zcack_list)
581 {
582         usock_zc_ack_t *zcack;
583
584         while (!list_empty(zcack_list)) {
585                 zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
586                 list_del(&zcack->zc_list);
587
588                 LIBCFS_FREE (zcack, sizeof(*zcack));
589         }
590 }
591
592 void
593 usocklnd_destroy_peer(usock_peer_t *peer)
594 {
595         usock_net_t *net = peer->up_ni->ni_data;
596         int          i;
597
598         for (i = 0; i < N_CONN_TYPES; i++)
599                 LASSERT (peer->up_conns[i] == NULL);
600
601         LIBCFS_FREE (peer, sizeof (*peer));
602
603         pthread_mutex_lock(&net->un_lock);
604         if(--net->un_peercount == 0)
605                 pthread_cond_signal(&net->un_cond);
606         pthread_mutex_unlock(&net->un_lock);
607 }
608
609 void
610 usocklnd_destroy_conn(usock_conn_t *conn)
611 {
612         LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
613
614         if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
615                 LASSERT (conn->uc_peer != NULL);
616                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
617         }
618
619         if (!list_empty(&conn->uc_tx_list)) {
620                 LASSERT (conn->uc_peer != NULL);
621                 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
622         }
623
624         usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
625
626         if (conn->uc_peer != NULL)
627                 usocklnd_peer_decref(conn->uc_peer);
628
629         if (conn->uc_ni != NULL)
630                 lnet_ni_decref(conn->uc_ni);
631
632         if (conn->uc_tx_hello)
633                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
634
635         usocklnd_conn_free(conn);
636 }
637
638 int
639 usocklnd_get_conn_type(lnet_msg_t *lntmsg)
640 {
641         int nob;
642
643         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
644                 return SOCKLND_CONN_ANY;
645
646         nob = sizeof(ksock_msg_t) + lntmsg->msg_len;
647
648         if (nob >= usock_tuns.ut_min_bulk)
649                 return SOCKLND_CONN_BULK_OUT;
650         else
651                 return SOCKLND_CONN_CONTROL;
652 }
653
654 int usocklnd_type2idx(int type)
655 {
656         switch (type) {
657         case SOCKLND_CONN_ANY:
658         case SOCKLND_CONN_CONTROL:
659                 return 0;
660         case SOCKLND_CONN_BULK_IN:
661                 return 1;
662         case SOCKLND_CONN_BULK_OUT:
663                 return 2;
664         default:
665                 LBUG();
666         }
667 }
668
669 usock_peer_t *
670 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
671 {
672         struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
673         struct list_head *tmp;
674         usock_peer_t     *peer;
675
676         list_for_each (tmp, peer_list) {
677
678                 peer = list_entry (tmp, usock_peer_t, up_list);
679
680                 if (peer->up_ni != ni)
681                         continue;
682
683                 if (peer->up_peerid.nid != id.nid ||
684                     peer->up_peerid.pid != id.pid)
685                         continue;
686
687                 usocklnd_peer_addref(peer);
688                 return peer;
689         }
690         return (NULL);
691 }
692
693 int
694 usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
695                      usock_peer_t **peerp)
696 {
697         usock_net_t  *net = ni->ni_data;
698         usock_peer_t *peer;
699         int           i;
700
701         LIBCFS_ALLOC (peer, sizeof (*peer));
702         if (peer == NULL)
703                 return -ENOMEM;
704
705         for (i = 0; i < N_CONN_TYPES; i++)
706                 peer->up_conns[i] = NULL;
707
708         peer->up_peerid       = id;
709         peer->up_ni           = ni;
710         peer->up_incrn_is_set = 0;
711         peer->up_errored      = 0;
712         peer->up_last_alive   = 0;
713         cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
714         pthread_mutex_init(&peer->up_lock, NULL);
715
716         pthread_mutex_lock(&net->un_lock);
717         net->un_peercount++;
718         pthread_mutex_unlock(&net->un_lock);
719
720         *peerp = peer;
721         return 0;
722 }
723
724 /* Safely create new peer if needed. Save result in *peerp.
725  * Returns 0 on success, <0 else */
726 int
727 usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
728                              usock_peer_t **peerp)
729 {
730         int           rc;
731         usock_peer_t *peer;
732         usock_peer_t *peer2;
733         usock_net_t  *net = ni->ni_data;
734
735         pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
736         peer = usocklnd_find_peer_locked(ni, id);
737         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
738
739         if (peer != NULL)
740                 goto find_or_create_peer_done;
741
742         rc = usocklnd_create_peer(ni, id, &peer);
743         if (rc)
744                 return rc;
745
746         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
747         peer2 = usocklnd_find_peer_locked(ni, id);
748         if (peer2 == NULL) {
749                 if (net->un_shutdown) {
750                         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
751                         usocklnd_peer_decref(peer); /* should destroy peer */
752                         CERROR("Can't create peer: network shutdown\n");
753                         return -ESHUTDOWN;
754                 }
755
756                 /* peer table will take 1 of my refs on peer */
757                 usocklnd_peer_addref(peer);
758                 list_add_tail (&peer->up_list,
759                                usocklnd_nid2peerlist(id.nid));
760         } else {
761                 usocklnd_peer_decref(peer); /* should destroy peer */
762                 peer = peer2;
763         }
764         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
765
766   find_or_create_peer_done:
767         *peerp = peer;
768         return 0;
769 }
770
771 /* NB: both peer and conn locks are held */
772 static int
773 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
774 {
775         if (conn->uc_state == UC_READY &&
776             list_empty(&conn->uc_tx_list) &&
777             list_empty(&conn->uc_zcack_list) &&
778             !conn->uc_sending) {
779                 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
780                                                   POLLOUT);
781                 if (rc != 0)
782                         return rc;
783         }
784
785         list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
786         return 0;
787 }
788
789 /* NB: both peer and conn locks are held
790  * NB: if sending isn't in progress.  the caller *MUST* send tx
791  * immediately after we'll return */
792 static void
793 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
794                     int *send_immediately)
795 {
796         if (conn->uc_state == UC_READY &&
797             list_empty(&conn->uc_tx_list) &&
798             list_empty(&conn->uc_zcack_list) &&
799             !conn->uc_sending) {
800                 conn->uc_sending = 1;
801                 *send_immediately = 1;
802                 return;
803         }
804
805         *send_immediately = 0;
806         list_add_tail(&tx->tx_list, &conn->uc_tx_list);
807 }
808
809 /* Safely create new conn if needed. Save result in *connp.
810  * Returns 0 on success, <0 else */
811 int
812 usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
813                              usock_conn_t **connp,
814                              usock_tx_t *tx, usock_zc_ack_t *zc_ack,
815                              int *send_immediately)
816 {
817         usock_conn_t *conn;
818         int           idx;
819         int           rc;
820         lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
821
822         if (userflag)
823                 type = SOCKLND_CONN_ANY;
824
825         idx = usocklnd_type2idx(type);
826
827         pthread_mutex_lock(&peer->up_lock);
828         if (peer->up_conns[idx] != NULL) {
829                 conn = peer->up_conns[idx];
830                 LASSERT(conn->uc_type == type);
831         } else {
832                 if (userflag) {
833                         CERROR("Refusing to create a connection to "
834                                "userspace process %s\n",
835                                libcfs_id2str(peer->up_peerid));
836                         rc = -EHOSTUNREACH;
837                         goto find_or_create_conn_failed;
838                 }
839
840                 rc = usocklnd_create_active_conn(peer, type, &conn);
841                 if (rc) {
842                         peer->up_errored = 1;
843                         usocklnd_del_conns_locked(peer);
844                         goto find_or_create_conn_failed;
845                 }
846
847                 /* peer takes 1 of conn refcount */
848                 usocklnd_link_conn_to_peer(conn, peer, idx);
849
850                 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
851                 if (rc) {
852                         peer->up_conns[idx] = NULL;
853                         usocklnd_conn_decref(conn); /* should destroy conn */
854                         goto find_or_create_conn_failed;
855                 }
856                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
857         }
858
859         pthread_mutex_lock(&conn->uc_lock);
860         LASSERT(conn->uc_peer == peer);
861
862         LASSERT(tx == NULL || zc_ack == NULL);
863         if (tx != NULL) {
864                 /* usocklnd_tear_peer_conn() could signal us stop queueing */
865                 if (conn->uc_errored) {
866                         rc = -EIO;
867                         pthread_mutex_unlock(&conn->uc_lock);
868                         goto find_or_create_conn_failed;
869                 }
870
871                 usocklnd_enqueue_tx(conn, tx, send_immediately);
872         } else {
873                 rc = usocklnd_enqueue_zcack(conn, zc_ack);
874                 if (rc != 0) {
875                         usocklnd_conn_kill_locked(conn);
876                         pthread_mutex_unlock(&conn->uc_lock);
877                         goto find_or_create_conn_failed;
878                 }
879         }
880         pthread_mutex_unlock(&conn->uc_lock);
881
882         usocklnd_conn_addref(conn);
883         pthread_mutex_unlock(&peer->up_lock);
884
885         *connp = conn;
886         return 0;
887
888   find_or_create_conn_failed:
889         pthread_mutex_unlock(&peer->up_lock);
890         return rc;
891 }
892
893 void
894 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
895 {
896         peer->up_conns[idx] = conn;
897         peer->up_errored    = 0; /* this new fresh conn will try
898                                   * revitalize even stale errored peer */
899 }
900
901 int
902 usocklnd_invert_type(int type)
903 {
904         switch (type)
905         {
906         case SOCKLND_CONN_ANY:
907         case SOCKLND_CONN_CONTROL:
908                 return (type);
909         case SOCKLND_CONN_BULK_IN:
910                 return SOCKLND_CONN_BULK_OUT;
911         case SOCKLND_CONN_BULK_OUT:
912                 return SOCKLND_CONN_BULK_IN;
913         default:
914                 return SOCKLND_CONN_NONE;
915         }
916 }
917
918 void
919 usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
920 {
921         pthread_mutex_lock(&conn->uc_lock);
922         if (conn->uc_state != UC_DEAD)
923                 conn->uc_state = new_state;
924         pthread_mutex_unlock(&conn->uc_lock);
925 }
926
927 /* NB: peer is locked by caller */
928 void
929 usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
930                              usock_conn_t *skip_conn)
931 {
932         int i;
933
934         if (!peer->up_incrn_is_set) {
935                 peer->up_incarnation = incrn;
936                 peer->up_incrn_is_set = 1;
937                 return;
938         }
939
940         if (peer->up_incarnation == incrn)
941                 return;
942
943         peer->up_incarnation = incrn;
944
945         for (i = 0; i < N_CONN_TYPES; i++) {
946                 usock_conn_t *conn = peer->up_conns[i];
947
948                 if (conn == NULL || conn == skip_conn)
949                         continue;
950
951                 pthread_mutex_lock(&conn->uc_lock);
952                 LASSERT (conn->uc_peer == peer);
953                 conn->uc_peer = NULL;
954                 peer->up_conns[i] = NULL;
955                 if (conn->uc_state != UC_DEAD)
956                         usocklnd_conn_kill_locked(conn);
957                 pthread_mutex_unlock(&conn->uc_lock);
958
959                 usocklnd_conn_decref(conn);
960                 usocklnd_peer_decref(peer);
961         }
962 }
963
964 /* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
965  * MAGIC part of hello and set uc_rx_state
966  */
967 void
968 usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
969 {
970         LASSERT(conn->uc_rx_hello != NULL);
971
972         conn->uc_rx_niov = 1;
973         conn->uc_rx_iov = conn->uc_rx_iova;
974         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
975         conn->uc_rx_iov[0].iov_len =
976                 conn->uc_rx_nob_wanted =
977                 conn->uc_rx_nob_left =
978                 sizeof(conn->uc_rx_hello->kshm_magic);
979
980         conn->uc_rx_state = UC_RX_HELLO_MAGIC;
981
982         conn->uc_rx_flag = 1; /* waiting for incoming hello */
983         conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
984 }
985
986 /* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
987  * VERSION part of hello and set uc_rx_state
988  */
989 void
990 usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
991 {
992         LASSERT(conn->uc_rx_hello != NULL);
993
994         conn->uc_rx_niov = 1;
995         conn->uc_rx_iov = conn->uc_rx_iova;
996         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
997         conn->uc_rx_iov[0].iov_len =
998                 conn->uc_rx_nob_wanted =
999                 conn->uc_rx_nob_left =
1000                 sizeof(conn->uc_rx_hello->kshm_version);
1001
1002         conn->uc_rx_state = UC_RX_HELLO_VERSION;
1003 }
1004
1005 /* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
1006  * the rest  of hello and set uc_rx_state
1007  */
1008 void
1009 usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
1010 {
1011         LASSERT(conn->uc_rx_hello != NULL);
1012
1013         conn->uc_rx_niov = 1;
1014         conn->uc_rx_iov = conn->uc_rx_iova;
1015         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
1016         conn->uc_rx_iov[0].iov_len =
1017                 conn->uc_rx_nob_wanted =
1018                 conn->uc_rx_nob_left =
1019                 offsetof(ksock_hello_msg_t, kshm_ips) -
1020                 offsetof(ksock_hello_msg_t, kshm_src_nid);
1021
1022         conn->uc_rx_state = UC_RX_HELLO_BODY;
1023 }
1024
1025 /* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
1026  * array of IPs and set uc_rx_state
1027  */
1028 void
1029 usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
1030 {
1031         LASSERT(conn->uc_rx_hello != NULL);
1032
1033         conn->uc_rx_niov = 1;
1034         conn->uc_rx_iov = conn->uc_rx_iova;
1035         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
1036         conn->uc_rx_iov[0].iov_len =
1037                 conn->uc_rx_nob_wanted =
1038                 conn->uc_rx_nob_left =
1039                 conn->uc_rx_hello->kshm_nips *
1040                 sizeof(conn->uc_rx_hello->kshm_ips[0]);
1041
1042         conn->uc_rx_state = UC_RX_HELLO_IPS;
1043 }
1044
1045 /* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
1046  * LNET header and set uc_rx_state
1047  */
1048 void
1049 usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
1050 {
1051         conn->uc_rx_niov = 1;
1052         conn->uc_rx_iov = conn->uc_rx_iova;
1053         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
1054         conn->uc_rx_iov[0].iov_len =
1055                 conn->uc_rx_nob_wanted =
1056                 conn->uc_rx_nob_left =
1057                 sizeof(ksock_lnet_msg_t);
1058
1059         conn->uc_rx_state = UC_RX_LNET_HEADER;
1060         conn->uc_rx_flag = 1;
1061 }
1062
1063 /* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
1064  * KSM header and set uc_rx_state
1065  */
1066 void
1067 usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
1068 {
1069         conn->uc_rx_niov = 1;
1070         conn->uc_rx_iov = conn->uc_rx_iova;
1071         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
1072         conn->uc_rx_iov[0].iov_len =
1073                 conn->uc_rx_nob_wanted =
1074                 conn->uc_rx_nob_left =
1075                 offsetof(ksock_msg_t, ksm_u);
1076
1077         conn->uc_rx_state = UC_RX_KSM_HEADER;
1078         conn->uc_rx_flag = 0;
1079 }
1080
1081 /* RX state transition to UC_RX_SKIPPING: update RX part for
1082  * skipping and set uc_rx_state
1083  */
1084 void
1085 usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
1086 {
1087         static char    skip_buffer[4096];
1088
1089         int            nob;
1090         unsigned int   niov = 0;
1091         int            skipped = 0;
1092         int            nob_to_skip = conn->uc_rx_nob_left;
1093
1094         LASSERT(nob_to_skip != 0);
1095
1096         conn->uc_rx_iov = conn->uc_rx_iova;
1097
1098         /* Set up to skip as much as possible now.  If there's more left
1099          * (ran out of iov entries) we'll get called again */
1100
1101         do {
1102                 nob = MIN (nob_to_skip, sizeof(skip_buffer));
1103
1104                 conn->uc_rx_iov[niov].iov_base = skip_buffer;
1105                 conn->uc_rx_iov[niov].iov_len  = nob;
1106                 niov++;
1107                 skipped += nob;
1108                 nob_to_skip -=nob;
1109
1110         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1111                  niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));
1112
1113         conn->uc_rx_niov = niov;
1114         conn->uc_rx_nob_wanted = skipped;
1115
1116         conn->uc_rx_state = UC_RX_SKIPPING;
1117 }