Whamcloud - gitweb
i=eeb:
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
index 206d86b..4b26f28 100644 (file)
  *     This file contains all gmnal send and receive functions
  */
 
-#include "gmnal.h"
+#include "gmlnd.h"
 
-/*
- *     The caretaker thread
- *     This is main thread of execution for the NAL side
- *     This guy waits in gm_blocking_recvive and gets
- *     woken up when the myrinet adaptor gets an interrupt.
- *     Hands off receive operations to the receive thread 
- *     This thread Looks after gm_callbacks etc inline.
- */
-int
-gmnal_ct_thread(void *arg)
+/* Report the peer that 'tx' was addressed to (tx->tx_nid) to LNet by
+ * calling lnet_notify() on the tx's network interface.  The time handed
+ * over is the current time of day minus the whole seconds that have
+ * passed since tx->tx_launchtime.
+ * NOTE(review): the 0 passed as the 3rd lnet_notify() argument presumably
+ * means "peer is not alive" -- confirm against lnet_notify()'s prototype. */
+void
+gmnal_notify_peer_down(gmnal_tx_t *tx)
 {
-       gmnal_data_t            *nal_data;
-       gm_recv_event_t         *rxevent = NULL;
-       gm_recv_t               *recv = NULL;
+        struct timeval     now;
+        time_t             then;
 
-       if (!arg) {
-               CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
-               return(-1);
-       }
+        do_gettimeofday (&now);
+        /* jiffies elapsed since launch, converted to seconds via HZ, are
+         * subtracted from the wall-clock seconds */
+        then = now.tv_sec - (jiffies - tx->tx_launchtime)/HZ;
 
-       nal_data = (gmnal_data_t*)arg;
-       CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
-
-       sprintf(current->comm, "gmnal_ct");
-
-       daemonize();
-
-       nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
-
-       GMNAL_GM_LOCK(nal_data);
-       while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
-               CDEBUG(D_NET, "waiting\n");
-               rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
-               if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
-                       CDEBUG(D_INFO, "time to exit\n");
-                       break;
-               }
-               CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
-               switch (GM_RECV_EVENT_TYPE(rxevent)) {
-
-                       case(GM_RECV_EVENT):
-                               CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
-                               recv = (gm_recv_t*)&rxevent->recv;
-                               GMNAL_GM_UNLOCK(nal_data);
-                               gmnal_add_rxtwe(nal_data, recv);
-                               GMNAL_GM_LOCK(nal_data);
-                               CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
-                       break;
-                       case(_GM_SLEEP_EVENT):
-                               /*
-                                *      Blocking receive above just returns
-                                *      immediatly with _GM_SLEEP_EVENT
-                                *      Don't know what this is
-                                */
-                               CDEBUG(D_NET, "Sleeping in gm_unknown\n");
-                               GMNAL_GM_UNLOCK(nal_data);
-                               gm_unknown(nal_data->gm_port, rxevent);
-                               GMNAL_GM_LOCK(nal_data);
-                               CDEBUG(D_INFO, "Awake from gm_unknown\n");
-                               break;
-                               
-                       default:
-                               /*
-                                *      Don't know what this is
-                                *      gm_unknown will make sense of it
-                                *      Should be able to do something with
-                                *      FAST_RECV_EVENTS here.
-                                */
-                               CDEBUG(D_NET, "Passing event to gm_unknown\n");
-                               GMNAL_GM_UNLOCK(nal_data);
-                               gm_unknown(nal_data->gm_port, rxevent);
-                               GMNAL_GM_LOCK(nal_data);
-                               CDEBUG(D_INFO, "Processed unknown event\n");
-               }
-       }
-       GMNAL_GM_UNLOCK(nal_data);
-       nal_data->ctthread_flag = GMNAL_THREAD_RESET;
-       CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
-       return(GMNAL_STATUS_OK);
+        lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
 }
 
-
-/*
- *     process a receive event
- */
-int gmnal_rx_thread(void *arg)
+/* Fill in the header fields that every message type shares: magic,
+ * version, type, source NID and destination NID.
+ *   gmni   - interface the message is sent from; its NID feeds the source
+ *   msg    - message to initialise (written in place)
+ *   dstnid - destination NID, stored as-is and also used to pick the
+ *            source NID via lnet_ptlcompat_srcnid()
+ *   type   - value stored in gmm_type
+ * Nothing in the type-specific part of 'msg' is touched here. */
+void
+gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
+               lnet_nid_t dstnid, int type)
 {
-       gmnal_data_t            *nal_data;
-       void                    *buffer;
-       gmnal_rxtwe_t           *we = NULL;
-       int                     rank;
-
-       if (!arg) {
-               CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
-               return(-1);
-       }
-
-       nal_data = (gmnal_data_t*)arg;
-       CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
-
-       for (rank=0; rank<num_rx_threads; rank++)
-               if (nal_data->rxthread_pid[rank] == current->pid)
-                       break;
-
-       sprintf(current->comm, "gmnal_rx_%d", rank);
-
-       daemonize();
-       /*
-        *      set 1 bit for each thread started
-        *      doesn't matter which bit
-        */
-       spin_lock(&nal_data->rxthread_flag_lock);
-       if (nal_data->rxthread_flag)
-               nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
-       else
-               nal_data->rxthread_flag = 1;
-       CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
-       spin_unlock(&nal_data->rxthread_flag_lock);
-
-       while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
-               CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
-               we = gmnal_get_rxtwe(nal_data);
-               if (!we) {
-                       CDEBUG(D_INFO, "Receive thread time to exit\n");
-                       break;
-               }
-
-               buffer = we->buffer;
-               switch(((gmnal_msghdr_t*)buffer)->type) {
-               case(GMNAL_SMALL_MESSAGE):
-                       gmnal_pre_receive(nal_data, we, 
-                                          GMNAL_SMALL_MESSAGE);
-               break;  
-               case(GMNAL_LARGE_MESSAGE_INIT):
-                       gmnal_pre_receive(nal_data, we, 
-                                          GMNAL_LARGE_MESSAGE_INIT);
-               break;  
-               case(GMNAL_LARGE_MESSAGE_ACK):
-                       gmnal_pre_receive(nal_data, we, 
-                                          GMNAL_LARGE_MESSAGE_ACK);
-               break;  
-               default:
-                       CDEBUG(D_ERROR, "Unsupported message type\n");
-                       gmnal_rx_bad(nal_data, we, NULL);
-               }
-               PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
-       }
-
-       spin_lock(&nal_data->rxthread_flag_lock);
-       nal_data->rxthread_flag/=2;
-       CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
-       spin_unlock(&nal_data->rxthread_flag_lock);
-       CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
-       return(GMNAL_STATUS_OK);
+        /* CAVEAT EMPTOR! this only sets the common message fields. */
+        msg->gmm_magic    = GMNAL_MSG_MAGIC;
+        msg->gmm_version  = GMNAL_MSG_VERSION;
+        msg->gmm_type     = type;
+        msg->gmm_srcnid   = lnet_ptlcompat_srcnid(gmni->gmni_ni->ni_nid,
+                                                  dstnid);
+        msg->gmm_dstnid   = dstnid;
 }
 
-
-
-/*
- *     Start processing a small message receive
- *     Get here from gmnal_receive_thread
- *     Hand off to lib_parse, which calls cb_recv
- *     which hands back to gmnal_small_receive
- *     Deal with all endian stuff here.
- */
+/* Validate the message that arrived in rx->rx_buf and, when the sender's
+ * byte order differs from ours, byte-swap its common header fields in
+ * place (the magic itself is left as received).
+ *   gmni - interface the message arrived on; supplies the buffer sizes
+ *          and the local NID the destination is matched against
+ *   rx   - receive descriptor; rx_recv_nob is the number of bytes received
+ * Returns 0 when the message is acceptable, -EPROTO when it is malformed
+ * (too short, bad magic, bad source/destination NID, unknown type) and
+ * +EPROTO when the magic is LNET_PROTO_MAGIC (either byte order) or the
+ * version does not match GMNAL_MSG_VERSION. */
 int
-gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
+gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
 {
-       gmnal_srxd_t    *srxd = NULL;
-       void            *buffer = NULL;
-       unsigned int snode, sport, type, length;
-       gmnal_msghdr_t  *gmnal_msghdr;
-       ptl_hdr_t       *portals_hdr;
-        int              rc;
-
-       CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", 
-              nal_data, we, gmnal_type);
-
-       buffer = we->buffer;
-       snode = we->snode;
-       sport = we->sport;
-       type = we->type;
-       buffer = we->buffer;
-       length = we->length;
-
-       gmnal_msghdr = (gmnal_msghdr_t*)buffer;
-       portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
-
-       CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
-              "type [%d], length [%d], buffer [%p]\n",
-              snode, sport, type, length, buffer);
-       CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
-              "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id, 
-              gmnal_msghdr->magic, gmnal_msghdr->type);
-       CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
-              "dest_node ["LPD64"]\n", portals_hdr->src_nid, 
-              portals_hdr->dest_nid);
-
-       
-       /*
-        *      Get a receive descriptor for this message
-        */
-       srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
-       CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
-       if (!srxd) {
-               CDEBUG(D_ERROR, "Failed to get receive descriptor\n");
-                /* I think passing a NULL srxd to lib_parse will crash
-                 * gmnal_recv() */
-                LBUG();
-               lib_parse(nal_data->libnal, portals_hdr, srxd);
-               return(GMNAL_STATUS_FAIL);
-       }
+        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
+        const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);
+        int          buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
+                                                gmni->gmni_small_msgsize;
+        int          flip;
+
+        /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */
+
+        /* GM may not overflow our buffer */
+        LASSERT (rx->rx_recv_nob <= buffnob);
+
+        /* 6 bytes are enough to have received magic + version */
+        if (rx->rx_recv_nob < 6) {
+                CERROR("Short message from gmid %u: %d\n", 
+                       rx->rx_recv_gmid, rx->rx_recv_nob);
+                return -EPROTO;
+        }
 
-       /*
-        *      no need to bother portals library with this
-        */
-       if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
-               gmnal_large_tx_ack_received(nal_data, srxd);
-               return(GMNAL_STATUS_OK);
-       }
+        /* the byte order the magic arrived in decides whether the rest of
+         * the header has to be swapped ('flip') */
+        if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
+                flip = 0;
+        } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
+                flip = 1;
+        } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
+                   msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
+                /* +ve return: see the rc convention noted above */
+                return EPROTO;
+        } else {
+                CERROR("Bad magic from gmid %u: %08x\n", 
+                       rx->rx_recv_gmid, msg->gmm_magic);
+                return -EPROTO;
+        }
 
-       srxd->nal_data = nal_data;
-       srxd->type = gmnal_type;
-       srxd->nsiov = gmnal_msghdr->niov;
-       srxd->gm_source_node = gmnal_msghdr->sender_node_id;
-       
-       CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n", 
-              buffer+GMNAL_MSGHDR_SIZE);
-       /*
-        *      control passes to lib, which calls cb_recv 
-        *      cb_recv is responsible for returning the buffer 
-        *      for future receive
-        */
-       rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
-
-        if (rc != PTL_OK) {
-                /* I just received garbage; take appropriate action... */
-                LBUG();
+        /* the version is still in wire byte order here, so compare it
+         * against a constant swapped the same way; +ve return on mismatch */
+        if (msg->gmm_version != 
+            (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
+                return EPROTO;
         }
 
-       return(GMNAL_STATUS_OK);
-}
+        /* everything up to the type-specific union must have arrived */
+        if (rx->rx_recv_nob < hdr_size) {
+                CERROR("Short message from %u: %d\n",
+                       rx->rx_recv_gmid, rx->rx_recv_nob);
+                return -EPROTO;
+        }
 
+        if (flip) {
+                /* leave magic unflipped as a clue to peer endianness */
+                __swab16s(&msg->gmm_version);
+                __swab16s(&msg->gmm_type);
+                __swab64s(&msg->gmm_srcnid);
+                __swab64s(&msg->gmm_dstnid);
+        }
+        
+        if (msg->gmm_srcnid == LNET_NID_ANY) {
+                CERROR("Bad src nid from %u: %s\n", 
+                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
+                return -EPROTO;
+        }
 
+        if (!lnet_ptlcompat_matchnid(gmni->gmni_ni->ni_nid, 
+                                     msg->gmm_dstnid)) {
+                CERROR("Bad dst nid from %u: %s\n",
+                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
+                return -EPROTO;
+        }
+        
+        /* per-type length checks; IMMEDIATE is the only type accepted */
+        switch (msg->gmm_type) {
+        default:
+                CERROR("Unknown message type from %u: %x\n", 
+                       rx->rx_recv_gmid, msg->gmm_type);
+                return -EPROTO;
+                
+        case GMNAL_MSG_IMMEDIATE:
+                /* must have received at least up to the start of the
+                 * payload */
+                if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
+                        CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n", 
+                               rx->rx_recv_gmid, rx->rx_recv_nob, 
+                               offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));
+                        return -EPROTO;
+                }
+                break;
+        }
+        return 0;
+}
 
-/*
- *     After a receive has been processed, 
- *     hang out the receive buffer again.
- *     This implicitly returns a receive token.
- */
-int
-gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
+/* Take a tx descriptor off the head of gmni->gmni_idle_txs, under
+ * gmni_tx_lock.
+ * Returns the descriptor, or NULL when gmni->gmni_shutdown is set or the
+ * idle list is empty. */
+gmnal_tx_t *
+gmnal_get_tx(gmnal_ni_t *gmni)
 {
-       CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
+       gmnal_tx_t       *tx = NULL;
 
-       CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
+        spin_lock(&gmni->gmni_tx_lock);
 
-       GMNAL_GM_LOCK(nal_data);
-       gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
-                                       srxd->gmsize, GM_LOW_PRIORITY, 0 );
-       GMNAL_GM_UNLOCK(nal_data);
-
-       return(GMNAL_STATUS_OK);
-}
+        if (gmni->gmni_shutdown ||
+            list_empty(&gmni->gmni_idle_txs)) {
+                spin_unlock(&gmni->gmni_tx_lock);
+                return NULL;
+        }
+        
+        /* consume the first idle tx */
+        tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
+        list_del(&tx->tx_list);
 
+        spin_unlock(&gmni->gmni_tx_lock);
 
-/*
- *     Handle a bad message
- *     A bad message is one we don't expect or can't interpret
- */
-int
-gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
-{
-       CDEBUG(D_TRACE, "Can't handle message\n");
-
-       if (!srxd)
-               srxd = gmnal_rxbuffer_to_srxd(nal_data, 
-                                              we->buffer);
-       if (srxd) {
-               gmnal_rx_requeue_buffer(nal_data, srxd);
-       } else {
-               CDEBUG(D_ERROR, "Can't find a descriptor for this buffer\n");
-               /*
-                *      get rid of it ?
-                */
-               return(GMNAL_STATUS_FAIL);
-       }
-
-       return(GMNAL_STATUS_OK);
+        /* an idle tx must carry no LNet message, large buffer or credit */
+        LASSERT (tx->tx_lntmsg == NULL);
+        LASSERT (tx->tx_ltxb == NULL);
+        LASSERT (!tx->tx_credit);
+        
+        return tx;
 }
 
-
-
-/*
- *     Process a small message receive.
- *     Get here from gmnal_receive_thread, gmnal_pre_receive
- *     lib_parse, cb_recv
- *     Put data from prewired receive buffer into users buffer(s)
- *     Hang out the receive buffer again for another receive
- *     Call lib_finalize
- */
-ptl_err_t
-gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
+/* Retire 'tx': hand back the large tx buffer and the send credit it
+ * holds (if any), put the tx on gmni_idle_txs and, once gmni_tx_lock has
+ * been dropped, finalize the LNet message that was attached to it.
+ *   tx - descriptor to retire
+ *   rc - completion status passed on to lnet_finalize() */
+void
+gmnal_tx_done(gmnal_tx_t *tx, int rc)
 {
-       gmnal_srxd_t    *srxd = NULL;
-       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
+       gmnal_ni_t *gmni = tx->tx_gmni;
+        int         wake_sched = 0;
+        lnet_msg_t *lnetmsg = tx->tx_lntmsg;
+        
+        /* detach the LNet message now; it is finalized at the end */
+        tx->tx_lntmsg = NULL;
 
+        spin_lock(&gmni->gmni_tx_lock);
+        
+        /* give back the large tx buffer, if this tx had one */
+        if (tx->tx_ltxb != NULL) {
+                wake_sched = 1;
+                list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs);
+                tx->tx_ltxb = NULL;
+        }
+        
+        /* give back the send credit, if this tx had one */
+        if (tx->tx_credit) {
+                wake_sched = 1;
+                gmni->gmni_tx_credits++;
+                tx->tx_credit = 0;
+        }
+        
+        list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);
 
