Whamcloud - gitweb
LU-1146 build: batch update copyright messages
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/klnds/socklnd/socklnd.c
39  *
40  * Author: Zach Brown <zab@zabbo.net>
41  * Author: Peter J. Braam <braam@clusterfs.com>
42  * Author: Phil Schwan <phil@clusterfs.com>
43  * Author: Eric Barton <eric@bartonsoftware.com>
44  */
45
46 #include "socklnd.h"
47
48 lnd_t                   the_ksocklnd;        /* socklnd's lnd_t object; presumably what gets registered with LNet -- its setup is not in this part of the file */
49 ksock_nal_data_t        ksocknal_data;       /* driver-wide state used below: ksnd_global_lock, the ksnd_peers hash table, schedulers and per-IRQ info */
50
51 ksock_interface_t *
52 ksocknal_ip2iface(lnet_ni_t *ni, __u32 ip)
53 {
54         ksock_net_t       *net = ni->ni_data;
55         int                i;
56         ksock_interface_t *iface;
57
58         for (i = 0; i < net->ksnn_ninterfaces; i++) {
59                 LASSERT(i < LNET_MAX_INTERFACES);
60                 iface = &net->ksnn_interfaces[i];
61
62                 if (iface->ksni_ipaddr == ip)
63                         return (iface);
64         }
65
66         return (NULL);
67 }
68
69 ksock_route_t *
70 ksocknal_create_route (__u32 ipaddr, int port)
71 {
72         ksock_route_t *route;
73
74         LIBCFS_ALLOC (route, sizeof (*route));
75         if (route == NULL)
76                 return (NULL);
77
78         cfs_atomic_set (&route->ksnr_refcount, 1);
79         route->ksnr_peer = NULL;
80         route->ksnr_retry_interval = 0;         /* OK to connect at any time */
81         route->ksnr_ipaddr = ipaddr;
82         route->ksnr_port = port;
83         route->ksnr_scheduled = 0;
84         route->ksnr_connecting = 0;
85         route->ksnr_connected = 0;
86         route->ksnr_deleted = 0;
87         route->ksnr_conn_count = 0;
88         route->ksnr_share_count = 0;
89
90         return (route);
91 }
92
93 void
94 ksocknal_destroy_route (ksock_route_t *route)
95 {
96         LASSERT (cfs_atomic_read(&route->ksnr_refcount) == 0);
97
98         if (route->ksnr_peer != NULL)
99                 ksocknal_peer_decref(route->ksnr_peer);
100
101         LIBCFS_FREE (route, sizeof (*route));
102 }
103
104 int
105 ksocknal_create_peer (ksock_peer_t **peerp, lnet_ni_t *ni, lnet_process_id_t id)
106 {
107         ksock_net_t   *net = ni->ni_data;
108         ksock_peer_t  *peer;
109
110         LASSERT (id.nid != LNET_NID_ANY);
111         LASSERT (id.pid != LNET_PID_ANY);
112         LASSERT (!cfs_in_interrupt());
113
114         LIBCFS_ALLOC (peer, sizeof (*peer));
115         if (peer == NULL)
116                 return -ENOMEM;
117
118         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
119
120         peer->ksnp_ni = ni;
121         peer->ksnp_id = id;
122         cfs_atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
123         peer->ksnp_closing = 0;
124         peer->ksnp_accepting = 0;
125         peer->ksnp_proto = NULL;
126         peer->ksnp_last_alive = 0;
127         peer->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
128
129         CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
130         CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
131         CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
132         CFS_INIT_LIST_HEAD (&peer->ksnp_zc_req_list);
133         cfs_spin_lock_init(&peer->ksnp_lock);
134
135         cfs_spin_lock_bh (&net->ksnn_lock);
136
137         if (net->ksnn_shutdown) {
138                 cfs_spin_unlock_bh (&net->ksnn_lock);
139
140                 LIBCFS_FREE(peer, sizeof(*peer));
141                 CERROR("Can't create peer: network shutdown\n");
142                 return -ESHUTDOWN;
143         }
144
145         net->ksnn_npeers++;
146
147         cfs_spin_unlock_bh (&net->ksnn_lock);
148
149         *peerp = peer;
150         return 0;
151 }
152
153 void
154 ksocknal_destroy_peer (ksock_peer_t *peer)
155 {
156         ksock_net_t    *net = peer->ksnp_ni->ni_data;
157
158         CDEBUG (D_NET, "peer %s %p deleted\n",
159                 libcfs_id2str(peer->ksnp_id), peer);
160
161         LASSERT (cfs_atomic_read (&peer->ksnp_refcount) == 0);
162         LASSERT (peer->ksnp_accepting == 0);
163         LASSERT (cfs_list_empty (&peer->ksnp_conns));
164         LASSERT (cfs_list_empty (&peer->ksnp_routes));
165         LASSERT (cfs_list_empty (&peer->ksnp_tx_queue));
166         LASSERT (cfs_list_empty (&peer->ksnp_zc_req_list));
167
168         LIBCFS_FREE (peer, sizeof (*peer));
169
170         /* NB a peer's connections and routes keep a reference on their peer
171          * until they are destroyed, so we can be assured that _all_ state to
172          * do with this peer has been cleaned up when its refcount drops to
173          * zero. */
174         cfs_spin_lock_bh (&net->ksnn_lock);
175         net->ksnn_npeers--;
176         cfs_spin_unlock_bh (&net->ksnn_lock);
177 }
178
179 ksock_peer_t *
180 ksocknal_find_peer_locked (lnet_ni_t *ni, lnet_process_id_t id)
181 {
182         cfs_list_t       *peer_list = ksocknal_nid2peerlist(id.nid);
183         cfs_list_t       *tmp;
184         ksock_peer_t     *peer;
185
186         cfs_list_for_each (tmp, peer_list) {
187
188                 peer = cfs_list_entry (tmp, ksock_peer_t, ksnp_list);
189
190                 LASSERT (!peer->ksnp_closing);
191
192                 if (peer->ksnp_ni != ni)
193                         continue;
194
195                 if (peer->ksnp_id.nid != id.nid ||
196                     peer->ksnp_id.pid != id.pid)
197                         continue;
198
199                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
200                        peer, libcfs_id2str(id),
201                        cfs_atomic_read(&peer->ksnp_refcount));
202                 return (peer);
203         }
204         return (NULL);
205 }
206
207 ksock_peer_t *
208 ksocknal_find_peer (lnet_ni_t *ni, lnet_process_id_t id)
209 {
210         ksock_peer_t     *peer;
211
212         cfs_read_lock (&ksocknal_data.ksnd_global_lock);
213         peer = ksocknal_find_peer_locked (ni, id);
214         if (peer != NULL)                       /* +1 ref for caller? */
215                 ksocknal_peer_addref(peer);
216         cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
217
218         return (peer);
219 }
220
221 void
222 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
223 {
224         int                i;
225         __u32              ip;
226         ksock_interface_t *iface;
227
228         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
229                 LASSERT (i < LNET_MAX_INTERFACES);
230                 ip = peer->ksnp_passive_ips[i];
231
232                 iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
233                 /* All IPs in peer->ksnp_passive_ips[] come from the
234                  * interface list, therefore the call must succeed. */
235                 LASSERT (iface != NULL);
236
237                 CDEBUG(D_NET, "peer=%p iface=%p ksni_nroutes=%d\n",
238                        peer, iface, iface->ksni_nroutes);
239                 iface->ksni_npeers--;
240         }
241
242         LASSERT (cfs_list_empty(&peer->ksnp_conns));
243         LASSERT (cfs_list_empty(&peer->ksnp_routes));
244         LASSERT (!peer->ksnp_closing);
245         peer->ksnp_closing = 1;
246         cfs_list_del (&peer->ksnp_list);
247         /* lose peerlist's ref */
248         ksocknal_peer_decref(peer);
249 }
250
251 int
252 ksocknal_get_peer_info (lnet_ni_t *ni, int index,
253                         lnet_process_id_t *id, __u32 *myip, __u32 *peer_ip,
254                         int *port, int *conn_count, int *share_count)
255 {
256         ksock_peer_t      *peer;
257         cfs_list_t        *ptmp;
258         ksock_route_t     *route;
259         cfs_list_t        *rtmp;
260         int                i;
261         int                j;
262         int                rc = -ENOENT;
263
264         cfs_read_lock (&ksocknal_data.ksnd_global_lock);
265
266         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
267
268                 cfs_list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
269                         peer = cfs_list_entry (ptmp, ksock_peer_t, ksnp_list);
270
271                         if (peer->ksnp_ni != ni)
272                                 continue;
273
274                         if (peer->ksnp_n_passive_ips == 0 &&
275                             cfs_list_empty(&peer->ksnp_routes)) {
276                                 if (index-- > 0)
277                                         continue;
278
279                                 *id = peer->ksnp_id;
280                                 *myip = 0;
281                                 *peer_ip = 0;
282                                 *port = 0;
283                                 *conn_count = 0;
284                                 *share_count = 0;
285                                 rc = 0;
286                                 goto out;
287                         }
288
289                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
290                                 if (index-- > 0)
291                                         continue;
292
293                                 *id = peer->ksnp_id;
294                                 *myip = peer->ksnp_passive_ips[j];
295                                 *peer_ip = 0;
296                                 *port = 0;
297                                 *conn_count = 0;
298                                 *share_count = 0;
299                                 rc = 0;
300                                 goto out;
301                         }
302
303                         cfs_list_for_each (rtmp, &peer->ksnp_routes) {
304                                 if (index-- > 0)
305                                         continue;
306
307                                 route = cfs_list_entry(rtmp, ksock_route_t,
308                                                        ksnr_list);
309
310                                 *id = peer->ksnp_id;
311                                 *myip = route->ksnr_myipaddr;
312                                 *peer_ip = route->ksnr_ipaddr;
313                                 *port = route->ksnr_port;
314                                 *conn_count = route->ksnr_conn_count;
315                                 *share_count = route->ksnr_share_count;
316                                 rc = 0;
317                                 goto out;
318                         }
319                 }
320         }
321  out:
322         cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
323         return (rc);
324 }
325
326 void
327 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
328 {
329         ksock_peer_t      *peer = route->ksnr_peer;
330         int                type = conn->ksnc_type;
331         ksock_interface_t *iface;
332
333         conn->ksnc_route = route;
334         ksocknal_route_addref(route);
335
336         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
337                 if (route->ksnr_myipaddr == 0) {
338                         /* route wasn't bound locally yet (the initial route) */
339                         CDEBUG(D_NET, "Binding %s %u.%u.%u.%u to %u.%u.%u.%u\n",
340                                libcfs_id2str(peer->ksnp_id),
341                                HIPQUAD(route->ksnr_ipaddr),
342                                HIPQUAD(conn->ksnc_myipaddr));
343                 } else {
344                         CDEBUG(D_NET, "Rebinding %s %u.%u.%u.%u from "
345                                "%u.%u.%u.%u to %u.%u.%u.%u\n",
346                                libcfs_id2str(peer->ksnp_id),
347                                HIPQUAD(route->ksnr_ipaddr),
348                                HIPQUAD(route->ksnr_myipaddr),
349                                HIPQUAD(conn->ksnc_myipaddr));
350
351                         iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
352                                                   route->ksnr_myipaddr);
353                         if (iface != NULL)
354                                 iface->ksni_nroutes--;
355                 }
356                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
357                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
358                                           route->ksnr_myipaddr);
359                 if (iface != NULL)
360                         iface->ksni_nroutes++;
361         }
362
363         route->ksnr_connected |= (1<<type);
364         route->ksnr_conn_count++;
365
366         /* Successful connection => further attempts can
367          * proceed immediately */
368         route->ksnr_retry_interval = 0;
369 }
370
371 void
372 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
373 {
374         cfs_list_t        *tmp;
375         ksock_conn_t      *conn;
376         ksock_route_t     *route2;
377
378         LASSERT (!peer->ksnp_closing);
379         LASSERT (route->ksnr_peer == NULL);
380         LASSERT (!route->ksnr_scheduled);
381         LASSERT (!route->ksnr_connecting);
382         LASSERT (route->ksnr_connected == 0);
383
384         /* LASSERT(unique) */
385         cfs_list_for_each(tmp, &peer->ksnp_routes) {
386                 route2 = cfs_list_entry(tmp, ksock_route_t, ksnr_list);
387
388                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
389                         CERROR ("Duplicate route %s %u.%u.%u.%u\n",
390                                 libcfs_id2str(peer->ksnp_id),
391                                 HIPQUAD(route->ksnr_ipaddr));
392                         LBUG();
393                 }
394         }
395
396         route->ksnr_peer = peer;
397         ksocknal_peer_addref(peer);
398         /* peer's routelist takes over my ref on 'route' */
399         cfs_list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
400
401         cfs_list_for_each(tmp, &peer->ksnp_conns) {
402                 conn = cfs_list_entry(tmp, ksock_conn_t, ksnc_list);
403
404                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
405                         continue;
406
407                 ksocknal_associate_route_conn_locked(route, conn);
408                 /* keep going (typed routes) */
409         }
410 }
411
412 void
413 ksocknal_del_route_locked (ksock_route_t *route)
414 {
415         ksock_peer_t      *peer = route->ksnr_peer;
416         ksock_interface_t *iface;
417         ksock_conn_t      *conn;
418         cfs_list_t        *ctmp;
419         cfs_list_t        *cnxt;
420
421         LASSERT (!route->ksnr_deleted);
422
423         /* Close associated conns */
424         cfs_list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
425                 conn = cfs_list_entry(ctmp, ksock_conn_t, ksnc_list);
426
427                 if (conn->ksnc_route != route)
428                         continue;
429
430                 ksocknal_close_conn_locked (conn, 0);
431         }
432
433         if (route->ksnr_myipaddr != 0) {
434                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
435                                           route->ksnr_myipaddr);
436                 if (iface != NULL)
437                         iface->ksni_nroutes--;
438         }
439
440         route->ksnr_deleted = 1;
441         cfs_list_del (&route->ksnr_list);
442         ksocknal_route_decref(route);             /* drop peer's ref */
443
444         if (cfs_list_empty (&peer->ksnp_routes) &&
445             cfs_list_empty (&peer->ksnp_conns)) {
446                 /* I've just removed the last route to a peer with no active
447                  * connections */
448                 ksocknal_unlink_peer_locked (peer);
449         }
450 }
451
452 int
453 ksocknal_add_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ipaddr, int port)
454 {
455         cfs_list_t        *tmp;
456         ksock_peer_t      *peer;
457         ksock_peer_t      *peer2;
458         ksock_route_t     *route;
459         ksock_route_t     *route2;
460         int                rc;
461
462         if (id.nid == LNET_NID_ANY ||
463             id.pid == LNET_PID_ANY)
464                 return (-EINVAL);
465
466         /* Have a brand new peer ready... */
467         rc = ksocknal_create_peer(&peer, ni, id);
468         if (rc != 0)
469                 return rc;
470
471         route = ksocknal_create_route (ipaddr, port);
472         if (route == NULL) {
473                 ksocknal_peer_decref(peer);
474                 return (-ENOMEM);
475         }
476
477         cfs_write_lock_bh (&ksocknal_data.ksnd_global_lock);
478
479         /* always called with a ref on ni, so shutdown can't have started */
480         LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);
481
482         peer2 = ksocknal_find_peer_locked (ni, id);
483         if (peer2 != NULL) {
484                 ksocknal_peer_decref(peer);
485                 peer = peer2;
486         } else {
487                 /* peer table takes my ref on peer */
488                 cfs_list_add_tail (&peer->ksnp_list,
489                                    ksocknal_nid2peerlist (id.nid));
490         }
491
492         route2 = NULL;
493         cfs_list_for_each (tmp, &peer->ksnp_routes) {
494                 route2 = cfs_list_entry(tmp, ksock_route_t, ksnr_list);
495
496                 if (route2->ksnr_ipaddr == ipaddr)
497                         break;
498
499                 route2 = NULL;
500         }
501         if (route2 == NULL) {
502                 ksocknal_add_route_locked(peer, route);
503                 route->ksnr_share_count++;
504         } else {
505                 ksocknal_route_decref(route);
506                 route2->ksnr_share_count++;
507         }
508
509         cfs_write_unlock_bh (&ksocknal_data.ksnd_global_lock);
510
511         return (0);
512 }
513
514 void
515 ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip)
516 {
517         ksock_conn_t     *conn;
518         ksock_route_t    *route;
519         cfs_list_t       *tmp;
520         cfs_list_t       *nxt;
521         int               nshared;
522
523         LASSERT (!peer->ksnp_closing);
524
525         /* Extra ref prevents peer disappearing until I'm done with it */
526         ksocknal_peer_addref(peer);
527
528         cfs_list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
529                 route = cfs_list_entry(tmp, ksock_route_t, ksnr_list);
530
531                 /* no match */
532                 if (!(ip == 0 || route->ksnr_ipaddr == ip))
533                         continue;
534
535                 route->ksnr_share_count = 0;
536                 /* This deletes associated conns too */
537                 ksocknal_del_route_locked (route);
538         }
539
540         nshared = 0;
541         cfs_list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
542                 route = cfs_list_entry(tmp, ksock_route_t, ksnr_list);
543                 nshared += route->ksnr_share_count;
544         }
545
546         if (nshared == 0) {
547                 /* remove everything else if there are no explicit entries
548                  * left */
549
550                 cfs_list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
551                         route = cfs_list_entry(tmp, ksock_route_t, ksnr_list);
552
553                         /* we should only be removing auto-entries */
554                         LASSERT(route->ksnr_share_count == 0);
555                         ksocknal_del_route_locked (route);
556                 }
557
558                 cfs_list_for_each_safe (tmp, nxt, &peer->ksnp_conns) {
559                         conn = cfs_list_entry(tmp, ksock_conn_t, ksnc_list);
560
561                         ksocknal_close_conn_locked(conn, 0);
562                 }
563         }
564
565         ksocknal_peer_decref(peer);
566         /* NB peer unlinks itself when last conn/route is removed */
567 }
568
569 int
570 ksocknal_del_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ip)
571 {
572         CFS_LIST_HEAD     (zombies);
573         cfs_list_t        *ptmp;
574         cfs_list_t        *pnxt;
575         ksock_peer_t      *peer;
576         int                lo;
577         int                hi;
578         int                i;
579         int                rc = -ENOENT;
580
581         cfs_write_lock_bh (&ksocknal_data.ksnd_global_lock);
582
583         if (id.nid != LNET_NID_ANY)
584                 lo = hi = (int)(ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers);
585         else {
586                 lo = 0;
587                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
588         }
589
590         for (i = lo; i <= hi; i++) {
591                 cfs_list_for_each_safe (ptmp, pnxt,
592                                         &ksocknal_data.ksnd_peers[i]) {
593                         peer = cfs_list_entry (ptmp, ksock_peer_t, ksnp_list);
594
595                         if (peer->ksnp_ni != ni)
596                                 continue;
597
598                         if (!((id.nid == LNET_NID_ANY || peer->ksnp_id.nid == id.nid) &&
599                               (id.pid == LNET_PID_ANY || peer->ksnp_id.pid == id.pid)))
600                                 continue;
601
602                         ksocknal_peer_addref(peer);     /* a ref for me... */
603
604                         ksocknal_del_peer_locked (peer, ip);
605
606                         if (peer->ksnp_closing &&
607                             !cfs_list_empty(&peer->ksnp_tx_queue)) {
608                                 LASSERT (cfs_list_empty(&peer->ksnp_conns));
609                                 LASSERT (cfs_list_empty(&peer->ksnp_routes));
610
611                                 cfs_list_splice_init(&peer->ksnp_tx_queue,
612                                                      &zombies);
613                         }
614
615                         ksocknal_peer_decref(peer);     /* ...till here */
616
617                         rc = 0;                 /* matched! */
618                 }
619         }
620
621         cfs_write_unlock_bh (&ksocknal_data.ksnd_global_lock);
622
623         ksocknal_txlist_done(ni, &zombies, 1);
624
625         return (rc);
626 }
627
628 ksock_conn_t *
629 ksocknal_get_conn_by_idx (lnet_ni_t *ni, int index)
630 {
631         ksock_peer_t      *peer;
632         cfs_list_t        *ptmp;
633         ksock_conn_t      *conn;
634         cfs_list_t        *ctmp;
635         int                i;
636
637         cfs_read_lock (&ksocknal_data.ksnd_global_lock);
638
639         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
640                 cfs_list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
641                         peer = cfs_list_entry (ptmp, ksock_peer_t, ksnp_list);
642
643                         LASSERT (!peer->ksnp_closing);
644
645                         if (peer->ksnp_ni != ni)
646                                 continue;
647
648                         cfs_list_for_each (ctmp, &peer->ksnp_conns) {
649                                 if (index-- > 0)
650                                         continue;
651
652                                 conn = cfs_list_entry (ctmp, ksock_conn_t,
653                                                        ksnc_list);
654                                 ksocknal_conn_addref(conn);
655                                 cfs_read_unlock (&ksocknal_data. \
656                                                  ksnd_global_lock);
657                                 return (conn);
658                         }
659                 }
660         }
661
662         cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
663         return (NULL);
664 }
665
666 ksock_sched_t *
667 ksocknal_choose_scheduler_locked (unsigned int irq)
668 {
669         ksock_sched_t    *sched;
670         ksock_irqinfo_t  *info;
671         int               i;
672
673         LASSERT (irq < CFS_NR_IRQS);
674         info = &ksocknal_data.ksnd_irqinfo[irq];
675
676         if (irq != 0 &&                         /* hardware NIC */
677             info->ksni_valid) {                 /* already set up */
678                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
679         }
680
681         /* software NIC (irq == 0) || not associated with a scheduler yet.
682          * Choose the CPU with the fewest connections... */
683         sched = &ksocknal_data.ksnd_schedulers[0];
684         for (i = 1; i < ksocknal_data.ksnd_nschedulers; i++)
685                 if (sched->kss_nconns >
686                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
687                         sched = &ksocknal_data.ksnd_schedulers[i];
688
689         if (irq != 0) {                         /* Hardware NIC */
690                 info->ksni_valid = 1;
691                 info->ksni_sched = (unsigned int)(sched - ksocknal_data.ksnd_schedulers);
692
693                 /* no overflow... */
694                 LASSERT (info->ksni_sched == (unsigned int)(sched - ksocknal_data.ksnd_schedulers));
695         }
696
697         return (sched);
698 }
699
700 int
701 ksocknal_local_ipvec (lnet_ni_t *ni, __u32 *ipaddrs)
702 {
703         ksock_net_t       *net = ni->ni_data;
704         int                i;
705         int                nip;
706
707         cfs_read_lock (&ksocknal_data.ksnd_global_lock);
708
709         nip = net->ksnn_ninterfaces;
710         LASSERT (nip <= LNET_MAX_INTERFACES);
711
712         /* Only offer interfaces for additional connections if I have 
713          * more than one. */
714         if (nip < 2) {
715                 cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
716                 return 0;
717         }
718
719         for (i = 0; i < nip; i++) {
720                 ipaddrs[i] = net->ksnn_interfaces[i].ksni_ipaddr;
721                 LASSERT (ipaddrs[i] != 0);
722         }
723
724         cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
725         return (nip);
726 }
727
728 int
729 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
730 {
731         int   best_netmatch = 0;
732         int   best_xor      = 0;
733         int   best          = -1;
734         int   this_xor;
735         int   this_netmatch;
736         int   i;
737
738         for (i = 0; i < nips; i++) {
739                 if (ips[i] == 0)
740                         continue;
741
742                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
743                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
744
745                 if (!(best < 0 ||
746                       best_netmatch < this_netmatch ||
747                       (best_netmatch == this_netmatch &&
748                        best_xor > this_xor)))
749                         continue;
750
751                 best = i;
752                 best_netmatch = this_netmatch;
753                 best_xor = this_xor;
754         }
755
756         LASSERT (best >= 0);
757         return (best);
758 }
759
760 int
761 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
762 {
763         cfs_rwlock_t       *global_lock = &ksocknal_data.ksnd_global_lock;
764         ksock_net_t        *net = peer->ksnp_ni->ni_data;
765         ksock_interface_t  *iface;
766         ksock_interface_t  *best_iface;
767         int                 n_ips;
768         int                 i;
769         int                 j;
770         int                 k;
771         __u32               ip;
772         __u32               xor;
773         int                 this_netmatch;
774         int                 best_netmatch;
775         int                 best_npeers;
776
777         /* CAVEAT EMPTOR: We do all our interface matching with an
778          * exclusive hold of global lock at IRQ priority.  We're only
779          * expecting to be dealing with small numbers of interfaces, so the
780          * O(n**3)-ness shouldn't matter */
781
782         /* Also note that I'm not going to return more than n_peerips
783          * interfaces, even if I have more myself */
784
785         cfs_write_lock_bh (global_lock);
786
787         LASSERT (n_peerips <= LNET_MAX_INTERFACES);
788         LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
789
790         /* Only match interfaces for additional connections 
791          * if I have > 1 interface */
792         n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
793                 MIN(n_peerips, net->ksnn_ninterfaces);
794
795         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
796                 /*              ^ yes really... */
797
798                 /* If we have any new interfaces, first tick off all the
799                  * peer IPs that match old interfaces, then choose new
800                  * interfaces to match the remaining peer IPS.
801                  * We don't forget interfaces we've stopped using; we might
802                  * start using them again... */
803
804                 if (i < peer->ksnp_n_passive_ips) {
805                         /* Old interface. */
806                         ip = peer->ksnp_passive_ips[i];
807                         best_iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
808
809                         /* peer passive ips are kept up to date */
810                         LASSERT(best_iface != NULL);
811                 } else {
812                         /* choose a new interface */
813                         LASSERT (i == peer->ksnp_n_passive_ips);
814
815                         best_iface = NULL;
816                         best_netmatch = 0;
817                         best_npeers = 0;
818
819                         for (j = 0; j < net->ksnn_ninterfaces; j++) {
820                                 iface = &net->ksnn_interfaces[j];
821                                 ip = iface->ksni_ipaddr;
822
823                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
824                                         if (peer->ksnp_passive_ips[k] == ip)
825                                                 break;
826
827                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
828                                         continue;
829
830                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
831                                 xor = (ip ^ peerips[k]);
832                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
833
834                                 if (!(best_iface == NULL ||
835                                       best_netmatch < this_netmatch ||
836                                       (best_netmatch == this_netmatch &&
837                                        best_npeers > iface->ksni_npeers)))
838                                         continue;
839
840                                 best_iface = iface;
841                                 best_netmatch = this_netmatch;
842                                 best_npeers = iface->ksni_npeers;
843                         }
844
845                         best_iface->ksni_npeers++;
846                         ip = best_iface->ksni_ipaddr;
847                         peer->ksnp_passive_ips[i] = ip;
848                         peer->ksnp_n_passive_ips = i+1;
849                 }
850
851                 LASSERT (best_iface != NULL);
852
853                 /* mark the best matching peer IP used */
854                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
855                 peerips[j] = 0;
856         }
857
858         /* Overwrite input peer IP addresses */
859         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
860
861         cfs_write_unlock_bh (global_lock);
862
863         return (n_ips);
864 }
865
866 void
867 ksocknal_create_routes(ksock_peer_t *peer, int port,
868                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
869 {
870         ksock_route_t       *newroute = NULL;
871         cfs_rwlock_t        *global_lock = &ksocknal_data.ksnd_global_lock;
872         lnet_ni_t           *ni = peer->ksnp_ni;
873         ksock_net_t         *net = ni->ni_data;
874         cfs_list_t          *rtmp;
875         ksock_route_t       *route;
876         ksock_interface_t   *iface;
877         ksock_interface_t   *best_iface;
878         int                  best_netmatch;
879         int                  this_netmatch;
880         int                  best_nroutes;
881         int                  i;
882         int                  j;
883
884         /* CAVEAT EMPTOR: We do all our interface matching with an
885          * exclusive hold of global lock at IRQ priority.  We're only
886          * expecting to be dealing with small numbers of interfaces, so the
887          * O(n**3)-ness here shouldn't matter */
888
889         cfs_write_lock_bh (global_lock);
890
891         if (net->ksnn_ninterfaces < 2) {
892                 /* Only create additional connections 
893                  * if I have > 1 interface */
894                 cfs_write_unlock_bh (global_lock);
895                 return;
896         }
897
898         LASSERT (npeer_ipaddrs <= LNET_MAX_INTERFACES);
899
900         for (i = 0; i < npeer_ipaddrs; i++) {
901                 if (newroute != NULL) {
902                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
903                 } else {
904                         cfs_write_unlock_bh (global_lock);
905
906                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
907                         if (newroute == NULL)
908                                 return;
909
910                         cfs_write_lock_bh (global_lock);
911                 }
912
913                 if (peer->ksnp_closing) {
914                         /* peer got closed under me */
915                         break;
916                 }
917
918                 /* Already got a route? */
919                 route = NULL;
920                 cfs_list_for_each(rtmp, &peer->ksnp_routes) {
921                         route = cfs_list_entry(rtmp, ksock_route_t, ksnr_list);
922
923                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
924                                 break;
925
926                         route = NULL;
927                 }
928                 if (route != NULL)
929                         continue;
930
931                 best_iface = NULL;
932                 best_nroutes = 0;
933                 best_netmatch = 0;
934
935                 LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
936
937                 /* Select interface to connect from */
938                 for (j = 0; j < net->ksnn_ninterfaces; j++) {
939                         iface = &net->ksnn_interfaces[j];
940
941                         /* Using this interface already? */
942                         cfs_list_for_each(rtmp, &peer->ksnp_routes) {
943                                 route = cfs_list_entry(rtmp, ksock_route_t,
944                                                        ksnr_list);
945
946                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
947                                         break;
948
949                                 route = NULL;
950                         }
951                         if (route != NULL)
952                                 continue;
953
954                         this_netmatch = (((iface->ksni_ipaddr ^
955                                            newroute->ksnr_ipaddr) &
956                                            iface->ksni_netmask) == 0) ? 1 : 0;
957
958                         if (!(best_iface == NULL ||
959                               best_netmatch < this_netmatch ||
960                               (best_netmatch == this_netmatch &&
961                                best_nroutes > iface->ksni_nroutes)))
962                                 continue;
963
964                         best_iface = iface;
965                         best_netmatch = this_netmatch;
966                         best_nroutes = iface->ksni_nroutes;
967                 }
968
969                 if (best_iface == NULL)
970                         continue;
971
972                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
973                 best_iface->ksni_nroutes++;
974
975                 ksocknal_add_route_locked(peer, newroute);
976                 newroute = NULL;
977         }
978
979         cfs_write_unlock_bh (global_lock);
980         if (newroute != NULL)
981                 ksocknal_route_decref(newroute);
982 }
983
984 int
985 ksocknal_accept (lnet_ni_t *ni, cfs_socket_t *sock)
986 {
987         ksock_connreq_t    *cr;
988         int                 rc;
989         __u32               peer_ip;
990         int                 peer_port;
991
992         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
993         LASSERT (rc == 0);                      /* we succeeded before */
994
995         LIBCFS_ALLOC(cr, sizeof(*cr));
996         if (cr == NULL) {
997                 LCONSOLE_ERROR_MSG(0x12f, "Dropping connection request from "
998                                    "%u.%u.%u.%u: memory exhausted\n",
999                                    HIPQUAD(peer_ip));
1000                 return -ENOMEM;
1001         }
1002
1003         lnet_ni_addref(ni);
1004         cr->ksncr_ni   = ni;
1005         cr->ksncr_sock = sock;
1006
1007         cfs_spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
1008
1009         cfs_list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
1010         cfs_waitq_signal(&ksocknal_data.ksnd_connd_waitq);
1011
1012         cfs_spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
1013         return 0;
1014 }
1015
1016 int
1017 ksocknal_connecting (ksock_peer_t *peer, __u32 ipaddr)
1018 {
1019         ksock_route_t   *route;
1020
1021         cfs_list_for_each_entry_typed (route, &peer->ksnp_routes,
1022                                        ksock_route_t, ksnr_list) {
1023
1024                 if (route->ksnr_ipaddr == ipaddr)
1025                         return route->ksnr_connecting;
1026         }
1027         return 0;
1028 }
1029
/*
 * Build a connection object around 'sock' on interface 'ni', exchange HELLO
 * messages with the remote side and, if everything checks out, attach the
 * new conn to its peer and let I/O proceed.
 *
 * 'route' is non-NULL for an active connection (one made from a route of
 * mine) and NULL for a passive one; 'type' must differ from
 * SOCKLND_CONN_NONE exactly when 'route' is supplied.  An active conn sends
 * its HELLO first and then reads the peer's; a passive conn reads the
 * peer's HELLO first (learning the peer ID and protocol from it) and
 * replies afterwards.
 *
 * Return value is 'rc':
 *  - 0 when the conn was established (or, on an active attempt, when an
 *    equivalent conn already existed and this one was quietly dropped);
 *  - a negative errno (-ENOMEM, -ESTALE, or whatever a helper returned);
 *  - a positive EALREADY/ESTALE/EPROTO when the attempt was abandoned for a
 *    reason that is logged at debug level only; on passive conns the peer
 *    is told to retry by a HELLO reply of type SOCKLND_CONN_NONE.
 *
 * Cleanup labels unwind in order: failed_2 (global lock held on entry;
 * drops it, then my peer ref), failed_1 (frees 'hello' and 'conn'),
 * failed_0 (releases 'sock').  Once the conn has been linked to the peer,
 * later errors close it through ksocknal_close_conn_locked() instead.
 */
1030 int
1031 ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route,
1032                       cfs_socket_t *sock, int type)
1033 {
1034         cfs_rwlock_t      *global_lock = &ksocknal_data.ksnd_global_lock;
1035         CFS_LIST_HEAD     (zombies);
1036         lnet_process_id_t  peerid;
1037         cfs_list_t        *tmp;
1038         __u64              incarnation;
1039         ksock_conn_t      *conn;
1040         ksock_conn_t      *conn2;
1041         ksock_peer_t      *peer = NULL;
1042         ksock_peer_t      *peer2;
1043         ksock_sched_t     *sched;
1044         ksock_hello_msg_t *hello;
1045         unsigned int       irq;
1046         ksock_tx_t        *tx;
1047         ksock_tx_t        *txtmp;
1048         int                rc;
1049         int                active;
1050         char              *warn = NULL;
1051
             /* a route is only supplied for connections I initiate */
1052         active = (route != NULL);
1053
1054         LASSERT (active == (type != SOCKLND_CONN_NONE));
1055
1056         irq = ksocknal_lib_sock_irq (sock);
1057
1058         LIBCFS_ALLOC(conn, sizeof(*conn));
1059         if (conn == NULL) {
1060                 rc = -ENOMEM;
1061                 goto failed_0;
1062         }
1063
1064         memset (conn, 0, sizeof (*conn));
1065
1066         conn->ksnc_peer = NULL;
1067         conn->ksnc_route = NULL;
1068         conn->ksnc_sock = sock;
1069         /* 2 ref, 1 for conn, another extra ref prevents socket
1070          * being closed before establishment of connection */
1071         cfs_atomic_set (&conn->ksnc_sock_refcount, 2);
1072         conn->ksnc_type = type;
1073         ksocknal_lib_save_callback(sock, conn);
1074         cfs_atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
1075
1076         conn->ksnc_rx_ready = 0;
1077         conn->ksnc_rx_scheduled = 0;
1078
1079         CFS_INIT_LIST_HEAD (&conn->ksnc_tx_queue);
1080         conn->ksnc_tx_ready = 0;
1081         conn->ksnc_tx_scheduled = 0;
1082         conn->ksnc_tx_carrier = NULL;
1083         cfs_atomic_set (&conn->ksnc_tx_nob, 0);
1084
             /* HELLO buffer with room for LNET_MAX_INTERFACES IP addresses;
              * the frees below must use the same size expression */
1085         LIBCFS_ALLOC(hello, offsetof(ksock_hello_msg_t,
1086                                      kshm_ips[LNET_MAX_INTERFACES]));
1087         if (hello == NULL) {
1088                 rc = -ENOMEM;
1089                 goto failed_1;
1090         }
1091
1092         /* stash conn's local and remote addrs */
1093         rc = ksocknal_lib_get_conn_addrs (conn);
1094         if (rc != 0)
1095                 goto failed_1;
1096
1097         /* Find out/confirm peer's NID and connection type and get the
1098          * vector of interfaces she's willing to let me connect to.
1099          * Passive connections use the listener timeout since the peer sends
1100          * eagerly */
1101
1102         if (active) {
1103                 peer = route->ksnr_peer;
1104                 LASSERT(ni == peer->ksnp_ni);
1105
1106                 /* Active connection sends HELLO eagerly */
1107                 hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
1108                 peerid = peer->ksnp_id;
1109
1110                 cfs_write_lock_bh(global_lock);
1111                 conn->ksnc_proto = peer->ksnp_proto;
1112                 cfs_write_unlock_bh(global_lock);
1113
                     /* no protocol recorded on the peer yet: start with v3.x
                      * (debug builds may force v2.x/v1.x via the tunable) */
1114                 if (conn->ksnc_proto == NULL) {
1115                          conn->ksnc_proto = &ksocknal_protocol_v3x;
1116 #if SOCKNAL_VERSION_DEBUG
1117                          if (*ksocknal_tunables.ksnd_protocol == 2)
1118                                  conn->ksnc_proto = &ksocknal_protocol_v2x;
1119                          else if (*ksocknal_tunables.ksnd_protocol == 1)
1120                                  conn->ksnc_proto = &ksocknal_protocol_v1x;
1121 #endif
1122                 }
1123
1124                 rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
1125                 if (rc != 0)
1126                         goto failed_1;
1127         } else {
1128                 peerid.nid = LNET_NID_ANY;
1129                 peerid.pid = LNET_PID_ANY;
1130
1131                 /* Passive, get protocol from peer */
1132                 conn->ksnc_proto = NULL;
1133         }
1134
1135         rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
1136         if (rc < 0)
1137                 goto failed_1;
1138
             /* a positive rc is only expected on active connections; it is
              * carried along and dispatched by the switch (rc) further down */
1139         LASSERT (rc == 0 || active);
1140         LASSERT (conn->ksnc_proto != NULL);
1141         LASSERT (peerid.nid != LNET_NID_ANY);
1142
1143         if (active) {
1144                 ksocknal_peer_addref(peer);
1145                 cfs_write_lock_bh (global_lock);
1146         } else {
1147                 rc = ksocknal_create_peer(&peer, ni, peerid);
1148                 if (rc != 0)
1149                         goto failed_1;
1150
1151                 cfs_write_lock_bh (global_lock);
1152
1153                 /* called with a ref on ni, so shutdown can't have started */
1154                 LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);
1155
1156                 peer2 = ksocknal_find_peer_locked(ni, peerid);
1157                 if (peer2 == NULL) {
1158                         /* NB this puts an "empty" peer in the peer
1159                          * table (which takes my ref) */
1160                         cfs_list_add_tail(&peer->ksnp_list,
1161                                           ksocknal_nid2peerlist(peerid.nid));
1162                 } else {
                             /* peer is already in the table: discard the one
                              * just created and use the existing entry */
1163                         ksocknal_peer_decref(peer);
1164                         peer = peer2;
1165                 }
1166
1167                 /* +1 ref for me */
1168                 ksocknal_peer_addref(peer);
1169                 peer->ksnp_accepting++;
1170
1171                 /* Am I already connecting to this guy?  Resolve in
1172                  * favour of higher NID... */
1173                 if (peerid.nid < ni->ni_nid &&
1174                     ksocknal_connecting(peer, conn->ksnc_ipaddr)) {
                             /* positive rc: failed_2 logs at debug level and
                              * replies with a CONN_NONE HELLO */
1175                         rc = EALREADY;
1176                         warn = "connection race resolution";
1177                         goto failed_2;
1178                 }
1179         }
1180
             /* NB global_lock is held for writing from here until either
              * the unlock on the success path or the one at failed_2 */
