lnet/klnds/qswlnd/qswlnd.c
/*
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 * W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/lustre/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "qswnal.h"

nal_t                   kqswnal_api;
kqswnal_data_t          kqswnal_data;
ptl_handle_ni_t         kqswnal_ni;
kqswnal_tunables_t      kqswnal_tunables;

kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
        kprni_notify:   NULL,                   /* we're connectionless */
};

#if CONFIG_SYSCTL
#define QSWNAL_SYSCTL  201

#define QSWNAL_SYSCTL_OPTIMIZED_GETS     1
#define QSWNAL_SYSCTL_COPY_SMALL_FWD     2
#define QSWNAL_SYSCTL_OPTIMIZED_PUTS     3

static ctl_table kqswnal_ctl_table[] = {
        {QSWNAL_SYSCTL_OPTIMIZED_PUTS, "optimized_puts",
         &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
         0644, NULL, &proc_dointvec},
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
         &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
        {0}
};

static ctl_table kqswnal_top_ctl_table[] = {
        {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
        {0}
};
#endif

int
kqswnal_get_tx_desc (struct portals_cfg *pcfg)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        ptl_hdr_t         *hdr;
        int                index = pcfg->pcfg_count;
        int                rc = -ENOENT;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
                hdr = (ptl_hdr_t *)ktx->ktx_buffer;

                pcfg->pcfg_pbuf1 = (char *)ktx;
                pcfg->pcfg_count = le32_to_cpu(hdr->type);
                pcfg->pcfg_size  = le32_to_cpu(hdr->payload_length);
                pcfg->pcfg_nid   = le64_to_cpu(hdr->dest_nid);
                pcfg->pcfg_nid2  = ktx->ktx_nid;
                pcfg->pcfg_misc  = ktx->ktx_launcher;
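                /* Encode status bits for the caller: bit 0 = ktx is on a
                 * delayed (blocked) list, bit 1 = ktx is a non-blocking
                 * descriptor, and the remaining bits carry ktx_state
                 * shifted up by two. */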
                pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
                                   (!ktx->ktx_isnblk                    ? 0 : 2) |
                                   (ktx->ktx_state << 2);
                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}

int
kqswnal_cmd (struct portals_cfg *pcfg, void *private)
{
        LASSERT (pcfg != NULL);

        switch (pcfg->pcfg_command) {
        case NAL_CMD_GET_TXDESC:
                return (kqswnal_get_tx_desc (pcfg));

        case NAL_CMD_REGISTER_MYNID:
                CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
                        kqswnal_data.kqn_nid_offset);
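                /* NIDs on this NAL are Elan node IDs plus a constant offset
                 * (see kqswnal_elanid2nid() used at startup), so registering
                 * a NID amounts to recording the difference between the
                 * requested NID and our own Elan ID. */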
                kqswnal_data.kqn_nid_offset =
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
                kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
                return (0);

        default:
                return (-EINVAL);
        }
}

static void
kqswnal_shutdown(nal_t *nal)
{
        unsigned long flags;
        int           do_lib_fini = 0;

        /* NB The first ref was this module! */
        if (nal->nal_refct != 0) {
                PORTAL_MODULE_UNUSE;
                return;
        }

        CDEBUG (D_NET, "shutdown\n");
        LASSERT (nal == &kqswnal_api);

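        /* kqn_init records how far kqswnal_startup() got, so this routine
         * doubles as the error-cleanup path for a partial startup: the
         * switch below deliberately falls through, undoing each stage of
         * initialisation in reverse order. */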
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
                libcfs_nal_cmd_unregister(QSWNAL);
                /* fall through */

        case KQN_INIT_LIB:
                do_lib_fini = 1;
                /* fall through */

        case KQN_INIT_DATA:
                break;

        case KQN_INIT_NOTHING:
                return;
        }

        /**********************************************************************/
        /* Tell router we're shutting down.  Any router calls my threads
         * make will now fail immediately and the router will stop calling
         * into me. */
        kpr_shutdown (&kqswnal_data.kqn_router);

        /**********************************************************************/
        /* Signal the start of shutdown... */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        wake_up_all(&kqswnal_data.kqn_idletxd_waitq);
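        /* Anything blocked waiting for an idle tx descriptor wakes up here
         * and (in the tx allocation path) should see kqn_shuttingdown and
         * give up rather than launching; that is what lets the pending-tx
         * count below drain to zero. */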

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* close elan comms */
#if MULTIRAIL_EKC
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lib_parse() or kpr_fwd_start(). */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
        /* "Old" EKC just pretends to shutdown cleanly but actually
         * provides no guarantees */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);

        /* wait for transmits to complete */
        while (!list_empty(&kqswnal_data.kqn_activetxds)) {
                CWARN("waiting for active transmits to complete\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(HZ);
        }

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                set_current_state (TASK_UNINTERRUPTIBLE);
                schedule_timeout (HZ);
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

#if MULTIRAIL_EKC
        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
#endif

        /**********************************************************************/
        /* Complete any blocked forwarding packets, with error
         */

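        /* These are forwarding requests the router handed us that stalled
         * waiting for an idle tx descriptor (kqn_idletxd_fwdq); each one is
         * returned to the router with -ESHUTDOWN before kpr_deregister()
         * below. */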
        while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
        {
                kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
        }

        /**********************************************************************/
        /* finalise router and portals lib */

        kpr_deregister (&kqswnal_data.kqn_router);

        if (do_lib_fini)
                lib_fini (&kqswnal_lib);

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

#if MULTIRAIL_EKC
        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() gets fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        if (kqswnal_data.kqn_txds != NULL) {
                int  i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++) {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        /* If ktx has a buffer, it got mapped; unmap now.
                         * NB only the pre-mapped stuff is still mapped
                         * since all tx descs must be idle */

                        if (ktx->ktx_buffer != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_tx_nmh,
                                               &ktx->ktx_ebuffer);
                }
        }

        if (kqswnal_data.kqn_rxds != NULL) {
                int   i;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        /* If krx_kiov[0].kiov_page got allocated, it got mapped.
                         * NB subsequent pages get merged */

                        if (krx->krx_kiov[0].kiov_page != NULL)
                                ep_dvma_unload(kqswnal_data.kqn_ep,
                                               kqswnal_data.kqn_ep_rx_nmh,
                                               &krx->krx_elanbuffer);
                }
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
#else
        if (kqswnal_data.kqn_eprxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle, 0,
                                  KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                  KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eprxdmahandle);
        }

        if (kqswnal_data.kqn_eptxdmahandle != NULL)
        {
                elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle, 0,
                                  KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
                                                      KQSW_NNBLK_TXMSGS));

                elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
                                  kqswnal_data.kqn_eptxdmahandle);
        }
#endif

        if (kqswnal_data.kqn_txds != NULL)
        {
                int   i;

                for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
                {
                        kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];

                        if (ktx->ktx_buffer != NULL)
                                PORTAL_FREE(ktx->ktx_buffer,
                                            KQSW_TX_BUFFER_SIZE);
                }

                PORTAL_FREE(kqswnal_data.kqn_txds,
                            sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
                                                     KQSW_NNBLK_TXMSGS));
        }

        if (kqswnal_data.kqn_rxds != NULL)
        {
                int   i;
                int   j;

                for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
                {
                        kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                        for (j = 0; j < krx->krx_npages; j++)
                                if (krx->krx_kiov[j].kiov_page != NULL)
                                        __free_page (krx->krx_kiov[j].kiov_page);
                }

                PORTAL_FREE(kqswnal_data.kqn_rxds,
                            sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
                                                    KQSW_NRXMSGS_LARGE));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));

        printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
                atomic_read(&portal_kmemory));
}

static int
kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                 ptl_ni_limits_t *requested_limits,
                 ptl_ni_limits_t *actual_limits)
{
#if MULTIRAIL_EKC
        EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
#else
        ELAN3_DMA_REQUEST dmareq;
#endif
        int               rc;
        int               i;
        int               elan_page_idx;
        ptl_process_id_t  my_process_id;
        int               pkmem = atomic_read(&portal_kmemory);

        LASSERT (nal == &kqswnal_api);

        if (nal->nal_refct != 0) {
                if (actual_limits != NULL)
                        *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
                /* This module got the first ref */
                PORTAL_MODULE_USE;
                return (PTL_OK);
        }

        LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);

        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));

        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));

        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
        spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
        init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);

        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

        /* Leave kqn_rpc_success zeroed */
#if MULTIRAIL_EKC
        kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
#else
        kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
#endif

        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;

#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
                kqswnal_shutdown(nal);
                return (PTL_IFACE_INVALID);
        }

        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
                kqswnal_shutdown(nal);
                return (PTL_IFACE_INVALID);
        }
#else
        /**********************************************************************/
        /* Find the first Elan device */

        kqswnal_data.kqn_ep = ep_device (0);
        if (kqswnal_data.kqn_ep == NULL)
        {
                CERROR ("Can't get elan device 0\n");
                kqswnal_shutdown(nal);
                return (PTL_IFACE_INVALID);
        }
#endif

        kqswnal_data.kqn_nid_offset = 0;
        kqswnal_data.kqn_nnodes     = ep_numnodes (kqswnal_data.kqn_ep);
        kqswnal_data.kqn_elanid     = ep_nodeid (kqswnal_data.kqn_ep);

        /**********************************************************************/
        /* Get the transmitter */

        kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
                kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }

        /**********************************************************************/
        /* Get the receivers */

        kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
                                                     EP_MSG_SVC_PORTALS_SMALL,
                                                     KQSW_EP_ENVELOPES_SMALL);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
                kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }

        kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
                                                     EP_MSG_SVC_PORTALS_LARGE,
                                                     KQSW_EP_ENVELOPES_LARGE);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
                kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }

        /**********************************************************************/
        /* Reserve Elan address space for transmit descriptors.  NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */
#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep_tx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
                kqswnal_shutdown(nal);
                return (PTL_NO_SPACE);
        }
#else
        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
                              KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
                              &dmareq, &kqswnal_data.kqn_eptxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve tx dma space\n");
                kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }
#endif
        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
#if MULTIRAIL_EKC
        kqswnal_data.kqn_ep_rx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
                kqswnal_shutdown(nal);
                return (PTL_NO_SPACE);
        }
#else
        dmareq.Waitfn   = DDI_DMA_SLEEP;
        dmareq.ElanAddr = (E3_Addr) 0;
        dmareq.Attr     = PTE_LOAD_LITTLE_ENDIAN;
        dmareq.Perm     = ELAN_PERM_REMOTEWRITE;

        rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
                                KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
                                KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
                                &dmareq, &kqswnal_data.kqn_eprxdmahandle);
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
                kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }
#endif
        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */

        PORTAL_ALLOC(kqswnal_data.kqn_txds,
                     sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        if (kqswnal_data.kqn_txds == NULL)
        {
                kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }

        /* clear flags, null pointers etc */
        memset(kqswnal_data.kqn_txds, 0,
               sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
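        /* Each tx descriptor owns a KQSW_NTXMSGPAGES-page window of the Elan
         * address space reserved above, starting at page i * KQSW_NTXMSGPAGES.
         * The first premapped_pages of that window map the pre-allocated
         * ktx_buffer; the remaining ktx_npages are left free for mapping
         * payload pages at send time. */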
        for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
        {
                int           premapped_pages;
                kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
                int           basepage = i * KQSW_NTXMSGPAGES;

                PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
                        kqswnal_shutdown (nal);
                        return (PTL_NO_SPACE);
                }

                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &all_rails, &ktx->ktx_ebuffer);
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                                       basepage, &ktx->ktx_ebuffer);
#endif
                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

                INIT_LIST_HEAD (&ktx->ktx_delayed_list);

                ktx->ktx_state = KTX_IDLE;
#if MULTIRAIL_EKC
                ktx->ktx_rail = -1;             /* unset rail */
#endif
                ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
                list_add_tail (&ktx->ktx_list,
                               ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
                                                 &kqswnal_data.kqn_idletxds);
        }

        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */

        PORTAL_ALLOC (kqswnal_data.kqn_rxds,
                      sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
        if (kqswnal_data.kqn_rxds == NULL)
        {
                kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }

        memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
               sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));

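        /* The first KQSW_NRXMSGS_SMALL descriptors feed the small-message
         * receiver with KQSW_NRXMSGPAGES_SMALL pages each; the rest feed the
         * large-message receiver.  Each page is mapped at the next free slot
         * of the reserved rx Elan space, so every descriptor ends up with one
         * contiguous Elan buffer (per-page NMDs are merged on MULTIRAIL). */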
        elan_page_idx = 0;
        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
#if MULTIRAIL_EKC
                EP_NMD        elanbuffer;
#else
                E3_Addr       elanbuffer;
#endif
                int           j;
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                if (i < KQSW_NRXMSGS_SMALL)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
                }
                else
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
                }

                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
                        struct page *page = alloc_page(GFP_KERNEL);

                        if (page == NULL) {
                                kqswnal_shutdown (nal);
                                return (PTL_NO_SPACE);
                        }

                        krx->krx_kiov[j].kiov_page = page;
                        LASSERT(page_address(page) != NULL);

#if MULTIRAIL_EKC
                        ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                                     page_address(page),
                                     PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
                                     elan_page_idx, &all_rails, &elanbuffer);

                        if (j == 0) {
                                krx->krx_elanbuffer = elanbuffer;
                        } else {
                                rc = ep_nmd_merge(&krx->krx_elanbuffer,
                                                  &krx->krx_elanbuffer,
                                                  &elanbuffer);
                                /* NB contiguous mapping */
                                LASSERT(rc);
                        }
#else
                        elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
                                              kqswnal_data.kqn_eprxdmahandle,
                                              page_address(page),
                                              PAGE_SIZE, elan_page_idx,
                                              &elanbuffer);
                        if (j == 0)
                                krx->krx_elanbuffer = elanbuffer;

                        /* NB contiguous mapping */
                        LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
#endif
                        elan_page_idx++;

                }
        }
        LASSERT (elan_page_idx ==
                 (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
                 (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));

        /**********************************************************************/
        /* Network interface ready to initialise */

        my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
        my_process_id.pid = requested_pid;

        rc = lib_init(&kqswnal_lib, nal, my_process_id,
                      requested_limits, actual_limits);
        if (rc != PTL_OK)
        {
                CERROR ("lib_init failed %d\n", rc);
                kqswnal_shutdown (nal);
                return (rc);
        }

        kqswnal_data.kqn_init = KQN_INIT_LIB;

        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */

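        /* Posting a receive hands the buffer to the EP receiver; from now on
         * kqswnal_rxhandler() can fire as soon as a message lands, which is
         * why lib_init() had to complete first. */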
        for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
        {
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];

                /* NB this enqueue can allocate/sleep (attr == 0) */
                krx->krx_state = KRX_POSTED;
#if MULTIRAIL_EKC
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      &krx->krx_elanbuffer, 0);
#else
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
#endif
                if (rc != EP_SUCCESS)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_shutdown (nal);
                        return (PTL_FAIL);
                }
        }

        /**********************************************************************/
        /* Spawn scheduling threads */
        for (i = 0; i < num_online_cpus(); i++) {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
                        kqswnal_shutdown (nal);
                        return (PTL_FAIL);
                }
        }

        /**********************************************************************/
        /* Connect to the router */
        rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
        if (rc != 0)
                CDEBUG(D_NET, "Can't initialise routing interface "
                       "(rc = %d): not routing\n", rc);

        rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
        if (rc != 0) {
                CERROR ("Can't initialise command interface (rc = %d)\n", rc);
                kqswnal_shutdown (nal);
                return (PTL_FAIL);
        }

        kqswnal_data.kqn_init = KQN_INIT_ALL;

        printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
               "(Routing %s, initial mem %d)\n",
               kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
               kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
               pkmem);

        return (PTL_OK);
}

void __exit
kqswnal_finalise (void)
{
#if CONFIG_SYSCTL
        if (kqswnal_tunables.kqn_sysctl != NULL)
                unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
#endif
        PtlNIFini(kqswnal_ni);

        ptl_unregister_nal(QSWNAL);
}

static int __init
kqswnal_initialise (void)
{
        int   rc;

        kqswnal_api.nal_ni_init = kqswnal_startup;
        kqswnal_api.nal_ni_fini = kqswnal_shutdown;

        /* Initialise dynamic tunables to defaults once only */
        kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
        kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;

        rc = ptl_register_nal(QSWNAL, &kqswnal_api);
        if (rc != PTL_OK) {
                CERROR("Can't register QSWNAL: %d\n", rc);
                return (-ENOMEM);               /* or something... */
        }

        /* Pure gateways, and the workaround for 'EKC blocks forever until
         * the service is active' want the NAL started up at module load
         * time... */
        rc = PtlNIInit(QSWNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kqswnal_ni);
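        /* PTL_IFACE_DUP just means the interface already existed (someone
         * else beat us to PtlNIInit); that is not a failure here. */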
        if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
                ptl_unregister_nal(QSWNAL);
                return (-ENODEV);
        }

#if CONFIG_SYSCTL
        /* Press on regardless even if registering sysctl doesn't work */
        kqswnal_tunables.kqn_sysctl =
                register_sysctl_table (kqswnal_top_ctl_table, 0);
#endif
        return (0);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);