Whamcloud - gitweb
smash the HEAD with the contents of b_cmd. HEAD_PRE_CMD_SMASH and
[fs/lustre-release.git] / lustre / portals / knals / qswnal / qswnal_cb.c
index 478c25f..61c88f6 100644 (file)
@@ -85,6 +85,9 @@ kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
         CDEBUG (D_NET, "%s", msg);
 }
 
+#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
+# error "Can't save/restore irq contexts in different procedures"
+#endif
 
 static void
 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
@@ -103,6 +106,17 @@ kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
 }
 
+static void
+kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
+{
+        /* holding kqn_statelock */
+
+        if (eq->event_callback != NULL)
+                eq->event_callback(ev);
+
+        if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
+                wake_up_all(&kqswnal_data.kqn_yield_waitq);
+}
 
 static int
 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
@@ -513,15 +527,15 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                               (lib_msg_t *)ktx->ktx_args[1],
                               (error == 0) ? PTL_OK : 
-                              (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
+                              (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
                 break;
 
         case KTX_GETTING:          /* Peer has DMA-ed direct? */
                 msg = (lib_msg_t *)ktx->ktx_args[1];
 
                 if (error == 0) {
-                        repmsg = lib_fake_reply_msg (&kqswnal_lib, 
-                                                     ktx->ktx_nid, msg->md);
+                        repmsg = lib_create_reply_msg (&kqswnal_lib, 
+                                                       ktx->ktx_nid, msg);
                         if (repmsg == NULL)
                                 error = -ENOMEM;
                 }
@@ -532,7 +546,7 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
                         lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
                 } else {
                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
-                                      (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
+                                      (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
                 }
                 break;
 
@@ -775,7 +789,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                    int offset, int nob)
 {
         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
-        char               *buffer = (char *)page_address(krx->krx_pages[0]);
+        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
         int                 rc;
 #if MULTIRAIL_EKC
@@ -937,7 +951,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                           in_interrupt()));
         if (ktx == NULL) {
                 kqswnal_cerror_hdr (hdr);
-                return (PTL_NOSPACE);
+                return (PTL_NO_SPACE);
         }
 
         ktx->ktx_nid     = targetnid;
@@ -1008,7 +1022,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         }
         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
-        
+
         if (kqswnal_data.kqn_optimized_gets &&
             type == PTL_MSG_GET &&              /* doing a GET */
             nid == targetnid) {                 /* not forwarding */
@@ -1167,7 +1181,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
         int             rc;
         kqswnal_tx_t   *ktx;
-        struct iovec   *iov = fwd->kprfd_iov;
+        ptl_kiov_t     *kiov = fwd->kprfd_kiov;
         int             niov = fwd->kprfd_niov;
         int             nob = fwd->kprfd_nob;
         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
@@ -1177,11 +1191,9 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         LBUG ();
 #endif
         /* The router wants this NAL to forward a packet */
-        CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
+        CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
                 fwd, nid, niov, nob);
 
-        LASSERT (niov > 0);
-        
         ktx = kqswnal_get_idle_tx (fwd, 0);
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
@@ -1195,44 +1207,44 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 goto failed;
         }
 
-        if (nob > KQSW_NRXMSGBYTES_LARGE) {
-                CERROR ("Can't forward [%p] to "LPX64
-                        ": size %d bigger than max packet size %ld\n",
-                        fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
-                rc = -EMSGSIZE;
-                goto failed;
-        }
+        /* copy hdr into pre-mapped buffer */
+        memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
+        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
 
-        ktx->ktx_port    = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
+        ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
         ktx->ktx_nid     = nid;
         ktx->ktx_state   = KTX_FORWARDING;
         ktx->ktx_args[0] = fwd;
+        ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
 
-        if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
-            nob <= KQSW_TX_BUFFER_SIZE) 
+        if (nob <= KQSW_TX_MAXCONTIG) 
         {
-                /* send from ktx's pre-mapped contiguous buffer? */
-                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob);
+                /* send payload from ktx's pre-mapped contiguous buffer */
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
-                              0, nob);
+                              0, KQSW_HDR_SIZE + nob);
 #else
                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
