* GMNAL
author eeb <eeb>
Thu, 18 Aug 2005 17:43:26 +0000 (17:43 +0000)
committer eeb <eeb>
Thu, 18 Aug 2005 17:43:26 +0000 (17:43 +0000)
    - cleaned up structs
    - removed buggy RDMA handling (previous version is tagged
      GM_PRE_REMOVE_BUGGY_RDMA for reference)
    - fixed memory leak on teardown

lnet/klnds/gmlnd/gmlnd.h
lnet/klnds/gmlnd/gmlnd_api.c
lnet/klnds/gmlnd/gmlnd_cb.c
lnet/klnds/gmlnd/gmlnd_comm.c
lnet/klnds/gmlnd/gmlnd_module.c
lnet/klnds/gmlnd/gmlnd_utils.c

diff --git a/lnet/klnds/gmlnd/gmlnd.h b/lnet/klnds/gmlnd/gmlnd.h
index 6076d14..fe39506 100644
 #define GMNAL_MAGIC                    0x1234abcd
 
 #define GMNAL_SMALL_MESSAGE            1078
-#define GMNAL_LARGE_MESSAGE_INIT       1079
-#define GMNAL_LARGE_MESSAGE_ACK                1080
-#define GMNAL_LARGE_MESSAGE_FINI       1081
 
-extern  int gmnal_small_msg_size;
 extern  int num_rx_threads;
 extern  int num_stxds;
 extern  int gm_port_id;
@@ -117,67 +113,58 @@ extern  int gm_port_id;
  *     and the other by the NAL rxthreads when doing sends. 
  *     This helps prevent deadlock caused by stxd starvation.
  */
-typedef struct _gmnal_stxd_t {
-       void                    *buffer;
-       int                     buffer_size;
-       gm_size_t               gm_size;
-       int                     msg_size;
-       int                     gm_target_node;
-       int                     gm_priority;
-       int                     type;
-       struct _gmnal_data_t    *nal_data;
-       lib_msg_t               *cookie;
-       int                     niov;
-       struct iovec            iov[PTL_MD_MAX_IOV];
-       struct _gmnal_stxd_t    *next;
-        int                     rxt; 
-        int                     kniov;
-        struct iovec            *iovec_dup;
+typedef struct gmnal_stxd {
+       struct gmnal_stxd       *tx_next;
+       void                    *tx_buffer;
+       int                      tx_buffer_size;
+       gm_size_t                tx_gm_size;
+       int                      tx_msg_size;
+       int                      tx_gmlid;
+       int                      tx_gm_priority;
+       int                      tx_type;
+        ptl_nid_t                tx_nid;
+       struct gmnal_ni         *tx_gmni;
+       lib_msg_t               *tx_cookie;
+       int                      tx_niov;
+        int                      tx_rxt; 
+        int                      tx_kniov;
+        struct iovec            *tx_iovec_dup;
+       struct iovec             tx_iov[PTL_MD_MAX_IOV];
 } gmnal_stxd_t;
 
 /*
- *     keeps a transmit token for large transmit (gm_get)
- *     and a pointer to rxd that is used as context for large receive
- */
-typedef struct _gmnal_ltxd_t {
-       struct _gmnal_ltxd_t    *next;
-       struct  _gmnal_srxd_t  *srxd;
-} gmnal_ltxd_t;
-
-
-/*
  *     as for gmnal_stxd_t 
  *     a hash table in nal_data find srxds from
  *     the rx buffer address. hash table populated at init time
  */
-typedef struct _gmnal_srxd_t {
-       void                    *buffer;
-       int                     size;
-       gm_size_t               gmsize;
-       unsigned int            gm_source_node;
-       gmnal_stxd_t            *source_stxd;
-       int                     type;
-       int                     nsiov;
-       int                     nriov;
-       struct iovec            *riov;
-       int                     ncallbacks;
-       spinlock_t              callback_lock;
-       int                     callback_status;
-       lib_msg_t               *cookie;
-       struct _gmnal_srxd_t    *next;
-       struct _gmnal_data_t    *nal_data;
+typedef struct gmnal_srxd {
+       void                    *rx_buffer;
+       int                      rx_size;
+       gm_size_t                rx_gmsize;
+       unsigned int             rx_sender_gmid;
+       __u64                    rx_source_stxd;
+       int                      rx_type;
+       int                      rx_nsiov;
+       int                      rx_nriov;
+       struct iovec            *rx_riov;
+       int                      rx_ncallbacks;
+       spinlock_t               rx_callback_lock;
+       int                      rx_callback_status;
+       lib_msg_t               *rx_cookie;
+       struct gmnal_srxd       *rx_next;
+       struct gmnal_ni         *rx_gmni;
 } gmnal_srxd_t;
 
 /*
  *     Header which lmgnal puts at the start of each message
  *     watch alignment for ia32/64 interaction
  */
-typedef struct _gmnal_msghdr {
-       __s32           magic;
-       __s32           type;
-       __u32           sender_node_id;
-       __s32           niov;
-       gm_remote_ptr_t stxd_remote_ptr; /* 64 bits */
+typedef struct gmnal_msghdr {
+       __s32           gmm_magic;
+       __s32           gmm_type;
+       __s32           gmm_niov;
+       __u32           gmm_sender_gmid;
+       __u64           gmm_stxd_remote_ptr;
 } WIRE_ATTR gmnal_msghdr_t;
 
 /*
@@ -192,13 +179,13 @@ typedef struct    _gmnal_msghdr {
  *     is exhausted (as caretaker thread is responsible for replacing 
  *     transmit descriptors on the free list)
  */
-typedef struct _gmnal_rxtwe {
+typedef struct gmnal_rxtwe {
        void                    *buffer;
        unsigned                snode;
        unsigned                sport;
        unsigned                type;
        unsigned                length;
-       struct _gmnal_rxtwe     *next;
+       struct gmnal_rxtwe      *next;
 } gmnal_rxtwe_t;
 
 /*
@@ -206,43 +193,35 @@ typedef struct _gmnal_rxtwe {
  */
 #define NRXTHREADS 10 /* max number of receiver threads */
 
-typedef struct _gmnal_data_t {
-       int             refcnt;
-       spinlock_t      cb_lock;
-       spinlock_t      stxd_lock;
-       struct semaphore stxd_token;
-       gmnal_stxd_t    *stxd;
-       spinlock_t      rxt_stxd_lock;
-       struct semaphore rxt_stxd_token;
-       gmnal_stxd_t    *rxt_stxd;
-       spinlock_t      ltxd_lock;
-       struct semaphore ltxd_token;
-       gmnal_ltxd_t    *ltxd;
-       spinlock_t      srxd_lock;
-       struct semaphore srxd_token;
-       gmnal_srxd_t    *srxd;
-       struct gm_hash  *srxd_hash;
-       nal_t           *nal;   
-       lib_nal_t       *libnal;
-       struct gm_port  *gm_port;
-       unsigned int    gm_local_nid;
-       unsigned int    gm_global_nid;
-       spinlock_t      gm_lock;
-       long            rxthread_pid[NRXTHREADS];
-       int             rxthread_stop_flag;
-       spinlock_t      rxthread_flag_lock;
-       long            rxthread_flag;
-       long            ctthread_pid;
-       int             ctthread_flag;
-       gm_alarm_t      ctthread_alarm;
-       int             small_msg_size;
-       int             small_msg_gmsize;
-       gmnal_rxtwe_t   *rxtwe_head;
-       gmnal_rxtwe_t   *rxtwe_tail;
-       spinlock_t      rxtwe_lock;
-       struct  semaphore rxtwe_wait;
-        struct ctl_table_header *sysctl;
-} gmnal_data_t;
+typedef struct gmnal_ni {
+       spinlock_t       gmni_stxd_lock;
+       struct semaphore gmni_stxd_token;
+       gmnal_stxd_t    *gmni_stxd;
+       spinlock_t       gmni_rxt_stxd_lock;
+       struct semaphore gmni_rxt_stxd_token;
+       gmnal_stxd_t    *gmni_rxt_stxd;
+       gmnal_srxd_t    *gmni_srxd;
+       struct gm_hash  *gmni_srxd_hash;
+       nal_t           *gmni_nal;      
+       lib_nal_t       *gmni_libnal;
+       struct gm_port  *gmni_port;
+       __u32            gmni_local_gmid;
+       __u32            gmni_global_gmid;
+       spinlock_t       gmni_gm_lock;          /* serialise GM calls */
+       long             gmni_rxthread_pid[NRXTHREADS];
+       int              gmni_rxthread_stop_flag;
+       spinlock_t       gmni_rxthread_flag_lock;
+       long             gmni_rxthread_flag;
+       long             gmni_ctthread_pid;
+       int              gmni_ctthread_flag;
+       gm_alarm_t       gmni_ctthread_alarm;
+       int              gmni_small_msg_size;
+       int              gmni_small_msg_gmsize;
+       gmnal_rxtwe_t   *gmni_rxtwe_head;
+       gmnal_rxtwe_t   *gmni_rxtwe_tail;
+       spinlock_t       gmni_rxtwe_lock;
+       struct semaphore gmni_rxtwe_wait;
+} gmnal_ni_t;
 
 /*
  *     Flags to start/stop and check status of threads
@@ -255,22 +234,12 @@ typedef struct _gmnal_data_t {
 #define GMNAL_RXTHREADS_STARTED ( (1<<num_rx_threads)-1)
 
 
-extern gmnal_data_t    *global_nal_data;
-
 /*
  * for ioctl get pid
  */
 #define GMNAL_IOC_GET_GNID 1   
 
 /*
- *     Return codes
- */
-#define GMNAL_STATUS_OK        0
-#define GMNAL_STATUS_FAIL      1
-#define GMNAL_STATUS_NOMEM     2
-
-
-/*
  *     FUNCTION PROTOTYPES
  */
 
@@ -319,31 +288,26 @@ void  gmnal_fini(void);
 /*
  *     Small and Large Transmit and Receive Descriptor Functions
  */
-int            gmnal_alloc_txd(gmnal_data_t *);
-void           gmnal_free_txd(gmnal_data_t *);
-gmnal_stxd_t*  gmnal_get_stxd(gmnal_data_t *, int);
-void           gmnal_return_stxd(gmnal_data_t *, gmnal_stxd_t *);
-gmnal_ltxd_t*  gmnal_get_ltxd(gmnal_data_t *);
-void           gmnal_return_ltxd(gmnal_data_t *, gmnal_ltxd_t *);
-
-int            gmnal_alloc_srxd(gmnal_data_t *);
-void           gmnal_free_srxd(gmnal_data_t *);
-gmnal_srxd_t*  gmnal_get_srxd(gmnal_data_t *, int);
-void           gmnal_return_srxd(gmnal_data_t *, gmnal_srxd_t *);
+int            gmnal_alloc_txd(gmnal_ni_t *);
+void           gmnal_free_txd(gmnal_ni_t *);
+gmnal_stxd_t*  gmnal_get_stxd(gmnal_ni_t *, int);
+void           gmnal_return_stxd(gmnal_ni_t *, gmnal_stxd_t *);
+
+int            gmnal_alloc_srxd(gmnal_ni_t *);
+void           gmnal_free_srxd(gmnal_ni_t *);
 
 /*
  *     general utility functions
  */
-gmnal_srxd_t   *gmnal_rxbuffer_to_srxd(gmnal_data_t *, void*);
-void           gmnal_stop_rxthread(gmnal_data_t *);
-void           gmnal_stop_ctthread(gmnal_data_t *);
+gmnal_srxd_t   *gmnal_rxbuffer_to_srxd(gmnal_ni_t *, void*);
+void           gmnal_stop_rxthread(gmnal_ni_t *);
+void           gmnal_stop_ctthread(gmnal_ni_t *);
 void           gmnal_drop_sends_callback(gm_port_t *, void *, gm_status_t);
 void           gmnal_resume_sending_callback(gm_port_t *, void *, gm_status_t);
 char           *gmnal_gm_error(gm_status_t);
 char           *gmnal_rxevent(gm_recv_event_t*);
-int            gmnal_is_small_msg(gmnal_data_t*, int, struct iovec*, int);
 void           gmnal_yield(int);
-int            gmnal_start_kernel_threads(gmnal_data_t *);
+int            gmnal_start_kernel_threads(gmnal_ni_t *);
 
 
 /*
@@ -355,47 +319,21 @@ int               gmnal_start_kernel_threads(gmnal_data_t *);
  */
 int            gmnal_ct_thread(void *); /* caretaker thread */
 int            gmnal_rx_thread(void *); /* receive thread */
-int            gmnal_pre_receive(gmnal_data_t*, gmnal_rxtwe_t*, int);
-int            gmnal_rx_bad(gmnal_data_t *, gmnal_rxtwe_t *, gmnal_srxd_t*);
-int            gmnal_rx_requeue_buffer(gmnal_data_t *, gmnal_srxd_t *);
-int            gmnal_add_rxtwe(gmnal_data_t *, gm_recv_t *);
-gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_data_t *);
-void           gmnal_remove_rxtwe(gmnal_data_t *);
+void           gmnal_pre_receive(gmnal_ni_t*, gmnal_rxtwe_t*, int);
+void           gmnal_rx_bad(gmnal_ni_t *, gmnal_rxtwe_t *);
+void           gmnal_rx_requeue_buffer(gmnal_ni_t *, gmnal_srxd_t *);
+int            gmnal_add_rxtwe(gmnal_ni_t *, gm_recv_t *);
+gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_ni_t *);
+void           gmnal_remove_rxtwe(gmnal_ni_t *);
 
 
 /*
  *     Small messages
  */
-ptl_err_t      gmnal_small_rx(lib_nal_t *, void *, lib_msg_t *);
-ptl_err_t      gmnal_small_tx(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
-                               int, ptl_nid_t, ptl_pid_t,
-                               gmnal_stxd_t*, int);
+ptl_err_t       gmnal_small_tx(lib_nal_t *libnal, void *private, 
+                               lib_msg_t *cookie, ptl_hdr_t *hdr, 
+                               int type, ptl_nid_t nid, 
+                               gmnal_stxd_t *stxd, int size);
 void           gmnal_small_tx_callback(gm_port_t *, void *, gm_status_t);
 
-
-
-/*
- *     Large messages
- */
-int            gmnal_large_rx(lib_nal_t *, void *, lib_msg_t *, unsigned int, 
-                               struct iovec *, size_t, size_t, size_t);
-
-int            gmnal_large_tx(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *, 
-                               int, ptl_nid_t, ptl_pid_t, unsigned int, 
-                               struct iovec*, size_t, int);
-
-void           gmnal_large_tx_callback(gm_port_t *, void *, gm_status_t);
-
-int            gmnal_remote_get(gmnal_srxd_t *, int, struct iovec*, int, 
-                                 struct iovec*);
-
-void           gmnal_remote_get_callback(gm_port_t *, void *, gm_status_t);
-
-int            gmnal_copyiov(int, gmnal_srxd_t *, int, struct iovec*, int, 
-                              struct iovec*);
-
-void           gmnal_large_tx_ack(gmnal_data_t *, gmnal_srxd_t *);
-void           gmnal_large_tx_ack_callback(gm_port_t *, void *, gm_status_t);
-void           gmnal_large_tx_ack_received(gmnal_data_t *, gmnal_srxd_t *);
-
 #endif /*__INCLUDE_GMNAL_H__*/
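
A note on the reworked wire header above: it keeps the single 64-bit field
(gmm_stxd_remote_ptr) after four 32-bit fields, so its offset is the same on
ia32 and ia64 whatever the padding rules, which is what the "watch alignment
for ia32/64 interaction" comment is about.  The following is a minimal
user-space sketch that checks the layout; it assumes WIRE_ATTR expands to a
packed attribute and substitutes stdint types for the kernel __s32/__u32/__u64
typedefs, so it is an illustration, not the in-tree definition.

/* layout check only; WIRE_ATTR assumed to be a packed attribute */
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define WIRE_ATTR __attribute__((packed))

typedef struct gmnal_msghdr {
        int32_t  gmm_magic;
        int32_t  gmm_type;
        int32_t  gmm_niov;
        uint32_t gmm_sender_gmid;
        uint64_t gmm_stxd_remote_ptr;   /* 64-bit field last, offset 16 */
} WIRE_ATTR gmnal_msghdr_t;

int main(void)
{
        /* four 32-bit fields precede the 64-bit one, so its offset is 16
         * with or without implicit padding */
        assert(offsetof(gmnal_msghdr_t, gmm_stxd_remote_ptr) == 16);
        assert(sizeof(gmnal_msghdr_t) == 24);
        return 0;
}
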
diff --git a/lnet/klnds/gmlnd/gmlnd_api.c b/lnet/klnds/gmlnd/gmlnd_api.c
index 12efc63..105be90 100644
 
 #include "gmnal.h"
 
-
-
-gmnal_data_t   *global_nal_data = NULL;
-#define         GLOBAL_NID_STR_LEN      16
-char            global_nid_str[GLOBAL_NID_STR_LEN] = {0};
 ptl_handle_ni_t kgmnal_ni;
 
 extern int gmnal_cmd(struct portals_cfg *pcfg, void *private);
 
 /*
- *      Write the global nid /proc/sys/gmnal/globalnid
- */
-#define GMNAL_SYSCTL    201
-#define GMNAL_SYSCTL_GLOBALNID  1
-
-static ctl_table gmnal_sysctl_table[] = {
-        {GMNAL_SYSCTL_GLOBALNID, "globalnid",
-         global_nid_str, GLOBAL_NID_STR_LEN,
-         0444, NULL, &proc_dostring},
-        { 0 }
-};
-
-
-static ctl_table gmnalnal_top_sysctl_table[] = {
-        {GMNAL_SYSCTL, "gmnal", NULL, 0, 0555, gmnal_sysctl_table},
-        { 0 }
-};
-
-/*
  *     gmnal_api_shutdown
  *      nal_refct == 0 => called on last matching PtlNIFini()
  *     Close down this interface and free any resources associated with it
@@ -62,7 +38,7 @@ static ctl_table gmnalnal_top_sysctl_table[] = {
 void
 gmnal_api_shutdown(nal_t *nal)
 {
-       gmnal_data_t    *nal_data;
+       gmnal_ni_t      *gmnalni;
        lib_nal_t       *libnal;
 
         if (nal->nal_refct != 0) {
@@ -71,11 +47,9 @@ gmnal_api_shutdown(nal_t *nal)
                 return;
         }
 
-        LASSERT(nal == global_nal_data->nal);
         libnal = (lib_nal_t *)nal->nal_data;
-        nal_data = (gmnal_data_t *)libnal->libnal_data;
-        LASSERT(nal_data == global_nal_data);
-       CDEBUG(D_TRACE, "gmnal_api_shutdown: nal_data [%p]\n", nal_data);
+        gmnalni = (gmnal_ni_t *)libnal->libnal_data;
+       CDEBUG(D_TRACE, "gmnal_api_shutdown: gmnalni [%p]\n", gmnalni);
 
         /* Stop portals calling our ioctl handler */
         libcfs_nal_cmd_unregister(GMNAL);
@@ -86,21 +60,17 @@ gmnal_api_shutdown(nal_t *nal)
          * shutdown our threads, THEN lib_fini() */
         lib_fini(libnal);
 
-       gmnal_stop_rxthread(nal_data);
-       gmnal_stop_ctthread(nal_data);
-       gmnal_free_txd(nal_data);
-       gmnal_free_srxd(nal_data);
-       spin_lock(&nal_data->gm_lock);
-       gm_close(nal_data->gm_port);
+       gmnal_stop_rxthread(gmnalni);
+       gmnal_stop_ctthread(gmnalni);
+       gmnal_free_txd(gmnalni);
+       gmnal_free_srxd(gmnalni);
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_close(gmnalni->gmni_port);
        gm_finalize();
-       spin_unlock(&nal_data->gm_lock);
-        if (nal_data->sysctl)
-                unregister_sysctl_table (nal_data->sysctl);
+       spin_unlock(&gmnalni->gmni_gm_lock);
         /* Don't free 'nal'; it's a static struct */
-       PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+       PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
        PORTAL_FREE(libnal, sizeof(lib_nal_t));
-
-        global_nal_data = NULL;
 }
 
 
@@ -111,10 +81,10 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
 {
 
        lib_nal_t       *libnal = NULL;
-       gmnal_data_t    *nal_data = NULL;
+       gmnal_ni_t      *gmnalni = NULL;
        gmnal_srxd_t    *srxd = NULL;
        gm_status_t     gm_status;
-       unsigned int    local_nid = 0, global_nid = 0;
+       unsigned int    local_gmid = 0, global_gmid = 0;
         ptl_process_id_t process_id;
 
         if (nal->nal_refct != 0) {
@@ -130,24 +100,22 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
 
        CDEBUG(D_TRACE, "startup\n");
 
-        LASSERT(global_nal_data == NULL);
-
-       PORTAL_ALLOC(nal_data, sizeof(gmnal_data_t));
-       if (!nal_data) {
+       PORTAL_ALLOC(gmnalni, sizeof(gmnal_ni_t));
+       if (!gmnalni) {
                CERROR("can't get memory\n");
                return(PTL_NO_SPACE);
        }       
-       memset(nal_data, 0, sizeof(gmnal_data_t));
+       memset(gmnalni, 0, sizeof(gmnal_ni_t));
        /*
         *      set the small message buffer size 
         */
 
-       CDEBUG(D_INFO, "Allocd and reset nal_data[%p]\n", nal_data);
-       CDEBUG(D_INFO, "small_msg_size is [%d]\n", nal_data->small_msg_size);
+       CDEBUG(D_NET, "Allocd and reset gmnalni[%p]\n", gmnalni);
+       CDEBUG(D_NET, "small_msg_size is [%d]\n", gmnalni->gmni_small_msg_size);
 
        PORTAL_ALLOC(libnal, sizeof(lib_nal_t));
        if (!libnal) {
-               PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
                return(PTL_NO_SPACE);
        }
 
@@ -159,27 +127,23 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
         libnal->libnal_map = NULL;
         libnal->libnal_unmap = NULL;
         libnal->libnal_dist = gmnal_cb_dist;
-        libnal->libnal_data = NULL;
+        libnal->libnal_data = gmnalni;
 
-       CDEBUG(D_INFO, "Allocd and reset libnal[%p]\n", libnal);
+       CDEBUG(D_NET, "Allocd and reset libnal[%p]\n", libnal);
 
-       /*
-        *      String them all together
-        */
-       libnal->libnal_data = (void*)nal_data;
-       nal_data->nal = nal;
-       nal_data->libnal = libnal;
+       gmnalni->gmni_nal = nal;
+       gmnalni->gmni_libnal = libnal;
 
-       spin_lock_init(&nal_data->gm_lock);
+       spin_lock_init(&gmnalni->gmni_gm_lock);
 
 
        /*
         *      initialise the interface,
         */
-       CDEBUG(D_INFO, "Calling gm_init\n");
+       CDEBUG(D_NET, "Calling gm_init\n");
        if (gm_init() != GM_SUCCESS) {
                CERROR("call to gm_init failed\n");
-               PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
+               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
                PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
@@ -189,14 +153,14 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
               "name [%s], version [%d]\n", gm_port_id,
               "gmnal", GM_API_VERSION);
 
-       spin_lock(&nal_data->gm_lock);
-       gm_status = gm_open(&nal_data->gm_port, 0, gm_port_id, "gmnal",
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_status = gm_open(&gmnalni->gmni_port, 0, gm_port_id, "gmnal",
                            GM_API_VERSION);
-       spin_unlock(&nal_data->gm_lock);
+       spin_unlock(&gmnalni->gmni_gm_lock);
 
-       CDEBUG(D_INFO, "gm_open returned [%d]\n", gm_status);
+       CDEBUG(D_NET, "gm_open returned [%d]\n", gm_status);
        if (gm_status == GM_SUCCESS) {
-               CDEBUG(D_INFO,"gm_open succeeded port[%p]\n",nal_data->gm_port);
+               CDEBUG(D_NET,"gm_open succeeded port[%p]\n",gmnalni->gmni_port);
        } else {
                switch(gm_status) {
                case(GM_INVALID_PARAMETER):
@@ -219,26 +183,31 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                                gm_status);
                        break;
                }       
-               spin_lock(&nal_data->gm_lock);
+               spin_lock(&gmnalni->gmni_gm_lock);
                gm_finalize();
-               spin_unlock(&nal_data->gm_lock);
-               PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
+               spin_unlock(&gmnalni->gmni_gm_lock);
+               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
                PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
 
-       nal_data->small_msg_size = gmnal_small_msg_size;
-       nal_data->small_msg_gmsize =
-                       gm_min_size_for_length(gmnal_small_msg_size);
+       gmnalni->gmni_small_msg_size = sizeof(gmnal_msghdr_t) + 
+                                        sizeof(ptl_hdr_t) +
+                                        PTL_MTU +
+                                        928;    /* !! */
+        CWARN("Msg size %08x\n", gmnalni->gmni_small_msg_size);
 
-       if (gmnal_alloc_srxd(nal_data) != GMNAL_STATUS_OK) {
+       gmnalni->gmni_small_msg_gmsize =
+                gm_min_size_for_length(gmnalni->gmni_small_msg_size);
+
+       if (gmnal_alloc_srxd(gmnalni) != 0) {
                CERROR("Failed to allocate small rx descriptors\n");
-               gmnal_free_txd(nal_data);
-               spin_lock(&nal_data->gm_lock);
-               gm_close(nal_data->gm_port);
+               gmnal_free_txd(gmnalni);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_close(gmnalni->gmni_port);
                gm_finalize();
-               spin_unlock(&nal_data->gm_lock);
-               PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
+               spin_unlock(&gmnalni->gmni_gm_lock);
+               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
                PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
@@ -248,26 +217,27 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
         *      Hang out a bunch of small receive buffers
         *      In fact hang them all out
         */
-       while((srxd = gmnal_get_srxd(nal_data, 0))) {
+        for (srxd = gmnalni->gmni_srxd; srxd != NULL; srxd = srxd->rx_next) {
                CDEBUG(D_NET, "giving [%p] to gm_provide_recvive_buffer\n", 
-                      srxd->buffer);
-               spin_lock(&nal_data->gm_lock);
-               gm_provide_receive_buffer_with_tag(nal_data->gm_port, 
-                                                  srxd->buffer, srxd->gmsize, 
+                      srxd->rx_buffer);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, 
+                                                  srxd->rx_buffer, 
+                                                   srxd->rx_gmsize, 
                                                   GM_LOW_PRIORITY, 0);
-               spin_unlock(&nal_data->gm_lock);
+               spin_unlock(&gmnalni->gmni_gm_lock);
        }
        
        /*
         *      Allocate pools of small tx buffers and descriptors
         */
-       if (gmnal_alloc_txd(nal_data) != GMNAL_STATUS_OK) {
+       if (gmnal_alloc_txd(gmnalni) != 0) {
                CERROR("Failed to allocate small tx descriptors\n");
-               spin_lock(&nal_data->gm_lock);
-               gm_close(nal_data->gm_port);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_close(gmnalni->gmni_port);
                gm_finalize();
-               spin_unlock(&nal_data->gm_lock);
-               PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
+               spin_unlock(&gmnalni->gmni_gm_lock);
+               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
                PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
@@ -276,71 +246,71 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
         *      Initialise the portals library
         */
        CDEBUG(D_NET, "Getting node id\n");
-       spin_lock(&nal_data->gm_lock);
-       gm_status = gm_get_node_id(nal_data->gm_port, &local_nid);
-       spin_unlock(&nal_data->gm_lock);
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_status = gm_get_node_id(gmnalni->gmni_port, &local_gmid);
+       spin_unlock(&gmnalni->gmni_gm_lock);
        if (gm_status != GM_SUCCESS) {
-               gmnal_stop_rxthread(nal_data);
-               gmnal_stop_ctthread(nal_data);
+               gmnal_stop_rxthread(gmnalni);
+               gmnal_stop_ctthread(gmnalni);
                CERROR("can't determine node id\n");
-               gmnal_free_txd(nal_data);
-               gmnal_free_srxd(nal_data);
-               spin_lock(&nal_data->gm_lock);
-               gm_close(nal_data->gm_port);
+               gmnal_free_txd(gmnalni);
+               gmnal_free_srxd(gmnalni);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_close(gmnalni->gmni_port);
                gm_finalize();
-               spin_unlock(&nal_data->gm_lock);
-               PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
+               spin_unlock(&gmnalni->gmni_gm_lock);
+               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
                PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
 
-       nal_data->gm_local_nid = local_nid;
-       CDEBUG(D_INFO, "Local node id is [%u]\n", local_nid);
+       gmnalni->gmni_local_gmid = local_gmid;
+       CDEBUG(D_NET, "Local node id is [%u]\n", local_gmid);
 
-       spin_lock(&nal_data->gm_lock);
-       gm_status = gm_node_id_to_global_id(nal_data->gm_port, local_nid, 
-                                           &global_nid);
-       spin_unlock(&nal_data->gm_lock);
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_status = gm_node_id_to_global_id(gmnalni->gmni_port, 
+                                            local_gmid, 
+                                           &global_gmid);
+       spin_unlock(&gmnalni->gmni_gm_lock);
        if (gm_status != GM_SUCCESS) {
                CERROR("failed to obtain global id\n");
-               gmnal_stop_rxthread(nal_data);
-               gmnal_stop_ctthread(nal_data);
-               gmnal_free_txd(nal_data);
-               gmnal_free_srxd(nal_data);
-               spin_lock(&nal_data->gm_lock);
-               gm_close(nal_data->gm_port);
+               gmnal_stop_rxthread(gmnalni);
+               gmnal_stop_ctthread(gmnalni);
+               gmnal_free_txd(gmnalni);
+               gmnal_free_srxd(gmnalni);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_close(gmnalni->gmni_port);
                gm_finalize();
-               spin_unlock(&nal_data->gm_lock);
-               PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
+               spin_unlock(&gmnalni->gmni_gm_lock);
+               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
                PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
-       CDEBUG(D_INFO, "Global node id is [%u]\n", global_nid);
-       nal_data->gm_global_nid = global_nid;
-        snprintf(global_nid_str, GLOBAL_NID_STR_LEN, "%u", global_nid);
+       CDEBUG(D_NET, "Global node id is [%u]\n", global_gmid);
+       gmnalni->gmni_global_gmid = global_gmid;
 
 /*
        pid = gm_getpid();
 */
         process_id.pid = requested_pid;
-        process_id.nid = global_nid;
+        process_id.nid = global_gmid;
 
-       CDEBUG(D_INFO, "portals_pid is [%u]\n", process_id.pid);
-       CDEBUG(D_INFO, "portals_nid is ["LPU64"]\n", process_id.nid);
+       CDEBUG(D_NET, "portals_pid is [%u]\n", process_id.pid);
+       CDEBUG(D_NET, "portals_nid is ["LPU64"]\n", process_id.nid);
 
        CDEBUG(D_PORTALS, "calling lib_init\n");
        if (lib_init(libnal, nal, process_id,
                      requested_limits, actual_limits) != PTL_OK) {
                CERROR("lib_init failed\n");
-               gmnal_stop_rxthread(nal_data);
-               gmnal_stop_ctthread(nal_data);
-               gmnal_free_txd(nal_data);
-               gmnal_free_srxd(nal_data);
-               spin_lock(&nal_data->gm_lock);
-               gm_close(nal_data->gm_port);
+               gmnal_stop_rxthread(gmnalni);
+               gmnal_stop_ctthread(gmnalni);
+               gmnal_free_txd(gmnalni);
+               gmnal_free_srxd(gmnalni);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_close(gmnalni->gmni_port);
                gm_finalize();
-               spin_unlock(&nal_data->gm_lock);
-               PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
+               spin_unlock(&gmnalni->gmni_gm_lock);
+               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
                PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
@@ -349,43 +319,36 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
         * Now that we have initialised the portals library, start receive threads,
         * we do this to avoid processing messages before we can parse them
         */
-       gmnal_start_kernel_threads(nal_data);
+       gmnal_start_kernel_threads(gmnalni);
 
-       while (nal_data->rxthread_flag != GMNAL_RXTHREADS_STARTED) {
+       while (gmnalni->gmni_rxthread_flag != GMNAL_RXTHREADS_STARTED) {
                gmnal_yield(1);
-               CDEBUG(D_INFO, "Waiting for receive thread signs of life\n");
+               CDEBUG(D_NET, "Waiting for receive thread signs of life\n");
        }
 
-       CDEBUG(D_INFO, "receive thread seems to have started\n");
+       CDEBUG(D_NET, "receive thread seems to have started\n");
 
        if (libcfs_nal_cmd_register(GMNAL, &gmnal_cmd, libnal->libnal_data) != 0) {
-               CDEBUG(D_INFO, "libcfs_nal_cmd_register failed\n");
+               CDEBUG(D_NET, "libcfs_nal_cmd_register failed\n");
 
                 /* XXX these cleanup cases should be restructured to
                  * minimise duplication... */
                 lib_fini(libnal);
                 
-               gmnal_stop_rxthread(nal_data);
-               gmnal_stop_ctthread(nal_data);
-               gmnal_free_txd(nal_data);
-               gmnal_free_srxd(nal_data);
-               spin_lock(&nal_data->gm_lock);
-               gm_close(nal_data->gm_port);
+               gmnal_stop_rxthread(gmnalni);
+               gmnal_stop_ctthread(gmnalni);
+               gmnal_free_txd(gmnalni);
+               gmnal_free_srxd(gmnalni);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_close(gmnalni->gmni_port);
                gm_finalize();
-               spin_unlock(&nal_data->gm_lock);
-               PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
+               spin_unlock(&gmnalni->gmni_gm_lock);
+               PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));       
                PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
         }
 
-        /* might be better to initialise this at module load rather than in
-         * NAL startup */
-        nal_data->sysctl = NULL;
-        nal_data->sysctl = register_sysctl_table (gmnalnal_top_sysctl_table, 0);
-
-       CDEBUG(D_INFO, "gmnal_init finished\n");
-
-       global_nal_data = libnal->libnal_data;
+       CDEBUG(D_NET, "gmnal_init finished\n");
 
        return(PTL_OK);
 }
@@ -399,7 +362,7 @@ int gmnal_init(void)
 {
         int    rc;
 
-       CDEBUG(D_INFO, "reset nal[%p]\n", &the_gm_nal);
+       CDEBUG(D_NET, "reset nal[%p]\n", &the_gm_nal);
 
         the_gm_nal = (nal_t) {
                 .nal_ni_init = gmnal_api_startup,
@@ -430,5 +393,4 @@ void gmnal_fini()
         PtlNIFini(kgmnal_ni);
 
         ptl_unregister_nal(GMNAL);
-        LASSERT(global_nal_data == NULL);
 }
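
The error handling in gmnal_api_startup() above repeats the same unwind by
hand at every failure exit (stop the threads, free the descriptor pools, close
the GM port, finalise GM, free gmnalni and libnal), and the in-tree "XXX these
cleanup cases should be restructured to minimise duplication" comment
acknowledges it.  One possible shape for that restructuring is a single
goto-based error tail, sketched below; every identifier is taken from the code
above, but the labels and control flow are illustrative and not part of this
commit.

/* sketch: startup body skeleton with one shared error tail */
        if (gm_init() != GM_SUCCESS)
                goto failed_free;               /* nothing to undo yet */

        spin_lock(&gmnalni->gmni_gm_lock);
        gm_status = gm_open(&gmnalni->gmni_port, 0, gm_port_id, "gmnal",
                            GM_API_VERSION);
        spin_unlock(&gmnalni->gmni_gm_lock);
        if (gm_status != GM_SUCCESS)
                goto failed_gm_finalize;

        if (gmnal_alloc_srxd(gmnalni) != 0 ||
            gmnal_alloc_txd(gmnalni) != 0)
                goto failed_gm_close;

        /* ... node id lookup, lib_init(), thread startup and command
         *     registration, each jumping to the matching label on failure ... */
        return PTL_OK;

 failed_gm_close:
        gmnal_free_txd(gmnalni);        /* as in the existing error paths, */
        gmnal_free_srxd(gmnalni);       /* both pools are freed together   */
        spin_lock(&gmnalni->gmni_gm_lock);
        gm_close(gmnalni->gmni_port);
        spin_unlock(&gmnalni->gmni_gm_lock);
 failed_gm_finalize:
        spin_lock(&gmnalni->gmni_gm_lock);
        gm_finalize();
        spin_unlock(&gmnalni->gmni_gm_lock);
 failed_free:
        PORTAL_FREE(gmnalni, sizeof(gmnal_ni_t));
        PORTAL_FREE(libnal, sizeof(lib_nal_t));
        return PTL_FAIL;
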
diff --git a/lnet/klnds/gmlnd/gmlnd_cb.c b/lnet/klnds/gmlnd/gmlnd_cb.c
index d94bb88..ac4c485 100644
@@ -31,281 +31,213 @@ ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
                   unsigned int niov, struct iovec *iov, size_t offset,
                   size_t mlen, size_t rlen)
 {
-        void            *buffer = NULL;
        gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
-       int             status = PTL_OK;
-        size_t          msglen = mlen;
-        size_t          nob;
+        size_t           nobleft = mlen;
+        void            *buffer = NULL;
+        size_t           nob;
 
        CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
               "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
               libnal, private, cookie, niov, iov, offset, mlen, rlen);
 
-       switch(srxd->type) {
-       case(GMNAL_SMALL_MESSAGE):
-               CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
-               /* HP SFS 1380: Proactively change receives to avoid a receive
-                *  side occurrence of filling pkmap_count[].
-                */
-               buffer = srxd->buffer;
-               buffer += sizeof(gmnal_msghdr_t);
-               buffer += sizeof(ptl_hdr_t);
-
-               while(niov--) {
-                       if (offset >= iov->iov_len) {
-                               offset -= iov->iov_len;
-                       } else {
-                                nob = MIN (iov->iov_len - offset, msglen);
-                                CDEBUG(D_INFO, "processing iov [%p] base [%p] "
-                                       "offset [%d] len ["LPSZ"] to [%p] left "
-                                       "["LPSZ"]\n", iov, iov->iov_base,
-                                       offset, nob, buffer, msglen);
-                                gm_bcopy(buffer, iov->iov_base + offset, nob);
-                                buffer += nob;
-                                msglen -= nob;
-                                offset = 0;
-                       }
-                       iov++;
-               }
-               status = gmnal_small_rx(libnal, private, cookie);
-       break;
-       case(GMNAL_LARGE_MESSAGE_INIT):
-               CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
-               status = gmnal_large_rx(libnal, private, cookie, niov, 
-                                        iov, offset, mlen, rlen);
-       }
+       LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE);
+        
+        buffer = srxd->rx_buffer;
+        buffer += sizeof(gmnal_msghdr_t);
+        buffer += sizeof(ptl_hdr_t);
+
+        while(nobleft > 0) {
+                LASSERT (niov > 0);
+
+                if (offset >= iov->iov_len) {
+                        offset -= iov->iov_len;
+                } else {
+                        nob = MIN (iov->iov_len - offset, nobleft);
 
-       CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
-       return(status);
+                        gm_bcopy(buffer, iov->iov_base + offset, nob);
+
+                        buffer += nob;
+                        nobleft -= nob;
+                        offset = 0;
+                }
+                niov--;
+                iov++;
+        }
+
+        lib_finalize(libnal, private, cookie, PTL_OK);
+       return PTL_OK;
 }
 
 ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
-                              lib_msg_t *cookie, unsigned int kniov,
+                              lib_msg_t *cookie, unsigned int nkiov,
                               ptl_kiov_t *kiov, size_t offset, size_t mlen,
                               size_t rlen)
 {
        gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
-       int             status = PTL_OK;
-       char            *ptr = NULL;
-       void            *buffer = NULL;
-
+        size_t           nobleft = mlen;
+        size_t           nob;
+       char            *ptr;
+       void            *buffer;
 
        CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
-              "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
-              libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
-
-       if (srxd->type == GMNAL_SMALL_MESSAGE) {
-                size_t          msglen = mlen;
-                size_t          nob;
-
-               buffer = srxd->buffer;
-               buffer += sizeof(gmnal_msghdr_t);
-               buffer += sizeof(ptl_hdr_t);
-
-               /*
-                *      map each page and create an iovec for it
-                */
-               while (kniov--) {
-                       /* HP SFS 1380: Proactively change receives to avoid a
-                        *  receive side occurrence of filling pkmap_count[].
-                        */
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n",
-                               kniov, kiov);
-
-                       if (offset >= kiov->kiov_len) {
-                               offset -= kiov->kiov_len;
-                       } else {
-                                nob = MIN (kiov->kiov_len - offset, msglen);
-                               CDEBUG(D_INFO, "kniov page [%p] len [%d] "
-                                       "offset[%d]\n", kiov->kiov_page,
-                                       kiov->kiov_len, kiov->kiov_offset);
-                               ptr = ((char *)kmap(kiov->kiov_page)) +
-                                        kiov->kiov_offset;
-
-                                CDEBUG(D_INFO, "processing ptr [%p] offset [%d] "
-                                       "len ["LPSZ"] from [%p] left ["LPSZ"]\n",
-                                       ptr, offset, nob, buffer, msglen);
-                                gm_bcopy(buffer, ptr + offset, nob);
-                               kunmap(kiov->kiov_page);
-                                buffer += nob;
-                                msglen -= nob;
-                                offset = 0;
-                        }
-                        kiov++;
+              "cookie[%p], kniov[%d], kiov [%p], "
+               "offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
+              libnal, private, cookie, nkiov, kiov, offset, mlen, rlen);
+
+       LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE);
+
+        buffer = srxd->rx_buffer;
+        buffer += sizeof(gmnal_msghdr_t);
+        buffer += sizeof(ptl_hdr_t);
+
+        while (nobleft > 0) {
+                LASSERT (nkiov > 0);
+
+                if (offset >= kiov->kiov_len) {
+                        offset -= kiov->kiov_len;
+                } else {
+                        nob = MIN (kiov->kiov_len - offset, nobleft);
+
+                        ptr = ((char *)kmap(kiov->kiov_page)) +
+                              kiov->kiov_offset;
+
+                        gm_bcopy(buffer, ptr + offset, nob);
+
+                        kunmap(kiov->kiov_page);
+
+                        buffer += nob;
+                        nobleft -= nob;
+                        offset = 0;
                }
-               CDEBUG(D_INFO, "calling gmnal_small_rx\n");
-               status = gmnal_small_rx(libnal, private, cookie);
+                kiov++;
+                nkiov--;
        }
 
-       CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
-       return(status);
+        lib_finalize(libnal, private, cookie, PTL_OK);
+
+       return PTL_OK;
 }
 
 
 ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
                         ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-                        unsigned int niov, struct iovec *iov, size_t offset,
-                        size_t len)
+                        unsigned int niov, struct iovec *iov, 
+                        size_t offset, size_t len)
 {
 
-       gmnal_data_t    *nal_data;
+       gmnal_ni_t      *gmnalni = libnal->libnal_data;
        void            *buffer = NULL;
        gmnal_stxd_t    *stxd = NULL;
-
-
-       CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ
-               "] nid["LPU64"]\n", niov, offset, len, nid);
-       nal_data = libnal->libnal_data;
-       if (!nal_data) {
-               CERROR("no nal_data\n");
-               return(PTL_FAIL);
-       } else {
-               CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
-       }
-
-       if (gmnal_is_small_msg(nal_data, niov, iov, len)) {
-                size_t msglen = len;
-                size_t nob;
-
-               CDEBUG(D_INFO, "This is a small message send\n");
-               /*
-                * HP SFS 1380: With the change to gmnal_small_tx, need to get
-                * the stxd and do relevant setup here
-                */
-               stxd = gmnal_get_stxd(nal_data, 1);
-               CDEBUG(D_INFO, "stxd [%p]\n", stxd);
-               /* Set the offset of the data to copy into the buffer */
-               buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
-               while(niov--) {
-                       if (offset >= iov->iov_len) {
-                               offset -= iov->iov_len;
-                       } else {
-                                nob = MIN (iov->iov_len - offset, msglen);
-                                CDEBUG(D_INFO, "processing iov [%p] base [%p]"
-                                      " offset [%d] len ["LPSZ"] to [%p] left"
-                                      " ["LPSZ"]\n", iov, iov->iov_base,
-                                      offset, nob, buffer, msglen);
-                                gm_bcopy(iov->iov_base + offset, buffer, nob);
-                                buffer += nob;
-                                msglen -= nob;
-                                offset = 0;
-                       }
-                       iov++;
-               }
-               gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
-                              stxd,  len);
-       } else {
-               CERROR("Large message send is not supported\n");
-               lib_finalize(libnal, private, cookie, PTL_FAIL);
-               return(PTL_FAIL);
-               gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
-                               niov, iov, offset, len);
-       }
-       return(PTL_OK);
+        size_t           nobleft = len;
+        size_t           nob;
+        ptl_err_t        rc;
+
+       CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] "
+               "len["LPSZ"] nid["LPU64"]\n", niov, offset, len, nid);
+
+        if ((nid >> 32) != 0) {
+                CERROR("Illegal nid: "LPU64"\n", nid);
+                return PTL_FAIL;
+        }
+
+        stxd = gmnal_get_stxd(gmnalni, 1);
+        CDEBUG(D_NET, "stxd [%p]\n", stxd);
+
+        /* Set the offset of the data to copy into the buffer */
+        buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
+
+        while(nobleft > 0) {
+                LASSERT (niov > 0);
+                
+                if (offset >= iov->iov_len) {
+                        offset -= iov->iov_len;
+                } else {
+                        nob = MIN (iov->iov_len - offset, nobleft);
+
+                        gm_bcopy(iov->iov_base + offset, buffer, nob);
+
+                        buffer += nob;
+                        nobleft -= nob;
+                        offset = 0;
+                }
+                niov--;
+                iov++;
+        }
+
+        rc = gmnal_small_tx(libnal, private, cookie, hdr, type, 
+                            nid, stxd,  len);
+        if (rc != PTL_OK)
+                gmnal_return_stxd(gmnalni, stxd);
+
+       return rc;
 }
 
 ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
                               lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
-                              ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov,
+                              ptl_nid_t nid, ptl_pid_t pid, unsigned int nkiov,
                               ptl_kiov_t *kiov, size_t offset, size_t len)
 {
 
-       gmnal_data_t    *nal_data;
-       char            *ptr;
+       gmnal_ni_t      *gmnalni = libnal->libnal_data;
        void            *buffer = NULL;
        gmnal_stxd_t    *stxd = NULL;
-       ptl_err_t       status = PTL_OK;
+        size_t           nobleft = len;
+       char            *ptr;
+       ptl_err_t        rc;
+        size_t           nob;
 
        CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
-               LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len);
-       nal_data = libnal->libnal_data;
-       if (!nal_data) {
-               CERROR("no nal_data\n");
-               return(PTL_FAIL);
-       } else {
-               CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
-       }
+               LPSZ"] len["LPSZ"]\n", nid, nkiov, offset, len);
+
+        if ((nid >> 32) != 0) {
+                CERROR("Illegal nid: "LPU64"\n", nid);
+                return PTL_FAIL;
+        }
+
+       stxd = gmnal_get_stxd(gmnalni, 1);
+       CDEBUG(D_NET, "stxd [%p]\n", stxd);
 
-       /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
-        * more aggressively.  This is the fix for a livelock situation under
-        * load on ia32 that occurs when there are no more available entries in
-        * the pkmap_count array.  Just fill the buffer and let gmnal_small_tx
-        * put the headers in after we pass it the stxd pointer.
-        */
-       stxd = gmnal_get_stxd(nal_data, 1);
-       CDEBUG(D_INFO, "stxd [%p]\n", stxd);
        /* Set the offset of the data to copy into the buffer */
-       buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
-
-       if (gmnal_is_small_msg(nal_data, 0, NULL, len)) {
-                size_t msglen = len;
-                size_t nob;
-
-               CDEBUG(D_INFO, "This is a small message send\n");
-
-               while(kniov--) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
-                       if (offset >= kiov->kiov_len) {
-                               offset -= kiov->kiov_len;
-                       } else {
-                                nob = MIN (kiov->kiov_len - offset, msglen);
-                               CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                                      kiov->kiov_page, kiov->kiov_len, 
-                                      kiov->kiov_offset);
-
-                               ptr = ((char *)kmap(kiov->kiov_page)) +
-                                        kiov->kiov_offset;
-
-                                CDEBUG(D_INFO, "processing ptr [%p] offset [%d]"
-                                       " len ["LPSZ"] to [%p] left ["LPSZ"]\n",
-                                       ptr, offset, nob, buffer, msglen);
-                                gm_bcopy(ptr + offset, buffer, nob);
-                               kunmap(kiov->kiov_page);
-                                buffer += nob;
-                                msglen -= nob;
-                                offset = 0;
-                       }
-                        kiov++;
-               }
-               status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
-                                       pid, stxd, len);
-       } else {
-               int     i = 0;
-               struct  iovec   *iovec = NULL, *iovec_dup = NULL;
-               ptl_kiov_t *kiov_dup = kiov;
-
-               PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
-               iovec_dup = iovec;
-               CERROR("Large message send it is not supported yet\n");
-               PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
-               return(PTL_FAIL);
-               for (i=0; i<kniov; i++) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
-                       CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                              kiov->kiov_page, kiov->kiov_len, 
-                              kiov->kiov_offset);
-
-                       iovec->iov_base = kmap(kiov->kiov_page) 
-                                                + kiov->kiov_offset;
-                       iovec->iov_len = kiov->kiov_len;
-                        iovec++;
-                        kiov++;
-               }
-               gmnal_large_tx(libnal, private, cookie, hdr, type, nid, 
-                               pid, kniov, iovec, offset, len);
-               for (i=0; i<kniov; i++) {
-                       kunmap(kiov_dup->kiov_page);
-                       kiov_dup++;
-               }
-               PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
-       }
-       return(status);
+       buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
+
+        while (nobleft > 0) {
+                LASSERT (nkiov > 0);
+
+                if (offset >= kiov->kiov_len) {
+                        offset -= kiov->kiov_len;
+                } else {
+                        nob = MIN (kiov->kiov_len - offset, nobleft);
+
+                        ptr = ((char *)kmap(kiov->kiov_page)) +
+                              kiov->kiov_offset;
+
+                        gm_bcopy(ptr + offset, buffer, nob);
+
+                        kunmap(kiov->kiov_page);
+
+                        buffer += nob;
+                        nobleft -= nob;
+                        offset = 0;
+                }
+                nkiov--;
+                kiov++;
+        }
+
+        rc = gmnal_small_tx(libnal, private, cookie, hdr, type, 
+                                nid, stxd, len);
+
+        if (rc != PTL_OK)
+                gmnal_return_stxd(gmnalni, stxd);
+        
+       return rc;
 }
 
 int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
 {
        CDEBUG(D_TRACE, "gmnal_cb_dist\n");
-       if (dist)
-               *dist = 27;
-       return(PTL_OK);
+
+       if (dist != NULL)
+               *dist = 1;
+
+       return PTL_OK;
 }
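
All of the rewritten send and receive paths above share one scatter/gather
loop: whole iovec (or kiov) fragments are skipped while offset still exceeds
the fragment length, and once inside the payload MIN(fragment remainder, bytes
left) is gm_bcopy'd and offset is reset to zero for every later fragment.
Below is a self-contained user-space sketch of that loop; memcpy stands in for
gm_bcopy and the function name is illustrative.

#include <assert.h>
#include <string.h>
#include <sys/uio.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

/* Copy 'len' bytes from the flat buffer 'src' into the iovec list, starting
 * 'offset' bytes into the logical iovec stream (the gmnal_cb_recv pattern;
 * gmnal_cb_send runs the same loop with source and destination swapped). */
static void copy_buf_to_iov(void *src, unsigned int niov, struct iovec *iov,
                            size_t offset, size_t len)
{
        char   *buffer = src;
        size_t  nob;

        while (len > 0) {
                assert(niov > 0);       /* LASSERT in the kernel code */

                if (offset >= iov->iov_len) {
                        /* fragment lies wholly before the payload */
                        offset -= iov->iov_len;
                } else {
                        nob = MIN(iov->iov_len - offset, len);
                        memcpy((char *)iov->iov_base + offset, buffer, nob);
                        buffer += nob;
                        len    -= nob;
                        offset  = 0;    /* later fragments start at zero */
                }
                niov--;
                iov++;
        }
}
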
diff --git a/lnet/klnds/gmlnd/gmlnd_comm.c b/lnet/klnds/gmlnd/gmlnd_comm.c
index c618680..4720099 100644
 int
 gmnal_ct_thread(void *arg)
 {
-       gmnal_data_t            *nal_data;
+       gmnal_ni_t              *gmnalni;
        gm_recv_event_t         *rxevent = NULL;
        gm_recv_t               *recv = NULL;
 
        if (!arg) {
-               CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
+               CDEBUG(D_NET, "NO gmnalni. Exiting\n");
                return(-1);
        }
 
-       nal_data = (gmnal_data_t*)arg;
-       CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
+       gmnalni = (gmnal_ni_t*)arg;
+       CDEBUG(D_NET, "gmnalni is [%p]\n", arg);
 
        sprintf(current->comm, "gmnal_ct");
 
        kportal_daemonize("gmnalctd");
 
-       nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
+       gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;
 
-       spin_lock(&nal_data->gm_lock);
-       while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
+       spin_lock(&gmnalni->gmni_gm_lock);
+       while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
                CDEBUG(D_NET, "waiting\n");
-               rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
-               if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
-                       CDEBUG(D_INFO, "time to exit\n");
+               rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
+               if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
+                       CDEBUG(D_NET, "time to exit\n");
                        break;
                }
-               CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
+               CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent(rxevent));
                switch (GM_RECV_EVENT_TYPE(rxevent)) {
 
                        case(GM_RECV_EVENT):
                                CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
                                recv = (gm_recv_t*)&rxevent->recv;
-                               spin_unlock(&nal_data->gm_lock);
-                               gmnal_add_rxtwe(nal_data, recv);
-                               spin_lock(&nal_data->gm_lock);
+                               spin_unlock(&gmnalni->gmni_gm_lock);
+                               gmnal_add_rxtwe(gmnalni, recv);
+                               spin_lock(&gmnalni->gmni_gm_lock);
                                CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
                        break;
                        case(_GM_SLEEP_EVENT):
@@ -80,10 +80,10 @@ gmnal_ct_thread(void *arg)
                                 *      Don't know what this is
                                 */
                                CDEBUG(D_NET, "Sleeping in gm_unknown\n");
-                               spin_unlock(&nal_data->gm_lock);
-                               gm_unknown(nal_data->gm_port, rxevent);
-                               spin_lock(&nal_data->gm_lock);
-                               CDEBUG(D_INFO, "Awake from gm_unknown\n");
+                               spin_unlock(&gmnalni->gmni_gm_lock);
+                               gm_unknown(gmnalni->gmni_port, rxevent);
+                               spin_lock(&gmnalni->gmni_gm_lock);
+                               CDEBUG(D_NET, "Awake from gm_unknown\n");
                                break;
                                
                        default:
@@ -94,89 +94,87 @@ gmnal_ct_thread(void *arg)
                                 *      FAST_RECV_EVENTS here.
                                 */
                                CDEBUG(D_NET, "Passing event to gm_unknown\n");
-                               spin_unlock(&nal_data->gm_lock);
-                               gm_unknown(nal_data->gm_port, rxevent);
-                               spin_lock(&nal_data->gm_lock);
-                               CDEBUG(D_INFO, "Processed unknown event\n");
+                               spin_unlock(&gmnalni->gmni_gm_lock);
+                               gm_unknown(gmnalni->gmni_port, rxevent);
+                               spin_lock(&gmnalni->gmni_gm_lock);
+                               CDEBUG(D_NET, "Processed unknown event\n");
                }
        }
-       spin_unlock(&nal_data->gm_lock);
-       nal_data->ctthread_flag = GMNAL_THREAD_RESET;
-       CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
-       return(GMNAL_STATUS_OK);
+       spin_unlock(&gmnalni->gmni_gm_lock);
+       gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
+       CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
+
+       return 0;
 }
 
 
 /*
  *     process a receive event
  */
-int gmnal_rx_thread(void *arg)
+int 
+gmnal_rx_thread(void *arg)
 {
         char                     name[16];
-       gmnal_data_t            *nal_data;
+       gmnal_ni_t              *gmnalni;
        void                    *buffer;
        gmnal_rxtwe_t           *we = NULL;
        int                     rank;
 
        if (!arg) {
-               CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
+               CDEBUG(D_NET, "NO gmnalni. Exiting\n");
                return(-1);
        }
 
-       nal_data = (gmnal_data_t*)arg;
-       CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
+       gmnalni = (gmnal_ni_t*)arg;
+       CDEBUG(D_NET, "gmnalni is [%p]\n", arg);
 
        for (rank=0; rank<num_rx_threads; rank++)
-               if (nal_data->rxthread_pid[rank] == current->pid)
+               if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
                        break;
 
        snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
-
        kportal_daemonize(name);
+
        /*
         *      set 1 bit for each thread started
         *      doesn't matter which bit
         */
-       spin_lock(&nal_data->rxthread_flag_lock);
-       if (nal_data->rxthread_flag)
-               nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
+       spin_lock(&gmnalni->gmni_rxthread_flag_lock);
+       if (gmnalni->gmni_rxthread_flag)
+               gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
        else
-               nal_data->rxthread_flag = 1;
-       CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
-       spin_unlock(&nal_data->rxthread_flag_lock);
+               gmnalni->gmni_rxthread_flag = 1;
+       CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
+       spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
 
-       while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
+       while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
                CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
-               we = gmnal_get_rxtwe(nal_data);
+               we = gmnal_get_rxtwe(gmnalni);
                if (!we) {
-                       CDEBUG(D_INFO, "Receive thread time to exit\n");
+                       CDEBUG(D_NET, "Receive thread time to exit\n");
                        break;
                }
 
                buffer = we->buffer;
-               switch(((gmnal_msghdr_t*)buffer)->type) {
+               switch(((gmnal_msghdr_t*)buffer)->gmm_type) {
                case(GMNAL_SMALL_MESSAGE):
-                       gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE);
-               break;
-               case(GMNAL_LARGE_MESSAGE_INIT):
-                       gmnal_pre_receive(nal_data,we,GMNAL_LARGE_MESSAGE_INIT);
-               break;
-               case(GMNAL_LARGE_MESSAGE_ACK):
-                       gmnal_pre_receive(nal_data, we,GMNAL_LARGE_MESSAGE_ACK);
+                       gmnal_pre_receive(gmnalni, we, GMNAL_SMALL_MESSAGE);
                break;
                default:
+#warning better handling
                        CERROR("Unsupported message type\n");
-                       gmnal_rx_bad(nal_data, we, NULL);
+                       gmnal_rx_bad(gmnalni, we);
                }
                PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
        }
 
-       spin_lock(&nal_data->rxthread_flag_lock);
-       nal_data->rxthread_flag/=2;
-       CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
-       spin_unlock(&nal_data->rxthread_flag_lock);
-       CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
-       return(GMNAL_STATUS_OK);
+       spin_lock(&gmnalni->gmni_rxthread_flag_lock);
+       gmnalni->gmni_rxthread_flag/=2;
+       CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
+       spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
+       CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
+
+       return 0;
 }
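
The flag arithmetic above doubles as a thread counter: each receive thread that starts shifts another 1-bit into gmni_rxthread_flag (flag = flag*2 + 1) and each thread that exits shifts one out (flag /= 2), so the flag only reads zero once every thread is gone. A minimal sketch, using only the fields referenced in this function, of how a teardown path could wait on that; the helper name and the poll interval are illustrative.

    /* Sketch only: wait for every gmnal_rx_thread() to drop its bit.
     * gmni_rxthread_flag and its lock are the fields used above; the
     * helper name and the 100ms poll are invented for illustration. */
    static void
    gmnal_wait_for_rxthreads(gmnal_ni_t *gmnalni)
    {
            long flag;

            do {
                    spin_lock(&gmnalni->gmni_rxthread_flag_lock);
                    flag = gmnalni->gmni_rxthread_flag;
                    spin_unlock(&gmnalni->gmni_rxthread_flag_lock);

                    if (flag != 0) {
                            set_current_state(TASK_UNINTERRUPTIBLE);
                            schedule_timeout(HZ / 10);
                    }
            } while (flag != 0);
    }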
 
 
@@ -188,83 +186,54 @@ int gmnal_rx_thread(void *arg)
  *     which hands back to gmnal_small_receive
  *     Deal with all endian stuff here.
  */
-int
-gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
+void
+gmnal_pre_receive(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we, int gmnal_type)
 {
        gmnal_srxd_t    *srxd = NULL;
        void            *buffer = NULL;
-       unsigned int snode, sport, type, length;
        gmnal_msghdr_t  *gmnal_msghdr;
        ptl_hdr_t       *portals_hdr;
-        int              rc;
 
-       CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
-              nal_data, we, gmnal_type);
+       CDEBUG(D_NET, "gmnalni [%p], we[%p] type [%d]\n",
+              gmnalni, we, gmnal_type);
 
        buffer = we->buffer;
-       snode = we->snode;
-       sport = we->sport;
-       type = we->type;
-       buffer = we->buffer;
-       length = we->length;
 
        gmnal_msghdr = (gmnal_msghdr_t*)buffer;
        portals_hdr = (ptl_hdr_t*)(buffer+sizeof(gmnal_msghdr_t));
 
-       CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
+       CDEBUG(D_NET, "rx_event:: Sender node [%d], Sender Port [%d], "
               "type [%d], length [%d], buffer [%p]\n",
-              snode, sport, type, length, buffer);
-       CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
-              "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id,
-              gmnal_msghdr->magic, gmnal_msghdr->type);
-       CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
+               we->snode, we->sport, we->type, we->length, buffer);
+       CDEBUG(D_NET, "gmnal_msghdr:: Sender node [%u], magic [%d], "
+              "gmnal_type [%d]\n", gmnal_msghdr->gmm_sender_gmid,
+              gmnal_msghdr->gmm_magic, gmnal_msghdr->gmm_type);
+       CDEBUG(D_NET, "portals_hdr:: Sender node ["LPD64"], "
               "dest_node ["LPD64"]\n", portals_hdr->src_nid,
               portals_hdr->dest_nid);
 
        /*
         *      Get a receive descriptor for this message
         */
-       srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
-       CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
+       srxd = gmnal_rxbuffer_to_srxd(gmnalni, buffer);
+       CDEBUG(D_NET, "Back from gmnal_rxbuffer_to_srxd\n");
        if (!srxd) {
                CERROR("Failed to get receive descriptor\n");
-                /* I think passing a NULL srxd to lib_parse will crash
-                 * gmnal_recv() */
                 LBUG();
-               lib_parse(nal_data->libnal, portals_hdr, srxd);
-               return(GMNAL_STATUS_FAIL);
        }
 
-       /*
-        *      no need to bother portals library with this
-        */
-       if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
-               gmnal_large_tx_ack_received(nal_data, srxd);
-               return(GMNAL_STATUS_OK);
-       }
-
-       srxd->nal_data = nal_data;
-       srxd->type = gmnal_type;
-       srxd->nsiov = gmnal_msghdr->niov;
-       srxd->gm_source_node = gmnal_msghdr->sender_node_id;
+       srxd->rx_gmni = gmnalni;
+       srxd->rx_type = gmnal_type;
+       srxd->rx_nsiov = gmnal_msghdr->gmm_niov;
+       srxd->rx_sender_gmid = gmnal_msghdr->gmm_sender_gmid;
 
        CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
               buffer+sizeof(gmnal_msghdr_t));
-       /*
-        *      control passes to lib, which calls cb_recv 
-        *      cb_recv is responsible for returning the buffer 
-        *      for future receive
-        */
-       rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
 
-        if (rc != PTL_OK) {
-                /* I just received garbage; return the srxd for use */
-                CWARN("Returning srxd and discarding message, "
-                        "lib_parse didn't like it.\n");
-                return(gmnal_rx_bad(nal_data, we, srxd));
-        }
+       (void)lib_parse(gmnalni->gmni_libnal, portals_hdr, srxd);
+        /* Ignore error; we're connectionless */
 
-       return(GMNAL_STATUS_OK);
+        gmnal_rx_requeue_buffer(gmnalni, srxd);
 }
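
For reference, the small-message layout gmnal_pre_receive() relies on is simply the NAL's own header, then the portals header, then the payload, all inside the one wired GM receive buffer. A sketch of the offset arithmetic, derived only from the struct sizes used above; the helper name is illustrative.

    /*
     * GM receive buffer layout for GMNAL_SMALL_MESSAGE (sketch):
     *
     *     [ gmnal_msghdr_t | ptl_hdr_t | payload ... ]
     *
     * so lib_parse() is handed the portals header at a fixed offset and
     * the payload begins immediately after both headers.
     */
    static inline void *
    gmnal_small_msg_payload(void *rxbuffer)
    {
            return rxbuffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
    }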
 
 
@@ -274,19 +243,15 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
  *     hang out the receive buffer again.
  *     This implicitly returns a receive token.
  */
-int
-gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
+void
+gmnal_rx_requeue_buffer(gmnal_ni_t *gmnalni, gmnal_srxd_t *srxd)
 {
-       CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
-
-       CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
+       CDEBUG(D_NET, "requeueing srxd[%p] gmnalni[%p]\n", srxd, gmnalni);
 
-       spin_lock(&nal_data->gm_lock);
-       gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
-                                       srxd->gmsize, GM_LOW_PRIORITY, 0 );
-       spin_unlock(&nal_data->gm_lock);
-
-       return(GMNAL_STATUS_OK);
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, srxd->rx_buffer,
+                                           srxd->rx_gmsize, GM_LOW_PRIORITY, 0 );
+       spin_unlock(&gmnalni->gmni_gm_lock);
 }
 
 
@@ -294,71 +259,22 @@ gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
  *     Handle a bad message
  *     A bad message is one we don't expect or can't interpret
  */
-int
-gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
+void
+gmnal_rx_bad(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we)
 {
-       CDEBUG(D_TRACE, "Can't handle message\n");
-
-       if (!srxd)
-               srxd = gmnal_rxbuffer_to_srxd(nal_data, 
-                                              we->buffer);
-       if (srxd) {
-               gmnal_rx_requeue_buffer(nal_data, srxd);
-       } else {
+        gmnal_srxd_t *srxd = gmnal_rxbuffer_to_srxd(gmnalni, 
+                                                    we->buffer);
+       if (srxd == NULL) {
                CERROR("Can't find a descriptor for this buffer\n");
-               /*
-                *      get rid of it ?
-                */
-               return(GMNAL_STATUS_FAIL);
+               return;
        }
 
-       return(GMNAL_STATUS_OK);
+        gmnal_rx_requeue_buffer(gmnalni, srxd);
 }
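
The receive thread's default case above carries a "#warning better handling" and simply requeues anything it does not recognise via gmnal_rx_bad(). One obvious hardening, sketched here with fields already present in gmnal_msghdr_t, would be to sanity-check the header before dispatching; the helper name is illustrative and this is not part of the patch.

    /* Illustrative only: a minimal header check the rx thread could apply
     * before switching on gmm_type. */
    static int
    gmnal_msghdr_looks_valid(gmnal_msghdr_t *msghdr)
    {
            return msghdr->gmm_magic == GMNAL_MAGIC &&
                   msghdr->gmm_type  == GMNAL_SMALL_MESSAGE;
    }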
 
 
 
 /*
- *     Process a small message receive.
- *     Get here from gmnal_receive_thread, gmnal_pre_receive
- *     lib_parse, cb_recv
- *     Put data from prewired receive buffer into users buffer(s)
- *     Hang out the receive buffer again for another receive
- *     Call lib_finalize
- */
-ptl_err_t
-gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
-{
-       gmnal_srxd_t    *srxd = NULL;
-       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
-
-
-       if (!private) {
-               CERROR("gmnal_small_rx no context\n");
-               lib_finalize(libnal, private, cookie, PTL_FAIL);
-               return(PTL_FAIL);
-       }
-
-       srxd = (gmnal_srxd_t*)private;
-
-       /*
-        *      let portals library know receive is complete
-        */
-       CDEBUG(D_PORTALS, "calling lib_finalize\n");
-       lib_finalize(libnal, private, cookie, PTL_OK);
-       /*
-        *      return buffer so it can be used again
-        */
-       CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
-       spin_lock(&nal_data->gm_lock);
-       gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
-                                          srxd->gmsize, GM_LOW_PRIORITY, 0);
-       spin_unlock(&nal_data->gm_lock);
-
-       return(PTL_OK);
-}
-
-
-/*
  *     Start a small transmit. 
  *     Use the given send token (and wired transmit buffer).
  *     Copy headers to wired buffer and initiate gm_send from the wired buffer.
@@ -366,83 +282,81 @@ gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
  */
 ptl_err_t
 gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
-               ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
+               ptl_hdr_t *hdr, int type, ptl_nid_t nid,
                gmnal_stxd_t *stxd, int size)
 {
-       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
+       gmnal_ni_t      *gmnalni = (gmnal_ni_t*)libnal->libnal_data;
        void            *buffer = NULL;
        gmnal_msghdr_t  *msghdr = NULL;
        int             tot_size = 0;
-       unsigned int    local_nid;
        gm_status_t     gm_status = GM_SUCCESS;
 
-       CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
-              "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] "
+       CDEBUG(D_NET, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
+              "hdr [%p] type [%d] nid ["LPU64"] stxd [%p] "
               "size [%d]\n", libnal, private, cookie, hdr, type,
-              global_nid, pid, stxd, size);
+              nid, stxd, size);
 
-       CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
+       CDEBUG(D_NET, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
               hdr->dest_nid, hdr->src_nid);
 
-       if (!nal_data) {
-               CERROR("no nal_data\n");
-               return(PTL_FAIL);
-       } else {
-               CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
-       }
+        LASSERT ((nid >> 32) == 0);
+        LASSERT (gmnalni != NULL);
+
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid, 
+                                            &stxd->tx_gmlid);
+       spin_unlock(&gmnalni->gmni_gm_lock);
 
-       spin_lock(&nal_data->gm_lock);
-       gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
-                                           &local_nid);
-       spin_unlock(&nal_data->gm_lock);
        if (gm_status != GM_SUCCESS) {
                CERROR("Failed to obtain local id\n");
                return(PTL_FAIL);
        }
-       CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
 
-       stxd->type = GMNAL_SMALL_MESSAGE;
-       stxd->cookie = cookie;
+       CDEBUG(D_NET, "Local Node_id is [%u][%x]\n", 
+               stxd->tx_gmlid, stxd->tx_gmlid);
+
+        stxd->tx_nid = nid;
+       stxd->tx_cookie = cookie;
+       stxd->tx_type = GMNAL_SMALL_MESSAGE;
+       stxd->tx_gm_priority = GM_LOW_PRIORITY;
 
        /*
         *      Copy gmnal_msg_hdr and portals header to the transmit buffer
         *      Then send the message, as the data has previously been copied in
         *      (HP SFS 1380).
         */
-       buffer = stxd->buffer;
+       buffer = stxd->tx_buffer;
        msghdr = (gmnal_msghdr_t*)buffer;
 
-       msghdr->magic = GMNAL_MAGIC;
-       msghdr->type = GMNAL_SMALL_MESSAGE;
-       msghdr->sender_node_id = nal_data->gm_global_nid;
-       CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
+       msghdr->gmm_magic = GMNAL_MAGIC;
+       msghdr->gmm_type = GMNAL_SMALL_MESSAGE;
+       msghdr->gmm_sender_gmid = gmnalni->gmni_global_gmid;
+       CDEBUG(D_NET, "processing msghdr at [%p]\n", buffer);
 
        buffer += sizeof(gmnal_msghdr_t);
 
-       CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
+       CDEBUG(D_NET, "processing  portals hdr at [%p]\n", buffer);
        gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
 
        buffer += sizeof(ptl_hdr_t);
 
-       CDEBUG(D_INFO, "sending\n");
+       CDEBUG(D_NET, "sending\n");
        tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
-       stxd->msg_size = tot_size;
-
+       stxd->tx_msg_size = tot_size;
 
        CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
-              "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
-              "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
-              stxd->msg_size, global_nid, local_nid, stxd);
-
-       spin_lock(&nal_data->gm_lock);
-       stxd->gm_priority = GM_LOW_PRIORITY;
-       stxd->gm_target_node = local_nid;
-       gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
-                                     stxd->gm_size, stxd->msg_size,
-                                     GM_LOW_PRIORITY, local_nid,
+              "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] "
+              "stxd [%p]\n", gmnalni->gmni_port, stxd->tx_buffer, 
+               stxd->tx_gm_size, stxd->tx_msg_size, nid, stxd->tx_gmlid, 
+               stxd);
+
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_send_to_peer_with_callback(gmnalni->gmni_port, stxd->tx_buffer,
+                                     stxd->tx_gm_size, stxd->tx_msg_size,
+                                      stxd->tx_gm_priority, stxd->tx_gmlid,
                                      gmnal_small_tx_callback, (void*)stxd);
-       spin_unlock(&nal_data->gm_lock);
-       CDEBUG(D_INFO, "done\n");
+       spin_unlock(&gmnalni->gmni_gm_lock);
+       CDEBUG(D_NET, "done\n");
 
        return(PTL_OK);
 }
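
gmnal_small_tx() sends everything from the wired transmit buffer, so a payload only qualifies as "small" if it still fits once both headers are prepended. A sketch of that bound, assuming the gmni_small_msg_size field used when the wired buffers are allocated in gmnal_alloc_txd(); the helper name is illustrative.

    /* Sketch: the size constraint gmnal_small_tx() relies on.  The wired
     * buffer is gmni_small_msg_size bytes and the payload shares it with
     * a gmnal_msghdr_t and a ptl_hdr_t. */
    static inline int
    gmnal_small_msg_fits(gmnal_ni_t *gmnalni, int payload_size)
    {
            return payload_size + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t)
                   <= gmnalni->gmni_small_msg_size;
    }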
@@ -459,29 +373,17 @@ void
 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
 {
        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-       lib_msg_t       *cookie = stxd->cookie;
-       gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
-       lib_nal_t       *libnal = nal_data->libnal;
-       unsigned         gnid = 0;
-       gm_status_t      gm_status = 0;
+	lib_msg_t       *cookie;
+	gmnal_ni_t      *gmnalni;
+	lib_nal_t       *libnal;
 
 	if (!stxd) {
-		CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
+		CDEBUG(D_NET, "send completion event for unknown stxd\n");
 		return;
 	}
+
+	cookie  = stxd->tx_cookie;
+	gmnalni = stxd->tx_gmni;
+	libnal  = gmnalni->gmni_libnal;
-       if (status != GM_SUCCESS) {
-               spin_lock(&nal_data->gm_lock);
-               gm_status = gm_node_id_to_global_id(nal_data->gm_port,
-                                                   stxd->gm_target_node,&gnid);
-               spin_unlock(&nal_data->gm_lock);
-               if (gm_status != GM_SUCCESS) {
-                       CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
-                              gm_status);
-                       gnid = 0;
-               }
-               CERROR("Result of send stxd [%p] is [%s] to [%u]\n",
-                      stxd, gmnal_gm_error(status), gnid);
-       }
+       if (status != GM_SUCCESS)
+               CERROR("Result of send stxd [%p] is [%s] to ["LPU64"]\n",
+                      stxd, gmnal_gm_error(status), stxd->tx_nid);
 
        switch(status) {
                case(GM_SUCCESS):
@@ -494,28 +396,28 @@ gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
                 *      do a resend on the dropped ones
                 */
                        CERROR("send stxd [%p] dropped, resending\n", context);
-                       spin_lock(&nal_data->gm_lock);
-                       gm_send_to_peer_with_callback(nal_data->gm_port,
-                                                     stxd->buffer,
-                                                     stxd->gm_size,
-                                                     stxd->msg_size,
-                                                     stxd->gm_priority,
-                                                     stxd->gm_target_node,
+                       spin_lock(&gmnalni->gmni_gm_lock);
+                       gm_send_to_peer_with_callback(gmnalni->gmni_port,
+                                                     stxd->tx_buffer,
+                                                     stxd->tx_gm_size,
+                                                     stxd->tx_msg_size,
+                                                     stxd->tx_gm_priority,
+                                                     stxd->tx_gmlid,
                                                      gmnal_small_tx_callback,
                                                      context);
-                       spin_unlock(&nal_data->gm_lock);
+                       spin_unlock(&gmnalni->gmni_gm_lock);
                return;
                case(GM_TIMED_OUT):
                case(GM_SEND_TIMED_OUT):
                /*
                 *      drop these ones
                 */
-                       CDEBUG(D_INFO, "calling gm_drop_sends\n");
-                       spin_lock(&nal_data->gm_lock);
-                       gm_drop_sends(nal_data->gm_port, stxd->gm_priority, 
-                                     stxd->gm_target_node, gm_port_id, 
+                       CDEBUG(D_NET, "calling gm_drop_sends\n");
+                       spin_lock(&gmnalni->gmni_gm_lock);
+                       gm_drop_sends(gmnalni->gmni_port, stxd->tx_gm_priority, 
+                                     stxd->tx_gmlid, gm_port_id, 
                                      gmnal_drop_sends_callback, context);
-                       spin_unlock(&nal_data->gm_lock);
+                       spin_unlock(&gmnalni->gmni_gm_lock);
 
                return;
 
@@ -566,29 +468,14 @@ gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
                case(GM_FIRMWARE_NOT_RUNNING):
                case(GM_YP_NO_MATCH):
                default:
-                gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
-                                      stxd->gm_target_node, gm_port_id,
-                                      gmnal_resume_sending_callback, context);
+                gm_resume_sending(gmnalni->gmni_port, stxd->tx_gm_priority,
+                                  stxd->tx_gmlid, gm_port_id,
+                                  gmnal_resume_sending_callback, context);
                 return;
 
        }
 
-       /*
-        *      TO DO
-        *      If this is a large message init,
-        *      we're not finished with the data yet,
-        *      so can't call lib_finalise.
-        *      However, we're also holding on to a 
-        *      stxd here (to keep track of the source
-        *      iovec only). Should use another structure
-        *      to keep track of iovec and return stxd to 
-        *      free list earlier.
-        */
-       if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
-               CDEBUG(D_INFO, "large transmit done\n");
-               return;
-       }
-       gmnal_return_stxd(nal_data, stxd);
+       gmnal_return_stxd(gmnalni, stxd);
        lib_finalize(libnal, stxd, cookie, PTL_OK);
        return;
 }
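
Every exit path in the callback above ends in gmnal_return_stxd(), which is what hands the GM send token back. A rough sketch of that token discipline, assuming only the gmni_stxd_token semaphore, gmni_stxd_lock and gmni_stxd free list initialised in gmnal_alloc_txd() later in this patch; the helper names are invented, and the real code also manages the separate rxthread pool (tx_rxt), which this sketch ignores.

    /* Sketch only: acquiring the semaphore blocks until a descriptor (and
     * so a GM send token) is free; returning one wakes a waiter. */
    static gmnal_stxd_t *
    sketch_get_stxd(gmnal_ni_t *gmnalni)
    {
            gmnal_stxd_t *txd;

            down(&gmnalni->gmni_stxd_token);        /* may block here */

            spin_lock(&gmnalni->gmni_stxd_lock);
            txd = gmnalni->gmni_stxd;
            gmnalni->gmni_stxd = txd->tx_next;
            spin_unlock(&gmnalni->gmni_stxd_lock);

            return txd;
    }

    static void
    sketch_return_stxd(gmnal_ni_t *gmnalni, gmnal_stxd_t *txd)
    {
            spin_lock(&gmnalni->gmni_stxd_lock);
            txd->tx_next = gmnalni->gmni_stxd;
            gmnalni->gmni_stxd = txd;
            spin_unlock(&gmnalni->gmni_stxd_lock);

            up(&gmnalni->gmni_stxd_token);          /* release a waiter */
    }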
@@ -601,10 +488,11 @@ void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
                                  gm_status_t status)
 {
         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-        gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
-        CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
-        gmnal_return_stxd(stxd->nal_data, stxd);
-        lib_finalize(nal_data->libnal, stxd, stxd->cookie, PTL_FAIL);
+        gmnal_ni_t     *gmnalni = stxd->tx_gmni;
+
+        CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context);
+        gmnal_return_stxd(gmnalni, stxd);
+        lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL);
         return;
 }
 
@@ -613,735 +501,28 @@ void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
                                gm_status_t status)
 {
        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-       gmnal_data_t    *nal_data = stxd->nal_data;
+       gmnal_ni_t      *gmnalni = stxd->tx_gmni;
 
-       CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
+       CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context);
        if (status == GM_SUCCESS) {
-               spin_lock(&nal_data->gm_lock);
-               gm_send_to_peer_with_callback(gm_port, stxd->buffer, 
-                                             stxd->gm_size, stxd->msg_size, 
-                                             stxd->gm_priority, 
-                                             stxd->gm_target_node, 
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_send_to_peer_with_callback(gm_port, stxd->tx_buffer, 
+                                             stxd->tx_gm_size, 
+                                              stxd->tx_msg_size, 
+                                             stxd->tx_gm_priority, 
+                                             stxd->tx_gmlid, 
                                              gmnal_small_tx_callback, 
                                              context);
-               spin_unlock(&nal_data->gm_lock);
+               spin_unlock(&gmnalni->gmni_gm_lock);
        } else {
                CERROR("send_to_peer status for stxd [%p] is "
                       "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
                 /* Recycle the stxd */
-               gmnal_return_stxd(nal_data, stxd);
-               lib_finalize(nal_data->libnal, stxd, stxd->cookie, PTL_FAIL);
-       }
-
-       return;
-}
-
-
-/*
- *     Begin a large transmit.
- *     Do a gm_register of the memory pointed to by the iovec 
- *     and send details to the receiver. The receiver does a gm_get
- *     to pull the data and sends an ack when finished. Upon receipt of
- *     this ack, deregister the memory. Only 1 send token is required here.
- */
-int
-gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
-               ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
-               unsigned int niov, struct iovec *iov, size_t offset, int size)
-{
-
-       gmnal_data_t    *nal_data;
-       gmnal_stxd_t    *stxd = NULL;
-       void            *buffer = NULL;
-       gmnal_msghdr_t  *msghdr = NULL;
-       unsigned int    local_nid;
-       int             mlen = 0;       /* the size of the init message data */
-       struct iovec    *iov_dup = NULL;
-       gm_status_t     gm_status;
-       int             niov_dup;
-
-
-       CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
-              "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
-              "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type, 
-              global_nid, pid, niov, iov, size);
-
-       if (libnal)
-               nal_data = (gmnal_data_t*)libnal->libnal_data;
-       else  {
-               CERROR("no libnal.\n");
-               return(GMNAL_STATUS_FAIL);
-       }
-       
-
-       /*
-        *      Get stxd and buffer. Put local address of data in buffer, 
-        *      send local addresses to target, 
-        *      wait for the target node to suck the data over.
-        *      The stxd is used to ren
-        */
-       stxd = gmnal_get_stxd(nal_data, 1);
-       CDEBUG(D_INFO, "stxd [%p]\n", stxd);
-
-       stxd->type = GMNAL_LARGE_MESSAGE_INIT;
-       stxd->cookie = cookie;
-
-       /*
-        *      Copy gmnal_msg_hdr and portals header to the transmit buffer
-        *      Then copy the iov in
-        */
-       buffer = stxd->buffer;
-       msghdr = (gmnal_msghdr_t*)buffer;
-
-       CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
-
-       msghdr->magic = GMNAL_MAGIC;
-       msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
-       msghdr->sender_node_id = nal_data->gm_global_nid;
-       msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd;
-       msghdr->niov = niov ;
-       buffer += sizeof(gmnal_msghdr_t);
-       mlen = sizeof(gmnal_msghdr_t);
-       CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
-
-       CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
-
-       gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
-       buffer += sizeof(ptl_hdr_t);
-       mlen += sizeof(ptl_hdr_t); 
-       CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
-        while (offset >= iov->iov_len) {
-                offset -= iov->iov_len;
-                niov--;
-                iov++;
-        } 
-
-        LASSERT(offset >= 0);
-        /*
-       *      Store the iovs in the stxd so we can get 
-        *      them later if we need them
-        */
-        stxd->iov[0].iov_base = iov->iov_base + offset; 
-        stxd->iov[0].iov_len = iov->iov_len - offset; 
-       CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
-        if (niov > 1)
-               gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
-       stxd->niov = niov;
-
-       /*
-        *      copy the iov to the buffer so target knows 
-        *      where to get the data from
-        */
-       CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
-       gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
-       mlen += stxd->niov*(sizeof(struct iovec));
-       CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-       
-       /*
-        *      register the memory so the NIC can get hold of the data
-        *      This is a slow process. it'd be good to overlap it 
-        *      with something else.
-        */
-        iov = stxd->iov;
-       iov_dup = iov;
-       niov_dup = niov;
-       while(niov--) {
-               CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", 
-                      iov->iov_base, iov->iov_len);
-               spin_lock(&nal_data->gm_lock);
-               gm_status = gm_register_memory(nal_data->gm_port, 
-                                              iov->iov_base, iov->iov_len);
-               if (gm_status != GM_SUCCESS) {
-                       spin_unlock(&nal_data->gm_lock);
-                       CERROR("gm_register_memory returns [%d][%s] "
-                              "for memory [%p] len ["LPSZ"]\n", 
-                              gm_status, gmnal_gm_error(gm_status), 
-                              iov->iov_base, iov->iov_len);
-                       spin_lock(&nal_data->gm_lock);
-                       while (iov_dup != iov) {
-                               gm_deregister_memory(nal_data->gm_port, 
-                                                    iov_dup->iov_base, 
-                                                    iov_dup->iov_len);
-                               iov_dup++;
-                       }
-                       spin_unlock(&nal_data->gm_lock);
-                       gmnal_return_stxd(nal_data, stxd);
-                       return(PTL_FAIL);
-               }
-
-               spin_unlock(&nal_data->gm_lock);
-               iov++;
-       }
-
-       /*
-        *      Send the init message to the target
-        */
-       CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
-       spin_lock(&nal_data->gm_lock);
-       gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
-                                           &local_nid);
-       if (gm_status != GM_SUCCESS) {
-               spin_unlock(&nal_data->gm_lock);
-               CERROR("Failed to obtain local id\n");
-               gmnal_return_stxd(nal_data, stxd);
-               /* TO DO deregister memory on failure */
-               return(GMNAL_STATUS_FAIL);
-       }
-       CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
-       gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
-                                     stxd->gm_size, mlen, GM_LOW_PRIORITY, 
-                                     local_nid, gmnal_large_tx_callback, 
-                                     (void*)stxd);
-       spin_unlock(&nal_data->gm_lock);
-
-       CDEBUG(D_INFO, "done\n");
-
-       return(PTL_OK);
-}
-
-/*
- *     Callback function indicates that send of buffer with 
- *     large message iovec has completed (or failed).
- */
-void 
-gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
-{
-       gmnal_small_tx_callback(gm_port, context, status);
-
-}
-
-
-
-/*
- *     Have received a buffer that contains an iovec of the sender. 
- *     Do a gm_register_memory of the receivers buffer and then do a get
- *     data from the sender.
- */
-int
-gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
-               unsigned int nriov, struct iovec *riov, size_t offset, 
-               size_t mlen, size_t rlen)
-{
-       gmnal_data_t    *nal_data = libnal->libnal_data;
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
-       void            *buffer = NULL;
-       struct  iovec   *riov_dup;
-       int             nriov_dup;
-       gmnal_msghdr_t  *msghdr = NULL;
-       gm_status_t     gm_status;
-
-       CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
-              "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
-               libnal, private, cookie, nriov, riov, mlen, rlen);
-
-       if (!srxd) {
-               CERROR("gmnal_large_rx no context\n");
-               lib_finalize(libnal, private, cookie, PTL_FAIL);
-               return(PTL_FAIL);
-       }
-
-       buffer = srxd->buffer;
-       msghdr = (gmnal_msghdr_t*)buffer;
-       buffer += sizeof(gmnal_msghdr_t);
-       buffer += sizeof(ptl_hdr_t);
-
-       /*
-        *      Store the senders stxd address in the srxd for this message
-        *      The gmnal_large_message_ack needs it to notify the sender
-        *      the pull of data is complete
-        */
-       srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
-
-       /*
-        *      Register the receivers memory
-        *      get the data,
-        *      tell the sender that we got the data
-        *      then tell the receiver we got the data
-        *      TO DO
-        *      If the iovecs match, could interleave 
-        *      gm_registers and gm_gets for each element
-        */
-        while (offset >= riov->iov_len) {
-                offset -= riov->iov_len;
-                riov++;
-                nriov--;
-        } 
-        LASSERT (nriov >= 0);
-        LASSERT (offset >= 0);
-       /*
-        *      do this so the final gm_get callback can deregister the memory
-        */
-       PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
-
-        srxd->riov[0].iov_base = riov->iov_base + offset;
-        srxd->riov[0].iov_len = riov->iov_len - offset;
-        if (nriov > 1)
-               gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
-       srxd->nriov = nriov;
-
-        riov = srxd->riov;
-       nriov_dup = nriov;
-       riov_dup = riov;
-       while(nriov--) {
-               CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
-                      riov->iov_base, riov->iov_len);
-               spin_lock(&nal_data->gm_lock);
-               gm_status = gm_register_memory(nal_data->gm_port,
-                                              riov->iov_base, riov->iov_len);
-               if (gm_status != GM_SUCCESS) {
-                       spin_unlock(&nal_data->gm_lock);
-                       CERROR("gm_register_memory returns [%d][%s] "
-                              "for memory [%p] len ["LPSZ"]\n",
-                              gm_status, gmnal_gm_error(gm_status),
-                              riov->iov_base, riov->iov_len);
-                       spin_lock(&nal_data->gm_lock);
-                       while (riov_dup != riov) {
-                               gm_deregister_memory(nal_data->gm_port, 
-                                                    riov_dup->iov_base, 
-                                                    riov_dup->iov_len);
-                               riov_dup++;
-                       }
-                       spin_lock(&nal_data->gm_lock);
-                       /*
-                        *      give back srxd and buffer. Send NACK to sender
-                        */
-                        PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
-                       return(PTL_FAIL);
-               }
-               spin_unlock(&nal_data->gm_lock);
-               riov++;
+               gmnal_return_stxd(gmnalni, stxd);
+               lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL);
        }
 
-       /*
-        *      now do gm_get to get the data
-        */
-       srxd->cookie = cookie;
-       if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer,
-                             nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
-               CERROR("can't get the data");
-       }
-
-       CDEBUG(D_INFO, "gmnal_large_rx done\n");
-
-       return(PTL_OK);
-}
-
-
-/*
- *     Perform a number of remote gets as part of receiving 
- *     a large message.
- *     The final one to complete (i.e. the last callback to get called)
- *     tidies up.
- *     gm_get requires a send token.
- */
-int
-gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov, 
-                 int nriov, struct iovec *riov)
-{
-
-       int     ncalls = 0;
-
-       CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
-              "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
-
-
-       ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
-       if (ncalls < 0) {
-               CERROR("there's something wrong with the iovecs\n");
-               return(GMNAL_STATUS_FAIL);
-       }
-       CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
-       spin_lock_init(&srxd->callback_lock);
-       srxd->ncallbacks = ncalls;
-       srxd->callback_status = 0;
-
-       ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
-       if (ncalls < 0) {
-               CERROR("there's something wrong with the iovecs\n");
-               return(GMNAL_STATUS_FAIL);
-       }
-
-       return(GMNAL_STATUS_OK);
-
-}
-
-
-/*
- *     pull data from source node (source iovec) to a local iovec.
- *     The iovecs may not match which adds the complications below.
- *     Count the number of gm_gets that will be required so the callbacks
- *     can determine who is the last one.
- */    
-int
-gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov, 
-              struct iovec *siov, int nriov, struct iovec *riov)
-{
-
-       int     ncalls = 0;
-       int     slen = siov->iov_len, rlen = riov->iov_len;
-       char    *sbuf = siov->iov_base, *rbuf = riov->iov_base; 
-       unsigned long   sbuf_long;
-       gm_remote_ptr_t remote_ptr = 0;
-       unsigned int    source_node;
-       gmnal_ltxd_t    *ltxd = NULL;
-       gmnal_data_t    *nal_data = srxd->nal_data;
-
-       CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
-       if (do_copy) {
-               if (!nal_data) {
-                       CERROR("Bad args No nal_data\n");
-                       return(GMNAL_STATUS_FAIL);
-               }
-               spin_lock(&nal_data->gm_lock);
-               if (gm_global_id_to_node_id(nal_data->gm_port,
-                                           srxd->gm_source_node,
-                                           &source_node) != GM_SUCCESS) {
-
-                       CERROR("cannot resolve global_id [%u] "
-                              "to local node_id\n", srxd->gm_source_node);
-                       spin_unlock(&nal_data->gm_lock);
-                       return(GMNAL_STATUS_FAIL);
-               }
-               spin_unlock(&nal_data->gm_lock);
-               /*
-                *      We need a send token to use gm_get
-                *      getting an stxd gets us a send token.
-                *      the stxd is used as the context to the
-                *      callback function (so stxd can be returned).
-                *      Set pointer in stxd to srxd so callback count in srxd
-                *      can be decremented to find last callback to complete
-                */
-               CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n",
-                      srxd->gm_source_node, source_node);
-       }
-
-       do {
-               CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
-                               sbuf, slen, rbuf, rlen);
-               if (slen > rlen) {
-                       ncalls++;
-                       if (do_copy) {
-                               CDEBUG(D_INFO, "slen>rlen\n");
-                               ltxd = gmnal_get_ltxd(nal_data);
-                               ltxd->srxd = srxd;
-                               spin_lock(&nal_data->gm_lock);
-                               /* 
-                                *      funny business to get rid 
-                                *      of compiler warning 
-                                */
-                               sbuf_long = (unsigned long) sbuf;
-                               remote_ptr = (gm_remote_ptr_t)sbuf_long;
-                               gm_get(nal_data->gm_port, remote_ptr, rbuf,
-                                      rlen, GM_LOW_PRIORITY, source_node,
-                                      gm_port_id,
-                                      gmnal_remote_get_callback, ltxd);
-                               spin_unlock(&nal_data->gm_lock);
-                       }
-                       /*
-                        *      at the end of 1 iov element
-                        */
-                       sbuf+=rlen;
-                       slen-=rlen;
-                       riov++;
-                       nriov--;
-                       rbuf = riov->iov_base;
-                       rlen = riov->iov_len;
-               } else if (rlen > slen) {
-                       ncalls++;
-                       if (do_copy) {
-                               CDEBUG(D_INFO, "slen<rlen\n");
-                               ltxd = gmnal_get_ltxd(nal_data);
-                               ltxd->srxd = srxd;
-                               spin_lock(&nal_data->gm_lock);
-                               sbuf_long = (unsigned long) sbuf;
-                               remote_ptr = (gm_remote_ptr_t)sbuf_long;
-                               gm_get(nal_data->gm_port, remote_ptr, rbuf,
-                                      slen, GM_LOW_PRIORITY, source_node,
-                                      gm_port_id,
-                                      gmnal_remote_get_callback, ltxd);
-                               spin_unlock(&nal_data->gm_lock);
-                       }
-                       /*
-                        *      at end of siov element
-                        */
-                       rbuf+=slen;
-                       rlen-=slen;
-                       siov++;
-                       sbuf = siov->iov_base;
-                       slen = siov->iov_len;
-               } else {
-                       ncalls++;
-                       if (do_copy) {
-                               CDEBUG(D_INFO, "rlen=slen\n");
-                               ltxd = gmnal_get_ltxd(nal_data);
-                               ltxd->srxd = srxd;
-                               spin_lock(&nal_data->gm_lock);
-                               sbuf_long = (unsigned long) sbuf;
-                               remote_ptr = (gm_remote_ptr_t)sbuf_long;
-                               gm_get(nal_data->gm_port, remote_ptr, rbuf,
-                                      rlen, GM_LOW_PRIORITY, source_node,
-                                      gm_port_id,
-                                      gmnal_remote_get_callback, ltxd);
-                               spin_unlock(&nal_data->gm_lock);
-                       }
-                       /*
-                        *      at end of siov and riov element
-                        */
-                       siov++;
-                       sbuf = siov->iov_base;
-                       slen = siov->iov_len;
-                       riov++;
-                       nriov--;
-                       rbuf = riov->iov_base;
-                       rlen = riov->iov_len;
-               }
-
-       } while (nriov);
-       return(ncalls);
-}
-
-
-/*
- *     The callback function that is invoked after each gm_get call completes.
- *     Multiple callbacks may be invoked for 1 transaction, only the final
- *     callback has work to do.
- */
-void
-gmnal_remote_get_callback(gm_port_t *gm_port, void *context, 
-                          gm_status_t status)
-{
-
-       gmnal_ltxd_t    *ltxd = (gmnal_ltxd_t*)context;
-       gmnal_srxd_t    *srxd = ltxd->srxd;
-       lib_nal_t       *libnal = srxd->nal_data->libnal;
-       int             lastone;
-       struct  iovec   *riov;
-       int             nriov;
-       gmnal_data_t    *nal_data;
-
-       CDEBUG(D_TRACE, "called for context [%p]\n", context);
-
-       if (status != GM_SUCCESS) {
-               CERROR("reports error [%d/%s]\n",status,gmnal_gm_error(status));
-       }
-
-       spin_lock(&srxd->callback_lock);
-       srxd->ncallbacks--;
-       srxd->callback_status |= status;
-       lastone = srxd->ncallbacks?0:1;
-       spin_unlock(&srxd->callback_lock);
-       nal_data = srxd->nal_data;
-
-       /*
-        *      everyone returns a send token
-        */
-       gmnal_return_ltxd(nal_data, ltxd);
-
-       if (!lastone) {
-               CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
-               return;
-       }
-
-       /*
-        *      Let our client application proceed
-        */
-       CERROR("final callback context[%p]\n", srxd);
-       lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
-
-       /*
-        *      send an ack to the sender to let him know we got the data
-        */
-       gmnal_large_tx_ack(nal_data, srxd);
-
-       /*
-        *      Unregister the memory that was used
-        *      This is a very slow business (slower than register)
-        */
-       nriov = srxd->nriov;
-       riov = srxd->riov;
-       spin_lock(&nal_data->gm_lock);
-       while (nriov--) {
-               CERROR("deregister memory [%p]\n", riov->iov_base);
-               if (gm_deregister_memory(srxd->nal_data->gm_port,
-                                        riov->iov_base, riov->iov_len)) {
-                       CERROR("failed to deregister memory [%p]\n",
-                              riov->iov_base);
-               }
-               riov++;
-       }
-       spin_unlock(&nal_data->gm_lock);
-       PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
-
-       /*
-        *      repost the receive buffer (return receive token)
-        */
-       spin_lock(&nal_data->gm_lock);
-       gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, 
-                                          srxd->gmsize, GM_LOW_PRIORITY, 0);   
-       spin_unlock(&nal_data->gm_lock);
-       
        return;
 }
 
 
-/*
- *     Called on target node.
- *     After pulling data from a source node
- *     send an ack message to indicate the large transmit is complete.
- */
-void 
-gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
-{
-
-       gmnal_stxd_t    *stxd;
-       gmnal_msghdr_t *msghdr;
-       void            *buffer = NULL;
-       unsigned int    local_nid;
-       gm_status_t     gm_status = GM_SUCCESS;
-
-       CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd,
-              srxd->gm_source_node);
-
-       spin_lock(&nal_data->gm_lock);
-       gm_status = gm_global_id_to_node_id(nal_data->gm_port, 
-                                           srxd->gm_source_node, &local_nid);
-       spin_unlock(&nal_data->gm_lock);
-       if (gm_status != GM_SUCCESS) {
-               CERROR("Failed to obtain local id\n");
-               return;
-       }
-       CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
-
-       stxd = gmnal_get_stxd(nal_data, 1);
-       CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
-
-       stxd->nal_data = nal_data;
-       stxd->type = GMNAL_LARGE_MESSAGE_ACK;
-
-       /*
-        *      Copy gmnal_msg_hdr and portals header to the transmit buffer
-        *      Then copy the data in
-        */
-       buffer = stxd->buffer;
-       msghdr = (gmnal_msghdr_t*)buffer;
-
-       /*
-        *      Add in the address of the original stxd from the sender node
-        *      so it knows which thread to notify.
-        */
-       msghdr->magic = GMNAL_MAGIC;
-       msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
-       msghdr->sender_node_id = nal_data->gm_global_nid;
-       msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd;
-       CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
-
-       CDEBUG(D_INFO, "sending\n");
-       stxd->msg_size= sizeof(gmnal_msghdr_t);
-
-
-       CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
-              "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
-              "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
-              stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
-       spin_lock(&nal_data->gm_lock);
-       stxd->gm_priority = GM_LOW_PRIORITY;
-       stxd->gm_target_node = local_nid;
-       gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
-                                     stxd->gm_size, stxd->msg_size,
-                                     GM_LOW_PRIORITY, local_nid,
-                                     gmnal_large_tx_ack_callback,
-                                     (void*)stxd);
-
-       spin_unlock(&nal_data->gm_lock);
-       CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
-
-       return;
-}
-
-
-/*
- *     A callback to indicate the large transmit ack operation is complete
- *     Check for errors and try to deal with them.
- *     Call lib_finalise to inform the client application that the
- *     send is complete and the memory can be reused.
- *     Return the stxd when finished with it (returns a send token)
- */
-void
-gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context,
-                            gm_status_t status)
-{
-       gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-       gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
-
-       if (!stxd) {
-               CERROR("send completion event for unknown stxd\n");
-               return;
-       }
-       CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
-              stxd, status);
-       gmnal_return_stxd(stxd->nal_data, stxd);
-
-       spin_unlock(&nal_data->gm_lock);
-       return;
-}
-
-/*
- *     Indicates the large transmit operation is complete.
- *     Called on transmit side (means data has been pulled  by receiver 
- *     or failed).
- *     Call lib_finalise to inform the client application that the send 
- *     is complete, deregister the memory and return the stxd. 
- *     Finally, report the rx buffer that the ack message was delivered in.
- */
-void 
-gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
-{
-       lib_nal_t       *libnal = nal_data->libnal;
-       gmnal_stxd_t    *stxd = NULL;
-       gmnal_msghdr_t  *msghdr = NULL;
-       void            *buffer = NULL;
-       struct  iovec   *iov;
-
-
-       CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
-
-       buffer = srxd->buffer;
-       msghdr = (gmnal_msghdr_t*)buffer;
-       stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
-
-       CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
-
-       lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
-
-       /*
-        *      extract the iovec from the stxd, deregister the memory.
-        *      free the space used to store the iovec
-        */
-       iov = stxd->iov;
-       while(stxd->niov--) {
-               CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
-                      iov->iov_base, iov->iov_len);
-               spin_lock(&nal_data->gm_lock);
-               gm_deregister_memory(nal_data->gm_port, iov->iov_base, 
-                                    iov->iov_len);
-               spin_unlock(&nal_data->gm_lock);
-               iov++;
-       }
-
-       /*
-        *      return the send token
-        *      TO DO It is bad to hold onto the send token so long?
-        */
-       gmnal_return_stxd(nal_data, stxd);
-
-
-       /*
-        *      requeue the receive buffer 
-        */
-       gmnal_rx_requeue_buffer(nal_data, srxd);
-       
-
-       return;
-}
index 9fa2ea5..7a7a907 100644 (file)
@@ -22,7 +22,6 @@
 #include "gmnal.h"
 
 
-int gmnal_small_msg_size = sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t) + PTL_MTU + 928;
 /*
  *      -1 indicates default value.
  *      This is 1 thread per cpu
@@ -35,16 +34,16 @@ int gm_port_id = 4;
 int
 gmnal_cmd(struct portals_cfg *pcfg, void *private)
 {
-       gmnal_data_t    *nal_data = NULL;
+       gmnal_ni_t      *gmnalni = NULL;
        char            *name = NULL;
        int             nid = -2;
-       int             gnid;
+       int             gmid;
        gm_status_t     gm_status;
 
 
        CDEBUG(D_TRACE, "gmnal_cmd [%d] private [%p]\n",
               pcfg->pcfg_command, private);
-       nal_data = (gmnal_data_t*)private;
+       gmnalni = (gmnal_ni_t*)private;
        switch(pcfg->pcfg_command) {
        /*
         * just reuse already defined GET_NID. Should define GMNAL version
@@ -54,31 +53,31 @@ gmnal_cmd(struct portals_cfg *pcfg, void *private)
                PORTAL_ALLOC(name, pcfg->pcfg_plen1);
                copy_from_user(name, PCFG_PBUF(pcfg, 1), pcfg->pcfg_plen1);
 
-               spin_lock(&nal_data->gm_lock);
-               //nid = gm_host_name_to_node_id(nal_data->gm_port, name);
-                gm_status = gm_host_name_to_node_id_ex(nal_data->gm_port, 0,
+               spin_lock(&gmnalni->gmni_gm_lock);
+               //nid = gm_host_name_to_node_id(gmnalni->gmni_port, name);
+                gm_status = gm_host_name_to_node_id_ex(gmnalni->gmni_port, 0,
                                                        name, &nid);
-               spin_unlock(&nal_data->gm_lock);
+               spin_unlock(&gmnalni->gmni_gm_lock);
                 if (gm_status != GM_SUCCESS) {
-                        CDEBUG(D_INFO, "gm_host_name_to_node_id_ex(...host %s) "
+                        CDEBUG(D_NET, "gm_host_name_to_node_id_ex(...host %s) "
                                "failed[%d]\n", name, gm_status);
                         return (-1);
                 } else
-                       CDEBUG(D_INFO, "Local node %s id is [%d]\n", name, nid);
-               spin_lock(&nal_data->gm_lock);
-               gm_status = gm_node_id_to_global_id(nal_data->gm_port,
-                                                   nid, &gnid);
-               spin_unlock(&nal_data->gm_lock);
+                       CDEBUG(D_NET, "Local node %s id is [%d]\n", name, nid);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_status = gm_node_id_to_global_id(gmnalni->gmni_port,
+                                                   nid, &gmid);
+               spin_unlock(&gmnalni->gmni_gm_lock);
                if (gm_status != GM_SUCCESS) {
-                       CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
+                       CDEBUG(D_NET, "gm_node_id_to_global_id failed[%d]\n",
                               gm_status);
                        return(-1);
                }
-               CDEBUG(D_INFO, "Global node is is [%u][%x]\n", gnid, gnid);
-               copy_to_user(PCFG_PBUF(pcfg, 2), &gnid, pcfg->pcfg_plen2);
+               CDEBUG(D_NET, "Global node id is [%u][%x]\n", gmid, gmid);
+               copy_to_user(PCFG_PBUF(pcfg, 2), &gmid, pcfg->pcfg_plen2);
        break;
        default:
-               CDEBUG(D_INFO, "gmnal_cmd UNKNOWN[%d]\n", pcfg->pcfg_command);
+               CDEBUG(D_NET, "gmnal_cmd UNKNOWN[%d]\n", pcfg->pcfg_command);
                pcfg->pcfg_nid2 = -1;
        }
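
The GET_NID case above chains two lookups: the host name resolves to a local GM node id, which is then widened to the global id the NAL uses as its NID. A condensed sketch of the same sequence using only the GM calls already present here; the helper name and the -1 error convention mirror the surrounding code but are otherwise illustrative.

    /* Sketch: host name -> GM node id -> GM global id (used as the NID). */
    static int
    gmnal_hostname_to_gmid(gmnal_ni_t *gmnalni, char *name, int *gmid)
    {
            gm_status_t gm_status;
            int         nid;

            spin_lock(&gmnalni->gmni_gm_lock);
            gm_status = gm_host_name_to_node_id_ex(gmnalni->gmni_port, 0,
                                                   name, &nid);
            if (gm_status == GM_SUCCESS)
                    gm_status = gm_node_id_to_global_id(gmnalni->gmni_port,
                                                        nid, gmid);
            spin_unlock(&gmnalni->gmni_gm_lock);

            return (gm_status == GM_SUCCESS) ? 0 : -1;
    }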
 
@@ -94,16 +93,16 @@ gmnal_load(void)
        CDEBUG(D_TRACE, "This is the gmnal module initialisation routine\n");
 
 
-       CDEBUG(D_INFO, "Calling gmnal_init\n");
+       CDEBUG(D_NET, "Calling gmnal_init\n");
         status = gmnal_init();
        if (status == PTL_OK) {
-               CDEBUG(D_INFO, "Portals GMNAL initialised ok\n");
+               CDEBUG(D_NET, "Portals GMNAL initialised ok\n");
        } else {
-               CDEBUG(D_INFO, "Portals GMNAL Failed to initialise\n");
+               CDEBUG(D_NET, "Portals GMNAL Failed to initialise\n");
                return(-ENODEV);
        }
 
-       CDEBUG(D_INFO, "This is the end of the gmnal init routine");
+       CDEBUG(D_NET, "This is the end of the gmnal init routine");
 
        return(0);
 }
@@ -121,7 +120,6 @@ module_init(gmnal_load);
 
 module_exit(gmnal_unload);
 
-MODULE_PARM(gmnal_small_msg_size, "i");
 MODULE_PARM(num_rx_threads, "i");
 MODULE_PARM(num_stxds, "i");
 MODULE_PARM(gm_port_id, "i");
index 1cbb728..aee16fb 100644 (file)
  *     Am I one of the gmnal rxthreads ?
  */
 int
-gmnal_is_rxthread(gmnal_data_t *nal_data)
+gmnal_is_rxthread(gmnal_ni_t *gmnalni)
 {
        int i;
        for (i=0; i<num_rx_threads; i++) {
-               if (nal_data->rxthread_pid[i] == current->pid)
+               if (gmnalni->gmni_rxthread_pid[i] == current->pid)
                        return(1);
        }
        return(0);
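
gmnal_is_rxthread() lets the transmit-descriptor code tell whether the caller is one of the receive threads so it can draw on the separate rxthread descriptor pool set up in gmnal_alloc_txd() below. A rough sketch of that selection, using only the semaphores initialised there; the helper name is invented and the free-list handling would follow the earlier stxd sketch.

    /* Sketch: pick the token pool by caller identity.  Receive threads
     * doing sends use the rxt pool; everything else uses the normal one. */
    static void
    sketch_get_tx_token(gmnal_ni_t *gmnalni, int *is_rxt)
    {
            *is_rxt = gmnal_is_rxthread(gmnalni);

            if (*is_rxt)
                    down(&gmnalni->gmni_rxt_stxd_token);
            else
                    down(&gmnalni->gmni_stxd_token);
    }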
@@ -51,23 +51,24 @@ gmnal_is_rxthread(gmnal_data_t *nal_data)
  *     used to do gm_gets in gmnal_copyiov     
  */
 int
-gmnal_alloc_txd(gmnal_data_t *nal_data)
+gmnal_alloc_txd(gmnal_ni_t *gmnalni)
 {
-       int ntx= 0, nstx= 0, nrxt_stx= 0,
-           nltx= 0, i = 0;
-       gmnal_stxd_t    *txd = NULL;
-       gmnal_ltxd_t    *ltxd = NULL;
-       void    *txbuffer = NULL;
+       int           ntx;
+        int           nstx;
+        int           nrxt_stx;
+        int           i;
+       gmnal_stxd_t *txd;
+       void         *txbuffer;
 
        CDEBUG(D_TRACE, "gmnal_alloc_small tx\n");
 
-       spin_lock(&nal_data->gm_lock);
+       spin_lock(&gmnalni->gmni_gm_lock);
        /*
         *      total number of transmit tokens
         */
-       ntx = gm_num_send_tokens(nal_data->gm_port);
-       spin_unlock(&nal_data->gm_lock);
-       CDEBUG(D_INFO, "total number of send tokens available is [%d]\n", ntx);
+       ntx = gm_num_send_tokens(gmnalni->gmni_port);
+       spin_unlock(&gmnalni->gmni_gm_lock);
+       CDEBUG(D_NET, "total number of send tokens available is [%d]\n", ntx);
 
        /*
         *      allocate a number for small sends
@@ -75,144 +76,121 @@ gmnal_alloc_txd(gmnal_data_t *nal_data)
         */
        nstx = num_stxds;
        /*
-        *      give that number plus 1 to the receive threads
+        *      give the same number, plus one, to the receive threads
         */
-        nrxt_stx = nstx + 1;
-
-       /*
-        *      give the rest for gm_gets
-        */
-       nltx = ntx - (nrxt_stx + nstx);
-       if (nltx < 1) {
-               CERROR("No tokens available for large messages\n");
-               return(GMNAL_STATUS_FAIL);
-       }
+        nrxt_stx = num_stxds + 1;
 
-
-       /*
-        * A semaphore is initialised with the
-        * number of transmit tokens available.
-        * To get a stxd, acquire the token semaphore.
-        * this decrements the available token count
-        * (if no tokens you block here, someone returning a
-        * stxd will release the semaphore and wake you)
-        * When token is obtained acquire the spinlock
-        * to manipulate the list
-        */
-       sema_init(&nal_data->stxd_token, nstx);
-       spin_lock_init(&nal_data->stxd_lock);
-       sema_init(&nal_data->rxt_stxd_token, nrxt_stx);
-       spin_lock_init(&nal_data->rxt_stxd_lock);
-       sema_init(&nal_data->ltxd_token, nltx);
-       spin_lock_init(&nal_data->ltxd_lock);
+        if (nstx + nrxt_stx > ntx) {
+                CERROR ("Asked for %d + %d tx credits, but only %d available\n",
+                        nstx, nrxt_stx, ntx);
+                return -ENOMEM;
+        }
+        
+       /* A semaphore is initialised with the number of transmit tokens
+        * available.  To get a stxd, acquire the token semaphore.  this
+        * decrements the available token count (if no tokens you block here,
+        * someone returning a stxd will release the semaphore and wake you)
+        * When token is obtained acquire the spinlock to manipulate the
+        * list */
+       sema_init(&gmnalni->gmni_stxd_token, nstx);
+       spin_lock_init(&gmnalni->gmni_stxd_lock);
+
+       sema_init(&gmnalni->gmni_rxt_stxd_token, nrxt_stx);
+       spin_lock_init(&gmnalni->gmni_rxt_stxd_lock);
 
        for (i=0; i<=nstx; i++) {
-               PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
-               if (!txd) {
+               PORTAL_ALLOC(txd, sizeof(*txd));
+               if (txd == NULL) {
                        CERROR("Failed to malloc txd [%d]\n", i);
-                       return(GMNAL_STATUS_NOMEM);
+                       return -ENOMEM;
                }
-               spin_lock(&nal_data->gm_lock);
-               txbuffer = gm_dma_malloc(nal_data->gm_port,
-                                        nal_data->small_msg_size);
-               spin_unlock(&nal_data->gm_lock);
-               if (!txbuffer) {
+               spin_lock(&gmnalni->gmni_gm_lock);
+               txbuffer = gm_dma_malloc(gmnalni->gmni_port,
+                                        gmnalni->gmni_small_msg_size);
+               spin_unlock(&gmnalni->gmni_gm_lock);
+               if (txbuffer == NULL) {
                        CERROR("Failed to gm_dma_malloc txbuffer [%d], "
-                              "size [%d]\n", i, nal_data->small_msg_size);
-                       PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
-                       return(GMNAL_STATUS_FAIL);
+                              "size [%d]\n", i, gmnalni->gmni_small_msg_size);
+                       PORTAL_FREE(txd, sizeof(*txd));
+                       return -ENOMEM;
                }
-               txd->buffer = txbuffer;
-               txd->buffer_size = nal_data->small_msg_size;
-               txd->gm_size = gm_min_size_for_length(txd->buffer_size);
-               txd->nal_data = (struct _gmnal_data_t*)nal_data;
-                txd->rxt = 0;
-
-               txd->next = nal_data->stxd;
-               nal_data->stxd = txd;
-               CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], "
-                      "size [%d]\n", txd, txd->buffer, txd->buffer_size);
+               txd->tx_buffer = txbuffer;
+               txd->tx_buffer_size = gmnalni->gmni_small_msg_size;
+               txd->tx_gm_size = gm_min_size_for_length(txd->tx_buffer_size);
+               txd->tx_gmni = gmnalni;
+                txd->tx_rxt = 0;
+
+               txd->tx_next = gmnalni->gmni_stxd;
+               gmnalni->gmni_stxd = txd;
+               CDEBUG(D_NET, "Registered txd [%p] with buffer [%p], "
+                      "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
        }
 
        for (i=0; i<=nrxt_stx; i++) {
                PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
                if (!txd) {
                        CERROR("Failed to malloc txd [%d]\n", i);
-                       return(GMNAL_STATUS_NOMEM);
+                       return -ENOMEM;
                }
-               spin_lock(&nal_data->gm_lock);
-               txbuffer = gm_dma_malloc(nal_data->gm_port, 
-                                        nal_data->small_msg_size);
-               spin_unlock(&nal_data->gm_lock);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               txbuffer = gm_dma_malloc(gmnalni->gmni_port, 
+                                        gmnalni->gmni_small_msg_size);
+               spin_unlock(&gmnalni->gmni_gm_lock);
                if (!txbuffer) {
                        CERROR("Failed to gm_dma_malloc txbuffer [%d],"
-                              " size [%d]\n",i, nal_data->small_msg_size);
+                              " size [%d]\n",i, gmnalni->gmni_small_msg_size);
                        PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
-                       return(GMNAL_STATUS_FAIL);
+                       return -ENOMEM;
                }
-               txd->buffer = txbuffer;
-               txd->buffer_size = nal_data->small_msg_size;
-               txd->gm_size = gm_min_size_for_length(txd->buffer_size);
-               txd->nal_data = (struct _gmnal_data_t*)nal_data;
-                txd->rxt = 1;
-
-               txd->next = nal_data->rxt_stxd;
-               nal_data->rxt_stxd = txd;
-               CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], "
-                      "size [%d]\n", txd, txd->buffer, txd->buffer_size);
+               txd->tx_buffer = txbuffer;
+               txd->tx_buffer_size = gmnalni->gmni_small_msg_size;
+               txd->tx_gm_size = gm_min_size_for_length(txd->tx_buffer_size);
+               txd->tx_gmni = gmnalni;
+                txd->tx_rxt = 1;
+
+               txd->tx_next = gmnalni->gmni_rxt_stxd;
+               gmnalni->gmni_rxt_stxd = txd;
+               CDEBUG(D_NET, "Registered txd [%p] with buffer [%p], "
+                      "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
        }
 
-       /*
-        *      string together large tokens
-        */
-       for (i=0; i<=nltx ; i++) {
-               PORTAL_ALLOC(ltxd, sizeof(gmnal_ltxd_t));
-               ltxd->next = nal_data->ltxd;
-               nal_data->ltxd = ltxd;
-       }
-       return(GMNAL_STATUS_OK);
+       return 0;
 }
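
The loop above wires a pool of DMA-able buffers and threads their descriptors onto a singly linked freelist headed at gmni_stxd (and gmni_rxt_stxd for the rxthread pool). As a rough userspace sketch of that freelist construction, with plain malloc() standing in for gm_dma_malloc() and every name and size here purely illustrative:

/* Illustrative only: builds a descriptor freelist the way gmnal_alloc_txd
 * does, but with malloc() instead of gm_dma_malloc() and made-up sizes. */
#include <stdio.h>
#include <stdlib.h>

#define NUM_TXD  8
#define BUF_SIZE 1078            /* stand-in for gmni_small_msg_size */

struct txd {
        struct txd *tx_next;
        void       *tx_buffer;
        int         tx_buffer_size;
};

static struct txd *txd_freelist;

static int alloc_txds(void)
{
        int i;

        for (i = 0; i < NUM_TXD; i++) {
                struct txd *txd = malloc(sizeof(*txd));

                if (txd == NULL)
                        return -1;

                txd->tx_buffer = malloc(BUF_SIZE);
                if (txd->tx_buffer == NULL) {
                        free(txd);
                        return -1;
                }
                txd->tx_buffer_size = BUF_SIZE;

                /* push onto the freelist: newest descriptor becomes the head */
                txd->tx_next = txd_freelist;
                txd_freelist = txd;
        }
        return 0;
}

int main(void)
{
        if (alloc_txds() != 0) {
                fprintf(stderr, "allocation failed\n");
                return 1;
        }
        printf("freelist head is %p\n", (void *)txd_freelist);
        return 0;
}

Building the list newest-first keeps each push O(1); the counting semaphores initialised just before the loop are what later guarantee the list is never popped while empty.
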
 
 /*     Free the list of wired and gm_registered small tx buffers and 
  *     the tx descriptors that go along with them.
  */
 void
-gmnal_free_txd(gmnal_data_t *nal_data)
+gmnal_free_txd(gmnal_ni_t *gmnalni)
 {
-       gmnal_stxd_t *txd = nal_data->stxd, *_txd = NULL;
-       gmnal_ltxd_t *ltxd = NULL, *_ltxd = NULL;
+       gmnal_stxd_t *txd;
+        gmnal_stxd_t *_txd;
 
        CDEBUG(D_TRACE, "gmnal_free_small tx\n");
 
-       while(txd) {
-               CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], "
-                      "size [%d]\n", txd, txd->buffer, txd->buffer_size);
+        txd = gmnalni->gmni_stxd;
+       while(txd != NULL) {
+               CDEBUG(D_NET, "Freeing txd [%p] with buffer [%p], "
+                      "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
                _txd = txd;
-               txd = txd->next;
-               spin_lock(&nal_data->gm_lock);
-               gm_dma_free(nal_data->gm_port, _txd->buffer);
-               spin_unlock(&nal_data->gm_lock);
+               txd = txd->tx_next;
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_dma_free(gmnalni->gmni_port, _txd->tx_buffer);
+               spin_unlock(&gmnalni->gmni_gm_lock);
                PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
        }
-        txd = nal_data->rxt_stxd;
+
+        txd = gmnalni->gmni_rxt_stxd;
        while(txd) {
-               CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], "
-                      "size [%d]\n", txd, txd->buffer, txd->buffer_size);
+               CDEBUG(D_NET, "Freeing txd [%p] with buffer [%p], "
+                      "size [%d]\n", txd, txd->tx_buffer, txd->tx_buffer_size);
                _txd = txd;
-               txd = txd->next;
-               spin_lock(&nal_data->gm_lock);
-               gm_dma_free(nal_data->gm_port, _txd->buffer);
-               spin_unlock(&nal_data->gm_lock);
+               txd = txd->tx_next;
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_dma_free(gmnalni->gmni_port, _txd->tx_buffer);
+               spin_unlock(&gmnalni->gmni_gm_lock);
                PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
        }
-       ltxd = nal_data->ltxd;
-       while(txd) {
-               _ltxd = ltxd;
-               ltxd = ltxd->next;
-               PORTAL_FREE(_ltxd, sizeof(gmnal_ltxd_t));
-       }
-       
-       return;
 }
 
 
@@ -222,45 +200,45 @@ gmnal_free_txd(gmnal_data_t *nal_data)
  *     This implicitly gets us a send token also.
  */
 gmnal_stxd_t *
-gmnal_get_stxd(gmnal_data_t *nal_data, int block)
+gmnal_get_stxd(gmnal_ni_t *gmnalni, int block)
 {
 
        gmnal_stxd_t    *txd = NULL;
        pid_t           pid = current->pid;
 
 
-       CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", 
-              nal_data, block, pid);
-
-       if (gmnal_is_rxthread(nal_data)) {
-                CDEBUG(D_INFO, "RXTHREAD Attempting to get token\n");
-               down(&nal_data->rxt_stxd_token);
-               spin_lock(&nal_data->rxt_stxd_lock);
-               txd = nal_data->rxt_stxd;
-               nal_data->rxt_stxd = txd->next;
-               spin_unlock(&nal_data->rxt_stxd_lock);
-               CDEBUG(D_INFO, "RXTHREAD got [%p], head is [%p]\n", 
-                      txd, nal_data->rxt_stxd);
-                txd->kniov = 0;
-                txd->rxt = 1;
+       CDEBUG(D_TRACE, "gmnal_get_stxd gmnalni [%p] block[%d] pid [%d]\n", 
+              gmnalni, block, pid);
+
+       if (gmnal_is_rxthread(gmnalni)) {
+                CDEBUG(D_NET, "RXTHREAD Attempting to get token\n");
+               down(&gmnalni->gmni_rxt_stxd_token);
+               spin_lock(&gmnalni->gmni_rxt_stxd_lock);
+               txd = gmnalni->gmni_rxt_stxd;
+               gmnalni->gmni_rxt_stxd = txd->tx_next;
+               spin_unlock(&gmnalni->gmni_rxt_stxd_lock);
+               CDEBUG(D_NET, "RXTHREAD got [%p], head is [%p]\n", 
+                      txd, gmnalni->gmni_rxt_stxd);
+                txd->tx_kniov = 0;
+                txd->tx_rxt = 1;
         } else {
                if (block) {
-                        CDEBUG(D_INFO, "Attempting to get token\n");
-                       down(&nal_data->stxd_token);
+                        CDEBUG(D_NET, "Attempting to get token\n");
+                       down(&gmnalni->gmni_stxd_token);
                         CDEBUG(D_PORTALS, "Got token\n");
                } else {
-                       if (down_trylock(&nal_data->stxd_token)) {
+                       if (down_trylock(&gmnalni->gmni_stxd_token)) {
                                CERROR("can't get token\n");
                                return(NULL);
                        }
                }
-               spin_lock(&nal_data->stxd_lock);
-               txd = nal_data->stxd;
-               nal_data->stxd = txd->next;
-               spin_unlock(&nal_data->stxd_lock);
-               CDEBUG(D_INFO, "got [%p], head is [%p]\n", txd,
-                      nal_data->stxd);
-                txd->kniov = 0;
+               spin_lock(&gmnalni->gmni_stxd_lock);
+               txd = gmnalni->gmni_stxd;
+               gmnalni->gmni_stxd = txd->tx_next;
+               spin_unlock(&gmnalni->gmni_stxd_lock);
+               CDEBUG(D_NET, "got [%p], head is [%p]\n", txd,
+                      gmnalni->gmni_stxd);
+                txd->tx_kniov = 0;
         }       /* general txd get */
        return(txd);
 }
@@ -269,72 +247,35 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block)
  *     Return a txd to the list
  */
 void
-gmnal_return_stxd(gmnal_data_t *nal_data, gmnal_stxd_t *txd)
+gmnal_return_stxd(gmnal_ni_t *gmnalni, gmnal_stxd_t *txd)
 {
-       CDEBUG(D_TRACE, "nal_data [%p], txd[%p] rxt[%d]\n", nal_data,
-              txd, txd->rxt);
+       CDEBUG(D_TRACE, "gmnalni [%p], txd[%p] rxt[%d]\n", gmnalni,
+              txd, txd->tx_rxt);
 
         /*
          *      this transmit descriptor is 
          *      for the rxthread
          */
-        if (txd->rxt) {
-               spin_lock(&nal_data->rxt_stxd_lock);
-               txd->next = nal_data->rxt_stxd;
-               nal_data->rxt_stxd = txd;
-               spin_unlock(&nal_data->rxt_stxd_lock);
-               up(&nal_data->rxt_stxd_token);
-                CDEBUG(D_INFO, "Returned stxd to rxthread list\n");
+        if (txd->tx_rxt) {
+               spin_lock(&gmnalni->gmni_rxt_stxd_lock);
+               txd->tx_next = gmnalni->gmni_rxt_stxd;
+               gmnalni->gmni_rxt_stxd = txd;
+               spin_unlock(&gmnalni->gmni_rxt_stxd_lock);
+               up(&gmnalni->gmni_rxt_stxd_token);
+                CDEBUG(D_NET, "Returned stxd to rxthread list\n");
         } else {
-               spin_lock(&nal_data->stxd_lock);
-               txd->next = nal_data->stxd;
-               nal_data->stxd = txd;
-               spin_unlock(&nal_data->stxd_lock);
-               up(&nal_data->stxd_token);
-                CDEBUG(D_INFO, "Returned stxd to general list\n");
+               spin_lock(&gmnalni->gmni_stxd_lock);
+               txd->tx_next = gmnalni->gmni_stxd;
+               gmnalni->gmni_stxd = txd;
+               spin_unlock(&gmnalni->gmni_stxd_lock);
+               up(&gmnalni->gmni_stxd_token);
+                CDEBUG(D_NET, "Returned stxd to general list\n");
         }
        return;
 }
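
Taken together, gmnal_get_stxd() and gmnal_return_stxd() are a token-counted pool: the counting semaphore tracks how many descriptors are free, and the spinlock only serialises the list manipulation itself. A minimal userspace analogue, using POSIX semaphores and a mutex instead of the kernel primitives (all names are illustrative):

/* Sketch of the token-counted descriptor pool used by gmnal_get_stxd /
 * gmnal_return_stxd, in userspace terms.  Not driver code. */
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>

struct txd {
        struct txd *tx_next;
};

static struct txd      *pool_head;
static sem_t            pool_token;                 /* counts free descriptors */
static pthread_mutex_t  pool_lock = PTHREAD_MUTEX_INITIALIZER;

/* Take a descriptor: block for a token if 'block' is set, otherwise fail
 * immediately when the pool is exhausted (cf. down() vs down_trylock()). */
static struct txd *get_txd(int block)
{
        struct txd *txd;

        if (block)
                sem_wait(&pool_token);
        else if (sem_trywait(&pool_token) != 0)
                return NULL;

        pthread_mutex_lock(&pool_lock);
        txd = pool_head;
        pool_head = txd->tx_next;
        pthread_mutex_unlock(&pool_lock);
        return txd;
}

/* Return a descriptor and release its token, waking any blocked getter. */
static void return_txd(struct txd *txd)
{
        pthread_mutex_lock(&pool_lock);
        txd->tx_next = pool_head;
        pool_head = txd;
        pthread_mutex_unlock(&pool_lock);
        sem_post(&pool_token);
}

int main(void)
{
        enum { NTXD = 4 };
        struct txd *txd;
        int i;

        sem_init(&pool_token, 0, NTXD);
        for (i = 0; i < NTXD; i++) {
                txd = malloc(sizeof(*txd));
                txd->tx_next = pool_head;
                pool_head = txd;
        }

        txd = get_txd(1);
        printf("got descriptor %p\n", (void *)txd);
        return_txd(txd);
        return 0;
}

Because a token is taken before the list is touched, a successful wait guarantees the pop will find a descriptor, which is why neither the driver nor this sketch needs a NULL check on the head.
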
 
 
 /*
- *     Get a large transmit descriptor from the free list
- *     This implicitly gets us a transmit  token .
- *     always wait for one.
- */
-gmnal_ltxd_t *
-gmnal_get_ltxd(gmnal_data_t *nal_data)
-{
-
-       gmnal_ltxd_t    *ltxd = NULL;
-
-       CDEBUG(D_TRACE, "nal_data [%p]\n", nal_data);
-
-       down(&nal_data->ltxd_token);
-       spin_lock(&nal_data->ltxd_lock);
-       ltxd = nal_data->ltxd;
-       nal_data->ltxd = ltxd->next;
-       spin_unlock(&nal_data->ltxd_lock);
-       CDEBUG(D_INFO, "got [%p], head is [%p]\n", ltxd, nal_data->ltxd);
-       return(ltxd);
-}
-
-/*
- *     Return an ltxd to the list
- */
-void
-gmnal_return_ltxd(gmnal_data_t *nal_data, gmnal_ltxd_t *ltxd)
-{
-       CDEBUG(D_TRACE, "nal_data [%p], ltxd[%p]\n", nal_data, ltxd);
-
-       spin_lock(&nal_data->ltxd_lock);
-       ltxd->next = nal_data->ltxd;
-       nal_data->ltxd = ltxd;
-       spin_unlock(&nal_data->ltxd_lock);
-       up(&nal_data->ltxd_token);
-       return;
-}
-/*
  *     allocate a number of small rx buffers and register with GM
  *     so they are wired and set up for DMA. This is a costly operation.
  *     Also allocate a corresponding descriptor to keep track of 
@@ -343,7 +284,7 @@ gmnal_return_ltxd(gmnal_data_t *nal_data, gmnal_ltxd_t *ltxd)
  *     receive thread.
  */
 int
-gmnal_alloc_srxd(gmnal_data_t *nal_data)
+gmnal_alloc_srxd(gmnal_ni_t *gmnalni)
 {
        int nrx = 0, nsrx = 0, i = 0;
        gmnal_srxd_t    *rxd = NULL;
@@ -351,10 +292,10 @@ gmnal_alloc_srxd(gmnal_data_t *nal_data)
 
        CDEBUG(D_TRACE, "gmnal_alloc_small rx\n");
 
-       spin_lock(&nal_data->gm_lock);
-       nrx = gm_num_receive_tokens(nal_data->gm_port);
-       spin_unlock(&nal_data->gm_lock);
-       CDEBUG(D_INFO, "total number of receive tokens available is [%d]\n",
+       spin_lock(&gmnalni->gmni_gm_lock);
+       nrx = gm_num_receive_tokens(gmnalni->gmni_port);
+       spin_unlock(&gmnalni->gmni_gm_lock);
+       CDEBUG(D_NET, "total number of receive tokens available is [%d]\n",
               nrx);
 
        nsrx = nrx/2;
@@ -365,59 +306,56 @@ gmnal_alloc_srxd(gmnal_data_t *nal_data)
         */
        nsrx = num_stxds*2 + 2;
 
-       CDEBUG(D_INFO, "Allocated [%d] receive tokens to small messages\n",
+       CDEBUG(D_NET, "Allocated [%d] receive tokens to small messages\n",
               nsrx);
 
 
-       spin_lock(&nal_data->gm_lock);
-       nal_data->srxd_hash = gm_create_hash(gm_hash_compare_ptrs, 
-                                            gm_hash_hash_ptr, 0, 0, nsrx, 0);
-       spin_unlock(&nal_data->gm_lock);
-       if (!nal_data->srxd_hash) {
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gmnalni->gmni_srxd_hash = gm_create_hash(gm_hash_compare_ptrs, 
+                                                  gm_hash_hash_ptr, 0, 0, nsrx, 0);
+       spin_unlock(&gmnalni->gmni_gm_lock);
+       if (!gmnalni->gmni_srxd_hash) {
                        CERROR("Failed to create hash table\n");
-                       return(GMNAL_STATUS_NOMEM);
+                       return -ENOMEM;
        }
 
-       sema_init(&nal_data->srxd_token, nsrx);
-       spin_lock_init(&nal_data->srxd_lock);
-
        for (i=0; i<=nsrx; i++) {
                PORTAL_ALLOC(rxd, sizeof(gmnal_srxd_t));
                if (!rxd) {
                        CERROR("Failed to malloc rxd [%d]\n", i);
-                       return(GMNAL_STATUS_NOMEM);
+                       return -ENOMEM;
                }
 
-               spin_lock(&nal_data->gm_lock);
-               rxbuffer = gm_dma_malloc(nal_data->gm_port, 
-                                        nal_data->small_msg_size);
-               spin_unlock(&nal_data->gm_lock);
+               spin_lock(&gmnalni->gmni_gm_lock);
+               rxbuffer = gm_dma_malloc(gmnalni->gmni_port, 
+                                        gmnalni->gmni_small_msg_size);
+               spin_unlock(&gmnalni->gmni_gm_lock);
                if (!rxbuffer) {
                        CERROR("Failed to gm_dma_malloc rxbuffer [%d], "
-                              "size [%d]\n",i ,nal_data->small_msg_size);
+                              "size [%d]\n",i ,gmnalni->gmni_small_msg_size);
                        PORTAL_FREE(rxd, sizeof(gmnal_srxd_t));
-                       return(GMNAL_STATUS_FAIL);
+                       return -ENOMEM;
                }
 
-               rxd->buffer = rxbuffer;
-               rxd->size = nal_data->small_msg_size;
-               rxd->gmsize = gm_min_size_for_length(rxd->size);
+               rxd->rx_buffer = rxbuffer;
+               rxd->rx_size = gmnalni->gmni_small_msg_size;
+               rxd->rx_gmsize = gm_min_size_for_length(rxd->rx_size);
 
-               if (gm_hash_insert(nal_data->srxd_hash,
+               if (gm_hash_insert(gmnalni->gmni_srxd_hash,
                                   (void*)rxbuffer, (void*)rxd)) {
 
                        CERROR("failed to create hash entry rxd[%p] "
                               "for rxbuffer[%p]\n", rxd, rxbuffer);
-                       return(GMNAL_STATUS_FAIL);
+                       return -ENOMEM;
                }
 
-               rxd->next = nal_data->srxd;
-               nal_data->srxd = rxd;
-               CDEBUG(D_INFO, "Registered rxd [%p] with buffer [%p], "
-                      "size [%d]\n", rxd, rxd->buffer, rxd->size);
+               rxd->rx_next = gmnalni->gmni_srxd;
+               gmnalni->gmni_srxd = rxd;
+               CDEBUG(D_NET, "Registered rxd [%p] with buffer [%p], "
+                      "size [%d]\n", rxd, rxd->rx_buffer, rxd->rx_size);
        }
 
-       return(GMNAL_STATUS_OK);
+       return 0;
 }
 
 
@@ -426,29 +364,22 @@ gmnal_alloc_srxd(gmnal_data_t *nal_data)
  *     rx descriptors that go along with them.
  */
 void
-gmnal_free_srxd(gmnal_data_t *nal_data)
+gmnal_free_srxd(gmnal_ni_t *gmnalni)
 {
-       gmnal_srxd_t *rxd = nal_data->srxd, *_rxd = NULL;
+       gmnal_srxd_t *rxd = gmnalni->gmni_srxd, *_rxd = NULL;
 
        CDEBUG(D_TRACE, "gmnal_free_small rx\n");
 
        while(rxd) {
-               CDEBUG(D_INFO, "Freeing rxd [%p] buffer [%p], size [%d]\n",
-                      rxd, rxd->buffer, rxd->size);
+               CDEBUG(D_NET, "Freeing rxd [%p] buffer [%p], size [%d]\n",
+                      rxd, rxd->rx_buffer, rxd->rx_size);
                _rxd = rxd;
-               rxd = rxd->next;
+               rxd = rxd->rx_next;
+
+               spin_lock(&gmnalni->gmni_gm_lock);
+               gm_dma_free(gmnalni->gmni_port, _rxd->rx_buffer);
+               spin_unlock(&gmnalni->gmni_gm_lock);
 
-#if 0
-               spin_lock(&nal_data->gm_lock);
-               gm_deregister_memory(nal_data->gm_port, _rxd->buffer, 
-                                    _rxd->size);
-               spin_unlock(&nal_data->gm_lock);
-               PORTAL_FREE(_rxd->buffer, GMNAL_SMALL_RXBUFFER_SIZE);
-#else
-               spin_lock(&nal_data->gm_lock);
-               gm_dma_free(nal_data->gm_port, _rxd->buffer);
-               spin_unlock(&nal_data->gm_lock);
-#endif
                PORTAL_FREE(_rxd, sizeof(gmnal_srxd_t));
        }
        return;
@@ -456,51 +387,6 @@ gmnal_free_srxd(gmnal_data_t *nal_data)
 
 
 /*
- *     Get a rxd from the free list
- *     This get us a wired and gm_registered small rx buffer.
- *     This implicitly gets us a receive token also.
- */
-gmnal_srxd_t *
-gmnal_get_srxd(gmnal_data_t *nal_data, int block)
-{
-
-       gmnal_srxd_t    *rxd = NULL;
-       CDEBUG(D_TRACE, "nal_data [%p] block [%d]\n", nal_data, block);
-
-       if (block) {
-               down(&nal_data->srxd_token);
-       } else {
-               if (down_trylock(&nal_data->srxd_token)) {
-                       CDEBUG(D_INFO, "gmnal_get_srxd Can't get token\n");
-                       return(NULL);
-               }
-       }
-       spin_lock(&nal_data->srxd_lock);
-       rxd = nal_data->srxd;
-       if (rxd)
-               nal_data->srxd = rxd->next;
-       spin_unlock(&nal_data->srxd_lock);
-       CDEBUG(D_INFO, "got [%p], head is [%p]\n", rxd, nal_data->srxd);
-       return(rxd);
-}
-
-/*
- *     Return an rxd to the list
- */
-void
-gmnal_return_srxd(gmnal_data_t *nal_data, gmnal_srxd_t *rxd)
-{
-       CDEBUG(D_TRACE, "nal_data [%p], rxd[%p]\n", nal_data, rxd);
-
-       spin_lock(&nal_data->srxd_lock);
-       rxd->next = nal_data->srxd;
-       nal_data->srxd = rxd;
-       spin_unlock(&nal_data->srxd_lock);
-       up(&nal_data->srxd_token);
-       return;
-}
-
-/*
  *     Given a pointer to an rx buffer, find
  *     the relevant srxd descriptor for it
  *     This is done by searching a hash
@@ -508,72 +394,72 @@ gmnal_return_srxd(gmnal_data_t *nal_data, gmnal_srxd_t *rxd)
  *     are created
  */
 gmnal_srxd_t *
-gmnal_rxbuffer_to_srxd(gmnal_data_t *nal_data, void *rxbuffer)
+gmnal_rxbuffer_to_srxd(gmnal_ni_t *gmnalni, void *rxbuffer)
 {
        gmnal_srxd_t    *srxd = NULL;
-       CDEBUG(D_TRACE, "nal_data [%p], rxbuffer [%p]\n", nal_data, rxbuffer);
-       srxd = gm_hash_find(nal_data->srxd_hash, rxbuffer);
-       CDEBUG(D_INFO, "srxd is [%p]\n", srxd);
+       CDEBUG(D_TRACE, "gmnalni [%p], rxbuffer [%p]\n", gmnalni, rxbuffer);
+       srxd = gm_hash_find(gmnalni->gmni_srxd_hash, rxbuffer);
+       CDEBUG(D_NET, "srxd is [%p]\n", srxd);
        return(srxd);
 }
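
GM reports only the raw buffer address when a receive completes, so the driver needs a way back from that address to the owning descriptor; the hash populated with gm_hash_insert() at init time and queried with gm_hash_find() above provides that map. A toy pointer-keyed table showing the same idea (this is not the GM hash API, just an illustration):

/* Toy pointer->descriptor map, mimicking what the srxd hash is used for. */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define TABLE_SIZE 64            /* power of two, illustrative */

struct srxd {
        void *rx_buffer;
        int   rx_size;
};

static struct srxd *table[TABLE_SIZE];

static unsigned hash_ptr(const void *p)
{
        return (unsigned)(((uintptr_t)p >> 4) & (TABLE_SIZE - 1));
}

static int srxd_insert(struct srxd *rxd)
{
        unsigned i, h = hash_ptr(rxd->rx_buffer);

        for (i = 0; i < TABLE_SIZE; i++) {
                unsigned slot = (h + i) & (TABLE_SIZE - 1);

                if (table[slot] == NULL) {
                        table[slot] = rxd;
                        return 0;
                }
        }
        return -1;                       /* table full */
}

/* What gmnal_rxbuffer_to_srxd() does: buffer address in, descriptor out. */
static struct srxd *buffer_to_srxd(void *buffer)
{
        unsigned i, h = hash_ptr(buffer);

        for (i = 0; i < TABLE_SIZE; i++) {
                unsigned slot = (h + i) & (TABLE_SIZE - 1);

                if (table[slot] != NULL && table[slot]->rx_buffer == buffer)
                        return table[slot];
        }
        return NULL;
}

int main(void)
{
        struct srxd rxd = { .rx_buffer = malloc(1078), .rx_size = 1078 };

        if (srxd_insert(&rxd) != 0)
                return 1;
        printf("lookup %s\n",
               buffer_to_srxd(rxd.rx_buffer) == &rxd ? "found" : "missed");
        return 0;
}
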
 
 
 void
-gmnal_stop_rxthread(gmnal_data_t *nal_data)
+gmnal_stop_rxthread(gmnal_ni_t *gmnalni)
 {
        int     delay = 30;
 
 
 
-       CDEBUG(D_TRACE, "Attempting to stop rxthread nal_data [%p]\n", 
-               nal_data);
+       CDEBUG(D_TRACE, "Attempting to stop rxthread gmnalni [%p]\n", 
+               gmnalni);
        
-       nal_data->rxthread_stop_flag = GMNAL_THREAD_STOP;
+       gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_STOP;
 
-       gmnal_remove_rxtwe(nal_data);
+       gmnal_remove_rxtwe(gmnalni);
        /*
         *      kick the thread 
         */
-       up(&nal_data->rxtwe_wait);
+       up(&gmnalni->gmni_rxtwe_wait);
 
-       while(nal_data->rxthread_flag != GMNAL_THREAD_RESET && delay--) {
-               CDEBUG(D_INFO, "gmnal_stop_rxthread sleeping\n");
+       while(gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET && delay--) {
+               CDEBUG(D_NET, "gmnal_stop_rxthread sleeping\n");
                 gmnal_yield(1);
-               up(&nal_data->rxtwe_wait);
+               up(&gmnalni->gmni_rxtwe_wait);
        }
 
-       if (nal_data->rxthread_flag != GMNAL_THREAD_RESET) {
+       if (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) {
                CERROR("I don't know how to wake the thread\n");
        } else {
-               CDEBUG(D_INFO, "rx thread seems to have stopped\n");
+               CDEBUG(D_NET, "rx thread seems to have stopped\n");
        }
 }
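
Stopping the receive threads is a small handshake: set the stop flag, post the work-queue semaphore so any thread sleeping in down_interruptible() wakes, then poll (yielding) until the threads flip their flag back. A userspace sketch of the same shutdown pattern, using pthreads and a plain volatile flag where real code would use proper synchronisation (names illustrative):

/* Sketch of the stop-flag / wake / poll shutdown handshake. */
#include <pthread.h>
#include <semaphore.h>
#include <sched.h>
#include <stdio.h>

static sem_t         work_wait;
static volatile int  stop_flag;          /* toy flags; real code needs atomics */
static volatile int  thread_running;

static void *rx_thread(void *arg)
{
        (void)arg;
        thread_running = 1;

        for (;;) {
                sem_wait(&work_wait);    /* sleep until kicked */
                if (stop_flag)
                        break;
                /* ... normally: pop a work entry and process it ... */
        }

        thread_running = 0;              /* acknowledge the stop */
        return NULL;
}

int main(void)
{
        pthread_t tid;

        sem_init(&work_wait, 0, 0);
        pthread_create(&tid, NULL, rx_thread, NULL);

        while (!thread_running)          /* wait for signs of life */
                sched_yield();

        stop_flag = 1;                   /* request the stop ... */
        sem_post(&work_wait);            /* ... and kick the sleeper */

        while (thread_running)           /* poll for the acknowledgement */
                sched_yield();

        pthread_join(tid, NULL);
        printf("rx thread stopped\n");
        return 0;
}
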
 
 void
-gmnal_stop_ctthread(gmnal_data_t *nal_data)
+gmnal_stop_ctthread(gmnal_ni_t *gmnalni)
 {
        int     delay = 15;
 
 
 
-       CDEBUG(D_TRACE, "Attempting to stop ctthread nal_data [%p]\n", 
-              nal_data);
+       CDEBUG(D_TRACE, "Attempting to stop ctthread gmnalni [%p]\n", 
+              gmnalni);
        
-       nal_data->ctthread_flag = GMNAL_THREAD_STOP;
-       spin_lock(&nal_data->gm_lock);
-       gm_set_alarm(nal_data->gm_port, &nal_data->ctthread_alarm, 10, 
+       gmnalni->gmni_ctthread_flag = GMNAL_THREAD_STOP;
+       spin_lock(&gmnalni->gmni_gm_lock);
+       gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, 10, 
                     NULL, NULL);
-       spin_unlock(&nal_data->gm_lock);
+       spin_unlock(&gmnalni->gmni_gm_lock);
 
-       while(nal_data->ctthread_flag == GMNAL_THREAD_STOP && delay--) {
-               CDEBUG(D_INFO, "gmnal_stop_ctthread sleeping\n");
+       while(gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP && delay--) {
+               CDEBUG(D_NET, "gmnal_stop_ctthread sleeping\n");
                 gmnal_yield(1);
        }
 
-       if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
+       if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
                CERROR("I DON'T KNOW HOW TO WAKE THE THREAD\n");
        } else {
-               CDEBUG(D_INFO, "CT THREAD SEEMS TO HAVE STOPPED\n");
+               CDEBUG(D_NET, "CT THREAD SEEMS TO HAVE STOPPED\n");
        }
 }
 
@@ -835,17 +721,17 @@ gmnal_yield(int delay)
 }
 
 int
-gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov, 
+gmnal_is_small_msg(gmnal_ni_t *gmnalni, int niov, struct iovec *iov, 
                    int len)
 {
 
        CDEBUG(D_TRACE, "len [%d] limit[%d]\n", len, 
-              nal_data->small_msg_size);
+              gmnalni->gmni_small_msg_size);
 
        if ((len + sizeof(ptl_hdr_t) + sizeof(gmnal_msghdr_t)) 
-                    < nal_data->small_msg_size) {
+                    < gmnalni->gmni_small_msg_size) {
 
-               CDEBUG(D_INFO, "Yep, small message\n");
+               CDEBUG(D_NET, "Yep, small message\n");
                return(1);
        } else {
                CERROR("No, not small message\n");
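
The check above simply asks whether the payload plus the portals header and the gmnal message header still fit in one pre-registered small buffer. With made-up header sizes (the real overheads are sizeof(ptl_hdr_t) and sizeof(gmnal_msghdr_t), and the real limit is gmni_small_msg_size), the arithmetic looks like:

/* Illustrative sizes only; the real limit and overheads come from the driver. */
#include <stdio.h>

#define SMALL_MSG_SIZE 1078
#define PTL_HDR_SIZE     72      /* assumption, for illustration */
#define GMNAL_HDR_SIZE   16      /* assumption, for illustration */

static int is_small_msg(int len)
{
        return (len + PTL_HDR_SIZE + GMNAL_HDR_SIZE) < SMALL_MSG_SIZE;
}

int main(void)
{
        printf("512-byte payload:  %s\n", is_small_msg(512)  ? "small" : "too big");
        printf("1024-byte payload: %s\n", is_small_msg(1024) ? "small" : "too big");
        return 0;
}
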
@@ -865,7 +751,7 @@ gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov,
  *     can get it to complete the receive
  */
 int
-gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv)
+gmnal_add_rxtwe(gmnal_ni_t *gmnalni, gm_recv_t *recv)
 {
        gmnal_rxtwe_t   *we = NULL;
 
@@ -874,7 +760,7 @@ gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv)
        PORTAL_ALLOC(we, sizeof(gmnal_rxtwe_t));
        if (!we) {
                CERROR("failed to malloc\n");
-               return(GMNAL_STATUS_FAIL);
+               return -ENOMEM;
        }
        we->buffer = gm_ntohp(recv->buffer);
        we->snode = (int)gm_ntoh_u16(recv->sender_node_id);
@@ -882,70 +768,73 @@ gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv)
        we->type = (int)gm_ntoh_u8(recv->type);
        we->length = (int)gm_ntohl(recv->length);
 
-       spin_lock(&nal_data->rxtwe_lock);
-       if (nal_data->rxtwe_tail) {
-               nal_data->rxtwe_tail->next = we;
+       spin_lock(&gmnalni->gmni_rxtwe_lock);
+       if (gmnalni->gmni_rxtwe_tail) {
+               gmnalni->gmni_rxtwe_tail->next = we;
        } else {
-               nal_data->rxtwe_head = we;
-               nal_data->rxtwe_tail = we;
+               gmnalni->gmni_rxtwe_head = we;
+               gmnalni->gmni_rxtwe_tail = we;
        }
-       nal_data->rxtwe_tail = we;
-       spin_unlock(&nal_data->rxtwe_lock);
+       gmnalni->gmni_rxtwe_tail = we;
+       spin_unlock(&gmnalni->gmni_rxtwe_lock);
 
-       up(&nal_data->rxtwe_wait);
-       return(GMNAL_STATUS_OK);
+       up(&gmnalni->gmni_rxtwe_wait);
+       return 0;
 }
 
 void
-gmnal_remove_rxtwe(gmnal_data_t *nal_data)
+gmnal_remove_rxtwe(gmnal_ni_t *gmnalni)
 {
-       gmnal_rxtwe_t   *_we, *we = nal_data->rxtwe_head;
+       gmnal_rxtwe_t   *_we, *we = gmnalni->gmni_rxtwe_head;
 
        CDEBUG(D_NET, "removing all work list entries\n");
 
-       spin_lock(&nal_data->rxtwe_lock);
+       spin_lock(&gmnalni->gmni_rxtwe_lock);
        CDEBUG(D_NET, "Got lock\n");
        while (we) {
                _we = we;
                we = we->next;
                PORTAL_FREE(_we, sizeof(gmnal_rxtwe_t));
        }
-       spin_unlock(&nal_data->rxtwe_lock);
-       nal_data->rxtwe_head = NULL;
-       nal_data->rxtwe_tail = NULL;
+       spin_unlock(&gmnalni->gmni_rxtwe_lock);
+       gmnalni->gmni_rxtwe_head = NULL;
+       gmnalni->gmni_rxtwe_tail = NULL;
 }
 
 gmnal_rxtwe_t *
-gmnal_get_rxtwe(gmnal_data_t *nal_data)
+gmnal_get_rxtwe(gmnal_ni_t *gmnalni)
 {
        gmnal_rxtwe_t   *we = NULL;
 
        CDEBUG(D_NET, "Getting entry to list\n");
 
        do  {
-               while(down_interruptible(&nal_data->rxtwe_wait) != 0)
+               while(down_interruptible(&gmnalni->gmni_rxtwe_wait) != 0)
                         /* do nothing */;
-               if (nal_data->rxthread_stop_flag == GMNAL_THREAD_STOP) {
+
+               if (gmnalni->gmni_rxthread_stop_flag == GMNAL_THREAD_STOP) {
                        /*
                         *      time to stop
                         *      TODO: someone should free the work entries
                         */
                        return(NULL);
                }
-               spin_lock(&nal_data->rxtwe_lock);
-               if (nal_data->rxtwe_head) {
-                       CDEBUG(D_INFO, "Got a work entry\n");
-                       we = nal_data->rxtwe_head;
-                       nal_data->rxtwe_head = we->next;
-                       if (!nal_data->rxtwe_head)
-                               nal_data->rxtwe_tail = NULL;
+
+               spin_lock(&gmnalni->gmni_rxtwe_lock);
+               if (gmnalni->gmni_rxtwe_head) {
+                       CDEBUG(D_NET, "Got a work entry\n");
+                       we = gmnalni->gmni_rxtwe_head;
+                       gmnalni->gmni_rxtwe_head = we->next;
+                       if (!gmnalni->gmni_rxtwe_head)
+                               gmnalni->gmni_rxtwe_tail = NULL;
                } else {
                        CWARN("woken but no work\n");
                }
-               spin_unlock(&nal_data->rxtwe_lock);
+
+               spin_unlock(&gmnalni->gmni_rxtwe_lock);
        } while (!we);
 
-       CDEBUG(D_INFO, "Returning we[%p]\n", we);
+       CDEBUG(D_NET, "Returning we[%p]\n", we);
        return(we);
 }
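
gmnal_add_rxtwe() and gmnal_get_rxtwe() form a straightforward producer/consumer queue: the caretaker thread appends an entry under the spinlock and ups rxtwe_wait, while each receive thread downs the semaphore and pops the head under the same lock. A compact userspace analogue (illustrative names; the driver's stop-flag and spurious-wakeup handling is omitted here):

/* Sketch of the rx work-entry queue: one producer, many consumers. */
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>

struct rxtwe {
        struct rxtwe *next;
        int           length;
};

static struct rxtwe    *we_head, *we_tail;
static sem_t            we_wait;
static pthread_mutex_t  we_lock = PTHREAD_MUTEX_INITIALIZER;

/* Producer: append an entry and wake one sleeping consumer. */
static void add_we(int length)
{
        struct rxtwe *we = calloc(1, sizeof(*we));

        we->length = length;
        pthread_mutex_lock(&we_lock);
        if (we_tail != NULL)
                we_tail->next = we;
        else
                we_head = we;
        we_tail = we;
        pthread_mutex_unlock(&we_lock);
        sem_post(&we_wait);
}

/* Consumer: sleep until work arrives, then detach the head entry. */
static struct rxtwe *get_we(void)
{
        struct rxtwe *we;

        sem_wait(&we_wait);
        pthread_mutex_lock(&we_lock);
        we = we_head;
        we_head = we->next;
        if (we_head == NULL)
                we_tail = NULL;
        pthread_mutex_unlock(&we_lock);
        return we;
}

int main(void)
{
        struct rxtwe *we;

        sem_init(&we_wait, 0, 0);
        add_we(1024);
        we = get_we();
        printf("got work entry of length %d\n", we->length);
        free(we);
        return 0;
}
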
 
@@ -958,7 +847,7 @@ gmnal_get_rxtwe(gmnal_data_t *nal_data)
  *     callback events or sleeps.
  */
 int
-gmnal_start_kernel_threads(gmnal_data_t *nal_data)
+gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
 {
 
        int     threads = 0;
@@ -967,69 +856,69 @@ gmnal_start_kernel_threads(gmnal_data_t *nal_data)
         *      gm_unknown call (sleeping) to exit it.
         */
        CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n");
-       gm_initialize_alarm(&nal_data->ctthread_alarm);
-       nal_data->ctthread_flag = GMNAL_THREAD_RESET;
+       gm_initialize_alarm(&gmnalni->gmni_ctthread_alarm);
+       gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
 
 
-       CDEBUG(D_INFO, "Starting caretaker thread\n");
-       nal_data->ctthread_pid = 
-                kernel_thread(gmnal_ct_thread, (void*)nal_data, 0);
-       if (nal_data->ctthread_pid <= 0) {
+       CDEBUG(D_NET, "Starting caretaker thread\n");
+       gmnalni->gmni_ctthread_pid = 
+                kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0);
+       if (gmnalni->gmni_ctthread_pid <= 0) {
                CERROR("Caretaker thread failed to start\n");
-               return(GMNAL_STATUS_FAIL);
+               return -ENOMEM;
        }
 
-       while (nal_data->rxthread_flag != GMNAL_THREAD_RESET) {
+       while (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) {
                gmnal_yield(1);
-               CDEBUG(D_INFO, "Waiting for caretaker thread signs of life\n");
+               CDEBUG(D_NET, "Waiting for caretaker thread signs of life\n");
        }
 
-       CDEBUG(D_INFO, "caretaker thread has started\n");
+       CDEBUG(D_NET, "caretaker thread has started\n");
 
 
        /*
         *      Now start a number of receiver threads
         *      these threads get work to do from the caretaker (ct) thread
         */
-       nal_data->rxthread_flag = GMNAL_THREAD_RESET;
-       nal_data->rxthread_stop_flag = GMNAL_THREAD_RESET;
+       gmnalni->gmni_rxthread_flag = GMNAL_THREAD_RESET;
+       gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_RESET;
 
        for (threads=0; threads<NRXTHREADS; threads++)
-               nal_data->rxthread_pid[threads] = -1;
-       spin_lock_init(&nal_data->rxtwe_lock);
-       spin_lock_init(&nal_data->rxthread_flag_lock);
-       sema_init(&nal_data->rxtwe_wait, 0);
-       nal_data->rxtwe_head = NULL;
-       nal_data->rxtwe_tail = NULL;
+               gmnalni->gmni_rxthread_pid[threads] = -1;
+       spin_lock_init(&gmnalni->gmni_rxtwe_lock);
+       spin_lock_init(&gmnalni->gmni_rxthread_flag_lock);
+       sema_init(&gmnalni->gmni_rxtwe_wait, 0);
+       gmnalni->gmni_rxtwe_head = NULL;
+       gmnalni->gmni_rxtwe_tail = NULL;
         /*
          *      If the default number of receive threads isn't
          *      modified at load time, then start one thread per cpu
          */
         if (num_rx_threads == -1)
                 num_rx_threads = smp_num_cpus;
-       CDEBUG(D_INFO, "Starting [%d] receive threads\n", num_rx_threads);
+       CDEBUG(D_NET, "Starting [%d] receive threads\n", num_rx_threads);
        for (threads=0; threads<num_rx_threads; threads++) {
-               nal_data->rxthread_pid[threads] = 
-                      kernel_thread(gmnal_rx_thread, (void*)nal_data, 0);
-               if (nal_data->rxthread_pid[threads] <= 0) {
+               gmnalni->gmni_rxthread_pid[threads] = 
+                      kernel_thread(gmnal_rx_thread, (void*)gmnalni, 0);
+               if (gmnalni->gmni_rxthread_pid[threads] <= 0) {
                        CERROR("Receive thread failed to start\n");
-                       gmnal_stop_rxthread(nal_data);
-                       gmnal_stop_ctthread(nal_data);
-                       return(GMNAL_STATUS_FAIL);
+                       gmnal_stop_rxthread(gmnalni);
+                       gmnal_stop_ctthread(gmnalni);
+                       return -ENOMEM;
                }
        }
 
        for (;;) {
-               spin_lock(&nal_data->rxthread_flag_lock);
-               if (nal_data->rxthread_flag == GMNAL_RXTHREADS_STARTED) {
-                       spin_unlock(&nal_data->rxthread_flag_lock);
+               spin_lock(&gmnalni->gmni_rxthread_flag_lock);
+               if (gmnalni->gmni_rxthread_flag == GMNAL_RXTHREADS_STARTED) {
+                       spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
                        break;
                }
-               spin_unlock(&nal_data->rxthread_flag_lock);
+               spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
                gmnal_yield(1);
        }
 
-       CDEBUG(D_INFO, "receive threads seem to have started\n");
+       CDEBUG(D_NET, "receive threads seem to have started\n");
 
-       return(GMNAL_STATUS_OK);
+       return 0;
 }