Whamcloud - gitweb
b5e1e3986b30664e6a4a7eff09da997288e32f76
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd.c
1 /*
2  * Copyright (C) 2002 Cluster File Systems, Inc.
3  *   Author: Eric Barton <eric@bartonsoftware.com>
4  *
5  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
6  * W. Marcus Miller - Based on ksocknal
7  *
8  * This file is part of Portals, http://www.sf.net/projects/lustre/
9  *
10  * Portals is free software; you can redistribute it and/or
11  * modify it under the terms of version 2 of the GNU General Public
12  * License as published by the Free Software Foundation.
13  *
14  * Portals is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with Portals; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  */
24
25 #include "qswnal.h"
26
ptl_handle_ni_t         kqswnal_ni;     /* handle of our network interface (EXPORTed below) */
nal_t                   kqswnal_api;    /* API-side NAL; dispatch table filled in at init */
kqswnal_data_t          kqswnal_data;   /* all global state for this NAL */

/* Interface handed to the portals router (kpr_register) so it can hand us
 * packets to forward via kqswnal_fwd_packet(). */
kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
};
36
37
38 static int
39 kqswnal_forward(nal_t   *nal,
40                 int     id,
41                 void    *args,  size_t args_len,
42                 void    *ret,   size_t ret_len)
43 {
44         kqswnal_data_t *k = nal->nal_data;
45         nal_cb_t       *nal_cb = k->kqn_cb;
46
47         LASSERT (nal == &kqswnal_api);
48         LASSERT (k == &kqswnal_data);
49         LASSERT (nal_cb == &kqswnal_lib);
50
51         lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
52         return (PTL_OK);
53 }
54
55 static void
56 kqswnal_lock (nal_t *nal, unsigned long *flags)
57 {
58         kqswnal_data_t *k = nal->nal_data;
59         nal_cb_t       *nal_cb = k->kqn_cb;
60
61         LASSERT (nal == &kqswnal_api);
62         LASSERT (k == &kqswnal_data);
63         LASSERT (nal_cb == &kqswnal_lib);
64
65         nal_cb->cb_cli(nal_cb,flags);
66 }
67
68 static void
69 kqswnal_unlock(nal_t *nal, unsigned long *flags)
70 {
71         kqswnal_data_t *k = nal->nal_data;
72         nal_cb_t       *nal_cb = k->kqn_cb;
73
74         LASSERT (nal == &kqswnal_api);
75         LASSERT (k == &kqswnal_data);
76         LASSERT (nal_cb == &kqswnal_lib);
77
78         nal_cb->cb_sti(nal_cb,flags);
79 }
80
81 static int
82 kqswnal_shutdown(nal_t *nal, int ni)
83 {
84         CDEBUG (D_NET, "shutdown\n");
85
86         LASSERT (nal == &kqswnal_api);
87         return (0);
88 }
89
90 static void
91 kqswnal_yield( nal_t *nal )
92 {
93         CDEBUG (D_NET, "yield\n");
94
95         if (current->need_resched)
96                 schedule();
97         return;
98 }
99
100 static nal_t *
101 kqswnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
102              ptl_pid_t requested_pid)
103 {
104         ptl_nid_t mynid = kqswnal_elanid2nid (kqswnal_data.kqn_elanid);
105         int       nnids = kqswnal_data.kqn_nnodes;
106
107         CDEBUG(D_NET, "calling lib_init with nid "LPX64" of %d\n", mynid, nnids);
108
109         lib_init(&kqswnal_lib, mynid, 0, nnids, ptl_size, ac_size);
110
111         return (&kqswnal_api);
112 }
113
114 int
115 kqswnal_get_tx_desc (struct portal_ioctl_data *data)
116 {
117         unsigned long      flags;
118         struct list_head  *tmp;
119         kqswnal_tx_t      *ktx;
120         int                index = data->ioc_count;
121         int                rc = -ENOENT;
122
123         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
124
125         list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
126                 if (index-- != 0)
127                         continue;
128                 
129                 ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
130
131                 data->ioc_pbuf1 = (char *)ktx;
132                 data->ioc_count = NTOH__u32(ktx->ktx_wire_hdr->type);
133                 data->ioc_size  = NTOH__u32(PTL_HDR_LENGTH(ktx->ktx_wire_hdr));
134                 data->ioc_nid   = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
135                 data->ioc_nid2  = ktx->ktx_nid;
136                 data->ioc_misc  = ktx->ktx_launcher;
137                 data->ioc_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
138                                   ((!ktx->ktx_forwarding)              ? 0 : 2) |
139                                   ((!ktx->ktx_isnblk)                  ? 0 : 4);
140
141                 rc = 0;
142                 break;
143         }
144         
145         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
146         return (rc);
147 }
148
149 int
150 kqswnal_cmd (struct portal_ioctl_data *data, void *private)
151 {
152         LASSERT (data != NULL);
153         
154         switch (data->ioc_nal_cmd) {
155         case NAL_CMD_GET_TXDESC:
156                 return (kqswnal_get_tx_desc (data));
157
158         case NAL_CMD_REGISTER_MYNID:
159                 CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
160                         data->ioc_nid - kqswnal_data.kqn_elanid,
161                         kqswnal_data.kqn_nid_offset);
162                 kqswnal_data.kqn_nid_offset =
163                         data->ioc_nid - kqswnal_data.kqn_elanid;
164                 kqswnal_lib.ni.nid = data->ioc_nid;
165                 return (0);
166                 
167         default:
168                 return (-EINVAL);
169         }
170 }
171
/* Tear down everything kqswnal_initialise() set up, in reverse order.
 * Safe to call from any partially-initialised state: kqn_init records how
 * far initialisation got, and the switch cases fall through so the later
 * stages are unwound first. */
