/* Imported from fs/lustre-release.git (Whamcloud gitweb):
 *   lustre/portals/knals/qswnal/qswnal.c
 *   "merge 2 weeks of b1_4 fixes onto HEAD" */
1 /*
2  * Copyright (C) 2002 Cluster File Systems, Inc.
3  *   Author: Eric Barton <eric@bartonsoftware.com>
4  *
5  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
6  * W. Marcus Miller - Based on ksocknal
7  *
8  * This file is part of Portals, http://www.sf.net/projects/lustre/
9  *
10  * Portals is free software; you can redistribute it and/or
11  * modify it under the terms of version 2 of the GNU General Public
12  * License as published by the Free Software Foundation.
13  *
14  * Portals is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with Portals; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  */
24
25 #include "qswnal.h"
26
/* Module-wide NAL state, referenced throughout the qswnal sources:
 * the portals API object, the NAL's private data, the network-interface
 * handle and the runtime tunables. */
nal_t                   kqswnal_api;
kqswnal_data_t          kqswnal_data;
ptl_handle_ni_t         kqswnal_ni;
kqswnal_tunables_t      kqswnal_tunables;

/* Callbacks handed to the portals router when we register (see
 * kpr_register() in kqswnal_startup).  Only packet forwarding is
 * provided; there is no peer-notify hook. */
kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
        kprni_notify:   NULL,                   /* we're connectionless */
};
38
#if CONFIG_SYSCTL
/* Binary sysctl ids.  Each entry in a ctl_table needs a UNIQUE non-zero
 * id within its directory; the original table reused
 * QSWNAL_SYSCTL_OPTIMIZED_GETS (1) for both entries, making them
 * indistinguishable via sys_sysctl(2). */
#define QSWNAL_SYSCTL  201

#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
#define QSWNAL_SYSCTL_COPY_SMALL_FWD     2
#define QSWNAL_SYSCTL_OPTIMIZED_PUTS     3

/* /proc/sys/qswnal/{optimized_puts,optimized_gets}: integer knobs,
 * world-readable, root-writable, handled by proc_dointvec. */
static ctl_table kqswnal_ctl_table[] = {
        {QSWNAL_SYSCTL_OPTIMIZED_PUTS, "optimized_puts",
         &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
         0644, NULL, &proc_dointvec},
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
         &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}
};

/* Top-level directory entry: /proc/sys/qswnal */
static ctl_table kqswnal_top_ctl_table[] = {
        {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
        {0}
};
#endif
60
61 int
62 kqswnal_get_tx_desc (struct portals_cfg *pcfg)
63 {
64         unsigned long      flags;
65         struct list_head  *tmp;
66         kqswnal_tx_t      *ktx;
67         int                index = pcfg->pcfg_count;
68         int                rc = -ENOENT;
69
70         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
71
72         list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
73                 if (index-- != 0)
74                         continue;
75
76                 ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
77
78                 pcfg->pcfg_pbuf1 = (char *)ktx;
79                 pcfg->pcfg_count = NTOH__u32(ktx->ktx_wire_hdr->type);
80                 pcfg->pcfg_size  = NTOH__u32(ktx->ktx_wire_hdr->payload_length);
81                 pcfg->pcfg_nid   = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
82                 pcfg->pcfg_nid2  = ktx->ktx_nid;
83                 pcfg->pcfg_misc  = ktx->ktx_launcher;
84                 pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
85                                   (!ktx->ktx_isnblk                    ? 0 : 2) |
86                                   (ktx->ktx_state << 2);
87                 rc = 0;
88                 break;
89         }
90         
91         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
92         return (rc);
93 }
94
95 int
96 kqswnal_cmd (struct portals_cfg *pcfg, void *private)
97 {
98         LASSERT (pcfg != NULL);
99         
100         switch (pcfg->pcfg_command) {
101         case NAL_CMD_GET_TXDESC:
102                 return (kqswnal_get_tx_desc (pcfg));
103
104         case NAL_CMD_REGISTER_MYNID:
105                 CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
106                         pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
107                         kqswnal_data.kqn_nid_offset);
108                 kqswnal_data.kqn_nid_offset =
109                         pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
110                 kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
111                 return (0);
112                 
113         default:
114                 return (-EINVAL);
115         }
116 }
117
/* Tear the NAL down, releasing only what kqn_init says was set up.
 * Called both as the nal_ni_fini callback and from startup error paths,
 * so it must cope with partial initialisation.  Teardown order matters:
 * router first, then in-flight sends, then elan comms, then threads,
 * then buffers/descriptors. */
