Whamcloud - gitweb
update patchless client for support 2.6.20, 2.6.21 and RHEL5 kernels.
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
index b5213a5..ea6a8d1 100644 (file)
  *     This file contains all gmnal send and receive functions
  */
 
-#include "gmnal.h"
+#include "gmlnd.h"
 
 void
-gmnal_pack_msg(gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
-               ptl_nid_t dstnid, int type)
+gmnal_notify_peer_down(gmnal_tx_t *tx)
 {
-        gmnal_msg_t *msg = tx->tx_msg;
+        struct timeval     now;
+        time_t             then;
 
+        do_gettimeofday (&now);
+        then = now.tv_sec - (jiffies - tx->tx_launchtime)/HZ;
+
+        lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
+}
+
+void
+gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
+               lnet_nid_t dstnid, int type)
+{
         /* CAVEAT EMPTOR! this only sets the common message fields. */
         msg->gmm_magic    = GMNAL_MSG_MAGIC;
         msg->gmm_version  = GMNAL_MSG_VERSION;
         msg->gmm_type     = type;
-        msg->gmm_srcnid   = gmnalni->gmni_libnal->libnal_ni.ni_pid.nid;
+        msg->gmm_srcnid   = lnet_ptlcompat_srcnid(gmni->gmni_ni->ni_nid,
+                                                  dstnid);
         msg->gmm_dstnid   = dstnid;
 }
 
 int