void __exit
kqswnal_finalise (void)
{
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);            /* unknown state: must not happen */

        case KQN_INIT_ALL:
                PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
                /* fall through */

        case KQN_INIT_PTL:
                PtlNIFini (kqswnal_ni);
                lib_fini (&kqswnal_lib);
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;                 /* nothing to unwind */
        }

        /**********************************************************************/
        /* Make the router stop calling me and fail any further call-ins */
        kpr_shutdown (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */

        kqswnal_data.kqn_shuttingdown = 1;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        /* poll until every scheduler thread has decremented kqn_nthreads */
        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */

        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

        /**********************************************************************/
        /* Complete any blocked forwarding packets with error
         */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }

        while (!list_empty (&kqswnal_data.kqn_delayedfwds))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }

        /**********************************************************************/
        /* Wait for router to complete any packets I sent her
         */

        kpr_deregister (&kqswnal_data.kqn_router);


        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }

        /* free the pre-mapped per-descriptor transmit buffers, then the
         * descriptor array itself */
        if (kqswnal_data.kqn_txds != NULL)
        {
                int   i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
                {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        if (ktx->ktx_buffer != NULL)
                                PORTAL_FREE(ktx->ktx_buffer,
                                            KQSW_TX_BUFFER_SIZE);
                }

                PORTAL_FREE(kqswnal_data.kqn_txds,
                            sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
                                                     KQSW_NNBLK_TXMSGS));
        }

        /* free each receive descriptor's pages, then the descriptor array */
        if (kqswnal_data.kqn_rxds != NULL)
        {
                int   i;
                int   j;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
                {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        for (j = 0; j < krx->krx_npages; j++)
                                if (krx->krx_pages[j] != NULL)
                                        __free_page (krx->krx_pages[j]);
                }

                PORTAL_FREE(kqswnal_data.kqn_rxds,
                            sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
                                                    KQSW_NRXMSGS_LARGE));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}
