Whamcloud - gitweb
* Fixed 2098
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 ptl_handle_ni_t         ksocknal_ni;
29 static nal_t            ksocknal_api;
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
32 #else
33 static ksock_nal_data_t ksocknal_data;
34 #endif
35
36 kpr_nal_interface_t ksocknal_router_interface = {
37         kprni_nalid:      SOCKNAL,
38         kprni_arg:        &ksocknal_data,
39         kprni_fwd:        ksocknal_fwd_packet,
40         kprni_notify:     ksocknal_notify,
41 };
42
43 #define SOCKNAL_SYSCTL  200
44
45 #define SOCKNAL_SYSCTL_TIMEOUT     1
46 #define SOCKNAL_SYSCTL_EAGER_ACK   2
47 #define SOCKNAL_SYSCTL_ZERO_COPY   3
48
49 static ctl_table ksocknal_ctl_table[] = {
50         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
51          &ksocknal_data.ksnd_io_timeout, sizeof (int),
52          0644, NULL, &proc_dointvec},
53         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
54          &ksocknal_data.ksnd_eager_ack, sizeof (int),
55          0644, NULL, &proc_dointvec},
56 #if SOCKNAL_ZC
57         {SOCKNAL_SYSCTL_EAGER_ACK, "zero_copy", 
58          &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
59          0644, NULL, &proc_dointvec},
60 #endif
61         { 0 }
62 };
63
64 static ctl_table ksocknal_top_ctl_table[] = {
65         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
66         { 0 }
67 };
68
69 int
70 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
71                        void *ret, size_t ret_len)
72 {
73         ksock_nal_data_t *k;
74         nal_cb_t *nal_cb;
75
76         k = nal->nal_data;
77         nal_cb = k->ksnd_nal_cb;
78
79         lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
80         return PTL_OK;
81 }
82
83 int
84 ksocknal_api_shutdown(nal_t *nal, int ni)
85 {
86         CDEBUG (D_NET, "closing all connections\n");
87
88         ksocknal_del_route (PTL_NID_ANY, 0, 0, 0);
89         ksocknal_close_conn (PTL_NID_ANY, 0);
90         return PTL_OK;
91 }
92
93 void
94 ksocknal_api_yield(nal_t *nal)
95 {
96         our_cond_resched();
97         return;
98 }
99
100 void
101 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
102 {
103         ksock_nal_data_t *k;
104         nal_cb_t *nal_cb;
105
106         k = nal->nal_data;
107         nal_cb = k->ksnd_nal_cb;
108         nal_cb->cb_cli(nal_cb,flags);
109 }
110
111 void
112 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
113 {
114         ksock_nal_data_t *k;
115         nal_cb_t *nal_cb;
116
117         k = nal->nal_data;
118         nal_cb = k->ksnd_nal_cb;
119         nal_cb->cb_sti(nal_cb,flags);
120 }
121
122 nal_t *
123 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
124               ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
125 {
126         CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
127         lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
128         return (&ksocknal_api);
129 }
130
131 /*
132  *  EXTRA functions follow
133  */
134
135 int
136 ksocknal_set_mynid(ptl_nid_t nid)
137 {
138         lib_ni_t *ni = &ksocknal_lib.ni;
139
140         /* FIXME: we have to do this because we call lib_init() at module
141          * insertion time, which is before we have 'mynid' available.  lib_init
142          * sets the NAL's nid, which it uses to tell other nodes where packets
143          * are coming from.  This is not a very graceful solution to this
144          * problem. */
145
146         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
147                nid, ni->nid);
148
149         ni->nid = nid;
150         return (0);
151 }
152
153 void
154 ksocknal_bind_irq (unsigned int irq)
155 {
156 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
157         int              bind;
158         unsigned long    flags;
159         char             cmdline[64];
160         ksock_irqinfo_t *info;
161         char            *argv[] = {"/bin/sh",
162                                    "-c",
163                                    cmdline,
164                                    NULL};
165         char            *envp[] = {"HOME=/",
166                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
167                                    NULL};
168
169         LASSERT (irq < NR_IRQS);
170         if (irq == 0)                           /* software NIC */
171                 return;
172
173         info = &ksocknal_data.ksnd_irqinfo[irq];
174
175         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
176
177         LASSERT (info->ksni_valid);
178         bind = !info->ksni_bound;
179         info->ksni_bound = 1;
180
181         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
182
183         if (!bind)                              /* bound already */
184                 return;
185
186         snprintf (cmdline, sizeof (cmdline),
187                   "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
188
189         printk (KERN_INFO "Binding irq %u to CPU %d with cmd: %s\n",
190                 irq, info->ksni_sched, cmdline);
191
192         /* FIXME: Find a better method of setting IRQ affinity...
193          */
194
195         call_usermodehelper (argv[0], argv, envp);
196 #endif
197 }
198
199 ksock_route_t *
200 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
201                        int nonagel, int xchange_nids, int irq_affinity, int eager)
202 {
203         ksock_route_t *route;
204
205         PORTAL_ALLOC (route, sizeof (*route));
206         if (route == NULL)
207                 return (NULL);
208
209         atomic_set (&route->ksnr_refcount, 1);
210         route->ksnr_sharecount = 0;
211         route->ksnr_peer = NULL;
212         route->ksnr_timeout = jiffies;
213         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
214         route->ksnr_ipaddr = ipaddr;
215         route->ksnr_port = port;
216         route->ksnr_buffer_size = buffer_size;
217         route->ksnr_irq_affinity = irq_affinity;
218         route->ksnr_xchange_nids = xchange_nids;
219         route->ksnr_nonagel = nonagel;
220         route->ksnr_eager = eager;
221         route->ksnr_connecting = 0;
222         route->ksnr_deleted = 0;
223         route->ksnr_generation = 0;
224         route->ksnr_conn = NULL;
225
226         return (route);
227 }
228
229 void
230 ksocknal_destroy_route (ksock_route_t *route)
231 {
232         LASSERT (route->ksnr_sharecount == 0);
233         LASSERT (route->ksnr_conn == NULL);
234
235         if (route->ksnr_peer != NULL)
236                 ksocknal_put_peer (route->ksnr_peer);
237
238         PORTAL_FREE (route, sizeof (*route));
239 }
240
241 void
242 ksocknal_put_route (ksock_route_t *route)
243 {
244         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
245                 route, atomic_read (&route->ksnr_refcount));
246
247         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
248         if (!atomic_dec_and_test (&route->ksnr_refcount))
249              return;
250
251         ksocknal_destroy_route (route);
252 }
253
254 ksock_peer_t *
255 ksocknal_create_peer (ptl_nid_t nid)
256 {
257         ksock_peer_t *peer;
258
259         LASSERT (nid != PTL_NID_ANY);
260
261         PORTAL_ALLOC (peer, sizeof (*peer));
262         if (peer == NULL)
263                 return (NULL);
264
265         memset (peer, 0, sizeof (*peer));
266
267         peer->ksnp_nid = nid;
268         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
269         peer->ksnp_closing = 0;
270         INIT_LIST_HEAD (&peer->ksnp_conns);
271         INIT_LIST_HEAD (&peer->ksnp_routes);
272         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
273
274         /* Can't unload while peers exist; ensures all I/O has terminated
275          * before unload attempts */
276         PORTAL_MODULE_USE;
277         atomic_inc (&ksocknal_data.ksnd_npeers);
278         return (peer);
279 }
280
281 void
282 ksocknal_destroy_peer (ksock_peer_t *peer)
283 {
284         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
285
286         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
287         LASSERT (list_empty (&peer->ksnp_conns));
288         LASSERT (list_empty (&peer->ksnp_routes));
289         LASSERT (list_empty (&peer->ksnp_tx_queue));
290
291         PORTAL_FREE (peer, sizeof (*peer));
292
293         /* NB a peer's connections and autoconnect routes keep a reference
294          * on their peer until they are destroyed, so we can be assured
295          * that _all_ state to do with this peer has been cleaned up when
296          * its refcount drops to zero. */
297         atomic_dec (&ksocknal_data.ksnd_npeers);
298         PORTAL_MODULE_UNUSE;
299 }
300
301 void
302 ksocknal_put_peer (ksock_peer_t *peer)
303 {
304         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
305                 peer, peer->ksnp_nid,
306                 atomic_read (&peer->ksnp_refcount));
307
308         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
309         if (!atomic_dec_and_test (&peer->ksnp_refcount))
310                 return;
311
312         ksocknal_destroy_peer (peer);
313 }
314
315 ksock_peer_t *
316 ksocknal_find_peer_locked (ptl_nid_t nid)
317 {
318         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
319         struct list_head *tmp;
320         ksock_peer_t     *peer;
321
322         list_for_each (tmp, peer_list) {
323
324                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
325
326                 LASSERT (!peer->ksnp_closing);
327                 LASSERT (!(list_empty (&peer->ksnp_routes) &&
328                            list_empty (&peer->ksnp_conns)));
329
330                 if (peer->ksnp_nid != nid)
331                         continue;
332
333                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
334                        peer, nid, atomic_read (&peer->ksnp_refcount));
335                 return (peer);
336         }
337         return (NULL);
338 }
339
340 ksock_peer_t *
341 ksocknal_get_peer (ptl_nid_t nid)
342 {
343         ksock_peer_t     *peer;
344
345         read_lock (&ksocknal_data.ksnd_global_lock);
346         peer = ksocknal_find_peer_locked (nid);
347         if (peer != NULL)                       /* +1 ref for caller? */
348                 atomic_inc (&peer->ksnp_refcount);
349         read_unlock (&ksocknal_data.ksnd_global_lock);
350
351         return (peer);
352 }
353
354 void
355 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
356 {
357         LASSERT (!peer->ksnp_closing);
358         peer->ksnp_closing = 1;
359         list_del (&peer->ksnp_list);
360         /* lose peerlist's ref */
361         ksocknal_put_peer (peer);
362 }
363
364 ksock_route_t *
365 ksocknal_get_route_by_idx (int index)
366 {
367         ksock_peer_t      *peer;
368         struct list_head  *ptmp;
369         ksock_route_t     *route;
370         struct list_head  *rtmp;
371         int                i;
372
373         read_lock (&ksocknal_data.ksnd_global_lock);
374
375         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
376                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
377                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
378
379                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
380                                    list_empty (&peer->ksnp_conns)));
381
382                         list_for_each (rtmp, &peer->ksnp_routes) {
383                                 if (index-- > 0)
384                                         continue;
385
386                                 route = list_entry (rtmp, ksock_route_t, ksnr_list);
387                                 atomic_inc (&route->ksnr_refcount);
388                                 read_unlock (&ksocknal_data.ksnd_global_lock);
389                                 return (route);
390                         }
391                 }
392         }
393
394         read_unlock (&ksocknal_data.ksnd_global_lock);
395         return (NULL);
396 }
397
398 int
399 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
400                     int nonagle, int xchange_nids, int bind_irq, 
401                     int share, int eager)
402 {
403         unsigned long      flags;
404         ksock_peer_t      *peer;
405         ksock_peer_t      *peer2;
406         ksock_route_t     *route;
407         struct list_head  *rtmp;
408         ksock_route_t     *route2;
409         
410         if (nid == PTL_NID_ANY)
411                 return (-EINVAL);
412
413         /* Have a brand new peer ready... */
414         peer = ksocknal_create_peer (nid);
415         if (peer == NULL)
416                 return (-ENOMEM);
417
418         route = ksocknal_create_route (ipaddr, port, bufnob, nonagle,
419                                        xchange_nids, bind_irq, eager);
420         if (route == NULL) {
421                 ksocknal_put_peer (peer);
422                 return (-ENOMEM);
423         }
424
425         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
426
427         peer2 = ksocknal_find_peer_locked (nid);
428         if (peer2 != NULL) {
429                 ksocknal_put_peer (peer);
430                 peer = peer2;
431         } else {
432                 /* peer table takes existing ref on peer */
433                 list_add (&peer->ksnp_list,
434                           ksocknal_nid2peerlist (nid));
435         }
436
437         route2 = NULL;
438         if (share) {
439                 /* check for existing route to this NID via this ipaddr */
440                 list_for_each (rtmp, &peer->ksnp_routes) {
441                         route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
442                         
443                         if (route2->ksnr_ipaddr == ipaddr)
444                                 break;
445
446                         route2 = NULL;
447                 }
448         }
449
450         if (route2 != NULL) {
451                 ksocknal_put_route (route);
452                 route = route2;
453         } else {
454                 /* route takes a ref on peer */
455                 route->ksnr_peer = peer;
456                 atomic_inc (&peer->ksnp_refcount);
457                 /* peer's route list takes existing ref on route */
458                 list_add (&route->ksnr_list, &peer->ksnp_routes);
459         }
460         
461         route->ksnr_sharecount++;
462
463         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
464
465         return (0);
466 }
467
468 void
469 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
470 {
471         ksock_peer_t *peer = route->ksnr_peer;
472         ksock_conn_t *conn = route->ksnr_conn;
473
474         if (!share)
475                 route->ksnr_sharecount = 0;
476         else {
477                 route->ksnr_sharecount--;
478                 if (route->ksnr_sharecount != 0)
479                         return;
480         }
481
482         if (conn != NULL) {
483                 if (!keep_conn)
484                         ksocknal_close_conn_locked (conn, 0);
485                 else {
486                         /* keeping the conn; just dissociate it and route... */
487                         conn->ksnc_route = NULL;
488                         route->ksnr_conn = NULL;
489                         ksocknal_put_route (route); /* drop conn's ref on route */
490                         ksocknal_put_conn (conn); /* drop route's ref on conn */
491                 }
492         }
493
494         route->ksnr_deleted = 1;
495         list_del (&route->ksnr_list);
496         ksocknal_put_route (route);             /* drop peer's ref */
497
498         if (list_empty (&peer->ksnp_routes) &&
499             list_empty (&peer->ksnp_conns)) {
500                 /* I've just removed the last autoconnect route of a peer
501                  * with no active connections */
502                 ksocknal_unlink_peer_locked (peer);
503         }
504 }
505
506 int
507 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
508 {
509         unsigned long      flags;
510         struct list_head  *ptmp;
511         struct list_head  *pnxt;
512         ksock_peer_t      *peer;
513         struct list_head  *rtmp;
514         struct list_head  *rnxt;
515         ksock_route_t     *route;
516         int                lo;
517         int                hi;
518         int                i;
519         int                rc = -ENOENT;
520
521         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
522
523         if (nid != PTL_NID_ANY)
524                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
525         else {
526                 lo = 0;
527                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
528         }
529
530         for (i = lo; i <= hi; i++) {
531                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
532                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
533
534                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
535                                 continue;
536
537                         list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
538                                 route = list_entry (rtmp, ksock_route_t,
539                                                     ksnr_list);
540
541                                 if (!(ipaddr == 0 ||
542                                       route->ksnr_ipaddr == ipaddr))
543                                         continue;
544
545                                 ksocknal_del_route_locked (route, share, keep_conn);
546                                 rc = 0;         /* matched something */
547                                 if (share)
548                                         goto out;
549                         }
550                 }
551         }
552  out:
553         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
554
555         return (rc);
556 }
557
558 ksock_conn_t *
559 ksocknal_get_conn_by_idx (int index)
560 {
561         ksock_peer_t      *peer;
562         struct list_head  *ptmp;
563         ksock_conn_t      *conn;
564         struct list_head  *ctmp;
565         int                i;
566
567         read_lock (&ksocknal_data.ksnd_global_lock);
568
569         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
570                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
571                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
572
573                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
574                                    list_empty (&peer->ksnp_conns)));
575
576                         list_for_each (ctmp, &peer->ksnp_conns) {
577                                 if (index-- > 0)
578                                         continue;
579
580                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
581                                 atomic_inc (&conn->ksnc_refcount);
582                                 read_unlock (&ksocknal_data.ksnd_global_lock);
583                                 return (conn);
584                         }
585                 }
586         }
587
588         read_unlock (&ksocknal_data.ksnd_global_lock);
589         return (NULL);
590 }
591
592 void
593 ksocknal_get_peer_addr (ksock_conn_t *conn)
594 {
595         struct sockaddr_in sin;
596         int                len = sizeof (sin);
597         int                rc;
598         
599         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
600                                             (struct sockaddr *)&sin, &len, 2);
601         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
602         LASSERT (!conn->ksnc_closing);
603         LASSERT (len <= sizeof (sin));
604
605         if (rc != 0) {
606                 CERROR ("Error %d getting sock peer IP\n", rc);
607                 return;
608         }
609
610         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
611         conn->ksnc_port   = ntohs (sin.sin_port);
612 }
613
614 unsigned int
615 ksocknal_conn_irq (ksock_conn_t *conn)
616 {
617         int                irq = 0;
618         struct dst_entry  *dst;
619
620         dst = sk_dst_get (conn->ksnc_sock->sk);
621         if (dst != NULL) {
622                 if (dst->dev != NULL) {
623                         irq = dst->dev->irq;
624                         if (irq >= NR_IRQS) {
625                                 CERROR ("Unexpected IRQ %x\n", irq);
626                                 irq = 0;
627                         }
628                 }
629                 dst_release (dst);
630         }
631         
632         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
633         LASSERT (!conn->ksnc_closing);
634         return (irq);
635 }
636
637 ksock_sched_t *
638 ksocknal_choose_scheduler_locked (unsigned int irq)
639 {
640         ksock_sched_t    *sched;
641         ksock_irqinfo_t  *info;
642         int               i;
643
644         LASSERT (irq < NR_IRQS);
645         info = &ksocknal_data.ksnd_irqinfo[irq];
646
647         if (irq != 0 &&                         /* hardware NIC */
648             info->ksni_valid) {                 /* already set up */
649                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
650         }
651
652         /* software NIC (irq == 0) || not associated with a scheduler yet.
653          * Choose the CPU with the fewest connections... */
654         sched = &ksocknal_data.ksnd_schedulers[0];
655         for (i = 1; i < SOCKNAL_N_SCHED; i++)
656                 if (sched->kss_nconns >
657                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
658                         sched = &ksocknal_data.ksnd_schedulers[i];
659
660         if (irq != 0) {                         /* Hardware NIC */
661                 info->ksni_valid = 1;
662                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
663
664                 /* no overflow... */
665                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
666         }
667
668         return (sched);
669 }
670
671 int
672 ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
673                       struct socket *sock, int bind_irq)
674 {
675         unsigned long      flags;
676         ksock_conn_t      *conn;
677         ksock_peer_t      *peer;
678         ksock_peer_t      *peer2;
679         ksock_sched_t     *sched;
680         unsigned int       irq;
681         ksock_tx_t        *tx;
682         int                rc;
683
684         /* NB, sock has an associated file since (a) this connection might
685          * have been created in userland and (b) we need to refcount the
686          * socket so that we don't close it while I/O is being done on
687          * it, and sock->file has that pre-cooked... */
688         LASSERT (sock->file != NULL);
689         LASSERT (file_count(sock->file) > 0);
690
691         rc = ksocknal_setup_sock (sock);
692         if (rc != 0)
693                 return (rc);
694
695         peer = NULL;
696         if (route == NULL) {                    /* not autoconnect */
697                 /* Assume this socket connects to a brand new peer */
698                 peer = ksocknal_create_peer (nid);
699                 if (peer == NULL)
700                         return (-ENOMEM);
701         }
702
703         PORTAL_ALLOC(conn, sizeof(*conn));
704         if (conn == NULL) {
705                 if (peer != NULL)
706                         ksocknal_put_peer (peer);
707                 return (-ENOMEM);
708         }
709
710         memset (conn, 0, sizeof (*conn));
711         conn->ksnc_peer = NULL;
712         conn->ksnc_route = NULL;
713         conn->ksnc_sock = sock;
714         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
715         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
716         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
717
718         conn->ksnc_rx_ready = 0;
719         conn->ksnc_rx_scheduled = 0;
720         ksocknal_new_packet (conn, 0);
721
722         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
723         conn->ksnc_tx_ready = 0;
724         conn->ksnc_tx_scheduled = 0;
725         atomic_set (&conn->ksnc_tx_nob, 0);
726
727         ksocknal_get_peer_addr (conn);
728
729         irq = ksocknal_conn_irq (conn);
730
731         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
732
733         if (route != NULL) {
734                 /* Autoconnected! */
735                 LASSERT (route->ksnr_conn == NULL && route->ksnr_connecting);
736
737                 if (route->ksnr_deleted) {
738                         /* This conn was autoconnected, but the autoconnect
739                          * route got deleted while it was being
740                          * established! */
741                         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
742                                                  flags);
743                         PORTAL_FREE (conn, sizeof (*conn));
744                         return (-ESTALE);
745                 }
746
747
748                 /* associate conn/route for auto-reconnect */
749                 route->ksnr_conn = conn;
750                 atomic_inc (&conn->ksnc_refcount);
751                 conn->ksnc_route = route;
752                 atomic_inc (&route->ksnr_refcount);
753                 route->ksnr_connecting = 0;
754
755                 route->ksnr_generation++;
756                 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
757
758                 peer = route->ksnr_peer;
759         } else {
760                 /* Not an autoconnected connection; see if there is an
761                  * existing peer for this NID */
762                 peer2 = ksocknal_find_peer_locked (nid);
763                 if (peer2 != NULL) {
764                         ksocknal_put_peer (peer);
765                         peer = peer2;
766                 } else {
767                         list_add (&peer->ksnp_list,
768                                   ksocknal_nid2peerlist (nid));
769                         /* peer list takes over existing ref */
770                 }
771         }
772
773         LASSERT (!peer->ksnp_closing);
774
775         conn->ksnc_peer = peer;
776         atomic_inc (&peer->ksnp_refcount);
777         peer->ksnp_last_alive = jiffies;
778         peer->ksnp_error = 0;
779
780         list_add (&conn->ksnc_list, &peer->ksnp_conns);
781         atomic_inc (&conn->ksnc_refcount);
782
783         sched = ksocknal_choose_scheduler_locked (irq);
784         sched->kss_nconns++;
785         conn->ksnc_scheduler = sched;
786
787         /* NB my callbacks block while I hold ksnd_global_lock */
788         sock->sk->sk_user_data = conn;
789         sock->sk->sk_data_ready = ksocknal_data_ready;
790         sock->sk->sk_write_space = ksocknal_write_space;
791
792         /* Take all the packets blocking for a connection.
793          * NB, it might be nicer to share these blocked packets among any
794          * other connections that are becoming established, however that
795          * confuses the normal packet launching operation, which selects a
796          * connection and queues the packet on it without needing an
797          * exclusive lock on ksnd_global_lock. */
798         while (!list_empty (&peer->ksnp_tx_queue)) {
799                 tx = list_entry (peer->ksnp_tx_queue.next,
800                                  ksock_tx_t, tx_list);
801
802                 list_del (&tx->tx_list);
803                 ksocknal_queue_tx_locked (tx, conn);
804         }
805
806         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
807
808         if (bind_irq)                           /* irq binding required */
809                 ksocknal_bind_irq (irq);
810
811         /* Call the callbacks right now to get things going. */
812         ksocknal_data_ready (sock->sk, 0);
813         ksocknal_write_space (sock->sk);
814
815         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64"\n",
816                conn, conn->ksnc_peer->ksnp_nid);
817
818         ksocknal_put_conn (conn);
819         return (0);
820 }
821
822 void
823 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
824 {
825         /* This just does the immmediate housekeeping, and queues the
826          * connection for the reaper to terminate. 
827          * Caller holds ksnd_global_lock exclusively in irq context */
828         ksock_peer_t   *peer = conn->ksnc_peer;
829         ksock_route_t  *route;
830
831         LASSERT (peer->ksnp_error == 0);
832         LASSERT (!conn->ksnc_closing);
833         conn->ksnc_closing = 1;
834         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
835         
836         route = conn->ksnc_route;
837         if (route != NULL) {
838                 /* dissociate conn from route... */
839                 LASSERT (!route->ksnr_connecting &&
840                          !route->ksnr_deleted);
841
842                 route->ksnr_conn = NULL;
843                 conn->ksnc_route = NULL;
844
845                 ksocknal_put_route (route);     /* drop conn's ref on route */
846                 ksocknal_put_conn (conn);       /* drop route's ref on conn */
847         }
848
849         /* ksnd_deathrow_conns takes over peer's ref */
850         list_del (&conn->ksnc_list);
851
852         if (list_empty (&peer->ksnp_conns)) {
853                 /* No more connections to this peer */
854
855                 peer->ksnp_error = error;       /* stash last conn close reason */
856
857                 if (list_empty (&peer->ksnp_routes)) {
858                         /* I've just closed last conn belonging to a
859                          * non-autoconnecting peer */
860                         ksocknal_unlink_peer_locked (peer);
861                 }
862         }
863
864         spin_lock (&ksocknal_data.ksnd_reaper_lock);
865
866         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
867         if (waitqueue_active (&ksocknal_data.ksnd_reaper_waitq))
868                 wake_up (&ksocknal_data.ksnd_reaper_waitq);
869                 
870         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
871 }
872
873 int
874 ksocknal_close_conn_unlocked (ksock_conn_t *conn, int why) 
875 {
876         unsigned long flags;
877         int           did_it = 0;
878         
879         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
880                 
881         if (!conn->ksnc_closing) {
882                 did_it = 1;
883                 ksocknal_close_conn_locked (conn, why);
884         }
885         
886         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
887
888         return (did_it);
889 }
890
891 void
892 ksocknal_terminate_conn (ksock_conn_t *conn)
893 {
894         /* This gets called by the reaper (guaranteed thread context) to
895          * disengage the socket from its callbacks and close it.
896          * ksnc_refcount will eventually hit zero, and then the reaper will
897          * destroy it. */
898         unsigned long   flags;
899         ksock_peer_t   *peer = conn->ksnc_peer;
900         struct timeval  now;
901         time_t          then = 0;
902         int             notify = 0;
903
904         /* serialise with callbacks */
905         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
906
907         LASSERT (conn->ksnc_closing);
908         
909         /* Remove conn's network callbacks.
910          * NB I _have_ to restore the callback, rather than storing a noop,
911          * since the socket could survive past this module being unloaded!! */
912         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
913         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
914
915         /* A callback could be in progress already; they hold a read lock
916          * on ksnd_global_lock (to serialise with me) and NOOP if
917          * sk_user_data is NULL. */
918         conn->ksnc_sock->sk->sk_user_data = NULL;
919
920         conn->ksnc_scheduler->kss_nconns--;
921
922         if (peer->ksnp_error != 0) {
923                 /* peer's last conn closed in error */
924                 LASSERT (list_empty (&peer->ksnp_conns));
925                 
926                 /* convert peer's last-known-alive timestamp from jiffies */
927                 do_gettimeofday (&now);
928                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
929                 notify = 1;
930         }
931         
932         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
933
934         /* The socket is closed on the final put; either here, or in
935          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
936          * when the connection was established, this will close the socket
937          * immediately, aborting anything buffered in it. Any hung
938          * zero-copy transmits will therefore complete in finite time. */
939         ksocknal_putconnsock (conn);
940
941         if (notify)
942                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
943                             0, then);
944 }
945
946 void
947 ksocknal_destroy_conn (ksock_conn_t *conn)
948 {
949         /* Final coup-de-grace of the reaper */
950         CDEBUG (D_NET, "connection %p\n", conn);
951
952         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
953         LASSERT (conn->ksnc_route == NULL);
954         LASSERT (!conn->ksnc_tx_scheduled);
955         LASSERT (!conn->ksnc_rx_scheduled);
956
957         /* complete queued packets */
958         while (!list_empty (&conn->ksnc_tx_queue)) {
959                 ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next,
960                                              ksock_tx_t, tx_list);
961
962                 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
963                         NTOH__u32 (tx->tx_hdr->type),
964                         NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
965                         NTOH__u64 (tx->tx_hdr->src_nid),
966                         NTOH__u64 (tx->tx_hdr->dest_nid));
967
968                 list_del (&tx->tx_list);
969                 ksocknal_tx_done (tx, 0);
970         }
971
972         /* complete current receive if any */
973         switch (conn->ksnc_rx_state) {
974         case SOCKNAL_RX_BODY:
975                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
976                 break;
977         case SOCKNAL_RX_BODY_FWD:
978                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
979                 break;
980         case SOCKNAL_RX_HEADER:
981         case SOCKNAL_RX_SLOP:
982                 break;
983         default:
984                 LBUG ();
985                 break;
986         }
987
988         ksocknal_put_peer (conn->ksnc_peer);
989
990         PORTAL_FREE (conn, sizeof (*conn));
991         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
992 }
993
994 void
995 ksocknal_put_conn (ksock_conn_t *conn)
996 {
997         unsigned long flags;
998
999         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1000                 conn, conn->ksnc_peer->ksnp_nid,
1001                 atomic_read (&conn->ksnc_refcount));
1002
1003         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1004         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1005                 return;
1006
1007         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1008
1009         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1010         if (waitqueue_active (&ksocknal_data.ksnd_reaper_waitq))
1011                 wake_up (&ksocknal_data.ksnd_reaper_waitq);
1012
1013         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1014 }
1015
1016 int
1017 ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr)
1018 {
1019         unsigned long       flags;
1020         ksock_conn_t       *conn;
1021         struct list_head   *ctmp;
1022         struct list_head   *cnxt;
1023         ksock_peer_t       *peer;
1024         struct list_head   *ptmp;
1025         struct list_head   *pnxt;
1026         int                 lo;
1027         int                 hi;
1028         int                 i;
1029         int                 rc = -ENOENT;
1030
1031         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1032
1033         if (nid != PTL_NID_ANY)
1034                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1035         else {
1036                 lo = 0;
1037                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1038         }
1039
1040         for (i = lo; i <= hi; i++) {
1041                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1042
1043                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1044
1045                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1046                                 continue;
1047
1048                         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1049
1050                                 conn = list_entry (ctmp, ksock_conn_t,
1051                                                    ksnc_list);
1052
1053                                 if (!(ipaddr == 0 ||
1054                                       conn->ksnc_ipaddr == ipaddr))
1055                                         continue;
1056
1057                                 rc = 0;
1058                                 ksocknal_close_conn_locked (conn, 0);
1059                         }
1060                 }
1061         }
1062
1063         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1064
1065         return (rc);
1066 }
1067
1068 void
1069 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1070 {
1071         /* The router is telling me she's been notified of a change in
1072          * gateway state.... */
1073
1074         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1075
1076         if (!alive) {
1077                 /* If the gateway crashed, close all open connections... */
1078                 ksocknal_close_conn (gw_nid, 0);
1079                 return;
1080         }
1081         
1082         /* ...otherwise do nothing.  We can only establish new connections
1083          * if we have autroutes, and these connect on demand. */
1084 }
1085
1086 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1087 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1088 {
1089         return &(sk->tp_pinfo.af_tcp);
1090 }
1091 #else
1092 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1093 {
1094         struct tcp_sock *s = (struct tcp_sock *)sk;
1095         return &s->tcp;
1096 }
1097 #endif
1098
1099 void
1100 ksocknal_push_conn (ksock_conn_t *conn)
1101 {
1102         struct sock    *sk;
1103         struct tcp_opt *tp;
1104         int             nonagle;
1105         int             val = 1;
1106         int             rc;
1107         mm_segment_t    oldmm;
1108
1109         rc = ksocknal_getconnsock (conn);
1110         if (rc != 0)                            /* being shut down */
1111                 return;
1112         
1113         sk = conn->ksnc_sock->sk;
1114         tp = sock2tcp_opt(sk);
1115         
1116         lock_sock (sk);
1117         nonagle = tp->nonagle;
1118         tp->nonagle = 1;
1119         release_sock (sk);
1120
1121         oldmm = get_fs ();
1122         set_fs (KERNEL_DS);
1123
1124         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1125                                       (char *)&val, sizeof (val));
1126         LASSERT (rc == 0);
1127
1128         set_fs (oldmm);
1129
1130         lock_sock (sk);
1131         tp->nonagle = nonagle;
1132         release_sock (sk);
1133
1134         ksocknal_putconnsock (conn);
1135 }
1136
1137 void
1138 ksocknal_push_peer (ksock_peer_t *peer)
1139 {
1140         int               index;
1141         int               i;
1142         struct list_head *tmp;
1143         ksock_conn_t     *conn;
1144
1145         for (index = 0; ; index++) {
1146                 read_lock (&ksocknal_data.ksnd_global_lock);
1147
1148                 i = 0;
1149                 conn = NULL;
1150
1151                 list_for_each (tmp, &peer->ksnp_conns) {
1152                         if (i++ == index) {
1153                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1154                                 atomic_inc (&conn->ksnc_refcount);
1155                                 break;
1156                         }
1157                 }
1158
1159                 read_unlock (&ksocknal_data.ksnd_global_lock);
1160
1161                 if (conn == NULL)
1162                         break;
1163
1164                 ksocknal_push_conn (conn);
1165                 ksocknal_put_conn (conn);
1166         }
1167 }
1168
1169 int
1170 ksocknal_push (ptl_nid_t nid)
1171 {
1172         ksock_peer_t      *peer;
1173         struct list_head  *tmp;
1174         int                index;
1175         int                i;
1176         int                j;
1177         int                rc = -ENOENT;
1178
1179         if (nid != PTL_NID_ANY) {
1180                 peer = ksocknal_get_peer (nid);
1181
1182                 if (peer != NULL) {
1183                         rc = 0;
1184                         ksocknal_push_peer (peer);
1185                         ksocknal_put_peer (peer);
1186                 }
1187                 return (rc);
1188         }
1189
1190         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1191                 for (j = 0; ; j++) {
1192                         read_lock (&ksocknal_data.ksnd_global_lock);
1193
1194                         index = 0;
1195                         peer = NULL;
1196
1197                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1198                                 if (index++ == j) {
1199                                         peer = list_entry(tmp, ksock_peer_t,
1200                                                           ksnp_list);
1201                                         atomic_inc (&peer->ksnp_refcount);
1202                                         break;
1203                                 }
1204                         }
1205
1206                         read_unlock (&ksocknal_data.ksnd_global_lock);
1207
1208                         if (peer != NULL) {
1209                                 rc = 0;
1210                                 ksocknal_push_peer (peer);
1211                                 ksocknal_put_peer (peer);
1212                         }
1213                 }
1214
1215         }
1216
1217         return (rc);
1218 }
1219
1220 int
1221 ksocknal_cmd(struct portal_ioctl_data * data, void * private)
1222 {
1223         int rc = -EINVAL;
1224
1225         LASSERT (data != NULL);
1226
1227         switch(data->ioc_nal_cmd) {
1228         case NAL_CMD_GET_AUTOCONN: {
1229                 ksock_route_t *route = ksocknal_get_route_by_idx (data->ioc_count);
1230
1231                 if (route == NULL)
1232                         rc = -ENOENT;
1233                 else {
1234                         rc = 0;
1235                         data->ioc_nid   = route->ksnr_peer->ksnp_nid;
1236                         data->ioc_id    = route->ksnr_ipaddr;
1237                         data->ioc_misc  = route->ksnr_port;
1238                         data->ioc_count = route->ksnr_generation;
1239                         data->ioc_size  = route->ksnr_buffer_size;
1240                         data->ioc_wait  = route->ksnr_sharecount;
1241                         data->ioc_flags = (route->ksnr_nonagel      ? 1 : 0) |
1242                                           (route->ksnr_xchange_nids ? 2 : 0) |
1243                                           (route->ksnr_irq_affinity ? 4 : 0) |
1244                                           (route->ksnr_eager        ? 8 : 0);
1245                         ksocknal_put_route (route);
1246                 }
1247                 break;
1248         }
1249         case NAL_CMD_ADD_AUTOCONN: {
1250                 rc = ksocknal_add_route (data->ioc_nid, data->ioc_id,
1251                                          data->ioc_misc, data->ioc_size,
1252                                          (data->ioc_flags & 0x01) != 0,
1253                                          (data->ioc_flags & 0x02) != 0,
1254                                          (data->ioc_flags & 0x04) != 0,
1255                                          (data->ioc_flags & 0x08) != 0,
1256                                          (data->ioc_flags & 0x10) != 0);
1257                 break;
1258         }
1259         case NAL_CMD_DEL_AUTOCONN: {
1260                 rc = ksocknal_del_route (data->ioc_nid, data->ioc_id, 
1261                                          (data->ioc_flags & 1) != 0,
1262                                          (data->ioc_flags & 2) != 0);
1263                 break;
1264         }
1265         case NAL_CMD_GET_CONN: {
1266                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (data->ioc_count);
1267
1268                 if (conn == NULL)
1269                         rc = -ENOENT;
1270                 else {
1271                         rc = 0;
1272                         data->ioc_nid  = conn->ksnc_peer->ksnp_nid;
1273                         data->ioc_id   = conn->ksnc_ipaddr;
1274                         data->ioc_misc = conn->ksnc_port;
1275                         ksocknal_put_conn (conn);
1276                 }
1277                 break;
1278         }
1279         case NAL_CMD_REGISTER_PEER_FD: {
1280                 struct socket *sock = sockfd_lookup (data->ioc_fd, &rc);
1281
1282                 if (sock != NULL) {
1283                         rc = ksocknal_create_conn (data->ioc_nid, NULL,
1284                                                    sock, data->ioc_flags);
1285                         if (rc != 0)
1286                                 fput (sock->file);
1287                 }
1288                 break;
1289         }
1290         case NAL_CMD_CLOSE_CONNECTION: {
1291                 rc = ksocknal_close_conn (data->ioc_nid, data->ioc_id);
1292                 break;
1293         }
1294         case NAL_CMD_REGISTER_MYNID: {
1295                 rc = ksocknal_set_mynid (data->ioc_nid);
1296                 break;
1297         }
1298         case NAL_CMD_PUSH_CONNECTION: {
1299                 rc = ksocknal_push (data->ioc_nid);
1300                 break;
1301         }
1302         }
1303
1304         return rc;
1305 }
1306
1307 void
1308 ksocknal_free_buffers (void)
1309 {
1310         if (ksocknal_data.ksnd_fmbs != NULL) {
1311                 ksock_fmb_t *fmb = (ksock_fmb_t *)ksocknal_data.ksnd_fmbs;
1312                 int          i;
1313                 int          j;
1314
1315                 for (i = 0;
1316                      i < (SOCKNAL_SMALL_FWD_NMSGS + SOCKNAL_LARGE_FWD_NMSGS);
1317                      i++, fmb++)
1318                         for (j = 0; j < fmb->fmb_npages; j++)
1319                                 if (fmb->fmb_pages[j] != NULL)
1320                                         __free_page (fmb->fmb_pages[j]);
1321
1322                 PORTAL_FREE (ksocknal_data.ksnd_fmbs,
1323                              sizeof (ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1324                                                      SOCKNAL_LARGE_FWD_NMSGS));
1325         }
1326
1327         LASSERT (ksocknal_data.ksnd_active_ltxs == 0);
1328         if (ksocknal_data.ksnd_ltxs != NULL)
1329                 PORTAL_FREE (ksocknal_data.ksnd_ltxs,
1330                              sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS +
1331                                                      SOCKNAL_NNBLK_LTXS));
1332
1333         if (ksocknal_data.ksnd_schedulers != NULL)
1334                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1335                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1336
1337         PORTAL_FREE (ksocknal_data.ksnd_peers,
1338                      sizeof (struct list_head) * 
1339                      ksocknal_data.ksnd_peer_hash_size);
1340 }
1341
1342 void /*__exit*/
1343 ksocknal_module_fini (void)
1344 {
1345         int   i;
1346
1347         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1348                atomic_read (&portal_kmemory));
1349
1350         switch (ksocknal_data.ksnd_init) {
1351         default:
1352                 LASSERT (0);
1353
1354         case SOCKNAL_INIT_ALL:
1355 #if CONFIG_SYSCTL
1356                 if (ksocknal_data.ksnd_sysctl != NULL)
1357                         unregister_sysctl_table (ksocknal_data.ksnd_sysctl);
1358 #endif
1359                 kportal_nal_unregister(SOCKNAL);
1360                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
1361                 /* fall through */
1362
1363         case SOCKNAL_INIT_PTL:
1364                 PtlNIFini(ksocknal_ni);
1365                 lib_fini(&ksocknal_lib);
1366                 /* fall through */
1367
1368         case SOCKNAL_INIT_DATA:
1369                 /* Module refcount only gets to zero when all peers
1370                  * have been closed so all lists must be empty */
1371                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1372                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1373                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1374                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1375                 }
1376                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1377                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1378                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1379                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1380
1381                 if (ksocknal_data.ksnd_schedulers != NULL)
1382                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1383                                 ksock_sched_t *kss =
1384                                         &ksocknal_data.ksnd_schedulers[i];
1385
1386                                 LASSERT (list_empty (&kss->kss_tx_conns));
1387                                 LASSERT (list_empty (&kss->kss_rx_conns));
1388                                 LASSERT (kss->kss_nconns == 0);
1389                         }
1390
1391                 /* stop router calling me */
1392                 kpr_shutdown (&ksocknal_data.ksnd_router);
1393
1394                 /* flag threads to terminate; wake and wait for them to die */
1395                 ksocknal_data.ksnd_shuttingdown = 1;
1396                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1397                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1398
1399                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1400                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1401
1402                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1403                         CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
1404                                 atomic_read (&ksocknal_data.ksnd_nthreads));
1405                         set_current_state (TASK_UNINTERRUPTIBLE);
1406                         schedule_timeout (HZ);
1407                 }
1408
1409                 kpr_deregister (&ksocknal_data.ksnd_router);
1410
1411                 ksocknal_free_buffers();
1412                 /* fall through */
1413
1414         case SOCKNAL_INIT_NOTHING:
1415                 break;
1416         }
1417
1418         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1419                atomic_read (&portal_kmemory));
1420
1421         printk(KERN_INFO "Routing socket NAL unloaded (final mem %d)\n",
1422                atomic_read(&portal_kmemory));
1423 }
1424
1425
1426 int __init
1427 ksocknal_module_init (void)
1428 {
1429         int   pkmem = atomic_read(&portal_kmemory);
1430         int   rc;
1431         int   i;
1432         int   j;
1433
1434         /* packet descriptor must fit in a router descriptor's scratchpad */
1435         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1436         /* the following must be sizeof(int) for proc_dointvec() */
1437         LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
1438         LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
1439
1440         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1441
1442         ksocknal_api.forward  = ksocknal_api_forward;
1443         ksocknal_api.shutdown = ksocknal_api_shutdown;
1444         ksocknal_api.yield    = ksocknal_api_yield;
1445         ksocknal_api.validate = NULL;           /* our api validate is a NOOP */
1446         ksocknal_api.lock     = ksocknal_api_lock;
1447         ksocknal_api.unlock   = ksocknal_api_unlock;
1448         ksocknal_api.nal_data = &ksocknal_data;
1449
1450         ksocknal_lib.nal_data = &ksocknal_data;
1451
1452         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1453
1454         ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1455         ksocknal_data.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
1456 #if SOCKNAL_ZC
1457         ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1458 #endif
1459
1460         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1461         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1462                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1463         if (ksocknal_data.ksnd_peers == NULL)
1464                 RETURN (-ENOMEM);
1465
1466         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1467                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1468
1469         rwlock_init(&ksocknal_data.ksnd_global_lock);
1470
1471         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1472         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1473
1474         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1475         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1476         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1477
1478         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1479         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1480         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1481
1482         spin_lock_init(&ksocknal_data.ksnd_idle_ltx_lock);
1483         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_nblk_ltx_list);
1484         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_ltx_list);
1485         init_waitqueue_head(&ksocknal_data.ksnd_idle_ltx_waitq);
1486
1487         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1488         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1489         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1490         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1491
1492         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1493         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1494         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1495
1496         /* NB memset above zeros whole of ksocknal_data, including
1497          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1498
1499         /* flag lists/ptrs/locks initialised */
1500         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1501
1502         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1503                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1504         if (ksocknal_data.ksnd_schedulers == NULL) {
1505                 ksocknal_module_fini ();
1506                 RETURN(-ENOMEM);
1507         }
1508
1509         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1510                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1511
1512                 spin_lock_init (&kss->kss_lock);
1513                 INIT_LIST_HEAD (&kss->kss_rx_conns);
1514                 INIT_LIST_HEAD (&kss->kss_tx_conns);
1515 #if SOCKNAL_ZC
1516                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1517 #endif
1518                 init_waitqueue_head (&kss->kss_waitq);
1519         }
1520
1521         CDEBUG (D_MALLOC, "ltx "LPSZ", total "LPSZ"\n", sizeof (ksock_ltx_t),
1522                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
1523
1524         PORTAL_ALLOC(ksocknal_data.ksnd_ltxs,
1525                      sizeof(ksock_ltx_t) * (SOCKNAL_NLTXS +SOCKNAL_NNBLK_LTXS));
1526         if (ksocknal_data.ksnd_ltxs == NULL) {
1527                 ksocknal_module_fini ();
1528                 return (-ENOMEM);
1529         }
1530
1531         /* Deterministic bugs please */
1532         memset (ksocknal_data.ksnd_ltxs, 0xeb,
1533                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
1534
1535         for (i = 0; i < SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS; i++) {
1536                 ksock_ltx_t *ltx = &((ksock_ltx_t *)ksocknal_data.ksnd_ltxs)[i];
1537
1538                 ltx->ltx_tx.tx_hdr = &ltx->ltx_hdr;
1539                 ltx->ltx_idle = i < SOCKNAL_NLTXS ?
1540                                 &ksocknal_data.ksnd_idle_ltx_list :
1541                                 &ksocknal_data.ksnd_idle_nblk_ltx_list;
1542                 list_add (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
1543         }
1544
1545         rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
1546         if (rc != 0) {
1547                 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
1548                 ksocknal_module_fini ();
1549                 RETURN (rc);
1550         }
1551         PtlNIDebug(ksocknal_ni, ~0);
1552
1553         ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
1554
1555         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1556                 rc = ksocknal_thread_start (ksocknal_scheduler,
1557                                             &ksocknal_data.ksnd_schedulers[i]);
1558                 if (rc != 0) {
1559                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1560                                i, rc);
1561                         ksocknal_module_fini ();
1562                         RETURN (rc);
1563                 }
1564         }
1565
1566         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1567                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1568                 if (rc != 0) {
1569                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1570                         ksocknal_module_fini ();
1571                         RETURN (rc);
1572                 }
1573         }
1574
1575         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1576         if (rc != 0) {
1577                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1578                 ksocknal_module_fini ();
1579                 RETURN (rc);
1580         }
1581
1582         rc = kpr_register(&ksocknal_data.ksnd_router,
1583                           &ksocknal_router_interface);
1584         if (rc != 0) {
1585                 CDEBUG(D_NET, "Can't initialise routing interface "
1586                        "(rc = %d): not routing\n", rc);
1587         } else {
1588                 /* Only allocate forwarding buffers if I'm on a gateway */
1589
1590                 PORTAL_ALLOC(ksocknal_data.ksnd_fmbs,
1591                              sizeof(ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1592                                                     SOCKNAL_LARGE_FWD_NMSGS));
1593                 if (ksocknal_data.ksnd_fmbs == NULL) {
1594                         ksocknal_module_fini ();
1595                         RETURN(-ENOMEM);
1596                 }
1597
1598                 /* NULL out buffer pointers etc */
1599                 memset(ksocknal_data.ksnd_fmbs, 0,
1600                        sizeof(ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1601                                               SOCKNAL_LARGE_FWD_NMSGS));
1602
1603                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1604                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
1605                         ksock_fmb_t *fmb =
1606                                 &((ksock_fmb_t *)ksocknal_data.ksnd_fmbs)[i];
1607
1608                         if (i < SOCKNAL_SMALL_FWD_NMSGS) {
1609                                 fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES;
1610                                 fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp;
1611                         } else {
1612                                 fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES;
1613                                 fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp;
1614                         }
1615
1616                         LASSERT (fmb->fmb_npages > 0);
1617                         for (j = 0; j < fmb->fmb_npages; j++) {
1618                                 fmb->fmb_pages[j] = alloc_page(GFP_KERNEL);
1619
1620                                 if (fmb->fmb_pages[j] == NULL) {
1621                                         ksocknal_module_fini ();
1622                                         return (-ENOMEM);
1623                                 }
1624
1625                                 LASSERT(page_address (fmb->fmb_pages[j]) !=
1626                                         NULL);
1627                         }
1628
1629                         list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
1630                 }
1631         }
1632
1633         rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
1634         if (rc != 0) {
1635                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1636                 ksocknal_module_fini ();
1637                 return (rc);
1638         }
1639
1640         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
1641
1642 #ifdef CONFIG_SYSCTL
1643         /* Press on regardless even if registering sysctl doesn't work */
1644         ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
1645 #endif
1646         /* flag everything initialised */
1647         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1648         
1649         printk(KERN_INFO "Routing socket NAL loaded (Routing %s, initial "
1650                "mem %d)\n",
1651                kpr_routing (&ksocknal_data.ksnd_router) ?
1652                "enabled" : "disabled", pkmem);
1653
1654         return (0);
1655 }
1656
1657 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1658 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1659 MODULE_LICENSE("GPL");
1660
1661 module_init(ksocknal_module_init);
1662 module_exit(ksocknal_module_fini);
1663
1664 EXPORT_SYMBOL (ksocknal_ni);