struct list_head kprfd_list; /* stash in queues (routing target can use) */
ptl_nid_t kprfd_target_nid; /* final destination NID */
ptl_nid_t kprfd_gateway_nid; /* gateway NID */
- int kprfd_nob; /* # message bytes (including header) */
- int kprfd_niov; /* # message frags (including header) */
- struct iovec *kprfd_iov; /* message fragments */
- void *kprfd_router_arg; // originating NAL's router arg
+ ptl_hdr_t *kprfd_hdr; /* header in wire byte order */
+ int kprfd_nob; /* # payload bytes */
+ int kprfd_niov; /* # payload frags */
+ ptl_kiov_t *kprfd_kiov; /* payload fragments */
+ void *kprfd_router_arg; /* originating NAL's router arg */
kpr_fwd_callback_t kprfd_callback; /* completion callback */
void *kprfd_callback_arg; /* completion callback arg */
- kprfd_scratch_t kprfd_scratch; // scratchpad for routing targets
+ kprfd_scratch_t kprfd_scratch; /* scratchpad for routing targets */
} kpr_fwd_desc_t;
typedef void (*kpr_fwd_t)(void *arg, kpr_fwd_desc_t *fwd);
}
static inline void
-kpr_fwd_init (kpr_fwd_desc_t *fwd, ptl_nid_t nid,
- int nob, int niov, struct iovec *iov,
+kpr_fwd_init (kpr_fwd_desc_t *fwd, ptl_nid_t nid, ptl_hdr_t *hdr,
+ int nob, int niov, ptl_kiov_t *kiov,
kpr_fwd_callback_t callback, void *callback_arg)
{
fwd->kprfd_target_nid = nid;
fwd->kprfd_gateway_nid = nid;
+ fwd->kprfd_hdr = hdr;
fwd->kprfd_nob = nob;
fwd->kprfd_niov = niov;
- fwd->kprfd_iov = iov;
+ fwd->kprfd_kiov = kiov;
fwd->kprfd_callback = callback;
fwd->kprfd_callback_arg = callback_arg;
}
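
For illustration, this is how a NAL might fill in a forwarding descriptor under the new signature: the wire-order header is passed separately and the kiov covers payload only. A minimal sketch, not part of the patch; kqswnal_rx() below is the real in-tree caller, and the my_* names are hypothetical.

/* Hedged sketch of a kpr_fwd_init() caller (my_* names hypothetical). */
static kpr_fwd_callback_t my_done;      /* hypothetical completion callback
                                         * (left NULL in this sketch) */
static void
my_queue_fwd (kpr_fwd_desc_t *fwd, ptl_nid_t dest_nid, ptl_hdr_t *hdr,
              int payload_nob, int payload_niov, ptl_kiov_t *payload_kiov)
{
        /* hdr in wire byte order; kiov describes the payload only */
        kpr_fwd_init (fwd, dest_nid, hdr,
                      payload_nob, payload_niov, payload_kiov,
                      my_done, NULL);
}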
for (i = 0; i < KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE; i++) {
kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
- /* If krx_pages[0] got allocated, it got mapped.
+ /* If krx_kiov[0].kiov_page got allocated, it got mapped.
* NB subsequent pages get merged */
- if (krx->krx_pages[0] != NULL)
+ if (krx->krx_kiov[0].kiov_page != NULL)
ep_dvma_unload(kqswnal_data.kqn_ep,
kqswnal_data.kqn_ep_rx_nmh,
&krx->krx_elanbuffer);
kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
for (j = 0; j < krx->krx_npages; j++)
- if (krx->krx_pages[j] != NULL)
- __free_page (krx->krx_pages[j]);
+ if (krx->krx_kiov[j].kiov_page != NULL)
+ __free_page (krx->krx_kiov[j].kiov_page);
}
PORTAL_FREE(kqswnal_data.kqn_rxds,
LASSERT (krx->krx_npages > 0);
for (j = 0; j < krx->krx_npages; j++)
{
- krx->krx_pages[j] = alloc_page(GFP_KERNEL);
- if (krx->krx_pages[j] == NULL)
- {
+ struct page *page = alloc_page(GFP_KERNEL);
+
+ if (page == NULL) {
kqswnal_finalise ();
return (-ENOMEM);
}
- LASSERT(page_address(krx->krx_pages[j]) != NULL);
+ krx->krx_kiov[j].kiov_page = page;
+ LASSERT(page_address(page) != NULL);
#if MULTIRAIL_EKC
ep_dvma_load(kqswnal_data.kqn_ep, NULL,
- page_address(krx->krx_pages[j]),
+ page_address(page),
PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
elan_page_idx, &all_rails, &elanbuffer);
#else
elan3_dvma_kaddr_load(kqswnal_data.kqn_ep->DmaState,
kqswnal_data.kqn_eprxdmahandle,
- page_address(krx->krx_pages[j]),
+ page_address(page),
PAGE_SIZE, elan_page_idx,
&elanbuffer);
if (j == 0)
int krx_rpc_reply_sent; /* rpc reply sent */
atomic_t krx_refcount; /* how to tell when rpc is done */
kpr_fwd_desc_t krx_fwd; /* embedded forwarding descriptor */
- struct page *krx_pages[KQSW_NRXMSGPAGES_LARGE]; /* pages allocated */
- struct iovec krx_iov[KQSW_NRXMSGPAGES_LARGE]; /* iovec for forwarding */
+ ptl_kiov_t krx_kiov[KQSW_NRXMSGPAGES_LARGE]; /* buffer frags */
} kqswnal_rx_t;
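
The single krx_kiov[] array replaces the separate krx_pages[]/krx_iov[] pair: each ptl_kiov_t names a backing page plus an offset and length within it, so the same descriptors serve both mapping and forwarding. A tiny sketch of filling one fragment, using the field names as they appear throughout this patch:

/* Illustrative only: describe one whole receive page as a kiov frag. */
static void
my_init_rx_frag (ptl_kiov_t *kiov, struct page *pg)
{
        kiov->kiov_page   = pg;        /* backing page */
        kiov->kiov_offset = 0;         /* payload starts at page start */
        kiov->kiov_len    = PAGE_SIZE; /* covers the whole page */
}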
typedef struct
int offset, int nob)
{
kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
- char *buffer = (char *)page_address(krx->krx_pages[0]);
+ char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
int rc;
#if MULTIRAIL_EKC
}
memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif
-
+
if (kqswnal_data.kqn_optimized_gets &&
type == PTL_MSG_GET && /* doing a GET */
nid == targetnid) { /* not forwarding */
{
int rc;
kqswnal_tx_t *ktx;
- struct iovec *iov = fwd->kprfd_iov;
+ ptl_kiov_t *kiov = fwd->kprfd_kiov;
int niov = fwd->kprfd_niov;
int nob = fwd->kprfd_nob;
ptl_nid_t nid = fwd->kprfd_gateway_nid;
LBUG ();
#endif
/* The router wants this NAL to forward a packet */
- CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
+ CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
fwd, nid, niov, nob);
- LASSERT (niov > 0);
-
ktx = kqswnal_get_idle_tx (fwd, 0);
if (ktx == NULL) /* can't get txd right now */
return; /* fwd will be scheduled when tx desc freed */
goto failed;
}
- if (nob > KQSW_NRXMSGBYTES_LARGE) {
- CERROR ("Can't forward [%p] to "LPX64
- ": size %d bigger than max packet size %ld\n",
- fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
- rc = -EMSGSIZE;
- goto failed;
- }
+ /* copy hdr into pre-mapped buffer */
+ memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
+ ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
- ktx->ktx_port = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
+ ktx->ktx_port = (nob <= KQSW_SMALLPAYLOAD) ?
EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
ktx->ktx_nid = nid;
ktx->ktx_state = KTX_FORWARDING;
ktx->ktx_args[0] = fwd;
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
- nob <= KQSW_TX_BUFFER_SIZE)
+ if (nob <= KQSW_TX_MAXCONTIG)
{
- /* send from ktx's pre-mapped contiguous buffer? */
- lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob);
+ /* send payload from ktx's pre-mapped contiguous buffer */
#if MULTIRAIL_EKC
ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
- 0, nob);
+ 0, KQSW_HDR_SIZE + nob);
#else
ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
- ktx->ktx_frags[0].Len = nob;
+ ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
#endif
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
+ if (nob > 0)
+ lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
+ niov, kiov, 0, nob);
}
else
{
- /* zero copy */
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
- rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov);
+ /* zero copy payload */
+#if MULTIRAIL_EKC
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+ 0, KQSW_HDR_SIZE);
+#else
+ ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+ rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
if (rc != 0)
goto failed;
-
- ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
}
rc = kqswnal_launch (ktx);
if (error != 0)
{
- ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
void
kqswnal_rx (kqswnal_rx_t *krx)
{
- ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
+ ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid);
+ int payload_nob;
int nob;
int niov;
return;
}
- /* NB forwarding may destroy iov; rebuild every time */
- for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
- {
- LASSERT (niov < krx->krx_npages);
- krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
- krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
+ nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
+ niov = 0;
+ if (nob > 0) {
+ krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
+ krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
+ niov = 1;
+ nob -= PAGE_SIZE - KQSW_HDR_SIZE;
+
+ while (nob > 0) {
+ LASSERT (niov < krx->krx_npages);
+
+ krx->krx_kiov[niov].kiov_offset = 0;
+ krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
+ niov++;
+ nob -= PAGE_SIZE;
+ }
}
- kpr_fwd_init (&krx->krx_fwd, dest_nid,
- krx->krx_nob, niov, krx->krx_iov,
+ kpr_fwd_init (&krx->krx_fwd, dest_nid,
+ hdr, payload_nob, niov, krx->krx_kiov,
kqswnal_fwd_callback, krx);
kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
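
The first payload fragment skips the header at the front of page 0; subsequent fragments cover whole pages. A worked example with hypothetical sizes (PAGE_SIZE 4096, KQSW_HDR_SIZE 256; the real constants are defined elsewhere in this NAL):

/* krx_nob = 9000  =>  payload_nob = 9000 - 256 = 8744
 * kiov[0]: kiov_offset 256, kiov_len 3840   (8744 - 3840 = 4904 left)
 * kiov[1]: kiov_offset   0, kiov_len 4096   (4904 - 4096 =  808 left)
 * kiov[2]: kiov_offset   0, kiov_len  808   =>  niov = 3 */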
void
kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
{
- ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
", dpid %d, spid %d, type %d\n",
size_t rlen)
{
kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
+ char *buffer = page_address(krx->krx_kiov[0].kiov_page);
int page;
char *page_ptr;
int page_nob;
#if KQSW_CHECKSUM
kqsw_csum_t senders_csum;
kqsw_csum_t payload_csum = 0;
- kqsw_csum_t hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
- sizeof(ptl_hdr_t));
+ kqsw_csum_t hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
size_t csum_len = mlen;
int csum_frags = 0;
int csum_nob = 0;
atomic_inc (&csum_counter);
- memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
- sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
+ memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
if (senders_csum != hdr_csum)
kqswnal_csum_error (krx, 1);
#endif
if (mlen != 0) {
page = 0;
- page_ptr = ((char *) page_address(krx->krx_pages[0])) +
- KQSW_HDR_SIZE;
+ page_ptr = buffer + KQSW_HDR_SIZE;
page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
LASSERT (niov > 0);
{
page++;
LASSERT (page < krx->krx_npages);
- page_ptr = page_address(krx->krx_pages[page]);
+ page_ptr = page_address(krx->krx_kiov[page].kiov_page);
page_nob = PAGE_SIZE;
}
}
#if KQSW_CHECKSUM
- memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
- sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
+ memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t),
+ sizeof(kqsw_csum_t));
if (csum_len != rlen)
CERROR("Unable to checksum data in user's buffer\n");
void
ksocknal_free_fmbs (ksock_fmb_pool_t *p)
{
+ int npages = p->fmp_buff_pages;
ksock_fmb_t *fmb;
int i;
fmb = list_entry(p->fmp_idle_fmbs.next,
ksock_fmb_t, fmb_list);
- for (i = 0; i < fmb->fmb_npages; i++)
- if (fmb->fmb_pages[i] != NULL)
- __free_page(fmb->fmb_pages[i]);
-
+ for (i = 0; i < npages; i++)
+ if (fmb->fmb_kiov[i].kiov_page != NULL)
+ __free_page(fmb->fmb_kiov[i].kiov_page);
+
list_del(&fmb->fmb_list);
- PORTAL_FREE(fmb, sizeof(*fmb));
+ PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
}
}
spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
+ ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
+ ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
SOCKNAL_LARGE_FWD_NMSGS); i++) {
- ksock_fmb_t *fmb;
+ ksock_fmb_t *fmb;
+ ksock_fmb_pool_t *pool;
+
+ if (i < SOCKNAL_SMALL_FWD_NMSGS)
+ pool = &ksocknal_data.ksnd_small_fmp;
+ else
+ pool = &ksocknal_data.ksnd_large_fmp;
- PORTAL_ALLOC(fmb, sizeof(*fmb));
+ PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t,
+ fmb_kiov[pool->fmp_buff_pages]));
if (fmb == NULL) {
ksocknal_module_fini();
return (-ENOMEM);
}
- if (i < SOCKNAL_SMALL_FWD_NMSGS) {
- fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES;
- fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp;
- } else {
- fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES;
- fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp;
- }
-
- for (j = 0; j < fmb->fmb_npages; j++) {
- fmb->fmb_pages[j] = alloc_page(GFP_KERNEL);
+ fmb->fmb_pool = pool;
+
+ for (j = 0; j < pool->fmp_buff_pages; j++) {
+ fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);
- if (fmb->fmb_pages[j] == NULL) {
+ if (fmb->fmb_kiov[j].kiov_page == NULL) {
ksocknal_module_fini ();
return (-ENOMEM);
}
- LASSERT(page_address(fmb->fmb_pages[j]) != NULL);
+ LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
}
- list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
+ list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
}
}
#define SOCKNAL_SMALL_FWD_PAGES 1 /* # pages in a small message fwd buffer */
-#define SOCKNAL_LARGE_FWD_PAGES (PAGE_ALIGN (sizeof (ptl_hdr_t) + PTL_MTU) >> PAGE_SHIFT)
+#define SOCKNAL_LARGE_FWD_PAGES (PAGE_ALIGN(PTL_MTU) >> PAGE_SHIFT)
/* # pages in a large message fwd buffer */
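
Large forwarding buffers now hold payload only (the header lives in the new fmb_hdr field), so the page count drops. A worked sizing example under hypothetical values (4096-byte pages, PTL_MTU of 65536):

/* old: PAGE_ALIGN(sizeof(ptl_hdr_t) + 65536) >> PAGE_SHIFT = 17 pages
 * new: PAGE_ALIGN(65536)                     >> PAGE_SHIFT = 16 pages
 * i.e. keeping the header out of the buffer saves a page per buffer. */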
#define SOCKNAL_RESCHED 100 /* # scheduler loops before reschedule */
struct list_head fmp_idle_fmbs; /* free buffers */
struct list_head fmp_blocked_conns; /* connections waiting for a buffer */
int fmp_nactive_fmbs; /* # buffers in use */
+ int fmp_buff_pages; /* # pages per buffer */
} ksock_fmb_pool_t;
#define SOCKNAL_INIT_ALL 3
/* A packet just assembled for transmission is represented by 1 or more
- * struct iovec fragments and 0 or more ptl_kiov_t fragments. Forwarded
- * messages, or messages from an MD with PTL_MD_KIOV _not_ set have 0
- * ptl_kiov_t fragments. Messages from an MD with PTL_MD_KIOV set, have 1
- * struct iovec fragment (the header) and up to PTL_MD_MAX_IOV ptl_kiov_t
- * fragments.
+ * struct iovec fragments (the first frag contains the portals header),
+ * followed by 0 or more ptl_kiov_t fragments.
*
* On the receive side, initially 1 struct iovec fragment is posted for
- * receive (the header). Once the header has been received, if the message
- * requires forwarding or will be received into mapped memory, up to
- * PTL_MD_MAX_IOV struct iovec fragments describe the target memory.
- * Otherwise up to PTL_MD_MAX_IOV ptl_kiov_t fragments are used.
- */
+ * receive (the header). Once the header has been received, the payload is
+ * received into either struct iovec or ptl_kiov_t fragments, depending on
+ * the MD the header matched, or whether the message needs forwarding. */
struct ksock_conn; /* forward ref */
struct ksock_peer; /* forward ref */
#endif
} ksock_tx_t;
+typedef struct /* forwarded packet */
+{
+ ksock_tx_t ftx_tx; /* send info */
+ struct iovec ftx_iov; /* hdr iovec */
+} ksock_ftx_t;
+
#define KSOCK_ZCCD_2_TX(ptr) list_entry (ptr, ksock_tx_t, tx_zccd)
/* network zero copy callback descriptor embedded in ksock_tx_t */
{ /* (socknal->router) */
struct list_head fmb_list; /* queue idle */
kpr_fwd_desc_t fmb_fwd; /* router's descriptor */
- int fmb_npages; /* # pages allocated */
ksock_fmb_pool_t *fmb_pool; /* owning pool */
struct ksock_peer *fmb_peer; /* peer received from */
- struct page *fmb_pages[SOCKNAL_LARGE_FWD_PAGES];
- struct iovec fmb_iov[SOCKNAL_LARGE_FWD_PAGES];
+ ptl_hdr_t fmb_hdr; /* message header */
+ ptl_kiov_t fmb_kiov[0]; /* payload frags */
} ksock_fmb_t;
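
fmb_kiov[0] is a trailing (zero-length) array, so each buffer is sized for its pool's fmp_buff_pages at allocation time; the PORTAL_ALLOC/PORTAL_FREE calls above use offsetof with an array index (a GNU-style idiom) for exactly this. A standalone sketch, with plain malloc standing in for PORTAL_ALLOC:

#include <stddef.h>
#include <stdlib.h>

/* Sketch only: size a ksock_fmb_t for npages payload fragments. */
static ksock_fmb_t *
my_alloc_fmb (int npages)
{
        /* fixed fields plus exactly npages trailing kiov descriptors */
        return malloc (offsetof (ksock_fmb_t, fmb_kiov[npages]));
}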
/* space for the rx frag descriptors; we either read a single contiguous
- * header, or PTL_MD_MAX_IOV frags of payload of either type. */
+ * header, or up to PTL_MD_MAX_IOV frags of payload of either type. */
typedef union {
struct iovec iov[PTL_MD_MAX_IOV];
ptl_kiov_t kiov[PTL_MD_MAX_IOV];
PORTAL_FREE(ltx, ltx->ltx_desc_size);
}
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
struct page *
ksocknal_kvaddr_to_page (unsigned long vaddr)
{
int more = (tx->tx_niov > 1) ||
(tx->tx_nkiov > 0) ||
(!list_empty (&conn->ksnc_tx_queue));
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
int offset = vaddr & (PAGE_SIZE - 1);
int zcsize = MIN (fragsize, PAGE_SIZE - offset);
struct page *page;
LASSERT (fragsize <= tx->tx_resid);
LASSERT (tx->tx_niov > 0);
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
(sock->sk->route_caps & NETIF_F_SG) &&
(sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
/* Find the conn with the shortest tx queue */
list_for_each (tmp, &peer->ksnp_conns) {
ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
- int nob = atomic_read(&c->ksnc_tx_nob);
+ int nob = atomic_read(&c->ksnc_tx_nob) +
+ c->ksnc_sock->sk->sk_wmem_queued;
LASSERT (!c->ksnc_closing);
ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
{
ptl_nid_t nid = fwd->kprfd_gateway_nid;
- ksock_tx_t *tx = (ksock_tx_t *)&fwd->kprfd_scratch;
+ ksock_ftx_t *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
int rc;
CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
if (nid == ksocknal_lib.ni.nid)
nid = fwd->kprfd_target_nid;
- tx->tx_isfwd = 1; /* This is a forwarding packet */
- tx->tx_nob = fwd->kprfd_nob;
- tx->tx_niov = fwd->kprfd_niov;
- tx->tx_iov = fwd->kprfd_iov;
- tx->tx_nkiov = 0;
- tx->tx_kiov = NULL;
+ /* setup iov for hdr */
+ ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
+ ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
+
+ ftx->ftx_tx.tx_isfwd = 1; /* This is a forwarding packet */
+ ftx->ftx_tx.tx_nob = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
+ ftx->ftx_tx.tx_niov = 1;
+ ftx->ftx_tx.tx_iov = &ftx->ftx_iov;
+ ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
+ ftx->ftx_tx.tx_kiov = fwd->kprfd_kiov;
- rc = ksocknal_launch_packet (tx, nid);
+ rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
if (rc != 0)
kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
}
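
ksocknal_fwd_packet() overlays a ksock_ftx_t on the descriptor's scratchpad, so kprfd_scratch_t must be at least sizeof(ksock_ftx_t). A hypothetical compile-time guard (not part of this patch) that would catch a too-small scratchpad:

/* Hypothetical: negative array size breaks the build if the overlay
 * in ksocknal_fwd_packet() no longer fits in kprfd_scratch. */
typedef char my_ftx_fits_scratch
        [sizeof (kprfd_scratch_t) >= sizeof (ksock_ftx_t) ? 1 : -1];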
{
ksock_fmb_t *fmb = (ksock_fmb_t *)arg;
ksock_fmb_pool_t *fmp = fmb->fmb_pool;
- ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page);
ksock_conn_t *conn = NULL;
ksock_sched_t *sched;
unsigned long flags;
ksocknal_get_idle_fmb (ksock_conn_t *conn)
{
int payload_nob = conn->ksnc_rx_nob_left;
- int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
unsigned long flags;
ksock_fmb_pool_t *pool;
ksock_fmb_t *fmb;
LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
- if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
+ if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
pool = &ksocknal_data.ksnd_small_fmp;
else
pool = &ksocknal_data.ksnd_large_fmp;
int
ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
{
- int payload_nob = conn->ksnc_rx_nob_left;
- int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
+ int payload_nob = conn->ksnc_rx_nob_left;
ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
- int niov; /* at least the header */
- int nob;
+ int niov = 0;
+ int nob = payload_nob;
LASSERT (conn->ksnc_rx_scheduled);
LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
LASSERT (payload_nob >= 0);
- LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
+ LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
-
- /* Got a forwarding buffer; copy the header we just read into the
- * forwarding buffer. If there's payload, start reading reading it
- * into the buffer, otherwise the forwarding buffer can be kicked
- * off immediately.
- *
- * NB fmb->fmb_iov spans the WHOLE packet.
- * conn->ksnc_rx_iov spans just the payload.
- */
- fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
-
- /* copy header */
- memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
+ LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
/* Take a ref on the conn's peer to prevent module unload before
- * forwarding completes. NB we ref peer and not conn since because
- * all refs on conn after it has been closed must remove themselves
- * in finite time */
+ * forwarding completes. */
fmb->fmb_peer = conn->ksnc_peer;
atomic_inc (&conn->ksnc_peer->ksnp_refcount);
- if (payload_nob == 0) { /* got complete packet already */
- CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
- conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
- dest_nid, packet_nob);
+ /* Copy the header we just read into the forwarding buffer. If
+ * there's payload, start reading it into the buffer,
+ * otherwise the forwarding buffer can be kicked off
+ * immediately. */
+ fmb->fmb_hdr = conn->ksnc_hdr;
- fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
+ while (nob > 0) {
+ LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
+ LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
+ fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
+ nob -= PAGE_SIZE;
+ niov++;
+ }
+
+ kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
+ payload_nob, niov, fmb->fmb_kiov,
+ ksocknal_fmb_callback, fmb);
- kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
- packet_nob, 1, fmb->fmb_iov,
- ksocknal_fmb_callback, fmb);
+ if (payload_nob == 0) { /* got complete packet already */
+ CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
+ conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid);
- /* forward it now */
kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
ksocknal_new_packet (conn, 0); /* on to next packet */
return (1);
}
- niov = 1;
- if (packet_nob <= PAGE_SIZE) { /* whole packet fits in first page */
- fmb->fmb_iov[0].iov_len = packet_nob;
- } else {
- fmb->fmb_iov[0].iov_len = PAGE_SIZE;
- nob = packet_nob - PAGE_SIZE;
-
- do {
- LASSERT (niov < fmb->fmb_npages);
- fmb->fmb_iov[niov].iov_base =
- page_address (fmb->fmb_pages[niov]);
- fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
- nob -= PAGE_SIZE;
- niov++;
- } while (nob > 0);
- }
-
- kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
- packet_nob, niov, fmb->fmb_iov,
- ksocknal_fmb_callback, fmb);
-
conn->ksnc_cookie = fmb; /* stash fmb for later */
conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
- /* payload is desc's iov-ed buffer, but skipping the hdr */
- LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
- sizeof (struct iovec));
-
- conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
- conn->ksnc_rx_iov[0].iov_base =
- (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
- sizeof (ptl_hdr_t));
- conn->ksnc_rx_iov[0].iov_len =
- fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
-
- if (niov > 1)
- memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
- (niov - 1) * sizeof (struct iovec));
-
- conn->ksnc_rx_niov = niov;
+ /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
+ * buffer */
+ LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
+ conn->ksnc_rx_niov = 0;
+ conn->ksnc_rx_nkiov = niov;
+ conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
+ memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
+
CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
return (0);
CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd,
target_nid, src_ne->kpne_interface.kprni_nalid);
- LASSERT (nob >= sizeof (ptl_hdr_t)); /* at least got a packet header */
- LASSERT (nob == lib_iov_nob (fwd->kprfd_niov, fwd->kprfd_iov));
+ LASSERT (nob == lib_kiov_nob (fwd->kprfd_niov, fwd->kprfd_kiov));
atomic_inc (&kpr_queue_depth);
atomic_inc (&src_ne->kpne_refcount); /* source nal is busy until fwd completes */
kpr_fwd_packets++; /* (loose) stats accounting */
- kpr_fwd_bytes += nob;
+ kpr_fwd_bytes += nob + sizeof(ptl_hdr_t);
if (src_ne->kpne_shutdown) /* caller is shutting down */
goto out;
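
Since kprfd_nob now counts payload bytes only, the router adds the header back when accounting wire traffic: forwarding a 1024-byte payload adds 1024 + sizeof(ptl_hdr_t) to kpr_fwd_bytes, where the old code's nob already included the header.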