Whamcloud - gitweb
* landed unified portals (b_hd_cleanup_merge_singleportals) on HEAD
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd.c
1 /*
2  * Copyright (C) 2002 Cluster File Systems, Inc.
3  *   Author: Eric Barton <eric@bartonsoftware.com>
4  *
5  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
6  * W. Marcus Miller - Based on ksocknal
7  *
8  * This file is part of Portals, http://www.sf.net/projects/lustre/
9  *
10  * Portals is free software; you can redistribute it and/or
11  * modify it under the terms of version 2 of the GNU General Public
12  * License as published by the Free Software Foundation.
13  *
14  * Portals is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with Portals; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  */
24
25 #include "qswnal.h"
26
/* Module-global NAL state */
nal_t                   kqswnal_api;            /* the NAL API object registered with portals */
kqswnal_data_t          kqswnal_data;           /* all per-NAL state; zeroed on full shutdown */
ptl_handle_ni_t         kqswnal_ni;             /* NI handle from PtlNIInit() at module load */
kqswnal_tunables_t      kqswnal_tunables;       /* runtime tunables, exposed via sysctl */

/* How the portals router hands forwarded packets to this NAL */
kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
        kprni_notify:   NULL,                   /* we're connectionless */
};
38
#if CONFIG_SYSCTL
/* sysctl binary identifiers for the "qswnal" directory and its entries */
#define QSWNAL_SYSCTL  201

#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
#define QSWNAL_SYSCTL_OPTIMIZED_PUTS     2

/* /proc/sys/qswnal/{optimized_puts,optimized_gets}: writable integer
 * tunables backed directly by kqswnal_tunables (presumably they toggle
 * the optimized put/get paths — see qswnal.h for the consumers) */
static ctl_table kqswnal_ctl_table[] = {
        {QSWNAL_SYSCTL_OPTIMIZED_PUTS, "optimized_puts",
         &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
         0644, NULL, &proc_dointvec},
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
         &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}
};

/* top-level read-only "qswnal" directory containing the table above */
static ctl_table kqswnal_top_ctl_table[] = {
        {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
        {0}
};
#endif
60
61 int
62 kqswnal_get_tx_desc (struct portals_cfg *pcfg)
63 {
64         unsigned long      flags;
65         struct list_head  *tmp;
66         kqswnal_tx_t      *ktx;
67         ptl_hdr_t         *hdr;
68         int                index = pcfg->pcfg_count;
69         int                rc = -ENOENT;
70
71         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
72
73         list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
74                 if (index-- != 0)
75                         continue;
76
77                 ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
78                 hdr = (ptl_hdr_t *)ktx->ktx_buffer;
79
80                 pcfg->pcfg_pbuf1 = (char *)ktx;
81                 pcfg->pcfg_count = le32_to_cpu(hdr->type);
82                 pcfg->pcfg_size  = le32_to_cpu(hdr->payload_length);
83                 pcfg->pcfg_nid   = le64_to_cpu(hdr->dest_nid);
84                 pcfg->pcfg_nid2  = ktx->ktx_nid;
85                 pcfg->pcfg_misc  = ktx->ktx_launcher;
86                 pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
87                                   (!ktx->ktx_isnblk                    ? 0 : 2) |
88                                   (ktx->ktx_state << 2);
89                 rc = 0;
90                 break;
91         }
92         
93         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
94         return (rc);
95 }
96
97 int
98 kqswnal_cmd (struct portals_cfg *pcfg, void *private)
99 {
100         LASSERT (pcfg != NULL);
101         
102         switch (pcfg->pcfg_command) {
103         case NAL_CMD_GET_TXDESC:
104                 return (kqswnal_get_tx_desc (pcfg));
105
106         case NAL_CMD_REGISTER_MYNID:
107                 CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
108                         pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
109                         kqswnal_data.kqn_nid_offset);
110                 kqswnal_data.kqn_nid_offset =
111                         pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
112                 kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
113                 return (0);
114                 
115         default:
116                 return (-EINVAL);
117         }
118 }
119
/* Shut the NAL down and release all resources.
 *
 * Called via kqswnal_api.nal_ni_fini.  The first reference was taken by
 * this module at load time, so real teardown happens only when the last
 * reference is dropped; earlier calls just release a module ref.
 *
 * Teardown unwinds exactly as far as kqn_init says startup got:
 * stop the router, flag shutdown, drain pending sends, close elan
 * comms, kill the scheduler threads, fail any queued forwards, then
 * unmap and free all descriptors/buffers. */
