/*
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 * W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/lustre/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "qswnal.h"

ptl_handle_ni_t         kqswnal_ni;
nal_t                   kqswnal_api;
kqswnal_data_t          kqswnal_data;

kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
        kprni_notify:   NULL,                   /* we're connectionless */
};

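/* Thin API-side glue: the api NAL (kqswnal_api) reaches the library NAL
 * (kqswnal_lib) through its nal_data pointer.  kqswnal_forward()
 * dispatches API calls via lib_dispatch(); kqswnal_lock() and
 * kqswnal_unlock() defer to the library's cb_cli/cb_sti callbacks. */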
static int
kqswnal_forward(nal_t   *nal,
                int     id,
                void    *args,  size_t args_len,
                void    *ret,   size_t ret_len)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
        return (PTL_OK);
}

static void
kqswnal_lock (nal_t *nal, unsigned long *flags)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        nal_cb->cb_cli(nal_cb, flags);
}

static void
kqswnal_unlock(nal_t *nal, unsigned long *flags)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        nal_cb->cb_sti(nal_cb, flags);
}

static int
kqswnal_shutdown(nal_t *nal, int ni)
{
        CDEBUG (D_NET, "shutdown\n");

        LASSERT (nal == &kqswnal_api);
        return (0);
}

static void
kqswnal_yield (nal_t *nal)
{
        CDEBUG (D_NET, "yield\n");

        if (current->need_resched)
                schedule();
        return;
}

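/* NAL initialisation callback, passed to PtlNIInit() below: derives this
 * node's NID from its Elan ID and initialises the library NAL. */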
static nal_t *
kqswnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
             ptl_pid_t requested_pid)
{
        ptl_nid_t mynid = kqswnal_elanid2nid (kqswnal_data.kqn_elanid);
        int       nnids = kqswnal_data.kqn_nnodes;

        CDEBUG(D_NET, "calling lib_init with nid "LPX64" of %d\n", mynid, nnids);

        lib_init(&kqswnal_lib, mynid, 0, nnids, ptl_size, ac_size);

        return (&kqswnal_api);
}

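/* Debug helper for NAL_CMD_GET_TXDESC: walk the active transmit
 * descriptor list to the pcfg_count'th entry and report its wire header
 * fields.  pcfg_flags packs bit 0 = on the delayed list, bit 1 =
 * non-blocking descriptor, remaining bits = descriptor state. */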
int
kqswnal_get_tx_desc (struct portals_cfg *pcfg)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        int                index = pcfg->pcfg_count;
        int                rc = -ENOENT;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);

                pcfg->pcfg_pbuf1 = (char *)ktx;
                pcfg->pcfg_count = NTOH__u32(ktx->ktx_wire_hdr->type);
                pcfg->pcfg_size  = NTOH__u32(ktx->ktx_wire_hdr->payload_length);
                pcfg->pcfg_nid   = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
                pcfg->pcfg_nid2  = ktx->ktx_nid;
                pcfg->pcfg_misc  = ktx->ktx_launcher;
                pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
                                   (!ktx->ktx_isnblk                    ? 0 : 2) |
                                   (ktx->ktx_state << 2);
                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}

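/* ioctl dispatcher, registered with kportal_nal_register() below.
 * NB NAL_CMD_REGISTER_MYNID records a NID offset such that this node's
 * NID is its Elan node id plus the offset. */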
int
kqswnal_cmd (struct portals_cfg *pcfg, void *private)
{
        LASSERT (pcfg != NULL);

        switch (pcfg->pcfg_command) {
        case NAL_CMD_GET_TXDESC:
                return (kqswnal_get_tx_desc (pcfg));

        case NAL_CMD_REGISTER_MYNID:
                CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
                        kqswnal_data.kqn_nid_offset);
                kqswnal_data.kqn_nid_offset =
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
                kqswnal_lib.ni.nid = pcfg->pcfg_nid;
                return (0);

        default:
                return (-EINVAL);
        }
}

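/* Module teardown.  kqn_init records how far initialisation got; each
 * case falls through to undo every level below it, in the reverse order
 * of kqswnal_initialise(). */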
void __exit
kqswnal_finalise (void)
{
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
                PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
                kportal_nal_unregister(QSWNAL);
                /* fall through */

        case KQN_INIT_PTL:
                PtlNIFini (kqswnal_ni);
                lib_fini (&kqswnal_lib);
                /* fall through */

        case KQN_INIT_DATA:
                LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
                break;

        case KQN_INIT_NOTHING:
                return;
        }

        /**********************************************************************/
        /* Make the router stop calling me and fail any further call-ins */
        kpr_shutdown (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */

        kqswnal_data.kqn_shuttingdown = 1;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */

        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

        /**********************************************************************/
        /* Complete any blocked forwarding packets with error */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }

        while (!list_empty (&kqswnal_data.kqn_delayedfwds))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }

        /**********************************************************************/
        /* Wait for the router to complete any packets I sent her */

        kpr_deregister (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers */

        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_epdev->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }

        if (kqswnal_data.kqn_txds != NULL)
        {
                int   i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
                {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        if (ktx->ktx_buffer != NULL)
                                PORTAL_FREE(ktx->ktx_buffer,
                                            KQSW_TX_BUFFER_SIZE);
                }

                PORTAL_FREE(kqswnal_data.kqn_txds,
                            sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
                                                     KQSW_NNBLK_TXMSGS));
        }

        if (kqswnal_data.kqn_rxds != NULL)
        {
                int   i;
                int   j;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
                {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        for (j = 0; j < krx->krx_npages; j++)
                                if (krx->krx_pages[j] != NULL)
                                        __free_page (krx->krx_pages[j]);
                }

                PORTAL_FREE(kqswnal_data.kqn_rxds,
                            sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
                                                    KQSW_NRXMSGS_LARGE));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}

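/* Module bring-up, in the reverse order of kqswnal_finalise(): data
 * structures, Elan device and comms endpoints, DMA reservations,
 * transmit/receive descriptors, the portals NI, queued receives,
 * scheduler threads, then the router and command interfaces.  Any
 * failure unwinds through kqswnal_finalise(). */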
static int __init
kqswnal_initialise (void)
{
        ELAN3_DMA_REQUEST dmareq;
        int               rc;
        int               i;
        int               elan_page_idx;
        int               pkmem = atomic_read(&portal_kmemory);

        LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);

        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));

        kqswnal_api.forward  = kqswnal_forward;
        kqswnal_api.shutdown = kqswnal_shutdown;
        kqswnal_api.yield    = kqswnal_yield;
        kqswnal_api.validate = NULL;            /* our api validate is a NOOP */
        kqswnal_api.lock     = kqswnal_lock;
        kqswnal_api.unlock   = kqswnal_unlock;
        kqswnal_api.nal_data = &kqswnal_data;

        kqswnal_lib.nal_data = &kqswnal_data;

        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));

        kqswnal_data.kqn_cb = &kqswnal_lib;

        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
        spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
        init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);

        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

        spin_lock_init (&kqswnal_data.kqn_statelock);

        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;

        /**********************************************************************/
        /* Find the first Elan device */

        kqswnal_data.kqn_epdev = ep_device (0);
        if (kqswnal_data.kqn_epdev == NULL)
        {
                CERROR ("Can't get elan device 0\n");
                return (-ENOMEM);
        }

        kqswnal_data.kqn_nid_offset = 0;
        kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_epdev);
        kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_epdev);

        /**********************************************************************/
        /* Get the transmitter */

        kqswnal_data.kqn_eptx = ep_alloc_large_xmtr (kqswnal_data.kqn_epdev);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Get the receivers */

        kqswnal_data.kqn_eprx_small = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
                                                             EP_SVC_LARGE_PORTALS_SMALL,
                                                             KQSW_EP_ENVELOPES_SMALL);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        kqswnal_data.kqn_eprx_large = ep_install_large_rcvr (kqswnal_data.kqn_epdev,
                                                             EP_SVC_LARGE_PORTALS_LARGE,
                                                             KQSW_EP_ENVELOPES_LARGE);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for transmit descriptors NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */

        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve(kqswnal_data.kqn_epdev->DmaState,
                               KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                               &dmareq, &kqswnal_data.kqn_eptxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve tx dma space\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */

        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve (kqswnal_data.kqn_epdev->DmaState,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                &dmareq, &kqswnal_data.kqn_eprxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */

        PORTAL_ALLOC(kqswnal_data.kqn_txds,
                     sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        if (kqswnal_data.kqn_txds == NULL)
        {
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /* clear flags, null pointers etc */
        memset(kqswnal_data.kqn_txds, 0,
               sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
        {
                int           premapped_pages;
                kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
                int           basepage = i * KQSW_NTXMSGPAGES;

                PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
                        kqswnal_finalise ();
                        return (-ENOMEM);
                }

                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);

                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                                       basepage, &ktx->ktx_ebuffer);

                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

                INIT_LIST_HEAD (&ktx->ktx_delayed_list);

                ktx->ktx_state = KTX_IDLE;
                ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
                list_add_tail (&ktx->ktx_list,
                               ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
                                                 &kqswnal_data.kqn_idletxds);
        }

        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */

        PORTAL_ALLOC (kqswnal_data.kqn_rxds,
                      sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
        if (kqswnal_data.kqn_rxds == NULL)
        {
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
               sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
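
        /* Map each receive buffer page into the reserved Elan address
         * range.  A descriptor's pages must land at consecutive Elan
         * addresses, so that a single (krx_elanaddr, npages * PAGE_SIZE)
         * extent can be posted to ep_queue_receive() below. */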
        elan_page_idx = 0;
        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
                E3_Addr       elanaddr;
                int           j;
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                if (i < KQSW_NRXMSGS_SMALL)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
                }
                else
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
                }

                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
                        krx->krx_pages[j] = alloc_page(GFP_KERNEL);
                        if (krx->krx_pages[j] == NULL)
                        {
                                kqswnal_finalise ();
                                return (-ENOMEM);
                        }

                        LASSERT(page_address(krx->krx_pages[j]) != NULL);

                        elan3_dvma_kaddr_load(kqswnal_data.kqn_epdev->DmaState,
                                              kqswnal_data.kqn_eprxdmahandle,
                                              page_address(krx->krx_pages[j]),
                                              PAGE_SIZE, elan_page_idx,
                                              &elanaddr);
                        elan_page_idx++;

                        if (j == 0)
                                krx->krx_elanaddr = elanaddr;

                        /* NB we assume a contiguous Elan address mapping */
                        LASSERT (elanaddr == krx->krx_elanaddr + j * PAGE_SIZE);
                }
        }
        LASSERT (elan_page_idx ==
                 (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
                 (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));

        /**********************************************************************/
        /* Network interface ready to initialise */

        rc = PtlNIInit(kqswnal_init, 32, 4, 0, &kqswnal_ni);
        if (rc != 0)
        {
                CERROR ("PtlNIInit failed %d\n", rc);
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        kqswnal_data.kqn_init = KQN_INIT_PTL;

        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */

        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                /* NB this enqueue can allocate/sleep (attr == 0) */
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanaddr,
                                      krx->krx_npages * PAGE_SIZE, 0);
                if (rc != ESUCCESS)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_finalise ();
                        return (-ENOMEM);
                }
        }

        /**********************************************************************/
        /* Spawn scheduling threads */
        for (i = 0; i < smp_num_cpus; i++)
        {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
                        kqswnal_finalise ();
                        return (rc);
                }
        }

        /**********************************************************************/
        /* Connect to the router */
        rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
        if (rc != 0)
                CDEBUG(D_NET, "Can't initialise routing interface "
                       "(rc = %d): not routing\n", rc);

        rc = kportal_nal_register (QSWNAL, &kqswnal_cmd, NULL);
        if (rc != 0) {
                CERROR ("Can't initialise command interface (rc = %d)\n", rc);
                kqswnal_finalise ();
                return (rc);
        }

        PORTAL_SYMBOL_REGISTER(kqswnal_ni);
        kqswnal_data.kqn_init = KQN_INIT_ALL;

        printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
               "(Routing %s, initial mem %d)\n",
               kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
               kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
               pkmem);

        return (0);
}

MODULE_AUTHOR("W. Marcus Miller <marcusm@llnl.gov>");
MODULE_DESCRIPTION("Kernel Quadrics Switch NAL v1.00");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);

EXPORT_SYMBOL (kqswnal_ni);