Whamcloud - gitweb
e7232a05a91b9eec42339d1039a394f6cb7273c8
[fs/lustre-release.git] / lustre / portals / knals / socknal / socknal.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 ptl_handle_ni_t         ksocknal_ni;
29 static nal_t            ksocknal_api;
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
32 #else
33 static ksock_nal_data_t ksocknal_data;
34 #endif
35
36 kpr_nal_interface_t ksocknal_router_interface = {
37         kprni_nalid:      SOCKNAL,
38         kprni_arg:        &ksocknal_data,
39         kprni_fwd:        ksocknal_fwd_packet,
40 };
41
42
43 int
44 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
45                        void *ret, size_t ret_len)
46 {
47         ksock_nal_data_t *k;
48         nal_cb_t *nal_cb;
49
50         k = nal->nal_data;
51         nal_cb = k->ksnd_nal_cb;
52
53         lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
54         return PTL_OK;
55 }
56
57 int
58 ksocknal_api_shutdown(nal_t *nal, int ni)
59 {
60         CDEBUG (D_NET, "closing all connections\n");
61
62         ksocknal_del_route (PTL_NID_ANY, 0, 0, 0);
63         ksocknal_close_conn (PTL_NID_ANY, 0);
64         return PTL_OK;
65 }
66
67 void
68 ksocknal_api_yield(nal_t *nal)
69 {
70         our_cond_resched();
71         return;
72 }
73
74 void
75 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
76 {
77         ksock_nal_data_t *k;
78         nal_cb_t *nal_cb;
79
80         k = nal->nal_data;
81         nal_cb = k->ksnd_nal_cb;
82         nal_cb->cb_cli(nal_cb,flags);
83 }
84
85 void
86 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
87 {
88         ksock_nal_data_t *k;
89         nal_cb_t *nal_cb;
90
91         k = nal->nal_data;
92         nal_cb = k->ksnd_nal_cb;
93         nal_cb->cb_sti(nal_cb,flags);
94 }
95
96 nal_t *
97 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
98               ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
99 {
100         CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
101         lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
102         return (&ksocknal_api);
103 }
104
105 /*
106  *  EXTRA functions follow
107  */
108
109 int
110 ksocknal_set_mynid(ptl_nid_t nid)
111 {
112         lib_ni_t *ni = &ksocknal_lib.ni;
113
114         /* FIXME: we have to do this because we call lib_init() at module
115          * insertion time, which is before we have 'mynid' available.  lib_init
116          * sets the NAL's nid, which it uses to tell other nodes where packets
117          * are coming from.  This is not a very graceful solution to this
118          * problem. */
119
120         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
121                nid, ni->nid);
122
123         ni->nid = nid;
124         return (0);
125 }
126
127 void
128 ksocknal_bind_irq (unsigned int irq)
129 {
130 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
131         int              bind;
132         unsigned long    flags;
133         char             cmdline[64];
134         ksock_irqinfo_t *info;
135         char            *argv[] = {"/bin/sh",
136                                    "-c",
137                                    cmdline,
138                                    NULL};
139         char            *envp[] = {"HOME=/",
140                                    "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
141                                    NULL};
142
143         LASSERT (irq < NR_IRQS);
144         if (irq == 0)                           /* software NIC */
145                 return;
146
147         info = &ksocknal_data.ksnd_irqinfo[irq];
148
149         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
150
151         LASSERT (info->ksni_valid);
152         bind = !info->ksni_bound;
153         info->ksni_bound = 1;
154
155         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
156
157         if (!bind)                              /* bound already */
158                 return;
159
160         snprintf (cmdline, sizeof (cmdline),
161                   "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
162
163         printk (KERN_INFO "Binding irq %u to CPU %d with cmd: %s\n",
164                 irq, info->ksni_sched, cmdline);
165
166         /* FIXME: Find a better method of setting IRQ affinity...
167          */
168
169         call_usermodehelper (argv[0], argv, envp);
170 #endif
171 }
172
173 ksock_route_t *
174 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
175                        int irq_affinity, int xchange_nids, int nonagel)
176 {
177         ksock_route_t *route;
178
179         PORTAL_ALLOC (route, sizeof (*route));
180         if (route == NULL)
181                 return (NULL);
182
183         atomic_set (&route->ksnr_refcount, 1);
184         route->ksnr_sharecount = 0;
185         route->ksnr_peer = NULL;
186         route->ksnr_timeout = jiffies_64;
187         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
188         route->ksnr_ipaddr = ipaddr;
189         route->ksnr_port = port;
190         route->ksnr_buffer_size = buffer_size;
191         route->ksnr_irq_affinity = irq_affinity;
192         route->ksnr_xchange_nids = xchange_nids;
193         route->ksnr_nonagel = nonagel;
194         route->ksnr_connecting = 0;
195         route->ksnr_deleted = 0;
196         route->ksnr_generation = 0;
197         route->ksnr_conn = NULL;
198
199         return (route);
200 }
201
202 void
203 ksocknal_destroy_route (ksock_route_t *route)
204 {
205         LASSERT (route->ksnr_sharecount == 0);
206         LASSERT (route->ksnr_conn == NULL);
207
208         if (route->ksnr_peer != NULL)
209                 ksocknal_put_peer (route->ksnr_peer);
210
211         PORTAL_FREE (route, sizeof (*route));
212 }
213
214 void
215 ksocknal_put_route (ksock_route_t *route)
216 {
217         CDEBUG (D_OTHER, "putting route[%p] -> "LPX64" (%d)\n",
218                 route, route->ksnr_peer->ksnp_nid,
219                 atomic_read (&route->ksnr_refcount));
220
221         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
222         if (!atomic_dec_and_test (&route->ksnr_refcount))
223              return;
224
225         ksocknal_destroy_route (route);
226 }
227
228 ksock_peer_t *
229 ksocknal_create_peer (ptl_nid_t nid)
230 {
231         ksock_peer_t *peer;
232
233         LASSERT (nid != PTL_NID_ANY);
234
235         PORTAL_ALLOC (peer, sizeof (*peer));
236         if (peer == NULL)
237                 return (NULL);
238
239         memset (peer, 0, sizeof (*peer));
240
241         peer->ksnp_nid = nid;
242         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
243         peer->ksnp_closing = 0;
244         INIT_LIST_HEAD (&peer->ksnp_conns);
245         INIT_LIST_HEAD (&peer->ksnp_routes);
246         INIT_LIST_HEAD (&peer->ksnp_tx_queue);
247
248         /* Can't unload while peers exist; ensures all I/O has terminated
249          * before unload attempts */
250         PORTAL_MODULE_USE;
251         atomic_inc (&ksocknal_data.ksnd_npeers);
252         return (peer);
253 }
254
255 void
256 ksocknal_destroy_peer (ksock_peer_t *peer)
257 {
258         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
259
260         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
261         LASSERT (list_empty (&peer->ksnp_conns));
262         LASSERT (list_empty (&peer->ksnp_routes));
263         LASSERT (list_empty (&peer->ksnp_tx_queue));
264
265         PORTAL_FREE (peer, sizeof (*peer));
266
267         /* NB a peer's connections and autoconnect routes keep a reference
268          * on their peer until they are destroyed, so we can be assured
269          * that _all_ state to do with this peer has been cleaned up when
270          * its refcount drops to zero. */
271         atomic_dec (&ksocknal_data.ksnd_npeers);
272         PORTAL_MODULE_UNUSE;
273 }
274
275 void
276 ksocknal_put_peer (ksock_peer_t *peer)
277 {
278         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
279                 peer, peer->ksnp_nid,
280                 atomic_read (&peer->ksnp_refcount));
281
282         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
283         if (!atomic_dec_and_test (&peer->ksnp_refcount))
284                 return;
285
286         ksocknal_destroy_peer (peer);
287 }
288
289 ksock_peer_t *
290 ksocknal_find_peer_locked (ptl_nid_t nid)
291 {
292         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
293         struct list_head *tmp;
294         ksock_peer_t     *peer;
295
296         list_for_each (tmp, peer_list) {
297
298                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
299
300                 LASSERT (!peer->ksnp_closing);
301                 LASSERT (!(list_empty (&peer->ksnp_routes) &&
302                            list_empty (&peer->ksnp_conns)));
303
304                 if (peer->ksnp_nid != nid)
305                         continue;
306
307                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
308                        peer, nid, atomic_read (&peer->ksnp_refcount));
309                 return (peer);
310         }
311         return (NULL);
312 }
313
314 ksock_peer_t *
315 ksocknal_get_peer (ptl_nid_t nid)
316 {
317         ksock_peer_t     *peer;
318
319         read_lock (&ksocknal_data.ksnd_global_lock);
320         peer = ksocknal_find_peer_locked (nid);
321         if (peer != NULL)                       /* +1 ref for caller? */
322                 atomic_inc (&peer->ksnp_refcount);
323         read_unlock (&ksocknal_data.ksnd_global_lock);
324
325         return (peer);
326 }
327
328 void
329 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
330 {
331         LASSERT (!peer->ksnp_closing);
332         peer->ksnp_closing = 1;
333         list_del (&peer->ksnp_list);
334         /* lose peerlist's ref */
335         ksocknal_put_peer (peer);
336 }
337
338 ksock_route_t *
339 ksocknal_get_route_by_idx (int index)
340 {
341         ksock_peer_t      *peer;
342         struct list_head  *ptmp;
343         ksock_route_t     *route;
344         struct list_head  *rtmp;
345         int                i;
346
347         read_lock (&ksocknal_data.ksnd_global_lock);
348
349         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
350                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
351                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
352
353                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
354                                    list_empty (&peer->ksnp_conns)));
355
356                         list_for_each (rtmp, &peer->ksnp_routes) {
357                                 if (index-- > 0)
358                                         continue;
359
360                                 route = list_entry (rtmp, ksock_route_t, ksnr_list);
361                                 atomic_inc (&route->ksnr_refcount);
362                                 read_unlock (&ksocknal_data.ksnd_global_lock);
363                                 return (route);
364                         }
365                 }
366         }
367
368         read_unlock (&ksocknal_data.ksnd_global_lock);
369         return (NULL);
370 }
371
372 int
373 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
374                     int nonagle, int xchange_nids, int bind_irq, int share)
375 {
376         unsigned long      flags;
377         ksock_peer_t      *peer;
378         ksock_peer_t      *peer2;
379         ksock_route_t     *route;
380         struct list_head  *rtmp;
381         ksock_route_t     *route2;
382         
383         if (nid == PTL_NID_ANY)
384                 return (-EINVAL);
385
386         /* Have a brand new peer ready... */
387         peer = ksocknal_create_peer (nid);
388         if (peer == NULL)
389                 return (-ENOMEM);
390
391         route = ksocknal_create_route (ipaddr, port, bufnob,
392                                        nonagle, xchange_nids, bind_irq);
393         if (route == NULL) {
394                 ksocknal_put_peer (peer);
395                 return (-ENOMEM);
396         }
397
398         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
399
400         peer2 = ksocknal_find_peer_locked (nid);
401         if (peer2 != NULL) {
402                 ksocknal_put_peer (peer);
403                 peer = peer2;
404         } else {
405                 /* peer table takes existing ref on peer */
406                 list_add (&peer->ksnp_list,
407                           ksocknal_nid2peerlist (nid));
408         }
409
410         route2 = NULL;
411         if (share) {
412                 /* check for existing route to this NID via this ipaddr */
413                 list_for_each (rtmp, &peer->ksnp_routes) {
414                         route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
415                         
416                         if (route2->ksnr_ipaddr == ipaddr)
417                                 break;
418
419                         route2 = NULL;
420                 }
421         }
422
423         if (route2 != NULL) {
424                 ksocknal_put_route (route);
425                 route = route2;
426         } else {
427                 /* route takes a ref on peer */
428                 route->ksnr_peer = peer;
429                 atomic_inc (&peer->ksnp_refcount);
430                 /* peer's route list takes existing ref on route */
431                 list_add (&route->ksnr_list, &peer->ksnp_routes);
432         }
433         
434         route->ksnr_sharecount++;
435
436         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
437
438         return (0);
439 }
440
441 void
442 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
443 {
444         ksock_peer_t *peer = route->ksnr_peer;
445         ksock_conn_t *conn = route->ksnr_conn;
446
447         if (!share)
448                 route->ksnr_sharecount = 0;
449         else {
450                 route->ksnr_sharecount--;
451                 if (route->ksnr_sharecount != 0)
452                         return;
453         }
454
455         if (conn != NULL) {
456                 if (!keep_conn)
457                         ksocknal_close_conn_locked (conn);
458                 else {
459                         /* keeping the conn; just dissociate it and route... */
460                         conn->ksnc_route = NULL;
461                         route->ksnr_conn = NULL;
462                         ksocknal_put_route (route); /* drop conn's ref on route */
463                         ksocknal_put_conn (conn); /* drop route's ref on conn */
464                 }
465         }
466
467         route->ksnr_deleted = 1;
468         list_del (&route->ksnr_list);
469         ksocknal_put_route (route);             /* drop peer's ref */
470
471         if (list_empty (&peer->ksnp_routes) &&
472             list_empty (&peer->ksnp_conns)) {
473                 /* I've just removed the last autoconnect route of a peer
474                  * with no active connections */
475                 ksocknal_unlink_peer_locked (peer);
476         }
477 }
478
479 int
480 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
481 {
482         unsigned long      flags;
483         struct list_head  *ptmp;
484         struct list_head  *pnxt;
485         ksock_peer_t      *peer;
486         struct list_head  *rtmp;
487         struct list_head  *rnxt;
488         ksock_route_t     *route;
489         int                lo;
490         int                hi;
491         int                i;
492         int                rc = -ENOENT;
493
494         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
495
496         if (nid != PTL_NID_ANY)
497                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
498         else {
499                 lo = 0;
500                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
501         }
502
503         for (i = lo; i <= hi; i++) {
504                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
505                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
506
507                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
508                                 continue;
509
510                         list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
511                                 route = list_entry (rtmp, ksock_route_t,
512                                                     ksnr_list);
513
514                                 if (!(ipaddr == 0 ||
515                                       route->ksnr_ipaddr == ipaddr))
516                                         continue;
517
518                                 ksocknal_del_route_locked (route, share, keep_conn);
519                                 rc = 0;         /* matched something */
520                                 if (share)
521                                         goto out;
522                         }
523                 }
524         }
525  out:
526         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
527
528         return (rc);
529 }
530
531 ksock_conn_t *
532 ksocknal_get_conn_by_idx (int index)
533 {
534         ksock_peer_t      *peer;
535         struct list_head  *ptmp;
536         ksock_conn_t      *conn;
537         struct list_head  *ctmp;
538         int                i;
539
540         read_lock (&ksocknal_data.ksnd_global_lock);
541
542         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
543                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
544                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
545
546                         LASSERT (!(list_empty (&peer->ksnp_routes) &&
547                                    list_empty (&peer->ksnp_conns)));
548
549                         list_for_each (ctmp, &peer->ksnp_conns) {
550                                 if (index-- > 0)
551                                         continue;
552
553                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
554                                 atomic_inc (&conn->ksnc_refcount);
555                                 read_unlock (&ksocknal_data.ksnd_global_lock);
556                                 return (conn);
557                         }
558                 }
559         }
560
561         read_unlock (&ksocknal_data.ksnd_global_lock);
562         return (NULL);
563 }
564
565 void
566 ksocknal_get_peer_addr (ksock_conn_t *conn)
567 {
568         struct sockaddr_in sin;
569         int                len = sizeof (sin);
570         int                rc;
571
572         rc = ksocknal_getconnsock (conn);
573         LASSERT (rc == 0);
574         
575         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
576                                             (struct sockaddr *)&sin, &len, 2);
577         LASSERT (len <= sizeof (sin));
578         ksocknal_putconnsock (conn);
579
580         if (rc != 0) {
581                 CERROR ("Error %d getting sock peer IP\n", rc);
582                 return;
583         }
584
585         conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
586         conn->ksnc_port   = ntohs (sin.sin_port);
587 }
588
589 unsigned int
590 ksocknal_conn_irq (ksock_conn_t *conn)
591 {
592         int                irq = 0;
593         int                rc;
594         struct dst_entry  *dst;
595
596         rc = ksocknal_getconnsock (conn);
597         LASSERT (rc == 0);
598         
599         dst = sk_dst_get (conn->ksnc_sock->sk);
600         if (dst != NULL) {
601                 if (dst->dev != NULL) {
602                         irq = dst->dev->irq;
603                         if (irq >= NR_IRQS) {
604                                 CERROR ("Unexpected IRQ %x\n", irq);
605                                 irq = 0;
606                         }
607                 }
608                 dst_release (dst);
609         }
610         
611         ksocknal_putconnsock (conn);
612         return (irq);
613 }
614
615 ksock_sched_t *
616 ksocknal_choose_scheduler_locked (unsigned int irq)
617 {
618         ksock_sched_t    *sched;
619         ksock_irqinfo_t  *info;
620         int               i;
621
622         LASSERT (irq < NR_IRQS);
623         info = &ksocknal_data.ksnd_irqinfo[irq];
624
625         if (irq != 0 &&                         /* hardware NIC */
626             info->ksni_valid) {                 /* already set up */
627                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
628         }
629
630         /* software NIC (irq == 0) || not associated with a scheduler yet.
631          * Choose the CPU with the fewest connections... */
632         sched = &ksocknal_data.ksnd_schedulers[0];
633         for (i = 1; i < SOCKNAL_N_SCHED; i++)
634                 if (sched->kss_nconns >
635                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
636                         sched = &ksocknal_data.ksnd_schedulers[i];
637
638         if (irq != 0) {                         /* Hardware NIC */
639                 info->ksni_valid = 1;
640                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
641
642                 /* no overflow... */
643                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
644         }
645
646         return (sched);
647 }
648
649 int
650 ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
651                       struct socket *sock, int bind_irq)
652 {
653         unsigned long      flags;
654         ksock_conn_t      *conn;
655         ksock_peer_t      *peer;
656         ksock_peer_t      *peer2;
657         ksock_sched_t     *sched;
658         unsigned int       irq;
659         ksock_tx_t        *tx;
660         int                rc;
661
662         /* NB, sock has an associated file since (a) this connection might
663          * have been created in userland and (b) we need the refcounting so
664          * that we don't close the socket while I/O is being done on it. */
665         LASSERT (sock->file != NULL);
666
667         rc = ksocknal_set_linger (sock);
668         if (rc != 0)
669                 return (rc);
670
671         peer = NULL;
672         if (route == NULL) {                    /* not autoconnect */
673                 /* Assume this socket connects to a brand new peer */
674                 peer = ksocknal_create_peer (nid);
675                 if (peer == NULL)
676                         return (-ENOMEM);
677         }
678
679         PORTAL_ALLOC(conn, sizeof(*conn));
680         if (conn == NULL) {
681                 if (peer != NULL)
682                         ksocknal_put_peer (peer);
683                 return (-ENOMEM);
684         }
685
686         memset (conn, 0, sizeof (*conn));
687         conn->ksnc_peer = NULL;
688         conn->ksnc_route = NULL;
689         conn->ksnc_sock = sock;
690         conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
691         conn->ksnc_saved_write_space = sock->sk->sk_write_space;
692         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
693
694         conn->ksnc_rx_ready = 0;
695         conn->ksnc_rx_scheduled = 0;
696         ksocknal_new_packet (conn, 0);
697
698         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
699 #if SOCKNAL_ZC
700         INIT_LIST_HEAD (&conn->ksnc_tx_pending);
701 #endif
702         conn->ksnc_tx_ready = 0;
703         conn->ksnc_tx_scheduled = 0;
704         atomic_set (&conn->ksnc_tx_nob, 0);
705
706         ksocknal_get_peer_addr (conn);
707
708         irq = ksocknal_conn_irq (conn);
709
710         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
711
712         if (route != NULL) {
713                 /* Autoconnected! */
714                 LASSERT (route->ksnr_conn == NULL && route->ksnr_connecting);
715
716                 if (route->ksnr_deleted) {
717                         /* This conn was autoconnected, but the autoconnect
718                          * route got deleted while it was being
719                          * established! */
720                         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
721                                                  flags);
722                         PORTAL_FREE (conn, sizeof (*conn));
723                         return (-ESTALE);
724                 }
725
726
727                 /* associate conn/route for auto-reconnect */
728                 route->ksnr_conn = conn;
729                 atomic_inc (&conn->ksnc_refcount);
730                 conn->ksnc_route = route;
731                 atomic_inc (&route->ksnr_refcount);
732                 route->ksnr_connecting = 0;
733
734                 route->ksnr_generation++;
735                 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
736
737                 peer = route->ksnr_peer;
738         } else {
739                 /* Not an autoconnected connection; see if there is an
740                  * existing peer for this NID */
741                 peer2 = ksocknal_find_peer_locked (nid);
742                 if (peer2 != NULL) {
743                         ksocknal_put_peer (peer);
744                         peer = peer2;
745                 } else {
746                         list_add (&peer->ksnp_list,
747                                   ksocknal_nid2peerlist (nid));
748                         /* peer list takes over existing ref */
749                 }
750         }
751
752         LASSERT (!peer->ksnp_closing);
753
754         conn->ksnc_peer = peer;
755         atomic_inc (&peer->ksnp_refcount);
756
757         list_add (&conn->ksnc_list, &peer->ksnp_conns);
758         atomic_inc (&conn->ksnc_refcount);
759
760         sched = ksocknal_choose_scheduler_locked (irq);
761         sched->kss_nconns++;
762         conn->ksnc_scheduler = sched;
763
764         /* NB my callbacks block while I hold ksnd_global_lock */
765         sock->sk->sk_user_data = conn;
766         sock->sk->sk_data_ready = ksocknal_data_ready;
767         sock->sk->sk_write_space = ksocknal_write_space;
768
769         /* Take all the packets blocking for a connection.
770          * NB, it might be nicer to share these blocked packets among any
771          * other connections that are becoming established, however that
772          * confuses the normal packet launching operation, which selects a
773          * connection and queues the packet on it without needing an
774          * exclusive lock on ksnd_global_lock. */
775         while (!list_empty (&peer->ksnp_tx_queue)) {
776                 tx = list_entry (peer->ksnp_tx_queue.next,
777                                  ksock_tx_t, tx_list);
778
779                 list_del (&tx->tx_list);
780                 ksocknal_queue_tx_locked (tx, conn);
781         }
782
783         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
784
785         if (bind_irq)                           /* irq binding required */
786                 ksocknal_bind_irq (irq);
787
788         /* Call the callbacks right now to get things going. */
789         ksocknal_data_ready (sock->sk, 0);
790         ksocknal_write_space (sock->sk);
791
792         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64"\n",
793                conn, conn->ksnc_peer->ksnp_nid);
794
795         ksocknal_put_conn (conn);
796         return (0);
797 }
798
799 void
800 ksocknal_close_conn_locked (ksock_conn_t *conn)
801 {
802         /* This just does the immmediate housekeeping, and queues the
803          * connection for the reaper to terminate. 
804          * Caller holds ksnd_global_lock exclusively in irq context */
805         ksock_peer_t   *peer = conn->ksnc_peer;
806         ksock_route_t  *route;
807
808         LASSERT (!conn->ksnc_closing);
809         conn->ksnc_closing = 1;
810         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
811         
812         route = conn->ksnc_route;
813         if (route != NULL) {
814                 /* dissociate conn from route... */
815                 LASSERT (!route->ksnr_connecting &&
816                          !route->ksnr_deleted);
817
818                 route->ksnr_conn = NULL;
819                 conn->ksnc_route = NULL;
820
821                 ksocknal_put_route (route);     /* drop conn's ref on route */
822                 ksocknal_put_conn (conn);       /* drop route's ref on conn */
823         }
824
825         /* ksnd_deathrow_conns takes over peer's ref */
826         list_del (&conn->ksnc_list);
827
828         if (list_empty (&peer->ksnp_conns) &&
829             list_empty (&peer->ksnp_routes)) {
830                 /* I've just closed last conn belonging to a
831                  * non-autoconnecting peer */
832                 ksocknal_unlink_peer_locked (peer);
833         }
834
835         spin_lock (&ksocknal_data.ksnd_reaper_lock);
836
837         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
838         if (waitqueue_active (&ksocknal_data.ksnd_reaper_waitq))
839                 wake_up (&ksocknal_data.ksnd_reaper_waitq);
840                 
841         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
842 }
843
844 int
845 ksocknal_close_conn_unlocked (ksock_conn_t *conn) 
846 {
847         unsigned long flags;
848         int           did_it = 0;
849         
850         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
851                 
852         if (!conn->ksnc_closing) {
853                 did_it = 1;
854                 ksocknal_close_conn_locked (conn);
855         }
856         
857         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
858
859         return (did_it);
860 }
861
862 void
863 ksocknal_terminate_conn (ksock_conn_t *conn)
864 {
865         /* This gets called by the reaper (guaranteed thread context) to
866          * disengage the socket from its callbacks and close it.
867          * ksnc_refcount will eventually hit zero, and then the reaper will
868          * destroy it. */
869         unsigned long   flags;
870
871         /* serialise with callbacks */
872         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
873
874         LASSERT (conn->ksnc_closing);
875         
876         /* Remove conn's network callbacks.
877          * NB I _have_ to restore the callback, rather than storing a noop,
878          * since the socket could survive past this module being unloaded!! */
879         conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
880         conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
881
882         /* A callback could be in progress already; they hold a read lock
883          * on ksnd_global_lock (to serialise with me) and NOOP if
884          * sk_user_data is NULL. */
885         conn->ksnc_sock->sk->sk_user_data = NULL;
886
887         conn->ksnc_scheduler->kss_nconns--;
888
889         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
890
891         /* The socket is closed on the final put; either here, or in
892          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
893          * when the connection was established, this will close the socket
894          * immediately, aborting anything buffered in it. Any hung
895          * zero-copy transmits will therefore complete in finite time. */
896         ksocknal_putconnsock (conn);
897 }
898
899 void
900 ksocknal_destroy_conn (ksock_conn_t *conn)
901 {
902         /* Final coup-de-grace of the reaper */
903         CDEBUG (D_NET, "connection %p\n", conn);
904
905         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
906         LASSERT (conn->ksnc_route == NULL);
907         LASSERT (!conn->ksnc_tx_scheduled);
908         LASSERT (!conn->ksnc_rx_scheduled);
909 #if SOCKNAL_ZC
910         LASSERT (list_empty (&conn->ksnc_tx_pending));
911 #endif
912         /* complete queued packets */
913         while (!list_empty (&conn->ksnc_tx_queue)) {
914                 ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next,
915                                              ksock_tx_t, tx_list);
916
917                 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
918                         NTOH__u32 (tx->tx_hdr->type),
919                         NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
920                         NTOH__u64 (tx->tx_hdr->src_nid),
921                         NTOH__u64 (tx->tx_hdr->dest_nid));
922
923                 list_del (&tx->tx_list);
924                 ksocknal_tx_done (tx, 0);
925         }
926
927         /* complete current receive if any */
928         switch (conn->ksnc_rx_state) {
929         case SOCKNAL_RX_BODY:
930                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
931                 break;
932         case SOCKNAL_RX_BODY_FWD:
933                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
934                 break;
935         case SOCKNAL_RX_HEADER:
936         case SOCKNAL_RX_SLOP:
937                 break;
938         default:
939                 LBUG ();
940                 break;
941         }
942
943         ksocknal_put_peer (conn->ksnc_peer);
944
945         PORTAL_FREE (conn, sizeof (*conn));
946         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
947 }
948
949 void
950 ksocknal_put_conn (ksock_conn_t *conn)
951 {
952         unsigned long flags;
953
954         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
955                 conn, conn->ksnc_peer->ksnp_nid,
956                 atomic_read (&conn->ksnc_refcount));
957
958         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
959         if (!atomic_dec_and_test (&conn->ksnc_refcount))
960                 return;
961
962         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
963
964         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
965         if (waitqueue_active (&ksocknal_data.ksnd_reaper_waitq))
966                 wake_up (&ksocknal_data.ksnd_reaper_waitq);
967
968         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
969 }
970
971 int
972 ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr)
973 {
974         unsigned long       flags;
975         ksock_conn_t       *conn;
976         struct list_head   *ctmp;
977         struct list_head   *cnxt;
978         ksock_peer_t       *peer;
979         struct list_head   *ptmp;
980         struct list_head   *pnxt;
981         int                 lo;
982         int                 hi;
983         int                 i;
984         int                 rc = -ENOENT;
985
986         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
987
988         if (nid != PTL_NID_ANY)
989                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
990         else {
991                 lo = 0;
992                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
993         }
994
995         for (i = lo; i <= hi; i++) {
996                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
997
998                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
999
1000                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1001                                 continue;
1002
1003                         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1004
1005                                 conn = list_entry (ctmp, ksock_conn_t,
1006                                                    ksnc_list);
1007
1008                                 if (!(ipaddr == 0 ||
1009                                       conn->ksnc_ipaddr == ipaddr))
1010                                         continue;
1011
1012                                 rc = 0;
1013                                 ksocknal_close_conn_locked (conn);
1014                         }
1015                 }
1016         }
1017
1018         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1019
1020         return (rc);
1021 }
1022
1023 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1024 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1025 {
1026         return &(sk->tp_pinfo.af_tcp);
1027 }
1028 #else
1029 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1030 {
1031         struct tcp_sock *s = (struct tcp_sock *)sk;
1032         return &s->tcp;
1033 }
1034 #endif
1035
1036 void
1037 ksocknal_push_conn (ksock_conn_t *conn)
1038 {
1039         struct sock    *sk;
1040         struct tcp_opt *tp;
1041         int             nonagle;
1042         int             val = 1;
1043         int             rc;
1044         mm_segment_t    oldmm;
1045
1046         rc = ksocknal_getconnsock (conn);
1047         if (rc != 0)                            /* being shut down */
1048                 return;
1049         
1050         sk = conn->ksnc_sock->sk;
1051         tp = sock2tcp_opt(sk);
1052         
1053         lock_sock (sk);
1054         nonagle = tp->nonagle;
1055         tp->nonagle = 1;
1056         release_sock (sk);
1057
1058         oldmm = get_fs ();
1059         set_fs (KERNEL_DS);
1060
1061         rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1062                                       (char *)&val, sizeof (val));
1063         LASSERT (rc == 0);
1064
1065         set_fs (oldmm);
1066
1067         lock_sock (sk);
1068         tp->nonagle = nonagle;
1069         release_sock (sk);
1070
1071         ksocknal_putconnsock (conn);
1072 }
1073
1074 void
1075 ksocknal_push_peer (ksock_peer_t *peer)
1076 {
1077         int               index;
1078         int               i;
1079         struct list_head *tmp;
1080         ksock_conn_t     *conn;
1081
1082         for (index = 0; ; index++) {
1083                 read_lock (&ksocknal_data.ksnd_global_lock);
1084
1085                 i = 0;
1086                 conn = NULL;
1087
1088                 list_for_each (tmp, &peer->ksnp_conns) {
1089                         if (i++ == index) {
1090                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1091                                 atomic_inc (&conn->ksnc_refcount);
1092                                 break;
1093                         }
1094                 }
1095
1096                 read_unlock (&ksocknal_data.ksnd_global_lock);
1097
1098                 if (conn == NULL)
1099                         break;
1100
1101                 ksocknal_push_conn (conn);
1102                 ksocknal_put_conn (conn);
1103         }
1104 }
1105
1106 int
1107 ksocknal_push (ptl_nid_t nid)
1108 {
1109         ksock_peer_t      *peer;
1110         struct list_head  *tmp;
1111         int                index;
1112         int                i;
1113         int                j;
1114         int                rc = -ENOENT;
1115
1116         if (nid != PTL_NID_ANY) {
1117                 peer = ksocknal_get_peer (nid);
1118
1119                 if (peer != NULL) {
1120                         rc = 0;
1121                         ksocknal_push_peer (peer);
1122                         ksocknal_put_peer (peer);
1123                 }
1124                 return (rc);
1125         }
1126
1127         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1128                 for (j = 0; ; j++) {
1129                         read_lock (&ksocknal_data.ksnd_global_lock);
1130
1131                         index = 0;
1132                         peer = NULL;
1133
1134                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1135                                 if (index++ == j) {
1136                                         peer = list_entry(tmp, ksock_peer_t,
1137                                                           ksnp_list);
1138                                         atomic_inc (&peer->ksnp_refcount);
1139                                         break;
1140                                 }
1141                         }
1142
1143                         read_unlock (&ksocknal_data.ksnd_global_lock);
1144
1145                         if (peer != NULL) {
1146                                 rc = 0;
1147                                 ksocknal_push_peer (peer);
1148                                 ksocknal_put_peer (peer);
1149                         }
1150                 }
1151
1152         }
1153
1154         return (rc);
1155 }
1156
1157 int
1158 ksocknal_cmd(struct portal_ioctl_data * data, void * private)
1159 {
1160         int rc = -EINVAL;
1161
1162         LASSERT (data != NULL);
1163
1164         switch(data->ioc_nal_cmd) {
1165         case NAL_CMD_GET_AUTOCONN: {
1166                 ksock_route_t *route = ksocknal_get_route_by_idx (data->ioc_count);
1167
1168                 if (route == NULL)
1169                         rc = -ENOENT;
1170                 else {
1171                         rc = 0;
1172                         data->ioc_nid   = route->ksnr_peer->ksnp_nid;
1173                         data->ioc_id    = route->ksnr_ipaddr;
1174                         data->ioc_misc  = route->ksnr_port;
1175                         data->ioc_count = route->ksnr_generation;
1176                         data->ioc_size  = route->ksnr_buffer_size;
1177                         data->ioc_wait  = route->ksnr_sharecount;
1178                         data->ioc_flags = (route->ksnr_nonagel      ? 1 : 0) |
1179                                           (route->ksnr_xchange_nids ? 2 : 0) |
1180                                           (route->ksnr_irq_affinity ? 4 : 0);
1181                         ksocknal_put_route (route);
1182                 }
1183                 break;
1184         }
1185         case NAL_CMD_ADD_AUTOCONN: {
1186                 rc = ksocknal_add_route (data->ioc_nid, data->ioc_id,
1187                                          data->ioc_misc, data->ioc_size,
1188                                          (data->ioc_flags & 1) != 0,
1189                                          (data->ioc_flags & 2) != 0,
1190                                          (data->ioc_flags & 4) != 0,
1191                                          (data->ioc_flags & 8) != 0);
1192                 break;
1193         }
1194         case NAL_CMD_DEL_AUTOCONN: {
1195                 rc = ksocknal_del_route (data->ioc_nid, data->ioc_id, 
1196                                          (data->ioc_flags & 1) != 0,
1197                                          (data->ioc_flags & 2) != 0);
1198                 break;
1199         }
1200         case NAL_CMD_GET_CONN: {
1201                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (data->ioc_count);
1202
1203                 if (conn == NULL)
1204                         rc = -ENOENT;
1205                 else {
1206                         rc = 0;
1207                         data->ioc_nid  = conn->ksnc_peer->ksnp_nid;
1208                         data->ioc_id   = conn->ksnc_ipaddr;
1209                         data->ioc_misc = conn->ksnc_port;
1210                         ksocknal_put_conn (conn);
1211                 }
1212                 break;
1213         }
1214         case NAL_CMD_REGISTER_PEER_FD: {
1215                 struct socket *sock = sockfd_lookup (data->ioc_fd, &rc);
1216
1217                 if (sock != NULL) {
1218                         rc = ksocknal_create_conn (data->ioc_nid, NULL,
1219                                                    sock, data->ioc_flags);
1220                         if (rc != 0)
1221                                 fput (sock->file);
1222                 }
1223                 break;
1224         }
1225         case NAL_CMD_CLOSE_CONNECTION: {
1226                 rc = ksocknal_close_conn (data->ioc_nid, data->ioc_id);
1227                 break;
1228         }
1229         case NAL_CMD_REGISTER_MYNID: {
1230                 rc = ksocknal_set_mynid (data->ioc_nid);
1231                 break;
1232         }
1233         case NAL_CMD_PUSH_CONNECTION: {
1234                 rc = ksocknal_push (data->ioc_nid);
1235                 break;
1236         }
1237         }
1238
1239         return rc;
1240 }
1241
1242 void
1243 ksocknal_free_buffers (void)
1244 {
1245         if (ksocknal_data.ksnd_fmbs != NULL) {
1246                 ksock_fmb_t *fmb = (ksock_fmb_t *)ksocknal_data.ksnd_fmbs;
1247                 int          i;
1248                 int          j;
1249
1250                 for (i = 0;
1251                      i < (SOCKNAL_SMALL_FWD_NMSGS + SOCKNAL_LARGE_FWD_NMSGS);
1252                      i++, fmb++)
1253                         for (j = 0; j < fmb->fmb_npages; j++)
1254                                 if (fmb->fmb_pages[j] != NULL)
1255                                         __free_page (fmb->fmb_pages[j]);
1256
1257                 PORTAL_FREE (ksocknal_data.ksnd_fmbs,
1258                              sizeof (ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1259                                                      SOCKNAL_LARGE_FWD_NMSGS));
1260         }
1261
1262         LASSERT (ksocknal_data.ksnd_active_ltxs == 0);
1263         if (ksocknal_data.ksnd_ltxs != NULL)
1264                 PORTAL_FREE (ksocknal_data.ksnd_ltxs,
1265                              sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS +
1266                                                      SOCKNAL_NNBLK_LTXS));
1267
1268         if (ksocknal_data.ksnd_schedulers != NULL)
1269                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1270                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1271
1272         PORTAL_FREE (ksocknal_data.ksnd_peers,
1273                      sizeof (struct list_head) * 
1274                      ksocknal_data.ksnd_peer_hash_size);
1275 }
1276
1277 void /*__exit*/
1278 ksocknal_module_fini (void)
1279 {
1280         int   i;
1281
1282         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1283                atomic_read (&portal_kmemory));
1284
1285         switch (ksocknal_data.ksnd_init) {
1286         default:
1287                 LASSERT (0);
1288
1289         case SOCKNAL_INIT_ALL:
1290                 kportal_nal_unregister(SOCKNAL);
1291                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
1292                 /* fall through */
1293
1294         case SOCKNAL_INIT_PTL:
1295                 PtlNIFini(ksocknal_ni);
1296                 lib_fini(&ksocknal_lib);
1297                 /* fall through */
1298
1299         case SOCKNAL_INIT_DATA:
1300                 /* Module refcount only gets to zero when all peers
1301                  * have been closed so all lists must be empty */
1302                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1303                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1304                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1305                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1306                 }
1307                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1308                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1309                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1310                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1311
1312                 if (ksocknal_data.ksnd_schedulers != NULL)
1313                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1314                                 ksock_sched_t *kss =
1315                                         &ksocknal_data.ksnd_schedulers[i];
1316
1317                                 LASSERT (list_empty (&kss->kss_tx_conns));
1318                                 LASSERT (list_empty (&kss->kss_rx_conns));
1319                                 LASSERT (kss->kss_nconns == 0);
1320                         }
1321
1322                 /* stop router calling me */
1323                 kpr_shutdown (&ksocknal_data.ksnd_router);
1324
1325                 /* flag threads to terminate; wake and wait for them to die */
1326                 ksocknal_data.ksnd_shuttingdown = 1;
1327                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1328                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1329
1330                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1331                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1332
1333                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1334                         CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
1335                                 atomic_read (&ksocknal_data.ksnd_nthreads));
1336                         set_current_state (TASK_UNINTERRUPTIBLE);
1337                         schedule_timeout (HZ);
1338                 }
1339
1340                 kpr_deregister (&ksocknal_data.ksnd_router);
1341
1342                 ksocknal_free_buffers();
1343                 /* fall through */
1344
1345         case SOCKNAL_INIT_NOTHING:
1346                 break;
1347         }
1348
1349         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1350                atomic_read (&portal_kmemory));
1351
1352         printk(KERN_INFO "Routing socket NAL unloaded (final mem %d)\n",
1353                atomic_read(&portal_kmemory));
1354 }
1355
1356
1357 int __init
1358 ksocknal_module_init (void)
1359 {
1360         int   pkmem = atomic_read(&portal_kmemory);
1361         int   rc;
1362         int   i;
1363         int   j;
1364
1365         /* packet descriptor must fit in a router descriptor's scratchpad */
1366         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1367
1368         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1369
1370         ksocknal_api.forward  = ksocknal_api_forward;
1371         ksocknal_api.shutdown = ksocknal_api_shutdown;
1372         ksocknal_api.yield    = ksocknal_api_yield;
1373         ksocknal_api.validate = NULL;           /* our api validate is a NOOP */
1374         ksocknal_api.lock     = ksocknal_api_lock;
1375         ksocknal_api.unlock   = ksocknal_api_unlock;
1376         ksocknal_api.nal_data = &ksocknal_data;
1377
1378         ksocknal_lib.nal_data = &ksocknal_data;
1379
1380         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1381
1382         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1383         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1384                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1385         if (ksocknal_data.ksnd_peers == NULL)
1386                 RETURN (-ENOMEM);
1387
1388         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1389                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1390
1391         rwlock_init(&ksocknal_data.ksnd_global_lock);
1392
1393         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1394         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1395
1396         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1397         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1398         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1399
1400         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1401         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1402         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1403
1404         spin_lock_init(&ksocknal_data.ksnd_idle_ltx_lock);
1405         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_nblk_ltx_list);
1406         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_ltx_list);
1407         init_waitqueue_head(&ksocknal_data.ksnd_idle_ltx_waitq);
1408
1409         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1410         INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1411         INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1412         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1413
1414         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1415         INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1416         init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1417
1418         /* NB memset above zeros whole of ksocknal_data, including
1419          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1420
1421         /* flag lists/ptrs/locks initialised */
1422         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1423
1424         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1425                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1426         if (ksocknal_data.ksnd_schedulers == NULL) {
1427                 ksocknal_module_fini ();
1428                 RETURN(-ENOMEM);
1429         }
1430
1431         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1432                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1433
1434                 spin_lock_init (&kss->kss_lock);
1435                 INIT_LIST_HEAD (&kss->kss_rx_conns);
1436                 INIT_LIST_HEAD (&kss->kss_tx_conns);
1437 #if SOCKNAL_ZC
1438                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1439 #endif
1440                 init_waitqueue_head (&kss->kss_waitq);
1441         }
1442
1443         CDEBUG (D_MALLOC, "ltx "LPSZ", total "LPSZ"\n", sizeof (ksock_ltx_t),
1444                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
1445
1446         PORTAL_ALLOC(ksocknal_data.ksnd_ltxs,
1447                      sizeof(ksock_ltx_t) * (SOCKNAL_NLTXS +SOCKNAL_NNBLK_LTXS));
1448         if (ksocknal_data.ksnd_ltxs == NULL) {
1449                 ksocknal_module_fini ();
1450                 return (-ENOMEM);
1451         }
1452
1453         /* Deterministic bugs please */
1454         memset (ksocknal_data.ksnd_ltxs, 0xeb,
1455                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
1456
1457         for (i = 0; i < SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS; i++) {
1458                 ksock_ltx_t *ltx = &((ksock_ltx_t *)ksocknal_data.ksnd_ltxs)[i];
1459
1460                 ltx->ltx_tx.tx_hdr = &ltx->ltx_hdr;
1461                 ltx->ltx_idle = i < SOCKNAL_NLTXS ?
1462                                 &ksocknal_data.ksnd_idle_ltx_list :
1463                                 &ksocknal_data.ksnd_idle_nblk_ltx_list;
1464                 list_add (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
1465         }
1466
1467         rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
1468         if (rc != 0) {
1469                 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
1470                 ksocknal_module_fini ();
1471                 RETURN (rc);
1472         }
1473         PtlNIDebug(ksocknal_ni, ~0);
1474
1475         ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
1476
1477         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1478                 rc = ksocknal_thread_start (ksocknal_scheduler,
1479                                             &ksocknal_data.ksnd_schedulers[i]);
1480                 if (rc != 0) {
1481                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1482                                i, rc);
1483                         ksocknal_module_fini ();
1484                         RETURN (rc);
1485                 }
1486         }
1487
1488         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1489                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1490                 if (rc != 0) {
1491                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1492                         ksocknal_module_fini ();
1493                         RETURN (rc);
1494                 }
1495         }
1496
1497         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1498         if (rc != 0) {
1499                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1500                 ksocknal_module_fini ();
1501                 RETURN (rc);
1502         }
1503
1504         rc = kpr_register(&ksocknal_data.ksnd_router,
1505                           &ksocknal_router_interface);
1506         if (rc != 0) {
1507                 CDEBUG(D_NET, "Can't initialise routing interface "
1508                        "(rc = %d): not routing\n", rc);
1509         } else {
1510                 /* Only allocate forwarding buffers if I'm on a gateway */
1511
1512                 PORTAL_ALLOC(ksocknal_data.ksnd_fmbs,
1513                              sizeof(ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1514                                                     SOCKNAL_LARGE_FWD_NMSGS));
1515                 if (ksocknal_data.ksnd_fmbs == NULL) {
1516                         ksocknal_module_fini ();
1517                         RETURN(-ENOMEM);
1518                 }
1519
1520                 /* NULL out buffer pointers etc */
1521                 memset(ksocknal_data.ksnd_fmbs, 0,
1522                        sizeof(ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
1523                                               SOCKNAL_LARGE_FWD_NMSGS));
1524
1525                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1526                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
1527                         ksock_fmb_t *fmb =
1528                                 &((ksock_fmb_t *)ksocknal_data.ksnd_fmbs)[i];
1529
1530                         if (i < SOCKNAL_SMALL_FWD_NMSGS) {
1531                                 fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES;
1532                                 fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp;
1533                         } else {
1534                                 fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES;
1535                                 fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp;
1536                         }
1537
1538                         LASSERT (fmb->fmb_npages > 0);
1539                         for (j = 0; j < fmb->fmb_npages; j++) {
1540                                 fmb->fmb_pages[j] = alloc_page(GFP_KERNEL);
1541
1542                                 if (fmb->fmb_pages[j] == NULL) {
1543                                         ksocknal_module_fini ();
1544                                         return (-ENOMEM);
1545                                 }
1546
1547                                 LASSERT(page_address (fmb->fmb_pages[j]) !=
1548                                         NULL);
1549                         }
1550
1551                         list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
1552                 }
1553         }
1554
1555         rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
1556         if (rc != 0) {
1557                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1558                 ksocknal_module_fini ();
1559                 return (rc);
1560         }
1561
1562         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
1563
1564         /* flag everything initialised */
1565         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1566
1567         printk(KERN_INFO "Routing socket NAL loaded (Routing %s, initial "
1568                "mem %d)\n",
1569                kpr_routing (&ksocknal_data.ksnd_router) ?
1570                "enabled" : "disabled", pkmem);
1571
1572         return (0);
1573 }
1574
1575 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1576 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1577 MODULE_LICENSE("GPL");
1578
1579 module_init(ksocknal_module_init);
1580 module_exit(ksocknal_module_fini);
1581
1582 EXPORT_SYMBOL (ksocknal_ni);