/*
- * Copyright (C) 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2002-2004 Cluster File Systems, Inc.
* Author: Eric Barton <eric@bartonsoftware.com>
*
- * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
- * W. Marcus Miller - Based on ksocknal
- *
- * This file is part of Portals, http://www.sf.net/projects/lustre/
+ * This file is part of Portals, http://www.lustre.org
*
* Portals is free software; you can redistribute it and/or
* modify it under the terms of version 2 of the GNU General Public
#include "qswnal.h"
-ptl_handle_ni_t kqswnal_ni;
nal_t kqswnal_api;
kqswnal_data_t kqswnal_data;
+ptl_handle_ni_t kqswnal_ni;
+kqswnal_tunables_t kqswnal_tunables;
kpr_nal_interface_t kqswnal_router_interface = {
kprni_nalid: QSWNAL,
#define QSWNAL_SYSCTL 201
#define QSWNAL_SYSCTL_OPTIMIZED_GETS 1
-#define QSWNAL_SYSCTL_COPY_SMALL_FWD 2
+#define QSWNAL_SYSCTL_OPTIMIZED_PUTS 2
static ctl_table kqswnal_ctl_table[] = {
- {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
- &kqswnal_data.kqn_optimized_gets, sizeof (int),
+ {QSWNAL_SYSCTL_OPTIMIZED_PUTS, "optimized_puts",
+ &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
0644, NULL, &proc_dointvec},
- {QSWNAL_SYSCTL_COPY_SMALL_FWD, "copy_small_fwd",
- &kqswnal_data.kqn_copy_small_fwd, sizeof (int),
+ {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
+ &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
0644, NULL, &proc_dointvec},
{0}
};
};
#endif
-static int
-kqswnal_forward(nal_t *nal,
- int id,
- void *args, size_t args_len,
- void *ret, size_t ret_len)
-{
- kqswnal_data_t *k = nal->nal_data;
- nal_cb_t *nal_cb = k->kqn_cb;
-
- LASSERT (nal == &kqswnal_api);
- LASSERT (k == &kqswnal_data);
- LASSERT (nal_cb == &kqswnal_lib);
-
- lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
- return (PTL_OK);
-}
-
-static void
-kqswnal_lock (nal_t *nal, unsigned long *flags)
-{
- kqswnal_data_t *k = nal->nal_data;
- nal_cb_t *nal_cb = k->kqn_cb;
-
- LASSERT (nal == &kqswnal_api);
- LASSERT (k == &kqswnal_data);
- LASSERT (nal_cb == &kqswnal_lib);
-
- nal_cb->cb_cli(nal_cb,flags);
-}
-
-static void
-kqswnal_unlock(nal_t *nal, unsigned long *flags)
-{
- kqswnal_data_t *k = nal->nal_data;
- nal_cb_t *nal_cb = k->kqn_cb;
-
- LASSERT (nal == &kqswnal_api);
- LASSERT (k == &kqswnal_data);
- LASSERT (nal_cb == &kqswnal_lib);
-
- nal_cb->cb_sti(nal_cb,flags);
-}
-
-static int
-kqswnal_shutdown(nal_t *nal, int ni)
-{
- CDEBUG (D_NET, "shutdown\n");
-
- LASSERT (nal == &kqswnal_api);
- return (0);
-}
-
-static int
-kqswnal_yield(nal_t *nal, unsigned long *flags, int milliseconds)
-{
- /* NB called holding statelock */
- wait_queue_t wait;
- unsigned long now = jiffies;
-
- CDEBUG (D_NET, "yield\n");
-
- if (milliseconds == 0) {
- if (current->need_resched)
- schedule();
- return 0;
- }
-
- init_waitqueue_entry(&wait, current);
- set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);
-
- kqswnal_unlock(nal, flags);
-
- if (milliseconds < 0)
- schedule ();
- else
- schedule_timeout((milliseconds * HZ) / 1000);
-
- kqswnal_lock(nal, flags);
-
- remove_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);
-
- if (milliseconds > 0) {
- milliseconds -= ((jiffies - now) * 1000) / HZ;
- if (milliseconds < 0)
- milliseconds = 0;
- }
-
- return (milliseconds);
-}
-
-static nal_t *
-kqswnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
- ptl_pid_t requested_pid)
-{
- ptl_nid_t mynid = kqswnal_elanid2nid (kqswnal_data.kqn_elanid);
- int nnids = kqswnal_data.kqn_nnodes;
-
- CDEBUG(D_NET, "calling lib_init with nid "LPX64" of %d\n", mynid, nnids);
-
- lib_init(&kqswnal_lib, mynid, 0, nnids, ptl_size, ac_size);
-
- return (&kqswnal_api);
-}
-
int
kqswnal_get_tx_desc (struct portals_cfg *pcfg)
{
unsigned long flags;
struct list_head *tmp;
kqswnal_tx_t *ktx;
+ ptl_hdr_t *hdr;
int index = pcfg->pcfg_count;
int rc = -ENOENT;
continue;
ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
+ hdr = (ptl_hdr_t *)ktx->ktx_buffer;
- pcfg->pcfg_pbuf1 = (char *)ktx;
- pcfg->pcfg_count = NTOH__u32(ktx->ktx_wire_hdr->type);
- pcfg->pcfg_size = NTOH__u32(ktx->ktx_wire_hdr->payload_length);
- pcfg->pcfg_nid = NTOH__u64(ktx->ktx_wire_hdr->dest_nid);
+ memcpy(pcfg->pcfg_pbuf, ktx,
+ MIN(sizeof(*ktx), pcfg->pcfg_plen1));
+ pcfg->pcfg_count = le32_to_cpu(hdr->type);
+ pcfg->pcfg_size = le32_to_cpu(hdr->payload_length);
+ pcfg->pcfg_nid = le64_to_cpu(hdr->dest_nid);
pcfg->pcfg_nid2 = ktx->ktx_nid;
pcfg->pcfg_misc = ktx->ktx_launcher;
pcfg->pcfg_flags = (list_empty (&ktx->ktx_delayed_list) ? 0 : 1) |
kqswnal_data.kqn_nid_offset);
kqswnal_data.kqn_nid_offset =
pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
- kqswnal_lib.ni.nid = pcfg->pcfg_nid;
+ kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
return (0);
default:
}
}
-void __exit
-kqswnal_finalise (void)
+static void
+kqswnal_shutdown(nal_t *nal)
{
+ unsigned long flags;
+ kqswnal_tx_t *ktx;
+ kqswnal_rx_t *krx;
+ int do_lib_fini = 0;
+
+ /* NB The first ref was this module! */
+ if (nal->nal_refct != 0) {
+ PORTAL_MODULE_UNUSE;
+ return;
+ }
+
+ CDEBUG (D_NET, "shutdown\n");
+ LASSERT (nal == &kqswnal_api);
+
switch (kqswnal_data.kqn_init)
{
default:
LASSERT (0);
case KQN_INIT_ALL:
-#if CONFIG_SYSCTL
- if (kqswnal_data.kqn_sysctl != NULL)
- unregister_sysctl_table (kqswnal_data.kqn_sysctl);
-#endif
- PORTAL_SYMBOL_UNREGISTER (kqswnal_ni);
- kportal_nal_unregister(QSWNAL);
+ libcfs_nal_cmd_unregister(QSWNAL);
/* fall through */
- case KQN_INIT_PTL:
- PtlNIFini (kqswnal_ni);
- lib_fini (&kqswnal_lib);
+ case KQN_INIT_LIB:
+ do_lib_fini = 1;
/* fall through */
case KQN_INIT_DATA:
}
/**********************************************************************/
- /* Make router stop her calling me and fail any more call-ins */
+ /* Tell router we're shutting down. Any router calls my threads
+ * make will now fail immediately and the router will stop calling
+ * into me. */
kpr_shutdown (&kqswnal_data.kqn_router);
-
+
/**********************************************************************/
- /* flag threads we've started to terminate and wait for all to ack */
-
+ /* Signal the start of shutdown... */
+ spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
kqswnal_data.kqn_shuttingdown = 1;
- wake_up_all (&kqswnal_data.kqn_sched_waitq);
+ spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
- while (atomic_read (&kqswnal_data.kqn_nthreads_running) != 0) {
- CDEBUG(D_NET, "waiting for %d threads to start shutting down\n",
- atomic_read (&kqswnal_data.kqn_nthreads_running));
+ wake_up_all(&kqswnal_data.kqn_idletxd_waitq);
+
+ /**********************************************************************/
+ /* wait for sends that have allocated a tx desc to launch or give up */
+ while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
+ CDEBUG(D_NET, "waiting for %d pending sends\n",
+ atomic_read (&kqswnal_data.kqn_pending_txs));
set_current_state (TASK_UNINTERRUPTIBLE);
schedule_timeout (HZ);
}
/**********************************************************************/
/* close elan comms */
#if MULTIRAIL_EKC
+ /* Shut down receivers first; rx callbacks might try sending... */
if (kqswnal_data.kqn_eprx_small != NULL)
ep_free_rcvr (kqswnal_data.kqn_eprx_small);
if (kqswnal_data.kqn_eprx_large != NULL)
ep_free_rcvr (kqswnal_data.kqn_eprx_large);
+ /* NB ep_free_rcvr() returns only after we've freed off all receive
+ * buffers (see shutdown handling in kqswnal_requeue_rx()). This
+ * means we must have completed any messages we passed to
+ * lib_parse() or kpr_fwd_start(). */
+
if (kqswnal_data.kqn_eptx != NULL)
ep_free_xmtr (kqswnal_data.kqn_eptx);
- /* freeing the xmtr completes all txs pdq */
+ /* NB ep_free_xmtr() returns only after all outstanding transmits
+ * have called their callback... */
LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
+ /* "Old" EKC just pretends to shutdown cleanly but actually
+ * provides no guarantees */
if (kqswnal_data.kqn_eprx_small != NULL)
ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
#endif
/**********************************************************************/
/* flag threads to terminate, wake them and wait for them to die */
-
kqswnal_data.kqn_shuttingdown = 2;
wake_up_all (&kqswnal_data.kqn_sched_waitq);
#if MULTIRAIL_EKC
LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
+ LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));
+ LASSERT (list_empty (&kqswnal_data.kqn_delayedfwds));
#endif
/**********************************************************************/
- /* Complete any blocked forwarding packets with error
+ /* Complete any blocked forwarding packets, with error
*/
while (!list_empty (&kqswnal_data.kqn_idletxd_fwdq))
kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
kpr_fwd_desc_t, kprfd_list);
list_del (&fwd->kprfd_list);
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
- }
-
- while (!list_empty (&kqswnal_data.kqn_delayedfwds))
- {
- kpr_fwd_desc_t *fwd = list_entry (kqswnal_data.kqn_delayedfwds.next,
- kpr_fwd_desc_t, kprfd_list);
- list_del (&fwd->kprfd_list);
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -EHOSTUNREACH);
+ kpr_fwd_done (&kqswnal_data.kqn_router, fwd, -ESHUTDOWN);
}
/**********************************************************************/
- /* Wait for router to complete any packets I sent her
- */
+ /* finalise router and portals lib */
kpr_deregister (&kqswnal_data.kqn_router);
+ if (do_lib_fini)
+ lib_fini (&kqswnal_lib);
/**********************************************************************/
/* Unmap message buffers and free all descriptors and buffers
* ep_dvma_release() get fixed (and releases any mappings in the
* region), we can delete all the code from here --------> */
- if (kqswnal_data.kqn_txds != NULL) {
- int i;
-
- for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++) {
- kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
-
- /* If ktx has a buffer, it got mapped; unmap now.
- * NB only the pre-mapped stuff is still mapped
- * since all tx descs must be idle */
+ for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
+ /* If ktx has a buffer, it got mapped; unmap now. NB only
+ * the pre-mapped stuff is still mapped since all tx descs
+ * must be idle */
- if (ktx->ktx_buffer != NULL)
- ep_dvma_unload(kqswnal_data.kqn_ep,
- kqswnal_data.kqn_ep_tx_nmh,
- &ktx->ktx_ebuffer);
- }
+ if (ktx->ktx_buffer != NULL)
+ ep_dvma_unload(kqswnal_data.kqn_ep,
+ kqswnal_data.kqn_ep_tx_nmh,
+ &ktx->ktx_ebuffer);
}
- if (kqswnal_data.kqn_rxds != NULL) {
- int i;
-
- for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
- kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
+ for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
+ /* If krx_kiov[0].kiov_page got allocated, it got mapped.
+ * NB subsequent pages get merged */
- /* If krx_kiov[0].kiov_page got allocated, it got mapped.
- * NB subsequent pages get merged */
-
- if (krx->krx_kiov[0].kiov_page != NULL)
- ep_dvma_unload(kqswnal_data.kqn_ep,
- kqswnal_data.kqn_ep_rx_nmh,
- &krx->krx_elanbuffer);
- }
+ if (krx->krx_kiov[0].kiov_page != NULL)
+ ep_dvma_unload(kqswnal_data.kqn_ep,
+ kqswnal_data.kqn_ep_rx_nmh,
+ &krx->krx_elanbuffer);
}
/* <----------- to here */
}
#endif
- if (kqswnal_data.kqn_txds != NULL)
- {
- int i;
-
- for (i = 0; i < KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS; i++)
- {
- kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
+ while (kqswnal_data.kqn_txds != NULL) {
+ ktx = kqswnal_data.kqn_txds;
- if (ktx->ktx_buffer != NULL)
- PORTAL_FREE(ktx->ktx_buffer,
- KQSW_TX_BUFFER_SIZE);
- }
+ if (ktx->ktx_buffer != NULL)
+ PORTAL_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
- PORTAL_FREE(kqswnal_data.kqn_txds,
- sizeof (kqswnal_tx_t) * (KQSW_NTXMSGS +
- KQSW_NNBLK_TXMSGS));
+ kqswnal_data.kqn_txds = ktx->ktx_alloclist;
+ PORTAL_FREE(ktx, sizeof(*ktx));
}
- if (kqswnal_data.kqn_rxds != NULL)
- {
- int i;
- int j;
+ while (kqswnal_data.kqn_rxds != NULL) {
+ int i;
- for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
- {
- kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
+ krx = kqswnal_data.kqn_rxds;
+ for (i = 0; i < krx->krx_npages; i++)
+ if (krx->krx_kiov[i].kiov_page != NULL)
+ __free_page (krx->krx_kiov[i].kiov_page);
- for (j = 0; j < krx->krx_npages; j++)
- if (krx->krx_kiov[j].kiov_page != NULL)
- __free_page (krx->krx_kiov[j].kiov_page);
- }
-
- PORTAL_FREE(kqswnal_data.kqn_rxds,
- sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL +
- KQSW_NRXMSGS_LARGE));
+ kqswnal_data.kqn_rxds = krx->krx_alloclist;
+ PORTAL_FREE(krx, sizeof (*krx));
}
/* resets flags, pointers to NULL etc */
atomic_read(&portal_kmemory));
}
-static int __init
-kqswnal_initialise (void)
+static int
+kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
+ ptl_ni_limits_t *requested_limits,
+ ptl_ni_limits_t *actual_limits)
{
#if MULTIRAIL_EKC
EP_RAILMASK all_rails = EP_RAILMASK_ALL;
#endif
int rc;
int i;
+ kqswnal_rx_t *krx;
+ kqswnal_tx_t *ktx;
int elan_page_idx;
+ ptl_process_id_t my_process_id;
int pkmem = atomic_read(&portal_kmemory);
- LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
+ LASSERT (nal == &kqswnal_api);
- CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
+ if (nal->nal_refct != 0) {
+ if (actual_limits != NULL)
+ *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
+ /* This module got the first ref */
+ PORTAL_MODULE_USE;
+ return (PTL_OK);
+ }
- kqswnal_api.forward = kqswnal_forward;
- kqswnal_api.shutdown = kqswnal_shutdown;
- kqswnal_api.yield = kqswnal_yield;
- kqswnal_api.validate = NULL; /* our api validate is a NOOP */
- kqswnal_api.lock = kqswnal_lock;
- kqswnal_api.unlock = kqswnal_unlock;
- kqswnal_api.nal_data = &kqswnal_data;
+ LASSERT (kqswnal_data.kqn_init == KQN_INIT_NOTHING);
- kqswnal_lib.nal_data = &kqswnal_data;
+ CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
- memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
- memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
-#if MULTIRAIL_EKC
- kqswnal_rpc_failed.Data[0] = -ECONNREFUSED;
-#else
- kqswnal_rpc_failed.Status = -ECONNREFUSED;
-#endif
/* ensure all pointers NULL etc */
memset (&kqswnal_data, 0, sizeof (kqswnal_data));
- kqswnal_data.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
- kqswnal_data.kqn_copy_small_fwd = KQSW_COPY_SMALL_FWD;
-
- kqswnal_data.kqn_cb = &kqswnal_lib;
-
INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
spin_lock_init (&kqswnal_data.kqn_sched_lock);
init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
- spin_lock_init (&kqswnal_data.kqn_statelock);
- init_waitqueue_head (&kqswnal_data.kqn_yield_waitq);
+ /* Leave kqn_rpc_success zeroed */
+#if MULTIRAIL_EKC
+ kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
+#else
+ kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
+#endif
/* pointers/lists/locks initialised */
kqswnal_data.kqn_init = KQN_INIT_DATA;
-
+
#if MULTIRAIL_EKC
kqswnal_data.kqn_ep = ep_system();
if (kqswnal_data.kqn_ep == NULL) {
CERROR("Can't initialise EKC\n");
- return (-ENODEV);
+ kqswnal_shutdown(nal);
+ return (PTL_IFACE_INVALID);
}
if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
CERROR("Can't get elan ID\n");
- kqswnal_finalise();
- return (-ENODEV);
+ kqswnal_shutdown(nal);
+ return (PTL_IFACE_INVALID);
}
#else
/**********************************************************************/
if (kqswnal_data.kqn_ep == NULL)
{
CERROR ("Can't get elan device 0\n");
- return (-ENODEV);
+ kqswnal_shutdown(nal);
+ return (PTL_IFACE_INVALID);
}
#endif
if (kqswnal_data.kqn_eptx == NULL)
{
CERROR ("Can't allocate transmitter\n");
- kqswnal_finalise ();
- return (-ENOMEM);
+ kqswnal_shutdown (nal);
+ return (PTL_NO_SPACE);
}
/**********************************************************************/
if (kqswnal_data.kqn_eprx_small == NULL)
{
CERROR ("Can't install small msg receiver\n");
- kqswnal_finalise ();
- return (-ENOMEM);
+ kqswnal_shutdown (nal);
+ return (PTL_NO_SPACE);
}
kqswnal_data.kqn_eprx_large = ep_alloc_rcvr (kqswnal_data.kqn_ep,
if (kqswnal_data.kqn_eprx_large == NULL)
{
CERROR ("Can't install large msg receiver\n");
- kqswnal_finalise ();
- return (-ENOMEM);
+ kqswnal_shutdown (nal);
+ return (PTL_NO_SPACE);
}
/**********************************************************************/
EP_PERM_WRITE);
if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
CERROR("Can't reserve tx dma space\n");
- kqswnal_finalise();
- return (-ENOMEM);
+ kqswnal_shutdown(nal);
+ return (PTL_NO_SPACE);
}
#else
dmareq.Waitfn = DDI_DMA_SLEEP;
if (rc != DDI_SUCCESS)
{
CERROR ("Can't reserve rx dma space\n");
- kqswnal_finalise ();
- return (-ENOMEM);
+ kqswnal_shutdown (nal);
+ return (PTL_NO_SPACE);
}
#endif
/**********************************************************************/
EP_PERM_WRITE);
if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
CERROR("Can't reserve rx dma space\n");
- kqswnal_finalise();
- return (-ENOMEM);
+ kqswnal_shutdown(nal);
+ return (PTL_NO_SPACE);
}
#else
dmareq.Waitfn = DDI_DMA_SLEEP;
if (rc != DDI_SUCCESS)
{
CERROR ("Can't reserve rx dma space\n");
- kqswnal_finalise ();
- return (-ENOMEM);
+ kqswnal_shutdown (nal);
+ return (PTL_NO_SPACE);
}
#endif
/**********************************************************************/
/* Allocate/Initialise transmit descriptors */
- PORTAL_ALLOC(kqswnal_data.kqn_txds,
- sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
- if (kqswnal_data.kqn_txds == NULL)
- {
- kqswnal_finalise ();
- return (-ENOMEM);
- }
-
- /* clear flags, null pointers etc */
- memset(kqswnal_data.kqn_txds, 0,
- sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
+ kqswnal_data.kqn_txds = NULL;
for (i = 0; i < (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS); i++)
{
int premapped_pages;
- kqswnal_tx_t *ktx = &kqswnal_data.kqn_txds[i];
int basepage = i * KQSW_NTXMSGPAGES;
+ PORTAL_ALLOC (ktx, sizeof(*ktx));
+ if (ktx == NULL) {
+ kqswnal_shutdown (nal);
+ return (PTL_NO_SPACE);
+ }
+
+ memset(ktx, 0, sizeof(*ktx)); /* NULL pointers; zero flags */
+ ktx->ktx_alloclist = kqswnal_data.kqn_txds;
+ kqswnal_data.kqn_txds = ktx;
+
PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
if (ktx->ktx_buffer == NULL)
{
- kqswnal_finalise ();
- return (-ENOMEM);
+ kqswnal_shutdown (nal);
+ return (PTL_NO_SPACE);
}
/* Map pre-allocated buffer NOW, to save latency on transmit */
INIT_LIST_HEAD (&ktx->ktx_delayed_list);
ktx->ktx_state = KTX_IDLE;
+#if MULTIRAIL_EKC
+ ktx->ktx_rail = -1; /* unset rail */
+#endif
ktx->ktx_isnblk = (i >= KQSW_NTXMSGS);
list_add_tail (&ktx->ktx_list,
ktx->ktx_isnblk ? &kqswnal_data.kqn_nblk_idletxds :
/**********************************************************************/
/* Allocate/Initialise receive descriptors */
-
- PORTAL_ALLOC (kqswnal_data.kqn_rxds,
- sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
- if (kqswnal_data.kqn_rxds == NULL)
- {
- kqswnal_finalise ();
- return (-ENOMEM);
- }
-
- memset(kqswnal_data.kqn_rxds, 0, /* clear flags, null pointers etc */
- sizeof(kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL+KQSW_NRXMSGS_LARGE));
-
+ kqswnal_data.kqn_rxds = NULL;
elan_page_idx = 0;
for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
{
E3_Addr elanbuffer;
#endif
int j;
- kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
+
+ PORTAL_ALLOC(krx, sizeof(*krx));
+ if (krx == NULL) {
+ kqswnal_shutdown(nal);
+ return (PTL_NO_SPACE);
+ }
+
+ memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
+ krx->krx_alloclist = kqswnal_data.kqn_rxds;
+ kqswnal_data.kqn_rxds = krx;
if (i < KQSW_NRXMSGS_SMALL)
{
struct page *page = alloc_page(GFP_KERNEL);
if (page == NULL) {
- kqswnal_finalise ();
- return (-ENOMEM);
+ kqswnal_shutdown (nal);
+ return (PTL_NO_SPACE);
}
krx->krx_kiov[j].kiov_page = page;
/**********************************************************************/
/* Network interface ready to initialise */
- rc = PtlNIInit(kqswnal_init, 32, 4, 0, &kqswnal_ni);
- if (rc != 0)
+ my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
+ my_process_id.pid = requested_pid;
+
+ rc = lib_init(&kqswnal_lib, nal, my_process_id,
+ requested_limits, actual_limits);
+ if (rc != PTL_OK)
{
- CERROR ("PtlNIInit failed %d\n", rc);
- kqswnal_finalise ();
- return (-ENOMEM);
+ CERROR ("lib_init failed %d\n", rc);
+ kqswnal_shutdown (nal);
+ return (rc);
}
- kqswnal_data.kqn_init = KQN_INIT_PTL;
+ kqswnal_data.kqn_init = KQN_INIT_LIB;
/**********************************************************************/
/* Queue receives, now that it's OK to run their completion callbacks */
- for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++)
- {
- kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
-
+ for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
/* NB this enqueue can allocate/sleep (attr == 0) */
+ krx->krx_state = KRX_POSTED;
#if MULTIRAIL_EKC
rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
&krx->krx_elanbuffer, 0);
if (rc != EP_SUCCESS)
{
CERROR ("failed ep_queue_receive %d\n", rc);
- kqswnal_finalise ();
- return (-ENOMEM);
+ kqswnal_shutdown (nal);
+ return (PTL_FAIL);
}
}
/**********************************************************************/
/* Spawn scheduling threads */
- for (i = 0; i < smp_num_cpus; i++)
- {
+ for (i = 0; i < num_online_cpus(); i++) {
rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
if (rc != 0)
{
CERROR ("failed to spawn scheduling thread: %d\n", rc);
- kqswnal_finalise ();
- return (rc);
+ kqswnal_shutdown (nal);
+ return (PTL_FAIL);
}
}
rc = kpr_register (&kqswnal_data.kqn_router, &kqswnal_router_interface);
CDEBUG(D_NET, "Can't initialise routing interface (rc = %d): not routing\n",rc);
- rc = kportal_nal_register (QSWNAL, &kqswnal_cmd, NULL);
+ rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
if (rc != 0) {
CERROR ("Can't initialise command interface (rc = %d)\n", rc);
- kqswnal_finalise ();
- return (rc);
+ kqswnal_shutdown (nal);
+ return (PTL_FAIL);
}
-#if CONFIG_SYSCTL
- /* Press on regardless even if registering sysctl doesn't work */
- kqswnal_data.kqn_sysctl = register_sysctl_table (kqswnal_top_ctl_table, 0);
-#endif
-
- PORTAL_SYMBOL_REGISTER(kqswnal_ni);
kqswnal_data.kqn_init = KQN_INIT_ALL;
printk(KERN_INFO "Lustre: Routing QSW NAL loaded on node %d of %d "
kpr_routing (&kqswnal_data.kqn_router) ? "enabled" : "disabled",
pkmem);
- return (0);
+ return (PTL_OK);
+}
+
+/* Module-unload hook: undo kqswnal_initialise() in reverse order.
+ * NB PtlNIFini() only drops the NI reference this module took at load
+ * time; the real teardown happens in kqswnal_shutdown() when the last
+ * reference goes away. */
+void __exit
+kqswnal_finalise (void)
+{
+#if CONFIG_SYSCTL
+ /* sysctl registration was best-effort at load time, so the
+ * table pointer may legitimately be NULL here */
+ if (kqswnal_tunables.kqn_sysctl != NULL)
+ unregister_sysctl_table (kqswnal_tunables.kqn_sysctl);
+#endif
+ /* release the interface handle acquired by PtlNIInit() at load */
+ PtlNIFini(kqswnal_ni);
+
+ ptl_unregister_nal(QSWNAL);
 }
+/* Module-load hook: wire up the NAL entry points, register with the
+ * portals core, and bring the interface up immediately.
+ * Returns 0 on success or a negative errno for module loading. */
+static int __init
+kqswnal_initialise (void)
+{
+ int rc;
+
+ /* hook startup/shutdown into the api NAL before registering it */
+ kqswnal_api.nal_ni_init = kqswnal_startup;
+ kqswnal_api.nal_ni_fini = kqswnal_shutdown;
+
+ /* Initialise dynamic tunables to defaults once only */
+ kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
+ kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
+
+ rc = ptl_register_nal(QSWNAL, &kqswnal_api);
+ if (rc != PTL_OK) {
+ CERROR("Can't register QSWNAL: %d\n", rc);
+ return (-ENOMEM); /* or something... */
+ }
+
+ /* Pure gateways, and the workaround for 'EKC blocks forever until
+ * the service is active' want the NAL started up at module load
+ * time... */
+ rc = PtlNIInit(QSWNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kqswnal_ni);
+ if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
+ /* NB PTL_IFACE_DUP means someone beat us to it; that's fine,
+ * any other failure means we must back out the registration */
+ ptl_unregister_nal(QSWNAL);
+ return (-ENODEV);
+ }
+
+#if CONFIG_SYSCTL
+ /* Press on regardless even if registering sysctl doesn't work */
+ kqswnal_tunables.kqn_sysctl =
+ register_sysctl_table (kqswnal_top_ctl_table, 0);
+#endif
+ return (0);
+}
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan NAL v1.01");
module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);
-
-EXPORT_SYMBOL (kqswnal_ni);