-gmnal_unpack_msg(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
+gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
 {
-        gmnal_msg_t *msg = rx->rx_msg;
+        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
         const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);
+        int          buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
+                                                gmni->gmni_small_msgsize;
         int          flip;
 
+        /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */
+
+        /* GM may not overflow our buffer */
+        LASSERT (rx->rx_recv_nob <= buffnob);
+
         /* 6 bytes are enough to have received magic + version */
         if (rx->rx_recv_nob < 6) {
                 CERROR("Short message from gmid %u: %d\n", 
@@ -57,6 +75,9 @@ gmnal_unpack_msg(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
                 flip = 0;
         } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
                 flip = 1;
+        } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
+                   msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
+                return EPROTO;
         } else {
                 CERROR("Bad magic from gmid %u: %08x\n", 
                        rx->rx_recv_gmid, msg->gmm_magic);
@@ -65,9 +86,7 @@ gmnal_unpack_msg(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
 
         if (msg->gmm_version != 
             (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
-                CERROR("Bad version from gmid %u: %d\n", 
-                       rx->rx_recv_gmid, msg->gmm_version);
-                return -EPROTO;
+                return EPROTO;
         }
 
         if (rx->rx_recv_nob < hdr_size) {
@@ -84,15 +103,16 @@ gmnal_unpack_msg(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
                 __swab64s(&msg->gmm_dstnid);
         }
         
-        if (msg->gmm_srcnid == PTL_NID_ANY) {
-                CERROR("Bad src nid from %u: "LPX64"\n", 
-                       rx->rx_recv_gmid, msg->gmm_srcnid);
+        if (msg->gmm_srcnid == LNET_NID_ANY) {
+                CERROR("Bad src nid from %u: %s\n", 
+                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
                 return -EPROTO;
         }
 
-        if (msg->gmm_dstnid != gmnalni->gmni_libnal->libnal_ni.ni_pid.nid) {
-                CERROR("Bad dst nid from %u: "LPX64"\n",
-                       rx->rx_recv_gmid, msg->gmm_dstnid);
+        if (!lnet_ptlcompat_matchnid(gmni->gmni_ni->ni_nid, 
+                                     msg->gmm_dstnid)) {
+                CERROR("Bad dst nid from %u: %s\n",
+                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
                 return -EPROTO;
         }
         
@@ -114,235 +134,430 @@ gmnal_unpack_msg(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
         return 0;
 }
 
+gmnal_tx_t *
+gmnal_get_tx(gmnal_ni_t *gmni)
+{
+       gmnal_tx_t       *tx = NULL;
 
-/*
- *     The caretaker thread
- *     This is main thread of execution for the NAL side
- *     This guy waits in gm_blocking_recvive and gets
- *     woken up when the myrinet adaptor gets an interrupt.
- *     Hands off receive operations to the receive thread 
- *     This thread Looks after gm_callbacks etc inline.
- */
-int
-gmnal_ct_thread(void *arg)
+        spin_lock(&gmni->gmni_tx_lock);
+
+        if (gmni->gmni_shutdown ||
+            list_empty(&gmni->gmni_idle_txs)) {
+                spin_unlock(&gmni->gmni_tx_lock);
+                return NULL;
+        }
+        
+        tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
+        list_del(&tx->tx_list);
+
+        spin_unlock(&gmni->gmni_tx_lock);
+
+        LASSERT (tx->tx_lntmsg == NULL);
+        LASSERT (tx->tx_ltxb == NULL);
+        LASSERT (!tx->tx_credit);
+        
+        return tx;
+}
+
+void
+gmnal_tx_done(gmnal_tx_t *tx, int rc)
 {
-       gmnal_ni_t              *gmnalni = arg;
-       gm_recv_event_t         *rxevent = NULL;
-       gm_recv_t               *recv = NULL;
+       gmnal_ni_t *gmni = tx->tx_gmni;
+        int         wake_sched = 0;
+        lnet_msg_t *lnetmsg = tx->tx_lntmsg;
+        
+        tx->tx_lntmsg = NULL;
 
-       sprintf(current->comm, "gmnal_ct");
-       kportal_daemonize("gmnalctd");
+        spin_lock(&gmni->gmni_tx_lock);
+        
+        if (tx->tx_ltxb != NULL) {
+                wake_sched = 1;
+                list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs);
+                tx->tx_ltxb = NULL;
+        }
+        
+        if (tx->tx_credit) {
+                wake_sched = 1;
+                gmni->gmni_tx_credits++;
+                tx->tx_credit = 0;
+        }
+        
+        list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);
 
-       gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;
+        if (wake_sched)
+                gmnal_check_txqueues_locked(gmni);
 
-       while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
+        spin_unlock(&gmni->gmni_tx_lock);
 
-                spin_lock(&gmnalni->gmni_gm_lock);
-               rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
-                spin_unlock(&gmnalni->gmni_gm_lock);
+        /* Delay finalize until tx is free */
+        if (lnetmsg != NULL)
+                lnet_finalize(gmni->gmni_ni, lnetmsg, 0);
+}
 
-               if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
-                       CDEBUG(D_NET, "time to exit\n");
-                       break;
-               }
+void 
+gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
+                          gm_status_t status)
+{
+       gmnal_tx_t      *tx = (gmnal_tx_t*)context;
 
-               CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent2str(rxevent));
+        LASSERT(!in_interrupt());
+         
+        CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n", 
+               tx, status, gmnal_gmstatus2str(status),
+               libcfs_nid2str(tx->tx_nid));
 
-               if (GM_RECV_EVENT_TYPE(rxevent) == GM_RECV_EVENT) {
-                        recv = (gm_recv_t*)&rxevent->recv;
-                        gmnal_enqueue_rx(gmnalni, recv);
-                } else {
-                        gm_unknown(gmnalni->gmni_port, rxevent);
-               }
+        gmnal_tx_done(tx, -EIO);
+}
+
+void 
+gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
+{
+       gmnal_tx_t      *tx = (gmnal_tx_t*)context;
+       gmnal_ni_t      *gmni = tx->tx_gmni;
+
+        LASSERT(!in_interrupt());
+
+       switch(status) {
+        case GM_SUCCESS:
+                gmnal_tx_done(tx, 0);
+                return;
+
+        case GM_SEND_DROPPED:
+                CDEBUG(D_NETERROR, "Dropped tx %p to %s\n", 
+                       tx, libcfs_nid2str(tx->tx_nid));
+                /* Another tx failed and called gm_drop_sends() which made this
+                 * one complete immediately */
+                gmnal_tx_done(tx, -EIO);
+                return;
+                        
+        default:
+                /* Some error; NB don't complete tx yet; we need its credit for
+                 * gm_drop_sends() */
+                CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
+                       tx, status, gmnal_gmstatus2str(status), 
+                       libcfs_nid2str(tx->tx_nid));
+
+                gmnal_notify_peer_down(tx);
+
+                spin_lock(&gmni->gmni_gm_lock);
+                gm_drop_sends(gmni->gmni_port, 
+                              tx->tx_ltxb != NULL ?
+                              GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
+                              tx->tx_gmlid, *gmnal_tunables.gm_port, 
+                              gmnal_drop_sends_callback, tx);
+                spin_unlock(&gmni->gmni_gm_lock);
+               return;
        }
 
-       gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
-       CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
-       return 0;
+        /* not reached */
+        LBUG();
 }
 
-
-/*
- *     process a receive event
- */
-int 
-gmnal_rx_thread(void *arg)
+void
+gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
 {
-       gmnal_ni_t    *gmnalni = arg;
-        char           name[16];
-       gmnal_rx_t    *rx;
-       int            rank;
-
-       for (rank=0; rank<num_rx_threads; rank++)
-               if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
-                       break;
-
-       snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
-       kportal_daemonize(name);
-
-       /*
-        *      set 1 bit for each thread started
-        *      doesn't matter which bit
-        */
-       spin_lock(&gmnalni->gmni_rxthread_flag_lock);
-       if (gmnalni->gmni_rxthread_flag)
-               gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
-       else
-               gmnalni->gmni_rxthread_flag = 1;
-       spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-
-       while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
-               CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
-
-               rx = gmnal_dequeue_rx(gmnalni);
-               if (rx == NULL) {
-                       CDEBUG(D_NET, "Receive thread time to exit\n");
-                       break;
-               }
-                
-                /* We're connectionless: simply ignore packets on error */
+        gmnal_tx_t    *tx;
+        gmnal_txbuf_t *ltxb;
+        int            gmsize;
+        int            pri;
+        void          *netaddr;
+        
+        tx = list_empty(&gmni->gmni_buf_txq) ? NULL :
+             list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);
+
+        if (tx != NULL &&
+            (tx->tx_large_nob == 0 || 
+             !list_empty(&gmni->gmni_idle_ltxbs))) {
+
+                /* consume tx */
+                list_del(&tx->tx_list);
                 
-                if (gmnal_unpack_msg(gmnalni, rx) == 0) {
-                        
-                        LASSERT (rx->rx_msg->gmm_type == GMNAL_MSG_IMMEDIATE);
-                        (void)lib_parse(gmnalni->gmni_libnal, 
-                                        &rx->rx_msg->gmm_u.immediate.gmim_hdr,
-                                        rx);
+                LASSERT (tx->tx_ltxb == NULL);
+
+                if (tx->tx_large_nob != 0) {
+                        ltxb = list_entry(gmni->gmni_idle_ltxbs.next,
+                                          gmnal_txbuf_t, txb_list);
+
+                        /* consume large buffer */
+                        list_del(&ltxb->txb_list);
+
+                        spin_unlock(&gmni->gmni_tx_lock);
+
+                        /* Unlocking here allows sends to get re-ordered,
+                         * but we want to allow other CPUs to progress... */
+
+                        tx->tx_ltxb = ltxb;
+
+                        /* marshall message in tx_ltxb...
+                         * 1. Copy what was marshalled so far (in tx_buf) */
+                        memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
+                               GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);
+
+                        /* 2. Copy the payload */
+                        if (tx->tx_large_iskiov)
+                                lnet_copy_kiov2kiov(
+                                        gmni->gmni_large_pages,
+                                        ltxb->txb_buf.nb_kiov,
+                                        tx->tx_msgnob,
+                                        tx->tx_large_niov,
+                                        tx->tx_large_frags.kiov,
+                                        tx->tx_large_offset,
+                                        tx->tx_large_nob);
+                        else
+                                lnet_copy_iov2kiov(
+                                        gmni->gmni_large_pages,
+                                        ltxb->txb_buf.nb_kiov,
+                                        tx->tx_msgnob,
+                                        tx->tx_large_niov,
+                                        tx->tx_large_frags.iov,
+                                        tx->tx_large_offset,
+                                        tx->tx_large_nob);
+
+                        tx->tx_msgnob += tx->tx_large_nob;
+
+                        spin_lock(&gmni->gmni_tx_lock);
                 }
 
-                gmnal_post_rx(gmnalni, rx);
-       }
+                list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
+        }
 
-       spin_lock(&gmnalni->gmni_rxthread_flag_lock);
-       gmnalni->gmni_rxthread_flag /= 2;
-       spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
+        if (!list_empty(&gmni->gmni_cred_txq) &&
+            gmni->gmni_tx_credits != 0) {
 
-       CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
-       return 0;
+                tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list);
+
+                /* consume tx and 1 credit */
+                list_del(&tx->tx_list);
+                gmni->gmni_tx_credits--;
+
+                spin_unlock(&gmni->gmni_tx_lock);
+
+                /* Unlocking here allows sends to get re-ordered, but we want
+                 * to allow other CPUs to progress... */
+
+                LASSERT(!tx->tx_credit);
+                tx->tx_credit = 1;
+
+                tx->tx_launchtime = jiffies;
+
+                if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
+                        LASSERT (tx->tx_ltxb == NULL);
+                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
+                        gmsize = gmni->gmni_small_gmsize;
+                        pri = GMNAL_SMALL_PRIORITY;
+                } else {
+                        LASSERT (tx->tx_ltxb != NULL);
+                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
+                        gmsize = gmni->gmni_large_gmsize;
+                        pri = GMNAL_LARGE_PRIORITY;
+                }
+
+                spin_lock(&gmni->gmni_gm_lock);
+
+                gm_send_to_peer_with_callback(gmni->gmni_port, 
+                                              netaddr, gmsize, 
+                                              tx->tx_msgnob,
+                                              pri, 
+                                              tx->tx_gmlid,
+                                              gmnal_tx_callback, 
+                                              (void*)tx);
+
+                spin_unlock(&gmni->gmni_gm_lock);
+                spin_lock(&gmni->gmni_tx_lock);
+        }
 }
 
 void