-       if (!private) {
-               CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
-               lib_finalize(libnal, private, cookie, PTL_FAIL);
-               return(PTL_FAIL);
-       }
+        /* a buffer or credit was returned above, so have the tx queues
+         * re-examined while the lock is still held */
+        if (wake_sched)
+                gmnal_check_txqueues_locked(gmni);
 
-       srxd = (gmnal_srxd_t*)private;
-
-       /*
-        *      let portals library know receive is complete
-        */
-       CDEBUG(D_PORTALS, "calling lib_finalize\n");
-       lib_finalize(libnal, private, cookie, PTL_OK);
-       /*
-        *      return buffer so it can be used again
-        */
-       CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
-       GMNAL_GM_LOCK(nal_data);
-       gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
-                                          srxd->gmsize, GM_LOW_PRIORITY, 0);
-       GMNAL_GM_UNLOCK(nal_data);
-
-       return(PTL_OK);
-}
+        spin_unlock(&gmni->gmni_tx_lock);
 
+        /* Delay finalize until tx is free */
+        if (lnetmsg != NULL)
+                lnet_finalize(gmni->gmni_ni, lnetmsg, rc);
+}
 
-/*
- *     Start a small transmit. 
- *     Use the given send token (and wired transmit buffer).
- *     Copy headers to wired buffer and initiate gm_send from the wired buffer.
- *     The callback function informs when the send is complete.
- */
-ptl_err_t
-gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
-               ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
-               gmnal_stxd_t *stxd, int size)
+/* Callback handed to gm_drop_sends() by gmnal_tx_callback().
+ *   gm_port - not used here
+ *   context - the gmnal_tx_t whose failure caused the drop
+ *   status  - GM status of the drop; it is only logged
+ * The tx is always completed with -EIO, whatever 'status' is.  Must not
+ * run in interrupt context (asserted). */
+void 
+gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
+                          gm_status_t status)
 {
-       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
-       void            *buffer = NULL;
-       gmnal_msghdr_t  *msghdr = NULL;
-       int             tot_size = 0;
-       unsigned int    local_nid;
-       gm_status_t     gm_status = GM_SUCCESS;
-
-       CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
-              "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] "
-              "size [%d]\n", libnal, private, cookie, hdr, type,
-              global_nid, pid, stxd, size);
-
-       CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
-              hdr->dest_nid, hdr->src_nid);
-
-       if (!nal_data) {
-               CDEBUG(D_ERROR, "no nal_data\n");
-               return(PTL_FAIL);
-       } else {
-               CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
-       }
+       gmnal_tx_t      *tx = (gmnal_tx_t*)context;
 
-       GMNAL_GM_LOCK(nal_data);
-       gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
-                                           &local_nid);
-       GMNAL_GM_UNLOCK(nal_data);
-       if (gm_status != GM_SUCCESS) {
-               CDEBUG(D_ERROR, "Failed to obtain local id\n");
-               return(PTL_FAIL);
-       }
-       CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
-
-       stxd->type = GMNAL_SMALL_MESSAGE;
-       stxd->cookie = cookie;
-
-       /*
-        *      Copy gmnal_msg_hdr and portals header to the transmit buffer
-        *      Then send the message, as the data has previously been copied in
-        *      (HP SFS 1380).
-        */
-       buffer = stxd->buffer;
-       msghdr = (gmnal_msghdr_t*)buffer;
-
-       msghdr->magic = GMNAL_MAGIC;
-       msghdr->type = GMNAL_SMALL_MESSAGE;
-       msghdr->sender_node_id = nal_data->gm_global_nid;
-       CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
-
-       buffer += sizeof(gmnal_msghdr_t);
-
-       CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
-       gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
-
-       buffer += sizeof(ptl_hdr_t);
-
-       CDEBUG(D_INFO, "sending\n");
-       tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
-       stxd->msg_size = tot_size;
-
-
-       CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
-              "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
-              "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, 
-              stxd->msg_size, global_nid, local_nid, stxd);
-
-       GMNAL_GM_LOCK(nal_data);
-       stxd->gm_priority = GM_LOW_PRIORITY;
-       stxd->gm_target_node = local_nid;
-       gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
-                                     stxd->gm_size, stxd->msg_size, 
-                                     GM_LOW_PRIORITY, local_nid, 
-                                     gmnal_small_tx_callback, (void*)stxd);
-       GMNAL_GM_UNLOCK(nal_data);
-       CDEBUG(D_INFO, "done\n");
-               
-       return(PTL_OK);
-}
+        LASSERT(!in_interrupt());
+         
+        CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n", 
+               tx, status, gmnal_gmstatus2str(status),
+               libcfs_nid2str(tx->tx_nid));
 
+        /* the send that triggered the drop failed, so report an I/O error */
+        gmnal_tx_done(tx, -EIO);
+}
 
-/*
- *     A callback to indicate the small transmit operation is compete
- *     Check for erros and try to deal with them.
- *     Call lib_finalise to inform the client application that the send 
- *     is complete and the memory can be reused.
- *     Return the stxd when finished with it (returns a send token)
- */
+/* Handle the GM status reported for the send described by 'context'.
+ *   gm_port - not used here; the port is taken from the tx's interface
+ *   context - the gmnal_tx_t the status belongs to
+ *   status  - GM_SUCCESS completes the tx with 0; GM_SEND_DROPPED
+ *             completes it with -EIO; any other status notifies LNet
+ *             via gmnal_notify_peer_down() and calls gm_drop_sends(),
+ *             leaving completion to gmnal_drop_sends_callback()
+ * Every case returns from inside the switch.  Must not run in interrupt
+ * context (asserted).
+ * NOTE(review): presumably registered as the completion callback when the
+ * tx is sent -- the registration is not visible in this part of the file. */
 void 
-gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
+gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
 {
-       gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-       lib_msg_t       *cookie = stxd->cookie;
-       gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
-       lib_nal_t       *libnal = nal_data->libnal;
-       unsigned         gnid = 0;
-       gm_status_t      gm_status = 0;
-
-       if (!stxd) {
-               CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
-               return;
-       }
-       if (status != GM_SUCCESS) {
-               GMNAL_GM_LOCK(nal_data);
-               gm_status = gm_node_id_to_global_id(nal_data->gm_port,
-                                                   stxd->gm_target_node,&gnid);
-               GMNAL_GM_UNLOCK(nal_data);
-               if (gm_status != GM_SUCCESS) {
-                       CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
-                              gm_status);
-                       gnid = 0;
-               }
-               CDEBUG(D_ERROR, "Result of send stxd [%p] is [%s] to [%u]\n",
-                      stxd, gmnal_gm_error(status), gnid);
-       }
+       gmnal_tx_t      *tx = (gmnal_tx_t*)context;
+       gmnal_ni_t      *gmni = tx->tx_gmni;
 
