Whamcloud - gitweb
b=16150
[fs/lustre-release.git] / lnet / ulnds / socklnd / conn.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/socklnd/conn.c
37  *
38  * Author: Maxim Patlasov <maxim@clusterfs.com>
39  */
40
41 #include "usocklnd.h"
42
43 /* Return 1 if the conn is timed out, 0 else */
44 int
45 usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
46 {
47         if (conn->uc_tx_flag && /* sending is in progress */
48             cfs_time_aftereq(current_time, conn->uc_tx_deadline))
49                 return 1;
50
51         if (conn->uc_rx_flag && /* receiving is in progress */
52             cfs_time_aftereq(current_time, conn->uc_rx_deadline))
53                 return 1;
54         
55         return 0;
56 }
57
58 void
59 usocklnd_conn_kill(usock_conn_t *conn)
60 {
61         pthread_mutex_lock(&conn->uc_lock);
62         if (conn->uc_state != UC_DEAD)
63                 usocklnd_conn_kill_locked(conn);
64         pthread_mutex_unlock(&conn->uc_lock);        
65 }
66
67 /* Mark the conn as DEAD and schedule its deletion */
68 void
69 usocklnd_conn_kill_locked(usock_conn_t *conn)
70 {
71         conn->uc_rx_flag = conn->uc_tx_flag = 0;
72         conn->uc_state = UC_DEAD;
73         usocklnd_add_killrequest(conn);
74 }
75
76 usock_conn_t *
77 usocklnd_conn_allocate()
78 {
79         usock_conn_t        *conn;
80         usock_pollrequest_t *pr;
81
82         LIBCFS_ALLOC (pr, sizeof(*pr));
83         if (pr == NULL)
84                 return NULL;
85         
86         LIBCFS_ALLOC (conn, sizeof(*conn));
87         if (conn == NULL) {
88                 LIBCFS_FREE (pr, sizeof(*pr));
89                 return NULL;
90         }
91         memset(conn, 0, sizeof(*conn));
92         conn->uc_preq = pr;
93
94         LIBCFS_ALLOC (conn->uc_rx_hello,
95                       offsetof(ksock_hello_msg_t,
96                                kshm_ips[LNET_MAX_INTERFACES]));
97         if (conn->uc_rx_hello == NULL) {
98                 LIBCFS_FREE (pr, sizeof(*pr));
99                 LIBCFS_FREE (conn, sizeof(*conn));
100                 return NULL;
101         }
102
103         return conn;
104 }
105
106 void
107 usocklnd_conn_free(usock_conn_t *conn)
108 {
109         usock_pollrequest_t *pr = conn->uc_preq;
110
111         if (pr != NULL)
112                 LIBCFS_FREE (pr, sizeof(*pr));
113
114         if (conn->uc_rx_hello != NULL)
115                 LIBCFS_FREE (conn->uc_rx_hello,
116                              offsetof(ksock_hello_msg_t,
117                                       kshm_ips[LNET_MAX_INTERFACES]));
118         
119         LIBCFS_FREE (conn, sizeof(*conn));
120 }
121
122 void
123 usocklnd_tear_peer_conn(usock_conn_t *conn)
124 {
125         usock_peer_t     *peer = conn->uc_peer;
126         int               idx = usocklnd_type2idx(conn->uc_type);
127         lnet_ni_t        *ni;
128         lnet_process_id_t id;
129         int               decref_flag  = 0;
130         int               killall_flag = 0;
131         
132         if (peer == NULL) /* nothing to tear */
133                 return;
134         
135         pthread_mutex_lock(&peer->up_lock);
136         pthread_mutex_lock(&conn->uc_lock);        
137
138         ni = peer->up_ni;
139         id = peer->up_peerid;
140
141         if (peer->up_conns[idx] == conn) {
142                 if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
143                         /* change state not to finalize twice */
144                         conn->uc_rx_state = UC_RX_KSM_HEADER;
145                         lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);                        
146                 }
147                 
148                 usocklnd_destroy_txlist(peer->up_ni,
149                                         &conn->uc_tx_list);
150
151                 peer->up_conns[idx] = NULL;
152                 conn->uc_peer = NULL;
153                 decref_flag = 1;
154
155                 if(conn->uc_errored && !peer->up_errored)
156                         peer->up_errored = killall_flag = 1;
157         }
158         
159         pthread_mutex_unlock(&conn->uc_lock);
160
161         if (killall_flag)
162                 usocklnd_del_conns_locked(peer);
163
164         pthread_mutex_unlock(&peer->up_lock);
165         
166         if (!decref_flag)
167                 return;
168
169         usocklnd_conn_decref(conn);
170         usocklnd_peer_decref(peer);
171
172         usocklnd_check_peer_stale(ni, id);
173 }
174
175 /* Remove peer from hash list if all up_conns[i] is NULL &&
176  * hash table is the only consumer of the peer */
177 void
178 usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
179 {
180         usock_peer_t *peer;
181         
182         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
183         peer = usocklnd_find_peer_locked(ni, id);
184
185         if (peer == NULL) {
186                 pthread_rwlock_unlock(&usock_data.ud_peers_lock);
187                 return;
188         }
189
190         if (cfs_atomic_read(&peer->up_refcount) == 2) {
191                 int i;
192                 for (i = 0; i < N_CONN_TYPES; i++)
193                         LASSERT (peer->up_conns[i] == NULL);
194
195                 list_del(&peer->up_list);                        
196                 
197                 if (peer->up_errored &&
198                     (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
199                         lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
200                                      cfs_time_seconds(peer->up_last_alive));
201                 
202                 usocklnd_peer_decref(peer);
203         }
204
205         usocklnd_peer_decref(peer);
206         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
207 }
208
209 /* Returns 0 on success, <0 else */
210 int
211 usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
212 {
213         int           rc;
214         __u32         peer_ip;
215         __u16         peer_port;
216         usock_conn_t *conn;
217
218         rc = libcfs_getpeername(fd, &peer_ip, &peer_port);
219         if (rc)
220                 return rc;
221
222         rc = usocklnd_set_sock_options(fd);
223         if (rc)
224                 return rc;
225
226         conn = usocklnd_conn_allocate();
227         if (conn == NULL)
228                 return -ENOMEM;
229
230         usocklnd_rx_hellomagic_state_transition(conn);
231         
232         conn->uc_fd = fd;
233         conn->uc_peer_ip = peer_ip;
234         conn->uc_peer_port = peer_port;
235         conn->uc_state = UC_RECEIVING_HELLO;
236         conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
237         conn->uc_ni = ni;
238         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
239         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
240         pthread_mutex_init(&conn->uc_lock, NULL);
241         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
242
243         *connp = conn;
244         return 0;
245 }
246
247 /* Returns 0 on success, <0 else */
248 int
249 usocklnd_create_active_conn(usock_peer_t *peer, int type,
250                             usock_conn_t **connp)
251 {
252         int           rc;
253         int           fd;
254         usock_conn_t *conn;
255         __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
256         __u16         dst_port = lnet_acceptor_port();
257         
258         conn = usocklnd_conn_allocate();
259         if (conn == NULL)
260                 return -ENOMEM;
261
262         conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
263                                                         peer->up_peerid.nid);
264         if (conn->uc_tx_hello == NULL) {
265                 usocklnd_conn_free(conn);
266                 return -ENOMEM;
267         }                
268         
269         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
270                 rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
271         else
272                 rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);
273         
274         if (rc) {
275                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
276                 usocklnd_conn_free(conn);
277                 return rc;
278         }
279         
280         conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
281         conn->uc_tx_flag = 1;
282         
283         conn->uc_fd = fd;
284         conn->uc_peer_ip = dst_ip;
285         conn->uc_peer_port = dst_port;
286         conn->uc_type = type;
287         conn->uc_activeflag = 1;
288         conn->uc_state = UC_CONNECTING;
289         conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
290         conn->uc_ni = NULL;
291         conn->uc_peerid = peer->up_peerid;
292         conn->uc_peer = peer;
293         usocklnd_peer_addref(peer);
294         CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
295         CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
296         pthread_mutex_init(&conn->uc_lock, NULL);
297         cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
298
299         *connp = conn;
300         return 0;
301 }
302
303 /* Returns 0 on success, <0 else */
304 int
305 usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
306 {
307         __u16 port;
308         int   fd;
309         int   rc;
310
311         for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT; 
312              port >= LNET_ACCEPTOR_MIN_RESERVED_PORT; 
313              port--) {
314                 /* Iterate through reserved ports. */
315
316                 rc = libcfs_sock_create(&fd);
317                 if (rc)
318                         return rc;                        
319                                 
320                 rc = libcfs_sock_bind_to_port(fd, port);
321                 if (rc) {
322                         close(fd);
323                         continue;
324                 }
325
326                 rc = usocklnd_set_sock_options(fd);
327                 if (rc) {
328                         close(fd);
329                         return rc;
330                 }
331
332                 rc = libcfs_sock_connect(fd, dst_ip, dst_port);
333                 if (rc == 0) {
334                         *fdp = fd;
335                         return 0;
336                 }
337                 
338                 if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
339                         close(fd);
340                         return rc;
341                 }
342
343                 close(fd);
344         }
345
346         CERROR("Can't bind to any reserved port\n");
347         return rc;
348 }
349
350 /* Returns 0 on success, <0 else */
351 int
352 usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
353 {
354         int fd;
355         int rc;
356
357         rc = libcfs_sock_create(&fd);
358         if (rc)
359                 return rc;
360         
361         rc = usocklnd_set_sock_options(fd);
362         if (rc) {
363                 close(fd);
364                 return rc;
365         }
366
367         rc = libcfs_sock_connect(fd, dst_ip, dst_port);
368         if (rc) {
369                 close(fd);
370                 return rc;
371         }
372
373         *fdp = fd;
374         return 0;
375 }
376
377 int
378 usocklnd_set_sock_options(int fd)
379 {
380         int rc;
381
382         rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
383         if (rc)
384                 return rc;
385
386         if (usock_tuns.ut_sockbufsiz) {
387                 rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
388                 if (rc)
389                         return rc;        
390         }
391         
392         return libcfs_fcntl_nonblock(fd);
393 }
394
395 void
396 usocklnd_init_msg(ksock_msg_t *msg, int type)
397 {
398         msg->ksm_type           = type;
399         msg->ksm_csum           = 0;
400         msg->ksm_zc_req_cookie  = 0;
401         msg->ksm_zc_ack_cookie  = 0;
402 }
403
404 usock_tx_t *
405 usocklnd_create_noop_tx(__u64 cookie)
406 {
407         usock_tx_t *tx;
408         
409         LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
410         if (tx == NULL)
411                 return NULL;
412
413         tx->tx_size = sizeof(usock_tx_t);
414         tx->tx_lnetmsg = NULL;
415
416         usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
417         tx->tx_msg.ksm_zc_ack_cookie = cookie;
418         
419         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
420         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
421                 offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
422         tx->tx_iov = tx->tx_iova;
423         tx->tx_niov = 1;
424         
425         return tx;
426 }
427         
428 usock_tx_t *
429 usocklnd_create_tx(lnet_msg_t *lntmsg)
430 {
431         usock_tx_t   *tx;
432         unsigned int  payload_niov = lntmsg->msg_niov; 
433         struct iovec *payload_iov = lntmsg->msg_iov; 
434         unsigned int  payload_offset = lntmsg->msg_offset;
435         unsigned int  payload_nob = lntmsg->msg_len;
436         int           size = offsetof(usock_tx_t,
437                                       tx_iova[1 + payload_niov]);
438
439         LIBCFS_ALLOC (tx, size);
440         if (tx == NULL)
441                 return NULL;
442
443         tx->tx_size = size;
444         tx->tx_lnetmsg = lntmsg;
445
446         tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;
447         
448         usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
449         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
450         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
451         tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
452         tx->tx_iov = tx->tx_iova;
453
454         tx->tx_niov = 1 + 
455                 lnet_extract_iov(payload_niov, &tx->tx_iov[1],
456                                  payload_niov, payload_iov,
457                                  payload_offset, payload_nob);
458
459         return tx;
460 }
461
462 void
463 usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
464                         lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
465 {
466         usock_net_t *net = (usock_net_t *)ni->ni_data;
467
468         hello->kshm_magic       = LNET_PROTO_MAGIC;
469         hello->kshm_version     = KSOCK_PROTO_V2;
470         hello->kshm_nips        = 0;
471         hello->kshm_ctype       = type;
472         
473         hello->kshm_dst_incarnation = 0; /* not used */
474         hello->kshm_src_incarnation = net->un_incarnation;
475
476         hello->kshm_src_pid = the_lnet.ln_pid;
477         hello->kshm_src_nid = ni->ni_nid;
478         hello->kshm_dst_nid = peer_nid;
479         hello->kshm_dst_pid = 0; /* not used */
480 }
481
482 usock_tx_t *
483 usocklnd_create_hello_tx(lnet_ni_t *ni,
484                          int type, lnet_nid_t peer_nid)
485 {
486         usock_tx_t        *tx;
487         int                size;
488         ksock_hello_msg_t *hello;
489
490         size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
491         LIBCFS_ALLOC (tx, size);
492         if (tx == NULL)
493                 return NULL;
494
495         tx->tx_size = size;
496         tx->tx_lnetmsg = NULL;
497
498         hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
499         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
500         
501         tx->tx_iova[0].iov_base = (void *)hello;
502         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
503                 offsetof(ksock_hello_msg_t, kshm_ips);
504         tx->tx_iov = tx->tx_iova;
505         tx->tx_niov = 1;
506
507         return tx;
508 }
509
510 usock_tx_t *
511 usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
512                             int type, lnet_nid_t peer_nid)
513 {
514         usock_tx_t              *tx;
515         int                      size;
516         lnet_acceptor_connreq_t *cr;
517         ksock_hello_msg_t       *hello;
518
519         size = sizeof(usock_tx_t) +
520                 sizeof(lnet_acceptor_connreq_t) +
521                 offsetof(ksock_hello_msg_t, kshm_ips);
522         LIBCFS_ALLOC (tx, size);
523         if (tx == NULL)
524                 return NULL;
525
526         tx->tx_size = size;
527         tx->tx_lnetmsg = NULL;
528
529         cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
530         memset(cr, 0, sizeof(*cr));
531         cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
532         cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
533         cr->acr_nid     = peer_nid;
534         
535         hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
536         usocklnd_init_hello_msg(hello, ni, type, peer_nid);
537         
538         tx->tx_iova[0].iov_base = (void *)cr;
539         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
540                 sizeof(lnet_acceptor_connreq_t) +
541                 offsetof(ksock_hello_msg_t, kshm_ips);
542         tx->tx_iov = tx->tx_iova;
543         tx->tx_niov = 1;
544
545         return tx;
546 }
547
548 void
549 usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
550 {
551         lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;
552         int          rc = (tx->tx_resid == 0) ? 0 : -EIO;
553
554         LASSERT (ni != NULL || lnetmsg == NULL);
555
556         LIBCFS_FREE (tx, tx->tx_size);
557         
558         if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
559                 lnet_finalize(ni, lnetmsg, rc);
560 }
561
562 void
563 usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
564 {
565         usock_tx_t *tx;
566
567         while (!list_empty(txlist)) {
568                 tx = list_entry(txlist->next, usock_tx_t, tx_list);
569                 list_del(&tx->tx_list);
570                 
571                 usocklnd_destroy_tx(ni, tx);
572         }
573 }
574
575 void
576 usocklnd_destroy_zcack_list(struct list_head *zcack_list)
577 {
578         usock_zc_ack_t *zcack;
579
580         while (!list_empty(zcack_list)) {
581                 zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
582                 list_del(&zcack->zc_list);
583                 
584                 LIBCFS_FREE (zcack, sizeof(*zcack));
585         }
586 }
587
588 void
589 usocklnd_destroy_peer(usock_peer_t *peer)
590 {
591         usock_net_t *net = peer->up_ni->ni_data;
592         int          i;
593
594         for (i = 0; i < N_CONN_TYPES; i++)
595                 LASSERT (peer->up_conns[i] == NULL);
596
597         LIBCFS_FREE (peer, sizeof (*peer));
598
599         pthread_mutex_lock(&net->un_lock);
600         if(--net->un_peercount == 0)                
601                 pthread_cond_signal(&net->un_cond);
602         pthread_mutex_unlock(&net->un_lock);
603 }
604
605 void
606 usocklnd_destroy_conn(usock_conn_t *conn)
607 {
608         LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
609
610         if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
611                 LASSERT (conn->uc_peer != NULL);
612                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
613         }
614
615         if (!list_empty(&conn->uc_tx_list)) {
616                 LASSERT (conn->uc_peer != NULL);                
617                 usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
618         }
619
620         usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
621         
622         if (conn->uc_peer != NULL)
623                 usocklnd_peer_decref(conn->uc_peer);
624
625         if (conn->uc_ni != NULL)
626                 lnet_ni_decref(conn->uc_ni);
627
628         if (conn->uc_tx_hello)
629                 usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
630
631         usocklnd_conn_free(conn);
632 }
633
634 int
635 usocklnd_get_conn_type(lnet_msg_t *lntmsg)
636 {
637         int nob;
638
639         if (the_lnet.ln_pid & LNET_PID_USERFLAG)
640                 return SOCKLND_CONN_ANY;
641
642         nob = sizeof(ksock_msg_t) + lntmsg->msg_len;
643         
644         if (nob >= usock_tuns.ut_min_bulk)
645                 return SOCKLND_CONN_BULK_OUT;
646         else
647                 return SOCKLND_CONN_CONTROL;
648 }
649
650 int usocklnd_type2idx(int type)
651 {
652         switch (type) {
653         case SOCKLND_CONN_ANY:
654         case SOCKLND_CONN_CONTROL:
655                 return 0;
656         case SOCKLND_CONN_BULK_IN:
657                 return 1;
658         case SOCKLND_CONN_BULK_OUT:
659                 return 2;
660         default:
661                 LBUG();
662         }
663 }
664
665 usock_peer_t *
666 usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
667 {
668         struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
669         struct list_head *tmp;
670         usock_peer_t     *peer;
671
672         list_for_each (tmp, peer_list) {
673
674                 peer = list_entry (tmp, usock_peer_t, up_list);
675
676                 if (peer->up_ni != ni)
677                         continue;
678
679                 if (peer->up_peerid.nid != id.nid ||
680                     peer->up_peerid.pid != id.pid)
681                         continue;
682
683                 usocklnd_peer_addref(peer);
684                 return peer;
685         }
686         return (NULL);
687 }
688
689 int
690 usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
691                      usock_peer_t **peerp)
692 {
693         usock_net_t  *net = ni->ni_data;
694         usock_peer_t *peer;
695         int           i;
696
697         LIBCFS_ALLOC (peer, sizeof (*peer));
698         if (peer == NULL)
699                 return -ENOMEM;
700
701         for (i = 0; i < N_CONN_TYPES; i++)
702                 peer->up_conns[i] = NULL;
703
704         peer->up_peerid       = id;
705         peer->up_ni           = ni;
706         peer->up_incrn_is_set = 0;
707         peer->up_errored      = 0;
708         peer->up_last_alive   = 0;
709         cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
710         pthread_mutex_init(&peer->up_lock, NULL);        
711
712         pthread_mutex_lock(&net->un_lock);
713         net->un_peercount++;        
714         pthread_mutex_unlock(&net->un_lock);
715
716         *peerp = peer;
717         return 0;
718 }
719
720 /* Safely create new peer if needed. Save result in *peerp.
721  * Returns 0 on success, <0 else */
722 int
723 usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
724                              usock_peer_t **peerp)
725 {
726         int           rc;
727         usock_peer_t *peer;
728         usock_peer_t *peer2;
729         usock_net_t  *net = ni->ni_data;
730
731         pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
732         peer = usocklnd_find_peer_locked(ni, id);
733         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
734
735         if (peer != NULL)
736                 goto find_or_create_peer_done;
737
738         rc = usocklnd_create_peer(ni, id, &peer);
739         if (rc)
740                 return rc;
741         
742         pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
743         peer2 = usocklnd_find_peer_locked(ni, id);
744         if (peer2 == NULL) {
745                 if (net->un_shutdown) {
746                         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
747                         usocklnd_peer_decref(peer); /* should destroy peer */
748                         CERROR("Can't create peer: network shutdown\n");
749                         return -ESHUTDOWN;
750                 }
751                 
752                 /* peer table will take 1 of my refs on peer */
753                 usocklnd_peer_addref(peer);
754                 list_add_tail (&peer->up_list,
755                                usocklnd_nid2peerlist(id.nid));
756         } else {
757                 usocklnd_peer_decref(peer); /* should destroy peer */
758                 peer = peer2;
759         }
760         pthread_rwlock_unlock(&usock_data.ud_peers_lock);
761         
762   find_or_create_peer_done:        
763         *peerp = peer;
764         return 0;
765 }
766
767 /* NB: both peer and conn locks are held */
768 static int
769 usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
770 {        
771         if (conn->uc_state == UC_READY &&
772             list_empty(&conn->uc_tx_list) &&
773             list_empty(&conn->uc_zcack_list) &&
774             !conn->uc_sending) {
775                 int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
776                                                   POLLOUT);
777                 if (rc != 0)
778                         return rc;
779         }                
780
781         list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
782         return 0;
783 }
784
785 /* NB: both peer and conn locks are held
786  * NB: if sending isn't in progress.  the caller *MUST* send tx
787  * immediately after we'll return */
788 static void
789 usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
790                     int *send_immediately)
791 {        
792         if (conn->uc_state == UC_READY &&
793             list_empty(&conn->uc_tx_list) &&
794             list_empty(&conn->uc_zcack_list) &&
795             !conn->uc_sending) {
796                 conn->uc_sending = 1;
797                 *send_immediately = 1;
798                 return;
799         }                
800
801         *send_immediately = 0;
802         list_add_tail(&tx->tx_list, &conn->uc_tx_list);
803 }
804
805 /* Safely create new conn if needed. Save result in *connp.
806  * Returns 0 on success, <0 else */
807 int
808 usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
809                              usock_conn_t **connp,
810                              usock_tx_t *tx, usock_zc_ack_t *zc_ack,
811                              int *send_immediately)
812 {
813         usock_conn_t *conn;
814         int           idx;
815         int           rc;
816         lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
817         
818         if (userflag)
819                 type = SOCKLND_CONN_ANY;
820
821         idx = usocklnd_type2idx(type);
822         
823         pthread_mutex_lock(&peer->up_lock);
824         if (peer->up_conns[idx] != NULL) {
825                 conn = peer->up_conns[idx];
826                 LASSERT(conn->uc_type == type);
827         } else {
828                 if (userflag) {
829                         CERROR("Refusing to create a connection to "
830                                "userspace process %s\n",
831                                libcfs_id2str(peer->up_peerid));
832                         rc = -EHOSTUNREACH;
833                         goto find_or_create_conn_failed;
834                 }
835                 
836                 rc = usocklnd_create_active_conn(peer, type, &conn);
837                 if (rc) {
838                         peer->up_errored = 1;
839                         usocklnd_del_conns_locked(peer);
840                         goto find_or_create_conn_failed;
841                 }
842
843                 /* peer takes 1 of conn refcount */
844                 usocklnd_link_conn_to_peer(conn, peer, idx);
845                 
846                 rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
847                 if (rc) {
848                         peer->up_conns[idx] = NULL;
849                         usocklnd_conn_decref(conn); /* should destroy conn */
850                         goto find_or_create_conn_failed;
851                 }
852                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
853         }
854         
855         pthread_mutex_lock(&conn->uc_lock);
856         LASSERT(conn->uc_peer == peer);
857
858         LASSERT(tx == NULL || zc_ack == NULL);
859         if (tx != NULL) {
860                 usocklnd_enqueue_tx(conn, tx, send_immediately);
861         } else {
862                 rc = usocklnd_enqueue_zcack(conn, zc_ack);        
863                 if (rc != 0) {
864                         usocklnd_conn_kill_locked(conn);
865                         pthread_mutex_unlock(&conn->uc_lock);
866                         goto find_or_create_conn_failed;
867                 }
868         }
869         pthread_mutex_unlock(&conn->uc_lock);         
870
871         usocklnd_conn_addref(conn);
872         pthread_mutex_unlock(&peer->up_lock);
873
874         *connp = conn;
875         return 0;
876
877   find_or_create_conn_failed:
878         pthread_mutex_unlock(&peer->up_lock);
879         return rc;
880 }
881
882 void
883 usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
884 {
885         peer->up_conns[idx] = conn;        
886         peer->up_errored    = 0; /* this new fresh conn will try
887                                   * revitalize even stale errored peer */
888 }
889
890 int
891 usocklnd_invert_type(int type)
892 {
893         switch (type)
894         {
895         case SOCKLND_CONN_ANY:
896         case SOCKLND_CONN_CONTROL:
897                 return (type);
898         case SOCKLND_CONN_BULK_IN:
899                 return SOCKLND_CONN_BULK_OUT;
900         case SOCKLND_CONN_BULK_OUT:
901                 return SOCKLND_CONN_BULK_IN;
902         default:
903                 return SOCKLND_CONN_NONE;
904         }
905 }
906
907 void
908 usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
909 {
910         pthread_mutex_lock(&conn->uc_lock);
911         if (conn->uc_state != UC_DEAD)
912                 conn->uc_state = new_state;
913         pthread_mutex_unlock(&conn->uc_lock);
914 }
915
916 /* NB: peer is locked by caller */
917 void
918 usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
919                              usock_conn_t *skip_conn)
920 {
921         int i;
922         
923         if (!peer->up_incrn_is_set) {
924                 peer->up_incarnation = incrn;
925                 peer->up_incrn_is_set = 1;
926                 return;
927         }
928
929         if (peer->up_incarnation == incrn)
930                 return;
931
932         peer->up_incarnation = incrn;
933         
934         for (i = 0; i < N_CONN_TYPES; i++) {
935                 usock_conn_t *conn = peer->up_conns[i];
936                 
937                 if (conn == NULL || conn == skip_conn)
938                         continue;
939
940                 pthread_mutex_lock(&conn->uc_lock);        
941                 LASSERT (conn->uc_peer == peer);
942                 conn->uc_peer = NULL;
943                 peer->up_conns[i] = NULL;
944                 if (conn->uc_state != UC_DEAD)
945                         usocklnd_conn_kill_locked(conn);                
946                 pthread_mutex_unlock(&conn->uc_lock);
947
948                 usocklnd_conn_decref(conn);
949                 usocklnd_peer_decref(peer);
950         }
951 }
952
953 /* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
954  * MAGIC part of hello and set uc_rx_state
955  */
956 void
957 usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
958 {
959         LASSERT(conn->uc_rx_hello != NULL);
960
961         conn->uc_rx_niov = 1;
962         conn->uc_rx_iov = conn->uc_rx_iova;
963         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
964         conn->uc_rx_iov[0].iov_len =
965                 conn->uc_rx_nob_wanted =
966                 conn->uc_rx_nob_left =
967                 sizeof(conn->uc_rx_hello->kshm_magic);
968
969         conn->uc_rx_state = UC_RX_HELLO_MAGIC;
970
971         conn->uc_rx_flag = 1; /* waiting for incoming hello */
972         conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
973 }
974
975 /* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
976  * VERSION part of hello and set uc_rx_state
977  */
978 void
979 usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
980 {
981         LASSERT(conn->uc_rx_hello != NULL);
982
983         conn->uc_rx_niov = 1;
984         conn->uc_rx_iov = conn->uc_rx_iova;
985         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
986         conn->uc_rx_iov[0].iov_len =
987                 conn->uc_rx_nob_wanted =
988                 conn->uc_rx_nob_left =
989                 sizeof(conn->uc_rx_hello->kshm_version);
990         
991         conn->uc_rx_state = UC_RX_HELLO_VERSION;
992 }
993
994 /* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
995  * the rest  of hello and set uc_rx_state
996  */
997 void
998 usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
999 {
1000         LASSERT(conn->uc_rx_hello != NULL);
1001
1002         conn->uc_rx_niov = 1;
1003         conn->uc_rx_iov = conn->uc_rx_iova;
1004         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
1005         conn->uc_rx_iov[0].iov_len =
1006                 conn->uc_rx_nob_wanted =
1007                 conn->uc_rx_nob_left =
1008                 offsetof(ksock_hello_msg_t, kshm_ips) -
1009                 offsetof(ksock_hello_msg_t, kshm_src_nid);
1010         
1011         conn->uc_rx_state = UC_RX_HELLO_BODY;
1012 }
1013
1014 /* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
1015  * array of IPs and set uc_rx_state
1016  */
1017 void
1018 usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
1019 {
1020         LASSERT(conn->uc_rx_hello != NULL);
1021
1022         conn->uc_rx_niov = 1;
1023         conn->uc_rx_iov = conn->uc_rx_iova;
1024         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
1025         conn->uc_rx_iov[0].iov_len =
1026                 conn->uc_rx_nob_wanted =
1027                 conn->uc_rx_nob_left =
1028                 conn->uc_rx_hello->kshm_nips *
1029                 sizeof(conn->uc_rx_hello->kshm_ips[0]);
1030         
1031         conn->uc_rx_state = UC_RX_HELLO_IPS;
1032 }
1033
1034 /* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
1035  * LNET header and set uc_rx_state
1036  */
1037 void
1038 usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
1039 {
1040         conn->uc_rx_niov = 1;
1041         conn->uc_rx_iov = conn->uc_rx_iova;
1042         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;                
1043         conn->uc_rx_iov[0].iov_len =
1044                 conn->uc_rx_nob_wanted =
1045                 conn->uc_rx_nob_left =
1046                 sizeof(ksock_lnet_msg_t);
1047         
1048         conn->uc_rx_state = UC_RX_LNET_HEADER;
1049         conn->uc_rx_flag = 1;
1050 }
1051
1052 /* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
1053  * KSM header and set uc_rx_state
1054  */
1055 void
1056 usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
1057 {
1058         conn->uc_rx_niov = 1;
1059         conn->uc_rx_iov = conn->uc_rx_iova;
1060         conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;                
1061         conn->uc_rx_iov[0].iov_len =
1062                 conn->uc_rx_nob_wanted =
1063                 conn->uc_rx_nob_left =                        
1064                 offsetof(ksock_msg_t, ksm_u);
1065         
1066         conn->uc_rx_state = UC_RX_KSM_HEADER;
1067         conn->uc_rx_flag = 0;
1068 }
1069
1070 /* RX state transition to UC_RX_SKIPPING: update RX part for
1071  * skipping and set uc_rx_state
1072  */
1073 void
1074 usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
1075 {
1076         static char    skip_buffer[4096];
1077
1078         int            nob;
1079         unsigned int   niov = 0;
1080         int            skipped = 0;
1081         int            nob_to_skip = conn->uc_rx_nob_left;
1082         
1083         LASSERT(nob_to_skip != 0);
1084
1085         conn->uc_rx_iov = conn->uc_rx_iova;
1086
1087         /* Set up to skip as much as possible now.  If there's more left
1088          * (ran out of iov entries) we'll get called again */
1089
1090         do {
1091                 nob = MIN (nob_to_skip, sizeof(skip_buffer));
1092
1093                 conn->uc_rx_iov[niov].iov_base = skip_buffer;
1094                 conn->uc_rx_iov[niov].iov_len  = nob;
1095                 niov++;
1096                 skipped += nob;
1097                 nob_to_skip -=nob;
1098
1099         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1100                  niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));
1101
1102         conn->uc_rx_niov = niov;
1103         conn->uc_rx_nob_wanted = skipped;
1104
1105         conn->uc_rx_state = UC_RX_SKIPPING;
1106 }