Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lnet / ulnds / socklnd / conn.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/socklnd/conn.c
37  *
38  * Author: Maxim Patlasov <maxim@clusterfs.com>
39  */
40
41 #include "usocklnd.h"
42
43 /* Return 1 if the conn is timed out, 0 else */
44 int
45 usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
46 {
47         if (conn->uc_tx_flag && /* sending is in progress */
48             cfs_time_aftereq(current_time, conn->uc_tx_deadline))
49                 return 1;
50
51         if (conn->uc_rx_flag && /* receiving is in progress */
52             cfs_time_aftereq(current_time, conn->uc_rx_deadline))
53                 return 1;
54         
55         return 0;
56 }
57
58 void
59 usocklnd_conn_kill(usock_conn_t *conn)
60 {
61         pthread_mutex_lock(&conn->uc_lock);
62         if (conn->uc_state != UC_DEAD)
63                 usocklnd_conn_kill_locked(conn);
64         pthread_mutex_unlock(&conn->uc_lock);        
65 }
66
67 /* Mark the conn as DEAD and schedule its deletion */
68 void
69 usocklnd_conn_kill_locked(usock_conn_t *conn)
70 {
71         conn->uc_rx_flag = conn->uc_tx_flag = 0;
72         conn->uc_state = UC_DEAD;
73         usocklnd_add_killrequest(conn);
74 }
75
76 usock_conn_t *
77 usocklnd_conn_allocate()
78 {
79         usock_conn_t        *conn;
80         usock_pollrequest_t *pr;
81
82         LIBCFS_ALLOC (pr, sizeof(*pr));
83         if (pr == NULL)
84                 return NULL;
85         
86         LIBCFS_ALLOC (conn, sizeof(*conn));
87         if (conn == NULL) {
88                 LIBCFS_FREE (pr, sizeof(*pr));
89                 return NULL;
90         }
91         memset(conn, 0, sizeof(*conn));
92         conn->uc_preq = pr;
93
94         LIBCFS_ALLOC (conn->uc_rx_hello,
95                       offsetof(ksock_hello_msg_t,
96                                kshm_ips[LNET_MAX_INTERFACES]));
97         if (conn->uc_rx_hello == NULL) {
98                 LIBCFS_FREE (pr, sizeof(*pr));
99                 LIBCFS_FREE (conn, sizeof(*conn));
100                 return NULL;
101         }
102
103         return conn;
104 }
105
106 void
107 usocklnd_conn_free(usock_conn_t *conn)
108 {
109         usock_pollrequest_t *pr = conn->uc_preq;
110
111         if (pr != NULL)
112                 LIBCFS_FREE (pr, sizeof(*pr));
113
114         if (conn->uc_rx_hello != NULL)
115                 LIBCFS_FREE (conn->uc_rx_hello,
116                              offsetof(ksock_hello_msg_t,
117                                       kshm_ips[LNET_MAX_INTERFACES]));
118         
119         LIBCFS_FREE (conn, sizeof(*conn));
120 }
121
122 void
123 usocklnd_tear_peer_conn(usock_conn_t *conn)
124 {
125         usock_peer_t     *peer = conn->uc_peer;
126         int               idx = usocklnd_type2idx(conn->uc_type);
127         lnet_ni_t        *ni;
128         lnet_process_id_t id;
129         int               decref_flag  = 0;
130         int               killall_flag = 0;
131         
132         if (peer == NULL) /* nothing to tear */
133                 return;
134         
135         pthread_mutex_lock(&peer->up_lock);
136         pthread_mutex_lock(&conn->uc_lock);        
137
138         ni = peer->up_ni;
139         id = peer->up_peerid;
140
141         if (peer->up_conns[idx] == conn) {
142                 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
143                         /* change state not to finalize twice */
144                         conn->uc_rx_state = UC_RX_KSM_HEADER;
145                         lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);                        
146                 }
147                 
148                 usocklnd_destroy_txlist(peer->up_ni,
149                                         &conn->uc_tx_list);
150
151                 peer->up_conns[idx] = NULL;
152                 conn->uc_peer = NULL;
153                 decref_flag = 1;
154
155                 if(conn->uc_errored && !peer->up_errored)
156                         peer->up_errored = killall_flag = 1;
157         }
158         
159         pthread_mutex_unlock(&conn->uc_lock);
160
161         if (killall_flag)
162                 usocklnd_del_conns_locked(peer);
163
164         pthread_mutex_unlock(&peer->up_lock);
165         
166         if (!decref_flag)
167                 return;
168
169         usocklnd_conn_decref(conn);
170         usocklnd_peer_decref(peer);
171
172         usocklnd_check_peer_stale(ni, id);
173 }
174
175 /* Remove peer from hash list if all up_conns[i] is NULL &&
176  * hash table is the only consumer of the peer */
177 void
178 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
179 {
180         usock_peer_t *peer;
181         
182         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
183         peer = usocklnd_find_peer_locked(ni, id);
184
185         if (peer == NULL) {
186                 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
187                 return;
188         }
189
190         if (cfs_atomic_read(&peer->up_refcount) == 2) {
191                 int i;
192                 for (i = 0; i < N_CONN_TYPES; i++)
193                         LASSERT (peer->up_conns[i] == NULL);
194
195                 list_del(&peer->up_list);                        
196                 
197                 if (peer->up_errored &&
198                     (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
199                         lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
200                                      cfs_time_seconds(peer->up_last_alive));
201                 
202                 usocklnd_peer_decref(peer);
203         }
204
205         usocklnd_peer_decref(peer);
206         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
207 }
208
209 /* Returns 0 on success, <0 else */
210 int
211 usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
212 {
213         int           rc;
214         __u32         peer_ip;
215         __u16         peer_port;
216         usock_conn_t *conn;
217
218         rc = libcfs_getpeername(fd, &peer_ip, &peer_port);
219         if (rc)
220                 return rc;
221
222         rc = usocklnd_set_sock_options(fd);
223         if (rc)
224                 return rc;
225
226         conn = usocklnd_conn_allocate();
227         if (conn == NULL)
228                 return -ENOMEM;
229
230         usocklnd_rx_hellomagic_state_transition(conn);
231         
232         conn->uc_fd = fd;
233         conn->uc_peer_ip = peer_ip;
234         conn->uc_peer_port = peer_port;
235         conn->uc_state = UC_RECEIVING_HELLO;
236         conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
237         conn->uc_ni = ni;
238         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
239         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
240         pthread_mutex_init(&conn->uc_lock, NULL);
241         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
242
243         *connp = conn;
244         return 0;
245 }
246
247 /* Returns 0 on success, <0 else */
248 int
249 usocklnd_create_active_conn(usock_peer_t *peer, int type,
250                             usock_conn_t **connp)
251 {
252         int           rc;
253         int           fd;
254         usock_conn_t *conn;
255         __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
256         __u16         dst_port = lnet_acceptor_port();
257         
258         conn = usocklnd_conn_allocate();
259         if (conn == NULL)
260                 return -ENOMEM;
261
262         conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
263                                                         peer->up_peerid.nid);
264         if (conn->uc_tx_hello == NULL) {
265                 usocklnd_conn_free(conn);
266                 return -ENOMEM;
267         }                
268         
269         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
270                 rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
271         else
272                 rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);
273         
274         if (rc) {
275                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
276                 usocklnd_conn_free(conn);
277                 return rc;
278         }
279         
280         conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
281         conn->uc_tx_flag = 1;
282         
283         conn->uc_fd = fd;
284         conn->uc_peer_ip = dst_ip;
285         conn->uc_peer_port = dst_port;
286         conn->uc_type = type;
287         conn->uc_activeflag = 1;
288         conn->uc_state = UC_CONNECTING;
289         conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
290         conn->uc_ni = NULL;
291         conn->uc_peerid = peer->up_peerid;
292         conn->uc_peer = peer;
293         usocklnd_peer_addref(peer);
294         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
295         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
296         pthread_mutex_init(&conn->uc_lock, NULL);
297         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
298
299         *connp = conn;
300         return 0;
301 }
302
303 /* Returns 0 on success, <0 else */
304 int
305 usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
306 {
307         __u16 port;
308         int   fd;
309         int   rc;
310
311         for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT; 
312              port >= LNET_ACCEPTOR_MIN_RESERVED_PORT; 
313              port--) {
314                 /* Iterate through reserved ports. */
315
316                 rc = libcfs_sock_create(&fd);
317                 if (rc)
318                         return rc;                        
319                                 
320                 rc = libcfs_sock_bind_to_port(fd, port);
321                 if (rc) {
322                         close(fd);
323                         continue;
324                 }
325
326                 rc = usocklnd_set_sock_options(fd);
327                 if (rc) {
328                         close(fd);
329                         return rc;
330                 }
331
332                 rc = libcfs_sock_connect(fd, dst_ip, dst_port);
333                 if (rc == 0) {
334                         *fdp = fd;
335                         return 0;
336                 }
337                 
338                 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
339                         close(fd);
340                         return rc;
341                 }
342
343                 close(fd);
344         }
345
346         CERROR("Can't bind to any reserved port\n");
347         return rc;
348 }
349
350 /* Returns 0 on success, <0 else */
351 int
352 usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
353 {
354         int fd;
355         int rc;
356
357         rc = libcfs_sock_create(&fd);
358         if (rc)
359                 return rc;
360         
361         rc = usocklnd_set_sock_options(fd);
362         if (rc) {
363                 close(fd);
364                 return rc;
365         }
366
367         rc = libcfs_sock_connect(fd, dst_ip, dst_port);
368         if (rc) {
369                 close(fd);
370                 return rc;
371         }
372
373         *fdp = fd;
374         return 0;
375 }
376
377 int
378 usocklnd_set_sock_options(int fd)
379 {
380         int rc;
381
382         rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
383         if (rc)
384                 return rc;
385
386         if (usock_tuns.ut_sockbufsiz) {
387                 rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
388                 if (rc)
389                         return rc;        
390         }
391         
392         return libcfs_fcntl_nonblock(fd);
393 }
394
395 usock_tx_t *
396 usocklnd_create_noop_tx(__u64 cookie)
397 {
398         usock_tx_t *tx;
399         
400         LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
401         if (tx == NULL)
402                 return NULL;
403
404         tx->tx_size = sizeof(usock_tx_t);
405         tx->tx_lnetmsg = NULL;
406
407         socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
408         tx->tx_msg.ksm_zc_cookies[1] = cookie;
409         
410         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
411         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
412                 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
413         tx->tx_iov = tx->tx_iova;
414         tx->tx_niov = 1;
415         
416         return tx;
417 }
418         
419 usock_tx_t *
420 usocklnd_create_tx(lnet_msg_t *lntmsg)
421 {
422         usock_tx_t   *tx;
423         unsigned int  payload_niov = lntmsg->msg_niov; 
424         struct iovec *payload_iov = lntmsg->msg_iov; 
425         unsigned int  payload_offset = lntmsg->msg_offset;
426         unsigned int  payload_nob = lntmsg->msg_len;
427         int           size = offsetof(usock_tx_t,
428                                       tx_iova[1 + payload_niov]);
429
430         LIBCFS_ALLOC (tx, size);
431         if (tx == NULL)
432                 return NULL;
433
434         tx->tx_size = size;
435         tx->tx_lnetmsg = lntmsg;
436
437         tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;
438         
439         socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
440         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
441         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
442         tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
443         tx->tx_iov = tx->tx_iova;
444
445         tx->tx_niov = 1 + 
446                 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
447                                  payload_niov, payload_iov,
448                                  payload_offset, payload_nob);
449
450         return tx;
451 }
452
453 void
454 usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
455                         lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
456 {
457         usock_net_t *net = (usock_net_t *)ni->ni_data;
458
459         hello->kshm_magic       = LNET_PROTO_MAGIC;
460         hello->kshm_version     = KSOCK_PROTO_V2;
461         hello->kshm_nips        = 0;
462         hello->kshm_ctype       = type;
463         
464         hello->kshm_dst_incarnation = 0; /* not used */
465         hello->kshm_src_incarnation = net->un_incarnation;
466
467         hello->kshm_src_pid = the_lnet.ln_pid;
468         hello->kshm_src_nid = ni->ni_nid;
469         hello->kshm_dst_nid = peer_nid;
470         hello->kshm_dst_pid = 0; /* not used */
471 }
472
473 usock_tx_t *
474 usocklnd_create_hello_tx(lnet_ni_t *ni,
475                          int type, lnet_nid_t peer_nid)
476 {
477         usock_tx_t        *tx;
478         int                size;
479         ksock_hello_msg_t *hello;
480
481         size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
482         LIBCFS_ALLOC (tx, size);
483         if (tx == NULL)
484                 return NULL;
485
486         tx->tx_size = size;
487         tx->tx_lnetmsg = NULL;
488
489         hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
490         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
491         
492         tx->tx_iova[0].iov_base = (void *)hello;
493         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
494                 offsetof(ksock_hello_msg_t, kshm_ips);
495         tx->tx_iov = tx->tx_iova;
496         tx->tx_niov = 1;
497
498         return tx;
499 }
500
501 usock_tx_t *
502 usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
503                             int type, lnet_nid_t peer_nid)
504 {
505         usock_tx_t              *tx;
506         int                      size;
507         lnet_acceptor_connreq_t *cr;
508         ksock_hello_msg_t       *hello;
509
510         size = sizeof(usock_tx_t) +
511                 sizeof(lnet_acceptor_connreq_t) +
512                 offsetof(ksock_hello_msg_t, kshm_ips);
513         LIBCFS_ALLOC (tx, size);
514         if (tx == NULL)
515                 return NULL;
516
517         tx->tx_size = size;
518         tx->tx_lnetmsg = NULL;
519
520         cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
521         memset(cr, 0, sizeof(*cr));
522         cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
523         cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
524         cr->acr_nid     = peer_nid;
525         
526         hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
527         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
528         
529         tx->tx_iova[0].iov_base = (void *)cr;
530         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
531                 sizeof(lnet_acceptor_connreq_t) +
532                 offsetof(ksock_hello_msg_t, kshm_ips);
533         tx->tx_iov = tx->tx_iova;
534         tx->tx_niov = 1;
535
536         return tx;
537 }
538
539 void
540 usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
541 {
542         lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;
543         int          rc = (tx->tx_resid == 0) ? 0 : -EIO;
544
545         LASSERT (ni != NULL || lnetmsg == NULL);
546
547         LIBCFS_FREE (tx, tx->tx_size);
548         
549         if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
550                 lnet_finalize(ni, lnetmsg, rc);
551 }
552
553 void
554 usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
555 {
556         usock_tx_t *tx;
557
558         while (!list_empty(txlist)) {
559                 tx = list_entry(txlist->next, usock_tx_t, tx_list);
560                 list_del(&tx->tx_list);
561                 
562                 usocklnd_destroy_tx(ni, tx);
563         }
564 }
565
566 void
567 usocklnd_destroy_zcack_list(struct list_head *zcack_list)
568 {
569         usock_zc_ack_t *zcack;
570
571         while (!list_empty(zcack_list)) {
572                 zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
573                 list_del(&zcack->zc_list);
574                 
575                 LIBCFS_FREE (zcack, sizeof(*zcack));
576         }
577 }
578
579 void
580 usocklnd_destroy_peer(usock_peer_t *peer)
581 {
582         usock_net_t *net = peer->up_ni->ni_data;
583         int          i;
584
585         for (i = 0; i < N_CONN_TYPES; i++)
586                 LASSERT (peer->up_conns[i] == NULL);
587
588         LIBCFS_FREE (peer, sizeof (*peer));
589
590         pthread_mutex_lock(&net->un_lock);
591         if(--net->un_peercount == 0)                
592                 pthread_cond_signal(&net->un_cond);
593         pthread_mutex_unlock(&net->un_lock);
594 }
595
596 void
597 usocklnd_destroy_conn(usock_conn_t *conn)
598 {
599         LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
600
601         if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
602                 LASSERT (conn->uc_peer != NULL);
603                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
604         }
605
606         if (!list_empty(&conn->uc_tx_list)) {
607                 LASSERT (conn->uc_peer != NULL);                
608                 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
609         }
610
611         usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
612         
613         if (conn->uc_peer != NULL)
614                 usocklnd_peer_decref(conn->uc_peer);
615
616         if (conn->uc_ni != NULL)
617                 lnet_ni_decref(conn->uc_ni);
618
619         if (conn->uc_tx_hello)
620                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
621
622         usocklnd_conn_free(conn);
623 }
624
625 int
626 usocklnd_get_conn_type(lnet_msg_t *lntmsg)
627 {
628         int nob;
629
630         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
631                 return SOCKLND_CONN_ANY;
632
633         nob = sizeof(ksock_msg_t) + lntmsg->msg_len;
634         
635         if (nob >= usock_tuns.ut_min_bulk)
636                 return SOCKLND_CONN_BULK_OUT;
637         else
638                 return SOCKLND_CONN_CONTROL;
639 }
640
641 int usocklnd_type2idx(int type)
642 {
643         switch (type) {
644         case SOCKLND_CONN_ANY:
645         case SOCKLND_CONN_CONTROL:
646                 return 0;
647         case SOCKLND_CONN_BULK_IN:
648                 return 1;
649         case SOCKLND_CONN_BULK_OUT:
650                 return 2;
651         default:
652                 LBUG();
653         }
654 }
655
656 usock_peer_t *
657 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
658 {
659         struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
660         struct list_head *tmp;
661         usock_peer_t     *peer;
662
663         list_for_each (tmp, peer_list) {
664
665                 peer = list_entry (tmp, usock_peer_t, up_list);
666
667                 if (peer->up_ni != ni)
668                         continue;
669
670                 if (peer->up_peerid.nid != id.nid ||
671                     peer->up_peerid.pid != id.pid)
672                         continue;
673
674                 usocklnd_peer_addref(peer);
675                 return peer;
676         }
677         return (NULL);
678 }
679
680 int
681 usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
682                      usock_peer_t **peerp)
683 {
684         usock_net_t  *net = ni->ni_data;
685         usock_peer_t *peer;
686         int           i;
687
688         LIBCFS_ALLOC (peer, sizeof (*peer));
689         if (peer == NULL)
690                 return -ENOMEM;
691
692         for (i = 0; i < N_CONN_TYPES; i++)
693                 peer->up_conns[i] = NULL;
694
695         peer->up_peerid       = id;
696         peer->up_ni           = ni;
697         peer->up_incrn_is_set = 0;
698         peer->up_errored      = 0;
699         peer->up_last_alive   = 0;
700         cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
701         pthread_mutex_init(&peer->up_lock, NULL);        
702
703         pthread_mutex_lock(&net->un_lock);
704         net->un_peercount++;        
705         pthread_mutex_unlock(&net->un_lock);
706
707         *peerp = peer;
708         return 0;
709 }
710
711 /* Safely create new peer if needed. Save result in *peerp.
712  * Returns 0 on success, <0 else */
713 int
714 usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
715                              usock_peer_t **peerp)
716 {
717         int           rc;
718         usock_peer_t *peer;
719         usock_peer_t *peer2;
720         usock_net_t  *net = ni->ni_data;
721
722         pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
723         peer = usocklnd_find_peer_locked(ni, id);
724         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
725
726         if (peer != NULL)
727                 goto find_or_create_peer_done;
728
729         rc = usocklnd_create_peer(ni, id, &peer);
730         if (rc)
731                 return rc;
732         
733         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
734         peer2 = usocklnd_find_peer_locked(ni, id);
735         if (peer2 == NULL) {
736                 if (net->un_shutdown) {
737                         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
738                         usocklnd_peer_decref(peer); /* should destroy peer */
739                         CERROR("Can't create peer: network shutdown\n");
740                         return -ESHUTDOWN;
741                 }
742                 
743                 /* peer table will take 1 of my refs on peer */
744                 usocklnd_peer_addref(peer);
745                 list_add_tail (&peer->up_list,
746                                usocklnd_nid2peerlist(id.nid));
747         } else {
748                 usocklnd_peer_decref(peer); /* should destroy peer */
749                 peer = peer2;
750         }
751         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
752         
753   find_or_create_peer_done:        
754         *peerp = peer;
755         return 0;
756 }
757
758 /* NB: both peer and conn locks are held */
759 static int
760 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
761 {        
762         if (conn->uc_state == UC_READY &&
763             list_empty(&conn->uc_tx_list) &&
764             list_empty(&conn->uc_zcack_list) &&
765             !conn->uc_sending) {
766                 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
767                                                   POLLOUT);
768                 if (rc != 0)
769                         return rc;
770         }                
771
772         list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
773         return 0;
774 }
775
776 /* NB: both peer and conn locks are held
777  * NB: if sending isn't in progress.  the caller *MUST* send tx
778  * immediately after we'll return */
779 static void
780 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
781                     int *send_immediately)
782 {        
783         if (conn->uc_state == UC_READY &&
784             list_empty(&conn->uc_tx_list) &&
785             list_empty(&conn->uc_zcack_list) &&
786             !conn->uc_sending) {
787                 conn->uc_sending = 1;
788                 *send_immediately = 1;
789                 return;
790         }                
791
792         *send_immediately = 0;
793         list_add_tail(&tx->tx_list, &conn->uc_tx_list);
794 }
795
796 /* Safely create new conn if needed. Save result in *connp.
797  * Returns 0 on success, <0 else */
798 int
799 usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
800                              usock_conn_t **connp,
801                              usock_tx_t *tx, usock_zc_ack_t *zc_ack,
802                              int *send_immediately)
803 {
804         usock_conn_t *conn;
805         int           idx;
806         int           rc;
807         lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
808         
809         if (userflag)
810                 type = SOCKLND_CONN_ANY;
811
812         idx = usocklnd_type2idx(type);
813         
814         pthread_mutex_lock(&peer->up_lock);
815         if (peer->up_conns[idx] != NULL) {
816                 conn = peer->up_conns[idx];
817                 LASSERT(conn->uc_type == type);
818         } else {
819                 if (userflag) {
820                         CERROR("Refusing to create a connection to "
821                                "userspace process %s\n",
822                                libcfs_id2str(peer->up_peerid));
823                         rc = -EHOSTUNREACH;
824                         goto find_or_create_conn_failed;
825                 }
826                 
827                 rc = usocklnd_create_active_conn(peer, type, &conn);
828                 if (rc) {
829                         peer->up_errored = 1;
830                         usocklnd_del_conns_locked(peer);
831                         goto find_or_create_conn_failed;
832                 }
833
834                 /* peer takes 1 of conn refcount */
835                 usocklnd_link_conn_to_peer(conn, peer, idx);
836                 
837                 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
838                 if (rc) {
839                         peer->up_conns[idx] = NULL;
840                         usocklnd_conn_decref(conn); /* should destroy conn */
841                         goto find_or_create_conn_failed;
842                 }
843                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
844         }
845         
846         pthread_mutex_lock(&conn->uc_lock);
847         LASSERT(conn->uc_peer == peer);
848
849         LASSERT(tx == NULL || zc_ack == NULL);
850         if (tx != NULL) {
851                 usocklnd_enqueue_tx(conn, tx, send_immediately);
852         } else {
853                 rc = usocklnd_enqueue_zcack(conn, zc_ack);        
854                 if (rc != 0) {
855                         usocklnd_conn_kill_locked(conn);
856                         pthread_mutex_unlock(&conn->uc_lock);
857                         goto find_or_create_conn_failed;
858                 }
859         }
860         pthread_mutex_unlock(&conn->uc_lock);         
861
862         usocklnd_conn_addref(conn);
863         pthread_mutex_unlock(&peer->up_lock);
864
865         *connp = conn;
866         return 0;
867
868   find_or_create_conn_failed:
869         pthread_mutex_unlock(&peer->up_lock);
870         return rc;
871 }
872
873 void
874 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
875 {
876         peer->up_conns[idx] = conn;        
877         peer->up_errored    = 0; /* this new fresh conn will try
878                                   * revitalize even stale errored peer */
879 }
880
881 int
882 usocklnd_invert_type(int type)
883 {
884         switch (type)
885         {
886         case SOCKLND_CONN_ANY:
887         case SOCKLND_CONN_CONTROL:
888                 return (type);
889         case SOCKLND_CONN_BULK_IN:
890                 return SOCKLND_CONN_BULK_OUT;
891         case SOCKLND_CONN_BULK_OUT:
892                 return SOCKLND_CONN_BULK_IN;
893         default:
894                 return SOCKLND_CONN_NONE;
895         }
896 }
897
898 void
899 usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
900 {
901         pthread_mutex_lock(&conn->uc_lock);
902         if (conn->uc_state != UC_DEAD)
903                 conn->uc_state = new_state;
904         pthread_mutex_unlock(&conn->uc_lock);
905 }
906
907 /* NB: peer is locked by caller */
908 void
909 usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
910                              usock_conn_t *skip_conn)
911 {
912         int i;
913         
914         if (!peer->up_incrn_is_set) {
915                 peer->up_incarnation = incrn;
916                 peer->up_incrn_is_set = 1;
917                 return;
918         }
919
920         if (peer->up_incarnation == incrn)
921                 return;
922
923         peer->up_incarnation = incrn;
924         
925         for (i = 0; i < N_CONN_TYPES; i++) {
926                 usock_conn_t *conn = peer->up_conns[i];
927                 
928                 if (conn == NULL || conn == skip_conn)
929                         continue;
930
931                 pthread_mutex_lock(&conn->uc_lock);        
932                 LASSERT (conn->uc_peer == peer);
933                 conn->uc_peer = NULL;
934                 peer->up_conns[i] = NULL;
935                 if (conn->uc_state != UC_DEAD)
936                         usocklnd_conn_kill_locked(conn);                
937                 pthread_mutex_unlock(&conn->uc_lock);
938
939                 usocklnd_conn_decref(conn);
940                 usocklnd_peer_decref(peer);
941         }
942 }
943
944 /* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
945  * MAGIC part of hello and set uc_rx_state
946  */
947 void
948 usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
949 {
950         LASSERT(conn->uc_rx_hello != NULL);
951
952         conn->uc_rx_niov = 1;
953         conn->uc_rx_iov = conn->uc_rx_iova;
954         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
955         conn->uc_rx_iov[0].iov_len =
956                 conn->uc_rx_nob_wanted =
957                 conn->uc_rx_nob_left =
958                 sizeof(conn->uc_rx_hello->kshm_magic);
959
960         conn->uc_rx_state = UC_RX_HELLO_MAGIC;
961
962         conn->uc_rx_flag = 1; /* waiting for incoming hello */
963         conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
964 }
965
966 /* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
967  * VERSION part of hello and set uc_rx_state
968  */
969 void
970 usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
971 {
972         LASSERT(conn->uc_rx_hello != NULL);
973
974         conn->uc_rx_niov = 1;
975         conn->uc_rx_iov = conn->uc_rx_iova;
976         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
977         conn->uc_rx_iov[0].iov_len =
978                 conn->uc_rx_nob_wanted =
979                 conn->uc_rx_nob_left =
980                 sizeof(conn->uc_rx_hello->kshm_version);
981         
982         conn->uc_rx_state = UC_RX_HELLO_VERSION;
983 }
984
985 /* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
986  * the rest  of hello and set uc_rx_state
987  */
988 void
989 usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
990 {
991         LASSERT(conn->uc_rx_hello != NULL);
992
993         conn->uc_rx_niov = 1;
994         conn->uc_rx_iov = conn->uc_rx_iova;
995         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
996         conn->uc_rx_iov[0].iov_len =
997                 conn->uc_rx_nob_wanted =
998                 conn->uc_rx_nob_left =
999                 offsetof(ksock_hello_msg_t, kshm_ips) -
1000                 offsetof(ksock_hello_msg_t, kshm_src_nid);
1001         
1002         conn->uc_rx_state = UC_RX_HELLO_BODY;
1003 }
1004
1005 /* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
1006  * array of IPs and set uc_rx_state
1007  */
1008 void
1009 usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
1010 {
1011         LASSERT(conn->uc_rx_hello != NULL);
1012
1013         conn->uc_rx_niov = 1;
1014         conn->uc_rx_iov = conn->uc_rx_iova;
1015         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
1016         conn->uc_rx_iov[0].iov_len =
1017                 conn->uc_rx_nob_wanted =
1018                 conn->uc_rx_nob_left =
1019                 conn->uc_rx_hello->kshm_nips *
1020                 sizeof(conn->uc_rx_hello->kshm_ips[0]);
1021         
1022         conn->uc_rx_state = UC_RX_HELLO_IPS;
1023 }
1024
1025 /* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
1026  * LNET header and set uc_rx_state
1027  */
1028 void
1029 usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
1030 {
1031         conn->uc_rx_niov = 1;
1032         conn->uc_rx_iov = conn->uc_rx_iova;
1033         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;                
1034         conn->uc_rx_iov[0].iov_len =
1035                 conn->uc_rx_nob_wanted =
1036                 conn->uc_rx_nob_left =
1037                 sizeof(ksock_lnet_msg_t);
1038         
1039         conn->uc_rx_state = UC_RX_LNET_HEADER;
1040         conn->uc_rx_flag = 1;
1041 }
1042
1043 /* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
1044  * KSM header and set uc_rx_state
1045  */
1046 void
1047 usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
1048 {
1049         conn->uc_rx_niov = 1;
1050         conn->uc_rx_iov = conn->uc_rx_iova;
1051         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;                
1052         conn->uc_rx_iov[0].iov_len =
1053                 conn->uc_rx_nob_wanted =
1054                 conn->uc_rx_nob_left =                        
1055                 offsetof(ksock_msg_t, ksm_u);
1056         
1057         conn->uc_rx_state = UC_RX_KSM_HEADER;
1058         conn->uc_rx_flag = 0;
1059 }
1060
1061 /* RX state transition to UC_RX_SKIPPING: update RX part for
1062  * skipping and set uc_rx_state
1063  */
1064 void
1065 usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
1066 {
1067         static char    skip_buffer[4096];
1068
1069         int            nob;
1070         unsigned int   niov = 0;
1071         int            skipped = 0;
1072         int            nob_to_skip = conn->uc_rx_nob_left;
1073         
1074         LASSERT(nob_to_skip != 0);
1075
1076         conn->uc_rx_iov = conn->uc_rx_iova;
1077
1078         /* Set up to skip as much as possible now.  If there's more left
1079          * (ran out of iov entries) we'll get called again */
1080
1081         do {
1082                 nob = MIN (nob_to_skip, sizeof(skip_buffer));
1083
1084                 conn->uc_rx_iov[niov].iov_base = skip_buffer;
1085                 conn->uc_rx_iov[niov].iov_len  = nob;
1086                 niov++;
1087                 skipped += nob;
1088                 nob_to_skip -=nob;
1089
1090         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1091                  niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));
1092
1093         conn->uc_rx_niov = niov;
1094         conn->uc_rx_nob_wanted = skipped;
1095
1096         conn->uc_rx_state = UC_RX_SKIPPING;
1097 }