Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 nal_t                   ksocknal_api;
29 ksock_nal_data_t        ksocknal_data;
30 ptl_handle_ni_t         ksocknal_ni;
31 ksock_tunables_t        ksocknal_tunables;
32
33 kpr_nal_interface_t ksocknal_router_interface = {
34         kprni_nalid:      SOCKNAL,
35         kprni_arg:        &ksocknal_data,
36         kprni_fwd:        ksocknal_fwd_packet,
37         kprni_notify:     ksocknal_notify,
38 };
39
40 #ifdef CONFIG_SYSCTL
41 #define SOCKNAL_SYSCTL  200
42
43 #define SOCKNAL_SYSCTL_TIMEOUT          1
44 #define SOCKNAL_SYSCTL_EAGER_ACK        2
45 #define SOCKNAL_SYSCTL_ZERO_COPY        3
46 #define SOCKNAL_SYSCTL_TYPED            4
47 #define SOCKNAL_SYSCTL_MIN_BULK         5
48 #define SOCKNAL_SYSCTL_BUFFER_SIZE      6
49 #define SOCKNAL_SYSCTL_NAGLE            7
50 #define SOCKNAL_SYSCTL_IRQ_AFFINITY     8
51 #define SOCKNAL_SYSCTL_KEEPALIVE_IDLE   9
52 #define SOCKNAL_SYSCTL_KEEPALIVE_COUNT 10
53 #define SOCKNAL_SYSCTL_KEEPALIVE_INTVL 11
54
55 static ctl_table ksocknal_ctl_table[] = {
56         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
57          &ksocknal_tunables.ksnd_io_timeout, sizeof (int),
58          0644, NULL, &proc_dointvec},
59         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
60          &ksocknal_tunables.ksnd_eager_ack, sizeof (int),
61          0644, NULL, &proc_dointvec},
62 #if SOCKNAL_ZC
63         {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy", 
64          &ksocknal_tunables.ksnd_zc_min_frag, sizeof (int),
65          0644, NULL, &proc_dointvec},
66 #endif
67         {SOCKNAL_SYSCTL_TYPED, "typed", 
68          &ksocknal_tunables.ksnd_typed_conns, sizeof (int),
69          0644, NULL, &proc_dointvec},
70         {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk", 
71          &ksocknal_tunables.ksnd_min_bulk, sizeof (int),
72          0644, NULL, &proc_dointvec},
73         {SOCKNAL_SYSCTL_BUFFER_SIZE, "buffer_size",
74          &ksocknal_tunables.ksnd_buffer_size, sizeof(int),
75          0644, NULL, &proc_dointvec},
76         {SOCKNAL_SYSCTL_NAGLE, "nagle",
77          &ksocknal_tunables.ksnd_nagle, sizeof(int),
78          0644, NULL, &proc_dointvec},
79 #if CPU_AFFINITY
80         {SOCKNAL_SYSCTL_IRQ_AFFINITY, "irq_affinity",
81          &ksocknal_tunables.ksnd_irq_affinity, sizeof(int),
82          0644, NULL, &proc_dointvec},
83 #endif
84         {SOCKNAL_SYSCTL_KEEPALIVE_IDLE, "keepalive_idle",
85          &ksocknal_tunables.ksnd_keepalive_idle, sizeof(int),
86          0644, NULL, &proc_dointvec},
87         {SOCKNAL_SYSCTL_KEEPALIVE_COUNT, "keepalive_count",
88          &ksocknal_tunables.ksnd_keepalive_count, sizeof(int),
89          0644, NULL, &proc_dointvec},
90         {SOCKNAL_SYSCTL_KEEPALIVE_INTVL, "keepalive_intvl",
91          &ksocknal_tunables.ksnd_keepalive_intvl, sizeof(int),
92          0644, NULL, &proc_dointvec},
93         { 0 }
94 };
95
96 static ctl_table ksocknal_top_ctl_table[] = {
97         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
98         { 0 }
99 };
100 #endif
101
102 int
103 ksocknal_set_mynid(ptl_nid_t nid)
104 {
105         lib_ni_t *ni = &ksocknal_lib.libnal_ni;
106
107         /* FIXME: we have to do this because we call lib_init() at module
108          * insertion time, which is before we have 'mynid' available.  lib_init
109          * sets the NAL's nid, which it uses to tell other nodes where packets
110          * are coming from.  This is not a very graceful solution to this
111          * problem. */
112
113         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
114                nid, ni->ni_pid.nid);
115
116         ni->ni_pid.nid = nid;
117         return (0);
118 }
119
120 void
121 ksocknal_bind_irq (unsigned int irq)
122 {
123 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
124         int              bind;
125         int              cpu;
126         unsigned long    flags;
127         char             cmdline[64];
128         ksock_irqinfo_t *info;
129         char            *argv[] = {"/bin/sh",
130                                    "-c",
131                                    cmdline,
132                                    NULL};
133         char            *envp[] = {"HOME=/",
134                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
135                                    NULL};
136
137         LASSERT (irq < NR_IRQS);
138         if (irq == 0)              /* software NIC or affinity disabled */
139                 return;
140
141         info = &ksocknal_data.ksnd_irqinfo[irq];
142
143         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
144
145         LASSERT (info->ksni_valid);
146         bind = !info->ksni_bound;
147         info->ksni_bound = 1;
148
149         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
150
151         if (!bind)                              /* bound already */
152                 return;
153
154         cpu = ksocknal_irqsched2cpu(info->ksni_sched);
155         snprintf (cmdline, sizeof (cmdline),
156                   "echo %d > /proc/irq/%u/smp_affinity", 1 << cpu, irq);
157
158         printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
159                 irq, cpu, cmdline);
160
161         /* FIXME: Find a better method of setting IRQ affinity...
162          */
163
164         USERMODEHELPER(argv[0], argv, envp);
165 #endif
166 }
167
168 ksock_interface_t *
169 ksocknal_ip2iface(__u32 ip)
170 {
171         int                i;
172         ksock_interface_t *iface;
173
174         for (i = 0; i < ksocknal_data.ksnd_ninterfaces; i++) {
175                 LASSERT(i < SOCKNAL_MAX_INTERFACES);
176                 iface = &ksocknal_data.ksnd_interfaces[i];
177                 
178                 if (iface->ksni_ipaddr == ip)
179                         return (iface);
180         }
181         
182         return (NULL);
183 }
184
185 ksock_route_t *
186 ksocknal_create_route (__u32 ipaddr, int port)
187 {
188         ksock_route_t *route;
189
190         PORTAL_ALLOC (route, sizeof (*route));
191         if (route == NULL)
192                 return (NULL);
193
194         atomic_set (&route->ksnr_refcount, 1);
195         route->ksnr_peer = NULL;
196         route->ksnr_timeout = jiffies;
197         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
198         route->ksnr_ipaddr = ipaddr;
199         route->ksnr_port = port;
200         route->ksnr_connecting = 0;
201         route->ksnr_connected = 0;
202         route->ksnr_deleted = 0;
203         route->ksnr_conn_count = 0;
204         route->ksnr_share_count = 0;
205
206         return (route);
207 }
208
209 void
210 ksocknal_destroy_route (ksock_route_t *route)
211 {
212         if (route->ksnr_peer != NULL)
213                 ksocknal_put_peer (route->ksnr_peer);
214
215         PORTAL_FREE (route, sizeof (*route));
216 }
217
218 void
219 ksocknal_put_route (ksock_route_t *route)
220 {
221         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
222                 route, atomic_read (&route->ksnr_refcount));
223
224         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
225         if (!atomic_dec_and_test (&route->ksnr_refcount))
226              return;
227
228         ksocknal_destroy_route (route);
229 }
230
231 ksock_peer_t *
232 ksocknal_create_peer (ptl_nid_t nid)
233 {
234         ksock_peer_t *peer;
235
236         LASSERT (nid != PTL_NID_ANY);
237
238         PORTAL_ALLOC (peer, sizeof (*peer));
239         if (peer == NULL)
240                 return (NULL);
241
242         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
243
244         peer->ksnp_nid = nid;
245         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
246         peer->ksnp_closing = 0;
247         INIT_LIST_HEAD (&peer->ksnp_conns);
248         INIT_LIST_HEAD (&peer->ksnp_routes);
249         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
250
251         atomic_inc (&ksocknal_data.ksnd_npeers);
252         return (peer);
253 }
254
255 void
256 ksocknal_destroy_peer (ksock_peer_t *peer)
257 {
258         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
259
260         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
261         LASSERT (list_empty (&peer->ksnp_conns));
262         LASSERT (list_empty (&peer->ksnp_routes));
263         LASSERT (list_empty (&peer->ksnp_tx_queue));
264
265         PORTAL_FREE (peer, sizeof (*peer));
266
267         /* NB a peer's connections and autoconnect routes keep a reference
268          * on their peer until they are destroyed, so we can be assured
269          * that _all_ state to do with this peer has been cleaned up when
270          * its refcount drops to zero. */
271         atomic_dec (&ksocknal_data.ksnd_npeers);
272 }
273
274 void
275 ksocknal_put_peer (ksock_peer_t *peer)
276 {
277         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
278                 peer, peer->ksnp_nid,
279                 atomic_read (&peer->ksnp_refcount));
280
281         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
282         if (!atomic_dec_and_test (&peer->ksnp_refcount))
283                 return;
284
285         ksocknal_destroy_peer (peer);
286 }
287
288 ksock_peer_t *
289 ksocknal_find_peer_locked (ptl_nid_t nid)
290 {
291         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
292         struct list_head *tmp;
293         ksock_peer_t     *peer;
294
295         list_for_each (tmp, peer_list) {
296
297                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
298
299                 LASSERT (!peer->ksnp_closing);
300
301                 if (peer->ksnp_nid != nid)
302                         continue;
303
304                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
305                        peer, nid, atomic_read (&peer->ksnp_refcount));
306                 return (peer);
307         }
308         return (NULL);
309 }
310
311 ksock_peer_t *
312 ksocknal_get_peer (ptl_nid_t nid)
313 {
314         ksock_peer_t     *peer;
315
316         read_lock (&ksocknal_data.ksnd_global_lock);
317         peer = ksocknal_find_peer_locked (nid);
318         if (peer != NULL)                       /* +1 ref for caller? */
319                 atomic_inc (&peer->ksnp_refcount);
320         read_unlock (&ksocknal_data.ksnd_global_lock);
321
322         return (peer);
323 }
324
325 void
326 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
327 {
328         int                i;
329         __u32              ip;
330
331         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
332                 LASSERT (i < SOCKNAL_MAX_INTERFACES);
333                 ip = peer->ksnp_passive_ips[i];
334
335                 ksocknal_ip2iface(ip)->ksni_npeers--;
336         }
337
338         LASSERT (list_empty(&peer->ksnp_conns));
339         LASSERT (list_empty(&peer->ksnp_routes));
340         LASSERT (!peer->ksnp_closing);
341         peer->ksnp_closing = 1;
342         list_del (&peer->ksnp_list);
343         /* lose peerlist's ref */
344         ksocknal_put_peer (peer);
345 }
346
347 int
348 ksocknal_get_peer_info (int index, ptl_nid_t *nid,
349                         __u32 *myip, __u32 *peer_ip, int *port, 
350                         int *conn_count, int *share_count)
351 {
352         ksock_peer_t      *peer;
353         struct list_head  *ptmp;
354         ksock_route_t     *route;
355         struct list_head  *rtmp;
356         int                i;
357         int                j;
358         int                rc = -ENOENT;
359
360         read_lock (&ksocknal_data.ksnd_global_lock);
361
362         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
363                 
364                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
365                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
366
367                         if (peer->ksnp_n_passive_ips == 0 &&
368                             list_empty(&peer->ksnp_routes)) {
369                                 if (index-- > 0)
370                                         continue;
371                                 
372                                 *nid = peer->ksnp_nid;
373                                 *myip = 0;
374                                 *peer_ip = 0;
375                                 *port = 0;
376                                 *conn_count = 0;
377                                 *share_count = 0;
378                                 rc = 0;
379                                 goto out;
380                         }
381
382                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
383                                 if (index-- > 0)
384                                         continue;
385                                 
386                                 *nid = peer->ksnp_nid;
387                                 *myip = peer->ksnp_passive_ips[j];
388                                 *peer_ip = 0;
389                                 *port = 0;
390                                 *conn_count = 0;
391                                 *share_count = 0;
392                                 rc = 0;
393                                 goto out;
394                         }
395                         
396                         list_for_each (rtmp, &peer->ksnp_routes) {
397                                 if (index-- > 0)
398                                         continue;
399
400                                 route = list_entry(rtmp, ksock_route_t,
401                                                    ksnr_list);
402
403                                 *nid = peer->ksnp_nid;
404                                 *myip = route->ksnr_myipaddr;
405                                 *peer_ip = route->ksnr_ipaddr;
406                                 *port = route->ksnr_port;
407                                 *conn_count = route->ksnr_conn_count;
408                                 *share_count = route->ksnr_share_count;
409                                 rc = 0;
410                                 goto out;
411                         }
412                 }
413         }
414  out:
415         read_unlock (&ksocknal_data.ksnd_global_lock);
416         return (rc);
417 }
418
419 void
420 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
421 {
422         ksock_peer_t      *peer = route->ksnr_peer;
423         int                type = conn->ksnc_type;
424         ksock_interface_t *iface;
425
426         conn->ksnc_route = route;
427         atomic_inc (&route->ksnr_refcount);
428
429         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
430                 if (route->ksnr_myipaddr == 0) {
431                         /* route wasn't bound locally yet (the initial route) */
432                         CWARN("Binding "LPX64" %u.%u.%u.%u to %u.%u.%u.%u\n",
433                               peer->ksnp_nid, 
434                               HIPQUAD(route->ksnr_ipaddr),
435                               HIPQUAD(conn->ksnc_myipaddr));
436                 } else {
437                         CWARN("Rebinding "LPX64" %u.%u.%u.%u from "
438                               "%u.%u.%u.%u to %u.%u.%u.%u\n",
439                               peer->ksnp_nid, 
440                               HIPQUAD(route->ksnr_ipaddr),
441                               HIPQUAD(route->ksnr_myipaddr),
442                               HIPQUAD(conn->ksnc_myipaddr));
443                         
444                         iface = ksocknal_ip2iface(route->ksnr_myipaddr);
445                         if (iface != NULL) 
446                                 iface->ksni_nroutes--;
447                 }
448                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
449                 iface = ksocknal_ip2iface(route->ksnr_myipaddr);
450                 if (iface != NULL) 
451                         iface->ksni_nroutes++;
452         }
453
454         route->ksnr_connected |= (1<<type);
455         route->ksnr_connecting &= ~(1<<type);
456         route->ksnr_conn_count++;
457
458         /* Successful connection => further attempts can
459          * proceed immediately */
460         route->ksnr_timeout = jiffies;
461         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
462 }
463
464 void
465 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
466 {
467         struct list_head  *tmp;
468         ksock_conn_t      *conn;
469         int                type;
470         ksock_route_t     *route2;
471
472         LASSERT (route->ksnr_peer == NULL);
473         LASSERT (route->ksnr_connecting == 0);
474         LASSERT (route->ksnr_connected == 0);
475
476         /* LASSERT(unique) */
477         list_for_each(tmp, &peer->ksnp_routes) {
478                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
479
480                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
481                         CERROR ("Duplicate route "LPX64" %u.%u.%u.%u\n",
482                                 peer->ksnp_nid, HIPQUAD(route->ksnr_ipaddr));
483                         LBUG();
484                 }
485         }
486
487         route->ksnr_peer = peer;
488         atomic_inc (&peer->ksnp_refcount);
489         /* peer's routelist takes over my ref on 'route' */
490         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
491         
492         list_for_each(tmp, &peer->ksnp_conns) {
493                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
494                 type = conn->ksnc_type;
495
496                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
497                         continue;
498
499                 ksocknal_associate_route_conn_locked(route, conn);
500                 /* keep going (typed routes) */
501         }
502 }
503
504 void
505 ksocknal_del_route_locked (ksock_route_t *route)
506 {
507         ksock_peer_t      *peer = route->ksnr_peer;
508         ksock_interface_t *iface;
509         ksock_conn_t      *conn;
510         struct list_head  *ctmp;
511         struct list_head  *cnxt;
512
513         LASSERT (!route->ksnr_deleted);
514
515         /* Close associated conns */
516         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
517                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
518
519                 if (conn->ksnc_route != route)
520                         continue;
521                 
522                 ksocknal_close_conn_locked (conn, 0);
523         }
524
525         if (route->ksnr_myipaddr != 0) {
526                 iface = ksocknal_ip2iface(route->ksnr_myipaddr);
527                 if (iface != NULL)
528                         iface->ksni_nroutes--;
529         }
530
531         route->ksnr_deleted = 1;
532         list_del (&route->ksnr_list);
533         ksocknal_put_route (route);             /* drop peer's ref */
534
535         if (list_empty (&peer->ksnp_routes) &&
536             list_empty (&peer->ksnp_conns)) {
537                 /* I've just removed the last autoconnect route of a peer
538                  * with no active connections */
539                 ksocknal_unlink_peer_locked (peer);
540         }
541 }
542
543 int
544 ksocknal_add_peer (ptl_nid_t nid, __u32 ipaddr, int port)
545 {
546         unsigned long      flags;
547         struct list_head  *tmp;
548         ksock_peer_t      *peer;
549         ksock_peer_t      *peer2;
550         ksock_route_t     *route;
551         ksock_route_t     *route2;
552         
553         if (nid == PTL_NID_ANY)
554                 return (-EINVAL);
555
556         /* Have a brand new peer ready... */
557         peer = ksocknal_create_peer (nid);
558         if (peer == NULL)
559                 return (-ENOMEM);
560
561         route = ksocknal_create_route (ipaddr, port);
562         if (route == NULL) {
563                 ksocknal_put_peer (peer);
564                 return (-ENOMEM);
565         }
566
567         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
568
569         peer2 = ksocknal_find_peer_locked (nid);
570         if (peer2 != NULL) {
571                 ksocknal_put_peer (peer);
572                 peer = peer2;
573         } else {
574                 /* peer table takes my ref on peer */
575                 list_add_tail (&peer->ksnp_list,
576                                ksocknal_nid2peerlist (nid));
577         }
578
579         route2 = NULL;
580         list_for_each (tmp, &peer->ksnp_routes) {
581                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
582                 
583                 if (route2->ksnr_ipaddr == ipaddr)
584                         break;
585                 
586                 route2 = NULL;
587         }
588         if (route2 == NULL) {
589                 ksocknal_add_route_locked(peer, route);
590                 route->ksnr_share_count++;
591         } else {
592                 ksocknal_put_route(route);
593                 route2->ksnr_share_count++;
594         }
595
596         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
597
598         return (0);
599 }
600
601 void
602 ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip, int single_share)
603 {
604         ksock_conn_t     *conn;
605         ksock_route_t    *route;
606         struct list_head *tmp;
607         struct list_head *nxt;
608         int               nshared;
609
610         LASSERT (!peer->ksnp_closing);
611
612         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
613                 route = list_entry(tmp, ksock_route_t, ksnr_list);
614
615                 if (single_share && route->ksnr_share_count == 0)
616                         continue;
617
618                 /* no match */
619                 if (!(ip == 0 || route->ksnr_ipaddr == ip))
620                         continue;
621
622                 if (!single_share)
623                         route->ksnr_share_count = 0;
624                 else if (route->ksnr_share_count > 0)
625                         route->ksnr_share_count--;
626
627                 if (route->ksnr_share_count == 0) {
628                         /* This deletes associated conns too */
629                         ksocknal_del_route_locked (route);
630                 }
631                 
632                 if (single_share)
633                         break;
634         }
635
636         nshared = 0;
637         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
638                 route = list_entry(tmp, ksock_route_t, ksnr_list);
639                 nshared += route->ksnr_share_count;
640         }
641                         
642         if (nshared == 0) {
643                 /* remove everything else if there are no explicit entries
644                  * left */
645
646                 list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
647                         route = list_entry(tmp, ksock_route_t, ksnr_list);
648
649                         /* we should only be removing auto-entries */
650                         LASSERT(route->ksnr_share_count == 0);
651                         ksocknal_del_route_locked (route);
652                 }
653
654                 list_for_each_safe (tmp, nxt, &peer->ksnp_conns) {
655                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
656
657                         ksocknal_close_conn_locked(conn, 0);
658                 }
659         }
660                 
661         /* NB peer unlinks itself when last conn/route is removed */
662 }
663
664 int
665 ksocknal_del_peer (ptl_nid_t nid, __u32 ip, int single_share)
666 {
667         unsigned long      flags;
668         struct list_head  *ptmp;
669         struct list_head  *pnxt;
670         ksock_peer_t      *peer;
671         int                lo;
672         int                hi;
673         int                i;
674         int                rc = -ENOENT;
675
676         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
677
678         if (nid != PTL_NID_ANY)
679                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
680         else {
681                 lo = 0;
682                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
683         }
684
685         for (i = lo; i <= hi; i++) {
686                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
687                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
688
689                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
690                                 continue;
691
692                         ksocknal_del_peer_locked (peer, ip, single_share);
693                         rc = 0;                 /* matched! */
694
695                         if (single_share)
696                                 break;
697                 }
698         }
699
700         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
701
702         return (rc);
703 }
704
705 ksock_conn_t *
706 ksocknal_get_conn_by_idx (int index)
707 {
708         ksock_peer_t      *peer;
709         struct list_head  *ptmp;
710         ksock_conn_t      *conn;
711         struct list_head  *ctmp;
712         int                i;
713
714         read_lock (&ksocknal_data.ksnd_global_lock);
715
716         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
717                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
718                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
719
720                         LASSERT (!peer->ksnp_closing);
721
722                         list_for_each (ctmp, &peer->ksnp_conns) {
723                                 if (index-- > 0)
724                                         continue;
725
726                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
727                                 atomic_inc (&conn->ksnc_refcount);
728                                 read_unlock (&ksocknal_data.ksnd_global_lock);
729                                 return (conn);
730                         }
731                 }
732         }
733
734         read_unlock (&ksocknal_data.ksnd_global_lock);
735         return (NULL);
736 }
737
738 int
739 ksocknal_get_conn_addrs (ksock_conn_t *conn)
740 {
741         struct sockaddr_in sin;
742         int                len = sizeof (sin);
743         int                rc;
744         
745         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
746                                             (struct sockaddr *)&sin, &len, 2);
747         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
748         LASSERT (!conn->ksnc_closing);
749
750         if (rc != 0) {
751                 CERROR ("Error %d getting sock peer IP\n", rc);
752                 return rc;
753         }
754
755         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
756         conn->ksnc_port   = ntohs (sin.sin_port);
757
758         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
759                                             (struct sockaddr *)&sin, &len, 0);
760         if (rc != 0) {
761                 CERROR ("Error %d getting sock local IP\n", rc);
762                 return rc;
763         }
764
765         conn->ksnc_myipaddr = ntohl (sin.sin_addr.s_addr);
766
767         return 0;
768 }
769
770 unsigned int
771 ksocknal_sock_irq (struct socket *sock)
772 {
773         int                irq = 0;
774         struct dst_entry  *dst;
775
776         if (!ksocknal_tunables.ksnd_irq_affinity)
777                 return 0;
778
779         dst = sk_dst_get (sock->sk);
780         if (dst != NULL) {
781                 if (dst->dev != NULL) {
782                         irq = dst->dev->irq;
783                         if (irq >= NR_IRQS) {
784                                 CERROR ("Unexpected IRQ %x\n", irq);
785                                 irq = 0;
786                         }
787                 }
788                 dst_release (dst);
789         }
790         
791         return (irq);
792 }
793
794 ksock_sched_t *
795 ksocknal_choose_scheduler_locked (unsigned int irq)
796 {
797         ksock_sched_t    *sched;
798         ksock_irqinfo_t  *info;
799         int               i;
800
801         LASSERT (irq < NR_IRQS);
802         info = &ksocknal_data.ksnd_irqinfo[irq];
803
804         if (irq != 0 &&                         /* hardware NIC */
805             info->ksni_valid) {                 /* already set up */
806                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
807         }
808
809         /* software NIC (irq == 0) || not associated with a scheduler yet.
810          * Choose the CPU with the fewest connections... */
811         sched = &ksocknal_data.ksnd_schedulers[0];
812         for (i = 1; i < ksocknal_data.ksnd_nschedulers; i++)
813                 if (sched->kss_nconns >
814                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
815                         sched = &ksocknal_data.ksnd_schedulers[i];
816
817         if (irq != 0) {                         /* Hardware NIC */
818                 info->ksni_valid = 1;
819                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
820
821                 /* no overflow... */
822                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
823         }
824
825         return (sched);
826 }
827
828 int
829 ksocknal_local_ipvec (__u32 *ipaddrs)
830 {
831         int                i;
832         int                nip;
833
834         read_lock (&ksocknal_data.ksnd_global_lock);
835
836         nip = ksocknal_data.ksnd_ninterfaces;
837         for (i = 0; i < nip; i++) {
838                 LASSERT (i < SOCKNAL_MAX_INTERFACES);
839
840                 ipaddrs[i] = ksocknal_data.ksnd_interfaces[i].ksni_ipaddr;
841                 LASSERT (ipaddrs[i] != 0);
842         }
843         
844         read_unlock (&ksocknal_data.ksnd_global_lock);
845         return (nip);
846 }
847
848 int
849 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
850 {
851         int   best_netmatch = 0;
852         int   best_xor      = 0;
853         int   best          = -1;
854         int   this_xor;
855         int   this_netmatch;
856         int   i;
857         
858         for (i = 0; i < nips; i++) {
859                 if (ips[i] == 0)
860                         continue;
861
862                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
863                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
864                 
865                 if (!(best < 0 ||
866                       best_netmatch < this_netmatch ||
867                       (best_netmatch == this_netmatch && 
868                        best_xor > this_xor)))
869                         continue;
870                 
871                 best = i;
872                 best_netmatch = this_netmatch;
873                 best_xor = this_xor;
874         }
875         
876         LASSERT (best >= 0);
877         return (best);
878 }
879
880 int
881 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
882 {
883         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
884         unsigned long       flags;
885         ksock_interface_t  *iface;
886         ksock_interface_t  *best_iface;
887         int                 n_ips;
888         int                 i;
889         int                 j;
890         int                 k;
891         __u32               ip;
892         __u32               xor;
893         int                 this_netmatch;
894         int                 best_netmatch;
895         int                 best_npeers;
896
897         /* CAVEAT EMPTOR: We do all our interface matching with an
898          * exclusive hold of global lock at IRQ priority.  We're only
899          * expecting to be dealing with small numbers of interfaces, so the
900          * O(n**3)-ness shouldn't matter */
901
902         /* Also note that I'm not going to return more than n_peerips
903          * interfaces, even if I have more myself */
904         
905         write_lock_irqsave(global_lock, flags);
906
907         LASSERT (n_peerips <= SOCKNAL_MAX_INTERFACES);
908         LASSERT (ksocknal_data.ksnd_ninterfaces <= SOCKNAL_MAX_INTERFACES);
909
910         n_ips = MIN(n_peerips, ksocknal_data.ksnd_ninterfaces);
911
912         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
913                 /*              ^ yes really... */
914
915                 /* If we have any new interfaces, first tick off all the
916                  * peer IPs that match old interfaces, then choose new
917                  * interfaces to match the remaining peer IPS. 
918                  * We don't forget interfaces we've stopped using; we might
919                  * start using them again... */
920                 
921                 if (i < peer->ksnp_n_passive_ips) {
922                         /* Old interface. */
923                         ip = peer->ksnp_passive_ips[i];
924                         best_iface = ksocknal_ip2iface(ip);
925
926                         /* peer passive ips are kept up to date */
927                         LASSERT(best_iface != NULL);
928                 } else {
929                         /* choose a new interface */
930                         LASSERT (i == peer->ksnp_n_passive_ips);
931
932                         best_iface = NULL;
933                         best_netmatch = 0;
934                         best_npeers = 0;
935                         
936                         for (j = 0; j < ksocknal_data.ksnd_ninterfaces; j++) {
937                                 iface = &ksocknal_data.ksnd_interfaces[j];
938                                 ip = iface->ksni_ipaddr;
939
940                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
941                                         if (peer->ksnp_passive_ips[k] == ip)
942                                                 break;
943                         
944                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
945                                         continue;
946
947                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
948                                 xor = (ip ^ peerips[k]);
949                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
950
951                                 if (!(best_iface == NULL ||
952                                       best_netmatch < this_netmatch ||
953                                       (best_netmatch == this_netmatch &&
954                                        best_npeers > iface->ksni_npeers)))
955                                         continue;
956
957                                 best_iface = iface;
958                                 best_netmatch = this_netmatch;
959                                 best_npeers = iface->ksni_npeers;
960                         }
961
962                         best_iface->ksni_npeers++;
963                         ip = best_iface->ksni_ipaddr;
964                         peer->ksnp_passive_ips[i] = ip;
965                         peer->ksnp_n_passive_ips = i+1;
966                 }
967                 
968                 LASSERT (best_iface != NULL);
969
970                 /* mark the best matching peer IP used */
971                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
972                 peerips[j] = 0;
973         }
974         
975         /* Overwrite input peer IP addresses */
976         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
977         
978         write_unlock_irqrestore(global_lock, flags);
979         
980         return (n_ips);
981 }
982
983 void
984 ksocknal_create_routes(ksock_peer_t *peer, int port, 
985                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
986 {
987         ksock_route_t      *newroute = NULL;
988         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
989         unsigned long       flags;
990         struct list_head   *rtmp;
991         ksock_route_t      *route;
992         ksock_interface_t  *iface;
993         ksock_interface_t  *best_iface;
994         int                 best_netmatch;
995         int                 this_netmatch;
996         int                 best_nroutes;
997         int                 i;
998         int                 j;
999
1000         /* CAVEAT EMPTOR: We do all our interface matching with an
1001          * exclusive hold of global lock at IRQ priority.  We're only
1002          * expecting to be dealing with small numbers of interfaces, so the
1003          * O(n**3)-ness here shouldn't matter */
1004
1005         write_lock_irqsave(global_lock, flags);
1006
1007         LASSERT (npeer_ipaddrs <= SOCKNAL_MAX_INTERFACES);
1008         
1009         for (i = 0; i < npeer_ipaddrs; i++) {
1010                 if (newroute != NULL) {
1011                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
1012                 } else {
1013                         write_unlock_irqrestore(global_lock, flags);
1014
1015                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
1016                         if (newroute == NULL)
1017                                 return;
1018
1019                         write_lock_irqsave(global_lock, flags);
1020                 }
1021                 
1022                 /* Already got a route? */
1023                 route = NULL;
1024                 list_for_each(rtmp, &peer->ksnp_routes) {
1025                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1026
1027                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
1028                                 break;
1029                         
1030                         route = NULL;
1031                 }
1032                 if (route != NULL)
1033                         continue;
1034
1035                 best_iface = NULL;
1036                 best_nroutes = 0;
1037                 best_netmatch = 0;
1038
1039                 LASSERT (ksocknal_data.ksnd_ninterfaces <= SOCKNAL_MAX_INTERFACES);
1040
1041                 /* Select interface to connect from */
1042                 for (j = 0; j < ksocknal_data.ksnd_ninterfaces; j++) {
1043                         iface = &ksocknal_data.ksnd_interfaces[j];
1044
1045                         /* Using this interface already? */
1046                         list_for_each(rtmp, &peer->ksnp_routes) {
1047                                 route = list_entry(rtmp, ksock_route_t, ksnr_list);
1048
1049                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
1050                                         break;
1051
1052                                 route = NULL;
1053                         }
1054                         if (route != NULL)
1055                                 continue;
1056
1057                         this_netmatch = (((iface->ksni_ipaddr ^ 
1058                                            newroute->ksnr_ipaddr) & 
1059                                            iface->ksni_netmask) == 0) ? 1 : 0;
1060                         
1061                         if (!(best_iface == NULL ||
1062                               best_netmatch < this_netmatch ||
1063                               (best_netmatch == this_netmatch &&
1064                                best_nroutes > iface->ksni_nroutes)))
1065                                 continue;
1066                         
1067                         best_iface = iface;
1068                         best_netmatch = this_netmatch;
1069                         best_nroutes = iface->ksni_nroutes;
1070                 }
1071                 
1072                 if (best_iface == NULL)
1073                         continue;
1074
1075                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
1076                 best_iface->ksni_nroutes++;
1077
1078                 ksocknal_add_route_locked(peer, newroute);
1079                 newroute = NULL;
1080         }
1081         
1082         write_unlock_irqrestore(global_lock, flags);
1083         if (newroute != NULL)
1084                 ksocknal_put_route(newroute);
1085 }
1086
1087 int
1088 ksocknal_create_conn (ksock_route_t *route, struct socket *sock, int type)
1089 {
1090         int                passive = (type == SOCKNAL_CONN_NONE);
1091         rwlock_t          *global_lock = &ksocknal_data.ksnd_global_lock;
1092         __u32              ipaddrs[SOCKNAL_MAX_INTERFACES];
1093         int                nipaddrs;
1094         ptl_nid_t          nid;
1095         struct list_head  *tmp;
1096         __u64              incarnation;
1097         unsigned long      flags;
1098         ksock_conn_t      *conn;
1099         ksock_conn_t      *conn2;
1100         ksock_peer_t      *peer = NULL;
1101         ksock_peer_t      *peer2;
1102         ksock_sched_t     *sched;
1103         unsigned int       irq;
1104         ksock_tx_t        *tx;
1105         int                rc;
1106
1107         /* NB, sock has an associated file since (a) this connection might
1108          * have been created in userland and (b) we need to refcount the
1109          * socket so that we don't close it while I/O is being done on
1110          * it, and sock->file has that pre-cooked... */
1111         LASSERT (sock->file != NULL);
1112         LASSERT (file_count(sock->file) > 0);
1113         LASSERT (route == NULL || !passive);
1114
1115         rc = ksocknal_setup_sock (sock);
1116         if (rc != 0)
1117                 return (rc);
1118
1119         irq = ksocknal_sock_irq (sock);
1120
1121         PORTAL_ALLOC(conn, sizeof(*conn));
1122         if (conn == NULL)
1123                 return (-ENOMEM);
1124
1125         memset (conn, 0, sizeof (*conn));
1126         conn->ksnc_peer = NULL;
1127         conn->ksnc_route = NULL;
1128         conn->ksnc_sock = sock;
1129         conn->ksnc_type = type;
1130         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
1131         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
1132         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
1133
1134         conn->ksnc_rx_ready = 0;
1135         conn->ksnc_rx_scheduled = 0;
1136         ksocknal_new_packet (conn, 0);
1137
1138         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
1139         conn->ksnc_tx_ready = 0;
1140         conn->ksnc_tx_scheduled = 0;
1141         atomic_set (&conn->ksnc_tx_nob, 0);
1142
1143         /* stash conn's local and remote addrs */
1144         rc = ksocknal_get_conn_addrs (conn);
1145         if (rc != 0)
1146                 goto failed_0;
1147
1148         if (!passive) {
1149                 /* Active connection sends HELLO eagerly */
1150                 rc = ksocknal_local_ipvec(ipaddrs);
1151                 if (rc < 0)
1152                         goto failed_0;
1153                 nipaddrs = rc;
1154
1155                 rc = ksocknal_send_hello (conn, ipaddrs, nipaddrs);
1156                 if (rc != 0)
1157                         goto failed_0;
1158         }
1159
1160         /* Find out/confirm peer's NID and connection type and get the
1161          * vector of interfaces she's willing to let me connect to */
1162         nid = (route == NULL) ? PTL_NID_ANY : route->ksnr_peer->ksnp_nid;
1163         rc = ksocknal_recv_hello (conn, &nid, &incarnation, ipaddrs);
1164         if (rc < 0)
1165                 goto failed_0;
1166         nipaddrs = rc;
1167         LASSERT (nid != PTL_NID_ANY);
1168
1169         if (route != NULL) {
1170                 peer = route->ksnr_peer;
1171                 atomic_inc(&peer->ksnp_refcount);
1172         } else {
1173                 peer = ksocknal_create_peer(nid);
1174                 if (peer == NULL) {
1175                         rc = -ENOMEM;
1176                         goto failed_0;
1177                 }
1178
1179                 write_lock_irqsave(global_lock, flags);
1180
1181                 peer2 = ksocknal_find_peer_locked(nid);
1182                 if (peer2 == NULL) {
1183                         /* NB this puts an "empty" peer in the peer
1184                          * table (which takes my ref) */
1185                         list_add_tail(&peer->ksnp_list,
1186                                       ksocknal_nid2peerlist(nid));
1187                 } else  {
1188                         ksocknal_put_peer(peer);
1189                         peer = peer2;
1190                 }
1191                 /* +1 ref for me */
1192                 atomic_inc(&peer->ksnp_refcount);
1193
1194                 write_unlock_irqrestore(global_lock, flags);
1195         }
1196         
1197         if (!passive) {
1198                 ksocknal_create_routes(peer, conn->ksnc_port, 
1199                                        ipaddrs, nipaddrs);
1200                 rc = 0;
1201         } else {
1202                 rc = ksocknal_select_ips(peer, ipaddrs, nipaddrs);
1203                 LASSERT (rc >= 0);
1204                 rc = ksocknal_send_hello (conn, ipaddrs, rc);
1205         }
1206         if (rc < 0)
1207                 goto failed_1;
1208         
1209         write_lock_irqsave (global_lock, flags);
1210
1211         if (peer->ksnp_closing ||
1212             (route != NULL && route->ksnr_deleted)) {
1213                 /* route/peer got closed under me */
1214                 rc = -ESTALE;
1215                 goto failed_2;
1216         }
1217
1218         /* Refuse to duplicate an existing connection (both sides might
1219          * autoconnect at once), unless this is a loopback connection */
1220         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1221                 list_for_each(tmp, &peer->ksnp_conns) {
1222                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1223
1224                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1225                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1226                             conn2->ksnc_type != conn->ksnc_type ||
1227                             conn2->ksnc_incarnation != incarnation)
1228                                 continue;
1229                         
1230                         CWARN("Not creating duplicate connection to "
1231                               "%u.%u.%u.%u type %d\n", 
1232                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_type);
1233                         rc = -EALREADY;
1234                         goto failed_2;
1235                 }
1236         }
1237
1238         /* If the connection created by this route didn't bind to the IP
1239          * address the route connected to, the connection/route matching
1240          * code below probably isn't going to work. */
1241         if (route != NULL &&
1242             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1243                 CERROR("Route "LPX64" %u.%u.%u.%u connected to %u.%u.%u.%u\n",
1244                        peer->ksnp_nid,
1245                        HIPQUAD(route->ksnr_ipaddr),
1246                        HIPQUAD(conn->ksnc_ipaddr));
1247         }
1248
1249         /* Search for a route corresponding to the new connection and
1250          * create an association.  This allows incoming connections created
1251          * by routes in my peer to match my own route entries so I don't
1252          * continually create duplicate routes. */
1253         list_for_each (tmp, &peer->ksnp_routes) {
1254                 route = list_entry(tmp, ksock_route_t, ksnr_list);
1255
1256                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1257                         continue;
1258                 
1259                 ksocknal_associate_route_conn_locked(route, conn);
1260                 break;
1261         }
1262
1263         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
1264         conn->ksnc_incarnation = incarnation;
1265         peer->ksnp_last_alive = jiffies;
1266         peer->ksnp_error = 0;
1267
1268         sched = ksocknal_choose_scheduler_locked (irq);
1269         sched->kss_nconns++;
1270         conn->ksnc_scheduler = sched;
1271
1272         /* Set the deadline for the outgoing HELLO to drain */
1273         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1274         conn->ksnc_tx_deadline = jiffies +
1275                                  ksocknal_tunables.ksnd_io_timeout * HZ;
1276         mb();       /* order with adding to peer's conn list */
1277
1278         list_add (&conn->ksnc_list, &peer->ksnp_conns);
1279         atomic_inc (&conn->ksnc_refcount);
1280
1281         /* NB my callbacks block while I hold ksnd_global_lock */
1282         sock->sk->sk_user_data = conn;
1283         sock->sk->sk_data_ready = ksocknal_data_ready;
1284         sock->sk->sk_write_space = ksocknal_write_space;
1285
1286         /* Take all the packets blocking for a connection.
1287          * NB, it might be nicer to share these blocked packets among any
1288          * other connections that are becoming established. */
1289         while (!list_empty (&peer->ksnp_tx_queue)) {
1290                 tx = list_entry (peer->ksnp_tx_queue.next,
1291                                  ksock_tx_t, tx_list);
1292
1293                 list_del (&tx->tx_list);
1294                 ksocknal_queue_tx_locked (tx, conn);
1295         }
1296
1297         rc = ksocknal_close_stale_conns_locked(peer, incarnation);
1298         if (rc != 0)
1299                 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
1300                         rc, conn->ksnc_peer->ksnp_nid,
1301                         HIPQUAD(conn->ksnc_ipaddr));
1302
1303         write_unlock_irqrestore (global_lock, flags);
1304
1305         ksocknal_bind_irq (irq);
1306
1307         /* Call the callbacks right now to get things going. */
1308         if (ksocknal_getconnsock(conn) == 0) {
1309                 ksocknal_data_ready (sock->sk, 0);
1310                 ksocknal_write_space (sock->sk);
1311                 ksocknal_putconnsock(conn);
1312         }
1313
1314         CWARN("New conn nid:"LPX64" [type:%d] %u.%u.%u.%u -> %u.%u.%u.%u/%d"
1315               " incarnation:"LPX64" sched[%d]/%d\n",
1316               nid, conn->ksnc_type, HIPQUAD(conn->ksnc_myipaddr), 
1317               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port, incarnation,
1318               (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
1319
1320         ksocknal_put_conn (conn);
1321         return (0);
1322
1323  failed_2:
1324         if (!peer->ksnp_closing &&
1325             list_empty (&peer->ksnp_conns) &&
1326             list_empty (&peer->ksnp_routes))
1327                 ksocknal_unlink_peer_locked(peer);
1328         write_unlock_irqrestore(global_lock, flags);
1329
1330  failed_1:
1331         ksocknal_put_peer (peer);
1332
1333  failed_0:
1334         PORTAL_FREE (conn, sizeof(*conn));
1335
1336         LASSERT (rc != 0);
1337         return (rc);
1338 }
1339
1340 void
1341 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
1342 {
1343         /* This just does the immmediate housekeeping, and queues the
1344          * connection for the reaper to terminate.
1345          * Caller holds ksnd_global_lock exclusively in irq context */
1346         ksock_peer_t      *peer = conn->ksnc_peer;
1347         ksock_route_t     *route;
1348         ksock_conn_t      *conn2;
1349         struct list_head  *tmp;
1350
1351         LASSERT (peer->ksnp_error == 0);
1352         LASSERT (!conn->ksnc_closing);
1353         conn->ksnc_closing = 1;
1354         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
1355         
1356         /* ksnd_deathrow_conns takes over peer's ref */
1357         list_del (&conn->ksnc_list);
1358
1359         route = conn->ksnc_route;
1360         if (route != NULL) {
1361                 /* dissociate conn from route... */
1362                 LASSERT (!route->ksnr_deleted);
1363                 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
1364                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
1365
1366                 conn2 = NULL;
1367                 list_for_each(tmp, &peer->ksnp_conns) {
1368                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1369                         
1370                         if (conn2->ksnc_route == route &&
1371                             conn2->ksnc_type == conn->ksnc_type)
1372                                 break;
1373                         
1374                         conn2 = NULL;
1375                 }
1376                 if (conn2 == NULL)
1377                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1378
1379                 conn->ksnc_route = NULL;
1380
1381 #if 0           /* irrelevent with only eager routes */
1382                 list_del (&route->ksnr_list);   /* make route least favourite */
1383                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
1384 #endif
1385                 ksocknal_put_route (route);     /* drop conn's ref on route */
1386         }
1387
1388         if (list_empty (&peer->ksnp_conns)) {
1389                 /* No more connections to this peer */
1390
1391                 peer->ksnp_error = error;       /* stash last conn close reason */
1392
1393                 if (list_empty (&peer->ksnp_routes)) {
1394                         /* I've just closed last conn belonging to a
1395                          * non-autoconnecting peer */
1396                         ksocknal_unlink_peer_locked (peer);
1397                 }
1398         }
1399
1400         spin_lock (&ksocknal_data.ksnd_reaper_lock);
1401
1402         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1403         wake_up (&ksocknal_data.ksnd_reaper_waitq);
1404                 
1405         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
1406 }
1407
1408 void
1409 ksocknal_terminate_conn (ksock_conn_t *conn)
1410 {
1411         /* This gets called by the reaper (guaranteed thread context) to
1412          * disengage the socket from its callbacks and close it.
1413          * ksnc_refcount will eventually hit zero, and then the reaper will
1414          * destroy it. */
1415         unsigned long   flags;
1416         ksock_peer_t   *peer = conn->ksnc_peer;
1417         ksock_sched_t  *sched = conn->ksnc_scheduler;
1418         struct timeval  now;
1419         time_t          then = 0;
1420         int             notify = 0;
1421
1422         LASSERT(conn->ksnc_closing);
1423
1424         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1425         spin_lock_irqsave(&sched->kss_lock, flags);
1426
1427         if (!conn->ksnc_tx_scheduled &&
1428             !list_empty(&conn->ksnc_tx_queue)){
1429                 list_add_tail (&conn->ksnc_tx_list,
1430                                &sched->kss_tx_conns);
1431                 /* a closing conn is always ready to tx */
1432                 conn->ksnc_tx_ready = 1;
1433                 conn->ksnc_tx_scheduled = 1;
1434                 /* extra ref for scheduler */
1435                 atomic_inc (&conn->ksnc_refcount);
1436
1437                 wake_up (&sched->kss_waitq);
1438         }
1439
1440         spin_unlock_irqrestore (&sched->kss_lock, flags);
1441
1442         /* serialise with callbacks */
1443         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1444
1445         /* Remove conn's network callbacks.
1446          * NB I _have_ to restore the callback, rather than storing a noop,
1447          * since the socket could survive past this module being unloaded!! */
1448         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
1449         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
1450
1451         /* A callback could be in progress already; they hold a read lock
1452          * on ksnd_global_lock (to serialise with me) and NOOP if
1453          * sk_user_data is NULL. */
1454         conn->ksnc_sock->sk->sk_user_data = NULL;
1455
1456         /* OK, so this conn may not be completely disengaged from its
1457          * scheduler yet, but it _has_ committed to terminate... */
1458         conn->ksnc_scheduler->kss_nconns--;
1459
1460         if (peer->ksnp_error != 0) {
1461                 /* peer's last conn closed in error */
1462                 LASSERT (list_empty (&peer->ksnp_conns));
1463                 
1464                 /* convert peer's last-known-alive timestamp from jiffies */
1465                 do_gettimeofday (&now);
1466                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
1467                 notify = 1;
1468         }
1469         
1470         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1471
1472         /* The socket is closed on the final put; either here, or in
1473          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1474          * when the connection was established, this will close the socket
1475          * immediately, aborting anything buffered in it. Any hung
1476          * zero-copy transmits will therefore complete in finite time. */
1477         ksocknal_putconnsock (conn);
1478
1479         if (notify)
1480                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
1481                             0, then);
1482 }
1483
1484 void
1485 ksocknal_destroy_conn (ksock_conn_t *conn)
1486 {
1487         /* Final coup-de-grace of the reaper */
1488         CDEBUG (D_NET, "connection %p\n", conn);
1489
1490         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
1491         LASSERT (conn->ksnc_route == NULL);
1492         LASSERT (!conn->ksnc_tx_scheduled);
1493         LASSERT (!conn->ksnc_rx_scheduled);
1494         LASSERT (list_empty(&conn->ksnc_tx_queue));
1495
1496         /* complete current receive if any */
1497         switch (conn->ksnc_rx_state) {
1498         case SOCKNAL_RX_BODY:
1499                 CERROR("Completing partial receive from "LPX64
1500                        ", ip %d.%d.%d.%d:%d, with error\n",
1501                        conn->ksnc_peer->ksnp_nid,
1502                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1503                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
1504                 break;
1505         case SOCKNAL_RX_BODY_FWD:
1506                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1507                 break;
1508         case SOCKNAL_RX_HEADER:
1509         case SOCKNAL_RX_SLOP:
1510                 break;
1511         default:
1512                 LBUG ();
1513                 break;
1514         }
1515
1516         ksocknal_put_peer (conn->ksnc_peer);
1517
1518         PORTAL_FREE (conn, sizeof (*conn));
1519         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
1520 }
1521
1522 void
1523 ksocknal_put_conn (ksock_conn_t *conn)
1524 {
1525         unsigned long flags;
1526
1527         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1528                 conn, conn->ksnc_peer->ksnp_nid,
1529                 atomic_read (&conn->ksnc_refcount));
1530
1531         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1532         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1533                 return;
1534
1535         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1536
1537         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1538         wake_up (&ksocknal_data.ksnd_reaper_waitq);
1539
1540         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1541 }
1542
1543 int
1544 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1545 {
1546         ksock_conn_t       *conn;
1547         struct list_head   *ctmp;
1548         struct list_head   *cnxt;
1549         int                 count = 0;
1550
1551         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1552                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1553
1554                 if (ipaddr == 0 ||
1555                     conn->ksnc_ipaddr == ipaddr) {
1556                         count++;
1557                         ksocknal_close_conn_locked (conn, why);
1558                 }
1559         }
1560
1561         return (count);
1562 }
1563
1564 int
1565 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1566 {
1567         ksock_conn_t       *conn;
1568         struct list_head   *ctmp;
1569         struct list_head   *cnxt;
1570         int                 count = 0;
1571
1572         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1573                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1574
1575                 if (conn->ksnc_incarnation == incarnation)
1576                         continue;
1577
1578                 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1579                       "incarnation:"LPX64"("LPX64")\n",
1580                       peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1581                       conn->ksnc_incarnation, incarnation);
1582                 
1583                 count++;
1584                 ksocknal_close_conn_locked (conn, -ESTALE);
1585         }
1586
1587         return (count);
1588 }
1589
1590 int
1591 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why) 
1592 {
1593         ksock_peer_t     *peer = conn->ksnc_peer;
1594         __u32             ipaddr = conn->ksnc_ipaddr;
1595         unsigned long     flags;
1596         int               count;
1597
1598         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1599
1600         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1601         
1602         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1603
1604         return (count);
1605 }
1606
1607 int
1608 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1609 {
1610         unsigned long       flags;
1611         ksock_peer_t       *peer;
1612         struct list_head   *ptmp;
1613         struct list_head   *pnxt;
1614         int                 lo;
1615         int                 hi;
1616         int                 i;
1617         int                 count = 0;
1618
1619         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1620
1621         if (nid != PTL_NID_ANY)
1622                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1623         else {
1624                 lo = 0;
1625                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1626         }
1627
1628         for (i = lo; i <= hi; i++) {
1629                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1630
1631                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1632
1633                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1634                                 continue;
1635
1636                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1637                 }
1638         }
1639
1640         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1641
1642         /* wildcards always succeed */
1643         if (nid == PTL_NID_ANY || ipaddr == 0)
1644                 return (0);
1645         
1646         return (count == 0 ? -ENOENT : 0);
1647 }
1648
1649 void
1650 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1651 {
1652         /* The router is telling me she's been notified of a change in
1653          * gateway state.... */
1654
1655         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1656
1657         if (!alive) {
1658                 /* If the gateway crashed, close all open connections... */
1659                 ksocknal_close_matching_conns (gw_nid, 0);
1660                 return;
1661         }
1662         
1663         /* ...otherwise do nothing.  We can only establish new connections
1664          * if we have autroutes, and these connect on demand. */
1665 }
1666
1667 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1668 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1669 {
1670         return &(sk->tp_pinfo.af_tcp);
1671 }
1672 #else
1673 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1674 {
1675         struct tcp_sock *s = (struct tcp_sock *)sk;
1676         return &s->tcp;
1677 }
1678 #endif
1679
1680 void
1681 ksocknal_push_conn (ksock_conn_t *conn)
1682 {
1683         struct sock    *sk;
1684         struct tcp_opt *tp;
1685         int             nonagle;
1686         int             val = 1;
1687         int             rc;
1688         mm_segment_t    oldmm;
1689
1690         rc = ksocknal_getconnsock (conn);
1691         if (rc != 0)                            /* being shut down */
1692                 return;
1693         
1694         sk = conn->ksnc_sock->sk;
1695         tp = sock2tcp_opt(sk);
1696         
1697         lock_sock (sk);
1698         nonagle = tp->nonagle;
1699         tp->nonagle = 1;
1700         release_sock (sk);
1701
1702         oldmm = get_fs ();
1703         set_fs (KERNEL_DS);
1704
1705         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1706                                       (char *)&val, sizeof (val));
1707         LASSERT (rc == 0);
1708
1709         set_fs (oldmm);
1710
1711         lock_sock (sk);
1712         tp->nonagle = nonagle;
1713         release_sock (sk);
1714
1715         ksocknal_putconnsock (conn);
1716 }
1717
1718 void
1719 ksocknal_push_peer (ksock_peer_t *peer)
1720 {
1721         int               index;
1722         int               i;
1723         struct list_head *tmp;
1724         ksock_conn_t     *conn;
1725
1726         for (index = 0; ; index++) {
1727                 read_lock (&ksocknal_data.ksnd_global_lock);
1728
1729                 i = 0;
1730                 conn = NULL;
1731
1732                 list_for_each (tmp, &peer->ksnp_conns) {
1733                         if (i++ == index) {
1734                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1735                                 atomic_inc (&conn->ksnc_refcount);
1736                                 break;
1737                         }
1738                 }
1739
1740                 read_unlock (&ksocknal_data.ksnd_global_lock);
1741
1742                 if (conn == NULL)
1743                         break;
1744
1745                 ksocknal_push_conn (conn);
1746                 ksocknal_put_conn (conn);
1747         }
1748 }
1749
1750 int
1751 ksocknal_push (ptl_nid_t nid)
1752 {
1753         ksock_peer_t      *peer;
1754         struct list_head  *tmp;
1755         int                index;
1756         int                i;
1757         int                j;
1758         int                rc = -ENOENT;
1759
1760         if (nid != PTL_NID_ANY) {
1761                 peer = ksocknal_get_peer (nid);
1762
1763                 if (peer != NULL) {
1764                         rc = 0;
1765                         ksocknal_push_peer (peer);
1766                         ksocknal_put_peer (peer);
1767                 }
1768                 return (rc);
1769         }
1770
1771         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1772                 for (j = 0; ; j++) {
1773                         read_lock (&ksocknal_data.ksnd_global_lock);
1774
1775                         index = 0;
1776                         peer = NULL;
1777
1778                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1779                                 if (index++ == j) {
1780                                         peer = list_entry(tmp, ksock_peer_t,
1781                                                           ksnp_list);
1782                                         atomic_inc (&peer->ksnp_refcount);
1783                                         break;
1784                                 }
1785                         }
1786
1787                         read_unlock (&ksocknal_data.ksnd_global_lock);
1788
1789                         if (peer != NULL) {
1790                                 rc = 0;
1791                                 ksocknal_push_peer (peer);
1792                                 ksocknal_put_peer (peer);
1793                         }
1794                 }
1795
1796         }
1797
1798         return (rc);
1799 }
1800
1801 int
1802 ksocknal_add_interface(__u32 ipaddress, __u32 netmask)
1803 {
1804         unsigned long      flags;
1805         ksock_interface_t *iface;
1806         int                rc;
1807         int                i;
1808         int                j;
1809         struct list_head  *ptmp;
1810         ksock_peer_t      *peer;
1811         struct list_head  *rtmp;
1812         ksock_route_t     *route;
1813
1814         if (ipaddress == 0 ||
1815             netmask == 0)
1816                 return (-EINVAL);
1817
1818         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1819
1820         iface = ksocknal_ip2iface(ipaddress);
1821         if (iface != NULL) {
1822                 /* silently ignore dups */
1823                 rc = 0;
1824         } else if (ksocknal_data.ksnd_ninterfaces == SOCKNAL_MAX_INTERFACES) {
1825                 rc = -ENOSPC;
1826         } else {
1827                 iface = &ksocknal_data.ksnd_interfaces[ksocknal_data.ksnd_ninterfaces++];
1828
1829                 iface->ksni_ipaddr = ipaddress;
1830                 iface->ksni_netmask = netmask;
1831                 iface->ksni_nroutes = 0;
1832                 iface->ksni_npeers = 0;
1833
1834                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1835                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1836                                 peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1837
1838                                 for (j = 0; i < peer->ksnp_n_passive_ips; j++)
1839                                         if (peer->ksnp_passive_ips[j] == ipaddress)
1840                                                 iface->ksni_npeers++;
1841                                 
1842                                 list_for_each(rtmp, &peer->ksnp_routes) {
1843                                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1844                                         
1845                                         if (route->ksnr_myipaddr == ipaddress)
1846                                                 iface->ksni_nroutes++;
1847                                 }
1848                         }
1849                 }
1850
1851                 rc = 0;
1852                 /* NB only new connections will pay attention to the new interface! */
1853         }
1854         
1855         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1856
1857         return (rc);
1858 }
1859
1860 void
1861 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
1862 {
1863         struct list_head   *tmp;
1864         struct list_head   *nxt;
1865         ksock_route_t      *route;
1866         ksock_conn_t       *conn;
1867         int                 i;
1868         int                 j;
1869
1870         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
1871                 if (peer->ksnp_passive_ips[i] == ipaddr) {
1872                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
1873                                 peer->ksnp_passive_ips[j-1] =
1874                                         peer->ksnp_passive_ips[j];
1875                         peer->ksnp_n_passive_ips--;
1876                         break;
1877                 }
1878
1879         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
1880                 route = list_entry (tmp, ksock_route_t, ksnr_list);
1881                 
1882                 if (route->ksnr_myipaddr != ipaddr)
1883                         continue;
1884                 
1885                 if (route->ksnr_share_count != 0) {
1886                         /* Manually created; keep, but unbind */
1887                         route->ksnr_myipaddr = 0;
1888                 } else {
1889                         ksocknal_del_route_locked(route);
1890                 }
1891         }
1892         
1893         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
1894                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1895                 
1896                 if (conn->ksnc_myipaddr == ipaddr)
1897                         ksocknal_close_conn_locked (conn, 0);
1898         }
1899 }
1900
1901 int
1902 ksocknal_del_interface(__u32 ipaddress)
1903 {
1904         int                rc = -ENOENT;
1905         unsigned long      flags;
1906         struct list_head  *tmp;
1907         struct list_head  *nxt;
1908         ksock_peer_t      *peer;
1909         __u32              this_ip;
1910         int                i;
1911         int                j;
1912
1913         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1914
1915         for (i = 0; i < ksocknal_data.ksnd_ninterfaces; i++) {
1916                 this_ip = ksocknal_data.ksnd_interfaces[i].ksni_ipaddr;
1917
1918                 if (!(ipaddress == 0 ||
1919                       ipaddress == this_ip))
1920                         continue;
1921
1922                 rc = 0;
1923
1924                 for (j = i+1; j < ksocknal_data.ksnd_ninterfaces; j++)
1925                         ksocknal_data.ksnd_interfaces[j-1] =
1926                                 ksocknal_data.ksnd_interfaces[j];
1927                 
1928                 ksocknal_data.ksnd_ninterfaces--;
1929
1930                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
1931                         list_for_each_safe(tmp, nxt, &ksocknal_data.ksnd_peers[j]) {
1932                                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
1933                                 
1934                                 ksocknal_peer_del_interface_locked(peer, this_ip);
1935                         }
1936                 }
1937         }
1938         
1939         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1940         
1941         return (rc);
1942 }
1943
1944 int
1945 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1946 {
1947         int rc;
1948
1949         switch(pcfg->pcfg_command) {
1950         case NAL_CMD_GET_INTERFACE: {
1951                 ksock_interface_t *iface;
1952
1953                 read_lock (&ksocknal_data.ksnd_global_lock);
1954
1955                 if (pcfg->pcfg_count < 0 ||
1956                     pcfg->pcfg_count >= ksocknal_data.ksnd_ninterfaces) {
1957                         rc = -ENOENT;
1958                 } else {
1959                         rc = 0;
1960                         iface = &ksocknal_data.ksnd_interfaces[pcfg->pcfg_count];
1961
1962                         pcfg->pcfg_id    = iface->ksni_ipaddr;
1963                         pcfg->pcfg_misc  = iface->ksni_netmask;
1964                         pcfg->pcfg_fd    = iface->ksni_npeers;
1965                         pcfg->pcfg_count = iface->ksni_nroutes;
1966                 }
1967                 
1968                 read_unlock (&ksocknal_data.ksnd_global_lock);
1969                 break;
1970         }
1971         case NAL_CMD_ADD_INTERFACE: {
1972                 rc = ksocknal_add_interface(pcfg->pcfg_id, /* IP address */
1973                                             pcfg->pcfg_misc); /* net mask */
1974                 break;
1975         }
1976         case NAL_CMD_DEL_INTERFACE: {
1977                 rc = ksocknal_del_interface(pcfg->pcfg_id); /* IP address */
1978                 break;
1979         }
1980         case NAL_CMD_GET_PEER: {
1981                 ptl_nid_t    nid = 0;
1982                 __u32        myip = 0;
1983                 __u32        ip = 0;
1984                 int          port = 0;
1985                 int          conn_count = 0;
1986                 int          share_count = 0;
1987                 
1988                 rc = ksocknal_get_peer_info(pcfg->pcfg_count, &nid,
1989                                             &myip, &ip, &port,
1990                                             &conn_count,  &share_count);
1991                 pcfg->pcfg_nid   = nid;
1992                 pcfg->pcfg_size  = myip;
1993                 pcfg->pcfg_id    = ip;
1994                 pcfg->pcfg_misc  = port;
1995                 pcfg->pcfg_count = conn_count;
1996                 pcfg->pcfg_wait  = share_count;
1997                 break;
1998         }
1999         case NAL_CMD_ADD_PEER: {
2000                 rc = ksocknal_add_peer (pcfg->pcfg_nid, 
2001                                         pcfg->pcfg_id, /* IP */
2002                                         pcfg->pcfg_misc); /* port */
2003                 break;
2004         }
2005         case NAL_CMD_DEL_PEER: {
2006                 rc = ksocknal_del_peer (pcfg->pcfg_nid, 
2007                                         pcfg->pcfg_id, /* IP */
2008                                         pcfg->pcfg_flags); /* single_share? */
2009                 break;
2010         }
2011         case NAL_CMD_GET_CONN: {
2012                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
2013
2014                 if (conn == NULL)
2015                         rc = -ENOENT;
2016                 else {
2017                         int   txmem;
2018                         int   rxmem;
2019                         int   nagle;
2020
2021                         ksocknal_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
2022
2023                         rc = 0;
2024                         pcfg->pcfg_nid    = conn->ksnc_peer->ksnp_nid;
2025                         pcfg->pcfg_id     = conn->ksnc_ipaddr;
2026                         pcfg->pcfg_misc   = conn->ksnc_port;
2027                         pcfg->pcfg_fd     = conn->ksnc_myipaddr;
2028                         pcfg->pcfg_flags  = conn->ksnc_type;
2029                         pcfg->pcfg_gw_nal = conn->ksnc_scheduler - 
2030                                             ksocknal_data.ksnd_schedulers;
2031                         pcfg->pcfg_count  = txmem;
2032                         pcfg->pcfg_size   = rxmem;
2033                         pcfg->pcfg_wait   = nagle;
2034                         ksocknal_put_conn (conn);
2035                 }
2036                 break;
2037         }
2038         case NAL_CMD_REGISTER_PEER_FD: {
2039                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
2040                 int            type = pcfg->pcfg_misc;
2041
2042                 if (sock == NULL)
2043                         break;
2044
2045                 switch (type) {
2046                 case SOCKNAL_CONN_NONE:
2047                 case SOCKNAL_CONN_ANY:
2048                 case SOCKNAL_CONN_CONTROL:
2049                 case SOCKNAL_CONN_BULK_IN:
2050                 case SOCKNAL_CONN_BULK_OUT:
2051                         rc = ksocknal_create_conn(NULL, sock, type);
2052                         break;
2053                 default:
2054                         rc = -EINVAL;
2055                         break;
2056                 }
2057                 if (rc != 0)
2058                         fput (sock->file);
2059                 break;
2060         }
2061         case NAL_CMD_CLOSE_CONNECTION: {
2062                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid, 
2063                                                     pcfg->pcfg_id);
2064                 break;
2065         }
2066         case NAL_CMD_REGISTER_MYNID: {
2067                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
2068                 break;
2069         }
2070         case NAL_CMD_PUSH_CONNECTION: {
2071                 rc = ksocknal_push (pcfg->pcfg_nid);
2072                 break;
2073         }
2074         default:
2075                 rc = -EINVAL;
2076                 break;
2077         }
2078
2079         return rc;
2080 }
2081
2082 void
2083 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
2084 {
2085         int          npages = p->fmp_buff_pages;
2086         ksock_fmb_t *fmb;
2087         int          i;
2088
2089         LASSERT (list_empty(&p->fmp_blocked_conns));
2090         LASSERT (p->fmp_nactive_fmbs == 0);
2091         
2092         while (!list_empty(&p->fmp_idle_fmbs)) {
2093
2094                 fmb = list_entry(p->fmp_idle_fmbs.next,
2095                                  ksock_fmb_t, fmb_list);
2096                 
2097                 for (i = 0; i < npages; i++)
2098                         if (fmb->fmb_kiov[i].kiov_page != NULL)
2099                                 __free_page(fmb->fmb_kiov[i].kiov_page);
2100
2101                 list_del(&fmb->fmb_list);
2102                 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
2103         }
2104 }
2105
2106 void
2107 ksocknal_free_buffers (void)
2108 {
2109         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
2110         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
2111
2112         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
2113
2114         if (ksocknal_data.ksnd_schedulers != NULL)
2115                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
2116                              sizeof (ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2117
2118         PORTAL_FREE (ksocknal_data.ksnd_peers,
2119                      sizeof (struct list_head) * 
2120                      ksocknal_data.ksnd_peer_hash_size);
2121 }
2122
2123 void
2124 ksocknal_api_shutdown (nal_t *nal)
2125 {
2126         ksock_sched_t *sched;
2127         int            i;
2128
2129         if (nal->nal_refct != 0) {
2130                 /* This module got the first ref */
2131                 PORTAL_MODULE_UNUSE;
2132                 return;
2133         }
2134
2135         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
2136                atomic_read (&portal_kmemory));
2137
2138         LASSERT(nal == &ksocknal_api);
2139
2140         switch (ksocknal_data.ksnd_init) {
2141         default:
2142                 LASSERT (0);
2143
2144         case SOCKNAL_INIT_ALL:
2145                 libcfs_nal_cmd_unregister(SOCKNAL);
2146
2147                 ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB;
2148                 /* fall through */
2149
2150         case SOCKNAL_INIT_LIB:
2151                 /* No more calls to ksocknal_cmd() to create new
2152                  * autoroutes/connections since we're being unloaded. */
2153
2154                 /* Delete all peers */
2155                 ksocknal_del_peer(PTL_NID_ANY, 0, 0);
2156
2157                 /* Wait for all peer state to clean up */
2158                 i = 2;
2159                 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
2160                         i++;
2161                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2162                                "waiting for %d peers to disconnect\n",
2163                                atomic_read (&ksocknal_data.ksnd_npeers));
2164                         set_current_state (TASK_UNINTERRUPTIBLE);
2165                         schedule_timeout (HZ);
2166                 }
2167
2168                 /* Tell lib we've stopped calling into her. */
2169                 lib_fini(&ksocknal_lib);
2170
2171                 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2172                 /* fall through */
2173
2174         case SOCKNAL_INIT_DATA:
2175                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
2176                 LASSERT (ksocknal_data.ksnd_peers != NULL);
2177                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2178                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
2179                 }
2180                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
2181                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
2182                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2183                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
2184                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
2185
2186                 if (ksocknal_data.ksnd_schedulers != NULL)
2187                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2188                                 ksock_sched_t *kss =
2189                                         &ksocknal_data.ksnd_schedulers[i];
2190
2191                                 LASSERT (list_empty (&kss->kss_tx_conns));
2192                                 LASSERT (list_empty (&kss->kss_rx_conns));
2193                                 LASSERT (kss->kss_nconns == 0);
2194                         }
2195
2196                 /* stop router calling me */
2197                 kpr_shutdown (&ksocknal_data.ksnd_router);
2198
2199                 /* flag threads to terminate; wake and wait for them to die */
2200                 ksocknal_data.ksnd_shuttingdown = 1;
2201                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
2202                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
2203
2204                 for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2205                         sched = &ksocknal_data.ksnd_schedulers[i];
2206                         wake_up_all(&sched->kss_waitq);
2207                 }
2208
2209                 i = 4;
2210                 read_lock(&ksocknal_data.ksnd_global_lock);
2211                 while (ksocknal_data.ksnd_nthreads != 0) {
2212                         i++;
2213                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2214                                "waiting for %d threads to terminate\n",
2215                                 ksocknal_data.ksnd_nthreads);
2216                         read_unlock(&ksocknal_data.ksnd_global_lock);
2217                         set_current_state (TASK_UNINTERRUPTIBLE);
2218                         schedule_timeout (HZ);
2219                         read_lock(&ksocknal_data.ksnd_global_lock);
2220                 }
2221                 read_unlock(&ksocknal_data.ksnd_global_lock);
2222
2223                 kpr_deregister (&ksocknal_data.ksnd_router);
2224
2225                 ksocknal_free_buffers();
2226
2227                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2228                 /* fall through */
2229
2230         case SOCKNAL_INIT_NOTHING:
2231                 break;
2232         }
2233
2234         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
2235                atomic_read (&portal_kmemory));
2236
2237         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
2238                atomic_read(&portal_kmemory));
2239 }
2240
2241
2242 void
2243 ksocknal_init_incarnation (void)
2244 {
2245         struct timeval tv;
2246
2247         /* The incarnation number is the time this module loaded and it
2248          * identifies this particular instance of the socknal.  Hopefully
2249          * we won't be able to reboot more frequently than 1MHz for the
2250          * forseeable future :) */
2251         
2252         do_gettimeofday(&tv);
2253         
2254         ksocknal_data.ksnd_incarnation = 
2255                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2256 }
2257
2258 int
2259 ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
2260                       ptl_ni_limits_t *requested_limits,
2261                       ptl_ni_limits_t *actual_limits)
2262 {
2263         ptl_process_id_t  process_id;
2264         int               pkmem = atomic_read(&portal_kmemory);
2265         int               rc;
2266         int               i;
2267         int               j;
2268
2269         LASSERT (nal == &ksocknal_api);
2270
2271         if (nal->nal_refct != 0) {
2272                 if (actual_limits != NULL)
2273                         *actual_limits = ksocknal_lib.libnal_ni.ni_actual_limits;
2274                 /* This module got the first ref */
2275                 PORTAL_MODULE_USE;
2276                 return (PTL_OK);
2277         }
2278
2279         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2280
2281         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2282
2283         ksocknal_init_incarnation();
2284         
2285         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2286         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
2287                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
2288         if (ksocknal_data.ksnd_peers == NULL)
2289                 return (-ENOMEM);
2290
2291         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2292                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2293
2294         rwlock_init(&ksocknal_data.ksnd_global_lock);
2295
2296         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
2297         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
2298         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
2299         ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
2300
2301         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
2302         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
2303         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
2304         ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
2305
2306         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
2307         INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
2308         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
2309         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
2310         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2311
2312         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
2313         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
2314         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
2315
2316         /* NB memset above zeros whole of ksocknal_data, including
2317          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
2318
2319         /* flag lists/ptrs/locks initialised */
2320         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2321
2322         ksocknal_data.ksnd_nschedulers = ksocknal_nsched();
2323         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
2324                      sizeof(ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2325         if (ksocknal_data.ksnd_schedulers == NULL) {
2326                 ksocknal_api_shutdown (nal);
2327                 return (-ENOMEM);
2328         }
2329
2330         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2331                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
2332
2333                 spin_lock_init (&kss->kss_lock);
2334                 INIT_LIST_HEAD (&kss->kss_rx_conns);
2335                 INIT_LIST_HEAD (&kss->kss_tx_conns);
2336 #if SOCKNAL_ZC
2337                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
2338 #endif
2339                 init_waitqueue_head (&kss->kss_waitq);
2340         }
2341
2342         /* NB we have to wait to be told our true NID... */
2343         process_id.pid = requested_pid; 
2344         process_id.nid = 0;
2345         
2346         rc = lib_init(&ksocknal_lib, nal, process_id,
2347                       requested_limits, actual_limits);
2348         if (rc != PTL_OK) {
2349                 CERROR("lib_init failed: error %d\n", rc);
2350                 ksocknal_api_shutdown (nal);
2351                 return (rc);
2352         }
2353
2354         ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB; // flag lib_init() called
2355
2356         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2357                 rc = ksocknal_thread_start (ksocknal_scheduler,
2358                                             &ksocknal_data.ksnd_schedulers[i]);
2359                 if (rc != 0) {
2360                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
2361                                i, rc);
2362                         ksocknal_api_shutdown (nal);
2363                         return (rc);
2364                 }
2365         }
2366
2367         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
2368                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
2369                 if (rc != 0) {
2370                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
2371                         ksocknal_api_shutdown (nal);
2372                         return (rc);
2373                 }
2374         }
2375
2376         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
2377         if (rc != 0) {
2378                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2379                 ksocknal_api_shutdown (nal);
2380                 return (rc);
2381         }
2382
2383         rc = kpr_register(&ksocknal_data.ksnd_router,
2384                           &ksocknal_router_interface);
2385         if (rc != 0) {
2386                 CDEBUG(D_NET, "Can't initialise routing interface "
2387                        "(rc = %d): not routing\n", rc);
2388         } else {
2389                 /* Only allocate forwarding buffers if there's a router */
2390
2391                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
2392                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
2393                         ksock_fmb_t      *fmb;
2394                         ksock_fmb_pool_t *pool;
2395                         
2396
2397                         if (i < SOCKNAL_SMALL_FWD_NMSGS)
2398                                 pool = &ksocknal_data.ksnd_small_fmp;
2399                         else
2400                                 pool = &ksocknal_data.ksnd_large_fmp;
2401                         
2402                         PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t, 
2403                                                    fmb_kiov[pool->fmp_buff_pages]));
2404                         if (fmb == NULL) {
2405                                 ksocknal_api_shutdown(nal);
2406                                 return (-ENOMEM);
2407                         }
2408
2409                         fmb->fmb_pool = pool;
2410                         
2411                         for (j = 0; j < pool->fmp_buff_pages; j++) {
2412                                 fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);
2413
2414                                 if (fmb->fmb_kiov[j].kiov_page == NULL) {
2415                                         ksocknal_api_shutdown (nal);
2416                                         return (-ENOMEM);
2417                                 }
2418
2419                                 LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
2420                         }
2421
2422                         list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
2423                 }
2424         }
2425
2426         rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
2427         if (rc != 0) {
2428                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
2429                 ksocknal_api_shutdown (nal);
2430                 return (rc);
2431         }
2432
2433         /* flag everything initialised */
2434         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2435
2436         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
2437                "(Routing %s, initial mem %d, incarnation "LPX64")\n",
2438                kpr_routing (&ksocknal_data.ksnd_router) ?
2439                "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
2440
2441         return (0);
2442 }
2443
2444 void __exit
2445 ksocknal_module_fini (void)
2446 {
2447 #ifdef CONFIG_SYSCTL
2448         if (ksocknal_tunables.ksnd_sysctl != NULL)
2449                 unregister_sysctl_table (ksocknal_tunables.ksnd_sysctl);
2450 #endif
2451         PtlNIFini(ksocknal_ni);
2452
2453         ptl_unregister_nal(SOCKNAL);
2454 }
2455
2456 int __init
2457 ksocknal_module_init (void)
2458 {
2459         int    rc;
2460
2461         /* packet descriptor must fit in a router descriptor's scratchpad */
2462         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
2463         /* the following must be sizeof(int) for proc_dointvec() */
2464         LASSERT(sizeof (ksocknal_tunables.ksnd_io_timeout) == sizeof (int));
2465         LASSERT(sizeof (ksocknal_tunables.ksnd_eager_ack) == sizeof (int));
2466         LASSERT(sizeof (ksocknal_tunables.ksnd_typed_conns) == sizeof (int));
2467         LASSERT(sizeof (ksocknal_tunables.ksnd_min_bulk) == sizeof (int));
2468         LASSERT(sizeof (ksocknal_tunables.ksnd_buffer_size) == sizeof (int));
2469         LASSERT(sizeof (ksocknal_tunables.ksnd_nagle) == sizeof (int));
2470         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_idle) == sizeof (int));
2471         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_count) == sizeof (int));
2472         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_intvl) == sizeof (int));
2473 #if CPU_AFFINITY
2474         LASSERT(sizeof (ksocknal_tunables.ksnd_irq_affinity) == sizeof (int));
2475 #endif
2476 #if SOCKNAL_ZC
2477         LASSERT(sizeof (ksocknal_tunables.ksnd_zc_min_frag) == sizeof (int));
2478 #endif
2479         /* check ksnr_connected/connecting field large enough */
2480         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
2481         
2482         ksocknal_api.nal_ni_init = ksocknal_api_startup;
2483         ksocknal_api.nal_ni_fini = ksocknal_api_shutdown;
2484
2485         /* Initialise dynamic tunables to defaults once only */
2486         ksocknal_tunables.ksnd_io_timeout      = SOCKNAL_IO_TIMEOUT;
2487         ksocknal_tunables.ksnd_eager_ack       = SOCKNAL_EAGER_ACK;
2488         ksocknal_tunables.ksnd_typed_conns     = SOCKNAL_TYPED_CONNS;
2489         ksocknal_tunables.ksnd_min_bulk        = SOCKNAL_MIN_BULK;
2490         ksocknal_tunables.ksnd_buffer_size     = SOCKNAL_BUFFER_SIZE;
2491         ksocknal_tunables.ksnd_nagle           = SOCKNAL_NAGLE;
2492         ksocknal_tunables.ksnd_keepalive_idle  = SOCKNAL_KEEPALIVE_IDLE;
2493         ksocknal_tunables.ksnd_keepalive_count = SOCKNAL_KEEPALIVE_COUNT;
2494         ksocknal_tunables.ksnd_keepalive_intvl = SOCKNAL_KEEPALIVE_INTVL;
2495 #if CPU_AFFINITY
2496         ksocknal_tunables.ksnd_irq_affinity = SOCKNAL_IRQ_AFFINITY;
2497 #endif
2498 #if SOCKNAL_ZC
2499         ksocknal_tunables.ksnd_zc_min_frag  = SOCKNAL_ZC_MIN_FRAG;
2500 #endif
2501
2502         rc = ptl_register_nal(SOCKNAL, &ksocknal_api);
2503         if (rc != PTL_OK) {
2504                 CERROR("Can't register SOCKNAL: %d\n", rc);
2505                 return (-ENOMEM);               /* or something... */
2506         }
2507
2508         /* Pure gateways want the NAL started up at module load time... */
2509         rc = PtlNIInit(SOCKNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &ksocknal_ni);
2510         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
2511                 ptl_unregister_nal(SOCKNAL);
2512                 return (-ENODEV);
2513         }
2514         
2515 #ifdef CONFIG_SYSCTL
2516         /* Press on regardless even if registering sysctl doesn't work */
2517         ksocknal_tunables.ksnd_sysctl = 
2518                 register_sysctl_table (ksocknal_top_ctl_table, 0);
2519 #endif
2520         return (0);
2521 }
2522
2523 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2524 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
2525 MODULE_LICENSE("GPL");
2526
2527 module_init(ksocknal_module_init);
2528 module_exit(ksocknal_module_fini);
2529