/*
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 * W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/lustre/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "qswnal.h"

ptl_handle_ni_t         kqswnal_ni;
nal_t                   kqswnal_api;
kqswnal_data_t          kqswnal_data;

kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
        kprni_notify:   NULL,                   /* we're connectionless */
};

#if CONFIG_SYSCTL
#define QSWNAL_SYSCTL  201

#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
#define QSWNAL_SYSCTL_COPY_SMALL_FWD     2

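/* Runtime tunables.  Registering kqswnal_top_ctl_table below should place
 * these under /proc/sys/qswnal/ (assuming the standard sysctl-to-proc
 * mapping), e.g.
 *   echo 1 > /proc/sys/qswnal/optimized_gets */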
static ctl_table kqswnal_ctl_table[] = {
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
         &kqswnal_data.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
        {QSWNAL_SYSCTL_COPY_SMALL_FWD, "copy_small_fwd",
         &kqswnal_data.kqn_copy_small_fwd, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}
};

static ctl_table kqswnal_top_ctl_table[] = {
        {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
        {0}
};
#endif

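/* Thin glue between the NAL API handle and the portals library: each entry
 * point below sanity-checks that it was handed our single nal/data/lib
 * instance, then calls through to the library. */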
static int
kqswnal_forward(nal_t   *nal,
                int     id,
                void    *args,  size_t args_len,
                void    *ret,   size_t ret_len)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
        return (PTL_OK);
}

static void
kqswnal_lock (nal_t *nal, unsigned long *flags)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        nal_cb->cb_cli(nal_cb,flags);
}

static void
kqswnal_unlock(nal_t *nal, unsigned long *flags)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        nal_cb->cb_sti(nal_cb,flags);
}

static int
kqswnal_shutdown(nal_t *nal, int ni)
{
        CDEBUG (D_NET, "shutdown\n");

        LASSERT (nal == &kqswnal_api);
        return (0);
}

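/* Yield the CPU.  milliseconds == 0: reschedule only if the kernel has
 * flagged need_resched; milliseconds < 0: block on kqn_yield_waitq until
 * woken; milliseconds > 0: sleep for at most that long, returning how much
 * of the timeout remains. */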
static int
kqswnal_yield(nal_t *nal, unsigned long *flags, int milliseconds)
{
        /* NB called holding statelock */
        wait_queue_t       wait;
        unsigned long      now = jiffies;

        CDEBUG (D_NET, "yield\n");

        if (milliseconds == 0) {
                if (current->need_resched)
                        schedule();
                return 0;
        }

        init_waitqueue_entry(&wait, current);
        set_current_state(TASK_INTERRUPTIBLE);
        add_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);

        kqswnal_unlock(nal, flags);

        if (milliseconds < 0)
                schedule ();
        else
                schedule_timeout((milliseconds * HZ) / 1000);

        kqswnal_lock(nal, flags);

        remove_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);

        if (milliseconds > 0) {
                milliseconds -= ((jiffies - now) * 1000) / HZ;
                if (milliseconds < 0)
                        milliseconds = 0;
        }

        return (milliseconds);
}

static nal_t *
kqswnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
             ptl_pid_t requested_pid)
{
        ptl_nid_t mynid = kqswnal_elanid2nid (kqswnal_data.kqn_elanid);
        int       nnids = kqswnal_data.kqn_nnodes;

        CDEBUG(D_NET, "calling lib_init with nid "LPX64" of %d\n", mynid, nnids);

        lib_init(&kqswnal_lib, mynid, 0, nnids, ptl_size, ac_size);

        return (&kqswnal_api);
}

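/* Debug support: copy a snapshot of the pcfg_count'th active transmit
 * descriptor into *pcfg; returns -ENOENT if there aren't that many active
 * descriptors. */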
int
kqswnal_get_tx_desc (struct portals_cfg *pcfg)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        int                index = pcfg->pcfg_count;
        int                rc = -ENOENT;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);

                pcfg->pcfg_pbuf1 = (char *)ktx;
                pcfg->pcfg_count = NTOH__u32(ktx->ktx_wire_hdr->type);
                pcfg->pcfg_size  = NTOH__u32(ktx->ktx_wire_hdr->payload_length);
                pcfg->pcfg_nid   = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
                pcfg->pcfg_nid2  = ktx->ktx_nid;
                pcfg->pcfg_misc  = ktx->ktx_launcher;
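                /* pcfg_flags packs: bit 0 = queued on the delayed list,
                 * bit 1 = non-blocking descriptor, bits 2+ = ktx_state */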
                pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
                                  (!ktx->ktx_isnblk                    ? 0 : 2) |
                                  (ktx->ktx_state << 2);
                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}

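/* NAL-specific command dispatcher; hooked up via kportal_nal_register()
 * in kqswnal_initialise() below. */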
int
kqswnal_cmd (struct portals_cfg *pcfg, void *private)
{
        LASSERT (pcfg != NULL);

        switch (pcfg->pcfg_command) {
        case NAL_CMD_GET_TXDESC:
                return (kqswnal_get_tx_desc (pcfg));

        case NAL_CMD_REGISTER_MYNID:
                CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
                        kqswnal_data.kqn_nid_offset);
                kqswnal_data.kqn_nid_offset =
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
                kqswnal_lib.ni.nid = pcfg->pcfg_nid;
                return (0);

        default:
                return (-EINVAL);
        }
}

void __exit
kqswnal_finalise (void)
{
        unsigned long flags;
        int           do_ptl_fini = 0;

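        /* Unwind in the reverse order of startup; each KQN_INIT_* case
         * deliberately falls through to undo everything reached before it. */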
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
#if CONFIG_SYSCTL
                if (kqswnal_data.kqn_sysctl != NULL)
                        unregister_sysctl_table (kqswnal_data.kqn_sysctl);
#endif
                PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
                kportal_nal_unregister(QSWNAL);
                /* fall through */

        case KQN_INIT_PTL:
                do_ptl_fini = 1;
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;
        }

        /**********************************************************************/
        /* Tell router we're shutting down.  Any router calls my threads
         * make will now fail immediately and the router will stop calling
         * into me. */
        kpr_shutdown (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* Signal the start of shutdown... */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        wake_up_all(&kqswnal_data.kqn_idletxd_waitq);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */
#if MULTIRAIL_EKC
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lib_parse() or kpr_fwd_start(). */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
        /* "Old" EKC just pretends to shutdown cleanly but actually
         * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        /* wait for transmits to complete */
        while (!list_empty(&kqswnal_data.kqn_activetxds)) {
                CWARN("waiting for active transmits to complete\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(HZ);
        }

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

#if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
#endif

        /**********************************************************************/
        /* Complete any blocked forwarding packets, with error
         */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
        }

        /**********************************************************************/
        /* finalise router and portals lib */

        kpr_deregister (&kqswnal_data.kqn_router);

        if (do_ptl_fini) {
                PtlNIFini (kqswnal_ni);
                lib_fini (&kqswnal_lib);
        }

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

#if MULTIRAIL_EKC
        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() gets fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        if (kqswnal_data.kqn_txds != NULL) {
                int  i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++) {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        /* If ktx has a buffer, it got mapped; unmap now.
                         * NB only the pre-mapped stuff is still mapped
                         * since all tx descs must be idle */

                        if (ktx->ktx_buffer != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_tx_nmh,
                                               &ktx->ktx_ebuffer);
                }
        }

        if (kqswnal_data.kqn_rxds != NULL) {
                int   i;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        /* If krx_kiov[0].kiov_page got allocated, it got mapped.
                         * NB subsequent pages get merged */

                        if (krx->krx_kiov[0].kiov_page != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_rx_nmh,
                                               &krx->krx_elanbuffer);
                }
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
#endif

        if (kqswnal_data.kqn_txds != NULL)
        {
                int   i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
                {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        if (ktx->ktx_buffer != NULL)
                                PORTAL_FREE(ktx->ktx_buffer,
                                            KQSW_TX_BUFFER_SIZE);
                }

                PORTAL_FREE(kqswnal_data.kqn_txds,
                            sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
                                                     KQSW_NNBLK_TXMSGS));
        }

        if (kqswnal_data.kqn_rxds != NULL)
        {
                int   i;
                int   j;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
                {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        for (j = 0; j < krx->krx_npages; j++)
                                if (krx->krx_kiov[j].kiov_page != NULL)
                                        __free_page (krx->krx_kiov[j].kiov_page);
                }

                PORTAL_FREE(kqswnal_data.kqn_rxds,
                            sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
                                                    KQSW_NRXMSGS_LARGE));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}

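/* Bring the NAL up.  kqn_init records how far startup has progressed, so
 * kqswnal_finalise() can unwind a partially-completed startup after any of
 * the failure paths below. */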
static int __init
kqswnal_initialise (void)
{
#if MULTIRAIL_EKC
        EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
#else
        ELAN3_DMA_REQUEST dmareq;
#endif
        int               rc;
        int               i;
        int               elan_page_idx;
        int               pkmem = atomic_read(&portal_kmemory);

        LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);

        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));

        kqswnal_api.forward  = kqswnal_forward;
        kqswnal_api.shutdown = kqswnal_shutdown;
        kqswnal_api.yield    = kqswnal_yield;
        kqswnal_api.validate = NULL;            /* our api validate is a NOOP */
        kqswnal_api.lock     = kqswnal_lock;
        kqswnal_api.unlock   = kqswnal_unlock;
        kqswnal_api.nal_data = &kqswnal_data;

        kqswnal_lib.nal_data = &kqswnal_data;

        memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
        memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
#if MULTIRAIL_EKC
        kqswnal_rpc_failed.Data[0] = -ECONNREFUSED;
#else
        kqswnal_rpc_failed.Status = -ECONNREFUSED;
#endif
        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));

        kqswnal_data.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
        kqswnal_data.kqn_copy_small_fwd = KQSW_COPY_SMALL_FWD;

        kqswnal_data.kqn_cb = &kqswnal_lib;

        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
        spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
        init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);

        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

        spin_lock_init (&kqswnal_data.kqn_statelock);
        init_waitqueue_head (&kqswnal_data.kqn_yield_waitq);

        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;

#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
                return (-ENODEV);
        }

        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
                kqswnal_finalise();
                return (-ENODEV);
        }
#else
        /**********************************************************************/
        /* Find the first Elan device */

        kqswnal_data.kqn_ep = ep_device (0);
        if (kqswnal_data.kqn_ep == NULL)
        {
                CERROR ("Can't get elan device 0\n");
                return (-ENODEV);
        }
#endif

        kqswnal_data.kqn_nid_offset = 0;
        kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
        kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);

        /**********************************************************************/
        /* Get the transmitter */

        kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Get the receivers */

        kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
                                                     EP_MSG_SVC_PORTALS_SMALL,
                                                     KQSW_EP_ENVELOPES_SMALL);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
                                                     EP_MSG_SVC_PORTALS_LARGE,
                                                     KQSW_EP_ENVELOPES_LARGE);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for transmit descriptors NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */
#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep_tx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
                kqswnal_finalise();
                return (-ENOMEM);
        }
#else
        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
                              KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                              &dmareq, &kqswnal_data.kqn_eptxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve tx dma space\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }
#endif
        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep_rx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
                kqswnal_finalise();
                return (-ENOMEM);
        }
#else
        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                &dmareq, &kqswnal_data.kqn_eprxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }
#endif
        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */

        PORTAL_ALLOC(kqswnal_data.kqn_txds,
                     sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        if (kqswnal_data.kqn_txds == NULL)
        {
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /* clear flags, null pointers etc */
        memset(kqswnal_data.kqn_txds, 0,
               sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
        {
                int           premapped_pages;
                kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
                int           basepage = i * KQSW_NTXMSGPAGES;

                PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
                        kqswnal_finalise ();
                        return (-ENOMEM);
                }

                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &all_rails, &ktx->ktx_ebuffer);
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                                       basepage, &ktx->ktx_ebuffer);
#endif
                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

                INIT_LIST_HEAD (&ktx->ktx_delayed_list);

                ktx->ktx_state = KTX_IDLE;
                ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
                list_add_tail (&ktx->ktx_list,
                               ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
                                                 &kqswnal_data.kqn_idletxds);
        }

        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */

        PORTAL_ALLOC (kqswnal_data.kqn_rxds,
                      sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
        if (kqswnal_data.kqn_rxds == NULL)
        {
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
               sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));

        elan_page_idx = 0;
        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
#if MULTIRAIL_EKC
                EP_NMD        elanbuffer;
#else
                E3_Addr       elanbuffer;
#endif
                int           j;
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                if (i < KQSW_NRXMSGS_SMALL)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
                }
                else
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
                }

                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
                        struct page *page = alloc_page(GFP_KERNEL);

                        if (page == NULL) {
                                kqswnal_finalise ();
                                return (-ENOMEM);
                        }

                        krx->krx_kiov[j].kiov_page = page;
                        LASSERT(page_address(page) != NULL);

#if MULTIRAIL_EKC
                        ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                                     page_address(page),
                                     PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
                                     elan_page_idx, &all_rails, &elanbuffer);

                        if (j == 0) {
                                krx->krx_elanbuffer = elanbuffer;
                        } else {
                                rc = ep_nmd_merge(&krx->krx_elanbuffer,
                                                  &krx->krx_elanbuffer,
                                                  &elanbuffer);
                                /* NB contiguous mapping */
                                LASSERT(rc);
                        }
#else
                        elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
                                              kqswnal_data.kqn_eprxdmahandle,
                                              page_address(page),
                                              PAGE_SIZE, elan_page_idx,
                                              &elanbuffer);
                        if (j == 0)
                                krx->krx_elanbuffer = elanbuffer;

                        /* NB contiguous mapping */
                        LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
#endif
                        elan_page_idx++;

                }
        }
        LASSERT (elan_page_idx ==
                 (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
                 (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));

        /**********************************************************************/
        /* Network interface ready to initialise */

        rc = PtlNIInit(kqswnal_init, 32, 4, 0, &kqswnal_ni);
        if (rc != 0)
        {
                CERROR ("PtlNIInit failed %d\n", rc);
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        kqswnal_data.kqn_init = KQN_INIT_PTL;

        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */

        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                /* NB this enqueue can allocate/sleep (attr == 0) */
#if MULTIRAIL_EKC
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      &krx->krx_elanbuffer, 0);
#else
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
#endif
                if (rc != EP_SUCCESS)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_finalise ();
                        return (-ENOMEM);
                }
        }

        /**********************************************************************/
        /* Spawn scheduling threads */
        for (i = 0; i < smp_num_cpus; i++)
        {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
                        kqswnal_finalise ();
                        return (rc);
                }
        }

        /**********************************************************************/
        /* Connect to the router */
        rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
        if (rc != 0)
                CDEBUG(D_NET, "Can't initialise routing interface "
                       "(rc = %d): not routing\n", rc);

        rc = kportal_nal_register (QSWNAL, &kqswnal_cmd, NULL);
        if (rc != 0) {
                CERROR ("Can't initialise command interface (rc = %d)\n", rc);
                kqswnal_finalise ();
                return (rc);
        }

#if CONFIG_SYSCTL
        /* Press on regardless even if registering sysctl doesn't work */
        kqswnal_data.kqn_sysctl = register_sysctl_table (kqswnal_top_ctl_table, 0);
#endif

        PORTAL_SYMBOL_REGISTER(kqswnal_ni);
        kqswnal_data.kqn_init = KQN_INIT_ALL;

        printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
               "(Routing %s, initial mem %d)\n",
               kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
               kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
               pkmem);

        return (0);
}


MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);

EXPORT_SYMBOL (kqswnal_ni);