Whamcloud - gitweb
- merge 2 weeks of b1_4 fixes onto HEAD
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
index 2bcb853..e1237a8 100644 (file)
 
 #include "qswnal.h"
 
-EP_STATUSBLK  kqswnal_rpc_success;
-EP_STATUSBLK  kqswnal_rpc_failed;
-
 /*
  *  LIB functions follow
  *
  */
-static ptl_err_t
-kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
-             size_t len)
-{
-        CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
-                nal->ni.nid, len, src_addr, dst_addr );
-        memcpy( dst_addr, src_addr, len );
-
-        return (PTL_OK);
-}
-
-static ptl_err_t
-kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
-              size_t len)
-{
-        CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
-                nal->ni.nid, len, src_addr, dst_addr );
-        memcpy( dst_addr, src_addr, len );
-
-        return (PTL_OK);
-}
-
-static void *
-kqswnal_malloc(nal_cb_t *nal, size_t len)
-{
-        void *buf;
-
-        PORTAL_ALLOC(buf, len);
-        return (buf);
-}
-
-static void
-kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
-{
-        PORTAL_FREE(buf, len);
-}
-
-static void
-kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
-{
-        va_list ap;
-        char msg[256];
-
-        va_start (ap, fmt);
-        vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
-        va_end (ap);
-
-        msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
-
-        CDEBUG (D_NET, "%s", msg);
-}
-
-#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
-# error "Can't save/restore irq contexts in different procedures"
-#endif
-
-static void
-kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
-{
-        kqswnal_data_t *data= nal->nal_data;
-
-        spin_lock_irqsave(&data->kqn_statelock, *flags);
-}
-
-
-static void
-kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
-{
-        kqswnal_data_t *data= nal->nal_data;
-
-        spin_unlock_irqrestore(&data->kqn_statelock, *flags);
-}
-
-static void
-kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
-{
-        /* holding kqn_statelock */
-
-        if (eq->event_callback != NULL)
-                eq->event_callback(ev);
-
-        if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
-                wake_up_all(&kqswnal_data.kqn_yield_waitq);
-}
-
 static int
-kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
+kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
 {
-        if (nid == nal->ni.nid)
+        if (nid == nal->libnal_ni.ni_pid.nid)
                 *dist = 0;                      /* it's me */
         else if (kqswnal_nid2elanid (nid) >= 0)
                 *dist = 1;                      /* it's my peer */
@@ -212,11 +124,12 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_
         do {
                 int  fraglen = kiov->kiov_len - offset;
 
-                /* nob exactly spans the iovs */
-                LASSERT (fraglen <= nob);
-                /* each frag fits in a page */
+                /* each page frag is contained in one page */
                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
 
+                if (fraglen > nob)
+                        fraglen = nob;
+
                 nmapped++;
                 if (nmapped > maxmapped) {
                         CERROR("Can't map message in %d pages (max %d)\n",
@@ -328,11 +241,12 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
         
         do {
                 int  fraglen = iov->iov_len - offset;
-                long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
-
-                /* nob exactly spans the iovs */
-                LASSERT (fraglen <= nob);
+                long npages;
                 
+                if (fraglen > nob)
+                        fraglen = nob;
+                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
+
                 nmapped += npages;
                 if (nmapped > maxmapped) {
                         CERROR("Can't map message in %d pages (max %d)\n",
@@ -519,40 +433,29 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
 void
 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 {
-        lib_msg_t     *msg;
-        lib_msg_t     *repmsg = NULL;
-
         switch (ktx->ktx_state) {
         case KTX_FORWARDING:       /* router asked me to forward this packet */
                 kpr_fwd_done (&kqswnal_data.kqn_router,
                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
                 break;
 
-        case KTX_SENDING:          /* packet sourced locally */
-                lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
+        case KTX_RDMAING:          /* optimized GET/PUT handled */
+        case KTX_PUTTING:          /* optimized PUT sent */
+        case KTX_SENDING:          /* normal send */
+                lib_finalize (&kqswnal_lib, NULL,
                               (lib_msg_t *)ktx->ktx_args[1],
-                              (error == 0) ? PTL_OK : 
-                              (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
+                              (error == 0) ? PTL_OK : PTL_FAIL);
                 break;
 
-        case KTX_GETTING:          /* Peer has DMA-ed direct? */
-                msg = (lib_msg_t *)ktx->ktx_args[1];
-
-                if (error == 0) {
-                        repmsg = lib_create_reply_msg (&kqswnal_lib, 
-                                                       ktx->ktx_nid, msg);
-                        if (repmsg == NULL)
-                                error = -ENOMEM;
-                }
-                
-                if (error == 0) {
-                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
-                                      msg, PTL_OK);
-                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
-                } else {
-                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
-                                      (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
-                }
+        case KTX_GETTING:          /* optimized GET sent & REPLY received */
+                /* Complete the GET with success since we can't avoid
+                 * delivering a REPLY event; we committed to it when we
+                 * launched the GET */
+                lib_finalize (&kqswnal_lib, NULL, 
+                              (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
+                lib_finalize (&kqswnal_lib, NULL,
+                              (lib_msg_t *)ktx->ktx_args[2],
+                              (error == 0) ? PTL_OK : PTL_FAIL);
                 break;
 
         default:
@@ -580,16 +483,27 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
                 kqswnal_notify_peer_down(ktx);
                 status = -EHOSTDOWN;
 
-        } else if (ktx->ktx_state == KTX_GETTING) {
-                /* RPC completed OK; what did our peer put in the status
+        } else switch (ktx->ktx_state) {
+
+        case KTX_GETTING:
+        case KTX_PUTTING:
+                /* RPC completed OK; but what did our peer put in the status
                  * block? */
 #if MULTIRAIL_EKC
                 status = ep_txd_statusblk(txd)->Data[0];
 #else
                 status = ep_txd_statusblk(txd)->Status;
 #endif
-        } else {
+                break;
+                
+        case KTX_FORWARDING:
+        case KTX_SENDING:
                 status = 0;
+                break;
+                
+        default:
+                LBUG();
+                break;
         }
 
         kqswnal_tx_done (ktx, status);
@@ -610,21 +524,20 @@ kqswnal_launch (kqswnal_tx_t *ktx)
                 return (-ESHUTDOWN);
 
         LASSERT (dest >= 0);                    /* must be a peer */
-        if (ktx->ktx_state == KTX_GETTING) {
-                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
-                 * other frags are the GET sink which we obviously don't
-                 * send here :) */
-#if MULTIRAIL_EKC
+
+        switch (ktx->ktx_state) {
+        case KTX_GETTING:
+        case KTX_PUTTING:
+                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
+                 * The other frags are the payload, awaiting RDMA */
                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                      ktx->ktx_port, attr,
                                      kqswnal_txhandler, ktx,
                                      NULL, ktx->ktx_frags, 1);
-#else
-                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
-                                     ktx->ktx_port, attr, kqswnal_txhandler,
-                                     ktx, NULL, ktx->ktx_frags, 1);
-#endif
-        } else {
+                break;
+
+        case KTX_FORWARDING:
+        case KTX_SENDING:
 #if MULTIRAIL_EKC
                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                          ktx->ktx_port, attr,
@@ -636,6 +549,12 @@ kqswnal_launch (kqswnal_tx_t *ktx)
                                        kqswnal_txhandler, ktx, 
                                        ktx->ktx_frags, ktx->ktx_nfrag);
 #endif
+                break;
+                
+        default:
+                LBUG();
+                rc = -EINVAL;                   /* no compiler warning please */
+                break;
         }
 
         switch (rc) {
@@ -658,6 +577,7 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         }
 }
 
+#if 0
 static char *
 hdr_type_string (ptl_hdr_t *hdr)
 {
@@ -726,6 +646,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
         }
 
 }                               /* end of print_hdr() */
+#endif
 
 #if !MULTIRAIL_EKC
 void
@@ -787,114 +708,291 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
         CERROR ("DATAVEC too small\n");
         return (-E2BIG);
 }
+#else
+int
+kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
+                    int nrfrag, EP_NMD *rfrag)
+{
+        int  i;
+
+        if (nlfrag != nrfrag) {
+                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+                       nlfrag, nrfrag);
+                return (-EINVAL);
+        }
+        
+        for (i = 0; i < nlfrag; i++)
+                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
+                        CERROR("Can't cope with unequal frags %d(%d):"
+                               " %d local %d remote\n",
+                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
+                        return (-EINVAL);
+                }
+        
+        return (0);
+}
 #endif
 
-int
-kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
-                   struct iovec *iov, ptl_kiov_t *kiov, 
-                   int offset, int nob)
+kqswnal_remotemd_t *
+kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
 {
-        kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
+        ptl_hdr_t          *hdr = (ptl_hdr_t *)buffer;
         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
-        int                 rc;
-#if MULTIRAIL_EKC
-        int                 i;
-#else
-        EP_DATAVEC          datav[EP_MAXFRAG];
-        int                 ndatav;
-#endif
-        LASSERT (krx->krx_rpc_reply_needed);
-        LASSERT ((iov == NULL) != (kiov == NULL));
+        ptl_nid_t           nid = kqswnal_rx_nid(krx);
+
+        /* Note (1) lib_parse has already flipped hdr.
+         *      (2) RDMA addresses are sent in native endian-ness.  When
+         *      EKC copes with different endian nodes, I'll fix this (and
+         *      eat my hat :) */
+
+        LASSERT (krx->krx_nob >= sizeof(*hdr));
+
+        if (hdr->type != type) {
+                CERROR ("Unexpected optimized get/put type %d (%d expected)"
+                        "from "LPX64"\n", hdr->type, type, nid);
+                return (NULL);
+        }
+        
+        if (hdr->src_nid != nid) {
+                CERROR ("Unexpected optimized get/put source NID "
+                        LPX64" from "LPX64"\n", hdr->src_nid, nid);
+                return (NULL);
+        }
+
+        LASSERT (nid == expected_nid);
 
-        /* see kqswnal_sendmsg comment regarding endian-ness */
         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                 /* msg too small to discover rmd size */
                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
-                return (-EINVAL);
+                return (NULL);
         }
-        
+
         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                 /* rmd doesn't fit in the incoming message */
                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                         krx->krx_nob, rmd->kqrmd_nfrag,
                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
-                return (-EINVAL);
+                return (NULL);
         }
 
-        /* Map the source data... */
+        return (rmd);
+}
+
+void
+kqswnal_rdma_store_complete (EP_RXD *rxd) 
+{
+        int           status = ep_rxd_status(rxd);
+        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+        LASSERT (ktx->ktx_state == KTX_RDMAING);
+        LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
+
+        krx->krx_rpc_reply_needed = 0;
+        kqswnal_rx_decref (krx);
+
+        /* free ktx & finalize() its lib_msg_t */
+        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
+}
+
+void
+kqswnal_rdma_fetch_complete (EP_RXD *rxd) 
+{
+        /* Completed fetching the PUT data */
+        int           status = ep_rxd_status(rxd);
+        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        unsigned long flags;
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+        LASSERT (ktx->ktx_state == KTX_RDMAING);
+        LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
+
+        /* Set the RPC completion status */
+        status = (status == EP_SUCCESS) ? 0 : -ECONNABORTED;
+        krx->krx_rpc_reply_status = status;
+
+        /* free ktx & finalize() its lib_msg_t */
+        kqswnal_tx_done(ktx, status);
+
+        if (!in_interrupt()) {
+                /* OK to complete the RPC now (iff I had the last ref) */
+                kqswnal_rx_decref (krx);
+                return;
+        }
+
+        LASSERT (krx->krx_state == KRX_PARSE);
+        krx->krx_state = KRX_COMPLETING;
+
+        /* Complete the RPC in thread context */
+        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+
+        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
+        wake_up (&kqswnal_data.kqn_sched_waitq);
+
+        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+}
+
+int
+kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
+              int niov, struct iovec *iov, ptl_kiov_t *kiov,
+              size_t offset, size_t len)
+{
+        kqswnal_remotemd_t *rmd;
+        kqswnal_tx_t       *ktx;
+        int                 eprc;
+        int                 rc;
+#if !MULTIRAIL_EKC
+        EP_DATAVEC          datav[EP_MAXFRAG];
+        int                 ndatav;
+#endif
+
+        LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
+        /* Not both mapped and paged payload */
+        LASSERT (iov == NULL || kiov == NULL);
+        /* RPC completes with failure by default */
+        LASSERT (krx->krx_rpc_reply_needed);
+        LASSERT (krx->krx_rpc_reply_status != 0);
+
+        rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
+        if (rmd == NULL)
+                return (-EPROTO);
+
+        if (len == 0) {
+                /* data got truncated to nothing. */
+                lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
+                /* Let kqswnal_rx_done() complete the RPC with success */
+                krx->krx_rpc_reply_status = 0;
+                return (0);
+        }
+        
+        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
+           actually sending a portals message with it */
+        ktx = kqswnal_get_idle_tx(NULL, 0);
+        if (ktx == NULL) {
+                CERROR ("Can't get txd for RDMA with "LPX64"\n",
+                        libmsg->ev.initiator.nid);
+                return (-ENOMEM);
+        }
+
+        ktx->ktx_state   = KTX_RDMAING;
+        ktx->ktx_nid     = libmsg->ev.initiator.nid;
+        ktx->ktx_args[0] = krx;
+        ktx->ktx_args[1] = libmsg;
+
+        /* Start mapping at offset 0 (we're not mapping any headers) */
         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
+        
         if (kiov != NULL)
-                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
+                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
         else
-                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
+                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
 
         if (rc != 0) {
-                CERROR ("Can't map source data: %d\n", rc);
-                return (rc);
+                CERROR ("Can't map local RDMA data: %d\n", rc);
+                goto out;
         }
 
 #if MULTIRAIL_EKC
-        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
-                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
-                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
-                return (-EINVAL);
+        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
+                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        if (rc != 0) {
+                CERROR ("Incompatible RDMA descriptors\n");
+                goto out;
         }
-        
-        for (i = 0; i < rmd->kqrmd_nfrag; i++)
-                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
-                        CERROR("Can't cope with unequal frags %d(%d):"
-                               " %d local %d remote\n",
-                               i, rmd->kqrmd_nfrag, 
-                               ktx->ktx_frags[i].nmd_len, 
-                               rmd->kqrmd_frag[i].nmd_len);
-                        return (-EINVAL);
-                }
 #else
