Whamcloud - gitweb
Maintain separate list of transmit descriptors for large messages.
authormdoyle <mdoyle>
Mon, 13 Oct 2003 16:40:39 +0000 (16:40 +0000)
committermdoyle <mdoyle>
Mon, 13 Oct 2003 16:40:39 +0000 (16:40 +0000)
lnet/klnds/gmlnd/gmlnd.h
lnet/klnds/gmlnd/gmlnd_api.c
lnet/klnds/gmlnd/gmlnd_comm.c
lnet/klnds/gmlnd/gmlnd_utils.c
lustre/portals/knals/gmnal/gmnal.h
lustre/portals/knals/gmnal/gmnal_api.c
lustre/portals/knals/gmnal/gmnal_comm.c
lustre/portals/knals/gmnal/gmnal_utils.c

index 2db6c9b..c7c4c1d 100644 (file)
@@ -104,7 +104,6 @@ typedef struct _gmnal_stxd_t {
        lib_msg_t               *cookie;
        int                     niov;
        struct iovec            iov[PTL_MD_MAX_IOV];
-       struct  _gmnal_srxd_t  *srxd;
        struct _gmnal_stxd_t    *next;
         int                     rxt; 
         int                     kniov;
@@ -112,6 +111,16 @@ typedef struct _gmnal_stxd_t {
 } gmnal_stxd_t;
 
 /*
+ *     keeps a transmit token for large transmit (gm_get)
+ *     and a pointer to rxd that is used as context for large receive
+ */
+typedef struct _gmnal_ltxd_t {
+       struct _gmnal_ltxd_t    *next;
+       struct  _gmnal_srxd_t  *srxd;
+} gmnal_ltxd_t;
+
+
+/*
  *     as for gmnal_stxd_t 
  *     a hash table in nal_data find srxds from
  *     the rx buffer address. hash table populated at init time
@@ -181,6 +190,9 @@ typedef struct _gmnal_data_t {
        spinlock_t      rxt_stxd_lock;
        struct semaphore rxt_stxd_token;
        gmnal_stxd_t    *rxt_stxd;
+       spinlock_t      ltxd_lock;
+       struct semaphore ltxd_token;
+       gmnal_ltxd_t    *ltxd;
        spinlock_t      srxd_lock;
        struct semaphore srxd_token;
        gmnal_srxd_t    *srxd;
@@ -264,6 +276,14 @@ extern gmnal_data_t        *global_nal_data;
 #define GMNAL_RXT_TXD_TRYGETTOKEN(a)   down_trylock(&a->rxt_stxd_token)
 #define GMNAL_RXT_TXD_RETURNTOKEN(a)   up(&a->rxt_stxd_token);
 
+#define GMNAL_LTXD_LOCK_INIT(a)                spin_lock_init(&a->ltxd_lock);
+#define GMNAL_LTXD_LOCK(a)             spin_lock(&a->ltxd_lock);
+#define GMNAL_LTXD_UNLOCK(a)           spin_unlock(&a->ltxd_lock);
+#define GMNAL_LTXD_TOKEN_INIT(a, n)    sema_init(&a->ltxd_token, n);
+#define GMNAL_LTXD_GETTOKEN(a)         down(&a->ltxd_token);
+#define GMNAL_LTXD_TRYGETTOKEN(a)      down_trylock(&a->ltxd_token)
+#define GMNAL_LTXD_RETURNTOKEN(a)      up(&a->ltxd_token);
+
 #define GMNAL_RXD_LOCK_INIT(a)         spin_lock_init(&a->srxd_lock);
 #define GMNAL_RXD_LOCK(a)              spin_lock(&a->srxd_lock);
 #define GMNAL_RXD_UNLOCK(a)            spin_unlock(&a->srxd_lock);
@@ -376,12 +396,14 @@ void  gmnal_fini(void);
 
 
 /*
- *     Small Transmit and Receive Descriptor Functions
+ *     Small and Large Transmit and Receive Descriptor Functions
  */
-int            gmnal_alloc_stxd(gmnal_data_t *);
-void           gmnal_free_stxd(gmnal_data_t *);
+int            gmnal_alloc_txd(gmnal_data_t *);
+void           gmnal_free_txd(gmnal_data_t *);
 gmnal_stxd_t*  gmnal_get_stxd(gmnal_data_t *, int);
 void           gmnal_return_stxd(gmnal_data_t *, gmnal_stxd_t *);
+gmnal_ltxd_t*  gmnal_get_ltxd(gmnal_data_t *);
+void           gmnal_return_ltxd(gmnal_data_t *, gmnal_ltxd_t *);
 
 int            gmnal_alloc_srxd(gmnal_data_t *);
 void           gmnal_free_srxd(gmnal_data_t *);
index 40d23db..c22c91b 100644 (file)
@@ -307,7 +307,7 @@ gmnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
 
        if (gmnal_alloc_srxd(nal_data) != GMNAL_STATUS_OK) {
                CDEBUG(D_ERROR, "Failed to allocate small rx descriptors\n");
-               gmnal_free_stxd(nal_data);
+               gmnal_free_txd(nal_data);
                GMNAL_GM_LOCK(nal_data);
                gm_close(nal_data->gm_port);
                gm_finalize();
@@ -336,7 +336,7 @@ gmnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
        /*
         *      Allocate pools of small tx buffers and descriptors
         */
-       if (gmnal_alloc_stxd(nal_data) != GMNAL_STATUS_OK) {
+       if (gmnal_alloc_txd(nal_data) != GMNAL_STATUS_OK) {
                CDEBUG(D_ERROR, "Failed to allocate small tx descriptors\n");
                GMNAL_GM_LOCK(nal_data);
                gm_close(nal_data->gm_port);
@@ -369,7 +369,7 @@ gmnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
                gmnal_stop_rxthread(nal_data);
                gmnal_stop_ctthread(nal_data);
                CDEBUG(D_ERROR, "can't determine node id\n");
-               gmnal_free_stxd(nal_data);
+               gmnal_free_txd(nal_data);
                gmnal_free_srxd(nal_data);
                GMNAL_GM_LOCK(nal_data);
                gm_close(nal_data->gm_port);
@@ -390,7 +390,7 @@ gmnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
                CDEBUG(D_ERROR, "failed to obtain global id\n");
                gmnal_stop_rxthread(nal_data);
                gmnal_stop_ctthread(nal_data);
-               gmnal_free_stxd(nal_data);
+               gmnal_free_txd(nal_data);
                gmnal_free_srxd(nal_data);
                GMNAL_GM_LOCK(nal_data);
                gm_close(nal_data->gm_port);
@@ -417,7 +417,7 @@ gmnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
                CDEBUG(D_ERROR, "lib_init failed\n");
                gmnal_stop_rxthread(nal_data);
                gmnal_stop_ctthread(nal_data);
-               gmnal_free_stxd(nal_data);
+               gmnal_free_txd(nal_data);
                gmnal_free_srxd(nal_data);
                GMNAL_GM_LOCK(nal_data);
                gm_close(nal_data->gm_port);
@@ -453,7 +453,7 @@ void gmnal_fini()
 
        gmnal_stop_rxthread(nal_data);
        gmnal_stop_ctthread(nal_data);
-       gmnal_free_stxd(nal_data);
+       gmnal_free_txd(nal_data);
        gmnal_free_srxd(nal_data);
        GMNAL_GM_LOCK(nal_data);
        gm_close(nal_data->gm_port);
index 57e85d7..05934f3 100644 (file)
@@ -950,7 +950,7 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
        unsigned long   sbuf_long;
        gm_remote_ptr_t remote_ptr = 0;
        unsigned int    source_node;
-       gmnal_stxd_t    *stxd = NULL;
+       gmnal_ltxd_t    *ltxd = NULL;
        gmnal_data_t    *nal_data = srxd->nal_data;
 
        CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
@@ -989,8 +989,8 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                        ncalls++;
                        if (do_copy) {
                                CDEBUG(D_INFO, "slen>rlen\n");
-                               stxd = gmnal_get_stxd(nal_data, 1);
-                               stxd->srxd = srxd;
+                               ltxd = gmnal_get_ltxd(nal_data);
+                               ltxd->srxd = srxd;
                                GMNAL_GM_LOCK(nal_data);
                                /* 
                                 *      funny business to get rid 
@@ -1001,7 +1001,7 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                                gm_get(nal_data->gm_port, remote_ptr, rbuf, 
                                       rlen, GM_LOW_PRIORITY, source_node, 
                                       GMNAL_GM_PORT, 
-                                      gmnal_remote_get_callback, stxd);
+                                      gmnal_remote_get_callback, ltxd);
                                GMNAL_GM_UNLOCK(nal_data);
                        }
                        /*
@@ -1017,15 +1017,15 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                        ncalls++;
                        if (do_copy) {
                                CDEBUG(D_INFO, "slen<rlen\n");
-                               stxd = gmnal_get_stxd(nal_data, 1);
-                               stxd->srxd = srxd;
+                               ltxd = gmnal_get_ltxd(nal_data);
+                               ltxd->srxd = srxd;
                                GMNAL_GM_LOCK(nal_data);
                                sbuf_long = (unsigned long) sbuf;
                                remote_ptr = (gm_remote_ptr_t)sbuf_long;
                                gm_get(nal_data->gm_port, remote_ptr, rbuf, 
                                       slen, GM_LOW_PRIORITY, source_node, 
                                       GMNAL_GM_PORT, 
-                                      gmnal_remote_get_callback, stxd);
+                                      gmnal_remote_get_callback, ltxd);
                                GMNAL_GM_UNLOCK(nal_data);
                        }
                        /*
@@ -1040,15 +1040,15 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                        ncalls++;
                        if (do_copy) {
                                CDEBUG(D_INFO, "rlen=slen\n");
-                               stxd = gmnal_get_stxd(nal_data, 1);
-                               stxd->srxd = srxd;
+                               ltxd = gmnal_get_ltxd(nal_data);
+                               ltxd->srxd = srxd;
                                GMNAL_GM_LOCK(nal_data);
                                sbuf_long = (unsigned long) sbuf;
                                remote_ptr = (gm_remote_ptr_t)sbuf_long;
                                gm_get(nal_data->gm_port, remote_ptr, rbuf, 
                                       rlen, GM_LOW_PRIORITY, source_node, 
                                       GMNAL_GM_PORT, 
-                                      gmnal_remote_get_callback, stxd);
+                                      gmnal_remote_get_callback, ltxd);
                                GMNAL_GM_UNLOCK(nal_data);
                        }
                        /*
@@ -1078,8 +1078,8 @@ gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
                           gm_status_t status)
 {
 
-       gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-       gmnal_srxd_t    *srxd = stxd->srxd;
+       gmnal_ltxd_t    *ltxd = (gmnal_ltxd_t*)context;
+       gmnal_srxd_t    *srxd = ltxd->srxd;
        nal_cb_t        *nal_cb = srxd->nal_data->nal_cb;
        int             lastone;
        struct  iovec   *riov;
@@ -1103,7 +1103,7 @@ gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
        /*
         *      everyone returns a send token
         */
-       gmnal_return_stxd(nal_data, stxd);
+       gmnal_return_ltxd(nal_data, ltxd);
 
        if (!lastone) {
                CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
index d40a943..eebf92a 100644 (file)
@@ -40,35 +40,53 @@ gmnal_is_rxthread(gmnal_data_t *nal_data)
 
 
 /*
+ *     Allocate tx descriptors/tokens (large and small)
  *     allocate a number of small tx buffers and register with GM
  *     so they are wired and set up for DMA. This is a costly operation.
  *     Also allocate a corrosponding descriptor to keep track of 
  *     the buffer.
- *     Put all descriptors on singly linked list to be available to send 
+ *     Put all small descriptors on singly linked list to be available to send 
  *     function.
+ *     Allocate the rest of the available tx tokens for large messages. These will be
+ *     used to do gm_gets in gmnal_copyiov     
  */
 int
-gmnal_alloc_stxd(gmnal_data_t *nal_data)
+gmnal_alloc_txd(gmnal_data_t *nal_data)
 {
-       int ntx = 0, nstx = 0, i = 0, nrxt_stx = 10;
+       int ntx= 0, nstx= 0, nrxt_stx= 0,
+           nltx= 0, i = 0;
        gmnal_stxd_t    *txd = NULL;
+       gmnal_ltxd_t    *ltxd = NULL;
        void    *txbuffer = NULL;
 
        CDEBUG(D_TRACE, "gmnal_alloc_small tx\n");
 
        GMNAL_GM_LOCK(nal_data);
+       /*
+        *      total number of transmit tokens
+        */
        ntx = gm_num_send_tokens(nal_data->gm_port);
        GMNAL_GM_UNLOCK(nal_data);
        CDEBUG(D_INFO, "total number of send tokens available is [%d]\n", ntx);
        
-       nstx = ntx/2;
        /*
-        * num_stxds from gmnal_module.c
+        *      allocate a number for small sends
+        *      num_stxds from gmnal_module.c
         */
        nstx = num_stxds;
+       /*
+        *      give that number plus 1 to the receive threads
+        */
         nrxt_stx = nstx + 1;
 
-       CDEBUG(D_INFO, "Allocated [%d] send tokens to small messages\n", nstx);
+       /*
+        *      give the rest for gm_gets
+        */
+       nltx = ntx - (nrxt_stx + nstx);
+       if (nltx < 1) {
+               CDEBUG(D_ERROR, "No tokens available for large messages\n");
+               return(GMNAL_STATUS_FAIL);
+       }
 
 
        /*
@@ -85,6 +103,8 @@ gmnal_alloc_stxd(gmnal_data_t *nal_data)
        GMNAL_TXD_LOCK_INIT(nal_data);
        GMNAL_RXT_TXD_TOKEN_INIT(nal_data, nrxt_stx);
        GMNAL_RXT_TXD_LOCK_INIT(nal_data);
+       GMNAL_LTXD_TOKEN_INIT(nal_data, nltx);
+       GMNAL_LTXD_LOCK_INIT(nal_data);
        
        for (i=0; i<=nstx; i++) {
                PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
@@ -144,6 +164,14 @@ gmnal_alloc_stxd(gmnal_data_t *nal_data)
                       size [%d]\n", txd, txd->buffer, txd->buffer_size);
        }
 
+       /*
+        *      string together large tokens
+        */
+       for (i=0; i<=nltx ; i++) {
+               PORTAL_ALLOC(ltxd, sizeof(gmnal_ltxd_t));
+               ltxd->next = nal_data->ltxd;
+               nal_data->ltxd = ltxd;
+       }
        return(GMNAL_STATUS_OK);
 }
 
@@ -151,9 +179,10 @@ gmnal_alloc_stxd(gmnal_data_t *nal_data)
  *     the tx descriptors that go along with them.
  */
 void
-gmnal_free_stxd(gmnal_data_t *nal_data)
+gmnal_free_txd(gmnal_data_t *nal_data)
 {
        gmnal_stxd_t *txd = nal_data->stxd, *_txd = NULL;
+       gmnal_ltxd_t *ltxd = NULL, *_ltxd = NULL;
 
        CDEBUG(D_TRACE, "gmnal_free_small tx\n");
 
@@ -178,6 +207,13 @@ gmnal_free_stxd(gmnal_data_t *nal_data)
                GMNAL_GM_UNLOCK(nal_data);
                PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
        }
+       ltxd = nal_data->ltxd;
+       while(txd) {
+               _ltxd = ltxd;
+               ltxd = ltxd->next;
+               PORTAL_FREE(_ltxd, sizeof(gmnal_ltxd_t));
+       }
+       
        return;
 }
 
@@ -203,8 +239,7 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block)
                GMNAL_RXT_TXD_GETTOKEN(nal_data);
                GMNAL_RXT_TXD_LOCK(nal_data);
                txd = nal_data->rxt_stxd;
-               if (txd)
-                       nal_data->rxt_stxd = txd->next;
+               nal_data->rxt_stxd = txd->next;
                GMNAL_RXT_TXD_UNLOCK(nal_data);
                CDEBUG(D_INFO, "RXTHREAD got [%p], head is [%p]\n", 
                       txd, nal_data->rxt_stxd);
@@ -223,8 +258,7 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block)
                }
                GMNAL_TXD_LOCK(nal_data);
                txd = nal_data->stxd;
-               if (txd)
-                       nal_data->stxd = txd->next;
+               nal_data->stxd = txd->next;
                GMNAL_TXD_UNLOCK(nal_data);
                CDEBUG(D_INFO, "got [%p], head is [%p]\n", txd, 
                       nal_data->stxd);
@@ -266,6 +300,43 @@ gmnal_return_stxd(gmnal_data_t *nal_data, gmnal_stxd_t *txd)
 
 
 /*
+ *     Get a large transmit descriptor from the free list
+ *     This implicitly gets us a transmit  token .
+ *     always wait for one.
+ */
+gmnal_ltxd_t *
+gmnal_get_ltxd(gmnal_data_t *nal_data)
+{
+
+       gmnal_ltxd_t    *ltxd = NULL;
+
+       CDEBUG(D_TRACE, "nal_data [%p]\n", nal_data);
+
+       GMNAL_LTXD_GETTOKEN(nal_data);
+       GMNAL_LTXD_LOCK(nal_data);
+       ltxd = nal_data->ltxd;
+       nal_data->ltxd = ltxd->next;
+       GMNAL_LTXD_UNLOCK(nal_data);
+       CDEBUG(D_INFO, "got [%p], head is [%p]\n", ltxd, nal_data->ltxd);
+       return(ltxd);
+}
+
+/*
+ *     Return an ltxd to the list
+ */
+void
+gmnal_return_ltxd(gmnal_data_t *nal_data, gmnal_ltxd_t *ltxd)
+{
+       CDEBUG(D_TRACE, "nal_data [%p], ltxd[%p]\n", nal_data, ltxd);
+
+       GMNAL_LTXD_LOCK(nal_data);
+       ltxd->next = nal_data->ltxd;
+       nal_data->ltxd = ltxd;
+       GMNAL_LTXD_UNLOCK(nal_data);
+       GMNAL_LTXD_RETURNTOKEN(nal_data);
+       return;
+}
+/*
  *     allocate a number of small rx buffers and register with GM
  *     so they are wired and set up for DMA. This is a costly operation.
  *     Also allocate a corrosponding descriptor to keep track of 
@@ -1006,10 +1077,10 @@ EXPORT_SYMBOL(gmnal_alloc_srxd);
 EXPORT_SYMBOL(gmnal_get_srxd);
 EXPORT_SYMBOL(gmnal_return_srxd);
 EXPORT_SYMBOL(gmnal_free_srxd);
-EXPORT_SYMBOL(gmnal_alloc_stxd);
+EXPORT_SYMBOL(gmnal_alloc_txd);
 EXPORT_SYMBOL(gmnal_get_stxd);
 EXPORT_SYMBOL(gmnal_return_stxd);
-EXPORT_SYMBOL(gmnal_free_stxd);
+EXPORT_SYMBOL(gmnal_free_txd);
 EXPORT_SYMBOL(gmnal_rxbuffer_to_srxd);
 EXPORT_SYMBOL(gmnal_rxevent);
 EXPORT_SYMBOL(gmnal_gm_error);
index 2db6c9b..c7c4c1d 100644 (file)
@@ -104,7 +104,6 @@ typedef struct _gmnal_stxd_t {
        lib_msg_t               *cookie;
        int                     niov;
        struct iovec            iov[PTL_MD_MAX_IOV];
-       struct  _gmnal_srxd_t  *srxd;
        struct _gmnal_stxd_t    *next;
         int                     rxt; 
         int                     kniov;
@@ -112,6 +111,16 @@ typedef struct _gmnal_stxd_t {
 } gmnal_stxd_t;
 
 /*
+ *     keeps a transmit token for large transmit (gm_get)
+ *     and a pointer to rxd that is used as context for large receive
+ */
+typedef struct _gmnal_ltxd_t {
+       struct _gmnal_ltxd_t    *next;
+       struct  _gmnal_srxd_t  *srxd;
+} gmnal_ltxd_t;
+
+
+/*
  *     as for gmnal_stxd_t 
  *     a hash table in nal_data find srxds from
  *     the rx buffer address. hash table populated at init time
@@ -181,6 +190,9 @@ typedef struct _gmnal_data_t {
        spinlock_t      rxt_stxd_lock;
        struct semaphore rxt_stxd_token;
        gmnal_stxd_t    *rxt_stxd;
+       spinlock_t      ltxd_lock;
+       struct semaphore ltxd_token;
+       gmnal_ltxd_t    *ltxd;
        spinlock_t      srxd_lock;
        struct semaphore srxd_token;
        gmnal_srxd_t    *srxd;
@@ -264,6 +276,14 @@ extern gmnal_data_t        *global_nal_data;
 #define GMNAL_RXT_TXD_TRYGETTOKEN(a)   down_trylock(&a->rxt_stxd_token)
 #define GMNAL_RXT_TXD_RETURNTOKEN(a)   up(&a->rxt_stxd_token);
 
+#define GMNAL_LTXD_LOCK_INIT(a)                spin_lock_init(&a->ltxd_lock);
+#define GMNAL_LTXD_LOCK(a)             spin_lock(&a->ltxd_lock);
+#define GMNAL_LTXD_UNLOCK(a)           spin_unlock(&a->ltxd_lock);
+#define GMNAL_LTXD_TOKEN_INIT(a, n)    sema_init(&a->ltxd_token, n);
+#define GMNAL_LTXD_GETTOKEN(a)         down(&a->ltxd_token);
+#define GMNAL_LTXD_TRYGETTOKEN(a)      down_trylock(&a->ltxd_token)
+#define GMNAL_LTXD_RETURNTOKEN(a)      up(&a->ltxd_token);
+
 #define GMNAL_RXD_LOCK_INIT(a)         spin_lock_init(&a->srxd_lock);
 #define GMNAL_RXD_LOCK(a)              spin_lock(&a->srxd_lock);
 #define GMNAL_RXD_UNLOCK(a)            spin_unlock(&a->srxd_lock);
@@ -376,12 +396,14 @@ void  gmnal_fini(void);
 
 
 /*
- *     Small Transmit and Receive Descriptor Functions
+ *     Small and Large Transmit and Receive Descriptor Functions
  */
-int            gmnal_alloc_stxd(gmnal_data_t *);
-void           gmnal_free_stxd(gmnal_data_t *);
+int            gmnal_alloc_txd(gmnal_data_t *);
+void           gmnal_free_txd(gmnal_data_t *);
 gmnal_stxd_t*  gmnal_get_stxd(gmnal_data_t *, int);
 void           gmnal_return_stxd(gmnal_data_t *, gmnal_stxd_t *);
+gmnal_ltxd_t*  gmnal_get_ltxd(gmnal_data_t *);
+void           gmnal_return_ltxd(gmnal_data_t *, gmnal_ltxd_t *);
 
 int            gmnal_alloc_srxd(gmnal_data_t *);
 void           gmnal_free_srxd(gmnal_data_t *);
index 40d23db..c22c91b 100644 (file)
@@ -307,7 +307,7 @@ gmnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
 
        if (gmnal_alloc_srxd(nal_data) != GMNAL_STATUS_OK) {
                CDEBUG(D_ERROR, "Failed to allocate small rx descriptors\n");
-               gmnal_free_stxd(nal_data);
+               gmnal_free_txd(nal_data);
                GMNAL_GM_LOCK(nal_data);
                gm_close(nal_data->gm_port);
                gm_finalize();
@@ -336,7 +336,7 @@ gmnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
        /*
         *      Allocate pools of small tx buffers and descriptors
         */
-       if (gmnal_alloc_stxd(nal_data) != GMNAL_STATUS_OK) {
+       if (gmnal_alloc_txd(nal_data) != GMNAL_STATUS_OK) {
                CDEBUG(D_ERROR, "Failed to allocate small tx descriptors\n");
                GMNAL_GM_LOCK(nal_data);
                gm_close(nal_data->gm_port);
@@ -369,7 +369,7 @@ gmnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
                gmnal_stop_rxthread(nal_data);
                gmnal_stop_ctthread(nal_data);
                CDEBUG(D_ERROR, "can't determine node id\n");
-               gmnal_free_stxd(nal_data);
+               gmnal_free_txd(nal_data);
                gmnal_free_srxd(nal_data);
                GMNAL_GM_LOCK(nal_data);
                gm_close(nal_data->gm_port);
@@ -390,7 +390,7 @@ gmnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
                CDEBUG(D_ERROR, "failed to obtain global id\n");
                gmnal_stop_rxthread(nal_data);
                gmnal_stop_ctthread(nal_data);
-               gmnal_free_stxd(nal_data);
+               gmnal_free_txd(nal_data);
                gmnal_free_srxd(nal_data);
                GMNAL_GM_LOCK(nal_data);
                gm_close(nal_data->gm_port);
@@ -417,7 +417,7 @@ gmnal_init(int interface, ptl_pt_index_t ptl_size, ptl_ac_index_t ac_size,
                CDEBUG(D_ERROR, "lib_init failed\n");
                gmnal_stop_rxthread(nal_data);
                gmnal_stop_ctthread(nal_data);
-               gmnal_free_stxd(nal_data);
+               gmnal_free_txd(nal_data);
                gmnal_free_srxd(nal_data);
                GMNAL_GM_LOCK(nal_data);
                gm_close(nal_data->gm_port);
@@ -453,7 +453,7 @@ void gmnal_fini()
 
        gmnal_stop_rxthread(nal_data);
        gmnal_stop_ctthread(nal_data);
-       gmnal_free_stxd(nal_data);
+       gmnal_free_txd(nal_data);
        gmnal_free_srxd(nal_data);
        GMNAL_GM_LOCK(nal_data);
        gm_close(nal_data->gm_port);
index 57e85d7..05934f3 100644 (file)
@@ -950,7 +950,7 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
        unsigned long   sbuf_long;
        gm_remote_ptr_t remote_ptr = 0;
        unsigned int    source_node;
-       gmnal_stxd_t    *stxd = NULL;
+       gmnal_ltxd_t    *ltxd = NULL;
        gmnal_data_t    *nal_data = srxd->nal_data;
 
        CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
@@ -989,8 +989,8 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                        ncalls++;
                        if (do_copy) {
                                CDEBUG(D_INFO, "slen>rlen\n");
-                               stxd = gmnal_get_stxd(nal_data, 1);
-                               stxd->srxd = srxd;
+                               ltxd = gmnal_get_ltxd(nal_data);
+                               ltxd->srxd = srxd;
                                GMNAL_GM_LOCK(nal_data);
                                /* 
                                 *      funny business to get rid 
@@ -1001,7 +1001,7 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                                gm_get(nal_data->gm_port, remote_ptr, rbuf, 
                                       rlen, GM_LOW_PRIORITY, source_node, 
                                       GMNAL_GM_PORT, 
-                                      gmnal_remote_get_callback, stxd);
+                                      gmnal_remote_get_callback, ltxd);
                                GMNAL_GM_UNLOCK(nal_data);
                        }
                        /*
@@ -1017,15 +1017,15 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                        ncalls++;
                        if (do_copy) {
                                CDEBUG(D_INFO, "slen<rlen\n");
-                               stxd = gmnal_get_stxd(nal_data, 1);
-                               stxd->srxd = srxd;
+                               ltxd = gmnal_get_ltxd(nal_data);
+                               ltxd->srxd = srxd;
                                GMNAL_GM_LOCK(nal_data);
                                sbuf_long = (unsigned long) sbuf;
                                remote_ptr = (gm_remote_ptr_t)sbuf_long;
                                gm_get(nal_data->gm_port, remote_ptr, rbuf, 
                                       slen, GM_LOW_PRIORITY, source_node, 
                                       GMNAL_GM_PORT, 
-                                      gmnal_remote_get_callback, stxd);
+                                      gmnal_remote_get_callback, ltxd);
                                GMNAL_GM_UNLOCK(nal_data);
                        }
                        /*
@@ -1040,15 +1040,15 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                        ncalls++;
                        if (do_copy) {
                                CDEBUG(D_INFO, "rlen=slen\n");
-                               stxd = gmnal_get_stxd(nal_data, 1);
-                               stxd->srxd = srxd;
+                               ltxd = gmnal_get_ltxd(nal_data);
+                               ltxd->srxd = srxd;
                                GMNAL_GM_LOCK(nal_data);
                                sbuf_long = (unsigned long) sbuf;
                                remote_ptr = (gm_remote_ptr_t)sbuf_long;
                                gm_get(nal_data->gm_port, remote_ptr, rbuf, 
                                       rlen, GM_LOW_PRIORITY, source_node, 
                                       GMNAL_GM_PORT, 
-                                      gmnal_remote_get_callback, stxd);
+                                      gmnal_remote_get_callback, ltxd);
                                GMNAL_GM_UNLOCK(nal_data);
                        }
                        /*
@@ -1078,8 +1078,8 @@ gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
                           gm_status_t status)
 {
 
-       gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-       gmnal_srxd_t    *srxd = stxd->srxd;
+       gmnal_ltxd_t    *ltxd = (gmnal_ltxd_t*)context;
+       gmnal_srxd_t    *srxd = ltxd->srxd;
        nal_cb_t        *nal_cb = srxd->nal_data->nal_cb;
        int             lastone;
        struct  iovec   *riov;
@@ -1103,7 +1103,7 @@ gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
        /*
         *      everyone returns a send token
         */
-       gmnal_return_stxd(nal_data, stxd);
+       gmnal_return_ltxd(nal_data, ltxd);
 
        if (!lastone) {
                CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
index d40a943..eebf92a 100644 (file)
@@ -40,35 +40,53 @@ gmnal_is_rxthread(gmnal_data_t *nal_data)
 
 
 /*
+ *     Allocate tx descriptors/tokens (large and small)
  *     allocate a number of small tx buffers and register with GM
  *     so they are wired and set up for DMA. This is a costly operation.
  *     Also allocate a corrosponding descriptor to keep track of 
  *     the buffer.
- *     Put all descriptors on singly linked list to be available to send 
+ *     Put all small descriptors on singly linked list to be available to send 
  *     function.
+ *     Allocate the rest of the available tx tokens for large messages. These will be
+ *     used to do gm_gets in gmnal_copyiov     
  */
 int
-gmnal_alloc_stxd(gmnal_data_t *nal_data)
+gmnal_alloc_txd(gmnal_data_t *nal_data)
 {
-       int ntx = 0, nstx = 0, i = 0, nrxt_stx = 10;
+       int ntx= 0, nstx= 0, nrxt_stx= 0,
+           nltx= 0, i = 0;
        gmnal_stxd_t    *txd = NULL;
+       gmnal_ltxd_t    *ltxd = NULL;
        void    *txbuffer = NULL;
 
        CDEBUG(D_TRACE, "gmnal_alloc_small tx\n");
 
        GMNAL_GM_LOCK(nal_data);
+       /*
+        *      total number of transmit tokens
+        */
        ntx = gm_num_send_tokens(nal_data->gm_port);
        GMNAL_GM_UNLOCK(nal_data);
        CDEBUG(D_INFO, "total number of send tokens available is [%d]\n", ntx);
        
-       nstx = ntx/2;
        /*
-        * num_stxds from gmnal_module.c
+        *      allocate a number for small sends
+        *      num_stxds from gmnal_module.c
         */
        nstx = num_stxds;
+       /*
+        *      give that number plus 1 to the receive threads
+        */
         nrxt_stx = nstx + 1;
 
-       CDEBUG(D_INFO, "Allocated [%d] send tokens to small messages\n", nstx);
+       /*
+        *      give the rest for gm_gets
+        */
+       nltx = ntx - (nrxt_stx + nstx);
+       if (nltx < 1) {
+               CDEBUG(D_ERROR, "No tokens available for large messages\n");
+               return(GMNAL_STATUS_FAIL);
+       }
 
 
        /*
@@ -85,6 +103,8 @@ gmnal_alloc_stxd(gmnal_data_t *nal_data)
        GMNAL_TXD_LOCK_INIT(nal_data);
        GMNAL_RXT_TXD_TOKEN_INIT(nal_data, nrxt_stx);
        GMNAL_RXT_TXD_LOCK_INIT(nal_data);
+       GMNAL_LTXD_TOKEN_INIT(nal_data, nltx);
+       GMNAL_LTXD_LOCK_INIT(nal_data);
        
        for (i=0; i<=nstx; i++) {
                PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
@@ -144,6 +164,14 @@ gmnal_alloc_stxd(gmnal_data_t *nal_data)
                       size [%d]\n", txd, txd->buffer, txd->buffer_size);
        }
 
+       /*
+        *      string together large tokens
+        */
+       for (i=0; i<=nltx ; i++) {
+               PORTAL_ALLOC(ltxd, sizeof(gmnal_ltxd_t));
+               ltxd->next = nal_data->ltxd;
+               nal_data->ltxd = ltxd;
+       }
        return(GMNAL_STATUS_OK);
 }
 
@@ -151,9 +179,10 @@ gmnal_alloc_stxd(gmnal_data_t *nal_data)
  *     the tx descriptors that go along with them.
  */
 void
-gmnal_free_stxd(gmnal_data_t *nal_data)
+gmnal_free_txd(gmnal_data_t *nal_data)
 {
        gmnal_stxd_t *txd = nal_data->stxd, *_txd = NULL;
+       gmnal_ltxd_t *ltxd = NULL, *_ltxd = NULL;
 
        CDEBUG(D_TRACE, "gmnal_free_small tx\n");
 
@@ -178,6 +207,13 @@ gmnal_free_stxd(gmnal_data_t *nal_data)
                GMNAL_GM_UNLOCK(nal_data);
                PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
        }
+       ltxd = nal_data->ltxd;
+       while(txd) {
+               _ltxd = ltxd;
+               ltxd = ltxd->next;
+               PORTAL_FREE(_ltxd, sizeof(gmnal_ltxd_t));
+       }
+       
        return;
 }
 
@@ -203,8 +239,7 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block)
                GMNAL_RXT_TXD_GETTOKEN(nal_data);
                GMNAL_RXT_TXD_LOCK(nal_data);
                txd = nal_data->rxt_stxd;
-               if (txd)
-                       nal_data->rxt_stxd = txd->next;
+               nal_data->rxt_stxd = txd->next;
                GMNAL_RXT_TXD_UNLOCK(nal_data);
                CDEBUG(D_INFO, "RXTHREAD got [%p], head is [%p]\n", 
                       txd, nal_data->rxt_stxd);
@@ -223,8 +258,7 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block)
                }
                GMNAL_TXD_LOCK(nal_data);
                txd = nal_data->stxd;
-               if (txd)
-                       nal_data->stxd = txd->next;
+               nal_data->stxd = txd->next;
                GMNAL_TXD_UNLOCK(nal_data);
                CDEBUG(D_INFO, "got [%p], head is [%p]\n", txd, 
                       nal_data->stxd);
@@ -266,6 +300,43 @@ gmnal_return_stxd(gmnal_data_t *nal_data, gmnal_stxd_t *txd)
 
 
 /*
+ *     Get a large transmit descriptor from the free list
+ *     This implicitly gets us a transmit  token .
+ *     always wait for one.
+ */
+gmnal_ltxd_t *
+gmnal_get_ltxd(gmnal_data_t *nal_data)
+{
+
+       gmnal_ltxd_t    *ltxd = NULL;
+
+       CDEBUG(D_TRACE, "nal_data [%p]\n", nal_data);
+
+       GMNAL_LTXD_GETTOKEN(nal_data);
+       GMNAL_LTXD_LOCK(nal_data);
+       ltxd = nal_data->ltxd;
+       nal_data->ltxd = ltxd->next;
+       GMNAL_LTXD_UNLOCK(nal_data);
+       CDEBUG(D_INFO, "got [%p], head is [%p]\n", ltxd, nal_data->ltxd);
+       return(ltxd);
+}
+
+/*
+ *     Return an ltxd to the list
+ */
+void
+gmnal_return_ltxd(gmnal_data_t *nal_data, gmnal_ltxd_t *ltxd)
+{
+       CDEBUG(D_TRACE, "nal_data [%p], ltxd[%p]\n", nal_data, ltxd);
+
+       GMNAL_LTXD_LOCK(nal_data);
+       ltxd->next = nal_data->ltxd;
+       nal_data->ltxd = ltxd;
+       GMNAL_LTXD_UNLOCK(nal_data);
+       GMNAL_LTXD_RETURNTOKEN(nal_data);
+       return;
+}
+/*
  *     allocate a number of small rx buffers and register with GM
  *     so they are wired and set up for DMA. This is a costly operation.
  *     Also allocate a corrosponding descriptor to keep track of 
@@ -1006,10 +1077,10 @@ EXPORT_SYMBOL(gmnal_alloc_srxd);
 EXPORT_SYMBOL(gmnal_get_srxd);
 EXPORT_SYMBOL(gmnal_return_srxd);
 EXPORT_SYMBOL(gmnal_free_srxd);
-EXPORT_SYMBOL(gmnal_alloc_stxd);
+EXPORT_SYMBOL(gmnal_alloc_txd);
 EXPORT_SYMBOL(gmnal_get_stxd);
 EXPORT_SYMBOL(gmnal_return_stxd);
-EXPORT_SYMBOL(gmnal_free_stxd);
+EXPORT_SYMBOL(gmnal_free_txd);
 EXPORT_SYMBOL(gmnal_rxbuffer_to_srxd);
 EXPORT_SYMBOL(gmnal_rxevent);
 EXPORT_SYMBOL(gmnal_gm_error);