-gmnal_post_rx(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
+gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
 {
-       CDEBUG(D_NET, "requeueing rx[%p] gmnalni[%p]\n", rx, gmnalni);
-
-       spin_lock(&gmnalni->gmni_gm_lock);
-       gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, rx->rx_msg,
-                                           rx->rx_gmsize, GM_LOW_PRIORITY, 0 );
-       spin_unlock(&gmnalni->gmni_gm_lock);
+        int   gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
+                                        gmni->gmni_small_gmsize;
+        int   pri    = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
+                                        GMNAL_SMALL_PRIORITY;
+        void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);
+
+       CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);
+
+       spin_lock(&gmni->gmni_gm_lock);
+        gm_provide_receive_buffer_with_tag(gmni->gmni_port, 
+                                           buffer, gmsize, pri, 0);
+       spin_unlock(&gmni->gmni_gm_lock);
 }
 
-void 
-gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
-                              gm_status_t status)
+void
+gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
 {
-       gmnal_tx_t      *tx = (gmnal_tx_t*)context;
-       gmnal_ni_t      *gmnalni = tx->tx_gmni;
-       lib_msg_t       *libmsg = tx->tx_libmsg;
+        /* Future protocol version compatibility support!
+         * The next gmlnd-specific protocol rev will first send a message to
+         * check version; I reply with a stub message containing my current
+         * magic+version... */
+        gmnal_msg_t *msg;
+        gmnal_tx_t  *tx = gmnal_get_tx(gmni);
+
+        if (tx == NULL) {
+                CERROR("Can't allocate tx to send version info to %u\n",
+                       rx->rx_recv_gmid);
+                return;
+        }
 
-        CWARN("status for tx [%p] is [%d][%s]\n", 
-              tx, status, gmnal_gmstatus2str(status));
+        LASSERT (tx->tx_lntmsg == NULL);        /* no finalize */
 
-        gmnal_return_tx(gmnalni, tx);
-        lib_finalize(gmnalni->gmni_libnal, NULL, libmsg, PTL_FAIL);
-}
+        tx->tx_nid = LNET_NID_ANY;
+        tx->tx_gmlid = rx->rx_recv_gmid;
 
