Whamcloud - gitweb
land b_eq on HEAD
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
index 96749cd..4c2bd6a 100644 (file)
@@ -33,7 +33,7 @@ EP_STATUSBLK  kqswnal_rpc_failed;
  *  LIB functions follow
  *
  */
-static int
+static ptl_err_t
 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
              size_t len)
 {
@@ -41,10 +41,10 @@ kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
                 nal->ni.nid, len, src_addr, dst_addr );
         memcpy( dst_addr, src_addr, len );
 
-        return (0);
+        return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
               size_t len)
 {
@@ -52,7 +52,7 @@ kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
                 nal->ni.nid, len, src_addr, dst_addr );
         memcpy( dst_addr, src_addr, len );
 
-        return (0);
+        return (PTL_OK);
 }
 
 static void *
@@ -157,13 +157,12 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                           kqswnal_data.kqn_eptxdmahandle,
                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
-
 #endif
         ktx->ktx_nmappedpages = 0;
 }
 
 int
-kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
+kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
 {
         int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
@@ -188,8 +187,16 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
         LASSERT (niov > 0);
         LASSERT (nob > 0);
 
+        /* skip complete frags before 'offset' */
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+
         do {
-                int  fraglen = kiov->kiov_len;
+                int  fraglen = kiov->kiov_len - offset;
 
                 /* nob exactly spans the iovs */
                 LASSERT (fraglen <= nob);
@@ -212,7 +219,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                 /* XXX this is really crap, but we'll have to kmap until
                  * EKC has a page (rather than vaddr) mapping interface */
 
-                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
+                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, page %d, %d total\n",
@@ -257,6 +264,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                 kiov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
@@ -271,7 +279,8 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
 }
 
 int
-kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
+kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
+                    int niov, struct iovec *iov)
 {
         int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
@@ -295,8 +304,16 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
         LASSERT (niov > 0);
         LASSERT (nob > 0);
 
+        /* skip complete frags before offset */
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+        
         do {
-                int  fraglen = iov->iov_len;
+                int  fraglen = iov->iov_len - offset;
                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
 
                 /* nob exactly spans the iovs */
@@ -317,12 +334,12 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
-                        ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
-                        nmapped);
+                       ktx, nfrags, iov->iov_base + offset, fraglen, 
+                       basepage, npages, nmapped);
 
 #if MULTIRAIL_EKC
                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
-                             iov->iov_base, fraglen,
+                             iov->iov_base + offset, fraglen,
                              kqswnal_data.kqn_ep_tx_nmh, basepage,
                              &railmask, &ktx->ktx_frags[nfrags]);
 
@@ -336,7 +353,7 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
 #else
                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
-                                       iov->iov_base, fraglen,
+                                       iov->iov_base + offset, fraglen,
                                        basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
@@ -357,6 +374,7 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
                 iov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
@@ -483,7 +501,7 @@ void
 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 {
         lib_msg_t     *msg;
-        lib_msg_t     *repmsg;
+        lib_msg_t     *repmsg = NULL;
 
         switch (ktx->ktx_state) {
         case KTX_FORWARDING:       /* router asked me to forward this packet */
@@ -493,21 +511,29 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 
         case KTX_SENDING:          /* packet sourced locally */
                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
-                              (lib_msg_t *)ktx->ktx_args[1]);
+                              (lib_msg_t *)ktx->ktx_args[1],
+                              (error == 0) ? PTL_OK : 
+                              (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
                 break;
 
         case KTX_GETTING:          /* Peer has DMA-ed direct? */
                 msg = (lib_msg_t *)ktx->ktx_args[1];
-                repmsg = NULL;
 
-                if (error == 0) 
+                if (error == 0) {
                         repmsg = lib_fake_reply_msg (&kqswnal_lib, 
                                                      ktx->ktx_nid, msg->md);
+                        if (repmsg == NULL)
+                                error = -ENOMEM;
+                }
                 
-                lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg);
-
-                if (repmsg != NULL) 
-                        lib_finalize (&kqswnal_lib, NULL, repmsg);
+                if (error == 0) {
+                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
+                                      msg, PTL_OK);
+                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
+                } else {
+                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
+                                      (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
+                }
                 break;
 
         default:
@@ -533,7 +559,7 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
                         ktx->ktx_nid, status);
 
                 kqswnal_notify_peer_down(ktx);
