Whamcloud - gitweb
* Applied the last patch in Bug 2306, which changes the portals router/NAL
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
index 96749cd..157dc70 100644 (file)
@@ -33,7 +33,7 @@ EP_STATUSBLK  kqswnal_rpc_failed;
  *  LIB functions follow
  *
  */
-static int
+static ptl_err_t
 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
              size_t len)
 {
@@ -41,10 +41,10 @@ kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
                 nal->ni.nid, len, src_addr, dst_addr );
         memcpy( dst_addr, src_addr, len );
 
-        return (0);
+        return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
               size_t len)
 {
@@ -52,7 +52,7 @@ kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
                 nal->ni.nid, len, src_addr, dst_addr );
         memcpy( dst_addr, src_addr, len );
 
-        return (0);
+        return (PTL_OK);
 }
 
 static void *
@@ -157,13 +157,12 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                           kqswnal_data.kqn_eptxdmahandle,
                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
-
 #endif
         ktx->ktx_nmappedpages = 0;
 }
 
 int
-kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
+kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
 {
         int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
@@ -188,8 +187,16 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
         LASSERT (niov > 0);
         LASSERT (nob > 0);
 
+        /* skip complete frags before 'offset' */
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+
         do {
-                int  fraglen = kiov->kiov_len;
+                int  fraglen = kiov->kiov_len - offset;
 
                 /* nob exactly spans the iovs */
                 LASSERT (fraglen <= nob);
@@ -212,7 +219,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                 /* XXX this is really crap, but we'll have to kmap until
                  * EKC has a page (rather than vaddr) mapping interface */
 
-                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
+                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, page %d, %d total\n",
@@ -257,6 +264,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                 kiov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
@@ -271,7 +279,8 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
 }
 
 int
-kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
+kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
+                    int niov, struct iovec *iov)
 {
         int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
@@ -295,8 +304,16 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
         LASSERT (niov > 0);
         LASSERT (nob > 0);
 
+        /* skip complete frags before offset */
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+        
         do {
-                int  fraglen = iov->iov_len;
+                int  fraglen = iov->iov_len - offset;
                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
 
                 /* nob exactly spans the iovs */
@@ -317,12 +334,12 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
-                        ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
-                        nmapped);
+                       ktx, nfrags, iov->iov_base + offset, fraglen, 
+                       basepage, npages, nmapped);
 
 #if MULTIRAIL_EKC
                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
-                             iov->iov_base, fraglen,
+                             iov->iov_base + offset, fraglen,
                              kqswnal_data.kqn_ep_tx_nmh, basepage,
                              &railmask, &ktx->ktx_frags[nfrags]);
 
@@ -336,7 +353,7 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
 #else
                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
-                                       iov->iov_base, fraglen,
+                                       iov->iov_base + offset, fraglen,
                                        basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
@@ -357,6 +374,7 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
                 iov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
@@ -483,7 +501,7 @@ void
 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 {
         lib_msg_t     *msg;
-        lib_msg_t     *repmsg;
+        lib_msg_t     *repmsg = NULL;
 
         switch (ktx->ktx_state) {
         case KTX_FORWARDING:       /* router asked me to forward this packet */
@@ -493,21 +511,29 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 
         case KTX_SENDING:          /* packet sourced locally */
                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
-                              (lib_msg_t *)ktx->ktx_args[1]);
+                              (lib_msg_t *)ktx->ktx_args[1],
+                              (error == 0) ? PTL_OK : 
+                              (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
                 break;
 
         case KTX_GETTING:          /* Peer has DMA-ed direct? */
                 msg = (lib_msg_t *)ktx->ktx_args[1];
-                repmsg = NULL;
 
-                if (error == 0) 
+                if (error == 0) {
                         repmsg = lib_fake_reply_msg (&kqswnal_lib, 
                                                      ktx->ktx_nid, msg->md);
+                        if (repmsg == NULL)
+                                error = -ENOMEM;
+                }
                 
-                lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg);
-
-                if (repmsg != NULL) 
-                        lib_finalize (&kqswnal_lib, NULL, repmsg);
+                if (error == 0) {
+                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
+                                      msg, PTL_OK);
+                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
+                } else {
+                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
+                                      (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
+                }
                 break;
 
         default:
@@ -533,7 +559,7 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
                         ktx->ktx_nid, status);
 
                 kqswnal_notify_peer_down(ktx);