1181         if (peer->ksnp_closing ||
1182             (active && route->ksnr_deleted)) {
1183                 /* peer/route got closed under me */
1184                 rc = -ESTALE;
1185                 warn = "peer/route removed";
1186                 goto failed_2;
1187         }
1188
1189         if (peer->ksnp_proto == NULL) {
1190                 /* Never connected before.
1191                  * NB recv_hello may have returned EPROTO to signal my peer
1192                  * wants a different protocol than the one I asked for.
1193                  */
1194                 LASSERT (cfs_list_empty(&peer->ksnp_conns));
1195
1196                 peer->ksnp_proto = conn->ksnc_proto;
1197                 peer->ksnp_incarnation = incarnation;
1198         }
1199
1200         if (peer->ksnp_proto != conn->ksnc_proto ||
1201             peer->ksnp_incarnation != incarnation) {
1202                 /* Peer rebooted or I've got the wrong protocol version */
1203                 ksocknal_close_peer_conns_locked(peer, 0, 0);
1204
1205                 peer->ksnp_proto = NULL;
1206                 rc = ESTALE;
1207                 warn = peer->ksnp_incarnation != incarnation ?
1208                        "peer rebooted" :
1209                        "wrong proto version";
1210                 goto failed_2;
1211         }
1212
             /* dispatch on the rc left by ksocknal_recv_hello(); any value
              * other than 0, EALREADY or EPROTO is a bug */
