Whamcloud - gitweb
- off-by-one assertion error.
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socklnd.h"
27
28 lnd_t the_ksocklnd = {
29         .lnd_type       = SOCKLND,
30         .lnd_startup    = ksocknal_startup,
31         .lnd_shutdown   = ksocknal_shutdown,
32         .lnd_ctl        = ksocknal_ctl,
33         .lnd_send       = ksocknal_send,
34         .lnd_recv       = ksocknal_recv,
35         .lnd_notify     = ksocknal_notify,
36         .lnd_accept     = ksocknal_accept,
37 };
38
39 ksock_nal_data_t        ksocknal_data;
40
41 ksock_interface_t *
42 ksocknal_ip2iface(lnet_ni_t *ni, __u32 ip)
43 {
44         ksock_net_t       *net = ni->ni_data;
45         int                i;
46         ksock_interface_t *iface;
47
48         for (i = 0; i < net->ksnn_ninterfaces; i++) {
49                 LASSERT(i < LNET_MAX_INTERFACES);
50                 iface = &net->ksnn_interfaces[i];
51
52                 if (iface->ksni_ipaddr == ip)
53                         return (iface);
54         }
55
56         return (NULL);
57 }
58
59 ksock_route_t *
60 ksocknal_create_route (__u32 ipaddr, int port)
61 {
62         ksock_route_t *route;
63
64         LIBCFS_ALLOC (route, sizeof (*route));
65         if (route == NULL)
66                 return (NULL);
67
68         atomic_set (&route->ksnr_refcount, 1);
69         route->ksnr_peer = NULL;
70         route->ksnr_retry_interval = 0;         /* OK to connect at any time */
71         route->ksnr_ipaddr = ipaddr;
72         route->ksnr_port = port;
73         route->ksnr_scheduled = 0;
74         route->ksnr_connecting = 0;
75         route->ksnr_connected = 0;
76         route->ksnr_deleted = 0;
77         route->ksnr_conn_count = 0;
78         route->ksnr_share_count = 0;
79
80         return (route);
81 }
82
83 void
84 ksocknal_destroy_route (ksock_route_t *route)
85 {
86         LASSERT (atomic_read(&route->ksnr_refcount) == 0);
87
88         if (route->ksnr_peer != NULL)
89                 ksocknal_peer_decref(route->ksnr_peer);
90
91         LIBCFS_FREE (route, sizeof (*route));
92 }
93
94 int
95 ksocknal_create_peer (ksock_peer_t **peerp, lnet_ni_t *ni, lnet_process_id_t id)
96 {
97         ksock_net_t   *net = ni->ni_data;
98         ksock_peer_t  *peer;
99
100         LASSERT (id.nid != LNET_NID_ANY);
101         LASSERT (id.pid != LNET_PID_ANY);
102         LASSERT (!in_interrupt());
103
104         LIBCFS_ALLOC (peer, sizeof (*peer));
105         if (peer == NULL)
106                 return -ENOMEM;
107
108         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
109
110         peer->ksnp_ni = ni;
111         peer->ksnp_id = id;
112         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
113         peer->ksnp_closing = 0;
114         peer->ksnp_accepting = 0;
115         peer->ksnp_zc_next_cookie = 1;
116         peer->ksnp_proto = NULL;
117         CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
118         CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
119         CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
120         CFS_INIT_LIST_HEAD (&peer->ksnp_zc_req_list);
121         spin_lock_init(&peer->ksnp_lock);
122
123         spin_lock_bh (&net->ksnn_lock);
124
125         if (net->ksnn_shutdown) {
126                 spin_unlock_bh (&net->ksnn_lock);
127                 
128                 LIBCFS_FREE(peer, sizeof(*peer));
129                 CERROR("Can't create peer: network shutdown\n");
130                 return -ESHUTDOWN;
131         }
132
133         net->ksnn_npeers++;
134
135         spin_unlock_bh (&net->ksnn_lock);
136
137         *peerp = peer;
138         return 0;
139 }
140
141 void
142 ksocknal_destroy_peer (ksock_peer_t *peer)
143 {
144         ksock_net_t    *net = peer->ksnp_ni->ni_data;
145
146         CDEBUG (D_NET, "peer %s %p deleted\n", 
147                 libcfs_id2str(peer->ksnp_id), peer);
148
149         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
150         LASSERT (peer->ksnp_accepting == 0);
151         LASSERT (list_empty (&peer->ksnp_conns));
152         LASSERT (list_empty (&peer->ksnp_routes));
153         LASSERT (list_empty (&peer->ksnp_tx_queue));
154         LASSERT (list_empty (&peer->ksnp_zc_req_list));
155
156         LIBCFS_FREE (peer, sizeof (*peer));
157
158         /* NB a peer's connections and routes keep a reference on their peer
159          * until they are destroyed, so we can be assured that _all_ state to
160          * do with this peer has been cleaned up when its refcount drops to
161          * zero. */
162         spin_lock_bh (&net->ksnn_lock);
163         net->ksnn_npeers--;
164         spin_unlock_bh (&net->ksnn_lock);
165 }
166
167 ksock_peer_t *
168 ksocknal_find_peer_locked (lnet_ni_t *ni, lnet_process_id_t id)
169 {
170         struct list_head *peer_list = ksocknal_nid2peerlist(id.nid);
171         struct list_head *tmp;
172         ksock_peer_t     *peer;
173
174         list_for_each (tmp, peer_list) {
175
176                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
177
178                 LASSERT (!peer->ksnp_closing);
179
180                 if (peer->ksnp_ni != ni)
181                         continue;
182
183                 if (peer->ksnp_id.nid != id.nid ||
184                     peer->ksnp_id.pid != id.pid)
185                         continue;
186
187                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
188                        peer, libcfs_id2str(id), 
189                        atomic_read(&peer->ksnp_refcount));
190                 return (peer);
191         }
192         return (NULL);
193 }
194
195 ksock_peer_t *
196 ksocknal_find_peer (lnet_ni_t *ni, lnet_process_id_t id)
197 {
198         ksock_peer_t     *peer;
199
200         read_lock (&ksocknal_data.ksnd_global_lock);
201         peer = ksocknal_find_peer_locked (ni, id);
202         if (peer != NULL)                       /* +1 ref for caller? */
203                 ksocknal_peer_addref(peer);
204         read_unlock (&ksocknal_data.ksnd_global_lock);
205
206         return (peer);
207 }
208
209 void
210 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
211 {
212         int                i;
213         __u32              ip;
214
215         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
216                 LASSERT (i < LNET_MAX_INTERFACES);
217                 ip = peer->ksnp_passive_ips[i];
218
219                 ksocknal_ip2iface(peer->ksnp_ni, ip)->ksni_npeers--;
220         }
221
222         LASSERT (list_empty(&peer->ksnp_conns));
223         LASSERT (list_empty(&peer->ksnp_routes));
224         LASSERT (!peer->ksnp_closing);
225         peer->ksnp_closing = 1;
226         list_del (&peer->ksnp_list);
227         /* lose peerlist's ref */
228         ksocknal_peer_decref(peer);
229 }
230
231 int
232 ksocknal_get_peer_info (lnet_ni_t *ni, int index, 
233                         lnet_process_id_t *id, __u32 *myip, __u32 *peer_ip, int *port,
234                         int *conn_count, int *share_count)
235 {
236         ksock_peer_t      *peer;
237         struct list_head  *ptmp;
238         ksock_route_t     *route;
239         struct list_head  *rtmp;
240         int                i;
241         int                j;
242         int                rc = -ENOENT;
243
244         read_lock (&ksocknal_data.ksnd_global_lock);
245
246         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
247
248                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
249                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
250
251                         if (peer->ksnp_ni != ni)
252                                 continue;
253
254                         if (peer->ksnp_n_passive_ips == 0 &&
255                             list_empty(&peer->ksnp_routes)) {
256                                 if (index-- > 0)
257                                         continue;
258
259                                 *id = peer->ksnp_id;
260                                 *myip = 0;
261                                 *peer_ip = 0;
262                                 *port = 0;
263                                 *conn_count = 0;
264                                 *share_count = 0;
265                                 rc = 0;
266                                 goto out;
267                         }
268
269                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
270                                 if (index-- > 0)
271                                         continue;
272
273                                 *id = peer->ksnp_id;
274                                 *myip = peer->ksnp_passive_ips[j];
275                                 *peer_ip = 0;
276                                 *port = 0;
277                                 *conn_count = 0;
278                                 *share_count = 0;
279                                 rc = 0;
280                                 goto out;
281                         }
282
283                         list_for_each (rtmp, &peer->ksnp_routes) {
284                                 if (index-- > 0)
285                                         continue;
286
287                                 route = list_entry(rtmp, ksock_route_t,
288                                                    ksnr_list);
289
290                                 *id = peer->ksnp_id;
291                                 *myip = route->ksnr_myipaddr;
292                                 *peer_ip = route->ksnr_ipaddr;
293                                 *port = route->ksnr_port;
294                                 *conn_count = route->ksnr_conn_count;
295                                 *share_count = route->ksnr_share_count;
296                                 rc = 0;
297                                 goto out;
298                         }
299                 }
300         }
301  out:
302         read_unlock (&ksocknal_data.ksnd_global_lock);
303         return (rc);
304 }
305
306 void
307 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
308 {
309         ksock_peer_t      *peer = route->ksnr_peer;
310         int                type = conn->ksnc_type;
311         ksock_interface_t *iface;
312
313         conn->ksnc_route = route;
314         ksocknal_route_addref(route);
315
316         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
317                 if (route->ksnr_myipaddr == 0) {
318                         /* route wasn't bound locally yet (the initial route) */
319                         CDEBUG(D_NET, "Binding %s %u.%u.%u.%u to %u.%u.%u.%u\n",
320                                libcfs_id2str(peer->ksnp_id),
321                                HIPQUAD(route->ksnr_ipaddr),
322                                HIPQUAD(conn->ksnc_myipaddr));
323                 } else {
324                         CDEBUG(D_NET, "Rebinding %s %u.%u.%u.%u from "
325                                "%u.%u.%u.%u to %u.%u.%u.%u\n",
326                                libcfs_id2str(peer->ksnp_id),
327                                HIPQUAD(route->ksnr_ipaddr),
328                                HIPQUAD(route->ksnr_myipaddr),
329                                HIPQUAD(conn->ksnc_myipaddr));
330
331                         iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
332                                                   route->ksnr_myipaddr);
333                         if (iface != NULL)
334                                 iface->ksni_nroutes--;
335                 }
336                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
337                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
338                                           route->ksnr_myipaddr);
339                 if (iface != NULL)
340                         iface->ksni_nroutes++;
341         }
342
343         route->ksnr_connected |= (1<<type);
344         route->ksnr_conn_count++;
345
346         /* Successful connection => further attempts can
347          * proceed immediately */
348         route->ksnr_retry_interval = 0;
349 }
350
351 void
352 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
353 {
354         struct list_head  *tmp;
355         ksock_conn_t      *conn;
356         ksock_route_t     *route2;
357
358         LASSERT (!peer->ksnp_closing);
359         LASSERT (route->ksnr_peer == NULL);
360         LASSERT (!route->ksnr_scheduled);
361         LASSERT (!route->ksnr_connecting);
362         LASSERT (route->ksnr_connected == 0);
363
364         /* LASSERT(unique) */
365         list_for_each(tmp, &peer->ksnp_routes) {
366                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
367
368                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
369                         CERROR ("Duplicate route %s %u.%u.%u.%u\n",
370                                 libcfs_id2str(peer->ksnp_id), 
371                                 HIPQUAD(route->ksnr_ipaddr));
372                         LBUG();
373                 }
374         }
375
376         route->ksnr_peer = peer;
377         ksocknal_peer_addref(peer);
378         /* peer's routelist takes over my ref on 'route' */
379         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
380
381         list_for_each(tmp, &peer->ksnp_conns) {
382                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
383
384                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
385                         continue;
386
387                 ksocknal_associate_route_conn_locked(route, conn);
388                 /* keep going (typed routes) */
389         }
390 }
391
392 void
393 ksocknal_del_route_locked (ksock_route_t *route)
394 {
395         ksock_peer_t      *peer = route->ksnr_peer;
396         ksock_interface_t *iface;
397         ksock_conn_t      *conn;
398         struct list_head  *ctmp;
399         struct list_head  *cnxt;
400
401         LASSERT (!route->ksnr_deleted);
402
403         /* Close associated conns */
404         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
405                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
406
407                 if (conn->ksnc_route != route)
408                         continue;
409
410                 ksocknal_close_conn_locked (conn, 0);
411         }
412
413         if (route->ksnr_myipaddr != 0) {
414                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
415                                           route->ksnr_myipaddr);
416                 if (iface != NULL)
417                         iface->ksni_nroutes--;
418         }
419
420         route->ksnr_deleted = 1;
421         list_del (&route->ksnr_list);
422         ksocknal_route_decref(route);             /* drop peer's ref */
423
424         if (list_empty (&peer->ksnp_routes) &&
425             list_empty (&peer->ksnp_conns)) {
426                 /* I've just removed the last route to a peer with no active
427                  * connections */
428                 ksocknal_unlink_peer_locked (peer);
429         }
430 }
431
432 int
433 ksocknal_add_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ipaddr, int port)
434 {
435         struct list_head  *tmp;
436         ksock_peer_t      *peer;
437         ksock_peer_t      *peer2;
438         ksock_route_t     *route;
439         ksock_route_t     *route2;
440         int                rc;
441
442         if (id.nid == LNET_NID_ANY ||
443             id.pid == LNET_PID_ANY)
444                 return (-EINVAL);
445
446         /* Have a brand new peer ready... */
447         rc = ksocknal_create_peer(&peer, ni, id);
448         if (rc != 0)
449                 return rc;
450
451         route = ksocknal_create_route (ipaddr, port);
452         if (route == NULL) {
453                 ksocknal_peer_decref(peer);
454                 return (-ENOMEM);
455         }
456
457         write_lock_bh (&ksocknal_data.ksnd_global_lock);
458
459         /* always called with a ref on ni, so shutdown can't have started */
460         LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);
461
462         peer2 = ksocknal_find_peer_locked (ni, id);
463         if (peer2 != NULL) {
464                 ksocknal_peer_decref(peer);
465                 peer = peer2;
466         } else {
467                 /* peer table takes my ref on peer */
468                 list_add_tail (&peer->ksnp_list,
469                                ksocknal_nid2peerlist (id.nid));
470         }
471
472         route2 = NULL;
473         list_for_each (tmp, &peer->ksnp_routes) {
474                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
475
476                 if (route2->ksnr_ipaddr == ipaddr)
477                         break;
478
479                 route2 = NULL;
480         }
481         if (route2 == NULL) {
482                 ksocknal_add_route_locked(peer, route);
483                 route->ksnr_share_count++;
484         } else {
485                 ksocknal_route_decref(route);
486                 route2->ksnr_share_count++;
487         }
488
489         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
490
491         return (0);
492 }
493
494 void
495 ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip)
496 {
497         ksock_conn_t     *conn;
498         ksock_route_t    *route;
499         struct list_head *tmp;
500         struct list_head *nxt;
501         int               nshared;
502
503         LASSERT (!peer->ksnp_closing);
504
505         /* Extra ref prevents peer disappearing until I'm done with it */
506         ksocknal_peer_addref(peer);
507
508         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
509                 route = list_entry(tmp, ksock_route_t, ksnr_list);
510
511                 /* no match */
512                 if (!(ip == 0 || route->ksnr_ipaddr == ip))
513                         continue;
514
515                 route->ksnr_share_count = 0;
516                 /* This deletes associated conns too */
517                 ksocknal_del_route_locked (route);
518         }
519
520         nshared = 0;
521         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
522                 route = list_entry(tmp, ksock_route_t, ksnr_list);
523                 nshared += route->ksnr_share_count;
524         }
525
526         if (nshared == 0) {
527                 /* remove everything else if there are no explicit entries
528                  * left */
529
530                 list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
531                         route = list_entry(tmp, ksock_route_t, ksnr_list);
532
533                         /* we should only be removing auto-entries */
534                         LASSERT(route->ksnr_share_count == 0);
535                         ksocknal_del_route_locked (route);
536                 }
537
538                 list_for_each_safe (tmp, nxt, &peer->ksnp_conns) {
539                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
540
541                         ksocknal_close_conn_locked(conn, 0);
542                 }
543         }
544
545         ksocknal_peer_decref(peer);
546         /* NB peer unlinks itself when last conn/route is removed */
547 }
548
549 int
550 ksocknal_del_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ip)
551 {
552         CFS_LIST_HEAD     (zombies);
553         struct list_head  *ptmp;
554         struct list_head  *pnxt;
555         ksock_peer_t      *peer;
556         int                lo;
557         int                hi;
558         int                i;
559         int                rc = -ENOENT;
560
561         write_lock_bh (&ksocknal_data.ksnd_global_lock);
562
563         if (id.nid != LNET_NID_ANY)
564                 lo = hi = ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers;
565         else {
566                 lo = 0;
567                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
568         }
569
570         for (i = lo; i <= hi; i++) {
571                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
572                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
573
574                         if (peer->ksnp_ni != ni)
575                                 continue;
576
577                         if (!((id.nid == LNET_NID_ANY || peer->ksnp_id.nid == id.nid) &&
578                               (id.pid == LNET_PID_ANY || peer->ksnp_id.pid == id.pid)))
579                                 continue;
580
581                         ksocknal_peer_addref(peer);     /* a ref for me... */
582
583                         ksocknal_del_peer_locked (peer, ip);
584
585                         if (peer->ksnp_closing && !list_empty(&peer->ksnp_tx_queue)) {
586                                 LASSERT (list_empty(&peer->ksnp_conns));
587                                 LASSERT (list_empty(&peer->ksnp_routes));
588
589                                 list_splice_init(&peer->ksnp_tx_queue, &zombies);
590                         }
591
592                         ksocknal_peer_decref(peer);     /* ...till here */
593
594                         rc = 0;                 /* matched! */
595                 }
596         }
597
598         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
599
600         ksocknal_txlist_done(ni, &zombies, 1);
601
602         return (rc);
603 }
604
605 ksock_conn_t *
606 ksocknal_get_conn_by_idx (lnet_ni_t *ni, int index)
607 {
608         ksock_peer_t      *peer;
609         struct list_head  *ptmp;
610         ksock_conn_t      *conn;
611         struct list_head  *ctmp;
612         int                i;
613
614         read_lock (&ksocknal_data.ksnd_global_lock);
615
616         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
617                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
618                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
619
620                         LASSERT (!peer->ksnp_closing);
621
622                         if (peer->ksnp_ni != ni)
623                                 continue;
624
625                         list_for_each (ctmp, &peer->ksnp_conns) {
626                                 if (index-- > 0)
627                                         continue;
628
629                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
630                                 ksocknal_conn_addref(conn);
631                                 read_unlock (&ksocknal_data.ksnd_global_lock);
632                                 return (conn);
633                         }
634                 }
635         }
636
637         read_unlock (&ksocknal_data.ksnd_global_lock);
638         return (NULL);
639 }
640
641 ksock_sched_t *
642 ksocknal_choose_scheduler_locked (unsigned int irq)
643 {
644         ksock_sched_t    *sched;
645         ksock_irqinfo_t  *info;
646         int               i;
647
648         LASSERT (irq < NR_IRQS);
649         info = &ksocknal_data.ksnd_irqinfo[irq];
650
651         if (irq != 0 &&                         /* hardware NIC */
652             info->ksni_valid) {                 /* already set up */
653                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
654         }
655
656         /* software NIC (irq == 0) || not associated with a scheduler yet.
657          * Choose the CPU with the fewest connections... */
658         sched = &ksocknal_data.ksnd_schedulers[0];
659         for (i = 1; i < ksocknal_data.ksnd_nschedulers; i++)
660                 if (sched->kss_nconns >
661                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
662                         sched = &ksocknal_data.ksnd_schedulers[i];
663
664         if (irq != 0) {                         /* Hardware NIC */
665                 info->ksni_valid = 1;
666                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
667
668                 /* no overflow... */
669                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
670         }
671
672         return (sched);
673 }
674
675 int
676 ksocknal_local_ipvec (lnet_ni_t *ni, __u32 *ipaddrs)
677 {
678         ksock_net_t       *net = ni->ni_data;
679         int                i;
680         int                nip;
681
682         read_lock (&ksocknal_data.ksnd_global_lock);
683
684         nip = net->ksnn_ninterfaces;
685         LASSERT (nip <= LNET_MAX_INTERFACES);
686
687         /* Only offer interfaces for additional connections if I have 
688          * more than one. */
689         if (nip < 2) {
690                 read_unlock (&ksocknal_data.ksnd_global_lock);
691                 return 0;
692         }
693         
694         for (i = 0; i < nip; i++) {
695                 ipaddrs[i] = net->ksnn_interfaces[i].ksni_ipaddr;
696                 LASSERT (ipaddrs[i] != 0);
697         }
698
699         read_unlock (&ksocknal_data.ksnd_global_lock);
700         return (nip);
701 }
702
703 int
704 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
705 {
706         int   best_netmatch = 0;
707         int   best_xor      = 0;
708         int   best          = -1;
709         int   this_xor;
710         int   this_netmatch;
711         int   i;
712
713         for (i = 0; i < nips; i++) {
714                 if (ips[i] == 0)
715                         continue;
716
717                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
718                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
719
720                 if (!(best < 0 ||
721                       best_netmatch < this_netmatch ||
722                       (best_netmatch == this_netmatch &&
723                        best_xor > this_xor)))
724                         continue;
725
726                 best = i;
727                 best_netmatch = this_netmatch;
728                 best_xor = this_xor;
729         }
730
731         LASSERT (best >= 0);
732         return (best);
733 }
734
735 int
736 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
737 {
738         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
739         ksock_net_t        *net = peer->ksnp_ni->ni_data;
740         ksock_interface_t  *iface;
741         ksock_interface_t  *best_iface;
742         int                 n_ips;
743         int                 i;
744         int                 j;
745         int                 k;
746         __u32               ip;
747         __u32               xor;
748         int                 this_netmatch;
749         int                 best_netmatch;
750         int                 best_npeers;
751
752         /* CAVEAT EMPTOR: We do all our interface matching with an
753          * exclusive hold of global lock at IRQ priority.  We're only
754          * expecting to be dealing with small numbers of interfaces, so the
755          * O(n**3)-ness shouldn't matter */
756
757         /* Also note that I'm not going to return more than n_peerips
758          * interfaces, even if I have more myself */
759
760         write_lock_bh (global_lock);
761
762         LASSERT (n_peerips <= LNET_MAX_INTERFACES);
763         LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
764
765         /* Only match interfaces for additional connections 
766          * if I have > 1 interface */
767         n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
768                 MIN(n_peerips, net->ksnn_ninterfaces);
769
770         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
771                 /*              ^ yes really... */
772
773                 /* If we have any new interfaces, first tick off all the
774                  * peer IPs that match old interfaces, then choose new
775                  * interfaces to match the remaining peer IPS.
776                  * We don't forget interfaces we've stopped using; we might
777                  * start using them again... */
778
779                 if (i < peer->ksnp_n_passive_ips) {
780                         /* Old interface. */
781                         ip = peer->ksnp_passive_ips[i];
782                         best_iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
783
784                         /* peer passive ips are kept up to date */
785                         LASSERT(best_iface != NULL);
786                 } else {
787                         /* choose a new interface */
788                         LASSERT (i == peer->ksnp_n_passive_ips);
789
790                         best_iface = NULL;
791                         best_netmatch = 0;
792                         best_npeers = 0;
793
794                         for (j = 0; j < net->ksnn_ninterfaces; j++) {
795                                 iface = &net->ksnn_interfaces[j];
796                                 ip = iface->ksni_ipaddr;
797
798                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
799                                         if (peer->ksnp_passive_ips[k] == ip)
800                                                 break;
801
802                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
803                                         continue;
804
805                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
806                                 xor = (ip ^ peerips[k]);
807                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
808
809                                 if (!(best_iface == NULL ||
810                                       best_netmatch < this_netmatch ||
811                                       (best_netmatch == this_netmatch &&
812                                        best_npeers > iface->ksni_npeers)))
813                                         continue;
814
815                                 best_iface = iface;
816                                 best_netmatch = this_netmatch;
817                                 best_npeers = iface->ksni_npeers;
818                         }
819
820                         best_iface->ksni_npeers++;
821                         ip = best_iface->ksni_ipaddr;
822                         peer->ksnp_passive_ips[i] = ip;
823                         peer->ksnp_n_passive_ips = i+1;
824                 }
825
826                 LASSERT (best_iface != NULL);
827
828                 /* mark the best matching peer IP used */
829                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
830                 peerips[j] = 0;
831         }
832
833         /* Overwrite input peer IP addresses */
834         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
835
836         write_unlock_bh (global_lock);
837
838         return (n_ips);
839 }
840
841 void
842 ksocknal_create_routes(ksock_peer_t *peer, int port,
843                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
844 {
845         ksock_route_t      *newroute = NULL;
846         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
847         lnet_ni_t          *ni = peer->ksnp_ni;
848         ksock_net_t        *net = ni->ni_data;
849         struct list_head   *rtmp;
850         ksock_route_t      *route;
851         ksock_interface_t  *iface;
852         ksock_interface_t  *best_iface;
853         int                 best_netmatch;
854         int                 this_netmatch;
855         int                 best_nroutes;
856         int                 i;
857         int                 j;
858
859         /* CAVEAT EMPTOR: We do all our interface matching with an
860          * exclusive hold of global lock at IRQ priority.  We're only
861          * expecting to be dealing with small numbers of interfaces, so the
862          * O(n**3)-ness here shouldn't matter */
863
864         write_lock_bh (global_lock);
865
866         if (net->ksnn_ninterfaces < 2) {
867                 /* Only create additional connections 
868                  * if I have > 1 interface */
869                 write_unlock_bh (global_lock);
870                 return;
871         }
872         
873         LASSERT (npeer_ipaddrs <= LNET_MAX_INTERFACES);
874
875         for (i = 0; i < npeer_ipaddrs; i++) {
876                 if (newroute != NULL) {
877                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
878                 } else {
879                         write_unlock_bh (global_lock);
880
881                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
882                         if (newroute == NULL)
883                                 return;
884
885                         write_lock_bh (global_lock);
886                 }
887
888                 if (peer->ksnp_closing) {
889                         /* peer got closed under me */
890                         break;
891                 }
892
893                 /* Already got a route? */
894                 route = NULL;
895                 list_for_each(rtmp, &peer->ksnp_routes) {
896                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
897
898                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
899                                 break;
900
901                         route = NULL;
902                 }
903                 if (route != NULL)
904                         continue;
905
906                 best_iface = NULL;
907                 best_nroutes = 0;
908                 best_netmatch = 0;
909
910                 LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
911
912                 /* Select interface to connect from */
913                 for (j = 0; j < net->ksnn_ninterfaces; j++) {
914                         iface = &net->ksnn_interfaces[j];
915
916                         /* Using this interface already? */
917                         list_for_each(rtmp, &peer->ksnp_routes) {
918                                 route = list_entry(rtmp, ksock_route_t, ksnr_list);
919
920                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
921                                         break;
922
923                                 route = NULL;
924                         }
925                         if (route != NULL)
926                                 continue;
927
928                         this_netmatch = (((iface->ksni_ipaddr ^
929                                            newroute->ksnr_ipaddr) &
930                                            iface->ksni_netmask) == 0) ? 1 : 0;
931
932                         if (!(best_iface == NULL ||
933                               best_netmatch < this_netmatch ||
934                               (best_netmatch == this_netmatch &&
935                                best_nroutes > iface->ksni_nroutes)))
936                                 continue;
937
938                         best_iface = iface;
939                         best_netmatch = this_netmatch;
940                         best_nroutes = iface->ksni_nroutes;
941                 }
942
943                 if (best_iface == NULL)
944                         continue;
945
946                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
947                 best_iface->ksni_nroutes++;
948
949                 ksocknal_add_route_locked(peer, newroute);
950                 newroute = NULL;
951         }
952
953         write_unlock_bh (global_lock);
954         if (newroute != NULL)
955                 ksocknal_route_decref(newroute);
956 }
957
958 int
959 ksocknal_accept (lnet_ni_t *ni, cfs_socket_t *sock)
960 {
961         ksock_connreq_t    *cr;
962         int                 rc;
963         __u32               peer_ip;
964         int                 peer_port;
965
966         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
967         LASSERT (rc == 0);                      /* we succeeded before */
968
969         LIBCFS_ALLOC(cr, sizeof(*cr));
970         if (cr == NULL) {
971                 LCONSOLE_ERROR_MSG(0x12f, "Dropping connection request from "
972                                    "%u.%u.%u.%u: memory exhausted\n",
973                                    HIPQUAD(peer_ip));
974                 return -ENOMEM;
975         }
976
977         lnet_ni_addref(ni);
978         cr->ksncr_ni   = ni;
979         cr->ksncr_sock = sock;
980
981         spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
982
983         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
984         cfs_waitq_signal(&ksocknal_data.ksnd_connd_waitq);
985                         
986         spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
987         return 0;
988 }
989
990 int
991 ksocknal_connecting (ksock_peer_t *peer, __u32 ipaddr) 
992 {
993         ksock_route_t   *route;
994         
995         list_for_each_entry (route, &peer->ksnp_routes, ksnr_list) {
996
997                 if (route->ksnr_ipaddr == ipaddr)
998                         return route->ksnr_connecting;
999         }
1000         return 0;
1001 }
1002
1003 int
1004 ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route, 
1005                       cfs_socket_t *sock, int type)
1006 {
1007         rwlock_t          *global_lock = &ksocknal_data.ksnd_global_lock;
1008         CFS_LIST_HEAD     (zombies);
1009         lnet_process_id_t  peerid;
1010         struct list_head  *tmp;
1011         __u64              incarnation;
1012         ksock_conn_t      *conn;
1013         ksock_conn_t      *conn2;
1014         ksock_peer_t      *peer = NULL;
1015         ksock_peer_t      *peer2;
1016         ksock_sched_t     *sched;
1017         ksock_hello_msg_t *hello;
1018         unsigned int       irq;
1019         ksock_tx_t        *tx;
1020         int                rc;
1021         int                active;
1022         char              *warn = NULL;
1023
1024         active = (route != NULL);
1025
1026         LASSERT (active == (type != SOCKLND_CONN_NONE));
1027
1028         irq = ksocknal_lib_sock_irq (sock);
1029
1030         LIBCFS_ALLOC(conn, sizeof(*conn));
1031         if (conn == NULL) {
1032                 rc = -ENOMEM;
1033                 goto failed_0;
1034         }
1035
1036         memset (conn, 0, sizeof (*conn));
1037         conn->ksnc_peer = NULL;
1038         conn->ksnc_route = NULL;
1039         conn->ksnc_sock = sock;
1040         atomic_set (&conn->ksnc_sock_refcount, 1); /* 1 ref for conn */
1041         conn->ksnc_type = type;
1042         ksocknal_lib_save_callback(sock, conn);
1043         atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
1044
1045         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(sock);
1046         conn->ksnc_rx_ready = 0;
1047         conn->ksnc_rx_scheduled = 0;
1048
1049         CFS_INIT_LIST_HEAD (&conn->ksnc_tx_queue);
1050         conn->ksnc_tx_ready = 0;
1051         conn->ksnc_tx_scheduled = 0;
1052         conn->ksnc_tx_mono = NULL;
1053         atomic_set (&conn->ksnc_tx_nob, 0);
1054
1055         LIBCFS_ALLOC(hello, offsetof(ksock_hello_msg_t,
1056                                      kshm_ips[LNET_MAX_INTERFACES]));
1057         if (hello == NULL) {
1058                 rc = -ENOMEM;
1059                 goto failed_1;
1060         }
1061
1062         /* stash conn's local and remote addrs */
1063         rc = ksocknal_lib_get_conn_addrs (conn);
1064         if (rc != 0)
1065                 goto failed_1;
1066
1067         /* Find out/confirm peer's NID and connection type and get the
1068          * vector of interfaces she's willing to let me connect to.
1069          * Passive connections use the listener timeout since the peer sends
1070          * eagerly */
1071
1072         if (active) {
1073                 peer = route->ksnr_peer;
1074                 LASSERT(ni == peer->ksnp_ni);
1075
1076                 /* Active connection sends HELLO eagerly */
1077                 hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
1078                 peerid = peer->ksnp_id;
1079
1080                 write_lock_bh(global_lock);
1081                 conn->ksnc_proto = peer->ksnp_proto;
1082                 write_unlock_bh(global_lock);
1083                 
1084                 if (conn->ksnc_proto == NULL) {
1085                         conn->ksnc_proto = &ksocknal_protocol_v2x;
1086 #if SOCKNAL_VERSION_DEBUG
1087                         if (*ksocknal_tunables.ksnd_protocol != 2)
1088                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
1089 #endif
1090                 }
1091
1092                 rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
1093                 if (rc != 0)
1094                         goto failed_1;
1095         } else {
1096                 peerid.nid = LNET_NID_ANY;
1097                 peerid.pid = LNET_PID_ANY;
1098
1099                 /* Passive, get protocol from peer */
1100                 conn->ksnc_proto = NULL;
1101         }
1102
1103         rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
1104         if (rc < 0)
1105                 goto failed_1;
1106
1107         LASSERT (rc == 0 || active);
1108         LASSERT (conn->ksnc_proto != NULL);
1109         LASSERT (peerid.nid != LNET_NID_ANY);
1110
1111         if (active) {
1112                 ksocknal_peer_addref(peer);
1113                 write_lock_bh (global_lock);
1114         } else {
1115                 rc = ksocknal_create_peer(&peer, ni, peerid);
1116                 if (rc != 0)
1117                         goto failed_1;
1118
1119                 write_lock_bh (global_lock);
1120
1121                 /* called with a ref on ni, so shutdown can't have started */
1122                 LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);
1123
1124                 peer2 = ksocknal_find_peer_locked(ni, peerid);
1125                 if (peer2 == NULL) {
1126                         /* NB this puts an "empty" peer in the peer
1127                          * table (which takes my ref) */
1128                         list_add_tail(&peer->ksnp_list,
1129                                       ksocknal_nid2peerlist(peerid.nid));
1130                 } else {
1131                         ksocknal_peer_decref(peer);
1132                         peer = peer2;
1133                 }
1134
1135                 /* +1 ref for me */
1136                 ksocknal_peer_addref(peer);
1137                 peer->ksnp_accepting++;
1138                 
1139                 /* Am I already connecting to this guy?  Resolve in
1140                  * favour of higher NID... */
1141                 if (peerid.nid < ni->ni_nid &&
1142                     ksocknal_connecting(peer, conn->ksnc_ipaddr)) {
1143                         rc = EALREADY;
1144                         warn = "connection race resolution";
1145                         goto failed_2;
1146                 }
1147         }
1148
1149         if (peer->ksnp_closing ||
1150             (active && route->ksnr_deleted)) {
1151                 /* peer/route got closed under me */
1152                 rc = -ESTALE;
1153                 warn = "peer/route removed";
1154                 goto failed_2;
1155         }
1156
1157         if (peer->ksnp_proto == NULL) { 
1158                 /* Never connected before.
1159                  * NB recv_hello may have returned EPROTO to signal my peer
1160                  * wants a different protocol than the one I asked for.
1161                  */
1162                 LASSERT (list_empty(&peer->ksnp_conns));
1163                 
1164                 peer->ksnp_proto = conn->ksnc_proto;
1165                 peer->ksnp_incarnation = incarnation;
1166         }
1167
1168         if (peer->ksnp_proto != conn->ksnc_proto ||
1169             peer->ksnp_incarnation != incarnation) {
1170                 /* Peer rebooted or I've got the wrong protocol version */
1171                 ksocknal_close_peer_conns_locked(peer, 0, 0);
1172
1173                 peer->ksnp_proto = NULL;
1174                 rc = ESTALE;
1175                 warn = peer->ksnp_incarnation != incarnation ? 
1176                        "peer rebooted" :
1177                        "wrong proto version";
1178                 goto failed_2;
1179         }
1180
1181         switch (rc) {
1182         default:
1183                 LBUG();
1184         case 0:
1185                 break;
1186         case EALREADY:
1187                 warn = "lost conn race";
1188                 goto failed_2;
1189         case EPROTO:
1190                 warn = "retry with different protocol version";
1191                 goto failed_2;
1192         }
1193
1194         /* Refuse to duplicate an existing connection, unless this is a
1195          * loopback connection */
1196         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1197                 list_for_each(tmp, &peer->ksnp_conns) {
1198                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1199
1200                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1201                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1202                             conn2->ksnc_type != conn->ksnc_type)
1203                                 continue;
1204
1205                         /* Reply on a passive connection attempt so the peer
1206                          * realises we're connected. */
1207                         LASSERT (rc == 0);
1208                         if (!active)
1209                                 rc = EALREADY;
1210
1211                         warn = "duplicate";
1212                         goto failed_2;
1213                 }
1214         }
1215
1216         /* If the connection created by this route didn't bind to the IP
1217          * address the route connected to, the connection/route matching
1218          * code below probably isn't going to work. */
1219         if (active &&
1220             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1221                 CERROR("Route %s %u.%u.%u.%u connected to %u.%u.%u.%u\n",
1222                        libcfs_id2str(peer->ksnp_id),
1223                        HIPQUAD(route->ksnr_ipaddr),
1224                        HIPQUAD(conn->ksnc_ipaddr));
1225         }
1226
1227         /* Search for a route corresponding to the new connection and
1228          * create an association.  This allows incoming connections created
1229          * by routes in my peer to match my own route entries so I don't
1230          * continually create duplicate routes. */
1231         list_for_each (tmp, &peer->ksnp_routes) {
1232                 route = list_entry(tmp, ksock_route_t, ksnr_list);
1233
1234                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1235                         continue;
1236
1237                 ksocknal_associate_route_conn_locked(route, conn);
1238                 break;
1239         }
1240
1241         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
1242         peer->ksnp_last_alive = cfs_time_current();
1243         peer->ksnp_error = 0;
1244
1245         sched = ksocknal_choose_scheduler_locked (irq);
1246         sched->kss_nconns++;
1247         conn->ksnc_scheduler = sched;
1248
1249         /* Set the deadline for the outgoing HELLO to drain */
1250         conn->ksnc_tx_bufnob = SOCK_WMEM_QUEUED(sock);
1251         conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
1252         mb();       /* order with adding to peer's conn list */
1253
1254         list_add (&conn->ksnc_list, &peer->ksnp_conns);
1255         ksocknal_conn_addref(conn);
1256
1257         ksocknal_new_packet(conn, 0);
1258
1259         /* Take all the packets blocking for a connection.
1260          * NB, it might be nicer to share these blocked packets among any
1261          * other connections that are becoming established. */
1262         while (!list_empty (&peer->ksnp_tx_queue)) {
1263                 tx = list_entry (peer->ksnp_tx_queue.next,
1264                                  ksock_tx_t, tx_list);
1265
1266                 list_del (&tx->tx_list);
1267                 ksocknal_queue_tx_locked (tx, conn);
1268         }
1269
1270         write_unlock_bh (global_lock);
1271
1272         /* We've now got a new connection.  Any errors from here on are just
1273          * like "normal" comms errors and we close the connection normally.
1274          * NB (a) we still have to send the reply HELLO for passive
1275          *        connections, 
1276          *    (b) normal I/O on the conn is blocked until I setup and call the
1277          *        socket callbacks.
1278          */
1279
1280         ksocknal_lib_bind_irq (irq);
1281
1282         CDEBUG(D_NET, "New conn %s p %d.x %u.%u.%u.%u -> %u.%u.%u.%u/%d"
1283                " incarnation:"LPD64" sched[%d]/%d\n",
1284                libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
1285                HIPQUAD(conn->ksnc_myipaddr), HIPQUAD(conn->ksnc_ipaddr),
1286                conn->ksnc_port, incarnation,
1287                (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
1288
1289         if (active) {
1290                 /* additional routes after interface exchange? */
1291                 ksocknal_create_routes(peer, conn->ksnc_port,
1292                                        hello->kshm_ips, hello->kshm_nips);
1293         } else {
1294                 hello->kshm_nips = ksocknal_select_ips(peer, hello->kshm_ips,
1295                                                        hello->kshm_nips);
1296                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1297         }
1298         
1299         LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1300                                     kshm_ips[LNET_MAX_INTERFACES]));
1301
1302         /* setup the socket AFTER I've received hello (it disables
1303          * SO_LINGER).  I might call back to the acceptor who may want
1304          * to send a protocol version response and then close the
1305          * socket; this ensures the socket only tears down after the
1306          * response has been sent. */
1307         if (rc == 0)
1308                 rc = ksocknal_lib_setup_sock(sock);
1309
1310         write_lock_bh(global_lock);
1311
1312         /* NB my callbacks block while I hold ksnd_global_lock */
1313         ksocknal_lib_set_callback(sock, conn);
1314
1315         if (!active)
1316                 peer->ksnp_accepting--;
1317
1318         write_unlock_bh(global_lock);
1319
1320         if (rc != 0) {
1321                 write_lock_bh(global_lock);
1322                 ksocknal_close_conn_locked(conn, rc);
1323                 write_unlock_bh(global_lock);
1324         } else if (ksocknal_connsock_addref(conn) == 0) {
1325                 /* Allow I/O to proceed. */
1326                 ksocknal_read_callback(conn);
1327                 ksocknal_write_callback(conn);
1328                 ksocknal_connsock_decref(conn);
1329         }
1330
1331         ksocknal_conn_decref(conn);
1332         return rc;
1333
1334  failed_2:
1335         if (!peer->ksnp_closing &&
1336             list_empty (&peer->ksnp_conns) &&
1337             list_empty (&peer->ksnp_routes)) {
1338                 list_add(&zombies, &peer->ksnp_tx_queue);
1339                 list_del_init(&peer->ksnp_tx_queue);
1340                 ksocknal_unlink_peer_locked(peer);
1341         }
1342         
1343         write_unlock_bh (global_lock);
1344
1345         if (warn != NULL) {
1346                 if (rc < 0)
1347                         CERROR("Not creating conn %s type %d: %s\n",
1348                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1349                 else
1350                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1351                               libcfs_id2str(peerid), conn->ksnc_type, warn);
1352         }
1353
1354         if (!active) {
1355                 if (rc > 0) {
1356                         /* Request retry by replying with CONN_NONE 
1357                          * ksnc_proto has been set already */
1358                         conn->ksnc_type = SOCKLND_CONN_NONE;
1359                         hello->kshm_nips = 0;
1360                         ksocknal_send_hello(ni, conn, peerid.nid, hello);
1361                 }
1362
1363                 write_lock_bh(global_lock);
1364                 peer->ksnp_accepting--;
1365                 write_unlock_bh(global_lock);
1366         }
1367         
1368         ksocknal_txlist_done(ni, &zombies, 1);
1369         ksocknal_peer_decref(peer);
1370
1371  failed_1:
1372         if (hello != NULL)
1373                 LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1374                                             kshm_ips[LNET_MAX_INTERFACES]));
1375
1376         LIBCFS_FREE (conn, sizeof(*conn));
1377
1378  failed_0:
1379         libcfs_sock_release(sock);
1380         return rc;
1381 }
1382
1383 void
1384 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
1385 {
1386         /* This just does the immmediate housekeeping, and queues the
1387          * connection for the reaper to terminate.
1388          * Caller holds ksnd_global_lock exclusively in irq context */
1389         ksock_peer_t      *peer = conn->ksnc_peer;
1390         ksock_route_t     *route;
1391         ksock_conn_t      *conn2;
1392         struct list_head  *tmp;
1393
1394         LASSERT (peer->ksnp_error == 0);
1395         LASSERT (!conn->ksnc_closing);
1396         conn->ksnc_closing = 1;
1397
1398         /* ksnd_deathrow_conns takes over peer's ref */
1399         list_del (&conn->ksnc_list);
1400
1401         route = conn->ksnc_route;
1402         if (route != NULL) {
1403                 /* dissociate conn from route... */
1404                 LASSERT (!route->ksnr_deleted);
1405                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
1406
1407                 conn2 = NULL;
1408                 list_for_each(tmp, &peer->ksnp_conns) {
1409                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1410
1411                         if (conn2->ksnc_route == route &&
1412                             conn2->ksnc_type == conn->ksnc_type)
1413                                 break;
1414
1415                         conn2 = NULL;
1416                 }
1417                 if (conn2 == NULL)
1418                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1419
1420                 conn->ksnc_route = NULL;
1421
1422 #if 0           /* irrelevent with only eager routes */
1423                 list_del (&route->ksnr_list);   /* make route least favourite */
1424                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
1425 #endif
1426                 ksocknal_route_decref(route);     /* drop conn's ref on route */
1427         }
1428
1429         if (list_empty (&peer->ksnp_conns)) {
1430                 /* No more connections to this peer */
1431
1432                 peer->ksnp_proto = NULL;        /* renegotiate protocol version */
1433                 peer->ksnp_error = error;       /* stash last conn close reason */
1434
1435                 if (list_empty (&peer->ksnp_routes)) {
1436                         /* I've just closed last conn belonging to a
1437                          * peer with no routes to it */
1438                         ksocknal_unlink_peer_locked (peer);
1439                 }
1440         }
1441
1442         spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
1443
1444         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1445         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1446
1447         spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
1448 }
1449
1450 void
1451 ksocknal_peer_failed (ksock_peer_t *peer)
1452 {
1453         time_t    last_alive = 0;
1454         int       notify = 0;
1455
1456         /* There has been a connection failure or comms error; but I'll only
1457          * tell LNET I think the peer is dead if it's to another kernel and
1458          * there are no connections or connection attempts in existance. */
1459         
1460         read_lock (&ksocknal_data.ksnd_global_lock);
1461
1462         if ((peer->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1463             list_empty(&peer->ksnp_conns) &&
1464             peer->ksnp_accepting == 0 &&
1465             ksocknal_find_connecting_route_locked(peer) == NULL) {
1466                 notify = 1;
1467                 last_alive = cfs_time_seconds(peer->ksnp_last_alive);
1468         }
1469         
1470         read_unlock (&ksocknal_data.ksnd_global_lock);
1471
1472         if (notify)
1473                 lnet_notify (peer->ksnp_ni, peer->ksnp_id.nid, 0,
1474                              last_alive);
1475 }
1476
1477 void
1478 ksocknal_terminate_conn (ksock_conn_t *conn)
1479 {
1480         /* This gets called by the reaper (guaranteed thread context) to
1481          * disengage the socket from its callbacks and close it.
1482          * ksnc_refcount will eventually hit zero, and then the reaper will
1483          * destroy it. */
1484         ksock_peer_t     *peer = conn->ksnc_peer;
1485         ksock_sched_t    *sched = conn->ksnc_scheduler;
1486         int               failed = 0;
1487         struct list_head *tmp;
1488         struct list_head *nxt;
1489         ksock_tx_t       *tx;
1490         LIST_HEAD        (zlist);
1491
1492         LASSERT(conn->ksnc_closing);
1493
1494         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1495         spin_lock_bh (&sched->kss_lock);
1496
1497         if (!conn->ksnc_tx_scheduled &&
1498             !list_empty(&conn->ksnc_tx_queue)){
1499                 list_add_tail (&conn->ksnc_tx_list,
1500                                &sched->kss_tx_conns);
1501                 /* a closing conn is always ready to tx */
1502                 conn->ksnc_tx_ready = 1;
1503                 conn->ksnc_tx_scheduled = 1;
1504                 /* extra ref for scheduler */
1505                 ksocknal_conn_addref(conn);
1506
1507                 cfs_waitq_signal (&sched->kss_waitq);
1508         }
1509
1510         spin_unlock_bh (&sched->kss_lock);
1511
1512         spin_lock(&peer->ksnp_lock);
1513
1514         list_for_each_safe(tmp, nxt, &peer->ksnp_zc_req_list) {
1515                 tx = list_entry(tmp, ksock_tx_t, tx_zc_list);
1516
1517                 if (tx->tx_conn != conn)
1518                         continue;
1519
1520                 LASSERT (tx->tx_msg.ksm_zc_req_cookie != 0);
1521
1522                 tx->tx_msg.ksm_zc_req_cookie = 0;
1523                 list_del(&tx->tx_zc_list);
1524                 list_add(&tx->tx_zc_list, &zlist);
1525         }
1526
1527         spin_unlock(&peer->ksnp_lock);
1528
1529         list_for_each_safe(tmp, nxt, &zlist) {
1530                 tx = list_entry(tmp, ksock_tx_t, tx_zc_list);
1531
1532                 list_del(&tx->tx_zc_list);
1533                 ksocknal_tx_decref(tx);
1534         }
1535
1536         /* serialise with callbacks */
1537         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1538
1539         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1540
1541         /* OK, so this conn may not be completely disengaged from its
1542          * scheduler yet, but it _has_ committed to terminate... */
1543         conn->ksnc_scheduler->kss_nconns--;
1544
1545         if (peer->ksnp_error != 0) {
1546                 /* peer's last conn closed in error */
1547                 LASSERT (list_empty (&peer->ksnp_conns));
1548                 failed = 1;
1549                 peer->ksnp_error = 0;     /* avoid multiple notifications */
1550         }
1551
1552         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1553
1554         if (failed)
1555                 ksocknal_peer_failed(peer);
1556
1557         /* The socket is closed on the final put; either here, or in
1558          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1559          * when the connection was established, this will close the socket
1560          * immediately, aborting anything buffered in it. Any hung
1561          * zero-copy transmits will therefore complete in finite time. */
1562         ksocknal_connsock_decref(conn);
1563 }
1564
1565 void
1566 ksocknal_queue_zombie_conn (ksock_conn_t *conn)
1567 {
1568         /* Queue the conn for the reaper to destroy */
1569
1570         LASSERT (atomic_read(&conn->ksnc_conn_refcount) == 0);
1571         spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
1572
1573         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1574         cfs_waitq_signal(&ksocknal_data.ksnd_reaper_waitq);
1575         
1576         spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
1577 }
1578
1579 void
1580 ksocknal_destroy_conn (ksock_conn_t *conn)
1581 {
1582         /* Final coup-de-grace of the reaper */
1583         CDEBUG (D_NET, "connection %p\n", conn);
1584
1585         LASSERT (atomic_read (&conn->ksnc_conn_refcount) == 0);
1586         LASSERT (atomic_read (&conn->ksnc_sock_refcount) == 0);
1587         LASSERT (conn->ksnc_sock == NULL);
1588         LASSERT (conn->ksnc_route == NULL);
1589         LASSERT (!conn->ksnc_tx_scheduled);
1590         LASSERT (!conn->ksnc_rx_scheduled);
1591         LASSERT (list_empty(&conn->ksnc_tx_queue));
1592
1593         /* complete current receive if any */
1594         switch (conn->ksnc_rx_state) {
1595         case SOCKNAL_RX_LNET_PAYLOAD:
1596                 CERROR("Completing partial receive from %s"
1597                        ", ip %d.%d.%d.%d:%d, with error\n",
1598                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
1599                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1600                 lnet_finalize (conn->ksnc_peer->ksnp_ni, 
1601                                conn->ksnc_cookie, -EIO);
1602                 break;
1603         case SOCKNAL_RX_LNET_HEADER:
1604                 if (conn->ksnc_rx_started)
1605                         CERROR("Incomplete receive of lnet header from %s"
1606                                ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
1607                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1608                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1609                                conn->ksnc_proto->pro_version);
1610                 break;
1611         case SOCKNAL_RX_KSM_HEADER:
1612                 if (conn->ksnc_rx_started)
1613                         CERROR("Incomplete receive of ksock message from %s"
1614                                ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
1615                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1616                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1617                                conn->ksnc_proto->pro_version);
1618                 break;
1619         case SOCKNAL_RX_SLOP:
1620                 if (conn->ksnc_rx_started)
1621                         CERROR("Incomplete receive of slops from %s"
1622                                ", ip %d.%d.%d.%d:%d, with error\n",
1623                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1624                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1625                break;
1626         default:
1627                 LBUG ();
1628                 break;
1629         }
1630
1631         ksocknal_peer_decref(conn->ksnc_peer);
1632
1633         LIBCFS_FREE (conn, sizeof (*conn));
1634 }
1635
1636 int
1637 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1638 {
1639         ksock_conn_t       *conn;
1640         struct list_head   *ctmp;
1641         struct list_head   *cnxt;
1642         int                 count = 0;
1643
1644         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1645                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1646
1647                 if (ipaddr == 0 ||
1648                     conn->ksnc_ipaddr == ipaddr) {
1649                         count++;
1650                         ksocknal_close_conn_locked (conn, why);
1651                 }
1652         }
1653
1654         return (count);
1655 }
1656
1657 int
1658 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1659 {
1660         ksock_peer_t     *peer = conn->ksnc_peer;
1661         __u32             ipaddr = conn->ksnc_ipaddr;
1662         int               count;
1663
1664         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1665
1666         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1667
1668         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1669
1670         return (count);
1671 }
1672
1673 int
1674 ksocknal_close_matching_conns (lnet_process_id_t id, __u32 ipaddr)
1675 {
1676         ksock_peer_t       *peer;
1677         struct list_head   *ptmp;
1678         struct list_head   *pnxt;
1679         int                 lo;
1680         int                 hi;
1681         int                 i;
1682         int                 count = 0;
1683
1684         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1685
1686         if (id.nid != LNET_NID_ANY)
1687                 lo = hi = ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers;
1688         else {
1689                 lo = 0;
1690                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1691         }
1692
1693         for (i = lo; i <= hi; i++) {
1694                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1695
1696                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1697
1698                         if (!((id.nid == LNET_NID_ANY || id.nid == peer->ksnp_id.nid) &&
1699                               (id.pid == LNET_PID_ANY || id.pid == peer->ksnp_id.pid)))
1700                                 continue;
1701
1702                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1703                 }
1704         }
1705
1706         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1707
1708         /* wildcards always succeed */
1709         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1710                 return (0);
1711
1712         return (count == 0 ? -ENOENT : 0);
1713 }
1714
1715 void
1716 ksocknal_notify (lnet_ni_t *ni, lnet_nid_t gw_nid, int alive)
1717 {
1718         /* The router is telling me she's been notified of a change in
1719          * gateway state.... */
1720         lnet_process_id_t  id = {.nid = gw_nid, .pid = LNET_PID_ANY};
1721
1722         CDEBUG (D_NET, "gw %s %s\n", libcfs_nid2str(gw_nid), 
1723                 alive ? "up" : "down");
1724
1725         if (!alive) {
1726                 /* If the gateway crashed, close all open connections... */
1727                 ksocknal_close_matching_conns (id, 0);
1728                 return;
1729         }
1730
1731         /* ...otherwise do nothing.  We can only establish new connections
1732          * if we have autroutes, and these connect on demand. */
1733 }
1734
1735 void
1736 ksocknal_push_peer (ksock_peer_t *peer)
1737 {
1738         int               index;
1739         int               i;
1740         struct list_head *tmp;
1741         ksock_conn_t     *conn;
1742
1743         for (index = 0; ; index++) {
1744                 read_lock (&ksocknal_data.ksnd_global_lock);
1745
1746                 i = 0;
1747                 conn = NULL;
1748
1749                 list_for_each (tmp, &peer->ksnp_conns) {
1750                         if (i++ == index) {
1751                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1752                                 ksocknal_conn_addref(conn);
1753                                 break;
1754                         }
1755                 }
1756
1757                 read_unlock (&ksocknal_data.ksnd_global_lock);
1758
1759                 if (conn == NULL)
1760                         break;
1761
1762                 ksocknal_lib_push_conn (conn);
1763                 ksocknal_conn_decref(conn);
1764         }
1765 }
1766
1767 int
1768 ksocknal_push (lnet_ni_t *ni, lnet_process_id_t id)
1769 {
1770         ksock_peer_t      *peer;
1771         struct list_head  *tmp;
1772         int                index;
1773         int                i;
1774         int                j;
1775         int                rc = -ENOENT;
1776
1777         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1778                 for (j = 0; ; j++) {
1779                         read_lock (&ksocknal_data.ksnd_global_lock);
1780
1781                         index = 0;
1782                         peer = NULL;
1783
1784                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1785                                 peer = list_entry(tmp, ksock_peer_t,
1786                                                   ksnp_list);
1787
1788                                 if (!((id.nid == LNET_NID_ANY ||
1789                                        id.nid == peer->ksnp_id.nid) &&
1790                                       (id.pid == LNET_PID_ANY ||
1791                                        id.pid == peer->ksnp_id.pid))) {
1792                                         peer = NULL;
1793                                         continue;
1794                                 }
1795
1796                                 if (index++ == j) {
1797                                         ksocknal_peer_addref(peer);
1798                                         break;
1799                                 }
1800                         }
1801
1802                         read_unlock (&ksocknal_data.ksnd_global_lock);
1803
1804                         if (peer != NULL) {
1805                                 rc = 0;
1806                                 ksocknal_push_peer (peer);
1807                                 ksocknal_peer_decref(peer);
1808                         }
1809                 }
1810
1811         }
1812
1813         return (rc);
1814 }
1815
1816 int
1817 ksocknal_add_interface(lnet_ni_t *ni, __u32 ipaddress, __u32 netmask)
1818 {
1819         ksock_net_t       *net = ni->ni_data;
1820         ksock_interface_t *iface;
1821         int                rc;
1822         int                i;
1823         int                j;
1824         struct list_head  *ptmp;
1825         ksock_peer_t      *peer;
1826         struct list_head  *rtmp;
1827         ksock_route_t     *route;
1828
1829         if (ipaddress == 0 ||
1830             netmask == 0)
1831                 return (-EINVAL);
1832
1833         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1834
1835         iface = ksocknal_ip2iface(ni, ipaddress);
1836         if (iface != NULL) {
1837                 /* silently ignore dups */
1838                 rc = 0;
1839         } else if (net->ksnn_ninterfaces == LNET_MAX_INTERFACES) {
1840                 rc = -ENOSPC;
1841         } else {
1842                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
1843
1844                 iface->ksni_ipaddr = ipaddress;
1845                 iface->ksni_netmask = netmask;
1846                 iface->ksni_nroutes = 0;
1847                 iface->ksni_npeers = 0;
1848
1849                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1850                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1851                                 peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1852
1853                                 for (j = 0; j < peer->ksnp_n_passive_ips; j++)
1854                                         if (peer->ksnp_passive_ips[j] == ipaddress)
1855                                                 iface->ksni_npeers++;
1856
1857                                 list_for_each(rtmp, &peer->ksnp_routes) {
1858                                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1859
1860                                         if (route->ksnr_myipaddr == ipaddress)
1861                                                 iface->ksni_nroutes++;
1862                                 }
1863                         }
1864                 }
1865
1866                 rc = 0;
1867                 /* NB only new connections will pay attention to the new interface! */
1868         }
1869
1870         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1871
1872         return (rc);
1873 }
1874
1875 void
1876 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
1877 {
1878         struct list_head   *tmp;
1879         struct list_head   *nxt;
1880         ksock_route_t      *route;
1881         ksock_conn_t       *conn;
1882         int                 i;
1883         int                 j;
1884
1885         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
1886                 if (peer->ksnp_passive_ips[i] == ipaddr) {
1887                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
1888                                 peer->ksnp_passive_ips[j-1] =
1889                                         peer->ksnp_passive_ips[j];
1890                         peer->ksnp_n_passive_ips--;
1891                         break;
1892                 }
1893
1894         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
1895                 route = list_entry (tmp, ksock_route_t, ksnr_list);
1896
1897                 if (route->ksnr_myipaddr != ipaddr)
1898                         continue;
1899
1900                 if (route->ksnr_share_count != 0) {
1901                         /* Manually created; keep, but unbind */
1902                         route->ksnr_myipaddr = 0;
1903                 } else {
1904                         ksocknal_del_route_locked(route);
1905                 }
1906         }
1907
1908         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
1909                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1910
1911                 if (conn->ksnc_myipaddr == ipaddr)
1912                         ksocknal_close_conn_locked (conn, 0);
1913         }
1914 }
1915
1916 int
1917 ksocknal_del_interface(lnet_ni_t *ni, __u32 ipaddress)
1918 {
1919         ksock_net_t       *net = ni->ni_data;
1920         int                rc = -ENOENT;
1921         struct list_head  *tmp;
1922         struct list_head  *nxt;
1923         ksock_peer_t      *peer;
1924         __u32              this_ip;
1925         int                i;
1926         int                j;
1927
1928         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1929
1930         for (i = 0; i < net->ksnn_ninterfaces; i++) {
1931                 this_ip = net->ksnn_interfaces[i].ksni_ipaddr;
1932
1933                 if (!(ipaddress == 0 ||
1934                       ipaddress == this_ip))
1935                         continue;
1936
1937                 rc = 0;
1938
1939                 for (j = i+1; j < net->ksnn_ninterfaces; j++)
1940                         net->ksnn_interfaces[j-1] =
1941                                 net->ksnn_interfaces[j];
1942
1943                 net->ksnn_ninterfaces--;
1944
1945                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
1946                         list_for_each_safe(tmp, nxt, &ksocknal_data.ksnd_peers[j]) {
1947                                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
1948
1949                                 if (peer->ksnp_ni != ni)
1950                                         continue;
1951
1952                                 ksocknal_peer_del_interface_locked(peer, this_ip);
1953                         }
1954                 }
1955         }
1956
1957         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1958
1959         return (rc);
1960 }
1961
1962 int
1963 ksocknal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1964 {
1965         struct libcfs_ioctl_data *data = arg;
1966         int rc;
1967
1968         switch(cmd) {
1969         case IOC_LIBCFS_GET_INTERFACE: {
1970                 ksock_net_t       *net = ni->ni_data;
1971                 ksock_interface_t *iface;
1972
1973                 read_lock (&ksocknal_data.ksnd_global_lock);
1974
1975                 if (data->ioc_count < 0 ||
1976                     data->ioc_count >= net->ksnn_ninterfaces) {
1977                         rc = -ENOENT;
1978                 } else {
1979                         rc = 0;
1980                         iface = &net->ksnn_interfaces[data->ioc_count];
1981
1982                         data->ioc_u32[0] = iface->ksni_ipaddr;
1983                         data->ioc_u32[1] = iface->ksni_netmask;
1984                         data->ioc_u32[2] = iface->ksni_npeers;
1985                         data->ioc_u32[3] = iface->ksni_nroutes;
1986                 }
1987
1988                 read_unlock (&ksocknal_data.ksnd_global_lock);
1989                 return rc;
1990         }
1991
1992         case IOC_LIBCFS_ADD_INTERFACE:
1993                 return ksocknal_add_interface(ni,
1994                                               data->ioc_u32[0], /* IP address */
1995                                               data->ioc_u32[1]); /* net mask */
1996
1997         case IOC_LIBCFS_DEL_INTERFACE:
1998                 return ksocknal_del_interface(ni, 
1999                                               data->ioc_u32[0]); /* IP address */
2000
2001         case IOC_LIBCFS_GET_PEER: {
2002                 lnet_process_id_t id = {0,};
2003                 __u32            myip = 0;
2004                 __u32            ip = 0;
2005                 int              port = 0;
2006                 int              conn_count = 0;
2007                 int              share_count = 0;
2008
2009                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
2010                                             &id, &myip, &ip, &port,
2011                                             &conn_count,  &share_count);
2012                 if (rc != 0)
2013                         return rc;
2014                         
2015                 data->ioc_nid    = id.nid;
2016                 data->ioc_count  = share_count;
2017                 data->ioc_u32[0] = ip;
2018                 data->ioc_u32[1] = port;
2019                 data->ioc_u32[2] = myip;
2020                 data->ioc_u32[3] = conn_count;
2021                 data->ioc_u32[4] = id.pid;
2022                 return 0;
2023         }
2024
2025         case IOC_LIBCFS_ADD_PEER: {
2026                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2027                                          .pid = LUSTRE_SRV_LNET_PID};
2028                 return ksocknal_add_peer (ni, id,
2029                                           data->ioc_u32[0], /* IP */
2030                                           data->ioc_u32[1]); /* port */
2031         }
2032         case IOC_LIBCFS_DEL_PEER: {
2033                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2034                                          .pid = LNET_PID_ANY};
2035                 return ksocknal_del_peer (ni, id,
2036                                           data->ioc_u32[0]); /* IP */
2037         }
2038         case IOC_LIBCFS_GET_CONN: {
2039                 int           txmem;
2040                 int           rxmem;
2041                 int           nagle;
2042                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (ni, data->ioc_count);
2043
2044                 if (conn == NULL)
2045                         return -ENOENT;
2046
2047                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
2048
2049                 data->ioc_count  = txmem;
2050                 data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
2051                 data->ioc_flags  = nagle;
2052                 data->ioc_u32[0] = conn->ksnc_ipaddr;
2053                 data->ioc_u32[1] = conn->ksnc_port;
2054                 data->ioc_u32[2] = conn->ksnc_myipaddr;
2055                 data->ioc_u32[3] = conn->ksnc_type;
2056                 data->ioc_u32[4] = conn->ksnc_scheduler -
2057                                    ksocknal_data.ksnd_schedulers;
2058                 data->ioc_u32[5] = rxmem;
2059                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
2060                 ksocknal_conn_decref(conn);
2061                 return 0;
2062         }
2063
2064         case IOC_LIBCFS_CLOSE_CONNECTION: {
2065                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2066                                         .pid = LNET_PID_ANY};
2067
2068                 return ksocknal_close_matching_conns (id,
2069                                                       data->ioc_u32[0]);
2070         }
2071         case IOC_LIBCFS_REGISTER_MYNID:
2072                 /* Ignore if this is a noop */
2073                 if (data->ioc_nid == ni->ni_nid)
2074                         return 0;
2075
2076                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
2077                        libcfs_nid2str(data->ioc_nid),
2078                        libcfs_nid2str(ni->ni_nid));
2079                 return -EINVAL;
2080
2081         case IOC_LIBCFS_PUSH_CONNECTION: {
2082                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2083                                         .pid = LNET_PID_ANY};
2084                 
2085                 return ksocknal_push(ni, id);
2086         }
2087         default:
2088                 return -EINVAL;
2089         }
2090         /* not reached */
2091 }
2092
2093 void
2094 ksocknal_free_buffers (void)
2095 {
2096         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
2097
2098         if (ksocknal_data.ksnd_schedulers != NULL)
2099                 LIBCFS_FREE (ksocknal_data.ksnd_schedulers,
2100                              sizeof (ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2101
2102         LIBCFS_FREE (ksocknal_data.ksnd_peers,
2103                      sizeof (struct list_head) *
2104                      ksocknal_data.ksnd_peer_hash_size);
2105
2106         spin_lock(&ksocknal_data.ksnd_tx_lock);
2107
2108         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2109                 struct list_head  zlist;
2110                 ksock_tx_t       *tx;
2111
2112                 list_add(&zlist, &ksocknal_data.ksnd_idle_noop_txs);
2113                 list_del_init(&ksocknal_data.ksnd_idle_noop_txs);
2114                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2115
2116                 while(!list_empty(&zlist)) {
2117                         tx = list_entry(zlist.next, ksock_tx_t, tx_list);
2118                         list_del(&tx->tx_list);
2119                         LIBCFS_FREE(tx, tx->tx_desc_size);
2120                 }
2121         } else {
2122                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2123         }
2124 }
2125
2126 void
2127 ksocknal_base_shutdown (void)
2128 {
2129         ksock_sched_t *sched;
2130         int            i;
2131
2132         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
2133                atomic_read (&libcfs_kmemory));
2134         LASSERT (ksocknal_data.ksnd_nnets == 0);
2135
2136         switch (ksocknal_data.ksnd_init) {
2137         default:
2138                 LASSERT (0);
2139
2140         case SOCKNAL_INIT_ALL:
2141         case SOCKNAL_INIT_DATA:
2142                 LASSERT (ksocknal_data.ksnd_peers != NULL);
2143                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2144                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
2145                 }
2146                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
2147                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
2148                 LASSERT (list_empty (&ksocknal_data.ksnd_connd_connreqs));
2149                 LASSERT (list_empty (&ksocknal_data.ksnd_connd_routes));
2150
2151                 if (ksocknal_data.ksnd_schedulers != NULL)
2152                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2153                                 ksock_sched_t *kss =
2154                                         &ksocknal_data.ksnd_schedulers[i];
2155
2156                                 LASSERT (list_empty (&kss->kss_tx_conns));
2157                                 LASSERT (list_empty (&kss->kss_rx_conns));
2158                                 LASSERT (list_empty (&kss->kss_zombie_noop_txs));
2159                                 LASSERT (kss->kss_nconns == 0);
2160                         }
2161
2162                 /* flag threads to terminate; wake and wait for them to die */
2163                 ksocknal_data.ksnd_shuttingdown = 1;
2164                 cfs_waitq_broadcast (&ksocknal_data.ksnd_connd_waitq);
2165                 cfs_waitq_broadcast (&ksocknal_data.ksnd_reaper_waitq);
2166
2167                 if (ksocknal_data.ksnd_schedulers != NULL)
2168                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2169                                 sched = &ksocknal_data.ksnd_schedulers[i];
2170                                 cfs_waitq_broadcast(&sched->kss_waitq);
2171                         }
2172
2173                 i = 4;
2174                 read_lock (&ksocknal_data.ksnd_global_lock);
2175                 while (ksocknal_data.ksnd_nthreads != 0) {
2176                         i++;
2177                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2178                                "waiting for %d threads to terminate\n",
2179                                 ksocknal_data.ksnd_nthreads);
2180                         read_unlock (&ksocknal_data.ksnd_global_lock);
2181                         cfs_pause(cfs_time_seconds(1));
2182                         read_lock (&ksocknal_data.ksnd_global_lock);
2183                 }
2184                 read_unlock (&ksocknal_data.ksnd_global_lock);
2185
2186                 ksocknal_free_buffers();
2187
2188                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2189                 break;
2190         }
2191
2192         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
2193                atomic_read (&libcfs_kmemory));
2194
2195         PORTAL_MODULE_UNUSE;
2196 }
2197
2198
2199 __u64
2200 ksocknal_new_incarnation (void)
2201 {
2202         struct timeval tv;
2203
2204         /* The incarnation number is the time this module loaded and it
2205          * identifies this particular instance of the socknal.  Hopefully
2206          * we won't be able to reboot more frequently than 1MHz for the
2207          * forseeable future :) */
2208
2209         do_gettimeofday(&tv);
2210
2211         return (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2212 }
2213
2214 int
2215 ksocknal_base_startup (void)
2216 {
2217         int               rc;
2218         int               i;
2219
2220         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2221         LASSERT (ksocknal_data.ksnd_nnets == 0);
2222
2223         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2224
2225         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2226         LIBCFS_ALLOC (ksocknal_data.ksnd_peers,
2227                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
2228         if (ksocknal_data.ksnd_peers == NULL)
2229                 return -ENOMEM;
2230
2231         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2232                 CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2233
2234         rwlock_init(&ksocknal_data.ksnd_global_lock);
2235
2236         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
2237         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
2238         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
2239         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
2240         cfs_waitq_init(&ksocknal_data.ksnd_reaper_waitq);
2241
2242         spin_lock_init (&ksocknal_data.ksnd_connd_lock);
2243         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_connd_connreqs);
2244         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_connd_routes);
2245         cfs_waitq_init(&ksocknal_data.ksnd_connd_waitq);
2246
2247         spin_lock_init (&ksocknal_data.ksnd_tx_lock);
2248         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_idle_noop_txs);
2249
2250         /* NB memset above zeros whole of ksocknal_data, including
2251          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
2252
2253         /* flag lists/ptrs/locks initialised */
2254         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2255         PORTAL_MODULE_USE;
2256
2257         ksocknal_data.ksnd_nschedulers = ksocknal_nsched();
2258         LIBCFS_ALLOC(ksocknal_data.ksnd_schedulers,
2259                      sizeof(ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2260         if (ksocknal_data.ksnd_schedulers == NULL)
2261                 goto failed;
2262
2263         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2264                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
2265
2266                 spin_lock_init (&kss->kss_lock);
2267                 CFS_INIT_LIST_HEAD (&kss->kss_rx_conns);
2268                 CFS_INIT_LIST_HEAD (&kss->kss_tx_conns);
2269                 CFS_INIT_LIST_HEAD (&kss->kss_zombie_noop_txs);
2270                 cfs_waitq_init (&kss->kss_waitq);
2271         }
2272
2273         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2274                 rc = ksocknal_thread_start (ksocknal_scheduler,
2275                                             &ksocknal_data.ksnd_schedulers[i]);
2276                 if (rc != 0) {
2277                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
2278                                i, rc);
2279                         goto failed;
2280                 }
2281         }
2282
2283         /* must have at least 2 connds to remain responsive to accepts while
2284          * connecting */
2285         if (*ksocknal_tunables.ksnd_nconnds < 2)
2286                 *ksocknal_tunables.ksnd_nconnds = 2;
2287         
2288         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2289                 rc = ksocknal_thread_start (ksocknal_connd, (void *)((long)i));
2290                 if (rc != 0) {
2291                         CERROR("Can't spawn socknal connd: %d\n", rc);
2292                         goto failed;
2293                 }
2294         }
2295
2296         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
2297         if (rc != 0) {
2298                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2299                 goto failed;
2300         }
2301
2302         /* flag everything initialised */
2303         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2304
2305         return 0;
2306
2307  failed:
2308         ksocknal_base_shutdown();
2309         return -ENETDOWN;
2310 }
2311
2312 void
2313 ksocknal_debug_peerhash (lnet_ni_t *ni)
2314 {
2315         ksock_peer_t     *peer = NULL;
2316         struct list_head *tmp;
2317         int               i;
2318
2319         read_lock (&ksocknal_data.ksnd_global_lock);
2320
2321         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2322                 list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
2323                         peer = list_entry (tmp, ksock_peer_t, ksnp_list);
2324
2325                         if (peer->ksnp_ni == ni) break;
2326
2327                         peer = NULL;
2328                 }
2329         }
2330
2331         if (peer != NULL) {
2332                 ksock_route_t *route;
2333                 ksock_conn_t  *conn;
2334
2335                 CWARN ("Active peer on shutdown: %s, ref %d, scnt %d, "
2336                        "closing %d, accepting %d, err %d, zcookie "LPU64", "
2337                        "txq %d, zc_req %d\n", libcfs_id2str(peer->ksnp_id),
2338                        atomic_read(&peer->ksnp_refcount),
2339                        peer->ksnp_sharecount, peer->ksnp_closing,
2340                        peer->ksnp_accepting, peer->ksnp_error,
2341                        peer->ksnp_zc_next_cookie,
2342                        !list_empty(&peer->ksnp_tx_queue),
2343                        !list_empty(&peer->ksnp_zc_req_list));
2344
2345                 list_for_each (tmp, &peer->ksnp_routes) {
2346                         route = list_entry(tmp, ksock_route_t, ksnr_list);
2347                         CWARN ("Route: ref %d, schd %d, conn %d, cnted %d, "
2348                                "del %d\n", atomic_read(&route->ksnr_refcount),
2349                                route->ksnr_scheduled, route->ksnr_connecting,
2350                                route->ksnr_connected, route->ksnr_deleted);
2351                 }
2352
2353                 list_for_each (tmp, &peer->ksnp_conns) {
2354                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
2355                         CWARN ("Conn: ref %d, sref %d, t %d, c %d\n",
2356                                atomic_read(&conn->ksnc_conn_refcount),
2357                                atomic_read(&conn->ksnc_sock_refcount),
2358                                conn->ksnc_type, conn->ksnc_closing);
2359                 }
2360         }
2361
2362         read_unlock (&ksocknal_data.ksnd_global_lock);
2363         return;
2364 }
2365
2366 void
2367 ksocknal_shutdown (lnet_ni_t *ni)
2368 {
2369         ksock_net_t      *net = ni->ni_data;
2370         int               i;
2371         lnet_process_id_t  anyid = {.nid = LNET_NID_ANY,
2372                                    .pid = LNET_PID_ANY};
2373
2374         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2375         LASSERT(ksocknal_data.ksnd_nnets > 0);
2376
2377         spin_lock_bh (&net->ksnn_lock);
2378         net->ksnn_shutdown = 1;                 /* prevent new peers */
2379         spin_unlock_bh (&net->ksnn_lock);
2380
2381         /* Delete all peers */
2382         ksocknal_del_peer(ni, anyid, 0);
2383
2384         /* Wait for all peer state to clean up */
2385         i = 2;
2386         spin_lock_bh (&net->ksnn_lock);
2387         while (net->ksnn_npeers != 0) {
2388                 spin_unlock_bh (&net->ksnn_lock);
2389
2390                 i++;
2391                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2392                        "waiting for %d peers to disconnect\n",
2393                        net->ksnn_npeers);
2394                 cfs_pause(cfs_time_seconds(1));
2395
2396                 ksocknal_debug_peerhash(ni);
2397
2398                 spin_lock_bh (&net->ksnn_lock);
2399         }
2400         spin_unlock_bh (&net->ksnn_lock);
2401
2402         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2403                 LASSERT (net->ksnn_interfaces[i].ksni_npeers == 0);
2404                 LASSERT (net->ksnn_interfaces[i].ksni_nroutes == 0);
2405         }
2406
2407         LIBCFS_FREE(net, sizeof(*net));
2408         
2409         ksocknal_data.ksnd_nnets--;
2410         if (ksocknal_data.ksnd_nnets == 0)
2411                 ksocknal_base_shutdown();
2412 }
2413
2414 int
2415 ksocknal_enumerate_interfaces(ksock_net_t *net)
2416 {
2417         char      **names;
2418         int         i;
2419         int         j;
2420         int         rc;
2421         int         n;
2422                 
2423         n = libcfs_ipif_enumerate(&names);
2424         if (n <= 0) {
2425                 CERROR("Can't enumerate interfaces: %d\n", n);
2426                 return n;
2427         }
2428
2429         for (i = j = 0; i < n; i++) {
2430                 int        up;
2431                 __u32      ip;
2432                 __u32      mask;
2433
2434                 if (!strcmp(names[i], "lo")) /* skip the loopback IF */
2435                         continue;
2436
2437                 rc = libcfs_ipif_query(names[i], &up, &ip, &mask);
2438                 if (rc != 0) {
2439                         CWARN("Can't get interface %s info: %d\n",
2440                               names[i], rc);
2441                         continue;
2442                 }
2443                 
2444                 if (!up) {
2445                         CWARN("Ignoring interface %s (down)\n",
2446                               names[i]);
2447                         continue;
2448                 }
2449
2450                 if (j == LNET_MAX_INTERFACES) {
2451                         CWARN("Ignoring interface %s (too many interfaces)\n",
2452                               names[i]);
2453                         continue;
2454                 }
2455
2456                 net->ksnn_interfaces[j].ksni_ipaddr = ip;
2457                 net->ksnn_interfaces[j].ksni_netmask = mask;
2458                 j++;
2459         }
2460
2461         libcfs_ipif_free_enumeration(names, n);
2462         
2463         if (j == 0)
2464                 CERROR("Can't find any usable interfaces\n");
2465         
2466         return j;
2467 }
2468
2469 int
2470 ksocknal_startup (lnet_ni_t *ni)
2471 {
2472         ksock_net_t  *net;
2473         int           rc;
2474         int           i;
2475
2476         LASSERT (ni->ni_lnd == &the_ksocklnd);
2477
2478         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2479                 rc = ksocknal_base_startup();
2480                 if (rc != 0)
2481                         return rc;
2482         }
2483         
2484         LIBCFS_ALLOC(net, sizeof(*net));
2485         if (net == NULL)
2486                 goto fail_0;
2487                 
2488         memset(net, 0, sizeof(*net));
2489         spin_lock_init(&net->ksnn_lock);
2490         net->ksnn_incarnation = ksocknal_new_incarnation();
2491         ni->ni_data = net;
2492         ni->ni_maxtxcredits = *ksocknal_tunables.ksnd_credits;
2493         ni->ni_peertxcredits = *ksocknal_tunables.ksnd_peercredits;
2494         
2495         if (ni->ni_interfaces[0] == NULL) {
2496                 rc = ksocknal_enumerate_interfaces(net);
2497                 if (rc <= 0)
2498                         goto fail_1;
2499
2500                 net->ksnn_ninterfaces = 1;
2501         } else {
2502                 for (i = 0; i < LNET_MAX_INTERFACES; i++) {
2503                         int    up;
2504
2505                         if (ni->ni_interfaces[i] == NULL)
2506                                 break;
2507
2508                         rc = libcfs_ipif_query(
2509                                 ni->ni_interfaces[i], &up,
2510                                 &net->ksnn_interfaces[i].ksni_ipaddr,
2511                                 &net->ksnn_interfaces[i].ksni_netmask);
2512                         
2513                         if (rc != 0) {
2514                                 CERROR("Can't get interface %s info: %d\n",
2515                                        ni->ni_interfaces[i], rc);
2516                                 goto fail_1;
2517                         }
2518                         
2519                         if (!up) {
2520                                 CERROR("Interface %s is down\n",
2521                                        ni->ni_interfaces[i]);
2522                                 goto fail_1;
2523                         }
2524                 }
2525                 net->ksnn_ninterfaces = i;
2526         }
2527
2528         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
2529                                 net->ksnn_interfaces[0].ksni_ipaddr);
2530
2531         ksocknal_data.ksnd_nnets++;
2532
2533         return 0;
2534         
2535  fail_1:
2536         LIBCFS_FREE(net, sizeof(*net));
2537  fail_0:
2538         if (ksocknal_data.ksnd_nnets == 0)
2539                 ksocknal_base_shutdown();
2540
2541         return -ENETDOWN;
2542 }
2543
2544
2545 void __exit
2546 ksocknal_module_fini (void)
2547 {
2548         lnet_unregister_lnd(&the_ksocklnd);
2549         ksocknal_lib_tunables_fini();
2550 }
2551
2552 int __init
2553 ksocknal_module_init (void)
2554 {
2555         int    rc;
2556
2557         /* check ksnr_connected/connecting field large enough */
2558         CLASSERT(SOCKLND_CONN_NTYPES <= 4);
2559         
2560         rc = ksocknal_lib_tunables_init();
2561         if (rc != 0)
2562                 return rc;
2563
2564         lnet_register_lnd(&the_ksocklnd);
2565
2566         return 0;
2567 }
2568
2569 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2570 MODULE_DESCRIPTION("Kernel TCP Socket LND v2.0.0");
2571 MODULE_LICENSE("GPL");
2572
2573 cfs_module(ksocknal, "2.0.0", ksocknal_module_init, ksocknal_module_fini);