Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/socklnd/socklnd.c
37  *
38  * Author: Zach Brown <zab@zabbo.net>
39  * Author: Peter J. Braam <braam@clusterfs.com>
40  * Author: Phil Schwan <phil@clusterfs.com>
41  * Author: Eric Barton <eric@bartonsoftware.com>
42  */
43
44 #include "socklnd.h"
45
46 lnd_t the_ksocklnd = {
47         .lnd_type       = SOCKLND,
48         .lnd_startup    = ksocknal_startup,
49         .lnd_shutdown   = ksocknal_shutdown,
50         .lnd_ctl        = ksocknal_ctl,
51         .lnd_send       = ksocknal_send,
52         .lnd_recv       = ksocknal_recv,
53         .lnd_notify     = ksocknal_notify,
54         .lnd_accept     = ksocknal_accept,
55 };
56
57 ksock_nal_data_t        ksocknal_data;
58
59 ksock_interface_t *
60 ksocknal_ip2iface(lnet_ni_t *ni, __u32 ip)
61 {
62         ksock_net_t       *net = ni->ni_data;
63         int                i;
64         ksock_interface_t *iface;
65
66         for (i = 0; i < net->ksnn_ninterfaces; i++) {
67                 LASSERT(i < LNET_MAX_INTERFACES);
68                 iface = &net->ksnn_interfaces[i];
69
70                 if (iface->ksni_ipaddr == ip)
71                         return (iface);
72         }
73
74         return (NULL);
75 }
76
77 ksock_route_t *
78 ksocknal_create_route (__u32 ipaddr, int port)
79 {
80         ksock_route_t *route;
81
82         LIBCFS_ALLOC (route, sizeof (*route));
83         if (route == NULL)
84                 return (NULL);
85
86         atomic_set (&route->ksnr_refcount, 1);
87         route->ksnr_peer = NULL;
88         route->ksnr_retry_interval = 0;         /* OK to connect at any time */
89         route->ksnr_ipaddr = ipaddr;
90         route->ksnr_port = port;
91         route->ksnr_scheduled = 0;
92         route->ksnr_connecting = 0;
93         route->ksnr_connected = 0;
94         route->ksnr_deleted = 0;
95         route->ksnr_conn_count = 0;
96         route->ksnr_share_count = 0;
97
98         return (route);
99 }
100
101 void
102 ksocknal_destroy_route (ksock_route_t *route)
103 {
104         LASSERT (atomic_read(&route->ksnr_refcount) == 0);
105
106         if (route->ksnr_peer != NULL)
107                 ksocknal_peer_decref(route->ksnr_peer);
108
109         LIBCFS_FREE (route, sizeof (*route));
110 }
111
112 int
113 ksocknal_create_peer (ksock_peer_t **peerp, lnet_ni_t *ni, lnet_process_id_t id)
114 {
115         ksock_net_t   *net = ni->ni_data;
116         ksock_peer_t  *peer;
117
118         LASSERT (id.nid != LNET_NID_ANY);
119         LASSERT (id.pid != LNET_PID_ANY);
120         LASSERT (!in_interrupt());
121
122         LIBCFS_ALLOC (peer, sizeof (*peer));
123         if (peer == NULL)
124                 return -ENOMEM;
125
126         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
127
128         peer->ksnp_ni = ni;
129         peer->ksnp_id = id;
130         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
131         peer->ksnp_closing = 0;
132         peer->ksnp_accepting = 0;
133         peer->ksnp_zc_next_cookie = 1;
134         peer->ksnp_proto = NULL;
135         CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
136         CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
137         CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
138         CFS_INIT_LIST_HEAD (&peer->ksnp_zc_req_list);
139         spin_lock_init(&peer->ksnp_lock);
140
141         spin_lock_bh (&net->ksnn_lock);
142
143         if (net->ksnn_shutdown) {
144                 spin_unlock_bh (&net->ksnn_lock);
145
146                 LIBCFS_FREE(peer, sizeof(*peer));
147                 CERROR("Can't create peer: network shutdown\n");
148                 return -ESHUTDOWN;
149         }
150
151         net->ksnn_npeers++;
152
153         spin_unlock_bh (&net->ksnn_lock);
154
155         *peerp = peer;
156         return 0;
157 }
158
159 void
160 ksocknal_destroy_peer (ksock_peer_t *peer)
161 {
162         ksock_net_t    *net = peer->ksnp_ni->ni_data;
163
164         CDEBUG (D_NET, "peer %s %p deleted\n",
165                 libcfs_id2str(peer->ksnp_id), peer);
166
167         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
168         LASSERT (peer->ksnp_accepting == 0);
169         LASSERT (list_empty (&peer->ksnp_conns));
170         LASSERT (list_empty (&peer->ksnp_routes));
171         LASSERT (list_empty (&peer->ksnp_tx_queue));
172         LASSERT (list_empty (&peer->ksnp_zc_req_list));
173
174         LIBCFS_FREE (peer, sizeof (*peer));
175
176         /* NB a peer's connections and routes keep a reference on their peer
177          * until they are destroyed, so we can be assured that _all_ state to
178          * do with this peer has been cleaned up when its refcount drops to
179          * zero. */
180         spin_lock_bh (&net->ksnn_lock);
181         net->ksnn_npeers--;
182         spin_unlock_bh (&net->ksnn_lock);
183 }
184
185 ksock_peer_t *
186 ksocknal_find_peer_locked (lnet_ni_t *ni, lnet_process_id_t id)
187 {
188         struct list_head *peer_list = ksocknal_nid2peerlist(id.nid);
189         struct list_head *tmp;
190         ksock_peer_t     *peer;
191
192         list_for_each (tmp, peer_list) {
193
194                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
195
196                 LASSERT (!peer->ksnp_closing);
197
198                 if (peer->ksnp_ni != ni)
199                         continue;
200
201                 if (peer->ksnp_id.nid != id.nid ||
202                     peer->ksnp_id.pid != id.pid)
203                         continue;
204
205                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
206                        peer, libcfs_id2str(id),
207                        atomic_read(&peer->ksnp_refcount));
208                 return (peer);
209         }
210         return (NULL);
211 }
212
213 ksock_peer_t *
214 ksocknal_find_peer (lnet_ni_t *ni, lnet_process_id_t id)
215 {
216         ksock_peer_t     *peer;
217
218         read_lock (&ksocknal_data.ksnd_global_lock);
219         peer = ksocknal_find_peer_locked (ni, id);
220         if (peer != NULL)                       /* +1 ref for caller? */
221                 ksocknal_peer_addref(peer);
222         read_unlock (&ksocknal_data.ksnd_global_lock);
223
224         return (peer);
225 }
226
227 void
228 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
229 {
230         int                i;
231         __u32              ip;
232         ksock_interface_t *iface;
233
234         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
235                 LASSERT (i < LNET_MAX_INTERFACES);
236                 ip = peer->ksnp_passive_ips[i];
237
238                 iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
239                 /* All IPs in peer->ksnp_passive_ips[] come from the
240                  * interface list, therefore the call must succeed. */
241                 LASSERT (iface != NULL);
242
243                 CDEBUG(D_NET, "peer=%p iface=%p ksni_nroutes=%d\n",
244                        peer, iface, iface->ksni_nroutes);
245                 iface->ksni_npeers--;
246         }
247
248         LASSERT (list_empty(&peer->ksnp_conns));
249         LASSERT (list_empty(&peer->ksnp_routes));
250         LASSERT (!peer->ksnp_closing);
251         peer->ksnp_closing = 1;
252         list_del (&peer->ksnp_list);
253         /* lose peerlist's ref */
254         ksocknal_peer_decref(peer);
255 }
256
257 int
258 ksocknal_get_peer_info (lnet_ni_t *ni, int index,
259                         lnet_process_id_t *id, __u32 *myip, __u32 *peer_ip, int *port,
260                         int *conn_count, int *share_count)
261 {
262         ksock_peer_t      *peer;
263         struct list_head  *ptmp;
264         ksock_route_t     *route;
265         struct list_head  *rtmp;
266         int                i;
267         int                j;
268         int                rc = -ENOENT;
269
270         read_lock (&ksocknal_data.ksnd_global_lock);
271
272         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
273
274                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
275                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
276
277                         if (peer->ksnp_ni != ni)
278                                 continue;
279
280                         if (peer->ksnp_n_passive_ips == 0 &&
281                             list_empty(&peer->ksnp_routes)) {
282                                 if (index-- > 0)
283                                         continue;
284
285                                 *id = peer->ksnp_id;
286                                 *myip = 0;
287                                 *peer_ip = 0;
288                                 *port = 0;
289                                 *conn_count = 0;
290                                 *share_count = 0;
291                                 rc = 0;
292                                 goto out;
293                         }
294
295                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
296                                 if (index-- > 0)
297                                         continue;
298
299                                 *id = peer->ksnp_id;
300                                 *myip = peer->ksnp_passive_ips[j];
301                                 *peer_ip = 0;
302                                 *port = 0;
303                                 *conn_count = 0;
304                                 *share_count = 0;
305                                 rc = 0;
306                                 goto out;
307                         }
308
309                         list_for_each (rtmp, &peer->ksnp_routes) {
310                                 if (index-- > 0)
311                                         continue;
312
313                                 route = list_entry(rtmp, ksock_route_t,
314                                                    ksnr_list);
315
316                                 *id = peer->ksnp_id;
317                                 *myip = route->ksnr_myipaddr;
318                                 *peer_ip = route->ksnr_ipaddr;
319                                 *port = route->ksnr_port;
320                                 *conn_count = route->ksnr_conn_count;
321                                 *share_count = route->ksnr_share_count;
322                                 rc = 0;
323                                 goto out;
324                         }
325                 }
326         }
327  out:
328         read_unlock (&ksocknal_data.ksnd_global_lock);
329         return (rc);
330 }
331
332 void
333 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
334 {
335         ksock_peer_t      *peer = route->ksnr_peer;
336         int                type = conn->ksnc_type;
337         ksock_interface_t *iface;
338
339         conn->ksnc_route = route;
340         ksocknal_route_addref(route);
341
342         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
343                 if (route->ksnr_myipaddr == 0) {
344                         /* route wasn't bound locally yet (the initial route) */
345                         CDEBUG(D_NET, "Binding %s %u.%u.%u.%u to %u.%u.%u.%u\n",
346                                libcfs_id2str(peer->ksnp_id),
347                                HIPQUAD(route->ksnr_ipaddr),
348                                HIPQUAD(conn->ksnc_myipaddr));
349                 } else {
350                         CDEBUG(D_NET, "Rebinding %s %u.%u.%u.%u from "
351                                "%u.%u.%u.%u to %u.%u.%u.%u\n",
352                                libcfs_id2str(peer->ksnp_id),
353                                HIPQUAD(route->ksnr_ipaddr),
354                                HIPQUAD(route->ksnr_myipaddr),
355                                HIPQUAD(conn->ksnc_myipaddr));
356
357                         iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
358                                                   route->ksnr_myipaddr);
359                         if (iface != NULL)
360                                 iface->ksni_nroutes--;
361                 }
362                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
363                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
364                                           route->ksnr_myipaddr);
365                 if (iface != NULL)
366                         iface->ksni_nroutes++;
367         }
368
369         route->ksnr_connected |= (1<<type);
370         route->ksnr_conn_count++;
371
372         /* Successful connection => further attempts can
373          * proceed immediately */
374         route->ksnr_retry_interval = 0;
375 }
376
377 void
378 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
379 {
380         struct list_head  *tmp;
381         ksock_conn_t      *conn;
382         ksock_route_t     *route2;
383
384         LASSERT (!peer->ksnp_closing);
385         LASSERT (route->ksnr_peer == NULL);
386         LASSERT (!route->ksnr_scheduled);
387         LASSERT (!route->ksnr_connecting);
388         LASSERT (route->ksnr_connected == 0);
389
390         /* LASSERT(unique) */
391         list_for_each(tmp, &peer->ksnp_routes) {
392                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
393
394                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
395                         CERROR ("Duplicate route %s %u.%u.%u.%u\n",
396                                 libcfs_id2str(peer->ksnp_id),
397                                 HIPQUAD(route->ksnr_ipaddr));
398                         LBUG();
399                 }
400         }
401
402         route->ksnr_peer = peer;
403         ksocknal_peer_addref(peer);
404         /* peer's routelist takes over my ref on 'route' */
405         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
406
407         list_for_each(tmp, &peer->ksnp_conns) {
408                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
409
410                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
411                         continue;
412
413                 ksocknal_associate_route_conn_locked(route, conn);
414                 /* keep going (typed routes) */
415         }
416 }
417
418 void
419 ksocknal_del_route_locked (ksock_route_t *route)
420 {
421         ksock_peer_t      *peer = route->ksnr_peer;
422         ksock_interface_t *iface;
423         ksock_conn_t      *conn;
424         struct list_head  *ctmp;
425         struct list_head  *cnxt;
426
427         LASSERT (!route->ksnr_deleted);
428
429         /* Close associated conns */
430         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
431                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
432
433                 if (conn->ksnc_route != route)
434                         continue;
435
436                 ksocknal_close_conn_locked (conn, 0);
437         }
438
439         if (route->ksnr_myipaddr != 0) {
440                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
441                                           route->ksnr_myipaddr);
442                 if (iface != NULL)
443                         iface->ksni_nroutes--;
444         }
445
446         route->ksnr_deleted = 1;
447         list_del (&route->ksnr_list);
448         ksocknal_route_decref(route);             /* drop peer's ref */
449
450         if (list_empty (&peer->ksnp_routes) &&
451             list_empty (&peer->ksnp_conns)) {
452                 /* I've just removed the last route to a peer with no active
453                  * connections */
454                 ksocknal_unlink_peer_locked (peer);
455         }
456 }
457
458 int
459 ksocknal_add_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ipaddr, int port)
460 {
461         struct list_head  *tmp;
462         ksock_peer_t      *peer;
463         ksock_peer_t      *peer2;
464         ksock_route_t     *route;
465         ksock_route_t     *route2;
466         int                rc;
467
468         if (id.nid == LNET_NID_ANY ||
469             id.pid == LNET_PID_ANY)
470                 return (-EINVAL);
471
472         /* Have a brand new peer ready... */
473         rc = ksocknal_create_peer(&peer, ni, id);
474         if (rc != 0)
475                 return rc;
476
477         route = ksocknal_create_route (ipaddr, port);
478         if (route == NULL) {
479                 ksocknal_peer_decref(peer);
480                 return (-ENOMEM);
481         }
482
483         write_lock_bh (&ksocknal_data.ksnd_global_lock);
484
485         /* always called with a ref on ni, so shutdown can't have started */
486         LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);
487
488         peer2 = ksocknal_find_peer_locked (ni, id);
489         if (peer2 != NULL) {
490                 ksocknal_peer_decref(peer);
491                 peer = peer2;
492         } else {
493                 /* peer table takes my ref on peer */
494                 list_add_tail (&peer->ksnp_list,
495                                ksocknal_nid2peerlist (id.nid));
496         }
497
498         route2 = NULL;
499         list_for_each (tmp, &peer->ksnp_routes) {
500                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
501
502                 if (route2->ksnr_ipaddr == ipaddr)
503                         break;
504
505                 route2 = NULL;
506         }
507         if (route2 == NULL) {
508                 ksocknal_add_route_locked(peer, route);
509                 route->ksnr_share_count++;
510         } else {
511                 ksocknal_route_decref(route);
512                 route2->ksnr_share_count++;
513         }
514
515         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
516
517         return (0);
518 }
519
520 void
521 ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip)
522 {
523         ksock_conn_t     *conn;
524         ksock_route_t    *route;
525         struct list_head *tmp;
526         struct list_head *nxt;
527         int               nshared;
528
529         LASSERT (!peer->ksnp_closing);
530
531         /* Extra ref prevents peer disappearing until I'm done with it */
532         ksocknal_peer_addref(peer);
533
534         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
535                 route = list_entry(tmp, ksock_route_t, ksnr_list);
536
537                 /* no match */
538                 if (!(ip == 0 || route->ksnr_ipaddr == ip))
539                         continue;
540
541                 route->ksnr_share_count = 0;
542                 /* This deletes associated conns too */
543                 ksocknal_del_route_locked (route);
544         }
545
546         nshared = 0;
547         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
548                 route = list_entry(tmp, ksock_route_t, ksnr_list);
549                 nshared += route->ksnr_share_count;
550         }
551
552         if (nshared == 0) {
553                 /* remove everything else if there are no explicit entries
554                  * left */
555
556                 list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
557                         route = list_entry(tmp, ksock_route_t, ksnr_list);
558
559                         /* we should only be removing auto-entries */
560                         LASSERT(route->ksnr_share_count == 0);
561                         ksocknal_del_route_locked (route);
562                 }
563
564                 list_for_each_safe (tmp, nxt, &peer->ksnp_conns) {
565                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
566
567                         ksocknal_close_conn_locked(conn, 0);
568                 }
569         }
570
571         ksocknal_peer_decref(peer);
572         /* NB peer unlinks itself when last conn/route is removed */
573 }
574
575 int
576 ksocknal_del_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ip)
577 {
578         CFS_LIST_HEAD     (zombies);
579         struct list_head  *ptmp;
580         struct list_head  *pnxt;
581         ksock_peer_t      *peer;
582         int                lo;
583         int                hi;
584         int                i;
585         int                rc = -ENOENT;
586
587         write_lock_bh (&ksocknal_data.ksnd_global_lock);
588
589         if (id.nid != LNET_NID_ANY)
590                 lo = hi = ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers;
591         else {
592                 lo = 0;
593                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
594         }
595
596         for (i = lo; i <= hi; i++) {
597                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
598                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
599
600                         if (peer->ksnp_ni != ni)
601                                 continue;
602
603                         if (!((id.nid == LNET_NID_ANY || peer->ksnp_id.nid == id.nid) &&
604                               (id.pid == LNET_PID_ANY || peer->ksnp_id.pid == id.pid)))
605                                 continue;
606
607                         ksocknal_peer_addref(peer);     /* a ref for me... */
608
609                         ksocknal_del_peer_locked (peer, ip);
610
611                         if (peer->ksnp_closing && !list_empty(&peer->ksnp_tx_queue)) {
612                                 LASSERT (list_empty(&peer->ksnp_conns));
613                                 LASSERT (list_empty(&peer->ksnp_routes));
614
615                                 list_splice_init(&peer->ksnp_tx_queue, &zombies);
616                         }
617
618                         ksocknal_peer_decref(peer);     /* ...till here */
619
620                         rc = 0;                 /* matched! */
621                 }
622         }
623
624         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
625
626         ksocknal_txlist_done(ni, &zombies, 1);
627
628         return (rc);
629 }
630
631 ksock_conn_t *
632 ksocknal_get_conn_by_idx (lnet_ni_t *ni, int index)
633 {
634         ksock_peer_t      *peer;
635         struct list_head  *ptmp;
636         ksock_conn_t      *conn;
637         struct list_head  *ctmp;
638         int                i;
639
640         read_lock (&ksocknal_data.ksnd_global_lock);
641
642         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
643                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
644                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
645
646                         LASSERT (!peer->ksnp_closing);
647
648                         if (peer->ksnp_ni != ni)
649                                 continue;
650
651                         list_for_each (ctmp, &peer->ksnp_conns) {
652                                 if (index-- > 0)
653                                         continue;
654
655                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
656                                 ksocknal_conn_addref(conn);
657                                 read_unlock (&ksocknal_data.ksnd_global_lock);
658                                 return (conn);
659                         }
660                 }
661         }
662
663         read_unlock (&ksocknal_data.ksnd_global_lock);
664         return (NULL);
665 }
666
667 ksock_sched_t *
668 ksocknal_choose_scheduler_locked (unsigned int irq)
669 {
670         ksock_sched_t    *sched;
671         ksock_irqinfo_t  *info;
672         int               i;
673
674         LASSERT (irq < NR_IRQS);
675         info = &ksocknal_data.ksnd_irqinfo[irq];
676
677         if (irq != 0 &&                         /* hardware NIC */
678             info->ksni_valid) {                 /* already set up */
679                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
680         }
681
682         /* software NIC (irq == 0) || not associated with a scheduler yet.
683          * Choose the CPU with the fewest connections... */
684         sched = &ksocknal_data.ksnd_schedulers[0];
685         for (i = 1; i < ksocknal_data.ksnd_nschedulers; i++)
686                 if (sched->kss_nconns >
687                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
688                         sched = &ksocknal_data.ksnd_schedulers[i];
689
690         if (irq != 0) {                         /* Hardware NIC */
691                 info->ksni_valid = 1;
692                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
693
694                 /* no overflow... */
695                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
696         }
697
698         return (sched);
699 }
700
701 int
702 ksocknal_local_ipvec (lnet_ni_t *ni, __u32 *ipaddrs)
703 {
704         ksock_net_t       *net = ni->ni_data;
705         int                i;
706         int                nip;
707
708         read_lock (&ksocknal_data.ksnd_global_lock);
709
710         nip = net->ksnn_ninterfaces;
711         LASSERT (nip <= LNET_MAX_INTERFACES);
712
713         /* Only offer interfaces for additional connections if I have 
714          * more than one. */
715         if (nip < 2) {
716                 read_unlock (&ksocknal_data.ksnd_global_lock);
717                 return 0;
718         }
719
720         for (i = 0; i < nip; i++) {
721                 ipaddrs[i] = net->ksnn_interfaces[i].ksni_ipaddr;
722                 LASSERT (ipaddrs[i] != 0);
723         }
724
725         read_unlock (&ksocknal_data.ksnd_global_lock);
726         return (nip);
727 }
728
729 int
730 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
731 {
732         int   best_netmatch = 0;
733         int   best_xor      = 0;
734         int   best          = -1;
735         int   this_xor;
736         int   this_netmatch;
737         int   i;
738
739         for (i = 0; i < nips; i++) {
740                 if (ips[i] == 0)
741                         continue;
742
743                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
744                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
745
746                 if (!(best < 0 ||
747                       best_netmatch < this_netmatch ||
748                       (best_netmatch == this_netmatch &&
749                        best_xor > this_xor)))
750                         continue;
751
752                 best = i;
753                 best_netmatch = this_netmatch;
754                 best_xor = this_xor;
755         }
756
757         LASSERT (best >= 0);
758         return (best);
759 }
760
761 int
762 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
763 {
764         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
765         ksock_net_t        *net = peer->ksnp_ni->ni_data;
766         ksock_interface_t  *iface;
767         ksock_interface_t  *best_iface;
768         int                 n_ips;
769         int                 i;
770         int                 j;
771         int                 k;
772         __u32               ip;
773         __u32               xor;
774         int                 this_netmatch;
775         int                 best_netmatch;
776         int                 best_npeers;
777
778         /* CAVEAT EMPTOR: We do all our interface matching with an
779          * exclusive hold of global lock at IRQ priority.  We're only
780          * expecting to be dealing with small numbers of interfaces, so the
781          * O(n**3)-ness shouldn't matter */
782
783         /* Also note that I'm not going to return more than n_peerips
784          * interfaces, even if I have more myself */
785
786         write_lock_bh (global_lock);
787
788         LASSERT (n_peerips <= LNET_MAX_INTERFACES);
789         LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
790
791         /* Only match interfaces for additional connections 
792          * if I have > 1 interface */
793         n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
794                 MIN(n_peerips, net->ksnn_ninterfaces);
795
796         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
797                 /*              ^ yes really... */
798
799                 /* If we have any new interfaces, first tick off all the
800                  * peer IPs that match old interfaces, then choose new
801                  * interfaces to match the remaining peer IPS.
802                  * We don't forget interfaces we've stopped using; we might
803                  * start using them again... */
804
805                 if (i < peer->ksnp_n_passive_ips) {
806                         /* Old interface. */
807                         ip = peer->ksnp_passive_ips[i];
808                         best_iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
809
810                         /* peer passive ips are kept up to date */
811                         LASSERT(best_iface != NULL);
812                 } else {
813                         /* choose a new interface */
814                         LASSERT (i == peer->ksnp_n_passive_ips);
815
816                         best_iface = NULL;
817                         best_netmatch = 0;
818                         best_npeers = 0;
819
820                         for (j = 0; j < net->ksnn_ninterfaces; j++) {
821                                 iface = &net->ksnn_interfaces[j];
822                                 ip = iface->ksni_ipaddr;
823
824                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
825                                         if (peer->ksnp_passive_ips[k] == ip)
826                                                 break;
827
828                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
829                                         continue;
830
831                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
832                                 xor = (ip ^ peerips[k]);
833                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
834
835                                 if (!(best_iface == NULL ||
836                                       best_netmatch < this_netmatch ||
837                                       (best_netmatch == this_netmatch &&
838                                        best_npeers > iface->ksni_npeers)))
839                                         continue;
840
841                                 best_iface = iface;
842                                 best_netmatch = this_netmatch;
843                                 best_npeers = iface->ksni_npeers;
844                         }
845
846                         best_iface->ksni_npeers++;
847                         ip = best_iface->ksni_ipaddr;
848                         peer->ksnp_passive_ips[i] = ip;
849                         peer->ksnp_n_passive_ips = i+1;
850                 }
851
852                 LASSERT (best_iface != NULL);
853
854                 /* mark the best matching peer IP used */
855                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
856                 peerips[j] = 0;
857         }
858
859         /* Overwrite input peer IP addresses */
860         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
861
862         write_unlock_bh (global_lock);
863
864         return (n_ips);
865 }
866
867 void
868 ksocknal_create_routes(ksock_peer_t *peer, int port,
869                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
870 {
871         ksock_route_t      *newroute = NULL;
872         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
873         lnet_ni_t          *ni = peer->ksnp_ni;
874         ksock_net_t        *net = ni->ni_data;
875         struct list_head   *rtmp;
876         ksock_route_t      *route;
877         ksock_interface_t  *iface;
878         ksock_interface_t  *best_iface;
879         int                 best_netmatch;
880         int                 this_netmatch;
881         int                 best_nroutes;
882         int                 i;
883         int                 j;
884
885         /* CAVEAT EMPTOR: We do all our interface matching with an
886          * exclusive hold of global lock at IRQ priority.  We're only
887          * expecting to be dealing with small numbers of interfaces, so the
888          * O(n**3)-ness here shouldn't matter */
889
890         write_lock_bh (global_lock);
891
892         if (net->ksnn_ninterfaces < 2) {
893                 /* Only create additional connections 
894                  * if I have > 1 interface */
895                 write_unlock_bh (global_lock);
896                 return;
897         }
898
899         LASSERT (npeer_ipaddrs <= LNET_MAX_INTERFACES);
900
901         for (i = 0; i < npeer_ipaddrs; i++) {
902                 if (newroute != NULL) {
903                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
904                 } else {
905                         write_unlock_bh (global_lock);
906
907                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
908                         if (newroute == NULL)
909                                 return;
910
911                         write_lock_bh (global_lock);
912                 }
913
914                 if (peer->ksnp_closing) {
915                         /* peer got closed under me */
916                         break;
917                 }
918
919                 /* Already got a route? */
920                 route = NULL;
921                 list_for_each(rtmp, &peer->ksnp_routes) {
922                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
923
924                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
925                                 break;
926
927                         route = NULL;
928                 }
929                 if (route != NULL)
930                         continue;
931
932                 best_iface = NULL;
933                 best_nroutes = 0;
934                 best_netmatch = 0;
935
936                 LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
937
938                 /* Select interface to connect from */
939                 for (j = 0; j < net->ksnn_ninterfaces; j++) {
940                         iface = &net->ksnn_interfaces[j];
941
942                         /* Using this interface already? */
943                         list_for_each(rtmp, &peer->ksnp_routes) {
944                                 route = list_entry(rtmp, ksock_route_t, ksnr_list);
945
946                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
947                                         break;
948
949                                 route = NULL;
950                         }
951                         if (route != NULL)
952                                 continue;
953
954                         this_netmatch = (((iface->ksni_ipaddr ^
955                                            newroute->ksnr_ipaddr) &
956                                            iface->ksni_netmask) == 0) ? 1 : 0;
957
958                         if (!(best_iface == NULL ||
959                               best_netmatch < this_netmatch ||
960                               (best_netmatch == this_netmatch &&
961                                best_nroutes > iface->ksni_nroutes)))
962                                 continue;
963
964                         best_iface = iface;
965                         best_netmatch = this_netmatch;
966                         best_nroutes = iface->ksni_nroutes;
967                 }
968
969                 if (best_iface == NULL)
970                         continue;
971
972                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
973                 best_iface->ksni_nroutes++;
974
975                 ksocknal_add_route_locked(peer, newroute);
976                 newroute = NULL;
977         }
978
979         write_unlock_bh (global_lock);
980         if (newroute != NULL)
981                 ksocknal_route_decref(newroute);
982 }
983
984 int
985 ksocknal_accept (lnet_ni_t *ni, cfs_socket_t *sock)
986 {
987         ksock_connreq_t    *cr;
988         int                 rc;
989         __u32               peer_ip;
990         int                 peer_port;
991
992         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
993         LASSERT (rc == 0);                      /* we succeeded before */
994
995         LIBCFS_ALLOC(cr, sizeof(*cr));
996         if (cr == NULL) {
997                 LCONSOLE_ERROR_MSG(0x12f, "Dropping connection request from "
998                                    "%u.%u.%u.%u: memory exhausted\n",
999                                    HIPQUAD(peer_ip));
1000                 return -ENOMEM;
1001         }
1002
1003         lnet_ni_addref(ni);
1004         cr->ksncr_ni   = ni;
1005         cr->ksncr_sock = sock;
1006
1007         spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
1008
1009         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
1010         cfs_waitq_signal(&ksocknal_data.ksnd_connd_waitq);
1011
1012         spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
1013         return 0;
1014 }
1015
1016 int
1017 ksocknal_connecting (ksock_peer_t *peer, __u32 ipaddr)
1018 {
1019         ksock_route_t   *route;
1020
1021         list_for_each_entry (route, &peer->ksnp_routes, ksnr_list) {
1022
1023                 if (route->ksnr_ipaddr == ipaddr)
1024                         return route->ksnr_connecting;
1025         }
1026         return 0;
1027 }
1028
/*
 * Establish a socklnd connection on 'sock'.
 *
 * A non-NULL 'route' means an active connection, and 'type' must then not be
 * SOCKLND_CONN_NONE; a NULL 'route' means a passive connection and 'type'
 * must be SOCKLND_CONN_NONE (asserted below).
 *
 * The function allocates the conn, performs the HELLO exchange (an active
 * conn sends first; a passive conn replies after the checks), then finds or
 * creates the peer.  It refuses connection races, stale peers/routes,
 * protocol or incarnation mismatches and duplicates (loopback excepted).
 * Otherwise it attaches the conn to the peer and a scheduler and queues the
 * peer's blocked TXs on it.
 *
 * Returns 0 on success.  Failures before the conn is attached go through
 * the failed_* labels, which free what was allocated and release 'sock';
 * rc is then negative for hard errors or positive (EALREADY, ESTALE,
 * EPROTO) for the soft failures, which on a passive conn also trigger a
 * SOCKLND_CONN_NONE reply asking the peer to retry.  Failures after the
 * conn is attached close it with ksocknal_close_conn_locked().
 */
1029 int
1030 ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route,
1031                       cfs_socket_t *sock, int type)
1032 {
1033         rwlock_t          *global_lock = &ksocknal_data.ksnd_global_lock;
1034         CFS_LIST_HEAD     (zombies);
1035         lnet_process_id_t  peerid;
1036         struct list_head  *tmp;
1037         __u64              incarnation;
1038         ksock_conn_t      *conn;
1039         ksock_conn_t      *conn2;
1040         ksock_peer_t      *peer = NULL;
1041         ksock_peer_t      *peer2;
1042         ksock_sched_t     *sched;
1043         ksock_hello_msg_t *hello;
1044         unsigned int       irq;
1045         ksock_tx_t        *tx;
1046         int                rc;
1047         int                active;
1048         char              *warn = NULL;
1049
1050         active = (route != NULL);
1051
1052         LASSERT (active == (type != SOCKLND_CONN_NONE));
1053
1054         irq = ksocknal_lib_sock_irq (sock);
1055
1056         LIBCFS_ALLOC(conn, sizeof(*conn));
1057         if (conn == NULL) {
1058                 rc = -ENOMEM;
1059                 goto failed_0;
1060         }
1061
1062         memset (conn, 0, sizeof (*conn));
1063         conn->ksnc_peer = NULL;
1064         conn->ksnc_route = NULL;
1065         conn->ksnc_sock = sock;
1066         /* 2 ref, 1 for conn, another extra ref prevents socket
1067          * being closed before establishment of connection */
1068         atomic_set (&conn->ksnc_sock_refcount, 2);
1069         conn->ksnc_type = type;
1070         ksocknal_lib_save_callback(sock, conn);
1071         atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
1072
1073         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(sock);
1074         conn->ksnc_rx_ready = 0;
1075         conn->ksnc_rx_scheduled = 0;
1076
1077         CFS_INIT_LIST_HEAD (&conn->ksnc_tx_queue);
1078         conn->ksnc_tx_ready = 0;
1079         conn->ksnc_tx_scheduled = 0;
1080         conn->ksnc_tx_mono = NULL;
1081         atomic_set (&conn->ksnc_tx_nob, 0);
1082
        /* the HELLO buffer is sized for the maximum number of interface
         * addresses (LNET_MAX_INTERFACES) */
1083         LIBCFS_ALLOC(hello, offsetof(ksock_hello_msg_t,
1084                                      kshm_ips[LNET_MAX_INTERFACES]));
1085         if (hello == NULL) {
1086                 rc = -ENOMEM;
1087                 goto failed_1;
1088         }
1089
1090         /* stash conn's local and remote addrs */
1091         rc = ksocknal_lib_get_conn_addrs (conn);
1092         if (rc != 0)
1093                 goto failed_1;
1094
1095         /* Find out/confirm peer's NID and connection type and get the
1096          * vector of interfaces she's willing to let me connect to.
1097          * Passive connections use the listener timeout since the peer sends
1098          * eagerly */
1099
1100         if (active) {
1101                 peer = route->ksnr_peer;
1102                 LASSERT(ni == peer->ksnp_ni);
1103
1104                 /* Active connection sends HELLO eagerly */
1105                 hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
1106                 peerid = peer->ksnp_id;
1107
1108                 write_lock_bh(global_lock);
1109                 conn->ksnc_proto = peer->ksnp_proto;
1110                 write_unlock_bh(global_lock);
1111
1112                 if (conn->ksnc_proto == NULL) {
1113                         conn->ksnc_proto = &ksocknal_protocol_v2x;
1114 #if SOCKNAL_VERSION_DEBUG
1115                         if (*ksocknal_tunables.ksnd_protocol != 2)
1116                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
1117 #endif
1118                 }
1119
1120                 rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
1121                 if (rc != 0)
1122                         goto failed_1;
1123         } else {
1124                 peerid.nid = LNET_NID_ANY;
1125                 peerid.pid = LNET_PID_ANY;
1126
1127                 /* Passive, get protocol from peer */
1128                 conn->ksnc_proto = NULL;
1129         }
1130
1131         rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
1132         if (rc < 0)
1133                 goto failed_1;
1134
        /* a positive rc from recv_hello is only expected on active conns;
         * it is acted on by the switch (rc) further down */
1135         LASSERT (rc == 0 || active);
1136         LASSERT (conn->ksnc_proto != NULL);
1137         LASSERT (peerid.nid != LNET_NID_ANY);
1138
1139         if (active) {
1140                 ksocknal_peer_addref(peer);
1141                 write_lock_bh (global_lock);
1142         } else {
1143                 rc = ksocknal_create_peer(&peer, ni, peerid);
1144                 if (rc != 0)
1145                         goto failed_1;
1146
1147                 write_lock_bh (global_lock);
1148
1149                 /* called with a ref on ni, so shutdown can't have started */
1150                 LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);
1151
1152                 peer2 = ksocknal_find_peer_locked(ni, peerid);
1153                 if (peer2 == NULL) {
1154                         /* NB this puts an "empty" peer in the peer
1155                          * table (which takes my ref) */
1156                         list_add_tail(&peer->ksnp_list,
1157                                       ksocknal_nid2peerlist(peerid.nid));
1158                 } else {
1159                         ksocknal_peer_decref(peer);
1160                         peer = peer2;
1161                 }
1162
1163                 /* +1 ref for me */
1164                 ksocknal_peer_addref(peer);
1165                 peer->ksnp_accepting++;
1166
1167                 /* Am I already connecting to this guy?  Resolve in
1168                  * favour of higher NID... */
1169                 if (peerid.nid < ni->ni_nid &&
1170                     ksocknal_connecting(peer, conn->ksnc_ipaddr)) {
1171                         rc = EALREADY;
1172                         warn = "connection race resolution";
1173                         goto failed_2;
1174                 }
1175         }
1176
1177         if (peer->ksnp_closing ||
1178             (active && route->ksnr_deleted)) {
1179                 /* peer/route got closed under me */
1180                 rc = -ESTALE;
1181                 warn = "peer/route removed";
1182                 goto failed_2;
1183         }
1184
1185         if (peer->ksnp_proto == NULL) {
1186                 /* Never connected before.
1187                  * NB recv_hello may have returned EPROTO to signal my peer
1188                  * wants a different protocol than the one I asked for.
1189                  */
1190                 LASSERT (list_empty(&peer->ksnp_conns));
1191
1192                 peer->ksnp_proto = conn->ksnc_proto;
1193                 peer->ksnp_incarnation = incarnation;
1194         }
1195
1196         if (peer->ksnp_proto != conn->ksnc_proto ||
1197             peer->ksnp_incarnation != incarnation) {
1198                 /* Peer rebooted or I've got the wrong protocol version */
1199                 ksocknal_close_peer_conns_locked(peer, 0, 0);
1200
1201                 peer->ksnp_proto = NULL;
1202                 rc = ESTALE;
1203                 warn = peer->ksnp_incarnation != incarnation ?
1204                        "peer rebooted" :
1205                        "wrong proto version";
1206                 goto failed_2;
1207         }
1208
        /* rc still holds the result of ksocknal_recv_hello() here */
