Whamcloud - gitweb
Branch: b_cray
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd.c
1 /*
2  * Copyright (C) 2002-2004 Cluster File Systems, Inc.
3  *   Author: Eric Barton <eric@bartonsoftware.com>
4  *
5  * This file is part of Portals, http://www.lustre.org
6  *
7  * Portals is free software; you can redistribute it and/or
8  * modify it under the terms of version 2 of the GNU General Public
9  * License as published by the Free Software Foundation.
10  *
11  * Portals is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with Portals; if not, write to the Free Software
18  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19  *
20  */
21
22 #include "qswnal.h"
23
/* Module-global NAL state */
nal_t                   kqswnal_api;            /* the API interface handed to portals */
kqswnal_data_t          kqswnal_data;           /* all per-NAL runtime state */
ptl_handle_ni_t         kqswnal_ni;             /* NI handle from PtlNIInit() at module load */
kqswnal_tunables_t      kqswnal_tunables;       /* sysctl-visible tunables */

/* Interface we register with the portals router (kpr).  We are
 * connectionless, so no notify callback is needed. */
kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
        kprni_notify:   NULL,                   /* we're connectionless */
};

#if CONFIG_SYSCTL
#define QSWNAL_SYSCTL  201

#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
#define QSWNAL_SYSCTL_OPTIMIZED_PUTS     2

/* Leaf table: /proc/sys/qswnal/{optimized_puts,optimized_gets} */
static ctl_table kqswnal_ctl_table[] = {
        {QSWNAL_SYSCTL_OPTIMIZED_PUTS, "optimized_puts",
         &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
         0644, NULL, &proc_dointvec},
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
         &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}
};

/* Root of our sysctl tree; registered in kqswnal_initialise() */
static ctl_table kqswnal_top_ctl_table[] = {
        {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
        {0}
};
#endif
57
/* Return a snapshot of the pcfg->pcfg_count'th active transmit descriptor
 * in *pcfg (for debug/inspection via the NAL command interface).
 *
 * Returns 0 and fills in *pcfg if such a descriptor exists,
 * -ENOENT if there are fewer than pcfg_count+1 active tx descs. */
int
kqswnal_get_tx_desc (struct portals_cfg *pcfg)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        ptl_hdr_t         *hdr;
        int                index = pcfg->pcfg_count;    /* which active txd caller wants */
        int                rc = -ENOENT;

        /* kqn_idletxd_lock also guards the active list membership */
        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                /* skip until we reach the requested index */
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
                hdr = (ptl_hdr_t *)ktx->ktx_buffer;

                /* NB header fields are wire (little-endian) byte order */
                pcfg->pcfg_pbuf1 = (char *)ktx;
                pcfg->pcfg_count = le32_to_cpu(hdr->type);
                pcfg->pcfg_size  = le32_to_cpu(hdr->payload_length);
                pcfg->pcfg_nid   = le64_to_cpu(hdr->dest_nid);
                pcfg->pcfg_nid2  = ktx->ktx_nid;
                pcfg->pcfg_misc  = ktx->ktx_launcher;
                /* bit 0: queued on the delayed list; bit 1: non-blocking desc;
                 * remaining bits: tx state machine value */
                pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
                                  (!ktx->ktx_isnblk                    ? 0 : 2) |
                                  (ktx->ktx_state << 2);
                rc = 0;
                break;
        }
        
        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}
93
94 int
95 kqswnal_cmd (struct portals_cfg *pcfg, void *private)
96 {
97         LASSERT (pcfg != NULL);
98         
99         switch (pcfg->pcfg_command) {
100         case NAL_CMD_GET_TXDESC:
101                 return (kqswnal_get_tx_desc (pcfg));
102
103         case NAL_CMD_REGISTER_MYNID:
104                 CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
105                         pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
106                         kqswnal_data.kqn_nid_offset);
107                 kqswnal_data.kqn_nid_offset =
108                         pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
109                 kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
110                 return (0);
111                 
112         default:
113                 return (-EINVAL);
114         }
115 }
116
/* Tear down the NAL, undoing kqswnal_startup() in reverse order.
 * kqn_init records how far startup got, so this is also the error-cleanup
 * path for a partially-completed startup.  Blocks until all pending
 * sends, elan comms and scheduler threads have drained. */