-       switch(status) {
-               case(GM_SUCCESS):
-               break;
-
-
-
-               case(GM_SEND_DROPPED):
-               /*
-                *      do a resend on the dropped ones
-                */
-                       CDEBUG(D_ERROR, "send stxd [%p] was dropped "
-                              "resending\n", context);
-                       GMNAL_GM_LOCK(nal_data);
-                       gm_send_to_peer_with_callback(nal_data->gm_port, 
-                                                     stxd->buffer, 
-                                                     stxd->gm_size, 
-                                                     stxd->msg_size, 
-                                                     stxd->gm_priority, 
-                                                     stxd->gm_target_node, 
-                                                     gmnal_small_tx_callback,
-                                                     context);
-                       GMNAL_GM_UNLOCK(nal_data);
-               
-               return;
-               case(GM_TIMED_OUT):
-               case(GM_SEND_TIMED_OUT):
-               /*
-                *      drop these ones
-                */
-                       CDEBUG(D_INFO, "calling gm_drop_sends\n");
-                       GMNAL_GM_LOCK(nal_data);
-                       gm_drop_sends(nal_data->gm_port, stxd->gm_priority, 
-                                     stxd->gm_target_node, GMNAL_GM_PORT_ID, 
-                                     gmnal_drop_sends_callback, context);
-                       GMNAL_GM_UNLOCK(nal_data);
+        LASSERT(!in_interrupt());
 
-               return;
-
-
-               /*
-                *      abort on these ?
-                */
-               case(GM_TRY_AGAIN):
-               case(GM_INTERRUPTED):
-               case(GM_FAILURE):
-               case(GM_INPUT_BUFFER_TOO_SMALL):
-               case(GM_OUTPUT_BUFFER_TOO_SMALL):
-               case(GM_BUSY):
-               case(GM_MEMORY_FAULT):
-               case(GM_INVALID_PARAMETER):
-               case(GM_OUT_OF_MEMORY):
-               case(GM_INVALID_COMMAND):
-               case(GM_PERMISSION_DENIED):
-               case(GM_INTERNAL_ERROR):
-               case(GM_UNATTACHED):
-               case(GM_UNSUPPORTED_DEVICE):
-               case(GM_SEND_REJECTED):
-               case(GM_SEND_TARGET_PORT_CLOSED):
-               case(GM_SEND_TARGET_NODE_UNREACHABLE):
-               case(GM_SEND_PORT_CLOSED):
-               case(GM_NODE_ID_NOT_YET_SET):
-               case(GM_STILL_SHUTTING_DOWN):
-               case(GM_CLONE_BUSY):
-               case(GM_NO_SUCH_DEVICE):
-               case(GM_ABORTED):
-               case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
-               case(GM_UNTRANSLATED_SYSTEM_ERROR):
-               case(GM_ACCESS_DENIED):
-               case(GM_NO_DRIVER_SUPPORT):
-               case(GM_PTE_REF_CNT_OVERFLOW):
-               case(GM_NOT_SUPPORTED_IN_KERNEL):
-               case(GM_NOT_SUPPORTED_ON_ARCH):
-               case(GM_NO_MATCH):
-               case(GM_USER_ERROR):
-               case(GM_DATA_CORRUPTED):
-               case(GM_HARDWARE_FAULT):
-               case(GM_SEND_ORPHANED):
-               case(GM_MINOR_OVERFLOW):
-               case(GM_PAGE_TABLE_FULL):
-               case(GM_UC_ERROR):
-               case(GM_INVALID_PORT_NUMBER):
-               case(GM_DEV_NOT_FOUND):
-               case(GM_FIRMWARE_NOT_RUNNING):
-               case(GM_YP_NO_MATCH):
-               default:
-                gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
-                                      stxd->gm_target_node, GMNAL_GM_PORT_ID,
-                                      gmnal_resume_sending_callback, context);
+       switch(status) {
+        case GM_SUCCESS:
+                gmnal_tx_done(tx, 0);
                 return;
 
-       }
-
-       /*
-        *      TO DO
-        *      If this is a large message init,
-        *      we're not finished with the data yet,
-        *      so can't call lib_finalise.
-        *      However, we're also holding on to a 
-        *      stxd here (to keep track of the source
-        *      iovec only). Should use another structure
-        *      to keep track of iovec and return stxd to 
-        *      free list earlier.
-        */
-       if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
-               CDEBUG(D_INFO, "large transmit done\n");
+        case GM_SEND_DROPPED:
+                CDEBUG(D_NETERROR, "Dropped tx %p to %s\n", 
+                       tx, libcfs_nid2str(tx->tx_nid));
+                /* Another tx failed and called gm_drop_sends() which made this
+                 * one complete immediately */
+                gmnal_tx_done(tx, -EIO);
+                return;
+                        
+        default:
+                /* Some error; NB don't complete tx yet; we need its credit for
+                 * gm_drop_sends() */
+                CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
+                       tx, status, gmnal_gmstatus2str(status), 
+                       libcfs_nid2str(tx->tx_nid));
+
+                gmnal_notify_peer_down(tx);
+
+                /* the priority passed depends on whether this tx holds a
+                 * large tx buffer; gmnal_drop_sends_callback() completes
+                 * the tx */
+                spin_lock(&gmni->gmni_gm_lock);
+                gm_drop_sends(gmni->gmni_port, 
+                              tx->tx_ltxb != NULL ?
+                              GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
+                              tx->tx_gmlid, *gmnal_tunables.gm_port, 
+                              gmnal_drop_sends_callback, tx);
+                spin_unlock(&gmni->gmni_gm_lock);
                return;
        }
-       gmnal_return_stxd(nal_data, stxd);
-       lib_finalize(libnal, stxd, cookie, PTL_OK);
-       return;
-}
 
-/*
- *     After an error on the port
- *     call this to allow future sends to complete
- */
-void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
-                                 gm_status_t status)
-{
-        gmnal_data_t    *nal_data;
-        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-        CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
-        gmnal_return_stxd(stxd->nal_data, stxd);
-        return;
+        /* not reached */
+        LBUG();
 }
 
-
-void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
-                               gm_status_t status)
-{
-       gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-       gmnal_data_t    *nal_data = stxd->nal_data;
-
-       CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
-       if (status == GM_SUCCESS) {
-               GMNAL_GM_LOCK(nal_data);
-               gm_send_to_peer_with_callback(gm_port, stxd->buffer, 
-                                             stxd->gm_size, stxd->msg_size, 
-                                             stxd->gm_priority, 
-                                             stxd->gm_target_node, 
-                                             gmnal_small_tx_callback, 
-                                             context);
-               GMNAL_GM_UNLOCK(nal_data);
-       } else {
-               CDEBUG(D_ERROR, "send_to_peer status for stxd [%p] is "
-                      "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
-       }
-
-
-       return;
-}
-
-
-/*
- *     Begine a large transmit.
- *     Do a gm_register of the memory pointed to by the iovec 
- *     and send details to the receiver. The receiver does a gm_get
- *     to pull the data and sends and ack when finished. Upon receipt of
- *     this ack, deregister the memory. Only 1 send token is required here.
- */
-int
-gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
-               ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
-               unsigned int niov, struct iovec *iov, size_t offset, int size)
+void
+gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
 {
+        gmnal_tx_t    *tx;
+        gmnal_txbuf_t *ltxb;
+        int            gmsize;
+        int            pri;
+        void          *netaddr;
+        /* Push queued sends one step forward.  The caller holds
+         * gmni_tx_lock; it is dropped and re-taken inside this function
+         * and is held again on return.  At most one tx is taken from each
+         * queue per call:
+         *  1. the head of gmni_buf_txq, if it needs no large buffer or an
+         *     idle one exists, is marshalled and moved to gmni_cred_txq;
+         *  2. the head of gmni_cred_txq, if a send credit is left, is
+         *     handed to GM. */
+        tx = list_empty(&gmni->gmni_buf_txq) ? NULL :
+             list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);
+        /* tx_large_nob != 0 means the tx has a payload that must be staged
+         * in a large tx buffer (ltxb); such a tx stays at the head of the
+         * queue until an idle ltxb is available. */
+        if (tx != NULL &&
+            (tx->tx_large_nob == 0 || 
+             !list_empty(&gmni->gmni_idle_ltxbs))) {
+
+                /* consume tx */
+                list_del(&tx->tx_list);
+                
+                LASSERT (tx->tx_ltxb == NULL);
+
+                if (tx->tx_large_nob != 0) {
+                        ltxb = list_entry(gmni->gmni_idle_ltxbs.next,
+                                          gmnal_txbuf_t, txb_list);
+
+                        /* consume large buffer */
+                        list_del(&ltxb->txb_list);
+
+                        spin_unlock(&gmni->gmni_tx_lock);
+
+                        /* Unlocking here allows sends to get re-ordered,
+                         * but we want to allow other CPUs to progress... */
+
+                        tx->tx_ltxb = ltxb;
+
+                        /* marshall message in tx_ltxb...
+                         * 1. Copy what was marshalled so far (in tx_buf) */
+                        memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
+                               GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);
+
+                        /* 2. Copy the payload (from a kiov or an iov source,
+                         * as selected by tx_large_iskiov) into the large
+                         * buffer's pages */
+                        if (tx->tx_large_iskiov)
+                                lnet_copy_kiov2kiov(
+                                        gmni->gmni_large_pages,
+                                        ltxb->txb_buf.nb_kiov,
+                                        tx->tx_msgnob,
+                                        tx->tx_large_niov,
+                                        tx->tx_large_frags.kiov,
+                                        tx->tx_large_offset,
+                                        tx->tx_large_nob);
+                        else
+                                lnet_copy_iov2kiov(
+                                        gmni->gmni_large_pages,
+                                        ltxb->txb_buf.nb_kiov,
+                                        tx->tx_msgnob,
+                                        tx->tx_large_niov,
+                                        tx->tx_large_frags.iov,
+                                        tx->tx_large_offset,
+                                        tx->tx_large_nob);
+                        /* the message length now counts the bytes
+                         * marshalled earlier plus the payload */
+                        tx->tx_msgnob += tx->tx_large_nob;
+
+                        spin_lock(&gmni->gmni_tx_lock);
+                }
+                /* buffers are ready; the tx now waits for a send credit */
+                list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
+        }
 