-                status = -EIO;
+                status = -EHOSTDOWN;
 
         } else if (ktx->ktx_state == KTX_GETTING) {
                 /* RPC completed OK; what did our peer put in the status
@@ -745,10 +771,11 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
 
 int
 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
-                   struct iovec *iov, ptl_kiov_t *kiov, int nob)
+                   struct iovec *iov, ptl_kiov_t *kiov, 
+                   int offset, int nob)
 {
         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
-        char               *buffer = (char *)page_address(krx->krx_pages[0]);
+        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
         int                 rc;
 #if MULTIRAIL_EKC
@@ -779,9 +806,9 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
         /* Map the source data... */
         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
         if (kiov != NULL)
-                rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
+                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
         else
-                rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov);
+                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
 
         if (rc != 0) {
                 CERROR ("Can't map source data: %d\n", rc);
@@ -846,7 +873,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
         return (-ECONNABORTED);
 }
 
-static int
+static ptl_err_t
 kqswnal_sendmsg (nal_cb_t     *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
@@ -857,6 +884,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  unsigned int  payload_niov,
                  struct iovec *payload_iov,
                  ptl_kiov_t   *payload_kiov,
+                 size_t        payload_offset,
                  size_t        payload_nob)
 {
         kqswnal_tx_t      *ktx;
@@ -865,6 +893,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         int                i;
         kqsw_csum_t        csum;
+        int                sumoff;
         int                sumnob;
 #endif
         
@@ -928,9 +957,9 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 }
 
                 /* peer expects RPC completion with GET data */
-                rc = kqswnal_dma_reply (ktx,
-                                        payload_niov, payload_iov, 
-                                        payload_kiov, payload_nob);
+                rc = kqswnal_dma_reply (ktx, payload_niov, 
+                                        payload_iov, payload_kiov, 
+                                        payload_offset, payload_nob);
                 if (rc == 0)
                         return (PTL_OK);
                 
@@ -945,24 +974,41 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
-        for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
+        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
+                LASSERT(i < niov);
                 if (payload_kiov != NULL) {
                         ptl_kiov_t *kiov = &payload_kiov[i];
-                        char       *addr = ((char *)kmap (kiov->kiov_page)) +
-                                           kiov->kiov_offset;
-                        
-                        csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
-                        sumnob -= kiov->kiov_len;
+
+                        if (sumoff >= kiov->kiov_len) {
+                                sumoff -= kiov->kiov_len;
+                        } else {
+                                char *addr = ((char *)kmap (kiov->kiov_page)) +
+                                             kiov->kiov_offset + sumoff;
+                                int   fragnob = kiov->kiov_len - sumoff;
+
+                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+                                sumnob -= fragnob;
+                                sumoff = 0;
+                                kunmap(kiov->kiov_page);
+                        }
                 } else {
                         struct iovec *iov = &payload_iov[i];
 
-                        csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
-                        sumnob -= iov->iov_len;
+                        if (sumoff > iov->iov_len) {
+                                sumoff -= iov->iov_len;
+                        } else {
+                                char *addr = iov->iov_base + sumoff;
+                                int   fragnob = iov->iov_len - sumoff;
+                                
+                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+                                sumnob -= fragnob;
+                                sumoff = 0;
+                        }
                 }
         }
-        memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
+        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
-        
+
         if (kqswnal_data.kqn_optimized_gets &&
             type == PTL_MSG_GET &&              /* doing a GET */
             nid == targetnid) {                 /* not forwarding */
@@ -987,10 +1033,10 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 ktx->ktx_state = KTX_GETTING;
 
                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
-                        rc = kqswnal_map_tx_kiov (ktx, md->length,
+                        rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
                                                   md->md_niov, md->md_iov.kiov);
                 else
-                        rc = kqswnal_map_tx_iov (ktx, md->length,
+                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.iov);
 
                 if (rc < 0) {
@@ -1033,10 +1079,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 if (payload_nob > 0) {
                         if (payload_kiov != NULL)
                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                   payload_niov, payload_kiov, payload_nob);
+                                                   payload_niov, payload_kiov, 
+                                                   payload_offset, payload_nob);
                         else
                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                  payload_niov, payload_iov, payload_nob);
+                                                  payload_niov, payload_iov, 
+                                                  payload_offset, payload_nob);
                 }
         } else {
 
@@ -1052,10 +1100,10 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
 #endif
                 if (payload_kiov != NULL)
-                        rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
+                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
                                                   payload_niov, payload_kiov);
                 else
-                        rc = kqswnal_map_tx_iov (ktx, payload_nob,
+                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_iov);
                 if (rc != 0) {
                         kqswnal_put_idle_tx (ktx);
@@ -1078,7 +1126,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_send (nal_cb_t     *nal,
               void         *private,
               lib_msg_t    *libmsg,
@@ -1088,13 +1136,15 @@ kqswnal_send (nal_cb_t     *nal,
               ptl_pid_t     pid,
               unsigned int  payload_niov,
               struct iovec *payload_iov,
+              size_t        payload_offset,
               size_t        payload_nob)
 {
         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
-                                 payload_niov, payload_iov, NULL, payload_nob));
+                                 payload_niov, payload_iov, NULL, 
+                                 payload_offset, payload_nob));
 }
 