-void 
-gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
-                          gm_status_t status)
-{
-       gmnal_tx_t      *tx = (gmnal_tx_t*)context;
-       gmnal_ni_t      *gmnalni = tx->tx_gmni;
+        msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
+        msg->gmm_magic   = GMNAL_MSG_MAGIC;
+        msg->gmm_version = GMNAL_MSG_VERSION;
+
+        /* just send magic + version */
+        tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
+        tx->tx_large_nob = 0;
+
+        spin_lock(&gmni->gmni_tx_lock);
 
-        CERROR("status for tx [%p] is [%d][%s]\n", 
-               tx, status, gmnal_gmstatus2str(status));
+        list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
+        gmnal_check_txqueues_locked(gmni);
 
-        gm_resume_sending(gmnalni->gmni_port, tx->tx_gm_priority,
-                          tx->tx_gmlid, gm_port_id,
-                          gmnal_resume_sending_callback, tx);
+        spin_unlock(&gmni->gmni_tx_lock);
 }
 
-void 
-gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
+int
+gmnal_rx_thread(void *arg)
 {
-       gmnal_tx_t      *tx = (gmnal_tx_t*)context;
-       gmnal_ni_t      *gmnalni = tx->tx_gmni;
-       lib_nal_t       *libnal = gmnalni->gmni_libnal;
-       lib_msg_t       *libmsg = tx->tx_libmsg;
-        ptl_err_t        rc;
+       gmnal_ni_t      *gmni = arg;
+       gm_recv_event_t *rxevent = NULL;
+       gm_recv_t       *recv = NULL;
+        gmnal_rx_t      *rx;
+        int              rc;
+
+       cfs_daemonize("gmnal_rxd");
+
+        down(&gmni->gmni_rx_mutex);
+
+       while (!gmni->gmni_shutdown) {
+
+                spin_lock(&gmni->gmni_gm_lock);
+               rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
+                spin_unlock(&gmni->gmni_gm_lock);
+
+                switch (GM_RECV_EVENT_TYPE(rxevent)) {
+                default:
+                        gm_unknown(gmni->gmni_port, rxevent);
+                        continue;
+
+                case GM_FAST_RECV_EVENT:
+                case GM_FAST_PEER_RECV_EVENT:
+                case GM_PEER_RECV_EVENT:
+                case GM_FAST_HIGH_RECV_EVENT:
+                case GM_FAST_HIGH_PEER_RECV_EVENT:
+                case GM_HIGH_PEER_RECV_EVENT:
+                case GM_RECV_EVENT:
+                case GM_HIGH_RECV_EVENT:
+                        break;
+                }
+                
+                recv = &rxevent->recv;
+                rx = gm_hash_find(gmni->gmni_rx_hash, 
+                                  gm_ntohp(recv->buffer));
+                LASSERT (rx != NULL);
+
+                rx->rx_recv_nob  = gm_ntoh_u32(recv->length);
+                rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
+                rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
+                rx->rx_recv_type = gm_ntoh_u8(recv->type);
+
+                switch (GM_RECV_EVENT_TYPE(rxevent)) {
+                case GM_FAST_RECV_EVENT:
+                case GM_FAST_PEER_RECV_EVENT:
+                case GM_FAST_HIGH_RECV_EVENT:
+                case GM_FAST_HIGH_PEER_RECV_EVENT:
+                        LASSERT (rx->rx_recv_nob <= PAGE_SIZE);
+
+                        memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
+                               gm_ntohp(recv->message), rx->rx_recv_nob);
+                        break;
+                }
 
-       if (!tx) {
-               CERROR("send completion event for unknown tx\n");
-               return;
-       }
+                up(&gmni->gmni_rx_mutex);
 
-       switch(status) {
-        case(GM_SUCCESS):
-                rc = PTL_OK;
-                break;
+                CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx, 
+                        GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
+                        gm_ntohp(recv->buffer), rx->rx_recv_nob);
 
-        case(GM_SEND_DROPPED):
-                rc = PTL_FAIL;
-                break;
+                /* We're connectionless: simply drop packets with
+                 * errors */
+                rc = gmnal_unpack_msg(gmni, rx);
+
+                if (rc == 0) {
+                        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
                         
-        default:
-                CERROR("Error %d(%s), nid "LPD64"\n",
-                       status, gmnal_gmstatus2str(status), tx->tx_nid);
+                        LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+                        rc =  lnet_parse(gmni->gmni_ni, 
+                                         &msg->gmm_u.immediate.gmim_hdr,
+                                         msg->gmm_srcnid,
+                                         rx, 0);
+                } else if (rc > 0) {
+                        gmnal_version_reply(gmni, rx);
+                        rc = -EPROTO;           /* repost rx */
+                }
 
-                spin_lock(&gmnalni->gmni_gm_lock);
-                gm_drop_sends(gmnalni->gmni_port, tx->tx_gm_priority, 
-                              tx->tx_gmlid, gm_port_id, 
-                              gmnal_drop_sends_callback, tx);
-                spin_unlock(&gmnalni->gmni_gm_lock);
-               return;
+                if (rc < 0)                     /* parse failure */
+                        gmnal_post_rx(gmni, rx);
+
+                down(&gmni->gmni_rx_mutex);
        }
 
