Whamcloud - gitweb
Landing b_hd_newconfig on HEAD
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_utils.c
index 00bedf5..9810731 100644 (file)
  *   along with Lustre; if not, write to the Free Software
  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
-/*
- *     All utilities required by lgmanl
- */
 
-#include "gmnal.h"
+#include "gmlnd.h"
+
+void
+gmnal_free_netbuf_pages (gmnal_netbuf_t *nb, int npages) 
+{
+        int     i;
+        
+        for (i = 0; i < npages; i++)
+                __free_page(nb->nb_kiov[i].kiov_page);
+}
 
-/*
- *     Am I one of the gmnal rxthreads ?
- */
 int
-gmnal_is_rxthread(gmnal_ni_t *gmnalni)
+gmnal_alloc_netbuf_pages (gmnal_ni_t *gmni, gmnal_netbuf_t *nb, int npages)
 {
-       int i;
+        int          i;
+        gm_status_t  gmrc;
 
-       for (i = 0; i < gmnalni->gmni_nrxthreads; i++)
-               if (gmnalni->gmni_rxthread_pid[i] == current->pid)
-                       return 1;
-       return 0;
+        LASSERT (npages > 0);
+
+        for (i = 0; i < npages; i++) {
+                
+                nb->nb_kiov[i].kiov_page = alloc_page(GFP_KERNEL);
+                nb->nb_kiov[i].kiov_offset = 0;
+                nb->nb_kiov[i].kiov_len = PAGE_SIZE;
+
+                if (nb->nb_kiov[i].kiov_page == NULL) {
+                        CERROR("Can't allocate page\n");
+                        gmnal_free_netbuf_pages(nb, i);
+                        return -ENOMEM;
+                }
+
+                CDEBUG(D_NET,"[%3d] page %p, phys "LPX64", @ "LPX64"\n",
+                       i, nb->nb_kiov[i].kiov_page, 
+                       lnet_page2phys(nb->nb_kiov[i].kiov_page),
+                       gmni->gmni_netaddr_base);
+
+                gmrc = gm_register_memory_ex_phys(
+                        gmni->gmni_port,
+                        lnet_page2phys(nb->nb_kiov[i].kiov_page),
+                        PAGE_SIZE,
+                        gmni->gmni_netaddr_base);
+                CDEBUG(D_NET,"[%3d] page %p: %d\n", 
+                       i, nb->nb_kiov[i].kiov_page, gmrc);
+
+                if (gmrc != GM_SUCCESS) {
+                        CERROR("Can't map page: %d(%s)\n", gmrc,
+                               gmnal_gmstatus2str(gmrc));
+                        gmnal_free_netbuf_pages(nb, i+1);
+                        return -ENOMEM;
+                }
+                
+                if (i == 0) 
+                        nb->nb_netaddr = gmni->gmni_netaddr_base;
+                
+                gmni->gmni_netaddr_base += PAGE_SIZE;
+        }
+        
+        return 0;
 }
 
