Whamcloud - gitweb
extract all required info from gm_recv_event_t before next call to gm_receive.
authormdoyle <mdoyle>
Tue, 7 Oct 2003 10:18:38 +0000 (10:18 +0000)
committermdoyle <mdoyle>
Tue, 7 Oct 2003 10:18:38 +0000 (10:18 +0000)
lnet/klnds/gmlnd/gmlnd.h
lnet/klnds/gmlnd/gmlnd_comm.c
lnet/klnds/gmlnd/gmlnd_utils.c
lustre/portals/knals/gmnal/gmnal.h
lustre/portals/knals/gmnal/gmnal_comm.c
lustre/portals/knals/gmnal/gmnal_utils.c

index fdde839..2db6c9b 100644 (file)
@@ -159,7 +159,11 @@ typedef struct     _gmnal_msghdr {
  *     transmit descriptors on the free list)
  */
 typedef struct _gmnal_rxtwe {
-       gm_recv_event_t *rx;
+       void                    *buffer;
+       unsigned                snode;
+       unsigned                sport;
+       unsigned                type;
+       unsigned                length;
        struct _gmnal_rxtwe     *next;
 } gmnal_rxtwe_t;
 
@@ -408,10 +412,10 @@ int               gmnal_start_kernel_threads(gmnal_data_t *);
  */
 int            gmnal_ct_thread(void *); /* caretaker thread */
 int            gmnal_rx_thread(void *); /* receive thread */
-int            gmnal_pre_receive(gmnal_data_t*, gm_recv_t*, int);
-int            gmnal_rx_bad(gmnal_data_t *, gm_recv_t *, gmnal_srxd_t *);
+int            gmnal_pre_receive(gmnal_data_t*, gmnal_rxtwe_t*, int);
+int            gmnal_rx_bad(gmnal_data_t *, gmnal_rxtwe_t *, gmnal_srxd_t*);
 int            gmnal_rx_requeue_buffer(gmnal_data_t *, gmnal_srxd_t *);
-int            gmnal_add_rxtwe(gmnal_data_t *, gm_recv_event_t *);
+int            gmnal_add_rxtwe(gmnal_data_t *, gm_recv_t *);
 gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_data_t *);
 void           gmnal_remove_rxtwe(gmnal_data_t *);
 