1209         switch (rc) {
1210         default:
1211                 LBUG();
1212         case 0:
1213                 break;
1214         case EALREADY:
1215                 warn = "lost conn race";
1216                 goto failed_2;
1217         case EPROTO:
1218                 warn = "retry with different protocol version";
1219                 goto failed_2;
1220         }
1221
1222         /* Refuse to duplicate an existing connection, unless this is a
1223          * loopback connection */
1224         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1225                 list_for_each(tmp, &peer->ksnp_conns) {
1226                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1227
1228                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1229                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1230                             conn2->ksnc_type != conn->ksnc_type)
1231                                 continue;
1232
1233                         /* Reply on a passive connection attempt so the peer
1234                          * realises we're connected. */
1235                         LASSERT (rc == 0);
1236                         if (!active)
1237                                 rc = EALREADY;
1238
1239                         warn = "duplicate";
1240                         goto failed_2;
1241                 }
1242         }
1243
1244         /* If the connection created by this route didn't bind to the IP
1245          * address the route connected to, the connection/route matching
1246          * code below probably isn't going to work. */
1247         if (active &&
1248             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1249                 CERROR("Route %s %u.%u.%u.%u connected to %u.%u.%u.%u\n",
1250                        libcfs_id2str(peer->ksnp_id),
1251                        HIPQUAD(route->ksnr_ipaddr),
1252                        HIPQUAD(conn->ksnc_ipaddr));
1253         }
1254
1255         /* Search for a route corresponding to the new connection and
1256          * create an association.  This allows incoming connections created
1257          * by routes in my peer to match my own route entries so I don't
1258          * continually create duplicate routes. */
1259         list_for_each (tmp, &peer->ksnp_routes) {
1260                 route = list_entry(tmp, ksock_route_t, ksnr_list);
1261
1262                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1263                         continue;
1264
1265                 ksocknal_associate_route_conn_locked(route, conn);
1266                 break;
1267         }
1268
1269         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
1270         peer->ksnp_last_alive = cfs_time_current();
1271         peer->ksnp_error = 0;
1272
1273         sched = ksocknal_choose_scheduler_locked (irq);
1274         sched->kss_nconns++;
1275         conn->ksnc_scheduler = sched;
1276
1277         /* Set the deadline for the outgoing HELLO to drain */
1278         conn->ksnc_tx_bufnob = SOCK_WMEM_QUEUED(sock);
1279         conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
1280         mb();       /* order with adding to peer's conn list */
1281
1282         list_add (&conn->ksnc_list, &peer->ksnp_conns);
1283         ksocknal_conn_addref(conn);
1284
1285         ksocknal_new_packet(conn, 0);
1286
1287         /* Take all the packets blocking for a connection.
1288          * NB, it might be nicer to share these blocked packets among any
1289          * other connections that are becoming established. */
1290         while (!list_empty (&peer->ksnp_tx_queue)) {
1291                 tx = list_entry (peer->ksnp_tx_queue.next,
1292                                  ksock_tx_t, tx_list);
1293
1294                 list_del (&tx->tx_list);
1295                 ksocknal_queue_tx_locked (tx, conn);
1296         }
1297
1298         write_unlock_bh (global_lock);
1299
1300         /* We've now got a new connection.  Any errors from here on are just
1301          * like "normal" comms errors and we close the connection normally.
1302          * NB (a) we still have to send the reply HELLO for passive
1303          *        connections, 
1304          *    (b) normal I/O on the conn is blocked until I setup and call the
1305          *        socket callbacks.
1306          */
1307
1308         ksocknal_lib_bind_irq (irq);
1309
1310         CDEBUG(D_NET, "New conn %s p %d.x %u.%u.%u.%u -> %u.%u.%u.%u/%d"
1311                " incarnation:"LPD64" sched[%d]/%d\n",
1312                libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
1313                HIPQUAD(conn->ksnc_myipaddr), HIPQUAD(conn->ksnc_ipaddr),
1314                conn->ksnc_port, incarnation,
1315                (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
1316
1317         if (active) {
1318                 /* additional routes after interface exchange? */
1319                 ksocknal_create_routes(peer, conn->ksnc_port,
1320                                        hello->kshm_ips, hello->kshm_nips);
1321         } else {
1322                 hello->kshm_nips = ksocknal_select_ips(peer, hello->kshm_ips,
1323                                                        hello->kshm_nips);
1324                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1325         }
1326
1327         LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1328                                     kshm_ips[LNET_MAX_INTERFACES]));
1329
1330         /* setup the socket AFTER I've received hello (it disables
1331          * SO_LINGER).  I might call back to the acceptor who may want
1332          * to send a protocol version response and then close the
1333          * socket; this ensures the socket only tears down after the
1334          * response has been sent. */
1335         if (rc == 0)
1336                 rc = ksocknal_lib_setup_sock(sock);
1337
1338         write_lock_bh(global_lock);
1339
1340         /* NB my callbacks block while I hold ksnd_global_lock */
1341         ksocknal_lib_set_callback(sock, conn);
1342
1343         if (!active)
1344                 peer->ksnp_accepting--;
1345
1346         write_unlock_bh(global_lock);
1347
1348         if (rc != 0) {
1349                 write_lock_bh(global_lock);
1350                 ksocknal_close_conn_locked(conn, rc);
1351                 write_unlock_bh(global_lock);
1352         } else if (ksocknal_connsock_addref(conn) == 0) {
1353                 /* Allow I/O to proceed. */
1354                 ksocknal_read_callback(conn);
1355                 ksocknal_write_callback(conn);
1356                 ksocknal_connsock_decref(conn);
1357         }
1358
        /* drop the extra socket ref and the conn ref taken for myself
         * when the conn was initialised above */