328
329 static int __init
330 kqswnal_initialise (void)
331 {
332         ELAN3_DMA_REQUEST dmareq;
333         int               rc;
334         int               i;
335         int               elan_page_idx;
336         int               pkmem = atomic_read(&portal_kmemory);
337
338         LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
339
340         CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
341
342         kqswnal_api.forward  = kqswnal_forward;
343         kqswnal_api.shutdown = kqswnal_shutdown;
344         kqswnal_api.yield    = kqswnal_yield;
345         kqswnal_api.validate = NULL;            /* our api validate is a NOOP */
346         kqswnal_api.lock     = kqswnal_lock;
347         kqswnal_api.unlock   = kqswnal_unlock;
348         kqswnal_api.nal_data = &kqswnal_data;
349
350         kqswnal_lib.nal_data = &kqswnal_data;
351
352         /* ensure all pointers NULL etc */
353         memset (&kqswnal_data, 0, sizeof (kqswnal_data));
354
355         kqswnal_data.kqn_cb = &kqswnal_lib;
356
357         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
358         INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
359         INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
360         spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
361         init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
362         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);
363
364         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
365         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
366         INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
367
368         spin_lock_init (&kqswnal_data.kqn_sched_lock);
369         init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
370
371         spin_lock_init (&kqswnal_data.kqn_statelock);
372
373         /* pointers/lists/locks initialised */
374         kqswnal_data.kqn_init = KQN_INIT_DATA;
375
376         /**********************************************************************/
377         /* Find the first Elan device */
378
379         kqswnal_data.kqn_epdev = ep_device (0);
380         if (kqswnal_data.kqn_epdev == NULL)
381         {
382                 CERROR ("Can't get elan device 0\n");
383                 return (-ENOMEM);
384         }
385
386         kqswnal_data.kqn_nid_offset = 0;
387         kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_epdev);
388         kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_epdev);
389         
390         /**********************************************************************/
391         /* Get the transmitter */
392
393         kqswnal_data.kqn_eptx = ep_alloc_large_xmtr (kqswnal_data.kqn_epdev);
394         if (kqswnal_data.kqn_eptx == NULL)
395         {
396                 CERROR ("Can't allocate transmitter\n");
397                 kqswnal_finalise ();
398                 return (-ENOMEM);
399         }
400
401         /**********************************************************************/
402         /* Get the receivers */
403
404         kqswnal_data.kqn_eprx_small = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
405                                                              EP_SVC_LARGE_PORTALS_SMALL,
406                                                              KQSW_EP_ENVELOPES_SMALL);
407         if (kqswnal_data.kqn_eprx_small == NULL)
408         {
409                 CERROR ("Can't install small msg receiver\n");
410                 kqswnal_finalise ();
411                 return (-ENOMEM);
412         }
413
414         kqswnal_data.kqn_eprx_large = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
415                                                              EP_SVC_LARGE_PORTALS_LARGE,
416                                                              KQSW_EP_ENVELOPES_LARGE);
417         if (kqswnal_data.kqn_eprx_large == NULL)
418         {
419                 CERROR ("Can't install large msg receiver\n");
420                 kqswnal_finalise ();
421                 return (-ENOMEM);
422         }
423
424         /**********************************************************************/
425         /* Reserve Elan address space for transmit buffers */
426
427         dmareq.Waitfn   = DDI_DMA_SLEEP;
428         dmareq.ElanAddr = (E3_Addr) 0;
429         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
430         dmareq.Perm     = ELAN_PERM_REMOTEREAD;
431
432         rc = elan3_dma_reserve(kqswnal_data.kqn_epdev->DmaState,
433                               KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
434                               &dmareq, &kqswnal_data.kqn_eptxdmahandle);
435         if (rc != DDI_SUCCESS)
436         {
437                 CERROR ("Can't reserve rx dma space\n");
438                 kqswnal_finalise ();
439                 return (-ENOMEM);
440         }
441
442         /**********************************************************************/
443         /* Reserve Elan address space for receive buffers */
444
445         dmareq.Waitfn   = DDI_DMA_SLEEP;
446         dmareq.ElanAddr = (E3_Addr) 0;
447         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
448         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
449
450         rc = elan3_dma_reserve (kqswnal_data.kqn_epdev->DmaState,
451                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
452                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
453                                 &dmareq, &kqswnal_data.kqn_eprxdmahandle);
454         if (rc != DDI_SUCCESS)
455         {
456                 CERROR ("Can't reserve rx dma space\n");
457                 kqswnal_finalise ();
458                 return (-ENOMEM);
459         }
460
461         /**********************************************************************/
462         /* Allocate/Initialise transmit descriptors */
463
464         PORTAL_ALLOC(kqswnal_data.kqn_txds,
465                      sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
466         if (kqswnal_data.kqn_txds == NULL)
467         {
468                 kqswnal_finalise ();
469                 return (-ENOMEM);
470         }
471
472         /* clear flags, null pointers etc */
473         memset(kqswnal_data.kqn_txds, 0,
474                sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
475         for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
476         {
477                 int           premapped_pages;
478                 kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
479                 int           basepage = i * KQSW_NTXMSGPAGES;
480
481                 PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
482                 if (ktx->ktx_buffer == NULL)
483                 {
484                         kqswnal_finalise ();
485                         return (-ENOMEM);
486                 }
487
488                 /* Map pre-allocated buffer NOW, to save latency on transmit */
489                 premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
490                                                         KQSW_TX_BUFFER_SIZE);
491
492                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
493                                        kqswnal_data.kqn_eptxdmahandle,
494                                        ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
495                                        basepage, &ktx->ktx_ebuffer);
496
497                 ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
498                 ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
499
500                 INIT_LIST_HEAD (&ktx->ktx_delayed_list);
501
502                 ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
503                 list_add_tail (&ktx->ktx_list, 
504                                ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
505                                                  &kqswnal_data.kqn_idletxds);
506         }
507
508         /**********************************************************************/
509         /* Allocate/Initialise receive descriptors */
510
511         PORTAL_ALLOC (kqswnal_data.kqn_rxds,
512                       sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
513         if (kqswnal_data.kqn_rxds == NULL)
514         {
515                 kqswnal_finalise ();
516                 return (-ENOMEM);
517         }
518
519         memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
520                sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));
521
522         elan_page_idx = 0;
523         for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
524         {
525                 E3_Addr       elanaddr;
526                 int           j;
527                 kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
528
529                 if (i < KQSW_NRXMSGS_SMALL)
530                 {
531                         krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
532                         krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
533                 }
534                 else
535                 {
536                         krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
537                         krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
538                 }
539
540                 LASSERT (krx->krx_npages > 0);
541                 for (j = 0; j < krx->krx_npages; j++)
542                 {
543                         krx->krx_pages[j] = alloc_page(GFP_KERNEL);
544                         if (krx->krx_pages[j] == NULL)
545                         {
546                                 kqswnal_finalise ();
547                                 return (-ENOMEM);
548                         }
549
550                         LASSERT(page_address(krx->krx_pages[j]) != NULL);
551
552                         elan3_dvma_kaddr_load(kqswnal_data.kqn_epdev->DmaState,
553                                               kqswnal_data.kqn_eprxdmahandle,
554                                               page_address(krx->krx_pages[j]),
555                                               PAGE_SIZE, elan_page_idx,
556                                               &elanaddr);
557                         elan_page_idx++;
558
559                         if (j == 0)
560                                 krx->krx_elanaddr = elanaddr;
561
562                         /* NB we assume a contiguous  */
563                         LASSERT (elanaddr == krx->krx_elanaddr + j * PAGE_SIZE);
564                 }
565         }
566         LASSERT (elan_page_idx ==
567                  (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
568                  (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));
569
570         /**********************************************************************/
571         /* Network interface ready to initialise */
572
573         rc = PtlNIInit(kqswnal_init, 32, 4, 0, &kqswnal_ni);
574         if (rc != 0)
575         {
576                 CERROR ("PtlNIInit failed %d\n", rc);
577                 kqswnal_finalise ();
578                 return (-ENOMEM);
579         }
580
581         kqswnal_data.kqn_init = KQN_INIT_PTL;
582
583         /**********************************************************************/
584         /* Queue receives, now that it's OK to run their completion callbacks */
585
586         for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
587         {
588                 kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
589
590                 /* NB this enqueue can allocate/sleep (attr == 0) */
591                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
592                                       krx->krx_elanaddr,
593                                       krx->krx_npages * PAGE_SIZE, 0);
594                 if (rc != 0)
595                 {
596                         CERROR ("failed ep_queue_receive %d\n", rc);
597                         kqswnal_finalise ();
598                         return (-ENOMEM);
599                 }
600         }
601
602         /**********************************************************************/
603         /* Spawn scheduling threads */
604         for (i = 0; i < smp_num_cpus; i++)
605         {
606                 rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
607                 if (rc != 0)
608                 {
609                         CERROR ("failed to spawn scheduling thread: %d\n", rc);
610                         kqswnal_finalise ();
611                         return (rc);
612                 }
613         }
614
615         /**********************************************************************/
616         /* Connect to the router */
617         rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
618         CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
619
620         rc = kportal_nal_register (QSWNAL, &kqswnal_cmd, NULL);
621         if (rc != 0) {
622                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
623                 kqswnal_finalise ();
624                 return (rc);
625         }
626
627         PORTAL_SYMBOL_REGISTER(kqswnal_ni);
628         kqswnal_data.kqn_init = KQN_INIT_ALL;
629
630         printk(KERN_INFO "Routing QSW NAL loaded on node %d of %d "
631                "(Routing %s, initial mem %d)\n", 
632                kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
633                kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
634                pkmem);
635
636         return (0);
637 }
638
639
MODULE_AUTHOR("W. Marcus Miller <marcusm@llnl.gov>");
MODULE_DESCRIPTION("Kernel Quadrics Switch NAL v1.00");
MODULE_LICENSE("GPL");

/* Module entry/exit points: full setup/teardown of the QSW NAL */
module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);

/* Let other portals modules find our network interface handle */
EXPORT_SYMBOL (kqswnal_ni);