index 9e32145..772b9cc 100644 (file)
@@ -38,6 +38,7 @@ gmnal_ct_thread(void *arg)
 {
        gmnal_data_t            *nal_data;
        gm_recv_event_t         *rxevent = NULL;
+       gm_recv_t               *recv = NULL;
 
        if (!arg) {
                CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
@@ -55,17 +56,18 @@ gmnal_ct_thread(void *arg)
        while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
                CDEBUG(D_NET, "waiting\n");
                rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
-               CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
                if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
                        CDEBUG(D_INFO, "time to exit\n");
                        break;
                }
+               CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
                switch (GM_RECV_EVENT_TYPE(rxevent)) {
 
                        case(GM_RECV_EVENT):
                                CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
+                               recv = (gm_recv_t*)&rxevent->recv;
                                GMNAL_GM_UNLOCK(nal_data);
-                               gmnal_add_rxtwe(nal_data, rxevent);
+                               gmnal_add_rxtwe(nal_data, recv);
                                GMNAL_GM_LOCK(nal_data);
                                CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
                        break;
@@ -109,8 +111,6 @@ gmnal_ct_thread(void *arg)
 int gmnal_rx_thread(void *arg)
 {
        gmnal_data_t            *nal_data;
-       gm_recv_event_t         *rxevent = NULL;
-       gm_recv_t               *recv = NULL;
        void                    *buffer;
        gmnal_rxtwe_t           *we = NULL;
 
@@ -142,29 +142,26 @@ int gmnal_rx_thread(void *arg)
                        CDEBUG(D_INFO, "Receive thread time to exit\n");
                        break;
                }
-               rxevent = we->rx;
-               CDEBUG(D_INFO, "thread got [%s]\n", gmnal_rxevent(rxevent));
-               recv = (gm_recv_t*)&(rxevent->recv);
-               buffer = gm_ntohp(recv->buffer);
-               PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
 
+               buffer = we->buffer;
                switch(((gmnal_msghdr_t*)buffer)->type) {
                case(GMNAL_SMALL_MESSAGE):
-                       gmnal_pre_receive(nal_data, recv
+                       gmnal_pre_receive(nal_data, we
                                           GMNAL_SMALL_MESSAGE);
                break;  
                case(GMNAL_LARGE_MESSAGE_INIT):
-                       gmnal_pre_receive(nal_data, recv
+                       gmnal_pre_receive(nal_data, we
                                           GMNAL_LARGE_MESSAGE_INIT);
                break;  
                case(GMNAL_LARGE_MESSAGE_ACK):
-                       gmnal_pre_receive(nal_data, recv
+                       gmnal_pre_receive(nal_data, we
                                           GMNAL_LARGE_MESSAGE_ACK);
                break;  
                default:
                        CDEBUG(D_ERROR, "Unsupported message type\n");
-                       gmnal_rx_bad(nal_data, recv, NULL);
+                       gmnal_rx_bad(nal_data, we, NULL);
                }
+               PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
        }
 
        spin_lock(&nal_data->rxthread_flag_lock);
@@ -185,7 +182,7 @@ int gmnal_rx_thread(void *arg)
  *     Deal with all endian stuff here.
  */
 int
-gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type)
+gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
 {
        gmnal_srxd_t    *srxd = NULL;
        void            *buffer = NULL;
@@ -193,15 +190,15 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type)
        gmnal_msghdr_t  *gmnal_msghdr;
        ptl_hdr_t       *portals_hdr;
 
-       CDEBUG(D_INFO, "nal_data [%p], recv [%p] type [%d]\n", 
-              nal_data, recv, gmnal_type);
+       CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", 
+              nal_data, we, gmnal_type);
 
-       buffer = gm_ntohp(recv->buffer);;
-       snode = (int)gm_ntoh_u16(recv->sender_node_id);
-       sport = (int)gm_ntoh_u8(recv->sender_port_id);
-       type = (int)gm_ntoh_u8(recv->type);
-       buffer = gm_ntohp(recv->buffer);
-       length = (int) gm_ntohl(recv->length);
+       buffer = we->buffer;
+       snode = we->snode;
+       sport = we->sport;
+       type = we->type;
+       buffer = we->buffer;
+       length = we->length;
 
        gmnal_msghdr = (gmnal_msghdr_t*)buffer;
        portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
@@ -281,13 +278,13 @@ gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
  *     A bad message is one we don't expect or can't interpret
  */
 int
-gmnal_rx_bad(gmnal_data_t *nal_data, gm_recv_t *recv, gmnal_srxd_t *srxd)
+gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
 {
        CDEBUG(D_TRACE, "Can't handle message\n");
 
        if (!srxd)
                srxd = gmnal_rxbuffer_to_srxd(nal_data, 
-                                              gm_ntohp(recv->buffer));
+                                              we->buffer);
        if (srxd) {
                gmnal_rx_requeue_buffer(nal_data, srxd);
        } else {
index 84fc3a0..d40a943 100644 (file)
@@ -192,7 +192,7 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block)
 {
 
        gmnal_stxd_t    *txd = NULL;
-        pid_t           pid = current->pid;
+       pid_t           pid = current->pid;
 
 
        CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", 
@@ -825,8 +825,15 @@ gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov,
 
 }
 
+/* 
+ *     extract info from the receive event.
+ *     Have to do this before the next call to gm_receive
+ *     Deal with all endian stuff here.
+ *     Then stick work entry on list where rxthreads
+ *     can get it to complete the receive
+ */
 int
-gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent)
+gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv)
 {
        gmnal_rxtwe_t   *we = NULL;
 
@@ -837,7 +844,11 @@ gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent)
                CDEBUG(D_ERROR, "failed to malloc\n");
                return(GMNAL_STATUS_FAIL);
        }
