/* Extracted from fs/lustre-release.git: lnet/klnds/qswlnd/qswlnd.c
 * (branch merge: "Landed b_cray_portals_merge")
 */
1 /*
2  * Copyright (C) 2002 Cluster File Systems, Inc.
3  *   Author: Eric Barton <eric@bartonsoftware.com>
4  *
5  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
6  * W. Marcus Miller - Based on ksocknal
7  *
8  * This file is part of Portals, http://www.sf.net/projects/lustre/
9  *
10  * Portals is free software; you can redistribute it and/or
11  * modify it under the terms of version 2 of the GNU General Public
12  * License as published by the Free Software Foundation.
13  *
14  * Portals is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with Portals; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  */
24
25 #include "qswnal.h"
26
nal_t                   kqswnal_api;            /* API-side NAL instance handed to portals */
kqswnal_data_t          kqswnal_data;           /* all global state for this NAL */
ptl_handle_ni_t         kqswnal_ni;             /* handle of our network interface */
kqswnal_tunables_t      kqswnal_tunables;       /* runtime-tunable parameters (see sysctl below) */
31
/* Registration record for the portals router: tells it our NAL id and
 * which callback forwards packets over this NAL.  No notify callback is
 * supplied because QSW is connectionless - there is no per-peer
 * connection state whose loss the router would need to hear about. */
kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
        kprni_notify:   NULL,                   /* we're connectionless */
};
38
#if CONFIG_SYSCTL
/* sysctl ids: QSWNAL_SYSCTL is the root directory id for this NAL's
 * tunables; the others are entries beneath it. */
#define QSWNAL_SYSCTL  201

#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
/* NOTE(review): COPY_SMALL_FWD is defined but has no entry in
 * kqswnal_ctl_table below - confirm whether an entry was intended. */
#define QSWNAL_SYSCTL_COPY_SMALL_FWD     2

/* Leaf table: exposes kqn_optimized_gets as a read-write (0644) integer. */
static ctl_table kqswnal_ctl_table[] = {
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
         &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}                     /* table terminator */
};

/* Top-level table: a read-only "qswnal" directory holding the leaf table. */
static ctl_table kqswnal_top_ctl_table[] = {
        {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
        {0}                     /* table terminator */
};
#endif
57
58 static int
59 kqswnal_forward(nal_t   *nal,
60                 int     id,
61                 void    *args,  size_t args_len,
62                 void    *ret,   size_t ret_len)
63 {
64         kqswnal_data_t *k = nal->nal_data;
65         nal_cb_t       *nal_cb = k->kqn_cb;
66
67         LASSERT (nal == &kqswnal_api);
68         LASSERT (k == &kqswnal_data);
69         LASSERT (nal_cb == &kqswnal_lib);
70
71         lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
72         return (PTL_OK);
73 }
74
75 static void
76 kqswnal_lock (nal_t *nal, unsigned long *flags)
77 {
78         kqswnal_data_t *k = nal->nal_data;
79         nal_cb_t       *nal_cb = k->kqn_cb;
80
81         LASSERT (nal == &kqswnal_api);
82         LASSERT (k == &kqswnal_data);
83         LASSERT (nal_cb == &kqswnal_lib);
84
85         nal_cb->cb_cli(nal_cb,flags);
86 }
87
88 static void
89 kqswnal_unlock(nal_t *nal, unsigned long *flags)
90 {
91         kqswnal_data_t *k = nal->nal_data;
92         nal_cb_t       *nal_cb = k->kqn_cb;
93
94         LASSERT (nal == &kqswnal_api);
95         LASSERT (k == &kqswnal_data);
96         LASSERT (nal_cb == &kqswnal_lib);
97
98         nal_cb->cb_sti(nal_cb,flags);
99 }
100
/* Yield the CPU for up to 'milliseconds' (0 == just reschedule if
 * needed, <0 == sleep until woken), dropping the NAL state lock while
 * asleep and re-taking it before returning.  Returns the number of
 * milliseconds of the timeout still remaining. */
static int
kqswnal_yield(nal_t *nal, unsigned long *flags, int milliseconds)
{
        /* NB called holding statelock */
        wait_queue_t       wait;
        unsigned long      now = jiffies;       /* for computing time spent asleep */

        CDEBUG (D_NET, "yield\n");

        if (milliseconds == 0) {
                /* non-blocking: give up the CPU only if someone wants it */
                if (current->need_resched)
                        schedule();
                return 0;
        }

        /* enqueue on the yield waitqueue and change task state BEFORE
         * dropping the lock, so a wakeup arriving between the unlock and
         * schedule() is not lost */
        init_waitqueue_entry(&wait, current);
        set_current_state(TASK_INTERRUPTIBLE);
        add_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);

        kqswnal_unlock(nal, flags);

        if (milliseconds < 0)
                schedule ();            /* sleep until explicitly woken */
        else
                schedule_timeout((milliseconds * HZ) / 1000);

        kqswnal_lock(nal, flags);

        remove_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);

        if (milliseconds > 0) {
                /* report how much of the finite timeout remains, clamped at 0 */
                milliseconds -= ((jiffies - now) * 1000) / HZ;
                if (milliseconds < 0)
                        milliseconds = 0;
        }

        return (milliseconds);
}
139
/* Report the state of the pcfg->pcfg_count'th active transmit descriptor
 * back through *pcfg (diagnostic ioctl support).  Returns 0 on success or
 * -ENOENT if there are fewer active tx descriptors than requested. */