-gmnal_tx_t *
-gmnal_alloc_tx (gmnal_ni_t *gmnalni) 
+void
+gmnal_free_ltxbuf (gmnal_ni_t *gmni, gmnal_txbuf_t *txb)
 {
-        gmnal_tx_t  *tx;
-        void        *buffer;
+        int            npages = gmni->gmni_large_pages;
+
+        LASSERT (gmni->gmni_port == NULL);
+        /* No unmapping; the port has been closed */
+
+        gmnal_free_netbuf_pages(&txb->txb_buf, gmni->gmni_large_pages);
+        LIBCFS_FREE(txb, offsetof(gmnal_txbuf_t, txb_buf.nb_kiov[npages]));
+}
+
+int
+gmnal_alloc_ltxbuf (gmnal_ni_t *gmni)
+{
+        int            npages = gmni->gmni_large_pages;
+        int            sz = offsetof(gmnal_txbuf_t, txb_buf.nb_kiov[npages]);
+        gmnal_txbuf_t *txb;
+        int            rc;
         
-        PORTAL_ALLOC(tx, sizeof(*tx));
-        if (tx == NULL) {
-                CERROR ("Failed to allocate tx\n");
-                return NULL;
+        LIBCFS_ALLOC(txb, sz);
+        if (txb == NULL) {
+                CERROR("Can't allocate large txbuffer\n");
+                return -ENOMEM;
         }
-        
-        buffer = gm_dma_malloc(gmnalni->gmni_port, gmnalni->gmni_msg_size);
-        if (buffer == NULL) {
-                CERROR("Failed to gm_dma_malloc tx buffer size [%d]\n", 
-                       gmnalni->gmni_msg_size);
-                PORTAL_FREE(tx, sizeof(*tx));
-                return NULL;
+
+        rc = gmnal_alloc_netbuf_pages(gmni, &txb->txb_buf, npages);
+        if (rc != 0) {
+                LIBCFS_FREE(txb, sz);
+                return rc;
         }
 
-        memset(tx, 0, sizeof(*tx));
-        tx->tx_msg = (gmnal_msg_t *)buffer;
-        tx->tx_buffer_size = gmnalni->gmni_msg_size;
-        tx->tx_gm_size = gm_min_size_for_length(tx->tx_buffer_size);
-        tx->tx_gmni = gmnalni;
+        list_add_tail(&txb->txb_list, &gmni->gmni_idle_ltxbs);
 
-        CDEBUG(D_NET, "Created tx [%p] with buffer [%p], size [%d]\n", 
-               tx, tx->tx_msg, tx->tx_buffer_size);
+        txb->txb_next = gmni->gmni_ltxbs;
+        gmni->gmni_ltxbs = txb;
 
-        return tx;
+        return 0;
 }
 
 void
 gmnal_free_tx (gmnal_tx_t *tx)
 {
-        gmnal_ni_t *gmnalni = tx->tx_gmni;
-        
-        CDEBUG(D_NET, "Freeing tx [%p] with buffer [%p], size [%d]\n", 
-               tx, tx->tx_msg, tx->tx_buffer_size);
-#if 0
-        /* We free buffers after we've closed the GM port */
-        gm_dma_free(gmnalni->gmni_port, tx->tx_msg);
-#endif
-        PORTAL_FREE(tx, sizeof(*tx));
+        LASSERT (tx->tx_gmni->gmni_port == NULL);
+
+        gmnal_free_netbuf_pages(&tx->tx_buf, 1);
+        LIBCFS_FREE(tx, sizeof(*tx));
 }
 
 int
-gmnal_alloc_txs(gmnal_ni_t *gmnalni)
+gmnal_alloc_tx (gmnal_ni_t *gmni) 
 {
-       int           ntxcred = gm_num_send_tokens(gmnalni->gmni_port);
-       int           ntx;
-        int           nrxt_tx;
-        int           i;
-       gmnal_tx_t   *tx;
-
-        CWARN("ntxcred: %d\n", ntxcred);
-
-       ntx = num_txds;
-        nrxt_tx = num_txds + 1;
-
-        if (ntx + nrxt_tx > ntxcred) {
-                CERROR ("Asked for %d + %d tx credits, but only %d available\n",
-                        ntx, nrxt_tx, ntxcred);
+        gmnal_tx_t  *tx;
+        int          rc;
+        
+        LIBCFS_ALLOC(tx, sizeof(*tx));
+        if (tx == NULL) {
+                CERROR("Failed to allocate tx\n");
                 return -ENOMEM;
         }
         
-       /* A semaphore is initialised with the number of transmit tokens
-        * available.  To get a stxd, acquire the token semaphore.  this
-        * decrements the available token count (if no tokens you block here,
-        * someone returning a stxd will release the semaphore and wake you)
-        * When token is obtained acquire the spinlock to manipulate the
-        * list */
-       sema_init(&gmnalni->gmni_tx_token, ntx);
-       spin_lock_init(&gmnalni->gmni_tx_lock);
-        LASSERT (gmnalni->gmni_tx == NULL);
-
-       for (i = 0; i <= ntx; i++) {
-                tx = gmnal_alloc_tx(gmnalni);
-               if (tx == NULL) {
-                        CERROR("Failed to create tx %d\n", i);
-                        return -ENOMEM;
-                }
-                
-                tx->tx_rxt = 0;
-               tx->tx_next = gmnalni->gmni_tx;
-               gmnalni->gmni_tx = tx;
-       }
-
-       sema_init(&gmnalni->gmni_rxt_tx_token, nrxt_tx);
-       spin_lock_init(&gmnalni->gmni_rxt_tx_lock);
-        LASSERT (gmnalni->gmni_rxt_tx == NULL);
+        memset(tx, 0, sizeof(*tx));
 
-       for (i = 0; i <= nrxt_tx; i++) {
-                tx = gmnal_alloc_tx(gmnalni);
-               if (tx == NULL) {
-                        CERROR("Failed to create tx %d + %d\n", ntx, i);
-                        return -ENOMEM;
-                }
+        rc = gmnal_alloc_netbuf_pages(gmni, &tx->tx_buf, 1);
+        if (rc != 0) {
+                LIBCFS_FREE(tx, sizeof(*tx));
+                return -ENOMEM;
+        }
 
-                tx->tx_rxt = 1;
-               tx->tx_next = gmnalni->gmni_rxt_tx;
-               gmnalni->gmni_rxt_tx = tx;
-       }
+        tx->tx_gmni = gmni;
+        
+        list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);
 
-       return 0;
+        tx->tx_next = gmni->gmni_txs;
+        gmni->gmni_txs = tx;
+                
+        return 0;
 }
 
 void
-gmnal_free_txs(gmnal_ni_t *gmnalni)
+gmnal_free_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
 {
-       gmnal_tx_t *tx;
-
-        while ((tx = gmnalni->gmni_tx) != NULL) {
-                gmnalni->gmni_tx = tx->tx_next;
-                gmnal_free_tx (tx);
-       }
+        int   npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
+        
+        LASSERT (gmni->gmni_port == NULL);
 
-        while ((tx = gmnalni->gmni_rxt_tx) != NULL) {
-                gmnalni->gmni_rxt_tx = tx->tx_next;
-                gmnal_free_tx (tx);
-       }
+        gmnal_free_netbuf_pages(&rx->rx_buf, npages);
+        LIBCFS_FREE(rx, offsetof(gmnal_rx_t, rx_buf.nb_kiov[npages]));
 }
 
-
-/*
- *     Get a tx from the list
- *     This get us a wired and gm_registered small tx buffer.
- *     This implicitly gets us a send token also.
- */
-gmnal_tx_t *
-gmnal_get_tx(gmnal_ni_t *gmnalni, int block)
+int
+gmnal_alloc_rx (gmnal_ni_t *gmni, int islarge)
 {
+        int         npages = islarge ? gmni->gmni_large_pages : 1;
+        int         sz = offsetof(gmnal_rx_t, rx_buf.nb_kiov[npages]);
+        int         rc;
+        gmnal_rx_t *rx;
+        gm_status_t gmrc;
+        
+        LIBCFS_ALLOC(rx, sz);
+        if (rx == NULL) {
+                CERROR("Failed to allocate rx\n");
+                return -ENOMEM;
+        }
+        
+        memset(rx, 0, sizeof(*rx));
 
-       gmnal_tx_t      *tx = NULL;
-       pid_t           pid = current->pid;
-
-
-       CDEBUG(D_TRACE, "gmnal_get_tx gmnalni [%p] block[%d] pid [%d]\n", 
-              gmnalni, block, pid);
-
-       if (gmnal_is_rxthread(gmnalni)) {
-                CDEBUG(D_NET, "RXTHREAD Attempting to get token\n");
-               down(&gmnalni->gmni_rxt_tx_token);
-               spin_lock(&gmnalni->gmni_rxt_tx_lock);
-               tx = gmnalni->gmni_rxt_tx;
-               gmnalni->gmni_rxt_tx = tx->tx_next;
-               spin_unlock(&gmnalni->gmni_rxt_tx_lock);
-               CDEBUG(D_NET, "RXTHREAD got [%p], head is [%p]\n", 
-                      tx, gmnalni->gmni_rxt_tx);
-                tx->tx_rxt = 1;
-        } else {
-               if (block) {
-                        CDEBUG(D_NET, "Attempting to get token\n");
-                       down(&gmnalni->gmni_tx_token);
-                        CDEBUG(D_PORTALS, "Got token\n");
-               } else {
-                       if (down_trylock(&gmnalni->gmni_tx_token)) {
-                               CERROR("can't get token\n");
-                               return(NULL);
-                       }
-               }
-               spin_lock(&gmnalni->gmni_tx_lock);
-               tx = gmnalni->gmni_tx;
-               gmnalni->gmni_tx = tx->tx_next;
-               spin_unlock(&gmnalni->gmni_tx_lock);
-               CDEBUG(D_NET, "got [%p], head is [%p]\n", tx,
-                      gmnalni->gmni_tx);
-        }       /* general tx get */
-
-       return tx;
+        rc = gmnal_alloc_netbuf_pages(gmni, &rx->rx_buf, npages);
+        if (rc != 0) {
+                LIBCFS_FREE(rx, sz);
+                return rc;
+        }
+        
+        rx->rx_islarge = islarge;
+        rx->rx_next = gmni->gmni_rxs;
+        gmni->gmni_rxs = rx;
+
+        gmrc = gm_hash_insert(gmni->gmni_rx_hash, 
+                              GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf), rx);
+        if (gmrc != GM_SUCCESS) {
+                CERROR("Couldn't add rx to hash table: %d\n", gmrc);
+                return -ENOMEM;
+        }
+        
+        return 0;
 }
 
-/*
- *     Return a tx to the list
- */
 void
-gmnal_return_tx(gmnal_ni_t *gmnalni, gmnal_tx_t *tx)
+gmnal_free_ltxbufs (gmnal_ni_t *gmni)
 {
-       CDEBUG(D_TRACE, "gmnalni [%p], tx[%p] rxt[%d]\n", gmnalni,
-              tx, tx->tx_rxt);
-
-        /*
-         *      this transmit descriptor is 
-         *      for the rxthread
-         */
-        if (tx->tx_rxt) {
-               spin_lock(&gmnalni->gmni_rxt_tx_lock);
-               tx->tx_next = gmnalni->gmni_rxt_tx;
-               gmnalni->gmni_rxt_tx = tx;
-               spin_unlock(&gmnalni->gmni_rxt_tx_lock);
-               up(&gmnalni->gmni_rxt_tx_token);
-                CDEBUG(D_NET, "Returned tx to rxthread list\n");
-        } else {
-               spin_lock(&gmnalni->gmni_tx_lock);
-               tx->tx_next = gmnalni->gmni_tx;
-               gmnalni->gmni_tx = tx;
-               spin_unlock(&gmnalni->gmni_tx_lock);
-               up(&gmnalni->gmni_tx_token);
-                CDEBUG(D_NET, "Returned tx to general list\n");
+        gmnal_txbuf_t *txb;
+        
+        while ((txb = gmni->gmni_ltxbs) != NULL) {
+                gmni->gmni_ltxbs = txb->txb_next;
+                gmnal_free_ltxbuf(gmni, txb);
         }
-       return;
 }
 
-
-/*
- *     allocate a number of small rx buffers and register with GM
- *     so they are wired and set up for DMA. This is a costly operation.
- *     Also allocate a corrosponding descriptor to keep track of 
- *     the buffer.
- *     Put all descriptors on singly linked list to be available to 
- *     receive thread.
- */
 int
-gmnal_alloc_rxs (gmnal_ni_t *gmnalni)
+gmnal_alloc_ltxbufs (gmnal_ni_t *gmni)
 {
-        int          nrxcred = gm_num_receive_tokens(gmnalni->gmni_port);
-        int          nrx;
-        int          i;
-       gmnal_rx_t  *rxd;
-       void        *rxbuffer;
-
-        CWARN("nrxcred: %d\n", nrxcred);
+        int     nlarge_tx_bufs = *gmnal_tunables.gm_nlarge_tx_bufs;
+        int     i;
+        int     rc;
 
-       nrx = num_txds*2 + 2;
-        if (nrx > nrxcred) {
-                CERROR("Can't allocate %d rx credits: (%d available)\n",
-                       nrx, nrxcred);
-                return -ENOMEM;
+        for (i = 0; i < nlarge_tx_bufs; i++) {
+                rc = gmnal_alloc_ltxbuf(gmni);
+                
+                if (rc != 0)
+                        return rc;
         }
 
-       CDEBUG(D_NET, "Allocated [%d] receive tokens to small messages\n", nrx);
+        return 0;
+}
 
-       gmnalni->gmni_rx_hash = gm_create_hash(gm_hash_compare_ptrs, 
-                                               gm_hash_hash_ptr, 0, 0, nrx, 0);
-       if (gmnalni->gmni_rx_hash == NULL) {
-                CERROR("Failed to create hash table\n");
-                return -ENOMEM;
-       }
+void
+gmnal_free_txs(gmnal_ni_t *gmni)
+{
+       gmnal_tx_t *tx;
 
-        LASSERT (gmnalni->gmni_rx == NULL);
-
-       for (i=0; i <= nrx; i++) {
-
-               PORTAL_ALLOC(rxd, sizeof(*rxd));
-               if (rxd == NULL) {
-                       CERROR("Failed to malloc rxd [%d]\n", i);
-                       return -ENOMEM;
-               }
-
-               rxbuffer = gm_dma_malloc(gmnalni->gmni_port, 
-                                        gmnalni->gmni_msg_size);
-               if (rxbuffer == NULL) {
-                       CERROR("Failed to gm_dma_malloc rxbuffer [%d], "
-                              "size [%d]\n",i ,gmnalni->gmni_msg_size);
-                       PORTAL_FREE(rxd, sizeof(*rxd));
-                       return -ENOMEM;
-               }
-
-               rxd->rx_msg = (gmnal_msg_t *)rxbuffer;
-               rxd->rx_size = gmnalni->gmni_msg_size;
-               rxd->rx_gmsize = gm_min_size_for_length(rxd->rx_size);
-
-               rxd->rx_next = gmnalni->gmni_rx;
-               gmnalni->gmni_rx = rxd;
-
-               if (gm_hash_insert(gmnalni->gmni_rx_hash,
-                                  (void*)rxbuffer, (void*)rxd)) {
-                       CERROR("failed to create hash entry rxd[%p] "
-                              "for rxbuffer[%p]\n", rxd, rxbuffer);
-                       return -ENOMEM;
-               }
-
-               CDEBUG(D_NET, "Registered rxd [%p] with buffer [%p], "
-                      "size [%d]\n", rxd, rxd->rx_msg, rxd->rx_size);
+        while ((tx = gmni->gmni_txs) != NULL) {
+                gmni->gmni_txs = tx->tx_next;
+                gmnal_free_tx (tx);
        }
-
-       return 0;
 }
 
-void
-gmnal_free_rxs(gmnal_ni_t *gmnalni)
+int
+gmnal_alloc_txs(gmnal_ni_t *gmni)
 {
-       gmnal_rx_t *rx;
-
-       CDEBUG(D_TRACE, "gmnal_free_small rx\n");
+        int           ntxcred = gm_num_send_tokens(gmni->gmni_port);
+        int           ntx = *gmnal_tunables.gm_ntx;
+        int           i;
+        int           rc;
 
-       while ((rx = gmnalni->gmni_rx) != NULL) {
-                gmnalni->gmni_rx = rx->rx_next;
+        CDEBUG(D_NET, "ntxcred: %d\n", ntxcred);
+        gmni->gmni_tx_credits = ntxcred;
 
-               CDEBUG(D_NET, "Freeing rxd [%p] buffer [%p], size [%d]\n",
-                      rx, rx->rx_msg, rx->rx_size);
-#if 0
-                /* We free buffers after we've shutdown the GM port */
-               gm_dma_free(gmnalni->gmni_port, _rxd->rx_msg);
-#endif
-               PORTAL_FREE(rx, sizeof(*rx));
-       }
+        for (i = 0; i < ntx; i++) {
+                rc = gmnal_alloc_tx(gmni);
+                if (rc != 0)
+                        return rc;
+        }
 
-#if 0
-        /* see above */
-        if (gmnalni->gmni_rx_hash != NULL)
-                gm_destroy_hash(gmnalni->gmni_rx_hash);
-#endif
+        return 0;
 }
 
 void
-gmnal_stop_threads(gmnal_ni_t *gmnalni)
+gmnal_free_rxs(gmnal_ni_t *gmni)
 {
-        int count = 2;
-        int i;
+       gmnal_rx_t *rx;
 
-        gmnalni->gmni_thread_shutdown = 1;
+       while ((rx = gmni->gmni_rxs) != NULL) {
+                gmni->gmni_rxs = rx->rx_next;
 
-        /* wake ctthread with an alarm */
-       spin_lock(&gmnalni->gmni_gm_lock);
-       gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, 
-                     0, NULL, NULL);
-       spin_unlock(&gmnalni->gmni_gm_lock);
+                gmnal_free_rx(gmni, rx);
+        }
 
-        /* wake each rxthread */
-        for (i = 0; i < num_online_cpus(); i++)
-                up(&gmnalni->gmni_rxq_wait);
-        
-       while (atomic_read(&gmnalni->gmni_nthreads) != 0) {
-                count++;
-                if ((count & (count - 1)) == 0)
-                        CWARN("Waiting for %d threads to stop\n",
-                              atomic_read(&gmnalni->gmni_nthreads));
-                gmnal_yield(1);
-       }
+        LASSERT (gmni->gmni_port == NULL);
+#if 0
+        /* GM releases all resources allocated to a port when it closes */
+        if (gmni->gmni_rx_hash != NULL)
+                gm_destroy_hash(gmni->gmni_rx_hash);
+#endif
 }
 
-/*
- *     Start the caretaker thread and a number of receiver threads
- *     The caretaker thread gets events from the gm library.
- *     It passes receive events to the receiver threads via a work list.
- *     It processes other events itself in gm_unknown. These will be
- *     callback events or sleeps.
- */
 int
-gmnal_start_threads(gmnal_ni_t *gmnalni)
+gmnal_alloc_rxs (gmnal_ni_t *gmni)
 {
-        int     i;
-        int     pid;
-
-        gmnalni->gmni_thread_shutdown = 0;
-        gmnalni->gmni_nrxthreads = 0;
-        atomic_set(&gmnalni->gmni_nthreads, 0);
-
-        INIT_LIST_HEAD(&gmnalni->gmni_rxq);
-       spin_lock_init(&gmnalni->gmni_rxq_lock);
-       sema_init(&gmnalni->gmni_rxq_wait, 0);
-
-       /*
-        *      the alarm is used to wake the caretaker thread from 
-        *      gm_unknown call (sleeping) to exit it.
-        */
-       CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n");
-       gm_initialize_alarm(&gmnalni->gmni_ctthread_alarm);
-
-        pid = kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0);
-       if (pid < 0) {
-               CERROR("Caretaker thread failed to start: %d\n", pid);
-               return pid;
-       }
-        atomic_inc(&gmnalni->gmni_nthreads);
-
-       for (i = 0; i < num_online_cpus(); i++) {
+        int          nrxcred = gm_num_receive_tokens(gmni->gmni_port);
+        int          nrx_small = *gmnal_tunables.gm_nrx_small;
+        int          nrx_large = *gmnal_tunables.gm_nrx_large;
+        int          nrx = nrx_large + nrx_small;
+        int          rc;
+        int          i;
 
-                pid = kernel_thread(gmnal_rx_thread, (void*)gmnalni, 0);
-                if (pid < 0) {
-                        CERROR("rx thread failed to start: %d\n", pid);
-                        gmnal_stop_threads(gmnalni);
-                        return pid;
-                }
+        CDEBUG(D_NET, "nrxcred: %d(%dL+%dS)\n", nrxcred, nrx_large, nrx_small);
 
-                atomic_inc(&gmnalni->gmni_nthreads);
-               gmnalni->gmni_rxthread_pid[i] = pid;
-                gmnalni->gmni_nrxthreads++;
+        if (nrx > nrxcred) {
+                int nlarge = (nrx_large * nrxcred)/nrx;
+                int nsmall = nrxcred - nlarge;
+                
+                CWARN("Only %d rx credits: "
+                      "reducing large %d->%d, small %d->%d\n", nrxcred,
+                      nrx_large, nlarge, nrx_small, nsmall);
+                
+                *gmnal_tunables.gm_nrx_large = nrx_large = nlarge;
+                *gmnal_tunables.gm_nrx_small = nrx_small = nsmall;
+                nrx = nlarge + nsmall;
+        }
+        
+       gmni->gmni_rx_hash = gm_create_hash(gm_hash_compare_ptrs, 
+                                            gm_hash_hash_ptr, 0, 0, nrx, 0);
+       if (gmni->gmni_rx_hash == NULL) {
+                CERROR("Failed to create hash table\n");
+                return -ENOMEM;
        }
 
+        for (i = 0; i < nrx; i++ ) {
+                rc = gmnal_alloc_rx(gmni, i < nrx_large);
+                if (rc != 0)
+                        return rc;
+        }
+
        return 0;
 }
 
@@ -674,62 +577,3 @@ gmnal_yield(int delay)
        set_current_state(TASK_INTERRUPTIBLE);
        schedule_timeout(delay);
 }
-
-int
-gmnal_enqueue_rx(gmnal_ni_t *gmnalni, gm_recv_t *recv)
-{
-        void       *ptr = gm_ntohp(recv->buffer);
-        gmnal_rx_t *rx = gm_hash_find(gmnalni->gmni_rx_hash, ptr);
-
-        /* No locking; hash is read-only */
-
-       LASSERT (rx != NULL);
-        LASSERT (rx->rx_msg == (gmnal_msg_t *)ptr);
-
-        rx->rx_recv_nob = gm_ntohl(recv->length);
-        rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
-        rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
-        rx->rx_recv_type = gm_ntoh_u8(recv->type);
-        
-       spin_lock(&gmnalni->gmni_rxq_lock);
-        list_add_tail (&rx->rx_list, &gmnalni->gmni_rxq);
-       spin_unlock(&gmnalni->gmni_rxq_lock);
-
-       up(&gmnalni->gmni_rxq_wait);
-       return 0;
-}
-
-gmnal_rx_t *
-gmnal_dequeue_rx(gmnal_ni_t *gmnalni)
-{
-       gmnal_rx_t      *rx;
-
-       CDEBUG(D_NET, "Getting entry to list\n");
-
-        for (;;) {
-               while(down_interruptible(&gmnalni->gmni_rxq_wait) != 0)
-                        /* do nothing */;
-
-               if (gmnalni->gmni_thread_shutdown)
-                       return NULL;
-
-               spin_lock(&gmnalni->gmni_rxq_lock);
-
-                if (list_empty(&gmnalni->gmni_rxq)) {
-                        rx = NULL;
-                } else {
-                        rx = list_entry(gmnalni->gmni_rxq.next,
-                                        gmnal_rx_t, rx_list);
-                        list_del(&rx->rx_list);
-                }
-
-               spin_unlock(&gmnalni->gmni_rxq_lock);
-
-                if (rx != NULL)
-                        return rx;
-                
-                CWARN("woken but no work\n");
-       }
-}
-
-