From 70e78f530589eee926b6523a9885b3627eee19e3 Mon Sep 17 00:00:00 2001 From: eeb Date: Fri, 19 Aug 2005 17:11:42 +0000 Subject: [PATCH] * cleaned up startup/shutdown handling * queue rx descriptors for handling by thread directly --- lnet/klnds/gmlnd/gmlnd.h | 353 +++++------- lnet/klnds/gmlnd/gmlnd_api.c | 428 +++++++------- lnet/klnds/gmlnd/gmlnd_cb.c | 141 ++--- lnet/klnds/gmlnd/gmlnd_comm.c | 594 +++++++------------- lnet/klnds/gmlnd/gmlnd_module.c | 60 +- lnet/klnds/gmlnd/gmlnd_utils.c | 1180 ++++++++++++++++++--------------------- 6 files changed, 1143 insertions(+), 1613 deletions(-) diff --git a/lnet/klnds/gmlnd/gmlnd.h b/lnet/klnds/gmlnd/gmlnd.h index fe39506..e2dad13 100644 --- a/lnet/klnds/gmlnd/gmlnd.h +++ b/lnet/klnds/gmlnd/gmlnd.h @@ -21,9 +21,9 @@ /* - * Portals GM kernel NAL header file - * This file makes all declaration and prototypes - * for the API side and CB side of the NAL + * Portals GM kernel NAL header file + * This file makes all declaration and prototypes + * for the API side and CB side of the NAL */ #ifndef __INCLUDE_GMNAL_H__ #define __INCLUDE_GMNAL_H__ @@ -85,255 +85,168 @@ #include "gm.h" #include "gm_internal.h" - - /* - * Defines for the API NAL + * Defines for the API NAL */ -/* - * Small message size is configurable - * insmod can set small_msg_size - * which is used to populate nal_data.small_msg_size - */ -#define GMNAL_MAGIC 0x1234abcd +/* Wire protocol */ -#define GMNAL_SMALL_MESSAGE 1078 +typedef struct { + ptl_hdr_t gmim_hdr; /* portals header */ + char gmim_payload[0]; /* payload */ +} gmnal_immediate_msg_t; -extern int num_rx_threads; -extern int num_stxds; -extern int gm_port_id; +typedef struct { + /* First 2 fields fixed FOR ALL TIME */ + __u32 gmm_magic; /* I'm a GM message */ + __u16 gmm_version; /* this is my version number */ -/* - * Small Transmit Descriptor - * A structre to keep track of a small transmit operation - * This structure has a one-to-one relationship with a small - * transmit buffer (both create by gmnal_stxd_alloc). - * There are two free list of stxd. One for use by clients of the NAL - * and the other by the NAL rxthreads when doing sends. - * This helps prevent deadlock caused by stxd starvation. - */ -typedef struct gmnal_stxd { - struct gmnal_stxd *tx_next; - void *tx_buffer; - int tx_buffer_size; - gm_size_t tx_gm_size; - int tx_msg_size; - int tx_gmlid; - int tx_gm_priority; - int tx_type; + __u16 gmm_type; /* msg type */ + __u64 gmm_srcnid; /* sender's NID */ + __u64 gmm_dstnid; /* destination's NID */ + union { + gmnal_immediate_msg_t immediate; + } gmm_u; +} WIRE_ATTR gmnal_msg_t; + +#define GMNAL_MSG_MAGIC 0x6d797269 /* 'myri'! */ +#define GMNAL_MSG_VERSION 1 +#define GMNAL_MSG_IMMEDIATE 1 + +typedef struct gmnal_tx { + struct gmnal_tx *tx_next; + gmnal_msg_t *tx_msg; + int tx_buffer_size; + gm_size_t tx_gm_size; + int tx_msg_size; + int tx_gmlid; + int tx_gm_priority; ptl_nid_t tx_nid; - struct gmnal_ni *tx_gmni; - lib_msg_t *tx_cookie; - int tx_niov; + struct gmnal_ni *tx_gmni; + lib_msg_t *tx_libmsg; int tx_rxt; - int tx_kniov; - struct iovec *tx_iovec_dup; - struct iovec tx_iov[PTL_MD_MAX_IOV]; -} gmnal_stxd_t; +} gmnal_tx_t; /* - * as for gmnal_stxd_t - * a hash table in nal_data find srxds from - * the rx buffer address. hash table populated at init time + * as for gmnal_tx_t + * a hash table in nal_data find rxs from + * the rx buffer address. 
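The wire protocol above keeps gmm_magic and gmm_version first "FOR ALL TIME" so any future version can still parse the prologue, and the immediate payload is a flexible array, so the only size the NAL ever computes is "header plus N payload bytes". A sketch of that sizing idiom, which the patch applies both with PTL_MTU and with the actual payload length:

#include <stddef.h>

/* Wire size of a gmnal_msg_t carrying 'payload_nob' payload bytes;
 * offsetof() on the flexible payload array counts the header and the
 * union padding exactly once. */
static inline int
gmnal_immediate_msg_size(int payload_nob)
{
        return offsetof(gmnal_msg_t,
                        gmm_u.immediate.gmim_payload[payload_nob]);
}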
hash table populated at init time */ -typedef struct gmnal_srxd { - void *rx_buffer; - int rx_size; - gm_size_t rx_gmsize; - unsigned int rx_sender_gmid; - __u64 rx_source_stxd; - int rx_type; - int rx_nsiov; - int rx_nriov; - struct iovec *rx_riov; - int rx_ncallbacks; - spinlock_t rx_callback_lock; - int rx_callback_status; - lib_msg_t *rx_cookie; - struct gmnal_srxd *rx_next; - struct gmnal_ni *rx_gmni; -} gmnal_srxd_t; +typedef struct gmnal_rx { + struct list_head rx_list; + gmnal_msg_t *rx_msg; + int rx_size; + gm_size_t rx_gmsize; + unsigned int rx_recv_nob; + __u16 rx_recv_gmid; + __u8 rx_recv_port; + __u8 rx_recv_type; + struct gmnal_rx *rx_next; +} gmnal_rx_t; -/* - * Header which lmgnal puts at the start of each message - * watch alignment for ia32/64 interaction - */ -typedef struct gmnal_msghdr { - __s32 gmm_magic; - __s32 gmm_type; - __s32 gmm_niov; - __u32 gmm_sender_gmid; - __u64 gmm_stxd_remote_ptr; -} WIRE_ATTR gmnal_msghdr_t; - -/* - * the caretaker thread (ct_thread) gets receive events - * (and other events) from the myrinet device via the GM2 API. - * caretaker thread populates one work entry for each receive event, - * puts it on a Q in nal_data and wakes a receive thread to - * process the receive. - * Processing a portals receive can involve a transmit operation. - * Because of this the caretaker thread cannot process receives - * as it may get deadlocked when supply of transmit descriptors - * is exhausted (as caretaker thread is responsible for replacing - * transmit descriptors on the free list) - */ -typedef struct gmnal_rxtwe { - void *buffer; - unsigned snode; - unsigned sport; - unsigned type; - unsigned length; - struct gmnal_rxtwe *next; -} gmnal_rxtwe_t; /* - * 1 receive thread started on each CPU + * 1 receive thread started on each CPU */ #define NRXTHREADS 10 /* max number of receiver threads */ typedef struct gmnal_ni { - spinlock_t gmni_stxd_lock; - struct semaphore gmni_stxd_token; - gmnal_stxd_t *gmni_stxd; - spinlock_t gmni_rxt_stxd_lock; - struct semaphore gmni_rxt_stxd_token; - gmnal_stxd_t *gmni_rxt_stxd; - gmnal_srxd_t *gmni_srxd; - struct gm_hash *gmni_srxd_hash; - nal_t *gmni_nal; - lib_nal_t *gmni_libnal; - struct gm_port *gmni_port; - __u32 gmni_local_gmid; - __u32 gmni_global_gmid; - spinlock_t gmni_gm_lock; /* serialise GM calls */ - long gmni_rxthread_pid[NRXTHREADS]; - int gmni_rxthread_stop_flag; - spinlock_t gmni_rxthread_flag_lock; - long gmni_rxthread_flag; - long gmni_ctthread_pid; - int gmni_ctthread_flag; - gm_alarm_t gmni_ctthread_alarm; - int gmni_small_msg_size; - int gmni_small_msg_gmsize; - gmnal_rxtwe_t *gmni_rxtwe_head; - gmnal_rxtwe_t *gmni_rxtwe_tail; - spinlock_t gmni_rxtwe_lock; - struct semaphore gmni_rxtwe_wait; + spinlock_t gmni_tx_lock; + struct semaphore gmni_tx_token; + gmnal_tx_t *gmni_tx; + spinlock_t gmni_rxt_tx_lock; + struct semaphore gmni_rxt_tx_token; + gmnal_tx_t *gmni_rxt_tx; + gmnal_rx_t *gmni_rx; + struct gm_hash *gmni_rx_hash; + lib_nal_t *gmni_libnal; + struct gm_port *gmni_port; + spinlock_t gmni_gm_lock; /* serialise GM calls */ + long gmni_rxthread_pid[NRXTHREADS]; + int gmni_rxthread_stop_flag; + spinlock_t gmni_rxthread_flag_lock; + long gmni_rxthread_flag; + long gmni_ctthread_pid; + int gmni_ctthread_flag; + gm_alarm_t gmni_ctthread_alarm; + int gmni_msg_size; + struct list_head gmni_rxq; + spinlock_t gmni_rxq_lock; + struct semaphore gmni_rxq_wait; } gmnal_ni_t; /* - * Flags to start/stop and check status of threads - * each rxthread sets 1 bit (any bit) of the flag on startup - * and clears 
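The comment above describes a simple liveness protocol which gmnal_rx_thread() implements as flag = flag*2 + 1 on startup and flag /= 2 on exit. A standalone sketch of the arithmetic, with the flag spinlock omitted:

/* gmni_rxthread_flag stand-in: each started thread shifts a 1 in at
 * the bottom, each exiting thread shifts one out, so "all started"
 * is the all-ones value (1 << nthreads) - 1. */
static long flag;

static void thread_started(void) { flag = flag * 2 + 1; } /* one more bit */
static void thread_exited(void)  { flag = flag / 2;     } /* one bit fewer */
static int  all_started(int n)   { return flag == (1L << n) - 1; }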
1 bit when exiting
  */
-#define GMNAL_THREAD_RESET 0
-#define GMNAL_THREAD_STOP 666
-#define GMNAL_CTTHREAD_STARTED 333
+#define GMNAL_THREAD_RESET      0
+#define GMNAL_THREAD_STOP       666
+#define GMNAL_CTTHREAD_STARTED  333
 #define GMNAL_RXTHREADS_STARTED ( (1<<num_rx_threads)-1)

diff --git a/lnet/klnds/gmlnd/gmlnd_api.c b/lnet/klnds/gmlnd/gmlnd_api.c
--- a/lnet/klnds/gmlnd/gmlnd_api.c
+++ b/lnet/klnds/gmlnd/gmlnd_api.c
+int
+gmnal_cmd(struct portals_cfg *pcfg, void *private)
+{
+        gmnal_ni_t      *gmnalni;
+        char            *name;
+        int              nid;
+        __u32            gmid;
+        gm_status_t      gm_status;
+
+        CDEBUG(D_TRACE, "gmnal_cmd [%d] private [%p]\n",
+               pcfg->pcfg_command, private);
+        gmnalni = (gmnal_ni_t*)private;
+
+        switch(pcfg->pcfg_command) {
+        case GMNAL_IOC_GET_GNID:
+
+                PORTAL_ALLOC(name, pcfg->pcfg_plen1);
+                copy_from_user(name, PCFG_PBUF(pcfg, 1), pcfg->pcfg_plen1);
+
+                spin_lock(&gmnalni->gmni_gm_lock);
+                gm_status = gm_host_name_to_node_id_ex(gmnalni->gmni_port, 0,
+                                                       name, &nid);
+                spin_unlock(&gmnalni->gmni_gm_lock);
+                if (gm_status != GM_SUCCESS) {
+                        CDEBUG(D_NET, "gm_host_name_to_node_id_ex(...host %s) "
+                               "failed[%d]\n", name, gm_status);
+                        return -ENOENT;
+                }
+
+                CDEBUG(D_NET, "Local node %s id is [%d]\n", name, nid);
+                spin_lock(&gmnalni->gmni_gm_lock);
+                gm_status = gm_node_id_to_global_id(gmnalni->gmni_port,
+                                                    nid, &gmid);
+                spin_unlock(&gmnalni->gmni_gm_lock);
+                if (gm_status != GM_SUCCESS) {
+                        CDEBUG(D_NET, "gm_node_id_to_global_id failed[%d]\n",
+                               gm_status);
+                        return -ENOENT;
+                }
+
+                CDEBUG(D_NET, "Global node id is [%u][%x]\n", gmid, gmid);
+                copy_to_user(PCFG_PBUF(pcfg, 2), &gmid, pcfg->pcfg_plen2);
+                return 0;
+
+        case NAL_CMD_REGISTER_MYNID:
+                /* Same NID OK */
+                if (pcfg->pcfg_nid == gmnalni->gmni_libnal->libnal_ni.ni_pid.nid)
+                        return 0;
+
+                CERROR("Can't change NID from "LPD64" to "LPD64"\n",
+                       gmnalni->gmni_libnal->libnal_ni.ni_pid.nid,
+                       pcfg->pcfg_nid);
+                return -EINVAL;
+
+        default:
+                CERROR ("gmnal_cmd UNKNOWN[%d]\n", pcfg->pcfg_command);
+                return -EINVAL;
+        }
+        /* not reached */
+}
+
+ptl_nid_t
+gmnal_get_local_nid (gmnal_ni_t *gmnalni)
+{
+        unsigned int     local_gmid;
+        unsigned int     global_gmid;
+        ptl_nid_t        nid;
+        gm_status_t      gm_status;
+
+        /* Called before anything initialised: no need to lock */
+
+        spin_lock(&gmnalni->gmni_gm_lock);
+        gm_status = gm_get_node_id(gmnalni->gmni_port, &local_gmid);
+        spin_unlock(&gmnalni->gmni_gm_lock);
+        if (gm_status != GM_SUCCESS)
+                return PTL_NID_ANY;
+
+        CDEBUG(D_NET, "Local node id is [%u]\n", local_gmid);
+
+        spin_lock(&gmnalni->gmni_gm_lock);
+        gm_status = gm_node_id_to_global_id(gmnalni->gmni_port,
+                                            local_gmid,
+                                            &global_gmid);
+        spin_unlock(&gmnalni->gmni_gm_lock);
+        if (gm_status != GM_SUCCESS)
+                return PTL_NID_ANY;
+
+        CDEBUG(D_NET, "Global node id is [%u]\n", global_gmid);
+
+        nid = (__u64)global_gmid;
+        LASSERT (nid != PTL_NID_ANY);
+
+        return global_gmid;
+}
 
-extern int gmnal_cmd(struct portals_cfg *pcfg, void *private);
 
-/*
- * gmnal_api_shutdown
- * nal_refct == 0 => called on last matching PtlNIFini()
- * Close down this interface and free any resources associated with it
- * nal_t        nal     our nal to shutdown
- */
 void
 gmnal_api_shutdown(nal_t *nal)
 {
-        gmnal_ni_t      *gmnalni;
-        lib_nal_t       *libnal;
+        lib_nal_t       *libnal = nal->nal_data;
+        gmnal_ni_t      *gmnalni = libnal->libnal_data;
 
         if (nal->nal_refct != 0) {
                 /* This module got the first ref */
@@ -47,32 +133,28 @@ gmnal_api_shutdown(nal_t *nal)
                 return;
         }
 
-        libnal = (lib_nal_t *)nal->nal_data;
-        gmnalni = (gmnal_ni_t *)libnal->libnal_data;
         CDEBUG(D_TRACE, "gmnal_api_shutdown: gmnalni [%p]\n", gmnalni);
 
         /* Stop portals calling our ioctl handler */
         libcfs_nal_cmd_unregister(GMNAL);
 
-        /* XXX for shutdown "under fire" we probably need to set a shutdown
-        * flag so when lib calls us we fail immediately and dont queue any
-        * more
work but our threads can still call into lib OK. THEN - * shutdown our threads, THEN lib_fini() */ - lib_fini(libnal); - - gmnal_stop_rxthread(gmnalni); + /* stop processing messages */ gmnal_stop_ctthread(gmnalni); - gmnal_free_txd(gmnalni); - gmnal_free_srxd(gmnalni); + gmnal_stop_rxthread(gmnalni); + spin_lock(&gmnalni->gmni_gm_lock); gm_close(gmnalni->gmni_port); gm_finalize(); spin_unlock(&gmnalni->gmni_gm_lock); - /* Don't free 'nal'; it's a static struct */ - PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); - PORTAL_FREE(libnal, sizeof(lib_nal_t)); -} + lib_fini(libnal); + + gmnal_free_txs(gmnalni); + gmnal_free_rxs(gmnalni); + + PORTAL_FREE(gmnalni, sizeof(*gmnalni)); + PORTAL_FREE(libnal, sizeof(*libnal)); +} int gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, @@ -82,10 +164,10 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, lib_nal_t *libnal = NULL; gmnal_ni_t *gmnalni = NULL; - gmnal_srxd_t *srxd = NULL; - gm_status_t gm_status; - unsigned int local_gmid = 0, global_gmid = 0; + gmnal_rx_t *rx = NULL; + gm_status_t gm_status; ptl_process_id_t process_id; + int rc; if (nal->nal_refct != 0) { if (actual_limits != NULL) { @@ -93,49 +175,36 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, *actual_limits = libnal->libnal_ni.ni_actual_limits; } PORTAL_MODULE_USE; - return (PTL_OK); + return PTL_OK; } /* Called on first PtlNIInit() */ - CDEBUG(D_TRACE, "startup\n"); - PORTAL_ALLOC(gmnalni, sizeof(gmnal_ni_t)); - if (!gmnalni) { - CERROR("can't get memory\n"); - return(PTL_NO_SPACE); + PORTAL_ALLOC(gmnalni, sizeof(*gmnalni)); + if (gmnalni == NULL) { + CERROR("can't allocate gmnalni\n"); + return PTL_FAIL; + } + + PORTAL_ALLOC(libnal, sizeof(*libnal)); + if (libnal == NULL) { + CERROR("can't allocate lib_nal\n"); + goto failed_0; } - memset(gmnalni, 0, sizeof(gmnal_ni_t)); - /* - * set the small message buffer size - */ - - CDEBUG(D_NET, "Allocd and reset gmnalni[%p]\n", gmnalni); - CDEBUG(D_NET, "small_msg_size is [%d]\n", gmnalni->gmni_small_msg_size); - PORTAL_ALLOC(libnal, sizeof(lib_nal_t)); - if (!libnal) { - PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); - return(PTL_NO_SPACE); - } - - memset(libnal, 0, sizeof(lib_nal_t)); - libnal->libnal_send = gmnal_cb_send; - libnal->libnal_send_pages = gmnal_cb_send_pages; - libnal->libnal_recv = gmnal_cb_recv; - libnal->libnal_recv_pages = gmnal_cb_recv_pages; - libnal->libnal_map = NULL; - libnal->libnal_unmap = NULL; - libnal->libnal_dist = gmnal_cb_dist; - libnal->libnal_data = gmnalni; - - CDEBUG(D_NET, "Allocd and reset libnal[%p]\n", libnal); - - gmnalni->gmni_nal = nal; + memset(gmnalni, 0, sizeof(*gmnalni)); gmnalni->gmni_libnal = libnal; - spin_lock_init(&gmnalni->gmni_gm_lock); + *libnal = (lib_nal_t) { + .libnal_send = gmnal_cb_send, + .libnal_send_pages = gmnal_cb_send_pages, + .libnal_recv = gmnal_cb_recv, + .libnal_recv_pages = gmnal_cb_recv_pages, + .libnal_dist = gmnal_cb_dist, + .libnal_data = gmnalni, + }; /* * initialise the interface, @@ -143,12 +212,9 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, CDEBUG(D_NET, "Calling gm_init\n"); if (gm_init() != GM_SUCCESS) { CERROR("call to gm_init failed\n"); - PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); - PORTAL_FREE(libnal, sizeof(lib_nal_t)); - return(PTL_FAIL); + goto failed_1; } - CDEBUG(D_NET, "Calling gm_open with port [%d], " "name [%s], version [%d]\n", gm_port_id, "gmnal", GM_API_VERSION); @@ -158,202 +224,94 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, GM_API_VERSION); spin_unlock(&gmnalni->gmni_gm_lock); - CDEBUG(D_NET, "gm_open 
returned [%d]\n", gm_status); - if (gm_status == GM_SUCCESS) { - CDEBUG(D_NET,"gm_open succeeded port[%p]\n",gmnalni->gmni_port); - } else { - switch(gm_status) { - case(GM_INVALID_PARAMETER): - CERROR("gm_open Failure. Invalid Parameter\n"); - break; - case(GM_BUSY): - CERROR("gm_open Failure. GM Busy\n"); - break; - case(GM_NO_SUCH_DEVICE): - CERROR("gm_open Failure. No such device\n"); - break; - case(GM_INCOMPATIBLE_LIB_AND_DRIVER): - CERROR("gm_open Failure. Incompatile lib and driver\n"); - break; - case(GM_OUT_OF_MEMORY): - CERROR("gm_open Failure. Out of Memory\n"); - break; - default: - CERROR("gm_open Failure. Unknow error code [%d]\n", - gm_status); - break; - } - spin_lock(&gmnalni->gmni_gm_lock); - gm_finalize(); - spin_unlock(&gmnalni->gmni_gm_lock); - PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); - PORTAL_FREE(libnal, sizeof(lib_nal_t)); - return(PTL_FAIL); + if (gm_status != GM_SUCCESS) { + CERROR("Can't open GM port %d: %d (%s)\n", + gm_port_id, gm_status, gmnal_gmstatus2str(gm_status)); + goto failed_2; } - gmnalni->gmni_small_msg_size = sizeof(gmnal_msghdr_t) + - sizeof(ptl_hdr_t) + - PTL_MTU + - 928; /* !! */ - CWARN("Msg size %08x\n", gmnalni->gmni_small_msg_size); - - gmnalni->gmni_small_msg_gmsize = - gm_min_size_for_length(gmnalni->gmni_small_msg_size); - - if (gmnal_alloc_srxd(gmnalni) != 0) { - CERROR("Failed to allocate small rx descriptors\n"); - gmnal_free_txd(gmnalni); - spin_lock(&gmnalni->gmni_gm_lock); - gm_close(gmnalni->gmni_port); - gm_finalize(); - spin_unlock(&gmnalni->gmni_gm_lock); - PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); - PORTAL_FREE(libnal, sizeof(lib_nal_t)); - return(PTL_FAIL); - } + CDEBUG(D_NET,"gm_open succeeded port[%p]\n",gmnalni->gmni_port); + gmnalni->gmni_msg_size = offsetof(gmnal_msg_t, + gmm_u.immediate.gmim_payload[PTL_MTU]); + CWARN("Msg size %08x\n", gmnalni->gmni_msg_size); - /* - * Hang out a bunch of small receive buffers - * In fact hang them all out - */ - for (srxd = gmnalni->gmni_srxd; srxd != NULL; srxd = srxd->rx_next) { - CDEBUG(D_NET, "giving [%p] to gm_provide_recvive_buffer\n", - srxd->rx_buffer); - spin_lock(&gmnalni->gmni_gm_lock); - gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, - srxd->rx_buffer, - srxd->rx_gmsize, - GM_LOW_PRIORITY, 0); - spin_unlock(&gmnalni->gmni_gm_lock); - } - - /* - * Allocate pools of small tx buffers and descriptors - */ - if (gmnal_alloc_txd(gmnalni) != 0) { - CERROR("Failed to allocate small tx descriptors\n"); - spin_lock(&gmnalni->gmni_gm_lock); - gm_close(gmnalni->gmni_port); - gm_finalize(); - spin_unlock(&gmnalni->gmni_gm_lock); - PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); - PORTAL_FREE(libnal, sizeof(lib_nal_t)); - return(PTL_FAIL); + if (gmnal_alloc_rxs(gmnalni) != 0) { + CERROR("Failed to allocate rx descriptors\n"); + goto failed_3; } - /* - * Initialise the portals library - */ - CDEBUG(D_NET, "Getting node id\n"); - spin_lock(&gmnalni->gmni_gm_lock); - gm_status = gm_get_node_id(gmnalni->gmni_port, &local_gmid); - spin_unlock(&gmnalni->gmni_gm_lock); - if (gm_status != GM_SUCCESS) { - gmnal_stop_rxthread(gmnalni); - gmnal_stop_ctthread(gmnalni); - CERROR("can't determine node id\n"); - gmnal_free_txd(gmnalni); - gmnal_free_srxd(gmnalni); - spin_lock(&gmnalni->gmni_gm_lock); - gm_close(gmnalni->gmni_port); - gm_finalize(); - spin_unlock(&gmnalni->gmni_gm_lock); - PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); - PORTAL_FREE(libnal, sizeof(lib_nal_t)); - return(PTL_FAIL); + if (gmnal_alloc_txs(gmnalni) != 0) { + CERROR("Failed to allocate tx descriptors\n"); + goto failed_3; } - 
gmnalni->gmni_local_gmid = local_gmid; - CDEBUG(D_NET, "Local node id is [%u]\n", local_gmid); - - spin_lock(&gmnalni->gmni_gm_lock); - gm_status = gm_node_id_to_global_id(gmnalni->gmni_port, - local_gmid, - &global_gmid); - spin_unlock(&gmnalni->gmni_gm_lock); - if (gm_status != GM_SUCCESS) { - CERROR("failed to obtain global id\n"); - gmnal_stop_rxthread(gmnalni); - gmnal_stop_ctthread(gmnalni); - gmnal_free_txd(gmnalni); - gmnal_free_srxd(gmnalni); - spin_lock(&gmnalni->gmni_gm_lock); - gm_close(gmnalni->gmni_port); - gm_finalize(); - spin_unlock(&gmnalni->gmni_gm_lock); - PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); - PORTAL_FREE(libnal, sizeof(lib_nal_t)); - return(PTL_FAIL); - } - CDEBUG(D_NET, "Global node id is [%u]\n", global_gmid); - gmnalni->gmni_global_gmid = global_gmid; - -/* - pid = gm_getpid(); -*/ process_id.pid = requested_pid; - process_id.nid = global_gmid; + process_id.nid = gmnal_get_local_nid(gmnalni); + if (process_id.nid == PTL_NID_ANY) + goto failed_3; CDEBUG(D_NET, "portals_pid is [%u]\n", process_id.pid); CDEBUG(D_NET, "portals_nid is ["LPU64"]\n", process_id.nid); - CDEBUG(D_PORTALS, "calling lib_init\n"); + /* Hang out a bunch of small receive buffers + * In fact hang them all out */ + for (rx = gmnalni->gmni_rx; rx != NULL; rx = rx->rx_next) + gmnal_post_rx(gmnalni, rx); + if (lib_init(libnal, nal, process_id, requested_limits, actual_limits) != PTL_OK) { CERROR("lib_init failed\n"); - gmnal_stop_rxthread(gmnalni); - gmnal_stop_ctthread(gmnalni); - gmnal_free_txd(gmnalni); - gmnal_free_srxd(gmnalni); - spin_lock(&gmnalni->gmni_gm_lock); - gm_close(gmnalni->gmni_port); - gm_finalize(); - spin_unlock(&gmnalni->gmni_gm_lock); - PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); - PORTAL_FREE(libnal, sizeof(lib_nal_t)); - return(PTL_FAIL); + goto failed_3; } - /* - * Now that we have initialised the portals library, start receive threads, - * we do this to avoid processing messages before we can parse them - */ - gmnal_start_kernel_threads(gmnalni); + /* Now that we have initialised the portals library, start receive + * threads, we do this to avoid processing messages before we can parse + * them */ + rc = gmnal_start_kernel_threads(gmnalni); + if (rc != 0) { + CERROR("Can't start threads: %d\n", rc); + goto failed_3; + } - while (gmnalni->gmni_rxthread_flag != GMNAL_RXTHREADS_STARTED) { - gmnal_yield(1); - CDEBUG(D_NET, "Waiting for receive thread signs of life\n"); - } + rc = libcfs_nal_cmd_register(GMNAL, &gmnal_cmd, libnal->libnal_data); + if (rc != 0) { + CDEBUG(D_NET, "libcfs_nal_cmd_register failed: %d\n", rc); + goto failed_4; + } - CDEBUG(D_NET, "receive thread seems to have started\n"); + CDEBUG(D_NET, "gmnal_init finished\n"); + return PTL_OK; - if (libcfs_nal_cmd_register(GMNAL, &gmnal_cmd, libnal->libnal_data) != 0) { - CDEBUG(D_NET, "libcfs_nal_cmd_register failed\n"); + failed_4: + gmnal_stop_rxthread(gmnalni); + gmnal_stop_ctthread(gmnalni); - /* XXX these cleanup cases should be restructured to - * minimise duplication... 
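From here the rewritten startup funnels every failure through a single unwind ladder (the failed_0 .. failed_4 labels below) instead of repeating the whole cleanup at each exit point, which is exactly the duplication the removed XXX comment complains about. A minimal standalone sketch of the pattern, with hypothetical grab_/drop_ helpers:

extern int  grab_a(void), grab_b(void), grab_c(void);
extern void drop_a(void), drop_b(void);

int startup(void)
{
        if (grab_a() != 0)
                goto failed_0;
        if (grab_b() != 0)
                goto failed_1;
        if (grab_c() != 0)              /* last acquisition */
                goto failed_2;
        return 0;                       /* success */

 failed_2:                              /* undo in reverse order */
        drop_b();
 failed_1:
        drop_a();
 failed_0:
        return -1;
}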
*/ - lib_fini(libnal); - - gmnal_stop_rxthread(gmnalni); - gmnal_stop_ctthread(gmnalni); - gmnal_free_txd(gmnalni); - gmnal_free_srxd(gmnalni); - spin_lock(&gmnalni->gmni_gm_lock); - gm_close(gmnalni->gmni_port); - gm_finalize(); - spin_unlock(&gmnalni->gmni_gm_lock); - PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); - PORTAL_FREE(libnal, sizeof(lib_nal_t)); - return(PTL_FAIL); - } + failed_3: + spin_lock(&gmnalni->gmni_gm_lock); + gm_close(gmnalni->gmni_port); + spin_unlock(&gmnalni->gmni_gm_lock); - CDEBUG(D_NET, "gmnal_init finished\n"); + failed_2: + spin_lock(&gmnalni->gmni_gm_lock); + gm_finalize(); + spin_unlock(&gmnalni->gmni_gm_lock); + + /* safe to free buffers after network has been shut down */ + gmnal_free_txs(gmnalni); + gmnal_free_rxs(gmnalni); - return(PTL_OK); + failed_1: + PORTAL_FREE(libnal, sizeof(*libnal)); + + failed_0: + PORTAL_FREE(gmnalni, sizeof(*gmnalni)); + + return PTL_FAIL; } -nal_t the_gm_nal; +ptl_handle_ni_t kgmnal_ni; +nal_t the_gm_nal; /* * Called when module loaded diff --git a/lnet/klnds/gmlnd/gmlnd_cb.c b/lnet/klnds/gmlnd/gmlnd_cb.c index ac4c485e..d7e7f5b 100644 --- a/lnet/klnds/gmlnd/gmlnd_cb.c +++ b/lnet/klnds/gmlnd/gmlnd_cb.c @@ -27,26 +27,35 @@ #include "gmnal.h" -ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - unsigned int niov, struct iovec *iov, size_t offset, - size_t mlen, size_t rlen) +ptl_err_t +gmnal_cb_recv(lib_nal_t *libnal, void *private, + lib_msg_t *libmsg, + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen) { - gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; + gmnal_rx_t *rx = (gmnal_rx_t*)private; + gmnal_msg_t *msg = rx->rx_msg; size_t nobleft = mlen; - void *buffer = NULL; + int rxnob; + char *buffer; size_t nob; - CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], " + CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], libmsg[%p], " "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n", - libnal, private, cookie, niov, iov, offset, mlen, rlen); + libnal, private, libmsg, niov, iov, offset, mlen, rlen); - LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE); + LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE); - buffer = srxd->rx_buffer; - buffer += sizeof(gmnal_msghdr_t); - buffer += sizeof(ptl_hdr_t); - - while(nobleft > 0) { + buffer = &msg->gmm_u.immediate.gmim_payload[0]; + rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]); + + if (rx->rx_recv_nob < rxnob) { + CERROR("Short message from nid "LPD64": got %d, need %d\n", + msg->gmm_srcnid, rx->rx_recv_nob, rxnob); + return PTL_FAIL; + } + + while (nobleft > 0) { LASSERT (niov > 0); if (offset >= iov->iov_len) { @@ -64,32 +73,40 @@ ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie, iov++; } - lib_finalize(libnal, private, cookie, PTL_OK); + lib_finalize(libnal, private, libmsg, PTL_OK); return PTL_OK; } -ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, - lib_msg_t *cookie, unsigned int nkiov, - ptl_kiov_t *kiov, size_t offset, size_t mlen, - size_t rlen) +ptl_err_t +gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, + lib_msg_t *libmsg, + unsigned int nkiov, ptl_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) { - gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; + gmnal_rx_t *rx = (gmnal_rx_t*)private; + gmnal_msg_t *msg = rx->rx_msg; size_t nobleft = mlen; + int rxnob; size_t nob; char *ptr; void *buffer; CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], " - "cookie[%p], kniov[%d], kiov [%p], " + "libmsg[%p], kniov[%d], 
kiov [%p], " "offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n", - libnal, private, cookie, nkiov, kiov, offset, mlen, rlen); + libnal, private, libmsg, nkiov, kiov, offset, mlen, rlen); - LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE); + LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE); - buffer = srxd->rx_buffer; - buffer += sizeof(gmnal_msghdr_t); - buffer += sizeof(ptl_hdr_t); + buffer = &msg->gmm_u.immediate.gmim_payload[0]; + rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]); + if (rx->rx_recv_nob < rxnob) { + CERROR("Short message from nid "LPD64": got %d, need %d\n", + msg->gmm_srcnid, rx->rx_recv_nob, rxnob); + return PTL_FAIL; + } + while (nobleft > 0) { LASSERT (nkiov > 0); @@ -113,24 +130,23 @@ ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, nkiov--; } - lib_finalize(libnal, private, cookie, PTL_OK); - + lib_finalize(libnal, private, libmsg, PTL_OK); return PTL_OK; } - -ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, struct iovec *iov, - size_t offset, size_t len) +ptl_err_t +gmnal_cb_send(lib_nal_t *libnal, void *private, + lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, + ptl_nid_t nid, ptl_pid_t pid, + unsigned int niov, struct iovec *iov, + size_t offset, size_t len) { gmnal_ni_t *gmnalni = libnal->libnal_data; - void *buffer = NULL; - gmnal_stxd_t *stxd = NULL; size_t nobleft = len; + void *buffer; + gmnal_tx_t *tx; size_t nob; - ptl_err_t rc; CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] " "len["LPSZ"] nid["LPU64"]\n", niov, offset, len, nid); @@ -140,13 +156,13 @@ ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie, return PTL_FAIL; } - stxd = gmnal_get_stxd(gmnalni, 1); - CDEBUG(D_NET, "stxd [%p]\n", stxd); + tx = gmnal_get_tx(gmnalni, 1); - /* Set the offset of the data to copy into the buffer */ - buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t); + gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE); + gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr)); - while(nobleft > 0) { + buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0]; + while (nobleft > 0) { LASSERT (niov > 0); if (offset >= iov->iov_len) { @@ -163,27 +179,24 @@ ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie, niov--; iov++; } - - rc = gmnal_small_tx(libnal, private, cookie, hdr, type, - nid, stxd, len); - if (rc != PTL_OK) - gmnal_return_stxd(gmnalni, stxd); - - return rc; + + nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]); + return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob); } -ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private, - lib_msg_t *cookie, ptl_hdr_t *hdr, int type, - ptl_nid_t nid, ptl_pid_t pid, unsigned int nkiov, - ptl_kiov_t *kiov, size_t offset, size_t len) +ptl_err_t +gmnal_cb_send_pages(lib_nal_t *libnal, void *private, + lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, + ptl_nid_t nid, ptl_pid_t pid, + unsigned int nkiov, ptl_kiov_t *kiov, + size_t offset, size_t len) { gmnal_ni_t *gmnalni = libnal->libnal_data; - void *buffer = NULL; - gmnal_stxd_t *stxd = NULL; size_t nobleft = len; + void *buffer; + gmnal_tx_t *tx; char *ptr; - ptl_err_t rc; size_t nob; CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset[" @@ -194,12 +207,12 @@ ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private, return PTL_FAIL; } - stxd = gmnal_get_stxd(gmnalni, 1); - CDEBUG(D_NET, "stxd [%p]\n", stxd); + tx = gmnal_get_tx(gmnalni, 
1); - /* Set the offset of the data to copy into the buffer */ - buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t); + gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE); + gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr)); + buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0]; while (nobleft > 0) { LASSERT (nkiov > 0); @@ -223,16 +236,12 @@ ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private, kiov++; } - rc = gmnal_small_tx(libnal, private, cookie, hdr, type, - nid, stxd, len); - - if (rc != PTL_OK) - gmnal_return_stxd(gmnalni, stxd); - - return rc; + nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]); + return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob); } -int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist) +int +gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist) { CDEBUG(D_TRACE, "gmnal_cb_dist\n"); diff --git a/lnet/klnds/gmlnd/gmlnd_comm.c b/lnet/klnds/gmlnd/gmlnd_comm.c index 4720099..b5213a5 100644 --- a/lnet/klnds/gmlnd/gmlnd_comm.c +++ b/lnet/klnds/gmlnd/gmlnd_comm.c @@ -25,6 +25,96 @@ #include "gmnal.h" +void +gmnal_pack_msg(gmnal_ni_t *gmnalni, gmnal_tx_t *tx, + ptl_nid_t dstnid, int type) +{ + gmnal_msg_t *msg = tx->tx_msg; + + /* CAVEAT EMPTOR! this only sets the common message fields. */ + msg->gmm_magic = GMNAL_MSG_MAGIC; + msg->gmm_version = GMNAL_MSG_VERSION; + msg->gmm_type = type; + msg->gmm_srcnid = gmnalni->gmni_libnal->libnal_ni.ni_pid.nid; + msg->gmm_dstnid = dstnid; +} + +int +gmnal_unpack_msg(gmnal_ni_t *gmnalni, gmnal_rx_t *rx) +{ + gmnal_msg_t *msg = rx->rx_msg; + const int hdr_size = offsetof(gmnal_msg_t, gmm_u); + int flip; + + /* 6 bytes are enough to have received magic + version */ + if (rx->rx_recv_nob < 6) { + CERROR("Short message from gmid %u: %d\n", + rx->rx_recv_gmid, rx->rx_recv_nob); + return -EPROTO; + } + + if (msg->gmm_magic == GMNAL_MSG_MAGIC) { + flip = 0; + } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) { + flip = 1; + } else { + CERROR("Bad magic from gmid %u: %08x\n", + rx->rx_recv_gmid, msg->gmm_magic); + return -EPROTO; + } + + if (msg->gmm_version != + (flip ? 
__swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) { + CERROR("Bad version from gmid %u: %d\n", + rx->rx_recv_gmid, msg->gmm_version); + return -EPROTO; + } + + if (rx->rx_recv_nob < hdr_size) { + CERROR("Short message from %u: %d\n", + rx->rx_recv_gmid, rx->rx_recv_nob); + return -EPROTO; + } + + if (flip) { + /* leave magic unflipped as a clue to peer endianness */ + __swab16s(&msg->gmm_version); + __swab16s(&msg->gmm_type); + __swab64s(&msg->gmm_srcnid); + __swab64s(&msg->gmm_dstnid); + } + + if (msg->gmm_srcnid == PTL_NID_ANY) { + CERROR("Bad src nid from %u: "LPX64"\n", + rx->rx_recv_gmid, msg->gmm_srcnid); + return -EPROTO; + } + + if (msg->gmm_dstnid != gmnalni->gmni_libnal->libnal_ni.ni_pid.nid) { + CERROR("Bad dst nid from %u: "LPX64"\n", + rx->rx_recv_gmid, msg->gmm_dstnid); + return -EPROTO; + } + + switch (msg->gmm_type) { + default: + CERROR("Unknown message type from %u: %x\n", + rx->rx_recv_gmid, msg->gmm_type); + return -EPROTO; + + case GMNAL_MSG_IMMEDIATE: + if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) { + CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n", + rx->rx_recv_gmid, rx->rx_recv_nob, + offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])); + return -EPROTO; + } + break; + } + return 0; +} + + /* * The caretaker thread * This is main thread of execution for the NAL side @@ -36,74 +126,38 @@ int gmnal_ct_thread(void *arg) { - gmnal_ni_t *gmnalni; + gmnal_ni_t *gmnalni = arg; gm_recv_event_t *rxevent = NULL; gm_recv_t *recv = NULL; - if (!arg) { - CDEBUG(D_NET, "NO gmnalni. Exiting\n"); - return(-1); - } - - gmnalni = (gmnal_ni_t*)arg; - CDEBUG(D_NET, "gmnalni is [%p]\n", arg); - sprintf(current->comm, "gmnal_ct"); - kportal_daemonize("gmnalctd"); gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED; - spin_lock(&gmnalni->gmni_gm_lock); while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) { - CDEBUG(D_NET, "waiting\n"); + + spin_lock(&gmnalni->gmni_gm_lock); rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port); + spin_unlock(&gmnalni->gmni_gm_lock); + if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) { CDEBUG(D_NET, "time to exit\n"); break; } - CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent(rxevent)); - switch (GM_RECV_EVENT_TYPE(rxevent)) { - - case(GM_RECV_EVENT): - CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n"); - recv = (gm_recv_t*)&rxevent->recv; - spin_unlock(&gmnalni->gmni_gm_lock); - gmnal_add_rxtwe(gmnalni, recv); - spin_lock(&gmnalni->gmni_gm_lock); - CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n"); - break; - case(_GM_SLEEP_EVENT): - /* - * Blocking receive above just returns - * immediatly with _GM_SLEEP_EVENT - * Don't know what this is - */ - CDEBUG(D_NET, "Sleeping in gm_unknown\n"); - spin_unlock(&gmnalni->gmni_gm_lock); - gm_unknown(gmnalni->gmni_port, rxevent); - spin_lock(&gmnalni->gmni_gm_lock); - CDEBUG(D_NET, "Awake from gm_unknown\n"); - break; - - default: - /* - * Don't know what this is - * gm_unknown will make sense of it - * Should be able to do something with - * FAST_RECV_EVENTS here. 
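gmnal_unpack_msg() above never byte-swaps blindly: the deliberately unflipped magic tells the receiver the peer's endianness, and only then is the version compared, swabbed if necessary. The prologue check, condensed:

/* Sketch of the magic/version handshake, using the kernel's __swab
 * helpers as the code above does. */
static int check_prologue(__u32 magic, __u16 version, int *flip)
{
        if (magic == GMNAL_MSG_MAGIC)
                *flip = 0;                      /* same endianness */
        else if (magic == __swab32(GMNAL_MSG_MAGIC))
                *flip = 1;                      /* peer is foreign */
        else
                return -EPROTO;                 /* not our message at all */

        if (version != (*flip ? __swab16(GMNAL_MSG_VERSION)
                              : GMNAL_MSG_VERSION))
                return -EPROTO;                 /* incompatible version */
        return 0;
}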
-			 */
-			CDEBUG(D_NET, "Passing event to gm_unknown\n");
-			spin_unlock(&gmnalni->gmni_gm_lock);
-			gm_unknown(gmnalni->gmni_port, rxevent);
-			spin_lock(&gmnalni->gmni_gm_lock);
-			CDEBUG(D_NET, "Processed unknown event\n");
+
+		CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent2str(rxevent));
+
+		if (GM_RECV_EVENT_TYPE(rxevent) == GM_RECV_EVENT) {
+			recv = (gm_recv_t*)&rxevent->recv;
+			gmnal_enqueue_rx(gmnalni, recv);
+		} else {
+			gm_unknown(gmnalni->gmni_port, rxevent);
 		}
 	}
-	spin_unlock(&gmnalni->gmni_gm_lock);
+
 	gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
 	CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
-
 	return 0;
 }
 
@@ -114,19 +168,10 @@ gmnal_ct_thread(void *arg)
 int
 gmnal_rx_thread(void *arg)
 {
-	char		name[16];
-	gmnal_ni_t	*gmnalni;
-	void		*buffer;
-	gmnal_rxtwe_t	*we = NULL;
-	int		rank;
-
-	if (!arg) {
-		CDEBUG(D_NET, "NO gmnalni. Exiting\n");
-		return(-1);
-	}
-
-	gmnalni = (gmnal_ni_t*)arg;
-	CDEBUG(D_NET, "gmnalni is [%p]\n", arg);
+	gmnal_ni_t	*gmnalni = arg;
+	char		name[16];
+	gmnal_rx_t	*rx;
+	int		rank;
 
 	for (rank=0; rank<num_rx_threads; rank++)
 		if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
@@ -144,385 +189,160 @@
 		gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
 	else
 		gmnalni->gmni_rxthread_flag = 1;
-	CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
 	spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
 
 	while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
 		CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
-		we = gmnal_get_rxtwe(gmnalni);
-		if (!we) {
+
+		rx = gmnal_dequeue_rx(gmnalni);
+		if (rx == NULL) {
 			CDEBUG(D_NET, "Receive thread time to exit\n");
 			break;
 		}
-
-		buffer = we->buffer;
-		switch(((gmnal_msghdr_t*)buffer)->gmm_type) {
-		case(GMNAL_SMALL_MESSAGE):
-			gmnal_pre_receive(gmnalni, we, GMNAL_SMALL_MESSAGE);
-			break;
-		default:
-#warning better handling
-			CERROR("Unsupported message type\n");
-			gmnal_rx_bad(gmnalni, we);
-		}
-		PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
+
+		/* We're connectionless: simply ignore packets on error */
+
+		if (gmnal_unpack_msg(gmnalni, rx) == 0) {
+
+			LASSERT (rx->rx_msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+			(void)lib_parse(gmnalni->gmni_libnal,
+					&rx->rx_msg->gmm_u.immediate.gmim_hdr,
+					rx);
+		}
+
+		gmnal_post_rx(gmnalni, rx);
 	}
 
 	spin_lock(&gmnalni->gmni_rxthread_flag_lock);
-	gmnalni->gmni_rxthread_flag/=2;
-	CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
+	gmnalni->gmni_rxthread_flag /= 2;
 	spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
 
-	CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
+	CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
 	return 0;
 }
 
-
-
-/*
- * Start processing a small message receive
- * Get here from gmnal_receive_thread
- * Hand off to lib_parse, which calls cb_recv
- * which hands back to gmnal_small_receive
- * Deal with all endian stuff here.
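gmnal_enqueue_rx()/gmnal_dequeue_rx() themselves are not part of this hunk; the real enqueue presumably also maps the gm_recv_t back to its gmnal_rx_t through gmni_rx_hash before queueing. Given the gmni_rxq, gmni_rxq_lock and gmni_rxq_wait fields declared in gmnal_ni_t above, a plausible shape for the queue half is:

static void rxq_put(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
{
        spin_lock(&gmnalni->gmni_rxq_lock);
        list_add_tail(&rx->rx_list, &gmnalni->gmni_rxq);
        spin_unlock(&gmnalni->gmni_rxq_lock);
        up(&gmnalni->gmni_rxq_wait);            /* wake one rx thread */
}

static gmnal_rx_t *rxq_get(gmnal_ni_t *gmnalni)
{
        gmnal_rx_t *rx = NULL;

        down(&gmnalni->gmni_rxq_wait);          /* sleep until work or stop */

        spin_lock(&gmnalni->gmni_rxq_lock);
        if (!list_empty(&gmnalni->gmni_rxq)) {
                rx = list_entry(gmnalni->gmni_rxq.next, gmnal_rx_t, rx_list);
                list_del(&rx->rx_list);
        }
        spin_unlock(&gmnalni->gmni_rxq_lock);
        return rx;                              /* NULL means shut down */
}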
- */ void -gmnal_pre_receive(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we, int gmnal_type) +gmnal_post_rx(gmnal_ni_t *gmnalni, gmnal_rx_t *rx) { - gmnal_srxd_t *srxd = NULL; - void *buffer = NULL; - gmnal_msghdr_t *gmnal_msghdr; - ptl_hdr_t *portals_hdr; + CDEBUG(D_NET, "requeueing rx[%p] gmnalni[%p]\n", rx, gmnalni); - CDEBUG(D_NET, "gmnalni [%p], we[%p] type [%d]\n", - gmnalni, we, gmnal_type); - - buffer = we->buffer; - - gmnal_msghdr = (gmnal_msghdr_t*)buffer; - portals_hdr = (ptl_hdr_t*)(buffer+sizeof(gmnal_msghdr_t)); + spin_lock(&gmnalni->gmni_gm_lock); + gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, rx->rx_msg, + rx->rx_gmsize, GM_LOW_PRIORITY, 0 ); + spin_unlock(&gmnalni->gmni_gm_lock); +} - CDEBUG(D_NET, "rx_event:: Sender node [%d], Sender Port [%d], " - "type [%d], length [%d], buffer [%p]\n", - we->snode, we->sport, we->type, we->length, buffer); - CDEBUG(D_NET, "gmnal_msghdr:: Sender node [%u], magic [%d], " - "gmnal_type [%d]\n", gmnal_msghdr->gmm_sender_gmid, - gmnal_msghdr->gmm_magic, gmnal_msghdr->gmm_type); - CDEBUG(D_NET, "portals_hdr:: Sender node ["LPD64"], " - "dest_node ["LPD64"]\n", portals_hdr->src_nid, - portals_hdr->dest_nid); +void +gmnal_resume_sending_callback(struct gm_port *gm_port, void *context, + gm_status_t status) +{ + gmnal_tx_t *tx = (gmnal_tx_t*)context; + gmnal_ni_t *gmnalni = tx->tx_gmni; + lib_msg_t *libmsg = tx->tx_libmsg; - /* - * Get a receive descriptor for this message - */ - srxd = gmnal_rxbuffer_to_srxd(gmnalni, buffer); - CDEBUG(D_NET, "Back from gmnal_rxbuffer_to_srxd\n"); - if (!srxd) { - CERROR("Failed to get receive descriptor\n"); - LBUG(); - } + CWARN("status for tx [%p] is [%d][%s]\n", + tx, status, gmnal_gmstatus2str(status)); - srxd->rx_gmni = gmnalni; - srxd->rx_type = gmnal_type; - srxd->rx_nsiov = gmnal_msghdr->gmm_niov; - srxd->rx_sender_gmid = gmnal_msghdr->gmm_sender_gmid; + gmnal_return_tx(gmnalni, tx); + lib_finalize(gmnalni->gmni_libnal, NULL, libmsg, PTL_FAIL); +} - CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n", - buffer+sizeof(gmnal_msghdr_t)); +void +gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, + gm_status_t status) +{ + gmnal_tx_t *tx = (gmnal_tx_t*)context; + gmnal_ni_t *gmnalni = tx->tx_gmni; - (void)lib_parse(gmnalni->gmni_libnal, portals_hdr, srxd); - /* Ignore error; we're connectionless */ + CERROR("status for tx [%p] is [%d][%s]\n", + tx, status, gmnal_gmstatus2str(status)); - gmnal_rx_requeue_buffer(gmnalni, srxd); + gm_resume_sending(gmnalni->gmni_port, tx->tx_gm_priority, + tx->tx_gmlid, gm_port_id, + gmnal_resume_sending_callback, tx); } - - -/* - * After a receive has been processed, - * hang out the receive buffer again. - * This implicitly returns a receive token. 
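gmnal_post_rx() above closes the receive-token loop: every buffer GM completes into must eventually be re-provided, or the port runs out of receive tokens and stops delivering. The lifecycle as a sketch, using the same GM calls:

/* Keep one receive buffer permanently in flight. Assumes an open
 * 'port' and a gm_dma_malloc()ed 'buf' of 'nob' bytes. */
static void post_rx_buffer(struct gm_port *port, void *buf, int nob)
{
        gm_size_t gmsize = gm_min_size_for_length(nob);

        /* consumes one receive token; GM hands the token back with the
         * buffer in a GM_RECV_EVENT, after which the buffer must be
         * re-provided (gmnal_post_rx above) to keep receiving */
        gm_provide_receive_buffer_with_tag(port, buf, gmsize,
                                           GM_LOW_PRIORITY, 0);
}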
- */ -void -gmnal_rx_requeue_buffer(gmnal_ni_t *gmnalni, gmnal_srxd_t *srxd) +void +gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) { - CDEBUG(D_NET, "requeueing srxd[%p] gmnalni[%p]\n", srxd, gmnalni); - - spin_lock(&gmnalni->gmni_gm_lock); - gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, srxd->rx_buffer, - srxd->rx_gmsize, GM_LOW_PRIORITY, 0 ); - spin_unlock(&gmnalni->gmni_gm_lock); -} + gmnal_tx_t *tx = (gmnal_tx_t*)context; + gmnal_ni_t *gmnalni = tx->tx_gmni; + lib_nal_t *libnal = gmnalni->gmni_libnal; + lib_msg_t *libmsg = tx->tx_libmsg; + ptl_err_t rc; + if (!tx) { + CERROR("send completion event for unknown tx\n"); + return; + } -/* - * Handle a bad message - * A bad message is one we don't expect or can't interpret - */ -void -gmnal_rx_bad(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we) -{ - gmnal_srxd_t *srxd = gmnal_rxbuffer_to_srxd(gmnalni, - we->buffer); - if (srxd == NULL) { - CERROR("Can't find a descriptor for this buffer\n"); + switch(status) { + case(GM_SUCCESS): + rc = PTL_OK; + break; + + case(GM_SEND_DROPPED): + rc = PTL_FAIL; + break; + + default: + CERROR("Error %d(%s), nid "LPD64"\n", + status, gmnal_gmstatus2str(status), tx->tx_nid); + + spin_lock(&gmnalni->gmni_gm_lock); + gm_drop_sends(gmnalni->gmni_port, tx->tx_gm_priority, + tx->tx_gmlid, gm_port_id, + gmnal_drop_sends_callback, tx); + spin_unlock(&gmnalni->gmni_gm_lock); return; } - gmnal_rx_requeue_buffer(gmnalni, srxd); + gmnal_return_tx(gmnalni, tx); + lib_finalize(libnal, NULL, libmsg, rc); + return; } - - -/* - * Start a small transmit. - * Use the given send token (and wired transmit buffer). - * Copy headers to wired buffer and initiate gm_send from the wired buffer. - * The callback function informs when the send is complete. - */ ptl_err_t -gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, - gmnal_stxd_t *stxd, int size) +gmnal_post_tx (gmnal_ni_t *gmnalni, gmnal_tx_t *tx, + lib_msg_t *libmsg, ptl_nid_t nid, int nob) { - gmnal_ni_t *gmnalni = (gmnal_ni_t*)libnal->libnal_data; - void *buffer = NULL; - gmnal_msghdr_t *msghdr = NULL; - int tot_size = 0; - gm_status_t gm_status = GM_SUCCESS; - - CDEBUG(D_NET, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] " - "hdr [%p] type [%d] nid ["LPU64"] stxd [%p] " - "size [%d]\n", libnal, private, cookie, hdr, type, - nid, stxd, size); + gm_status_t gm_status; - CDEBUG(D_NET, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n", - hdr->dest_nid, hdr->src_nid); + CDEBUG(D_NET, "send %d bytes to "LPU64"\n", nob, nid); LASSERT ((nid >> 32) == 0); - LASSERT (gmnalni != NULL); spin_lock(&gmnalni->gmni_gm_lock); gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid, - &stxd->tx_gmlid); + &tx->tx_gmlid); spin_unlock(&gmnalni->gmni_gm_lock); if (gm_status != GM_SUCCESS) { CERROR("Failed to obtain local id\n"); - return(PTL_FAIL); + gmnal_return_tx(gmnalni, tx); + return PTL_FAIL; } CDEBUG(D_NET, "Local Node_id is [%u][%x]\n", - stxd->tx_gmlid, stxd->tx_gmlid); + tx->tx_gmlid, tx->tx_gmlid); - stxd->tx_nid = nid; - stxd->tx_cookie = cookie; - stxd->tx_type = GMNAL_SMALL_MESSAGE; - stxd->tx_gm_priority = GM_LOW_PRIORITY; - - /* - * Copy gmnal_msg_hdr and portals header to the transmit buffer - * Then send the message, as the data has previously been copied in - * (HP SFS 1380). 
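A NID on this NAL is simply the GM global id, which is stable across the fabric; sends need the GM local (routing) node id, so gmnal_post_tx() above translates on every send. The check-and-translate step in isolation:

/* Sketch: portals NID -> GM local node id (gmnal_post_tx does this
 * under gmni_gm_lock). */
static int nid_to_gm_node_id(struct gm_port *port, ptl_nid_t nid,
                             unsigned int *node_id)
{
        gm_status_t gm_status;

        if ((nid >> 32) != 0)           /* GM global ids are 32 bit */
                return -EINVAL;

        gm_status = gm_global_id_to_node_id(port, (__u32)nid, node_id);
        return (gm_status == GM_SUCCESS) ? 0 : -EIO;
}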
- */ - buffer = stxd->tx_buffer; - msghdr = (gmnal_msghdr_t*)buffer; - - msghdr->gmm_magic = GMNAL_MAGIC; - msghdr->gmm_type = GMNAL_SMALL_MESSAGE; - msghdr->gmm_sender_gmid = gmnalni->gmni_global_gmid; - CDEBUG(D_NET, "processing msghdr at [%p]\n", buffer); - - buffer += sizeof(gmnal_msghdr_t); - - CDEBUG(D_NET, "processing portals hdr at [%p]\n", buffer); - gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t)); - - buffer += sizeof(ptl_hdr_t); - - CDEBUG(D_NET, "sending\n"); - tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t); - stxd->tx_msg_size = tot_size; + tx->tx_nid = nid; + tx->tx_libmsg = libmsg; + tx->tx_gm_priority = GM_LOW_PRIORITY; + tx->tx_msg_size = nob; CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] " "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] " - "stxd [%p]\n", gmnalni->gmni_port, stxd->tx_buffer, - stxd->tx_gm_size, stxd->tx_msg_size, nid, stxd->tx_gmlid, - stxd); + "tx [%p]\n", gmnalni->gmni_port, tx->tx_msg, + tx->tx_gm_size, tx->tx_msg_size, + tx->tx_nid, tx->tx_gmlid, tx); spin_lock(&gmnalni->gmni_gm_lock); - gm_send_to_peer_with_callback(gmnalni->gmni_port, stxd->tx_buffer, - stxd->tx_gm_size, stxd->tx_msg_size, - stxd->tx_gm_priority, stxd->tx_gmlid, - gmnal_small_tx_callback, (void*)stxd); + gm_send_to_peer_with_callback(gmnalni->gmni_port, tx->tx_msg, + tx->tx_gm_size, tx->tx_msg_size, + tx->tx_gm_priority, tx->tx_gmlid, + gmnal_tx_callback, (void*)tx); spin_unlock(&gmnalni->gmni_gm_lock); - CDEBUG(D_NET, "done\n"); - return(PTL_OK); + return PTL_OK; } - - -/* - * A callback to indicate the small transmit operation is compete - * Check for erros and try to deal with them. - * Call lib_finalise to inform the client application that the send - * is complete and the memory can be reused. - * Return the stxd when finished with it (returns a send token) - */ -void -gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) -{ - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - lib_msg_t *cookie = stxd->tx_cookie; - gmnal_ni_t *gmnalni = stxd->tx_gmni; - lib_nal_t *libnal = gmnalni->gmni_libnal; - - if (!stxd) { - CDEBUG(D_NET, "send completion event for unknown stxd\n"); - return; - } - if (status != GM_SUCCESS) - CERROR("Result of send stxd [%p] is [%s] to ["LPU64"]\n", - stxd, gmnal_gm_error(status), stxd->tx_nid); - - switch(status) { - case(GM_SUCCESS): - break; - - - - case(GM_SEND_DROPPED): - /* - * do a resend on the dropped ones - */ - CERROR("send stxd [%p] dropped, resending\n", context); - spin_lock(&gmnalni->gmni_gm_lock); - gm_send_to_peer_with_callback(gmnalni->gmni_port, - stxd->tx_buffer, - stxd->tx_gm_size, - stxd->tx_msg_size, - stxd->tx_gm_priority, - stxd->tx_gmlid, - gmnal_small_tx_callback, - context); - spin_unlock(&gmnalni->gmni_gm_lock); - return; - case(GM_TIMED_OUT): - case(GM_SEND_TIMED_OUT): - /* - * drop these ones - */ - CDEBUG(D_NET, "calling gm_drop_sends\n"); - spin_lock(&gmnalni->gmni_gm_lock); - gm_drop_sends(gmnalni->gmni_port, stxd->tx_gm_priority, - stxd->tx_gmlid, gm_port_id, - gmnal_drop_sends_callback, context); - spin_unlock(&gmnalni->gmni_gm_lock); - - return; - - - /* - * abort on these ? 
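Where the removed code tried to classify every GM status individually, the new gmnal_tx_callback() above collapses this to three cases; for the fatal ones, recovery is a fixed two-callback sequence, summarized here:

/*
 * tx_callback(status other than GM_SUCCESS / GM_SEND_DROPPED)
 *   -> gm_drop_sends()               flush sends queued to that peer
 *      -> gmnal_drop_sends_callback()
 *         -> gm_resume_sending()     unwedge the port's send queue
 *            -> gmnal_resume_sending_callback()
 *               -> gmnal_return_tx() + lib_finalize(..., PTL_FAIL)
 */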
- */ - case(GM_TRY_AGAIN): - case(GM_INTERRUPTED): - case(GM_FAILURE): - case(GM_INPUT_BUFFER_TOO_SMALL): - case(GM_OUTPUT_BUFFER_TOO_SMALL): - case(GM_BUSY): - case(GM_MEMORY_FAULT): - case(GM_INVALID_PARAMETER): - case(GM_OUT_OF_MEMORY): - case(GM_INVALID_COMMAND): - case(GM_PERMISSION_DENIED): - case(GM_INTERNAL_ERROR): - case(GM_UNATTACHED): - case(GM_UNSUPPORTED_DEVICE): - case(GM_SEND_REJECTED): - case(GM_SEND_TARGET_PORT_CLOSED): - case(GM_SEND_TARGET_NODE_UNREACHABLE): - case(GM_SEND_PORT_CLOSED): - case(GM_NODE_ID_NOT_YET_SET): - case(GM_STILL_SHUTTING_DOWN): - case(GM_CLONE_BUSY): - case(GM_NO_SUCH_DEVICE): - case(GM_ABORTED): - case(GM_INCOMPATIBLE_LIB_AND_DRIVER): - case(GM_UNTRANSLATED_SYSTEM_ERROR): - case(GM_ACCESS_DENIED): - case(GM_NO_DRIVER_SUPPORT): - case(GM_PTE_REF_CNT_OVERFLOW): - case(GM_NOT_SUPPORTED_IN_KERNEL): - case(GM_NOT_SUPPORTED_ON_ARCH): - case(GM_NO_MATCH): - case(GM_USER_ERROR): - case(GM_DATA_CORRUPTED): - case(GM_HARDWARE_FAULT): - case(GM_SEND_ORPHANED): - case(GM_MINOR_OVERFLOW): - case(GM_PAGE_TABLE_FULL): - case(GM_UC_ERROR): - case(GM_INVALID_PORT_NUMBER): - case(GM_DEV_NOT_FOUND): - case(GM_FIRMWARE_NOT_RUNNING): - case(GM_YP_NO_MATCH): - default: - gm_resume_sending(gmnalni->gmni_port, stxd->tx_gm_priority, - stxd->tx_gmlid, gm_port_id, - gmnal_resume_sending_callback, context); - return; - - } - - gmnal_return_stxd(gmnalni, stxd); - lib_finalize(libnal, stxd, cookie, PTL_OK); - return; -} - -/* - * After an error on the port - * call this to allow future sends to complete - */ -void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context, - gm_status_t status) -{ - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - gmnal_ni_t *gmnalni = stxd->tx_gmni; - - CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context); - gmnal_return_stxd(gmnalni, stxd); - lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL); - return; -} - - -void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, - gm_status_t status) -{ - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - gmnal_ni_t *gmnalni = stxd->tx_gmni; - - CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context); - if (status == GM_SUCCESS) { - spin_lock(&gmnalni->gmni_gm_lock); - gm_send_to_peer_with_callback(gm_port, stxd->tx_buffer, - stxd->tx_gm_size, - stxd->tx_msg_size, - stxd->tx_gm_priority, - stxd->tx_gmlid, - gmnal_small_tx_callback, - context); - spin_unlock(&gmnalni->gmni_gm_lock); - } else { - CERROR("send_to_peer status for stxd [%p] is " - "[%d][%s]\n", stxd, status, gmnal_gm_error(status)); - /* Recycle the stxd */ - gmnal_return_stxd(gmnalni, stxd); - lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL); - } - - return; -} - - diff --git a/lnet/klnds/gmlnd/gmlnd_module.c b/lnet/klnds/gmlnd/gmlnd_module.c index 7a7a907..446f265 100644 --- a/lnet/klnds/gmlnd/gmlnd_module.c +++ b/lnet/klnds/gmlnd/gmlnd_module.c @@ -28,64 +28,9 @@ * See start_kernel_threads */ int num_rx_threads = -1; -int num_stxds = 5; +int num_txds = 5; int gm_port_id = 4; -int -gmnal_cmd(struct portals_cfg *pcfg, void *private) -{ - gmnal_ni_t *gmnalni = NULL; - char *name = NULL; - int nid = -2; - int gmid; - gm_status_t gm_status; - - - CDEBUG(D_TRACE, "gmnal_cmd [%d] private [%p]\n", - pcfg->pcfg_command, private); - gmnalni = (gmnal_ni_t*)private; - switch(pcfg->pcfg_command) { - /* - * just reuse already defined GET_NID. 
Should define GMNAL version - */ - case(GMNAL_IOC_GET_GNID): - - PORTAL_ALLOC(name, pcfg->pcfg_plen1); - copy_from_user(name, PCFG_PBUF(pcfg, 1), pcfg->pcfg_plen1); - - spin_lock(&gmnalni->gmni_gm_lock); - //nid = gm_host_name_to_node_id(gmnalni->gmni_port, name); - gm_status = gm_host_name_to_node_id_ex(gmnalni->gmni_port, 0, - name, &nid); - spin_unlock(&gmnalni->gmni_gm_lock); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_NET, "gm_host_name_to_node_id_ex(...host %s) " - "failed[%d]\n", name, gm_status); - return (-1); - } else - CDEBUG(D_NET, "Local node %s id is [%d]\n", name, nid); - spin_lock(&gmnalni->gmni_gm_lock); - gm_status = gm_node_id_to_global_id(gmnalni->gmni_port, - nid, &gmid); - spin_unlock(&gmnalni->gmni_gm_lock); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_NET, "gm_node_id_to_global_id failed[%d]\n", - gm_status); - return(-1); - } - CDEBUG(D_NET, "Global node is is [%u][%x]\n", gmid, gmid); - copy_to_user(PCFG_PBUF(pcfg, 2), &gmid, pcfg->pcfg_plen2); - break; - default: - CDEBUG(D_NET, "gmnal_cmd UNKNOWN[%d]\n", pcfg->pcfg_command); - pcfg->pcfg_nid2 = -1; - } - - - return(0); -} - - static int __init gmnal_load(void) { @@ -117,11 +62,10 @@ gmnal_unload(void) module_init(gmnal_load); - module_exit(gmnal_unload); MODULE_PARM(num_rx_threads, "i"); -MODULE_PARM(num_stxds, "i"); +MODULE_PARM(num_txds, "i"); MODULE_PARM(gm_port_id, "i"); MODULE_AUTHOR("Morgan Doyle"); diff --git a/lnet/klnds/gmlnd/gmlnd_utils.c b/lnet/klnds/gmlnd/gmlnd_utils.c index aee16fb..4a8ee6c 100644 --- a/lnet/klnds/gmlnd/gmlnd_utils.c +++ b/lnet/klnds/gmlnd/gmlnd_utils.c @@ -38,51 +38,81 @@ gmnal_is_rxthread(gmnal_ni_t *gmnalni) return(0); } +gmnal_tx_t * +gmnal_alloc_tx (gmnal_ni_t *gmnalni) +{ + gmnal_tx_t *tx; + void *buffer; + + PORTAL_ALLOC(tx, sizeof(*tx)); + if (tx == NULL) { + CERROR ("Failed to allocate tx\n"); + return NULL; + } + + spin_lock(&gmnalni->gmni_gm_lock); + buffer = gm_dma_malloc(gmnalni->gmni_port, + gmnalni->gmni_msg_size); + spin_unlock(&gmnalni->gmni_gm_lock); + if (buffer == NULL) { + CERROR("Failed to gm_dma_malloc tx buffer size [%d]\n", + gmnalni->gmni_msg_size); + PORTAL_FREE(tx, sizeof(*tx)); + return NULL; + } + + memset(tx, 0, sizeof(*tx)); + tx->tx_msg = (gmnal_msg_t *)buffer; + tx->tx_buffer_size = gmnalni->gmni_msg_size; + tx->tx_gm_size = gm_min_size_for_length(tx->tx_buffer_size); + tx->tx_gmni = gmnalni; + + CDEBUG(D_NET, "Created tx [%p] with buffer [%p], size [%d]\n", + tx, tx->tx_msg, tx->tx_buffer_size); + + return tx; +} + +void +gmnal_free_tx (gmnal_tx_t *tx) +{ + gmnal_ni_t *gmnalni = tx->tx_gmni; + + CDEBUG(D_NET, "Freeing tx [%p] with buffer [%p], size [%d]\n", + tx, tx->tx_msg, tx->tx_buffer_size); +#if 0 + /* We free buffers after we've closed the GM port */ + spin_lock(&gmnalni->gmni_gm_lock); + gm_dma_free(gmnalni->gmni_port, tx->tx_msg); + spin_unlock(&gmnalni->gmni_gm_lock); +#endif + PORTAL_FREE(tx, sizeof(*tx)); +} -/* - * Allocate tx descriptors/tokens (large and small) - * allocate a number of small tx buffers and register with GM - * so they are wired and set up for DMA. This is a costly operation. - * Also allocate a corrosponding descriptor to keep track of - * the buffer. - * Put all small descriptors on singly linked list to be available to send - * function. - * Allocate the rest of the available tx tokens for large messages. 
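gmnal_alloc_txs() below sizes two disjoint descriptor pools against the port's send-token budget; the separate rx-thread pool exists so receive processing that must transmit can never be starved by ordinary senders (the deadlock the old stxd comment warned about). The sizing check in isolation:

/* 'ntx' ordinary credits plus 'ntx + 1' rx-thread credits must fit in
 * the port's hardware send-token budget. */
static int check_tx_credit_split(struct gm_port *port, int ntx)
{
        int nrxt_tx = ntx + 1;                  /* rx threads' own pool */
        int ntxcred = gm_num_send_tokens(port); /* hardware budget */

        return (ntx + nrxt_tx <= ntxcred) ? 0 : -ENOMEM;
}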
These will be - * used to do gm_gets in gmnal_copyiov - */ int -gmnal_alloc_txd(gmnal_ni_t *gmnalni) +gmnal_alloc_txs(gmnal_ni_t *gmnalni) { int ntx; - int nstx; - int nrxt_stx; + int ntxcred; + int nrxt_tx; int i; - gmnal_stxd_t *txd; - void *txbuffer; + gmnal_tx_t *tx; CDEBUG(D_TRACE, "gmnal_alloc_small tx\n"); + /* get total number of transmit tokens */ spin_lock(&gmnalni->gmni_gm_lock); - /* - * total number of transmit tokens - */ - ntx = gm_num_send_tokens(gmnalni->gmni_port); + ntxcred = gm_num_send_tokens(gmnalni->gmni_port); spin_unlock(&gmnalni->gmni_gm_lock); - CDEBUG(D_NET, "total number of send tokens available is [%d]\n", ntx); + CDEBUG(D_NET, "total number of send tokens available is [%d]\n", + ntxcred); - /* - * allocate a number for small sends - * num_stxds from gmnal_module.c - */ - nstx = num_stxds; - /* - * give the rest to the receive threads - */ - nrxt_stx = num_stxds + 1; + ntx = num_txds; + nrxt_tx = num_txds + 1; - if (nstx + nrxt_stx > ntx) { + if (ntx + nrxt_tx > ntxcred) { CERROR ("Asked for %d + %d tx credits, but only %d available\n", - nstx, nrxt_stx, ntx); + ntx, nrxt_tx, ntxcred); return -ENOMEM; } @@ -92,184 +122,133 @@ gmnal_alloc_txd(gmnal_ni_t *gmnalni) * someone returning a stxd will release the semaphore and wake you) * When token is obtained acquire the spinlock to manipulate the * list */ - sema_init(&gmnalni->gmni_stxd_token, nstx); - spin_lock_init(&gmnalni->gmni_stxd_lock); + sema_init(&gmnalni->gmni_tx_token, ntx); + spin_lock_init(&gmnalni->gmni_tx_lock); + LASSERT (gmnalni->gmni_tx == NULL); + + for (i = 0; i <= ntx; i++) { + tx = gmnal_alloc_tx(gmnalni); + if (tx == NULL) { + CERROR("Failed to create tx %d\n", i); + return -ENOMEM; + } + + tx->tx_rxt = 0; + tx->tx_next = gmnalni->gmni_tx; + gmnalni->gmni_tx = tx; + } - sema_init(&gmnalni->gmni_rxt_stxd_token, nrxt_stx); - spin_lock_init(&gmnalni->gmni_rxt_stxd_lock); + sema_init(&gmnalni->gmni_rxt_tx_token, nrxt_tx); + spin_lock_init(&gmnalni->gmni_rxt_tx_lock); + LASSERT (gmnalni->gmni_rxt_tx == NULL); - for (i=0; i<=nstx; i++) { - PORTAL_ALLOC(txd, sizeof(*txd)); - if (txd == NULL) { - CERROR("Failed to malloc txd [%d]\n", i); - return -ENOMEM; - } - spin_lock(&gmnalni->gmni_gm_lock); - txbuffer = gm_dma_malloc(gmnalni->gmni_port, - gmnalni->gmni_small_msg_size); - spin_unlock(&gmnalni->gmni_gm_lock); - if (txbuffer == NULL) { - CERROR("Failed to gm_dma_malloc txbuffer [%d], " - "size [%d]\n", i, gmnalni->gmni_small_msg_size); - PORTAL_FREE(txd, sizeof(*txd)); - return -ENOMEM; - } - txd->tx_buffer = txbuffer; - txd->tx_buffer_size = gmnalni->gmni_small_msg_size; - txd->tx_gm_size = gm_min_size_for_length(txd->tx_buffer_size); - txd->tx_gmni = gmnalni; - txd->tx_rxt = 0; - - txd->tx_next = gmnalni->gmni_stxd; - gmnalni->gmni_stxd = txd; - CDEBUG(D_NET, "Registered txd [%p] with buffer [%p], " - "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size); - } + for (i = 0; i <= nrxt_tx; i++) { + tx = gmnal_alloc_tx(gmnalni); + if (tx == NULL) { + CERROR("Failed to create tx %d + %d\n", ntx, i); + return -ENOMEM; + } - for (i=0; i<=nrxt_stx; i++) { - PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t)); - if (!txd) { - CERROR("Failed to malloc txd [%d]\n", i); - return -ENOMEM; - } - spin_lock(&gmnalni->gmni_gm_lock); - txbuffer = gm_dma_malloc(gmnalni->gmni_port, - gmnalni->gmni_small_msg_size); - spin_unlock(&gmnalni->gmni_gm_lock); - if (!txbuffer) { - CERROR("Failed to gm_dma_malloc txbuffer [%d]," - " size [%d]\n",i, gmnalni->gmni_small_msg_size); - PORTAL_FREE(txd, sizeof(gmnal_stxd_t)); - return 
-ENOMEM; - } - txd->tx_buffer = txbuffer; - txd->tx_buffer_size = gmnalni->gmni_small_msg_size; - txd->tx_gm_size = gm_min_size_for_length(txd->tx_buffer_size); - txd->tx_gmni = gmnalni; - txd->tx_rxt = 1; - - txd->tx_next = gmnalni->gmni_rxt_stxd; - gmnalni->gmni_rxt_stxd = txd; - CDEBUG(D_NET, "Registered txd [%p] with buffer [%p], " - "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size); + tx->tx_rxt = 1; + tx->tx_next = gmnalni->gmni_rxt_tx; + gmnalni->gmni_rxt_tx = tx; } return 0; } -/* Free the list of wired and gm_registered small tx buffers and - * the tx descriptors that go along with them. - */ void -gmnal_free_txd(gmnal_ni_t *gmnalni) +gmnal_free_txs(gmnal_ni_t *gmnalni) { - gmnal_stxd_t *txd; - gmnal_stxd_t *_txd; - - CDEBUG(D_TRACE, "gmnal_free_small tx\n"); + gmnal_tx_t *tx; - txd = gmnalni->gmni_stxd; - while(txd != NULL) { - CDEBUG(D_NET, "Freeing txd [%p] with buffer [%p], " - "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size); - _txd = txd; - txd = txd->tx_next; - spin_lock(&gmnalni->gmni_gm_lock); - gm_dma_free(gmnalni->gmni_port, _txd->tx_buffer); - spin_unlock(&gmnalni->gmni_gm_lock); - PORTAL_FREE(_txd, sizeof(gmnal_stxd_t)); + while ((tx = gmnalni->gmni_tx) != NULL) { + gmnalni->gmni_tx = tx->tx_next; + gmnal_free_tx (tx); } - txd = gmnalni->gmni_rxt_stxd; - while(txd) { - CDEBUG(D_NET, "Freeing txd [%p] with buffer [%p], " - "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size); - _txd = txd; - txd = txd->tx_next; - spin_lock(&gmnalni->gmni_gm_lock); - gm_dma_free(gmnalni->gmni_port, _txd->tx_buffer); - spin_unlock(&gmnalni->gmni_gm_lock); - PORTAL_FREE(_txd, sizeof(gmnal_stxd_t)); + while ((tx = gmnalni->gmni_rxt_tx) != NULL) { + gmnalni->gmni_rxt_tx = tx->tx_next; + gmnal_free_tx (tx); } } /* - * Get a txd from the list + * Get a tx from the list * This get us a wired and gm_registered small tx buffer. * This implicitly gets us a send token also. 
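gmnal_get_tx()/gmnal_return_tx() below follow the classic counted-pool pattern: the semaphore counts free descriptors so getters can sleep (or fail fast with down_trylock), while the spinlock covers only the freelist splice. Reduced to its skeleton, with a hypothetical pool struct:

typedef struct {
        struct semaphore  pool_token;   /* counts free descriptors */
        spinlock_t        pool_lock;    /* guards the freelist only */
        gmnal_tx_t       *pool_free;
} tx_pool_t;

static gmnal_tx_t *pool_get(tx_pool_t *p)
{
        gmnal_tx_t *tx;

        down(&p->pool_token);           /* blocks until a tx is free */
        spin_lock(&p->pool_lock);
        tx = p->pool_free;
        p->pool_free = tx->tx_next;
        spin_unlock(&p->pool_lock);
        return tx;
}

static void pool_put(tx_pool_t *p, gmnal_tx_t *tx)
{
        spin_lock(&p->pool_lock);
        tx->tx_next = p->pool_free;
        p->pool_free = tx;
        spin_unlock(&p->pool_lock);
        up(&p->pool_token);             /* release the credit last */
}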
*/ -gmnal_stxd_t * -gmnal_get_stxd(gmnal_ni_t *gmnalni, int block) +gmnal_tx_t * +gmnal_get_tx(gmnal_ni_t *gmnalni, int block) { - gmnal_stxd_t *txd = NULL; + gmnal_tx_t *tx = NULL; pid_t pid = current->pid; - CDEBUG(D_TRACE, "gmnal_get_stxd gmnalni [%p] block[%d] pid [%d]\n", + CDEBUG(D_TRACE, "gmnal_get_tx gmnalni [%p] block[%d] pid [%d]\n", gmnalni, block, pid); if (gmnal_is_rxthread(gmnalni)) { CDEBUG(D_NET, "RXTHREAD Attempting to get token\n"); - down(&gmnalni->gmni_rxt_stxd_token); - spin_lock(&gmnalni->gmni_rxt_stxd_lock); - txd = gmnalni->gmni_rxt_stxd; - gmnalni->gmni_rxt_stxd = txd->tx_next; - spin_unlock(&gmnalni->gmni_rxt_stxd_lock); + down(&gmnalni->gmni_rxt_tx_token); + spin_lock(&gmnalni->gmni_rxt_tx_lock); + tx = gmnalni->gmni_rxt_tx; + gmnalni->gmni_rxt_tx = tx->tx_next; + spin_unlock(&gmnalni->gmni_rxt_tx_lock); CDEBUG(D_NET, "RXTHREAD got [%p], head is [%p]\n", - txd, gmnalni->gmni_rxt_stxd); - txd->tx_kniov = 0; - txd->tx_rxt = 1; + tx, gmnalni->gmni_rxt_tx); + tx->tx_rxt = 1; } else { if (block) { CDEBUG(D_NET, "Attempting to get token\n"); - down(&gmnalni->gmni_stxd_token); + down(&gmnalni->gmni_tx_token); CDEBUG(D_PORTALS, "Got token\n"); } else { - if (down_trylock(&gmnalni->gmni_stxd_token)) { + if (down_trylock(&gmnalni->gmni_tx_token)) { CERROR("can't get token\n"); return(NULL); } } - spin_lock(&gmnalni->gmni_stxd_lock); - txd = gmnalni->gmni_stxd; - gmnalni->gmni_stxd = txd->tx_next; - spin_unlock(&gmnalni->gmni_stxd_lock); - CDEBUG(D_NET, "got [%p], head is [%p]\n", txd, - gmnalni->gmni_stxd); - txd->tx_kniov = 0; - } /* general txd get */ - return(txd); + spin_lock(&gmnalni->gmni_tx_lock); + tx = gmnalni->gmni_tx; + gmnalni->gmni_tx = tx->tx_next; + spin_unlock(&gmnalni->gmni_tx_lock); + CDEBUG(D_NET, "got [%p], head is [%p]\n", tx, + gmnalni->gmni_tx); + } /* general tx get */ + + return tx; } /* - * Return a txd to the list + * Return a tx to the list */ void -gmnal_return_stxd(gmnal_ni_t *gmnalni, gmnal_stxd_t *txd) +gmnal_return_tx(gmnal_ni_t *gmnalni, gmnal_tx_t *tx) { - CDEBUG(D_TRACE, "gmnalni [%p], txd[%p] rxt[%d]\n", gmnalni, - txd, txd->tx_rxt); + CDEBUG(D_TRACE, "gmnalni [%p], tx[%p] rxt[%d]\n", gmnalni, + tx, tx->tx_rxt); /* * this transmit descriptor is * for the rxthread */ - if (txd->tx_rxt) { - spin_lock(&gmnalni->gmni_rxt_stxd_lock); - txd->tx_next = gmnalni->gmni_rxt_stxd; - gmnalni->gmni_rxt_stxd = txd; - spin_unlock(&gmnalni->gmni_rxt_stxd_lock); - up(&gmnalni->gmni_rxt_stxd_token); - CDEBUG(D_NET, "Returned stxd to rxthread list\n"); + if (tx->tx_rxt) { + spin_lock(&gmnalni->gmni_rxt_tx_lock); + tx->tx_next = gmnalni->gmni_rxt_tx; + gmnalni->gmni_rxt_tx = tx; + spin_unlock(&gmnalni->gmni_rxt_tx_lock); + up(&gmnalni->gmni_rxt_tx_token); + CDEBUG(D_NET, "Returned tx to rxthread list\n"); } else { - spin_lock(&gmnalni->gmni_stxd_lock); - txd->tx_next = gmnalni->gmni_stxd; - gmnalni->gmni_stxd = txd; - spin_unlock(&gmnalni->gmni_stxd_lock); - up(&gmnalni->gmni_stxd_token); - CDEBUG(D_NET, "Returned stxd to general list\n"); + spin_lock(&gmnalni->gmni_tx_lock); + tx->tx_next = gmnalni->gmni_tx; + gmnalni->gmni_tx = tx; + spin_unlock(&gmnalni->gmni_tx_lock); + up(&gmnalni->gmni_tx_token); + CDEBUG(D_NET, "Returned tx to general list\n"); } return; } @@ -284,561 +263,155 @@ gmnal_return_stxd(gmnal_ni_t *gmnalni, gmnal_stxd_t *txd) * receive thread. 
*/ int -gmnal_alloc_srxd(gmnal_ni_t *gmnalni) +gmnal_alloc_rxs (gmnal_ni_t *gmnalni) { - int nrx = 0, nsrx = 0, i = 0; - gmnal_srxd_t *rxd = NULL; - void *rxbuffer = NULL; + int nrxcred; + int nrx; + int i; + gmnal_rx_t *rxd; + void *rxbuffer; CDEBUG(D_TRACE, "gmnal_alloc_small rx\n"); spin_lock(&gmnalni->gmni_gm_lock); - nrx = gm_num_receive_tokens(gmnalni->gmni_port); + nrxcred = gm_num_receive_tokens(gmnalni->gmni_port); spin_unlock(&gmnalni->gmni_gm_lock); CDEBUG(D_NET, "total number of receive tokens available is [%d]\n", - nrx); + nrxcred); - nsrx = nrx/2; - nsrx = 12; - /* - * make the number of rxds twice our total - * number of stxds plus 1 - */ - nsrx = num_stxds*2 + 2; - - CDEBUG(D_NET, "Allocated [%d] receive tokens to small messages\n", - nsrx); + nrx = num_txds*2 + 2; + if (nrx > nrxcred) { + CERROR("Can't allocate %d rx credits: (%d available)\n", + nrx, nrxcred); + return -ENOMEM; + } + CDEBUG(D_NET, "Allocated [%d] receive tokens to small messages\n", nrx); spin_lock(&gmnalni->gmni_gm_lock); - gmnalni->gmni_srxd_hash = gm_create_hash(gm_hash_compare_ptrs, - gm_hash_hash_ptr, 0, 0, nsrx, 0); + gmnalni->gmni_rx_hash = gm_create_hash(gm_hash_compare_ptrs, + gm_hash_hash_ptr, 0, 0, nrx, 0); spin_unlock(&gmnalni->gmni_gm_lock); - if (!gmnalni->gmni_srxd_hash) { - CERROR("Failed to create hash table\n"); - return -ENOMEM; + if (gmnalni->gmni_rx_hash == NULL) { + CERROR("Failed to create hash table\n"); + return -ENOMEM; } - for (i=0; i<=nsrx; i++) { - PORTAL_ALLOC(rxd, sizeof(gmnal_srxd_t)); - if (!rxd) { + LASSERT (gmnalni->gmni_rx == NULL); + + for (i=0; i <= nrx; i++) { + + PORTAL_ALLOC(rxd, sizeof(*rxd)); + if (rxd == NULL) { CERROR("Failed to malloc rxd [%d]\n", i); return -ENOMEM; } spin_lock(&gmnalni->gmni_gm_lock); rxbuffer = gm_dma_malloc(gmnalni->gmni_port, - gmnalni->gmni_small_msg_size); + gmnalni->gmni_msg_size); spin_unlock(&gmnalni->gmni_gm_lock); - if (!rxbuffer) { + if (rxbuffer == NULL) { CERROR("Failed to gm_dma_malloc rxbuffer [%d], " - "size [%d]\n",i ,gmnalni->gmni_small_msg_size); - PORTAL_FREE(rxd, sizeof(gmnal_srxd_t)); + "size [%d]\n",i ,gmnalni->gmni_msg_size); + PORTAL_FREE(rxd, sizeof(*rxd)); return -ENOMEM; } - rxd->rx_buffer = rxbuffer; - rxd->rx_size = gmnalni->gmni_small_msg_size; + rxd->rx_msg = (gmnal_msg_t *)rxbuffer; + rxd->rx_size = gmnalni->gmni_msg_size; rxd->rx_gmsize = gm_min_size_for_length(rxd->rx_size); - if (gm_hash_insert(gmnalni->gmni_srxd_hash, - (void*)rxbuffer, (void*)rxd)) { + rxd->rx_next = gmnalni->gmni_rx; + gmnalni->gmni_rx = rxd; + if (gm_hash_insert(gmnalni->gmni_rx_hash, + (void*)rxbuffer, (void*)rxd)) { CERROR("failed to create hash entry rxd[%p] " "for rxbuffer[%p]\n", rxd, rxbuffer); return -ENOMEM; } - rxd->rx_next = gmnalni->gmni_srxd; - gmnalni->gmni_srxd = rxd; CDEBUG(D_NET, "Registered rxd [%p] with buffer [%p], " - "size [%d]\n", rxd, rxd->rx_buffer, rxd->rx_size); + "size [%d]\n", rxd, rxd->rx_msg, rxd->rx_size); } return 0; } - - -/* Free the list of wired and gm_registered small rx buffers and the - * rx descriptors that go along with them. 
- */ void -gmnal_free_srxd(gmnal_ni_t *gmnalni) +gmnal_free_rxs(gmnal_ni_t *gmnalni) { - gmnal_srxd_t *rxd = gmnalni->gmni_srxd, *_rxd = NULL; + gmnal_rx_t *rx; CDEBUG(D_TRACE, "gmnal_free_small rx\n"); - while(rxd) { - CDEBUG(D_NET, "Freeing rxd [%p] buffer [%p], size [%d]\n", - rxd, rxd->rx_buffer, rxd->rx_size); - _rxd = rxd; - rxd = rxd->rx_next; + while ((rx = gmnalni->gmni_rx) != NULL) { + gmnalni->gmni_rx = rx->rx_next; + CDEBUG(D_NET, "Freeing rxd [%p] buffer [%p], size [%d]\n", + rx, rx->rx_msg, rx->rx_size); +#if 0 + /* We free buffers after we've shutdown the GM port */ spin_lock(&gmnalni->gmni_gm_lock); - gm_dma_free(gmnalni->gmni_port, _rxd->rx_buffer); + gm_dma_free(gmnalni->gmni_port, _rxd->rx_msg); spin_unlock(&gmnalni->gmni_gm_lock); - - PORTAL_FREE(_rxd, sizeof(gmnal_srxd_t)); +#endif + PORTAL_FREE(rx, sizeof(*rx)); } - return; -} - -/* - * Given a pointer to a srxd find - * the relevant descriptor for it - * This is done by searching a hash - * list that is created when the srxd's - * are created - */ -gmnal_srxd_t * -gmnal_rxbuffer_to_srxd(gmnal_ni_t *gmnalni, void *rxbuffer) -{ - gmnal_srxd_t *srxd = NULL; - CDEBUG(D_TRACE, "gmnalni [%p], rxbuffer [%p]\n", gmnalni, rxbuffer); - srxd = gm_hash_find(gmnalni->gmni_srxd_hash, rxbuffer); - CDEBUG(D_NET, "srxd is [%p]\n", srxd); - return(srxd); +#if 0 + /* see above */ + if (gmnalni->gmni_rx_hash != NULL) { + spin_lock(&gmnalni->gmni_gm_lock); + gm_destroy_hash(gmnalni->gmni_rx_hash); + spin_unlock(&gmnalni->gmni_gm_lock); + } +#endif } - void gmnal_stop_rxthread(gmnal_ni_t *gmnalni) { - int delay = 30; - - - - CDEBUG(D_TRACE, "Attempting to stop rxthread gmnalni [%p]\n", - gmnalni); + int count = 2; + int i; gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_STOP; - gmnal_remove_rxtwe(gmnalni); - /* - * kick the thread - */ - up(&gmnalni->gmni_rxtwe_wait); + for (i = 0; i < num_rx_threads; i++) + up(&gmnalni->gmni_rxq_wait); - while(gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET && delay--) { + while (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) { CDEBUG(D_NET, "gmnal_stop_rxthread sleeping\n"); gmnal_yield(1); - up(&gmnalni->gmni_rxtwe_wait); - } - if (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) { - CERROR("I don't know how to wake the thread\n"); - } else { - CDEBUG(D_NET, "rx thread seems to have stopped\n"); + count++; + if ((count & (count - 1)) == 0) + CWARN("Waiting for rxthreads to stop\n"); } } void gmnal_stop_ctthread(gmnal_ni_t *gmnalni) { - int delay = 15; - + int count = 2; - - CDEBUG(D_TRACE, "Attempting to stop ctthread gmnalni [%p]\n", - gmnalni); - gmnalni->gmni_ctthread_flag = GMNAL_THREAD_STOP; + spin_lock(&gmnalni->gmni_gm_lock); gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, 10, NULL, NULL); spin_unlock(&gmnalni->gmni_gm_lock); - while(gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP && delay--) { + while (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) { CDEBUG(D_NET, "gmnal_stop_ctthread sleeping\n"); gmnal_yield(1); - } - - if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) { - CERROR("I DON'T KNOW HOW TO WAKE THE THREAD\n"); - } else { - CDEBUG(D_NET, "CT THREAD SEEMS TO HAVE STOPPED\n"); + count++; + if ((count & (count - 1)) == 0) + CWARN("Waiting for ctthread to stop\n"); } } - - -char * -gmnal_gm_error(gm_status_t status) -{ - return(gm_strerror(status)); - - switch(status) { - case(GM_SUCCESS): - return("SUCCESS"); - case(GM_FAILURE): - return("FAILURE"); - case(GM_INPUT_BUFFER_TOO_SMALL): - return("INPUT_BUFFER_TOO_SMALL"); - case(GM_OUTPUT_BUFFER_TOO_SMALL): 
- return("OUTPUT_BUFFER_TOO_SMALL"); - case(GM_TRY_AGAIN ): - return("TRY_AGAIN"); - case(GM_BUSY): - return("BUSY"); - case(GM_MEMORY_FAULT): - return("MEMORY_FAULT"); - case(GM_INTERRUPTED): - return("INTERRUPTED"); - case(GM_INVALID_PARAMETER): - return("INVALID_PARAMETER"); - case(GM_OUT_OF_MEMORY): - return("OUT_OF_MEMORY"); - case(GM_INVALID_COMMAND): - return("INVALID_COMMAND"); - case(GM_PERMISSION_DENIED): - return("PERMISSION_DENIED"); - case(GM_INTERNAL_ERROR): - return("INTERNAL_ERROR"); - case(GM_UNATTACHED): - return("UNATTACHED"); - case(GM_UNSUPPORTED_DEVICE): - return("UNSUPPORTED_DEVICE"); - case(GM_SEND_TIMED_OUT): - return("GM_SEND_TIMEDOUT"); - case(GM_SEND_REJECTED): - return("GM_SEND_REJECTED"); - case(GM_SEND_TARGET_PORT_CLOSED): - return("GM_SEND_TARGET_PORT_CLOSED"); - case(GM_SEND_TARGET_NODE_UNREACHABLE): - return("GM_SEND_TARGET_NODE_UNREACHABLE"); - case(GM_SEND_DROPPED): - return("GM_SEND_DROPPED"); - case(GM_SEND_PORT_CLOSED): - return("GM_SEND_PORT_CLOSED"); - case(GM_NODE_ID_NOT_YET_SET): - return("GM_NODE_ID_NOT_YET_SET"); - case(GM_STILL_SHUTTING_DOWN): - return("GM_STILL_SHUTTING_DOWN"); - case(GM_CLONE_BUSY): - return("GM_CLONE_BUSY"); - case(GM_NO_SUCH_DEVICE): - return("GM_NO_SUCH_DEVICE"); - case(GM_ABORTED): - return("GM_ABORTED"); - case(GM_INCOMPATIBLE_LIB_AND_DRIVER): - return("GM_INCOMPATIBLE_LIB_AND_DRIVER"); - case(GM_UNTRANSLATED_SYSTEM_ERROR): - return("GM_UNTRANSLATED_SYSTEM_ERROR"); - case(GM_ACCESS_DENIED): - return("GM_ACCESS_DENIED"); - - -/* - * These ones are in the docs but aren't in the header file - case(GM_DEV_NOT_FOUND): - return("GM_DEV_NOT_FOUND"); - case(GM_INVALID_PORT_NUMBER): - return("GM_INVALID_PORT_NUMBER"); - case(GM_UC_ERROR): - return("GM_US_ERROR"); - case(GM_PAGE_TABLE_FULL): - return("GM_PAGE_TABLE_FULL"); - case(GM_MINOR_OVERFLOW): - return("GM_MINOR_OVERFLOW"); - case(GM_SEND_ORPHANED): - return("GM_SEND_ORPHANED"); - case(GM_HARDWARE_FAULT): - return("GM_HARDWARE_FAULT"); - case(GM_DATA_CORRUPTED): - return("GM_DATA_CORRUPTED"); - case(GM_TIMED_OUT): - return("GM_TIMED_OUT"); - case(GM_USER_ERROR): - return("GM_USER_ERROR"); - case(GM_NO_MATCH): - return("GM_NOMATCH"); - case(GM_NOT_SUPPORTED_IN_KERNEL): - return("GM_NOT_SUPPORTED_IN_KERNEL"); - case(GM_NOT_SUPPORTED_ON_ARCH): - return("GM_NOT_SUPPORTED_ON_ARCH"); - case(GM_PTE_REF_CNT_OVERFLOW): - return("GM_PTR_REF_CNT_OVERFLOW"); - case(GM_NO_DRIVER_SUPPORT): - return("GM_NO_DRIVER_SUPPORT"); - case(GM_FIRMWARE_NOT_RUNNING): - return("GM_FIRMWARE_NOT_RUNNING"); - - * These ones are in the docs but aren't in the header file - */ - default: - return("UNKNOWN GM ERROR CODE"); - } -} - - -char * -gmnal_rxevent(gm_recv_event_t *ev) -{ - short event; - event = GM_RECV_EVENT_TYPE(ev); - switch(event) { - case(GM_NO_RECV_EVENT): - return("GM_NO_RECV_EVENT"); - case(GM_SENDS_FAILED_EVENT): - return("GM_SEND_FAILED_EVENT"); - case(GM_ALARM_EVENT): - return("GM_ALARM_EVENT"); - case(GM_SENT_EVENT): - return("GM_SENT_EVENT"); - case(_GM_SLEEP_EVENT): - return("_GM_SLEEP_EVENT"); - case(GM_RAW_RECV_EVENT): - return("GM_RAW_RECV_EVENT"); - case(GM_BAD_SEND_DETECTED_EVENT): - return("GM_BAD_SEND_DETECTED_EVENT"); - case(GM_SEND_TOKEN_VIOLATION_EVENT): - return("GM_SEND_TOKEN_VIOLATION_EVENT"); - case(GM_RECV_TOKEN_VIOLATION_EVENT): - return("GM_RECV_TOKEN_VIOLATION_EVENT"); - case(GM_BAD_RECV_TOKEN_EVENT): - return("GM_BAD_RECV_TOKEN_EVENT"); - case(GM_ALARM_VIOLATION_EVENT): - return("GM_ALARM_VIOLATION_EVENT"); - case(GM_RECV_EVENT): - return("GM_RECV_EVENT"); - 
case(GM_HIGH_RECV_EVENT): - return("GM_HIGH_RECV_EVENT"); - case(GM_PEER_RECV_EVENT): - return("GM_PEER_RECV_EVENT"); - case(GM_HIGH_PEER_RECV_EVENT): - return("GM_HIGH_PEER_RECV_EVENT"); - case(GM_FAST_RECV_EVENT): - return("GM_FAST_RECV_EVENT"); - case(GM_FAST_HIGH_RECV_EVENT): - return("GM_FAST_HIGH_RECV_EVENT"); - case(GM_FAST_PEER_RECV_EVENT): - return("GM_FAST_PEER_RECV_EVENT"); - case(GM_FAST_HIGH_PEER_RECV_EVENT): - return("GM_FAST_HIGH_PEER_RECV_EVENT"); - case(GM_REJECTED_SEND_EVENT): - return("GM_REJECTED_SEND_EVENT"); - case(GM_ORPHANED_SEND_EVENT): - return("GM_ORPHANED_SEND_EVENT"); - case(GM_BAD_RESEND_DETECTED_EVENT): - return("GM_BAD_RESEND_DETETED_EVENT"); - case(GM_DROPPED_SEND_EVENT): - return("GM_DROPPED_SEND_EVENT"); - case(GM_BAD_SEND_VMA_EVENT): - return("GM_BAD_SEND_VMA_EVENT"); - case(GM_BAD_RECV_VMA_EVENT): - return("GM_BAD_RECV_VMA_EVENT"); - case(_GM_FLUSHED_ALARM_EVENT): - return("GM_FLUSHED_ALARM_EVENT"); - case(GM_SENT_TOKENS_EVENT): - return("GM_SENT_TOKENS_EVENTS"); - case(GM_IGNORE_RECV_EVENT): - return("GM_IGNORE_RECV_EVENT"); - case(GM_ETHERNET_RECV_EVENT): - return("GM_ETHERNET_RECV_EVENT"); - case(GM_NEW_NO_RECV_EVENT): - return("GM_NEW_NO_RECV_EVENT"); - case(GM_NEW_SENDS_FAILED_EVENT): - return("GM_NEW_SENDS_FAILED_EVENT"); - case(GM_NEW_ALARM_EVENT): - return("GM_NEW_ALARM_EVENT"); - case(GM_NEW_SENT_EVENT): - return("GM_NEW_SENT_EVENT"); - case(_GM_NEW_SLEEP_EVENT): - return("GM_NEW_SLEEP_EVENT"); - case(GM_NEW_RAW_RECV_EVENT): - return("GM_NEW_RAW_RECV_EVENT"); - case(GM_NEW_BAD_SEND_DETECTED_EVENT): - return("GM_NEW_BAD_SEND_DETECTED_EVENT"); - case(GM_NEW_SEND_TOKEN_VIOLATION_EVENT): - return("GM_NEW_SEND_TOKEN_VIOLATION_EVENT"); - case(GM_NEW_RECV_TOKEN_VIOLATION_EVENT): - return("GM_NEW_RECV_TOKEN_VIOLATION_EVENT"); - case(GM_NEW_BAD_RECV_TOKEN_EVENT): - return("GM_NEW_BAD_RECV_TOKEN_EVENT"); - case(GM_NEW_ALARM_VIOLATION_EVENT): - return("GM_NEW_ALARM_VIOLATION_EVENT"); - case(GM_NEW_RECV_EVENT): - return("GM_NEW_RECV_EVENT"); - case(GM_NEW_HIGH_RECV_EVENT): - return("GM_NEW_HIGH_RECV_EVENT"); - case(GM_NEW_PEER_RECV_EVENT): - return("GM_NEW_PEER_RECV_EVENT"); - case(GM_NEW_HIGH_PEER_RECV_EVENT): - return("GM_NEW_HIGH_PEER_RECV_EVENT"); - case(GM_NEW_FAST_RECV_EVENT): - return("GM_NEW_FAST_RECV_EVENT"); - case(GM_NEW_FAST_HIGH_RECV_EVENT): - return("GM_NEW_FAST_HIGH_RECV_EVENT"); - case(GM_NEW_FAST_PEER_RECV_EVENT): - return("GM_NEW_FAST_PEER_RECV_EVENT"); - case(GM_NEW_FAST_HIGH_PEER_RECV_EVENT): - return("GM_NEW_FAST_HIGH_PEER_RECV_EVENT"); - case(GM_NEW_REJECTED_SEND_EVENT): - return("GM_NEW_REJECTED_SEND_EVENT"); - case(GM_NEW_ORPHANED_SEND_EVENT): - return("GM_NEW_ORPHANED_SEND_EVENT"); - case(_GM_NEW_PUT_NOTIFICATION_EVENT): - return("_GM_NEW_PUT_NOTIFICATION_EVENT"); - case(GM_NEW_FREE_SEND_TOKEN_EVENT): - return("GM_NEW_FREE_SEND_TOKEN_EVENT"); - case(GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT): - return("GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT"); - case(GM_NEW_BAD_RESEND_DETECTED_EVENT): - return("GM_NEW_BAD_RESEND_DETECTED_EVENT"); - case(GM_NEW_DROPPED_SEND_EVENT): - return("GM_NEW_DROPPED_SEND_EVENT"); - case(GM_NEW_BAD_SEND_VMA_EVENT): - return("GM_NEW_BAD_SEND_VMA_EVENT"); - case(GM_NEW_BAD_RECV_VMA_EVENT): - return("GM_NEW_BAD_RECV_VMA_EVENT"); - case(_GM_NEW_FLUSHED_ALARM_EVENT): - return("GM_NEW_FLUSHED_ALARM_EVENT"); - case(GM_NEW_SENT_TOKENS_EVENT): - return("GM_NEW_SENT_TOKENS_EVENT"); - case(GM_NEW_IGNORE_RECV_EVENT): - return("GM_NEW_IGNORE_RECV_EVENT"); - case(GM_NEW_ETHERNET_RECV_EVENT): - return("GM_NEW_ETHERNET_RECV_EVENT"); - 
default: - return("Unknown Recv event"); -#if 0 - case(/* _GM_PUT_NOTIFICATION_EVENT */ - case(/* GM_FREE_SEND_TOKEN_EVENT */ - case(/* GM_FREE_HIGH_SEND_TOKEN_EVENT */ -#endif - } -} - - -void -gmnal_yield(int delay) -{ - set_current_state(TASK_INTERRUPTIBLE); - schedule_timeout(delay); -} - -int -gmnal_is_small_msg(gmnal_ni_t *gmnalni, int niov, struct iovec *iov, - int len) -{ - - CDEBUG(D_TRACE, "len [%d] limit[%d]\n", len, - gmnalni->gmni_small_msg_size); - - if ((len + sizeof(ptl_hdr_t) + sizeof(gmnal_msghdr_t)) - < gmnalni->gmni_small_msg_size) { - - CDEBUG(D_NET, "Yep, small message\n"); - return(1); - } else { - CERROR("No, not small message\n"); - /* - * could be made up of lots of little ones ! - */ - return(0); - } - -} - -/* - * extract info from the receive event. - * Have to do this before the next call to gm_receive - * Deal with all endian stuff here. - * Then stick work entry on list where rxthreads - * can get it to complete the receive - */ -int -gmnal_add_rxtwe(gmnal_ni_t *gmnalni, gm_recv_t *recv) -{ - gmnal_rxtwe_t *we = NULL; - - CDEBUG(D_NET, "adding entry to list\n"); - - PORTAL_ALLOC(we, sizeof(gmnal_rxtwe_t)); - if (!we) { - CERROR("failed to malloc\n"); - return -ENOMEM; - } - we->buffer = gm_ntohp(recv->buffer); - we->snode = (int)gm_ntoh_u16(recv->sender_node_id); - we->sport = (int)gm_ntoh_u8(recv->sender_port_id); - we->type = (int)gm_ntoh_u8(recv->type); - we->length = (int)gm_ntohl(recv->length); - - spin_lock(&gmnalni->gmni_rxtwe_lock); - if (gmnalni->gmni_rxtwe_tail) { - gmnalni->gmni_rxtwe_tail->next = we; - } else { - gmnalni->gmni_rxtwe_head = we; - gmnalni->gmni_rxtwe_tail = we; - } - gmnalni->gmni_rxtwe_tail = we; - spin_unlock(&gmnalni->gmni_rxtwe_lock); - - up(&gmnalni->gmni_rxtwe_wait); - return 0; -} - -void -gmnal_remove_rxtwe(gmnal_ni_t *gmnalni) -{ - gmnal_rxtwe_t *_we, *we = gmnalni->gmni_rxtwe_head; - - CDEBUG(D_NET, "removing all work list entries\n"); - - spin_lock(&gmnalni->gmni_rxtwe_lock); - CDEBUG(D_NET, "Got lock\n"); - while (we) { - _we = we; - we = we->next; - PORTAL_FREE(_we, sizeof(gmnal_rxtwe_t)); - } - spin_unlock(&gmnalni->gmni_rxtwe_lock); - gmnalni->gmni_rxtwe_head = NULL; - gmnalni->gmni_rxtwe_tail = NULL; -} - -gmnal_rxtwe_t * -gmnal_get_rxtwe(gmnal_ni_t *gmnalni) -{ - gmnal_rxtwe_t *we = NULL; - - CDEBUG(D_NET, "Getting entry to list\n"); - - do { - while(down_interruptible(&gmnalni->gmni_rxtwe_wait) != 0) - /* do nothing */; - - if (gmnalni->gmni_rxthread_stop_flag == GMNAL_THREAD_STOP) { - /* - * time to stop - * TO DO some one free the work entries - */ - return(NULL); - } - - spin_lock(&gmnalni->gmni_rxtwe_lock); - if (gmnalni->gmni_rxtwe_head) { - CDEBUG(D_NET, "Got a work entry\n"); - we = gmnalni->gmni_rxtwe_head; - gmnalni->gmni_rxtwe_head = we->next; - if (!gmnalni->gmni_rxtwe_head) - gmnalni->gmni_rxtwe_tail = NULL; - } else { - CWARN("woken but no work\n"); - } - - spin_unlock(&gmnalni->gmni_rxtwe_lock); - } while (!we); - - CDEBUG(D_NET, "Returning we[%p]\n", we); - return(we); -} - - /* * Start the caretaker thread and a number of receiver threads * The caretaker thread gets events from the gm library. @@ -851,16 +424,21 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni) { int threads = 0; + int flag; + + INIT_LIST_HEAD(&gmnalni->gmni_rxq); + spin_lock_init(&gmnalni->gmni_rxq_lock); + sema_init(&gmnalni->gmni_rxq_wait, 0); + /* * the alarm is used to wake the caretaker thread from * gm_unknown call (sleeping) to exit it. 
 */
 	CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n");
 	gm_initialize_alarm(&gmnalni->gmni_ctthread_alarm);
-	gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
-	CDEBUG(D_NET, "Starting caretaker thread\n");
+	gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
 	gmnalni->gmni_ctthread_pid =
 	         kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0);
 	if (gmnalni->gmni_ctthread_pid <= 0) {
@@ -868,14 +446,13 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
 		return -ENOMEM;
 	}
 
-	while (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) {
+	while (gmnalni->gmni_ctthread_flag != GMNAL_THREAD_RESET) {
 		gmnal_yield(1);
 		CDEBUG(D_NET, "Waiting for caretaker thread signs of life\n");
 	}
 
 	CDEBUG(D_NET, "caretaker thread has started\n");
 
-
 	/*
 	 * Now start a number of receiver threads
 	 * these threads get work to do from the caretaker (ct) thread
@@ -883,19 +460,17 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
 	gmnalni->gmni_rxthread_flag = GMNAL_THREAD_RESET;
 	gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_RESET;
 
+	spin_lock_init(&gmnalni->gmni_rxthread_flag_lock);
 	for (threads=0; threads<NRXTHREADS; threads++)
 		gmnalni->gmni_rxthread_pid[threads] = -1;
 
-	spin_lock_init(&gmnalni->gmni_rxtwe_lock);
-	spin_lock_init(&gmnalni->gmni_rxthread_flag_lock);
-	sema_init(&gmnalni->gmni_rxtwe_wait, 0);
-	gmnalni->gmni_rxtwe_head = NULL;
-	gmnalni->gmni_rxtwe_tail = NULL;
+
 	/*
 	 * If the default number of receive threads isn't
 	 * modified at load time, then start one thread per cpu
 	 */
 	if (num_rx_threads == -1)
 		num_rx_threads = smp_num_cpus;
+
 	CDEBUG(D_NET, "Starting [%d] receive threads\n", num_rx_threads);
 	for (threads=0; threads<num_rx_threads; threads++) {
 		gmnalni->gmni_rxthread_pid[threads] =
@@ -910,11 +485,12 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
 
 	for (;;) {
 		spin_lock(&gmnalni->gmni_rxthread_flag_lock);
-		if (gmnalni->gmni_rxthread_flag == GMNAL_RXTHREADS_STARTED) {
-			spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-			break;
-		}
+		flag = gmnalni->gmni_rxthread_flag;
 		spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
+
+		if (flag == GMNAL_RXTHREADS_STARTED)
+			break;
+
 		gmnal_yield(1);
 	}
 
@@ -922,3 +498,313 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
 	return 0;
 }
+
+char *
+gmnal_gmstatus2str(gm_status_t status)
+{
+	return(gm_strerror(status));
+
+	switch(status) {
+	case(GM_SUCCESS):
+		return("SUCCESS");
+	case(GM_FAILURE):
+		return("FAILURE");
+	case(GM_INPUT_BUFFER_TOO_SMALL):
+		return("INPUT_BUFFER_TOO_SMALL");
+	case(GM_OUTPUT_BUFFER_TOO_SMALL):
+		return("OUTPUT_BUFFER_TOO_SMALL");
+	case(GM_TRY_AGAIN ):
+		return("TRY_AGAIN");
+	case(GM_BUSY):
+		return("BUSY");
+	case(GM_MEMORY_FAULT):
+		return("MEMORY_FAULT");
+	case(GM_INTERRUPTED):
+		return("INTERRUPTED");
+	case(GM_INVALID_PARAMETER):
+		return("INVALID_PARAMETER");
+	case(GM_OUT_OF_MEMORY):
+		return("OUT_OF_MEMORY");
+	case(GM_INVALID_COMMAND):
+		return("INVALID_COMMAND");
+	case(GM_PERMISSION_DENIED):
+		return("PERMISSION_DENIED");
+	case(GM_INTERNAL_ERROR):
+		return("INTERNAL_ERROR");
+	case(GM_UNATTACHED):
+		return("UNATTACHED");
+	case(GM_UNSUPPORTED_DEVICE):
+		return("UNSUPPORTED_DEVICE");
+	case(GM_SEND_TIMED_OUT):
+		return("GM_SEND_TIMEDOUT");
+	case(GM_SEND_REJECTED):
+		return("GM_SEND_REJECTED");
+	case(GM_SEND_TARGET_PORT_CLOSED):
+		return("GM_SEND_TARGET_PORT_CLOSED");
+	case(GM_SEND_TARGET_NODE_UNREACHABLE):
+		return("GM_SEND_TARGET_NODE_UNREACHABLE");
+	case(GM_SEND_DROPPED):
+		return("GM_SEND_DROPPED");
+	case(GM_SEND_PORT_CLOSED):
+		return("GM_SEND_PORT_CLOSED");
+	case(GM_NODE_ID_NOT_YET_SET):
+		return("GM_NODE_ID_NOT_YET_SET");
+	case(GM_STILL_SHUTTING_DOWN):
+		return("GM_STILL_SHUTTING_DOWN");
+	
case(GM_CLONE_BUSY): + return("GM_CLONE_BUSY"); + case(GM_NO_SUCH_DEVICE): + return("GM_NO_SUCH_DEVICE"); + case(GM_ABORTED): + return("GM_ABORTED"); + case(GM_INCOMPATIBLE_LIB_AND_DRIVER): + return("GM_INCOMPATIBLE_LIB_AND_DRIVER"); + case(GM_UNTRANSLATED_SYSTEM_ERROR): + return("GM_UNTRANSLATED_SYSTEM_ERROR"); + case(GM_ACCESS_DENIED): + return("GM_ACCESS_DENIED"); + + + /* + * These ones are in the docs but aren't in the header file + case(GM_DEV_NOT_FOUND): + return("GM_DEV_NOT_FOUND"); + case(GM_INVALID_PORT_NUMBER): + return("GM_INVALID_PORT_NUMBER"); + case(GM_UC_ERROR): + return("GM_US_ERROR"); + case(GM_PAGE_TABLE_FULL): + return("GM_PAGE_TABLE_FULL"); + case(GM_MINOR_OVERFLOW): + return("GM_MINOR_OVERFLOW"); + case(GM_SEND_ORPHANED): + return("GM_SEND_ORPHANED"); + case(GM_HARDWARE_FAULT): + return("GM_HARDWARE_FAULT"); + case(GM_DATA_CORRUPTED): + return("GM_DATA_CORRUPTED"); + case(GM_TIMED_OUT): + return("GM_TIMED_OUT"); + case(GM_USER_ERROR): + return("GM_USER_ERROR"); + case(GM_NO_MATCH): + return("GM_NOMATCH"); + case(GM_NOT_SUPPORTED_IN_KERNEL): + return("GM_NOT_SUPPORTED_IN_KERNEL"); + case(GM_NOT_SUPPORTED_ON_ARCH): + return("GM_NOT_SUPPORTED_ON_ARCH"); + case(GM_PTE_REF_CNT_OVERFLOW): + return("GM_PTR_REF_CNT_OVERFLOW"); + case(GM_NO_DRIVER_SUPPORT): + return("GM_NO_DRIVER_SUPPORT"); + case(GM_FIRMWARE_NOT_RUNNING): + return("GM_FIRMWARE_NOT_RUNNING"); + * These ones are in the docs but aren't in the header file + */ + + default: + return("UNKNOWN GM ERROR CODE"); + } +} + + +char * +gmnal_rxevent2str(gm_recv_event_t *ev) +{ + short event; + event = GM_RECV_EVENT_TYPE(ev); + switch(event) { + case(GM_NO_RECV_EVENT): + return("GM_NO_RECV_EVENT"); + case(GM_SENDS_FAILED_EVENT): + return("GM_SEND_FAILED_EVENT"); + case(GM_ALARM_EVENT): + return("GM_ALARM_EVENT"); + case(GM_SENT_EVENT): + return("GM_SENT_EVENT"); + case(_GM_SLEEP_EVENT): + return("_GM_SLEEP_EVENT"); + case(GM_RAW_RECV_EVENT): + return("GM_RAW_RECV_EVENT"); + case(GM_BAD_SEND_DETECTED_EVENT): + return("GM_BAD_SEND_DETECTED_EVENT"); + case(GM_SEND_TOKEN_VIOLATION_EVENT): + return("GM_SEND_TOKEN_VIOLATION_EVENT"); + case(GM_RECV_TOKEN_VIOLATION_EVENT): + return("GM_RECV_TOKEN_VIOLATION_EVENT"); + case(GM_BAD_RECV_TOKEN_EVENT): + return("GM_BAD_RECV_TOKEN_EVENT"); + case(GM_ALARM_VIOLATION_EVENT): + return("GM_ALARM_VIOLATION_EVENT"); + case(GM_RECV_EVENT): + return("GM_RECV_EVENT"); + case(GM_HIGH_RECV_EVENT): + return("GM_HIGH_RECV_EVENT"); + case(GM_PEER_RECV_EVENT): + return("GM_PEER_RECV_EVENT"); + case(GM_HIGH_PEER_RECV_EVENT): + return("GM_HIGH_PEER_RECV_EVENT"); + case(GM_FAST_RECV_EVENT): + return("GM_FAST_RECV_EVENT"); + case(GM_FAST_HIGH_RECV_EVENT): + return("GM_FAST_HIGH_RECV_EVENT"); + case(GM_FAST_PEER_RECV_EVENT): + return("GM_FAST_PEER_RECV_EVENT"); + case(GM_FAST_HIGH_PEER_RECV_EVENT): + return("GM_FAST_HIGH_PEER_RECV_EVENT"); + case(GM_REJECTED_SEND_EVENT): + return("GM_REJECTED_SEND_EVENT"); + case(GM_ORPHANED_SEND_EVENT): + return("GM_ORPHANED_SEND_EVENT"); + case(GM_BAD_RESEND_DETECTED_EVENT): + return("GM_BAD_RESEND_DETETED_EVENT"); + case(GM_DROPPED_SEND_EVENT): + return("GM_DROPPED_SEND_EVENT"); + case(GM_BAD_SEND_VMA_EVENT): + return("GM_BAD_SEND_VMA_EVENT"); + case(GM_BAD_RECV_VMA_EVENT): + return("GM_BAD_RECV_VMA_EVENT"); + case(_GM_FLUSHED_ALARM_EVENT): + return("GM_FLUSHED_ALARM_EVENT"); + case(GM_SENT_TOKENS_EVENT): + return("GM_SENT_TOKENS_EVENTS"); + case(GM_IGNORE_RECV_EVENT): + return("GM_IGNORE_RECV_EVENT"); + case(GM_ETHERNET_RECV_EVENT): + return("GM_ETHERNET_RECV_EVENT"); 
+ case(GM_NEW_NO_RECV_EVENT): + return("GM_NEW_NO_RECV_EVENT"); + case(GM_NEW_SENDS_FAILED_EVENT): + return("GM_NEW_SENDS_FAILED_EVENT"); + case(GM_NEW_ALARM_EVENT): + return("GM_NEW_ALARM_EVENT"); + case(GM_NEW_SENT_EVENT): + return("GM_NEW_SENT_EVENT"); + case(_GM_NEW_SLEEP_EVENT): + return("GM_NEW_SLEEP_EVENT"); + case(GM_NEW_RAW_RECV_EVENT): + return("GM_NEW_RAW_RECV_EVENT"); + case(GM_NEW_BAD_SEND_DETECTED_EVENT): + return("GM_NEW_BAD_SEND_DETECTED_EVENT"); + case(GM_NEW_SEND_TOKEN_VIOLATION_EVENT): + return("GM_NEW_SEND_TOKEN_VIOLATION_EVENT"); + case(GM_NEW_RECV_TOKEN_VIOLATION_EVENT): + return("GM_NEW_RECV_TOKEN_VIOLATION_EVENT"); + case(GM_NEW_BAD_RECV_TOKEN_EVENT): + return("GM_NEW_BAD_RECV_TOKEN_EVENT"); + case(GM_NEW_ALARM_VIOLATION_EVENT): + return("GM_NEW_ALARM_VIOLATION_EVENT"); + case(GM_NEW_RECV_EVENT): + return("GM_NEW_RECV_EVENT"); + case(GM_NEW_HIGH_RECV_EVENT): + return("GM_NEW_HIGH_RECV_EVENT"); + case(GM_NEW_PEER_RECV_EVENT): + return("GM_NEW_PEER_RECV_EVENT"); + case(GM_NEW_HIGH_PEER_RECV_EVENT): + return("GM_NEW_HIGH_PEER_RECV_EVENT"); + case(GM_NEW_FAST_RECV_EVENT): + return("GM_NEW_FAST_RECV_EVENT"); + case(GM_NEW_FAST_HIGH_RECV_EVENT): + return("GM_NEW_FAST_HIGH_RECV_EVENT"); + case(GM_NEW_FAST_PEER_RECV_EVENT): + return("GM_NEW_FAST_PEER_RECV_EVENT"); + case(GM_NEW_FAST_HIGH_PEER_RECV_EVENT): + return("GM_NEW_FAST_HIGH_PEER_RECV_EVENT"); + case(GM_NEW_REJECTED_SEND_EVENT): + return("GM_NEW_REJECTED_SEND_EVENT"); + case(GM_NEW_ORPHANED_SEND_EVENT): + return("GM_NEW_ORPHANED_SEND_EVENT"); + case(_GM_NEW_PUT_NOTIFICATION_EVENT): + return("_GM_NEW_PUT_NOTIFICATION_EVENT"); + case(GM_NEW_FREE_SEND_TOKEN_EVENT): + return("GM_NEW_FREE_SEND_TOKEN_EVENT"); + case(GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT): + return("GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT"); + case(GM_NEW_BAD_RESEND_DETECTED_EVENT): + return("GM_NEW_BAD_RESEND_DETECTED_EVENT"); + case(GM_NEW_DROPPED_SEND_EVENT): + return("GM_NEW_DROPPED_SEND_EVENT"); + case(GM_NEW_BAD_SEND_VMA_EVENT): + return("GM_NEW_BAD_SEND_VMA_EVENT"); + case(GM_NEW_BAD_RECV_VMA_EVENT): + return("GM_NEW_BAD_RECV_VMA_EVENT"); + case(_GM_NEW_FLUSHED_ALARM_EVENT): + return("GM_NEW_FLUSHED_ALARM_EVENT"); + case(GM_NEW_SENT_TOKENS_EVENT): + return("GM_NEW_SENT_TOKENS_EVENT"); + case(GM_NEW_IGNORE_RECV_EVENT): + return("GM_NEW_IGNORE_RECV_EVENT"); + case(GM_NEW_ETHERNET_RECV_EVENT): + return("GM_NEW_ETHERNET_RECV_EVENT"); + default: + return("Unknown Recv event"); + /* _GM_PUT_NOTIFICATION_EVENT */ + /* GM_FREE_SEND_TOKEN_EVENT */ + /* GM_FREE_HIGH_SEND_TOKEN_EVENT */ + } +} + + +void +gmnal_yield(int delay) +{ + set_current_state(TASK_INTERRUPTIBLE); + schedule_timeout(delay); +} + +int +gmnal_enqueue_rx(gmnal_ni_t *gmnalni, gm_recv_t *recv) +{ + void *ptr = gm_ntohp(recv->buffer); + gmnal_rx_t *rx = gm_hash_find(gmnalni->gmni_rx_hash, ptr); + + LASSERT (rx != NULL); + LASSERT (rx->rx_msg == (gmnal_msg_t *)ptr); + + rx->rx_recv_nob = gm_ntohl(recv->length); + rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id); + rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id); + rx->rx_recv_type = gm_ntoh_u8(recv->type); + + spin_lock(&gmnalni->gmni_rxq_lock); + list_add_tail (&rx->rx_list, &gmnalni->gmni_rxq); + spin_unlock(&gmnalni->gmni_rxq_lock); + + up(&gmnalni->gmni_rxq_wait); + return 0; +} + +gmnal_rx_t * +gmnal_dequeue_rx(gmnal_ni_t *gmnalni) +{ + gmnal_rx_t *rx; + + CDEBUG(D_NET, "Getting entry to list\n"); + + for (;;) { + while(down_interruptible(&gmnalni->gmni_rxq_wait) != 0) + /* do nothing */; + + if (gmnalni->gmni_rxthread_stop_flag == 
GMNAL_THREAD_STOP) + return NULL; + + spin_lock(&gmnalni->gmni_rxq_lock); + + if (list_empty(&gmnalni->gmni_rxq)) { + rx = NULL; + } else { + rx = list_entry(gmnalni->gmni_rxq.next, + gmnal_rx_t, rx_list); + list_del(&rx->rx_list); + } + + spin_unlock(&gmnalni->gmni_rxq_lock); + + if (rx != NULL) + return rx; + + CWARN("woken but no work\n"); + } +} + + -- 1.8.3.1
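
In gmnal_get_tx() above, the semaphore (the send token) is taken before the
freelist is touched, and gmnal_return_tx() pushes the descriptor back before
releasing the token, so any caller holding a token is guaranteed a non-empty
list and the pop needs no failure path. Below is a minimal user-space sketch
of that token-counted freelist pattern, using pthread spinlocks and POSIX
semaphores in place of the kernel primitives; every name in it is invented
for illustration and nothing in it is part of the patch.

#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

#define NTX 8                           /* pool size, arbitrary for the demo */

struct tx {
        struct tx *next;
};

static struct tx          pool[NTX];
static struct tx         *free_head;
static pthread_spinlock_t free_lock;    /* guards only the list head */
static sem_t              tx_token;     /* counts free descriptors */

static void pool_init(void)
{
        int i;

        pthread_spin_init(&free_lock, PTHREAD_PROCESS_PRIVATE);
        sem_init(&tx_token, 0, NTX);
        for (i = 0; i < NTX; i++) {
                pool[i].next = free_head;
                free_head = &pool[i];
        }
}

/* block == 0 mirrors down_trylock(): fail rather than sleep */
static struct tx *get_tx(int block)
{
        struct tx *tx;

        if (block)
                sem_wait(&tx_token);    /* EINTR handling omitted */
        else if (sem_trywait(&tx_token) != 0)
                return NULL;

        /* token in hand: the list cannot be empty */
        pthread_spin_lock(&free_lock);
        tx = free_head;
        free_head = tx->next;
        pthread_spin_unlock(&free_lock);
        return tx;
}

static void return_tx(struct tx *tx)
{
        pthread_spin_lock(&free_lock);
        tx->next = free_head;
        free_head = tx;
        pthread_spin_unlock(&free_lock);
        sem_post(&tx_token);            /* release the token last */
}

int main(void)
{
        struct tx *tx;

        pool_init();
        tx = get_tx(1);                 /* blocking get */
        return_tx(tx);
        return get_tx(0) == NULL;       /* non-blocking get now succeeds */
}

Releasing the token only after the descriptor is back on the list is what
keeps the "token implies non-empty list" invariant that the kernel code
relies on.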
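
The rx handoff introduced by this patch is a plain producer/consumer queue:
gmnal_enqueue_rx() links a descriptor onto gmni_rxq under gmni_rxq_lock and
posts gmni_rxq_wait, while gmnal_dequeue_rx() sleeps on the semaphore, pops
under the lock, and returns NULL once the stop flag is raised. Here is a
minimal user-space sketch of the same pattern under the same assumptions as
the sketch above (pthreads and POSIX semaphores; all names are invented for
illustration, none of this is part of the patch).

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>

struct rx {
        struct rx *next;
        int        payload;
};

static struct rx          *rxq_head, *rxq_tail;
static pthread_spinlock_t  rxq_lock;
static sem_t               rxq_wait;
static int                 stop_flag;

/* caretaker side: like gmnal_enqueue_rx() */
static void enqueue_rx(struct rx *rx)
{
        pthread_spin_lock(&rxq_lock);
        rx->next = NULL;
        if (rxq_tail != NULL)
                rxq_tail->next = rx;
        else
                rxq_head = rx;
        rxq_tail = rx;
        pthread_spin_unlock(&rxq_lock);

        sem_post(&rxq_wait);            /* wake one rx thread */
}

/* rx thread side: like gmnal_dequeue_rx(); NULL means "shut down" */
static struct rx *dequeue_rx(void)
{
        struct rx *rx;

        for (;;) {
                while (sem_wait(&rxq_wait) != 0)
                        ;               /* retry on EINTR */

                if (stop_flag)
                        return NULL;

                pthread_spin_lock(&rxq_lock);
                rx = rxq_head;
                if (rx != NULL) {
                        rxq_head = rx->next;
                        if (rxq_head == NULL)
                                rxq_tail = NULL;
                }
                pthread_spin_unlock(&rxq_lock);

                if (rx != NULL)
                        return rx;
                /* woken for shutdown racing with a post: loop again */
        }
}

int main(void)
{
        struct rx *rx = malloc(sizeof(*rx));

        pthread_spin_init(&rxq_lock, PTHREAD_PROCESS_PRIVATE);
        sem_init(&rxq_wait, 0, 0);

        rx->payload = 42;
        enqueue_rx(rx);
        rx = dequeue_rx();
        printf("dequeued payload %d\n", rx->payload);
        free(rx);
        return 0;
}

Shutdown mirrors gmnal_stop_rxthread() above: raise the stop flag first,
then post the semaphore once per consumer so each thread wakes, observes
the flag, and exits instead of popping.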