-        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
-                                      ktx->ktx_nfrag, ktx->ktx_frags,
-                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        switch (type) {
+        default:
+                LBUG();
+
+        case PTL_MSG_GET:
+                ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+                                             ktx->ktx_nfrag, ktx->ktx_frags,
+                                             rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+                break;
+
+        case PTL_MSG_PUT:
+                ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+                                             rmd->kqrmd_nfrag, rmd->kqrmd_frag,
+                                             ktx->ktx_nfrag, ktx->ktx_frags);
+                break;
+        }
+                
         if (ndatav < 0) {
                 CERROR ("Can't create datavec: %d\n", ndatav);
-                return (ndatav);
+                rc = ndatav;
+                goto out;
         }
 #endif
 
-        /* Our caller will start to race with kqswnal_dma_reply_complete... */
-        LASSERT (atomic_read (&krx->krx_refcount) == 1);
-        atomic_set (&krx->krx_refcount, 2);
+        LASSERT (atomic_read(&krx->krx_refcount) > 0);
+        /* Take an extra ref for the completion callback */
+        atomic_inc(&krx->krx_refcount);
 
-#if MULTIRAIL_EKC
-        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
-                             &kqswnal_rpc_success,
-                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
-        if (rc == EP_SUCCESS)
-                return (0);
+        switch (type) {
+        default:
+                LBUG();
 
-        /* Well we tried... */
-        krx->krx_rpc_reply_needed = 0;
+        case PTL_MSG_GET:
+#if MULTIRAIL_EKC
+                eprc = ep_complete_rpc(krx->krx_rxd, 
+                                       kqswnal_rdma_store_complete, ktx, 
+                                       &kqswnal_data.kqn_rpc_success,
+                                       ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
 #else
-        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
-                              &kqswnal_rpc_success, datav, ndatav);
-        if (rc == EP_SUCCESS)
-                return (0);
-
-        /* "old" EKC destroys rxd on failed completion */
-        krx->krx_rxd = NULL;
+                eprc = ep_complete_rpc (krx->krx_rxd, 
+                                        kqswnal_rdma_store_complete, ktx,
+                                        &kqswnal_data.kqn_rpc_success, 
+                                        datav, ndatav);
+                if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
+                        krx->krx_rxd = NULL;
 #endif
+                if (eprc != EP_SUCCESS) {
+                        CERROR("can't complete RPC: %d\n", eprc);
+                        /* don't re-attempt RPC completion */
+                        krx->krx_rpc_reply_needed = 0;
+                        rc = -ECONNABORTED;
+                }
+                break;
+                
+        case PTL_MSG_PUT:
+#if MULTIRAIL_EKC
+                eprc = ep_rpc_get (krx->krx_rxd, 
+                                   kqswnal_rdma_fetch_complete, ktx,
+                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
+                eprc = ep_rpc_get (krx->krx_rxd,
+                                   kqswnal_rdma_fetch_complete, ktx,
+                                   datav, ndatav);
+#endif
+                if (eprc != EP_SUCCESS) {
+                        CERROR("ep_rpc_get failed: %d\n", eprc);
+                        rc = -ECONNABORTED;
+                }
+                break;
+        }
 
-        CERROR("can't complete RPC: %d\n", rc);
-
-        /* reset refcount back to 1: we're not going to be racing with
-         * kqswnal_dma_reply_complete. */
-        atomic_set (&krx->krx_refcount, 1);
+ out:
+        if (rc != 0) {
+                kqswnal_rx_decref(krx);                 /* drop callback's ref */
+                kqswnal_put_idle_tx (ktx);
+        }
 
-        return (-ECONNABORTED);
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
+        return (rc);
 }
 
 static ptl_err_t