static void
kqswnal_shutdown(nal_t *nal)
{
        unsigned long flags;
        kqswnal_tx_t *ktx;
        kqswnal_rx_t *krx;
        int           do_lib_fini = 0;

        /* NB The first ref was this module! */
        if (nal->nal_refct != 0) {
                PORTAL_MODULE_UNUSE;
                return;
        }

        CDEBUG (D_NET, "shutdown\n");
        LASSERT (nal == &kqswnal_api);

        /* Cases deliberately fall through: each init stage implies all
         * the earlier stages' cleanup is needed too. */
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
                libcfs_nal_cmd_unregister(QSWNAL);
                /* fall through */

        case KQN_INIT_LIB:
                do_lib_fini = 1;
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;
        }

        /**********************************************************************/
        /* Tell router we're shutting down.  Any router calls my threads
         * make will now fail immediately and the router will stop calling
         * into me. */
        kpr_shutdown (&kqswnal_data.kqn_router);
        
        /**********************************************************************/
        /* Signal the start of shutdown... (the lock makes the flag change
         * visible to anyone blocking for an idle tx descriptor) */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        wake_up_all(&kqswnal_data.kqn_idletxd_waitq);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up
         * (polling once a second) */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */
#if MULTIRAIL_EKC
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lib_parse() or kpr_fwd_start(). */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
        /* "Old" EKC just pretends to shutdown cleanly but actually
         * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        /* wait for transmits to complete */
        while (!list_empty(&kqswnal_data.kqn_activetxds)) {
                CWARN("waiting for active transmits to complete\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(HZ);
        }

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

#if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
#endif

        /**********************************************************************/
        /* Complete any blocked forwarding packets, with error
         */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
        }

        /**********************************************************************/
        /* finalise router and portals lib */

        kpr_deregister (&kqswnal_data.kqn_router);

        if (do_lib_fini)
                lib_fini (&kqswnal_lib);

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

#if MULTIRAIL_EKC
        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() get fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
                /* If ktx has a buffer, it got mapped; unmap now.  NB only
                 * the pre-mapped stuff is still mapped since all tx descs
                 * must be idle */

                if (ktx->ktx_buffer != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_tx_nmh,
                                       &ktx->ktx_ebuffer);
        }

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* If krx_kiov[0].kiov_page got allocated, it got mapped.  
                 * NB subsequent pages get merged */

                if (krx->krx_kiov[0].kiov_page != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_rx_nmh,
                                       &krx->krx_elanbuffer);
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
#endif

        /* free all tx descriptors and their pre-allocated buffers,
         * walking the singly-linked alloclist */
        while (kqswnal_data.kqn_txds != NULL) {
                ktx = kqswnal_data.kqn_txds;

                if (ktx->ktx_buffer != NULL)
                        PORTAL_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);

                kqswnal_data.kqn_txds = ktx->ktx_alloclist;
                PORTAL_FREE(ktx, sizeof(*ktx));
        }

        /* free all rx descriptors and their pages */
        while (kqswnal_data.kqn_rxds != NULL) {
                int           i;

                krx = kqswnal_data.kqn_rxds;
                for (i = 0; i < krx->krx_npages; i++)
                        if (krx->krx_kiov[i].kiov_page != NULL)
                                __free_page (krx->krx_kiov[i].kiov_page);

                kqswnal_data.kqn_rxds = krx->krx_alloclist;
                PORTAL_FREE(krx, sizeof (*krx));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}
