Whamcloud - gitweb
* Added ENOMEM detection and retry on socknal sends
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 ptl_handle_ni_t         ksocknal_ni;
29 static nal_t            ksocknal_api;
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
32 #else
33 static ksock_nal_data_t ksocknal_data;
34 #endif
35
36 kpr_nal_interface_t ksocknal_router_interface = {
37         kprni_nalid:      SOCKNAL,
38         kprni_arg:        &ksocknal_data,
39         kprni_fwd:        ksocknal_fwd_packet,
40         kprni_notify:     ksocknal_notify,
41 };
42
43 #define SOCKNAL_SYSCTL  200
44
45 #define SOCKNAL_SYSCTL_TIMEOUT     1
46 #define SOCKNAL_SYSCTL_EAGER_ACK   2
47 #define SOCKNAL_SYSCTL_ZERO_COPY   3
48 #define SOCKNAL_SYSCTL_TYPED       4
49 #define SOCKNAL_SYSCTL_MIN_BULK    5
50
51 static ctl_table ksocknal_ctl_table[] = {
52         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
53          &ksocknal_data.ksnd_io_timeout, sizeof (int),
54          0644, NULL, &proc_dointvec},
55         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
56          &ksocknal_data.ksnd_eager_ack, sizeof (int),
57          0644, NULL, &proc_dointvec},
58 #if SOCKNAL_ZC
59         {SOCKNAL_SYSCTL_EAGER_ACK, "zero_copy", 
60          &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
61          0644, NULL, &proc_dointvec},
62 #endif
63         {SOCKNAL_SYSCTL_TYPED, "typed", 
64          &ksocknal_data.ksnd_typed_conns, sizeof (int),
65          0644, NULL, &proc_dointvec},
66         {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk", 
67          &ksocknal_data.ksnd_min_bulk, sizeof (int),
68          0644, NULL, &proc_dointvec},
69         { 0 }
70 };
71
72 static ctl_table ksocknal_top_ctl_table[] = {
73         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
74         { 0 }
75 };
76
77 int
78 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
79                        void *ret, size_t ret_len)
80 {
81         ksock_nal_data_t *k;
82         nal_cb_t *nal_cb;
83
84         k = nal->nal_data;
85         nal_cb = k->ksnd_nal_cb;
86
87         lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
88         return PTL_OK;
89 }
90
91 int
92 ksocknal_api_shutdown(nal_t *nal, int ni)
93 {
94         CDEBUG (D_NET, "closing all connections\n");
95
96         ksocknal_del_route (PTL_NID_ANY, 0, 0, 0);
97         ksocknal_close_matching_conns (PTL_NID_ANY, 0);
98         return PTL_OK;
99 }
100
101 void
102 ksocknal_api_yield(nal_t *nal)
103 {
104         our_cond_resched();
105         return;
106 }
107
108 void
109 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
110 {
111         ksock_nal_data_t *k;
112         nal_cb_t *nal_cb;
113
114         k = nal->nal_data;
115         nal_cb = k->ksnd_nal_cb;
116         nal_cb->cb_cli(nal_cb,flags);
117 }
118
119 void
120 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
121 {
122         ksock_nal_data_t *k;
123         nal_cb_t *nal_cb;
124
125         k = nal->nal_data;
126         nal_cb = k->ksnd_nal_cb;
127         nal_cb->cb_sti(nal_cb,flags);
128 }
129
130 nal_t *
131 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
132               ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
133 {
134         CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
135         lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
136         return (&ksocknal_api);
137 }
138
139 /*
140  *  EXTRA functions follow
141  */
142
143 int
144 ksocknal_set_mynid(ptl_nid_t nid)
145 {
146         lib_ni_t *ni = &ksocknal_lib.ni;
147
148         /* FIXME: we have to do this because we call lib_init() at module
149          * insertion time, which is before we have 'mynid' available.  lib_init
150          * sets the NAL's nid, which it uses to tell other nodes where packets
151          * are coming from.  This is not a very graceful solution to this
152          * problem. */
153
154         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
155                nid, ni->nid);
156
157         ni->nid = nid;
158         return (0);
159 }
160
161 void
162 ksocknal_bind_irq (unsigned int irq)
163 {
164 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
165         int              bind;
166         unsigned long    flags;
167         char             cmdline[64];
168         ksock_irqinfo_t *info;
169         char            *argv[] = {"/bin/sh",
170                                    "-c",
171                                    cmdline,
172                                    NULL};
173         char            *envp[] = {"HOME=/",
174                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
175                                    NULL};
176
177         LASSERT (irq < NR_IRQS);
178         if (irq == 0)                           /* software NIC */
179                 return;
180
181         info = &ksocknal_data.ksnd_irqinfo[irq];
182
183         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
184
185         LASSERT (info->ksni_valid);
186         bind = !info->ksni_bound;
187         info->ksni_bound = 1;
188
189         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
190
191         if (!bind)                              /* bound already */
192                 return;
193
194         snprintf (cmdline, sizeof (cmdline),
195                   "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
196
197         printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
198                 irq, info->ksni_sched, cmdline);
199
200         /* FIXME: Find a better method of setting IRQ affinity...
201          */
202
203         call_usermodehelper (argv[0], argv, envp);
204 #endif
205 }
206
207 ksock_route_t *
208 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
209                        int nonagel, int irq_affinity, int eager)
210 {
211         ksock_route_t *route;
212
213         PORTAL_ALLOC (route, sizeof (*route));
214         if (route == NULL)
215                 return (NULL);
216
217         atomic_set (&route->ksnr_refcount, 1);
218         route->ksnr_sharecount = 0;
219         route->ksnr_peer = NULL;
220         route->ksnr_timeout = jiffies;
221         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
222         route->ksnr_ipaddr = ipaddr;
223         route->ksnr_port = port;
224         route->ksnr_buffer_size = buffer_size;
225         route->ksnr_irq_affinity = irq_affinity;
226         route->ksnr_nonagel = nonagel;
227         route->ksnr_eager = eager;
228         route->ksnr_connecting = 0;
229         route->ksnr_connected = 0;
230         route->ksnr_deleted = 0;
231         route->ksnr_conn_count = 0;
232
233         return (route);
234 }
235
236 void
237 ksocknal_destroy_route (ksock_route_t *route)
238 {
239         LASSERT (route->ksnr_sharecount == 0);
240
241         if (route->ksnr_peer != NULL)
242                 ksocknal_put_peer (route->ksnr_peer);
243
244         PORTAL_FREE (route, sizeof (*route));
245 }
246
247 void
248 ksocknal_put_route (ksock_route_t *route)
249 {
250         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
251                 route, atomic_read (&route->ksnr_refcount));
252
253         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
254         if (!atomic_dec_and_test (&route->ksnr_refcount))
255              return;
256
257         ksocknal_destroy_route (route);
258 }
259
260 ksock_peer_t *
261 ksocknal_create_peer (ptl_nid_t nid)
262 {
263         ksock_peer_t *peer;
264
265         LASSERT (nid != PTL_NID_ANY);
266
267         PORTAL_ALLOC (peer, sizeof (*peer));
268         if (peer == NULL)
269                 return (NULL);
270
271         memset (peer, 0, sizeof (*peer));
272
273         peer->ksnp_nid = nid;
274         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
275         peer->ksnp_closing = 0;
276         INIT_LIST_HEAD (&peer->ksnp_conns);
277         INIT_LIST_HEAD (&peer->ksnp_routes);
278         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
279
280         /* Can't unload while peers exist; ensures all I/O has terminated
281          * before unload attempts */
282         PORTAL_MODULE_USE;
283         atomic_inc (&ksocknal_data.ksnd_npeers);
284         return (peer);
285 }
286
287 void
288 ksocknal_destroy_peer (ksock_peer_t *peer)
289 {
290         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
291
292         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
293         LASSERT (list_empty (&peer->ksnp_conns));
294         LASSERT (list_empty (&peer->ksnp_routes));
295         LASSERT (list_empty (&peer->ksnp_tx_queue));
296
297         PORTAL_FREE (peer, sizeof (*peer));
298
299         /* NB a peer's connections and autoconnect routes keep a reference
300          * on their peer until they are destroyed, so we can be assured
301          * that _all_ state to do with this peer has been cleaned up when
302          * its refcount drops to zero. */
303         atomic_dec (&ksocknal_data.ksnd_npeers);
304         PORTAL_MODULE_UNUSE;
305 }
306
307 void
308 ksocknal_put_peer (ksock_peer_t *peer)
309 {
310         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
311                 peer, peer->ksnp_nid,
312                 atomic_read (&peer->ksnp_refcount));
313
314         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
315         if (!atomic_dec_and_test (&peer->ksnp_refcount))
316                 return;
317
318         ksocknal_destroy_peer (peer);
319 }
320
321 ksock_peer_t *
322 ksocknal_find_peer_locked (ptl_nid_t nid)
323 {
324         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
325         struct list_head *tmp;
326         ksock_peer_t     *peer;
327
328         list_for_each (tmp, peer_list) {
329
330                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
331
332                 LASSERT (!peer->ksnp_closing);
333                 LASSERT (!(list_empty (&peer->ksnp_routes) &&
334                            list_empty (&peer->ksnp_conns)));
335
336                 if (peer->ksnp_nid != nid)
337                         continue;
338
339                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
340                        peer, nid, atomic_read (&peer->ksnp_refcount));
341                 return (peer);
342         }
343         return (NULL);
344 }
345
346 ksock_peer_t *
347 ksocknal_get_peer (ptl_nid_t nid)
348 {
349         ksock_peer_t     *peer;
350
351         read_lock (&ksocknal_data.ksnd_global_lock);
352         peer = ksocknal_find_peer_locked (nid);
353         if (peer != NULL)                       /* +1 ref for caller? */
354                 atomic_inc (&peer->ksnp_refcount);
355         read_unlock (&ksocknal_data.ksnd_global_lock);
356
357         return (peer);
358 }
359
360 void
361 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
362 {
363         LASSERT (!peer->ksnp_closing);
364         peer->ksnp_closing = 1;
365         list_del (&peer->ksnp_list);
366         /* lose peerlist's ref */
367         ksocknal_put_peer (peer);
368 }
369
370 ksock_route_t *
371 ksocknal_get_route_by_idx (int index)
372 {
373         ksock_peer_t      *peer;
374         struct list_head  *ptmp;
375         ksock_route_t     *route;
376         struct list_head  *rtmp;
377         int                i;
378
379         read_lock (&ksocknal_data.ksnd_global_lock);
380
381         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
382                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
383                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
384
385                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
386                                    list_empty (&peer->ksnp_conns)));
387
388                         list_for_each (rtmp, &peer->ksnp_routes) {
389                                 if (index-- > 0)
390                                         continue;
391
392                                 route = list_entry (rtmp, ksock_route_t, ksnr_list);
393                                 atomic_inc (&route->ksnr_refcount);
394                                 read_unlock (&ksocknal_data.ksnd_global_lock);
395                                 return (route);
396                         }
397                 }
398         }
399
400         read_unlock (&ksocknal_data.ksnd_global_lock);
401         return (NULL);
402 }
403
404 int
405 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
406                     int nonagle, int bind_irq, int share, int eager)
407 {
408         unsigned long      flags;
409         ksock_peer_t      *peer;
410         ksock_peer_t      *peer2;
411         ksock_route_t     *route;
412         struct list_head  *rtmp;
413         ksock_route_t     *route2;
414         
415         if (nid == PTL_NID_ANY)
416                 return (-EINVAL);
417
418         /* Have a brand new peer ready... */
419         peer = ksocknal_create_peer (nid);
420         if (peer == NULL)
421                 return (-ENOMEM);
422
423         route = ksocknal_create_route (ipaddr, port, bufnob, 
424                                        nonagle, bind_irq, eager);
425         if (route == NULL) {
426                 ksocknal_put_peer (peer);
427                 return (-ENOMEM);
428         }
429
430         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
431
432         peer2 = ksocknal_find_peer_locked (nid);
433         if (peer2 != NULL) {
434                 ksocknal_put_peer (peer);
435                 peer = peer2;
436         } else {
437                 /* peer table takes existing ref on peer */
438                 list_add (&peer->ksnp_list,
439                           ksocknal_nid2peerlist (nid));
440         }
441
442         route2 = NULL;
443         if (share) {
444                 /* check for existing route to this NID via this ipaddr */
445                 list_for_each (rtmp, &peer->ksnp_routes) {
446                         route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
447                         
448                         if (route2->ksnr_ipaddr == ipaddr)
449                                 break;
450
451                         route2 = NULL;
452                 }
453         }
454
455         if (route2 != NULL) {
456                 ksocknal_put_route (route);
457                 route = route2;
458         } else {
459                 /* route takes a ref on peer */
460                 route->ksnr_peer = peer;
461                 atomic_inc (&peer->ksnp_refcount);
462                 /* peer's route list takes existing ref on route */
463                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
464         }
465         
466         route->ksnr_sharecount++;
467
468         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
469
470         return (0);
471 }
472
473 void
474 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
475 {
476         ksock_peer_t     *peer = route->ksnr_peer;
477         ksock_conn_t     *conn;
478         struct list_head *ctmp;
479         struct list_head *cnxt;
480
481         if (!share)
482                 route->ksnr_sharecount = 0;
483         else {
484                 route->ksnr_sharecount--;
485                 if (route->ksnr_sharecount != 0)
486                         return;
487         }
488
489         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
490                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
491
492                 if (conn->ksnc_route != route)
493                         continue;
494                 
495                 if (!keep_conn) {
496                         ksocknal_close_conn_locked (conn, 0);
497                         continue;
498                 }
499                 
500                 /* keeping the conn; just dissociate it and route... */
501                 conn->ksnc_route = NULL;
502                 ksocknal_put_route (route); /* drop conn's ref on route */
503         }
504         
505         route->ksnr_deleted = 1;
506         list_del (&route->ksnr_list);
507         ksocknal_put_route (route);             /* drop peer's ref */
508
509         if (list_empty (&peer->ksnp_routes) &&
510             list_empty (&peer->ksnp_conns)) {
511                 /* I've just removed the last autoconnect route of a peer
512                  * with no active connections */
513                 ksocknal_unlink_peer_locked (peer);
514         }
515 }
516
517 int
518 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
519 {
520         unsigned long      flags;
521         struct list_head  *ptmp;
522         struct list_head  *pnxt;
523         ksock_peer_t      *peer;
524         struct list_head  *rtmp;
525         struct list_head  *rnxt;
526         ksock_route_t     *route;
527         int                lo;
528         int                hi;
529         int                i;
530         int                rc = -ENOENT;
531
532         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
533
534         if (nid != PTL_NID_ANY)
535                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
536         else {
537                 lo = 0;
538                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
539         }
540
541         for (i = lo; i <= hi; i++) {
542                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
543                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
544
545                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
546                                 continue;
547
548                         list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
549                                 route = list_entry (rtmp, ksock_route_t,
550                                                     ksnr_list);
551
552                                 if (!(ipaddr == 0 ||
553                                       route->ksnr_ipaddr == ipaddr))
554                                         continue;
555
556                                 ksocknal_del_route_locked (route, share, keep_conn);
557                                 rc = 0;         /* matched something */
558                                 if (share)
559                                         goto out;
560                         }
561                 }
562         }
563  out:
564         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
565
566         return (rc);
567 }
568
569 ksock_conn_t *
570 ksocknal_get_conn_by_idx (int index)
571 {
572         ksock_peer_t      *peer;
573         struct list_head  *ptmp;
574         ksock_conn_t      *conn;
575         struct list_head  *ctmp;
576         int                i;
577
578         read_lock (&ksocknal_data.ksnd_global_lock);
579
580         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
581                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
582                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
583
584                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
585                                    list_empty (&peer->ksnp_conns)));
586
587                         list_for_each (ctmp, &peer->ksnp_conns) {
588                                 if (index-- > 0)
589                                         continue;
590
591                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
592                                 atomic_inc (&conn->ksnc_refcount);
593                                 read_unlock (&ksocknal_data.ksnd_global_lock);
594                                 return (conn);
595                         }
596                 }
597         }
598
599         read_unlock (&ksocknal_data.ksnd_global_lock);
600         return (NULL);
601 }
602
603 void
604 ksocknal_get_peer_addr (ksock_conn_t *conn)
605 {
606         struct sockaddr_in sin;
607         int                len = sizeof (sin);
608         int                rc;
609         
610         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
611                                             (struct sockaddr *)&sin, &len, 2);
612         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
613         LASSERT (!conn->ksnc_closing);
614         LASSERT (len <= sizeof (sin));
615
616         if (rc != 0) {
617                 CERROR ("Error %d getting sock peer IP\n", rc);
618                 return;
619         }
620
621         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
622         conn->ksnc_port   = ntohs (sin.sin_port);
623 }
624
625 unsigned int
626 ksocknal_conn_irq (ksock_conn_t *conn)
627 {
628         int                irq = 0;
629         struct dst_entry  *dst;
630
631         dst = sk_dst_get (conn->ksnc_sock->sk);
632         if (dst != NULL) {
633                 if (dst->dev != NULL) {
634                         irq = dst->dev->irq;
635                         if (irq >= NR_IRQS) {
636                                 CERROR ("Unexpected IRQ %x\n", irq);
637                                 irq = 0;
638                         }
639                 }
640                 dst_release (dst);
641         }
642         
643         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
644         LASSERT (!conn->ksnc_closing);
645         return (irq);
646 }
647
648 ksock_sched_t *
649 ksocknal_choose_scheduler_locked (unsigned int irq)
650 {
651         ksock_sched_t    *sched;
652         ksock_irqinfo_t  *info;
653         int               i;
654
655         LASSERT (irq < NR_IRQS);
656         info = &ksocknal_data.ksnd_irqinfo[irq];
657
658         if (irq != 0 &&                         /* hardware NIC */
659             info->ksni_valid) {                 /* already set up */
660                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
661         }
662
663         /* software NIC (irq == 0) || not associated with a scheduler yet.
664          * Choose the CPU with the fewest connections... */
665         sched = &ksocknal_data.ksnd_schedulers[0];
666         for (i = 1; i < SOCKNAL_N_SCHED; i++)
667                 if (sched->kss_nconns >
668                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
669                         sched = &ksocknal_data.ksnd_schedulers[i];
670
671         if (irq != 0) {                         /* Hardware NIC */
672                 info->ksni_valid = 1;
673                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
674
675                 /* no overflow... */
676                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
677         }
678
679         return (sched);
680 }
681
682 int
683 ksocknal_create_conn (ksock_route_t *route, struct socket *sock,
684                       int bind_irq, int type)
685 {
686         ptl_nid_t          nid;
687         __u64              incarnation;
688         unsigned long      flags;
689         ksock_conn_t      *conn;
690         ksock_peer_t      *peer;
691         ksock_peer_t      *peer2;
692         ksock_sched_t     *sched;
693         unsigned int       irq;
694         ksock_tx_t        *tx;
695         int                rc;
696
697         /* NB, sock has an associated file since (a) this connection might
698          * have been created in userland and (b) we need to refcount the
699          * socket so that we don't close it while I/O is being done on
700          * it, and sock->file has that pre-cooked... */
701         LASSERT (sock->file != NULL);
702         LASSERT (file_count(sock->file) > 0);
703
704         rc = ksocknal_setup_sock (sock);
705         if (rc != 0)
706                 return (rc);
707
708         if (route == NULL) {
709                 /* acceptor or explicit connect */
710                 nid = PTL_NID_ANY;
711         } else {
712                 LASSERT (type != SOCKNAL_CONN_NONE);
713                 /* autoconnect: expect this nid on exchange */
714                 nid = route->ksnr_peer->ksnp_nid;
715         }
716
717         rc = ksocknal_hello (sock, &nid, &type, &incarnation);
718         if (rc != 0)
719                 return (rc);
720         
721         peer = NULL;
722         if (route == NULL) {                    /* not autoconnect */
723                 /* Assume this socket connects to a brand new peer */
724                 peer = ksocknal_create_peer (nid);
725                 if (peer == NULL)
726                         return (-ENOMEM);
727         }
728
729         PORTAL_ALLOC(conn, sizeof(*conn));
730         if (conn == NULL) {
731                 if (peer != NULL)
732                         ksocknal_put_peer (peer);
733                 return (-ENOMEM);
734         }
735
736         memset (conn, 0, sizeof (*conn));
737         conn->ksnc_peer = NULL;
738         conn->ksnc_route = NULL;
739         conn->ksnc_sock = sock;
740         conn->ksnc_type = type;
741         conn->ksnc_incarnation = incarnation;
742         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
743         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
744         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
745
746         conn->ksnc_rx_ready = 0;
747         conn->ksnc_rx_scheduled = 0;
748         ksocknal_new_packet (conn, 0);
749
750         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
751         conn->ksnc_tx_ready = 0;
752         conn->ksnc_tx_scheduled = 0;
753         atomic_set (&conn->ksnc_tx_nob, 0);
754
755         ksocknal_get_peer_addr (conn);
756
757         irq = ksocknal_conn_irq (conn);
758
759         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
760
761         if (route != NULL) {
762                 /* Autoconnected! */
763                 LASSERT ((route->ksnr_connected & (1 << type)) == 0);
764                 LASSERT ((route->ksnr_connecting & (1 << type)) != 0);
765
766                 if (route->ksnr_deleted) {
767                         /* This conn was autoconnected, but the autoconnect
768                          * route got deleted while it was being
769                          * established! */
770                         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
771                                                  flags);
772                         PORTAL_FREE (conn, sizeof (*conn));
773                         return (-ESTALE);
774                 }
775
776
777                 /* associate conn/route */
778                 conn->ksnc_route = route;
779                 atomic_inc (&route->ksnr_refcount);
780
781                 route->ksnr_connecting &= ~(1 << type);
782                 route->ksnr_connected  |= (1 << type);
783                 route->ksnr_conn_count++;
784                 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
785
786                 peer = route->ksnr_peer;
787         } else {
788                 /* Not an autoconnected connection; see if there is an
789                  * existing peer for this NID */
790                 peer2 = ksocknal_find_peer_locked (nid);
791                 if (peer2 != NULL) {
792                         ksocknal_put_peer (peer);
793                         peer = peer2;
794                 } else {
795                         list_add (&peer->ksnp_list,
796                                   ksocknal_nid2peerlist (nid));
797                         /* peer list takes over existing ref */
798                 }
799         }
800
801         LASSERT (!peer->ksnp_closing);
802
803         conn->ksnc_peer = peer;
804         atomic_inc (&peer->ksnp_refcount);
805         peer->ksnp_last_alive = jiffies;
806         peer->ksnp_error = 0;
807
808         list_add (&conn->ksnc_list, &peer->ksnp_conns);
809         atomic_inc (&conn->ksnc_refcount);
810
811         sched = ksocknal_choose_scheduler_locked (irq);
812         sched->kss_nconns++;
813         conn->ksnc_scheduler = sched;
814
815         /* NB my callbacks block while I hold ksnd_global_lock */
816         sock->sk->sk_user_data = conn;
817         sock->sk->sk_data_ready = ksocknal_data_ready;
818         sock->sk->sk_write_space = ksocknal_write_space;
819
820         /* Take all the packets blocking for a connection.
821          * NB, it might be nicer to share these blocked packets among any
822          * other connections that are becoming established, however that
823          * confuses the normal packet launching operation, which selects a
824          * connection and queues the packet on it without needing an
825          * exclusive lock on ksnd_global_lock. */
826         while (!list_empty (&peer->ksnp_tx_queue)) {
827                 tx = list_entry (peer->ksnp_tx_queue.next,
828                                  ksock_tx_t, tx_list);
829
830                 list_del (&tx->tx_list);
831                 ksocknal_queue_tx_locked (tx, conn);
832         }
833
834         rc = ksocknal_close_stale_conns_locked (peer, incarnation);
835
836         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
837
838         if (rc != 0)
839                 CERROR ("Closed %d stale conns to "LPX64"\n", rc, nid);
840
841         if (bind_irq)                           /* irq binding required */
842                 ksocknal_bind_irq (irq);
843
844         /* Call the callbacks right now to get things going. */
845         ksocknal_data_ready (sock->sk, 0);
846         ksocknal_write_space (sock->sk);
847
848         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64"\n",
849                conn, conn->ksnc_peer->ksnp_nid);
850
851         ksocknal_put_conn (conn);
852         return (0);
853 }
854
855 void
856 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
857 {
858         /* This just does the immmediate housekeeping, and queues the
859          * connection for the reaper to terminate. 
860          * Caller holds ksnd_global_lock exclusively in irq context */
861         ksock_peer_t   *peer = conn->ksnc_peer;
862         ksock_route_t  *route;
863
864         LASSERT (peer->ksnp_error == 0);
865         LASSERT (!conn->ksnc_closing);
866         conn->ksnc_closing = 1;
867         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
868         
869         route = conn->ksnc_route;
870         if (route != NULL) {
871                 /* dissociate conn from route... */
872                 LASSERT (!route->ksnr_deleted);
873                 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
874                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
875
876                 route->ksnr_connected &= ~(1 << conn->ksnc_type);
877                 conn->ksnc_route = NULL;
878
879                 list_del (&route->ksnr_list);   /* make route least favourite */
880                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
881                 
882                 ksocknal_put_route (route);     /* drop conn's ref on route */
883         }
884
885         /* ksnd_deathrow_conns takes over peer's ref */
886         list_del (&conn->ksnc_list);
887
888         if (list_empty (&peer->ksnp_conns)) {
889                 /* No more connections to this peer */
890
891                 peer->ksnp_error = error;       /* stash last conn close reason */
892
893                 if (list_empty (&peer->ksnp_routes)) {
894                         /* I've just closed last conn belonging to a
895                          * non-autoconnecting peer */
896                         ksocknal_unlink_peer_locked (peer);
897                 }
898         }
899
900         spin_lock (&ksocknal_data.ksnd_reaper_lock);
901
902         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
903         wake_up (&ksocknal_data.ksnd_reaper_waitq);
904                 
905         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
906 }
907
908 void
909 ksocknal_terminate_conn (ksock_conn_t *conn)
910 {
911         /* This gets called by the reaper (guaranteed thread context) to
912          * disengage the socket from its callbacks and close it.
913          * ksnc_refcount will eventually hit zero, and then the reaper will
914          * destroy it. */
915         unsigned long   flags;
916         ksock_peer_t   *peer = conn->ksnc_peer;
917         ksock_sched_t  *sched = conn->ksnc_scheduler;
918         struct timeval  now;
919         time_t          then = 0;
920         int             notify = 0;
921
922         LASSERT(conn->ksnc_closing);
923
924         /* wake up the scheduler to "send" all remaining packets to /dev/null */
925         spin_lock_irqsave(&sched->kss_lock, flags);
926
927         if (!conn->ksnc_tx_scheduled &&
928             !list_empty(&conn->ksnc_tx_queue)){
929                 list_add_tail (&conn->ksnc_tx_list,
930                                &sched->kss_tx_conns);
931                 /* a closing conn is always ready to tx */
932                 conn->ksnc_tx_ready = 1;
933                 conn->ksnc_tx_scheduled = 1;
934                 /* extra ref for scheduler */
935                 atomic_inc (&conn->ksnc_refcount);
936
937                 wake_up (&sched->kss_waitq);
938         }
939
940         spin_unlock_irqrestore (&sched->kss_lock, flags);
941
942         /* serialise with callbacks */
943         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
944
945         /* Remove conn's network callbacks.
946          * NB I _have_ to restore the callback, rather than storing a noop,
947          * since the socket could survive past this module being unloaded!! */
948         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
949         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
950
951         /* A callback could be in progress already; they hold a read lock
952          * on ksnd_global_lock (to serialise with me) and NOOP if
953          * sk_user_data is NULL. */
954         conn->ksnc_sock->sk->sk_user_data = NULL;
955
956         /* OK, so this conn may not be completely disengaged from its
957          * scheduler yet, but it _has_ committed to terminate... */
958         conn->ksnc_scheduler->kss_nconns--;
959
960         if (peer->ksnp_error != 0) {
961                 /* peer's last conn closed in error */
962                 LASSERT (list_empty (&peer->ksnp_conns));
963                 
964                 /* convert peer's last-known-alive timestamp from jiffies */
965                 do_gettimeofday (&now);
966                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
967                 notify = 1;
968         }
969         
970         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
971
972         /* The socket is closed on the final put; either here, or in
973          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
974          * when the connection was established, this will close the socket
975          * immediately, aborting anything buffered in it. Any hung
976          * zero-copy transmits will therefore complete in finite time. */
977         ksocknal_putconnsock (conn);
978
979         if (notify)
980                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
981                             0, then);
982 }
983
984 void
985 ksocknal_destroy_conn (ksock_conn_t *conn)
986 {
987         /* Final coup-de-grace of the reaper */
988         CDEBUG (D_NET, "connection %p\n", conn);
989
990         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
991         LASSERT (conn->ksnc_route == NULL);
992         LASSERT (!conn->ksnc_tx_scheduled);
993         LASSERT (!conn->ksnc_rx_scheduled);
994         LASSERT (list_empty(&conn->ksnc_tx_queue));
995
996         /* complete current receive if any */
997         switch (conn->ksnc_rx_state) {
998         case SOCKNAL_RX_BODY:
999 #if 0
1000                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
1001 #else
1002                 CERROR ("Refusing to complete a partial receive from "
1003                         LPX64", ip %08x\n", conn->ksnc_peer->ksnp_nid,
1004                         conn->ksnc_ipaddr);
1005                 CERROR ("This may hang communications and "
1006                         "prevent modules from unloading\n");
1007 #endif
1008                 break;
1009         case SOCKNAL_RX_BODY_FWD:
1010                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1011                 break;
1012         case SOCKNAL_RX_HEADER:
1013         case SOCKNAL_RX_SLOP:
1014                 break;
1015         default:
1016                 LBUG ();
1017                 break;
1018         }
1019
1020         ksocknal_put_peer (conn->ksnc_peer);
1021
1022         PORTAL_FREE (conn, sizeof (*conn));
1023         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
1024 }
1025
1026 void
1027 ksocknal_put_conn (ksock_conn_t *conn)
1028 {
1029         unsigned long flags;
1030
1031         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1032                 conn, conn->ksnc_peer->ksnp_nid,
1033                 atomic_read (&conn->ksnc_refcount));
1034
1035         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1036         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1037                 return;
1038
1039         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1040
1041         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1042         wake_up (&ksocknal_data.ksnd_reaper_waitq);
1043
1044         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1045 }
1046
1047 int
1048 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1049 {
1050         ksock_conn_t       *conn;
1051         struct list_head   *ctmp;
1052         struct list_head   *cnxt;
1053         int                 count = 0;
1054
1055         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1056                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1057
1058                 if (ipaddr == 0 ||
1059                     conn->ksnc_ipaddr == ipaddr) {
1060                         count++;
1061                         ksocknal_close_conn_locked (conn, why);
1062                 }
1063         }
1064
1065         return (count);
1066 }
1067
1068 int
1069 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1070 {
1071         ksock_conn_t       *conn;
1072         struct list_head   *ctmp;
1073         struct list_head   *cnxt;
1074         int                 count = 0;
1075
1076         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1077                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1078
1079                 if (conn->ksnc_incarnation == incarnation)
1080                         continue;
1081                 
1082                 count++;
1083                 ksocknal_close_conn_locked (conn, -ESTALE);
1084         }
1085
1086         return (count);
1087 }
1088
1089 int
1090 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why) 
1091 {
1092         ksock_peer_t     *peer = conn->ksnc_peer;
1093         __u32             ipaddr = conn->ksnc_ipaddr;
1094         unsigned long     flags;
1095         int               count;
1096
1097         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1098
1099         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1100         
1101         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1102
1103         return (count);
1104 }
1105
1106 int
1107 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1108 {
1109         unsigned long       flags;
1110         ksock_peer_t       *peer;
1111         struct list_head   *ptmp;
1112         struct list_head   *pnxt;
1113         int                 lo;
1114         int                 hi;
1115         int                 i;
1116         int                 count = 0;
1117
1118         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1119
1120         if (nid != PTL_NID_ANY)
1121                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1122         else {
1123                 lo = 0;
1124                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1125         }
1126
1127         for (i = lo; i <= hi; i++) {
1128                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1129
1130                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1131
1132                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1133                                 continue;
1134
1135                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1136                 }
1137         }
1138
1139         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1140
1141         /* wildcards always succeed */
1142         if (nid == PTL_NID_ANY || ipaddr == 0)
1143                 return (0);
1144         
1145         return (count == 0 ? -ENOENT : 0);
1146 }
1147
1148 void
1149 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1150 {
1151         /* The router is telling me she's been notified of a change in
1152          * gateway state.... */
1153
1154         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1155
1156         if (!alive) {
1157                 /* If the gateway crashed, close all open connections... */
1158                 ksocknal_close_matching_conns (gw_nid, 0);
1159                 return;
1160         }
1161         
1162         /* ...otherwise do nothing.  We can only establish new connections
1163          * if we have autroutes, and these connect on demand. */
1164 }
1165
1166 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1167 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1168 {
1169         return &(sk->tp_pinfo.af_tcp);
1170 }
1171 #else
1172 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1173 {
1174         struct tcp_sock *s = (struct tcp_sock *)sk;
1175         return &s->tcp;
1176 }
1177 #endif
1178
1179 void
1180 ksocknal_push_conn (ksock_conn_t *conn)
1181 {
1182         struct sock    *sk;
1183         struct tcp_opt *tp;
1184         int             nonagle;
1185         int             val = 1;
1186         int             rc;
1187         mm_segment_t    oldmm;
1188
1189         rc = ksocknal_getconnsock (conn);
1190         if (rc != 0)                            /* being shut down */
1191                 return;
1192         
1193         sk = conn->ksnc_sock->sk;
1194         tp = sock2tcp_opt(sk);
1195         
1196         lock_sock (sk);
1197         nonagle = tp->nonagle;
1198         tp->nonagle = 1;
1199         release_sock (sk);
1200
1201         oldmm = get_fs ();
1202         set_fs (KERNEL_DS);
1203
1204         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1205                                       (char *)&val, sizeof (val));
1206         LASSERT (rc == 0);
1207
1208         set_fs (oldmm);
1209
1210         lock_sock (sk);
1211         tp->nonagle = nonagle;
1212         release_sock (sk);
1213
1214         ksocknal_putconnsock (conn);
1215 }
1216
1217 void
1218 ksocknal_push_peer (ksock_peer_t *peer)
1219 {
1220         int               index;
1221         int               i;
1222         struct list_head *tmp;
1223         ksock_conn_t     *conn;
1224
1225         for (index = 0; ; index++) {
1226                 read_lock (&ksocknal_data.ksnd_global_lock);
1227
1228                 i = 0;
1229                 conn = NULL;
1230
1231                 list_for_each (tmp, &peer->ksnp_conns) {
1232                         if (i++ == index) {
1233                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1234                                 atomic_inc (&conn->ksnc_refcount);
1235                                 break;
1236                         }
1237                 }
1238
1239                 read_unlock (&ksocknal_data.ksnd_global_lock);
1240
1241                 if (conn == NULL)
1242                         break;
1243
1244                 ksocknal_push_conn (conn);
1245                 ksocknal_put_conn (conn);
1246         }
1247 }
1248
1249 int
1250 ksocknal_push (ptl_nid_t nid)
1251 {
1252         ksock_peer_t      *peer;
1253         struct list_head  *tmp;
1254         int                index;
1255         int                i;
1256         int                j;
1257         int                rc = -ENOENT;
1258
1259         if (nid != PTL_NID_ANY) {
1260                 peer = ksocknal_get_peer (nid);
1261
1262                 if (peer != NULL) {
1263                         rc = 0;
1264                         ksocknal_push_peer (peer);
1265                         ksocknal_put_peer (peer);
1266                 }
1267                 return (rc);
1268         }
1269
1270         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1271                 for (j = 0; ; j++) {
1272                         read_lock (&ksocknal_data.ksnd_global_lock);
1273
1274                         index = 0;
1275                         peer = NULL;
1276
1277                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1278                                 if (index++ == j) {
1279                                         peer = list_entry(tmp, ksock_peer_t,
1280                                                           ksnp_list);
1281                                         atomic_inc (&peer->ksnp_refcount);
1282                                         break;
1283                                 }
1284                         }
1285
1286                         read_unlock (&ksocknal_data.ksnd_global_lock);
1287
1288                         if (peer != NULL) {
1289                                 rc = 0;
1290                                 ksocknal_push_peer (peer);
1291                                 ksocknal_put_peer (peer);
1292                         }
1293                 }
1294
1295         }
1296
1297         return (rc);
1298 }
1299
1300 int
1301 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1302 {
1303         int rc = -EINVAL;
1304
1305         LASSERT (pcfg != NULL);
1306
1307         switch(pcfg->pcfg_command) {
1308         case NAL_CMD_GET_AUTOCONN: {
1309                 ksock_route_t *route = ksocknal_get_route_by_idx (pcfg->pcfg_count);
1310
1311                 if (route == NULL)
1312                         rc = -ENOENT;
1313                 else {
1314                         rc = 0;
1315                         pcfg->pcfg_nid   = route->ksnr_peer->ksnp_nid;
1316                         pcfg->pcfg_id    = route->ksnr_ipaddr;
1317                         pcfg->pcfg_misc  = route->ksnr_port;
1318                         pcfg->pcfg_count = route->ksnr_conn_count;
1319                         pcfg->pcfg_size  = route->ksnr_buffer_size;
1320                         pcfg->pcfg_wait  = route->ksnr_sharecount;
1321                         pcfg->pcfg_flags = (route->ksnr_nonagel      ? 1 : 0) |
1322                                            (route->ksnr_irq_affinity ? 2 : 0) |
1323                                            (route->ksnr_eager        ? 4 : 0);
1324                         ksocknal_put_route (route);
1325                 }
1326                 break;
1327         }
1328         case NAL_CMD_ADD_AUTOCONN: {
1329                 rc = ksocknal_add_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1330                                          pcfg->pcfg_misc, pcfg->pcfg_size,
1331                                          (pcfg->pcfg_flags & 0x01) != 0,
1332                                          (pcfg->pcfg_flags & 0x02) != 0,
1333                                          (pcfg->pcfg_flags & 0x04) != 0,
1334                                          (pcfg->pcfg_flags & 0x08) != 0);
1335                 break;
1336         }
1337         case NAL_CMD_DEL_AUTOCONN: {
1338                 rc = ksocknal_del_route (pcfg->pcfg_nid, pcfg->pcfg_id, 
1339                                          (pcfg->pcfg_flags & 1) != 0,
1340                                          (pcfg->pcfg_flags & 2) != 0);
1341                 break;
1342         }
1343         case NAL_CMD_GET_CONN: {
1344                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1345
1346                 if (conn == NULL)
1347                         rc = -ENOENT;
1348                 else {
1349                         rc = 0;
1350                         pcfg->pcfg_nid   = conn->ksnc_peer->ksnp_nid;
1351                         pcfg->pcfg_id    = conn->ksnc_ipaddr;
1352                         pcfg->pcfg_misc  = conn->ksnc_port;
1353                         pcfg->pcfg_flags = conn->ksnc_type;
1354                         ksocknal_put_conn (conn);
1355                 }
1356                 break;
1357         }
1358         case NAL_CMD_REGISTER_PEER_FD: {
1359                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1360                 int            type = pcfg->pcfg_misc;
1361
1362                 if (sock == NULL)
1363                         break;
1364
1365                 switch (type) {
1366                 case SOCKNAL_CONN_NONE:
1367                 case SOCKNAL_CONN_ANY:
1368                 case SOCKNAL_CONN_CONTROL:
1369                 case SOCKNAL_CONN_BULK_IN:
1370                 case SOCKNAL_CONN_BULK_OUT:
1371                         rc = ksocknal_create_conn(NULL, sock, pcfg->pcfg_flags, type);
1372                 default:
1373                         break;
1374                 }
1375                 if (rc != 0)
1376                         fput (sock->file);
1377                 break;
1378         }
1379         case NAL_CMD_CLOSE_CONNECTION: {
1380                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid, 
1381                                                     pcfg->pcfg_id);
1382                 break;
1383         }
1384         case NAL_CMD_REGISTER_MYNID: {
1385                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1386                 break;
1387         }
1388         case NAL_CMD_PUSH_CONNECTION: {
1389                 rc = ksocknal_push (pcfg->pcfg_nid);
1390                 break;
1391         }
1392         }
1393
1394         return rc;
1395 }
1396
1397 void
1398 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1399 {
1400         ksock_fmb_t *fmb;
1401         int          i;
1402
1403         LASSERT (list_empty(&p->fmp_blocked_conns));
1404         LASSERT (p->fmp_nactive_fmbs == 0);
1405         
1406         while (!list_empty(&p->fmp_idle_fmbs)) {
1407
1408                 fmb = list_entry(p->fmp_idle_fmbs.next,
1409                                  ksock_fmb_t, fmb_list);
1410                 
1411                 for (i = 0; i < fmb->fmb_npages; i++)
1412                         if (fmb->fmb_pages[i] != NULL)
1413                                 __free_page(fmb->fmb_pages[i]);
1414                 
1415                 list_del(&fmb->fmb_list);
1416                 PORTAL_FREE(fmb, sizeof(*fmb));
1417         }
1418 }
1419
1420 void
1421 ksocknal_free_buffers (void)
1422 {
1423         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1424         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1425
1426         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1427
1428         if (ksocknal_data.ksnd_schedulers != NULL)
1429                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1430                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1431
1432         PORTAL_FREE (ksocknal_data.ksnd_peers,
1433                      sizeof (struct list_head) * 
1434                      ksocknal_data.ksnd_peer_hash_size);
1435 }
1436
1437 void
1438 ksocknal_module_fini (void)
1439 {
1440         int   i;
1441
1442         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1443                atomic_read (&portal_kmemory));
1444
1445         switch (ksocknal_data.ksnd_init) {
1446         default:
1447                 LASSERT (0);
1448
1449         case SOCKNAL_INIT_ALL:
1450 #if CONFIG_SYSCTL
1451                 if (ksocknal_data.ksnd_sysctl != NULL)
1452                         unregister_sysctl_table (ksocknal_data.ksnd_sysctl);
1453 #endif
1454                 kportal_nal_unregister(SOCKNAL);
1455                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
1456                 /* fall through */
1457
1458         case SOCKNAL_INIT_PTL:
1459                 PtlNIFini(ksocknal_ni);
1460                 lib_fini(&ksocknal_lib);
1461                 /* fall through */
1462
1463         case SOCKNAL_INIT_DATA:
1464                 /* Module refcount only gets to zero when all peers
1465                  * have been closed so all lists must be empty */
1466                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1467                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1468                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1469                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1470                 }
1471                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1472                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1473                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1474                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1475                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1476
1477                 if (ksocknal_data.ksnd_schedulers != NULL)
1478                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1479                                 ksock_sched_t *kss =
1480                                         &ksocknal_data.ksnd_schedulers[i];
1481
1482                                 LASSERT (list_empty (&kss->kss_tx_conns));
1483                                 LASSERT (list_empty (&kss->kss_rx_conns));
1484                                 LASSERT (kss->kss_nconns == 0);
1485                         }
1486
1487                 /* stop router calling me */
1488                 kpr_shutdown (&ksocknal_data.ksnd_router);
1489
1490                 /* flag threads to terminate; wake and wait for them to die */
1491                 ksocknal_data.ksnd_shuttingdown = 1;
1492                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1493                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1494
1495                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1496                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1497
1498                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1499                         CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
1500                                 atomic_read (&ksocknal_data.ksnd_nthreads));
1501                         set_current_state (TASK_UNINTERRUPTIBLE);
1502                         schedule_timeout (HZ);
1503                 }
1504
1505                 kpr_deregister (&ksocknal_data.ksnd_router);
1506
1507                 ksocknal_free_buffers();
1508                 /* fall through */
1509
1510         case SOCKNAL_INIT_NOTHING:
1511                 break;
1512         }
1513
1514         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1515                atomic_read (&portal_kmemory));
1516
1517         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
1518                atomic_read(&portal_kmemory));
1519 }
1520
1521
1522 void __init
1523 ksocknal_init_incarnation (void)
1524 {
1525         struct timeval tv;
1526
1527         /* The incarnation number is the time this module loaded and it
1528          * identifies this particular instance of the socknal.  Hopefully
1529          * we won't be able to reboot more frequently than 1MHz for the
1530          * forseeable future :) */
1531         
1532         do_gettimeofday(&tv);
1533         
1534         ksocknal_data.ksnd_incarnation = 
1535                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1536 }
1537
1538 int __init
1539 ksocknal_module_init (void)
1540 {
1541         int   pkmem = atomic_read(&portal_kmemory);
1542         int   rc;
1543         int   i;
1544         int   j;
1545
1546         /* packet descriptor must fit in a router descriptor's scratchpad */
1547         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1548         /* the following must be sizeof(int) for proc_dointvec() */
1549         LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
1550         LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
1551         /* check ksnr_connected/connecting field large enough */
1552         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
1553         
1554         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1555
1556         ksocknal_api.forward  = ksocknal_api_forward;
1557         ksocknal_api.shutdown = ksocknal_api_shutdown;
1558         ksocknal_api.yield    = ksocknal_api_yield;
1559         ksocknal_api.validate = NULL;           /* our api validate is a NOOP */
1560         ksocknal_api.lock     = ksocknal_api_lock;
1561         ksocknal_api.unlock   = ksocknal_api_unlock;
1562         ksocknal_api.nal_data = &ksocknal_data;
1563
1564         ksocknal_lib.nal_data = &ksocknal_data;
1565
1566         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1567
1568         ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1569         ksocknal_data.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
1570         ksocknal_data.ksnd_typed_conns = SOCKNAL_TYPED_CONNS;
1571         ksocknal_data.ksnd_min_bulk   = SOCKNAL_MIN_BULK;
1572 #if SOCKNAL_ZC
1573         ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1574 #endif
1575         ksocknal_init_incarnation();
1576         
1577         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1578         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1579                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1580         if (ksocknal_data.ksnd_peers == NULL)
1581                 return (-ENOMEM);
1582
1583         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1584                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1585
1586         rwlock_init(&ksocknal_data.ksnd_global_lock);
1587
1588         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1589         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1590
1591         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1592         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1593         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1594
1595         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1596         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1597         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1598
1599         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1600         INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
1601         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1602         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1603         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1604
1605         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1606         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1607         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1608
1609         /* NB memset above zeros whole of ksocknal_data, including
1610          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1611
1612         /* flag lists/ptrs/locks initialised */
1613         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1614
1615         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1616                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1617         if (ksocknal_data.ksnd_schedulers == NULL) {
1618                 ksocknal_module_fini ();
1619                 return (-ENOMEM);
1620         }
1621
1622         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1623                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1624
1625                 spin_lock_init (&kss->kss_lock);
1626                 INIT_LIST_HEAD (&kss->kss_rx_conns);
1627                 INIT_LIST_HEAD (&kss->kss_tx_conns);
1628 #if SOCKNAL_ZC
1629                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1630 #endif
1631                 init_waitqueue_head (&kss->kss_waitq);
1632         }
1633
1634         rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
1635         if (rc != 0) {
1636                 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
1637                 ksocknal_module_fini ();
1638                 return (rc);
1639         }
1640         PtlNIDebug(ksocknal_ni, ~0);
1641
1642         ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
1643
1644         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1645                 rc = ksocknal_thread_start (ksocknal_scheduler,
1646                                             &ksocknal_data.ksnd_schedulers[i]);
1647                 if (rc != 0) {
1648                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1649                                i, rc);
1650                         ksocknal_module_fini ();
1651                         return (rc);
1652                 }
1653         }
1654
1655         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1656                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1657                 if (rc != 0) {
1658                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1659                         ksocknal_module_fini ();
1660                         return (rc);
1661                 }
1662         }
1663
1664         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1665         if (rc != 0) {
1666                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1667                 ksocknal_module_fini ();
1668                 return (rc);
1669         }
1670
1671         rc = kpr_register(&ksocknal_data.ksnd_router,
1672                           &ksocknal_router_interface);
1673         if (rc != 0) {
1674                 CDEBUG(D_NET, "Can't initialise routing interface "
1675                        "(rc = %d): not routing\n", rc);
1676         } else {
1677                 /* Only allocate forwarding buffers if I'm on a gateway */
1678
1679                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1680                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
1681                         ksock_fmb_t *fmb;
1682                         
1683                         PORTAL_ALLOC(fmb, sizeof(*fmb));
1684                         if (fmb == NULL) {
1685                                 ksocknal_module_fini();
1686                                 return (-ENOMEM);
1687                         }
1688
1689                         if (i < SOCKNAL_SMALL_FWD_NMSGS) {
1690                                 fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES;
1691                                 fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp;
1692                         } else {
1693                                 fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES;
1694                                 fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp;
1695                         }
1696
1697                         for (j = 0; j < fmb->fmb_npages; j++) {
1698                                 fmb->fmb_pages[j] = alloc_page(GFP_KERNEL);
1699
1700                                 if (fmb->fmb_pages[j] == NULL) {
1701                                         ksocknal_module_fini ();
1702                                         return (-ENOMEM);
1703                                 }
1704
1705                                 LASSERT(page_address(fmb->fmb_pages[j]) != NULL);
1706                         }
1707
1708                         list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
1709                 }
1710         }
1711
1712         rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
1713         if (rc != 0) {
1714                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1715                 ksocknal_module_fini ();
1716                 return (rc);
1717         }
1718
1719         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
1720
1721 #ifdef CONFIG_SYSCTL
1722         /* Press on regardless even if registering sysctl doesn't work */
1723         ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
1724 #endif
1725         /* flag everything initialised */
1726         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1727
1728         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
1729                "(Routing %s, initial mem %d)\n",
1730                kpr_routing (&ksocknal_data.ksnd_router) ?
1731                "enabled" : "disabled", pkmem);
1732
1733         return (0);
1734 }
1735
1736 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1737 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1738 MODULE_LICENSE("GPL");
1739
1740 module_init(ksocknal_module_init);
1741 module_exit(ksocknal_module_fini);
1742
1743 EXPORT_SYMBOL (ksocknal_ni);