1359         ksocknal_connsock_decref(conn);
1360         ksocknal_conn_decref(conn);
1361         return rc;
1362
        /* failed_2: global_lock is write-held and I hold a ref on 'peer' */
1363  failed_2:
1364         if (!peer->ksnp_closing &&
1365             list_empty (&peer->ksnp_conns) &&
1366             list_empty (&peer->ksnp_routes)) {
                /* move the peer's blocked TXs onto 'zombies' so they can be
                 * completed below, after the lock has been dropped */
1367                 list_add(&zombies, &peer->ksnp_tx_queue);
1368                 list_del_init(&peer->ksnp_tx_queue);
1369                 ksocknal_unlink_peer_locked(peer);
1370         }
1371
1372         write_unlock_bh (global_lock);
1373
1374         if (warn != NULL) {
1375                 if (rc < 0)
1376                         CERROR("Not creating conn %s type %d: %s\n",
1377                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1378                 else
1379                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1380                               libcfs_id2str(peerid), conn->ksnc_type, warn);
1381         }
1382
1383         if (!active) {
1384                 if (rc > 0) {
1385                         /* Request retry by replying with CONN_NONE 
1386                          * ksnc_proto has been set already */
1387                         conn->ksnc_type = SOCKLND_CONN_NONE;
1388                         hello->kshm_nips = 0;
1389                         ksocknal_send_hello(ni, conn, peerid.nid, hello);
1390                 }
1391
1392                 write_lock_bh(global_lock);
1393                 peer->ksnp_accepting--;
1394                 write_unlock_bh(global_lock);
1395         }
1396
1397         ksocknal_txlist_done(ni, &zombies, 1);
1398         ksocknal_peer_decref(peer);
1399
        /* failed_1: 'conn' is allocated, 'hello' may or may not be */