int
kqswnal_get_tx_desc (struct portals_cfg *pcfg)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        int                index = pcfg->pcfg_count;
        int                rc = -ENOENT;

        /* hold the idle-txd lock so the active list can't change under us */
        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;       /* not yet at the index'th entry */

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);

                /* copy out descriptor state, converting wire-header fields
                 * from network to host byte order */
                pcfg->pcfg_pbuf1 = (char *)ktx;
                pcfg->pcfg_count = NTOH__u32(ktx->ktx_wire_hdr->type);
                pcfg->pcfg_size  = NTOH__u32(ktx->ktx_wire_hdr->payload_length);
                pcfg->pcfg_nid   = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
                pcfg->pcfg_nid2  = ktx->ktx_nid;
                pcfg->pcfg_misc  = ktx->ktx_launcher;
                /* flags: bit0 = queued on delayed list, bit1 = non-blocking
                 * descriptor, remaining bits = tx state */
                pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
                                  (!ktx->ktx_isnblk                    ? 0 : 2) |
                                  (ktx->ktx_state << 2);
                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}
173
174 int
175 kqswnal_cmd (struct portals_cfg *pcfg, void *private)
176 {
177         LASSERT (pcfg != NULL);
178         
179         switch (pcfg->pcfg_command) {
180         case NAL_CMD_GET_TXDESC:
181                 return (kqswnal_get_tx_desc (pcfg));
182
183         case NAL_CMD_REGISTER_MYNID:
184                 CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
185                         pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
186                         kqswnal_data.kqn_nid_offset);
187                 kqswnal_data.kqn_nid_offset =
188                         pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
189                 kqswnal_lib.ni.nid = pcfg->pcfg_nid;
190                 return (0);
191                 
192         default:
193                 return (-EINVAL);
194         }
195 }
196
/* Tear down the NAL, in strict reverse order of initialisation.  The
 * kqn_init state machine (set as startup progresses) selects, via switch
 * fallthrough, how much there is to undo: stop the router, drain pending
 * sends, close elan comms, kill worker threads, fail queued forwards,
 * finalise the portals lib, then unmap and free all buffers. */
static void
kqswnal_shutdown(nal_t *nal)
{
        unsigned long flags;
        int           do_lib_fini = 0;

        /* NB The first ref was this module! */
        if (nal->nal_refct != 0) {
                PORTAL_MODULE_UNUSE;
                return;
        }

        CDEBUG (D_NET, "shutdown\n");
        LASSERT (nal == &kqswnal_api);

        /* each case falls through to undo everything initialised so far;
         * cases are ordered from most- to least-initialised */
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);            /* unknown init state: bug */

        case KQN_INIT_ALL:
                libcfs_nal_cmd_unregister(QSWNAL);
                /* fall through */

        case KQN_INIT_LIB:
                do_lib_fini = 1;        /* lib_fini() deferred until callbacks quiesce */
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;                 /* nothing was initialised */
        }

        /**********************************************************************/
        /* Tell router we're shutting down.  Any router calls my threads
         * make will now fail immediately and the router will stop calling
         * into me. */
        kpr_shutdown (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* Signal the start of shutdown... */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        /* wake anyone blocked waiting for an idle tx descriptor */
        wake_up_all(&kqswnal_data.kqn_idletxd_waitq);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);  /* poll once a second */
        }

        /**********************************************************************/
        /* close elan comms */