static void
kqswnal_shutdown(nal_t *nal)
{
        unsigned long flags;
        int           do_lib_fini = 0;

        /* NB The first ref was this module! */
        if (nal->nal_refct != 0) {
                PORTAL_MODULE_UNUSE;
                return;
        }

        CDEBUG (D_NET, "shutdown\n");
        LASSERT (nal == &kqswnal_api);

        /* Cases deliberately cascade: each init level undoes its own
         * work then falls through to the next lower level. */
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
                libcfs_nal_cmd_unregister(QSWNAL);
                /* fall through */

        case KQN_INIT_LIB:
                do_lib_fini = 1;
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;
        }

        /**********************************************************************/
        /* Tell router we're shutting down.  Any router calls my threads
         * make will now fail immediately and the router will stop calling
         * into me. */
        kpr_shutdown (&kqswnal_data.kqn_router);
        
        /**********************************************************************/
        /* Signal the start of shutdown... */
        /* kqn_shuttingdown == 1: refuse new tx descriptor allocations
         * (flag is guarded by kqn_idletxd_lock) */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        wake_up_all(&kqswnal_data.kqn_idletxd_waitq);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */
#if MULTIRAIL_EKC
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lib_parse() or kpr_fwd_start(). */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
        /* "Old" EKC just pretends to shutdown cleanly but actually
         * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        /* wait for transmits to complete */
        while (!list_empty(&kqswnal_data.kqn_activetxds)) {
                CWARN("waiting for active transmits to complete\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(HZ);
        }

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        /* kqn_shuttingdown == 2: scheduler threads exit their loops */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

#if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
#endif

        /**********************************************************************/
        /* Complete any blocked forwarding packets, with error
         */

        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
        }

        /**********************************************************************/
        /* finalise router and portals lib */

        kpr_deregister (&kqswnal_data.kqn_router);

        if (do_lib_fini)
                lib_fini (&kqswnal_lib);

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

#if MULTIRAIL_EKC
        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() get fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        if (kqswnal_data.kqn_txds != NULL) {
                int  i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++) {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        /* If ktx has a buffer, it got mapped; unmap now.
                         * NB only the pre-mapped stuff is still mapped
                         * since all tx descs must be idle */

                        if (ktx->ktx_buffer != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_tx_nmh,
                                               &ktx->ktx_ebuffer);
                }
        }

        if (kqswnal_data.kqn_rxds != NULL) {
                int   i;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        /* If krx_kiov[0].kiov_page got allocated, it got mapped.  
                         * NB subsequent pages get merged */

                        if (krx->krx_kiov[0].kiov_page != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_rx_nmh,
                                               &krx->krx_elanbuffer);
                }
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
#endif

        /* Free the tx buffers and descriptors (all unmapped by now) */
        if (kqswnal_data.kqn_txds != NULL)
        {
                int   i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
                {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        if (ktx->ktx_buffer != NULL)
                                PORTAL_FREE(ktx->ktx_buffer,
                                            KQSW_TX_BUFFER_SIZE);
                }

                PORTAL_FREE(kqswnal_data.kqn_txds,
                            sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
                                                     KQSW_NNBLK_TXMSGS));
        }

        /* Free the rx pages and descriptors */
        if (kqswnal_data.kqn_rxds != NULL)
        {
                int   i;
                int   j;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
                {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        for (j = 0; j < krx->krx_npages; j++)
                                if (krx->krx_kiov[j].kiov_page != NULL)
                                        __free_page (krx->krx_kiov[j].kiov_page);
                }

                PORTAL_FREE(kqswnal_data.kqn_rxds,
                            sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
                                                    KQSW_NRXMSGS_LARGE));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}