1213         switch (rc) {
1214         default:
1215                 LBUG();
1216         case 0:
1217                 break;
1218         case EALREADY:
1219                 warn = "lost conn race";
1220                 goto failed_2;
1221         case EPROTO:
1222                 warn = "retry with different protocol version";
1223                 goto failed_2;
1224         }
1225
1226         /* Refuse to duplicate an existing connection, unless this is a
1227          * loopback connection */
1228         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1229                 cfs_list_for_each(tmp, &peer->ksnp_conns) {
1230                         conn2 = cfs_list_entry(tmp, ksock_conn_t, ksnc_list);
1231
1232                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1233                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1234                             conn2->ksnc_type != conn->ksnc_type)
1235                                 continue;
1236
1237                         /* Reply on a passive connection attempt so the peer
1238                          * realises we're connected. */
1239                         LASSERT (rc == 0);
1240                         if (!active)
1241                                 rc = EALREADY;
1242
                             /* NB an active duplicate leaves rc == 0, so the
                              * function returns 0 after dropping this conn */
1243                         warn = "duplicate";
1244                         goto failed_2;
1245                 }
1246         }
1247
1248         /* If the connection created by this route didn't bind to the IP
1249          * address the route connected to, the connection/route matching
1250          * code below probably isn't going to work. */
1251         if (active &&
1252             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1253                 CERROR("Route %s %u.%u.%u.%u connected to %u.%u.%u.%u\n",
1254                        libcfs_id2str(peer->ksnp_id),
1255                        HIPQUAD(route->ksnr_ipaddr),
1256                        HIPQUAD(conn->ksnc_ipaddr));
1257         }
1258
1259         /* Search for a route corresponding to the new connection and
1260          * create an association.  This allows incoming connections created
1261          * by routes in my peer to match my own route entries so I don't
1262          * continually create duplicate routes. */
             /* NB 'route' is reused as the loop cursor here; the caller's
              * route is not referenced again ('active' was latched above) */
