From f2695b84eba9e97591b86f45ec15996fc2c95dab Mon Sep 17 00:00:00 2001 From: eeb Date: Thu, 18 Aug 2005 17:43:26 +0000 Subject: [PATCH] * GMNAL - cleaned up structs - removed buggy RDMA handling (previous version is tagged GM_PRE_REMOVE_BUGGY_RDMA for reference) - fixed memory leak on teardown --- lnet/klnds/gmlnd/gmlnd.h | 244 +++------ lnet/klnds/gmlnd/gmlnd_api.c | 266 ++++----- lnet/klnds/gmlnd/gmlnd_cb.c | 394 ++++++------- lnet/klnds/gmlnd/gmlnd_comm.c | 1153 ++++++--------------------------------- lnet/klnds/gmlnd/gmlnd_module.c | 44 +- lnet/klnds/gmlnd/gmlnd_utils.c | 637 +++++++++------------ 6 files changed, 819 insertions(+), 1919 deletions(-) diff --git a/lnet/klnds/gmlnd/gmlnd.h b/lnet/klnds/gmlnd/gmlnd.h index 6076d14..fe39506 100644 --- a/lnet/klnds/gmlnd/gmlnd.h +++ b/lnet/klnds/gmlnd/gmlnd.h @@ -99,11 +99,7 @@ #define GMNAL_MAGIC 0x1234abcd #define GMNAL_SMALL_MESSAGE 1078 -#define GMNAL_LARGE_MESSAGE_INIT 1079 -#define GMNAL_LARGE_MESSAGE_ACK 1080 -#define GMNAL_LARGE_MESSAGE_FINI 1081 -extern int gmnal_small_msg_size; extern int num_rx_threads; extern int num_stxds; extern int gm_port_id; @@ -117,67 +113,58 @@ extern int gm_port_id; * and the other by the NAL rxthreads when doing sends. * This helps prevent deadlock caused by stxd starvation. */ -typedef struct _gmnal_stxd_t { - void *buffer; - int buffer_size; - gm_size_t gm_size; - int msg_size; - int gm_target_node; - int gm_priority; - int type; - struct _gmnal_data_t *nal_data; - lib_msg_t *cookie; - int niov; - struct iovec iov[PTL_MD_MAX_IOV]; - struct _gmnal_stxd_t *next; - int rxt; - int kniov; - struct iovec *iovec_dup; +typedef struct gmnal_stxd { + struct gmnal_stxd *tx_next; + void *tx_buffer; + int tx_buffer_size; + gm_size_t tx_gm_size; + int tx_msg_size; + int tx_gmlid; + int tx_gm_priority; + int tx_type; + ptl_nid_t tx_nid; + struct gmnal_ni *tx_gmni; + lib_msg_t *tx_cookie; + int tx_niov; + int tx_rxt; + int tx_kniov; + struct iovec *tx_iovec_dup; + struct iovec tx_iov[PTL_MD_MAX_IOV]; } gmnal_stxd_t; /* - * keeps a transmit token for large transmit (gm_get) - * and a pointer to rxd that is used as context for large receive - */ -typedef struct _gmnal_ltxd_t { - struct _gmnal_ltxd_t *next; - struct _gmnal_srxd_t *srxd; -} gmnal_ltxd_t; - - -/* * as for gmnal_stxd_t * a hash table in nal_data find srxds from * the rx buffer address. 
hash table populated at init time */ -typedef struct _gmnal_srxd_t { - void *buffer; - int size; - gm_size_t gmsize; - unsigned int gm_source_node; - gmnal_stxd_t *source_stxd; - int type; - int nsiov; - int nriov; - struct iovec *riov; - int ncallbacks; - spinlock_t callback_lock; - int callback_status; - lib_msg_t *cookie; - struct _gmnal_srxd_t *next; - struct _gmnal_data_t *nal_data; +typedef struct gmnal_srxd { + void *rx_buffer; + int rx_size; + gm_size_t rx_gmsize; + unsigned int rx_sender_gmid; + __u64 rx_source_stxd; + int rx_type; + int rx_nsiov; + int rx_nriov; + struct iovec *rx_riov; + int rx_ncallbacks; + spinlock_t rx_callback_lock; + int rx_callback_status; + lib_msg_t *rx_cookie; + struct gmnal_srxd *rx_next; + struct gmnal_ni *rx_gmni; } gmnal_srxd_t; /* * Header which lmgnal puts at the start of each message * watch alignment for ia32/64 interaction */ -typedef struct _gmnal_msghdr { - __s32 magic; - __s32 type; - __u32 sender_node_id; - __s32 niov; - gm_remote_ptr_t stxd_remote_ptr; /* 64 bits */ +typedef struct gmnal_msghdr { + __s32 gmm_magic; + __s32 gmm_type; + __s32 gmm_niov; + __u32 gmm_sender_gmid; + __u64 gmm_stxd_remote_ptr; } WIRE_ATTR gmnal_msghdr_t; /* @@ -192,13 +179,13 @@ typedef struct _gmnal_msghdr { * is exhausted (as caretaker thread is responsible for replacing * transmit descriptors on the free list) */ -typedef struct _gmnal_rxtwe { +typedef struct gmnal_rxtwe { void *buffer; unsigned snode; unsigned sport; unsigned type; unsigned length; - struct _gmnal_rxtwe *next; + struct gmnal_rxtwe *next; } gmnal_rxtwe_t; /* @@ -206,43 +193,35 @@ typedef struct _gmnal_rxtwe { */ #define NRXTHREADS 10 /* max number of receiver threads */ -typedef struct _gmnal_data_t { - int refcnt; - spinlock_t cb_lock; - spinlock_t stxd_lock; - struct semaphore stxd_token; - gmnal_stxd_t *stxd; - spinlock_t rxt_stxd_lock; - struct semaphore rxt_stxd_token; - gmnal_stxd_t *rxt_stxd; - spinlock_t ltxd_lock; - struct semaphore ltxd_token; - gmnal_ltxd_t *ltxd; - spinlock_t srxd_lock; - struct semaphore srxd_token; - gmnal_srxd_t *srxd; - struct gm_hash *srxd_hash; - nal_t *nal; - lib_nal_t *libnal; - struct gm_port *gm_port; - unsigned int gm_local_nid; - unsigned int gm_global_nid; - spinlock_t gm_lock; - long rxthread_pid[NRXTHREADS]; - int rxthread_stop_flag; - spinlock_t rxthread_flag_lock; - long rxthread_flag; - long ctthread_pid; - int ctthread_flag; - gm_alarm_t ctthread_alarm; - int small_msg_size; - int small_msg_gmsize; - gmnal_rxtwe_t *rxtwe_head; - gmnal_rxtwe_t *rxtwe_tail; - spinlock_t rxtwe_lock; - struct semaphore rxtwe_wait; - struct ctl_table_header *sysctl; -} gmnal_data_t; +typedef struct gmnal_ni { + spinlock_t gmni_stxd_lock; + struct semaphore gmni_stxd_token; + gmnal_stxd_t *gmni_stxd; + spinlock_t gmni_rxt_stxd_lock; + struct semaphore gmni_rxt_stxd_token; + gmnal_stxd_t *gmni_rxt_stxd; + gmnal_srxd_t *gmni_srxd; + struct gm_hash *gmni_srxd_hash; + nal_t *gmni_nal; + lib_nal_t *gmni_libnal; + struct gm_port *gmni_port; + __u32 gmni_local_gmid; + __u32 gmni_global_gmid; + spinlock_t gmni_gm_lock; /* serialise GM calls */ + long gmni_rxthread_pid[NRXTHREADS]; + int gmni_rxthread_stop_flag; + spinlock_t gmni_rxthread_flag_lock; + long gmni_rxthread_flag; + long gmni_ctthread_pid; + int gmni_ctthread_flag; + gm_alarm_t gmni_ctthread_alarm; + int gmni_small_msg_size; + int gmni_small_msg_gmsize; + gmnal_rxtwe_t *gmni_rxtwe_head; + gmnal_rxtwe_t *gmni_rxtwe_tail; + spinlock_t gmni_rxtwe_lock; + struct semaphore gmni_rxtwe_wait; +} gmnal_ni_t; /* * Flags to 
start/stop and check status of threads @@ -255,22 +234,12 @@ typedef struct _gmnal_data_t { #define GMNAL_RXTHREADS_STARTED ( (1< called on last matching PtlNIFini() * Close down this interface and free any resources associated with it @@ -62,7 +38,7 @@ static ctl_table gmnalnal_top_sysctl_table[] = { void gmnal_api_shutdown(nal_t *nal) { - gmnal_data_t *nal_data; + gmnal_ni_t *gmnalni; lib_nal_t *libnal; if (nal->nal_refct != 0) { @@ -71,11 +47,9 @@ gmnal_api_shutdown(nal_t *nal) return; } - LASSERT(nal == global_nal_data->nal); libnal = (lib_nal_t *)nal->nal_data; - nal_data = (gmnal_data_t *)libnal->libnal_data; - LASSERT(nal_data == global_nal_data); - CDEBUG(D_TRACE, "gmnal_api_shutdown: nal_data [%p]\n", nal_data); + gmnalni = (gmnal_ni_t *)libnal->libnal_data; + CDEBUG(D_TRACE, "gmnal_api_shutdown: gmnalni [%p]\n", gmnalni); /* Stop portals calling our ioctl handler */ libcfs_nal_cmd_unregister(GMNAL); @@ -86,21 +60,17 @@ gmnal_api_shutdown(nal_t *nal) * shutdown our threads, THEN lib_fini() */ lib_fini(libnal); - gmnal_stop_rxthread(nal_data); - gmnal_stop_ctthread(nal_data); - gmnal_free_txd(nal_data); - gmnal_free_srxd(nal_data); - spin_lock(&nal_data->gm_lock); - gm_close(nal_data->gm_port); + gmnal_stop_rxthread(gmnalni); + gmnal_stop_ctthread(gmnalni); + gmnal_free_txd(gmnalni); + gmnal_free_srxd(gmnalni); + spin_lock(&gmnalni->gmni_gm_lock); + gm_close(gmnalni->gmni_port); gm_finalize(); - spin_unlock(&nal_data->gm_lock); - if (nal_data->sysctl) - unregister_sysctl_table (nal_data->sysctl); + spin_unlock(&gmnalni->gmni_gm_lock); /* Don't free 'nal'; it's a static struct */ - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); + PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); PORTAL_FREE(libnal, sizeof(lib_nal_t)); - - global_nal_data = NULL; } @@ -111,10 +81,10 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, { lib_nal_t *libnal = NULL; - gmnal_data_t *nal_data = NULL; + gmnal_ni_t *gmnalni = NULL; gmnal_srxd_t *srxd = NULL; gm_status_t gm_status; - unsigned int local_nid = 0, global_nid = 0; + unsigned int local_gmid = 0, global_gmid = 0; ptl_process_id_t process_id; if (nal->nal_refct != 0) { @@ -130,24 +100,22 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, CDEBUG(D_TRACE, "startup\n"); - LASSERT(global_nal_data == NULL); - - PORTAL_ALLOC(nal_data, sizeof(gmnal_data_t)); - if (!nal_data) { + PORTAL_ALLOC(gmnalni, sizeof(gmnal_ni_t)); + if (!gmnalni) { CERROR("can't get memory\n"); return(PTL_NO_SPACE); } - memset(nal_data, 0, sizeof(gmnal_data_t)); + memset(gmnalni, 0, sizeof(gmnal_ni_t)); /* * set the small message buffer size */ - CDEBUG(D_INFO, "Allocd and reset nal_data[%p]\n", nal_data); - CDEBUG(D_INFO, "small_msg_size is [%d]\n", nal_data->small_msg_size); + CDEBUG(D_NET, "Allocd and reset gmnalni[%p]\n", gmnalni); + CDEBUG(D_NET, "small_msg_size is [%d]\n", gmnalni->gmni_small_msg_size); PORTAL_ALLOC(libnal, sizeof(lib_nal_t)); if (!libnal) { - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); + PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); return(PTL_NO_SPACE); } @@ -159,27 +127,23 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, libnal->libnal_map = NULL; libnal->libnal_unmap = NULL; libnal->libnal_dist = gmnal_cb_dist; - libnal->libnal_data = NULL; + libnal->libnal_data = gmnalni; - CDEBUG(D_INFO, "Allocd and reset libnal[%p]\n", libnal); + CDEBUG(D_NET, "Allocd and reset libnal[%p]\n", libnal); - /* - * String them all together - */ - libnal->libnal_data = (void*)nal_data; - nal_data->nal = nal; - nal_data->libnal = libnal; + gmnalni->gmni_nal = nal; + 
gmnalni->gmni_libnal = libnal; - spin_lock_init(&nal_data->gm_lock); + spin_lock_init(&gmnalni->gmni_gm_lock); /* * initialise the interface, */ - CDEBUG(D_INFO, "Calling gm_init\n"); + CDEBUG(D_NET, "Calling gm_init\n"); if (gm_init() != GM_SUCCESS) { CERROR("call to gm_init failed\n"); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); + PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); PORTAL_FREE(libnal, sizeof(lib_nal_t)); return(PTL_FAIL); } @@ -189,14 +153,14 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, "name [%s], version [%d]\n", gm_port_id, "gmnal", GM_API_VERSION); - spin_lock(&nal_data->gm_lock); - gm_status = gm_open(&nal_data->gm_port, 0, gm_port_id, "gmnal", + spin_lock(&gmnalni->gmni_gm_lock); + gm_status = gm_open(&gmnalni->gmni_port, 0, gm_port_id, "gmnal", GM_API_VERSION); - spin_unlock(&nal_data->gm_lock); + spin_unlock(&gmnalni->gmni_gm_lock); - CDEBUG(D_INFO, "gm_open returned [%d]\n", gm_status); + CDEBUG(D_NET, "gm_open returned [%d]\n", gm_status); if (gm_status == GM_SUCCESS) { - CDEBUG(D_INFO,"gm_open succeeded port[%p]\n",nal_data->gm_port); + CDEBUG(D_NET,"gm_open succeeded port[%p]\n",gmnalni->gmni_port); } else { switch(gm_status) { case(GM_INVALID_PARAMETER): @@ -219,26 +183,31 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, gm_status); break; } - spin_lock(&nal_data->gm_lock); + spin_lock(&gmnalni->gmni_gm_lock); gm_finalize(); - spin_unlock(&nal_data->gm_lock); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); + spin_unlock(&gmnalni->gmni_gm_lock); + PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); PORTAL_FREE(libnal, sizeof(lib_nal_t)); return(PTL_FAIL); } - nal_data->small_msg_size = gmnal_small_msg_size; - nal_data->small_msg_gmsize = - gm_min_size_for_length(gmnal_small_msg_size); + gmnalni->gmni_small_msg_size = sizeof(gmnal_msghdr_t) + + sizeof(ptl_hdr_t) + + PTL_MTU + + 928; /* !! 
*/ + CWARN("Msg size %08x\n", gmnalni->gmni_small_msg_size); - if (gmnal_alloc_srxd(nal_data) != GMNAL_STATUS_OK) { + gmnalni->gmni_small_msg_gmsize = + gm_min_size_for_length(gmnalni->gmni_small_msg_size); + + if (gmnal_alloc_srxd(gmnalni) != 0) { CERROR("Failed to allocate small rx descriptors\n"); - gmnal_free_txd(nal_data); - spin_lock(&nal_data->gm_lock); - gm_close(nal_data->gm_port); + gmnal_free_txd(gmnalni); + spin_lock(&gmnalni->gmni_gm_lock); + gm_close(gmnalni->gmni_port); gm_finalize(); - spin_unlock(&nal_data->gm_lock); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); + spin_unlock(&gmnalni->gmni_gm_lock); + PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); PORTAL_FREE(libnal, sizeof(lib_nal_t)); return(PTL_FAIL); } @@ -248,26 +217,27 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, * Hang out a bunch of small receive buffers * In fact hang them all out */ - while((srxd = gmnal_get_srxd(nal_data, 0))) { + for (srxd = gmnalni->gmni_srxd; srxd != NULL; srxd = srxd->rx_next) { CDEBUG(D_NET, "giving [%p] to gm_provide_recvive_buffer\n", - srxd->buffer); - spin_lock(&nal_data->gm_lock); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, - srxd->buffer, srxd->gmsize, + srxd->rx_buffer); + spin_lock(&gmnalni->gmni_gm_lock); + gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, + srxd->rx_buffer, + srxd->rx_gmsize, GM_LOW_PRIORITY, 0); - spin_unlock(&nal_data->gm_lock); + spin_unlock(&gmnalni->gmni_gm_lock); } /* * Allocate pools of small tx buffers and descriptors */ - if (gmnal_alloc_txd(nal_data) != GMNAL_STATUS_OK) { + if (gmnal_alloc_txd(gmnalni) != 0) { CERROR("Failed to allocate small tx descriptors\n"); - spin_lock(&nal_data->gm_lock); - gm_close(nal_data->gm_port); + spin_lock(&gmnalni->gmni_gm_lock); + gm_close(gmnalni->gmni_port); gm_finalize(); - spin_unlock(&nal_data->gm_lock); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); + spin_unlock(&gmnalni->gmni_gm_lock); + PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); PORTAL_FREE(libnal, sizeof(lib_nal_t)); return(PTL_FAIL); } @@ -276,71 +246,71 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, * Initialise the portals library */ CDEBUG(D_NET, "Getting node id\n"); - spin_lock(&nal_data->gm_lock); - gm_status = gm_get_node_id(nal_data->gm_port, &local_nid); - spin_unlock(&nal_data->gm_lock); + spin_lock(&gmnalni->gmni_gm_lock); + gm_status = gm_get_node_id(gmnalni->gmni_port, &local_gmid); + spin_unlock(&gmnalni->gmni_gm_lock); if (gm_status != GM_SUCCESS) { - gmnal_stop_rxthread(nal_data); - gmnal_stop_ctthread(nal_data); + gmnal_stop_rxthread(gmnalni); + gmnal_stop_ctthread(gmnalni); CERROR("can't determine node id\n"); - gmnal_free_txd(nal_data); - gmnal_free_srxd(nal_data); - spin_lock(&nal_data->gm_lock); - gm_close(nal_data->gm_port); + gmnal_free_txd(gmnalni); + gmnal_free_srxd(gmnalni); + spin_lock(&gmnalni->gmni_gm_lock); + gm_close(gmnalni->gmni_port); gm_finalize(); - spin_unlock(&nal_data->gm_lock); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); + spin_unlock(&gmnalni->gmni_gm_lock); + PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); PORTAL_FREE(libnal, sizeof(lib_nal_t)); return(PTL_FAIL); } - nal_data->gm_local_nid = local_nid; - CDEBUG(D_INFO, "Local node id is [%u]\n", local_nid); + gmnalni->gmni_local_gmid = local_gmid; + CDEBUG(D_NET, "Local node id is [%u]\n", local_gmid); - spin_lock(&nal_data->gm_lock); - gm_status = gm_node_id_to_global_id(nal_data->gm_port, local_nid, - &global_nid); - spin_unlock(&nal_data->gm_lock); + spin_lock(&gmnalni->gmni_gm_lock); + gm_status = 
gm_node_id_to_global_id(gmnalni->gmni_port, + local_gmid, + &global_gmid); + spin_unlock(&gmnalni->gmni_gm_lock); if (gm_status != GM_SUCCESS) { CERROR("failed to obtain global id\n"); - gmnal_stop_rxthread(nal_data); - gmnal_stop_ctthread(nal_data); - gmnal_free_txd(nal_data); - gmnal_free_srxd(nal_data); - spin_lock(&nal_data->gm_lock); - gm_close(nal_data->gm_port); + gmnal_stop_rxthread(gmnalni); + gmnal_stop_ctthread(gmnalni); + gmnal_free_txd(gmnalni); + gmnal_free_srxd(gmnalni); + spin_lock(&gmnalni->gmni_gm_lock); + gm_close(gmnalni->gmni_port); gm_finalize(); - spin_unlock(&nal_data->gm_lock); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); + spin_unlock(&gmnalni->gmni_gm_lock); + PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); PORTAL_FREE(libnal, sizeof(lib_nal_t)); return(PTL_FAIL); } - CDEBUG(D_INFO, "Global node id is [%u]\n", global_nid); - nal_data->gm_global_nid = global_nid; - snprintf(global_nid_str, GLOBAL_NID_STR_LEN, "%u", global_nid); + CDEBUG(D_NET, "Global node id is [%u]\n", global_gmid); + gmnalni->gmni_global_gmid = global_gmid; /* pid = gm_getpid(); */ process_id.pid = requested_pid; - process_id.nid = global_nid; + process_id.nid = global_gmid; - CDEBUG(D_INFO, "portals_pid is [%u]\n", process_id.pid); - CDEBUG(D_INFO, "portals_nid is ["LPU64"]\n", process_id.nid); + CDEBUG(D_NET, "portals_pid is [%u]\n", process_id.pid); + CDEBUG(D_NET, "portals_nid is ["LPU64"]\n", process_id.nid); CDEBUG(D_PORTALS, "calling lib_init\n"); if (lib_init(libnal, nal, process_id, requested_limits, actual_limits) != PTL_OK) { CERROR("lib_init failed\n"); - gmnal_stop_rxthread(nal_data); - gmnal_stop_ctthread(nal_data); - gmnal_free_txd(nal_data); - gmnal_free_srxd(nal_data); - spin_lock(&nal_data->gm_lock); - gm_close(nal_data->gm_port); + gmnal_stop_rxthread(gmnalni); + gmnal_stop_ctthread(gmnalni); + gmnal_free_txd(gmnalni); + gmnal_free_srxd(gmnalni); + spin_lock(&gmnalni->gmni_gm_lock); + gm_close(gmnalni->gmni_port); gm_finalize(); - spin_unlock(&nal_data->gm_lock); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); + spin_unlock(&gmnalni->gmni_gm_lock); + PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); PORTAL_FREE(libnal, sizeof(lib_nal_t)); return(PTL_FAIL); } @@ -349,43 +319,36 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, * Now that we have initialised the portals library, start receive threads, * we do this to avoid processing messages before we can parse them */ - gmnal_start_kernel_threads(nal_data); + gmnal_start_kernel_threads(gmnalni); - while (nal_data->rxthread_flag != GMNAL_RXTHREADS_STARTED) { + while (gmnalni->gmni_rxthread_flag != GMNAL_RXTHREADS_STARTED) { gmnal_yield(1); - CDEBUG(D_INFO, "Waiting for receive thread signs of life\n"); + CDEBUG(D_NET, "Waiting for receive thread signs of life\n"); } - CDEBUG(D_INFO, "receive thread seems to have started\n"); + CDEBUG(D_NET, "receive thread seems to have started\n"); if (libcfs_nal_cmd_register(GMNAL, &gmnal_cmd, libnal->libnal_data) != 0) { - CDEBUG(D_INFO, "libcfs_nal_cmd_register failed\n"); + CDEBUG(D_NET, "libcfs_nal_cmd_register failed\n"); /* XXX these cleanup cases should be restructured to * minimise duplication... 
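 *
 * One possible shape -- a sketch only, with hypothetical labels, not
 * something this patch implements -- is the usual kernel-style unwind,
 * so the teardown sequence appears exactly once:
 *
 *         if (lib_init(libnal, nal, process_id,
 *                      requested_limits, actual_limits) != PTL_OK)
 *                 goto failed_lib_init;
 *         if (libcfs_nal_cmd_register(GMNAL, &gmnal_cmd,
 *                                     libnal->libnal_data) != 0)
 *                 goto failed_cmd_register;
 *         return PTL_OK;
 *
 *  failed_cmd_register:
 *         lib_fini(libnal);
 *  failed_lib_init:
 *         gmnal_stop_rxthread(gmnalni);
 *         gmnal_stop_ctthread(gmnalni);
 *         gmnal_free_txd(gmnalni);
 *         gmnal_free_srxd(gmnalni);
 *         spin_lock(&gmnalni->gmni_gm_lock);
 *         gm_close(gmnalni->gmni_port);
 *         gm_finalize();
 *         spin_unlock(&gmnalni->gmni_gm_lock);
 *         PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
 *         PORTAL_FREE(libnal, sizeof(lib_nal_t));
 *         return PTL_FAIL;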
*/ lib_fini(libnal); - gmnal_stop_rxthread(nal_data); - gmnal_stop_ctthread(nal_data); - gmnal_free_txd(nal_data); - gmnal_free_srxd(nal_data); - spin_lock(&nal_data->gm_lock); - gm_close(nal_data->gm_port); + gmnal_stop_rxthread(gmnalni); + gmnal_stop_ctthread(gmnalni); + gmnal_free_txd(gmnalni); + gmnal_free_srxd(gmnalni); + spin_lock(&gmnalni->gmni_gm_lock); + gm_close(gmnalni->gmni_port); gm_finalize(); - spin_unlock(&nal_data->gm_lock); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); + spin_unlock(&gmnalni->gmni_gm_lock); + PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t)); PORTAL_FREE(libnal, sizeof(lib_nal_t)); return(PTL_FAIL); } - /* might be better to initialise this at module load rather than in - * NAL startup */ - nal_data->sysctl = NULL; - nal_data->sysctl = register_sysctl_table (gmnalnal_top_sysctl_table, 0); - - CDEBUG(D_INFO, "gmnal_init finished\n"); - - global_nal_data = libnal->libnal_data; + CDEBUG(D_NET, "gmnal_init finished\n"); return(PTL_OK); } @@ -399,7 +362,7 @@ int gmnal_init(void) { int rc; - CDEBUG(D_INFO, "reset nal[%p]\n", &the_gm_nal); + CDEBUG(D_NET, "reset nal[%p]\n", &the_gm_nal); the_gm_nal = (nal_t) { .nal_ni_init = gmnal_api_startup, @@ -430,5 +393,4 @@ void gmnal_fini() PtlNIFini(kgmnal_ni); ptl_unregister_nal(GMNAL); - LASSERT(global_nal_data == NULL); } diff --git a/lnet/klnds/gmlnd/gmlnd_cb.c b/lnet/klnds/gmlnd/gmlnd_cb.c index d94bb88..ac4c485e 100644 --- a/lnet/klnds/gmlnd/gmlnd_cb.c +++ b/lnet/klnds/gmlnd/gmlnd_cb.c @@ -31,281 +31,213 @@ ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie, unsigned int niov, struct iovec *iov, size_t offset, size_t mlen, size_t rlen) { - void *buffer = NULL; gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; - int status = PTL_OK; - size_t msglen = mlen; - size_t nob; + size_t nobleft = mlen; + void *buffer = NULL; + size_t nob; CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], " "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n", libnal, private, cookie, niov, iov, offset, mlen, rlen); - switch(srxd->type) { - case(GMNAL_SMALL_MESSAGE): - CDEBUG(D_INFO, "gmnal_cb_recv got small message\n"); - /* HP SFS 1380: Proactively change receives to avoid a receive - * side occurrence of filling pkmap_count[]. 
- */ - buffer = srxd->buffer; - buffer += sizeof(gmnal_msghdr_t); - buffer += sizeof(ptl_hdr_t); - - while(niov--) { - if (offset >= iov->iov_len) { - offset -= iov->iov_len; - } else { - nob = MIN (iov->iov_len - offset, msglen); - CDEBUG(D_INFO, "processing iov [%p] base [%p] " - "offset [%d] len ["LPSZ"] to [%p] left " - "["LPSZ"]\n", iov, iov->iov_base, - offset, nob, buffer, msglen); - gm_bcopy(buffer, iov->iov_base + offset, nob); - buffer += nob; - msglen -= nob; - offset = 0; - } - iov++; - } - status = gmnal_small_rx(libnal, private, cookie); - break; - case(GMNAL_LARGE_MESSAGE_INIT): - CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n"); - status = gmnal_large_rx(libnal, private, cookie, niov, - iov, offset, mlen, rlen); - } + LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE); + + buffer = srxd->rx_buffer; + buffer += sizeof(gmnal_msghdr_t); + buffer += sizeof(ptl_hdr_t); + + while(nobleft > 0) { + LASSERT (niov > 0); + + if (offset >= iov->iov_len) { + offset -= iov->iov_len; + } else { + nob = MIN (iov->iov_len - offset, nobleft); - CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status); - return(status); + gm_bcopy(buffer, iov->iov_base + offset, nob); + + buffer += nob; + nobleft -= nob; + offset = 0; + } + niov--; + iov++; + } + + lib_finalize(libnal, private, cookie, PTL_OK); + return PTL_OK; } ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, - lib_msg_t *cookie, unsigned int kniov, + lib_msg_t *cookie, unsigned int nkiov, ptl_kiov_t *kiov, size_t offset, size_t mlen, size_t rlen) { gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; - int status = PTL_OK; - char *ptr = NULL; - void *buffer = NULL; - + size_t nobleft = mlen; + size_t nob; + char *ptr; + void *buffer; CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], " - "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n", - libnal, private, cookie, kniov, kiov, offset, mlen, rlen); - - if (srxd->type == GMNAL_SMALL_MESSAGE) { - size_t msglen = mlen; - size_t nob; - - buffer = srxd->buffer; - buffer += sizeof(gmnal_msghdr_t); - buffer += sizeof(ptl_hdr_t); - - /* - * map each page and create an iovec for it - */ - while (kniov--) { - /* HP SFS 1380: Proactively change receives to avoid a - * receive side occurrence of filling pkmap_count[]. 
- */ - CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", - kniov, kiov); - - if (offset >= kiov->kiov_len) { - offset -= kiov->kiov_len; - } else { - nob = MIN (kiov->kiov_len - offset, msglen); - CDEBUG(D_INFO, "kniov page [%p] len [%d] " - "offset[%d]\n", kiov->kiov_page, - kiov->kiov_len, kiov->kiov_offset); - ptr = ((char *)kmap(kiov->kiov_page)) + - kiov->kiov_offset; - - CDEBUG(D_INFO, "processing ptr [%p] offset [%d] " - "len ["LPSZ"] from [%p] left ["LPSZ"]\n", - ptr, offset, nob, buffer, msglen); - gm_bcopy(buffer, ptr + offset, nob); - kunmap(kiov->kiov_page); - buffer += nob; - msglen -= nob; - offset = 0; - } - kiov++; + "cookie[%p], kniov[%d], kiov [%p], " + "offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n", + libnal, private, cookie, nkiov, kiov, offset, mlen, rlen); + + LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE); + + buffer = srxd->rx_buffer; + buffer += sizeof(gmnal_msghdr_t); + buffer += sizeof(ptl_hdr_t); + + while (nobleft > 0) { + LASSERT (nkiov > 0); + + if (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + } else { + nob = MIN (kiov->kiov_len - offset, nobleft); + + ptr = ((char *)kmap(kiov->kiov_page)) + + kiov->kiov_offset; + + gm_bcopy(buffer, ptr + offset, nob); + + kunmap(kiov->kiov_page); + + buffer += nob; + nobleft -= nob; + offset = 0; } - CDEBUG(D_INFO, "calling gmnal_small_rx\n"); - status = gmnal_small_rx(libnal, private, cookie); + kiov++; + nkiov--; } - CDEBUG(D_INFO, "gmnal_return status [%d]\n", status); - return(status); + lib_finalize(libnal, private, cookie, PTL_OK); + + return PTL_OK; } ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, struct iovec *iov, size_t offset, - size_t len) + unsigned int niov, struct iovec *iov, + size_t offset, size_t len) { - gmnal_data_t *nal_data; + gmnal_ni_t *gmnalni = libnal->libnal_data; void *buffer = NULL; gmnal_stxd_t *stxd = NULL; - - - CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ - "] nid["LPU64"]\n", niov, offset, len, nid); - nal_data = libnal->libnal_data; - if (!nal_data) { - CERROR("no nal_data\n"); - return(PTL_FAIL); - } else { - CDEBUG(D_INFO, "nal_data [%p]\n", nal_data); - } - - if (gmnal_is_small_msg(nal_data, niov, iov, len)) { - size_t msglen = len; - size_t nob; - - CDEBUG(D_INFO, "This is a small message send\n"); - /* - * HP SFS 1380: With the change to gmnal_small_tx, need to get - * the stxd and do relevant setup here - */ - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_INFO, "stxd [%p]\n", stxd); - /* Set the offset of the data to copy into the buffer */ - buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t); - while(niov--) { - if (offset >= iov->iov_len) { - offset -= iov->iov_len; - } else { - nob = MIN (iov->iov_len - offset, msglen); - CDEBUG(D_INFO, "processing iov [%p] base [%p]" - " offset [%d] len ["LPSZ"] to [%p] left" - " ["LPSZ"]\n", iov, iov->iov_base, - offset, nob, buffer, msglen); - gm_bcopy(iov->iov_base + offset, buffer, nob); - buffer += nob; - msglen -= nob; - offset = 0; - } - iov++; - } - gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid, - stxd, len); - } else { - CERROR("Large message send is not supported\n"); - lib_finalize(libnal, private, cookie, PTL_FAIL); - return(PTL_FAIL); - gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid, - niov, iov, offset, len); - } - return(PTL_OK); + size_t nobleft = len; + size_t nob; + ptl_err_t rc; + + CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] " + 
"len["LPSZ"] nid["LPU64"]\n", niov, offset, len, nid); + + if ((nid >> 32) != 0) { + CERROR("Illegal nid: "LPU64"\n", nid); + return PTL_FAIL; + } + + stxd = gmnal_get_stxd(gmnalni, 1); + CDEBUG(D_NET, "stxd [%p]\n", stxd); + + /* Set the offset of the data to copy into the buffer */ + buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t); + + while(nobleft > 0) { + LASSERT (niov > 0); + + if (offset >= iov->iov_len) { + offset -= iov->iov_len; + } else { + nob = MIN (iov->iov_len - offset, nobleft); + + gm_bcopy(iov->iov_base + offset, buffer, nob); + + buffer += nob; + nobleft -= nob; + offset = 0; + } + niov--; + iov++; + } + + rc = gmnal_small_tx(libnal, private, cookie, hdr, type, + nid, stxd, len); + if (rc != PTL_OK) + gmnal_return_stxd(gmnalni, stxd); + + return rc; } ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type, - ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov, + ptl_nid_t nid, ptl_pid_t pid, unsigned int nkiov, ptl_kiov_t *kiov, size_t offset, size_t len) { - gmnal_data_t *nal_data; - char *ptr; + gmnal_ni_t *gmnalni = libnal->libnal_data; void *buffer = NULL; gmnal_stxd_t *stxd = NULL; - ptl_err_t status = PTL_OK; + size_t nobleft = len; + char *ptr; + ptl_err_t rc; + size_t nob; CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset[" - LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len); - nal_data = libnal->libnal_data; - if (!nal_data) { - CERROR("no nal_data\n"); - return(PTL_FAIL); - } else { - CDEBUG(D_INFO, "nal_data [%p]\n", nal_data); - } + LPSZ"] len["LPSZ"]\n", nid, nkiov, offset, len); + + if ((nid >> 32) != 0) { + CERROR("Illegal nid: "LPU64"\n", nid); + return PTL_FAIL; + } + + stxd = gmnal_get_stxd(gmnalni, 1); + CDEBUG(D_NET, "stxd [%p]\n", stxd); - /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap - * more aggressively. This is the fix for a livelock situation under - * load on ia32 that occurs when there are no more available entries in - * the pkmap_count array. Just fill the buffer and let gmnal_small_tx - * put the headers in after we pass it the stxd pointer. 
- */ - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_INFO, "stxd [%p]\n", stxd); /* Set the offset of the data to copy into the buffer */ - buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t); - - if (gmnal_is_small_msg(nal_data, 0, NULL, len)) { - size_t msglen = len; - size_t nob; - - CDEBUG(D_INFO, "This is a small message send\n"); - - while(kniov--) { - CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov); - if (offset >= kiov->kiov_len) { - offset -= kiov->kiov_len; - } else { - nob = MIN (kiov->kiov_len - offset, msglen); - CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n", - kiov->kiov_page, kiov->kiov_len, - kiov->kiov_offset); - - ptr = ((char *)kmap(kiov->kiov_page)) + - kiov->kiov_offset; - - CDEBUG(D_INFO, "processing ptr [%p] offset [%d]" - " len ["LPSZ"] to [%p] left ["LPSZ"]\n", - ptr, offset, nob, buffer, msglen); - gm_bcopy(ptr + offset, buffer, nob); - kunmap(kiov->kiov_page); - buffer += nob; - msglen -= nob; - offset = 0; - } - kiov++; - } - status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid, - pid, stxd, len); - } else { - int i = 0; - struct iovec *iovec = NULL, *iovec_dup = NULL; - ptl_kiov_t *kiov_dup = kiov; - - PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec)); - iovec_dup = iovec; - CERROR("Large message send it is not supported yet\n"); - PORTAL_FREE(iovec, kniov*sizeof(struct iovec)); - return(PTL_FAIL); - for (i=0; ikiov_page, kiov->kiov_len, - kiov->kiov_offset); - - iovec->iov_base = kmap(kiov->kiov_page) - + kiov->kiov_offset; - iovec->iov_len = kiov->kiov_len; - iovec++; - kiov++; - } - gmnal_large_tx(libnal, private, cookie, hdr, type, nid, - pid, kniov, iovec, offset, len); - for (i=0; ikiov_page); - kiov_dup++; - } - PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec)); - } - return(status); + buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t); + + while (nobleft > 0) { + LASSERT (nkiov > 0); + + if (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + } else { + nob = MIN (kiov->kiov_len - offset, nobleft); + + ptr = ((char *)kmap(kiov->kiov_page)) + + kiov->kiov_offset; + + gm_bcopy(ptr + offset, buffer, nob); + + kunmap(kiov->kiov_page); + + buffer += nob; + nobleft -= nob; + offset = 0; + } + nkiov--; + kiov++; + } + + rc = gmnal_small_tx(libnal, private, cookie, hdr, type, + nid, stxd, len); + + if (rc != PTL_OK) + gmnal_return_stxd(gmnalni, stxd); + + return rc; } int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist) { CDEBUG(D_TRACE, "gmnal_cb_dist\n"); - if (dist) - *dist = 27; - return(PTL_OK); + + if (dist != NULL) + *dist = 1; + + return PTL_OK; } diff --git a/lnet/klnds/gmlnd/gmlnd_comm.c b/lnet/klnds/gmlnd/gmlnd_comm.c index c618680..4720099 100644 --- a/lnet/klnds/gmlnd/gmlnd_comm.c +++ b/lnet/klnds/gmlnd/gmlnd_comm.c @@ -36,41 +36,41 @@ int gmnal_ct_thread(void *arg) { - gmnal_data_t *nal_data; + gmnal_ni_t *gmnalni; gm_recv_event_t *rxevent = NULL; gm_recv_t *recv = NULL; if (!arg) { - CDEBUG(D_TRACE, "NO nal_data. Exiting\n"); + CDEBUG(D_NET, "NO gmnalni. 
Exiting\n"); return(-1); } - nal_data = (gmnal_data_t*)arg; - CDEBUG(D_TRACE, "nal_data is [%p]\n", arg); + gmnalni = (gmnal_ni_t*)arg; + CDEBUG(D_NET, "gmnalni is [%p]\n", arg); sprintf(current->comm, "gmnal_ct"); kportal_daemonize("gmnalctd"); - nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED; + gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED; - spin_lock(&nal_data->gm_lock); - while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) { + spin_lock(&gmnalni->gmni_gm_lock); + while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) { CDEBUG(D_NET, "waiting\n"); - rxevent = gm_blocking_receive_no_spin(nal_data->gm_port); - if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) { - CDEBUG(D_INFO, "time to exit\n"); + rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port); + if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) { + CDEBUG(D_NET, "time to exit\n"); break; } - CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); + CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent(rxevent)); switch (GM_RECV_EVENT_TYPE(rxevent)) { case(GM_RECV_EVENT): CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n"); recv = (gm_recv_t*)&rxevent->recv; - spin_unlock(&nal_data->gm_lock); - gmnal_add_rxtwe(nal_data, recv); - spin_lock(&nal_data->gm_lock); + spin_unlock(&gmnalni->gmni_gm_lock); + gmnal_add_rxtwe(gmnalni, recv); + spin_lock(&gmnalni->gmni_gm_lock); CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n"); break; case(_GM_SLEEP_EVENT): @@ -80,10 +80,10 @@ gmnal_ct_thread(void *arg) * Don't know what this is */ CDEBUG(D_NET, "Sleeping in gm_unknown\n"); - spin_unlock(&nal_data->gm_lock); - gm_unknown(nal_data->gm_port, rxevent); - spin_lock(&nal_data->gm_lock); - CDEBUG(D_INFO, "Awake from gm_unknown\n"); + spin_unlock(&gmnalni->gmni_gm_lock); + gm_unknown(gmnalni->gmni_port, rxevent); + spin_lock(&gmnalni->gmni_gm_lock); + CDEBUG(D_NET, "Awake from gm_unknown\n"); break; default: @@ -94,89 +94,87 @@ gmnal_ct_thread(void *arg) * FAST_RECV_EVENTS here. */ CDEBUG(D_NET, "Passing event to gm_unknown\n"); - spin_unlock(&nal_data->gm_lock); - gm_unknown(nal_data->gm_port, rxevent); - spin_lock(&nal_data->gm_lock); - CDEBUG(D_INFO, "Processed unknown event\n"); + spin_unlock(&gmnalni->gmni_gm_lock); + gm_unknown(gmnalni->gmni_port, rxevent); + spin_lock(&gmnalni->gmni_gm_lock); + CDEBUG(D_NET, "Processed unknown event\n"); } } - spin_unlock(&nal_data->gm_lock); - nal_data->ctthread_flag = GMNAL_THREAD_RESET; - CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data); - return(GMNAL_STATUS_OK); + spin_unlock(&gmnalni->gmni_gm_lock); + gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET; + CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni); + + return 0; } /* * process a receive event */ -int gmnal_rx_thread(void *arg) +int +gmnal_rx_thread(void *arg) { char name[16]; - gmnal_data_t *nal_data; + gmnal_ni_t *gmnalni; void *buffer; gmnal_rxtwe_t *we = NULL; int rank; if (!arg) { - CDEBUG(D_TRACE, "NO nal_data. Exiting\n"); + CDEBUG(D_NET, "NO gmnalni. 
Exiting\n"); return(-1); } - nal_data = (gmnal_data_t*)arg; - CDEBUG(D_TRACE, "nal_data is [%p]\n", arg); + gmnalni = (gmnal_ni_t*)arg; + CDEBUG(D_NET, "gmnalni is [%p]\n", arg); for (rank=0; rankrxthread_pid[rank] == current->pid) + if (gmnalni->gmni_rxthread_pid[rank] == current->pid) break; snprintf(name, sizeof(name), "gmnal_rx_%d", rank); - kportal_daemonize(name); + /* * set 1 bit for each thread started * doesn't matter which bit */ - spin_lock(&nal_data->rxthread_flag_lock); - if (nal_data->rxthread_flag) - nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1; + spin_lock(&gmnalni->gmni_rxthread_flag_lock); + if (gmnalni->gmni_rxthread_flag) + gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1; else - nal_data->rxthread_flag = 1; - CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag); - spin_unlock(&nal_data->rxthread_flag_lock); + gmnalni->gmni_rxthread_flag = 1; + CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag); + spin_unlock(&gmnalni->gmni_rxthread_flag_lock); - while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) { + while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) { CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n"); - we = gmnal_get_rxtwe(nal_data); + we = gmnal_get_rxtwe(gmnalni); if (!we) { - CDEBUG(D_INFO, "Receive thread time to exit\n"); + CDEBUG(D_NET, "Receive thread time to exit\n"); break; } buffer = we->buffer; - switch(((gmnal_msghdr_t*)buffer)->type) { + switch(((gmnal_msghdr_t*)buffer)->gmm_type) { case(GMNAL_SMALL_MESSAGE): - gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE); - break; - case(GMNAL_LARGE_MESSAGE_INIT): - gmnal_pre_receive(nal_data,we,GMNAL_LARGE_MESSAGE_INIT); - break; - case(GMNAL_LARGE_MESSAGE_ACK): - gmnal_pre_receive(nal_data, we,GMNAL_LARGE_MESSAGE_ACK); + gmnal_pre_receive(gmnalni, we, GMNAL_SMALL_MESSAGE); break; default: +#warning better handling CERROR("Unsupported message type\n"); - gmnal_rx_bad(nal_data, we, NULL); + gmnal_rx_bad(gmnalni, we); } PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); } - spin_lock(&nal_data->rxthread_flag_lock); - nal_data->rxthread_flag/=2; - CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag); - spin_unlock(&nal_data->rxthread_flag_lock); - CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data); - return(GMNAL_STATUS_OK); + spin_lock(&gmnalni->gmni_rxthread_flag_lock); + gmnalni->gmni_rxthread_flag/=2; + CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag); + spin_unlock(&gmnalni->gmni_rxthread_flag_lock); + CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni); + + return 0; } @@ -188,83 +186,54 @@ int gmnal_rx_thread(void *arg) * which hands back to gmnal_small_receive * Deal with all endian stuff here. 
*/ -int -gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type) +void +gmnal_pre_receive(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we, int gmnal_type) { gmnal_srxd_t *srxd = NULL; void *buffer = NULL; - unsigned int snode, sport, type, length; gmnal_msghdr_t *gmnal_msghdr; ptl_hdr_t *portals_hdr; - int rc; - CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", - nal_data, we, gmnal_type); + CDEBUG(D_NET, "gmnalni [%p], we[%p] type [%d]\n", + gmnalni, we, gmnal_type); buffer = we->buffer; - snode = we->snode; - sport = we->sport; - type = we->type; - buffer = we->buffer; - length = we->length; gmnal_msghdr = (gmnal_msghdr_t*)buffer; portals_hdr = (ptl_hdr_t*)(buffer+sizeof(gmnal_msghdr_t)); - CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], " + CDEBUG(D_NET, "rx_event:: Sender node [%d], Sender Port [%d], " "type [%d], length [%d], buffer [%p]\n", - snode, sport, type, length, buffer); - CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], " - "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id, - gmnal_msghdr->magic, gmnal_msghdr->type); - CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], " + we->snode, we->sport, we->type, we->length, buffer); + CDEBUG(D_NET, "gmnal_msghdr:: Sender node [%u], magic [%d], " + "gmnal_type [%d]\n", gmnal_msghdr->gmm_sender_gmid, + gmnal_msghdr->gmm_magic, gmnal_msghdr->gmm_type); + CDEBUG(D_NET, "portals_hdr:: Sender node ["LPD64"], " "dest_node ["LPD64"]\n", portals_hdr->src_nid, portals_hdr->dest_nid); /* * Get a receive descriptor for this message */ - srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer); - CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n"); + srxd = gmnal_rxbuffer_to_srxd(gmnalni, buffer); + CDEBUG(D_NET, "Back from gmnal_rxbuffer_to_srxd\n"); if (!srxd) { CERROR("Failed to get receive descriptor\n"); - /* I think passing a NULL srxd to lib_parse will crash - * gmnal_recv() */ LBUG(); - lib_parse(nal_data->libnal, portals_hdr, srxd); - return(GMNAL_STATUS_FAIL); } - /* - * no need to bother portals library with this - */ - if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) { - gmnal_large_tx_ack_received(nal_data, srxd); - return(GMNAL_STATUS_OK); - } - - srxd->nal_data = nal_data; - srxd->type = gmnal_type; - srxd->nsiov = gmnal_msghdr->niov; - srxd->gm_source_node = gmnal_msghdr->sender_node_id; + srxd->rx_gmni = gmnalni; + srxd->rx_type = gmnal_type; + srxd->rx_nsiov = gmnal_msghdr->gmm_niov; + srxd->rx_sender_gmid = gmnal_msghdr->gmm_sender_gmid; CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n", buffer+sizeof(gmnal_msghdr_t)); - /* - * control passes to lib, which calls cb_recv - * cb_recv is responsible for returning the buffer - * for future receive - */ - rc = lib_parse(nal_data->libnal, portals_hdr, srxd); - if (rc != PTL_OK) { - /* I just received garbage; return the srxd for use */ - CWARN("Returning srxd and discarding message, " - "lib_parse didn't like it.\n"); - return(gmnal_rx_bad(nal_data, we, srxd)); - } + (void)lib_parse(gmnalni->gmni_libnal, portals_hdr, srxd); + /* Ignore error; we're connectionless */ - return(GMNAL_STATUS_OK); + gmnal_rx_requeue_buffer(gmnalni, srxd); } @@ -274,19 +243,15 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type) * hang out the receive buffer again. * This implicitly returns a receive token. 
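 *
 * A receive event consumes the token that was attached to the buffer,
 * and re-posting the buffer hands the token back to GM so the buffer
 * can be filled again.  Every gm_*() call goes under gmni_gm_lock
 * because a GM port is not thread-safe (see the "serialise GM calls"
 * comment on the lock).  The re-post, as done below and at startup:
 *
 *   spin_lock(&gmnalni->gmni_gm_lock);
 *   gm_provide_receive_buffer_with_tag(gmnalni->gmni_port,
 *                                      srxd->rx_buffer, srxd->rx_gmsize,
 *                                      GM_LOW_PRIORITY, 0);
 *   spin_unlock(&gmnalni->gmni_gm_lock);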
*/ -int -gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) +void +gmnal_rx_requeue_buffer(gmnal_ni_t *gmnalni, gmnal_srxd_t *srxd) { - CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n"); - - CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data); + CDEBUG(D_NET, "requeueing srxd[%p] gmnalni[%p]\n", srxd, gmnalni); - spin_lock(&nal_data->gm_lock); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, - srxd->gmsize, GM_LOW_PRIORITY, 0 ); - spin_unlock(&nal_data->gm_lock); - - return(GMNAL_STATUS_OK); + spin_lock(&gmnalni->gmni_gm_lock); + gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, srxd->rx_buffer, + srxd->rx_gmsize, GM_LOW_PRIORITY, 0 ); + spin_unlock(&gmnalni->gmni_gm_lock); } @@ -294,71 +259,22 @@ gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) * Handle a bad message * A bad message is one we don't expect or can't interpret */ -int -gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd) +void +gmnal_rx_bad(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we) { - CDEBUG(D_TRACE, "Can't handle message\n"); - - if (!srxd) - srxd = gmnal_rxbuffer_to_srxd(nal_data, - we->buffer); - if (srxd) { - gmnal_rx_requeue_buffer(nal_data, srxd); - } else { + gmnal_srxd_t *srxd = gmnal_rxbuffer_to_srxd(gmnalni, + we->buffer); + if (srxd == NULL) { CERROR("Can't find a descriptor for this buffer\n"); - /* - * get rid of it ? - */ - return(GMNAL_STATUS_FAIL); + return; } - return(GMNAL_STATUS_OK); + gmnal_rx_requeue_buffer(gmnalni, srxd); } /* - * Process a small message receive. - * Get here from gmnal_receive_thread, gmnal_pre_receive - * lib_parse, cb_recv - * Put data from prewired receive buffer into users buffer(s) - * Hang out the receive buffer again for another receive - * Call lib_finalize - */ -ptl_err_t -gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie) -{ - gmnal_srxd_t *srxd = NULL; - gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data; - - - if (!private) { - CERROR("gmnal_small_rx no context\n"); - lib_finalize(libnal, private, cookie, PTL_FAIL); - return(PTL_FAIL); - } - - srxd = (gmnal_srxd_t*)private; - - /* - * let portals library know receive is complete - */ - CDEBUG(D_PORTALS, "calling lib_finalize\n"); - lib_finalize(libnal, private, cookie, PTL_OK); - /* - * return buffer so it can be used again - */ - CDEBUG(D_NET, "calling gm_provide_receive_buffer\n"); - spin_lock(&nal_data->gm_lock); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, - srxd->gmsize, GM_LOW_PRIORITY, 0); - spin_unlock(&nal_data->gm_lock); - - return(PTL_OK); -} - - -/* * Start a small transmit. * Use the given send token (and wired transmit buffer). * Copy headers to wired buffer and initiate gm_send from the wired buffer. 
@@ -366,83 +282,81 @@ gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie) */ ptl_err_t gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, + ptl_hdr_t *hdr, int type, ptl_nid_t nid, gmnal_stxd_t *stxd, int size) { - gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data; + gmnal_ni_t *gmnalni = (gmnal_ni_t*)libnal->libnal_data; void *buffer = NULL; gmnal_msghdr_t *msghdr = NULL; int tot_size = 0; - unsigned int local_nid; gm_status_t gm_status = GM_SUCCESS; - CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] " - "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] " + CDEBUG(D_NET, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] " + "hdr [%p] type [%d] nid ["LPU64"] stxd [%p] " "size [%d]\n", libnal, private, cookie, hdr, type, - global_nid, pid, stxd, size); + nid, stxd, size); - CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n", + CDEBUG(D_NET, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n", hdr->dest_nid, hdr->src_nid); - if (!nal_data) { - CERROR("no nal_data\n"); - return(PTL_FAIL); - } else { - CDEBUG(D_INFO, "nal_data [%p]\n", nal_data); - } + LASSERT ((nid >> 32) == 0); + LASSERT (gmnalni != NULL); + + spin_lock(&gmnalni->gmni_gm_lock); + gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid, + &stxd->tx_gmlid); + spin_unlock(&gmnalni->gmni_gm_lock); - spin_lock(&nal_data->gm_lock); - gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, - &local_nid); - spin_unlock(&nal_data->gm_lock); if (gm_status != GM_SUCCESS) { CERROR("Failed to obtain local id\n"); return(PTL_FAIL); } - CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid); - stxd->type = GMNAL_SMALL_MESSAGE; - stxd->cookie = cookie; + CDEBUG(D_NET, "Local Node_id is [%u][%x]\n", + stxd->tx_gmlid, stxd->tx_gmlid); + + stxd->tx_nid = nid; + stxd->tx_cookie = cookie; + stxd->tx_type = GMNAL_SMALL_MESSAGE; + stxd->tx_gm_priority = GM_LOW_PRIORITY; /* * Copy gmnal_msg_hdr and portals header to the transmit buffer * Then send the message, as the data has previously been copied in * (HP SFS 1380). 
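 *
 * Concretely, the wired buffer goes out on the wire as (a sketch; the
 * offsets follow from the sizeof() arithmetic used throughout):
 *
 *   +----------------+-----------+---------------------------+
 *   | gmnal_msghdr_t | ptl_hdr_t | payload (up to PTL_MTU)   |
 *   +----------------+-----------+---------------------------+
 *
 * gmni_small_msg_size is sized to cover all three parts (plus the
 * 928-byte fudge added in gmnal_api_startup()), which is why
 * gmnal_cb_send()/gmnal_cb_send_pages() copy the user data to
 * stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t), and
 * why tot_size below is size + sizeof(ptl_hdr_t) + sizeof(gmnal_msghdr_t).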
*/ - buffer = stxd->buffer; + buffer = stxd->tx_buffer; msghdr = (gmnal_msghdr_t*)buffer; - msghdr->magic = GMNAL_MAGIC; - msghdr->type = GMNAL_SMALL_MESSAGE; - msghdr->sender_node_id = nal_data->gm_global_nid; - CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer); + msghdr->gmm_magic = GMNAL_MAGIC; + msghdr->gmm_type = GMNAL_SMALL_MESSAGE; + msghdr->gmm_sender_gmid = gmnalni->gmni_global_gmid; + CDEBUG(D_NET, "processing msghdr at [%p]\n", buffer); buffer += sizeof(gmnal_msghdr_t); - CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer); + CDEBUG(D_NET, "processing portals hdr at [%p]\n", buffer); gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t)); buffer += sizeof(ptl_hdr_t); - CDEBUG(D_INFO, "sending\n"); + CDEBUG(D_NET, "sending\n"); tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t); - stxd->msg_size = tot_size; - + stxd->tx_msg_size = tot_size; CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] " - "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] " - "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, - stxd->msg_size, global_nid, local_nid, stxd); - - spin_lock(&nal_data->gm_lock); - stxd->gm_priority = GM_LOW_PRIORITY; - stxd->gm_target_node = local_nid; - gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, - stxd->gm_size, stxd->msg_size, - GM_LOW_PRIORITY, local_nid, + "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] " + "stxd [%p]\n", gmnalni->gmni_port, stxd->tx_buffer, + stxd->tx_gm_size, stxd->tx_msg_size, nid, stxd->tx_gmlid, + stxd); + + spin_lock(&gmnalni->gmni_gm_lock); + gm_send_to_peer_with_callback(gmnalni->gmni_port, stxd->tx_buffer, + stxd->tx_gm_size, stxd->tx_msg_size, + stxd->tx_gm_priority, stxd->tx_gmlid, gmnal_small_tx_callback, (void*)stxd); - spin_unlock(&nal_data->gm_lock); - CDEBUG(D_INFO, "done\n"); + spin_unlock(&gmnalni->gmni_gm_lock); + CDEBUG(D_NET, "done\n"); return(PTL_OK); } @@ -459,29 +373,17 @@ void gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) { gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - lib_msg_t *cookie = stxd->cookie; - gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data; - lib_nal_t *libnal = nal_data->libnal; - unsigned gnid = 0; - gm_status_t gm_status = 0; + lib_msg_t *cookie = stxd->tx_cookie; + gmnal_ni_t *gmnalni = stxd->tx_gmni; + lib_nal_t *libnal = gmnalni->gmni_libnal; if (!stxd) { - CDEBUG(D_TRACE, "send completion event for unknown stxd\n"); + CDEBUG(D_NET, "send completion event for unknown stxd\n"); return; } - if (status != GM_SUCCESS) { - spin_lock(&nal_data->gm_lock); - gm_status = gm_node_id_to_global_id(nal_data->gm_port, - stxd->gm_target_node,&gnid); - spin_unlock(&nal_data->gm_lock); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n", - gm_status); - gnid = 0; - } - CERROR("Result of send stxd [%p] is [%s] to [%u]\n", - stxd, gmnal_gm_error(status), gnid); - } + if (status != GM_SUCCESS) + CERROR("Result of send stxd [%p] is [%s] to ["LPU64"]\n", + stxd, gmnal_gm_error(status), stxd->tx_nid); switch(status) { case(GM_SUCCESS): @@ -494,28 +396,28 @@ gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) * do a resend on the dropped ones */ CERROR("send stxd [%p] dropped, resending\n", context); - spin_lock(&nal_data->gm_lock); - gm_send_to_peer_with_callback(nal_data->gm_port, - stxd->buffer, - stxd->gm_size, - stxd->msg_size, - stxd->gm_priority, - stxd->gm_target_node, + spin_lock(&gmnalni->gmni_gm_lock); + gm_send_to_peer_with_callback(gmnalni->gmni_port, + 
stxd->tx_buffer, + stxd->tx_gm_size, + stxd->tx_msg_size, + stxd->tx_gm_priority, + stxd->tx_gmlid, gmnal_small_tx_callback, context); - spin_unlock(&nal_data->gm_lock); + spin_unlock(&gmnalni->gmni_gm_lock); return; case(GM_TIMED_OUT): case(GM_SEND_TIMED_OUT): /* * drop these ones */ - CDEBUG(D_INFO, "calling gm_drop_sends\n"); - spin_lock(&nal_data->gm_lock); - gm_drop_sends(nal_data->gm_port, stxd->gm_priority, - stxd->gm_target_node, gm_port_id, + CDEBUG(D_NET, "calling gm_drop_sends\n"); + spin_lock(&gmnalni->gmni_gm_lock); + gm_drop_sends(gmnalni->gmni_port, stxd->tx_gm_priority, + stxd->tx_gmlid, gm_port_id, gmnal_drop_sends_callback, context); - spin_unlock(&nal_data->gm_lock); + spin_unlock(&gmnalni->gmni_gm_lock); return; @@ -566,29 +468,14 @@ gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) case(GM_FIRMWARE_NOT_RUNNING): case(GM_YP_NO_MATCH): default: - gm_resume_sending(nal_data->gm_port, stxd->gm_priority, - stxd->gm_target_node, gm_port_id, - gmnal_resume_sending_callback, context); + gm_resume_sending(gmnalni->gmni_port, stxd->tx_gm_priority, + stxd->tx_gmlid, gm_port_id, + gmnal_resume_sending_callback, context); return; } - /* - * TO DO - * If this is a large message init, - * we're not finished with the data yet, - * so can't call lib_finalise. - * However, we're also holding on to a - * stxd here (to keep track of the source - * iovec only). Should use another structure - * to keep track of iovec and return stxd to - * free list earlier. - */ - if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) { - CDEBUG(D_INFO, "large transmit done\n"); - return; - } - gmnal_return_stxd(nal_data, stxd); + gmnal_return_stxd(gmnalni, stxd); lib_finalize(libnal, stxd, cookie, PTL_OK); return; } @@ -601,10 +488,11 @@ void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context, gm_status_t status) { gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data; - CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context); - gmnal_return_stxd(stxd->nal_data, stxd); - lib_finalize(nal_data->libnal, stxd, stxd->cookie, PTL_FAIL); + gmnal_ni_t *gmnalni = stxd->tx_gmni; + + CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context); + gmnal_return_stxd(gmnalni, stxd); + lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL); return; } @@ -613,735 +501,28 @@ void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, gm_status_t status) { gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - gmnal_data_t *nal_data = stxd->nal_data; + gmnal_ni_t *gmnalni = stxd->tx_gmni; - CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context); + CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context); if (status == GM_SUCCESS) { - spin_lock(&nal_data->gm_lock); - gm_send_to_peer_with_callback(gm_port, stxd->buffer, - stxd->gm_size, stxd->msg_size, - stxd->gm_priority, - stxd->gm_target_node, + spin_lock(&gmnalni->gmni_gm_lock); + gm_send_to_peer_with_callback(gm_port, stxd->tx_buffer, + stxd->tx_gm_size, + stxd->tx_msg_size, + stxd->tx_gm_priority, + stxd->tx_gmlid, gmnal_small_tx_callback, context); - spin_unlock(&nal_data->gm_lock); + spin_unlock(&gmnalni->gmni_gm_lock); } else { CERROR("send_to_peer status for stxd [%p] is " "[%d][%s]\n", stxd, status, gmnal_gm_error(status)); /* Recycle the stxd */ - gmnal_return_stxd(nal_data, stxd); - lib_finalize(nal_data->libnal, stxd, stxd->cookie, PTL_FAIL); - } - - return; -} - - -/* - * Begine a large transmit. 
- * Do a gm_register of the memory pointed to by the iovec - * and send details to the receiver. The receiver does a gm_get - * to pull the data and sends and ack when finished. Upon receipt of - * this ack, deregister the memory. Only 1 send token is required here. - */ -int -gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, - unsigned int niov, struct iovec *iov, size_t offset, int size) -{ - - gmnal_data_t *nal_data; - gmnal_stxd_t *stxd = NULL; - void *buffer = NULL; - gmnal_msghdr_t *msghdr = NULL; - unsigned int local_nid; - int mlen = 0; /* the size of the init message data */ - struct iovec *iov_dup = NULL; - gm_status_t gm_status; - int niov_dup; - - - CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] " - "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], " - "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type, - global_nid, pid, niov, iov, size); - - if (libnal) - nal_data = (gmnal_data_t*)libnal->libnal_data; - else { - CERROR("no libnal.\n"); - return(GMNAL_STATUS_FAIL); - } - - - /* - * Get stxd and buffer. Put local address of data in buffer, - * send local addresses to target, - * wait for the target node to suck the data over. - * The stxd is used to ren - */ - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_INFO, "stxd [%p]\n", stxd); - - stxd->type = GMNAL_LARGE_MESSAGE_INIT; - stxd->cookie = cookie; - - /* - * Copy gmnal_msg_hdr and portals header to the transmit buffer - * Then copy the iov in - */ - buffer = stxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - - CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer); - - msghdr->magic = GMNAL_MAGIC; - msghdr->type = GMNAL_LARGE_MESSAGE_INIT; - msghdr->sender_node_id = nal_data->gm_global_nid; - msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd; - msghdr->niov = niov ; - buffer += sizeof(gmnal_msghdr_t); - mlen = sizeof(gmnal_msghdr_t); - CDEBUG(D_INFO, "mlen is [%d]\n", mlen); - - - CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer); - - gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t)); - buffer += sizeof(ptl_hdr_t); - mlen += sizeof(ptl_hdr_t); - CDEBUG(D_INFO, "mlen is [%d]\n", mlen); - - while (offset >= iov->iov_len) { - offset -= iov->iov_len; - niov--; - iov++; - } - - LASSERT(offset >= 0); - /* - * Store the iovs in the stxd for we can get - * them later if we need them - */ - stxd->iov[0].iov_base = iov->iov_base + offset; - stxd->iov[0].iov_len = iov->iov_len - offset; - CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov); - if (niov > 1) - gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec)); - stxd->niov = niov; - - /* - * copy the iov to the buffer so target knows - * where to get the data from - */ - CDEBUG(D_INFO, "processing iov to [%p]\n", buffer); - gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec)); - mlen += stxd->niov*(sizeof(struct iovec)); - CDEBUG(D_INFO, "mlen is [%d]\n", mlen); - - /* - * register the memory so the NIC can get hold of the data - * This is a slow process. it'd be good to overlap it - * with something else. 
- */ - iov = stxd->iov; - iov_dup = iov; - niov_dup = niov; - while(niov--) { - CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", - iov->iov_base, iov->iov_len); - spin_lock(&nal_data->gm_lock); - gm_status = gm_register_memory(nal_data->gm_port, - iov->iov_base, iov->iov_len); - if (gm_status != GM_SUCCESS) { - spin_unlock(&nal_data->gm_lock); - CERROR("gm_register_memory returns [%d][%s] " - "for memory [%p] len ["LPSZ"]\n", - gm_status, gmnal_gm_error(gm_status), - iov->iov_base, iov->iov_len); - spin_lock(&nal_data->gm_lock); - while (iov_dup != iov) { - gm_deregister_memory(nal_data->gm_port, - iov_dup->iov_base, - iov_dup->iov_len); - iov_dup++; - } - spin_unlock(&nal_data->gm_lock); - gmnal_return_stxd(nal_data, stxd); - return(PTL_FAIL); - } - - spin_unlock(&nal_data->gm_lock); - iov++; - } - - /* - * Send the init message to the target - */ - CDEBUG(D_INFO, "sending mlen [%d]\n", mlen); - spin_lock(&nal_data->gm_lock); - gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, - &local_nid); - if (gm_status != GM_SUCCESS) { - spin_unlock(&nal_data->gm_lock); - CERROR("Failed to obtain local id\n"); - gmnal_return_stxd(nal_data, stxd); - /* TO DO deregister memory on failure */ - return(GMNAL_STATUS_FAIL); - } - CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid); - gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, - stxd->gm_size, mlen, GM_LOW_PRIORITY, - local_nid, gmnal_large_tx_callback, - (void*)stxd); - spin_unlock(&nal_data->gm_lock); - - CDEBUG(D_INFO, "done\n"); - - return(PTL_OK); -} - -/* - * Callback function indicates that send of buffer with - * large message iovec has completed (or failed). - */ -void -gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) -{ - gmnal_small_tx_callback(gm_port, context, status); - -} - - - -/* - * Have received a buffer that contains an iovec of the sender. - * Do a gm_register_memory of the receivers buffer and then do a get - * data from the sender. 
- */ -int -gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - unsigned int nriov, struct iovec *riov, size_t offset, - size_t mlen, size_t rlen) -{ - gmnal_data_t *nal_data = libnal->libnal_data; - gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; - void *buffer = NULL; - struct iovec *riov_dup; - int nriov_dup; - gmnal_msghdr_t *msghdr = NULL; - gm_status_t gm_status; - - CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], " - "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n", - libnal, private, cookie, nriov, riov, mlen, rlen); - - if (!srxd) { - CERROR("gmnal_large_rx no context\n"); - lib_finalize(libnal, private, cookie, PTL_FAIL); - return(PTL_FAIL); - } - - buffer = srxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - buffer += sizeof(gmnal_msghdr_t); - buffer += sizeof(ptl_hdr_t); - - /* - * Store the senders stxd address in the srxd for this message - * The gmnal_large_message_ack needs it to notify the sender - * the pull of data is complete - */ - srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr; - - /* - * Register the receivers memory - * get the data, - * tell the sender that we got the data - * then tell the receiver we got the data - * TO DO - * If the iovecs match, could interleave - * gm_registers and gm_gets for each element - */ - while (offset >= riov->iov_len) { - offset -= riov->iov_len; - riov++; - nriov--; - } - LASSERT (nriov >= 0); - LASSERT (offset >= 0); - /* - * do this so the final gm_get callback can deregister the memory - */ - PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec))); - - srxd->riov[0].iov_base = riov->iov_base + offset; - srxd->riov[0].iov_len = riov->iov_len - offset; - if (nriov > 1) - gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec))); - srxd->nriov = nriov; - - riov = srxd->riov; - nriov_dup = nriov; - riov_dup = riov; - while(nriov--) { - CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", - riov->iov_base, riov->iov_len); - spin_lock(&nal_data->gm_lock); - gm_status = gm_register_memory(nal_data->gm_port, - riov->iov_base, riov->iov_len); - if (gm_status != GM_SUCCESS) { - spin_unlock(&nal_data->gm_lock); - CERROR("gm_register_memory returns [%d][%s] " - "for memory [%p] len ["LPSZ"]\n", - gm_status, gmnal_gm_error(gm_status), - riov->iov_base, riov->iov_len); - spin_lock(&nal_data->gm_lock); - while (riov_dup != riov) { - gm_deregister_memory(nal_data->gm_port, - riov_dup->iov_base, - riov_dup->iov_len); - riov_dup++; - } - spin_lock(&nal_data->gm_lock); - /* - * give back srxd and buffer. Send NACK to sender - */ - PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec))); - return(PTL_FAIL); - } - spin_unlock(&nal_data->gm_lock); - riov++; + gmnal_return_stxd(gmnalni, stxd); + lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL); } - /* - * now do gm_get to get the data - */ - srxd->cookie = cookie; - if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer, - nriov_dup, riov_dup) != GMNAL_STATUS_OK) { - CERROR("can't get the data"); - } - - CDEBUG(D_INFO, "lgmanl_large_rx done\n"); - - return(PTL_OK); -} - - -/* - * Perform a number of remote gets as part of receiving - * a large message. - * The final one to complete (i.e. the last callback to get called) - * tidies up. - * gm_get requires a send token. 
- */ -int -gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov, - int nriov, struct iovec *riov) -{ - - int ncalls = 0; - - CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], " - "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov); - - - ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov); - if (ncalls < 0) { - CERROR("there's something wrong with the iovecs\n"); - return(GMNAL_STATUS_FAIL); - } - CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls); - spin_lock_init(&srxd->callback_lock); - srxd->ncallbacks = ncalls; - srxd->callback_status = 0; - - ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov); - if (ncalls < 0) { - CERROR("there's something wrong with the iovecs\n"); - return(GMNAL_STATUS_FAIL); - } - - return(GMNAL_STATUS_OK); - -} - - -/* - * pull data from source node (source iovec) to a local iovec. - * The iovecs may not match which adds the complications below. - * Count the number of gm_gets that will be required so the callbacks - * can determine who is the last one. - */ -int -gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov, - struct iovec *siov, int nriov, struct iovec *riov) -{ - - int ncalls = 0; - int slen = siov->iov_len, rlen = riov->iov_len; - char *sbuf = siov->iov_base, *rbuf = riov->iov_base; - unsigned long sbuf_long; - gm_remote_ptr_t remote_ptr = 0; - unsigned int source_node; - gmnal_ltxd_t *ltxd = NULL; - gmnal_data_t *nal_data = srxd->nal_data; - - CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data); - if (do_copy) { - if (!nal_data) { - CERROR("Bad args No nal_data\n"); - return(GMNAL_STATUS_FAIL); - } - spin_lock(&nal_data->gm_lock); - if (gm_global_id_to_node_id(nal_data->gm_port, - srxd->gm_source_node, - &source_node) != GM_SUCCESS) { - - CERROR("cannot resolve global_id [%u] " - "to local node_id\n", srxd->gm_source_node); - spin_unlock(&nal_data->gm_lock); - return(GMNAL_STATUS_FAIL); - } - spin_unlock(&nal_data->gm_lock); - /* - * We need a send token to use gm_get - * getting an stxd gets us a send token. - * the stxd is used as the context to the - * callback function (so stxd can be returned). 
- * Set pointer in stxd to srxd so callback count in srxd
- * can be decremented to find last callback to complete
- */
-		CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n",
-		       srxd->gm_source_node, source_node);
-	}
-
-	do {
-		CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
-		       sbuf, slen, rbuf, rlen);
-		if (slen > rlen) {
-			ncalls++;
-			if (do_copy) {
-				CDEBUG(D_INFO, "slen>rlen\n");
-				ltxd = gmnal_get_ltxd(nal_data);
-				ltxd->srxd = srxd;
-				spin_lock(&nal_data->gm_lock);
-				/*
-				 * funny business to get rid
-				 * of compiler warning
-				 */
-				sbuf_long = (unsigned long) sbuf;
-				remote_ptr = (gm_remote_ptr_t)sbuf_long;
-				gm_get(nal_data->gm_port, remote_ptr, rbuf,
-				       rlen, GM_LOW_PRIORITY, source_node,
-				       gm_port_id,
-				       gmnal_remote_get_callback, ltxd);
-				spin_unlock(&nal_data->gm_lock);
-			}
-			/*
-			 * at the end of 1 iov element
-			 */
-			sbuf+=rlen;
-			slen-=rlen;
-			riov++;
-			nriov--;
-			rbuf = riov->iov_base;
-			rlen = riov->iov_len;
-		} else if (rlen > slen) {
-			ncalls++;
-			if (do_copy) {
-				CDEBUG(D_INFO, "slen<rlen\n");
-				ltxd = gmnal_get_ltxd(nal_data);
-				ltxd->srxd = srxd;
-				spin_lock(&nal_data->gm_lock);
-				sbuf_long = (unsigned long) sbuf;
-				remote_ptr = (gm_remote_ptr_t)sbuf_long;
-				gm_get(nal_data->gm_port, remote_ptr, rbuf,
-				       slen, GM_LOW_PRIORITY, source_node,
-				       gm_port_id,
-				       gmnal_remote_get_callback, ltxd);
-				spin_unlock(&nal_data->gm_lock);
-			}
-			/*
-			 * at end of siov element
-			 */
-			rbuf+=slen;
-			rlen-=slen;
-			siov++;
-			sbuf = siov->iov_base;
-			slen = siov->iov_len;
-		} else {
-			ncalls++;
-			if (do_copy) {
-				CDEBUG(D_INFO, "rlen=slen\n");
-				ltxd = gmnal_get_ltxd(nal_data);
-				ltxd->srxd = srxd;
-				spin_lock(&nal_data->gm_lock);
-				sbuf_long = (unsigned long) sbuf;
-				remote_ptr = (gm_remote_ptr_t)sbuf_long;
-				gm_get(nal_data->gm_port, remote_ptr, rbuf,
-				       rlen, GM_LOW_PRIORITY, source_node,
-				       gm_port_id,
-				       gmnal_remote_get_callback, ltxd);
-				spin_unlock(&nal_data->gm_lock);
-			}
-			/*
-			 * at end of siov and riov element
-			 */
-			siov++;
-			sbuf = siov->iov_base;
-			slen = siov->iov_len;
-			riov++;
-			nriov--;
-			rbuf = riov->iov_base;
-			rlen = riov->iov_len;
-		}
-
-	} while (nriov);
-	return(ncalls);
-}
-
-
-/*
- * The callback function that is invoked after each gm_get call completes.
- * Multiple callbacks may be invoked for 1 transaction, only the final
- * callback has work to do. 
- */ -void -gmnal_remote_get_callback(gm_port_t *gm_port, void *context, - gm_status_t status) -{ - - gmnal_ltxd_t *ltxd = (gmnal_ltxd_t*)context; - gmnal_srxd_t *srxd = ltxd->srxd; - lib_nal_t *libnal = srxd->nal_data->libnal; - int lastone; - struct iovec *riov; - int nriov; - gmnal_data_t *nal_data; - - CDEBUG(D_TRACE, "called for context [%p]\n", context); - - if (status != GM_SUCCESS) { - CERROR("reports error [%d/%s]\n",status,gmnal_gm_error(status)); - } - - spin_lock(&srxd->callback_lock); - srxd->ncallbacks--; - srxd->callback_status |= status; - lastone = srxd->ncallbacks?0:1; - spin_unlock(&srxd->callback_lock); - nal_data = srxd->nal_data; - - /* - * everyone returns a send token - */ - gmnal_return_ltxd(nal_data, ltxd); - - if (!lastone) { - CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd); - return; - } - - /* - * Let our client application proceed - */ - CERROR("final callback context[%p]\n", srxd); - lib_finalize(libnal, srxd, srxd->cookie, PTL_OK); - - /* - * send an ack to the sender to let him know we got the data - */ - gmnal_large_tx_ack(nal_data, srxd); - - /* - * Unregister the memory that was used - * This is a very slow business (slower then register) - */ - nriov = srxd->nriov; - riov = srxd->riov; - spin_lock(&nal_data->gm_lock); - while (nriov--) { - CERROR("deregister memory [%p]\n", riov->iov_base); - if (gm_deregister_memory(srxd->nal_data->gm_port, - riov->iov_base, riov->iov_len)) { - CERROR("failed to deregister memory [%p]\n", - riov->iov_base); - } - riov++; - } - spin_unlock(&nal_data->gm_lock); - PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov); - - /* - * repost the receive buffer (return receive token) - */ - spin_lock(&nal_data->gm_lock); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, - srxd->gmsize, GM_LOW_PRIORITY, 0); - spin_unlock(&nal_data->gm_lock); - return; } -/* - * Called on target node. - * After pulling data from a source node - * send an ack message to indicate the large transmit is complete. - */ -void -gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) -{ - - gmnal_stxd_t *stxd; - gmnal_msghdr_t *msghdr; - void *buffer = NULL; - unsigned int local_nid; - gm_status_t gm_status = GM_SUCCESS; - - CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd, - srxd->gm_source_node); - - spin_lock(&nal_data->gm_lock); - gm_status = gm_global_id_to_node_id(nal_data->gm_port, - srxd->gm_source_node, &local_nid); - spin_unlock(&nal_data->gm_lock); - if (gm_status != GM_SUCCESS) { - CERROR("Failed to obtain local id\n"); - return; - } - CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid); - - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd); - - stxd->nal_data = nal_data; - stxd->type = GMNAL_LARGE_MESSAGE_ACK; - - /* - * Copy gmnal_msg_hdr and portals header to the transmit buffer - * Then copy the data in - */ - buffer = stxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - - /* - * Add in the address of the original stxd from the sender node - * so it knows which thread to notify. 
- */ - msghdr->magic = GMNAL_MAGIC; - msghdr->type = GMNAL_LARGE_MESSAGE_ACK; - msghdr->sender_node_id = nal_data->gm_global_nid; - msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd; - CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer); - - CDEBUG(D_INFO, "sending\n"); - stxd->msg_size= sizeof(gmnal_msghdr_t); - - - CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] " - "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] " - "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, - stxd->msg_size, srxd->gm_source_node, local_nid, stxd); - spin_lock(&nal_data->gm_lock); - stxd->gm_priority = GM_LOW_PRIORITY; - stxd->gm_target_node = local_nid; - gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, - stxd->gm_size, stxd->msg_size, - GM_LOW_PRIORITY, local_nid, - gmnal_large_tx_ack_callback, - (void*)stxd); - - spin_unlock(&nal_data->gm_lock); - CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n"); - - return; -} - - -/* - * A callback to indicate the small transmit operation is compete - * Check for errors and try to deal with them. - * Call lib_finalise to inform the client application that the - * send is complete and the memory can be reused. - * Return the stxd when finished with it (returns a send token) - */ -void -gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context, - gm_status_t status) -{ - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data; - - if (!stxd) { - CERROR("send completion event for unknown stxd\n"); - return; - } - CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n", - stxd, status); - gmnal_return_stxd(stxd->nal_data, stxd); - - spin_unlock(&nal_data->gm_lock); - return; -} - -/* - * Indicates the large transmit operation is compete. - * Called on transmit side (means data has been pulled by receiver - * or failed). - * Call lib_finalise to inform the client application that the send - * is complete, deregister the memory and return the stxd. - * Finally, report the rx buffer that the ack message was delivered in. - */ -void -gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) -{ - lib_nal_t *libnal = nal_data->libnal; - gmnal_stxd_t *stxd = NULL; - gmnal_msghdr_t *msghdr = NULL; - void *buffer = NULL; - struct iovec *iov; - - - CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer); - - buffer = srxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr; - - CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd); - - lib_finalize(libnal, stxd, stxd->cookie, PTL_OK); - - /* - * extract the iovec from the stxd, deregister the memory. - * free the space used to store the iovec - */ - iov = stxd->iov; - while(stxd->niov--) { - CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n", - iov->iov_base, iov->iov_len); - spin_lock(&nal_data->gm_lock); - gm_deregister_memory(nal_data->gm_port, iov->iov_base, - iov->iov_len); - spin_unlock(&nal_data->gm_lock); - iov++; - } - - /* - * return the send token - * TO DO It is bad to hold onto the send token so long? 
- */
-	gmnal_return_stxd(nal_data, stxd);
-
-
-	/*
-	 * requeue the receive buffer
-	 */
-	gmnal_rx_requeue_buffer(nal_data, srxd);
-
-
-	return;
-}
diff --git a/lnet/klnds/gmlnd/gmlnd_module.c b/lnet/klnds/gmlnd/gmlnd_module.c
index 9fa2ea5..7a7a907 100644
--- a/lnet/klnds/gmlnd/gmlnd_module.c
+++ b/lnet/klnds/gmlnd/gmlnd_module.c
@@ -22,7 +22,6 @@
 
 #include "gmnal.h"
 
-int gmnal_small_msg_size = sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t) + PTL_MTU + 928;
 /*
  * -1 indicates default value.
  * This is 1 thread per cpu
@@ -35,16 +34,16 @@ int gm_port_id = 4;
 int
 gmnal_cmd(struct portals_cfg *pcfg, void *private)
 {
-	gmnal_data_t	*nal_data = NULL;
+	gmnal_ni_t	*gmnalni = NULL;
 	char		*name = NULL;
 	int		nid = -2;
-	int		gnid;
+	int		gmid;
 	gm_status_t	gm_status;
 
 	CDEBUG(D_TRACE, "gmnal_cmd [%d] private [%p]\n",
 	       pcfg->pcfg_command, private);
-	nal_data = (gmnal_data_t*)private;
+	gmnalni = (gmnal_ni_t*)private;
 	switch(pcfg->pcfg_command) {
 	/*
	 * just reuse already defined GET_NID. Should define GMNAL version
@@ -54,31 +53,31 @@ gmnal_cmd(struct portals_cfg *pcfg, void *private)
 		PORTAL_ALLOC(name, pcfg->pcfg_plen1);
 		copy_from_user(name, PCFG_PBUF(pcfg, 1), pcfg->pcfg_plen1);
 
-		spin_lock(&nal_data->gm_lock);
-		//nid = gm_host_name_to_node_id(nal_data->gm_port, name);
-		gm_status = gm_host_name_to_node_id_ex(nal_data->gm_port, 0,
+		spin_lock(&gmnalni->gmni_gm_lock);
+		//nid = gm_host_name_to_node_id(gmnalni->gmni_port, name);
+		gm_status = gm_host_name_to_node_id_ex(gmnalni->gmni_port, 0,
 							name, &nid);
-		spin_unlock(&nal_data->gm_lock);
+		spin_unlock(&gmnalni->gmni_gm_lock);
 		if (gm_status != GM_SUCCESS) {
-			CDEBUG(D_INFO, "gm_host_name_to_node_id_ex(...host %s) "
+			CDEBUG(D_NET, "gm_host_name_to_node_id_ex(...host %s) "
 			       "failed[%d]\n", name, gm_status);
 			return (-1);
 		} else
-			CDEBUG(D_INFO, "Local node %s id is [%d]\n", name, nid);
-		spin_lock(&nal_data->gm_lock);
-		gm_status = gm_node_id_to_global_id(nal_data->gm_port,
-						    nid, &gnid);
-		spin_unlock(&nal_data->gm_lock);
+			CDEBUG(D_NET, "Local node %s id is [%d]\n", name, nid);
+		spin_lock(&gmnalni->gmni_gm_lock);
+		gm_status = gm_node_id_to_global_id(gmnalni->gmni_port,
+						    nid, &gmid);
+		spin_unlock(&gmnalni->gmni_gm_lock);
 		if (gm_status != GM_SUCCESS) {
-			CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
+			CDEBUG(D_NET, "gm_node_id_to_global_id failed[%d]\n",
 			       gm_status);
 			return(-1);
 		}
-		CDEBUG(D_INFO, "Global node is is [%u][%x]\n", gnid, gnid);
-		copy_to_user(PCFG_PBUF(pcfg, 2), &gnid, pcfg->pcfg_plen2);
+		CDEBUG(D_NET, "Global node id is [%u][%x]\n", gmid, gmid);
+		copy_to_user(PCFG_PBUF(pcfg, 2), &gmid, pcfg->pcfg_plen2);
 		break;
 	default:
-		CDEBUG(D_INFO, "gmnal_cmd UNKNOWN[%d]\n", pcfg->pcfg_command);
+		CDEBUG(D_NET, "gmnal_cmd UNKNOWN[%d]\n", pcfg->pcfg_command);
 		pcfg->pcfg_nid2 = -1;
 	}
 
@@ -94,16 +93,16 @@ gmnal_load(void)
 
 	CDEBUG(D_TRACE, "This is the gmnal module initialisation routine\n");
 
-	CDEBUG(D_INFO, "Calling gmnal_init\n");
+	CDEBUG(D_NET, "Calling gmnal_init\n");
 	status = gmnal_init();
 	if (status == PTL_OK) {
-		CDEBUG(D_INFO, "Portals GMNAL initialised ok\n");
+		CDEBUG(D_NET, "Portals GMNAL initialised ok\n");
 	} else {
-		CDEBUG(D_INFO, "Portals GMNAL Failed to initialise\n");
+		CDEBUG(D_NET, "Portals GMNAL Failed to initialise\n");
 		return(-ENODEV);
 	}
-	CDEBUG(D_INFO, "This is the end of the gmnal init routine");
+	CDEBUG(D_NET, "This is the end of the gmnal init routine\n");
 	return(0);
 }
 
@@ -121,7 +120,6 @@
 module_init(gmnal_load);
 module_exit(gmnal_unload);
 
-MODULE_PARM(gmnal_small_msg_size, "i");
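+/* tunables: num_rx_threads == -1 (the default) starts one receive
+ * thread per CPU; num_stxds sizes the small tx descriptor pools */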
 MODULE_PARM(num_rx_threads, "i");
 MODULE_PARM(num_stxds, "i");
 MODULE_PARM(gm_port_id, "i");
diff --git a/lnet/klnds/gmlnd/gmlnd_utils.c b/lnet/klnds/gmlnd/gmlnd_utils.c
index 1cbb728..aee16fb 100644
--- a/lnet/klnds/gmlnd/gmlnd_utils.c
+++ b/lnet/klnds/gmlnd/gmlnd_utils.c
@@ -28,11 +28,11 @@
  * Am I one of the gmnal rxthreads ?
  */
 int
-gmnal_is_rxthread(gmnal_data_t *nal_data)
+gmnal_is_rxthread(gmnal_ni_t *gmnalni)
 {
 	int i;
 	for (i=0; i<NRXTHREADS; i++) {
-		if (nal_data->rxthread_pid[i] == current->pid)
+		if (gmnalni->gmni_rxthread_pid[i] == current->pid)
 			return(1);
 	}
 	return(0);
 }
@@ -51,23 +51,24 @@ gmnal_is_rxthread(gmnal_data_t *nal_data)
 * used to do gm_gets in gmnal_copyiov
 */
int
-gmnal_alloc_txd(gmnal_data_t *nal_data)
+gmnal_alloc_txd(gmnal_ni_t *gmnalni)
 {
-	int ntx= 0, nstx= 0, nrxt_stx= 0,
-	    nltx= 0, i = 0;
-	gmnal_stxd_t *txd = NULL;
-	gmnal_ltxd_t *ltxd = NULL;
-	void *txbuffer = NULL;
+	int ntx;
+	int nstx;
+	int nrxt_stx;
+	int i;
+	gmnal_stxd_t *txd;
+	void *txbuffer;
 
 	CDEBUG(D_TRACE, "gmnal_alloc_small tx\n");
 
-	spin_lock(&nal_data->gm_lock);
+	spin_lock(&gmnalni->gmni_gm_lock);
 	/*
 	 * total number of transmit tokens
 	 */
-	ntx = gm_num_send_tokens(nal_data->gm_port);
-	spin_unlock(&nal_data->gm_lock);
-	CDEBUG(D_INFO, "total number of send tokens available is [%d]\n", ntx);
+	ntx = gm_num_send_tokens(gmnalni->gmni_port);
+	spin_unlock(&gmnalni->gmni_gm_lock);
+	CDEBUG(D_NET, "total number of send tokens available is [%d]\n", ntx);
 
 	/*
 	 * allocate a number for small sends
 	 */
 	nstx = num_stxds;
 	/*
-	 * give that number plus 1 to the receive threads
+	 * give the same number plus 1 to the receive threads
 	 */
-	nrxt_stx = nstx + 1;
-
-	/*
-	 * give the rest for gm_gets
-	 */
-	nltx = ntx - (nrxt_stx + nstx);
-	if (nltx < 1) {
-		CERROR("No tokens available for large messages\n");
-		return(GMNAL_STATUS_FAIL);
-	}
+	nrxt_stx = num_stxds + 1;
-
-
-	/*
-	 * A semaphore is initialised with the
-	 * number of transmit tokens available.
-	 * To get a stxd, acquire the token semaphore.
-	 * this decrements the available token count
-	 * (if no tokens you block here, someone returning a
-	 * stxd will release the semaphore and wake you)
-	 * When token is obtained acquire the spinlock
-	 * to manipulate the list
-	 */
-	sema_init(&nal_data->stxd_token, nstx);
-	spin_lock_init(&nal_data->stxd_lock);
-	sema_init(&nal_data->rxt_stxd_token, nrxt_stx);
-	spin_lock_init(&nal_data->rxt_stxd_lock);
-	sema_init(&nal_data->ltxd_token, nltx);
-	spin_lock_init(&nal_data->ltxd_lock);
+	if (nstx + nrxt_stx > ntx) {
+		CERROR ("Asked for %d + %d tx credits, but only %d available\n",
+			nstx, nrxt_stx, ntx);
+		return -ENOMEM;
+	}
+
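+	/* Note: each free list below is populated with one more descriptor
+	 * than its semaphore's initial count (the fill loops run from 0 to
+	 * n inclusive), so a successful down() always finds a descriptor. */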
+	/* A semaphore is initialised with the number of transmit tokens
+	 * available. To get a stxd, acquire the token semaphore.  this
+	 * decrements the available token count (if no tokens you block here,
+	 * someone returning a stxd will release the semaphore and wake you)
+	 * When token is obtained acquire the spinlock to manipulate the
+	 * list */
+	sema_init(&gmnalni->gmni_stxd_token, nstx);
+	spin_lock_init(&gmnalni->gmni_stxd_lock);
+
+	sema_init(&gmnalni->gmni_rxt_stxd_token, nrxt_stx);
+	spin_lock_init(&gmnalni->gmni_rxt_stxd_lock);
 
 	for (i=0; i<=nstx; i++) {
-		PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
-		if (!txd) {
+		PORTAL_ALLOC(txd, sizeof(*txd));
+		if (txd == NULL) {
 			CERROR("Failed to malloc txd [%d]\n", i);
-			return(GMNAL_STATUS_NOMEM);
+			return -ENOMEM;
 		}
-		spin_lock(&nal_data->gm_lock);
-		txbuffer = gm_dma_malloc(nal_data->gm_port,
-					 nal_data->small_msg_size);
-		spin_unlock(&nal_data->gm_lock);
-		if (!txbuffer) {
+		spin_lock(&gmnalni->gmni_gm_lock);
+		txbuffer = gm_dma_malloc(gmnalni->gmni_port,
+					 gmnalni->gmni_small_msg_size);
+		spin_unlock(&gmnalni->gmni_gm_lock);
+		if (txbuffer == NULL) {
 			CERROR("Failed to gm_dma_malloc txbuffer [%d], "
-			       "size [%d]\n", i, nal_data->small_msg_size);
-			PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
-			return(GMNAL_STATUS_FAIL);
+			       "size [%d]\n", i, gmnalni->gmni_small_msg_size);
+			PORTAL_FREE(txd, sizeof(*txd));
+			return -ENOMEM;
 		}
-		txd->buffer = txbuffer;
-		txd->buffer_size = nal_data->small_msg_size;
-		txd->gm_size = gm_min_size_for_length(txd->buffer_size);
-		txd->nal_data = (struct _gmnal_data_t*)nal_data;
-		txd->rxt = 0;
-
-		txd->next = nal_data->stxd;
-		nal_data->stxd = txd;
-		CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], "
-		       "size [%d]\n", txd, txd->buffer, txd->buffer_size);
+		txd->tx_buffer = txbuffer;
+		txd->tx_buffer_size = gmnalni->gmni_small_msg_size;
+		txd->tx_gm_size = gm_min_size_for_length(txd->tx_buffer_size);
+		txd->tx_gmni = gmnalni;
+		txd->tx_rxt = 0;
+
+		txd->tx_next = gmnalni->gmni_stxd;
+		gmnalni->gmni_stxd = txd;
+		CDEBUG(D_NET, "Registered txd [%p] with buffer [%p], "
+		       "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
 	}
 
 	for (i=0; i<=nrxt_stx; i++) {
 		PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
 		if (!txd) {
 			CERROR("Failed to malloc txd [%d]\n", i);
-			return(GMNAL_STATUS_NOMEM);
+			return -ENOMEM;
 		}
-		spin_lock(&nal_data->gm_lock);
-		txbuffer = gm_dma_malloc(nal_data->gm_port,
-					 nal_data->small_msg_size);
-		spin_unlock(&nal_data->gm_lock);
+		spin_lock(&gmnalni->gmni_gm_lock);
+		txbuffer = gm_dma_malloc(gmnalni->gmni_port,
+					 gmnalni->gmni_small_msg_size);
+		spin_unlock(&gmnalni->gmni_gm_lock);
 		if (!txbuffer) {
 			CERROR("Failed to gm_dma_malloc txbuffer [%d],"
-			       " size [%d]\n",i, nal_data->small_msg_size);
+			       " size [%d]\n",i, gmnalni->gmni_small_msg_size);
 			PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
-			return(GMNAL_STATUS_FAIL);
+			return -ENOMEM;
 		}
-		txd->buffer = txbuffer;
-		txd->buffer_size = nal_data->small_msg_size;
-		txd->gm_size = gm_min_size_for_length(txd->buffer_size);
-		txd->nal_data = (struct _gmnal_data_t*)nal_data;
-		txd->rxt = 1;
-
-		txd->next = nal_data->rxt_stxd;
-		nal_data->rxt_stxd = txd;
-		CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], "
-		       "size [%d]\n", txd, txd->buffer, txd->buffer_size);
+		txd->tx_buffer = txbuffer;
+		txd->tx_buffer_size = gmnalni->gmni_small_msg_size;
+		txd->tx_gm_size = gm_min_size_for_length(txd->tx_buffer_size);
+		txd->tx_gmni = gmnalni;
+		txd->tx_rxt = 1;
+
+		txd->tx_next = gmnalni->gmni_rxt_stxd;
+		gmnalni->gmni_rxt_stxd = txd;
+		CDEBUG(D_NET, "Registered txd [%p] with buffer [%p], "
+		       "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
 	}
 
-	/*
-	 * string together large tokens
-	 */
-	for (i=0; 
i<=nltx ; i++) { - PORTAL_ALLOC(ltxd, sizeof(gmnal_ltxd_t)); - ltxd->next = nal_data->ltxd; - nal_data->ltxd = ltxd; - } - return(GMNAL_STATUS_OK); + return 0; } /* Free the list of wired and gm_registered small tx buffers and * the tx descriptors that go along with them. */ void -gmnal_free_txd(gmnal_data_t *nal_data) +gmnal_free_txd(gmnal_ni_t *gmnalni) { - gmnal_stxd_t *txd = nal_data->stxd, *_txd = NULL; - gmnal_ltxd_t *ltxd = NULL, *_ltxd = NULL; + gmnal_stxd_t *txd; + gmnal_stxd_t *_txd; CDEBUG(D_TRACE, "gmnal_free_small tx\n"); - while(txd) { - CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], " - "size [%d]\n", txd, txd->buffer, txd->buffer_size); + txd = gmnalni->gmni_stxd; + while(txd != NULL) { + CDEBUG(D_NET, "Freeing txd [%p] with buffer [%p], " + "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size); _txd = txd; - txd = txd->next; - spin_lock(&nal_data->gm_lock); - gm_dma_free(nal_data->gm_port, _txd->buffer); - spin_unlock(&nal_data->gm_lock); + txd = txd->tx_next; + spin_lock(&gmnalni->gmni_gm_lock); + gm_dma_free(gmnalni->gmni_port, _txd->tx_buffer); + spin_unlock(&gmnalni->gmni_gm_lock); PORTAL_FREE(_txd, sizeof(gmnal_stxd_t)); } - txd = nal_data->rxt_stxd; + + txd = gmnalni->gmni_rxt_stxd; while(txd) { - CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], " - "size [%d]\n", txd, txd->buffer, txd->buffer_size); + CDEBUG(D_NET, "Freeing txd [%p] with buffer [%p], " + "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size); _txd = txd; - txd = txd->next; - spin_lock(&nal_data->gm_lock); - gm_dma_free(nal_data->gm_port, _txd->buffer); - spin_unlock(&nal_data->gm_lock); + txd = txd->tx_next; + spin_lock(&gmnalni->gmni_gm_lock); + gm_dma_free(gmnalni->gmni_port, _txd->tx_buffer); + spin_unlock(&gmnalni->gmni_gm_lock); PORTAL_FREE(_txd, sizeof(gmnal_stxd_t)); } - ltxd = nal_data->ltxd; - while(txd) { - _ltxd = ltxd; - ltxd = ltxd->next; - PORTAL_FREE(_ltxd, sizeof(gmnal_ltxd_t)); - } - - return; } @@ -222,45 +200,45 @@ gmnal_free_txd(gmnal_data_t *nal_data) * This implicitly gets us a send token also. 
*/ gmnal_stxd_t * -gmnal_get_stxd(gmnal_data_t *nal_data, int block) +gmnal_get_stxd(gmnal_ni_t *gmnalni, int block) { gmnal_stxd_t *txd = NULL; pid_t pid = current->pid; - CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", - nal_data, block, pid); - - if (gmnal_is_rxthread(nal_data)) { - CDEBUG(D_INFO, "RXTHREAD Attempting to get token\n"); - down(&nal_data->rxt_stxd_token); - spin_lock(&nal_data->rxt_stxd_lock); - txd = nal_data->rxt_stxd; - nal_data->rxt_stxd = txd->next; - spin_unlock(&nal_data->rxt_stxd_lock); - CDEBUG(D_INFO, "RXTHREAD got [%p], head is [%p]\n", - txd, nal_data->rxt_stxd); - txd->kniov = 0; - txd->rxt = 1; + CDEBUG(D_TRACE, "gmnal_get_stxd gmnalni [%p] block[%d] pid [%d]\n", + gmnalni, block, pid); + + if (gmnal_is_rxthread(gmnalni)) { + CDEBUG(D_NET, "RXTHREAD Attempting to get token\n"); + down(&gmnalni->gmni_rxt_stxd_token); + spin_lock(&gmnalni->gmni_rxt_stxd_lock); + txd = gmnalni->gmni_rxt_stxd; + gmnalni->gmni_rxt_stxd = txd->tx_next; + spin_unlock(&gmnalni->gmni_rxt_stxd_lock); + CDEBUG(D_NET, "RXTHREAD got [%p], head is [%p]\n", + txd, gmnalni->gmni_rxt_stxd); + txd->tx_kniov = 0; + txd->tx_rxt = 1; } else { if (block) { - CDEBUG(D_INFO, "Attempting to get token\n"); - down(&nal_data->stxd_token); + CDEBUG(D_NET, "Attempting to get token\n"); + down(&gmnalni->gmni_stxd_token); CDEBUG(D_PORTALS, "Got token\n"); } else { - if (down_trylock(&nal_data->stxd_token)) { + if (down_trylock(&gmnalni->gmni_stxd_token)) { CERROR("can't get token\n"); return(NULL); } } - spin_lock(&nal_data->stxd_lock); - txd = nal_data->stxd; - nal_data->stxd = txd->next; - spin_unlock(&nal_data->stxd_lock); - CDEBUG(D_INFO, "got [%p], head is [%p]\n", txd, - nal_data->stxd); - txd->kniov = 0; + spin_lock(&gmnalni->gmni_stxd_lock); + txd = gmnalni->gmni_stxd; + gmnalni->gmni_stxd = txd->tx_next; + spin_unlock(&gmnalni->gmni_stxd_lock); + CDEBUG(D_NET, "got [%p], head is [%p]\n", txd, + gmnalni->gmni_stxd); + txd->tx_kniov = 0; } /* general txd get */ return(txd); } @@ -269,72 +247,35 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block) * Return a txd to the list */ void -gmnal_return_stxd(gmnal_data_t *nal_data, gmnal_stxd_t *txd) +gmnal_return_stxd(gmnal_ni_t *gmnalni, gmnal_stxd_t *txd) { - CDEBUG(D_TRACE, "nal_data [%p], txd[%p] rxt[%d]\n", nal_data, - txd, txd->rxt); + CDEBUG(D_TRACE, "gmnalni [%p], txd[%p] rxt[%d]\n", gmnalni, + txd, txd->tx_rxt); /* * this transmit descriptor is * for the rxthread */ - if (txd->rxt) { - spin_lock(&nal_data->rxt_stxd_lock); - txd->next = nal_data->rxt_stxd; - nal_data->rxt_stxd = txd; - spin_unlock(&nal_data->rxt_stxd_lock); - up(&nal_data->rxt_stxd_token); - CDEBUG(D_INFO, "Returned stxd to rxthread list\n"); + if (txd->tx_rxt) { + spin_lock(&gmnalni->gmni_rxt_stxd_lock); + txd->tx_next = gmnalni->gmni_rxt_stxd; + gmnalni->gmni_rxt_stxd = txd; + spin_unlock(&gmnalni->gmni_rxt_stxd_lock); + up(&gmnalni->gmni_rxt_stxd_token); + CDEBUG(D_NET, "Returned stxd to rxthread list\n"); } else { - spin_lock(&nal_data->stxd_lock); - txd->next = nal_data->stxd; - nal_data->stxd = txd; - spin_unlock(&nal_data->stxd_lock); - up(&nal_data->stxd_token); - CDEBUG(D_INFO, "Returned stxd to general list\n"); + spin_lock(&gmnalni->gmni_stxd_lock); + txd->tx_next = gmnalni->gmni_stxd; + gmnalni->gmni_stxd = txd; + spin_unlock(&gmnalni->gmni_stxd_lock); + up(&gmnalni->gmni_stxd_token); + CDEBUG(D_NET, "Returned stxd to general list\n"); } return; } /* - * Get a large transmit descriptor from the free list - * This implicitly gets us a transmit token . 
- * always wait for one. - */ -gmnal_ltxd_t * -gmnal_get_ltxd(gmnal_data_t *nal_data) -{ - - gmnal_ltxd_t *ltxd = NULL; - - CDEBUG(D_TRACE, "nal_data [%p]\n", nal_data); - - down(&nal_data->ltxd_token); - spin_lock(&nal_data->ltxd_lock); - ltxd = nal_data->ltxd; - nal_data->ltxd = ltxd->next; - spin_unlock(&nal_data->ltxd_lock); - CDEBUG(D_INFO, "got [%p], head is [%p]\n", ltxd, nal_data->ltxd); - return(ltxd); -} - -/* - * Return an ltxd to the list - */ -void -gmnal_return_ltxd(gmnal_data_t *nal_data, gmnal_ltxd_t *ltxd) -{ - CDEBUG(D_TRACE, "nal_data [%p], ltxd[%p]\n", nal_data, ltxd); - - spin_lock(&nal_data->ltxd_lock); - ltxd->next = nal_data->ltxd; - nal_data->ltxd = ltxd; - spin_unlock(&nal_data->ltxd_lock); - up(&nal_data->ltxd_token); - return; -} -/* * allocate a number of small rx buffers and register with GM * so they are wired and set up for DMA. This is a costly operation. * Also allocate a corrosponding descriptor to keep track of @@ -343,7 +284,7 @@ gmnal_return_ltxd(gmnal_data_t *nal_data, gmnal_ltxd_t *ltxd) * receive thread. */ int -gmnal_alloc_srxd(gmnal_data_t *nal_data) +gmnal_alloc_srxd(gmnal_ni_t *gmnalni) { int nrx = 0, nsrx = 0, i = 0; gmnal_srxd_t *rxd = NULL; @@ -351,10 +292,10 @@ gmnal_alloc_srxd(gmnal_data_t *nal_data) CDEBUG(D_TRACE, "gmnal_alloc_small rx\n"); - spin_lock(&nal_data->gm_lock); - nrx = gm_num_receive_tokens(nal_data->gm_port); - spin_unlock(&nal_data->gm_lock); - CDEBUG(D_INFO, "total number of receive tokens available is [%d]\n", + spin_lock(&gmnalni->gmni_gm_lock); + nrx = gm_num_receive_tokens(gmnalni->gmni_port); + spin_unlock(&gmnalni->gmni_gm_lock); + CDEBUG(D_NET, "total number of receive tokens available is [%d]\n", nrx); nsrx = nrx/2; @@ -365,59 +306,56 @@ gmnal_alloc_srxd(gmnal_data_t *nal_data) */ nsrx = num_stxds*2 + 2; - CDEBUG(D_INFO, "Allocated [%d] receive tokens to small messages\n", + CDEBUG(D_NET, "Allocated [%d] receive tokens to small messages\n", nsrx); - spin_lock(&nal_data->gm_lock); - nal_data->srxd_hash = gm_create_hash(gm_hash_compare_ptrs, - gm_hash_hash_ptr, 0, 0, nsrx, 0); - spin_unlock(&nal_data->gm_lock); - if (!nal_data->srxd_hash) { + spin_lock(&gmnalni->gmni_gm_lock); + gmnalni->gmni_srxd_hash = gm_create_hash(gm_hash_compare_ptrs, + gm_hash_hash_ptr, 0, 0, nsrx, 0); + spin_unlock(&gmnalni->gmni_gm_lock); + if (!gmnalni->gmni_srxd_hash) { CERROR("Failed to create hash table\n"); - return(GMNAL_STATUS_NOMEM); + return -ENOMEM; } - sema_init(&nal_data->srxd_token, nsrx); - spin_lock_init(&nal_data->srxd_lock); - for (i=0; i<=nsrx; i++) { PORTAL_ALLOC(rxd, sizeof(gmnal_srxd_t)); if (!rxd) { CERROR("Failed to malloc rxd [%d]\n", i); - return(GMNAL_STATUS_NOMEM); + return -ENOMEM; } - spin_lock(&nal_data->gm_lock); - rxbuffer = gm_dma_malloc(nal_data->gm_port, - nal_data->small_msg_size); - spin_unlock(&nal_data->gm_lock); + spin_lock(&gmnalni->gmni_gm_lock); + rxbuffer = gm_dma_malloc(gmnalni->gmni_port, + gmnalni->gmni_small_msg_size); + spin_unlock(&gmnalni->gmni_gm_lock); if (!rxbuffer) { CERROR("Failed to gm_dma_malloc rxbuffer [%d], " - "size [%d]\n",i ,nal_data->small_msg_size); + "size [%d]\n",i ,gmnalni->gmni_small_msg_size); PORTAL_FREE(rxd, sizeof(gmnal_srxd_t)); - return(GMNAL_STATUS_FAIL); + return -ENOMEM; } - rxd->buffer = rxbuffer; - rxd->size = nal_data->small_msg_size; - rxd->gmsize = gm_min_size_for_length(rxd->size); + rxd->rx_buffer = rxbuffer; + rxd->rx_size = gmnalni->gmni_small_msg_size; + rxd->rx_gmsize = gm_min_size_for_length(rxd->rx_size); - if (gm_hash_insert(nal_data->srxd_hash, + if 
(gm_hash_insert(gmnalni->gmni_srxd_hash, (void*)rxbuffer, (void*)rxd)) { CERROR("failed to create hash entry rxd[%p] " "for rxbuffer[%p]\n", rxd, rxbuffer); - return(GMNAL_STATUS_FAIL); + return -ENOMEM; } - rxd->next = nal_data->srxd; - nal_data->srxd = rxd; - CDEBUG(D_INFO, "Registered rxd [%p] with buffer [%p], " - "size [%d]\n", rxd, rxd->buffer, rxd->size); + rxd->rx_next = gmnalni->gmni_srxd; + gmnalni->gmni_srxd = rxd; + CDEBUG(D_NET, "Registered rxd [%p] with buffer [%p], " + "size [%d]\n", rxd, rxd->rx_buffer, rxd->rx_size); } - return(GMNAL_STATUS_OK); + return 0; } @@ -426,29 +364,22 @@ gmnal_alloc_srxd(gmnal_data_t *nal_data) * rx descriptors that go along with them. */ void -gmnal_free_srxd(gmnal_data_t *nal_data) +gmnal_free_srxd(gmnal_ni_t *gmnalni) { - gmnal_srxd_t *rxd = nal_data->srxd, *_rxd = NULL; + gmnal_srxd_t *rxd = gmnalni->gmni_srxd, *_rxd = NULL; CDEBUG(D_TRACE, "gmnal_free_small rx\n"); while(rxd) { - CDEBUG(D_INFO, "Freeing rxd [%p] buffer [%p], size [%d]\n", - rxd, rxd->buffer, rxd->size); + CDEBUG(D_NET, "Freeing rxd [%p] buffer [%p], size [%d]\n", + rxd, rxd->rx_buffer, rxd->rx_size); _rxd = rxd; - rxd = rxd->next; + rxd = rxd->rx_next; + + spin_lock(&gmnalni->gmni_gm_lock); + gm_dma_free(gmnalni->gmni_port, _rxd->rx_buffer); + spin_unlock(&gmnalni->gmni_gm_lock); -#if 0 - spin_lock(&nal_data->gm_lock); - gm_deregister_memory(nal_data->gm_port, _rxd->buffer, - _rxd->size); - spin_unlock(&nal_data->gm_lock); - PORTAL_FREE(_rxd->buffer, GMNAL_SMALL_RXBUFFER_SIZE); -#else - spin_lock(&nal_data->gm_lock); - gm_dma_free(nal_data->gm_port, _rxd->buffer); - spin_unlock(&nal_data->gm_lock); -#endif PORTAL_FREE(_rxd, sizeof(gmnal_srxd_t)); } return; @@ -456,51 +387,6 @@ gmnal_free_srxd(gmnal_data_t *nal_data) /* - * Get a rxd from the free list - * This get us a wired and gm_registered small rx buffer. - * This implicitly gets us a receive token also. 
- */ -gmnal_srxd_t * -gmnal_get_srxd(gmnal_data_t *nal_data, int block) -{ - - gmnal_srxd_t *rxd = NULL; - CDEBUG(D_TRACE, "nal_data [%p] block [%d]\n", nal_data, block); - - if (block) { - down(&nal_data->srxd_token); - } else { - if (down_trylock(&nal_data->srxd_token)) { - CDEBUG(D_INFO, "gmnal_get_srxd Can't get token\n"); - return(NULL); - } - } - spin_lock(&nal_data->srxd_lock); - rxd = nal_data->srxd; - if (rxd) - nal_data->srxd = rxd->next; - spin_unlock(&nal_data->srxd_lock); - CDEBUG(D_INFO, "got [%p], head is [%p]\n", rxd, nal_data->srxd); - return(rxd); -} - -/* - * Return an rxd to the list - */ -void -gmnal_return_srxd(gmnal_data_t *nal_data, gmnal_srxd_t *rxd) -{ - CDEBUG(D_TRACE, "nal_data [%p], rxd[%p]\n", nal_data, rxd); - - spin_lock(&nal_data->srxd_lock); - rxd->next = nal_data->srxd; - nal_data->srxd = rxd; - spin_unlock(&nal_data->srxd_lock); - up(&nal_data->srxd_token); - return; -} - -/* * Given a pointer to a srxd find * the relevant descriptor for it * This is done by searching a hash @@ -508,72 +394,72 @@ gmnal_return_srxd(gmnal_data_t *nal_data, gmnal_srxd_t *rxd) * are created */ gmnal_srxd_t * -gmnal_rxbuffer_to_srxd(gmnal_data_t *nal_data, void *rxbuffer) +gmnal_rxbuffer_to_srxd(gmnal_ni_t *gmnalni, void *rxbuffer) { gmnal_srxd_t *srxd = NULL; - CDEBUG(D_TRACE, "nal_data [%p], rxbuffer [%p]\n", nal_data, rxbuffer); - srxd = gm_hash_find(nal_data->srxd_hash, rxbuffer); - CDEBUG(D_INFO, "srxd is [%p]\n", srxd); + CDEBUG(D_TRACE, "gmnalni [%p], rxbuffer [%p]\n", gmnalni, rxbuffer); + srxd = gm_hash_find(gmnalni->gmni_srxd_hash, rxbuffer); + CDEBUG(D_NET, "srxd is [%p]\n", srxd); return(srxd); } void -gmnal_stop_rxthread(gmnal_data_t *nal_data) +gmnal_stop_rxthread(gmnal_ni_t *gmnalni) { int delay = 30; - CDEBUG(D_TRACE, "Attempting to stop rxthread nal_data [%p]\n", - nal_data); + CDEBUG(D_TRACE, "Attempting to stop rxthread gmnalni [%p]\n", + gmnalni); - nal_data->rxthread_stop_flag = GMNAL_THREAD_STOP; + gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_STOP; - gmnal_remove_rxtwe(nal_data); + gmnal_remove_rxtwe(gmnalni); /* * kick the thread */ - up(&nal_data->rxtwe_wait); + up(&gmnalni->gmni_rxtwe_wait); - while(nal_data->rxthread_flag != GMNAL_THREAD_RESET && delay--) { - CDEBUG(D_INFO, "gmnal_stop_rxthread sleeping\n"); + while(gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET && delay--) { + CDEBUG(D_NET, "gmnal_stop_rxthread sleeping\n"); gmnal_yield(1); - up(&nal_data->rxtwe_wait); + up(&gmnalni->gmni_rxtwe_wait); } - if (nal_data->rxthread_flag != GMNAL_THREAD_RESET) { + if (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) { CERROR("I don't know how to wake the thread\n"); } else { - CDEBUG(D_INFO, "rx thread seems to have stopped\n"); + CDEBUG(D_NET, "rx thread seems to have stopped\n"); } } void -gmnal_stop_ctthread(gmnal_data_t *nal_data) +gmnal_stop_ctthread(gmnal_ni_t *gmnalni) { int delay = 15; - CDEBUG(D_TRACE, "Attempting to stop ctthread nal_data [%p]\n", - nal_data); + CDEBUG(D_TRACE, "Attempting to stop ctthread gmnalni [%p]\n", + gmnalni); - nal_data->ctthread_flag = GMNAL_THREAD_STOP; - spin_lock(&nal_data->gm_lock); - gm_set_alarm(nal_data->gm_port, &nal_data->ctthread_alarm, 10, + gmnalni->gmni_ctthread_flag = GMNAL_THREAD_STOP; + spin_lock(&gmnalni->gmni_gm_lock); + gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, 10, NULL, NULL); - spin_unlock(&nal_data->gm_lock); + spin_unlock(&gmnalni->gmni_gm_lock); - while(nal_data->ctthread_flag == GMNAL_THREAD_STOP && delay--) { - CDEBUG(D_INFO, "gmnal_stop_ctthread sleeping\n"); + 
while(gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP && delay--) { + CDEBUG(D_NET, "gmnal_stop_ctthread sleeping\n"); gmnal_yield(1); } - if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) { + if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) { CERROR("I DON'T KNOW HOW TO WAKE THE THREAD\n"); } else { - CDEBUG(D_INFO, "CT THREAD SEEMS TO HAVE STOPPED\n"); + CDEBUG(D_NET, "CT THREAD SEEMS TO HAVE STOPPED\n"); } } @@ -835,17 +721,17 @@ gmnal_yield(int delay) } int -gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov, +gmnal_is_small_msg(gmnal_ni_t *gmnalni, int niov, struct iovec *iov, int len) { CDEBUG(D_TRACE, "len [%d] limit[%d]\n", len, - nal_data->small_msg_size); + gmnalni->gmni_small_msg_size); if ((len + sizeof(ptl_hdr_t) + sizeof(gmnal_msghdr_t)) - < nal_data->small_msg_size) { + < gmnalni->gmni_small_msg_size) { - CDEBUG(D_INFO, "Yep, small message\n"); + CDEBUG(D_NET, "Yep, small message\n"); return(1); } else { CERROR("No, not small message\n"); @@ -865,7 +751,7 @@ gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov, * can get it to complete the receive */ int -gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv) +gmnal_add_rxtwe(gmnal_ni_t *gmnalni, gm_recv_t *recv) { gmnal_rxtwe_t *we = NULL; @@ -874,7 +760,7 @@ gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv) PORTAL_ALLOC(we, sizeof(gmnal_rxtwe_t)); if (!we) { CERROR("failed to malloc\n"); - return(GMNAL_STATUS_FAIL); + return -ENOMEM; } we->buffer = gm_ntohp(recv->buffer); we->snode = (int)gm_ntoh_u16(recv->sender_node_id); @@ -882,70 +768,73 @@ gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv) we->type = (int)gm_ntoh_u8(recv->type); we->length = (int)gm_ntohl(recv->length); - spin_lock(&nal_data->rxtwe_lock); - if (nal_data->rxtwe_tail) { - nal_data->rxtwe_tail->next = we; + spin_lock(&gmnalni->gmni_rxtwe_lock); + if (gmnalni->gmni_rxtwe_tail) { + gmnalni->gmni_rxtwe_tail->next = we; } else { - nal_data->rxtwe_head = we; - nal_data->rxtwe_tail = we; + gmnalni->gmni_rxtwe_head = we; + gmnalni->gmni_rxtwe_tail = we; } - nal_data->rxtwe_tail = we; - spin_unlock(&nal_data->rxtwe_lock); + gmnalni->gmni_rxtwe_tail = we; + spin_unlock(&gmnalni->gmni_rxtwe_lock); - up(&nal_data->rxtwe_wait); - return(GMNAL_STATUS_OK); + up(&gmnalni->gmni_rxtwe_wait); + return 0; } void -gmnal_remove_rxtwe(gmnal_data_t *nal_data) +gmnal_remove_rxtwe(gmnal_ni_t *gmnalni) { - gmnal_rxtwe_t *_we, *we = nal_data->rxtwe_head; + gmnal_rxtwe_t *_we, *we = gmnalni->gmni_rxtwe_head; CDEBUG(D_NET, "removing all work list entries\n"); - spin_lock(&nal_data->rxtwe_lock); + spin_lock(&gmnalni->gmni_rxtwe_lock); CDEBUG(D_NET, "Got lock\n"); while (we) { _we = we; we = we->next; PORTAL_FREE(_we, sizeof(gmnal_rxtwe_t)); } - spin_unlock(&nal_data->rxtwe_lock); - nal_data->rxtwe_head = NULL; - nal_data->rxtwe_tail = NULL; + spin_unlock(&gmnalni->gmni_rxtwe_lock); + gmnalni->gmni_rxtwe_head = NULL; + gmnalni->gmni_rxtwe_tail = NULL; } gmnal_rxtwe_t * -gmnal_get_rxtwe(gmnal_data_t *nal_data) +gmnal_get_rxtwe(gmnal_ni_t *gmnalni) { gmnal_rxtwe_t *we = NULL; CDEBUG(D_NET, "Getting entry to list\n"); do { - while(down_interruptible(&nal_data->rxtwe_wait) != 0) + while(down_interruptible(&gmnalni->gmni_rxtwe_wait) != 0) /* do nothing */; - if (nal_data->rxthread_stop_flag == GMNAL_THREAD_STOP) { + + if (gmnalni->gmni_rxthread_stop_flag == GMNAL_THREAD_STOP) { /* * time to stop * TO DO some one free the work entries */ return(NULL); } - spin_lock(&nal_data->rxtwe_lock); - if (nal_data->rxtwe_head) { - 
CDEBUG(D_INFO, "Got a work entry\n"); - we = nal_data->rxtwe_head; - nal_data->rxtwe_head = we->next; - if (!nal_data->rxtwe_head) - nal_data->rxtwe_tail = NULL; + + spin_lock(&gmnalni->gmni_rxtwe_lock); + if (gmnalni->gmni_rxtwe_head) { + CDEBUG(D_NET, "Got a work entry\n"); + we = gmnalni->gmni_rxtwe_head; + gmnalni->gmni_rxtwe_head = we->next; + if (!gmnalni->gmni_rxtwe_head) + gmnalni->gmni_rxtwe_tail = NULL; } else { CWARN("woken but no work\n"); } - spin_unlock(&nal_data->rxtwe_lock); + + spin_unlock(&gmnalni->gmni_rxtwe_lock); } while (!we); - CDEBUG(D_INFO, "Returning we[%p]\n", we); + CDEBUG(D_NET, "Returning we[%p]\n", we); return(we); } @@ -958,7 +847,7 @@ gmnal_get_rxtwe(gmnal_data_t *nal_data) * callback events or sleeps. */ int -gmnal_start_kernel_threads(gmnal_data_t *nal_data) +gmnal_start_kernel_threads(gmnal_ni_t *gmnalni) { int threads = 0; @@ -967,69 +856,69 @@ gmnal_start_kernel_threads(gmnal_data_t *nal_data) * gm_unknown call (sleeping) to exit it. */ CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n"); - gm_initialize_alarm(&nal_data->ctthread_alarm); - nal_data->ctthread_flag = GMNAL_THREAD_RESET; + gm_initialize_alarm(&gmnalni->gmni_ctthread_alarm); + gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET; - CDEBUG(D_INFO, "Starting caretaker thread\n"); - nal_data->ctthread_pid = - kernel_thread(gmnal_ct_thread, (void*)nal_data, 0); - if (nal_data->ctthread_pid <= 0) { + CDEBUG(D_NET, "Starting caretaker thread\n"); + gmnalni->gmni_ctthread_pid = + kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0); + if (gmnalni->gmni_ctthread_pid <= 0) { CERROR("Caretaker thread failed to start\n"); - return(GMNAL_STATUS_FAIL); + return -ENOMEM; } - while (nal_data->rxthread_flag != GMNAL_THREAD_RESET) { + while (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) { gmnal_yield(1); - CDEBUG(D_INFO, "Waiting for caretaker thread signs of life\n"); + CDEBUG(D_NET, "Waiting for caretaker thread signs of life\n"); } - CDEBUG(D_INFO, "caretaker thread has started\n"); + CDEBUG(D_NET, "caretaker thread has started\n"); /* * Now start a number of receiver threads * these treads get work to do from the caretaker (ct) thread */ - nal_data->rxthread_flag = GMNAL_THREAD_RESET; - nal_data->rxthread_stop_flag = GMNAL_THREAD_RESET; + gmnalni->gmni_rxthread_flag = GMNAL_THREAD_RESET; + gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_RESET; for (threads=0; threadsrxthread_pid[threads] = -1; - spin_lock_init(&nal_data->rxtwe_lock); - spin_lock_init(&nal_data->rxthread_flag_lock); - sema_init(&nal_data->rxtwe_wait, 0); - nal_data->rxtwe_head = NULL; - nal_data->rxtwe_tail = NULL; + gmnalni->gmni_rxthread_pid[threads] = -1; + spin_lock_init(&gmnalni->gmni_rxtwe_lock); + spin_lock_init(&gmnalni->gmni_rxthread_flag_lock); + sema_init(&gmnalni->gmni_rxtwe_wait, 0); + gmnalni->gmni_rxtwe_head = NULL; + gmnalni->gmni_rxtwe_tail = NULL; /* * If the default number of receive threades isn't * modified at load time, then start one thread per cpu */ if (num_rx_threads == -1) num_rx_threads = smp_num_cpus; - CDEBUG(D_INFO, "Starting [%d] receive threads\n", num_rx_threads); + CDEBUG(D_NET, "Starting [%d] receive threads\n", num_rx_threads); for (threads=0; threadsrxthread_pid[threads] = - kernel_thread(gmnal_rx_thread, (void*)nal_data, 0); - if (nal_data->rxthread_pid[threads] <= 0) { + gmnalni->gmni_rxthread_pid[threads] = + kernel_thread(gmnal_rx_thread, (void*)gmnalni, 0); + if (gmnalni->gmni_rxthread_pid[threads] <= 0) { CERROR("Receive thread failed to start\n"); - gmnal_stop_rxthread(nal_data); - 
-			gmnal_stop_rxthread(nal_data);
-			gmnal_stop_ctthread(nal_data);
+			gmnal_stop_rxthread(gmnalni);
+			gmnal_stop_ctthread(gmnalni);
-			return(GMNAL_STATUS_FAIL);
+			return -ENOMEM;
 		}
 	}
 
 	for (;;) {
-		spin_lock(&nal_data->rxthread_flag_lock);
-		if (nal_data->rxthread_flag == GMNAL_RXTHREADS_STARTED) {
-			spin_unlock(&nal_data->rxthread_flag_lock);
+		spin_lock(&gmnalni->gmni_rxthread_flag_lock);
+		if (gmnalni->gmni_rxthread_flag == GMNAL_RXTHREADS_STARTED) {
+			spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
 			break;
 		}
-		spin_unlock(&nal_data->rxthread_flag_lock);
+		spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
 		gmnal_yield(1);
 	}
-	CDEBUG(D_INFO, "receive threads seem to have started\n");
+	CDEBUG(D_NET, "receive threads seem to have started\n");
 
-	return(GMNAL_STATUS_OK);
+	return 0;
 }
-- 
1.8.3.1