376
377 static int __init
378 kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
379                  ptl_ni_limits_t *requested_limits, 
380                  ptl_ni_limits_t *actual_limits)
381 {
382 #if MULTIRAIL_EKC
383         EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
384 #else
385         ELAN3_DMA_REQUEST dmareq;
386 #endif
387         int               rc;
388         int               i;
389         int               elan_page_idx;
390         ptl_process_id_t  my_process_id;
391         int               pkmem = atomic_read(&portal_kmemory);
392
393         LASSERT (nal == &kqswnal_api);
394
395         if (nal->nal_refct != 0) {
396                 if (actual_limits != NULL)
397                         *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
398                 /* This module got the first ref */
399                 PORTAL_MODULE_USE;
400                 return (PTL_OK);
401         }
402
403         LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
404
405         CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
406
407         /* ensure all pointers NULL etc */
408         memset (&kqswnal_data, 0, sizeof (kqswnal_data));
409
410         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
411         INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
412         INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
413         spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
414         init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
415         INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);
416
417         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
418         INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
419         INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
420
421         spin_lock_init (&kqswnal_data.kqn_sched_lock);
422         init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
423
424         /* Leave kqn_rpc_success zeroed */
425 #if MULTIRAIL_EKC
426         kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
427 #else
428         kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
429 #endif
430
431         /* pointers/lists/locks initialised */
432         kqswnal_data.kqn_init = KQN_INIT_DATA;
433         
434 #if MULTIRAIL_EKC
435         kqswnal_data.kqn_ep = ep_system();
436         if (kqswnal_data.kqn_ep == NULL) {
437                 CERROR("Can't initialise EKC\n");
438                 kqswnal_shutdown(nal);
439                 return (PTL_IFACE_INVALID);
440         }
441
442         if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
443                 CERROR("Can't get elan ID\n");
444                 kqswnal_shutdown(nal);
445                 return (PTL_IFACE_INVALID);
446         }
447 #else
448         /**********************************************************************/
449         /* Find the first Elan device */
450
451         kqswnal_data.kqn_ep = ep_device (0);
452         if (kqswnal_data.kqn_ep == NULL)
453         {
454                 CERROR ("Can't get elan device 0\n");
455                 kqswnal_shutdown(nal);
456                 return (PTL_IFACE_INVALID);
457         }
458 #endif
459
460         kqswnal_data.kqn_nid_offset = 0;
461         kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
462         kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);
463         
464         /**********************************************************************/
465         /* Get the transmitter */
466
467         kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
468         if (kqswnal_data.kqn_eptx == NULL)
469         {
470                 CERROR ("Can't allocate transmitter\n");
471                 kqswnal_shutdown (nal);
472                 return (PTL_NO_SPACE);
473         }
474
475         /**********************************************************************/
476         /* Get the receivers */
477
478         kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
479                                                      EP_MSG_SVC_PORTALS_SMALL,
480                                                      KQSW_EP_ENVELOPES_SMALL);
481         if (kqswnal_data.kqn_eprx_small == NULL)
482         {
483                 CERROR ("Can't install small msg receiver\n");
484                 kqswnal_shutdown (nal);
485                 return (PTL_NO_SPACE);
486         }
487
488         kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
489                                                      EP_MSG_SVC_PORTALS_LARGE,
490                                                      KQSW_EP_ENVELOPES_LARGE);
491         if (kqswnal_data.kqn_eprx_large == NULL)
492         {
493                 CERROR ("Can't install large msg receiver\n");
494                 kqswnal_shutdown (nal);
495                 return (PTL_NO_SPACE);
496         }
497
498         /**********************************************************************/
499         /* Reserve Elan address space for transmit descriptors NB we may
500          * either send the contents of associated buffers immediately, or
501          * map them for the peer to suck/blow... */
502 #if MULTIRAIL_EKC
503         kqswnal_data.kqn_ep_tx_nmh = 
504                 ep_dvma_reserve(kqswnal_data.kqn_ep,
505                                 KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
506                                 EP_PERM_WRITE);
507         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
508                 CERROR("Can't reserve tx dma space\n");
509                 kqswnal_shutdown(nal);
510                 return (PTL_NO_SPACE);
511         }
512 #else
513         dmareq.Waitfn   = DDI_DMA_SLEEP;
514         dmareq.ElanAddr = (E3_Addr) 0;
515         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
516         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
517
518         rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
519                               KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
520                               &dmareq, &kqswnal_data.kqn_eptxdmahandle);
521         if (rc != DDI_SUCCESS)
522         {
523                 CERROR ("Can't reserve rx dma space\n");
524                 kqswnal_shutdown (nal);
525                 return (PTL_NO_SPACE);
526         }
527 #endif
528         /**********************************************************************/
529         /* Reserve Elan address space for receive buffers */
530 #if MULTIRAIL_EKC
531         kqswnal_data.kqn_ep_rx_nmh =
532                 ep_dvma_reserve(kqswnal_data.kqn_ep,
533                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
534                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
535                                 EP_PERM_WRITE);
536         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
537                 CERROR("Can't reserve rx dma space\n");
538                 kqswnal_shutdown(nal);
539                 return (PTL_NO_SPACE);
540         }
541 #else
542         dmareq.Waitfn   = DDI_DMA_SLEEP;
543         dmareq.ElanAddr = (E3_Addr) 0;
544         dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
545         dmareq.Perm     = ELAN_PERM_REMOTEWRITE;
546
547         rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
548                                 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
549                                 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
550                                 &dmareq, &kqswnal_data.kqn_eprxdmahandle);
551         if (rc != DDI_SUCCESS)
552         {
553                 CERROR ("Can't reserve rx dma space\n");
554                 kqswnal_shutdown (nal);
555                 return (PTL_NO_SPACE);
556         }
557 #endif
558         /**********************************************************************/
559         /* Allocate/Initialise transmit descriptors */
560
561         PORTAL_ALLOC(kqswnal_data.kqn_txds,
562                      sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
563         if (kqswnal_data.kqn_txds == NULL)
564         {
565                 kqswnal_shutdown (nal);
566                 return (PTL_NO_SPACE);
567         }
568
569         /* clear flags, null pointers etc */
570         memset(kqswnal_data.kqn_txds, 0,
571                sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
572         for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
573         {
574                 int           premapped_pages;
575                 kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
576                 int           basepage = i * KQSW_NTXMSGPAGES;
577
578                 PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
579                 if (ktx->ktx_buffer == NULL)
580                 {
581                         kqswnal_shutdown (nal);
582                         return (PTL_NO_SPACE);
583                 }
584
585                 /* Map pre-allocated buffer NOW, to save latency on transmit */
586                 premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
587                                                         KQSW_TX_BUFFER_SIZE);
588 #if MULTIRAIL_EKC
589                 ep_dvma_load(kqswnal_data.kqn_ep, NULL, 
590                              ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE, 
591                              kqswnal_data.kqn_ep_tx_nmh, basepage,
592                              &all_rails, &ktx->ktx_ebuffer);
593 #else
594                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
595                                        kqswnal_data.kqn_eptxdmahandle,
596                                        ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
597                                        basepage, &ktx->ktx_ebuffer);
598 #endif
599                 ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
600                 ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
601
602                 INIT_LIST_HEAD (&ktx->ktx_delayed_list);
603
604                 ktx->ktx_state = KTX_IDLE;
605                 ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
606                 list_add_tail (&ktx->ktx_list, 
607                                ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
608                                                  &kqswnal_data.kqn_idletxds);
609         }
610
611         /**********************************************************************/
612         /* Allocate/Initialise receive descriptors */
613
614         PORTAL_ALLOC (kqswnal_data.kqn_rxds,
615                       sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
616         if (kqswnal_data.kqn_rxds == NULL)
617         {
618                 kqswnal_shutdown (nal);
619                 return (PTL_NO_SPACE);
620         }
621
622         memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
623                sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));
624
625         elan_page_idx = 0;
626         for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
627         {
628 #if MULTIRAIL_EKC
629                 EP_NMD        elanbuffer;
630 #else
631                 E3_Addr       elanbuffer;
632 #endif
633                 int           j;
634                 kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
635
636                 if (i < KQSW_NRXMSGS_SMALL)
637                 {
638                         krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
639                         krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
640                 }
641                 else
642                 {
643                         krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
644                         krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
645                 }
646
647                 LASSERT (krx->krx_npages > 0);
648                 for (j = 0; j < krx->krx_npages; j++)
649                 {
650                         struct page *page = alloc_page(GFP_KERNEL);
651                         
652                         if (page == NULL) {
653                                 kqswnal_shutdown (nal);
654                                 return (PTL_NO_SPACE);
655                         }
656
657                         krx->krx_kiov[j].kiov_page = page;
658                         LASSERT(page_address(page) != NULL);
659
660 #if MULTIRAIL_EKC
661                         ep_dvma_load(kqswnal_data.kqn_ep, NULL,
662                                      page_address(page),
663                                      PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
664                                      elan_page_idx, &all_rails, &elanbuffer);
665                         
666                         if (j == 0) {
667                                 krx->krx_elanbuffer = elanbuffer;
668                         } else {
669                                 rc = ep_nmd_merge(&krx->krx_elanbuffer,
670                                                   &krx->krx_elanbuffer, 
671                                                   &elanbuffer);
672                                 /* NB contiguous mapping */
673                                 LASSERT(rc);
674                         }
675 #else
676                         elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
677                                               kqswnal_data.kqn_eprxdmahandle,
678                                               page_address(page),
679                                               PAGE_SIZE, elan_page_idx,
680                                               &elanbuffer);
681                         if (j == 0)
682                                 krx->krx_elanbuffer = elanbuffer;
683
684                         /* NB contiguous mapping */
685                         LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
686 #endif
687                         elan_page_idx++;
688
689                 }
690         }
691         LASSERT (elan_page_idx ==
692                  (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
693                  (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));
694
695         /**********************************************************************/
696         /* Network interface ready to initialise */
697
698         my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
699         my_process_id.pid = 0;
700
701         rc = lib_init(&kqswnal_lib, nal, my_process_id,
702                       requested_limits, actual_limits);
703         if (rc != PTL_OK)
704         {
705                 CERROR ("lib_init failed %d\n", rc);
706                 kqswnal_shutdown (nal);
707                 return (rc);
708         }
709
710         kqswnal_data.kqn_init = KQN_INIT_LIB;
711
712         /**********************************************************************/
713         /* Queue receives, now that it's OK to run their completion callbacks */
714
715         for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
716         {
717                 kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
718
719                 /* NB this enqueue can allocate/sleep (attr == 0) */
720                 krx->krx_state = KRX_POSTED;
721 #if MULTIRAIL_EKC
722                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
723                                       &krx->krx_elanbuffer, 0);
724 #else
725                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
726                                       krx->krx_elanbuffer,
727                                       krx->krx_npages * PAGE_SIZE, 0);
728 #endif
729                 if (rc != EP_SUCCESS)
730                 {
731                         CERROR ("failed ep_queue_receive %d\n", rc);
732                         kqswnal_shutdown (nal);
733                         return (PTL_FAIL);
734                 }
735         }
736
737         /**********************************************************************/
738         /* Spawn scheduling threads */
739         for (i = 0; i < num_online_cpus(); i++) {
740                 rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
741                 if (rc != 0)
742                 {
743                         CERROR ("failed to spawn scheduling thread: %d\n", rc);
744                         kqswnal_shutdown (nal);
745                         return (PTL_FAIL);
746                 }
747         }
748
749         /**********************************************************************/
750         /* Connect to the router */
751         rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
752         CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
753
754         rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
755         if (rc != 0) {
756                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
757                 kqswnal_shutdown (nal);
758                 return (PTL_FAIL);
759         }
760
761         kqswnal_data.kqn_init = KQN_INIT_ALL;
762
763         printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
764                "(Routing %s, initial mem %d)\n", 
765                kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
766                kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
767                pkmem);
768
769         return (PTL_OK);
770 }
771
/* Module unload hook: drop the sysctl entries (if registered), finalise
 * the network interface (which triggers kqswnal_shutdown via the last
 * ref), then unregister the NAL itself. */