1263         cfs_list_for_each (tmp, &peer->ksnp_routes) {
1264                 route = cfs_list_entry(tmp, ksock_route_t, ksnr_list);
1265
1266                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1267                         continue;
1268
1269                 ksocknal_associate_route_conn_locked(route, conn);
1270                 break;
1271         }
1272
1273         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
1274         peer->ksnp_last_alive = cfs_time_current();
1275         peer->ksnp_send_keepalive = 0;
1276         peer->ksnp_error = 0;
1277
1278         sched = ksocknal_choose_scheduler_locked (irq);
1279         sched->kss_nconns++;
1280         conn->ksnc_scheduler = sched;
1281
1282         conn->ksnc_tx_last_post = cfs_time_current();
1283         /* Set the deadline for the outgoing HELLO to drain */
1284         conn->ksnc_tx_bufnob = libcfs_sock_wmem_queued(sock);
1285         conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
1286         cfs_mb();   /* order with adding to peer's conn list */
1287
             /* the peer's conn list holds its own reference on the conn */
1288         cfs_list_add (&conn->ksnc_list, &peer->ksnp_conns);
1289         ksocknal_conn_addref(conn);
1290
1291         ksocknal_new_packet(conn, 0);
1292
1293         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1294
1295         /* Take packets blocking for this connection. */
1296         cfs_list_for_each_entry_safe(tx, txtmp, &peer->ksnp_tx_queue, tx_list) {
1297                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) == SOCKNAL_MATCH_NO)
1298                                 continue;
1299
1300                 cfs_list_del (&tx->tx_list);
1301                 ksocknal_queue_tx_locked (tx, conn);
1302         }
1303
1304         cfs_write_unlock_bh (global_lock);
1305
1306         /* We've now got a new connection.  Any errors from here on are just
1307          * like "normal" comms errors and we close the connection normally.
1308          * NB (a) we still have to send the reply HELLO for passive
1309          *        connections, 
1310          *    (b) normal I/O on the conn is blocked until I setup and call the
1311          *        socket callbacks.
1312          */
1313
1314         ksocknal_lib_bind_irq (irq);
1315
1316         CDEBUG(D_NET, "New conn %s p %d.x %u.%u.%u.%u -> %u.%u.%u.%u/%d"
1317                " incarnation:"LPD64" sched[%d]/%d\n",
1318                libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
1319                HIPQUAD(conn->ksnc_myipaddr), HIPQUAD(conn->ksnc_ipaddr),
1320                conn->ksnc_port, incarnation,
1321                (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
1322
             /* rc is 0 at this point (the switch above let only 0 through);
              * only the passive branch below can change it */
1323         if (active) {
1324                 /* additional routes after interface exchange? */
1325                 ksocknal_create_routes(peer, conn->ksnc_port,
1326                                        hello->kshm_ips, hello->kshm_nips);
1327         } else {
1328                 hello->kshm_nips = ksocknal_select_ips(peer, hello->kshm_ips,
1329                                                        hello->kshm_nips);
1330                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1331         }
1332
1333         LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1334                                     kshm_ips[LNET_MAX_INTERFACES]));
1335
1336         /* setup the socket AFTER I've received hello (it disables
1337          * SO_LINGER).  I might call back to the acceptor who may want
1338          * to send a protocol version response and then close the
1339          * socket; this ensures the socket only tears down after the
1340          * response has been sent. */
1341         if (rc == 0)
1342                 rc = ksocknal_lib_setup_sock(sock);
1343
1344         cfs_write_lock_bh(global_lock);
1345
1346         /* NB my callbacks block while I hold ksnd_global_lock */
1347         ksocknal_lib_set_callback(sock, conn);
1348
1349         if (!active)
1350                 peer->ksnp_accepting--;
1351
1352         cfs_write_unlock_bh(global_lock);
1353
1354         if (rc != 0) {
1355                 cfs_write_lock_bh(global_lock);
1356                 if (!conn->ksnc_closing) {
1357                         /* could be closed by another thread */
1358                         ksocknal_close_conn_locked(conn, rc);
1359                 }
1360                 cfs_write_unlock_bh(global_lock);
1361         } else if (ksocknal_connsock_addref(conn) == 0) {
1362                 /* Allow I/O to proceed. */
1363                 ksocknal_read_callback(conn);
1364                 ksocknal_write_callback(conn);
1365                 ksocknal_connsock_decref(conn);
1366         }
1367
             /* drop the extra socket ref and the conn ref ("1 ref for me")
              * that were both set up when the conn was initialised above */
1368         ksocknal_connsock_decref(conn);
1369         ksocknal_conn_decref(conn);
1370         return rc;
1371
             /* failed_2: entered with global_lock held for writing and with
              * my ref on 'peer'; the conn has not been linked to the peer */
1372  failed_2:
1373         if (!peer->ksnp_closing &&
1374             cfs_list_empty (&peer->ksnp_conns) &&
1375             cfs_list_empty (&peer->ksnp_routes)) {
                     /* link 'zombies' into the peer's tx queue ring and then
                      * unhook the queue head, leaving any queued TXs on
                      * 'zombies' for ksocknal_txlist_done() below */
1376                 cfs_list_add(&zombies, &peer->ksnp_tx_queue);
1377                 cfs_list_del_init(&peer->ksnp_tx_queue);
1378                 ksocknal_unlink_peer_locked(peer);
1379         }
1380
1381         cfs_write_unlock_bh (global_lock);
1382
             /* negative rc is reported as an error, anything else only at
              * D_NET debug level */
