Whamcloud - gitweb
91d971ce7f7494cacb81971bf9c145973051b61e
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 ptl_handle_ni_t         ksocknal_ni;
29 static nal_t            ksocknal_api;
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
32 #else
33 static ksock_nal_data_t ksocknal_data;
34 #endif
35
36 kpr_nal_interface_t ksocknal_router_interface = {
37         kprni_nalid:      SOCKNAL,
38         kprni_arg:        &ksocknal_data,
39         kprni_fwd:        ksocknal_fwd_packet,
40 };
41
42
43 int
44 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
45                        void *ret, size_t ret_len)
46 {
47         ksock_nal_data_t *k;
48         nal_cb_t *nal_cb;
49
50         k = nal->nal_data;
51         nal_cb = k->ksnd_nal_cb;
52
53         lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
54         return PTL_OK;
55 }
56
57 int
58 ksocknal_api_shutdown(nal_t *nal, int ni)
59 {
60         CDEBUG (D_NET, "closing all connections\n");
61
62         return ksocknal_close_sock(0);          /* close all sockets */
63 }
64
65 void
66 ksocknal_api_yield(nal_t *nal)
67 {
68         our_cond_resched();
69         return;
70 }
71
72 void
73 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
74 {
75         ksock_nal_data_t *k;
76         nal_cb_t *nal_cb;
77
78         k = nal->nal_data;
79         nal_cb = k->ksnd_nal_cb;
80         nal_cb->cb_cli(nal_cb,flags);
81 }
82
83 void
84 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
85 {
86         ksock_nal_data_t *k;
87         nal_cb_t *nal_cb;
88
89         k = nal->nal_data;
90         nal_cb = k->ksnd_nal_cb;
91         nal_cb->cb_sti(nal_cb,flags);
92 }
93
94 nal_t *
95 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
96               ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
97 {
98         CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
99         lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
100         return (&ksocknal_api);
101 }
102
103 /*
104  *  EXTRA functions follow
105  */
106
107 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
108 #define SOCKET_I(inode) (&(inode)->u.socket_i)
109 #endif
110 static __inline__ struct socket *
111 socki_lookup(struct inode *inode)
112 {
113         return SOCKET_I(inode);
114 }
115
116 int
117 ksocknal_set_mynid(ptl_nid_t nid)
118 {
119         lib_ni_t *ni = &ksocknal_lib.ni;
120
121         /* FIXME: we have to do this because we call lib_init() at module
122          * insertion time, which is before we have 'mynid' available.  lib_init
123          * sets the NAL's nid, which it uses to tell other nodes where packets
124          * are coming from.  This is not a very graceful solution to this
125          * problem. */
126
127         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
128                nid, ni->nid);
129
130         ni->nid = nid;
131         return (0);
132 }
133
134 void
135 ksocknal_bind_irq (unsigned int irq, int cpu)
136 {
137 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
138         char  cmdline[64];
139         char *argv[] = {"/bin/sh",
140                         "-c",
141                         cmdline,
142                         NULL};
143         char *envp[] = {"HOME=/",
144                         "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
145                         NULL};
146
147         snprintf (cmdline, sizeof (cmdline),
148                   "echo %d > /proc/irq/%u/smp_affinity", 1 << cpu, irq);
149
150         printk (KERN_INFO "Binding irq %u to CPU %d with cmd: %s\n",
151                 irq, cpu, cmdline);
152
153         /* FIXME: Find a better method of setting IRQ affinity...
154          */
155
156         call_usermodehelper (argv[0], argv, envp);
157 #endif
158 }
159
160 int
161 ksocknal_add_sock (ptl_nid_t nid, int fd, int bind_irq)
162 {
163         unsigned long      flags;
164         ksock_conn_t      *conn;
165         struct file       *file = NULL;
166         struct socket     *sock = NULL;
167         ksock_sched_t     *sched = NULL;
168         unsigned int       irq = 0;
169         struct net_device *dev = NULL;
170         int                ret;
171         int                idx;
172         ENTRY;
173
174         LASSERT (!in_interrupt());
175
176         file = fget(fd);
177         if (file == NULL)
178                 RETURN(-EINVAL);
179
180         ret = -EINVAL;
181         sock = socki_lookup(file->f_dentry->d_inode);
182         if (sock == NULL)
183                 GOTO(error, ret);
184
185         ret = -ENOMEM;
186         PORTAL_ALLOC(conn, sizeof(*conn));
187         if (!conn)
188                 GOTO(error, ret);
189
190         sock->sk->allocation = GFP_NOFS;    /* don't call info fs for alloc */
191
192         conn->ksnc_file = file;
193         conn->ksnc_sock = sock;
194         conn->ksnc_saved_data_ready = sock->sk->data_ready;
195         conn->ksnc_saved_write_space = sock->sk->write_space;
196         conn->ksnc_peernid = nid;
197         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for socklist */
198
199         conn->ksnc_rx_ready = 0;
200         conn->ksnc_rx_scheduled = 0;
201         ksocknal_new_packet (conn, 0);
202
203         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
204         conn->ksnc_tx_ready = 0;
205         conn->ksnc_tx_scheduled = 0;
206
207 #warning check it is OK to derefence sk->dst_cache->dev like this...
208         lock_sock (conn->ksnc_sock->sk);
209
210         if (conn->ksnc_sock->sk->dst_cache != NULL) {
211                 dev = conn->ksnc_sock->sk->dst_cache->dev;
212                 if (dev != NULL) {
213                         irq = dev->irq;
214                         if (irq >= NR_IRQS) {
215                                 CERROR ("Unexpected IRQ %x\n", irq);
216                                 irq = 0;
217                         }
218                 }
219         }
220
221         release_sock (conn->ksnc_sock->sk);
222
223         write_lock_irqsave (&ksocknal_data.ksnd_socklist_lock, flags);
224
225         if (irq == 0 ||
226             ksocknal_data.ksnd_irq_info[irq] == SOCKNAL_IRQ_UNASSIGNED) {
227                 /* This is a software NIC, or we haven't associated it with
228                  * a CPU yet */
229
230                 /* Choose the CPU with the fewest connections */
231                 sched = ksocknal_data.ksnd_schedulers;
232                 for (idx = 1; idx < SOCKNAL_N_SCHED; idx++)
233                         if (sched->kss_nconns >
234                             ksocknal_data.ksnd_schedulers[idx].kss_nconns)
235                                 sched = &ksocknal_data.ksnd_schedulers[idx];
236
237                 if (irq != 0) {                 /* Hardware NIC */
238                         /* Remember which scheduler we chose */
239                         idx = sched - ksocknal_data.ksnd_schedulers;
240
241                         LASSERT (idx < SOCKNAL_IRQ_SCHED_MASK);
242
243                         if (bind_irq)       /* remember if we will bind below */
244                                 idx |= SOCKNAL_IRQ_BOUND;
245
246                         ksocknal_data.ksnd_irq_info[irq] = idx;
247                 }
248         } else { 
249                 /* This is a hardware NIC, associated with a CPU */
250                 idx = ksocknal_data.ksnd_irq_info[irq];
251
252                 /* Don't bind again if we've bound already */
253                 if ((idx & SOCKNAL_IRQ_BOUND) != 0)
254                         bind_irq = 0;
255                 
256                 sched = &ksocknal_data.ksnd_schedulers[idx & SOCKNAL_IRQ_SCHED_MASK];
257         }
258
259         sched->kss_nconns++;
260         conn->ksnc_scheduler = sched;
261
262         list_add(&conn->ksnc_list, &ksocknal_data.ksnd_socklist);
263
264         write_unlock_irqrestore (&ksocknal_data.ksnd_socklist_lock, flags);
265
266         if (bind_irq &&                         /* irq binding required */
267             irq != 0)                           /* hardware NIC */
268                 ksocknal_bind_irq (irq, sched - ksocknal_data.ksnd_schedulers);
269
270         /* NOW it's safe to get called back when socket is ready... */
271         sock->sk->user_data = conn;
272         sock->sk->data_ready = ksocknal_data_ready;
273         sock->sk->write_space = ksocknal_write_space;
274
275         /* ...which I call right now to get things going */
276         ksocknal_data_ready (sock->sk, 0);
277         ksocknal_write_space (sock->sk);
278
279         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64"\n",
280                conn, conn->ksnc_peernid);
281
282         /* Can't unload while connection active */
283         PORTAL_MODULE_USE;
284         RETURN(0);
285
286 error:
287         fput(file);
288         return (ret);
289 }
290
291 /* Passing in a zero nid will close all connections */
292 int
293 ksocknal_close_sock(ptl_nid_t nid)
294 {
295         long               flags;
296         ksock_conn_t      *conn;
297         LIST_HEAD         (death_row);
298         struct list_head  *tmp;
299
300         LASSERT (!in_interrupt());
301         write_lock_irqsave (&ksocknal_data.ksnd_socklist_lock, flags);
302
303         if (nid == 0) {                         /* close ALL connections */
304                 /* insert 'death row' into the socket list... */
305                 list_add (&death_row, &ksocknal_data.ksnd_socklist);
306                 /* ...extract and reinitialise the socket list itself... */
307                 list_del_init (&ksocknal_data.ksnd_socklist);
308                 /* ...and voila, death row is the proud owner of all conns */
309         } else list_for_each (tmp, &ksocknal_data.ksnd_socklist) {
310
311                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
312
313                 if (conn->ksnc_peernid == nid) {
314                         list_del (&conn->ksnc_list);
315                         list_add (&conn->ksnc_list, &death_row);
316                         break;
317                 }
318         }
319
320         write_unlock_irqrestore (&ksocknal_data.ksnd_socklist_lock, flags);
321
322         if (nid && list_empty (&death_row))
323                 return (-ENOENT);
324
325         while (!list_empty (&death_row)) {
326                 conn = list_entry (death_row.next, ksock_conn_t, ksnc_list);
327                 list_del (&conn->ksnc_list);
328
329                 /* NB I _have_ to restore the callback, rather than storing
330                  * a noop, since the socket could survive past this module
331                  * being unloaded!! */
332                 conn->ksnc_sock->sk->data_ready = conn->ksnc_saved_data_ready;
333                 conn->ksnc_sock->sk->write_space = conn->ksnc_saved_write_space;
334
335                 /* OK; no more callbacks, but they could be in progress now,
336                  * so wait for them to complete... */
337                 write_lock_irqsave (&ksocknal_data.ksnd_socklist_lock, flags);
338
339                 /* ...however if I get the lock before a callback gets it,
340                  * this will make them noop
341                  */
342                 conn->ksnc_sock->sk->user_data = NULL;
343
344                 /* And drop the scheduler's connection count while I've got
345                  * the exclusive lock */
346                 conn->ksnc_scheduler->kss_nconns--;
347
348                 write_unlock_irqrestore(&ksocknal_data.ksnd_socklist_lock,
349                                         flags);
350
351                 ksocknal_put_conn (conn);       /* drop ref for ksnd_socklist */
352         }
353
354         return (0);
355 }
356
357 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
358 struct tcp_opt *sock2tcp_opt(struct sock *sk)
359 {
360         return &(sk->tp_pinfo.af_tcp);
361 }
362 #else
363 struct tcp_opt *sock2tcp_opt(struct sock *sk)
364 {
365         struct tcp_sock *s = (struct tcp_sock *)sk;
366         return &s->tcp;
367 }
368 #endif
369
370 void
371 ksocknal_push_conn (ksock_conn_t *conn)
372 {
373         struct sock    *sk = conn->ksnc_sock->sk;
374         struct tcp_opt *tp = sock2tcp_opt(sk);
375         int             nonagle;
376         int             val = 1;
377         int             rc;
378         mm_segment_t    oldmm;
379
380         lock_sock (sk);
381         nonagle = tp->nonagle;
382         tp->nonagle = 1;
383         release_sock (sk);
384
385         oldmm = get_fs ();
386         set_fs (KERNEL_DS);
387
388         rc = sk->prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
389                                    (char *)&val, sizeof (val));
390         LASSERT (rc == 0);
391
392         set_fs (oldmm);
393
394         lock_sock (sk);
395         tp->nonagle = nonagle;
396         release_sock (sk);
397 }
398
399 /* Passing in a zero nid pushes all connections */
400 int
401 ksocknal_push_sock (ptl_nid_t nid)
402 {
403         ksock_conn_t      *conn;
404         struct list_head  *tmp;
405         int                index;
406         int                i;
407
408         if (nid != 0) {
409                 conn = ksocknal_get_conn (nid);
410
411                 if (conn == NULL)
412                         return (-ENOENT);
413
414                 ksocknal_push_conn (conn);
415                 ksocknal_put_conn (conn);
416
417                 return (0);
418         }
419
420         /* NB we can't remove connections from the socket list so we have to
421          * cope with them being removed from under us...
422          */
423         for (index = 0; ; index++) {
424                 read_lock (&ksocknal_data.ksnd_socklist_lock);
425
426                 i = 0;
427                 conn = NULL;
428
429                 list_for_each (tmp, &ksocknal_data.ksnd_socklist) {
430                         if (i++ == index) {
431                                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
432                                 atomic_inc (&conn->ksnc_refcount); // take a ref
433                                 break;
434                         }
435                 }
436
437                 read_unlock (&ksocknal_data.ksnd_socklist_lock);
438
439                 if (conn == NULL)
440                         break;
441
442                 ksocknal_push_conn (conn);
443                 ksocknal_put_conn (conn);
444         }
445
446         return (0);
447 }
448
449 ksock_conn_t *
450 ksocknal_get_conn (ptl_nid_t nid)
451 {
452         struct list_head *tmp;
453         ksock_conn_t     *conn;
454
455         PROF_START(conn_list_walk);
456
457         read_lock (&ksocknal_data.ksnd_socklist_lock);
458
459         list_for_each(tmp, &ksocknal_data.ksnd_socklist) {
460
461                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
462
463                 if (conn->ksnc_peernid == nid) {
464                         /* caller is referencing */
465                         atomic_inc (&conn->ksnc_refcount);
466
467                         read_unlock (&ksocknal_data.ksnd_socklist_lock);
468
469                         CDEBUG(D_NET, "got conn [%p] -> "LPX64" (%d)\n",
470                                conn, nid, atomic_read (&conn->ksnc_refcount));
471
472                         PROF_FINISH(conn_list_walk);
473                         return (conn);
474                 }
475         }
476
477         read_unlock (&ksocknal_data.ksnd_socklist_lock);
478
479         CDEBUG(D_NET, "No connection found when looking for nid "LPX64"\n",
480                nid);
481         PROF_FINISH(conn_list_walk);
482         return (NULL);
483 }
484
485 void
486 ksocknal_close_conn (ksock_conn_t *conn)
487 {
488         CDEBUG (D_NET, "connection [%p] closed \n", conn);
489
490         fput (conn->ksnc_file);
491         PORTAL_FREE (conn, sizeof (*conn));
492
493         /* One less connection keeping us hanging on */
494         PORTAL_MODULE_UNUSE;
495 }
496
497 void
498 _ksocknal_put_conn (ksock_conn_t *conn)
499 {
500         unsigned long flags;
501
502         CDEBUG (D_NET, "connection [%p] handed the black spot\n", conn);
503
504         /* "But what is the black spot, captain?" I asked.
505          * "That's a summons, mate..." */
506
507         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
508         LASSERT (conn->ksnc_sock->sk->data_ready != ksocknal_data_ready);
509         LASSERT (conn->ksnc_sock->sk->write_space != ksocknal_write_space);
510         LASSERT (conn->ksnc_sock->sk->user_data == NULL);
511         LASSERT (!conn->ksnc_rx_scheduled);
512
513         if (!in_interrupt()) {
514                 ksocknal_close_conn (conn);
515                 return;
516         }
517
518         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
519
520         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_reaper_list);
521         wake_up (&ksocknal_data.ksnd_reaper_waitq);
522
523         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
524 }
525
526 int
527 ksocknal_cmd(struct portal_ioctl_data * data, void * private)
528 {
529         int rc = -EINVAL;
530
531         LASSERT (data != NULL);
532
533         switch(data->ioc_nal_cmd) {
534         case NAL_CMD_REGISTER_PEER_FD: {
535                 rc = ksocknal_add_sock(data->ioc_nid, data->ioc_fd,
536                                        data->ioc_flags);
537                 break;
538         }
539         case NAL_CMD_CLOSE_CONNECTION: {
540                 rc = ksocknal_close_sock(data->ioc_nid);
541                 break;
542         }
543         case NAL_CMD_REGISTER_MYNID: {
544                 rc = ksocknal_set_mynid (data->ioc_nid);
545                 break;
546         }
547         case NAL_CMD_PUSH_CONNECTION: {
548                 rc = ksocknal_push_sock (data->ioc_nid);
549                 break;
550         }
551         }
552
553         return rc;
554 }
555
556 void
557 ksocknal_free_buffers (void)
558 {
559         if (ksocknal_data.ksnd_fmbs != NULL) {
560                 ksock_fmb_t *fmb = (ksock_fmb_t *)ksocknal_data.ksnd_fmbs;
561                 int          i;
562                 int          j;
563
564                 for (i = 0;
565                      i < (SOCKNAL_SMALL_FWD_NMSGS + SOCKNAL_LARGE_FWD_NMSGS);
566                      i++, fmb++)
567                         for (j = 0; j < fmb->fmb_npages; j++)
568                                 if (fmb->fmb_pages[j] != NULL)
569                                         __free_page (fmb->fmb_pages[j]);
570
571                 PORTAL_FREE (ksocknal_data.ksnd_fmbs,
572                              sizeof (ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
573                                                      SOCKNAL_LARGE_FWD_NMSGS));
574         }
575
576         if (ksocknal_data.ksnd_ltxs != NULL)
577                 PORTAL_FREE (ksocknal_data.ksnd_ltxs,
578                              sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS +
579                                                      SOCKNAL_NNBLK_LTXS));
580
581         if (ksocknal_data.ksnd_schedulers != NULL)
582                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
583                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
584 }
585
586 void __exit
587 ksocknal_module_fini (void)
588 {
589         int   i;
590
591         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
592                atomic_read (&portal_kmemory));
593
594         switch (ksocknal_data.ksnd_init) {
595         default:
596                 LASSERT (0);
597
598         case SOCKNAL_INIT_ALL:
599                 kportal_nal_unregister(SOCKNAL);
600                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
601                 /* fall through */
602
603         case SOCKNAL_INIT_PTL:
604                 PtlNIFini(ksocknal_ni);
605                 lib_fini(&ksocknal_lib);
606                 /* fall through */
607
608         case SOCKNAL_INIT_DATA:
609                 /* Module refcount only gets to zero when all connections
610                  * have been closed so all lists must be empty */
611                 LASSERT (list_empty (&ksocknal_data.ksnd_socklist));
612                 LASSERT (list_empty (&ksocknal_data.ksnd_reaper_list));
613                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
614                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
615
616                 if (ksocknal_data.ksnd_schedulers != NULL)
617                         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
618                                 ksock_sched_t *kss =
619                                         &ksocknal_data.ksnd_schedulers[i];
620
621                                 LASSERT (list_empty (&kss->kss_tx_conns));
622                                 LASSERT (list_empty (&kss->kss_rx_conns));
623                                 LASSERT (kss->kss_nconns == 0);
624                         }
625
626                 /* stop router calling me */
627                 kpr_shutdown (&ksocknal_data.ksnd_router);
628
629                 /* flag threads to terminate; wake and wait for them to die */
630                 ksocknal_data.ksnd_shuttingdown = 1;
631                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
632
633                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
634                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
635
636                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
637                         CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
638                                 atomic_read (&ksocknal_data.ksnd_nthreads));
639                         set_current_state (TASK_UNINTERRUPTIBLE);
640                         schedule_timeout (HZ);
641                 }
642
643                 kpr_deregister (&ksocknal_data.ksnd_router);
644
645                 ksocknal_free_buffers();
646                 /* fall through */
647
648         case SOCKNAL_INIT_NOTHING:
649                 break;
650         }
651
652         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
653                atomic_read (&portal_kmemory));
654
655         printk(KERN_INFO "Routing socket NAL unloaded (final mem %d)\n",
656                atomic_read(&portal_kmemory));
657 }
658
659
660 int __init
661 ksocknal_module_init (void)
662 {
663         int   pkmem = atomic_read(&portal_kmemory);
664         int   rc;
665         int   i;
666         int   j;
667
668         /* packet descriptor must fit in a router descriptor's scratchpad */
669         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
670
671         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
672
673         ksocknal_api.forward  = ksocknal_api_forward;
674         ksocknal_api.shutdown = ksocknal_api_shutdown;
675         ksocknal_api.yield    = ksocknal_api_yield;
676         ksocknal_api.validate = NULL;           /* our api validate is a NOOP */
677         ksocknal_api.lock     = ksocknal_api_lock;
678         ksocknal_api.unlock   = ksocknal_api_unlock;
679         ksocknal_api.nal_data = &ksocknal_data;
680
681         ksocknal_lib.nal_data = &ksocknal_data;
682
683         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
684
685         INIT_LIST_HEAD(&ksocknal_data.ksnd_socklist);
686         rwlock_init(&ksocknal_data.ksnd_socklist_lock);
687
688         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
689         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
690
691         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
692         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
693         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
694
695         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
696         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
697         INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
698
699         spin_lock_init(&ksocknal_data.ksnd_idle_ltx_lock);
700         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_nblk_ltx_list);
701         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_ltx_list);
702         init_waitqueue_head(&ksocknal_data.ksnd_idle_ltx_waitq);
703
704         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
705         INIT_LIST_HEAD (&ksocknal_data.ksnd_reaper_list);
706         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
707
708         memset (&ksocknal_data.ksnd_irq_info, SOCKNAL_IRQ_UNASSIGNED,
709                 sizeof (ksocknal_data.ksnd_irq_info));
710
711         /* flag lists/ptrs/locks initialised */
712         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
713
714         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
715                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
716         if (ksocknal_data.ksnd_schedulers == NULL)
717                 RETURN(-ENOMEM);
718
719         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
720                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
721
722                 spin_lock_init (&kss->kss_lock);
723                 INIT_LIST_HEAD (&kss->kss_rx_conns);
724                 INIT_LIST_HEAD (&kss->kss_tx_conns);
725 #if SOCKNAL_ZC
726                 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
727 #endif
728                 init_waitqueue_head (&kss->kss_waitq);
729         }
730
731         CERROR ("ltx "LPSZ", total "LPSZ"\n", sizeof (ksock_ltx_t),
732                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
733
734         PORTAL_ALLOC(ksocknal_data.ksnd_ltxs,
735                      sizeof(ksock_ltx_t) * (SOCKNAL_NLTXS +SOCKNAL_NNBLK_LTXS));
736         if (ksocknal_data.ksnd_ltxs == NULL) {
737                 ksocknal_module_fini ();
738                 return (-ENOMEM);
739         }
740
741         /* Deterministic bugs please */
742         memset (ksocknal_data.ksnd_ltxs, 0xeb,
743                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
744
745         for (i = 0; i < SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS; i++) {
746                 ksock_ltx_t *ltx = &((ksock_ltx_t *)ksocknal_data.ksnd_ltxs)[i];
747
748                 ltx->ltx_idle = i < SOCKNAL_NLTXS ?
749                                 &ksocknal_data.ksnd_idle_ltx_list :
750                                 &ksocknal_data.ksnd_idle_nblk_ltx_list;
751                 list_add (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
752         }
753
754         rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
755         if (rc != 0) {
756                 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
757                 ksocknal_module_fini ();
758                 RETURN (rc);
759         }
760         PtlNIDebug(ksocknal_ni, ~0);
761
762         ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
763
764         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
765                 rc = ksocknal_thread_start (ksocknal_scheduler,
766                                             &ksocknal_data.ksnd_schedulers[i]);
767                 if (rc != 0) {
768                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
769                                i, rc);
770                         ksocknal_module_fini ();
771                         RETURN (rc);
772                 }
773         }
774
775         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
776         if (rc != 0) {
777                 CERROR("Can't spawn socknal reaper: %d\n", rc);
778                 ksocknal_module_fini ();
779                 RETURN (rc);
780         }
781
782         rc = kpr_register(&ksocknal_data.ksnd_router,
783                           &ksocknal_router_interface);
784         if (rc != 0) {
785                 CDEBUG(D_NET, "Can't initialise routing interface "
786                        "(rc = %d): not routing\n", rc);
787         } else {
788                 /* Only allocate forwarding buffers if I'm on a gateway */
789
790                 PORTAL_ALLOC(ksocknal_data.ksnd_fmbs,
791                              sizeof(ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
792                                                     SOCKNAL_LARGE_FWD_NMSGS));
793                 if (ksocknal_data.ksnd_fmbs == NULL) {
794                         ksocknal_module_fini ();
795                         RETURN(-ENOMEM);
796                 }
797
798                 /* NULL out buffer pointers etc */
799                 memset(ksocknal_data.ksnd_fmbs, 0,
800                        sizeof(ksock_fmb_t) * (SOCKNAL_SMALL_FWD_NMSGS +
801                                               SOCKNAL_LARGE_FWD_NMSGS));
802
803                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
804                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
805                         ksock_fmb_t *fmb =
806                                 &((ksock_fmb_t *)ksocknal_data.ksnd_fmbs)[i];
807
808                         if (i < SOCKNAL_SMALL_FWD_NMSGS) {
809                                 fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES;
810                                 fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp;
811                         } else {
812                                 fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES;
813                                 fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp;
814                         }
815
816                         LASSERT (fmb->fmb_npages > 0);
817                         for (j = 0; j < fmb->fmb_npages; j++) {
818                                 fmb->fmb_pages[j] = alloc_page(GFP_KERNEL);
819
820                                 if (fmb->fmb_pages[j] == NULL) {
821                                         ksocknal_module_fini ();
822                                         return (-ENOMEM);
823                                 }
824
825                                 LASSERT(page_address (fmb->fmb_pages[j]) !=
826                                         NULL);
827                         }
828
829                         list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
830                 }
831         }
832
833         rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
834         if (rc != 0) {
835                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
836                 ksocknal_module_fini ();
837                 return (rc);
838         }
839
840         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
841
842         /* flag everything initialised */
843         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
844
845         printk(KERN_INFO "Routing socket NAL loaded (Routing %s, initial "
846                "mem %d)\n",
847                kpr_routing (&ksocknal_data.ksnd_router) ?
848                "enabled" : "disabled", pkmem);
849
850         return (0);
851 }
852
853 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
854 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
855 MODULE_LICENSE("GPL");
856
857 module_init(ksocknal_module_init);
858 module_exit(ksocknal_module_fini);
859
860 EXPORT_SYMBOL (ksocknal_ni);