/*
 * Copyright (C) 2002-2004 Cluster File Systems, Inc.
 * Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
/* Module-wide NAL state (descriptors, EP handles, lists, locks), the
 * Portals network-interface handle held open by this module, and the
 * tunables exposed through the sysctl tables below. */
25 kqswnal_data_t kqswnal_data;
26 ptl_handle_ni_t kqswnal_ni;
27 kqswnal_tunables_t kqswnal_tunables;
/* Interface handed to the portals router (kpr_register() below): only
 * packet forwarding is provided; the notify hook is NULL because this
 * NAL is connectionless.
 * NOTE(review): initializer looks truncated in this chunk (no closing
 * brace visible) -- confirm against the full file. */
29 kpr_nal_interface_t kqswnal_router_interface = {
32 kprni_fwd: kqswnal_fwd_packet,
33 kprni_notify: NULL, /* we're connectionless */
/* sysctl binary ids: QSWNAL_SYSCTL is the top-level "qswnal" directory,
 * the other two are the tunable entries registered under it. */
37 #define QSWNAL_SYSCTL 201
39 #define QSWNAL_SYSCTL_OPTIMIZED_GETS 1
40 #define QSWNAL_SYSCTL_OPTIMIZED_PUTS 2
/* sysctl leaves: expose kqn_optimized_puts/gets as writable (0644)
 * integers via proc_dointvec.
 * NOTE(review): table looks truncated in this chunk (no {0} terminator
 * or closing brace visible) -- confirm against the full file. */
42 static ctl_table kqswnal_ctl_table[] = {
43 {QSWNAL_SYSCTL_OPTIMIZED_PUTS, "optimized_puts",
44 &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
45 0644, NULL, &proc_dointvec},
46 {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
47 &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
48 0644, NULL, &proc_dointvec},
/* Top-level sysctl directory "qswnal" (mode 0555) containing the
 * tunables table above; registered in kqswnal_initialise(). */
52 static ctl_table kqswnal_top_ctl_table[] = {
53 {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
/* ioctl helper (NAL_CMD_GET_TXDESC): walk the active-tx list under the
 * idle-txd lock and report the pcfg_count'th descriptor back through
 * *pcfg -- raw descriptor bytes, header type/length/dest nid, launcher
 * and packed state flags.
 * NOTE(review): this chunk looks truncated (return type, some local
 * declarations, the index countdown and return path are not visible)
 * -- confirm against the full file. */
59 kqswnal_get_tx_desc (struct portals_cfg *pcfg)
62 struct list_head *tmp;
65 int index = pcfg->pcfg_count;
68 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
70 list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
74 ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
/* Header lives at the start of the descriptor's pre-mapped buffer */
75 hdr = (ptl_hdr_t *)ktx->ktx_buffer;
/* Copy no more than the caller's buffer can hold */
77 memcpy(pcfg->pcfg_pbuf, ktx,
78 MIN(sizeof(*ktx), pcfg->pcfg_plen1));
/* Header fields are wire (little-endian) order; convert for caller */
79 pcfg->pcfg_count = le32_to_cpu(hdr->type);
80 pcfg->pcfg_size = le32_to_cpu(hdr->payload_length);
81 pcfg->pcfg_nid = le64_to_cpu(hdr->dest_nid);
82 pcfg->pcfg_nid2 = ktx->ktx_nid;
83 pcfg->pcfg_misc = ktx->ktx_launcher;
/* bit 0: queued on delayed list; bit 1: non-blocking desc;
 * remaining bits: ktx_state */
84 pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
85 (!ktx->ktx_isnblk ? 0 : 2) |
86 (ktx->ktx_state << 2);
91 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
/* NAL command dispatcher registered with libcfs_nal_cmd_register():
 * GET_TXDESC delegates to kqswnal_get_tx_desc(); REGISTER_MYNID
 * recomputes the NID offset so that (elanid + offset) == requested NID
 * and updates the lib NI's pid.nid to match.
 * NOTE(review): switch body looks truncated in this chunk (no default
 * case or closing brace visible) -- confirm against the full file. */
96 kqswnal_cmd (struct portals_cfg *pcfg, void *private)
98 LASSERT (pcfg != NULL);
100 switch (pcfg->pcfg_command) {
101 case NAL_CMD_GET_TXDESC:
102 return (kqswnal_get_tx_desc (pcfg));
104 case NAL_CMD_REGISTER_MYNID:
105 CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
106 pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
107 kqswnal_data.kqn_nid_offset);
108 kqswnal_data.kqn_nid_offset =
109 pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
110 kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
/* Tear down the NAL in strict reverse order of kqswnal_startup():
 * stop the router, flag shutdown, drain pending sends, free EP
 * receivers/transmitter, stop scheduler threads, complete blocked
 * forwards, finalise router + lib, unmap and free all tx/rx
 * descriptors, then zero kqswnal_data.  The switch on kqn_init makes
 * this safe to call from any partially-initialised state.
 * NOTE(review): this chunk looks truncated (locals, switch cases and
 * several closing braces are not visible) -- confirm against the full
 * file; statement order here is load-bearing, do not reorder. */
119 kqswnal_shutdown(nal_t *nal)
126 /* NB The first ref was this module! */
127 if (nal->nal_refct != 0) {
132 CDEBUG (D_NET, "shutdown\n");
133 LASSERT (nal == &kqswnal_api);
135 switch (kqswnal_data.kqn_init)
141 libcfs_nal_cmd_unregister(QSWNAL);
151 case KQN_INIT_NOTHING:
155 /**********************************************************************/
156 /* Tell router we're shutting down. Any router calls my threads
157 * make will now fail immediately and the router will stop calling
159 kpr_shutdown (&kqswnal_data.kqn_router);
161 /**********************************************************************/
162 /* Signal the start of shutdown... */
163 spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
164 kqswnal_data.kqn_shuttingdown = 1;
165 spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
/* Wake anyone blocked waiting for an idle tx descriptor */
167 wake_up_all(&kqswnal_data.kqn_idletxd_waitq);
169 /**********************************************************************/
170 /* wait for sends that have allocated a tx desc to launch or give up */
171 while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
172 CDEBUG(D_NET, "waiting for %d pending sends\n",
173 atomic_read (&kqswnal_data.kqn_pending_txs));
174 set_current_state (TASK_UNINTERRUPTIBLE);
175 schedule_timeout (HZ);
178 /**********************************************************************/
179 /* close elan comms */
181 /* Shut down receivers first; rx callbacks might try sending... */
182 if (kqswnal_data.kqn_eprx_small != NULL)
183 ep_free_rcvr (kqswnal_data.kqn_eprx_small);
185 if (kqswnal_data.kqn_eprx_large != NULL)
186 ep_free_rcvr (kqswnal_data.kqn_eprx_large);
188 /* NB ep_free_rcvr() returns only after we've freed off all receive
189 * buffers (see shutdown handling in kqswnal_requeue_rx()). This
190 * means we must have completed any messages we passed to
191 * lib_parse() or kpr_fwd_start(). */
193 if (kqswnal_data.kqn_eptx != NULL)
194 ep_free_xmtr (kqswnal_data.kqn_eptx);
196 /* NB ep_free_xmtr() returns only after all outstanding transmits
197 * have called their callback... */
198 LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
200 /* "Old" EKC just pretends to shutdown cleanly but actually
201 * provides no guarantees */
202 if (kqswnal_data.kqn_eprx_small != NULL)
203 ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
205 if (kqswnal_data.kqn_eprx_large != NULL)
206 ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);
208 /* wait for transmits to complete */
209 while (!list_empty(&kqswnal_data.kqn_activetxds)) {
210 CWARN("waiting for active transmits to complete\n");
211 set_current_state(TASK_UNINTERRUPTIBLE);
212 schedule_timeout(HZ);
215 if (kqswnal_data.kqn_eptx != NULL)
216 ep_free_large_xmtr (kqswnal_data.kqn_eptx);
218 /**********************************************************************/
219 /* flag threads to terminate, wake them and wait for them to die */
220 kqswnal_data.kqn_shuttingdown = 2;
221 wake_up_all (&kqswnal_data.kqn_sched_waitq);
223 while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
224 CDEBUG(D_NET, "waiting for %d threads to terminate\n",
225 atomic_read (&kqswnal_data.kqn_nthreads));
226 set_current_state (TASK_UNINTERRUPTIBLE);
227 schedule_timeout (HZ);
230 /**********************************************************************/
231 /* No more threads. No more portals, router or comms callbacks!
232 * I control the horizontals and the verticals...
/* With all activity stopped, the scheduler work queues must be empty */
236 LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
237 LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
238 LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
241 /**********************************************************************/
242 /* Complete any blocked forwarding packets, with error
245 while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
247 kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
248 kpr_fwd_desc_t, kprfd_list);
249 list_del (&fwd->kprfd_list);
250 kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
253 /**********************************************************************/
254 /* finalise router and portals lib */
256 kpr_deregister (&kqswnal_data.kqn_router);
259 lib_fini (&kqswnal_lib);
261 /**********************************************************************/
262 /* Unmap message buffers and free all descriptors and buffers
266 /* FTTB, we need to unmap any remaining mapped memory. When
267 * ep_dvma_release() get fixed (and releases any mappings in the
268 * region), we can delete all the code from here --------> */
270 for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
271 /* If ktx has a buffer, it got mapped; unmap now. NB only
272 * the pre-mapped stuff is still mapped since all tx descs
275 if (ktx->ktx_buffer != NULL)
276 ep_dvma_unload(kqswnal_data.kqn_ep,
277 kqswnal_data.kqn_ep_tx_nmh,
281 for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
282 /* If krx_kiov[0].kiov_page got allocated, it got mapped.
283 * NB subsequent pages get merged */
285 if (krx->krx_kiov[0].kiov_page != NULL)
286 ep_dvma_unload(kqswnal_data.kqn_ep,
287 kqswnal_data.kqn_ep_rx_nmh,
288 &krx->krx_elanbuffer);
290 /* <----------- to here */
292 if (kqswnal_data.kqn_ep_rx_nmh != NULL)
293 ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);
295 if (kqswnal_data.kqn_ep_tx_nmh != NULL)
296 ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
298 if (kqswnal_data.kqn_eprxdmahandle != NULL)
300 elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
301 kqswnal_data.kqn_eprxdmahandle, 0,
302 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
303 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);
305 elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
306 kqswnal_data.kqn_eprxdmahandle);
309 if (kqswnal_data.kqn_eptxdmahandle != NULL)
311 elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
312 kqswnal_data.kqn_eptxdmahandle, 0,
313 KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
316 elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
317 kqswnal_data.kqn_eptxdmahandle);
/* Free tx descriptors and their pre-allocated buffers (singly-linked
 * via ktx_alloclist) */
321 while (kqswnal_data.kqn_txds != NULL) {
322 ktx = kqswnal_data.kqn_txds;
324 if (ktx->ktx_buffer != NULL)
325 PORTAL_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
327 kqswnal_data.kqn_txds = ktx->ktx_alloclist;
328 PORTAL_FREE(ktx, sizeof(*ktx));
/* Free rx descriptors and every page attached to their kiovs */
331 while (kqswnal_data.kqn_rxds != NULL) {
334 krx = kqswnal_data.kqn_rxds;
335 for (i = 0; i < krx->krx_npages; i++)
336 if (krx->krx_kiov[i].kiov_page != NULL)
337 __free_page (krx->krx_kiov[i].kiov_page);
339 kqswnal_data.kqn_rxds = krx->krx_alloclist;
340 PORTAL_FREE(krx, sizeof (*krx));
343 /* resets flags, pointers to NULL etc */
344 memset(&kqswnal_data, 0, sizeof (kqswnal_data));
346 CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));
348 printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
349 atomic_read(&portal_kmemory));
/* Bring the NAL up: initialise kqswnal_data, attach to the Elan EP
 * system, allocate transmitter + small/large receivers, reserve Elan
 * DVMA space for tx buffers and rx pages, allocate and pre-map all
 * tx/rx descriptors, initialise the portals lib, post receives, spawn
 * one scheduler thread per online CPU and register with the router
 * and command interface.  Any failure unwinds via kqswnal_shutdown().
 * Returns PTL_OK on success, PTL_IFACE_INVALID / PTL_NO_SPACE on
 * failure.
 * Fixes applied in review:
 *  - rx DVMA reservation checked the wrong handle (kqn_ep_tx_nmh,
 *    already known non-NULL) so an rx-reserve failure was silently
 *    ignored; now checks kqn_ep_rx_nmh.
 *  - tx elan3_dma_reserve() failure message said "rx"; now says "tx".
 * NOTE(review): this chunk looks truncated (locals, #ifdef branches
 * and several closing braces are not visible) -- confirm against the
 * full file. */