#if MULTIRAIL_EKC
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lib_parse() or kpr_fwd_start(). */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
        /* "Old" EKC just pretends to shutdown cleanly but actually
         * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        /* wait for transmits to complete */
        while (!list_empty(&kqswnal_data.kqn_activetxds)) {
                CWARN("waiting for active transmits to complete\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(HZ);
        }

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

#if MULTIRAIL_EKC
        /* all work queues must have drained before the threads died */
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
#endif

        /**********************************************************************/
        /* Complete any blocked forwarding packets, with error
         */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
        }

        /**********************************************************************/
        /* finalise router and portals lib */

        kpr_deregister (&kqswnal_data.kqn_router);

        if (do_lib_fini)
                lib_fini (&kqswnal_lib);

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

#if MULTIRAIL_EKC
        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() get fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        if (kqswnal_data.kqn_txds != NULL) {
                int  i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++) {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        /* If ktx has a buffer, it got mapped; unmap now.
                         * NB only the pre-mapped stuff is still mapped
                         * since all tx descs must be idle */

                        if (ktx->ktx_buffer != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_tx_nmh,
                                               &ktx->ktx_ebuffer);
                }
        }

        if (kqswnal_data.kqn_rxds != NULL) {
                int   i;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        /* If krx_kiov[0].kiov_page got allocated, it got mapped.
                         * NB subsequent pages get merged */

                        if (krx->krx_kiov[0].kiov_page != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_rx_nmh,
                                               &krx->krx_elanbuffer);
                }
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
#endif

        /* free the per-descriptor tx buffers, then the descriptor array */
        if (kqswnal_data.kqn_txds != NULL)
        {
                int   i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
                {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        if (ktx->ktx_buffer != NULL)
                                PORTAL_FREE(ktx->ktx_buffer,
                                            KQSW_TX_BUFFER_SIZE);
                }

                PORTAL_FREE(kqswnal_data.kqn_txds,
                            sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
                                                     KQSW_NNBLK_TXMSGS));
        }

        /* free the rx buffer pages, then the descriptor array */
        if (kqswnal_data.kqn_rxds != NULL)
        {
                int   i;
                int   j;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
                {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        for (j = 0; j < krx->krx_npages; j++)
                                if (krx->krx_kiov[j].kiov_page != NULL)
                                        __free_page (krx->krx_kiov[j].kiov_page);
                }

                PORTAL_FREE(kqswnal_data.kqn_rxds,
                            sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
                                                    KQSW_NRXMSGS_LARGE));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}