-static int
+static ptl_err_t
 kqswnal_send_pages (nal_cb_t     *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
@@ -1104,10 +1154,12 @@ kqswnal_send_pages (nal_cb_t     *nal,
                     ptl_pid_t     pid,
                     unsigned int  payload_niov,
                     ptl_kiov_t   *payload_kiov,
+                    size_t        payload_offset,
                     size_t        payload_nob)
 {
         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
-                                 payload_niov, NULL, payload_kiov, payload_nob));
+                                 payload_niov, NULL, payload_kiov, 
+                                 payload_offset, payload_nob));
 }
 
 void
@@ -1115,7 +1167,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
         int             rc;
         kqswnal_tx_t   *ktx;
-        struct iovec   *iov = fwd->kprfd_iov;
+        ptl_kiov_t     *kiov = fwd->kprfd_kiov;
         int             niov = fwd->kprfd_niov;
         int             nob = fwd->kprfd_nob;
         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
@@ -1125,11 +1177,9 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         LBUG ();
 #endif
         /* The router wants this NAL to forward a packet */
-        CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
+        CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
                 fwd, nid, niov, nob);
 
-        LASSERT (niov > 0);
-        
         ktx = kqswnal_get_idle_tx (fwd, 0);
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
@@ -1143,44 +1193,44 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 goto failed;
         }
 
-        if (nob > KQSW_NRXMSGBYTES_LARGE) {
-                CERROR ("Can't forward [%p] to "LPX64
-                        ": size %d bigger than max packet size %ld\n",
-                        fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
-                rc = -EMSGSIZE;
-                goto failed;
-        }
+        /* copy hdr into pre-mapped buffer */
+        memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
+        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
 
-        ktx->ktx_port    = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
+        ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
         ktx->ktx_nid     = nid;
         ktx->ktx_state   = KTX_FORWARDING;
         ktx->ktx_args[0] = fwd;
+        ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
 
-        if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
-            nob <= KQSW_TX_BUFFER_SIZE) 
+        if (nob <= KQSW_TX_MAXCONTIG) 
         {
-                /* send from ktx's pre-mapped contiguous buffer? */
-                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
+                /* send payload from ktx's pre-mapped contiguous buffer */
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
-                              0, nob);
+                              0, KQSW_HDR_SIZE + nob);
 #else
                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
-                ktx->ktx_frags[0].Len = nob;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
 #endif
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
+                if (nob > 0)
+                        lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
+                                          niov, kiov, 0, nob);
         }
         else
         {
-                /* zero copy */
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
-                rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
+                /* zero copy payload */
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                 if (rc != 0)
                         goto failed;
-
-                ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
         }
 
         rc = kqswnal_launch (ktx);