-       gmnal_return_tx(gmnalni, tx);
-       lib_finalize(libnal, NULL, libmsg, rc);
-       return;
+        up(&gmni->gmni_rx_mutex);
+
+       CDEBUG(D_NET, "exiting\n");
+        atomic_dec(&gmni->gmni_nthreads);
+       return 0;
 }
 
-ptl_err_t
-gmnal_post_tx (gmnal_ni_t *gmnalni, gmnal_tx_t *tx, 
-               lib_msg_t *libmsg, ptl_nid_t nid, int nob)
+void
+gmnal_stop_threads(gmnal_ni_t *gmni)
 {
-        gm_status_t  gm_status;
+        int count = 2;
 
-       CDEBUG(D_NET, "send %d bytes to "LPU64"\n", nob, nid);
-
-        LASSERT ((nid >> 32) == 0);
+        gmni->gmni_shutdown = 1;
+        mb();
+        
+        /* wake rxthread owning gmni_rx_mutex with an alarm. */
+       spin_lock(&gmni->gmni_gm_lock);
+       gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
+       spin_unlock(&gmni->gmni_gm_lock);
+
+       while (atomic_read(&gmni->gmni_nthreads) != 0) {
+                count++;
+                if ((count & (count - 1)) == 0)
+                        CWARN("Waiting for %d threads to stop\n",
+                              atomic_read(&gmni->gmni_nthreads));
+                gmnal_yield(1);
+       }
+}
 