353 kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
354 ptl_ni_limits_t *requested_limits,
355 ptl_ni_limits_t *actual_limits)
358 EP_RAILMASK all_rails = EP_RAILMASK_ALL;
360 ELAN3_DMA_REQUEST dmareq;
367 ptl_process_id_t my_process_id;
368 int pkmem = atomic_read(&portal_kmemory);
370 LASSERT (nal == &kqswnal_api);
/* Already running: just report the existing limits */
372 if (nal->nal_refct != 0) {
373 if (actual_limits != NULL)
374 *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
375 /* This module got the first ref */
380 LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
382 CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
384 /* ensure all pointers NULL etc */
385 memset (&kqswnal_data, 0, sizeof (kqswnal_data));
387 INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
388 INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
389 INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
390 spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
391 init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
392 INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);
394 INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
395 INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
396 INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
398 spin_lock_init (&kqswnal_data.kqn_sched_lock);
399 init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
401 /* Leave kqn_rpc_success zeroed */
403 kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
405 kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
408 /* pointers/lists/locks initialised */
409 kqswnal_data.kqn_init = KQN_INIT_DATA;
412 kqswnal_data.kqn_ep = ep_system();
413 if (kqswnal_data.kqn_ep == NULL) {
414 CERROR("Can't initialise EKC\n");
415 kqswnal_shutdown(nal);
416 return (PTL_IFACE_INVALID);
419 if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
420 CERROR("Can't get elan ID\n");
421 kqswnal_shutdown(nal);
422 return (PTL_IFACE_INVALID);
425 /**********************************************************************/
426 /* Find the first Elan device */
428 kqswnal_data.kqn_ep = ep_device (0);
429 if (kqswnal_data.kqn_ep == NULL)
431 CERROR ("Can't get elan device 0\n");
432 kqswnal_shutdown(nal);
433 return (PTL_IFACE_INVALID);
/* NID = elanid + offset; offset is 0 until NAL_CMD_REGISTER_MYNID */
437 kqswnal_data.kqn_nid_offset = 0;
438 kqswnal_data.kqn_nnodes = ep_numnodes (kqswnal_data.kqn_ep);
439 kqswnal_data.kqn_elanid = ep_nodeid (kqswnal_data.kqn_ep);
441 /**********************************************************************/
442 /* Get the transmitter */
444 kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
445 if (kqswnal_data.kqn_eptx == NULL)
447 CERROR ("Can't allocate transmitter\n");
448 kqswnal_shutdown (nal);
449 return (PTL_NO_SPACE);
452 /**********************************************************************/
453 /* Get the receivers */
455 kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
456 EP_MSG_SVC_PORTALS_SMALL,
457 KQSW_EP_ENVELOPES_SMALL);
458 if (kqswnal_data.kqn_eprx_small == NULL)
460 CERROR ("Can't install small msg receiver\n");
461 kqswnal_shutdown (nal);
462 return (PTL_NO_SPACE);
465 kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
466 EP_MSG_SVC_PORTALS_LARGE,
467 KQSW_EP_ENVELOPES_LARGE);
468 if (kqswnal_data.kqn_eprx_large == NULL)
470 CERROR ("Can't install large msg receiver\n");
471 kqswnal_shutdown (nal);
472 return (PTL_NO_SPACE);
475 /**********************************************************************/
476 /* Reserve Elan address space for transmit descriptors NB we may
477 * either send the contents of associated buffers immediately, or
478 * map them for the peer to suck/blow... */
480 kqswnal_data.kqn_ep_tx_nmh =
481 ep_dvma_reserve(kqswnal_data.kqn_ep,
482 KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
484 if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
485 CERROR("Can't reserve tx dma space\n");
486 kqswnal_shutdown(nal);
487 return (PTL_NO_SPACE);
490 dmareq.Waitfn = DDI_DMA_SLEEP;
491 dmareq.ElanAddr = (E3_Addr) 0;
492 dmareq.Attr = PTE_LOAD_LITTLE_ENDIAN;
493 dmareq.Perm = ELAN_PERM_REMOTEWRITE;
495 rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
496 KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
497 &dmareq, &kqswnal_data.kqn_eptxdmahandle);
498 if (rc != DDI_SUCCESS)
/* was "rx": this is the TX reservation failing */
500 CERROR ("Can't reserve tx dma space\n");
501 kqswnal_shutdown (nal);
502 return (PTL_NO_SPACE);
505 /**********************************************************************/
506 /* Reserve Elan address space for receive buffers */
508 kqswnal_data.kqn_ep_rx_nmh =
509 ep_dvma_reserve(kqswnal_data.kqn_ep,
510 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
511 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
/* was kqn_ep_tx_nmh (copy/paste bug): check the handle just reserved */
513 if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
514 CERROR("Can't reserve rx dma space\n");
515 kqswnal_shutdown(nal);
516 return (PTL_NO_SPACE);
519 dmareq.Waitfn = DDI_DMA_SLEEP;
520 dmareq.ElanAddr = (E3_Addr) 0;
521 dmareq.Attr = PTE_LOAD_LITTLE_ENDIAN;
522 dmareq.Perm = ELAN_PERM_REMOTEWRITE;
524 rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
525 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
526 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
527 &dmareq, &kqswnal_data.kqn_eprxdmahandle);
528 if (rc != DDI_SUCCESS)
530 CERROR ("Can't reserve rx dma space\n");
531 kqswnal_shutdown (nal);
532 return (PTL_NO_SPACE);
535 /**********************************************************************/
536 /* Allocate/Initialise transmit descriptors */
538 kqswnal_data.kqn_txds = NULL;
539 for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
542 int basepage = i * KQSW_NTXMSGPAGES;
544 PORTAL_ALLOC (ktx, sizeof(*ktx));
546 kqswnal_shutdown (nal);
547 return (PTL_NO_SPACE);
550 memset(ktx, 0, sizeof(*ktx)); /* NULL pointers; zero flags */
/* Thread onto the alloclist immediately so shutdown can free it */
551 ktx->ktx_alloclist = kqswnal_data.kqn_txds;
552 kqswnal_data.kqn_txds = ktx;
554 PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
555 if (ktx->ktx_buffer == NULL)
557 kqswnal_shutdown (nal);
558 return (PTL_NO_SPACE);
561 /* Map pre-allocated buffer NOW, to save latency on transmit */
562 premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
563 KQSW_TX_BUFFER_SIZE);
565 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
566 ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
567 kqswnal_data.kqn_ep_tx_nmh, basepage,
568 &all_rails, &ktx->ktx_ebuffer);
570 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
571 kqswnal_data.kqn_eptxdmahandle,
572 ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
573 basepage, &ktx->ktx_ebuffer);
575 ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
576 ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
578 INIT_LIST_HEAD (&ktx->ktx_delayed_list);
580 ktx->ktx_state = KTX_IDLE;
582 ktx->ktx_rail = -1; /* unset rail */
/* Descriptors beyond KQSW_NTXMSGS form the non-blocking pool */
584 ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
585 list_add_tail (&ktx->ktx_list,
586 ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
587 &kqswnal_data.kqn_idletxds);
590 /**********************************************************************/
591 /* Allocate/Initialise receive descriptors */
592 kqswnal_data.kqn_rxds = NULL;
594 for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
603 PORTAL_ALLOC(krx, sizeof(*krx));
605 kqswnal_shutdown(nal);
606 return (PTL_NO_SPACE);
609 memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
610 krx->krx_alloclist = kqswnal_data.kqn_rxds;
611 kqswnal_data.kqn_rxds = krx;
/* First KQSW_NRXMSGS_SMALL descriptors serve the small-message rcvr */
613 if (i < KQSW_NRXMSGS_SMALL)
615 krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
616 krx->krx_eprx = kqswnal_data.kqn_eprx_small;
620 krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
621 krx->krx_eprx = kqswnal_data.kqn_eprx_large;
624 LASSERT (krx->krx_npages > 0);
625 for (j = 0; j < krx->krx_npages; j++)
627 struct page *page = alloc_page(GFP_KERNEL);
630 kqswnal_shutdown (nal);
631 return (PTL_NO_SPACE);
634 krx->krx_kiov[j].kiov_page = page;
635 LASSERT(page_address(page) != NULL);
638 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
640 PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
641 elan_page_idx, &all_rails, &elanbuffer);
644 krx->krx_elanbuffer = elanbuffer;
646 rc = ep_nmd_merge(&krx->krx_elanbuffer,
647 &krx->krx_elanbuffer,
649 /* NB contiguous mapping */
653 elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
654 kqswnal_data.kqn_eprxdmahandle,
656 PAGE_SIZE, elan_page_idx,
659 krx->krx_elanbuffer = elanbuffer;
661 /* NB contiguous mapping */
662 LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
/* Sanity: every reserved rx page got mapped exactly once */
668 LASSERT (elan_page_idx ==
669 (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
670 (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));
672 /**********************************************************************/
673 /* Network interface ready to initialise */
675 my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
676 my_process_id.pid = requested_pid;
678 rc = lib_init(&kqswnal_lib, nal, my_process_id,
679 requested_limits, actual_limits);
682 CERROR ("lib_init failed %d\n", rc);
683 kqswnal_shutdown (nal);
687 kqswnal_data.kqn_init = KQN_INIT_LIB;
689 /**********************************************************************/
690 /* Queue receives, now that it's OK to run their completion callbacks */
692 for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
693 /* NB this enqueue can allocate/sleep (attr == 0) */
694 krx->krx_state = KRX_POSTED;
696 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
697 &krx->krx_elanbuffer, 0);
699 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
701 krx->krx_npages * PAGE_SIZE, 0);
703 if (rc != EP_SUCCESS)
705 CERROR ("failed ep_queue_receive %d\n", rc);
706 kqswnal_shutdown (nal);
711 /**********************************************************************/
712 /* Spawn scheduling threads */
713 for (i = 0; i < num_online_cpus(); i++) {
714 rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
717 CERROR ("failed to spawn scheduling thread: %d\n", rc);
718 kqswnal_shutdown (nal);
723 /**********************************************************************/
724 /* Connect to the router */
/* Router registration failure is non-fatal: we just don't route */
725 rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
726 CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
728 rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
730 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
731 kqswnal_shutdown (nal);
735 kqswnal_data.kqn_init = KQN_INIT_ALL;
737 printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
738 "(Routing %s, initial mem %d)\n",
739 kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
740 kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
/* Module exit: unregister the sysctl tree (if it was registered),
 * drop this module's NI reference (which triggers kqswnal_shutdown()
 * via the nal_ni_fini hook once the last ref goes), then unregister
 * the NAL itself.
 * NOTE(review): return type and braces are not visible in this chunk
 * -- confirm against the full file. */
747 kqswnal_finalise (void)
750 if (kqswnal_tunables.kqn_sysctl != NULL)
751 unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
753 PtlNIFini(kqswnal_ni);
755 ptl_unregister_nal(QSWNAL);
/* Module init: wire the startup/shutdown hooks into the NAL API
 * struct, set tunable defaults, register the NAL, bring the NI up
 * immediately (holding this module's own reference -- see
 * kqswnal_shutdown's "first ref" comment), and register sysctls.
 * NOTE(review): several lines (locals, error braces, final return)
 * are not visible in this chunk -- confirm against the full file. */
759 kqswnal_initialise (void)
763 kqswnal_api.nal_ni_init = kqswnal_startup;
764 kqswnal_api.nal_ni_fini = kqswnal_shutdown;
766 /* Initialise dynamic tunables to defaults once only */
767 kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
768 kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
770 rc = ptl_register_nal(QSWNAL, &kqswnal_api);
772 CERROR("Can't register QSWNAL: %d\n", rc);
773 return (-ENOMEM); /* or something... */
776 /* Pure gateways, and the workaround for 'EKC blocks forever until
777 * the service is active' want the NAL started up at module load
779 rc = PtlNIInit(QSWNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kqswnal_ni);
/* PTL_IFACE_DUP means the NI already existed -- that's fine too */
780 if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
781 ptl_unregister_nal(QSWNAL);
786 /* Press on regardless even if registering sysctl doesn't work */
787 kqswnal_tunables.kqn_sysctl =
788 register_sysctl_table (kqswnal_top_ctl_table, 0);
/* Standard kernel module boilerplate: metadata plus the init/exit
 * entry points defined above. */
793 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
794 MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
795 MODULE_LICENSE("GPL");
797 module_init (kqswnal_initialise);
798 module_exit (kqswnal_finalise);