* transmit descriptors on the free list)
*/
typedef struct _gmnal_rxtwe {
- gm_recv_event_t *rx;
+ void *buffer;
+ unsigned snode;
+ unsigned sport;
+ unsigned type;
+ unsigned length;
struct _gmnal_rxtwe *next;
} gmnal_rxtwe_t;
*/
int gmnal_ct_thread(void *); /* caretaker thread */
int gmnal_rx_thread(void *); /* receive thread */
-int gmnal_pre_receive(gmnal_data_t*, gm_recv_t*, int);
-int gmnal_rx_bad(gmnal_data_t *, gm_recv_t *, gmnal_srxd_t *);
+int gmnal_pre_receive(gmnal_data_t*, gmnal_rxtwe_t*, int);
+int gmnal_rx_bad(gmnal_data_t *, gmnal_rxtwe_t *, gmnal_srxd_t*);
int gmnal_rx_requeue_buffer(gmnal_data_t *, gmnal_srxd_t *);
-int gmnal_add_rxtwe(gmnal_data_t *, gm_recv_event_t *);
+int gmnal_add_rxtwe(gmnal_data_t *, gm_recv_t *);
gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_data_t *);
void gmnal_remove_rxtwe(gmnal_data_t *);
{
gmnal_data_t *nal_data;
gm_recv_event_t *rxevent = NULL;
+ gm_recv_t *recv = NULL;
if (!arg) {
CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
CDEBUG(D_NET, "waiting\n");
rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
- CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
CDEBUG(D_INFO, "time to exit\n");
break;
}
+ CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
switch (GM_RECV_EVENT_TYPE(rxevent)) {
case(GM_RECV_EVENT):
CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
+ recv = (gm_recv_t*)&rxevent->recv;
GMNAL_GM_UNLOCK(nal_data);
- gmnal_add_rxtwe(nal_data, rxevent);
+ gmnal_add_rxtwe(nal_data, recv);
GMNAL_GM_LOCK(nal_data);
CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
break;
int gmnal_rx_thread(void *arg)
{
gmnal_data_t *nal_data;
- gm_recv_event_t *rxevent = NULL;
- gm_recv_t *recv = NULL;
void *buffer;
gmnal_rxtwe_t *we = NULL;
CDEBUG(D_INFO, "Receive thread time to exit\n");
break;
}
- rxevent = we->rx;
- CDEBUG(D_INFO, "thread got [%s]\n", gmnal_rxevent(rxevent));
- recv = (gm_recv_t*)&(rxevent->recv);
- buffer = gm_ntohp(recv->buffer);
- PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
+ buffer = we->buffer;
switch(((gmnal_msghdr_t*)buffer)->type) {
case(GMNAL_SMALL_MESSAGE):
- gmnal_pre_receive(nal_data, recv,
+ gmnal_pre_receive(nal_data, we,
GMNAL_SMALL_MESSAGE);
break;
case(GMNAL_LARGE_MESSAGE_INIT):
- gmnal_pre_receive(nal_data, recv,
+ gmnal_pre_receive(nal_data, we,
GMNAL_LARGE_MESSAGE_INIT);
break;
case(GMNAL_LARGE_MESSAGE_ACK):
- gmnal_pre_receive(nal_data, recv,
+ gmnal_pre_receive(nal_data, we,
GMNAL_LARGE_MESSAGE_ACK);
break;
default:
CDEBUG(D_ERROR, "Unsupported message type\n");
- gmnal_rx_bad(nal_data, recv, NULL);
+ gmnal_rx_bad(nal_data, we, NULL);
}
+ PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
}
spin_lock(&nal_data->rxthread_flag_lock);
* Deal with all endian stuff here.
*/
int
-gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type)
+gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
{
gmnal_srxd_t *srxd = NULL;
void *buffer = NULL;
gmnal_msghdr_t *gmnal_msghdr;
ptl_hdr_t *portals_hdr;
- CDEBUG(D_INFO, "nal_data [%p], recv [%p] type [%d]\n",
- nal_data, recv, gmnal_type);
+ CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
+ nal_data, we, gmnal_type);
- buffer = gm_ntohp(recv->buffer);;
- snode = (int)gm_ntoh_u16(recv->sender_node_id);
- sport = (int)gm_ntoh_u8(recv->sender_port_id);
- type = (int)gm_ntoh_u8(recv->type);
- buffer = gm_ntohp(recv->buffer);
- length = (int) gm_ntohl(recv->length);
+ buffer = we->buffer;
+ snode = we->snode;
+ sport = we->sport;
+ type = we->type;
+ buffer = we->buffer;
+ length = we->length;
gmnal_msghdr = (gmnal_msghdr_t*)buffer;
portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
* A bad message is one we don't expect or can't interpret
*/
int
-gmnal_rx_bad(gmnal_data_t *nal_data, gm_recv_t *recv, gmnal_srxd_t *srxd)
+gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
{
CDEBUG(D_TRACE, "Can't handle message\n");
if (!srxd)
srxd = gmnal_rxbuffer_to_srxd(nal_data,
- gm_ntohp(recv->buffer));
+ we->buffer);
if (srxd) {
gmnal_rx_requeue_buffer(nal_data, srxd);
} else {
{
gmnal_stxd_t *txd = NULL;
- pid_t pid = current->pid;
+ pid_t pid = current->pid;
CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n",
}
+/*
+ * extract info from the receive event.
+ * Have to do this before the next call to gm_receive
+ * Deal with all endian stuff here.
+ * Then stick work entry on list where rxthreads
+ * can get it to complete the receive
+ */
int
-gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent)
+gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv)
{
gmnal_rxtwe_t *we = NULL;
CDEBUG(D_ERROR, "failed to malloc\n");
return(GMNAL_STATUS_FAIL);
}
- we->rx = rxevent;
+ we->buffer = gm_ntohp(recv->buffer);
+ we->snode = (int)gm_ntoh_u16(recv->sender_node_id);
+ we->sport = (int)gm_ntoh_u8(recv->sender_port_id);
+ we->type = (int)gm_ntoh_u8(recv->type);
+ we->length = (int)gm_ntohl(recv->length);
spin_lock(&nal_data->rxtwe_lock);
if (nal_data->rxtwe_tail) {
* transmit descriptors on the free list)
*/
typedef struct _gmnal_rxtwe {
- gm_recv_event_t *rx;
+ void *buffer;
+ unsigned snode;
+ unsigned sport;
+ unsigned type;
+ unsigned length;
struct _gmnal_rxtwe *next;
} gmnal_rxtwe_t;
*/
int gmnal_ct_thread(void *); /* caretaker thread */
int gmnal_rx_thread(void *); /* receive thread */
-int gmnal_pre_receive(gmnal_data_t*, gm_recv_t*, int);
-int gmnal_rx_bad(gmnal_data_t *, gm_recv_t *, gmnal_srxd_t *);
+int gmnal_pre_receive(gmnal_data_t*, gmnal_rxtwe_t*, int);
+int gmnal_rx_bad(gmnal_data_t *, gmnal_rxtwe_t *, gmnal_srxd_t*);
int gmnal_rx_requeue_buffer(gmnal_data_t *, gmnal_srxd_t *);
-int gmnal_add_rxtwe(gmnal_data_t *, gm_recv_event_t *);
+int gmnal_add_rxtwe(gmnal_data_t *, gm_recv_t *);
gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_data_t *);
void gmnal_remove_rxtwe(gmnal_data_t *);
{
gmnal_data_t *nal_data;
gm_recv_event_t *rxevent = NULL;
+ gm_recv_t *recv = NULL;
if (!arg) {
CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
CDEBUG(D_NET, "waiting\n");
rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
- CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
CDEBUG(D_INFO, "time to exit\n");
break;
}
+ CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
switch (GM_RECV_EVENT_TYPE(rxevent)) {
case(GM_RECV_EVENT):
CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
+ recv = (gm_recv_t*)&rxevent->recv;
GMNAL_GM_UNLOCK(nal_data);
- gmnal_add_rxtwe(nal_data, rxevent);
+ gmnal_add_rxtwe(nal_data, recv);
GMNAL_GM_LOCK(nal_data);
CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
break;
int gmnal_rx_thread(void *arg)
{
gmnal_data_t *nal_data;
- gm_recv_event_t *rxevent = NULL;
- gm_recv_t *recv = NULL;
void *buffer;
gmnal_rxtwe_t *we = NULL;
CDEBUG(D_INFO, "Receive thread time to exit\n");
break;
}
- rxevent = we->rx;
- CDEBUG(D_INFO, "thread got [%s]\n", gmnal_rxevent(rxevent));
- recv = (gm_recv_t*)&(rxevent->recv);
- buffer = gm_ntohp(recv->buffer);
- PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
+ buffer = we->buffer;
switch(((gmnal_msghdr_t*)buffer)->type) {
case(GMNAL_SMALL_MESSAGE):
- gmnal_pre_receive(nal_data, recv,
+ gmnal_pre_receive(nal_data, we,
GMNAL_SMALL_MESSAGE);
break;
case(GMNAL_LARGE_MESSAGE_INIT):
- gmnal_pre_receive(nal_data, recv,
+ gmnal_pre_receive(nal_data, we,
GMNAL_LARGE_MESSAGE_INIT);
break;
case(GMNAL_LARGE_MESSAGE_ACK):
- gmnal_pre_receive(nal_data, recv,
+ gmnal_pre_receive(nal_data, we,
GMNAL_LARGE_MESSAGE_ACK);
break;
default:
CDEBUG(D_ERROR, "Unsupported message type\n");
- gmnal_rx_bad(nal_data, recv, NULL);
+ gmnal_rx_bad(nal_data, we, NULL);
}
+ PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
}
spin_lock(&nal_data->rxthread_flag_lock);
* Deal with all endian stuff here.
*/
int
-gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type)
+gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
{
gmnal_srxd_t *srxd = NULL;
void *buffer = NULL;
gmnal_msghdr_t *gmnal_msghdr;
ptl_hdr_t *portals_hdr;
- CDEBUG(D_INFO, "nal_data [%p], recv [%p] type [%d]\n",
- nal_data, recv, gmnal_type);
+ CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
+ nal_data, we, gmnal_type);
- buffer = gm_ntohp(recv->buffer);;
- snode = (int)gm_ntoh_u16(recv->sender_node_id);
- sport = (int)gm_ntoh_u8(recv->sender_port_id);
- type = (int)gm_ntoh_u8(recv->type);
- buffer = gm_ntohp(recv->buffer);
- length = (int) gm_ntohl(recv->length);
+ buffer = we->buffer;
+ snode = we->snode;
+ sport = we->sport;
+ type = we->type;
+ buffer = we->buffer;
+ length = we->length;
gmnal_msghdr = (gmnal_msghdr_t*)buffer;
portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
* A bad message is one we don't expect or can't interpret
*/
int
-gmnal_rx_bad(gmnal_data_t *nal_data, gm_recv_t *recv, gmnal_srxd_t *srxd)
+gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
{
CDEBUG(D_TRACE, "Can't handle message\n");
if (!srxd)
srxd = gmnal_rxbuffer_to_srxd(nal_data,
- gm_ntohp(recv->buffer));
+ we->buffer);
if (srxd) {
gmnal_rx_requeue_buffer(nal_data, srxd);
} else {
{
gmnal_stxd_t *txd = NULL;
- pid_t pid = current->pid;
+ pid_t pid = current->pid;
CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n",
}
+/*
+ * extract info from the receive event.
+ * Have to do this before the next call to gm_receive
+ * Deal with all endian stuff here.
+ * Then stick work entry on list where rxthreads
+ * can get it to complete the receive
+ */
int
-gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent)
+gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv)
{
gmnal_rxtwe_t *we = NULL;
CDEBUG(D_ERROR, "failed to malloc\n");
return(GMNAL_STATUS_FAIL);
}
- we->rx = rxevent;
+ we->buffer = gm_ntohp(recv->buffer);
+ we->snode = (int)gm_ntoh_u16(recv->sender_node_id);
+ we->sport = (int)gm_ntoh_u8(recv->sender_port_id);
+ we->type = (int)gm_ntoh_u8(recv->type);
+ we->length = (int)gm_ntohl(recv->length);
spin_lock(&nal_data->rxtwe_lock);
if (nal_data->rxtwe_tail) {