1383         if (warn != NULL) {
1384                 if (rc < 0)
1385                         CERROR("Not creating conn %s type %d: %s\n",
1386                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1387                 else
1388                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1389                               libcfs_id2str(peerid), conn->ksnc_type, warn);
1390         }
1391
1392         if (!active) {
1393                 if (rc > 0) {
1394                         /* Request retry by replying with CONN_NONE 
1395                          * ksnc_proto has been set already */
1396                         conn->ksnc_type = SOCKLND_CONN_NONE;
1397                         hello->kshm_nips = 0;
1398                         ksocknal_send_hello(ni, conn, peerid.nid, hello);
1399                 }
1400
                     /* undo the ksnp_accepting++ done on the passive path */
1401                 cfs_write_lock_bh(global_lock);
1402                 peer->ksnp_accepting--;
1403                 cfs_write_unlock_bh(global_lock);
1404         }
1405
1406         ksocknal_txlist_done(ni, &zombies, 1);
1407         ksocknal_peer_decref(peer);
1408
             /* failed_1: 'conn' is allocated; 'hello' may be NULL if its
              * allocation was what failed */
1409  failed_1:
1410         if (hello != NULL)
1411                 LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1412                                             kshm_ips[LNET_MAX_INTERFACES]));
1413
1414         LIBCFS_FREE (conn, sizeof(*conn));
1415
             /* failed_0: nothing but the socket is left to release */
