Whamcloud - gitweb
landing b_cmobd_merge on HEAD
[fs/lustre-release.git] / lustre / portals / knals / socknal / socknal.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 nal_t                   ksocknal_api;
29 ksock_nal_data_t        ksocknal_data;
30 ptl_handle_ni_t         ksocknal_ni;
31 ksock_tunables_t        ksocknal_tunables;
32
33 kpr_nal_interface_t ksocknal_router_interface = {
34         kprni_nalid:      SOCKNAL,
35         kprni_arg:        &ksocknal_data,
36         kprni_fwd:        ksocknal_fwd_packet,
37         kprni_notify:     ksocknal_notify,
38 };
39
40 #ifdef CONFIG_SYSCTL
41 #define SOCKNAL_SYSCTL  200
42
43 #define SOCKNAL_SYSCTL_TIMEOUT     1
44 #define SOCKNAL_SYSCTL_EAGER_ACK   2
45 #define SOCKNAL_SYSCTL_ZERO_COPY   3
46 #define SOCKNAL_SYSCTL_TYPED       4
47 #define SOCKNAL_SYSCTL_MIN_BULK    5
48
49 static ctl_table ksocknal_ctl_table[] = {
50         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
51          &ksocknal_tunables.ksnd_io_timeout, sizeof (int),
52          0644, NULL, &proc_dointvec},
53         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
54          &ksocknal_tunables.ksnd_eager_ack, sizeof (int),
55          0644, NULL, &proc_dointvec},
56 #if SOCKNAL_ZC
57         {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy", 
58          &ksocknal_tunables.ksnd_zc_min_frag, sizeof (int),
59          0644, NULL, &proc_dointvec},
60 #endif
61         {SOCKNAL_SYSCTL_TYPED, "typed", 
62          &ksocknal_tunables.ksnd_typed_conns, sizeof (int),
63          0644, NULL, &proc_dointvec},
64         {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk", 
65          &ksocknal_tunables.ksnd_min_bulk, sizeof (int),
66          0644, NULL, &proc_dointvec},
67         { 0 }
68 };
69
70 static ctl_table ksocknal_top_ctl_table[] = {
71         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
72         { 0 }
73 };
74 #endif
75
76 int
77 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
78                        void *ret, size_t ret_len)
79 {
80         ksock_nal_data_t *k;
81         nal_cb_t *nal_cb;
82
83         k = nal->nal_data;
84         nal_cb = k->ksnd_nal_cb;
85
86         lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
87         return PTL_OK;
88 }
89
90 void
91 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
92 {
93         ksock_nal_data_t *k;
94         nal_cb_t *nal_cb;
95
96         k = nal->nal_data;
97         nal_cb = k->ksnd_nal_cb;
98         nal_cb->cb_cli(nal_cb,flags);
99 }
100
101 void
102 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
103 {
104         ksock_nal_data_t *k;
105         nal_cb_t *nal_cb;
106
107         k = nal->nal_data;
108         nal_cb = k->ksnd_nal_cb;
109         nal_cb->cb_sti(nal_cb,flags);
110 }
111
112 int
113 ksocknal_api_yield(nal_t *nal, unsigned long *flags, int milliseconds)
114 {
115         /* NB called holding statelock */
116         wait_queue_t       wait;
117         unsigned long      now = jiffies;
118
119         CDEBUG (D_NET, "yield\n");
120
121         if (milliseconds == 0) {
122                 our_cond_resched();
123                 return 0;
124         }
125
126         init_waitqueue_entry(&wait, current);
127         set_current_state (TASK_INTERRUPTIBLE);
128         add_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
129
130         ksocknal_api_unlock(nal, flags);
131
132         if (milliseconds < 0)
133                 schedule ();
134         else
135                 schedule_timeout((milliseconds * HZ) / 1000);
136         
137         ksocknal_api_lock(nal, flags);
138
139         remove_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
140
141         if (milliseconds > 0) {
142                 milliseconds -= ((jiffies - now) * 1000) / HZ;
143                 if (milliseconds < 0)
144                         milliseconds = 0;
145         }
146         
147         return (milliseconds);
148 }
149
150 int
151 ksocknal_set_mynid(ptl_nid_t nid)
152 {
153         lib_ni_t *ni = &ksocknal_lib.ni;
154
155         /* FIXME: we have to do this because we call lib_init() at module
156          * insertion time, which is before we have 'mynid' available.  lib_init
157          * sets the NAL's nid, which it uses to tell other nodes where packets
158          * are coming from.  This is not a very graceful solution to this
159          * problem. */
160
161         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
162                nid, ni->nid);
163
164         ni->nid = nid;
165         return (0);
166 }
167
168 void
169 ksocknal_bind_irq (unsigned int irq)
170 {
171 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
172         int              bind;
173         unsigned long    flags;
174         char             cmdline[64];
175         ksock_irqinfo_t *info;
176         char            *argv[] = {"/bin/sh",
177                                    "-c",
178                                    cmdline,
179                                    NULL};
180         char            *envp[] = {"HOME=/",
181                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
182                                    NULL};
183
184         LASSERT (irq < NR_IRQS);
185         if (irq == 0)                           /* software NIC */
186                 return;
187
188         info = &ksocknal_data.ksnd_irqinfo[irq];
189
190         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
191
192         LASSERT (info->ksni_valid);
193         bind = !info->ksni_bound;
194         info->ksni_bound = 1;
195
196         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
197
198         if (!bind)                              /* bound already */
199                 return;
200
201         snprintf (cmdline, sizeof (cmdline),
202                   "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
203
204         printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
205                 irq, info->ksni_sched, cmdline);
206
207         /* FIXME: Find a better method of setting IRQ affinity...
208          */
209
210         USERMODEHELPER(argv[0], argv, envp);
211 #endif
212 }
213
214 ksock_route_t *
215 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
216                        int irq_affinity, int eager)
217 {
218         ksock_route_t *route;
219
220         PORTAL_ALLOC (route, sizeof (*route));
221         if (route == NULL)
222                 return (NULL);
223
224         atomic_set (&route->ksnr_refcount, 1);
225         route->ksnr_sharecount = 0;
226         route->ksnr_peer = NULL;
227         route->ksnr_timeout = jiffies;
228         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
229         route->ksnr_ipaddr = ipaddr;
230         route->ksnr_port = port;
231         route->ksnr_buffer_size = buffer_size;
232         route->ksnr_irq_affinity = irq_affinity;
233         route->ksnr_eager = eager;
234         route->ksnr_connecting = 0;
235         route->ksnr_connected = 0;
236         route->ksnr_deleted = 0;
237         route->ksnr_conn_count = 0;
238
239         return (route);
240 }
241
242 void
243 ksocknal_destroy_route (ksock_route_t *route)
244 {
245         LASSERT (route->ksnr_sharecount == 0);
246
247         if (route->ksnr_peer != NULL)
248                 ksocknal_put_peer (route->ksnr_peer);
249
250         PORTAL_FREE (route, sizeof (*route));
251 }
252
253 void
254 ksocknal_put_route (ksock_route_t *route)
255 {
256         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
257                 route, atomic_read (&route->ksnr_refcount));
258
259         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
260         if (!atomic_dec_and_test (&route->ksnr_refcount))
261              return;
262
263         ksocknal_destroy_route (route);
264 }
265
266 ksock_peer_t *
267 ksocknal_create_peer (ptl_nid_t nid)
268 {
269         ksock_peer_t *peer;
270
271         LASSERT (nid != PTL_NID_ANY);
272
273         PORTAL_ALLOC (peer, sizeof (*peer));
274         if (peer == NULL)
275                 return (NULL);
276
277         memset (peer, 0, sizeof (*peer));
278
279         peer->ksnp_nid = nid;
280         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
281         peer->ksnp_closing = 0;
282         INIT_LIST_HEAD (&peer->ksnp_conns);
283         INIT_LIST_HEAD (&peer->ksnp_routes);
284         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
285
286         atomic_inc (&ksocknal_data.ksnd_npeers);
287         return (peer);
288 }
289
290 void
291 ksocknal_destroy_peer (ksock_peer_t *peer)
292 {
293         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
294
295         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
296         LASSERT (list_empty (&peer->ksnp_conns));
297         LASSERT (list_empty (&peer->ksnp_routes));
298         LASSERT (list_empty (&peer->ksnp_tx_queue));
299
300         PORTAL_FREE (peer, sizeof (*peer));
301
302         /* NB a peer's connections and autoconnect routes keep a reference
303          * on their peer until they are destroyed, so we can be assured
304          * that _all_ state to do with this peer has been cleaned up when
305          * its refcount drops to zero. */
306         atomic_dec (&ksocknal_data.ksnd_npeers);
307 }
308
309 void
310 ksocknal_put_peer (ksock_peer_t *peer)
311 {
312         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
313                 peer, peer->ksnp_nid,
314                 atomic_read (&peer->ksnp_refcount));
315
316         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
317         if (!atomic_dec_and_test (&peer->ksnp_refcount))
318                 return;
319
320         ksocknal_destroy_peer (peer);
321 }
322
323 ksock_peer_t *
324 ksocknal_find_peer_locked (ptl_nid_t nid)
325 {
326         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
327         struct list_head *tmp;
328         ksock_peer_t     *peer;
329
330         list_for_each (tmp, peer_list) {
331
332                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
333
334                 LASSERT (!peer->ksnp_closing);
335                 LASSERT (!(list_empty (&peer->ksnp_routes) &&
336                            list_empty (&peer->ksnp_conns)));
337
338                 if (peer->ksnp_nid != nid)
339                         continue;
340
341                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
342                        peer, nid, atomic_read (&peer->ksnp_refcount));
343                 return (peer);
344         }
345         return (NULL);
346 }
347
348 ksock_peer_t *
349 ksocknal_get_peer (ptl_nid_t nid)
350 {
351         ksock_peer_t     *peer;
352
353         read_lock (&ksocknal_data.ksnd_global_lock);
354         peer = ksocknal_find_peer_locked (nid);
355         if (peer != NULL)                       /* +1 ref for caller? */
356                 atomic_inc (&peer->ksnp_refcount);
357         read_unlock (&ksocknal_data.ksnd_global_lock);
358
359         return (peer);
360 }
361
362 void
363 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
364 {
365         LASSERT (!peer->ksnp_closing);
366         peer->ksnp_closing = 1;
367         list_del (&peer->ksnp_list);
368         /* lose peerlist's ref */
369         ksocknal_put_peer (peer);
370 }
371
372 ksock_route_t *
373 ksocknal_get_route_by_idx (int index)
374 {
375         ksock_peer_t      *peer;
376         struct list_head  *ptmp;
377         ksock_route_t     *route;
378         struct list_head  *rtmp;
379         int                i;
380
381         read_lock (&ksocknal_data.ksnd_global_lock);
382
383         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
384                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
385                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
386
387                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
388                                    list_empty (&peer->ksnp_conns)));
389
390                         list_for_each (rtmp, &peer->ksnp_routes) {
391                                 if (index-- > 0)
392                                         continue;
393
394                                 route = list_entry (rtmp, ksock_route_t, ksnr_list);
395                                 atomic_inc (&route->ksnr_refcount);
396                                 read_unlock (&ksocknal_data.ksnd_global_lock);
397                                 return (route);
398                         }
399                 }
400         }
401
402         read_unlock (&ksocknal_data.ksnd_global_lock);
403         return (NULL);
404 }
405
406 int
407 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
408                     int bind_irq, int share, int eager)
409 {
410         unsigned long      flags;
411         ksock_peer_t      *peer;
412         ksock_peer_t      *peer2;
413         ksock_route_t     *route;
414         struct list_head  *rtmp;
415         ksock_route_t     *route2;
416         
417         if (nid == PTL_NID_ANY)
418                 return (-EINVAL);
419
420         /* Have a brand new peer ready... */
421         peer = ksocknal_create_peer (nid);
422         if (peer == NULL)
423                 return (-ENOMEM);
424
425         route = ksocknal_create_route (ipaddr, port, bufnob, 
426                                        bind_irq, eager);
427         if (route == NULL) {
428                 ksocknal_put_peer (peer);
429                 return (-ENOMEM);
430         }
431
432         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
433
434         peer2 = ksocknal_find_peer_locked (nid);
435         if (peer2 != NULL) {
436                 ksocknal_put_peer (peer);
437                 peer = peer2;
438         } else {
439                 /* peer table takes existing ref on peer */
440                 list_add (&peer->ksnp_list,
441                           ksocknal_nid2peerlist (nid));
442         }
443
444         route2 = NULL;
445         if (share) {
446                 /* check for existing route to this NID via this ipaddr */
447                 list_for_each (rtmp, &peer->ksnp_routes) {
448                         route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
449                         
450                         if (route2->ksnr_ipaddr == ipaddr)
451                                 break;
452
453                         route2 = NULL;
454                 }
455         }
456
457         if (route2 != NULL) {
458                 ksocknal_put_route (route);
459                 route = route2;
460         } else {
461                 /* route takes a ref on peer */
462                 route->ksnr_peer = peer;
463                 atomic_inc (&peer->ksnp_refcount);
464                 /* peer's route list takes existing ref on route */
465                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
466         }
467         
468         route->ksnr_sharecount++;
469
470         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
471
472         return (0);
473 }
474
475 void
476 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
477 {
478         ksock_peer_t     *peer = route->ksnr_peer;
479         ksock_conn_t     *conn;
480         struct list_head *ctmp;
481         struct list_head *cnxt;
482
483         if (!share)
484                 route->ksnr_sharecount = 0;
485         else {
486                 route->ksnr_sharecount--;
487                 if (route->ksnr_sharecount != 0)
488                         return;
489         }
490
491         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
492                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
493
494                 if (conn->ksnc_route != route)
495                         continue;
496                 
497                 if (!keep_conn) {
498                         ksocknal_close_conn_locked (conn, 0);
499                         continue;
500                 }
501                 
502                 /* keeping the conn; just dissociate it and route... */
503                 conn->ksnc_route = NULL;
504                 ksocknal_put_route (route); /* drop conn's ref on route */
505         }
506         
507         route->ksnr_deleted = 1;
508         list_del (&route->ksnr_list);
509         ksocknal_put_route (route);             /* drop peer's ref */
510
511         if (list_empty (&peer->ksnp_routes) &&
512             list_empty (&peer->ksnp_conns)) {
513                 /* I've just removed the last autoconnect route of a peer
514                  * with no active connections */
515                 ksocknal_unlink_peer_locked (peer);
516         }
517 }
518
519 int
520 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
521 {
522         unsigned long      flags;
523         struct list_head  *ptmp;
524         struct list_head  *pnxt;
525         ksock_peer_t      *peer;
526         struct list_head  *rtmp;
527         struct list_head  *rnxt;
528         ksock_route_t     *route;
529         int                lo;
530         int                hi;
531         int                i;
532         int                rc = -ENOENT;
533
534         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
535
536         if (nid != PTL_NID_ANY)
537                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
538         else {
539                 lo = 0;
540                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
541         }
542
543         for (i = lo; i <= hi; i++) {
544                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
545                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
546
547                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
548                                 continue;
549
550                         list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
551                                 route = list_entry (rtmp, ksock_route_t,
552                                                     ksnr_list);
553
554                                 if (!(ipaddr == 0 ||
555                                       route->ksnr_ipaddr == ipaddr))
556                                         continue;
557
558                                 ksocknal_del_route_locked (route, share, keep_conn);
559                                 rc = 0;         /* matched something */
560                                 if (share)
561                                         goto out;
562                         }
563                 }
564         }
565  out:
566         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
567
568         return (rc);
569 }
570
571 ksock_conn_t *
572 ksocknal_get_conn_by_idx (int index)
573 {
574         ksock_peer_t      *peer;
575         struct list_head  *ptmp;
576         ksock_conn_t      *conn;
577         struct list_head  *ctmp;
578         int                i;
579
580         read_lock (&ksocknal_data.ksnd_global_lock);
581
582         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
583                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
584                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
585
586                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
587                                    list_empty (&peer->ksnp_conns)));
588
589                         list_for_each (ctmp, &peer->ksnp_conns) {
590                                 if (index-- > 0)
591                                         continue;
592
593                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
594                                 atomic_inc (&conn->ksnc_refcount);
595                                 read_unlock (&ksocknal_data.ksnd_global_lock);
596                                 return (conn);
597                         }
598                 }
599         }
600
601         read_unlock (&ksocknal_data.ksnd_global_lock);
602         return (NULL);
603 }
604
605 void
606 ksocknal_get_peer_addr (ksock_conn_t *conn)
607 {
608         struct sockaddr_in sin;
609         int                len = sizeof (sin);
610         int                rc;
611         
612         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
613                                             (struct sockaddr *)&sin, &len, 2);
614         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
615         LASSERT (!conn->ksnc_closing);
616         LASSERT (len <= sizeof (sin));
617
618         if (rc != 0) {
619                 CERROR ("Error %d getting sock peer IP\n", rc);
620                 return;
621         }
622
623         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
624         conn->ksnc_port   = ntohs (sin.sin_port);
625 }
626
627 unsigned int
628 ksocknal_conn_irq (ksock_conn_t *conn)
629 {
630         int                irq = 0;
631         struct dst_entry  *dst;
632
633         dst = sk_dst_get (conn->ksnc_sock->sk);
634         if (dst != NULL) {
635                 if (dst->dev != NULL) {
636                         irq = dst->dev->irq;
637                         if (irq >= NR_IRQS) {
638                                 CERROR ("Unexpected IRQ %x\n", irq);
639                                 irq = 0;
640                         }
641                 }
642                 dst_release (dst);
643         }
644         
645         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
646         LASSERT (!conn->ksnc_closing);
647         return (irq);
648 }
649
650 ksock_sched_t *
651 ksocknal_choose_scheduler_locked (unsigned int irq)
652 {
653         ksock_sched_t    *sched;
654         ksock_irqinfo_t  *info;
655         int               i;
656
657         LASSERT (irq < NR_IRQS);
658         info = &ksocknal_data.ksnd_irqinfo[irq];
659
660         if (irq != 0 &&                         /* hardware NIC */
661             info->ksni_valid) {                 /* already set up */
662                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
663         }
664
665         /* software NIC (irq == 0) || not associated with a scheduler yet.
666          * Choose the CPU with the fewest connections... */
667         sched = &ksocknal_data.ksnd_schedulers[0];
668         for (i = 1; i < SOCKNAL_N_SCHED; i++)
669                 if (sched->kss_nconns >
670                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
671                         sched = &ksocknal_data.ksnd_schedulers[i];
672
673         if (irq != 0) {                         /* Hardware NIC */
674                 info->ksni_valid = 1;
675                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
676
677                 /* no overflow... */
678                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
679         }
680
681         return (sched);
682 }
683
684 int
685 ksocknal_create_conn (ksock_route_t *route, struct socket *sock,
686                       int bind_irq, int type)
687 {
688         ptl_nid_t          nid;
689         __u64              incarnation;
690         unsigned long      flags;
691         ksock_conn_t      *conn;
692         ksock_peer_t      *peer;
693         ksock_peer_t      *peer2;
694         ksock_sched_t     *sched;
695         unsigned int       irq;
696         ksock_tx_t        *tx;
697         int                rc;
698
699         /* NB, sock has an associated file since (a) this connection might
700          * have been created in userland and (b) we need to refcount the
701          * socket so that we don't close it while I/O is being done on
702          * it, and sock->file has that pre-cooked... */
703         LASSERT (sock->file != NULL);
704         LASSERT (file_count(sock->file) > 0);
705
706         rc = ksocknal_setup_sock (sock);
707         if (rc != 0)
708                 return (rc);
709
710         if (route == NULL) {
711                 /* acceptor or explicit connect */
712                 nid = PTL_NID_ANY;
713         } else {
714                 LASSERT (type != SOCKNAL_CONN_NONE);
715                 /* autoconnect: expect this nid on exchange */
716                 nid = route->ksnr_peer->ksnp_nid;
717         }
718
719         rc = ksocknal_hello (sock, &nid, &type, &incarnation);
720         if (rc != 0)
721                 return (rc);
722         
723         peer = NULL;
724         if (route == NULL) {                    /* not autoconnect */
725                 /* Assume this socket connects to a brand new peer */
726                 peer = ksocknal_create_peer (nid);
727                 if (peer == NULL)
728                         return (-ENOMEM);
729         }
730
731         PORTAL_ALLOC(conn, sizeof(*conn));
732         if (conn == NULL) {
733                 if (peer != NULL)
734                         ksocknal_put_peer (peer);
735                 return (-ENOMEM);
736         }
737
738         memset (conn, 0, sizeof (*conn));
739         conn->ksnc_peer = NULL;
740         conn->ksnc_route = NULL;
741         conn->ksnc_sock = sock;
742         conn->ksnc_type = type;
743         conn->ksnc_incarnation = incarnation;
744         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
745         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
746         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
747
748         conn->ksnc_rx_ready = 0;
749         conn->ksnc_rx_scheduled = 0;
750         ksocknal_new_packet (conn, 0);
751
752         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
753         conn->ksnc_tx_ready = 0;
754         conn->ksnc_tx_scheduled = 0;
755         atomic_set (&conn->ksnc_tx_nob, 0);
756
757         ksocknal_get_peer_addr (conn);
758
759         CWARN("New conn nid:"LPX64" ip:%08x/%d incarnation:"LPX64"\n",
760               nid, conn->ksnc_ipaddr, conn->ksnc_port, incarnation);
761
762         irq = ksocknal_conn_irq (conn);
763
764         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
765
766         if (route != NULL) {
767                 /* Autoconnected! */
768                 LASSERT ((route->ksnr_connected & (1 << type)) == 0);
769                 LASSERT ((route->ksnr_connecting & (1 << type)) != 0);
770
771                 if (route->ksnr_deleted) {
772                         /* This conn was autoconnected, but the autoconnect
773                          * route got deleted while it was being
774                          * established! */
775                         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
776                                                  flags);
777                         PORTAL_FREE (conn, sizeof (*conn));
778                         return (-ESTALE);
779                 }
780
781
782                 /* associate conn/route */
783                 conn->ksnc_route = route;
784                 atomic_inc (&route->ksnr_refcount);
785
786                 route->ksnr_connecting &= ~(1 << type);
787                 route->ksnr_connected  |= (1 << type);
788                 route->ksnr_conn_count++;
789                 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
790
791                 peer = route->ksnr_peer;
792         } else {
793                 /* Not an autoconnected connection; see if there is an
794                  * existing peer for this NID */
795                 peer2 = ksocknal_find_peer_locked (nid);
796                 if (peer2 != NULL) {
797                         ksocknal_put_peer (peer);
798                         peer = peer2;
799                 } else {
800                         list_add (&peer->ksnp_list,
801                                   ksocknal_nid2peerlist (nid));
802                         /* peer list takes over existing ref */
803                 }
804         }
805
806         LASSERT (!peer->ksnp_closing);
807
808         conn->ksnc_peer = peer;
809         atomic_inc (&peer->ksnp_refcount);
810         peer->ksnp_last_alive = jiffies;
811         peer->ksnp_error = 0;
812
813         /* Set the deadline for the outgoing HELLO to drain */
814         conn->ksnc_tx_deadline = jiffies +
815                                  ksocknal_tunables.ksnd_io_timeout * HZ;
816
817         list_add (&conn->ksnc_list, &peer->ksnp_conns);
818         atomic_inc (&conn->ksnc_refcount);
819
820         sched = ksocknal_choose_scheduler_locked (irq);
821         sched->kss_nconns++;
822         conn->ksnc_scheduler = sched;
823
824         /* NB my callbacks block while I hold ksnd_global_lock */
825         sock->sk->sk_user_data = conn;
826         sock->sk->sk_data_ready = ksocknal_data_ready;
827         sock->sk->sk_write_space = ksocknal_write_space;
828
829         /* Take all the packets blocking for a connection.
830          * NB, it might be nicer to share these blocked packets among any
831          * other connections that are becoming established, however that
832          * confuses the normal packet launching operation, which selects a
833          * connection and queues the packet on it without needing an
834          * exclusive lock on ksnd_global_lock. */
835         while (!list_empty (&peer->ksnp_tx_queue)) {
836                 tx = list_entry (peer->ksnp_tx_queue.next,
837                                  ksock_tx_t, tx_list);
838
839                 list_del (&tx->tx_list);
840                 ksocknal_queue_tx_locked (tx, conn);
841         }
842
843         rc = ksocknal_close_stale_conns_locked (peer, incarnation);
844
845         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
846
847         if (rc != 0)
848                 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
849                         rc, conn->ksnc_peer->ksnp_nid,
850                         HIPQUAD(conn->ksnc_ipaddr));
851
852         if (bind_irq)                           /* irq binding required */
853                 ksocknal_bind_irq (irq);
854
855         /* Call the callbacks right now to get things going. */
856         ksocknal_data_ready (sock->sk, 0);
857         ksocknal_write_space (sock->sk);
858
859         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64" ip %d.%d.%d.%d\n",
860                conn, conn->ksnc_peer->ksnp_nid, HIPQUAD(conn->ksnc_ipaddr));
861
862         ksocknal_put_conn (conn);
863         return (0);
864 }
865
866 void
867 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
868 {
869         /* This just does the immmediate housekeeping, and queues the
870          * connection for the reaper to terminate.
871          * Caller holds ksnd_global_lock exclusively in irq context */
872         ksock_peer_t   *peer = conn->ksnc_peer;
873         ksock_route_t  *route;
874
875         LASSERT (peer->ksnp_error == 0);
876         LASSERT (!conn->ksnc_closing);
877         conn->ksnc_closing = 1;
878         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
879         
880         route = conn->ksnc_route;
881         if (route != NULL) {
882                 /* dissociate conn from route... */
883                 LASSERT (!route->ksnr_deleted);
884                 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
885                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
886
887                 route->ksnr_connected &= ~(1 << conn->ksnc_type);
888                 conn->ksnc_route = NULL;
889
890                 list_del (&route->ksnr_list);   /* make route least favourite */
891                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
892                 
893                 ksocknal_put_route (route);     /* drop conn's ref on route */
894         }
895
896         /* ksnd_deathrow_conns takes over peer's ref */
897         list_del (&conn->ksnc_list);
898
899         if (list_empty (&peer->ksnp_conns)) {
900                 /* No more connections to this peer */
901
902                 peer->ksnp_error = error;       /* stash last conn close reason */
903
904                 if (list_empty (&peer->ksnp_routes)) {
905                         /* I've just closed last conn belonging to a
906                          * non-autoconnecting peer */
907                         ksocknal_unlink_peer_locked (peer);
908                 }
909         }
910
911         spin_lock (&ksocknal_data.ksnd_reaper_lock);
912
913         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
914         wake_up (&ksocknal_data.ksnd_reaper_waitq);
915                 
916         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
917 }
918
919 void
920 ksocknal_terminate_conn (ksock_conn_t *conn)
921 {
922         /* This gets called by the reaper (guaranteed thread context) to
923          * disengage the socket from its callbacks and close it.
924          * ksnc_refcount will eventually hit zero, and then the reaper will
925          * destroy it. */
926         unsigned long   flags;
927         ksock_peer_t   *peer = conn->ksnc_peer;
928         ksock_sched_t  *sched = conn->ksnc_scheduler;
929         struct timeval  now;
930         time_t          then = 0;
931         int             notify = 0;
932
933         LASSERT(conn->ksnc_closing);
934
935         /* wake up the scheduler to "send" all remaining packets to /dev/null */
936         spin_lock_irqsave(&sched->kss_lock, flags);
937
938         if (!conn->ksnc_tx_scheduled &&
939             !list_empty(&conn->ksnc_tx_queue)){
940                 list_add_tail (&conn->ksnc_tx_list,
941                                &sched->kss_tx_conns);
942                 /* a closing conn is always ready to tx */
943                 conn->ksnc_tx_ready = 1;
944                 conn->ksnc_tx_scheduled = 1;
945                 /* extra ref for scheduler */
946                 atomic_inc (&conn->ksnc_refcount);
947
948                 wake_up (&sched->kss_waitq);
949         }
950
951         spin_unlock_irqrestore (&sched->kss_lock, flags);
952
953         /* serialise with callbacks */
954         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
955
956         /* Remove conn's network callbacks.
957          * NB I _have_ to restore the callback, rather than storing a noop,
958          * since the socket could survive past this module being unloaded!! */
959         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
960         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
961
962         /* A callback could be in progress already; they hold a read lock
963          * on ksnd_global_lock (to serialise with me) and NOOP if
964          * sk_user_data is NULL. */
965         conn->ksnc_sock->sk->sk_user_data = NULL;
966
967         /* OK, so this conn may not be completely disengaged from its
968          * scheduler yet, but it _has_ committed to terminate... */
969         conn->ksnc_scheduler->kss_nconns--;
970
971         if (peer->ksnp_error != 0) {
972                 /* peer's last conn closed in error */
973                 LASSERT (list_empty (&peer->ksnp_conns));
974                 
975                 /* convert peer's last-known-alive timestamp from jiffies */
976                 do_gettimeofday (&now);
977                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
978                 notify = 1;
979         }
980         
981         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
982
983         /* The socket is closed on the final put; either here, or in
984          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
985          * when the connection was established, this will close the socket
986          * immediately, aborting anything buffered in it. Any hung
987          * zero-copy transmits will therefore complete in finite time. */
988         ksocknal_putconnsock (conn);
989
990         if (notify)
991                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
992                             0, then);
993 }
994
995 void
996 ksocknal_destroy_conn (ksock_conn_t *conn)
997 {
998         /* Final coup-de-grace of the reaper */
999         CDEBUG (D_NET, "connection %p\n", conn);
1000
1001         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
1002         LASSERT (conn->ksnc_route == NULL);
1003         LASSERT (!conn->ksnc_tx_scheduled);
1004         LASSERT (!conn->ksnc_rx_scheduled);
1005         LASSERT (list_empty(&conn->ksnc_tx_queue));
1006
1007         /* complete current receive if any */
1008         switch (conn->ksnc_rx_state) {
1009         case SOCKNAL_RX_BODY:
1010                 CERROR("Completing partial receive from "LPX64
1011                        ", ip %d.%d.%d.%d:%d, with error\n",
1012                        conn->ksnc_peer->ksnp_nid,
1013                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1014                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
1015                 break;
1016         case SOCKNAL_RX_BODY_FWD:
1017                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1018                 break;
1019         case SOCKNAL_RX_HEADER:
1020         case SOCKNAL_RX_SLOP:
1021                 break;
1022         default:
1023                 LBUG ();
1024                 break;
1025         }
1026
1027         ksocknal_put_peer (conn->ksnc_peer);
1028
1029         PORTAL_FREE (conn, sizeof (*conn));
1030         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
1031 }
1032
1033 void
1034 ksocknal_put_conn (ksock_conn_t *conn)
1035 {
1036         unsigned long flags;
1037
1038         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1039                 conn, conn->ksnc_peer->ksnp_nid,
1040                 atomic_read (&conn->ksnc_refcount));
1041
1042         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1043         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1044                 return;
1045
1046         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1047
1048         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1049         wake_up (&ksocknal_data.ksnd_reaper_waitq);
1050
1051         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1052 }
1053
1054 int
1055 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1056 {
1057         ksock_conn_t       *conn;
1058         struct list_head   *ctmp;
1059         struct list_head   *cnxt;
1060         int                 count = 0;
1061
1062         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1063                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1064
1065                 if (ipaddr == 0 ||
1066                     conn->ksnc_ipaddr == ipaddr) {
1067                         count++;
1068                         ksocknal_close_conn_locked (conn, why);
1069                 }
1070         }
1071
1072         return (count);
1073 }
1074
1075 int
1076 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1077 {
1078         ksock_conn_t       *conn;
1079         struct list_head   *ctmp;
1080         struct list_head   *cnxt;
1081         int                 count = 0;
1082
1083         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1084                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1085
1086                 if (conn->ksnc_incarnation == incarnation)
1087                         continue;
1088
1089                 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1090                       "incarnation:"LPX64"("LPX64")\n",
1091                       peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1092                       conn->ksnc_incarnation, incarnation);
1093                 
1094                 count++;
1095                 ksocknal_close_conn_locked (conn, -ESTALE);
1096         }
1097
1098         return (count);
1099 }
1100
1101 int
1102 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why) 
1103 {
1104         ksock_peer_t     *peer = conn->ksnc_peer;
1105         __u32             ipaddr = conn->ksnc_ipaddr;
1106         unsigned long     flags;
1107         int               count;
1108
1109         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1110
1111         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1112         
1113         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1114
1115         return (count);
1116 }
1117
1118 int
1119 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1120 {
1121         unsigned long       flags;
1122         ksock_peer_t       *peer;
1123         struct list_head   *ptmp;
1124         struct list_head   *pnxt;
1125         int                 lo;
1126         int                 hi;
1127         int                 i;
1128         int                 count = 0;
1129
1130         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1131
1132         if (nid != PTL_NID_ANY)
1133                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1134         else {
1135                 lo = 0;
1136                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1137         }
1138
1139         for (i = lo; i <= hi; i++) {
1140                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1141
1142                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1143
1144                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1145                                 continue;
1146
1147                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1148                 }
1149         }
1150
1151         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1152
1153         /* wildcards always succeed */
1154         if (nid == PTL_NID_ANY || ipaddr == 0)
1155                 return (0);
1156         
1157         return (count == 0 ? -ENOENT : 0);
1158 }
1159
1160 void
1161 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1162 {
1163         /* The router is telling me she's been notified of a change in
1164          * gateway state.... */
1165
1166         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1167
1168         if (!alive) {
1169                 /* If the gateway crashed, close all open connections... */
1170                 ksocknal_close_matching_conns (gw_nid, 0);
1171                 return;
1172         }
1173         
1174         /* ...otherwise do nothing.  We can only establish new connections
1175          * if we have autroutes, and these connect on demand. */
1176 }
1177
1178 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1179 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1180 {
1181         return &(sk->tp_pinfo.af_tcp);
1182 }
1183 #else
1184 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1185 {
1186         struct tcp_sock *s = (struct tcp_sock *)sk;
1187         return &s->tcp;
1188 }
1189 #endif
1190
1191 void
1192 ksocknal_push_conn (ksock_conn_t *conn)
1193 {
1194         struct sock    *sk;
1195         struct tcp_opt *tp;
1196         int             nonagle;
1197         int             val = 1;
1198         int             rc;
1199         mm_segment_t    oldmm;
1200
1201         rc = ksocknal_getconnsock (conn);
1202         if (rc != 0)                            /* being shut down */
1203                 return;
1204         
1205         sk = conn->ksnc_sock->sk;
1206         tp = sock2tcp_opt(sk);
1207         
1208         lock_sock (sk);
1209         nonagle = tp->nonagle;
1210         tp->nonagle = 1;
1211         release_sock (sk);
1212
1213         oldmm = get_fs ();
1214         set_fs (KERNEL_DS);
1215
1216         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1217                                       (char *)&val, sizeof (val));
1218         LASSERT (rc == 0);
1219
1220         set_fs (oldmm);
1221
1222         lock_sock (sk);
1223         tp->nonagle = nonagle;
1224         release_sock (sk);
1225
1226         ksocknal_putconnsock (conn);
1227 }
1228
1229 void
1230 ksocknal_push_peer (ksock_peer_t *peer)
1231 {
1232         int               index;
1233         int               i;
1234         struct list_head *tmp;
1235         ksock_conn_t     *conn;
1236
1237         for (index = 0; ; index++) {
1238                 read_lock (&ksocknal_data.ksnd_global_lock);
1239
1240                 i = 0;
1241                 conn = NULL;
1242
1243                 list_for_each (tmp, &peer->ksnp_conns) {
1244                         if (i++ == index) {
1245                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1246                                 atomic_inc (&conn->ksnc_refcount);
1247                                 break;
1248                         }
1249                 }
1250
1251                 read_unlock (&ksocknal_data.ksnd_global_lock);
1252
1253                 if (conn == NULL)
1254                         break;
1255
1256                 ksocknal_push_conn (conn);
1257                 ksocknal_put_conn (conn);
1258         }
1259 }
1260
1261 int
1262 ksocknal_push (ptl_nid_t nid)
1263 {
1264         ksock_peer_t      *peer;
1265         struct list_head  *tmp;
1266         int                index;
1267         int                i;
1268         int                j;
1269         int                rc = -ENOENT;
1270
1271         if (nid != PTL_NID_ANY) {
1272                 peer = ksocknal_get_peer (nid);
1273
1274                 if (peer != NULL) {
1275                         rc = 0;
1276                         ksocknal_push_peer (peer);
1277                         ksocknal_put_peer (peer);
1278                 }
1279                 return (rc);
1280         }
1281
1282         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1283                 for (j = 0; ; j++) {
1284                         read_lock (&ksocknal_data.ksnd_global_lock);
1285
1286                         index = 0;
1287                         peer = NULL;
1288
1289                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1290                                 if (index++ == j) {
1291                                         peer = list_entry(tmp, ksock_peer_t,
1292                                                           ksnp_list);
1293                                         atomic_inc (&peer->ksnp_refcount);
1294                                         break;
1295                                 }
1296                         }
1297
1298                         read_unlock (&ksocknal_data.ksnd_global_lock);
1299
1300                         if (peer != NULL) {
1301                                 rc = 0;
1302                                 ksocknal_push_peer (peer);
1303                                 ksocknal_put_peer (peer);
1304                         }
1305                 }
1306
1307         }
1308
1309         return (rc);
1310 }
1311
1312 int
1313 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1314 {
1315         int rc = -EINVAL;
1316
1317         LASSERT (pcfg != NULL);
1318
1319         switch(pcfg->pcfg_command) {
1320         case NAL_CMD_GET_AUTOCONN: {
1321                 ksock_route_t *route = ksocknal_get_route_by_idx (pcfg->pcfg_count);
1322
1323                 if (route == NULL)
1324                         rc = -ENOENT;
1325                 else {
1326                         rc = 0;
1327                         pcfg->pcfg_nid   = route->ksnr_peer->ksnp_nid;
1328                         pcfg->pcfg_id    = route->ksnr_ipaddr;
1329                         pcfg->pcfg_misc  = route->ksnr_port;
1330                         pcfg->pcfg_count = route->ksnr_conn_count;
1331                         pcfg->pcfg_size  = route->ksnr_buffer_size;
1332                         pcfg->pcfg_wait  = route->ksnr_sharecount;
1333                         pcfg->pcfg_flags = (route->ksnr_irq_affinity ? 2 : 0) |
1334                                            (route->ksnr_eager        ? 4 : 0);
1335                         ksocknal_put_route (route);
1336                 }
1337                 break;
1338         }
1339         case NAL_CMD_ADD_AUTOCONN: {
1340                 rc = ksocknal_add_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1341                                          pcfg->pcfg_misc, pcfg->pcfg_size,
1342                                          (pcfg->pcfg_flags & 0x02) != 0,
1343                                          (pcfg->pcfg_flags & 0x04) != 0,
1344                                          (pcfg->pcfg_flags & 0x08) != 0);
1345                 break;
1346         }
1347         case NAL_CMD_DEL_AUTOCONN: {
1348                 rc = ksocknal_del_route (pcfg->pcfg_nid, pcfg->pcfg_id, 
1349                                          (pcfg->pcfg_flags & 1) != 0,
1350                                          (pcfg->pcfg_flags & 2) != 0);
1351                 break;
1352         }
1353         case NAL_CMD_GET_CONN: {
1354                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1355
1356                 if (conn == NULL)
1357                         rc = -ENOENT;
1358                 else {
1359                         rc = 0;
1360                         pcfg->pcfg_nid   = conn->ksnc_peer->ksnp_nid;
1361                         pcfg->pcfg_id    = conn->ksnc_ipaddr;
1362                         pcfg->pcfg_misc  = conn->ksnc_port;
1363                         pcfg->pcfg_flags = conn->ksnc_type;
1364                         ksocknal_put_conn (conn);
1365                 }
1366                 break;
1367         }
1368         case NAL_CMD_REGISTER_PEER_FD: {
1369                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1370                 int            type = pcfg->pcfg_misc;
1371
1372                 if (sock == NULL)
1373                         break;
1374
1375                 switch (type) {
1376                 case SOCKNAL_CONN_NONE:
1377                 case SOCKNAL_CONN_ANY:
1378                 case SOCKNAL_CONN_CONTROL:
1379                 case SOCKNAL_CONN_BULK_IN:
1380                 case SOCKNAL_CONN_BULK_OUT:
1381                         rc = ksocknal_create_conn(NULL, sock, pcfg->pcfg_flags, type);
1382                 default:
1383                         break;
1384                 }
1385                 if (rc != 0)
1386                         fput (sock->file);
1387                 break;
1388         }
1389         case NAL_CMD_CLOSE_CONNECTION: {
1390                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid, 
1391                                                     pcfg->pcfg_id);
1392                 break;
1393         }
1394         case NAL_CMD_REGISTER_MYNID: {
1395                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1396                 break;
1397         }
1398         case NAL_CMD_PUSH_CONNECTION: {
1399                 rc = ksocknal_push (pcfg->pcfg_nid);
1400                 break;
1401         }
1402         }
1403
1404         return rc;
1405 }
1406
1407 void
1408 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1409 {
1410         int          npages = p->fmp_buff_pages;
1411         ksock_fmb_t *fmb;
1412         int          i;
1413
1414         LASSERT (list_empty(&p->fmp_blocked_conns));
1415         LASSERT (p->fmp_nactive_fmbs == 0);
1416         
1417         while (!list_empty(&p->fmp_idle_fmbs)) {
1418
1419                 fmb = list_entry(p->fmp_idle_fmbs.next,
1420                                  ksock_fmb_t, fmb_list);
1421                 
1422                 for (i = 0; i < npages; i++)
1423                         if (fmb->fmb_kiov[i].kiov_page != NULL)
1424                                 __free_page(fmb->fmb_kiov[i].kiov_page);
1425
1426                 list_del(&fmb->fmb_list);
1427                 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
1428         }
1429 }
1430
1431 void
1432 ksocknal_free_buffers (void)
1433 {
1434         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1435         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1436
1437         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1438
1439         if (ksocknal_data.ksnd_schedulers != NULL)
1440                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1441                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1442
1443         PORTAL_FREE (ksocknal_data.ksnd_peers,
1444                      sizeof (struct list_head) * 
1445                      ksocknal_data.ksnd_peer_hash_size);
1446 }
1447
1448 void
1449 ksocknal_api_shutdown (nal_t *nal)
1450 {
1451         int   i;
1452
1453         if (nal->nal_refct != 0) {
1454                 /* This module got the first ref */
1455                 PORTAL_MODULE_UNUSE;
1456                 return;
1457         }
1458
1459         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1460                atomic_read (&portal_kmemory));
1461
1462         LASSERT(nal == &ksocknal_api);
1463
1464         switch (ksocknal_data.ksnd_init) {
1465         default:
1466                 LASSERT (0);
1467
1468         case SOCKNAL_INIT_ALL:
1469                 libcfs_nal_cmd_unregister(SOCKNAL);
1470
1471                 ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB;
1472                 /* fall through */
1473
1474         case SOCKNAL_INIT_LIB:
1475                 /* No more calls to ksocknal_cmd() to create new
1476                  * autoroutes/connections since we're being unloaded. */
1477
1478                 /* Delete all autoroute entries */
1479                 ksocknal_del_route(PTL_NID_ANY, 0, 0, 0);
1480
1481                 /* Delete all connections */
1482                 ksocknal_close_matching_conns (PTL_NID_ANY, 0);
1483                 
1484                 /* Wait for all peer state to clean up */
1485                 i = 2;
1486                 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
1487                         i++;
1488                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1489                                "waiting for %d peers to disconnect\n",
1490                                atomic_read (&ksocknal_data.ksnd_npeers));
1491                         set_current_state (TASK_UNINTERRUPTIBLE);
1492                         schedule_timeout (HZ);
1493                 }
1494
1495                 /* Tell lib we've stopped calling into her. */
1496                 lib_fini(&ksocknal_lib);
1497
1498                 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1499                 /* fall through */
1500
1501         case SOCKNAL_INIT_DATA:
1502                 /* Module refcount only gets to zero when all peers
1503                  * have been closed so all lists must be empty */
1504                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1505                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1506                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1507                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1508                 }
1509                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1510                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1511                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1512                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1513                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1514
1515                 if (ksocknal_data.ksnd_schedulers != NULL)
1516                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1517                                 ksock_sched_t *kss =
1518                                         &ksocknal_data.ksnd_schedulers[i];
1519
1520                                 LASSERT (list_empty (&kss->kss_tx_conns));
1521                                 LASSERT (list_empty (&kss->kss_rx_conns));
1522                                 LASSERT (kss->kss_nconns == 0);
1523                         }
1524
1525                 /* stop router calling me */
1526                 kpr_shutdown (&ksocknal_data.ksnd_router);
1527
1528                 /* flag threads to terminate; wake and wait for them to die */
1529                 ksocknal_data.ksnd_shuttingdown = 1;
1530                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1531                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1532
1533                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1534                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1535
1536                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1537                         CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
1538                                 atomic_read (&ksocknal_data.ksnd_nthreads));
1539                         set_current_state (TASK_UNINTERRUPTIBLE);
1540                         schedule_timeout (HZ);
1541                 }
1542
1543                 kpr_deregister (&ksocknal_data.ksnd_router);
1544
1545                 ksocknal_free_buffers();
1546
1547                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
1548                 /* fall through */
1549
1550         case SOCKNAL_INIT_NOTHING:
1551                 break;
1552         }
1553
1554         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1555                atomic_read (&portal_kmemory));
1556
1557         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
1558                atomic_read(&portal_kmemory));
1559 }
1560
1561
1562 void
1563 ksocknal_init_incarnation (void)
1564 {
1565         struct timeval tv;
1566
1567         /* The incarnation number is the time this module loaded and it
1568          * identifies this particular instance of the socknal.  Hopefully
1569          * we won't be able to reboot more frequently than 1MHz for the
1570          * forseeable future :) */
1571         
1572         do_gettimeofday(&tv);
1573         
1574         ksocknal_data.ksnd_incarnation = 
1575                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1576 }
1577
1578 int
1579 ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
1580                       ptl_ni_limits_t *requested_limits,
1581                       ptl_ni_limits_t *actual_limits)
1582 {
1583         ptl_process_id_t  process_id;
1584         int               pkmem = atomic_read(&portal_kmemory);
1585         int               rc;
1586         int               i;
1587         int               j;
1588
1589         LASSERT (nal == &ksocknal_api);
1590
1591         if (nal->nal_refct != 0) {
1592                 if (actual_limits != NULL)
1593                         *actual_limits = ksocknal_lib.ni.actual_limits;
1594                 /* This module got the first ref */
1595                 PORTAL_MODULE_USE;
1596                 return (PTL_OK);
1597         }
1598
1599         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1600
1601         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1602
1603         ksocknal_init_incarnation();
1604         
1605         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1606         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1607                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1608         if (ksocknal_data.ksnd_peers == NULL)
1609                 return (-ENOMEM);
1610
1611         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1612                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1613
1614         rwlock_init(&ksocknal_data.ksnd_global_lock);
1615
1616         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1617         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1618         init_waitqueue_head(&ksocknal_data.ksnd_yield_waitq);
1619         
1620         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1621         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1622         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1623         ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
1624
1625         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1626         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1627         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1628         ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
1629
1630         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1631         INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
1632         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1633         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1634         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1635
1636         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1637         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1638         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1639
1640         /* NB memset above zeros whole of ksocknal_data, including
1641          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1642
1643         /* flag lists/ptrs/locks initialised */
1644         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1645
1646         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1647                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1648         if (ksocknal_data.ksnd_schedulers == NULL) {
1649                 ksocknal_api_shutdown (&ksocknal_api);
1650                 return (-ENOMEM);
1651         }
1652
1653         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1654                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1655
1656                 spin_lock_init (&kss->kss_lock);
1657                 INIT_LIST_HEAD (&kss->kss_rx_conns);
1658                 INIT_LIST_HEAD (&kss->kss_tx_conns);
1659 #if SOCKNAL_ZC
1660                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1661 #endif
1662                 init_waitqueue_head (&kss->kss_waitq);
1663         }
1664
1665         /* NB we have to wait to be told our true NID... */
1666         process_id.pid = 0;
1667         process_id.nid = 0;
1668         
1669         rc = lib_init(&ksocknal_lib, process_id,
1670                       requested_limits, actual_limits);
1671         if (rc != PTL_OK) {
1672                 CERROR("lib_init failed: error %d\n", rc);
1673                 ksocknal_api_shutdown (&ksocknal_api);
1674                 return (rc);
1675         }
1676
1677         ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB; // flag lib_init() called
1678
1679         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1680                 rc = ksocknal_thread_start (ksocknal_scheduler,
1681                                             &ksocknal_data.ksnd_schedulers[i]);
1682                 if (rc != 0) {
1683                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1684                                i, rc);
1685                         ksocknal_api_shutdown (&ksocknal_api);
1686                         return (rc);
1687                 }
1688         }
1689
1690         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1691                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1692                 if (rc != 0) {
1693                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1694                         ksocknal_api_shutdown (&ksocknal_api);
1695                         return (rc);
1696                 }
1697         }
1698
1699         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1700         if (rc != 0) {
1701                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1702                 ksocknal_api_shutdown (&ksocknal_api);
1703                 return (rc);
1704         }
1705
1706         rc = kpr_register(&ksocknal_data.ksnd_router,
1707                           &ksocknal_router_interface);
1708         if (rc != 0) {
1709                 CDEBUG(D_NET, "Can't initialise routing interface "
1710                        "(rc = %d): not routing\n", rc);
1711         } else {
1712                 /* Only allocate forwarding buffers if there's a router */
1713
1714                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1715                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
1716                         ksock_fmb_t      *fmb;
1717                         ksock_fmb_pool_t *pool;
1718                         
1719
1720                         if (i < SOCKNAL_SMALL_FWD_NMSGS)
1721                                 pool = &ksocknal_data.ksnd_small_fmp;
1722                         else
1723                                 pool = &ksocknal_data.ksnd_large_fmp;
1724                         
1725                         PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t, 
1726                                                    fmb_kiov[pool->fmp_buff_pages]));
1727                         if (fmb == NULL) {
1728                                 ksocknal_api_shutdown(&ksocknal_api);
1729                                 return (-ENOMEM);
1730                         }
1731
1732                         fmb->fmb_pool = pool;
1733                         
1734                         for (j = 0; j < pool->fmp_buff_pages; j++) {
1735                                 fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);
1736
1737                                 if (fmb->fmb_kiov[j].kiov_page == NULL) {
1738                                         ksocknal_api_shutdown (&ksocknal_api);
1739                                         return (-ENOMEM);
1740                                 }
1741
1742                                 LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
1743                         }
1744
1745                         list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
1746                 }
1747         }
1748
1749         rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
1750         if (rc != 0) {
1751                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1752                 ksocknal_api_shutdown (&ksocknal_api);
1753                 return (rc);
1754         }
1755
1756         /* flag everything initialised */
1757         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1758
1759         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
1760                "(Routing %s, initial mem %d, incarnation "LPX64")\n",
1761                kpr_routing (&ksocknal_data.ksnd_router) ?
1762                "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
1763
1764         return (0);
1765 }
1766
1767 void __exit
1768 ksocknal_module_fini (void)
1769 {
1770 #ifdef CONFIG_SYSCTL
1771         if (ksocknal_tunables.ksnd_sysctl != NULL)
1772                 unregister_sysctl_table (ksocknal_tunables.ksnd_sysctl);
1773 #endif
1774         PtlNIFini(ksocknal_ni);
1775
1776         ptl_unregister_nal(SOCKNAL);
1777 }
1778
1779 int __init
1780 ksocknal_module_init (void)
1781 {
1782         int    rc;
1783
1784         /* packet descriptor must fit in a router descriptor's scratchpad */
1785         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1786         /* the following must be sizeof(int) for proc_dointvec() */
1787         LASSERT(sizeof (ksocknal_tunables.ksnd_io_timeout) == sizeof (int));
1788         LASSERT(sizeof (ksocknal_tunables.ksnd_eager_ack) == sizeof (int));
1789         LASSERT(sizeof (ksocknal_tunables.ksnd_typed_conns) == sizeof (int));
1790         LASSERT(sizeof (ksocknal_tunables.ksnd_min_bulk) == sizeof (int));
1791 #if SOCKNAL_ZC
1792         LASSERT(sizeof (ksocknal_tunables.ksnd_zc_min_frag) == sizeof (int));
1793 #endif
1794         /* check ksnr_connected/connecting field large enough */
1795         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
1796         
1797         ksocknal_api.startup  = ksocknal_api_startup;
1798         ksocknal_api.forward  = ksocknal_api_forward;
1799         ksocknal_api.shutdown = ksocknal_api_shutdown;
1800         ksocknal_api.lock     = ksocknal_api_lock;
1801         ksocknal_api.unlock   = ksocknal_api_unlock;
1802         ksocknal_api.nal_data = &ksocknal_data;
1803
1804         ksocknal_lib.nal_data = &ksocknal_data;
1805
1806         /* Initialise dynamic tunables to defaults once only */
1807         ksocknal_tunables.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1808         ksocknal_tunables.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
1809         ksocknal_tunables.ksnd_typed_conns = SOCKNAL_TYPED_CONNS;
1810         ksocknal_tunables.ksnd_min_bulk   = SOCKNAL_MIN_BULK;
1811 #if SOCKNAL_ZC
1812         ksocknal_tunables.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1813 #endif
1814
1815         rc = ptl_register_nal(SOCKNAL, &ksocknal_api);
1816         if (rc != PTL_OK) {
1817                 CERROR("Can't register SOCKNAL: %d\n", rc);
1818                 return (-ENOMEM);               /* or something... */
1819         }
1820
1821         /* Pure gateways want the NAL started up at module load time... */
1822         rc = PtlNIInit(SOCKNAL, 0, NULL, NULL, &ksocknal_ni);
1823         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
1824                 ptl_unregister_nal(SOCKNAL);
1825                 return (-ENODEV);
1826         }
1827         
1828 #ifdef CONFIG_SYSCTL
1829         /* Press on regardless even if registering sysctl doesn't work */
1830         ksocknal_tunables.ksnd_sysctl = 
1831                 register_sysctl_table (ksocknal_top_ctl_table, 0);
1832 #endif
1833         return (0);
1834 }
1835
1836 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1837 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1838 MODULE_LICENSE("GPL");
1839
1840 module_init(ksocknal_module_init);
1841 module_exit(ksocknal_module_fini);
1842