Whamcloud - gitweb
Always disable nagle in the socknal, even for 0conf setups (from b1_0).
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 ptl_handle_ni_t         ksocknal_ni;
29 static nal_t            ksocknal_api;
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
32 #else
33 static ksock_nal_data_t ksocknal_data;
34 #endif
35
36 kpr_nal_interface_t ksocknal_router_interface = {
37         kprni_nalid:      SOCKNAL,
38         kprni_arg:        &ksocknal_data,
39         kprni_fwd:        ksocknal_fwd_packet,
40         kprni_notify:     ksocknal_notify,
41 };
42
43 #define SOCKNAL_SYSCTL  200
44
45 #define SOCKNAL_SYSCTL_TIMEOUT     1
46 #define SOCKNAL_SYSCTL_EAGER_ACK   2
47 #define SOCKNAL_SYSCTL_ZERO_COPY   3
48 #define SOCKNAL_SYSCTL_TYPED       4
49 #define SOCKNAL_SYSCTL_MIN_BULK    5
50
51 static ctl_table ksocknal_ctl_table[] = {
52         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
53          &ksocknal_data.ksnd_io_timeout, sizeof (int),
54          0644, NULL, &proc_dointvec},
55         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
56          &ksocknal_data.ksnd_eager_ack, sizeof (int),
57          0644, NULL, &proc_dointvec},
58 #if SOCKNAL_ZC
59         {SOCKNAL_SYSCTL_EAGER_ACK, "zero_copy", 
60          &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
61          0644, NULL, &proc_dointvec},
62 #endif
63         {SOCKNAL_SYSCTL_TYPED, "typed", 
64          &ksocknal_data.ksnd_typed_conns, sizeof (int),
65          0644, NULL, &proc_dointvec},
66         {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk", 
67          &ksocknal_data.ksnd_min_bulk, sizeof (int),
68          0644, NULL, &proc_dointvec},
69         { 0 }
70 };
71
72 static ctl_table ksocknal_top_ctl_table[] = {
73         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
74         { 0 }
75 };
76
77 int
78 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
79                        void *ret, size_t ret_len)
80 {
81         ksock_nal_data_t *k;
82         nal_cb_t *nal_cb;
83
84         k = nal->nal_data;
85         nal_cb = k->ksnd_nal_cb;
86
87         lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
88         return PTL_OK;
89 }
90
91 int
92 ksocknal_api_shutdown(nal_t *nal, int ni)
93 {
94         CDEBUG (D_NET, "closing all connections\n");
95
96         ksocknal_del_route (PTL_NID_ANY, 0, 0, 0);
97         ksocknal_close_matching_conns (PTL_NID_ANY, 0);
98         return PTL_OK;
99 }
100
101 void
102 ksocknal_api_yield(nal_t *nal)
103 {
104         our_cond_resched();
105         return;
106 }
107
108 void
109 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
110 {
111         ksock_nal_data_t *k;
112         nal_cb_t *nal_cb;
113
114         k = nal->nal_data;
115         nal_cb = k->ksnd_nal_cb;
116         nal_cb->cb_cli(nal_cb,flags);
117 }
118
119 void
120 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
121 {
122         ksock_nal_data_t *k;
123         nal_cb_t *nal_cb;
124
125         k = nal->nal_data;
126         nal_cb = k->ksnd_nal_cb;
127         nal_cb->cb_sti(nal_cb,flags);
128 }
129
130 nal_t *
131 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
132               ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
133 {
134         CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
135         lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
136         return (&ksocknal_api);
137 }
138
139 /*
140  *  EXTRA functions follow
141  */
142
143 int
144 ksocknal_set_mynid(ptl_nid_t nid)
145 {
146         lib_ni_t *ni = &ksocknal_lib.ni;
147
148         /* FIXME: we have to do this because we call lib_init() at module
149          * insertion time, which is before we have 'mynid' available.  lib_init
150          * sets the NAL's nid, which it uses to tell other nodes where packets
151          * are coming from.  This is not a very graceful solution to this
152          * problem. */
153
154         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
155                nid, ni->nid);
156
157         ni->nid = nid;
158         return (0);
159 }
160
161 void
162 ksocknal_bind_irq (unsigned int irq)
163 {
164 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
165         int              bind;
166         unsigned long    flags;
167         char             cmdline[64];
168         ksock_irqinfo_t *info;
169         char            *argv[] = {"/bin/sh",
170                                    "-c",
171                                    cmdline,
172                                    NULL};
173         char            *envp[] = {"HOME=/",
174                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
175                                    NULL};
176
177         LASSERT (irq < NR_IRQS);
178         if (irq == 0)                           /* software NIC */
179                 return;
180
181         info = &ksocknal_data.ksnd_irqinfo[irq];
182
183         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
184
185         LASSERT (info->ksni_valid);
186         bind = !info->ksni_bound;
187         info->ksni_bound = 1;
188
189         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
190
191         if (!bind)                              /* bound already */
192                 return;
193
194         snprintf (cmdline, sizeof (cmdline),
195                   "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
196
197         printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
198                 irq, info->ksni_sched, cmdline);
199
200         /* FIXME: Find a better method of setting IRQ affinity...
201          */
202
203         call_usermodehelper (argv[0], argv, envp);
204 #endif
205 }
206
207 ksock_route_t *
208 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
209                        int irq_affinity, int eager)
210 {
211         ksock_route_t *route;
212
213         PORTAL_ALLOC (route, sizeof (*route));
214         if (route == NULL)
215                 return (NULL);
216
217         atomic_set (&route->ksnr_refcount, 1);
218         route->ksnr_sharecount = 0;
219         route->ksnr_peer = NULL;
220         route->ksnr_timeout = jiffies;
221         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
222         route->ksnr_ipaddr = ipaddr;
223         route->ksnr_port = port;
224         route->ksnr_buffer_size = buffer_size;
225         route->ksnr_irq_affinity = irq_affinity;
226         route->ksnr_eager = eager;
227         route->ksnr_connecting = 0;
228         route->ksnr_connected = 0;
229         route->ksnr_deleted = 0;
230         route->ksnr_conn_count = 0;
231
232         return (route);
233 }
234
235 void
236 ksocknal_destroy_route (ksock_route_t *route)
237 {
238         LASSERT (route->ksnr_sharecount == 0);
239
240         if (route->ksnr_peer != NULL)
241                 ksocknal_put_peer (route->ksnr_peer);
242
243         PORTAL_FREE (route, sizeof (*route));
244 }
245
246 void
247 ksocknal_put_route (ksock_route_t *route)
248 {
249         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
250                 route, atomic_read (&route->ksnr_refcount));
251
252         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
253         if (!atomic_dec_and_test (&route->ksnr_refcount))
254              return;
255
256         ksocknal_destroy_route (route);
257 }
258
259 ksock_peer_t *
260 ksocknal_create_peer (ptl_nid_t nid)
261 {
262         ksock_peer_t *peer;
263
264         LASSERT (nid != PTL_NID_ANY);
265
266         PORTAL_ALLOC (peer, sizeof (*peer));
267         if (peer == NULL)
268                 return (NULL);
269
270         memset (peer, 0, sizeof (*peer));
271
272         peer->ksnp_nid = nid;
273         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
274         peer->ksnp_closing = 0;
275         INIT_LIST_HEAD (&peer->ksnp_conns);
276         INIT_LIST_HEAD (&peer->ksnp_routes);
277         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
278
279         /* Can't unload while peers exist; ensures all I/O has terminated
280          * before unload attempts */
281         PORTAL_MODULE_USE;
282         atomic_inc (&ksocknal_data.ksnd_npeers);
283         return (peer);
284 }
285
286 void
287 ksocknal_destroy_peer (ksock_peer_t *peer)
288 {
289         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
290
291         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
292         LASSERT (list_empty (&peer->ksnp_conns));
293         LASSERT (list_empty (&peer->ksnp_routes));
294         LASSERT (list_empty (&peer->ksnp_tx_queue));
295
296         PORTAL_FREE (peer, sizeof (*peer));
297
298         /* NB a peer's connections and autoconnect routes keep a reference
299          * on their peer until they are destroyed, so we can be assured
300          * that _all_ state to do with this peer has been cleaned up when
301          * its refcount drops to zero. */
302         atomic_dec (&ksocknal_data.ksnd_npeers);
303         PORTAL_MODULE_UNUSE;
304 }
305
306 void
307 ksocknal_put_peer (ksock_peer_t *peer)
308 {
309         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
310                 peer, peer->ksnp_nid,
311                 atomic_read (&peer->ksnp_refcount));
312
313         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
314         if (!atomic_dec_and_test (&peer->ksnp_refcount))
315                 return;
316
317         ksocknal_destroy_peer (peer);
318 }
319
320 ksock_peer_t *
321 ksocknal_find_peer_locked (ptl_nid_t nid)
322 {
323         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
324         struct list_head *tmp;
325         ksock_peer_t     *peer;
326
327         list_for_each (tmp, peer_list) {
328
329                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
330
331                 LASSERT (!peer->ksnp_closing);
332                 LASSERT (!(list_empty (&peer->ksnp_routes) &&
333                            list_empty (&peer->ksnp_conns)));
334
335                 if (peer->ksnp_nid != nid)
336                         continue;
337
338                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
339                        peer, nid, atomic_read (&peer->ksnp_refcount));
340                 return (peer);
341         }
342         return (NULL);
343 }
344
345 ksock_peer_t *
346 ksocknal_get_peer (ptl_nid_t nid)
347 {
348         ksock_peer_t     *peer;
349
350         read_lock (&ksocknal_data.ksnd_global_lock);
351         peer = ksocknal_find_peer_locked (nid);
352         if (peer != NULL)                       /* +1 ref for caller? */
353                 atomic_inc (&peer->ksnp_refcount);
354         read_unlock (&ksocknal_data.ksnd_global_lock);
355
356         return (peer);
357 }
358
359 void
360 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
361 {
362         LASSERT (!peer->ksnp_closing);
363         peer->ksnp_closing = 1;
364         list_del (&peer->ksnp_list);
365         /* lose peerlist's ref */
366         ksocknal_put_peer (peer);
367 }
368
369 ksock_route_t *
370 ksocknal_get_route_by_idx (int index)
371 {
372         ksock_peer_t      *peer;
373         struct list_head  *ptmp;
374         ksock_route_t     *route;
375         struct list_head  *rtmp;
376         int                i;
377
378         read_lock (&ksocknal_data.ksnd_global_lock);
379
380         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
381                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
382                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
383
384                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
385                                    list_empty (&peer->ksnp_conns)));
386
387                         list_for_each (rtmp, &peer->ksnp_routes) {
388                                 if (index-- > 0)
389                                         continue;
390
391                                 route = list_entry (rtmp, ksock_route_t, ksnr_list);
392                                 atomic_inc (&route->ksnr_refcount);
393                                 read_unlock (&ksocknal_data.ksnd_global_lock);
394                                 return (route);
395                         }
396                 }
397         }
398
399         read_unlock (&ksocknal_data.ksnd_global_lock);
400         return (NULL);
401 }
402
403 int
404 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
405                     int bind_irq, int share, int eager)
406 {
407         unsigned long      flags;
408         ksock_peer_t      *peer;
409         ksock_peer_t      *peer2;
410         ksock_route_t     *route;
411         struct list_head  *rtmp;
412         ksock_route_t     *route2;
413         
414         if (nid == PTL_NID_ANY)
415                 return (-EINVAL);
416
417         /* Have a brand new peer ready... */
418         peer = ksocknal_create_peer (nid);
419         if (peer == NULL)
420                 return (-ENOMEM);
421
422         route = ksocknal_create_route (ipaddr, port, bufnob, 
423                                        bind_irq, eager);
424         if (route == NULL) {
425                 ksocknal_put_peer (peer);
426                 return (-ENOMEM);
427         }
428
429         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
430
431         peer2 = ksocknal_find_peer_locked (nid);
432         if (peer2 != NULL) {
433                 ksocknal_put_peer (peer);
434                 peer = peer2;
435         } else {
436                 /* peer table takes existing ref on peer */
437                 list_add (&peer->ksnp_list,
438                           ksocknal_nid2peerlist (nid));
439         }
440
441         route2 = NULL;
442         if (share) {
443                 /* check for existing route to this NID via this ipaddr */
444                 list_for_each (rtmp, &peer->ksnp_routes) {
445                         route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
446                         
447                         if (route2->ksnr_ipaddr == ipaddr)
448                                 break;
449
450                         route2 = NULL;
451                 }
452         }
453
454         if (route2 != NULL) {
455                 ksocknal_put_route (route);
456                 route = route2;
457         } else {
458                 /* route takes a ref on peer */
459                 route->ksnr_peer = peer;
460                 atomic_inc (&peer->ksnp_refcount);
461                 /* peer's route list takes existing ref on route */
462                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
463         }
464         
465         route->ksnr_sharecount++;
466
467         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
468
469         return (0);
470 }
471
472 void
473 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
474 {
475         ksock_peer_t     *peer = route->ksnr_peer;
476         ksock_conn_t     *conn;
477         struct list_head *ctmp;
478         struct list_head *cnxt;
479
480         if (!share)
481                 route->ksnr_sharecount = 0;
482         else {
483                 route->ksnr_sharecount--;
484                 if (route->ksnr_sharecount != 0)
485                         return;
486         }
487
488         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
489                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
490
491                 if (conn->ksnc_route != route)
492                         continue;
493                 
494                 if (!keep_conn) {
495                         ksocknal_close_conn_locked (conn, 0);
496                         continue;
497                 }
498                 
499                 /* keeping the conn; just dissociate it and route... */
500                 conn->ksnc_route = NULL;
501                 ksocknal_put_route (route); /* drop conn's ref on route */
502         }
503         
504         route->ksnr_deleted = 1;
505         list_del (&route->ksnr_list);
506         ksocknal_put_route (route);             /* drop peer's ref */
507
508         if (list_empty (&peer->ksnp_routes) &&
509             list_empty (&peer->ksnp_conns)) {
510                 /* I've just removed the last autoconnect route of a peer
511                  * with no active connections */
512                 ksocknal_unlink_peer_locked (peer);
513         }
514 }
515
516 int
517 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
518 {
519         unsigned long      flags;
520         struct list_head  *ptmp;
521         struct list_head  *pnxt;
522         ksock_peer_t      *peer;
523         struct list_head  *rtmp;
524         struct list_head  *rnxt;
525         ksock_route_t     *route;
526         int                lo;
527         int                hi;
528         int                i;
529         int                rc = -ENOENT;
530
531         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
532
533         if (nid != PTL_NID_ANY)
534                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
535         else {
536                 lo = 0;
537                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
538         }
539
540         for (i = lo; i <= hi; i++) {
541                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
542                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
543
544                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
545                                 continue;
546
547                         list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
548                                 route = list_entry (rtmp, ksock_route_t,
549                                                     ksnr_list);
550
551                                 if (!(ipaddr == 0 ||
552                                       route->ksnr_ipaddr == ipaddr))
553                                         continue;
554
555                                 ksocknal_del_route_locked (route, share, keep_conn);
556                                 rc = 0;         /* matched something */
557                                 if (share)
558                                         goto out;
559                         }
560                 }
561         }
562  out:
563         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
564
565         return (rc);
566 }
567
568 ksock_conn_t *
569 ksocknal_get_conn_by_idx (int index)
570 {
571         ksock_peer_t      *peer;
572         struct list_head  *ptmp;
573         ksock_conn_t      *conn;
574         struct list_head  *ctmp;
575         int                i;
576
577         read_lock (&ksocknal_data.ksnd_global_lock);
578
579         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
580                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
581                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
582
583                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
584                                    list_empty (&peer->ksnp_conns)));
585
586                         list_for_each (ctmp, &peer->ksnp_conns) {
587                                 if (index-- > 0)
588                                         continue;
589
590                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
591                                 atomic_inc (&conn->ksnc_refcount);
592                                 read_unlock (&ksocknal_data.ksnd_global_lock);
593                                 return (conn);
594                         }
595                 }
596         }
597
598         read_unlock (&ksocknal_data.ksnd_global_lock);
599         return (NULL);
600 }
601
602 void
603 ksocknal_get_peer_addr (ksock_conn_t *conn)
604 {
605         struct sockaddr_in sin;
606         int                len = sizeof (sin);
607         int                rc;
608         
609         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
610                                             (struct sockaddr *)&sin, &len, 2);
611         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
612         LASSERT (!conn->ksnc_closing);
613         LASSERT (len <= sizeof (sin));
614
615         if (rc != 0) {
616                 CERROR ("Error %d getting sock peer IP\n", rc);
617                 return;
618         }
619
620         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
621         conn->ksnc_port   = ntohs (sin.sin_port);
622 }
623
624 unsigned int
625 ksocknal_conn_irq (ksock_conn_t *conn)
626 {
627         int                irq = 0;
628         struct dst_entry  *dst;
629
630         dst = sk_dst_get (conn->ksnc_sock->sk);
631         if (dst != NULL) {
632                 if (dst->dev != NULL) {
633                         irq = dst->dev->irq;
634                         if (irq >= NR_IRQS) {
635                                 CERROR ("Unexpected IRQ %x\n", irq);
636                                 irq = 0;
637                         }
638                 }
639                 dst_release (dst);
640         }
641         
642         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
643         LASSERT (!conn->ksnc_closing);
644         return (irq);
645 }
646
647 ksock_sched_t *
648 ksocknal_choose_scheduler_locked (unsigned int irq)
649 {
650         ksock_sched_t    *sched;
651         ksock_irqinfo_t  *info;
652         int               i;
653
654         LASSERT (irq < NR_IRQS);
655         info = &ksocknal_data.ksnd_irqinfo[irq];
656
657         if (irq != 0 &&                         /* hardware NIC */
658             info->ksni_valid) {                 /* already set up */
659                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
660         }
661
662         /* software NIC (irq == 0) || not associated with a scheduler yet.
663          * Choose the CPU with the fewest connections... */
664         sched = &ksocknal_data.ksnd_schedulers[0];
665         for (i = 1; i < SOCKNAL_N_SCHED; i++)
666                 if (sched->kss_nconns >
667                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
668                         sched = &ksocknal_data.ksnd_schedulers[i];
669
670         if (irq != 0) {                         /* Hardware NIC */
671                 info->ksni_valid = 1;
672                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
673
674                 /* no overflow... */
675                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
676         }
677
678         return (sched);
679 }
680
681 int
682 ksocknal_create_conn (ksock_route_t *route, struct socket *sock,
683                       int bind_irq, int type)
684 {
685         ptl_nid_t          nid;
686         __u64              incarnation;
687         unsigned long      flags;
688         ksock_conn_t      *conn;
689         ksock_peer_t      *peer;
690         ksock_peer_t      *peer2;
691         ksock_sched_t     *sched;
692         unsigned int       irq;
693         ksock_tx_t        *tx;
694         int                rc;
695
696         /* NB, sock has an associated file since (a) this connection might
697          * have been created in userland and (b) we need to refcount the
698          * socket so that we don't close it while I/O is being done on
699          * it, and sock->file has that pre-cooked... */
700         LASSERT (sock->file != NULL);
701         LASSERT (file_count(sock->file) > 0);
702
703         rc = ksocknal_setup_sock (sock);
704         if (rc != 0)
705                 return (rc);
706
707         if (route == NULL) {
708                 /* acceptor or explicit connect */
709                 nid = PTL_NID_ANY;
710         } else {
711                 LASSERT (type != SOCKNAL_CONN_NONE);
712                 /* autoconnect: expect this nid on exchange */
713                 nid = route->ksnr_peer->ksnp_nid;
714         }
715
716         rc = ksocknal_hello (sock, &nid, &type, &incarnation);
717         if (rc != 0)
718                 return (rc);
719         
720         peer = NULL;
721         if (route == NULL) {                    /* not autoconnect */
722                 /* Assume this socket connects to a brand new peer */
723                 peer = ksocknal_create_peer (nid);
724                 if (peer == NULL)
725                         return (-ENOMEM);
726         }
727
728         PORTAL_ALLOC(conn, sizeof(*conn));
729         if (conn == NULL) {
730                 if (peer != NULL)
731                         ksocknal_put_peer (peer);
732                 return (-ENOMEM);
733         }
734
735         memset (conn, 0, sizeof (*conn));
736         conn->ksnc_peer = NULL;
737         conn->ksnc_route = NULL;
738         conn->ksnc_sock = sock;
739         conn->ksnc_type = type;
740         conn->ksnc_incarnation = incarnation;
741         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
742         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
743         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
744
745         conn->ksnc_rx_ready = 0;
746         conn->ksnc_rx_scheduled = 0;
747         ksocknal_new_packet (conn, 0);
748
749         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
750         conn->ksnc_tx_ready = 0;
751         conn->ksnc_tx_scheduled = 0;
752         atomic_set (&conn->ksnc_tx_nob, 0);
753
754         ksocknal_get_peer_addr (conn);
755
756         irq = ksocknal_conn_irq (conn);
757
758         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
759
760         if (route != NULL) {
761                 /* Autoconnected! */
762                 LASSERT ((route->ksnr_connected & (1 << type)) == 0);
763                 LASSERT ((route->ksnr_connecting & (1 << type)) != 0);
764
765                 if (route->ksnr_deleted) {
766                         /* This conn was autoconnected, but the autoconnect
767                          * route got deleted while it was being
768                          * established! */
769                         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
770                                                  flags);
771                         PORTAL_FREE (conn, sizeof (*conn));
772                         return (-ESTALE);
773                 }
774
775
776                 /* associate conn/route */
777                 conn->ksnc_route = route;
778                 atomic_inc (&route->ksnr_refcount);
779
780                 route->ksnr_connecting &= ~(1 << type);
781                 route->ksnr_connected  |= (1 << type);
782                 route->ksnr_conn_count++;
783                 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
784
785                 peer = route->ksnr_peer;
786         } else {
787                 /* Not an autoconnected connection; see if there is an
788                  * existing peer for this NID */
789                 peer2 = ksocknal_find_peer_locked (nid);
790                 if (peer2 != NULL) {
791                         ksocknal_put_peer (peer);
792                         peer = peer2;
793                 } else {
794                         list_add (&peer->ksnp_list,
795                                   ksocknal_nid2peerlist (nid));
796                         /* peer list takes over existing ref */
797                 }
798         }
799
800         LASSERT (!peer->ksnp_closing);
801
802         conn->ksnc_peer = peer;
803         atomic_inc (&peer->ksnp_refcount);
804         peer->ksnp_last_alive = jiffies;
805         peer->ksnp_error = 0;
806
807         list_add (&conn->ksnc_list, &peer->ksnp_conns);
808         atomic_inc (&conn->ksnc_refcount);
809
810         sched = ksocknal_choose_scheduler_locked (irq);
811         sched->kss_nconns++;
812         conn->ksnc_scheduler = sched;
813
814         /* NB my callbacks block while I hold ksnd_global_lock */
815         sock->sk->sk_user_data = conn;
816         sock->sk->sk_data_ready = ksocknal_data_ready;
817         sock->sk->sk_write_space = ksocknal_write_space;
818
819         /* Take all the packets blocking for a connection.
820          * NB, it might be nicer to share these blocked packets among any
821          * other connections that are becoming established, however that
822          * confuses the normal packet launching operation, which selects a
823          * connection and queues the packet on it without needing an
824          * exclusive lock on ksnd_global_lock. */
825         while (!list_empty (&peer->ksnp_tx_queue)) {
826                 tx = list_entry (peer->ksnp_tx_queue.next,
827                                  ksock_tx_t, tx_list);
828
829                 list_del (&tx->tx_list);
830                 ksocknal_queue_tx_locked (tx, conn);
831         }
832
833         rc = ksocknal_close_stale_conns_locked (peer, incarnation);
834
835         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
836
837         if (rc != 0)
838                 CERROR ("Closed %d stale conns to "LPX64"\n", rc, nid);
839
840         if (bind_irq)                           /* irq binding required */
841                 ksocknal_bind_irq (irq);
842
843         /* Call the callbacks right now to get things going. */
844         ksocknal_data_ready (sock->sk, 0);
845         ksocknal_write_space (sock->sk);
846
847         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64"\n",
848                conn, conn->ksnc_peer->ksnp_nid);
849
850         ksocknal_put_conn (conn);
851         return (0);
852 }
853
854 void
855 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
856 {
857         /* This just does the immmediate housekeeping, and queues the
858          * connection for the reaper to terminate. 
859          * Caller holds ksnd_global_lock exclusively in irq context */
860         ksock_peer_t   *peer = conn->ksnc_peer;
861         ksock_route_t  *route;
862
863         LASSERT (peer->ksnp_error == 0);
864         LASSERT (!conn->ksnc_closing);
865         conn->ksnc_closing = 1;
866         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
867         
868         route = conn->ksnc_route;
869         if (route != NULL) {
870                 /* dissociate conn from route... */
871                 LASSERT (!route->ksnr_deleted);
872                 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
873                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
874
875                 route->ksnr_connected &= ~(1 << conn->ksnc_type);
876                 conn->ksnc_route = NULL;
877
878                 list_del (&route->ksnr_list);   /* make route least favourite */
879                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
880                 
881                 ksocknal_put_route (route);     /* drop conn's ref on route */
882         }
883
884         /* ksnd_deathrow_conns takes over peer's ref */
885         list_del (&conn->ksnc_list);
886
887         if (list_empty (&peer->ksnp_conns)) {
888                 /* No more connections to this peer */
889
890                 peer->ksnp_error = error;       /* stash last conn close reason */
891
892                 if (list_empty (&peer->ksnp_routes)) {
893                         /* I've just closed last conn belonging to a
894                          * non-autoconnecting peer */
895                         ksocknal_unlink_peer_locked (peer);
896                 }
897         }
898
899         spin_lock (&ksocknal_data.ksnd_reaper_lock);
900
901         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
902         wake_up (&ksocknal_data.ksnd_reaper_waitq);
903                 
904         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
905 }
906
907 void
908 ksocknal_terminate_conn (ksock_conn_t *conn)
909 {
910         /* This gets called by the reaper (guaranteed thread context) to
911          * disengage the socket from its callbacks and close it.
912          * ksnc_refcount will eventually hit zero, and then the reaper will
913          * destroy it. */
914         unsigned long   flags;
915         ksock_peer_t   *peer = conn->ksnc_peer;
916         ksock_sched_t  *sched = conn->ksnc_scheduler;
917         struct timeval  now;
918         time_t          then = 0;
919         int             notify = 0;
920
921         LASSERT(conn->ksnc_closing);
922
923         /* wake up the scheduler to "send" all remaining packets to /dev/null */
924         spin_lock_irqsave(&sched->kss_lock, flags);
925
926         if (!conn->ksnc_tx_scheduled &&
927             !list_empty(&conn->ksnc_tx_queue)){
928                 list_add_tail (&conn->ksnc_tx_list,
929                                &sched->kss_tx_conns);
930                 /* a closing conn is always ready to tx */
931                 conn->ksnc_tx_ready = 1;
932                 conn->ksnc_tx_scheduled = 1;
933                 /* extra ref for scheduler */
934                 atomic_inc (&conn->ksnc_refcount);
935
936                 wake_up (&sched->kss_waitq);
937         }
938
939         spin_unlock_irqrestore (&sched->kss_lock, flags);
940
941         /* serialise with callbacks */
942         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
943
944         /* Remove conn's network callbacks.
945          * NB I _have_ to restore the callback, rather than storing a noop,
946          * since the socket could survive past this module being unloaded!! */
947         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
948         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
949
950         /* A callback could be in progress already; they hold a read lock
951          * on ksnd_global_lock (to serialise with me) and NOOP if
952          * sk_user_data is NULL. */
953         conn->ksnc_sock->sk->sk_user_data = NULL;
954
955         /* OK, so this conn may not be completely disengaged from its
956          * scheduler yet, but it _has_ committed to terminate... */
957         conn->ksnc_scheduler->kss_nconns--;
958
959         if (peer->ksnp_error != 0) {
960                 /* peer's last conn closed in error */
961                 LASSERT (list_empty (&peer->ksnp_conns));
962                 
963                 /* convert peer's last-known-alive timestamp from jiffies */
964                 do_gettimeofday (&now);
965                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
966                 notify = 1;
967         }
968         
969         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
970
971         /* The socket is closed on the final put; either here, or in
972          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
973          * when the connection was established, this will close the socket
974          * immediately, aborting anything buffered in it. Any hung
975          * zero-copy transmits will therefore complete in finite time. */
976         ksocknal_putconnsock (conn);
977
978         if (notify)
979                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
980                             0, then);
981 }
982
983 void
984 ksocknal_destroy_conn (ksock_conn_t *conn)
985 {
986         /* Final coup-de-grace of the reaper */
987         CDEBUG (D_NET, "connection %p\n", conn);
988
989         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
990         LASSERT (conn->ksnc_route == NULL);
991         LASSERT (!conn->ksnc_tx_scheduled);
992         LASSERT (!conn->ksnc_rx_scheduled);
993         LASSERT (list_empty(&conn->ksnc_tx_queue));
994
995         /* complete current receive if any */
996         switch (conn->ksnc_rx_state) {
997         case SOCKNAL_RX_BODY:
998 #if 0
999                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
1000 #else
1001                 CERROR ("Refusing to complete a partial receive from "
1002                         LPX64", ip %08x\n", conn->ksnc_peer->ksnp_nid,
1003                         conn->ksnc_ipaddr);
1004                 CERROR ("This may hang communications and "
1005                         "prevent modules from unloading\n");
1006 #endif
1007                 break;
1008         case SOCKNAL_RX_BODY_FWD:
1009                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1010                 break;
1011         case SOCKNAL_RX_HEADER:
1012         case SOCKNAL_RX_SLOP:
1013                 break;
1014         default:
1015                 LBUG ();
1016                 break;
1017         }
1018
1019         ksocknal_put_peer (conn->ksnc_peer);
1020
1021         PORTAL_FREE (conn, sizeof (*conn));
1022         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
1023 }
1024
1025 void
1026 ksocknal_put_conn (ksock_conn_t *conn)
1027 {
1028         unsigned long flags;
1029
1030         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1031                 conn, conn->ksnc_peer->ksnp_nid,
1032                 atomic_read (&conn->ksnc_refcount));
1033
1034         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1035         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1036                 return;
1037
1038         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1039
1040         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1041         wake_up (&ksocknal_data.ksnd_reaper_waitq);
1042
1043         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1044 }
1045
1046 int
1047 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1048 {
1049         ksock_conn_t       *conn;
1050         struct list_head   *ctmp;
1051         struct list_head   *cnxt;
1052         int                 count = 0;
1053
1054         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1055                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1056
1057                 if (ipaddr == 0 ||
1058                     conn->ksnc_ipaddr == ipaddr) {
1059                         count++;
1060                         ksocknal_close_conn_locked (conn, why);
1061                 }
1062         }
1063
1064         return (count);
1065 }
1066
1067 int
1068 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1069 {
1070         ksock_conn_t       *conn;
1071         struct list_head   *ctmp;
1072         struct list_head   *cnxt;
1073         int                 count = 0;
1074
1075         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1076                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1077
1078                 if (conn->ksnc_incarnation == incarnation)
1079                         continue;
1080                 
1081                 count++;
1082                 ksocknal_close_conn_locked (conn, -ESTALE);
1083         }
1084
1085         return (count);
1086 }
1087
1088 int
1089 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why) 
1090 {
1091         ksock_peer_t     *peer = conn->ksnc_peer;
1092         __u32             ipaddr = conn->ksnc_ipaddr;
1093         unsigned long     flags;
1094         int               count;
1095
1096         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1097
1098         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1099         
1100         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1101
1102         return (count);
1103 }
1104
1105 int
1106 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1107 {
1108         unsigned long       flags;
1109         ksock_peer_t       *peer;
1110         struct list_head   *ptmp;
1111         struct list_head   *pnxt;
1112         int                 lo;
1113         int                 hi;
1114         int                 i;
1115         int                 count = 0;
1116
1117         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1118
1119         if (nid != PTL_NID_ANY)
1120                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1121         else {
1122                 lo = 0;
1123                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1124         }
1125
1126         for (i = lo; i <= hi; i++) {
1127                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1128
1129                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1130
1131                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1132                                 continue;
1133
1134                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1135                 }
1136         }
1137
1138         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1139
1140         /* wildcards always succeed */
1141         if (nid == PTL_NID_ANY || ipaddr == 0)
1142                 return (0);
1143         
1144         return (count == 0 ? -ENOENT : 0);
1145 }
1146
1147 void
1148 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1149 {
1150         /* The router is telling me she's been notified of a change in
1151          * gateway state.... */
1152
1153         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1154
1155         if (!alive) {
1156                 /* If the gateway crashed, close all open connections... */
1157                 ksocknal_close_matching_conns (gw_nid, 0);
1158                 return;
1159         }
1160         
1161         /* ...otherwise do nothing.  We can only establish new connections
1162          * if we have autroutes, and these connect on demand. */
1163 }
1164
1165 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1166 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1167 {
1168         return &(sk->tp_pinfo.af_tcp);
1169 }
1170 #else
1171 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1172 {
1173         struct tcp_sock *s = (struct tcp_sock *)sk;
1174         return &s->tcp;
1175 }
1176 #endif
1177
1178 void
1179 ksocknal_push_conn (ksock_conn_t *conn)
1180 {
1181         struct sock    *sk;
1182         struct tcp_opt *tp;
1183         int             nonagle;
1184         int             val = 1;
1185         int             rc;
1186         mm_segment_t    oldmm;
1187
1188         rc = ksocknal_getconnsock (conn);
1189         if (rc != 0)                            /* being shut down */
1190                 return;
1191         
1192         sk = conn->ksnc_sock->sk;
1193         tp = sock2tcp_opt(sk);
1194         
1195         lock_sock (sk);
1196         nonagle = tp->nonagle;
1197         tp->nonagle = 1;
1198         release_sock (sk);
1199
1200         oldmm = get_fs ();
1201         set_fs (KERNEL_DS);
1202
1203         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1204                                       (char *)&val, sizeof (val));
1205         LASSERT (rc == 0);
1206
1207         set_fs (oldmm);
1208
1209         lock_sock (sk);
1210         tp->nonagle = nonagle;
1211         release_sock (sk);
1212
1213         ksocknal_putconnsock (conn);
1214 }
1215
1216 void
1217 ksocknal_push_peer (ksock_peer_t *peer)
1218 {
1219         int               index;
1220         int               i;
1221         struct list_head *tmp;
1222         ksock_conn_t     *conn;
1223
1224         for (index = 0; ; index++) {
1225                 read_lock (&ksocknal_data.ksnd_global_lock);
1226
1227                 i = 0;
1228                 conn = NULL;
1229
1230                 list_for_each (tmp, &peer->ksnp_conns) {
1231                         if (i++ == index) {
1232                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1233                                 atomic_inc (&conn->ksnc_refcount);
1234                                 break;
1235                         }
1236                 }
1237
1238                 read_unlock (&ksocknal_data.ksnd_global_lock);
1239
1240                 if (conn == NULL)
1241                         break;
1242
1243                 ksocknal_push_conn (conn);
1244                 ksocknal_put_conn (conn);
1245         }
1246 }
1247
1248 int
1249 ksocknal_push (ptl_nid_t nid)
1250 {
1251         ksock_peer_t      *peer;
1252         struct list_head  *tmp;
1253         int                index;
1254         int                i;
1255         int                j;
1256         int                rc = -ENOENT;
1257
1258         if (nid != PTL_NID_ANY) {
1259                 peer = ksocknal_get_peer (nid);
1260
1261                 if (peer != NULL) {
1262                         rc = 0;
1263                         ksocknal_push_peer (peer);
1264                         ksocknal_put_peer (peer);
1265                 }
1266                 return (rc);
1267         }
1268
1269         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1270                 for (j = 0; ; j++) {
1271                         read_lock (&ksocknal_data.ksnd_global_lock);
1272
1273                         index = 0;
1274                         peer = NULL;
1275
1276                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1277                                 if (index++ == j) {
1278                                         peer = list_entry(tmp, ksock_peer_t,
1279                                                           ksnp_list);
1280                                         atomic_inc (&peer->ksnp_refcount);
1281                                         break;
1282                                 }
1283                         }
1284
1285                         read_unlock (&ksocknal_data.ksnd_global_lock);
1286
1287                         if (peer != NULL) {
1288                                 rc = 0;
1289                                 ksocknal_push_peer (peer);
1290                                 ksocknal_put_peer (peer);
1291                         }
1292                 }
1293
1294         }
1295
1296         return (rc);
1297 }
1298
1299 int
1300 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1301 {
1302         int rc = -EINVAL;
1303
1304         LASSERT (pcfg != NULL);
1305
1306         switch(pcfg->pcfg_command) {
1307         case NAL_CMD_GET_AUTOCONN: {
1308                 ksock_route_t *route = ksocknal_get_route_by_idx (pcfg->pcfg_count);
1309
1310                 if (route == NULL)
1311                         rc = -ENOENT;
1312                 else {
1313                         rc = 0;
1314                         pcfg->pcfg_nid   = route->ksnr_peer->ksnp_nid;
1315                         pcfg->pcfg_id    = route->ksnr_ipaddr;
1316                         pcfg->pcfg_misc  = route->ksnr_port;
1317                         pcfg->pcfg_count = route->ksnr_conn_count;
1318                         pcfg->pcfg_size  = route->ksnr_buffer_size;
1319                         pcfg->pcfg_wait  = route->ksnr_sharecount;
1320                         pcfg->pcfg_flags = (route->ksnr_irq_affinity ? 2 : 0) |
1321                                            (route->ksnr_eager        ? 4 : 0);
1322                         ksocknal_put_route (route);
1323                 }
1324                 break;
1325         }
1326         case NAL_CMD_ADD_AUTOCONN: {
1327                 rc = ksocknal_add_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1328                                          pcfg->pcfg_misc, pcfg->pcfg_size,
1329                                          (pcfg->pcfg_flags & 0x02) != 0,
1330                                          (pcfg->pcfg_flags & 0x04) != 0,
1331                                          (pcfg->pcfg_flags & 0x08) != 0);
1332                 break;
1333         }
1334         case NAL_CMD_DEL_AUTOCONN: {
1335                 rc = ksocknal_del_route (pcfg->pcfg_nid, pcfg->pcfg_id, 
1336                                          (pcfg->pcfg_flags & 1) != 0,
1337                                          (pcfg->pcfg_flags & 2) != 0);
1338                 break;
1339         }
1340         case NAL_CMD_GET_CONN: {
1341                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1342
1343                 if (conn == NULL)
1344                         rc = -ENOENT;
1345                 else {
1346                         rc = 0;
1347                         pcfg->pcfg_nid   = conn->ksnc_peer->ksnp_nid;
1348                         pcfg->pcfg_id    = conn->ksnc_ipaddr;
1349                         pcfg->pcfg_misc  = conn->ksnc_port;
1350                         pcfg->pcfg_flags = conn->ksnc_type;
1351                         ksocknal_put_conn (conn);
1352                 }
1353                 break;
1354         }
1355         case NAL_CMD_REGISTER_PEER_FD: {
1356                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1357                 int            type = pcfg->pcfg_misc;
1358
1359                 if (sock == NULL)
1360                         break;
1361
1362                 switch (type) {
1363                 case SOCKNAL_CONN_NONE:
1364                 case SOCKNAL_CONN_ANY:
1365                 case SOCKNAL_CONN_CONTROL:
1366                 case SOCKNAL_CONN_BULK_IN:
1367                 case SOCKNAL_CONN_BULK_OUT:
1368                         rc = ksocknal_create_conn(NULL, sock, pcfg->pcfg_flags, type);
1369                 default:
1370                         break;
1371                 }
1372                 if (rc != 0)
1373                         fput (sock->file);
1374                 break;
1375         }
1376         case NAL_CMD_CLOSE_CONNECTION: {
1377                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid, 
1378                                                     pcfg->pcfg_id);
1379                 break;
1380         }
1381         case NAL_CMD_REGISTER_MYNID: {
1382                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1383                 break;
1384         }
1385         case NAL_CMD_PUSH_CONNECTION: {
1386                 rc = ksocknal_push (pcfg->pcfg_nid);
1387                 break;
1388         }
1389         }
1390
1391         return rc;
1392 }
1393
1394 void
1395 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1396 {
1397         ksock_fmb_t *fmb;
1398         int          i;
1399
1400         LASSERT (list_empty(&p->fmp_blocked_conns));
1401         LASSERT (p->fmp_nactive_fmbs == 0);
1402         
1403         while (!list_empty(&p->fmp_idle_fmbs)) {
1404
1405                 fmb = list_entry(p->fmp_idle_fmbs.next,
1406                                  ksock_fmb_t, fmb_list);
1407                 
1408                 for (i = 0; i < fmb->fmb_npages; i++)
1409                         if (fmb->fmb_pages[i] != NULL)
1410                                 __free_page(fmb->fmb_pages[i]);
1411                 
1412                 list_del(&fmb->fmb_list);
1413                 PORTAL_FREE(fmb, sizeof(*fmb));
1414         }
1415 }
1416
1417 void
1418 ksocknal_free_buffers (void)
1419 {
1420         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1421         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1422
1423         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1424
1425         if (ksocknal_data.ksnd_schedulers != NULL)
1426                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1427                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1428
1429         PORTAL_FREE (ksocknal_data.ksnd_peers,
1430                      sizeof (struct list_head) * 
1431                      ksocknal_data.ksnd_peer_hash_size);
1432 }
1433
1434 void
1435 ksocknal_module_fini (void)
1436 {
1437         int   i;
1438
1439         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1440                atomic_read (&portal_kmemory));
1441
1442         switch (ksocknal_data.ksnd_init) {
1443         default:
1444                 LASSERT (0);
1445
1446         case SOCKNAL_INIT_ALL:
1447 #if CONFIG_SYSCTL
1448                 if (ksocknal_data.ksnd_sysctl != NULL)
1449                         unregister_sysctl_table (ksocknal_data.ksnd_sysctl);
1450 #endif
1451                 kportal_nal_unregister(SOCKNAL);
1452                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
1453                 /* fall through */
1454
1455         case SOCKNAL_INIT_PTL:
1456                 PtlNIFini(ksocknal_ni);
1457                 lib_fini(&ksocknal_lib);
1458                 /* fall through */
1459
1460         case SOCKNAL_INIT_DATA:
1461                 /* Module refcount only gets to zero when all peers
1462                  * have been closed so all lists must be empty */
1463                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1464                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1465                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1466                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1467                 }
1468                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1469                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1470                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1471                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1472                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1473
1474                 if (ksocknal_data.ksnd_schedulers != NULL)
1475                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1476                                 ksock_sched_t *kss =
1477                                         &ksocknal_data.ksnd_schedulers[i];
1478
1479                                 LASSERT (list_empty (&kss->kss_tx_conns));
1480                                 LASSERT (list_empty (&kss->kss_rx_conns));
1481                                 LASSERT (kss->kss_nconns == 0);
1482                         }
1483
1484                 /* stop router calling me */
1485                 kpr_shutdown (&ksocknal_data.ksnd_router);
1486
1487                 /* flag threads to terminate; wake and wait for them to die */
1488                 ksocknal_data.ksnd_shuttingdown = 1;
1489                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1490                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1491
1492                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1493                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1494
1495                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1496                         CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
1497                                 atomic_read (&ksocknal_data.ksnd_nthreads));
1498                         set_current_state (TASK_UNINTERRUPTIBLE);
1499                         schedule_timeout (HZ);
1500                 }
1501
1502                 kpr_deregister (&ksocknal_data.ksnd_router);
1503
1504                 ksocknal_free_buffers();
1505                 /* fall through */
1506
1507         case SOCKNAL_INIT_NOTHING:
1508                 break;
1509         }
1510
1511         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1512                atomic_read (&portal_kmemory));
1513
1514         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
1515                atomic_read(&portal_kmemory));
1516 }
1517
1518
1519 void __init
1520 ksocknal_init_incarnation (void)
1521 {
1522         struct timeval tv;
1523
1524         /* The incarnation number is the time this module loaded and it
1525          * identifies this particular instance of the socknal.  Hopefully
1526          * we won't be able to reboot more frequently than 1MHz for the
1527          * forseeable future :) */
1528         
1529         do_gettimeofday(&tv);
1530         
1531         ksocknal_data.ksnd_incarnation = 
1532                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1533 }
1534
1535 int __init
1536 ksocknal_module_init (void)
1537 {
1538         int   pkmem = atomic_read(&portal_kmemory);
1539         int   rc;
1540         int   i;
1541         int   j;
1542
1543         /* packet descriptor must fit in a router descriptor's scratchpad */
1544         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1545         /* the following must be sizeof(int) for proc_dointvec() */
1546         LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
1547         LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
1548         /* check ksnr_connected/connecting field large enough */
1549         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
1550         
1551         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1552
1553         ksocknal_api.forward  = ksocknal_api_forward;
1554         ksocknal_api.shutdown = ksocknal_api_shutdown;
1555         ksocknal_api.yield    = ksocknal_api_yield;
1556         ksocknal_api.validate = NULL;           /* our api validate is a NOOP */
1557         ksocknal_api.lock     = ksocknal_api_lock;
1558         ksocknal_api.unlock   = ksocknal_api_unlock;
1559         ksocknal_api.nal_data = &ksocknal_data;
1560
1561         ksocknal_lib.nal_data = &ksocknal_data;
1562
1563         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1564
1565         ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1566         ksocknal_data.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
1567         ksocknal_data.ksnd_typed_conns = SOCKNAL_TYPED_CONNS;
1568         ksocknal_data.ksnd_min_bulk   = SOCKNAL_MIN_BULK;
1569 #if SOCKNAL_ZC
1570         ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1571 #endif
1572         ksocknal_init_incarnation();
1573         
1574         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1575         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1576                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1577         if (ksocknal_data.ksnd_peers == NULL)
1578                 return (-ENOMEM);
1579
1580         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1581                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1582
1583         rwlock_init(&ksocknal_data.ksnd_global_lock);
1584
1585         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1586         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1587
1588         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1589         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1590         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1591
1592         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1593         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1594         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1595
1596         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1597         INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
1598         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1599         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1600         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1601
1602         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1603         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1604         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1605
1606         /* NB memset above zeros whole of ksocknal_data, including
1607          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1608
1609         /* flag lists/ptrs/locks initialised */
1610         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1611
1612         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1613                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1614         if (ksocknal_data.ksnd_schedulers == NULL) {
1615                 ksocknal_module_fini ();
1616                 return (-ENOMEM);
1617         }
1618
1619         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1620                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1621
1622                 spin_lock_init (&kss->kss_lock);
1623                 INIT_LIST_HEAD (&kss->kss_rx_conns);
1624                 INIT_LIST_HEAD (&kss->kss_tx_conns);
1625 #if SOCKNAL_ZC
1626                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1627 #endif
1628                 init_waitqueue_head (&kss->kss_waitq);
1629         }
1630
1631         rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
1632         if (rc != 0) {
1633                 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
1634                 ksocknal_module_fini ();
1635                 return (rc);
1636         }
1637         PtlNIDebug(ksocknal_ni, ~0);
1638
1639         ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
1640
1641         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1642                 rc = ksocknal_thread_start (ksocknal_scheduler,
1643                                             &ksocknal_data.ksnd_schedulers[i]);
1644                 if (rc != 0) {
1645                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1646                                i, rc);
1647                         ksocknal_module_fini ();
1648                         return (rc);
1649                 }
1650         }
1651
1652         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1653                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1654                 if (rc != 0) {
1655                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1656                         ksocknal_module_fini ();
1657                         return (rc);
1658                 }
1659         }
1660
1661         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1662         if (rc != 0) {
1663                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1664                 ksocknal_module_fini ();
1665                 return (rc);
1666         }
1667
1668         rc = kpr_register(&ksocknal_data.ksnd_router,
1669                           &ksocknal_router_interface);
1670         if (rc != 0) {
1671                 CDEBUG(D_NET, "Can't initialise routing interface "
1672                        "(rc = %d): not routing\n", rc);
1673         } else {
1674                 /* Only allocate forwarding buffers if I'm on a gateway */
1675
1676                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1677                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
1678                         ksock_fmb_t *fmb;
1679                         
1680                         PORTAL_ALLOC(fmb, sizeof(*fmb));
1681                         if (fmb == NULL) {
1682                                 ksocknal_module_fini();
1683                                 return (-ENOMEM);
1684                         }
1685
1686                         if (i < SOCKNAL_SMALL_FWD_NMSGS) {
1687                                 fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES;
1688                                 fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp;
1689                         } else {
1690                                 fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES;
1691                                 fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp;
1692                         }
1693
1694                         for (j = 0; j < fmb->fmb_npages; j++) {
1695                                 fmb->fmb_pages[j] = alloc_page(GFP_KERNEL);
1696
1697                                 if (fmb->fmb_pages[j] == NULL) {
1698                                         ksocknal_module_fini ();
1699                                         return (-ENOMEM);
1700                                 }
1701
1702                                 LASSERT(page_address(fmb->fmb_pages[j]) != NULL);
1703                         }
1704
1705                         list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
1706                 }
1707         }
1708
1709         rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
1710         if (rc != 0) {
1711                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1712                 ksocknal_module_fini ();
1713                 return (rc);
1714         }
1715
1716         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
1717
1718 #ifdef CONFIG_SYSCTL
1719         /* Press on regardless even if registering sysctl doesn't work */
1720         ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
1721 #endif
1722         /* flag everything initialised */
1723         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1724
1725         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
1726                "(Routing %s, initial mem %d)\n",
1727                kpr_routing (&ksocknal_data.ksnd_router) ?
1728                "enabled" : "disabled", pkmem);
1729
1730         return (0);
1731 }
1732
1733 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1734 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1735 MODULE_LICENSE("GPL");
1736
1737 module_init(ksocknal_module_init);
1738 module_exit(ksocknal_module_fini);
1739
1740 EXPORT_SYMBOL (ksocknal_ni);