455
456 static int __init
457 kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
458                  ptl_ni_limits_t *requested_limits, 
459                  ptl_ni_limits_t *actual_limits)
460 {
461 #if MULTIRAIL_EKC
462         EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
463 #else
464         ELAN3_DMA_REQUEST dmareq;
465 #endif
466         int               rc;
467         int               i;
468         int               elan_page_idx;
469         ptl_process_id_t  my_process_id;
470         int               pkmem = atomic_read(&portal_kmemory);
471
472         if (nal->nal_refct != 0) {
473                 if (actual_limits != NULL)
474                         *actual_limits = kqswnal_lib.ni.actual_limits;
475                 /* This module got the first ref */
476                 PORTAL_MODULE_USE;
477                 return (PTL_OK);
478         }
479
480         LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
481
482         CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
483
484         memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
485         memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
486 #if MULTIRAIL_EKC
487         kqswnal_rpc_failed.Data[0] = -ECONNREFUSED;
488 #else
489         kqswnal_rpc_failed.Status = -ECONNREFUSED;
490 #endif
491         /* ensure all pointers NULL etc */
492         memset (&kqswnal_data, 0, sizeof (kqswnal_data));
493
494         kqswnal_data.kqn_cb = &kqswnal_lib;
495
496         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
497         INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
498         INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
499         spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
500         init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
501         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);
502
503         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
504         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
505         INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
506
507         spin_lock_init (&kqswnal_data.kqn_sched_lock);
508         init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
509
510         spin_lock_init (&kqswnal_data.kqn_statelock);
511         init_waitqueue_head (&kqswnal_data.kqn_yield_waitq);
512
513         /* pointers/lists/locks initialised */
514         kqswnal_data.kqn_init = KQN_INIT_DATA;
515         
516 #if MULTIRAIL_EKC
517         kqswnal_data.kqn_ep = ep_system();
518         if (kqswnal_data.kqn_ep == NULL) {
519                 CERROR("Can't initialise EKC\n");
520                 kqswnal_shutdown(&kqswnal_api);
521                 return (PTL_IFACE_INVALID);
522         }
523
524         if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
525                 CERROR("Can't get elan ID\n");
526                 kqswnal_shutdown(&kqswnal_api);
527                 return (PTL_IFACE_INVALID);
528         }
529 #else
530         /**********************************************************************/
531         /* Find the first Elan device */
532
533         kqswnal_data.kqn_ep = ep_device (0);
534         if (kqswnal_data.kqn_ep == NULL)
535         {
536                 CERROR ("Can't get elan device 0\n");
537                 kqswnal_shutdown(&kqswnal_api);
538                 return (PTL_IFACE_INVALID);
539         }
540 #endif
541
542         kqswnal_data.kqn_nid_offset = 0;
543         kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
544         kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);
545         
546         /**********************************************************************/
547         /* Get the transmitter */
548
549         kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
550         if (kqswnal_data.kqn_eptx == NULL)
551         {
552                 CERROR ("Can't allocate transmitter\n");
553                 kqswnal_shutdown (&kqswnal_api);
554                 return (PTL_NO_SPACE);
555         }
556
557         /**********************************************************************/
558         /* Get the receivers */
559
560         kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
561                                                      EP_MSG_SVC_PORTALS_SMALL,
562                                                      KQSW_EP_ENVELOPES_SMALL);
563         if (kqswnal_data.kqn_eprx_small == NULL)
564         {
565                 CERROR ("Can't install small msg receiver\n");
566                 kqswnal_shutdown (&kqswnal_api);
567                 return (PTL_NO_SPACE);
568         }
569
570         kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
571                                                      EP_MSG_SVC_PORTALS_LARGE,
572                                                      KQSW_EP_ENVELOPES_LARGE);
573         if (kqswnal_data.kqn_eprx_large == NULL)
574         {
575                 CERROR ("Can't install large msg receiver\n");
576                 kqswnal_shutdown (&kqswnal_api);
577                 return (PTL_NO_SPACE);
578         }
579
580         /**********************************************************************/
581         /* Reserve Elan address space for transmit descriptors NB we may
582          * either send the contents of associated buffers immediately, or
583          * map them for the peer to suck/blow... */
584 #if MULTIRAIL_EKC
585         kqswnal_data.kqn_ep_tx_nmh = 
586                 ep_dvma_reserve(kqswnal_data.kqn_ep,
587                                 KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
588                                 EP_PERM_WRITE);
589         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
590                 CERROR("Can't reserve tx dma space\n");
591                 kqswnal_shutdown(&kqswnal_api);
592                 return (PTL_NO_SPACE);
593         }
594 #else
595         dmareq.Waitfn   = DDI_DMA_SLEEP;
596         dmareq.ElanAddr = (E3_Addr) 0;
597         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
598         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
599
600         rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
601                               KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
602                               &dmareq, &kqswnal_data.kqn_eptxdmahandle);
603         if (rc != DDI_SUCCESS)
604         {
605                 CERROR ("Can't reserve rx dma space\n");
606                 kqswnal_shutdown (&kqswnal_api);
607                 return (PTL_NO_SPACE);
608         }
609 #endif
610         /**********************************************************************/
611         /* Reserve Elan address space for receive buffers */
612 #if MULTIRAIL_EKC
613         kqswnal_data.kqn_ep_rx_nmh =
614                 ep_dvma_reserve(kqswnal_data.kqn_ep,
615                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
616                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
617                                 EP_PERM_WRITE);
618         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
619                 CERROR("Can't reserve rx dma space\n");
620                 kqswnal_shutdown(&kqswnal_api);
621                 return (PTL_NO_SPACE);
622         }
623 #else
624         dmareq.Waitfn   = DDI_DMA_SLEEP;
625         dmareq.ElanAddr = (E3_Addr) 0;
626         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
627         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
628
629         rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
630                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
631                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
632                                 &dmareq, &kqswnal_data.kqn_eprxdmahandle);
633         if (rc != DDI_SUCCESS)
634         {
635                 CERROR ("Can't reserve rx dma space\n");
636                 kqswnal_shutdown (&kqswnal_api);
637                 return (PTL_NO_SPACE);
638         }
639 #endif
640         /**********************************************************************/
641         /* Allocate/Initialise transmit descriptors */
642
643         PORTAL_ALLOC(kqswnal_data.kqn_txds,
644                      sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
645         if (kqswnal_data.kqn_txds == NULL)
646         {
647                 kqswnal_shutdown (&kqswnal_api);
648                 return (PTL_NO_SPACE);
649         }
650
651         /* clear flags, null pointers etc */
652         memset(kqswnal_data.kqn_txds, 0,
653                sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
654         for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
655         {
656                 int           premapped_pages;
657                 kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
658                 int           basepage = i * KQSW_NTXMSGPAGES;
659
660                 PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
661                 if (ktx->ktx_buffer == NULL)
662                 {
663                         kqswnal_shutdown (&kqswnal_api);
664                         return (PTL_NO_SPACE);
665                 }
666
667                 /* Map pre-allocated buffer NOW, to save latency on transmit */
668                 premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
669                                                         KQSW_TX_BUFFER_SIZE);
670 #if MULTIRAIL_EKC
671                 ep_dvma_load(kqswnal_data.kqn_ep, NULL, 
672                              ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE, 
673                              kqswnal_data.kqn_ep_tx_nmh, basepage,
674                              &all_rails, &ktx->ktx_ebuffer);
675 #else
676                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
677                                        kqswnal_data.kqn_eptxdmahandle,
678                                        ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
679                                        basepage, &ktx->ktx_ebuffer);
680 #endif
681                 ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
682                 ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
683
684                 INIT_LIST_HEAD (&ktx->ktx_delayed_list);
685
686                 ktx->ktx_state = KTX_IDLE;
687                 ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
688                 list_add_tail (&ktx->ktx_list, 
689                                ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
690                                                  &kqswnal_data.kqn_idletxds);
691         }
692
693         /**********************************************************************/
694         /* Allocate/Initialise receive descriptors */
695
696         PORTAL_ALLOC (kqswnal_data.kqn_rxds,
697                       sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
698         if (kqswnal_data.kqn_rxds == NULL)
699         {
700                 kqswnal_shutdown (&kqswnal_api);
701                 return (PTL_NO_SPACE);
702         }
703
704         memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
705                sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));
706
707         elan_page_idx = 0;
708         for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
709         {
710 #if MULTIRAIL_EKC
711                 EP_NMD        elanbuffer;
712 #else
713                 E3_Addr       elanbuffer;
714 #endif
715                 int           j;
716                 kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
717
718                 if (i < KQSW_NRXMSGS_SMALL)
719                 {
720                         krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
721                         krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
722                 }
723                 else
724                 {
725                         krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
726                         krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
727                 }
728
729                 LASSERT (krx->krx_npages > 0);
730                 for (j = 0; j < krx->krx_npages; j++)
731                 {
732                         struct page *page = alloc_page(GFP_KERNEL);
733                         
734                         if (page == NULL) {
735                                 kqswnal_shutdown (&kqswnal_api);
736                                 return (PTL_NO_SPACE);
737                         }
738
739                         krx->krx_kiov[j].kiov_page = page;
740                         LASSERT(page_address(page) != NULL);
741
742 #if MULTIRAIL_EKC
743                         ep_dvma_load(kqswnal_data.kqn_ep, NULL,
744                                      page_address(page),
745                                      PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
746                                      elan_page_idx, &all_rails, &elanbuffer);
747                         
748                         if (j == 0) {
749                                 krx->krx_elanbuffer = elanbuffer;
750                         } else {
751                                 rc = ep_nmd_merge(&krx->krx_elanbuffer,
752                                                   &krx->krx_elanbuffer, 
753                                                   &elanbuffer);
754                                 /* NB contiguous mapping */
755                                 LASSERT(rc);
756                         }
757 #else
758                         elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
759                                               kqswnal_data.kqn_eprxdmahandle,
760                                               page_address(page),
761                                               PAGE_SIZE, elan_page_idx,
762                                               &elanbuffer);
763                         if (j == 0)
764                                 krx->krx_elanbuffer = elanbuffer;
765
766                         /* NB contiguous mapping */
767                         LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
768 #endif
769                         elan_page_idx++;
770
771                 }
772         }
773         LASSERT (elan_page_idx ==
774                  (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
775                  (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));
776
777         /**********************************************************************/
778         /* Network interface ready to initialise */
779
780         my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
781         my_process_id.pid = 0;
782
783         rc = lib_init(&kqswnal_lib, my_process_id,
784                       requested_limits, actual_limits);
785         if (rc != PTL_OK)
786         {
787                 CERROR ("lib_init failed %d\n", rc);
788                 kqswnal_shutdown (&kqswnal_api);
789                 return (rc);
790         }
791
792         kqswnal_data.kqn_init = KQN_INIT_LIB;
793
794         /**********************************************************************/
795         /* Queue receives, now that it's OK to run their completion callbacks */
796
797         for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
798         {
799                 kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
800
801                 /* NB this enqueue can allocate/sleep (attr == 0) */
802 #if MULTIRAIL_EKC
803                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
804                                       &krx->krx_elanbuffer, 0);
805 #else
806                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
807                                       krx->krx_elanbuffer,
808                                       krx->krx_npages * PAGE_SIZE, 0);
809 #endif
810                 if (rc != EP_SUCCESS)
811                 {
812                         CERROR ("failed ep_queue_receive %d\n", rc);
813                         kqswnal_shutdown (&kqswnal_api);
814                         return (PTL_FAIL);
815                 }
816         }
817
818         /**********************************************************************/
819         /* Spawn scheduling threads */
820         for (i = 0; i < smp_num_cpus; i++)
821         {
822                 rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
823                 if (rc != 0)
824                 {
825                         CERROR ("failed to spawn scheduling thread: %d\n", rc);
826                         kqswnal_shutdown (&kqswnal_api);
827                         return (PTL_FAIL);
828                 }
829         }
830
831         /**********************************************************************/
832         /* Connect to the router */
833         rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
834         CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
835
836         rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
837         if (rc != 0) {
838                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
839                 kqswnal_shutdown (&kqswnal_api);
840                 return (PTL_FAIL);
841         }
842
843         kqswnal_data.kqn_init = KQN_INIT_ALL;
844
845         printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
846                "(Routing %s, initial mem %d)\n", 
847                kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
848                kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
849                pkmem);
850
851         return (PTL_OK);
852 }
853
void __exit
kqswnal_finalise (void)
{
        /* Module unload hook: undo kqswnal_initialise() in reverse order. */
#if CONFIG_SYSCTL
        /* kqn_sysctl may be NULL: registration failure at load time is
         * deliberately non-fatal ("press on regardless"), so only
         * unregister what was actually registered. */
        if (kqswnal_tunables.kqn_sysctl != NULL)
                unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
#endif
        /* Tear down the network interface brought up by PtlNIInit() at
         * module load time */
        PtlNIFini(kqswnal_ni);

        /* Finally remove this NAL from the portals NAL table */
        ptl_unregister_nal(QSWNAL);
}
865
866 static int __init
867 kqswnal_initialise (void)
868 {
869         int   rc;
870
871         kqswnal_api.startup  = kqswnal_startup;
872         kqswnal_api.shutdown = kqswnal_shutdown;
873         kqswnal_api.forward  = kqswnal_forward;
874         kqswnal_api.yield    = kqswnal_yield;
875         kqswnal_api.lock     = kqswnal_lock;
876         kqswnal_api.unlock   = kqswnal_unlock;
877         kqswnal_api.nal_data = &kqswnal_data;
878
879         kqswnal_lib.nal_data = &kqswnal_data;
880
881         /* Initialise dynamic tunables to defaults once only */
882         kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
883         
884         rc = ptl_register_nal(QSWNAL, &kqswnal_api);
885         if (rc != PTL_OK) {
886                 CERROR("Can't register QSWNAL: %d\n", rc);
887                 return (-ENOMEM);               /* or something... */
888         }
889
890         /* Pure gateways, and the workaround for 'EKC blocks forever until
891          * the service is active' want the NAL started up at module load
892          * time... */
893         rc = PtlNIInit(QSWNAL, 0, NULL, NULL, &kqswnal_ni);
894         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
895                 ptl_unregister_nal(QSWNAL);
896                 return (-ENODEV);
897         }
898
899 #if CONFIG_SYSCTL
900         /* Press on regardless even if registering sysctl doesn't work */
901         kqswnal_tunables.kqn_sysctl = 
902                 register_sysctl_table (kqswnal_top_ctl_table, 0);
903 #endif
904         return (0);
905 }
906
/* Standard kernel module metadata */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
MODULE_LICENSE("GPL");

/* Register the load/unload entry points with the module loader */
module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);