-       spin_lock(&gmnalni->gmni_gm_lock);
-       gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid, 
-                                            &tx->tx_gmlid);
-       spin_unlock(&gmnalni->gmni_gm_lock);
+int
+gmnal_start_threads(gmnal_ni_t *gmni)
+{
+        int     i;
+        int     pid;
 
-       if (gm_status != GM_SUCCESS) {
-               CERROR("Failed to obtain local id\n");
-                gmnal_return_tx(gmnalni, tx);
-               return PTL_FAIL;
-       }
+        LASSERT (!gmni->gmni_shutdown);
+        LASSERT (atomic_read(&gmni->gmni_nthreads) == 0);
 
-       CDEBUG(D_NET, "Local Node_id is [%u][%x]\n", 
-               tx->tx_gmlid, tx->tx_gmlid);
+       gm_initialize_alarm(&gmni->gmni_alarm);
 
-        tx->tx_nid = nid;
-       tx->tx_libmsg = libmsg;
-       tx->tx_gm_priority = GM_LOW_PRIORITY;
-       tx->tx_msg_size = nob;
+       for (i = 0; i < num_online_cpus(); i++) {
 
-       CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
-              "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] "
-              "tx [%p]\n", gmnalni->gmni_port, tx->tx_msg, 
-               tx->tx_gm_size, tx->tx_msg_size, 
-               tx->tx_nid, tx->tx_gmlid, tx);
+                pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
+                if (pid < 0) {
+                        CERROR("rx thread failed to start: %d\n", pid);
+                        gmnal_stop_threads(gmni);
+                        return pid;
+                }
 
-       spin_lock(&gmnalni->gmni_gm_lock);
-       gm_send_to_peer_with_callback(gmnalni->gmni_port, tx->tx_msg,
-                                     tx->tx_gm_size, tx->tx_msg_size,
-                                      tx->tx_gm_priority, tx->tx_gmlid,
-                                     gmnal_tx_callback, (void*)tx);
-       spin_unlock(&gmnalni->gmni_gm_lock);
+                atomic_inc(&gmni->gmni_nthreads);
+       }
 
-       return PTL_OK;
+       return 0;
 }