-       gmnal_data_t    *nal_data;
-       gmnal_stxd_t    *stxd = NULL;
-       void            *buffer = NULL;
-       gmnal_msghdr_t  *msghdr = NULL;
-       unsigned int    local_nid;
-       int             mlen = 0;       /* the size of the init message data */
-       struct iovec    *iov_dup = NULL;
-       gm_status_t     gm_status;
-       int             niov_dup;
-
-
-       CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
-              "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
-              "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type, 
-              global_nid, pid, niov, iov, size);
-
-       if (libnal)
-               nal_data = (gmnal_data_t*)libnal->libnal_data;
-       else  {
-               CDEBUG(D_ERROR, "no libnal.\n");
-               return(GMNAL_STATUS_FAIL);
-       }
-       
-
-       /*
-        *      Get stxd and buffer. Put local address of data in buffer, 
-        *      send local addresses to target, 
-        *      wait for the target node to suck the data over.
-        *      The stxd is used to ren
-        */
-       stxd = gmnal_get_stxd(nal_data, 1);
-       CDEBUG(D_INFO, "stxd [%p]\n", stxd);
-
-       stxd->type = GMNAL_LARGE_MESSAGE_INIT;
-       stxd->cookie = cookie;
-
-       /*
-        *      Copy gmnal_msg_hdr and portals header to the transmit buffer
-        *      Then copy the iov in
-        */
-       buffer = stxd->buffer;
-       msghdr = (gmnal_msghdr_t*)buffer;
-
-       CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
-
-       msghdr->magic = GMNAL_MAGIC;
-       msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
-       msghdr->sender_node_id = nal_data->gm_global_nid;
-       msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd;
-       msghdr->niov = niov ;
-       buffer += sizeof(gmnal_msghdr_t);
-       mlen = sizeof(gmnal_msghdr_t);
-       CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
-
-       CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
-
-       gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
-       buffer += sizeof(ptl_hdr_t);
-       mlen += sizeof(ptl_hdr_t); 
-       CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
-        while (offset >= iov->iov_len) {
-                offset -= iov->iov_len;
-                niov--;
-                iov++;
-        } 
-
-        LASSERT(offset >= 0);
-        /*
-        *      Store the iovs in the stxd for we can get 
-        *      them later if we need them
-        */
-        stxd->iov[0].iov_base = iov->iov_base + offset; 
-        stxd->iov[0].iov_len = iov->iov_len - offset; 
-       CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
-        if (niov > 1)
-               gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
-       stxd->niov = niov;
-
-       /*
-        *      copy the iov to the buffer so target knows 
-        *      where to get the data from
-        */
-       CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
-       gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
-       mlen += stxd->niov*(sizeof(struct iovec));
-       CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-       
-       /*
-        *      register the memory so the NIC can get hold of the data
-        *      This is a slow process. it'd be good to overlap it 
-        *      with something else.
-        */
-        iov = stxd->iov;
-       iov_dup = iov;
-       niov_dup = niov;
-       while(niov--) {
-               CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", 
-                      iov->iov_base, iov->iov_len);
-               GMNAL_GM_LOCK(nal_data);
-               gm_status = gm_register_memory(nal_data->gm_port, 
-                                              iov->iov_base, iov->iov_len);
-               if (gm_status != GM_SUCCESS) {
-                       GMNAL_GM_UNLOCK(nal_data);
-                       CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
-                              "for memory [%p] len ["LPSZ"]\n", 
-                              gm_status, gmnal_gm_error(gm_status), 
-                              iov->iov_base, iov->iov_len);
-                       GMNAL_GM_LOCK(nal_data);
-                       while (iov_dup != iov) {
-                               gm_deregister_memory(nal_data->gm_port, 
-                                                    iov_dup->iov_base, 
-                                                    iov_dup->iov_len);
-                               iov_dup++;
-                       }
-                       GMNAL_GM_UNLOCK(nal_data);
-                       gmnal_return_stxd(nal_data, stxd);
-                       return(PTL_FAIL);
-               }
-
-               GMNAL_GM_UNLOCK(nal_data);
-               iov++;
-       }
+        if (!list_empty(&gmni->gmni_cred_txq) &&
+            gmni->gmni_tx_credits != 0) {
 
-       /*
-        *      Send the init message to the target
-        */
-       CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
-       GMNAL_GM_LOCK(nal_data);
-       gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
-                                           &local_nid);
-       if (gm_status != GM_SUCCESS) {
-               GMNAL_GM_UNLOCK(nal_data);
-               CDEBUG(D_ERROR, "Failed to obtain local id\n");
-               gmnal_return_stxd(nal_data, stxd);
-               /* TO DO deregister memory on failure */
-               return(GMNAL_STATUS_FAIL);
-       }
-       CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
-       gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
-                                     stxd->gm_size, mlen, GM_LOW_PRIORITY, 
-                                     local_nid, gmnal_large_tx_callback, 
-                                     (void*)stxd);
-       GMNAL_GM_UNLOCK(nal_data);
-       
-       CDEBUG(D_INFO, "done\n");
-               
-       return(PTL_OK);
-}
+                tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list);
 
-/*
- *     Callback function indicates that send of buffer with 
- *     large message iovec has completed (or failed).
- */
-void 
-gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
-{
-       gmnal_small_tx_callback(gm_port, context, status);
+                /* consume tx and 1 credit */
+                list_del(&tx->tx_list);
+                gmni->gmni_tx_credits--;
 
-}
+                spin_unlock(&gmni->gmni_tx_lock);
 
+                /* Unlocking here allows sends to get re-ordered, but we want
+                 * to allow other CPUs to progress... */
 
+                LASSERT(!tx->tx_credit);
+                tx->tx_credit = 1;      /* this tx now holds one credit;
+                                         * presumably given back when the send
+                                         * completes (not shown here) */
 
-/*
- *     Have received a buffer that contains an iovec of the sender. 
- *     Do a gm_register_memory of the receivers buffer and then do a get
- *     data from the sender.
- */
-int
-gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
-               unsigned int nriov, struct iovec *riov, size_t offset, 
-               size_t mlen, size_t rlen)
-{
-       gmnal_data_t    *nal_data = libnal->libnal_data;
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
-       void            *buffer = NULL;
-       struct  iovec   *riov_dup;
-       int             nriov_dup;
-       gmnal_msghdr_t  *msghdr = NULL;
-       gm_status_t     gm_status;
-
-       CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
-              "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
-               libnal, private, cookie, nriov, riov, mlen, rlen);
-
-       if (!srxd) {
-               CDEBUG(D_ERROR, "gmnal_large_rx no context\n");
-               lib_finalize(libnal, private, cookie, PTL_FAIL);
-               return(PTL_FAIL);
-       }
+                tx->tx_launchtime = jiffies;    /* send start, in jiffies */
 