-kqswnal_sendmsg (nal_cb_t     *nal,
+kqswnal_sendmsg (lib_nal_t    *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
                  ptl_hdr_t    *hdr,
@@ -916,6 +1014,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         int                sumoff;
         int                sumnob;
 #endif
+        /* NB 1. hdr is in network byte order */
+        /*    2. 'private' depends on the message type */
         
         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
                " pid %u\n", payload_nob, payload_niov, nid, pid);
@@ -934,6 +1034,15 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_FAIL);
         }
 
+        if (type == PTL_MSG_REPLY &&            /* can I look in 'private' */
+            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
+                /* Must be a REPLY for an optimized GET */
+                rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
+                                   payload_niov, payload_iov, payload_kiov, 
+                                   payload_offset, payload_nob);
+                return ((rc == 0) ? PTL_OK : PTL_FAIL);
+        }
+
         targetnid = nid;
         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
@@ -956,35 +1065,16 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                           type == PTL_MSG_REPLY ||
                                           in_interrupt()));
         if (ktx == NULL) {
-                kqswnal_cerror_hdr (hdr);
+                CERROR ("Can't get txd for msg type %d for "LPX64"\n",
+                        type, libmsg->ev.initiator.nid);
                 return (PTL_NO_SPACE);
         }
 
+        ktx->ktx_state   = KTX_SENDING;
         ktx->ktx_nid     = targetnid;
         ktx->ktx_args[0] = private;
         ktx->ktx_args[1] = libmsg;
