/*
 * Imported via Whamcloud gitweb: fs/lustre-release.git,
 * "land b1_4_bgl on HEAD (20050404_1913)" — lnet/klnds/qswlnd/qswlnd.c
 */
1 /*
2  * Copyright (C) 2002-2004 Cluster File Systems, Inc.
3  *   Author: Eric Barton <eric@bartonsoftware.com>
4  *
5  * This file is part of Portals, http://www.lustre.org
6  *
7  * Portals is free software; you can redistribute it and/or
8  * modify it under the terms of version 2 of the GNU General Public
9  * License as published by the Free Software Foundation.
10  *
11  * Portals is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with Portals; if not, write to the Free Software
18  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19  *
20  */
21
22 #include "qswnal.h"
23
24 nal_t                   kqswnal_api;
25 kqswnal_data_t          kqswnal_data;
26 ptl_handle_ni_t         kqswnal_ni;
27 kqswnal_tunables_t      kqswnal_tunables;
28
29 kpr_nal_interface_t kqswnal_router_interface = {
30         kprni_nalid:    QSWNAL,
31         kprni_arg:      NULL,
32         kprni_fwd:      kqswnal_fwd_packet,
33         kprni_notify:   NULL,                   /* we're connectionless */
34 };
35
#if CONFIG_SYSCTL
/* Root ctl_name for this NAL's sysctl directory. */
#define QSWNAL_SYSCTL  201

#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
#define QSWNAL_SYSCTL_OPTIMIZED_PUTS     2

/* Runtime tunables exposed under /proc/sys/qswnal/.  NB these are
 * positional 2.4-era ctl_table initializers:
 * {ctl_name, procname, data, maxlen, mode, child, proc_handler} —
 * keep the field order in sync with the kernel's struct ctl_table. */
static ctl_table kqswnal_ctl_table[] = {
        {QSWNAL_SYSCTL_OPTIMIZED_PUTS, "optimized_puts",
         &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
         0644, NULL, &proc_dointvec},
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
         &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}                             /* terminator */
};

/* Parent directory entry: "qswnal" (mode 0555) holding the table above. */
static ctl_table kqswnal_top_ctl_table[] = {
        {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
        {0}                             /* terminator */
};
#endif
57
58 int
59 kqswnal_get_tx_desc (struct portals_cfg *pcfg)
60 {
61         unsigned long      flags;
62         struct list_head  *tmp;
63         kqswnal_tx_t      *ktx;
64         ptl_hdr_t         *hdr;
65         int                index = pcfg->pcfg_count;
66         int                rc = -ENOENT;
67
68         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
69
70         list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
71                 if (index-- != 0)
72                         continue;
73
74                 ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
75                 hdr = (ptl_hdr_t *)ktx->ktx_buffer;
76
77                 memcpy(pcfg->pcfg_pbuf, ktx,
78                        MIN(sizeof(*ktx), pcfg->pcfg_plen1));
79                 pcfg->pcfg_count = le32_to_cpu(hdr->type);
80                 pcfg->pcfg_size  = le32_to_cpu(hdr->payload_length);
81                 pcfg->pcfg_nid   = le64_to_cpu(hdr->dest_nid);
82                 pcfg->pcfg_nid2  = ktx->ktx_nid;
83                 pcfg->pcfg_misc  = ktx->ktx_launcher;
84                 pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
85                                   (!ktx->ktx_isnblk                    ? 0 : 2) |
86                                   (ktx->ktx_state << 2);
87                 rc = 0;
88                 break;
89         }
90         
91         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
92         return (rc);
93 }
94
95 int
96 kqswnal_cmd (struct portals_cfg *pcfg, void *private)
97 {
98         LASSERT (pcfg != NULL);
99         
100         switch (pcfg->pcfg_command) {
101         case NAL_CMD_GET_TXDESC:
102                 return (kqswnal_get_tx_desc (pcfg));
103
104         case NAL_CMD_REGISTER_MYNID:
105                 CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
106                         pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
107                         kqswnal_data.kqn_nid_offset);
108                 kqswnal_data.kqn_nid_offset =
109                         pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
110                 kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
111                 return (0);
112                 
113         default:
114                 return (-EINVAL);
115         }
116 }
117
/* Tear the NAL down in the reverse order of kqswnal_startup(), starting
 * from whatever stage kqn_init records.  Also invoked on every startup
 * failure path, so each stage must tolerate partially-initialised state.
 * NOTE(review): the teardown sequence (router quiesce -> tx drain ->
 * EKC close -> thread kill -> unmap -> free) is strictly ordered; do not
 * reorder statements here. */