-                status = -EIO;
+                status = -EHOSTDOWN;
 
         } else if (ktx->ktx_state == KTX_GETTING) {
                 /* RPC completed OK; what did our peer put in the status
@@ -745,7 +771,8 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
 
 int
 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
-                   struct iovec *iov, ptl_kiov_t *kiov, int nob)
+                   struct iovec *iov, ptl_kiov_t *kiov, 
+                   int offset, int nob)
 {
         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
         char               *buffer = (char *)page_address(krx->krx_pages[0]);
@@ -779,9 +806,9 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
         /* Map the source data... */
         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
         if (kiov != NULL)
-                rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
+                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
         else
-                rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov);
+                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
 
         if (rc != 0) {
                 CERROR ("Can't map source data: %d\n", rc);
@@ -846,7 +873,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
         return (-ECONNABORTED);
 }
 
-static int
+static ptl_err_t
 kqswnal_sendmsg (nal_cb_t     *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
@@ -857,6 +884,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  unsigned int  payload_niov,
                  struct iovec *payload_iov,
                  ptl_kiov_t   *payload_kiov,
+                 size_t        payload_offset,
                  size_t        payload_nob)
 {
         kqswnal_tx_t      *ktx;
@@ -865,6 +893,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         int                i;
         kqsw_csum_t        csum;
+        int                sumoff;
         int                sumnob;
 #endif
         
@@ -928,9 +957,9 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 }
 
                 /* peer expects RPC completion with GET data */
-                rc = kqswnal_dma_reply (ktx,
-                                        payload_niov, payload_iov, 
-                                        payload_kiov, payload_nob);
+                rc = kqswnal_dma_reply (ktx, payload_niov, 
+                                        payload_iov, payload_kiov, 
+                                        payload_offset, payload_nob);
                 if (rc == 0)
                         return (PTL_OK);
                 
@@ -945,22 +974,39 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
-        for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
+        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
+                LASSERT(i < niov);
                 if (payload_kiov != NULL) {
                         ptl_kiov_t *kiov = &payload_kiov[i];
-                        char       *addr = ((char *)kmap (kiov->kiov_page)) +
-                                           kiov->kiov_offset;
-                        
-                        csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
-                        sumnob -= kiov->kiov_len;
+
+                        if (sumoff >= kiov->kiov_len) {
+                                sumoff -= kiov->kiov_len;
+                        } else {
+                                char *addr = ((char *)kmap (kiov->kiov_page)) +
+                                             kiov->kiov_offset + sumoff;
+                                int   fragnob = kiov->kiov_len - sumoff;
+
+                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+                                sumnob -= fragnob;
+                                sumoff = 0;
+                                kunmap(kiov->kiov_page);
+                        }
                 } else {
                         struct iovec *iov = &payload_iov[i];
 
-                        csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
-                        sumnob -= iov->iov_len;
+                        if (sumoff > iov->iov_len) {
+                                sumoff -= iov->iov_len;
+                        } else {
+                                char *addr = iov->iov_base + sumoff;
+                                int   fragnob = iov->iov_len - sumoff;
+                                
+                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+                                sumnob -= fragnob;
+                                sumoff = 0;
+                        }
                 }
         }
-        memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
+        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
         
         if (kqswnal_data.kqn_optimized_gets &&
@@ -987,10 +1033,10 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 ktx->ktx_state = KTX_GETTING;
 
                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
-                        rc = kqswnal_map_tx_kiov (ktx, md->length,
+                        rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
                                                   md->md_niov, md->md_iov.kiov);
                 else
-                        rc = kqswnal_map_tx_iov (ktx, md->length,
+                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.iov);
 
                 if (rc < 0) {
@@ -1033,10 +1079,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 if (payload_nob > 0) {
                         if (payload_kiov != NULL)
                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                   payload_niov, payload_kiov, payload_nob);
+                                                   payload_niov, payload_kiov, 
+                                                   payload_offset, payload_nob);
                         else
                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                  payload_niov, payload_iov, payload_nob);
+                                                  payload_niov, payload_iov, 
+                                                  payload_offset, payload_nob);
                 }
         } else {
 
@@ -1052,10 +1100,10 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
 #endif
                 if (payload_kiov != NULL)
-                        rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
+                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
                                                   payload_niov, payload_kiov);
                 else
-                        rc = kqswnal_map_tx_iov (ktx, payload_nob,
+                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_iov);
                 if (rc != 0) {
                         kqswnal_put_idle_tx (ktx);
@@ -1078,7 +1126,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_send (nal_cb_t     *nal,
               void         *private,
               lib_msg_t    *libmsg,
@@ -1088,13 +1136,15 @@ kqswnal_send (nal_cb_t     *nal,
               ptl_pid_t     pid,
               unsigned int  payload_niov,
               struct iovec *payload_iov,
+              size_t        payload_offset,
               size_t        payload_nob)
 {
         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
-                                 payload_niov, payload_iov, NULL, payload_nob));
+                                 payload_niov, payload_iov, NULL, 
+                                 payload_offset, payload_nob));
 }
 