static void
kqswnal_shutdown(nal_t *nal)
{
        unsigned long flags;
        kqswnal_tx_t *ktx;
        kqswnal_rx_t *krx;
        int           do_lib_fini = 0;

        /* NB The first ref was this module! */
        if (nal->nal_refct != 0) {
                PORTAL_MODULE_UNUSE;
                return;
        }

        CDEBUG (D_NET, "shutdown\n");
        LASSERT (nal == &kqswnal_api);

        /* Cases deliberately fall through: each KQN_INIT_* level includes
         * the cleanup of all lower levels. */
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
                libcfs_nal_cmd_unregister(QSWNAL);
                /* fall through */

        case KQN_INIT_LIB:
                do_lib_fini = 1;
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;
        }

        /**********************************************************************/
        /* Tell router we're shutting down.  Any router calls my threads
         * make will now fail immediately and the router will stop calling
         * into me. */
        kpr_shutdown (&kqswnal_data.kqn_router);
        
        /**********************************************************************/
        /* Signal the start of shutdown... */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        wake_up_all(&kqswnal_data.kqn_idletxd_waitq);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */
#if MULTIRAIL_EKC
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lib_parse() or kpr_fwd_start(). */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
        /* "Old" EKC just pretends to shutdown cleanly but actually
         * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        /* wait for transmits to complete */
        while (!list_empty(&kqswnal_data.kqn_activetxds)) {
                CWARN("waiting for active transmits to complete\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(HZ);
        }

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

#if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
#endif

        /**********************************************************************/
        /* Complete any blocked forwarding packets, with error
         */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
        }

        /**********************************************************************/
        /* finalise router and portals lib */

        kpr_deregister (&kqswnal_data.kqn_router);

        if (do_lib_fini)
                lib_fini (&kqswnal_lib);

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

#if MULTIRAIL_EKC
        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() get fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
                /* If ktx has a buffer, it got mapped; unmap now.  NB only
                 * the pre-mapped stuff is still mapped since all tx descs
                 * must be idle */

                if (ktx->ktx_buffer != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_tx_nmh,
                                       &ktx->ktx_ebuffer);
        }

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* If krx_kiov[0].kiov_page got allocated, it got mapped.  
                 * NB subsequent pages get merged */

                if (krx->krx_kiov[0].kiov_page != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_rx_nmh,
                                       &krx->krx_elanbuffer);
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
#endif

        /* Free tx descriptors (singly-linked via ktx_alloclist) and their
         * pre-allocated buffers */
        while (kqswnal_data.kqn_txds != NULL) {
                ktx = kqswnal_data.kqn_txds;

                if (ktx->ktx_buffer != NULL)
                        PORTAL_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);

                kqswnal_data.kqn_txds = ktx->ktx_alloclist;
                PORTAL_FREE(ktx, sizeof(*ktx));
        }

        /* Free rx descriptors and their pages */
        while (kqswnal_data.kqn_rxds != NULL) {
                int           i;

                krx = kqswnal_data.kqn_rxds;
                for (i = 0; i < krx->krx_npages; i++)
                        if (krx->krx_kiov[i].kiov_page != NULL)
                                __free_page (krx->krx_kiov[i].kiov_page);

                kqswnal_data.kqn_rxds = krx->krx_alloclist;
                PORTAL_FREE(krx, sizeof (*krx));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}