void __exit
kqswnal_finalise (void)
{
#if CONFIG_SYSCTL
        if (kqswnal_tunables.kqn_sysctl != NULL)
                unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
#endif
        PtlNIFini(kqswnal_ni);

        ptl_unregister_nal(QSWNAL);
}
783
784 static int __init
785 kqswnal_initialise (void)
786 {
787         int   rc;
788
789         kqswnal_api.nal_ni_init = kqswnal_startup;
790         kqswnal_api.nal_ni_fini = kqswnal_shutdown;
791
792         /* Initialise dynamic tunables to defaults once only */
793         kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
794         kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
795         
796         rc = ptl_register_nal(QSWNAL, &kqswnal_api);
797         if (rc != PTL_OK) {
798                 CERROR("Can't register QSWNAL: %d\n", rc);
799                 return (-ENOMEM);               /* or something... */
800         }
801
802         /* Pure gateways, and the workaround for 'EKC blocks forever until
803          * the service is active' want the NAL started up at module load
804          * time... */
805         rc = PtlNIInit(QSWNAL, 0, NULL, NULL, &kqswnal_ni);
806         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
807                 ptl_unregister_nal(QSWNAL);
808                 return (-ENODEV);
809         }
810
811 #if CONFIG_SYSCTL
812         /* Press on regardless even if registering sysctl doesn't work */
813         kqswnal_tunables.kqn_sysctl = 
814                 register_sysctl_table (kqswnal_top_ctl_table, 0);
815 #endif
816         return (0);
817 }
818
/* Module metadata and entry/exit points */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);