Whamcloud - gitweb
* landed unified portals (b_hd_cleanup_merge_singleportals) on HEAD
[fs/lustre-release.git] / lustre / portals / knals / socknal / socknal.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 nal_t                   ksocknal_api;
29 ksock_nal_data_t        ksocknal_data;
30 ptl_handle_ni_t         ksocknal_ni;
31 ksock_tunables_t        ksocknal_tunables;
32
33 kpr_nal_interface_t ksocknal_router_interface = {
34         kprni_nalid:      SOCKNAL,
35         kprni_arg:        &ksocknal_data,
36         kprni_fwd:        ksocknal_fwd_packet,
37         kprni_notify:     ksocknal_notify,
38 };
39
40 #ifdef CONFIG_SYSCTL
41 #define SOCKNAL_SYSCTL  200
42
43 #define SOCKNAL_SYSCTL_TIMEOUT          1
44 #define SOCKNAL_SYSCTL_EAGER_ACK        2
45 #define SOCKNAL_SYSCTL_ZERO_COPY        3
46 #define SOCKNAL_SYSCTL_TYPED            4
47 #define SOCKNAL_SYSCTL_MIN_BULK         5
48 #define SOCKNAL_SYSCTL_BUFFER_SIZE      6
49 #define SOCKNAL_SYSCTL_NAGLE            7
50 #define SOCKNAL_SYSCTL_IRQ_AFFINITY     8
51 #define SOCKNAL_SYSCTL_KEEPALIVE_IDLE   9
52 #define SOCKNAL_SYSCTL_KEEPALIVE_COUNT 10
53 #define SOCKNAL_SYSCTL_KEEPALIVE_INTVL 11
54
55 static ctl_table ksocknal_ctl_table[] = {
56         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
57          &ksocknal_tunables.ksnd_io_timeout, sizeof (int),
58          0644, NULL, &proc_dointvec},
59         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
60          &ksocknal_tunables.ksnd_eager_ack, sizeof (int),
61          0644, NULL, &proc_dointvec},
62 #if SOCKNAL_ZC
63         {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy", 
64          &ksocknal_tunables.ksnd_zc_min_frag, sizeof (int),
65          0644, NULL, &proc_dointvec},
66 #endif
67         {SOCKNAL_SYSCTL_TYPED, "typed", 
68          &ksocknal_tunables.ksnd_typed_conns, sizeof (int),
69          0644, NULL, &proc_dointvec},
70         {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk", 
71          &ksocknal_tunables.ksnd_min_bulk, sizeof (int),
72          0644, NULL, &proc_dointvec},
73         {SOCKNAL_SYSCTL_BUFFER_SIZE, "buffer_size",
74          &ksocknal_tunables.ksnd_buffer_size, sizeof(int),
75          0644, NULL, &proc_dointvec},
76         {SOCKNAL_SYSCTL_NAGLE, "nagle",
77          &ksocknal_tunables.ksnd_nagle, sizeof(int),
78          0644, NULL, &proc_dointvec},
79 #if CPU_AFFINITY
80         {SOCKNAL_SYSCTL_IRQ_AFFINITY, "irq_affinity",
81          &ksocknal_tunables.ksnd_irq_affinity, sizeof(int),
82          0644, NULL, &proc_dointvec},
83 #endif
84         {SOCKNAL_SYSCTL_KEEPALIVE_IDLE, "keepalive_idle",
85          &ksocknal_tunables.ksnd_keepalive_idle, sizeof(int),
86          0644, NULL, &proc_dointvec},
87         {SOCKNAL_SYSCTL_KEEPALIVE_COUNT, "keepalive_count",
88          &ksocknal_tunables.ksnd_keepalive_count, sizeof(int),
89          0644, NULL, &proc_dointvec},
90         {SOCKNAL_SYSCTL_KEEPALIVE_INTVL, "keepalive_intvl",
91          &ksocknal_tunables.ksnd_keepalive_intvl, sizeof(int),
92          0644, NULL, &proc_dointvec},
93         { 0 }
94 };
95
96 static ctl_table ksocknal_top_ctl_table[] = {
97         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
98         { 0 }
99 };
100 #endif
101
102 int
103 ksocknal_set_mynid(ptl_nid_t nid)
104 {
105         lib_ni_t *ni = &ksocknal_lib.libnal_ni;
106
107         /* FIXME: we have to do this because we call lib_init() at module
108          * insertion time, which is before we have 'mynid' available.  lib_init
109          * sets the NAL's nid, which it uses to tell other nodes where packets
110          * are coming from.  This is not a very graceful solution to this
111          * problem. */
112
113         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
114                nid, ni->ni_pid.nid);
115
116         ni->ni_pid.nid = nid;
117         return (0);
118 }
119
120 void
121 ksocknal_bind_irq (unsigned int irq)
122 {
123 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
124         int              bind;
125         int              cpu;
126         unsigned long    flags;
127         char             cmdline[64];
128         ksock_irqinfo_t *info;
129         char            *argv[] = {"/bin/sh",
130                                    "-c",
131                                    cmdline,
132                                    NULL};
133         char            *envp[] = {"HOME=/",
134                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
135                                    NULL};
136
137         LASSERT (irq < NR_IRQS);
138         if (irq == 0)              /* software NIC or affinity disabled */
139                 return;
140
141         info = &ksocknal_data.ksnd_irqinfo[irq];
142
143         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
144
145         LASSERT (info->ksni_valid);
146         bind = !info->ksni_bound;
147         info->ksni_bound = 1;
148
149         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
150
151         if (!bind)                              /* bound already */
152                 return;
153
154         cpu = ksocknal_irqsched2cpu(info->ksni_sched);
155         snprintf (cmdline, sizeof (cmdline),
156                   "echo %d > /proc/irq/%u/smp_affinity", 1 << cpu, irq);
157
158         printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
159                 irq, cpu, cmdline);
160
161         /* FIXME: Find a better method of setting IRQ affinity...
162          */
163
164         USERMODEHELPER(argv[0], argv, envp);
165 #endif
166 }
167
168 ksock_interface_t *
169 ksocknal_ip2iface(__u32 ip)
170 {
171         int                i;
172         ksock_interface_t *iface;
173
174         for (i = 0; i < ksocknal_data.ksnd_ninterfaces; i++) {
175                 LASSERT(i < SOCKNAL_MAX_INTERFACES);
176                 iface = &ksocknal_data.ksnd_interfaces[i];
177                 
178                 if (iface->ksni_ipaddr == ip)
179                         return (iface);
180         }
181         
182         return (NULL);
183 }
184
185 ksock_route_t *
186 ksocknal_create_route (__u32 ipaddr, int port)
187 {
188         ksock_route_t *route;
189
190         PORTAL_ALLOC (route, sizeof (*route));
191         if (route == NULL)
192                 return (NULL);
193
194         atomic_set (&route->ksnr_refcount, 1);
195         route->ksnr_peer = NULL;
196         route->ksnr_timeout = jiffies;
197         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
198         route->ksnr_ipaddr = ipaddr;
199         route->ksnr_port = port;
200         route->ksnr_connecting = 0;
201         route->ksnr_connected = 0;
202         route->ksnr_deleted = 0;
203         route->ksnr_conn_count = 0;
204         route->ksnr_share_count = 0;
205
206         return (route);
207 }
208
209 void
210 ksocknal_destroy_route (ksock_route_t *route)
211 {
212         if (route->ksnr_peer != NULL)
213                 ksocknal_put_peer (route->ksnr_peer);
214
215         PORTAL_FREE (route, sizeof (*route));
216 }
217
218 void
219 ksocknal_put_route (ksock_route_t *route)
220 {
221         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
222                 route, atomic_read (&route->ksnr_refcount));
223
224         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
225         if (!atomic_dec_and_test (&route->ksnr_refcount))
226              return;
227
228         ksocknal_destroy_route (route);
229 }
230
231 ksock_peer_t *
232 ksocknal_create_peer (ptl_nid_t nid)
233 {
234         ksock_peer_t *peer;
235
236         LASSERT (nid != PTL_NID_ANY);
237
238         PORTAL_ALLOC (peer, sizeof (*peer));
239         if (peer == NULL)
240                 return (NULL);
241
242         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
243
244         peer->ksnp_nid = nid;
245         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
246         peer->ksnp_closing = 0;
247         INIT_LIST_HEAD (&peer->ksnp_conns);
248         INIT_LIST_HEAD (&peer->ksnp_routes);
249         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
250
251         atomic_inc (&ksocknal_data.ksnd_npeers);
252         return (peer);
253 }
254
255 void
256 ksocknal_destroy_peer (ksock_peer_t *peer)
257 {
258         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
259
260         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
261         LASSERT (list_empty (&peer->ksnp_conns));
262         LASSERT (list_empty (&peer->ksnp_routes));
263         LASSERT (list_empty (&peer->ksnp_tx_queue));
264
265         PORTAL_FREE (peer, sizeof (*peer));
266
267         /* NB a peer's connections and autoconnect routes keep a reference
268          * on their peer until they are destroyed, so we can be assured
269          * that _all_ state to do with this peer has been cleaned up when
270          * its refcount drops to zero. */
271         atomic_dec (&ksocknal_data.ksnd_npeers);
272 }
273
274 void
275 ksocknal_put_peer (ksock_peer_t *peer)
276 {
277         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
278                 peer, peer->ksnp_nid,
279                 atomic_read (&peer->ksnp_refcount));
280
281         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
282         if (!atomic_dec_and_test (&peer->ksnp_refcount))
283                 return;
284
285         ksocknal_destroy_peer (peer);
286 }
287
288 ksock_peer_t *
289 ksocknal_find_peer_locked (ptl_nid_t nid)
290 {
291         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
292         struct list_head *tmp;
293         ksock_peer_t     *peer;
294
295         list_for_each (tmp, peer_list) {
296
297                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
298
299                 LASSERT (!peer->ksnp_closing);
300
301                 if (peer->ksnp_nid != nid)
302                         continue;
303
304                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
305                        peer, nid, atomic_read (&peer->ksnp_refcount));
306                 return (peer);
307         }
308         return (NULL);
309 }
310
311 ksock_peer_t *
312 ksocknal_get_peer (ptl_nid_t nid)
313 {
314         ksock_peer_t     *peer;
315
316         read_lock (&ksocknal_data.ksnd_global_lock);
317         peer = ksocknal_find_peer_locked (nid);
318         if (peer != NULL)                       /* +1 ref for caller? */
319                 atomic_inc (&peer->ksnp_refcount);
320         read_unlock (&ksocknal_data.ksnd_global_lock);
321
322         return (peer);
323 }
324
325 void
326 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
327 {
328         int                i;
329         __u32              ip;
330
331         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
332                 LASSERT (i < SOCKNAL_MAX_INTERFACES);
333                 ip = peer->ksnp_passive_ips[i];
334
335                 ksocknal_ip2iface(ip)->ksni_npeers--;
336         }
337
338         LASSERT (list_empty(&peer->ksnp_conns));
339         LASSERT (list_empty(&peer->ksnp_routes));
340         LASSERT (!peer->ksnp_closing);
341         peer->ksnp_closing = 1;
342         list_del (&peer->ksnp_list);
343         /* lose peerlist's ref */
344         ksocknal_put_peer (peer);
345 }
346
347 int
348 ksocknal_get_peer_info (int index, ptl_nid_t *nid,
349                         __u32 *myip, __u32 *peer_ip, int *port, 
350                         int *conn_count, int *share_count)
351 {
352         ksock_peer_t      *peer;
353         struct list_head  *ptmp;
354         ksock_route_t     *route;
355         struct list_head  *rtmp;
356         int                i;
357         int                j;
358         int                rc = -ENOENT;
359
360         read_lock (&ksocknal_data.ksnd_global_lock);
361
362         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
363                 
364                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
365                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
366
367                         if (peer->ksnp_n_passive_ips == 0 &&
368                             list_empty(&peer->ksnp_routes)) {
369                                 if (index-- > 0)
370                                         continue;
371                                 
372                                 *nid = peer->ksnp_nid;
373                                 *myip = 0;
374                                 *peer_ip = 0;
375                                 *port = 0;
376                                 *conn_count = 0;
377                                 *share_count = 0;
378                                 rc = 0;
379                                 goto out;
380                         }
381
382                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
383                                 if (index-- > 0)
384                                         continue;
385                                 
386                                 *nid = peer->ksnp_nid;
387                                 *myip = peer->ksnp_passive_ips[j];
388                                 *peer_ip = 0;
389                                 *port = 0;
390                                 *conn_count = 0;
391                                 *share_count = 0;
392                                 rc = 0;
393                                 goto out;
394                         }
395                         
396                         list_for_each (rtmp, &peer->ksnp_routes) {
397                                 if (index-- > 0)
398                                         continue;
399
400                                 route = list_entry(rtmp, ksock_route_t,
401                                                    ksnr_list);
402
403                                 *nid = peer->ksnp_nid;
404                                 *myip = route->ksnr_myipaddr;
405                                 *peer_ip = route->ksnr_ipaddr;
406                                 *port = route->ksnr_port;
407                                 *conn_count = route->ksnr_conn_count;
408                                 *share_count = route->ksnr_share_count;
409                                 rc = 0;
410                                 goto out;
411                         }
412                 }
413         }
414  out:
415         read_unlock (&ksocknal_data.ksnd_global_lock);
416         return (rc);
417 }
418
419 void
420 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
421 {
422         ksock_peer_t      *peer = route->ksnr_peer;
423         int                type = conn->ksnc_type;
424         ksock_interface_t *iface;
425
426         conn->ksnc_route = route;
427         atomic_inc (&route->ksnr_refcount);
428
429         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
430                 if (route->ksnr_myipaddr == 0) {
431                         /* route wasn't bound locally yet (the initial route) */
432                         CWARN("Binding "LPX64" %u.%u.%u.%u to %u.%u.%u.%u\n",
433                               peer->ksnp_nid, 
434                               HIPQUAD(route->ksnr_ipaddr),
435                               HIPQUAD(conn->ksnc_myipaddr));
436                 } else {
437                         CWARN("Rebinding "LPX64" %u.%u.%u.%u from "
438                               "%u.%u.%u.%u to %u.%u.%u.%u\n",
439                               peer->ksnp_nid, 
440                               HIPQUAD(route->ksnr_ipaddr),
441                               HIPQUAD(route->ksnr_myipaddr),
442                               HIPQUAD(conn->ksnc_myipaddr));
443                         
444                         iface = ksocknal_ip2iface(route->ksnr_myipaddr);
445                         if (iface != NULL) 
446                                 iface->ksni_nroutes--;
447                 }
448                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
449                 iface = ksocknal_ip2iface(route->ksnr_myipaddr);
450                 if (iface != NULL) 
451                         iface->ksni_nroutes++;
452         }
453
454         route->ksnr_connected |= (1<<type);
455         route->ksnr_connecting &= ~(1<<type);
456         route->ksnr_conn_count++;
457
458         /* Successful connection => further attempts can
459          * proceed immediately */
460         route->ksnr_timeout = jiffies;
461         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
462 }
463
464 void
465 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
466 {
467         struct list_head  *tmp;
468         ksock_conn_t      *conn;
469         int                type;
470         ksock_route_t     *route2;
471
472         LASSERT (route->ksnr_peer == NULL);
473         LASSERT (route->ksnr_connecting == 0);
474         LASSERT (route->ksnr_connected == 0);
475
476         /* LASSERT(unique) */
477         list_for_each(tmp, &peer->ksnp_routes) {
478                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
479
480                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
481                         CERROR ("Duplicate route "LPX64" %u.%u.%u.%u\n",
482                                 peer->ksnp_nid, HIPQUAD(route->ksnr_ipaddr));
483                         LBUG();
484                 }
485         }
486
487         route->ksnr_peer = peer;
488         atomic_inc (&peer->ksnp_refcount);
489         /* peer's routelist takes over my ref on 'route' */
490         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
491         
492         list_for_each(tmp, &peer->ksnp_conns) {
493                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
494                 type = conn->ksnc_type;
495
496                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
497                         continue;
498
499                 ksocknal_associate_route_conn_locked(route, conn);
500                 /* keep going (typed routes) */
501         }
502 }
503
504 void
505 ksocknal_del_route_locked (ksock_route_t *route)
506 {
507         ksock_peer_t      *peer = route->ksnr_peer;
508         ksock_interface_t *iface;
509         ksock_conn_t      *conn;
510         struct list_head  *ctmp;
511         struct list_head  *cnxt;
512
513         LASSERT (!route->ksnr_deleted);
514
515         /* Close associated conns */
516         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
517                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
518
519                 if (conn->ksnc_route != route)
520                         continue;
521                 
522                 ksocknal_close_conn_locked (conn, 0);
523         }
524
525         if (route->ksnr_myipaddr != 0) {
526                 iface = ksocknal_ip2iface(route->ksnr_myipaddr);
527                 if (iface != NULL)
528                         iface->ksni_nroutes--;
529         }
530
531         route->ksnr_deleted = 1;
532         list_del (&route->ksnr_list);
533         ksocknal_put_route (route);             /* drop peer's ref */
534
535         if (list_empty (&peer->ksnp_routes) &&
536             list_empty (&peer->ksnp_conns)) {
537                 /* I've just removed the last autoconnect route of a peer
538                  * with no active connections */
539                 ksocknal_unlink_peer_locked (peer);
540         }
541 }
542
543 int
544 ksocknal_add_peer (ptl_nid_t nid, __u32 ipaddr, int port)
545 {
546         unsigned long      flags;
547         struct list_head  *tmp;
548         ksock_peer_t      *peer;
549         ksock_peer_t      *peer2;
550         ksock_route_t     *route;
551         ksock_route_t     *route2;
552         
553         if (nid == PTL_NID_ANY)
554                 return (-EINVAL);
555
556         /* Have a brand new peer ready... */
557         peer = ksocknal_create_peer (nid);
558         if (peer == NULL)
559                 return (-ENOMEM);
560
561         route = ksocknal_create_route (ipaddr, port);
562         if (route == NULL) {
563                 ksocknal_put_peer (peer);
564                 return (-ENOMEM);
565         }
566
567         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
568
569         peer2 = ksocknal_find_peer_locked (nid);
570         if (peer2 != NULL) {
571                 ksocknal_put_peer (peer);
572                 peer = peer2;
573         } else {
574                 /* peer table takes my ref on peer */
575                 list_add_tail (&peer->ksnp_list,
576                                ksocknal_nid2peerlist (nid));
577         }
578
579         route2 = NULL;
580         list_for_each (tmp, &peer->ksnp_routes) {
581                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
582                 
583                 if (route2->ksnr_ipaddr == ipaddr)
584                         break;
585                 
586                 route2 = NULL;
587         }
588         if (route2 == NULL) {
589                 ksocknal_add_route_locked(peer, route);
590                 route->ksnr_share_count++;
591         } else {
592                 ksocknal_put_route(route);
593                 route2->ksnr_share_count++;
594         }
595
596         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
597
598         return (0);
599 }
600
601 void
602 ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip, int single_share)
603 {
604         ksock_conn_t     *conn;
605         ksock_route_t    *route;
606         struct list_head *tmp;
607         struct list_head *nxt;
608         int               nshared;
609
610         LASSERT (!peer->ksnp_closing);
611
612         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
613                 route = list_entry(tmp, ksock_route_t, ksnr_list);
614
615                 if (single_share && route->ksnr_share_count == 0)
616                         continue;
617
618                 /* no match */
619                 if (!(ip == 0 || route->ksnr_ipaddr == ip))
620                         continue;
621
622                 if (!single_share)
623                         route->ksnr_share_count = 0;
624                 else if (route->ksnr_share_count > 0)
625                         route->ksnr_share_count--;
626
627                 if (route->ksnr_share_count == 0) {
628                         /* This deletes associated conns too */
629                         ksocknal_del_route_locked (route);
630                 }
631                 
632                 if (single_share)
633                         break;
634         }
635
636         nshared = 0;
637         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
638                 route = list_entry(tmp, ksock_route_t, ksnr_list);
639                 nshared += route->ksnr_share_count;
640         }
641                         
642         if (nshared == 0) {
643                 /* remove everything else if there are no explicit entries
644                  * left */
645
646                 list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
647                         route = list_entry(tmp, ksock_route_t, ksnr_list);
648
649                         /* we should only be removing auto-entries */
650                         LASSERT(route->ksnr_share_count == 0);
651                         ksocknal_del_route_locked (route);
652                 }
653
654                 list_for_each_safe (tmp, nxt, &peer->ksnp_conns) {
655                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
656
657                         ksocknal_close_conn_locked(conn, 0);
658                 }
659         }
660                 
661         /* NB peer unlinks itself when last conn/route is removed */
662 }
663
664 int
665 ksocknal_del_peer (ptl_nid_t nid, __u32 ip, int single_share)
666 {
667         unsigned long      flags;
668         struct list_head  *ptmp;
669         struct list_head  *pnxt;
670         ksock_peer_t      *peer;
671         int                lo;
672         int                hi;
673         int                i;
674         int                rc = -ENOENT;
675
676         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
677
678         if (nid != PTL_NID_ANY)
679                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
680         else {
681                 lo = 0;
682                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
683         }
684
685         for (i = lo; i <= hi; i++) {
686                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
687                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
688
689                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
690                                 continue;
691
692                         ksocknal_del_peer_locked (peer, ip, single_share);
693                         rc = 0;                 /* matched! */
694
695                         if (single_share)
696                                 break;
697                 }
698         }
699
700         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
701
702         return (rc);
703 }
704
705 ksock_conn_t *
706 ksocknal_get_conn_by_idx (int index)
707 {
708         ksock_peer_t      *peer;
709         struct list_head  *ptmp;
710         ksock_conn_t      *conn;
711         struct list_head  *ctmp;
712         int                i;
713
714         read_lock (&ksocknal_data.ksnd_global_lock);
715
716         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
717                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
718                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
719
720                         LASSERT (!peer->ksnp_closing);
721
722                         list_for_each (ctmp, &peer->ksnp_conns) {
723                                 if (index-- > 0)
724                                         continue;
725
726                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
727                                 atomic_inc (&conn->ksnc_refcount);
728                                 read_unlock (&ksocknal_data.ksnd_global_lock);
729                                 return (conn);
730                         }
731                 }
732         }
733
734         read_unlock (&ksocknal_data.ksnd_global_lock);
735         return (NULL);
736 }
737
738 int
739 ksocknal_get_conn_addrs (ksock_conn_t *conn)
740 {
741         struct sockaddr_in sin;
742         int                len = sizeof (sin);
743         int                rc;
744         
745         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
746                                             (struct sockaddr *)&sin, &len, 2);
747         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
748         LASSERT (!conn->ksnc_closing);
749
750         if (rc != 0) {
751                 CERROR ("Error %d getting sock peer IP\n", rc);
752                 return rc;
753         }
754
755         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
756         conn->ksnc_port   = ntohs (sin.sin_port);
757
758         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
759                                             (struct sockaddr *)&sin, &len, 0);
760         if (rc != 0) {
761                 CERROR ("Error %d getting sock local IP\n", rc);
762                 return rc;
763         }
764
765         conn->ksnc_myipaddr = ntohl (sin.sin_addr.s_addr);
766
767         return 0;
768 }
769
770 unsigned int
771 ksocknal_sock_irq (struct socket *sock)
772 {
773         int                irq = 0;
774         struct dst_entry  *dst;
775
776         if (!ksocknal_tunables.ksnd_irq_affinity)
777                 return 0;
778
779         dst = sk_dst_get (sock->sk);
780         if (dst != NULL) {
781                 if (dst->dev != NULL) {
782                         irq = dst->dev->irq;
783                         if (irq >= NR_IRQS) {
784                                 CERROR ("Unexpected IRQ %x\n", irq);
785                                 irq = 0;
786                         }
787                 }
788                 dst_release (dst);
789         }
790         
791         return (irq);
792 }
793
794 ksock_sched_t *
795 ksocknal_choose_scheduler_locked (unsigned int irq)
796 {
797         ksock_sched_t    *sched;
798         ksock_irqinfo_t  *info;
799         int               i;
800
801         LASSERT (irq < NR_IRQS);
802         info = &ksocknal_data.ksnd_irqinfo[irq];
803
804         if (irq != 0 &&                         /* hardware NIC */
805             info->ksni_valid) {                 /* already set up */
806                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
807         }
808
809         /* software NIC (irq == 0) || not associated with a scheduler yet.
810          * Choose the CPU with the fewest connections... */
811         sched = &ksocknal_data.ksnd_schedulers[0];
812         for (i = 1; i < ksocknal_data.ksnd_nschedulers; i++)
813                 if (sched->kss_nconns >
814                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
815                         sched = &ksocknal_data.ksnd_schedulers[i];
816
817         if (irq != 0) {                         /* Hardware NIC */
818                 info->ksni_valid = 1;
819                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
820
821                 /* no overflow... */
822                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
823         }
824
825         return (sched);
826 }
827
828 int
829 ksocknal_local_ipvec (__u32 *ipaddrs)
830 {
831         int                i;
832         int                nip;
833
834         read_lock (&ksocknal_data.ksnd_global_lock);
835
836         nip = ksocknal_data.ksnd_ninterfaces;
837         for (i = 0; i < nip; i++) {
838                 LASSERT (i < SOCKNAL_MAX_INTERFACES);
839
840                 ipaddrs[i] = ksocknal_data.ksnd_interfaces[i].ksni_ipaddr;
841                 LASSERT (ipaddrs[i] != 0);
842         }
843         
844         read_unlock (&ksocknal_data.ksnd_global_lock);
845         return (nip);
846 }
847
848 int
849 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
850 {
851         int   best_netmatch = 0;
852         int   best_xor      = 0;
853         int   best          = -1;
854         int   this_xor;
855         int   this_netmatch;
856         int   i;
857         
858         for (i = 0; i < nips; i++) {
859                 if (ips[i] == 0)
860                         continue;
861
862                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
863                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
864                 
865                 if (!(best < 0 ||
866                       best_netmatch < this_netmatch ||
867                       (best_netmatch == this_netmatch && 
868                        best_xor > this_xor)))
869                         continue;
870                 
871                 best = i;
872                 best_netmatch = this_netmatch;
873                 best_xor = this_xor;
874         }
875         
876         LASSERT (best >= 0);
877         return (best);
878 }
879
880 int
881 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
882 {
883         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
884         unsigned long       flags;
885         ksock_interface_t  *iface;
886         ksock_interface_t  *best_iface;
887         int                 n_ips;
888         int                 i;
889         int                 j;
890         int                 k;
891         __u32               ip;
892         __u32               xor;
893         int                 this_netmatch;
894         int                 best_netmatch;
895         int                 best_npeers;
896
897         /* CAVEAT EMPTOR: We do all our interface matching with an
898          * exclusive hold of global lock at IRQ priority.  We're only
899          * expecting to be dealing with small numbers of interfaces, so the
900          * O(n**3)-ness shouldn't matter */
901
902         /* Also note that I'm not going to return more than n_peerips
903          * interfaces, even if I have more myself */
904         
905         write_lock_irqsave(global_lock, flags);
906
907         LASSERT (n_peerips <= SOCKNAL_MAX_INTERFACES);
908         LASSERT (ksocknal_data.ksnd_ninterfaces <= SOCKNAL_MAX_INTERFACES);
909
910         n_ips = MIN(n_peerips, ksocknal_data.ksnd_ninterfaces);
911
912         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
913                 /*              ^ yes really... */
914
915                 /* If we have any new interfaces, first tick off all the
916                  * peer IPs that match old interfaces, then choose new
917                  * interfaces to match the remaining peer IPS. 
918                  * We don't forget interfaces we've stopped using; we might
919                  * start using them again... */
920                 
921                 if (i < peer->ksnp_n_passive_ips) {
922                         /* Old interface. */
923                         ip = peer->ksnp_passive_ips[i];
924                         best_iface = ksocknal_ip2iface(ip);
925
926                         /* peer passive ips are kept up to date */
927                         LASSERT(best_iface != NULL);
928                 } else {
929                         /* choose a new interface */
930                         LASSERT (i == peer->ksnp_n_passive_ips);
931
932                         best_iface = NULL;
933                         best_netmatch = 0;
934                         best_npeers = 0;
935                         
936                         for (j = 0; j < ksocknal_data.ksnd_ninterfaces; j++) {
937                                 iface = &ksocknal_data.ksnd_interfaces[j];
938                                 ip = iface->ksni_ipaddr;
939
940                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
941                                         if (peer->ksnp_passive_ips[k] == ip)
942                                                 break;
943                         
944                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
945                                         continue;
946
947                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
948                                 xor = (ip ^ peerips[k]);
949                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
950
951                                 if (!(best_iface == NULL ||
952                                       best_netmatch < this_netmatch ||
953                                       (best_netmatch == this_netmatch &&
954                                        best_npeers > iface->ksni_npeers)))
955                                         continue;
956
957                                 best_iface = iface;
958                                 best_netmatch = this_netmatch;
959                                 best_npeers = iface->ksni_npeers;
960                         }
961
962                         best_iface->ksni_npeers++;
963                         ip = best_iface->ksni_ipaddr;
964                         peer->ksnp_passive_ips[i] = ip;
965                         peer->ksnp_n_passive_ips = i+1;
966                 }
967                 
968                 LASSERT (best_iface != NULL);
969
970                 /* mark the best matching peer IP used */
971                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
972                 peerips[j] = 0;
973         }
974         
975         /* Overwrite input peer IP addresses */
976         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
977         
978         write_unlock_irqrestore(global_lock, flags);
979         
980         return (n_ips);
981 }
982
983 void
984 ksocknal_create_routes(ksock_peer_t *peer, int port, 
985                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
986 {
987         ksock_route_t      *newroute = NULL;
988         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
989         unsigned long       flags;
990         struct list_head   *rtmp;
991         ksock_route_t      *route;
992         ksock_interface_t  *iface;
993         ksock_interface_t  *best_iface;
994         int                 best_netmatch;
995         int                 this_netmatch;
996         int                 best_nroutes;
997         int                 i;
998         int                 j;
999
1000         /* CAVEAT EMPTOR: We do all our interface matching with an
1001          * exclusive hold of global lock at IRQ priority.  We're only
1002          * expecting to be dealing with small numbers of interfaces, so the
1003          * O(n**3)-ness here shouldn't matter */
1004
1005         write_lock_irqsave(global_lock, flags);
1006
1007         LASSERT (npeer_ipaddrs <= SOCKNAL_MAX_INTERFACES);
1008         
1009         for (i = 0; i < npeer_ipaddrs; i++) {
1010                 if (newroute != NULL) {
1011                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
1012                 } else {
1013                         write_unlock_irqrestore(global_lock, flags);
1014
1015                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
1016                         if (newroute == NULL)
1017                                 return;
1018
1019                         write_lock_irqsave(global_lock, flags);
1020                 }
1021                 
1022                 /* Already got a route? */
1023                 route = NULL;
1024                 list_for_each(rtmp, &peer->ksnp_routes) {
1025                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1026
1027                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
1028                                 break;
1029                         
1030                         route = NULL;
1031                 }
1032                 if (route != NULL)
1033                         continue;
1034
1035                 best_iface = NULL;
1036                 best_nroutes = 0;
1037                 best_netmatch = 0;
1038
1039                 LASSERT (ksocknal_data.ksnd_ninterfaces <= SOCKNAL_MAX_INTERFACES);
1040
1041                 /* Select interface to connect from */
1042                 for (j = 0; j < ksocknal_data.ksnd_ninterfaces; j++) {
1043                         iface = &ksocknal_data.ksnd_interfaces[j];
1044
1045                         /* Using this interface already? */
1046                         list_for_each(rtmp, &peer->ksnp_routes) {
1047                                 route = list_entry(rtmp, ksock_route_t, ksnr_list);
1048
1049                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
1050                                         break;
1051
1052                                 route = NULL;
1053                         }
1054                         if (route != NULL)
1055                                 continue;
1056
1057                         this_netmatch = (((iface->ksni_ipaddr ^ 
1058                                            newroute->ksnr_ipaddr) & 
1059                                            iface->ksni_netmask) == 0) ? 1 : 0;
1060                         
1061                         if (!(best_iface == NULL ||
1062                               best_netmatch < this_netmatch ||
1063                               (best_netmatch == this_netmatch &&
1064                                best_nroutes > iface->ksni_nroutes)))
1065                                 continue;
1066                         
1067                         best_iface = iface;
1068                         best_netmatch = this_netmatch;
1069                         best_nroutes = iface->ksni_nroutes;
1070                 }
1071                 
1072                 if (best_iface == NULL)
1073                         continue;
1074
1075                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
1076                 best_iface->ksni_nroutes++;
1077
1078                 ksocknal_add_route_locked(peer, newroute);
1079                 newroute = NULL;
1080         }
1081         
1082         write_unlock_irqrestore(global_lock, flags);
1083         if (newroute != NULL)
1084                 ksocknal_put_route(newroute);
1085 }
1086
1087 int
1088 ksocknal_create_conn (ksock_route_t *route, struct socket *sock, int type)
1089 {
1090         int                passive = (type == SOCKNAL_CONN_NONE);
1091         rwlock_t          *global_lock = &ksocknal_data.ksnd_global_lock;
1092         __u32              ipaddrs[SOCKNAL_MAX_INTERFACES];
1093         int                nipaddrs;
1094         ptl_nid_t          nid;
1095         struct list_head  *tmp;
1096         __u64              incarnation;
1097         unsigned long      flags;
1098         ksock_conn_t      *conn;
1099         ksock_conn_t      *conn2;
1100         ksock_peer_t      *peer = NULL;
1101         ksock_peer_t      *peer2;
1102         ksock_sched_t     *sched;
1103         unsigned int       irq;
1104         ksock_tx_t        *tx;
1105         int                rc;
1106
1107         /* NB, sock has an associated file since (a) this connection might
1108          * have been created in userland and (b) we need to refcount the
1109          * socket so that we don't close it while I/O is being done on
1110          * it, and sock->file has that pre-cooked... */
1111         LASSERT (sock->file != NULL);
1112         LASSERT (file_count(sock->file) > 0);
1113         LASSERT (route == NULL || !passive);
1114
1115         rc = ksocknal_setup_sock (sock);
1116         if (rc != 0)
1117                 return (rc);
1118
1119         irq = ksocknal_sock_irq (sock);
1120
1121         PORTAL_ALLOC(conn, sizeof(*conn));
1122         if (conn == NULL)
1123                 return (-ENOMEM);
1124
1125         memset (conn, 0, sizeof (*conn));
1126         conn->ksnc_peer = NULL;
1127         conn->ksnc_route = NULL;
1128         conn->ksnc_sock = sock;
1129         conn->ksnc_type = type;
1130         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
1131         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
1132         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
1133
1134         conn->ksnc_rx_ready = 0;
1135         conn->ksnc_rx_scheduled = 0;
1136         ksocknal_new_packet (conn, 0);
1137
1138         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
1139         conn->ksnc_tx_ready = 0;
1140         conn->ksnc_tx_scheduled = 0;
1141         atomic_set (&conn->ksnc_tx_nob, 0);
1142
1143         /* stash conn's local and remote addrs */
1144         rc = ksocknal_get_conn_addrs (conn);
1145         if (rc != 0)
1146                 goto failed_0;
1147
1148         if (!passive) {
1149                 /* Active connection sends HELLO eagerly */
1150                 rc = ksocknal_local_ipvec(ipaddrs);
1151                 if (rc < 0)
1152                         goto failed_0;
1153                 nipaddrs = rc;
1154
1155                 rc = ksocknal_send_hello (conn, ipaddrs, nipaddrs);
1156                 if (rc != 0)
1157                         goto failed_0;
1158         }
1159
1160         /* Find out/confirm peer's NID and connection type and get the
1161          * vector of interfaces she's willing to let me connect to */
1162         nid = (route == NULL) ? PTL_NID_ANY : route->ksnr_peer->ksnp_nid;
1163         rc = ksocknal_recv_hello (conn, &nid, &incarnation, ipaddrs);
1164         if (rc < 0)
1165                 goto failed_0;
1166         nipaddrs = rc;
1167         LASSERT (nid != PTL_NID_ANY);
1168
1169         if (route != NULL) {
1170                 peer = route->ksnr_peer;
1171                 atomic_inc(&peer->ksnp_refcount);
1172         } else {
1173                 peer = ksocknal_create_peer(nid);
1174                 if (peer == NULL) {
1175                         rc = -ENOMEM;
1176                         goto failed_0;
1177                 }
1178
1179                 write_lock_irqsave(global_lock, flags);
1180
1181                 peer2 = ksocknal_find_peer_locked(nid);
1182                 if (peer2 == NULL) {
1183                         /* NB this puts an "empty" peer in the peer
1184                          * table (which takes my ref) */
1185                         list_add_tail(&peer->ksnp_list,
1186                                       ksocknal_nid2peerlist(nid));
1187                 } else  {
1188                         ksocknal_put_peer(peer);
1189                         peer = peer2;
1190                 }
1191                 /* +1 ref for me */
1192                 atomic_inc(&peer->ksnp_refcount);
1193
1194                 write_unlock_irqrestore(global_lock, flags);
1195         }
1196         
1197         if (!passive) {
1198                 ksocknal_create_routes(peer, conn->ksnc_port, 
1199                                        ipaddrs, nipaddrs);
1200                 rc = 0;
1201         } else {
1202                 rc = ksocknal_select_ips(peer, ipaddrs, nipaddrs);
1203                 LASSERT (rc >= 0);
1204                 rc = ksocknal_send_hello (conn, ipaddrs, rc);
1205         }
1206         if (rc < 0)
1207                 goto failed_1;
1208         
1209         write_lock_irqsave (global_lock, flags);
1210
1211         if (peer->ksnp_closing ||
1212             (route != NULL && route->ksnr_deleted)) {
1213                 /* route/peer got closed under me */
1214                 rc = -ESTALE;
1215                 goto failed_2;
1216         }
1217
1218         /* Refuse to duplicate an existing connection (both sides might
1219          * autoconnect at once), unless this is a loopback connection */
1220         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1221                 list_for_each(tmp, &peer->ksnp_conns) {
1222                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1223
1224                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1225                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1226                             conn2->ksnc_type != conn->ksnc_type ||
1227                             conn2->ksnc_incarnation != incarnation)
1228                                 continue;
1229
1230                         CWARN("Not creating duplicate connection to "
1231                               "%u.%u.%u.%u type %d\n",
1232                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_type);
1233                         rc = -EALREADY;
1234                         goto failed_2;
1235                 }
1236         }
1237
1238         /* If the connection created by this route didn't bind to the IP
1239          * address the route connected to, the connection/route matching
1240          * code below probably isn't going to work. */
1241         if (route != NULL &&
1242             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1243                 CERROR("Route "LPX64" %u.%u.%u.%u connected to %u.%u.%u.%u\n",
1244                        peer->ksnp_nid,
1245                        HIPQUAD(route->ksnr_ipaddr),
1246                        HIPQUAD(conn->ksnc_ipaddr));
1247         }
1248
1249         /* Search for a route corresponding to the new connection and
1250          * create an association.  This allows incoming connections created
1251          * by routes in my peer to match my own route entries so I don't
1252          * continually create duplicate routes. */
1253         list_for_each (tmp, &peer->ksnp_routes) {
1254                 route = list_entry(tmp, ksock_route_t, ksnr_list);
1255
1256                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1257                         continue;
1258                 
1259                 ksocknal_associate_route_conn_locked(route, conn);
1260                 break;
1261         }
1262
1263         /* Give conn a ref on sock->file since we're going to return success */
1264         get_file(sock->file);
1265
1266         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
1267         conn->ksnc_incarnation = incarnation;
1268         peer->ksnp_last_alive = jiffies;
1269         peer->ksnp_error = 0;
1270
1271         sched = ksocknal_choose_scheduler_locked (irq);
1272         sched->kss_nconns++;
1273         conn->ksnc_scheduler = sched;
1274
1275         /* Set the deadline for the outgoing HELLO to drain */
1276         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1277         conn->ksnc_tx_deadline = jiffies +
1278                                  ksocknal_tunables.ksnd_io_timeout * HZ;
1279         mb();       /* order with adding to peer's conn list */
1280
1281         list_add (&conn->ksnc_list, &peer->ksnp_conns);
1282         atomic_inc (&conn->ksnc_refcount);
1283
1284         /* NB my callbacks block while I hold ksnd_global_lock */
1285         sock->sk->sk_user_data = conn;
1286         sock->sk->sk_data_ready = ksocknal_data_ready;
1287         sock->sk->sk_write_space = ksocknal_write_space;
1288
1289         /* Take all the packets blocking for a connection.
1290          * NB, it might be nicer to share these blocked packets among any
1291          * other connections that are becoming established. */
1292         while (!list_empty (&peer->ksnp_tx_queue)) {
1293                 tx = list_entry (peer->ksnp_tx_queue.next,
1294                                  ksock_tx_t, tx_list);
1295
1296                 list_del (&tx->tx_list);
1297                 ksocknal_queue_tx_locked (tx, conn);
1298         }
1299
1300         rc = ksocknal_close_stale_conns_locked(peer, incarnation);
1301         if (rc != 0)
1302                 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
1303                         rc, conn->ksnc_peer->ksnp_nid,
1304                         HIPQUAD(conn->ksnc_ipaddr));
1305
1306         write_unlock_irqrestore (global_lock, flags);
1307
1308         ksocknal_bind_irq (irq);
1309
1310         /* Call the callbacks right now to get things going. */
1311         if (ksocknal_getconnsock(conn) == 0) {
1312                 ksocknal_data_ready (sock->sk, 0);
1313                 ksocknal_write_space (sock->sk);
1314                 ksocknal_putconnsock(conn);
1315         }
1316
1317         CWARN("New conn nid:"LPX64" %u.%u.%u.%u -> %u.%u.%u.%u/%d"
1318               " incarnation:"LPX64" sched[%d]/%d\n",
1319               nid, HIPQUAD(conn->ksnc_myipaddr), 
1320               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port, incarnation,
1321               (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
1322
1323         ksocknal_put_conn (conn);
1324         return (0);
1325
1326  failed_2:
1327         if (!peer->ksnp_closing &&
1328             list_empty (&peer->ksnp_conns) &&
1329             list_empty (&peer->ksnp_routes))
1330                 ksocknal_unlink_peer_locked(peer);
1331         write_unlock_irqrestore(global_lock, flags);
1332
1333  failed_1:
1334         ksocknal_put_peer (peer);
1335
1336  failed_0:
1337         PORTAL_FREE (conn, sizeof(*conn));
1338
1339         LASSERT (rc != 0);
1340         return (rc);
1341 }
1342
1343 void
1344 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
1345 {
1346         /* This just does the immmediate housekeeping, and queues the
1347          * connection for the reaper to terminate.
1348          * Caller holds ksnd_global_lock exclusively in irq context */
1349         ksock_peer_t      *peer = conn->ksnc_peer;
1350         ksock_route_t     *route;
1351         ksock_conn_t      *conn2;
1352         struct list_head  *tmp;
1353
1354         LASSERT (peer->ksnp_error == 0);
1355         LASSERT (!conn->ksnc_closing);
1356         conn->ksnc_closing = 1;
1357         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
1358         
1359         /* ksnd_deathrow_conns takes over peer's ref */
1360         list_del (&conn->ksnc_list);
1361
1362         route = conn->ksnc_route;
1363         if (route != NULL) {
1364                 /* dissociate conn from route... */
1365                 LASSERT (!route->ksnr_deleted);
1366                 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
1367                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
1368
1369                 conn2 = NULL;
1370                 list_for_each(tmp, &peer->ksnp_conns) {
1371                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1372                         
1373                         if (conn2->ksnc_route == route &&
1374                             conn2->ksnc_type == conn->ksnc_type)
1375                                 break;
1376                         
1377                         conn2 = NULL;
1378                 }
1379                 if (conn2 == NULL)
1380                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1381
1382                 conn->ksnc_route = NULL;
1383
1384 #if 0           /* irrelevent with only eager routes */
1385                 list_del (&route->ksnr_list);   /* make route least favourite */
1386                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
1387 #endif
1388                 ksocknal_put_route (route);     /* drop conn's ref on route */
1389         }
1390
1391         if (list_empty (&peer->ksnp_conns)) {
1392                 /* No more connections to this peer */
1393
1394                 peer->ksnp_error = error;       /* stash last conn close reason */
1395
1396                 if (list_empty (&peer->ksnp_routes)) {
1397                         /* I've just closed last conn belonging to a
1398                          * non-autoconnecting peer */
1399                         ksocknal_unlink_peer_locked (peer);
1400                 }
1401         }
1402
1403         spin_lock (&ksocknal_data.ksnd_reaper_lock);
1404
1405         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1406         wake_up (&ksocknal_data.ksnd_reaper_waitq);
1407                 
1408         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
1409 }
1410
1411 void
1412 ksocknal_terminate_conn (ksock_conn_t *conn)
1413 {
1414         /* This gets called by the reaper (guaranteed thread context) to
1415          * disengage the socket from its callbacks and close it.
1416          * ksnc_refcount will eventually hit zero, and then the reaper will
1417          * destroy it. */
1418         unsigned long   flags;
1419         ksock_peer_t   *peer = conn->ksnc_peer;
1420         ksock_sched_t  *sched = conn->ksnc_scheduler;
1421         struct timeval  now;
1422         time_t          then = 0;
1423         int             notify = 0;
1424
1425         LASSERT(conn->ksnc_closing);
1426
1427         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1428         spin_lock_irqsave(&sched->kss_lock, flags);
1429
1430         if (!conn->ksnc_tx_scheduled &&
1431             !list_empty(&conn->ksnc_tx_queue)){
1432                 list_add_tail (&conn->ksnc_tx_list,
1433                                &sched->kss_tx_conns);
1434                 /* a closing conn is always ready to tx */
1435                 conn->ksnc_tx_ready = 1;
1436                 conn->ksnc_tx_scheduled = 1;
1437                 /* extra ref for scheduler */
1438                 atomic_inc (&conn->ksnc_refcount);
1439
1440                 wake_up (&sched->kss_waitq);
1441         }
1442
1443         spin_unlock_irqrestore (&sched->kss_lock, flags);
1444
1445         /* serialise with callbacks */
1446         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1447
1448         /* Remove conn's network callbacks.
1449          * NB I _have_ to restore the callback, rather than storing a noop,
1450          * since the socket could survive past this module being unloaded!! */
1451         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
1452         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
1453
1454         /* A callback could be in progress already; they hold a read lock
1455          * on ksnd_global_lock (to serialise with me) and NOOP if
1456          * sk_user_data is NULL. */
1457         conn->ksnc_sock->sk->sk_user_data = NULL;
1458
1459         /* OK, so this conn may not be completely disengaged from its
1460          * scheduler yet, but it _has_ committed to terminate... */
1461         conn->ksnc_scheduler->kss_nconns--;
1462
1463         if (peer->ksnp_error != 0) {
1464                 /* peer's last conn closed in error */
1465                 LASSERT (list_empty (&peer->ksnp_conns));
1466                 
1467                 /* convert peer's last-known-alive timestamp from jiffies */
1468                 do_gettimeofday (&now);
1469                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
1470                 notify = 1;
1471         }
1472         
1473         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1474
1475         /* The socket is closed on the final put; either here, or in
1476          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1477          * when the connection was established, this will close the socket
1478          * immediately, aborting anything buffered in it. Any hung
1479          * zero-copy transmits will therefore complete in finite time. */
1480         ksocknal_putconnsock (conn);
1481
1482         if (notify)
1483                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
1484                             0, then);
1485 }
1486
1487 void
1488 ksocknal_destroy_conn (ksock_conn_t *conn)
1489 {
1490         /* Final coup-de-grace of the reaper */
1491         CDEBUG (D_NET, "connection %p\n", conn);
1492
1493         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
1494         LASSERT (conn->ksnc_route == NULL);
1495         LASSERT (!conn->ksnc_tx_scheduled);
1496         LASSERT (!conn->ksnc_rx_scheduled);
1497         LASSERT (list_empty(&conn->ksnc_tx_queue));
1498
1499         /* complete current receive if any */
1500         switch (conn->ksnc_rx_state) {
1501         case SOCKNAL_RX_BODY:
1502                 CERROR("Completing partial receive from "LPX64
1503                        ", ip %d.%d.%d.%d:%d, with error\n",
1504                        conn->ksnc_peer->ksnp_nid,
1505                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1506                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
1507                 break;
1508         case SOCKNAL_RX_BODY_FWD:
1509                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1510                 break;
1511         case SOCKNAL_RX_HEADER:
1512         case SOCKNAL_RX_SLOP:
1513                 break;
1514         default:
1515                 LBUG ();
1516                 break;
1517         }
1518
1519         ksocknal_put_peer (conn->ksnc_peer);
1520
1521         PORTAL_FREE (conn, sizeof (*conn));
1522         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
1523 }
1524
1525 void
1526 ksocknal_put_conn (ksock_conn_t *conn)
1527 {
1528         unsigned long flags;
1529
1530         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1531                 conn, conn->ksnc_peer->ksnp_nid,
1532                 atomic_read (&conn->ksnc_refcount));
1533
1534         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1535         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1536                 return;
1537
1538         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1539
1540         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1541         wake_up (&ksocknal_data.ksnd_reaper_waitq);
1542
1543         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1544 }
1545
1546 int
1547 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1548 {
1549         ksock_conn_t       *conn;
1550         struct list_head   *ctmp;
1551         struct list_head   *cnxt;
1552         int                 count = 0;
1553
1554         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1555                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1556
1557                 if (ipaddr == 0 ||
1558                     conn->ksnc_ipaddr == ipaddr) {
1559                         count++;
1560                         ksocknal_close_conn_locked (conn, why);
1561                 }
1562         }
1563
1564         return (count);
1565 }
1566
1567 int
1568 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1569 {
1570         ksock_conn_t       *conn;
1571         struct list_head   *ctmp;
1572         struct list_head   *cnxt;
1573         int                 count = 0;
1574
1575         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1576                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1577
1578                 if (conn->ksnc_incarnation == incarnation)
1579                         continue;
1580
1581                 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1582                       "incarnation:"LPX64"("LPX64")\n",
1583                       peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1584                       conn->ksnc_incarnation, incarnation);
1585                 
1586                 count++;
1587                 ksocknal_close_conn_locked (conn, -ESTALE);
1588         }
1589
1590         return (count);
1591 }
1592
1593 int
1594 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why) 
1595 {
1596         ksock_peer_t     *peer = conn->ksnc_peer;
1597         __u32             ipaddr = conn->ksnc_ipaddr;
1598         unsigned long     flags;
1599         int               count;
1600
1601         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1602
1603         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1604         
1605         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1606
1607         return (count);
1608 }
1609
1610 int
1611 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1612 {
1613         unsigned long       flags;
1614         ksock_peer_t       *peer;
1615         struct list_head   *ptmp;
1616         struct list_head   *pnxt;
1617         int                 lo;
1618         int                 hi;
1619         int                 i;
1620         int                 count = 0;
1621
1622         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1623
1624         if (nid != PTL_NID_ANY)
1625                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1626         else {
1627                 lo = 0;
1628                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1629         }
1630
1631         for (i = lo; i <= hi; i++) {
1632                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1633
1634                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1635
1636                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1637                                 continue;
1638
1639                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1640                 }
1641         }
1642
1643         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1644
1645         /* wildcards always succeed */
1646         if (nid == PTL_NID_ANY || ipaddr == 0)
1647                 return (0);
1648         
1649         return (count == 0 ? -ENOENT : 0);
1650 }
1651
1652 void
1653 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1654 {
1655         /* The router is telling me she's been notified of a change in
1656          * gateway state.... */
1657
1658         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1659
1660         if (!alive) {
1661                 /* If the gateway crashed, close all open connections... */
1662                 ksocknal_close_matching_conns (gw_nid, 0);
1663                 return;
1664         }
1665         
1666         /* ...otherwise do nothing.  We can only establish new connections
1667          * if we have autroutes, and these connect on demand. */
1668 }
1669
1670 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1671 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1672 {
1673         return &(sk->tp_pinfo.af_tcp);
1674 }
1675 #else
1676 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1677 {
1678         struct tcp_sock *s = (struct tcp_sock *)sk;
1679         return &s->tcp;
1680 }
1681 #endif
1682
1683 void
1684 ksocknal_push_conn (ksock_conn_t *conn)
1685 {
1686         struct sock    *sk;
1687         struct tcp_opt *tp;
1688         int             nonagle;
1689         int             val = 1;
1690         int             rc;
1691         mm_segment_t    oldmm;
1692
1693         rc = ksocknal_getconnsock (conn);
1694         if (rc != 0)                            /* being shut down */
1695                 return;
1696         
1697         sk = conn->ksnc_sock->sk;
1698         tp = sock2tcp_opt(sk);
1699         
1700         lock_sock (sk);
1701         nonagle = tp->nonagle;
1702         tp->nonagle = 1;
1703         release_sock (sk);
1704
1705         oldmm = get_fs ();
1706         set_fs (KERNEL_DS);
1707
1708         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1709                                       (char *)&val, sizeof (val));
1710         LASSERT (rc == 0);
1711
1712         set_fs (oldmm);
1713
1714         lock_sock (sk);
1715         tp->nonagle = nonagle;
1716         release_sock (sk);
1717
1718         ksocknal_putconnsock (conn);
1719 }
1720
1721 void
1722 ksocknal_push_peer (ksock_peer_t *peer)
1723 {
1724         int               index;
1725         int               i;
1726         struct list_head *tmp;
1727         ksock_conn_t     *conn;
1728
1729         for (index = 0; ; index++) {
1730                 read_lock (&ksocknal_data.ksnd_global_lock);
1731
1732                 i = 0;
1733                 conn = NULL;
1734
1735                 list_for_each (tmp, &peer->ksnp_conns) {
1736                         if (i++ == index) {
1737                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1738                                 atomic_inc (&conn->ksnc_refcount);
1739                                 break;
1740                         }
1741                 }
1742
1743                 read_unlock (&ksocknal_data.ksnd_global_lock);
1744
1745                 if (conn == NULL)
1746                         break;
1747
1748                 ksocknal_push_conn (conn);
1749                 ksocknal_put_conn (conn);
1750         }
1751 }
1752
1753 int
1754 ksocknal_push (ptl_nid_t nid)
1755 {
1756         ksock_peer_t      *peer;
1757         struct list_head  *tmp;
1758         int                index;
1759         int                i;
1760         int                j;
1761         int                rc = -ENOENT;
1762
1763         if (nid != PTL_NID_ANY) {
1764                 peer = ksocknal_get_peer (nid);
1765
1766                 if (peer != NULL) {
1767                         rc = 0;
1768                         ksocknal_push_peer (peer);
1769                         ksocknal_put_peer (peer);
1770                 }
1771                 return (rc);
1772         }
1773
1774         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1775                 for (j = 0; ; j++) {
1776                         read_lock (&ksocknal_data.ksnd_global_lock);
1777
1778                         index = 0;
1779                         peer = NULL;
1780
1781                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1782                                 if (index++ == j) {
1783                                         peer = list_entry(tmp, ksock_peer_t,
1784                                                           ksnp_list);
1785                                         atomic_inc (&peer->ksnp_refcount);
1786                                         break;
1787                                 }
1788                         }
1789
1790                         read_unlock (&ksocknal_data.ksnd_global_lock);
1791
1792                         if (peer != NULL) {
1793                                 rc = 0;
1794                                 ksocknal_push_peer (peer);
1795                                 ksocknal_put_peer (peer);
1796                         }
1797                 }
1798
1799         }
1800
1801         return (rc);
1802 }
1803
1804 int
1805 ksocknal_add_interface(__u32 ipaddress, __u32 netmask)
1806 {
1807         unsigned long      flags;
1808         ksock_interface_t *iface;
1809         int                rc;
1810         int                i;
1811         int                j;
1812         struct list_head  *ptmp;
1813         ksock_peer_t      *peer;
1814         struct list_head  *rtmp;
1815         ksock_route_t     *route;
1816
1817         if (ipaddress == 0 ||
1818             netmask == 0)
1819                 return (-EINVAL);
1820
1821         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1822
1823         iface = ksocknal_ip2iface(ipaddress);
1824         if (iface != NULL) {
1825                 /* silently ignore dups */
1826                 rc = 0;
1827         } else if (ksocknal_data.ksnd_ninterfaces == SOCKNAL_MAX_INTERFACES) {
1828                 rc = -ENOSPC;
1829         } else {
1830                 iface = &ksocknal_data.ksnd_interfaces[ksocknal_data.ksnd_ninterfaces++];
1831
1832                 iface->ksni_ipaddr = ipaddress;
1833                 iface->ksni_netmask = netmask;
1834                 iface->ksni_nroutes = 0;
1835                 iface->ksni_npeers = 0;
1836
1837                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1838                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1839                                 peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1840
1841                                 for (j = 0; i < peer->ksnp_n_passive_ips; j++)
1842                                         if (peer->ksnp_passive_ips[j] == ipaddress)
1843                                                 iface->ksni_npeers++;
1844                                 
1845                                 list_for_each(rtmp, &peer->ksnp_routes) {
1846                                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1847                                         
1848                                         if (route->ksnr_myipaddr == ipaddress)
1849                                                 iface->ksni_nroutes++;
1850                                 }
1851                         }
1852                 }
1853
1854                 rc = 0;
1855                 /* NB only new connections will pay attention to the new interface! */
1856         }
1857         
1858         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1859
1860         return (rc);
1861 }
1862
1863 void
1864 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
1865 {
1866         struct list_head   *tmp;
1867         struct list_head   *nxt;
1868         ksock_route_t      *route;
1869         ksock_conn_t       *conn;
1870         int                 i;
1871         int                 j;
1872
1873         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
1874                 if (peer->ksnp_passive_ips[i] == ipaddr) {
1875                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
1876                                 peer->ksnp_passive_ips[j-1] =
1877                                         peer->ksnp_passive_ips[j];
1878                         peer->ksnp_n_passive_ips--;
1879                         break;
1880                 }
1881
1882         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
1883                 route = list_entry (tmp, ksock_route_t, ksnr_list);
1884                 
1885                 if (route->ksnr_myipaddr != ipaddr)
1886                         continue;
1887                 
1888                 if (route->ksnr_share_count != 0) {
1889                         /* Manually created; keep, but unbind */
1890                         route->ksnr_myipaddr = 0;
1891                 } else {
1892                         ksocknal_del_route_locked(route);
1893                 }
1894         }
1895         
1896         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
1897                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1898                 
1899                 if (conn->ksnc_myipaddr == ipaddr)
1900                         ksocknal_close_conn_locked (conn, 0);
1901         }
1902 }
1903
1904 int
1905 ksocknal_del_interface(__u32 ipaddress)
1906 {
1907         int                rc = -ENOENT;
1908         unsigned long      flags;
1909         struct list_head  *tmp;
1910         struct list_head  *nxt;
1911         ksock_peer_t      *peer;
1912         __u32              this_ip;
1913         int                i;
1914         int                j;
1915
1916         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1917
1918         for (i = 0; i < ksocknal_data.ksnd_ninterfaces; i++) {
1919                 this_ip = ksocknal_data.ksnd_interfaces[i].ksni_ipaddr;
1920
1921                 if (!(ipaddress == 0 ||
1922                       ipaddress == this_ip))
1923                         continue;
1924
1925                 rc = 0;
1926
1927                 for (j = i+1; j < ksocknal_data.ksnd_ninterfaces; j++)
1928                         ksocknal_data.ksnd_interfaces[j-1] =
1929                                 ksocknal_data.ksnd_interfaces[j];
1930                 
1931                 ksocknal_data.ksnd_ninterfaces--;
1932
1933                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
1934                         list_for_each_safe(tmp, nxt, &ksocknal_data.ksnd_peers[j]) {
1935                                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
1936                                 
1937                                 ksocknal_peer_del_interface_locked(peer, this_ip);
1938                         }
1939                 }
1940         }
1941         
1942         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1943         
1944         return (rc);
1945 }
1946
1947 int
1948 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1949 {
1950         int rc;
1951
1952         switch(pcfg->pcfg_command) {
1953         case NAL_CMD_GET_INTERFACE: {
1954                 ksock_interface_t *iface;
1955
1956                 read_lock (&ksocknal_data.ksnd_global_lock);
1957
1958                 if (pcfg->pcfg_count < 0 ||
1959                     pcfg->pcfg_count >= ksocknal_data.ksnd_ninterfaces) {
1960                         rc = -ENOENT;
1961                 } else {
1962                         rc = 0;
1963                         iface = &ksocknal_data.ksnd_interfaces[pcfg->pcfg_count];
1964
1965                         pcfg->pcfg_id    = iface->ksni_ipaddr;
1966                         pcfg->pcfg_misc  = iface->ksni_netmask;
1967                         pcfg->pcfg_fd    = iface->ksni_npeers;
1968                         pcfg->pcfg_count = iface->ksni_nroutes;
1969                 }
1970                 
1971                 read_unlock (&ksocknal_data.ksnd_global_lock);
1972                 break;
1973         }
1974         case NAL_CMD_ADD_INTERFACE: {
1975                 rc = ksocknal_add_interface(pcfg->pcfg_id, /* IP address */
1976                                             pcfg->pcfg_misc); /* net mask */
1977                 break;
1978         }
1979         case NAL_CMD_DEL_INTERFACE: {
1980                 rc = ksocknal_del_interface(pcfg->pcfg_id); /* IP address */
1981                 break;
1982         }
1983         case NAL_CMD_GET_PEER: {
1984                 ptl_nid_t    nid = 0;
1985                 __u32        myip = 0;
1986                 __u32        ip = 0;
1987                 int          port = 0;
1988                 int          conn_count = 0;
1989                 int          share_count = 0;
1990                 
1991                 rc = ksocknal_get_peer_info(pcfg->pcfg_count, &nid,
1992                                             &myip, &ip, &port,
1993                                             &conn_count,  &share_count);
1994                 pcfg->pcfg_nid   = nid;
1995                 pcfg->pcfg_size  = myip;
1996                 pcfg->pcfg_id    = ip;
1997                 pcfg->pcfg_misc  = port;
1998                 pcfg->pcfg_count = conn_count;
1999                 pcfg->pcfg_wait  = share_count;
2000                 break;
2001         }
2002         case NAL_CMD_ADD_PEER: {
2003                 rc = ksocknal_add_peer (pcfg->pcfg_nid, 
2004                                         pcfg->pcfg_id, /* IP */
2005                                         pcfg->pcfg_misc); /* port */
2006                 break;
2007         }
2008         case NAL_CMD_DEL_PEER: {
2009                 rc = ksocknal_del_peer (pcfg->pcfg_nid, 
2010                                         pcfg->pcfg_id, /* IP */
2011                                         pcfg->pcfg_flags); /* single_share? */
2012                 break;
2013         }
2014         case NAL_CMD_GET_CONN: {
2015                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
2016
2017                 if (conn == NULL)
2018                         rc = -ENOENT;
2019                 else {
2020                         int   txmem;
2021                         int   rxmem;
2022                         int   nagle;
2023
2024                         ksocknal_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
2025
2026                         rc = 0;
2027                         pcfg->pcfg_nid    = conn->ksnc_peer->ksnp_nid;
2028                         pcfg->pcfg_id     = conn->ksnc_ipaddr;
2029                         pcfg->pcfg_misc   = conn->ksnc_port;
2030                         pcfg->pcfg_fd     = conn->ksnc_myipaddr;
2031                         pcfg->pcfg_flags  = conn->ksnc_type;
2032                         pcfg->pcfg_gw_nal = conn->ksnc_scheduler - 
2033                                             ksocknal_data.ksnd_schedulers;
2034                         pcfg->pcfg_count  = txmem;
2035                         pcfg->pcfg_size   = rxmem;
2036                         pcfg->pcfg_wait   = nagle;
2037                         ksocknal_put_conn (conn);
2038                 }
2039                 break;
2040         }
2041         case NAL_CMD_REGISTER_PEER_FD: {
2042                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
2043                 int            type = pcfg->pcfg_misc;
2044
2045                 if (sock == NULL)
2046                         break;
2047
2048                 switch (type) {
2049                 case SOCKNAL_CONN_NONE:
2050                 case SOCKNAL_CONN_ANY:
2051                 case SOCKNAL_CONN_CONTROL:
2052                 case SOCKNAL_CONN_BULK_IN:
2053                 case SOCKNAL_CONN_BULK_OUT:
2054                         rc = ksocknal_create_conn(NULL, sock, type);
2055                         break;
2056                 default:
2057                         rc = -EINVAL;
2058                         break;
2059                 }
2060                 fput (sock->file);
2061                 break;
2062         }
2063         case NAL_CMD_CLOSE_CONNECTION: {
2064                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid, 
2065                                                     pcfg->pcfg_id);
2066                 break;
2067         }
2068         case NAL_CMD_REGISTER_MYNID: {
2069                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
2070                 break;
2071         }
2072         case NAL_CMD_PUSH_CONNECTION: {
2073                 rc = ksocknal_push (pcfg->pcfg_nid);
2074                 break;
2075         }
2076         default:
2077                 rc = -EINVAL;
2078                 break;
2079         }
2080
2081         return rc;
2082 }
2083
2084 void
2085 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
2086 {
2087         int          npages = p->fmp_buff_pages;
2088         ksock_fmb_t *fmb;
2089         int          i;
2090
2091         LASSERT (list_empty(&p->fmp_blocked_conns));
2092         LASSERT (p->fmp_nactive_fmbs == 0);
2093         
2094         while (!list_empty(&p->fmp_idle_fmbs)) {
2095
2096                 fmb = list_entry(p->fmp_idle_fmbs.next,
2097                                  ksock_fmb_t, fmb_list);
2098                 
2099                 for (i = 0; i < npages; i++)
2100                         if (fmb->fmb_kiov[i].kiov_page != NULL)
2101                                 __free_page(fmb->fmb_kiov[i].kiov_page);
2102
2103                 list_del(&fmb->fmb_list);
2104                 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
2105         }
2106 }
2107
2108 void
2109 ksocknal_free_buffers (void)
2110 {
2111         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
2112         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
2113
2114         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
2115
2116         if (ksocknal_data.ksnd_schedulers != NULL)
2117                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
2118                              sizeof (ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2119
2120         PORTAL_FREE (ksocknal_data.ksnd_peers,
2121                      sizeof (struct list_head) * 
2122                      ksocknal_data.ksnd_peer_hash_size);
2123 }
2124
2125 void
2126 ksocknal_api_shutdown (nal_t *nal)
2127 {
2128         ksock_sched_t *sched;
2129         int            i;
2130
2131         if (nal->nal_refct != 0) {
2132                 /* This module got the first ref */
2133                 PORTAL_MODULE_UNUSE;
2134                 return;
2135         }
2136
2137         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
2138                atomic_read (&portal_kmemory));
2139
2140         LASSERT(nal == &ksocknal_api);
2141
2142         switch (ksocknal_data.ksnd_init) {
2143         default:
2144                 LASSERT (0);
2145
2146         case SOCKNAL_INIT_ALL:
2147                 libcfs_nal_cmd_unregister(SOCKNAL);
2148
2149                 ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB;
2150                 /* fall through */
2151
2152         case SOCKNAL_INIT_LIB:
2153                 /* No more calls to ksocknal_cmd() to create new
2154                  * autoroutes/connections since we're being unloaded. */
2155
2156                 /* Delete all peers */
2157                 ksocknal_del_peer(PTL_NID_ANY, 0, 0);
2158
2159                 /* Wait for all peer state to clean up */
2160                 i = 2;
2161                 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
2162                         i++;
2163                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2164                                "waiting for %d peers to disconnect\n",
2165                                atomic_read (&ksocknal_data.ksnd_npeers));
2166                         set_current_state (TASK_UNINTERRUPTIBLE);
2167                         schedule_timeout (HZ);
2168                 }
2169
2170                 /* Tell lib we've stopped calling into her. */
2171                 lib_fini(&ksocknal_lib);
2172
2173                 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2174                 /* fall through */
2175
2176         case SOCKNAL_INIT_DATA:
2177                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
2178                 LASSERT (ksocknal_data.ksnd_peers != NULL);
2179                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2180                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
2181                 }
2182                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
2183                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
2184                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2185                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
2186                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
2187
2188                 if (ksocknal_data.ksnd_schedulers != NULL)
2189                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2190                                 ksock_sched_t *kss =
2191                                         &ksocknal_data.ksnd_schedulers[i];
2192
2193                                 LASSERT (list_empty (&kss->kss_tx_conns));
2194                                 LASSERT (list_empty (&kss->kss_rx_conns));
2195                                 LASSERT (kss->kss_nconns == 0);
2196                         }
2197
2198                 /* stop router calling me */
2199                 kpr_shutdown (&ksocknal_data.ksnd_router);
2200
2201                 /* flag threads to terminate; wake and wait for them to die */
2202                 ksocknal_data.ksnd_shuttingdown = 1;
2203                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
2204                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
2205
2206                 for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2207                         sched = &ksocknal_data.ksnd_schedulers[i];
2208                         wake_up_all(&sched->kss_waitq);
2209                 }
2210
2211                 i = 4;
2212                 read_lock(&ksocknal_data.ksnd_global_lock);
2213                 while (ksocknal_data.ksnd_nthreads != 0) {
2214                         i++;
2215                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2216                                "waiting for %d threads to terminate\n",
2217                                 ksocknal_data.ksnd_nthreads);
2218                         read_unlock(&ksocknal_data.ksnd_global_lock);
2219                         set_current_state (TASK_UNINTERRUPTIBLE);
2220                         schedule_timeout (HZ);
2221                         read_lock(&ksocknal_data.ksnd_global_lock);
2222                 }
2223                 read_unlock(&ksocknal_data.ksnd_global_lock);
2224
2225                 kpr_deregister (&ksocknal_data.ksnd_router);
2226
2227                 ksocknal_free_buffers();
2228
2229                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2230                 /* fall through */
2231
2232         case SOCKNAL_INIT_NOTHING:
2233                 break;
2234         }
2235
2236         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
2237                atomic_read (&portal_kmemory));
2238
2239         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
2240                atomic_read(&portal_kmemory));
2241 }
2242
2243
2244 void
2245 ksocknal_init_incarnation (void)
2246 {
2247         struct timeval tv;
2248
2249         /* The incarnation number is the time this module loaded and it
2250          * identifies this particular instance of the socknal.  Hopefully
2251          * we won't be able to reboot more frequently than 1MHz for the
2252          * forseeable future :) */
2253         
2254         do_gettimeofday(&tv);
2255         
2256         ksocknal_data.ksnd_incarnation = 
2257                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2258 }
2259
2260 int
2261 ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
2262                       ptl_ni_limits_t *requested_limits,
2263                       ptl_ni_limits_t *actual_limits)
2264 {
2265         ptl_process_id_t  process_id;
2266         int               pkmem = atomic_read(&portal_kmemory);
2267         int               rc;
2268         int               i;
2269         int               j;
2270
2271         LASSERT (nal == &ksocknal_api);
2272
2273         if (nal->nal_refct != 0) {
2274                 if (actual_limits != NULL)
2275                         *actual_limits = ksocknal_lib.libnal_ni.ni_actual_limits;
2276                 /* This module got the first ref */
2277                 PORTAL_MODULE_USE;
2278                 return (PTL_OK);
2279         }
2280
2281         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2282
2283         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2284
2285         ksocknal_init_incarnation();
2286         
2287         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2288         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
2289                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
2290         if (ksocknal_data.ksnd_peers == NULL)
2291                 return (-ENOMEM);
2292
2293         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2294                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2295
2296         rwlock_init(&ksocknal_data.ksnd_global_lock);
2297
2298         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
2299         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
2300         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
2301         ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
2302
2303         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
2304         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
2305         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
2306         ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
2307
2308         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
2309         INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
2310         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
2311         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
2312         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2313
2314         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
2315         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
2316         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
2317
2318         /* NB memset above zeros whole of ksocknal_data, including
2319          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
2320
2321         /* flag lists/ptrs/locks initialised */
2322         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2323
2324         ksocknal_data.ksnd_nschedulers = ksocknal_nsched();
2325         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
2326                      sizeof(ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2327         if (ksocknal_data.ksnd_schedulers == NULL) {
2328                 ksocknal_api_shutdown (nal);
2329                 return (-ENOMEM);
2330         }
2331
2332         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2333                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
2334
2335                 spin_lock_init (&kss->kss_lock);
2336                 INIT_LIST_HEAD (&kss->kss_rx_conns);
2337                 INIT_LIST_HEAD (&kss->kss_tx_conns);
2338 #if SOCKNAL_ZC
2339                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
2340 #endif
2341                 init_waitqueue_head (&kss->kss_waitq);
2342         }
2343
2344         /* NB we have to wait to be told our true NID... */
2345         process_id.pid = requested_pid; 
2346         process_id.nid = 0;
2347         
2348         rc = lib_init(&ksocknal_lib, nal, process_id,
2349                       requested_limits, actual_limits);
2350         if (rc != PTL_OK) {
2351                 CERROR("lib_init failed: error %d\n", rc);
2352                 ksocknal_api_shutdown (nal);
2353                 return (rc);
2354         }
2355
2356         ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB; // flag lib_init() called
2357
2358         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2359                 rc = ksocknal_thread_start (ksocknal_scheduler,
2360                                             &ksocknal_data.ksnd_schedulers[i]);
2361                 if (rc != 0) {
2362                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
2363                                i, rc);
2364                         ksocknal_api_shutdown (nal);
2365                         return (rc);
2366                 }
2367         }
2368
2369         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
2370                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
2371                 if (rc != 0) {
2372                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
2373                         ksocknal_api_shutdown (nal);
2374                         return (rc);
2375                 }
2376         }
2377
2378         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
2379         if (rc != 0) {
2380                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2381                 ksocknal_api_shutdown (nal);
2382                 return (rc);
2383         }
2384
2385         rc = kpr_register(&ksocknal_data.ksnd_router,
2386                           &ksocknal_router_interface);
2387         if (rc != 0) {
2388                 CDEBUG(D_NET, "Can't initialise routing interface "
2389                        "(rc = %d): not routing\n", rc);
2390         } else {
2391                 /* Only allocate forwarding buffers if there's a router */
2392
2393                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
2394                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
2395                         ksock_fmb_t      *fmb;
2396                         ksock_fmb_pool_t *pool;
2397                         
2398
2399                         if (i < SOCKNAL_SMALL_FWD_NMSGS)
2400                                 pool = &ksocknal_data.ksnd_small_fmp;
2401                         else
2402                                 pool = &ksocknal_data.ksnd_large_fmp;
2403                         
2404                         PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t, 
2405                                                    fmb_kiov[pool->fmp_buff_pages]));
2406                         if (fmb == NULL) {
2407                                 ksocknal_api_shutdown(nal);
2408                                 return (-ENOMEM);
2409                         }
2410
2411                         fmb->fmb_pool = pool;
2412                         
2413                         for (j = 0; j < pool->fmp_buff_pages; j++) {
2414                                 fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);
2415
2416                                 if (fmb->fmb_kiov[j].kiov_page == NULL) {
2417                                         ksocknal_api_shutdown (nal);
2418                                         return (-ENOMEM);
2419                                 }
2420
2421                                 LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
2422                         }
2423
2424                         list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
2425                 }
2426         }
2427
2428         rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
2429         if (rc != 0) {
2430                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
2431                 ksocknal_api_shutdown (nal);
2432                 return (rc);
2433         }
2434
2435         /* flag everything initialised */
2436         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2437
2438         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
2439                "(Routing %s, initial mem %d, incarnation "LPX64")\n",
2440                kpr_routing (&ksocknal_data.ksnd_router) ?
2441                "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
2442
2443         return (0);
2444 }
2445
2446 void __exit
2447 ksocknal_module_fini (void)
2448 {
2449 #ifdef CONFIG_SYSCTL
2450         if (ksocknal_tunables.ksnd_sysctl != NULL)
2451                 unregister_sysctl_table (ksocknal_tunables.ksnd_sysctl);
2452 #endif
2453         PtlNIFini(ksocknal_ni);
2454
2455         ptl_unregister_nal(SOCKNAL);
2456 }
2457
2458 int __init
2459 ksocknal_module_init (void)
2460 {
2461         int    rc;
2462
2463         /* packet descriptor must fit in a router descriptor's scratchpad */
2464         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
2465         /* the following must be sizeof(int) for proc_dointvec() */
2466         LASSERT(sizeof (ksocknal_tunables.ksnd_io_timeout) == sizeof (int));
2467         LASSERT(sizeof (ksocknal_tunables.ksnd_eager_ack) == sizeof (int));
2468         LASSERT(sizeof (ksocknal_tunables.ksnd_typed_conns) == sizeof (int));
2469         LASSERT(sizeof (ksocknal_tunables.ksnd_min_bulk) == sizeof (int));
2470         LASSERT(sizeof (ksocknal_tunables.ksnd_buffer_size) == sizeof (int));
2471         LASSERT(sizeof (ksocknal_tunables.ksnd_nagle) == sizeof (int));
2472         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_idle) == sizeof (int));
2473         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_count) == sizeof (int));
2474         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_intvl) == sizeof (int));
2475 #if CPU_AFFINITY
2476         LASSERT(sizeof (ksocknal_tunables.ksnd_irq_affinity) == sizeof (int));
2477 #endif
2478 #if SOCKNAL_ZC
2479         LASSERT(sizeof (ksocknal_tunables.ksnd_zc_min_frag) == sizeof (int));
2480 #endif
2481         /* check ksnr_connected/connecting field large enough */
2482         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
2483         
2484         ksocknal_api.nal_ni_init = ksocknal_api_startup;
2485         ksocknal_api.nal_ni_fini = ksocknal_api_shutdown;
2486
2487         /* Initialise dynamic tunables to defaults once only */
2488         ksocknal_tunables.ksnd_io_timeout      = SOCKNAL_IO_TIMEOUT;
2489         ksocknal_tunables.ksnd_eager_ack       = SOCKNAL_EAGER_ACK;
2490         ksocknal_tunables.ksnd_typed_conns     = SOCKNAL_TYPED_CONNS;
2491         ksocknal_tunables.ksnd_min_bulk        = SOCKNAL_MIN_BULK;
2492         ksocknal_tunables.ksnd_buffer_size     = SOCKNAL_BUFFER_SIZE;
2493         ksocknal_tunables.ksnd_nagle           = SOCKNAL_NAGLE;
2494         ksocknal_tunables.ksnd_keepalive_idle  = SOCKNAL_KEEPALIVE_IDLE;
2495         ksocknal_tunables.ksnd_keepalive_count = SOCKNAL_KEEPALIVE_COUNT;
2496         ksocknal_tunables.ksnd_keepalive_intvl = SOCKNAL_KEEPALIVE_INTVL;
2497 #if CPU_AFFINITY
2498         ksocknal_tunables.ksnd_irq_affinity = SOCKNAL_IRQ_AFFINITY;
2499 #endif
2500 #if SOCKNAL_ZC
2501         ksocknal_tunables.ksnd_zc_min_frag  = SOCKNAL_ZC_MIN_FRAG;
2502 #endif
2503
2504         rc = ptl_register_nal(SOCKNAL, &ksocknal_api);
2505         if (rc != PTL_OK) {
2506                 CERROR("Can't register SOCKNAL: %d\n", rc);
2507                 return (-ENOMEM);               /* or something... */
2508         }
2509
2510         /* Pure gateways want the NAL started up at module load time... */
2511         rc = PtlNIInit(SOCKNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &ksocknal_ni);
2512         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
2513                 ptl_unregister_nal(SOCKNAL);
2514                 return (-ENODEV);
2515         }
2516         
2517 #ifdef CONFIG_SYSCTL
2518         /* Press on regardless even if registering sysctl doesn't work */
2519         ksocknal_tunables.ksnd_sysctl = 
2520                 register_sysctl_table (ksocknal_top_ctl_table, 0);
2521 #endif
2522         return (0);
2523 }
2524
2525 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2526 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
2527 MODULE_LICENSE("GPL");
2528
2529 module_init(ksocknal_module_init);
2530 module_exit(ksocknal_module_fini);
2531