/*
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 * W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/lustre/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "qswnal.h"

ptl_handle_ni_t         kqswnal_ni;
nal_t                   kqswnal_api;
kqswnal_data_t          kqswnal_data;

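/* Callbacks registered with the portals router.  NB no notify callback:
 * Elan is connectionless, so there are no peer connect/disconnect events
 * to report. */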
kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
        kprni_notify:   NULL,                   /* we're connectionless */
};

#if CONFIG_SYSCTL
#define QSWNAL_SYSCTL  201

#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
#define QSWNAL_SYSCTL_COPY_SMALL_FWD     2

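/* Runtime tunables, exposed as writable integers via sysctl (presumably
 * /proc/sys/qswnal/optimized_gets and /proc/sys/qswnal/copy_small_fwd). */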
static ctl_table kqswnal_ctl_table[] = {
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
         &kqswnal_data.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
        {QSWNAL_SYSCTL_COPY_SMALL_FWD, "copy_small_fwd",
         &kqswnal_data.kqn_copy_small_fwd, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}
};

static ctl_table kqswnal_top_ctl_table[] = {
        {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
        {0}
};
#endif

static int
kqswnal_forward(nal_t   *nal,
                int     id,
                void    *args,  size_t args_len,
                void    *ret,   size_t ret_len)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
        return (PTL_OK);
}

static void
kqswnal_lock (nal_t *nal, unsigned long *flags)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        nal_cb->cb_cli(nal_cb,flags);
}

static void
kqswnal_unlock(nal_t *nal, unsigned long *flags)
{
        kqswnal_data_t *k = nal->nal_data;
        nal_cb_t       *nal_cb = k->kqn_cb;

        LASSERT (nal == &kqswnal_api);
        LASSERT (k == &kqswnal_data);
        LASSERT (nal_cb == &kqswnal_lib);

        nal_cb->cb_sti(nal_cb,flags);
}

static int
kqswnal_shutdown(nal_t *nal, int ni)
{
        CDEBUG (D_NET, "shutdown\n");

        LASSERT (nal == &kqswnal_api);
        return (0);
}

static void
kqswnal_yield( nal_t *nal )
{
        CDEBUG (D_NET, "yield\n");

        if (current->need_resched)
                schedule();
        return;
}

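/* Start the portals library.  NB the local NID comes from the Elan node
 * id: kqswnal_elanid2nid() presumably just adds kqn_nid_offset, which is
 * zero until NAL_CMD_REGISTER_MYNID sets it. */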
static nal_t *
kqswnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
             ptl_pid_t requested_pid)
{
        ptl_nid_t mynid = kqswnal_elanid2nid (kqswnal_data.kqn_elanid);
        int       nnids = kqswnal_data.kqn_nnodes;

        CDEBUG(D_NET, "calling lib_init with nid "LPX64" of %d\n", mynid, nnids);

        lib_init(&kqswnal_lib, mynid, 0, nnids, ptl_size, ac_size);

        return (&kqswnal_api);
}

int
kqswnal_get_tx_desc (struct portals_cfg *pcfg)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        int                index = pcfg->pcfg_count;
        int                rc = -ENOENT;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);

                pcfg->pcfg_pbuf1 = (char *)ktx;
                pcfg->pcfg_count = NTOH__u32(ktx->ktx_wire_hdr->type);
                pcfg->pcfg_size  = NTOH__u32(ktx->ktx_wire_hdr->payload_length);
                pcfg->pcfg_nid   = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
                pcfg->pcfg_nid2  = ktx->ktx_nid;
                pcfg->pcfg_misc  = ktx->ktx_launcher;
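                /* Flag encoding: bit 0 = tx is on the delayed list, bit 1 =
                 * descriptor came from the non-blocking pool, and the
                 * remaining bits carry ktx_state. */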
                pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
                                   (!ktx->ktx_isnblk                    ? 0 : 2) |
                                   (ktx->ktx_state << 2);
                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}

int
kqswnal_cmd (struct portals_cfg *pcfg, void *private)
{
        LASSERT (pcfg != NULL);

        switch (pcfg->pcfg_command) {
        case NAL_CMD_GET_TXDESC:
                return (kqswnal_get_tx_desc (pcfg));

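        /* Only the offset is recorded, so every peer's NID is its Elan
         * node id plus this same offset. */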
        case NAL_CMD_REGISTER_MYNID:
                CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
                        kqswnal_data.kqn_nid_offset);
                kqswnal_data.kqn_nid_offset =
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
                kqswnal_lib.ni.nid = pcfg->pcfg_nid;
                return (0);

        default:
                return (-EINVAL);
        }
}

void __exit
kqswnal_finalise (void)
{
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
#if CONFIG_SYSCTL
                if (kqswnal_data.kqn_sysctl != NULL)
                        unregister_sysctl_table (kqswnal_data.kqn_sysctl);
#endif
                PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
                kportal_nal_unregister(QSWNAL);
                /* fall through */

        case KQN_INIT_PTL:
                PtlNIFini (kqswnal_ni);
                lib_fini (&kqswnal_lib);
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;
        }

        /**********************************************************************/
        /* Stop the router calling me and fail any further call-ins */
        kpr_shutdown (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* flag threads we've started to terminate and wait for all to ack */

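        /* Two-phase shutdown: kqn_shuttingdown == 1 makes the threads stop
         * taking new work (kqn_nthreads_running falls to zero) while comms
         * stay up; setting it to 2 below, once comms are closed, tells
         * them to exit for good. */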
        kqswnal_data.kqn_shuttingdown = 1;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
                       atomic_read (&kqswnal_data.kqn_nthreads_running));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */
#if MULTIRAIL_EKC
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* freeing the xmtr completes all txs pdq */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        /* wait for transmits to complete */
        while (!list_empty(&kqswnal_data.kqn_activetxds)) {
                CWARN("waiting for active transmits to complete\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(HZ);
        }

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */

        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

#if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
#endif

        /**********************************************************************/
        /* Complete any blocked forwarding packets with error
         */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }

        while (!list_empty (&kqswnal_data.kqn_delayedfwds))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
        }

        /**********************************************************************/
        /* Wait for router to complete any packets I sent her
         */

        kpr_deregister (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

#if MULTIRAIL_EKC
        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() gets fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        if (kqswnal_data.kqn_txds != NULL) {
                int  i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++) {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        /* If ktx has a buffer, it got mapped; unmap now.
                         * NB only the pre-mapped stuff is still mapped
                         * since all tx descs must be idle */

                        if (ktx->ktx_buffer != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_tx_nmh,
                                               &ktx->ktx_ebuffer);
                }
        }

        if (kqswnal_data.kqn_rxds != NULL) {
                int   i;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        /* If krx_kiov[0].kiov_page got allocated, it got mapped.
                         * NB subsequent pages get merged */

                        if (krx->krx_kiov[0].kiov_page != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_rx_nmh,
                                               &krx->krx_elanbuffer);
                }
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
#endif

        if (kqswnal_data.kqn_txds != NULL)
        {
                int   i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
                {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        if (ktx->ktx_buffer != NULL)
                                PORTAL_FREE(ktx->ktx_buffer,
                                            KQSW_TX_BUFFER_SIZE);
                }

                PORTAL_FREE(kqswnal_data.kqn_txds,
                            sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
                                                     KQSW_NNBLK_TXMSGS));
        }

        if (kqswnal_data.kqn_rxds != NULL)
        {
                int   i;
                int   j;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
                {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        for (j = 0; j < krx->krx_npages; j++)
                                if (krx->krx_kiov[j].kiov_page != NULL)
                                        __free_page (krx->krx_kiov[j].kiov_page);
                }

                PORTAL_FREE(kqswnal_data.kqn_rxds,
                            sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
                                                    KQSW_NRXMSGS_LARGE));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}

static int __init
kqswnal_initialise (void)
{
#if MULTIRAIL_EKC
        EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
#else
        ELAN3_DMA_REQUEST dmareq;
#endif
        int               rc;
        int               i;
        int               elan_page_idx;
        int               pkmem = atomic_read(&portal_kmemory);

        LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);

        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));

        kqswnal_api.forward  = kqswnal_forward;
        kqswnal_api.shutdown = kqswnal_shutdown;
        kqswnal_api.yield    = kqswnal_yield;
        kqswnal_api.validate = NULL;            /* our api validate is a NOOP */
        kqswnal_api.lock     = kqswnal_lock;
        kqswnal_api.unlock   = kqswnal_unlock;
        kqswnal_api.nal_data = &kqswnal_data;

        kqswnal_lib.nal_data = &kqswnal_data;

        memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
        memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
#if MULTIRAIL_EKC
        kqswnal_rpc_failed.Data[0] = -ECONNREFUSED;
#else
        kqswnal_rpc_failed.Status = -ECONNREFUSED;
#endif
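        /* Canned status blocks for RPC replies (presumably the optimized
         * GET path): success is all zeroes; failure carries -ECONNREFUSED
         * in the field appropriate to this EKC generation. */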
        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));

        kqswnal_data.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
        kqswnal_data.kqn_copy_small_fwd = KQSW_COPY_SMALL_FWD;

        kqswnal_data.kqn_cb = &kqswnal_lib;

        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
        spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
        init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);

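        /* Work queues serviced by the scheduler threads spawned below;
         * kqn_sched_waitq is where those threads sleep when all three are
         * empty. */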
        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

        spin_lock_init (&kqswnal_data.kqn_statelock);

        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;

#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
                return (-ENODEV);
        }

        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
                kqswnal_finalise();
                return (-ENODEV);
        }
#else
        /**********************************************************************/
        /* Find the first Elan device */

        kqswnal_data.kqn_ep = ep_device (0);
        if (kqswnal_data.kqn_ep == NULL)
        {
                CERROR ("Can't get elan device 0\n");
                return (-ENODEV);
        }
#endif

        kqswnal_data.kqn_nid_offset = 0;
        kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
        kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);

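        /* NB with a zero NID offset, NIDs are simply Elan node ids in
         * [0, kqn_nnodes). */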
        /**********************************************************************/
        /* Get the transmitter */

        kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Get the receivers */

        kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
                                                     EP_MSG_SVC_PORTALS_SMALL,
                                                     KQSW_EP_ENVELOPES_SMALL);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
                                                     EP_MSG_SVC_PORTALS_LARGE,
                                                     KQSW_EP_ENVELOPES_LARGE);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for transmit descriptors NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */
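        /* Each descriptor owns a KQSW_NTXMSGPAGES-page slice of this
         * region: the first pages map its pre-allocated ktx_buffer (done
         * once, below), the remainder are left free for mapping message
         * payload at send time. */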
#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep_tx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
                kqswnal_finalise();
                return (-ENOMEM);
        }
#else
        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
                              KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                              &dmareq, &kqswnal_data.kqn_eptxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve tx dma space\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }
#endif
        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep_rx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
                kqswnal_finalise();
                return (-ENOMEM);
        }
#else
        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                &dmareq, &kqswnal_data.kqn_eprxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
                kqswnal_finalise ();
                return (-ENOMEM);
        }
#endif
        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */

        PORTAL_ALLOC(kqswnal_data.kqn_txds,
                     sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        if (kqswnal_data.kqn_txds == NULL)
        {
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        /* clear flags, null pointers etc */
        memset(kqswnal_data.kqn_txds, 0,
               sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
        {
                int           premapped_pages;
                kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
                int           basepage = i * KQSW_NTXMSGPAGES;

                PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
                        kqswnal_finalise ();
                        return (-ENOMEM);
                }

                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &all_rails, &ktx->ktx_ebuffer);
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                                       basepage, &ktx->ktx_ebuffer);
#endif
                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

                INIT_LIST_HEAD (&ktx->ktx_delayed_list);

                ktx->ktx_state = KTX_IDLE;
                ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
                list_add_tail (&ktx->ktx_list,
                               ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
                                                 &kqswnal_data.kqn_idletxds);
        }

        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */

        PORTAL_ALLOC (kqswnal_data.kqn_rxds,
                      sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
        if (kqswnal_data.kqn_rxds == NULL)
        {
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
               sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));

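        /* All receive buffers map into one contiguous Elan region:
         * elan_page_idx walks it page by page, small-message buffers
         * first, then large. */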
        elan_page_idx = 0;
        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
#if MULTIRAIL_EKC
                EP_NMD        elanbuffer;
#else
                E3_Addr       elanbuffer;
#endif
                int           j;
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                if (i < KQSW_NRXMSGS_SMALL)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
                }
                else
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
                }

                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
                        struct page *page = alloc_page(GFP_KERNEL);

                        if (page == NULL) {
                                kqswnal_finalise ();
                                return (-ENOMEM);
                        }

                        krx->krx_kiov[j].kiov_page = page;
                        LASSERT(page_address(page) != NULL);

#if MULTIRAIL_EKC
                        ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                                     page_address(page),
                                     PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
                                     elan_page_idx, &all_rails, &elanbuffer);

                        if (j == 0) {
                                krx->krx_elanbuffer = elanbuffer;
                        } else {
                                rc = ep_nmd_merge(&krx->krx_elanbuffer,
                                                  &krx->krx_elanbuffer,
                                                  &elanbuffer);
                                /* NB contiguous mapping */
                                LASSERT(rc);
                        }
#else
                        elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
                                              kqswnal_data.kqn_eprxdmahandle,
                                              page_address(page),
                                              PAGE_SIZE, elan_page_idx,
                                              &elanbuffer);
                        if (j == 0)
                                krx->krx_elanbuffer = elanbuffer;

                        /* NB contiguous mapping */
                        LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
#endif
                        elan_page_idx++;
                }
        }
        LASSERT (elan_page_idx ==
                 (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
                 (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));

        /**********************************************************************/
        /* Network interface ready to initialise */

        rc = PtlNIInit(kqswnal_init, 32, 4, 0, &kqswnal_ni);
        if (rc != 0)
        {
                CERROR ("PtlNIInit failed %d\n", rc);
                kqswnal_finalise ();
                return (-ENOMEM);
        }

        kqswnal_data.kqn_init = KQN_INIT_PTL;

        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */

        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                /* NB this enqueue can allocate/sleep (attr == 0) */
#if MULTIRAIL_EKC
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      &krx->krx_elanbuffer, 0);
#else
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
#endif
                if (rc != EP_SUCCESS)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_finalise ();
                        return (-ENOMEM);
                }
        }

        /**********************************************************************/
        /* Spawn scheduling threads */
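        /* One scheduler thread per CPU; each presumably services the
         * ready-rx, delayed-tx and delayed-fwd queues set up above and
         * exits once kqn_shuttingdown is set to 2. */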
        for (i = 0; i < smp_num_cpus; i++)
        {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
                        kqswnal_finalise ();
                        return (rc);
                }
        }

        /**********************************************************************/
        /* Connect to the router */
        rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
        if (rc != 0)
                CDEBUG(D_NET, "Can't initialise routing interface "
                       "(rc = %d): not routing\n", rc);

        rc = kportal_nal_register (QSWNAL, &kqswnal_cmd, NULL);
        if (rc != 0) {
                CERROR ("Can't initialise command interface (rc = %d)\n", rc);
                kqswnal_finalise ();
                return (rc);
        }

#if CONFIG_SYSCTL
        /* Press on regardless even if registering sysctl doesn't work */
        kqswnal_data.kqn_sysctl = register_sysctl_table (kqswnal_top_ctl_table, 0);
#endif

        PORTAL_SYMBOL_REGISTER(kqswnal_ni);
        kqswnal_data.kqn_init = KQN_INIT_ALL;

        printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
               "(Routing %s, initial mem %d)\n",
               kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
               kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
               pkmem);

        return (0);
}


MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);

EXPORT_SYMBOL (kqswnal_ni);