-                ktx->ktx_frags[0].Len = nob;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
 #endif
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
+                if (nob > 0)
+                        lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
+                                          niov, kiov, 0, nob);
         }
         else
         {
-                /* zero copy */
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
-                rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov);
+                /* zero copy payload */
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                 if (rc != 0)
                         goto failed;
-
-                ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
         }
 
         rc = kqswnal_launch (ktx);
@@ -1257,7 +1269,7 @@ kqswnal_fwd_callback (void *arg, int error)
 
         if (error != 0)
         {
-                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
@@ -1371,8 +1383,9 @@ kqswnal_requeue_rx (kqswnal_rx_t *krx)
 void
 kqswnal_rx (kqswnal_rx_t *krx)
 {
-        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
+        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
+        int             payload_nob;
         int             nob;
         int             niov;
 
@@ -1398,16 +1411,26 @@ kqswnal_rx (kqswnal_rx_t *krx)
                 return;
         }
 
-        /* NB forwarding may destroy iov; rebuild every time */
-        for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
-        {
-                LASSERT (niov < krx->krx_npages);
-                krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
-                krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
+        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
+        niov = 0;
+        if (nob > 0) {
+                krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
+                krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
+                niov = 1;
+                nob -= PAGE_SIZE - KQSW_HDR_SIZE;
+                
+                while (nob > 0) {
+                        LASSERT (niov < krx->krx_npages);
+                        
+                        krx->krx_kiov[niov].kiov_offset = 0;
+                        krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
+                        niov++;
+                        nob -= PAGE_SIZE;
+                }
         }
 
-        kpr_fwd_init (&krx->krx_fwd, dest_nid,
-                      krx->krx_nob, niov, krx->krx_iov,
+        kpr_fwd_init (&krx->krx_fwd, dest_nid, 
+                      hdr, payload_nob, niov, krx->krx_kiov,
                       kqswnal_fwd_callback, krx);
 
         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
@@ -1471,7 +1494,7 @@ kqswnal_rxhandler(EP_RXD *rxd)
 void
 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 {
-        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                 ", dpid %d, spid %d, type %d\n",
@@ -1526,6 +1549,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                  size_t        rlen)
 {
         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
+        char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
         int           page;
         char         *page_ptr;
         int           page_nob;
@@ -1535,8 +1559,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         kqsw_csum_t   senders_csum;
         kqsw_csum_t   payload_csum = 0;
-        kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
-                                           sizeof(ptl_hdr_t));
+        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
         size_t        csum_len = mlen;
         int           csum_frags = 0;
         int           csum_nob = 0;
@@ -1545,8 +1568,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 
         atomic_inc (&csum_counter);
 
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                                sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
+        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
         if (senders_csum != hdr_csum)
                 kqswnal_csum_error (krx, 1);
 #endif
@@ -1567,8 +1589,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 
         if (mlen != 0) {
                 page     = 0;
-                page_ptr = ((char *) page_address(krx->krx_pages[0])) +
-                        KQSW_HDR_SIZE;
+                page_ptr = buffer + KQSW_HDR_SIZE;
                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
 
                 LASSERT (niov > 0);
@@ -1621,7 +1642,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                         {
                                 page++;
                                 LASSERT (page < krx->krx_npages);
-                                page_ptr = page_address(krx->krx_pages[page]);
+                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                 page_nob = PAGE_SIZE;
                         }
 
@@ -1649,8 +1670,8 @@ kqswnal_recvmsg (nal_cb_t     *nal,
         }
 
 #if KQSW_CHECKSUM
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
+        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
+                sizeof(kqsw_csum_t));
 
         if (csum_len != rlen)
                 CERROR("Unable to checksum data in user's buffer\n");
@@ -1838,5 +1859,6 @@ nal_cb_t kqswnal_lib =
         cb_printf:      kqswnal_printf,
         cb_cli:         kqswnal_cli,
         cb_sti:         kqswnal_sti,
+        cb_callback:    kqswnal_callback,
         cb_dist:        kqswnal_dist
 };