-       buffer = srxd->buffer;
-       msghdr = (gmnal_msghdr_t*)buffer;
-       buffer += sizeof(gmnal_msghdr_t);
-       buffer += sizeof(ptl_hdr_t);
-
-       /*
-        *      Store the senders stxd address in the srxd for this message
-        *      The gmnal_large_message_ack needs it to notify the sender
-        *      the pull of data is complete
-        */
-       srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
-
-       /*
-        *      Register the receivers memory
-        *      get the data,
-        *      tell the sender that we got the data
-        *      then tell the receiver we got the data
-        *      TO DO
-        *      If the iovecs match, could interleave 
-        *      gm_registers and gm_gets for each element
-        */
-        while (offset >= riov->iov_len) {
-                offset -= riov->iov_len;
-                riov++;
-                nriov--;
-        } 
-        LASSERT (nriov >= 0);
-        LASSERT (offset >= 0);
-       /*
-        *      do this so the final gm_get callback can deregister the memory
-        */
-       PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
-
-        srxd->riov[0].iov_base = riov->iov_base + offset;
-        srxd->riov[0].iov_len = riov->iov_len - offset;
-        if (nriov > 1)
-               gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
-       srxd->nriov = nriov;
-        
-        riov = srxd->riov;
-       nriov_dup = nriov;
-       riov_dup = riov;
-       while(nriov--) {
-               CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", 
-                      riov->iov_base, riov->iov_len);
-               GMNAL_GM_LOCK(nal_data);
-               gm_status = gm_register_memory(nal_data->gm_port, 
-                                              riov->iov_base, riov->iov_len);
-               if (gm_status != GM_SUCCESS) {
-                       GMNAL_GM_UNLOCK(nal_data);
-                       CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
-                              "for memory [%p] len ["LPSZ"]\n", 
-                              gm_status, gmnal_gm_error(gm_status), 
-                              riov->iov_base, riov->iov_len);
-                       GMNAL_GM_LOCK(nal_data);
-                       while (riov_dup != riov) {
-                               gm_deregister_memory(nal_data->gm_port, 
-                                                    riov_dup->iov_base, 
-                                                    riov_dup->iov_len);
-                               riov_dup++;
-                       }
-                       GMNAL_GM_LOCK(nal_data);
-                       /*
-                        *      give back srxd and buffer. Send NACK to sender
-                        */
-                        PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
-                       return(PTL_FAIL);
-               }
-               GMNAL_GM_UNLOCK(nal_data);
-               riov++;
-       }
+                if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
+                        LASSERT (tx->tx_ltxb == NULL);  /* send from tx_buf */
+                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
+                        gmsize = gmni->gmni_small_gmsize;
+                        pri = GMNAL_SMALL_PRIORITY;
+                } else {
+                        LASSERT (tx->tx_ltxb != NULL);  /* send from ltxb */
+                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
+                        gmsize = gmni->gmni_large_gmsize;
+                        pri = GMNAL_LARGE_PRIORITY;
+                }
 
-       /*
-        *      now do gm_get to get the data
-        */
-       srxd->cookie = cookie;
-       if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer, 
-                             nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
-               CDEBUG(D_ERROR, "can't get the data");
-       }
+                spin_lock(&gmni->gmni_gm_lock);
 
-       CDEBUG(D_INFO, "lgmanl_large_rx done\n");
+                gm_send_to_peer_with_callback(gmni->gmni_port, 
+                                              netaddr, gmsize, 
+                                              tx->tx_msgnob,
+                                              pri, 
+                                              tx->tx_gmlid,
+                                              gmnal_tx_callback, 
+                                              (void*)tx);
 
-       return(PTL_OK);
+                spin_unlock(&gmni->gmni_gm_lock);
+                spin_lock(&gmni->gmni_tx_lock); /* return with it held again */
+        }
 }
 
+void
+gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
+{
+        int   gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
+                                        gmni->gmni_small_gmsize;
+        int   pri    = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
+                                        GMNAL_SMALL_PRIORITY;
+        void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);
+        /* Give rx's buffer to the GM port as a receive buffer.  The GM
+         * size and priority are the large or small values according to
+         * rx->rx_islarge; the tag passed to GM is always 0. */
+       CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);
+        /* the GM call is made under gmni_gm_lock */
+       spin_lock(&gmni->gmni_gm_lock);
+        gm_provide_receive_buffer_with_tag(gmni->gmni_port, 
+                                           buffer, gmsize, pri, 0);
+       spin_unlock(&gmni->gmni_gm_lock);
+}
 
-/*
- *     Perform a number of remote gets as part of receiving 
- *     a large message.
- *     The final one to complete (i.e. the last callback to get called)
- *     tidies up.
- *     gm_get requires a send token.
- */
-int
-gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov, 
-                 int nriov, struct iovec *riov)
+void
+gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
 {
+        /* Future protocol version compatibility support!
+         * The next gmlnd-specific protocol rev will first send a message to
+         * check version; I reply with a stub message containing my current
+         * magic+version...
+         *
+         * The reply is addressed to the GM id recorded in rx
+         * (rx->rx_recv_gmid) and is queued like any other send. */
+        gmnal_msg_t *msg;
+        gmnal_tx_t  *tx = gmnal_get_tx(gmni);
+        /* best effort: without a tx descriptor the reply is just not sent */
+        if (tx == NULL) {
+                CERROR("Can't allocate tx to send version info to %u\n",
+                       rx->rx_recv_gmid);
+                return;
+        }
 
-       int     ncalls = 0;
+        LASSERT (tx->tx_lntmsg == NULL);        /* no finalize */
 
-       CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
-              "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
+        tx->tx_nid = LNET_NID_ANY;
+        tx->tx_gmlid = rx->rx_recv_gmid;
 
+        msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
+        msg->gmm_magic   = GMNAL_MSG_MAGIC;
+        msg->gmm_version = GMNAL_MSG_VERSION;
 
-       ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
-       if (ncalls < 0) {
-               CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
-               return(GMNAL_STATUS_FAIL);
-       }
-       CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
-       spin_lock_init(&srxd->callback_lock);
-       srxd->ncallbacks = ncalls;
-       srxd->callback_status = 0;
-
-       ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
-       if (ncalls < 0) {
-               CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
-               return(GMNAL_STATUS_FAIL);
-       }
+        /* just send magic + version: the message length stops at the
+         * offset of gmm_type (presumably both fields precede it in
+         * gmnal_msg_t - confirm against the struct definition) */
+        tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
+        tx->tx_large_nob = 0;   /* no payload, so no large tx buffer needed */
 
-       return(GMNAL_STATUS_OK);
+        spin_lock(&gmni->gmni_tx_lock);
 
-}
+        list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
+        gmnal_check_txqueues_locked(gmni);
 
+        spin_unlock(&gmni->gmni_tx_lock);
+}
 
-/*
- *     pull data from source node (source iovec) to a local iovec.
- *     The iovecs may not match which adds the complications below.
- *     Count the number of gm_gets that will be required so the callbacks
- *     can determine who is the last one.
- */    
 int
-gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov, 
-              struct iovec *siov, int nriov, struct iovec *riov)
+gmnal_rx_thread(void *arg)
 {
+       gmnal_ni_t      *gmni = arg;
+       gm_recv_event_t *rxevent = NULL;
+       gm_recv_t       *recv = NULL;
+        gmnal_rx_t      *rx;
+        int              rc;
 
-       int     ncalls = 0;
-       int     slen = siov->iov_len, rlen = riov->iov_len;
-       char    *sbuf = siov->iov_base, *rbuf = riov->iov_base; 
-       unsigned long   sbuf_long;
-       gm_remote_ptr_t remote_ptr = 0;
-       unsigned int    source_node;
-       gmnal_ltxd_t    *ltxd = NULL;
-       gmnal_data_t    *nal_data = srxd->nal_data;
-
-       CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
-       if (do_copy) {
-               if (!nal_data) {
-                       CDEBUG(D_ERROR, "Bad args No nal_data\n");
-                       return(GMNAL_STATUS_FAIL);
-               }
-               GMNAL_GM_LOCK(nal_data);
-               if (gm_global_id_to_node_id(nal_data->gm_port, 
-                                           srxd->gm_source_node, 
-                                           &source_node) != GM_SUCCESS) {
-
-                       CDEBUG(D_ERROR, "cannot resolve global_id [%u] "
-                              "to local node_id\n", srxd->gm_source_node);
-                       GMNAL_GM_UNLOCK(nal_data);
-                       return(GMNAL_STATUS_FAIL);
-               }
-               GMNAL_GM_UNLOCK(nal_data);
-               /*
-                *      We need a send token to use gm_get
-                *      getting an stxd gets us a send token.
-                *      the stxd is used as the context to the
-                *      callback function (so stxd can be returned).
-                *      Set pointer in stxd to srxd so callback count in srxd
-                *      can be decremented to find last callback to complete
-                */
-               CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n", 
-                      srxd->gm_source_node, source_node);
+       cfs_daemonize("gmnal_rxd");
+        /* Receive thread body.  gmni_rx_mutex is held while waiting for,
+         * and picking apart, a GM event, and released while the message
+         * is unpacked and passed to LNet, so only one thread at a time
+         * polls the port.  The loop ends once gmni_shutdown is set; the
+         * thread then decrements gmni_nthreads and returns 0. */
+        down(&gmni->gmni_rx_mutex);
+
+       while (!gmni->gmni_shutdown) {
+                /* NOTE(review): the blocking receive runs with the
+                 * gmni_gm_lock spinlock held - confirm that
+                 * gm_blocking_receive_no_spin() is safe to call that way. */
+                spin_lock(&gmni->gmni_gm_lock);
+               rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
+                spin_unlock(&gmni->gmni_gm_lock);
+                /* Only receive events are processed below.  Anything else
+                 * (presumably including the alarm raised by
+                 * gmnal_stop_threads) goes to gm_unknown() and the loop
+                 * condition is re-tested with the mutex still held. */
+                switch (GM_RECV_EVENT_TYPE(rxevent)) {
+                default:
+                        gm_unknown(gmni->gmni_port, rxevent);
+                        continue;
+
+                case GM_FAST_RECV_EVENT:
+                case GM_FAST_PEER_RECV_EVENT:
+                case GM_PEER_RECV_EVENT:
+                case GM_FAST_HIGH_RECV_EVENT:
+                case GM_FAST_HIGH_PEER_RECV_EVENT:
+                case GM_HIGH_PEER_RECV_EVENT:
+                case GM_RECV_EVENT:
+                case GM_HIGH_RECV_EVENT:
+                        break;
+                }
+                /* find the rx descriptor stored in gmni_rx_hash under the
+                 * buffer address GM reported; it must exist */
+                recv = &rxevent->recv;
+                rx = gm_hash_find(gmni->gmni_rx_hash, 
+                                  gm_ntohp(recv->buffer));
+                LASSERT (rx != NULL);
+                /* record the GM-level details of this receive in rx, read
+                 * through the gm_ntoh* helpers */
+                rx->rx_recv_nob  = gm_ntoh_u32(recv->length);
+                rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
+                rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
+                rx->rx_recv_type = gm_ntoh_u8(recv->type);
+                /* FAST events: copy recv->message into the rx buffer; the
+                 * other receive events need no copy here */
+                switch (GM_RECV_EVENT_TYPE(rxevent)) {
+                case GM_FAST_RECV_EVENT:
+                case GM_FAST_PEER_RECV_EVENT:
+                case GM_FAST_HIGH_RECV_EVENT:
+                case GM_FAST_HIGH_PEER_RECV_EVENT:
+                        LASSERT (rx->rx_recv_nob <= PAGE_SIZE);
+
+                        memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
+                               gm_ntohp(recv->message), rx->rx_recv_nob);
+                        break;
+                }
+                /* event captured in rx: let another thread poll the port */
+                up(&gmni->gmni_rx_mutex);
+
+                CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx, 
+                        GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
+                        gm_ntohp(recv->buffer), rx->rx_recv_nob);
+
+                /* We're connectionless: simply drop packets with
+                 * errors */
+                rc = gmnal_unpack_msg(gmni, rx);
+                /* rc == 0: hand the message to LNet; rc > 0: answer with a
+                 * version reply; rc < 0: drop */
+                if (rc == 0) {
+                        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
+                        
+                        LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+                        rc =  lnet_parse(gmni->gmni_ni, 
+                                         &msg->gmm_u.immediate.gmim_hdr,
+                                         msg->gmm_srcnid,
+                                         rx, 0);
+                } else if (rc > 0) {
+                        gmnal_version_reply(gmni, rx);
+                        rc = -EPROTO;           /* repost rx */
+                }
+                /* A negative rc (from gmnal_unpack_msg, lnet_parse or the
+                 * -EPROTO forced above) means rx is finished with, so its
+                 * buffer goes back to GM.  Otherwise nothing is reposted
+                 * here - presumably that happens once LNet has consumed
+                 * the message; confirm. */
+                if (rc < 0)                     /* parse failure */
+                        gmnal_post_rx(gmni, rx);
+                /* re-take the mutex before testing shutdown / polling */
+                down(&gmni->gmni_rx_mutex);
        }
 
-       do {
-               CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
-                               sbuf, slen, rbuf, rlen);
-               if (slen > rlen) {
-                       ncalls++;
-                       if (do_copy) {
-                               CDEBUG(D_INFO, "slen>rlen\n");
-                               ltxd = gmnal_get_ltxd(nal_data);
-                               ltxd->srxd = srxd;
-                               GMNAL_GM_LOCK(nal_data);
-                               /* 
-                                *      funny business to get rid 
-                                *      of compiler warning 
-                                */
-                               sbuf_long = (unsigned long) sbuf;
-                               remote_ptr = (gm_remote_ptr_t)sbuf_long;
-                               gm_get(nal_data->gm_port, remote_ptr, rbuf,
-                                      rlen, GM_LOW_PRIORITY, source_node,
-                                      GMNAL_GM_PORT_ID,
-                                      gmnal_remote_get_callback, ltxd);
-                               GMNAL_GM_UNLOCK(nal_data);
-                       }
-                       /*
-                        *      at the end of 1 iov element
-                        */
-                       sbuf+=rlen;
-                       slen-=rlen;
-                       riov++;
-                       nriov--;
-                       rbuf = riov->iov_base;
-                       rlen = riov->iov_len;
-               } else if (rlen > slen) {
-                       ncalls++;
-                       if (do_copy) {
-                               CDEBUG(D_INFO, "slen<rlen\n");
-                               ltxd = gmnal_get_ltxd(nal_data);
-                               ltxd->srxd = srxd;
-                               GMNAL_GM_LOCK(nal_data);
-                               sbuf_long = (unsigned long) sbuf;
-                               remote_ptr = (gm_remote_ptr_t)sbuf_long;
-                               gm_get(nal_data->gm_port, remote_ptr, rbuf,
-                                      slen, GM_LOW_PRIORITY, source_node,
-                                      GMNAL_GM_PORT_ID,
-                                      gmnal_remote_get_callback, ltxd);
-                               GMNAL_GM_UNLOCK(nal_data);
-                       }
-                       /*
-                        *      at end of siov element
-                        */
-                       rbuf+=slen;
-                       rlen-=slen;
-                       siov++;
-                       sbuf = siov->iov_base;
-                       slen = siov->iov_len;
-               } else {
-                       ncalls++;
-                       if (do_copy) {
-                               CDEBUG(D_INFO, "rlen=slen\n");
-                               ltxd = gmnal_get_ltxd(nal_data);
-                               ltxd->srxd = srxd;
-                               GMNAL_GM_LOCK(nal_data);
-                               sbuf_long = (unsigned long) sbuf;
-                               remote_ptr = (gm_remote_ptr_t)sbuf_long;
-                               gm_get(nal_data->gm_port, remote_ptr, rbuf,
-                                      rlen, GM_LOW_PRIORITY, source_node,
-                                      GMNAL_GM_PORT_ID,
-                                      gmnal_remote_get_callback, ltxd);
-                               GMNAL_GM_UNLOCK(nal_data);
-                       }
-                       /*
-                        *      at end of siov and riov element
-                        */
-                       siov++;
-                       sbuf = siov->iov_base;
-                       slen = siov->iov_len;
-                       riov++;
-                       nriov--;
-                       rbuf = riov->iov_base;
-                       rlen = riov->iov_len;
-               }
-
-       } while (nriov);
-       return(ncalls);
-}
+        up(&gmni->gmni_rx_mutex);
 
+       CDEBUG(D_NET, "exiting\n");
+        atomic_dec(&gmni->gmni_nthreads);
+       return 0;
+}
 
-/*
- *     The callback function that is invoked after each gm_get call completes.
- *     Multiple callbacks may be invoked for 1 transaction, only the final
- *     callback has work to do.
- */
 void
