2 * Copyright (C) 2002 Cluster File Systems, Inc.
3 * Author: Eric Barton <eric@bartonsoftware.com>
5 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
6 * W. Marcus Miller - Based on ksocknal
8 * This file is part of Portals, http://www.sf.net/projects/lustre/
10 * Portals is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Portals is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Portals; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Global state for the Quadrics/Elan NAL: module-wide data, the network
 * interface handle owned by this module, and runtime tunables. */
28 kqswnal_data_t kqswnal_data;
29 ptl_handle_ni_t kqswnal_ni;
30 kqswnal_tunables_t kqswnal_tunables;
/* Callbacks the portals router uses to hand us packets to forward.
 * NB: the initialiser list appears truncated in this extract. */
32 kpr_nal_interface_t kqswnal_router_interface = {
35 kprni_fwd: kqswnal_fwd_packet,
36 kprni_notify: NULL, /* we're connectionless */
/* sysctl binary ids: 201 is the top-level "qswnal" directory; the two
 * children expose the optimized put/get tunables read/write (0644). */
40 #define QSWNAL_SYSCTL 201
42 #define QSWNAL_SYSCTL_OPTIMIZED_GETS 1
43 #define QSWNAL_SYSCTL_OPTIMIZED_PUTS 2
/* Leaf table: both entries are plain ints handled by proc_dointvec. */
45 static ctl_table kqswnal_ctl_table[] = {
46 {QSWNAL_SYSCTL_OPTIMIZED_PUTS, "optimized_puts",
47 &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
48 0644, NULL, &proc_dointvec},
49 {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
50 &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
51 0644, NULL, &proc_dointvec},
/* Top-level directory entry (mode 0555) hooking the leaf table in. */
55 static ctl_table kqswnal_top_ctl_table[] = {
56 {QSWNAL_SYSCTL, "qswnal", NULL, 0, 0555, kqswnal_ctl_table},
/* NAL_CMD_GET_TXDESC handler: report the state of an active transmit
 * descriptor back through *pcfg for userspace inspection/debugging.
 * NOTE(review): local declarations (ktx, hdr, flags), the index-selection
 * logic and the return statements are on lines missing from this extract
 * — presumably 'index' counts down to pick the index'th entry; confirm
 * against the full source. */
62 kqswnal_get_tx_desc (struct portals_cfg *pcfg)
65 struct list_head *tmp;
68 int index = pcfg->pcfg_count;
/* Walk the active tx list under the idle-txd lock so descriptors can't
 * be freed or re-queued underneath us. */
71 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
73 list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
77 ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
/* The portals header sits at the front of the tx buffer and is kept
 * little-endian on the wire; convert fields for the report. */
78 hdr = (ptl_hdr_t *)ktx->ktx_buffer;
80 pcfg->pcfg_pbuf1 = (char *)ktx;
81 pcfg->pcfg_count = le32_to_cpu(hdr->type);
82 pcfg->pcfg_size = le32_to_cpu(hdr->payload_length);
83 pcfg->pcfg_nid = le64_to_cpu(hdr->dest_nid);
84 pcfg->pcfg_nid2 = ktx->ktx_nid;
85 pcfg->pcfg_misc = ktx->ktx_launcher;
/* Packed status: bit 0 = on the delayed list, bit 1 = non-blocking
 * descriptor, remaining bits = ktx_state. */
86 pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
87 (!ktx->ktx_isnblk ? 0 : 2) |
88 (ktx->ktx_state << 2);
93 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
/* Dispatch a portals configuration command (libcfs_nal_cmd) to this NAL.
 * NOTE(review): the default case, return statements and closing brace of
 * the switch are on lines missing from this extract. */
98 kqswnal_cmd (struct portals_cfg *pcfg, void *private)
100 LASSERT (pcfg != NULL);
102 switch (pcfg->pcfg_command) {
103 case NAL_CMD_GET_TXDESC:
104 return (kqswnal_get_tx_desc (pcfg));
106 case NAL_CMD_REGISTER_MYNID:
/* Userspace supplies our NID; store it as an offset from our elan
 * node id so peer NIDs can be derived from peer elan ids. */
107 CDEBUG (D_IOCTL, "setting NID offset to "LPX64" (was "LPX64")\n",
108 pcfg->pcfg_nid - kqswnal_data.kqn_elanid,
109 kqswnal_data.kqn_nid_offset);
110 kqswnal_data.kqn_nid_offset =
111 pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
/* Keep the library NI's idea of our NID in sync. */
112 kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
/* Tear the NAL down in reverse order of initialisation.  kqn_init records
 * how far kqswnal_startup() got, and the switch below falls through the
 * matching teardown stages, so this is safe to call from any startup
 * failure path as well as at module unload.  The ordering matters:
 * receivers before transmitters, comms before threads, threads before
 * freeing descriptors. */
121 kqswnal_shutdown(nal_t *nal)
128 /* NB The first ref was this module! */
129 if (nal->nal_refct != 0) {
134 CDEBUG (D_NET, "shutdown\n");
135 LASSERT (nal == &kqswnal_api);
137 switch (kqswnal_data.kqn_init)
143 libcfs_nal_cmd_unregister(QSWNAL);
153 case KQN_INIT_NOTHING:
157 /**********************************************************************/
158 /* Tell router we're shutting down. Any router calls my threads
159 * make will now fail immediately and the router will stop calling
161 kpr_shutdown (&kqswnal_data.kqn_router);
163 /**********************************************************************/
164 /* Signal the start of shutdown... */
165 spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
166 kqswnal_data.kqn_shuttingdown = 1;
167 spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
/* Wake anyone blocked waiting for an idle tx descriptor so they can
 * notice kqn_shuttingdown and bail out. */
169 wake_up_all(&kqswnal_data.kqn_idletxd_waitq);
171 /**********************************************************************/
172 /* wait for sends that have allocated a tx desc to launch or give up */
173 while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
174 CDEBUG(D_NET, "waiting for %d pending sends\n",
175 atomic_read (&kqswnal_data.kqn_pending_txs));
176 set_current_state (TASK_UNINTERRUPTIBLE);
177 schedule_timeout (HZ);
180 /**********************************************************************/
181 /* close elan comms */
183 /* Shut down receivers first; rx callbacks might try sending... */
184 if (kqswnal_data.kqn_eprx_small != NULL)
185 ep_free_rcvr (kqswnal_data.kqn_eprx_small);
187 if (kqswnal_data.kqn_eprx_large != NULL)
188 ep_free_rcvr (kqswnal_data.kqn_eprx_large);
190 /* NB ep_free_rcvr() returns only after we've freed off all receive
191 * buffers (see shutdown handling in kqswnal_requeue_rx()). This
192 * means we must have completed any messages we passed to
193 * lib_parse() or kpr_fwd_start(). */
195 if (kqswnal_data.kqn_eptx != NULL)
196 ep_free_xmtr (kqswnal_data.kqn_eptx);
198 /* NB ep_free_xmtr() returns only after all outstanding transmits
199 * have called their callback... */
200 LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
/* Alternate EKC flavour: the calls above don't give the same
 * guarantees, so poll the active list ourselves below. */
202 /* "Old" EKC just pretends to shutdown cleanly but actually
203 * provides no guarantees */
204 if (kqswnal_data.kqn_eprx_small != NULL)
205 ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
207 if (kqswnal_data.kqn_eprx_large != NULL)
208 ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);
210 /* wait for transmits to complete */
211 while (!list_empty(&kqswnal_data.kqn_activetxds)) {
212 CWARN("waiting for active transmits to complete\n");
213 set_current_state(TASK_UNINTERRUPTIBLE);
214 schedule_timeout(HZ);
217 if (kqswnal_data.kqn_eptx != NULL)
218 ep_free_large_xmtr (kqswnal_data.kqn_eptx);
220 /**********************************************************************/
221 /* flag threads to terminate, wake them and wait for them to die */
222 kqswnal_data.kqn_shuttingdown = 2;
223 wake_up_all (&kqswnal_data.kqn_sched_waitq);
225 while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
226 CDEBUG(D_NET, "waiting for %d threads to terminate\n",
227 atomic_read (&kqswnal_data.kqn_nthreads));
228 set_current_state (TASK_UNINTERRUPTIBLE);
229 schedule_timeout (HZ);
232 /**********************************************************************/
233 /* No more threads. No more portals, router or comms callbacks!
234 * I control the horizontals and the verticals...
/* With all async activity stopped, the scheduler queues must be empty. */
238 LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
239 LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
240 LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
243 /**********************************************************************/
244 /* Complete any blocked forwarding packets, with error
247 while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
249 kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
250 kpr_fwd_desc_t, kprfd_list);
251 list_del (&fwd->kprfd_list);
252 kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
255 /**********************************************************************/
256 /* finalise router and portals lib */
258 kpr_deregister (&kqswnal_data.kqn_router);
261 lib_fini (&kqswnal_lib);
263 /**********************************************************************/
264 /* Unmap message buffers and free all descriptors and buffers
268 /* FTTB, we need to unmap any remaining mapped memory. When
269 * ep_dvma_release() get fixed (and releases any mappings in the
270 * region), we can delete all the code from here --------> */
272 for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
273 /* If ktx has a buffer, it got mapped; unmap now. NB only
274 * the pre-mapped stuff is still mapped since all tx descs
277 if (ktx->ktx_buffer != NULL)
278 ep_dvma_unload(kqswnal_data.kqn_ep,
279 kqswnal_data.kqn_ep_tx_nmh,
283 for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
284 /* If krx_kiov[0].kiov_page got allocated, it got mapped.
285 * NB subsequent pages get merged */
287 if (krx->krx_kiov[0].kiov_page != NULL)
288 ep_dvma_unload(kqswnal_data.kqn_ep,
289 kqswnal_data.kqn_ep_rx_nmh,
290 &krx->krx_elanbuffer);
292 /* <----------- to here */
294 if (kqswnal_data.kqn_ep_rx_nmh != NULL)
295 ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);
297 if (kqswnal_data.kqn_ep_tx_nmh != NULL)
298 ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);
300 if (kqswnal_data.kqn_eprxdmahandle != NULL)
302 elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
303 kqswnal_data.kqn_eprxdmahandle, 0,
304 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
305 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE);
307 elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
308 kqswnal_data.kqn_eprxdmahandle);
311 if (kqswnal_data.kqn_eptxdmahandle != NULL)
313 elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
314 kqswnal_data.kqn_eptxdmahandle, 0,
315 KQSW_NTXMSGPAGES * (KQSW_NTXMSGS +
318 elan3_dma_release(kqswnal_data.kqn_ep->DmaState,
319 kqswnal_data.kqn_eptxdmahandle);
/* Free tx descriptors and their pre-allocated buffers; the alloclist
 * chain threads through every descriptor ever allocated. */
323 while (kqswnal_data.kqn_txds != NULL) {
324 ktx = kqswnal_data.kqn_txds;
326 if (ktx->ktx_buffer != NULL)
327 PORTAL_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
329 kqswnal_data.kqn_txds = ktx->ktx_alloclist;
330 PORTAL_FREE(ktx, sizeof(*ktx));
/* Free rx descriptors and any receive pages they own. */
333 while (kqswnal_data.kqn_rxds != NULL) {
336 krx = kqswnal_data.kqn_rxds;
337 for (i = 0; i < krx->krx_npages; i++)
338 if (krx->krx_kiov[i].kiov_page != NULL)
339 __free_page (krx->krx_kiov[i].kiov_page);
341 kqswnal_data.kqn_rxds = krx->krx_alloclist;
342 PORTAL_FREE(krx, sizeof (*krx));
345 /* resets flags, pointers to NULL etc */
346 memset(&kqswnal_data, 0, sizeof (kqswnal_data));
348 CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&portal_kmemory));
350 printk (KERN_INFO "Lustre: Routing QSW NAL unloaded (final mem %d)\n",
351 atomic_read(&portal_kmemory));
/* Bring the NAL up: initialise state, attach to the Elan device, allocate
 * transmitter/receivers, reserve and map DMA space, build tx/rx descriptor
 * pools, start the portals library, post receives, spawn scheduler threads
 * and register with the router and command interfaces.
 *
 * @nal              must be &kqswnal_api; a non-zero refct means we're
 *                   already up and only need to report actual_limits.
 * @requested_pid    PID for this network interface.
 * @requested_limits / @actual_limits  passed through to lib_init().
 *
 * On any failure kqswnal_shutdown(nal) unwinds whatever stage kqn_init
 * records, then an error status is returned. */
355 kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
356 ptl_ni_limits_t *requested_limits,
357 ptl_ni_limits_t *actual_limits)
360 EP_RAILMASK all_rails = EP_RAILMASK_ALL;
362 ELAN3_DMA_REQUEST dmareq;
369 ptl_process_id_t my_process_id;
370 int pkmem = atomic_read(&portal_kmemory);
372 LASSERT (nal == &kqswnal_api);
374 if (nal->nal_refct != 0) {
375 if (actual_limits != NULL)
376 *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
377 /* This module got the first ref */
382 LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
384 CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
386 /* ensure all pointers NULL etc */
387 memset (&kqswnal_data, 0, sizeof (kqswnal_data));
389 INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
390 INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
391 INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
392 spin_lock_init (&kqswnal_data.kqn_idletxd_lock);
393 init_waitqueue_head (&kqswnal_data.kqn_idletxd_waitq);
394 INIT_LIST_HEAD (&kqswnal_data.kqn_idletxd_fwdq);
396 INIT_LIST_HEAD (&kqswnal_data.kqn_delayedfwds);
397 INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
398 INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
400 spin_lock_init (&kqswnal_data.kqn_sched_lock);
401 init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
403 /* Leave kqn_rpc_success zeroed */
405 kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
407 kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
410 /* pointers/lists/locks initialised */
411 kqswnal_data.kqn_init = KQN_INIT_DATA;
414 kqswnal_data.kqn_ep = ep_system();
415 if (kqswnal_data.kqn_ep == NULL) {
416 CERROR("Can't initialise EKC\n");
417 kqswnal_shutdown(nal);
418 return (PTL_IFACE_INVALID);
421 if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
422 CERROR("Can't get elan ID\n");
423 kqswnal_shutdown(nal);
424 return (PTL_IFACE_INVALID);
427 /**********************************************************************/
428 /* Find the first Elan device */
430 kqswnal_data.kqn_ep = ep_device (0);
431 if (kqswnal_data.kqn_ep == NULL)
433 CERROR ("Can't get elan device 0\n");
434 kqswnal_shutdown(nal);
435 return (PTL_IFACE_INVALID);
439 kqswnal_data.kqn_nid_offset = 0;
440 kqswnal_data.kqn_nnodes = ep_numnodes (kqswnal_data.kqn_ep);
441 kqswnal_data.kqn_elanid = ep_nodeid (kqswnal_data.kqn_ep);
443 /**********************************************************************/
444 /* Get the transmitter */
446 kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
447 if (kqswnal_data.kqn_eptx == NULL)
449 CERROR ("Can't allocate transmitter\n");
450 kqswnal_shutdown (nal);
451 return (PTL_NO_SPACE);
454 /**********************************************************************/
455 /* Get the receivers */
457 kqswnal_data.kqn_eprx_small = ep_alloc_rcvr (kqswnal_data.kqn_ep,
458 EP_MSG_SVC_PORTALS_SMALL,
459 KQSW_EP_ENVELOPES_SMALL);
460 if (kqswnal_data.kqn_eprx_small == NULL)
462 CERROR ("Can't install small msg receiver\n");
463 kqswnal_shutdown (nal);
464 return (PTL_NO_SPACE);
467 kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
468 EP_MSG_SVC_PORTALS_LARGE,
469 KQSW_EP_ENVELOPES_LARGE);
470 if (kqswnal_data.kqn_eprx_large == NULL)
472 CERROR ("Can't install large msg receiver\n");
473 kqswnal_shutdown (nal);
474 return (PTL_NO_SPACE);
477 /**********************************************************************/
478 /* Reserve Elan address space for transmit descriptors NB we may
479 * either send the contents of associated buffers immediately, or
480 * map them for the peer to suck/blow... */
482 kqswnal_data.kqn_ep_tx_nmh =
483 ep_dvma_reserve(kqswnal_data.kqn_ep,
484 KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
486 if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
487 CERROR("Can't reserve tx dma space\n");
488 kqswnal_shutdown(nal);
489 return (PTL_NO_SPACE);
492 dmareq.Waitfn = DDI_DMA_SLEEP;
493 dmareq.ElanAddr = (E3_Addr) 0;
494 dmareq.Attr = PTE_LOAD_LITTLE_ENDIAN;
495 dmareq.Perm = ELAN_PERM_REMOTEWRITE;
497 rc = elan3_dma_reserve(kqswnal_data.kqn_ep->DmaState,
498 KQSW_NTXMSGPAGES*(KQSW_NTXMSGS+KQSW_NNBLK_TXMSGS),
499 &dmareq, &kqswnal_data.kqn_eptxdmahandle);
500 if (rc != DDI_SUCCESS)
/* FIX: this is the tx reservation failing; the message said "rx". */
502 CERROR ("Can't reserve tx dma space\n");
503 kqswnal_shutdown (nal);
504 return (PTL_NO_SPACE);
507 /**********************************************************************/
508 /* Reserve Elan address space for receive buffers */
510 kqswnal_data.kqn_ep_rx_nmh =
511 ep_dvma_reserve(kqswnal_data.kqn_ep,
512 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
513 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
/* FIX: must test the rx reservation we just made; the original
 * re-tested kqn_ep_tx_nmh (already validated above), so a failed
 * rx reserve would go unnoticed until a NULL deref later. */
515 if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
516 CERROR("Can't reserve rx dma space\n");
517 kqswnal_shutdown(nal);
518 return (PTL_NO_SPACE);
521 dmareq.Waitfn = DDI_DMA_SLEEP;
522 dmareq.ElanAddr = (E3_Addr) 0;
523 dmareq.Attr = PTE_LOAD_LITTLE_ENDIAN;
524 dmareq.Perm = ELAN_PERM_REMOTEWRITE;
526 rc = elan3_dma_reserve (kqswnal_data.kqn_ep->DmaState,
527 KQSW_NRXMSGPAGES_SMALL * KQSW_NRXMSGS_SMALL +
528 KQSW_NRXMSGPAGES_LARGE * KQSW_NRXMSGS_LARGE,
529 &dmareq, &kqswnal_data.kqn_eprxdmahandle);
530 if (rc != DDI_SUCCESS)
532 CERROR ("Can't reserve rx dma space\n");
533 kqswnal_shutdown (nal);
534 return (PTL_NO_SPACE);
537 /**********************************************************************/
538 /* Allocate/Initialise transmit descriptors */
540 kqswnal_data.kqn_txds = NULL;
541 for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
544 int basepage = i * KQSW_NTXMSGPAGES;
546 PORTAL_ALLOC (ktx, sizeof(*ktx));
548 kqswnal_shutdown (nal);
549 return (PTL_NO_SPACE);
552 memset(ktx, 0, sizeof(*ktx)); /* NULL pointers; zero flags */
553 ktx->ktx_alloclist = kqswnal_data.kqn_txds;
554 kqswnal_data.kqn_txds = ktx;
556 PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
557 if (ktx->ktx_buffer == NULL)
559 kqswnal_shutdown (nal);
560 return (PTL_NO_SPACE);
563 /* Map pre-allocated buffer NOW, to save latency on transmit */
564 premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
565 KQSW_TX_BUFFER_SIZE);
567 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
568 ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
569 kqswnal_data.kqn_ep_tx_nmh, basepage,
570 &all_rails, &ktx->ktx_ebuffer);
572 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
573 kqswnal_data.kqn_eptxdmahandle,
574 ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
575 basepage, &ktx->ktx_ebuffer);
577 ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
578 ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
580 INIT_LIST_HEAD (&ktx->ktx_delayed_list);
582 ktx->ktx_state = KTX_IDLE;
584 ktx->ktx_rail = -1; /* unset rail */
/* Descriptors beyond KQSW_NTXMSGS form the reserve pool for
 * non-blocking sends; they go on the nblk idle list. */
586 ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
587 list_add_tail (&ktx->ktx_list,
588 ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
589 &kqswnal_data.kqn_idletxds);
592 /**********************************************************************/
593 /* Allocate/Initialise receive descriptors */
594 kqswnal_data.kqn_rxds = NULL;
596 for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
605 PORTAL_ALLOC(krx, sizeof(*krx));
607 kqswnal_shutdown(nal);
608 return (PTL_NO_SPACE);
611 memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
612 krx->krx_alloclist = kqswnal_data.kqn_rxds;
613 kqswnal_data.kqn_rxds = krx;
/* First KQSW_NRXMSGS_SMALL descriptors serve the small-message
 * receiver; the rest serve the large-message receiver. */
615 if (i < KQSW_NRXMSGS_SMALL)
617 krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
618 krx->krx_eprx = kqswnal_data.kqn_eprx_small;
622 krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
623 krx->krx_eprx = kqswnal_data.kqn_eprx_large;
626 LASSERT (krx->krx_npages > 0);
627 for (j = 0; j < krx->krx_npages; j++)
629 struct page *page = alloc_page(GFP_KERNEL);
632 kqswnal_shutdown (nal);
633 return (PTL_NO_SPACE);
636 krx->krx_kiov[j].kiov_page = page;
637 LASSERT(page_address(page) != NULL);
640 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
642 PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
643 elan_page_idx, &all_rails, &elanbuffer);
645 krx->krx_elanbuffer = elanbuffer;
648 rc = ep_nmd_merge(&krx->krx_elanbuffer,
649 &krx->krx_elanbuffer,
651 /* NB contiguous mapping */
655 elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
656 kqswnal_data.kqn_eprxdmahandle,
658 PAGE_SIZE, elan_page_idx,
661 krx->krx_elanbuffer = elanbuffer;
663 /* NB contiguous mapping */
664 LASSERT (elanbuffer == krx->krx_elanbuffer + j * PAGE_SIZE);
/* Every reserved rx page must now be mapped exactly once. */
670 LASSERT (elan_page_idx ==
671 (KQSW_NRXMSGS_SMALL * KQSW_NRXMSGPAGES_SMALL) +
672 (KQSW_NRXMSGS_LARGE * KQSW_NRXMSGPAGES_LARGE));
674 /**********************************************************************/
675 /* Network interface ready to initialise */
677 my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
678 my_process_id.pid = requested_pid;
680 rc = lib_init(&kqswnal_lib, nal, my_process_id,
681 requested_limits, actual_limits);
684 CERROR ("lib_init failed %d\n", rc);
685 kqswnal_shutdown (nal);
689 kqswnal_data.kqn_init = KQN_INIT_LIB;
691 /**********************************************************************/
692 /* Queue receives, now that it's OK to run their completion callbacks */
694 for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
695 /* NB this enqueue can allocate/sleep (attr == 0) */
696 krx->krx_state = KRX_POSTED;
698 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
699 &krx->krx_elanbuffer, 0);
701 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
703 krx->krx_npages * PAGE_SIZE, 0);
705 if (rc != EP_SUCCESS)
707 CERROR ("failed ep_queue_receive %d\n", rc);
708 kqswnal_shutdown (nal);
713 /**********************************************************************/
714 /* Spawn scheduling threads */
715 for (i = 0; i < num_online_cpus(); i++) {
716 rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
719 CERROR ("failed to spawn scheduling thread: %d\n", rc);
720 kqswnal_shutdown (nal);
725 /**********************************************************************/
726 /* Connect to the router */
727 rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
728 CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
730 rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
732 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
733 kqswnal_shutdown (nal);
737 kqswnal_data.kqn_init = KQN_INIT_ALL;
739 printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
740 "(Routing %s, initial mem %d)\n",
741 kqswnal_data.kqn_elanid, kqswnal_data.kqn_nnodes,
742 kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
/* Module unload hook: remove the sysctl tree (if it was registered),
 * drop the module's NI reference taken at load time, and deregister
 * the NAL from the portals layer. */
749 kqswnal_finalise (void)
752 if (kqswnal_tunables.kqn_sysctl != NULL)
753 unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
/* Releases the ref PtlNIInit() took in kqswnal_initialise() */
755 PtlNIFini(kqswnal_ni);
757 ptl_unregister_nal(QSWNAL);
/* Module load hook: wire up the NAL entry points, set tunable defaults,
 * register with portals, bring the NI up immediately (see comment below)
 * and expose the tunables via sysctl.
 * NOTE(review): error-path braces/returns are on lines missing from
 * this extract. */
761 kqswnal_initialise (void)
765 kqswnal_api.nal_ni_init = kqswnal_startup;
766 kqswnal_api.nal_ni_fini = kqswnal_shutdown;
768 /* Initialise dynamic tunables to defaults once only */
769 kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
770 kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
772 rc = ptl_register_nal(QSWNAL, &kqswnal_api);
774 CERROR("Can't register QSWNAL: %d\n", rc);
775 return (-ENOMEM); /* or something... */
778 /* Pure gateways, and the workaround for 'EKC blocks forever until
779 * the service is active' want the NAL started up at module load
781 rc = PtlNIInit(QSWNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kqswnal_ni);
782 if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
783 ptl_unregister_nal(QSWNAL);
788 /* Press on regardless even if registering sysctl doesn't work */
789 kqswnal_tunables.kqn_sysctl =
790 register_sysctl_table (kqswnal_top_ctl_table, 0);
/* Module metadata and load/unload entry points. */
795 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
796 MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
797 MODULE_LICENSE("GPL");
799 module_init (kqswnal_initialise);
800 module_exit (kqswnal_finalise);