Whamcloud - gitweb
Landing b_smallfix onto HEAD (20040210_1202)
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 ptl_handle_ni_t         ksocknal_ni;
29 static nal_t            ksocknal_api;
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
32 #else
33 static ksock_nal_data_t ksocknal_data;
34 #endif
35
36 kpr_nal_interface_t ksocknal_router_interface = {
37         kprni_nalid:      SOCKNAL,
38         kprni_arg:        &ksocknal_data,
39         kprni_fwd:        ksocknal_fwd_packet,
40         kprni_notify:     ksocknal_notify,
41 };
42
43 #define SOCKNAL_SYSCTL  200
44
45 #define SOCKNAL_SYSCTL_TIMEOUT     1
46 #define SOCKNAL_SYSCTL_EAGER_ACK   2
47 #define SOCKNAL_SYSCTL_ZERO_COPY   3
48 #define SOCKNAL_SYSCTL_TYPED       4
49 #define SOCKNAL_SYSCTL_MIN_BULK    5
50
51 static ctl_table ksocknal_ctl_table[] = {
52         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
53          &ksocknal_data.ksnd_io_timeout, sizeof (int),
54          0644, NULL, &proc_dointvec},
55         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
56          &ksocknal_data.ksnd_eager_ack, sizeof (int),
57          0644, NULL, &proc_dointvec},
58 #if SOCKNAL_ZC
59         {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy", 
60          &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
61          0644, NULL, &proc_dointvec},
62 #endif
63         {SOCKNAL_SYSCTL_TYPED, "typed", 
64          &ksocknal_data.ksnd_typed_conns, sizeof (int),
65          0644, NULL, &proc_dointvec},
66         {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk", 
67          &ksocknal_data.ksnd_min_bulk, sizeof (int),
68          0644, NULL, &proc_dointvec},
69         { 0 }
70 };
71
72 static ctl_table ksocknal_top_ctl_table[] = {
73         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
74         { 0 }
75 };
76
77 int
78 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
79                        void *ret, size_t ret_len)
80 {
81         ksock_nal_data_t *k;
82         nal_cb_t *nal_cb;
83
84         k = nal->nal_data;
85         nal_cb = k->ksnd_nal_cb;
86
87         lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
88         return PTL_OK;
89 }
90
91 int
92 ksocknal_api_shutdown(nal_t *nal, int ni)
93 {
94         return PTL_OK;
95 }
96
97 void
98 ksocknal_api_yield(nal_t *nal)
99 {
100         our_cond_resched();
101         return;
102 }
103
104 void
105 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
106 {
107         ksock_nal_data_t *k;
108         nal_cb_t *nal_cb;
109
110         k = nal->nal_data;
111         nal_cb = k->ksnd_nal_cb;
112         nal_cb->cb_cli(nal_cb,flags);
113 }
114
115 void
116 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
117 {
118         ksock_nal_data_t *k;
119         nal_cb_t *nal_cb;
120
121         k = nal->nal_data;
122         nal_cb = k->ksnd_nal_cb;
123         nal_cb->cb_sti(nal_cb,flags);
124 }
125
126 nal_t *
127 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
128               ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
129 {
130         CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
131         lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
132         return (&ksocknal_api);
133 }
134
135 /*
136  *  EXTRA functions follow
137  */
138
139 int
140 ksocknal_set_mynid(ptl_nid_t nid)
141 {
142         lib_ni_t *ni = &ksocknal_lib.ni;
143
144         /* FIXME: we have to do this because we call lib_init() at module
145          * insertion time, which is before we have 'mynid' available.  lib_init
146          * sets the NAL's nid, which it uses to tell other nodes where packets
147          * are coming from.  This is not a very graceful solution to this
148          * problem. */
149
150         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
151                nid, ni->nid);
152
153         ni->nid = nid;
154         return (0);
155 }
156
157 void
158 ksocknal_bind_irq (unsigned int irq)
159 {
160 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
161         int              bind;
162         unsigned long    flags;
163         char             cmdline[64];
164         ksock_irqinfo_t *info;
165         char            *argv[] = {"/bin/sh",
166                                    "-c",
167                                    cmdline,
168                                    NULL};
169         char            *envp[] = {"HOME=/",
170                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
171                                    NULL};
172
173         LASSERT (irq < NR_IRQS);
174         if (irq == 0)                           /* software NIC */
175                 return;
176
177         info = &ksocknal_data.ksnd_irqinfo[irq];
178
179         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
180
181         LASSERT (info->ksni_valid);
182         bind = !info->ksni_bound;
183         info->ksni_bound = 1;
184
185         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
186
187         if (!bind)                              /* bound already */
188                 return;
189
190         snprintf (cmdline, sizeof (cmdline),
191                   "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
192
193         printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
194                 irq, info->ksni_sched, cmdline);
195
196         /* FIXME: Find a better method of setting IRQ affinity...
197          */
198
199         call_usermodehelper (argv[0], argv, envp);
200 #endif
201 }
202
203 ksock_route_t *
204 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
205                        int irq_affinity, int eager)
206 {
207         ksock_route_t *route;
208
209         PORTAL_ALLOC (route, sizeof (*route));
210         if (route == NULL)
211                 return (NULL);
212
213         atomic_set (&route->ksnr_refcount, 1);
214         route->ksnr_sharecount = 0;
215         route->ksnr_peer = NULL;
216         route->ksnr_timeout = jiffies;
217         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
218         route->ksnr_ipaddr = ipaddr;
219         route->ksnr_port = port;
220         route->ksnr_buffer_size = buffer_size;
221         route->ksnr_irq_affinity = irq_affinity;
222         route->ksnr_eager = eager;
223         route->ksnr_connecting = 0;
224         route->ksnr_connected = 0;
225         route->ksnr_deleted = 0;
226         route->ksnr_conn_count = 0;
227
228         return (route);
229 }
230
231 void
232 ksocknal_destroy_route (ksock_route_t *route)
233 {
234         LASSERT (route->ksnr_sharecount == 0);
235
236         if (route->ksnr_peer != NULL)
237                 ksocknal_put_peer (route->ksnr_peer);
238
239         PORTAL_FREE (route, sizeof (*route));
240 }
241
242 void
243 ksocknal_put_route (ksock_route_t *route)
244 {
245         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
246                 route, atomic_read (&route->ksnr_refcount));
247
248         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
249         if (!atomic_dec_and_test (&route->ksnr_refcount))
250              return;
251
252         ksocknal_destroy_route (route);
253 }
254
255 ksock_peer_t *
256 ksocknal_create_peer (ptl_nid_t nid)
257 {
258         ksock_peer_t *peer;
259
260         LASSERT (nid != PTL_NID_ANY);
261
262         PORTAL_ALLOC (peer, sizeof (*peer));
263         if (peer == NULL)
264                 return (NULL);
265
266         memset (peer, 0, sizeof (*peer));
267
268         peer->ksnp_nid = nid;
269         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
270         peer->ksnp_closing = 0;
271         INIT_LIST_HEAD (&peer->ksnp_conns);
272         INIT_LIST_HEAD (&peer->ksnp_routes);
273         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
274
275         atomic_inc (&ksocknal_data.ksnd_npeers);
276         return (peer);
277 }
278
279 void
280 ksocknal_destroy_peer (ksock_peer_t *peer)
281 {
282         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
283
284         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
285         LASSERT (list_empty (&peer->ksnp_conns));
286         LASSERT (list_empty (&peer->ksnp_routes));
287         LASSERT (list_empty (&peer->ksnp_tx_queue));
288
289         PORTAL_FREE (peer, sizeof (*peer));
290
291         /* NB a peer's connections and autoconnect routes keep a reference
292          * on their peer until they are destroyed, so we can be assured
293          * that _all_ state to do with this peer has been cleaned up when
294          * its refcount drops to zero. */
295         atomic_dec (&ksocknal_data.ksnd_npeers);
296 }
297
298 void
299 ksocknal_put_peer (ksock_peer_t *peer)
300 {
301         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
302                 peer, peer->ksnp_nid,
303                 atomic_read (&peer->ksnp_refcount));
304
305         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
306         if (!atomic_dec_and_test (&peer->ksnp_refcount))
307                 return;
308
309         ksocknal_destroy_peer (peer);
310 }
311
312 ksock_peer_t *
313 ksocknal_find_peer_locked (ptl_nid_t nid)
314 {
315         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
316         struct list_head *tmp;
317         ksock_peer_t     *peer;
318
319         list_for_each (tmp, peer_list) {
320
321                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
322
323                 LASSERT (!peer->ksnp_closing);
324                 LASSERT (!(list_empty (&peer->ksnp_routes) &&
325                            list_empty (&peer->ksnp_conns)));
326
327                 if (peer->ksnp_nid != nid)
328                         continue;
329
330                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
331                        peer, nid, atomic_read (&peer->ksnp_refcount));
332                 return (peer);
333         }
334         return (NULL);
335 }
336
337 ksock_peer_t *
338 ksocknal_get_peer (ptl_nid_t nid)
339 {
340         ksock_peer_t     *peer;
341
342         read_lock (&ksocknal_data.ksnd_global_lock);
343         peer = ksocknal_find_peer_locked (nid);
344         if (peer != NULL)                       /* +1 ref for caller? */
345                 atomic_inc (&peer->ksnp_refcount);
346         read_unlock (&ksocknal_data.ksnd_global_lock);
347
348         return (peer);
349 }
350
351 void
352 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
353 {
354         LASSERT (!peer->ksnp_closing);
355         peer->ksnp_closing = 1;
356         list_del (&peer->ksnp_list);
357         /* lose peerlist's ref */
358         ksocknal_put_peer (peer);
359 }
360
361 ksock_route_t *
362 ksocknal_get_route_by_idx (int index)
363 {
364         ksock_peer_t      *peer;
365         struct list_head  *ptmp;
366         ksock_route_t     *route;
367         struct list_head  *rtmp;
368         int                i;
369
370         read_lock (&ksocknal_data.ksnd_global_lock);
371
372         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
373                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
374                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
375
376                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
377                                    list_empty (&peer->ksnp_conns)));
378
379                         list_for_each (rtmp, &peer->ksnp_routes) {
380                                 if (index-- > 0)
381                                         continue;
382
383                                 route = list_entry (rtmp, ksock_route_t, ksnr_list);
384                                 atomic_inc (&route->ksnr_refcount);
385                                 read_unlock (&ksocknal_data.ksnd_global_lock);
386                                 return (route);
387                         }
388                 }
389         }
390
391         read_unlock (&ksocknal_data.ksnd_global_lock);
392         return (NULL);
393 }
394
395 int
396 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
397                     int bind_irq, int share, int eager)
398 {
399         unsigned long      flags;
400         ksock_peer_t      *peer;
401         ksock_peer_t      *peer2;
402         ksock_route_t     *route;
403         struct list_head  *rtmp;
404         ksock_route_t     *route2;
405         
406         if (nid == PTL_NID_ANY)
407                 return (-EINVAL);
408
409         /* Have a brand new peer ready... */
410         peer = ksocknal_create_peer (nid);
411         if (peer == NULL)
412                 return (-ENOMEM);
413
414         route = ksocknal_create_route (ipaddr, port, bufnob, 
415                                        bind_irq, eager);
416         if (route == NULL) {
417                 ksocknal_put_peer (peer);
418                 return (-ENOMEM);
419         }
420
421         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
422
423         peer2 = ksocknal_find_peer_locked (nid);
424         if (peer2 != NULL) {
425                 ksocknal_put_peer (peer);
426                 peer = peer2;
427         } else {
428                 /* peer table takes existing ref on peer */
429                 list_add (&peer->ksnp_list,
430                           ksocknal_nid2peerlist (nid));
431         }
432
433         route2 = NULL;
434         if (share) {
435                 /* check for existing route to this NID via this ipaddr */
436                 list_for_each (rtmp, &peer->ksnp_routes) {
437                         route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
438                         
439                         if (route2->ksnr_ipaddr == ipaddr)
440                                 break;
441
442                         route2 = NULL;
443                 }
444         }
445
446         if (route2 != NULL) {
447                 ksocknal_put_route (route);
448                 route = route2;
449         } else {
450                 /* route takes a ref on peer */
451                 route->ksnr_peer = peer;
452                 atomic_inc (&peer->ksnp_refcount);
453                 /* peer's route list takes existing ref on route */
454                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
455         }
456         
457         route->ksnr_sharecount++;
458
459         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
460
461         return (0);
462 }
463
464 void
465 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
466 {
467         ksock_peer_t     *peer = route->ksnr_peer;
468         ksock_conn_t     *conn;
469         struct list_head *ctmp;
470         struct list_head *cnxt;
471
472         if (!share)
473                 route->ksnr_sharecount = 0;
474         else {
475                 route->ksnr_sharecount--;
476                 if (route->ksnr_sharecount != 0)
477                         return;
478         }
479
480         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
481                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
482
483                 if (conn->ksnc_route != route)
484                         continue;
485                 
486                 if (!keep_conn) {
487                         ksocknal_close_conn_locked (conn, 0);
488                         continue;
489                 }
490                 
491                 /* keeping the conn; just dissociate it and route... */
492                 conn->ksnc_route = NULL;
493                 ksocknal_put_route (route); /* drop conn's ref on route */
494         }
495         
496         route->ksnr_deleted = 1;
497         list_del (&route->ksnr_list);
498         ksocknal_put_route (route);             /* drop peer's ref */
499
500         if (list_empty (&peer->ksnp_routes) &&
501             list_empty (&peer->ksnp_conns)) {
502                 /* I've just removed the last autoconnect route of a peer
503                  * with no active connections */
504                 ksocknal_unlink_peer_locked (peer);
505         }
506 }
507
508 int
509 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
510 {
511         unsigned long      flags;
512         struct list_head  *ptmp;
513         struct list_head  *pnxt;
514         ksock_peer_t      *peer;
515         struct list_head  *rtmp;
516         struct list_head  *rnxt;
517         ksock_route_t     *route;
518         int                lo;
519         int                hi;
520         int                i;
521         int                rc = -ENOENT;
522
523         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
524
525         if (nid != PTL_NID_ANY)
526                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
527         else {
528                 lo = 0;
529                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
530         }
531
532         for (i = lo; i <= hi; i++) {
533                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
534                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
535
536                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
537                                 continue;
538
539                         list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
540                                 route = list_entry (rtmp, ksock_route_t,
541                                                     ksnr_list);
542
543                                 if (!(ipaddr == 0 ||
544                                       route->ksnr_ipaddr == ipaddr))
545                                         continue;
546
547                                 ksocknal_del_route_locked (route, share, keep_conn);
548                                 rc = 0;         /* matched something */
549                                 if (share)
550                                         goto out;
551                         }
552                 }
553         }
554  out:
555         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
556
557         return (rc);
558 }
559
560 ksock_conn_t *
561 ksocknal_get_conn_by_idx (int index)
562 {
563         ksock_peer_t      *peer;
564         struct list_head  *ptmp;
565         ksock_conn_t      *conn;
566         struct list_head  *ctmp;
567         int                i;
568
569         read_lock (&ksocknal_data.ksnd_global_lock);
570
571         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
572                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
573                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
574
575                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
576                                    list_empty (&peer->ksnp_conns)));
577
578                         list_for_each (ctmp, &peer->ksnp_conns) {
579                                 if (index-- > 0)
580                                         continue;
581
582                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
583                                 atomic_inc (&conn->ksnc_refcount);
584                                 read_unlock (&ksocknal_data.ksnd_global_lock);
585                                 return (conn);
586                         }
587                 }
588         }
589
590         read_unlock (&ksocknal_data.ksnd_global_lock);
591         return (NULL);
592 }
593
594 void
595 ksocknal_get_peer_addr (ksock_conn_t *conn)
596 {
597         struct sockaddr_in sin;
598         int                len = sizeof (sin);
599         int                rc;
600         
601         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
602                                             (struct sockaddr *)&sin, &len, 2);
603         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
604         LASSERT (!conn->ksnc_closing);
605         LASSERT (len <= sizeof (sin));
606
607         if (rc != 0) {
608                 CERROR ("Error %d getting sock peer IP\n", rc);
609                 return;
610         }
611
612         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
613         conn->ksnc_port   = ntohs (sin.sin_port);
614 }
615
616 unsigned int
617 ksocknal_conn_irq (ksock_conn_t *conn)
618 {
619         int                irq = 0;
620         struct dst_entry  *dst;
621
622         dst = sk_dst_get (conn->ksnc_sock->sk);
623         if (dst != NULL) {
624                 if (dst->dev != NULL) {
625                         irq = dst->dev->irq;
626                         if (irq >= NR_IRQS) {
627                                 CERROR ("Unexpected IRQ %x\n", irq);
628                                 irq = 0;
629                         }
630                 }
631                 dst_release (dst);
632         }
633         
634         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
635         LASSERT (!conn->ksnc_closing);
636         return (irq);
637 }
638
639 ksock_sched_t *
640 ksocknal_choose_scheduler_locked (unsigned int irq)
641 {
642         ksock_sched_t    *sched;
643         ksock_irqinfo_t  *info;
644         int               i;
645
646         LASSERT (irq < NR_IRQS);
647         info = &ksocknal_data.ksnd_irqinfo[irq];
648
649         if (irq != 0 &&                         /* hardware NIC */
650             info->ksni_valid) {                 /* already set up */
651                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
652         }
653
654         /* software NIC (irq == 0) || not associated with a scheduler yet.
655          * Choose the CPU with the fewest connections... */
656         sched = &ksocknal_data.ksnd_schedulers[0];
657         for (i = 1; i < SOCKNAL_N_SCHED; i++)
658                 if (sched->kss_nconns >
659                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
660                         sched = &ksocknal_data.ksnd_schedulers[i];
661
662         if (irq != 0) {                         /* Hardware NIC */
663                 info->ksni_valid = 1;
664                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
665
666                 /* no overflow... */
667                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
668         }
669
670         return (sched);
671 }
672
673 int
674 ksocknal_create_conn (ksock_route_t *route, struct socket *sock,
675                       int bind_irq, int type)
676 {
677         ptl_nid_t          nid;
678         __u64              incarnation;
679         unsigned long      flags;
680         ksock_conn_t      *conn;
681         ksock_peer_t      *peer;
682         ksock_peer_t      *peer2;
683         ksock_sched_t     *sched;
684         unsigned int       irq;
685         ksock_tx_t        *tx;
686         int                rc;
687
688         /* NB, sock has an associated file since (a) this connection might
689          * have been created in userland and (b) we need to refcount the
690          * socket so that we don't close it while I/O is being done on
691          * it, and sock->file has that pre-cooked... */
692         LASSERT (sock->file != NULL);
693         LASSERT (file_count(sock->file) > 0);
694
695         rc = ksocknal_setup_sock (sock);
696         if (rc != 0)
697                 return (rc);
698
699         if (route == NULL) {
700                 /* acceptor or explicit connect */
701                 nid = PTL_NID_ANY;
702         } else {
703                 LASSERT (type != SOCKNAL_CONN_NONE);
704                 /* autoconnect: expect this nid on exchange */
705                 nid = route->ksnr_peer->ksnp_nid;
706         }
707
708         rc = ksocknal_hello (sock, &nid, &type, &incarnation);
709         if (rc != 0)
710                 return (rc);
711         
712         peer = NULL;
713         if (route == NULL) {                    /* not autoconnect */
714                 /* Assume this socket connects to a brand new peer */
715                 peer = ksocknal_create_peer (nid);
716                 if (peer == NULL)
717                         return (-ENOMEM);
718         }
719
720         PORTAL_ALLOC(conn, sizeof(*conn));
721         if (conn == NULL) {
722                 if (peer != NULL)
723                         ksocknal_put_peer (peer);
724                 return (-ENOMEM);
725         }
726
727         memset (conn, 0, sizeof (*conn));
728         conn->ksnc_peer = NULL;
729         conn->ksnc_route = NULL;
730         conn->ksnc_sock = sock;
731         conn->ksnc_type = type;
732         conn->ksnc_incarnation = incarnation;
733         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
734         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
735         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
736
737         conn->ksnc_rx_ready = 0;
738         conn->ksnc_rx_scheduled = 0;
739         ksocknal_new_packet (conn, 0);
740
741         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
742         conn->ksnc_tx_ready = 0;
743         conn->ksnc_tx_scheduled = 0;
744         atomic_set (&conn->ksnc_tx_nob, 0);
745
746         ksocknal_get_peer_addr (conn);
747
748         irq = ksocknal_conn_irq (conn);
749
750         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
751
752         if (route != NULL) {
753                 /* Autoconnected! */
754                 LASSERT ((route->ksnr_connected & (1 << type)) == 0);
755                 LASSERT ((route->ksnr_connecting & (1 << type)) != 0);
756
757                 if (route->ksnr_deleted) {
758                         /* This conn was autoconnected, but the autoconnect
759                          * route got deleted while it was being
760                          * established! */
761                         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
762                                                  flags);
763                         PORTAL_FREE (conn, sizeof (*conn));
764                         return (-ESTALE);
765                 }
766
767
768                 /* associate conn/route */
769                 conn->ksnc_route = route;
770                 atomic_inc (&route->ksnr_refcount);
771
772                 route->ksnr_connecting &= ~(1 << type);
773                 route->ksnr_connected  |= (1 << type);
774                 route->ksnr_conn_count++;
775                 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
776
777                 peer = route->ksnr_peer;
778         } else {
779                 /* Not an autoconnected connection; see if there is an
780                  * existing peer for this NID */
781                 peer2 = ksocknal_find_peer_locked (nid);
782                 if (peer2 != NULL) {
783                         ksocknal_put_peer (peer);
784                         peer = peer2;
785                 } else {
786                         list_add (&peer->ksnp_list,
787                                   ksocknal_nid2peerlist (nid));
788                         /* peer list takes over existing ref */
789                 }
790         }
791
792         LASSERT (!peer->ksnp_closing);
793
794         conn->ksnc_peer = peer;
795         atomic_inc (&peer->ksnp_refcount);
796         peer->ksnp_last_alive = jiffies;
797         peer->ksnp_error = 0;
798
799         /* Set the deadline for the outgoing HELLO to drain */
800         conn->ksnc_tx_deadline = jiffies +
801                                  ksocknal_data.ksnd_io_timeout * HZ;
802
803         list_add (&conn->ksnc_list, &peer->ksnp_conns);
804         atomic_inc (&conn->ksnc_refcount);
805
806         sched = ksocknal_choose_scheduler_locked (irq);
807         sched->kss_nconns++;
808         conn->ksnc_scheduler = sched;
809
810         /* NB my callbacks block while I hold ksnd_global_lock */
811         sock->sk->sk_user_data = conn;
812         sock->sk->sk_data_ready = ksocknal_data_ready;
813         sock->sk->sk_write_space = ksocknal_write_space;
814
815         /* Take all the packets blocking for a connection.
816          * NB, it might be nicer to share these blocked packets among any
817          * other connections that are becoming established, however that
818          * confuses the normal packet launching operation, which selects a
819          * connection and queues the packet on it without needing an
820          * exclusive lock on ksnd_global_lock. */
821         while (!list_empty (&peer->ksnp_tx_queue)) {
822                 tx = list_entry (peer->ksnp_tx_queue.next,
823                                  ksock_tx_t, tx_list);
824
825                 list_del (&tx->tx_list);
826                 ksocknal_queue_tx_locked (tx, conn);
827         }
828
829         rc = ksocknal_close_stale_conns_locked (peer, incarnation);
830
831         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
832
833         if (rc != 0)
834                 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
835                         rc, conn->ksnc_peer->ksnp_nid,
836                         HIPQUAD(conn->ksnc_ipaddr));
837
838         if (bind_irq)                           /* irq binding required */
839                 ksocknal_bind_irq (irq);
840
841         /* Call the callbacks right now to get things going. */
842         ksocknal_data_ready (sock->sk, 0);
843         ksocknal_write_space (sock->sk);
844
845         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64" ip %d.%d.%d.%d\n",
846                conn, conn->ksnc_peer->ksnp_nid, HIPQUAD(conn->ksnc_ipaddr));
847
848         ksocknal_put_conn (conn);
849         return (0);
850 }
851
852 void
853 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
854 {
855         /* This just does the immmediate housekeeping, and queues the
856          * connection for the reaper to terminate.
857          * Caller holds ksnd_global_lock exclusively in irq context */
858         ksock_peer_t   *peer = conn->ksnc_peer;
859         ksock_route_t  *route;
860
861         LASSERT (peer->ksnp_error == 0);
862         LASSERT (!conn->ksnc_closing);
863         conn->ksnc_closing = 1;
864         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
865         
866         route = conn->ksnc_route;
867         if (route != NULL) {
868                 /* dissociate conn from route... */
869                 LASSERT (!route->ksnr_deleted);
870                 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
871                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
872
873                 route->ksnr_connected &= ~(1 << conn->ksnc_type);
874                 conn->ksnc_route = NULL;
875
876                 list_del (&route->ksnr_list);   /* make route least favourite */
877                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
878                 
879                 ksocknal_put_route (route);     /* drop conn's ref on route */
880         }
881
882         /* ksnd_deathrow_conns takes over peer's ref */
883         list_del (&conn->ksnc_list);
884
885         if (list_empty (&peer->ksnp_conns)) {
886                 /* No more connections to this peer */
887
888                 peer->ksnp_error = error;       /* stash last conn close reason */
889
890                 if (list_empty (&peer->ksnp_routes)) {
891                         /* I've just closed last conn belonging to a
892                          * non-autoconnecting peer */
893                         ksocknal_unlink_peer_locked (peer);
894                 }
895         }
896
897         spin_lock (&ksocknal_data.ksnd_reaper_lock);
898
899         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
900         wake_up (&ksocknal_data.ksnd_reaper_waitq);
901                 
902         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
903 }
904
905 void
906 ksocknal_terminate_conn (ksock_conn_t *conn)
907 {
908         /* This gets called by the reaper (guaranteed thread context) to
909          * disengage the socket from its callbacks and close it.
910          * ksnc_refcount will eventually hit zero, and then the reaper will
911          * destroy it. */
912         unsigned long   flags;
913         ksock_peer_t   *peer = conn->ksnc_peer;
914         ksock_sched_t  *sched = conn->ksnc_scheduler;
915         struct timeval  now;
916         time_t          then = 0;
917         int             notify = 0;
918
919         LASSERT(conn->ksnc_closing);
920
921         /* wake up the scheduler to "send" all remaining packets to /dev/null */
922         spin_lock_irqsave(&sched->kss_lock, flags);
923
924         if (!conn->ksnc_tx_scheduled &&
925             !list_empty(&conn->ksnc_tx_queue)){
926                 list_add_tail (&conn->ksnc_tx_list,
927                                &sched->kss_tx_conns);
928                 /* a closing conn is always ready to tx */
929                 conn->ksnc_tx_ready = 1;
930                 conn->ksnc_tx_scheduled = 1;
931                 /* extra ref for scheduler */
932                 atomic_inc (&conn->ksnc_refcount);
933
934                 wake_up (&sched->kss_waitq);
935         }
936
937         spin_unlock_irqrestore (&sched->kss_lock, flags);
938
939         /* serialise with callbacks */
940         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
941
942         /* Remove conn's network callbacks.
943          * NB I _have_ to restore the callback, rather than storing a noop,
944          * since the socket could survive past this module being unloaded!! */
945         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
946         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
947
948         /* A callback could be in progress already; they hold a read lock
949          * on ksnd_global_lock (to serialise with me) and NOOP if
950          * sk_user_data is NULL. */
951         conn->ksnc_sock->sk->sk_user_data = NULL;
952
953         /* OK, so this conn may not be completely disengaged from its
954          * scheduler yet, but it _has_ committed to terminate... */
955         conn->ksnc_scheduler->kss_nconns--;
956
957         if (peer->ksnp_error != 0) {
958                 /* peer's last conn closed in error */
959                 LASSERT (list_empty (&peer->ksnp_conns));
960                 
961                 /* convert peer's last-known-alive timestamp from jiffies */
962                 do_gettimeofday (&now);
963                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
964                 notify = 1;
965         }
966         
967         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
968
969         /* The socket is closed on the final put; either here, or in
970          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
971          * when the connection was established, this will close the socket
972          * immediately, aborting anything buffered in it. Any hung
973          * zero-copy transmits will therefore complete in finite time. */
974         ksocknal_putconnsock (conn);
975
976         if (notify)
977                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
978                             0, then);
979 }
980
981 void
982 ksocknal_destroy_conn (ksock_conn_t *conn)
983 {
984         /* Final coup-de-grace of the reaper */
985         CDEBUG (D_NET, "connection %p\n", conn);
986
987         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
988         LASSERT (conn->ksnc_route == NULL);
989         LASSERT (!conn->ksnc_tx_scheduled);
990         LASSERT (!conn->ksnc_rx_scheduled);
991         LASSERT (list_empty(&conn->ksnc_tx_queue));
992
993         /* complete current receive if any */
994         switch (conn->ksnc_rx_state) {
995         case SOCKNAL_RX_BODY:
996 #if 0
997                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
998 #else
999                 CERROR ("Refusing to complete a partial receive from "
1000                         LPX64", ip %d.%d.%d.%d:%d\n", conn->ksnc_peer->ksnp_nid,
1001                         HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1002                 CERROR ("This may hang communications and "
1003                         "prevent modules from unloading\n");
1004 #endif
1005                 break;
1006         case SOCKNAL_RX_BODY_FWD:
1007                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1008                 break;
1009         case SOCKNAL_RX_HEADER:
1010         case SOCKNAL_RX_SLOP:
1011                 break;
1012         default:
1013                 LBUG ();
1014                 break;
1015         }
1016
1017         ksocknal_put_peer (conn->ksnc_peer);
1018
1019         PORTAL_FREE (conn, sizeof (*conn));
1020         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
1021 }
1022
1023 void
1024 ksocknal_put_conn (ksock_conn_t *conn)
1025 {
1026         unsigned long flags;
1027
1028         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1029                 conn, conn->ksnc_peer->ksnp_nid,
1030                 atomic_read (&conn->ksnc_refcount));
1031
1032         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1033         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1034                 return;
1035
1036         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1037
1038         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1039         wake_up (&ksocknal_data.ksnd_reaper_waitq);
1040
1041         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1042 }
1043
1044 int
1045 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1046 {
1047         ksock_conn_t       *conn;
1048         struct list_head   *ctmp;
1049         struct list_head   *cnxt;
1050         int                 count = 0;
1051
1052         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1053                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1054
1055                 if (ipaddr == 0 ||
1056                     conn->ksnc_ipaddr == ipaddr) {
1057                         count++;
1058                         ksocknal_close_conn_locked (conn, why);
1059                 }
1060         }
1061
1062         return (count);
1063 }
1064
1065 int
1066 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1067 {
1068         ksock_conn_t       *conn;
1069         struct list_head   *ctmp;
1070         struct list_head   *cnxt;
1071         int                 count = 0;
1072
1073         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1074                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1075
1076                 if (conn->ksnc_incarnation == incarnation)
1077                         continue;
1078                 
1079                 count++;
1080                 ksocknal_close_conn_locked (conn, -ESTALE);
1081         }
1082
1083         return (count);
1084 }
1085
1086 int
1087 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why) 
1088 {
1089         ksock_peer_t     *peer = conn->ksnc_peer;
1090         __u32             ipaddr = conn->ksnc_ipaddr;
1091         unsigned long     flags;
1092         int               count;
1093
1094         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1095
1096         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1097         
1098         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1099
1100         return (count);
1101 }
1102
1103 int
1104 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1105 {
1106         unsigned long       flags;
1107         ksock_peer_t       *peer;
1108         struct list_head   *ptmp;
1109         struct list_head   *pnxt;
1110         int                 lo;
1111         int                 hi;
1112         int                 i;
1113         int                 count = 0;
1114
1115         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1116
1117         if (nid != PTL_NID_ANY)
1118                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1119         else {
1120                 lo = 0;
1121                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1122         }
1123
1124         for (i = lo; i <= hi; i++) {
1125                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1126
1127                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1128
1129                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1130                                 continue;
1131
1132                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1133                 }
1134         }
1135
1136         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1137
1138         /* wildcards always succeed */
1139         if (nid == PTL_NID_ANY || ipaddr == 0)
1140                 return (0);
1141         
1142         return (count == 0 ? -ENOENT : 0);
1143 }
1144
1145 void
1146 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1147 {
1148         /* The router is telling me she's been notified of a change in
1149          * gateway state.... */
1150
1151         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1152
1153         if (!alive) {
1154                 /* If the gateway crashed, close all open connections... */
1155                 ksocknal_close_matching_conns (gw_nid, 0);
1156                 return;
1157         }
1158         
1159         /* ...otherwise do nothing.  We can only establish new connections
1160          * if we have autroutes, and these connect on demand. */
1161 }
1162
1163 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1164 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1165 {
1166         return &(sk->tp_pinfo.af_tcp);
1167 }
1168 #else
1169 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1170 {
1171         struct tcp_sock *s = (struct tcp_sock *)sk;
1172         return &s->tcp;
1173 }
1174 #endif
1175
1176 void
1177 ksocknal_push_conn (ksock_conn_t *conn)
1178 {
1179         struct sock    *sk;
1180         struct tcp_opt *tp;
1181         int             nonagle;
1182         int             val = 1;
1183         int             rc;
1184         mm_segment_t    oldmm;
1185
1186         rc = ksocknal_getconnsock (conn);
1187         if (rc != 0)                            /* being shut down */
1188                 return;
1189         
1190         sk = conn->ksnc_sock->sk;
1191         tp = sock2tcp_opt(sk);
1192         
1193         lock_sock (sk);
1194         nonagle = tp->nonagle;
1195         tp->nonagle = 1;
1196         release_sock (sk);
1197
1198         oldmm = get_fs ();
1199         set_fs (KERNEL_DS);
1200
1201         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1202                                       (char *)&val, sizeof (val));
1203         LASSERT (rc == 0);
1204
1205         set_fs (oldmm);
1206
1207         lock_sock (sk);
1208         tp->nonagle = nonagle;
1209         release_sock (sk);
1210
1211         ksocknal_putconnsock (conn);
1212 }
1213
1214 void
1215 ksocknal_push_peer (ksock_peer_t *peer)
1216 {
1217         int               index;
1218         int               i;
1219         struct list_head *tmp;
1220         ksock_conn_t     *conn;
1221
1222         for (index = 0; ; index++) {
1223                 read_lock (&ksocknal_data.ksnd_global_lock);
1224
1225                 i = 0;
1226                 conn = NULL;
1227
1228                 list_for_each (tmp, &peer->ksnp_conns) {
1229                         if (i++ == index) {
1230                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1231                                 atomic_inc (&conn->ksnc_refcount);
1232                                 break;
1233                         }
1234                 }
1235
1236                 read_unlock (&ksocknal_data.ksnd_global_lock);
1237
1238                 if (conn == NULL)
1239                         break;
1240
1241                 ksocknal_push_conn (conn);
1242                 ksocknal_put_conn (conn);
1243         }
1244 }
1245
1246 int
1247 ksocknal_push (ptl_nid_t nid)
1248 {
1249         ksock_peer_t      *peer;
1250         struct list_head  *tmp;
1251         int                index;
1252         int                i;
1253         int                j;
1254         int                rc = -ENOENT;
1255
1256         if (nid != PTL_NID_ANY) {
1257                 peer = ksocknal_get_peer (nid);
1258
1259                 if (peer != NULL) {
1260                         rc = 0;
1261                         ksocknal_push_peer (peer);
1262                         ksocknal_put_peer (peer);
1263                 }
1264                 return (rc);
1265         }
1266
1267         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1268                 for (j = 0; ; j++) {
1269                         read_lock (&ksocknal_data.ksnd_global_lock);
1270
1271                         index = 0;
1272                         peer = NULL;
1273
1274                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1275                                 if (index++ == j) {
1276                                         peer = list_entry(tmp, ksock_peer_t,
1277                                                           ksnp_list);
1278                                         atomic_inc (&peer->ksnp_refcount);
1279                                         break;
1280                                 }
1281                         }
1282
1283                         read_unlock (&ksocknal_data.ksnd_global_lock);
1284
1285                         if (peer != NULL) {
1286                                 rc = 0;
1287                                 ksocknal_push_peer (peer);
1288                                 ksocknal_put_peer (peer);
1289                         }
1290                 }
1291
1292         }
1293
1294         return (rc);
1295 }
1296
1297 int
1298 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1299 {
1300         int rc = -EINVAL;
1301
1302         LASSERT (pcfg != NULL);
1303
1304         switch(pcfg->pcfg_command) {
1305         case NAL_CMD_GET_AUTOCONN: {
1306                 ksock_route_t *route = ksocknal_get_route_by_idx (pcfg->pcfg_count);
1307
1308                 if (route == NULL)
1309                         rc = -ENOENT;
1310                 else {
1311                         rc = 0;
1312                         pcfg->pcfg_nid   = route->ksnr_peer->ksnp_nid;
1313                         pcfg->pcfg_id    = route->ksnr_ipaddr;
1314                         pcfg->pcfg_misc  = route->ksnr_port;
1315                         pcfg->pcfg_count = route->ksnr_conn_count;
1316                         pcfg->pcfg_size  = route->ksnr_buffer_size;
1317                         pcfg->pcfg_wait  = route->ksnr_sharecount;
1318                         pcfg->pcfg_flags = (route->ksnr_irq_affinity ? 2 : 0) |
1319                                            (route->ksnr_eager        ? 4 : 0);
1320                         ksocknal_put_route (route);
1321                 }
1322                 break;
1323         }
1324         case NAL_CMD_ADD_AUTOCONN: {
1325                 rc = ksocknal_add_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1326                                          pcfg->pcfg_misc, pcfg->pcfg_size,
1327                                          (pcfg->pcfg_flags & 0x02) != 0,
1328                                          (pcfg->pcfg_flags & 0x04) != 0,
1329                                          (pcfg->pcfg_flags & 0x08) != 0);
1330                 break;
1331         }
1332         case NAL_CMD_DEL_AUTOCONN: {
1333                 rc = ksocknal_del_route (pcfg->pcfg_nid, pcfg->pcfg_id, 
1334                                          (pcfg->pcfg_flags & 1) != 0,
1335                                          (pcfg->pcfg_flags & 2) != 0);
1336                 break;
1337         }
1338         case NAL_CMD_GET_CONN: {
1339                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1340
1341                 if (conn == NULL)
1342                         rc = -ENOENT;
1343                 else {
1344                         rc = 0;
1345                         pcfg->pcfg_nid   = conn->ksnc_peer->ksnp_nid;
1346                         pcfg->pcfg_id    = conn->ksnc_ipaddr;
1347                         pcfg->pcfg_misc  = conn->ksnc_port;
1348                         pcfg->pcfg_flags = conn->ksnc_type;
1349                         ksocknal_put_conn (conn);
1350                 }
1351                 break;
1352         }
1353         case NAL_CMD_REGISTER_PEER_FD: {
1354                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1355                 int            type = pcfg->pcfg_misc;
1356
1357                 if (sock == NULL)
1358                         break;
1359
1360                 switch (type) {
1361                 case SOCKNAL_CONN_NONE:
1362                 case SOCKNAL_CONN_ANY:
1363                 case SOCKNAL_CONN_CONTROL:
1364                 case SOCKNAL_CONN_BULK_IN:
1365                 case SOCKNAL_CONN_BULK_OUT:
1366                         rc = ksocknal_create_conn(NULL, sock, pcfg->pcfg_flags, type);
1367                 default:
1368                         break;
1369                 }
1370                 if (rc != 0)
1371                         fput (sock->file);
1372                 break;
1373         }
1374         case NAL_CMD_CLOSE_CONNECTION: {
1375                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid, 
1376                                                     pcfg->pcfg_id);
1377                 break;
1378         }
1379         case NAL_CMD_REGISTER_MYNID: {
1380                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1381                 break;
1382         }
1383         case NAL_CMD_PUSH_CONNECTION: {
1384                 rc = ksocknal_push (pcfg->pcfg_nid);
1385                 break;
1386         }
1387         }
1388
1389         return rc;
1390 }
1391
1392 void
1393 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1394 {
1395         ksock_fmb_t *fmb;
1396         int          i;
1397
1398         LASSERT (list_empty(&p->fmp_blocked_conns));
1399         LASSERT (p->fmp_nactive_fmbs == 0);
1400         
1401         while (!list_empty(&p->fmp_idle_fmbs)) {
1402
1403                 fmb = list_entry(p->fmp_idle_fmbs.next,
1404                                  ksock_fmb_t, fmb_list);
1405                 
1406                 for (i = 0; i < fmb->fmb_npages; i++)
1407                         if (fmb->fmb_pages[i] != NULL)
1408                                 __free_page(fmb->fmb_pages[i]);
1409                 
1410                 list_del(&fmb->fmb_list);
1411                 PORTAL_FREE(fmb, sizeof(*fmb));
1412         }
1413 }
1414
1415 void
1416 ksocknal_free_buffers (void)
1417 {
1418         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1419         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1420
1421         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1422
1423         if (ksocknal_data.ksnd_schedulers != NULL)
1424                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1425                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1426
1427         PORTAL_FREE (ksocknal_data.ksnd_peers,
1428                      sizeof (struct list_head) * 
1429                      ksocknal_data.ksnd_peer_hash_size);
1430 }
1431
1432 void
1433 ksocknal_module_fini (void)
1434 {
1435         int   i;
1436
1437         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1438                atomic_read (&portal_kmemory));
1439
1440         switch (ksocknal_data.ksnd_init) {
1441         default:
1442                 LASSERT (0);
1443
1444         case SOCKNAL_INIT_ALL:
1445 #if CONFIG_SYSCTL
1446                 if (ksocknal_data.ksnd_sysctl != NULL)
1447                         unregister_sysctl_table (ksocknal_data.ksnd_sysctl);
1448 #endif
1449                 kportal_nal_unregister(SOCKNAL);
1450                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
1451                 /* fall through */
1452
1453         case SOCKNAL_INIT_PTL:
1454                 /* No more calls to ksocknal_cmd() to create new
1455                  * autoroutes/connections since we're being unloaded. */
1456                 PtlNIFini(ksocknal_ni);
1457
1458                 /* Delete all autoroute entries */
1459                 ksocknal_del_route(PTL_NID_ANY, 0, 0, 0);
1460
1461                 /* Delete all connections */
1462                 ksocknal_close_matching_conns (PTL_NID_ANY, 0);
1463                 
1464                 /* Wait for all peer state to clean up */
1465                 i = 2;
1466                 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
1467                         i++;
1468                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1469                                "waiting for %d peers to disconnect\n",
1470                                atomic_read (&ksocknal_data.ksnd_npeers));
1471                         set_current_state (TASK_UNINTERRUPTIBLE);
1472                         schedule_timeout (HZ);
1473                 }
1474
1475                 /* Tell lib we've stopped calling into her. */
1476                 lib_fini(&ksocknal_lib);
1477                 /* fall through */
1478
1479         case SOCKNAL_INIT_DATA:
1480                 /* Module refcount only gets to zero when all peers
1481                  * have been closed so all lists must be empty */
1482                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1483                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1484                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1485                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1486                 }
1487                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1488                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1489                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1490                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1491                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1492
1493                 if (ksocknal_data.ksnd_schedulers != NULL)
1494                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1495                                 ksock_sched_t *kss =
1496                                         &ksocknal_data.ksnd_schedulers[i];
1497
1498                                 LASSERT (list_empty (&kss->kss_tx_conns));
1499                                 LASSERT (list_empty (&kss->kss_rx_conns));
1500                                 LASSERT (kss->kss_nconns == 0);
1501                         }
1502
1503                 /* stop router calling me */
1504                 kpr_shutdown (&ksocknal_data.ksnd_router);
1505
1506                 /* flag threads to terminate; wake and wait for them to die */
1507                 ksocknal_data.ksnd_shuttingdown = 1;
1508                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1509                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1510
1511                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1512                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1513
1514                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1515                         CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
1516                                 atomic_read (&ksocknal_data.ksnd_nthreads));
1517                         set_current_state (TASK_UNINTERRUPTIBLE);
1518                         schedule_timeout (HZ);
1519                 }
1520
1521                 kpr_deregister (&ksocknal_data.ksnd_router);
1522
1523                 ksocknal_free_buffers();
1524                 /* fall through */
1525
1526         case SOCKNAL_INIT_NOTHING:
1527                 break;
1528         }
1529
1530         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1531                atomic_read (&portal_kmemory));
1532
1533         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
1534                atomic_read(&portal_kmemory));
1535 }
1536
1537
1538 void __init
1539 ksocknal_init_incarnation (void)
1540 {
1541         struct timeval tv;
1542
1543         /* The incarnation number is the time this module loaded and it
1544          * identifies this particular instance of the socknal.  Hopefully
1545          * we won't be able to reboot more frequently than 1MHz for the
1546          * forseeable future :) */
1547         
1548         do_gettimeofday(&tv);
1549         
1550         ksocknal_data.ksnd_incarnation = 
1551                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1552 }
1553
1554 int __init
1555 ksocknal_module_init (void)
1556 {
1557         int   pkmem = atomic_read(&portal_kmemory);
1558         int   rc;
1559         int   i;
1560         int   j;
1561
1562         /* packet descriptor must fit in a router descriptor's scratchpad */
1563         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1564         /* the following must be sizeof(int) for proc_dointvec() */
1565         LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
1566         LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
1567         /* check ksnr_connected/connecting field large enough */
1568         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
1569         
1570         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1571
1572         ksocknal_api.forward  = ksocknal_api_forward;
1573         ksocknal_api.shutdown = ksocknal_api_shutdown;
1574         ksocknal_api.yield    = ksocknal_api_yield;
1575         ksocknal_api.validate = NULL;           /* our api validate is a NOOP */
1576         ksocknal_api.lock     = ksocknal_api_lock;
1577         ksocknal_api.unlock   = ksocknal_api_unlock;
1578         ksocknal_api.nal_data = &ksocknal_data;
1579
1580         ksocknal_lib.nal_data = &ksocknal_data;
1581
1582         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1583
1584         ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1585         ksocknal_data.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
1586         ksocknal_data.ksnd_typed_conns = SOCKNAL_TYPED_CONNS;
1587         ksocknal_data.ksnd_min_bulk   = SOCKNAL_MIN_BULK;
1588 #if SOCKNAL_ZC
1589         ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1590 #endif
1591         ksocknal_init_incarnation();
1592         
1593         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1594         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1595                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1596         if (ksocknal_data.ksnd_peers == NULL)
1597                 return (-ENOMEM);
1598
1599         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1600                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1601
1602         rwlock_init(&ksocknal_data.ksnd_global_lock);
1603
1604         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1605         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1606
1607         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1608         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1609         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1610
1611         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1612         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1613         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1614
1615         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1616         INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
1617         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1618         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1619         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1620
1621         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1622         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1623         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1624
1625         /* NB memset above zeros whole of ksocknal_data, including
1626          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1627
1628         /* flag lists/ptrs/locks initialised */
1629         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1630
1631         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1632                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1633         if (ksocknal_data.ksnd_schedulers == NULL) {
1634                 ksocknal_module_fini ();
1635                 return (-ENOMEM);
1636         }
1637
1638         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1639                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1640
1641                 spin_lock_init (&kss->kss_lock);
1642                 INIT_LIST_HEAD (&kss->kss_rx_conns);
1643                 INIT_LIST_HEAD (&kss->kss_tx_conns);
1644 #if SOCKNAL_ZC
1645                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1646 #endif
1647                 init_waitqueue_head (&kss->kss_waitq);
1648         }
1649
1650         rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
1651         if (rc != 0) {
1652                 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
1653                 ksocknal_module_fini ();
1654                 return (rc);
1655         }
1656         PtlNIDebug(ksocknal_ni, ~0);
1657
1658         ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
1659
1660         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1661                 rc = ksocknal_thread_start (ksocknal_scheduler,
1662                                             &ksocknal_data.ksnd_schedulers[i]);
1663                 if (rc != 0) {
1664                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1665                                i, rc);
1666                         ksocknal_module_fini ();
1667                         return (rc);
1668                 }
1669         }
1670
1671         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1672                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1673                 if (rc != 0) {
1674                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1675                         ksocknal_module_fini ();
1676                         return (rc);
1677                 }
1678         }
1679
1680         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1681         if (rc != 0) {
1682                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1683                 ksocknal_module_fini ();
1684                 return (rc);
1685         }
1686
1687         rc = kpr_register(&ksocknal_data.ksnd_router,
1688                           &ksocknal_router_interface);
1689         if (rc != 0) {
1690                 CDEBUG(D_NET, "Can't initialise routing interface "
1691                        "(rc = %d): not routing\n", rc);
1692         } else {
1693                 /* Only allocate forwarding buffers if I'm on a gateway */
1694
1695                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1696                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
1697                         ksock_fmb_t *fmb;
1698                         
1699                         PORTAL_ALLOC(fmb, sizeof(*fmb));
1700                         if (fmb == NULL) {
1701                                 ksocknal_module_fini();
1702                                 return (-ENOMEM);
1703                         }
1704
1705                         if (i < SOCKNAL_SMALL_FWD_NMSGS) {
1706                                 fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES;
1707                                 fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp;
1708                         } else {
1709                                 fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES;
1710                                 fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp;
1711                         }
1712
1713                         for (j = 0; j < fmb->fmb_npages; j++) {
1714                                 fmb->fmb_pages[j] = alloc_page(GFP_KERNEL);
1715
1716                                 if (fmb->fmb_pages[j] == NULL) {
1717                                         ksocknal_module_fini ();
1718                                         return (-ENOMEM);
1719                                 }
1720
1721                                 LASSERT(page_address(fmb->fmb_pages[j]) != NULL);
1722                         }
1723
1724                         list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
1725                 }
1726         }
1727
1728         rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
1729         if (rc != 0) {
1730                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1731                 ksocknal_module_fini ();
1732                 return (rc);
1733         }
1734
1735         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
1736
1737 #ifdef CONFIG_SYSCTL
1738         /* Press on regardless even if registering sysctl doesn't work */
1739         ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
1740 #endif
1741         /* flag everything initialised */
1742         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1743
1744         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
1745                "(Routing %s, initial mem %d)\n",
1746                kpr_routing (&ksocknal_data.ksnd_router) ?
1747                "enabled" : "disabled", pkmem);
1748
1749         return (0);
1750 }
1751
1752 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1753 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1754 MODULE_LICENSE("GPL");
1755
1756 module_init(ksocknal_module_init);
1757 module_exit(ksocknal_module_fini);
1758
1759 EXPORT_SYMBOL (ksocknal_ni);