fs/lustre-release.git / lnet/klnds/qswlnd/qswlnd.c (commit 0841d641dc3134709c69e280a35b9feb46f74913)
/*
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 * W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/lustre/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "qswnal.h"

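/* Global state: the Portals NI handle exported to upper layers, the
 * api-side NAL vector handed to PtlNIInit(), and the single instance
 * of this NAL's private state. */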
ptl_handle_ni_t         kqswnal_ni;
nal_t                   kqswnal_api;
kqswnal_data_t          kqswnal_data;

kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
        kprni_notify:   NULL,                   /* we're connectionless */
};


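/* Hand an api-side request straight to the library half of the NAL;
 * api and lib share the same kernel address space, so "forwarding"
 * is just a direct dispatch. */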
static int
kqswnal_forward(nal_t   *nal,
                int     id,
                void    *args,  size_t args_len,
                void    *ret,   size_t ret_len)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
        return (PTL_OK);
}

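/* Serialise api-side callers against the library: cb_cli/cb_sti take
 * and release the library's state lock. */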
static void
kqswnal_lock (nal_t *nal, unsigned long *flags)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        nal_cb->cb_cli(nal_cb,flags);
}

static void
kqswnal_unlock(nal_t *nal, unsigned long *flags)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        nal_cb->cb_sti(nal_cb,flags);
}

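/* Per-interface shutdown: all real cleanup happens at module unload
 * in kqswnal_finalise(), so this is a no-op. */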
static int
kqswnal_shutdown(nal_t *nal, int ni)
{
        CDEBUG (D_NET, "shutdown\n");

        LASSERT (nal == &kqswnal_api);
        return (0);
}

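/* Let the library give up the CPU between bouts of work if a
 * reschedule is pending. */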
static void
kqswnal_yield( nal_t *nal )
{
        CDEBUG (D_NET, "yield\n");

        if (current->need_resched)
                schedule();
        return;
}

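/* Bring up the library half of the NAL: derive my NID from my Elan
 * node id and initialise the Portals library with it. */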
static nal_t *
kqswnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
             ptl_pid_t requested_pid)
{
        ptl_nid_t mynid = kqswnal_elanid2nid (kqswnal_data.kqn_elanid);
        int       nnids = kqswnal_data.kqn_nnodes;

        CDEBUG(D_NET, "calling lib_init with nid "LPX64" of %d\n", mynid, nnids);

        lib_init(&kqswnal_lib, mynid, 0, nnids, ptl_size, ac_size);

        return (&kqswnal_api);
}

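/* ioctl helper: copy out a snapshot of the data->ioc_count'th active
 * transmit descriptor for debugging. */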
int
kqswnal_get_tx_desc (struct portal_ioctl_data *data)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        int                index = data->ioc_count;
        int                rc = -ENOENT;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);

                data->ioc_pbuf1 = (char *)ktx;
                data->ioc_count = NTOH__u32(ktx->ktx_wire_hdr->type);
                data->ioc_size  = NTOH__u32(PTL_HDR_LENGTH(ktx->ktx_wire_hdr));
                data->ioc_nid   = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
                data->ioc_nid2  = ktx->ktx_nid;
                data->ioc_misc  = ktx->ktx_launcher;
                data->ioc_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
                                  ((!ktx->ktx_forwarding)              ? 0 : 2) |
                                  ((!ktx->ktx_isnblk)                  ? 0 : 4);

                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}

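/* Handle NAL-specific ioctls: tx descriptor snapshots for debugging,
 * and registering my NID (which just shifts the NID offset applied to
 * Elan node ids). */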
int
kqswnal_cmd (struct portal_ioctl_data *data, void *private)
{
        LASSERT (data != NULL);

        switch (data->ioc_nal_cmd) {
        case NAL_CMD_GET_TXDESC:
                return (kqswnal_get_tx_desc (data));

        case NAL_CMD_REGISTER_MYNID:
                CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
                        data->ioc_nid - kqswnal_data.kqn_elanid,
                        kqswnal_data.kqn_nid_offset);
                kqswnal_data.kqn_nid_offset =
                        data->ioc_nid - kqswnal_data.kqn_elanid;
                kqswnal_lib.ni.nid = data->ioc_nid;
                return (0);

        default:
                return (-EINVAL);
        }
}

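/* Module unload / failed-startup cleanup.  kqn_init records how far
 * startup got; the switch falls through so each stage tears down only
 * what was actually set up. */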
void __exit
kqswnal_finalise (void)
{
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
                PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
                /* fall through */

        case KQN_INIT_PTL:
                PtlNIFini (kqswnal_ni);
                lib_fini (&kqswnal_lib);
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;
        }

        /**********************************************************************/
        /* Make the router stop calling me and fail any further call-ins */
        kpr_shutdown (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */

        kqswnal_data.kqn_shuttingdown = 1;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */

        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

        /**********************************************************************/
        /* Complete any blocked forwarding packets with error
         */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }

        while (!list_empty (&kqswnal_data.kqn_delayedfwds))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }

        /**********************************************************************/
        /* Wait for the router to complete any packets I sent her
         */

        kpr_deregister (&kqswnal_data.kqn_router);


        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }

        if (kqswnal_data.kqn_txds != NULL)
        {
                int   i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
                {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        if (ktx->ktx_buffer != NULL)
                                PORTAL_FREE(ktx->ktx_buffer,
                                            KQSW_TX_BUFFER_SIZE);
                }

                PORTAL_FREE(kqswnal_data.kqn_txds,
                            sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
                                                     KQSW_NNBLK_TXMSGS));
        }

        if (kqswnal_data.kqn_rxds != NULL)
        {
                int   i;
                int   j;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
                {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        for (j = 0; j < krx->krx_npages; j++)
                                if (krx->krx_pages[j] != NULL)
                                        __free_page (krx->krx_pages[j]);
                }

                PORTAL_FREE(kqswnal_data.kqn_rxds,
                            sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
                                                    KQSW_NRXMSGS_LARGE));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}

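/* Module load: set up the api vector, initialise private state, attach
 * to the first Elan device, reserve Elan DMA address space and map the
 * pre-allocated tx/rx buffers, bring up Portals, queue the receives,
 * spawn the scheduler threads and finally hook up the router and
 * command interfaces.  Any failure unwinds via kqswnal_finalise(). */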
static int __init
kqswnal_initialise (void)
{
        ELAN3_DMA_REQUEST dmareq;
        int               rc;
        int               i;
        int               elan_page_idx;
        int               pkmem = atomic_read(&portal_kmemory);

        LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);

        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));

        kqswnal_api.forward  = kqswnal_forward;
        kqswnal_api.shutdown = kqswnal_shutdown;
        kqswnal_api.yield    = kqswnal_yield;
        kqswnal_api.validate = NULL;            /* our api validate is a NOOP */
        kqswnal_api.lock     = kqswnal_lock;
        kqswnal_api.unlock   = kqswnal_unlock;
        kqswnal_api.nal_data = &kqswnal_data;

        kqswnal_lib.nal_data = &kqswnal_data;

        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));

        kqswnal_data.kqn_cb = &kqswnal_lib;

        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
        spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
        init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);

        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

        spin_lock_init (&kqswnal_data.kqn_statelock);

        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;

        /**********************************************************************/
        /* Find the first Elan device */

        kqswnal_data.kqn_epdev = ep_device (0);
        if (kqswnal_data.kqn_epdev == NULL)
        {
                CERROR ("Can't get elan device 0\n");
                return (-ENOMEM);
        }

        kqswnal_data.kqn_nid_offset = 0;
        kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_epdev);
        kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_epdev);

        /**********************************************************************/
        /* Get the transmitter */

        kqswnal_data.kqn_eptx = ep_alloc_large_xmtr (kqswnal_data.kqn_epdev);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Get the receivers */

        kqswnal_data.kqn_eprx_small = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
                                                             EP_SVC_LARGE_PORTALS_SMALL,
                                                             KQSW_EP_ENVELOPES_SMALL);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        kqswnal_data.kqn_eprx_large = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
                                                             EP_SVC_LARGE_PORTALS_LARGE,
                                                             KQSW_EP_ENVELOPES_LARGE);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for transmit buffers */

        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEREAD;

        rc = elan3_dma_reserve(kqswnal_data.kqn_epdev->DmaState,
                              KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                              &dmareq, &kqswnal_data.kqn_eptxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve tx dma space\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */

        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve (kqswnal_data.kqn_epdev->DmaState,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                &dmareq, &kqswnal_data.kqn_eprxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */

        PORTAL_ALLOC(kqswnal_data.kqn_txds,
                     sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        if (kqswnal_data.kqn_txds == NULL)
        {
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /* clear flags, null pointers etc */
        memset(kqswnal_data.kqn_txds, 0,
               sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
        {
                int           premapped_pages;
                kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
                int           basepage = i * KQSW_NTXMSGPAGES;

                PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
                        kqswnal_finalise ();
                        return (-ENOMEM);
                }

                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);

                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                                       basepage, &ktx->ktx_ebuffer);

                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

                INIT_LIST_HEAD (&ktx->ktx_delayed_list);

                ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
                list_add_tail (&ktx->ktx_list,
                               ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
                                                 &kqswnal_data.kqn_idletxds);
        }

        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */

        PORTAL_ALLOC (kqswnal_data.kqn_rxds,
                      sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
        if (kqswnal_data.kqn_rxds == NULL)
        {
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
               sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));

        elan_page_idx = 0;
        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
                E3_Addr       elanaddr;
                int           j;
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                if (i < KQSW_NRXMSGS_SMALL)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
                }
                else
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
                }

                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
                        krx->krx_pages[j] = alloc_page(GFP_KERNEL);
                        if (krx->krx_pages[j] == NULL)
                        {
                                kqswnal_finalise ();
                                return (-ENOMEM);
                        }

                        LASSERT(page_address(krx->krx_pages[j]) != NULL);

                        elan3_dvma_kaddr_load(kqswnal_data.kqn_epdev->DmaState,
                                              kqswnal_data.kqn_eprxdmahandle,
                                              page_address(krx->krx_pages[j]),
                                              PAGE_SIZE, elan_page_idx,
                                              &elanaddr);
                        elan_page_idx++;

                        if (j == 0)
                                krx->krx_elanaddr = elanaddr;

                        /* NB we assume the pages of a buffer map to
                         * contiguous Elan addresses */
                        LASSERT (elanaddr == krx->krx_elanaddr + j * PAGE_SIZE);
                }
        }
        LASSERT (elan_page_idx ==
                 (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
                 (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));

        /**********************************************************************/
        /* Network interface ready to initialise */

        rc = PtlNIInit(kqswnal_init, 32, 4, 0, &kqswnal_ni);
        if (rc != 0)
        {
                CERROR ("PtlNIInit failed %d\n", rc);
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        kqswnal_data.kqn_init = KQN_INIT_PTL;

        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */

        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                /* NB this enqueue can allocate/sleep (attr == 0) */
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanaddr,
                                      krx->krx_npages * PAGE_SIZE, 0);
                if (rc != 0)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_finalise ();
                        return (-ENOMEM);
                }
        }

        /**********************************************************************/
        /* Spawn scheduling threads */
        for (i = 0; i < smp_num_cpus; i++)
        {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
                        kqswnal_finalise ();
                        return (rc);
                }
        }

        /**********************************************************************/
        /* Connect to the router; failure is non-fatal (we just don't route) */
        rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
        if (rc != 0)
                CDEBUG(D_NET, "Can't initialise routing interface "
                       "(rc = %d): not routing\n", rc);

        rc = kportal_nal_register (QSWNAL, &kqswnal_cmd, NULL);
        if (rc != 0) {
                CERROR ("Can't initialise command interface (rc = %d)\n", rc);
                kqswnal_finalise ();
                return (rc);
        }

        PORTAL_SYMBOL_REGISTER(kqswnal_ni);
        kqswnal_data.kqn_init = KQN_INIT_ALL;

        printk(KERN_INFO "Routing QSW NAL loaded on node %d of %d "
               "(Routing %s, initial mem %d)\n",
               kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
               kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
               pkmem);

        return (0);
}


MODULE_AUTHOR("W. Marcus Miller <marcusm@llnl.gov>");
MODULE_DESCRIPTION("Kernel Quadrics Switch NAL v1.00");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);

EXPORT_SYMBOL (kqswnal_ni);