-
-        if (type == PTL_MSG_REPLY &&
-            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
-                if (nid != targetnid ||
-                    kqswnal_nid2elanid(nid) != 
-                    ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
-                        CERROR("Optimized reply nid conflict: "
-                               "nid "LPX64" via "LPX64" elanID %d\n",
-                               nid, targetnid,
-                               ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
-                        rc = -EINVAL;
-                        goto out;
-                }
-
-                /* peer expects RPC completion with GET data */
-                rc = kqswnal_dma_reply (ktx, payload_niov, 
-                                        payload_iov, payload_kiov, 
-                                        payload_offset, payload_nob);
-                if (rc != 0)
-                        CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
-                goto out;
-        }
+        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
@@ -1027,28 +1117,31 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
 
-        if (kqswnal_tunables.kqn_optimized_gets &&
-            type == PTL_MSG_GET &&              /* doing a GET */
-            nid == targetnid) {                 /* not forwarding */
+        /* The first frag will be the pre-mapped buffer for (at least) the
+         * portals header. */
+        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+
+        if (nid == targetnid &&                 /* not forwarding */
+            ((type == PTL_MSG_GET &&            /* optimize GET? */
+              kqswnal_tunables.kqn_optimized_gets != 0 &&
+              NTOH__u32(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
+             (type == PTL_MSG_PUT &&            /* optimize PUT? */
+              kqswnal_tunables.kqn_optimized_puts != 0 &&
+              payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
                 lib_md_t           *md = libmsg->md;
                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
                 
-                /* Optimised path: I send over the Elan vaddrs of the get
-                 * sink buffers, and my peer DMAs directly into them.
+                /* Optimised path: I send over the Elan vaddrs of the local
+                 * buffers, and my peer DMAs directly to/from them.
                  *
                  * First I set up ktx as if it was going to send this
                  * payload, (it needs to map it anyway).  This fills
                  * ktx_frags[1] and onward with the network addresses
                  * of the GET sink frags.  I copy these into ktx_buffer,
-                 * immediately after the header, and send that as my GET
-                 * message.
-                 *
-                 * Note that the addresses are sent in native endian-ness.
-                 * When EKC copes with different endian nodes, I'll fix
-                 * this (and eat my hat :) */
+                 * immediately after the header, and send that as my
+                 * message. */
 
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_state = KTX_GETTING;
+                ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
 
                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
@@ -1078,12 +1171,21 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
 #endif
+                if (type == PTL_MSG_GET) {
+                        /* Allocate reply message now while I'm in thread context */
+                        ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
+                                                                 nid, libmsg);
+                        if (ktx->ktx_args[2] == NULL)
+                                goto out;
+
+                        /* NB finalizing the REPLY message is my
+                         * responsibility now, whatever happens. */
+                }
+                
         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
 
                 /* small message: single frag copied into the pre-mapped buffer */
 
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_state = KTX_SENDING;
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                               0, KQSW_HDR_SIZE + payload_nob);
@@ -1105,8 +1207,6 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 
                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
 
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_state = KTX_SENDING;
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                               0, KQSW_HDR_SIZE);
@@ -1135,15 +1235,29 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                rc == 0 ? "Sent" : "Failed to send",
                payload_nob, nid, targetnid, rc);
 
-        if (rc != 0)
+        if (rc != 0) {
+                if (ktx->ktx_state == KTX_GETTING &&
+                    ktx->ktx_args[2] != NULL) {
+                        /* We committed to reply, but there was a problem
+                         * launching the GET.  We can't avoid delivering a
+                         * REPLY event since we committed above, so we
+                         * pretend the GET succeeded but the REPLY
+                         * failed. */
+                        rc = 0;
+                        lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
+                        lib_finalize (&kqswnal_lib, private,
+                                      (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
+                }
+                
                 kqswnal_put_idle_tx (ktx);
-
+        }
+        
         atomic_dec(&kqswnal_data.kqn_pending_txs);
         return (rc == 0 ? PTL_OK : PTL_FAIL);
 }
 
 static ptl_err_t
-kqswnal_send (nal_cb_t     *nal,
+kqswnal_send (lib_nal_t    *nal,
               void         *private,
               lib_msg_t    *libmsg,
               ptl_hdr_t    *hdr,
@@ -1161,7 +1275,7 @@ kqswnal_send (nal_cb_t     *nal,
 }
 
 static ptl_err_t
-kqswnal_send_pages (nal_cb_t     *nal,
+kqswnal_send_pages (lib_nal_t    *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
                     ptl_hdr_t    *hdr,
@@ -1200,7 +1314,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
 
-        if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
+        if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
                 nid = fwd->kprfd_target_nid;    /* target is final dest */
 
         if (kqswnal_nid2elanid (nid) < 0) {
@@ -1254,9 +1368,8 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (rc != 0) {
                 CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
 
-                kqswnal_put_idle_tx (ktx);
                 /* complete now (with failure) */
-                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+                kqswnal_tx_done (ktx, rc);
         }
 
         atomic_dec(&kqswnal_data.kqn_pending_txs);
@@ -1277,29 +1390,48 @@ kqswnal_fwd_callback (void *arg, int error)
                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
         }
 
-        kqswnal_requeue_rx (krx);
+        LASSERT (atomic_read(&krx->krx_refcount) == 1);
+        kqswnal_rx_decref (krx);
 }
 
 void
-kqswnal_dma_reply_complete (EP_RXD *rxd) 
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
 {
-        int           status = ep_rxd_status(rxd);
-        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
-        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
-        lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
-        
-        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
-               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+        LASSERT (!krx->krx_rpc_reply_needed);
 
-        LASSERT (krx->krx_rxd == rxd);
-        LASSERT (krx->krx_rpc_reply_needed);
+        krx->krx_state = KRX_POSTED;
 
-        krx->krx_rpc_reply_needed = 0;
-        kqswnal_rx_done (krx);
+#if MULTIRAIL_EKC
+        if (kqswnal_data.kqn_shuttingdown) {
+                /* free EKC rxd on shutdown */
+                ep_complete_receive(krx->krx_rxd);
+        } else {
+                /* repost receive */
+                ep_requeue_receive(krx->krx_rxd, 
+                                   kqswnal_rxhandler, krx,
+                                   &krx->krx_elanbuffer, 0);
+        }
+#else                
+        if (kqswnal_data.kqn_shuttingdown)
+                return;
 
-        lib_finalize (&kqswnal_lib, NULL, msg,
-                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
-        kqswnal_put_idle_tx (ktx);
+        if (krx->krx_rxd == NULL) {
+                /* We had a failed ep_complete_rpc() which nukes the
+                 * descriptor in "old" EKC */
+                int eprc = ep_queue_receive(krx->krx_eprx, 
+                                            kqswnal_rxhandler, krx,
+                                            krx->krx_elanbuffer, 
+                                            krx->krx_npages * PAGE_SIZE, 0);
+                LASSERT (eprc == EP_SUCCESS);
+                /* We don't handle failure here; it's incredibly rare
+                 * (never reported?) and only happens with "old" EKC */
+        } else {
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   krx->krx_elanbuffer, 
+                                   krx->krx_npages * PAGE_SIZE);
+        }
+#endif
 }
 
 void
@@ -1319,71 +1451,45 @@ kqswnal_rpc_complete (EP_RXD *rxd)
 }
 
 void
-kqswnal_requeue_rx (kqswnal_rx_t *krx) 
+kqswnal_rx_done (kqswnal_rx_t *krx) 
 {
-        int   rc;
+        int           rc;
+        EP_STATUSBLK *sblk;
 
         LASSERT (atomic_read(&krx->krx_refcount) == 0);
 
         if (krx->krx_rpc_reply_needed) {
+                /* We've not completed the peer's RPC yet... */
+                sblk = (krx->krx_rpc_reply_status == 0) ? 
+                       &kqswnal_data.kqn_rpc_success : 
+                       &kqswnal_data.kqn_rpc_failed;
 
-                /* We failed to complete the peer's optimized GET (e.g. we
-                 * couldn't map the source buffers).  We complete the
-                 * peer's EKC rpc now with failure. */
+                LASSERT (!in_interrupt());
 #if MULTIRAIL_EKC
-                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
-                                     &kqswnal_rpc_failed, NULL, NULL, 0);
+                rc = ep_complete_rpc(krx->krx_rxd, 
+                                     kqswnal_rpc_complete, krx,
+                                     sblk, NULL, NULL, 0);
                 if (rc == EP_SUCCESS)
                         return;
-                
-                CERROR("can't complete RPC: %d\n", rc);
 #else
-                if (krx->krx_rxd != NULL) {
-                        /* We didn't try (and fail) to complete earlier... */
-                        rc = ep_complete_rpc(krx->krx_rxd, 
-                                             kqswnal_rpc_complete, krx,
-                                             &kqswnal_rpc_failed, NULL, 0);
-                        if (rc == EP_SUCCESS)
-                                return;
-
-                        CERROR("can't complete RPC: %d\n", rc);
-                }
-                
-                /* NB the old ep_complete_rpc() frees rxd on failure, so we
-                 * have to requeue from scratch here, unless we're shutting
-                 * down */
-                if (kqswnal_data.kqn_shuttingdown)
+                rc = ep_complete_rpc(krx->krx_rxd, 
+                                     kqswnal_rpc_complete, krx,
+                                     sblk, NULL, 0);
+                if (rc == EP_SUCCESS)
                         return;
 
-                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
-                                      krx->krx_elanbuffer, 
-                                      krx->krx_npages * PAGE_SIZE, 0);
-                LASSERT (rc == EP_SUCCESS);
-                /* We don't handle failure here; it's incredibly rare
-                 * (never reported?) and only happens with "old" EKC */
-                return;
+                /* "old" EKC destroys rxd on failed completion */
+                krx->krx_rxd = NULL;
 #endif
+                CERROR("can't complete RPC: %d\n", rc);
+                krx->krx_rpc_reply_needed = 0;
         }
 
-#if MULTIRAIL_EKC
-        if (kqswnal_data.kqn_shuttingdown) {
-                /* free EKC rxd on shutdown */
-                ep_complete_receive(krx->krx_rxd);
-        } else {
-                /* repost receive */
-                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
-                                   &krx->krx_elanbuffer, 0);
-        }
-#else                
-        /* don't actually requeue on shutdown */
-        if (!kqswnal_data.kqn_shuttingdown) 
-                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
-                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
-#endif
+        kqswnal_requeue_rx(krx);
 }
         
 void
-kqswnal_rx (kqswnal_rx_t *krx)
+kqswnal_parse (kqswnal_rx_t *krx)
 {
         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
@@ -1391,25 +1497,28 @@ kqswnal_rx (kqswnal_rx_t *krx)
         int             nob;
         int             niov;
 
-        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+        LASSERT (atomic_read(&krx->krx_refcount) == 1);
+
+        if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
+                /* I ignore parse errors since I'm not consuming a byte
+                 * stream */
+                (void)lib_parse (&kqswnal_lib, hdr, krx);
 
-        if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
-                atomic_set(&krx->krx_refcount, 1);
-                lib_parse (&kqswnal_lib, hdr, krx);
-                kqswnal_rx_done(krx);
+                /* Drop my ref; any RDMA activity takes an additional ref */
+                kqswnal_rx_decref(krx);
                 return;
         }
 
 #if KQSW_CHECKSUM
-        CERROR ("checksums for forwarded packets not implemented\n");
-        LBUG ();
+        LASSERTF (0, "checksums for forwarded packets not implemented\n");
 #endif
+
         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
         {
                 CERROR("dropping packet from "LPX64" for "LPX64
                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
 
-                kqswnal_requeue_rx (krx);
+                kqswnal_rx_decref (krx);
                 return;
         }
 
@@ -1451,7 +1560,9 @@ kqswnal_rxhandler(EP_RXD *rxd)
                rxd, krx, nob, status);
 
         LASSERT (krx != NULL);
-
+        LASSERT (krx->krx_state = KRX_POSTED);
+        
+        krx->krx_state = KRX_PARSE;
         krx->krx_rxd = rxd;
         krx->krx_nob = nob;
 #if MULTIRAIL_EKC
@@ -1459,7 +1570,10 @@ kqswnal_rxhandler(EP_RXD *rxd)
 #else
         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
 #endif
-        
+        /* Default to failure if an RPC reply is requested but not handled */
+        krx->krx_rpc_reply_status = -EPROTO;
+        atomic_set (&krx->krx_refcount, 1);
+
         /* must receive a whole header to be able to parse */
         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
         {
@@ -1475,12 +1589,12 @@ kqswnal_rxhandler(EP_RXD *rxd)
                         CERROR("receive status failed with status %d nob %d\n",
                                ep_rxd_status(rxd), nob);
 #endif
-                kqswnal_requeue_rx (krx);
+                kqswnal_rx_decref(krx);
                 return;
         }
 
         if (!in_interrupt()) {
-                kqswnal_rx (krx);
+                kqswnal_parse(krx);
                 return;
         }
 
@@ -1540,7 +1654,7 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 #endif
 
 static ptl_err_t
-kqswnal_recvmsg (nal_cb_t     *nal,
+kqswnal_recvmsg (lib_nal_t    *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
                  unsigned int  niov,
@@ -1552,16 +1666,18 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 {
         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
         char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
+        ptl_hdr_t    *hdr = (ptl_hdr_t *)buffer;
         int           page;
         char         *page_ptr;
         int           page_nob;
         char         *iov_ptr;
         int           iov_nob;
         int           frag;
+        int           rc;
 #if KQSW_CHECKSUM
         kqsw_csum_t   senders_csum;
         kqsw_csum_t   payload_csum = 0;
-        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
+        kqsw_csum_t   hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
         size_t        csum_len = mlen;
         int           csum_frags = 0;
         int           csum_nob = 0;
@@ -1574,8 +1690,18 @@ kqswnal_recvmsg (nal_cb_t     *nal,
         if (senders_csum != hdr_csum)
                 kqswnal_csum_error (krx, 1);
 #endif
+        /* NB lib_parse() has already flipped *hdr */
+
         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
 
+        if (krx->krx_rpc_reply_needed &&
+            hdr->type == PTL_MSG_PUT) {
+                /* This must be an optimized PUT */
+                rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
+                                   niov, iov, kiov, offset, mlen);
+                return (rc == 0 ? PTL_OK : PTL_FAIL);
+        }
+
         /* What was actually received must be >= payload. */
         LASSERT (mlen <= rlen);
         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
@@ -1691,7 +1817,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 }
 
 static ptl_err_t
-kqswnal_recv(nal_cb_t     *nal,
+kqswnal_recv(lib_nal_t    *nal,
              void         *private,
              lib_msg_t    *libmsg,
              unsigned int  niov,
@@ -1706,7 +1832,7 @@ kqswnal_recv(nal_cb_t     *nal,
 }
 
 static ptl_err_t
-kqswnal_recv_pages (nal_cb_t     *nal,
+kqswnal_recv_pages (lib_nal_t    *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
                     unsigned int  niov,
@@ -1766,7 +1892,18 @@ kqswnal_scheduler (void *arg)
                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                flags);
 
-                        kqswnal_rx (krx);
+                        switch (krx->krx_state) {
+                        case KRX_PARSE:
+                                kqswnal_parse (krx);
+                                break;
+                        case KRX_COMPLETING:
+                                /* Drop last ref to reply to RPC and requeue */
+                                LASSERT (krx->krx_rpc_reply_needed);
+                                kqswnal_rx_decref (krx);
+                                break;
+                        default:
+                                LBUG();
+                        }
 
                         did_something = 1;
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
@@ -1835,20 +1972,12 @@ kqswnal_scheduler (void *arg)
         return (0);
 }
 
-nal_cb_t kqswnal_lib =
+lib_nal_t kqswnal_lib =
 {
-        nal_data:       &kqswnal_data,         /* NAL private data */
-        cb_send:        kqswnal_send,
-        cb_send_pages:  kqswnal_send_pages,
-        cb_recv:        kqswnal_recv,
-        cb_recv_pages:  kqswnal_recv_pages,
-        cb_read:        kqswnal_read,
-        cb_write:       kqswnal_write,
-        cb_malloc:      kqswnal_malloc,
-        cb_free:        kqswnal_free,
-        cb_printf:      kqswnal_printf,
-        cb_cli:         kqswnal_cli,
-        cb_sti:         kqswnal_sti,
-        cb_callback:    kqswnal_callback,
-        cb_dist:        kqswnal_dist
+        libnal_data:       &kqswnal_data,         /* NAL private data */
+        libnal_send:        kqswnal_send,
+        libnal_send_pages:  kqswnal_send_pages,
+        libnal_recv:        kqswnal_recv,
+        libnal_recv_pages:  kqswnal_recv_pages,
+        libnal_dist:        kqswnal_dist
 };