1416  failed_0:
1417         libcfs_sock_release(sock);
1418         return rc;
1419 }
1420
1421 void
1422 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
1423 {
1424         /* This just does the immmediate housekeeping, and queues the
1425          * connection for the reaper to terminate.
1426          * Caller holds ksnd_global_lock exclusively in irq context */
1427         ksock_peer_t      *peer = conn->ksnc_peer;
1428         ksock_route_t     *route;
1429         ksock_conn_t      *conn2;
1430         cfs_list_t        *tmp;
1431
1432         LASSERT (peer->ksnp_error == 0);
1433         LASSERT (!conn->ksnc_closing);
1434         conn->ksnc_closing = 1;
1435
1436         /* ksnd_deathrow_conns takes over peer's ref */
1437         cfs_list_del (&conn->ksnc_list);
1438
1439         route = conn->ksnc_route;
1440         if (route != NULL) {
1441                 /* dissociate conn from route... */
1442                 LASSERT (!route->ksnr_deleted);
1443                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
1444
1445                 conn2 = NULL;
1446                 cfs_list_for_each(tmp, &peer->ksnp_conns) {
1447                         conn2 = cfs_list_entry(tmp, ksock_conn_t, ksnc_list);
1448
1449                         if (conn2->ksnc_route == route &&
1450                             conn2->ksnc_type == conn->ksnc_type)
1451                                 break;
1452
1453                         conn2 = NULL;
1454                 }
1455                 if (conn2 == NULL)
1456                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1457
1458                 conn->ksnc_route = NULL;
1459
1460 #if 0           /* irrelevent with only eager routes */
1461                 /* make route least favourite */
1462                 cfs_list_del (&route->ksnr_list);
1463                 cfs_list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
1464 #endif
1465                 ksocknal_route_decref(route);     /* drop conn's ref on route */
1466         }
1467
1468         if (cfs_list_empty (&peer->ksnp_conns)) {
1469                 /* No more connections to this peer */
1470
1471                 if (!cfs_list_empty(&peer->ksnp_tx_queue)) {
1472                         ksock_tx_t *tx;
1473
1474                         LASSERT (conn->ksnc_proto == &ksocknal_protocol_v3x);
1475
1476                         /* throw them to the last connection...,
1477                          * these TXs will be send to /dev/null by scheduler */
1478                         cfs_list_for_each_entry(tx, &peer->ksnp_tx_queue,
1479                                                 tx_list)
1480                                 ksocknal_tx_prep(conn, tx);
1481
1482                         cfs_spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1483                         cfs_list_splice_init(&peer->ksnp_tx_queue,
1484                                              &conn->ksnc_tx_queue);
1485                         cfs_spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1486                 }
1487
1488                 peer->ksnp_proto = NULL;        /* renegotiate protocol version */
1489                 peer->ksnp_error = error;       /* stash last conn close reason */
1490
1491                 if (cfs_list_empty (&peer->ksnp_routes)) {
1492                         /* I've just closed last conn belonging to a
1493                          * peer with no routes to it */
1494                         ksocknal_unlink_peer_locked (peer);
1495                 }
1496         }
1497
1498         cfs_spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
1499
1500         cfs_list_add_tail (&conn->ksnc_list,
1501                            &ksocknal_data.ksnd_deathrow_conns);
1502         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1503
1504         cfs_spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
1505 }
1506
1507 void
1508 ksocknal_peer_failed (ksock_peer_t *peer)
1509 {
1510         int        notify = 0;
1511         cfs_time_t last_alive = 0;
1512
1513         /* There has been a connection failure or comms error; but I'll only
1514          * tell LNET I think the peer is dead if it's to another kernel and
1515          * there are no connections or connection attempts in existance. */
1516
1517         cfs_read_lock (&ksocknal_data.ksnd_global_lock);
1518
1519         if ((peer->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1520             cfs_list_empty(&peer->ksnp_conns) &&
1521             peer->ksnp_accepting == 0 &&
1522             ksocknal_find_connecting_route_locked(peer) == NULL) {
1523                 notify = 1;
1524                 last_alive = peer->ksnp_last_alive;
1525         }
1526
1527         cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
1528
1529         if (notify)
1530                 lnet_notify (peer->ksnp_ni, peer->ksnp_id.nid, 0,
1531                              last_alive);
1532 }
1533
1534 void
1535 ksocknal_finalize_zcreq(ksock_conn_t *conn)
1536 {
1537         ksock_peer_t     *peer = conn->ksnc_peer;
1538         ksock_tx_t       *tx;
1539         ksock_tx_t       *tmp;
1540         CFS_LIST_HEAD    (zlist);
1541
1542         /* NB safe to finalize TXs because closing of socket will
1543          * abort all buffered data */
1544         LASSERT (conn->ksnc_sock == NULL);
1545
1546         cfs_spin_lock(&peer->ksnp_lock);
1547
1548         cfs_list_for_each_entry_safe_typed(tx, tmp, &peer->ksnp_zc_req_list,
1549                                            ksock_tx_t, tx_zc_list) {
1550                 if (tx->tx_conn != conn)
1551                         continue;
1552
1553                 LASSERT (tx->tx_msg.ksm_zc_cookies[0] != 0);
1554
1555                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1556                 tx->tx_zc_aborted = 1; /* mark it as not-acked */
1557                 cfs_list_del(&tx->tx_zc_list);
1558                 cfs_list_add(&tx->tx_zc_list, &zlist);
1559         }
1560
1561         cfs_spin_unlock(&peer->ksnp_lock);
1562
1563         while (!cfs_list_empty(&zlist)) {
1564                 tx = cfs_list_entry(zlist.next, ksock_tx_t, tx_zc_list);
1565
1566                 cfs_list_del(&tx->tx_zc_list);
1567                 ksocknal_tx_decref(tx);
1568         }
1569 }
1570
1571 void
1572 ksocknal_terminate_conn (ksock_conn_t *conn)
1573 {
1574         /* This gets called by the reaper (guaranteed thread context) to
1575          * disengage the socket from its callbacks and close it.
1576          * ksnc_refcount will eventually hit zero, and then the reaper will
1577          * destroy it. */
1578         ksock_peer_t     *peer = conn->ksnc_peer;
1579         ksock_sched_t    *sched = conn->ksnc_scheduler;
1580         int               failed = 0;
1581
1582         LASSERT(conn->ksnc_closing);
1583
1584         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1585         cfs_spin_lock_bh (&sched->kss_lock);
1586
1587         /* a closing conn is always ready to tx */
1588         conn->ksnc_tx_ready = 1;
1589
1590         if (!conn->ksnc_tx_scheduled &&
1591             !cfs_list_empty(&conn->ksnc_tx_queue)){
1592                 cfs_list_add_tail (&conn->ksnc_tx_list,
1593                                &sched->kss_tx_conns);
1594                 conn->ksnc_tx_scheduled = 1;
1595                 /* extra ref for scheduler */
1596                 ksocknal_conn_addref(conn);
1597
1598                 cfs_waitq_signal (&sched->kss_waitq);
1599         }
1600
1601         cfs_spin_unlock_bh (&sched->kss_lock);
1602
1603         /* serialise with callbacks */
1604         cfs_write_lock_bh (&ksocknal_data.ksnd_global_lock);
1605
1606         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1607
1608         /* OK, so this conn may not be completely disengaged from its
1609          * scheduler yet, but it _has_ committed to terminate... */
1610         conn->ksnc_scheduler->kss_nconns--;
1611
1612         if (peer->ksnp_error != 0) {
1613                 /* peer's last conn closed in error */
1614                 LASSERT (cfs_list_empty (&peer->ksnp_conns));
1615                 failed = 1;
1616                 peer->ksnp_error = 0;     /* avoid multiple notifications */
1617         }
1618
1619         cfs_write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1620
1621         if (failed)
1622                 ksocknal_peer_failed(peer);
1623
1624         /* The socket is closed on the final put; either here, or in
1625          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1626          * when the connection was established, this will close the socket
1627          * immediately, aborting anything buffered in it. Any hung
1628          * zero-copy transmits will therefore complete in finite time. */
1629         ksocknal_connsock_decref(conn);
1630 }
1631
1632 void
1633 ksocknal_queue_zombie_conn (ksock_conn_t *conn)
1634 {
1635         /* Queue the conn for the reaper to destroy */
1636
1637         LASSERT (cfs_atomic_read(&conn->ksnc_conn_refcount) == 0);
1638         cfs_spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
1639
1640         cfs_list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1641         cfs_waitq_signal(&ksocknal_data.ksnd_reaper_waitq);
1642
1643         cfs_spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
1644 }
1645
1646 void
1647 ksocknal_destroy_conn (ksock_conn_t *conn)
1648 {
1649         cfs_time_t      last_rcv;
1650
1651         /* Final coup-de-grace of the reaper */
1652         CDEBUG (D_NET, "connection %p\n", conn);
1653
1654         LASSERT (cfs_atomic_read (&conn->ksnc_conn_refcount) == 0);
1655         LASSERT (cfs_atomic_read (&conn->ksnc_sock_refcount) == 0);
1656         LASSERT (conn->ksnc_sock == NULL);
1657         LASSERT (conn->ksnc_route == NULL);
1658         LASSERT (!conn->ksnc_tx_scheduled);
1659         LASSERT (!conn->ksnc_rx_scheduled);
1660         LASSERT (cfs_list_empty(&conn->ksnc_tx_queue));
1661
1662         /* complete current receive if any */
1663         switch (conn->ksnc_rx_state) {
1664         case SOCKNAL_RX_LNET_PAYLOAD:
1665                 last_rcv = conn->ksnc_rx_deadline -
1666                            cfs_time_seconds(*ksocknal_tunables.ksnd_timeout);
1667                 CERROR("Completing partial receive from %s[%d]"
1668                        ", ip %d.%d.%d.%d:%d, with error, wanted: %d, left: %d, "
1669                        "last alive is %ld secs ago\n",
1670                        libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_type,
1671                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1672                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1673                        cfs_duration_sec(cfs_time_sub(cfs_time_current(),
1674                                         last_rcv)));
1675                 lnet_finalize (conn->ksnc_peer->ksnp_ni,
1676                                conn->ksnc_cookie, -EIO);
1677                 break;
1678         case SOCKNAL_RX_LNET_HEADER:
1679                 if (conn->ksnc_rx_started)
1680                         CERROR("Incomplete receive of lnet header from %s"
1681                                ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
1682                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1683                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1684                                conn->ksnc_proto->pro_version);
1685                 break;
1686         case SOCKNAL_RX_KSM_HEADER:
1687                 if (conn->ksnc_rx_started)
1688                         CERROR("Incomplete receive of ksock message from %s"
1689                                ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
1690                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1691                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1692                                conn->ksnc_proto->pro_version);
1693                 break;
1694         case SOCKNAL_RX_SLOP:
1695                 if (conn->ksnc_rx_started)
1696                         CERROR("Incomplete receive of slops from %s"
1697                                ", ip %d.%d.%d.%d:%d, with error\n",
1698                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1699                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1700                break;
1701         default:
1702                 LBUG ();
1703                 break;
1704         }
1705
1706         ksocknal_peer_decref(conn->ksnc_peer);
1707
1708         LIBCFS_FREE (conn, sizeof (*conn));
1709 }
1710
1711 int
1712 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1713 {
1714         ksock_conn_t       *conn;
1715         cfs_list_t         *ctmp;
1716         cfs_list_t         *cnxt;
1717         int                 count = 0;
1718
1719         cfs_list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1720                 conn = cfs_list_entry (ctmp, ksock_conn_t, ksnc_list);
1721
1722                 if (ipaddr == 0 ||
1723                     conn->ksnc_ipaddr == ipaddr) {
1724                         count++;
1725                         ksocknal_close_conn_locked (conn, why);
1726                 }
1727         }
1728
1729         return (count);
1730 }
1731
1732 int
1733 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1734 {
1735         ksock_peer_t     *peer = conn->ksnc_peer;
1736         __u32             ipaddr = conn->ksnc_ipaddr;
1737         int               count;
1738
1739         cfs_write_lock_bh (&ksocknal_data.ksnd_global_lock);
1740
1741         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1742
1743         cfs_write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1744
1745         return (count);
1746 }
1747
1748 int
1749 ksocknal_close_matching_conns (lnet_process_id_t id, __u32 ipaddr)
1750 {
1751         ksock_peer_t       *peer;
1752         cfs_list_t         *ptmp;
1753         cfs_list_t         *pnxt;
1754         int                 lo;
1755         int                 hi;
1756         int                 i;
1757         int                 count = 0;
1758
1759         cfs_write_lock_bh (&ksocknal_data.ksnd_global_lock);
1760
1761         if (id.nid != LNET_NID_ANY)
1762                 lo = hi = (int)(ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers);
1763         else {
1764                 lo = 0;
1765                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1766         }
1767
1768         for (i = lo; i <= hi; i++) {
1769                 cfs_list_for_each_safe (ptmp, pnxt,
1770                                         &ksocknal_data.ksnd_peers[i]) {
1771
1772                         peer = cfs_list_entry (ptmp, ksock_peer_t, ksnp_list);
1773
1774                         if (!((id.nid == LNET_NID_ANY || id.nid == peer->ksnp_id.nid) &&
1775                               (id.pid == LNET_PID_ANY || id.pid == peer->ksnp_id.pid)))
1776                                 continue;
1777
1778                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1779                 }
1780         }
1781
1782         cfs_write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1783
1784         /* wildcards always succeed */
1785         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1786                 return (0);
1787
1788         return (count == 0 ? -ENOENT : 0);
1789 }
1790
1791 void
1792 ksocknal_notify (lnet_ni_t *ni, lnet_nid_t gw_nid, int alive)
1793 {
1794         /* The router is telling me she's been notified of a change in
1795          * gateway state.... */
1796         lnet_process_id_t  id = {0};
1797
1798         id.nid = gw_nid;
1799         id.pid = LNET_PID_ANY;
1800
1801         CDEBUG (D_NET, "gw %s %s\n", libcfs_nid2str(gw_nid),
1802                 alive ? "up" : "down");
1803
1804         if (!alive) {
1805                 /* If the gateway crashed, close all open connections... */
1806                 ksocknal_close_matching_conns (id, 0);
1807                 return;
1808         }
1809
1810         /* ...otherwise do nothing.  We can only establish new connections
1811          * if we have autroutes, and these connect on demand. */
1812 }
1813
1814 void
1815 ksocknal_query (lnet_ni_t *ni, lnet_nid_t nid, cfs_time_t *when)
1816 {
1817         int                connect = 1;
1818         cfs_time_t         last_alive = 0;
1819         cfs_time_t         now = cfs_time_current();
1820         ksock_peer_t      *peer = NULL;
1821         cfs_rwlock_t      *glock = &ksocknal_data.ksnd_global_lock;
1822         lnet_process_id_t  id = {.nid = nid, .pid = LUSTRE_SRV_LNET_PID};
1823
1824         cfs_read_lock(glock);
1825
1826         peer = ksocknal_find_peer_locked(ni, id);
1827         if (peer != NULL) {
1828                 cfs_list_t       *tmp;
1829                 ksock_conn_t     *conn;
1830                 int               bufnob;
1831
1832                 cfs_list_for_each (tmp, &peer->ksnp_conns) {
1833                         conn = cfs_list_entry(tmp, ksock_conn_t, ksnc_list);
1834                         bufnob = libcfs_sock_wmem_queued(conn->ksnc_sock);
1835
1836                         if (bufnob < conn->ksnc_tx_bufnob) {
1837                                 /* something got ACKed */
1838                                 conn->ksnc_tx_deadline =
1839                                         cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
1840                                 peer->ksnp_last_alive = now;
1841                                 conn->ksnc_tx_bufnob = bufnob;
1842                         }
1843                 }
1844
1845                 last_alive = peer->ksnp_last_alive;
1846                 if (ksocknal_find_connectable_route_locked(peer) == NULL)
1847                         connect = 0;
1848         }
1849
1850         cfs_read_unlock(glock);
1851
1852         if (last_alive != 0)
1853                 *when = last_alive;
1854
1855         CDEBUG(D_NET, "Peer %s %p, alive %ld secs ago, connect %d\n",
1856                libcfs_nid2str(nid), peer,
1857                last_alive ? cfs_duration_sec(now - last_alive) : -1,
1858                connect);
1859
1860         if (!connect)
1861                 return;
1862
1863         ksocknal_add_peer(ni, id, LNET_NIDADDR(nid), lnet_acceptor_port());
1864
1865         cfs_write_lock_bh(glock);
1866
1867         peer = ksocknal_find_peer_locked(ni, id);
1868         if (peer != NULL)
1869                 ksocknal_launch_all_connections_locked(peer);
1870
1871         cfs_write_unlock_bh(glock);
1872         return;
1873 }
1874
1875 void
1876 ksocknal_push_peer (ksock_peer_t *peer)
1877 {
1878         int               index;
1879         int               i;
1880         cfs_list_t       *tmp;
1881         ksock_conn_t     *conn;
1882
1883         for (index = 0; ; index++) {
1884                 cfs_read_lock (&ksocknal_data.ksnd_global_lock);
1885
1886                 i = 0;
1887                 conn = NULL;
1888
1889                 cfs_list_for_each (tmp, &peer->ksnp_conns) {
1890                         if (i++ == index) {
1891                                 conn = cfs_list_entry (tmp, ksock_conn_t,
1892                                                        ksnc_list);
1893                                 ksocknal_conn_addref(conn);
1894                                 break;
1895                         }
1896                 }
1897
1898                 cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
1899
1900                 if (conn == NULL)
1901                         break;
1902
1903                 ksocknal_lib_push_conn (conn);
1904                 ksocknal_conn_decref(conn);
1905         }
1906 }
1907
1908 int
1909 ksocknal_push (lnet_ni_t *ni, lnet_process_id_t id)
1910 {
1911         ksock_peer_t      *peer;
1912         cfs_list_t        *tmp;
1913         int                index;
1914         int                i;
1915         int                j;
1916         int                rc = -ENOENT;
1917
1918         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1919                 for (j = 0; ; j++) {
1920                         cfs_read_lock (&ksocknal_data.ksnd_global_lock);
1921
1922                         index = 0;
1923                         peer = NULL;
1924
1925                         cfs_list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1926                                 peer = cfs_list_entry(tmp, ksock_peer_t,
1927                                                       ksnp_list);
1928
1929                                 if (!((id.nid == LNET_NID_ANY ||
1930                                        id.nid == peer->ksnp_id.nid) &&
1931                                       (id.pid == LNET_PID_ANY ||
1932                                        id.pid == peer->ksnp_id.pid))) {
1933                                         peer = NULL;
1934                                         continue;
1935                                 }
1936
1937                                 if (index++ == j) {
1938                                         ksocknal_peer_addref(peer);
1939                                         break;
1940                                 }
1941                         }
1942
1943                         cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
1944
1945                         if (peer != NULL) {
1946                                 rc = 0;
1947                                 ksocknal_push_peer (peer);
1948                                 ksocknal_peer_decref(peer);
1949                         }
1950                 }
1951
1952         }
1953
1954         return (rc);
1955 }
1956
1957 int
1958 ksocknal_add_interface(lnet_ni_t *ni, __u32 ipaddress, __u32 netmask)
1959 {
1960         ksock_net_t       *net = ni->ni_data;
1961         ksock_interface_t *iface;
1962         int                rc;
1963         int                i;
1964         int                j;
1965         cfs_list_t        *ptmp;
1966         ksock_peer_t      *peer;
1967         cfs_list_t        *rtmp;
1968         ksock_route_t     *route;
1969
1970         if (ipaddress == 0 ||
1971             netmask == 0)
1972                 return (-EINVAL);
1973
1974         cfs_write_lock_bh (&ksocknal_data.ksnd_global_lock);
1975
1976         iface = ksocknal_ip2iface(ni, ipaddress);
1977         if (iface != NULL) {
1978                 /* silently ignore dups */
1979                 rc = 0;
1980         } else if (net->ksnn_ninterfaces == LNET_MAX_INTERFACES) {
1981                 rc = -ENOSPC;
1982         } else {
1983                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
1984
1985                 iface->ksni_ipaddr = ipaddress;
1986                 iface->ksni_netmask = netmask;
1987                 iface->ksni_nroutes = 0;
1988                 iface->ksni_npeers = 0;
1989
1990                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1991                         cfs_list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1992                                 peer = cfs_list_entry(ptmp, ksock_peer_t,
1993                                                       ksnp_list);
1994
1995                                 for (j = 0; j < peer->ksnp_n_passive_ips; j++)
1996                                         if (peer->ksnp_passive_ips[j] == ipaddress)
1997                                                 iface->ksni_npeers++;
1998
1999                                 cfs_list_for_each(rtmp, &peer->ksnp_routes) {
2000                                         route = cfs_list_entry(rtmp,
2001                                                                ksock_route_t,
2002                                                                ksnr_list);
2003
2004                                         if (route->ksnr_myipaddr == ipaddress)
2005                                                 iface->ksni_nroutes++;
2006                                 }
2007                         }
2008                 }
2009
2010                 rc = 0;
2011                 /* NB only new connections will pay attention to the new interface! */
2012         }
2013
2014         cfs_write_unlock_bh (&ksocknal_data.ksnd_global_lock);
2015
2016         return (rc);
2017 }
2018
2019 void
2020 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
2021 {
2022         cfs_list_t         *tmp;
2023         cfs_list_t         *nxt;
2024         ksock_route_t      *route;
2025         ksock_conn_t       *conn;
2026         int                 i;
2027         int                 j;
2028
2029         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
2030                 if (peer->ksnp_passive_ips[i] == ipaddr) {
2031                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
2032                                 peer->ksnp_passive_ips[j-1] =
2033                                         peer->ksnp_passive_ips[j];
2034                         peer->ksnp_n_passive_ips--;
2035                         break;
2036                 }
2037
2038         cfs_list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
2039                 route = cfs_list_entry (tmp, ksock_route_t, ksnr_list);
2040
2041                 if (route->ksnr_myipaddr != ipaddr)
2042                         continue;
2043
2044                 if (route->ksnr_share_count != 0) {
2045                         /* Manually created; keep, but unbind */
2046                         route->ksnr_myipaddr = 0;
2047                 } else {
2048                         ksocknal_del_route_locked(route);
2049                 }
2050         }
2051
2052         cfs_list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
2053                 conn = cfs_list_entry(tmp, ksock_conn_t, ksnc_list);
2054
2055                 if (conn->ksnc_myipaddr == ipaddr)
2056                         ksocknal_close_conn_locked (conn, 0);
2057         }
2058 }
2059
2060 int
2061 ksocknal_del_interface(lnet_ni_t *ni, __u32 ipaddress)
2062 {
2063         ksock_net_t       *net = ni->ni_data;
2064         int                rc = -ENOENT;
2065         cfs_list_t        *tmp;
2066         cfs_list_t        *nxt;
2067         ksock_peer_t      *peer;
2068         __u32              this_ip;
2069         int                i;
2070         int                j;
2071
2072         cfs_write_lock_bh (&ksocknal_data.ksnd_global_lock);
2073
2074         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2075                 this_ip = net->ksnn_interfaces[i].ksni_ipaddr;
2076
2077                 if (!(ipaddress == 0 ||
2078                       ipaddress == this_ip))
2079                         continue;
2080
2081                 rc = 0;
2082
2083                 for (j = i+1; j < net->ksnn_ninterfaces; j++)
2084                         net->ksnn_interfaces[j-1] =
2085                                 net->ksnn_interfaces[j];
2086
2087                 net->ksnn_ninterfaces--;
2088
2089                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
2090                         cfs_list_for_each_safe(tmp, nxt,
2091                                                &ksocknal_data.ksnd_peers[j]) {
2092                                 peer = cfs_list_entry(tmp, ksock_peer_t,
2093                                                       ksnp_list);
2094
2095                                 if (peer->ksnp_ni != ni)
2096                                         continue;
2097
2098                                 ksocknal_peer_del_interface_locked(peer, this_ip);
2099                         }
2100                 }
2101         }
2102
2103         cfs_write_unlock_bh (&ksocknal_data.ksnd_global_lock);
2104
2105         return (rc);
2106 }
2107
2108 int
2109 ksocknal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
2110 {
2111         lnet_process_id_t id = {0}; 
2112         struct libcfs_ioctl_data *data = arg;
2113         int rc;
2114
2115         switch(cmd) {
2116         case IOC_LIBCFS_GET_INTERFACE: {
2117                 ksock_net_t       *net = ni->ni_data;
2118                 ksock_interface_t *iface;
2119
2120                 cfs_read_lock (&ksocknal_data.ksnd_global_lock);
2121
2122                 if (data->ioc_count >= (__u32)net->ksnn_ninterfaces) {
2123                         rc = -ENOENT;
2124                 } else {
2125                         rc = 0;
2126                         iface = &net->ksnn_interfaces[data->ioc_count];
2127
2128                         data->ioc_u32[0] = iface->ksni_ipaddr;
2129                         data->ioc_u32[1] = iface->ksni_netmask;
2130                         data->ioc_u32[2] = iface->ksni_npeers;
2131                         data->ioc_u32[3] = iface->ksni_nroutes;
2132                 }
2133
2134                 cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
2135                 return rc;
2136         }
2137
2138         case IOC_LIBCFS_ADD_INTERFACE:
2139                 return ksocknal_add_interface(ni,
2140                                               data->ioc_u32[0], /* IP address */
2141                                               data->ioc_u32[1]); /* net mask */
2142
2143         case IOC_LIBCFS_DEL_INTERFACE:
2144                 return ksocknal_del_interface(ni,
2145                                               data->ioc_u32[0]); /* IP address */
2146
2147         case IOC_LIBCFS_GET_PEER: {
2148                 __u32            myip = 0;
2149                 __u32            ip = 0;
2150                 int              port = 0;
2151                 int              conn_count = 0;
2152                 int              share_count = 0;
2153
2154                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
2155                                             &id, &myip, &ip, &port,
2156                                             &conn_count,  &share_count);
2157                 if (rc != 0)
2158                         return rc;
2159
2160                 data->ioc_nid    = id.nid;
2161                 data->ioc_count  = share_count;
2162                 data->ioc_u32[0] = ip;
2163                 data->ioc_u32[1] = port;
2164                 data->ioc_u32[2] = myip;
2165                 data->ioc_u32[3] = conn_count;
2166                 data->ioc_u32[4] = id.pid;
2167                 return 0;
2168         }
2169
2170         case IOC_LIBCFS_ADD_PEER:
2171                 id.nid = data->ioc_nid;
2172                 id.pid = LUSTRE_SRV_LNET_PID;
2173                 return ksocknal_add_peer (ni, id,
2174                                           data->ioc_u32[0], /* IP */
2175                                           data->ioc_u32[1]); /* port */
2176
2177         case IOC_LIBCFS_DEL_PEER:
2178                 id.nid = data->ioc_nid;
2179                 id.pid = LNET_PID_ANY;
2180                 return ksocknal_del_peer (ni, id,
2181                                           data->ioc_u32[0]); /* IP */
2182
2183         case IOC_LIBCFS_GET_CONN: {
2184                 int           txmem;
2185                 int           rxmem;
2186                 int           nagle;
2187                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (ni, data->ioc_count);
2188
2189                 if (conn == NULL)
2190                         return -ENOENT;
2191
2192                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
2193
2194                 data->ioc_count  = txmem;
2195                 data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
2196                 data->ioc_flags  = nagle;
2197                 data->ioc_u32[0] = conn->ksnc_ipaddr;
2198                 data->ioc_u32[1] = conn->ksnc_port;
2199                 data->ioc_u32[2] = conn->ksnc_myipaddr;
2200                 data->ioc_u32[3] = conn->ksnc_type;
2201                 data->ioc_u32[4] = (__u32)(conn->ksnc_scheduler -
2202                                    ksocknal_data.ksnd_schedulers);
2203                 data->ioc_u32[5] = rxmem;
2204                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
2205                 ksocknal_conn_decref(conn);
2206                 return 0;
2207         }
2208
2209         case IOC_LIBCFS_CLOSE_CONNECTION:
2210                 id.nid = data->ioc_nid;
2211                 id.pid = LNET_PID_ANY;
2212                 return ksocknal_close_matching_conns (id,
2213                                                       data->ioc_u32[0]);
2214
2215         case IOC_LIBCFS_REGISTER_MYNID:
2216                 /* Ignore if this is a noop */
2217                 if (data->ioc_nid == ni->ni_nid)
2218                         return 0;
2219
2220                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
2221                        libcfs_nid2str(data->ioc_nid),
2222                        libcfs_nid2str(ni->ni_nid));
2223                 return -EINVAL;
2224
2225         case IOC_LIBCFS_PUSH_CONNECTION:
2226                 id.nid = data->ioc_nid;
2227                 id.pid = LNET_PID_ANY;
2228                 return ksocknal_push(ni, id);
2229
2230         default:
2231                 return -EINVAL;
2232         }
2233         /* not reached */
2234 }
2235
2236 void
2237 ksocknal_free_buffers (void)
2238 {
2239         LASSERT (cfs_atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
2240
2241         if (ksocknal_data.ksnd_schedulers != NULL)
2242                 LIBCFS_FREE (ksocknal_data.ksnd_schedulers,
2243                              sizeof (ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2244
2245         LIBCFS_FREE (ksocknal_data.ksnd_peers,
2246                      sizeof (cfs_list_t) *
2247                      ksocknal_data.ksnd_peer_hash_size);
2248
2249         cfs_spin_lock(&ksocknal_data.ksnd_tx_lock);
2250
2251         if (!cfs_list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2252                 cfs_list_t        zlist;
2253                 ksock_tx_t       *tx;
2254
2255                 cfs_list_add(&zlist, &ksocknal_data.ksnd_idle_noop_txs);
2256                 cfs_list_del_init(&ksocknal_data.ksnd_idle_noop_txs);
2257                 cfs_spin_unlock(&ksocknal_data.ksnd_tx_lock);
2258
2259                 while(!cfs_list_empty(&zlist)) {
2260                         tx = cfs_list_entry(zlist.next, ksock_tx_t, tx_list);
2261                         cfs_list_del(&tx->tx_list);
2262                         LIBCFS_FREE(tx, tx->tx_desc_size);
2263                 }
2264         } else {
2265                 cfs_spin_unlock(&ksocknal_data.ksnd_tx_lock);
2266         }
2267 }
2268
2269 void
2270 ksocknal_base_shutdown (void)
2271 {
2272         ksock_sched_t *sched;
2273         int            i;
2274
2275         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
2276                cfs_atomic_read (&libcfs_kmemory));
2277         LASSERT (ksocknal_data.ksnd_nnets == 0);
2278
2279         switch (ksocknal_data.ksnd_init) {
2280         default:
2281                 LASSERT (0);
2282
2283         case SOCKNAL_INIT_ALL:
2284         case SOCKNAL_INIT_DATA:
2285                 LASSERT (ksocknal_data.ksnd_peers != NULL);
2286                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2287                         LASSERT (cfs_list_empty (&ksocknal_data.ksnd_peers[i]));
2288                 }
2289                 LASSERT (cfs_list_empty (&ksocknal_data.ksnd_enomem_conns));
2290                 LASSERT (cfs_list_empty (&ksocknal_data.ksnd_zombie_conns));
2291                 LASSERT (cfs_list_empty (&ksocknal_data.ksnd_connd_connreqs));
2292                 LASSERT (cfs_list_empty (&ksocknal_data.ksnd_connd_routes));
2293
2294                 if (ksocknal_data.ksnd_schedulers != NULL)
2295                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2296                                 ksock_sched_t *kss =
2297                                         &ksocknal_data.ksnd_schedulers[i];
2298
2299                                 LASSERT (cfs_list_empty (&kss->kss_tx_conns));
2300                                 LASSERT (cfs_list_empty (&kss->kss_rx_conns));
2301                                 LASSERT (cfs_list_empty (&kss-> \
2302                                                          kss_zombie_noop_txs));
2303                                 LASSERT (kss->kss_nconns == 0);
2304                         }
2305
2306                 /* flag threads to terminate; wake and wait for them to die */
2307                 ksocknal_data.ksnd_shuttingdown = 1;
2308                 cfs_waitq_broadcast (&ksocknal_data.ksnd_connd_waitq);
2309                 cfs_waitq_broadcast (&ksocknal_data.ksnd_reaper_waitq);
2310
2311                 if (ksocknal_data.ksnd_schedulers != NULL)
2312                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2313                                 sched = &ksocknal_data.ksnd_schedulers[i];
2314                                 cfs_waitq_broadcast(&sched->kss_waitq);
2315                         }
2316
2317                 i = 4;
2318                 cfs_read_lock (&ksocknal_data.ksnd_global_lock);
2319                 while (ksocknal_data.ksnd_nthreads != 0) {
2320                         i++;
2321                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2322                                "waiting for %d threads to terminate\n",
2323                                 ksocknal_data.ksnd_nthreads);
2324                         cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
2325                         cfs_pause(cfs_time_seconds(1));
2326                         cfs_read_lock (&ksocknal_data.ksnd_global_lock);
2327                 }
2328                 cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
2329
2330                 ksocknal_free_buffers();
2331
2332                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2333                 break;
2334         }
2335
2336         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
2337                cfs_atomic_read (&libcfs_kmemory));
2338
2339         PORTAL_MODULE_UNUSE;
2340 }
2341
2342 __u64
2343 ksocknal_new_incarnation (void)
2344 {
2345         struct timeval tv;
2346
2347         /* The incarnation number is the time this module loaded and it
2348          * identifies this particular instance of the socknal.  Hopefully
2349          * we won't be able to reboot more frequently than 1MHz for the
2350          * forseeable future :) */
2351
2352         cfs_gettimeofday(&tv);
2353
2354         return (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2355 }
2356
2357 int
2358 ksocknal_base_startup (void)
2359 {
2360         int               rc;
2361         int               i;
2362
2363         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2364         LASSERT (ksocknal_data.ksnd_nnets == 0);
2365
2366         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2367
2368         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2369         LIBCFS_ALLOC (ksocknal_data.ksnd_peers,
2370                       sizeof (cfs_list_t) *
2371                       ksocknal_data.ksnd_peer_hash_size);
2372         if (ksocknal_data.ksnd_peers == NULL)
2373                 return -ENOMEM;
2374
2375         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2376                 CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2377
2378         cfs_rwlock_init(&ksocknal_data.ksnd_global_lock);
2379
2380         cfs_spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
2381         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
2382         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
2383         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
2384         cfs_waitq_init(&ksocknal_data.ksnd_reaper_waitq);
2385
2386         cfs_spin_lock_init (&ksocknal_data.ksnd_connd_lock);
2387         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_connd_connreqs);
2388         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_connd_routes);
2389         cfs_waitq_init(&ksocknal_data.ksnd_connd_waitq);
2390
2391         cfs_spin_lock_init (&ksocknal_data.ksnd_tx_lock);
2392         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_idle_noop_txs);
2393
2394         /* NB memset above zeros whole of ksocknal_data, including
2395          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
2396
2397         /* flag lists/ptrs/locks initialised */
2398         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2399         PORTAL_MODULE_USE;
2400
2401         ksocknal_data.ksnd_nschedulers = ksocknal_nsched();
2402         LIBCFS_ALLOC(ksocknal_data.ksnd_schedulers,
2403                      sizeof(ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2404         if (ksocknal_data.ksnd_schedulers == NULL)
2405                 goto failed;
2406
2407         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2408                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
2409
2410                 cfs_spin_lock_init (&kss->kss_lock);
2411                 CFS_INIT_LIST_HEAD (&kss->kss_rx_conns);
2412                 CFS_INIT_LIST_HEAD (&kss->kss_tx_conns);
2413                 CFS_INIT_LIST_HEAD (&kss->kss_zombie_noop_txs);
2414                 cfs_waitq_init (&kss->kss_waitq);
2415         }
2416
2417         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2418                 rc = ksocknal_thread_start (ksocknal_scheduler,
2419                                             &ksocknal_data.ksnd_schedulers[i]);
2420                 if (rc != 0) {
2421                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
2422                                i, rc);
2423                         goto failed;
2424                 }
2425         }
2426
2427         ksocknal_data.ksnd_connd_starting         = 0;
2428         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2429         ksocknal_data.ksnd_connd_starting_stamp   = cfs_time_current_sec();
2430         /* must have at least 2 connds to remain responsive to accepts while
2431          * connecting */
2432         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2433                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2434
2435         if (*ksocknal_tunables.ksnd_nconnds_max <
2436             *ksocknal_tunables.ksnd_nconnds) {
2437                 ksocknal_tunables.ksnd_nconnds_max =
2438                         ksocknal_tunables.ksnd_nconnds;
2439         }
2440
2441         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2442                 cfs_spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2443                 ksocknal_data.ksnd_connd_starting++;
2444                 cfs_spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2445
2446                 rc = ksocknal_thread_start (ksocknal_connd,
2447                                             (void *)((ulong_ptr_t)i));
2448                 if (rc != 0) {
2449                         cfs_spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2450                         ksocknal_data.ksnd_connd_starting--;
2451                         cfs_spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2452                         CERROR("Can't spawn socknal connd: %d\n", rc);
2453                         goto failed;
2454                 }
2455         }
2456
2457         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
2458         if (rc != 0) {
2459                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2460                 goto failed;
2461         }
2462
2463         /* flag everything initialised */
2464         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2465
2466         return 0;
2467
2468  failed:
2469         ksocknal_base_shutdown();
2470         return -ENETDOWN;
2471 }
2472
2473 void
2474 ksocknal_debug_peerhash (lnet_ni_t *ni)
2475 {
2476         ksock_peer_t     *peer = NULL;
2477         cfs_list_t       *tmp;
2478         int               i;
2479
2480         cfs_read_lock (&ksocknal_data.ksnd_global_lock);
2481
2482         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2483                 cfs_list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
2484                         peer = cfs_list_entry (tmp, ksock_peer_t, ksnp_list);
2485
2486                         if (peer->ksnp_ni == ni) break;
2487
2488                         peer = NULL;
2489                 }
2490         }
2491
2492         if (peer != NULL) {
2493                 ksock_route_t *route;
2494                 ksock_conn_t  *conn;
2495
2496                 CWARN ("Active peer on shutdown: %s, ref %d, scnt %d, "
2497                        "closing %d, accepting %d, err %d, zcookie "LPU64", "
2498                        "txq %d, zc_req %d\n", libcfs_id2str(peer->ksnp_id),
2499                        cfs_atomic_read(&peer->ksnp_refcount),
2500                        peer->ksnp_sharecount, peer->ksnp_closing,
2501                        peer->ksnp_accepting, peer->ksnp_error,
2502                        peer->ksnp_zc_next_cookie,
2503                        !cfs_list_empty(&peer->ksnp_tx_queue),
2504                        !cfs_list_empty(&peer->ksnp_zc_req_list));
2505
2506                 cfs_list_for_each (tmp, &peer->ksnp_routes) {
2507                         route = cfs_list_entry(tmp, ksock_route_t, ksnr_list);
2508                         CWARN ("Route: ref %d, schd %d, conn %d, cnted %d, "
2509                                "del %d\n", cfs_atomic_read(&route->ksnr_refcount),
2510                                route->ksnr_scheduled, route->ksnr_connecting,
2511                                route->ksnr_connected, route->ksnr_deleted);
2512                 }
2513
2514                 cfs_list_for_each (tmp, &peer->ksnp_conns) {
2515                         conn = cfs_list_entry(tmp, ksock_conn_t, ksnc_list);
2516                         CWARN ("Conn: ref %d, sref %d, t %d, c %d\n",
2517                                cfs_atomic_read(&conn->ksnc_conn_refcount),
2518                                cfs_atomic_read(&conn->ksnc_sock_refcount),
2519                                conn->ksnc_type, conn->ksnc_closing);
2520                 }
2521         }
2522
2523         cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
2524         return;
2525 }
2526
2527 void
2528 ksocknal_shutdown (lnet_ni_t *ni)
2529 {
2530         ksock_net_t      *net = ni->ni_data;
2531         int               i;
2532         lnet_process_id_t anyid = {0};
2533
2534         anyid.nid =  LNET_NID_ANY;
2535         anyid.pid =  LNET_PID_ANY;
2536
2537         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2538         LASSERT(ksocknal_data.ksnd_nnets > 0);
2539
2540         cfs_spin_lock_bh (&net->ksnn_lock);
2541         net->ksnn_shutdown = 1;                 /* prevent new peers */
2542         cfs_spin_unlock_bh (&net->ksnn_lock);
2543
2544         /* Delete all peers */
2545         ksocknal_del_peer(ni, anyid, 0);
2546
2547         /* Wait for all peer state to clean up */
2548         i = 2;
2549         cfs_spin_lock_bh (&net->ksnn_lock);
2550         while (net->ksnn_npeers != 0) {
2551                 cfs_spin_unlock_bh (&net->ksnn_lock);
2552
2553                 i++;
2554                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2555                        "waiting for %d peers to disconnect\n",
2556                        net->ksnn_npeers);
2557                 cfs_pause(cfs_time_seconds(1));
2558
2559                 ksocknal_debug_peerhash(ni);
2560
2561                 cfs_spin_lock_bh (&net->ksnn_lock);
2562         }
2563         cfs_spin_unlock_bh (&net->ksnn_lock);
2564
2565         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2566                 LASSERT (net->ksnn_interfaces[i].ksni_npeers == 0);
2567                 LASSERT (net->ksnn_interfaces[i].ksni_nroutes == 0);
2568         }
2569
2570         LIBCFS_FREE(net, sizeof(*net));
2571
2572         ksocknal_data.ksnd_nnets--;
2573         if (ksocknal_data.ksnd_nnets == 0)
2574                 ksocknal_base_shutdown();
2575 }
2576
2577 int
2578 ksocknal_enumerate_interfaces(ksock_net_t *net)
2579 {
2580         char      **names;
2581         int         i;
2582         int         j;
2583         int         rc;
2584         int         n;
2585
2586         n = libcfs_ipif_enumerate(&names);
2587         if (n <= 0) {
2588                 CERROR("Can't enumerate interfaces: %d\n", n);
2589                 return n;
2590         }
2591
2592         for (i = j = 0; i < n; i++) {
2593                 int        up;
2594                 __u32      ip;
2595                 __u32      mask;
2596
2597                 if (!strcmp(names[i], "lo")) /* skip the loopback IF */
2598                         continue;
2599
2600                 rc = libcfs_ipif_query(names[i], &up, &ip, &mask);
2601                 if (rc != 0) {
2602                         CWARN("Can't get interface %s info: %d\n",
2603                               names[i], rc);
2604                         continue;
2605                 }
2606
2607                 if (!up) {
2608                         CWARN("Ignoring interface %s (down)\n",
2609                               names[i]);
2610                         continue;
2611                 }
2612
2613                 if (j == LNET_MAX_INTERFACES) {
2614                         CWARN("Ignoring interface %s (too many interfaces)\n",
2615                               names[i]);
2616                         continue;
2617                 }
2618
2619                 net->ksnn_interfaces[j].ksni_ipaddr = ip;
2620                 net->ksnn_interfaces[j].ksni_netmask = mask;
2621                 j++;
2622         }
2623
2624         libcfs_ipif_free_enumeration(names, n);
2625
2626         if (j == 0)
2627                 CERROR("Can't find any usable interfaces\n");
2628
2629         return j;
2630 }
2631
2632 int
2633 ksocknal_startup (lnet_ni_t *ni)
2634 {
2635         ksock_net_t  *net;
2636         int           rc;
2637         int           i;
2638
2639         LASSERT (ni->ni_lnd == &the_ksocklnd);
2640
2641         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2642                 rc = ksocknal_base_startup();
2643                 if (rc != 0)
2644                         return rc;
2645         }
2646
2647         LIBCFS_ALLOC(net, sizeof(*net));
2648         if (net == NULL)
2649                 goto fail_0;
2650
2651         memset(net, 0, sizeof(*net));
2652         cfs_spin_lock_init(&net->ksnn_lock);
2653         net->ksnn_incarnation = ksocknal_new_incarnation();
2654         ni->ni_data = net;
2655         ni->ni_peertimeout    = *ksocknal_tunables.ksnd_peertimeout;
2656         ni->ni_maxtxcredits   = *ksocknal_tunables.ksnd_credits;
2657         ni->ni_peertxcredits  = *ksocknal_tunables.ksnd_peertxcredits;
2658         ni->ni_peerrtrcredits = *ksocknal_tunables.ksnd_peerrtrcredits;
2659
2660         if (ni->ni_interfaces[0] == NULL) {
2661                 rc = ksocknal_enumerate_interfaces(net);
2662                 if (rc <= 0)
2663                         goto fail_1;
2664
2665                 net->ksnn_ninterfaces = 1;
2666         } else {
2667                 for (i = 0; i < LNET_MAX_INTERFACES; i++) {
2668                         int    up;
2669
2670                         if (ni->ni_interfaces[i] == NULL)
2671                                 break;
2672
2673                         rc = libcfs_ipif_query(
2674                                 ni->ni_interfaces[i], &up,
2675                                 &net->ksnn_interfaces[i].ksni_ipaddr,
2676                                 &net->ksnn_interfaces[i].ksni_netmask);
2677
2678                         if (rc != 0) {
2679                                 CERROR("Can't get interface %s info: %d\n",
2680                                        ni->ni_interfaces[i], rc);
2681                                 goto fail_1;
2682                         }
2683
2684                         if (!up) {
2685                                 CERROR("Interface %s is down\n",
2686                                        ni->ni_interfaces[i]);
2687                                 goto fail_1;
2688                         }
2689                 }
2690                 net->ksnn_ninterfaces = i;
2691         }
2692
2693         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
2694                                 net->ksnn_interfaces[0].ksni_ipaddr);
2695
2696         ksocknal_data.ksnd_nnets++;
2697
2698         return 0;
2699
2700  fail_1:
2701         LIBCFS_FREE(net, sizeof(*net));
2702  fail_0:
2703         if (ksocknal_data.ksnd_nnets == 0)
2704                 ksocknal_base_shutdown();
2705
2706         return -ENETDOWN;
2707 }
2708
2709
2710 void __exit
2711 ksocknal_module_fini (void)
2712 {
2713         lnet_unregister_lnd(&the_ksocklnd);
2714         ksocknal_tunables_fini();
2715 }
2716
2717 int __init
2718 ksocknal_module_init (void)
2719 {
2720         int    rc;
2721
2722         /* check ksnr_connected/connecting field large enough */
2723         CLASSERT (SOCKLND_CONN_NTYPES <= 4);
2724         CLASSERT (SOCKLND_CONN_ACK == SOCKLND_CONN_BULK_IN);
2725
2726         /* initialize the_ksocklnd */
2727         the_ksocklnd.lnd_type     = SOCKLND;
2728         the_ksocklnd.lnd_startup  = ksocknal_startup;
2729         the_ksocklnd.lnd_shutdown = ksocknal_shutdown;
2730         the_ksocklnd.lnd_ctl      = ksocknal_ctl;
2731         the_ksocklnd.lnd_send     = ksocknal_send;
2732         the_ksocklnd.lnd_recv     = ksocknal_recv;
2733         the_ksocklnd.lnd_notify   = ksocknal_notify;
2734         the_ksocklnd.lnd_query    = ksocknal_query;
2735         the_ksocklnd.lnd_accept   = ksocknal_accept;
2736
2737         rc = ksocknal_tunables_init();
2738         if (rc != 0)
2739                 return rc;
2740
2741         lnet_register_lnd(&the_ksocklnd);
2742
2743         return 0;
2744 }
2745
2746 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2747 MODULE_DESCRIPTION("Kernel TCP Socket LND v3.0.0");
2748 MODULE_LICENSE("GPL");
2749
2750 cfs_module(ksocknal, "3.0.0", ksocknal_module_init, ksocknal_module_fini);