From 51b0f6b04d6034b42d9be868d62aa98494518f3b Mon Sep 17 00:00:00 2001 From: mdoyle Date: Tue, 7 Oct 2003 10:18:38 +0000 Subject: [PATCH] extract all required info from gm_recv_event_t before next call to gm_receive. --- lnet/klnds/gmlnd/gmlnd.h | 12 ++++++--- lnet/klnds/gmlnd/gmlnd_comm.c | 45 +++++++++++++++----------------- lnet/klnds/gmlnd/gmlnd_utils.c | 17 +++++++++--- lustre/portals/knals/gmnal/gmnal.h | 12 ++++++--- lustre/portals/knals/gmnal/gmnal_comm.c | 45 +++++++++++++++----------------- lustre/portals/knals/gmnal/gmnal_utils.c | 17 +++++++++--- 6 files changed, 86 insertions(+), 62 deletions(-) diff --git a/lnet/klnds/gmlnd/gmlnd.h b/lnet/klnds/gmlnd/gmlnd.h index fdde839..2db6c9b 100644 --- a/lnet/klnds/gmlnd/gmlnd.h +++ b/lnet/klnds/gmlnd/gmlnd.h @@ -159,7 +159,11 @@ typedef struct _gmnal_msghdr { * transmit descriptors on the free list) */ typedef struct _gmnal_rxtwe { - gm_recv_event_t *rx; + void *buffer; + unsigned snode; + unsigned sport; + unsigned type; + unsigned length; struct _gmnal_rxtwe *next; } gmnal_rxtwe_t; @@ -408,10 +412,10 @@ int gmnal_start_kernel_threads(gmnal_data_t *); */ int gmnal_ct_thread(void *); /* caretaker thread */ int gmnal_rx_thread(void *); /* receive thread */ -int gmnal_pre_receive(gmnal_data_t*, gm_recv_t*, int); -int gmnal_rx_bad(gmnal_data_t *, gm_recv_t *, gmnal_srxd_t *); +int gmnal_pre_receive(gmnal_data_t*, gmnal_rxtwe_t*, int); +int gmnal_rx_bad(gmnal_data_t *, gmnal_rxtwe_t *, gmnal_srxd_t*); int gmnal_rx_requeue_buffer(gmnal_data_t *, gmnal_srxd_t *); -int gmnal_add_rxtwe(gmnal_data_t *, gm_recv_event_t *); +int gmnal_add_rxtwe(gmnal_data_t *, gm_recv_t *); gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_data_t *); void gmnal_remove_rxtwe(gmnal_data_t *); diff --git a/lnet/klnds/gmlnd/gmlnd_comm.c b/lnet/klnds/gmlnd/gmlnd_comm.c index 9e32145..772b9cc 100644 --- a/lnet/klnds/gmlnd/gmlnd_comm.c +++ b/lnet/klnds/gmlnd/gmlnd_comm.c @@ -38,6 +38,7 @@ gmnal_ct_thread(void *arg) { gmnal_data_t *nal_data; gm_recv_event_t *rxevent = NULL; + gm_recv_t *recv = NULL; if (!arg) { CDEBUG(D_TRACE, "NO nal_data. Exiting\n"); @@ -55,17 +56,18 @@ gmnal_ct_thread(void *arg) while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) { CDEBUG(D_NET, "waiting\n"); rxevent = gm_blocking_receive_no_spin(nal_data->gm_port); - CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) { CDEBUG(D_INFO, "time to exit\n"); break; } + CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); switch (GM_RECV_EVENT_TYPE(rxevent)) { case(GM_RECV_EVENT): CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n"); + recv = (gm_recv_t*)&rxevent->recv; GMNAL_GM_UNLOCK(nal_data); - gmnal_add_rxtwe(nal_data, rxevent); + gmnal_add_rxtwe(nal_data, recv); GMNAL_GM_LOCK(nal_data); CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n"); break; @@ -109,8 +111,6 @@ gmnal_ct_thread(void *arg) int gmnal_rx_thread(void *arg) { gmnal_data_t *nal_data; - gm_recv_event_t *rxevent = NULL; - gm_recv_t *recv = NULL; void *buffer; gmnal_rxtwe_t *we = NULL; @@ -142,29 +142,26 @@ int gmnal_rx_thread(void *arg) CDEBUG(D_INFO, "Receive thread time to exit\n"); break; } - rxevent = we->rx; - CDEBUG(D_INFO, "thread got [%s]\n", gmnal_rxevent(rxevent)); - recv = (gm_recv_t*)&(rxevent->recv); - buffer = gm_ntohp(recv->buffer); - PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); + buffer = we->buffer; switch(((gmnal_msghdr_t*)buffer)->type) { case(GMNAL_SMALL_MESSAGE): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE); break; case(GMNAL_LARGE_MESSAGE_INIT): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_LARGE_MESSAGE_INIT); break; case(GMNAL_LARGE_MESSAGE_ACK): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_LARGE_MESSAGE_ACK); break; default: CDEBUG(D_ERROR, "Unsupported message type\n"); - gmnal_rx_bad(nal_data, recv, NULL); + gmnal_rx_bad(nal_data, we, NULL); } + PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); } spin_lock(&nal_data->rxthread_flag_lock); @@ -185,7 +182,7 @@ int gmnal_rx_thread(void *arg) * Deal with all endian stuff here. */ int -gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type) +gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type) { gmnal_srxd_t *srxd = NULL; void *buffer = NULL; @@ -193,15 +190,15 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type) gmnal_msghdr_t *gmnal_msghdr; ptl_hdr_t *portals_hdr; - CDEBUG(D_INFO, "nal_data [%p], recv [%p] type [%d]\n", - nal_data, recv, gmnal_type); + CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", + nal_data, we, gmnal_type); - buffer = gm_ntohp(recv->buffer);; - snode = (int)gm_ntoh_u16(recv->sender_node_id); - sport = (int)gm_ntoh_u8(recv->sender_port_id); - type = (int)gm_ntoh_u8(recv->type); - buffer = gm_ntohp(recv->buffer); - length = (int) gm_ntohl(recv->length); + buffer = we->buffer; + snode = we->snode; + sport = we->sport; + type = we->type; + buffer = we->buffer; + length = we->length; gmnal_msghdr = (gmnal_msghdr_t*)buffer; portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE); @@ -281,13 +278,13 @@ gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) * A bad message is one we don't expect or can't interpret */ int -gmnal_rx_bad(gmnal_data_t *nal_data, gm_recv_t *recv, gmnal_srxd_t *srxd) +gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd) { CDEBUG(D_TRACE, "Can't handle message\n"); if (!srxd) srxd = gmnal_rxbuffer_to_srxd(nal_data, - gm_ntohp(recv->buffer)); + we->buffer); if (srxd) { gmnal_rx_requeue_buffer(nal_data, srxd); } else { diff --git a/lnet/klnds/gmlnd/gmlnd_utils.c b/lnet/klnds/gmlnd/gmlnd_utils.c index 84fc3a0..d40a943 100644 --- a/lnet/klnds/gmlnd/gmlnd_utils.c +++ b/lnet/klnds/gmlnd/gmlnd_utils.c @@ -192,7 +192,7 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block) { gmnal_stxd_t *txd = NULL; - pid_t pid = current->pid; + pid_t pid = current->pid; CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", @@ -825,8 +825,15 @@ gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov, } +/* + * extract info from the receive event. + * Have to do this before the next call to gm_receive + * Deal with all endian stuff here. + * Then stick work entry on list where rxthreads + * can get it to complete the receive + */ int -gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent) +gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv) { gmnal_rxtwe_t *we = NULL; @@ -837,7 +844,11 @@ gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent) CDEBUG(D_ERROR, "failed to malloc\n"); return(GMNAL_STATUS_FAIL); } - we->rx = rxevent; + we->buffer = gm_ntohp(recv->buffer); + we->snode = (int)gm_ntoh_u16(recv->sender_node_id); + we->sport = (int)gm_ntoh_u8(recv->sender_port_id); + we->type = (int)gm_ntoh_u8(recv->type); + we->length = (int)gm_ntohl(recv->length); spin_lock(&nal_data->rxtwe_lock); if (nal_data->rxtwe_tail) { diff --git a/lustre/portals/knals/gmnal/gmnal.h b/lustre/portals/knals/gmnal/gmnal.h index fdde839..2db6c9b 100644 --- a/lustre/portals/knals/gmnal/gmnal.h +++ b/lustre/portals/knals/gmnal/gmnal.h @@ -159,7 +159,11 @@ typedef struct _gmnal_msghdr { * transmit descriptors on the free list) */ typedef struct _gmnal_rxtwe { - gm_recv_event_t *rx; + void *buffer; + unsigned snode; + unsigned sport; + unsigned type; + unsigned length; struct _gmnal_rxtwe *next; } gmnal_rxtwe_t; @@ -408,10 +412,10 @@ int gmnal_start_kernel_threads(gmnal_data_t *); */ int gmnal_ct_thread(void *); /* caretaker thread */ int gmnal_rx_thread(void *); /* receive thread */ -int gmnal_pre_receive(gmnal_data_t*, gm_recv_t*, int); -int gmnal_rx_bad(gmnal_data_t *, gm_recv_t *, gmnal_srxd_t *); +int gmnal_pre_receive(gmnal_data_t*, gmnal_rxtwe_t*, int); +int gmnal_rx_bad(gmnal_data_t *, gmnal_rxtwe_t *, gmnal_srxd_t*); int gmnal_rx_requeue_buffer(gmnal_data_t *, gmnal_srxd_t *); -int gmnal_add_rxtwe(gmnal_data_t *, gm_recv_event_t *); +int gmnal_add_rxtwe(gmnal_data_t *, gm_recv_t *); gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_data_t *); void gmnal_remove_rxtwe(gmnal_data_t *); diff --git a/lustre/portals/knals/gmnal/gmnal_comm.c b/lustre/portals/knals/gmnal/gmnal_comm.c index 9e32145..772b9cc 100644 --- a/lustre/portals/knals/gmnal/gmnal_comm.c +++ b/lustre/portals/knals/gmnal/gmnal_comm.c @@ -38,6 +38,7 @@ gmnal_ct_thread(void *arg) { gmnal_data_t *nal_data; gm_recv_event_t *rxevent = NULL; + gm_recv_t *recv = NULL; if (!arg) { CDEBUG(D_TRACE, "NO nal_data. Exiting\n"); @@ -55,17 +56,18 @@ gmnal_ct_thread(void *arg) while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) { CDEBUG(D_NET, "waiting\n"); rxevent = gm_blocking_receive_no_spin(nal_data->gm_port); - CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) { CDEBUG(D_INFO, "time to exit\n"); break; } + CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); switch (GM_RECV_EVENT_TYPE(rxevent)) { case(GM_RECV_EVENT): CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n"); + recv = (gm_recv_t*)&rxevent->recv; GMNAL_GM_UNLOCK(nal_data); - gmnal_add_rxtwe(nal_data, rxevent); + gmnal_add_rxtwe(nal_data, recv); GMNAL_GM_LOCK(nal_data); CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n"); break; @@ -109,8 +111,6 @@ gmnal_ct_thread(void *arg) int gmnal_rx_thread(void *arg) { gmnal_data_t *nal_data; - gm_recv_event_t *rxevent = NULL; - gm_recv_t *recv = NULL; void *buffer; gmnal_rxtwe_t *we = NULL; @@ -142,29 +142,26 @@ int gmnal_rx_thread(void *arg) CDEBUG(D_INFO, "Receive thread time to exit\n"); break; } - rxevent = we->rx; - CDEBUG(D_INFO, "thread got [%s]\n", gmnal_rxevent(rxevent)); - recv = (gm_recv_t*)&(rxevent->recv); - buffer = gm_ntohp(recv->buffer); - PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); + buffer = we->buffer; switch(((gmnal_msghdr_t*)buffer)->type) { case(GMNAL_SMALL_MESSAGE): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE); break; case(GMNAL_LARGE_MESSAGE_INIT): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_LARGE_MESSAGE_INIT); break; case(GMNAL_LARGE_MESSAGE_ACK): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_LARGE_MESSAGE_ACK); break; default: CDEBUG(D_ERROR, "Unsupported message type\n"); - gmnal_rx_bad(nal_data, recv, NULL); + gmnal_rx_bad(nal_data, we, NULL); } + PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); } spin_lock(&nal_data->rxthread_flag_lock); @@ -185,7 +182,7 @@ int gmnal_rx_thread(void *arg) * Deal with all endian stuff here. */ int -gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type) +gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type) { gmnal_srxd_t *srxd = NULL; void *buffer = NULL; @@ -193,15 +190,15 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type) gmnal_msghdr_t *gmnal_msghdr; ptl_hdr_t *portals_hdr; - CDEBUG(D_INFO, "nal_data [%p], recv [%p] type [%d]\n", - nal_data, recv, gmnal_type); + CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", + nal_data, we, gmnal_type); - buffer = gm_ntohp(recv->buffer);; - snode = (int)gm_ntoh_u16(recv->sender_node_id); - sport = (int)gm_ntoh_u8(recv->sender_port_id); - type = (int)gm_ntoh_u8(recv->type); - buffer = gm_ntohp(recv->buffer); - length = (int) gm_ntohl(recv->length); + buffer = we->buffer; + snode = we->snode; + sport = we->sport; + type = we->type; + buffer = we->buffer; + length = we->length; gmnal_msghdr = (gmnal_msghdr_t*)buffer; portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE); @@ -281,13 +278,13 @@ gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) * A bad message is one we don't expect or can't interpret */ int -gmnal_rx_bad(gmnal_data_t *nal_data, gm_recv_t *recv, gmnal_srxd_t *srxd) +gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd) { CDEBUG(D_TRACE, "Can't handle message\n"); if (!srxd) srxd = gmnal_rxbuffer_to_srxd(nal_data, - gm_ntohp(recv->buffer)); + we->buffer); if (srxd) { gmnal_rx_requeue_buffer(nal_data, srxd); } else { diff --git a/lustre/portals/knals/gmnal/gmnal_utils.c b/lustre/portals/knals/gmnal/gmnal_utils.c index 84fc3a0..d40a943 100644 --- a/lustre/portals/knals/gmnal/gmnal_utils.c +++ b/lustre/portals/knals/gmnal/gmnal_utils.c @@ -192,7 +192,7 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block) { gmnal_stxd_t *txd = NULL; - pid_t pid = current->pid; + pid_t pid = current->pid; CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", @@ -825,8 +825,15 @@ gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov, } +/* + * extract info from the receive event. + * Have to do this before the next call to gm_receive + * Deal with all endian stuff here. + * Then stick work entry on list where rxthreads + * can get it to complete the receive + */ int -gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent) +gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv) { gmnal_rxtwe_t *we = NULL; @@ -837,7 +844,11 @@ gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent) CDEBUG(D_ERROR, "failed to malloc\n"); return(GMNAL_STATUS_FAIL); } - we->rx = rxevent; + we->buffer = gm_ntohp(recv->buffer); + we->snode = (int)gm_ntoh_u16(recv->sender_node_id); + we->sport = (int)gm_ntoh_u8(recv->sender_port_id); + we->type = (int)gm_ntoh_u8(recv->type); + we->length = (int)gm_ntohl(recv->length); spin_lock(&nal_data->rxtwe_lock); if (nal_data->rxtwe_tail) { -- 1.8.3.1