350
351 static int
352 kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
353                  ptl_ni_limits_t *requested_limits, 
354                  ptl_ni_limits_t *actual_limits)
355 {
356 #if MULTIRAIL_EKC
357         EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
358 #else
359         ELAN3_DMA_REQUEST dmareq;
360 #endif
361         int               rc;
362         int               i;
363         kqswnal_rx_t     *krx;
364         kqswnal_tx_t     *ktx;
365         int               elan_page_idx;
366         ptl_process_id_t  my_process_id;
367         int               pkmem = atomic_read(&portal_kmemory);
368
369         LASSERT (nal == &kqswnal_api);
370
371         if (nal->nal_refct != 0) {
372                 if (actual_limits != NULL)
373                         *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
374                 /* This module got the first ref */
375                 PORTAL_MODULE_USE;
376                 return (PTL_OK);
377         }
378
379         LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
380
381         CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
382
383         /* ensure all pointers NULL etc */
384         memset (&kqswnal_data, 0, sizeof (kqswnal_data));
385
386         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
387         INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
388         INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
389         spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
390         init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
391         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);
392
393         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
394         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
395         INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
396
397         spin_lock_init (&kqswnal_data.kqn_sched_lock);
398         init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
399
400         /* Leave kqn_rpc_success zeroed */
401 #if MULTIRAIL_EKC
402         kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
403 #else
404         kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
405 #endif
406
407         /* pointers/lists/locks initialised */
408         kqswnal_data.kqn_init = KQN_INIT_DATA;
409         
410 #if MULTIRAIL_EKC
411         kqswnal_data.kqn_ep = ep_system();
412         if (kqswnal_data.kqn_ep == NULL) {
413                 CERROR("Can't initialise EKC\n");
414                 kqswnal_shutdown(nal);
415                 return (PTL_IFACE_INVALID);
416         }
417
418         if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
419                 CERROR("Can't get elan ID\n");
420                 kqswnal_shutdown(nal);
421                 return (PTL_IFACE_INVALID);
422         }
423 #else
424         /**********************************************************************/
425         /* Find the first Elan device */
426
427         kqswnal_data.kqn_ep = ep_device (0);
428         if (kqswnal_data.kqn_ep == NULL)
429         {
430                 CERROR ("Can't get elan device 0\n");
431                 kqswnal_shutdown(nal);
432                 return (PTL_IFACE_INVALID);
433         }
434 #endif
435
436         kqswnal_data.kqn_nid_offset = 0;
437         kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
438         kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);
439         
440         /**********************************************************************/
441         /* Get the transmitter */
442
443         kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
444         if (kqswnal_data.kqn_eptx == NULL)
445         {
446                 CERROR ("Can't allocate transmitter\n");
447                 kqswnal_shutdown (nal);
448                 return (PTL_NO_SPACE);
449         }
450
451         /**********************************************************************/
452         /* Get the receivers */
453
454         kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
455                                                      EP_MSG_SVC_PORTALS_SMALL,
456                                                      KQSW_EP_ENVELOPES_SMALL);
457         if (kqswnal_data.kqn_eprx_small == NULL)
458         {
459                 CERROR ("Can't install small msg receiver\n");
460                 kqswnal_shutdown (nal);
461                 return (PTL_NO_SPACE);
462         }
463
464         kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
465                                                      EP_MSG_SVC_PORTALS_LARGE,
466                                                      KQSW_EP_ENVELOPES_LARGE);
467         if (kqswnal_data.kqn_eprx_large == NULL)
468         {
469                 CERROR ("Can't install large msg receiver\n");
470                 kqswnal_shutdown (nal);
471                 return (PTL_NO_SPACE);
472         }
473
474         /**********************************************************************/
475         /* Reserve Elan address space for transmit descriptors NB we may
476          * either send the contents of associated buffers immediately, or
477          * map them for the peer to suck/blow... */
478 #if MULTIRAIL_EKC
479         kqswnal_data.kqn_ep_tx_nmh = 
480                 ep_dvma_reserve(kqswnal_data.kqn_ep,
481                                 KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
482                                 EP_PERM_WRITE);
483         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
484                 CERROR("Can't reserve tx dma space\n");
485                 kqswnal_shutdown(nal);
486                 return (PTL_NO_SPACE);
487         }
488 #else
489         dmareq.Waitfn   = DDI_DMA_SLEEP;
490         dmareq.ElanAddr = (E3_Addr) 0;
491         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
492         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
493
494         rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
495                               KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
496                               &dmareq, &kqswnal_data.kqn_eptxdmahandle);
497         if (rc != DDI_SUCCESS)
498         {
499                 CERROR ("Can't reserve rx dma space\n");
500                 kqswnal_shutdown (nal);
501                 return (PTL_NO_SPACE);
502         }
503 #endif
504         /**********************************************************************/
505         /* Reserve Elan address space for receive buffers */
506 #if MULTIRAIL_EKC
507         kqswnal_data.kqn_ep_rx_nmh =
508                 ep_dvma_reserve(kqswnal_data.kqn_ep,
509                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
510                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
511                                 EP_PERM_WRITE);
512         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
513                 CERROR("Can't reserve rx dma space\n");
514                 kqswnal_shutdown(nal);
515                 return (PTL_NO_SPACE);
516         }
517 #else
518         dmareq.Waitfn   = DDI_DMA_SLEEP;
519         dmareq.ElanAddr = (E3_Addr) 0;
520         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
521         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
522
523         rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
524                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
525                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
526                                 &dmareq, &kqswnal_data.kqn_eprxdmahandle);
527         if (rc != DDI_SUCCESS)
528         {
529                 CERROR ("Can't reserve rx dma space\n");
530                 kqswnal_shutdown (nal);
531                 return (PTL_NO_SPACE);
532         }
533 #endif
534         /**********************************************************************/
535         /* Allocate/Initialise transmit descriptors */
536
537         kqswnal_data.kqn_txds = NULL;
538         for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
539         {
540                 int           premapped_pages;
541                 int           basepage = i * KQSW_NTXMSGPAGES;
542
543                 PORTAL_ALLOC (ktx, sizeof(*ktx));
544                 if (ktx == NULL) {
545                         kqswnal_shutdown (nal);
546                         return (PTL_NO_SPACE);
547                 }
548
549                 memset(ktx, 0, sizeof(*ktx));   /* NULL pointers; zero flags */
550                 ktx->ktx_alloclist = kqswnal_data.kqn_txds;
551                 kqswnal_data.kqn_txds = ktx;
552
553                 PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
554                 if (ktx->ktx_buffer == NULL)
555                 {
556                         kqswnal_shutdown (nal);
557                         return (PTL_NO_SPACE);
558                 }
559
560                 /* Map pre-allocated buffer NOW, to save latency on transmit */
561                 premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
562                                                         KQSW_TX_BUFFER_SIZE);
563 #if MULTIRAIL_EKC
564                 ep_dvma_load(kqswnal_data.kqn_ep, NULL, 
565                              ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE, 
566                              kqswnal_data.kqn_ep_tx_nmh, basepage,
567                              &all_rails, &ktx->ktx_ebuffer);
568 #else
569                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
570                                        kqswnal_data.kqn_eptxdmahandle,
571                                        ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
572                                        basepage, &ktx->ktx_ebuffer);
573 #endif
574                 ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
575                 ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
576
577                 INIT_LIST_HEAD (&ktx->ktx_delayed_list);
578
579                 ktx->ktx_state = KTX_IDLE;
580 #if MULTIRAIL_EKC
581                 ktx->ktx_rail = -1;             /* unset rail */
582 #endif
583                 ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
584                 list_add_tail (&ktx->ktx_list, 
585                                ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
586                                                  &kqswnal_data.kqn_idletxds);
587         }
588
589         /**********************************************************************/
590         /* Allocate/Initialise receive descriptors */
591         kqswnal_data.kqn_rxds = NULL;
592         elan_page_idx = 0;
593         for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
594         {
595 #if MULTIRAIL_EKC
596                 EP_NMD        elanbuffer;
597 #else
598                 E3_Addr       elanbuffer;
599 #endif
600                 int           j;
601
602                 PORTAL_ALLOC(krx, sizeof(*krx));
603                 if (krx == NULL) {
604                         kqswnal_shutdown(nal);
605                         return (PTL_NO_SPACE);
606                 }
607
608                 memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
609                 krx->krx_alloclist = kqswnal_data.kqn_rxds;
610                 kqswnal_data.kqn_rxds = krx;
611
612                 if (i < KQSW_NRXMSGS_SMALL)
613                 {
614                         krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
615                         krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
616                 }
617                 else
618                 {
619                         krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
620                         krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
621                 }
622
623                 LASSERT (krx->krx_npages > 0);
624                 for (j = 0; j < krx->krx_npages; j++)
625                 {
626                         struct page *page = alloc_page(GFP_KERNEL);
627                         
628                         if (page == NULL) {
629                                 kqswnal_shutdown (nal);
630                                 return (PTL_NO_SPACE);
631                         }
632
633                         krx->krx_kiov[j].kiov_page = page;
634                         LASSERT(page_address(page) != NULL);
635
636 #if MULTIRAIL_EKC
637                         ep_dvma_load(kqswnal_data.kqn_ep, NULL,
638                                      page_address(page),
639                                      PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
640                                      elan_page_idx, &all_rails, &elanbuffer);
641                         
642                         if (j == 0) {
643                                 krx->krx_elanbuffer = elanbuffer;
644                         } else {
645                                 rc = ep_nmd_merge(&krx->krx_elanbuffer,
646                                                   &krx->krx_elanbuffer, 
647                                                   &elanbuffer);
648                                 /* NB contiguous mapping */
649                                 LASSERT(rc);
650                         }
651 #else
652                         elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
653                                               kqswnal_data.kqn_eprxdmahandle,
654                                               page_address(page),
655                                               PAGE_SIZE, elan_page_idx,
656                                               &elanbuffer);
657                         if (j == 0)
658                                 krx->krx_elanbuffer = elanbuffer;
659
660                         /* NB contiguous mapping */
661                         LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
662 #endif
663                         elan_page_idx++;
664
665                 }
666         }
667         LASSERT (elan_page_idx ==
668                  (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
669                  (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));
670
671         /**********************************************************************/
672         /* Network interface ready to initialise */
673
674         my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
675         my_process_id.pid = requested_pid;
676
677         rc = lib_init(&kqswnal_lib, nal, my_process_id,
678                       requested_limits, actual_limits);
679         if (rc != PTL_OK)
680         {
681                 CERROR ("lib_init failed %d\n", rc);
682                 kqswnal_shutdown (nal);
683                 return (rc);
684         }
685
686         kqswnal_data.kqn_init = KQN_INIT_LIB;
687
688         /**********************************************************************/
689         /* Queue receives, now that it's OK to run their completion callbacks */
690
691         for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
692                 /* NB this enqueue can allocate/sleep (attr == 0) */
693                 krx->krx_state = KRX_POSTED;
694 #if MULTIRAIL_EKC
695                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
696                                       &krx->krx_elanbuffer, 0);
697 #else
698                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
699                                       krx->krx_elanbuffer,
700                                       krx->krx_npages * PAGE_SIZE, 0);
701 #endif
702                 if (rc != EP_SUCCESS)
703                 {
704                         CERROR ("failed ep_queue_receive %d\n", rc);
705                         kqswnal_shutdown (nal);
706                         return (PTL_FAIL);
707                 }
708         }
709
710         /**********************************************************************/
711         /* Spawn scheduling threads */
712         for (i = 0; i < num_online_cpus(); i++) {
713                 rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
714                 if (rc != 0)
715                 {
716                         CERROR ("failed to spawn scheduling thread: %d\n", rc);
717                         kqswnal_shutdown (nal);
718                         return (PTL_FAIL);
719                 }
720         }
721
722         /**********************************************************************/
723         /* Connect to the router */
724         rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
725         CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
726
727         rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
728         if (rc != 0) {
729                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
730                 kqswnal_shutdown (nal);
731                 return (PTL_FAIL);
732         }
733
734         kqswnal_data.kqn_init = KQN_INIT_ALL;
735
736         printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
737                "(Routing %s, initial mem %d)\n", 
738                kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
739                kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
740                pkmem);
741
742         return (PTL_OK);
743 }
744
/* Module unload: remove sysctls, tear down the NI brought up at module
 * load (PtlNIInit in kqswnal_initialise), then unregister the NAL. */