static void
kqswnal_shutdown(nal_t *nal)
{
        unsigned long flags;
        kqswnal_tx_t *ktx;
        kqswnal_rx_t *krx;
        int           do_lib_fini = 0;

        /* NB The first ref was this module! */
        if (nal->nal_refct != 0) {
                PORTAL_MODULE_UNUSE;
                return;
        }

        CDEBUG (D_NET, "shutdown\n");
        LASSERT (nal == &kqswnal_api);

        /* Cases deliberately cascade: each init stage undoes its own
         * work then falls into the stage below it. */
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
                libcfs_nal_cmd_unregister(QSWNAL);
                /* fall through */

        case KQN_INIT_LIB:
                do_lib_fini = 1;
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;
        }

        /**********************************************************************/
        /* Tell router we're shutting down.  Any router calls my threads
         * make will now fail immediately and the router will stop calling
         * into me. */
        kpr_shutdown (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* Signal the start of shutdown... */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        /* ...and release anyone blocked waiting for an idle tx desc */
        wake_up_all(&kqswnal_data.kqn_idletxd_waitq);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */
#if MULTIRAIL_EKC
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lib_parse() or kpr_fwd_start(). */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
        /* "Old" EKC just pretends to shutdown cleanly but actually
         * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        /* wait for transmits to complete */
        while (!list_empty(&kqswnal_data.kqn_activetxds)) {
                CWARN("waiting for active transmits to complete\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(HZ);
        }

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

#if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
#endif

        /**********************************************************************/
        /* Complete any blocked forwarding packets, with error
         */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
        }

        /**********************************************************************/
        /* finalise router and portals lib */

        kpr_deregister (&kqswnal_data.kqn_router);

        /* only if startup got as far as lib_init() (KQN_INIT_LIB/ALL) */
        if (do_lib_fini)
                lib_fini (&kqswnal_lib);

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

#if MULTIRAIL_EKC
        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() get fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
                /* If ktx has a buffer, it got mapped; unmap now.  NB only
                 * the pre-mapped stuff is still mapped since all tx descs
                 * must be idle */

                if (ktx->ktx_buffer != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_tx_nmh,
                                       &ktx->ktx_ebuffer);
        }

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* If krx_kiov[0].kiov_page got allocated, it got mapped.  
                 * NB subsequent pages get merged */

                if (krx->krx_kiov[0].kiov_page != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_rx_nmh,
                                       &krx->krx_elanbuffer);
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
#endif

        /* free tx descriptors and their pre-allocated buffers, walking
         * the allocation chain built at startup */
        while (kqswnal_data.kqn_txds != NULL) {
                ktx = kqswnal_data.kqn_txds;

                if (ktx->ktx_buffer != NULL)
                        PORTAL_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);

                kqswnal_data.kqn_txds = ktx->ktx_alloclist;
                PORTAL_FREE(ktx, sizeof(*ktx));
        }

        /* free rx descriptors and their pages */
        while (kqswnal_data.kqn_rxds != NULL) {
                int           i;

                krx = kqswnal_data.kqn_rxds;
                for (i = 0; i < krx->krx_npages; i++)
                        if (krx->krx_kiov[i].kiov_page != NULL)
                                __free_page (krx->krx_kiov[i].kiov_page);

                kqswnal_data.kqn_rxds = krx->krx_alloclist;
                PORTAL_FREE(krx, sizeof (*krx));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}
