Whamcloud - gitweb
* Fixed bug 2119
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 ptl_handle_ni_t         ksocknal_ni;
29 static nal_t            ksocknal_api;
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
32 #else
33 static ksock_nal_data_t ksocknal_data;
34 #endif
35
36 kpr_nal_interface_t ksocknal_router_interface = {
37         kprni_nalid:      SOCKNAL,
38         kprni_arg:        &ksocknal_data,
39         kprni_fwd:        ksocknal_fwd_packet,
40         kprni_notify:     ksocknal_notify,
41 };
42
43 #define SOCKNAL_SYSCTL  200
44
45 #define SOCKNAL_SYSCTL_TIMEOUT     1
46 #define SOCKNAL_SYSCTL_EAGER_ACK   2
47 #define SOCKNAL_SYSCTL_ZERO_COPY   3
48
49 static ctl_table ksocknal_ctl_table[] = {
50         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
51          &ksocknal_data.ksnd_io_timeout, sizeof (int),
52          0644, NULL, &proc_dointvec},
53         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
54          &ksocknal_data.ksnd_eager_ack, sizeof (int),
55          0644, NULL, &proc_dointvec},
56 #if SOCKNAL_ZC
57         {SOCKNAL_SYSCTL_EAGER_ACK, "zero_copy", 
58          &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
59          0644, NULL, &proc_dointvec},
60 #endif
61         { 0 }
62 };
63
64 static ctl_table ksocknal_top_ctl_table[] = {
65         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
66         { 0 }
67 };
68
69 int
70 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
71                        void *ret, size_t ret_len)
72 {
73         ksock_nal_data_t *k;
74         nal_cb_t *nal_cb;
75
76         k = nal->nal_data;
77         nal_cb = k->ksnd_nal_cb;
78
79         lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
80         return PTL_OK;
81 }
82
83 int
84 ksocknal_api_shutdown(nal_t *nal, int ni)
85 {
86         CDEBUG (D_NET, "closing all connections\n");
87
88         ksocknal_del_route (PTL_NID_ANY, 0, 0, 0);
89         ksocknal_close_conn (PTL_NID_ANY, 0);
90         return PTL_OK;
91 }
92
93 void
94 ksocknal_api_yield(nal_t *nal)
95 {
96         our_cond_resched();
97         return;
98 }
99
100 void
101 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
102 {
103         ksock_nal_data_t *k;
104         nal_cb_t *nal_cb;
105
106         k = nal->nal_data;
107         nal_cb = k->ksnd_nal_cb;
108         nal_cb->cb_cli(nal_cb,flags);
109 }
110
111 void
112 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
113 {
114         ksock_nal_data_t *k;
115         nal_cb_t *nal_cb;
116
117         k = nal->nal_data;
118         nal_cb = k->ksnd_nal_cb;
119         nal_cb->cb_sti(nal_cb,flags);
120 }
121
122 nal_t *
123 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
124               ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
125 {
126         CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
127         lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
128         return (&ksocknal_api);
129 }
130
131 /*
132  *  EXTRA functions follow
133  */
134
135 int
136 ksocknal_set_mynid(ptl_nid_t nid)
137 {
138         lib_ni_t *ni = &ksocknal_lib.ni;
139
140         /* FIXME: we have to do this because we call lib_init() at module
141          * insertion time, which is before we have 'mynid' available.  lib_init
142          * sets the NAL's nid, which it uses to tell other nodes where packets
143          * are coming from.  This is not a very graceful solution to this
144          * problem. */
145
146         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
147                nid, ni->nid);
148
149         ni->nid = nid;
150         return (0);
151 }
152
153 void
154 ksocknal_bind_irq (unsigned int irq)
155 {
156 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
157         int              bind;
158         unsigned long    flags;
159         char             cmdline[64];
160         ksock_irqinfo_t *info;
161         char            *argv[] = {"/bin/sh",
162                                    "-c",
163                                    cmdline,
164                                    NULL};
165         char            *envp[] = {"HOME=/",
166                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
167                                    NULL};
168
169         LASSERT (irq < NR_IRQS);
170         if (irq == 0)                           /* software NIC */
171                 return;
172
173         info = &ksocknal_data.ksnd_irqinfo[irq];
174
175         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
176
177         LASSERT (info->ksni_valid);
178         bind = !info->ksni_bound;
179         info->ksni_bound = 1;
180
181         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
182
183         if (!bind)                              /* bound already */
184                 return;
185
186         snprintf (cmdline, sizeof (cmdline),
187                   "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
188
189         printk (KERN_INFO "Binding irq %u to CPU %d with cmd: %s\n",
190                 irq, info->ksni_sched, cmdline);
191
192         /* FIXME: Find a better method of setting IRQ affinity...
193          */
194
195         call_usermodehelper (argv[0], argv, envp);
196 #endif
197 }
198
199 ksock_route_t *
200 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
201                        int nonagel, int xchange_nids, int irq_affinity, int eager)
202 {
203         ksock_route_t *route;
204
205         PORTAL_ALLOC (route, sizeof (*route));
206         if (route == NULL)
207                 return (NULL);
208
209         atomic_set (&route->ksnr_refcount, 1);
210         route->ksnr_sharecount = 0;
211         route->ksnr_peer = NULL;
212         route->ksnr_timeout = jiffies;
213         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
214         route->ksnr_ipaddr = ipaddr;
215         route->ksnr_port = port;
216         route->ksnr_buffer_size = buffer_size;
217         route->ksnr_irq_affinity = irq_affinity;
218         route->ksnr_xchange_nids = xchange_nids;
219         route->ksnr_nonagel = nonagel;
220         route->ksnr_eager = eager;
221         route->ksnr_connecting = 0;
222         route->ksnr_deleted = 0;
223         route->ksnr_generation = 0;
224         route->ksnr_conn = NULL;
225
226         return (route);
227 }
228
229 void
230 ksocknal_destroy_route (ksock_route_t *route)
231 {
232         LASSERT (route->ksnr_sharecount == 0);
233         LASSERT (route->ksnr_conn == NULL);
234
235         if (route->ksnr_peer != NULL)
236                 ksocknal_put_peer (route->ksnr_peer);
237
238         PORTAL_FREE (route, sizeof (*route));
239 }
240
241 void
242 ksocknal_put_route (ksock_route_t *route)
243 {
244         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
245                 route, atomic_read (&route->ksnr_refcount));
246
247         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
248         if (!atomic_dec_and_test (&route->ksnr_refcount))
249              return;
250
251         ksocknal_destroy_route (route);
252 }
253
254 ksock_peer_t *
255 ksocknal_create_peer (ptl_nid_t nid)
256 {
257         ksock_peer_t *peer;
258
259         LASSERT (nid != PTL_NID_ANY);
260
261         PORTAL_ALLOC (peer, sizeof (*peer));
262         if (peer == NULL)
263                 return (NULL);
264
265         memset (peer, 0, sizeof (*peer));
266
267         peer->ksnp_nid = nid;
268         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
269         peer->ksnp_closing = 0;
270         INIT_LIST_HEAD (&peer->ksnp_conns);
271         INIT_LIST_HEAD (&peer->ksnp_routes);
272         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
273
274         /* Can't unload while peers exist; ensures all I/O has terminated
275          * before unload attempts */
276         PORTAL_MODULE_USE;
277         atomic_inc (&ksocknal_data.ksnd_npeers);
278         return (peer);
279 }
280
281 void
282 ksocknal_destroy_peer (ksock_peer_t *peer)
283 {
284         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
285
286         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
287         LASSERT (list_empty (&peer->ksnp_conns));
288         LASSERT (list_empty (&peer->ksnp_routes));
289         LASSERT (list_empty (&peer->ksnp_tx_queue));
290
291         PORTAL_FREE (peer, sizeof (*peer));
292
293         /* NB a peer's connections and autoconnect routes keep a reference
294          * on their peer until they are destroyed, so we can be assured
295          * that _all_ state to do with this peer has been cleaned up when
296          * its refcount drops to zero. */
297         atomic_dec (&ksocknal_data.ksnd_npeers);
298         PORTAL_MODULE_UNUSE;
299 }
300
301 void
302 ksocknal_put_peer (ksock_peer_t *peer)
303 {
304         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
305                 peer, peer->ksnp_nid,
306                 atomic_read (&peer->ksnp_refcount));
307
308         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
309         if (!atomic_dec_and_test (&peer->ksnp_refcount))
310                 return;
311
312         ksocknal_destroy_peer (peer);
313 }
314
315 ksock_peer_t *
316 ksocknal_find_peer_locked (ptl_nid_t nid)
317 {
318         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
319         struct list_head *tmp;
320         ksock_peer_t     *peer;
321
322         list_for_each (tmp, peer_list) {
323
324                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
325
326                 LASSERT (!peer->ksnp_closing);
327                 LASSERT (!(list_empty (&peer->ksnp_routes) &&
328                            list_empty (&peer->ksnp_conns)));
329
330                 if (peer->ksnp_nid != nid)
331                         continue;
332
333                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
334                        peer, nid, atomic_read (&peer->ksnp_refcount));
335                 return (peer);
336         }
337         return (NULL);
338 }
339
340 ksock_peer_t *
341 ksocknal_get_peer (ptl_nid_t nid)
342 {
343         ksock_peer_t     *peer;
344
345         read_lock (&ksocknal_data.ksnd_global_lock);
346         peer = ksocknal_find_peer_locked (nid);
347         if (peer != NULL)                       /* +1 ref for caller? */
348                 atomic_inc (&peer->ksnp_refcount);
349         read_unlock (&ksocknal_data.ksnd_global_lock);
350
351         return (peer);
352 }
353
354 void
355 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
356 {
357         LASSERT (!peer->ksnp_closing);
358         peer->ksnp_closing = 1;
359         list_del (&peer->ksnp_list);
360         /* lose peerlist's ref */
361         ksocknal_put_peer (peer);
362 }
363
364 ksock_route_t *
365 ksocknal_get_route_by_idx (int index)
366 {
367         ksock_peer_t      *peer;
368         struct list_head  *ptmp;
369         ksock_route_t     *route;
370         struct list_head  *rtmp;
371         int                i;
372
373         read_lock (&ksocknal_data.ksnd_global_lock);
374
375         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
376                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
377                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
378
379                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
380                                    list_empty (&peer->ksnp_conns)));
381
382                         list_for_each (rtmp, &peer->ksnp_routes) {
383                                 if (index-- > 0)
384                                         continue;
385
386                                 route = list_entry (rtmp, ksock_route_t, ksnr_list);
387                                 atomic_inc (&route->ksnr_refcount);
388                                 read_unlock (&ksocknal_data.ksnd_global_lock);
389                                 return (route);
390                         }
391                 }
392         }
393
394         read_unlock (&ksocknal_data.ksnd_global_lock);
395         return (NULL);
396 }
397
398 int
399 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
400                     int nonagle, int xchange_nids, int bind_irq, 
401                     int share, int eager)
402 {
403         unsigned long      flags;
404         ksock_peer_t      *peer;
405         ksock_peer_t      *peer2;
406         ksock_route_t     *route;
407         struct list_head  *rtmp;
408         ksock_route_t     *route2;
409         
410         if (nid == PTL_NID_ANY)
411                 return (-EINVAL);
412
413         /* Have a brand new peer ready... */
414         peer = ksocknal_create_peer (nid);
415         if (peer == NULL)
416                 return (-ENOMEM);
417
418         route = ksocknal_create_route (ipaddr, port, bufnob, nonagle,
419                                        xchange_nids, bind_irq, eager);
420         if (route == NULL) {
421                 ksocknal_put_peer (peer);
422                 return (-ENOMEM);
423         }
424
425         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
426
427         peer2 = ksocknal_find_peer_locked (nid);
428         if (peer2 != NULL) {
429                 ksocknal_put_peer (peer);
430                 peer = peer2;
431         } else {
432                 /* peer table takes existing ref on peer */
433                 list_add (&peer->ksnp_list,
434                           ksocknal_nid2peerlist (nid));
435         }
436
437         route2 = NULL;
438         if (share) {
439                 /* check for existing route to this NID via this ipaddr */
440                 list_for_each (rtmp, &peer->ksnp_routes) {
441                         route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
442                         
443                         if (route2->ksnr_ipaddr == ipaddr)
444                                 break;
445
446                         route2 = NULL;
447                 }
448         }
449
450         if (route2 != NULL) {
451                 ksocknal_put_route (route);
452                 route = route2;
453         } else {
454                 /* route takes a ref on peer */
455                 route->ksnr_peer = peer;
456                 atomic_inc (&peer->ksnp_refcount);
457                 /* peer's route list takes existing ref on route */
458                 list_add (&route->ksnr_list, &peer->ksnp_routes);
459         }
460         
461         route->ksnr_sharecount++;
462
463         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
464
465         return (0);
466 }
467
468 void
469 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
470 {
471         ksock_peer_t *peer = route->ksnr_peer;
472         ksock_conn_t *conn = route->ksnr_conn;
473
474         if (!share)
475                 route->ksnr_sharecount = 0;
476         else {
477                 route->ksnr_sharecount--;
478                 if (route->ksnr_sharecount != 0)
479                         return;
480         }
481
482         if (conn != NULL) {
483                 if (!keep_conn)
484                         ksocknal_close_conn_locked (conn, 0);
485                 else {
486                         /* keeping the conn; just dissociate it and route... */
487                         conn->ksnc_route = NULL;
488                         route->ksnr_conn = NULL;
489                         ksocknal_put_route (route); /* drop conn's ref on route */
490                         ksocknal_put_conn (conn); /* drop route's ref on conn */
491                 }
492         }
493
494         route->ksnr_deleted = 1;
495         list_del (&route->ksnr_list);
496         ksocknal_put_route (route);             /* drop peer's ref */
497
498         if (list_empty (&peer->ksnp_routes) &&
499             list_empty (&peer->ksnp_conns)) {
500                 /* I've just removed the last autoconnect route of a peer
501                  * with no active connections */
502                 ksocknal_unlink_peer_locked (peer);
503         }
504 }
505
506 int
507 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
508 {
509         unsigned long      flags;
510         struct list_head  *ptmp;
511         struct list_head  *pnxt;
512         ksock_peer_t      *peer;
513         struct list_head  *rtmp;
514         struct list_head  *rnxt;
515         ksock_route_t     *route;
516         int                lo;
517         int                hi;
518         int                i;
519         int                rc = -ENOENT;
520
521         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
522
523         if (nid != PTL_NID_ANY)
524                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
525         else {
526                 lo = 0;
527                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
528         }
529
530         for (i = lo; i <= hi; i++) {
531                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
532                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
533
534                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
535                                 continue;
536
537                         list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
538                                 route = list_entry (rtmp, ksock_route_t,
539                                                     ksnr_list);
540
541                                 if (!(ipaddr == 0 ||
542                                       route->ksnr_ipaddr == ipaddr))
543                                         continue;
544
545                                 ksocknal_del_route_locked (route, share, keep_conn);
546                                 rc = 0;         /* matched something */
547                                 if (share)
548                                         goto out;
549                         }
550                 }
551         }
552  out:
553         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
554
555         return (rc);
556 }
557
558 ksock_conn_t *
559 ksocknal_get_conn_by_idx (int index)
560 {
561         ksock_peer_t      *peer;
562         struct list_head  *ptmp;
563         ksock_conn_t      *conn;
564         struct list_head  *ctmp;
565         int                i;
566
567         read_lock (&ksocknal_data.ksnd_global_lock);
568
569         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
570                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
571                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
572
573                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
574                                    list_empty (&peer->ksnp_conns)));
575
576                         list_for_each (ctmp, &peer->ksnp_conns) {
577                                 if (index-- > 0)
578                                         continue;
579
580                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
581                                 atomic_inc (&conn->ksnc_refcount);
582                                 read_unlock (&ksocknal_data.ksnd_global_lock);
583                                 return (conn);
584                         }
585                 }
586         }
587
588         read_unlock (&ksocknal_data.ksnd_global_lock);
589         return (NULL);
590 }
591
592 void
593 ksocknal_get_peer_addr (ksock_conn_t *conn)
594 {
595         struct sockaddr_in sin;
596         int                len = sizeof (sin);
597         int                rc;
598         
599         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
600                                             (struct sockaddr *)&sin, &len, 2);
601         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
602         LASSERT (!conn->ksnc_closing);
603         LASSERT (len <= sizeof (sin));
604
605         if (rc != 0) {
606                 CERROR ("Error %d getting sock peer IP\n", rc);
607                 return;
608         }
609
610         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
611         conn->ksnc_port   = ntohs (sin.sin_port);
612 }
613
614 unsigned int
615 ksocknal_conn_irq (ksock_conn_t *conn)
616 {
617         int                irq = 0;
618         struct dst_entry  *dst;
619
620         dst = sk_dst_get (conn->ksnc_sock->sk);
621         if (dst != NULL) {
622                 if (dst->dev != NULL) {
623                         irq = dst->dev->irq;
624                         if (irq >= NR_IRQS) {
625                                 CERROR ("Unexpected IRQ %x\n", irq);
626                                 irq = 0;
627                         }
628                 }
629                 dst_release (dst);
630         }
631         
632         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
633         LASSERT (!conn->ksnc_closing);
634         return (irq);
635 }
636
637 ksock_sched_t *
638 ksocknal_choose_scheduler_locked (unsigned int irq)
639 {
640         ksock_sched_t    *sched;
641         ksock_irqinfo_t  *info;
642         int               i;
643
644         LASSERT (irq < NR_IRQS);
645         info = &ksocknal_data.ksnd_irqinfo[irq];
646
647         if (irq != 0 &&                         /* hardware NIC */
648             info->ksni_valid) {                 /* already set up */
649                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
650         }
651
652         /* software NIC (irq == 0) || not associated with a scheduler yet.
653          * Choose the CPU with the fewest connections... */
654         sched = &ksocknal_data.ksnd_schedulers[0];
655         for (i = 1; i < SOCKNAL_N_SCHED; i++)
656                 if (sched->kss_nconns >
657                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
658                         sched = &ksocknal_data.ksnd_schedulers[i];
659
660         if (irq != 0) {                         /* Hardware NIC */
661                 info->ksni_valid = 1;
662                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
663
664                 /* no overflow... */
665                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
666         }
667
668         return (sched);
669 }
670
671 int
672 ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
673                       struct socket *sock, int bind_irq)
674 {
675         unsigned long      flags;
676         ksock_conn_t      *conn;
677         ksock_peer_t      *peer;
678         ksock_peer_t      *peer2;
679         ksock_sched_t     *sched;
680         unsigned int       irq;
681         ksock_tx_t        *tx;
682         int                rc;
683
684         /* NB, sock has an associated file since (a) this connection might
685          * have been created in userland and (b) we need to refcount the
686          * socket so that we don't close it while I/O is being done on
687          * it, and sock->file has that pre-cooked... */
688         LASSERT (sock->file != NULL);
689         LASSERT (file_count(sock->file) > 0);
690
691         rc = ksocknal_setup_sock (sock);
692         if (rc != 0)
693                 return (rc);
694
695         peer = NULL;
696         if (route == NULL) {                    /* not autoconnect */
697                 /* Assume this socket connects to a brand new peer */
698                 peer = ksocknal_create_peer (nid);
699                 if (peer == NULL)
700                         return (-ENOMEM);
701         }
702
703         PORTAL_ALLOC(conn, sizeof(*conn));
704         if (conn == NULL) {
705                 if (peer != NULL)
706                         ksocknal_put_peer (peer);
707                 return (-ENOMEM);
708         }
709
710         memset (conn, 0, sizeof (*conn));
711         conn->ksnc_peer = NULL;
712         conn->ksnc_route = NULL;
713         conn->ksnc_sock = sock;
714         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
715         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
716         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
717
718         conn->ksnc_rx_ready = 0;
719         conn->ksnc_rx_scheduled = 0;
720         ksocknal_new_packet (conn, 0);
721
722         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
723         conn->ksnc_tx_ready = 0;
724         conn->ksnc_tx_scheduled = 0;
725         atomic_set (&conn->ksnc_tx_nob, 0);
726
727         ksocknal_get_peer_addr (conn);
728
729         irq = ksocknal_conn_irq (conn);
730
731         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
732
733         if (route != NULL) {
734                 /* Autoconnected! */
735                 LASSERT (route->ksnr_conn == NULL && route->ksnr_connecting);
736
737                 if (route->ksnr_deleted) {
738                         /* This conn was autoconnected, but the autoconnect
739                          * route got deleted while it was being
740                          * established! */
741                         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
742                                                  flags);
743                         PORTAL_FREE (conn, sizeof (*conn));
744                         return (-ESTALE);
745                 }
746
747
748                 /* associate conn/route for auto-reconnect */
749                 route->ksnr_conn = conn;
750                 atomic_inc (&conn->ksnc_refcount);
751                 conn->ksnc_route = route;
752                 atomic_inc (&route->ksnr_refcount);
753                 route->ksnr_connecting = 0;
754
755                 route->ksnr_generation++;
756                 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
757
758                 peer = route->ksnr_peer;
759         } else {
760                 /* Not an autoconnected connection; see if there is an
761                  * existing peer for this NID */
762                 peer2 = ksocknal_find_peer_locked (nid);
763                 if (peer2 != NULL) {
764                         ksocknal_put_peer (peer);
765                         peer = peer2;
766                 } else {
767                         list_add (&peer->ksnp_list,
768                                   ksocknal_nid2peerlist (nid));
769                         /* peer list takes over existing ref */
770                 }
771         }
772
773         LASSERT (!peer->ksnp_closing);
774
775         conn->ksnc_peer = peer;
776         atomic_inc (&peer->ksnp_refcount);
777         peer->ksnp_last_alive = jiffies;
778         peer->ksnp_error = 0;
779
780         list_add (&conn->ksnc_list, &peer->ksnp_conns);
781         atomic_inc (&conn->ksnc_refcount);
782
783         sched = ksocknal_choose_scheduler_locked (irq);
784         sched->kss_nconns++;
785         conn->ksnc_scheduler = sched;
786
787         /* NB my callbacks block while I hold ksnd_global_lock */
788         sock->sk->sk_user_data = conn;
789         sock->sk->sk_data_ready = ksocknal_data_ready;
790         sock->sk->sk_write_space = ksocknal_write_space;
791
792         /* Take all the packets blocking for a connection.
793          * NB, it might be nicer to share these blocked packets among any
794          * other connections that are becoming established, however that
795          * confuses the normal packet launching operation, which selects a
796          * connection and queues the packet on it without needing an
797          * exclusive lock on ksnd_global_lock. */
798         while (!list_empty (&peer->ksnp_tx_queue)) {
799                 tx = list_entry (peer->ksnp_tx_queue.next,
800                                  ksock_tx_t, tx_list);
801
802                 list_del (&tx->tx_list);
803                 ksocknal_queue_tx_locked (tx, conn);
804         }
805
806         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
807
808         if (bind_irq)                           /* irq binding required */
809                 ksocknal_bind_irq (irq);
810
811         /* Call the callbacks right now to get things going. */
812         ksocknal_data_ready (sock->sk, 0);
813         ksocknal_write_space (sock->sk);
814
815         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64"\n",
816                conn, conn->ksnc_peer->ksnp_nid);
817
818         ksocknal_put_conn (conn);
819         return (0);
820 }
821
822 void
823 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
824 {
825         /* This just does the immmediate housekeeping, and queues the
826          * connection for the reaper to terminate. 
827          * Caller holds ksnd_global_lock exclusively in irq context */
828         ksock_peer_t   *peer = conn->ksnc_peer;
829         ksock_route_t  *route;
830
831         LASSERT (peer->ksnp_error == 0);
832         LASSERT (!conn->ksnc_closing);
833         conn->ksnc_closing = 1;
834         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
835         
836         route = conn->ksnc_route;
837         if (route != NULL) {
838                 /* dissociate conn from route... */
839                 LASSERT (!route->ksnr_connecting &&
840                          !route->ksnr_deleted);
841
842                 route->ksnr_conn = NULL;
843                 conn->ksnc_route = NULL;
844
845                 ksocknal_put_route (route);     /* drop conn's ref on route */
846                 ksocknal_put_conn (conn);       /* drop route's ref on conn */
847         }
848
849         /* ksnd_deathrow_conns takes over peer's ref */
850         list_del (&conn->ksnc_list);
851
852         if (list_empty (&peer->ksnp_conns)) {
853                 /* No more connections to this peer */
854
855                 peer->ksnp_error = error;       /* stash last conn close reason */
856
857                 if (list_empty (&peer->ksnp_routes)) {
858                         /* I've just closed last conn belonging to a
859                          * non-autoconnecting peer */
860                         ksocknal_unlink_peer_locked (peer);
861                 }
862         }
863
864         spin_lock (&ksocknal_data.ksnd_reaper_lock);
865
866         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
867         wake_up (&ksocknal_data.ksnd_reaper_waitq);
868                 
869         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
870 }
871
872 int
873 ksocknal_close_conn_unlocked (ksock_conn_t *conn, int why) 
874 {
875         unsigned long flags;
876         int           did_it = 0;
877         
878         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
879                 
880         if (!conn->ksnc_closing) {
881                 did_it = 1;
882                 ksocknal_close_conn_locked (conn, why);
883         }
884         
885         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
886
887         return (did_it);
888 }
889
890 void
891 ksocknal_terminate_conn (ksock_conn_t *conn)
892 {
893         /* This gets called by the reaper (guaranteed thread context) to
894          * disengage the socket from its callbacks and close it.
895          * ksnc_refcount will eventually hit zero, and then the reaper will
896          * destroy it. */
897         unsigned long   flags;
898         ksock_peer_t   *peer = conn->ksnc_peer;
899         struct timeval  now;
900         time_t          then = 0;
901         int             notify = 0;
902
903         /* serialise with callbacks */
904         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
905
906         LASSERT (conn->ksnc_closing);
907         
908         /* Remove conn's network callbacks.
909          * NB I _have_ to restore the callback, rather than storing a noop,
910          * since the socket could survive past this module being unloaded!! */
911         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
912         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
913
914         /* A callback could be in progress already; they hold a read lock
915          * on ksnd_global_lock (to serialise with me) and NOOP if
916          * sk_user_data is NULL. */
917         conn->ksnc_sock->sk->sk_user_data = NULL;
918
919         conn->ksnc_scheduler->kss_nconns--;
920
921         if (peer->ksnp_error != 0) {
922                 /* peer's last conn closed in error */
923                 LASSERT (list_empty (&peer->ksnp_conns));
924                 
925                 /* convert peer's last-known-alive timestamp from jiffies */
926                 do_gettimeofday (&now);
927                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
928                 notify = 1;
929         }
930         
931         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
932
933         /* The socket is closed on the final put; either here, or in
934          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
935          * when the connection was established, this will close the socket
936          * immediately, aborting anything buffered in it. Any hung
937          * zero-copy transmits will therefore complete in finite time. */
938         ksocknal_putconnsock (conn);
939
940         if (notify)
941                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
942                             0, then);
943 }
944
945 void
946 ksocknal_destroy_conn (ksock_conn_t *conn)
947 {
948         /* Final coup-de-grace of the reaper */
949         CDEBUG (D_NET, "connection %p\n", conn);
950
951         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
952         LASSERT (conn->ksnc_route == NULL);
953         LASSERT (!conn->ksnc_tx_scheduled);
954         LASSERT (!conn->ksnc_rx_scheduled);
955
956         /* complete queued packets */
957         while (!list_empty (&conn->ksnc_tx_queue)) {
958                 ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next,
959                                              ksock_tx_t, tx_list);
960
961                 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
962                         NTOH__u32 (tx->tx_hdr->type),
963                         NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
964                         NTOH__u64 (tx->tx_hdr->src_nid),
965                         NTOH__u64 (tx->tx_hdr->dest_nid));
966
967                 list_del (&tx->tx_list);
968                 ksocknal_tx_done (tx, 0);
969         }
970
971         /* complete current receive if any */
972         switch (conn->ksnc_rx_state) {
973         case SOCKNAL_RX_BODY:
974                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
975                 break;
976         case SOCKNAL_RX_BODY_FWD:
977                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
978                 break;
979         case SOCKNAL_RX_HEADER:
980         case SOCKNAL_RX_SLOP:
981                 break;
982         default:
983                 LBUG ();
984                 break;
985         }
986
987         ksocknal_put_peer (conn->ksnc_peer);
988
989         PORTAL_FREE (conn, sizeof (*conn));
990         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
991 }
992
993 void
994 ksocknal_put_conn (ksock_conn_t *conn)
995 {
996         unsigned long flags;
997
998         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
999                 conn, conn->ksnc_peer->ksnp_nid,
1000                 atomic_read (&conn->ksnc_refcount));
1001
1002         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1003         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1004                 return;
1005
1006         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1007
1008         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1009         wake_up (&ksocknal_data.ksnd_reaper_waitq);
1010
1011         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1012 }
1013
1014 int
1015 ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr)
1016 {
1017         unsigned long       flags;
1018         ksock_conn_t       *conn;
1019         struct list_head   *ctmp;
1020         struct list_head   *cnxt;
1021         ksock_peer_t       *peer;
1022         struct list_head   *ptmp;
1023         struct list_head   *pnxt;
1024         int                 lo;
1025         int                 hi;
1026         int                 i;
1027         int                 rc = -ENOENT;
1028
1029         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1030
1031         if (nid != PTL_NID_ANY)
1032                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1033         else {
1034                 lo = 0;
1035                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1036         }
1037
1038         for (i = lo; i <= hi; i++) {
1039                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1040
1041                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1042
1043                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1044                                 continue;
1045
1046                         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1047
1048                                 conn = list_entry (ctmp, ksock_conn_t,
1049                                                    ksnc_list);
1050
1051                                 if (!(ipaddr == 0 ||
1052                                       conn->ksnc_ipaddr == ipaddr))
1053                                         continue;
1054
1055                                 rc = 0;
1056                                 ksocknal_close_conn_locked (conn, 0);
1057                         }
1058                 }
1059         }
1060
1061         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1062
1063         return (rc);
1064 }
1065
1066 void
1067 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1068 {
1069         /* The router is telling me she's been notified of a change in
1070          * gateway state.... */
1071
1072         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1073
1074         if (!alive) {
1075                 /* If the gateway crashed, close all open connections... */
1076                 ksocknal_close_conn (gw_nid, 0);
1077                 return;
1078         }
1079         
1080         /* ...otherwise do nothing.  We can only establish new connections
1081          * if we have autroutes, and these connect on demand. */
1082 }
1083
1084 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1085 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1086 {
1087         return &(sk->tp_pinfo.af_tcp);
1088 }
1089 #else
1090 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1091 {
1092         struct tcp_sock *s = (struct tcp_sock *)sk;
1093         return &s->tcp;
1094 }
1095 #endif
1096
1097 void
1098 ksocknal_push_conn (ksock_conn_t *conn)
1099 {
1100         struct sock    *sk;
1101         struct tcp_opt *tp;
1102         int             nonagle;
1103         int             val = 1;
1104         int             rc;
1105         mm_segment_t    oldmm;
1106
1107         rc = ksocknal_getconnsock (conn);
1108         if (rc != 0)                            /* being shut down */
1109                 return;
1110         
1111         sk = conn->ksnc_sock->sk;
1112         tp = sock2tcp_opt(sk);
1113         
1114         lock_sock (sk);
1115         nonagle = tp->nonagle;
1116         tp->nonagle = 1;
1117         release_sock (sk);
1118
1119         oldmm = get_fs ();
1120         set_fs (KERNEL_DS);
1121
1122         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1123                                       (char *)&val, sizeof (val));
1124         LASSERT (rc == 0);
1125
1126         set_fs (oldmm);
1127
1128         lock_sock (sk);
1129         tp->nonagle = nonagle;
1130         release_sock (sk);
1131
1132         ksocknal_putconnsock (conn);
1133 }
1134
1135 void
1136 ksocknal_push_peer (ksock_peer_t *peer)
1137 {
1138         int               index;
1139         int               i;
1140         struct list_head *tmp;
1141         ksock_conn_t     *conn;
1142
1143         for (index = 0; ; index++) {
1144                 read_lock (&ksocknal_data.ksnd_global_lock);
1145
1146                 i = 0;
1147                 conn = NULL;
1148
1149                 list_for_each (tmp, &peer->ksnp_conns) {
1150                         if (i++ == index) {
1151                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1152                                 atomic_inc (&conn->ksnc_refcount);
1153                                 break;
1154                         }
1155                 }
1156
1157                 read_unlock (&ksocknal_data.ksnd_global_lock);
1158
1159                 if (conn == NULL)
1160                         break;
1161
1162                 ksocknal_push_conn (conn);
1163                 ksocknal_put_conn (conn);
1164         }
1165 }
1166
1167 int
1168 ksocknal_push (ptl_nid_t nid)
1169 {
1170         ksock_peer_t      *peer;
1171         struct list_head  *tmp;
1172         int                index;
1173         int                i;
1174         int                j;
1175         int                rc = -ENOENT;
1176
1177         if (nid != PTL_NID_ANY) {
1178                 peer = ksocknal_get_peer (nid);
1179
1180                 if (peer != NULL) {
1181                         rc = 0;
1182                         ksocknal_push_peer (peer);
1183                         ksocknal_put_peer (peer);
1184                 }
1185                 return (rc);
1186         }
1187
1188         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1189                 for (j = 0; ; j++) {
1190                         read_lock (&ksocknal_data.ksnd_global_lock);
1191
1192                         index = 0;
1193                         peer = NULL;
1194
1195                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1196                                 if (index++ == j) {
1197                                         peer = list_entry(tmp, ksock_peer_t,
1198                                                           ksnp_list);
1199                                         atomic_inc (&peer->ksnp_refcount);
1200                                         break;
1201                                 }
1202                         }
1203
1204                         read_unlock (&ksocknal_data.ksnd_global_lock);
1205
1206                         if (peer != NULL) {
1207                                 rc = 0;
1208                                 ksocknal_push_peer (peer);
1209                                 ksocknal_put_peer (peer);
1210                         }
1211                 }
1212
1213         }
1214
1215         return (rc);
1216 }
1217
1218 int
1219 ksocknal_cmd(struct portal_ioctl_data * data, void * private)
1220 {
1221         int rc = -EINVAL;
1222
1223         LASSERT (data != NULL);
1224
1225         switch(data->ioc_nal_cmd) {
1226         case NAL_CMD_GET_AUTOCONN: {
1227                 ksock_route_t *route = ksocknal_get_route_by_idx (data->ioc_count);
1228
1229                 if (route == NULL)
1230                         rc = -ENOENT;
1231                 else {
1232                         rc = 0;
1233                         data->ioc_nid   = route->ksnr_peer->ksnp_nid;
1234                         data->ioc_id    = route->ksnr_ipaddr;
1235                         data->ioc_misc  = route->ksnr_port;
1236                         data->ioc_count = route->ksnr_generation;
1237                         data->ioc_size  = route->ksnr_buffer_size;
1238                         data->ioc_wait  = route->ksnr_sharecount;
1239                         data->ioc_flags = (route->ksnr_nonagel      ? 1 : 0) |
1240                                           (route->ksnr_xchange_nids ? 2 : 0) |
1241                                           (route->ksnr_irq_affinity ? 4 : 0) |
1242                                           (route->ksnr_eager        ? 8 : 0);
1243                         ksocknal_put_route (route);
1244                 }
1245                 break;
1246         }
1247         case NAL_CMD_ADD_AUTOCONN: {
1248                 rc = ksocknal_add_route (data->ioc_nid, data->ioc_id,
1249                                          data->ioc_misc, data->ioc_size,
1250                                          (data->ioc_flags & 0x01) != 0,
1251                                          (data->ioc_flags & 0x02) != 0,
1252                                          (data->ioc_flags & 0x04) != 0,
1253                                          (data->ioc_flags & 0x08) != 0,
1254                                          (data->ioc_flags & 0x10) != 0);
1255                 break;
1256         }
1257         case NAL_CMD_DEL_AUTOCONN: {
1258                 rc = ksocknal_del_route (data->ioc_nid, data->ioc_id, 
1259                                          (data->ioc_flags & 1) != 0,
1260                                          (data->ioc_flags & 2) != 0);
1261                 break;
1262         }
1263         case NAL_CMD_GET_CONN: {
1264                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (data->ioc_count);
1265
1266                 if (conn == NULL)
1267                         rc = -ENOENT;
1268                 else {
1269                         rc = 0;
1270                         data->ioc_nid  = conn->ksnc_peer->ksnp_nid;
1271                         data->ioc_id   = conn->ksnc_ipaddr;
1272                         data->ioc_misc = conn->ksnc_port;
1273                         ksocknal_put_conn (conn);
1274                 }
1275                 break;
1276         }
1277         case NAL_CMD_REGISTER_PEER_FD: {
1278                 struct socket *sock = sockfd_lookup (data->ioc_fd, &rc);
1279
1280                 if (sock != NULL) {
1281                         rc = ksocknal_create_conn (data->ioc_nid, NULL,
1282                                                    sock, data->ioc_flags);
1283                         if (rc != 0)
1284                                 fput (sock->file);
1285                 }
1286                 break;
1287         }
1288         case NAL_CMD_CLOSE_CONNECTION: {
1289                 rc = ksocknal_close_conn (data->ioc_nid, data->ioc_id);
1290                 break;
1291         }
1292         case NAL_CMD_REGISTER_MYNID: {
1293                 rc = ksocknal_set_mynid (data->ioc_nid);
1294                 break;
1295         }
1296         case NAL_CMD_PUSH_CONNECTION: {
1297                 rc = ksocknal_push (data->ioc_nid);
1298                 break;
1299         }
1300         }
1301
1302         return rc;
1303 }
1304
1305 void
1306 ksocknal_free_buffers (void)
1307 {
1308         if (ksocknal_data.ksnd_fmbs != NULL) {
1309                 ksock_fmb_t *fmb = (ksock_fmb_t *)ksocknal_data.ksnd_fmbs;
1310                 int          i;
1311                 int          j;
1312
1313                 for (i = 0;
1314                      i < (SOCKNAL_SMALL_FWD_NMSGS + SOCKNAL_LARGE_FWD_NMSGS);
1315                      i++, fmb++)
1316                         for (j = 0; j < fmb->fmb_npages; j++)
1317                                 if (fmb->fmb_pages[j] != NULL)
1318                                         __free_page (fmb->fmb_pages[j]);
1319
1320                 PORTAL_FREE (ksocknal_data.ksnd_fmbs,
1321                              sizeof (ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1322                                                      SOCKNAL_LARGE_FWD_NMSGS));
1323         }
1324
1325         LASSERT (ksocknal_data.ksnd_active_ltxs == 0);
1326         if (ksocknal_data.ksnd_ltxs != NULL)
1327                 PORTAL_FREE (ksocknal_data.ksnd_ltxs,
1328                              sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS +
1329                                                      SOCKNAL_NNBLK_LTXS));
1330
1331         if (ksocknal_data.ksnd_schedulers != NULL)
1332                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1333                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1334
1335         PORTAL_FREE (ksocknal_data.ksnd_peers,
1336                      sizeof (struct list_head) * 
1337                      ksocknal_data.ksnd_peer_hash_size);
1338 }
1339
1340 void /*__exit*/
1341 ksocknal_module_fini (void)
1342 {
1343         int   i;
1344
1345         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1346                atomic_read (&portal_kmemory));
1347
1348         switch (ksocknal_data.ksnd_init) {
1349         default:
1350                 LASSERT (0);
1351
1352         case SOCKNAL_INIT_ALL:
1353 #if CONFIG_SYSCTL
1354                 if (ksocknal_data.ksnd_sysctl != NULL)
1355                         unregister_sysctl_table (ksocknal_data.ksnd_sysctl);
1356 #endif
1357                 kportal_nal_unregister(SOCKNAL);
1358                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
1359                 /* fall through */
1360
1361         case SOCKNAL_INIT_PTL:
1362                 PtlNIFini(ksocknal_ni);
1363                 lib_fini(&ksocknal_lib);
1364                 /* fall through */
1365
1366         case SOCKNAL_INIT_DATA:
1367                 /* Module refcount only gets to zero when all peers
1368                  * have been closed so all lists must be empty */
1369                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1370                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1371                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1372                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1373                 }
1374                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1375                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1376                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1377                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1378
1379                 if (ksocknal_data.ksnd_schedulers != NULL)
1380                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1381                                 ksock_sched_t *kss =
1382                                         &ksocknal_data.ksnd_schedulers[i];
1383
1384                                 LASSERT (list_empty (&kss->kss_tx_conns));
1385                                 LASSERT (list_empty (&kss->kss_rx_conns));
1386                                 LASSERT (kss->kss_nconns == 0);
1387                         }
1388
1389                 /* stop router calling me */
1390                 kpr_shutdown (&ksocknal_data.ksnd_router);
1391
1392                 /* flag threads to terminate; wake and wait for them to die */
1393                 ksocknal_data.ksnd_shuttingdown = 1;
1394                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1395                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1396
1397                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1398                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1399
1400                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1401                         CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
1402                                 atomic_read (&ksocknal_data.ksnd_nthreads));
1403                         set_current_state (TASK_UNINTERRUPTIBLE);
1404                         schedule_timeout (HZ);
1405                 }
1406
1407                 kpr_deregister (&ksocknal_data.ksnd_router);
1408
1409                 ksocknal_free_buffers();
1410                 /* fall through */
1411
1412         case SOCKNAL_INIT_NOTHING:
1413                 break;
1414         }
1415
1416         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1417                atomic_read (&portal_kmemory));
1418
1419         printk(KERN_INFO "Routing socket NAL unloaded (final mem %d)\n",
1420                atomic_read(&portal_kmemory));
1421 }
1422
1423
1424 int __init
1425 ksocknal_module_init (void)
1426 {
1427         int   pkmem = atomic_read(&portal_kmemory);
1428         int   rc;
1429         int   i;
1430         int   j;
1431
1432         /* packet descriptor must fit in a router descriptor's scratchpad */
1433         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1434         /* the following must be sizeof(int) for proc_dointvec() */
1435         LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
1436         LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
1437
1438         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1439
1440         ksocknal_api.forward  = ksocknal_api_forward;
1441         ksocknal_api.shutdown = ksocknal_api_shutdown;
1442         ksocknal_api.yield    = ksocknal_api_yield;
1443         ksocknal_api.validate = NULL;           /* our api validate is a NOOP */
1444         ksocknal_api.lock     = ksocknal_api_lock;
1445         ksocknal_api.unlock   = ksocknal_api_unlock;
1446         ksocknal_api.nal_data = &ksocknal_data;
1447
1448         ksocknal_lib.nal_data = &ksocknal_data;
1449
1450         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1451
1452         ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1453         ksocknal_data.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
1454 #if SOCKNAL_ZC
1455         ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1456 #endif
1457
1458         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1459         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1460                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1461         if (ksocknal_data.ksnd_peers == NULL)
1462                 RETURN (-ENOMEM);
1463
1464         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1465                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1466
1467         rwlock_init(&ksocknal_data.ksnd_global_lock);
1468
1469         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1470         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1471
1472         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1473         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1474         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1475
1476         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1477         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1478         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1479
1480         spin_lock_init(&ksocknal_data.ksnd_idle_ltx_lock);
1481         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_nblk_ltx_list);
1482         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_ltx_list);
1483         init_waitqueue_head(&ksocknal_data.ksnd_idle_ltx_waitq);
1484
1485         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1486         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1487         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1488         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1489
1490         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1491         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1492         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1493
1494         /* NB memset above zeros whole of ksocknal_data, including
1495          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1496
1497         /* flag lists/ptrs/locks initialised */
1498         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1499
1500         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1501                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1502         if (ksocknal_data.ksnd_schedulers == NULL) {
1503                 ksocknal_module_fini ();
1504                 RETURN(-ENOMEM);
1505         }
1506
1507         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1508                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1509
1510                 spin_lock_init (&kss->kss_lock);
1511                 INIT_LIST_HEAD (&kss->kss_rx_conns);
1512                 INIT_LIST_HEAD (&kss->kss_tx_conns);
1513 #if SOCKNAL_ZC
1514                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1515 #endif
1516                 init_waitqueue_head (&kss->kss_waitq);
1517         }
1518
1519         CDEBUG (D_MALLOC, "ltx "LPSZ", total "LPSZ"\n", sizeof (ksock_ltx_t),
1520                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
1521
1522         PORTAL_ALLOC(ksocknal_data.ksnd_ltxs,
1523                      sizeof(ksock_ltx_t) * (SOCKNAL_NLTXS +SOCKNAL_NNBLK_LTXS));
1524         if (ksocknal_data.ksnd_ltxs == NULL) {
1525                 ksocknal_module_fini ();
1526                 return (-ENOMEM);
1527         }
1528
1529         /* Deterministic bugs please */
1530         memset (ksocknal_data.ksnd_ltxs, 0xeb,
1531                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
1532
1533         for (i = 0; i < SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS; i++) {
1534                 ksock_ltx_t *ltx = &((ksock_ltx_t *)ksocknal_data.ksnd_ltxs)[i];
1535
1536                 ltx->ltx_tx.tx_hdr = &ltx->ltx_hdr;
1537                 ltx->ltx_idle = i < SOCKNAL_NLTXS ?
1538                                 &ksocknal_data.ksnd_idle_ltx_list :
1539                                 &ksocknal_data.ksnd_idle_nblk_ltx_list;
1540                 list_add (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
1541         }
1542
1543         rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
1544         if (rc != 0) {
1545                 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
1546                 ksocknal_module_fini ();
1547                 RETURN (rc);
1548         }
1549         PtlNIDebug(ksocknal_ni, ~0);
1550
1551         ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
1552
1553         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1554                 rc = ksocknal_thread_start (ksocknal_scheduler,
1555                                             &ksocknal_data.ksnd_schedulers[i]);
1556                 if (rc != 0) {
1557                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1558                                i, rc);
1559                         ksocknal_module_fini ();
1560                         RETURN (rc);
1561                 }
1562         }
1563
1564         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1565                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1566                 if (rc != 0) {
1567                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1568                         ksocknal_module_fini ();
1569                         RETURN (rc);
1570                 }
1571         }
1572
1573         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1574         if (rc != 0) {
1575                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1576                 ksocknal_module_fini ();
1577                 RETURN (rc);
1578         }
1579
1580         rc = kpr_register(&ksocknal_data.ksnd_router,
1581                           &ksocknal_router_interface);
1582         if (rc != 0) {
1583                 CDEBUG(D_NET, "Can't initialise routing interface "
1584                        "(rc = %d): not routing\n", rc);
1585         } else {
1586                 /* Only allocate forwarding buffers if I'm on a gateway */
1587
1588                 PORTAL_ALLOC(ksocknal_data.ksnd_fmbs,
1589                              sizeof(ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1590                                                     SOCKNAL_LARGE_FWD_NMSGS));
1591                 if (ksocknal_data.ksnd_fmbs == NULL) {
1592                         ksocknal_module_fini ();
1593                         RETURN(-ENOMEM);
1594                 }
1595
1596                 /* NULL out buffer pointers etc */
1597                 memset(ksocknal_data.ksnd_fmbs, 0,
1598                        sizeof(ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1599                                               SOCKNAL_LARGE_FWD_NMSGS));
1600
1601                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1602                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
1603                         ksock_fmb_t *fmb =
1604                                 &((ksock_fmb_t *)ksocknal_data.ksnd_fmbs)[i];
1605
1606                         if (i < SOCKNAL_SMALL_FWD_NMSGS) {
1607                                 fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES;
1608                                 fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp;
1609                         } else {
1610                                 fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES;
1611                                 fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp;
1612                         }
1613
1614                         LASSERT (fmb->fmb_npages > 0);
1615                         for (j = 0; j < fmb->fmb_npages; j++) {
1616                                 fmb->fmb_pages[j] = alloc_page(GFP_KERNEL);
1617
1618                                 if (fmb->fmb_pages[j] == NULL) {
1619                                         ksocknal_module_fini ();
1620                                         return (-ENOMEM);
1621                                 }
1622
1623                                 LASSERT(page_address (fmb->fmb_pages[j]) !=
1624                                         NULL);
1625                         }
1626
1627                         list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
1628                 }
1629         }
1630
1631         rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
1632         if (rc != 0) {
1633                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1634                 ksocknal_module_fini ();
1635                 return (rc);
1636         }
1637
1638         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
1639
1640 #ifdef CONFIG_SYSCTL
1641         /* Press on regardless even if registering sysctl doesn't work */
1642         ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
1643 #endif
1644         /* flag everything initialised */
1645         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1646         
1647         printk(KERN_INFO "Routing socket NAL loaded (Routing %s, initial "
1648                "mem %d)\n",
1649                kpr_routing (&ksocknal_data.ksnd_router) ?
1650                "enabled" : "disabled", pkmem);
1651
1652         return (0);
1653 }
1654
1655 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1656 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1657 MODULE_LICENSE("GPL");
1658
1659 module_init(ksocknal_module_init);
1660 module_exit(ksocknal_module_fini);
1661
1662 EXPORT_SYMBOL (ksocknal_ni);