@@ -1205,7 +1255,7 @@ kqswnal_fwd_callback (void *arg, int error)
 
         if (error != 0)
         {
-                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
@@ -1231,7 +1281,8 @@ kqswnal_dma_reply_complete (EP_RXD *rxd)
         krx->krx_rpc_reply_needed = 0;
         kqswnal_rx_done (krx);
 
-        lib_finalize (&kqswnal_lib, NULL, msg);
+        lib_finalize (&kqswnal_lib, NULL, msg,
+                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
         kqswnal_put_idle_tx (ktx);
 }
 
@@ -1318,8 +1369,9 @@ kqswnal_requeue_rx (kqswnal_rx_t *krx)
 void
 kqswnal_rx (kqswnal_rx_t *krx)
 {
-        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
+        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
+        int             payload_nob;
         int             nob;
         int             niov;
 
@@ -1345,16 +1397,26 @@ kqswnal_rx (kqswnal_rx_t *krx)
                 return;
         }
 
-        /* NB forwarding may destroy iov; rebuild every time */
-        for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
-        {
-                LASSERT (niov < krx->krx_npages);
-                krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
-                krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
+        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
+        niov = 0;
+        if (nob > 0) {
+                krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
+                krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
+                niov = 1;
+                nob -= PAGE_SIZE - KQSW_HDR_SIZE;
+                
+                while (nob > 0) {
+                        LASSERT (niov < krx->krx_npages);
+                        
+                        krx->krx_kiov[niov].kiov_offset = 0;
+                        krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
+                        niov++;
+                        nob -= PAGE_SIZE;
+                }
         }
 
-        kpr_fwd_init (&krx->krx_fwd, dest_nid,
-                      krx->krx_nob, niov, krx->krx_iov,
+        kpr_fwd_init (&krx->krx_fwd, dest_nid, 
+                      hdr, payload_nob, niov, krx->krx_kiov,
                       kqswnal_fwd_callback, krx);
 
         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
@@ -1418,7 +1480,7 @@ kqswnal_rxhandler(EP_RXD *rxd)
 void
 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 {
-        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                 ", dpid %d, spid %d, type %d\n",
@@ -1461,17 +1523,19 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 }
 #endif
 
-static int
+static ptl_err_t
 kqswnal_recvmsg (nal_cb_t     *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
                  unsigned int  niov,
                  struct iovec *iov,
                  ptl_kiov_t   *kiov,
+                 size_t        offset,
                  size_t        mlen,
                  size_t        rlen)
 {
         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
+        char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
         int           page;
         char         *page_ptr;
         int           page_nob;
@@ -1481,8 +1545,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         kqsw_csum_t   senders_csum;
         kqsw_csum_t   payload_csum = 0;
-        kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
-                                           sizeof(ptl_hdr_t));
+        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
         size_t        csum_len = mlen;
         int           csum_frags = 0;
         int           csum_nob = 0;
@@ -1491,45 +1554,63 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 
         atomic_inc (&csum_counter);
 
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                                sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
+        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
         if (senders_csum != hdr_csum)
                 kqswnal_csum_error (krx, 1);
 #endif
         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
 
-        /* What was actually received must be >= payload.
-         * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
-        LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
+        /* What was actually received must be >= payload. */
         LASSERT (mlen <= rlen);
+        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
+                CERROR("Bad message size: have %d, need %d + %d\n",
+                       krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
+                return (PTL_FAIL);
+        }
 
         /* It must be OK to kmap() if required */
         LASSERT (kiov == NULL || !in_interrupt ());
         /* Either all pages or all vaddrs */
         LASSERT (!(kiov != NULL && iov != NULL));
-        
-        if (mlen != 0)
-        {
+
+        if (mlen != 0) {
                 page     = 0;
-                page_ptr = ((char *) page_address(krx->krx_pages[0])) +
-                        KQSW_HDR_SIZE;
+                page_ptr = buffer + KQSW_HDR_SIZE;
                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
 
                 LASSERT (niov > 0);
+
                 if (kiov != NULL) {
-                        iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
-                        iov_nob = kiov->kiov_len;
+                        /* skip complete frags */
+                        while (offset >= kiov->kiov_len) {
+                                offset -= kiov->kiov_len;
+                                kiov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        iov_ptr = ((char *)kmap (kiov->kiov_page)) +
+                                kiov->kiov_offset + offset;
+                        iov_nob = kiov->kiov_len - offset;
                 } else {
-                        iov_ptr = iov->iov_base;
-                        iov_nob = iov->iov_len;
+                        /* skip complete frags */
+                        while (offset >= iov->iov_len) {
+                                offset -= iov->iov_len;
+                                iov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        iov_ptr = iov->iov_base + offset;
+                        iov_nob = iov->iov_len - offset;
                 }
-
+                
                 for (;;)
                 {
-                        /* We expect the iov to exactly match mlen */
-                        LASSERT (iov_nob <= mlen);
-                        
-                        frag = MIN (page_nob, iov_nob);
+                        frag = mlen;
+                        if (frag > page_nob)
+                                frag = page_nob;
+                        if (frag > iov_nob)
+                                frag = iov_nob;
+
                         memcpy (iov_ptr, page_ptr, frag);
 #if KQSW_CHECKSUM
                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
@@ -1547,7 +1628,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                         {
                                 page++;
                                 LASSERT (page < krx->krx_npages);
-                                page_ptr = page_address(krx->krx_pages[page]);
+                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                 page_nob = PAGE_SIZE;
                         }
 
@@ -1575,8 +1656,8 @@ kqswnal_recvmsg (nal_cb_t     *nal,
         }
 
 #if KQSW_CHECKSUM
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
+        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
+                sizeof(kqsw_csum_t));
 
         if (csum_len != rlen)
                 CERROR("Unable to checksum data in user's buffer\n");
@@ -1588,33 +1669,39 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                        "csum_nob %d\n",
                         hdr_csum, payload_csum, csum_frags, csum_nob);
 #endif
-        lib_finalize(nal, private, libmsg);
+        lib_finalize(nal, private, libmsg, PTL_OK);
 
-        return (rlen);
+        return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_recv(nal_cb_t     *nal,
              void         *private,
              lib_msg_t    *libmsg,
              unsigned int  niov,
              struct iovec *iov,
+             size_t        offset,
              size_t        mlen,
              size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen));
+        return (kqswnal_recvmsg(nal, private, libmsg, 
+                                niov, iov, NULL, 
+                                offset, mlen, rlen));
 }
 
-static int
+static ptl_err_t
 kqswnal_recv_pages (nal_cb_t     *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
                     unsigned int  niov,
                     ptl_kiov_t   *kiov,
+                    size_t        offset,
                     size_t        mlen,
                     size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen));
+        return (kqswnal_recvmsg(nal, private, libmsg, 
+                                niov, NULL, kiov, 
+                                offset, mlen, rlen));
 }
 
 int