Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socklnd.h"
27
28 lnd_t the_ksocklnd = {
29         .lnd_type       = SOCKLND,
30         .lnd_startup    = ksocknal_startup,
31         .lnd_shutdown   = ksocknal_shutdown,
32         .lnd_ctl        = ksocknal_ctl,
33         .lnd_send       = ksocknal_send,
34         .lnd_recv       = ksocknal_recv,
35         .lnd_notify     = ksocknal_notify,
36         .lnd_accept     = ksocknal_accept,
37 };
38
39 ksock_nal_data_t        ksocknal_data;
40
41 ksock_interface_t *
42 ksocknal_ip2iface(lnet_ni_t *ni, __u32 ip)
43 {
44         ksock_net_t       *net = ni->ni_data;
45         int                i;
46         ksock_interface_t *iface;
47
48         for (i = 0; i < net->ksnn_ninterfaces; i++) {
49                 LASSERT(i < LNET_MAX_INTERFACES);
50                 iface = &net->ksnn_interfaces[i];
51
52                 if (iface->ksni_ipaddr == ip)
53                         return (iface);
54         }
55
56         return (NULL);
57 }
58
59 ksock_route_t *
60 ksocknal_create_route (__u32 ipaddr, int port)
61 {
62         ksock_route_t *route;
63
64         LIBCFS_ALLOC (route, sizeof (*route));
65         if (route == NULL)
66                 return (NULL);
67
68         atomic_set (&route->ksnr_refcount, 1);
69         route->ksnr_peer = NULL;
70         route->ksnr_retry_interval = 0;         /* OK to connect at any time */
71         route->ksnr_ipaddr = ipaddr;
72         route->ksnr_port = port;
73         route->ksnr_scheduled = 0;
74         route->ksnr_connecting = 0;
75         route->ksnr_connected = 0;
76         route->ksnr_deleted = 0;
77         route->ksnr_conn_count = 0;
78         route->ksnr_share_count = 0;
79
80         return (route);
81 }
82
83 void
84 ksocknal_destroy_route (ksock_route_t *route)
85 {
86         LASSERT (atomic_read(&route->ksnr_refcount) == 0);
87
88         if (route->ksnr_peer != NULL)
89                 ksocknal_peer_decref(route->ksnr_peer);
90
91         LIBCFS_FREE (route, sizeof (*route));
92 }
93
94 int
95 ksocknal_create_peer (ksock_peer_t **peerp, lnet_ni_t *ni, lnet_process_id_t id)
96 {
97         ksock_net_t   *net = ni->ni_data;
98         ksock_peer_t  *peer;
99
100         LASSERT (id.nid != LNET_NID_ANY);
101         LASSERT (id.pid != LNET_PID_ANY);
102         LASSERT (!in_interrupt());
103
104         LIBCFS_ALLOC (peer, sizeof (*peer));
105         if (peer == NULL)
106                 return -ENOMEM;
107
108         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
109
110         peer->ksnp_ni = ni;
111         peer->ksnp_id = id;
112         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
113         peer->ksnp_closing = 0;
114         peer->ksnp_accepting = 0;
115         peer->ksnp_zc_next_cookie = 1;
116         peer->ksnp_proto = NULL;
117         CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
118         CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
119         CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
120         CFS_INIT_LIST_HEAD (&peer->ksnp_zc_req_list);
121         spin_lock_init(&peer->ksnp_lock);
122
123         spin_lock_bh (&net->ksnn_lock);
124
125         if (net->ksnn_shutdown) {
126                 spin_unlock_bh (&net->ksnn_lock);
127
128                 LIBCFS_FREE(peer, sizeof(*peer));
129                 CERROR("Can't create peer: network shutdown\n");
130                 return -ESHUTDOWN;
131         }
132
133         net->ksnn_npeers++;
134
135         spin_unlock_bh (&net->ksnn_lock);
136
137         *peerp = peer;
138         return 0;
139 }
140
141 void
142 ksocknal_destroy_peer (ksock_peer_t *peer)
143 {
144         ksock_net_t    *net = peer->ksnp_ni->ni_data;
145
146         CDEBUG (D_NET, "peer %s %p deleted\n",
147                 libcfs_id2str(peer->ksnp_id), peer);
148
149         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
150         LASSERT (peer->ksnp_accepting == 0);
151         LASSERT (list_empty (&peer->ksnp_conns));
152         LASSERT (list_empty (&peer->ksnp_routes));
153         LASSERT (list_empty (&peer->ksnp_tx_queue));
154         LASSERT (list_empty (&peer->ksnp_zc_req_list));
155
156         LIBCFS_FREE (peer, sizeof (*peer));
157
158         /* NB a peer's connections and routes keep a reference on their peer
159          * until they are destroyed, so we can be assured that _all_ state to
160          * do with this peer has been cleaned up when its refcount drops to
161          * zero. */
162         spin_lock_bh (&net->ksnn_lock);
163         net->ksnn_npeers--;
164         spin_unlock_bh (&net->ksnn_lock);
165 }
166
167 ksock_peer_t *
168 ksocknal_find_peer_locked (lnet_ni_t *ni, lnet_process_id_t id)
169 {
170         struct list_head *peer_list = ksocknal_nid2peerlist(id.nid);
171         struct list_head *tmp;
172         ksock_peer_t     *peer;
173
174         list_for_each (tmp, peer_list) {
175
176                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
177
178                 LASSERT (!peer->ksnp_closing);
179
180                 if (peer->ksnp_ni != ni)
181                         continue;
182
183                 if (peer->ksnp_id.nid != id.nid ||
184                     peer->ksnp_id.pid != id.pid)
185                         continue;
186
187                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
188                        peer, libcfs_id2str(id),
189                        atomic_read(&peer->ksnp_refcount));
190                 return (peer);
191         }
192         return (NULL);
193 }
194
195 ksock_peer_t *
196 ksocknal_find_peer (lnet_ni_t *ni, lnet_process_id_t id)
197 {
198         ksock_peer_t     *peer;
199
200         read_lock (&ksocknal_data.ksnd_global_lock);
201         peer = ksocknal_find_peer_locked (ni, id);
202         if (peer != NULL)                       /* +1 ref for caller? */
203                 ksocknal_peer_addref(peer);
204         read_unlock (&ksocknal_data.ksnd_global_lock);
205
206         return (peer);
207 }
208
209 void
210 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
211 {
212         int                i;
213         __u32              ip;
214         ksock_interface_t *iface;
215
216         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
217                 LASSERT (i < LNET_MAX_INTERFACES);
218                 ip = peer->ksnp_passive_ips[i];
219
220                 iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
221                 /* All IPs in peer->ksnp_passive_ips[] come from the
222                  * interface list, therefore the call must succeed. */
223                 LASSERT (iface != NULL);
224
225                 CDEBUG(D_NET, "peer=%p iface=%p ksni_nroutes=%d\n",
226                        peer, iface, iface->ksni_nroutes);
227                 iface->ksni_npeers--;
228         }
229
230         LASSERT (list_empty(&peer->ksnp_conns));
231         LASSERT (list_empty(&peer->ksnp_routes));
232         LASSERT (!peer->ksnp_closing);
233         peer->ksnp_closing = 1;
234         list_del (&peer->ksnp_list);
235         /* lose peerlist's ref */
236         ksocknal_peer_decref(peer);
237 }
238
239 int
240 ksocknal_get_peer_info (lnet_ni_t *ni, int index,
241                         lnet_process_id_t *id, __u32 *myip, __u32 *peer_ip, int *port,
242                         int *conn_count, int *share_count)
243 {
244         ksock_peer_t      *peer;
245         struct list_head  *ptmp;
246         ksock_route_t     *route;
247         struct list_head  *rtmp;
248         int                i;
249         int                j;
250         int                rc = -ENOENT;
251
252         read_lock (&ksocknal_data.ksnd_global_lock);
253
254         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
255
256                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
257                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
258
259                         if (peer->ksnp_ni != ni)
260                                 continue;
261
262                         if (peer->ksnp_n_passive_ips == 0 &&
263                             list_empty(&peer->ksnp_routes)) {
264                                 if (index-- > 0)
265                                         continue;
266
267                                 *id = peer->ksnp_id;
268                                 *myip = 0;
269                                 *peer_ip = 0;
270                                 *port = 0;
271                                 *conn_count = 0;
272                                 *share_count = 0;
273                                 rc = 0;
274                                 goto out;
275                         }
276
277                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
278                                 if (index-- > 0)
279                                         continue;
280
281                                 *id = peer->ksnp_id;
282                                 *myip = peer->ksnp_passive_ips[j];
283                                 *peer_ip = 0;
284                                 *port = 0;
285                                 *conn_count = 0;
286                                 *share_count = 0;
287                                 rc = 0;
288                                 goto out;
289                         }
290
291                         list_for_each (rtmp, &peer->ksnp_routes) {
292                                 if (index-- > 0)
293                                         continue;
294
295                                 route = list_entry(rtmp, ksock_route_t,
296                                                    ksnr_list);
297
298                                 *id = peer->ksnp_id;
299                                 *myip = route->ksnr_myipaddr;
300                                 *peer_ip = route->ksnr_ipaddr;
301                                 *port = route->ksnr_port;
302                                 *conn_count = route->ksnr_conn_count;
303                                 *share_count = route->ksnr_share_count;
304                                 rc = 0;
305                                 goto out;
306                         }
307                 }
308         }
309  out:
310         read_unlock (&ksocknal_data.ksnd_global_lock);
311         return (rc);
312 }
313
314 void
315 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
316 {
317         ksock_peer_t      *peer = route->ksnr_peer;
318         int                type = conn->ksnc_type;
319         ksock_interface_t *iface;
320
321         conn->ksnc_route = route;
322         ksocknal_route_addref(route);
323
324         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
325                 if (route->ksnr_myipaddr == 0) {
326                         /* route wasn't bound locally yet (the initial route) */
327                         CDEBUG(D_NET, "Binding %s %u.%u.%u.%u to %u.%u.%u.%u\n",
328                                libcfs_id2str(peer->ksnp_id),
329                                HIPQUAD(route->ksnr_ipaddr),
330                                HIPQUAD(conn->ksnc_myipaddr));
331                 } else {
332                         CDEBUG(D_NET, "Rebinding %s %u.%u.%u.%u from "
333                                "%u.%u.%u.%u to %u.%u.%u.%u\n",
334                                libcfs_id2str(peer->ksnp_id),
335                                HIPQUAD(route->ksnr_ipaddr),
336                                HIPQUAD(route->ksnr_myipaddr),
337                                HIPQUAD(conn->ksnc_myipaddr));
338
339                         iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
340                                                   route->ksnr_myipaddr);
341                         if (iface != NULL)
342                                 iface->ksni_nroutes--;
343                 }
344                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
345                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
346                                           route->ksnr_myipaddr);
347                 if (iface != NULL)
348                         iface->ksni_nroutes++;
349         }
350
351         route->ksnr_connected |= (1<<type);
352         route->ksnr_conn_count++;
353
354         /* Successful connection => further attempts can
355          * proceed immediately */
356         route->ksnr_retry_interval = 0;
357 }
358
359 void
360 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
361 {
362         struct list_head  *tmp;
363         ksock_conn_t      *conn;
364         ksock_route_t     *route2;
365
366         LASSERT (!peer->ksnp_closing);
367         LASSERT (route->ksnr_peer == NULL);
368         LASSERT (!route->ksnr_scheduled);
369         LASSERT (!route->ksnr_connecting);
370         LASSERT (route->ksnr_connected == 0);
371
372         /* LASSERT(unique) */
373         list_for_each(tmp, &peer->ksnp_routes) {
374                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
375
376                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
377                         CERROR ("Duplicate route %s %u.%u.%u.%u\n",
378                                 libcfs_id2str(peer->ksnp_id),
379                                 HIPQUAD(route->ksnr_ipaddr));
380                         LBUG();
381                 }
382         }
383
384         route->ksnr_peer = peer;
385         ksocknal_peer_addref(peer);
386         /* peer's routelist takes over my ref on 'route' */
387         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
388
389         list_for_each(tmp, &peer->ksnp_conns) {
390                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
391
392                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
393                         continue;
394
395                 ksocknal_associate_route_conn_locked(route, conn);
396                 /* keep going (typed routes) */
397         }
398 }
399
400 void
401 ksocknal_del_route_locked (ksock_route_t *route)
402 {
403         ksock_peer_t      *peer = route->ksnr_peer;
404         ksock_interface_t *iface;
405         ksock_conn_t      *conn;
406         struct list_head  *ctmp;
407         struct list_head  *cnxt;
408
409         LASSERT (!route->ksnr_deleted);
410
411         /* Close associated conns */
412         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
413                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
414
415                 if (conn->ksnc_route != route)
416                         continue;
417
418                 ksocknal_close_conn_locked (conn, 0);
419         }
420
421         if (route->ksnr_myipaddr != 0) {
422                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
423                                           route->ksnr_myipaddr);
424                 if (iface != NULL)
425                         iface->ksni_nroutes--;
426         }
427
428         route->ksnr_deleted = 1;
429         list_del (&route->ksnr_list);
430         ksocknal_route_decref(route);             /* drop peer's ref */
431
432         if (list_empty (&peer->ksnp_routes) &&
433             list_empty (&peer->ksnp_conns)) {
434                 /* I've just removed the last route to a peer with no active
435                  * connections */
436                 ksocknal_unlink_peer_locked (peer);
437         }
438 }
439
440 int
441 ksocknal_add_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ipaddr, int port)
442 {
443         struct list_head  *tmp;
444         ksock_peer_t      *peer;
445         ksock_peer_t      *peer2;
446         ksock_route_t     *route;
447         ksock_route_t     *route2;
448         int                rc;
449
450         if (id.nid == LNET_NID_ANY ||
451             id.pid == LNET_PID_ANY)
452                 return (-EINVAL);
453
454         /* Have a brand new peer ready... */
455         rc = ksocknal_create_peer(&peer, ni, id);
456         if (rc != 0)
457                 return rc;
458
459         route = ksocknal_create_route (ipaddr, port);
460         if (route == NULL) {
461                 ksocknal_peer_decref(peer);
462                 return (-ENOMEM);
463         }
464
465         write_lock_bh (&ksocknal_data.ksnd_global_lock);
466
467         /* always called with a ref on ni, so shutdown can't have started */
468         LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);
469
470         peer2 = ksocknal_find_peer_locked (ni, id);
471         if (peer2 != NULL) {
472                 ksocknal_peer_decref(peer);
473                 peer = peer2;
474         } else {
475                 /* peer table takes my ref on peer */
476                 list_add_tail (&peer->ksnp_list,
477                                ksocknal_nid2peerlist (id.nid));
478         }
479
480         route2 = NULL;
481         list_for_each (tmp, &peer->ksnp_routes) {
482                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
483
484                 if (route2->ksnr_ipaddr == ipaddr)
485                         break;
486
487                 route2 = NULL;
488         }
489         if (route2 == NULL) {
490                 ksocknal_add_route_locked(peer, route);
491                 route->ksnr_share_count++;
492         } else {
493                 ksocknal_route_decref(route);
494                 route2->ksnr_share_count++;
495         }
496
497         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
498
499         return (0);
500 }
501
502 void
503 ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip)
504 {
505         ksock_conn_t     *conn;
506         ksock_route_t    *route;
507         struct list_head *tmp;
508         struct list_head *nxt;
509         int               nshared;
510
511         LASSERT (!peer->ksnp_closing);
512
513         /* Extra ref prevents peer disappearing until I'm done with it */
514         ksocknal_peer_addref(peer);
515
516         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
517                 route = list_entry(tmp, ksock_route_t, ksnr_list);
518
519                 /* no match */
520                 if (!(ip == 0 || route->ksnr_ipaddr == ip))
521                         continue;
522
523                 route->ksnr_share_count = 0;
524                 /* This deletes associated conns too */
525                 ksocknal_del_route_locked (route);
526         }
527
528         nshared = 0;
529         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
530                 route = list_entry(tmp, ksock_route_t, ksnr_list);
531                 nshared += route->ksnr_share_count;
532         }
533
534         if (nshared == 0) {
535                 /* remove everything else if there are no explicit entries
536                  * left */
537
538                 list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
539                         route = list_entry(tmp, ksock_route_t, ksnr_list);
540
541                         /* we should only be removing auto-entries */
542                         LASSERT(route->ksnr_share_count == 0);
543                         ksocknal_del_route_locked (route);
544                 }
545
546                 list_for_each_safe (tmp, nxt, &peer->ksnp_conns) {
547                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
548
549                         ksocknal_close_conn_locked(conn, 0);
550                 }
551         }
552
553         ksocknal_peer_decref(peer);
554         /* NB peer unlinks itself when last conn/route is removed */
555 }
556
557 int
558 ksocknal_del_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ip)
559 {
560         CFS_LIST_HEAD     (zombies);
561         struct list_head  *ptmp;
562         struct list_head  *pnxt;
563         ksock_peer_t      *peer;
564         int                lo;
565         int                hi;
566         int                i;
567         int                rc = -ENOENT;
568
569         write_lock_bh (&ksocknal_data.ksnd_global_lock);
570
571         if (id.nid != LNET_NID_ANY)
572                 lo = hi = ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers;
573         else {
574                 lo = 0;
575                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
576         }
577
578         for (i = lo; i <= hi; i++) {
579                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
580                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
581
582                         if (peer->ksnp_ni != ni)
583                                 continue;
584
585                         if (!((id.nid == LNET_NID_ANY || peer->ksnp_id.nid == id.nid) &&
586                               (id.pid == LNET_PID_ANY || peer->ksnp_id.pid == id.pid)))
587                                 continue;
588
589                         ksocknal_peer_addref(peer);     /* a ref for me... */
590
591                         ksocknal_del_peer_locked (peer, ip);
592
593                         if (peer->ksnp_closing && !list_empty(&peer->ksnp_tx_queue)) {
594                                 LASSERT (list_empty(&peer->ksnp_conns));
595                                 LASSERT (list_empty(&peer->ksnp_routes));
596
597                                 list_splice_init(&peer->ksnp_tx_queue, &zombies);
598                         }
599
600                         ksocknal_peer_decref(peer);     /* ...till here */
601
602                         rc = 0;                 /* matched! */
603                 }
604         }
605
606         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
607
608         ksocknal_txlist_done(ni, &zombies, 1);
609
610         return (rc);
611 }
612
613 ksock_conn_t *
614 ksocknal_get_conn_by_idx (lnet_ni_t *ni, int index)
615 {
616         ksock_peer_t      *peer;
617         struct list_head  *ptmp;
618         ksock_conn_t      *conn;
619         struct list_head  *ctmp;
620         int                i;
621
622         read_lock (&ksocknal_data.ksnd_global_lock);
623
624         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
625                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
626                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
627
628                         LASSERT (!peer->ksnp_closing);
629
630                         if (peer->ksnp_ni != ni)
631                                 continue;
632
633                         list_for_each (ctmp, &peer->ksnp_conns) {
634                                 if (index-- > 0)
635                                         continue;
636
637                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
638                                 ksocknal_conn_addref(conn);
639                                 read_unlock (&ksocknal_data.ksnd_global_lock);
640                                 return (conn);
641                         }
642                 }
643         }
644
645         read_unlock (&ksocknal_data.ksnd_global_lock);
646         return (NULL);
647 }
648
649 ksock_sched_t *
650 ksocknal_choose_scheduler_locked (unsigned int irq)
651 {
652         ksock_sched_t    *sched;
653         ksock_irqinfo_t  *info;
654         int               i;
655
656         LASSERT (irq < NR_IRQS);
657         info = &ksocknal_data.ksnd_irqinfo[irq];
658
659         if (irq != 0 &&                         /* hardware NIC */
660             info->ksni_valid) {                 /* already set up */
661                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
662         }
663
664         /* software NIC (irq == 0) || not associated with a scheduler yet.
665          * Choose the CPU with the fewest connections... */
666         sched = &ksocknal_data.ksnd_schedulers[0];
667         for (i = 1; i < ksocknal_data.ksnd_nschedulers; i++)
668                 if (sched->kss_nconns >
669                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
670                         sched = &ksocknal_data.ksnd_schedulers[i];
671
672         if (irq != 0) {                         /* Hardware NIC */
673                 info->ksni_valid = 1;
674                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
675
676                 /* no overflow... */
677                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
678         }
679
680         return (sched);
681 }
682
683 int
684 ksocknal_local_ipvec (lnet_ni_t *ni, __u32 *ipaddrs)
685 {
686         ksock_net_t       *net = ni->ni_data;
687         int                i;
688         int                nip;
689
690         read_lock (&ksocknal_data.ksnd_global_lock);
691
692         nip = net->ksnn_ninterfaces;
693         LASSERT (nip <= LNET_MAX_INTERFACES);
694
695         /* Only offer interfaces for additional connections if I have 
696          * more than one. */
697         if (nip < 2) {
698                 read_unlock (&ksocknal_data.ksnd_global_lock);
699                 return 0;
700         }
701
702         for (i = 0; i < nip; i++) {
703                 ipaddrs[i] = net->ksnn_interfaces[i].ksni_ipaddr;
704                 LASSERT (ipaddrs[i] != 0);
705         }
706
707         read_unlock (&ksocknal_data.ksnd_global_lock);
708         return (nip);
709 }
710
711 int
712 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
713 {
714         int   best_netmatch = 0;
715         int   best_xor      = 0;
716         int   best          = -1;
717         int   this_xor;
718         int   this_netmatch;
719         int   i;
720
721         for (i = 0; i < nips; i++) {
722                 if (ips[i] == 0)
723                         continue;
724
725                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
726                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
727
728                 if (!(best < 0 ||
729                       best_netmatch < this_netmatch ||
730                       (best_netmatch == this_netmatch &&
731                        best_xor > this_xor)))
732                         continue;
733
734                 best = i;
735                 best_netmatch = this_netmatch;
736                 best_xor = this_xor;
737         }
738
739         LASSERT (best >= 0);
740         return (best);
741 }
742
743 int
744 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
745 {
746         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
747         ksock_net_t        *net = peer->ksnp_ni->ni_data;
748         ksock_interface_t  *iface;
749         ksock_interface_t  *best_iface;
750         int                 n_ips;
751         int                 i;
752         int                 j;
753         int                 k;
754         __u32               ip;
755         __u32               xor;
756         int                 this_netmatch;
757         int                 best_netmatch;
758         int                 best_npeers;
759
760         /* CAVEAT EMPTOR: We do all our interface matching with an
761          * exclusive hold of global lock at IRQ priority.  We're only
762          * expecting to be dealing with small numbers of interfaces, so the
763          * O(n**3)-ness shouldn't matter */
764
765         /* Also note that I'm not going to return more than n_peerips
766          * interfaces, even if I have more myself */
767
768         write_lock_bh (global_lock);
769
770         LASSERT (n_peerips <= LNET_MAX_INTERFACES);
771         LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
772
773         /* Only match interfaces for additional connections 
774          * if I have > 1 interface */
775         n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
776                 MIN(n_peerips, net->ksnn_ninterfaces);
777
778         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
779                 /*              ^ yes really... */
780
781                 /* If we have any new interfaces, first tick off all the
782                  * peer IPs that match old interfaces, then choose new
783                  * interfaces to match the remaining peer IPS.
784                  * We don't forget interfaces we've stopped using; we might
785                  * start using them again... */
786
787                 if (i < peer->ksnp_n_passive_ips) {
788                         /* Old interface. */
789                         ip = peer->ksnp_passive_ips[i];
790                         best_iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
791
792                         /* peer passive ips are kept up to date */
793                         LASSERT(best_iface != NULL);
794                 } else {
795                         /* choose a new interface */
796                         LASSERT (i == peer->ksnp_n_passive_ips);
797
798                         best_iface = NULL;
799                         best_netmatch = 0;
800                         best_npeers = 0;
801
802                         for (j = 0; j < net->ksnn_ninterfaces; j++) {
803                                 iface = &net->ksnn_interfaces[j];
804                                 ip = iface->ksni_ipaddr;
805
806                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
807                                         if (peer->ksnp_passive_ips[k] == ip)
808                                                 break;
809
810                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
811                                         continue;
812
813                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
814                                 xor = (ip ^ peerips[k]);
815                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
816
817                                 if (!(best_iface == NULL ||
818                                       best_netmatch < this_netmatch ||
819                                       (best_netmatch == this_netmatch &&
820                                        best_npeers > iface->ksni_npeers)))
821                                         continue;
822
823                                 best_iface = iface;
824                                 best_netmatch = this_netmatch;
825                                 best_npeers = iface->ksni_npeers;
826                         }
827
828                         best_iface->ksni_npeers++;
829                         ip = best_iface->ksni_ipaddr;
830                         peer->ksnp_passive_ips[i] = ip;
831                         peer->ksnp_n_passive_ips = i+1;
832                 }
833
834                 LASSERT (best_iface != NULL);
835
836                 /* mark the best matching peer IP used */
837                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
838                 peerips[j] = 0;
839         }
840
841         /* Overwrite input peer IP addresses */
842         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
843
844         write_unlock_bh (global_lock);
845
846         return (n_ips);
847 }
848
849 void
850 ksocknal_create_routes(ksock_peer_t *peer, int port,
851                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
852 {
            /* Add routes from 'peer' to the addresses in peer_ipaddrs[]
             * (npeer_ipaddrs entries, at most LNET_MAX_INTERFACES), all on
             * 'port'.  An address is skipped when the peer already has a
             * route to it, or when every local interface is already the
             * source (ksnr_myipaddr) of one of the peer's routes.
             * Nothing is returned: a failed route allocation or a closing
             * peer simply ends the work early. */
853         ksock_route_t      *newroute = NULL;
854         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
855         lnet_ni_t          *ni = peer->ksnp_ni;
856         ksock_net_t        *net = ni->ni_data;
857         struct list_head   *rtmp;
858         ksock_route_t      *route;
859         ksock_interface_t  *iface;
860         ksock_interface_t  *best_iface;
861         int                 best_netmatch;
862         int                 this_netmatch;
863         int                 best_nroutes;
864         int                 i;
865         int                 j;
866
867         /* CAVEAT EMPTOR: We do all our interface matching with an
868          * exclusive hold of global lock at IRQ priority.  We're only
869          * expecting to be dealing with small numbers of interfaces, so the
870          * O(n**3)-ness here shouldn't matter.
            * NOTE(review): the lock is taken with write_lock_bh() below;
            * confirm what "IRQ priority" is meant to cover. */
871
872         write_lock_bh (global_lock);
873
874         if (net->ksnn_ninterfaces < 2) {
875                 /* Only create additional connections
876                  * if I have > 1 interface */
877                 write_unlock_bh (global_lock);
878                 return;
879         }
880
881         LASSERT (npeer_ipaddrs <= LNET_MAX_INTERFACES);
882
883         for (i = 0; i < npeer_ipaddrs; i++) {
884                 if (newroute != NULL) {
                            /* the route allocated for an earlier address was
                             * not consumed; recycle it for this address */
885                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
886                 } else {
                            /* the global lock is dropped around route creation
                             * (presumably because it may block - confirm) */
887                         write_unlock_bh (global_lock);
888
889                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
890                         if (newroute == NULL)
891                                 return; /* NB lock is not held here */
892
893                         write_lock_bh (global_lock);
894                 }
895
896                 if (peer->ksnp_closing) {
897                         /* peer got closed under me */
898                         break;
899                 }
900
901                 /* Already got a route? */
                    /* 'route' is left non-NULL only if the walk broke out on
                     * a route to the same peer address */
902                 route = NULL;
903                 list_for_each(rtmp, &peer->ksnp_routes) {
904                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
905
906                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
907                                 break;
908
909                         route = NULL;
910                 }
911                 if (route != NULL)
912                         continue;
913
914                 best_iface = NULL;
915                 best_nroutes = 0;
916                 best_netmatch = 0;
917
918                 LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
919
920                 /* Select interface to connect from */
921                 for (j = 0; j < net->ksnn_ninterfaces; j++) {
922                         iface = &net->ksnn_interfaces[j];
923
924                         /* Using this interface already? */
                            /* NB every step of the walk reassigns 'route' and
                             * resets it to NULL unless it matched; with an
                             * empty list it keeps the NULL left by the
                             * "already got a route" check above */
925                         list_for_each(rtmp, &peer->ksnp_routes) {
926                                 route = list_entry(rtmp, ksock_route_t, ksnr_list);
927
928                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
929                                         break;
930
931                                 route = NULL;
932                         }
933                         if (route != NULL)
934                                 continue;
935
                            /* 1 if the interface and the peer address agree
                             * on every bit covered by the interface netmask */
936                         this_netmatch = (((iface->ksni_ipaddr ^
937                                            newroute->ksnr_ipaddr) &
938                                            iface->ksni_netmask) == 0) ? 1 : 0;
939
                            /* keep the current best unless there is none yet,
                             * this one is a better net match, or it is an
                             * equal match carrying fewer routes */
940                         if (!(best_iface == NULL ||
941                               best_netmatch < this_netmatch ||
942                               (best_netmatch == this_netmatch &&
943                                best_nroutes > iface->ksni_nroutes)))
944                                 continue;
945
946                         best_iface = iface;
947                         best_netmatch = this_netmatch;
948                         best_nroutes = iface->ksni_nroutes;
949                 }
950
                    /* every interface already sources a route to this peer;
                     * newroute is kept for the next address */
951                 if (best_iface == NULL)
952                         continue;
953
954                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
955                 best_iface->ksni_nroutes++;
956
                    /* route now belongs to the peer; clearing newroute makes
                     * the next iteration allocate a fresh one */
957                 ksocknal_add_route_locked(peer, newroute);
958                 newroute = NULL;
959         }
960
961         write_unlock_bh (global_lock);
            /* drop a route that was allocated but never attached */
962         if (newroute != NULL)
963                 ksocknal_route_decref(newroute);
964 }
965
966 int
967 ksocknal_accept (lnet_ni_t *ni, cfs_socket_t *sock)
968 {
969         ksock_nal_data_t   *nal = &ksocknal_data;
970         ksock_connreq_t    *cr;
971         __u32               peer_ip;
972         int                 peer_port;
973         int                 rc;
974
975         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
976         LASSERT(rc == 0);       /* we succeeded before */
977
978         LIBCFS_ALLOC(cr, sizeof(*cr));
979         if (cr == NULL) {
980                 LCONSOLE_ERROR_MSG(0x12f, "Dropping connection request from "
981                                    "%u.%u.%u.%u: memory exhausted\n",
982                                    HIPQUAD(peer_ip));
983                 return -ENOMEM;
984         }
985
986         /* the queued request carries the socket and its own ref on ni */
987         cr->ksncr_sock = sock;
988         cr->ksncr_ni   = ni;
989         lnet_ni_addref(ni);
990
991         spin_lock_bh(&nal->ksnd_connd_lock);
992         list_add_tail(&cr->ksncr_list, &nal->ksnd_connd_connreqs);
993         cfs_waitq_signal(&nal->ksnd_connd_waitq);
994         spin_unlock_bh(&nal->ksnd_connd_lock);
995         return 0;
996 }
997
998 int
999 ksocknal_connecting (ksock_peer_t *peer, __u32 ipaddr)
1000 {
1001         struct list_head *pos;       /* cursor over the peer's route list */
1002
1003         list_for_each (pos, &peer->ksnp_routes) {
1004                 ksock_route_t *rt = list_entry(pos, ksock_route_t, ksnr_list);
1005                 if (rt->ksnr_ipaddr == ipaddr)  /* first route to ipaddr wins */
1006                         return rt->ksnr_connecting;
1007         }
1008         return 0;                    /* no route to that address */
1009 }
1010
1011 int
1012 ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route,
1013                       cfs_socket_t *sock, int type)
1014 {
1015         rwlock_t          *global_lock = &ksocknal_data.ksnd_global_lock;
1016         CFS_LIST_HEAD     (zombies);
1017         lnet_process_id_t  peerid;
1018         struct list_head  *tmp;
1019         __u64              incarnation;
1020         ksock_conn_t      *conn;
1021         ksock_conn_t      *conn2;
1022         ksock_peer_t      *peer = NULL;
1023         ksock_peer_t      *peer2;
1024         ksock_sched_t     *sched;
1025         ksock_hello_msg_t *hello;
1026         unsigned int       irq;
1027         ksock_tx_t        *tx;
1028         int                rc;
1029         int                active;
1030         char              *warn = NULL;
1031
1032         active = (route != NULL);
1033
1034         LASSERT (active == (type != SOCKLND_CONN_NONE));
1035
1036         irq = ksocknal_lib_sock_irq (sock);
1037
1038         LIBCFS_ALLOC(conn, sizeof(*conn));
1039         if (conn == NULL) {
1040                 rc = -ENOMEM;
1041                 goto failed_0;
1042         }
1043
1044         memset (conn, 0, sizeof (*conn));
1045         conn->ksnc_peer = NULL;
1046         conn->ksnc_route = NULL;
1047         conn->ksnc_sock = sock;
1048         /* 2 ref, 1 for conn, another extra ref prevents socket
1049          * being closed before establishment of connection */
1050         atomic_set (&conn->ksnc_sock_refcount, 2);
1051         conn->ksnc_type = type;
1052         ksocknal_lib_save_callback(sock, conn);
1053         atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
1054
1055         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(sock);
1056         conn->ksnc_rx_ready = 0;
1057         conn->ksnc_rx_scheduled = 0;
1058
1059         CFS_INIT_LIST_HEAD (&conn->ksnc_tx_queue);
1060         conn->ksnc_tx_ready = 0;
1061         conn->ksnc_tx_scheduled = 0;
1062         conn->ksnc_tx_mono = NULL;
1063         atomic_set (&conn->ksnc_tx_nob, 0);
1064
1065         LIBCFS_ALLOC(hello, offsetof(ksock_hello_msg_t,
1066                                      kshm_ips[LNET_MAX_INTERFACES]));
1067         if (hello == NULL) {
1068                 rc = -ENOMEM;
1069                 goto failed_1;
1070         }
1071
1072         /* stash conn's local and remote addrs */
1073         rc = ksocknal_lib_get_conn_addrs (conn);
1074         if (rc != 0)
1075                 goto failed_1;
1076
1077         /* Find out/confirm peer's NID and connection type and get the
1078          * vector of interfaces she's willing to let me connect to.
1079          * Passive connections use the listener timeout since the peer sends
1080          * eagerly */
1081
1082         if (active) {
1083                 peer = route->ksnr_peer;
1084                 LASSERT(ni == peer->ksnp_ni);
1085
1086                 /* Active connection sends HELLO eagerly */
1087                 hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
1088                 peerid = peer->ksnp_id;
1089
1090                 write_lock_bh(global_lock);
1091                 conn->ksnc_proto = peer->ksnp_proto;
1092                 write_unlock_bh(global_lock);
1093
1094                 if (conn->ksnc_proto == NULL) {
1095                         conn->ksnc_proto = &ksocknal_protocol_v2x;
1096 #if SOCKNAL_VERSION_DEBUG
1097                         if (*ksocknal_tunables.ksnd_protocol != 2)
1098                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
1099 #endif
1100                 }
1101
1102                 rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
1103                 if (rc != 0)
1104                         goto failed_1;
1105         } else {
1106                 peerid.nid = LNET_NID_ANY;
1107                 peerid.pid = LNET_PID_ANY;
1108
1109                 /* Passive, get protocol from peer */
1110                 conn->ksnc_proto = NULL;
1111         }
1112
1113         rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
1114         if (rc < 0)
1115                 goto failed_1;
1116
1117         LASSERT (rc == 0 || active);
1118         LASSERT (conn->ksnc_proto != NULL);
1119         LASSERT (peerid.nid != LNET_NID_ANY);
1120
1121         if (active) {
1122                 ksocknal_peer_addref(peer);
1123                 write_lock_bh (global_lock);
1124         } else {
1125                 rc = ksocknal_create_peer(&peer, ni, peerid);
1126                 if (rc != 0)
1127                         goto failed_1;
1128
1129                 write_lock_bh (global_lock);
1130
1131                 /* called with a ref on ni, so shutdown can't have started */
1132                 LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);
1133
1134                 peer2 = ksocknal_find_peer_locked(ni, peerid);
1135                 if (peer2 == NULL) {
1136                         /* NB this puts an "empty" peer in the peer
1137                          * table (which takes my ref) */
1138                         list_add_tail(&peer->ksnp_list,
1139                                       ksocknal_nid2peerlist(peerid.nid));
1140                 } else {
1141                         ksocknal_peer_decref(peer);
1142                         peer = peer2;
1143                 }
1144
1145                 /* +1 ref for me */
1146                 ksocknal_peer_addref(peer);
1147                 peer->ksnp_accepting++;
1148
1149                 /* Am I already connecting to this guy?  Resolve in
1150                  * favour of higher NID... */
1151                 if (peerid.nid < ni->ni_nid &&
1152                     ksocknal_connecting(peer, conn->ksnc_ipaddr)) {
1153                         rc = EALREADY;
1154                         warn = "connection race resolution";
1155                         goto failed_2;
1156                 }
1157         }
1158
1159         if (peer->ksnp_closing ||
1160             (active && route->ksnr_deleted)) {
1161                 /* peer/route got closed under me */
1162                 rc = -ESTALE;
1163                 warn = "peer/route removed";
1164                 goto failed_2;
1165         }
1166
1167         if (peer->ksnp_proto == NULL) {
1168                 /* Never connected before.
1169                  * NB recv_hello may have returned EPROTO to signal my peer
1170                  * wants a different protocol than the one I asked for.
1171                  */
1172                 LASSERT (list_empty(&peer->ksnp_conns));
1173
1174                 peer->ksnp_proto = conn->ksnc_proto;
1175                 peer->ksnp_incarnation = incarnation;
1176         }
1177
1178         if (peer->ksnp_proto != conn->ksnc_proto ||
1179             peer->ksnp_incarnation != incarnation) {
1180                 /* Peer rebooted or I've got the wrong protocol version */
1181                 ksocknal_close_peer_conns_locked(peer, 0, 0);
1182
1183                 peer->ksnp_proto = NULL;
1184                 rc = ESTALE;
1185                 warn = peer->ksnp_incarnation != incarnation ?
1186                        "peer rebooted" :
1187                        "wrong proto version";
1188                 goto failed_2;
1189         }
1190
1191         switch (rc) {
1192         default:
1193                 LBUG();
1194         case 0:
1195                 break;
1196         case EALREADY:
1197                 warn = "lost conn race";
1198                 goto failed_2;
1199         case EPROTO:
1200                 warn = "retry with different protocol version";
1201                 goto failed_2;
1202         }
1203
1204         /* Refuse to duplicate an existing connection, unless this is a
1205          * loopback connection */
1206         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1207                 list_for_each(tmp, &peer->ksnp_conns) {
1208                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1209
1210                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1211                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1212                             conn2->ksnc_type != conn->ksnc_type)
1213                                 continue;
1214
1215                         /* Reply on a passive connection attempt so the peer
1216                          * realises we're connected. */
1217                         LASSERT (rc == 0);
1218                         if (!active)
1219                                 rc = EALREADY;
1220
1221                         warn = "duplicate";
1222                         goto failed_2;
1223                 }
1224         }
1225
1226         /* If the connection created by this route didn't bind to the IP
1227          * address the route connected to, the connection/route matching
1228          * code below probably isn't going to work. */
1229         if (active &&
1230             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1231                 CERROR("Route %s %u.%u.%u.%u connected to %u.%u.%u.%u\n",
1232                        libcfs_id2str(peer->ksnp_id),
1233                        HIPQUAD(route->ksnr_ipaddr),
1234                        HIPQUAD(conn->ksnc_ipaddr));
1235         }
1236
1237         /* Search for a route corresponding to the new connection and
1238          * create an association.  This allows incoming connections created
1239          * by routes in my peer to match my own route entries so I don't
1240          * continually create duplicate routes. */
1241         list_for_each (tmp, &peer->ksnp_routes) {
1242                 route = list_entry(tmp, ksock_route_t, ksnr_list);
1243
1244                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1245                         continue;
1246
1247                 ksocknal_associate_route_conn_locked(route, conn);
1248                 break;
1249         }
1250
1251         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
1252         peer->ksnp_last_alive = cfs_time_current();
1253         peer->ksnp_error = 0;
1254
1255         sched = ksocknal_choose_scheduler_locked (irq);
1256         sched->kss_nconns++;
1257         conn->ksnc_scheduler = sched;
1258
1259         /* Set the deadline for the outgoing HELLO to drain */
1260         conn->ksnc_tx_bufnob = SOCK_WMEM_QUEUED(sock);
1261         conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
1262         mb();       /* order with adding to peer's conn list */
1263
1264         list_add (&conn->ksnc_list, &peer->ksnp_conns);
1265         ksocknal_conn_addref(conn);
1266
1267         ksocknal_new_packet(conn, 0);
1268
1269         /* Take all the packets blocking for a connection.
1270          * NB, it might be nicer to share these blocked packets among any
1271          * other connections that are becoming established. */
1272         while (!list_empty (&peer->ksnp_tx_queue)) {
1273                 tx = list_entry (peer->ksnp_tx_queue.next,
1274                                  ksock_tx_t, tx_list);
1275
1276                 list_del (&tx->tx_list);
1277                 ksocknal_queue_tx_locked (tx, conn);
1278         }
1279
1280         write_unlock_bh (global_lock);
1281
1282         /* We've now got a new connection.  Any errors from here on are just
1283          * like "normal" comms errors and we close the connection normally.
1284          * NB (a) we still have to send the reply HELLO for passive
1285          *        connections, 
1286          *    (b) normal I/O on the conn is blocked until I setup and call the
1287          *        socket callbacks.
1288          */
1289
1290         ksocknal_lib_bind_irq (irq);
1291
1292         CDEBUG(D_NET, "New conn %s p %d.x %u.%u.%u.%u -> %u.%u.%u.%u/%d"
1293                " incarnation:"LPD64" sched[%d]/%d\n",
1294                libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
1295                HIPQUAD(conn->ksnc_myipaddr), HIPQUAD(conn->ksnc_ipaddr),
1296                conn->ksnc_port, incarnation,
1297                (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
1298
1299         if (active) {
1300                 /* additional routes after interface exchange? */
1301                 ksocknal_create_routes(peer, conn->ksnc_port,
1302                                        hello->kshm_ips, hello->kshm_nips);
1303         } else {
1304                 hello->kshm_nips = ksocknal_select_ips(peer, hello->kshm_ips,
1305                                                        hello->kshm_nips);
1306                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1307         }
1308
1309         LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1310                                     kshm_ips[LNET_MAX_INTERFACES]));
1311
1312         /* setup the socket AFTER I've received hello (it disables
1313          * SO_LINGER).  I might call back to the acceptor who may want
1314          * to send a protocol version response and then close the
1315          * socket; this ensures the socket only tears down after the
1316          * response has been sent. */
1317         if (rc == 0)
1318                 rc = ksocknal_lib_setup_sock(sock);
1319
1320         write_lock_bh(global_lock);
1321
1322         /* NB my callbacks block while I hold ksnd_global_lock */
1323         ksocknal_lib_set_callback(sock, conn);
1324
1325         if (!active)
1326                 peer->ksnp_accepting--;
1327
1328         write_unlock_bh(global_lock);
1329
1330         if (rc != 0) {
1331                 write_lock_bh(global_lock);
1332                 ksocknal_close_conn_locked(conn, rc);
1333                 write_unlock_bh(global_lock);
1334         } else if (ksocknal_connsock_addref(conn) == 0) {
1335                 /* Allow I/O to proceed. */
1336                 ksocknal_read_callback(conn);
1337                 ksocknal_write_callback(conn);
1338                 ksocknal_connsock_decref(conn);
1339         }
1340
1341         ksocknal_connsock_decref(conn);
1342         ksocknal_conn_decref(conn);
1343         return rc;
1344
1345  failed_2:
1346         if (!peer->ksnp_closing &&
1347             list_empty (&peer->ksnp_conns) &&
1348             list_empty (&peer->ksnp_routes)) {
1349                 list_add(&zombies, &peer->ksnp_tx_queue);
1350                 list_del_init(&peer->ksnp_tx_queue);
1351                 ksocknal_unlink_peer_locked(peer);
1352         }
1353
1354         write_unlock_bh (global_lock);
1355
1356         if (warn != NULL) {
1357                 if (rc < 0)
1358                         CERROR("Not creating conn %s type %d: %s\n",
1359                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1360                 else
1361                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1362                               libcfs_id2str(peerid), conn->ksnc_type, warn);
1363         }
1364
1365         if (!active) {
1366                 if (rc > 0) {
1367                         /* Request retry by replying with CONN_NONE 
1368                          * ksnc_proto has been set already */
1369                         conn->ksnc_type = SOCKLND_CONN_NONE;
1370                         hello->kshm_nips = 0;
1371                         ksocknal_send_hello(ni, conn, peerid.nid, hello);
1372                 }
1373
1374                 write_lock_bh(global_lock);
1375                 peer->ksnp_accepting--;
1376                 write_unlock_bh(global_lock);
1377         }
1378
1379         ksocknal_txlist_done(ni, &zombies, 1);
1380         ksocknal_peer_decref(peer);
1381
1382  failed_1:
1383         if (hello != NULL)
1384                 LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1385                                             kshm_ips[LNET_MAX_INTERFACES]));
1386
1387         LIBCFS_FREE (conn, sizeof(*conn));
1388
1389  failed_0:
1390         libcfs_sock_release(sock);
1391         return rc;
1392 }
1393
1394 void
1395 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
1396 {
1397         /* This just does the immediate housekeeping, and queues the
1398          * connection for the reaper to terminate.
1399          * Caller holds ksnd_global_lock exclusively in irq context.
              * 'error' is stashed in the peer only if this was its last
              * conn.  A conn must not be closed twice (asserted below). */
1400         ksock_peer_t      *peer = conn->ksnc_peer;
1401         ksock_route_t     *route;
1402         ksock_conn_t      *conn2;
1403         struct list_head  *tmp;
1404
1405         LASSERT (peer->ksnp_error == 0);
1406         LASSERT (!conn->ksnc_closing);
1407         conn->ksnc_closing = 1;
1408
1409         /* ksnd_deathrow_conns takes over peer's ref */
1410         list_del (&conn->ksnc_list);
1411
1412         route = conn->ksnc_route;
1413         if (route != NULL) {
1414                 /* dissociate conn from route... */
1415                 LASSERT (!route->ksnr_deleted);
1416                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
1417
                     /* does another conn of this peer still use this route
                      * with the same type?  'conn' itself is already off
                      * the list, so it cannot match */
1418                 conn2 = NULL;
1419                 list_for_each(tmp, &peer->ksnp_conns) {
1420                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1421
1422                         if (conn2->ksnc_route == route &&
1423                             conn2->ksnc_type == conn->ksnc_type)
1424                                 break;
1425
1426                         conn2 = NULL;
1427                 }
                     /* none left: the route is no longer connected for
                      * this conn type */
1428                 if (conn2 == NULL)
1429                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1430
1431                 conn->ksnc_route = NULL;
1432
1433 #if 0           /* irrelevant with only eager routes */
1434                 list_del (&route->ksnr_list);   /* make route least favourite */
1435                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
1436 #endif
1437                 ksocknal_route_decref(route);     /* drop conn's ref on route */
1438         }
1439
1440         if (list_empty (&peer->ksnp_conns)) {
1441                 /* No more connections to this peer */
1442
1443                 peer->ksnp_proto = NULL;        /* renegotiate protocol version */
1444                 peer->ksnp_error = error;       /* stash last conn close reason */
1445
1446                 if (list_empty (&peer->ksnp_routes)) {
1447                         /* I've just closed last conn belonging to a
1448                          * peer with no routes to it */
1449                         ksocknal_unlink_peer_locked (peer);
1450                 }
1451         }
1452
             /* hand the conn to the reaper: ksnc_list, unlinked from the
              * peer above, now links it on the deathrow list */
1453         spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
1454
1455         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1456         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1457
1458         spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
1459 }
1460
1461 void
1462 ksocknal_peer_failed (ksock_peer_t *peer)
1463 {
1464         rwlock_t *glock = &ksocknal_data.ksnd_global_lock;
1465         time_t    last_alive = 0;
1466         int       dead;
1467
1468         /* Called after a connection failure or comms error.  LNET is told
1469          * the peer is down only when it is a kernel peer (LNET_PID_USERFLAG
1470          * clear) with no conns, nothing being accepted and no route in the
1471          * middle of connecting. */
1472         read_lock(glock);
1473
1474         dead = (peer->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1475                list_empty(&peer->ksnp_conns) &&
1476                peer->ksnp_accepting == 0 &&
1477                ksocknal_find_connecting_route_locked(peer) == NULL;
1478         if (dead)       /* sample the timestamp while still locked */
1479                 last_alive = cfs_time_current_sec() -
1480                         cfs_duration_sec(cfs_time_current() -
1481                                          peer->ksnp_last_alive);
1482
1483         read_unlock(glock);
1484
1485         if (!dead)
1486                 return;
1487         lnet_notify(peer->ksnp_ni, peer->ksnp_id.nid, 0, last_alive);
1488 }
1489
1490 void
1491 ksocknal_terminate_conn (ksock_conn_t *conn)
1492 {
1493         /* This gets called by the reaper (guaranteed thread context) to
1494          * disengage the socket from its callbacks and close it.
1495          * ksnc_refcount will eventually hit zero, and then the reaper will
1496          * destroy it.
              * The conn must already be marked closing (asserted below). */
1497         ksock_peer_t     *peer = conn->ksnc_peer;
1498         ksock_sched_t    *sched = conn->ksnc_scheduler;
1499         int               failed = 0;
1500         struct list_head *tmp;
1501         struct list_head *nxt;
1502         ksock_tx_t       *tx;
1503         LIST_HEAD        (zlist);
1504
1505         LASSERT(conn->ksnc_closing);
1506
1507         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1508         spin_lock_bh (&sched->kss_lock);
1509
1510         /* a closing conn is always ready to tx */
1511         conn->ksnc_tx_ready = 1;
1512
             /* only schedule the conn if it has queued txs and is not on
              * the scheduler's tx list already */
1513         if (!conn->ksnc_tx_scheduled &&
1514             !list_empty(&conn->ksnc_tx_queue)){
1515                 list_add_tail (&conn->ksnc_tx_list,
1516                                &sched->kss_tx_conns);
1517                 conn->ksnc_tx_scheduled = 1;
1518                 /* extra ref for scheduler */
1519                 ksocknal_conn_addref(conn);
1520
1521                 cfs_waitq_signal (&sched->kss_waitq);
1522         }
1523
1524         spin_unlock_bh (&sched->kss_lock);
1525
             /* Move this conn's txs from the peer's zero-copy request list
              * onto the private 'zlist', clearing their cookies; they are
              * released below, after ksnp_lock is dropped */
1526         spin_lock(&peer->ksnp_lock);
1527
1528         list_for_each_safe(tmp, nxt, &peer->ksnp_zc_req_list) {
1529                 tx = list_entry(tmp, ksock_tx_t, tx_zc_list);
1530
1531                 if (tx->tx_conn != conn)
1532                         continue;
1533
1534                 LASSERT (tx->tx_msg.ksm_zc_req_cookie != 0);
1535
1536                 tx->tx_msg.ksm_zc_req_cookie = 0;
1537                 list_del(&tx->tx_zc_list);
1538                 list_add(&tx->tx_zc_list, &zlist);
1539         }
1540
1541         spin_unlock(&peer->ksnp_lock);
1542
             /* drop one tx ref for each request taken off the list */
1543         list_for_each_safe(tmp, nxt, &zlist) {
1544                 tx = list_entry(tmp, ksock_tx_t, tx_zc_list);
1545
1546                 list_del(&tx->tx_zc_list);
1547                 ksocknal_tx_decref(tx);
1548         }
1549
1550         /* serialise with callbacks */
1551         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1552
1553         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1554
1555         /* OK, so this conn may not be completely disengaged from its
1556          * scheduler yet, but it _has_ committed to terminate... */
1557         conn->ksnc_scheduler->kss_nconns--;
1558
1559         if (peer->ksnp_error != 0) {
1560                 /* peer's last conn closed in error */
1561                 LASSERT (list_empty (&peer->ksnp_conns));
1562                 failed = 1;
1563                 peer->ksnp_error = 0;     /* avoid multiple notifications */
1564         }
1565
1566         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1567
             /* ksocknal_peer_failed() is called only after the global lock
              * has been released */
1568         if (failed)
1569                 ksocknal_peer_failed(peer);
1570
1571         /* The socket is closed on the final put; either here, or in
1572          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1573          * when the connection was established, this will close the socket
1574          * immediately, aborting anything buffered in it. Any hung
1575          * zero-copy transmits will therefore complete in finite time. */
1576         ksocknal_connsock_decref(conn);
1577 }
1578
1579 void
1580 ksocknal_queue_zombie_conn (ksock_conn_t *conn)
1581 {
1582         ksock_nal_data_t *nal = &ksocknal_data;
1583
1584         /* only a conn with no conn refs left is handed over for destruction */
1585         LASSERT(atomic_read(&conn->ksnc_conn_refcount) == 0);
1586
1587         spin_lock_bh(&nal->ksnd_reaper_lock);
1588         list_add_tail(&conn->ksnc_list, &nal->ksnd_zombie_conns);
1589         cfs_waitq_signal(&nal->ksnd_reaper_waitq);      /* wake the reaper */
1590         spin_unlock_bh(&nal->ksnd_reaper_lock);
1591 }
1592
1593 void
1594 ksocknal_destroy_conn (ksock_conn_t *conn)
1595 {
1596         /* Final coup-de-grace of the reaper */
             /* Reports or completes any receive that was in progress, drops
              * the peer ref held through ksnc_peer and frees the conn. */
1597         CDEBUG (D_NET, "connection %p\n", conn);
1598
             /* nothing may still reference or be queued on the conn */
1599         LASSERT (atomic_read (&conn->ksnc_conn_refcount) == 0);
1600         LASSERT (atomic_read (&conn->ksnc_sock_refcount) == 0);
1601         LASSERT (conn->ksnc_sock == NULL);
1602         LASSERT (conn->ksnc_route == NULL);
1603         LASSERT (!conn->ksnc_tx_scheduled);
1604         LASSERT (!conn->ksnc_rx_scheduled);
1605         LASSERT (list_empty(&conn->ksnc_tx_queue));
1606
1607         /* complete current receive if any */
1608         switch (conn->ksnc_rx_state) {
1609         case SOCKNAL_RX_LNET_PAYLOAD:
                     /* a payload was being received: finalize it with -EIO */
1610                 CERROR("Completing partial receive from %s"
1611                        ", ip %d.%d.%d.%d:%d, with error\n",
1612                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
1613                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1614                 lnet_finalize (conn->ksnc_peer->ksnp_ni,
1615                                conn->ksnc_cookie, -EIO);
1616                 break;
             /* the remaining states only log, and only if a receive had
              * actually started */
1617         case SOCKNAL_RX_LNET_HEADER:
1618                 if (conn->ksnc_rx_started)
1619                         CERROR("Incomplete receive of lnet header from %s"
1620                                ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
1621                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1622                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1623                                conn->ksnc_proto->pro_version);
1624                 break;
1625         case SOCKNAL_RX_KSM_HEADER:
1626                 if (conn->ksnc_rx_started)
1627                         CERROR("Incomplete receive of ksock message from %s"
1628                                ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
1629                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1630                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1631                                conn->ksnc_proto->pro_version);
1632                 break;
1633         case SOCKNAL_RX_SLOP:
1634                 if (conn->ksnc_rx_started)
1635                         CERROR("Incomplete receive of slops from %s"
1636                                ", ip %d.%d.%d.%d:%d, with error\n",
1637                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1638                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1639                break;
1640         default:
                     /* any other rx state is treated as a bug */
1641                 LBUG ();
1642                 break;
1643         }
1644
             /* release the peer ref held through conn->ksnc_peer */
1645         ksocknal_peer_decref(conn->ksnc_peer);
1646
1647         LIBCFS_FREE (conn, sizeof (*conn));
1648 }
1649
1650 int
1651 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1652 {
1653         ksock_conn_t       *conn;
1654         struct list_head   *ctmp;
1655         struct list_head   *cnxt;
1656         int                 count = 0;
1657
1658         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1659                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1660
1661                 if (ipaddr == 0 ||
1662                     conn->ksnc_ipaddr == ipaddr) {
1663                         count++;
1664                         ksocknal_close_conn_locked (conn, why);
1665                 }
1666         }
1667
1668         return (count);
1669 }
1670
1671 int
1672 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1673 {
1674         ksock_peer_t     *peer = conn->ksnc_peer;
1675         __u32             ipaddr = conn->ksnc_ipaddr;
1676         int               count;
1677
1678         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1679
1680         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1681
1682         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1683
1684         return (count);
1685 }
1686
1687 int
1688 ksocknal_close_matching_conns (lnet_process_id_t id, __u32 ipaddr)
1689 {
1690         ksock_peer_t       *peer;
1691         struct list_head   *ptmp;
1692         struct list_head   *pnxt;
1693         int                 lo;
1694         int                 hi;
1695         int                 i;
1696         int                 count = 0;
1697
1698         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1699
1700         if (id.nid != LNET_NID_ANY)
1701                 lo = hi = ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers;
1702         else {
1703                 lo = 0;
1704                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1705         }
1706
1707         for (i = lo; i <= hi; i++) {
1708                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1709
1710                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1711
1712                         if (!((id.nid == LNET_NID_ANY || id.nid == peer->ksnp_id.nid) &&
1713                               (id.pid == LNET_PID_ANY || id.pid == peer->ksnp_id.pid)))
1714                                 continue;
1715
1716                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1717                 }
1718         }
1719
1720         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1721
1722         /* wildcards always succeed */
1723         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1724                 return (0);
1725
1726         return (count == 0 ? -ENOENT : 0);
1727 }
1728
1729 void
1730 ksocknal_notify (lnet_ni_t *ni, lnet_nid_t gw_nid, int alive)
1731 {
1732         /* The router is telling me she's been notified of a change in
1733          * gateway state.... */
1734         lnet_process_id_t  id = {.nid = gw_nid, .pid = LNET_PID_ANY};
1735
1736         CDEBUG (D_NET, "gw %s %s\n", libcfs_nid2str(gw_nid),
1737                 alive ? "up" : "down");
1738
1739         if (!alive) {
1740                 /* If the gateway crashed, close all open connections... */
1741                 ksocknal_close_matching_conns (id, 0);
1742                 return;
1743         }
1744
1745         /* ...otherwise do nothing.  We can only establish new connections
1746          * if we have autroutes, and these connect on demand. */
1747 }
1748
1749 void
1750 ksocknal_push_peer (ksock_peer_t *peer)
1751 {
1752         int               index;
1753         int               i;
1754         struct list_head *tmp;
1755         ksock_conn_t     *conn;
1756
1757         for (index = 0; ; index++) {
1758                 read_lock (&ksocknal_data.ksnd_global_lock);
1759
1760                 i = 0;
1761                 conn = NULL;
1762
1763                 list_for_each (tmp, &peer->ksnp_conns) {
1764                         if (i++ == index) {
1765                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1766                                 ksocknal_conn_addref(conn);
1767                                 break;
1768                         }
1769                 }
1770
1771                 read_unlock (&ksocknal_data.ksnd_global_lock);
1772
1773                 if (conn == NULL)
1774                         break;
1775
1776                 ksocknal_lib_push_conn (conn);
1777                 ksocknal_conn_decref(conn);
1778         }
1779 }
1780
1781 int
1782 ksocknal_push (lnet_ni_t *ni, lnet_process_id_t id)
1783 {
1784         ksock_peer_t      *peer;
1785         struct list_head  *tmp;
1786         int                index;
1787         int                i;
1788         int                j;
1789         int                rc = -ENOENT;
1790
1791         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1792                 for (j = 0; ; j++) {
1793                         read_lock (&ksocknal_data.ksnd_global_lock);
1794
1795                         index = 0;
1796                         peer = NULL;
1797
1798                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1799                                 peer = list_entry(tmp, ksock_peer_t,
1800                                                   ksnp_list);
1801
1802                                 if (!((id.nid == LNET_NID_ANY ||
1803                                        id.nid == peer->ksnp_id.nid) &&
1804                                       (id.pid == LNET_PID_ANY ||
1805                                        id.pid == peer->ksnp_id.pid))) {
1806                                         peer = NULL;
1807                                         continue;
1808                                 }
1809
1810                                 if (index++ == j) {
1811                                         ksocknal_peer_addref(peer);
1812                                         break;
1813                                 }
1814                         }
1815
1816                         read_unlock (&ksocknal_data.ksnd_global_lock);
1817
1818                         if (peer != NULL) {
1819                                 rc = 0;
1820                                 ksocknal_push_peer (peer);
1821                                 ksocknal_peer_decref(peer);
1822                         }
1823                 }
1824
1825         }
1826
1827         return (rc);
1828 }
1829
1830 int
1831 ksocknal_add_interface(lnet_ni_t *ni, __u32 ipaddress, __u32 netmask)
1832 {
1833         ksock_net_t       *net = ni->ni_data;
1834         ksock_interface_t *iface;
1835         int                rc;
1836         int                i;
1837         int                j;
1838         struct list_head  *ptmp;
1839         ksock_peer_t      *peer;
1840         struct list_head  *rtmp;
1841         ksock_route_t     *route;
1842
1843         if (ipaddress == 0 ||
1844             netmask == 0)
1845                 return (-EINVAL);
1846
1847         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1848
1849         iface = ksocknal_ip2iface(ni, ipaddress);
1850         if (iface != NULL) {
1851                 /* silently ignore dups */
1852                 rc = 0;
1853         } else if (net->ksnn_ninterfaces == LNET_MAX_INTERFACES) {
1854                 rc = -ENOSPC;
1855         } else {
1856                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
1857
1858                 iface->ksni_ipaddr = ipaddress;
1859                 iface->ksni_netmask = netmask;
1860                 iface->ksni_nroutes = 0;
1861                 iface->ksni_npeers = 0;
1862
1863                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1864                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1865                                 peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1866
1867                                 for (j = 0; j < peer->ksnp_n_passive_ips; j++)
1868                                         if (peer->ksnp_passive_ips[j] == ipaddress)
1869                                                 iface->ksni_npeers++;
1870
1871                                 list_for_each(rtmp, &peer->ksnp_routes) {
1872                                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1873
1874                                         if (route->ksnr_myipaddr == ipaddress)
1875                                                 iface->ksni_nroutes++;
1876                                 }
1877                         }
1878                 }
1879
1880                 rc = 0;
1881                 /* NB only new connections will pay attention to the new interface! */
1882         }
1883
1884         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1885
1886         return (rc);
1887 }
1888
1889 void
1890 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
1891 {
1892         struct list_head   *tmp;
1893         struct list_head   *nxt;
1894         ksock_route_t      *route;
1895         ksock_conn_t       *conn;
1896         int                 i;
1897         int                 j;
1898
1899         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
1900                 if (peer->ksnp_passive_ips[i] == ipaddr) {
1901                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
1902                                 peer->ksnp_passive_ips[j-1] =
1903                                         peer->ksnp_passive_ips[j];
1904                         peer->ksnp_n_passive_ips--;
1905                         break;
1906                 }
1907
1908         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
1909                 route = list_entry (tmp, ksock_route_t, ksnr_list);
1910
1911                 if (route->ksnr_myipaddr != ipaddr)
1912                         continue;
1913
1914                 if (route->ksnr_share_count != 0) {
1915                         /* Manually created; keep, but unbind */
1916                         route->ksnr_myipaddr = 0;
1917                 } else {
1918                         ksocknal_del_route_locked(route);
1919                 }
1920         }
1921
1922         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
1923                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1924
1925                 if (conn->ksnc_myipaddr == ipaddr)
1926                         ksocknal_close_conn_locked (conn, 0);
1927         }
1928 }
1929
1930 int
1931 ksocknal_del_interface(lnet_ni_t *ni, __u32 ipaddress)
1932 {
1933         ksock_net_t       *net = ni->ni_data;
1934         int                rc = -ENOENT;
1935         struct list_head  *tmp;
1936         struct list_head  *nxt;
1937         ksock_peer_t      *peer;
1938         __u32              this_ip;
1939         int                i;
1940         int                j;
1941
1942         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1943
1944         for (i = 0; i < net->ksnn_ninterfaces; i++) {
1945                 this_ip = net->ksnn_interfaces[i].ksni_ipaddr;
1946
1947                 if (!(ipaddress == 0 ||
1948                       ipaddress == this_ip))
1949                         continue;
1950
1951                 rc = 0;
1952
1953                 for (j = i+1; j < net->ksnn_ninterfaces; j++)
1954                         net->ksnn_interfaces[j-1] =
1955                                 net->ksnn_interfaces[j];
1956
1957                 net->ksnn_ninterfaces--;
1958
1959                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
1960                         list_for_each_safe(tmp, nxt, &ksocknal_data.ksnd_peers[j]) {
1961                                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
1962
1963                                 if (peer->ksnp_ni != ni)
1964                                         continue;
1965
1966                                 ksocknal_peer_del_interface_locked(peer, this_ip);
1967                         }
1968                 }
1969         }
1970
1971         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1972
1973         return (rc);
1974 }
1975
/*
 * ioctl-style control entry point for this LND (installed as .lnd_ctl).
 *
 * 'arg' is interpreted as a struct libcfs_ioctl_data which carries both
 * the request parameters and, for the GET_* commands, the results.
 *
 * Returns 0 or a negative errno from the selected operation; unknown
 * commands yield -EINVAL.
 */
