#define GMNAL_MAGIC 0x1234abcd
#define GMNAL_SMALL_MESSAGE 1078
-#define GMNAL_LARGE_MESSAGE_INIT 1079
-#define GMNAL_LARGE_MESSAGE_ACK 1080
-#define GMNAL_LARGE_MESSAGE_FINI 1081
-extern int gmnal_small_msg_size;
extern int num_rx_threads;
extern int num_stxds;
extern int gm_port_id;
* and the other by the NAL rxthreads when doing sends.
* This helps prevent deadlock caused by stxd starvation.
*/
-typedef struct _gmnal_stxd_t {
- void *buffer;
- int buffer_size;
- gm_size_t gm_size;
- int msg_size;
- int gm_target_node;
- int gm_priority;
- int type;
- struct _gmnal_data_t *nal_data;
- lib_msg_t *cookie;
- int niov;
- struct iovec iov[PTL_MD_MAX_IOV];
- struct _gmnal_stxd_t *next;
- int rxt;
- int kniov;
- struct iovec *iovec_dup;
+typedef struct gmnal_stxd {
+ struct gmnal_stxd *tx_next;
+ void *tx_buffer;
+ int tx_buffer_size;
+ gm_size_t tx_gm_size;
+ int tx_msg_size;
+ int tx_gmlid;
+ int tx_gm_priority;
+ int tx_type;
+ ptl_nid_t tx_nid;
+ struct gmnal_ni *tx_gmni;
+ lib_msg_t *tx_cookie;
+ int tx_niov;
+ int tx_rxt;
+ int tx_kniov;
+ struct iovec *tx_iovec_dup;
+ struct iovec tx_iov[PTL_MD_MAX_IOV];
} gmnal_stxd_t;
/*
- * keeps a transmit token for large transmit (gm_get)
- * and a pointer to rxd that is used as context for large receive
- */
-typedef struct _gmnal_ltxd_t {
- struct _gmnal_ltxd_t *next;
- struct _gmnal_srxd_t *srxd;
-} gmnal_ltxd_t;
-
-
-/*
* as for gmnal_stxd_t
* a hash table in nal_data find srxds from
* the rx buffer address. hash table populated at init time
*/
-typedef struct _gmnal_srxd_t {
- void *buffer;
- int size;
- gm_size_t gmsize;
- unsigned int gm_source_node;
- gmnal_stxd_t *source_stxd;
- int type;
- int nsiov;
- int nriov;
- struct iovec *riov;
- int ncallbacks;
- spinlock_t callback_lock;
- int callback_status;
- lib_msg_t *cookie;
- struct _gmnal_srxd_t *next;
- struct _gmnal_data_t *nal_data;
+typedef struct gmnal_srxd {
+ void *rx_buffer;
+ int rx_size;
+ gm_size_t rx_gmsize;
+ unsigned int rx_sender_gmid;
+ __u64 rx_source_stxd;
+ int rx_type;
+ int rx_nsiov;
+ int rx_nriov;
+ struct iovec *rx_riov;
+ int rx_ncallbacks;
+ spinlock_t rx_callback_lock;
+ int rx_callback_status;
+ lib_msg_t *rx_cookie;
+ struct gmnal_srxd *rx_next;
+ struct gmnal_ni *rx_gmni;
} gmnal_srxd_t;
/*
* Header which lmgnal puts at the start of each message
* watch alignment for ia32/64 interaction
*/
-typedef struct _gmnal_msghdr {
- __s32 magic;
- __s32 type;
- __u32 sender_node_id;
- __s32 niov;
- gm_remote_ptr_t stxd_remote_ptr; /* 64 bits */
+typedef struct gmnal_msghdr {
+ __s32 gmm_magic;
+ __s32 gmm_type;
+ __s32 gmm_niov;
+ __u32 gmm_sender_gmid;
+ __u64 gmm_stxd_remote_ptr;
} WIRE_ATTR gmnal_msghdr_t;
/*
* is exhausted (as caretaker thread is responsible for replacing
* transmit descriptors on the free list)
*/
-typedef struct _gmnal_rxtwe {
+typedef struct gmnal_rxtwe {
void *buffer;
unsigned snode;
unsigned sport;
unsigned type;
unsigned length;
- struct _gmnal_rxtwe *next;
+ struct gmnal_rxtwe *next;
} gmnal_rxtwe_t;
/*
*/
#define NRXTHREADS 10 /* max number of receiver threads */
-typedef struct _gmnal_data_t {
- int refcnt;
- spinlock_t cb_lock;
- spinlock_t stxd_lock;
- struct semaphore stxd_token;
- gmnal_stxd_t *stxd;
- spinlock_t rxt_stxd_lock;
- struct semaphore rxt_stxd_token;
- gmnal_stxd_t *rxt_stxd;
- spinlock_t ltxd_lock;
- struct semaphore ltxd_token;
- gmnal_ltxd_t *ltxd;
- spinlock_t srxd_lock;
- struct semaphore srxd_token;
- gmnal_srxd_t *srxd;
- struct gm_hash *srxd_hash;
- nal_t *nal;
- lib_nal_t *libnal;
- struct gm_port *gm_port;
- unsigned int gm_local_nid;
- unsigned int gm_global_nid;
- spinlock_t gm_lock;
- long rxthread_pid[NRXTHREADS];
- int rxthread_stop_flag;
- spinlock_t rxthread_flag_lock;
- long rxthread_flag;
- long ctthread_pid;
- int ctthread_flag;
- gm_alarm_t ctthread_alarm;
- int small_msg_size;
- int small_msg_gmsize;
- gmnal_rxtwe_t *rxtwe_head;
- gmnal_rxtwe_t *rxtwe_tail;
- spinlock_t rxtwe_lock;
- struct semaphore rxtwe_wait;
- struct ctl_table_header *sysctl;
-} gmnal_data_t;
+typedef struct gmnal_ni {
+ spinlock_t gmni_stxd_lock;
+ struct semaphore gmni_stxd_token;
+ gmnal_stxd_t *gmni_stxd;
+ spinlock_t gmni_rxt_stxd_lock;
+ struct semaphore gmni_rxt_stxd_token;
+ gmnal_stxd_t *gmni_rxt_stxd;
+ gmnal_srxd_t *gmni_srxd;
+ struct gm_hash *gmni_srxd_hash;
+ nal_t *gmni_nal;
+ lib_nal_t *gmni_libnal;
+ struct gm_port *gmni_port;
+ __u32 gmni_local_gmid;
+ __u32 gmni_global_gmid;
+ spinlock_t gmni_gm_lock; /* serialise GM calls */
+ long gmni_rxthread_pid[NRXTHREADS];
+ int gmni_rxthread_stop_flag;
+ spinlock_t gmni_rxthread_flag_lock;
+ long gmni_rxthread_flag;
+ long gmni_ctthread_pid;
+ int gmni_ctthread_flag;
+ gm_alarm_t gmni_ctthread_alarm;
+ int gmni_small_msg_size;
+ int gmni_small_msg_gmsize;
+ gmnal_rxtwe_t *gmni_rxtwe_head;
+ gmnal_rxtwe_t *gmni_rxtwe_tail;
+ spinlock_t gmni_rxtwe_lock;
+ struct semaphore gmni_rxtwe_wait;
+} gmnal_ni_t;
/*
* Flags to start/stop and check status of threads
#define GMNAL_RXTHREADS_STARTED ( (1<<num_rx_threads)-1)
-extern gmnal_data_t *global_nal_data;
-
/*
* for ioctl get pid
*/
#define GMNAL_IOC_GET_GNID 1
/*
- * Return codes
- */
-#define GMNAL_STATUS_OK 0
-#define GMNAL_STATUS_FAIL 1
-#define GMNAL_STATUS_NOMEM 2
-
-
-/*
* FUNCTION PROTOTYPES
*/
/*
* Small and Large Transmit and Receive Descriptor Functions
*/
-int gmnal_alloc_txd(gmnal_data_t *);
-void gmnal_free_txd(gmnal_data_t *);
-gmnal_stxd_t* gmnal_get_stxd(gmnal_data_t *, int);
-void gmnal_return_stxd(gmnal_data_t *, gmnal_stxd_t *);
-gmnal_ltxd_t* gmnal_get_ltxd(gmnal_data_t *);
-void gmnal_return_ltxd(gmnal_data_t *, gmnal_ltxd_t *);
-
-int gmnal_alloc_srxd(gmnal_data_t *);
-void gmnal_free_srxd(gmnal_data_t *);
-gmnal_srxd_t* gmnal_get_srxd(gmnal_data_t *, int);
-void gmnal_return_srxd(gmnal_data_t *, gmnal_srxd_t *);
+int gmnal_alloc_txd(gmnal_ni_t *);
+void gmnal_free_txd(gmnal_ni_t *);
+gmnal_stxd_t* gmnal_get_stxd(gmnal_ni_t *, int);
+void gmnal_return_stxd(gmnal_ni_t *, gmnal_stxd_t *);
+
+int gmnal_alloc_srxd(gmnal_ni_t *);
+void gmnal_free_srxd(gmnal_ni_t *);
/*
* general utility functions
*/
-gmnal_srxd_t *gmnal_rxbuffer_to_srxd(gmnal_data_t *, void*);
-void gmnal_stop_rxthread(gmnal_data_t *);
-void gmnal_stop_ctthread(gmnal_data_t *);
+gmnal_srxd_t *gmnal_rxbuffer_to_srxd(gmnal_ni_t *, void*);
+void gmnal_stop_rxthread(gmnal_ni_t *);
+void gmnal_stop_ctthread(gmnal_ni_t *);
void gmnal_drop_sends_callback(gm_port_t *, void *, gm_status_t);
void gmnal_resume_sending_callback(gm_port_t *, void *, gm_status_t);
char *gmnal_gm_error(gm_status_t);
char *gmnal_rxevent(gm_recv_event_t*);
-int gmnal_is_small_msg(gmnal_data_t*, int, struct iovec*, int);
void gmnal_yield(int);
-int gmnal_start_kernel_threads(gmnal_data_t *);
+int gmnal_start_kernel_threads(gmnal_ni_t *);
/*
*/
int gmnal_ct_thread(void *); /* caretaker thread */
int gmnal_rx_thread(void *); /* receive thread */
-int gmnal_pre_receive(gmnal_data_t*, gmnal_rxtwe_t*, int);
-int gmnal_rx_bad(gmnal_data_t *, gmnal_rxtwe_t *, gmnal_srxd_t*);
-int gmnal_rx_requeue_buffer(gmnal_data_t *, gmnal_srxd_t *);
-int gmnal_add_rxtwe(gmnal_data_t *, gm_recv_t *);
-gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_data_t *);
-void gmnal_remove_rxtwe(gmnal_data_t *);
+void gmnal_pre_receive(gmnal_ni_t*, gmnal_rxtwe_t*, int);
+void gmnal_rx_bad(gmnal_ni_t *, gmnal_rxtwe_t *);
+void gmnal_rx_requeue_buffer(gmnal_ni_t *, gmnal_srxd_t *);
+int gmnal_add_rxtwe(gmnal_ni_t *, gm_recv_t *);
+gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_ni_t *);
+void gmnal_remove_rxtwe(gmnal_ni_t *);
/*
* Small messages
*/
-ptl_err_t gmnal_small_rx(lib_nal_t *, void *, lib_msg_t *);
-ptl_err_t gmnal_small_tx(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
- int, ptl_nid_t, ptl_pid_t,
- gmnal_stxd_t*, int);
+ptl_err_t gmnal_small_tx(lib_nal_t *libnal, void *private,
+ lib_msg_t *cookie, ptl_hdr_t *hdr,
+ int type, ptl_nid_t nid,
+ gmnal_stxd_t *stxd, int size);
void gmnal_small_tx_callback(gm_port_t *, void *, gm_status_t);
-
-
-/*
- * Large messages
- */
-int gmnal_large_rx(lib_nal_t *, void *, lib_msg_t *, unsigned int,
- struct iovec *, size_t, size_t, size_t);
-
-int gmnal_large_tx(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
- int, ptl_nid_t, ptl_pid_t, unsigned int,
- struct iovec*, size_t, int);
-
-void gmnal_large_tx_callback(gm_port_t *, void *, gm_status_t);
-
-int gmnal_remote_get(gmnal_srxd_t *, int, struct iovec*, int,
- struct iovec*);
-
-void gmnal_remote_get_callback(gm_port_t *, void *, gm_status_t);
-
-int gmnal_copyiov(int, gmnal_srxd_t *, int, struct iovec*, int,
- struct iovec*);
-
-void gmnal_large_tx_ack(gmnal_data_t *, gmnal_srxd_t *);
-void gmnal_large_tx_ack_callback(gm_port_t *, void *, gm_status_t);
-void gmnal_large_tx_ack_received(gmnal_data_t *, gmnal_srxd_t *);
-
#endif /*__INCLUDE_GMNAL_H__*/
#include "gmnal.h"
-
-
-gmnal_data_t *global_nal_data = NULL;
-#define GLOBAL_NID_STR_LEN 16
-char global_nid_str[GLOBAL_NID_STR_LEN] = {0};
ptl_handle_ni_t kgmnal_ni;
extern int gmnal_cmd(struct portals_cfg *pcfg, void *private);
/*
- * Write the global nid /proc/sys/gmnal/globalnid
- */
-#define GMNAL_SYSCTL 201
-#define GMNAL_SYSCTL_GLOBALNID 1
-
-static ctl_table gmnal_sysctl_table[] = {
- {GMNAL_SYSCTL_GLOBALNID, "globalnid",
- global_nid_str, GLOBAL_NID_STR_LEN,
- 0444, NULL, &proc_dostring},
- { 0 }
-};
-
-
-static ctl_table gmnalnal_top_sysctl_table[] = {
- {GMNAL_SYSCTL, "gmnal", NULL, 0, 0555, gmnal_sysctl_table},
- { 0 }
-};
-
-/*
* gmnal_api_shutdown
* nal_refct == 0 => called on last matching PtlNIFini()
* Close down this interface and free any resources associated with it
void
gmnal_api_shutdown(nal_t *nal)
{
- gmnal_data_t *nal_data;
+ gmnal_ni_t *gmnalni;
lib_nal_t *libnal;
if (nal->nal_refct != 0) {
return;
}
- LASSERT(nal == global_nal_data->nal);
libnal = (lib_nal_t *)nal->nal_data;
- nal_data = (gmnal_data_t *)libnal->libnal_data;
- LASSERT(nal_data == global_nal_data);
- CDEBUG(D_TRACE, "gmnal_api_shutdown: nal_data [%p]\n", nal_data);
+ gmnalni = (gmnal_ni_t *)libnal->libnal_data;
+ CDEBUG(D_TRACE, "gmnal_api_shutdown: gmnalni [%p]\n", gmnalni);
/* Stop portals calling our ioctl handler */
libcfs_nal_cmd_unregister(GMNAL);
* shutdown our threads, THEN lib_fini() */
lib_fini(libnal);
- gmnal_stop_rxthread(nal_data);
- gmnal_stop_ctthread(nal_data);
- gmnal_free_txd(nal_data);
- gmnal_free_srxd(nal_data);
- spin_lock(&nal_data->gm_lock);
- gm_close(nal_data->gm_port);
+ gmnal_stop_rxthread(gmnalni);
+ gmnal_stop_ctthread(gmnalni);
+ gmnal_free_txd(gmnalni);
+ gmnal_free_srxd(gmnalni);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_close(gmnalni->gmni_port);
gm_finalize();
- spin_unlock(&nal_data->gm_lock);
- if (nal_data->sysctl)
- unregister_sysctl_table (nal_data->sysctl);
+ spin_unlock(&gmnalni->gmni_gm_lock);
/* Don't free 'nal'; it's a static struct */
- PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+ PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
PORTAL_FREE(libnal, sizeof(lib_nal_t));
-
- global_nal_data = NULL;
}
{
lib_nal_t *libnal = NULL;
- gmnal_data_t *nal_data = NULL;
+ gmnal_ni_t *gmnalni = NULL;
gmnal_srxd_t *srxd = NULL;
gm_status_t gm_status;
- unsigned int local_nid = 0, global_nid = 0;
+ unsigned int local_gmid = 0, global_gmid = 0;
ptl_process_id_t process_id;
if (nal->nal_refct != 0) {
CDEBUG(D_TRACE, "startup\n");
- LASSERT(global_nal_data == NULL);
-
- PORTAL_ALLOC(nal_data, sizeof(gmnal_data_t));
- if (!nal_data) {
+ PORTAL_ALLOC(gmnalni, sizeof(gmnal_ni_t));
+ if (!gmnalni) {
CERROR("can't get memory\n");
return(PTL_NO_SPACE);
}
- memset(nal_data, 0, sizeof(gmnal_data_t));
+ memset(gmnalni, 0, sizeof(gmnal_ni_t));
/*
* set the small message buffer size
*/
- CDEBUG(D_INFO, "Allocd and reset nal_data[%p]\n", nal_data);
- CDEBUG(D_INFO, "small_msg_size is [%d]\n", nal_data->small_msg_size);
+ CDEBUG(D_NET, "Allocd and reset gmnalni[%p]\n", gmnalni);
+ CDEBUG(D_NET, "small_msg_size is [%d]\n", gmnalni->gmni_small_msg_size);
PORTAL_ALLOC(libnal, sizeof(lib_nal_t));
if (!libnal) {
- PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+ PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
return(PTL_NO_SPACE);
}
libnal->libnal_map = NULL;
libnal->libnal_unmap = NULL;
libnal->libnal_dist = gmnal_cb_dist;
- libnal->libnal_data = NULL;
+ libnal->libnal_data = gmnalni;
- CDEBUG(D_INFO, "Allocd and reset libnal[%p]\n", libnal);
+ CDEBUG(D_NET, "Allocd and reset libnal[%p]\n", libnal);
- /*
- * String them all together
- */
- libnal->libnal_data = (void*)nal_data;
- nal_data->nal = nal;
- nal_data->libnal = libnal;
+ gmnalni->gmni_nal = nal;
+ gmnalni->gmni_libnal = libnal;
- spin_lock_init(&nal_data->gm_lock);
+ spin_lock_init(&gmnalni->gmni_gm_lock);
/*
* initialise the interface,
*/
- CDEBUG(D_INFO, "Calling gm_init\n");
+ CDEBUG(D_NET, "Calling gm_init\n");
if (gm_init() != GM_SUCCESS) {
CERROR("call to gm_init failed\n");
- PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+ PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
"name [%s], version [%d]\n", gm_port_id,
"gmnal", GM_API_VERSION);
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_open(&nal_data->gm_port, 0, gm_port_id, "gmnal",
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_status = gm_open(&gmnalni->gmni_port, 0, gm_port_id, "gmnal",
GM_API_VERSION);
- spin_unlock(&nal_data->gm_lock);
+ spin_unlock(&gmnalni->gmni_gm_lock);
- CDEBUG(D_INFO, "gm_open returned [%d]\n", gm_status);
+ CDEBUG(D_NET, "gm_open returned [%d]\n", gm_status);
if (gm_status == GM_SUCCESS) {
- CDEBUG(D_INFO,"gm_open succeeded port[%p]\n",nal_data->gm_port);
+ CDEBUG(D_NET,"gm_open succeeded port[%p]\n",gmnalni->gmni_port);
} else {
switch(gm_status) {
case(GM_INVALID_PARAMETER):
gm_status);
break;
}
- spin_lock(&nal_data->gm_lock);
+ spin_lock(&gmnalni->gmni_gm_lock);
gm_finalize();
- spin_unlock(&nal_data->gm_lock);
- PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
- nal_data->small_msg_size = gmnal_small_msg_size;
- nal_data->small_msg_gmsize =
- gm_min_size_for_length(gmnal_small_msg_size);
+ gmnalni->gmni_small_msg_size = sizeof(gmnal_msghdr_t) +
+ sizeof(ptl_hdr_t) +
+ PTL_MTU +
+ 928; /* !! */
+ CWARN("Msg size %08x\n", gmnalni->gmni_small_msg_size);
- if (gmnal_alloc_srxd(nal_data) != GMNAL_STATUS_OK) {
+ gmnalni->gmni_small_msg_gmsize =
+ gm_min_size_for_length(gmnalni->gmni_small_msg_size);
+
+ if (gmnal_alloc_srxd(gmnalni) != 0) {
CERROR("Failed to allocate small rx descriptors\n");
- gmnal_free_txd(nal_data);
- spin_lock(&nal_data->gm_lock);
- gm_close(nal_data->gm_port);
+ gmnal_free_txd(gmnalni);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_close(gmnalni->gmni_port);
gm_finalize();
- spin_unlock(&nal_data->gm_lock);
- PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
* Hang out a bunch of small receive buffers
* In fact hang them all out
*/
- while((srxd = gmnal_get_srxd(nal_data, 0))) {
+ for (srxd = gmnalni->gmni_srxd; srxd != NULL; srxd = srxd->rx_next) {
CDEBUG(D_NET, "giving [%p] to gm_provide_recvive_buffer\n",
- srxd->buffer);
- spin_lock(&nal_data->gm_lock);
- gm_provide_receive_buffer_with_tag(nal_data->gm_port,
- srxd->buffer, srxd->gmsize,
+ srxd->rx_buffer);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_provide_receive_buffer_with_tag(gmnalni->gmni_port,
+ srxd->rx_buffer,
+ srxd->rx_gmsize,
GM_LOW_PRIORITY, 0);
- spin_unlock(&nal_data->gm_lock);
+ spin_unlock(&gmnalni->gmni_gm_lock);
}
/*
* Allocate pools of small tx buffers and descriptors
*/
- if (gmnal_alloc_txd(nal_data) != GMNAL_STATUS_OK) {
+ if (gmnal_alloc_txd(gmnalni) != 0) {
CERROR("Failed to allocate small tx descriptors\n");
- spin_lock(&nal_data->gm_lock);
- gm_close(nal_data->gm_port);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_close(gmnalni->gmni_port);
gm_finalize();
- spin_unlock(&nal_data->gm_lock);
- PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
* Initialise the portals library
*/
CDEBUG(D_NET, "Getting node id\n");
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_get_node_id(nal_data->gm_port, &local_nid);
- spin_unlock(&nal_data->gm_lock);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_status = gm_get_node_id(gmnalni->gmni_port, &local_gmid);
+ spin_unlock(&gmnalni->gmni_gm_lock);
if (gm_status != GM_SUCCESS) {
- gmnal_stop_rxthread(nal_data);
- gmnal_stop_ctthread(nal_data);
+ gmnal_stop_rxthread(gmnalni);
+ gmnal_stop_ctthread(gmnalni);
CERROR("can't determine node id\n");
- gmnal_free_txd(nal_data);
- gmnal_free_srxd(nal_data);
- spin_lock(&nal_data->gm_lock);
- gm_close(nal_data->gm_port);
+ gmnal_free_txd(gmnalni);
+ gmnal_free_srxd(gmnalni);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_close(gmnalni->gmni_port);
gm_finalize();
- spin_unlock(&nal_data->gm_lock);
- PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
- nal_data->gm_local_nid = local_nid;
- CDEBUG(D_INFO, "Local node id is [%u]\n", local_nid);
+ gmnalni->gmni_local_gmid = local_gmid;
+ CDEBUG(D_NET, "Local node id is [%u]\n", local_gmid);
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_node_id_to_global_id(nal_data->gm_port, local_nid,
- &global_nid);
- spin_unlock(&nal_data->gm_lock);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_status = gm_node_id_to_global_id(gmnalni->gmni_port,
+ local_gmid,
+ &global_gmid);
+ spin_unlock(&gmnalni->gmni_gm_lock);
if (gm_status != GM_SUCCESS) {
CERROR("failed to obtain global id\n");
- gmnal_stop_rxthread(nal_data);
- gmnal_stop_ctthread(nal_data);
- gmnal_free_txd(nal_data);
- gmnal_free_srxd(nal_data);
- spin_lock(&nal_data->gm_lock);
- gm_close(nal_data->gm_port);
+ gmnal_stop_rxthread(gmnalni);
+ gmnal_stop_ctthread(gmnalni);
+ gmnal_free_txd(gmnalni);
+ gmnal_free_srxd(gmnalni);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_close(gmnalni->gmni_port);
gm_finalize();
- spin_unlock(&nal_data->gm_lock);
- PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
- CDEBUG(D_INFO, "Global node id is [%u]\n", global_nid);
- nal_data->gm_global_nid = global_nid;
- snprintf(global_nid_str, GLOBAL_NID_STR_LEN, "%u", global_nid);
+ CDEBUG(D_NET, "Global node id is [%u]\n", global_gmid);
+ gmnalni->gmni_global_gmid = global_gmid;
/*
pid = gm_getpid();
*/
process_id.pid = requested_pid;
- process_id.nid = global_nid;
+ process_id.nid = global_gmid;
- CDEBUG(D_INFO, "portals_pid is [%u]\n", process_id.pid);
- CDEBUG(D_INFO, "portals_nid is ["LPU64"]\n", process_id.nid);
+ CDEBUG(D_NET, "portals_pid is [%u]\n", process_id.pid);
+ CDEBUG(D_NET, "portals_nid is ["LPU64"]\n", process_id.nid);
CDEBUG(D_PORTALS, "calling lib_init\n");
if (lib_init(libnal, nal, process_id,
requested_limits, actual_limits) != PTL_OK) {
CERROR("lib_init failed\n");
- gmnal_stop_rxthread(nal_data);
- gmnal_stop_ctthread(nal_data);
- gmnal_free_txd(nal_data);
- gmnal_free_srxd(nal_data);
- spin_lock(&nal_data->gm_lock);
- gm_close(nal_data->gm_port);
+ gmnal_stop_rxthread(gmnalni);
+ gmnal_stop_ctthread(gmnalni);
+ gmnal_free_txd(gmnalni);
+ gmnal_free_srxd(gmnalni);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_close(gmnalni->gmni_port);
gm_finalize();
- spin_unlock(&nal_data->gm_lock);
- PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
* Now that we have initialised the portals library, start receive threads,
* we do this to avoid processing messages before we can parse them
*/
- gmnal_start_kernel_threads(nal_data);
+ gmnal_start_kernel_threads(gmnalni);
- while (nal_data->rxthread_flag != GMNAL_RXTHREADS_STARTED) {
+ while (gmnalni->gmni_rxthread_flag != GMNAL_RXTHREADS_STARTED) {
gmnal_yield(1);
- CDEBUG(D_INFO, "Waiting for receive thread signs of life\n");
+ CDEBUG(D_NET, "Waiting for receive thread signs of life\n");
}
- CDEBUG(D_INFO, "receive thread seems to have started\n");
+ CDEBUG(D_NET, "receive thread seems to have started\n");
if (libcfs_nal_cmd_register(GMNAL, &gmnal_cmd, libnal->libnal_data) != 0) {
- CDEBUG(D_INFO, "libcfs_nal_cmd_register failed\n");
+ CDEBUG(D_NET, "libcfs_nal_cmd_register failed\n");
/* XXX these cleanup cases should be restructured to
* minimise duplication... */
lib_fini(libnal);
- gmnal_stop_rxthread(nal_data);
- gmnal_stop_ctthread(nal_data);
- gmnal_free_txd(nal_data);
- gmnal_free_srxd(nal_data);
- spin_lock(&nal_data->gm_lock);
- gm_close(nal_data->gm_port);
+ gmnal_stop_rxthread(gmnalni);
+ gmnal_stop_ctthread(gmnalni);
+ gmnal_free_txd(gmnalni);
+ gmnal_free_srxd(gmnalni);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_close(gmnalni->gmni_port);
gm_finalize();
- spin_unlock(&nal_data->gm_lock);
- PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
- /* might be better to initialise this at module load rather than in
- * NAL startup */
- nal_data->sysctl = NULL;
- nal_data->sysctl = register_sysctl_table (gmnalnal_top_sysctl_table, 0);
-
- CDEBUG(D_INFO, "gmnal_init finished\n");
-
- global_nal_data = libnal->libnal_data;
+ CDEBUG(D_NET, "gmnal_init finished\n");
return(PTL_OK);
}
{
int rc;
- CDEBUG(D_INFO, "reset nal[%p]\n", &the_gm_nal);
+ CDEBUG(D_NET, "reset nal[%p]\n", &the_gm_nal);
the_gm_nal = (nal_t) {
.nal_ni_init = gmnal_api_startup,
PtlNIFini(kgmnal_ni);
ptl_unregister_nal(GMNAL);
- LASSERT(global_nal_data == NULL);
}
unsigned int niov, struct iovec *iov, size_t offset,
size_t mlen, size_t rlen)
{
- void *buffer = NULL;
gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
- int status = PTL_OK;
- size_t msglen = mlen;
- size_t nob;
+ size_t nobleft = mlen;
+ void *buffer = NULL;
+ size_t nob;
CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
"niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
libnal, private, cookie, niov, iov, offset, mlen, rlen);
- switch(srxd->type) {
- case(GMNAL_SMALL_MESSAGE):
- CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
- /* HP SFS 1380: Proactively change receives to avoid a receive
- * side occurrence of filling pkmap_count[].
- */
- buffer = srxd->buffer;
- buffer += sizeof(gmnal_msghdr_t);
- buffer += sizeof(ptl_hdr_t);
-
- while(niov--) {
- if (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- } else {
- nob = MIN (iov->iov_len - offset, msglen);
- CDEBUG(D_INFO, "processing iov [%p] base [%p] "
- "offset [%d] len ["LPSZ"] to [%p] left "
- "["LPSZ"]\n", iov, iov->iov_base,
- offset, nob, buffer, msglen);
- gm_bcopy(buffer, iov->iov_base + offset, nob);
- buffer += nob;
- msglen -= nob;
- offset = 0;
- }
- iov++;
- }
- status = gmnal_small_rx(libnal, private, cookie);
- break;
- case(GMNAL_LARGE_MESSAGE_INIT):
- CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
- status = gmnal_large_rx(libnal, private, cookie, niov,
- iov, offset, mlen, rlen);
- }
+ LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE);
+
+ buffer = srxd->rx_buffer;
+ buffer += sizeof(gmnal_msghdr_t);
+ buffer += sizeof(ptl_hdr_t);
+
+ while(nobleft > 0) {
+ LASSERT (niov > 0);
+
+ if (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ } else {
+ nob = MIN (iov->iov_len - offset, nobleft);
- CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
- return(status);
+ gm_bcopy(buffer, iov->iov_base + offset, nob);
+
+ buffer += nob;
+ nobleft -= nob;
+ offset = 0;
+ }
+ niov--;
+ iov++;
+ }
+
+ lib_finalize(libnal, private, cookie, PTL_OK);
+ return PTL_OK;
}
ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
- lib_msg_t *cookie, unsigned int kniov,
+ lib_msg_t *cookie, unsigned int nkiov,
ptl_kiov_t *kiov, size_t offset, size_t mlen,
size_t rlen)
{
gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
- int status = PTL_OK;
- char *ptr = NULL;
- void *buffer = NULL;
-
+ size_t nobleft = mlen;
+ size_t nob;
+ char *ptr;
+ void *buffer;
CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
- "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
- libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
-
- if (srxd->type == GMNAL_SMALL_MESSAGE) {
- size_t msglen = mlen;
- size_t nob;
-
- buffer = srxd->buffer;
- buffer += sizeof(gmnal_msghdr_t);
- buffer += sizeof(ptl_hdr_t);
-
- /*
- * map each page and create an iovec for it
- */
- while (kniov--) {
- /* HP SFS 1380: Proactively change receives to avoid a
- * receive side occurrence of filling pkmap_count[].
- */
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n",
- kniov, kiov);
-
- if (offset >= kiov->kiov_len) {
- offset -= kiov->kiov_len;
- } else {
- nob = MIN (kiov->kiov_len - offset, msglen);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] "
- "offset[%d]\n", kiov->kiov_page,
- kiov->kiov_len, kiov->kiov_offset);
- ptr = ((char *)kmap(kiov->kiov_page)) +
- kiov->kiov_offset;
-
- CDEBUG(D_INFO, "processing ptr [%p] offset [%d] "
- "len ["LPSZ"] from [%p] left ["LPSZ"]\n",
- ptr, offset, nob, buffer, msglen);
- gm_bcopy(buffer, ptr + offset, nob);
- kunmap(kiov->kiov_page);
- buffer += nob;
- msglen -= nob;
- offset = 0;
- }
- kiov++;
+ "cookie[%p], kniov[%d], kiov [%p], "
+ "offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
+ libnal, private, cookie, nkiov, kiov, offset, mlen, rlen);
+
+ LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE);
+
+ buffer = srxd->rx_buffer;
+ buffer += sizeof(gmnal_msghdr_t);
+ buffer += sizeof(ptl_hdr_t);
+
+ while (nobleft > 0) {
+ LASSERT (nkiov > 0);
+
+ if (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ } else {
+ nob = MIN (kiov->kiov_len - offset, nobleft);
+
+ ptr = ((char *)kmap(kiov->kiov_page)) +
+ kiov->kiov_offset;
+
+ gm_bcopy(buffer, ptr + offset, nob);
+
+ kunmap(kiov->kiov_page);
+
+ buffer += nob;
+ nobleft -= nob;
+ offset = 0;
}
- CDEBUG(D_INFO, "calling gmnal_small_rx\n");
- status = gmnal_small_rx(libnal, private, cookie);
+ kiov++;
+ nkiov--;
}
- CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
- return(status);
+ lib_finalize(libnal, private, cookie, PTL_OK);
+
+ return PTL_OK;
}
ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int niov, struct iovec *iov, size_t offset,
- size_t len)
+ unsigned int niov, struct iovec *iov,
+ size_t offset, size_t len)
{
- gmnal_data_t *nal_data;
+ gmnal_ni_t *gmnalni = libnal->libnal_data;
void *buffer = NULL;
gmnal_stxd_t *stxd = NULL;
-
-
- CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ
- "] nid["LPU64"]\n", niov, offset, len, nid);
- nal_data = libnal->libnal_data;
- if (!nal_data) {
- CERROR("no nal_data\n");
- return(PTL_FAIL);
- } else {
- CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
- }
-
- if (gmnal_is_small_msg(nal_data, niov, iov, len)) {
- size_t msglen = len;
- size_t nob;
-
- CDEBUG(D_INFO, "This is a small message send\n");
- /*
- * HP SFS 1380: With the change to gmnal_small_tx, need to get
- * the stxd and do relevant setup here
- */
- stxd = gmnal_get_stxd(nal_data, 1);
- CDEBUG(D_INFO, "stxd [%p]\n", stxd);
- /* Set the offset of the data to copy into the buffer */
- buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
- while(niov--) {
- if (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- } else {
- nob = MIN (iov->iov_len - offset, msglen);
- CDEBUG(D_INFO, "processing iov [%p] base [%p]"
- " offset [%d] len ["LPSZ"] to [%p] left"
- " ["LPSZ"]\n", iov, iov->iov_base,
- offset, nob, buffer, msglen);
- gm_bcopy(iov->iov_base + offset, buffer, nob);
- buffer += nob;
- msglen -= nob;
- offset = 0;
- }
- iov++;
- }
- gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
- stxd, len);
- } else {
- CERROR("Large message send is not supported\n");
- lib_finalize(libnal, private, cookie, PTL_FAIL);
- return(PTL_FAIL);
- gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
- niov, iov, offset, len);
- }
- return(PTL_OK);
+ size_t nobleft = len;
+ size_t nob;
+ ptl_err_t rc;
+
+ CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] "
+ "len["LPSZ"] nid["LPU64"]\n", niov, offset, len, nid);
+
+ if ((nid >> 32) != 0) {
+ CERROR("Illegal nid: "LPU64"\n", nid);
+ return PTL_FAIL;
+ }
+
+ stxd = gmnal_get_stxd(gmnalni, 1);
+ CDEBUG(D_NET, "stxd [%p]\n", stxd);
+
+ /* Set the offset of the data to copy into the buffer */
+ buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
+
+ while(nobleft > 0) {
+ LASSERT (niov > 0);
+
+ if (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ } else {
+ nob = MIN (iov->iov_len - offset, nobleft);
+
+ gm_bcopy(iov->iov_base + offset, buffer, nob);
+
+ buffer += nob;
+ nobleft -= nob;
+ offset = 0;
+ }
+ niov--;
+ iov++;
+ }
+
+ rc = gmnal_small_tx(libnal, private, cookie, hdr, type,
+ nid, stxd, len);
+ if (rc != PTL_OK)
+ gmnal_return_stxd(gmnalni, stxd);
+
+ return rc;
}
ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
- ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov,
+ ptl_nid_t nid, ptl_pid_t pid, unsigned int nkiov,
ptl_kiov_t *kiov, size_t offset, size_t len)
{
- gmnal_data_t *nal_data;
- char *ptr;
+ gmnal_ni_t *gmnalni = libnal->libnal_data;
void *buffer = NULL;
gmnal_stxd_t *stxd = NULL;
- ptl_err_t status = PTL_OK;
+ size_t nobleft = len;
+ char *ptr;
+ ptl_err_t rc;
+ size_t nob;
CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
- LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len);
- nal_data = libnal->libnal_data;
- if (!nal_data) {
- CERROR("no nal_data\n");
- return(PTL_FAIL);
- } else {
- CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
- }
+ LPSZ"] len["LPSZ"]\n", nid, nkiov, offset, len);
+
+ if ((nid >> 32) != 0) {
+ CERROR("Illegal nid: "LPU64"\n", nid);
+ return PTL_FAIL;
+ }
+
+ stxd = gmnal_get_stxd(gmnalni, 1);
+ CDEBUG(D_NET, "stxd [%p]\n", stxd);
- /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
- * more aggressively. This is the fix for a livelock situation under
- * load on ia32 that occurs when there are no more available entries in
- * the pkmap_count array. Just fill the buffer and let gmnal_small_tx
- * put the headers in after we pass it the stxd pointer.
- */
- stxd = gmnal_get_stxd(nal_data, 1);
- CDEBUG(D_INFO, "stxd [%p]\n", stxd);
/* Set the offset of the data to copy into the buffer */
- buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
-
- if (gmnal_is_small_msg(nal_data, 0, NULL, len)) {
- size_t msglen = len;
- size_t nob;
-
- CDEBUG(D_INFO, "This is a small message send\n");
-
- while(kniov--) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
- if (offset >= kiov->kiov_len) {
- offset -= kiov->kiov_len;
- } else {
- nob = MIN (kiov->kiov_len - offset, msglen);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
-
- ptr = ((char *)kmap(kiov->kiov_page)) +
- kiov->kiov_offset;
-
- CDEBUG(D_INFO, "processing ptr [%p] offset [%d]"
- " len ["LPSZ"] to [%p] left ["LPSZ"]\n",
- ptr, offset, nob, buffer, msglen);
- gm_bcopy(ptr + offset, buffer, nob);
- kunmap(kiov->kiov_page);
- buffer += nob;
- msglen -= nob;
- offset = 0;
- }
- kiov++;
- }
- status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
- pid, stxd, len);
- } else {
- int i = 0;
- struct iovec *iovec = NULL, *iovec_dup = NULL;
- ptl_kiov_t *kiov_dup = kiov;
-
- PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
- iovec_dup = iovec;
- CERROR("Large message send it is not supported yet\n");
- PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
- return(PTL_FAIL);
- for (i=0; i<kniov; i++) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
-
- iovec->iov_base = kmap(kiov->kiov_page)
- + kiov->kiov_offset;
- iovec->iov_len = kiov->kiov_len;
- iovec++;
- kiov++;
- }
- gmnal_large_tx(libnal, private, cookie, hdr, type, nid,
- pid, kniov, iovec, offset, len);
- for (i=0; i<kniov; i++) {
- kunmap(kiov_dup->kiov_page);
- kiov_dup++;
- }
- PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
- }
- return(status);
+ buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
+
+ while (nobleft > 0) {
+ LASSERT (nkiov > 0);
+
+ if (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ } else {
+ nob = MIN (kiov->kiov_len - offset, nobleft);
+
+ ptr = ((char *)kmap(kiov->kiov_page)) +
+ kiov->kiov_offset;
+
+ gm_bcopy(ptr + offset, buffer, nob);
+
+ kunmap(kiov->kiov_page);
+
+ buffer += nob;
+ nobleft -= nob;
+ offset = 0;
+ }
+ nkiov--;
+ kiov++;
+ }
+
+ rc = gmnal_small_tx(libnal, private, cookie, hdr, type,
+ nid, stxd, len);
+
+ if (rc != PTL_OK)
+ gmnal_return_stxd(gmnalni, stxd);
+
+ return rc;
}
int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
{
CDEBUG(D_TRACE, "gmnal_cb_dist\n");
- if (dist)
- *dist = 27;
- return(PTL_OK);
+
+ if (dist != NULL)
+ *dist = 1;
+
+ return PTL_OK;
}
int
gmnal_ct_thread(void *arg)
{
- gmnal_data_t *nal_data;
+ gmnal_ni_t *gmnalni;
gm_recv_event_t *rxevent = NULL;
gm_recv_t *recv = NULL;
if (!arg) {
- CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
+ CDEBUG(D_NET, "NO gmnalni. Exiting\n");
return(-1);
}
- nal_data = (gmnal_data_t*)arg;
- CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
+ gmnalni = (gmnal_ni_t*)arg;
+ CDEBUG(D_NET, "gmnalni is [%p]\n", arg);
sprintf(current->comm, "gmnal_ct");
kportal_daemonize("gmnalctd");
- nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
+ gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;
- spin_lock(&nal_data->gm_lock);
- while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
+ spin_lock(&gmnalni->gmni_gm_lock);
+ while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
CDEBUG(D_NET, "waiting\n");
- rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
- if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
- CDEBUG(D_INFO, "time to exit\n");
+ rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
+ if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
+ CDEBUG(D_NET, "time to exit\n");
break;
}
- CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
+ CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent(rxevent));
switch (GM_RECV_EVENT_TYPE(rxevent)) {
case(GM_RECV_EVENT):
CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
recv = (gm_recv_t*)&rxevent->recv;
- spin_unlock(&nal_data->gm_lock);
- gmnal_add_rxtwe(nal_data, recv);
- spin_lock(&nal_data->gm_lock);
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ gmnal_add_rxtwe(gmnalni, recv);
+ spin_lock(&gmnalni->gmni_gm_lock);
CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
break;
case(_GM_SLEEP_EVENT):
* Don't know what this is
*/
CDEBUG(D_NET, "Sleeping in gm_unknown\n");
- spin_unlock(&nal_data->gm_lock);
- gm_unknown(nal_data->gm_port, rxevent);
- spin_lock(&nal_data->gm_lock);
- CDEBUG(D_INFO, "Awake from gm_unknown\n");
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ gm_unknown(gmnalni->gmni_port, rxevent);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ CDEBUG(D_NET, "Awake from gm_unknown\n");
break;
default:
* FAST_RECV_EVENTS here.
*/
CDEBUG(D_NET, "Passing event to gm_unknown\n");
- spin_unlock(&nal_data->gm_lock);
- gm_unknown(nal_data->gm_port, rxevent);
- spin_lock(&nal_data->gm_lock);
- CDEBUG(D_INFO, "Processed unknown event\n");
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ gm_unknown(gmnalni->gmni_port, rxevent);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ CDEBUG(D_NET, "Processed unknown event\n");
}
}
- spin_unlock(&nal_data->gm_lock);
- nal_data->ctthread_flag = GMNAL_THREAD_RESET;
- CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
- return(GMNAL_STATUS_OK);
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
+ CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
+
+ return 0;
}
/*
* process a receive event
*/
-int gmnal_rx_thread(void *arg)
+int
+gmnal_rx_thread(void *arg)
{
char name[16];
- gmnal_data_t *nal_data;
+ gmnal_ni_t *gmnalni;
void *buffer;
gmnal_rxtwe_t *we = NULL;
int rank;
if (!arg) {
- CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
+ CDEBUG(D_NET, "NO gmnalni. Exiting\n");
return(-1);
}
- nal_data = (gmnal_data_t*)arg;
- CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
+ gmnalni = (gmnal_ni_t*)arg;
+ CDEBUG(D_NET, "gmnalni is [%p]\n", arg);
for (rank=0; rank<num_rx_threads; rank++)
- if (nal_data->rxthread_pid[rank] == current->pid)
+ if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
break;
snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
-
kportal_daemonize(name);
+
/*
* set 1 bit for each thread started
* doesn't matter which bit
*/
- spin_lock(&nal_data->rxthread_flag_lock);
- if (nal_data->rxthread_flag)
- nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
+ spin_lock(&gmnalni->gmni_rxthread_flag_lock);
+ if (gmnalni->gmni_rxthread_flag)
+ gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
else
- nal_data->rxthread_flag = 1;
- CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
- spin_unlock(&nal_data->rxthread_flag_lock);
+ gmnalni->gmni_rxthread_flag = 1;
+ CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
+ spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
- while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
+ while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
- we = gmnal_get_rxtwe(nal_data);
+ we = gmnal_get_rxtwe(gmnalni);
if (!we) {
- CDEBUG(D_INFO, "Receive thread time to exit\n");
+ CDEBUG(D_NET, "Receive thread time to exit\n");
break;
}
buffer = we->buffer;
- switch(((gmnal_msghdr_t*)buffer)->type) {
+ switch(((gmnal_msghdr_t*)buffer)->gmm_type) {
case(GMNAL_SMALL_MESSAGE):
- gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE);
- break;
- case(GMNAL_LARGE_MESSAGE_INIT):
- gmnal_pre_receive(nal_data,we,GMNAL_LARGE_MESSAGE_INIT);
- break;
- case(GMNAL_LARGE_MESSAGE_ACK):
- gmnal_pre_receive(nal_data, we,GMNAL_LARGE_MESSAGE_ACK);
+ gmnal_pre_receive(gmnalni, we, GMNAL_SMALL_MESSAGE);
break;
default:
+#warning better handling
CERROR("Unsupported message type\n");
- gmnal_rx_bad(nal_data, we, NULL);
+ gmnal_rx_bad(gmnalni, we);
}
PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
}
- spin_lock(&nal_data->rxthread_flag_lock);
- nal_data->rxthread_flag/=2;
- CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
- spin_unlock(&nal_data->rxthread_flag_lock);
- CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
- return(GMNAL_STATUS_OK);
+ spin_lock(&gmnalni->gmni_rxthread_flag_lock);
+ gmnalni->gmni_rxthread_flag/=2;
+ CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
+ spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
+ CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
+
+ return 0;
}
* which hands back to gmnal_small_receive
* Deal with all endian stuff here.
*/
-int
-gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
+void
+gmnal_pre_receive(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we, int gmnal_type)
{
gmnal_srxd_t *srxd = NULL;
void *buffer = NULL;
- unsigned int snode, sport, type, length;
gmnal_msghdr_t *gmnal_msghdr;
ptl_hdr_t *portals_hdr;
- int rc;
- CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
- nal_data, we, gmnal_type);
+ CDEBUG(D_NET, "gmnalni [%p], we[%p] type [%d]\n",
+ gmnalni, we, gmnal_type);
buffer = we->buffer;
- snode = we->snode;
- sport = we->sport;
- type = we->type;
- buffer = we->buffer;
- length = we->length;
gmnal_msghdr = (gmnal_msghdr_t*)buffer;
portals_hdr = (ptl_hdr_t*)(buffer+sizeof(gmnal_msghdr_t));
- CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
+ CDEBUG(D_NET, "rx_event:: Sender node [%d], Sender Port [%d], "
"type [%d], length [%d], buffer [%p]\n",
- snode, sport, type, length, buffer);
- CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
- "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id,
- gmnal_msghdr->magic, gmnal_msghdr->type);
- CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
+ we->snode, we->sport, we->type, we->length, buffer);
+ CDEBUG(D_NET, "gmnal_msghdr:: Sender node [%u], magic [%d], "
+ "gmnal_type [%d]\n", gmnal_msghdr->gmm_sender_gmid,
+ gmnal_msghdr->gmm_magic, gmnal_msghdr->gmm_type);
+ CDEBUG(D_NET, "portals_hdr:: Sender node ["LPD64"], "
"dest_node ["LPD64"]\n", portals_hdr->src_nid,
portals_hdr->dest_nid);
/*
* Get a receive descriptor for this message
*/
- srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
- CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
+ srxd = gmnal_rxbuffer_to_srxd(gmnalni, buffer);
+ CDEBUG(D_NET, "Back from gmnal_rxbuffer_to_srxd\n");
if (!srxd) {
CERROR("Failed to get receive descriptor\n");
- /* I think passing a NULL srxd to lib_parse will crash
- * gmnal_recv() */
LBUG();
- lib_parse(nal_data->libnal, portals_hdr, srxd);
- return(GMNAL_STATUS_FAIL);
}
- /*
- * no need to bother portals library with this
- */
- if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
- gmnal_large_tx_ack_received(nal_data, srxd);
- return(GMNAL_STATUS_OK);
- }
-
- srxd->nal_data = nal_data;
- srxd->type = gmnal_type;
- srxd->nsiov = gmnal_msghdr->niov;
- srxd->gm_source_node = gmnal_msghdr->sender_node_id;
+ srxd->rx_gmni = gmnalni;
+ srxd->rx_type = gmnal_type;
+ srxd->rx_nsiov = gmnal_msghdr->gmm_niov;
+ srxd->rx_sender_gmid = gmnal_msghdr->gmm_sender_gmid;
CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
buffer+sizeof(gmnal_msghdr_t));
- /*
- * control passes to lib, which calls cb_recv
- * cb_recv is responsible for returning the buffer
- * for future receive
- */
- rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
- if (rc != PTL_OK) {
- /* I just received garbage; return the srxd for use */
- CWARN("Returning srxd and discarding message, "
- "lib_parse didn't like it.\n");
- return(gmnal_rx_bad(nal_data, we, srxd));
- }
+ (void)lib_parse(gmnalni->gmni_libnal, portals_hdr, srxd);
+ /* Ignore error; we're connectionless */
- return(GMNAL_STATUS_OK);
+ gmnal_rx_requeue_buffer(gmnalni, srxd);
}
* hang out the receive buffer again.
* This implicitly returns a receive token.
*/
-int
-gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
+void
+gmnal_rx_requeue_buffer(gmnal_ni_t *gmnalni, gmnal_srxd_t *srxd)
{
- CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
-
- CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
+ CDEBUG(D_NET, "requeueing srxd[%p] gmnalni[%p]\n", srxd, gmnalni);
- spin_lock(&nal_data->gm_lock);
- gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
- srxd->gmsize, GM_LOW_PRIORITY, 0 );
- spin_unlock(&nal_data->gm_lock);
-
- return(GMNAL_STATUS_OK);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, srxd->rx_buffer,
+ srxd->rx_gmsize, GM_LOW_PRIORITY, 0 );
+ spin_unlock(&gmnalni->gmni_gm_lock);
}
* Handle a bad message
* A bad message is one we don't expect or can't interpret
*/
-int
-gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
+void
+gmnal_rx_bad(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we)
{
- CDEBUG(D_TRACE, "Can't handle message\n");
-
- if (!srxd)
- srxd = gmnal_rxbuffer_to_srxd(nal_data,
- we->buffer);
- if (srxd) {
- gmnal_rx_requeue_buffer(nal_data, srxd);
- } else {
+ gmnal_srxd_t *srxd = gmnal_rxbuffer_to_srxd(gmnalni,
+ we->buffer);
+ if (srxd == NULL) {
CERROR("Can't find a descriptor for this buffer\n");
- /*
- * get rid of it ?
- */
- return(GMNAL_STATUS_FAIL);
+ return;
}
- return(GMNAL_STATUS_OK);
+ gmnal_rx_requeue_buffer(gmnalni, srxd);
}
/*
- * Process a small message receive.
- * Get here from gmnal_receive_thread, gmnal_pre_receive
- * lib_parse, cb_recv
- * Put data from prewired receive buffer into users buffer(s)
- * Hang out the receive buffer again for another receive
- * Call lib_finalize
- */
-ptl_err_t
-gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
-{
- gmnal_srxd_t *srxd = NULL;
- gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data;
-
-
- if (!private) {
- CERROR("gmnal_small_rx no context\n");
- lib_finalize(libnal, private, cookie, PTL_FAIL);
- return(PTL_FAIL);
- }
-
- srxd = (gmnal_srxd_t*)private;
-
- /*
- * let portals library know receive is complete
- */
- CDEBUG(D_PORTALS, "calling lib_finalize\n");
- lib_finalize(libnal, private, cookie, PTL_OK);
- /*
- * return buffer so it can be used again
- */
- CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
- spin_lock(&nal_data->gm_lock);
- gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
- srxd->gmsize, GM_LOW_PRIORITY, 0);
- spin_unlock(&nal_data->gm_lock);
-
- return(PTL_OK);
-}
-
-
-/*
* Start a small transmit.
* Use the given send token (and wired transmit buffer).
* Copy headers to wired buffer and initiate gm_send from the wired buffer.
*/
ptl_err_t
gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
+ ptl_hdr_t *hdr, int type, ptl_nid_t nid,
gmnal_stxd_t *stxd, int size)
{
- gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data;
+ gmnal_ni_t *gmnalni = (gmnal_ni_t*)libnal->libnal_data;
void *buffer = NULL;
gmnal_msghdr_t *msghdr = NULL;
int tot_size = 0;
- unsigned int local_nid;
gm_status_t gm_status = GM_SUCCESS;
- CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
- "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] "
+ CDEBUG(D_NET, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
+ "hdr [%p] type [%d] nid ["LPU64"] stxd [%p] "
"size [%d]\n", libnal, private, cookie, hdr, type,
- global_nid, pid, stxd, size);
+ nid, stxd, size);
- CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
+ CDEBUG(D_NET, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
hdr->dest_nid, hdr->src_nid);
- if (!nal_data) {
- CERROR("no nal_data\n");
- return(PTL_FAIL);
- } else {
- CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
- }
+ LASSERT ((nid >> 32) == 0);
+ LASSERT (gmnalni != NULL);
+
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid,
+ &stxd->tx_gmlid);
+ spin_unlock(&gmnalni->gmni_gm_lock);
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
- &local_nid);
- spin_unlock(&nal_data->gm_lock);
if (gm_status != GM_SUCCESS) {
CERROR("Failed to obtain local id\n");
return(PTL_FAIL);
}
- CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
- stxd->type = GMNAL_SMALL_MESSAGE;
- stxd->cookie = cookie;
+ CDEBUG(D_NET, "Local Node_id is [%u][%x]\n",
+ stxd->tx_gmlid, stxd->tx_gmlid);
+
+ stxd->tx_nid = nid;
+ stxd->tx_cookie = cookie;
+ stxd->tx_type = GMNAL_SMALL_MESSAGE;
+ stxd->tx_gm_priority = GM_LOW_PRIORITY;
/*
* Copy gmnal_msg_hdr and portals header to the transmit buffer
* Then send the message, as the data has previously been copied in
* (HP SFS 1380).
*/
- buffer = stxd->buffer;
+ buffer = stxd->tx_buffer;
msghdr = (gmnal_msghdr_t*)buffer;
- msghdr->magic = GMNAL_MAGIC;
- msghdr->type = GMNAL_SMALL_MESSAGE;
- msghdr->sender_node_id = nal_data->gm_global_nid;
- CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
+ msghdr->gmm_magic = GMNAL_MAGIC;
+ msghdr->gmm_type = GMNAL_SMALL_MESSAGE;
+ msghdr->gmm_sender_gmid = gmnalni->gmni_global_gmid;
+ CDEBUG(D_NET, "processing msghdr at [%p]\n", buffer);
buffer += sizeof(gmnal_msghdr_t);
- CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
+ CDEBUG(D_NET, "processing portals hdr at [%p]\n", buffer);
gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
buffer += sizeof(ptl_hdr_t);
- CDEBUG(D_INFO, "sending\n");
+ CDEBUG(D_NET, "sending\n");
tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
- stxd->msg_size = tot_size;
-
+ stxd->tx_msg_size = tot_size;
CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
- "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
- "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
- stxd->msg_size, global_nid, local_nid, stxd);
-
- spin_lock(&nal_data->gm_lock);
- stxd->gm_priority = GM_LOW_PRIORITY;
- stxd->gm_target_node = local_nid;
- gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
- stxd->gm_size, stxd->msg_size,
- GM_LOW_PRIORITY, local_nid,
+ "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] "
+ "stxd [%p]\n", gmnalni->gmni_port, stxd->tx_buffer,
+ stxd->tx_gm_size, stxd->tx_msg_size, nid, stxd->tx_gmlid,
+ stxd);
+
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_send_to_peer_with_callback(gmnalni->gmni_port, stxd->tx_buffer,
+ stxd->tx_gm_size, stxd->tx_msg_size,
+ stxd->tx_gm_priority, stxd->tx_gmlid,
gmnal_small_tx_callback, (void*)stxd);
- spin_unlock(&nal_data->gm_lock);
- CDEBUG(D_INFO, "done\n");
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ CDEBUG(D_NET, "done\n");
return(PTL_OK);
}
gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
{
gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
- lib_msg_t *cookie = stxd->cookie;
- gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
- lib_nal_t *libnal = nal_data->libnal;
- unsigned gnid = 0;
- gm_status_t gm_status = 0;
+ lib_msg_t *cookie = stxd->tx_cookie;
+ gmnal_ni_t *gmnalni = stxd->tx_gmni;
+ lib_nal_t *libnal = gmnalni->gmni_libnal;
if (!stxd) {
- CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
+ CDEBUG(D_NET, "send completion event for unknown stxd\n");
return;
}
- if (status != GM_SUCCESS) {
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_node_id_to_global_id(nal_data->gm_port,
- stxd->gm_target_node,&gnid);
- spin_unlock(&nal_data->gm_lock);
- if (gm_status != GM_SUCCESS) {
- CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
- gm_status);
- gnid = 0;
- }
- CERROR("Result of send stxd [%p] is [%s] to [%u]\n",
- stxd, gmnal_gm_error(status), gnid);
- }
+ if (status != GM_SUCCESS)
+ CERROR("Result of send stxd [%p] is [%s] to ["LPU64"]\n",
+ stxd, gmnal_gm_error(status), stxd->tx_nid);
switch(status) {
case(GM_SUCCESS):
* do a resend on the dropped ones
*/
CERROR("send stxd [%p] dropped, resending\n", context);
- spin_lock(&nal_data->gm_lock);
- gm_send_to_peer_with_callback(nal_data->gm_port,
- stxd->buffer,
- stxd->gm_size,
- stxd->msg_size,
- stxd->gm_priority,
- stxd->gm_target_node,
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_send_to_peer_with_callback(gmnalni->gmni_port,
+ stxd->tx_buffer,
+ stxd->tx_gm_size,
+ stxd->tx_msg_size,
+ stxd->tx_gm_priority,
+ stxd->tx_gmlid,
gmnal_small_tx_callback,
context);
- spin_unlock(&nal_data->gm_lock);
+ spin_unlock(&gmnalni->gmni_gm_lock);
return;
case(GM_TIMED_OUT):
case(GM_SEND_TIMED_OUT):
/*
* drop these ones
*/
- CDEBUG(D_INFO, "calling gm_drop_sends\n");
- spin_lock(&nal_data->gm_lock);
- gm_drop_sends(nal_data->gm_port, stxd->gm_priority,
- stxd->gm_target_node, gm_port_id,
+ CDEBUG(D_NET, "calling gm_drop_sends\n");
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_drop_sends(gmnalni->gmni_port, stxd->tx_gm_priority,
+ stxd->tx_gmlid, gm_port_id,
gmnal_drop_sends_callback, context);
- spin_unlock(&nal_data->gm_lock);
+ spin_unlock(&gmnalni->gmni_gm_lock);
return;
case(GM_FIRMWARE_NOT_RUNNING):
case(GM_YP_NO_MATCH):
default:
- gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
- stxd->gm_target_node, gm_port_id,
- gmnal_resume_sending_callback, context);
+ gm_resume_sending(gmnalni->gmni_port, stxd->tx_gm_priority,
+ stxd->tx_gmlid, gm_port_id,
+ gmnal_resume_sending_callback, context);
return;
}
- /*
- * TO DO
- * If this is a large message init,
- * we're not finished with the data yet,
- * so can't call lib_finalise.
- * However, we're also holding on to a
- * stxd here (to keep track of the source
- * iovec only). Should use another structure
- * to keep track of iovec and return stxd to
- * free list earlier.
- */
- if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
- CDEBUG(D_INFO, "large transmit done\n");
- return;
- }
- gmnal_return_stxd(nal_data, stxd);
+ gmnal_return_stxd(gmnalni, stxd);
lib_finalize(libnal, stxd, cookie, PTL_OK);
return;
}
gm_status_t status)
{
gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
- gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
- CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
- gmnal_return_stxd(stxd->nal_data, stxd);
- lib_finalize(nal_data->libnal, stxd, stxd->cookie, PTL_FAIL);
+ gmnal_ni_t *gmnalni = stxd->tx_gmni;
+
+ CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context);
+ gmnal_return_stxd(gmnalni, stxd);
+ lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL);
return;
}
gm_status_t status)
{
gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
- gmnal_data_t *nal_data = stxd->nal_data;
+ gmnal_ni_t *gmnalni = stxd->tx_gmni;
- CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
+ CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context);
if (status == GM_SUCCESS) {
- spin_lock(&nal_data->gm_lock);
- gm_send_to_peer_with_callback(gm_port, stxd->buffer,
- stxd->gm_size, stxd->msg_size,
- stxd->gm_priority,
- stxd->gm_target_node,
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_send_to_peer_with_callback(gm_port, stxd->tx_buffer,
+ stxd->tx_gm_size,
+ stxd->tx_msg_size,
+ stxd->tx_gm_priority,
+ stxd->tx_gmlid,
gmnal_small_tx_callback,
context);
- spin_unlock(&nal_data->gm_lock);
+ spin_unlock(&gmnalni->gmni_gm_lock);
} else {
CERROR("send_to_peer status for stxd [%p] is "
"[%d][%s]\n", stxd, status, gmnal_gm_error(status));
/* Recycle the stxd */
- gmnal_return_stxd(nal_data, stxd);
- lib_finalize(nal_data->libnal, stxd, stxd->cookie, PTL_FAIL);
- }
-
- return;
-}
-
-
-/*
- * Begine a large transmit.
- * Do a gm_register of the memory pointed to by the iovec
- * and send details to the receiver. The receiver does a gm_get
- * to pull the data and sends and ack when finished. Upon receipt of
- * this ack, deregister the memory. Only 1 send token is required here.
- */
-int
-gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
- unsigned int niov, struct iovec *iov, size_t offset, int size)
-{
-
- gmnal_data_t *nal_data;
- gmnal_stxd_t *stxd = NULL;
- void *buffer = NULL;
- gmnal_msghdr_t *msghdr = NULL;
- unsigned int local_nid;
- int mlen = 0; /* the size of the init message data */
- struct iovec *iov_dup = NULL;
- gm_status_t gm_status;
- int niov_dup;
-
-
- CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
- "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
- "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type,
- global_nid, pid, niov, iov, size);
-
- if (libnal)
- nal_data = (gmnal_data_t*)libnal->libnal_data;
- else {
- CERROR("no libnal.\n");
- return(GMNAL_STATUS_FAIL);
- }
-
-
- /*
- * Get stxd and buffer. Put local address of data in buffer,
- * send local addresses to target,
- * wait for the target node to suck the data over.
- * The stxd is used to ren
- */
- stxd = gmnal_get_stxd(nal_data, 1);
- CDEBUG(D_INFO, "stxd [%p]\n", stxd);
-
- stxd->type = GMNAL_LARGE_MESSAGE_INIT;
- stxd->cookie = cookie;
-
- /*
- * Copy gmnal_msg_hdr and portals header to the transmit buffer
- * Then copy the iov in
- */
- buffer = stxd->buffer;
- msghdr = (gmnal_msghdr_t*)buffer;
-
- CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
-
- msghdr->magic = GMNAL_MAGIC;
- msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
- msghdr->sender_node_id = nal_data->gm_global_nid;
- msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd;
- msghdr->niov = niov ;
- buffer += sizeof(gmnal_msghdr_t);
- mlen = sizeof(gmnal_msghdr_t);
- CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
-
- CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
-
- gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
- buffer += sizeof(ptl_hdr_t);
- mlen += sizeof(ptl_hdr_t);
- CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
- while (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- niov--;
- iov++;
- }
-
- LASSERT(offset >= 0);
- /*
- * Store the iovs in the stxd for we can get
- * them later if we need them
- */
- stxd->iov[0].iov_base = iov->iov_base + offset;
- stxd->iov[0].iov_len = iov->iov_len - offset;
- CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
- if (niov > 1)
- gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
- stxd->niov = niov;
-
- /*
- * copy the iov to the buffer so target knows
- * where to get the data from
- */
- CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
- gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
- mlen += stxd->niov*(sizeof(struct iovec));
- CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
- /*
- * register the memory so the NIC can get hold of the data
- * This is a slow process. it'd be good to overlap it
- * with something else.
- */
- iov = stxd->iov;
- iov_dup = iov;
- niov_dup = niov;
- while(niov--) {
- CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
- iov->iov_base, iov->iov_len);
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_register_memory(nal_data->gm_port,
- iov->iov_base, iov->iov_len);
- if (gm_status != GM_SUCCESS) {
- spin_unlock(&nal_data->gm_lock);
- CERROR("gm_register_memory returns [%d][%s] "
- "for memory [%p] len ["LPSZ"]\n",
- gm_status, gmnal_gm_error(gm_status),
- iov->iov_base, iov->iov_len);
- spin_lock(&nal_data->gm_lock);
- while (iov_dup != iov) {
- gm_deregister_memory(nal_data->gm_port,
- iov_dup->iov_base,
- iov_dup->iov_len);
- iov_dup++;
- }
- spin_unlock(&nal_data->gm_lock);
- gmnal_return_stxd(nal_data, stxd);
- return(PTL_FAIL);
- }
-
- spin_unlock(&nal_data->gm_lock);
- iov++;
- }
-
- /*
- * Send the init message to the target
- */
- CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
- &local_nid);
- if (gm_status != GM_SUCCESS) {
- spin_unlock(&nal_data->gm_lock);
- CERROR("Failed to obtain local id\n");
- gmnal_return_stxd(nal_data, stxd);
- /* TO DO deregister memory on failure */
- return(GMNAL_STATUS_FAIL);
- }
- CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
- gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
- stxd->gm_size, mlen, GM_LOW_PRIORITY,
- local_nid, gmnal_large_tx_callback,
- (void*)stxd);
- spin_unlock(&nal_data->gm_lock);
-
- CDEBUG(D_INFO, "done\n");
-
- return(PTL_OK);
-}
-
-/*
- * Callback function indicates that send of buffer with
- * large message iovec has completed (or failed).
- */
-void
-gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
-{
- gmnal_small_tx_callback(gm_port, context, status);
-
-}
-
-
-
-/*
- * Have received a buffer that contains an iovec of the sender.
- * Do a gm_register_memory of the receivers buffer and then do a get
- * data from the sender.
- */
-int
-gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- unsigned int nriov, struct iovec *riov, size_t offset,
- size_t mlen, size_t rlen)
-{
- gmnal_data_t *nal_data = libnal->libnal_data;
- gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
- void *buffer = NULL;
- struct iovec *riov_dup;
- int nriov_dup;
- gmnal_msghdr_t *msghdr = NULL;
- gm_status_t gm_status;
-
- CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
- "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
- libnal, private, cookie, nriov, riov, mlen, rlen);
-
- if (!srxd) {
- CERROR("gmnal_large_rx no context\n");
- lib_finalize(libnal, private, cookie, PTL_FAIL);
- return(PTL_FAIL);
- }
-
- buffer = srxd->buffer;
- msghdr = (gmnal_msghdr_t*)buffer;
- buffer += sizeof(gmnal_msghdr_t);
- buffer += sizeof(ptl_hdr_t);
-
- /*
- * Store the senders stxd address in the srxd for this message
- * The gmnal_large_message_ack needs it to notify the sender
- * the pull of data is complete
- */
- srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
-
- /*
- * Register the receivers memory
- * get the data,
- * tell the sender that we got the data
- * then tell the receiver we got the data
- * TO DO
- * If the iovecs match, could interleave
- * gm_registers and gm_gets for each element
- */
- while (offset >= riov->iov_len) {
- offset -= riov->iov_len;
- riov++;
- nriov--;
- }
- LASSERT (nriov >= 0);
- LASSERT (offset >= 0);
- /*
- * do this so the final gm_get callback can deregister the memory
- */
- PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
-
- srxd->riov[0].iov_base = riov->iov_base + offset;
- srxd->riov[0].iov_len = riov->iov_len - offset;
- if (nriov > 1)
- gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
- srxd->nriov = nriov;
-
- riov = srxd->riov;
- nriov_dup = nriov;
- riov_dup = riov;
- while(nriov--) {
- CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
- riov->iov_base, riov->iov_len);
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_register_memory(nal_data->gm_port,
- riov->iov_base, riov->iov_len);
- if (gm_status != GM_SUCCESS) {
- spin_unlock(&nal_data->gm_lock);
- CERROR("gm_register_memory returns [%d][%s] "
- "for memory [%p] len ["LPSZ"]\n",
- gm_status, gmnal_gm_error(gm_status),
- riov->iov_base, riov->iov_len);
- spin_lock(&nal_data->gm_lock);
- while (riov_dup != riov) {
- gm_deregister_memory(nal_data->gm_port,
- riov_dup->iov_base,
- riov_dup->iov_len);
- riov_dup++;
- }
- spin_lock(&nal_data->gm_lock);
- /*
- * give back srxd and buffer. Send NACK to sender
- */
- PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
- return(PTL_FAIL);
- }
- spin_unlock(&nal_data->gm_lock);
- riov++;
+ gmnal_return_stxd(gmnalni, stxd);
+ lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL);
}
- /*
- * now do gm_get to get the data
- */
- srxd->cookie = cookie;
- if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer,
- nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
- CERROR("can't get the data");
- }
-
- CDEBUG(D_INFO, "lgmanl_large_rx done\n");
-
- return(PTL_OK);
-}
-
-
-/*
- * Perform a number of remote gets as part of receiving
- * a large message.
- * The final one to complete (i.e. the last callback to get called)
- * tidies up.
- * gm_get requires a send token.
- */
-int
-gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov,
- int nriov, struct iovec *riov)
-{
-
- int ncalls = 0;
-
- CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
- "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
-
-
- ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
- if (ncalls < 0) {
- CERROR("there's something wrong with the iovecs\n");
- return(GMNAL_STATUS_FAIL);
- }
- CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
- spin_lock_init(&srxd->callback_lock);
- srxd->ncallbacks = ncalls;
- srxd->callback_status = 0;
-
- ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
- if (ncalls < 0) {
- CERROR("there's something wrong with the iovecs\n");
- return(GMNAL_STATUS_FAIL);
- }
-
- return(GMNAL_STATUS_OK);
-
-}
-
-
-/*
- * pull data from source node (source iovec) to a local iovec.
- * The iovecs may not match which adds the complications below.
- * Count the number of gm_gets that will be required so the callbacks
- * can determine who is the last one.
- */
-int
-gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
- struct iovec *siov, int nriov, struct iovec *riov)
-{
-
- int ncalls = 0;
- int slen = siov->iov_len, rlen = riov->iov_len;
- char *sbuf = siov->iov_base, *rbuf = riov->iov_base;
- unsigned long sbuf_long;
- gm_remote_ptr_t remote_ptr = 0;
- unsigned int source_node;
- gmnal_ltxd_t *ltxd = NULL;
- gmnal_data_t *nal_data = srxd->nal_data;
-
- CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
- if (do_copy) {
- if (!nal_data) {
- CERROR("Bad args No nal_data\n");
- return(GMNAL_STATUS_FAIL);
- }
- spin_lock(&nal_data->gm_lock);
- if (gm_global_id_to_node_id(nal_data->gm_port,
- srxd->gm_source_node,
- &source_node) != GM_SUCCESS) {
-
- CERROR("cannot resolve global_id [%u] "
- "to local node_id\n", srxd->gm_source_node);
- spin_unlock(&nal_data->gm_lock);
- return(GMNAL_STATUS_FAIL);
- }
- spin_unlock(&nal_data->gm_lock);
- /*
- * We need a send token to use gm_get
- * getting an stxd gets us a send token.
- * the stxd is used as the context to the
- * callback function (so stxd can be returned).
- * Set pointer in stxd to srxd so callback count in srxd
- * can be decremented to find last callback to complete
- */
- CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n",
- srxd->gm_source_node, source_node);
- }
-
- do {
- CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
- sbuf, slen, rbuf, rlen);
- if (slen > rlen) {
- ncalls++;
- if (do_copy) {
- CDEBUG(D_INFO, "slen>rlen\n");
- ltxd = gmnal_get_ltxd(nal_data);
- ltxd->srxd = srxd;
- spin_lock(&nal_data->gm_lock);
- /*
- * funny business to get rid
- * of compiler warning
- */
- sbuf_long = (unsigned long) sbuf;
- remote_ptr = (gm_remote_ptr_t)sbuf_long;
- gm_get(nal_data->gm_port, remote_ptr, rbuf,
- rlen, GM_LOW_PRIORITY, source_node,
- gm_port_id,
- gmnal_remote_get_callback, ltxd);
- spin_unlock(&nal_data->gm_lock);
- }
- /*
- * at the end of 1 iov element
- */
- sbuf+=rlen;
- slen-=rlen;
- riov++;
- nriov--;
- rbuf = riov->iov_base;
- rlen = riov->iov_len;
- } else if (rlen > slen) {
- ncalls++;
- if (do_copy) {
- CDEBUG(D_INFO, "slen<rlen\n");
- ltxd = gmnal_get_ltxd(nal_data);
- ltxd->srxd = srxd;
- spin_lock(&nal_data->gm_lock);
- sbuf_long = (unsigned long) sbuf;
- remote_ptr = (gm_remote_ptr_t)sbuf_long;
- gm_get(nal_data->gm_port, remote_ptr, rbuf,
- slen, GM_LOW_PRIORITY, source_node,
- gm_port_id,
- gmnal_remote_get_callback, ltxd);
- spin_unlock(&nal_data->gm_lock);
- }
- /*
- * at end of siov element
- */
- rbuf+=slen;
- rlen-=slen;
- siov++;
- sbuf = siov->iov_base;
- slen = siov->iov_len;
- } else {
- ncalls++;
- if (do_copy) {
- CDEBUG(D_INFO, "rlen=slen\n");
- ltxd = gmnal_get_ltxd(nal_data);
- ltxd->srxd = srxd;
- spin_lock(&nal_data->gm_lock);
- sbuf_long = (unsigned long) sbuf;
- remote_ptr = (gm_remote_ptr_t)sbuf_long;
- gm_get(nal_data->gm_port, remote_ptr, rbuf,
- rlen, GM_LOW_PRIORITY, source_node,
- gm_port_id,
- gmnal_remote_get_callback, ltxd);
- spin_unlock(&nal_data->gm_lock);
- }
- /*
- * at end of siov and riov element
- */
- siov++;
- sbuf = siov->iov_base;
- slen = siov->iov_len;
- riov++;
- nriov--;
- rbuf = riov->iov_base;
- rlen = riov->iov_len;
- }
-
- } while (nriov);
- return(ncalls);
-}
-
-
-/*
- * The callback function that is invoked after each gm_get call completes.
- * Multiple callbacks may be invoked for 1 transaction, only the final
- * callback has work to do.
- */
-void
-gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
- gm_status_t status)
-{
-
- gmnal_ltxd_t *ltxd = (gmnal_ltxd_t*)context;
- gmnal_srxd_t *srxd = ltxd->srxd;
- lib_nal_t *libnal = srxd->nal_data->libnal;
- int lastone;
- struct iovec *riov;
- int nriov;
- gmnal_data_t *nal_data;
-
- CDEBUG(D_TRACE, "called for context [%p]\n", context);
-
- if (status != GM_SUCCESS) {
- CERROR("reports error [%d/%s]\n",status,gmnal_gm_error(status));
- }
-
- spin_lock(&srxd->callback_lock);
- srxd->ncallbacks--;
- srxd->callback_status |= status;
- lastone = srxd->ncallbacks?0:1;
- spin_unlock(&srxd->callback_lock);
- nal_data = srxd->nal_data;
-
- /*
- * everyone returns a send token
- */
- gmnal_return_ltxd(nal_data, ltxd);
-
- if (!lastone) {
- CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
- return;
- }
-
- /*
- * Let our client application proceed
- */
- CERROR("final callback context[%p]\n", srxd);
- lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
-
- /*
- * send an ack to the sender to let him know we got the data
- */
- gmnal_large_tx_ack(nal_data, srxd);
-
- /*
- * Unregister the memory that was used
- * This is a very slow business (slower then register)
- */
- nriov = srxd->nriov;
- riov = srxd->riov;
- spin_lock(&nal_data->gm_lock);
- while (nriov--) {
- CERROR("deregister memory [%p]\n", riov->iov_base);
- if (gm_deregister_memory(srxd->nal_data->gm_port,
- riov->iov_base, riov->iov_len)) {
- CERROR("failed to deregister memory [%p]\n",
- riov->iov_base);
- }
- riov++;
- }
- spin_unlock(&nal_data->gm_lock);
- PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
-
- /*
- * repost the receive buffer (return receive token)
- */
- spin_lock(&nal_data->gm_lock);
- gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
- srxd->gmsize, GM_LOW_PRIORITY, 0);
- spin_unlock(&nal_data->gm_lock);
-
return;
}
-/*
- * Called on target node.
- * After pulling data from a source node
- * send an ack message to indicate the large transmit is complete.
- */
-void
-gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
-{
-
- gmnal_stxd_t *stxd;
- gmnal_msghdr_t *msghdr;
- void *buffer = NULL;
- unsigned int local_nid;
- gm_status_t gm_status = GM_SUCCESS;
-
- CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd,
- srxd->gm_source_node);
-
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_global_id_to_node_id(nal_data->gm_port,
- srxd->gm_source_node, &local_nid);
- spin_unlock(&nal_data->gm_lock);
- if (gm_status != GM_SUCCESS) {
- CERROR("Failed to obtain local id\n");
- return;
- }
- CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
-
- stxd = gmnal_get_stxd(nal_data, 1);
- CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
-
- stxd->nal_data = nal_data;
- stxd->type = GMNAL_LARGE_MESSAGE_ACK;
-
- /*
- * Copy gmnal_msg_hdr and portals header to the transmit buffer
- * Then copy the data in
- */
- buffer = stxd->buffer;
- msghdr = (gmnal_msghdr_t*)buffer;
-
- /*
- * Add in the address of the original stxd from the sender node
- * so it knows which thread to notify.
- */
- msghdr->magic = GMNAL_MAGIC;
- msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
- msghdr->sender_node_id = nal_data->gm_global_nid;
- msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd;
- CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
-
- CDEBUG(D_INFO, "sending\n");
- stxd->msg_size= sizeof(gmnal_msghdr_t);
-
-
- CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
- "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
- "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
- stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
- spin_lock(&nal_data->gm_lock);
- stxd->gm_priority = GM_LOW_PRIORITY;
- stxd->gm_target_node = local_nid;
- gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
- stxd->gm_size, stxd->msg_size,
- GM_LOW_PRIORITY, local_nid,
- gmnal_large_tx_ack_callback,
- (void*)stxd);
-
- spin_unlock(&nal_data->gm_lock);
- CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
-
- return;
-}
-
-
-/*
- * A callback to indicate the small transmit operation is compete
- * Check for errors and try to deal with them.
- * Call lib_finalise to inform the client application that the
- * send is complete and the memory can be reused.
- * Return the stxd when finished with it (returns a send token)
- */
-void
-gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context,
- gm_status_t status)
-{
- gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
- gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
-
- if (!stxd) {
- CERROR("send completion event for unknown stxd\n");
- return;
- }
- CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
- stxd, status);
- gmnal_return_stxd(stxd->nal_data, stxd);
-
- spin_unlock(&nal_data->gm_lock);
- return;
-}
-
-/*
- * Indicates the large transmit operation is compete.
- * Called on transmit side (means data has been pulled by receiver
- * or failed).
- * Call lib_finalise to inform the client application that the send
- * is complete, deregister the memory and return the stxd.
- * Finally, report the rx buffer that the ack message was delivered in.
- */
-void
-gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
-{
- lib_nal_t *libnal = nal_data->libnal;
- gmnal_stxd_t *stxd = NULL;
- gmnal_msghdr_t *msghdr = NULL;
- void *buffer = NULL;
- struct iovec *iov;
-
-
- CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
-
- buffer = srxd->buffer;
- msghdr = (gmnal_msghdr_t*)buffer;
- stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
-
- CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
-
- lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
-
- /*
- * extract the iovec from the stxd, deregister the memory.
- * free the space used to store the iovec
- */
- iov = stxd->iov;
- while(stxd->niov--) {
- CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
- iov->iov_base, iov->iov_len);
- spin_lock(&nal_data->gm_lock);
- gm_deregister_memory(nal_data->gm_port, iov->iov_base,
- iov->iov_len);
- spin_unlock(&nal_data->gm_lock);
- iov++;
- }
-
- /*
- * return the send token
- * TO DO It is bad to hold onto the send token so long?
- */
- gmnal_return_stxd(nal_data, stxd);
-
-
- /*
- * requeue the receive buffer
- */
- gmnal_rx_requeue_buffer(nal_data, srxd);
-
-
- return;
-}
#include "gmnal.h"
-int gmnal_small_msg_size = sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t) + PTL_MTU + 928;
/*
* -1 indicates default value.
* This is 1 thread per cpu
int
gmnal_cmd(struct portals_cfg *pcfg, void *private)
{
- gmnal_data_t *nal_data = NULL;
+ gmnal_ni_t *gmnalni = NULL;
char *name = NULL;
int nid = -2;
- int gnid;
+ int gmid;
gm_status_t gm_status;
CDEBUG(D_TRACE, "gmnal_cmd [%d] private [%p]\n",
pcfg->pcfg_command, private);
- nal_data = (gmnal_data_t*)private;
+ gmnalni = (gmnal_ni_t*)private;
switch(pcfg->pcfg_command) {
/*
* just reuse already defined GET_NID. Should define GMNAL version
PORTAL_ALLOC(name, pcfg->pcfg_plen1);
copy_from_user(name, PCFG_PBUF(pcfg, 1), pcfg->pcfg_plen1);
- spin_lock(&nal_data->gm_lock);
- //nid = gm_host_name_to_node_id(nal_data->gm_port, name);
- gm_status = gm_host_name_to_node_id_ex(nal_data->gm_port, 0,
+ spin_lock(&gmnalni->gmni_gm_lock);
+ //nid = gm_host_name_to_node_id(gmnalni->gmni_port, name);
+ gm_status = gm_host_name_to_node_id_ex(gmnalni->gmni_port, 0,
name, &nid);
- spin_unlock(&nal_data->gm_lock);
+ spin_unlock(&gmnalni->gmni_gm_lock);
if (gm_status != GM_SUCCESS) {
- CDEBUG(D_INFO, "gm_host_name_to_node_id_ex(...host %s) "
+ CDEBUG(D_NET, "gm_host_name_to_node_id_ex(...host %s) "
"failed[%d]\n", name, gm_status);
return (-1);
} else
- CDEBUG(D_INFO, "Local node %s id is [%d]\n", name, nid);
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_node_id_to_global_id(nal_data->gm_port,
- nid, &gnid);
- spin_unlock(&nal_data->gm_lock);
+ CDEBUG(D_NET, "Local node %s id is [%d]\n", name, nid);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_status = gm_node_id_to_global_id(gmnalni->gmni_port,
+ nid, &gmid);
+ spin_unlock(&gmnalni->gmni_gm_lock);
if (gm_status != GM_SUCCESS) {
- CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
+ CDEBUG(D_NET, "gm_node_id_to_global_id failed[%d]\n",
gm_status);
return(-1);
}
- CDEBUG(D_INFO, "Global node is is [%u][%x]\n", gnid, gnid);
- copy_to_user(PCFG_PBUF(pcfg, 2), &gnid, pcfg->pcfg_plen2);
+ CDEBUG(D_NET, "Global node is is [%u][%x]\n", gmid, gmid);
+ copy_to_user(PCFG_PBUF(pcfg, 2), &gmid, pcfg->pcfg_plen2);
break;
default:
- CDEBUG(D_INFO, "gmnal_cmd UNKNOWN[%d]\n", pcfg->pcfg_command);
+ CDEBUG(D_NET, "gmnal_cmd UNKNOWN[%d]\n", pcfg->pcfg_command);
pcfg->pcfg_nid2 = -1;
}
CDEBUG(D_TRACE, "This is the gmnal module initialisation routine\n");
- CDEBUG(D_INFO, "Calling gmnal_init\n");
+ CDEBUG(D_NET, "Calling gmnal_init\n");
status = gmnal_init();
if (status == PTL_OK) {
- CDEBUG(D_INFO, "Portals GMNAL initialised ok\n");
+ CDEBUG(D_NET, "Portals GMNAL initialised ok\n");
} else {
- CDEBUG(D_INFO, "Portals GMNAL Failed to initialise\n");
+ CDEBUG(D_NET, "Portals GMNAL Failed to initialise\n");
return(-ENODEV);
}
- CDEBUG(D_INFO, "This is the end of the gmnal init routine");
+ CDEBUG(D_NET, "This is the end of the gmnal init routine");
return(0);
}
module_exit(gmnal_unload);
-MODULE_PARM(gmnal_small_msg_size, "i");
MODULE_PARM(num_rx_threads, "i");
MODULE_PARM(num_stxds, "i");
MODULE_PARM(gm_port_id, "i");
* Am I one of the gmnal rxthreads ?
*/
int
-gmnal_is_rxthread(gmnal_data_t *nal_data)
+gmnal_is_rxthread(gmnal_ni_t *gmnalni)
{
int i;
for (i=0; i<num_rx_threads; i++) {
- if (nal_data->rxthread_pid[i] == current->pid)
+ if (gmnalni->gmni_rxthread_pid[i] == current->pid)
return(1);
}
return(0);
* used to do gm_gets in gmnal_copyiov
*/
int
-gmnal_alloc_txd(gmnal_data_t *nal_data)
+gmnal_alloc_txd(gmnal_ni_t *gmnalni)
{
- int ntx= 0, nstx= 0, nrxt_stx= 0,
- nltx= 0, i = 0;
- gmnal_stxd_t *txd = NULL;
- gmnal_ltxd_t *ltxd = NULL;
- void *txbuffer = NULL;
+ int ntx;
+ int nstx;
+ int nrxt_stx;
+ int i;
+ gmnal_stxd_t *txd;
+ void *txbuffer;
CDEBUG(D_TRACE, "gmnal_alloc_small tx\n");
- spin_lock(&nal_data->gm_lock);
+ spin_lock(&gmnalni->gmni_gm_lock);
/*
* total number of transmit tokens
*/
- ntx = gm_num_send_tokens(nal_data->gm_port);
- spin_unlock(&nal_data->gm_lock);
- CDEBUG(D_INFO, "total number of send tokens available is [%d]\n", ntx);
+ ntx = gm_num_send_tokens(gmnalni->gmni_port);
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ CDEBUG(D_NET, "total number of send tokens available is [%d]\n", ntx);
/*
* allocate a number for small sends
*/
nstx = num_stxds;
/*
- * give that number plus 1 to the receive threads
+ * give the rest to the receive threads
*/
- nrxt_stx = nstx + 1;
-
- /*
- * give the rest for gm_gets
- */
- nltx = ntx - (nrxt_stx + nstx);
- if (nltx < 1) {
- CERROR("No tokens available for large messages\n");
- return(GMNAL_STATUS_FAIL);
- }
+ nrxt_stx = num_stxds + 1;
-
- /*
- * A semaphore is initialised with the
- * number of transmit tokens available.
- * To get a stxd, acquire the token semaphore.
- * this decrements the available token count
- * (if no tokens you block here, someone returning a
- * stxd will release the semaphore and wake you)
- * When token is obtained acquire the spinlock
- * to manipulate the list
- */
- sema_init(&nal_data->stxd_token, nstx);
- spin_lock_init(&nal_data->stxd_lock);
- sema_init(&nal_data->rxt_stxd_token, nrxt_stx);
- spin_lock_init(&nal_data->rxt_stxd_lock);
- sema_init(&nal_data->ltxd_token, nltx);
- spin_lock_init(&nal_data->ltxd_lock);
+ if (nstx + nrxt_stx > ntx) {
+ CERROR ("Asked for %d + %d tx credits, but only %d available\n",
+ nstx, nrxt_stx, ntx);
+ return -ENOMEM;
+ }
+
+ /* A semaphore is initialised with the number of transmit tokens
+ * available. To get a stxd, acquire the token semaphore. this
+ * decrements the available token count (if no tokens you block here,
+ * someone returning a stxd will release the semaphore and wake you)
+ * When token is obtained acquire the spinlock to manipulate the
+ * list */
+ sema_init(&gmnalni->gmni_stxd_token, nstx);
+ spin_lock_init(&gmnalni->gmni_stxd_lock);
+
+ sema_init(&gmnalni->gmni_rxt_stxd_token, nrxt_stx);
+ spin_lock_init(&gmnalni->gmni_rxt_stxd_lock);
for (i=0; i<=nstx; i++) {
- PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
- if (!txd) {
+ PORTAL_ALLOC(txd, sizeof(*txd));
+ if (txd == NULL) {
CERROR("Failed to malloc txd [%d]\n", i);
- return(GMNAL_STATUS_NOMEM);
+ return -ENOMEM;
}
- spin_lock(&nal_data->gm_lock);
- txbuffer = gm_dma_malloc(nal_data->gm_port,
- nal_data->small_msg_size);
- spin_unlock(&nal_data->gm_lock);
- if (!txbuffer) {
+ spin_lock(&gmnalni->gmni_gm_lock);
+ txbuffer = gm_dma_malloc(gmnalni->gmni_port,
+ gmnalni->gmni_small_msg_size);
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ if (txbuffer == NULL) {
CERROR("Failed to gm_dma_malloc txbuffer [%d], "
- "size [%d]\n", i, nal_data->small_msg_size);
- PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
- return(GMNAL_STATUS_FAIL);
+ "size [%d]\n", i, gmnalni->gmni_small_msg_size);
+ PORTAL_FREE(txd, sizeof(*txd));
+ return -ENOMEM;
}
- txd->buffer = txbuffer;
- txd->buffer_size = nal_data->small_msg_size;
- txd->gm_size = gm_min_size_for_length(txd->buffer_size);
- txd->nal_data = (struct _gmnal_data_t*)nal_data;
- txd->rxt = 0;
-
- txd->next = nal_data->stxd;
- nal_data->stxd = txd;
- CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], "
- "size [%d]\n", txd, txd->buffer, txd->buffer_size);
+ txd->tx_buffer = txbuffer;
+ txd->tx_buffer_size = gmnalni->gmni_small_msg_size;
+ txd->tx_gm_size = gm_min_size_for_length(txd->tx_buffer_size);
+ txd->tx_gmni = gmnalni;
+ txd->tx_rxt = 0;
+
+ txd->tx_next = gmnalni->gmni_stxd;
+ gmnalni->gmni_stxd = txd;
+ CDEBUG(D_NET, "Registered txd [%p] with buffer [%p], "
+ "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
}
for (i=0; i<=nrxt_stx; i++) {
PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
if (!txd) {
CERROR("Failed to malloc txd [%d]\n", i);
- return(GMNAL_STATUS_NOMEM);
+ return -ENOMEM;
}
- spin_lock(&nal_data->gm_lock);
- txbuffer = gm_dma_malloc(nal_data->gm_port,
- nal_data->small_msg_size);
- spin_unlock(&nal_data->gm_lock);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ txbuffer = gm_dma_malloc(gmnalni->gmni_port,
+ gmnalni->gmni_small_msg_size);
+ spin_unlock(&gmnalni->gmni_gm_lock);
if (!txbuffer) {
CERROR("Failed to gm_dma_malloc txbuffer [%d],"
- " size [%d]\n",i, nal_data->small_msg_size);
+ " size [%d]\n",i, gmnalni->gmni_small_msg_size);
PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
- return(GMNAL_STATUS_FAIL);
+ return -ENOMEM;
}
- txd->buffer = txbuffer;
- txd->buffer_size = nal_data->small_msg_size;
- txd->gm_size = gm_min_size_for_length(txd->buffer_size);
- txd->nal_data = (struct _gmnal_data_t*)nal_data;
- txd->rxt = 1;
-
- txd->next = nal_data->rxt_stxd;
- nal_data->rxt_stxd = txd;
- CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], "
- "size [%d]\n", txd, txd->buffer, txd->buffer_size);
+ txd->tx_buffer = txbuffer;
+ txd->tx_buffer_size = gmnalni->gmni_small_msg_size;
+ txd->tx_gm_size = gm_min_size_for_length(txd->tx_buffer_size);
+ txd->tx_gmni = gmnalni;
+ txd->tx_rxt = 1;
+
+ txd->tx_next = gmnalni->gmni_rxt_stxd;
+ gmnalni->gmni_rxt_stxd = txd;
+ CDEBUG(D_NET, "Registered txd [%p] with buffer [%p], "
+ "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
}
- /*
- * string together large tokens
- */
- for (i=0; i<=nltx ; i++) {
- PORTAL_ALLOC(ltxd, sizeof(gmnal_ltxd_t));
- ltxd->next = nal_data->ltxd;
- nal_data->ltxd = ltxd;
- }
- return(GMNAL_STATUS_OK);
+ return 0;
}
/* Free the list of wired and gm_registered small tx buffers and
* the tx descriptors that go along with them.
*/
void
-gmnal_free_txd(gmnal_data_t *nal_data)
+gmnal_free_txd(gmnal_ni_t *gmnalni)
{
- gmnal_stxd_t *txd = nal_data->stxd, *_txd = NULL;
- gmnal_ltxd_t *ltxd = NULL, *_ltxd = NULL;
+ gmnal_stxd_t *txd;
+ gmnal_stxd_t *_txd;
CDEBUG(D_TRACE, "gmnal_free_small tx\n");
- while(txd) {
- CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], "
- "size [%d]\n", txd, txd->buffer, txd->buffer_size);
+ txd = gmnalni->gmni_stxd;
+ while(txd != NULL) {
+ CDEBUG(D_NET, "Freeing txd [%p] with buffer [%p], "
+ "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
_txd = txd;
- txd = txd->next;
- spin_lock(&nal_data->gm_lock);
- gm_dma_free(nal_data->gm_port, _txd->buffer);
- spin_unlock(&nal_data->gm_lock);
+ txd = txd->tx_next;
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_dma_free(gmnalni->gmni_port, _txd->tx_buffer);
+ spin_unlock(&gmnalni->gmni_gm_lock);
PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
}
- txd = nal_data->rxt_stxd;
+
+ txd = gmnalni->gmni_rxt_stxd;
while(txd) {
- CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], "
- "size [%d]\n", txd, txd->buffer, txd->buffer_size);
+ CDEBUG(D_NET, "Freeing txd [%p] with buffer [%p], "
+ "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
_txd = txd;
- txd = txd->next;
- spin_lock(&nal_data->gm_lock);
- gm_dma_free(nal_data->gm_port, _txd->buffer);
- spin_unlock(&nal_data->gm_lock);
+ txd = txd->tx_next;
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_dma_free(gmnalni->gmni_port, _txd->tx_buffer);
+ spin_unlock(&gmnalni->gmni_gm_lock);
PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
}
- ltxd = nal_data->ltxd;
- while(txd) {
- _ltxd = ltxd;
- ltxd = ltxd->next;
- PORTAL_FREE(_ltxd, sizeof(gmnal_ltxd_t));
- }
-
- return;
}
* This implicitly gets us a send token also.
*/
gmnal_stxd_t *
-gmnal_get_stxd(gmnal_data_t *nal_data, int block)
+gmnal_get_stxd(gmnal_ni_t *gmnalni, int block)
{
gmnal_stxd_t *txd = NULL;
pid_t pid = current->pid;
- CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n",
- nal_data, block, pid);
-
- if (gmnal_is_rxthread(nal_data)) {
- CDEBUG(D_INFO, "RXTHREAD Attempting to get token\n");
- down(&nal_data->rxt_stxd_token);
- spin_lock(&nal_data->rxt_stxd_lock);
- txd = nal_data->rxt_stxd;
- nal_data->rxt_stxd = txd->next;
- spin_unlock(&nal_data->rxt_stxd_lock);
- CDEBUG(D_INFO, "RXTHREAD got [%p], head is [%p]\n",
- txd, nal_data->rxt_stxd);
- txd->kniov = 0;
- txd->rxt = 1;
+ CDEBUG(D_TRACE, "gmnal_get_stxd gmnalni [%p] block[%d] pid [%d]\n",
+ gmnalni, block, pid);
+
+ if (gmnal_is_rxthread(gmnalni)) {
+ CDEBUG(D_NET, "RXTHREAD Attempting to get token\n");
+ down(&gmnalni->gmni_rxt_stxd_token);
+ spin_lock(&gmnalni->gmni_rxt_stxd_lock);
+ txd = gmnalni->gmni_rxt_stxd;
+ gmnalni->gmni_rxt_stxd = txd->tx_next;
+ spin_unlock(&gmnalni->gmni_rxt_stxd_lock);
+ CDEBUG(D_NET, "RXTHREAD got [%p], head is [%p]\n",
+ txd, gmnalni->gmni_rxt_stxd);
+ txd->tx_kniov = 0;
+ txd->tx_rxt = 1;
} else {
if (block) {
- CDEBUG(D_INFO, "Attempting to get token\n");
- down(&nal_data->stxd_token);
+ CDEBUG(D_NET, "Attempting to get token\n");
+ down(&gmnalni->gmni_stxd_token);
CDEBUG(D_PORTALS, "Got token\n");
} else {
- if (down_trylock(&nal_data->stxd_token)) {
+ if (down_trylock(&gmnalni->gmni_stxd_token)) {
CERROR("can't get token\n");
return(NULL);
}
}
- spin_lock(&nal_data->stxd_lock);
- txd = nal_data->stxd;
- nal_data->stxd = txd->next;
- spin_unlock(&nal_data->stxd_lock);
- CDEBUG(D_INFO, "got [%p], head is [%p]\n", txd,
- nal_data->stxd);
- txd->kniov = 0;
+ spin_lock(&gmnalni->gmni_stxd_lock);
+ txd = gmnalni->gmni_stxd;
+ gmnalni->gmni_stxd = txd->tx_next;
+ spin_unlock(&gmnalni->gmni_stxd_lock);
+ CDEBUG(D_NET, "got [%p], head is [%p]\n", txd,
+ gmnalni->gmni_stxd);
+ txd->tx_kniov = 0;
} /* general txd get */
return(txd);
}
* Return a txd to the list
*/
void
-gmnal_return_stxd(gmnal_data_t *nal_data, gmnal_stxd_t *txd)
+gmnal_return_stxd(gmnal_ni_t *gmnalni, gmnal_stxd_t *txd)
{
- CDEBUG(D_TRACE, "nal_data [%p], txd[%p] rxt[%d]\n", nal_data,
- txd, txd->rxt);
+ CDEBUG(D_TRACE, "gmnalni [%p], txd[%p] rxt[%d]\n", gmnalni,
+ txd, txd->tx_rxt);
/*
* this transmit descriptor is
* for the rxthread
*/
- if (txd->rxt) {
- spin_lock(&nal_data->rxt_stxd_lock);
- txd->next = nal_data->rxt_stxd;
- nal_data->rxt_stxd = txd;
- spin_unlock(&nal_data->rxt_stxd_lock);
- up(&nal_data->rxt_stxd_token);
- CDEBUG(D_INFO, "Returned stxd to rxthread list\n");
+ if (txd->tx_rxt) {
+ spin_lock(&gmnalni->gmni_rxt_stxd_lock);
+ txd->tx_next = gmnalni->gmni_rxt_stxd;
+ gmnalni->gmni_rxt_stxd = txd;
+ spin_unlock(&gmnalni->gmni_rxt_stxd_lock);
+ up(&gmnalni->gmni_rxt_stxd_token);
+ CDEBUG(D_NET, "Returned stxd to rxthread list\n");
} else {
- spin_lock(&nal_data->stxd_lock);
- txd->next = nal_data->stxd;
- nal_data->stxd = txd;
- spin_unlock(&nal_data->stxd_lock);
- up(&nal_data->stxd_token);
- CDEBUG(D_INFO, "Returned stxd to general list\n");
+ spin_lock(&gmnalni->gmni_stxd_lock);
+ txd->tx_next = gmnalni->gmni_stxd;
+ gmnalni->gmni_stxd = txd;
+ spin_unlock(&gmnalni->gmni_stxd_lock);
+ up(&gmnalni->gmni_stxd_token);
+ CDEBUG(D_NET, "Returned stxd to general list\n");
}
return;
}
/*
- * Get a large transmit descriptor from the free list
- * This implicitly gets us a transmit token .
- * always wait for one.
- */
-gmnal_ltxd_t *
-gmnal_get_ltxd(gmnal_data_t *nal_data)
-{
-
- gmnal_ltxd_t *ltxd = NULL;
-
- CDEBUG(D_TRACE, "nal_data [%p]\n", nal_data);
-
- down(&nal_data->ltxd_token);
- spin_lock(&nal_data->ltxd_lock);
- ltxd = nal_data->ltxd;
- nal_data->ltxd = ltxd->next;
- spin_unlock(&nal_data->ltxd_lock);
- CDEBUG(D_INFO, "got [%p], head is [%p]\n", ltxd, nal_data->ltxd);
- return(ltxd);
-}
-
-/*
- * Return an ltxd to the list
- */
-void
-gmnal_return_ltxd(gmnal_data_t *nal_data, gmnal_ltxd_t *ltxd)
-{
- CDEBUG(D_TRACE, "nal_data [%p], ltxd[%p]\n", nal_data, ltxd);
-
- spin_lock(&nal_data->ltxd_lock);
- ltxd->next = nal_data->ltxd;
- nal_data->ltxd = ltxd;
- spin_unlock(&nal_data->ltxd_lock);
- up(&nal_data->ltxd_token);
- return;
-}
-/*
* allocate a number of small rx buffers and register with GM
* so they are wired and set up for DMA. This is a costly operation.
* Also allocate a corrosponding descriptor to keep track of
* receive thread.
*/
int
-gmnal_alloc_srxd(gmnal_data_t *nal_data)
+gmnal_alloc_srxd(gmnal_ni_t *gmnalni)
{
int nrx = 0, nsrx = 0, i = 0;
gmnal_srxd_t *rxd = NULL;
CDEBUG(D_TRACE, "gmnal_alloc_small rx\n");
- spin_lock(&nal_data->gm_lock);
- nrx = gm_num_receive_tokens(nal_data->gm_port);
- spin_unlock(&nal_data->gm_lock);
- CDEBUG(D_INFO, "total number of receive tokens available is [%d]\n",
+ spin_lock(&gmnalni->gmni_gm_lock);
+ nrx = gm_num_receive_tokens(gmnalni->gmni_port);
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ CDEBUG(D_NET, "total number of receive tokens available is [%d]\n",
nrx);
nsrx = nrx/2;
*/
nsrx = num_stxds*2 + 2;
- CDEBUG(D_INFO, "Allocated [%d] receive tokens to small messages\n",
+ CDEBUG(D_NET, "Allocated [%d] receive tokens to small messages\n",
nsrx);
- spin_lock(&nal_data->gm_lock);
- nal_data->srxd_hash = gm_create_hash(gm_hash_compare_ptrs,
- gm_hash_hash_ptr, 0, 0, nsrx, 0);
- spin_unlock(&nal_data->gm_lock);
- if (!nal_data->srxd_hash) {
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gmnalni->gmni_srxd_hash = gm_create_hash(gm_hash_compare_ptrs,
+ gm_hash_hash_ptr, 0, 0, nsrx, 0);
+ spin_unlock(&gmnalni->gmni_gm_lock);
+ if (!gmnalni->gmni_srxd_hash) {
CERROR("Failed to create hash table\n");
- return(GMNAL_STATUS_NOMEM);
+ return -ENOMEM;
}
- sema_init(&nal_data->srxd_token, nsrx);
- spin_lock_init(&nal_data->srxd_lock);
-
for (i=0; i<=nsrx; i++) {
PORTAL_ALLOC(rxd, sizeof(gmnal_srxd_t));
if (!rxd) {
CERROR("Failed to malloc rxd [%d]\n", i);
- return(GMNAL_STATUS_NOMEM);
+ return -ENOMEM;
}
- spin_lock(&nal_data->gm_lock);
- rxbuffer = gm_dma_malloc(nal_data->gm_port,
- nal_data->small_msg_size);
- spin_unlock(&nal_data->gm_lock);
+ spin_lock(&gmnalni->gmni_gm_lock);
+ rxbuffer = gm_dma_malloc(gmnalni->gmni_port,
+ gmnalni->gmni_small_msg_size);
+ spin_unlock(&gmnalni->gmni_gm_lock);
if (!rxbuffer) {
CERROR("Failed to gm_dma_malloc rxbuffer [%d], "
- "size [%d]\n",i ,nal_data->small_msg_size);
+ "size [%d]\n",i ,gmnalni->gmni_small_msg_size);
PORTAL_FREE(rxd, sizeof(gmnal_srxd_t));
- return(GMNAL_STATUS_FAIL);
+ return -ENOMEM;
}
- rxd->buffer = rxbuffer;
- rxd->size = nal_data->small_msg_size;
- rxd->gmsize = gm_min_size_for_length(rxd->size);
+ rxd->rx_buffer = rxbuffer;
+ rxd->rx_size = gmnalni->gmni_small_msg_size;
+ rxd->rx_gmsize = gm_min_size_for_length(rxd->rx_size);
- if (gm_hash_insert(nal_data->srxd_hash,
+ if (gm_hash_insert(gmnalni->gmni_srxd_hash,
(void*)rxbuffer, (void*)rxd)) {
CERROR("failed to create hash entry rxd[%p] "
"for rxbuffer[%p]\n", rxd, rxbuffer);
- return(GMNAL_STATUS_FAIL);
+ return -ENOMEM;
}
- rxd->next = nal_data->srxd;
- nal_data->srxd = rxd;
- CDEBUG(D_INFO, "Registered rxd [%p] with buffer [%p], "
- "size [%d]\n", rxd, rxd->buffer, rxd->size);
+ rxd->rx_next = gmnalni->gmni_srxd;
+ gmnalni->gmni_srxd = rxd;
+ CDEBUG(D_NET, "Registered rxd [%p] with buffer [%p], "
+ "size [%d]\n", rxd, rxd->rx_buffer, rxd->rx_size);
}
- return(GMNAL_STATUS_OK);
+ return 0;
}
* rx descriptors that go along with them.
*/
void
-gmnal_free_srxd(gmnal_data_t *nal_data)
+gmnal_free_srxd(gmnal_ni_t *gmnalni)
{
- gmnal_srxd_t *rxd = nal_data->srxd, *_rxd = NULL;
+ gmnal_srxd_t *rxd = gmnalni->gmni_srxd, *_rxd = NULL;
CDEBUG(D_TRACE, "gmnal_free_small rx\n");
while(rxd) {
- CDEBUG(D_INFO, "Freeing rxd [%p] buffer [%p], size [%d]\n",
- rxd, rxd->buffer, rxd->size);
+ CDEBUG(D_NET, "Freeing rxd [%p] buffer [%p], size [%d]\n",
+ rxd, rxd->rx_buffer, rxd->rx_size);
_rxd = rxd;
- rxd = rxd->next;
+ rxd = rxd->rx_next;
+
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_dma_free(gmnalni->gmni_port, _rxd->rx_buffer);
+ spin_unlock(&gmnalni->gmni_gm_lock);
-#if 0
- spin_lock(&nal_data->gm_lock);
- gm_deregister_memory(nal_data->gm_port, _rxd->buffer,
- _rxd->size);
- spin_unlock(&nal_data->gm_lock);
- PORTAL_FREE(_rxd->buffer, GMNAL_SMALL_RXBUFFER_SIZE);
-#else
- spin_lock(&nal_data->gm_lock);
- gm_dma_free(nal_data->gm_port, _rxd->buffer);
- spin_unlock(&nal_data->gm_lock);
-#endif
PORTAL_FREE(_rxd, sizeof(gmnal_srxd_t));
}
return;
/*
- * Get a rxd from the free list
- * This get us a wired and gm_registered small rx buffer.
- * This implicitly gets us a receive token also.
- */
-gmnal_srxd_t *
-gmnal_get_srxd(gmnal_data_t *nal_data, int block)
-{
-
- gmnal_srxd_t *rxd = NULL;
- CDEBUG(D_TRACE, "nal_data [%p] block [%d]\n", nal_data, block);
-
- if (block) {
- down(&nal_data->srxd_token);
- } else {
- if (down_trylock(&nal_data->srxd_token)) {
- CDEBUG(D_INFO, "gmnal_get_srxd Can't get token\n");
- return(NULL);
- }
- }
- spin_lock(&nal_data->srxd_lock);
- rxd = nal_data->srxd;
- if (rxd)
- nal_data->srxd = rxd->next;
- spin_unlock(&nal_data->srxd_lock);
- CDEBUG(D_INFO, "got [%p], head is [%p]\n", rxd, nal_data->srxd);
- return(rxd);
-}
-
-/*
- * Return an rxd to the list
- */
-void
-gmnal_return_srxd(gmnal_data_t *nal_data, gmnal_srxd_t *rxd)
-{
- CDEBUG(D_TRACE, "nal_data [%p], rxd[%p]\n", nal_data, rxd);
-
- spin_lock(&nal_data->srxd_lock);
- rxd->next = nal_data->srxd;
- nal_data->srxd = rxd;
- spin_unlock(&nal_data->srxd_lock);
- up(&nal_data->srxd_token);
- return;
-}
-
-/*
* Given a pointer to a srxd find
* the relevant descriptor for it
* This is done by searching a hash
* are created
*/
gmnal_srxd_t *
-gmnal_rxbuffer_to_srxd(gmnal_data_t *nal_data, void *rxbuffer)
+gmnal_rxbuffer_to_srxd(gmnal_ni_t *gmnalni, void *rxbuffer)
{
gmnal_srxd_t *srxd = NULL;
- CDEBUG(D_TRACE, "nal_data [%p], rxbuffer [%p]\n", nal_data, rxbuffer);
- srxd = gm_hash_find(nal_data->srxd_hash, rxbuffer);
- CDEBUG(D_INFO, "srxd is [%p]\n", srxd);
+ CDEBUG(D_TRACE, "gmnalni [%p], rxbuffer [%p]\n", gmnalni, rxbuffer);
+ srxd = gm_hash_find(gmnalni->gmni_srxd_hash, rxbuffer);
+ CDEBUG(D_NET, "srxd is [%p]\n", srxd);
return(srxd);
}
void
-gmnal_stop_rxthread(gmnal_data_t *nal_data)
+gmnal_stop_rxthread(gmnal_ni_t *gmnalni)
{
int delay = 30;
- CDEBUG(D_TRACE, "Attempting to stop rxthread nal_data [%p]\n",
- nal_data);
+ CDEBUG(D_TRACE, "Attempting to stop rxthread gmnalni [%p]\n",
+ gmnalni);
- nal_data->rxthread_stop_flag = GMNAL_THREAD_STOP;
+ gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_STOP;
- gmnal_remove_rxtwe(nal_data);
+ gmnal_remove_rxtwe(gmnalni);
/*
* kick the thread
*/
- up(&nal_data->rxtwe_wait);
+ up(&gmnalni->gmni_rxtwe_wait);
- while(nal_data->rxthread_flag != GMNAL_THREAD_RESET && delay--) {
- CDEBUG(D_INFO, "gmnal_stop_rxthread sleeping\n");
+ while(gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET && delay--) {
+ CDEBUG(D_NET, "gmnal_stop_rxthread sleeping\n");
gmnal_yield(1);
- up(&nal_data->rxtwe_wait);
+ up(&gmnalni->gmni_rxtwe_wait);
}
- if (nal_data->rxthread_flag != GMNAL_THREAD_RESET) {
+ if (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) {
CERROR("I don't know how to wake the thread\n");
} else {
- CDEBUG(D_INFO, "rx thread seems to have stopped\n");
+ CDEBUG(D_NET, "rx thread seems to have stopped\n");
}
}
void
-gmnal_stop_ctthread(gmnal_data_t *nal_data)
+gmnal_stop_ctthread(gmnal_ni_t *gmnalni)
{
int delay = 15;
- CDEBUG(D_TRACE, "Attempting to stop ctthread nal_data [%p]\n",
- nal_data);
+ CDEBUG(D_TRACE, "Attempting to stop ctthread gmnalni [%p]\n",
+ gmnalni);
- nal_data->ctthread_flag = GMNAL_THREAD_STOP;
- spin_lock(&nal_data->gm_lock);
- gm_set_alarm(nal_data->gm_port, &nal_data->ctthread_alarm, 10,
+ gmnalni->gmni_ctthread_flag = GMNAL_THREAD_STOP;
+ spin_lock(&gmnalni->gmni_gm_lock);
+ gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, 10,
NULL, NULL);
- spin_unlock(&nal_data->gm_lock);
+ spin_unlock(&gmnalni->gmni_gm_lock);
- while(nal_data->ctthread_flag == GMNAL_THREAD_STOP && delay--) {
- CDEBUG(D_INFO, "gmnal_stop_ctthread sleeping\n");
+ while(gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP && delay--) {
+ CDEBUG(D_NET, "gmnal_stop_ctthread sleeping\n");
gmnal_yield(1);
}
- if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
+ if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
CERROR("I DON'T KNOW HOW TO WAKE THE THREAD\n");
} else {
- CDEBUG(D_INFO, "CT THREAD SEEMS TO HAVE STOPPED\n");
+ CDEBUG(D_NET, "CT THREAD SEEMS TO HAVE STOPPED\n");
}
}
}
int
-gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov,
+gmnal_is_small_msg(gmnal_ni_t *gmnalni, int niov, struct iovec *iov,
int len)
{
CDEBUG(D_TRACE, "len [%d] limit[%d]\n", len,
- nal_data->small_msg_size);
+ gmnalni->gmni_small_msg_size);
if ((len + sizeof(ptl_hdr_t) + sizeof(gmnal_msghdr_t))
- < nal_data->small_msg_size) {
+ < gmnalni->gmni_small_msg_size) {
- CDEBUG(D_INFO, "Yep, small message\n");
+ CDEBUG(D_NET, "Yep, small message\n");
return(1);
} else {
CERROR("No, not small message\n");
* can get it to complete the receive
*/
int
-gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv)
+gmnal_add_rxtwe(gmnal_ni_t *gmnalni, gm_recv_t *recv)
{
gmnal_rxtwe_t *we = NULL;
PORTAL_ALLOC(we, sizeof(gmnal_rxtwe_t));
if (!we) {
CERROR("failed to malloc\n");
- return(GMNAL_STATUS_FAIL);
+ return -ENOMEM;
}
we->buffer = gm_ntohp(recv->buffer);
we->snode = (int)gm_ntoh_u16(recv->sender_node_id);
we->type = (int)gm_ntoh_u8(recv->type);
we->length = (int)gm_ntohl(recv->length);
- spin_lock(&nal_data->rxtwe_lock);
- if (nal_data->rxtwe_tail) {
- nal_data->rxtwe_tail->next = we;
+ spin_lock(&gmnalni->gmni_rxtwe_lock);
+ if (gmnalni->gmni_rxtwe_tail) {
+ gmnalni->gmni_rxtwe_tail->next = we;
} else {
- nal_data->rxtwe_head = we;
- nal_data->rxtwe_tail = we;
+ gmnalni->gmni_rxtwe_head = we;
+ gmnalni->gmni_rxtwe_tail = we;
}
- nal_data->rxtwe_tail = we;
- spin_unlock(&nal_data->rxtwe_lock);
+ gmnalni->gmni_rxtwe_tail = we;
+ spin_unlock(&gmnalni->gmni_rxtwe_lock);
- up(&nal_data->rxtwe_wait);
- return(GMNAL_STATUS_OK);
+ up(&gmnalni->gmni_rxtwe_wait);
+ return 0;
}
void
-gmnal_remove_rxtwe(gmnal_data_t *nal_data)
+gmnal_remove_rxtwe(gmnal_ni_t *gmnalni)
{
- gmnal_rxtwe_t *_we, *we = nal_data->rxtwe_head;
+ gmnal_rxtwe_t *_we, *we = gmnalni->gmni_rxtwe_head;
CDEBUG(D_NET, "removing all work list entries\n");
- spin_lock(&nal_data->rxtwe_lock);
+ spin_lock(&gmnalni->gmni_rxtwe_lock);
CDEBUG(D_NET, "Got lock\n");
while (we) {
_we = we;
we = we->next;
PORTAL_FREE(_we, sizeof(gmnal_rxtwe_t));
}
- spin_unlock(&nal_data->rxtwe_lock);
- nal_data->rxtwe_head = NULL;
- nal_data->rxtwe_tail = NULL;
+ spin_unlock(&gmnalni->gmni_rxtwe_lock);
+ gmnalni->gmni_rxtwe_head = NULL;
+ gmnalni->gmni_rxtwe_tail = NULL;
}
gmnal_rxtwe_t *
-gmnal_get_rxtwe(gmnal_data_t *nal_data)
+gmnal_get_rxtwe(gmnal_ni_t *gmnalni)
{
gmnal_rxtwe_t *we = NULL;
CDEBUG(D_NET, "Getting entry to list\n");
do {
- while(down_interruptible(&nal_data->rxtwe_wait) != 0)
+ while(down_interruptible(&gmnalni->gmni_rxtwe_wait) != 0)
/* do nothing */;
- if (nal_data->rxthread_stop_flag == GMNAL_THREAD_STOP) {
+
+ if (gmnalni->gmni_rxthread_stop_flag == GMNAL_THREAD_STOP) {
/*
* time to stop
* TO DO some one free the work entries
*/
return(NULL);
}
- spin_lock(&nal_data->rxtwe_lock);
- if (nal_data->rxtwe_head) {
- CDEBUG(D_INFO, "Got a work entry\n");
- we = nal_data->rxtwe_head;
- nal_data->rxtwe_head = we->next;
- if (!nal_data->rxtwe_head)
- nal_data->rxtwe_tail = NULL;
+
+ spin_lock(&gmnalni->gmni_rxtwe_lock);
+ if (gmnalni->gmni_rxtwe_head) {
+ CDEBUG(D_NET, "Got a work entry\n");
+ we = gmnalni->gmni_rxtwe_head;
+ gmnalni->gmni_rxtwe_head = we->next;
+ if (!gmnalni->gmni_rxtwe_head)
+ gmnalni->gmni_rxtwe_tail = NULL;
} else {
CWARN("woken but no work\n");
}
- spin_unlock(&nal_data->rxtwe_lock);
+
+ spin_unlock(&gmnalni->gmni_rxtwe_lock);
} while (!we);
- CDEBUG(D_INFO, "Returning we[%p]\n", we);
+ CDEBUG(D_NET, "Returning we[%p]\n", we);
return(we);
}
* callback events or sleeps.
*/
int
-gmnal_start_kernel_threads(gmnal_data_t *nal_data)
+gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
{
int threads = 0;
* gm_unknown call (sleeping) to exit it.
*/
CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n");
- gm_initialize_alarm(&nal_data->ctthread_alarm);
- nal_data->ctthread_flag = GMNAL_THREAD_RESET;
+ gm_initialize_alarm(&gmnalni->gmni_ctthread_alarm);
+ gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
- CDEBUG(D_INFO, "Starting caretaker thread\n");
- nal_data->ctthread_pid =
- kernel_thread(gmnal_ct_thread, (void*)nal_data, 0);
- if (nal_data->ctthread_pid <= 0) {
+ CDEBUG(D_NET, "Starting caretaker thread\n");
+ gmnalni->gmni_ctthread_pid =
+ kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0);
+ if (gmnalni->gmni_ctthread_pid <= 0) {
CERROR("Caretaker thread failed to start\n");
- return(GMNAL_STATUS_FAIL);
+ return -ENOMEM;
}
- while (nal_data->rxthread_flag != GMNAL_THREAD_RESET) {
+ while (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) {
gmnal_yield(1);
- CDEBUG(D_INFO, "Waiting for caretaker thread signs of life\n");
+ CDEBUG(D_NET, "Waiting for caretaker thread signs of life\n");
}
- CDEBUG(D_INFO, "caretaker thread has started\n");
+ CDEBUG(D_NET, "caretaker thread has started\n");
/*
* Now start a number of receiver threads
* these treads get work to do from the caretaker (ct) thread
*/
- nal_data->rxthread_flag = GMNAL_THREAD_RESET;
- nal_data->rxthread_stop_flag = GMNAL_THREAD_RESET;
+ gmnalni->gmni_rxthread_flag = GMNAL_THREAD_RESET;
+ gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_RESET;
for (threads=0; threads<NRXTHREADS; threads++)
- nal_data->rxthread_pid[threads] = -1;
- spin_lock_init(&nal_data->rxtwe_lock);
- spin_lock_init(&nal_data->rxthread_flag_lock);
- sema_init(&nal_data->rxtwe_wait, 0);
- nal_data->rxtwe_head = NULL;
- nal_data->rxtwe_tail = NULL;
+ gmnalni->gmni_rxthread_pid[threads] = -1;
+ spin_lock_init(&gmnalni->gmni_rxtwe_lock);
+ spin_lock_init(&gmnalni->gmni_rxthread_flag_lock);
+ sema_init(&gmnalni->gmni_rxtwe_wait, 0);
+ gmnalni->gmni_rxtwe_head = NULL;
+ gmnalni->gmni_rxtwe_tail = NULL;
/*
* If the default number of receive threades isn't
* modified at load time, then start one thread per cpu
*/
if (num_rx_threads == -1)
num_rx_threads = smp_num_cpus;
- CDEBUG(D_INFO, "Starting [%d] receive threads\n", num_rx_threads);
+ CDEBUG(D_NET, "Starting [%d] receive threads\n", num_rx_threads);
for (threads=0; threads<num_rx_threads; threads++) {
- nal_data->rxthread_pid[threads] =
- kernel_thread(gmnal_rx_thread, (void*)nal_data, 0);
- if (nal_data->rxthread_pid[threads] <= 0) {
+ gmnalni->gmni_rxthread_pid[threads] =
+ kernel_thread(gmnal_rx_thread, (void*)gmnalni, 0);
+ if (gmnalni->gmni_rxthread_pid[threads] <= 0) {
CERROR("Receive thread failed to start\n");
- gmnal_stop_rxthread(nal_data);
- gmnal_stop_ctthread(nal_data);
- return(GMNAL_STATUS_FAIL);
+ gmnal_stop_rxthread(gmnalni);
+ gmnal_stop_ctthread(gmnalni);
+ return -ENOMEM;
}
}
for (;;) {
- spin_lock(&nal_data->rxthread_flag_lock);
- if (nal_data->rxthread_flag == GMNAL_RXTHREADS_STARTED) {
- spin_unlock(&nal_data->rxthread_flag_lock);
+ spin_lock(&gmnalni->gmni_rxthread_flag_lock);
+ if (gmnalni->gmni_rxthread_flag == GMNAL_RXTHREADS_STARTED) {
+ spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
break;
}
- spin_unlock(&nal_data->rxthread_flag_lock);
+ spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
gmnal_yield(1);
}
- CDEBUG(D_INFO, "receive threads seem to have started\n");
+ CDEBUG(D_NET, "receive threads seem to have started\n");
- return(GMNAL_STATUS_OK);
+ return 0;
}