lnet/klnds/qswlnd/qswlnd.c
/*
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 * W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/lustre/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */
24
25 #include "qswnal.h"
26
27 ptl_handle_ni_t         kqswnal_ni;
28 nal_t                   kqswnal_api;
29 kqswnal_data_t          kqswnal_data;
30
31 kpr_nal_interface_t kqswnal_router_interface = {
32         kprni_nalid:    QSWNAL,
33         kprni_arg:      NULL,
34         kprni_fwd:      kqswnal_fwd_packet,
35         kprni_notify:   NULL,                   /* we're connectionless */
36 };

#if CONFIG_SYSCTL
#define QSWNAL_SYSCTL  201

#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
#define QSWNAL_SYSCTL_COPY_SMALL_FWD     2

static ctl_table kqswnal_ctl_table[] = {
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
         &kqswnal_data.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
        {QSWNAL_SYSCTL_COPY_SMALL_FWD, "copy_small_fwd",
         &kqswnal_data.kqn_copy_small_fwd, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}
};

static ctl_table kqswnal_top_ctl_table[] = {
        {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
        {0}
};
#endif
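
/* NB registering kqswnal_top_ctl_table publishes the two tunables under
 * /proc/sys/qswnal/ on 2.4-era kernels; e.g. (a hypothetical shell session)
 *
 *      echo 0 > /proc/sys/qswnal/optimized_gets
 *
 * would disable optimized GETs at runtime.  Both entries are plain ints
 * handled by proc_dointvec. */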

static int
kqswnal_forward(nal_t   *nal,
                int     id,
                void    *args,  size_t args_len,
                void    *ret,   size_t ret_len)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
        return (PTL_OK);
}

static void
kqswnal_lock (nal_t *nal, unsigned long *flags)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        nal_cb->cb_cli(nal_cb,flags);
}

static void
kqswnal_unlock(nal_t *nal, unsigned long *flags)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        nal_cb->cb_sti(nal_cb,flags);
}

static int
kqswnal_shutdown(nal_t *nal, int ni)
{
        CDEBUG (D_NET, "shutdown\n");

        LASSERT (nal == &kqswnal_api);
        return (0);
}

static int
kqswnal_yield(nal_t *nal, unsigned long *flags, int milliseconds)
{
        /* NB called holding statelock */
        wait_queue_t       wait;
        unsigned long      now = jiffies;

        CDEBUG (D_NET, "yield\n");

        if (milliseconds == 0) {
                if (current->need_resched)
                        schedule();
                return 0;
        }

        init_waitqueue_entry(&wait, current);
        set_current_state(TASK_INTERRUPTIBLE);
        add_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);

        kqswnal_unlock(nal, flags);

        if (milliseconds < 0)
                schedule ();
        else
                schedule_timeout((milliseconds * HZ) / 1000);

        kqswnal_lock(nal, flags);

        remove_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);

        if (milliseconds > 0) {
                milliseconds -= ((jiffies - now) * 1000) / HZ;
                if (milliseconds < 0)
                        milliseconds = 0;
        }

        return (milliseconds);
}
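
/* NB kqswnal_yield() returns the milliseconds still to wait: 0 once the
 * timeout has elapsed, or unchanged (negative) for "wait forever".  A
 * caller polling a condition could loop like this hypothetical sketch:
 *
 *      while (!done && timeout != 0)
 *              timeout = kqswnal_yield(nal, &flags, timeout);
 */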

static nal_t *
kqswnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
             ptl_pid_t requested_pid)
{
        ptl_nid_t mynid = kqswnal_elanid2nid (kqswnal_data.kqn_elanid);
        int       nnids = kqswnal_data.kqn_nnodes;

        CDEBUG(D_NET, "calling lib_init with nid "LPX64" of %d\n", mynid, nnids);

        lib_init(&kqswnal_lib, mynid, 0, nnids, ptl_size, ac_size);

        return (&kqswnal_api);
}

int
kqswnal_get_tx_desc (struct portals_cfg *pcfg)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        int                index = pcfg->pcfg_count;
        int                rc = -ENOENT;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);

                pcfg->pcfg_pbuf1 = (char *)ktx;
                pcfg->pcfg_count = NTOH__u32(ktx->ktx_wire_hdr->type);
                pcfg->pcfg_size  = NTOH__u32(ktx->ktx_wire_hdr->payload_length);
                pcfg->pcfg_nid   = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
                pcfg->pcfg_nid2  = ktx->ktx_nid;
                pcfg->pcfg_misc  = ktx->ktx_launcher;
                pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
                                   (!ktx->ktx_isnblk                    ? 0 : 2) |
                                   (ktx->ktx_state << 2);
                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}
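
/* NB pcfg_flags packs three facts about the descriptor: bit 0 is set if
 * the tx is waiting on the delayed list, bit 1 if it came from the
 * non-blocking pool (ktx_isnblk), and the remaining bits hold ktx_state
 * shifted left by 2. */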

int
kqswnal_cmd (struct portals_cfg *pcfg, void *private)
{
        LASSERT (pcfg != NULL);

        switch (pcfg->pcfg_command) {
        case NAL_CMD_GET_TXDESC:
                return (kqswnal_get_tx_desc (pcfg));

        case NAL_CMD_REGISTER_MYNID:
                CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
                        kqswnal_data.kqn_nid_offset);
                kqswnal_data.kqn_nid_offset =
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
                kqswnal_lib.ni.nid = pcfg->pcfg_nid;
                return (0);

        default:
                return (-EINVAL);
        }
}
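
/* NB NIDs map to Elan IDs by a simple additive offset: after
 * NAL_CMD_REGISTER_MYNID, nid == kqn_nid_offset + elanid.  With
 * hypothetical numbers, registering NID 0x1000 on Elan node 3 sets the
 * offset to 0xffd, so peer Elan node 5 is then addressed as NID 0x1002. */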

void __exit
kqswnal_finalise (void)
{
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
#if CONFIG_SYSCTL
                if (kqswnal_data.kqn_sysctl != NULL)
                        unregister_sysctl_table (kqswnal_data.kqn_sysctl);
#endif
                PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
                kportal_nal_unregister(QSWNAL);
                /* fall through */

        case KQN_INIT_PTL:
                PtlNIFini (kqswnal_ni);
                lib_fini (&kqswnal_lib);
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;
        }

        /**********************************************************************/
        /* Make the router stop calling me and fail any further call-ins */
        kpr_shutdown (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* flag threads we've started to terminate and wait for all to ack */

        kqswnal_data.kqn_shuttingdown = 1;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
                       atomic_read (&kqswnal_data.kqn_nthreads_running));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */
#if MULTIRAIL_EKC
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* freeing the xmtr completes all txs pdq */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        /* wait for transmits to complete */
        while (!list_empty(&kqswnal_data.kqn_activetxds)) {
                CWARN("waiting for active transmits to complete\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(HZ);
        }

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */

        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

#if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
#endif

        /**********************************************************************/
        /* Complete any blocked forwarding packets with error
         */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }

        while (!list_empty (&kqswnal_data.kqn_delayedfwds))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }

        /**********************************************************************/
        /* Wait for router to complete any packets I sent her
         */

        kpr_deregister (&kqswnal_data.kqn_router);


        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

#if MULTIRAIL_EKC
        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() gets fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        if (kqswnal_data.kqn_txds != NULL) {
                int  i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++) {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        /* If ktx has a buffer, it got mapped; unmap now.
                         * NB only the pre-mapped stuff is still mapped
                         * since all tx descs must be idle */

                        if (ktx->ktx_buffer != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_tx_nmh,
                                               &ktx->ktx_ebuffer);
                }
        }

        if (kqswnal_data.kqn_rxds != NULL) {
                int   i;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        /* If krx_kiov[0].kiov_page got allocated, it got mapped.
                         * NB subsequent pages get merged */

                        if (krx->krx_kiov[0].kiov_page != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_rx_nmh,
                                               &krx->krx_elanbuffer);
                }
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
#endif
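
/* NB this file drives two generations of the Quadrics kernel comms API:
 * the multi-rail EKC interface (ep_dvma_reserve/ep_dvma_load/
 * ep_dvma_unload/ep_dvma_release on EP_NMH mappings) and the older elan3
 * interface (elan3_dma_reserve/elan3_dvma_kaddr_load on DMA handles),
 * selected at compile time by MULTIRAIL_EKC. */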

        if (kqswnal_data.kqn_txds != NULL)
        {
                int   i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
                {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        if (ktx->ktx_buffer != NULL)
                                PORTAL_FREE(ktx->ktx_buffer,
                                            KQSW_TX_BUFFER_SIZE);
                }

                PORTAL_FREE(kqswnal_data.kqn_txds,
                            sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
                                                     KQSW_NNBLK_TXMSGS));
        }

        if (kqswnal_data.kqn_rxds != NULL)
        {
                int   i;
                int   j;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
                {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        for (j = 0; j < krx->krx_npages; j++)
                                if (krx->krx_kiov[j].kiov_page != NULL)
                                        __free_page (krx->krx_kiov[j].kiov_page);
                }

                PORTAL_FREE(kqswnal_data.kqn_rxds,
                            sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
                                                    KQSW_NRXMSGS_LARGE));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}
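
/* NB teardown above runs in reverse order of setup: quiesce the router,
 * drain and kill the scheduler threads, close Elan comms, fail any queued
 * forwards, deregister from the router, then unmap and free all buffers.
 * kqn_init records how far initialisation got, so the switch at the top
 * can resume teardown from the right point. */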

static int __init
kqswnal_initialise (void)
{
#if MULTIRAIL_EKC
        EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
#else
        ELAN3_DMA_REQUEST dmareq;
#endif
        int               rc;
        int               i;
        int               elan_page_idx;
        int               pkmem = atomic_read(&portal_kmemory);

        LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);

        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));

        kqswnal_api.forward  = kqswnal_forward;
        kqswnal_api.shutdown = kqswnal_shutdown;
        kqswnal_api.yield    = kqswnal_yield;
        kqswnal_api.validate = NULL;            /* our api validate is a NOOP */
        kqswnal_api.lock     = kqswnal_lock;
        kqswnal_api.unlock   = kqswnal_unlock;
        kqswnal_api.nal_data = &kqswnal_data;

        kqswnal_lib.nal_data = &kqswnal_data;

        memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
        memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
#if MULTIRAIL_EKC
        kqswnal_rpc_failed.Data[0] = -ECONNREFUSED;
#else
        kqswnal_rpc_failed.Status = -ECONNREFUSED;
#endif
        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));

        kqswnal_data.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
        kqswnal_data.kqn_copy_small_fwd = KQSW_COPY_SMALL_FWD;

        kqswnal_data.kqn_cb = &kqswnal_lib;

        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
        spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
        init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);

        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

        spin_lock_init (&kqswnal_data.kqn_statelock);
        init_waitqueue_head (&kqswnal_data.kqn_yield_waitq);

        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;

#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
                return (-ENODEV);
        }

        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
                kqswnal_finalise();
                return (-ENODEV);
        }
#else
        /**********************************************************************/
        /* Find the first Elan device */

        kqswnal_data.kqn_ep = ep_device (0);
        if (kqswnal_data.kqn_ep == NULL)
        {
                CERROR ("Can't get elan device 0\n");
                return (-ENODEV);
        }
#endif

        kqswnal_data.kqn_nid_offset = 0;
        kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
        kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);

        /**********************************************************************/
        /* Get the transmitter */

        kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Get the receivers */

        kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
                                                     EP_MSG_SVC_PORTALS_SMALL,
                                                     KQSW_EP_ENVELOPES_SMALL);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
                                                     EP_MSG_SVC_PORTALS_LARGE,
                                                     KQSW_EP_ENVELOPES_LARGE);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for transmit descriptors NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */
#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep_tx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
                kqswnal_finalise();
                return (-ENOMEM);
        }
#else
        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
                              KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                              &dmareq, &kqswnal_data.kqn_eptxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve tx dma space\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }
#endif
        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep_rx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
                kqswnal_finalise();
                return (-ENOMEM);
        }
#else
        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                &dmareq, &kqswnal_data.kqn_eprxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }
#endif
        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */

        PORTAL_ALLOC(kqswnal_data.kqn_txds,
                     sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        if (kqswnal_data.kqn_txds == NULL)
        {
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /* clear flags, null pointers etc */
        memset(kqswnal_data.kqn_txds, 0,
               sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
        {
                int           premapped_pages;
                kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
                int           basepage = i * KQSW_NTXMSGPAGES;

                PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
                        kqswnal_finalise ();
                        return (-ENOMEM);
                }

                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &all_rails, &ktx->ktx_ebuffer);
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                                       basepage, &ktx->ktx_ebuffer);
#endif
                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

                INIT_LIST_HEAD (&ktx->ktx_delayed_list);

                ktx->ktx_state = KTX_IDLE;
                ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
                list_add_tail (&ktx->ktx_list,
                               ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
                                                 &kqswnal_data.kqn_idletxds);
        }
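
/* NB each tx descriptor owns KQSW_NTXMSGPAGES pages of reserved Elan
 * address space starting at i * KQSW_NTXMSGPAGES: the first
 * premapped_pages map ktx_buffer (loaded above, once, to save latency on
 * transmit) and the remaining ktx_npages, starting at ktx_basepage, are
 * left free to map message payload at send time. */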

        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */

        PORTAL_ALLOC (kqswnal_data.kqn_rxds,
                      sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
        if (kqswnal_data.kqn_rxds == NULL)
        {
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
               sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));

        elan_page_idx = 0;
        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
#if MULTIRAIL_EKC
                EP_NMD        elanbuffer;
#else
                E3_Addr       elanbuffer;
#endif
                int           j;
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                if (i < KQSW_NRXMSGS_SMALL)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
                }
                else
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
                }

                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
                        struct page *page = alloc_page(GFP_KERNEL);

                        if (page == NULL) {
                                kqswnal_finalise ();
                                return (-ENOMEM);
                        }

                        krx->krx_kiov[j].kiov_page = page;
                        LASSERT(page_address(page) != NULL);

#if MULTIRAIL_EKC
                        ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                                     page_address(page),
                                     PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
                                     elan_page_idx, &all_rails, &elanbuffer);

                        if (j == 0) {
                                krx->krx_elanbuffer = elanbuffer;
                        } else {
                                rc = ep_nmd_merge(&krx->krx_elanbuffer,
                                                  &krx->krx_elanbuffer,
                                                  &elanbuffer);
                                /* NB contiguous mapping */
                                LASSERT(rc);
                        }
#else
                        elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
                                              kqswnal_data.kqn_eprxdmahandle,
                                              page_address(page),
                                              PAGE_SIZE, elan_page_idx,
                                              &elanbuffer);
                        if (j == 0)
                                krx->krx_elanbuffer = elanbuffer;

                        /* NB contiguous mapping */
                        LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
#endif
                        elan_page_idx++;
                }
        }
        LASSERT (elan_page_idx ==
                 (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
                 (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));
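
/* NB elan_page_idx advanced once per page mapped, so the assertion above
 * verifies the receive buffers exactly fill the Elan address space
 * reserved for them; each krx ends up with one contiguous mapping, merged
 * page by page on the EKC path and LASSERTed contiguous on the elan3
 * path. */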

        /**********************************************************************/
        /* Network interface ready to initialise */

        rc = PtlNIInit(kqswnal_init, 32, 4, 0, &kqswnal_ni);
        if (rc != 0)
        {
                CERROR ("PtlNIInit failed %d\n", rc);
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        kqswnal_data.kqn_init = KQN_INIT_PTL;

        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */

        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                /* NB this enqueue can allocate/sleep (attr == 0) */
#if MULTIRAIL_EKC
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      &krx->krx_elanbuffer, 0);
#else
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
#endif
                if (rc != EP_SUCCESS)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_finalise ();
                        return (-ENOMEM);
                }
        }

        /**********************************************************************/
        /* Spawn scheduling threads */
        for (i = 0; i < smp_num_cpus; i++)
        {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
                        kqswnal_finalise ();
                        return (rc);
                }
        }

        /**********************************************************************/
        /* Connect to the router */
        rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
        if (rc != 0)
                CDEBUG(D_NET, "Can't initialise routing interface "
                       "(rc = %d): not routing\n", rc);

        rc = kportal_nal_register (QSWNAL, &kqswnal_cmd, NULL);
        if (rc != 0) {
                CERROR ("Can't initialise command interface (rc = %d)\n", rc);
                kqswnal_finalise ();
                return (rc);
        }

#if CONFIG_SYSCTL
        /* Press on regardless even if registering sysctl doesn't work */
        kqswnal_data.kqn_sysctl = register_sysctl_table (kqswnal_top_ctl_table, 0);
#endif

        PORTAL_SYMBOL_REGISTER(kqswnal_ni);
        kqswnal_data.kqn_init = KQN_INIT_ALL;

        printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
               "(Routing %s, initial mem %d)\n",
               kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
               kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
               pkmem);

        return (0);
}


MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);

EXPORT_SYMBOL (kqswnal_ni);