353
354 static int
355 kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
356                  ptl_ni_limits_t *requested_limits, 
357                  ptl_ni_limits_t *actual_limits)
358 {
359 #if MULTIRAIL_EKC
360         EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
361 #else
362         ELAN3_DMA_REQUEST dmareq;
363 #endif
364         int               rc;
365         int               i;
366         kqswnal_rx_t     *krx;
367         kqswnal_tx_t     *ktx;
368         int               elan_page_idx;
369         ptl_process_id_t  my_process_id;
370         int               pkmem = atomic_read(&portal_kmemory);
371
372         LASSERT (nal == &kqswnal_api);
373
374         if (nal->nal_refct != 0) {
375                 if (actual_limits != NULL)
376                         *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
377                 /* This module got the first ref */
378                 PORTAL_MODULE_USE;
379                 return (PTL_OK);
380         }
381
382         LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
383
384         CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
385
386         /* ensure all pointers NULL etc */
387         memset (&kqswnal_data, 0, sizeof (kqswnal_data));
388
389         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
390         INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
391         INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
392         spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
393         init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
394         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);
395
396         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
397         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
398         INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
399
400         spin_lock_init (&kqswnal_data.kqn_sched_lock);
401         init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
402
403         /* Leave kqn_rpc_success zeroed */
404 #if MULTIRAIL_EKC
405         kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
406 #else
407         kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
408 #endif
409
410         /* pointers/lists/locks initialised */
411         kqswnal_data.kqn_init = KQN_INIT_DATA;
412         
413 #if MULTIRAIL_EKC
414         kqswnal_data.kqn_ep = ep_system();
415         if (kqswnal_data.kqn_ep == NULL) {
416                 CERROR("Can't initialise EKC\n");
417                 kqswnal_shutdown(nal);
418                 return (PTL_IFACE_INVALID);
419         }
420
421         if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
422                 CERROR("Can't get elan ID\n");
423                 kqswnal_shutdown(nal);
424                 return (PTL_IFACE_INVALID);
425         }
426 #else
427         /**********************************************************************/
428         /* Find the first Elan device */
429
430         kqswnal_data.kqn_ep = ep_device (0);
431         if (kqswnal_data.kqn_ep == NULL)
432         {
433                 CERROR ("Can't get elan device 0\n");
434                 kqswnal_shutdown(nal);
435                 return (PTL_IFACE_INVALID);
436         }
437 #endif
438
439         kqswnal_data.kqn_nid_offset = 0;
440         kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
441         kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);
442         
443         /**********************************************************************/
444         /* Get the transmitter */
445
446         kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
447         if (kqswnal_data.kqn_eptx == NULL)
448         {
449                 CERROR ("Can't allocate transmitter\n");
450                 kqswnal_shutdown (nal);
451                 return (PTL_NO_SPACE);
452         }
453
454         /**********************************************************************/
455         /* Get the receivers */
456
457         kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
458                                                      EP_MSG_SVC_PORTALS_SMALL,
459                                                      KQSW_EP_ENVELOPES_SMALL);
460         if (kqswnal_data.kqn_eprx_small == NULL)
461         {
462                 CERROR ("Can't install small msg receiver\n");
463                 kqswnal_shutdown (nal);
464                 return (PTL_NO_SPACE);
465         }
466
467         kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
468                                                      EP_MSG_SVC_PORTALS_LARGE,
469                                                      KQSW_EP_ENVELOPES_LARGE);
470         if (kqswnal_data.kqn_eprx_large == NULL)
471         {
472                 CERROR ("Can't install large msg receiver\n");
473                 kqswnal_shutdown (nal);
474                 return (PTL_NO_SPACE);
475         }
476
477         /**********************************************************************/
478         /* Reserve Elan address space for transmit descriptors NB we may
479          * either send the contents of associated buffers immediately, or
480          * map them for the peer to suck/blow... */
481 #if MULTIRAIL_EKC
482         kqswnal_data.kqn_ep_tx_nmh = 
483                 ep_dvma_reserve(kqswnal_data.kqn_ep,
484                                 KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
485                                 EP_PERM_WRITE);
486         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
487                 CERROR("Can't reserve tx dma space\n");
488                 kqswnal_shutdown(nal);
489                 return (PTL_NO_SPACE);
490         }
491 #else
492         dmareq.Waitfn   = DDI_DMA_SLEEP;
493         dmareq.ElanAddr = (E3_Addr) 0;
494         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
495         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
496
497         rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
498                               KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
499                               &dmareq, &kqswnal_data.kqn_eptxdmahandle);
500         if (rc != DDI_SUCCESS)
501         {
502                 CERROR ("Can't reserve rx dma space\n");
503                 kqswnal_shutdown (nal);
504                 return (PTL_NO_SPACE);
505         }
506 #endif
507         /**********************************************************************/
508         /* Reserve Elan address space for receive buffers */
509 #if MULTIRAIL_EKC
510         kqswnal_data.kqn_ep_rx_nmh =
511                 ep_dvma_reserve(kqswnal_data.kqn_ep,
512                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
513                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
514                                 EP_PERM_WRITE);
515         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
516                 CERROR("Can't reserve rx dma space\n");
517                 kqswnal_shutdown(nal);
518                 return (PTL_NO_SPACE);
519         }
520 #else
521         dmareq.Waitfn   = DDI_DMA_SLEEP;
522         dmareq.ElanAddr = (E3_Addr) 0;
523         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
524         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
525
526         rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
527                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
528                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
529                                 &dmareq, &kqswnal_data.kqn_eprxdmahandle);
530         if (rc != DDI_SUCCESS)
531         {
532                 CERROR ("Can't reserve rx dma space\n");
533                 kqswnal_shutdown (nal);
534                 return (PTL_NO_SPACE);
535         }
536 #endif
537         /**********************************************************************/
538         /* Allocate/Initialise transmit descriptors */
539
540         kqswnal_data.kqn_txds = NULL;
541         for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
542         {
543                 int           premapped_pages;
544                 int           basepage = i * KQSW_NTXMSGPAGES;
545
546                 PORTAL_ALLOC (ktx, sizeof(*ktx));
547                 if (ktx == NULL) {
548                         kqswnal_shutdown (nal);
549                         return (PTL_NO_SPACE);
550                 }
551
552                 memset(ktx, 0, sizeof(*ktx));   /* NULL pointers; zero flags */
553                 ktx->ktx_alloclist = kqswnal_data.kqn_txds;
554                 kqswnal_data.kqn_txds = ktx;
555
556                 PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
557                 if (ktx->ktx_buffer == NULL)
558                 {
559                         kqswnal_shutdown (nal);
560                         return (PTL_NO_SPACE);
561                 }
562
563                 /* Map pre-allocated buffer NOW, to save latency on transmit */
564                 premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
565                                                         KQSW_TX_BUFFER_SIZE);
566 #if MULTIRAIL_EKC
567                 ep_dvma_load(kqswnal_data.kqn_ep, NULL, 
568                              ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE, 
569                              kqswnal_data.kqn_ep_tx_nmh, basepage,
570                              &all_rails, &ktx->ktx_ebuffer);
571 #else
572                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
573                                        kqswnal_data.kqn_eptxdmahandle,
574                                        ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
575                                        basepage, &ktx->ktx_ebuffer);
576 #endif
577                 ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
578                 ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
579
580                 INIT_LIST_HEAD (&ktx->ktx_delayed_list);
581
582                 ktx->ktx_state = KTX_IDLE;
583 #if MULTIRAIL_EKC
584                 ktx->ktx_rail = -1;             /* unset rail */
585 #endif
586                 ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
587                 list_add_tail (&ktx->ktx_list, 
588                                ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
589                                                  &kqswnal_data.kqn_idletxds);
590         }
591
592         /**********************************************************************/
593         /* Allocate/Initialise receive descriptors */
594         kqswnal_data.kqn_rxds = NULL;
595         elan_page_idx = 0;
596         for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
597         {
598 #if MULTIRAIL_EKC
599                 EP_NMD        elanbuffer;
600 #else
601                 E3_Addr       elanbuffer;
602 #endif
603                 int           j;
604
605                 PORTAL_ALLOC(krx, sizeof(*krx));
606                 if (krx == NULL) {
607                         kqswnal_shutdown(nal);
608                         return (PTL_NO_SPACE);
609                 }
610
611                 memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
612                 krx->krx_alloclist = kqswnal_data.kqn_rxds;
613                 kqswnal_data.kqn_rxds = krx;
614
615                 if (i < KQSW_NRXMSGS_SMALL)
616                 {
617                         krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
618                         krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
619                 }
620                 else
621                 {
622                         krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
623                         krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
624                 }
625
626                 LASSERT (krx->krx_npages > 0);
627                 for (j = 0; j < krx->krx_npages; j++)
628                 {
629                         struct page *page = alloc_page(GFP_KERNEL);
630                         
631                         if (page == NULL) {
632                                 kqswnal_shutdown (nal);
633                                 return (PTL_NO_SPACE);
634                         }
635
636                         krx->krx_kiov[j].kiov_page = page;
637                         LASSERT(page_address(page) != NULL);
638
639 #if MULTIRAIL_EKC
640                         ep_dvma_load(kqswnal_data.kqn_ep, NULL,
641                                      page_address(page),
642                                      PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
643                                      elan_page_idx, &all_rails, &elanbuffer);
644                         
645                         if (j == 0) {
646                                 krx->krx_elanbuffer = elanbuffer;
647                         } else {
648                                 rc = ep_nmd_merge(&krx->krx_elanbuffer,
649                                                   &krx->krx_elanbuffer, 
650                                                   &elanbuffer);
651                                 /* NB contiguous mapping */
652                                 LASSERT(rc);
653                         }
654 #else
655                         elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
656                                               kqswnal_data.kqn_eprxdmahandle,
657                                               page_address(page),
658                                               PAGE_SIZE, elan_page_idx,
659                                               &elanbuffer);
660                         if (j == 0)
661                                 krx->krx_elanbuffer = elanbuffer;
662
663                         /* NB contiguous mapping */
664                         LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
665 #endif
666                         elan_page_idx++;
667
668                 }
669         }
670         LASSERT (elan_page_idx ==
671                  (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
672                  (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));
673
674         /**********************************************************************/
675         /* Network interface ready to initialise */
676
677         my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
678         my_process_id.pid = requested_pid;
679
680         rc = lib_init(&kqswnal_lib, nal, my_process_id,
681                       requested_limits, actual_limits);
682         if (rc != PTL_OK)
683         {
684                 CERROR ("lib_init failed %d\n", rc);
685                 kqswnal_shutdown (nal);
686                 return (rc);
687         }
688
689         kqswnal_data.kqn_init = KQN_INIT_LIB;
690
691         /**********************************************************************/
692         /* Queue receives, now that it's OK to run their completion callbacks */
693
694         for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
695                 /* NB this enqueue can allocate/sleep (attr == 0) */
696                 krx->krx_state = KRX_POSTED;
697 #if MULTIRAIL_EKC
698                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
699                                       &krx->krx_elanbuffer, 0);
700 #else
701                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
702                                       krx->krx_elanbuffer,
703                                       krx->krx_npages * PAGE_SIZE, 0);
704 #endif
705                 if (rc != EP_SUCCESS)
706                 {
707                         CERROR ("failed ep_queue_receive %d\n", rc);
708                         kqswnal_shutdown (nal);
709                         return (PTL_FAIL);
710                 }
711         }
712
713         /**********************************************************************/
714         /* Spawn scheduling threads */
715         for (i = 0; i < num_online_cpus(); i++) {
716                 rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
717                 if (rc != 0)
718                 {
719                         CERROR ("failed to spawn scheduling thread: %d\n", rc);
720                         kqswnal_shutdown (nal);
721                         return (PTL_FAIL);
722                 }
723         }
724
725         /**********************************************************************/
726         /* Connect to the router */
727         rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
728         CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
729
730         rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
731         if (rc != 0) {
732                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
733                 kqswnal_shutdown (nal);
734                 return (PTL_FAIL);
735         }
736
737         kqswnal_data.kqn_init = KQN_INIT_ALL;
738
739         printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
740                "(Routing %s, initial mem %d)\n", 
741                kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
742                kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
743                pkmem);
744
745         return (PTL_OK);
746 }
747
/* Module unload hook: unwind kqswnal_initialise() in reverse order.
 * NOTE(review): PtlNIFini() presumably drops the NI ref PtlNIInit()
 * took at load time, triggering the real shutdown via nal_ni_fini —
 * confirm against the portals API. */