351
352 static int
353 kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
354                  ptl_ni_limits_t *requested_limits, 
355                  ptl_ni_limits_t *actual_limits)
356 {
357 #if MULTIRAIL_EKC
358         EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
359 #else
360         ELAN3_DMA_REQUEST dmareq;
361 #endif
362         int               rc;
363         int               i;
364         kqswnal_rx_t     *krx;
365         kqswnal_tx_t     *ktx;
366         int               elan_page_idx;
367         ptl_process_id_t  my_process_id;
368         int               pkmem = atomic_read(&portal_kmemory);
369
370         LASSERT (nal == &kqswnal_api);
371
372         if (nal->nal_refct != 0) {
373                 if (actual_limits != NULL)
374                         *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
375                 /* This module got the first ref */
376                 PORTAL_MODULE_USE;
377                 return (PTL_OK);
378         }
379
380         LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
381
382         CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
383
384         /* ensure all pointers NULL etc */
385         memset (&kqswnal_data, 0, sizeof (kqswnal_data));
386
387         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
388         INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
389         INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
390         spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
391         init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
392         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);
393
394         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
395         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
396         INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
397
398         spin_lock_init (&kqswnal_data.kqn_sched_lock);
399         init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
400
401         /* Leave kqn_rpc_success zeroed */
402 #if MULTIRAIL_EKC
403         kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
404 #else
405         kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
406 #endif
407
408         /* pointers/lists/locks initialised */
409         kqswnal_data.kqn_init = KQN_INIT_DATA;
410         
411 #if MULTIRAIL_EKC
412         kqswnal_data.kqn_ep = ep_system();
413         if (kqswnal_data.kqn_ep == NULL) {
414                 CERROR("Can't initialise EKC\n");
415                 kqswnal_shutdown(nal);
416                 return (PTL_IFACE_INVALID);
417         }
418
419         if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
420                 CERROR("Can't get elan ID\n");
421                 kqswnal_shutdown(nal);
422                 return (PTL_IFACE_INVALID);
423         }
424 #else
425         /**********************************************************************/
426         /* Find the first Elan device */
427
428         kqswnal_data.kqn_ep = ep_device (0);
429         if (kqswnal_data.kqn_ep == NULL)
430         {
431                 CERROR ("Can't get elan device 0\n");
432                 kqswnal_shutdown(nal);
433                 return (PTL_IFACE_INVALID);
434         }
435 #endif
436
437         kqswnal_data.kqn_nid_offset = 0;
438         kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
439         kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);
440         
441         /**********************************************************************/
442         /* Get the transmitter */
443
444         kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
445         if (kqswnal_data.kqn_eptx == NULL)
446         {
447                 CERROR ("Can't allocate transmitter\n");
448                 kqswnal_shutdown (nal);
449                 return (PTL_NO_SPACE);
450         }
451
452         /**********************************************************************/
453         /* Get the receivers */
454
455         kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
456                                                      EP_MSG_SVC_PORTALS_SMALL,
457                                                      KQSW_EP_ENVELOPES_SMALL);
458         if (kqswnal_data.kqn_eprx_small == NULL)
459         {
460                 CERROR ("Can't install small msg receiver\n");
461                 kqswnal_shutdown (nal);
462                 return (PTL_NO_SPACE);
463         }
464
465         kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
466                                                      EP_MSG_SVC_PORTALS_LARGE,
467                                                      KQSW_EP_ENVELOPES_LARGE);
468         if (kqswnal_data.kqn_eprx_large == NULL)
469         {
470                 CERROR ("Can't install large msg receiver\n");
471                 kqswnal_shutdown (nal);
472                 return (PTL_NO_SPACE);
473         }
474
475         /**********************************************************************/
476         /* Reserve Elan address space for transmit descriptors NB we may
477          * either send the contents of associated buffers immediately, or
478          * map them for the peer to suck/blow... */
479 #if MULTIRAIL_EKC
480         kqswnal_data.kqn_ep_tx_nmh = 
481                 ep_dvma_reserve(kqswnal_data.kqn_ep,
482                                 KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
483                                 EP_PERM_WRITE);
484         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
485                 CERROR("Can't reserve tx dma space\n");
486                 kqswnal_shutdown(nal);
487                 return (PTL_NO_SPACE);
488         }
489 #else
490         dmareq.Waitfn   = DDI_DMA_SLEEP;
491         dmareq.ElanAddr = (E3_Addr) 0;
492         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
493         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
494
495         rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
496                               KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
497                               &dmareq, &kqswnal_data.kqn_eptxdmahandle);
498         if (rc != DDI_SUCCESS)
499         {
500                 CERROR ("Can't reserve rx dma space\n");
501                 kqswnal_shutdown (nal);
502                 return (PTL_NO_SPACE);
503         }
504 #endif
505         /**********************************************************************/
506         /* Reserve Elan address space for receive buffers */
507 #if MULTIRAIL_EKC
508         kqswnal_data.kqn_ep_rx_nmh =
509                 ep_dvma_reserve(kqswnal_data.kqn_ep,
510                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
511                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
512                                 EP_PERM_WRITE);
513         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
514                 CERROR("Can't reserve rx dma space\n");
515                 kqswnal_shutdown(nal);
516                 return (PTL_NO_SPACE);
517         }
518 #else
519         dmareq.Waitfn   = DDI_DMA_SLEEP;
520         dmareq.ElanAddr = (E3_Addr) 0;
521         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
522         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
523
524         rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
525                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
526                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
527                                 &dmareq, &kqswnal_data.kqn_eprxdmahandle);
528         if (rc != DDI_SUCCESS)
529         {
530                 CERROR ("Can't reserve rx dma space\n");
531                 kqswnal_shutdown (nal);
532                 return (PTL_NO_SPACE);
533         }
534 #endif
535         /**********************************************************************/
536         /* Allocate/Initialise transmit descriptors */
537
538         kqswnal_data.kqn_txds = NULL;
539         for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
540         {
541                 int           premapped_pages;
542                 int           basepage = i * KQSW_NTXMSGPAGES;
543
544                 PORTAL_ALLOC (ktx, sizeof(*ktx));
545                 if (ktx == NULL) {
546                         kqswnal_shutdown (nal);
547                         return (PTL_NO_SPACE);
548                 }
549
550                 memset(ktx, 0, sizeof(*ktx));   /* NULL pointers; zero flags */
551                 ktx->ktx_alloclist = kqswnal_data.kqn_txds;
552                 kqswnal_data.kqn_txds = ktx;
553
554                 PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
555                 if (ktx->ktx_buffer == NULL)
556                 {
557                         kqswnal_shutdown (nal);
558                         return (PTL_NO_SPACE);
559                 }
560
561                 /* Map pre-allocated buffer NOW, to save latency on transmit */
562                 premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
563                                                         KQSW_TX_BUFFER_SIZE);
564 #if MULTIRAIL_EKC
565                 ep_dvma_load(kqswnal_data.kqn_ep, NULL, 
566                              ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE, 
567                              kqswnal_data.kqn_ep_tx_nmh, basepage,
568                              &all_rails, &ktx->ktx_ebuffer);
569 #else
570                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
571                                        kqswnal_data.kqn_eptxdmahandle,
572                                        ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
573                                        basepage, &ktx->ktx_ebuffer);
574 #endif
575                 ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
576                 ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
577
578                 INIT_LIST_HEAD (&ktx->ktx_delayed_list);
579
580                 ktx->ktx_state = KTX_IDLE;
581 #if MULTIRAIL_EKC
582                 ktx->ktx_rail = -1;             /* unset rail */
583 #endif
584                 ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
585                 list_add_tail (&ktx->ktx_list, 
586                                ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
587                                                  &kqswnal_data.kqn_idletxds);
588         }
589
590         /**********************************************************************/
591         /* Allocate/Initialise receive descriptors */
592         kqswnal_data.kqn_rxds = NULL;
593         elan_page_idx = 0;
594         for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
595         {
596 #if MULTIRAIL_EKC
597                 EP_NMD        elanbuffer;
598 #else
599                 E3_Addr       elanbuffer;
600 #endif
601                 int           j;
602
603                 PORTAL_ALLOC(krx, sizeof(*krx));
604                 if (krx == NULL) {
605                         kqswnal_shutdown(nal);
606                         return (PTL_NO_SPACE);
607                 }
608
609                 memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
610                 krx->krx_alloclist = kqswnal_data.kqn_rxds;
611                 kqswnal_data.kqn_rxds = krx;
612
613                 if (i < KQSW_NRXMSGS_SMALL)
614                 {
615                         krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
616                         krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
617                 }
618                 else
619                 {
620                         krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
621                         krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
622                 }
623
624                 LASSERT (krx->krx_npages > 0);
625                 for (j = 0; j < krx->krx_npages; j++)
626                 {
627                         struct page *page = alloc_page(GFP_KERNEL);
628                         
629                         if (page == NULL) {
630                                 kqswnal_shutdown (nal);
631                                 return (PTL_NO_SPACE);
632                         }
633
634                         krx->krx_kiov[j].kiov_page = page;
635                         LASSERT(page_address(page) != NULL);
636
637 #if MULTIRAIL_EKC
638                         ep_dvma_load(kqswnal_data.kqn_ep, NULL,
639                                      page_address(page),
640                                      PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
641                                      elan_page_idx, &all_rails, &elanbuffer);
642                         
643                         if (j == 0) {
644                                 krx->krx_elanbuffer = elanbuffer;
645                         } else {
646                                 rc = ep_nmd_merge(&krx->krx_elanbuffer,
647                                                   &krx->krx_elanbuffer, 
648                                                   &elanbuffer);
649                                 /* NB contiguous mapping */
650                                 LASSERT(rc);
651                         }
652 #else
653                         elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
654                                               kqswnal_data.kqn_eprxdmahandle,
655                                               page_address(page),
656                                               PAGE_SIZE, elan_page_idx,
657                                               &elanbuffer);
658                         if (j == 0)
659                                 krx->krx_elanbuffer = elanbuffer;
660
661                         /* NB contiguous mapping */
662                         LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
663 #endif
664                         elan_page_idx++;
665
666                 }
667         }
668         LASSERT (elan_page_idx ==
669                  (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
670                  (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));
671
672         /**********************************************************************/
673         /* Network interface ready to initialise */
674
675         my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
676         my_process_id.pid = requested_pid;
677
678         rc = lib_init(&kqswnal_lib, nal, my_process_id,
679                       requested_limits, actual_limits);
680         if (rc != PTL_OK)
681         {
682                 CERROR ("lib_init failed %d\n", rc);
683                 kqswnal_shutdown (nal);
684                 return (rc);
685         }
686
687         kqswnal_data.kqn_init = KQN_INIT_LIB;
688
689         /**********************************************************************/
690         /* Queue receives, now that it's OK to run their completion callbacks */
691
692         for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
693                 /* NB this enqueue can allocate/sleep (attr == 0) */
694                 krx->krx_state = KRX_POSTED;
695 #if MULTIRAIL_EKC
696                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
697                                       &krx->krx_elanbuffer, 0);
698 #else
699                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
700                                       krx->krx_elanbuffer,
701                                       krx->krx_npages * PAGE_SIZE, 0);
702 #endif
703                 if (rc != EP_SUCCESS)
704                 {
705                         CERROR ("failed ep_queue_receive %d\n", rc);
706                         kqswnal_shutdown (nal);
707                         return (PTL_FAIL);
708                 }
709         }
710
711         /**********************************************************************/
712         /* Spawn scheduling threads */
713         for (i = 0; i < num_online_cpus(); i++) {
714                 rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
715                 if (rc != 0)
716                 {
717                         CERROR ("failed to spawn scheduling thread: %d\n", rc);
718                         kqswnal_shutdown (nal);
719                         return (PTL_FAIL);
720                 }
721         }
722
723         /**********************************************************************/
724         /* Connect to the router */
725         rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
726         CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
727
728         rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
729         if (rc != 0) {
730                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
731                 kqswnal_shutdown (nal);
732                 return (PTL_FAIL);
733         }
734
735         kqswnal_data.kqn_init = KQN_INIT_ALL;
736
737         printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
738                "(Routing %s, initial mem %d)\n", 
739                kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
740                kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
741                pkmem);
742
743         return (PTL_OK);
744 }
745
/* Module unload: drop the sysctl tree (if it registered), tear down the
 * network interface this module instantiated at load time, then
 * deregister the NAL itself. */