-gmnal_remote_get_callback(gm_port_t *gm_port, void *context, 
-                          gm_status_t status)
+gmnal_stop_threads(gmnal_ni_t *gmni)
 {
+        int count = 2;          /* poll counter; starting at 2 makes the
+                                 * first warning below fire at count == 4 */
 
-       gmnal_ltxd_t    *ltxd = (gmnal_ltxd_t*)context;
-       gmnal_srxd_t    *srxd = ltxd->srxd;
-       lib_nal_t       *libnal = srxd->nal_data->libnal;
-       int             lastone;
-       struct  iovec   *riov;
-       int             nriov;
-       gmnal_data_t    *nal_data;
-
-       CDEBUG(D_TRACE, "called for context [%p]\n", context);
-
-       if (status != GM_SUCCESS) {
-               CDEBUG(D_ERROR, "reports error [%d][%s]\n", status, 
-                      gmnal_gm_error(status));
-       }
-
-       spin_lock(&srxd->callback_lock);
-       srxd->ncallbacks--;
-       srxd->callback_status |= status;
-       lastone = srxd->ncallbacks?0:1;
-       spin_unlock(&srxd->callback_lock);
-       nal_data = srxd->nal_data;
-
-       /*
-        *      everyone returns a send token
-        */
-       gmnal_return_ltxd(nal_data, ltxd);
-
-       if (!lastone) {
-               CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
-               return;
-       }
-       
-       /*
-        *      Let our client application proceed
-        */     
-       CDEBUG(D_ERROR, "final callback context[%p]\n", srxd);
-       lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
-
-       /*
-        *      send an ack to the sender to let him know we got the data
-        */
-       gmnal_large_tx_ack(nal_data, srxd);
-
-       /*
-        *      Unregister the memory that was used
-        *      This is a very slow business (slower then register)
-        */
-       nriov = srxd->nriov;
-       riov = srxd->riov;
-       GMNAL_GM_LOCK(nal_data);
-       while (nriov--) {
-               CDEBUG(D_ERROR, "deregister memory [%p]\n", riov->iov_base);
-               if (gm_deregister_memory(srxd->nal_data->gm_port, 
-                                        riov->iov_base, riov->iov_len)) {
-                       CDEBUG(D_ERROR, "failed to deregister memory [%p]\n", 
-                              riov->iov_base);
-               }
-               riov++;
+        gmni->gmni_shutdown = 1;
+        mb();                   /* presumably so the flag is visible before
+                                 * the wake-up below - confirm */
+        /* Stop sequence: gmni_shutdown is raised above, the receiver is
+         * poked here, then gmni_nthreads is polled until every thread has
+         * decremented it. */
+        /* wake rxthread owning gmni_rx_mutex with an alarm. */
+       spin_lock(&gmni->gmni_gm_lock);
+       gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
+       spin_unlock(&gmni->gmni_gm_lock);
+        /* warn only when count reaches a power of two (4, 8, 16, ...) */
+       while (atomic_read(&gmni->gmni_nthreads) != 0) {
+                count++;
+                if ((count & (count - 1)) == 0)
+                        CWARN("Waiting for %d threads to stop\n",
+                              atomic_read(&gmni->gmni_nthreads));
+                gmnal_yield(1);
        }
-       GMNAL_GM_UNLOCK(nal_data);
-       PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
-
-       /*
-        *      repost the receive buffer (return receive token)
-        */
-       GMNAL_GM_LOCK(nal_data);
-       gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, 
-                                          srxd->gmsize, GM_LOW_PRIORITY, 0);   
-       GMNAL_GM_UNLOCK(nal_data);
-       
-       return;
 }
 
-
-/*
- *     Called on target node.
- *     After pulling data from a source node
- *     send an ack message to indicate the large transmit is complete.
- */
-void 
-gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
+int
+gmnal_start_threads(gmnal_ni_t *gmni)
 {
+        int     i;
+        int     pid;
 
-       gmnal_stxd_t    *stxd;
-       gmnal_msghdr_t *msghdr;
-       void            *buffer = NULL;
-       unsigned int    local_nid;
-       gm_status_t     gm_status = GM_SUCCESS;
-
-       CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd, 
-              srxd->gm_source_node);
-
-       GMNAL_GM_LOCK(nal_data);
-       gm_status = gm_global_id_to_node_id(nal_data->gm_port, 
-                                           srxd->gm_source_node, &local_nid);
-       GMNAL_GM_UNLOCK(nal_data);
-       if (gm_status != GM_SUCCESS) {
-               CDEBUG(D_ERROR, "Failed to obtain local id\n");
-               return;
-       }
-       CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
-
-       stxd = gmnal_get_stxd(nal_data, 1);
-       CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
-
-       stxd->nal_data = nal_data;
-       stxd->type = GMNAL_LARGE_MESSAGE_ACK;
-
-       /*
-        *      Copy gmnal_msg_hdr and portals header to the transmit buffer
-        *      Then copy the data in
-        */
-       buffer = stxd->buffer;
-       msghdr = (gmnal_msghdr_t*)buffer;
-
-       /*
-        *      Add in the address of the original stxd from the sender node
-        *      so it knows which thread to notify.
-        */
-       msghdr->magic = GMNAL_MAGIC;
-       msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
-       msghdr->sender_node_id = nal_data->gm_global_nid;
-       msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd;
-       CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
-
-       CDEBUG(D_INFO, "sending\n");
-       stxd->msg_size= sizeof(gmnal_msghdr_t);
-
-
-       CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
-              "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
-              "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, 
-              stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
-       GMNAL_GM_LOCK(nal_data);
-       stxd->gm_priority = GM_LOW_PRIORITY;
-       stxd->gm_target_node = local_nid;
-       gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
-                                     stxd->gm_size, stxd->msg_size, 
-                                     GM_LOW_PRIORITY, local_nid, 
-                                     gmnal_large_tx_ack_callback, 
-                                     (void*)stxd);
-       
-       GMNAL_GM_UNLOCK(nal_data);
-       CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
-               
-       return;
-}
+        LASSERT (!gmni->gmni_shutdown);
+        LASSERT (atomic_read(&gmni->gmni_nthreads) == 0);
 
+       gm_initialize_alarm(&gmni->gmni_alarm);
 
-/*
- *     A callback to indicate the small transmit operation is compete
- *     Check for errors and try to deal with them.
- *     Call lib_finalise to inform the client application that the 
- *     send is complete and the memory can be reused.
- *     Return the stxd when finished with it (returns a send token)
- */
-void 
-gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context, 
-                            gm_status_t status)
-{
-       gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
-       gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
+       for (i = 0; i < num_online_cpus(); i++) {
 
-       if (!stxd) {
-               CDEBUG(D_ERROR, "send completion event for unknown stxd\n");
-               return;
-       }
-       CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
-              stxd, status);
-       gmnal_return_stxd(stxd->nal_data, stxd);
-
-       GMNAL_GM_UNLOCK(nal_data);
-       return;
-}
+                pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
+                if (pid < 0) {
+                        CERROR("rx thread failed to start: %d\n", pid);
+                        gmnal_stop_threads(gmni);
+                        return pid;
+                }
 
-/*
- *     Indicates the large transmit operation is compete.
- *     Called on transmit side (means data has been pulled  by receiver 
- *     or failed).
- *     Call lib_finalise to inform the client application that the send 
- *     is complete, deregister the memory and return the stxd. 
- *     Finally, report the rx buffer that the ack message was delivered in.
- */
-void 
-gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
-{
-       lib_nal_t       *libnal = nal_data->libnal;
-       gmnal_stxd_t    *stxd = NULL;
-       gmnal_msghdr_t  *msghdr = NULL;
-       void            *buffer = NULL;
-       struct  iovec   *iov;
-
-
-       CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
-
-       buffer = srxd->buffer;
-       msghdr = (gmnal_msghdr_t*)buffer;
-       stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
-
-       CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
-
-       lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
-
-       /*
-        *      extract the iovec from the stxd, deregister the memory.
-        *      free the space used to store the iovec
-        */
-       iov = stxd->iov;
-       while(stxd->niov--) {
-               CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
-                      iov->iov_base, iov->iov_len);
-               GMNAL_GM_LOCK(nal_data);
-               gm_deregister_memory(nal_data->gm_port, iov->iov_base, 
-                                    iov->iov_len);
-               GMNAL_GM_UNLOCK(nal_data);
-               iov++;
+                atomic_inc(&gmni->gmni_nthreads);
        }
 
-       /*
-        *      return the send token
-        *      TO DO It is bad to hold onto the send token so long?
-        */
-       gmnal_return_stxd(nal_data, stxd);
-
-
-       /*
-        *      requeue the receive buffer 
-        */
-       gmnal_rx_requeue_buffer(nal_data, srxd);
-       
-
-       return;
+       return 0;
 }