1400  failed_1:
1401         if (hello != NULL)
1402                 LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1403                                             kshm_ips[LNET_MAX_INTERFACES]));
1404
1405         LIBCFS_FREE (conn, sizeof(*conn));
1406
        /* failed_0: only the socket needs releasing */
1407  failed_0:
1408         libcfs_sock_release(sock);
1409         return rc;
1410 }
1411
1412 void
1413 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
1414 {
1415         /* This just does the immmediate housekeeping, and queues the
1416          * connection for the reaper to terminate.
1417          * Caller holds ksnd_global_lock exclusively in irq context */
1418         ksock_peer_t      *peer = conn->ksnc_peer;
1419         ksock_route_t     *route;
1420         ksock_conn_t      *conn2;
1421         struct list_head  *tmp;
1422
1423         LASSERT (peer->ksnp_error == 0);
1424         LASSERT (!conn->ksnc_closing);
1425         conn->ksnc_closing = 1;
1426
1427         /* ksnd_deathrow_conns takes over peer's ref */
1428         list_del (&conn->ksnc_list);
1429
1430         route = conn->ksnc_route;
1431         if (route != NULL) {
1432                 /* dissociate conn from route... */
1433                 LASSERT (!route->ksnr_deleted);
1434                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
1435
1436                 conn2 = NULL;
1437                 list_for_each(tmp, &peer->ksnp_conns) {
1438                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1439
1440                         if (conn2->ksnc_route == route &&
1441                             conn2->ksnc_type == conn->ksnc_type)
1442                                 break;
1443
1444                         conn2 = NULL;
1445                 }
1446                 if (conn2 == NULL)
1447                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1448
1449                 conn->ksnc_route = NULL;
1450
1451 #if 0           /* irrelevent with only eager routes */
1452                 list_del (&route->ksnr_list);   /* make route least favourite */
1453                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
1454 #endif
1455                 ksocknal_route_decref(route);     /* drop conn's ref on route */
1456         }
1457
1458         if (list_empty (&peer->ksnp_conns)) {
1459                 /* No more connections to this peer */
1460
1461                 peer->ksnp_proto = NULL;        /* renegotiate protocol version */
1462                 peer->ksnp_error = error;       /* stash last conn close reason */
1463
1464                 if (list_empty (&peer->ksnp_routes)) {
1465                         /* I've just closed last conn belonging to a
1466                          * peer with no routes to it */
1467                         ksocknal_unlink_peer_locked (peer);
1468                 }
1469         }
1470
1471         spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
1472
1473         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1474         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1475
1476         spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
1477 }
1478
1479 void
1480 ksocknal_peer_failed (ksock_peer_t *peer)
1481 {
1482         time_t    last_alive = 0;
1483         int       notify = 0;
1484
1485         /* There has been a connection failure or comms error; but I'll only
1486          * tell LNET I think the peer is dead if it's to another kernel and
1487          * there are no connections or connection attempts in existance. */
1488
1489         read_lock (&ksocknal_data.ksnd_global_lock);
1490
1491         if ((peer->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1492             list_empty(&peer->ksnp_conns) &&
1493             peer->ksnp_accepting == 0 &&
1494             ksocknal_find_connecting_route_locked(peer) == NULL) {
1495                 notify = 1;
1496                 last_alive = cfs_time_current_sec() -
1497                         cfs_duration_sec(cfs_time_current() -
1498                                          peer->ksnp_last_alive);
1499         }
1500
1501         read_unlock (&ksocknal_data.ksnd_global_lock);
1502
1503         if (notify)
1504                 lnet_notify (peer->ksnp_ni, peer->ksnp_id.nid, 0,
1505                              last_alive);
1506 }
1507
1508 void
1509 ksocknal_finalize_zcreq(ksock_conn_t *conn)
1510 {
1511         ksock_peer_t     *peer = conn->ksnc_peer;
1512         ksock_tx_t       *tx;
1513         ksock_tx_t       *tmp;
1514         CFS_LIST_HEAD    (zlist);
1515
1516         /* NB safe to finalize TXs because closing of socket will
1517          * abort all buffered data */
1518         LASSERT (conn->ksnc_sock == NULL);
1519
1520         spin_lock(&peer->ksnp_lock);
1521
1522         list_for_each_entry_safe(tx, tmp, &peer->ksnp_zc_req_list, tx_zc_list) {
1523                 if (tx->tx_conn != conn)
1524                         continue;
1525
1526                 LASSERT (tx->tx_msg.ksm_zc_req_cookie != 0);
1527
1528                 tx->tx_msg.ksm_zc_req_cookie = 0;
1529                 list_del(&tx->tx_zc_list);
1530                 list_add(&tx->tx_zc_list, &zlist);
1531         }
1532
1533         spin_unlock(&peer->ksnp_lock);
1534
1535         while (!list_empty(&zlist)) {
1536                 tx = list_entry(zlist.next, ksock_tx_t, tx_zc_list);
1537
1538                 list_del(&tx->tx_zc_list);
1539                 ksocknal_tx_decref(tx);
1540         }
1541 }
1542
1543 void
1544 ksocknal_terminate_conn (ksock_conn_t *conn)
1545 {
1546         /* This gets called by the reaper (guaranteed thread context) to
1547          * disengage the socket from its callbacks and close it.
1548          * ksnc_refcount will eventually hit zero, and then the reaper will
1549          * destroy it. */
1550         ksock_peer_t     *peer = conn->ksnc_peer;
1551         ksock_sched_t    *sched = conn->ksnc_scheduler;
1552         int               failed = 0;
1553
1554         LASSERT(conn->ksnc_closing);
1555
1556         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1557         spin_lock_bh (&sched->kss_lock);
1558
1559         /* a closing conn is always ready to tx */
1560         conn->ksnc_tx_ready = 1;
1561
1562         if (!conn->ksnc_tx_scheduled &&
1563             !list_empty(&conn->ksnc_tx_queue)){
1564                 list_add_tail (&conn->ksnc_tx_list,
1565                                &sched->kss_tx_conns);
1566                 conn->ksnc_tx_scheduled = 1;
1567                 /* extra ref for scheduler */
1568                 ksocknal_conn_addref(conn);
1569
1570                 cfs_waitq_signal (&sched->kss_waitq);
1571         }
1572
1573         spin_unlock_bh (&sched->kss_lock);
1574
1575         /* serialise with callbacks */
1576         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1577
1578         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1579
1580         /* OK, so this conn may not be completely disengaged from its
1581          * scheduler yet, but it _has_ committed to terminate... */
1582         conn->ksnc_scheduler->kss_nconns--;
1583
1584         if (peer->ksnp_error != 0) {
1585                 /* peer's last conn closed in error */
1586                 LASSERT (list_empty (&peer->ksnp_conns));
1587                 failed = 1;
1588                 peer->ksnp_error = 0;     /* avoid multiple notifications */
1589         }
1590
1591         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1592
1593         if (failed)
1594                 ksocknal_peer_failed(peer);
1595
1596         /* The socket is closed on the final put; either here, or in
1597          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1598          * when the connection was established, this will close the socket
1599          * immediately, aborting anything buffered in it. Any hung
1600          * zero-copy transmits will therefore complete in finite time. */
1601         ksocknal_connsock_decref(conn);
1602 }
1603
1604 void
1605 ksocknal_queue_zombie_conn (ksock_conn_t *conn)
1606 {
1607         /* Queue the conn for the reaper to destroy */
1608
1609         LASSERT (atomic_read(&conn->ksnc_conn_refcount) == 0);
1610         spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
1611
1612         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1613         cfs_waitq_signal(&ksocknal_data.ksnd_reaper_waitq);
1614
1615         spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
1616 }
1617
1618 void
1619 ksocknal_destroy_conn (ksock_conn_t *conn)
1620 {
1621         /* Final coup-de-grace of the reaper */
1622         CDEBUG (D_NET, "connection %p\n", conn);
1623
1624         LASSERT (atomic_read (&conn->ksnc_conn_refcount) == 0);
1625         LASSERT (atomic_read (&conn->ksnc_sock_refcount) == 0);
1626         LASSERT (conn->ksnc_sock == NULL);
1627         LASSERT (conn->ksnc_route == NULL);
1628         LASSERT (!conn->ksnc_tx_scheduled);
1629         LASSERT (!conn->ksnc_rx_scheduled);
1630         LASSERT (list_empty(&conn->ksnc_tx_queue));
1631
1632         /* complete current receive if any */
1633         switch (conn->ksnc_rx_state) {
1634         case SOCKNAL_RX_LNET_PAYLOAD:
1635                 CERROR("Completing partial receive from %s"
1636                        ", ip %d.%d.%d.%d:%d, with error\n",
1637                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
1638                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1639                 lnet_finalize (conn->ksnc_peer->ksnp_ni,
1640                                conn->ksnc_cookie, -EIO);
1641                 break;
1642         case SOCKNAL_RX_LNET_HEADER:
1643                 if (conn->ksnc_rx_started)
1644                         CERROR("Incomplete receive of lnet header from %s"
1645                                ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
1646                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1647                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1648                                conn->ksnc_proto->pro_version);
1649                 break;
1650         case SOCKNAL_RX_KSM_HEADER:
1651                 if (conn->ksnc_rx_started)
1652                         CERROR("Incomplete receive of ksock message from %s"
1653                                ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
1654                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1655                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1656                                conn->ksnc_proto->pro_version);
1657                 break;
1658         case SOCKNAL_RX_SLOP:
1659                 if (conn->ksnc_rx_started)
1660                         CERROR("Incomplete receive of slops from %s"
1661                                ", ip %d.%d.%d.%d:%d, with error\n",
1662                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1663                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1664                break;
1665         default:
1666                 LBUG ();
1667                 break;
1668         }
1669
1670         ksocknal_peer_decref(conn->ksnc_peer);
1671
1672         LIBCFS_FREE (conn, sizeof (*conn));
1673 }
1674
1675 int
1676 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1677 {
1678         ksock_conn_t       *conn;
1679         struct list_head   *ctmp;
1680         struct list_head   *cnxt;
1681         int                 count = 0;
1682
1683         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1684                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1685
1686                 if (ipaddr == 0 ||
1687                     conn->ksnc_ipaddr == ipaddr) {
1688                         count++;
1689                         ksocknal_close_conn_locked (conn, why);
1690                 }
1691         }
1692
1693         return (count);
1694 }
1695
1696 int
1697 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1698 {
1699         ksock_peer_t     *peer = conn->ksnc_peer;
1700         __u32             ipaddr = conn->ksnc_ipaddr;
1701         int               count;
1702
1703         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1704
1705         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1706
1707         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1708
1709         return (count);
1710 }
1711
1712 int
1713 ksocknal_close_matching_conns (lnet_process_id_t id, __u32 ipaddr)
1714 {
1715         ksock_peer_t       *peer;
1716         struct list_head   *ptmp;
1717         struct list_head   *pnxt;
1718         int                 lo;
1719         int                 hi;
1720         int                 i;
1721         int                 count = 0;
1722
1723         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1724
1725         if (id.nid != LNET_NID_ANY)
1726                 lo = hi = ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers;
1727         else {
1728                 lo = 0;
1729                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1730         }
1731
1732         for (i = lo; i <= hi; i++) {
1733                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1734
1735                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1736
1737                         if (!((id.nid == LNET_NID_ANY || id.nid == peer->ksnp_id.nid) &&
1738                               (id.pid == LNET_PID_ANY || id.pid == peer->ksnp_id.pid)))
1739                                 continue;
1740
1741                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1742                 }
1743         }
1744
1745         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1746
1747         /* wildcards always succeed */
1748         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1749                 return (0);
1750
1751         return (count == 0 ? -ENOENT : 0);
1752 }
1753
1754 void
1755 ksocknal_notify (lnet_ni_t *ni, lnet_nid_t gw_nid, int alive)
1756 {
1757         /* The router is telling me she's been notified of a change in
1758          * gateway state.... */
1759         lnet_process_id_t  id = {.nid = gw_nid, .pid = LNET_PID_ANY};
1760
1761         CDEBUG (D_NET, "gw %s %s\n", libcfs_nid2str(gw_nid),
1762                 alive ? "up" : "down");
1763
1764         if (!alive) {
1765                 /* If the gateway crashed, close all open connections... */
1766                 ksocknal_close_matching_conns (id, 0);
1767                 return;
1768         }
1769
1770         /* ...otherwise do nothing.  We can only establish new connections
1771          * if we have autroutes, and these connect on demand. */
1772 }
1773
1774 void
1775 ksocknal_push_peer (ksock_peer_t *peer)
1776 {
1777         int               index;
1778         int               i;
1779         struct list_head *tmp;
1780         ksock_conn_t     *conn;
1781
1782         for (index = 0; ; index++) {
1783                 read_lock (&ksocknal_data.ksnd_global_lock);
1784
1785                 i = 0;
1786                 conn = NULL;
1787
1788                 list_for_each (tmp, &peer->ksnp_conns) {
1789                         if (i++ == index) {
1790                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1791                                 ksocknal_conn_addref(conn);
1792                                 break;
1793                         }
1794                 }
1795
1796                 read_unlock (&ksocknal_data.ksnd_global_lock);
1797
1798                 if (conn == NULL)
1799                         break;
1800
1801                 ksocknal_lib_push_conn (conn);
1802                 ksocknal_conn_decref(conn);
1803         }
1804 }
1805
1806 int
1807 ksocknal_push (lnet_ni_t *ni, lnet_process_id_t id)
1808 {
1809         ksock_peer_t      *peer;
1810         struct list_head  *tmp;
1811         int                index;
1812         int                i;
1813         int                j;
1814         int                rc = -ENOENT;
1815
1816         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1817                 for (j = 0; ; j++) {
1818                         read_lock (&ksocknal_data.ksnd_global_lock);
1819
1820                         index = 0;
1821                         peer = NULL;
1822
1823                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1824                                 peer = list_entry(tmp, ksock_peer_t,
1825                                                   ksnp_list);
1826
1827                                 if (!((id.nid == LNET_NID_ANY ||
1828                                        id.nid == peer->ksnp_id.nid) &&
1829                                       (id.pid == LNET_PID_ANY ||
1830                                        id.pid == peer->ksnp_id.pid))) {
1831                                         peer = NULL;
1832                                         continue;
1833                                 }
1834
1835                                 if (index++ == j) {
1836                                         ksocknal_peer_addref(peer);
1837                                         break;
1838                                 }
1839                         }
1840
1841                         read_unlock (&ksocknal_data.ksnd_global_lock);
1842
1843                         if (peer != NULL) {
1844                                 rc = 0;
1845                                 ksocknal_push_peer (peer);
1846                                 ksocknal_peer_decref(peer);
1847                         }
1848                 }
1849
1850         }
1851
1852         return (rc);
1853 }
1854
1855 int
1856 ksocknal_add_interface(lnet_ni_t *ni, __u32 ipaddress, __u32 netmask)
1857 {
1858         ksock_net_t       *net = ni->ni_data;
1859         ksock_interface_t *iface;
1860         int                rc;
1861         int                i;
1862         int                j;
1863         struct list_head  *ptmp;
1864         ksock_peer_t      *peer;
1865         struct list_head  *rtmp;
1866         ksock_route_t     *route;
1867
1868         if (ipaddress == 0 ||
1869             netmask == 0)
1870                 return (-EINVAL);
1871
1872         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1873
1874         iface = ksocknal_ip2iface(ni, ipaddress);
1875         if (iface != NULL) {
1876                 /* silently ignore dups */
1877                 rc = 0;
1878         } else if (net->ksnn_ninterfaces == LNET_MAX_INTERFACES) {
1879                 rc = -ENOSPC;
1880         } else {
1881                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
1882
1883                 iface->ksni_ipaddr = ipaddress;
1884                 iface->ksni_netmask = netmask;
1885                 iface->ksni_nroutes = 0;
1886                 iface->ksni_npeers = 0;
1887
1888                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1889                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1890                                 peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1891
1892                                 for (j = 0; j < peer->ksnp_n_passive_ips; j++)
1893                                         if (peer->ksnp_passive_ips[j] == ipaddress)
1894                                                 iface->ksni_npeers++;
1895
1896                                 list_for_each(rtmp, &peer->ksnp_routes) {
1897                                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1898
1899                                         if (route->ksnr_myipaddr == ipaddress)
1900                                                 iface->ksni_nroutes++;
1901                                 }
1902                         }
1903                 }
1904
1905                 rc = 0;
1906                 /* NB only new connections will pay attention to the new interface! */
1907         }
1908
1909         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1910
1911         return (rc);
1912 }
1913
1914 void
1915 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
1916 {
1917         struct list_head   *tmp;
1918         struct list_head   *nxt;
1919         ksock_route_t      *route;
1920         ksock_conn_t       *conn;
1921         int                 i;
1922         int                 j;
1923
1924         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
1925                 if (peer->ksnp_passive_ips[i] == ipaddr) {
1926                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
1927                                 peer->ksnp_passive_ips[j-1] =
1928                                         peer->ksnp_passive_ips[j];
1929                         peer->ksnp_n_passive_ips--;
1930                         break;
1931                 }
1932
1933         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
1934                 route = list_entry (tmp, ksock_route_t, ksnr_list);
1935
1936                 if (route->ksnr_myipaddr != ipaddr)
1937                         continue;
1938
1939                 if (route->ksnr_share_count != 0) {
1940                         /* Manually created; keep, but unbind */
1941                         route->ksnr_myipaddr = 0;
1942                 } else {
1943                         ksocknal_del_route_locked(route);
1944                 }
1945         }
1946
1947         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
1948                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1949
1950                 if (conn->ksnc_myipaddr == ipaddr)
1951                         ksocknal_close_conn_locked (conn, 0);
1952         }
1953 }
1954
1955 int
1956 ksocknal_del_interface(lnet_ni_t *ni, __u32 ipaddress)
1957 {
1958         ksock_net_t       *net = ni->ni_data;
1959         int                rc = -ENOENT;
1960         struct list_head  *tmp;
1961         struct list_head  *nxt;
1962         ksock_peer_t      *peer;
1963         __u32              this_ip;
1964         int                i;
1965         int                j;
1966
1967         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1968
1969         for (i = 0; i < net->ksnn_ninterfaces; i++) {
1970                 this_ip = net->ksnn_interfaces[i].ksni_ipaddr;
1971
1972                 if (!(ipaddress == 0 ||
1973                       ipaddress == this_ip))
1974                         continue;
1975
1976                 rc = 0;
1977
1978                 for (j = i+1; j < net->ksnn_ninterfaces; j++)
1979                         net->ksnn_interfaces[j-1] =
1980                                 net->ksnn_interfaces[j];
1981
1982                 net->ksnn_ninterfaces--;
1983
1984                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
1985                         list_for_each_safe(tmp, nxt, &ksocknal_data.ksnd_peers[j]) {
1986                                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
1987
1988                                 if (peer->ksnp_ni != ni)
1989                                         continue;
1990
1991                                 ksocknal_peer_del_interface_locked(peer, this_ip);
1992                         }
1993                 }
1994         }
1995
1996         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1997
1998         return (rc);
1999 }
2000
2001 int
2002 ksocknal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
2003 {
2004         struct libcfs_ioctl_data *data = arg;
2005         int rc;
2006
2007         switch(cmd) {
2008         case IOC_LIBCFS_GET_INTERFACE: {
2009                 ksock_net_t       *net = ni->ni_data;
2010                 ksock_interface_t *iface;
2011
2012                 read_lock (&ksocknal_data.ksnd_global_lock);
2013
2014                 if (data->ioc_count < 0 ||
2015                     data->ioc_count >= net->ksnn_ninterfaces) {
2016                         rc = -ENOENT;
2017                 } else {
2018                         rc = 0;
2019                         iface = &net->ksnn_interfaces[data->ioc_count];
2020
2021                         data->ioc_u32[0] = iface->ksni_ipaddr;
2022                         data->ioc_u32[1] = iface->ksni_netmask;
2023                         data->ioc_u32[2] = iface->ksni_npeers;
2024                         data->ioc_u32[3] = iface->ksni_nroutes;
2025                 }
2026
2027                 read_unlock (&ksocknal_data.ksnd_global_lock);
2028                 return rc;
2029         }
2030
2031         case IOC_LIBCFS_ADD_INTERFACE:
2032                 return ksocknal_add_interface(ni,
2033                                               data->ioc_u32[0], /* IP address */
2034                                               data->ioc_u32[1]); /* net mask */
2035
2036         case IOC_LIBCFS_DEL_INTERFACE:
2037                 return ksocknal_del_interface(ni,
2038                                               data->ioc_u32[0]); /* IP address */
2039
2040         case IOC_LIBCFS_GET_PEER: {
2041                 lnet_process_id_t id = {0,};
2042                 __u32            myip = 0;
2043                 __u32            ip = 0;
2044                 int              port = 0;
2045                 int              conn_count = 0;
2046                 int              share_count = 0;
2047
2048                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
2049                                             &id, &myip, &ip, &port,
2050                                             &conn_count,  &share_count);
2051                 if (rc != 0)
2052                         return rc;
2053
2054                 data->ioc_nid    = id.nid;
2055                 data->ioc_count  = share_count;
2056                 data->ioc_u32[0] = ip;
2057                 data->ioc_u32[1] = port;
2058                 data->ioc_u32[2] = myip;
2059                 data->ioc_u32[3] = conn_count;
2060                 data->ioc_u32[4] = id.pid;
2061                 return 0;
2062         }
2063
2064         case IOC_LIBCFS_ADD_PEER: {
2065                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2066                                          .pid = LUSTRE_SRV_LNET_PID};
2067                 return ksocknal_add_peer (ni, id,
2068                                           data->ioc_u32[0], /* IP */
2069                                           data->ioc_u32[1]); /* port */
2070         }
2071         case IOC_LIBCFS_DEL_PEER: {
2072                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2073                                          .pid = LNET_PID_ANY};
2074                 return ksocknal_del_peer (ni, id,
2075                                           data->ioc_u32[0]); /* IP */
2076         }
2077         case IOC_LIBCFS_GET_CONN: {
2078                 int           txmem;
2079                 int           rxmem;
2080                 int           nagle;
2081                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (ni, data->ioc_count);
2082
2083                 if (conn == NULL)
2084                         return -ENOENT;
2085
2086                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
2087
2088                 data->ioc_count  = txmem;
2089                 data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
2090                 data->ioc_flags  = nagle;
2091                 data->ioc_u32[0] = conn->ksnc_ipaddr;
2092                 data->ioc_u32[1] = conn->ksnc_port;
2093                 data->ioc_u32[2] = conn->ksnc_myipaddr;
2094                 data->ioc_u32[3] = conn->ksnc_type;
2095                 data->ioc_u32[4] = conn->ksnc_scheduler -
2096                                    ksocknal_data.ksnd_schedulers;
2097                 data->ioc_u32[5] = rxmem;
2098                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
2099                 ksocknal_conn_decref(conn);
2100                 return 0;
2101         }
2102
2103         case IOC_LIBCFS_CLOSE_CONNECTION: {
2104                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2105                                         .pid = LNET_PID_ANY};
2106
2107                 return ksocknal_close_matching_conns (id,
2108                                                       data->ioc_u32[0]);
2109         }
2110         case IOC_LIBCFS_REGISTER_MYNID:
2111                 /* Ignore if this is a noop */
2112                 if (data->ioc_nid == ni->ni_nid)
2113                         return 0;
2114
2115                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
2116                        libcfs_nid2str(data->ioc_nid),
2117                        libcfs_nid2str(ni->ni_nid));
2118                 return -EINVAL;
2119
2120         case IOC_LIBCFS_PUSH_CONNECTION: {
2121                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2122                                         .pid = LNET_PID_ANY};
2123
2124                 return ksocknal_push(ni, id);
2125         }
2126         default:
2127                 return -EINVAL;
2128         }
2129         /* not reached */
2130 }
2131
2132 void
2133 ksocknal_free_buffers (void)
2134 {
2135         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
2136
2137         if (ksocknal_data.ksnd_schedulers != NULL)
2138                 LIBCFS_FREE (ksocknal_data.ksnd_schedulers,
2139                              sizeof (ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2140
2141         LIBCFS_FREE (ksocknal_data.ksnd_peers,
2142                      sizeof (struct list_head) *
2143                      ksocknal_data.ksnd_peer_hash_size);
2144
2145         spin_lock(&ksocknal_data.ksnd_tx_lock);
2146
2147         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2148                 struct list_head  zlist;
2149                 ksock_tx_t       *tx;
2150
2151                 list_add(&zlist, &ksocknal_data.ksnd_idle_noop_txs);
2152                 list_del_init(&ksocknal_data.ksnd_idle_noop_txs);
2153                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2154
2155                 while(!list_empty(&zlist)) {
2156                         tx = list_entry(zlist.next, ksock_tx_t, tx_list);
2157                         list_del(&tx->tx_list);
2158                         LIBCFS_FREE(tx, tx->tx_desc_size);
2159                 }
2160         } else {
2161                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2162         }
2163 }
2164
2165 void
2166 ksocknal_base_shutdown (void)
2167 {
2168         ksock_sched_t *sched;
2169         int            i;
2170
2171         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
2172                atomic_read (&libcfs_kmemory));
2173         LASSERT (ksocknal_data.ksnd_nnets == 0);
2174
2175         switch (ksocknal_data.ksnd_init) {
2176         default:
2177                 LASSERT (0);
2178
2179         case SOCKNAL_INIT_ALL:
2180         case SOCKNAL_INIT_DATA:
2181                 LASSERT (ksocknal_data.ksnd_peers != NULL);
2182                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2183                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
2184                 }
2185                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
2186                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
2187                 LASSERT (list_empty (&ksocknal_data.ksnd_connd_connreqs));
2188                 LASSERT (list_empty (&ksocknal_data.ksnd_connd_routes));
2189
2190                 if (ksocknal_data.ksnd_schedulers != NULL)
2191                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2192                                 ksock_sched_t *kss =
2193                                         &ksocknal_data.ksnd_schedulers[i];
2194
2195                                 LASSERT (list_empty (&kss->kss_tx_conns));
2196                                 LASSERT (list_empty (&kss->kss_rx_conns));
2197                                 LASSERT (list_empty (&kss->kss_zombie_noop_txs));
2198                                 LASSERT (kss->kss_nconns == 0);
2199                         }
2200
2201                 /* flag threads to terminate; wake and wait for them to die */
2202                 ksocknal_data.ksnd_shuttingdown = 1;
2203                 cfs_waitq_broadcast (&ksocknal_data.ksnd_connd_waitq);
2204                 cfs_waitq_broadcast (&ksocknal_data.ksnd_reaper_waitq);
2205
2206                 if (ksocknal_data.ksnd_schedulers != NULL)
2207                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2208                                 sched = &ksocknal_data.ksnd_schedulers[i];
2209                                 cfs_waitq_broadcast(&sched->kss_waitq);
2210                         }
2211
2212                 i = 4;
2213                 read_lock (&ksocknal_data.ksnd_global_lock);
2214                 while (ksocknal_data.ksnd_nthreads != 0) {
2215                         i++;
2216                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2217                                "waiting for %d threads to terminate\n",
2218                                 ksocknal_data.ksnd_nthreads);
2219                         read_unlock (&ksocknal_data.ksnd_global_lock);
2220                         cfs_pause(cfs_time_seconds(1));
2221                         read_lock (&ksocknal_data.ksnd_global_lock);
2222                 }
2223                 read_unlock (&ksocknal_data.ksnd_global_lock);
2224
2225                 ksocknal_free_buffers();
2226
2227                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2228                 break;
2229         }
2230
2231         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
2232                atomic_read (&libcfs_kmemory));
2233
2234         PORTAL_MODULE_UNUSE;
2235 }
2236
2237
2238 __u64
2239 ksocknal_new_incarnation (void)
2240 {
2241         struct timeval tv;
2242
2243         /* The incarnation number is the time this module loaded and it
2244          * identifies this particular instance of the socknal.  Hopefully
2245          * we won't be able to reboot more frequently than 1MHz for the
2246          * forseeable future :) */
2247
2248         do_gettimeofday(&tv);
2249
2250         return (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2251 }
2252
2253 int
2254 ksocknal_base_startup (void)
2255 {
2256         int               rc;
2257         int               i;
2258
2259         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2260         LASSERT (ksocknal_data.ksnd_nnets == 0);
2261
2262         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2263
2264         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2265         LIBCFS_ALLOC (ksocknal_data.ksnd_peers,
2266                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
2267         if (ksocknal_data.ksnd_peers == NULL)
2268                 return -ENOMEM;
2269
2270         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2271                 CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2272
2273         rwlock_init(&ksocknal_data.ksnd_global_lock);
2274
2275         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
2276         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
2277         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
2278         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
2279         cfs_waitq_init(&ksocknal_data.ksnd_reaper_waitq);
2280
2281         spin_lock_init (&ksocknal_data.ksnd_connd_lock);
2282         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_connd_connreqs);
2283         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_connd_routes);
2284         cfs_waitq_init(&ksocknal_data.ksnd_connd_waitq);
2285
2286         spin_lock_init (&ksocknal_data.ksnd_tx_lock);
2287         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_idle_noop_txs);
2288
2289         /* NB memset above zeros whole of ksocknal_data, including
2290          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
2291
2292         /* flag lists/ptrs/locks initialised */
2293         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2294         PORTAL_MODULE_USE;
2295
2296         ksocknal_data.ksnd_nschedulers = ksocknal_nsched();
2297         LIBCFS_ALLOC(ksocknal_data.ksnd_schedulers,
2298                      sizeof(ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2299         if (ksocknal_data.ksnd_schedulers == NULL)
2300                 goto failed;
2301
2302         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2303                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
2304
2305                 spin_lock_init (&kss->kss_lock);
2306                 CFS_INIT_LIST_HEAD (&kss->kss_rx_conns);
2307                 CFS_INIT_LIST_HEAD (&kss->kss_tx_conns);
2308                 CFS_INIT_LIST_HEAD (&kss->kss_zombie_noop_txs);
2309                 cfs_waitq_init (&kss->kss_waitq);
2310         }
2311
2312         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2313                 rc = ksocknal_thread_start (ksocknal_scheduler,
2314                                             &ksocknal_data.ksnd_schedulers[i]);
2315                 if (rc != 0) {
2316                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
2317                                i, rc);
2318                         goto failed;
2319                 }
2320         }
2321
2322         /* must have at least 2 connds to remain responsive to accepts while
2323          * connecting */
2324         if (*ksocknal_tunables.ksnd_nconnds < 2)
2325                 *ksocknal_tunables.ksnd_nconnds = 2;
2326
2327         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2328                 rc = ksocknal_thread_start (ksocknal_connd, (void *)((long)i));
2329                 if (rc != 0) {
2330                         CERROR("Can't spawn socknal connd: %d\n", rc);
2331                         goto failed;
2332                 }
2333         }
2334
2335         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
2336         if (rc != 0) {
2337                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2338                 goto failed;
2339         }
2340
2341         /* flag everything initialised */
2342         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2343
2344         return 0;
2345
2346  failed:
2347         ksocknal_base_shutdown();
2348         return -ENETDOWN;
2349 }
2350
2351 void
2352 ksocknal_debug_peerhash (lnet_ni_t *ni)
2353 {
2354         ksock_peer_t     *peer = NULL;
2355         struct list_head *tmp;
2356         int               i;
2357
2358         read_lock (&ksocknal_data.ksnd_global_lock);
2359
2360         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2361                 list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
2362                         peer = list_entry (tmp, ksock_peer_t, ksnp_list);
2363
2364                         if (peer->ksnp_ni == ni) break;
2365
2366                         peer = NULL;
2367                 }
2368         }
2369
2370         if (peer != NULL) {
2371                 ksock_route_t *route;
2372                 ksock_conn_t  *conn;
2373
2374                 CWARN ("Active peer on shutdown: %s, ref %d, scnt %d, "
2375                        "closing %d, accepting %d, err %d, zcookie "LPU64", "
2376                        "txq %d, zc_req %d\n", libcfs_id2str(peer->ksnp_id),
2377                        atomic_read(&peer->ksnp_refcount),
2378                        peer->ksnp_sharecount, peer->ksnp_closing,
2379                        peer->ksnp_accepting, peer->ksnp_error,
2380                        peer->ksnp_zc_next_cookie,
2381                        !list_empty(&peer->ksnp_tx_queue),
2382                        !list_empty(&peer->ksnp_zc_req_list));
2383
2384                 list_for_each (tmp, &peer->ksnp_routes) {
2385                         route = list_entry(tmp, ksock_route_t, ksnr_list);
2386                         CWARN ("Route: ref %d, schd %d, conn %d, cnted %d, "
2387                                "del %d\n", atomic_read(&route->ksnr_refcount),
2388                                route->ksnr_scheduled, route->ksnr_connecting,
2389                                route->ksnr_connected, route->ksnr_deleted);
2390                 }
2391
2392                 list_for_each (tmp, &peer->ksnp_conns) {
2393                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
2394                         CWARN ("Conn: ref %d, sref %d, t %d, c %d\n",
2395                                atomic_read(&conn->ksnc_conn_refcount),
2396                                atomic_read(&conn->ksnc_sock_refcount),
2397                                conn->ksnc_type, conn->ksnc_closing);
2398                 }
2399         }
2400
2401         read_unlock (&ksocknal_data.ksnd_global_lock);
2402         return;
2403 }
2404
2405 void
2406 ksocknal_shutdown (lnet_ni_t *ni)
2407 {
2408         ksock_net_t      *net = ni->ni_data;
2409         int               i;
2410         lnet_process_id_t  anyid = {.nid = LNET_NID_ANY,
2411                                    .pid = LNET_PID_ANY};
2412
2413         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2414         LASSERT(ksocknal_data.ksnd_nnets > 0);
2415
2416         spin_lock_bh (&net->ksnn_lock);
2417         net->ksnn_shutdown = 1;                 /* prevent new peers */
2418         spin_unlock_bh (&net->ksnn_lock);
2419
2420         /* Delete all peers */
2421         ksocknal_del_peer(ni, anyid, 0);
2422
2423         /* Wait for all peer state to clean up */
2424         i = 2;
2425         spin_lock_bh (&net->ksnn_lock);
2426         while (net->ksnn_npeers != 0) {
2427                 spin_unlock_bh (&net->ksnn_lock);
2428
2429                 i++;
2430                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2431                        "waiting for %d peers to disconnect\n",
2432                        net->ksnn_npeers);
2433                 cfs_pause(cfs_time_seconds(1));
2434
2435                 ksocknal_debug_peerhash(ni);
2436
2437                 spin_lock_bh (&net->ksnn_lock);
2438         }
2439         spin_unlock_bh (&net->ksnn_lock);
2440
2441         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2442                 LASSERT (net->ksnn_interfaces[i].ksni_npeers == 0);
2443                 LASSERT (net->ksnn_interfaces[i].ksni_nroutes == 0);
2444         }
2445
2446         LIBCFS_FREE(net, sizeof(*net));
2447
2448         ksocknal_data.ksnd_nnets--;
2449         if (ksocknal_data.ksnd_nnets == 0)
2450                 ksocknal_base_shutdown();
2451 }
2452
2453 int
2454 ksocknal_enumerate_interfaces(ksock_net_t *net)
2455 {
2456         char      **names;
2457         int         i;
2458         int         j;
2459         int         rc;
2460         int         n;
2461
2462         n = libcfs_ipif_enumerate(&names);
2463         if (n <= 0) {
2464                 CERROR("Can't enumerate interfaces: %d\n", n);
2465                 return n;
2466         }
2467
2468         for (i = j = 0; i < n; i++) {
2469                 int        up;
2470                 __u32      ip;
2471                 __u32      mask;
2472
2473                 if (!strcmp(names[i], "lo")) /* skip the loopback IF */
2474                         continue;
2475
2476                 rc = libcfs_ipif_query(names[i], &up, &ip, &mask);
2477                 if (rc != 0) {
2478                         CWARN("Can't get interface %s info: %d\n",
2479                               names[i], rc);
2480                         continue;
2481                 }
2482
2483                 if (!up) {
2484                         CWARN("Ignoring interface %s (down)\n",
2485                               names[i]);
2486                         continue;
2487                 }
2488
2489                 if (j == LNET_MAX_INTERFACES) {
2490                         CWARN("Ignoring interface %s (too many interfaces)\n",
2491                               names[i]);
2492                         continue;
2493                 }
2494
2495                 net->ksnn_interfaces[j].ksni_ipaddr = ip;
2496                 net->ksnn_interfaces[j].ksni_netmask = mask;
2497                 j++;
2498         }
2499
2500         libcfs_ipif_free_enumeration(names, n);
2501
2502         if (j == 0)
2503                 CERROR("Can't find any usable interfaces\n");
2504
2505         return j;
2506 }
2507
2508 int
2509 ksocknal_startup (lnet_ni_t *ni)
2510 {
2511         ksock_net_t  *net;
2512         int           rc;
2513         int           i;
2514
2515         LASSERT (ni->ni_lnd == &the_ksocklnd);
2516
2517         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2518                 rc = ksocknal_base_startup();
2519                 if (rc != 0)
2520                         return rc;
2521         }
2522
2523         LIBCFS_ALLOC(net, sizeof(*net));
2524         if (net == NULL)
2525                 goto fail_0;
2526
2527         memset(net, 0, sizeof(*net));
2528         spin_lock_init(&net->ksnn_lock);
2529         net->ksnn_incarnation = ksocknal_new_incarnation();
2530         ni->ni_data = net;
2531         ni->ni_maxtxcredits = *ksocknal_tunables.ksnd_credits;
2532         ni->ni_peertxcredits = *ksocknal_tunables.ksnd_peercredits;
2533
2534         if (ni->ni_interfaces[0] == NULL) {
2535                 rc = ksocknal_enumerate_interfaces(net);
2536                 if (rc <= 0)
2537                         goto fail_1;
2538
2539                 net->ksnn_ninterfaces = 1;
2540         } else {
2541                 for (i = 0; i < LNET_MAX_INTERFACES; i++) {
2542                         int    up;
2543
2544                         if (ni->ni_interfaces[i] == NULL)
2545                                 break;
2546
2547                         rc = libcfs_ipif_query(
2548                                 ni->ni_interfaces[i], &up,
2549                                 &net->ksnn_interfaces[i].ksni_ipaddr,
2550                                 &net->ksnn_interfaces[i].ksni_netmask);
2551
2552                         if (rc != 0) {
2553                                 CERROR("Can't get interface %s info: %d\n",
2554                                        ni->ni_interfaces[i], rc);
2555                                 goto fail_1;
2556                         }
2557
2558                         if (!up) {
2559                                 CERROR("Interface %s is down\n",
2560                                        ni->ni_interfaces[i]);
2561                                 goto fail_1;
2562                         }
2563                 }
2564                 net->ksnn_ninterfaces = i;
2565         }
2566
2567         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
2568                                 net->ksnn_interfaces[0].ksni_ipaddr);
2569
2570         ksocknal_data.ksnd_nnets++;
2571
2572         return 0;
2573
2574  fail_1:
2575         LIBCFS_FREE(net, sizeof(*net));
2576  fail_0:
2577         if (ksocknal_data.ksnd_nnets == 0)
2578                 ksocknal_base_shutdown();
2579
2580         return -ENETDOWN;
2581 }
2582
2583
2584 void __exit
2585 ksocknal_module_fini (void)
2586 {
2587         lnet_unregister_lnd(&the_ksocklnd);
2588         ksocknal_lib_tunables_fini();
2589 }
2590
2591 int __init
2592 ksocknal_module_init (void)
2593 {
2594         int    rc;
2595
2596         /* check ksnr_connected/connecting field large enough */
2597         CLASSERT(SOCKLND_CONN_NTYPES <= 4);
2598
2599         rc = ksocknal_lib_tunables_init();
2600         if (rc != 0)
2601                 return rc;
2602
2603         lnet_register_lnd(&the_ksocklnd);
2604
2605         return 0;
2606 }
2607
2608 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2609 MODULE_DESCRIPTION("Kernel TCP Socket LND v2.0.0");
2610 MODULE_LICENSE("GPL");
2611
2612 cfs_module(ksocknal, "2.0.0", ksocknal_module_init, ksocknal_module_fini);