void __exit
kqswnal_finalise (void)
{
#if CONFIG_SYSCTL
        /* sysctl registration is best-effort at startup, so the handle
         * may legitimately be NULL here */
        if (kqswnal_tunables.kqn_sysctl != NULL)
                unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
#endif
        PtlNIFini(kqswnal_ni);

        ptl_unregister_nal(QSWNAL);
}
759
760 static int __init
761 kqswnal_initialise (void)
762 {
763         int   rc;
764
765         kqswnal_api.nal_ni_init = kqswnal_startup;
766         kqswnal_api.nal_ni_fini = kqswnal_shutdown;
767
768         /* Initialise dynamic tunables to defaults once only */
769         kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
770         kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
771         
772         rc = ptl_register_nal(QSWNAL, &kqswnal_api);
773         if (rc != PTL_OK) {
774                 CERROR("Can't register QSWNAL: %d\n", rc);
775                 return (-ENOMEM);               /* or something... */
776         }
777
778         /* Pure gateways, and the workaround for 'EKC blocks forever until
779          * the service is active' want the NAL started up at module load
780          * time... */
781         rc = PtlNIInit(QSWNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kqswnal_ni);
782         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
783                 ptl_unregister_nal(QSWNAL);
784                 return (-ENODEV);
785         }
786
787 #if CONFIG_SYSCTL
788         /* Press on regardless even if registering sysctl doesn't work */
789         kqswnal_tunables.kqn_sysctl = 
790                 register_sysctl_table (kqswnal_top_ctl_table, 0);
791 #endif
792         return (0);
793 }
794
/* Module metadata and entry/exit points */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);