-        we->rx = rxevent;
+       we->buffer = gm_ntohp(recv->buffer);
+       we->snode = (int)gm_ntoh_u16(recv->sender_node_id);
+       we->sport = (int)gm_ntoh_u8(recv->sender_port_id);
+       we->type = (int)gm_ntoh_u8(recv->type);
+       we->length = (int)gm_ntohl(recv->length);
 
        spin_lock(&nal_data->rxtwe_lock);
        if (nal_data->rxtwe_tail) {
index fdde839..2db6c9b 100644 (file)
@@ -159,7 +159,11 @@ typedef struct     _gmnal_msghdr {
  *     transmit descriptors on the free list)
  */
 typedef struct _gmnal_rxtwe {
-       gm_recv_event_t *rx;
+       void                    *buffer;
+       unsigned                snode;
+       unsigned                sport;
+       unsigned                type;
+       unsigned                length;
        struct _gmnal_rxtwe     *next;
 } gmnal_rxtwe_t;
 
@@ -408,10 +412,10 @@ int               gmnal_start_kernel_threads(gmnal_data_t *);
  */
 int            gmnal_ct_thread(void *); /* caretaker thread */
 int            gmnal_rx_thread(void *); /* receive thread */
-int            gmnal_pre_receive(gmnal_data_t*, gm_recv_t*, int);
-int            gmnal_rx_bad(gmnal_data_t *, gm_recv_t *, gmnal_srxd_t *);
+int            gmnal_pre_receive(gmnal_data_t*, gmnal_rxtwe_t*, int);
+int            gmnal_rx_bad(gmnal_data_t *, gmnal_rxtwe_t *, gmnal_srxd_t*);
 int            gmnal_rx_requeue_buffer(gmnal_data_t *, gmnal_srxd_t *);
-int            gmnal_add_rxtwe(gmnal_data_t *, gm_recv_event_t *);
+int            gmnal_add_rxtwe(gmnal_data_t *, gm_recv_t *);
 gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_data_t *);
 void           gmnal_remove_rxtwe(gmnal_data_t *);
 