-static int
+static ptl_err_t
 kqswnal_send_pages (nal_cb_t     *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
@@ -1104,10 +1154,12 @@ kqswnal_send_pages (nal_cb_t     *nal,
                     ptl_pid_t     pid,
                     unsigned int  payload_niov,
                     ptl_kiov_t   *payload_kiov,
+                    size_t        payload_offset,
                     size_t        payload_nob)
 {
         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
-                                 payload_niov, NULL, payload_kiov, payload_nob));
+                                 payload_niov, NULL, payload_kiov, 
+                                 payload_offset, payload_nob));
 }
 
 void
@@ -1161,7 +1213,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
             nob <= KQSW_TX_BUFFER_SIZE) 
         {
                 /* send from ktx's pre-mapped contiguous buffer? */
-                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
+                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob);
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                               0, nob);
@@ -1176,7 +1228,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         {
                 /* zero copy */
                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
-                rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
+                rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov);
                 if (rc != 0)
                         goto failed;
 
@@ -1231,7 +1283,8 @@ kqswnal_dma_reply_complete (EP_RXD *rxd)
         krx->krx_rpc_reply_needed = 0;
         kqswnal_rx_done (krx);
 
-        lib_finalize (&kqswnal_lib, NULL, msg);
+        lib_finalize (&kqswnal_lib, NULL, msg,
+                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
         kqswnal_put_idle_tx (ktx);
 }
 
@@ -1461,13 +1514,14 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 }
 #endif
 
-static int
+static ptl_err_t
 kqswnal_recvmsg (nal_cb_t     *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
                  unsigned int  niov,
                  struct iovec *iov,
                  ptl_kiov_t   *kiov,
+                 size_t        offset,
                  size_t        mlen,
                  size_t        rlen)
 {
@@ -1498,10 +1552,13 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 #endif
         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
 
-        /* What was actually received must be >= payload.
-         * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
-        LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
+        /* What was actually received must be >= payload. */
         LASSERT (mlen <= rlen);
+        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
+                CERROR("Bad message size: have %d, need %d + %d\n",
+                       krx->krx_nob, KQSW_HDR_SIZE, mlen);
+                return (PTL_FAIL);
+        }
 
         /* It must be OK to kmap() if required */
         LASSERT (kiov == NULL || !in_interrupt ());
@@ -1516,20 +1573,37 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
 
                 LASSERT (niov > 0);
+                
                 if (kiov != NULL) {
-                        iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
-                        iov_nob = kiov->kiov_len;
+                        /* skip complete frags */
+                        while (offset >= kiov->kiov_len) {
+                                offset -= kiov->kiov_len;
+                                kiov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
+                        iov_nob = kiov->kiov_len - offset;
                 } else {
-                        iov_ptr = iov->iov_base;
-                        iov_nob = iov->iov_len;
+                        /* skip complete frags */
+                        while (offset >= iov->iov_len) {
+                                offset -= iov->iov_len;
+                                iov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        iov_ptr = iov->iov_base + offset;
+                        iov_nob = iov->iov_len - offset;
                 }
-
+                
                 for (;;)
                 {
-                        /* We expect the iov to exactly match mlen */
-                        LASSERT (iov_nob <= mlen);
-                        
-                        frag = MIN (page_nob, iov_nob);
+                        frag = mlen;
+                        if (frag > page_nob)
+                                frag = page_nob;
+                        if (frag > iov_nob)
+                                frag = iov_nob;
+
                         memcpy (iov_ptr, page_ptr, frag);
 #if KQSW_CHECKSUM
                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
@@ -1588,33 +1662,39 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                        "csum_nob %d\n",
                         hdr_csum, payload_csum, csum_frags, csum_nob);
 #endif
-        lib_finalize(nal, private, libmsg);
+        lib_finalize(nal, private, libmsg, PTL_OK);
 
-        return (rlen);
+        return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_recv(nal_cb_t     *nal,
              void         *private,
              lib_msg_t    *libmsg,
              unsigned int  niov,
              struct iovec *iov,
+             size_t        offset,
              size_t        mlen,
              size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen));
+        return (kqswnal_recvmsg(nal, private, libmsg, 
+                                niov, iov, NULL, 
+                                offset, mlen, rlen));
 }
 
-static int
+static ptl_err_t
 kqswnal_recv_pages (nal_cb_t     *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
                     unsigned int  niov,
                     ptl_kiov_t   *kiov,
+                    size_t        offset,
                     size_t        mlen,
                     size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen));
+        return (kqswnal_recvmsg(nal, private, libmsg, 
+                                niov, NULL, kiov, 
+                                offset, mlen, rlen));
 }
 
 int