From f6594c21c4101ad0115620240a838715e3096d36 Mon Sep 17 00:00:00 2001 From: eeb Date: Fri, 27 Feb 2004 16:43:23 +0000 Subject: [PATCH] * Applied the last patch in Bug 2306, which changes the portals router/NAL forwarding interface from a single struct iovec for the message header + payload, to passing the message header by pointer, and the message body by ptl_kiov_t (== page,offset,size). This restores portals router performance by allowing it to use socknal zero copy again. * Applied the socknal connection load balancing fix from Bug 2817 --- lnet/include/linux/kp30.h | 18 ++-- lnet/klnds/qswlnd/qswlnd.c | 21 ++--- lnet/klnds/qswlnd/qswlnd.h | 3 +- lnet/klnds/qswlnd/qswlnd_cb.c | 101 +++++++++++----------- lnet/klnds/socklnd/socklnd.c | 45 +++++----- lnet/klnds/socklnd/socklnd.h | 31 +++---- lnet/klnds/socklnd/socklnd_cb.c | 134 ++++++++++++------------------ lnet/router/router.c | 5 +- lustre/portals/include/linux/kp30.h | 18 ++-- lustre/portals/knals/qswnal/qswnal.c | 21 ++--- lustre/portals/knals/qswnal/qswnal.h | 3 +- lustre/portals/knals/qswnal/qswnal_cb.c | 101 +++++++++++----------- lustre/portals/knals/socknal/socknal.c | 45 +++++----- lustre/portals/knals/socknal/socknal.h | 31 +++---- lustre/portals/knals/socknal/socknal_cb.c | 134 ++++++++++++------------------ lustre/portals/router/router.c | 5 +- 16 files changed, 342 insertions(+), 374 deletions(-) diff --git a/lnet/include/linux/kp30.h b/lnet/include/linux/kp30.h index 75e83b4..53828de 100644 --- a/lnet/include/linux/kp30.h +++ b/lnet/include/linux/kp30.h @@ -362,13 +362,14 @@ typedef struct { struct list_head kprfd_list; /* stash in queues (routing target can use) */ ptl_nid_t kprfd_target_nid; /* final destination NID */ ptl_nid_t kprfd_gateway_nid; /* gateway NID */ - int kprfd_nob; /* # message bytes (including header) */ - int kprfd_niov; /* # message frags (including header) */ - struct iovec *kprfd_iov; /* message fragments */ - void *kprfd_router_arg; // originating NAL's router arg + ptl_hdr_t *kprfd_hdr; /* header in wire byte order */ + int kprfd_nob; /* # payload bytes */ + int kprfd_niov; /* # payload frags */ + ptl_kiov_t *kprfd_kiov; /* payload fragments */ + void *kprfd_router_arg; /* originating NAL's router arg */ kpr_fwd_callback_t kprfd_callback; /* completion callback */ void *kprfd_callback_arg; /* completion callback arg */ - kprfd_scratch_t kprfd_scratch; // scratchpad for routing targets + kprfd_scratch_t kprfd_scratch; /* scratchpad for routing targets */ } kpr_fwd_desc_t; typedef void (*kpr_fwd_t)(void *arg, kpr_fwd_desc_t *fwd); @@ -471,15 +472,16 @@ kpr_lookup (kpr_router_t *router, ptl_nid_t nid, int nob, ptl_nid_t *gateway_nid } static inline void -kpr_fwd_init (kpr_fwd_desc_t *fwd, ptl_nid_t nid, - int nob, int niov, struct iovec *iov, +kpr_fwd_init (kpr_fwd_desc_t *fwd, ptl_nid_t nid, ptl_hdr_t *hdr, + int nob, int niov, ptl_kiov_t *kiov, kpr_fwd_callback_t callback, void *callback_arg) { fwd->kprfd_target_nid = nid; fwd->kprfd_gateway_nid = nid; + fwd->kprfd_hdr = hdr; fwd->kprfd_nob = nob; fwd->kprfd_niov = niov; - fwd->kprfd_iov = iov; + fwd->kprfd_kiov = kiov; fwd->kprfd_callback = callback; fwd->kprfd_callback_arg = callback_arg; } diff --git a/lnet/klnds/qswlnd/qswlnd.c b/lnet/klnds/qswlnd/qswlnd.c index 90c9a95..3b3b5d4 100644 --- a/lnet/klnds/qswlnd/qswlnd.c +++ b/lnet/klnds/qswlnd/qswlnd.c @@ -348,10 +348,10 @@ kqswnal_finalise (void) for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) { kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i]; - /* If krx_pages[0] got allocated, it got mapped. + /* If krx_kiov[0].kiov_page got allocated, it got mapped. * NB subsequent pages get merged */ - if (krx->krx_pages[0] != NULL) + if (krx->krx_kiov[0].kiov_page != NULL) ep_dvma_unload(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh, &krx->krx_elanbuffer); @@ -416,8 +416,8 @@ kqswnal_finalise (void) kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i]; for (j = 0; j < krx->krx_npages; j++) - if (krx->krx_pages[j] != NULL) - __free_page (krx->krx_pages[j]); + if (krx->krx_kiov[j].kiov_page != NULL) + __free_page (krx->krx_kiov[j].kiov_page); } PORTAL_FREE(kqswnal_data.kqn_rxds, @@ -709,18 +709,19 @@ kqswnal_initialise (void) LASSERT (krx->krx_npages > 0); for (j = 0; j < krx->krx_npages; j++) { - krx->krx_pages[j] = alloc_page(GFP_KERNEL); - if (krx->krx_pages[j] == NULL) - { + struct page *page = alloc_page(GFP_KERNEL); + + if (page == NULL) { kqswnal_finalise (); return (-ENOMEM); } - LASSERT(page_address(krx->krx_pages[j]) != NULL); + krx->krx_kiov[j].kiov_page = page; + LASSERT(page_address(page) != NULL); #if MULTIRAIL_EKC ep_dvma_load(kqswnal_data.kqn_ep, NULL, - page_address(krx->krx_pages[j]), + page_address(page), PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh, elan_page_idx, &all_rails, &elanbuffer); @@ -736,7 +737,7 @@ kqswnal_initialise (void) #else elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState, kqswnal_data.kqn_eprxdmahandle, - page_address(krx->krx_pages[j]), + page_address(page), PAGE_SIZE, elan_page_idx, &elanbuffer); if (j == 0) diff --git a/lnet/klnds/qswlnd/qswlnd.h b/lnet/klnds/qswlnd/qswlnd.h index b1b9a45..5ebf30a 100644 --- a/lnet/klnds/qswlnd/qswlnd.h +++ b/lnet/klnds/qswlnd/qswlnd.h @@ -153,8 +153,7 @@ typedef struct int krx_rpc_reply_sent; /* rpc reply sent */ atomic_t krx_refcount; /* how to tell when rpc is done */ kpr_fwd_desc_t krx_fwd; /* embedded forwarding descriptor */ - struct page *krx_pages[KQSW_NRXMSGPAGES_LARGE]; /* pages allocated */ - struct iovec krx_iov[KQSW_NRXMSGPAGES_LARGE]; /* iovec for forwarding */ + ptl_kiov_t krx_kiov[KQSW_NRXMSGPAGES_LARGE]; /* buffer frags */ } kqswnal_rx_t; typedef struct diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c b/lnet/klnds/qswlnd/qswlnd_cb.c index 478c25f..157dc70 100644 --- a/lnet/klnds/qswlnd/qswlnd_cb.c +++ b/lnet/klnds/qswlnd/qswlnd_cb.c @@ -775,7 +775,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, int offset, int nob) { kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; - char *buffer = (char *)page_address(krx->krx_pages[0]); + char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page); kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE); int rc; #if MULTIRAIL_EKC @@ -1008,7 +1008,7 @@ kqswnal_sendmsg (nal_cb_t *nal, } memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum)); #endif - + if (kqswnal_data.kqn_optimized_gets && type == PTL_MSG_GET && /* doing a GET */ nid == targetnid) { /* not forwarding */ @@ -1167,7 +1167,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { int rc; kqswnal_tx_t *ktx; - struct iovec *iov = fwd->kprfd_iov; + ptl_kiov_t *kiov = fwd->kprfd_kiov; int niov = fwd->kprfd_niov; int nob = fwd->kprfd_nob; ptl_nid_t nid = fwd->kprfd_gateway_nid; @@ -1177,11 +1177,9 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) LBUG (); #endif /* The router wants this NAL to forward a packet */ - CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n", + CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n", fwd, nid, niov, nob); - LASSERT (niov > 0); - ktx = kqswnal_get_idle_tx (fwd, 0); if (ktx == NULL) /* can't get txd right now */ return; /* fwd will be scheduled when tx desc freed */ @@ -1195,44 +1193,44 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) goto failed; } - if (nob > KQSW_NRXMSGBYTES_LARGE) { - CERROR ("Can't forward [%p] to "LPX64 - ": size %d bigger than max packet size %ld\n", - fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE); - rc = -EMSGSIZE; - goto failed; - } + /* copy hdr into pre-mapped buffer */ + memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t)); + ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; - ktx->ktx_port = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ? + ktx->ktx_port = (nob <= KQSW_SMALLPAYLOAD) ? EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; ktx->ktx_nid = nid; ktx->ktx_state = KTX_FORWARDING; ktx->ktx_args[0] = fwd; + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; - if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) && - nob <= KQSW_TX_BUFFER_SIZE) + if (nob <= KQSW_TX_MAXCONTIG) { - /* send from ktx's pre-mapped contiguous buffer? */ - lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob); + /* send payload from ktx's pre-mapped contiguous buffer */ #if MULTIRAIL_EKC ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, - 0, nob); + 0, KQSW_HDR_SIZE + nob); #else ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; - ktx->ktx_frags[0].Len = nob; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob; #endif - ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; - ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; + if (nob > 0) + lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE, + niov, kiov, 0, nob); } else { - /* zero copy */ - ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; - rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov); + /* zero copy payload */ +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE; +#endif + rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov); if (rc != 0) goto failed; - - ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base; } rc = kqswnal_launch (ktx); @@ -1257,7 +1255,7 @@ kqswnal_fwd_callback (void *arg, int error) if (error != 0) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page); CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n", NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error); @@ -1371,8 +1369,9 @@ kqswnal_requeue_rx (kqswnal_rx_t *krx) void kqswnal_rx (kqswnal_rx_t *krx) { - ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page); ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid); + int payload_nob; int nob; int niov; @@ -1398,16 +1397,26 @@ kqswnal_rx (kqswnal_rx_t *krx) return; } - /* NB forwarding may destroy iov; rebuild every time */ - for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++) - { - LASSERT (niov < krx->krx_npages); - krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]); - krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob); + nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE; + niov = 0; + if (nob > 0) { + krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE; + krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob); + niov = 1; + nob -= PAGE_SIZE - KQSW_HDR_SIZE; + + while (nob > 0) { + LASSERT (niov < krx->krx_npages); + + krx->krx_kiov[niov].kiov_offset = 0; + krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob); + niov++; + nob -= PAGE_SIZE; + } } - kpr_fwd_init (&krx->krx_fwd, dest_nid, - krx->krx_nob, niov, krx->krx_iov, + kpr_fwd_init (&krx->krx_fwd, dest_nid, + hdr, payload_nob, niov, krx->krx_kiov, kqswnal_fwd_callback, krx); kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd); @@ -1471,7 +1480,7 @@ kqswnal_rxhandler(EP_RXD *rxd) void kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page); CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64 ", dpid %d, spid %d, type %d\n", @@ -1526,6 +1535,7 @@ kqswnal_recvmsg (nal_cb_t *nal, size_t rlen) { kqswnal_rx_t *krx = (kqswnal_rx_t *)private; + char *buffer = page_address(krx->krx_kiov[0].kiov_page); int page; char *page_ptr; int page_nob; @@ -1535,8 +1545,7 @@ kqswnal_recvmsg (nal_cb_t *nal, #if KQSW_CHECKSUM kqsw_csum_t senders_csum; kqsw_csum_t payload_csum = 0; - kqsw_csum_t hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]), - sizeof(ptl_hdr_t)); + kqsw_csum_t hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t)); size_t csum_len = mlen; int csum_frags = 0; int csum_nob = 0; @@ -1545,8 +1554,7 @@ kqswnal_recvmsg (nal_cb_t *nal, atomic_inc (&csum_counter); - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); + memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); if (senders_csum != hdr_csum) kqswnal_csum_error (krx, 1); #endif @@ -1567,8 +1575,7 @@ kqswnal_recvmsg (nal_cb_t *nal, if (mlen != 0) { page = 0; - page_ptr = ((char *) page_address(krx->krx_pages[0])) + - KQSW_HDR_SIZE; + page_ptr = buffer + KQSW_HDR_SIZE; page_nob = PAGE_SIZE - KQSW_HDR_SIZE; LASSERT (niov > 0); @@ -1621,7 +1628,7 @@ kqswnal_recvmsg (nal_cb_t *nal, { page++; LASSERT (page < krx->krx_npages); - page_ptr = page_address(krx->krx_pages[page]); + page_ptr = page_address(krx->krx_kiov[page].kiov_page); page_nob = PAGE_SIZE; } @@ -1649,8 +1656,8 @@ kqswnal_recvmsg (nal_cb_t *nal, } #if KQSW_CHECKSUM - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t)); + memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), + sizeof(kqsw_csum_t)); if (csum_len != rlen) CERROR("Unable to checksum data in user's buffer\n"); diff --git a/lnet/klnds/socklnd/socklnd.c b/lnet/klnds/socklnd/socklnd.c index c47dcb4..2c44b43 100644 --- a/lnet/klnds/socklnd/socklnd.c +++ b/lnet/klnds/socklnd/socklnd.c @@ -1388,6 +1388,7 @@ ksocknal_cmd(struct portals_cfg *pcfg, void * private) void ksocknal_free_fmbs (ksock_fmb_pool_t *p) { + int npages = p->fmp_buff_pages; ksock_fmb_t *fmb; int i; @@ -1399,12 +1400,12 @@ ksocknal_free_fmbs (ksock_fmb_pool_t *p) fmb = list_entry(p->fmp_idle_fmbs.next, ksock_fmb_t, fmb_list); - for (i = 0; i < fmb->fmb_npages; i++) - if (fmb->fmb_pages[i] != NULL) - __free_page(fmb->fmb_pages[i]); - + for (i = 0; i < npages; i++) + if (fmb->fmb_kiov[i].kiov_page != NULL) + __free_page(fmb->fmb_kiov[i].kiov_page); + list_del(&fmb->fmb_list); - PORTAL_FREE(fmb, sizeof(*fmb)); + PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages])); } } @@ -1603,10 +1604,12 @@ ksocknal_module_init (void) spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock); INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs); INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns); + ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES; spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock); INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs); INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns); + ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES; spin_lock_init (&ksocknal_data.ksnd_reaper_lock); INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns); @@ -1690,34 +1693,36 @@ ksocknal_module_init (void) for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS + SOCKNAL_LARGE_FWD_NMSGS); i++) { - ksock_fmb_t *fmb; + ksock_fmb_t *fmb; + ksock_fmb_pool_t *pool; + + + if (i < SOCKNAL_SMALL_FWD_NMSGS) + pool = &ksocknal_data.ksnd_small_fmp; + else + pool = &ksocknal_data.ksnd_large_fmp; - PORTAL_ALLOC(fmb, sizeof(*fmb)); + PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t, + fmb_kiov[pool->fmp_buff_pages])); if (fmb == NULL) { ksocknal_module_fini(); return (-ENOMEM); } - if (i < SOCKNAL_SMALL_FWD_NMSGS) { - fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES; - fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp; - } else { - fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES; - fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp; - } - - for (j = 0; j < fmb->fmb_npages; j++) { - fmb->fmb_pages[j] = alloc_page(GFP_KERNEL); + fmb->fmb_pool = pool; + + for (j = 0; j < pool->fmp_buff_pages; j++) { + fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL); - if (fmb->fmb_pages[j] == NULL) { + if (fmb->fmb_kiov[j].kiov_page == NULL) { ksocknal_module_fini (); return (-ENOMEM); } - LASSERT(page_address(fmb->fmb_pages[j]) != NULL); + LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL); } - list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs); + list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs); } } diff --git a/lnet/klnds/socklnd/socklnd.h b/lnet/klnds/socklnd/socklnd.h index 0f0b9bd..2767c41 100644 --- a/lnet/klnds/socklnd/socklnd.h +++ b/lnet/klnds/socklnd/socklnd.h @@ -88,7 +88,7 @@ #define SOCKNAL_SMALL_FWD_PAGES 1 /* # pages in a small message fwd buffer */ -#define SOCKNAL_LARGE_FWD_PAGES (PAGE_ALIGN (sizeof (ptl_hdr_t) + PTL_MTU) >> PAGE_SHIFT) +#define SOCKNAL_LARGE_FWD_PAGES (PAGE_ALIGN(PTL_MTU) >> PAGE_SHIFT) /* # pages in a large message fwd buffer */ #define SOCKNAL_RESCHED 100 /* # scheduler loops before reschedule */ @@ -115,6 +115,7 @@ typedef struct /* pool of forwarding buffers */ struct list_head fmp_idle_fmbs; /* free buffers */ struct list_head fmp_blocked_conns; /* connections waiting for a buffer */ int fmp_nactive_fmbs; /* # buffers in use */ + int fmp_buff_pages; /* # pages per buffer */ } ksock_fmb_pool_t; @@ -193,18 +194,13 @@ typedef struct { #define SOCKNAL_INIT_ALL 3 /* A packet just assembled for transmission is represented by 1 or more - * struct iovec fragments and 0 or more ptl_kiov_t fragments. Forwarded - * messages, or messages from an MD with PTL_MD_KIOV _not_ set have 0 - * ptl_kiov_t fragments. Messages from an MD with PTL_MD_KIOV set, have 1 - * struct iovec fragment (the header) and up to PTL_MD_MAX_IOV ptl_kiov_t - * fragments. + * struct iovec fragments (the first frag contains the portals header), + * followed by 0 or more ptl_kiov_t fragments. * * On the receive side, initially 1 struct iovec fragment is posted for - * receive (the header). Once the header has been received, if the message - * requires forwarding or will be received into mapped memory, up to - * PTL_MD_MAX_IOV struct iovec fragments describe the target memory. - * Otherwise up to PTL_MD_MAX_IOV ptl_kiov_t fragments are used. - */ + * receive (the header). Once the header has been received, the payload is + * received into either struct iovec or ptl_kiov_t fragments, depending on + * what the header matched or whether the message needs forwarding. */ struct ksock_conn; /* forward ref */ struct ksock_peer; /* forward ref */ @@ -227,6 +223,12 @@ typedef struct /* transmit packet */ #endif } ksock_tx_t; +typedef struct /* forwarded packet */ +{ + ksock_tx_t ftx_tx; /* send info */ + struct iovec ftx_iov; /* hdr iovec */ +} ksock_ftx_t; + #define KSOCK_ZCCD_2_TX(ptr) list_entry (ptr, ksock_tx_t, tx_zccd) /* network zero copy callback descriptor embedded in ksock_tx_t */ @@ -254,15 +256,14 @@ typedef struct /* Kernel portals Socket Forward { /* (socknal->router) */ struct list_head fmb_list; /* queue idle */ kpr_fwd_desc_t fmb_fwd; /* router's descriptor */ - int fmb_npages; /* # pages allocated */ ksock_fmb_pool_t *fmb_pool; /* owning pool */ struct ksock_peer *fmb_peer; /* peer received from */ - struct page *fmb_pages[SOCKNAL_LARGE_FWD_PAGES]; - struct iovec fmb_iov[SOCKNAL_LARGE_FWD_PAGES]; + ptl_hdr_t fmb_hdr; /* message header */ + ptl_kiov_t fmb_kiov[0]; /* payload frags */ } ksock_fmb_t; /* space for the rx frag descriptors; we either read a single contiguous - * header, or PTL_MD_MAX_IOV frags of payload of either type. */ + * header, or up to PTL_MD_MAX_IOV frags of payload of either type. */ typedef union { struct iovec iov[PTL_MD_MAX_IOV]; ptl_kiov_t kiov[PTL_MD_MAX_IOV]; diff --git a/lnet/klnds/socklnd/socklnd_cb.c b/lnet/klnds/socklnd/socklnd_cb.c index 0678d41..72bd0b7 100644 --- a/lnet/klnds/socklnd/socklnd_cb.c +++ b/lnet/klnds/socklnd/socklnd_cb.c @@ -123,7 +123,7 @@ ksocknal_free_ltx (ksock_ltx_t *ltx) PORTAL_FREE(ltx, ltx->ltx_desc_size); } -#if SOCKNAL_ZC +#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC) struct page * ksocknal_kvaddr_to_page (unsigned long vaddr) { @@ -159,7 +159,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx) int more = (tx->tx_niov > 1) || (tx->tx_nkiov > 0) || (!list_empty (&conn->ksnc_tx_queue)); -#if SOCKNAL_ZC +#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC) int offset = vaddr & (PAGE_SIZE - 1); int zcsize = MIN (fragsize, PAGE_SIZE - offset); struct page *page; @@ -171,7 +171,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx) LASSERT (fragsize <= tx->tx_resid); LASSERT (tx->tx_niov > 0); -#if SOCKNAL_ZC +#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC) if (zcsize >= ksocknal_data.ksnd_zc_min_frag && (sock->sk->route_caps & NETIF_F_SG) && (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) && @@ -771,7 +771,8 @@ ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) /* Find the conn with the shortest tx queue */ list_for_each (tmp, &peer->ksnp_conns) { ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list); - int nob = atomic_read(&c->ksnc_tx_nob); + int nob = atomic_read(&c->ksnc_tx_nob) + + c->ksnc_sock->sk->sk_wmem_queued; LASSERT (!c->ksnc_closing); @@ -1132,7 +1133,7 @@ void ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { ptl_nid_t nid = fwd->kprfd_gateway_nid; - ksock_tx_t *tx = (ksock_tx_t *)&fwd->kprfd_scratch; + ksock_ftx_t *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch; int rc; CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd, @@ -1142,14 +1143,18 @@ ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) if (nid == ksocknal_lib.ni.nid) nid = fwd->kprfd_target_nid; - tx->tx_isfwd = 1; /* This is a forwarding packet */ - tx->tx_nob = fwd->kprfd_nob; - tx->tx_niov = fwd->kprfd_niov; - tx->tx_iov = fwd->kprfd_iov; - tx->tx_nkiov = 0; - tx->tx_kiov = NULL; + /* setup iov for hdr */ + ftx->ftx_iov.iov_base = fwd->kprfd_hdr; + ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t); + + ftx->ftx_tx.tx_isfwd = 1; /* This is a forwarding packet */ + ftx->ftx_tx.tx_nob = sizeof(ptl_hdr_t) + fwd->kprfd_nob; + ftx->ftx_tx.tx_niov = 1; + ftx->ftx_tx.tx_iov = &ftx->ftx_iov; + ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov; + ftx->ftx_tx.tx_kiov = fwd->kprfd_kiov; - rc = ksocknal_launch_packet (tx, nid); + rc = ksocknal_launch_packet (&ftx->ftx_tx, nid); if (rc != 0) kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc); } @@ -1177,7 +1182,7 @@ ksocknal_fmb_callback (void *arg, int error) { ksock_fmb_t *fmb = (ksock_fmb_t *)arg; ksock_fmb_pool_t *fmp = fmb->fmb_pool; - ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page); ksock_conn_t *conn = NULL; ksock_sched_t *sched; unsigned long flags; @@ -1235,7 +1240,6 @@ ksock_fmb_t * ksocknal_get_idle_fmb (ksock_conn_t *conn) { int payload_nob = conn->ksnc_rx_nob_left; - int packet_nob = sizeof (ptl_hdr_t) + payload_nob; unsigned long flags; ksock_fmb_pool_t *pool; ksock_fmb_t *fmb; @@ -1243,7 +1247,7 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn) LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB); LASSERT (kpr_routing(&ksocknal_data.ksnd_router)); - if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE) + if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE) pool = &ksocknal_data.ksnd_small_fmp; else pool = &ksocknal_data.ksnd_large_fmp; @@ -1274,98 +1278,64 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn) int ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb) { - int payload_nob = conn->ksnc_rx_nob_left; - int packet_nob = sizeof (ptl_hdr_t) + payload_nob; + int payload_nob = conn->ksnc_rx_nob_left; ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid); - int niov; /* at least the header */ - int nob; + int niov = 0; + int nob = payload_nob; LASSERT (conn->ksnc_rx_scheduled); LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB); LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left); LASSERT (payload_nob >= 0); - LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE); + LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE); LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE); - - /* Got a forwarding buffer; copy the header we just read into the - * forwarding buffer. If there's payload, start reading reading it - * into the buffer, otherwise the forwarding buffer can be kicked - * off immediately. - * - * NB fmb->fmb_iov spans the WHOLE packet. - * conn->ksnc_rx_iov spans just the payload. - */ - fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]); - - /* copy header */ - memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t)); + LASSERT (fmb->fmb_kiov[0].kiov_offset == 0); /* Take a ref on the conn's peer to prevent module unload before - * forwarding completes. NB we ref peer and not conn since because - * all refs on conn after it has been closed must remove themselves - * in finite time */ + * forwarding completes. */ fmb->fmb_peer = conn->ksnc_peer; atomic_inc (&conn->ksnc_peer->ksnp_refcount); - if (payload_nob == 0) { /* got complete packet already */ - CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n", - conn, NTOH__u64 (conn->ksnc_hdr.src_nid), - dest_nid, packet_nob); + /* Copy the header we just read into the forwarding buffer. If + * there's payload, start reading reading it into the buffer, + * otherwise the forwarding buffer can be kicked off + * immediately. */ + fmb->fmb_hdr = conn->ksnc_hdr; - fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t); + while (nob > 0) { + LASSERT (niov < fmb->fmb_pool->fmp_buff_pages); + LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0); + fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob); + nob -= PAGE_SIZE; + niov++; + } + + kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr, + payload_nob, niov, fmb->fmb_kiov, + ksocknal_fmb_callback, fmb); - kpr_fwd_init (&fmb->fmb_fwd, dest_nid, - packet_nob, 1, fmb->fmb_iov, - ksocknal_fmb_callback, fmb); + if (payload_nob == 0) { /* got complete packet already */ + CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n", + conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid); - /* forward it now */ kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd); ksocknal_new_packet (conn, 0); /* on to next packet */ return (1); } - niov = 1; - if (packet_nob <= PAGE_SIZE) { /* whole packet fits in first page */ - fmb->fmb_iov[0].iov_len = packet_nob; - } else { - fmb->fmb_iov[0].iov_len = PAGE_SIZE; - nob = packet_nob - PAGE_SIZE; - - do { - LASSERT (niov < fmb->fmb_npages); - fmb->fmb_iov[niov].iov_base = - page_address (fmb->fmb_pages[niov]); - fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob); - nob -= PAGE_SIZE; - niov++; - } while (nob > 0); - } - - kpr_fwd_init (&fmb->fmb_fwd, dest_nid, - packet_nob, niov, fmb->fmb_iov, - ksocknal_fmb_callback, fmb); - conn->ksnc_cookie = fmb; /* stash fmb for later */ conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */ - /* payload is desc's iov-ed buffer, but skipping the hdr */ - LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) / - sizeof (struct iovec)); - - conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; - conn->ksnc_rx_iov[0].iov_base = - (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) + - sizeof (ptl_hdr_t)); - conn->ksnc_rx_iov[0].iov_len = - fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t); - - if (niov > 1) - memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1], - (niov - 1) * sizeof (struct iovec)); - - conn->ksnc_rx_niov = niov; + /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed + * buffer */ + LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t)); + conn->ksnc_rx_niov = 0; + conn->ksnc_rx_nkiov = niov; + conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov; + memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t)); + CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob); return (0); diff --git a/lnet/router/router.c b/lnet/router/router.c index e29f628..d0dbf0a 100644 --- a/lnet/router/router.c +++ b/lnet/router/router.c @@ -456,14 +456,13 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd) CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd, target_nid, src_ne->kpne_interface.kprni_nalid); - LASSERT (nob >= sizeof (ptl_hdr_t)); /* at least got a packet header */ - LASSERT (nob == lib_iov_nob (fwd->kprfd_niov, fwd->kprfd_iov)); + LASSERT (nob == lib_kiov_nob (fwd->kprfd_niov, fwd->kprfd_kiov)); atomic_inc (&kpr_queue_depth); atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */ kpr_fwd_packets++; /* (loose) stats accounting */ - kpr_fwd_bytes += nob; + kpr_fwd_bytes += nob + sizeof(ptl_hdr_t); if (src_ne->kpne_shutdown) /* caller is shutting down */ goto out; diff --git a/lustre/portals/include/linux/kp30.h b/lustre/portals/include/linux/kp30.h index 75e83b4..53828de 100644 --- a/lustre/portals/include/linux/kp30.h +++ b/lustre/portals/include/linux/kp30.h @@ -362,13 +362,14 @@ typedef struct { struct list_head kprfd_list; /* stash in queues (routing target can use) */ ptl_nid_t kprfd_target_nid; /* final destination NID */ ptl_nid_t kprfd_gateway_nid; /* gateway NID */ - int kprfd_nob; /* # message bytes (including header) */ - int kprfd_niov; /* # message frags (including header) */ - struct iovec *kprfd_iov; /* message fragments */ - void *kprfd_router_arg; // originating NAL's router arg + ptl_hdr_t *kprfd_hdr; /* header in wire byte order */ + int kprfd_nob; /* # payload bytes */ + int kprfd_niov; /* # payload frags */ + ptl_kiov_t *kprfd_kiov; /* payload fragments */ + void *kprfd_router_arg; /* originating NAL's router arg */ kpr_fwd_callback_t kprfd_callback; /* completion callback */ void *kprfd_callback_arg; /* completion callback arg */ - kprfd_scratch_t kprfd_scratch; // scratchpad for routing targets + kprfd_scratch_t kprfd_scratch; /* scratchpad for routing targets */ } kpr_fwd_desc_t; typedef void (*kpr_fwd_t)(void *arg, kpr_fwd_desc_t *fwd); @@ -471,15 +472,16 @@ kpr_lookup (kpr_router_t *router, ptl_nid_t nid, int nob, ptl_nid_t *gateway_nid } static inline void -kpr_fwd_init (kpr_fwd_desc_t *fwd, ptl_nid_t nid, - int nob, int niov, struct iovec *iov, +kpr_fwd_init (kpr_fwd_desc_t *fwd, ptl_nid_t nid, ptl_hdr_t *hdr, + int nob, int niov, ptl_kiov_t *kiov, kpr_fwd_callback_t callback, void *callback_arg) { fwd->kprfd_target_nid = nid; fwd->kprfd_gateway_nid = nid; + fwd->kprfd_hdr = hdr; fwd->kprfd_nob = nob; fwd->kprfd_niov = niov; - fwd->kprfd_iov = iov; + fwd->kprfd_kiov = kiov; fwd->kprfd_callback = callback; fwd->kprfd_callback_arg = callback_arg; } diff --git a/lustre/portals/knals/qswnal/qswnal.c b/lustre/portals/knals/qswnal/qswnal.c index 90c9a95..3b3b5d4 100644 --- a/lustre/portals/knals/qswnal/qswnal.c +++ b/lustre/portals/knals/qswnal/qswnal.c @@ -348,10 +348,10 @@ kqswnal_finalise (void) for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) { kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i]; - /* If krx_pages[0] got allocated, it got mapped. + /* If krx_kiov[0].kiov_page got allocated, it got mapped. * NB subsequent pages get merged */ - if (krx->krx_pages[0] != NULL) + if (krx->krx_kiov[0].kiov_page != NULL) ep_dvma_unload(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh, &krx->krx_elanbuffer); @@ -416,8 +416,8 @@ kqswnal_finalise (void) kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i]; for (j = 0; j < krx->krx_npages; j++) - if (krx->krx_pages[j] != NULL) - __free_page (krx->krx_pages[j]); + if (krx->krx_kiov[j].kiov_page != NULL) + __free_page (krx->krx_kiov[j].kiov_page); } PORTAL_FREE(kqswnal_data.kqn_rxds, @@ -709,18 +709,19 @@ kqswnal_initialise (void) LASSERT (krx->krx_npages > 0); for (j = 0; j < krx->krx_npages; j++) { - krx->krx_pages[j] = alloc_page(GFP_KERNEL); - if (krx->krx_pages[j] == NULL) - { + struct page *page = alloc_page(GFP_KERNEL); + + if (page == NULL) { kqswnal_finalise (); return (-ENOMEM); } - LASSERT(page_address(krx->krx_pages[j]) != NULL); + krx->krx_kiov[j].kiov_page = page; + LASSERT(page_address(page) != NULL); #if MULTIRAIL_EKC ep_dvma_load(kqswnal_data.kqn_ep, NULL, - page_address(krx->krx_pages[j]), + page_address(page), PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh, elan_page_idx, &all_rails, &elanbuffer); @@ -736,7 +737,7 @@ kqswnal_initialise (void) #else elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState, kqswnal_data.kqn_eprxdmahandle, - page_address(krx->krx_pages[j]), + page_address(page), PAGE_SIZE, elan_page_idx, &elanbuffer); if (j == 0) diff --git a/lustre/portals/knals/qswnal/qswnal.h b/lustre/portals/knals/qswnal/qswnal.h index b1b9a45..5ebf30a 100644 --- a/lustre/portals/knals/qswnal/qswnal.h +++ b/lustre/portals/knals/qswnal/qswnal.h @@ -153,8 +153,7 @@ typedef struct int krx_rpc_reply_sent; /* rpc reply sent */ atomic_t krx_refcount; /* how to tell when rpc is done */ kpr_fwd_desc_t krx_fwd; /* embedded forwarding descriptor */ - struct page *krx_pages[KQSW_NRXMSGPAGES_LARGE]; /* pages allocated */ - struct iovec krx_iov[KQSW_NRXMSGPAGES_LARGE]; /* iovec for forwarding */ + ptl_kiov_t krx_kiov[KQSW_NRXMSGPAGES_LARGE]; /* buffer frags */ } kqswnal_rx_t; typedef struct diff --git a/lustre/portals/knals/qswnal/qswnal_cb.c b/lustre/portals/knals/qswnal/qswnal_cb.c index 478c25f..157dc70 100644 --- a/lustre/portals/knals/qswnal/qswnal_cb.c +++ b/lustre/portals/knals/qswnal/qswnal_cb.c @@ -775,7 +775,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, int offset, int nob) { kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; - char *buffer = (char *)page_address(krx->krx_pages[0]); + char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page); kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE); int rc; #if MULTIRAIL_EKC @@ -1008,7 +1008,7 @@ kqswnal_sendmsg (nal_cb_t *nal, } memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum)); #endif - + if (kqswnal_data.kqn_optimized_gets && type == PTL_MSG_GET && /* doing a GET */ nid == targetnid) { /* not forwarding */ @@ -1167,7 +1167,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { int rc; kqswnal_tx_t *ktx; - struct iovec *iov = fwd->kprfd_iov; + ptl_kiov_t *kiov = fwd->kprfd_kiov; int niov = fwd->kprfd_niov; int nob = fwd->kprfd_nob; ptl_nid_t nid = fwd->kprfd_gateway_nid; @@ -1177,11 +1177,9 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) LBUG (); #endif /* The router wants this NAL to forward a packet */ - CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n", + CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n", fwd, nid, niov, nob); - LASSERT (niov > 0); - ktx = kqswnal_get_idle_tx (fwd, 0); if (ktx == NULL) /* can't get txd right now */ return; /* fwd will be scheduled when tx desc freed */ @@ -1195,44 +1193,44 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) goto failed; } - if (nob > KQSW_NRXMSGBYTES_LARGE) { - CERROR ("Can't forward [%p] to "LPX64 - ": size %d bigger than max packet size %ld\n", - fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE); - rc = -EMSGSIZE; - goto failed; - } + /* copy hdr into pre-mapped buffer */ + memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t)); + ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; - ktx->ktx_port = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ? + ktx->ktx_port = (nob <= KQSW_SMALLPAYLOAD) ? EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; ktx->ktx_nid = nid; ktx->ktx_state = KTX_FORWARDING; ktx->ktx_args[0] = fwd; + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; - if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) && - nob <= KQSW_TX_BUFFER_SIZE) + if (nob <= KQSW_TX_MAXCONTIG) { - /* send from ktx's pre-mapped contiguous buffer? */ - lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob); + /* send payload from ktx's pre-mapped contiguous buffer */ #if MULTIRAIL_EKC ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, - 0, nob); + 0, KQSW_HDR_SIZE + nob); #else ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; - ktx->ktx_frags[0].Len = nob; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob; #endif - ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; - ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; + if (nob > 0) + lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE, + niov, kiov, 0, nob); } else { - /* zero copy */ - ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; - rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov); + /* zero copy payload */ +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE; +#endif + rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov); if (rc != 0) goto failed; - - ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base; } rc = kqswnal_launch (ktx); @@ -1257,7 +1255,7 @@ kqswnal_fwd_callback (void *arg, int error) if (error != 0) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page); CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n", NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error); @@ -1371,8 +1369,9 @@ kqswnal_requeue_rx (kqswnal_rx_t *krx) void kqswnal_rx (kqswnal_rx_t *krx) { - ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page); ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid); + int payload_nob; int nob; int niov; @@ -1398,16 +1397,26 @@ kqswnal_rx (kqswnal_rx_t *krx) return; } - /* NB forwarding may destroy iov; rebuild every time */ - for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++) - { - LASSERT (niov < krx->krx_npages); - krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]); - krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob); + nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE; + niov = 0; + if (nob > 0) { + krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE; + krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob); + niov = 1; + nob -= PAGE_SIZE - KQSW_HDR_SIZE; + + while (nob > 0) { + LASSERT (niov < krx->krx_npages); + + krx->krx_kiov[niov].kiov_offset = 0; + krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob); + niov++; + nob -= PAGE_SIZE; + } } - kpr_fwd_init (&krx->krx_fwd, dest_nid, - krx->krx_nob, niov, krx->krx_iov, + kpr_fwd_init (&krx->krx_fwd, dest_nid, + hdr, payload_nob, niov, krx->krx_kiov, kqswnal_fwd_callback, krx); kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd); @@ -1471,7 +1480,7 @@ kqswnal_rxhandler(EP_RXD *rxd) void kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page); CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64 ", dpid %d, spid %d, type %d\n", @@ -1526,6 +1535,7 @@ kqswnal_recvmsg (nal_cb_t *nal, size_t rlen) { kqswnal_rx_t *krx = (kqswnal_rx_t *)private; + char *buffer = page_address(krx->krx_kiov[0].kiov_page); int page; char *page_ptr; int page_nob; @@ -1535,8 +1545,7 @@ kqswnal_recvmsg (nal_cb_t *nal, #if KQSW_CHECKSUM kqsw_csum_t senders_csum; kqsw_csum_t payload_csum = 0; - kqsw_csum_t hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]), - sizeof(ptl_hdr_t)); + kqsw_csum_t hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t)); size_t csum_len = mlen; int csum_frags = 0; int csum_nob = 0; @@ -1545,8 +1554,7 @@ kqswnal_recvmsg (nal_cb_t *nal, atomic_inc (&csum_counter); - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); + memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); if (senders_csum != hdr_csum) kqswnal_csum_error (krx, 1); #endif @@ -1567,8 +1575,7 @@ kqswnal_recvmsg (nal_cb_t *nal, if (mlen != 0) { page = 0; - page_ptr = ((char *) page_address(krx->krx_pages[0])) + - KQSW_HDR_SIZE; + page_ptr = buffer + KQSW_HDR_SIZE; page_nob = PAGE_SIZE - KQSW_HDR_SIZE; LASSERT (niov > 0); @@ -1621,7 +1628,7 @@ kqswnal_recvmsg (nal_cb_t *nal, { page++; LASSERT (page < krx->krx_npages); - page_ptr = page_address(krx->krx_pages[page]); + page_ptr = page_address(krx->krx_kiov[page].kiov_page); page_nob = PAGE_SIZE; } @@ -1649,8 +1656,8 @@ kqswnal_recvmsg (nal_cb_t *nal, } #if KQSW_CHECKSUM - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t)); + memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), + sizeof(kqsw_csum_t)); if (csum_len != rlen) CERROR("Unable to checksum data in user's buffer\n"); diff --git a/lustre/portals/knals/socknal/socknal.c b/lustre/portals/knals/socknal/socknal.c index c47dcb4..2c44b43 100644 --- a/lustre/portals/knals/socknal/socknal.c +++ b/lustre/portals/knals/socknal/socknal.c @@ -1388,6 +1388,7 @@ ksocknal_cmd(struct portals_cfg *pcfg, void * private) void ksocknal_free_fmbs (ksock_fmb_pool_t *p) { + int npages = p->fmp_buff_pages; ksock_fmb_t *fmb; int i; @@ -1399,12 +1400,12 @@ ksocknal_free_fmbs (ksock_fmb_pool_t *p) fmb = list_entry(p->fmp_idle_fmbs.next, ksock_fmb_t, fmb_list); - for (i = 0; i < fmb->fmb_npages; i++) - if (fmb->fmb_pages[i] != NULL) - __free_page(fmb->fmb_pages[i]); - + for (i = 0; i < npages; i++) + if (fmb->fmb_kiov[i].kiov_page != NULL) + __free_page(fmb->fmb_kiov[i].kiov_page); + list_del(&fmb->fmb_list); - PORTAL_FREE(fmb, sizeof(*fmb)); + PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages])); } } @@ -1603,10 +1604,12 @@ ksocknal_module_init (void) spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock); INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs); INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns); + ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES; spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock); INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs); INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns); + ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES; spin_lock_init (&ksocknal_data.ksnd_reaper_lock); INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns); @@ -1690,34 +1693,36 @@ ksocknal_module_init (void) for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS + SOCKNAL_LARGE_FWD_NMSGS); i++) { - ksock_fmb_t *fmb; + ksock_fmb_t *fmb; + ksock_fmb_pool_t *pool; + + + if (i < SOCKNAL_SMALL_FWD_NMSGS) + pool = &ksocknal_data.ksnd_small_fmp; + else + pool = &ksocknal_data.ksnd_large_fmp; - PORTAL_ALLOC(fmb, sizeof(*fmb)); + PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t, + fmb_kiov[pool->fmp_buff_pages])); if (fmb == NULL) { ksocknal_module_fini(); return (-ENOMEM); } - if (i < SOCKNAL_SMALL_FWD_NMSGS) { - fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES; - fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp; - } else { - fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES; - fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp; - } - - for (j = 0; j < fmb->fmb_npages; j++) { - fmb->fmb_pages[j] = alloc_page(GFP_KERNEL); + fmb->fmb_pool = pool; + + for (j = 0; j < pool->fmp_buff_pages; j++) { + fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL); - if (fmb->fmb_pages[j] == NULL) { + if (fmb->fmb_kiov[j].kiov_page == NULL) { ksocknal_module_fini (); return (-ENOMEM); } - LASSERT(page_address(fmb->fmb_pages[j]) != NULL); + LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL); } - list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs); + list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs); } } diff --git a/lustre/portals/knals/socknal/socknal.h b/lustre/portals/knals/socknal/socknal.h index 0f0b9bd..2767c41 100644 --- a/lustre/portals/knals/socknal/socknal.h +++ b/lustre/portals/knals/socknal/socknal.h @@ -88,7 +88,7 @@ #define SOCKNAL_SMALL_FWD_PAGES 1 /* # pages in a small message fwd buffer */ -#define SOCKNAL_LARGE_FWD_PAGES (PAGE_ALIGN (sizeof (ptl_hdr_t) + PTL_MTU) >> PAGE_SHIFT) +#define SOCKNAL_LARGE_FWD_PAGES (PAGE_ALIGN(PTL_MTU) >> PAGE_SHIFT) /* # pages in a large message fwd buffer */ #define SOCKNAL_RESCHED 100 /* # scheduler loops before reschedule */ @@ -115,6 +115,7 @@ typedef struct /* pool of forwarding buffers */ struct list_head fmp_idle_fmbs; /* free buffers */ struct list_head fmp_blocked_conns; /* connections waiting for a buffer */ int fmp_nactive_fmbs; /* # buffers in use */ + int fmp_buff_pages; /* # pages per buffer */ } ksock_fmb_pool_t; @@ -193,18 +194,13 @@ typedef struct { #define SOCKNAL_INIT_ALL 3 /* A packet just assembled for transmission is represented by 1 or more - * struct iovec fragments and 0 or more ptl_kiov_t fragments. Forwarded - * messages, or messages from an MD with PTL_MD_KIOV _not_ set have 0 - * ptl_kiov_t fragments. Messages from an MD with PTL_MD_KIOV set, have 1 - * struct iovec fragment (the header) and up to PTL_MD_MAX_IOV ptl_kiov_t - * fragments. + * struct iovec fragments (the first frag contains the portals header), + * followed by 0 or more ptl_kiov_t fragments. * * On the receive side, initially 1 struct iovec fragment is posted for - * receive (the header). Once the header has been received, if the message - * requires forwarding or will be received into mapped memory, up to - * PTL_MD_MAX_IOV struct iovec fragments describe the target memory. - * Otherwise up to PTL_MD_MAX_IOV ptl_kiov_t fragments are used. - */ + * receive (the header). Once the header has been received, the payload is + * received into either struct iovec or ptl_kiov_t fragments, depending on + * what the header matched or whether the message needs forwarding. */ struct ksock_conn; /* forward ref */ struct ksock_peer; /* forward ref */ @@ -227,6 +223,12 @@ typedef struct /* transmit packet */ #endif } ksock_tx_t; +typedef struct /* forwarded packet */ +{ + ksock_tx_t ftx_tx; /* send info */ + struct iovec ftx_iov; /* hdr iovec */ +} ksock_ftx_t; + #define KSOCK_ZCCD_2_TX(ptr) list_entry (ptr, ksock_tx_t, tx_zccd) /* network zero copy callback descriptor embedded in ksock_tx_t */ @@ -254,15 +256,14 @@ typedef struct /* Kernel portals Socket Forward { /* (socknal->router) */ struct list_head fmb_list; /* queue idle */ kpr_fwd_desc_t fmb_fwd; /* router's descriptor */ - int fmb_npages; /* # pages allocated */ ksock_fmb_pool_t *fmb_pool; /* owning pool */ struct ksock_peer *fmb_peer; /* peer received from */ - struct page *fmb_pages[SOCKNAL_LARGE_FWD_PAGES]; - struct iovec fmb_iov[SOCKNAL_LARGE_FWD_PAGES]; + ptl_hdr_t fmb_hdr; /* message header */ + ptl_kiov_t fmb_kiov[0]; /* payload frags */ } ksock_fmb_t; /* space for the rx frag descriptors; we either read a single contiguous - * header, or PTL_MD_MAX_IOV frags of payload of either type. */ + * header, or up to PTL_MD_MAX_IOV frags of payload of either type. */ typedef union { struct iovec iov[PTL_MD_MAX_IOV]; ptl_kiov_t kiov[PTL_MD_MAX_IOV]; diff --git a/lustre/portals/knals/socknal/socknal_cb.c b/lustre/portals/knals/socknal/socknal_cb.c index 0678d41..72bd0b7 100644 --- a/lustre/portals/knals/socknal/socknal_cb.c +++ b/lustre/portals/knals/socknal/socknal_cb.c @@ -123,7 +123,7 @@ ksocknal_free_ltx (ksock_ltx_t *ltx) PORTAL_FREE(ltx, ltx->ltx_desc_size); } -#if SOCKNAL_ZC +#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC) struct page * ksocknal_kvaddr_to_page (unsigned long vaddr) { @@ -159,7 +159,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx) int more = (tx->tx_niov > 1) || (tx->tx_nkiov > 0) || (!list_empty (&conn->ksnc_tx_queue)); -#if SOCKNAL_ZC +#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC) int offset = vaddr & (PAGE_SIZE - 1); int zcsize = MIN (fragsize, PAGE_SIZE - offset); struct page *page; @@ -171,7 +171,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx) LASSERT (fragsize <= tx->tx_resid); LASSERT (tx->tx_niov > 0); -#if SOCKNAL_ZC +#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC) if (zcsize >= ksocknal_data.ksnd_zc_min_frag && (sock->sk->route_caps & NETIF_F_SG) && (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) && @@ -771,7 +771,8 @@ ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) /* Find the conn with the shortest tx queue */ list_for_each (tmp, &peer->ksnp_conns) { ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list); - int nob = atomic_read(&c->ksnc_tx_nob); + int nob = atomic_read(&c->ksnc_tx_nob) + + c->ksnc_sock->sk->sk_wmem_queued; LASSERT (!c->ksnc_closing); @@ -1132,7 +1133,7 @@ void ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { ptl_nid_t nid = fwd->kprfd_gateway_nid; - ksock_tx_t *tx = (ksock_tx_t *)&fwd->kprfd_scratch; + ksock_ftx_t *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch; int rc; CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd, @@ -1142,14 +1143,18 @@ ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) if (nid == ksocknal_lib.ni.nid) nid = fwd->kprfd_target_nid; - tx->tx_isfwd = 1; /* This is a forwarding packet */ - tx->tx_nob = fwd->kprfd_nob; - tx->tx_niov = fwd->kprfd_niov; - tx->tx_iov = fwd->kprfd_iov; - tx->tx_nkiov = 0; - tx->tx_kiov = NULL; + /* setup iov for hdr */ + ftx->ftx_iov.iov_base = fwd->kprfd_hdr; + ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t); + + ftx->ftx_tx.tx_isfwd = 1; /* This is a forwarding packet */ + ftx->ftx_tx.tx_nob = sizeof(ptl_hdr_t) + fwd->kprfd_nob; + ftx->ftx_tx.tx_niov = 1; + ftx->ftx_tx.tx_iov = &ftx->ftx_iov; + ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov; + ftx->ftx_tx.tx_kiov = fwd->kprfd_kiov; - rc = ksocknal_launch_packet (tx, nid); + rc = ksocknal_launch_packet (&ftx->ftx_tx, nid); if (rc != 0) kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc); } @@ -1177,7 +1182,7 @@ ksocknal_fmb_callback (void *arg, int error) { ksock_fmb_t *fmb = (ksock_fmb_t *)arg; ksock_fmb_pool_t *fmp = fmb->fmb_pool; - ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page); ksock_conn_t *conn = NULL; ksock_sched_t *sched; unsigned long flags; @@ -1235,7 +1240,6 @@ ksock_fmb_t * ksocknal_get_idle_fmb (ksock_conn_t *conn) { int payload_nob = conn->ksnc_rx_nob_left; - int packet_nob = sizeof (ptl_hdr_t) + payload_nob; unsigned long flags; ksock_fmb_pool_t *pool; ksock_fmb_t *fmb; @@ -1243,7 +1247,7 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn) LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB); LASSERT (kpr_routing(&ksocknal_data.ksnd_router)); - if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE) + if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE) pool = &ksocknal_data.ksnd_small_fmp; else pool = &ksocknal_data.ksnd_large_fmp; @@ -1274,98 +1278,64 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn) int ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb) { - int payload_nob = conn->ksnc_rx_nob_left; - int packet_nob = sizeof (ptl_hdr_t) + payload_nob; + int payload_nob = conn->ksnc_rx_nob_left; ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid); - int niov; /* at least the header */ - int nob; + int niov = 0; + int nob = payload_nob; LASSERT (conn->ksnc_rx_scheduled); LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB); LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left); LASSERT (payload_nob >= 0); - LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE); + LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE); LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE); - - /* Got a forwarding buffer; copy the header we just read into the - * forwarding buffer. If there's payload, start reading reading it - * into the buffer, otherwise the forwarding buffer can be kicked - * off immediately. - * - * NB fmb->fmb_iov spans the WHOLE packet. - * conn->ksnc_rx_iov spans just the payload. - */ - fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]); - - /* copy header */ - memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t)); + LASSERT (fmb->fmb_kiov[0].kiov_offset == 0); /* Take a ref on the conn's peer to prevent module unload before - * forwarding completes. NB we ref peer and not conn since because - * all refs on conn after it has been closed must remove themselves - * in finite time */ + * forwarding completes. */ fmb->fmb_peer = conn->ksnc_peer; atomic_inc (&conn->ksnc_peer->ksnp_refcount); - if (payload_nob == 0) { /* got complete packet already */ - CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n", - conn, NTOH__u64 (conn->ksnc_hdr.src_nid), - dest_nid, packet_nob); + /* Copy the header we just read into the forwarding buffer. If + * there's payload, start reading reading it into the buffer, + * otherwise the forwarding buffer can be kicked off + * immediately. */ + fmb->fmb_hdr = conn->ksnc_hdr; - fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t); + while (nob > 0) { + LASSERT (niov < fmb->fmb_pool->fmp_buff_pages); + LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0); + fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob); + nob -= PAGE_SIZE; + niov++; + } + + kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr, + payload_nob, niov, fmb->fmb_kiov, + ksocknal_fmb_callback, fmb); - kpr_fwd_init (&fmb->fmb_fwd, dest_nid, - packet_nob, 1, fmb->fmb_iov, - ksocknal_fmb_callback, fmb); + if (payload_nob == 0) { /* got complete packet already */ + CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n", + conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid); - /* forward it now */ kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd); ksocknal_new_packet (conn, 0); /* on to next packet */ return (1); } - niov = 1; - if (packet_nob <= PAGE_SIZE) { /* whole packet fits in first page */ - fmb->fmb_iov[0].iov_len = packet_nob; - } else { - fmb->fmb_iov[0].iov_len = PAGE_SIZE; - nob = packet_nob - PAGE_SIZE; - - do { - LASSERT (niov < fmb->fmb_npages); - fmb->fmb_iov[niov].iov_base = - page_address (fmb->fmb_pages[niov]); - fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob); - nob -= PAGE_SIZE; - niov++; - } while (nob > 0); - } - - kpr_fwd_init (&fmb->fmb_fwd, dest_nid, - packet_nob, niov, fmb->fmb_iov, - ksocknal_fmb_callback, fmb); - conn->ksnc_cookie = fmb; /* stash fmb for later */ conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */ - /* payload is desc's iov-ed buffer, but skipping the hdr */ - LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) / - sizeof (struct iovec)); - - conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space; - conn->ksnc_rx_iov[0].iov_base = - (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) + - sizeof (ptl_hdr_t)); - conn->ksnc_rx_iov[0].iov_len = - fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t); - - if (niov > 1) - memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1], - (niov - 1) * sizeof (struct iovec)); - - conn->ksnc_rx_niov = niov; + /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed + * buffer */ + LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t)); + conn->ksnc_rx_niov = 0; + conn->ksnc_rx_nkiov = niov; + conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov; + memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t)); + CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob); return (0); diff --git a/lustre/portals/router/router.c b/lustre/portals/router/router.c index e29f628..d0dbf0a 100644 --- a/lustre/portals/router/router.c +++ b/lustre/portals/router/router.c @@ -456,14 +456,13 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd) CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd, target_nid, src_ne->kpne_interface.kprni_nalid); - LASSERT (nob >= sizeof (ptl_hdr_t)); /* at least got a packet header */ - LASSERT (nob == lib_iov_nob (fwd->kprfd_niov, fwd->kprfd_iov)); + LASSERT (nob == lib_kiov_nob (fwd->kprfd_niov, fwd->kprfd_kiov)); atomic_inc (&kpr_queue_depth); atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */ kpr_fwd_packets++; /* (loose) stats accounting */ - kpr_fwd_bytes += nob; + kpr_fwd_bytes += nob + sizeof(ptl_hdr_t); if (src_ne->kpne_shutdown) /* caller is shutting down */ goto out; -- 1.8.3.1