index 9e32145..772b9cc 100644 (file)
@@ -38,6 +38,7 @@ gmnal_ct_thread(void *arg)
 {
        gmnal_data_t            *nal_data;
        gm_recv_event_t         *rxevent = NULL;
+       gm_recv_t               *recv = NULL;
 
        if (!arg) {
                CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
@@ -55,17 +56,18 @@ gmnal_ct_thread(void *arg)
        while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
                CDEBUG(D_NET, "waiting\n");
                rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
-               CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
                if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
                        CDEBUG(D_INFO, "time to exit\n");
                        break;
                }
+               CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
                switch (GM_RECV_EVENT_TYPE(rxevent)) {
 
                        case(GM_RECV_EVENT):
                                CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
+                               recv = (gm_recv_t*)&rxevent->recv;
                                GMNAL_GM_UNLOCK(nal_data);
-                               gmnal_add_rxtwe(nal_data, rxevent);
+                               gmnal_add_rxtwe(nal_data, recv);
                                GMNAL_GM_LOCK(nal_data);
                                CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
                        break;
@@ -109,8 +111,6 @@ gmnal_ct_thread(void *arg)
 int gmnal_rx_thread(void *arg)
 {
        gmnal_data_t            *nal_data;
-       gm_recv_event_t         *rxevent = NULL;
-       gm_recv_t               *recv = NULL;
        void                    *buffer;
        gmnal_rxtwe_t           *we = NULL;
 
@@ -142,29 +142,26 @@ int gmnal_rx_thread(void *arg)
                        CDEBUG(D_INFO, "Receive thread time to exit\n");
                        break;
                }
-               rxevent = we->rx;
-               CDEBUG(D_INFO, "thread got [%s]\n", gmnal_rxevent(rxevent));
-               recv = (gm_recv_t*)&(rxevent->recv);
-               buffer = gm_ntohp(recv->buffer);
-               PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
 
+               buffer = we->buffer;
                switch(((gmnal_msghdr_t*)buffer)->type) {
                case(GMNAL_SMALL_MESSAGE):
-                       gmnal_pre_receive(nal_data, recv
+                       gmnal_pre_receive(nal_data, we
                                           GMNAL_SMALL_MESSAGE);
                break;  
                case(GMNAL_LARGE_MESSAGE_INIT):
-                       gmnal_pre_receive(nal_data, recv
+                       gmnal_pre_receive(nal_data, we
                                           GMNAL_LARGE_MESSAGE_INIT);
                break;  
                case(GMNAL_LARGE_MESSAGE_ACK):
-                       gmnal_pre_receive(nal_data, recv
+                       gmnal_pre_receive(nal_data, we
                                           GMNAL_LARGE_MESSAGE_ACK);
                break;  
                default:
                        CDEBUG(D_ERROR, "Unsupported message type\n");
-                       gmnal_rx_bad(nal_data, recv, NULL);
+                       gmnal_rx_bad(nal_data, we, NULL);
                }
+               PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
        }
 
        spin_lock(&nal_data->rxthread_flag_lock);
@@ -185,7 +182,7 @@ int gmnal_rx_thread(void *arg)
  *     Deal with all endian stuff here.
  */
 int
-gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type)
+gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
 {
        gmnal_srxd_t    *srxd = NULL;
        void            *buffer = NULL;
@@ -193,15 +190,15 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type)
        gmnal_msghdr_t  *gmnal_msghdr;
        ptl_hdr_t       *portals_hdr;
 
-       CDEBUG(D_INFO, "nal_data [%p], recv [%p] type [%d]\n", 
-              nal_data, recv, gmnal_type);
+       CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", 
+              nal_data, we, gmnal_type);
 
-       buffer = gm_ntohp(recv->buffer);;
-       snode = (int)gm_ntoh_u16(recv->sender_node_id);
-       sport = (int)gm_ntoh_u8(recv->sender_port_id);
-       type = (int)gm_ntoh_u8(recv->type);
-       buffer = gm_ntohp(recv->buffer);
-       length = (int) gm_ntohl(recv->length);
+       buffer = we->buffer;
+       snode = we->snode;
+       sport = we->sport;
+       type = we->type;
+       buffer = we->buffer;
+       length = we->length;
 
        gmnal_msghdr = (gmnal_msghdr_t*)buffer;
        portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
@@ -281,13 +278,13 @@ gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
  *     A bad message is one we don't expect or can't interpret
  */
 int
-gmnal_rx_bad(gmnal_data_t *nal_data, gm_recv_t *recv, gmnal_srxd_t *srxd)
+gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
 {
        CDEBUG(D_TRACE, "Can't handle message\n");
 
        if (!srxd)
                srxd = gmnal_rxbuffer_to_srxd(nal_data, 
-                                              gm_ntohp(recv->buffer));
+                                              we->buffer);
        if (srxd) {
                gmnal_rx_requeue_buffer(nal_data, srxd);
        } else {
index 84fc3a0..d40a943 100644 (file)
@@ -192,7 +192,7 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block)
 {
 
        gmnal_stxd_t    *txd = NULL;
-        pid_t           pid = current->pid;
+       pid_t           pid = current->pid;
 
 
        CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", 
@@ -825,8 +825,15 @@ gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov,
 
 }
 
+/* 
+ *     extract info from the receive event.
+ *     Have to do this before the next call to gm_receive
+ *     Deal with all endian stuff here.
+ *     Then stick work entry on list where rxthreads
+ *     can get it to complete the receive
+ */
 int
-gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent)
+gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv)
 {
        gmnal_rxtwe_t   *we = NULL;
 
@@ -837,7 +844,11 @@ gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent)
                CDEBUG(D_ERROR, "failed to malloc\n");
                return(GMNAL_STATUS_FAIL);
        }
-        we->rx = rxevent;
+       we->buffer = gm_ntohp(recv->buffer);
+       we->snode = (int)gm_ntoh_u16(recv->sender_node_id);
+       we->sport = (int)gm_ntoh_u8(recv->sender_port_id);
+       we->type = (int)gm_ntoh_u8(recv->type);
+       we->length = (int)gm_ntohl(recv->length);
 
        spin_lock(&nal_data->rxtwe_lock);
        if (nal_data->rxtwe_tail) {