X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Fklnds%2Fqswlnd%2Fqswlnd_cb.c;h=157dc706e14ea2ccc97fdaba861088a1c4930067;hb=f6594c21c4101ad0115620240a838715e3096d36;hp=96749cd7e0ab7205db6522bcf77d2c95e762fa17;hpb=3c3a1473fd0f42653cc2a6f6ef7e560f29009241;p=fs%2Flustre-release.git diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c b/lnet/klnds/qswlnd/qswlnd_cb.c index 96749cd..157dc70 100644 --- a/lnet/klnds/qswlnd/qswlnd_cb.c +++ b/lnet/klnds/qswlnd/qswlnd_cb.c @@ -33,7 +33,7 @@ EP_STATUSBLK kqswnal_rpc_failed; * LIB functions follow * */ -static int +static ptl_err_t kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, size_t len) { @@ -41,10 +41,10 @@ kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, nal->ni.nid, len, src_addr, dst_addr ); memcpy( dst_addr, src_addr, len ); - return (0); + return (PTL_OK); } -static int +static ptl_err_t kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, size_t len) { @@ -52,7 +52,7 @@ kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, nal->ni.nid, len, src_addr, dst_addr ); memcpy( dst_addr, src_addr, len ); - return (0); + return (PTL_OK); } static void * @@ -157,13 +157,12 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx) elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState, kqswnal_data.kqn_eptxdmahandle, ktx->ktx_basepage, ktx->ktx_nmappedpages); - #endif ktx->ktx_nmappedpages = 0; } int -kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) +kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov) { int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; @@ -188,8 +187,16 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) LASSERT (niov > 0); LASSERT (nob > 0); + /* skip complete frags before 'offset' */ + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + do { - int fraglen = kiov->kiov_len; + int fraglen = kiov->kiov_len - offset; /* nob exactly spans the iovs */ LASSERT (fraglen <= nob); @@ -212,7 +219,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) /* XXX this is really crap, but we'll have to kmap until * EKC has a page (rather than vaddr) mapping interface */ - ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; + ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; CDEBUG(D_NET, "%p[%d] loading %p for %d, page %d, %d total\n", @@ -257,6 +264,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) kiov++; niov--; nob -= fraglen; + offset = 0; /* iov must not run out before end of data */ LASSERT (nob == 0 || niov > 0); @@ -271,7 +279,8 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) } int -kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) +kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, + int niov, struct iovec *iov) { int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; @@ -295,8 +304,16 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) LASSERT (niov > 0); LASSERT (nob > 0); + /* skip complete frags before offset */ + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + do { - int fraglen = iov->iov_len; + int fraglen = iov->iov_len - offset; long npages = kqswnal_pages_spanned (iov->iov_base, fraglen); /* nob exactly spans the iovs */ @@ -317,12 +334,12 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) CDEBUG(D_NET, "%p[%d] loading %p for %d, pages %d for %ld, %d total\n", - ktx, nfrags, iov->iov_base, fraglen, basepage, npages, - nmapped); + ktx, nfrags, iov->iov_base + offset, fraglen, + basepage, npages, nmapped); #if MULTIRAIL_EKC ep_dvma_load(kqswnal_data.kqn_ep, NULL, - iov->iov_base, fraglen, + iov->iov_base + offset, fraglen, kqswnal_data.kqn_ep_tx_nmh, basepage, &railmask, &ktx->ktx_frags[nfrags]); @@ -336,7 +353,7 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) #else elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState, kqswnal_data.kqn_eptxdmahandle, - iov->iov_base, fraglen, + iov->iov_base + offset, fraglen, basepage, &ktx->ktx_frags[nfrags].Base); if (nfrags > 0 && /* previous frag mapped */ @@ -357,6 +374,7 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) iov++; niov--; nob -= fraglen; + offset = 0; /* iov must not run out before end of data */ LASSERT (nob == 0 || niov > 0); @@ -483,7 +501,7 @@ void kqswnal_tx_done (kqswnal_tx_t *ktx, int error) { lib_msg_t *msg; - lib_msg_t *repmsg; + lib_msg_t *repmsg = NULL; switch (ktx->ktx_state) { case KTX_FORWARDING: /* router asked me to forward this packet */ @@ -493,21 +511,29 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) case KTX_SENDING: /* packet sourced locally */ lib_finalize (&kqswnal_lib, ktx->ktx_args[0], - (lib_msg_t *)ktx->ktx_args[1]); + (lib_msg_t *)ktx->ktx_args[1], + (error == 0) ? PTL_OK : + (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL); break; case KTX_GETTING: /* Peer has DMA-ed direct? */ msg = (lib_msg_t *)ktx->ktx_args[1]; - repmsg = NULL; - if (error == 0) + if (error == 0) { repmsg = lib_fake_reply_msg (&kqswnal_lib, ktx->ktx_nid, msg->md); + if (repmsg == NULL) + error = -ENOMEM; + } - lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg); - - if (repmsg != NULL) - lib_finalize (&kqswnal_lib, NULL, repmsg); + if (error == 0) { + lib_finalize (&kqswnal_lib, ktx->ktx_args[0], + msg, PTL_OK); + lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK); + } else { + lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg, + (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL); + } break; default: @@ -533,7 +559,7 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status) ktx->ktx_nid, status); kqswnal_notify_peer_down(ktx); - status = -EIO; + status = -EHOSTDOWN; } else if (ktx->ktx_state == KTX_GETTING) { /* RPC completed OK; what did our peer put in the status @@ -745,10 +771,11 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv, int kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, - struct iovec *iov, ptl_kiov_t *kiov, int nob) + struct iovec *iov, ptl_kiov_t *kiov, + int offset, int nob) { kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; - char *buffer = (char *)page_address(krx->krx_pages[0]); + char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page); kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE); int rc; #if MULTIRAIL_EKC @@ -779,9 +806,9 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, /* Map the source data... */ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; if (kiov != NULL) - rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov); + rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov); else - rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov); + rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov); if (rc != 0) { CERROR ("Can't map source data: %d\n", rc); @@ -846,7 +873,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, return (-ECONNABORTED); } -static int +static ptl_err_t kqswnal_sendmsg (nal_cb_t *nal, void *private, lib_msg_t *libmsg, @@ -857,6 +884,7 @@ kqswnal_sendmsg (nal_cb_t *nal, unsigned int payload_niov, struct iovec *payload_iov, ptl_kiov_t *payload_kiov, + size_t payload_offset, size_t payload_nob) { kqswnal_tx_t *ktx; @@ -865,6 +893,7 @@ kqswnal_sendmsg (nal_cb_t *nal, #if KQSW_CHECKSUM int i; kqsw_csum_t csum; + int sumoff; int sumnob; #endif @@ -928,9 +957,9 @@ kqswnal_sendmsg (nal_cb_t *nal, } /* peer expects RPC completion with GET data */ - rc = kqswnal_dma_reply (ktx, - payload_niov, payload_iov, - payload_kiov, payload_nob); + rc = kqswnal_dma_reply (ktx, payload_niov, + payload_iov, payload_kiov, + payload_offset, payload_nob); if (rc == 0) return (PTL_OK); @@ -945,24 +974,41 @@ kqswnal_sendmsg (nal_cb_t *nal, #if KQSW_CHECKSUM csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr)); memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum)); - for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) { + for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) { + LASSERT(i < niov); if (payload_kiov != NULL) { ptl_kiov_t *kiov = &payload_kiov[i]; - char *addr = ((char *)kmap (kiov->kiov_page)) + - kiov->kiov_offset; - - csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len)); - sumnob -= kiov->kiov_len; + + if (sumoff >= kiov->kiov_len) { + sumoff -= kiov->kiov_len; + } else { + char *addr = ((char *)kmap (kiov->kiov_page)) + + kiov->kiov_offset + sumoff; + int fragnob = kiov->kiov_len - sumoff; + + csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob)); + sumnob -= fragnob; + sumoff = 0; + kunmap(kiov->kiov_page); + } } else { struct iovec *iov = &payload_iov[i]; - csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len)); - sumnob -= iov->iov_len; + if (sumoff > iov->iov_len) { + sumoff -= iov->iov_len; + } else { + char *addr = iov->iov_base + sumoff; + int fragnob = iov->iov_len - sumoff; + + csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob)); + sumnob -= fragnob; + sumoff = 0; + } } } - memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum)); + memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum)); #endif - + if (kqswnal_data.kqn_optimized_gets && type == PTL_MSG_GET && /* doing a GET */ nid == targetnid) { /* not forwarding */ @@ -987,10 +1033,10 @@ kqswnal_sendmsg (nal_cb_t *nal, ktx->ktx_state = KTX_GETTING; if ((libmsg->md->options & PTL_MD_KIOV) != 0) - rc = kqswnal_map_tx_kiov (ktx, md->length, + rc = kqswnal_map_tx_kiov (ktx, 0, md->length, md->md_niov, md->md_iov.kiov); else - rc = kqswnal_map_tx_iov (ktx, md->length, + rc = kqswnal_map_tx_iov (ktx, 0, md->length, md->md_niov, md->md_iov.iov); if (rc < 0) { @@ -1033,10 +1079,12 @@ kqswnal_sendmsg (nal_cb_t *nal, if (payload_nob > 0) { if (payload_kiov != NULL) lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, - payload_niov, payload_kiov, payload_nob); + payload_niov, payload_kiov, + payload_offset, payload_nob); else lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, - payload_niov, payload_iov, payload_nob); + payload_niov, payload_iov, + payload_offset, payload_nob); } } else { @@ -1052,10 +1100,10 @@ kqswnal_sendmsg (nal_cb_t *nal, ktx->ktx_frags[0].Len = KQSW_HDR_SIZE; #endif if (payload_kiov != NULL) - rc = kqswnal_map_tx_kiov (ktx, payload_nob, + rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, payload_niov, payload_kiov); else - rc = kqswnal_map_tx_iov (ktx, payload_nob, + rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob, payload_niov, payload_iov); if (rc != 0) { kqswnal_put_idle_tx (ktx); @@ -1078,7 +1126,7 @@ kqswnal_sendmsg (nal_cb_t *nal, return (PTL_OK); } -static int +static ptl_err_t kqswnal_send (nal_cb_t *nal, void *private, lib_msg_t *libmsg, @@ -1088,13 +1136,15 @@ kqswnal_send (nal_cb_t *nal, ptl_pid_t pid, unsigned int payload_niov, struct iovec *payload_iov, + size_t payload_offset, size_t payload_nob) { return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid, - payload_niov, payload_iov, NULL, payload_nob)); + payload_niov, payload_iov, NULL, + payload_offset, payload_nob)); } -static int +static ptl_err_t kqswnal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *libmsg, @@ -1104,10 +1154,12 @@ kqswnal_send_pages (nal_cb_t *nal, ptl_pid_t pid, unsigned int payload_niov, ptl_kiov_t *payload_kiov, + size_t payload_offset, size_t payload_nob) { return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid, - payload_niov, NULL, payload_kiov, payload_nob)); + payload_niov, NULL, payload_kiov, + payload_offset, payload_nob)); } void @@ -1115,7 +1167,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { int rc; kqswnal_tx_t *ktx; - struct iovec *iov = fwd->kprfd_iov; + ptl_kiov_t *kiov = fwd->kprfd_kiov; int niov = fwd->kprfd_niov; int nob = fwd->kprfd_nob; ptl_nid_t nid = fwd->kprfd_gateway_nid; @@ -1125,11 +1177,9 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) LBUG (); #endif /* The router wants this NAL to forward a packet */ - CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n", + CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n", fwd, nid, niov, nob); - LASSERT (niov > 0); - ktx = kqswnal_get_idle_tx (fwd, 0); if (ktx == NULL) /* can't get txd right now */ return; /* fwd will be scheduled when tx desc freed */ @@ -1143,44 +1193,44 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) goto failed; } - if (nob > KQSW_NRXMSGBYTES_LARGE) { - CERROR ("Can't forward [%p] to "LPX64 - ": size %d bigger than max packet size %ld\n", - fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE); - rc = -EMSGSIZE; - goto failed; - } + /* copy hdr into pre-mapped buffer */ + memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t)); + ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; - ktx->ktx_port = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ? + ktx->ktx_port = (nob <= KQSW_SMALLPAYLOAD) ? EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; ktx->ktx_nid = nid; ktx->ktx_state = KTX_FORWARDING; ktx->ktx_args[0] = fwd; + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; - if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) && - nob <= KQSW_TX_BUFFER_SIZE) + if (nob <= KQSW_TX_MAXCONTIG) { - /* send from ktx's pre-mapped contiguous buffer? */ - lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob); + /* send payload from ktx's pre-mapped contiguous buffer */ #if MULTIRAIL_EKC ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, - 0, nob); + 0, KQSW_HDR_SIZE + nob); #else ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; - ktx->ktx_frags[0].Len = nob; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob; #endif - ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; - ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; + if (nob > 0) + lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE, + niov, kiov, 0, nob); } else { - /* zero copy */ - ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; - rc = kqswnal_map_tx_iov (ktx, nob, niov, iov); + /* zero copy payload */ +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE; +#endif + rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov); if (rc != 0) goto failed; - - ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base; } rc = kqswnal_launch (ktx); @@ -1205,7 +1255,7 @@ kqswnal_fwd_callback (void *arg, int error) if (error != 0) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page); CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n", NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error); @@ -1231,7 +1281,8 @@ kqswnal_dma_reply_complete (EP_RXD *rxd) krx->krx_rpc_reply_needed = 0; kqswnal_rx_done (krx); - lib_finalize (&kqswnal_lib, NULL, msg); + lib_finalize (&kqswnal_lib, NULL, msg, + (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL); kqswnal_put_idle_tx (ktx); } @@ -1318,8 +1369,9 @@ kqswnal_requeue_rx (kqswnal_rx_t *krx) void kqswnal_rx (kqswnal_rx_t *krx) { - ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page); ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid); + int payload_nob; int nob; int niov; @@ -1345,16 +1397,26 @@ kqswnal_rx (kqswnal_rx_t *krx) return; } - /* NB forwarding may destroy iov; rebuild every time */ - for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++) - { - LASSERT (niov < krx->krx_npages); - krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]); - krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob); + nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE; + niov = 0; + if (nob > 0) { + krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE; + krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob); + niov = 1; + nob -= PAGE_SIZE - KQSW_HDR_SIZE; + + while (nob > 0) { + LASSERT (niov < krx->krx_npages); + + krx->krx_kiov[niov].kiov_offset = 0; + krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob); + niov++; + nob -= PAGE_SIZE; + } } - kpr_fwd_init (&krx->krx_fwd, dest_nid, - krx->krx_nob, niov, krx->krx_iov, + kpr_fwd_init (&krx->krx_fwd, dest_nid, + hdr, payload_nob, niov, krx->krx_kiov, kqswnal_fwd_callback, krx); kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd); @@ -1418,7 +1480,7 @@ kqswnal_rxhandler(EP_RXD *rxd) void kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page); CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64 ", dpid %d, spid %d, type %d\n", @@ -1461,17 +1523,19 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) } #endif -static int +static ptl_err_t kqswnal_recvmsg (nal_cb_t *nal, void *private, lib_msg_t *libmsg, unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) { kqswnal_rx_t *krx = (kqswnal_rx_t *)private; + char *buffer = page_address(krx->krx_kiov[0].kiov_page); int page; char *page_ptr; int page_nob; @@ -1481,8 +1545,7 @@ kqswnal_recvmsg (nal_cb_t *nal, #if KQSW_CHECKSUM kqsw_csum_t senders_csum; kqsw_csum_t payload_csum = 0; - kqsw_csum_t hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]), - sizeof(ptl_hdr_t)); + kqsw_csum_t hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t)); size_t csum_len = mlen; int csum_frags = 0; int csum_nob = 0; @@ -1491,45 +1554,63 @@ kqswnal_recvmsg (nal_cb_t *nal, atomic_inc (&csum_counter); - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); + memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); if (senders_csum != hdr_csum) kqswnal_csum_error (krx, 1); #endif CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen); - /* What was actually received must be >= payload. - * This is an LASSERT, as lib_finalize() doesn't have a completion status. */ - LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen); + /* What was actually received must be >= payload. */ LASSERT (mlen <= rlen); + if (krx->krx_nob < KQSW_HDR_SIZE + mlen) { + CERROR("Bad message size: have %d, need %d + %d\n", + krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen); + return (PTL_FAIL); + } /* It must be OK to kmap() if required */ LASSERT (kiov == NULL || !in_interrupt ()); /* Either all pages or all vaddrs */ LASSERT (!(kiov != NULL && iov != NULL)); - - if (mlen != 0) - { + + if (mlen != 0) { page = 0; - page_ptr = ((char *) page_address(krx->krx_pages[0])) + - KQSW_HDR_SIZE; + page_ptr = buffer + KQSW_HDR_SIZE; page_nob = PAGE_SIZE - KQSW_HDR_SIZE; LASSERT (niov > 0); + if (kiov != NULL) { - iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; - iov_nob = kiov->kiov_len; + /* skip complete frags */ + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + iov_ptr = ((char *)kmap (kiov->kiov_page)) + + kiov->kiov_offset + offset; + iov_nob = kiov->kiov_len - offset; } else { - iov_ptr = iov->iov_base; - iov_nob = iov->iov_len; + /* skip complete frags */ + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + iov_ptr = iov->iov_base + offset; + iov_nob = iov->iov_len - offset; } - + for (;;) { - /* We expect the iov to exactly match mlen */ - LASSERT (iov_nob <= mlen); - - frag = MIN (page_nob, iov_nob); + frag = mlen; + if (frag > page_nob) + frag = page_nob; + if (frag > iov_nob) + frag = iov_nob; + memcpy (iov_ptr, page_ptr, frag); #if KQSW_CHECKSUM payload_csum = kqsw_csum (payload_csum, iov_ptr, frag); @@ -1547,7 +1628,7 @@ kqswnal_recvmsg (nal_cb_t *nal, { page++; LASSERT (page < krx->krx_npages); - page_ptr = page_address(krx->krx_pages[page]); + page_ptr = page_address(krx->krx_kiov[page].kiov_page); page_nob = PAGE_SIZE; } @@ -1575,8 +1656,8 @@ kqswnal_recvmsg (nal_cb_t *nal, } #if KQSW_CHECKSUM - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t)); + memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), + sizeof(kqsw_csum_t)); if (csum_len != rlen) CERROR("Unable to checksum data in user's buffer\n"); @@ -1588,33 +1669,39 @@ kqswnal_recvmsg (nal_cb_t *nal, "csum_nob %d\n", hdr_csum, payload_csum, csum_frags, csum_nob); #endif - lib_finalize(nal, private, libmsg); + lib_finalize(nal, private, libmsg, PTL_OK); - return (rlen); + return (PTL_OK); } -static int +static ptl_err_t kqswnal_recv(nal_cb_t *nal, void *private, lib_msg_t *libmsg, unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen) { - return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen)); + return (kqswnal_recvmsg(nal, private, libmsg, + niov, iov, NULL, + offset, mlen, rlen)); } -static int +static ptl_err_t kqswnal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *libmsg, unsigned int niov, ptl_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) { - return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen)); + return (kqswnal_recvmsg(nal, private, libmsg, + niov, NULL, kiov, + offset, mlen, rlen)); } int