Whamcloud - gitweb
- merge 2 weeks of b1_4 fixes onto HEAD
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 nal_t                   ksocknal_api;
29 ksock_nal_data_t        ksocknal_data;
30 ptl_handle_ni_t         ksocknal_ni;
31 ksock_tunables_t        ksocknal_tunables;
32
33 kpr_nal_interface_t ksocknal_router_interface = {
34         kprni_nalid:      SOCKNAL,
35         kprni_arg:        &ksocknal_data,
36         kprni_fwd:        ksocknal_fwd_packet,
37         kprni_notify:     ksocknal_notify,
38 };
39
40 #ifdef CONFIG_SYSCTL
41 #define SOCKNAL_SYSCTL  200
42
43 #define SOCKNAL_SYSCTL_TIMEOUT     1
44 #define SOCKNAL_SYSCTL_EAGER_ACK   2
45 #define SOCKNAL_SYSCTL_ZERO_COPY   3
46 #define SOCKNAL_SYSCTL_TYPED       4
47 #define SOCKNAL_SYSCTL_MIN_BULK    5
48
49 static ctl_table ksocknal_ctl_table[] = {
50         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
51          &ksocknal_tunables.ksnd_io_timeout, sizeof (int),
52          0644, NULL, &proc_dointvec},
53         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
54          &ksocknal_tunables.ksnd_eager_ack, sizeof (int),
55          0644, NULL, &proc_dointvec},
56 #if SOCKNAL_ZC
57         {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy", 
58          &ksocknal_tunables.ksnd_zc_min_frag, sizeof (int),
59          0644, NULL, &proc_dointvec},
60 #endif
61         {SOCKNAL_SYSCTL_TYPED, "typed", 
62          &ksocknal_tunables.ksnd_typed_conns, sizeof (int),
63          0644, NULL, &proc_dointvec},
64         {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk", 
65          &ksocknal_tunables.ksnd_min_bulk, sizeof (int),
66          0644, NULL, &proc_dointvec},
67         { 0 }
68 };
69
70 static ctl_table ksocknal_top_ctl_table[] = {
71         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
72         { 0 }
73 };
74 #endif
75
76 int
77 ksocknal_set_mynid(ptl_nid_t nid)
78 {
79         lib_ni_t *ni = &ksocknal_lib.libnal_ni;
80
81         /* FIXME: we have to do this because we call lib_init() at module
82          * insertion time, which is before we have 'mynid' available.  lib_init
83          * sets the NAL's nid, which it uses to tell other nodes where packets
84          * are coming from.  This is not a very graceful solution to this
85          * problem. */
86
87         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
88                nid, ni->ni_pid.nid);
89
90         ni->ni_pid.nid = nid;
91         return (0);
92 }
93
94 void
95 ksocknal_bind_irq (unsigned int irq)
96 {
97 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
98         int              bind;
99         unsigned long    flags;
100         char             cmdline[64];
101         ksock_irqinfo_t *info;
102         char            *argv[] = {"/bin/sh",
103                                    "-c",
104                                    cmdline,
105                                    NULL};
106         char            *envp[] = {"HOME=/",
107                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
108                                    NULL};
109
110         LASSERT (irq < NR_IRQS);
111         if (irq == 0)                           /* software NIC */
112                 return;
113
114         info = &ksocknal_data.ksnd_irqinfo[irq];
115
116         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
117
118         LASSERT (info->ksni_valid);
119         bind = !info->ksni_bound;
120         info->ksni_bound = 1;
121
122         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
123
124         if (!bind)                              /* bound already */
125                 return;
126
127         snprintf (cmdline, sizeof (cmdline),
128                   "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
129
130         printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
131                 irq, info->ksni_sched, cmdline);
132
133         /* FIXME: Find a better method of setting IRQ affinity...
134          */
135
136         USERMODEHELPER(argv[0], argv, envp);
137 #endif
138 }
139
140 ksock_route_t *
141 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
142                        int irq_affinity, int eager)
143 {
144         ksock_route_t *route;
145
146         PORTAL_ALLOC (route, sizeof (*route));
147         if (route == NULL)
148                 return (NULL);
149
150         atomic_set (&route->ksnr_refcount, 1);
151         route->ksnr_sharecount = 0;
152         route->ksnr_peer = NULL;
153         route->ksnr_timeout = jiffies;
154         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
155         route->ksnr_ipaddr = ipaddr;
156         route->ksnr_port = port;
157         route->ksnr_buffer_size = buffer_size;
158         route->ksnr_irq_affinity = irq_affinity;
159         route->ksnr_eager = eager;
160         route->ksnr_connecting = 0;
161         route->ksnr_connected = 0;
162         route->ksnr_deleted = 0;
163         route->ksnr_conn_count = 0;
164
165         return (route);
166 }
167
168 void
169 ksocknal_destroy_route (ksock_route_t *route)
170 {
171         LASSERT (route->ksnr_sharecount == 0);
172
173         if (route->ksnr_peer != NULL)
174                 ksocknal_put_peer (route->ksnr_peer);
175
176         PORTAL_FREE (route, sizeof (*route));
177 }
178
179 void
180 ksocknal_put_route (ksock_route_t *route)
181 {
182         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
183                 route, atomic_read (&route->ksnr_refcount));
184
185         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
186         if (!atomic_dec_and_test (&route->ksnr_refcount))
187              return;
188
189         ksocknal_destroy_route (route);
190 }
191
192 ksock_peer_t *
193 ksocknal_create_peer (ptl_nid_t nid)
194 {
195         ksock_peer_t *peer;
196
197         LASSERT (nid != PTL_NID_ANY);
198
199         PORTAL_ALLOC (peer, sizeof (*peer));
200         if (peer == NULL)
201                 return (NULL);
202
203         memset (peer, 0, sizeof (*peer));
204
205         peer->ksnp_nid = nid;
206         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
207         peer->ksnp_closing = 0;
208         INIT_LIST_HEAD (&peer->ksnp_conns);
209         INIT_LIST_HEAD (&peer->ksnp_routes);
210         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
211
212         atomic_inc (&ksocknal_data.ksnd_npeers);
213         return (peer);
214 }
215
216 void
217 ksocknal_destroy_peer (ksock_peer_t *peer)
218 {
219         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
220
221         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
222         LASSERT (list_empty (&peer->ksnp_conns));
223         LASSERT (list_empty (&peer->ksnp_routes));
224         LASSERT (list_empty (&peer->ksnp_tx_queue));
225
226         PORTAL_FREE (peer, sizeof (*peer));
227
228         /* NB a peer's connections and autoconnect routes keep a reference
229          * on their peer until they are destroyed, so we can be assured
230          * that _all_ state to do with this peer has been cleaned up when
231          * its refcount drops to zero. */
232         atomic_dec (&ksocknal_data.ksnd_npeers);
233 }
234
235 void
236 ksocknal_put_peer (ksock_peer_t *peer)
237 {
238         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
239                 peer, peer->ksnp_nid,
240                 atomic_read (&peer->ksnp_refcount));
241
242         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
243         if (!atomic_dec_and_test (&peer->ksnp_refcount))
244                 return;
245
246         ksocknal_destroy_peer (peer);
247 }
248
249 ksock_peer_t *
250 ksocknal_find_peer_locked (ptl_nid_t nid)
251 {
252         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
253         struct list_head *tmp;
254         ksock_peer_t     *peer;
255
256         list_for_each (tmp, peer_list) {
257
258                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
259
260                 LASSERT (!peer->ksnp_closing);
261                 LASSERT (!(list_empty (&peer->ksnp_routes) &&
262                            list_empty (&peer->ksnp_conns)));
263
264                 if (peer->ksnp_nid != nid)
265                         continue;
266
267                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
268                        peer, nid, atomic_read (&peer->ksnp_refcount));
269                 return (peer);
270         }
271         return (NULL);
272 }
273
274 ksock_peer_t *
275 ksocknal_get_peer (ptl_nid_t nid)
276 {
277         ksock_peer_t     *peer;
278
279         read_lock (&ksocknal_data.ksnd_global_lock);
280         peer = ksocknal_find_peer_locked (nid);
281         if (peer != NULL)                       /* +1 ref for caller? */
282                 atomic_inc (&peer->ksnp_refcount);
283         read_unlock (&ksocknal_data.ksnd_global_lock);
284
285         return (peer);
286 }
287
288 void
289 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
290 {
291         LASSERT (!peer->ksnp_closing);
292         peer->ksnp_closing = 1;
293         list_del (&peer->ksnp_list);
294         /* lose peerlist's ref */
295         ksocknal_put_peer (peer);
296 }
297
298 ksock_route_t *
299 ksocknal_get_route_by_idx (int index)
300 {
301         ksock_peer_t      *peer;
302         struct list_head  *ptmp;
303         ksock_route_t     *route;
304         struct list_head  *rtmp;
305         int                i;
306
307         read_lock (&ksocknal_data.ksnd_global_lock);
308
309         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
310                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
311                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
312
313                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
314                                    list_empty (&peer->ksnp_conns)));
315
316                         list_for_each (rtmp, &peer->ksnp_routes) {
317                                 if (index-- > 0)
318                                         continue;
319
320                                 route = list_entry (rtmp, ksock_route_t, ksnr_list);
321                                 atomic_inc (&route->ksnr_refcount);
322                                 read_unlock (&ksocknal_data.ksnd_global_lock);
323                                 return (route);
324                         }
325                 }
326         }
327
328         read_unlock (&ksocknal_data.ksnd_global_lock);
329         return (NULL);
330 }
331
332 int
333 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
334                     int bind_irq, int share, int eager)
335 {
336         unsigned long      flags;
337         ksock_peer_t      *peer;
338         ksock_peer_t      *peer2;
339         ksock_route_t     *route;
340         struct list_head  *rtmp;
341         ksock_route_t     *route2;
342         
343         if (nid == PTL_NID_ANY)
344                 return (-EINVAL);
345
346         /* Have a brand new peer ready... */
347         peer = ksocknal_create_peer (nid);
348         if (peer == NULL)
349                 return (-ENOMEM);
350
351         route = ksocknal_create_route (ipaddr, port, bufnob, 
352                                        bind_irq, eager);
353         if (route == NULL) {
354                 ksocknal_put_peer (peer);
355                 return (-ENOMEM);
356         }
357
358         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
359
360         peer2 = ksocknal_find_peer_locked (nid);
361         if (peer2 != NULL) {
362                 ksocknal_put_peer (peer);
363                 peer = peer2;
364         } else {
365                 /* peer table takes existing ref on peer */
366                 list_add (&peer->ksnp_list,
367                           ksocknal_nid2peerlist (nid));
368         }
369
370         route2 = NULL;
371         if (share) {
372                 /* check for existing route to this NID via this ipaddr */
373                 list_for_each (rtmp, &peer->ksnp_routes) {
374                         route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
375                         
376                         if (route2->ksnr_ipaddr == ipaddr)
377                                 break;
378
379                         route2 = NULL;
380                 }
381         }
382
383         if (route2 != NULL) {
384                 ksocknal_put_route (route);
385                 route = route2;
386         } else {
387                 /* route takes a ref on peer */
388                 route->ksnr_peer = peer;
389                 atomic_inc (&peer->ksnp_refcount);
390                 /* peer's route list takes existing ref on route */
391                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
392         }
393         
394         route->ksnr_sharecount++;
395
396         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
397
398         return (0);
399 }
400
401 void
402 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
403 {
404         ksock_peer_t     *peer = route->ksnr_peer;
405         ksock_conn_t     *conn;
406         struct list_head *ctmp;
407         struct list_head *cnxt;
408
409         if (!share)
410                 route->ksnr_sharecount = 0;
411         else {
412                 route->ksnr_sharecount--;
413                 if (route->ksnr_sharecount != 0)
414                         return;
415         }
416
417         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
418                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
419
420                 if (conn->ksnc_route != route)
421                         continue;
422                 
423                 if (!keep_conn) {
424                         ksocknal_close_conn_locked (conn, 0);
425                         continue;
426                 }
427                 
428                 /* keeping the conn; just dissociate it and route... */
429                 conn->ksnc_route = NULL;
430                 ksocknal_put_route (route); /* drop conn's ref on route */
431         }
432         
433         route->ksnr_deleted = 1;
434         list_del (&route->ksnr_list);
435         ksocknal_put_route (route);             /* drop peer's ref */
436
437         if (list_empty (&peer->ksnp_routes) &&
438             list_empty (&peer->ksnp_conns)) {
439                 /* I've just removed the last autoconnect route of a peer
440                  * with no active connections */
441                 ksocknal_unlink_peer_locked (peer);
442         }
443 }
444
445 int
446 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
447 {
448         unsigned long      flags;
449         struct list_head  *ptmp;
450         struct list_head  *pnxt;
451         ksock_peer_t      *peer;
452         struct list_head  *rtmp;
453         struct list_head  *rnxt;
454         ksock_route_t     *route;
455         int                lo;
456         int                hi;
457         int                i;
458         int                rc = -ENOENT;
459
460         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
461
462         if (nid != PTL_NID_ANY)
463                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
464         else {
465                 lo = 0;
466                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
467         }
468
469         for (i = lo; i <= hi; i++) {
470                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
471                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
472
473                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
474                                 continue;
475
476                         list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
477                                 route = list_entry (rtmp, ksock_route_t,
478                                                     ksnr_list);
479
480                                 if (!(ipaddr == 0 ||
481                                       route->ksnr_ipaddr == ipaddr))
482                                         continue;
483
484                                 ksocknal_del_route_locked (route, share, keep_conn);
485                                 rc = 0;         /* matched something */
486                                 if (share)
487                                         goto out;
488                         }
489                 }
490         }
491  out:
492         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
493
494         return (rc);
495 }
496
497 ksock_conn_t *
498 ksocknal_get_conn_by_idx (int index)
499 {
500         ksock_peer_t      *peer;
501         struct list_head  *ptmp;
502         ksock_conn_t      *conn;
503         struct list_head  *ctmp;
504         int                i;
505
506         read_lock (&ksocknal_data.ksnd_global_lock);
507
508         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
509                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
510                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
511
512                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
513                                    list_empty (&peer->ksnp_conns)));
514
515                         list_for_each (ctmp, &peer->ksnp_conns) {
516                                 if (index-- > 0)
517                                         continue;
518
519                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
520                                 atomic_inc (&conn->ksnc_refcount);
521                                 read_unlock (&ksocknal_data.ksnd_global_lock);
522                                 return (conn);
523                         }
524                 }
525         }
526
527         read_unlock (&ksocknal_data.ksnd_global_lock);
528         return (NULL);
529 }
530
531 void
532 ksocknal_get_peer_addr (ksock_conn_t *conn)
533 {
534         struct sockaddr_in sin;
535         int                len = sizeof (sin);
536         int                rc;
537         
538         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
539                                             (struct sockaddr *)&sin, &len, 2);
540         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
541         LASSERT (!conn->ksnc_closing);
542         LASSERT (len <= sizeof (sin));
543
544         if (rc != 0) {
545                 CERROR ("Error %d getting sock peer IP\n", rc);
546                 return;
547         }
548
549         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
550         conn->ksnc_port   = ntohs (sin.sin_port);
551 }
552
553 unsigned int
554 ksocknal_conn_irq (ksock_conn_t *conn)
555 {
556         int                irq = 0;
557         struct dst_entry  *dst;
558
559         dst = sk_dst_get (conn->ksnc_sock->sk);
560         if (dst != NULL) {
561                 if (dst->dev != NULL) {
562                         irq = dst->dev->irq;
563                         if (irq >= NR_IRQS) {
564                                 CERROR ("Unexpected IRQ %x\n", irq);
565                                 irq = 0;
566                         }
567                 }
568                 dst_release (dst);
569         }
570         
571         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
572         LASSERT (!conn->ksnc_closing);
573         return (irq);
574 }
575
576 ksock_sched_t *
577 ksocknal_choose_scheduler_locked (unsigned int irq)
578 {
579         ksock_sched_t    *sched;
580         ksock_irqinfo_t  *info;
581         int               i;
582
583         LASSERT (irq < NR_IRQS);
584         info = &ksocknal_data.ksnd_irqinfo[irq];
585
586         if (irq != 0 &&                         /* hardware NIC */
587             info->ksni_valid) {                 /* already set up */
588                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
589         }
590
591         /* software NIC (irq == 0) || not associated with a scheduler yet.
592          * Choose the CPU with the fewest connections... */
593         sched = &ksocknal_data.ksnd_schedulers[0];
594         for (i = 1; i < SOCKNAL_N_SCHED; i++)
595                 if (sched->kss_nconns >
596                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
597                         sched = &ksocknal_data.ksnd_schedulers[i];
598
599         if (irq != 0) {                         /* Hardware NIC */
600                 info->ksni_valid = 1;
601                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
602
603                 /* no overflow... */
604                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
605         }
606
607         return (sched);
608 }
609
610 int
611 ksocknal_create_conn (ksock_route_t *route, struct socket *sock,
612                       int bind_irq, int type)
613 {
614         ptl_nid_t          nid;
615         __u64              incarnation;
616         unsigned long      flags;
617         ksock_conn_t      *conn;
618         ksock_peer_t      *peer;
619         ksock_peer_t      *peer2;
620         ksock_sched_t     *sched;
621         unsigned int       irq;
622         ksock_tx_t        *tx;
623         int                rc;
624
625         /* NB, sock has an associated file since (a) this connection might
626          * have been created in userland and (b) we need to refcount the
627          * socket so that we don't close it while I/O is being done on
628          * it, and sock->file has that pre-cooked... */
629         LASSERT (sock->file != NULL);
630         LASSERT (file_count(sock->file) > 0);
631
632         rc = ksocknal_setup_sock (sock);
633         if (rc != 0)
634                 return (rc);
635
636         if (route == NULL) {
637                 /* acceptor or explicit connect */
638                 nid = PTL_NID_ANY;
639         } else {
640                 LASSERT (type != SOCKNAL_CONN_NONE);
641                 /* autoconnect: expect this nid on exchange */
642                 nid = route->ksnr_peer->ksnp_nid;
643         }
644
645         rc = ksocknal_hello (sock, &nid, &type, &incarnation);
646         if (rc != 0)
647                 return (rc);
648         
649         peer = NULL;
650         if (route == NULL) {                    /* not autoconnect */
651                 /* Assume this socket connects to a brand new peer */
652                 peer = ksocknal_create_peer (nid);
653                 if (peer == NULL)
654                         return (-ENOMEM);
655         }
656
657         PORTAL_ALLOC(conn, sizeof(*conn));
658         if (conn == NULL) {
659                 if (peer != NULL)
660                         ksocknal_put_peer (peer);
661                 return (-ENOMEM);
662         }
663
664         memset (conn, 0, sizeof (*conn));
665         conn->ksnc_peer = NULL;
666         conn->ksnc_route = NULL;
667         conn->ksnc_sock = sock;
668         conn->ksnc_type = type;
669         conn->ksnc_incarnation = incarnation;
670         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
671         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
672         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
673
674         conn->ksnc_rx_ready = 0;
675         conn->ksnc_rx_scheduled = 0;
676         ksocknal_new_packet (conn, 0);
677
678         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
679         conn->ksnc_tx_ready = 0;
680         conn->ksnc_tx_scheduled = 0;
681         atomic_set (&conn->ksnc_tx_nob, 0);
682
683         ksocknal_get_peer_addr (conn);
684
685         CWARN("New conn nid:"LPX64" ip:%08x/%d incarnation:"LPX64"\n",
686               nid, conn->ksnc_ipaddr, conn->ksnc_port, incarnation);
687
688         irq = ksocknal_conn_irq (conn);
689
690         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
691
692         if (route != NULL) {
693                 /* Autoconnected! */
694                 LASSERT ((route->ksnr_connected & (1 << type)) == 0);
695                 LASSERT ((route->ksnr_connecting & (1 << type)) != 0);
696
697                 if (route->ksnr_deleted) {
698                         /* This conn was autoconnected, but the autoconnect
699                          * route got deleted while it was being
700                          * established! */
701                         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
702                                                  flags);
703                         PORTAL_FREE (conn, sizeof (*conn));
704                         return (-ESTALE);
705                 }
706
707
708                 /* associate conn/route */
709                 conn->ksnc_route = route;
710                 atomic_inc (&route->ksnr_refcount);
711
712                 route->ksnr_connecting &= ~(1 << type);
713                 route->ksnr_connected  |= (1 << type);
714                 route->ksnr_conn_count++;
715                 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
716
717                 peer = route->ksnr_peer;
718         } else {
719                 /* Not an autoconnected connection; see if there is an
720                  * existing peer for this NID */
721                 peer2 = ksocknal_find_peer_locked (nid);
722                 if (peer2 != NULL) {
723                         ksocknal_put_peer (peer);
724                         peer = peer2;
725                 } else {
726                         list_add (&peer->ksnp_list,
727                                   ksocknal_nid2peerlist (nid));
728                         /* peer list takes over existing ref */
729                 }
730         }
731
732         LASSERT (!peer->ksnp_closing);
733
734         conn->ksnc_peer = peer;
735         atomic_inc (&peer->ksnp_refcount);
736         peer->ksnp_last_alive = jiffies;
737         peer->ksnp_error = 0;
738
739         /* Set the deadline for the outgoing HELLO to drain */
740         conn->ksnc_tx_deadline = jiffies +
741                                  ksocknal_tunables.ksnd_io_timeout * HZ;
742
743         list_add (&conn->ksnc_list, &peer->ksnp_conns);
744         atomic_inc (&conn->ksnc_refcount);
745
746         sched = ksocknal_choose_scheduler_locked (irq);
747         sched->kss_nconns++;
748         conn->ksnc_scheduler = sched;
749
750         /* NB my callbacks block while I hold ksnd_global_lock */
751         sock->sk->sk_user_data = conn;
752         sock->sk->sk_data_ready = ksocknal_data_ready;
753         sock->sk->sk_write_space = ksocknal_write_space;
754
755         /* Take all the packets blocking for a connection.
756          * NB, it might be nicer to share these blocked packets among any
757          * other connections that are becoming established, however that
758          * confuses the normal packet launching operation, which selects a
759          * connection and queues the packet on it without needing an
760          * exclusive lock on ksnd_global_lock. */
761         while (!list_empty (&peer->ksnp_tx_queue)) {
762                 tx = list_entry (peer->ksnp_tx_queue.next,
763                                  ksock_tx_t, tx_list);
764
765                 list_del (&tx->tx_list);
766                 ksocknal_queue_tx_locked (tx, conn);
767         }
768
769         rc = ksocknal_close_stale_conns_locked (peer, incarnation);
770
771         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
772
773         if (rc != 0)
774                 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
775                         rc, conn->ksnc_peer->ksnp_nid,
776                         HIPQUAD(conn->ksnc_ipaddr));
777
778         if (bind_irq)                           /* irq binding required */
779                 ksocknal_bind_irq (irq);
780
781         /* Call the callbacks right now to get things going. */
782         ksocknal_data_ready (sock->sk, 0);
783         ksocknal_write_space (sock->sk);
784
785         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64" ip %d.%d.%d.%d\n",
786                conn, conn->ksnc_peer->ksnp_nid, HIPQUAD(conn->ksnc_ipaddr));
787
788         ksocknal_put_conn (conn);
789         return (0);
790 }
791
792 void
793 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
794 {
795         /* This just does the immmediate housekeeping, and queues the
796          * connection for the reaper to terminate.
797          * Caller holds ksnd_global_lock exclusively in irq context */
798         ksock_peer_t   *peer = conn->ksnc_peer;
799         ksock_route_t  *route;
800
801         LASSERT (peer->ksnp_error == 0);
802         LASSERT (!conn->ksnc_closing);
803         conn->ksnc_closing = 1;
804         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
805         
806         route = conn->ksnc_route;
807         if (route != NULL) {
808                 /* dissociate conn from route... */
809                 LASSERT (!route->ksnr_deleted);
810                 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
811                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
812
813                 route->ksnr_connected &= ~(1 << conn->ksnc_type);
814                 conn->ksnc_route = NULL;
815
816                 list_del (&route->ksnr_list);   /* make route least favourite */
817                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
818                 
819                 ksocknal_put_route (route);     /* drop conn's ref on route */
820         }
821
822         /* ksnd_deathrow_conns takes over peer's ref */
823         list_del (&conn->ksnc_list);
824
825         if (list_empty (&peer->ksnp_conns)) {
826                 /* No more connections to this peer */
827
828                 peer->ksnp_error = error;       /* stash last conn close reason */
829
830                 if (list_empty (&peer->ksnp_routes)) {
831                         /* I've just closed last conn belonging to a
832                          * non-autoconnecting peer */
833                         ksocknal_unlink_peer_locked (peer);
834                 }
835         }
836
837         spin_lock (&ksocknal_data.ksnd_reaper_lock);
838
839         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
840         wake_up (&ksocknal_data.ksnd_reaper_waitq);
841                 
842         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
843 }
844
845 void
846 ksocknal_terminate_conn (ksock_conn_t *conn)
847 {
848         /* This gets called by the reaper (guaranteed thread context) to
849          * disengage the socket from its callbacks and close it.
850          * ksnc_refcount will eventually hit zero, and then the reaper will
851          * destroy it. */
852         unsigned long   flags;
853         ksock_peer_t   *peer = conn->ksnc_peer;
854         ksock_sched_t  *sched = conn->ksnc_scheduler;
855         struct timeval  now;
856         time_t          then = 0;
857         int             notify = 0;
858
859         LASSERT(conn->ksnc_closing);
860
861         /* wake up the scheduler to "send" all remaining packets to /dev/null */
862         spin_lock_irqsave(&sched->kss_lock, flags);
863
864         if (!conn->ksnc_tx_scheduled &&
865             !list_empty(&conn->ksnc_tx_queue)){
866                 list_add_tail (&conn->ksnc_tx_list,
867                                &sched->kss_tx_conns);
868                 /* a closing conn is always ready to tx */
869                 conn->ksnc_tx_ready = 1;
870                 conn->ksnc_tx_scheduled = 1;
871                 /* extra ref for scheduler */
872                 atomic_inc (&conn->ksnc_refcount);
873
874                 wake_up (&sched->kss_waitq);
875         }
876
877         spin_unlock_irqrestore (&sched->kss_lock, flags);
878
879         /* serialise with callbacks */
880         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
881
882         /* Remove conn's network callbacks.
883          * NB I _have_ to restore the callback, rather than storing a noop,
884          * since the socket could survive past this module being unloaded!! */
885         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
886         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
887
888         /* A callback could be in progress already; they hold a read lock
889          * on ksnd_global_lock (to serialise with me) and NOOP if
890          * sk_user_data is NULL. */
891         conn->ksnc_sock->sk->sk_user_data = NULL;
892
893         /* OK, so this conn may not be completely disengaged from its
894          * scheduler yet, but it _has_ committed to terminate... */
895         conn->ksnc_scheduler->kss_nconns--;
896
897         if (peer->ksnp_error != 0) {
898                 /* peer's last conn closed in error */
899                 LASSERT (list_empty (&peer->ksnp_conns));
900                 
901                 /* convert peer's last-known-alive timestamp from jiffies */
902                 do_gettimeofday (&now);
903                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
904                 notify = 1;
905         }
906         
907         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
908
909         /* The socket is closed on the final put; either here, or in
910          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
911          * when the connection was established, this will close the socket
912          * immediately, aborting anything buffered in it. Any hung
913          * zero-copy transmits will therefore complete in finite time. */
914         ksocknal_putconnsock (conn);
915
916         if (notify)
917                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
918                             0, then);
919 }
920
921 void
922 ksocknal_destroy_conn (ksock_conn_t *conn)
923 {
924         /* Final coup-de-grace of the reaper */
925         CDEBUG (D_NET, "connection %p\n", conn);
926
927         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
928         LASSERT (conn->ksnc_route == NULL);
929         LASSERT (!conn->ksnc_tx_scheduled);
930         LASSERT (!conn->ksnc_rx_scheduled);
931         LASSERT (list_empty(&conn->ksnc_tx_queue));
932
933         /* complete current receive if any */
934         switch (conn->ksnc_rx_state) {
935         case SOCKNAL_RX_BODY:
936                 CERROR("Completing partial receive from "LPX64
937                        ", ip %d.%d.%d.%d:%d, with error\n",
938                        conn->ksnc_peer->ksnp_nid,
939                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
940                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
941                 break;
942         case SOCKNAL_RX_BODY_FWD:
943                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
944                 break;
945         case SOCKNAL_RX_HEADER:
946         case SOCKNAL_RX_SLOP:
947                 break;
948         default:
949                 LBUG ();
950                 break;
951         }
952
953         ksocknal_put_peer (conn->ksnc_peer);
954
955         PORTAL_FREE (conn, sizeof (*conn));
956         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
957 }
958
959 void
960 ksocknal_put_conn (ksock_conn_t *conn)
961 {
962         unsigned long flags;
963
964         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
965                 conn, conn->ksnc_peer->ksnp_nid,
966                 atomic_read (&conn->ksnc_refcount));
967
968         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
969         if (!atomic_dec_and_test (&conn->ksnc_refcount))
970                 return;
971
972         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
973
974         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
975         wake_up (&ksocknal_data.ksnd_reaper_waitq);
976
977         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
978 }
979
980 int
981 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
982 {
983         ksock_conn_t       *conn;
984         struct list_head   *ctmp;
985         struct list_head   *cnxt;
986         int                 count = 0;
987
988         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
989                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
990
991                 if (ipaddr == 0 ||
992                     conn->ksnc_ipaddr == ipaddr) {
993                         count++;
994                         ksocknal_close_conn_locked (conn, why);
995                 }
996         }
997
998         return (count);
999 }
1000
1001 int
1002 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1003 {
1004         ksock_conn_t       *conn;
1005         struct list_head   *ctmp;
1006         struct list_head   *cnxt;
1007         int                 count = 0;
1008
1009         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1010                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1011
1012                 if (conn->ksnc_incarnation == incarnation)
1013                         continue;
1014
1015                 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1016                       "incarnation:"LPX64"("LPX64")\n",
1017                       peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1018                       conn->ksnc_incarnation, incarnation);
1019                 
1020                 count++;
1021                 ksocknal_close_conn_locked (conn, -ESTALE);
1022         }
1023
1024         return (count);
1025 }
1026
1027 int
1028 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why) 
1029 {
1030         ksock_peer_t     *peer = conn->ksnc_peer;
1031         __u32             ipaddr = conn->ksnc_ipaddr;
1032         unsigned long     flags;
1033         int               count;
1034
1035         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1036
1037         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1038         
1039         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1040
1041         return (count);
1042 }
1043
1044 int
1045 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1046 {
1047         unsigned long       flags;
1048         ksock_peer_t       *peer;
1049         struct list_head   *ptmp;
1050         struct list_head   *pnxt;
1051         int                 lo;
1052         int                 hi;
1053         int                 i;
1054         int                 count = 0;
1055
1056         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1057
1058         if (nid != PTL_NID_ANY)
1059                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1060         else {
1061                 lo = 0;
1062                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1063         }
1064
1065         for (i = lo; i <= hi; i++) {
1066                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1067
1068                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1069
1070                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1071                                 continue;
1072
1073                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1074                 }
1075         }
1076
1077         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1078
1079         /* wildcards always succeed */
1080         if (nid == PTL_NID_ANY || ipaddr == 0)
1081                 return (0);
1082         
1083         return (count == 0 ? -ENOENT : 0);
1084 }
1085
1086 void
1087 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1088 {
1089         /* The router is telling me she's been notified of a change in
1090          * gateway state.... */
1091
1092         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1093
1094         if (!alive) {
1095                 /* If the gateway crashed, close all open connections... */
1096                 ksocknal_close_matching_conns (gw_nid, 0);
1097                 return;
1098         }
1099         
1100         /* ...otherwise do nothing.  We can only establish new connections
1101          * if we have autroutes, and these connect on demand. */
1102 }
1103
1104 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1105 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1106 {
1107         return &(sk->tp_pinfo.af_tcp);
1108 }
1109 #else
1110 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1111 {
1112         struct tcp_sock *s = (struct tcp_sock *)sk;
1113         return &s->tcp;
1114 }
1115 #endif
1116
1117 void
1118 ksocknal_push_conn (ksock_conn_t *conn)
1119 {
1120         struct sock    *sk;
1121         struct tcp_opt *tp;
1122         int             nonagle;
1123         int             val = 1;
1124         int             rc;
1125         mm_segment_t    oldmm;
1126
1127         rc = ksocknal_getconnsock (conn);
1128         if (rc != 0)                            /* being shut down */
1129                 return;
1130         
1131         sk = conn->ksnc_sock->sk;
1132         tp = sock2tcp_opt(sk);
1133         
1134         lock_sock (sk);
1135         nonagle = tp->nonagle;
1136         tp->nonagle = 1;
1137         release_sock (sk);
1138
1139         oldmm = get_fs ();
1140         set_fs (KERNEL_DS);
1141
1142         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1143                                       (char *)&val, sizeof (val));
1144         LASSERT (rc == 0);
1145
1146         set_fs (oldmm);
1147
1148         lock_sock (sk);
1149         tp->nonagle = nonagle;
1150         release_sock (sk);
1151
1152         ksocknal_putconnsock (conn);
1153 }
1154
1155 void
1156 ksocknal_push_peer (ksock_peer_t *peer)
1157 {
1158         int               index;
1159         int               i;
1160         struct list_head *tmp;
1161         ksock_conn_t     *conn;
1162
1163         for (index = 0; ; index++) {
1164                 read_lock (&ksocknal_data.ksnd_global_lock);
1165
1166                 i = 0;
1167                 conn = NULL;
1168
1169                 list_for_each (tmp, &peer->ksnp_conns) {
1170                         if (i++ == index) {
1171                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1172                                 atomic_inc (&conn->ksnc_refcount);
1173                                 break;
1174                         }
1175                 }
1176
1177                 read_unlock (&ksocknal_data.ksnd_global_lock);
1178
1179                 if (conn == NULL)
1180                         break;
1181
1182                 ksocknal_push_conn (conn);
1183                 ksocknal_put_conn (conn);
1184         }
1185 }
1186
1187 int
1188 ksocknal_push (ptl_nid_t nid)
1189 {
1190         ksock_peer_t      *peer;
1191         struct list_head  *tmp;
1192         int                index;
1193         int                i;
1194         int                j;
1195         int                rc = -ENOENT;
1196
1197         if (nid != PTL_NID_ANY) {
1198                 peer = ksocknal_get_peer (nid);
1199
1200                 if (peer != NULL) {
1201                         rc = 0;
1202                         ksocknal_push_peer (peer);
1203                         ksocknal_put_peer (peer);
1204                 }
1205                 return (rc);
1206         }
1207
1208         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1209                 for (j = 0; ; j++) {
1210                         read_lock (&ksocknal_data.ksnd_global_lock);
1211
1212                         index = 0;
1213                         peer = NULL;
1214
1215                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1216                                 if (index++ == j) {
1217                                         peer = list_entry(tmp, ksock_peer_t,
1218                                                           ksnp_list);
1219                                         atomic_inc (&peer->ksnp_refcount);
1220                                         break;
1221                                 }
1222                         }
1223
1224                         read_unlock (&ksocknal_data.ksnd_global_lock);
1225
1226                         if (peer != NULL) {
1227                                 rc = 0;
1228                                 ksocknal_push_peer (peer);
1229                                 ksocknal_put_peer (peer);
1230                         }
1231                 }
1232
1233         }
1234
1235         return (rc);
1236 }
1237
1238 int
1239 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1240 {
1241         int rc = -EINVAL;
1242
1243         LASSERT (pcfg != NULL);
1244
1245         switch(pcfg->pcfg_command) {
1246         case NAL_CMD_GET_AUTOCONN: {
1247                 ksock_route_t *route = ksocknal_get_route_by_idx (pcfg->pcfg_count);
1248
1249                 if (route == NULL)
1250                         rc = -ENOENT;
1251                 else {
1252                         rc = 0;
1253                         pcfg->pcfg_nid   = route->ksnr_peer->ksnp_nid;
1254                         pcfg->pcfg_id    = route->ksnr_ipaddr;
1255                         pcfg->pcfg_misc  = route->ksnr_port;
1256                         pcfg->pcfg_count = route->ksnr_conn_count;
1257                         pcfg->pcfg_size  = route->ksnr_buffer_size;
1258                         pcfg->pcfg_wait  = route->ksnr_sharecount;
1259                         pcfg->pcfg_flags = (route->ksnr_irq_affinity ? 2 : 0) |
1260                                            (route->ksnr_eager        ? 4 : 0);
1261                         ksocknal_put_route (route);
1262                 }
1263                 break;
1264         }
1265         case NAL_CMD_ADD_AUTOCONN: {
1266                 rc = ksocknal_add_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1267                                          pcfg->pcfg_misc, pcfg->pcfg_size,
1268                                          (pcfg->pcfg_flags & 0x02) != 0,
1269                                          (pcfg->pcfg_flags & 0x04) != 0,
1270                                          (pcfg->pcfg_flags & 0x08) != 0);
1271                 break;
1272         }
1273         case NAL_CMD_DEL_AUTOCONN: {
1274                 rc = ksocknal_del_route (pcfg->pcfg_nid, pcfg->pcfg_id, 
1275                                          (pcfg->pcfg_flags & 1) != 0,
1276                                          (pcfg->pcfg_flags & 2) != 0);
1277                 break;
1278         }
1279         case NAL_CMD_GET_CONN: {
1280                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1281
1282                 if (conn == NULL)
1283                         rc = -ENOENT;
1284                 else {
1285                         rc = 0;
1286                         pcfg->pcfg_nid   = conn->ksnc_peer->ksnp_nid;
1287                         pcfg->pcfg_id    = conn->ksnc_ipaddr;
1288                         pcfg->pcfg_misc  = conn->ksnc_port;
1289                         pcfg->pcfg_flags = conn->ksnc_type;
1290                         ksocknal_put_conn (conn);
1291                 }
1292                 break;
1293         }
1294         case NAL_CMD_REGISTER_PEER_FD: {
1295                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1296                 int            type = pcfg->pcfg_misc;
1297
1298                 if (sock == NULL)
1299                         break;
1300
1301                 switch (type) {
1302                 case SOCKNAL_CONN_NONE:
1303                 case SOCKNAL_CONN_ANY:
1304                 case SOCKNAL_CONN_CONTROL:
1305                 case SOCKNAL_CONN_BULK_IN:
1306                 case SOCKNAL_CONN_BULK_OUT:
1307                         rc = ksocknal_create_conn(NULL, sock, pcfg->pcfg_flags, type);
1308                 default:
1309                         break;
1310                 }
1311                 if (rc != 0)
1312                         fput (sock->file);
1313                 break;
1314         }
1315         case NAL_CMD_CLOSE_CONNECTION: {
1316                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid, 
1317                                                     pcfg->pcfg_id);
1318                 break;
1319         }
1320         case NAL_CMD_REGISTER_MYNID: {
1321                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1322                 break;
1323         }
1324         case NAL_CMD_PUSH_CONNECTION: {
1325                 rc = ksocknal_push (pcfg->pcfg_nid);
1326                 break;
1327         }
1328         }
1329
1330         return rc;
1331 }
1332
1333 void
1334 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1335 {
1336         int          npages = p->fmp_buff_pages;
1337         ksock_fmb_t *fmb;
1338         int          i;
1339
1340         LASSERT (list_empty(&p->fmp_blocked_conns));
1341         LASSERT (p->fmp_nactive_fmbs == 0);
1342         
1343         while (!list_empty(&p->fmp_idle_fmbs)) {
1344
1345                 fmb = list_entry(p->fmp_idle_fmbs.next,
1346                                  ksock_fmb_t, fmb_list);
1347                 
1348                 for (i = 0; i < npages; i++)
1349                         if (fmb->fmb_kiov[i].kiov_page != NULL)
1350                                 __free_page(fmb->fmb_kiov[i].kiov_page);
1351
1352                 list_del(&fmb->fmb_list);
1353                 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
1354         }
1355 }
1356
1357 void
1358 ksocknal_free_buffers (void)
1359 {
1360         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1361         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1362
1363         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1364
1365         if (ksocknal_data.ksnd_schedulers != NULL)
1366                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1367                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1368
1369         PORTAL_FREE (ksocknal_data.ksnd_peers,
1370                      sizeof (struct list_head) * 
1371                      ksocknal_data.ksnd_peer_hash_size);
1372 }
1373
1374 void
1375 ksocknal_api_shutdown (nal_t *nal)
1376 {
1377         int   i;
1378
1379         if (nal->nal_refct != 0) {
1380                 /* This module got the first ref */
1381                 PORTAL_MODULE_UNUSE;
1382                 return;
1383         }
1384
1385         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1386                atomic_read (&portal_kmemory));
1387
1388         LASSERT(nal == &ksocknal_api);
1389
1390         switch (ksocknal_data.ksnd_init) {
1391         default:
1392                 LASSERT (0);
1393
1394         case SOCKNAL_INIT_ALL:
1395                 libcfs_nal_cmd_unregister(SOCKNAL);
1396
1397                 ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB;
1398                 /* fall through */
1399
1400         case SOCKNAL_INIT_LIB:
1401                 /* No more calls to ksocknal_cmd() to create new
1402                  * autoroutes/connections since we're being unloaded. */
1403
1404                 /* Delete all autoroute entries */
1405                 ksocknal_del_route(PTL_NID_ANY, 0, 0, 0);
1406
1407                 /* Delete all connections */
1408                 ksocknal_close_matching_conns (PTL_NID_ANY, 0);
1409                 
1410                 /* Wait for all peer state to clean up */
1411                 i = 2;
1412                 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
1413                         i++;
1414                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1415                                "waiting for %d peers to disconnect\n",
1416                                atomic_read (&ksocknal_data.ksnd_npeers));
1417                         set_current_state (TASK_UNINTERRUPTIBLE);
1418                         schedule_timeout (HZ);
1419                 }
1420
1421                 /* Tell lib we've stopped calling into her. */
1422                 lib_fini(&ksocknal_lib);
1423
1424                 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1425                 /* fall through */
1426
1427         case SOCKNAL_INIT_DATA:
1428                 /* Module refcount only gets to zero when all peers
1429                  * have been closed so all lists must be empty */
1430                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1431                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1432                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1433                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1434                 }
1435                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1436                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1437                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1438                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1439                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1440
1441                 if (ksocknal_data.ksnd_schedulers != NULL)
1442                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1443                                 ksock_sched_t *kss =
1444                                         &ksocknal_data.ksnd_schedulers[i];
1445
1446                                 LASSERT (list_empty (&kss->kss_tx_conns));
1447                                 LASSERT (list_empty (&kss->kss_rx_conns));
1448                                 LASSERT (kss->kss_nconns == 0);
1449                         }
1450
1451                 /* stop router calling me */
1452                 kpr_shutdown (&ksocknal_data.ksnd_router);
1453
1454                 /* flag threads to terminate; wake and wait for them to die */
1455                 ksocknal_data.ksnd_shuttingdown = 1;
1456                 mb();
1457                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1458                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1459
1460                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1461                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1462
1463                 i = 4;
1464                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1465                         i++;
1466                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1467                                "waiting for %d threads to terminate\n",
1468                                 atomic_read (&ksocknal_data.ksnd_nthreads));
1469                         set_current_state (TASK_UNINTERRUPTIBLE);
1470                         schedule_timeout (HZ);
1471                 }
1472
1473                 kpr_deregister (&ksocknal_data.ksnd_router);
1474
1475                 ksocknal_free_buffers();
1476
1477                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
1478                 /* fall through */
1479
1480         case SOCKNAL_INIT_NOTHING:
1481                 break;
1482         }
1483
1484         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1485                atomic_read (&portal_kmemory));
1486
1487         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
1488                atomic_read(&portal_kmemory));
1489 }
1490
1491
1492 void
1493 ksocknal_init_incarnation (void)
1494 {
1495         struct timeval tv;
1496
1497         /* The incarnation number is the time this module loaded and it
1498          * identifies this particular instance of the socknal.  Hopefully
1499          * we won't be able to reboot more frequently than 1MHz for the
1500          * forseeable future :) */
1501         
1502         do_gettimeofday(&tv);
1503         
1504         ksocknal_data.ksnd_incarnation = 
1505                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1506 }
1507
1508 int
1509 ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
1510                       ptl_ni_limits_t *requested_limits,
1511                       ptl_ni_limits_t *actual_limits)
1512 {
1513         ptl_process_id_t  process_id;
1514         int               pkmem = atomic_read(&portal_kmemory);
1515         int               rc;
1516         int               i;
1517         int               j;
1518
1519         LASSERT (nal == &ksocknal_api);
1520
1521         if (nal->nal_refct != 0) {
1522                 if (actual_limits != NULL)
1523                         *actual_limits = ksocknal_lib.libnal_ni.ni_actual_limits;
1524                 /* This module got the first ref */
1525                 PORTAL_MODULE_USE;
1526                 return (PTL_OK);
1527         }
1528
1529         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1530
1531         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1532
1533         ksocknal_init_incarnation();
1534         
1535         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1536         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1537                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1538         if (ksocknal_data.ksnd_peers == NULL)
1539                 return (-ENOMEM);
1540
1541         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1542                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1543
1544         rwlock_init(&ksocknal_data.ksnd_global_lock);
1545
1546         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1547         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1548         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1549         ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
1550
1551         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1552         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1553         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1554         ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
1555
1556         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1557         INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
1558         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1559         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1560         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1561
1562         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1563         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1564         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1565
1566         /* NB memset above zeros whole of ksocknal_data, including
1567          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1568
1569         /* flag lists/ptrs/locks initialised */
1570         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1571
1572         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1573                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1574         if (ksocknal_data.ksnd_schedulers == NULL) {
1575                 ksocknal_api_shutdown (nal);
1576                 return (-ENOMEM);
1577         }
1578
1579         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1580                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1581
1582                 spin_lock_init (&kss->kss_lock);
1583                 INIT_LIST_HEAD (&kss->kss_rx_conns);
1584                 INIT_LIST_HEAD (&kss->kss_tx_conns);
1585 #if SOCKNAL_ZC
1586                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1587 #endif
1588                 init_waitqueue_head (&kss->kss_waitq);
1589         }
1590
1591         /* NB we have to wait to be told our true NID... */
1592         process_id.pid = 0;
1593         process_id.nid = 0;
1594         
1595         rc = lib_init(&ksocknal_lib, nal, process_id,
1596                       requested_limits, actual_limits);
1597         if (rc != PTL_OK) {
1598                 CERROR("lib_init failed: error %d\n", rc);
1599                 ksocknal_api_shutdown (nal);
1600                 return (rc);
1601         }
1602
1603         ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB; // flag lib_init() called
1604
1605         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1606                 rc = ksocknal_thread_start (ksocknal_scheduler,
1607                                             &ksocknal_data.ksnd_schedulers[i]);
1608                 if (rc != 0) {
1609                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1610                                i, rc);
1611                         ksocknal_api_shutdown (nal);
1612                         return (rc);
1613                 }
1614         }
1615
1616         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1617                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1618                 if (rc != 0) {
1619                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1620                         ksocknal_api_shutdown (nal);
1621                         return (rc);
1622                 }
1623         }
1624
1625         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1626         if (rc != 0) {
1627                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1628                 ksocknal_api_shutdown (nal);
1629                 return (rc);
1630         }
1631
1632         rc = kpr_register(&ksocknal_data.ksnd_router,
1633                           &ksocknal_router_interface);
1634         if (rc != 0) {
1635                 CDEBUG(D_NET, "Can't initialise routing interface "
1636                        "(rc = %d): not routing\n", rc);
1637         } else {
1638                 /* Only allocate forwarding buffers if there's a router */
1639
1640                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1641                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
1642                         ksock_fmb_t      *fmb;
1643                         ksock_fmb_pool_t *pool;
1644                         
1645
1646                         if (i < SOCKNAL_SMALL_FWD_NMSGS)
1647                                 pool = &ksocknal_data.ksnd_small_fmp;
1648                         else
1649                                 pool = &ksocknal_data.ksnd_large_fmp;
1650                         
1651                         PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t, 
1652                                                    fmb_kiov[pool->fmp_buff_pages]));
1653                         if (fmb == NULL) {
1654                                 ksocknal_api_shutdown(nal);
1655                                 return (-ENOMEM);
1656                         }
1657
1658                         fmb->fmb_pool = pool;
1659                         
1660                         for (j = 0; j < pool->fmp_buff_pages; j++) {
1661                                 fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);
1662
1663                                 if (fmb->fmb_kiov[j].kiov_page == NULL) {
1664                                         ksocknal_api_shutdown (nal);
1665                                         return (-ENOMEM);
1666                                 }
1667
1668                                 LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
1669                         }
1670
1671                         list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
1672                 }
1673         }
1674
1675         rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
1676         if (rc != 0) {
1677                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1678                 ksocknal_api_shutdown (nal);
1679                 return (rc);
1680         }
1681
1682         /* flag everything initialised */
1683         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1684
1685         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
1686                "(Routing %s, initial mem %d, incarnation "LPX64")\n",
1687                kpr_routing (&ksocknal_data.ksnd_router) ?
1688                "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
1689
1690         return (0);
1691 }
1692
1693 void __exit
1694 ksocknal_module_fini (void)
1695 {
1696 #ifdef CONFIG_SYSCTL
1697         if (ksocknal_tunables.ksnd_sysctl != NULL)
1698                 unregister_sysctl_table (ksocknal_tunables.ksnd_sysctl);
1699 #endif
1700         PtlNIFini(ksocknal_ni);
1701
1702         ptl_unregister_nal(SOCKNAL);
1703 }
1704
1705 int __init
1706 ksocknal_module_init (void)
1707 {
1708         int    rc;
1709
1710         /* packet descriptor must fit in a router descriptor's scratchpad */
1711         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1712         /* the following must be sizeof(int) for proc_dointvec() */
1713         LASSERT(sizeof (ksocknal_tunables.ksnd_io_timeout) == sizeof (int));
1714         LASSERT(sizeof (ksocknal_tunables.ksnd_eager_ack) == sizeof (int));
1715         LASSERT(sizeof (ksocknal_tunables.ksnd_typed_conns) == sizeof (int));
1716         LASSERT(sizeof (ksocknal_tunables.ksnd_min_bulk) == sizeof (int));
1717 #if SOCKNAL_ZC
1718         LASSERT(sizeof (ksocknal_tunables.ksnd_zc_min_frag) == sizeof (int));
1719 #endif
1720         /* check ksnr_connected/connecting field large enough */
1721         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
1722         
1723         ksocknal_api.nal_ni_init = ksocknal_api_startup;
1724         ksocknal_api.nal_ni_fini = ksocknal_api_shutdown;
1725
1726         /* Initialise dynamic tunables to defaults once only */
1727         ksocknal_tunables.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1728         ksocknal_tunables.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
1729         ksocknal_tunables.ksnd_typed_conns = SOCKNAL_TYPED_CONNS;
1730         ksocknal_tunables.ksnd_min_bulk   = SOCKNAL_MIN_BULK;
1731 #if SOCKNAL_ZC
1732         ksocknal_tunables.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1733 #endif
1734
1735         rc = ptl_register_nal(SOCKNAL, &ksocknal_api);
1736         if (rc != PTL_OK) {
1737                 CERROR("Can't register SOCKNAL: %d\n", rc);
1738                 return (-ENOMEM);               /* or something... */
1739         }
1740
1741         /* Pure gateways want the NAL started up at module load time... */
1742         rc = PtlNIInit(SOCKNAL, 0, NULL, NULL, &ksocknal_ni);
1743         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
1744                 ptl_unregister_nal(SOCKNAL);
1745                 return (-ENODEV);
1746         }
1747         
1748 #ifdef CONFIG_SYSCTL
1749         /* Press on regardless even if registering sysctl doesn't work */
1750         ksocknal_tunables.ksnd_sysctl = 
1751                 register_sysctl_table (ksocknal_top_ctl_table, 0);
1752 #endif
1753         return (0);
1754 }
1755
1756 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1757 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1758 MODULE_LICENSE("GPL");
1759
1760 module_init(ksocknal_module_init);
1761 module_exit(ksocknal_module_fini);
1762