void __exit
kqswnal_finalise (void)
{
#if CONFIG_SYSCTL
        /* sysctl registration is best-effort, so the handle may be NULL */
        if (kqswnal_tunables.kqn_sysctl != NULL)
                unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
#endif
        PtlNIFini(kqswnal_ni);

        ptl_unregister_nal(QSWNAL);
}
756
/* Module load: wire up the API vector, set tunable defaults, register the
 * NAL, bring the interface up immediately, and register sysctls.
 * Returns 0 on success or a negative errno. */
static int __init
kqswnal_initialise (void)
{
        int   rc;

        kqswnal_api.nal_ni_init = kqswnal_startup;
        kqswnal_api.nal_ni_fini = kqswnal_shutdown;

        /* Initialise dynamic tunables to defaults once only */
        kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
        kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
        
        rc = ptl_register_nal(QSWNAL, &kqswnal_api);
        if (rc != PTL_OK) {
                CERROR("Can't register QSWNAL: %d\n", rc);
                return (-ENOMEM);               /* or something... */
        }

        /* Pure gateways, and the workaround for 'EKC blocks forever until
         * the service is active' want the NAL started up at module load
         * time... */
        rc = PtlNIInit(QSWNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kqswnal_ni);
        if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
                /* undo the registration above before failing */
                ptl_unregister_nal(QSWNAL);
                return (-ENODEV);
        }

#if CONFIG_SYSCTL
        /* Press on regardless even if registering sysctl doesn't work */
        kqswnal_tunables.kqn_sysctl = 
                register_sysctl_table (kqswnal_top_ctl_table, 0);
#endif
        return (0);
}
791
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
MODULE_LICENSE("GPL");

/* Module entry/exit points */
module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);