int
ksocknal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
{
        struct libcfs_ioctl_data *data = arg;
        int rc;

        switch(cmd) {
        case IOC_LIBCFS_GET_INTERFACE: {
                /* report the interface at index ioc_count:
                 * ioc_u32[0..3] = address, netmask, #peers, #routes */
                ksock_net_t       *net = ni->ni_data;
                ksock_interface_t *iface;

                read_lock (&ksocknal_data.ksnd_global_lock);

                /* NOTE(review): if ioc_count is declared unsigned the '< 0'
                 * test can never be true -- confirm against the struct */
                if (data->ioc_count < 0 ||
                    data->ioc_count >= net->ksnn_ninterfaces) {
                        rc = -ENOENT;
                } else {
                        rc = 0;
                        iface = &net->ksnn_interfaces[data->ioc_count];

                        data->ioc_u32[0] = iface->ksni_ipaddr;
                        data->ioc_u32[1] = iface->ksni_netmask;
                        data->ioc_u32[2] = iface->ksni_npeers;
                        data->ioc_u32[3] = iface->ksni_nroutes;
                }

                read_unlock (&ksocknal_data.ksnd_global_lock);
                return rc;
        }

        case IOC_LIBCFS_ADD_INTERFACE:
                return ksocknal_add_interface(ni,
                                              data->ioc_u32[0], /* IP address */
                                              data->ioc_u32[1]); /* net mask */

        case IOC_LIBCFS_DEL_INTERFACE:
                return ksocknal_del_interface(ni,
                                              data->ioc_u32[0]); /* IP address */

        case IOC_LIBCFS_GET_PEER: {
                /* query by index (ioc_count); every output is zeroed first */
                lnet_process_id_t id = {0,};
                __u32            myip = 0;
                __u32            ip = 0;
                int              port = 0;
                int              conn_count = 0;
                int              share_count = 0;

                rc = ksocknal_get_peer_info(ni, data->ioc_count,
                                            &id, &myip, &ip, &port,
                                            &conn_count,  &share_count);
                if (rc != 0)
                        return rc;

                /* NB ioc_count is reused: index on input, share count on
                 * output */
                data->ioc_nid    = id.nid;
                data->ioc_count  = share_count;
                data->ioc_u32[0] = ip;
                data->ioc_u32[1] = port;
                data->ioc_u32[2] = myip;
                data->ioc_u32[3] = conn_count;
                data->ioc_u32[4] = id.pid;
                return 0;
        }

        case IOC_LIBCFS_ADD_PEER: {
                /* peers added here always get PID LUSTRE_SRV_LNET_PID */
                lnet_process_id_t  id = {.nid = data->ioc_nid,
                                         .pid = LUSTRE_SRV_LNET_PID};
                return ksocknal_add_peer (ni, id,
                                          data->ioc_u32[0], /* IP */
                                          data->ioc_u32[1]); /* port */
        }
        case IOC_LIBCFS_DEL_PEER: {
                /* deletion matches the NID with any PID */
                lnet_process_id_t  id = {.nid = data->ioc_nid,
                                         .pid = LNET_PID_ANY};
                return ksocknal_del_peer (ni, id,
                                          data->ioc_u32[0]); /* IP */
        }
        case IOC_LIBCFS_GET_CONN: {
                /* report the connection at index ioc_count */
                int           txmem;
                int           rxmem;
                int           nagle;
                ksock_conn_t *conn = ksocknal_get_conn_by_idx (ni, data->ioc_count);

                if (conn == NULL)
                        return -ENOENT;

                ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);

                data->ioc_count  = txmem;
                data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
                data->ioc_flags  = nagle;
                data->ioc_u32[0] = conn->ksnc_ipaddr;
                data->ioc_u32[1] = conn->ksnc_port;
                data->ioc_u32[2] = conn->ksnc_myipaddr;
                data->ioc_u32[3] = conn->ksnc_type;
                /* scheduler is reported as its index in the scheduler array */
                data->ioc_u32[4] = conn->ksnc_scheduler -
                                   ksocknal_data.ksnd_schedulers;
                data->ioc_u32[5] = rxmem;
                data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
                /* presumably balances a reference taken by
                 * ksocknal_get_conn_by_idx() -- TODO confirm */
                ksocknal_conn_decref(conn);
                return 0;
        }

        case IOC_LIBCFS_CLOSE_CONNECTION: {
                /* close conns to the given NID (any PID) at address
                 * ioc_u32[0] */
                lnet_process_id_t  id = {.nid = data->ioc_nid,
                                        .pid = LNET_PID_ANY};

                return ksocknal_close_matching_conns (id,
                                                      data->ioc_u32[0]);
        }
        case IOC_LIBCFS_REGISTER_MYNID:
                /* Ignore if this is a noop */
                if (data->ioc_nid == ni->ni_nid)
                        return 0;

                /* any attempt to actually change the NID is rejected */
                CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
                       libcfs_nid2str(data->ioc_nid),
                       libcfs_nid2str(ni->ni_nid));
                return -EINVAL;

        case IOC_LIBCFS_PUSH_CONNECTION: {
                /* push all conns to the given NID, any PID */
                lnet_process_id_t  id = {.nid = data->ioc_nid,
                                        .pid = LNET_PID_ANY};

                return ksocknal_push(ni, id);
        }
        default:
                return -EINVAL;
        }
        /* not reached */
}
2106
2107 void
2108 ksocknal_free_buffers (void)
2109 {
2110         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
2111
2112         if (ksocknal_data.ksnd_schedulers != NULL)
2113                 LIBCFS_FREE (ksocknal_data.ksnd_schedulers,
2114                              sizeof (ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2115
2116         LIBCFS_FREE (ksocknal_data.ksnd_peers,
2117                      sizeof (struct list_head) *
2118                      ksocknal_data.ksnd_peer_hash_size);
2119
2120         spin_lock(&ksocknal_data.ksnd_tx_lock);
2121
2122         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2123                 struct list_head  zlist;
2124                 ksock_tx_t       *tx;
2125
2126                 list_add(&zlist, &ksocknal_data.ksnd_idle_noop_txs);
2127                 list_del_init(&ksocknal_data.ksnd_idle_noop_txs);
2128                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2129
2130                 while(!list_empty(&zlist)) {
2131                         tx = list_entry(zlist.next, ksock_tx_t, tx_list);
2132                         list_del(&tx->tx_list);
2133                         LIBCFS_FREE(tx, tx->tx_desc_size);
2134                 }
2135         } else {
2136                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2137         }
2138 }
2139
/*
 * Tear down the module-wide state set up by ksocknal_base_startup():
 * check that nothing is left queued anywhere, tell all threads to exit,
 * wait for them to do so and free the global buffers.
 *
 * Must only be called when no nets remain (ksnd_nnets == 0).  It is also
 * the failure path of ksocknal_base_startup(), so it has to cope with a
 * partially initialised state (e.g. no scheduler array).
 */
void
ksocknal_base_shutdown (void)
{
        ksock_sched_t *sched;
        int            i;

        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));
        LASSERT (ksocknal_data.ksnd_nnets == 0);

        switch (ksocknal_data.ksnd_init) {
        default:
                /* any other init state is a bug */
                LASSERT (0);

        case SOCKNAL_INIT_ALL:
        case SOCKNAL_INIT_DATA:
                /* every list must have drained before the threads go away */
                LASSERT (ksocknal_data.ksnd_peers != NULL);
                for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
                        LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
                }
                LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
                LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
                LASSERT (list_empty (&ksocknal_data.ksnd_connd_connreqs));
                LASSERT (list_empty (&ksocknal_data.ksnd_connd_routes));

                /* the scheduler array may be absent if startup failed
                 * before (or while) allocating it */
                if (ksocknal_data.ksnd_schedulers != NULL)
                        for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
                                ksock_sched_t *kss =
                                        &ksocknal_data.ksnd_schedulers[i];

                                LASSERT (list_empty (&kss->kss_tx_conns));
                                LASSERT (list_empty (&kss->kss_rx_conns));
                                LASSERT (list_empty (&kss->kss_zombie_noop_txs));
                                LASSERT (kss->kss_nconns == 0);
                        }

                /* flag threads to terminate; wake and wait for them to die */
                ksocknal_data.ksnd_shuttingdown = 1;
                cfs_waitq_broadcast (&ksocknal_data.ksnd_connd_waitq);
                cfs_waitq_broadcast (&ksocknal_data.ksnd_reaper_waitq);

                if (ksocknal_data.ksnd_schedulers != NULL)
                        for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
                                sched = &ksocknal_data.ksnd_schedulers[i];
                                cfs_waitq_broadcast(&sched->kss_waitq);
                        }

                /* Poll once a second until all threads have exited.  'i'
                 * counts the polls from 4, so the message is raised to
                 * D_WARNING whenever the count reaches a power of 2
                 * (first on the 4th poll, then ever less frequently) */
                i = 4;
                read_lock (&ksocknal_data.ksnd_global_lock);
                while (ksocknal_data.ksnd_nthreads != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "waiting for %d threads to terminate\n",
                                ksocknal_data.ksnd_nthreads);
                        /* don't hold the lock while sleeping */
                        read_unlock (&ksocknal_data.ksnd_global_lock);
                        cfs_pause(cfs_time_seconds(1));
                        read_lock (&ksocknal_data.ksnd_global_lock);
                }
                read_unlock (&ksocknal_data.ksnd_global_lock);

                ksocknal_free_buffers();

                ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
                break;
        }

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));

        /* balances the PORTAL_MODULE_USE in ksocknal_base_startup() */
        PORTAL_MODULE_UNUSE;
}
2211
2212
2213 __u64
2214 ksocknal_new_incarnation (void)
2215 {
2216         struct timeval tv;
2217
2218         /* The incarnation number is the time this module loaded and it
2219          * identifies this particular instance of the socknal.  Hopefully
2220          * we won't be able to reboot more frequently than 1MHz for the
2221          * forseeable future :) */
2222
2223         do_gettimeofday(&tv);
2224
2225         return (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2226 }
2227
/*
 * Set up the module-wide state: zero ksocknal_data, allocate and
 * initialise the peer hash table, locks, lists and wait queues, allocate
 * the schedulers and spawn the scheduler, connd and reaper threads.
 *
 * Returns 0 on success, -ENOMEM if the peer hash table cannot be
 * allocated, or -ENETDOWN if any later step fails (in which case
 * ksocknal_base_shutdown() has already undone the partial setup).
 */
int
ksocknal_base_startup (void)
{
        int               rc;
        int               i;

        LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
        LASSERT (ksocknal_data.ksnd_nnets == 0);

        memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */

        ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
        LIBCFS_ALLOC (ksocknal_data.ksnd_peers,
                      sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
        /* nothing else has been set up yet, so just bail out */
        if (ksocknal_data.ksnd_peers == NULL)
                return -ENOMEM;

        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
                CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);

        rwlock_init(&ksocknal_data.ksnd_global_lock);

        /* reaper state */
        spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
        CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
        CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
        CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
        cfs_waitq_init(&ksocknal_data.ksnd_reaper_waitq);

        /* connd state */
        spin_lock_init (&ksocknal_data.ksnd_connd_lock);
        CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_connd_connreqs);
        CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_connd_routes);
        cfs_waitq_init(&ksocknal_data.ksnd_connd_waitq);

        /* idle noop tx pool */
        spin_lock_init (&ksocknal_data.ksnd_tx_lock);
        CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_idle_noop_txs);

        /* NB memset above zeros whole of ksocknal_data, including
         * ksocknal_data.ksnd_irqinfo[all].ksni_valid */

        /* flag lists/ptrs/locks initialised */
        /* NB from here on, failures must go through 'failed' so that
         * ksocknal_base_shutdown() cleans up (it also drops the module
         * use taken just below) */
        ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
        PORTAL_MODULE_USE;

        ksocknal_data.ksnd_nschedulers = ksocknal_nsched();
        LIBCFS_ALLOC(ksocknal_data.ksnd_schedulers,
                     sizeof(ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
        if (ksocknal_data.ksnd_schedulers == NULL)
                goto failed;

        /* initialise every scheduler before any thread is started */
        for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
                ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];

                spin_lock_init (&kss->kss_lock);
                CFS_INIT_LIST_HEAD (&kss->kss_rx_conns);
                CFS_INIT_LIST_HEAD (&kss->kss_tx_conns);
                CFS_INIT_LIST_HEAD (&kss->kss_zombie_noop_txs);
                cfs_waitq_init (&kss->kss_waitq);
        }

        /* one thread per scheduler, handed its own scheduler struct */
        for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
                rc = ksocknal_thread_start (ksocknal_scheduler,
                                            &ksocknal_data.ksnd_schedulers[i]);
                if (rc != 0) {
                        CERROR("Can't spawn socknal scheduler[%d]: %d\n",
                               i, rc);
                        goto failed;
                }
        }

        /* must have at least 2 connds to remain responsive to accepts while
         * connecting */
        /* NB this writes the clamped value back into the tunable itself */
        if (*ksocknal_tunables.ksnd_nconnds < 2)
                *ksocknal_tunables.ksnd_nconnds = 2;

        /* each connd gets its index, passed through the void * argument */
        for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
                rc = ksocknal_thread_start (ksocknal_connd, (void *)((long)i));
                if (rc != 0) {
                        CERROR("Can't spawn socknal connd: %d\n", rc);
                        goto failed;
                }
        }

        rc = ksocknal_thread_start (ksocknal_reaper, NULL);
        if (rc != 0) {
                CERROR ("Can't spawn socknal reaper: %d\n", rc);
                goto failed;
        }

        /* flag everything initialised */
        ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;

        return 0;

 failed:
        /* NB the specific 'rc' is only logged; callers always see
         * -ENETDOWN */
        ksocknal_base_shutdown();
        return -ENETDOWN;
}
2325
2326 void
2327 ksocknal_debug_peerhash (lnet_ni_t *ni)
2328 {
2329         ksock_peer_t     *peer = NULL;
2330         struct list_head *tmp;
2331         int               i;
2332
2333         read_lock (&ksocknal_data.ksnd_global_lock);
2334
2335         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2336                 list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
2337                         peer = list_entry (tmp, ksock_peer_t, ksnp_list);
2338
2339                         if (peer->ksnp_ni == ni) break;
2340
2341                         peer = NULL;
2342                 }
2343         }
2344
2345         if (peer != NULL) {
2346                 ksock_route_t *route;
2347                 ksock_conn_t  *conn;
2348
2349                 CWARN ("Active peer on shutdown: %s, ref %d, scnt %d, "
2350                        "closing %d, accepting %d, err %d, zcookie "LPU64", "
2351                        "txq %d, zc_req %d\n", libcfs_id2str(peer->ksnp_id),
2352                        atomic_read(&peer->ksnp_refcount),
2353                        peer->ksnp_sharecount, peer->ksnp_closing,
2354                        peer->ksnp_accepting, peer->ksnp_error,
2355                        peer->ksnp_zc_next_cookie,
2356                        !list_empty(&peer->ksnp_tx_queue),
2357                        !list_empty(&peer->ksnp_zc_req_list));
2358
2359                 list_for_each (tmp, &peer->ksnp_routes) {
2360                         route = list_entry(tmp, ksock_route_t, ksnr_list);
2361                         CWARN ("Route: ref %d, schd %d, conn %d, cnted %d, "
2362                                "del %d\n", atomic_read(&route->ksnr_refcount),
2363                                route->ksnr_scheduled, route->ksnr_connecting,
2364                                route->ksnr_connected, route->ksnr_deleted);
2365                 }
2366
2367                 list_for_each (tmp, &peer->ksnp_conns) {
2368                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
2369                         CWARN ("Conn: ref %d, sref %d, t %d, c %d\n",
2370                                atomic_read(&conn->ksnc_conn_refcount),
2371                                atomic_read(&conn->ksnc_sock_refcount),
2372                                conn->ksnc_type, conn->ksnc_closing);
2373                 }
2374         }
2375
2376         read_unlock (&ksocknal_data.ksnd_global_lock);
2377         return;
2378 }
2379
2380 void
2381 ksocknal_shutdown (lnet_ni_t *ni)
2382 {
2383         ksock_net_t      *net = ni->ni_data;
2384         int               i;
2385         lnet_process_id_t  anyid = {.nid = LNET_NID_ANY,
2386                                    .pid = LNET_PID_ANY};
2387
2388         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2389         LASSERT(ksocknal_data.ksnd_nnets > 0);
2390
2391         spin_lock_bh (&net->ksnn_lock);
2392         net->ksnn_shutdown = 1;                 /* prevent new peers */
2393         spin_unlock_bh (&net->ksnn_lock);
2394
2395         /* Delete all peers */
2396         ksocknal_del_peer(ni, anyid, 0);
2397
2398         /* Wait for all peer state to clean up */
2399         i = 2;
2400         spin_lock_bh (&net->ksnn_lock);
2401         while (net->ksnn_npeers != 0) {
2402                 spin_unlock_bh (&net->ksnn_lock);
2403
2404                 i++;
2405                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2406                        "waiting for %d peers to disconnect\n",
2407                        net->ksnn_npeers);
2408                 cfs_pause(cfs_time_seconds(1));
2409
2410                 ksocknal_debug_peerhash(ni);
2411
2412                 spin_lock_bh (&net->ksnn_lock);
2413         }
2414         spin_unlock_bh (&net->ksnn_lock);
2415
2416         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2417                 LASSERT (net->ksnn_interfaces[i].ksni_npeers == 0);
2418                 LASSERT (net->ksnn_interfaces[i].ksni_nroutes == 0);
2419         }
2420
2421         LIBCFS_FREE(net, sizeof(*net));
2422
2423         ksocknal_data.ksnd_nnets--;
2424         if (ksocknal_data.ksnd_nnets == 0)
2425                 ksocknal_base_shutdown();
2426 }
2427
2428 int
2429 ksocknal_enumerate_interfaces(ksock_net_t *net)
2430 {
2431         char      **names;
2432         int         i;
2433         int         j;
2434         int         rc;
2435         int         n;
2436
2437         n = libcfs_ipif_enumerate(&names);
2438         if (n <= 0) {
2439                 CERROR("Can't enumerate interfaces: %d\n", n);
2440                 return n;
2441         }
2442
2443         for (i = j = 0; i < n; i++) {
2444                 int        up;
2445                 __u32      ip;
2446                 __u32      mask;
2447
2448                 if (!strcmp(names[i], "lo")) /* skip the loopback IF */
2449                         continue;
2450
2451                 rc = libcfs_ipif_query(names[i], &up, &ip, &mask);
2452                 if (rc != 0) {
2453                         CWARN("Can't get interface %s info: %d\n",
2454                               names[i], rc);
2455                         continue;
2456                 }
2457
2458                 if (!up) {
2459                         CWARN("Ignoring interface %s (down)\n",
2460                               names[i]);
2461                         continue;
2462                 }
2463
2464                 if (j == LNET_MAX_INTERFACES) {
2465                         CWARN("Ignoring interface %s (too many interfaces)\n",
2466                               names[i]);
2467                         continue;
2468                 }
2469
2470                 net->ksnn_interfaces[j].ksni_ipaddr = ip;
2471                 net->ksnn_interfaces[j].ksni_netmask = mask;
2472                 j++;
2473         }
2474
2475         libcfs_ipif_free_enumeration(names, n);
2476
2477         if (j == 0)
2478                 CERROR("Can't find any usable interfaces\n");
2479
2480         return j;
2481 }
2482
2483 int
2484 ksocknal_startup (lnet_ni_t *ni)
2485 {
2486         ksock_net_t  *net;
2487         int           rc;
2488         int           i;
2489
2490         LASSERT (ni->ni_lnd == &the_ksocklnd);
2491
2492         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2493                 rc = ksocknal_base_startup();
2494                 if (rc != 0)
2495                         return rc;
2496         }
2497
2498         LIBCFS_ALLOC(net, sizeof(*net));
2499         if (net == NULL)
2500                 goto fail_0;
2501
2502         memset(net, 0, sizeof(*net));
2503         spin_lock_init(&net->ksnn_lock);
2504         net->ksnn_incarnation = ksocknal_new_incarnation();
2505         ni->ni_data = net;
2506         ni->ni_maxtxcredits = *ksocknal_tunables.ksnd_credits;
2507         ni->ni_peertxcredits = *ksocknal_tunables.ksnd_peercredits;
2508
2509         if (ni->ni_interfaces[0] == NULL) {
2510                 rc = ksocknal_enumerate_interfaces(net);
2511                 if (rc <= 0)
2512                         goto fail_1;
2513
2514                 net->ksnn_ninterfaces = 1;
2515         } else {
2516                 for (i = 0; i < LNET_MAX_INTERFACES; i++) {
2517                         int    up;
2518
2519                         if (ni->ni_interfaces[i] == NULL)
2520                                 break;
2521
2522                         rc = libcfs_ipif_query(
2523                                 ni->ni_interfaces[i], &up,
2524                                 &net->ksnn_interfaces[i].ksni_ipaddr,
2525                                 &net->ksnn_interfaces[i].ksni_netmask);
2526
2527                         if (rc != 0) {
2528                                 CERROR("Can't get interface %s info: %d\n",
2529                                        ni->ni_interfaces[i], rc);
2530                                 goto fail_1;
2531                         }
2532
2533                         if (!up) {
2534                                 CERROR("Interface %s is down\n",
2535                                        ni->ni_interfaces[i]);
2536                                 goto fail_1;
2537                         }
2538                 }
2539                 net->ksnn_ninterfaces = i;
2540         }
2541
2542         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
2543                                 net->ksnn_interfaces[0].ksni_ipaddr);
2544
2545         ksocknal_data.ksnd_nnets++;
2546
2547         return 0;
2548
2549  fail_1:
2550         LIBCFS_FREE(net, sizeof(*net));
2551  fail_0:
2552         if (ksocknal_data.ksnd_nnets == 0)
2553                 ksocknal_base_shutdown();
2554
2555         return -ENETDOWN;
2556 }
2557
2558
2559 void __exit
2560 ksocknal_module_fini (void)
2561 {
2562         lnet_unregister_lnd(&the_ksocklnd);
2563         ksocknal_lib_tunables_fini();
2564 }
2565
2566 int __init
2567 ksocknal_module_init (void)
2568 {
2569         int    rc;
2570
2571         /* check ksnr_connected/connecting field large enough */
2572         CLASSERT(SOCKLND_CONN_NTYPES <= 4);
2573
2574         rc = ksocknal_lib_tunables_init();
2575         if (rc != 0)
2576                 return rc;
2577
2578         lnet_register_lnd(&the_ksocklnd);
2579
2580         return 0;
2581 }
2582
/* Module author, description and license strings. */
2583 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2584 MODULE_DESCRIPTION("Kernel TCP Socket LND v2.0.0");
2585 MODULE_LICENSE("GPL");
2586
/* Declare module "ksocknal", version "2.0.0", with ksocknal_module_init and
 * ksocknal_module_fini as its init and fini entry points. */
2587 cfs_module(ksocknal, "2.0.0", ksocknal_module_init, ksocknal_module_fini);