void __exit
kqswnal_finalise (void)
{
#if CONFIG_SYSCTL
        /* registration may have failed at load; kqn_sysctl is NULL then */
        if (kqswnal_tunables.kqn_sysctl != NULL)
                unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
#endif
        PtlNIFini(kqswnal_ni);

        ptl_unregister_nal(QSWNAL);
}
757
/* Module load: wire up the NAL's init/fini callbacks, set tunable
 * defaults, register with portals, bring the interface up immediately
 * (see comment below) and expose tunables via sysctl.  Returns 0 or
 * -errno. */
static int __init
kqswnal_initialise (void)
{
        int   rc;

        kqswnal_api.nal_ni_init = kqswnal_startup;
        kqswnal_api.nal_ni_fini = kqswnal_shutdown;

        /* Initialise dynamic tunables to defaults once only */
        kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
        kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;

        rc = ptl_register_nal(QSWNAL, &kqswnal_api);
        if (rc != PTL_OK) {
                CERROR("Can't register QSWNAL: %d\n", rc);
                return (-ENOMEM);               /* or something... */
        }

        /* Pure gateways, and the workaround for 'EKC blocks forever until
         * the service is active' want the NAL started up at module load
         * time... */
        rc = PtlNIInit(QSWNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kqswnal_ni);
        if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
                /* undo the registration above before failing the load */
                ptl_unregister_nal(QSWNAL);
                return (-ENODEV);
        }

#if CONFIG_SYSCTL
        /* Press on regardless even if registering sysctl doesn't work */
        kqswnal_tunables.kqn_sysctl = 
                register_sysctl_table (kqswnal_top_ctl_table, 0);
#endif
        return (0);
}
792
/* Module boilerplate: author/description/licence tags, and the module
 * entry/exit points wired to the initialise/finalise routines above. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);