* cleaned up startup/shutdown handling
author eeb <eeb>
Fri, 19 Aug 2005 17:11:42 +0000 (17:11 +0000)
committer eeb <eeb>
Fri, 19 Aug 2005 17:11:42 +0000 (17:11 +0000)
* queue rx descriptors for handling by thread directly

lnet/klnds/gmlnd/gmlnd.h
lnet/klnds/gmlnd/gmlnd_api.c
lnet/klnds/gmlnd/gmlnd_cb.c
lnet/klnds/gmlnd/gmlnd_comm.c
lnet/klnds/gmlnd/gmlnd_module.c
lnet/klnds/gmlnd/gmlnd_utils.c
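
The second commit-message bullet covers the new rx path: instead of copying each receive event into a gmnal_rxtwe_t work entry, the caretaker thread now queues the rx descriptor itself on gmni_rxq under gmni_rxq_lock and wakes an rx thread through gmni_rxq_wait (see gmnal_enqueue_rx()/gmnal_dequeue_rx() in the header diff below). The following is a minimal userspace sketch of that producer/consumer pattern, not the kernel code: a pthread mutex and a POSIX semaphore stand in for spinlock_t and struct semaphore, and the rxq_* names are illustrative only.

/* Hedged userspace sketch of the rx-descriptor queue: the caretaker
 * (producer) enqueues descriptors, rx threads (consumers) block on the
 * semaphore and dequeue them.  Compile with -pthread. */
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

struct rx {                             /* stand-in for gmnal_rx_t */
        struct rx       *rx_next;
        int              rx_nob;        /* bytes received */
};

static struct rx        *rxq_head;      /* FIFO of queued descriptors */
static struct rx        *rxq_tail;
static pthread_mutex_t   rxq_lock = PTHREAD_MUTEX_INITIALIZER;
static sem_t             rxq_wait;      /* counts queued descriptors */

/* Caretaker side: called once per GM receive event. */
static void enqueue_rx(struct rx *rx)
{
        rx->rx_next = NULL;
        pthread_mutex_lock(&rxq_lock);
        if (rxq_tail == NULL)
                rxq_head = rx;
        else
                rxq_tail->rx_next = rx;
        rxq_tail = rx;
        pthread_mutex_unlock(&rxq_lock);
        sem_post(&rxq_wait);            /* wake one rx thread */
}

/* Rx-thread side: blocks until a descriptor is queued; NULL means the
 * semaphore was posted for shutdown with nothing queued. */
static struct rx *dequeue_rx(void)
{
        struct rx *rx;

        sem_wait(&rxq_wait);
        pthread_mutex_lock(&rxq_lock);
        rx = rxq_head;
        if (rx != NULL) {
                rxq_head = rx->rx_next;
                if (rxq_head == NULL)
                        rxq_tail = NULL;
        }
        pthread_mutex_unlock(&rxq_lock);
        return rx;
}

int main(void)
{
        struct rx r = { .rx_nob = 128 };

        sem_init(&rxq_wait, 0, 0);
        enqueue_rx(&r);
        printf("dequeued rx with %d bytes\n", dequeue_rx()->rx_nob);
        return 0;
}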

lnet/klnds/gmlnd/gmlnd.h
index fe39506..e2dad13 100644 (file)
@@ -21,9 +21,9 @@
 
 
 /*
- *     Portals GM kernel NAL header file
- *     This file makes all declaration and prototypes 
- *     for the API side and CB side of the NAL
+ *      Portals GM kernel NAL header file
+ *      This file contains all declarations and prototypes
+ *      for the API side and CB side of the NAL
  */
 #ifndef __INCLUDE_GMNAL_H__
 #define __INCLUDE_GMNAL_H__
 #include "gm.h"
 #include "gm_internal.h"
 
-
-
 /*
- *     Defines for the API NAL
+ *      Defines for the API NAL
  */
 
-/*
- *     Small message size is configurable
- *     insmod can set small_msg_size
- *     which is used to populate nal_data.small_msg_size
- */
-#define GMNAL_MAGIC                    0x1234abcd
+/* Wire protocol */
 
-#define GMNAL_SMALL_MESSAGE            1078
+typedef struct {
+        ptl_hdr_t       gmim_hdr;               /* portals header */
+        char            gmim_payload[0];        /* payload */
+} gmnal_immediate_msg_t;
 
-extern  int num_rx_threads;
-extern  int num_stxds;
-extern  int gm_port_id;
+typedef struct {
+        /* First 2 fields fixed FOR ALL TIME */
+        __u32           gmm_magic;              /* I'm a GM message */
+        __u16           gmm_version;            /* this is my version number */
 
-/*
- *     Small Transmit Descriptor
- *     A structre to keep track of a small transmit operation
- *     This structure has a one-to-one relationship with a small
- *     transmit buffer (both create by gmnal_stxd_alloc). 
- *     There are two free list of stxd. One for use by clients of the NAL
- *     and the other by the NAL rxthreads when doing sends. 
- *     This helps prevent deadlock caused by stxd starvation.
- */
-typedef struct gmnal_stxd {
-       struct gmnal_stxd       *tx_next;
-       void                    *tx_buffer;
-       int                      tx_buffer_size;
-       gm_size_t                tx_gm_size;
-       int                      tx_msg_size;
-       int                      tx_gmlid;
-       int                      tx_gm_priority;
-       int                      tx_type;
+        __u16           gmm_type;               /* msg type */
+        __u64           gmm_srcnid;             /* sender's NID */
+        __u64           gmm_dstnid;             /* destination's NID */
+        union {
+                gmnal_immediate_msg_t   immediate;
+        }               gmm_u;
+} WIRE_ATTR gmnal_msg_t;
+
+#define GMNAL_MSG_MAGIC                 0x6d797269 /* 'myri'! */
+#define GMNAL_MSG_VERSION               1
+#define GMNAL_MSG_IMMEDIATE             1
+
+typedef struct gmnal_tx {
+        struct gmnal_tx         *tx_next;
+        gmnal_msg_t             *tx_msg;
+        int                      tx_buffer_size;
+        gm_size_t                tx_gm_size;
+        int                      tx_msg_size;
+        int                      tx_gmlid;
+        int                      tx_gm_priority;
         ptl_nid_t                tx_nid;
-       struct gmnal_ni         *tx_gmni;
-       lib_msg_t               *tx_cookie;
-       int                      tx_niov;
+        struct gmnal_ni         *tx_gmni;
+        lib_msg_t               *tx_libmsg;
         int                      tx_rxt; 
-        int                      tx_kniov;
-        struct iovec            *tx_iovec_dup;
-       struct iovec             tx_iov[PTL_MD_MAX_IOV];
-} gmnal_stxd_t;
+} gmnal_tx_t;
 
 /*
- *     as for gmnal_stxd_t 
- *     a hash table in nal_data find srxds from
- *     the rx buffer address. hash table populated at init time
+ *      as for gmnal_tx_t,
+ *      a hash table in nal_data finds rxs from
+ *      the rx buffer address; hash table populated at init time
  */
-typedef struct gmnal_srxd {
-       void                    *rx_buffer;
-       int                      rx_size;
-       gm_size_t                rx_gmsize;
-       unsigned int             rx_sender_gmid;
-       __u64                    rx_source_stxd;
-       int                      rx_type;
-       int                      rx_nsiov;
-       int                      rx_nriov;
-       struct iovec            *rx_riov;
-       int                      rx_ncallbacks;
-       spinlock_t               rx_callback_lock;
-       int                      rx_callback_status;
-       lib_msg_t               *rx_cookie;
-       struct gmnal_srxd       *rx_next;
-       struct gmnal_ni         *rx_gmni;
-} gmnal_srxd_t;
+typedef struct gmnal_rx {
+        struct list_head         rx_list;
+        gmnal_msg_t             *rx_msg;
+        int                      rx_size;
+        gm_size_t                rx_gmsize;
+        unsigned int             rx_recv_nob;
+        __u16                    rx_recv_gmid;
+        __u8                     rx_recv_port;
+        __u8                     rx_recv_type;
+        struct gmnal_rx         *rx_next;
+} gmnal_rx_t;
 
-/*
- *     Header which lmgnal puts at the start of each message
- *     watch alignment for ia32/64 interaction
- */
-typedef struct gmnal_msghdr {
-       __s32           gmm_magic;
-       __s32           gmm_type;
-       __s32           gmm_niov;
-       __u32           gmm_sender_gmid;
-       __u64           gmm_stxd_remote_ptr;
-} WIRE_ATTR gmnal_msghdr_t;
-
-/*
- *     the caretaker thread (ct_thread) gets receive events
- *     (and other events) from the myrinet device via the GM2 API.
- *     caretaker thread populates one work entry for each receive event,
- *     puts it on a Q in nal_data and wakes a receive thread to  
- *     process the receive.  
- *     Processing a portals receive can involve a transmit operation. 
- *     Because of this the caretaker thread cannot process receives 
- *     as it may get deadlocked when supply of transmit descriptors 
- *     is exhausted (as caretaker thread is responsible for replacing 
- *     transmit descriptors on the free list)
- */
-typedef struct gmnal_rxtwe {
-       void                    *buffer;
-       unsigned                snode;
-       unsigned                sport;
-       unsigned                type;
-       unsigned                length;
-       struct gmnal_rxtwe      *next;
-} gmnal_rxtwe_t;
 
 /*
- *     1 receive thread started on each CPU
+ *      1 receive thread started on each CPU
  */
 #define NRXTHREADS 10 /* max number of receiver threads */
 
 typedef struct gmnal_ni {
-       spinlock_t       gmni_stxd_lock;
-       struct semaphore gmni_stxd_token;
-       gmnal_stxd_t    *gmni_stxd;
-       spinlock_t       gmni_rxt_stxd_lock;
-       struct semaphore gmni_rxt_stxd_token;
-       gmnal_stxd_t    *gmni_rxt_stxd;
-       gmnal_srxd_t    *gmni_srxd;
-       struct gm_hash  *gmni_srxd_hash;
-       nal_t           *gmni_nal;      
-       lib_nal_t       *gmni_libnal;
-       struct gm_port  *gmni_port;
-       __u32            gmni_local_gmid;
-       __u32            gmni_global_gmid;
-       spinlock_t       gmni_gm_lock;          /* serialise GM calls */
-       long             gmni_rxthread_pid[NRXTHREADS];
-       int              gmni_rxthread_stop_flag;
-       spinlock_t       gmni_rxthread_flag_lock;
-       long             gmni_rxthread_flag;
-       long             gmni_ctthread_pid;
-       int              gmni_ctthread_flag;
-       gm_alarm_t       gmni_ctthread_alarm;
-       int              gmni_small_msg_size;
-       int              gmni_small_msg_gmsize;
-       gmnal_rxtwe_t   *gmni_rxtwe_head;
-       gmnal_rxtwe_t   *gmni_rxtwe_tail;
-       spinlock_t       gmni_rxtwe_lock;
-       struct semaphore gmni_rxtwe_wait;
+        spinlock_t       gmni_tx_lock;
+        struct semaphore gmni_tx_token;
+        gmnal_tx_t      *gmni_tx;
+        spinlock_t       gmni_rxt_tx_lock;
+        struct semaphore gmni_rxt_tx_token;
+        gmnal_tx_t      *gmni_rxt_tx;
+        gmnal_rx_t      *gmni_rx;
+        struct gm_hash  *gmni_rx_hash;
+        lib_nal_t       *gmni_libnal;
+        struct gm_port  *gmni_port;
+        spinlock_t       gmni_gm_lock;          /* serialise GM calls */
+        long             gmni_rxthread_pid[NRXTHREADS];
+        int              gmni_rxthread_stop_flag;
+        spinlock_t       gmni_rxthread_flag_lock;
+        long             gmni_rxthread_flag;
+        long             gmni_ctthread_pid;
+        int              gmni_ctthread_flag;
+        gm_alarm_t       gmni_ctthread_alarm;
+        int              gmni_msg_size;
+        struct list_head gmni_rxq;
+        spinlock_t       gmni_rxq_lock;
+        struct semaphore gmni_rxq_wait;
 } gmnal_ni_t;
 
 /*
- *     Flags to start/stop and check status of threads
- *     each rxthread sets 1 bit (any bit) of the flag on startup
- *     and clears 1 bit when exiting
+ *      Flags to start/stop and check status of threads
+ *      each rxthread sets 1 bit (any bit) of the flag on startup
+ *      and clears 1 bit when exiting
  */
-#define GMNAL_THREAD_RESET     0
-#define GMNAL_THREAD_STOP      666
-#define GMNAL_CTTHREAD_STARTED 333
+#define GMNAL_THREAD_RESET      0
+#define GMNAL_THREAD_STOP       666
+#define GMNAL_CTTHREAD_STARTED  333
 #define GMNAL_RXTHREADS_STARTED ( (1<<num_rx_threads)-1)
 
 
 /*
  * for ioctl get pid
  */
-#define GMNAL_IOC_GET_GNID 1   
-
-/*
- *     FUNCTION PROTOTYPES
- */
-
-/*
- *     API NAL
- */
-int gmnal_api_startup(nal_t *, ptl_pid_t, 
-                      ptl_ni_limits_t *, ptl_ni_limits_t *);
-
-int gmnal_api_forward(nal_t *, int, void *, size_t, void *, size_t);
-
-void gmnal_api_shutdown(nal_t *);
-
-int gmnal_api_validate(nal_t *, void *, size_t);
-
-void gmnal_api_yield(nal_t *, unsigned long *, int);
-
-void gmnal_api_lock(nal_t *, unsigned long *);
-
-void gmnal_api_unlock(nal_t *, unsigned long *);
-
-
-/*
- *     CB NAL
- */
-
-ptl_err_t gmnal_cb_send(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
-       int, ptl_nid_t, ptl_pid_t, unsigned int, struct iovec *, size_t, size_t);
-
-ptl_err_t gmnal_cb_send_pages(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
-       int, ptl_nid_t, ptl_pid_t, unsigned int, ptl_kiov_t *, size_t, size_t);
-
-ptl_err_t gmnal_cb_recv(lib_nal_t *, void *, lib_msg_t *, 
-       unsigned int, struct iovec *, size_t, size_t, size_t);
-
-ptl_err_t gmnal_cb_recv_pages(lib_nal_t *, void *, lib_msg_t *, 
-       unsigned int, ptl_kiov_t *, size_t, size_t, size_t);
+#define GMNAL_IOC_GET_GNID 1    
 
-int gmnal_cb_dist(lib_nal_t *, ptl_nid_t, unsigned long *);
 
+/* gmnal_api.c */
 int gmnal_init(void);
-
 void  gmnal_fini(void);
 
-
-/*
- *     Small and Large Transmit and Receive Descriptor Functions
- */
-int            gmnal_alloc_txd(gmnal_ni_t *);
-void           gmnal_free_txd(gmnal_ni_t *);
-gmnal_stxd_t*  gmnal_get_stxd(gmnal_ni_t *, int);
-void           gmnal_return_stxd(gmnal_ni_t *, gmnal_stxd_t *);
-
-int            gmnal_alloc_srxd(gmnal_ni_t *);
-void           gmnal_free_srxd(gmnal_ni_t *);
-
-/*
- *     general utility functions
- */
-gmnal_srxd_t   *gmnal_rxbuffer_to_srxd(gmnal_ni_t *, void*);
-void           gmnal_stop_rxthread(gmnal_ni_t *);
-void           gmnal_stop_ctthread(gmnal_ni_t *);
-void           gmnal_drop_sends_callback(gm_port_t *, void *, gm_status_t);
-void           gmnal_resume_sending_callback(gm_port_t *, void *, gm_status_t);
-char           *gmnal_gm_error(gm_status_t);
-char           *gmnal_rxevent(gm_recv_event_t*);
-void           gmnal_yield(int);
-int            gmnal_start_kernel_threads(gmnal_ni_t *);
-
-
-/*
- *     Communication functions
- */
-
-/*
- *     Receive threads
- */
-int            gmnal_ct_thread(void *); /* caretaker thread */
-int            gmnal_rx_thread(void *); /* receive thread */
-void           gmnal_pre_receive(gmnal_ni_t*, gmnal_rxtwe_t*, int);
-void           gmnal_rx_bad(gmnal_ni_t *, gmnal_rxtwe_t *);
-void           gmnal_rx_requeue_buffer(gmnal_ni_t *, gmnal_srxd_t *);
-int            gmnal_add_rxtwe(gmnal_ni_t *, gm_recv_t *);
-gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_ni_t *);
-void           gmnal_remove_rxtwe(gmnal_ni_t *);
-
-
-/*
- *     Small messages
- */
-ptl_err_t       gmnal_small_tx(lib_nal_t *libnal, void *private, 
-                               lib_msg_t *cookie, ptl_hdr_t *hdr, 
-                               int type, ptl_nid_t nid, 
-                               gmnal_stxd_t *stxd, int size);
-void           gmnal_small_tx_callback(gm_port_t *, void *, gm_status_t);
+/* gmnal_cb.c */
+ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, 
+                        lib_msg_t *libmsg,
+                        unsigned int niov, struct iovec *iov, 
+                        size_t offset, size_t mlen, size_t rlen);
+ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, 
+                              lib_msg_t *libmsg, 
+                              unsigned int nkiov, ptl_kiov_t *kiov, 
+                              size_t offset, size_t mlen, size_t rlen);
+ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, 
+                        lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, 
+                        ptl_nid_t nid, ptl_pid_t pid,
+                        unsigned int niov, struct iovec *iov, 
+                        size_t offset, size_t len);
+ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
+                              lib_msg_t *libmsg, ptl_hdr_t *hdr, int type,
+                              ptl_nid_t nid, ptl_pid_t pid, 
+                              unsigned int nkiov, ptl_kiov_t *kiov, 
+                              size_t offset, size_t len);
+int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist);
+
+/* gmnal_util.c */
+int gmnal_is_rxthread(gmnal_ni_t *gmnalni);
+int gmnal_alloc_txs(gmnal_ni_t *gmnalni);
+void gmnal_free_txs(gmnal_ni_t *gmnalni);
+gmnal_tx_t *gmnal_get_tx(gmnal_ni_t *gmnalni, int block);
+void gmnal_return_tx(gmnal_ni_t *gmnalni, gmnal_tx_t *tx);
+int gmnal_alloc_rxs(gmnal_ni_t *gmnalni);
+void gmnal_free_rxs(gmnal_ni_t *gmnalni);
+void gmnal_stop_rxthread(gmnal_ni_t *gmnalni);
+void gmnal_stop_ctthread(gmnal_ni_t *gmnalni);
+char *gmnal_gmstatus2str(gm_status_t status);
+char *gmnal_rxevent2str(gm_recv_event_t *ev);
+void gmnal_yield(int delay);
+int gmnal_enqueue_rx(gmnal_ni_t *gmnalni, gm_recv_t *recv);
+gmnal_rx_t *gmnal_dequeue_rx(gmnal_ni_t *gmnalni);
+int gmnal_start_kernel_threads(gmnal_ni_t *gmnalni);
+
+/* gmnal_comm.c */
+void gmnal_pack_msg(gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
+                    ptl_nid_t dstnid, int type);
+int gmnal_ct_thread(void *arg);
+int gmnal_rx_thread(void *arg);
+void gmnal_post_rx(gmnal_ni_t *gmnalni, gmnal_rx_t *rx);
+ptl_err_t gmnal_post_tx(gmnal_ni_t *gmnalni, gmnal_tx_t *tx, 
+                        lib_msg_t *libmsg, ptl_nid_t nid, int nob);
+
+
+/* Module Parameters */
+extern  int num_rx_threads;
+extern  int num_txds;
+extern  int gm_port_id;
 
 #endif /*__INCLUDE_GMNAL_H__*/
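
The wire-protocol declarations above size an immediate message with offsetof() over the zero-length gmim_payload[] array, which is also how gmnal_api_startup() below computes gmni_msg_size (replacing the old hand-summed small_msg_size with its "928 !!" fudge). Here is a small standalone sketch of that idiom, with simplified stand-in types rather than the real ptl_hdr_t/WIRE_ATTR definitions; like the header it relies on the GNU zero-length-array extension, so it assumes gcc.

/* Hedged sketch: offsetof() over a zero-length trailing array gives the
 * exact on-wire size for a given payload length.  Types are stand-ins. */
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef struct {
        uint64_t        hdr[9];         /* stand-in for ptl_hdr_t      */
        char            payload[0];     /* payload follows the header  */
} immediate_msg_t;

typedef struct {
        uint32_t        magic;          /* first 2 fields fixed for all time */
        uint16_t        version;
        uint16_t        type;
        uint64_t        srcnid;
        uint64_t        dstnid;
        union {
                immediate_msg_t immediate;
        }               u;
} msg_t;

/* bytes to send/receive for a message carrying 'nob' payload bytes */
#define MSG_SIZE(nob)   offsetof(msg_t, u.immediate.payload[nob])

int main(void)
{
        printf("header-only message: %zu bytes\n", MSG_SIZE(0));
        printf("1024-byte payload  : %zu bytes\n", MSG_SIZE(1024));
        return 0;
}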
lnet/klnds/gmlnd/gmlnd_api.c
index 105be90..c721ddd 100644 (file)
 
 #include "gmnal.h"
 
-ptl_handle_ni_t kgmnal_ni;
+int
+gmnal_cmd(struct portals_cfg *pcfg, void *private)
+{
+       gmnal_ni_t      *gmnalni = private;
+       char            *name;
+       int              nid;
+       int              gmid;
+       gm_status_t      gm_status;
+
+       CDEBUG(D_TRACE, "gmnal_cmd [%d] private [%p]\n",
+              pcfg->pcfg_command, private);
+       gmnalni = (gmnal_ni_t*)private;
+
+       switch(pcfg->pcfg_command) {
+       case GMNAL_IOC_GET_GNID:
+
+               PORTAL_ALLOC(name, pcfg->pcfg_plen1);
+               copy_from_user(name, PCFG_PBUF(pcfg, 1), pcfg->pcfg_plen1);
+
+               spin_lock(&gmnalni->gmni_gm_lock);
+                gm_status = gm_host_name_to_node_id_ex(gmnalni->gmni_port, 0,
+                                                       name, &nid);
+               spin_unlock(&gmnalni->gmni_gm_lock);
+                if (gm_status != GM_SUCCESS) {
+                        CDEBUG(D_NET, "gm_host_name_to_node_id_ex(...host %s) "
+                               "failed[%d]\n", name, gm_status);
+                        return -ENOENT;
+                }
+
+                CDEBUG(D_NET, "Local node %s id is [%d]\n", name, nid);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_status = gm_node_id_to_global_id(gmnalni->gmni_port,
+                                                   nid, &gmid);
+               spin_unlock(&gmnalni->gmni_gm_lock);
+               if (gm_status != GM_SUCCESS) {
+                       CDEBUG(D_NET, "gm_node_id_to_global_id failed[%d]\n",
+                              gm_status);
+                       return -ENOENT;
+               }
+
+               CDEBUG(D_NET, "Global node id is [%u][%x]\n", gmid, gmid);
+               copy_to_user(PCFG_PBUF(pcfg, 2), &gmid, pcfg->pcfg_plen2);
+                return 0;
+
+       case NAL_CMD_REGISTER_MYNID:
+                /* Same NID OK */
+                if (pcfg->pcfg_nid == gmnalni->gmni_libnal->libnal_ni.ni_pid.nid)
+                        return 0;
+
+                CERROR("Can't change NID from "LPD64" to "LPD64"\n",
+                       gmnalni->gmni_libnal->libnal_ni.ni_pid.nid,
+                       pcfg->pcfg_nid);
+                return -EINVAL;
+
+       default:
+               CERROR ("gmnal_cmd UNKNOWN[%d]\n", pcfg->pcfg_command);
+               return -EINVAL;
+       }
+        /* not reached */
+}
+
+ptl_nid_t
+gmnal_get_local_nid (gmnal_ni_t *gmnalni)
+{
+       unsigned int     local_gmid;
+        unsigned int     global_gmid;
+        ptl_nid_t        nid;
+        gm_status_t      gm_status;
+
+        /* Called before anything initialised: no need to lock */
+
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_status = gm_get_node_id(gmnalni->gmni_port, &local_gmid);
+       spin_unlock(&gmnalni->gmni_gm_lock);
+       if (gm_status != GM_SUCCESS)
+               return PTL_NID_ANY;
+
+       CDEBUG(D_NET, "Local node id is [%u]\n", local_gmid);
+        
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_status = gm_node_id_to_global_id(gmnalni->gmni_port, 
+                                            local_gmid, 
+                                           &global_gmid);
+       spin_unlock(&gmnalni->gmni_gm_lock);
+       if (gm_status != GM_SUCCESS)
+               return PTL_NID_ANY;
+        
+       CDEBUG(D_NET, "Global node id is [%u]\n", global_gmid);
+
+        nid = (__u64)global_gmid;
+        LASSERT (nid != PTL_NID_ANY);
+        
+        return global_gmid;
+}
 
-extern int gmnal_cmd(struct portals_cfg *pcfg, void *private);
 
-/*
- *     gmnal_api_shutdown
- *      nal_refct == 0 => called on last matching PtlNIFini()
- *     Close down this interface and free any resources associated with it
- *     nal_t   nal     our nal to shutdown
- */
 void
 gmnal_api_shutdown(nal_t *nal)
 {
-       gmnal_ni_t      *gmnalni;
-       lib_nal_t       *libnal;
+       lib_nal_t       *libnal = nal->nal_data;
+       gmnal_ni_t      *gmnalni = libnal->libnal_data;
 
         if (nal->nal_refct != 0) {
                 /* This module got the first ref */
@@ -47,32 +133,28 @@ gmnal_api_shutdown(nal_t *nal)
                 return;
         }
 
-        libnal = (lib_nal_t *)nal->nal_data;
-        gmnalni = (gmnal_ni_t *)libnal->libnal_data;
        CDEBUG(D_TRACE, "gmnal_api_shutdown: gmnalni [%p]\n", gmnalni);
 
         /* Stop portals calling our ioctl handler */
         libcfs_nal_cmd_unregister(GMNAL);
 
-        /* XXX for shutdown "under fire" we probably need to set a shutdown
-         * flag so when lib calls us we fail immediately and dont queue any
-         * more work but our threads can still call into lib OK.  THEN
-         * shutdown our threads, THEN lib_fini() */
-        lib_fini(libnal);
-
-       gmnal_stop_rxthread(gmnalni);
+        /* stop processing messages */
        gmnal_stop_ctthread(gmnalni);
-       gmnal_free_txd(gmnalni);
-       gmnal_free_srxd(gmnalni);
+       gmnal_stop_rxthread(gmnalni);
+
        spin_lock(&gmnalni->gmni_gm_lock);
        gm_close(gmnalni->gmni_port);
        gm_finalize();
        spin_unlock(&gmnalni->gmni_gm_lock);
-        /* Don't free 'nal'; it's a static struct */
-       PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
-       PORTAL_FREE(libnal, sizeof(lib_nal_t));
-}
 
+        lib_fini(libnal);
+
+       gmnal_free_txs(gmnalni);
+       gmnal_free_rxs(gmnalni);
+
+       PORTAL_FREE(gmnalni, sizeof(*gmnalni));
+       PORTAL_FREE(libnal, sizeof(*libnal));
+}
 
 int
 gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
@@ -82,10 +164,10 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
 
        lib_nal_t       *libnal = NULL;
        gmnal_ni_t      *gmnalni = NULL;
-       gmnal_srxd_t    *srxd = NULL;
-       gm_status_t     gm_status;
-       unsigned int    local_gmid = 0, global_gmid = 0;
+       gmnal_rx_t      *rx = NULL;
+       gm_status_t      gm_status;
         ptl_process_id_t process_id;
+        int              rc;
 
         if (nal->nal_refct != 0) {
                 if (actual_limits != NULL) {
@@ -93,49 +175,36 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                         *actual_limits = libnal->libnal_ni.ni_actual_limits;
                 }
                 PORTAL_MODULE_USE;
-                return (PTL_OK);
+                return PTL_OK;
         }
 
         /* Called on first PtlNIInit() */
-
        CDEBUG(D_TRACE, "startup\n");
 
-       PORTAL_ALLOC(gmnalni, sizeof(gmnal_ni_t));
-       if (!gmnalni) {
-               CERROR("can't get memory\n");
-               return(PTL_NO_SPACE);
+       PORTAL_ALLOC(gmnalni, sizeof(*gmnalni));
+       if (gmnalni == NULL) {
+               CERROR("can't allocate gmnalni\n");
+                return PTL_FAIL;
+        }
+        
+       PORTAL_ALLOC(libnal, sizeof(*libnal));
+       if (libnal == NULL) {
+               CERROR("can't allocate lib_nal\n");
+                goto failed_0;
        }       
-       memset(gmnalni, 0, sizeof(gmnal_ni_t));
-       /*
-        *      set the small message buffer size 
-        */
-
-       CDEBUG(D_NET, "Allocd and reset gmnalni[%p]\n", gmnalni);
-       CDEBUG(D_NET, "small_msg_size is [%d]\n", gmnalni->gmni_small_msg_size);
 
-       PORTAL_ALLOC(libnal, sizeof(lib_nal_t));
-       if (!libnal) {
-               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
-               return(PTL_NO_SPACE);
-       }
-
-       memset(libnal, 0, sizeof(lib_nal_t));
-        libnal->libnal_send = gmnal_cb_send;
-        libnal->libnal_send_pages = gmnal_cb_send_pages;
-        libnal->libnal_recv = gmnal_cb_recv;
-        libnal->libnal_recv_pages = gmnal_cb_recv_pages;
-        libnal->libnal_map = NULL;
-        libnal->libnal_unmap = NULL;
-        libnal->libnal_dist = gmnal_cb_dist;
-        libnal->libnal_data = gmnalni;
-
-       CDEBUG(D_NET, "Allocd and reset libnal[%p]\n", libnal);
-
-       gmnalni->gmni_nal = nal;
+       memset(gmnalni, 0, sizeof(*gmnalni));
        gmnalni->gmni_libnal = libnal;
-
        spin_lock_init(&gmnalni->gmni_gm_lock);
 
+        *libnal = (lib_nal_t) {
+                .libnal_send       = gmnal_cb_send,
+                .libnal_send_pages = gmnal_cb_send_pages,
+                .libnal_recv       = gmnal_cb_recv,
+                .libnal_recv_pages = gmnal_cb_recv_pages,
+                .libnal_dist       = gmnal_cb_dist,
+                .libnal_data       = gmnalni,
+        };
 
        /*
         *      initialise the interface,
@@ -143,12 +212,9 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
        CDEBUG(D_NET, "Calling gm_init\n");
        if (gm_init() != GM_SUCCESS) {
                CERROR("call to gm_init failed\n");
-               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
-               PORTAL_FREE(libnal, sizeof(lib_nal_t));
-               return(PTL_FAIL);
+                goto failed_1;
        }
 
-
        CDEBUG(D_NET, "Calling gm_open with port [%d], "
               "name [%s], version [%d]\n", gm_port_id,
               "gmnal", GM_API_VERSION);
@@ -158,202 +224,94 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                            GM_API_VERSION);
        spin_unlock(&gmnalni->gmni_gm_lock);
 
-       CDEBUG(D_NET, "gm_open returned [%d]\n", gm_status);
-       if (gm_status == GM_SUCCESS) {
-               CDEBUG(D_NET,"gm_open succeeded port[%p]\n",gmnalni->gmni_port);
-       } else {
-               switch(gm_status) {
-               case(GM_INVALID_PARAMETER):
-                       CERROR("gm_open Failure. Invalid Parameter\n");
-                       break;
-               case(GM_BUSY):
-                       CERROR("gm_open Failure. GM Busy\n");
-                       break;
-               case(GM_NO_SUCH_DEVICE):
-                       CERROR("gm_open Failure. No such device\n");
-                       break;
-               case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
-                       CERROR("gm_open Failure. Incompatile lib and driver\n");
-                       break;
-               case(GM_OUT_OF_MEMORY):
-                       CERROR("gm_open Failure. Out of Memory\n");
-                       break;
-               default:
-                       CERROR("gm_open Failure. Unknow error code [%d]\n",
-                               gm_status);
-                       break;
-               }       
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_finalize();
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
-               PORTAL_FREE(libnal, sizeof(lib_nal_t));
-               return(PTL_FAIL);
+        if (gm_status != GM_SUCCESS) {
+                CERROR("Can't open GM port %d: %d (%s)\n",
+                       gm_port_id, gm_status, gmnal_gmstatus2str(gm_status));
+                goto failed_2;
        }
 
-       gmnalni->gmni_small_msg_size = sizeof(gmnal_msghdr_t) + 
-                                        sizeof(ptl_hdr_t) +
-                                        PTL_MTU +
-                                        928;    /* !! */
-        CWARN("Msg size %08x\n", gmnalni->gmni_small_msg_size);
-
-       gmnalni->gmni_small_msg_gmsize =
-                gm_min_size_for_length(gmnalni->gmni_small_msg_size);
-
-       if (gmnal_alloc_srxd(gmnalni) != 0) {
-               CERROR("Failed to allocate small rx descriptors\n");
-               gmnal_free_txd(gmnalni);
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_close(gmnalni->gmni_port);
-               gm_finalize();
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
-               PORTAL_FREE(libnal, sizeof(lib_nal_t));
-               return(PTL_FAIL);
-       }
+        CDEBUG(D_NET,"gm_open succeeded port[%p]\n",gmnalni->gmni_port);
 
+       gmnalni->gmni_msg_size = offsetof(gmnal_msg_t,
+                                          gmm_u.immediate.gmim_payload[PTL_MTU]);
+        CWARN("Msg size %08x\n", gmnalni->gmni_msg_size);
 
-       /*
-        *      Hang out a bunch of small receive buffers
-        *      In fact hang them all out
-        */
-        for (srxd = gmnalni->gmni_srxd; srxd != NULL; srxd = srxd->rx_next) {
-               CDEBUG(D_NET, "giving [%p] to gm_provide_recvive_buffer\n", 
-                      srxd->rx_buffer);
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, 
-                                                  srxd->rx_buffer, 
-                                                   srxd->rx_gmsize, 
-                                                  GM_LOW_PRIORITY, 0);
-               spin_unlock(&gmnalni->gmni_gm_lock);
-       }
-       
-       /*
-        *      Allocate pools of small tx buffers and descriptors
-        */
-       if (gmnal_alloc_txd(gmnalni) != 0) {
-               CERROR("Failed to allocate small tx descriptors\n");
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_close(gmnalni->gmni_port);
-               gm_finalize();
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
-               PORTAL_FREE(libnal, sizeof(lib_nal_t));
-               return(PTL_FAIL);
+       if (gmnal_alloc_rxs(gmnalni) != 0) {
+               CERROR("Failed to allocate rx descriptors\n");
+                goto failed_3;
        }
 
-       /*
-        *      Initialise the portals library
-        */
-       CDEBUG(D_NET, "Getting node id\n");
-       spin_lock(&gmnalni->gmni_gm_lock);
-       gm_status = gm_get_node_id(gmnalni->gmni_port, &local_gmid);
-       spin_unlock(&gmnalni->gmni_gm_lock);
-       if (gm_status != GM_SUCCESS) {
-               gmnal_stop_rxthread(gmnalni);
-               gmnal_stop_ctthread(gmnalni);
-               CERROR("can't determine node id\n");
-               gmnal_free_txd(gmnalni);
-               gmnal_free_srxd(gmnalni);
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_close(gmnalni->gmni_port);
-               gm_finalize();
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
-               PORTAL_FREE(libnal, sizeof(lib_nal_t));
-               return(PTL_FAIL);
+       if (gmnal_alloc_txs(gmnalni) != 0) {
+               CERROR("Failed to allocate tx descriptors\n");
+                goto failed_3;
        }
 
-       gmnalni->gmni_local_gmid = local_gmid;
-       CDEBUG(D_NET, "Local node id is [%u]\n", local_gmid);
-
-       spin_lock(&gmnalni->gmni_gm_lock);
-       gm_status = gm_node_id_to_global_id(gmnalni->gmni_port, 
-                                            local_gmid, 
-                                           &global_gmid);
-       spin_unlock(&gmnalni->gmni_gm_lock);
-       if (gm_status != GM_SUCCESS) {
-               CERROR("failed to obtain global id\n");
-               gmnal_stop_rxthread(gmnalni);
-               gmnal_stop_ctthread(gmnalni);
-               gmnal_free_txd(gmnalni);
-               gmnal_free_srxd(gmnalni);
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_close(gmnalni->gmni_port);
-               gm_finalize();
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
-               PORTAL_FREE(libnal, sizeof(lib_nal_t));
-               return(PTL_FAIL);
-       }
-       CDEBUG(D_NET, "Global node id is [%u]\n", global_gmid);
-       gmnalni->gmni_global_gmid = global_gmid;
-
-/*
-       pid = gm_getpid();
-*/
         process_id.pid = requested_pid;
-        process_id.nid = global_gmid;
+        process_id.nid = gmnal_get_local_nid(gmnalni);
+        if (process_id.nid == PTL_NID_ANY)
+                goto failed_3;
 
        CDEBUG(D_NET, "portals_pid is [%u]\n", process_id.pid);
        CDEBUG(D_NET, "portals_nid is ["LPU64"]\n", process_id.nid);
 
-       CDEBUG(D_PORTALS, "calling lib_init\n");
+       /*      Hang out a bunch of small receive buffers
+        *      In fact hang them all out */
+        for (rx = gmnalni->gmni_rx; rx != NULL; rx = rx->rx_next)
+                gmnal_post_rx(gmnalni, rx);
+
        if (lib_init(libnal, nal, process_id,
                      requested_limits, actual_limits) != PTL_OK) {
                CERROR("lib_init failed\n");
-               gmnal_stop_rxthread(gmnalni);
-               gmnal_stop_ctthread(gmnalni);
-               gmnal_free_txd(gmnalni);
-               gmnal_free_srxd(gmnalni);
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_close(gmnalni->gmni_port);
-               gm_finalize();
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
-               PORTAL_FREE(libnal, sizeof(lib_nal_t));
-               return(PTL_FAIL);
+                goto failed_3;
        }
 
-       /*
-        * Now that we have initialised the portals library, start receive threads,
-        * we do this to avoid processing messages before we can parse them
-        */
-       gmnal_start_kernel_threads(gmnalni);
+       /* Now that we have initialised the portals library, start receive
+        * threads, we do this to avoid processing messages before we can parse
+        * them */
+       rc = gmnal_start_kernel_threads(gmnalni);
+        if (rc != 0) {
+                CERROR("Can't start threads: %d\n", rc);
+                goto failed_3;
+        }
 
-       while (gmnalni->gmni_rxthread_flag != GMNAL_RXTHREADS_STARTED) {
-               gmnal_yield(1);
-               CDEBUG(D_NET, "Waiting for receive thread signs of life\n");
-       }
+        rc = libcfs_nal_cmd_register(GMNAL, &gmnal_cmd, libnal->libnal_data);
+       if (rc != 0) {
+               CDEBUG(D_NET, "libcfs_nal_cmd_register failed: %d\n", rc);
+                goto failed_4;
+        }
 
-       CDEBUG(D_NET, "receive thread seems to have started\n");
+       CDEBUG(D_NET, "gmnal_init finished\n");
+       return PTL_OK;
 
-       if (libcfs_nal_cmd_register(GMNAL, &gmnal_cmd, libnal->libnal_data) != 0) {
-               CDEBUG(D_NET, "libcfs_nal_cmd_register failed\n");
+ failed_4:
+       gmnal_stop_rxthread(gmnalni);
+       gmnal_stop_ctthread(gmnalni);
 
-                /* XXX these cleanup cases should be restructured to
-                 * minimise duplication... */
-                lib_fini(libnal);
-                
-               gmnal_stop_rxthread(gmnalni);
-               gmnal_stop_ctthread(gmnalni);
-               gmnal_free_txd(gmnalni);
-               gmnal_free_srxd(gmnalni);
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_close(gmnalni->gmni_port);
-               gm_finalize();
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
-               PORTAL_FREE(libnal, sizeof(lib_nal_t));
-               return(PTL_FAIL);
-        }
+ failed_3:
+        spin_lock(&gmnalni->gmni_gm_lock);
+        gm_close(gmnalni->gmni_port);
+        spin_unlock(&gmnalni->gmni_gm_lock);
 
-       CDEBUG(D_NET, "gmnal_init finished\n");
+ failed_2:
+        spin_lock(&gmnalni->gmni_gm_lock);
+        gm_finalize();
+        spin_unlock(&gmnalni->gmni_gm_lock);
+
+        /* safe to free buffers after network has been shut down */
+        gmnal_free_txs(gmnalni);
+        gmnal_free_rxs(gmnalni);
 
-       return(PTL_OK);
+ failed_1:
+        PORTAL_FREE(libnal, sizeof(*libnal));
+
+ failed_0:
+        PORTAL_FREE(gmnalni, sizeof(*gmnalni));
+
+        return PTL_FAIL;
 }
 
-nal_t the_gm_nal;
+ptl_handle_ni_t kgmnal_ni;
+nal_t           the_gm_nal;
 
 /* 
  *        Called when module loaded
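
The reworked gmnal_api_startup() above replaces the cleanup code that was duplicated in every failure branch with staged failed_0..failed_4 labels, so teardown runs once, in reverse order of initialisation. Below is a minimal sketch of that idiom under hypothetical step names; it shows the pattern only, not the patch's actual startup sequence.

/* Hedged sketch of staged goto unwinding: each completed step gets a
 * matching label, and a failure at step N jumps to the label that undoes
 * step N-1, so cleanup runs exactly once in reverse order. */
#include <stdio.h>
#include <stdlib.h>

static int  step(const char *name) { printf("init %s\n", name); return 0; }
static void undo(const char *name) { printf("undo %s\n", name); }

static int startup(void)
{
        void *state = malloc(64);               /* step 0: allocate state */
        if (state == NULL)
                return -1;

        if (step("open port") != 0)             /* step 1 */
                goto failed_0;
        if (step("alloc buffers") != 0)         /* step 2 */
                goto failed_1;
        if (step("start threads") != 0)         /* step 3 */
                goto failed_2;

        return 0;                               /* fully initialised */

 failed_2:
        undo("buffers");
 failed_1:
        undo("port");
 failed_0:
        free(state);
        return -1;
}

int main(void)
{
        return startup() == 0 ? 0 : 1;
}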
lnet/klnds/gmlnd/gmlnd_cb.c
index ac4c485..d7e7f5b 100644 (file)
 
 #include "gmnal.h"
 
-ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
-                  unsigned int niov, struct iovec *iov, size_t offset,
-                  size_t mlen, size_t rlen)
+ptl_err_t 
+gmnal_cb_recv(lib_nal_t *libnal, void *private, 
+              lib_msg_t *libmsg,
+              unsigned int niov, struct iovec *iov, 
+              size_t offset, size_t mlen, size_t rlen)
 {
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
+       gmnal_rx_t      *rx = (gmnal_rx_t*)private;
+        gmnal_msg_t     *msg = rx->rx_msg;
         size_t           nobleft = mlen;
-        void            *buffer = NULL;
+        int              rxnob;
+        char            *buffer;
         size_t           nob;
 
-       CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
+       CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], libmsg[%p], "
               "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
-              libnal, private, cookie, niov, iov, offset, mlen, rlen);
+              libnal, private, libmsg, niov, iov, offset, mlen, rlen);
 
-       LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE);
+       LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
         
-        buffer = srxd->rx_buffer;
-        buffer += sizeof(gmnal_msghdr_t);
-        buffer += sizeof(ptl_hdr_t);
-
-        while(nobleft > 0) {
+        buffer = &msg->gmm_u.immediate.gmim_payload[0];
+        rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
+        
+        if (rx->rx_recv_nob < rxnob) {
+                CERROR("Short message from nid "LPD64": got %d, need %d\n",
+                       msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
+                return PTL_FAIL;
+        }
+        
+        while (nobleft > 0) {
                 LASSERT (niov > 0);
 
                 if (offset >= iov->iov_len) {
@@ -64,32 +73,40 @@ ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
                 iov++;
         }
 
-        lib_finalize(libnal, private, cookie, PTL_OK);
+        lib_finalize(libnal, private, libmsg, PTL_OK);
        return PTL_OK;
 }
 
-ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
-                              lib_msg_t *cookie, unsigned int nkiov,
-                              ptl_kiov_t *kiov, size_t offset, size_t mlen,
-                              size_t rlen)
+ptl_err_t 
+gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, 
+                    lib_msg_t *libmsg, 
+                    unsigned int nkiov, ptl_kiov_t *kiov, 
+                    size_t offset, size_t mlen, size_t rlen)
 {
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
+       gmnal_rx_t      *rx = (gmnal_rx_t*)private;
+        gmnal_msg_t     *msg = rx->rx_msg;
         size_t           nobleft = mlen;
+        int              rxnob;
         size_t           nob;
        char            *ptr;
        void            *buffer;
 
        CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
-              "cookie[%p], kniov[%d], kiov [%p], "
+              "libmsg[%p], kniov[%d], kiov [%p], "
                "offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
-              libnal, private, cookie, nkiov, kiov, offset, mlen, rlen);
+              libnal, private, libmsg, nkiov, kiov, offset, mlen, rlen);
 
-       LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE);
+       LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
 
-        buffer = srxd->rx_buffer;
-        buffer += sizeof(gmnal_msghdr_t);
-        buffer += sizeof(ptl_hdr_t);
+        buffer = &msg->gmm_u.immediate.gmim_payload[0];
+        rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
 
+        if (rx->rx_recv_nob < rxnob) {
+                CERROR("Short message from nid "LPD64": got %d, need %d\n",
+                       msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
+                return PTL_FAIL;
+        }
+        
         while (nobleft > 0) {
                 LASSERT (nkiov > 0);
 
@@ -113,24 +130,23 @@ ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
                 nkiov--;
        }
 
-        lib_finalize(libnal, private, cookie, PTL_OK);
-
+        lib_finalize(libnal, private, libmsg, PTL_OK);
        return PTL_OK;
 }
 
-
-ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
-                        ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-                        unsigned int niov, struct iovec *iov, 
-                        size_t offset, size_t len)
+ptl_err_t
+gmnal_cb_send(lib_nal_t *libnal, void *private, 
+              lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, 
+              ptl_nid_t nid, ptl_pid_t pid,
+              unsigned int niov, struct iovec *iov, 
+              size_t offset, size_t len)
 {
 
        gmnal_ni_t      *gmnalni = libnal->libnal_data;
-       void            *buffer = NULL;
-       gmnal_stxd_t    *stxd = NULL;
         size_t           nobleft = len;
+       void            *buffer;
+       gmnal_tx_t      *tx;
         size_t           nob;
-        ptl_err_t        rc;
 
        CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] "
                "len["LPSZ"] nid["LPU64"]\n", niov, offset, len, nid);
@@ -140,13 +156,13 @@ ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
                 return PTL_FAIL;
         }
 
-        stxd = gmnal_get_stxd(gmnalni, 1);
-        CDEBUG(D_NET, "stxd [%p]\n", stxd);
+        tx = gmnal_get_tx(gmnalni, 1);
 
-        /* Set the offset of the data to copy into the buffer */
-        buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
+        gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
+        gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
 
-        while(nobleft > 0) {
+        buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
+        while (nobleft > 0) {
                 LASSERT (niov > 0);
                 
                 if (offset >= iov->iov_len) {
@@ -163,27 +179,24 @@ ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
                 niov--;
                 iov++;
         }
-
-        rc = gmnal_small_tx(libnal, private, cookie, hdr, type, 
-                            nid, stxd,  len);
-        if (rc != PTL_OK)
-                gmnal_return_stxd(gmnalni, stxd);
-
-       return rc;
+        
+        nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
+        return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
 }
 
-ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
-                              lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
-                              ptl_nid_t nid, ptl_pid_t pid, unsigned int nkiov,
-                              ptl_kiov_t *kiov, size_t offset, size_t len)
+ptl_err_t
+gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
+                    lib_msg_t *libmsg, ptl_hdr_t *hdr, int type,
+                    ptl_nid_t nid, ptl_pid_t pid, 
+                    unsigned int nkiov, ptl_kiov_t *kiov, 
+                    size_t offset, size_t len)
 {
 
        gmnal_ni_t      *gmnalni = libnal->libnal_data;
-       void            *buffer = NULL;
-       gmnal_stxd_t    *stxd = NULL;
         size_t           nobleft = len;
+       void            *buffer;
+       gmnal_tx_t      *tx;
        char            *ptr;
-       ptl_err_t        rc;
         size_t           nob;
 
        CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
@@ -194,12 +207,12 @@ ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
                 return PTL_FAIL;
         }
 
-       stxd = gmnal_get_stxd(gmnalni, 1);
-       CDEBUG(D_NET, "stxd [%p]\n", stxd);
+       tx = gmnal_get_tx(gmnalni, 1);
 
-       /* Set the offset of the data to copy into the buffer */
-       buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
+        gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
+        gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
 
+       buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
         while (nobleft > 0) {
                 LASSERT (nkiov > 0);
 
@@ -223,16 +236,12 @@ ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
                 kiov++;
         }
 
-        rc = gmnal_small_tx(libnal, private, cookie, hdr, type, 
-                                nid, stxd, len);
-
-        if (rc != PTL_OK)
-                gmnal_return_stxd(gmnalni, stxd);
-        
-       return rc;
+        nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
+        return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
 }
 
-int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
+int
+gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
 {
        CDEBUG(D_TRACE, "gmnal_cb_dist\n");
 
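
gmnal_cb_send(), gmnal_cb_send_pages(), gmnal_cb_recv() and gmnal_cb_recv_pages() above all share one scatter/gather loop: skip 'offset' bytes into the fragment list, then copy the requested bytes between the fragments and the contiguous gmim_payload buffer. A hedged userspace sketch of the iovec flavour follows, with memcpy() standing in for gm_bcopy(); the copy_from_iov() helper name is illustrative, not from the patch.

/* Hedged userspace sketch of the scatter/gather copy loop: skip 'offset'
 * bytes into the iovec list, then copy 'len' bytes into a contiguous
 * buffer, advancing fragment by fragment. */
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <sys/uio.h>

static size_t copy_from_iov(void *dst, unsigned int niov,
                            const struct iovec *iov, size_t offset, size_t len)
{
        char   *buffer  = dst;
        size_t  nobleft = len;
        size_t  nob;

        while (nobleft > 0) {
                assert(niov > 0);

                if (offset >= iov->iov_len) {
                        /* fragment lies wholly before 'offset': skip it */
                        offset -= iov->iov_len;
                } else {
                        nob = iov->iov_len - offset;
                        if (nob > nobleft)
                                nob = nobleft;

                        memcpy(buffer, (char *)iov->iov_base + offset, nob);
                        buffer  += nob;
                        nobleft -= nob;
                        offset   = 0;
                }
                niov--;
                iov++;
        }
        return len;
}

int main(void)
{
        char frag1[] = "hello ", frag2[] = "world";
        struct iovec iov[2] = {
                { .iov_base = frag1, .iov_len = 6 },
                { .iov_base = frag2, .iov_len = 5 },
        };
        char out[16] = "";

        copy_from_iov(out, 2, iov, 4, 7);       /* skip "hell", copy "o world" */
        printf("%s\n", out);                    /* prints "o world" */
        return 0;
}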
lnet/klnds/gmlnd/gmlnd_comm.c
index 4720099..b5213a5 100644 (file)
 
 #include "gmnal.h"
 
+void
+gmnal_pack_msg(gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
+               ptl_nid_t dstnid, int type)
+{
+        gmnal_msg_t *msg = tx->tx_msg;
+
+        /* CAVEAT EMPTOR! this only sets the common message fields. */
+        msg->gmm_magic    = GMNAL_MSG_MAGIC;
+        msg->gmm_version  = GMNAL_MSG_VERSION;
+        msg->gmm_type     = type;
+        msg->gmm_srcnid   = gmnalni->gmni_libnal->libnal_ni.ni_pid.nid;
+        msg->gmm_dstnid   = dstnid;
+}
+
+int
+gmnal_unpack_msg(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
+{
+        gmnal_msg_t *msg = rx->rx_msg;
+        const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);
+        int          flip;
+
+        /* 6 bytes are enough to have received magic + version */
+        if (rx->rx_recv_nob < 6) {
+                CERROR("Short message from gmid %u: %d\n", 
+                       rx->rx_recv_gmid, rx->rx_recv_nob);
+                return -EPROTO;
+        }
+
+        if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
+                flip = 0;
+        } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
+                flip = 1;
+        } else {
+                CERROR("Bad magic from gmid %u: %08x\n", 
+                       rx->rx_recv_gmid, msg->gmm_magic);
+                return -EPROTO;
+        }
+
+        if (msg->gmm_version != 
+            (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
+                CERROR("Bad version from gmid %u: %d\n", 
+                       rx->rx_recv_gmid, msg->gmm_version);
+                return -EPROTO;
+        }
+
+        if (rx->rx_recv_nob < hdr_size) {
+                CERROR("Short message from %u: %d\n",
+                       rx->rx_recv_gmid, rx->rx_recv_nob);
+                return -EPROTO;
+        }
+
+        if (flip) {
+                /* leave magic unflipped as a clue to peer endianness */
+                __swab16s(&msg->gmm_version);
+                __swab16s(&msg->gmm_type);
+                __swab64s(&msg->gmm_srcnid);
+                __swab64s(&msg->gmm_dstnid);
+        }
+        
+        if (msg->gmm_srcnid == PTL_NID_ANY) {
+                CERROR("Bad src nid from %u: "LPX64"\n", 
+                       rx->rx_recv_gmid, msg->gmm_srcnid);
+                return -EPROTO;
+        }
+
+        if (msg->gmm_dstnid != gmnalni->gmni_libnal->libnal_ni.ni_pid.nid) {
+                CERROR("Bad dst nid from %u: "LPX64"\n",
+                       rx->rx_recv_gmid, msg->gmm_dstnid);
+                return -EPROTO;
+        }
+        
+        switch (msg->gmm_type) {
+        default:
+                CERROR("Unknown message type from %u: %x\n", 
+                       rx->rx_recv_gmid, msg->gmm_type);
+                return -EPROTO;
+                
+        case GMNAL_MSG_IMMEDIATE:
+                if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
+                        CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n", 
+                               rx->rx_recv_gmid, rx->rx_recv_nob, 
+                               offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));
+                        return -EPROTO;
+                }
+                break;
+        }
+        return 0;
+}
+
+
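+
gmnal_unpack_msg() above recognises a byte-swapped peer by testing the magic both ways, then swabs the remaining fixed header fields while deliberately leaving the magic unflipped as a clue to peer endianness. The standalone sketch below shows just that check; glibc's bswap_16()/bswap_32() stand in for the kernel __swab*s() helpers and the struct is cut down to the fields the check touches.

/* Hedged sketch of the magic-based endianness check: a byte-swapped magic
 * means the peer has the opposite byte order, so the fixed header fields
 * are swabbed in place before validation. */
#include <stdint.h>
#include <stdio.h>
#include <byteswap.h>

#define MSG_MAGIC   0x6d797269u         /* 'myri' */
#define MSG_VERSION 1

struct msg_hdr {
        uint32_t magic;
        uint16_t version;
        uint16_t type;
};

/* Returns 0 on success, -1 on a protocol error. */
static int unpack_hdr(struct msg_hdr *msg)
{
        int flip;

        if (msg->magic == MSG_MAGIC)
                flip = 0;
        else if (msg->magic == bswap_32(MSG_MAGIC))
                flip = 1;               /* peer has opposite endianness */
        else
                return -1;              /* not one of our messages */

        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                msg->version = bswap_16(msg->version);
                msg->type    = bswap_16(msg->type);
        }

        return msg->version == MSG_VERSION ? 0 : -1;
}

int main(void)
{
        struct msg_hdr swapped = {
                .magic   = bswap_32(MSG_MAGIC),    /* as written by a peer  */
                .version = bswap_16(MSG_VERSION),  /* of the other order    */
                .type    = bswap_16(1),
        };

        printf("unpack: %s, version %u\n",
               unpack_hdr(&swapped) == 0 ? "ok" : "bad", swapped.version);
        return 0;
}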
 /*
  *     The caretaker thread
  *     This is main thread of execution for the NAL side
 int
 gmnal_ct_thread(void *arg)
 {
-       gmnal_ni_t              *gmnalni;
+       gmnal_ni_t              *gmnalni = arg;
        gm_recv_event_t         *rxevent = NULL;
        gm_recv_t               *recv = NULL;
 
-       if (!arg) {
-               CDEBUG(D_NET, "NO gmnalni. Exiting\n");
-               return(-1);
-       }
-
-       gmnalni = (gmnal_ni_t*)arg;
-       CDEBUG(D_NET, "gmnalni is [%p]\n", arg);
-
        sprintf(current->comm, "gmnal_ct");
-
        kportal_daemonize("gmnalctd");
 
        gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;
 
-       spin_lock(&gmnalni->gmni_gm_lock);
        while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
-               CDEBUG(D_NET, "waiting\n");
+
+                spin_lock(&gmnalni->gmni_gm_lock);
                rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
+                spin_unlock(&gmnalni->gmni_gm_lock);
+
                if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
                        CDEBUG(D_NET, "time to exit\n");
                        break;
                }
-               CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent(rxevent));
-               switch (GM_RECV_EVENT_TYPE(rxevent)) {
-
-                       case(GM_RECV_EVENT):
-                               CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
-                               recv = (gm_recv_t*)&rxevent->recv;
-                               spin_unlock(&gmnalni->gmni_gm_lock);
-                               gmnal_add_rxtwe(gmnalni, recv);
-                               spin_lock(&gmnalni->gmni_gm_lock);
-                               CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
-                       break;
-                       case(_GM_SLEEP_EVENT):
-                               /*
-                                *      Blocking receive above just returns
-                                *      immediatly with _GM_SLEEP_EVENT
-                                *      Don't know what this is
-                                */
-                               CDEBUG(D_NET, "Sleeping in gm_unknown\n");
-                               spin_unlock(&gmnalni->gmni_gm_lock);
-                               gm_unknown(gmnalni->gmni_port, rxevent);
-                               spin_lock(&gmnalni->gmni_gm_lock);
-                               CDEBUG(D_NET, "Awake from gm_unknown\n");
-                               break;
-                               
-                       default:
-                               /*
-                                *      Don't know what this is
-                                *      gm_unknown will make sense of it
-                                *      Should be able to do something with
-                                *      FAST_RECV_EVENTS here.
-                                */
-                               CDEBUG(D_NET, "Passing event to gm_unknown\n");
-                               spin_unlock(&gmnalni->gmni_gm_lock);
-                               gm_unknown(gmnalni->gmni_port, rxevent);
-                               spin_lock(&gmnalni->gmni_gm_lock);
-                               CDEBUG(D_NET, "Processed unknown event\n");
+
+               CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent2str(rxevent));
+
+               if (GM_RECV_EVENT_TYPE(rxevent) == GM_RECV_EVENT) {
+                        recv = (gm_recv_t*)&rxevent->recv;
+                        gmnal_enqueue_rx(gmnalni, recv);
+                } else {
+                        gm_unknown(gmnalni->gmni_port, rxevent);
                }
        }
-       spin_unlock(&gmnalni->gmni_gm_lock);
+
        gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
        CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
-
        return 0;
 }
 
@@ -114,19 +168,10 @@ gmnal_ct_thread(void *arg)
 int 
 gmnal_rx_thread(void *arg)
 {
-        char                     name[16];
-       gmnal_ni_t              *gmnalni;
-       void                    *buffer;
-       gmnal_rxtwe_t           *we = NULL;
-       int                     rank;
-
-       if (!arg) {
-               CDEBUG(D_NET, "NO gmnalni. Exiting\n");
-               return(-1);
-       }
-
-       gmnalni = (gmnal_ni_t*)arg;
-       CDEBUG(D_NET, "gmnalni is [%p]\n", arg);
+       gmnal_ni_t    *gmnalni = arg;
+        char           name[16];
+       gmnal_rx_t    *rx;
+       int            rank;
 
        for (rank=0; rank<num_rx_threads; rank++)
                if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
@@ -144,385 +189,160 @@ gmnal_rx_thread(void *arg)
                gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
        else
                gmnalni->gmni_rxthread_flag = 1;
-       CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
        spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
 
        while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
                CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
-               we = gmnal_get_rxtwe(gmnalni);
-               if (!we) {
+
+               rx = gmnal_dequeue_rx(gmnalni);
+               if (rx == NULL) {
                        CDEBUG(D_NET, "Receive thread time to exit\n");
                        break;
                }
-
-               buffer = we->buffer;
-               switch(((gmnal_msghdr_t*)buffer)->gmm_type) {
-               case(GMNAL_SMALL_MESSAGE):
-                       gmnal_pre_receive(gmnalni, we, GMNAL_SMALL_MESSAGE);
-               break;
-               default:
-#warning better handling
-                       CERROR("Unsupported message type\n");
-                       gmnal_rx_bad(gmnalni, we);
-               }
-               PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
+                
+                /* We're connectionless: simply ignore packets on error */
+                
+                if (gmnal_unpack_msg(gmnalni, rx) == 0) {
+                        
+                        LASSERT (rx->rx_msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+                        (void)lib_parse(gmnalni->gmni_libnal, 
+                                        &rx->rx_msg->gmm_u.immediate.gmim_hdr,
+                                        rx);
+                }
+
+                gmnal_post_rx(gmnalni, rx);
        }
 
        spin_lock(&gmnalni->gmni_rxthread_flag_lock);
-       gmnalni->gmni_rxthread_flag/=2;
-       CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
+       gmnalni->gmni_rxthread_flag /= 2;
        spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-       CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
 
+       CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
        return 0;
 }
 
-
-
-/*
- *     Start processing a small message receive
- *     Get here from gmnal_receive_thread
- *     Hand off to lib_parse, which calls cb_recv
- *     which hands back to gmnal_small_receive
- *     Deal with all endian stuff here.
- */
 void
-gmnal_pre_receive(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we, int gmnal_type)
+gmnal_post_rx(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
 {
-       gmnal_srxd_t    *srxd = NULL;
-       void            *buffer = NULL;
-       gmnal_msghdr_t  *gmnal_msghdr;
-       ptl_hdr_t       *portals_hdr;
+       CDEBUG(D_NET, "requeueing rx[%p] gmnalni[%p]\n", rx, gmnalni);
 
-       CDEBUG(D_NET, "gmnalni [%p], we[%p] type [%d]\n",
-              gmnalni, we, gmnal_type);
-
-       buffer = we->buffer;
-
-       gmnal_msghdr = (gmnal_msghdr_t*)buffer;
-       portals_hdr = (ptl_hdr_t*)(buffer+sizeof(gmnal_msghdr_t));
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, rx->rx_msg,
+                                           rx->rx_gmsize, GM_LOW_PRIORITY, 0 );
+       spin_unlock(&gmnalni->gmni_gm_lock);
+}
 
-       CDEBUG(D_NET, "rx_event:: Sender node [%d], Sender Port [%d], "
-              "type [%d], length [%d], buffer [%p]\n",
-               we->snode, we->sport, we->type, we->length, buffer);
-       CDEBUG(D_NET, "gmnal_msghdr:: Sender node [%u], magic [%d], "
-              "gmnal_type [%d]\n", gmnal_msghdr->gmm_sender_gmid,
-              gmnal_msghdr->gmm_magic, gmnal_msghdr->gmm_type);
-       CDEBUG(D_NET, "portals_hdr:: Sender node ["LPD64"], "
-              "dest_node ["LPD64"]\n", portals_hdr->src_nid,
-              portals_hdr->dest_nid);
+void 
+gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
+                              gm_status_t status)
+{
+       gmnal_tx_t      *tx = (gmnal_tx_t*)context;
+       gmnal_ni_t      *gmnalni = tx->tx_gmni;
+       lib_msg_t       *libmsg = tx->tx_libmsg;
 
-       /*
-        *      Get a receive descriptor for this message
-        */
-       srxd = gmnal_rxbuffer_to_srxd(gmnalni, buffer);
-       CDEBUG(D_NET, "Back from gmnal_rxbuffer_to_srxd\n");
-       if (!srxd) {
-               CERROR("Failed to get receive descriptor\n");
-                LBUG();
-       }
+        CWARN("status for tx [%p] is [%d][%s]\n", 
+              tx, status, gmnal_gmstatus2str(status));
 
-       srxd->rx_gmni = gmnalni;
-       srxd->rx_type = gmnal_type;
-       srxd->rx_nsiov = gmnal_msghdr->gmm_niov;
-       srxd->rx_sender_gmid = gmnal_msghdr->gmm_sender_gmid;
+        gmnal_return_tx(gmnalni, tx);
+        lib_finalize(gmnalni->gmni_libnal, NULL, libmsg, PTL_FAIL);
+}
 
-       CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
-              buffer+sizeof(gmnal_msghdr_t));
+void 
+gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
+                          gm_status_t status)
+{
+       gmnal_tx_t      *tx = (gmnal_tx_t*)context;
+       gmnal_ni_t      *gmnalni = tx->tx_gmni;
 
-       (void)lib_parse(gmnalni->gmni_libnal, portals_hdr, srxd);
-        /* Ignore error; we're connectionless */
+        CERROR("status for tx [%p] is [%d][%s]\n", 
+               tx, status, gmnal_gmstatus2str(status));
 
-        gmnal_rx_requeue_buffer(gmnalni, srxd);
+        gm_resume_sending(gmnalni->gmni_port, tx->tx_gm_priority,
+                          tx->tx_gmlid, gm_port_id,
+                          gmnal_resume_sending_callback, tx);
 }
 
-
-
-/*
- *     After a receive has been processed, 
- *     hang out the receive buffer again.
- *     This implicitly returns a receive token.
- */
-void
-gmnal_rx_requeue_buffer(gmnal_ni_t *gmnalni, gmnal_srxd_t *srxd)
+void 
+gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
 {
-       CDEBUG(D_NET, "requeueing srxd[%p] gmnalni[%p]\n", srxd, gmnalni);
-
-       spin_lock(&gmnalni->gmni_gm_lock);
-       gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, srxd->rx_buffer,
-                                           srxd->rx_gmsize, GM_LOW_PRIORITY, 0 );
-       spin_unlock(&gmnalni->gmni_gm_lock);
-}
+       gmnal_tx_t      *tx = (gmnal_tx_t*)context;
+       gmnal_ni_t      *gmnalni;
+       lib_nal_t       *libnal;
+       lib_msg_t       *libmsg;
+        ptl_err_t        rc;
 
+       if (tx == NULL) {               /* check before dereferencing */
+               CERROR("send completion event for unknown tx\n");
+               return;
+       }
+
+       gmnalni = tx->tx_gmni;
+       libnal  = gmnalni->gmni_libnal;
+       libmsg  = tx->tx_libmsg;
 
-/*
- *     Handle a bad message
- *     A bad message is one we don't expect or can't interpret
- */
-void
-gmnal_rx_bad(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we)
-{
-        gmnal_srxd_t *srxd = gmnal_rxbuffer_to_srxd(gmnalni, 
-                                                    we->buffer);
-       if (srxd == NULL) {
-               CERROR("Can't find a descriptor for this buffer\n");
+       switch(status) {
+        case(GM_SUCCESS):
+                rc = PTL_OK;
+                break;
+
+        case(GM_SEND_DROPPED):
+                rc = PTL_FAIL;
+                break;
+                        
+        default:
+                CERROR("Error %d(%s), nid "LPD64"\n",
+                       status, gmnal_gmstatus2str(status), tx->tx_nid);
+
+                spin_lock(&gmnalni->gmni_gm_lock);
+                gm_drop_sends(gmnalni->gmni_port, tx->tx_gm_priority, 
+                              tx->tx_gmlid, gm_port_id, 
+                              gmnal_drop_sends_callback, tx);
+                spin_unlock(&gmnalni->gmni_gm_lock);
                return;
        }
 
-        gmnal_rx_requeue_buffer(gmnalni, srxd);
+       gmnal_return_tx(gmnalni, tx);
+       lib_finalize(libnal, NULL, libmsg, rc);
+       return;
 }
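
Note that only GM_SUCCESS and GM_SEND_DROPPED complete the message directly in
the callback above; any other status is routed through gm_drop_sends() and then
gm_resume_sending() before lib_finalize() is called with PTL_FAIL.  A
hypothetical classifier (not part of the patch) that mirrors that switch:

/* Illustrative only: statuses other than GM_SUCCESS and GM_SEND_DROPPED
 * must take the drop/resume path before the message is finalized. */
static int
gmnal_tx_status_needs_drop(gm_status_t status)
{
        return status != GM_SUCCESS && status != GM_SEND_DROPPED;
}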
 
-
-
-/*
- *     Start a small transmit. 
- *     Use the given send token (and wired transmit buffer).
- *     Copy headers to wired buffer and initiate gm_send from the wired buffer.
- *     The callback function informs when the send is complete.
- */
 ptl_err_t
-gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
-               ptl_hdr_t *hdr, int type, ptl_nid_t nid,
-               gmnal_stxd_t *stxd, int size)
+gmnal_post_tx (gmnal_ni_t *gmnalni, gmnal_tx_t *tx, 
+               lib_msg_t *libmsg, ptl_nid_t nid, int nob)
 {
-       gmnal_ni_t      *gmnalni = (gmnal_ni_t*)libnal->libnal_data;
-       void            *buffer = NULL;
-       gmnal_msghdr_t  *msghdr = NULL;
-       int             tot_size = 0;
-       gm_status_t     gm_status = GM_SUCCESS;
-
-       CDEBUG(D_NET, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
-              "hdr [%p] type [%d] nid ["LPU64"] stxd [%p] "
-              "size [%d]\n", libnal, private, cookie, hdr, type,
-              nid, stxd, size);
+        gm_status_t  gm_status;
 
-       CDEBUG(D_NET, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
-              hdr->dest_nid, hdr->src_nid);
+       CDEBUG(D_NET, "send %d bytes to "LPU64"\n", nob, nid);
 
         LASSERT ((nid >> 32) == 0);
-        LASSERT (gmnalni != NULL);
 
        spin_lock(&gmnalni->gmni_gm_lock);
        gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid, 
-                                            &stxd->tx_gmlid);
+                                            &tx->tx_gmlid);
        spin_unlock(&gmnalni->gmni_gm_lock);
 
        if (gm_status != GM_SUCCESS) {
                CERROR("Failed to obtain local id\n");
-               return(PTL_FAIL);
+                gmnal_return_tx(gmnalni, tx);
+               return PTL_FAIL;
        }
 
        CDEBUG(D_NET, "Local Node_id is [%u][%x]\n", 
-               stxd->tx_gmlid, stxd->tx_gmlid);
+               tx->tx_gmlid, tx->tx_gmlid);
 
-        stxd->tx_nid = nid;
-       stxd->tx_cookie = cookie;
-       stxd->tx_type = GMNAL_SMALL_MESSAGE;
-       stxd->tx_gm_priority = GM_LOW_PRIORITY;
-
-       /*
-        *      Copy gmnal_msg_hdr and portals header to the transmit buffer
-        *      Then send the message, as the data has previously been copied in
-        *      (HP SFS 1380).
-        */
-       buffer = stxd->tx_buffer;
-       msghdr = (gmnal_msghdr_t*)buffer;
-
-       msghdr->gmm_magic = GMNAL_MAGIC;
-       msghdr->gmm_type = GMNAL_SMALL_MESSAGE;
-       msghdr->gmm_sender_gmid = gmnalni->gmni_global_gmid;
-       CDEBUG(D_NET, "processing msghdr at [%p]\n", buffer);
-
-       buffer += sizeof(gmnal_msghdr_t);
-
-       CDEBUG(D_NET, "processing  portals hdr at [%p]\n", buffer);
-       gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
-
-       buffer += sizeof(ptl_hdr_t);
-
-       CDEBUG(D_NET, "sending\n");
-       tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
-       stxd->tx_msg_size = tot_size;
+        tx->tx_nid = nid;
+       tx->tx_libmsg = libmsg;
+       tx->tx_gm_priority = GM_LOW_PRIORITY;
+       tx->tx_msg_size = nob;
 
        CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
               "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] "
-              "stxd [%p]\n", gmnalni->gmni_port, stxd->tx_buffer
-               stxd->tx_gm_size, stxd->tx_msg_size, nid, stxd->tx_gmlid
-               stxd);
+              "tx [%p]\n", gmnalni->gmni_port, tx->tx_msg
+               tx->tx_gm_size, tx->tx_msg_size
+               tx->tx_nid, tx->tx_gmlid, tx);
 
        spin_lock(&gmnalni->gmni_gm_lock);
-       gm_send_to_peer_with_callback(gmnalni->gmni_port, stxd->tx_buffer,
-                                     stxd->tx_gm_size, stxd->tx_msg_size,
-                                      stxd->tx_gm_priority, stxd->tx_gmlid,
-                                     gmnal_small_tx_callback, (void*)stxd);
+       gm_send_to_peer_with_callback(gmnalni->gmni_port, tx->tx_msg,
+                                     tx->tx_gm_size, tx->tx_msg_size,
+                                      tx->tx_gm_priority, tx->tx_gmlid,
+                                     gmnal_tx_callback, (void*)tx);
        spin_unlock(&gmnalni->gmni_gm_lock);
-       CDEBUG(D_NET, "done\n");
 
-       return(PTL_OK);
+       return PTL_OK;
 }
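
The send path that drives gmnal_post_tx() presumably lives in gmlnd_cb.c and is
not shown in this hunk.  A minimal sketch of how a caller might use it for an
immediate message (an assumption, with a hypothetical helper name; the remaining
wire-header fields are presumed to be filled in by packing code omitted here):

/* Sketch only: obtain a tx token, stamp the message type and portals
 * header, then hand off to gmnal_post_tx(). */
static ptl_err_t
gmnal_send_sketch(lib_nal_t *libnal, lib_msg_t *libmsg,
                  ptl_hdr_t *hdr, ptl_nid_t nid, int nob)
{
        gmnal_ni_t *gmnalni = (gmnal_ni_t *)libnal->libnal_data;
        gmnal_tx_t *tx = gmnal_get_tx(gmnalni, 1); /* may block for a token */

        if (tx == NULL)
                return PTL_FAIL;

        tx->tx_msg->gmm_type = GMNAL_MSG_IMMEDIATE;
        tx->tx_msg->gmm_u.immediate.gmim_hdr = *hdr;
        /* ...payload bytes would be copied in after the header... */

        return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
}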
-
-
-/*
- *     A callback to indicate the small transmit operation is compete
- *     Check for erros and try to deal with them.
- *     Call lib_finalise to inform the client application that the send 
- *     is complete and the memory can be reused.
- *     Return the stxd when finished with it (returns a send token)
- */
-void 
-gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
-{
-       gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-       lib_msg_t       *cookie = stxd->tx_cookie;
-       gmnal_ni_t      *gmnalni = stxd->tx_gmni;
-       lib_nal_t       *libnal = gmnalni->gmni_libnal;
-
-       if (!stxd) {
-               CDEBUG(D_NET, "send completion event for unknown stxd\n");
-               return;
-       }
-       if (status != GM_SUCCESS)
-               CERROR("Result of send stxd [%p] is [%s] to ["LPU64"]\n",
-                      stxd, gmnal_gm_error(status), stxd->tx_nid);
-
-       switch(status) {
-               case(GM_SUCCESS):
-               break;
-
-
-
-               case(GM_SEND_DROPPED):
-               /*
-                *      do a resend on the dropped ones
-                */
-                       CERROR("send stxd [%p] dropped, resending\n", context);
-                       spin_lock(&gmnalni->gmni_gm_lock);
-                       gm_send_to_peer_with_callback(gmnalni->gmni_port,
-                                                     stxd->tx_buffer,
-                                                     stxd->tx_gm_size,
-                                                     stxd->tx_msg_size,
-                                                     stxd->tx_gm_priority,
-                                                     stxd->tx_gmlid,
-                                                     gmnal_small_tx_callback,
-                                                     context);
-                       spin_unlock(&gmnalni->gmni_gm_lock);
-               return;
-               case(GM_TIMED_OUT):
-               case(GM_SEND_TIMED_OUT):
-               /*
-                *      drop these ones
-                */
-                       CDEBUG(D_NET, "calling gm_drop_sends\n");
-                       spin_lock(&gmnalni->gmni_gm_lock);
-                       gm_drop_sends(gmnalni->gmni_port, stxd->tx_gm_priority, 
-                                     stxd->tx_gmlid, gm_port_id, 
-                                     gmnal_drop_sends_callback, context);
-                       spin_unlock(&gmnalni->gmni_gm_lock);
-
-               return;
-
-
-               /*
-                *      abort on these ?
-                */
-               case(GM_TRY_AGAIN):
-               case(GM_INTERRUPTED):
-               case(GM_FAILURE):
-               case(GM_INPUT_BUFFER_TOO_SMALL):
-               case(GM_OUTPUT_BUFFER_TOO_SMALL):
-               case(GM_BUSY):
-               case(GM_MEMORY_FAULT):
-               case(GM_INVALID_PARAMETER):
-               case(GM_OUT_OF_MEMORY):
-               case(GM_INVALID_COMMAND):
-               case(GM_PERMISSION_DENIED):
-               case(GM_INTERNAL_ERROR):
-               case(GM_UNATTACHED):
-               case(GM_UNSUPPORTED_DEVICE):
-               case(GM_SEND_REJECTED):
-               case(GM_SEND_TARGET_PORT_CLOSED):
-               case(GM_SEND_TARGET_NODE_UNREACHABLE):
-               case(GM_SEND_PORT_CLOSED):
-               case(GM_NODE_ID_NOT_YET_SET):
-               case(GM_STILL_SHUTTING_DOWN):
-               case(GM_CLONE_BUSY):
-               case(GM_NO_SUCH_DEVICE):
-               case(GM_ABORTED):
-               case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
-               case(GM_UNTRANSLATED_SYSTEM_ERROR):
-               case(GM_ACCESS_DENIED):
-               case(GM_NO_DRIVER_SUPPORT):
-               case(GM_PTE_REF_CNT_OVERFLOW):
-               case(GM_NOT_SUPPORTED_IN_KERNEL):
-               case(GM_NOT_SUPPORTED_ON_ARCH):
-               case(GM_NO_MATCH):
-               case(GM_USER_ERROR):
-               case(GM_DATA_CORRUPTED):
-               case(GM_HARDWARE_FAULT):
-               case(GM_SEND_ORPHANED):
-               case(GM_MINOR_OVERFLOW):
-               case(GM_PAGE_TABLE_FULL):
-               case(GM_UC_ERROR):
-               case(GM_INVALID_PORT_NUMBER):
-               case(GM_DEV_NOT_FOUND):
-               case(GM_FIRMWARE_NOT_RUNNING):
-               case(GM_YP_NO_MATCH):
-               default:
-                gm_resume_sending(gmnalni->gmni_port, stxd->tx_gm_priority,
-                                  stxd->tx_gmlid, gm_port_id,
-                                  gmnal_resume_sending_callback, context);
-                return;
-
-       }
-
-       gmnal_return_stxd(gmnalni, stxd);
-       lib_finalize(libnal, stxd, cookie, PTL_OK);
-       return;
-}
-
-/*
- *     After an error on the port
- *     call this to allow future sends to complete
- */
-void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
-                                 gm_status_t status)
-{
-        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-        gmnal_ni_t     *gmnalni = stxd->tx_gmni;
-
-        CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context);
-        gmnal_return_stxd(gmnalni, stxd);
-        lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL);
-        return;
-}
-
-
-void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
-                               gm_status_t status)
-{
-       gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-       gmnal_ni_t      *gmnalni = stxd->tx_gmni;
-
-       CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context);
-       if (status == GM_SUCCESS) {
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_send_to_peer_with_callback(gm_port, stxd->tx_buffer, 
-                                             stxd->tx_gm_size, 
-                                              stxd->tx_msg_size, 
-                                             stxd->tx_gm_priority, 
-                                             stxd->tx_gmlid, 
-                                             gmnal_small_tx_callback, 
-                                             context);
-               spin_unlock(&gmnalni->gmni_gm_lock);
-       } else {
-               CERROR("send_to_peer status for stxd [%p] is "
-                      "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
-                /* Recycle the stxd */
-               gmnal_return_stxd(gmnalni, stxd);
-               lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL);
-       }
-
-       return;
-}
-
-
index 7a7a907..446f265 100644 (file)
  *      See start_kernel_threads
  */
 int num_rx_threads = -1;
-int num_stxds = 5;
+int num_txds = 5;
 int gm_port_id = 4;
 
-int
-gmnal_cmd(struct portals_cfg *pcfg, void *private)
-{
-       gmnal_ni_t      *gmnalni = NULL;
-       char            *name = NULL;
-       int             nid = -2;
-       int             gmid;
-       gm_status_t     gm_status;
-
-
-       CDEBUG(D_TRACE, "gmnal_cmd [%d] private [%p]\n",
-              pcfg->pcfg_command, private);
-       gmnalni = (gmnal_ni_t*)private;
-       switch(pcfg->pcfg_command) {
-       /*
-        * just reuse already defined GET_NID. Should define GMNAL version
-        */
-       case(GMNAL_IOC_GET_GNID):
-
-               PORTAL_ALLOC(name, pcfg->pcfg_plen1);
-               copy_from_user(name, PCFG_PBUF(pcfg, 1), pcfg->pcfg_plen1);
-
-               spin_lock(&gmnalni->gmni_gm_lock);
-               //nid = gm_host_name_to_node_id(gmnalni->gmni_port, name);
-                gm_status = gm_host_name_to_node_id_ex(gmnalni->gmni_port, 0,
-                                                       name, &nid);
-               spin_unlock(&gmnalni->gmni_gm_lock);
-                if (gm_status != GM_SUCCESS) {
-                        CDEBUG(D_NET, "gm_host_name_to_node_id_ex(...host %s) "
-                               "failed[%d]\n", name, gm_status);
-                        return (-1);
-                } else
-                       CDEBUG(D_NET, "Local node %s id is [%d]\n", name, nid);
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_status = gm_node_id_to_global_id(gmnalni->gmni_port,
-                                                   nid, &gmid);
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               if (gm_status != GM_SUCCESS) {
-                       CDEBUG(D_NET, "gm_node_id_to_global_id failed[%d]\n",
-                              gm_status);
-                       return(-1);
-               }
-               CDEBUG(D_NET, "Global node is is [%u][%x]\n", gmid, gmid);
-               copy_to_user(PCFG_PBUF(pcfg, 2), &gmid, pcfg->pcfg_plen2);
-       break;
-       default:
-               CDEBUG(D_NET, "gmnal_cmd UNKNOWN[%d]\n", pcfg->pcfg_command);
-               pcfg->pcfg_nid2 = -1;
-       }
-
-
-       return(0);
-}
-
-
 static int __init
 gmnal_load(void)
 {
@@ -117,11 +62,10 @@ gmnal_unload(void)
 
 
 module_init(gmnal_load);
-
 module_exit(gmnal_unload);
 
 MODULE_PARM(num_rx_threads, "i");
-MODULE_PARM(num_stxds, "i");
+MODULE_PARM(num_txds, "i");
 MODULE_PARM(gm_port_id, "i");
 
 MODULE_AUTHOR("Morgan Doyle");
index aee16fb..4a8ee6c 100644 (file)
@@ -38,51 +38,81 @@ gmnal_is_rxthread(gmnal_ni_t *gmnalni)
        return(0);
 }
 
+gmnal_tx_t *
+gmnal_alloc_tx (gmnal_ni_t *gmnalni) 
+{
+        gmnal_tx_t  *tx;
+        void        *buffer;
+        
+        PORTAL_ALLOC(tx, sizeof(*tx));
+        if (tx == NULL) {
+                CERROR ("Failed to allocate tx\n");
+                return NULL;
+        }
+        
+        spin_lock(&gmnalni->gmni_gm_lock);
+        buffer = gm_dma_malloc(gmnalni->gmni_port,
+                                 gmnalni->gmni_msg_size);
+        spin_unlock(&gmnalni->gmni_gm_lock);
+        if (buffer == NULL) {
+                CERROR("Failed to gm_dma_malloc tx buffer size [%d]\n", 
+                       gmnalni->gmni_msg_size);
+                PORTAL_FREE(tx, sizeof(*tx));
+                return NULL;
+        }
+
+        memset(tx, 0, sizeof(*tx));
+        tx->tx_msg = (gmnal_msg_t *)buffer;
+        tx->tx_buffer_size = gmnalni->gmni_msg_size;
+        tx->tx_gm_size = gm_min_size_for_length(tx->tx_buffer_size);
+        tx->tx_gmni = gmnalni;
+
+        CDEBUG(D_NET, "Created tx [%p] with buffer [%p], size [%d]\n", 
+               tx, tx->tx_msg, tx->tx_buffer_size);
+
+        return tx;
+}
+
+void
+gmnal_free_tx (gmnal_tx_t *tx)
+{
+        gmnal_ni_t *gmnalni = tx->tx_gmni;
+        
+        CDEBUG(D_NET, "Freeing tx [%p] with buffer [%p], size [%d]\n", 
+               tx, tx->tx_msg, tx->tx_buffer_size);
+#if 0
+        /* We free buffers after we've closed the GM port */
+        spin_lock(&gmnalni->gmni_gm_lock);
+        gm_dma_free(gmnalni->gmni_port, tx->tx_msg);
+        spin_unlock(&gmnalni->gmni_gm_lock);
+#endif
+        PORTAL_FREE(tx, sizeof(*tx));
+}
 
-/*
- *     Allocate tx descriptors/tokens (large and small)
- *     allocate a number of small tx buffers and register with GM
- *     so they are wired and set up for DMA. This is a costly operation.
- *     Also allocate a corrosponding descriptor to keep track of 
- *     the buffer.
- *     Put all small descriptors on singly linked list to be available to send 
- *     function.
- *     Allocate the rest of the available tx tokens for large messages. These will be
- *     used to do gm_gets in gmnal_copyiov     
- */
 int
-gmnal_alloc_txd(gmnal_ni_t *gmnalni)
+gmnal_alloc_txs(gmnal_ni_t *gmnalni)
 {
        int           ntx;
-        int           nstx;
-        int           nrxt_stx;
+        int           ntxcred;
+        int           nrxt_tx;
         int           i;
-       gmnal_stxd_t *txd;
-       void         *txbuffer;
+       gmnal_tx_t   *tx;
 
        CDEBUG(D_TRACE, "gmnal_alloc_small tx\n");
 
+       /* get total number of transmit tokens */
        spin_lock(&gmnalni->gmni_gm_lock);
-       /*
-        *      total number of transmit tokens
-        */
-       ntx = gm_num_send_tokens(gmnalni->gmni_port);
+       ntxcred = gm_num_send_tokens(gmnalni->gmni_port);
        spin_unlock(&gmnalni->gmni_gm_lock);
-       CDEBUG(D_NET, "total number of send tokens available is [%d]\n", ntx);
+       CDEBUG(D_NET, "total number of send tokens available is [%d]\n", 
+               ntxcred);
 
-       /*
-        *      allocate a number for small sends
-        *      num_stxds from gmnal_module.c
-        */
-       nstx = num_stxds;
-       /*
-        *      give the rest to the receive threads
-        */
-        nrxt_stx = num_stxds + 1;
+       ntx = num_txds;
+        nrxt_tx = num_txds + 1;
 
-        if (nstx + nrxt_stx > ntx) {
+        if (ntx + nrxt_tx > ntxcred) {
                 CERROR ("Asked for %d + %d tx credits, but only %d available\n",
-                        nstx, nrxt_stx, ntx);
+                        ntx, nrxt_tx, ntxcred);
                 return -ENOMEM;
         }
         
@@ -92,184 +122,133 @@ gmnal_alloc_txd(gmnal_ni_t *gmnalni)
         * someone returning a stxd will release the semaphore and wake you)
         * When token is obtained acquire the spinlock to manipulate the
         * list */
-       sema_init(&gmnalni->gmni_stxd_token, nstx);
-       spin_lock_init(&gmnalni->gmni_stxd_lock);
+       sema_init(&gmnalni->gmni_tx_token, ntx);
+       spin_lock_init(&gmnalni->gmni_tx_lock);
+        LASSERT (gmnalni->gmni_tx == NULL);
+
+       for (i = 0; i <= ntx; i++) {
+                tx = gmnal_alloc_tx(gmnalni);
+               if (tx == NULL) {
+                        CERROR("Failed to create tx %d\n", i);
+                        return -ENOMEM;
+                }
+                
+                tx->tx_rxt = 0;
+               tx->tx_next = gmnalni->gmni_tx;
+               gmnalni->gmni_tx = tx;
+       }
 
-       sema_init(&gmnalni->gmni_rxt_stxd_token, nrxt_stx);
-       spin_lock_init(&gmnalni->gmni_rxt_stxd_lock);
+       sema_init(&gmnalni->gmni_rxt_tx_token, nrxt_tx);
+       spin_lock_init(&gmnalni->gmni_rxt_tx_lock);
+        LASSERT (gmnalni->gmni_rxt_tx == NULL);
 
-       for (i=0; i<=nstx; i++) {
-               PORTAL_ALLOC(txd, sizeof(*txd));
-               if (txd == NULL) {
-                       CERROR("Failed to malloc txd [%d]\n", i);
-                       return -ENOMEM;
-               }
-               spin_lock(&gmnalni->gmni_gm_lock);
-               txbuffer = gm_dma_malloc(gmnalni->gmni_port,
-                                        gmnalni->gmni_small_msg_size);
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               if (txbuffer == NULL) {
-                       CERROR("Failed to gm_dma_malloc txbuffer [%d], "
-                              "size [%d]\n", i, gmnalni->gmni_small_msg_size);
-                       PORTAL_FREE(txd, sizeof(*txd));
-                       return -ENOMEM;
-               }
-               txd->tx_buffer = txbuffer;
-               txd->tx_buffer_size = gmnalni->gmni_small_msg_size;
-               txd->tx_gm_size = gm_min_size_for_length(txd->tx_buffer_size);
-               txd->tx_gmni = gmnalni;
-                txd->tx_rxt = 0;
-
-               txd->tx_next = gmnalni->gmni_stxd;
-               gmnalni->gmni_stxd = txd;
-               CDEBUG(D_NET, "Registered txd [%p] with buffer [%p], "
-                      "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
-       }
+       for (i = 0; i <= nrxt_tx; i++) {
+                tx = gmnal_alloc_tx(gmnalni);
+               if (tx == NULL) {
+                        CERROR("Failed to create tx %d + %d\n", ntx, i);
+                        return -ENOMEM;
+                }
 
-       for (i=0; i<=nrxt_stx; i++) {
-               PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
-               if (!txd) {
-                       CERROR("Failed to malloc txd [%d]\n", i);
-                       return -ENOMEM;
-               }
-               spin_lock(&gmnalni->gmni_gm_lock);
-               txbuffer = gm_dma_malloc(gmnalni->gmni_port, 
-                                        gmnalni->gmni_small_msg_size);
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               if (!txbuffer) {
-                       CERROR("Failed to gm_dma_malloc txbuffer [%d],"
-                              " size [%d]\n",i, gmnalni->gmni_small_msg_size);
-                       PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
-                       return -ENOMEM;
-               }
-               txd->tx_buffer = txbuffer;
-               txd->tx_buffer_size = gmnalni->gmni_small_msg_size;
-               txd->tx_gm_size = gm_min_size_for_length(txd->tx_buffer_size);
-               txd->tx_gmni = gmnalni;
-                txd->tx_rxt = 1;
-
-               txd->tx_next = gmnalni->gmni_rxt_stxd;
-               gmnalni->gmni_rxt_stxd = txd;
-               CDEBUG(D_NET, "Registered txd [%p] with buffer [%p], "
-                      "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
+                tx->tx_rxt = 1;
+               tx->tx_next = gmnalni->gmni_rxt_tx;
+               gmnalni->gmni_rxt_tx = tx;
        }
 
        return 0;
 }
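
num_txds defaults to 5 (see gmlnd_module.c above), so with an unmodified module
load this asks GM for 5 general send tokens plus 6 reserved for the receive
threads.  A worked instance of the credit check:

/* Worked example of the credit check above with num_txds = 5:
 * ntx = 5, nrxt_tx = 6, so gm_num_send_tokens() must report at least
 * 11 or gmnal_alloc_txs() fails with -ENOMEM. */
enum {
        EX_NTX      = 5,                    /* num_txds     */
        EX_NRXT_TX  = 5 + 1,                /* num_txds + 1 */
        EX_MIN_CRED = EX_NTX + EX_NRXT_TX   /* 11 tokens    */
};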
 
-/*     Free the list of wired and gm_registered small tx buffers and 
- *     the tx descriptors that go along with them.
- */
 void
-gmnal_free_txd(gmnal_ni_t *gmnalni)
+gmnal_free_txs(gmnal_ni_t *gmnalni)
 {
-       gmnal_stxd_t *txd;
-        gmnal_stxd_t *_txd;
-
-       CDEBUG(D_TRACE, "gmnal_free_small tx\n");
+       gmnal_tx_t *tx;
 
-        txd = gmnalni->gmni_stxd;
-       while(txd != NULL) {
-               CDEBUG(D_NET, "Freeing txd [%p] with buffer [%p], "
-                      "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
-               _txd = txd;
-               txd = txd->tx_next;
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_dma_free(gmnalni->gmni_port, _txd->tx_buffer);
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
+        while ((tx = gmnalni->gmni_tx) != NULL) {
+                gmnalni->gmni_tx = tx->tx_next;
+                gmnal_free_tx (tx);
        }
 
-        txd = gmnalni->gmni_rxt_stxd;
-       while(txd) {
-               CDEBUG(D_NET, "Freeing txd [%p] with buffer [%p], "
-                      "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
-               _txd = txd;
-               txd = txd->tx_next;
-               spin_lock(&gmnalni->gmni_gm_lock);
-               gm_dma_free(gmnalni->gmni_port, _txd->tx_buffer);
-               spin_unlock(&gmnalni->gmni_gm_lock);
-               PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
+        while ((tx = gmnalni->gmni_rxt_tx) != NULL) {
+                gmnalni->gmni_rxt_tx = tx->tx_next;
+                gmnal_free_tx (tx);
        }
 }
 
 
 /*
- *     Get a txd from the list
+ *     Get a tx from the list
  *     This get us a wired and gm_registered small tx buffer.
  *     This implicitly gets us a send token also.
  */
-gmnal_stxd_t *
-gmnal_get_stxd(gmnal_ni_t *gmnalni, int block)
+gmnal_tx_t *
+gmnal_get_tx(gmnal_ni_t *gmnalni, int block)
 {
 
-       gmnal_stxd_t    *txd = NULL;
+       gmnal_tx_t      *tx = NULL;
        pid_t           pid = current->pid;
 
 
-       CDEBUG(D_TRACE, "gmnal_get_stxd gmnalni [%p] block[%d] pid [%d]\n", 
+       CDEBUG(D_TRACE, "gmnal_get_tx gmnalni [%p] block[%d] pid [%d]\n", 
               gmnalni, block, pid);
 
        if (gmnal_is_rxthread(gmnalni)) {
                 CDEBUG(D_NET, "RXTHREAD Attempting to get token\n");
-               down(&gmnalni->gmni_rxt_stxd_token);
-               spin_lock(&gmnalni->gmni_rxt_stxd_lock);
-               txd = gmnalni->gmni_rxt_stxd;
-               gmnalni->gmni_rxt_stxd = txd->tx_next;
-               spin_unlock(&gmnalni->gmni_rxt_stxd_lock);
+               down(&gmnalni->gmni_rxt_tx_token);
+               spin_lock(&gmnalni->gmni_rxt_tx_lock);
+               tx = gmnalni->gmni_rxt_tx;
+               gmnalni->gmni_rxt_tx = tx->tx_next;
+               spin_unlock(&gmnalni->gmni_rxt_tx_lock);
                CDEBUG(D_NET, "RXTHREAD got [%p], head is [%p]\n", 
-                      txd, gmnalni->gmni_rxt_stxd);
-                txd->tx_kniov = 0;
-                txd->tx_rxt = 1;
+                      tx, gmnalni->gmni_rxt_tx);
+                tx->tx_rxt = 1;
         } else {
                if (block) {
                         CDEBUG(D_NET, "Attempting to get token\n");
-                       down(&gmnalni->gmni_stxd_token);
+                       down(&gmnalni->gmni_tx_token);
                         CDEBUG(D_PORTALS, "Got token\n");
                } else {
-                       if (down_trylock(&gmnalni->gmni_stxd_token)) {
+                       if (down_trylock(&gmnalni->gmni_tx_token)) {
                                CERROR("can't get token\n");
                                return(NULL);
                        }
                }
-               spin_lock(&gmnalni->gmni_stxd_lock);
-               txd = gmnalni->gmni_stxd;
-               gmnalni->gmni_stxd = txd->tx_next;
-               spin_unlock(&gmnalni->gmni_stxd_lock);
-               CDEBUG(D_NET, "got [%p], head is [%p]\n", txd,
-                      gmnalni->gmni_stxd);
-                txd->tx_kniov = 0;
-        }       /* general txd get */
-       return(txd);
+               spin_lock(&gmnalni->gmni_tx_lock);
+               tx = gmnalni->gmni_tx;
+               gmnalni->gmni_tx = tx->tx_next;
+               spin_unlock(&gmnalni->gmni_tx_lock);
+               CDEBUG(D_NET, "got [%p], head is [%p]\n", tx,
+                      gmnalni->gmni_tx);
+        }       /* general tx get */
+
+       return tx;
 }
 
 /*
- *     Return a txd to the list
+ *     Return a tx to the list
  */
 void
-gmnal_return_stxd(gmnal_ni_t *gmnalni, gmnal_stxd_t *txd)
+gmnal_return_tx(gmnal_ni_t *gmnalni, gmnal_tx_t *tx)
 {
-       CDEBUG(D_TRACE, "gmnalni [%p], txd[%p] rxt[%d]\n", gmnalni,
-              txd, txd->tx_rxt);
+       CDEBUG(D_TRACE, "gmnalni [%p], tx[%p] rxt[%d]\n", gmnalni,
+              tx, tx->tx_rxt);
 
         /*
          *      this transmit descriptor is 
          *      for the rxthread
          */
-        if (txd->tx_rxt) {
-               spin_lock(&gmnalni->gmni_rxt_stxd_lock);
-               txd->tx_next = gmnalni->gmni_rxt_stxd;
-               gmnalni->gmni_rxt_stxd = txd;
-               spin_unlock(&gmnalni->gmni_rxt_stxd_lock);
-               up(&gmnalni->gmni_rxt_stxd_token);
-                CDEBUG(D_NET, "Returned stxd to rxthread list\n");
+        if (tx->tx_rxt) {
+               spin_lock(&gmnalni->gmni_rxt_tx_lock);
+               tx->tx_next = gmnalni->gmni_rxt_tx;
+               gmnalni->gmni_rxt_tx = tx;
+               spin_unlock(&gmnalni->gmni_rxt_tx_lock);
+               up(&gmnalni->gmni_rxt_tx_token);
+                CDEBUG(D_NET, "Returned tx to rxthread list\n");
         } else {
-               spin_lock(&gmnalni->gmni_stxd_lock);
-               txd->tx_next = gmnalni->gmni_stxd;
-               gmnalni->gmni_stxd = txd;
-               spin_unlock(&gmnalni->gmni_stxd_lock);
-               up(&gmnalni->gmni_stxd_token);
-                CDEBUG(D_NET, "Returned stxd to general list\n");
+               spin_lock(&gmnalni->gmni_tx_lock);
+               tx->tx_next = gmnalni->gmni_tx;
+               gmnalni->gmni_tx = tx;
+               spin_unlock(&gmnalni->gmni_tx_lock);
+               up(&gmnalni->gmni_tx_token);
+                CDEBUG(D_NET, "Returned tx to general list\n");
         }
        return;
 }
@@ -284,561 +263,155 @@ gmnal_return_stxd(gmnal_ni_t *gmnalni, gmnal_stxd_t *txd)
  *     receive thread.
  */
 int
-gmnal_alloc_srxd(gmnal_ni_t *gmnalni)
+gmnal_alloc_rxs (gmnal_ni_t *gmnalni)
 {
-       int nrx = 0, nsrx = 0, i = 0;
-       gmnal_srxd_t    *rxd = NULL;
-       void    *rxbuffer = NULL;
+        int          nrxcred;
+        int          nrx;
+        int          i;
+       gmnal_rx_t  *rxd;
+       void        *rxbuffer;
 
        CDEBUG(D_TRACE, "gmnal_alloc_small rx\n");
 
        spin_lock(&gmnalni->gmni_gm_lock);
-       nrx = gm_num_receive_tokens(gmnalni->gmni_port);
+       nrxcred = gm_num_receive_tokens(gmnalni->gmni_port);
        spin_unlock(&gmnalni->gmni_gm_lock);
        CDEBUG(D_NET, "total number of receive tokens available is [%d]\n",
-              nrx);
+              nrxcred);
 
-       nsrx = nrx/2;
-       nsrx = 12;
-       /*
-        *      make the number of rxds twice our total
-        *      number of stxds plus 1
-        */
-       nsrx = num_stxds*2 + 2;
-
-       CDEBUG(D_NET, "Allocated [%d] receive tokens to small messages\n",
-              nsrx);
+       nrx = num_txds*2 + 2;
+        if (nrx > nrxcred) {
+                CERROR("Can't allocate %d rx credits: (%d available)\n",
+                       nrx, nrxcred);
+                return -ENOMEM;
+        }
 
+       CDEBUG(D_NET, "Allocated [%d] receive tokens to small messages\n", nrx);
 
        spin_lock(&gmnalni->gmni_gm_lock);
-       gmnalni->gmni_srxd_hash = gm_create_hash(gm_hash_compare_ptrs, 
-                                                  gm_hash_hash_ptr, 0, 0, nsrx, 0);
+       gmnalni->gmni_rx_hash = gm_create_hash(gm_hash_compare_ptrs, 
+                                                  gm_hash_hash_ptr, 0, 0, nrx, 0);
        spin_unlock(&gmnalni->gmni_gm_lock);
-       if (!gmnalni->gmni_srxd_hash) {
-                       CERROR("Failed to create hash table\n");
-                       return -ENOMEM;
+       if (gmnalni->gmni_rx_hash == NULL) {
+                CERROR("Failed to create hash table\n");
+                return -ENOMEM;
        }
 
-       for (i=0; i<=nsrx; i++) {
-               PORTAL_ALLOC(rxd, sizeof(gmnal_srxd_t));
-               if (!rxd) {
+        LASSERT (gmnalni->gmni_rx == NULL);
+
+       for (i=0; i <= nrx; i++) {
+
+               PORTAL_ALLOC(rxd, sizeof(*rxd));
+               if (rxd == NULL) {
                        CERROR("Failed to malloc rxd [%d]\n", i);
                        return -ENOMEM;
                }
 
                spin_lock(&gmnalni->gmni_gm_lock);
                rxbuffer = gm_dma_malloc(gmnalni->gmni_port, 
-                                        gmnalni->gmni_small_msg_size);
+                                        gmnalni->gmni_msg_size);
                spin_unlock(&gmnalni->gmni_gm_lock);
-               if (!rxbuffer) {
+               if (rxbuffer == NULL) {
                        CERROR("Failed to gm_dma_malloc rxbuffer [%d], "
-                              "size [%d]\n",i ,gmnalni->gmni_small_msg_size);
-                       PORTAL_FREE(rxd, sizeof(gmnal_srxd_t));
+                              "size [%d]\n",i ,gmnalni->gmni_msg_size);
+                       PORTAL_FREE(rxd, sizeof(*rxd));
                        return -ENOMEM;
                }
 
-               rxd->rx_buffer = rxbuffer;
-               rxd->rx_size = gmnalni->gmni_small_msg_size;
+               rxd->rx_msg = (gmnal_msg_t *)rxbuffer;
+               rxd->rx_size = gmnalni->gmni_msg_size;
                rxd->rx_gmsize = gm_min_size_for_length(rxd->rx_size);
 
-               if (gm_hash_insert(gmnalni->gmni_srxd_hash,
-                                  (void*)rxbuffer, (void*)rxd)) {
+               rxd->rx_next = gmnalni->gmni_rx;
+               gmnalni->gmni_rx = rxd;
 
+               if (gm_hash_insert(gmnalni->gmni_rx_hash,
+                                  (void*)rxbuffer, (void*)rxd)) {
                        CERROR("failed to create hash entry rxd[%p] "
                               "for rxbuffer[%p]\n", rxd, rxbuffer);
                        return -ENOMEM;
                }
 
-               rxd->rx_next = gmnalni->gmni_srxd;
-               gmnalni->gmni_srxd = rxd;
                CDEBUG(D_NET, "Registered rxd [%p] with buffer [%p], "
-                      "size [%d]\n", rxd, rxd->rx_buffer, rxd->rx_size);
+                      "size [%d]\n", rxd, rxd->rx_msg, rxd->rx_size);
        }
 
        return 0;
 }
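
The hash is keyed on the DMA buffer address, so when GM later reports a receive
event for one of these buffers the matching descriptor can be recovered
directly.  The lookup side is not visible in this hunk; a sketch of what it
presumably looks like (hypothetical helper name):

/* Sketch only: map a buffer pointer from a GM receive event back to
 * its gmnal_rx_t via the hash built in gmnal_alloc_rxs(). */
static gmnal_rx_t *
gmnal_buffer2rx_sketch(gmnal_ni_t *gmnalni, void *rxbuffer)
{
        return (gmnal_rx_t *)gm_hash_find(gmnalni->gmni_rx_hash, rxbuffer);
}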
 
-
-
-/*     Free the list of wired and gm_registered small rx buffers and the 
- *     rx descriptors that go along with them.
- */
 void
-gmnal_free_srxd(gmnal_ni_t *gmnalni)
+gmnal_free_rxs(gmnal_ni_t *gmnalni)
 {
-       gmnal_srxd_t *rxd = gmnalni->gmni_srxd, *_rxd = NULL;
+       gmnal_rx_t *rx;
 
        CDEBUG(D_TRACE, "gmnal_free_small rx\n");
 
-       while(rxd) {
-               CDEBUG(D_NET, "Freeing rxd [%p] buffer [%p], size [%d]\n",
-                      rxd, rxd->rx_buffer, rxd->rx_size);
-               _rxd = rxd;
-               rxd = rxd->rx_next;
+       while ((rx = gmnalni->gmni_rx) != NULL) {
+                gmnalni->gmni_rx = rx->rx_next;
 
+               CDEBUG(D_NET, "Freeing rxd [%p] buffer [%p], size [%d]\n",
+                      rx, rx->rx_msg, rx->rx_size);
+#if 0
+                /* We free buffers after we've shutdown the GM port */
                spin_lock(&gmnalni->gmni_gm_lock);
-               gm_dma_free(gmnalni->gmni_port, _rxd->rx_buffer);
+               gm_dma_free(gmnalni->gmni_port, rx->rx_msg);
                spin_unlock(&gmnalni->gmni_gm_lock);
-
-               PORTAL_FREE(_rxd, sizeof(gmnal_srxd_t));
+#endif
+               PORTAL_FREE(rx, sizeof(*rx));
        }
-       return;
-}
 
-
-/*
- *     Given a pointer to a srxd find 
- *     the relevant descriptor for it
- *     This is done by searching a hash
- *     list that is created when the srxd's 
- *     are created
- */
-gmnal_srxd_t *
-gmnal_rxbuffer_to_srxd(gmnal_ni_t *gmnalni, void *rxbuffer)
-{
-       gmnal_srxd_t    *srxd = NULL;
-       CDEBUG(D_TRACE, "gmnalni [%p], rxbuffer [%p]\n", gmnalni, rxbuffer);
-       srxd = gm_hash_find(gmnalni->gmni_srxd_hash, rxbuffer);
-       CDEBUG(D_NET, "srxd is [%p]\n", srxd);
-       return(srxd);
+#if 0
+        /* see above */
+        if (gmnalni->gmni_rx_hash != NULL) {
+                spin_lock(&gmnalni->gmni_gm_lock);
+                gm_destroy_hash(gmnalni->gmni_rx_hash);
+                spin_unlock(&gmnalni->gmni_gm_lock);
+        }
+#endif
 }
 
-
 void
 gmnal_stop_rxthread(gmnal_ni_t *gmnalni)
 {
-       int     delay = 30;
-
-
-
-       CDEBUG(D_TRACE, "Attempting to stop rxthread gmnalni [%p]\n", 
-               gmnalni);
+       int     count = 2;
+        int     i;
        
        gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_STOP;
 
-       gmnal_remove_rxtwe(gmnalni);
-       /*
-        *      kick the thread 
-        */
-       up(&gmnalni->gmni_rxtwe_wait);
+        for (i = 0; i < num_rx_threads; i++)
+                up(&gmnalni->gmni_rxq_wait);
 
-       while(gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET && delay--) {
+       while (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) {
                CDEBUG(D_NET, "gmnal_stop_rxthread sleeping\n");
                 gmnal_yield(1);
-               up(&gmnalni->gmni_rxtwe_wait);
-       }
 
-       if (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) {
-               CERROR("I don't know how to wake the thread\n");
-       } else {
-               CDEBUG(D_NET, "rx thread seems to have stopped\n");
+                count++;
+                if ((count & (count - 1)) == 0)
+                        CWARN("Waiting for rxthreads to stop\n");
        }
 }
 
 void
 gmnal_stop_ctthread(gmnal_ni_t *gmnalni)
 {
-       int     delay = 15;
-
+        int count = 2;
 
-
-       CDEBUG(D_TRACE, "Attempting to stop ctthread gmnalni [%p]\n", 
-              gmnalni);
-       
        gmnalni->gmni_ctthread_flag = GMNAL_THREAD_STOP;
+
        spin_lock(&gmnalni->gmni_gm_lock);
        gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, 10, 
                     NULL, NULL);
        spin_unlock(&gmnalni->gmni_gm_lock);
 
-       while(gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP && delay--) {
+       while (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
                CDEBUG(D_NET, "gmnal_stop_ctthread sleeping\n");
                 gmnal_yield(1);
-       }
-
-       if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
-               CERROR("I DON'T KNOW HOW TO WAKE THE THREAD\n");
-       } else {
-               CDEBUG(D_NET, "CT THREAD SEEMS TO HAVE STOPPED\n");
+                count++;
+                if ((count & (count - 1)) == 0)
+                        CWARN("Waiting for ctthread to stop\n");
        }
 }
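
Both shutdown loops above warn when (count & (count - 1)) == 0, which holds
exactly when count is a power of two, so the "Waiting for ..." messages appear
at iterations 4, 8, 16, ... rather than on every pass through gmnal_yield().
A standalone illustration of the test:

/* True exactly for powers of two; this is why the shutdown warnings
 * above back off exponentially instead of flooding the log. */
static int
gmnal_is_power_of_two(unsigned long v)
{
        return v != 0 && (v & (v - 1)) == 0;
}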
 
-
-
-char * 
-gmnal_gm_error(gm_status_t status)
-{
-       return(gm_strerror(status));
-
-       switch(status) {
-               case(GM_SUCCESS):
-                       return("SUCCESS");
-               case(GM_FAILURE):
-                       return("FAILURE");
-               case(GM_INPUT_BUFFER_TOO_SMALL):
-                       return("INPUT_BUFFER_TOO_SMALL");
-               case(GM_OUTPUT_BUFFER_TOO_SMALL):
-                       return("OUTPUT_BUFFER_TOO_SMALL");
-               case(GM_TRY_AGAIN ):
-                       return("TRY_AGAIN");
-               case(GM_BUSY):
-                       return("BUSY");
-               case(GM_MEMORY_FAULT):
-                       return("MEMORY_FAULT");
-               case(GM_INTERRUPTED):
-                       return("INTERRUPTED");
-               case(GM_INVALID_PARAMETER):
-                       return("INVALID_PARAMETER");
-               case(GM_OUT_OF_MEMORY):
-                       return("OUT_OF_MEMORY");
-               case(GM_INVALID_COMMAND):
-                       return("INVALID_COMMAND");
-               case(GM_PERMISSION_DENIED):
-                       return("PERMISSION_DENIED");
-               case(GM_INTERNAL_ERROR):
-                       return("INTERNAL_ERROR");
-               case(GM_UNATTACHED):
-                       return("UNATTACHED");
-               case(GM_UNSUPPORTED_DEVICE):
-                       return("UNSUPPORTED_DEVICE");
-               case(GM_SEND_TIMED_OUT):
-                       return("GM_SEND_TIMEDOUT");
-               case(GM_SEND_REJECTED):
-                       return("GM_SEND_REJECTED");
-               case(GM_SEND_TARGET_PORT_CLOSED):
-                       return("GM_SEND_TARGET_PORT_CLOSED");
-               case(GM_SEND_TARGET_NODE_UNREACHABLE):
-                       return("GM_SEND_TARGET_NODE_UNREACHABLE");
-               case(GM_SEND_DROPPED):
-                       return("GM_SEND_DROPPED");
-               case(GM_SEND_PORT_CLOSED):
-                       return("GM_SEND_PORT_CLOSED");
-               case(GM_NODE_ID_NOT_YET_SET):
-                       return("GM_NODE_ID_NOT_YET_SET");
-               case(GM_STILL_SHUTTING_DOWN):
-                       return("GM_STILL_SHUTTING_DOWN");
-               case(GM_CLONE_BUSY):
-                       return("GM_CLONE_BUSY");
-               case(GM_NO_SUCH_DEVICE):
-                       return("GM_NO_SUCH_DEVICE");
-               case(GM_ABORTED):
-                       return("GM_ABORTED");
-               case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
-                       return("GM_INCOMPATIBLE_LIB_AND_DRIVER");
-               case(GM_UNTRANSLATED_SYSTEM_ERROR):
-                       return("GM_UNTRANSLATED_SYSTEM_ERROR");
-               case(GM_ACCESS_DENIED):
-                       return("GM_ACCESS_DENIED");
-
-
-/*
- *     These ones are in the docs but aren't in the header file 
-               case(GM_DEV_NOT_FOUND):
-                       return("GM_DEV_NOT_FOUND");
-               case(GM_INVALID_PORT_NUMBER):
-                       return("GM_INVALID_PORT_NUMBER");
-               case(GM_UC_ERROR):
-                       return("GM_US_ERROR");
-               case(GM_PAGE_TABLE_FULL):
-                       return("GM_PAGE_TABLE_FULL");
-               case(GM_MINOR_OVERFLOW):
-                       return("GM_MINOR_OVERFLOW");
-               case(GM_SEND_ORPHANED):
-                       return("GM_SEND_ORPHANED");
-               case(GM_HARDWARE_FAULT):
-                       return("GM_HARDWARE_FAULT");
-               case(GM_DATA_CORRUPTED):
-                       return("GM_DATA_CORRUPTED");
-               case(GM_TIMED_OUT):
-                       return("GM_TIMED_OUT");
-               case(GM_USER_ERROR):
-                       return("GM_USER_ERROR");
-               case(GM_NO_MATCH):
-                       return("GM_NOMATCH");
-               case(GM_NOT_SUPPORTED_IN_KERNEL):
-                       return("GM_NOT_SUPPORTED_IN_KERNEL");
-               case(GM_NOT_SUPPORTED_ON_ARCH):
-                       return("GM_NOT_SUPPORTED_ON_ARCH");
-               case(GM_PTE_REF_CNT_OVERFLOW):
-                       return("GM_PTR_REF_CNT_OVERFLOW");
-               case(GM_NO_DRIVER_SUPPORT):
-                       return("GM_NO_DRIVER_SUPPORT");
-               case(GM_FIRMWARE_NOT_RUNNING):
-                       return("GM_FIRMWARE_NOT_RUNNING");
-
- *     These ones are in the docs but aren't in the header file 
- */
-               default:
-                       return("UNKNOWN GM ERROR CODE");
-       }
-}
-
-
-char *
-gmnal_rxevent(gm_recv_event_t  *ev)
-{
-       short   event;
-       event = GM_RECV_EVENT_TYPE(ev);
-       switch(event) {
-               case(GM_NO_RECV_EVENT):
-                       return("GM_NO_RECV_EVENT");
-               case(GM_SENDS_FAILED_EVENT):
-                       return("GM_SEND_FAILED_EVENT");
-               case(GM_ALARM_EVENT):
-                       return("GM_ALARM_EVENT");
-               case(GM_SENT_EVENT):
-                       return("GM_SENT_EVENT");
-               case(_GM_SLEEP_EVENT):
-                       return("_GM_SLEEP_EVENT");
-               case(GM_RAW_RECV_EVENT):
-                       return("GM_RAW_RECV_EVENT");
-               case(GM_BAD_SEND_DETECTED_EVENT):
-                       return("GM_BAD_SEND_DETECTED_EVENT");
-               case(GM_SEND_TOKEN_VIOLATION_EVENT):
-                       return("GM_SEND_TOKEN_VIOLATION_EVENT");
-               case(GM_RECV_TOKEN_VIOLATION_EVENT):
-                       return("GM_RECV_TOKEN_VIOLATION_EVENT");
-               case(GM_BAD_RECV_TOKEN_EVENT):
-                       return("GM_BAD_RECV_TOKEN_EVENT");
-               case(GM_ALARM_VIOLATION_EVENT):
-                       return("GM_ALARM_VIOLATION_EVENT");
-               case(GM_RECV_EVENT):
-                       return("GM_RECV_EVENT");
-               case(GM_HIGH_RECV_EVENT):
-                       return("GM_HIGH_RECV_EVENT");
-               case(GM_PEER_RECV_EVENT):
-                       return("GM_PEER_RECV_EVENT");
-               case(GM_HIGH_PEER_RECV_EVENT):
-                       return("GM_HIGH_PEER_RECV_EVENT");
-               case(GM_FAST_RECV_EVENT):
-                       return("GM_FAST_RECV_EVENT");
-               case(GM_FAST_HIGH_RECV_EVENT):
-                       return("GM_FAST_HIGH_RECV_EVENT");
-               case(GM_FAST_PEER_RECV_EVENT):
-                       return("GM_FAST_PEER_RECV_EVENT");
-               case(GM_FAST_HIGH_PEER_RECV_EVENT):
-                       return("GM_FAST_HIGH_PEER_RECV_EVENT");
-               case(GM_REJECTED_SEND_EVENT):
-                       return("GM_REJECTED_SEND_EVENT");
-               case(GM_ORPHANED_SEND_EVENT):
-                       return("GM_ORPHANED_SEND_EVENT");
-               case(GM_BAD_RESEND_DETECTED_EVENT):
-                       return("GM_BAD_RESEND_DETETED_EVENT");
-               case(GM_DROPPED_SEND_EVENT):
-                       return("GM_DROPPED_SEND_EVENT");
-               case(GM_BAD_SEND_VMA_EVENT):
-                       return("GM_BAD_SEND_VMA_EVENT");
-               case(GM_BAD_RECV_VMA_EVENT):
-                       return("GM_BAD_RECV_VMA_EVENT");
-               case(_GM_FLUSHED_ALARM_EVENT):
-                       return("GM_FLUSHED_ALARM_EVENT");
-               case(GM_SENT_TOKENS_EVENT):
-                       return("GM_SENT_TOKENS_EVENTS");
-               case(GM_IGNORE_RECV_EVENT):
-                       return("GM_IGNORE_RECV_EVENT");
-               case(GM_ETHERNET_RECV_EVENT):
-                       return("GM_ETHERNET_RECV_EVENT");
-               case(GM_NEW_NO_RECV_EVENT):
-                       return("GM_NEW_NO_RECV_EVENT");
-               case(GM_NEW_SENDS_FAILED_EVENT):
-                       return("GM_NEW_SENDS_FAILED_EVENT");
-               case(GM_NEW_ALARM_EVENT):
-                       return("GM_NEW_ALARM_EVENT");
-               case(GM_NEW_SENT_EVENT):
-                       return("GM_NEW_SENT_EVENT");
-               case(_GM_NEW_SLEEP_EVENT):
-                       return("GM_NEW_SLEEP_EVENT");
-               case(GM_NEW_RAW_RECV_EVENT):
-                       return("GM_NEW_RAW_RECV_EVENT");
-               case(GM_NEW_BAD_SEND_DETECTED_EVENT):
-                       return("GM_NEW_BAD_SEND_DETECTED_EVENT");
-               case(GM_NEW_SEND_TOKEN_VIOLATION_EVENT):
-                       return("GM_NEW_SEND_TOKEN_VIOLATION_EVENT");
-               case(GM_NEW_RECV_TOKEN_VIOLATION_EVENT):
-                       return("GM_NEW_RECV_TOKEN_VIOLATION_EVENT");
-               case(GM_NEW_BAD_RECV_TOKEN_EVENT):
-                       return("GM_NEW_BAD_RECV_TOKEN_EVENT");
-               case(GM_NEW_ALARM_VIOLATION_EVENT):
-                       return("GM_NEW_ALARM_VIOLATION_EVENT");
-               case(GM_NEW_RECV_EVENT):
-                       return("GM_NEW_RECV_EVENT");
-               case(GM_NEW_HIGH_RECV_EVENT):
-                       return("GM_NEW_HIGH_RECV_EVENT");
-               case(GM_NEW_PEER_RECV_EVENT):
-                       return("GM_NEW_PEER_RECV_EVENT");
-               case(GM_NEW_HIGH_PEER_RECV_EVENT):
-                       return("GM_NEW_HIGH_PEER_RECV_EVENT");
-               case(GM_NEW_FAST_RECV_EVENT):
-                       return("GM_NEW_FAST_RECV_EVENT");
-               case(GM_NEW_FAST_HIGH_RECV_EVENT):
-                       return("GM_NEW_FAST_HIGH_RECV_EVENT");
-               case(GM_NEW_FAST_PEER_RECV_EVENT):
-                       return("GM_NEW_FAST_PEER_RECV_EVENT");
-               case(GM_NEW_FAST_HIGH_PEER_RECV_EVENT):
-                       return("GM_NEW_FAST_HIGH_PEER_RECV_EVENT");
-               case(GM_NEW_REJECTED_SEND_EVENT):
-                       return("GM_NEW_REJECTED_SEND_EVENT");
-               case(GM_NEW_ORPHANED_SEND_EVENT):
-                       return("GM_NEW_ORPHANED_SEND_EVENT");
-               case(_GM_NEW_PUT_NOTIFICATION_EVENT):
-                       return("_GM_NEW_PUT_NOTIFICATION_EVENT");
-               case(GM_NEW_FREE_SEND_TOKEN_EVENT):
-                       return("GM_NEW_FREE_SEND_TOKEN_EVENT");
-               case(GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT):
-                       return("GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT");
-               case(GM_NEW_BAD_RESEND_DETECTED_EVENT):
-                       return("GM_NEW_BAD_RESEND_DETECTED_EVENT");
-               case(GM_NEW_DROPPED_SEND_EVENT):
-                       return("GM_NEW_DROPPED_SEND_EVENT");
-               case(GM_NEW_BAD_SEND_VMA_EVENT):
-                       return("GM_NEW_BAD_SEND_VMA_EVENT");
-               case(GM_NEW_BAD_RECV_VMA_EVENT):
-                       return("GM_NEW_BAD_RECV_VMA_EVENT");
-               case(_GM_NEW_FLUSHED_ALARM_EVENT):
-                       return("GM_NEW_FLUSHED_ALARM_EVENT");
-               case(GM_NEW_SENT_TOKENS_EVENT):
-                       return("GM_NEW_SENT_TOKENS_EVENT");
-               case(GM_NEW_IGNORE_RECV_EVENT):
-                       return("GM_NEW_IGNORE_RECV_EVENT");
-               case(GM_NEW_ETHERNET_RECV_EVENT):
-                       return("GM_NEW_ETHERNET_RECV_EVENT");
-               default:
-                       return("Unknown Recv event");
-#if 0
-               case(/* _GM_PUT_NOTIFICATION_EVENT */
-               case(/* GM_FREE_SEND_TOKEN_EVENT */
-               case(/* GM_FREE_HIGH_SEND_TOKEN_EVENT */
-#endif
-       }
-}
-
-
-void
-gmnal_yield(int delay)
-{
-       set_current_state(TASK_INTERRUPTIBLE);
-       schedule_timeout(delay);
-}
-
-int
-gmnal_is_small_msg(gmnal_ni_t *gmnalni, int niov, struct iovec *iov, 
-                   int len)
-{
-
-       CDEBUG(D_TRACE, "len [%d] limit[%d]\n", len, 
-              gmnalni->gmni_small_msg_size);
-
-       if ((len + sizeof(ptl_hdr_t) + sizeof(gmnal_msghdr_t)) 
-                    < gmnalni->gmni_small_msg_size) {
-
-               CDEBUG(D_NET, "Yep, small message\n");
-               return(1);
-       } else {
-               CERROR("No, not small message\n");
-               /*
-                *      could be made up of lots of little ones !
-                */
-               return(0);
-       }
-
-}
-
-/* 
- *     extract info from the receive event.
- *     Have to do this before the next call to gm_receive
- *     Deal with all endian stuff here.
- *     Then stick work entry on list where rxthreads
- *     can get it to complete the receive
- */
-int
-gmnal_add_rxtwe(gmnal_ni_t *gmnalni, gm_recv_t *recv)
-{
-       gmnal_rxtwe_t   *we = NULL;
-
-       CDEBUG(D_NET, "adding entry to list\n");
-
-       PORTAL_ALLOC(we, sizeof(gmnal_rxtwe_t));
-       if (!we) {
-               CERROR("failed to malloc\n");
-               return -ENOMEM;
-       }
-       we->buffer = gm_ntohp(recv->buffer);
-       we->snode = (int)gm_ntoh_u16(recv->sender_node_id);
-       we->sport = (int)gm_ntoh_u8(recv->sender_port_id);
-       we->type = (int)gm_ntoh_u8(recv->type);
-       we->length = (int)gm_ntohl(recv->length);
-
-       spin_lock(&gmnalni->gmni_rxtwe_lock);
-       if (gmnalni->gmni_rxtwe_tail) {
-               gmnalni->gmni_rxtwe_tail->next = we;
-       } else {
-               gmnalni->gmni_rxtwe_head = we;
-               gmnalni->gmni_rxtwe_tail = we;
-       }
-       gmnalni->gmni_rxtwe_tail = we;
-       spin_unlock(&gmnalni->gmni_rxtwe_lock);
-
-       up(&gmnalni->gmni_rxtwe_wait);
-       return 0;
-}
-
-void
-gmnal_remove_rxtwe(gmnal_ni_t *gmnalni)
-{
-       gmnal_rxtwe_t   *_we, *we = gmnalni->gmni_rxtwe_head;
-
-       CDEBUG(D_NET, "removing all work list entries\n");
-
-       spin_lock(&gmnalni->gmni_rxtwe_lock);
-       CDEBUG(D_NET, "Got lock\n");
-       while (we) {
-               _we = we;
-               we = we->next;
-               PORTAL_FREE(_we, sizeof(gmnal_rxtwe_t));
-       }
-       spin_unlock(&gmnalni->gmni_rxtwe_lock);
-       gmnalni->gmni_rxtwe_head = NULL;
-       gmnalni->gmni_rxtwe_tail = NULL;
-}
-
-gmnal_rxtwe_t *
-gmnal_get_rxtwe(gmnal_ni_t *gmnalni)
-{
-       gmnal_rxtwe_t   *we = NULL;
-
-       CDEBUG(D_NET, "Getting entry to list\n");
-
-       do  {
-               while(down_interruptible(&gmnalni->gmni_rxtwe_wait) != 0)
-                        /* do nothing */;
-
-               if (gmnalni->gmni_rxthread_stop_flag == GMNAL_THREAD_STOP) {
-                       /*
-                        *      time to stop
-                        *      TO DO some one free the work entries
-                        */
-                       return(NULL);
-               }
-
-               spin_lock(&gmnalni->gmni_rxtwe_lock);
-               if (gmnalni->gmni_rxtwe_head) {
-                       CDEBUG(D_NET, "Got a work entry\n");
-                       we = gmnalni->gmni_rxtwe_head;
-                       gmnalni->gmni_rxtwe_head = we->next;
-                       if (!gmnalni->gmni_rxtwe_head)
-                               gmnalni->gmni_rxtwe_tail = NULL;
-               } else {
-                       CWARN("woken but no work\n");
-               }
-
-               spin_unlock(&gmnalni->gmni_rxtwe_lock);
-       } while (!we);
-
-       CDEBUG(D_NET, "Returning we[%p]\n", we);
-       return(we);
-}
-
-
 /*
  *     Start the caretaker thread and a number of receiver threads
  *     The caretaker thread gets events from the gm library.
@@ -851,16 +424,21 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
 {
 
        int     threads = 0;
+        int     flag;
+        
+        INIT_LIST_HEAD(&gmnalni->gmni_rxq);
+       spin_lock_init(&gmnalni->gmni_rxq_lock);
+       sema_init(&gmnalni->gmni_rxq_wait, 0);
+
        /*
         *      the alarm is used to wake the caretaker thread from 
         *      gm_unknown call (sleeping) to exit it.
         */
        CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n");
        gm_initialize_alarm(&gmnalni->gmni_ctthread_alarm);
-       gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
-
 
        CDEBUG(D_NET, "Starting caretaker thread\n");
+       gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
        gmnalni->gmni_ctthread_pid = 
                 kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0);
        if (gmnalni->gmni_ctthread_pid <= 0) {
@@ -868,14 +446,13 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
                return -ENOMEM;
        }
 
-       while (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) {
+       while (gmnalni->gmni_ctthread_flag != GMNAL_THREAD_RESET) {
                gmnal_yield(1);
                CDEBUG(D_NET, "Waiting for caretaker thread signs of life\n");
        }
 
        CDEBUG(D_NET, "caretaker thread has started\n");
 
-
        /*
         *      Now start a number of receiver threads
        *      these threads get work to do from the caretaker (ct) thread
@@ -883,19 +460,17 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
        gmnalni->gmni_rxthread_flag = GMNAL_THREAD_RESET;
        gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_RESET;
 
+       spin_lock_init(&gmnalni->gmni_rxthread_flag_lock);
        for (threads=0; threads<NRXTHREADS; threads++)
                gmnalni->gmni_rxthread_pid[threads] = -1;
-       spin_lock_init(&gmnalni->gmni_rxtwe_lock);
-       spin_lock_init(&gmnalni->gmni_rxthread_flag_lock);
-       sema_init(&gmnalni->gmni_rxtwe_wait, 0);
-       gmnalni->gmni_rxtwe_head = NULL;
-       gmnalni->gmni_rxtwe_tail = NULL;
+
         /*
          *      If the default number of receive threades isn't
          *      modified at load time, then start one thread per cpu
          */
         if (num_rx_threads == -1)
                 num_rx_threads = smp_num_cpus;
+
        CDEBUG(D_NET, "Starting [%d] receive threads\n", num_rx_threads);
        for (threads=0; threads<num_rx_threads; threads++) {
                gmnalni->gmni_rxthread_pid[threads] = 
@@ -910,11 +485,12 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
 
        for (;;) {
                spin_lock(&gmnalni->gmni_rxthread_flag_lock);
-               if (gmnalni->gmni_rxthread_flag == GMNAL_RXTHREADS_STARTED) {
-                       spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-                       break;
-               }
+                flag = gmnalni->gmni_rxthread_flag;
                spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
+                
+               if (flag == GMNAL_RXTHREADS_STARTED)
+                        break;
+
                gmnal_yield(1);
        }
 
@@ -922,3 +498,313 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
 
        return 0;
 }
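The receiver threads started above block inside gmnal_dequeue_rx() (added further down), so stopping them amounts to setting gmni_rxthread_stop_flag and waking each thread through the gmni_rxq_wait semaphore. A minimal sketch of that stop path, with a hypothetical helper name since the real shutdown code is outside this hunk:

static void
gmnal_stop_rx_threads_sketch(gmnal_ni_t *gmnalni)
{
        int i;

        /* tell the receiver threads it is time to quit ... */
        gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_STOP;

        /* ... and wake each one out of down_interruptible() in
         * gmnal_dequeue_rx(), where it will see the flag and return NULL */
        for (i = 0; i < num_rx_threads; i++)
                up(&gmnalni->gmni_rxq_wait);
}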
+
+char * 
+gmnal_gmstatus2str(gm_status_t status)
+{
+       /* NB: gm_strerror() already provides the text for any GM status,
+        * so the switch below is unreachable and kept only for reference */
+       return(gm_strerror(status));
+
+       switch(status) {
+        case(GM_SUCCESS):
+                return("SUCCESS");
+        case(GM_FAILURE):
+                return("FAILURE");
+        case(GM_INPUT_BUFFER_TOO_SMALL):
+                return("INPUT_BUFFER_TOO_SMALL");
+        case(GM_OUTPUT_BUFFER_TOO_SMALL):
+                return("OUTPUT_BUFFER_TOO_SMALL");
+        case(GM_TRY_AGAIN ):
+                return("TRY_AGAIN");
+        case(GM_BUSY):
+                return("BUSY");
+        case(GM_MEMORY_FAULT):
+                return("MEMORY_FAULT");
+        case(GM_INTERRUPTED):
+                return("INTERRUPTED");
+        case(GM_INVALID_PARAMETER):
+                return("INVALID_PARAMETER");
+        case(GM_OUT_OF_MEMORY):
+                return("OUT_OF_MEMORY");
+        case(GM_INVALID_COMMAND):
+                return("INVALID_COMMAND");
+        case(GM_PERMISSION_DENIED):
+                return("PERMISSION_DENIED");
+        case(GM_INTERNAL_ERROR):
+                return("INTERNAL_ERROR");
+        case(GM_UNATTACHED):
+                return("UNATTACHED");
+        case(GM_UNSUPPORTED_DEVICE):
+                return("UNSUPPORTED_DEVICE");
+        case(GM_SEND_TIMED_OUT):
+                return("GM_SEND_TIMED_OUT");
+        case(GM_SEND_REJECTED):
+                return("GM_SEND_REJECTED");
+        case(GM_SEND_TARGET_PORT_CLOSED):
+                return("GM_SEND_TARGET_PORT_CLOSED");
+        case(GM_SEND_TARGET_NODE_UNREACHABLE):
+                return("GM_SEND_TARGET_NODE_UNREACHABLE");
+        case(GM_SEND_DROPPED):
+                return("GM_SEND_DROPPED");
+        case(GM_SEND_PORT_CLOSED):
+                return("GM_SEND_PORT_CLOSED");
+        case(GM_NODE_ID_NOT_YET_SET):
+                return("GM_NODE_ID_NOT_YET_SET");
+        case(GM_STILL_SHUTTING_DOWN):
+                return("GM_STILL_SHUTTING_DOWN");
+        case(GM_CLONE_BUSY):
+                return("GM_CLONE_BUSY");
+        case(GM_NO_SUCH_DEVICE):
+                return("GM_NO_SUCH_DEVICE");
+        case(GM_ABORTED):
+                return("GM_ABORTED");
+        case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
+                return("GM_INCOMPATIBLE_LIB_AND_DRIVER");
+        case(GM_UNTRANSLATED_SYSTEM_ERROR):
+                return("GM_UNTRANSLATED_SYSTEM_ERROR");
+        case(GM_ACCESS_DENIED):
+                return("GM_ACCESS_DENIED");
+
+        
+        /*
+         *     These ones are in the docs but aren't in the header file 
+         case(GM_DEV_NOT_FOUND):
+         return("GM_DEV_NOT_FOUND");
+         case(GM_INVALID_PORT_NUMBER):
+         return("GM_INVALID_PORT_NUMBER");
+         case(GM_UC_ERROR):
+         return("GM_UC_ERROR");
+         case(GM_PAGE_TABLE_FULL):
+         return("GM_PAGE_TABLE_FULL");
+         case(GM_MINOR_OVERFLOW):
+         return("GM_MINOR_OVERFLOW");
+         case(GM_SEND_ORPHANED):
+         return("GM_SEND_ORPHANED");
+         case(GM_HARDWARE_FAULT):
+         return("GM_HARDWARE_FAULT");
+         case(GM_DATA_CORRUPTED):
+         return("GM_DATA_CORRUPTED");
+         case(GM_TIMED_OUT):
+         return("GM_TIMED_OUT");
+         case(GM_USER_ERROR):
+         return("GM_USER_ERROR");
+         case(GM_NO_MATCH):
+         return("GM_NO_MATCH");
+         case(GM_NOT_SUPPORTED_IN_KERNEL):
+         return("GM_NOT_SUPPORTED_IN_KERNEL");
+         case(GM_NOT_SUPPORTED_ON_ARCH):
+         return("GM_NOT_SUPPORTED_ON_ARCH");
+         case(GM_PTE_REF_CNT_OVERFLOW):
+         return("GM_PTE_REF_CNT_OVERFLOW");
+         case(GM_NO_DRIVER_SUPPORT):
+         return("GM_NO_DRIVER_SUPPORT");
+         case(GM_FIRMWARE_NOT_RUNNING):
+         return("GM_FIRMWARE_NOT_RUNNING");
+         *     These ones are in the docs but aren't in the header file 
+         */
+
+        default:
+                return("UNKNOWN GM ERROR CODE");
+       }
+}
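A small usage sketch (illustrative only, not part of this change) showing how a failed GM call would typically be reported through this helper; the wrapper name is hypothetical and CERROR is the usual portals error macro:

static void
gmnal_log_gm_error(const char *what, gm_status_t gmstatus)
{
        if (gmstatus != GM_SUCCESS)
                CERROR("%s failed: %s [%d]\n",
                       what, gmnal_gmstatus2str(gmstatus), (int)gmstatus);
}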
+
+
+char *
+gmnal_rxevent2str(gm_recv_event_t *ev)
+{
+       short   event;
+       event = GM_RECV_EVENT_TYPE(ev);
+       switch(event) {
+        case(GM_NO_RECV_EVENT):
+                return("GM_NO_RECV_EVENT");
+        case(GM_SENDS_FAILED_EVENT):
+                return("GM_SENDS_FAILED_EVENT");
+        case(GM_ALARM_EVENT):
+                return("GM_ALARM_EVENT");
+        case(GM_SENT_EVENT):
+                return("GM_SENT_EVENT");
+        case(_GM_SLEEP_EVENT):
+                return("_GM_SLEEP_EVENT");
+        case(GM_RAW_RECV_EVENT):
+                return("GM_RAW_RECV_EVENT");
+        case(GM_BAD_SEND_DETECTED_EVENT):
+                return("GM_BAD_SEND_DETECTED_EVENT");
+        case(GM_SEND_TOKEN_VIOLATION_EVENT):
+                return("GM_SEND_TOKEN_VIOLATION_EVENT");
+        case(GM_RECV_TOKEN_VIOLATION_EVENT):
+                return("GM_RECV_TOKEN_VIOLATION_EVENT");
+        case(GM_BAD_RECV_TOKEN_EVENT):
+                return("GM_BAD_RECV_TOKEN_EVENT");
+        case(GM_ALARM_VIOLATION_EVENT):
+                return("GM_ALARM_VIOLATION_EVENT");
+        case(GM_RECV_EVENT):
+                return("GM_RECV_EVENT");
+        case(GM_HIGH_RECV_EVENT):
+                return("GM_HIGH_RECV_EVENT");
+        case(GM_PEER_RECV_EVENT):
+                return("GM_PEER_RECV_EVENT");
+        case(GM_HIGH_PEER_RECV_EVENT):
+                return("GM_HIGH_PEER_RECV_EVENT");
+        case(GM_FAST_RECV_EVENT):
+                return("GM_FAST_RECV_EVENT");
+        case(GM_FAST_HIGH_RECV_EVENT):
+                return("GM_FAST_HIGH_RECV_EVENT");
+        case(GM_FAST_PEER_RECV_EVENT):
+                return("GM_FAST_PEER_RECV_EVENT");
+        case(GM_FAST_HIGH_PEER_RECV_EVENT):
+                return("GM_FAST_HIGH_PEER_RECV_EVENT");
+        case(GM_REJECTED_SEND_EVENT):
+                return("GM_REJECTED_SEND_EVENT");
+        case(GM_ORPHANED_SEND_EVENT):
+                return("GM_ORPHANED_SEND_EVENT");
+        case(GM_BAD_RESEND_DETECTED_EVENT):
+                return("GM_BAD_RESEND_DETECTED_EVENT");
+        case(GM_DROPPED_SEND_EVENT):
+                return("GM_DROPPED_SEND_EVENT");
+        case(GM_BAD_SEND_VMA_EVENT):
+                return("GM_BAD_SEND_VMA_EVENT");
+        case(GM_BAD_RECV_VMA_EVENT):
+                return("GM_BAD_RECV_VMA_EVENT");
+        case(_GM_FLUSHED_ALARM_EVENT):
+                return("GM_FLUSHED_ALARM_EVENT");
+        case(GM_SENT_TOKENS_EVENT):
+                return("GM_SENT_TOKENS_EVENT");
+        case(GM_IGNORE_RECV_EVENT):
+                return("GM_IGNORE_RECV_EVENT");
+        case(GM_ETHERNET_RECV_EVENT):
+                return("GM_ETHERNET_RECV_EVENT");
+        case(GM_NEW_NO_RECV_EVENT):
+                return("GM_NEW_NO_RECV_EVENT");
+        case(GM_NEW_SENDS_FAILED_EVENT):
+                return("GM_NEW_SENDS_FAILED_EVENT");
+        case(GM_NEW_ALARM_EVENT):
+                return("GM_NEW_ALARM_EVENT");
+        case(GM_NEW_SENT_EVENT):
+                return("GM_NEW_SENT_EVENT");
+        case(_GM_NEW_SLEEP_EVENT):
+                return("GM_NEW_SLEEP_EVENT");
+        case(GM_NEW_RAW_RECV_EVENT):
+                return("GM_NEW_RAW_RECV_EVENT");
+        case(GM_NEW_BAD_SEND_DETECTED_EVENT):
+                return("GM_NEW_BAD_SEND_DETECTED_EVENT");
+        case(GM_NEW_SEND_TOKEN_VIOLATION_EVENT):
+                return("GM_NEW_SEND_TOKEN_VIOLATION_EVENT");
+        case(GM_NEW_RECV_TOKEN_VIOLATION_EVENT):
+                return("GM_NEW_RECV_TOKEN_VIOLATION_EVENT");
+        case(GM_NEW_BAD_RECV_TOKEN_EVENT):
+                return("GM_NEW_BAD_RECV_TOKEN_EVENT");
+        case(GM_NEW_ALARM_VIOLATION_EVENT):
+                return("GM_NEW_ALARM_VIOLATION_EVENT");
+        case(GM_NEW_RECV_EVENT):
+                return("GM_NEW_RECV_EVENT");
+        case(GM_NEW_HIGH_RECV_EVENT):
+                return("GM_NEW_HIGH_RECV_EVENT");
+        case(GM_NEW_PEER_RECV_EVENT):
+                return("GM_NEW_PEER_RECV_EVENT");
+        case(GM_NEW_HIGH_PEER_RECV_EVENT):
+                return("GM_NEW_HIGH_PEER_RECV_EVENT");
+        case(GM_NEW_FAST_RECV_EVENT):
+                return("GM_NEW_FAST_RECV_EVENT");
+        case(GM_NEW_FAST_HIGH_RECV_EVENT):
+                return("GM_NEW_FAST_HIGH_RECV_EVENT");
+        case(GM_NEW_FAST_PEER_RECV_EVENT):
+                return("GM_NEW_FAST_PEER_RECV_EVENT");
+        case(GM_NEW_FAST_HIGH_PEER_RECV_EVENT):
+                return("GM_NEW_FAST_HIGH_PEER_RECV_EVENT");
+        case(GM_NEW_REJECTED_SEND_EVENT):
+                return("GM_NEW_REJECTED_SEND_EVENT");
+        case(GM_NEW_ORPHANED_SEND_EVENT):
+                return("GM_NEW_ORPHANED_SEND_EVENT");
+        case(_GM_NEW_PUT_NOTIFICATION_EVENT):
+                return("_GM_NEW_PUT_NOTIFICATION_EVENT");
+        case(GM_NEW_FREE_SEND_TOKEN_EVENT):
+                return("GM_NEW_FREE_SEND_TOKEN_EVENT");
+        case(GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT):
+                return("GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT");
+        case(GM_NEW_BAD_RESEND_DETECTED_EVENT):
+                return("GM_NEW_BAD_RESEND_DETECTED_EVENT");
+        case(GM_NEW_DROPPED_SEND_EVENT):
+                return("GM_NEW_DROPPED_SEND_EVENT");
+        case(GM_NEW_BAD_SEND_VMA_EVENT):
+                return("GM_NEW_BAD_SEND_VMA_EVENT");
+        case(GM_NEW_BAD_RECV_VMA_EVENT):
+                return("GM_NEW_BAD_RECV_VMA_EVENT");
+        case(_GM_NEW_FLUSHED_ALARM_EVENT):
+                return("GM_NEW_FLUSHED_ALARM_EVENT");
+        case(GM_NEW_SENT_TOKENS_EVENT):
+                return("GM_NEW_SENT_TOKENS_EVENT");
+        case(GM_NEW_IGNORE_RECV_EVENT):
+                return("GM_NEW_IGNORE_RECV_EVENT");
+        case(GM_NEW_ETHERNET_RECV_EVENT):
+                return("GM_NEW_ETHERNET_RECV_EVENT");
+        default:
+                return("Unknown Recv event");
+        /* _GM_PUT_NOTIFICATION_EVENT */
+        /* GM_FREE_SEND_TOKEN_EVENT */
+        /* GM_FREE_HIGH_SEND_TOKEN_EVENT */
+        }
+}
+
+
+void
+gmnal_yield(int delay)
+{
+       set_current_state(TASK_INTERRUPTIBLE);
+       schedule_timeout(delay);
+}
+
+int
+gmnal_enqueue_rx(gmnal_ni_t *gmnalni, gm_recv_t *recv)
+{
+        void         *ptr = gm_ntohp(recv->buffer);
+        gmnal_rx_t *rx = gm_hash_find(gmnalni->gmni_rx_hash, ptr);
+
+       LASSERT (rx != NULL);
+        LASSERT (rx->rx_msg == (gmnal_msg_t *)ptr);
+
+        rx->rx_recv_nob = gm_ntohl(recv->length);
+        rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
+        rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
+        rx->rx_recv_type = gm_ntoh_u8(recv->type);
+        
+       spin_lock(&gmnalni->gmni_rxq_lock);
+        list_add_tail (&rx->rx_list, &gmnalni->gmni_rxq);
+       spin_unlock(&gmnalni->gmni_rxq_lock);
+
+       up(&gmnalni->gmni_rxq_wait);
+       return 0;
+}
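gmnal_enqueue_rx() is the producer half of the receive path: the caretaker thread is expected to call it from its GM event loop and leave all message handling to the receiver threads. A rough sketch of such a loop, assuming the GM port handle sits in a field called gmni_port (a guess; the real caretaker thread lives in gmlnd_comm.c and is not shown in this hunk):

static void
gmnal_ct_loop_sketch(gmnal_ni_t *gmnalni)
{
        gm_recv_event_t *ev;

        for (;;) {
                /* the real loop also checks a stop flag and breaks out
                 * at shutdown; that is omitted here */
                ev = gm_blocking_receive_no_spin(gmnalni->gmni_port);

                switch (GM_RECV_EVENT_TYPE(ev)) {
                case GM_RECV_EVENT:
                        /* queue the rx descriptor for a receiver thread */
                        gmnal_enqueue_rx(gmnalni, &ev->recv);
                        break;
                default:
                        /* everything else (sent events, the wakeup alarm,
                         * ...) goes back to GM */
                        gm_unknown(gmnalni->gmni_port, ev);
                        break;
                }
        }
}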
+
+gmnal_rx_t *
+gmnal_dequeue_rx(gmnal_ni_t *gmnalni)
+{
+       gmnal_rx_t      *rx;
+
+       CDEBUG(D_NET, "Waiting for an rx descriptor\n");
+
+        for (;;) {
+               while(down_interruptible(&gmnalni->gmni_rxq_wait) != 0)
+                        /* do nothing */;
+
+               if (gmnalni->gmni_rxthread_stop_flag == GMNAL_THREAD_STOP)
+                       return NULL;
+
+               spin_lock(&gmnalni->gmni_rxq_lock);
+
+                if (list_empty(&gmnalni->gmni_rxq)) {
+                        rx = NULL;
+                } else {
+                        rx = list_entry(gmnalni->gmni_rxq.next,
+                                        gmnal_rx_t, rx_list);
+                        list_del(&rx->rx_list);
+                }
+
+               spin_unlock(&gmnalni->gmni_rxq_lock);
+
+                if (rx != NULL)
+                        return rx;
+                
+                CWARN("woken but no work\n");
+       }
+}
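On the consumer side, a receiver thread simply loops on gmnal_dequeue_rx() until it returns NULL at shutdown. A sketch of that loop; gmnal_handle_rx() is a hypothetical stand-in for whatever parses the message and re-posts the receive buffer:

static int
gmnal_rx_thread_sketch(void *arg)
{
        gmnal_ni_t *gmnalni = (gmnal_ni_t *)arg;
        gmnal_rx_t *rx;

        /* (the real thread also records itself in gmni_rxthread_flag so
         * that gmnal_start_kernel_threads() eventually sees
         * GMNAL_RXTHREADS_STARTED) */

        while ((rx = gmnal_dequeue_rx(gmnalni)) != NULL)
                gmnal_handle_rx(gmnalni, rx);   /* hypothetical helper */

        return 0;
}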
+
+