Whamcloud - gitweb
* lctl set_route <nid> <up/down> enables or disables particular portals
[fs/lustre-release.git] / lustre / portals / knals / socknal / socknal.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 ptl_handle_ni_t         ksocknal_ni;
29 static nal_t            ksocknal_api;
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
32 #else
33 static ksock_nal_data_t ksocknal_data;
34 #endif
35
36 kpr_nal_interface_t ksocknal_router_interface = {
37         kprni_nalid:      SOCKNAL,
38         kprni_arg:        &ksocknal_data,
39         kprni_fwd:        ksocknal_fwd_packet,
40         kprni_notify:     ksocknal_notify,
41 };
42
43 #define SOCKNAL_SYSCTL  200
44
45 #define SOCKNAL_SYSCTL_TIMEOUT     1
46 #define SOCKNAL_SYSCTL_EAGER_ACK   2
47 #define SOCKNAL_SYSCTL_ZERO_COPY   3
48
49 static ctl_table ksocknal_ctl_table[] = {
50         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
51          &ksocknal_data.ksnd_io_timeout, sizeof (int),
52          0644, NULL, &proc_dointvec},
53         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
54          &ksocknal_data.ksnd_eager_ack, sizeof (int),
55          0644, NULL, &proc_dointvec},
56 #if SOCKNAL_ZC
57         {SOCKNAL_SYSCTL_EAGER_ACK, "zero_copy", 
58          &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
59          0644, NULL, &proc_dointvec},
60 #endif
61         { 0 }
62 };
63
64 static ctl_table ksocknal_top_ctl_table[] = {
65         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
66         { 0 }
67 };
68
69 int
70 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
71                        void *ret, size_t ret_len)
72 {
73         ksock_nal_data_t *k;
74         nal_cb_t *nal_cb;
75
76         k = nal->nal_data;
77         nal_cb = k->ksnd_nal_cb;
78
79         lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
80         return PTL_OK;
81 }
82
83 int
84 ksocknal_api_shutdown(nal_t *nal, int ni)
85 {
86         CDEBUG (D_NET, "closing all connections\n");
87
88         ksocknal_del_route (PTL_NID_ANY, 0, 0, 0);
89         ksocknal_close_conn (PTL_NID_ANY, 0);
90         return PTL_OK;
91 }
92
93 void
94 ksocknal_api_yield(nal_t *nal)
95 {
96         our_cond_resched();
97         return;
98 }
99
100 void
101 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
102 {
103         ksock_nal_data_t *k;
104         nal_cb_t *nal_cb;
105
106         k = nal->nal_data;
107         nal_cb = k->ksnd_nal_cb;
108         nal_cb->cb_cli(nal_cb,flags);
109 }
110
111 void
112 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
113 {
114         ksock_nal_data_t *k;
115         nal_cb_t *nal_cb;
116
117         k = nal->nal_data;
118         nal_cb = k->ksnd_nal_cb;
119         nal_cb->cb_sti(nal_cb,flags);
120 }
121
122 nal_t *
123 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
124               ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
125 {
126         CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
127         lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
128         return (&ksocknal_api);
129 }
130
131 /*
132  *  EXTRA functions follow
133  */
134
135 int
136 ksocknal_set_mynid(ptl_nid_t nid)
137 {
138         lib_ni_t *ni = &ksocknal_lib.ni;
139
140         /* FIXME: we have to do this because we call lib_init() at module
141          * insertion time, which is before we have 'mynid' available.  lib_init
142          * sets the NAL's nid, which it uses to tell other nodes where packets
143          * are coming from.  This is not a very graceful solution to this
144          * problem. */
145
146         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
147                nid, ni->nid);
148
149         ni->nid = nid;
150         return (0);
151 }
152
153 void
154 ksocknal_bind_irq (unsigned int irq)
155 {
156 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
157         int              bind;
158         unsigned long    flags;
159         char             cmdline[64];
160         ksock_irqinfo_t *info;
161         char            *argv[] = {"/bin/sh",
162                                    "-c",
163                                    cmdline,
164                                    NULL};
165         char            *envp[] = {"HOME=/",
166                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
167                                    NULL};
168
169         LASSERT (irq < NR_IRQS);
170         if (irq == 0)                           /* software NIC */
171                 return;
172
173         info = &ksocknal_data.ksnd_irqinfo[irq];
174
175         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
176
177         LASSERT (info->ksni_valid);
178         bind = !info->ksni_bound;
179         info->ksni_bound = 1;
180
181         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
182
183         if (!bind)                              /* bound already */
184                 return;
185
186         snprintf (cmdline, sizeof (cmdline),
187                   "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
188
189         printk (KERN_INFO "Binding irq %u to CPU %d with cmd: %s\n",
190                 irq, info->ksni_sched, cmdline);
191
192         /* FIXME: Find a better method of setting IRQ affinity...
193          */
194
195         call_usermodehelper (argv[0], argv, envp);
196 #endif
197 }
198
199 ksock_route_t *
200 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
201                        int nonagel, int xchange_nids, int irq_affinity, int eager)
202 {
203         ksock_route_t *route;
204
205         PORTAL_ALLOC (route, sizeof (*route));
206         if (route == NULL)
207                 return (NULL);
208
209         atomic_set (&route->ksnr_refcount, 1);
210         route->ksnr_sharecount = 0;
211         route->ksnr_peer = NULL;
212         route->ksnr_timeout = jiffies;
213         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
214         route->ksnr_ipaddr = ipaddr;
215         route->ksnr_port = port;
216         route->ksnr_buffer_size = buffer_size;
217         route->ksnr_irq_affinity = irq_affinity;
218         route->ksnr_xchange_nids = xchange_nids;
219         route->ksnr_nonagel = nonagel;
220         route->ksnr_eager = eager;
221         route->ksnr_connecting = 0;
222         route->ksnr_deleted = 0;
223         route->ksnr_generation = 0;
224         route->ksnr_conn = NULL;
225
226         return (route);
227 }
228
229 void
230 ksocknal_destroy_route (ksock_route_t *route)
231 {
232         LASSERT (route->ksnr_sharecount == 0);
233         LASSERT (route->ksnr_conn == NULL);
234
235         if (route->ksnr_peer != NULL)
236                 ksocknal_put_peer (route->ksnr_peer);
237
238         PORTAL_FREE (route, sizeof (*route));
239 }
240
241 void
242 ksocknal_put_route (ksock_route_t *route)
243 {
244         CDEBUG (D_OTHER, "putting route[%p] -> "LPX64" (%d)\n",
245                 route, route->ksnr_peer->ksnp_nid,
246                 atomic_read (&route->ksnr_refcount));
247
248         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
249         if (!atomic_dec_and_test (&route->ksnr_refcount))
250              return;
251
252         ksocknal_destroy_route (route);
253 }
254
255 ksock_peer_t *
256 ksocknal_create_peer (ptl_nid_t nid)
257 {
258         ksock_peer_t *peer;
259
260         LASSERT (nid != PTL_NID_ANY);
261
262         PORTAL_ALLOC (peer, sizeof (*peer));
263         if (peer == NULL)
264                 return (NULL);
265
266         memset (peer, 0, sizeof (*peer));
267
268         peer->ksnp_nid = nid;
269         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
270         peer->ksnp_closing = 0;
271         INIT_LIST_HEAD (&peer->ksnp_conns);
272         INIT_LIST_HEAD (&peer->ksnp_routes);
273         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
274
275         /* Can't unload while peers exist; ensures all I/O has terminated
276          * before unload attempts */
277         PORTAL_MODULE_USE;
278         atomic_inc (&ksocknal_data.ksnd_npeers);
279         return (peer);
280 }
281
282 void
283 ksocknal_destroy_peer (ksock_peer_t *peer)
284 {
285         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
286
287         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
288         LASSERT (list_empty (&peer->ksnp_conns));
289         LASSERT (list_empty (&peer->ksnp_routes));
290         LASSERT (list_empty (&peer->ksnp_tx_queue));
291
292         PORTAL_FREE (peer, sizeof (*peer));
293
294         /* NB a peer's connections and autoconnect routes keep a reference
295          * on their peer until they are destroyed, so we can be assured
296          * that _all_ state to do with this peer has been cleaned up when
297          * its refcount drops to zero. */
298         atomic_dec (&ksocknal_data.ksnd_npeers);
299         PORTAL_MODULE_UNUSE;
300 }
301
302 void
303 ksocknal_put_peer (ksock_peer_t *peer)
304 {
305         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
306                 peer, peer->ksnp_nid,
307                 atomic_read (&peer->ksnp_refcount));
308
309         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
310         if (!atomic_dec_and_test (&peer->ksnp_refcount))
311                 return;
312
313         ksocknal_destroy_peer (peer);
314 }
315
316 ksock_peer_t *
317 ksocknal_find_peer_locked (ptl_nid_t nid)
318 {
319         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
320         struct list_head *tmp;
321         ksock_peer_t     *peer;
322
323         list_for_each (tmp, peer_list) {
324
325                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
326
327                 LASSERT (!peer->ksnp_closing);
328                 LASSERT (!(list_empty (&peer->ksnp_routes) &&
329                            list_empty (&peer->ksnp_conns)));
330
331                 if (peer->ksnp_nid != nid)
332                         continue;
333
334                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
335                        peer, nid, atomic_read (&peer->ksnp_refcount));
336                 return (peer);
337         }
338         return (NULL);
339 }
340
341 ksock_peer_t *
342 ksocknal_get_peer (ptl_nid_t nid)
343 {
344         ksock_peer_t     *peer;
345
346         read_lock (&ksocknal_data.ksnd_global_lock);
347         peer = ksocknal_find_peer_locked (nid);
348         if (peer != NULL)                       /* +1 ref for caller? */
349                 atomic_inc (&peer->ksnp_refcount);
350         read_unlock (&ksocknal_data.ksnd_global_lock);
351
352         return (peer);
353 }
354
355 void
356 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
357 {
358         LASSERT (!peer->ksnp_closing);
359         peer->ksnp_closing = 1;
360         list_del (&peer->ksnp_list);
361         /* lose peerlist's ref */
362         ksocknal_put_peer (peer);
363 }
364
365 ksock_route_t *
366 ksocknal_get_route_by_idx (int index)
367 {
368         ksock_peer_t      *peer;
369         struct list_head  *ptmp;
370         ksock_route_t     *route;
371         struct list_head  *rtmp;
372         int                i;
373
374         read_lock (&ksocknal_data.ksnd_global_lock);
375
376         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
377                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
378                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
379
380                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
381                                    list_empty (&peer->ksnp_conns)));
382
383                         list_for_each (rtmp, &peer->ksnp_routes) {
384                                 if (index-- > 0)
385                                         continue;
386
387                                 route = list_entry (rtmp, ksock_route_t, ksnr_list);
388                                 atomic_inc (&route->ksnr_refcount);
389                                 read_unlock (&ksocknal_data.ksnd_global_lock);
390                                 return (route);
391                         }
392                 }
393         }
394
395         read_unlock (&ksocknal_data.ksnd_global_lock);
396         return (NULL);
397 }
398
399 int
400 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
401                     int nonagle, int xchange_nids, int bind_irq, 
402                     int share, int eager)
403 {
404         unsigned long      flags;
405         ksock_peer_t      *peer;
406         ksock_peer_t      *peer2;
407         ksock_route_t     *route;
408         struct list_head  *rtmp;
409         ksock_route_t     *route2;
410         
411         if (nid == PTL_NID_ANY)
412                 return (-EINVAL);
413
414         /* Have a brand new peer ready... */
415         peer = ksocknal_create_peer (nid);
416         if (peer == NULL)
417                 return (-ENOMEM);
418
419         route = ksocknal_create_route (ipaddr, port, bufnob, nonagle,
420                                        xchange_nids, bind_irq, eager);
421         if (route == NULL) {
422                 ksocknal_put_peer (peer);
423                 return (-ENOMEM);
424         }
425
426         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
427
428         peer2 = ksocknal_find_peer_locked (nid);
429         if (peer2 != NULL) {
430                 ksocknal_put_peer (peer);
431                 peer = peer2;
432         } else {
433                 /* peer table takes existing ref on peer */
434                 list_add (&peer->ksnp_list,
435                           ksocknal_nid2peerlist (nid));
436         }
437
438         route2 = NULL;
439         if (share) {
440                 /* check for existing route to this NID via this ipaddr */
441                 list_for_each (rtmp, &peer->ksnp_routes) {
442                         route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
443                         
444                         if (route2->ksnr_ipaddr == ipaddr)
445                                 break;
446
447                         route2 = NULL;
448                 }
449         }
450
451         if (route2 != NULL) {
452                 ksocknal_put_route (route);
453                 route = route2;
454         } else {
455                 /* route takes a ref on peer */
456                 route->ksnr_peer = peer;
457                 atomic_inc (&peer->ksnp_refcount);
458                 /* peer's route list takes existing ref on route */
459                 list_add (&route->ksnr_list, &peer->ksnp_routes);
460         }
461         
462         route->ksnr_sharecount++;
463
464         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
465
466         return (0);
467 }
468
469 void
470 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
471 {
472         ksock_peer_t *peer = route->ksnr_peer;
473         ksock_conn_t *conn = route->ksnr_conn;
474
475         if (!share)
476                 route->ksnr_sharecount = 0;
477         else {
478                 route->ksnr_sharecount--;
479                 if (route->ksnr_sharecount != 0)
480                         return;
481         }
482
483         if (conn != NULL) {
484                 if (!keep_conn)
485                         ksocknal_close_conn_locked (conn, 0);
486                 else {
487                         /* keeping the conn; just dissociate it and route... */
488                         conn->ksnc_route = NULL;
489                         route->ksnr_conn = NULL;
490                         ksocknal_put_route (route); /* drop conn's ref on route */
491                         ksocknal_put_conn (conn); /* drop route's ref on conn */
492                 }
493         }
494
495         route->ksnr_deleted = 1;
496         list_del (&route->ksnr_list);
497         ksocknal_put_route (route);             /* drop peer's ref */
498
499         if (list_empty (&peer->ksnp_routes) &&
500             list_empty (&peer->ksnp_conns)) {
501                 /* I've just removed the last autoconnect route of a peer
502                  * with no active connections */
503                 ksocknal_unlink_peer_locked (peer);
504         }
505 }
506
507 int
508 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
509 {
510         unsigned long      flags;
511         struct list_head  *ptmp;
512         struct list_head  *pnxt;
513         ksock_peer_t      *peer;
514         struct list_head  *rtmp;
515         struct list_head  *rnxt;
516         ksock_route_t     *route;
517         int                lo;
518         int                hi;
519         int                i;
520         int                rc = -ENOENT;
521
522         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
523
524         if (nid != PTL_NID_ANY)
525                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
526         else {
527                 lo = 0;
528                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
529         }
530
531         for (i = lo; i <= hi; i++) {
532                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
533                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
534
535                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
536                                 continue;
537
538                         list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
539                                 route = list_entry (rtmp, ksock_route_t,
540                                                     ksnr_list);
541
542                                 if (!(ipaddr == 0 ||
543                                       route->ksnr_ipaddr == ipaddr))
544                                         continue;
545
546                                 ksocknal_del_route_locked (route, share, keep_conn);
547                                 rc = 0;         /* matched something */
548                                 if (share)
549                                         goto out;
550                         }
551                 }
552         }
553  out:
554         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
555
556         return (rc);
557 }
558
559 ksock_conn_t *
560 ksocknal_get_conn_by_idx (int index)
561 {
562         ksock_peer_t      *peer;
563         struct list_head  *ptmp;
564         ksock_conn_t      *conn;
565         struct list_head  *ctmp;
566         int                i;
567
568         read_lock (&ksocknal_data.ksnd_global_lock);
569
570         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
571                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
572                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
573
574                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
575                                    list_empty (&peer->ksnp_conns)));
576
577                         list_for_each (ctmp, &peer->ksnp_conns) {
578                                 if (index-- > 0)
579                                         continue;
580
581                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
582                                 atomic_inc (&conn->ksnc_refcount);
583                                 read_unlock (&ksocknal_data.ksnd_global_lock);
584                                 return (conn);
585                         }
586                 }
587         }
588
589         read_unlock (&ksocknal_data.ksnd_global_lock);
590         return (NULL);
591 }
592
593 void
594 ksocknal_get_peer_addr (ksock_conn_t *conn)
595 {
596         struct sockaddr_in sin;
597         int                len = sizeof (sin);
598         int                rc;
599         
600         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
601                                             (struct sockaddr *)&sin, &len, 2);
602         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
603         LASSERT (!conn->ksnc_closing);
604         LASSERT (len <= sizeof (sin));
605
606         if (rc != 0) {
607                 CERROR ("Error %d getting sock peer IP\n", rc);
608                 return;
609         }
610
611         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
612         conn->ksnc_port   = ntohs (sin.sin_port);
613 }
614
615 unsigned int
616 ksocknal_conn_irq (ksock_conn_t *conn)
617 {
618         int                irq = 0;
619         struct dst_entry  *dst;
620
621         dst = sk_dst_get (conn->ksnc_sock->sk);
622         if (dst != NULL) {
623                 if (dst->dev != NULL) {
624                         irq = dst->dev->irq;
625                         if (irq >= NR_IRQS) {
626                                 CERROR ("Unexpected IRQ %x\n", irq);
627                                 irq = 0;
628                         }
629                 }
630                 dst_release (dst);
631         }
632         
633         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
634         LASSERT (!conn->ksnc_closing);
635         return (irq);
636 }
637
638 ksock_sched_t *
639 ksocknal_choose_scheduler_locked (unsigned int irq)
640 {
641         ksock_sched_t    *sched;
642         ksock_irqinfo_t  *info;
643         int               i;
644
645         LASSERT (irq < NR_IRQS);
646         info = &ksocknal_data.ksnd_irqinfo[irq];
647
648         if (irq != 0 &&                         /* hardware NIC */
649             info->ksni_valid) {                 /* already set up */
650                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
651         }
652
653         /* software NIC (irq == 0) || not associated with a scheduler yet.
654          * Choose the CPU with the fewest connections... */
655         sched = &ksocknal_data.ksnd_schedulers[0];
656         for (i = 1; i < SOCKNAL_N_SCHED; i++)
657                 if (sched->kss_nconns >
658                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
659                         sched = &ksocknal_data.ksnd_schedulers[i];
660
661         if (irq != 0) {                         /* Hardware NIC */
662                 info->ksni_valid = 1;
663                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
664
665                 /* no overflow... */
666                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
667         }
668
669         return (sched);
670 }
671
672 int
673 ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
674                       struct socket *sock, int bind_irq)
675 {
676         unsigned long      flags;
677         ksock_conn_t      *conn;
678         ksock_peer_t      *peer;
679         ksock_peer_t      *peer2;
680         ksock_sched_t     *sched;
681         unsigned int       irq;
682         ksock_tx_t        *tx;
683         int                rc;
684
685         /* NB, sock has an associated file since (a) this connection might
686          * have been created in userland and (b) we need to refcount the
687          * socket so that we don't close it while I/O is being done on
688          * it, and sock->file has that pre-cooked... */
689         LASSERT (sock->file != NULL);
690         LASSERT (file_count(sock->file) > 0);
691
692         rc = ksocknal_setup_sock (sock);
693         if (rc != 0)
694                 return (rc);
695
696         peer = NULL;
697         if (route == NULL) {                    /* not autoconnect */
698                 /* Assume this socket connects to a brand new peer */
699                 peer = ksocknal_create_peer (nid);
700                 if (peer == NULL)
701                         return (-ENOMEM);
702         }
703
704         PORTAL_ALLOC(conn, sizeof(*conn));
705         if (conn == NULL) {
706                 if (peer != NULL)
707                         ksocknal_put_peer (peer);
708                 return (-ENOMEM);
709         }
710
711         memset (conn, 0, sizeof (*conn));
712         conn->ksnc_peer = NULL;
713         conn->ksnc_route = NULL;
714         conn->ksnc_sock = sock;
715         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
716         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
717         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
718
719         conn->ksnc_rx_ready = 0;
720         conn->ksnc_rx_scheduled = 0;
721         ksocknal_new_packet (conn, 0);
722
723         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
724         conn->ksnc_tx_ready = 0;
725         conn->ksnc_tx_scheduled = 0;
726         atomic_set (&conn->ksnc_tx_nob, 0);
727
728         ksocknal_get_peer_addr (conn);
729
730         irq = ksocknal_conn_irq (conn);
731
732         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
733
734         if (route != NULL) {
735                 /* Autoconnected! */
736                 LASSERT (route->ksnr_conn == NULL && route->ksnr_connecting);
737
738                 if (route->ksnr_deleted) {
739                         /* This conn was autoconnected, but the autoconnect
740                          * route got deleted while it was being
741                          * established! */
742                         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
743                                                  flags);
744                         PORTAL_FREE (conn, sizeof (*conn));
745                         return (-ESTALE);
746                 }
747
748
749                 /* associate conn/route for auto-reconnect */
750                 route->ksnr_conn = conn;
751                 atomic_inc (&conn->ksnc_refcount);
752                 conn->ksnc_route = route;
753                 atomic_inc (&route->ksnr_refcount);
754                 route->ksnr_connecting = 0;
755
756                 route->ksnr_generation++;
757                 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
758
759                 peer = route->ksnr_peer;
760         } else {
761                 /* Not an autoconnected connection; see if there is an
762                  * existing peer for this NID */
763                 peer2 = ksocknal_find_peer_locked (nid);
764                 if (peer2 != NULL) {
765                         ksocknal_put_peer (peer);
766                         peer = peer2;
767                 } else {
768                         list_add (&peer->ksnp_list,
769                                   ksocknal_nid2peerlist (nid));
770                         /* peer list takes over existing ref */
771                 }
772         }
773
774         LASSERT (!peer->ksnp_closing);
775
776         conn->ksnc_peer = peer;
777         atomic_inc (&peer->ksnp_refcount);
778         peer->ksnp_last_alive = jiffies;
779         peer->ksnp_error = 0;
780
781         list_add (&conn->ksnc_list, &peer->ksnp_conns);
782         atomic_inc (&conn->ksnc_refcount);
783
784         sched = ksocknal_choose_scheduler_locked (irq);
785         sched->kss_nconns++;
786         conn->ksnc_scheduler = sched;
787
788         /* NB my callbacks block while I hold ksnd_global_lock */
789         sock->sk->sk_user_data = conn;
790         sock->sk->sk_data_ready = ksocknal_data_ready;
791         sock->sk->sk_write_space = ksocknal_write_space;
792
793         /* Take all the packets blocking for a connection.
794          * NB, it might be nicer to share these blocked packets among any
795          * other connections that are becoming established, however that
796          * confuses the normal packet launching operation, which selects a
797          * connection and queues the packet on it without needing an
798          * exclusive lock on ksnd_global_lock. */
799         while (!list_empty (&peer->ksnp_tx_queue)) {
800                 tx = list_entry (peer->ksnp_tx_queue.next,
801                                  ksock_tx_t, tx_list);
802
803                 list_del (&tx->tx_list);
804                 ksocknal_queue_tx_locked (tx, conn);
805         }
806
807         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
808
809         if (bind_irq)                           /* irq binding required */
810                 ksocknal_bind_irq (irq);
811
812         /* Call the callbacks right now to get things going. */
813         ksocknal_data_ready (sock->sk, 0);
814         ksocknal_write_space (sock->sk);
815
816         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64"\n",
817                conn, conn->ksnc_peer->ksnp_nid);
818
819         ksocknal_put_conn (conn);
820         return (0);
821 }
822
823 void
824 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
825 {
826         /* This just does the immmediate housekeeping, and queues the
827          * connection for the reaper to terminate. 
828          * Caller holds ksnd_global_lock exclusively in irq context */
829         ksock_peer_t   *peer = conn->ksnc_peer;
830         ksock_route_t  *route;
831
832         LASSERT (peer->ksnp_error == 0);
833         LASSERT (!conn->ksnc_closing);
834         conn->ksnc_closing = 1;
835         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
836         
837         route = conn->ksnc_route;
838         if (route != NULL) {
839                 /* dissociate conn from route... */
840                 LASSERT (!route->ksnr_connecting &&
841                          !route->ksnr_deleted);
842
843                 route->ksnr_conn = NULL;
844                 conn->ksnc_route = NULL;
845
846                 ksocknal_put_route (route);     /* drop conn's ref on route */
847                 ksocknal_put_conn (conn);       /* drop route's ref on conn */
848         }
849
850         /* ksnd_deathrow_conns takes over peer's ref */
851         list_del (&conn->ksnc_list);
852
853         if (list_empty (&peer->ksnp_conns)) {
854                 /* No more connections to this peer */
855
856                 peer->ksnp_error = error;       /* stash last conn close reason */
857
858                 if (list_empty (&peer->ksnp_routes)) {
859                         /* I've just closed last conn belonging to a
860                          * non-autoconnecting peer */
861                         ksocknal_unlink_peer_locked (peer);
862                 }
863         }
864
865         spin_lock (&ksocknal_data.ksnd_reaper_lock);
866
867         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
868         if (waitqueue_active (&ksocknal_data.ksnd_reaper_waitq))
869                 wake_up (&ksocknal_data.ksnd_reaper_waitq);
870                 
871         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
872 }
873
874 int
875 ksocknal_close_conn_unlocked (ksock_conn_t *conn, int why) 
876 {
877         unsigned long flags;
878         int           did_it = 0;
879         
880         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
881                 
882         if (!conn->ksnc_closing) {
883                 did_it = 1;
884                 ksocknal_close_conn_locked (conn, why);
885         }
886         
887         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
888
889         return (did_it);
890 }
891
892 void
893 ksocknal_terminate_conn (ksock_conn_t *conn)
894 {
895         /* This gets called by the reaper (guaranteed thread context) to
896          * disengage the socket from its callbacks and close it.
897          * ksnc_refcount will eventually hit zero, and then the reaper will
898          * destroy it. */
899         unsigned long   flags;
900         ksock_peer_t   *peer = conn->ksnc_peer;
901         struct timeval  now;
902         time_t          then = 0;
903         int             notify = 0;
904
905         /* serialise with callbacks */
906         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
907
908         LASSERT (conn->ksnc_closing);
909         
910         /* Remove conn's network callbacks.
911          * NB I _have_ to restore the callback, rather than storing a noop,
912          * since the socket could survive past this module being unloaded!! */
913         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
914         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
915
916         /* A callback could be in progress already; they hold a read lock
917          * on ksnd_global_lock (to serialise with me) and NOOP if
918          * sk_user_data is NULL. */
919         conn->ksnc_sock->sk->sk_user_data = NULL;
920
921         conn->ksnc_scheduler->kss_nconns--;
922
923         if (peer->ksnp_error != 0) {
924                 /* peer's last conn closed in error */
925                 LASSERT (list_empty (&peer->ksnp_conns));
926                 
927                 /* convert peer's last-known-alive timestamp from jiffies */
928                 do_gettimeofday (&now);
929                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
930                 notify = 1;
931         }
932         
933         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
934
935         /* The socket is closed on the final put; either here, or in
936          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
937          * when the connection was established, this will close the socket
938          * immediately, aborting anything buffered in it. Any hung
939          * zero-copy transmits will therefore complete in finite time. */
940         ksocknal_putconnsock (conn);
941
942         if (notify)
943                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
944                             0, then);
945 }
946
947 void
948 ksocknal_destroy_conn (ksock_conn_t *conn)
949 {
950         /* Final coup-de-grace of the reaper */
951         CDEBUG (D_NET, "connection %p\n", conn);
952
953         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
954         LASSERT (conn->ksnc_route == NULL);
955         LASSERT (!conn->ksnc_tx_scheduled);
956         LASSERT (!conn->ksnc_rx_scheduled);
957
958         /* complete queued packets */
959         while (!list_empty (&conn->ksnc_tx_queue)) {
960                 ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next,
961                                              ksock_tx_t, tx_list);
962
963                 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
964                         NTOH__u32 (tx->tx_hdr->type),
965                         NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
966                         NTOH__u64 (tx->tx_hdr->src_nid),
967                         NTOH__u64 (tx->tx_hdr->dest_nid));
968
969                 list_del (&tx->tx_list);
970                 ksocknal_tx_done (tx, 0);
971         }
972
973         /* complete current receive if any */
974         switch (conn->ksnc_rx_state) {
975         case SOCKNAL_RX_BODY:
976                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
977                 break;
978         case SOCKNAL_RX_BODY_FWD:
979                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
980                 break;
981         case SOCKNAL_RX_HEADER:
982         case SOCKNAL_RX_SLOP:
983                 break;
984         default:
985                 LBUG ();
986                 break;
987         }
988
989         ksocknal_put_peer (conn->ksnc_peer);
990
991         PORTAL_FREE (conn, sizeof (*conn));
992         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
993 }
994
995 void
996 ksocknal_put_conn (ksock_conn_t *conn)
997 {
998         unsigned long flags;
999
1000         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1001                 conn, conn->ksnc_peer->ksnp_nid,
1002                 atomic_read (&conn->ksnc_refcount));
1003
1004         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1005         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1006                 return;
1007
1008         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1009
1010         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1011         if (waitqueue_active (&ksocknal_data.ksnd_reaper_waitq))
1012                 wake_up (&ksocknal_data.ksnd_reaper_waitq);
1013
1014         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1015 }
1016
1017 int
1018 ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr)
1019 {
1020         unsigned long       flags;
1021         ksock_conn_t       *conn;
1022         struct list_head   *ctmp;
1023         struct list_head   *cnxt;
1024         ksock_peer_t       *peer;
1025         struct list_head   *ptmp;
1026         struct list_head   *pnxt;
1027         int                 lo;
1028         int                 hi;
1029         int                 i;
1030         int                 rc = -ENOENT;
1031
1032         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1033
1034         if (nid != PTL_NID_ANY)
1035                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1036         else {
1037                 lo = 0;
1038                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1039         }
1040
1041         for (i = lo; i <= hi; i++) {
1042                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1043
1044                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1045
1046                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1047                                 continue;
1048
1049                         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1050
1051                                 conn = list_entry (ctmp, ksock_conn_t,
1052                                                    ksnc_list);
1053
1054                                 if (!(ipaddr == 0 ||
1055                                       conn->ksnc_ipaddr == ipaddr))
1056                                         continue;
1057
1058                                 rc = 0;
1059                                 ksocknal_close_conn_locked (conn, 0);
1060                         }
1061                 }
1062         }
1063
1064         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1065
1066         return (rc);
1067 }
1068
1069 void
1070 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1071 {
1072         /* The router is telling me she's been notified of a change in
1073          * gateway state.... */
1074
1075         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1076
1077         if (!alive) {
1078                 /* If the gateway crashed, close all open connections... */
1079                 ksocknal_close_conn (gw_nid, 0);
1080                 return;
1081         }
1082         
1083         /* ...otherwise do nothing.  We can only establish new connections
1084          * if we have autroutes, and these connect on demand. */
1085 }
1086
1087 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1088 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1089 {
1090         return &(sk->tp_pinfo.af_tcp);
1091 }
1092 #else
1093 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1094 {
1095         struct tcp_sock *s = (struct tcp_sock *)sk;
1096         return &s->tcp;
1097 }
1098 #endif
1099
1100 void
1101 ksocknal_push_conn (ksock_conn_t *conn)
1102 {
1103         struct sock    *sk;
1104         struct tcp_opt *tp;
1105         int             nonagle;
1106         int             val = 1;
1107         int             rc;
1108         mm_segment_t    oldmm;
1109
1110         rc = ksocknal_getconnsock (conn);
1111         if (rc != 0)                            /* being shut down */
1112                 return;
1113         
1114         sk = conn->ksnc_sock->sk;
1115         tp = sock2tcp_opt(sk);
1116         
1117         lock_sock (sk);
1118         nonagle = tp->nonagle;
1119         tp->nonagle = 1;
1120         release_sock (sk);
1121
1122         oldmm = get_fs ();
1123         set_fs (KERNEL_DS);
1124
1125         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1126                                       (char *)&val, sizeof (val));
1127         LASSERT (rc == 0);
1128
1129         set_fs (oldmm);
1130
1131         lock_sock (sk);
1132         tp->nonagle = nonagle;
1133         release_sock (sk);
1134
1135         ksocknal_putconnsock (conn);
1136 }
1137
1138 void
1139 ksocknal_push_peer (ksock_peer_t *peer)
1140 {
1141         int               index;
1142         int               i;
1143         struct list_head *tmp;
1144         ksock_conn_t     *conn;
1145
1146         for (index = 0; ; index++) {
1147                 read_lock (&ksocknal_data.ksnd_global_lock);
1148
1149                 i = 0;
1150                 conn = NULL;
1151
1152                 list_for_each (tmp, &peer->ksnp_conns) {
1153                         if (i++ == index) {
1154                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1155                                 atomic_inc (&conn->ksnc_refcount);
1156                                 break;
1157                         }
1158                 }
1159
1160                 read_unlock (&ksocknal_data.ksnd_global_lock);
1161
1162                 if (conn == NULL)
1163                         break;
1164
1165                 ksocknal_push_conn (conn);
1166                 ksocknal_put_conn (conn);
1167         }
1168 }
1169
1170 int
1171 ksocknal_push (ptl_nid_t nid)
1172 {
1173         ksock_peer_t      *peer;
1174         struct list_head  *tmp;
1175         int                index;
1176         int                i;
1177         int                j;
1178         int                rc = -ENOENT;
1179
1180         if (nid != PTL_NID_ANY) {
1181                 peer = ksocknal_get_peer (nid);
1182
1183                 if (peer != NULL) {
1184                         rc = 0;
1185                         ksocknal_push_peer (peer);
1186                         ksocknal_put_peer (peer);
1187                 }
1188                 return (rc);
1189         }
1190
1191         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1192                 for (j = 0; ; j++) {
1193                         read_lock (&ksocknal_data.ksnd_global_lock);
1194
1195                         index = 0;
1196                         peer = NULL;
1197
1198                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1199                                 if (index++ == j) {
1200                                         peer = list_entry(tmp, ksock_peer_t,
1201                                                           ksnp_list);
1202                                         atomic_inc (&peer->ksnp_refcount);
1203                                         break;
1204                                 }
1205                         }
1206
1207                         read_unlock (&ksocknal_data.ksnd_global_lock);
1208
1209                         if (peer != NULL) {
1210                                 rc = 0;
1211                                 ksocknal_push_peer (peer);
1212                                 ksocknal_put_peer (peer);
1213                         }
1214                 }
1215
1216         }
1217
1218         return (rc);
1219 }
1220
1221 int
1222 ksocknal_cmd(struct portal_ioctl_data * data, void * private)
1223 {
1224         int rc = -EINVAL;
1225
1226         LASSERT (data != NULL);
1227
1228         switch(data->ioc_nal_cmd) {
1229         case NAL_CMD_GET_AUTOCONN: {
1230                 ksock_route_t *route = ksocknal_get_route_by_idx (data->ioc_count);
1231
1232                 if (route == NULL)
1233                         rc = -ENOENT;
1234                 else {
1235                         rc = 0;
1236                         data->ioc_nid   = route->ksnr_peer->ksnp_nid;
1237                         data->ioc_id    = route->ksnr_ipaddr;
1238                         data->ioc_misc  = route->ksnr_port;
1239                         data->ioc_count = route->ksnr_generation;
1240                         data->ioc_size  = route->ksnr_buffer_size;
1241                         data->ioc_wait  = route->ksnr_sharecount;
1242                         data->ioc_flags = (route->ksnr_nonagel      ? 1 : 0) |
1243                                           (route->ksnr_xchange_nids ? 2 : 0) |
1244                                           (route->ksnr_irq_affinity ? 4 : 0) |
1245                                           (route->ksnr_eager        ? 8 : 0);
1246                         ksocknal_put_route (route);
1247                 }
1248                 break;
1249         }
1250         case NAL_CMD_ADD_AUTOCONN: {
1251                 rc = ksocknal_add_route (data->ioc_nid, data->ioc_id,
1252                                          data->ioc_misc, data->ioc_size,
1253                                          (data->ioc_flags & 0x01) != 0,
1254                                          (data->ioc_flags & 0x02) != 0,
1255                                          (data->ioc_flags & 0x04) != 0,
1256                                          (data->ioc_flags & 0x08) != 0,
1257                                          (data->ioc_flags & 0x10) != 0);
1258                 break;
1259         }
1260         case NAL_CMD_DEL_AUTOCONN: {
1261                 rc = ksocknal_del_route (data->ioc_nid, data->ioc_id, 
1262                                          (data->ioc_flags & 1) != 0,
1263                                          (data->ioc_flags & 2) != 0);
1264                 break;
1265         }
1266         case NAL_CMD_GET_CONN: {
1267                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (data->ioc_count);
1268
1269                 if (conn == NULL)
1270                         rc = -ENOENT;
1271                 else {
1272                         rc = 0;
1273                         data->ioc_nid  = conn->ksnc_peer->ksnp_nid;
1274                         data->ioc_id   = conn->ksnc_ipaddr;
1275                         data->ioc_misc = conn->ksnc_port;
1276                         ksocknal_put_conn (conn);
1277                 }
1278                 break;
1279         }
1280         case NAL_CMD_REGISTER_PEER_FD: {
1281                 struct socket *sock = sockfd_lookup (data->ioc_fd, &rc);
1282
1283                 if (sock != NULL) {
1284                         rc = ksocknal_create_conn (data->ioc_nid, NULL,
1285                                                    sock, data->ioc_flags);
1286                         if (rc != 0)
1287                                 fput (sock->file);
1288                 }
1289                 break;
1290         }
1291         case NAL_CMD_CLOSE_CONNECTION: {
1292                 rc = ksocknal_close_conn (data->ioc_nid, data->ioc_id);
1293                 break;
1294         }
1295         case NAL_CMD_REGISTER_MYNID: {
1296                 rc = ksocknal_set_mynid (data->ioc_nid);
1297                 break;
1298         }
1299         case NAL_CMD_PUSH_CONNECTION: {
1300                 rc = ksocknal_push (data->ioc_nid);
1301                 break;
1302         }
1303         }
1304
1305         return rc;
1306 }
1307
1308 void
1309 ksocknal_free_buffers (void)
1310 {
1311         if (ksocknal_data.ksnd_fmbs != NULL) {
1312                 ksock_fmb_t *fmb = (ksock_fmb_t *)ksocknal_data.ksnd_fmbs;
1313                 int          i;
1314                 int          j;
1315
1316                 for (i = 0;
1317                      i < (SOCKNAL_SMALL_FWD_NMSGS + SOCKNAL_LARGE_FWD_NMSGS);
1318                      i++, fmb++)
1319                         for (j = 0; j < fmb->fmb_npages; j++)
1320                                 if (fmb->fmb_pages[j] != NULL)
1321                                         __free_page (fmb->fmb_pages[j]);
1322
1323                 PORTAL_FREE (ksocknal_data.ksnd_fmbs,
1324                              sizeof (ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1325                                                      SOCKNAL_LARGE_FWD_NMSGS));
1326         }
1327
1328         LASSERT (ksocknal_data.ksnd_active_ltxs == 0);
1329         if (ksocknal_data.ksnd_ltxs != NULL)
1330                 PORTAL_FREE (ksocknal_data.ksnd_ltxs,
1331                              sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS +
1332                                                      SOCKNAL_NNBLK_LTXS));
1333
1334         if (ksocknal_data.ksnd_schedulers != NULL)
1335                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1336                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1337
1338         PORTAL_FREE (ksocknal_data.ksnd_peers,
1339                      sizeof (struct list_head) * 
1340                      ksocknal_data.ksnd_peer_hash_size);
1341 }
1342
1343 void /*__exit*/
1344 ksocknal_module_fini (void)
1345 {
1346         int   i;
1347
1348         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1349                atomic_read (&portal_kmemory));
1350
1351         switch (ksocknal_data.ksnd_init) {
1352         default:
1353                 LASSERT (0);
1354
1355         case SOCKNAL_INIT_ALL:
1356 #if CONFIG_SYSCTL
1357                 if (ksocknal_data.ksnd_sysctl != NULL)
1358                         unregister_sysctl_table (ksocknal_data.ksnd_sysctl);
1359 #endif
1360                 kportal_nal_unregister(SOCKNAL);
1361                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
1362                 /* fall through */
1363
1364         case SOCKNAL_INIT_PTL:
1365                 PtlNIFini(ksocknal_ni);
1366                 lib_fini(&ksocknal_lib);
1367                 /* fall through */
1368
1369         case SOCKNAL_INIT_DATA:
1370                 /* Module refcount only gets to zero when all peers
1371                  * have been closed so all lists must be empty */
1372                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1373                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1374                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1375                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1376                 }
1377                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1378                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1379                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1380                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1381
1382                 if (ksocknal_data.ksnd_schedulers != NULL)
1383                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1384                                 ksock_sched_t *kss =
1385                                         &ksocknal_data.ksnd_schedulers[i];
1386
1387                                 LASSERT (list_empty (&kss->kss_tx_conns));
1388                                 LASSERT (list_empty (&kss->kss_rx_conns));
1389                                 LASSERT (kss->kss_nconns == 0);
1390                         }
1391
1392                 /* stop router calling me */
1393                 kpr_shutdown (&ksocknal_data.ksnd_router);
1394
1395                 /* flag threads to terminate; wake and wait for them to die */
1396                 ksocknal_data.ksnd_shuttingdown = 1;
1397                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1398                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1399
1400                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1401                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1402
1403                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1404                         CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
1405                                 atomic_read (&ksocknal_data.ksnd_nthreads));
1406                         set_current_state (TASK_UNINTERRUPTIBLE);
1407                         schedule_timeout (HZ);
1408                 }
1409
1410                 kpr_deregister (&ksocknal_data.ksnd_router);
1411
1412                 ksocknal_free_buffers();
1413                 /* fall through */
1414
1415         case SOCKNAL_INIT_NOTHING:
1416                 break;
1417         }
1418
1419         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1420                atomic_read (&portal_kmemory));
1421
1422         printk(KERN_INFO "Routing socket NAL unloaded (final mem %d)\n",
1423                atomic_read(&portal_kmemory));
1424 }
1425
1426
1427 int __init
1428 ksocknal_module_init (void)
1429 {
1430         int   pkmem = atomic_read(&portal_kmemory);
1431         int   rc;
1432         int   i;
1433         int   j;
1434
1435         /* packet descriptor must fit in a router descriptor's scratchpad */
1436         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1437         /* the following must be sizeof(int) for proc_dointvec() */
1438         LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
1439         LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
1440
1441         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1442
1443         ksocknal_api.forward  = ksocknal_api_forward;
1444         ksocknal_api.shutdown = ksocknal_api_shutdown;
1445         ksocknal_api.yield    = ksocknal_api_yield;
1446         ksocknal_api.validate = NULL;           /* our api validate is a NOOP */
1447         ksocknal_api.lock     = ksocknal_api_lock;
1448         ksocknal_api.unlock   = ksocknal_api_unlock;
1449         ksocknal_api.nal_data = &ksocknal_data;
1450
1451         ksocknal_lib.nal_data = &ksocknal_data;
1452
1453         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1454
1455         ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1456         ksocknal_data.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
1457 #if SOCKNAL_ZC
1458         ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1459 #endif
1460
1461         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1462         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1463                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1464         if (ksocknal_data.ksnd_peers == NULL)
1465                 RETURN (-ENOMEM);
1466
1467         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1468                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1469
1470         rwlock_init(&ksocknal_data.ksnd_global_lock);
1471
1472         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1473         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1474
1475         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1476         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1477         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1478
1479         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1480         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1481         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1482
1483         spin_lock_init(&ksocknal_data.ksnd_idle_ltx_lock);
1484         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_nblk_ltx_list);
1485         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_ltx_list);
1486         init_waitqueue_head(&ksocknal_data.ksnd_idle_ltx_waitq);
1487
1488         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1489         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1490         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1491         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1492
1493         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1494         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1495         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1496
1497         /* NB memset above zeros whole of ksocknal_data, including
1498          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1499
1500         /* flag lists/ptrs/locks initialised */
1501         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1502
1503         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1504                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1505         if (ksocknal_data.ksnd_schedulers == NULL) {
1506                 ksocknal_module_fini ();
1507                 RETURN(-ENOMEM);
1508         }
1509
1510         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1511                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1512
1513                 spin_lock_init (&kss->kss_lock);
1514                 INIT_LIST_HEAD (&kss->kss_rx_conns);
1515                 INIT_LIST_HEAD (&kss->kss_tx_conns);
1516 #if SOCKNAL_ZC
1517                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1518 #endif
1519                 init_waitqueue_head (&kss->kss_waitq);
1520         }
1521
1522         CDEBUG (D_MALLOC, "ltx "LPSZ", total "LPSZ"\n", sizeof (ksock_ltx_t),
1523                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
1524
1525         PORTAL_ALLOC(ksocknal_data.ksnd_ltxs,
1526                      sizeof(ksock_ltx_t) * (SOCKNAL_NLTXS +SOCKNAL_NNBLK_LTXS));
1527         if (ksocknal_data.ksnd_ltxs == NULL) {
1528                 ksocknal_module_fini ();
1529                 return (-ENOMEM);
1530         }
1531
1532         /* Deterministic bugs please */
1533         memset (ksocknal_data.ksnd_ltxs, 0xeb,
1534                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
1535
1536         for (i = 0; i < SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS; i++) {
1537                 ksock_ltx_t *ltx = &((ksock_ltx_t *)ksocknal_data.ksnd_ltxs)[i];
1538
1539                 ltx->ltx_tx.tx_hdr = &ltx->ltx_hdr;
1540                 ltx->ltx_idle = i < SOCKNAL_NLTXS ?
1541                                 &ksocknal_data.ksnd_idle_ltx_list :
1542                                 &ksocknal_data.ksnd_idle_nblk_ltx_list;
1543                 list_add (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
1544         }
1545
1546         rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
1547         if (rc != 0) {
1548                 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
1549                 ksocknal_module_fini ();
1550                 RETURN (rc);
1551         }
1552         PtlNIDebug(ksocknal_ni, ~0);
1553
1554         ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
1555
1556         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1557                 rc = ksocknal_thread_start (ksocknal_scheduler,
1558                                             &ksocknal_data.ksnd_schedulers[i]);
1559                 if (rc != 0) {
1560                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1561                                i, rc);
1562                         ksocknal_module_fini ();
1563                         RETURN (rc);
1564                 }
1565         }
1566
1567         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1568                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1569                 if (rc != 0) {
1570                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1571                         ksocknal_module_fini ();
1572                         RETURN (rc);
1573                 }
1574         }
1575
1576         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1577         if (rc != 0) {
1578                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1579                 ksocknal_module_fini ();
1580                 RETURN (rc);
1581         }
1582
1583         rc = kpr_register(&ksocknal_data.ksnd_router,
1584                           &ksocknal_router_interface);
1585         if (rc != 0) {
1586                 CDEBUG(D_NET, "Can't initialise routing interface "
1587                        "(rc = %d): not routing\n", rc);
1588         } else {
1589                 /* Only allocate forwarding buffers if I'm on a gateway */
1590
1591                 PORTAL_ALLOC(ksocknal_data.ksnd_fmbs,
1592                              sizeof(ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1593                                                     SOCKNAL_LARGE_FWD_NMSGS));
1594                 if (ksocknal_data.ksnd_fmbs == NULL) {
1595                         ksocknal_module_fini ();
1596                         RETURN(-ENOMEM);
1597                 }
1598
1599                 /* NULL out buffer pointers etc */
1600                 memset(ksocknal_data.ksnd_fmbs, 0,
1601                        sizeof(ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1602                                               SOCKNAL_LARGE_FWD_NMSGS));
1603
1604                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1605                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
1606                         ksock_fmb_t *fmb =
1607                                 &((ksock_fmb_t *)ksocknal_data.ksnd_fmbs)[i];
1608
1609                         if (i < SOCKNAL_SMALL_FWD_NMSGS) {
1610                                 fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES;
1611                                 fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp;
1612                         } else {
1613                                 fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES;
1614                                 fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp;
1615                         }
1616
1617                         LASSERT (fmb->fmb_npages > 0);
1618                         for (j = 0; j < fmb->fmb_npages; j++) {
1619                                 fmb->fmb_pages[j] = alloc_page(GFP_KERNEL);
1620
1621                                 if (fmb->fmb_pages[j] == NULL) {
1622                                         ksocknal_module_fini ();
1623                                         return (-ENOMEM);
1624                                 }
1625
1626                                 LASSERT(page_address (fmb->fmb_pages[j]) !=
1627                                         NULL);
1628                         }
1629
1630                         list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
1631                 }
1632         }
1633
1634         rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
1635         if (rc != 0) {
1636                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1637                 ksocknal_module_fini ();
1638                 return (rc);
1639         }
1640
1641         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
1642
1643 #ifdef CONFIG_SYSCTL
1644         /* Press on regardless even if registering sysctl doesn't work */
1645         ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
1646 #endif
1647         /* flag everything initialised */
1648         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1649         
1650         printk(KERN_INFO "Routing socket NAL loaded (Routing %s, initial "
1651                "mem %d)\n",
1652                kpr_routing (&ksocknal_data.ksnd_router) ?
1653                "enabled" : "disabled", pkmem);
1654
1655         return (0);
1656 }
1657
1658 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1659 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1660 MODULE_LICENSE("GPL");
1661
1662 module_init(ksocknal_module_init);
1663 module_exit(ksocknal_module_fini);
1664
1665 EXPORT_SYMBOL (ksocknal_ni);