Whamcloud - gitweb
b=2776
[fs/lustre-release.git] / lustre / portals / knals / socknal / socknal.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 ptl_handle_ni_t         ksocknal_ni;
29 static nal_t            ksocknal_api;
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
32 #else
33 static ksock_nal_data_t ksocknal_data;
34 #endif
35
36 kpr_nal_interface_t ksocknal_router_interface = {
37         kprni_nalid:      SOCKNAL,
38         kprni_arg:        &ksocknal_data,
39         kprni_fwd:        ksocknal_fwd_packet,
40         kprni_notify:     ksocknal_notify,
41 };
42
43 #define SOCKNAL_SYSCTL  200
44
45 #define SOCKNAL_SYSCTL_TIMEOUT     1
46 #define SOCKNAL_SYSCTL_EAGER_ACK   2
47 #define SOCKNAL_SYSCTL_ZERO_COPY   3
48 #define SOCKNAL_SYSCTL_TYPED       4
49 #define SOCKNAL_SYSCTL_MIN_BULK    5
50
51 static ctl_table ksocknal_ctl_table[] = {
52         {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
53          &ksocknal_data.ksnd_io_timeout, sizeof (int),
54          0644, NULL, &proc_dointvec},
55         {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
56          &ksocknal_data.ksnd_eager_ack, sizeof (int),
57          0644, NULL, &proc_dointvec},
58 #if SOCKNAL_ZC
59         {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy", 
60          &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
61          0644, NULL, &proc_dointvec},
62 #endif
63         {SOCKNAL_SYSCTL_TYPED, "typed", 
64          &ksocknal_data.ksnd_typed_conns, sizeof (int),
65          0644, NULL, &proc_dointvec},
66         {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk", 
67          &ksocknal_data.ksnd_min_bulk, sizeof (int),
68          0644, NULL, &proc_dointvec},
69         { 0 }
70 };
71
72 static ctl_table ksocknal_top_ctl_table[] = {
73         {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
74         { 0 }
75 };
76
77 int
78 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
79                        void *ret, size_t ret_len)
80 {
81         ksock_nal_data_t *k;
82         nal_cb_t *nal_cb;
83
84         k = nal->nal_data;
85         nal_cb = k->ksnd_nal_cb;
86
87         lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
88         return PTL_OK;
89 }
90
91 int
92 ksocknal_api_shutdown(nal_t *nal, int ni)
93 {
94         return PTL_OK;
95 }
96
97 void
98 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
99 {
100         ksock_nal_data_t *k;
101         nal_cb_t *nal_cb;
102
103         k = nal->nal_data;
104         nal_cb = k->ksnd_nal_cb;
105         nal_cb->cb_cli(nal_cb,flags);
106 }
107
108 void
109 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
110 {
111         ksock_nal_data_t *k;
112         nal_cb_t *nal_cb;
113
114         k = nal->nal_data;
115         nal_cb = k->ksnd_nal_cb;
116         nal_cb->cb_sti(nal_cb,flags);
117 }
118
119 int
120 ksocknal_api_yield(nal_t *nal, unsigned long *flags, int milliseconds)
121 {
122         /* NB called holding statelock */
123         wait_queue_t       wait;
124         unsigned long      now = jiffies;
125
126         CDEBUG (D_NET, "yield\n");
127
128         if (milliseconds == 0) {
129                 our_cond_resched();
130                 return 0;
131         }
132
133         init_waitqueue_entry(&wait, current);
134         set_current_state (TASK_INTERRUPTIBLE);
135         add_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
136
137         ksocknal_api_unlock(nal, flags);
138
139         if (milliseconds < 0)
140                 schedule ();
141         else
142                 schedule_timeout((milliseconds * HZ) / 1000);
143         
144         ksocknal_api_lock(nal, flags);
145
146         remove_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
147
148         if (milliseconds > 0) {
149                 milliseconds -= ((jiffies - now) * 1000) / HZ;
150                 if (milliseconds < 0)
151                         milliseconds = 0;
152         }
153         
154         return (milliseconds);
155 }
156
157 nal_t *
158 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
159               ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
160 {
161         CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
162         lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
163         return (&ksocknal_api);
164 }
165
166 /*
167  *  EXTRA functions follow
168  */
169
170 int
171 ksocknal_set_mynid(ptl_nid_t nid)
172 {
173         lib_ni_t *ni = &ksocknal_lib.ni;
174
175         /* FIXME: we have to do this because we call lib_init() at module
176          * insertion time, which is before we have 'mynid' available.  lib_init
177          * sets the NAL's nid, which it uses to tell other nodes where packets
178          * are coming from.  This is not a very graceful solution to this
179          * problem. */
180
181         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
182                nid, ni->nid);
183
184         ni->nid = nid;
185         return (0);
186 }
187
188 void
189 ksocknal_bind_irq (unsigned int irq)
190 {
191 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
192         int              bind;
193         unsigned long    flags;
194         char             cmdline[64];
195         ksock_irqinfo_t *info;
196         char            *argv[] = {"/bin/sh",
197                                    "-c",
198                                    cmdline,
199                                    NULL};
200         char            *envp[] = {"HOME=/",
201                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
202                                    NULL};
203
204         LASSERT (irq < NR_IRQS);
205         if (irq == 0)                           /* software NIC */
206                 return;
207
208         info = &ksocknal_data.ksnd_irqinfo[irq];
209
210         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
211
212         LASSERT (info->ksni_valid);
213         bind = !info->ksni_bound;
214         info->ksni_bound = 1;
215
216         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
217
218         if (!bind)                              /* bound already */
219                 return;
220
221         snprintf (cmdline, sizeof (cmdline),
222                   "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
223
224         printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
225                 irq, info->ksni_sched, cmdline);
226
227         /* FIXME: Find a better method of setting IRQ affinity...
228          */
229
230         USERMODEHELPER(argv[0], argv, envp);
231 #endif
232 }
233
234 ksock_route_t *
235 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
236                        int irq_affinity, int eager)
237 {
238         ksock_route_t *route;
239
240         PORTAL_ALLOC (route, sizeof (*route));
241         if (route == NULL)
242                 return (NULL);
243
244         atomic_set (&route->ksnr_refcount, 1);
245         route->ksnr_sharecount = 0;
246         route->ksnr_peer = NULL;
247         route->ksnr_timeout = jiffies;
248         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
249         route->ksnr_ipaddr = ipaddr;
250         route->ksnr_port = port;
251         route->ksnr_buffer_size = buffer_size;
252         route->ksnr_irq_affinity = irq_affinity;
253         route->ksnr_eager = eager;
254         route->ksnr_connecting = 0;
255         route->ksnr_connected = 0;
256         route->ksnr_deleted = 0;
257         route->ksnr_conn_count = 0;
258
259         return (route);
260 }
261
262 void
263 ksocknal_destroy_route (ksock_route_t *route)
264 {
265         LASSERT (route->ksnr_sharecount == 0);
266
267         if (route->ksnr_peer != NULL)
268                 ksocknal_put_peer (route->ksnr_peer);
269
270         PORTAL_FREE (route, sizeof (*route));
271 }
272
273 void
274 ksocknal_put_route (ksock_route_t *route)
275 {
276         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
277                 route, atomic_read (&route->ksnr_refcount));
278
279         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
280         if (!atomic_dec_and_test (&route->ksnr_refcount))
281              return;
282
283         ksocknal_destroy_route (route);
284 }
285
286 ksock_peer_t *
287 ksocknal_create_peer (ptl_nid_t nid)
288 {
289         ksock_peer_t *peer;
290
291         LASSERT (nid != PTL_NID_ANY);
292
293         PORTAL_ALLOC (peer, sizeof (*peer));
294         if (peer == NULL)
295                 return (NULL);
296
297         memset (peer, 0, sizeof (*peer));
298
299         peer->ksnp_nid = nid;
300         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
301         peer->ksnp_closing = 0;
302         INIT_LIST_HEAD (&peer->ksnp_conns);
303         INIT_LIST_HEAD (&peer->ksnp_routes);
304         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
305
306         atomic_inc (&ksocknal_data.ksnd_npeers);
307         return (peer);
308 }
309
310 void
311 ksocknal_destroy_peer (ksock_peer_t *peer)
312 {
313         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
314
315         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
316         LASSERT (list_empty (&peer->ksnp_conns));
317         LASSERT (list_empty (&peer->ksnp_routes));
318         LASSERT (list_empty (&peer->ksnp_tx_queue));
319
320         PORTAL_FREE (peer, sizeof (*peer));
321
322         /* NB a peer's connections and autoconnect routes keep a reference
323          * on their peer until they are destroyed, so we can be assured
324          * that _all_ state to do with this peer has been cleaned up when
325          * its refcount drops to zero. */
326         atomic_dec (&ksocknal_data.ksnd_npeers);
327 }
328
329 void
330 ksocknal_put_peer (ksock_peer_t *peer)
331 {
332         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
333                 peer, peer->ksnp_nid,
334                 atomic_read (&peer->ksnp_refcount));
335
336         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
337         if (!atomic_dec_and_test (&peer->ksnp_refcount))
338                 return;
339
340         ksocknal_destroy_peer (peer);
341 }
342
343 ksock_peer_t *
344 ksocknal_find_peer_locked (ptl_nid_t nid)
345 {
346         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
347         struct list_head *tmp;
348         ksock_peer_t     *peer;
349
350         list_for_each (tmp, peer_list) {
351
352                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
353
354                 LASSERT (!peer->ksnp_closing);
355                 LASSERT (!(list_empty (&peer->ksnp_routes) &&
356                            list_empty (&peer->ksnp_conns)));
357
358                 if (peer->ksnp_nid != nid)
359                         continue;
360
361                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
362                        peer, nid, atomic_read (&peer->ksnp_refcount));
363                 return (peer);
364         }
365         return (NULL);
366 }
367
368 ksock_peer_t *
369 ksocknal_get_peer (ptl_nid_t nid)
370 {
371         ksock_peer_t     *peer;
372
373         read_lock (&ksocknal_data.ksnd_global_lock);
374         peer = ksocknal_find_peer_locked (nid);
375         if (peer != NULL)                       /* +1 ref for caller? */
376                 atomic_inc (&peer->ksnp_refcount);
377         read_unlock (&ksocknal_data.ksnd_global_lock);
378
379         return (peer);
380 }
381
382 void
383 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
384 {
385         LASSERT (!peer->ksnp_closing);
386         peer->ksnp_closing = 1;
387         list_del (&peer->ksnp_list);
388         /* lose peerlist's ref */
389         ksocknal_put_peer (peer);
390 }
391
392 ksock_route_t *
393 ksocknal_get_route_by_idx (int index)
394 {
395         ksock_peer_t      *peer;
396         struct list_head  *ptmp;
397         ksock_route_t     *route;
398         struct list_head  *rtmp;
399         int                i;
400
401         read_lock (&ksocknal_data.ksnd_global_lock);
402
403         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
404                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
405                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
406
407                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
408                                    list_empty (&peer->ksnp_conns)));
409
410                         list_for_each (rtmp, &peer->ksnp_routes) {
411                                 if (index-- > 0)
412                                         continue;
413
414                                 route = list_entry (rtmp, ksock_route_t, ksnr_list);
415                                 atomic_inc (&route->ksnr_refcount);
416                                 read_unlock (&ksocknal_data.ksnd_global_lock);
417                                 return (route);
418                         }
419                 }
420         }
421
422         read_unlock (&ksocknal_data.ksnd_global_lock);
423         return (NULL);
424 }
425
426 int
427 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
428                     int bind_irq, int share, int eager)
429 {
430         unsigned long      flags;
431         ksock_peer_t      *peer;
432         ksock_peer_t      *peer2;
433         ksock_route_t     *route;
434         struct list_head  *rtmp;
435         ksock_route_t     *route2;
436         
437         if (nid == PTL_NID_ANY)
438                 return (-EINVAL);
439
440         /* Have a brand new peer ready... */
441         peer = ksocknal_create_peer (nid);
442         if (peer == NULL)
443                 return (-ENOMEM);
444
445         route = ksocknal_create_route (ipaddr, port, bufnob, 
446                                        bind_irq, eager);
447         if (route == NULL) {
448                 ksocknal_put_peer (peer);
449                 return (-ENOMEM);
450         }
451
452         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
453
454         peer2 = ksocknal_find_peer_locked (nid);
455         if (peer2 != NULL) {
456                 ksocknal_put_peer (peer);
457                 peer = peer2;
458         } else {
459                 /* peer table takes existing ref on peer */
460                 list_add (&peer->ksnp_list,
461                           ksocknal_nid2peerlist (nid));
462         }
463
464         route2 = NULL;
465         if (share) {
466                 /* check for existing route to this NID via this ipaddr */
467                 list_for_each (rtmp, &peer->ksnp_routes) {
468                         route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
469                         
470                         if (route2->ksnr_ipaddr == ipaddr)
471                                 break;
472
473                         route2 = NULL;
474                 }
475         }
476
477         if (route2 != NULL) {
478                 ksocknal_put_route (route);
479                 route = route2;
480         } else {
481                 /* route takes a ref on peer */
482                 route->ksnr_peer = peer;
483                 atomic_inc (&peer->ksnp_refcount);
484                 /* peer's route list takes existing ref on route */
485                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
486         }
487         
488         route->ksnr_sharecount++;
489
490         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
491
492         return (0);
493 }
494
495 void
496 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
497 {
498         ksock_peer_t     *peer = route->ksnr_peer;
499         ksock_conn_t     *conn;
500         struct list_head *ctmp;
501         struct list_head *cnxt;
502
503         if (!share)
504                 route->ksnr_sharecount = 0;
505         else {
506                 route->ksnr_sharecount--;
507                 if (route->ksnr_sharecount != 0)
508                         return;
509         }
510
511         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
512                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
513
514                 if (conn->ksnc_route != route)
515                         continue;
516                 
517                 if (!keep_conn) {
518                         ksocknal_close_conn_locked (conn, 0);
519                         continue;
520                 }
521                 
522                 /* keeping the conn; just dissociate it and route... */
523                 conn->ksnc_route = NULL;
524                 ksocknal_put_route (route); /* drop conn's ref on route */
525         }
526         
527         route->ksnr_deleted = 1;
528         list_del (&route->ksnr_list);
529         ksocknal_put_route (route);             /* drop peer's ref */
530
531         if (list_empty (&peer->ksnp_routes) &&
532             list_empty (&peer->ksnp_conns)) {
533                 /* I've just removed the last autoconnect route of a peer
534                  * with no active connections */
535                 ksocknal_unlink_peer_locked (peer);
536         }
537 }
538
539 int
540 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
541 {
542         unsigned long      flags;
543         struct list_head  *ptmp;
544         struct list_head  *pnxt;
545         ksock_peer_t      *peer;
546         struct list_head  *rtmp;
547         struct list_head  *rnxt;
548         ksock_route_t     *route;
549         int                lo;
550         int                hi;
551         int                i;
552         int                rc = -ENOENT;
553
554         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
555
556         if (nid != PTL_NID_ANY)
557                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
558         else {
559                 lo = 0;
560                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
561         }
562
563         for (i = lo; i <= hi; i++) {
564                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
565                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
566
567                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
568                                 continue;
569
570                         list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
571                                 route = list_entry (rtmp, ksock_route_t,
572                                                     ksnr_list);
573
574                                 if (!(ipaddr == 0 ||
575                                       route->ksnr_ipaddr == ipaddr))
576                                         continue;
577
578                                 ksocknal_del_route_locked (route, share, keep_conn);
579                                 rc = 0;         /* matched something */
580                                 if (share)
581                                         goto out;
582                         }
583                 }
584         }
585  out:
586         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
587
588         return (rc);
589 }
590
591 ksock_conn_t *
592 ksocknal_get_conn_by_idx (int index)
593 {
594         ksock_peer_t      *peer;
595         struct list_head  *ptmp;
596         ksock_conn_t      *conn;
597         struct list_head  *ctmp;
598         int                i;
599
600         read_lock (&ksocknal_data.ksnd_global_lock);
601
602         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
603                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
604                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
605
606                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
607                                    list_empty (&peer->ksnp_conns)));
608
609                         list_for_each (ctmp, &peer->ksnp_conns) {
610                                 if (index-- > 0)
611                                         continue;
612
613                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
614                                 atomic_inc (&conn->ksnc_refcount);
615                                 read_unlock (&ksocknal_data.ksnd_global_lock);
616                                 return (conn);
617                         }
618                 }
619         }
620
621         read_unlock (&ksocknal_data.ksnd_global_lock);
622         return (NULL);
623 }
624
625 void
626 ksocknal_get_peer_addr (ksock_conn_t *conn)
627 {
628         struct sockaddr_in sin;
629         int                len = sizeof (sin);
630         int                rc;
631         
632         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
633                                             (struct sockaddr *)&sin, &len, 2);
634         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
635         LASSERT (!conn->ksnc_closing);
636         LASSERT (len <= sizeof (sin));
637
638         if (rc != 0) {
639                 CERROR ("Error %d getting sock peer IP\n", rc);
640                 return;
641         }
642
643         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
644         conn->ksnc_port   = ntohs (sin.sin_port);
645 }
646
647 unsigned int
648 ksocknal_conn_irq (ksock_conn_t *conn)
649 {
650         int                irq = 0;
651         struct dst_entry  *dst;
652
653         dst = sk_dst_get (conn->ksnc_sock->sk);
654         if (dst != NULL) {
655                 if (dst->dev != NULL) {
656                         irq = dst->dev->irq;
657                         if (irq >= NR_IRQS) {
658                                 CERROR ("Unexpected IRQ %x\n", irq);
659                                 irq = 0;
660                         }
661                 }
662                 dst_release (dst);
663         }
664         
665         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
666         LASSERT (!conn->ksnc_closing);
667         return (irq);
668 }
669
670 ksock_sched_t *
671 ksocknal_choose_scheduler_locked (unsigned int irq)
672 {
673         ksock_sched_t    *sched;
674         ksock_irqinfo_t  *info;
675         int               i;
676
677         LASSERT (irq < NR_IRQS);
678         info = &ksocknal_data.ksnd_irqinfo[irq];
679
680         if (irq != 0 &&                         /* hardware NIC */
681             info->ksni_valid) {                 /* already set up */
682                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
683         }
684
685         /* software NIC (irq == 0) || not associated with a scheduler yet.
686          * Choose the CPU with the fewest connections... */
687         sched = &ksocknal_data.ksnd_schedulers[0];
688         for (i = 1; i < SOCKNAL_N_SCHED; i++)
689                 if (sched->kss_nconns >
690                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
691                         sched = &ksocknal_data.ksnd_schedulers[i];
692
693         if (irq != 0) {                         /* Hardware NIC */
694                 info->ksni_valid = 1;
695                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
696
697                 /* no overflow... */
698                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
699         }
700
701         return (sched);
702 }
703
704 int
705 ksocknal_create_conn (ksock_route_t *route, struct socket *sock,
706                       int bind_irq, int type)
707 {
708         ptl_nid_t          nid;
709         __u64              incarnation;
710         unsigned long      flags;
711         ksock_conn_t      *conn;
712         ksock_peer_t      *peer;
713         ksock_peer_t      *peer2;
714         ksock_sched_t     *sched;
715         unsigned int       irq;
716         ksock_tx_t        *tx;
717         int                rc;
718
719         /* NB, sock has an associated file since (a) this connection might
720          * have been created in userland and (b) we need to refcount the
721          * socket so that we don't close it while I/O is being done on
722          * it, and sock->file has that pre-cooked... */
723         LASSERT (sock->file != NULL);
724         LASSERT (file_count(sock->file) > 0);
725
726         rc = ksocknal_setup_sock (sock);
727         if (rc != 0)
728                 return (rc);
729
730         if (route == NULL) {
731                 /* acceptor or explicit connect */
732                 nid = PTL_NID_ANY;
733         } else {
734                 LASSERT (type != SOCKNAL_CONN_NONE);
735                 /* autoconnect: expect this nid on exchange */
736                 nid = route->ksnr_peer->ksnp_nid;
737         }
738
739         rc = ksocknal_hello (sock, &nid, &type, &incarnation);
740         if (rc != 0)
741                 return (rc);
742         
743         peer = NULL;
744         if (route == NULL) {                    /* not autoconnect */
745                 /* Assume this socket connects to a brand new peer */
746                 peer = ksocknal_create_peer (nid);
747                 if (peer == NULL)
748                         return (-ENOMEM);
749         }
750
751         PORTAL_ALLOC(conn, sizeof(*conn));
752         if (conn == NULL) {
753                 if (peer != NULL)
754                         ksocknal_put_peer (peer);
755                 return (-ENOMEM);
756         }
757
758         memset (conn, 0, sizeof (*conn));
759         conn->ksnc_peer = NULL;
760         conn->ksnc_route = NULL;
761         conn->ksnc_sock = sock;
762         conn->ksnc_type = type;
763         conn->ksnc_incarnation = incarnation;
764         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
765         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
766         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
767
768         conn->ksnc_rx_ready = 0;
769         conn->ksnc_rx_scheduled = 0;
770         ksocknal_new_packet (conn, 0);
771
772         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
773         conn->ksnc_tx_ready = 0;
774         conn->ksnc_tx_scheduled = 0;
775         atomic_set (&conn->ksnc_tx_nob, 0);
776
777         ksocknal_get_peer_addr (conn);
778
779         CWARN("New conn nid:"LPX64" ip:%08x/%d incarnation:"LPX64"\n",
780               nid, conn->ksnc_ipaddr, conn->ksnc_port, incarnation);
781
782         irq = ksocknal_conn_irq (conn);
783
784         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
785
786         if (route != NULL) {
787                 /* Autoconnected! */
788                 LASSERT ((route->ksnr_connected & (1 << type)) == 0);
789                 LASSERT ((route->ksnr_connecting & (1 << type)) != 0);
790
791                 if (route->ksnr_deleted) {
792                         /* This conn was autoconnected, but the autoconnect
793                          * route got deleted while it was being
794                          * established! */
795                         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
796                                                  flags);
797                         PORTAL_FREE (conn, sizeof (*conn));
798                         return (-ESTALE);
799                 }
800
801
802                 /* associate conn/route */
803                 conn->ksnc_route = route;
804                 atomic_inc (&route->ksnr_refcount);
805
806                 route->ksnr_connecting &= ~(1 << type);
807                 route->ksnr_connected  |= (1 << type);
808                 route->ksnr_conn_count++;
809                 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
810
811                 peer = route->ksnr_peer;
812         } else {
813                 /* Not an autoconnected connection; see if there is an
814                  * existing peer for this NID */
815                 peer2 = ksocknal_find_peer_locked (nid);
816                 if (peer2 != NULL) {
817                         ksocknal_put_peer (peer);
818                         peer = peer2;
819                 } else {
820                         list_add (&peer->ksnp_list,
821                                   ksocknal_nid2peerlist (nid));
822                         /* peer list takes over existing ref */
823                 }
824         }
825
826         LASSERT (!peer->ksnp_closing);
827
828         conn->ksnc_peer = peer;
829         atomic_inc (&peer->ksnp_refcount);
830         peer->ksnp_last_alive = jiffies;
831         peer->ksnp_error = 0;
832
833         /* Set the deadline for the outgoing HELLO to drain */
834         conn->ksnc_tx_deadline = jiffies +
835                                  ksocknal_data.ksnd_io_timeout * HZ;
836
837         list_add (&conn->ksnc_list, &peer->ksnp_conns);
838         atomic_inc (&conn->ksnc_refcount);
839
840         sched = ksocknal_choose_scheduler_locked (irq);
841         sched->kss_nconns++;
842         conn->ksnc_scheduler = sched;
843
844         /* NB my callbacks block while I hold ksnd_global_lock */
845         sock->sk->sk_user_data = conn;
846         sock->sk->sk_data_ready = ksocknal_data_ready;
847         sock->sk->sk_write_space = ksocknal_write_space;
848
849         /* Take all the packets blocking for a connection.
850          * NB, it might be nicer to share these blocked packets among any
851          * other connections that are becoming established, however that
852          * confuses the normal packet launching operation, which selects a
853          * connection and queues the packet on it without needing an
854          * exclusive lock on ksnd_global_lock. */
855         while (!list_empty (&peer->ksnp_tx_queue)) {
856                 tx = list_entry (peer->ksnp_tx_queue.next,
857                                  ksock_tx_t, tx_list);
858
859                 list_del (&tx->tx_list);
860                 ksocknal_queue_tx_locked (tx, conn);
861         }
862
863         rc = ksocknal_close_stale_conns_locked (peer, incarnation);
864
865         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
866
867         if (rc != 0)
868                 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
869                         rc, conn->ksnc_peer->ksnp_nid,
870                         HIPQUAD(conn->ksnc_ipaddr));
871
872         if (bind_irq)                           /* irq binding required */
873                 ksocknal_bind_irq (irq);
874
875         /* Call the callbacks right now to get things going. */
876         ksocknal_data_ready (sock->sk, 0);
877         ksocknal_write_space (sock->sk);
878
879         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64" ip %d.%d.%d.%d\n",
880                conn, conn->ksnc_peer->ksnp_nid, HIPQUAD(conn->ksnc_ipaddr));
881
882         ksocknal_put_conn (conn);
883         return (0);
884 }
885
886 void
887 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
888 {
889         /* This just does the immmediate housekeeping, and queues the
890          * connection for the reaper to terminate.
891          * Caller holds ksnd_global_lock exclusively in irq context */
892         ksock_peer_t   *peer = conn->ksnc_peer;
893         ksock_route_t  *route;
894
895         LASSERT (peer->ksnp_error == 0);
896         LASSERT (!conn->ksnc_closing);
897         conn->ksnc_closing = 1;
898         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
899         
900         route = conn->ksnc_route;
901         if (route != NULL) {
902                 /* dissociate conn from route... */
903                 LASSERT (!route->ksnr_deleted);
904                 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
905                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
906
907                 route->ksnr_connected &= ~(1 << conn->ksnc_type);
908                 conn->ksnc_route = NULL;
909
910                 list_del (&route->ksnr_list);   /* make route least favourite */
911                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
912                 
913                 ksocknal_put_route (route);     /* drop conn's ref on route */
914         }
915
916         /* ksnd_deathrow_conns takes over peer's ref */
917         list_del (&conn->ksnc_list);
918
919         if (list_empty (&peer->ksnp_conns)) {
920                 /* No more connections to this peer */
921
922                 peer->ksnp_error = error;       /* stash last conn close reason */
923
924                 if (list_empty (&peer->ksnp_routes)) {
925                         /* I've just closed last conn belonging to a
926                          * non-autoconnecting peer */
927                         ksocknal_unlink_peer_locked (peer);
928                 }
929         }
930
931         spin_lock (&ksocknal_data.ksnd_reaper_lock);
932
933         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
934         wake_up (&ksocknal_data.ksnd_reaper_waitq);
935                 
936         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
937 }
938
939 void
940 ksocknal_terminate_conn (ksock_conn_t *conn)
941 {
942         /* This gets called by the reaper (guaranteed thread context) to
943          * disengage the socket from its callbacks and close it.
944          * ksnc_refcount will eventually hit zero, and then the reaper will
945          * destroy it. */
946         unsigned long   flags;
947         ksock_peer_t   *peer = conn->ksnc_peer;
948         ksock_sched_t  *sched = conn->ksnc_scheduler;
949         struct timeval  now;
950         time_t          then = 0;
951         int             notify = 0;
952
953         LASSERT(conn->ksnc_closing);
954
955         /* wake up the scheduler to "send" all remaining packets to /dev/null */
956         spin_lock_irqsave(&sched->kss_lock, flags);
957
958         if (!conn->ksnc_tx_scheduled &&
959             !list_empty(&conn->ksnc_tx_queue)){
960                 list_add_tail (&conn->ksnc_tx_list,
961                                &sched->kss_tx_conns);
962                 /* a closing conn is always ready to tx */
963                 conn->ksnc_tx_ready = 1;
964                 conn->ksnc_tx_scheduled = 1;
965                 /* extra ref for scheduler */
966                 atomic_inc (&conn->ksnc_refcount);
967
968                 wake_up (&sched->kss_waitq);
969         }
970
971         spin_unlock_irqrestore (&sched->kss_lock, flags);
972
973         /* serialise with callbacks */
974         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
975
976         /* Remove conn's network callbacks.
977          * NB I _have_ to restore the callback, rather than storing a noop,
978          * since the socket could survive past this module being unloaded!! */
979         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
980         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
981
982         /* A callback could be in progress already; they hold a read lock
983          * on ksnd_global_lock (to serialise with me) and NOOP if
984          * sk_user_data is NULL. */
985         conn->ksnc_sock->sk->sk_user_data = NULL;
986
987         /* OK, so this conn may not be completely disengaged from its
988          * scheduler yet, but it _has_ committed to terminate... */
989         conn->ksnc_scheduler->kss_nconns--;
990
991         if (peer->ksnp_error != 0) {
992                 /* peer's last conn closed in error */
993                 LASSERT (list_empty (&peer->ksnp_conns));
994                 
995                 /* convert peer's last-known-alive timestamp from jiffies */
996                 do_gettimeofday (&now);
997                 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
998                 notify = 1;
999         }
1000         
1001         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1002
1003         /* The socket is closed on the final put; either here, or in
1004          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1005          * when the connection was established, this will close the socket
1006          * immediately, aborting anything buffered in it. Any hung
1007          * zero-copy transmits will therefore complete in finite time. */
1008         ksocknal_putconnsock (conn);
1009
1010         if (notify)
1011                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
1012                             0, then);
1013 }
1014
1015 void
1016 ksocknal_destroy_conn (ksock_conn_t *conn)
1017 {
1018         /* Final coup-de-grace of the reaper */
1019         CDEBUG (D_NET, "connection %p\n", conn);
1020
1021         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
1022         LASSERT (conn->ksnc_route == NULL);
1023         LASSERT (!conn->ksnc_tx_scheduled);
1024         LASSERT (!conn->ksnc_rx_scheduled);
1025         LASSERT (list_empty(&conn->ksnc_tx_queue));
1026
1027         /* complete current receive if any */
1028         switch (conn->ksnc_rx_state) {
1029         case SOCKNAL_RX_BODY:
1030                 CERROR("Completing partial receive from "LPX64
1031                        ", ip %d.%d.%d.%d:%d, with error\n",
1032                        conn->ksnc_peer->ksnp_nid,
1033                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1034                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
1035                 break;
1036         case SOCKNAL_RX_BODY_FWD:
1037                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1038                 break;
1039         case SOCKNAL_RX_HEADER:
1040         case SOCKNAL_RX_SLOP:
1041                 break;
1042         default:
1043                 LBUG ();
1044                 break;
1045         }
1046
1047         ksocknal_put_peer (conn->ksnc_peer);
1048
1049         PORTAL_FREE (conn, sizeof (*conn));
1050         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
1051 }
1052
1053 void
1054 ksocknal_put_conn (ksock_conn_t *conn)
1055 {
1056         unsigned long flags;
1057
1058         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1059                 conn, conn->ksnc_peer->ksnp_nid,
1060                 atomic_read (&conn->ksnc_refcount));
1061
1062         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1063         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1064                 return;
1065
1066         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1067
1068         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1069         wake_up (&ksocknal_data.ksnd_reaper_waitq);
1070
1071         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1072 }
1073
1074 int
1075 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1076 {
1077         ksock_conn_t       *conn;
1078         struct list_head   *ctmp;
1079         struct list_head   *cnxt;
1080         int                 count = 0;
1081
1082         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1083                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1084
1085                 if (ipaddr == 0 ||
1086                     conn->ksnc_ipaddr == ipaddr) {
1087                         count++;
1088                         ksocknal_close_conn_locked (conn, why);
1089                 }
1090         }
1091
1092         return (count);
1093 }
1094
1095 int
1096 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1097 {
1098         ksock_conn_t       *conn;
1099         struct list_head   *ctmp;
1100         struct list_head   *cnxt;
1101         int                 count = 0;
1102
1103         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1104                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1105
1106                 if (conn->ksnc_incarnation == incarnation)
1107                         continue;
1108
1109                 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1110                       "incarnation:"LPX64"("LPX64")\n",
1111                       peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1112                       conn->ksnc_incarnation, incarnation);
1113                 
1114                 count++;
1115                 ksocknal_close_conn_locked (conn, -ESTALE);
1116         }
1117
1118         return (count);
1119 }
1120
1121 int
1122 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why) 
1123 {
1124         ksock_peer_t     *peer = conn->ksnc_peer;
1125         __u32             ipaddr = conn->ksnc_ipaddr;
1126         unsigned long     flags;
1127         int               count;
1128
1129         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1130
1131         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1132         
1133         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1134
1135         return (count);
1136 }
1137
1138 int
1139 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1140 {
1141         unsigned long       flags;
1142         ksock_peer_t       *peer;
1143         struct list_head   *ptmp;
1144         struct list_head   *pnxt;
1145         int                 lo;
1146         int                 hi;
1147         int                 i;
1148         int                 count = 0;
1149
1150         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1151
1152         if (nid != PTL_NID_ANY)
1153                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1154         else {
1155                 lo = 0;
1156                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1157         }
1158
1159         for (i = lo; i <= hi; i++) {
1160                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1161
1162                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1163
1164                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1165                                 continue;
1166
1167                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1168                 }
1169         }
1170
1171         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1172
1173         /* wildcards always succeed */
1174         if (nid == PTL_NID_ANY || ipaddr == 0)
1175                 return (0);
1176         
1177         return (count == 0 ? -ENOENT : 0);
1178 }
1179
1180 void
1181 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1182 {
1183         /* The router is telling me she's been notified of a change in
1184          * gateway state.... */
1185
1186         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1187
1188         if (!alive) {
1189                 /* If the gateway crashed, close all open connections... */
1190                 ksocknal_close_matching_conns (gw_nid, 0);
1191                 return;
1192         }
1193         
1194         /* ...otherwise do nothing.  We can only establish new connections
1195          * if we have autroutes, and these connect on demand. */
1196 }
1197
1198 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1199 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1200 {
1201         return &(sk->tp_pinfo.af_tcp);
1202 }
1203 #else
1204 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1205 {
1206         struct tcp_sock *s = (struct tcp_sock *)sk;
1207         return &s->tcp;
1208 }
1209 #endif
1210
1211 void
1212 ksocknal_push_conn (ksock_conn_t *conn)
1213 {
1214         struct sock    *sk;
1215         struct tcp_opt *tp;
1216         int             nonagle;
1217         int             val = 1;
1218         int             rc;
1219         mm_segment_t    oldmm;
1220
1221         rc = ksocknal_getconnsock (conn);
1222         if (rc != 0)                            /* being shut down */
1223                 return;
1224         
1225         sk = conn->ksnc_sock->sk;
1226         tp = sock2tcp_opt(sk);
1227         
1228         lock_sock (sk);
1229         nonagle = tp->nonagle;
1230         tp->nonagle = 1;
1231         release_sock (sk);
1232
1233         oldmm = get_fs ();
1234         set_fs (KERNEL_DS);
1235
1236         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1237                                       (char *)&val, sizeof (val));
1238         LASSERT (rc == 0);
1239
1240         set_fs (oldmm);
1241
1242         lock_sock (sk);
1243         tp->nonagle = nonagle;
1244         release_sock (sk);
1245
1246         ksocknal_putconnsock (conn);
1247 }
1248
1249 void
1250 ksocknal_push_peer (ksock_peer_t *peer)
1251 {
1252         int               index;
1253         int               i;
1254         struct list_head *tmp;
1255         ksock_conn_t     *conn;
1256
1257         for (index = 0; ; index++) {
1258                 read_lock (&ksocknal_data.ksnd_global_lock);
1259
1260                 i = 0;
1261                 conn = NULL;
1262
1263                 list_for_each (tmp, &peer->ksnp_conns) {
1264                         if (i++ == index) {
1265                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1266                                 atomic_inc (&conn->ksnc_refcount);
1267                                 break;
1268                         }
1269                 }
1270
1271                 read_unlock (&ksocknal_data.ksnd_global_lock);
1272
1273                 if (conn == NULL)
1274                         break;
1275
1276                 ksocknal_push_conn (conn);
1277                 ksocknal_put_conn (conn);
1278         }
1279 }
1280
1281 int
1282 ksocknal_push (ptl_nid_t nid)
1283 {
1284         ksock_peer_t      *peer;
1285         struct list_head  *tmp;
1286         int                index;
1287         int                i;
1288         int                j;
1289         int                rc = -ENOENT;
1290
1291         if (nid != PTL_NID_ANY) {
1292                 peer = ksocknal_get_peer (nid);
1293
1294                 if (peer != NULL) {
1295                         rc = 0;
1296                         ksocknal_push_peer (peer);
1297                         ksocknal_put_peer (peer);
1298                 }
1299                 return (rc);
1300         }
1301
1302         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1303                 for (j = 0; ; j++) {
1304                         read_lock (&ksocknal_data.ksnd_global_lock);
1305
1306                         index = 0;
1307                         peer = NULL;
1308
1309                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1310                                 if (index++ == j) {
1311                                         peer = list_entry(tmp, ksock_peer_t,
1312                                                           ksnp_list);
1313                                         atomic_inc (&peer->ksnp_refcount);
1314                                         break;
1315                                 }
1316                         }
1317
1318                         read_unlock (&ksocknal_data.ksnd_global_lock);
1319
1320                         if (peer != NULL) {
1321                                 rc = 0;
1322                                 ksocknal_push_peer (peer);
1323                                 ksocknal_put_peer (peer);
1324                         }
1325                 }
1326
1327         }
1328
1329         return (rc);
1330 }
1331
1332 int
1333 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1334 {
1335         int rc = -EINVAL;
1336
1337         LASSERT (pcfg != NULL);
1338
1339         switch(pcfg->pcfg_command) {
1340         case NAL_CMD_GET_AUTOCONN: {
1341                 ksock_route_t *route = ksocknal_get_route_by_idx (pcfg->pcfg_count);
1342
1343                 if (route == NULL)
1344                         rc = -ENOENT;
1345                 else {
1346                         rc = 0;
1347                         pcfg->pcfg_nid   = route->ksnr_peer->ksnp_nid;
1348                         pcfg->pcfg_id    = route->ksnr_ipaddr;
1349                         pcfg->pcfg_misc  = route->ksnr_port;
1350                         pcfg->pcfg_count = route->ksnr_conn_count;
1351                         pcfg->pcfg_size  = route->ksnr_buffer_size;
1352                         pcfg->pcfg_wait  = route->ksnr_sharecount;
1353                         pcfg->pcfg_flags = (route->ksnr_irq_affinity ? 2 : 0) |
1354                                            (route->ksnr_eager        ? 4 : 0);
1355                         ksocknal_put_route (route);
1356                 }
1357                 break;
1358         }
1359         case NAL_CMD_ADD_AUTOCONN: {
1360                 rc = ksocknal_add_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1361                                          pcfg->pcfg_misc, pcfg->pcfg_size,
1362                                          (pcfg->pcfg_flags & 0x02) != 0,
1363                                          (pcfg->pcfg_flags & 0x04) != 0,
1364                                          (pcfg->pcfg_flags & 0x08) != 0);
1365                 break;
1366         }
1367         case NAL_CMD_DEL_AUTOCONN: {
1368                 rc = ksocknal_del_route (pcfg->pcfg_nid, pcfg->pcfg_id, 
1369                                          (pcfg->pcfg_flags & 1) != 0,
1370                                          (pcfg->pcfg_flags & 2) != 0);
1371                 break;
1372         }
1373         case NAL_CMD_GET_CONN: {
1374                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1375
1376                 if (conn == NULL)
1377                         rc = -ENOENT;
1378                 else {
1379                         rc = 0;
1380                         pcfg->pcfg_nid   = conn->ksnc_peer->ksnp_nid;
1381                         pcfg->pcfg_id    = conn->ksnc_ipaddr;
1382                         pcfg->pcfg_misc  = conn->ksnc_port;
1383                         pcfg->pcfg_flags = conn->ksnc_type;
1384                         ksocknal_put_conn (conn);
1385                 }
1386                 break;
1387         }
1388         case NAL_CMD_REGISTER_PEER_FD: {
1389                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1390                 int            type = pcfg->pcfg_misc;
1391
1392                 if (sock == NULL)
1393                         break;
1394
1395                 switch (type) {
1396                 case SOCKNAL_CONN_NONE:
1397                 case SOCKNAL_CONN_ANY:
1398                 case SOCKNAL_CONN_CONTROL:
1399                 case SOCKNAL_CONN_BULK_IN:
1400                 case SOCKNAL_CONN_BULK_OUT:
1401                         rc = ksocknal_create_conn(NULL, sock, pcfg->pcfg_flags, type);
1402                 default:
1403                         break;
1404                 }
1405                 if (rc != 0)
1406                         fput (sock->file);
1407                 break;
1408         }
1409         case NAL_CMD_CLOSE_CONNECTION: {
1410                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid, 
1411                                                     pcfg->pcfg_id);
1412                 break;
1413         }
1414         case NAL_CMD_REGISTER_MYNID: {
1415                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1416                 break;
1417         }
1418         case NAL_CMD_PUSH_CONNECTION: {
1419                 rc = ksocknal_push (pcfg->pcfg_nid);
1420                 break;
1421         }
1422         }
1423
1424         return rc;
1425 }
1426
1427 void
1428 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1429 {
1430         int          npages = p->fmp_buff_pages;
1431         ksock_fmb_t *fmb;
1432         int          i;
1433
1434         LASSERT (list_empty(&p->fmp_blocked_conns));
1435         LASSERT (p->fmp_nactive_fmbs == 0);
1436         
1437         while (!list_empty(&p->fmp_idle_fmbs)) {
1438
1439                 fmb = list_entry(p->fmp_idle_fmbs.next,
1440                                  ksock_fmb_t, fmb_list);
1441                 
1442                 for (i = 0; i < npages; i++)
1443                         if (fmb->fmb_kiov[i].kiov_page != NULL)
1444                                 __free_page(fmb->fmb_kiov[i].kiov_page);
1445
1446                 list_del(&fmb->fmb_list);
1447                 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
1448         }
1449 }
1450
1451 void
1452 ksocknal_free_buffers (void)
1453 {
1454         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1455         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1456
1457         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1458
1459         if (ksocknal_data.ksnd_schedulers != NULL)
1460                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1461                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1462
1463         PORTAL_FREE (ksocknal_data.ksnd_peers,
1464                      sizeof (struct list_head) * 
1465                      ksocknal_data.ksnd_peer_hash_size);
1466 }
1467
1468 void
1469 ksocknal_module_fini (void)
1470 {
1471         int   i;
1472
1473         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1474                atomic_read (&portal_kmemory));
1475
1476         switch (ksocknal_data.ksnd_init) {
1477         default:
1478                 LASSERT (0);
1479
1480         case SOCKNAL_INIT_ALL:
1481 #if CONFIG_SYSCTL
1482                 if (ksocknal_data.ksnd_sysctl != NULL)
1483                         unregister_sysctl_table (ksocknal_data.ksnd_sysctl);
1484 #endif
1485                 kportal_nal_unregister(SOCKNAL);
1486                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
1487                 /* fall through */
1488
1489         case SOCKNAL_INIT_PTL:
1490                 /* No more calls to ksocknal_cmd() to create new
1491                  * autoroutes/connections since we're being unloaded. */
1492                 PtlNIFini(ksocknal_ni);
1493
1494                 /* Delete all autoroute entries */
1495                 ksocknal_del_route(PTL_NID_ANY, 0, 0, 0);
1496
1497                 /* Delete all connections */
1498                 ksocknal_close_matching_conns (PTL_NID_ANY, 0);
1499                 
1500                 /* Wait for all peer state to clean up */
1501                 i = 2;
1502                 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
1503                         i++;
1504                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1505                                "waiting for %d peers to disconnect\n",
1506                                atomic_read (&ksocknal_data.ksnd_npeers));
1507                         set_current_state (TASK_UNINTERRUPTIBLE);
1508                         schedule_timeout (HZ);
1509                 }
1510
1511                 /* Tell lib we've stopped calling into her. */
1512                 lib_fini(&ksocknal_lib);
1513                 /* fall through */
1514
1515         case SOCKNAL_INIT_DATA:
1516                 /* Module refcount only gets to zero when all peers
1517                  * have been closed so all lists must be empty */
1518                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1519                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1520                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1521                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1522                 }
1523                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1524                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1525                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1526                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1527                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1528
1529                 if (ksocknal_data.ksnd_schedulers != NULL)
1530                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1531                                 ksock_sched_t *kss =
1532                                         &ksocknal_data.ksnd_schedulers[i];
1533
1534                                 LASSERT (list_empty (&kss->kss_tx_conns));
1535                                 LASSERT (list_empty (&kss->kss_rx_conns));
1536                                 LASSERT (kss->kss_nconns == 0);
1537                         }
1538
1539                 /* stop router calling me */
1540                 kpr_shutdown (&ksocknal_data.ksnd_router);
1541
1542                 /* flag threads to terminate; wake and wait for them to die */
1543                 ksocknal_data.ksnd_shuttingdown = 1;
1544                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1545                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1546
1547                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1548                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1549
1550                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1551                         CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
1552                                 atomic_read (&ksocknal_data.ksnd_nthreads));
1553                         set_current_state (TASK_UNINTERRUPTIBLE);
1554                         schedule_timeout (HZ);
1555                 }
1556
1557                 kpr_deregister (&ksocknal_data.ksnd_router);
1558
1559                 ksocknal_free_buffers();
1560                 /* fall through */
1561
1562         case SOCKNAL_INIT_NOTHING:
1563                 break;
1564         }
1565
1566         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1567                atomic_read (&portal_kmemory));
1568
1569         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
1570                atomic_read(&portal_kmemory));
1571 }
1572
1573
1574 void __init
1575 ksocknal_init_incarnation (void)
1576 {
1577         struct timeval tv;
1578
1579         /* The incarnation number is the time this module loaded and it
1580          * identifies this particular instance of the socknal.  Hopefully
1581          * we won't be able to reboot more frequently than 1MHz for the
1582          * forseeable future :) */
1583         
1584         do_gettimeofday(&tv);
1585         
1586         ksocknal_data.ksnd_incarnation = 
1587                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1588 }
1589
1590 int __init
1591 ksocknal_module_init (void)
1592 {
1593         int   pkmem = atomic_read(&portal_kmemory);
1594         int   rc;
1595         int   i;
1596         int   j;
1597
1598         /* packet descriptor must fit in a router descriptor's scratchpad */
1599         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1600         /* the following must be sizeof(int) for proc_dointvec() */
1601         LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
1602         LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
1603         /* check ksnr_connected/connecting field large enough */
1604         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
1605         
1606         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1607
1608         ksocknal_api.forward  = ksocknal_api_forward;
1609         ksocknal_api.shutdown = ksocknal_api_shutdown;
1610         ksocknal_api.validate = NULL;           /* our api validate is a NOOP */
1611         ksocknal_api.lock     = ksocknal_api_lock;
1612         ksocknal_api.unlock   = ksocknal_api_unlock;
1613         ksocknal_api.nal_data = &ksocknal_data;
1614
1615         ksocknal_lib.nal_data = &ksocknal_data;
1616
1617         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1618
1619         ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1620         ksocknal_data.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
1621         ksocknal_data.ksnd_typed_conns = SOCKNAL_TYPED_CONNS;
1622         ksocknal_data.ksnd_min_bulk   = SOCKNAL_MIN_BULK;
1623 #if SOCKNAL_ZC
1624         ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1625 #endif
1626         ksocknal_init_incarnation();
1627         
1628         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1629         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1630                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1631         if (ksocknal_data.ksnd_peers == NULL)
1632                 return (-ENOMEM);
1633
1634         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1635                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1636
1637         rwlock_init(&ksocknal_data.ksnd_global_lock);
1638
1639         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1640         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1641         init_waitqueue_head(&ksocknal_data.ksnd_yield_waitq);
1642         
1643         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1644         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1645         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1646         ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
1647
1648         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1649         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1650         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1651         ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
1652
1653         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1654         INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
1655         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1656         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1657         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1658
1659         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1660         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1661         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1662
1663         /* NB memset above zeros whole of ksocknal_data, including
1664          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1665
1666         /* flag lists/ptrs/locks initialised */
1667         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1668
1669         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1670                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1671         if (ksocknal_data.ksnd_schedulers == NULL) {
1672                 ksocknal_module_fini ();
1673                 return (-ENOMEM);
1674         }
1675
1676         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1677                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1678
1679                 spin_lock_init (&kss->kss_lock);
1680                 INIT_LIST_HEAD (&kss->kss_rx_conns);
1681                 INIT_LIST_HEAD (&kss->kss_tx_conns);
1682 #if SOCKNAL_ZC
1683                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1684 #endif
1685                 init_waitqueue_head (&kss->kss_waitq);
1686         }
1687
1688         rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
1689         if (rc != 0) {
1690                 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
1691                 ksocknal_module_fini ();
1692                 return (rc);
1693         }
1694         PtlNIDebug(ksocknal_ni, ~0);
1695
1696         ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
1697
1698         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1699                 rc = ksocknal_thread_start (ksocknal_scheduler,
1700                                             &ksocknal_data.ksnd_schedulers[i]);
1701                 if (rc != 0) {
1702                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1703                                i, rc);
1704                         ksocknal_module_fini ();
1705                         return (rc);
1706                 }
1707         }
1708
1709         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1710                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1711                 if (rc != 0) {
1712                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1713                         ksocknal_module_fini ();
1714                         return (rc);
1715                 }
1716         }
1717
1718         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1719         if (rc != 0) {
1720                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1721                 ksocknal_module_fini ();
1722                 return (rc);
1723         }
1724
1725         rc = kpr_register(&ksocknal_data.ksnd_router,
1726                           &ksocknal_router_interface);
1727         if (rc != 0) {
1728                 CDEBUG(D_NET, "Can't initialise routing interface "
1729                        "(rc = %d): not routing\n", rc);
1730         } else {
1731                 /* Only allocate forwarding buffers if I'm on a gateway */
1732
1733                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1734                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
1735                         ksock_fmb_t      *fmb;
1736                         ksock_fmb_pool_t *pool;
1737                         
1738
1739                         if (i < SOCKNAL_SMALL_FWD_NMSGS)
1740                                 pool = &ksocknal_data.ksnd_small_fmp;
1741                         else
1742                                 pool = &ksocknal_data.ksnd_large_fmp;
1743                         
1744                         PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t, 
1745                                                    fmb_kiov[pool->fmp_buff_pages]));
1746                         if (fmb == NULL) {
1747                                 ksocknal_module_fini();
1748                                 return (-ENOMEM);
1749                         }
1750
1751                         fmb->fmb_pool = pool;
1752                         
1753                         for (j = 0; j < pool->fmp_buff_pages; j++) {
1754                                 fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);
1755
1756                                 if (fmb->fmb_kiov[j].kiov_page == NULL) {
1757                                         ksocknal_module_fini ();
1758                                         return (-ENOMEM);
1759                                 }
1760
1761                                 LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
1762                         }
1763
1764                         list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
1765                 }
1766         }
1767
1768         rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
1769         if (rc != 0) {
1770                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1771                 ksocknal_module_fini ();
1772                 return (rc);
1773         }
1774
1775         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
1776
1777 #ifdef CONFIG_SYSCTL
1778         /* Press on regardless even if registering sysctl doesn't work */
1779         ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
1780 #endif
1781         /* flag everything initialised */
1782         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1783
1784         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
1785                "(Routing %s, initial mem %d, incarnation "LPX64")\n",
1786                kpr_routing (&ksocknal_data.ksnd_router) ?
1787                "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
1788
1789         return (0);
1790 }
1791
1792 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1793 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1794 MODULE_LICENSE("GPL");
1795
1796 module_init(ksocknal_module_init);
1797 module_exit(ksocknal_module_fini);
1798
1799 EXPORT_SYMBOL (ksocknal_ni);