Whamcloud - gitweb
* Applied the last patch in Bug 2306, which changes the portals router/NAL
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
index 478c25f..157dc70 100644 (file)
@@ -775,7 +775,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                    int offset, int nob)
 {
         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
-        char               *buffer = (char *)page_address(krx->krx_pages[0]);
+        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
         int                 rc;
 #if MULTIRAIL_EKC
@@ -1008,7 +1008,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         }
         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
-        
+
         if (kqswnal_data.kqn_optimized_gets &&
             type == PTL_MSG_GET &&              /* doing a GET */
             nid == targetnid) {                 /* not forwarding */
@@ -1167,7 +1167,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
         int             rc;
         kqswnal_tx_t   *ktx;
-        struct iovec   *iov = fwd->kprfd_iov;
+        ptl_kiov_t     *kiov = fwd->kprfd_kiov;
         int             niov = fwd->kprfd_niov;
         int             nob = fwd->kprfd_nob;
         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
@@ -1177,11 +1177,9 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         LBUG ();
 #endif
         /* The router wants this NAL to forward a packet */
-        CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
+        CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
                 fwd, nid, niov, nob);
 
-        LASSERT (niov > 0);
-        
         ktx = kqswnal_get_idle_tx (fwd, 0);
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
@@ -1195,44 +1193,44 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 goto failed;
         }
 
-        if (nob > KQSW_NRXMSGBYTES_LARGE) {
-                CERROR ("Can't forward [%p] to "LPX64
-                        ": size %d bigger than max packet size %ld\n",
-                        fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
-                rc = -EMSGSIZE;
-                goto failed;
-        }
+        /* copy hdr into pre-mapped buffer */
+        memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
+        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
 
-        ktx->ktx_port    = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
+        ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
         ktx->ktx_nid     = nid;
         ktx->ktx_state   = KTX_FORWARDING;
         ktx->ktx_args[0] = fwd;
+        ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
 
-        if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
-            nob <= KQSW_TX_BUFFER_SIZE) 
+        if (nob <= KQSW_TX_MAXCONTIG) 
         {
-                /* send from ktx's pre-mapped contiguous buffer? */
-                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob);
+                /* send payload from ktx's pre-mapped contiguous buffer */
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
-                              0, nob);
+                              0, KQSW_HDR_SIZE + nob);
 #else
                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
-                ktx->ktx_frags[0].Len = nob;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
 #endif
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
+                if (nob > 0)
+                        lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
+                                          niov, kiov, 0, nob);
         }
         else
         {
-                /* zero copy */
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
-                rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov);
+                /* zero copy payload */
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                 if (rc != 0)
                         goto failed;
-
-                ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
         }
 
         rc = kqswnal_launch (ktx);
@@ -1257,7 +1255,7 @@ kqswnal_fwd_callback (void *arg, int error)
 
         if (error != 0)
         {
-                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
@@ -1371,8 +1369,9 @@ kqswnal_requeue_rx (kqswnal_rx_t *krx)
 void
 kqswnal_rx (kqswnal_rx_t *krx)
 {
-        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
+        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
+        int             payload_nob;
         int             nob;
         int             niov;
 
@@ -1398,16 +1397,26 @@ kqswnal_rx (kqswnal_rx_t *krx)
                 return;
         }
 
-        /* NB forwarding may destroy iov; rebuild every time */
-        for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
-        {
-                LASSERT (niov < krx->krx_npages);
-                krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
-                krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
+        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
+        niov = 0;
+        if (nob > 0) {
+                krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
+                krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
+                niov = 1;
+                nob -= PAGE_SIZE - KQSW_HDR_SIZE;
+                
+                while (nob > 0) {
+                        LASSERT (niov < krx->krx_npages);
+                        
+                        krx->krx_kiov[niov].kiov_offset = 0;
+                        krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
+                        niov++;
+                        nob -= PAGE_SIZE;
+                }
         }
 
-        kpr_fwd_init (&krx->krx_fwd, dest_nid,
-                      krx->krx_nob, niov, krx->krx_iov,
+        kpr_fwd_init (&krx->krx_fwd, dest_nid, 
+                      hdr, payload_nob, niov, krx->krx_kiov,
                       kqswnal_fwd_callback, krx);
 
         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
@@ -1471,7 +1480,7 @@ kqswnal_rxhandler(EP_RXD *rxd)
 void
 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 {
-        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                 ", dpid %d, spid %d, type %d\n",
@@ -1526,6 +1535,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                  size_t        rlen)
 {
         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
+        char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
         int           page;
         char         *page_ptr;
         int           page_nob;
@@ -1535,8 +1545,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         kqsw_csum_t   senders_csum;
         kqsw_csum_t   payload_csum = 0;
-        kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
-                                           sizeof(ptl_hdr_t));
+        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
         size_t        csum_len = mlen;
         int           csum_frags = 0;
         int           csum_nob = 0;
@@ -1545,8 +1554,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 
         atomic_inc (&csum_counter);
 
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                                sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
+        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
         if (senders_csum != hdr_csum)
                 kqswnal_csum_error (krx, 1);
 #endif
@@ -1567,8 +1575,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 
         if (mlen != 0) {
                 page     = 0;
-                page_ptr = ((char *) page_address(krx->krx_pages[0])) +
-                        KQSW_HDR_SIZE;
+                page_ptr = buffer + KQSW_HDR_SIZE;
                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
 
                 LASSERT (niov > 0);
@@ -1621,7 +1628,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                         {
                                 page++;
                                 LASSERT (page < krx->krx_npages);
-                                page_ptr = page_address(krx->krx_pages[page]);
+                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                 page_nob = PAGE_SIZE;
                         }
 
@@ -1649,8 +1656,8 @@ kqswnal_recvmsg (nal_cb_t     *nal,
         }
 
 #if KQSW_CHECKSUM
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
+        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
+                sizeof(kqsw_csum_t));
 
         if (csum_len != rlen)
                 CERROR("Unable to checksum data in user's buffer\n");